aquarium_control/watchmen/
memory.rs

1/* Copyright 2025 Uwe Martin
2
3Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
4
5The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
6
7THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
8*/
9//! Manages memory monitoring and allocator configuration checks for the application.
10//!
11//! This module provides a `Memory` struct designed to run as a dedicated thread.
12//! It monitors the application's memory footprint over time and issues a warning if thresholds are exceeded.
13//!
14//! Its primary responsibilities are:
//! 1.  **Pre-flight checks**: Check validity of configuration and environment variables.
//! 2.  **Resident memory monitoring**: It periodically checks the process's resident memory,
//!     as informed by the allocator - conditionally compiled for feature 'jemalloc'.
//! 3.  **Allocated memory monitoring**: It also checks the allocated memory,
//!     as informed by the allocator - conditionally compiled for feature 'jemalloc'.
20//!
21//! The monitoring loop is controlled via channels and can be terminated cleanly on application shutdown.
22
23use crate::utilities::proc_ext_req::ProcessExternalRequestTrait;
24use crate::watchmen::memory_channels::MemoryChannels;
25use crate::watchmen::memory_config::MemoryConfig;
26cfg_if::cfg_if! {
27    if #[cfg(feature = "jemalloc")] {
28        use log::{error, info};
29        use tikv_jemalloc_ctl::{epoch, stats};
30        #[cfg(not(test))]
31        use log::warn;
32    }
33}
34use crate::watchmen::memory_config_check::{memory_config_check, MemoryConfigError};
35#[cfg(target_os = "linux")]
36use log::info;
37use spin_sleep::SpinSleeper;
38use std::time::{Duration, Instant};
39
40#[cfg(all(target_os = "linux", not(test)))]
41use nix::unistd::gettid;
42
43/// The main struct for the memory monitoring thread.
44///
45/// An instance of this struct contains the configuration and the state required
46/// to perform periodic memory checks.
47pub struct Memory {
48    /// configuration read from .toml file
49    config: MemoryConfig,
50
51    #[cfg(feature = "jemalloc")]
52    /// Lock flag avoiding flooding of logfile with repeated messages about resident memory usage
53    lock_warn_resident_memory_exceeded: bool,
54
55    #[cfg(feature = "jemalloc")]
56    /// Lock flag avoiding flooding of logfile with repeated messages about allocated memory usage
57    lock_warn_allocated_memory_exceeded: bool,
58}
59
/// Length of one pass of the monitoring loop, in milliseconds.
///
/// `Memory::execute` converts this value with `Duration::from_millis` and sleeps
/// for that long between two polls for a quit command.
// NOTE(review): the "HEATING" part of the name looks copied from another module - confirm.
const CYCLE_TIME_HEATING_MEMORY: u64 = 250;
61
62impl Memory {
63    /// Creates a new `Memory` instance.
64    ///
65    /// Stores the configuration and initializes the lock flags.
66    ///
67    /// # Arguments
68    /// * `config` - A reference to the global `ConfigData` containing the memory configuration.
69    ///
70    /// # Returns
71    /// A new `Memory` instance wrapped in a Result type ready to be executed in its own thread.
72    pub fn new(config: MemoryConfig) -> Result<Memory, MemoryConfigError> {
73        // execute the configuration check
74        if config.active {
75            // check if the memory configuration is valid
76            match memory_config_check(&config) {
77                Ok(_env_var_name_value_tuple_opt) => {
78                    #[cfg(target_os = "linux")]
79                    match _env_var_name_value_tuple_opt {
80                        Some((env_var_name, value)) => {
81                            if config.active {
82                                info!("Memory configuration check: {env_var_name} is configured to {value}");
83                            }
84                        }
85                        None => { /* do nothing, configuration check may be disabled */ }
86                    }
87                }
88                Err(e) => {
89                    return Err(e);
90                }
91            }
92        }
93
94        Ok(Self {
95            config,
96            #[cfg(feature = "jemalloc")]
97            lock_warn_allocated_memory_exceeded: false,
98            #[cfg(feature = "jemalloc")]
99            lock_warn_resident_memory_exceeded: false,
100        })
101    }
102
103    #[cfg(feature = "jemalloc")]
104    /// Checks the provided memory usage values against configured limits and updates lock flags.
105    ///
106    /// This function contains the core logic for memory threshold checking, making it
107    /// easy to test without depending on live system calls.
108    fn check_memory_limits(&mut self, allocated: usize, resident: usize) {
109        if allocated > self.config.max_allocated {
110            if !self.lock_warn_allocated_memory_exceeded {
111                #[cfg(not(test))]
112                {
113                    let warning_message = format!(
114                        "Currently allocated memory ({}) exceeds threshold ({})",
115                        allocated, self.config.max_allocated
116                    );
117                    warn!(target: module_path!(), "{}", warning_message);
118                }
119                self.lock_warn_allocated_memory_exceeded = true;
120            }
121        } else {
122            // Reset the lock when usage is back to normal
123            self.lock_warn_allocated_memory_exceeded = false;
124        }
125
126        if resident > self.config.max_resident {
127            if !self.lock_warn_resident_memory_exceeded {
128                #[cfg(not(test))]
129                {
130                    let warning_message = format!(
131                        "Currently resident memory ({}) exceeds threshold ({})",
132                        resident, self.config.max_resident
133                    );
134                    warn!(target: module_path!(), "{}", warning_message);
135                }
136                self.lock_warn_resident_memory_exceeded = true;
137            }
138        } else {
139            // Reset the lock when usage is back to normal
140            self.lock_warn_resident_memory_exceeded = false;
141        }
142    }
143
144    /// Starts the memory monitoring loop.
145    ///
146    /// 1.  Checks for a `Quit` command from the signal handler to terminate gracefully.
147    /// 2.  Checks the current resident and allocated memory usage against configured limits.
148    /// 3.  Logs a warning if any limit is exceeded.
149    /// 4.  Sleeps for the configured interval before the next check.
150    ///
151    /// # Arguments
152    /// * `memory_channels` - A mutable reference to the channels used for receiving shutdown commands.
153    pub fn execute(&mut self, memory_channels: &mut MemoryChannels) {
154        #[cfg(all(target_os = "linux", not(test)))]
155        info!(target: module_path!(), "Thread started with TID: {}", gettid());
156
157        let sleep_time = Duration::from_millis(CYCLE_TIME_HEATING_MEMORY);
158        let check_interval = Duration::from_secs(self.config.check_interval);
159        let mut last_check_instant = Instant::now() - check_interval;
160        let spin_sleeper = SpinSleeper::default();
161
162        loop {
163            // Check for a quit command before sleeping to ensure a responsive shutdown.
164            let (quit_command_received, _, _) = self
165                .process_external_request(&mut memory_channels.rx_memory_from_signal_handler, None);
166            if quit_command_received {
167                break;
168            }
169            spin_sleeper.sleep(sleep_time);
170
171            if last_check_instant.elapsed() > check_interval {
172                #[cfg(feature = "jemalloc")]
173                {
174                    match epoch::advance() {
175                        Ok(_) => {
176                            let allocated = match tikv_jemalloc_ctl::stats::allocated::read() {
177                                Ok(allocated) => allocated,
178                                Err(_) => {
179                                    error!(target: module_path!(), "Could not read allocated memory");
180                                    continue;
181                                }
182                            };
183                            let resident = match stats::resident::read() {
184                                Ok(resident) => resident,
185                                Err(_) => {
186                                    error!(target: module_path!(), "Could not read resident memory");
187                                    continue;
188                                }
189                            };
190                            info!(target: module_path!(), "Jemalloc: allocated = {allocated}, resident = {resident}");
191                            self.check_memory_limits(allocated, resident);
192                        }
193                        Err(_) => {
194                            error!(target: module_path!(), "Could not advance epoch.");
195                        }
196                    }
197                }
198
199                last_check_instant = Instant::now();
200            }
201        }
202    }
203}
204
#[cfg(test)]
mod tests {
    use super::*;
    use crate::launch::channels::Channels;
    use crate::utilities::channel_content::InternalCommand;
    use std::thread;

    /// Helper to create a default config for testing.
    ///
    /// Monitoring is inactive, so `Memory::new` skips the configuration check.
    /// With feature 'jemalloc' the limits are 100 (allocated) and 200 (resident).
    fn test_config() -> MemoryConfig {
        MemoryConfig {
            active: false,
            execute: false,
            env_variable_name: "dummy".to_string(),
            env_variable_min_val: 0,
            #[cfg(feature = "jemalloc")]
            max_allocated: 100,
            #[cfg(feature = "jemalloc")]
            max_resident: 200,
            check_interval: 60,
            env_variable_max_val: 0,
        }
    }

    /// A freshly created instance must start with both warn locks cleared.
    #[test]
    #[cfg(feature = "jemalloc")]
    fn test_new_initialization() {
        // Arrange
        let config = test_config();

        // Act
        let memory = Memory::new(config).unwrap();

        // Assert
        assert!(!memory.lock_warn_allocated_memory_exceeded);
        assert!(!memory.lock_warn_resident_memory_exceeded);
    }

    /// The monitoring loop must end, without panicking, after a `Quit` command was sent.
    #[test]
    fn test_execute_graceful_shutdown() {
        // Arrange
        let config = test_config();
        let mut memory = Memory::new(config).unwrap();
        let mut channels = Channels::new_for_test();

        // Act
        let handle = thread::spawn(move || {
            // Run the real monitoring loop on its own thread; joining this thread
            // below only succeeds once `execute` has returned.
            memory.execute(&mut channels.memory);
        });

        // Give the loop some time to run before requesting the shutdown.
        let spin_sleeper = SpinSleeper::default();
        spin_sleeper.sleep(Duration::from_secs(1));

        channels
            .signal_handler
            .send_to_memory(InternalCommand::Quit)
            .unwrap();

        // Assert
        let result = handle.join();
        assert!(
            result.is_ok(),
            "Thread should shut down gracefully on Quit command"
        );
    }

    /// Values below both limits must leave both warn locks cleared.
    #[test]
    #[cfg(feature = "jemalloc")]
    fn test_check_memory_limits_below_threshold() {
        // Arrange
        let mut memory = Memory::new(test_config()).unwrap();

        // Act
        memory.check_memory_limits(50, 150); // Values are below 100 and 200 limits

        // Assert
        assert!(
            !memory.lock_warn_allocated_memory_exceeded,
            "Lock should not be set when allocated memory usage is below threshold"
        );
        assert!(
            !memory.lock_warn_resident_memory_exceeded,
            "Lock should not be set when resident memory usage is below threshold"
        );
    }

    /// Values exactly at the limits do not count as exceeded (comparison is strict).
    #[test]
    #[cfg(feature = "jemalloc")]
    fn test_check_memory_limits_at_threshold() {
        // Arrange
        let mut memory = Memory::new(test_config()).unwrap();

        // Act
        memory.check_memory_limits(100, 200); // Values are equal to the limits

        // Assert
        assert!(
            !memory.lock_warn_allocated_memory_exceeded,
            "Lock should not be set when allocated memory usage equals the threshold"
        );
        assert!(
            !memory.lock_warn_resident_memory_exceeded,
            "Lock should not be set when resident memory usage equals the threshold"
        );
    }

    /// Only the allocated-memory lock is set when only that limit is exceeded.
    #[test]
    #[cfg(feature = "jemalloc")]
    fn test_check_memory_limits_above_allocated_threshold() {
        // Arrange
        let mut memory = Memory::new(test_config()).unwrap();

        // Act
        memory.check_memory_limits(150, 150); // allocated is above the 100-Byte limit

        // Assert
        assert!(
            memory.lock_warn_allocated_memory_exceeded,
            "Allocated memory usage warn lock flag should be set when usage exceeds limit"
        );
        assert!(
            !memory.lock_warn_resident_memory_exceeded,
            "Resident memory usage warn lock flag should remain unset"
        );
    }

    /// Only the resident-memory lock is set when only that limit is exceeded.
    #[test]
    #[cfg(feature = "jemalloc")]
    fn test_check_memory_limits_above_resident_threshold() {
        // Arrange
        let mut memory = Memory::new(test_config()).unwrap();

        // Act
        memory.check_memory_limits(50, 250); // Resident memory is above the 200-Byte limit

        // Assert
        assert!(
            !memory.lock_warn_allocated_memory_exceeded,
            "Allocated memory usage warn lock flag should remain unset"
        );
        assert!(
            memory.lock_warn_resident_memory_exceeded,
            "Resident memory usage warn lock flag should be set when usage exceeds limit"
        );
    }

    /// The locks stay set while usage remains above the limits on consecutive checks.
    #[test]
    #[cfg(feature = "jemalloc")]
    fn test_check_memory_limits_keeps_lock_while_exceeded() {
        // Arrange
        let mut memory = Memory::new(test_config()).unwrap();
        memory.check_memory_limits(150, 250);

        // Act
        memory.check_memory_limits(160, 260);

        // Assert
        assert!(
            memory.lock_warn_allocated_memory_exceeded,
            "Allocated memory usage warn lock flag should stay set while usage exceeds limit"
        );
        assert!(
            memory.lock_warn_resident_memory_exceeded,
            "Resident memory usage warn lock flag should stay set while usage exceeds limit"
        );
    }

    /// Both locks are cleared again once usage drops back below the limits.
    #[test]
    #[cfg(feature = "jemalloc")]
    fn test_check_memory_limits_resets_lock_when_normal() {
        // Arrange
        let mut memory = Memory::new(test_config()).unwrap();

        // Set the lock initially by exceeding the limit
        memory.check_memory_limits(150, 250);
        assert!(memory.lock_warn_resident_memory_exceeded);
        assert!(memory.lock_warn_allocated_memory_exceeded);

        // Act
        // Now, run with values below the limit
        memory.check_memory_limits(50, 150);

        // Assert
        assert!(
            !memory.lock_warn_allocated_memory_exceeded,
            "Allocated memory usage warn flag should be reset when usage returns to normal"
        );
        assert!(
            !memory.lock_warn_resident_memory_exceeded,
            "Resident memory usage warn lock flag should be reset when usage returns to normal"
        );
    }
}