aquarium_control/relays/
relay_manager.rs

/* Copyright 2024 Uwe Martin

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
9#[cfg(not(test))]
10use log::error;
11#[cfg(all(not(test), target_os = "linux"))]
12use log::info;
13
14#[cfg(not(test))]
15use log::warn;
16
17#[cfg(feature = "debug_relay_manager")]
18use log::debug;
19
20#[cfg(all(not(test), target_os = "linux"))]
21use nix::unistd::gettid;
22
23use crate::launch::channels::{AquaChannelError, AquaReceiver, AquaSender};
24use crate::launch::execution_config::ExecutionConfig;
25use crate::relays::relay_error::RelayError;
26use crate::relays::relay_manager_channels::RelayManagerChannels;
27use crate::relays::relay_manager_config::RelayManagerConfig;
28use crate::utilities::channel_content::InternalCommand;
29use crate::utilities::logger::log_error_chain;
30use crate::utilities::proc_ext_req::ProcessExternalRequestTrait;
31use spin_sleep::SpinSleeper;
32use std::time::{Duration, Instant};
33
34/// Trait for the execution of relay actuation on either simulator or real hardware.
35/// This trait allows running the main control with a mock implementation for testing.
36pub trait RelayActuationTrait {
37    /// Actuates a device based on the given command.
38    ///
39    /// This is the primary method for controlling a hardware device. Implementations
40    /// should translate the high-level `InternalCommand` into a hardware-specific
41    /// action (e.g., sending a serial command, setting a GPIO pin).
42    ///
43    /// # Arguments
44    /// * `internal_command` - The command specifying the device and action to perform.
45    ///
46    /// # Returns
47    /// An empty `Result` (`Ok(())`) on successful actuation.
48    ///
49    /// # Errors
50    /// Returns a `RelayError` variant if the actuation fails. This could be due to
51    /// various reasons depending on the implementation, such as
52    /// - A failure to write to a serial port (`WriteError`).
53    /// - A corrupt response from the hardware (`IncorrectChecksum`).
54    /// - An attempt to use an unsupported command for the given hardware.
55    fn actuate(&mut self, internal_command: &InternalCommand) -> Result<(), RelayError>;
56
57    /// Gets the required interval for the heartbeat signal, if any.
58    ///
59    /// Some hardware (like the Controllino) requires a periodic "keep-alive" signal
60    /// to ensure the connection is active. This method defines that interval.
61    ///
62    /// # Returns
63    /// - `Some(u64)`: The heartbeat interval in seconds.
64    /// - `None`: If no heartbeat is required for this actuator (e.g., for direct
65    ///   GPIO control or a simulator).
66    fn get_heartbeat_interval_seconds(&self) -> Option<u64>;
67
68    /// Sends a heartbeat signal to the hardware.
69    ///
70    /// This method should be called periodically according to the interval specified
71    /// by `get_heartbeat_interval_seconds` to maintain the hardware connection.
72    ///
73    /// # Returns
74    /// An empty `Result` (`Ok(())`) if the heartbeat was sent successfully.
75    ///
76    /// # Errors
77    /// Returns a `RelayError` if sending the heartbeat signal fails, for instance,
78    /// due to a disconnected serial port.
79    fn heartbeat(&mut self) -> Result<(), RelayError>;
80
81    /// Flushes the communication buffer of the hardware interface.
82    ///
83    /// This method is a recovery mechanism, typically used after a communication
84    /// error (like an incorrect checksum) to clear any lingering, corrupt data
85    /// from the input/output buffers and resynchronize communication.
86    ///
87    /// # Returns
88    /// An empty `Result` (`Ok(())`) if the buffer was flushed successfully.
89    ///
90    /// # Errors
91    /// Returns a `RelayError` if the flush operation fails, which could indicate
92    /// a deeper issue with the serial port or hardware driver.
93    fn flush_buffer(&mut self) -> Result<(), RelayError>;
94}
95
96#[cfg_attr(doc, aquamarine::aquamarine)]
97/// Contains the configuration and the implementation of the relay manager.
98/// Thread communication of this component is as follows:
99/// ```mermaid
100/// graph LR
101///     relay_manager[Relay manager] --> tcp_communication[TCP communication]
102///     relay_manager --> refill[Refill control]
103///     refill --> relay_manager
104///     relay_manager --> heating[Heating control]
105///     heating --> relay_manager
106///     relay_manager --> balling[Balling dosing control]
107///     balling --> relay_manager
108///     relay_manager --> feed[Feed control]
109///     feed --> relay_manager
110///     relay_manager --> signal_handler[Signal handler]
111///     signal_handler --> relay_manager
112///     relay_manager --> ventilation[Ventilation control]
113///     ventilation --> relay_manager
114/// ```
115pub struct RelayManager {
116    /// configuration data for relay manager
117    config: RelayManagerConfig,
118
119    /// inhibition flag to avoid flooding the log file with repeated messages of failure to send to refill control via the channel
120    lock_error_channel_send_refill: bool,
121
122    /// inhibition flag to avoid flooding the log file with repeated messages of failure to receive from refill control via the channel
123    lock_error_channel_receive_refill: bool,
124
125    /// inhibition flag to avoid flooding the log file with repeated messages of failure to send to feed control via the channel
126    lock_error_channel_send_feed: bool,
127
128    /// inhibition flag to avoid flooding the log file with repeated messages of failure to receive from feed control via the channel
129    lock_error_channel_receive_feed: bool,
130
131    /// inhibition flag to avoid flooding the log file with repeated messages of failure to send to ventilation control via the channel
132    lock_error_channel_send_ventilation: bool,
133
134    /// inhibition flag to avoid flooding the log file with repeated messages of failure to receive from ventilation control via the channel
135    lock_error_channel_receive_ventilation: bool,
136
137    /// inhibition flag to avoid flooding the log file with repeated messages of failure to send to heating control via the channel
138    lock_error_channel_send_heating: bool,
139
140    /// inhibition flag to avoid flooding the log file with repeated messages of failure to receive from heating control via the channel
141    lock_error_channel_receive_heating: bool,
142
143    /// inhibition flag to avoid flooding the log file with repeated messages of failure to send to balling mineral dosing via the channel
144    lock_error_channel_send_balling: bool,
145
146    /// inhibition flag to avoid flooding the log file with repeated messages of failure to receive from balling mineral dosing via the channel
147    lock_error_channel_receive_balling: bool,
148
149    /// information about which threads have been started
150    execution_config: ExecutionConfig,
151}
152
153impl RelayManager {
154    /// Creates a new `RelayManager` instance.
155    ///
156    /// This constructor initializes the relay management module with its configuration.
157    /// It sets up various internal lock flags to `false`, which are used to prevent
158    /// repetitive error and warning messages from flooding the log file during operation,
159    /// especially concerning channel communication and actuation errors.
160    ///
161    /// # Arguments
162    /// * `config` - Configuration data for the relay manager, loaded from a TOML file.
163    ///
164    /// # Returns
165    /// A new `RelayManager` struct, ready to handle relay actuation commands
166    /// and communicate with other modules.    
167    pub fn new(config: RelayManagerConfig, execution_config: ExecutionConfig) -> RelayManager {
168        RelayManager {
169            config,
170            lock_error_channel_send_refill: false,
171            lock_error_channel_receive_refill: false,
172            lock_error_channel_send_heating: false,
173            lock_error_channel_receive_heating: false,
174            lock_error_channel_send_ventilation: false,
175            lock_error_channel_receive_ventilation: false,
176            lock_error_channel_send_balling: false,
177            lock_error_channel_receive_balling: false,
178            lock_error_channel_send_feed: false,
179            lock_error_channel_receive_feed: false,
180            execution_config,
181        }
182    }
183
184    /// Attempts to actuate a given command, with retries on failure.
185    ///
186    /// This helper function encapsulates the retry logic for actuator commands. It
187    /// will attempt the actuation at least once, and if it fails, it will retry
188    /// up to the number of times specified by `self.config.actuation_retries`.
189    /// If a failure is due to an `IncorrectChecksum`, it will attempt to flush the
190    /// buffer before retrying.
191    ///
192    /// # Arguments
193    /// * `actuator` - A mutable reference to an object implementing `RelayActuationTrait`.
194    /// * `command` - The `InternalCommand` to be executed.
195    /// * `actuation_name` - A descriptive name for the action (e.g., "refill pump") for logging.
196    /// * `spin_sleeper` - A `SpinSleeper` to pause between retries.
197    /// * `sleep_duration` - The `Duration` to wait between retry attempts.
198    ///
199    /// # Returns
200    /// An empty `Result` (`Ok(())`) if the actuation succeeds, possibly after one or more retries.
201    ///
202    /// # Errors
203    /// Returns `Err(RelayError::ActuationFailed)` if all attempts (the initial one plus all
204    /// retries) fail. This error is a wrapper that contains detailed context, including
205    /// - The name of the actuation that failed.
206    /// - The total number of attempts made.
207    /// - The specific error from the *last* attempt, providing the root cause of the final failure.
208    fn actuate_with_retries(
209        &mut self,
210        actuator: &mut impl RelayActuationTrait,
211        command: &InternalCommand,
212        actuation_name: &str, // Name for logging, e.g., "refill pump", "heater"
213        spin_sleeper: &mut SpinSleeper,
214        sleep_duration: Duration,
215    ) -> Result<(), RelayError> {
216        let mut attempts = 0;
217        let max_attempts = 1 + self.config.actuation_retries; // 1 initial attempt + `actuation_retries` retries
218        let mut error_to_be_reported = RelayError::InitialValue(module_path!().to_string());
219
220        while attempts < max_attempts {
221            match actuator.actuate(command) {
222                Ok(()) => {
223                    if attempts > 0 {
224                        #[cfg(not(test))]
225                        warn!(
226                            target: module_path!(),
227                            "{actuation_name}: actuation succeeded after {attempts} retries."
228                        );
229                        #[cfg(test)] // to avoid warning when compiling for test
230                        if actuation_name == "not existing name" {
231                            println!("Actuation with strange name: {}", actuation_name);
232                        }
233                    }
234                    return Ok(());
235                }
236                Err(e) => {
237                    if matches!(e, RelayError::IncorrectChecksum(_)) {
238                        if let Err(f) = actuator.flush_buffer() {
239                            // if we cannot flush the buffer, mission abort
240                            error_to_be_reported = f;
241                            break;
242                        }
243                    }
244
245                    // Log error only after the final attempt
246                    if attempts == max_attempts - 1 {
247                        error_to_be_reported = e;
248                        break;
249                    } else {
250                        // Wait a little before retrying
251                        spin_sleeper.sleep(sleep_duration);
252                    }
253                    attempts += 1;
254                }
255            }
256        }
257        Err(RelayError::ActuationFailed {
258            location: module_path!().to_string(),
259            actuation_name: actuation_name.to_string(),
260            last_attempt_number: attempts + 1,
261            max_attempt_allowed: max_attempts,
262            source: Box::new(error_to_be_reported),
263        })
264    }
265
266    /// Executes the main control loop for the Relay Manager.
267    ///
268    /// This function runs continuously, acting as the central dispatcher for actuation commands
269    /// received from various control modules (Refill, Heating, etc.). It uses a helper function,
270    /// `process_actuation_request`, to handle the logic for each module's channel pair,
271    /// keeping the main loop clean and readable.
272    ///
273    /// The loop forwards commands to the provided `actuator` and handles graceful shutdown
274    /// when a `Quit` command is received from the signal handler. It also manages to send
275    /// periodic heartbeats to the hardware if required.
276    ///
277    /// # Arguments
278    /// * `relay_manager_channels` - A mutable reference to the struct containing all `mpsc` channels necessary for
279    ///   communication with other control threads.
280    /// * `actuator` - A mutable reference to an object implementing the `RelayActuationTrait`,
281    ///   which handles the actual hardware interaction or simulation.
282    pub fn execute(
283        &mut self,
284        relay_manager_channels: &mut RelayManagerChannels,
285        actuator: &mut impl RelayActuationTrait,
286    ) {
287        #[cfg(all(target_os = "linux", not(test)))]
288        info!(target: module_path!(), "Thread started with TID: {}", gettid());
289
290        let mut last_heartbeat_time = Instant::now();
291        let mut spin_sleeper = SpinSleeper::default();
292        let sleep_duration_between_retries =
293            Duration::from_millis(self.config.pause_between_retries_millis);
294        let sleep_duration_main_cycle = Duration::from_millis(10);
295
296        loop {
297            // Check for quit command first
298            if self
299                .process_external_request(
300                    &mut relay_manager_channels.rx_relay_manager_from_signal_handler,
301                    None,
302                )
303                .0
304            {
305                #[cfg(feature = "debug_relay_manager")]
306                debug!(target: module_path!(), "received QUIT command");
307                break;
308            }
309
310            if self.execution_config.refill {
311                (
312                    self.lock_error_channel_receive_refill,
313                    self.lock_error_channel_send_refill,
314                ) = self.process_actuation_request(
315                    (
316                        &mut relay_manager_channels.rx_relay_manager_from_refill,
317                        &mut relay_manager_channels.tx_relay_manager_to_refill,
318                    ),
319                    (
320                        self.lock_error_channel_receive_refill,
321                        self.lock_error_channel_send_refill,
322                    ),
323                    "refill",
324                    actuator,
325                    &mut spin_sleeper,
326                    sleep_duration_between_retries,
327                );
328            }
329
330            if self.execution_config.heating {
331                (
332                    self.lock_error_channel_receive_heating,
333                    self.lock_error_channel_send_heating,
334                ) = self.process_actuation_request(
335                    (
336                        &mut relay_manager_channels.rx_relay_manager_from_heating,
337                        &mut relay_manager_channels.tx_relay_manager_to_heating,
338                    ),
339                    (
340                        self.lock_error_channel_receive_heating,
341                        self.lock_error_channel_send_heating,
342                    ),
343                    "heating",
344                    actuator,
345                    &mut spin_sleeper,
346                    sleep_duration_between_retries,
347                );
348            }
349
350            if self.execution_config.ventilation {
351                (
352                    self.lock_error_channel_receive_ventilation,
353                    self.lock_error_channel_send_ventilation,
354                ) = self.process_actuation_request(
355                    (
356                        &mut relay_manager_channels.rx_relay_manager_from_ventilation,
357                        &mut relay_manager_channels.tx_relay_manager_to_ventilation,
358                    ),
359                    (
360                        self.lock_error_channel_receive_ventilation,
361                        self.lock_error_channel_send_ventilation,
362                    ),
363                    "ventilation",
364                    actuator,
365                    &mut spin_sleeper,
366                    sleep_duration_between_retries,
367                );
368            }
369
370            if self.execution_config.balling {
371                (
372                    self.lock_error_channel_receive_balling,
373                    self.lock_error_channel_send_balling,
374                ) = self.process_actuation_request(
375                    (
376                        &mut relay_manager_channels.rx_relay_manager_from_balling,
377                        &mut relay_manager_channels.tx_relay_manager_to_balling,
378                    ),
379                    (
380                        self.lock_error_channel_receive_balling,
381                        self.lock_error_channel_send_balling,
382                    ),
383                    "Balling dosing",
384                    actuator,
385                    &mut spin_sleeper,
386                    sleep_duration_between_retries,
387                );
388            }
389
390            if self.execution_config.feed {
391                (
392                    self.lock_error_channel_receive_feed,
393                    self.lock_error_channel_send_feed,
394                ) = self.process_actuation_request(
395                    (
396                        &mut relay_manager_channels.rx_relay_manager_from_feed,
397                        &mut relay_manager_channels.tx_relay_manager_to_feed,
398                    ),
399                    (
400                        self.lock_error_channel_receive_feed,
401                        self.lock_error_channel_send_feed,
402                    ),
403                    "feed",
404                    actuator,
405                    &mut spin_sleeper,
406                    sleep_duration_between_retries,
407                );
408            }
409
410            // *** send heartbeat ***
411            if let Some(interval_secs) = actuator.get_heartbeat_interval_seconds() {
412                if Instant::now().duration_since(last_heartbeat_time).as_secs() >= interval_secs {
413                    if let Err(e) = actuator.heartbeat() {
414                        log_error_chain(
415                            module_path!(),
416                            "Execution of heartbeat for relay control failed.",
417                            e,
418                        );
419                    }
420                    last_heartbeat_time = Instant::now();
421                }
422            }
423
424            // have a minimum of sleep time in between checking channels
425            spin_sleeper.sleep(sleep_duration_main_cycle);
426        }
427
428        #[cfg(feature = "debug_relay_manager")]
429        debug!(
430            target: module_path!(),
431            "exited loop, sending back confirmation to signal handler"
432        );
433
434        // Application received request to terminate. That is why the loop was left.
435        // Answer to the thread which sent the request for termination, so that shutdown can proceed further.
436        if let Err(_e) = relay_manager_channels
437            .tx_relay_manager_to_signal_handler
438            .send(true)
439        {
440            #[cfg(not(test))]
441            error!(target: module_path!(), "Sending confirmation to signal handler failed. ({_e:?})");
442        };
443    }
444
445    /// Helper function to process an actuation request from a single channel pair.
446    ///
447    /// This function encapsulates the logic for receiving a command, actuating it with retries,
448    /// and sending a confirmation response. It is used to de-duplicate the main `execute` loop.
449    ///
450    /// It takes the current state of the log-suppression flags (`lock_receive`, `lock_send`)
451    /// by value and returns their updated state in a tuple. This pattern is used to work
452    /// around Rust's borrowing rules, as the flags are fields on `RelayManager` and cannot
453    /// be mutably borrowed multiple times in the `execute` loop.
454    ///
455    /// # Arguments
456    /// * `rx` - The channel for receiving `InternalCommand`s from a control module.
457    /// * `tx` - The channel for sending a boolean confirmation back to the control module.
458    /// * `lock_receive` - The current state of the receive-error lock flag.
459    /// * `lock_send` - The current state of the send error lock flag.
460    /// * `actuation_name` - A descriptive name for the action (e.g., "refill") for logging.
461    /// * `actuator` - A mutable reference to the hardware actuator.
462    /// * `spin_sleeper` - A `SpinSleeper` to pause between retries.
463    /// * `sleep_duration_between_retries` - The `Duration` to wait between retry attempts.
464    ///
465    /// # Returns
466    /// A tuple `(bool, bool)` containing the updated state of the `lock_receive` and
467    /// `lock_send` flags, respectively.
468    fn process_actuation_request(
469        &mut self,
470        channels: (&mut AquaReceiver<InternalCommand>, &mut AquaSender<bool>),
471        locks: (bool, bool),
472        actuation_name: &str,
473        actuator: &mut impl RelayActuationTrait,
474        spin_sleeper: &mut SpinSleeper,
475        sleep_duration_between_retries: Duration,
476    ) -> (bool, bool) {
477        let (rx, tx) = channels;
478        let (mut lock_receive, mut lock_send) = locks;
479        match rx.try_recv() {
480            Ok(_command) => {
481                lock_receive = false;
482                if self.config.active {
483                    if let Err(_actuation_error) = self.actuate_with_retries(
484                        actuator,
485                        &_command,
486                        actuation_name,
487                        spin_sleeper,
488                        sleep_duration_between_retries,
489                    ) {
490                        #[cfg(not(test))]
491                        log_error_chain(
492                            module_path!(),
493                            &format!("Actuation of {actuation_name} failed."),
494                            _actuation_error,
495                        );
496                    }
497                }
498                if let Err(_e) = tx.send(true) {
499                    if !lock_send {
500                        #[cfg(not(test))]
501                        error!(
502                            target: module_path!(),
503                            "responding to {actuation_name} control ({_e:?})",
504                        );
505                        lock_send = true;
506                    }
507                } else {
508                    lock_send = false;
509                }
510            }
511            Err(e) => match e {
512                #[cfg(feature = "debug_channels")]
513                AquaChannelError::Full => { /* Not applicable. Do nothing */ }
514                AquaChannelError::Empty => {
515                    lock_receive = false;
516                }
517                AquaChannelError::Disconnected => {
518                    if !lock_receive {
519                        #[cfg(not(test))]
520                        error!(
521                            "{}: error when trying to receive command from {actuation_name} control ({e:?})",
522                            module_path!(),
523                        );
524                        lock_receive = true;
525                    }
526                }
527            },
528        }
529        (lock_receive, lock_send)
530    }
531}
532
533#[cfg(test)]
534pub mod tests {
535    use crate::launch::channels::Channels;
536    use crate::launch::execution_config::ExecutionConfig;
537    use crate::relays::relay_manager::{RelayActuationTrait, RelayError, RelayManager};
538    use crate::utilities::channel_content::{AquariumDevice, InternalCommand};
539    use crate::utilities::config::{read_config_file, ConfigData};
540    use spin_sleep::SpinSleeper;
541    use std::thread::scope;
542    use std::time::Duration;
543
544    // Contains mock trait implementation for unit testing purposes.
545    pub struct ActuateVoid {
546        // execution counter for testing purposes
547        execution_counter: u64,
548
549        // heartbeat counter for testing purposes
550        heartbeat_counter: u64,
551
552        // flush counter for testing purposes
553        flush_counter: u64,
554
555        // injection of error: if not None, this value is used one time by actuate and then set to None
556        onetime_error_result: Option<RelayError>,
557
558        // injection of error: if not None, this value is used one time by actuate and then set to None
559        permanent_error_result: Option<RelayError>,
560    }
561
562    impl RelayActuationTrait for ActuateVoid {
563        // Mock function for testing
564        fn actuate(&mut self, internal_command: &InternalCommand) -> Result<(), RelayError> {
565            println!(
566                "Received {} - increasing execution counter (currently at {})",
567                internal_command, self.execution_counter
568            );
569            self.execution_counter += 1;
570            if self.onetime_error_result.is_some() {
571                // call will replace error value with "None" - inducing Ok-result for next call
572                // of actuate
573                Err(self.onetime_error_result.take().unwrap())
574            } else {
575                if self.permanent_error_result.is_some() {
576                    // Workaround: RelayError does not implement the Clone trait.
577                    // WriteError is the only error variant used by the test cases.
578                    Err(RelayError::WriteError(module_path!().to_string()))
579                } else {
580                    Ok(())
581                }
582            }
583        }
584
585        fn get_heartbeat_interval_seconds(&self) -> Option<u64> {
586            Some(1)
587        }
588
589        fn heartbeat(&mut self) -> Result<(), RelayError> {
590            self.heartbeat_counter += 1;
591            Ok(())
592        }
593
594        fn flush_buffer(&mut self) -> Result<(), RelayError> {
595            self.flush_counter += 1;
596            Ok(())
597        }
598    }
599
600    impl ActuateVoid {
601        // provides the number of executions to the test case for assertion
602        pub fn get_execution_count(&self) -> u64 {
603            self.execution_counter
604        }
605    }
606
607    #[test]
608    // Test case tests the channel communication for the .execute function of relay manager.
609    // It also checks the heartbeat communication.
610    pub fn test_relay_manager_channel_communication() {
611        let config: ConfigData =
612            read_config_file("/config/aquarium_control_test_simulator.toml".to_string()).unwrap();
613
614        let spin_sleeper = SpinSleeper::default();
615        let sleep_duration = Duration::from_secs(2);
616
617        let mut relay_manager = RelayManager::new(config.relay_manager, ExecutionConfig::default());
618
619        let mut channels = Channels::new_for_test();
620
621        scope(|scope| {
622            // thread for test environment
623            scope.spawn(move || {
624                let _ = channels.refill.send_to_relay_manager(InternalCommand::SwitchOn(AquariumDevice::RefillPump));
625                match channels.refill.receive_from_relay_manager() {
626                    Ok(c) => {
627                        assert_eq!(c, true);
628                    },
629                    Err(e) => {
630                        panic!("test_relay_manager_channel_communication: error when receiving answer for refill thread: {e:?}");
631                    }
632                }
633                let _ = channels.heating.send_to_relay_manager(InternalCommand::SwitchOn(AquariumDevice::Heater));
634                match channels.heating.receive_from_relay_manager() {
635                    Ok(c) => {
636                        assert_eq!(c, true);
637                    },
638                    Err(e) => {
639                        panic!("test_relay_manager_channel_communication: error when receiving answer for heating thread: {e:?}");
640                    }
641                }
642                let _ = channels.ventilation.send_to_relay_manager(InternalCommand::SwitchOn(AquariumDevice::Ventilation));
643                match channels.ventilation.receive_from_relay_manager() {
644                    Ok(c) => {
645                        assert_eq!(c, true);
646                    },
647                    Err(e) => {
648                        panic!("test_relay_manager_channel_communication: error when receiving answer for ventilation thread: {e:?}");
649                    }
650                }
651                let _ = channels.balling.send_to_relay_manager(InternalCommand::SwitchOn(AquariumDevice::PeristalticPump1));
652                match channels.balling.receive_from_relay_manager() {
653                    Ok(c) => {
654                        assert_eq!(c, true);
655                    },
656                    Err(e) => {
657                        panic!("test_relay_manager_channel_communication: error when receiving answer for Balling dosing thread: {e:?}");
658                    }
659                }
660                let _ = channels.balling.send_to_relay_manager(InternalCommand::SwitchOn(AquariumDevice::PeristalticPump2));
661                match channels.balling.receive_from_relay_manager() {
662                    Ok(c) => {
663                        assert_eq!(c, true);
664                    },
665                    Err(e) => {
666                        panic!("test_relay_manager_channel_communication: error when receiving answer for Balling dosing thread: {e:?}");
667                    }
668                }
669                let _ = channels.balling.send_to_relay_manager(InternalCommand::SwitchOn(AquariumDevice::PeristalticPump3));
670                match channels.balling.receive_from_relay_manager() {
671                    Ok(c) => {
672                        assert_eq!(c, true);
673                    },
674                    Err(e) => {
675                        panic!("test_relay_manager_channel_communication: error when receiving answer for Balling dosing thread: {e:?}");
676                    }
677                }
678                let _ = channels.balling.send_to_relay_manager(InternalCommand::SwitchOn(AquariumDevice::PeristalticPump4));
679                match channels.balling.receive_from_relay_manager() {
680                    Ok(c) => {
681                        assert_eq!(c, true);
682                    },
683                    Err(e) => {
684                        panic!("test_relay_manager_channel_communication: error when receiving answer for Balling dosing thread: {e:?}");
685                    }
686                }
687                let _ = channels.feed.send_to_relay_manager(InternalCommand::SwitchOn(AquariumDevice::Feeder));
688                match channels.feed.receive_from_relay_manager() {
689                    Ok(c) => {
690                        assert_eq!(c, true);
691                    },
692                    Err(e) => {
693                        panic!("test_relay_manager_channel_communication: error when receiving answer for Feed thread: {e:?}");
694                    }
695                }
696                let _ = channels.feed.send_to_relay_manager(InternalCommand::SwitchOn(AquariumDevice::MainPump1));
697                match channels.feed.receive_from_relay_manager() {
698                    Ok(c) => {
699                        assert_eq!(c, true);
700                    },
701                    Err(e) => {
702                        panic!("test_relay_manager_channel_communication: error when receiving answer for Feed thread: {e:?}");
703                    }
704                }
705                let _ = channels.feed.send_to_relay_manager(InternalCommand::SwitchOn(AquariumDevice::MainPump2));
706                match channels.feed.receive_from_relay_manager() {
707                    Ok(c) => {
708                        assert_eq!(c, true);
709                    },
710                    Err(e) => {
711                        panic!("test_relay_manager_channel_communication: error when receiving answer for Feed thread: {e:?}");
712                    }
713                }
714                let _ = channels.feed.send_to_relay_manager(InternalCommand::SwitchOn(AquariumDevice::AuxPump1));
715                match channels.feed.receive_from_relay_manager() {
716                    Ok(c) => {
717                        assert_eq!(c, true);
718                    },
719                    Err(e) => {
720                        panic!("test_relay_manager_channel_communication: error when receiving answer for Feed thread: {e:?}");
721                    }
722                }
723                let _ = channels.feed.send_to_relay_manager(InternalCommand::SwitchOn(AquariumDevice::AuxPump2));
724                match channels.feed.receive_from_relay_manager() {
725                    Ok(c) => {
726                        assert_eq!(c, true);
727                    },
728                    Err(e) => {
729                        panic!("test_relay_manager_channel_communication: error when receiving answer for Feed thread: {e:?}");
730                    }
731                }
732                let _ = channels.refill.send_to_relay_manager(InternalCommand::SwitchOff(AquariumDevice::RefillPump));
733                match channels.refill.receive_from_relay_manager() {
734                    Ok(c) => {
735                        assert_eq!(c, true);
736                    },
737                    Err(e) => {
738                        panic!("test_relay_manager_channel_communication: error when receiving answer for refill thread: {e:?}");
739                    }
740                }
741                let _ = channels.heating.send_to_relay_manager(InternalCommand::SwitchOff(AquariumDevice::Heater));
742                match channels.heating.receive_from_relay_manager() {
743                    Ok(c) => {
744                        assert_eq!(c, true);
745                    },
746                    Err(e) => {
747                        panic!("test_relay_manager_channel_communication: error when receiving answer for heating thread: {e:?}");
748                    }
749                }
750                let _ = channels.ventilation.send_to_relay_manager(InternalCommand::SwitchOff(AquariumDevice::Ventilation));
751                match channels.ventilation.receive_from_relay_manager() {
752                    Ok(c) => {
753                        assert_eq!(c, true);
754                    },
755                    Err(e) => {
756                        panic!("test_relay_manager_channel_communication: error when receiving answer for ventilation thread: {e:?}");
757                    }
758                }
759                let _ = channels.balling.send_to_relay_manager(InternalCommand::SwitchOff(AquariumDevice::PeristalticPump1));
760                match channels.balling.receive_from_relay_manager() {
761                    Ok(c) => {
762                        assert_eq!(c, true);
763                    },
764                    Err(e) => {
765                        panic!("test_relay_manager_channel_communication: error when receiving answer for Balling dosing thread: {e:?}");
766                    }
767                }
768                let _ = channels.balling.send_to_relay_manager(InternalCommand::SwitchOff(AquariumDevice::PeristalticPump2));
769                match channels.balling.receive_from_relay_manager() {
770                    Ok(c) => {
771                        assert_eq!(c, true);
772                    },
773                    Err(e) => {
774                        panic!("test_relay_manager_channel_communication: error when receiving answer for Balling dosing thread: {e:?}");
775                    }
776                }
777                let _ = channels.balling.send_to_relay_manager(InternalCommand::SwitchOff(AquariumDevice::PeristalticPump3));
778                match channels.balling.receive_from_relay_manager() {
779                    Ok(c) => {
780                        assert_eq!(c, true);
781                    },
782                    Err(e) => {
783                        panic!("test_relay_manager_channel_communication: error when receiving answer for Balling dosing thread: {e:?}");
784                    }
785                }
786                let _ = channels.balling.send_to_relay_manager(InternalCommand::SwitchOff(AquariumDevice::PeristalticPump4));
787                match channels.balling.receive_from_relay_manager() {
788                    Ok(c) => {
789                        assert_eq!(c, true);
790                    },
791                    Err(e) => {
792                        panic!("test_relay_manager_channel_communication: error when receiving answer for Balling dosing thread: {e:?}");
793                    }
794                }
795                let _ = channels.feed.send_to_relay_manager(InternalCommand::SwitchOff(AquariumDevice::Feeder));
796                match channels.feed.receive_from_relay_manager() {
797                    Ok(c) => {
798                        assert_eq!(c, true);
799                    },
800                    Err(e) => {
801                        panic!("test_relay_manager_channel_communication: error when receiving answer for Feed thread: {e:?}");
802                    }
803                }
804                let _ = channels.feed.send_to_relay_manager(InternalCommand::SwitchOff(AquariumDevice::MainPump1));
805                match channels.feed.receive_from_relay_manager() {
806                    Ok(c) => {
807                        assert_eq!(c, true);
808                    },
809                    Err(e) => {
810                        panic!("test_relay_manager_channel_communication: error when receiving answer for Feed thread: {e:?}");
811                    }
812                }
813                let _ = channels.feed.send_to_relay_manager(InternalCommand::SwitchOff(AquariumDevice::MainPump2));
814                match channels.feed.receive_from_relay_manager() {
815                    Ok(c) => {
816                        assert_eq!(c, true);
817                    },
818                    Err(e) => {
819                        panic!("test_relay_manager_channel_communication: error when receiving answer for Feed thread: {e:?}");
820                    }
821                }
822                let _ = channels.feed.send_to_relay_manager(InternalCommand::SwitchOff(AquariumDevice::AuxPump1));
823                match channels.feed.receive_from_relay_manager() {
824                    Ok(c) => {
825                        assert_eq!(c, true);
826                    },
827                    Err(e) => {
828                        panic!("test_relay_manager_channel_communication: error when receiving answer for Feed thread: {e:?}");
829                    }
830                }
831                let _ = channels.feed.send_to_relay_manager(InternalCommand::SwitchOff(AquariumDevice::AuxPump2));
832                match channels.feed.receive_from_relay_manager() {
833                    Ok(c) => {
834                        assert_eq!(c, true);
835                    },
836                    Err(e) => {
837                        panic!("test_relay_manager_channel_communication: error when receiving answer for Feed thread: {e:?}");
838                    }
839                }
840
841                spin_sleeper.sleep(sleep_duration); // wait for two seconds to test the heartbeat
842
843                let _ = channels.signal_handler.send_to_relay_manager(InternalCommand::Quit);
844                channels.signal_handler.receive_from_relay_manager().unwrap();
845            });
846
847            // thread for the test object
848            scope.spawn(move || {
849                let mut mock_actuator = ActuateVoid {
850                    execution_counter: 0,
851                    heartbeat_counter: 0,
852                    flush_counter: 0,
853                    onetime_error_result: None,
854                    permanent_error_result: None,
855                };
856
857                relay_manager.execute(&mut channels.relay_manager, &mut mock_actuator);
858
859                // Test object thread has ended.
860                assert_eq!(mock_actuator.get_execution_count(), 24); //Assertion checks the number of calls to the actuator.
861                assert_eq!(mock_actuator.heartbeat_counter, 2); // Assertion checks the number of heartbeats.
862            });
863        });
864    }
865    #[test]
866    // Test case tests if the relay manager is calling the flush functionality when the
867    // actuator returns the IncorrectChecksum error.
868    pub fn test_relay_manager_flush() {
869        let config: ConfigData =
870            read_config_file("/config/aquarium_control_test_simulator.toml".to_string()).unwrap();
871
872        let mut relay_manager = RelayManager::new(config.relay_manager, ExecutionConfig::default());
873
874        let mut channels = Channels::new_for_test();
875
876        scope(|scope| {
877            // thread for test environment
878            scope.spawn(move || {
879                let _ = channels.refill.send_to_relay_manager(InternalCommand::SwitchOn(AquariumDevice::RefillPump));
880                match channels.refill.receive_from_relay_manager() {
881                    Ok(c) => {
882                        assert_eq!(c, true);
883                    },
884                    Err(e) => {
885                        panic!("test_relay_manager_channel_communication: error when receiving answer for refill thread: {e:?}");
886                    }
887                }
888                let _ = channels.heating.send_to_relay_manager(InternalCommand::SwitchOn(AquariumDevice::Heater));
889                match channels.heating.receive_from_relay_manager() {
890                    Ok(c) => {
891                        assert_eq!(c, true);
892                    },
893                    Err(e) => {
894                        panic!("test_relay_manager_channel_communication: error when receiving answer for heating thread: {e:?}");
895                    }
896                }
897
898                let _ = channels.signal_handler.send_to_relay_manager(InternalCommand::Quit);
899                channels.signal_handler.receive_from_relay_manager().unwrap();
900            });
901
902            // thread for the test object
903            scope.spawn(move || {
904                let mut mock_actuator = ActuateVoid {
905                    execution_counter: 0,
906                    heartbeat_counter: 0,
907                    flush_counter: 0,
908                    onetime_error_result: Some(RelayError::IncorrectChecksum(
909                        module_path!().to_string(),
910                    )),
911                    permanent_error_result: None,
912                };
913
914                relay_manager.execute(&mut channels.relay_manager, &mut mock_actuator);
915
916                // Test object thread has ended.
917                assert_eq!(mock_actuator.get_execution_count(), 3); //Assertion checks the number of calls to the actuator.
918                assert_eq!(mock_actuator.flush_counter, 1); // Assertion checks the number of buffer flush operations.
919            });
920        });
921    }
922
923    #[test]
924    // Test case tests if the relay manager is calling the actuate function multiple times
925    // in case actuator returns an error.
926    pub fn test_relay_manager_repeat() {
927        let config: ConfigData =
928            read_config_file("/config/aquarium_control_test_simulator.toml".to_string()).unwrap();
929
930        let target_repeat_times = config.relay_manager.actuation_retries;
931
932        let mut relay_manager = RelayManager::new(config.relay_manager, ExecutionConfig::default());
933
934        let mut channels = Channels::new_for_test();
935
936        scope(|scope| {
937            // thread for test environment
938            scope.spawn(move || {
939                let _ = channels.refill.send_to_relay_manager(InternalCommand::SwitchOn(AquariumDevice::RefillPump));
940                match channels.refill.receive_from_relay_manager() {
941                    Ok(c) => {
942                        assert_eq!(c, true);
943                    },
944                    Err(e) => {
945                        panic!("test_relay_manager_channel_communication: error when receiving answer for refill thread: {e:?}");
946                    }
947                }
948
949                let _ = channels.signal_handler.send_to_relay_manager(InternalCommand::Quit);
950                channels.signal_handler.receive_from_relay_manager().unwrap();
951            });
952
953            // thread for the test object
954            scope.spawn(move || {
955                let mut mock_actuator = ActuateVoid {
956                    execution_counter: 0,
957                    heartbeat_counter: 0,
958                    flush_counter: 0,
959                    onetime_error_result: None,
960                    permanent_error_result: Some(RelayError::WriteError(
961                        module_path!().to_string(),
962                    )),
963                };
964
965                relay_manager.execute(&mut channels.relay_manager, &mut mock_actuator);
966
967                // Test object thread has ended.
968
969                // Assertion checks the number of calls to the actuator.
970                // Relay manager shall repeat in case of failure as configured by user.
971                assert_eq!(mock_actuator.get_execution_count(), target_repeat_times + 1);
972            });
973        });
974    }
975}