aquarium_control/relays/relay_manager.rs
1/* Copyright 2024 Uwe Martin
2
3Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
4
5The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
6
7THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
8*/
9#[cfg(not(test))]
10use log::error;
11#[cfg(all(not(test), target_os = "linux"))]
12use log::info;
13
14#[cfg(not(test))]
15use log::warn;
16
17#[cfg(feature = "debug_relay_manager")]
18use log::debug;
19
20#[cfg(all(not(test), target_os = "linux"))]
21use nix::unistd::gettid;
22
23use crate::launch::channels::{AquaChannelError, AquaReceiver, AquaSender};
24use crate::launch::execution_config::ExecutionConfig;
25use crate::relays::relay_error::RelayError;
26use crate::relays::relay_manager_channels::RelayManagerChannels;
27use crate::relays::relay_manager_config::RelayManagerConfig;
28use crate::utilities::channel_content::InternalCommand;
29use crate::utilities::logger::log_error_chain;
30use crate::utilities::proc_ext_req::ProcessExternalRequestTrait;
31use spin_sleep::SpinSleeper;
32use std::time::{Duration, Instant};
33
34/// Trait for the execution of relay actuation on either simulator or real hardware.
35/// This trait allows running the main control with a mock implementation for testing.
36pub trait RelayActuationTrait {
37 /// Actuates a device based on the given command.
38 ///
39 /// This is the primary method for controlling a hardware device. Implementations
40 /// should translate the high-level `InternalCommand` into a hardware-specific
41 /// action (e.g., sending a serial command, setting a GPIO pin).
42 ///
43 /// # Arguments
44 /// * `internal_command` - The command specifying the device and action to perform.
45 ///
46 /// # Returns
47 /// An empty `Result` (`Ok(())`) on successful actuation.
48 ///
49 /// # Errors
50 /// Returns a `RelayError` variant if the actuation fails. This could be due to
51 /// various reasons depending on the implementation, such as
52 /// - A failure to write to a serial port (`WriteError`).
53 /// - A corrupt response from the hardware (`IncorrectChecksum`).
54 /// - An attempt to use an unsupported command for the given hardware.
55 fn actuate(&mut self, internal_command: &InternalCommand) -> Result<(), RelayError>;
56
57 /// Gets the required interval for the heartbeat signal, if any.
58 ///
59 /// Some hardware (like the Controllino) requires a periodic "keep-alive" signal
60 /// to ensure the connection is active. This method defines that interval.
61 ///
62 /// # Returns
63 /// - `Some(u64)`: The heartbeat interval in seconds.
64 /// - `None`: If no heartbeat is required for this actuator (e.g., for direct
65 /// GPIO control or a simulator).
66 fn get_heartbeat_interval_seconds(&self) -> Option<u64>;
67
68 /// Sends a heartbeat signal to the hardware.
69 ///
70 /// This method should be called periodically according to the interval specified
71 /// by `get_heartbeat_interval_seconds` to maintain the hardware connection.
72 ///
73 /// # Returns
74 /// An empty `Result` (`Ok(())`) if the heartbeat was sent successfully.
75 ///
76 /// # Errors
77 /// Returns a `RelayError` if sending the heartbeat signal fails, for instance,
78 /// due to a disconnected serial port.
79 fn heartbeat(&mut self) -> Result<(), RelayError>;
80
81 /// Flushes the communication buffer of the hardware interface.
82 ///
83 /// This method is a recovery mechanism, typically used after a communication
84 /// error (like an incorrect checksum) to clear any lingering, corrupt data
85 /// from the input/output buffers and resynchronize communication.
86 ///
87 /// # Returns
88 /// An empty `Result` (`Ok(())`) if the buffer was flushed successfully.
89 ///
90 /// # Errors
91 /// Returns a `RelayError` if the flush operation fails, which could indicate
92 /// a deeper issue with the serial port or hardware driver.
93 fn flush_buffer(&mut self) -> Result<(), RelayError>;
94}
95
96#[cfg_attr(doc, aquamarine::aquamarine)]
97/// Contains the configuration and the implementation of the relay manager.
98/// Thread communication of this component is as follows:
99/// ```mermaid
100/// graph LR
101/// relay_manager[Relay manager] --> tcp_communication[TCP communication]
102/// relay_manager --> refill[Refill control]
103/// refill --> relay_manager
104/// relay_manager --> heating[Heating control]
105/// heating --> relay_manager
106/// relay_manager --> balling[Balling dosing control]
107/// balling --> relay_manager
108/// relay_manager --> feed[Feed control]
109/// feed --> relay_manager
110/// relay_manager --> signal_handler[Signal handler]
111/// signal_handler --> relay_manager
112/// relay_manager --> ventilation[Ventilation control]
113/// ventilation --> relay_manager
114/// ```
115pub struct RelayManager {
116 /// configuration data for relay manager
117 config: RelayManagerConfig,
118
119 /// inhibition flag to avoid flooding the log file with repeated messages of failure to send to refill control via the channel
120 lock_error_channel_send_refill: bool,
121
122 /// inhibition flag to avoid flooding the log file with repeated messages of failure to receive from refill control via the channel
123 lock_error_channel_receive_refill: bool,
124
125 /// inhibition flag to avoid flooding the log file with repeated messages of failure to send to feed control via the channel
126 lock_error_channel_send_feed: bool,
127
128 /// inhibition flag to avoid flooding the log file with repeated messages of failure to receive from feed control via the channel
129 lock_error_channel_receive_feed: bool,
130
131 /// inhibition flag to avoid flooding the log file with repeated messages of failure to send to ventilation control via the channel
132 lock_error_channel_send_ventilation: bool,
133
134 /// inhibition flag to avoid flooding the log file with repeated messages of failure to receive from ventilation control via the channel
135 lock_error_channel_receive_ventilation: bool,
136
137 /// inhibition flag to avoid flooding the log file with repeated messages of failure to send to heating control via the channel
138 lock_error_channel_send_heating: bool,
139
140 /// inhibition flag to avoid flooding the log file with repeated messages of failure to receive from heating control via the channel
141 lock_error_channel_receive_heating: bool,
142
143 /// inhibition flag to avoid flooding the log file with repeated messages of failure to send to balling mineral dosing via the channel
144 lock_error_channel_send_balling: bool,
145
146 /// inhibition flag to avoid flooding the log file with repeated messages of failure to receive from balling mineral dosing via the channel
147 lock_error_channel_receive_balling: bool,
148
149 /// information about which threads have been started
150 execution_config: ExecutionConfig,
151}
152
153impl RelayManager {
154 /// Creates a new `RelayManager` instance.
155 ///
156 /// This constructor initializes the relay management module with its configuration.
157 /// It sets up various internal lock flags to `false`, which are used to prevent
158 /// repetitive error and warning messages from flooding the log file during operation,
159 /// especially concerning channel communication and actuation errors.
160 ///
161 /// # Arguments
162 /// * `config` - Configuration data for the relay manager, loaded from a TOML file.
163 ///
164 /// # Returns
165 /// A new `RelayManager` struct, ready to handle relay actuation commands
166 /// and communicate with other modules.
167 pub fn new(config: RelayManagerConfig, execution_config: ExecutionConfig) -> RelayManager {
168 RelayManager {
169 config,
170 lock_error_channel_send_refill: false,
171 lock_error_channel_receive_refill: false,
172 lock_error_channel_send_heating: false,
173 lock_error_channel_receive_heating: false,
174 lock_error_channel_send_ventilation: false,
175 lock_error_channel_receive_ventilation: false,
176 lock_error_channel_send_balling: false,
177 lock_error_channel_receive_balling: false,
178 lock_error_channel_send_feed: false,
179 lock_error_channel_receive_feed: false,
180 execution_config,
181 }
182 }
183
184 /// Attempts to actuate a given command, with retries on failure.
185 ///
186 /// This helper function encapsulates the retry logic for actuator commands. It
187 /// will attempt the actuation at least once, and if it fails, it will retry
188 /// up to the number of times specified by `self.config.actuation_retries`.
189 /// If a failure is due to an `IncorrectChecksum`, it will attempt to flush the
190 /// buffer before retrying.
191 ///
192 /// # Arguments
193 /// * `actuator` - A mutable reference to an object implementing `RelayActuationTrait`.
194 /// * `command` - The `InternalCommand` to be executed.
195 /// * `actuation_name` - A descriptive name for the action (e.g., "refill pump") for logging.
196 /// * `spin_sleeper` - A `SpinSleeper` to pause between retries.
197 /// * `sleep_duration` - The `Duration` to wait between retry attempts.
198 ///
199 /// # Returns
200 /// An empty `Result` (`Ok(())`) if the actuation succeeds, possibly after one or more retries.
201 ///
202 /// # Errors
203 /// Returns `Err(RelayError::ActuationFailed)` if all attempts (the initial one plus all
204 /// retries) fail. This error is a wrapper that contains detailed context, including
205 /// - The name of the actuation that failed.
206 /// - The total number of attempts made.
207 /// - The specific error from the *last* attempt, providing the root cause of the final failure.
208 fn actuate_with_retries(
209 &mut self,
210 actuator: &mut impl RelayActuationTrait,
211 command: &InternalCommand,
212 actuation_name: &str, // Name for logging, e.g., "refill pump", "heater"
213 spin_sleeper: &mut SpinSleeper,
214 sleep_duration: Duration,
215 ) -> Result<(), RelayError> {
216 let mut attempts = 0;
217 let max_attempts = 1 + self.config.actuation_retries; // 1 initial attempt + `actuation_retries` retries
218 let mut error_to_be_reported = RelayError::InitialValue(module_path!().to_string());
219
220 while attempts < max_attempts {
221 match actuator.actuate(command) {
222 Ok(()) => {
223 if attempts > 0 {
224 #[cfg(not(test))]
225 warn!(
226 target: module_path!(),
227 "{actuation_name}: actuation succeeded after {attempts} retries."
228 );
229 #[cfg(test)] // to avoid warning when compiling for test
230 if actuation_name == "not existing name" {
231 println!("Actuation with strange name: {}", actuation_name);
232 }
233 }
234 return Ok(());
235 }
236 Err(e) => {
237 if matches!(e, RelayError::IncorrectChecksum(_)) {
238 if let Err(f) = actuator.flush_buffer() {
239 // if we cannot flush the buffer, mission abort
240 error_to_be_reported = f;
241 break;
242 }
243 }
244
245 // Log error only after the final attempt
246 if attempts == max_attempts - 1 {
247 error_to_be_reported = e;
248 break;
249 } else {
250 // Wait a little before retrying
251 spin_sleeper.sleep(sleep_duration);
252 }
253 attempts += 1;
254 }
255 }
256 }
257 Err(RelayError::ActuationFailed {
258 location: module_path!().to_string(),
259 actuation_name: actuation_name.to_string(),
260 last_attempt_number: attempts + 1,
261 max_attempt_allowed: max_attempts,
262 source: Box::new(error_to_be_reported),
263 })
264 }
265
266 /// Executes the main control loop for the Relay Manager.
267 ///
268 /// This function runs continuously, acting as the central dispatcher for actuation commands
269 /// received from various control modules (Refill, Heating, etc.). It uses a helper function,
270 /// `process_actuation_request`, to handle the logic for each module's channel pair,
271 /// keeping the main loop clean and readable.
272 ///
273 /// The loop forwards commands to the provided `actuator` and handles graceful shutdown
274 /// when a `Quit` command is received from the signal handler. It also manages to send
275 /// periodic heartbeats to the hardware if required.
276 ///
277 /// # Arguments
278 /// * `relay_manager_channels` - A mutable reference to the struct containing all `mpsc` channels necessary for
279 /// communication with other control threads.
280 /// * `actuator` - A mutable reference to an object implementing the `RelayActuationTrait`,
281 /// which handles the actual hardware interaction or simulation.
282 pub fn execute(
283 &mut self,
284 relay_manager_channels: &mut RelayManagerChannels,
285 actuator: &mut impl RelayActuationTrait,
286 ) {
287 #[cfg(all(target_os = "linux", not(test)))]
288 info!(target: module_path!(), "Thread started with TID: {}", gettid());
289
290 let mut last_heartbeat_time = Instant::now();
291 let mut spin_sleeper = SpinSleeper::default();
292 let sleep_duration_between_retries =
293 Duration::from_millis(self.config.pause_between_retries_millis);
294 let sleep_duration_main_cycle = Duration::from_millis(10);
295
296 loop {
297 // Check for quit command first
298 if self
299 .process_external_request(
300 &mut relay_manager_channels.rx_relay_manager_from_signal_handler,
301 None,
302 )
303 .0
304 {
305 #[cfg(feature = "debug_relay_manager")]
306 debug!(target: module_path!(), "received QUIT command");
307 break;
308 }
309
310 if self.execution_config.refill {
311 (
312 self.lock_error_channel_receive_refill,
313 self.lock_error_channel_send_refill,
314 ) = self.process_actuation_request(
315 (
316 &mut relay_manager_channels.rx_relay_manager_from_refill,
317 &mut relay_manager_channels.tx_relay_manager_to_refill,
318 ),
319 (
320 self.lock_error_channel_receive_refill,
321 self.lock_error_channel_send_refill,
322 ),
323 "refill",
324 actuator,
325 &mut spin_sleeper,
326 sleep_duration_between_retries,
327 );
328 }
329
330 if self.execution_config.heating {
331 (
332 self.lock_error_channel_receive_heating,
333 self.lock_error_channel_send_heating,
334 ) = self.process_actuation_request(
335 (
336 &mut relay_manager_channels.rx_relay_manager_from_heating,
337 &mut relay_manager_channels.tx_relay_manager_to_heating,
338 ),
339 (
340 self.lock_error_channel_receive_heating,
341 self.lock_error_channel_send_heating,
342 ),
343 "heating",
344 actuator,
345 &mut spin_sleeper,
346 sleep_duration_between_retries,
347 );
348 }
349
350 if self.execution_config.ventilation {
351 (
352 self.lock_error_channel_receive_ventilation,
353 self.lock_error_channel_send_ventilation,
354 ) = self.process_actuation_request(
355 (
356 &mut relay_manager_channels.rx_relay_manager_from_ventilation,
357 &mut relay_manager_channels.tx_relay_manager_to_ventilation,
358 ),
359 (
360 self.lock_error_channel_receive_ventilation,
361 self.lock_error_channel_send_ventilation,
362 ),
363 "ventilation",
364 actuator,
365 &mut spin_sleeper,
366 sleep_duration_between_retries,
367 );
368 }
369
370 if self.execution_config.balling {
371 (
372 self.lock_error_channel_receive_balling,
373 self.lock_error_channel_send_balling,
374 ) = self.process_actuation_request(
375 (
376 &mut relay_manager_channels.rx_relay_manager_from_balling,
377 &mut relay_manager_channels.tx_relay_manager_to_balling,
378 ),
379 (
380 self.lock_error_channel_receive_balling,
381 self.lock_error_channel_send_balling,
382 ),
383 "Balling dosing",
384 actuator,
385 &mut spin_sleeper,
386 sleep_duration_between_retries,
387 );
388 }
389
390 if self.execution_config.feed {
391 (
392 self.lock_error_channel_receive_feed,
393 self.lock_error_channel_send_feed,
394 ) = self.process_actuation_request(
395 (
396 &mut relay_manager_channels.rx_relay_manager_from_feed,
397 &mut relay_manager_channels.tx_relay_manager_to_feed,
398 ),
399 (
400 self.lock_error_channel_receive_feed,
401 self.lock_error_channel_send_feed,
402 ),
403 "feed",
404 actuator,
405 &mut spin_sleeper,
406 sleep_duration_between_retries,
407 );
408 }
409
410 // *** send heartbeat ***
411 if let Some(interval_secs) = actuator.get_heartbeat_interval_seconds() {
412 if Instant::now().duration_since(last_heartbeat_time).as_secs() >= interval_secs {
413 if let Err(e) = actuator.heartbeat() {
414 log_error_chain(
415 module_path!(),
416 "Execution of heartbeat for relay control failed.",
417 e,
418 );
419 }
420 last_heartbeat_time = Instant::now();
421 }
422 }
423
424 // have a minimum of sleep time in between checking channels
425 spin_sleeper.sleep(sleep_duration_main_cycle);
426 }
427
428 #[cfg(feature = "debug_relay_manager")]
429 debug!(
430 target: module_path!(),
431 "exited loop, sending back confirmation to signal handler"
432 );
433
434 // Application received request to terminate. That is why the loop was left.
435 // Answer to the thread which sent the request for termination, so that shutdown can proceed further.
436 if let Err(_e) = relay_manager_channels
437 .tx_relay_manager_to_signal_handler
438 .send(true)
439 {
440 #[cfg(not(test))]
441 error!(target: module_path!(), "Sending confirmation to signal handler failed. ({_e:?})");
442 };
443 }
444
445 /// Helper function to process an actuation request from a single channel pair.
446 ///
447 /// This function encapsulates the logic for receiving a command, actuating it with retries,
448 /// and sending a confirmation response. It is used to de-duplicate the main `execute` loop.
449 ///
450 /// It takes the current state of the log-suppression flags (`lock_receive`, `lock_send`)
451 /// by value and returns their updated state in a tuple. This pattern is used to work
452 /// around Rust's borrowing rules, as the flags are fields on `RelayManager` and cannot
453 /// be mutably borrowed multiple times in the `execute` loop.
454 ///
455 /// # Arguments
456 /// * `rx` - The channel for receiving `InternalCommand`s from a control module.
457 /// * `tx` - The channel for sending a boolean confirmation back to the control module.
458 /// * `lock_receive` - The current state of the receive-error lock flag.
459 /// * `lock_send` - The current state of the send error lock flag.
460 /// * `actuation_name` - A descriptive name for the action (e.g., "refill") for logging.
461 /// * `actuator` - A mutable reference to the hardware actuator.
462 /// * `spin_sleeper` - A `SpinSleeper` to pause between retries.
463 /// * `sleep_duration_between_retries` - The `Duration` to wait between retry attempts.
464 ///
465 /// # Returns
466 /// A tuple `(bool, bool)` containing the updated state of the `lock_receive` and
467 /// `lock_send` flags, respectively.
468 fn process_actuation_request(
469 &mut self,
470 channels: (&mut AquaReceiver<InternalCommand>, &mut AquaSender<bool>),
471 locks: (bool, bool),
472 actuation_name: &str,
473 actuator: &mut impl RelayActuationTrait,
474 spin_sleeper: &mut SpinSleeper,
475 sleep_duration_between_retries: Duration,
476 ) -> (bool, bool) {
477 let (rx, tx) = channels;
478 let (mut lock_receive, mut lock_send) = locks;
479 match rx.try_recv() {
480 Ok(_command) => {
481 lock_receive = false;
482 if self.config.active {
483 if let Err(_actuation_error) = self.actuate_with_retries(
484 actuator,
485 &_command,
486 actuation_name,
487 spin_sleeper,
488 sleep_duration_between_retries,
489 ) {
490 #[cfg(not(test))]
491 log_error_chain(
492 module_path!(),
493 &format!("Actuation of {actuation_name} failed."),
494 _actuation_error,
495 );
496 }
497 }
498 if let Err(_e) = tx.send(true) {
499 if !lock_send {
500 #[cfg(not(test))]
501 error!(
502 target: module_path!(),
503 "responding to {actuation_name} control ({_e:?})",
504 );
505 lock_send = true;
506 }
507 } else {
508 lock_send = false;
509 }
510 }
511 Err(e) => match e {
512 #[cfg(feature = "debug_channels")]
513 AquaChannelError::Full => { /* Not applicable. Do nothing */ }
514 AquaChannelError::Empty => {
515 lock_receive = false;
516 }
517 AquaChannelError::Disconnected => {
518 if !lock_receive {
519 #[cfg(not(test))]
520 error!(
521 "{}: error when trying to receive command from {actuation_name} control ({e:?})",
522 module_path!(),
523 );
524 lock_receive = true;
525 }
526 }
527 },
528 }
529 (lock_receive, lock_send)
530 }
531}
532
533#[cfg(test)]
534pub mod tests {
535 use crate::launch::channels::Channels;
536 use crate::launch::execution_config::ExecutionConfig;
537 use crate::relays::relay_manager::{RelayActuationTrait, RelayError, RelayManager};
538 use crate::utilities::channel_content::{AquariumDevice, InternalCommand};
539 use crate::utilities::config::{read_config_file, ConfigData};
540 use spin_sleep::SpinSleeper;
541 use std::thread::scope;
542 use std::time::Duration;
543
544 // Contains mock trait implementation for unit testing purposes.
545 pub struct ActuateVoid {
546 // execution counter for testing purposes
547 execution_counter: u64,
548
549 // heartbeat counter for testing purposes
550 heartbeat_counter: u64,
551
552 // flush counter for testing purposes
553 flush_counter: u64,
554
555 // injection of error: if not None, this value is used one time by actuate and then set to None
556 onetime_error_result: Option<RelayError>,
557
558 // injection of error: if not None, this value is used one time by actuate and then set to None
559 permanent_error_result: Option<RelayError>,
560 }
561
562 impl RelayActuationTrait for ActuateVoid {
563 // Mock function for testing
564 fn actuate(&mut self, internal_command: &InternalCommand) -> Result<(), RelayError> {
565 println!(
566 "Received {} - increasing execution counter (currently at {})",
567 internal_command, self.execution_counter
568 );
569 self.execution_counter += 1;
570 if self.onetime_error_result.is_some() {
571 // call will replace error value with "None" - inducing Ok-result for next call
572 // of actuate
573 Err(self.onetime_error_result.take().unwrap())
574 } else {
575 if self.permanent_error_result.is_some() {
576 // Workaround: RelayError does not implement the Clone trait.
577 // WriteError is the only error variant used by the test cases.
578 Err(RelayError::WriteError(module_path!().to_string()))
579 } else {
580 Ok(())
581 }
582 }
583 }
584
585 fn get_heartbeat_interval_seconds(&self) -> Option<u64> {
586 Some(1)
587 }
588
589 fn heartbeat(&mut self) -> Result<(), RelayError> {
590 self.heartbeat_counter += 1;
591 Ok(())
592 }
593
594 fn flush_buffer(&mut self) -> Result<(), RelayError> {
595 self.flush_counter += 1;
596 Ok(())
597 }
598 }
599
600 impl ActuateVoid {
601 // provides the number of executions to the test case for assertion
602 pub fn get_execution_count(&self) -> u64 {
603 self.execution_counter
604 }
605 }
606
607 #[test]
608 // Test case tests the channel communication for the .execute function of relay manager.
609 // It also checks the heartbeat communication.
610 pub fn test_relay_manager_channel_communication() {
611 let config: ConfigData =
612 read_config_file("/config/aquarium_control_test_simulator.toml".to_string()).unwrap();
613
614 let spin_sleeper = SpinSleeper::default();
615 let sleep_duration = Duration::from_secs(2);
616
617 let mut relay_manager = RelayManager::new(config.relay_manager, ExecutionConfig::default());
618
619 let mut channels = Channels::new_for_test();
620
621 scope(|scope| {
622 // thread for test environment
623 scope.spawn(move || {
624 let _ = channels.refill.send_to_relay_manager(InternalCommand::SwitchOn(AquariumDevice::RefillPump));
625 match channels.refill.receive_from_relay_manager() {
626 Ok(c) => {
627 assert_eq!(c, true);
628 },
629 Err(e) => {
630 panic!("test_relay_manager_channel_communication: error when receiving answer for refill thread: {e:?}");
631 }
632 }
633 let _ = channels.heating.send_to_relay_manager(InternalCommand::SwitchOn(AquariumDevice::Heater));
634 match channels.heating.receive_from_relay_manager() {
635 Ok(c) => {
636 assert_eq!(c, true);
637 },
638 Err(e) => {
639 panic!("test_relay_manager_channel_communication: error when receiving answer for heating thread: {e:?}");
640 }
641 }
642 let _ = channels.ventilation.send_to_relay_manager(InternalCommand::SwitchOn(AquariumDevice::Ventilation));
643 match channels.ventilation.receive_from_relay_manager() {
644 Ok(c) => {
645 assert_eq!(c, true);
646 },
647 Err(e) => {
648 panic!("test_relay_manager_channel_communication: error when receiving answer for ventilation thread: {e:?}");
649 }
650 }
651 let _ = channels.balling.send_to_relay_manager(InternalCommand::SwitchOn(AquariumDevice::PeristalticPump1));
652 match channels.balling.receive_from_relay_manager() {
653 Ok(c) => {
654 assert_eq!(c, true);
655 },
656 Err(e) => {
657 panic!("test_relay_manager_channel_communication: error when receiving answer for Balling dosing thread: {e:?}");
658 }
659 }
660 let _ = channels.balling.send_to_relay_manager(InternalCommand::SwitchOn(AquariumDevice::PeristalticPump2));
661 match channels.balling.receive_from_relay_manager() {
662 Ok(c) => {
663 assert_eq!(c, true);
664 },
665 Err(e) => {
666 panic!("test_relay_manager_channel_communication: error when receiving answer for Balling dosing thread: {e:?}");
667 }
668 }
669 let _ = channels.balling.send_to_relay_manager(InternalCommand::SwitchOn(AquariumDevice::PeristalticPump3));
670 match channels.balling.receive_from_relay_manager() {
671 Ok(c) => {
672 assert_eq!(c, true);
673 },
674 Err(e) => {
675 panic!("test_relay_manager_channel_communication: error when receiving answer for Balling dosing thread: {e:?}");
676 }
677 }
678 let _ = channels.balling.send_to_relay_manager(InternalCommand::SwitchOn(AquariumDevice::PeristalticPump4));
679 match channels.balling.receive_from_relay_manager() {
680 Ok(c) => {
681 assert_eq!(c, true);
682 },
683 Err(e) => {
684 panic!("test_relay_manager_channel_communication: error when receiving answer for Balling dosing thread: {e:?}");
685 }
686 }
687 let _ = channels.feed.send_to_relay_manager(InternalCommand::SwitchOn(AquariumDevice::Feeder));
688 match channels.feed.receive_from_relay_manager() {
689 Ok(c) => {
690 assert_eq!(c, true);
691 },
692 Err(e) => {
693 panic!("test_relay_manager_channel_communication: error when receiving answer for Feed thread: {e:?}");
694 }
695 }
696 let _ = channels.feed.send_to_relay_manager(InternalCommand::SwitchOn(AquariumDevice::MainPump1));
697 match channels.feed.receive_from_relay_manager() {
698 Ok(c) => {
699 assert_eq!(c, true);
700 },
701 Err(e) => {
702 panic!("test_relay_manager_channel_communication: error when receiving answer for Feed thread: {e:?}");
703 }
704 }
705 let _ = channels.feed.send_to_relay_manager(InternalCommand::SwitchOn(AquariumDevice::MainPump2));
706 match channels.feed.receive_from_relay_manager() {
707 Ok(c) => {
708 assert_eq!(c, true);
709 },
710 Err(e) => {
711 panic!("test_relay_manager_channel_communication: error when receiving answer for Feed thread: {e:?}");
712 }
713 }
714 let _ = channels.feed.send_to_relay_manager(InternalCommand::SwitchOn(AquariumDevice::AuxPump1));
715 match channels.feed.receive_from_relay_manager() {
716 Ok(c) => {
717 assert_eq!(c, true);
718 },
719 Err(e) => {
720 panic!("test_relay_manager_channel_communication: error when receiving answer for Feed thread: {e:?}");
721 }
722 }
723 let _ = channels.feed.send_to_relay_manager(InternalCommand::SwitchOn(AquariumDevice::AuxPump2));
724 match channels.feed.receive_from_relay_manager() {
725 Ok(c) => {
726 assert_eq!(c, true);
727 },
728 Err(e) => {
729 panic!("test_relay_manager_channel_communication: error when receiving answer for Feed thread: {e:?}");
730 }
731 }
732 let _ = channels.refill.send_to_relay_manager(InternalCommand::SwitchOff(AquariumDevice::RefillPump));
733 match channels.refill.receive_from_relay_manager() {
734 Ok(c) => {
735 assert_eq!(c, true);
736 },
737 Err(e) => {
738 panic!("test_relay_manager_channel_communication: error when receiving answer for refill thread: {e:?}");
739 }
740 }
741 let _ = channels.heating.send_to_relay_manager(InternalCommand::SwitchOff(AquariumDevice::Heater));
742 match channels.heating.receive_from_relay_manager() {
743 Ok(c) => {
744 assert_eq!(c, true);
745 },
746 Err(e) => {
747 panic!("test_relay_manager_channel_communication: error when receiving answer for heating thread: {e:?}");
748 }
749 }
750 let _ = channels.ventilation.send_to_relay_manager(InternalCommand::SwitchOff(AquariumDevice::Ventilation));
751 match channels.ventilation.receive_from_relay_manager() {
752 Ok(c) => {
753 assert_eq!(c, true);
754 },
755 Err(e) => {
756 panic!("test_relay_manager_channel_communication: error when receiving answer for ventilation thread: {e:?}");
757 }
758 }
759 let _ = channels.balling.send_to_relay_manager(InternalCommand::SwitchOff(AquariumDevice::PeristalticPump1));
760 match channels.balling.receive_from_relay_manager() {
761 Ok(c) => {
762 assert_eq!(c, true);
763 },
764 Err(e) => {
765 panic!("test_relay_manager_channel_communication: error when receiving answer for Balling dosing thread: {e:?}");
766 }
767 }
768 let _ = channels.balling.send_to_relay_manager(InternalCommand::SwitchOff(AquariumDevice::PeristalticPump2));
769 match channels.balling.receive_from_relay_manager() {
770 Ok(c) => {
771 assert_eq!(c, true);
772 },
773 Err(e) => {
774 panic!("test_relay_manager_channel_communication: error when receiving answer for Balling dosing thread: {e:?}");
775 }
776 }
777 let _ = channels.balling.send_to_relay_manager(InternalCommand::SwitchOff(AquariumDevice::PeristalticPump3));
778 match channels.balling.receive_from_relay_manager() {
779 Ok(c) => {
780 assert_eq!(c, true);
781 },
782 Err(e) => {
783 panic!("test_relay_manager_channel_communication: error when receiving answer for Balling dosing thread: {e:?}");
784 }
785 }
786 let _ = channels.balling.send_to_relay_manager(InternalCommand::SwitchOff(AquariumDevice::PeristalticPump4));
787 match channels.balling.receive_from_relay_manager() {
788 Ok(c) => {
789 assert_eq!(c, true);
790 },
791 Err(e) => {
792 panic!("test_relay_manager_channel_communication: error when receiving answer for Balling dosing thread: {e:?}");
793 }
794 }
795 let _ = channels.feed.send_to_relay_manager(InternalCommand::SwitchOff(AquariumDevice::Feeder));
796 match channels.feed.receive_from_relay_manager() {
797 Ok(c) => {
798 assert_eq!(c, true);
799 },
800 Err(e) => {
801 panic!("test_relay_manager_channel_communication: error when receiving answer for Feed thread: {e:?}");
802 }
803 }
804 let _ = channels.feed.send_to_relay_manager(InternalCommand::SwitchOff(AquariumDevice::MainPump1));
805 match channels.feed.receive_from_relay_manager() {
806 Ok(c) => {
807 assert_eq!(c, true);
808 },
809 Err(e) => {
810 panic!("test_relay_manager_channel_communication: error when receiving answer for Feed thread: {e:?}");
811 }
812 }
813 let _ = channels.feed.send_to_relay_manager(InternalCommand::SwitchOff(AquariumDevice::MainPump2));
814 match channels.feed.receive_from_relay_manager() {
815 Ok(c) => {
816 assert_eq!(c, true);
817 },
818 Err(e) => {
819 panic!("test_relay_manager_channel_communication: error when receiving answer for Feed thread: {e:?}");
820 }
821 }
822 let _ = channels.feed.send_to_relay_manager(InternalCommand::SwitchOff(AquariumDevice::AuxPump1));
823 match channels.feed.receive_from_relay_manager() {
824 Ok(c) => {
825 assert_eq!(c, true);
826 },
827 Err(e) => {
828 panic!("test_relay_manager_channel_communication: error when receiving answer for Feed thread: {e:?}");
829 }
830 }
831 let _ = channels.feed.send_to_relay_manager(InternalCommand::SwitchOff(AquariumDevice::AuxPump2));
832 match channels.feed.receive_from_relay_manager() {
833 Ok(c) => {
834 assert_eq!(c, true);
835 },
836 Err(e) => {
837 panic!("test_relay_manager_channel_communication: error when receiving answer for Feed thread: {e:?}");
838 }
839 }
840
841 spin_sleeper.sleep(sleep_duration); // wait for two seconds to test the heartbeat
842
843 let _ = channels.signal_handler.send_to_relay_manager(InternalCommand::Quit);
844 channels.signal_handler.receive_from_relay_manager().unwrap();
845 });
846
847 // thread for the test object
848 scope.spawn(move || {
849 let mut mock_actuator = ActuateVoid {
850 execution_counter: 0,
851 heartbeat_counter: 0,
852 flush_counter: 0,
853 onetime_error_result: None,
854 permanent_error_result: None,
855 };
856
857 relay_manager.execute(&mut channels.relay_manager, &mut mock_actuator);
858
859 // Test object thread has ended.
860 assert_eq!(mock_actuator.get_execution_count(), 24); //Assertion checks the number of calls to the actuator.
861 assert_eq!(mock_actuator.heartbeat_counter, 2); // Assertion checks the number of heartbeats.
862 });
863 });
864 }
865 #[test]
866 // Test case tests if the relay manager is calling the flush functionality when the
867 // actuator returns the IncorrectChecksum error.
868 pub fn test_relay_manager_flush() {
869 let config: ConfigData =
870 read_config_file("/config/aquarium_control_test_simulator.toml".to_string()).unwrap();
871
872 let mut relay_manager = RelayManager::new(config.relay_manager, ExecutionConfig::default());
873
874 let mut channels = Channels::new_for_test();
875
876 scope(|scope| {
877 // thread for test environment
878 scope.spawn(move || {
879 let _ = channels.refill.send_to_relay_manager(InternalCommand::SwitchOn(AquariumDevice::RefillPump));
880 match channels.refill.receive_from_relay_manager() {
881 Ok(c) => {
882 assert_eq!(c, true);
883 },
884 Err(e) => {
885 panic!("test_relay_manager_channel_communication: error when receiving answer for refill thread: {e:?}");
886 }
887 }
888 let _ = channels.heating.send_to_relay_manager(InternalCommand::SwitchOn(AquariumDevice::Heater));
889 match channels.heating.receive_from_relay_manager() {
890 Ok(c) => {
891 assert_eq!(c, true);
892 },
893 Err(e) => {
894 panic!("test_relay_manager_channel_communication: error when receiving answer for heating thread: {e:?}");
895 }
896 }
897
898 let _ = channels.signal_handler.send_to_relay_manager(InternalCommand::Quit);
899 channels.signal_handler.receive_from_relay_manager().unwrap();
900 });
901
902 // thread for the test object
903 scope.spawn(move || {
904 let mut mock_actuator = ActuateVoid {
905 execution_counter: 0,
906 heartbeat_counter: 0,
907 flush_counter: 0,
908 onetime_error_result: Some(RelayError::IncorrectChecksum(
909 module_path!().to_string(),
910 )),
911 permanent_error_result: None,
912 };
913
914 relay_manager.execute(&mut channels.relay_manager, &mut mock_actuator);
915
916 // Test object thread has ended.
917 assert_eq!(mock_actuator.get_execution_count(), 3); //Assertion checks the number of calls to the actuator.
918 assert_eq!(mock_actuator.flush_counter, 1); // Assertion checks the number of buffer flush operations.
919 });
920 });
921 }
922
923 #[test]
924 // Test case tests if the relay manager is calling the actuate function multiple times
925 // in case actuator returns an error.
926 pub fn test_relay_manager_repeat() {
927 let config: ConfigData =
928 read_config_file("/config/aquarium_control_test_simulator.toml".to_string()).unwrap();
929
930 let target_repeat_times = config.relay_manager.actuation_retries;
931
932 let mut relay_manager = RelayManager::new(config.relay_manager, ExecutionConfig::default());
933
934 let mut channels = Channels::new_for_test();
935
936 scope(|scope| {
937 // thread for test environment
938 scope.spawn(move || {
939 let _ = channels.refill.send_to_relay_manager(InternalCommand::SwitchOn(AquariumDevice::RefillPump));
940 match channels.refill.receive_from_relay_manager() {
941 Ok(c) => {
942 assert_eq!(c, true);
943 },
944 Err(e) => {
945 panic!("test_relay_manager_channel_communication: error when receiving answer for refill thread: {e:?}");
946 }
947 }
948
949 let _ = channels.signal_handler.send_to_relay_manager(InternalCommand::Quit);
950 channels.signal_handler.receive_from_relay_manager().unwrap();
951 });
952
953 // thread for the test object
954 scope.spawn(move || {
955 let mut mock_actuator = ActuateVoid {
956 execution_counter: 0,
957 heartbeat_counter: 0,
958 flush_counter: 0,
959 onetime_error_result: None,
960 permanent_error_result: Some(RelayError::WriteError(
961 module_path!().to_string(),
962 )),
963 };
964
965 relay_manager.execute(&mut channels.relay_manager, &mut mock_actuator);
966
967 // Test object thread has ended.
968
969 // Assertion checks the number of calls to the actuator.
970 // Relay manager shall repeat in case of failure as configured by user.
971 assert_eq!(mock_actuator.get_execution_count(), target_repeat_times + 1);
972 });
973 });
974 }
975}