aquarium_control/recorder/
data_logger.rs

1/* Copyright 2024 Uwe Martin
2
3Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
4
5The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
6
7THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
8*/
9
10#![allow(non_snake_case)]
11use chrono::{Local, NaiveDateTime};
12#[cfg(all(not(test), target_os = "linux"))]
13use log::info;
14use log::{error, warn};
15
16#[cfg(feature = "debug_data_logger")]
17use log::debug;
18
19#[cfg(all(not(test), target_os = "linux"))]
20use nix::unistd::gettid;
21
22use mysql::PooledConn;
23use spin_sleep::SpinSleeper;
24use std::fs;
25use std::sync::{Arc, Mutex};
26use std::time::{Duration, Instant};
27use thiserror::Error;
28
29use crate::database::sql_interface::SqlInterface;
30use crate::database::sql_interface_data::SqlInterfaceData;
31use crate::recorder::data_injection::DataInjectionTrait;
32use crate::recorder::data_logger_channels::DataLoggerChannels;
33use crate::recorder::data_logger_config::DataLoggerConfig;
34use crate::recorder::recorder_data_frame::RecorderDataFrame;
35use crate::sensors::sensor_manager::SensorManagerSignals;
36use crate::sensors::tank_level_switch::TankLevelSwitchSignals;
37use crate::utilities::acknowledge_signal_handler::AcknowledgeSignalHandlerTrait;
38use crate::utilities::channel_content::AquariumSignal;
39use crate::utilities::database_ping_trait::DatabasePingTrait;
40use crate::utilities::iir_filter::IIRFilter;
41use crate::utilities::proc_ext_req::ProcessExternalRequestTrait;
42use crate::utilities::wait_for_termination::WaitForTerminationTrait;
43use crate::water::refill::RefillStatus;
44
/// Number of milliseconds in one second.
const MILLIS_PER_SEC: u64 = 1000;
46
/// Contains `Arc<Mutex>`-wrapped shared data points accessed by the data logger.
///
/// These mutexes provide thread-safe access to various sensor readings and
/// device states that are collected by the data logger for recording and filtering.
/// The data logger locks each mutex only briefly to copy the values it needs.
pub struct DataLoggerMutexes {
    /// Mutex for sensor manager signals (water temperature, pH, conductivity,
    /// ambient temperature and ambient humidity)
    pub mutex_sensor_manager_signals: Arc<Mutex<SensorManagerSignals>>,

    /// Mutex for tank level switch signals (position, stabilized position and
    /// invalid flag)
    pub mutex_tank_level_switch_signals: Arc<Mutex<TankLevelSwitchSignals>>,

    /// Mutex for heater status (`true` is written out as "ON")
    pub mutex_heating_status: Arc<Mutex<bool>>,

    /// Mutex for ventilation status (`true` is written out as "ON")
    pub mutex_ventilation_status: Arc<Mutex<bool>>,

    /// Mutex for refill status (live flag and the flag latched for the database)
    pub mutex_refill_status: Arc<Mutex<RefillStatus>>,
}
67
68/// Contains the error definition for DataLogger
69#[derive(Error, Debug)]
70pub enum DataLoggerError {
71    #[error("[{0}] One of the output file names for the data logging to filesystem is empty.")]
72    OutputFilenameEmpty(String),
73
74    #[error("Could not reset content of output file for {signal_name} ({output_file_name})")]
75    FileResetFailure {
76        location: String,
77        signal_name: String,
78        output_file_name: String,
79
80        #[source]
81        source: std::io::Error,
82    },
83}
84
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Contains the configuration and the implementation for regular data logging to SQL database
/// and writing to the file (in RAM disk).
/// Thread communication is as follows:
/// ```mermaid
/// graph LR
///     atlas_scientific[AtlasScientific] -.-> data_logger
///     ambient[Ambient] -.-> data_logger
///     tank_level_switch[TankLevelSwitch] --> data_logger
///     refill[Refill control] -.-> data_logger
///     ventilation[Ventilation control] --> data_logger
///     data_logger --> signal_handler[Signal handler]
///     signal_handler[Signal handler] --> data_logger
/// ```
pub struct DataLogger {
    /// configuration data for the data logger
    config: DataLoggerConfig,

    /// an inhibition flag to avoid flooding the log file with repeated messages about failure to create the timestamp
    lock_error_get_timestamp: bool,

    /// an inhibition flag to avoid flooding the log file with repeated messages about failure to write refill in progress to disk
    lock_error_output_state_refill_in_progress: bool,

    /// an inhibition flag to avoid flooding the log file with repeated messages about failure to write heating is on to disk
    lock_error_output_state_heating_is_on: bool,

    /// an inhibition flag to avoid flooding the log file with repeated messages about failure to write ventilation is on to disk
    lock_error_output_state_ventilation_is_on: bool,

    /// an inhibition flag to avoid flooding the log file with repeated messages about failure to write water temperature to disk
    lock_error_output_signal_water_temperature: bool,

    /// an inhibition flag to avoid flooding the log file with repeated messages about failure to write pH to disk
    lock_error_output_signal_pH: bool,

    /// an inhibition flag to avoid flooding the log file with repeated messages about failure to write conductivity to disk
    lock_error_output_signal_conductivity: bool,

    /// an inhibition flag to avoid flooding the log file with repeated messages about failure to write ambient temperature to disk
    lock_error_output_signal_ambient_temperature: bool,

    /// an inhibition flag to avoid flooding the log file with repeated messages about failure to write humidity to disk
    lock_error_output_signal_humidity: bool,

    /// an inhibition flag to avoid flooding the log file with repeated messages about failure to write tank level switch position to disk
    lock_error_output_state_tank_level_switch_position: bool,

    /// an inhibition flag to avoid flooding the log file with repeated messages about failure to write tank level switch invalid to disk
    lock_error_output_state_tank_level_switch_invalid: bool,

    /// an inhibition flag to avoid flooding the log file with repeated messages about failure to write tank level switch position stabilized to disk
    lock_error_output_state_tank_level_switch_position_stabilized: bool,

    /// an inhibition flag to avoid flooding the log file with repeated messages about failure to write temperature filtered to disk
    lock_error_output_signal_water_temperature_filtered: bool,

    /// an inhibition flag to avoid flooding the log file with repeated messages about failure to write pH filtered to disk
    lock_error_output_signal_pH_filtered: bool,

    /// an inhibition flag to avoid flooding the log file with repeated messages about failure to write conductivity filtered to disk
    lock_error_output_signal_conductivity_filtered: bool,

    /// an inhibition flag to avoid flooding the log file with repeated messages about failure to write timestamp to disk
    lock_error_output_timestamp: bool,

    /// an inhibition flag to avoid flooding the log file with repeated messages about having received an inapplicable command
    pub lock_warn_inapplicable_command_signal_handler: bool,

    /// an inhibition flag to avoid flooding the log file with repeated messages about failure to receive termination signal via the channel
    pub lock_error_channel_receive_termination: bool,

    /// recording when the last database ping happened
    pub last_ping_instant: Instant,

    /// database ping interval
    pub database_ping_interval: Duration,

    /// an inhibition flag to avoid flooding the log file with repeated messages about failure to lock the mutex of the sensor manager signals
    lock_error_mutex_signal_manager: bool,
}
164
165impl DataLogger {
166    /// Creates a new `DataLogger` instance.
167    ///
168    /// This constructor initializes the data logger module with its configuration.
169    /// If `output_to_disk` is enabled, it validates that all required output file
170    /// paths are non-empty and then resets each file by writing an empty string to it.
171    /// It also initializes many internal "lock" flags designed to prevent log flooding.
172    ///
173    /// # Arguments
174    /// * `config` - Configuration data for the data logger, defining logging intervals,
175    ///   filter coefficients, output file paths, and other operational parameters.
176    /// * `database_ping_interval` - A `Duration` instance, providing the interval to ping the database.
177    ///
178    /// # Returns
179    /// A `Result` containing a new, initialized `DataLogger` instance on success.
180    ///
181    /// # Errors
182    /// This function will return a `DataLoggerError` if `output_to_disk` is enabled and:
183    /// - Any of the configured output filenames are empty (`OutputFilenameEmpty`).
184    /// - It fails to write to (and thus reset) any of the configured output files,
185    ///   which could be due to permission issues or an invalid path (`FileResetFailure`).
186    pub fn new(
187        config: DataLoggerConfig,
188        database_ping_interval: Duration,
189    ) -> Result<DataLogger, DataLoggerError> {
190        if config.output_to_disk {
191            // By defining the files in a collection, we can use iterators to check and
192            // initialize them, making the code concise, readable, and easy to maintain.
193            let files_to_initialize = [
194                (&config.output_file_name_timestamp, "timestamp"),
195                (
196                    &config.output_file_name_water_temperature,
197                    "water temperature",
198                ),
199                (
200                    &config.output_file_name_water_temperature_filtered,
201                    "filtered water temperature",
202                ),
203                (&config.output_file_name_pH, "pH value"),
204                (&config.output_file_name_pH_filtered, "filtered pH value"),
205                (&config.output_file_name_conductivity, "conductivity"),
206                (
207                    &config.output_file_name_conductivity_filtered,
208                    "filtered conductivity",
209                ),
210                (
211                    &config.output_file_name_tank_level_switch_position,
212                    "tank level switch position",
213                ),
214                (
215                    &config.output_file_name_tank_level_switch_invalid,
216                    "tank level switch invalid",
217                ),
218                (
219                    &config.output_file_name_tank_level_switch_position_stabilized,
220                    "tank level switch stabilized",
221                ),
222                (
223                    &config.output_file_name_ventilation_is_on,
224                    "ventilation status",
225                ),
226                (&config.output_file_name_heating_is_on, "heating status"),
227                (
228                    &config.output_file_name_ambient_temperature,
229                    "ambient temperature",
230                ),
231                (&config.output_file_name_humidity, "humidity"),
232                (
233                    &config.output_file_name_refill_in_progress,
234                    "refill in progress",
235                ),
236            ];
237
238            // 1. Check if any filename is empty.
239            if files_to_initialize
240                .iter()
241                .any(|(filename, _)| filename.is_empty())
242            {
243                return Err(DataLoggerError::OutputFilenameEmpty(
244                    module_path!().to_string(),
245                ));
246            }
247
248            // 2. Initialize all files by writing an empty string.
249            for (filename, signal_name) in files_to_initialize {
250                fs::write(filename, String::new()).map_err(|e| {
251                    DataLoggerError::FileResetFailure {
252                        location: module_path!().to_string(),
253                        signal_name: signal_name.to_string(),
254                        output_file_name: filename.clone(),
255                        source: e,
256                    }
257                })?;
258            }
259        }
260
261        Ok(Self {
262            config,
263            lock_error_get_timestamp: false,
264            lock_error_output_signal_water_temperature: false,
265            lock_error_output_state_refill_in_progress: false,
266            lock_error_output_state_heating_is_on: false,
267            lock_error_output_state_ventilation_is_on: false,
268            lock_error_output_signal_pH: false,
269            lock_error_output_signal_conductivity: false,
270            lock_error_output_signal_ambient_temperature: false,
271            lock_error_output_signal_humidity: false,
272            lock_error_output_state_tank_level_switch_position: false,
273            lock_error_output_state_tank_level_switch_invalid: false,
274            lock_error_output_state_tank_level_switch_position_stabilized: false,
275            lock_error_output_signal_water_temperature_filtered: false,
276            lock_error_output_signal_pH_filtered: false,
277            lock_error_output_signal_conductivity_filtered: false,
278            lock_error_output_timestamp: false,
279            lock_warn_inapplicable_command_signal_handler: false,
280            lock_error_channel_receive_termination: false,
281            last_ping_instant: Instant::now(),
282            database_ping_interval,
283            lock_error_mutex_signal_manager: false,
284        })
285    }
286
287    /// Outputs a non-binary sensor signal to a specified file, typically located on a RAM disk.
288    ///
289    /// This private helper function converts a floating-point `signal` into a human-readable
290    /// string format (determined by `signal_type`) and attempts to write it to the given
291    /// `output_file_name`. The operation only proceeds if `output_to_disk` is enabled in
292    /// the data logger's configuration.
293    ///
294    /// Error logging for writing failures is managed by the `inhibit_error_log` flag,
295    /// preventing a flood of repetitive error messages.
296    ///
297    /// # Arguments
298    /// * `signal` - The `f32` sensor value to be written to the file.
299    /// * `signal_type` - An `AquariumSignal` enum variant, used to determine the correct
300    ///   formatting for the `signal` value (e.g., number of decimal places).
301    /// * `output_file_name` - A reference to the `String` containing the full path to the output file.
302    /// * `inhibit_error_log` - A boolean flag that, if `true`, will prevent this specific
303    ///   error from being logged to the console/main log if a write operation fails.
304    ///
305    /// # Returns
306    /// `true` if an error occurred during the file write operation; `false` otherwise (including
307    /// when `output_to_disk` is `false` and no write is attempted).
308    fn output_signal(
309        &self,
310        signal: f32,
311        signal_type: AquariumSignal,
312        output_file_name: &String,
313        inhibit_error_log: bool,
314    ) -> bool {
315        if self.config.output_to_disk {
316            let signal_string = signal_type.output_file_string(signal);
317
318            match fs::write(output_file_name, signal_string.clone()) {
319                Ok(_) => {
320                    /* do nothing */
321                    false // indicate no error
322                }
323                Err(e) => {
324                    if !inhibit_error_log {
325                        error!(
326                            target: module_path!(),
327                            "output of value {signal_string} to {output_file_name} failed: {e:?}"
328                        );
329                    }
330                    true // indicate error
331                }
332            }
333        } else {
334            false // no output = no error
335        }
336    }
337
338    /// Outputs a boolean (binary) signal's state to a specified file, typically located on a RAM disk.
339    ///
340    /// This private helper function converts a boolean `state` into a descriptive string
341    /// (e.g., "ON"/"OFF", "HIGH"/"LOW") using provided labels and attempts to write it
342    /// to the given `output_file_name`. This operation only occurs if `output_to_disk`
343    /// is enabled in the data logger's configuration.
344    ///
345    /// Error logging for writing failures is controlled by the `inhibit_error_log` flag,
346    /// preventing a flood of repetitive error messages.
347    ///
348    /// # Arguments
349    /// * `state` - The `bool` value of the binary signal (e.g., `true` for ON, `false` for OFF).
350    /// * `state_description_true` - The string label to write to the file when `state` is `true`.
351    /// * `state_description_false` - The string label to write to the file when `state` is `false`.
352    /// * `output_file_name` - A reference to the `String` containing the full path to the output file.
353    /// * `inhibit_error_log` - A boolean flag that, if `true`, will prevent this specific
354    ///   error from being logged to the console/main log if a write operation fails.
355    ///
356    /// # Returns
357    /// - `true`: If an error occurred during the file write operation.
358    /// - `false`: If the write-operation was successful or if `output_to_disk` is disabled.
359    fn output_state(
360        &self,
361        state: bool,
362        state_description_true: &str,
363        state_description_false: &str,
364        output_file_name: &String,
365        inhibit_error_log: bool,
366    ) -> bool {
367        if self.config.output_to_disk {
368            let state_string = match state {
369                true => state_description_true,
370                false => state_description_false,
371            };
372
373            match fs::write(output_file_name, state_string) {
374                Ok(_) => {
375                    /* do nothing */
376                    false // indicate no error
377                }
378                Err(e) => {
379                    if !inhibit_error_log {
380                        error!(
381                            target: module_path!(),
382                            "output of value {state_string} to {output_file_name} failed: {e:?}"
383                        );
384                    }
385                    true // indicate error
386                }
387            }
388        } else {
389            false // no output = no error
390        }
391    }
392
393    /// Outputs a `NaiveDateTime` timestamp to a specified file, typically located on a RAM disk.
394    ///
395    /// This private helper function converts a `NaiveDateTime` into its string representation
396    /// and attempts to write it to the given `output_file_name`. This operation only occurs
397    /// if `output_to_disk` is enabled in the data logger's configuration.
398    ///
399    /// Error logging for writing failures is controlled by the `inhibit_error_log` flag,
400    /// preventing a flood of repetitive error messages.
401    ///
402    /// # Arguments
403    /// * `timestamp` - The `NaiveDateTime` value to be written to the file.
404    /// * `output_file_name` - A reference to the `String` containing the full path to the output file.
405    /// * `inhibit_error_log` - A boolean flag that, if `true`, will prevent this specific
406    ///   error from being logged to the console/main log if a write operation fails.
407    ///
408    /// # Returns
409    /// - `true`: If an error occurred during the file write operation.
410    /// - `false`: If the write-operation was successful or if `output_to_disk` is disabled.
411    fn output_timestamp(
412        &self,
413        timestamp: NaiveDateTime,
414        output_file_name: &String,
415        inhibit_error_log: bool,
416    ) -> bool {
417        if self.config.output_to_disk {
418            match fs::write(output_file_name, timestamp.to_string()) {
419                Ok(_) => {
420                    /* do nothing */
421                    false // indicate no error
422                }
423                Err(e) => {
424                    if !inhibit_error_log {
425                        error!(
426                            target: module_path!(),
427                            "output of value {timestamp} to {output_file_name} failed: {e:?}"
428                        );
429                    }
430                    true // indicate error
431                }
432            }
433        } else {
434            false // no output = no error
435        }
436    }
437
438    /// Retrieves current ON/OFF status signals from Refill, Heating, and Ventilation control modules via channels.
439    ///
440    /// This function sends a `RequestSignal` command to each of the specified
441    /// control modules (Refill, Heating, Ventilation) and updates the provided
442    /// status flags based on their responses. It also tracks channel disconnection
443    /// status to prevent repeated errors.
444    ///
445    /// # Arguments
446    /// * `refill_status` - A mutable reference for indicating if the refill is in progress.
447    /// * `heater_status` - A mutable reference for indicating the heater state.
448    /// * `ventilation_status` - A mutable reference for indicating the ventilation state.
449    fn update_signals(
450        &mut self,
451        refill_status_live: &mut bool,
452        refill_status_for_database: &mut bool,
453        refill_reset_for_database: bool,
454        heater_status: &mut bool,
455        ventilation_status: &mut bool,
456        data_logger_mutexes: &DataLoggerMutexes,
457    ) {
458        {
459            // internal scope to limit lifetime of mutex lock
460            match data_logger_mutexes.mutex_refill_status.lock() {
461                Ok(mut refill_status) => {
462                    self.lock_error_output_state_refill_in_progress = self.output_state(
463                        refill_status.refill_in_progress_live, // use the live signal
464                        "ON",
465                        "OFF",
466                        &self.config.output_file_name_refill_in_progress,
467                        self.lock_error_output_state_refill_in_progress,
468                    );
469                    *refill_status_live = refill_status.refill_in_progress_live;
470                    *refill_status_for_database = refill_status.refill_in_progress_for_database;
471                    if refill_reset_for_database {
472                        refill_status.refill_in_progress_for_database =
473                            refill_status.refill_in_progress_live;
474                    }
475                }
476                Err(_) => {
477                    self.lock_error_output_state_refill_in_progress = true;
478                    error!(target: module_path!(), "error locking mutex for refill status");
479                }
480            };
481        }
482
483        {
484            // internal scope to limit lifetime of mutex lock
485            *heater_status = match data_logger_mutexes.mutex_heating_status.lock() {
486                Ok(c) => {
487                    self.lock_error_output_state_heating_is_on = self.output_state(
488                        *c,
489                        "ON",
490                        "OFF",
491                        &self.config.output_file_name_heating_is_on,
492                        self.lock_error_output_state_heating_is_on,
493                    );
494                    *c
495                }
496                Err(_) => {
497                    self.lock_error_output_state_heating_is_on = true;
498                    error!(target: module_path!(), "error locking mutex for heating status");
499                    false
500                }
501            };
502        }
503
504        {
505            // internal scope to limit lifetime of mutex lock
506            *ventilation_status = match data_logger_mutexes.mutex_ventilation_status.lock() {
507                Ok(c) => {
508                    self.lock_error_output_state_ventilation_is_on = self.output_state(
509                        *c,
510                        "ON",
511                        "OFF",
512                        &self.config.output_file_name_ventilation_is_on,
513                        self.lock_error_output_state_ventilation_is_on,
514                    );
515                    *c
516                }
517                Err(_) => {
518                    self.lock_error_output_state_ventilation_is_on = true;
519                    error!(target: module_path!(), "error locking mutex for ventilation status");
520                    false
521                }
522            };
523        }
524    }
525
526    /// Retrieves current sensor readings from the `SensorManager` via a shared mutex and outputs them to files.
527    ///
528    /// This private helper function locks the `mutex_sensor_manager_signals` to get the latest
529    /// readings for water temperature, pH, conductivity, ambient temperature, and humidity.
530    /// It updates the corresponding `Option<f32>` variables with the retrieved values and then
531    /// calls `output_signal` to write these readings to their respective configured files on disk.
532    /// If the mutex lock fails, an error is logged, and the function returns without updating the values.
533    ///
534    /// # Arguments
535    /// * `mutex_sensor_manager_signals` - An `Arc<Mutex<SensorManagerSignals>>` holding the latest sensor readings.
536    /// * `water_temperature_opt` - A mutable reference to an `Option<f32>` where the retrieved water temperature will be stored.
537    /// * `ph_opt` - A mutable reference to an `Option<f32>` where the retrieved pH value will be stored.
538    /// * `conductivity_opt` - A mutable reference to an `Option<f32>` where the retrieved conductivity value will be stored.
539    /// * `ambient_temperature_opt` - A mutable reference to an `Option<f32>` where the retrieved ambient temperature will be stored.
540    /// * `humidity_opt` - A mutable reference to an `Option<f32>` where the retrieved humidity will be stored.
541    fn update_sensor_manager_signals(
542        &mut self,
543        mutex_sensor_manager_signals: Arc<Mutex<SensorManagerSignals>>,
544        water_temperature_opt: &mut Option<f32>,
545        ph_opt: &mut Option<f32>,
546        conductivity_opt: &mut Option<f32>,
547        ambient_temperature_opt: &mut Option<f32>,
548        humidity_opt: &mut Option<f32>,
549    ) {
550        let water_temperature: f32;
551        let ph: f32;
552        let conductivity: f32;
553        let ambient_temperature: f32;
554        let humidity: f32;
555        {
556            // internal scope to limit lifetime of mutex lock
557            match mutex_sensor_manager_signals.lock() {
558                Ok(c) => {
559                    self.lock_error_mutex_signal_manager = false;
560                    water_temperature = c.water_temperature;
561                    ph = c.ph;
562                    conductivity = c.conductivity;
563                    ambient_temperature = c.ambient_temperature;
564                    humidity = c.ambient_humidity;
565                }
566                Err(_) => {
567                    if !self.lock_error_mutex_signal_manager {
568                        self.lock_error_mutex_signal_manager = true;
569                        error!(target: module_path!(), "error locking mutex for sensor manager");
570                    }
571                    return;
572                }
573            };
574            self.lock_error_output_signal_water_temperature = self.output_signal(
575                water_temperature,
576                AquariumSignal::WaterTemperature,
577                &self.config.output_file_name_water_temperature,
578                self.lock_error_output_signal_water_temperature,
579            );
580            self.lock_error_output_signal_pH = self.output_signal(
581                ph,
582                AquariumSignal::pH,
583                &self.config.output_file_name_pH,
584                self.lock_error_output_signal_pH,
585            );
586            self.lock_error_output_signal_conductivity = self.output_signal(
587                conductivity,
588                AquariumSignal::Conductivity,
589                &self.config.output_file_name_conductivity,
590                self.lock_error_output_signal_conductivity,
591            );
592            self.lock_error_output_signal_ambient_temperature = self.output_signal(
593                ambient_temperature,
594                AquariumSignal::AmbientTemperature,
595                &self.config.output_file_name_ambient_temperature,
596                self.lock_error_output_signal_ambient_temperature,
597            );
598            self.lock_error_output_signal_humidity = self.output_signal(
599                humidity,
600                AquariumSignal::AmbientHumidity,
601                &self.config.output_file_name_humidity,
602                self.lock_error_output_signal_humidity,
603            );
604
605            *water_temperature_opt = Some(water_temperature);
606            *ph_opt = Some(ph);
607            *conductivity_opt = Some(conductivity);
608            *ambient_temperature_opt = Some(ambient_temperature);
609            *humidity_opt = Some(humidity);
610        }
611    }
612
613    /// Retrieves current signals from the `TankLevelSwitch` via a shared mutex and outputs them to files.
614    ///
615    /// This private helper function locks the `mutex_tank_level_switch_signals` to get the latest
616    /// state for the switch's position, its stabilized position, and its invalid status.
617    /// It updates the corresponding `Option<bool>` variables with the retrieved values and then
618    /// calls `output_state` to write these states to their respective configured files on disk.
619    /// If the mutex lock fails, the `Option` variables are set to `None`.
620    ///
621    /// # Arguments
622    /// * `mutex_tank_level_switch_signals` - An `Arc<Mutex<TankLevelSwitchSignals>>` holding the latest switch states.
623    /// * `tank_level_switch_position_opt` - A mutable reference to an `Option<bool>` for the switch's current position.
624    /// * `tank_level_switch_position_stabilized_opt` - A mutable reference to an `Option<bool>` for the switch's stabilized position.
625    /// * `tank_level_switch_invalid_opt` - A mutable reference to an `Option<bool>` for the switch's invalid status.
626    fn update_tank_level_switch_signals(
627        &mut self,
628        mutex_tank_level_switch_signals: &Arc<Mutex<TankLevelSwitchSignals>>,
629        tank_level_switch_position_opt: &mut Option<bool>,
630        tank_level_switch_position_stabilized_opt: &mut Option<bool>,
631        tank_level_switch_invalid_opt: &mut Option<bool>,
632    ) {
633        {
634            // internal scope to limit lifetime of mutex lock
635            match mutex_tank_level_switch_signals.lock() {
636                Ok(tank_level_switch_signals) => {
637                    *tank_level_switch_position_opt =
638                        Some(tank_level_switch_signals.tank_level_switch_position);
639                    *tank_level_switch_position_stabilized_opt =
640                        Some(tank_level_switch_signals.tank_level_switch_position_stabilized);
641                    *tank_level_switch_invalid_opt =
642                        Some(tank_level_switch_signals.tank_level_switch_invalid);
643
644                    self.lock_error_output_state_tank_level_switch_position = self.output_state(
645                        tank_level_switch_signals.tank_level_switch_position,
646                        "HIGH",
647                        "LOW",
648                        &self.config.output_file_name_tank_level_switch_position,
649                        self.lock_error_output_state_tank_level_switch_position,
650                    );
651
652                    self.lock_error_output_state_tank_level_switch_position_stabilized = self
653                        .output_state(
654                            tank_level_switch_signals.tank_level_switch_position_stabilized,
655                            "HIGH",
656                            "LOW",
657                            &self
658                                .config
659                                .output_file_name_tank_level_switch_position_stabilized,
660                            self.lock_error_output_state_tank_level_switch_position_stabilized,
661                        );
662
663                    self.lock_error_output_state_tank_level_switch_invalid = self.output_state(
664                        tank_level_switch_signals.tank_level_switch_invalid,
665                        "INVALID",
666                        "VALID",
667                        &self.config.output_file_name_tank_level_switch_invalid,
668                        self.lock_error_output_state_tank_level_switch_invalid,
669                    );
670                }
671                Err(_) => {
672                    *tank_level_switch_position_opt = None;
673                    *tank_level_switch_position_stabilized_opt = None;
674                    *tank_level_switch_invalid_opt = None;
675                }
676            };
677        }
678    }
679
680    /// Calculates filtered values for water temperature, pH, and conductivity, then outputs them to files.
681    ///
682    /// This private helper function takes raw (unfiltered) sensor readings. For each valid reading,
683    /// it applies its corresponding `IIRFilter` to smooth the data. The resulting filtered values
684    /// are then stored in new `Option<f32>` variables and written to their designated files on disk.
685    /// If a raw reading is `None`, no filtering or output occurs for that signal.
686    ///
687    /// # Arguments
688    /// * `water_temperature` - The measured water temperature, wrapped in an `Option`.
689    /// * `ph` - The measured pH value, wrapped in an `Option`.
690    /// * `conductivity` - The measured conductivity value, wrapped in an `Option`.
691    ///
692    /// # Returns
693    /// A tuple `(Option<f32>, Option<f32>, Option<f32>)` containing the filtered
694    /// water temperature, pH, and conductivity values, respectively. Each value is
695    /// `Some` if the corresponding input was valid and filtered, or `None` otherwise.
696    fn calculate_filtered_signals(
697        &mut self,
698        water_temperature: Option<f32>,
699        ph: Option<f32>,
700        conductivity: Option<f32>,
701        filter_water_temperature: &mut IIRFilter,
702        filter_pH: &mut IIRFilter,
703        filter_conductivity: &mut IIRFilter,
704    ) -> (Option<f32>, Option<f32>, Option<f32>) {
705        let water_temperature_filtered = match water_temperature {
706            Some(water_temperature_data) => {
707                let filtered_water_temperature =
708                    filter_water_temperature.get_filtered_value(water_temperature_data);
709                self.lock_error_output_signal_water_temperature_filtered = self.output_signal(
710                    filtered_water_temperature,
711                    AquariumSignal::WaterTemperature,
712                    &self.config.output_file_name_water_temperature_filtered,
713                    self.lock_error_output_signal_water_temperature_filtered,
714                );
715                Some(filtered_water_temperature)
716            }
717            None => None,
718        };
719
720        let pH_filtered = match ph {
721            Some(ph_data) => {
722                let filtered_pH = filter_pH.get_filtered_value(ph_data);
723                self.lock_error_output_signal_pH_filtered = self.output_signal(
724                    filtered_pH,
725                    AquariumSignal::pH,
726                    &self.config.output_file_name_pH_filtered,
727                    self.lock_error_output_signal_pH_filtered,
728                );
729                Some(filtered_pH)
730            }
731            None => None,
732        };
733
734        let conductivity_filtered = match conductivity {
735            Some(conductivity_data) => {
736                let filtered_conductivity =
737                    filter_conductivity.get_filtered_value(conductivity_data);
738                self.lock_error_output_signal_conductivity_filtered = self.output_signal(
739                    filtered_conductivity,
740                    AquariumSignal::Conductivity,
741                    &self.config.output_file_name_conductivity_filtered,
742                    self.lock_error_output_signal_conductivity_filtered,
743                );
744                Some(filtered_conductivity)
745            }
746            None => None,
747        };
748
749        (
750            water_temperature_filtered,
751            pH_filtered,
752            conductivity_filtered,
753        )
754    }
755
756    /// Retrieves the current timestamp, either from the SQL database or generated locally by the application.
757    ///
758    /// This private helper function determines the timestamp source based on the `use_sql_timestamp`
759    /// configuration setting. If `use_sql_timestamp` is `true`, it attempts to fetch the timestamp
760    /// from the connected SQL database. If that fails, it falls back to using a locally generated
761    /// timestamp and logs an error. If `use_sql_timestamp` is `false`, it always uses a local timestamp.
762    /// The chosen timestamp is also written to a configured file on the disk.
763    ///
764    /// # Arguments
765    /// * `conn` - A mutable reference to an active SQL database connection, used if the SQL timestamp option is enabled.
766    ///
767    /// # Returns
768    /// A `NaiveDateTime` representing the current timestamp, sourced either from the database or the local system.
769    fn update_timestamp(&mut self, conn: &mut PooledConn) -> NaiveDateTime {
770        match self.config.use_sql_timestamp {
771            true => match SqlInterface::get_current_timestamp(conn) {
772                Ok(c) => {
773                    self.lock_error_get_timestamp = false;
774                    self.lock_error_output_timestamp = self.output_timestamp(
775                        c,
776                        &self.config.output_file_name_timestamp,
777                        self.lock_error_output_timestamp,
778                    );
779                    c
780                }
781                Err(e) => {
782                    if !self.lock_error_get_timestamp {
783                        error!(
784                            target: module_path!(),
785                            "could not retrieve timestamp from database: ({e:?}) - using application-generated timestamp."
786                        );
787                        self.lock_error_get_timestamp = true;
788                    }
789                    Local::now().naive_local()
790                }
791            },
792            false => Local::now().naive_local(),
793        }
794    }
795
796    /// Confirms the application's readiness to quit and then waits for a specific termination command from the signal handler.
797    ///
798    /// This function is called when the thread has received the Quit-command from the signal handler.
799    /// It first sends a confirmation back to the signal handler,
800    /// indicating that this component is ready to proceed with shutdown.
801    ///
802    /// After confirming, it enters a loop, continuously listening for an `InternalCommand::Terminate`
803    /// signal from the `signal_handler`. This ensures that the application's internal threads,
804    /// which might be listening to this component's channels, stop receiving commands.
805    ///
806    /// # Arguments
807    ///
808    /// * `data_logger_channels` - A mutable reference to the struct that contains the channels.
809    fn confirm_quit_and_wait_for_termination_command(
810        &mut self,
811        data_logger_channels: &mut DataLoggerChannels,
812    ) {
813        let sleep_duration_hundred_millis = Duration::from_millis(100);
814
815        data_logger_channels.acknowledge_signal_handler();
816
817        // This thread has channel connections to underlying threads.
818        // Those threads have to stop receiving commands from this thread.
819        // The shutdown sequence is handled by the signal_handler module.
820        self.wait_for_termination(
821            &mut data_logger_channels.rx_data_logger_from_signal_handler,
822            sleep_duration_hundred_millis,
823            module_path!(),
824        );
825    }
826
    /// Executes the main control loop for the data logger module.
    ///
    /// This function runs continuously, acting as the central data collection and logging hub for
    /// the application.
    ///
    /// # Start-up phase
    /// 1. The sensor manager signals are read once so that the Infinite Impulse Response (IIR)
    ///    filters for water temperature, pH, and conductivity can be initialized with measured
    ///    values. If a value is unavailable, the defaults 25.0, 7.0, and 50000.0 are used.
    ///    All three filters are created with a time step of 1 second.
    /// 2. The thread idles for `initial_sleep_time_millis` in steps of 100 milliseconds while
    ///    polling the signal handler. A `Quit` command received during this phase triggers the
    ///    shutdown handshake and the function returns without entering the main loop.
    ///
    /// # Main loop
    /// Each cycle performs the following steps (steps 1 to 6 only if `config.active` is set):
    /// 1. **Signal Retrieval**: Gathers current status (ON/OFF) from control modules (Refill, Heating, Ventilation) via `update_signals`.
    /// 2. **Sensor Data Acquisition**: Reads sensor values (temperatures, pH, conductivity, ambient conditions, tank level switch states) from shared mutexes.
    /// 3. **Signal Filtering**: Applies the IIR filters to water temperature, pH, and conductivity.
    /// 4. **Timestamping**: Determines the current timestamp via `update_timestamp`.
    /// 5. **DataFrame Construction**: Assembles all collected and processed data into a `RecorderDataFrame` (`conductivity_compensated` is always `None`).
    /// 6. **Data Injection**: Hands the `RecorderDataFrame` to `data_injection`.
    /// 7. **Cycle Management**: Measures the execution time of the cycle and idles for the remaining duration of the configured `logging_interval`. A warning is logged (once per streak) if the cycle time is exceeded.
    /// 8. **Responsiveness**: While idling, the signal handler is polled every 100 milliseconds, and (if active) the signals are refreshed on every tenth idle pass.
    /// 9. **Database Keep-alive**: Calls `check_timing_and_ping_database` before the next cycle starts.
    ///
    /// The loop continues until a `Quit` command is received from the signal handler.
    /// Upon receiving `Quit`, it performs the exit sequence implemented in
    /// `confirm_quit_and_wait_for_termination_command` before the function returns.
    ///
    /// # Arguments
    /// * `data_injection` - An object implementing the `DataInjectionTrait`, used for transferring the collected `RecorderDataFrame` to the SQL database or a mock data structure during testing.
    /// * `sql_interface_data` - A `SqlInterfaceData` instance; its connection is used for retrieving the timestamp, and the instance is passed on to `data_injection` and to the database ping.
    /// * `data_logger_channels` - A `DataLoggerChannels` struct; this function uses its receiver from the signal handler and passes the struct to the shutdown handshake.
    /// * `data_logger_mutexes` - A `DataLoggerMutexes` struct, providing access to the shared `Arc<Mutex>` protected sensor manager and tank level switch signals.
    pub fn execute(
        &mut self,
        data_injection: &mut impl DataInjectionTrait,
        mut sql_interface_data: SqlInterfaceData,
        data_logger_channels: &mut DataLoggerChannels,
        data_logger_mutexes: DataLoggerMutexes,
    ) {
        #[cfg(all(target_os = "linux", not(test)))]
        info!(target: module_path!(), "Thread started with TID: {}", gettid());

        // target cycle time: the configured logging interval converted to milliseconds
        let cycle_time_duration =
            Duration::from_millis(self.config.logging_interval * MILLIS_PER_SEC);
        let sleep_duration_hundred_millis = Duration::from_millis(100);
        let spin_sleeper = SpinSleeper::default();
        // counts the idle time of the current cycle in steps of 100 milliseconds
        let mut i;

        // define and initialize with values which will be overwritten in the first iteration
        let mut water_temperature: Option<f32> = Some(10.0);
        let mut ph: Option<f32> = Some(0.0);
        let mut conductivity: Option<f32> = Some(0.0);
        let mut ambient_temperature: Option<f32> = Some(0.0);
        let mut ambient_humidity: Option<f32> = Some(0.0);
        let mut tank_level_switch_invalid: Option<bool> = Some(false);
        let mut tank_level_switch_position: Option<bool> = Some(true);
        let mut tank_level_switch_position_stabilized: Option<bool> = Some(true);
        let mut heater_is_on: bool = false;
        let mut ventilation_is_on: bool = false;
        let mut refill_in_progress_live: bool = false;
        let mut refill_in_progress_for_database: bool = false;
        let mut quit_command_received = false;
        // suppresses repeated warnings while the cycle time stays exceeded
        let mut lock_warn_cycle_time_exceeded = false;
        let mut cycle_time_exceeded: bool; // ensures that signal handler/messaging are polled at least once even if cycle time is exceeded

        // update signals by accessing mutex for Atlas Scientific signals
        // (done before the main loop so that the filters can start from measured values)
        self.update_sensor_manager_signals(
            data_logger_mutexes.mutex_sensor_manager_signals.clone(),
            &mut water_temperature,
            &mut ph,
            &mut conductivity,
            &mut ambient_temperature,
            &mut ambient_humidity,
        );

        // NOTE(review): the filters use a fixed 1-second time step, but in the main loop they
        // are fed once per logging interval - confirm this is intended for intervals other
        // than one second.
        let mut filter_water_temperature = IIRFilter::new_requires_initial(
            &self.config.filter_coefficient_water_temperature,
            water_temperature.unwrap_or(25.0),
            1.0, // 1 second time step
        );
        let mut filter_pH = IIRFilter::new_requires_initial(
            &self.config.filter_coefficient_pH,
            ph.unwrap_or(7.0),
            1.0, // 1 second time step
        );
        let mut filter_conductivity = IIRFilter::new_requires_initial(
            &self.config.filter_coefficient_conductivity,
            conductivity.unwrap_or(50_000.0),
            1.0, // 1 second time step
        );

        // Initial delay: idle in steps of 100 milliseconds while staying responsive
        // to the signal handler.
        let initial_sleep_time_start = Instant::now();
        while Instant::now()
            .duration_since(initial_sleep_time_start)
            .as_millis()
            < self.config.initial_sleep_time_millis.into()
        {
            spin_sleeper.sleep(sleep_duration_hundred_millis);
            (quit_command_received, _, _) = self.process_external_request(
                &mut data_logger_channels.rx_data_logger_from_signal_handler,
                None,
            );
            if quit_command_received {
                // shutdown requested before the first cycle: acknowledge and leave
                self.confirm_quit_and_wait_for_termination_command(data_logger_channels);
                return;
            }
        }

        // Initialize reference for cycle time measurement
        let mut start_time = Instant::now();

        loop {
            if self.config.active {
                // NOTE(review): the third argument is `true` here and `false` in the idle loop
                // below; its meaning is defined by `update_signals`, which is not visible here.
                self.update_signals(
                    &mut refill_in_progress_live,
                    &mut refill_in_progress_for_database,
                    true,
                    &mut heater_is_on,
                    &mut ventilation_is_on,
                    &data_logger_mutexes,
                );

                // update signals by accessing mutex for Atlas Scientific signals
                self.update_sensor_manager_signals(
                    data_logger_mutexes.mutex_sensor_manager_signals.clone(),
                    &mut water_temperature,
                    &mut ph,
                    &mut conductivity,
                    &mut ambient_temperature,
                    &mut ambient_humidity,
                );

                // update signals by accessing mutex for tank level switch signals
                self.update_tank_level_switch_signals(
                    &data_logger_mutexes.mutex_tank_level_switch_signals,
                    &mut tank_level_switch_position,
                    &mut tank_level_switch_position_stabilized,
                    &mut tank_level_switch_invalid,
                );

                // the filters are only fed here, i.e., once per cycle
                let (water_temperature_filtered, pH_filtered, conductivity_filtered) = self
                    .calculate_filtered_signals(
                        water_temperature,
                        ph,
                        conductivity,
                        &mut filter_water_temperature,
                        &mut filter_pH,
                        &mut filter_conductivity,
                    );

                let current_timestamp = self.update_timestamp(&mut sql_interface_data.conn);

                // all data received, let's construct a data frame
                let data_frame = RecorderDataFrame {
                    timestamp: current_timestamp,
                    water_temperature,
                    water_temperature_filtered,
                    ph,
                    ph_filtered: pH_filtered,
                    conductivity,
                    conductivity_filtered,
                    conductivity_compensated: None,
                    refill_in_progress: Some(refill_in_progress_for_database),
                    tank_level_switch_position,
                    tank_level_switch_position_stabilized,
                    tank_level_switch_invalid,
                    surface_ventilation_status: Some(ventilation_is_on),
                    ambient_temperature,
                    ambient_humidity,
                    heater_status: Some(heater_is_on),
                };

                data_injection.inject_data(data_frame, &mut sql_interface_data);
            }
            let stop_time = Instant::now();
            let execution_duration = stop_time.duration_since(start_time);

            // deduct the execution time of the thread from the target cycle time
            let remaining_sleep_time_millis: u64;
            if execution_duration > cycle_time_duration {
                // cycle overrun: warn once per streak and do not idle any further
                if !lock_warn_cycle_time_exceeded {
                    warn!(target: module_path!(),
                        "execution duration of {} milliseconds exceeds cycle time of {}",
                        execution_duration.as_millis(),
                        cycle_time_duration.as_millis());
                    lock_warn_cycle_time_exceeded = true;
                }
                remaining_sleep_time_millis = 0;
                cycle_time_exceeded = true;
            } else {
                // re-arm the warning for the next overrun
                lock_warn_cycle_time_exceeded = false;
                // the subtraction cannot underflow: this branch is only reached when the
                // execution duration does not exceed the cycle time
                let remaining_sleep_time_duration = cycle_time_duration - execution_duration;
                remaining_sleep_time_millis = remaining_sleep_time_duration.as_millis() as u64;
                cycle_time_exceeded = false;
            }

            #[cfg(feature = "debug_data_logger")]
            debug!(
                target: module_path!(),
                "inserted data into database, now idling for {} milliseconds",
                remaining_sleep_time_millis
            );

            // wait the remaining time period to complete the target cycle time
            // (each pass sleeps 100 milliseconds; `cycle_time_exceeded` forces exactly one pass
            // when there is no remaining time, so the signal handler is still polled)
            i = 0;
            while i < remaining_sleep_time_millis || cycle_time_exceeded {
                i += 100;
                spin_sleeper.sleep(sleep_duration_hundred_millis);
                (quit_command_received, _, _) = self.process_external_request(
                    &mut data_logger_channels.rx_data_logger_from_signal_handler,
                    None,
                );
                if quit_command_received {
                    #[cfg(feature = "debug_data_logger")]
                    debug!(
                        target: module_path!(),
                        "received QUIT command from signal handler"
                    );
                    break;
                }

                if self.config.active {
                    // once per second, retrieve signals and write them to the RAM disk
                    // (`i` advances by 100 per pass, so this branch is taken on every tenth pass;
                    // the refreshed values also end up in the next data frame)
                    if i % 1000 == 0 {
                        self.update_signals(
                            &mut refill_in_progress_live,
                            &mut refill_in_progress_for_database,
                            false,
                            &mut heater_is_on,
                            &mut ventilation_is_on,
                            &data_logger_mutexes,
                        );

                        // update signals by accessing mutex for Atlas Scientific signals
                        self.update_sensor_manager_signals(
                            data_logger_mutexes.mutex_sensor_manager_signals.clone(),
                            &mut water_temperature,
                            &mut ph,
                            &mut conductivity,
                            &mut ambient_temperature,
                            &mut ambient_humidity,
                        );

                        // update signals by accessing mutex for tank level switch signals
                        self.update_tank_level_switch_signals(
                            &data_logger_mutexes.mutex_tank_level_switch_signals,
                            &mut tank_level_switch_position,
                            &mut tank_level_switch_position_stabilized,
                            &mut tank_level_switch_invalid,
                        );
                    }
                }
                // the forced pass has been executed; from now on only `i` controls the loop
                cycle_time_exceeded = false;
            }
            // the inner `break` only left the idle loop; leave the main loop as well
            if quit_command_received {
                break;
            }

            #[cfg(feature = "debug_data_logger")]
            debug!(
                target: module_path!(),
                "finished idling restarting loop"
            );

            self.check_timing_and_ping_database(&mut sql_interface_data);

            start_time = Instant::now(); // start time for the next cycle
        }

        // Quit received: acknowledge and wait for the termination command
        self.confirm_quit_and_wait_for_termination_command(data_logger_channels);
    }
1095}
1096
1097#[cfg(test)]
1098pub mod tests {
1099    use crate::database::{sql_interface::SqlInterface, sql_interface_data::SqlInterfaceData};
1100    use crate::launch::channels::{channel, Channels};
1101    use crate::mocks::mock_data_injection::tests::MockDataInjection;
1102    use crate::mocks::mock_heating::tests::mock_heating;
1103    use crate::mocks::mock_signal::tests::MockSignal;
1104    use crate::mocks::mock_ventilation::tests::mock_ventilation;
1105    use crate::mocks::test_command::tests::TestCommand;
1106    use crate::recorder::data_logger::{DataLogger, DataLoggerConfig, DataLoggerMutexes};
1107    use crate::sensors::sensor_manager::SensorManagerSignals;
1108    use crate::sensors::tank_level_switch::TankLevelSwitchSignals;
1109    use crate::utilities::channel_content::InternalCommand;
1110    use crate::utilities::config::{read_config_file_with_test_database, ConfigData};
1111    use crate::water::refill::RefillStatus;
1112    use spin_sleep::SpinSleeper;
1113    use std::sync::{Arc, Mutex};
1114    use std::{fs, thread, time::Duration};
1115
1116    #[cfg(test)]
1117    // Asserts the content of all data logger output files on disk.
1118    //
1119    // This comprehensive helper function is used in test environments to verify that
1120    // the `DataLogger` correctly writes various sensor signals and device states
1121    // to their respective configured files. It reads each file, parses its content
1122    // (for floating-point numbers) or compares it directly (for boolean states),
1123    // and asserts that it matches the expected target values.
1124    //
1125    // Arguments:
1126    // * `data_logger_config`: The `DataLoggerConfig` struct containing the names of all output files.
1127    // * `target_water_temperature`: The expected `f32` value for water temperature.
1128    // * `target_water_temperature_filtered`: The expected `f32` value for filtered water temperature.
1129    // * `target_water_ph`: The expected `f32` value for pH.
1130    // * `target_water_ph_filtered`: The expected `f32` value for filtered pH.
1131    // * `target_water_conductivity`: The expected `f32` value for conductivity.
1132    // * `target_water_conductivity_filtered`: The expected `f32` value for filtered conductivity.
1133    // * `_target_water_conductivity_compensated`: (Ignored) Placeholder for a compensated conductivity value.
1134    // * `target_refill_in_progress`: The expected `bool` state for refill in progress.
1135    // * `target_tank_level_switch_position`: The expected `bool` state for tank level switch position.
1136    // * `target_tank_level_switch_invalid`: The expected `bool` state for tank level switch invalid status.
1137    // * `target_tank_level_switch_position_stabilized`: The expected `bool` state for stabilized tank level switch position.
1138    // * `target_surface_ventilation_status`: The expected `bool` state for surface ventilation status.
1139    // * `target_ambient_temperature`: The expected `f32` value for ambient temperature.
1140    // * `target_ambient_humidity`: The expected `f32` value for ambient humidity.
1141    // * `target_heater_status`: The expected `bool` state for heater status.
1142    // * `target_water_temperature_ds18b20`: The expected `f32` value for DS18B20 water temperature.
1143    // * `target_ambient_temperature_ds18b20`: The expected `f32` value for DS18B20 ambient temperature.
1144    //
1145    // Panics:
1146    // This function will panic if:
1147    // - It fails to read any of the specified output files.
1148    // - It fails to parse a file's content into the expected numeric type.
1149    // - The content of any file does not match its corresponding target value.
1150    fn assert_output_file_contents(
1151        data_logger_config: &DataLoggerConfig,
1152        target_water_temperature: f32,
1153        target_water_temperature_filtered: f32,
1154        target_water_ph: f32,
1155        target_water_ph_filtered: f32,
1156        target_water_conductivity: f32,
1157        target_water_conductivity_filtered: f32,
1158        _target_water_conductivity_compensated: f32,
1159        target_refill_in_progress: bool,
1160        target_tank_level_switch_position: bool,
1161        target_tank_level_switch_invalid: bool,
1162        target_tank_level_switch_position_stabilized: bool,
1163        target_surface_ventilation_status: bool,
1164        target_ambient_temperature: f32,
1165        target_ambient_humidity: f32,
1166        target_heater_status: bool,
1167    ) {
1168        // Helper to assert the content of a file containing a f32 value.
1169        let assert_file_f32 = |filename: &str, expected: f32| {
1170            let content = fs::read_to_string(filename)
1171                .unwrap_or_else(|_| panic!("Could not read file: {}", filename));
1172            let value = content
1173                .parse::<f32>()
1174                .unwrap_or_else(|_| panic!("Could not parse f32 from file: {}", filename));
1175            assert_eq!(value, expected, "Mismatch in file: {}", filename);
1176        };
1177
1178        // Helper to assert the content of a file containing a boolean state.
1179        let assert_file_bool =
1180            |filename: &str, expected_bool: bool, true_str: &str, false_str: &str| {
1181                let content = fs::read_to_string(filename)
1182                    .unwrap_or_else(|_| panic!("Could not read file: {}", filename));
1183                let expected_str = if expected_bool { true_str } else { false_str };
1184                assert_eq!(content, expected_str, "Mismatch in file: {}", filename);
1185            };
1186
1187        // Now, the assertions are clean, single-line calls.
1188        assert_file_f32(
1189            &data_logger_config.output_file_name_water_temperature,
1190            target_water_temperature,
1191        );
1192        assert_file_f32(
1193            &data_logger_config.output_file_name_water_temperature_filtered,
1194            target_water_temperature_filtered,
1195        );
1196        assert_file_f32(&data_logger_config.output_file_name_pH, target_water_ph);
1197        assert_file_f32(
1198            &data_logger_config.output_file_name_pH_filtered,
1199            target_water_ph_filtered,
1200        );
1201        assert_file_f32(
1202            &data_logger_config.output_file_name_conductivity,
1203            target_water_conductivity,
1204        );
1205        assert_file_f32(
1206            &data_logger_config.output_file_name_conductivity_filtered,
1207            target_water_conductivity_filtered,
1208        );
1209        assert_file_f32(
1210            &data_logger_config.output_file_name_ambient_temperature,
1211            target_ambient_temperature,
1212        );
1213        assert_file_f32(
1214            &data_logger_config.output_file_name_humidity,
1215            target_ambient_humidity,
1216        );
1217
1218        assert_file_bool(
1219            &data_logger_config.output_file_name_refill_in_progress,
1220            target_refill_in_progress,
1221            "ON",
1222            "OFF",
1223        );
1224        assert_file_bool(
1225            &data_logger_config.output_file_name_tank_level_switch_position,
1226            target_tank_level_switch_position,
1227            "HIGH",
1228            "LOW",
1229        );
1230        assert_file_bool(
1231            &data_logger_config.output_file_name_tank_level_switch_invalid,
1232            target_tank_level_switch_invalid,
1233            "INVALID",
1234            "VALID",
1235        );
1236        assert_file_bool(
1237            &data_logger_config.output_file_name_tank_level_switch_position_stabilized,
1238            target_tank_level_switch_position_stabilized,
1239            "HIGH",
1240            "LOW",
1241        );
1242        assert_file_bool(
1243            &data_logger_config.output_file_name_ventilation_is_on,
1244            target_surface_ventilation_status,
1245            "ON",
1246            "OFF",
1247        );
1248        assert_file_bool(
1249            &data_logger_config.output_file_name_heating_is_on,
1250            target_heater_status,
1251            "ON",
1252            "OFF",
1253        );
1254    }
1255
1256    #[test]
1257    // test case checks if DataLogger correctly processes all information:
1258    // - storage in SQL database using mock implementation
1259    // - writing output to the filesystem
1260    // Test case uses test database #45.
        //
        // Four threads are spawned:
        // - test_environment: changes the shared signals in steps separated by one-second
        //   sleeps and finally sends Quit/Terminate commands via the signal handler channels
        // - mock_heating / mock_ventilation: mock implementations fed by the test environment
        // - test_object: runs `DataLogger::execute` and afterward asserts the recorded data
        //   frames and the contents of the output files
1261    pub fn test_data_log() {
1262        let mut channels = Channels::new_for_test();
1263
1264        // test environment-specific channels
            // (sender ends are used by the test environment thread, receiver ends are handed
            // to the mock ventilation and mock heating threads)
1265        let (mut tx_test_environment_to_ventilation, mut rx_ventilation_from_test_environment) =
1266            channel(1);
1267        let (mut tx_test_environment_to_heating, mut rx_heating_from_test_environment) = channel(1);
1268
1269        let mut mock_data_injection = MockDataInjection::new();
1270
            // The configuration is read twice with identical settings:
            // `config.data_logger` is passed by value to `DataLogger::new` below, while
            // `config2.data_logger` is passed to `assert_output_file_contents` at the end.
1271        let mut config: ConfigData = read_config_file_with_test_database(
1272            "/config/aquarium_control_test_generic.toml".to_string(),
1273            45,
1274        );
1275        config.data_logger.logging_interval = 1;
1276
1277        let mut config2: ConfigData = read_config_file_with_test_database(
1278            "/config/aquarium_control_test_generic.toml".to_string(),
1279            45,
1280        );
1281        config2.data_logger.logging_interval = 1;
1282
1283        let max_rows_data = config.sql_interface.max_rows_data;
1284
            // the test case panics right away if `SqlInterface::new` reports an error
1285        let sql_interface = match SqlInterface::new(config.sql_interface.clone()) {
1286            Ok(c) => c,
1287            Err(e) => {
1288                panic!("Could not connect to SQL database: {e:?}");
1289            }
1290        };
1291        let sql_interface_data =
1292            SqlInterfaceData::new(sql_interface.get_connection().unwrap(), max_rows_data).unwrap();
1293        let mut data_logger =
1294            DataLogger::new(config.data_logger, Duration::from_millis(1000)).unwrap();
1295
1296        // replacement values are used to initialize the mutex content
1297        config.sensor_manager.replacement_value_water_temperature = 10.0;
1298        config.sensor_manager.replacement_value_ph = 11.0;
1299        config.sensor_manager.replacement_value_conductivity = 12.0;
1300
            // NOTE(review): the two DS18B20 mutexes are written by the test environment thread
            // but are not part of the `DataLoggerMutexes` built further below - confirm whether
            // they are still needed.
1301        let mutex_ds18b20_water_temperature = Arc::new(Mutex::new(15.0));
1302        let mutex_ds18b20_ambient_temperature = Arc::new(Mutex::new(16.0));
            // Shared state: the original `Arc` and its clones are distributed between the
            // test environment thread, the mock threads and the test object thread.
1303        let mutex_tank_level_switch_signals =
1304            Arc::new(Mutex::new(TankLevelSwitchSignals::new(false, false, false)));
1305        let mutex_tank_level_switch_signals_clone = mutex_tank_level_switch_signals.clone();
1306        let mutex_heating_status = Arc::new(Mutex::new(false));
1307        let mutex_heating_status_clone_for_data_logger = mutex_heating_status.clone();
1308        let mutex_ventilation_status = Arc::new(Mutex::new(false));
1309        let mutex_ventilation_status_clone_for_data_logger = mutex_ventilation_status.clone();
1310        let refill_status = RefillStatus {
1311            refill_in_progress_live: false,
1312            refill_in_progress_for_database: false,
1313        };
1314        let mutex_refill_status = Arc::new(Mutex::new(refill_status));
1315        let mutex_refill_status_clone_for_data_logger = mutex_refill_status.clone();
1316        let mutex_refill_status_clone_for_test_environment = mutex_refill_status.clone();
1317        config.sensor_manager.replacement_value_ambient_temperature = 13.0;
1318        config.sensor_manager.replacement_value_ambient_humidity = 14.0;
1319        let mutex_sensor_manager_signals = Arc::new(Mutex::new(SensorManagerSignals::new(
1320            &config.sensor_manager,
1321        )));
1322        let mutex_sensor_manager_signals_clone_for_data_logger =
1323            mutex_sensor_manager_signals.clone();
1324        // thread for test environment and mock signal handler
1325        let join_handle_test_environment = thread::Builder::new()
1326            .name("test_environment".to_string())
1327            .spawn(move || {
1328                let spin_sleeper = SpinSleeper::default();
1329                let sleep_duration_1_second = Duration::from_millis(1000);
1330                let sleep_duration_100_millis = Duration::from_millis(100);
                    // initial delay of 1.1 seconds before the first update
1331                spin_sleeper.sleep(sleep_duration_1_second);
1332                spin_sleeper.sleep(sleep_duration_100_millis);
1333
                    // update #1: ventilation status (sent to the mock ventilation thread, the
                    // result of the send operation is ignored) and water temperature
1334                let _ = tx_test_environment_to_ventilation.send(TestCommand::UpdateSignal(
1335                    MockSignal::VentilationControlStatus(true),
1336                ));
1337                {
1338                    let mut sensor_manager_signals = mutex_sensor_manager_signals.lock().unwrap();
1339                    sensor_manager_signals.water_temperature = 29.0;
1340                }
1341
1342                spin_sleeper.sleep(sleep_duration_1_second);
1343
                    // update #2: heating status (sent to the mock heating thread, the result
                    // of the send operation is ignored) and pH
1344                let _ = tx_test_environment_to_heating.send(TestCommand::UpdateSignal(
1345                    MockSignal::HeatingControlStatus(true),
1346                ));
1347
1348                {
1349                    let mut sensor_manager_signals = mutex_sensor_manager_signals.lock().unwrap();
1350                    sensor_manager_signals.ph = 7.0;
1351                }
1352
1353                spin_sleeper.sleep(sleep_duration_1_second);
1354
                    // update #3: conductivity
1355                {
1356                    let mut sensor_manager_signals = mutex_sensor_manager_signals.lock().unwrap();
1357                    sensor_manager_signals.conductivity = 50000.0;
1358                }
1359
1360                spin_sleeper.sleep(sleep_duration_1_second);
1361
                    // update #4: tank level switch invalid flag, ambient temperature and the
                    // refill-in-progress flag for the database
1362                {
1363                    let mut tank_level_switch_signals =
1364                        mutex_tank_level_switch_signals.lock().unwrap();
1365                    tank_level_switch_signals.tank_level_switch_invalid = true;
1366                }
1367
1368                {
1369                    let mut sensor_manager_signals = mutex_sensor_manager_signals.lock().unwrap();
1370                    sensor_manager_signals.ambient_temperature = 35.0;
1371                }
1372
1373                {
1374                    let mut refill_status = mutex_refill_status_clone_for_test_environment
1375                        .lock()
1376                        .unwrap();
1377                    refill_status.refill_in_progress_for_database = true;
1378                }
1379
1380                spin_sleeper.sleep(sleep_duration_1_second);
1381
                    // update #5: tank level switch position and ambient humidity
1382                {
1383                    let mut tank_level_switch_signals =
1384                        mutex_tank_level_switch_signals.lock().unwrap();
1385                    tank_level_switch_signals.tank_level_switch_position = true;
1386                }
1387
1388                {
1389                    let mut sensor_manager_signals = mutex_sensor_manager_signals.lock().unwrap();
1390                    sensor_manager_signals.ambient_humidity = 66.0;
1391                }
1392
1393                spin_sleeper.sleep(sleep_duration_1_second);
1394
                    // update #6: DS18B20 values (both locks are held until the end of this
                    // scope) and stabilized tank level switch position
1395                {
1396                    let mut water_temperature_ds18b20 =
1397                        mutex_ds18b20_water_temperature.lock().unwrap();
1398                    *water_temperature_ds18b20 = 15.5;
1399                    let mut ambient_temperature_ds18b20 =
1400                        mutex_ds18b20_ambient_temperature.lock().unwrap();
1401                    *ambient_temperature_ds18b20 = 16.5;
1402                }
1403
1404                {
1405                    let mut tank_level_switch_signals =
1406                        mutex_tank_level_switch_signals.lock().unwrap();
1407                    tank_level_switch_signals.tank_level_switch_position_stabilized = true;
1408                }
1409
1410                spin_sleeper.sleep(sleep_duration_1_second);
1411
1412                // cease operation of the test object
                    // (the result of the send operation is ignored; receiving from the data
                    // logger afterward panics if it returns an error)
1413                let _ = channels
1414                    .signal_handler
1415                    .send_to_data_logger(InternalCommand::Quit);
1416                channels.signal_handler.receive_from_data_logger().unwrap();
1417
1418                // signal end of test case execution to mock threads
1419                let _ = channels
1420                    .signal_handler
1421                    .send_to_heating(InternalCommand::Quit);
1422                let _ = channels
1423                    .signal_handler
1424                    .send_to_ventilation(InternalCommand::Quit);
1425
                    // one more second of delay, then send the Terminate command to the data logger
1426                spin_sleeper.sleep(sleep_duration_1_second);
1427                let _ = channels
1428                    .signal_handler
1429                    .send_to_data_logger(InternalCommand::Terminate);
1430            })
1431            .unwrap();
1432
1433        // thread for mock heating
1434        let join_handle_mock_heating = thread::Builder::new()
1435            .name("mock_heating".to_string())
1436            .spawn(move || {
1437                mock_heating(
1438                    None,
1439                    None,
1440                    &mut channels.heating.rx_heating_from_signal_handler,
1441                    Some(&mut rx_heating_from_test_environment),
1442                    mutex_heating_status, // initial heating control status
1443                );
1444            })
1445            .unwrap();
1446
1447        // thread for mock ventilation
1448        let join_handle_mock_ventilation = thread::Builder::new()
1449            .name("mock_ventilation".to_string())
1450            .spawn(move || {
1451                mock_ventilation(
1452                    &mut channels.ventilation.rx_ventilation_from_signal_handler,
1453                    Some(&mut rx_ventilation_from_test_environment),
1454                    mutex_ventilation_status, // initial ventilation control status
1455                );
1456            })
1457            .unwrap();
1458
1459        // give mock threads some time to establish
1460        let spin_sleeper = SpinSleeper::default();
1461        let sleep_duration_100_millis = Duration::from_millis(100);
1462        spin_sleeper.sleep(sleep_duration_100_millis);
1463
1464        // thread for the test object and asserts
1465        let join_handle_test_object = thread::Builder::new()
1466            .name("test_object".to_string())
1467            .spawn(move || {
1468                let data_logger_mutexes = DataLoggerMutexes {
1469                    mutex_sensor_manager_signals:
1470                        mutex_sensor_manager_signals_clone_for_data_logger,
1471                    mutex_tank_level_switch_signals: mutex_tank_level_switch_signals_clone,
1472                    mutex_heating_status: mutex_heating_status_clone_for_data_logger,
1473                    mutex_ventilation_status: mutex_ventilation_status_clone_for_data_logger,
1474                    mutex_refill_status: mutex_refill_status_clone_for_data_logger,
1475                };
1476
                    // All asserts below run only after `execute` has returned.
                    // NOTE(review): presumably `execute` returns in response to the commands
                    // sent by the test environment thread - confirm in `DataLogger::execute`.
1477                data_logger.execute(
1478                    &mut mock_data_injection,
1479                    sql_interface_data,
1480                    &mut channels.data_logger,
1481                    data_logger_mutexes,
1482                );
1483
1484                println!("Printing recorded data:");
1485                mock_data_injection.output();
1486
1487                // Assert if data logger has set the refill-in-progress bit to false.
                    // (the test environment thread has set this bit to true in update #4)
1488                {
1489                    let refill_status = mutex_refill_status.lock().unwrap();
1490                    println!(
1491                        "Checking if data logger has set the refill-in-progress bit to false..."
1492                    );
1493                    assert_eq!(refill_status.refill_in_progress_for_database, false);
1494                }
1495
                    // The recorded data frames are taken with `pop()` one at a time, labelled
                    // #5 down to #1. Each `unwrap()` panics if fewer frames than expected
                    // have been recorded.
1496                // Dataframe #5
1497                let mut data_frame = mock_data_injection.data_frames.pop().unwrap();
1498                println!("Asserting data frame #5");
1499                data_frame.assert(
1500                    Some(29.0),     // target_water_temperature
1501                    Some(28.40625), // target_water_temperature_filtered
1502                    Some(7.0),      // target_water_ph
1503                    Some(7.125),    // target_water_ph_filtered
1504                    Some(50_000.0), // target_water_conductivity
1505                    Some(46875.75), // target_water_conductivity_filtered
1506                    None,           // target_water_conductivity_compensated
1507                    Some(false),    // target_refill_in_progress
1508                    Some(true),     // target_tank_level_switch_position
1509                    Some(true),     // target_tank_level_switch_position_stabilized
1510                    Some(true),     // target_tank_level_switch_invalid
1511                    Some(true),     // target_surface_ventilation_status
1512                    Some(35.0),     // target_ambient_temperature
1513                    Some(66.0),     // target_ambient_humidity
1514                    Some(true),     // target_heater_status
1515                );
1516
1517                // Dataframe #4
1518                data_frame = mock_data_injection.data_frames.pop().unwrap();
1519                println!("Asserting data frame #4");
1520                data_frame.assert(
1521                    Some(29.0),     // target_water_temperature
1522                    Some(27.8125),  // target_water_temperature_filtered
1523                    Some(7.0),      // target_water_ph
1524                    Some(7.25),     // target_water_ph_filtered
1525                    Some(50_000.0), // target_water_conductivity
1526                    Some(43751.5),  // target_water_conductivity_filtered
1527                    None,           // target_water_conductivity_compensated
1528                    Some(false),    // target_refill_in_progress
1529                    Some(true),     // target_tank_level_switch_position
1530                    Some(false),    // target_tank_level_switch_position_stabilized
1531                    Some(true),     // target_tank_level_switch_invalid
1532                    Some(true),     // target_surface_ventilation_status
1533                    Some(35.0),     // target_ambient_temperature
1534                    Some(66.0),     // target_ambient_humidity
1535                    Some(true),     // target_heater_status
1536                );
1537
1538                // Dataframe #3
                    // (the only frame that expects the refill-in-progress flag to be true)
1539                data_frame = mock_data_injection.data_frames.pop().unwrap();
1540                println!("Asserting data frame #3");
1541                data_frame.assert(
1542                    Some(29.0),     // target_water_temperature
1543                    Some(26.625),   // target_water_temperature_filtered
1544                    Some(7.0),      // target_water_ph
1545                    Some(7.5),      // target_water_ph_filtered
1546                    Some(50_000.0), // target_water_conductivity
1547                    Some(37503.0),  // target_water_conductivity_filtered
1548                    None,           // target_water_conductivity_compensated
1549                    Some(true),     // target_refill_in_progress
1550                    Some(false),    // target_tank_level_switch_position
1551                    Some(false),    // target_tank_level_switch_position_stabilized
1552                    Some(true),     // target_tank_level_switch_invalid
1553                    Some(true),     // target_surface_ventilation_status
1554                    Some(35.0),     // target_ambient_temperature
1555                    Some(14.0),     // target_ambient_humidity
1556                    Some(true),     // target_heater_status
1557                );
1558
1559                // Dataframe #2
1560                data_frame = mock_data_injection.data_frames.pop().unwrap();
1561                println!("Asserting data frame #2");
1562                data_frame.assert(
1563                    Some(29.0),     // target_water_temperature
1564                    Some(24.25),    // target_water_temperature_filtered
1565                    Some(7.0),      // target_water_ph
1566                    Some(8.0),      // target_water_ph_filtered
1567                    Some(50_000.0), // target_water_conductivity
1568                    Some(25006.0),  // target_water_conductivity_filtered
1569                    None,           // target_water_conductivity_compensated
1570                    Some(false),    // target_refill_in_progress
1571                    Some(false),    // target_tank_level_switch_position
1572                    Some(false),    // target_tank_level_switch_position_stabilized
1573                    Some(false),    // target_tank_level_switch_invalid
1574                    Some(true),     // target_surface_ventilation_status
1575                    Some(13.0),     // target_ambient_temperature
1576                    Some(14.0),     // target_ambient_humidity
1577                    Some(true),     // target_heater_status
1578                );
1579
1580                // Dataframe #1
                    // (already expects the values of updates #1 and #2; conductivity, ambient
                    // temperature and humidity still carry the replacement values 12.0, 13.0
                    // and 14.0 configured above)
1581                data_frame = mock_data_injection.data_frames.pop().unwrap();
1582                println!("Asserting data frame #1");
1583                data_frame.assert(
1584                    Some(29.0),  // target_water_temperature
1585                    Some(19.5),  // target_water_temperature_filtered
1586                    Some(7.0),   // target_water_ph
1587                    Some(9.0),   // target_water_ph_filtered
1588                    Some(12.0),  // target_water_conductivity
1589                    Some(12.0),  // target_water_conductivity_filtered
1590                    None,        // target_water_conductivity_compensated
1591                    Some(false), // target_refill_in_progress
1592                    Some(false), // target_tank_level_switch_position
1593                    Some(false), // target_tank_level_switch_position_stabilized
1594                    Some(false), // target_tank_level_switch_invalid
1595                    Some(true),  // target_surface_ventilation_status
1596                    Some(13.0),  // target_ambient_temperature
1597                    Some(14.0),  // target_ambient_humidity
1598                    Some(true),  // target_heater_status
1599                );
1600
                    // exactly five data frames must have been recorded
1601                assert!(mock_data_injection.data_frames.is_empty());
1602
                    // The output files are compared against the values of data frame #5.
                    // The filtered values are given with fewer digits here than in the data
                    // frame assert (28.4 / 7.12 / 46876.0 versus 28.40625 / 7.125 / 46875.75).
                    // NOTE(review): presumably this reflects the formatting used when writing
                    // the files - confirm against the file output implementation.
1603                assert_output_file_contents(
1604                    &config2.data_logger, // data_logger_config,
1605                    29.0,                 // target_water_temperature
1606                    28.4,                 // target_water_temperature_filtered
1607                    7.0,                  // target_water_ph
1608                    7.12,                 // target_water_ph_filtered
1609                    50_000.0,             // target_water_conductivity
1610                    46876.0,              // target_water_conductivity_filtered
1611                    0.0,                  // target_water_conductivity_compensated
1612                    false,                // target_refill_in_progress
1613                    true,                 // target_tank_level_switch_position
1614                    true,                 // target_tank_level_switch_invalid
1615                    true,                 // target_tank_level_switch_position_stabilized
1616                    true,                 // target_surface_ventilation_status
1617                    35.0,                 // target_ambient_temperature
1618                    66.0,                 // target_ambient_humidity
1619                    true,                 // target_heater_status
1620                );
1621            })
1622            .unwrap();
1623
            // Wait for all four threads. `join()` returns an error if the thread has panicked
            // (e.g., because of a failed assert), in which case `expect` fails the test case.
1624        join_handle_mock_ventilation
1625            .join()
1626            .expect("Mock ventilation thread did not finish.");
1627        join_handle_mock_heating
1628            .join()
1629            .expect("Mock heating thread did not finish.");
1630        join_handle_test_environment
1631            .join()
1632            .expect("Test environment thread did not finish.");
1633        join_handle_test_object
1634            .join()
1635            .expect("Mock data logger thread did not finish.");
1636    }
1637}