aquarium_control/watchmen/
memory.rs1use crate::utilities::proc_ext_req::ProcessExternalRequestTrait;
24use crate::watchmen::memory_channels::MemoryChannels;
25use crate::watchmen::memory_config::MemoryConfig;
26cfg_if::cfg_if! {
27 if #[cfg(feature = "jemalloc")] {
28 use log::{error, info};
29 use tikv_jemalloc_ctl::{epoch, stats};
30 #[cfg(not(test))]
31 use log::warn;
32 }
33}
34use crate::watchmen::memory_config_check::{memory_config_check, MemoryConfigError};
35#[cfg(target_os = "linux")]
36use log::info;
37use spin_sleep::SpinSleeper;
38use std::time::{Duration, Instant};
39
40#[cfg(all(target_os = "linux", not(test)))]
41use nix::unistd::gettid;
42
43pub struct Memory {
48 config: MemoryConfig,
50
51 #[cfg(feature = "jemalloc")]
52 lock_warn_resident_memory_exceeded: bool,
54
55 #[cfg(feature = "jemalloc")]
56 lock_warn_allocated_memory_exceeded: bool,
58}
59
/// Sleep time, in milliseconds, between two iterations of the loop in
/// `Memory::execute`.
///
/// NOTE(review): the name mentions "HEATING" although the constant is used by
/// the memory watchdog - presumably a copy-paste leftover; confirm before renaming.
const CYCLE_TIME_HEATING_MEMORY: u64 = 250;
61
62impl Memory {
63 pub fn new(config: MemoryConfig) -> Result<Memory, MemoryConfigError> {
73 if config.active {
75 match memory_config_check(&config) {
77 Ok(_env_var_name_value_tuple_opt) => {
78 #[cfg(target_os = "linux")]
79 match _env_var_name_value_tuple_opt {
80 Some((env_var_name, value)) => {
81 if config.active {
82 info!("Memory configuration check: {env_var_name} is configured to {value}");
83 }
84 }
85 None => { }
86 }
87 }
88 Err(e) => {
89 return Err(e);
90 }
91 }
92 }
93
94 Ok(Self {
95 config,
96 #[cfg(feature = "jemalloc")]
97 lock_warn_allocated_memory_exceeded: false,
98 #[cfg(feature = "jemalloc")]
99 lock_warn_resident_memory_exceeded: false,
100 })
101 }
102
103 #[cfg(feature = "jemalloc")]
104 fn check_memory_limits(&mut self, allocated: usize, resident: usize) {
109 if allocated > self.config.max_allocated {
110 if !self.lock_warn_allocated_memory_exceeded {
111 #[cfg(not(test))]
112 {
113 let warning_message = format!(
114 "Currently allocated memory ({}) exceeds threshold ({})",
115 allocated, self.config.max_allocated
116 );
117 warn!(target: module_path!(), "{}", warning_message);
118 }
119 self.lock_warn_allocated_memory_exceeded = true;
120 }
121 } else {
122 self.lock_warn_allocated_memory_exceeded = false;
124 }
125
126 if resident > self.config.max_resident {
127 if !self.lock_warn_resident_memory_exceeded {
128 #[cfg(not(test))]
129 {
130 let warning_message = format!(
131 "Currently resident memory ({}) exceeds threshold ({})",
132 resident, self.config.max_resident
133 );
134 warn!(target: module_path!(), "{}", warning_message);
135 }
136 self.lock_warn_resident_memory_exceeded = true;
137 }
138 } else {
139 self.lock_warn_resident_memory_exceeded = false;
141 }
142 }
143
144 pub fn execute(&mut self, memory_channels: &mut MemoryChannels) {
154 #[cfg(all(target_os = "linux", not(test)))]
155 info!(target: module_path!(), "Thread started with TID: {}", gettid());
156
157 let sleep_time = Duration::from_millis(CYCLE_TIME_HEATING_MEMORY);
158 let check_interval = Duration::from_secs(self.config.check_interval);
159 let mut last_check_instant = Instant::now() - check_interval;
160 let spin_sleeper = SpinSleeper::default();
161
162 loop {
163 let (quit_command_received, _, _) = self
165 .process_external_request(&mut memory_channels.rx_memory_from_signal_handler, None);
166 if quit_command_received {
167 break;
168 }
169 spin_sleeper.sleep(sleep_time);
170
171 if last_check_instant.elapsed() > check_interval {
172 #[cfg(feature = "jemalloc")]
173 {
174 match epoch::advance() {
175 Ok(_) => {
176 let allocated = match tikv_jemalloc_ctl::stats::allocated::read() {
177 Ok(allocated) => allocated,
178 Err(_) => {
179 error!(target: module_path!(), "Could not read allocated memory");
180 continue;
181 }
182 };
183 let resident = match stats::resident::read() {
184 Ok(resident) => resident,
185 Err(_) => {
186 error!(target: module_path!(), "Could not read resident memory");
187 continue;
188 }
189 };
190 info!(target: module_path!(), "Jemalloc: allocated = {allocated}, resident = {resident}");
191 self.check_memory_limits(allocated, resident);
192 }
193 Err(_) => {
194 error!(target: module_path!(), "Could not advance epoch.");
195 }
196 }
197 }
198
199 last_check_instant = Instant::now();
200 }
201 }
202 }
203}
204
#[cfg(test)]
mod tests {
    use super::*;
    use crate::launch::channels::Channels;
    use crate::utilities::channel_content::InternalCommand;
    use std::thread;

    /// Builds an inactive configuration with small thresholds
    /// (allocated: 100, resident: 200) for use in the tests below.
    fn test_config() -> MemoryConfig {
        MemoryConfig {
            active: false,
            execute: false,
            env_variable_name: "dummy".to_string(),
            env_variable_min_val: 0,
            #[cfg(feature = "jemalloc")]
            max_allocated: 100,
            #[cfg(feature = "jemalloc")]
            max_resident: 200,
            check_interval: 60,
            env_variable_max_val: 0,
        }
    }

    /// A freshly created watchdog starts with both warn locks cleared.
    #[test]
    #[cfg(feature = "jemalloc")]
    fn test_new_initialization() {
        let config = test_config();

        let memory = Memory::new(config).unwrap();

        assert!(!memory.lock_warn_allocated_memory_exceeded);
        assert!(!memory.lock_warn_resident_memory_exceeded);
    }

    /// The watchdog thread terminates after receiving the quit command.
    #[test]
    fn test_execute_graceful_shutdown() {
        let config = test_config();
        let mut memory = Memory::new(config).unwrap();
        let mut channels = Channels::new_for_test();

        let handle = thread::spawn(move || {
            memory.execute(&mut channels.memory);
        });

        // Give the thread time to enter its loop before requesting shutdown.
        let spin_sleeper = SpinSleeper::default();
        spin_sleeper.sleep(Duration::from_secs(1));

        channels
            .signal_handler
            .send_to_memory(InternalCommand::Quit)
            .unwrap();

        let result = handle.join();
        assert!(
            result.is_ok(),
            "Thread should shut down gracefully on Quit command"
        );
    }

    /// Values below both thresholds leave both locks cleared.
    #[test]
    #[cfg(feature = "jemalloc")]
    fn test_check_memory_limits_below_threshold() {
        let mut memory = Memory::new(test_config()).unwrap();

        memory.check_memory_limits(50, 150);

        assert!(
            !memory.lock_warn_allocated_memory_exceeded,
            "Lock should not be set when allocated memory usage is below threshold"
        );
        assert!(
            !memory.lock_warn_resident_memory_exceeded,
            "Lock should not be set when resident memory usage is below threshold"
        );
    }

    /// The comparison is strict: values equal to the thresholds do not set a lock.
    #[test]
    #[cfg(feature = "jemalloc")]
    fn test_check_memory_limits_at_threshold() {
        let mut memory = Memory::new(test_config()).unwrap();

        memory.check_memory_limits(100, 200);

        assert!(
            !memory.lock_warn_allocated_memory_exceeded,
            "Lock should not be set when allocated memory usage equals the threshold"
        );
        assert!(
            !memory.lock_warn_resident_memory_exceeded,
            "Lock should not be set when resident memory usage equals the threshold"
        );
    }

    /// Only the allocated lock is set when only allocated memory exceeds its limit.
    #[test]
    #[cfg(feature = "jemalloc")]
    fn test_check_memory_limits_above_allocated_threshold() {
        let mut memory = Memory::new(test_config()).unwrap();

        memory.check_memory_limits(150, 150);

        assert!(
            memory.lock_warn_allocated_memory_exceeded,
            "Allocated memory usage warn lock flag should be set when usage exceeds limit"
        );
        assert!(
            !memory.lock_warn_resident_memory_exceeded,
            "Resident memory usage warn lock flag should remain unset"
        );
    }

    /// Only the resident lock is set when only resident memory exceeds its limit.
    #[test]
    #[cfg(feature = "jemalloc")]
    fn test_check_memory_limits_above_resident_threshold() {
        let mut memory = Memory::new(test_config()).unwrap();

        memory.check_memory_limits(50, 250);

        assert!(
            !memory.lock_warn_allocated_memory_exceeded,
            "Allocated memory usage warn lock flag should remain unset"
        );
        assert!(
            memory.lock_warn_resident_memory_exceeded,
            "Resident memory usage warn lock flag should be set when usage exceeds limit"
        );
    }

    /// Both locks are cleared again once usage drops back below the limits.
    #[test]
    #[cfg(feature = "jemalloc")]
    fn test_check_memory_limits_resets_lock_when_normal() {
        let mut memory = Memory::new(test_config()).unwrap();

        memory.check_memory_limits(150, 250);
        assert!(memory.lock_warn_resident_memory_exceeded);
        assert!(memory.lock_warn_allocated_memory_exceeded);

        memory.check_memory_limits(50, 150);

        assert!(
            !memory.lock_warn_allocated_memory_exceeded,
            "Allocated memory usage warn flag should be reset when usage returns to normal"
        );
        assert!(
            !memory.lock_warn_resident_memory_exceeded,
            "Resident memory usage warn lock flag should be reset when usage returns to normal"
        );
    }
}