aquarium_control/watchmen/
monitors.rs

1/* Copyright 2024 Uwe Martin
2
3Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
4
5The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
6
7THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
8*/
9
10use log::{error, info};
11
12#[cfg(any(feature = "debug_channels", test))]
13use log::debug;
14
15#[cfg(all(not(test), target_os = "linux"))]
16use nix::unistd::gettid;
17
18use crate::launch::channels::AquaChannelError;
19use crate::utilities::acknowledge_signal_handler::AcknowledgeSignalHandlerTrait;
20use crate::utilities::proc_ext_req::ProcessExternalRequestTrait;
21use crate::watchmen::monitors_channels::MonitorsChannels;
22use crate::watchmen::monitors_config::MonitorsConfig;
23use crate::water::refill_monitor_view::RefillMonitorView;
24use spin_sleep::SpinSleeper;
25use std::sync::{Arc, Mutex};
26use std::time::Duration;
27
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Contains the configuration and the implementation for the monitors.
///
/// Besides the configuration, the struct holds two log-inhibition flags used by
/// the control loop and a bounded history of refill monitor views.
///
/// Thread communication of this component is as follows:
/// ```mermaid
/// graph LR
///     monitors[Monitors] --> relay_manager[Relay Manager]
///     relay_manager --> monitors
///     monitors --> signal_handler[Signal handler]
///     signal_handler --> monitors
///     messaging[Messaging] --> monitors
/// ```
// NOTE(review): `execute` also polls `MonitorsChannels::receive_from_refill`, which the
// diagram above does not show - confirm whether a `refill --> monitors` edge is missing.
pub struct Monitors {
    /// Configuration data for the monitors. This file reads `active` (enables the
    /// monitoring part of the control loop) and `max_length_refill_monitor_views`
    /// (upper bound for the stored history).
    config: MonitorsConfig,

    /// Inhibition flag to avoid flooding the log file with repeated messages about
    /// the monitors being inhibited. Set after the message has been logged.
    lock_info_monitors_inhibited: bool,

    /// Inhibition flag to avoid flooding the log file with repeated messages about
    /// failure to receive from refill. Set after the error has been logged.
    lock_error_receive_refill: bool,

    /// Storage of refill monitor views, oldest entry first (new entries are pushed
    /// to the end, the entry at index 0 is discarded when the limit is reached).
    refill_monitor_views: Vec<RefillMonitorView>,
}
52
// Empty impl: nothing is overridden, so `Monitors` uses whatever default methods
// `ProcessExternalRequestTrait` provides (presumably including
// `process_external_request`, which `Monitors::execute` calls - confirm in the trait).
impl ProcessExternalRequestTrait for Monitors {}
54
55impl Monitors {
56    /// Creates a new `Monitors` instance.
57    ///
58    /// This constructor initializes the monitoring module with its specified
59    /// configuration. It sets up internal "lock" flags to `false` by default;
60    /// these flags are used to prevent repetitive error and warning messages
61    /// from flooding the log during operation.
62    ///
63    /// # Arguments
64    /// * `config` - **Configuration data** for the monitors, loaded from a TOML file.
65    ///   This includes parameters such as whether the monitors are `active`.
66    ///
67    /// # Returns
68    /// A new **`Monitors` struct**, ready to perform its monitoring tasks.
69    pub fn new(config: MonitorsConfig) -> Monitors {
70        Self {
71            config,
72            lock_info_monitors_inhibited: false,
73            lock_error_receive_refill: false,
74            refill_monitor_views: vec![],
75        }
76    }
77    /// Manages a vector of `RefillMonitorView` structs, ensuring it does not exceed
78    /// a specified maximum length.
79    ///
80    /// If the vector has reached its maximum capacity, the oldest element (at index 0)
81    /// is removed before the new `refill_monitor_view` is appended.
82    /// Otherwise, the new `refill_monitor_view` is simply appended.
83    ///
84    /// # Arguments
85    /// * `refill_monitor_view` - The new `RefillMonitorView` struct to be added to the vector.
86    pub fn append_refill_monitor_view(&mut self, refill_monitor_view: RefillMonitorView) {
87        // Check if the vector has reached or exceeded its maximum allowed length.
88        if self.refill_monitor_views.len() >= self.config.max_length_refill_monitor_views {
89            #[cfg(feature = "debug_channels")]
90            debug!(target: module_path!(), "CAUTION: Reached size limit for monitor views.");
91
92            // If it has, remove the oldest element (at index 0).
93            // This shifts all further elements to the left.
94            self.refill_monitor_views.remove(0);
95        }
96        // Append the new RefillMonitorView struct to the end of the vector.
97        self.refill_monitor_views.push(refill_monitor_view);
98    }
99
100    /// Executes the main control loop for the monitor module.
101    ///
102    /// This function runs continuously, performing periodic monitoring tasks when active.
103    /// It responds to external `Start` and `Stop` commands to enable or inhibit monitoring
104    /// activities. The loop also continuously checks for `Quit` and `Terminate` commands
105    /// from the signal handler to ensure a graceful application shutdown.
106    ///
107    /// Monitoring activities (represented by debug logs) are performed at a rate of
108    /// approximately every 10 seconds, controlled by the `loop_counter` and `sleep_duration`.
109    ///
110    /// # Arguments
111    /// * `_mutex_device_scheduler_monitors` - (Ignored) A clone of a mutex for device scheduling.
112    ///   This argument is part of a common signature but is not used by this module's logic.
113    /// * `monitors_channels` - A mutable reference to the struct containing the channels.
114    pub fn execute(
115        &mut self,
116        _mutex_device_scheduler_monitors: Arc<Mutex<i32>>,
117        monitors_channels: &mut MonitorsChannels,
118    ) {
119        #[cfg(all(target_os = "linux", not(test)))]
120        info!(target: module_path!(), "Thread started with TID: {}", gettid());
121
122        let sleep_duration_hundred_millis = Duration::from_millis(100);
123        let spin_sleeper = SpinSleeper::default();
124        let mut quit_command_received: bool;
125        let mut start_command_received: bool;
126        let mut stop_command_received: bool;
127        let mut monitors_inhibited: bool = false;
128        let mut old_refill_view_opt: Option<RefillMonitorView> = None;
129
130        loop {
131            if self.config.active {
132                if monitors_inhibited && !self.lock_info_monitors_inhibited {
133                    info!(
134                        target: module_path!(),
135                        "monitors are inhibited after receiving stop command via message.");
136                    self.lock_info_monitors_inhibited = true;
137                } else {
138                    self.lock_info_monitors_inhibited = false;
139                    #[cfg(test)]
140                    debug!(
141                        target: module_path!(),
142                        "executing monitors"
143                    );
144                }
145
146                match monitors_channels.receive_from_refill() {
147                    Ok(new_refill_view) => match old_refill_view_opt {
148                        Some(ref old_refill_view) => {
149                            if &new_refill_view != old_refill_view {
150                                self.append_refill_monitor_view(new_refill_view.clone());
151                                old_refill_view_opt = Some(new_refill_view);
152                            }
153                        }
154                        None => {
155                            self.append_refill_monitor_view(new_refill_view.clone());
156                            old_refill_view_opt = Some(new_refill_view);
157                        }
158                    },
159                    Err(e) => {
160                        match e {
161                            #[cfg(feature = "debug_channels")]
162                            AquaChannelError::Full => { /* only applicable to send - no action required */
163                            }
164                            AquaChannelError::Empty => { /* empty buffer - no action required */ }
165                            AquaChannelError::Disconnected => {
166                                if !self.lock_error_receive_refill {
167                                    error!(target: module_path!(), "Error when trying to receive from refill: {e}");
168                                    self.lock_error_receive_refill = true;
169                                }
170                            }
171                        }
172                    }
173                };
174            }
175            spin_sleeper.sleep(sleep_duration_hundred_millis);
176
177            (
178                quit_command_received,
179                start_command_received,
180                stop_command_received,
181            ) = self.process_external_request(
182                &mut monitors_channels.rx_monitors_from_signal_handler,
183                monitors_channels.rx_monitors_from_messaging_opt.as_mut(),
184            );
185            if quit_command_received {
186                break;
187            }
188            if stop_command_received {
189                info!(
190                    target: module_path!(),
191                    "received Stop command. Inhibiting monitoring."
192                );
193                monitors_inhibited = true;
194            }
195            if start_command_received {
196                info!(
197                    target: module_path!(),
198                    "received Start command. Restarting monitoring."
199                );
200                monitors_inhibited = false;
201            }
202        }
203
204        monitors_channels.acknowledge_signal_handler();
205    }
206}