aquarium_control/sensors/
tank_level_switch.rs

1/* Copyright 2024 Uwe Martin
2
3Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
4
5The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
6
7THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
8*/
9#[cfg(feature = "debug_tank_level_switch")]
10use log::debug;
11use std::sync::{Arc, Mutex};
12
13#[cfg(all(target_os = "linux", not(test)))]
14use nix::unistd::gettid;
15
16use log::error;
17#[cfg(all(target_os = "linux", not(test)))]
18use log::info;
19
20#[cfg(not(test))]
21use log::warn;
22
23use crate::launch::channels::{AquaReceiver, AquaSender};
24use crate::sensors::tank_level_switch_channels::TankLevelSwitchChannels;
25use crate::sensors::tank_level_switch_config::TankLevelSwitchConfig;
26use crate::utilities::check_mutex_access_duration::CheckMutexAccessDurationTrait;
27use spin_sleep::SpinSleeper;
28use std::time::{Duration, Instant};
29use thiserror::Error;
30
31cfg_if::cfg_if! {
32    if #[cfg(all(target_os = "linux", feature = "target_hw"))] {
33            use rppal::gpio::{Gpio, InputPin};
34    }
35}
36
37use crate::simulator::get_resp_sim::GetResponseFromSimulatorTrait;
38use crate::simulator::tcp_communication_error::TcpCommunicationError;
39use crate::utilities::acknowledge_signal_handler::AcknowledgeSignalHandlerTrait;
40use crate::utilities::channel_content::{AquariumSignal, InternalCommand};
41use crate::utilities::logger::log_error_chain;
42use crate::utilities::proc_ext_req::ProcessExternalRequestTrait;
43use crate::utilities::wait_for_termination::WaitForTerminationTrait;
44
/// Target duration of one loop iteration of the tank level switch thread, in milliseconds.
/// The loop measures its own execution time and sleeps for the remainder of this interval,
/// so that it runs at a fixed rate.
const CYCLE_TIME_TANK_LEVEL_SWITCH_MILLIS: u64 = 100;

/// Tolerated overrun of the cycle time, in milliseconds.
/// The cycle time monitoring only alerts once the execution duration exceeds the
/// cycle time by more than this value.
const PERMISSIBLE_CYCLE_TIME_DEVIATION_TANK_LEVEL_SWITCH_MILLIS: u128 = 5;

/// Maximum time, in milliseconds, the mutex may be blocked by any other thread
/// before the access duration is considered excessive.
const MAX_MUTEX_ACCESS_DURATION_MILLIS: u64 = 10;
55
/// Signal set of the tank level switch that is made available to other threads.
///
/// It bundles
/// - the measured tank level switch position,
/// - the stabilized tank level switch position and
/// - the flag indicating if the tank level switch position is invalid.
pub struct TankLevelSwitchSignals {
    /// Pin state as measured.
    pub tank_level_switch_position: bool,

    /// Pin state after stabilization has been applied.
    pub tank_level_switch_position_stabilized: bool,

    /// Set when the tank level switch position is invalid.
    pub tank_level_switch_invalid: bool,
}

impl TankLevelSwitchSignals {
    /// Builds a `TankLevelSwitchSignals` value from the three initial signal states.
    ///
    /// The arguments are stored unchanged in the fields of the same name; no
    /// validation or calculation takes place.
    ///
    /// # Arguments
    /// * `tank_level_switch_position` - Initial measured pin state (`true` for high water, `false` for low).
    /// * `tank_level_switch_position_stabilized` - Initial stabilized pin state.
    /// * `tank_level_switch_invalid` - Initial value of the flag marking the switch position as invalid.
    ///
    /// # Returns
    /// The newly created `TankLevelSwitchSignals` struct.
    pub fn new(
        tank_level_switch_position: bool,
        tank_level_switch_position_stabilized: bool,
        tank_level_switch_invalid: bool,
    ) -> TankLevelSwitchSignals {
        Self {
            tank_level_switch_position,
            tank_level_switch_position_stabilized,
            tank_level_switch_invalid,
        }
    }
}
97
98/// Contains error definitions for TankLevelSwitch
99#[derive(Error, Debug)]
100pub enum TankLevelSwitchError {
101    /// Application shall use real HW, but GPIO lib interface was not provided.
102    #[cfg(all(target_os = "linux", feature = "target_hw"))]
103    #[error("[{0}] Application shall use real HW, but GPIO lib interface was not provided.")]
104    GPIOInterfaceNotProvided(String),
105
106    /// Could not get pin from GPIO interface for tank level monitoring.
107    #[cfg(all(target_os = "linux", feature = "target_hw"))]
108    #[error("[{location}] Could not get pin {pin_number} from GPIO interface for tank level monitoring.")]
109    GpioPinRetrievalError {
110        location: String,
111        pin_number: u8,
112
113        #[source]
114        source: rppal::gpio::Error,
115    },
116
117    /// Application shall use simulator, but the channel to TCP thread was not provided.
118    #[error("[{0}] Configured to run with simulator, but no channel to TCP thread provided.")]
119    SimulatorChannelNotProvided(String),
120
121    /// Input pin has not been provided. This is a configuration error.
122    #[allow(unused)] // variant is constructed in platform-specific code and evaluated in generic code.
123    #[error("[{0}] Input pin has not been provided. This is a configuration error.")]
124    InputPinNotProvided(String),
125
126    /// Communication with the simulator failed.
127    #[error("[{location}] Communication with simulator for {signal} failed")]
128    SimulatorCommunicationError {
129        location: String,
130        signal: AquariumSignal,
131
132        #[source]
133        source: TcpCommunicationError,
134    },
135}
136
137/// Contains the measurement of tank level position and calculation of stabilized tank level position signal.
138/// Tank level position is measured by accessing the state of a dedicated GPIO pin.
139/// Alternatively, TCP communication is used when configured to run with the simulator.
140/// The struct holds attributes for the results and for error flags indicating unsuccessful communication.
141#[cfg_attr(doc, aquamarine::aquamarine)]
142/// Thread communication of this component is as follows:
143/// ```mermaid
144/// graph LR
145///     signal_handler[SignalHandler] --> tank_level_switch
146///     tank_level_switch --> signal_handler
147///     tank_level_switch --> tcp_communication[TcpCommunication]
148///     tcp_communication --> tank_level_switch
149/// ```
150/// Communication with the simulator (TcpCommunication) and signal handler uses
151/// channels. Refill control, Heating control and the data logger read from a mutex to which
152/// TankLevelSwitch writes to.
153pub struct TankLevelSwitch {
154    /// configuration of tank level switch calculation
155    config: TankLevelSwitchConfig,
156
157    /// flag indicating if the last measurement of tank level switch indicates high (true) or low (false)
158    tank_level_switch_position: bool,
159
160    /// flag indicating if the stabilized tank level switch indicates high (true) or low (false)
161    tank_level_switch_position_stabilized: bool,
162
163    /// flag indicating that the tank level switch position may be inaccurate
164    tank_level_switch_invalid: bool,
165
166    /// time counter to determine if the position is stabilized
167    tank_level_switch_position_low_stabilization_counter: u32,
168
169    /// inhibition flag to avoid flooding the log file with repeated messages about having received inapplicable command from the signal handler
170    pub lock_warn_inapplicable_command_signal_handler: bool,
171
172    /// inhibition flag to avoid flooding the log file with repeated messages about failure to receive termination signal via the channel
173    pub lock_error_channel_receive_termination: bool,
174
175    /// inhibition flag to avoid flooding the log file with repeated messages about excessive access time to mutex
176    pub lock_warn_max_mutex_access_duration: bool,
177
178    /// inhibition flag to avoid flooding the log file with repeated messages about mutex not being accessible.
179    lock_error_mutex_poisoned: bool,
180
181    // for testing purposes: record when mutex access time is exceeded without resetting it
182    #[cfg(test)]
183    pub mutex_access_duration_exceeded: bool,
184
185    /// inhibition flag to avoid flooding the log file with repeated messages about not having received an input pin
186    lock_error_input_pin_none: bool,
187
188    /// inhibition flag to avoid flooding the log file with repeated messages about simulator communication error
189    lock_error_simulator_communication: bool,
190
191    /// inhibition flag to avoid flooding the log file with repeated messages about channel(s) not being provided
192    lock_error_channel_not_provided: bool,
193
194    #[cfg(all(target_os = "linux", feature = "target_hw"))]
195    // interface to GPIO pin wrapped as optional
196    input_pin_opt: Option<InputPin>,
197
198    /// Maximum permissible access duration for Mutex
199    pub max_mutex_access_duration: Duration,
200}
201
202impl ProcessExternalRequestTrait for TankLevelSwitch {}
203
204impl GetResponseFromSimulatorTrait for TankLevelSwitch {}
205
206impl TankLevelSwitch {
207    #[cfg(any(not(target_os = "linux"), not(feature = "target_hw")))]
208    /// Creates a new **`TankLevelSwitch`** instance for **development or unsupported platforms**.
209    ///
210    /// This constructor is activated when the application is compiled for a non-Linux
211    /// operating system or when the `target_hw` feature is not enabled. In this mode,
212    /// it acts as a **mock or placeholder**, allowing basic testing and compilation
213    /// without requiring actual GPIO hardware. It initializes the sensor position to
214    /// `true` (high water) and the invalid flag to `false` by default, but sets the
215    /// stabilized position based on the provided `initial_position_stabilized` argument.
216    ///
217    /// # Arguments
218    /// * `config` - **Configuration data** for the tank level switch, primarily dictating
219    ///   the `use_simulator` flag and stabilization logic.
220    /// * `initial_position_stabilized` - The **assumed initially stabilized position** of the
221    ///   tank level (`true` for high water, `false` for low) before any actual measurements.
222    ///
223    /// # Returns
224    /// A new **`TankLevelSwitch` struct**, configured for simulated or non-hardware operation, wrapped in Ok().
225    ///
226    /// # Errors
227    /// This specific implementation for non-hardware platforms does not produce any errors
228    /// and always returns `Ok`. The `Result` type is used to maintain signature
229    /// compatibility with the hardware-specific version of this function.
230    pub fn new(
231        config: TankLevelSwitchConfig,
232        initial_position_stabilized: bool,
233    ) -> Result<TankLevelSwitch, TankLevelSwitchError> {
234        Ok(TankLevelSwitch {
235            config,
236            tank_level_switch_position: true, // initialization value assuming no error
237            tank_level_switch_invalid: false, // initialization value assuming no error
238            tank_level_switch_position_stabilized: initial_position_stabilized,
239            tank_level_switch_position_low_stabilization_counter: 0,
240            lock_warn_inapplicable_command_signal_handler: false,
241            lock_error_channel_receive_termination: false,
242            lock_warn_max_mutex_access_duration: false,
243            lock_error_mutex_poisoned: false,
244            #[cfg(test)]
245            mutex_access_duration_exceeded: false,
246            lock_error_input_pin_none: false,
247            lock_error_simulator_communication: false,
248            lock_error_channel_not_provided: false,
249            max_mutex_access_duration: Duration::from_millis(MAX_MUTEX_ACCESS_DURATION_MILLIS),
250        })
251    }
252
253    #[cfg(all(target_os = "linux", feature = "target_hw"))]
254    /// Creates a new `TankLevelSwitch` instance for target hardware platforms.
255    ///
256    /// This constructor initializes the tank level switch monitoring module. If not in
257    /// simulator mode, it establishes access to the physical GPIO pin, configuring it
258    /// as an input with a pull-up resistor.
259    ///
260    /// # Arguments
261    /// * `config` - Configuration data for the tank level switch.
262    /// * `initial_position_stabilized` - The assumed initially stabilized position of the
263    ///   tank level (`true` for high water, `false` for low).
264    /// * `gpio_lib_handle_opt` - An `Option<Gpio>` providing the interface to the GPIO library.
265    ///   This must be `Some(Gpio)` if `config.use_simulator` is `false`.
266    /// * `gpio_tank_level` - The GPIO pin (`u8`) connected to the tank level switch.
267    ///
268    /// # Returns
269    /// A `Result` containing a new `TankLevelSwitch` instance on success.
270    ///
271    /// # Errors
272    /// Returns a `TankLevelSwitchError` if initialization fails:
273    /// - `TankLevelSwitchError::GPIOInterfaceNotProvided`: If `use_simulator` is `false`
274    ///   but no `Gpio` handle is provided.
275    /// - `TankLevelSwitchError::GpioPinRetrievalError`: If the specified GPIO pin cannot be
276    ///   retrieved from the GPIO interface (e.g., invalid pin, permissions issue).
277    pub fn new(
278        config: TankLevelSwitchConfig,
279        initial_position_stabilized: bool,
280        gpio_lib_handle_opt: Option<Gpio>,
281        gpio_tank_level: u8,
282    ) -> Result<TankLevelSwitch, TankLevelSwitchError> {
283        let input_pin_opt = if config.use_simulator {
284            // In simulator mode, we don't need a real pin.
285            None
286        } else {
287            // In hardware mode, the Gpio handle is required.
288            let gpio_lib_handle = gpio_lib_handle_opt.ok_or_else(|| {
289                TankLevelSwitchError::GPIOInterfaceNotProvided(module_path!().to_string())
290            })?;
291
292            // Get the specific pin for tank level monitoring.
293            let tank_level_pin = gpio_lib_handle.get(gpio_tank_level).map_err(|e| {
294                TankLevelSwitchError::GpioPinRetrievalError {
295                    location: module_path!().to_string(),
296                    pin_number: gpio_tank_level,
297                    source: e,
298                }
299            })?;
300            Some(tank_level_pin.into_input_pullup())
301        };
302
303        Ok(TankLevelSwitch {
304            config,
305            tank_level_switch_position: true, // initialization value assuming no error
306            tank_level_switch_invalid: false, // initialization value assuming no error
307            tank_level_switch_position_stabilized: initial_position_stabilized,
308            tank_level_switch_position_low_stabilization_counter: 0,
309            lock_warn_inapplicable_command_signal_handler: false,
310            lock_error_channel_receive_termination: false,
311            lock_warn_max_mutex_access_duration: false,
312            lock_error_mutex_poisoned: false,
313            #[cfg(test)]
314            mutex_access_duration_exceeded: false,
315            lock_error_input_pin_none: false,
316            input_pin_opt,
317            lock_error_channel_not_provided: false,
318            lock_error_simulator_communication: false,
319            max_mutex_access_duration: Duration::from_millis(MAX_MUTEX_ACCESS_DURATION_MILLIS),
320        })
321    }
322
323    /// Updates the measured tank level switch signals from either a GPIO pin or a simulator.
324    ///
325    /// This function determines the source of the tank level switch position and invalid status
326    /// based on the `use_simulator` configuration.
327    ///
328    /// # Arguments
329    /// * `tx_tank_level_switch_to_tcp_opt` - An `Option` containing the sender channel to the TCP simulator.
330    /// * `rx_tank_level_switch_from_tcp_opt` - An `Option` containing the receiver channel from the TCP simulator.
331    ///
332    /// # Returns
333    /// An empty `Result` (`Ok(())`) on success.
334    ///
335    /// # Errors
336    /// Returns a `TankLevelSwitchError` if:
337    /// - `TankLevelSwitchError::SimulatorChannelNotProvided`: If in simulator mode, and the
338    ///   required channels are missing.
339    /// - `TankLevelSwitchError::SimulatorCommunicationError`: If communication with the
340    ///   simulator fails for either signal.
341    /// - `TankLevelSwitchError::InputPinNotProvided`: If in hardware mode but the GPIO
342    ///   input pin is not available.
343    pub fn update_tank_level_switch_signals(
344        &mut self,
345        tx_tank_level_switch_to_tcp_opt: &mut Option<AquaSender<InternalCommand>>,
346        rx_tank_level_switch_from_tcp_opt: &mut Option<
347            AquaReceiver<Result<f32, TcpCommunicationError>>,
348        >,
349    ) -> Result<(), TankLevelSwitchError> {
350        if self.config.use_simulator {
351            // Ensure channels are present before proceeding.
352            let tx_tank_level_switch_to_tcp =
353                tx_tank_level_switch_to_tcp_opt.as_mut().ok_or_else(|| {
354                    TankLevelSwitchError::SimulatorChannelNotProvided(module_path!().to_string())
355                })?;
356            let rx_tank_level_switch_from_tcp =
357                rx_tank_level_switch_from_tcp_opt.as_mut().ok_or_else(|| {
358                    TankLevelSwitchError::SimulatorChannelNotProvided(module_path!().to_string())
359                })?;
360
361            // Fetch the position signal.
362            let position_signal = AquariumSignal::TankLevelSwitchPosition;
363            let position_val = Self::get_response_from_simulator(
364                module_path!().to_string(),
365                tx_tank_level_switch_to_tcp,
366                rx_tank_level_switch_from_tcp,
367                InternalCommand::RequestSignal(position_signal.clone()),
368            )
369            .map_err(|e| TankLevelSwitchError::SimulatorCommunicationError {
370                location: module_path!().to_string(),
371                signal: position_signal,
372                source: e,
373            })?;
374            self.tank_level_switch_position = position_val > 0.0;
375
376            // Fetch the invalid signal.
377            let invalid_signal = AquariumSignal::TankLevelSwitchInvalid;
378            let invalid_val = Self::get_response_from_simulator(
379                module_path!().to_string(),
380                tx_tank_level_switch_to_tcp,
381                rx_tank_level_switch_from_tcp,
382                InternalCommand::RequestSignal(invalid_signal.clone()),
383            )
384            .map_err(|e| TankLevelSwitchError::SimulatorCommunicationError {
385                location: module_path!().to_string(),
386                signal: invalid_signal,
387                source: e,
388            })?;
389            self.tank_level_switch_invalid = invalid_val > 0.0;
390        } else {
391            #[cfg(all(target_os = "linux", feature = "target_hw"))]
392            {
393                // Use `if let` for more idiomatic Option handling.
394                if let Some(pin) = self.input_pin_opt.as_mut() {
395                    self.tank_level_switch_position = pin.is_high();
396                    self.lock_error_input_pin_none = false;
397                } else {
398                    // This case should ideally not be reached if the constructor logic is correct.
399                    return Err(TankLevelSwitchError::InputPinNotProvided(
400                        module_path!().to_string(),
401                    ));
402                }
403            }
404        }
405        Ok(())
406    }
407
408    /// Calculates a stabilized tank level position, applying asymmetric delay and handling invalid inputs.
409    ///
410    /// This private helper updates the `tank_level_switch_position_stabilized` flag.
411    /// It implements the following logic:
412    ///
413    /// - **Invalid Signal**: If `tank_level_switch_invalid` is `true`, the stabilized position is
414    ///   immediately forced to `true` (a safe, high-water state).
415    /// - **High Water Level**: If the raw position is `true`, the stabilized signal is set to `true`
416    ///   immediately, and the low-level stabilization counter is reset.
417    /// - **Low Water Level (with Delay)**: If the raw position is `false`, a counter increments.
418    ///   The stabilized signal only changes to `false` after this counter exceeds the configured
419    ///   threshold, preventing flickering from momentary low readings.
420    fn calc_tank_level_switch_position_stabilized(&mut self) {
421        if self.tank_level_switch_invalid {
422            // go to safe position if the input signal is invalid
423            self.tank_level_switch_position_stabilized = true;
424            // in case the sensor recovers, the counter shall start from zero again
425            self.tank_level_switch_position_low_stabilization_counter = 0;
426        } else {
427            // consider asymmetric delay for calculation
428            self.tank_level_switch_position_stabilized = match self.tank_level_switch_position {
429                true => {
430                    self.tank_level_switch_position_low_stabilization_counter = 0;
431                    true // set the stabilized signal to true (high) without delay
432                }
433                false => {
434                    if self.tank_level_switch_position_low_stabilization_counter
435                        < self
436                            .config
437                            .tank_level_switch_position_low_stabilization_count
438                            * 10
439                    {
440                        // do not change the signal if the counter is not exceeded
441                        self.tank_level_switch_position_low_stabilization_counter = self
442                            .tank_level_switch_position_low_stabilization_counter
443                            .saturating_add(1);
444
445                        #[cfg(feature = "debug_tank_level_switch")]
446                        debug!(
447                            "calc_tank_level_switch_position_stabilized: tank_level_switch_position_low_stabilization_counter = {}, tank_level_switch_position_stabilized={}",
448                             self.tank_level_switch_position_low_stabilization_counter,
449                             self.tank_level_switch_position_stabilized
450                        );
451                        self.tank_level_switch_position_stabilized
452                    } else {
453                        false
454                    }
455                }
456            }
457        }
458    }
459
460    /// Executes the main control loop for the tank level switch module.
461    ///
462    /// This function runs continuously, managing the measurement, stabilization,
463    /// and provision of the tank water level. It periodically updates its state by
464    /// reading from a physical GPIO pin or a simulator and writes the result to a
465    /// shared mutex for access by other threads.
466    ///
467    /// The loop maintains a fixed cycle time and responds to `Quit` commands
468    /// from the signal handler for graceful shutdown.
469    ///
470    /// # Arguments
471    /// * `tank_level_switch_signals` - A mutable reference to the struct containing the channels.
472    /// * `mutex_tank_level_switch_signals` - The shared `Mutex` where the latest sensor
473    ///   signals are written for other modules to consume.
474    ///
475    /// # Returns
476    /// An empty `Result` (`Ok(())`) on successful shutdown.
477    ///
478    /// # Errors
479    /// Returns a `TankLevelSwitchError::SimulatorChannelNotProvided` if the module is
480    /// configured to use the simulator, but the required TCP communication channels
481    /// are not provided at startup.
482    pub fn execute(
483        &mut self,
484        tank_level_switch_channels: &mut TankLevelSwitchChannels,
485        mutex_tank_level_switch_signals: Arc<Mutex<TankLevelSwitchSignals>>,
486    ) -> Result<(), TankLevelSwitchError> {
487        #[cfg(all(target_os = "linux", not(test)))]
488        info!(target: module_path!(), "Thread started with TID: {}", gettid());
489
490        let spin_sleeper = SpinSleeper::default();
491        let mut lock_warn_cycle_time_exceeded: bool = false;
492        let cycle_time_duration = Duration::from_millis(CYCLE_TIME_TANK_LEVEL_SWITCH_MILLIS);
493
494        if (self.config.use_simulator)
495            && (tank_level_switch_channels
496                .tx_tank_level_switch_to_tcp_opt
497                .is_none()
498                | tank_level_switch_channels
499                    .rx_tank_level_switch_from_tcp_opt
500                    .is_none())
501        {
502            return Err(TankLevelSwitchError::SimulatorChannelNotProvided(
503                module_path!().to_string(),
504            ));
505        }
506
507        let mut start_time = Instant::now();
508
509        loop {
510            let (
511                quit_command_received, // a request to end the application has been received
512                _,
513                _,
514            ) = self.process_external_request(
515                &mut tank_level_switch_channels.rx_tank_level_switch_from_signal_handler,
516                None,
517            );
518            if quit_command_received {
519                break;
520            }
521
522            if self.config.active {
523                // either communicate with real HW or with the simulator to get updated signal information
524                if let Err(e) = self.update_tank_level_switch_signals(
525                    &mut tank_level_switch_channels.tx_tank_level_switch_to_tcp_opt,
526                    &mut tank_level_switch_channels.rx_tank_level_switch_from_tcp_opt,
527                ) {
528                    if matches!(e, TankLevelSwitchError::InputPinNotProvided(_)) {
529                        if !self.lock_error_input_pin_none {
530                            log_error_chain(
531                                module_path!(),
532                                "Failed to update tank level switch signals.",
533                                e,
534                            );
535                            self.lock_error_input_pin_none = true;
536                        }
537                    } else if matches!(e, TankLevelSwitchError::SimulatorCommunicationError { .. })
538                    {
539                        if !self.lock_error_simulator_communication {
540                            log_error_chain(
541                                module_path!(),
542                                "Failed to update tank level switch signals.",
543                                e,
544                            );
545                            self.lock_error_simulator_communication = true;
546                        }
547                    } else if matches!(e, TankLevelSwitchError::SimulatorChannelNotProvided(_)) {
548                        if !self.lock_error_channel_not_provided {
549                            log_error_chain(
550                                module_path!(),
551                                "Failed to update tank level switch signals.",
552                                e,
553                            );
554                            self.lock_error_channel_not_provided = true;
555                        }
556                    } else {
557                        // Protection for future error variants
558                        log_error_chain(
559                            module_path!(),
560                            "Failed to update tank level switch signals.",
561                            e,
562                        );
563                    }
564                } else {
565                    self.lock_error_input_pin_none = false;
566                }
567
568                self.calc_tank_level_switch_position_stabilized();
569
570                let instant_before_locking_mutex = Instant::now();
571                let mut instant_after_locking_mutex = Instant::now(); // initialization is overwritten
572
573                // Write calculated signals to mutex
574                {
575                    match mutex_tank_level_switch_signals.lock() {
576                        Ok(mut c) => {
577                            instant_after_locking_mutex = Instant::now();
578                            c.tank_level_switch_position_stabilized =
579                                self.tank_level_switch_position_stabilized;
580                            c.tank_level_switch_invalid = self.tank_level_switch_invalid;
581                            c.tank_level_switch_position = self.tank_level_switch_position;
582                            self.lock_error_mutex_poisoned = false;
583                        }
584                        Err(e) => {
585                            if !self.lock_error_mutex_poisoned {
586                                error!(target: module_path!(), "Could not lock mutex ({e:?}).");
587                            }
588                            self.lock_error_mutex_poisoned = true;
589                        }
590                    }
591                }
592
593                // check if access to mutex took too long
594                self.check_mutex_access_duration(
595                    None,
596                    instant_after_locking_mutex,
597                    instant_before_locking_mutex,
598                );
599            }
600
601            let stop_time = Instant::now();
602            let execution_duration = stop_time.duration_since(start_time);
603            if execution_duration > cycle_time_duration {
604                let delta_duration = execution_duration - cycle_time_duration;
605                if !self.config.use_simulator
606                    && delta_duration.as_millis()
607                        > PERMISSIBLE_CYCLE_TIME_DEVIATION_TANK_LEVEL_SWITCH_MILLIS
608                    && !lock_warn_cycle_time_exceeded
609                {
610                    #[cfg(not(test))]
611                    warn!(target: module_path!(),
612                        "execution duration of {} milliseconds exceeds cycle time of {} (delta duration={})",
613                        execution_duration.as_millis(),
614                        CYCLE_TIME_TANK_LEVEL_SWITCH_MILLIS,
615                        delta_duration.as_millis(),
616                    );
617                    lock_warn_cycle_time_exceeded = true;
618                }
619            } else {
620                let remaining_sleep_time = cycle_time_duration - execution_duration;
621                lock_warn_cycle_time_exceeded = false;
622                // sleep time does not need to be split up because cycle time is 100 milliseconds only
623                spin_sleeper.sleep(remaining_sleep_time);
624            }
625
626            start_time = Instant::now();
627        }
628
629        tank_level_switch_channels.acknowledge_signal_handler();
630
631        // This thread has channel connections to underlying threads (GpioHandler).
632        // Those threads have to stop receiving commands from this thread.
633        // The shutdown sequence is handled by the signal_handler module.
634        let sleep_duration_one_millis = Duration::from_millis(1);
635        self.wait_for_termination(
636            &mut tank_level_switch_channels.rx_tank_level_switch_from_signal_handler,
637            sleep_duration_one_millis,
638            module_path!(),
639        );
640        Ok(())
641    }
642}
643
644#[cfg(test)]
645pub mod tests {
646    use crate::launch::channels::{channel, Channels};
647    use crate::mocks::mock_signal::tests::MockSignal;
648    use crate::mocks::mock_tcp::tests::mock_tcp_for_tank_level_switch;
649    use crate::mocks::test_command::tests::TestCommand;
650    use crate::sensors::tank_level_switch::{TankLevelSwitch, TankLevelSwitchSignals};
651    use crate::utilities::channel_content::InternalCommand;
652    use crate::utilities::config::{read_config_file, ConfigData};
653    use spin_sleep::SpinSleeper;
654    use std::sync::{Arc, Mutex};
655    use std::thread;
656    use std::thread::JoinHandle;
657    use std::time::Duration;
658
659    #[cfg(all(target_os = "linux", feature = "target_hw"))]
660    use crate::sensors::gpio_handler::GpioHandler;
661
662    #[cfg(all(target_os = "linux", feature = "target_hw"))]
663    use crate::relays::actuate_gpio;
664    use crate::sensors::tank_level_switch_channels::TankLevelSwitchChannels;
665
666    #[test]
667    // test if position stabilized signal is updating when the invalid signal is true
668    pub fn test_refill_calc_tank_level_switch_position_stabilized_false_when_invalid() {
669        let mut channels = Channels::new(false, false, true);
670
671        let (_tx_test_environment_to_tcp, mut rx_tcp_from_test_environment) = channel(1);
672
673        let config: ConfigData =
674            read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
675        let duration_test_case_secs = config
676            .tank_level_switch
677            .tank_level_switch_position_low_stabilization_count
678            + 1;
679
680        #[cfg(all(target_os = "linux", feature = "target_hw"))]
681        let mut tank_level_switch = TankLevelSwitch::new(
682            config.tank_level_switch,
683            true,
684            None,
685            config.gpio_handler.tank_level_switch,
686        );
687        #[cfg(any(not(target_os = "linux"), not(feature = "target_hw")))]
688        let mut tank_level_switch = TankLevelSwitch::new(config.tank_level_switch, true).unwrap();
689
690        let mutex_tank_level_switch_signals =
691            Arc::new(Mutex::new(TankLevelSwitchSignals::new(false, false, true)));
692
693        // thread for mock tcp
694        let join_handle_mock_tcp = thread::Builder::new()
695            .name("mock_tcp".to_string())
696            .spawn(move || {
697                mock_tcp_for_tank_level_switch(
698                    channels.tcp_communication,
699                    Some(&mut rx_tcp_from_test_environment),
700                    false,
701                    true,
702                );
703            })
704            .unwrap();
705
706        // thread for controlling duration of test run
707        let join_handle_test_environment = thread::Builder::new()
708            .name("test_environment".to_string())
709            .spawn(move || {
710                let sleep_duration = Duration::from_secs(duration_test_case_secs.into());
711                let sleep_duration_thread_processing = Duration::from_millis(50);
712                let spin_sleeper = SpinSleeper::default();
713                spin_sleeper.sleep(sleep_duration);
714                spin_sleeper.sleep(sleep_duration_thread_processing);
715                let _ = channels
716                    .signal_handler
717                    .send_to_tank_level_switch(InternalCommand::Quit);
718                channels
719                    .signal_handler
720                    .receive_from_tank_level_switch()
721                    .unwrap();
722                let _ = channels
723                    .signal_handler
724                    .send_to_tank_level_switch(InternalCommand::Terminate);
725            })
726            .unwrap();
727
728        // thread for the test object
729        let join_handle_test_object = thread::Builder::new()
730            .name("test_object".to_string())
731            .spawn(move || {
732                // clone sender part of channels to mock threads so that we can terminate them after execution of the test object
733                let mut tx_tank_level_switch_to_tcp_for_test_case_finish = channels
734                    .tank_level_switch
735                    .tx_tank_level_switch_to_tcp_opt
736                    .clone()
737                    .unwrap();
738
739                let mutex_tank_level_switch_signals_for_assert =
740                    mutex_tank_level_switch_signals.clone();
741                let _ = tank_level_switch.execute(
742                    &mut channels.tank_level_switch,
743                    mutex_tank_level_switch_signals,
744                );
745
746                // Get the final state after execution.
747                let signals_after_test_run =
748                    mutex_tank_level_switch_signals_for_assert.lock().unwrap();
749                // Assert that the stabilized position is forced to true (safe state) due to invalidity.
750                assert!(signals_after_test_run.tank_level_switch_position_stabilized);
751                // Confirm invalid-flag remained true
752                assert!(signals_after_test_run.tank_level_switch_invalid);
753
754                // send Quit signal to mock threads because the test object has terminated
755                let _ =
756                    tx_tank_level_switch_to_tcp_for_test_case_finish.send(InternalCommand::Quit);
757            })
758            .unwrap();
759
760        join_handle_test_object
761            .join()
762            .expect("Test object thread did not finish.");
763        join_handle_mock_tcp
764            .join()
765            .expect("Mock TCP thread did not finish.");
766        join_handle_test_environment
767            .join()
768            .expect("Test environment thread did not finish.");
769
770        println!("* [TankLevelSwitch] testing if stabilized tank level position is high when invalid bit is true succeeded.");
771    }
772
773    // helper function for creating the test object
774    fn create_test_object(
775        mut tank_level_switch: TankLevelSwitch,
776        mut tank_level_switch_channels: TankLevelSwitchChannels,
777        mutex_tank_level_switch_signals: Arc<Mutex<TankLevelSwitchSignals>>,
778    ) -> JoinHandle<()> {
779        thread::Builder::new()
780            .name("test_object".to_string())
781            .spawn(move || {
782                let _ = tank_level_switch.execute(
783                    &mut tank_level_switch_channels,
784                    mutex_tank_level_switch_signals,
785                );
786            })
787            .unwrap()
788    }
789
790    #[test]
791    // test if the position signal is updating without delay when the invalid signal is false
792    pub fn test_refill_calc_tank_level_switch_position_stabilized_false_true() {
793        let mut channels = Channels::new(false, false, true);
794
795        let (mut tx_test_environment_to_tcp, mut rx_tcp_from_test_environment) = channel(1);
796
797        let mut config: ConfigData =
798            read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
799        config.refill.initial_wait_interval = 2;
800        config.refill.check_interval = 60;
801
802        let duration_test_case_part_1: u64 = 3; // three seconds with level switch in low position
803        let duration_test_case_part_2: u64 = 2; // two seconds with level switch in high position
804
805        #[cfg(all(target_os = "linux", feature = "target_hw"))]
806        let mut tank_level_switch = TankLevelSwitch::new(
807            config.tank_level_switch,
808            false,
809            None,
810            config.gpio_handler.tank_level_switch,
811        );
812        #[cfg(any(not(target_os = "linux"), not(feature = "target_hw")))]
813        let tank_level_switch = TankLevelSwitch::new(config.tank_level_switch, false).unwrap();
814        let mutex_tank_level_switch_signals =
815            Arc::new(Mutex::new(TankLevelSwitchSignals::new(true, true, false)));
816
817        // thread for mock tcp
818        let join_handle_mock_tcp = thread::Builder::new()
819            .name("mock_tcp".to_string())
820            .spawn(move || {
821                mock_tcp_for_tank_level_switch(
822                    channels.tcp_communication,
823                    Some(&mut rx_tcp_from_test_environment),
824                    false,
825                    false,
826                );
827            })
828            .unwrap();
829
830        let mut tx_test_environment_to_tcp_for_test_case_finish = channels
831            .tank_level_switch
832            .tx_tank_level_switch_to_tcp_opt
833            .clone()
834            .unwrap();
835
836        // thread for controlling duration of test run
837        let join_handle_test_environment = thread::Builder::new()
838            .name("test_environment".to_string())
839            .spawn(move || {
840                let sleep_duration_part_1 = Duration::from_secs(duration_test_case_part_1.into());
841                let sleep_duration_part_2 = Duration::from_secs(duration_test_case_part_2.into());
842                let sleep_duration_thread_processing = Duration::from_millis(50);
843                let spin_sleeper = SpinSleeper::default();
844                spin_sleeper.sleep(sleep_duration_part_1);
845                let _ = tx_test_environment_to_tcp.send(TestCommand::UpdateSignal(
846                    MockSignal::TankLevelSwitchPosition(true),
847                ));
848                spin_sleeper.sleep(sleep_duration_part_2);
849                let _ = channels
850                    .signal_handler
851                    .send_to_heating(InternalCommand::Quit);
852                let _ = channels
853                    .signal_handler
854                    .send_to_refill(InternalCommand::Quit);
855                spin_sleeper.sleep(sleep_duration_thread_processing);
856                let _ = channels
857                    .signal_handler
858                    .send_to_tank_level_switch(InternalCommand::Quit);
859                channels
860                    .signal_handler
861                    .receive_from_tank_level_switch()
862                    .unwrap();
863                let _ = channels
864                    .signal_handler
865                    .send_to_heating(InternalCommand::Terminate);
866                let _ = channels
867                    .signal_handler
868                    .send_to_refill(InternalCommand::Terminate);
869                let _ = tx_test_environment_to_tcp_for_test_case_finish.send(InternalCommand::Quit);
870                let _ = channels
871                    .signal_handler
872                    .send_to_tank_level_switch(InternalCommand::Terminate);
873            })
874            .unwrap();
875
876        // thread for the test object
877        let join_handle_test_object = create_test_object(
878            tank_level_switch,
879            channels.tank_level_switch,
880            mutex_tank_level_switch_signals,
881        );
882
883        join_handle_test_object
884            .join()
885            .expect("Test object thread did not finish.");
886        join_handle_mock_tcp
887            .join()
888            .expect("Mock TCP thread did not finish.");
889        join_handle_test_environment
890            .join()
891            .expect("Test environment thread did not finish.");
892
893        println!("* [TankLevelSwitch] testing immediate change false -> true for stabilized tank level position when invalid bit is false succeeded");
894    }
895
896    #[test]
897    // test if the position signal is updating with delay when the invalid signal is false
898    pub fn test_refill_calc_tank_level_switch_position_stabilized_true_false() {
899        let mut channels = Channels::new(false, false, true);
900
901        let (mut tx_test_environment_to_tcp, mut rx_tcp_from_test_environment) = channel(1);
902
903        let config: ConfigData =
904            read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
905        let duration_test_case_part_1: u64 = 3; // three seconds with level switch in low position
906        let duration_test_case_part_2: u64 = 8; // two seconds with level switch in high position
907        #[cfg(all(target_os = "linux", feature = "target_hw"))]
908        let mut tank_level_switch = TankLevelSwitch::new(
909            config.tank_level_switch,
910            true,
911            None,
912            config.gpio_handler.tank_level_switch,
913        );
914        #[cfg(any(not(target_os = "linux"), not(feature = "target_hw")))]
915        let tank_level_switch = TankLevelSwitch::new(config.tank_level_switch, true).unwrap();
916        let mutex_tank_level_switch_signals =
917            Arc::new(Mutex::new(TankLevelSwitchSignals::new(true, true, false)));
918
919        // thread for mock tcp
920        let join_handle_mock_tcp = thread::Builder::new()
921            .name("mock_tcp".to_string())
922            .spawn(move || {
923                mock_tcp_for_tank_level_switch(
924                    channels.tcp_communication,
925                    Some(&mut rx_tcp_from_test_environment),
926                    true,
927                    false,
928                );
929            })
930            .unwrap();
931
932        let mut tx_test_environment_to_tcp_for_test_case_finish = channels
933            .tank_level_switch
934            .tx_tank_level_switch_to_tcp_opt
935            .clone()
936            .unwrap();
937
938        // thread for controlling duration of test run
939        let join_handle_test_environment = thread::Builder::new()
940            .name("test_environment".to_string())
941            .spawn(move || {
942                let sleep_duration_part_1 = Duration::from_secs(duration_test_case_part_1.into());
943                let sleep_duration_part_2 = Duration::from_secs(duration_test_case_part_2.into());
944                let sleep_duration_thread_processing = Duration::from_millis(50);
945                let spin_sleeper = SpinSleeper::default();
946                spin_sleeper.sleep(sleep_duration_part_1);
947                let _ = tx_test_environment_to_tcp.send(TestCommand::UpdateSignal(
948                    MockSignal::TankLevelSwitchPosition(false),
949                ));
950                spin_sleeper.sleep(sleep_duration_part_2);
951                let _ = channels
952                    .signal_handler
953                    .send_to_heating(InternalCommand::Quit);
954                let _ = channels
955                    .signal_handler
956                    .send_to_refill(InternalCommand::Quit);
957                spin_sleeper.sleep(sleep_duration_thread_processing);
958                let _ = channels
959                    .signal_handler
960                    .send_to_tank_level_switch(InternalCommand::Quit);
961                channels
962                    .signal_handler
963                    .receive_from_tank_level_switch()
964                    .unwrap();
965                let _ = channels
966                    .signal_handler
967                    .send_to_heating(InternalCommand::Terminate);
968                let _ = channels
969                    .signal_handler
970                    .send_to_refill(InternalCommand::Terminate);
971                let _ = tx_test_environment_to_tcp_for_test_case_finish.send(InternalCommand::Quit);
972                let _ = channels
973                    .signal_handler
974                    .send_to_tank_level_switch(InternalCommand::Terminate);
975            })
976            .unwrap();
977
978        // thread for the test object
979        let join_handle_test_object = create_test_object(
980            tank_level_switch,
981            channels.tank_level_switch,
982            mutex_tank_level_switch_signals,
983        );
984
985        join_handle_test_object
986            .join()
987            .expect("Test object thread did not finish.");
988        join_handle_mock_tcp
989            .join()
990            .expect("Mock TCP thread did not finish.");
991        join_handle_test_environment
992            .join()
993            .expect("Test environment thread did not finish.");
994
995        println!("* [TankLevelSwitch] checking delayed change true -> false for stabilized tank level position when invalid bit is false succeeded.");
996    }
997
998    #[test]
999    // test if position stabilized signal is not updating when the raw input signal is toggling
1000    pub fn test_refill_calc_tank_level_switch_position_stabilized_is_true_with_toggling_input() {
1001        let mut channels = Channels::new(false, false, true);
1002        let (mut tx_test_environment_to_tcp, mut rx_tcp_from_test_environment) = channel(1);
1003
1004        let config: ConfigData =
1005            read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
1006        let duration_test_case_part: u64 = 2; // two seconds with the level switch before toggling
1007
1008        #[cfg(all(target_os = "linux", feature = "target_hw"))]
1009        let mut tank_level_switch = TankLevelSwitch::new(
1010            config.tank_level_switch,
1011            true,
1012            None,
1013            config.gpio_handler.tank_level_switch,
1014        );
1015        #[cfg(any(not(target_os = "linux"), not(feature = "target_hw")))]
1016        let tank_level_switch = TankLevelSwitch::new(config.tank_level_switch, true).unwrap();
1017        let mutex_tank_level_switch_signals =
1018            Arc::new(Mutex::new(TankLevelSwitchSignals::new(true, true, false)));
1019
1020        // thread for mock tcp
1021        let join_handle_mock_tcp = thread::Builder::new()
1022            .name("mock_tcp".to_string())
1023            .spawn(move || {
1024                mock_tcp_for_tank_level_switch(
1025                    channels.tcp_communication,
1026                    Some(&mut rx_tcp_from_test_environment),
1027                    true,
1028                    false,
1029                );
1030            })
1031            .unwrap();
1032
1033        let mut tx_test_environment_to_tcp_for_test_case_finish = channels
1034            .tank_level_switch
1035            .tx_tank_level_switch_to_tcp_opt
1036            .clone()
1037            .unwrap();
1038
1039        // thread for controlling duration of test run
1040        let join_handle_test_environment = thread::Builder::new()
1041            .name("test_environment".to_string())
1042            .spawn(move || {
1043                let sleep_duration_part = Duration::from_secs(duration_test_case_part.into());
1044                let sleep_duration_thread_processing = Duration::from_millis(50);
1045                let spin_sleeper = SpinSleeper::default();
1046                for _ in 0..2 {
1047                    spin_sleeper.sleep(sleep_duration_part);
1048                    let _ = tx_test_environment_to_tcp.send(TestCommand::UpdateSignal(
1049                        MockSignal::TankLevelSwitchPosition(false),
1050                    ));
1051                    spin_sleeper.sleep(sleep_duration_part);
1052                    let _ = tx_test_environment_to_tcp.send(TestCommand::UpdateSignal(
1053                        MockSignal::TankLevelSwitchPosition(true),
1054                    ));
1055                }
1056                let _ = channels
1057                    .signal_handler
1058                    .send_to_heating(InternalCommand::Quit);
1059                let _ = channels
1060                    .signal_handler
1061                    .send_to_refill(InternalCommand::Quit);
1062                spin_sleeper.sleep(sleep_duration_thread_processing);
1063                let _ = channels
1064                    .signal_handler
1065                    .send_to_tank_level_switch(InternalCommand::Quit);
1066                channels
1067                    .signal_handler
1068                    .receive_from_tank_level_switch()
1069                    .unwrap();
1070                let _ = channels
1071                    .signal_handler
1072                    .send_to_heating(InternalCommand::Terminate);
1073                let _ = channels
1074                    .signal_handler
1075                    .send_to_refill(InternalCommand::Terminate);
1076                let _ = tx_test_environment_to_tcp_for_test_case_finish.send(InternalCommand::Quit);
1077                let _ = channels
1078                    .signal_handler
1079                    .send_to_tank_level_switch(InternalCommand::Terminate);
1080            })
1081            .unwrap();
1082
1083        // thread for the test object
1084        let join_handle_test_object = create_test_object(
1085            tank_level_switch,
1086            channels.tank_level_switch,
1087            mutex_tank_level_switch_signals,
1088        );
1089
1090        join_handle_test_object
1091            .join()
1092            .expect("Test object thread did not finish.");
1093        join_handle_mock_tcp
1094            .join()
1095            .expect("Mock TCP thread did not finish.");
1096        join_handle_test_environment
1097            .join()
1098            .expect("Test environment thread did not finish.");
1099
1100        println!("* [TankLevelSwitch] checking delayed change true -> false for stabilized tank level position when invalid bit is false succeeded.");
1101    }
1102
1103    #[cfg(all(target_os = "linux", feature = "target_hw"))]
1104    fn test_transition_stabilized_true_false(
1105        spin_sleeper: &SpinSleeper,
1106        sleep_duration_part: Duration,
1107        mutex_tank_level_switch_signals: &mut Arc<Mutex<TankLevelSwitchSignals>>,
1108    ) {
1109        for i in 0..20 {
1110            // loop for the first 10 seconds
1111            spin_sleeper.sleep(sleep_duration_part);
1112            {
1113                let tank_level_switch_signals = mutex_tank_level_switch_signals.lock().unwrap();
1114                println!(
1115                    "Tank level switch signals at iteration {}: Position={} Invalid={} Stabilized={}",
1116                    i,
1117                    tank_level_switch_signals.tank_level_switch_position,
1118                    tank_level_switch_signals.tank_level_switch_invalid,
1119                    tank_level_switch_signals.tank_level_switch_position_stabilized
1120                );
1121                if i < 10 {
1122                    assert_eq!(tank_level_switch_signals.tank_level_switch_position, false);
1123                    assert_eq!(
1124                        tank_level_switch_signals.tank_level_switch_position_stabilized,
1125                        true
1126                    );
1127                } else {
1128                    assert_eq!(tank_level_switch_signals.tank_level_switch_position, false);
1129                    assert_eq!(
1130                        tank_level_switch_signals.tank_level_switch_position_stabilized,
1131                        false
1132                    );
1133                }
1134            }
1135        }
1136    }
1137
    #[test]
    #[ignore]
    #[cfg(all(target_os = "linux", feature = "target_hw"))]
    // Test of the tank level switch on real hardware (simulator disabled for both the GPIO
    // handler and the tank level switch). The test is marked #[ignore], so it only runs when
    // ignored tests are requested explicitly.
    // Sequence driven by the test environment thread:
    //   1. stimuli pin untouched: expect position false, stabilized true -> false
    //   2. stimuli pin high: expect position true and stabilized true
    //   3. stimuli pin low: expect position false, stabilized true -> false
    // NOTE(review): the call shapes in this test differ from the other tests in this module
    // (read_config_file without .unwrap(), execute with five arguments) - confirm that this
    // test still builds with the target_hw feature.
    pub fn test_tank_level_switch_position_on_hardware() {
        let mut config: ConfigData =
            read_config_file("/config/aquarium_control_test_generic.toml".to_string());
        // use the real GPIO instead of the simulator
        config.gpio_handler.use_simulator = false;
        config.tank_level_switch.use_simulator = false;

        // channel pair between the test object and the test environment, which plays the
        // role of the signal handler
        let (tx_tank_level_switch_to_signal_handler, rx_signal_handler_from_tank_level_switch) =
            channel(1);
        let (tx_signal_handler_to_tank_level_switch, rx_tank_level_switch_from_signal_handler) =
            channel(1);

        // read the pin configuration before config.gpio_handler is moved into GpioHandler::new
        let tank_level_switch_pin = config.gpio_handler.tank_level_switch;

        let gpio_handler = GpioHandler::new(config.gpio_handler).unwrap();

        let gpio_lib_handle = gpio_handler.gpio_lib_handle_opt.clone().unwrap();

        // output pin used to stimulate the switch input
        // NOTE(review): presumably pin 21 is wired to the tank level switch input on the test
        // setup - confirm against the hardware
        let mut stimuli_pin =
            actuate_gpio::get_output_pin(&gpio_lib_handle, 21, "Tank level switch stimuli");

        let mut tank_level_switch = TankLevelSwitch::new(
            config.tank_level_switch,
            true,
            Some(gpio_handler.gpio_lib_handle_opt.unwrap()),
            tank_level_switch_pin,
        );

        // shared signals: one handle for the test environment, one clone for the test object
        let mut mutex_tank_level_switch_signals =
            Arc::new(Mutex::new(TankLevelSwitchSignals::new(true, true, false)));
        let mutex_tank_level_switch_signals_clone_for_test_object =
            mutex_tank_level_switch_signals.clone();

        // thread for controlling duration of test run
        let join_handle_test_environment = thread::Builder::new()
            .name("test_environment".to_string())
            .spawn(move || {
                // sampling interval: 20 samples per phase result in 10 seconds per phase
                let sleep_duration_part = Duration::from_millis(500);
                let spin_sleeper = SpinSleeper::default();

                // phase 1: stimuli pin not yet driven by this test
                test_transition_stabilized_true_false(
                    &spin_sleeper,
                    sleep_duration_part,
                    &mut mutex_tank_level_switch_signals
                );

                // phase 2: drive the stimuli pin high
                stimuli_pin.set_high();

                for i in 0..20 {
                    // loop for the second 10 seconds
                    spin_sleeper.sleep(sleep_duration_part);
                    {
                        let tank_level_switch_signals =
                            mutex_tank_level_switch_signals.lock().unwrap();
                        println!(
                            "Tank level switch signals at iteration {}: Position={} Invalid={} Stabilized={}",
                            i,
                            tank_level_switch_signals.tank_level_switch_position,
                            tank_level_switch_signals.tank_level_switch_invalid,
                            tank_level_switch_signals.tank_level_switch_position_stabilized
                        );
                        // both the raw and the stabilized position must read true in every sample
                        assert_eq!(tank_level_switch_signals.tank_level_switch_position, true);
                        assert_eq!(tank_level_switch_signals.tank_level_switch_position_stabilized, true);
                    }
                }

                // phase 3: drive the stimuli pin low again
                stimuli_pin.set_low();

                test_transition_stabilized_true_false(
                    &spin_sleeper,
                    sleep_duration_part,
                    &mut mutex_tank_level_switch_signals
                );

                // shutdown sequence: Quit, wait for the answer of the test object, Terminate
                let _ = tx_signal_handler_to_tank_level_switch.send(InternalCommand::Quit);
                rx_signal_handler_from_tank_level_switch.recv().unwrap();
                let _ = tx_signal_handler_to_tank_level_switch.send(InternalCommand::Terminate);
                spin_sleeper.sleep(sleep_duration_part);
            })
            .unwrap();

        // thread for the test object
        let join_handle_test_object = thread::Builder::new()
            .name("test_object".to_string())
            .spawn(move || {
                // NOTE(review): the other tests in this module pass a channels struct and the
                // signals mutex to execute - confirm this argument list against the current
                // signature
                tank_level_switch.execute(
                    &None,
                    &None,
                    &tx_tank_level_switch_to_signal_handler,
                    &rx_tank_level_switch_from_signal_handler,
                    mutex_tank_level_switch_signals_clone_for_test_object,
                );
            })
            .unwrap();

        join_handle_test_object
            .join()
            .expect("Test object thread did not finish.");
        join_handle_test_environment
            .join()
            .expect("Test environment thread did not finish.");
    }
1243    #[test]
1244    // test if mutex access duration is measured and monitored
1245    pub fn test_refill_calc_tank_level_switch_position_stabilized_false_true_with_mutex_blocked_too_long(
1246    ) {
1247        let mut channels = Channels::new(false, false, true);
1248
1249        let (mut tx_test_environment_to_tcp, mut rx_tcp_from_test_environment) = channel(1);
1250
1251        let mut config: ConfigData =
1252            read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
1253        config.refill.initial_wait_interval = 2;
1254        config.refill.check_interval = 60;
1255
1256        let duration_test_case_part_1: u64 = 3; // three seconds with level switch in low position
1257        let duration_test_case_part_2: u64 = 2; // two seconds with level switch in high position
1258
1259        #[cfg(all(target_os = "linux", feature = "target_hw"))]
1260        let mut tank_level_switch = TankLevelSwitch::new(
1261            config.tank_level_switch,
1262            false,
1263            None,
1264            config.gpio_handler.tank_level_switch,
1265        );
1266        #[cfg(any(not(target_os = "linux"), not(feature = "target_hw")))]
1267        let mut tank_level_switch = TankLevelSwitch::new(config.tank_level_switch, false).unwrap();
1268        let mutex_tank_level_switch_signals =
1269            Arc::new(Mutex::new(TankLevelSwitchSignals::new(true, true, false)));
1270        let mutex_tank_level_switch_signals_clone_for_test_environment =
1271            mutex_tank_level_switch_signals.clone();
1272
1273        // thread for mock tcp
1274        let join_handle_mock_tcp = thread::Builder::new()
1275            .name("mock_tcp".to_string())
1276            .spawn(move || {
1277                mock_tcp_for_tank_level_switch(
1278                    channels.tcp_communication,
1279                    Some(&mut rx_tcp_from_test_environment),
1280                    false,
1281                    false,
1282                );
1283            })
1284            .unwrap();
1285
1286        let mut tx_test_environment_to_tcp_for_test_case_finish = channels
1287            .tank_level_switch
1288            .tx_tank_level_switch_to_tcp_opt
1289            .clone()
1290            .unwrap();
1291
1292        // thread for controlling duration of test run
1293        let join_handle_test_environment = thread::Builder::new()
1294            .name("test_environment".to_string())
1295            .spawn(move || {
1296                let sleep_duration_part_1 = Duration::from_secs(duration_test_case_part_1.into());
1297                let sleep_duration_part_2 = Duration::from_secs(duration_test_case_part_2.into());
1298                let sleep_duration_thread_processing = Duration::from_millis(50);
1299                let spin_sleeper = SpinSleeper::default();
1300                spin_sleeper.sleep(sleep_duration_part_1);
1301                let _ = tx_test_environment_to_tcp.send(TestCommand::UpdateSignal(
1302                    MockSignal::TankLevelSwitchPosition(true),
1303                ));
1304                {
1305                    // block the mutex on purpose
1306                    let _unused = mutex_tank_level_switch_signals_clone_for_test_environment.lock();
1307                    spin_sleeper.sleep(sleep_duration_part_2);
1308                }
1309                let _ = channels
1310                    .signal_handler
1311                    .send_to_heating(InternalCommand::Quit);
1312                let _ = channels
1313                    .signal_handler
1314                    .send_to_refill(InternalCommand::Quit);
1315                spin_sleeper.sleep(sleep_duration_thread_processing);
1316                let _ = channels
1317                    .signal_handler
1318                    .send_to_tank_level_switch(InternalCommand::Quit);
1319                channels
1320                    .signal_handler
1321                    .receive_from_tank_level_switch()
1322                    .unwrap();
1323                let _ = channels
1324                    .signal_handler
1325                    .send_to_heating(InternalCommand::Terminate);
1326                let _ = channels
1327                    .signal_handler
1328                    .send_to_refill(InternalCommand::Terminate);
1329                let _ = tx_test_environment_to_tcp_for_test_case_finish.send(InternalCommand::Quit);
1330                let _ = channels
1331                    .signal_handler
1332                    .send_to_tank_level_switch(InternalCommand::Terminate);
1333            })
1334            .unwrap();
1335
1336        // thread for the test object
1337        let join_handle_test_object = thread::Builder::new()
1338            .name("test_object".to_string())
1339            .spawn(move || {
1340                let _ = tank_level_switch.execute(
1341                    &mut channels.tank_level_switch,
1342                    mutex_tank_level_switch_signals,
1343                );
1344
1345                // check if monitoring recorded excessive mutex blocking time
1346                assert_eq!(tank_level_switch.mutex_access_duration_exceeded, true);
1347            })
1348            .unwrap();
1349
1350        join_handle_test_object
1351            .join()
1352            .expect("Test object thread did not finish.");
1353        join_handle_mock_tcp
1354            .join()
1355            .expect("Mock TCP thread did not finish.");
1356        join_handle_test_environment
1357            .join()
1358            .expect("Test environment thread did not finish.");
1359
1360        println!("* [TankLevelSwitch] testing immediate change false -> true for stabilized tank level position when invalid bit is false succeeded while having mutex blocked");
1361    }
1362}