aquarium_control/dispatch/
messaging_channels.rs

1/* Copyright 2025 Uwe Martin
2
3Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
4
5The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
6
7THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
8*/
9
10//! A central container for all inter-thread communication channels used by the `Messaging` module.
11//!
12//! This module defines the `MessagingChannels` struct, which acts as a "switchboard" or
13//! router for commands received from an external source. Its primary purpose is to hold all
14//! the `mpsc::channel` senders that the main `Messaging` thread needs to dispatch commands
15//! to the various application domains (e.g., `Refill`, `Heating`, `Feed`).
16//!
17//! This entire module is conditionally compiled and is only available on `target_os = "linux"`
18//! (or during tests), as it is a core part of the POSIX message queue IPC mechanism.
19//!
20//! ## Key Components
21//!
22//! - **`MessagingChannels` Struct**: A simple container struct that aggregates all the
23//!   `AquaSender` and `AquaReceiver` endpoints required by the `Messaging` thread.
24//!
25//! - **`send_command_to_domain()` Method**: The core logic of this module. It takes an
26//!   `InternalCommand` and a `MessagingDomain` and uses a `match` statement to select
27//!   the correct channel sender, effectively routing the command to the intended thread.
28//!
29//! ## Design and Architecture
30//!
31//! The `MessagingChannels` struct is a key part of the application's inter-thread
32//! communication strategy, promoting clean and decoupled code.
33//!
34//! - **Centralization**: By gathering all necessary channels into a single struct, it
35//!   provides a clear and explicit declaration of the `Messaging` thread's communication
36//!   dependencies. This makes the overall architecture easier to reason about.
37//!
38//! - **Decoupling**: The `Messaging` thread itself doesn't need to know about every
39//!   channel. It just needs this struct, which it can use to dispatch commands.
40//!   This separates the dispatch logic from the communication infrastructure.
41//!
42//! - **Testability**: The struct's channels can be used by mock implementation during
43//!   unit testing, allowing for isolated testing of the `Feed` thread's logic without
44//!   needing to run the entire application.
45
46#[cfg(any(test, target_os = "linux"))]
47use crate::launch::channels::AquaReceiver;
48#[allow(unused)]
49use crate::launch::channels::AquaSender;
50#[cfg(any(test, target_os = "linux"))]
51use crate::utilities::channel_content::InternalCommand;
52
53cfg_if::cfg_if! {
54    if #[cfg(target_os = "linux")] {
55        use std::fmt;
56        use crate::dispatch::messaging_domain::MessagingDomain;
57        use crate::dispatch::messaging_error::MessagingError;
58        use crate::launch::channels::AquaChannelError;
59    }
60}
61
62#[cfg(any(test, target_os = "linux"))]
63/// Collects all channels used by the `Messaging` module for IPC.
64///
65/// This struct acts as a central container for all the communication endpoints
66/// that the `Messaging` module uses to dispatch commands received via a POSIX
67/// message queue to the appropriate application threads (domains). It also
68/// includes channels for communicating with the `SignalHandler` for graceful shutdown.
69///
70/// This entire struct is conditionally compiled and is only available on Linux
71/// systems, as POSIX message queues are a platform-specific feature.
72pub struct MessagingChannels {
73    /// Sender for sending acknowledgments back to the `SignalHandler`.
74    pub tx_messaging_to_signal_handler: AquaSender<bool>,
75
76    /// Receiver for commands from the `SignalHandler` (e.g., `Quit`, `Terminate`).
77    pub rx_messaging_from_signal_handler: AquaReceiver<InternalCommand>,
78
79    /// Sender for dispatching commands to the `Refill` module.
80    pub tx_messaging_to_refill: AquaSender<InternalCommand>,
81
82    /// Sender for dispatching commands to the `Ventilation` module.
83    pub tx_messaging_to_ventilation: AquaSender<InternalCommand>,
84
85    /// Sender for dispatching commands to the `Heating` module.
86    pub tx_messaging_to_heating: AquaSender<InternalCommand>,
87
88    /// Sender for dispatching commands to the `Feed` module.
89    pub tx_messaging_to_feed: AquaSender<InternalCommand>,
90
91    /// Sender for dispatching commands to the `Balling` module.
92    pub tx_messaging_to_balling: AquaSender<InternalCommand>,
93
94    #[allow(unused)]
95    /// Sender for dispatching commands to the `Monitors` module.
96    pub tx_messaging_to_monitors: AquaSender<InternalCommand>,
97
98    #[allow(unused)]
99    /// Sender for dispatching commands to the `Watchdog` module.
100    pub tx_messaging_to_watchdog: AquaSender<InternalCommand>,
101}
102
103#[cfg(target_os = "linux")]
104impl MessagingChannels {
105    /// Dispatches a command to the appropriate application domain (thread).
106    ///
107    /// This function acts as a router, taking a generic `InternalCommand` and a
108    /// `MessagingDomain` and sending the command to the correct channel based on
109    /// the specified domain. It also handles the conditional incrementing of debug
110    /// counters when the `debug_channels` feature is enabled.
111    ///
112    /// # Arguments
113    /// * `command` - The `InternalCommand` (e.g., `Start`, `Stop`) to be sent.
114    /// * `domain` - The target `MessagingDomain` (e.g., `Refill`, `Heating`) that
115    ///   should receive the command.
116    ///
117    /// # Returns
118    /// An empty `Result` (`Ok(())`) if the command was successfully sent.
119    ///
120    /// # Errors
121    /// Returns a `MessagingError` if:
122    /// - The `domain` is `MessagingDomain::Unknown`.
123    /// - The target channel is disconnected (i.e., the receiving thread has terminated),
124    ///   wrapped in `MessagingError::ChannelSendError`.
125    pub fn send_command_to_domain(
126        &mut self,
127        command: InternalCommand,
128        domain: MessagingDomain,
129    ) -> Result<(), MessagingError> {
130        // Select the correct sender channel based on the domain.
131        let tx_messaging_to_domain = match domain {
132            MessagingDomain::Refill => &mut self.tx_messaging_to_refill,
133            MessagingDomain::Ventilation => &mut self.tx_messaging_to_ventilation,
134            MessagingDomain::Feed => &mut self.tx_messaging_to_feed,
135            MessagingDomain::Heating => &mut self.tx_messaging_to_heating,
136            MessagingDomain::Balling => &mut self.tx_messaging_to_balling,
137            MessagingDomain::Monitors => &mut self.tx_messaging_to_monitors,
138            MessagingDomain::Watchdog => &mut self.tx_messaging_to_watchdog,
139            _ => {
140                return Err(MessagingError::UnknownDomain(
141                    module_path!().to_string(),
142                    domain as i32,
143                ));
144            }
145        };
146
147        // Send the command and capture the result.
148        tx_messaging_to_domain
149            .send(command)
150            .map_err(|e| MessagingError::ChannelSendError {
151                location: module_path!().to_string(),
152                source: e,
153            })?;
154
155        Ok(())
156    }
157
158    /// Sends an acknowledgment to the signal handler.
159    pub fn send_to_signal_handler(&mut self, ack: bool) -> Result<(), AquaChannelError> {
160        self.tx_messaging_to_signal_handler.send(ack)
161    }
162}
163
164#[cfg(all(feature = "debug_channels", target_os = "linux"))]
165impl fmt::Display for MessagingChannels {
166    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
167        writeln!(f, "=== MessagingChannels ===")?;
168        writeln!(
169            f,
170            "tx_messaging_to_signal_handler: {}",
171            self.tx_messaging_to_signal_handler.count
172        )?;
173        writeln!(
174            f,
175            "rx_messaging_from_signal_handler: {}",
176            self.rx_messaging_from_signal_handler.count
177        )?;
178        writeln!(
179            f,
180            "tx_messaging_to_refill: {}",
181            self.tx_messaging_to_refill.count
182        )?;
183        writeln!(
184            f,
185            "tx_messaging_to_ventilation: {}",
186            self.tx_messaging_to_ventilation.count
187        )?;
188        writeln!(
189            f,
190            "tx_messaging_to_heating: {}",
191            self.tx_messaging_to_heating.count
192        )?;
193        writeln!(
194            f,
195            "tx_messaging_to_feed: {}",
196            self.tx_messaging_to_feed.count
197        )?;
198        writeln!(
199            f,
200            "tx_messaging_to_balling: {}",
201            self.tx_messaging_to_balling.count
202        )?;
203        writeln!(
204            f,
205            "tx_messaging_to_monitors: {}",
206            self.tx_messaging_to_monitors.count
207        )?;
208        write!(
209            f,
210            "tx_messaging_to_watchdog: {}",
211            self.tx_messaging_to_watchdog.count
212        )
213    }
214}
215
216#[cfg(all(not(feature = "debug_channels"), target_os = "linux"))]
217impl fmt::Display for MessagingChannels {
218    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
219        write!(
220            f,
221            "Channel counters are not active. Use --features \"debug_channels\" to enable them."
222        )
223    }
224}