aquarium_control/dispatch/
messaging.rs

1/* Copyright 2024 Uwe Martin
2
3Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
4
5The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
6
7THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
8*/
9
10//! Provides the Inter-Process Communication (IPC) bridge for external control.
11//!
12//! This module is responsible for listening to a POSIX message queue for commands sent
13//! from external processes (e.g., the command-line utility `aquarium_control` or the web server).
14//! It acts as the central point of dispatcher, translating these external messages into internal
15//! commands and forwarding them to the appropriate application threads
16//! (like `Refill`, `Heating`, `Feed`, etc.).
17//!
18//! This entire module is conditionally compiled and is only available on `target_os = "linux"`.
19//!
20//! ## Key Components
21//!
22//! - **`Messaging` Struct**: The core component that holds the POSIX message queue connection
23//!   and runs the main dispatch loop.
24//!
25//! - **`Message` Struct**: A simple data structure representing the raw command received
26//!   from the message queue containing a `domain`, `command`, and parameters.
27//!
28//! - **`execute()` Method**: The main loop of the `Messaging` thread. It performs the following:
29//!   1.  Listens for incoming byte messages on the POSIX queue.
30//!   2.  Parses the raw bytes into a `Message` struct.
31//!   3.  Interprets the message's `domain` (e.g., `Refill`, `Heating`) and `command`.
32//!   4.  Sends the corresponding `InternalCommand` to the target thread via its channel.
33//!   5.  Listens for `Quit` and `Terminate` signals from the `signal_handler` for graceful shutdown.
34//!
35//! ## Design and Architecture
36//!
37//! The `Messaging` module serves as a crucial bridge between the running application and
38//! the outside world, allowing for dynamic control without restarting the service.
39//!
40//! - **Decoupling**: It decouples the internal workings of the application from the
41//!   specifics of the external control mechanism.
42//! - **Centralized Dispatch**: All external commands flow through this single point,
43//!   making the control flow easy to understand and debug.
44//! - **Robustness**: It includes logic to handle unknown domains or commands gracefully
45//!   by logging a warning and ignoring the message.
46//!
47//! ### Example Message Flow
48//!
49//! 1.  An external script sends a byte message to the `/aquarium_control` POSIX message queue.
50//!     The message contains bytes representing: `domain=HEATING`, `command=STOP`.
51//! 2.  The `Messaging::execute()` loop receives these bytes.
52//! 3.  It parses the bytes into a `Message` struct.
53//! 4.  It matches the `domain` to `MessagingDomain::Heating` and the `command` to `InternalCommand::Stop`.
54//! 5.  It sends `InternalCommand::Stop` to the `Heating` thread's channel.
55//! 6.  The `Heating` thread receives the command and stops its operation.
56//!
57//! The mermaid diagram in the `Messaging` struct documentation provides a visual overview
58//! of the communication paths.
59
60#[cfg(feature = "debug_messaging")]
61use log::debug;
62
63cfg_if::cfg_if! {
64    if #[cfg(target_os = "linux")] {
65        #[cfg(not(test))]
66        use log::info;
67
68        use log::{error, warn};
69        use posixmq::PosixMq;
70        use std::time::Duration;
71        use spin_sleep::SpinSleeper;
72        #[cfg(not(test))]
73        use nix::unistd::gettid;
74
75        use crate::utilities::channel_content::InternalCommand;
76        use crate::dispatch::messaging_channels::MessagingChannels;
77        use crate::dispatch::messaging_error::MessagingError;
78        use crate::utilities::proc_ext_req::ProcessExternalRequestTrait;
79        use crate::utilities::wait_for_termination::WaitForTerminationTrait;
80        use crate::utilities::acknowledge_signal_handler::AcknowledgeSignalHandlerTrait;
81        use crate::dispatch::messaging_domain::MessagingDomain;
82        use crate::dispatch::messaging_config::MessagingConfig;
83        use crate::launch::execution_config::ExecutionConfig;
84        use std::io::ErrorKind;
85    }
86}
87
88/// Stores the content of the message and provides functionality for testing.
89#[allow(unused)]
90pub struct Message {
91    domain: i32,
92    pub command: i32,
93    pub command_param1: i32,
94    command_param2: i32,
95}
96
97#[allow(unused)]
98impl Message {
99    /// Creates a new `Message` instance with all its fields initialized to zero.
100    ///
101    /// This constructor provides a default, empty message state, which can then be
102    /// populated with specific domain, command, and parameter values.
103    ///
104    /// # Returns
105    /// A new `Message` struct with `domain`, `command`, `command_param1`,
106    /// and `command_param2` all set to `0`.
107    pub fn new() -> Message {
108        Message {
109            domain: 0,
110            command: 0,
111            command_param1: 0,
112            command_param2: 0,
113        }
114    }
115
116    #[cfg(test)]
117    /// create a message represented as a raw byte array for testing
118    pub fn create_message_buf(&self) -> Vec<u8> {
119        let mut buf = vec![0; 20];
120        let domain_buf = self.domain.to_le_bytes();
121        let command_buf = self.command.to_le_bytes();
122        let command_param1_buf = self.command_param1.to_le_bytes();
123        let command_param2_buf = self.command_param2.to_le_bytes();
124
125        // transfer partial arrays to array for a complete message
126        buf[4] = domain_buf[0];
127        buf[5] = domain_buf[1];
128        buf[6] = domain_buf[2];
129        buf[7] = domain_buf[3];
130        buf[8] = command_buf[0];
131        buf[9] = command_buf[1];
132        buf[10] = command_buf[2];
133        buf[11] = command_buf[3];
134        buf[12] = command_param1_buf[0];
135        buf[13] = command_param1_buf[1];
136        buf[14] = command_param1_buf[2];
137        buf[15] = command_param1_buf[3];
138        buf[16] = command_param2_buf[0];
139        buf[17] = command_param2_buf[1];
140        buf[18] = command_param2_buf[2];
141        buf[19] = command_param2_buf[3];
142
143        buf
144    }
145}
146
147#[cfg_attr(doc, aquamarine::aquamarine)]
148/// Struct provides data structure and implementation for IPC message queue communication.
149/// Thread communication of this component is as follows:
150/// ```mermaid
151/// graph LR
152///     signal_handler[Signal handler] --> messaging[Messaging]
153///     messaging --> refill[Refill control]
154///     messaging --> ventilation[Ventilation control]
155///     messaging --> heating[Heating control]
156///     messaging --> feed[Feed control]
157///     messaging --> balling[Balling dosing control]
158///     messaging --> monitors[Monitors]
159///     messaging --> watchdog[Watchdog]
160/// ```
161#[cfg(target_os = "linux")]
162pub struct Messaging {
163    config: MessagingConfig,
164
165    // Message queue
166    mq: PosixMq,
167    mq_max_msg_len: usize,
168
169    /// an inhibition flag to avoid flooding the log file with repeated messages about having received an inapplicable command
170    pub lock_warn_inapplicable_command_signal_handler: bool,
171
172    /// an inhibition flag to avoid flooding the log file with repeated messages about failure to receive termination signal via the channel
173    pub lock_error_channel_receive_termination: bool,
174
175    /// configuration data indicating which threads are started
176    execution_config: ExecutionConfig,
177}
178
179#[cfg(target_os = "linux")]
180impl Messaging {
181    /// Opens and initializes the POSIX message queue for inter-process communication.
182    ///
183    /// This constructor attempts to open the message queue specified in the configuration.
184    /// It then retrieves the maximum message length supported by this queue.
185    /// This function is specifically enabled for Linux operating systems because
186    /// the messaging subsystem is only supported by this operating system and not by
187    /// other operating systems (e.g., macOS).
188    ///
189    /// # Arguments
190    /// * `config` - Configuration data for the messaging module, primarily containing
191    ///   the `mq_filename` (the name of the POSIX message queue).
192    /// * `execution_config` - Configuration data indicating which threads are started.
193    ///
194    /// # Returns
195    /// A `Result` containing a new, initialized `Messaging` struct on success.
196    ///
197    /// # Errors
198    /// This function will return an error if:
199    /// - It fails to open the message queue with the specified `mq_filename`. This will
200    ///   be a `MessagingError::PosixMessageQueueOpeningError`.
201    /// - It fails to retrieve the attributes (like max message length) of the opened
202    ///   message queue. This will be a `MessagingError::PosixMessageQueueAttributesError`.
203    pub fn new(
204        config: MessagingConfig,
205        execution_config: ExecutionConfig,
206    ) -> Result<Messaging, MessagingError> {
207        let mq_file_name_clone_for_error = config.mq_filename.clone();
208
209        let mq = PosixMq::open(&config.mq_filename).map_err(|e| {
210            MessagingError::PosixMessageQueueOpeningError {
211                location: module_path!().to_string(),
212                mq_name: mq_file_name_clone_for_error,
213                source: e,
214            }
215        })?;
216
217        // identify maximum length of message queue
218
219        let mq_max_msg_len = match mq.attributes() {
220            Ok(c) => c.max_msg_len,
221            Err(e) => {
222                return Err(MessagingError::PosixMessageQueueAttributesError {
223                    location: module_path!().to_string(),
224                    source: e,
225                });
226            }
227        };
228        #[cfg(not(target_os = "linux"))]
229        let mq_max_msg_len = 0;
230
231        Ok(Messaging {
232            config,
233            lock_warn_inapplicable_command_signal_handler: false,
234            lock_error_channel_receive_termination: false,
235            mq,
236            mq_max_msg_len,
237            execution_config,
238        })
239    }
240
241    /// Continuously monitors the POSIX message queue for incoming commands and forwards them to the relevant modules.
242    ///
243    /// This function serves as the central message dispatching unit for the application on Linux systems.
244    /// It operates in a loop, attempting to receive messages from a configured POSIX message queue (`mq_filename`).
245    /// Upon receiving a message, it parses the message's content (domain, command, parameters) and
246    /// dispatches it as an `InternalCommand` to the appropriate module's channel.
247    ///
248    /// It also periodically checks for a `Quit` command from the signal handler to initiate a graceful shutdown
249    /// and then waits for a `Terminate` command before exiting.
250    ///
251    /// # Arguments
252    /// * `messaging_channels` - A mutable reference to a struct containing all channels for communication to other threads.
253    ///
254    /// # Panics
255    /// This function will panic if:
256    /// - It encounters a fatal error while receiving from the POSIX message queue (other than a timeout).
257    /// - It fails to receive the `Terminate` command from the signal handler during shutdown.
258    pub fn execute(&mut self, messaging_channels: &mut MessagingChannels) {
259        #[cfg(not(test))]
260        info!(target: module_path!(), "Thread started with TID: {}", gettid());
261        let duration_timeout = Duration::from_millis(self.config.timeout_millis);
262
263        let mut msgbuf = vec![0; self.mq_max_msg_len];
264        let mut msg = Message::new();
265
266        let sleep_duration = Duration::from_millis(10);
267        let spin_sleeper = SpinSleeper::default();
268
269        loop {
270            match self.mq.recv_timeout(&mut msgbuf, duration_timeout) {
271                Ok((_r1, _r2)) => {
272                    if !msgbuf.is_empty() {
273                        let mut domain_buf: [u8; 4] = [0; 4];
274                        domain_buf[0] = msgbuf[4];
275                        domain_buf[1] = msgbuf[5];
276                        domain_buf[2] = msgbuf[6];
277                        domain_buf[3] = msgbuf[7];
278                        msg.domain = i32::from_le_bytes(domain_buf);
279
280                        let mut command_buf: [u8; 4] = [0; 4];
281                        command_buf[0] = msgbuf[8];
282                        command_buf[1] = msgbuf[9];
283                        command_buf[2] = msgbuf[10];
284                        command_buf[3] = msgbuf[11];
285                        msg.command = i32::from_le_bytes(command_buf);
286
287                        let mut command_param1_buf: [u8; 4] = [0; 4];
288                        command_param1_buf[0] = msgbuf[12];
289                        command_param1_buf[1] = msgbuf[13];
290                        command_param1_buf[2] = msgbuf[14];
291                        command_param1_buf[3] = msgbuf[15];
292                        msg.command_param1 = i32::from_le_bytes(command_param1_buf);
293
294                        let mut command_param2_buf: [u8; 4] = [0; 4];
295                        command_param2_buf[0] = msgbuf[16];
296                        command_param2_buf[1] = msgbuf[17];
297                        command_param2_buf[2] = msgbuf[18];
298                        command_param2_buf[3] = msgbuf[19];
299                        msg.command_param2 = i32::from_le_bytes(command_param2_buf);
300
301                        #[cfg(not(test))]
302                        info!(
303                            target: module_path!(),
304                            "received message: domain={}, cmd={}, param1={}, param2={}",
305                            msg.domain,
306                            msg.command,
307                            msg.command_param1,
308                            msg.command_param2
309                        );
310
311                        // interpret the domain in a message
312                        let domain = MessagingDomain::from(msg.domain);
313                        if domain == MessagingDomain::Unknown {
314                            warn!(
315                                target: module_path!(),
316                                "ignoring message for unknown domain {}",
317                                msg.domain
318                            );
319                        }
320
321                        // interpret command in a message
322                        let command = InternalCommand::from_msg(&msg);
323                        if command == InternalCommand::Unknown {
324                            warn!(
325                                target: module_path!(),
326                                "Messaging: ignoring message with unknown command {}",
327                                msg.command
328                            );
329                        }
330
331                        // prepare and execute communication to domain
332                        if (domain != MessagingDomain::Unknown)
333                            && (command != InternalCommand::Unknown)
334                            && (domain.is_domain_thread_executed(&self.execution_config))
335                        {
336                            #[cfg(not(test))]
337                            info!(
338                                target: module_path!(),
339                                "forwarding message to domain {}",
340                                domain
341                            );
342
343                            match messaging_channels.send_command_to_domain(command, domain.clone())
344                            {
345                                Ok(_) => { /* do nothing */ }
346                                Err(e) => {
347                                    error!(
348                                        target: module_path!(),
349                                        "channel communication to domain {} failed ({e:?})",
350                                        domain
351                                    );
352                                }
353                            }
354                        }
355                    } else {
356                        error!(
357                            target: module_path!(),
358                            "message has incorrect length ({}). ignoring it.",
359                            msgbuf.len()
360                        );
361                    }
362                }
363                Err(e) => {
364                    match e.kind() {
365                        ErrorKind::TimedOut => { /* do nothing */ }
366                        _ => {
367                            error!(
368                                target: module_path!(),
369                                "Error occurred when reading POSIX message ({e:?})"
370                            );
371                        }
372                    }
373                }
374            }
375            let (quit_command_received, _, _) = self.process_external_request(
376                &mut messaging_channels.rx_messaging_from_signal_handler,
377                None,
378            );
379            if quit_command_received {
380                #[cfg(feature = "debug_messaging")]
381                debug!(
382                    target: module_path!(),
383                    "received QUIT command from signal handler"
384                );
385
386                break;
387            }
388
389            spin_sleeper.sleep(sleep_duration);
390        }
391
392        messaging_channels.acknowledge_signal_handler();
393
394        // This thread has channel connections to underlying threads.
395        // Those threads have to stop receiving commands from this thread.
396        // The shutdown sequence is handled by the signal_handler module.
397        self.wait_for_termination(
398            &mut messaging_channels.rx_messaging_from_signal_handler,
399            sleep_duration,
400            module_path!(),
401        );
402    }
403}
404
405#[cfg(test)]
406#[cfg(target_os = "linux")]
407pub mod tests {
408    use posixmq::PosixMq;
409    use spin_sleep::SpinSleeper;
410    use std::{thread, time::Duration};
411
412    use crate::dispatch::messaging::{Message, Messaging};
413    use crate::dispatch::messaging_channels::MessagingChannels;
414    use crate::dispatch::messaging_domain::MessagingDomain;
415    use crate::launch::channels::AquaSender;
416    use crate::launch::channels::Channels;
417    use crate::launch::execution_config::ExecutionConfig;
418    use crate::mocks::mock_message_command_receiver::tests::mock_message_command_receiver;
419    use crate::utilities::channel_content::InternalCommand;
420    use crate::utilities::config::{read_config_file, ConfigData};
421    use crate::utilities::signal_handler_channels::SignalHandlerChannels;
422
423    // Opens a POSIX message queue for test purposes.
424    //
425    // This helper function is exclusively used in test environments on Linux.
426    // It attempts to open a POSIX message queue specified by its `filename`.
427    // If the message queue cannot be opened, the function will panic,
428    // indicating a critical issue for the test setup.
429    //
430    // # Arguments
431    // * `filename` - The name of the POSIX message queue to open for testing.
432    //
433    // # Returns
434    // An opened `PosixMq` instance.
435    //
436    // # Panics
437    // This function will panic if it fails to open the specified POSIX message queue,
438    // as this is considered a critical failure for the test setup.
439    fn open_posixmq_test_file(filename: &str) -> PosixMq {
440        match PosixMq::open(filename) {
441            Ok(c) => c,
442            Err(e) => {
443                panic!(
444                    "{}: failure to open message queue for test using {}, error={e:?}",
445                    module_path!(),
446                    filename
447                );
448            }
449        }
450    }
451
452    // Creates a new thread to run the `Messaging` module's `execute` loop.
453    //
454    // This helper function is used in test environments on Linux to simulate the `Messaging`
455    // module running in its own dedicated thread. It sets up the necessary channel
456    // connections for communication with various components (like `Refill`, `Heating`, `Watchdog`, etc.).
457    //
458    // # Arguments
459    // * `messaging` - The `Messaging` instance to be executed in the new thread. It is moved into the thread.
460    // * `messaging_channels` - Struct containing the channels for messaging
461    //
462    // # Returns
463    // A `thread::JoinHandle<()>` which can be used to wait for the spawned thread to complete.
464    //
465    // # Panics
466    // This function will panic if the new thread cannot be spawned.
467    fn create_test_object(
468        mut messaging: Messaging,
469        mut messaging_channels: MessagingChannels,
470    ) -> thread::JoinHandle<()> {
471        thread::Builder::new()
472            .name("test_object".to_string())
473            .spawn(move || {
474                messaging.execute(&mut messaging_channels);
475            })
476            .unwrap()
477    }
478
479    // Creates a new thread to simulate external stimuli and commands for the `Messaging` module.
480    //
481    // This helper function is used in test environments on Linux. It simulates an
482    // external process sending commands to the `Messaging` module via a POSIX message queue
483    // and also sends shutdown commands to the `Messaging` module and a mock control object
484    // through internal channels.
485    //
486    // # Arguments
487    // * `msg_commands` - A vector of `Message` structs to be sent as stimuli to the `Messaging` module.
488    // * `tx_environment_to_object` - The sender channel for this test environment to send commands to a mock control object.
489    // * `tx_signal_handler_to_messaging` - The sender channel for this test environment to send commands to the `Messaging` module, simulating the signal handler.
490    // * `rx_signal_handler_from_messaging` - The receiver channel for this test environment to receive acknowledgments from the `Messaging` module.
491    // * `mq_filename` - The name of the POSIX message queue used for sending stimuli.
492    //
493    // # Returns
494    // A `thread::JoinHandle<()>` which can be used to wait for the spawned thread to complete.
495    //
496    // # Panics
497    // This function will panic if:
498    // - The new thread cannot be spawned.
499    fn create_test_environment(
500        msg_commands: Vec<Message>,
501        mut tx_environment_to_object: AquaSender<InternalCommand>,
502        signal_handler_channels: SignalHandlerChannels,
503        mq_filename: String,
504    ) -> thread::JoinHandle<()> {
505        thread::Builder::new()
506            .name("test_environment".to_string())
507            .spawn(move || {
508                let sleep_duration_100_millis = Duration::from_millis(100);
509                let spin_sleeper = SpinSleeper::default();
510
511                spin_sleeper.sleep(sleep_duration_100_millis);
512
513                let mq_stimuli = open_posixmq_test_file(&mq_filename);
514
515                for msg_command in msg_commands {
516                    // sending an IPC message requesting the command to the (sub-)test object
517                    let _ = mq_stimuli.send(0, &msg_command.create_message_buf());
518
519                    spin_sleeper.sleep(sleep_duration_100_millis);
520                }
521
522                // requesting mock control to quit
523                let _ = tx_environment_to_object.send(InternalCommand::Quit);
524
525                let mut tx_signal_handler_to_messaging = signal_handler_channels
526                    .tx_signal_handler_to_messaging_opt
527                    .unwrap();
528                let mut rx_signal_handler_from_messaging = signal_handler_channels
529                    .rx_signal_handler_from_messaging_opt
530                    .unwrap();
531
532                // requesting messaging to quit
533                let _ = tx_signal_handler_to_messaging.send(InternalCommand::Quit);
534
535                // waiting for confirmation from messaging
536                let _ = rx_signal_handler_from_messaging.recv();
537
538                let _ = tx_signal_handler_to_messaging.send(InternalCommand::Terminate);
539            })
540            .unwrap()
541    }
542
543    // Asserts that a specific sequence of "Stop" then "Start" commands was received.
544    //
545    // This helper function is used in test environments on Linux to validate the order
546    // and content of commands received by a test mock. It specifically checks that:
547    // 1. Exactly two commands were received.
548    // 2. The first command received was `InternalCommand::Stop`.
549    // 3. The second command received was `InternalCommand::Start`.
550    //
551    // # Arguments
552    // * `commands_received` - A `Vec<InternalCommand>` containing the commands that were
553    //   captured by a mock object in the order they were received. This vector is consumed.
554    //
555    // # Panics
556    // This function will panic if:
557    // - The `commands_received` vector does not contain exactly two elements.
558    // - The commands are not `InternalCommand::Stop` followed by `InternalCommand::Start` in that order.
559    fn assert_stop_start_sequence(mut commands_received: Vec<InternalCommand>) {
560        let spin_sleeper = SpinSleeper::default();
561        spin_sleeper.sleep(Duration::from_millis(100));
562
563        for command in &commands_received {
564            println!("assert_stop_start_sequence received command: {command:?}");
565        }
566        // assert the number of commands sent/received are identical
567        assert_eq!(2, commands_received.len());
568
569        // assert content of last message sent from test environment:
570        assert_eq!(InternalCommand::Start, commands_received.pop().unwrap());
571
572        // assert the content of the first message sent from the test environment:
573        assert_eq!(InternalCommand::Stop, commands_received.pop().unwrap());
574    }
575
576    // Test case checks if messages for ventilation control are triggering the corresponding channel communication
577    #[test]
578    pub fn test_messaging_ventilation() {
579        let mut config: ConfigData =
580            read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
581        config.messaging.mq_filename = "/aquarium_control.5UXp".to_string();
582        let mq_filename_for_test_environment = config.messaging.mq_filename.clone();
583
584        let mut msg_start = Message::new();
585        msg_start.domain = MessagingDomain::Ventilation.into();
586        msg_start.command = InternalCommand::Start.to_numeric();
587
588        let mut msg_stop = Message::new();
589        msg_stop.domain = MessagingDomain::Ventilation.into();
590        msg_stop.command = InternalCommand::Stop.to_numeric();
591
592        let messaging = Messaging::new(config.messaging, ExecutionConfig::default()).unwrap();
593
594        let channels = Channels::new_for_test();
595
596        let tx_test_environment_to_ventilation =
597            channels.messaging.tx_messaging_to_ventilation.clone();
598
599        let join_handle_mock_ventilation = thread::Builder::new()
600            .name("mock_ventilation".to_string())
601            .spawn(move || {
602                assert_stop_start_sequence(mock_message_command_receiver(
603                    channels
604                        .ventilation
605                        .rx_ventilation_from_messaging_opt
606                        .unwrap(),
607                ));
608            })
609            .unwrap();
610
611        // thread for controlling duration of test run
612        let join_handle_test_environment = create_test_environment(
613            vec![msg_stop, msg_start], // first stop, then start ventilation
614            tx_test_environment_to_ventilation,
615            channels.signal_handler,
616            mq_filename_for_test_environment,
617        );
618
619        // thread for the test object
620        let join_handle_test_object = create_test_object(messaging, channels.messaging);
621
622        join_handle_mock_ventilation
623            .join()
624            .expect("Mock ventilation thread did not finish.");
625        join_handle_test_environment
626            .join()
627            .expect("Test environment thread did not finish.");
628        join_handle_test_object
629            .join()
630            .expect("Test object thread did not finish.");
631    }
632
633    // Test case checks if messages for heating control are triggering the corresponding channel communication
634    #[test]
635    pub fn test_messaging_heating() {
636        let mut config: ConfigData =
637            read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
638        config.messaging.mq_filename = "/aquarium_control.6UXp".to_string();
639        let mq_filename_for_test_environment = config.messaging.mq_filename.clone();
640
641        let mut msg_start = Message::new();
642        msg_start.domain = MessagingDomain::Heating.into();
643        msg_start.command = InternalCommand::Start.to_numeric();
644
645        let mut msg_stop = Message::new();
646        msg_stop.domain = MessagingDomain::Heating.into();
647        msg_stop.command = InternalCommand::Stop.to_numeric();
648
649        let messaging = Messaging::new(config.messaging, ExecutionConfig::default()).unwrap();
650
651        let channels = Channels::new_for_test();
652
653        let tx_test_environment_to_heating = channels.messaging.tx_messaging_to_heating.clone();
654
655        let join_handle_mock_heating = thread::Builder::new()
656            .name("mock_heating".to_string())
657            .spawn(move || {
658                assert_stop_start_sequence(mock_message_command_receiver(
659                    channels.heating.rx_heating_from_messaging_opt.unwrap(),
660                ));
661            })
662            .unwrap();
663
664        // thread for controlling duration of test run
665        let join_handle_test_environment = create_test_environment(
666            vec![msg_stop, msg_start], // first stop, then start heating
667            tx_test_environment_to_heating,
668            channels.signal_handler,
669            mq_filename_for_test_environment,
670        );
671
672        // thread for the test object
673        let join_handle_test_object = create_test_object(messaging, channels.messaging);
674
675        join_handle_mock_heating
676            .join()
677            .expect("Mock heating thread did not finish.");
678        join_handle_test_environment
679            .join()
680            .expect("Test environment thread did not finish.");
681        join_handle_test_object
682            .join()
683            .expect("Test object thread did not finish.");
684    }
685
686    // Test case checks if messages for refill control are triggering the corresponding channel communication
687    #[test]
688    pub fn test_messaging_refill() {
689        let mut config: ConfigData =
690            read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
691        config.messaging.mq_filename = "/aquarium_control.7UXp".to_string();
692        let mq_file_name_for_test_environment = config.messaging.mq_filename.clone();
693
694        let mut msg_reset = Message::new();
695        msg_reset.domain = MessagingDomain::Refill.into();
696        msg_reset.command = InternalCommand::ResetAllErrors.to_numeric();
697
698        let mut msg_start = Message::new();
699        msg_start.domain = MessagingDomain::Refill.into();
700        msg_start.command = InternalCommand::Start.to_numeric();
701
702        let mut msg_stop = Message::new();
703        msg_stop.domain = MessagingDomain::Refill.into();
704        msg_stop.command = InternalCommand::Stop.to_numeric();
705
706        let messaging = Messaging::new(config.messaging, ExecutionConfig::default()).unwrap();
707
708        let channels = Channels::new_for_test();
709
710        let tx_test_environment_to_refill = channels.messaging.tx_messaging_to_refill.clone();
711
712        let join_handle_mock_refill = thread::Builder::new()
713            .name("mock_refill".to_string())
714            .spawn(move || {
715                let mut commands_received = mock_message_command_receiver(
716                    channels.refill.rx_refill_from_messaging_opt.unwrap(),
717                );
718
719                // assert the number of commands sent/received are identical
720                assert_eq!(3, commands_received.len());
721
722                // assert the content of the last message sent from test environment:
723                assert_eq!(InternalCommand::Start, commands_received.pop().unwrap());
724
725                // assert the content of the second message sent from the test environment:
726                assert_eq!(InternalCommand::Stop, commands_received.pop().unwrap());
727
728                // assert the content of the first message sent from the test environment:
729                assert_eq!(
730                    InternalCommand::ResetAllErrors,
731                    commands_received.pop().unwrap()
732                );
733            })
734            .unwrap();
735
736        // thread for controlling duration of test run
737        // thread for controlling duration of test run
738        let join_handle_test_environment = create_test_environment(
739            vec![msg_reset, msg_stop, msg_start], // first reset errors, then stop refill, then start refill
740            tx_test_environment_to_refill,
741            channels.signal_handler,
742            mq_file_name_for_test_environment,
743        );
744
745        // thread for the test object
746        let join_handle_test_object = create_test_object(messaging, channels.messaging);
747
748        join_handle_mock_refill
749            .join()
750            .expect("Mock refill thread did not finish.");
751        join_handle_test_environment
752            .join()
753            .expect("Test environment thread did not finish.");
754        join_handle_test_object
755            .join()
756            .expect("Test object thread did not finish.");
757    }
758
759    // Test case checks if messages for feed control are triggering the corresponding channel communication
760    #[test]
761    pub fn test_messaging_feed() {
762        let mut config: ConfigData =
763            read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
764        config.messaging.mq_filename = "/aquarium_control.8UXp".to_string();
765        let mq_filename_for_test_environment = config.messaging.mq_filename.clone();
766
767        let mut msg_exec = Message::new();
768        msg_exec.domain = MessagingDomain::Feed.into();
769        msg_exec.command = InternalCommand::Execute(13).to_numeric();
770        msg_exec.command_param1 = 13;
771
772        let mut msg_start = Message::new();
773        msg_start.domain = MessagingDomain::Feed.into();
774        msg_start.command = InternalCommand::Start.to_numeric();
775
776        let mut msg_stop = Message::new();
777        msg_stop.domain = MessagingDomain::Feed.into();
778        msg_stop.command = InternalCommand::Stop.to_numeric();
779
780        let messaging = Messaging::new(config.messaging, ExecutionConfig::default()).unwrap();
781
782        let channels = Channels::new_for_test();
783
784        let tx_test_environment_to_feed = channels.messaging.tx_messaging_to_feed.clone();
785
786        let join_handle_mock_feed = thread::Builder::new()
787            .name("mock_feed".to_string())
788            .spawn(move || {
789                let mut commands_received = mock_message_command_receiver(
790                    channels.feed.rx_feed_from_messaging_opt.unwrap(),
791                );
792
793                // assert the number of commands sent/received are identical
794                assert_eq!(3, commands_received.len());
795
796                // assert content of last message sent from test environment:
797                assert_eq!(InternalCommand::Start, commands_received.pop().unwrap());
798
799                // assert the content of the second message sent from the test environment:
800                assert_eq!(InternalCommand::Stop, commands_received.pop().unwrap());
801
802                // assert the content of the first message sent from the test environment:
803                assert_eq!(
804                    InternalCommand::Execute(13),
805                    commands_received.pop().unwrap()
806                );
807            })
808            .unwrap();
809
810        // thread for controlling duration of test run
811        let join_handle_test_environment = create_test_environment(
812            vec![msg_exec, msg_stop, msg_start], // first execute a feed profile, then stop feed, then start feed
813            tx_test_environment_to_feed,
814            channels.signal_handler,
815            mq_filename_for_test_environment,
816        );
817
818        // thread for the test object
819        let join_handle_test_object = create_test_object(messaging, channels.messaging);
820
821        join_handle_mock_feed
822            .join()
823            .expect("Mock feed thread did not finish.");
824        join_handle_test_environment
825            .join()
826            .expect("Test environment thread did not finish.");
827        join_handle_test_object
828            .join()
829            .expect("Test object thread did not finish.");
830    }
831
832    // Test case checks if messages for Balling control are triggering the corresponding channel communication
833    #[test]
834    pub fn test_messaging_balling() {
835        let mut config: ConfigData =
836            read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
837        config.messaging.mq_filename = "/aquarium_control.9UXp".to_string();
838        let mq_filename_for_test_environment = config.messaging.mq_filename.clone();
839
840        open_posixmq_test_file(&config.messaging.mq_filename);
841
842        let mut msg_start = Message::new();
843        msg_start.domain = MessagingDomain::Balling.into();
844        msg_start.command = InternalCommand::Start.to_numeric();
845
846        let mut msg_stop = Message::new();
847        msg_stop.domain = MessagingDomain::Balling.into();
848        msg_stop.command = InternalCommand::Stop.to_numeric();
849
850        let messaging = Messaging::new(config.messaging, ExecutionConfig::default()).unwrap();
851
852        let channels = Channels::new_for_test();
853
854        let tx_test_environment_to_balling = channels.messaging.tx_messaging_to_balling.clone();
855
856        let join_handle_mock_balling = thread::Builder::new()
857            .name("mock_balling".to_string())
858            .spawn(move || {
859                assert_stop_start_sequence(mock_message_command_receiver(
860                    channels.balling.rx_balling_from_messaging_opt.unwrap(),
861                ));
862            })
863            .unwrap();
864
865        // thread for controlling duration of test run
866        let join_handle_test_environment = create_test_environment(
867            vec![msg_stop, msg_start], // first stop, then start Balling dosing control
868            tx_test_environment_to_balling,
869            channels.signal_handler,
870            mq_filename_for_test_environment,
871        );
872
873        // thread for the test object
874        let join_handle_test_object = create_test_object(messaging, channels.messaging);
875
876        join_handle_mock_balling
877            .join()
878            .expect("Mock Balling thread did not finish.");
879        join_handle_test_environment
880            .join()
881            .expect("Test environment thread did not finish.");
882        join_handle_test_object
883            .join()
884            .expect("Test object thread did not finish.");
885    }
886
887    // Test case checks if messages for Watchdog control are triggering the corresponding channel communication
888    #[test]
889    pub fn test_messaging_watchdog() {
890        let mut config: ConfigData =
891            read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
892        config.messaging.mq_filename = "/aquarium_control.9UXp".to_string();
893        let mq_filename_for_test_environment = config.messaging.mq_filename.clone();
894
895        let mut msg_start = Message::new();
896        msg_start.domain = MessagingDomain::Watchdog.into();
897        msg_start.command = InternalCommand::Start.to_numeric();
898
899        let mut msg_stop = Message::new();
900        msg_stop.domain = MessagingDomain::Watchdog.into();
901        msg_stop.command = InternalCommand::Stop.to_numeric();
902
903        let messaging = Messaging::new(config.messaging, ExecutionConfig::default()).unwrap();
904
905        let channels = Channels::new_for_test();
906
907        let tx_test_environment_to_watchdog = channels.messaging.tx_messaging_to_watchdog.clone();
908
909        let join_handle_mock_watchdog = thread::Builder::new()
910            .name("mock_watchdog".to_string())
911            .spawn(move || {
912                assert_stop_start_sequence(mock_message_command_receiver(
913                    channels.watchdog.rx_watchdog_from_messaging_opt.unwrap(),
914                ));
915            })
916            .unwrap();
917
918        // thread for controlling duration of test run
919        let join_handle_test_environment = create_test_environment(
920            vec![msg_stop, msg_start], // first stop, then start Watchdog
921            tx_test_environment_to_watchdog,
922            channels.signal_handler,
923            mq_filename_for_test_environment,
924        );
925
926        // thread for the test object
927        let join_handle_test_object = create_test_object(messaging, channels.messaging);
928
929        join_handle_mock_watchdog
930            .join()
931            .expect("Mock Balling thread did not finish.");
932        join_handle_test_environment
933            .join()
934            .expect("Test environment thread did not finish.");
935        join_handle_test_object
936            .join()
937            .expect("Test object thread did not finish.");
938    }
939}