aquarium_control/water/
water_injection.rs

1/* Copyright 2024 Uwe Martin
2
3Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
4
5The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
6
7THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
8*/
9
10use chrono::Local;
11use log::error;
12use spin_sleep::SpinSleeper;
13use std::sync::{Arc, Mutex};
14use std::time::{Duration, Instant};
15
16use crate::database::sql_interface_refill::DatabaseInterfaceRefillTrait;
17use crate::sensors::tank_level_switch::TankLevelSwitchSignals;
18use crate::utilities::channel_content::{AquariumDevice, InternalCommand};
19use crate::utilities::check_mutex_access_duration::CheckMutexAccessDurationTrait;
20use crate::utilities::proc_ext_req::ProcessExternalRequestTrait;
21use crate::water::refill_channels::RefillChannels;
22use crate::water::refill_config::RefillConfig;
23use crate::water::refill_error_states::RefillErrorStates;
24
/// Allow max. 10 milliseconds for the mutex to be blocked by any other thread.
/// `WaterInjection::new` converts this value into the `max_mutex_access_duration` field.
const MAX_MUTEX_ACCESS_DURATION_MILLIS: u64 = 10;
27
/// Trait for the execution of the refill once control has detected that the water level is low.
/// This trait allows running the main control with a mock implementation for testing.
pub trait WaterInjectionTrait {
    /// Executes one refill operation.
    ///
    /// # Arguments
    /// * `refill_channels` - channels of the refill module used during the operation.
    /// * `refill_errors` - error flags that an implementation may set while injecting water.
    /// * `sql_interface_refill` - database interface used to record the refill event.
    /// * `mutex_tank_level_switch` - shared tank level switch signals.
    ///
    /// # Returns
    /// `true` if the operation was aborted because of a `Quit` command, `false` otherwise
    /// (this is how the `WaterInjection` implementation in this file behaves).
    fn inject_water(
        &mut self,
        refill_channels: &mut RefillChannels,
        refill_errors: &mut RefillErrorStates,
        sql_interface_refill: &mut Box<dyn DatabaseInterfaceRefillTrait + Sync + Send>,
        mutex_tank_level_switch: &Arc<Mutex<TankLevelSwitchSignals>>,
    ) -> bool;
}
39
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Implementation of the WaterInjectionTrait which contains the code for executing the refill once
/// the main control has detected that the water level is low.
/// Thread communication is as follows:
/// ```mermaid
/// graph LR
///     water_injection[Water Injection] --> relay_manager[Relay Manager]
///     relay_manager --> water_injection
///     water_injection --> data_logger[Data Logger]
///     data_logger --> water_injection
///     tank_level_switch[TankLevelSwitch] -.-> water_injection
///     signal_handler[Signal Handler] --> water_injection
/// ```
pub struct WaterInjection {
    /// The volume of water (in Liters) accounted for per control loop cycle.
    /// It is calculated in `new` as the loop's sleep interval (in seconds) multiplied by the
    /// configured pump flow.
    refill_increment: f64,

    /// The extra volume of water to add after the tank level switch reports `high`.
    /// The refill loop only stops on a `high` switch once more than this volume has been
    /// accounted for while the switch was `high`.
    additional_refill_volume: f64,

    /// The maximum total volume of water (in Liters) allowed for a single refill operation.
    /// Exceeding it stops the refill and is reported as a level switch stuck at `low`.
    max_refill_volume: f64,

    /// A high-precision sleeper utility to pause the control loop for a consistent duration.
    spin_sleeper: SpinSleeper,

    /// The `Duration` for which the control loop sleeps on each iteration
    /// (set to 100 ms in `new`).
    sleep_duration_hundred_milli_sec: Duration,

    /// The sleep interval of the control loop, expressed in seconds as a floating-point number
    /// for use in volume and refill time calculations.
    sleep_interval_seconds: f64,

    /// An inhibition flag to prevent flooding the log file with repeated warnings
    /// about excessive mutex access times.
    pub lock_warn_max_mutex_access_duration: bool,

    /// A flag used for testing purposes to record if the mutex access time was ever exceeded.
    /// Unlike the lock flag, this is not reset.
    #[cfg(test)]
    pub mutex_access_duration_exceeded: bool,

    /// The maximum permissible duration for a mutex lock to be held before a warning is issued.
    /// Initialized from `MAX_MUTEX_ACCESS_DURATION_MILLIS`.
    pub max_mutex_access_duration: Duration,
}
88
89impl WaterInjection {
90    /// Creates a new `WaterInjection` instance.
91    ///
92    /// This constructor initializes the water injection module, setting up parameters
93    /// crucial for controlling the refill pump during a water injection operation.
94    /// It calculates the `refill_increment` based on pump flow, copies refill limits
95    /// from the configuration, and sets up internal timing mechanisms.
96    /// All internal "lock" flags (used to prevent log flooding) are initialized to `false`.
97    ///
98    /// # Arguments
99    /// * `config` - A reference to the **`RefillConfig`** struct, which contains
100    ///   essential parameters such as `refill_pump_flow`, `additional_refill_volume`,
101    ///   and `max_refill_volume`.
102    ///
103    /// # Returns
104    /// A new **`WaterInjection` struct**, ready to perform water injection operations.
105    pub fn new(config: &RefillConfig) -> WaterInjection {
106        let sleep_interval_millis = 100;
107        let sleep_interval_seconds: f64 = (sleep_interval_millis as f64) / (1000f64);
108
109        WaterInjection {
110            refill_increment: sleep_interval_seconds * config.refill_pump_flow,
111            additional_refill_volume: config.additional_refill_volume,
112            max_refill_volume: config.max_refill_volume,
113            spin_sleeper: SpinSleeper::default(),
114            sleep_duration_hundred_milli_sec: Duration::from_millis(sleep_interval_millis),
115            sleep_interval_seconds,
116            lock_warn_max_mutex_access_duration: false,
117            #[cfg(test)]
118            mutex_access_duration_exceeded: false,
119            max_mutex_access_duration: Duration::from_millis(MAX_MUTEX_ACCESS_DURATION_MILLIS),
120        }
121    }
122}
123
124impl WaterInjectionTrait for WaterInjection {
125    /// Executes the freshwater refill operation, controlling the refill pump and monitoring tank level.
126    ///
127    /// This function orchestrates the physical water injection. It first switches on the refill pump,
128    /// then enters a loop to continuously monitor the tank level switch and check for various
129    /// termination conditions:
130    /// - Water level reaching `high` (plus `additional_refill_volume`).
131    /// - Tank level switch signal becoming `invalid`.
132    /// - Total refilled `volume` exceeding `max_refill_volume` (safety timeout).
133    /// - A `Quit` command being received from the signal handler.
134    ///
135    /// During the refill, it periodically communicates the pump status to the `DataLogger`
136    /// and logs the final refill event details (timestamp, duration, volume, error code) to the database.
137    ///
138    /// # Arguments
139    /// * `refill_channels` - A mutable reference to a struct containing all necessary communication channels with
140    ///   `RelayManager`, `DataLogger`, and `SignalHandler`.
141    /// * `refill_errors` - A mutable reference to a `RefillErrors` struct, where specific error
142    ///   flags (e.g., `error_switch_stuck_low`, `error_sql_update_failed`) will be set if issues occur during injection.
143    /// * `sql_interface_refill` - A mutable reference to the SQL database interface for refill control,
144    ///   used to log the refill event.
145    /// * `mutex_tank_level_switch_signals` - A mutable reference to the `Arc<Mutex<TankLevelSwitchSignals>>`
146    ///   for reading the current tank level switch position and validity.
147    ///
148    /// # Returns
149    /// `true` if the refill operation was **aborted due to a `Quit` command** received from the signal handler;
150    /// `false` otherwise (meaning it completed normally or aborted due to other conditions like level reached or errors).
151    fn inject_water(
152        &mut self,
153        refill_channels: &mut RefillChannels,
154        refill_errors: &mut RefillErrorStates,
155        sql_interface_refill: &mut Box<dyn DatabaseInterfaceRefillTrait + Sync + Send>,
156        mutex_tank_level_switch_signals: &Arc<Mutex<TankLevelSwitchSignals>>,
157    ) -> bool {
158        let mut quit_command_received = false;
159
160        // command start of refill pump
161        match refill_channels
162            .send_to_relay_manager(InternalCommand::SwitchOn(AquariumDevice::RefillPump))
163        {
164            Ok(_) => {
165                // now listen for the response
166                match refill_channels.receive_from_relay_manager() {
167                    Ok(_) => {}
168                    Err(e) => {
169                        error!(
170                            target: module_path!(),
171                            "receiving answer from relay manager when switching on refill pump failed ({e:?})"
172                        );
173                    }
174                };
175            }
176            Err(e) => {
177                error!(
178                    target: module_path!(),
179                    "channel communication to relay manager when switching on refill pump failed ({e:?})"
180                );
181            }
182        }
183
184        //*** wait for condition to stop refilling ***
185        let mut refill_volume = 0.0;
186        let mut refill_time_seconds = 0.0;
187        let mut refill_error_code = 0;
188        let mut additional_refill_volume_executed = 0.0;
189        let mut tank_level_switch_position = false;
190        let mut tank_level_switch_invalid = false;
191
192        loop {
193            self.spin_sleeper
194                .sleep(self.sleep_duration_hundred_milli_sec);
195            refill_time_seconds += self.sleep_interval_seconds;
196
197            let instant_before_locking_mutex = Instant::now();
198            let mut instant_after_locking_mutex = Instant::now(); // initialization is overwritten
199
200            // access mutex for reading tank level switch signals
201            {
202                match mutex_tank_level_switch_signals.lock() {
203                    Ok(c) => {
204                        tank_level_switch_position = c.tank_level_switch_position;
205                        tank_level_switch_invalid = c.tank_level_switch_invalid;
206                        instant_after_locking_mutex = Instant::now();
207                    }
208                    Err(_) => {
209                        // Do nothing, keep values as they are.
210                    }
211                }
212            }
213
214            // check if access to mutex took too long
215            self.check_mutex_access_duration(
216                None,
217                instant_after_locking_mutex,
218                instant_before_locking_mutex,
219            );
220
221            if tank_level_switch_position
222                && additional_refill_volume_executed > self.additional_refill_volume
223            {
224                // the water level changed to high
225                break;
226            }
227            if tank_level_switch_invalid {
228                // switch signal is invalid
229                refill_error_code = 1;
230                break;
231            }
232
233            // calculate volume refilled in this operation
234            refill_volume += self.refill_increment;
235            if tank_level_switch_position {
236                additional_refill_volume_executed += self.refill_increment;
237            }
238            (quit_command_received, _, _) = self
239                .process_external_request(&mut refill_channels.rx_refill_from_signal_handler, None);
240            if quit_command_received {
241                break;
242            }
243            if refill_volume > self.max_refill_volume {
244                refill_errors.error_switch_stuck_low = true;
245                refill_error_code = 2;
246                break;
247            }
248        }
249
250        // command stop of refill pump
251        match refill_channels
252            .send_to_relay_manager(InternalCommand::SwitchOff(AquariumDevice::RefillPump))
253        {
254            Ok(_) => {
255                // now listen for the response
256                match refill_channels.receive_from_relay_manager() {
257                    Ok(_) => {}
258                    Err(e) => {
259                        error!(
260                            target: module_path!(),
261                            "receiving answer from relay manager when switching off refill pump failed ({e:?})"
262                        );
263                    }
264                };
265            }
266            Err(e) => {
267                error!(
268                    target: module_path!(),
269                    "channel communication to relay manager when switching off refill pump failed ({e:?})"
270                );
271            }
272        }
273
274        // write the result to the database
275        match sql_interface_refill.insert_refill_event(
276            Local::now().naive_local(),
277            refill_time_seconds,
278            refill_volume,
279            refill_error_code,
280        ) {
281            Ok(_) => {}
282            Err(e) => {
283                error!(
284                    target: module_path!(),
285                    "writing refill operation data to SQL database failed ({e:?})"
286                );
287                refill_errors.error_sql_update_failed = true;
288            }
289        }
290
291        quit_command_received // return information if refill was aborted due to request from signal handler
292    }
293}
294
295#[cfg(test)]
296pub mod tests {
297    use crate::database::sql_interface_refill::SqlInterfaceRefill;
298    use crate::launch::channels::{AquaReceiver, AquaSender, Channels};
299    use crate::mocks::mock_relay_manager::tests::mock_relay_manager;
300    use crate::sensors::tank_level_switch::TankLevelSwitchSignals;
301    use crate::utilities::channel_content::{ActuatorState, AquariumDevice, InternalCommand};
302    use crate::water::refill::tests::prepare_refill_tests;
303    use crate::water::refill::Refill;
304    use crate::water::refill_channels::RefillChannels;
305    use crate::water::water_injection::{WaterInjection, WaterInjectionTrait};
306    use all_asserts::{assert_ge, assert_le};
307    use spin_sleep::SpinSleeper;
308    use std::sync::{Arc, Mutex};
309    use std::{thread, time::Duration};
310
311    // Creates a thread to run `WaterInjection::inject_water` and assert its outcome.
312    //
313    // This private helper function simplifies complex test scenarios for `WaterInjection`.
314    // It spawns a new thread.
315    // 1. It initializes a `WaterInjection` instance using the provided `Refill`'s configuration.
316    // 2. It sets up the `WaterInjectionChannels` by borrowing the necessary `mpsc` channels from the test environment.
317    // 3. It calls `water_injection.inject_water()` with all its required parameters.
318    // 4. It asserts that the `inject_water` function's return value (`result`) matches `target_result`.
319    // 5. It asserts that the `refill_errors.has_error()` status matches `target_refill_has_errors` after the operation.
320    //
321    // # Arguments
322    // * `refill_channels` - Struct containing the signals for communication with other threads.
323    // * `rx_water_injection_from_relay_manager` - Receiver channel for `WaterInjection` to receive acknowledgments from the mock `RelayManager`.
324    // * `rx_water_injection_from_signal_handler` - Receiver channel for `WaterInjection` to receive commands from the mock `SignalHandler`.
325    // * `refill` - The `Refill` struct (moved into this thread) whose `refill_errors` and `sql_interface_refill` will be used and modified by `inject_water`.
326    // * `mutex_tank_level_switch_signals` - The `Arc<Mutex<TankLevelSwitchSignals>>` providing shared access to mock tank level switch signals.
327    // * `target_result` - The expected boolean return value of the `inject_water` call (`true` if `Quit` was received, `false` otherwise).
328    // * `target_refill_has_errors` - The expected boolean result of `refill.refill_errors.has_error()` after `inject_water` completes.
329    //
330    // # Returns
331    // A `thread::JoinHandle<()>` which can be used by the main test thread to wait for this spawned thread's completion.
332    //
333    // # Panics
334    // This function will panic if the new thread cannot be spawned, or if any of the `assert_eq!` calls inside the spawned thread fails.
335    fn create_assert_test_object(
336        mut refill_channels: RefillChannels,
337        mut refill: Refill,
338        sql_interface_refill: SqlInterfaceRefill,
339        mutex_tank_level_switch_signals: Arc<Mutex<TankLevelSwitchSignals>>,
340        target_result: bool,
341        target_refill_has_errors: bool,
342    ) -> thread::JoinHandle<()> {
343        thread::Builder::new()
344            .name("test_object".to_string())
345            .spawn(move || {
346                let mut water_injection = WaterInjection::new(&refill.config);
347
348                let mut boxed_sql_interface_refill: Box<
349                    dyn crate::database::sql_interface_refill::DatabaseInterfaceRefillTrait
350                        + Sync
351                        + Send,
352                > = Box::new(sql_interface_refill);
353
354                let result = water_injection.inject_water(
355                    &mut refill_channels,
356                    &mut refill.refill_errors,
357                    &mut boxed_sql_interface_refill,
358                    &mutex_tank_level_switch_signals,
359                );
360                assert_eq!(result, target_result);
361                assert_eq!(refill.refill_errors.has_error(), target_refill_has_errors);
362            })
363            .unwrap()
364    }
365
    #[test]
    // This test case executes the inject_water-function with the sensor signal already set to
    // high when the test object starts. No Quit command is sent while the pump is expected to
    // run, so the operation has to end on its own; the mock relay manager checks that the pump
    // was switched on for 4000..=4500 ms.
    // NOTE(review): the expected duration presumably follows from the additional refill volume
    // and pump flow configured by `prepare_refill_tests` - confirm against that helper.
    // Test case uses test database #39.
    pub fn test_water_injection_immediate_switch_high_no_quit_signal() {
        let sleep_duration_100_millis = Duration::from_millis(100);
        let spin_sleeper = SpinSleeper::default();

        let (_, refill, _, sql_interface_refill) = prepare_refill_tests(None, 2, 60, Some(39));

        let mut channels = Channels::new_for_test();

        // Additional sender on the refill -> relay manager channel, used by the test
        // environment to terminate the mock relay manager.
        let mut tx_signal_handler_to_relay_manager =
            channels.refill.tx_refill_to_relay_manager.clone();

        // NOTE(review): the constructor arguments presumably describe a switch that reports
        // `high` and a valid signal (third argument `false`) - confirm against
        // `TankLevelSwitchSignals::new`.
        let mutex_tank_level_switch_signals =
            Arc::new(Mutex::new(TankLevelSwitchSignals::new(true, true, false)));

        // thread for mock relay manager - includes assertions
        let join_handle_mock_relay_manager = thread::Builder::new()
            .name("mock_relay_manager".to_string())
            .spawn(move || {
                let (mut actuation_events, mock_actuator_states) = mock_relay_manager(
                    &mut channels.relay_manager.tx_relay_manager_to_refill,
                    &mut channels.relay_manager.rx_relay_manager_from_refill,
                );
                // exactly one switch-on followed by one switch-off is expected
                assert_eq!(actuation_events.len(), 2);
                let last_actuation_event = actuation_events.pop().unwrap();
                assert_eq!(
                    last_actuation_event.command,
                    InternalCommand::SwitchOff(AquariumDevice::RefillPump)
                );
                let first_actuation_event = actuation_events.pop().unwrap();
                assert_eq!(
                    first_actuation_event.command,
                    InternalCommand::SwitchOn(AquariumDevice::RefillPump)
                );
                // time between switch-on and switch-off of the refill pump
                let actuation_duration = last_actuation_event
                    .time
                    .duration_since(first_actuation_event.time)
                    .as_millis();
                println!("Actuation duration={}", actuation_duration);
                assert_ge!(actuation_duration, 4000);
                assert_le!(actuation_duration, 4500);
                assert_eq!(
                    mock_actuator_states.refill_pump_actuation_state,
                    ActuatorState::Off
                );
            })
            .unwrap();

        // thread for controlling duration of test run
        let join_handle_test_environment = thread::Builder::new()
            .name("test_environment".to_string())
            .spawn(move || {
                let sleep_duration_six_seconds = Duration::from_secs(6);
                let sleep_duration_two_seconds = Duration::from_secs(2);
                let spin_sleeper = SpinSleeper::default();
                // Six seconds is longer than the upper bound asserted for the pump run time,
                // so this Quit is sent after the expected end of the injection.
                spin_sleeper.sleep(sleep_duration_six_seconds);
                let _ = channels
                    .signal_handler
                    .send_to_refill(InternalCommand::Quit);
                spin_sleeper.sleep(sleep_duration_two_seconds);
                // terminate the mock relay manager; send errors are ignored
                let _ = tx_signal_handler_to_relay_manager.send(InternalCommand::Quit);
                let _ = channels
                    .signal_handler
                    .send_to_refill(InternalCommand::Quit);
            })
            .unwrap();

        // make sure all mock threads are running when instantiating the test object
        spin_sleeper.sleep(sleep_duration_100_millis);

        // thread for the test object:
        // inject_water shall return false (no Quit) and no refill error shall be set
        let join_handle_test_object = create_assert_test_object(
            channels.refill,
            refill,
            sql_interface_refill,
            mutex_tank_level_switch_signals,
            false,
            false,
        );

        join_handle_mock_relay_manager
            .join()
            .expect("Mock relay manager thread did not finish.");
        join_handle_test_environment
            .join()
            .expect("Test environment thread did not finish.");
        join_handle_test_object
            .join()
            .expect("Test object thread did not finish.");
    }
458
459    fn create_mock_relay_manager(
460        mut tx_relay_manager_to_water_injection: AquaSender<bool>,
461        mut rx_relay_manager_from_water_injection: AquaReceiver<InternalCommand>,
462    ) -> thread::JoinHandle<()> {
463        thread::Builder::new()
464            .name("mock_relay_manager".to_string())
465            .spawn(move || {
466                let (mut actuation_events, mock_actuator_states) = mock_relay_manager(
467                    &mut tx_relay_manager_to_water_injection,
468                    &mut rx_relay_manager_from_water_injection,
469                );
470                assert_eq!(actuation_events.len(), 2);
471                let last_actuation_event = actuation_events.pop().unwrap();
472                assert_eq!(
473                    last_actuation_event.command,
474                    InternalCommand::SwitchOff(AquariumDevice::RefillPump)
475                );
476                let first_actuation_event = actuation_events.pop().unwrap();
477                assert_eq!(
478                    first_actuation_event.command,
479                    InternalCommand::SwitchOn(AquariumDevice::RefillPump)
480                );
481                let actuation_duration = last_actuation_event
482                    .time
483                    .duration_since(first_actuation_event.time)
484                    .as_millis();
485                println!("Actuation duration={}", actuation_duration);
486                assert_ge!(actuation_duration, 0);
487                assert_le!(actuation_duration, 200);
488                assert_eq!(
489                    mock_actuator_states.refill_pump_actuation_state,
490                    ActuatorState::Off
491                );
492            })
493            .unwrap()
494    }
495
496    #[test]
497    // This test case executes the inject_water function and immediately sets the sensor signal to high value.
498    // Additionally, the Quit signal is sent.
499    // Expected behavior: immediate stop of additional refill operation.
500    // Test case uses test database #40.
501    pub fn test_water_injection_immediate_switch_high_and_quit_signal() {
502        let sleep_duration_100_millis = Duration::from_millis(100);
503        let spin_sleeper = SpinSleeper::default();
504
505        let (_, refill, _, sql_interface_refill) = prepare_refill_tests(None, 2, 60, Some(40));
506
507        let mut channels = Channels::new_for_test();
508
509        let mut tx_text_environment_to_relay_manager =
510            channels.refill.tx_refill_to_relay_manager.clone();
511
512        let mutex_tank_level_switch_signals =
513            Arc::new(Mutex::new(TankLevelSwitchSignals::new(true, true, false)));
514
515        // thread for mock relay manager - includes assertions
516        let join_handle_mock_relay_manager = create_mock_relay_manager(
517            channels.relay_manager.tx_relay_manager_to_refill,
518            channels.relay_manager.rx_relay_manager_from_refill,
519        );
520
521        // thread for controlling duration of test run
522        let join_handle_test_environment = thread::Builder::new()
523            .name("test_environment".to_string())
524            .spawn(move || {
525                let sleep_duration_two_seconds = Duration::from_secs(1);
526                let spin_sleeper = SpinSleeper::default();
527                let _ = channels
528                    .signal_handler
529                    .send_to_refill(InternalCommand::Quit);
530                println!("Sent quit signal for refill.");
531                spin_sleeper.sleep(sleep_duration_two_seconds);
532                let _ = tx_text_environment_to_relay_manager.send(InternalCommand::Quit);
533                println!("Sent quit signal for relay manager.");
534            })
535            .unwrap();
536
537        // make sure all mock threads are running when instantiating the test object
538        spin_sleeper.sleep(sleep_duration_100_millis);
539
540        // thread for the test object
541        let join_handle_test_object = create_assert_test_object(
542            channels.refill,
543            refill,
544            sql_interface_refill,
545            mutex_tank_level_switch_signals,
546            true,
547            false,
548        );
549
550        join_handle_mock_relay_manager
551            .join()
552            .expect("Mock relay manager thread did not finish.");
553        join_handle_test_environment
554            .join()
555            .expect("Test environment thread did not finish.");
556        join_handle_test_object
557            .join()
558            .expect("Test object thread did not finish.");
559    }
560
561    // Creates a thread that simulates a test environment for `WaterInjection` scenarios.
562    //
563    // This private helper function is used in integration tests to control the test's timeline
564    // and send specific shutdown signals. It simulates a period of normal operation, then
565    // triggers a `Quit` command to the `WaterInjection` module, and finally sends `Quit`
566    // commands to other mock threads to ensure a clean test environment shutdown.
567    //
568    // # Arguments
569    // * `tx_signal_handler_to_water_injection` - The sender channel for this mock environment to send
570    //   commands to the `WaterInjection` test object, simulating the signal handler.
571    // * `tx_signal_handler_to_relay_manager` - The sender channel for this mock environment to send
572    //   commands to the mock `RelayManager` thread.
573    // * `tx_signal_handler_to_data_logger` - The sender channel for this mock environment to send
574    //   commands to the mock `DataLogger` thread.
575    //
576    // # Returns
577    // A `thread::JoinHandle<()>` which can be used by the main test thread to wait for this
578    // mock environment thread's completion.
579    //
580    // # Panics
581    // This function will panic if:
582    // - The new thread cannot be spawned.
583    // - It fails to send any of the `InternalCommand::Quit` messages through the channels.
584    fn create_test_environment(
585        mut tx_signal_handler_to_water_injection: AquaSender<InternalCommand>,
586        mut tx_signal_handler_to_relay_manager: AquaSender<InternalCommand>,
587    ) -> thread::JoinHandle<()> {
588        thread::Builder::new()
589            .name("test_environment".to_string())
590            .spawn(move || {
591                let sleep_duration_two_seconds = Duration::from_secs(2);
592                let spin_sleeper = SpinSleeper::default();
593
594                // wait for the refill operation to inject some water
595                spin_sleeper.sleep(sleep_duration_two_seconds);
596                let _ = tx_signal_handler_to_water_injection.send(InternalCommand::Quit);
597
598                // wait for the test object to terminate
599                spin_sleeper.sleep(sleep_duration_two_seconds);
600
601                // request termination to mock threads
602                let _ = tx_signal_handler_to_relay_manager.send(InternalCommand::Quit);
603            })
604            .unwrap()
605    }
606
    #[test]
    // This test case executes the inject_water function with the tank level switch signals
    // initialized so that
    // - the sensor signal is set to high value
    // - the invalid bit is set.
    // Water injection shall be aborted without adding further water: the mock relay manager
    // asserts that the pump was switched on for at most 200 ms.
    // Test case uses test database #41.
    pub fn test_water_injection_immediate_switch_high_with_invalid_true() {
        let sleep_duration_100_millis = Duration::from_millis(100);
        let spin_sleeper = SpinSleeper::default();

        let (_, refill, _, sql_interface_refill) = prepare_refill_tests(None, 2, 60, Some(41));

        let channels = Channels::new_for_test();

        // Additional sender on the refill -> relay manager channel, used by the test
        // environment to terminate the mock relay manager.
        let tx_signal_handler_to_relay_manager = channels.refill.tx_refill_to_relay_manager.clone();

        // NOTE(review): the third constructor argument is presumably the invalid flag -
        // confirm against `TankLevelSwitchSignals::new`.
        let mutex_tank_level_switch_signals =
            Arc::new(Mutex::new(TankLevelSwitchSignals::new(true, true, true)));

        // thread for mock relay manager - includes assertions
        let join_handle_mock_relay_manager = create_mock_relay_manager(
            channels.relay_manager.tx_relay_manager_to_refill,
            channels.relay_manager.rx_relay_manager_from_refill,
        );

        // thread for controlling duration of test run
        let join_handle_test_environment = create_test_environment(
            channels.signal_handler.tx_signal_handler_to_refill,
            tx_signal_handler_to_relay_manager,
        );

        // make sure all mock threads are running when instantiating the test object
        spin_sleeper.sleep(sleep_duration_100_millis);

        // thread for the test object:
        // inject_water shall return false (not aborted by Quit) and
        // `refill_errors.has_error()` shall be false
        let join_handle_test_object = create_assert_test_object(
            channels.refill,
            refill,
            sql_interface_refill,
            mutex_tank_level_switch_signals,
            false,
            false,
        );

        join_handle_mock_relay_manager
            .join()
            .expect("Mock relay manager thread did not finish.");
        join_handle_test_environment
            .join()
            .expect("Test environment thread did not finish.");
        join_handle_test_object
            .join()
            .expect("Test object thread did not finish.");
    }
661
662    #[test]
663    // This test case executes the inject_water function and sets the sensor signal
664    // to high after some time has passed (the happy case).
665    // Test case uses test database #42.
666    pub fn test_water_injection_delayed_switch_high() {
667        let sleep_duration_100_millis = Duration::from_millis(100);
668        let spin_sleeper = SpinSleeper::default();
669
670        let (_, refill, _, sql_interface_refill) = prepare_refill_tests(None, 2, 60, Some(42));
671
672        let mut channels = Channels::new_for_test();
673
674        let mut tx_test_environment_to_relay_manager =
675            channels.refill.tx_refill_to_relay_manager.clone();
676
677        let mutex_tank_level_switch_signals =
678            Arc::new(Mutex::new(TankLevelSwitchSignals::new(false, false, false)));
679        let mutex_tank_level_switch_signals_clone_for_test_environment =
680            mutex_tank_level_switch_signals.clone();
681
682        // thread for Mock relay manager - includes assertions
683        let join_handle_mock_relay_manager = thread::Builder::new()
684            .name("mock_relay_manager".to_string())
685            .spawn(move || {
686                let (mut actuation_events, mock_actuator_states) = mock_relay_manager(
687                    &mut channels.relay_manager.tx_relay_manager_to_refill,
688                    &mut channels.relay_manager.rx_relay_manager_from_refill,
689                );
690                assert_eq!(actuation_events.len(), 2);
691                let last_actuation_event = actuation_events.pop().unwrap();
692                assert_eq!(
693                    last_actuation_event.command,
694                    InternalCommand::SwitchOff(AquariumDevice::RefillPump)
695                );
696                let first_actuation_event = actuation_events.pop().unwrap();
697                assert_eq!(
698                    first_actuation_event.command,
699                    InternalCommand::SwitchOn(AquariumDevice::RefillPump)
700                );
701                let actuation_duration = last_actuation_event
702                    .time
703                    .duration_since(first_actuation_event.time)
704                    .as_millis();
705                println!("Actuation duration={}", actuation_duration);
706                assert_ge!(actuation_duration, 5890);
707                assert_le!(actuation_duration, 6250);
708                assert_eq!(
709                    mock_actuator_states.refill_pump_actuation_state,
710                    ActuatorState::Off
711                );
712            })
713            .unwrap();
714
715        // thread for controlling duration of test run
716        let join_handle_test_environment = thread::Builder::new()
717            .name("test_environment".to_string())
718            .spawn(move || {
719                let sleep_duration_ten_seconds = Duration::from_secs(10);
720                let sleep_duration_two_seconds = Duration::from_secs(2);
721                let spin_sleeper = SpinSleeper::default();
722
723                // run with tank level switch in low position
724                spin_sleeper.sleep(sleep_duration_two_seconds);
725
726                // update tank level switch stabilized signal to high
727                {
728                    let mut tank_level_switch_signals =
729                        mutex_tank_level_switch_signals_clone_for_test_environment
730                            .lock()
731                            .unwrap();
732                    tank_level_switch_signals.tank_level_switch_position_stabilized = true;
733                    tank_level_switch_signals.tank_level_switch_position = true;
734                }
735
736                // wait for the refill operation to finish
737                spin_sleeper.sleep(sleep_duration_ten_seconds);
738                let _ = channels
739                    .signal_handler
740                    .send_to_refill(InternalCommand::Quit);
741
742                // wait for inject_water to terminate
743                spin_sleeper.sleep(sleep_duration_two_seconds);
744
745                // request termination to mock threads
746                let _ = tx_test_environment_to_relay_manager.send(InternalCommand::Quit);
747            })
748            .unwrap();
749
750        // make sure all mock threads are running when instantiating the test object
751        spin_sleeper.sleep(sleep_duration_100_millis);
752
753        // thread for the test object
754        let join_handle_test_object = create_assert_test_object(
755            channels.refill,
756            refill,
757            sql_interface_refill,
758            mutex_tank_level_switch_signals,
759            false,
760            false,
761        );
762
763        join_handle_mock_relay_manager
764            .join()
765            .expect("Mock relay manager thread did not finish.");
766        join_handle_test_environment
767            .join()
768            .expect("Test environment thread did not finish.");
769        join_handle_test_object
770            .join()
771            .expect("Test object thread did not finish.");
772    }
773
774    #[test]
775    // This test case executes inject_water function with simulation of level switch stuck at low position.
776    // Test case uses test database #43.
777    pub fn test_water_injection_switch_stuck_low() {
778        let sleep_duration_100_millis = Duration::from_millis(100);
779        let spin_sleeper = SpinSleeper::default();
780
781        let (_, mut refill, _, sql_interface_refill) = prepare_refill_tests(None, 2, 60, Some(43));
782
783        let mut water_injection = WaterInjection::new(&refill.config);
784
785        let mut channels = Channels::new_for_test();
786
787        let mut tx_signal_handler_to_relay_manager =
788            channels.refill.tx_refill_to_relay_manager.clone();
789
790        let mutex_tank_level_switch_signals =
791            Arc::new(Mutex::new(TankLevelSwitchSignals::new(false, false, false)));
792
793        // thread for Mock relay manager - includes assertions
794        let join_handle_mock_relay_manager = thread::Builder::new()
795            .name("mock_relay_manager".to_string())
796            .spawn(move || {
797                let (mut actuation_events, actuator_states) = mock_relay_manager(
798                    &mut channels.relay_manager.tx_relay_manager_to_refill,
799                    &mut channels.relay_manager.rx_relay_manager_from_refill,
800                );
801                assert_eq!(actuation_events.len(), 2);
802                let last_actuation_event = actuation_events.pop().unwrap();
803                assert_eq!(
804                    last_actuation_event.command,
805                    InternalCommand::SwitchOff(AquariumDevice::RefillPump)
806                );
807                let first_actuation_event = actuation_events.pop().unwrap();
808                assert_eq!(
809                    first_actuation_event.command,
810                    InternalCommand::SwitchOn(AquariumDevice::RefillPump)
811                );
812                let actuation_duration = last_actuation_event
813                    .time
814                    .duration_since(first_actuation_event.time)
815                    .as_millis();
816                println!("Actuation duration={}", actuation_duration);
817                assert_ge!(actuation_duration, 40000);
818                assert_le!(actuation_duration, 43100);
819                actuator_states.check_terminal_condition_refill();
820            })
821            .unwrap();
822
823        // thread for controlling duration of test run
824        let join_handle_test_environment = thread::Builder::new()
825            .name("test_environment".to_string())
826            .spawn(move || {
827                let sleep_duration_fourtyfive_seconds = Duration::from_secs(45);
828                let spin_sleeper = SpinSleeper::default();
829
830                // wait for the refill operation to abort
831                spin_sleeper.sleep(sleep_duration_fourtyfive_seconds);
832
833                // request termination to mock threads
834                let _ = tx_signal_handler_to_relay_manager.send(InternalCommand::Quit);
835            })
836            .unwrap();
837
838        // make sure all mock threads are running when instantiating the test object
839        spin_sleeper.sleep(sleep_duration_100_millis);
840
841        // thread for the test object
842        let join_handle_test_object = thread::Builder::new()
843            .name("test_object".to_string())
844            .spawn(move || {
845                let mut boxed_sql_interface_refill: Box<
846                    dyn crate::database::sql_interface_refill::DatabaseInterfaceRefillTrait
847                        + Sync
848                        + Send,
849                > = Box::new(sql_interface_refill);
850
851                let result = water_injection.inject_water(
852                    &mut channels.refill,
853                    &mut refill.refill_errors,
854                    &mut boxed_sql_interface_refill,
855                    &mutex_tank_level_switch_signals,
856                );
857                assert_eq!(result, false);
858                assert_eq!(refill.refill_errors.has_error(), true);
859                assert_eq!(refill.refill_errors.error_switch_stuck_low, true);
860            })
861            .unwrap();
862
863        join_handle_mock_relay_manager
864            .join()
865            .expect("Mock relay manager thread did not finish.");
866        join_handle_test_environment
867            .join()
868            .expect("Test environment thread did not finish.");
869        join_handle_test_object
870            .join()
871            .expect("Test object thread did not finish.");
872    }
873
874    #[test]
875    // This test case executes inject_water function and sends Quit signal
876    // after some time when the water level is still low.
877    // Test case uses test database #44.
878    pub fn test_water_injection_quit_signal() {
879        let sleep_duration_100_millis = Duration::from_millis(100);
880        let spin_sleeper = SpinSleeper::default();
881
882        let (_, mut refill, _, sql_interface_refill) = prepare_refill_tests(None, 10, 60, Some(44));
883        let mut water_injection = WaterInjection::new(&refill.config);
884
885        let mut channels = Channels::new_for_test();
886
887        let tx_signal_handler_to_relay_manager = channels.refill.tx_refill_to_relay_manager.clone();
888
889        let mutex_tank_level_switch_signals =
890            Arc::new(Mutex::new(TankLevelSwitchSignals::new(false, false, false)));
891
892        // thread for Mock relay manager - includes assertions
893        let join_handle_mock_relay_manager = thread::Builder::new()
894            .name("mock_relay_manager".to_string())
895            .spawn(move || {
896                let (mut actuation_events, mock_actuator_states) = mock_relay_manager(
897                    &mut channels.relay_manager.tx_relay_manager_to_refill,
898                    &mut channels.relay_manager.rx_relay_manager_from_refill,
899                );
900                assert_eq!(actuation_events.len(), 2);
901                let last_actuation_event = actuation_events.pop().unwrap();
902                assert_eq!(
903                    last_actuation_event.command,
904                    InternalCommand::SwitchOff(AquariumDevice::RefillPump)
905                );
906                let first_actuation_event = actuation_events.pop().unwrap();
907                assert_eq!(
908                    first_actuation_event.command,
909                    InternalCommand::SwitchOn(AquariumDevice::RefillPump)
910                );
911                let actuation_duration = last_actuation_event
912                    .time
913                    .duration_since(first_actuation_event.time)
914                    .as_millis();
915                println!("Actuation duration={}", actuation_duration);
916                assert_ge!(actuation_duration, 1900);
917                assert_le!(actuation_duration, 2100);
918                assert_eq!(
919                    mock_actuator_states.refill_pump_actuation_state,
920                    ActuatorState::Off
921                );
922            })
923            .unwrap();
924
925        // thread for controlling duration of test run
926        let join_handle_test_environment = create_test_environment(
927            channels.signal_handler.tx_signal_handler_to_refill,
928            tx_signal_handler_to_relay_manager,
929        );
930
931        // make sure all mock threads are running when instantiating the test object
932        spin_sleeper.sleep(sleep_duration_100_millis);
933
934        // thread for the test object
935        let join_handle_test_object = thread::Builder::new()
936            .name("test_object".to_string())
937            .spawn(move || {
938                let mut boxed_sql_interface_refill: Box<
939                    dyn crate::database::sql_interface_refill::DatabaseInterfaceRefillTrait
940                        + Sync
941                        + Send,
942                > = Box::new(sql_interface_refill);
943                let result = water_injection.inject_water(
944                    &mut channels.refill,
945                    &mut refill.refill_errors,
946                    &mut boxed_sql_interface_refill,
947                    &mutex_tank_level_switch_signals,
948                );
949                assert_eq!(result, true);
950                assert_eq!(refill.refill_errors.has_error(), false);
951            })
952            .unwrap();
953
954        join_handle_mock_relay_manager
955            .join()
956            .expect("Mock relay manager thread did not finish.");
957        join_handle_test_environment
958            .join()
959            .expect("Test environment thread did not finish.");
960        join_handle_test_object
961            .join()
962            .expect("Test object thread did not finish.");
963    }
964}