aquarium_control/recorder/data_logger.rs
1/* Copyright 2024 Uwe Martin
2
3Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
4
5The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
6
7THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
8*/
9
10#![allow(non_snake_case)]
11use chrono::{Local, NaiveDateTime};
12#[cfg(all(not(test), target_os = "linux"))]
13use log::info;
14use log::{error, warn};
15
16#[cfg(feature = "debug_data_logger")]
17use log::debug;
18
19#[cfg(all(not(test), target_os = "linux"))]
20use nix::unistd::gettid;
21
22use mysql::PooledConn;
23use spin_sleep::SpinSleeper;
24use std::fs;
25use std::sync::{Arc, Mutex};
26use std::time::{Duration, Instant};
27use thiserror::Error;
28
29use crate::database::sql_interface::SqlInterface;
30use crate::database::sql_interface_data::SqlInterfaceData;
31use crate::recorder::data_injection::DataInjectionTrait;
32use crate::recorder::data_logger_channels::DataLoggerChannels;
33use crate::recorder::data_logger_config::DataLoggerConfig;
34use crate::recorder::recorder_data_frame::RecorderDataFrame;
35use crate::sensors::sensor_manager::SensorManagerSignals;
36use crate::sensors::tank_level_switch::TankLevelSwitchSignals;
37use crate::utilities::acknowledge_signal_handler::AcknowledgeSignalHandlerTrait;
38use crate::utilities::channel_content::AquariumSignal;
39use crate::utilities::database_ping_trait::DatabasePingTrait;
40use crate::utilities::iir_filter::IIRFilter;
41use crate::utilities::proc_ext_req::ProcessExternalRequestTrait;
42use crate::utilities::wait_for_termination::WaitForTerminationTrait;
43use crate::water::refill::RefillStatus;
44
45const MILLIS_PER_SEC: u64 = 1000;
46
47/// Contains `Arc<Mutex>`-wrapped shared data points accessed by the data logger.
48///
49/// These mutexes provide thread-safe access to various sensor readings and
50/// device states that are collected by the data logger for recording and filtering.
51pub struct DataLoggerMutexes {
52 /// Mutex for sensor manager signals
53 pub mutex_sensor_manager_signals: Arc<Mutex<SensorManagerSignals>>,
54
55 /// Mutex for tank level switch signals
56 pub mutex_tank_level_switch_signals: Arc<Mutex<TankLevelSwitchSignals>>,
57
58 /// Mutex for heater status
59 pub mutex_heating_status: Arc<Mutex<bool>>,
60
61 /// Mutex for ventilation status
62 pub mutex_ventilation_status: Arc<Mutex<bool>>,
63
64 /// Mutex for refill status
65 pub mutex_refill_status: Arc<Mutex<RefillStatus>>,
66}
67
68/// Contains the error definition for DataLogger
69#[derive(Error, Debug)]
70pub enum DataLoggerError {
71 #[error("[{0}] One of the output file names for the data logging to filesystem is empty.")]
72 OutputFilenameEmpty(String),
73
74 #[error("Could not reset content of output file for {signal_name} ({output_file_name})")]
75 FileResetFailure {
76 location: String,
77 signal_name: String,
78 output_file_name: String,
79
80 #[source]
81 source: std::io::Error,
82 },
83}
84
85#[cfg_attr(doc, aquamarine::aquamarine)]
86/// Contains the configuration and the implementation for regular data logging to SQL database
87/// and writing to the file (in RAM disk).
88/// Thread communication is as follows:
89/// ```mermaid
90/// graph LR
91/// atlas_scientific[AtlasScientific] -.-> data_logger
92/// ambient[Ambient] -.-> data_logger
93/// tank_level_switch[TankLevelSwitch] --> data_logger
94/// refill[Refill control] -.-> data_logger
95/// ventilation[Ventilation control] --> data_logger
96/// data_logger --> signal_handler[Signal handler]
97/// signal_handler[Signal handler] --> data_logger
98/// ```
99pub struct DataLogger {
100 /// configuration data for the data logger
101 config: DataLoggerConfig,
102
103 /// an inhibition flag to avoid flooding the log file with repeated messages about failure to create the timestamp
104 lock_error_get_timestamp: bool,
105
106 /// an inhibition flag to avoid flooding the log file with repeated messages about failure to write refill in progress to disk
107 lock_error_output_state_refill_in_progress: bool,
108
109 /// an inhibition flag to avoid flooding the log file with repeated messages about failure to write heating is on to disk
110 lock_error_output_state_heating_is_on: bool,
111
112 /// an inhibition flag to avoid flooding the log file with repeated messages about failure to write ventilation is on to disk
113 lock_error_output_state_ventilation_is_on: bool,
114
115 /// an inhibition flag to avoid flooding the log file with repeated messages about failure to write water temperature to disk
116 lock_error_output_signal_water_temperature: bool,
117
118 /// an inhibition flag to avoid flooding the log file with repeated messages about failure to write pH to disk
119 lock_error_output_signal_pH: bool,
120
121 /// an inhibition flag to avoid flooding the log file with repeated messages about failure to write conductivity to disk
122 lock_error_output_signal_conductivity: bool,
123
124 /// an inhibition flag to avoid flooding the log file with repeated messages about failure to write ambient temperature to disk
125 lock_error_output_signal_ambient_temperature: bool,
126
127 /// an inhibition flag to avoid flooding the log file with repeated messages about failure to write humidity to disk
128 lock_error_output_signal_humidity: bool,
129
130 /// an inhibition flag to avoid flooding the log file with repeated messages about failure to write tank level switch position to disk
131 lock_error_output_state_tank_level_switch_position: bool,
132
133 /// an inhibition flag to avoid flooding the log file with repeated messages about failure to write tank level switch invalid to disk
134 lock_error_output_state_tank_level_switch_invalid: bool,
135
136 /// an inhibition flag to avoid flooding the log file with repeated messages about failure to write tank level switch position stabilized to disk
137 lock_error_output_state_tank_level_switch_position_stabilized: bool,
138
139 /// an inhibition flag to avoid flooding the log file with repeated messages about failure to write temperature filtered to disk
140 lock_error_output_signal_water_temperature_filtered: bool,
141
142 /// an inhibition flag to avoid flooding the log file with repeated messages about failure to write pH filtered to disk
143 lock_error_output_signal_pH_filtered: bool,
144
145 /// an inhibition flag to avoid flooding the log file with repeated messages about failure to write conductivity filtered to disk
146 lock_error_output_signal_conductivity_filtered: bool,
147
148 /// an inhibition flag to avoid flooding the log file with repeated messages about failure to write timestamp to disk
149 lock_error_output_timestamp: bool,
150
151 /// an inhibition flag to avoid flooding the log file with repeated messages about having received an inapplicable command
152 pub lock_warn_inapplicable_command_signal_handler: bool,
153
154 /// an inhibition flag to avoid flooding the log file with repeated messages about failure to receive termination signal via the channel
155 pub lock_error_channel_receive_termination: bool,
156
157 /// recording when the last database ping happened
158 pub last_ping_instant: Instant,
159
160 /// database ping interval
161 pub database_ping_interval: Duration,
162 lock_error_mutex_signal_manager: bool,
163}
164
165impl DataLogger {
166 /// Creates a new `DataLogger` instance.
167 ///
168 /// This constructor initializes the data logger module with its configuration.
169 /// If `output_to_disk` is enabled, it validates that all required output file
170 /// paths are non-empty and then resets each file by writing an empty string to it.
171 /// It also initializes many internal "lock" flags designed to prevent log flooding.
172 ///
173 /// # Arguments
174 /// * `config` - Configuration data for the data logger, defining logging intervals,
175 /// filter coefficients, output file paths, and other operational parameters.
176 /// * `database_ping_interval` - A `Duration` instance, providing the interval to ping the database.
177 ///
178 /// # Returns
179 /// A `Result` containing a new, initialized `DataLogger` instance on success.
180 ///
181 /// # Errors
182 /// This function will return a `DataLoggerError` if `output_to_disk` is enabled and:
183 /// - Any of the configured output filenames are empty (`OutputFilenameEmpty`).
184 /// - It fails to write to (and thus reset) any of the configured output files,
185 /// which could be due to permission issues or an invalid path (`FileResetFailure`).
186 pub fn new(
187 config: DataLoggerConfig,
188 database_ping_interval: Duration,
189 ) -> Result<DataLogger, DataLoggerError> {
190 if config.output_to_disk {
191 // By defining the files in a collection, we can use iterators to check and
192 // initialize them, making the code concise, readable, and easy to maintain.
193 let files_to_initialize = [
194 (&config.output_file_name_timestamp, "timestamp"),
195 (
196 &config.output_file_name_water_temperature,
197 "water temperature",
198 ),
199 (
200 &config.output_file_name_water_temperature_filtered,
201 "filtered water temperature",
202 ),
203 (&config.output_file_name_pH, "pH value"),
204 (&config.output_file_name_pH_filtered, "filtered pH value"),
205 (&config.output_file_name_conductivity, "conductivity"),
206 (
207 &config.output_file_name_conductivity_filtered,
208 "filtered conductivity",
209 ),
210 (
211 &config.output_file_name_tank_level_switch_position,
212 "tank level switch position",
213 ),
214 (
215 &config.output_file_name_tank_level_switch_invalid,
216 "tank level switch invalid",
217 ),
218 (
219 &config.output_file_name_tank_level_switch_position_stabilized,
220 "tank level switch stabilized",
221 ),
222 (
223 &config.output_file_name_ventilation_is_on,
224 "ventilation status",
225 ),
226 (&config.output_file_name_heating_is_on, "heating status"),
227 (
228 &config.output_file_name_ambient_temperature,
229 "ambient temperature",
230 ),
231 (&config.output_file_name_humidity, "humidity"),
232 (
233 &config.output_file_name_refill_in_progress,
234 "refill in progress",
235 ),
236 ];
237
238 // 1. Check if any filename is empty.
239 if files_to_initialize
240 .iter()
241 .any(|(filename, _)| filename.is_empty())
242 {
243 return Err(DataLoggerError::OutputFilenameEmpty(
244 module_path!().to_string(),
245 ));
246 }
247
248 // 2. Initialize all files by writing an empty string.
249 for (filename, signal_name) in files_to_initialize {
250 fs::write(filename, String::new()).map_err(|e| {
251 DataLoggerError::FileResetFailure {
252 location: module_path!().to_string(),
253 signal_name: signal_name.to_string(),
254 output_file_name: filename.clone(),
255 source: e,
256 }
257 })?;
258 }
259 }
260
261 Ok(Self {
262 config,
263 lock_error_get_timestamp: false,
264 lock_error_output_signal_water_temperature: false,
265 lock_error_output_state_refill_in_progress: false,
266 lock_error_output_state_heating_is_on: false,
267 lock_error_output_state_ventilation_is_on: false,
268 lock_error_output_signal_pH: false,
269 lock_error_output_signal_conductivity: false,
270 lock_error_output_signal_ambient_temperature: false,
271 lock_error_output_signal_humidity: false,
272 lock_error_output_state_tank_level_switch_position: false,
273 lock_error_output_state_tank_level_switch_invalid: false,
274 lock_error_output_state_tank_level_switch_position_stabilized: false,
275 lock_error_output_signal_water_temperature_filtered: false,
276 lock_error_output_signal_pH_filtered: false,
277 lock_error_output_signal_conductivity_filtered: false,
278 lock_error_output_timestamp: false,
279 lock_warn_inapplicable_command_signal_handler: false,
280 lock_error_channel_receive_termination: false,
281 last_ping_instant: Instant::now(),
282 database_ping_interval,
283 lock_error_mutex_signal_manager: false,
284 })
285 }
286
287 /// Outputs a non-binary sensor signal to a specified file, typically located on a RAM disk.
288 ///
289 /// This private helper function converts a floating-point `signal` into a human-readable
290 /// string format (determined by `signal_type`) and attempts to write it to the given
291 /// `output_file_name`. The operation only proceeds if `output_to_disk` is enabled in
292 /// the data logger's configuration.
293 ///
294 /// Error logging for writing failures is managed by the `inhibit_error_log` flag,
295 /// preventing a flood of repetitive error messages.
296 ///
297 /// # Arguments
298 /// * `signal` - The `f32` sensor value to be written to the file.
299 /// * `signal_type` - An `AquariumSignal` enum variant, used to determine the correct
300 /// formatting for the `signal` value (e.g., number of decimal places).
301 /// * `output_file_name` - A reference to the `String` containing the full path to the output file.
302 /// * `inhibit_error_log` - A boolean flag that, if `true`, will prevent this specific
303 /// error from being logged to the console/main log if a write operation fails.
304 ///
305 /// # Returns
306 /// `true` if an error occurred during the file write operation; `false` otherwise (including
307 /// when `output_to_disk` is `false` and no write is attempted).
308 fn output_signal(
309 &self,
310 signal: f32,
311 signal_type: AquariumSignal,
312 output_file_name: &String,
313 inhibit_error_log: bool,
314 ) -> bool {
315 if self.config.output_to_disk {
316 let signal_string = signal_type.output_file_string(signal);
317
318 match fs::write(output_file_name, signal_string.clone()) {
319 Ok(_) => {
320 /* do nothing */
321 false // indicate no error
322 }
323 Err(e) => {
324 if !inhibit_error_log {
325 error!(
326 target: module_path!(),
327 "output of value {signal_string} to {output_file_name} failed: {e:?}"
328 );
329 }
330 true // indicate error
331 }
332 }
333 } else {
334 false // no output = no error
335 }
336 }
337
338 /// Outputs a boolean (binary) signal's state to a specified file, typically located on a RAM disk.
339 ///
340 /// This private helper function converts a boolean `state` into a descriptive string
341 /// (e.g., "ON"/"OFF", "HIGH"/"LOW") using provided labels and attempts to write it
342 /// to the given `output_file_name`. This operation only occurs if `output_to_disk`
343 /// is enabled in the data logger's configuration.
344 ///
345 /// Error logging for writing failures is controlled by the `inhibit_error_log` flag,
346 /// preventing a flood of repetitive error messages.
347 ///
348 /// # Arguments
349 /// * `state` - The `bool` value of the binary signal (e.g., `true` for ON, `false` for OFF).
350 /// * `state_description_true` - The string label to write to the file when `state` is `true`.
351 /// * `state_description_false` - The string label to write to the file when `state` is `false`.
352 /// * `output_file_name` - A reference to the `String` containing the full path to the output file.
353 /// * `inhibit_error_log` - A boolean flag that, if `true`, will prevent this specific
354 /// error from being logged to the console/main log if a write operation fails.
355 ///
356 /// # Returns
357 /// - `true`: If an error occurred during the file write operation.
358 /// - `false`: If the write-operation was successful or if `output_to_disk` is disabled.
359 fn output_state(
360 &self,
361 state: bool,
362 state_description_true: &str,
363 state_description_false: &str,
364 output_file_name: &String,
365 inhibit_error_log: bool,
366 ) -> bool {
367 if self.config.output_to_disk {
368 let state_string = match state {
369 true => state_description_true,
370 false => state_description_false,
371 };
372
373 match fs::write(output_file_name, state_string) {
374 Ok(_) => {
375 /* do nothing */
376 false // indicate no error
377 }
378 Err(e) => {
379 if !inhibit_error_log {
380 error!(
381 target: module_path!(),
382 "output of value {state_string} to {output_file_name} failed: {e:?}"
383 );
384 }
385 true // indicate error
386 }
387 }
388 } else {
389 false // no output = no error
390 }
391 }
392
393 /// Outputs a `NaiveDateTime` timestamp to a specified file, typically located on a RAM disk.
394 ///
395 /// This private helper function converts a `NaiveDateTime` into its string representation
396 /// and attempts to write it to the given `output_file_name`. This operation only occurs
397 /// if `output_to_disk` is enabled in the data logger's configuration.
398 ///
399 /// Error logging for writing failures is controlled by the `inhibit_error_log` flag,
400 /// preventing a flood of repetitive error messages.
401 ///
402 /// # Arguments
403 /// * `timestamp` - The `NaiveDateTime` value to be written to the file.
404 /// * `output_file_name` - A reference to the `String` containing the full path to the output file.
405 /// * `inhibit_error_log` - A boolean flag that, if `true`, will prevent this specific
406 /// error from being logged to the console/main log if a write operation fails.
407 ///
408 /// # Returns
409 /// - `true`: If an error occurred during the file write operation.
410 /// - `false`: If the write-operation was successful or if `output_to_disk` is disabled.
411 fn output_timestamp(
412 &self,
413 timestamp: NaiveDateTime,
414 output_file_name: &String,
415 inhibit_error_log: bool,
416 ) -> bool {
417 if self.config.output_to_disk {
418 match fs::write(output_file_name, timestamp.to_string()) {
419 Ok(_) => {
420 /* do nothing */
421 false // indicate no error
422 }
423 Err(e) => {
424 if !inhibit_error_log {
425 error!(
426 target: module_path!(),
427 "output of value {timestamp} to {output_file_name} failed: {e:?}"
428 );
429 }
430 true // indicate error
431 }
432 }
433 } else {
434 false // no output = no error
435 }
436 }
437
438 /// Retrieves current ON/OFF status signals from Refill, Heating, and Ventilation control modules via channels.
439 ///
440 /// This function sends a `RequestSignal` command to each of the specified
441 /// control modules (Refill, Heating, Ventilation) and updates the provided
442 /// status flags based on their responses. It also tracks channel disconnection
443 /// status to prevent repeated errors.
444 ///
445 /// # Arguments
446 /// * `refill_status` - A mutable reference for indicating if the refill is in progress.
447 /// * `heater_status` - A mutable reference for indicating the heater state.
448 /// * `ventilation_status` - A mutable reference for indicating the ventilation state.
449 fn update_signals(
450 &mut self,
451 refill_status_live: &mut bool,
452 refill_status_for_database: &mut bool,
453 refill_reset_for_database: bool,
454 heater_status: &mut bool,
455 ventilation_status: &mut bool,
456 data_logger_mutexes: &DataLoggerMutexes,
457 ) {
458 {
459 // internal scope to limit lifetime of mutex lock
460 match data_logger_mutexes.mutex_refill_status.lock() {
461 Ok(mut refill_status) => {
462 self.lock_error_output_state_refill_in_progress = self.output_state(
463 refill_status.refill_in_progress_live, // use the live signal
464 "ON",
465 "OFF",
466 &self.config.output_file_name_refill_in_progress,
467 self.lock_error_output_state_refill_in_progress,
468 );
469 *refill_status_live = refill_status.refill_in_progress_live;
470 *refill_status_for_database = refill_status.refill_in_progress_for_database;
471 if refill_reset_for_database {
472 refill_status.refill_in_progress_for_database =
473 refill_status.refill_in_progress_live;
474 }
475 }
476 Err(_) => {
477 self.lock_error_output_state_refill_in_progress = true;
478 error!(target: module_path!(), "error locking mutex for refill status");
479 }
480 };
481 }
482
483 {
484 // internal scope to limit lifetime of mutex lock
485 *heater_status = match data_logger_mutexes.mutex_heating_status.lock() {
486 Ok(c) => {
487 self.lock_error_output_state_heating_is_on = self.output_state(
488 *c,
489 "ON",
490 "OFF",
491 &self.config.output_file_name_heating_is_on,
492 self.lock_error_output_state_heating_is_on,
493 );
494 *c
495 }
496 Err(_) => {
497 self.lock_error_output_state_heating_is_on = true;
498 error!(target: module_path!(), "error locking mutex for heating status");
499 false
500 }
501 };
502 }
503
504 {
505 // internal scope to limit lifetime of mutex lock
506 *ventilation_status = match data_logger_mutexes.mutex_ventilation_status.lock() {
507 Ok(c) => {
508 self.lock_error_output_state_ventilation_is_on = self.output_state(
509 *c,
510 "ON",
511 "OFF",
512 &self.config.output_file_name_ventilation_is_on,
513 self.lock_error_output_state_ventilation_is_on,
514 );
515 *c
516 }
517 Err(_) => {
518 self.lock_error_output_state_ventilation_is_on = true;
519 error!(target: module_path!(), "error locking mutex for ventilation status");
520 false
521 }
522 };
523 }
524 }
525
526 /// Retrieves current sensor readings from the `SensorManager` via a shared mutex and outputs them to files.
527 ///
528 /// This private helper function locks the `mutex_sensor_manager_signals` to get the latest
529 /// readings for water temperature, pH, conductivity, ambient temperature, and humidity.
530 /// It updates the corresponding `Option<f32>` variables with the retrieved values and then
531 /// calls `output_signal` to write these readings to their respective configured files on disk.
532 /// If the mutex lock fails, an error is logged, and the function returns without updating the values.
533 ///
534 /// # Arguments
535 /// * `mutex_sensor_manager_signals` - An `Arc<Mutex<SensorManagerSignals>>` holding the latest sensor readings.
536 /// * `water_temperature_opt` - A mutable reference to an `Option<f32>` where the retrieved water temperature will be stored.
537 /// * `ph_opt` - A mutable reference to an `Option<f32>` where the retrieved pH value will be stored.
538 /// * `conductivity_opt` - A mutable reference to an `Option<f32>` where the retrieved conductivity value will be stored.
539 /// * `ambient_temperature_opt` - A mutable reference to an `Option<f32>` where the retrieved ambient temperature will be stored.
540 /// * `humidity_opt` - A mutable reference to an `Option<f32>` where the retrieved humidity will be stored.
541 fn update_sensor_manager_signals(
542 &mut self,
543 mutex_sensor_manager_signals: Arc<Mutex<SensorManagerSignals>>,
544 water_temperature_opt: &mut Option<f32>,
545 ph_opt: &mut Option<f32>,
546 conductivity_opt: &mut Option<f32>,
547 ambient_temperature_opt: &mut Option<f32>,
548 humidity_opt: &mut Option<f32>,
549 ) {
550 let water_temperature: f32;
551 let ph: f32;
552 let conductivity: f32;
553 let ambient_temperature: f32;
554 let humidity: f32;
555 {
556 // internal scope to limit lifetime of mutex lock
557 match mutex_sensor_manager_signals.lock() {
558 Ok(c) => {
559 self.lock_error_mutex_signal_manager = false;
560 water_temperature = c.water_temperature;
561 ph = c.ph;
562 conductivity = c.conductivity;
563 ambient_temperature = c.ambient_temperature;
564 humidity = c.ambient_humidity;
565 }
566 Err(_) => {
567 if !self.lock_error_mutex_signal_manager {
568 self.lock_error_mutex_signal_manager = true;
569 error!(target: module_path!(), "error locking mutex for sensor manager");
570 }
571 return;
572 }
573 };
574 self.lock_error_output_signal_water_temperature = self.output_signal(
575 water_temperature,
576 AquariumSignal::WaterTemperature,
577 &self.config.output_file_name_water_temperature,
578 self.lock_error_output_signal_water_temperature,
579 );
580 self.lock_error_output_signal_pH = self.output_signal(
581 ph,
582 AquariumSignal::pH,
583 &self.config.output_file_name_pH,
584 self.lock_error_output_signal_pH,
585 );
586 self.lock_error_output_signal_conductivity = self.output_signal(
587 conductivity,
588 AquariumSignal::Conductivity,
589 &self.config.output_file_name_conductivity,
590 self.lock_error_output_signal_conductivity,
591 );
592 self.lock_error_output_signal_ambient_temperature = self.output_signal(
593 ambient_temperature,
594 AquariumSignal::AmbientTemperature,
595 &self.config.output_file_name_ambient_temperature,
596 self.lock_error_output_signal_ambient_temperature,
597 );
598 self.lock_error_output_signal_humidity = self.output_signal(
599 humidity,
600 AquariumSignal::AmbientHumidity,
601 &self.config.output_file_name_humidity,
602 self.lock_error_output_signal_humidity,
603 );
604
605 *water_temperature_opt = Some(water_temperature);
606 *ph_opt = Some(ph);
607 *conductivity_opt = Some(conductivity);
608 *ambient_temperature_opt = Some(ambient_temperature);
609 *humidity_opt = Some(humidity);
610 }
611 }
612
613 /// Retrieves current signals from the `TankLevelSwitch` via a shared mutex and outputs them to files.
614 ///
615 /// This private helper function locks the `mutex_tank_level_switch_signals` to get the latest
616 /// state for the switch's position, its stabilized position, and its invalid status.
617 /// It updates the corresponding `Option<bool>` variables with the retrieved values and then
618 /// calls `output_state` to write these states to their respective configured files on disk.
619 /// If the mutex lock fails, the `Option` variables are set to `None`.
620 ///
621 /// # Arguments
622 /// * `mutex_tank_level_switch_signals` - An `Arc<Mutex<TankLevelSwitchSignals>>` holding the latest switch states.
623 /// * `tank_level_switch_position_opt` - A mutable reference to an `Option<bool>` for the switch's current position.
624 /// * `tank_level_switch_position_stabilized_opt` - A mutable reference to an `Option<bool>` for the switch's stabilized position.
625 /// * `tank_level_switch_invalid_opt` - A mutable reference to an `Option<bool>` for the switch's invalid status.
626 fn update_tank_level_switch_signals(
627 &mut self,
628 mutex_tank_level_switch_signals: &Arc<Mutex<TankLevelSwitchSignals>>,
629 tank_level_switch_position_opt: &mut Option<bool>,
630 tank_level_switch_position_stabilized_opt: &mut Option<bool>,
631 tank_level_switch_invalid_opt: &mut Option<bool>,
632 ) {
633 {
634 // internal scope to limit lifetime of mutex lock
635 match mutex_tank_level_switch_signals.lock() {
636 Ok(tank_level_switch_signals) => {
637 *tank_level_switch_position_opt =
638 Some(tank_level_switch_signals.tank_level_switch_position);
639 *tank_level_switch_position_stabilized_opt =
640 Some(tank_level_switch_signals.tank_level_switch_position_stabilized);
641 *tank_level_switch_invalid_opt =
642 Some(tank_level_switch_signals.tank_level_switch_invalid);
643
644 self.lock_error_output_state_tank_level_switch_position = self.output_state(
645 tank_level_switch_signals.tank_level_switch_position,
646 "HIGH",
647 "LOW",
648 &self.config.output_file_name_tank_level_switch_position,
649 self.lock_error_output_state_tank_level_switch_position,
650 );
651
652 self.lock_error_output_state_tank_level_switch_position_stabilized = self
653 .output_state(
654 tank_level_switch_signals.tank_level_switch_position_stabilized,
655 "HIGH",
656 "LOW",
657 &self
658 .config
659 .output_file_name_tank_level_switch_position_stabilized,
660 self.lock_error_output_state_tank_level_switch_position_stabilized,
661 );
662
663 self.lock_error_output_state_tank_level_switch_invalid = self.output_state(
664 tank_level_switch_signals.tank_level_switch_invalid,
665 "INVALID",
666 "VALID",
667 &self.config.output_file_name_tank_level_switch_invalid,
668 self.lock_error_output_state_tank_level_switch_invalid,
669 );
670 }
671 Err(_) => {
672 *tank_level_switch_position_opt = None;
673 *tank_level_switch_position_stabilized_opt = None;
674 *tank_level_switch_invalid_opt = None;
675 }
676 };
677 }
678 }
679
680 /// Calculates filtered values for water temperature, pH, and conductivity, then outputs them to files.
681 ///
682 /// This private helper function takes raw (unfiltered) sensor readings. For each valid reading,
683 /// it applies its corresponding `IIRFilter` to smooth the data. The resulting filtered values
684 /// are then stored in new `Option<f32>` variables and written to their designated files on disk.
685 /// If a raw reading is `None`, no filtering or output occurs for that signal.
686 ///
687 /// # Arguments
688 /// * `water_temperature` - The measured water temperature, wrapped in an `Option`.
689 /// * `ph` - The measured pH value, wrapped in an `Option`.
690 /// * `conductivity` - The measured conductivity value, wrapped in an `Option`.
691 ///
692 /// # Returns
693 /// A tuple `(Option<f32>, Option<f32>, Option<f32>)` containing the filtered
694 /// water temperature, pH, and conductivity values, respectively. Each value is
695 /// `Some` if the corresponding input was valid and filtered, or `None` otherwise.
696 fn calculate_filtered_signals(
697 &mut self,
698 water_temperature: Option<f32>,
699 ph: Option<f32>,
700 conductivity: Option<f32>,
701 filter_water_temperature: &mut IIRFilter,
702 filter_pH: &mut IIRFilter,
703 filter_conductivity: &mut IIRFilter,
704 ) -> (Option<f32>, Option<f32>, Option<f32>) {
705 let water_temperature_filtered = match water_temperature {
706 Some(water_temperature_data) => {
707 let filtered_water_temperature =
708 filter_water_temperature.get_filtered_value(water_temperature_data);
709 self.lock_error_output_signal_water_temperature_filtered = self.output_signal(
710 filtered_water_temperature,
711 AquariumSignal::WaterTemperature,
712 &self.config.output_file_name_water_temperature_filtered,
713 self.lock_error_output_signal_water_temperature_filtered,
714 );
715 Some(filtered_water_temperature)
716 }
717 None => None,
718 };
719
720 let pH_filtered = match ph {
721 Some(ph_data) => {
722 let filtered_pH = filter_pH.get_filtered_value(ph_data);
723 self.lock_error_output_signal_pH_filtered = self.output_signal(
724 filtered_pH,
725 AquariumSignal::pH,
726 &self.config.output_file_name_pH_filtered,
727 self.lock_error_output_signal_pH_filtered,
728 );
729 Some(filtered_pH)
730 }
731 None => None,
732 };
733
734 let conductivity_filtered = match conductivity {
735 Some(conductivity_data) => {
736 let filtered_conductivity =
737 filter_conductivity.get_filtered_value(conductivity_data);
738 self.lock_error_output_signal_conductivity_filtered = self.output_signal(
739 filtered_conductivity,
740 AquariumSignal::Conductivity,
741 &self.config.output_file_name_conductivity_filtered,
742 self.lock_error_output_signal_conductivity_filtered,
743 );
744 Some(filtered_conductivity)
745 }
746 None => None,
747 };
748
749 (
750 water_temperature_filtered,
751 pH_filtered,
752 conductivity_filtered,
753 )
754 }
755
756 /// Retrieves the current timestamp, either from the SQL database or generated locally by the application.
757 ///
758 /// This private helper function determines the timestamp source based on the `use_sql_timestamp`
759 /// configuration setting. If `use_sql_timestamp` is `true`, it attempts to fetch the timestamp
760 /// from the connected SQL database. If that fails, it falls back to using a locally generated
761 /// timestamp and logs an error. If `use_sql_timestamp` is `false`, it always uses a local timestamp.
762 /// The chosen timestamp is also written to a configured file on the disk.
763 ///
764 /// # Arguments
765 /// * `conn` - A mutable reference to an active SQL database connection, used if the SQL timestamp option is enabled.
766 ///
767 /// # Returns
768 /// A `NaiveDateTime` representing the current timestamp, sourced either from the database or the local system.
769 fn update_timestamp(&mut self, conn: &mut PooledConn) -> NaiveDateTime {
770 match self.config.use_sql_timestamp {
771 true => match SqlInterface::get_current_timestamp(conn) {
772 Ok(c) => {
773 self.lock_error_get_timestamp = false;
774 self.lock_error_output_timestamp = self.output_timestamp(
775 c,
776 &self.config.output_file_name_timestamp,
777 self.lock_error_output_timestamp,
778 );
779 c
780 }
781 Err(e) => {
782 if !self.lock_error_get_timestamp {
783 error!(
784 target: module_path!(),
785 "could not retrieve timestamp from database: ({e:?}) - using application-generated timestamp."
786 );
787 self.lock_error_get_timestamp = true;
788 }
789 Local::now().naive_local()
790 }
791 },
792 false => Local::now().naive_local(),
793 }
794 }
795
796 /// Confirms the application's readiness to quit and then waits for a specific termination command from the signal handler.
797 ///
798 /// This function is called when the thread has received the Quit-command from the signal handler.
799 /// It first sends a confirmation back to the signal handler,
800 /// indicating that this component is ready to proceed with shutdown.
801 ///
802 /// After confirming, it enters a loop, continuously listening for an `InternalCommand::Terminate`
803 /// signal from the `signal_handler`. This ensures that the application's internal threads,
804 /// which might be listening to this component's channels, stop receiving commands.
805 ///
806 /// # Arguments
807 ///
808 /// * `data_logger_channels` - A mutable reference to the struct that contains the channels.
809 fn confirm_quit_and_wait_for_termination_command(
810 &mut self,
811 data_logger_channels: &mut DataLoggerChannels,
812 ) {
813 let sleep_duration_hundred_millis = Duration::from_millis(100);
814
815 data_logger_channels.acknowledge_signal_handler();
816
817 // This thread has channel connections to underlying threads.
818 // Those threads have to stop receiving commands from this thread.
819 // The shutdown sequence is handled by the signal_handler module.
820 self.wait_for_termination(
821 &mut data_logger_channels.rx_data_logger_from_signal_handler,
822 sleep_duration_hundred_millis,
823 module_path!(),
824 );
825 }
826
827 /// Executes the main control loop for the data logger module.
828 ///
829 /// This function runs continuously, acting as the central data collection and logging hub for
830 /// the application.
831 /// Before entering the loop, the function will set up Infinite Impulse Response (IIR) filters
832 /// for various sensor signals.
833 /// The IIR filters for water temperature, pH, and conductivity are initialized
834 /// with available data and a 1-second time step.
835 ///
836 /// It performs the following key tasks in a recurring cycle:
837 /// 1. **Signal Retrieval**: Gathers current status (ON/OFF) from control modules (Refill, Heating, Ventilation) via channels.
838 /// 2. **Sensor Data Acquisition**: Reads raw sensor values (temperatures, pH, conductivity, ambient conditions, tank level switch states) from shared mutexes, which are updated by other sensor-reading threads.
839 /// 3. **Signal Filtering**: Applies IIR (Infinite Impulse Response) filters to sensor signals (e.g., water temperature, pH, conductivity) to produce smoothed values.
840 /// 4. **Timestamping**: Determines the current timestamp, either from the SQL database or generated locally, based on configuration.
841 /// 5. **DataFrame Construction**: Assembles all collected and processed data into a comprehensive `RecorderDataFrame`.
842 /// 6. **Data Injection & Disk Output**: Transfers the `RecorderDataFrame` to the SQL database (via the `data_injection` trait) and writes various individual signal/state values to designated files on a RAM disk (if configured).
843 /// 7. **Cycle Management**: Calculates the execution time of the current cycle and idles for the remaining duration to meet the configured `logging_interval`, ensuring precise data logging frequency. It warns if the cycle time is exceeded.
844 /// 8. **Responsiveness**: Remains responsive to `Quit` commands from the signal handler during idle periods to enable graceful shutdown.
845 ///
846 /// The loop continues indefinitely until a `Quit` command is received from the signal handler.
847 /// Upon receiving `Quit`, it performs a graceful exit sequence: sending confirmation back
848 /// to the signal handler and then waiting for a `Terminate` command before finally exiting the thread.
849 ///
850 /// # Arguments
851 /// * `data_injection` - An object implementing the `DataInjectionTrait`, used for transferring the collected `RecorderDataFrame` to the SQL database or a mock data structure during testing.
852 /// * `sql_interface_data` - A `SqlInterfaceData` instance, providing the direct interface for writing `RecorderDataFrame`s to the SQL database.
853 /// * `data_logger_channels` - A `DataLoggerChannels` struct, containing all the `mpsc` sender and receiver channels necessary for inter-thread communication with other modules and the signal handler.
854 /// * `data_logger_mutexes` - A `DataLoggerMutexes` struct, providing access to shared `Arc<Mutex>` protected sensor data and device states.
855 pub fn execute(
856 &mut self,
857 data_injection: &mut impl DataInjectionTrait,
858 mut sql_interface_data: SqlInterfaceData,
859 data_logger_channels: &mut DataLoggerChannels,
860 data_logger_mutexes: DataLoggerMutexes,
861 ) {
862 #[cfg(all(target_os = "linux", not(test)))]
863 info!(target: module_path!(), "Thread started with TID: {}", gettid());
864
865 let cycle_time_duration =
866 Duration::from_millis(self.config.logging_interval * MILLIS_PER_SEC);
867 let sleep_duration_hundred_millis = Duration::from_millis(100);
868 let spin_sleeper = SpinSleeper::default();
869 let mut i;
870
871 // define and initialize with values which will be overwritten in the first iteration
872 let mut water_temperature: Option<f32> = Some(10.0);
873 let mut ph: Option<f32> = Some(0.0);
874 let mut conductivity: Option<f32> = Some(0.0);
875 let mut ambient_temperature: Option<f32> = Some(0.0);
876 let mut ambient_humidity: Option<f32> = Some(0.0);
877 let mut tank_level_switch_invalid: Option<bool> = Some(false);
878 let mut tank_level_switch_position: Option<bool> = Some(true);
879 let mut tank_level_switch_position_stabilized: Option<bool> = Some(true);
880 let mut heater_is_on: bool = false;
881 let mut ventilation_is_on: bool = false;
882 let mut refill_in_progress_live: bool = false;
883 let mut refill_in_progress_for_database: bool = false;
884 let mut quit_command_received = false;
885 let mut lock_warn_cycle_time_exceeded = false;
886 let mut cycle_time_exceeded: bool; // ensures that signal handler/messaging are polled at least once even if cycle time is exceeded
887
888 // update signals by accessing mutex for Atlas Scientific signals
889 self.update_sensor_manager_signals(
890 data_logger_mutexes.mutex_sensor_manager_signals.clone(),
891 &mut water_temperature,
892 &mut ph,
893 &mut conductivity,
894 &mut ambient_temperature,
895 &mut ambient_humidity,
896 );
897
898 let mut filter_water_temperature = IIRFilter::new_requires_initial(
899 &self.config.filter_coefficient_water_temperature,
900 water_temperature.unwrap_or(25.0),
901 1.0, // 1 second time step
902 );
903 let mut filter_pH = IIRFilter::new_requires_initial(
904 &self.config.filter_coefficient_pH,
905 ph.unwrap_or(7.0),
906 1.0, // 1 second time step
907 );
908 let mut filter_conductivity = IIRFilter::new_requires_initial(
909 &self.config.filter_coefficient_conductivity,
910 conductivity.unwrap_or(50_000.0),
911 1.0, // 1 second time step
912 );
913
914 let initial_sleep_time_start = Instant::now();
915 while Instant::now()
916 .duration_since(initial_sleep_time_start)
917 .as_millis()
918 < self.config.initial_sleep_time_millis.into()
919 {
920 spin_sleeper.sleep(sleep_duration_hundred_millis);
921 (quit_command_received, _, _) = self.process_external_request(
922 &mut data_logger_channels.rx_data_logger_from_signal_handler,
923 None,
924 );
925 if quit_command_received {
926 self.confirm_quit_and_wait_for_termination_command(data_logger_channels);
927 return;
928 }
929 }
930
931 // Initialize reference for cycle time measurement
932 let mut start_time = Instant::now();
933
934 loop {
935 if self.config.active {
936 self.update_signals(
937 &mut refill_in_progress_live,
938 &mut refill_in_progress_for_database,
939 true,
940 &mut heater_is_on,
941 &mut ventilation_is_on,
942 &data_logger_mutexes,
943 );
944
945 // update signals by accessing mutex for Atlas Scientific signals
946 self.update_sensor_manager_signals(
947 data_logger_mutexes.mutex_sensor_manager_signals.clone(),
948 &mut water_temperature,
949 &mut ph,
950 &mut conductivity,
951 &mut ambient_temperature,
952 &mut ambient_humidity,
953 );
954
955 // update signals by accessing mutex for tank level switch signals
956 self.update_tank_level_switch_signals(
957 &data_logger_mutexes.mutex_tank_level_switch_signals,
958 &mut tank_level_switch_position,
959 &mut tank_level_switch_position_stabilized,
960 &mut tank_level_switch_invalid,
961 );
962
963 let (water_temperature_filtered, pH_filtered, conductivity_filtered) = self
964 .calculate_filtered_signals(
965 water_temperature,
966 ph,
967 conductivity,
968 &mut filter_water_temperature,
969 &mut filter_pH,
970 &mut filter_conductivity,
971 );
972
973 let current_timestamp = self.update_timestamp(&mut sql_interface_data.conn);
974
975 // all data received, let's construct a data frame
976 let data_frame = RecorderDataFrame {
977 timestamp: current_timestamp,
978 water_temperature,
979 water_temperature_filtered,
980 ph,
981 ph_filtered: pH_filtered,
982 conductivity,
983 conductivity_filtered,
984 conductivity_compensated: None,
985 refill_in_progress: Some(refill_in_progress_for_database),
986 tank_level_switch_position,
987 tank_level_switch_position_stabilized,
988 tank_level_switch_invalid,
989 surface_ventilation_status: Some(ventilation_is_on),
990 ambient_temperature,
991 ambient_humidity,
992 heater_status: Some(heater_is_on),
993 };
994
995 data_injection.inject_data(data_frame, &mut sql_interface_data);
996 }
997 let stop_time = Instant::now();
998 let execution_duration = stop_time.duration_since(start_time);
999
1000 // deduct the execution time of the thread from the target cycle time
1001 let remaining_sleep_time_millis: u64;
1002 if execution_duration > cycle_time_duration {
1003 if !lock_warn_cycle_time_exceeded {
1004 warn!(target: module_path!(),
1005 "execution duration of {} milliseconds exceeds cycle time of {}",
1006 execution_duration.as_millis(),
1007 cycle_time_duration.as_millis());
1008 lock_warn_cycle_time_exceeded = true;
1009 }
1010 remaining_sleep_time_millis = 0;
1011 cycle_time_exceeded = true;
1012 } else {
1013 // underflow cannot happen because the else statement requires that the first operand is higher than the second
1014 lock_warn_cycle_time_exceeded = false;
1015 let remaining_sleep_time_duration = cycle_time_duration - execution_duration;
1016 remaining_sleep_time_millis = remaining_sleep_time_duration.as_millis() as u64;
1017 cycle_time_exceeded = false;
1018 }
1019
1020 #[cfg(feature = "debug_data_logger")]
1021 debug!(
1022 target: module_path!(),
1023 "inserted data into database, now idling for {} milliseconds",
1024 remaining_sleep_time_millis
1025 );
1026
1027 // wait the remaining time period to complete the target cycle time
1028 i = 0;
1029 while i < remaining_sleep_time_millis || cycle_time_exceeded {
1030 i += 100;
1031 spin_sleeper.sleep(sleep_duration_hundred_millis);
1032 (quit_command_received, _, _) = self.process_external_request(
1033 &mut data_logger_channels.rx_data_logger_from_signal_handler,
1034 None,
1035 );
1036 if quit_command_received {
1037 #[cfg(feature = "debug_data_logger")]
1038 debug!(
1039 target: module_path!(),
1040 "received QUIT command from signal handler"
1041 );
1042 break;
1043 }
1044
1045 if self.config.active {
1046 // once per second, retrieve signals and write them to the RAM disk
1047 if i % 1000 == 0 {
1048 self.update_signals(
1049 &mut refill_in_progress_live,
1050 &mut refill_in_progress_for_database,
1051 false,
1052 &mut heater_is_on,
1053 &mut ventilation_is_on,
1054 &data_logger_mutexes,
1055 );
1056
1057 // update signals by accessing mutex for Atlas Scientific signals
1058 self.update_sensor_manager_signals(
1059 data_logger_mutexes.mutex_sensor_manager_signals.clone(),
1060 &mut water_temperature,
1061 &mut ph,
1062 &mut conductivity,
1063 &mut ambient_temperature,
1064 &mut ambient_humidity,
1065 );
1066
1067 // update signals by accessing mutex for tank level switch signals
1068 self.update_tank_level_switch_signals(
1069 &data_logger_mutexes.mutex_tank_level_switch_signals,
1070 &mut tank_level_switch_position,
1071 &mut tank_level_switch_position_stabilized,
1072 &mut tank_level_switch_invalid,
1073 );
1074 }
1075 }
1076 cycle_time_exceeded = false;
1077 }
1078 if quit_command_received {
1079 break;
1080 }
1081
1082 #[cfg(feature = "debug_data_logger")]
1083 debug!(
1084 target: module_path!(),
1085 "finished idling restarting loop"
1086 );
1087
1088 self.check_timing_and_ping_database(&mut sql_interface_data);
1089
1090 start_time = Instant::now(); // start time for the next cycle
1091 }
1092
1093 self.confirm_quit_and_wait_for_termination_command(data_logger_channels);
1094 }
1095}
1096
1097#[cfg(test)]
1098pub mod tests {
1099 use crate::database::{sql_interface::SqlInterface, sql_interface_data::SqlInterfaceData};
1100 use crate::launch::channels::{channel, Channels};
1101 use crate::mocks::mock_data_injection::tests::MockDataInjection;
1102 use crate::mocks::mock_heating::tests::mock_heating;
1103 use crate::mocks::mock_signal::tests::MockSignal;
1104 use crate::mocks::mock_ventilation::tests::mock_ventilation;
1105 use crate::mocks::test_command::tests::TestCommand;
1106 use crate::recorder::data_logger::{DataLogger, DataLoggerConfig, DataLoggerMutexes};
1107 use crate::sensors::sensor_manager::SensorManagerSignals;
1108 use crate::sensors::tank_level_switch::TankLevelSwitchSignals;
1109 use crate::utilities::channel_content::InternalCommand;
1110 use crate::utilities::config::{read_config_file_with_test_database, ConfigData};
1111 use crate::water::refill::RefillStatus;
1112 use spin_sleep::SpinSleeper;
1113 use std::sync::{Arc, Mutex};
1114 use std::{fs, thread, time::Duration};
1115
1116 #[cfg(test)]
1117 // Asserts the content of all data logger output files on disk.
1118 //
1119 // This comprehensive helper function is used in test environments to verify that
1120 // the `DataLogger` correctly writes various sensor signals and device states
1121 // to their respective configured files. It reads each file, parses its content
1122 // (for floating-point numbers) or compares it directly (for boolean states),
1123 // and asserts that it matches the expected target values.
1124 //
1125 // Arguments:
1126 // * `data_logger_config`: The `DataLoggerConfig` struct containing the names of all output files.
1127 // * `target_water_temperature`: The expected `f32` value for water temperature.
1128 // * `target_water_temperature_filtered`: The expected `f32` value for filtered water temperature.
1129 // * `target_water_ph`: The expected `f32` value for pH.
1130 // * `target_water_ph_filtered`: The expected `f32` value for filtered pH.
1131 // * `target_water_conductivity`: The expected `f32` value for conductivity.
1132 // * `target_water_conductivity_filtered`: The expected `f32` value for filtered conductivity.
1133 // * `_target_water_conductivity_compensated`: (Ignored) Placeholder for a compensated conductivity value.
1134 // * `target_refill_in_progress`: The expected `bool` state for refill in progress.
1135 // * `target_tank_level_switch_position`: The expected `bool` state for tank level switch position.
1136 // * `target_tank_level_switch_invalid`: The expected `bool` state for tank level switch invalid status.
1137 // * `target_tank_level_switch_position_stabilized`: The expected `bool` state for stabilized tank level switch position.
1138 // * `target_surface_ventilation_status`: The expected `bool` state for surface ventilation status.
1139 // * `target_ambient_temperature`: The expected `f32` value for ambient temperature.
1140 // * `target_ambient_humidity`: The expected `f32` value for ambient humidity.
1141 // * `target_heater_status`: The expected `bool` state for heater status.
1142 // * `target_water_temperature_ds18b20`: The expected `f32` value for DS18B20 water temperature.
1143 // * `target_ambient_temperature_ds18b20`: The expected `f32` value for DS18B20 ambient temperature.
1144 //
1145 // Panics:
1146 // This function will panic if:
1147 // - It fails to read any of the specified output files.
1148 // - It fails to parse a file's content into the expected numeric type.
1149 // - The content of any file does not match its corresponding target value.
1150 fn assert_output_file_contents(
1151 data_logger_config: &DataLoggerConfig,
1152 target_water_temperature: f32,
1153 target_water_temperature_filtered: f32,
1154 target_water_ph: f32,
1155 target_water_ph_filtered: f32,
1156 target_water_conductivity: f32,
1157 target_water_conductivity_filtered: f32,
1158 _target_water_conductivity_compensated: f32,
1159 target_refill_in_progress: bool,
1160 target_tank_level_switch_position: bool,
1161 target_tank_level_switch_invalid: bool,
1162 target_tank_level_switch_position_stabilized: bool,
1163 target_surface_ventilation_status: bool,
1164 target_ambient_temperature: f32,
1165 target_ambient_humidity: f32,
1166 target_heater_status: bool,
1167 ) {
1168 // Helper to assert the content of a file containing a f32 value.
1169 let assert_file_f32 = |filename: &str, expected: f32| {
1170 let content = fs::read_to_string(filename)
1171 .unwrap_or_else(|_| panic!("Could not read file: {}", filename));
1172 let value = content
1173 .parse::<f32>()
1174 .unwrap_or_else(|_| panic!("Could not parse f32 from file: {}", filename));
1175 assert_eq!(value, expected, "Mismatch in file: {}", filename);
1176 };
1177
1178 // Helper to assert the content of a file containing a boolean state.
1179 let assert_file_bool =
1180 |filename: &str, expected_bool: bool, true_str: &str, false_str: &str| {
1181 let content = fs::read_to_string(filename)
1182 .unwrap_or_else(|_| panic!("Could not read file: {}", filename));
1183 let expected_str = if expected_bool { true_str } else { false_str };
1184 assert_eq!(content, expected_str, "Mismatch in file: {}", filename);
1185 };
1186
1187 // Now, the assertions are clean, single-line calls.
1188 assert_file_f32(
1189 &data_logger_config.output_file_name_water_temperature,
1190 target_water_temperature,
1191 );
1192 assert_file_f32(
1193 &data_logger_config.output_file_name_water_temperature_filtered,
1194 target_water_temperature_filtered,
1195 );
1196 assert_file_f32(&data_logger_config.output_file_name_pH, target_water_ph);
1197 assert_file_f32(
1198 &data_logger_config.output_file_name_pH_filtered,
1199 target_water_ph_filtered,
1200 );
1201 assert_file_f32(
1202 &data_logger_config.output_file_name_conductivity,
1203 target_water_conductivity,
1204 );
1205 assert_file_f32(
1206 &data_logger_config.output_file_name_conductivity_filtered,
1207 target_water_conductivity_filtered,
1208 );
1209 assert_file_f32(
1210 &data_logger_config.output_file_name_ambient_temperature,
1211 target_ambient_temperature,
1212 );
1213 assert_file_f32(
1214 &data_logger_config.output_file_name_humidity,
1215 target_ambient_humidity,
1216 );
1217
1218 assert_file_bool(
1219 &data_logger_config.output_file_name_refill_in_progress,
1220 target_refill_in_progress,
1221 "ON",
1222 "OFF",
1223 );
1224 assert_file_bool(
1225 &data_logger_config.output_file_name_tank_level_switch_position,
1226 target_tank_level_switch_position,
1227 "HIGH",
1228 "LOW",
1229 );
1230 assert_file_bool(
1231 &data_logger_config.output_file_name_tank_level_switch_invalid,
1232 target_tank_level_switch_invalid,
1233 "INVALID",
1234 "VALID",
1235 );
1236 assert_file_bool(
1237 &data_logger_config.output_file_name_tank_level_switch_position_stabilized,
1238 target_tank_level_switch_position_stabilized,
1239 "HIGH",
1240 "LOW",
1241 );
1242 assert_file_bool(
1243 &data_logger_config.output_file_name_ventilation_is_on,
1244 target_surface_ventilation_status,
1245 "ON",
1246 "OFF",
1247 );
1248 assert_file_bool(
1249 &data_logger_config.output_file_name_heating_is_on,
1250 target_heater_status,
1251 "ON",
1252 "OFF",
1253 );
1254 }
1255
1256 #[test]
1257 // test case checks if DataLogger correctly processes all information:
1258 // - storage in SQL database using mock implementation
1259 // - writing output to the filesystem
1260 // Test case uses test database #45.
1261 pub fn test_data_log() {
1262 let mut channels = Channels::new_for_test();
1263
1264 // test environment-specific channels
1265 let (mut tx_test_environment_to_ventilation, mut rx_ventilation_from_test_environment) =
1266 channel(1);
1267 let (mut tx_test_environment_to_heating, mut rx_heating_from_test_environment) = channel(1);
1268
1269 let mut mock_data_injection = MockDataInjection::new();
1270
1271 let mut config: ConfigData = read_config_file_with_test_database(
1272 "/config/aquarium_control_test_generic.toml".to_string(),
1273 45,
1274 );
1275 config.data_logger.logging_interval = 1;
1276
1277 let mut config2: ConfigData = read_config_file_with_test_database(
1278 "/config/aquarium_control_test_generic.toml".to_string(),
1279 45,
1280 );
1281 config2.data_logger.logging_interval = 1;
1282
1283 let max_rows_data = config.sql_interface.max_rows_data;
1284
1285 let sql_interface = match SqlInterface::new(config.sql_interface.clone()) {
1286 Ok(c) => c,
1287 Err(e) => {
1288 panic!("Could not connect to SQL database: {e:?}");
1289 }
1290 };
1291 let sql_interface_data =
1292 SqlInterfaceData::new(sql_interface.get_connection().unwrap(), max_rows_data).unwrap();
1293 let mut data_logger =
1294 DataLogger::new(config.data_logger, Duration::from_millis(1000)).unwrap();
1295
1296 // replacement values are used to initialize the mutex content
1297 config.sensor_manager.replacement_value_water_temperature = 10.0;
1298 config.sensor_manager.replacement_value_ph = 11.0;
1299 config.sensor_manager.replacement_value_conductivity = 12.0;
1300
1301 let mutex_ds18b20_water_temperature = Arc::new(Mutex::new(15.0));
1302 let mutex_ds18b20_ambient_temperature = Arc::new(Mutex::new(16.0));
1303 let mutex_tank_level_switch_signals =
1304 Arc::new(Mutex::new(TankLevelSwitchSignals::new(false, false, false)));
1305 let mutex_tank_level_switch_signals_clone = mutex_tank_level_switch_signals.clone();
1306 let mutex_heating_status = Arc::new(Mutex::new(false));
1307 let mutex_heating_status_clone_for_data_logger = mutex_heating_status.clone();
1308 let mutex_ventilation_status = Arc::new(Mutex::new(false));
1309 let mutex_ventilation_status_clone_for_data_logger = mutex_ventilation_status.clone();
1310 let refill_status = RefillStatus {
1311 refill_in_progress_live: false,
1312 refill_in_progress_for_database: false,
1313 };
1314 let mutex_refill_status = Arc::new(Mutex::new(refill_status));
1315 let mutex_refill_status_clone_for_data_logger = mutex_refill_status.clone();
1316 let mutex_refill_status_clone_for_test_environment = mutex_refill_status.clone();
1317 config.sensor_manager.replacement_value_ambient_temperature = 13.0;
1318 config.sensor_manager.replacement_value_ambient_humidity = 14.0;
1319 let mutex_sensor_manager_signals = Arc::new(Mutex::new(SensorManagerSignals::new(
1320 &config.sensor_manager,
1321 )));
1322 let mutex_sensor_manager_signals_clone_for_data_logger =
1323 mutex_sensor_manager_signals.clone();
1324 // thread for test environment and mock signal handler
1325 let join_handle_test_environment = thread::Builder::new()
1326 .name("test_environment".to_string())
1327 .spawn(move || {
1328 let spin_sleeper = SpinSleeper::default();
1329 let sleep_duration_1_second = Duration::from_millis(1000);
1330 let sleep_duration_100_millis = Duration::from_millis(100);
1331 spin_sleeper.sleep(sleep_duration_1_second);
1332 spin_sleeper.sleep(sleep_duration_100_millis);
1333
1334 let _ = tx_test_environment_to_ventilation.send(TestCommand::UpdateSignal(
1335 MockSignal::VentilationControlStatus(true),
1336 ));
1337 {
1338 let mut sensor_manager_signals = mutex_sensor_manager_signals.lock().unwrap();
1339 sensor_manager_signals.water_temperature = 29.0;
1340 }
1341
1342 spin_sleeper.sleep(sleep_duration_1_second);
1343
1344 let _ = tx_test_environment_to_heating.send(TestCommand::UpdateSignal(
1345 MockSignal::HeatingControlStatus(true),
1346 ));
1347
1348 {
1349 let mut sensor_manager_signals = mutex_sensor_manager_signals.lock().unwrap();
1350 sensor_manager_signals.ph = 7.0;
1351 }
1352
1353 spin_sleeper.sleep(sleep_duration_1_second);
1354
1355 {
1356 let mut sensor_manager_signals = mutex_sensor_manager_signals.lock().unwrap();
1357 sensor_manager_signals.conductivity = 50000.0;
1358 }
1359
1360 spin_sleeper.sleep(sleep_duration_1_second);
1361
1362 {
1363 let mut tank_level_switch_signals =
1364 mutex_tank_level_switch_signals.lock().unwrap();
1365 tank_level_switch_signals.tank_level_switch_invalid = true;
1366 }
1367
1368 {
1369 let mut sensor_manager_signals = mutex_sensor_manager_signals.lock().unwrap();
1370 sensor_manager_signals.ambient_temperature = 35.0;
1371 }
1372
1373 {
1374 let mut refill_status = mutex_refill_status_clone_for_test_environment
1375 .lock()
1376 .unwrap();
1377 refill_status.refill_in_progress_for_database = true;
1378 }
1379
1380 spin_sleeper.sleep(sleep_duration_1_second);
1381
1382 {
1383 let mut tank_level_switch_signals =
1384 mutex_tank_level_switch_signals.lock().unwrap();
1385 tank_level_switch_signals.tank_level_switch_position = true;
1386 }
1387
1388 {
1389 let mut sensor_manager_signals = mutex_sensor_manager_signals.lock().unwrap();
1390 sensor_manager_signals.ambient_humidity = 66.0;
1391 }
1392
1393 spin_sleeper.sleep(sleep_duration_1_second);
1394
1395 {
1396 let mut water_temperature_ds18b20 =
1397 mutex_ds18b20_water_temperature.lock().unwrap();
1398 *water_temperature_ds18b20 = 15.5;
1399 let mut ambient_temperature_ds18b20 =
1400 mutex_ds18b20_ambient_temperature.lock().unwrap();
1401 *ambient_temperature_ds18b20 = 16.5;
1402 }
1403
1404 {
1405 let mut tank_level_switch_signals =
1406 mutex_tank_level_switch_signals.lock().unwrap();
1407 tank_level_switch_signals.tank_level_switch_position_stabilized = true;
1408 }
1409
1410 spin_sleeper.sleep(sleep_duration_1_second);
1411
1412 // cease operation of the test object
1413 let _ = channels
1414 .signal_handler
1415 .send_to_data_logger(InternalCommand::Quit);
1416 channels.signal_handler.receive_from_data_logger().unwrap();
1417
1418 // signal end of test case execution to mock threads
1419 let _ = channels
1420 .signal_handler
1421 .send_to_heating(InternalCommand::Quit);
1422 let _ = channels
1423 .signal_handler
1424 .send_to_ventilation(InternalCommand::Quit);
1425
1426 spin_sleeper.sleep(sleep_duration_1_second);
1427 let _ = channels
1428 .signal_handler
1429 .send_to_data_logger(InternalCommand::Terminate);
1430 })
1431 .unwrap();
1432
1433 // thread for mock heating
1434 let join_handle_mock_heating = thread::Builder::new()
1435 .name("mock_heating".to_string())
1436 .spawn(move || {
1437 mock_heating(
1438 None,
1439 None,
1440 &mut channels.heating.rx_heating_from_signal_handler,
1441 Some(&mut rx_heating_from_test_environment),
1442 mutex_heating_status, // initial heating control status
1443 );
1444 })
1445 .unwrap();
1446
1447 // thread for mock ventilation
1448 let join_handle_mock_ventilation = thread::Builder::new()
1449 .name("mock_ventilation".to_string())
1450 .spawn(move || {
1451 mock_ventilation(
1452 &mut channels.ventilation.rx_ventilation_from_signal_handler,
1453 Some(&mut rx_ventilation_from_test_environment),
1454 mutex_ventilation_status, // initial ventilation control status
1455 );
1456 })
1457 .unwrap();
1458
1459 // give mock threads some time to establish
1460 let spin_sleeper = SpinSleeper::default();
1461 let sleep_duration_100_millis = Duration::from_millis(100);
1462 spin_sleeper.sleep(sleep_duration_100_millis);
1463
1464 // thread for the test object and asserts
1465 let join_handle_test_object = thread::Builder::new()
1466 .name("test_object".to_string())
1467 .spawn(move || {
1468 let data_logger_mutexes = DataLoggerMutexes {
1469 mutex_sensor_manager_signals:
1470 mutex_sensor_manager_signals_clone_for_data_logger,
1471 mutex_tank_level_switch_signals: mutex_tank_level_switch_signals_clone,
1472 mutex_heating_status: mutex_heating_status_clone_for_data_logger,
1473 mutex_ventilation_status: mutex_ventilation_status_clone_for_data_logger,
1474 mutex_refill_status: mutex_refill_status_clone_for_data_logger,
1475 };
1476
1477 data_logger.execute(
1478 &mut mock_data_injection,
1479 sql_interface_data,
1480 &mut channels.data_logger,
1481 data_logger_mutexes,
1482 );
1483
1484 println!("Printing recorded data:");
1485 mock_data_injection.output();
1486
1487 // Assert if data logger has set the refill-in-progress bit to false.
1488 {
1489 let refill_status = mutex_refill_status.lock().unwrap();
1490 println!(
1491 "Checking if data logger has set the refill-in-progress bit to false..."
1492 );
1493 assert_eq!(refill_status.refill_in_progress_for_database, false);
1494 }
1495
1496 // Dataframe #5
1497 let mut data_frame = mock_data_injection.data_frames.pop().unwrap();
1498 println!("Asserting data frame #5");
1499 data_frame.assert(
1500 Some(29.0), // target_water_temperature
1501 Some(28.40625), // target_water_temperature_filtered
1502 Some(7.0), // target_water_ph
1503 Some(7.125), // target_water_ph_filtered
1504 Some(50_000.0), // target_water_conductivity
1505 Some(46875.75), // target_water_conductivity_filtered
1506 None, // target_water_conductivity_compensated
1507 Some(false), // target_refill_in_progress
1508 Some(true), // target_tank_level_switch_position
1509 Some(true), // target_tank_level_switch_position_stabilized
1510 Some(true), // target_tank_level_switch_invalid
1511 Some(true), // target_surface_ventilation_status
1512 Some(35.0), // target_ambient_temperature
1513 Some(66.0), // target_ambient_humidity
1514 Some(true), // target_heater_status
1515 );
1516
1517 // Dataframe #4
1518 data_frame = mock_data_injection.data_frames.pop().unwrap();
1519 println!("Asserting data frame #4");
1520 data_frame.assert(
1521 Some(29.0), // target_water_temperature
1522 Some(27.8125), // target_water_temperature_filtered
1523 Some(7.0), // target_water_ph
1524 Some(7.25), // target_water_ph_filtered
1525 Some(50_000.0), // target_water_conductivity
1526 Some(43751.5), // target_water_conductivity_filtered
1527 None, // target_water_conductivity_compensated
1528 Some(false), // target_refill_in_progress
1529 Some(true), // target_tank_level_switch_position
1530 Some(false), // target_tank_level_switch_position_stabilized
1531 Some(true), // target_tank_level_switch_invalid
1532 Some(true), // target_surface_ventilation_status
1533 Some(35.0), // target_ambient_temperature
1534 Some(66.0), // target_ambient_humidity
1535 Some(true), // target_heater_status
1536 );
1537
1538 // Dataframe #3
1539 data_frame = mock_data_injection.data_frames.pop().unwrap();
1540 println!("Asserting data frame #3");
1541 data_frame.assert(
1542 Some(29.0), // target_water_temperature
1543 Some(26.625), // target_water_temperature_filtered
1544 Some(7.0), // target_water_ph
1545 Some(7.5), // target_water_ph_filtered
1546 Some(50_000.0), // target_water_conductivity
1547 Some(37503.0), // target_water_conductivity_filtered
1548 None, // target_water_conductivity_compensated
1549 Some(true), // target_refill_in_progress
1550 Some(false), // target_tank_level_switch_position
1551 Some(false), // target_tank_level_switch_position_stabilized
1552 Some(true), // target_tank_level_switch_invalid
1553 Some(true), // target_surface_ventilation_status
1554 Some(35.0), // target_ambient_temperature
1555 Some(14.0), // target_ambient_humidity
1556 Some(true), // target_heater_status
1557 );
1558
1559 // Dataframe #2
1560 data_frame = mock_data_injection.data_frames.pop().unwrap();
1561 println!("Asserting data frame #2");
1562 data_frame.assert(
1563 Some(29.0), // target_water_temperature
1564 Some(24.25), // target_water_temperature_filtered
1565 Some(7.0), // target_water_ph
1566 Some(8.0), // target_water_ph_filtered
1567 Some(50_000.0), // target_water_conductivity
1568 Some(25006.0), // target_water_conductivity_filtered
1569 None, // target_water_conductivity_compensated
1570 Some(false), // target_refill_in_progress
1571 Some(false), // target_tank_level_switch_position
1572 Some(false), // target_tank_level_switch_position_stabilized
1573 Some(false), // target_tank_level_switch_invalid
1574 Some(true), // target_surface_ventilation_status
1575 Some(13.0), // target_ambient_temperature
1576 Some(14.0), // target_ambient_humidity
1577 Some(true), // target_heater_status
1578 );
1579
1580 // Dataframe #1
1581 data_frame = mock_data_injection.data_frames.pop().unwrap();
1582 println!("Asserting data frame #1");
1583 data_frame.assert(
1584 Some(29.0), // target_water_temperature
1585 Some(19.5), // target_water_temperature_filtered
1586 Some(7.0), // target_water_ph
1587 Some(9.0), // target_water_ph_filtered
1588 Some(12.0), // target_water_conductivity
1589 Some(12.0), // target_water_conductivity_filtered
1590 None, // target_water_conductivity_compensated
1591 Some(false), // target_refill_in_progress
1592 Some(false), // target_tank_level_switch_position
1593 Some(false), // target_tank_level_switch_position_stabilized
1594 Some(false), // target_tank_level_switch_invalid
1595 Some(true), // target_surface_ventilation_status
1596 Some(13.0), // target_ambient_temperature
1597 Some(14.0), // target_ambient_humidity
1598 Some(true), // target_heater_status
1599 );
1600
1601 assert!(mock_data_injection.data_frames.is_empty());
1602
1603 assert_output_file_contents(
1604 &config2.data_logger, // data_logger_config,
1605 29.0, // target_water_temperature
1606 28.4, // target_water_temperature_filtered
1607 7.0, // target_water_ph
1608 7.12, // target_water_ph_filtered
1609 50_000.0, // target_water_conductivity
1610 46876.0, // target_water_conductivity_filtered
1611 0.0, // target_water_conductivity_compensated
1612 false, // target_refill_in_progress
1613 true, // target_tank_level_switch_position
1614 true, // target_tank_level_switch_invalid
1615 true, // target_tank_level_switch_position_stabilized
1616 true, // target_surface_ventilation_status
1617 35.0, // target_ambient_temperature
1618 66.0, // target_ambient_humidity
1619 true, // target_heater_status
1620 );
1621 })
1622 .unwrap();
1623
1624 join_handle_mock_ventilation
1625 .join()
1626 .expect("Mock ventilation thread did not finish.");
1627 join_handle_mock_heating
1628 .join()
1629 .expect("Mock heating thread did not finish.");
1630 join_handle_test_environment
1631 .join()
1632 .expect("Test environment thread did not finish.");
1633 join_handle_test_object
1634 .join()
1635 .expect("Mock data logger thread did not finish.");
1636 }
1637}