aquarium_control/sensors/
atlas_scientific.rs

1/* Copyright 2024 Uwe Martin
2
3Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
4
5The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
6
7THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
8*/
9#![allow(non_snake_case)]
10
11#[cfg(all(not(test), target_os = "linux"))]
12use log::info;
13use std::fmt;
14
15#[cfg(not(test))]
16use log::warn;
17
18use crate::launch::channels::AquaChannelError;
19use crate::sensors::atlas_scientific_channels::AtlasScientificChannels;
20use crate::sensors::atlas_scientific_config::AtlasScientificConfig;
21use crate::sensors::atlas_scientific_error::AtlasScientificError;
22use crate::sensors::atlas_scientific_error::AtlasScientificError::{
23    InvalidSignal, SignalRequestCreationFailure,
24};
25use crate::sensors::i2c_interface::I2cRequest;
26use crate::simulator::get_resp_sim::GetResponseFromSimulatorTrait;
27use crate::utilities::acknowledge_signal_handler::AcknowledgeSignalHandlerTrait;
28use crate::utilities::channel_content::AquariumSignal;
29use crate::utilities::check_mutex_access_duration::CheckMutexAccessDurationTrait;
30use crate::utilities::logger::log_error_chain;
31use crate::utilities::proc_ext_req::ProcessExternalRequestTrait;
32use crate::utilities::wait_for_termination::WaitForTerminationTrait;
33#[cfg(all(not(test), target_os = "linux"))]
34use nix::unistd::gettid;
35use spin_sleep::SpinSleeper;
36use std::sync::{Arc, Mutex};
37use std::time::{Duration, Instant};
38
39// max loops for receiving when the quit command has been received
40#[allow(unused)] // used in conditionally compiled code
41pub const RECEIVE_COUNTER_MAX: u32 = 10;
42
43#[derive(Clone, Debug)]
44pub struct AtlasScientificResultData {
45    /// The measurement value
46    pub value: f32,
47
48    /// Bit indicating if the value is valid or not
49    #[allow(unused)]
50    invalid: bool,
51
52    /// The instant when the measurement result was recorded
53    #[allow(unused)]
54    measurement_instant: Instant,
55}
56
57impl AtlasScientificResultData {
58    pub fn new(value: f32) -> Self {
59        AtlasScientificResultData {
60            value,
61            invalid: false,
62            measurement_instant: Instant::now(),
63        }
64    }
65}
66
67impl fmt::Display for AtlasScientificResultData {
68    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69        // Write the floating-point sensor value directly to the formatter.
70        write!(f, "{}", self.value)
71    }
72}
73
74#[allow(unused)] // used in conditionally compiled code
75pub type AtlasScientificResult = Result<AtlasScientificResultData, AtlasScientificError>;
76
77#[allow(dead_code)]
78/// numeric values used in communication with Atlas Scientific sensor units
79mod atlas_scientific_constants {
80    /// command sent to sensor unit for initiating response
81    pub const COMMAND_READ: u8 = b'R';
82
83    /// length of the message received from the sensor (number of bytes)
84    pub const MESSAGE_SIZE: usize = 8;
85
86    /// location of the first character to be checked inside the message from the sensor
87    pub const INDEX_FIRST_CHARACTER: usize = 0;
88
89    /// start of data inside the message from the sensor
90    pub const INDEX_CONTENT_START: usize = 1;
91
92    /// finish of data inside the message from the sensor
93    pub const INDEX_PENULTIMATE_CHARACTER: usize = 6;
94
95    /// location of the last character to be checked inside the message from the sensor
96    pub const INDEX_LAST_CHARACTER: usize = 7;
97
98    /// ASCII value for representation of figure zero
99    pub const ASCII_VAL_ZERO: u8 = 48;
100
101    /// ASCII value for representation of figure nine
102    pub const ASCII_VAL_NINE: u8 = 57;
103
104    /// decimal point
105    pub const DECIMAL_POINT: u8 = b'.';
106
107    /// negative sign (note: positive sign does not occur in the message from the sensor)
108    pub const NEGATIVE_SIGN: u8 = b'-';
109
110    /// magic number at front position inside the message which identifies valid response from sensor unit
111    pub const MESSAGE_START_MAGIC: u8 = 1;
112
113    /// magic number at finish position inside the message which identifies valid response from sensor unit
114    pub const MESSAGE_FINISH_MAGIC: u8 = 0;
115
116    /// allow max. 10 milliseconds for mutex to be blocked by any other thread
117    pub const MAX_MUTEX_ACCESS_DURATION_MILLIS: u64 = 10;
118}
119
120/// Contains the communication Atlas Scientific sensors via I2C.
121/// Alternatively, TCP communication is used when configured to run with the simulator.
122/// The struct holds attributes for the results and for error flags indicating unsuccessful communication.
123/// Sensor data is read periodically and stored.
124/// Requests from data logger, ventilation, and heating control are answered with last measured data.
125#[cfg_attr(doc, aquamarine::aquamarine)]
126/// Creates the channels and sets up threads.
127/// Thread communication of this component is as follows:
128/// ```mermaid
129/// graph LR
130///     atlas_scientific --> signal_handler[SignalHandler]
131///     signal_handler --> atlas_scientific
132///     atlas_scientific -.-> data_logger[DataLogger]
133/// ```
134pub struct AtlasScientific {
135    /// configuration data for AtlasScientific
136    #[allow(unused)] // used in conditionally compiled code
137    config: AtlasScientificConfig,
138
139    /// inhibition flag to avoid flooding the log file with repeated messages about failure to receive termination signal via the channel
140    #[allow(unused)] // used in conditionally compiled code
141    pub(crate) lock_error_channel_receive_termination: bool,
142
143    /// inhibition flag to avoid flooding the log file with repeated messages about having received inapplicable command via the channel from the signal handler thread
144    #[allow(unused)] // used in conditionally compiled code
145    pub(crate) lock_warn_inapplicable_command_signal_handler: bool,
146
147    /// inhibition flag to avoid flooding the log file with repeated messages about to send request via the channel to I2cInterface thread
148    lock_error_channel_send_i2c_interface: bool,
149
150    /// inhibition flag to avoid flooding the log file with repeated messages about excessive access time to mutex for water temperature
151    pub lock_warn_max_mutex_access_duration_temperature: bool,
152
153    /// inhibition flag to avoid flooding the log file with repeated messages about excessive access time to mutex for pH
154    pub lock_warn_max_mutex_access_duration_ph: bool,
155
156    /// inhibition flag to avoid flooding the log file with repeated messages about excessive access time to mutex for conductivity
157    pub lock_warn_max_mutex_access_duration_conductivity: bool,
158
159    // for testing purposes: record when mutex access time is exceeded without resetting it
160    #[cfg(test)]
161    pub(crate) mutex_temperature_access_duration_exceeded: bool,
162    #[cfg(test)]
163    pub(crate) mutex_ph_access_duration_exceeded: bool,
164    #[cfg(test)]
165    pub(crate) mutex_conductivity_access_duration_exceeded: bool,
166
167    /// instant when the last measurement took place
168    #[allow(unused)] // used in conditionally compiled code
169    pub last_measurement_instant: Instant,
170
171    /// duration of the measurement interval calculated from the configuration data
172    #[allow(unused)] // used in conditionally compiled code
173    pub measurement_interval: Duration,
174
175    /// maximum allowed duration for access to mutex
176    pub max_mutex_access_duration_millis: Duration,
177}
178impl ProcessExternalRequestTrait for AtlasScientific {}
179
180impl GetResponseFromSimulatorTrait for AtlasScientific {}
181
182impl AtlasScientific {
183    /// Creates a new `AtlasScientific` control instance.
184    ///
185    /// This constructor initializes the Atlas Scientific sensor module. It takes the
186    /// configuration, calculates the measurement interval, and sets up internal
187    /// state, including lock flags used to prevent log flooding from recurring
188    /// errors or warnings.
189    ///
190    /// # Arguments
191    /// * `config` - Configuration data for the Atlas Scientific module, specifying I2C
192    ///   addresses, sleep times, and the measurement interval.
193    ///
194    /// # Returns
195    /// A new `AtlasScientific` struct, ready to manage sensor data acquisition.
196    #[allow(unused)] // used in conditionally compiled code
197    pub fn new(config: AtlasScientificConfig) -> AtlasScientific {
198        let measurement_interval = Duration::from_millis(config.measurement_interval_millis);
199
200        AtlasScientific {
201            config,
202            lock_error_channel_receive_termination: false,
203            lock_warn_inapplicable_command_signal_handler: false,
204            lock_error_channel_send_i2c_interface: false,
205            lock_warn_max_mutex_access_duration_temperature: false,
206            lock_warn_max_mutex_access_duration_ph: false,
207            lock_warn_max_mutex_access_duration_conductivity: false,
208
209            #[cfg(test)]
210            mutex_temperature_access_duration_exceeded: false,
211
212            #[cfg(test)]
213            mutex_ph_access_duration_exceeded: false,
214
215            #[cfg(test)]
216            mutex_conductivity_access_duration_exceeded: false,
217
218            last_measurement_instant: Instant::now(),
219            measurement_interval,
220            max_mutex_access_duration_millis: Duration::from_millis(
221                atlas_scientific_constants::MAX_MUTEX_ACCESS_DURATION_MILLIS,
222            ),
223        }
224    }
225
226    /// Checks if a given character is a valid part of a numeric string, considering optional negative signs.
227    ///
228    /// This private helper function is used during the parsing of sensor responses to validate
229    /// that each character in the data section is either a digit (`0`-`9`), a decimal point (`.`),
230    /// or, if `negative_allowed` is `true`, a negative sign (`-`).
231    ///
232    /// # Arguments
233    /// * `character` - The `u8` byte value of the character to be checked (typically an ASCII representation).
234    /// * `negative_allowed` - A `bool` flag. If `true`, a negative sign character (`-`) will be considered
235    ///   valid; otherwise, it will be considered invalid.
236    ///
237    /// # Returns
238    /// `true` if the character is a valid numeric component based on the rules; `false` otherwise.
239    #[allow(unused)] // used in conditionally compiled code
240    fn is_numeric(character: u8, negative_allowed: bool) -> bool {
241        match character {
242            atlas_scientific_constants::ASCII_VAL_ZERO
243                ..=atlas_scientific_constants::ASCII_VAL_NINE => true,
244            atlas_scientific_constants::DECIMAL_POINT => true,
245            atlas_scientific_constants::NEGATIVE_SIGN => negative_allowed,
246            _ => false,
247        }
248    }
249
250    /// Evaluates a raw byte array response from an Atlas Scientific sensor unit, validating its format and converting it to `f32`.
251    ///
252    /// This private helper function performs several critical steps to parse sensor data:
253    /// 1.  **Magic Number Check**: Verifies the protocol's start and end "magic" bytes.
254    /// 2.  **Numeric Content Validation**: Ensures all characters within the data section are valid numeric components.
255    /// 3.  **UTF-8 Conversion & Float Parsing**: Converts the validated byte sequence into a string and parses it into an `f32` value.
256    ///
257    /// The `is_ph` flag specifically controls whether a negative sign is allowed, as pH values are non-negative by definition.
258    ///
259    /// # Arguments
260    /// * `buffer` - A fixed-size array of `u8` bytes representing the complete message received from the sensor.
261    /// * `is_ph` - A `bool` flag indicating if the response is from a pH sensor (`true`) or another type (`false`).
262    ///
263    /// # Returns
264    /// A `Result` containing the successfully parsed `f32` sensor reading.
265    ///
266    /// # Errors
267    /// Returns an `AtlasScientificError` variant if any validation or parsing step fails:
268    /// - `FirstCharacterNotMagic`: If the message's first byte is incorrect.
269    /// - `LastCharacterNotMagic`: If the message's last byte is incorrect.
270    /// - `ContainsInvalidCharacter`: If the data section contains non-numeric characters.
271    /// - `InvalidUtf8Sequence`: If the numeric byte sequence is not valid UTF-8.
272    /// - `ConversionFailure`: If the valid numeric string cannot be parsed into an `f32`.
273    #[allow(unused)] // used in conditionally compiled code
274    fn check_response(
275        buffer: [u8; atlas_scientific_constants::MESSAGE_SIZE],
276        is_ph: bool,
277    ) -> Result<f32, AtlasScientificError> {
278        // first check: first character not magic
279        let first_character = buffer[atlas_scientific_constants::INDEX_FIRST_CHARACTER];
280        if first_character != atlas_scientific_constants::MESSAGE_START_MAGIC {
281            return Err(AtlasScientificError::FirstCharacterNotMagic(
282                module_path!().to_string(),
283                atlas_scientific_constants::MESSAGE_START_MAGIC,
284                first_character,
285            ));
286        }
287        // second check: last character not magic
288        let last_character = buffer[atlas_scientific_constants::INDEX_LAST_CHARACTER];
289        if last_character != atlas_scientific_constants::MESSAGE_FINISH_MAGIC {
290            return Err(AtlasScientificError::LastCharacterNotMagic(
291                module_path!().to_string(),
292                atlas_scientific_constants::MESSAGE_FINISH_MAGIC,
293                last_character,
294            ));
295        }
296
297        // get a slice of the array
298        let numeric_content = &buffer[atlas_scientific_constants::INDEX_CONTENT_START
299            ..atlas_scientific_constants::INDEX_PENULTIMATE_CHARACTER];
300
301        // third check: all characters in between belong to a number
302        for character in numeric_content {
303            if !Self::is_numeric(*character, !is_ph) {
304                return Err(AtlasScientificError::ContainsInvalidCharacter(
305                    module_path!().to_string(),
306                    *character,
307                ));
308            }
309        }
310
311        // conversion to String
312        let numeric_string = str::from_utf8(numeric_content).map_err(|e| {
313            AtlasScientificError::InvalidUtf8Sequence {
314                location: module_path!().to_string(),
315                source: e,
316            }
317        })?;
318
319        // conversion to f32
320        numeric_string
321            .parse::<f32>()
322            .map_err(|e| AtlasScientificError::ConversionFailure {
323                location: module_path!().to_string(),
324                source: e.clone(),
325            })
326    }
327
328    /// Constructs and sends a measurement request to the I2C interface thread.
329    ///
330    /// This function creates an `I2cRequest` based on the specified `signal`. It
331    /// retrieves the correct I2C address and sleep time from the configuration
332    /// for the given sensor. The request is then sent over the provided channel
333    /// to the I2C thread for execution.
334    ///
335    /// # Arguments
336    /// * `signal` - The `AquariumSignal` identifying which sensor to request a reading from.
337    /// * `atlas_scientific_channels` - A mutable reference to the struct containing the channels.
338    ///
339    /// # Returns
340    /// An empty `Result` (`Ok(())`) if the measurement request was successfully created and sent.
341    ///
342    /// # Errors
343    /// Returns an `AtlasScientificError` if any step fails:
344    /// - `InvalidSignal`: If an unsupported `AquariumSignal` is provided.
345    /// - `SignalRequestCreationFailure`: If the underlying `I2cRequest::new` call fails (e.g., command too long).
346    /// - `SendingRequestToI2cInterfaceFailed`: If sending the request over the channel fails, which
347    ///   typically means the receiver (the I2C thread) has been dropped.
348    #[allow(unused)] // used in conditionally compiled code
349    fn send_request_to_i2c_interface(
350        &mut self,
351        signal: &AquariumSignal,
352        atlas_scientific_channels: &mut AtlasScientificChannels,
353    ) -> Result<(), AtlasScientificError> {
354        let (i2c_address, sleep_time_millis) = match signal {
355            AquariumSignal::pH => (
356                self.config.address_atlas_scientific_ph,
357                self.config.sleep_time_millis_ph,
358            ),
359            AquariumSignal::Conductivity => (
360                self.config.address_atlas_scientific_conductivity,
361                self.config.sleep_time_millis_conductivity,
362            ),
363            AquariumSignal::WaterTemperature => (
364                self.config.address_atlas_scientific_temperature,
365                self.config.sleep_time_millis_temperature,
366            ),
367            _ => {
368                #[cfg(not(test))]
369                warn!(
370                    target: module_path!(),
371                    "ignoring invalid signal request ({signal})"
372                );
373                return Err(InvalidSignal(module_path!().to_string(), signal.clone()));
374            }
375        };
376
377        let command_buffer = [atlas_scientific_constants::COMMAND_READ];
378
379        let request = I2cRequest::new(
380            i2c_address,
381            &command_buffer,
382            sleep_time_millis,
383            atlas_scientific_constants::MESSAGE_SIZE,
384        )
385        .map_err(|e| SignalRequestCreationFailure {
386            location: module_path!().to_string(),
387            source: e,
388        })?;
389
390        match atlas_scientific_channels.send_to_i2c_interface(request) {
391            Ok(()) => {
392                self.lock_error_channel_send_i2c_interface = false;
393                Ok(())
394            }
395            Err(e) => Err(AtlasScientificError::SendingRequestToI2cInterfaceFailed {
396                location: module_path!().to_string(),
397                source: e,
398            }),
399        }
400    }
401
402    /// Atomically writes a sensor measurement result to the appropriate shared mutex.
403    ///
404    /// This helper function directs an `AtlasScientificResultData` to its corresponding
405    /// shared `Mutex` based on the provided `signal`. It centralizes the
406    /// mutex selection and locking logic.
407    ///
408    /// # Arguments
409    /// * `signal` - The `AquariumSignal` that determines which mutex to update.
410    /// * `result` - The `AtlasScientificResultData` to be written.
411    /// * `mutex_water_temperature` - The shared mutex for the water temperature result.
412    /// * `mutex_ph` - The shared mutex for the pH sensor result.
413    /// * `mutex_conductivity` - The shared mutex for the conductivity sensor result.
414    ///
415    /// # Returns
416    /// A `Result` containing the `Instant` at which the lock was released on success.
417    ///
418    /// # Errors
419    /// Returns `Err(AtlasScientificError::CouldNotLockMutex)` if the target mutex is
420    /// poisoned and cannot be locked.
421    #[allow(unused)] // used in conditionally compiled code
422    fn write_result_to_mutex(
423        &self,
424        signal: &AquariumSignal,
425        result: AtlasScientificResultData,
426        mutex_water_temperature: &Arc<Mutex<AtlasScientificResultData>>,
427        mutex_ph: &Arc<Mutex<AtlasScientificResultData>>,
428        mutex_conductivity: &Arc<Mutex<AtlasScientificResultData>>,
429    ) -> Result<Instant, AtlasScientificError> {
430        let target_mutex = match signal {
431            AquariumSignal::WaterTemperature => mutex_water_temperature,
432            AquariumSignal::pH => mutex_ph,
433            AquariumSignal::Conductivity => mutex_conductivity,
434            // For unsupported signals, do nothing and return success, matching original behavior.
435            _ => return Ok(Instant::now()),
436        };
437
438        match target_mutex.lock() {
439            Ok(mut guard) => *guard = result,
440            Err(_) => {
441                return Err(AtlasScientificError::CouldNotLockMutex {
442                    location: module_path!().to_string(),
443                    signal: signal.clone(),
444                });
445            }
446        }
447        Ok(Instant::now())
448    }
449
450    /// Receives and processes a measurement response from the I2C interface thread.
451    ///
452    /// This function orchestrates the full cycle of handling a sensor reading. It performs a
453    /// non-blocking receive on the I2C channel. If a message is available, it processes
454    /// the result, which can be either a successful data payload or an I2C-level error.
455    ///
456    /// For a successful payload, the function:
457    /// 1. Parses the raw byte buffer using `check_response`.
458    /// 2. Converts the valid data into an `AtlasScientificResultData` struct.
459    /// 3. Atomically writes this result to the correct shared `Mutex` using `write_result_to_mutex`.
460    /// 4. Monitors the duration of the mutex lock, issuing a warning if it exceeds the threshold.
461    ///
462    /// # Arguments
463    /// * `signal` - The `AquariumSignal` indicating which sensor's data is expected.
464    /// * `atlas_scientific_channels` - A mutable reference to the struct containing the channels.
465    /// * `mutex_water_temperature` - The shared `Mutex` for the water temperature result.
466    /// * `mutex_ph` - The shared `Mutex` for the pH result.
467    /// * `mutex_conductivity` - The shared `Mutex` for the conductivity result.
468    ///
469    /// # Returns
470    /// An `AtlasScientificResult` containing the `AtlasScientificResultData` if a valid sensor
471    /// reading was successfully received, parsed, and stored.
472    ///
473    /// # Errors
474    /// Returns an `AtlasScientificError` variant if any error occurred:
475    ///   - `ChannelIsEmpty`: A recoverable error indicating no message was available.
476    ///   - `ChannelIsDisconnected`: A critical error indicating the I2C thread has terminated.
477    ///   - `I2cCommunicationFailure`: The I2C thread reported a hardware-level communication error.
478    ///   - `ResponseCheckFailed` / `IncorrectBufferLength`: The received data payload was malformed.
479    ///   - `CouldNotLockMutex`: The target mutex was poisoned and could not be updated.
480    #[allow(unused)] // used in conditionally compiled code
481    fn receive_response_from_i2c_interface(
482        &mut self,
483        signal: &AquariumSignal,
484        atlas_scientific_channels: &mut AtlasScientificChannels,
485        mutex_water_temperature: &Arc<Mutex<AtlasScientificResultData>>,
486        mutex_ph: &Arc<Mutex<AtlasScientificResultData>>,
487        mutex_conductivity: &Arc<Mutex<AtlasScientificResultData>>,
488    ) -> AtlasScientificResult {
489        let is_ph = matches!(signal, AquariumSignal::pH);
490        let result: AtlasScientificResult;
491
492        match atlas_scientific_channels.receive_from_i2c_interface() {
493            Ok(i2c_result) => match i2c_result {
494                // Result from channel is available
495                Ok(i2c_response) => {
496                    // First, check if the response has the expected length for an Atlas sensor.
497                    if i2c_response.length != atlas_scientific_constants::MESSAGE_SIZE {
498                        result = Err(AtlasScientificError::IncorrectBufferLength(
499                            module_path!().to_string(),
500                            signal.clone(),
501                            i2c_response.length,
502                        ));
503                    } else {
504                        // The length is correct. Convert the relevant part of the buffer
505                        // into the [u8; 8] array that `check_response` expects.
506                        let response_buffer: [u8; atlas_scientific_constants::MESSAGE_SIZE] =
507                            i2c_response.read_buf[..atlas_scientific_constants::MESSAGE_SIZE]
508                                .try_into()
509                                .unwrap(); // This unwrap is safe due to the length check above.
510
511                        // Now, parse the validated buffer.
512                        let parsed_result = match Self::check_response(response_buffer, is_ph) {
513                            Ok(value) => Ok(AtlasScientificResultData::new(value)),
514                            Err(e) => Err(AtlasScientificError::ResponseCheckFailed {
515                                location: module_path!().to_string(),
516                                signal: signal.clone(),
517                                source: Box::new(e),
518                            }),
519                        };
520
521                        // If parsing was successful, write the result to the shared mutex.
522                        if let Ok(result_data) = parsed_result {
523                            let instant_before_locking_mutex = Instant::now();
524                            let result_data_clone = result_data.clone();
525                            let instant_after_locking_mutex = self.write_result_to_mutex(
526                                signal,
527                                result_data,
528                                mutex_water_temperature,
529                                mutex_ph,
530                                mutex_conductivity,
531                            )?; // The '?' will propagate a mutex lock error.
532
533                            // Check if access to mutex took too long.
534                            self.check_mutex_access_duration(
535                                Some(signal),
536                                instant_after_locking_mutex,
537                                instant_before_locking_mutex,
538                            );
539
540                            // The final result of this arm is the successfully parsed data.
541                            result = Ok(result_data_clone);
542                        } else {
543                            // If parsing failed, propagate the specific parsing error
544                            // instead of letting it fall through as a generic 'Initial' error.
545                            result = parsed_result;
546                        }
547                    }
548                }
549                Err(e) => {
550                    result = Err(AtlasScientificError::I2cCommunicationFailure {
551                        location: module_path!().to_string(),
552                        signal: signal.clone(),
553                        source: e,
554                    })
555                }
556            },
557            Err(e) => match e {
558                #[cfg(feature = "debug_channels")]
559                AquaChannelError::Full => {
560                    result = Err(AtlasScientificError::UnknownError(
561                        module_path!().to_string(),
562                    ));
563                }
564                AquaChannelError::Empty => {
565                    result = Err(AtlasScientificError::ChannelIsEmpty(
566                        module_path!().to_string(),
567                    ));
568                }
569                AquaChannelError::Disconnected => {
570                    result = Err(AtlasScientificError::ChannelIsDisconnected {
571                        location: module_path!().to_string(),
572                        source: e,
573                    });
574                }
575            },
576        }
577        result
578    }
579
580    /// Executes the main control loop for the Atlas Scientific sensor module.
581    ///
582    /// This function runs continuously, managing the periodic and sequential measurement of
583    /// water temperature, pH, and conductivity. It interacts with an I2C interface
584    /// thread by sending requests for one sensor at a time and then processing the response.
585    ///
586    /// The collected data (or any resulting error) is written to a shared `Mutex`,
587    /// making it available to other application threads. The loop respects the configured
588    /// `measurement_interval` between cycles and remains responsive to `Quit` and
589    /// `Terminate` commands from the signal handler for a graceful shutdown.
590    ///
591    /// # Arguments
592    /// * `atlas_scientific_channels` - A mutable reference to the struct containing all `mpsc` channels necessary for
593    ///   inter-thread communication with the `I2CInterface` and the `SignalHandler`.
594    /// * `mutex_atlas_scientific_water_temperature` - The shared `Mutex` for storing the water
595    ///   temperature result.
596    /// * `mutex_atlas_scientific_ph` - The shared `Mutex` for storing the pH result.
597    /// * `mutex_atlas_scientific_conductivity` - The shared `Mutex` for storing the conductivity result.
598    #[allow(unused)] // used in conditionally compiled code
599    pub fn execute(
600        &mut self,
601        atlas_scientific_channels: &mut AtlasScientificChannels,
602        mutex_atlas_scientific_water_temperature: Arc<Mutex<AtlasScientificResultData>>,
603        mutex_atlas_scientific_ph: Arc<Mutex<AtlasScientificResultData>>,
604        mutex_atlas_scientific_conductivity: Arc<Mutex<AtlasScientificResultData>>,
605    ) {
606        #[cfg(all(target_os = "linux", not(test)))]
607        info!(target: module_path!(), "Thread started with TID: {}", gettid());
608
609        let sleep_duration_ten_millis = Duration::from_millis(10);
610        let sleep_duration_one_millis = Duration::from_millis(1);
611        let spin_sleeper = SpinSleeper::default();
612
613        let mut state_send_receive: bool = false;
614        let mut channel_is_disconnected: bool = false;
615        let mut receive_counter: u32 = 0;
616
617        let mut signal = AquariumSignal::WaterTemperature;
618
619        loop {
620            let (
621                quit_command_received, // the request to end the application has been received
622                _,
623                _,
624            ) = self.process_external_request(
625                &mut atlas_scientific_channels.rx_atlas_scientific_from_signal_handler,
626                None,
627            );
628
629            if quit_command_received
630                && (!state_send_receive || receive_counter > RECEIVE_COUNTER_MAX)
631            {
632                // quit command has been received, and there is no pending measurement
633                // no further waiting for I2cInterface is required
634                break;
635            } else if quit_command_received {
636                break;
637            }
638
639            // Check if data acquisition is enabled.
640            if self.config.active && !channel_is_disconnected {
641                let duration_since_last_measurement =
642                    Instant::now().duration_since(self.last_measurement_instant);
643                //#[cfg(test)]
644                //println!(
645                //    "{}: state_send_receive={}, duration since last measurement={}, measurement_interval={}",
646                //    module_path!(),
647                //    state_send_receive,
648                //    duration_since_last_measurement.as_millis(),
649                //    self.measurement_interval.as_millis(),
650                //);
651                if !state_send_receive
652                    && duration_since_last_measurement > self.measurement_interval
653                {
654                    #[cfg(test)]
655                    println!(
656                        "{}: Sending to i2c interface request for {}",
657                        module_path!(),
658                        signal
659                    );
660                    let send_result =
661                        self.send_request_to_i2c_interface(&signal, atlas_scientific_channels);
662                    state_send_receive = match send_result {
663                        Ok(()) => true,
664                        Err(e) => {
665                            if !matches!(
666                                e,
667                                AtlasScientificError::SendingRequestToI2cInterfaceFailed { .. }
668                            ) || !self.lock_error_channel_send_i2c_interface
669                            {
670                                log_error_chain(
671                                    module_path!(),
672                                    "Error in communication with I2C thread.",
673                                    e,
674                                );
675                            }
676                            false
677                        }
678                    };
679
680                    #[cfg(test)]
681                    println!(
682                        "{}: Sent to i2c interface request for {}",
683                        module_path!(),
684                        signal
685                    );
686                } else if state_send_receive {
687                    // read from the channel if the measurement result is available from I2cInterface thread
688                    match self.receive_response_from_i2c_interface(
689                        &signal,
690                        atlas_scientific_channels,
691                        &mutex_atlas_scientific_water_temperature,
692                        &mutex_atlas_scientific_ph,
693                        &mutex_atlas_scientific_conductivity,
694                    ) {
695                        Ok(_) => {
696                            state_send_receive = false; // allow sending again
697                        }
698                        Err(e) => {
699                            if matches!(e, AtlasScientificError::ChannelIsEmpty(_)) {
700                                state_send_receive = true; // receive again
701                            } else if matches!(
702                                e,
703                                AtlasScientificError::ChannelIsDisconnected {
704                                    location: _,
705                                    source: _
706                                }
707                            ) {
708                                channel_is_disconnected = true;
709                            } else {
710                                log_error_chain(
711                                    module_path!(),
712                                    "Encountered error trying to receive from I2C",
713                                    e,
714                                );
715                                state_send_receive = false; // allow sending again
716                            }
717                        }
718                    }
719
720                    if !state_send_receive {
721                        // data could be received successfully
722                        receive_counter = 0;
723                        self.last_measurement_instant = Instant::now();
724                        signal = signal.get_next_atlas_scientific_signal();
725                    } else {
726                        receive_counter = receive_counter.saturating_add(1);
727                    }
728                }
729            }
730
731            spin_sleeper.sleep(sleep_duration_ten_millis);
732        }
733
734        atlas_scientific_channels.acknowledge_signal_handler();
735
736        // This thread has channel connections to underlying threads.
737        // Those threads have to stop receiving commands from this thread.
738        // The shutdown sequence is handled by the signal_handler module.
739        self.wait_for_termination(
740            &mut atlas_scientific_channels.rx_atlas_scientific_from_signal_handler,
741            sleep_duration_one_millis,
742            module_path!(),
743        );
744    }
745}
746
747#[cfg(test)]
748pub mod tests {
749    use crate::launch::channels::{channel, AquaReceiver, AquaSender, Channels};
750    use crate::mocks::mock_i2c_interface::tests::MockI2cInterface;
751    use crate::sensors::atlas_scientific::{
752        atlas_scientific_constants, AtlasScientific, AtlasScientificChannels, AtlasScientificError,
753        AtlasScientificResultData,
754    };
755    use crate::sensors::i2c_error::I2cError;
756    use crate::sensors::i2c_interface::{I2cRequest, I2cResponse, I2cResult};
757    use crate::utilities::channel_content::{AquariumSignal, InternalCommand};
758    use crate::utilities::config::{read_config_file, ConfigData};
759    use assert_float_eq::*;
760    use spin_sleep::SpinSleeper;
761    use std::sync::{Arc, Mutex};
762    use std::thread;
763    use std::time::{Duration, Instant};
764
765    #[test]
766    // happy case for evaluating response from pH sensor unit
767    pub fn test_atlas_scientific_check_response_valid_ph() {
768        let stimuli: [u8; atlas_scientific_constants::MESSAGE_SIZE] = [
769            atlas_scientific_constants::MESSAGE_START_MAGIC,
770            '8' as u8,
771            atlas_scientific_constants::DECIMAL_POINT,
772            '0' as u8,
773            '0' as u8,
774            '0' as u8,
775            '0' as u8,
776            atlas_scientific_constants::MESSAGE_FINISH_MAGIC,
777        ];
778        let result = AtlasScientific::check_response(stimuli, true);
779        match result {
780            Ok(c) => {
781                assert_float_absolute_eq!(c, 8.0, 0.001);
782            }
783            Err(e) => {
784                panic!("test_atlas_scientific_check_response_valid_ph returned error: {e:?}");
785            }
786        }
787    }
788    #[test]
789    // evaluating response from pH sensor unit
790    // checking if negative values are correctly identified
791    pub fn test_atlas_scientific_check_response_invalid_ph_negative() {
792        let sign = '-' as u8;
793
794        let stimuli: [u8; atlas_scientific_constants::MESSAGE_SIZE] = [
795            atlas_scientific_constants::MESSAGE_START_MAGIC,
796            sign,
797            '8' as u8,
798            atlas_scientific_constants::DECIMAL_POINT,
799            '0' as u8,
800            '0' as u8,
801            '0' as u8,
802            atlas_scientific_constants::MESSAGE_FINISH_MAGIC,
803        ];
804
805        let result = AtlasScientific::check_response(stimuli, true);
806
807        let error_result = result.unwrap_err();
808
809        assert!(matches!(
810            error_result,
811            AtlasScientificError::ContainsInvalidCharacter(_, _sign)
812        ));
813    }
814
815    #[test]
816    // happy case for evaluating response from conductivity sensor
817    pub fn test_i2c_interface_check_response_valid_conductivity() {
818        let stimuli: [u8; atlas_scientific_constants::MESSAGE_SIZE] = [
819            atlas_scientific_constants::MESSAGE_START_MAGIC,
820            '5' as u8,
821            '0' as u8,
822            '0' as u8,
823            '0' as u8,
824            '0' as u8,
825            '0' as u8,
826            atlas_scientific_constants::MESSAGE_FINISH_MAGIC,
827        ];
828        let result = AtlasScientific::check_response(stimuli, false);
829        match result {
830            Ok(c) => {
831                assert_float_absolute_eq!(c, 50000.0, 0.001);
832            }
833            Err(e) => {
834                panic!("test_i2c_interface_check_response_valid_conductivity return error: {e:?}");
835            }
836        }
837    }
838
839    // happy case for evaluating the response from the temperature sensor unit
840    #[test]
841    pub fn test_atlas_scientific_check_response_valid_temperature() {
842        let stimuli: [u8; atlas_scientific_constants::MESSAGE_SIZE] = [
843            atlas_scientific_constants::MESSAGE_START_MAGIC,
844            '2' as u8,
845            '5' as u8,
846            atlas_scientific_constants::DECIMAL_POINT,
847            '0' as u8,
848            '0' as u8,
849            '0' as u8,
850            atlas_scientific_constants::MESSAGE_FINISH_MAGIC,
851        ];
852        let result = AtlasScientific::check_response(stimuli, false);
853        match result {
854            Ok(c) => {
855                assert_float_absolute_eq!(c, 25.0, 0.001);
856            }
857            Err(e) => {
858                panic!(
859                    "test_atlas_scientific_check_response_valid_temperature return error: {e:?}"
860                );
861            }
862        }
863    }
864
865    #[test]
866    // checking if an incorrect magic number in the front of the message is identified correctly
867    pub fn test_atlas_scientific_check_response_invalid_magic_front() {
868        let incorrect_magic_front = atlas_scientific_constants::MESSAGE_START_MAGIC - 1;
869
870        let stimuli: [u8; atlas_scientific_constants::MESSAGE_SIZE] = [
871            incorrect_magic_front,
872            '2' as u8,
873            '5' as u8,
874            atlas_scientific_constants::DECIMAL_POINT,
875            '0' as u8,
876            '0' as u8,
877            '0' as u8,
878            atlas_scientific_constants::MESSAGE_FINISH_MAGIC,
879        ];
880
881        let result = AtlasScientific::check_response(stimuli, false);
882
883        let error_result = result.unwrap_err();
884
885        assert!(matches!(
886            error_result,
887            AtlasScientificError::FirstCharacterNotMagic(
888                _,
889                atlas_scientific_constants::MESSAGE_START_MAGIC,
890                _incorrect_magic_front
891            )
892        ));
893    }
894
895    #[test]
896    // checking if an incorrect magic number in the rear part of the message is identified correctly
897    pub fn test_atlas_scientific_check_response_invalid_magic_rear() {
898        let incorrect_magic_rear = atlas_scientific_constants::MESSAGE_FINISH_MAGIC + 1;
899
900        let stimuli: [u8; atlas_scientific_constants::MESSAGE_SIZE] = [
901            atlas_scientific_constants::MESSAGE_START_MAGIC,
902            '2' as u8,
903            '5' as u8,
904            atlas_scientific_constants::DECIMAL_POINT,
905            '0' as u8,
906            '0' as u8,
907            '0' as u8,
908            incorrect_magic_rear,
909        ];
910
911        let result = AtlasScientific::check_response(stimuli, false);
912
913        let error_result = result.unwrap_err();
914
915        assert!(matches!(
916            error_result,
917            AtlasScientificError::LastCharacterNotMagic(
918                _,
919                atlas_scientific_constants::MESSAGE_FINISH_MAGIC,
920                _incorrect_magic_rear
921            )
922        ));
923    }
924
925    #[test]
926    // checking if an incorrect sequence of numeric characters is identified correctly
927    pub fn test_atlas_scientific_check_response_invalid_conversion() {
928        let stimuli: [u8; atlas_scientific_constants::MESSAGE_SIZE] = [
929            atlas_scientific_constants::MESSAGE_START_MAGIC,
930            '2' as u8,
931            '5' as u8,
932            atlas_scientific_constants::NEGATIVE_SIGN,
933            '0' as u8,
934            '0' as u8,
935            '0' as u8,
936            atlas_scientific_constants::MESSAGE_FINISH_MAGIC,
937        ];
938        assert!(matches!(
939            AtlasScientific::check_response(stimuli, false),
940            Err(AtlasScientificError::ConversionFailure {
941                location: _,
942                source: _
943            })
944        ));
945    }
946
947    #[test]
948    // Test case checks if the check_response function correctly identifies an
949    // invalid non-numeric character (like an alphabet letter) in the data payload.
950    fn test_atlas_scientific_check_response_invalid_character_in_payload() {
951        let stimuli: [u8; atlas_scientific_constants::MESSAGE_SIZE] = [
952            atlas_scientific_constants::MESSAGE_START_MAGIC,
953            b'2',
954            b'5',
955            atlas_scientific_constants::DECIMAL_POINT,
956            b'A', // Invalid character
957            b'0',
958            b'0',
959            atlas_scientific_constants::MESSAGE_FINISH_MAGIC,
960        ];
961        let result = AtlasScientific::check_response(stimuli, false);
962
963        let error_result = result.unwrap_err();
964
965        assert!(matches!(
966            error_result,
967            AtlasScientificError::ContainsInvalidCharacter(_, b'A')
968        ));
969    }
970
971    #[test]
972    // Test the error response of get_response_from_i2c when requesting an invalid signal.
973    pub fn test_atlas_scientific_error_response_to_invalid_signal_request() {
974        let config: ConfigData =
975            read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
976        let mut atlas_scientific = AtlasScientific::new(config.atlas_scientific);
977
978        let mut channels = Channels::new_for_test();
979
980        let result = atlas_scientific.send_request_to_i2c_interface(
981            &AquariumSignal::MockInvalidSignal,
982            &mut channels.atlas_scientific,
983        );
984
985        assert!(matches!(
986            result,
987            Err(AtlasScientificError::InvalidSignal(_, _))
988        ));
989    }
990
991    #[test]
992    // Test function receive_response_from_i2c_interface
993    pub fn test_receive_response_from_i2c_interface() {
994        let mut stimuli_array_temperature = [0u8; 64];
995
996        let message_temperature: [u8; atlas_scientific_constants::MESSAGE_SIZE] = [
997            atlas_scientific_constants::MESSAGE_START_MAGIC,
998            '2' as u8,
999            '3' as u8,
1000            atlas_scientific_constants::DECIMAL_POINT,
1001            '1' as u8,
1002            '0' as u8,
1003            '0' as u8,
1004            atlas_scientific_constants::MESSAGE_FINISH_MAGIC,
1005        ];
1006        stimuli_array_temperature[..message_temperature.len()]
1007            .copy_from_slice(&message_temperature);
1008        let stimuli_i2c_response_temperature = I2cResponse {
1009            read_buf: stimuli_array_temperature,
1010            length: atlas_scientific_constants::MESSAGE_SIZE,
1011        };
1012
1013        let mut stimuli_array_conductivity = [0u8; 64];
1014        let message_conductivity: [u8; atlas_scientific_constants::MESSAGE_SIZE] = [
1015            atlas_scientific_constants::MESSAGE_START_MAGIC,
1016            b'4',
1017            b'8',
1018            b'0',
1019            b'0',
1020            b'0',
1021            b'0',
1022            atlas_scientific_constants::MESSAGE_FINISH_MAGIC,
1023        ];
1024        // Copy the 8-byte message into the beginning of the 64-byte buffer.
1025        // The rest of the buffer will remain filled with zeros.
1026        stimuli_array_conductivity[..message_conductivity.len()]
1027            .copy_from_slice(&message_conductivity);
1028        let stimuli_i2c_response_conductivity = I2cResponse {
1029            read_buf: stimuli_array_conductivity,
1030            length: atlas_scientific_constants::MESSAGE_SIZE,
1031        };
1032
1033        let mut stimuli_array_ph = [0u8; 64];
1034        let message_ph: [u8; atlas_scientific_constants::MESSAGE_SIZE] = [
1035            atlas_scientific_constants::MESSAGE_START_MAGIC,
1036            '7' as u8,
1037            atlas_scientific_constants::DECIMAL_POINT,
1038            '2' as u8,
1039            '0' as u8,
1040            '0' as u8,
1041            '0' as u8,
1042            atlas_scientific_constants::MESSAGE_FINISH_MAGIC,
1043        ];
1044        stimuli_array_ph[..message_ph.len()].copy_from_slice(&message_ph);
1045        let stimuli_i2c_response_ph = I2cResponse {
1046            read_buf: stimuli_array_ph,
1047            length: atlas_scientific_constants::MESSAGE_SIZE,
1048        };
1049
1050        let config: ConfigData =
1051            read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
1052
1053        let mut atlas_scientific = AtlasScientific::new(config.atlas_scientific);
1054
1055        let mut channels = Channels::new_for_test();
1056
1057        // Mutexes for Atlas Scientific signals
1058        let mutex_atlas_scientific_temperature =
1059            Arc::new(Mutex::new(AtlasScientificResultData::new(
1060                config.sensor_manager.replacement_value_water_temperature,
1061            )));
1062
1063        let mutex_atlas_scientific_ph = Arc::new(Mutex::new(AtlasScientificResultData::new(
1064            config.sensor_manager.replacement_value_ph,
1065        )));
1066
1067        let mutex_atlas_scientific_conductivity = Arc::new(Mutex::new(
1068            AtlasScientificResultData::new(config.sensor_manager.replacement_value_conductivity),
1069        ));
1070
1071        // Sending stimuli for water temperature before triggering try_recv from the test object
1072        _ = channels
1073            .i2c_interface
1074            .send_to_atlas_scientific(Ok(stimuli_i2c_response_temperature));
1075
1076        _ = atlas_scientific.receive_response_from_i2c_interface(
1077            &AquariumSignal::WaterTemperature,
1078            &mut channels.atlas_scientific,
1079            &mutex_atlas_scientific_temperature,
1080            &mutex_atlas_scientific_ph,
1081            &mutex_atlas_scientific_conductivity,
1082        );
1083
1084        match mutex_atlas_scientific_temperature.lock() {
1085            Ok(result) => {
1086                let result_data = result.clone();
1087                assert_eq!(result_data.value, 23.1);
1088            }
1089            Err(e) => {
1090                panic!("mutex_atlas_scientific_temperature lock poisoned: {:?}", e);
1091            }
1092        };
1093
1094        // Sending stimuli for pH before triggering try_recv from the test object
1095        _ = channels
1096            .i2c_interface
1097            .send_to_atlas_scientific(Ok(stimuli_i2c_response_ph));
1098
1099        _ = atlas_scientific.receive_response_from_i2c_interface(
1100            &AquariumSignal::pH,
1101            &mut channels.atlas_scientific,
1102            &mutex_atlas_scientific_temperature,
1103            &mutex_atlas_scientific_ph,
1104            &mutex_atlas_scientific_conductivity,
1105        );
1106
1107        match mutex_atlas_scientific_ph.lock() {
1108            Ok(result) => {
1109                let result_data = result.clone();
1110                assert_eq!(result_data.value, 7.2);
1111            }
1112            Err(e) => {
1113                panic!("mutex_atlas_scientific_ph lock poisoned: {:?}", e);
1114            }
1115        };
1116
1117        // Sending stimuli for conductivity before triggering try_recv from the test object
1118        _ = channels
1119            .i2c_interface
1120            .send_to_atlas_scientific(Ok(stimuli_i2c_response_conductivity));
1121
1122        _ = atlas_scientific.receive_response_from_i2c_interface(
1123            &AquariumSignal::Conductivity,
1124            &mut channels.atlas_scientific,
1125            &mutex_atlas_scientific_temperature,
1126            &mutex_atlas_scientific_ph,
1127            &mutex_atlas_scientific_conductivity,
1128        );
1129
1130        match mutex_atlas_scientific_conductivity.lock() {
1131            Ok(result) => {
1132                let result_data = result.clone();
1133                assert_eq!(result_data.value, 48000.0);
1134            }
1135            Err(e) => {
1136                panic!("mutex_atlas_scientific_conductivity lock poisoned: {:?}", e);
1137            }
1138        };
1139    }
1140
1141    #[test]
1142    // Test function receive_response_from_i2c_interface
1143    pub fn test_atlas_scientific_interfaces_without_blocking_mutexes() {
1144        let mut stimuli_array_temperature = [0u8; 64];
1145        let message_temperature: [u8; atlas_scientific_constants::MESSAGE_SIZE] = [
1146            atlas_scientific_constants::MESSAGE_START_MAGIC,
1147            '2' as u8,
1148            '3' as u8,
1149            atlas_scientific_constants::DECIMAL_POINT,
1150            '1' as u8,
1151            '0' as u8,
1152            '0' as u8,
1153            atlas_scientific_constants::MESSAGE_FINISH_MAGIC,
1154        ];
1155        stimuli_array_temperature[..message_temperature.len()]
1156            .copy_from_slice(&message_temperature);
1157        let stimuli_i2c_response_temperature = I2cResponse {
1158            read_buf: stimuli_array_temperature,
1159            length: atlas_scientific_constants::MESSAGE_SIZE,
1160        };
1161
1162        let mut stimuli_array_conductivity = [0u8; 64];
1163        let message_conductivity: [u8; atlas_scientific_constants::MESSAGE_SIZE] = [
1164            atlas_scientific_constants::MESSAGE_START_MAGIC,
1165            '4' as u8,
1166            '8' as u8,
1167            '0' as u8,
1168            '0' as u8,
1169            '0' as u8,
1170            '0' as u8,
1171            atlas_scientific_constants::MESSAGE_FINISH_MAGIC,
1172        ];
1173        stimuli_array_conductivity[..message_conductivity.len()]
1174            .copy_from_slice(&message_conductivity);
1175        let stimuli_i2c_response_conductivity = I2cResponse {
1176            read_buf: stimuli_array_conductivity,
1177            length: atlas_scientific_constants::MESSAGE_SIZE,
1178        };
1179
1180        let mut stimuli_array_ph = [0u8; 64];
1181        let message_ph: [u8; atlas_scientific_constants::MESSAGE_SIZE] = [
1182            atlas_scientific_constants::MESSAGE_START_MAGIC,
1183            '7' as u8,
1184            atlas_scientific_constants::DECIMAL_POINT,
1185            '2' as u8,
1186            '0' as u8,
1187            '0' as u8,
1188            '0' as u8,
1189            atlas_scientific_constants::MESSAGE_FINISH_MAGIC,
1190        ];
1191        stimuli_array_ph[..message_ph.len()].copy_from_slice(&message_ph);
1192        let stimuli_i2c_response_ph = I2cResponse {
1193            read_buf: stimuli_array_ph,
1194            length: atlas_scientific_constants::MESSAGE_SIZE,
1195        };
1196
1197        let config: ConfigData =
1198            read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
1199
1200        let measurement_interval_millis_for_test_environment =
1201            config.atlas_scientific.measurement_interval_millis;
1202
1203        let mut mock_i2c_interface = MockI2cInterface::new(
1204            &config.atlas_scientific,
1205            stimuli_i2c_response_temperature.clone(),
1206            stimuli_i2c_response_ph.clone(),
1207            stimuli_i2c_response_conductivity.clone(),
1208        );
1209
1210        let mut atlas_scientific = AtlasScientific::new(config.atlas_scientific);
1211
1212        let mut channels = Channels::new_for_test();
1213
1214        // Mutexes for Atlas Scientific signals
1215        let mutex_atlas_scientific_temperature =
1216            Arc::new(Mutex::new(AtlasScientificResultData::new(
1217                config.sensor_manager.replacement_value_water_temperature,
1218            )));
1219        let mutex_atlas_scientific_temperature_clone_for_asserts =
1220            mutex_atlas_scientific_temperature.clone();
1221
1222        let mutex_atlas_scientific_ph = Arc::new(Mutex::new(AtlasScientificResultData::new(
1223            config.sensor_manager.replacement_value_ph,
1224        )));
1225        let mutex_atlas_scientific_ph_clone_for_asserts = mutex_atlas_scientific_ph.clone();
1226
1227        let mutex_atlas_scientific_conductivity = Arc::new(Mutex::new(
1228            AtlasScientificResultData::new(config.sensor_manager.replacement_value_conductivity),
1229        ));
1230        let mutex_atlas_scientific_conductivity_clone_for_asserts =
1231            mutex_atlas_scientific_conductivity.clone();
1232
1233        // thread for test environment (incl. signal handler)
1234        let join_handle_test_environment = thread::Builder::new()
1235            .name("test_environment".to_string())
1236            .spawn(move || {
1237                // let test run a bit longer than the measurement interval
1238                let sleep_time = Duration::from_millis(
1239                    measurement_interval_millis_for_test_environment * 3 + 1000,
1240                );
1241                let spin_sleeper = SpinSleeper::default();
1242                spin_sleeper.sleep(sleep_time);
1243                channels
1244                    .signal_handler
1245                    .send_to_atlas_scientific(InternalCommand::Quit)
1246                    .unwrap();
1247                channels
1248                    .signal_handler
1249                    .send_to_i2c_interface(InternalCommand::Quit)
1250                    .unwrap();
1251                channels
1252                    .signal_handler
1253                    .send_to_atlas_scientific(InternalCommand::Terminate)
1254                    .unwrap();
1255            })
1256            .unwrap();
1257
1258        // thread for mock i2c interface
1259        let join_handle_mock_i2c_interface = thread::Builder::new()
1260            .name("i2c_interface".to_string())
1261            .spawn(move || {
1262                mock_i2c_interface.execute(channels.i2c_interface);
1263            })
1264            .unwrap();
1265
1266        // thread for the test object
1267        let join_handle_test_object = thread::Builder::new()
1268            .name("test_object".to_string())
1269            .spawn(move || {
1270                atlas_scientific.execute(
1271                    &mut channels.atlas_scientific,
1272                    mutex_atlas_scientific_temperature,
1273                    mutex_atlas_scientific_ph,
1274                    mutex_atlas_scientific_conductivity,
1275                );
1276            })
1277            .unwrap();
1278
1279        join_handle_test_object
1280            .join()
1281            .expect("Test object did not finish.");
1282        join_handle_mock_i2c_interface
1283            .join()
1284            .expect("Mock i2c interface did not finish.");
1285        join_handle_test_environment
1286            .join()
1287            .expect("Test environment did not finish.");
1288
1289        match mutex_atlas_scientific_temperature_clone_for_asserts.lock() {
1290            Ok(result) => {
1291                let result_data = result.clone();
1292                assert_eq!(result_data.value, 23.1);
1293            }
1294            Err(e) => {
1295                panic!("mutex_atlas_scientific_temperature lock poisoned: {:?}", e);
1296            }
1297        };
1298
1299        match mutex_atlas_scientific_ph_clone_for_asserts.lock() {
1300            Ok(result) => {
1301                let result_data = result.clone();
1302                assert_eq!(result_data.value, 7.2);
1303            }
1304            Err(e) => {
1305                panic!("mutex_atlas_scientific_ph lock poisoned: {:?}", e);
1306            }
1307        };
1308
1309        match mutex_atlas_scientific_conductivity_clone_for_asserts.lock() {
1310            Ok(result) => {
1311                let result_data = result.clone();
1312                assert_eq!(result_data.value, 48000.0);
1313            }
1314            Err(e) => {
1315                panic!("mutex_atlas_scientific_conductivity lock poisoned: {:?}", e);
1316            }
1317        };
1318    }
1319
1320    #[test]
1321    // Test function receive_response_from_i2c_interface
1322    pub fn test_atlas_scientific_interfaces_with_blocking_mutexes() {
1323        let mut stimuli_array_temperature = [0u8; 64];
1324        let message_temperature: [u8; atlas_scientific_constants::MESSAGE_SIZE] = [
1325            atlas_scientific_constants::MESSAGE_START_MAGIC,
1326            '2' as u8,
1327            '3' as u8,
1328            atlas_scientific_constants::DECIMAL_POINT,
1329            '1' as u8,
1330            '0' as u8,
1331            '0' as u8,
1332            atlas_scientific_constants::MESSAGE_FINISH_MAGIC,
1333        ];
1334        stimuli_array_temperature[..message_temperature.len()]
1335            .copy_from_slice(&message_temperature);
1336        let stimuli_i2c_response_temperature = I2cResponse {
1337            read_buf: stimuli_array_temperature,
1338            length: atlas_scientific_constants::MESSAGE_SIZE,
1339        };
1340
1341        let mut stimuli_array_conductivity = [0u8; 64];
1342        let message_conductivity: [u8; atlas_scientific_constants::MESSAGE_SIZE] = [
1343            atlas_scientific_constants::MESSAGE_START_MAGIC,
1344            '4' as u8,
1345            '8' as u8,
1346            '0' as u8,
1347            '0' as u8,
1348            '0' as u8,
1349            '0' as u8,
1350            atlas_scientific_constants::MESSAGE_FINISH_MAGIC,
1351        ];
1352        stimuli_array_conductivity[..message_conductivity.len()]
1353            .copy_from_slice(&message_conductivity);
1354        let stimuli_i2c_response_conductivity = I2cResponse {
1355            read_buf: stimuli_array_conductivity,
1356            length: atlas_scientific_constants::MESSAGE_SIZE,
1357        };
1358
1359        let mut stimuli_array_ph = [0u8; 64];
1360        let message_ph: [u8; atlas_scientific_constants::MESSAGE_SIZE] = [
1361            atlas_scientific_constants::MESSAGE_START_MAGIC,
1362            '7' as u8,
1363            atlas_scientific_constants::DECIMAL_POINT,
1364            '2' as u8,
1365            '0' as u8,
1366            '0' as u8,
1367            '0' as u8,
1368            atlas_scientific_constants::MESSAGE_FINISH_MAGIC,
1369        ];
1370        stimuli_array_ph[..message_ph.len()].copy_from_slice(&message_ph);
1371        let stimuli_i2c_response_ph = I2cResponse {
1372            read_buf: stimuli_array_ph,
1373            length: atlas_scientific_constants::MESSAGE_SIZE,
1374        };
1375
1376        let config: ConfigData =
1377            read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
1378
1379        let measurement_interval_millis_for_test_environment =
1380            config.atlas_scientific.measurement_interval_millis;
1381
1382        let mut mock_i2c_interface = MockI2cInterface::new(
1383            &config.atlas_scientific,
1384            stimuli_i2c_response_temperature.clone(),
1385            stimuli_i2c_response_ph.clone(),
1386            stimuli_i2c_response_conductivity.clone(),
1387        );
1388
1389        let mut atlas_scientific = AtlasScientific::new(config.atlas_scientific);
1390
1391        let mut channels = Channels::new_for_test();
1392
1393        // Mutexes for Atlas Scientific signals
1394        let mutex_atlas_scientific_temperature =
1395            Arc::new(Mutex::new(AtlasScientificResultData::new(
1396                config.sensor_manager.replacement_value_water_temperature,
1397            )));
1398        let mutex_atlas_scientific_temperature_clone_for_asserts =
1399            mutex_atlas_scientific_temperature.clone();
1400        let mutex_atlas_scientific_temperature_clone_for_test_environment =
1401            mutex_atlas_scientific_temperature.clone();
1402
1403        let mutex_atlas_scientific_ph = Arc::new(Mutex::new(AtlasScientificResultData::new(
1404            config.sensor_manager.replacement_value_ph,
1405        )));
1406        let mutex_atlas_scientific_ph_clone_for_asserts = mutex_atlas_scientific_ph.clone();
1407        let mutex_atlas_scientific_ph_clone_for_test_environment =
1408            mutex_atlas_scientific_ph.clone();
1409
1410        let mutex_atlas_scientific_conductivity = Arc::new(Mutex::new(
1411            AtlasScientificResultData::new(config.sensor_manager.replacement_value_conductivity),
1412        ));
1413        let mutex_atlas_scientific_conductivity_clone_for_asserts =
1414            mutex_atlas_scientific_conductivity.clone();
1415        let mutex_atlas_scientific_conductivity_clone_for_test_environment =
1416            mutex_atlas_scientific_conductivity.clone();
1417
1418        // thread for test environment (incl. signal handler)
1419        let join_handle_test_environment = thread::Builder::new()
1420            .name("test_environment".to_string())
1421            .spawn(move || {
1422                let sleep_time =
1423                    Duration::from_millis(measurement_interval_millis_for_test_environment + 1000);
1424                let spin_sleeper = SpinSleeper::default();
1425
1426                // lock the mutexes and wait for triggering the detection
1427                match mutex_atlas_scientific_temperature_clone_for_test_environment.lock() {
1428                    Ok(_) => {
1429                        spin_sleeper.sleep(sleep_time);
1430                    }
1431                    Err(e) => {
1432                        panic!("mutex_atlas_scientific_temperature lock poisoned: {:?}", e);
1433                    }
1434                };
1435                match mutex_atlas_scientific_ph_clone_for_test_environment.lock() {
1436                    Ok(_) => {
1437                        spin_sleeper.sleep(sleep_time);
1438                    }
1439                    Err(e) => {
1440                        panic!("mutex_atlas_scientific_ph lock poisoned: {:?}", e);
1441                    }
1442                };
1443                match mutex_atlas_scientific_conductivity_clone_for_test_environment.lock() {
1444                    Ok(_) => {
1445                        spin_sleeper.sleep(sleep_time);
1446                    }
1447                    Err(e) => {
1448                        panic!("mutex_atlas_scientific_conductivity lock poisoned: {:?}", e);
1449                    }
1450                };
1451
1452                channels
1453                    .signal_handler
1454                    .send_to_atlas_scientific(InternalCommand::Quit)
1455                    .unwrap();
1456                channels
1457                    .signal_handler
1458                    .send_to_i2c_interface(InternalCommand::Quit)
1459                    .unwrap();
1460                channels
1461                    .signal_handler
1462                    .send_to_atlas_scientific(InternalCommand::Terminate)
1463                    .unwrap();
1464            })
1465            .unwrap();
1466
1467        // thread for mock i2c interface
1468        let join_handle_mock_i2c_interface = thread::Builder::new()
1469            .name("i2c_interface".to_string())
1470            .spawn(move || {
1471                mock_i2c_interface.execute(channels.i2c_interface);
1472            })
1473            .unwrap();
1474
1475        // thread for the test object
1476        let join_handle_test_object = thread::Builder::new()
1477            .name("test_object".to_string())
1478            .spawn(move || {
1479                atlas_scientific.execute(
1480                    &mut channels.atlas_scientific,
1481                    mutex_atlas_scientific_temperature,
1482                    mutex_atlas_scientific_ph,
1483                    mutex_atlas_scientific_conductivity,
1484                );
1485                assert_eq!(
1486                    atlas_scientific.mutex_temperature_access_duration_exceeded,
1487                    true
1488                );
1489                assert_eq!(atlas_scientific.mutex_ph_access_duration_exceeded, true);
1490                assert_eq!(
1491                    atlas_scientific.mutex_conductivity_access_duration_exceeded,
1492                    true
1493                );
1494            })
1495            .unwrap();
1496
1497        join_handle_test_object
1498            .join()
1499            .expect("Test object did not finish.");
1500        join_handle_mock_i2c_interface
1501            .join()
1502            .expect("Mock i2c interface did not finish.");
1503        join_handle_test_environment
1504            .join()
1505            .expect("Test environment did not finish.");
1506
1507        match mutex_atlas_scientific_temperature_clone_for_asserts.lock() {
1508            Ok(result) => {
1509                let result_data = result.clone();
1510                assert_eq!(result_data.value, 23.1);
1511            }
1512            Err(e) => {
1513                panic!("mutex_atlas_scientific_temperature lock poisoned: {:?}", e);
1514            }
1515        };
1516
1517        match mutex_atlas_scientific_ph_clone_for_asserts.lock() {
1518            Ok(result) => {
1519                let result_data = result.clone();
1520                assert_eq!(result_data.value, 7.2);
1521            }
1522            Err(e) => {
1523                panic!("mutex_atlas_scientific_ph lock poisoned: {:?}", e);
1524            }
1525        };
1526
1527        match mutex_atlas_scientific_conductivity_clone_for_asserts.lock() {
1528            Ok(result) => {
1529                let result_data = result.clone();
1530                assert_eq!(result_data.value, 48000.0);
1531            }
1532            Err(e) => {
1533                panic!("mutex_atlas_scientific_conductivity lock poisoned: {:?}", e);
1534            }
1535        };
1536    }
1537
1538    #[test]
1539    // Test case checks if the is_numeric function correctly identifies valid digits.
1540    fn test_is_numeric_with_valid_digits() {
1541        // Test with '0'
1542        assert!(AtlasScientific::is_numeric(b'0', false));
1543        assert!(AtlasScientific::is_numeric(b'0', true));
1544
1545        // Test with '5'
1546        assert!(AtlasScientific::is_numeric(b'5', false));
1547        assert!(AtlasScientific::is_numeric(b'5', true));
1548
1549        // Test with '9'
1550        assert!(AtlasScientific::is_numeric(b'9', false));
1551        assert!(AtlasScientific::is_numeric(b'9', true));
1552    }
1553
1554    #[test]
1555    // Test case checks if the is_numeric function correctly identifies the decimal point.
1556    fn test_is_numeric_with_decimal_point() {
1557        assert!(AtlasScientific::is_numeric(b'.', false));
1558        assert!(AtlasScientific::is_numeric(b'.', true));
1559    }
1560
1561    #[test]
1562    // Test case checks if the is_numeric function correctly handles the negative sign
1563    // when negative values are allowed.
1564    fn test_is_numeric_with_negative_sign_allowed() {
1565        assert!(AtlasScientific::is_numeric(b'-', true));
1566    }
1567
1568    #[test]
1569    // Test case checks if the is_numeric function correctly handles the negative sign
1570    // when negative values are not allowed (e.g., for pH values).
1571    fn test_is_numeric_with_negative_sign_not_allowed() {
1572        assert!(!AtlasScientific::is_numeric(b'-', false));
1573    }
1574
1575    #[test]
1576    // Test case checks if the is_numeric function correctly identifies various
1577    // invalid characters that are not part of a number.
1578    fn test_is_numeric_with_invalid_characters() {
1579        // Test with an alphabet character
1580        assert!(!AtlasScientific::is_numeric(b'a', false));
1581        assert!(!AtlasScientific::is_numeric(b'X', true));
1582
1583        // Test with a special symbol
1584        assert!(!AtlasScientific::is_numeric(b'?', false));
1585        assert!(!AtlasScientific::is_numeric(b'#', true));
1586
1587        // Test with a space
1588        assert!(!AtlasScientific::is_numeric(b' ', false));
1589        assert!(!AtlasScientific::is_numeric(b' ', true));
1590
1591        // Test with a newline character
1592        assert!(!AtlasScientific::is_numeric(b'\n', false));
1593        assert!(!AtlasScientific::is_numeric(b'\n', true));
1594    }
1595
1596    #[test]
1597    // Test case checks if send_request_to_i2c_interface correctly sends a request for pH.
1598    fn test_send_request_to_i2c_interface_ph_success() {
1599        let config: ConfigData =
1600            read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
1601        let sleep_time_millis_ph = config.atlas_scientific.sleep_time_millis_ph;
1602        let address_atlas_scientific_ph = config.atlas_scientific.address_atlas_scientific_ph;
1603        let mut atlas_scientific = AtlasScientific::new(config.atlas_scientific);
1604
1605        let (tx_atlas_scientific_to_i2c_interface, mut rx_i2c_interface_from_atlas_scientific): (
1606            AquaSender<I2cRequest>,
1607            AquaReceiver<I2cRequest>,
1608        ) = channel(1);
1609        let (_tx_i2c_interface_to_atlas_scientific, rx_atlas_scientific_from_i2c_interface): (
1610            AquaSender<I2cResult>,
1611            AquaReceiver<I2cResult>,
1612        ) = channel(1);
1613        let (tx_atlas_scientific_to_signal_handler, _rx_signal_handler_from_atlas_scientific): (
1614            AquaSender<bool>,
1615            AquaReceiver<bool>,
1616        ) = channel(1);
1617        let (_tx_signal_handler_to_atlas_scientific, rx_atlas_scientific_from_signal_handler): (
1618            AquaSender<InternalCommand>,
1619            AquaReceiver<InternalCommand>,
1620        ) = channel(1);
1621
1622        let mut atlas_scientific_channels = AtlasScientificChannels {
1623            tx_atlas_scientific_to_i2c_interface,
1624            #[cfg(feature = "debug_channels")]
1625            cnt_tx_atlas_scientific_to_i2c_interface: 0,
1626
1627            rx_atlas_scientific_from_i2c_interface,
1628            #[cfg(feature = "debug_channels")]
1629            cnt_rx_atlas_scientific_from_i2c_interface: 0,
1630
1631            tx_atlas_scientific_to_signal_handler,
1632            #[cfg(feature = "debug_channels")]
1633            cnt_tx_atlas_scientific_to_signal_handler: 0,
1634
1635            rx_atlas_scientific_from_signal_handler,
1636            #[cfg(feature = "debug_channels")]
1637            cnt_rx_atlas_scientific_from_signal_handler: 0,
1638        };
1639
1640        let result = atlas_scientific
1641            .send_request_to_i2c_interface(&AquariumSignal::pH, &mut atlas_scientific_channels);
1642
1643        // Assert that the sending was successful
1644        assert!(result.is_ok());
1645
1646        // Assert that a message was received and it's the correct one
1647        let received_request = rx_i2c_interface_from_atlas_scientific.try_recv().unwrap();
1648        assert_eq!(received_request.i2c_address, address_atlas_scientific_ph);
1649        assert_eq!(received_request.sleep_time_millis, sleep_time_millis_ph);
1650        assert_eq!(
1651            received_request.send_buf[0],
1652            [atlas_scientific_constants::COMMAND_READ][0]
1653        );
1654        assert_eq!(received_request.send_len, 1);
1655        assert_eq!(
1656            received_request.expected_response_length,
1657            atlas_scientific_constants::MESSAGE_SIZE
1658        );
1659    }
1660
1661    #[test]
1662    // Test case checks if send_request_to_i2c_interface correctly sends a request for conductivity.
1663    fn test_send_request_to_i2c_interface_conductivity_success() {
1664        let config: ConfigData =
1665            read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
1666        let sleep_time_millis_conductivity = config.atlas_scientific.sleep_time_millis_conductivity;
1667        let address_atlas_scientific_conductivity = config
1668            .atlas_scientific
1669            .address_atlas_scientific_conductivity;
1670        let mut atlas_scientific = AtlasScientific::new(config.atlas_scientific);
1671
1672        let mut channels = Channels::new_for_test();
1673
1674        let result = atlas_scientific.send_request_to_i2c_interface(
1675            &AquariumSignal::Conductivity,
1676            &mut channels.atlas_scientific,
1677        );
1678
1679        // Assert that the sending was successful
1680        assert!(result.is_ok());
1681
1682        // Assert that a message was received and it's the correct one
1683        let received_request = channels
1684            .i2c_interface
1685            .rx_i2c_interface_from_atlas_scientific
1686            .try_recv()
1687            .unwrap();
1688        assert_eq!(
1689            received_request.i2c_address,
1690            address_atlas_scientific_conductivity
1691        );
1692        assert_eq!(
1693            received_request.sleep_time_millis,
1694            sleep_time_millis_conductivity
1695        );
1696        assert_eq!(
1697            received_request.send_buf[0],
1698            [atlas_scientific_constants::COMMAND_READ][0]
1699        );
1700        assert_eq!(
1701            received_request.expected_response_length,
1702            atlas_scientific_constants::MESSAGE_SIZE
1703        );
1704    }
1705
1706    #[test]
1707    // Test case checks if send_request_to_i2c_interface correctly sends a request for water temperature.
1708    fn test_send_request_to_i2c_interface_temperature_success() {
1709        let config: ConfigData =
1710            read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
1711        let sleep_time_millis_temperature = config.atlas_scientific.sleep_time_millis_temperature;
1712        let address_atlas_scientific_temperature =
1713            config.atlas_scientific.address_atlas_scientific_temperature;
1714        let mut atlas_scientific = AtlasScientific::new(config.atlas_scientific);
1715
1716        let mut channels = Channels::new_for_test();
1717
1718        let result = atlas_scientific.send_request_to_i2c_interface(
1719            &AquariumSignal::WaterTemperature,
1720            &mut channels.atlas_scientific,
1721        );
1722
1723        // Assert that the sending was successful
1724        assert!(result.is_ok());
1725
1726        // Assert that a message was received and it's the correct one
1727        let received_request = channels
1728            .i2c_interface
1729            .rx_i2c_interface_from_atlas_scientific
1730            .try_recv()
1731            .unwrap();
1732        assert_eq!(
1733            received_request.i2c_address,
1734            address_atlas_scientific_temperature
1735        );
1736        assert_eq!(
1737            received_request.sleep_time_millis,
1738            sleep_time_millis_temperature
1739        );
1740        assert_eq!(
1741            received_request.send_buf[0],
1742            [atlas_scientific_constants::COMMAND_READ][0]
1743        );
1744        assert_eq!(
1745            received_request.expected_response_length,
1746            atlas_scientific_constants::MESSAGE_SIZE
1747        );
1748    }
1749
1750    #[test]
1751    // Test case checks that send_request_to_i2c_interface handles an invalid signal gracefully.
1752    fn test_send_request_to_i2c_interface_invalid_signal() {
1753        let config: ConfigData =
1754            read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
1755        let mut atlas_scientific = AtlasScientific::new(config.atlas_scientific);
1756
1757        let mut channels = Channels::new_for_test();
1758
1759        // Use a signal not handled by the function's match arms
1760        let result = atlas_scientific.send_request_to_i2c_interface(
1761            &AquariumSignal::MockInvalidSignal,
1762            &mut channels.atlas_scientific,
1763        );
1764
1765        // Assert that the function returns false, indicating failure
1766        assert!(result.is_err());
1767
1768        // Assert that no message was sent over the channel
1769        assert!(channels
1770            .i2c_interface
1771            .rx_i2c_interface_from_atlas_scientific
1772            .try_recv()
1773            .is_err());
1774    }
1775
1776    #[test]
1777    // Test case checks that send_request_to_i2c_interface handles a disconnected channel.
1778    fn test_send_request_to_i2c_interface_channel_disconnected() {
1779        let config: ConfigData =
1780            read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
1781        let mut atlas_scientific = AtlasScientific::new(config.atlas_scientific);
1782
1783        let mut channels = Channels::new_for_test();
1784
1785        // Drop the receiver to simulate a disconnected channel
1786        drop(
1787            channels
1788                .i2c_interface
1789                .rx_i2c_interface_from_atlas_scientific,
1790        );
1791
1792        // Attempt to send a request
1793        let result = atlas_scientific
1794            .send_request_to_i2c_interface(&AquariumSignal::pH, &mut channels.atlas_scientific);
1795
1796        // Assert that the function returns false, indicating failure
1797        assert!(result.is_err());
1798    }
1799
1800    #[test]
1801    // Test case checks if write_result_to_mutex correctly writes a successful
1802    // temperature result to the correct mutex.
1803    fn test_write_result_to_mutex_temperature_success() {
1804        let config: ConfigData =
1805            read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
1806        let atlas_scientific = AtlasScientific::new(config.atlas_scientific);
1807
1808        let initial_temp = AtlasScientificResultData::new(0.0);
1809        let initial_ph = AtlasScientificResultData::new(0.0);
1810        let initial_cond = AtlasScientificResultData::new(0.0);
1811
1812        let mutex_temp = Arc::new(Mutex::new(initial_temp.clone()));
1813        let mutex_ph = Arc::new(Mutex::new(initial_ph.clone()));
1814        let mutex_cond = Arc::new(Mutex::new(initial_cond.clone()));
1815
1816        let new_temp_value = 25.5;
1817        let new_temp_result = AtlasScientificResultData::new(new_temp_value);
1818
1819        let before = Instant::now();
1820        let result = atlas_scientific.write_result_to_mutex(
1821            &AquariumSignal::WaterTemperature,
1822            new_temp_result.clone(),
1823            &mutex_temp,
1824            &mutex_ph,
1825            &mutex_cond,
1826        );
1827        let after = Instant::now();
1828
1829        assert!(result.is_ok());
1830        let result_instant = result.unwrap();
1831        assert!(result_instant >= before);
1832        assert!(result_instant <= after);
1833
1834        // Check that the temperature mutex was updated
1835        let temp_lock = mutex_temp.lock().unwrap();
1836        assert_eq!(temp_lock.value, new_temp_value);
1837
1838        // Check that other mutexes were not changed
1839        let ph_lock = mutex_ph.lock().unwrap();
1840        assert_eq!(ph_lock.value, initial_ph.value);
1841        let cond_lock = mutex_cond.lock().unwrap();
1842        assert_eq!(cond_lock.value, initial_cond.value);
1843    }
1844
1845    #[test]
1846    // Test case checks if write_result_to_mutex correctly writes a successful
1847    // pH result to the correct mutex.
1848    fn test_write_result_to_mutex_ph_success() {
1849        let config: ConfigData =
1850            read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
1851        let atlas_scientific = AtlasScientific::new(config.atlas_scientific);
1852
1853        let mutex_temp = Arc::new(Mutex::new(AtlasScientificResultData::new(0.0)));
1854        let mutex_ph = Arc::new(Mutex::new(AtlasScientificResultData::new(0.0)));
1855        let mutex_cond = Arc::new(Mutex::new(AtlasScientificResultData::new(0.0)));
1856
1857        let new_ph_value = 8.2;
1858        let new_ph_result = AtlasScientificResultData::new(new_ph_value);
1859
1860        let before = Instant::now();
1861        let result = atlas_scientific.write_result_to_mutex(
1862            &AquariumSignal::pH,
1863            new_ph_result.clone(),
1864            &mutex_temp,
1865            &mutex_ph,
1866            &mutex_cond,
1867        );
1868        let after = Instant::now();
1869
1870        assert!(result.is_ok());
1871        let result_instant = result.unwrap();
1872        assert!(result_instant >= before);
1873        assert!(result_instant <= after);
1874
1875        // Check that the pH mutex was updated
1876        let ph_lock = mutex_ph.lock().unwrap();
1877        assert_eq!(ph_lock.value, new_ph_value);
1878    }
1879
1880    #[test]
1881    // Test case checks if write_result_to_mutex correctly writes a successful
1882    // conductivity result to the correct mutex.
1883    fn test_write_result_to_mutex_conductivity_success() {
1884        let config: ConfigData =
1885            read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
1886        let atlas_scientific = AtlasScientific::new(config.atlas_scientific);
1887
1888        let mutex_temp = Arc::new(Mutex::new(AtlasScientificResultData::new(0.0)));
1889        let mutex_ph = Arc::new(Mutex::new(AtlasScientificResultData::new(0.0)));
1890        let mutex_cond = Arc::new(Mutex::new(AtlasScientificResultData::new(0.0)));
1891
1892        let new_cond_value = 50000.0;
1893        let new_cond_result = AtlasScientificResultData::new(new_cond_value);
1894
1895        let before = Instant::now();
1896        let result = atlas_scientific.write_result_to_mutex(
1897            &AquariumSignal::Conductivity,
1898            new_cond_result.clone(),
1899            &mutex_temp,
1900            &mutex_ph,
1901            &mutex_cond,
1902        );
1903        let after = Instant::now();
1904
1905        assert!(result.is_ok());
1906        let result_instant = result.unwrap();
1907        assert!(result_instant >= before);
1908        assert!(result_instant <= after);
1909
1910        // Check that the conductivity mutex was updated
1911        let cond_lock = mutex_cond.lock().unwrap();
1912        assert_eq!(cond_lock.value, new_cond_value);
1913    }
1914
1915    #[test]
1916    // Test case checks that write_result_to_mutex handles an invalid signal
1917    // by doing nothing and not changing any mutex values.
1918    fn test_write_result_to_mutex_invalid_signal() {
1919        let config: ConfigData =
1920            read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
1921        let atlas_scientific = AtlasScientific::new(config.atlas_scientific);
1922
1923        let initial_temp = AtlasScientificResultData::new(1.0);
1924        let initial_ph = AtlasScientificResultData::new(2.0);
1925        let initial_cond = AtlasScientificResultData::new(3.0);
1926
1927        let mutex_temp = Arc::new(Mutex::new(initial_temp.clone()));
1928        let mutex_ph = Arc::new(Mutex::new(initial_ph.clone()));
1929        let mutex_cond = Arc::new(Mutex::new(initial_cond.clone()));
1930
1931        let new_result = AtlasScientificResultData::new(99.0);
1932
1933        let before = Instant::now();
1934        let result = atlas_scientific.write_result_to_mutex(
1935            &AquariumSignal::MockInvalidSignal,
1936            new_result,
1937            &mutex_temp,
1938            &mutex_ph,
1939            &mutex_cond,
1940        );
1941        let after = Instant::now();
1942
1943        assert!(result.is_ok());
1944        let result_instant = result.unwrap();
1945        assert!(result_instant >= before);
1946        assert!(result_instant <= after);
1947
1948        // Check that no mutex values were changed
1949        let temp_lock = mutex_temp.lock().unwrap();
1950        assert_eq!(temp_lock.value, initial_temp.value);
1951        let ph_lock = mutex_ph.lock().unwrap();
1952        assert_eq!(ph_lock.value, initial_ph.value);
1953        let cond_lock = mutex_cond.lock().unwrap();
1954        assert_eq!(cond_lock.value, initial_cond.value);
1955    }
1956
1957    #[test]
1958    // Test case checks that write_result_to_mutex correctly returns an error
1959    // when trying to write to a poisoned mutex.
1960    fn test_write_result_to_mutex_poisoned() {
1961        let config: ConfigData =
1962            read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
1963        let atlas_scientific = AtlasScientific::new(config.atlas_scientific);
1964
1965        let mutex_temp = Arc::new(Mutex::new(AtlasScientificResultData::new(0.0)));
1966        let mutex_ph = Arc::new(Mutex::new(AtlasScientificResultData::new(0.0)));
1967        let mutex_cond = Arc::new(Mutex::new(AtlasScientificResultData::new(0.0)));
1968
1969        // Poison the pH mutex by panicking while holding the lock in another thread
1970        let mutex_ph_clone = mutex_ph.clone();
1971        let handle = thread::spawn(move || {
1972            let _lock = mutex_ph_clone.lock().unwrap();
1973            panic!("Poisoning the mutex for the test");
1974        });
1975        assert!(handle.join().is_err()); // The join will return an Err because the thread panicked
1976
1977        let new_result = AtlasScientificResultData::new(8.0);
1978
1979        // Attempt to write to the now-poisoned mutex
1980        let result = atlas_scientific.write_result_to_mutex(
1981            &AquariumSignal::pH,
1982            new_result,
1983            &mutex_temp,
1984            &mutex_ph,
1985            &mutex_cond,
1986        );
1987
1988        // Assert that the correct error is returned
1989        assert!(result.is_err());
1990        assert!(matches!(
1991            result,
1992            Err(AtlasScientificError::CouldNotLockMutex { .. })
1993        ));
1994        if let Err(AtlasScientificError::CouldNotLockMutex {
1995            location: _,
1996            signal,
1997        }) = result
1998        {
1999            assert_eq!(signal, AquariumSignal::pH);
2000        }
2001    }
2002
2003    /// Helper function to set up a standard test environment for receive tests.
2004    fn setup_receive_test() -> (
2005        AtlasScientific,
2006        AtlasScientificChannels,
2007        AquaSender<I2cResult>,
2008        Arc<Mutex<AtlasScientificResultData>>,
2009        Arc<Mutex<AtlasScientificResultData>>,
2010        Arc<Mutex<AtlasScientificResultData>>,
2011    ) {
2012        let config: ConfigData =
2013            read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
2014        let atlas_scientific = AtlasScientific::new(config.atlas_scientific);
2015
2016        let channels = Channels::new_for_test();
2017
2018        // Initialize mutexes with a known default value (0.0)
2019        let mutex_temp = Arc::new(Mutex::new(AtlasScientificResultData::new(0.0)));
2020        let mutex_ph = Arc::new(Mutex::new(AtlasScientificResultData::new(0.0)));
2021        let mutex_cond = Arc::new(Mutex::new(AtlasScientificResultData::new(0.0)));
2022
2023        (
2024            atlas_scientific,
2025            channels.atlas_scientific,
2026            channels.i2c_interface.tx_i2c_interface_to_atlas_scientific,
2027            mutex_temp,
2028            mutex_ph,
2029            mutex_cond,
2030        )
2031    }
2032
2033    /// Helper function to run a successful receive-and-parse test for a given signal.
2034    fn run_receive_success_test(signal: AquariumSignal, value: f32) {
2035        let (
2036            mut atlas_scientific,
2037            mut atlas_scientific_channels,
2038            mut tx_i2c_interface_to_atlas_scientific,
2039            mutex_temp,
2040            mutex_ph,
2041            mutex_cond,
2042        ) = setup_receive_test();
2043
2044        // Create a valid I2C response buffer.
2045        let mut value_str = value.to_string();
2046        if value_str.len() > 5 {
2047            value_str.truncate(5);
2048        }
2049        while value_str.len() < 6 {
2050            value_str.push('0');
2051        }
2052
2053        let mut buffer = [0u8; 8];
2054        buffer[0] = atlas_scientific_constants::MESSAGE_START_MAGIC;
2055        buffer[1..7].copy_from_slice(value_str.as_bytes());
2056        buffer[7] = atlas_scientific_constants::MESSAGE_FINISH_MAGIC;
2057
2058        let i2c_response = I2cResponse::new(&buffer);
2059
2060        // Send the valid response
2061        tx_i2c_interface_to_atlas_scientific
2062            .send(Ok(i2c_response))
2063            .unwrap();
2064
2065        // Execute the function under test
2066        let result = atlas_scientific.receive_response_from_i2c_interface(
2067            &signal,
2068            &mut atlas_scientific_channels,
2069            &mutex_temp,
2070            &mutex_ph,
2071            &mutex_cond,
2072        );
2073
2074        // Assert the outcome
2075        assert!(
2076            result.is_ok(),
2077            "Test failed for signal {:?} with value {}. Error: {:?}",
2078            signal,
2079            value,
2080            result.err()
2081        );
2082        assert_float_absolute_eq!(result.unwrap().value, value, 0.001);
2083
2084        // Check that the correct mutex was updated and others were not
2085        let (target_mutex, other1, other2) = match signal {
2086            AquariumSignal::WaterTemperature => (&mutex_temp, &mutex_ph, &mutex_cond),
2087            AquariumSignal::pH => (&mutex_ph, &mutex_temp, &mutex_cond),
2088            AquariumSignal::Conductivity => (&mutex_cond, &mutex_temp, &mutex_ph),
2089            _ => panic!("Invalid signal for this test"),
2090        };
2091
2092        let lock = target_mutex.lock().unwrap();
2093        assert_float_absolute_eq!(lock.value, value, 0.001);
2094
2095        let lock1 = other1.lock().unwrap();
2096        assert_float_absolute_eq!(lock1.value, 0.0, 0.001);
2097        let lock2 = other2.lock().unwrap();
2098        assert_float_absolute_eq!(lock2.value, 0.0, 0.001);
2099    }
2100
2101    #[test]
2102    /// Tests the happy path for all three Atlas Scientific signals.
2103    fn test_receive_response_success_all_signals() {
2104        run_receive_success_test(AquariumSignal::WaterTemperature, 25.5);
2105        run_receive_success_test(AquariumSignal::pH, 8.2);
2106        run_receive_success_test(AquariumSignal::Conductivity, 51000.0);
2107    }
2108
2109    #[test]
2110    /// Tests that the function correctly handles an I2C communication failure from the channel.
2111    fn test_receive_response_i2c_failure() {
2112        let (
2113            mut atlas_scientific,
2114            mut atlas_scientific_channels,
2115            mut tx_i2c_interface_to_atlas_scientific,
2116            mutex_temp,
2117            mutex_ph,
2118            mutex_cond,
2119        ) = setup_receive_test();
2120
2121        // Send an I2C error over the channel.
2122        #[cfg(all(target_os = "linux", feature = "target_hw"))]
2123        let simulated_io_error = io::Error::new(
2124            io::ErrorKind::PermissionDenied,
2125            "Simulated hardware permission error",
2126        );
2127
2128        #[cfg(all(target_os = "linux", feature = "target_hw"))]
2129        let mock_rppal_error = rppal::i2c::Error::Io(simulated_io_error);
2130
2131        #[cfg(all(target_os = "linux", feature = "target_hw"))]
2132        let mock_error = I2cError::WriteFailure {
2133            source: mock_rppal_error,
2134        };
2135
2136        #[cfg(not(any(target_os = "linux", feature = "target_hw")))]
2137        let mock_error = I2cError::WriteFailure;
2138
2139        // for testing on Linux server (without target hardware)
2140        #[cfg(all(target_os = "linux", not(feature = "target_hw")))]
2141        let mock_error = I2cError::WriteFailure;
2142
2143        let _ = tx_i2c_interface_to_atlas_scientific
2144            .send(Err(mock_error(module_path!().to_string())))
2145            .unwrap();
2146
2147        let result = atlas_scientific.receive_response_from_i2c_interface(
2148            &AquariumSignal::pH,
2149            &mut atlas_scientific_channels,
2150            &mutex_temp,
2151            &mutex_ph,
2152            &mutex_cond,
2153        );
2154
2155        assert!(result.is_err());
2156        assert!(matches!(
2157            result,
2158            Err(AtlasScientificError::I2cCommunicationFailure { .. })
2159        ));
2160    }
2161
2162    #[test]
2163    /// Tests that the function returns the correct error when the channel is empty.
2164    fn test_receive_response_channel_empty() {
2165        let (
2166            mut atlas_scientific,
2167            mut atlas_scientific_channels,
2168            _tx_i2c_interface_to_atlas_scientific,
2169            mutex_temp,
2170            mutex_ph,
2171            mutex_cond,
2172        ) = setup_receive_test();
2173
2174        let result = atlas_scientific.receive_response_from_i2c_interface(
2175            &AquariumSignal::pH,
2176            &mut atlas_scientific_channels,
2177            &mutex_temp,
2178            &mutex_ph,
2179            &mutex_cond,
2180        );
2181
2182        assert!(matches!(
2183            result,
2184            Err(AtlasScientificError::ChannelIsEmpty(_))
2185        ));
2186    }
2187
2188    #[test]
2189    /// Tests that the function returns the correct error when the channel is disconnected.
2190    fn test_receive_response_channel_disconnected() {
2191        let (
2192            mut atlas_scientific,
2193            mut atlas_scientific_channels,
2194            tx_i2c_interface_to_atlas_scientific,
2195            mutex_temp,
2196            mutex_ph,
2197            mutex_cond,
2198        ) = setup_receive_test();
2199
2200        drop(tx_i2c_interface_to_atlas_scientific); // Disconnect the channel
2201
2202        let result = atlas_scientific.receive_response_from_i2c_interface(
2203            &AquariumSignal::pH,
2204            &mut atlas_scientific_channels,
2205            &mutex_temp,
2206            &mutex_ph,
2207            &mutex_cond,
2208        );
2209
2210        assert!(matches!(
2211            result,
2212            Err(AtlasScientificError::ChannelIsDisconnected { .. })
2213        ));
2214    }
2215
2216    #[test]
2217    /// Tests that a long wait for a mutex lock triggers the internal warning flag.
2218    fn test_receive_response_long_mutex_wait_triggers_warning() {
2219        let (
2220            mut atlas_scientific,
2221            mut atlas_scientific_channels,
2222            mut tx_i2c_interface_to_atlas_scientific,
2223            mutex_temp,
2224            mutex_ph,
2225            mutex_cond,
2226        ) = setup_receive_test();
2227
2228        // Spawn a thread that will hold the lock for a while, simulating a slow consumer.
2229        let mutex_temp_clone = mutex_temp.clone();
2230        let handle = thread::spawn(move || {
2231            let _lock = mutex_temp_clone.lock().unwrap();
2232            let spin_sleeper = SpinSleeper::default();
2233            spin_sleeper.sleep(Duration::from_millis(
2234                atlas_scientific_constants::MAX_MUTEX_ACCESS_DURATION_MILLIS + 15,
2235            ));
2236            // Lock is released when the thread finishes.
2237        });
2238
2239        // Give the spawned thread a moment to acquire the lock.
2240        let spin_sleeper = SpinSleeper::default();
2241        spin_sleeper.sleep(Duration::from_millis(5));
2242
2243        // Send a valid response that will be processed.
2244        let mut read_buf = [0u8; 64];
2245        let message = [1, b'2', b'5', b'.', b'0', b'0', b'0', 0];
2246        read_buf[..message.len()].copy_from_slice(&message);
2247        let i2c_response = I2cResponse {
2248            read_buf,
2249            length: 8,
2250        };
2251        tx_i2c_interface_to_atlas_scientific
2252            .send(Ok(i2c_response))
2253            .unwrap();
2254
2255        // This call will now block until the spawned thread releases the lock.
2256        // The duration of this block will be longer than the threshold.
2257        let result = atlas_scientific.receive_response_from_i2c_interface(
2258            &AquariumSignal::WaterTemperature,
2259            &mut atlas_scientific_channels,
2260            &mutex_temp,
2261            &mutex_ph,
2262            &mutex_cond,
2263        );
2264
2265        // Wait for the spawned thread to complete.
2266        handle.join().unwrap();
2267
2268        // Assert that the operation was ultimately successful.
2269        assert!(result.is_ok());
2270
2271        // Assert that the warning flag was set because of the long wait.
2272        assert!(atlas_scientific.mutex_temperature_access_duration_exceeded);
2273    }
2274
2275    #[test]
2276    /// Tests that the function handles an unsupported signal without altering any state.
2277    fn test_receive_response_with_unsupported_signal() {
2278        let (
2279            mut atlas_scientific,
2280            mut atlas_scientific_channels,
2281            mut tx_i2c_interface_to_atlas_scientific,
2282            mutex_temp,
2283            mutex_ph,
2284            mutex_cond,
2285        ) = setup_receive_test();
2286
2287        // Store initial values of the mutexes.
2288        let initial_temp_val = mutex_temp.lock().unwrap().value;
2289        let initial_ph_val = mutex_ph.lock().unwrap().value;
2290        let initial_cond_val = mutex_cond.lock().unwrap().value;
2291
2292        // Send a valid response.
2293        let mut read_buf = [0u8; 64];
2294        let message = [1, b'9', b'9', b'.', b'0', b'0', b'0', 0];
2295        read_buf[..message.len()].copy_from_slice(&message);
2296        let i2c_response = I2cResponse {
2297            read_buf,
2298            length: 8,
2299        };
2300        tx_i2c_interface_to_atlas_scientific
2301            .send(Ok(i2c_response))
2302            .unwrap();
2303
2304        // Execute the function with a signal that is not handled by `write_result_to_mutex`.
2305        let result = atlas_scientific.receive_response_from_i2c_interface(
2306            &AquariumSignal::MockInvalidSignal,
2307            &mut atlas_scientific_channels,
2308            &mutex_temp,
2309            &mutex_ph,
2310            &mutex_cond,
2311        );
2312
2313        // The function should still "succeed" in processing the message.
2314        assert!(result.is_ok());
2315        assert_float_absolute_eq!(result.unwrap().value, 99.0, 0.001);
2316
2317        // Assert that none of the mutexes were updated.
2318        let final_temp_val = mutex_temp.lock().unwrap().value;
2319        let final_ph_val = mutex_ph.lock().unwrap().value;
2320        let final_cond_val = mutex_cond.lock().unwrap().value;
2321
2322        assert_eq!(initial_temp_val, final_temp_val);
2323        assert_eq!(initial_ph_val, final_ph_val);
2324        assert_eq!(initial_cond_val, final_cond_val);
2325    }
2326
2327    #[test]
2328    #[cfg(all(target_os = "linux", feature = "target_hw"))]
2329    #[ignore]
2330    // read from real Atlas Scientific hardware combined in one test case to avoid race conditions:
2331    // - pH sensor
2332    // - conductivity sensor
2333    // - temperature
2334    pub fn test_atlas_scientific_interface_read_sensors() {
2335        let config: ConfigData =
2336            read_config_file("/config/aquarium_control_test_generic.toml".to_string());
2337        let mut i2c_interface = I2cInterface::new(config.i2c_interface);
2338
2339        let send_buf = [atlas_scientific_constants::COMMAND_READ];
2340        let request_ph = I2cRequest::new(
2341            config.atlas_scientific.address_atlas_scientific_ph,
2342            send_buf.clone(),
2343            config.atlas_scientific.sleep_time_millis_ph,
2344            atlas_scientific_constants::MESSAGE_SIZE,
2345        );
2346        let request_temperature = I2cRequest::new(
2347            config.atlas_scientific.address_atlas_scientific_temperature,
2348            send_buf.clone(),
2349            config.atlas_scientific.sleep_time_millis_temperature,
2350            atlas_scientific_constants::MESSAGE_SIZE,
2351        );
2352        let request_conductivity = I2cRequest::new(
2353            config
2354                .atlas_scientific
2355                .address_atlas_scientific_conductivity,
2356            send_buf,
2357            config.atlas_scientific.sleep_time_millis_conductivity,
2358            atlas_scientific_constants::MESSAGE_SIZE,
2359        );
2360
2361        let response_ph = i2c_interface.get_response_from_i2c(request_ph);
2362        println!("Atlas Scientific response for pH: {:?}", response_ph);
2363
2364        let response_temperature = i2c_interface.get_response_from_i2c(request_temperature);
2365        println!(
2366            "Atlas Scientific response for temperature: {:?}",
2367            response_temperature
2368        );
2369
2370        let response_conductivity = i2c_interface.get_response_from_i2c(request_conductivity);
2371        println!(
2372            "Atlas Scientific response for conductivity: {:?}",
2373            response_conductivity
2374        );
2375
2376        let ph = AtlasScientific::check_response(
2377            response_ph.unwrap().read_buf.try_into().unwrap(),
2378            true,
2379        )
2380        .unwrap();
2381        println!("pH={}", ph);
2382        let temperature = AtlasScientific::check_response(
2383            response_temperature.unwrap().read_buf.try_into().unwrap(),
2384            true,
2385        )
2386        .unwrap();
2387        println!("temperature={}", temperature);
2388        let conductivity = AtlasScientific::check_response(
2389            response_conductivity.unwrap().read_buf.try_into().unwrap(),
2390            true,
2391        )
2392        .unwrap();
2393        println!("conductivity={}", conductivity);
2394    }
2395}