1#[cfg(feature = "debug_tank_level_switch")]
10use log::debug;
11use std::sync::{Arc, Mutex};
12
13#[cfg(all(target_os = "linux", not(test)))]
14use nix::unistd::gettid;
15
16use log::error;
17#[cfg(all(target_os = "linux", not(test)))]
18use log::info;
19
20#[cfg(not(test))]
21use log::warn;
22
23use crate::launch::channels::{AquaReceiver, AquaSender};
24use crate::sensors::tank_level_switch_channels::TankLevelSwitchChannels;
25use crate::sensors::tank_level_switch_config::TankLevelSwitchConfig;
26use crate::utilities::check_mutex_access_duration::CheckMutexAccessDurationTrait;
27use spin_sleep::SpinSleeper;
28use std::time::{Duration, Instant};
29use thiserror::Error;
30
31cfg_if::cfg_if! {
32 if #[cfg(all(target_os = "linux", feature = "target_hw"))] {
33 use rppal::gpio::{Gpio, InputPin};
34 }
35}
36
37use crate::simulator::get_resp_sim::GetResponseFromSimulatorTrait;
38use crate::simulator::tcp_communication_error::TcpCommunicationError;
39use crate::utilities::acknowledge_signal_handler::AcknowledgeSignalHandlerTrait;
40use crate::utilities::channel_content::{AquariumSignal, InternalCommand};
41use crate::utilities::logger::log_error_chain;
42use crate::utilities::proc_ext_req::ProcessExternalRequestTrait;
43use crate::utilities::wait_for_termination::WaitForTerminationTrait;
44
/// Target duration of one `execute` loop iteration, in milliseconds.
const CYCLE_TIME_TANK_LEVEL_SWITCH_MILLIS: u64 = 100;

/// Overrun beyond the cycle time, in milliseconds, that `execute` tolerates
/// before it emits a warning. Typed `u128` because it is compared against
/// `Duration::as_millis`.
const PERMISSIBLE_CYCLE_TIME_DEVIATION_TANK_LEVEL_SWITCH_MILLIS: u128 = 5;

/// Initial value, in milliseconds, of `TankLevelSwitch::max_mutex_access_duration`.
const MAX_MUTEX_ACCESS_DURATION_MILLIS: u64 = 10;
55
/// Snapshot of the tank level switch signals that `TankLevelSwitch::execute`
/// publishes through a shared mutex.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TankLevelSwitchSignals {
    /// Most recent raw switch position (`true` = input high / simulator value above zero).
    pub tank_level_switch_position: bool,

    /// Switch position after stabilization of the low state.
    pub tank_level_switch_position_stabilized: bool,

    /// `true` when the switch reading is flagged as invalid.
    pub tank_level_switch_invalid: bool,
}

impl TankLevelSwitchSignals {
    /// Creates a signal snapshot from the three individual values.
    ///
    /// # Arguments
    /// * `tank_level_switch_position` - raw switch position.
    /// * `tank_level_switch_position_stabilized` - stabilized switch position.
    /// * `tank_level_switch_invalid` - invalid flag of the switch reading.
    #[must_use]
    pub fn new(
        tank_level_switch_position: bool,
        tank_level_switch_position_stabilized: bool,
        tank_level_switch_invalid: bool,
    ) -> Self {
        Self {
            tank_level_switch_position,
            tank_level_switch_position_stabilized,
            tank_level_switch_invalid,
        }
    }
}
97
98#[derive(Error, Debug)]
100pub enum TankLevelSwitchError {
101 #[cfg(all(target_os = "linux", feature = "target_hw"))]
103 #[error("[{0}] Application shall use real HW, but GPIO lib interface was not provided.")]
104 GPIOInterfaceNotProvided(String),
105
106 #[cfg(all(target_os = "linux", feature = "target_hw"))]
108 #[error("[{location}] Could not get pin {pin_number} from GPIO interface for tank level monitoring.")]
109 GpioPinRetrievalError {
110 location: String,
111 pin_number: u8,
112
113 #[source]
114 source: rppal::gpio::Error,
115 },
116
117 #[error("[{0}] Configured to run with simulator, but no channel to TCP thread provided.")]
119 SimulatorChannelNotProvided(String),
120
121 #[allow(unused)] #[error("[{0}] Input pin has not been provided. This is a configuration error.")]
124 InputPinNotProvided(String),
125
126 #[error("[{location}] Communication with simulator for {signal} failed")]
128 SimulatorCommunicationError {
129 location: String,
130 signal: AquariumSignal,
131
132 #[source]
133 source: TcpCommunicationError,
134 },
135}
136
137#[cfg_attr(doc, aquamarine::aquamarine)]
142pub struct TankLevelSwitch {
154 config: TankLevelSwitchConfig,
156
157 tank_level_switch_position: bool,
159
160 tank_level_switch_position_stabilized: bool,
162
163 tank_level_switch_invalid: bool,
165
166 tank_level_switch_position_low_stabilization_counter: u32,
168
169 pub lock_warn_inapplicable_command_signal_handler: bool,
171
172 pub lock_error_channel_receive_termination: bool,
174
175 pub lock_warn_max_mutex_access_duration: bool,
177
178 lock_error_mutex_poisoned: bool,
180
181 #[cfg(test)]
183 pub mutex_access_duration_exceeded: bool,
184
185 lock_error_input_pin_none: bool,
187
188 lock_error_simulator_communication: bool,
190
191 lock_error_channel_not_provided: bool,
193
194 #[cfg(all(target_os = "linux", feature = "target_hw"))]
195 input_pin_opt: Option<InputPin>,
197
198 pub max_mutex_access_duration: Duration,
200}
201
202impl ProcessExternalRequestTrait for TankLevelSwitch {}
203
204impl GetResponseFromSimulatorTrait for TankLevelSwitch {}
205
206impl TankLevelSwitch {
207 #[cfg(any(not(target_os = "linux"), not(feature = "target_hw")))]
208 pub fn new(
231 config: TankLevelSwitchConfig,
232 initial_position_stabilized: bool,
233 ) -> Result<TankLevelSwitch, TankLevelSwitchError> {
234 Ok(TankLevelSwitch {
235 config,
236 tank_level_switch_position: true, tank_level_switch_invalid: false, tank_level_switch_position_stabilized: initial_position_stabilized,
239 tank_level_switch_position_low_stabilization_counter: 0,
240 lock_warn_inapplicable_command_signal_handler: false,
241 lock_error_channel_receive_termination: false,
242 lock_warn_max_mutex_access_duration: false,
243 lock_error_mutex_poisoned: false,
244 #[cfg(test)]
245 mutex_access_duration_exceeded: false,
246 lock_error_input_pin_none: false,
247 lock_error_simulator_communication: false,
248 lock_error_channel_not_provided: false,
249 max_mutex_access_duration: Duration::from_millis(MAX_MUTEX_ACCESS_DURATION_MILLIS),
250 })
251 }
252
253 #[cfg(all(target_os = "linux", feature = "target_hw"))]
254 pub fn new(
278 config: TankLevelSwitchConfig,
279 initial_position_stabilized: bool,
280 gpio_lib_handle_opt: Option<Gpio>,
281 gpio_tank_level: u8,
282 ) -> Result<TankLevelSwitch, TankLevelSwitchError> {
283 let input_pin_opt = if config.use_simulator {
284 None
286 } else {
287 let gpio_lib_handle = gpio_lib_handle_opt.ok_or_else(|| {
289 TankLevelSwitchError::GPIOInterfaceNotProvided(module_path!().to_string())
290 })?;
291
292 let tank_level_pin = gpio_lib_handle.get(gpio_tank_level).map_err(|e| {
294 TankLevelSwitchError::GpioPinRetrievalError {
295 location: module_path!().to_string(),
296 pin_number: gpio_tank_level,
297 source: e,
298 }
299 })?;
300 Some(tank_level_pin.into_input_pullup())
301 };
302
303 Ok(TankLevelSwitch {
304 config,
305 tank_level_switch_position: true, tank_level_switch_invalid: false, tank_level_switch_position_stabilized: initial_position_stabilized,
308 tank_level_switch_position_low_stabilization_counter: 0,
309 lock_warn_inapplicable_command_signal_handler: false,
310 lock_error_channel_receive_termination: false,
311 lock_warn_max_mutex_access_duration: false,
312 lock_error_mutex_poisoned: false,
313 #[cfg(test)]
314 mutex_access_duration_exceeded: false,
315 lock_error_input_pin_none: false,
316 input_pin_opt,
317 lock_error_channel_not_provided: false,
318 lock_error_simulator_communication: false,
319 max_mutex_access_duration: Duration::from_millis(MAX_MUTEX_ACCESS_DURATION_MILLIS),
320 })
321 }
322
323 pub fn update_tank_level_switch_signals(
344 &mut self,
345 tx_tank_level_switch_to_tcp_opt: &mut Option<AquaSender<InternalCommand>>,
346 rx_tank_level_switch_from_tcp_opt: &mut Option<
347 AquaReceiver<Result<f32, TcpCommunicationError>>,
348 >,
349 ) -> Result<(), TankLevelSwitchError> {
350 if self.config.use_simulator {
351 let tx_tank_level_switch_to_tcp =
353 tx_tank_level_switch_to_tcp_opt.as_mut().ok_or_else(|| {
354 TankLevelSwitchError::SimulatorChannelNotProvided(module_path!().to_string())
355 })?;
356 let rx_tank_level_switch_from_tcp =
357 rx_tank_level_switch_from_tcp_opt.as_mut().ok_or_else(|| {
358 TankLevelSwitchError::SimulatorChannelNotProvided(module_path!().to_string())
359 })?;
360
361 let position_signal = AquariumSignal::TankLevelSwitchPosition;
363 let position_val = Self::get_response_from_simulator(
364 module_path!().to_string(),
365 tx_tank_level_switch_to_tcp,
366 rx_tank_level_switch_from_tcp,
367 InternalCommand::RequestSignal(position_signal.clone()),
368 )
369 .map_err(|e| TankLevelSwitchError::SimulatorCommunicationError {
370 location: module_path!().to_string(),
371 signal: position_signal,
372 source: e,
373 })?;
374 self.tank_level_switch_position = position_val > 0.0;
375
376 let invalid_signal = AquariumSignal::TankLevelSwitchInvalid;
378 let invalid_val = Self::get_response_from_simulator(
379 module_path!().to_string(),
380 tx_tank_level_switch_to_tcp,
381 rx_tank_level_switch_from_tcp,
382 InternalCommand::RequestSignal(invalid_signal.clone()),
383 )
384 .map_err(|e| TankLevelSwitchError::SimulatorCommunicationError {
385 location: module_path!().to_string(),
386 signal: invalid_signal,
387 source: e,
388 })?;
389 self.tank_level_switch_invalid = invalid_val > 0.0;
390 } else {
391 #[cfg(all(target_os = "linux", feature = "target_hw"))]
392 {
393 if let Some(pin) = self.input_pin_opt.as_mut() {
395 self.tank_level_switch_position = pin.is_high();
396 self.lock_error_input_pin_none = false;
397 } else {
398 return Err(TankLevelSwitchError::InputPinNotProvided(
400 module_path!().to_string(),
401 ));
402 }
403 }
404 }
405 Ok(())
406 }
407
408 fn calc_tank_level_switch_position_stabilized(&mut self) {
421 if self.tank_level_switch_invalid {
422 self.tank_level_switch_position_stabilized = true;
424 self.tank_level_switch_position_low_stabilization_counter = 0;
426 } else {
427 self.tank_level_switch_position_stabilized = match self.tank_level_switch_position {
429 true => {
430 self.tank_level_switch_position_low_stabilization_counter = 0;
431 true }
433 false => {
434 if self.tank_level_switch_position_low_stabilization_counter
435 < self
436 .config
437 .tank_level_switch_position_low_stabilization_count
438 * 10
439 {
440 self.tank_level_switch_position_low_stabilization_counter = self
442 .tank_level_switch_position_low_stabilization_counter
443 .saturating_add(1);
444
445 #[cfg(feature = "debug_tank_level_switch")]
446 debug!(
447 "calc_tank_level_switch_position_stabilized: tank_level_switch_position_low_stabilization_counter = {}, tank_level_switch_position_stabilized={}",
448 self.tank_level_switch_position_low_stabilization_counter,
449 self.tank_level_switch_position_stabilized
450 );
451 self.tank_level_switch_position_stabilized
452 } else {
453 false
454 }
455 }
456 }
457 }
458 }
459
460 pub fn execute(
483 &mut self,
484 tank_level_switch_channels: &mut TankLevelSwitchChannels,
485 mutex_tank_level_switch_signals: Arc<Mutex<TankLevelSwitchSignals>>,
486 ) -> Result<(), TankLevelSwitchError> {
487 #[cfg(all(target_os = "linux", not(test)))]
488 info!(target: module_path!(), "Thread started with TID: {}", gettid());
489
490 let spin_sleeper = SpinSleeper::default();
491 let mut lock_warn_cycle_time_exceeded: bool = false;
492 let cycle_time_duration = Duration::from_millis(CYCLE_TIME_TANK_LEVEL_SWITCH_MILLIS);
493
494 if (self.config.use_simulator)
495 && (tank_level_switch_channels
496 .tx_tank_level_switch_to_tcp_opt
497 .is_none()
498 | tank_level_switch_channels
499 .rx_tank_level_switch_from_tcp_opt
500 .is_none())
501 {
502 return Err(TankLevelSwitchError::SimulatorChannelNotProvided(
503 module_path!().to_string(),
504 ));
505 }
506
507 let mut start_time = Instant::now();
508
509 loop {
510 let (
511 quit_command_received, _,
513 _,
514 ) = self.process_external_request(
515 &mut tank_level_switch_channels.rx_tank_level_switch_from_signal_handler,
516 None,
517 );
518 if quit_command_received {
519 break;
520 }
521
522 if self.config.active {
523 if let Err(e) = self.update_tank_level_switch_signals(
525 &mut tank_level_switch_channels.tx_tank_level_switch_to_tcp_opt,
526 &mut tank_level_switch_channels.rx_tank_level_switch_from_tcp_opt,
527 ) {
528 if matches!(e, TankLevelSwitchError::InputPinNotProvided(_)) {
529 if !self.lock_error_input_pin_none {
530 log_error_chain(
531 module_path!(),
532 "Failed to update tank level switch signals.",
533 e,
534 );
535 self.lock_error_input_pin_none = true;
536 }
537 } else if matches!(e, TankLevelSwitchError::SimulatorCommunicationError { .. })
538 {
539 if !self.lock_error_simulator_communication {
540 log_error_chain(
541 module_path!(),
542 "Failed to update tank level switch signals.",
543 e,
544 );
545 self.lock_error_simulator_communication = true;
546 }
547 } else if matches!(e, TankLevelSwitchError::SimulatorChannelNotProvided(_)) {
548 if !self.lock_error_channel_not_provided {
549 log_error_chain(
550 module_path!(),
551 "Failed to update tank level switch signals.",
552 e,
553 );
554 self.lock_error_channel_not_provided = true;
555 }
556 } else {
557 log_error_chain(
559 module_path!(),
560 "Failed to update tank level switch signals.",
561 e,
562 );
563 }
564 } else {
565 self.lock_error_input_pin_none = false;
566 }
567
568 self.calc_tank_level_switch_position_stabilized();
569
570 let instant_before_locking_mutex = Instant::now();
571 let mut instant_after_locking_mutex = Instant::now(); {
575 match mutex_tank_level_switch_signals.lock() {
576 Ok(mut c) => {
577 instant_after_locking_mutex = Instant::now();
578 c.tank_level_switch_position_stabilized =
579 self.tank_level_switch_position_stabilized;
580 c.tank_level_switch_invalid = self.tank_level_switch_invalid;
581 c.tank_level_switch_position = self.tank_level_switch_position;
582 self.lock_error_mutex_poisoned = false;
583 }
584 Err(e) => {
585 if !self.lock_error_mutex_poisoned {
586 error!(target: module_path!(), "Could not lock mutex ({e:?}).");
587 }
588 self.lock_error_mutex_poisoned = true;
589 }
590 }
591 }
592
593 self.check_mutex_access_duration(
595 None,
596 instant_after_locking_mutex,
597 instant_before_locking_mutex,
598 );
599 }
600
601 let stop_time = Instant::now();
602 let execution_duration = stop_time.duration_since(start_time);
603 if execution_duration > cycle_time_duration {
604 let delta_duration = execution_duration - cycle_time_duration;
605 if !self.config.use_simulator
606 && delta_duration.as_millis()
607 > PERMISSIBLE_CYCLE_TIME_DEVIATION_TANK_LEVEL_SWITCH_MILLIS
608 && !lock_warn_cycle_time_exceeded
609 {
610 #[cfg(not(test))]
611 warn!(target: module_path!(),
612 "execution duration of {} milliseconds exceeds cycle time of {} (delta duration={})",
613 execution_duration.as_millis(),
614 CYCLE_TIME_TANK_LEVEL_SWITCH_MILLIS,
615 delta_duration.as_millis(),
616 );
617 lock_warn_cycle_time_exceeded = true;
618 }
619 } else {
620 let remaining_sleep_time = cycle_time_duration - execution_duration;
621 lock_warn_cycle_time_exceeded = false;
622 spin_sleeper.sleep(remaining_sleep_time);
624 }
625
626 start_time = Instant::now();
627 }
628
629 tank_level_switch_channels.acknowledge_signal_handler();
630
631 let sleep_duration_one_millis = Duration::from_millis(1);
635 self.wait_for_termination(
636 &mut tank_level_switch_channels.rx_tank_level_switch_from_signal_handler,
637 sleep_duration_one_millis,
638 module_path!(),
639 );
640 Ok(())
641 }
642}
643
644#[cfg(test)]
645pub mod tests {
646 use crate::launch::channels::{channel, Channels};
647 use crate::mocks::mock_signal::tests::MockSignal;
648 use crate::mocks::mock_tcp::tests::mock_tcp_for_tank_level_switch;
649 use crate::mocks::test_command::tests::TestCommand;
650 use crate::sensors::tank_level_switch::{TankLevelSwitch, TankLevelSwitchSignals};
651 use crate::utilities::channel_content::InternalCommand;
652 use crate::utilities::config::{read_config_file, ConfigData};
653 use spin_sleep::SpinSleeper;
654 use std::sync::{Arc, Mutex};
655 use std::thread;
656 use std::thread::JoinHandle;
657 use std::time::Duration;
658
659 #[cfg(all(target_os = "linux", feature = "target_hw"))]
660 use crate::sensors::gpio_handler::GpioHandler;
661
662 #[cfg(all(target_os = "linux", feature = "target_hw"))]
663 use crate::relays::actuate_gpio;
664 use crate::sensors::tank_level_switch_channels::TankLevelSwitchChannels;
665
666 #[test]
667 pub fn test_refill_calc_tank_level_switch_position_stabilized_false_when_invalid() {
669 let mut channels = Channels::new(false, false, true);
670
671 let (_tx_test_environment_to_tcp, mut rx_tcp_from_test_environment) = channel(1);
672
673 let config: ConfigData =
674 read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
675 let duration_test_case_secs = config
676 .tank_level_switch
677 .tank_level_switch_position_low_stabilization_count
678 + 1;
679
680 #[cfg(all(target_os = "linux", feature = "target_hw"))]
681 let mut tank_level_switch = TankLevelSwitch::new(
682 config.tank_level_switch,
683 true,
684 None,
685 config.gpio_handler.tank_level_switch,
686 );
687 #[cfg(any(not(target_os = "linux"), not(feature = "target_hw")))]
688 let mut tank_level_switch = TankLevelSwitch::new(config.tank_level_switch, true).unwrap();
689
690 let mutex_tank_level_switch_signals =
691 Arc::new(Mutex::new(TankLevelSwitchSignals::new(false, false, true)));
692
693 let join_handle_mock_tcp = thread::Builder::new()
695 .name("mock_tcp".to_string())
696 .spawn(move || {
697 mock_tcp_for_tank_level_switch(
698 channels.tcp_communication,
699 Some(&mut rx_tcp_from_test_environment),
700 false,
701 true,
702 );
703 })
704 .unwrap();
705
706 let join_handle_test_environment = thread::Builder::new()
708 .name("test_environment".to_string())
709 .spawn(move || {
710 let sleep_duration = Duration::from_secs(duration_test_case_secs.into());
711 let sleep_duration_thread_processing = Duration::from_millis(50);
712 let spin_sleeper = SpinSleeper::default();
713 spin_sleeper.sleep(sleep_duration);
714 spin_sleeper.sleep(sleep_duration_thread_processing);
715 let _ = channels
716 .signal_handler
717 .send_to_tank_level_switch(InternalCommand::Quit);
718 channels
719 .signal_handler
720 .receive_from_tank_level_switch()
721 .unwrap();
722 let _ = channels
723 .signal_handler
724 .send_to_tank_level_switch(InternalCommand::Terminate);
725 })
726 .unwrap();
727
728 let join_handle_test_object = thread::Builder::new()
730 .name("test_object".to_string())
731 .spawn(move || {
732 let mut tx_tank_level_switch_to_tcp_for_test_case_finish = channels
734 .tank_level_switch
735 .tx_tank_level_switch_to_tcp_opt
736 .clone()
737 .unwrap();
738
739 let mutex_tank_level_switch_signals_for_assert =
740 mutex_tank_level_switch_signals.clone();
741 let _ = tank_level_switch.execute(
742 &mut channels.tank_level_switch,
743 mutex_tank_level_switch_signals,
744 );
745
746 let signals_after_test_run =
748 mutex_tank_level_switch_signals_for_assert.lock().unwrap();
749 assert!(signals_after_test_run.tank_level_switch_position_stabilized);
751 assert!(signals_after_test_run.tank_level_switch_invalid);
753
754 let _ =
756 tx_tank_level_switch_to_tcp_for_test_case_finish.send(InternalCommand::Quit);
757 })
758 .unwrap();
759
760 join_handle_test_object
761 .join()
762 .expect("Test object thread did not finish.");
763 join_handle_mock_tcp
764 .join()
765 .expect("Mock TCP thread did not finish.");
766 join_handle_test_environment
767 .join()
768 .expect("Test environment thread did not finish.");
769
770 println!("* [TankLevelSwitch] testing if stabilized tank level position is high when invalid bit is true succeeded.");
771 }
772
773 fn create_test_object(
775 mut tank_level_switch: TankLevelSwitch,
776 mut tank_level_switch_channels: TankLevelSwitchChannels,
777 mutex_tank_level_switch_signals: Arc<Mutex<TankLevelSwitchSignals>>,
778 ) -> JoinHandle<()> {
779 thread::Builder::new()
780 .name("test_object".to_string())
781 .spawn(move || {
782 let _ = tank_level_switch.execute(
783 &mut tank_level_switch_channels,
784 mutex_tank_level_switch_signals,
785 );
786 })
787 .unwrap()
788 }
789
790 #[test]
791 pub fn test_refill_calc_tank_level_switch_position_stabilized_false_true() {
793 let mut channels = Channels::new(false, false, true);
794
795 let (mut tx_test_environment_to_tcp, mut rx_tcp_from_test_environment) = channel(1);
796
797 let mut config: ConfigData =
798 read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
799 config.refill.initial_wait_interval = 2;
800 config.refill.check_interval = 60;
801
802 let duration_test_case_part_1: u64 = 3; let duration_test_case_part_2: u64 = 2; #[cfg(all(target_os = "linux", feature = "target_hw"))]
806 let mut tank_level_switch = TankLevelSwitch::new(
807 config.tank_level_switch,
808 false,
809 None,
810 config.gpio_handler.tank_level_switch,
811 );
812 #[cfg(any(not(target_os = "linux"), not(feature = "target_hw")))]
813 let tank_level_switch = TankLevelSwitch::new(config.tank_level_switch, false).unwrap();
814 let mutex_tank_level_switch_signals =
815 Arc::new(Mutex::new(TankLevelSwitchSignals::new(true, true, false)));
816
817 let join_handle_mock_tcp = thread::Builder::new()
819 .name("mock_tcp".to_string())
820 .spawn(move || {
821 mock_tcp_for_tank_level_switch(
822 channels.tcp_communication,
823 Some(&mut rx_tcp_from_test_environment),
824 false,
825 false,
826 );
827 })
828 .unwrap();
829
830 let mut tx_test_environment_to_tcp_for_test_case_finish = channels
831 .tank_level_switch
832 .tx_tank_level_switch_to_tcp_opt
833 .clone()
834 .unwrap();
835
836 let join_handle_test_environment = thread::Builder::new()
838 .name("test_environment".to_string())
839 .spawn(move || {
840 let sleep_duration_part_1 = Duration::from_secs(duration_test_case_part_1.into());
841 let sleep_duration_part_2 = Duration::from_secs(duration_test_case_part_2.into());
842 let sleep_duration_thread_processing = Duration::from_millis(50);
843 let spin_sleeper = SpinSleeper::default();
844 spin_sleeper.sleep(sleep_duration_part_1);
845 let _ = tx_test_environment_to_tcp.send(TestCommand::UpdateSignal(
846 MockSignal::TankLevelSwitchPosition(true),
847 ));
848 spin_sleeper.sleep(sleep_duration_part_2);
849 let _ = channels
850 .signal_handler
851 .send_to_heating(InternalCommand::Quit);
852 let _ = channels
853 .signal_handler
854 .send_to_refill(InternalCommand::Quit);
855 spin_sleeper.sleep(sleep_duration_thread_processing);
856 let _ = channels
857 .signal_handler
858 .send_to_tank_level_switch(InternalCommand::Quit);
859 channels
860 .signal_handler
861 .receive_from_tank_level_switch()
862 .unwrap();
863 let _ = channels
864 .signal_handler
865 .send_to_heating(InternalCommand::Terminate);
866 let _ = channels
867 .signal_handler
868 .send_to_refill(InternalCommand::Terminate);
869 let _ = tx_test_environment_to_tcp_for_test_case_finish.send(InternalCommand::Quit);
870 let _ = channels
871 .signal_handler
872 .send_to_tank_level_switch(InternalCommand::Terminate);
873 })
874 .unwrap();
875
876 let join_handle_test_object = create_test_object(
878 tank_level_switch,
879 channels.tank_level_switch,
880 mutex_tank_level_switch_signals,
881 );
882
883 join_handle_test_object
884 .join()
885 .expect("Test object thread did not finish.");
886 join_handle_mock_tcp
887 .join()
888 .expect("Mock TCP thread did not finish.");
889 join_handle_test_environment
890 .join()
891 .expect("Test environment thread did not finish.");
892
893 println!("* [TankLevelSwitch] testing immediate change false -> true for stabilized tank level position when invalid bit is false succeeded");
894 }
895
896 #[test]
897 pub fn test_refill_calc_tank_level_switch_position_stabilized_true_false() {
899 let mut channels = Channels::new(false, false, true);
900
901 let (mut tx_test_environment_to_tcp, mut rx_tcp_from_test_environment) = channel(1);
902
903 let config: ConfigData =
904 read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
905 let duration_test_case_part_1: u64 = 3; let duration_test_case_part_2: u64 = 8; #[cfg(all(target_os = "linux", feature = "target_hw"))]
908 let mut tank_level_switch = TankLevelSwitch::new(
909 config.tank_level_switch,
910 true,
911 None,
912 config.gpio_handler.tank_level_switch,
913 );
914 #[cfg(any(not(target_os = "linux"), not(feature = "target_hw")))]
915 let tank_level_switch = TankLevelSwitch::new(config.tank_level_switch, true).unwrap();
916 let mutex_tank_level_switch_signals =
917 Arc::new(Mutex::new(TankLevelSwitchSignals::new(true, true, false)));
918
919 let join_handle_mock_tcp = thread::Builder::new()
921 .name("mock_tcp".to_string())
922 .spawn(move || {
923 mock_tcp_for_tank_level_switch(
924 channels.tcp_communication,
925 Some(&mut rx_tcp_from_test_environment),
926 true,
927 false,
928 );
929 })
930 .unwrap();
931
932 let mut tx_test_environment_to_tcp_for_test_case_finish = channels
933 .tank_level_switch
934 .tx_tank_level_switch_to_tcp_opt
935 .clone()
936 .unwrap();
937
938 let join_handle_test_environment = thread::Builder::new()
940 .name("test_environment".to_string())
941 .spawn(move || {
942 let sleep_duration_part_1 = Duration::from_secs(duration_test_case_part_1.into());
943 let sleep_duration_part_2 = Duration::from_secs(duration_test_case_part_2.into());
944 let sleep_duration_thread_processing = Duration::from_millis(50);
945 let spin_sleeper = SpinSleeper::default();
946 spin_sleeper.sleep(sleep_duration_part_1);
947 let _ = tx_test_environment_to_tcp.send(TestCommand::UpdateSignal(
948 MockSignal::TankLevelSwitchPosition(false),
949 ));
950 spin_sleeper.sleep(sleep_duration_part_2);
951 let _ = channels
952 .signal_handler
953 .send_to_heating(InternalCommand::Quit);
954 let _ = channels
955 .signal_handler
956 .send_to_refill(InternalCommand::Quit);
957 spin_sleeper.sleep(sleep_duration_thread_processing);
958 let _ = channels
959 .signal_handler
960 .send_to_tank_level_switch(InternalCommand::Quit);
961 channels
962 .signal_handler
963 .receive_from_tank_level_switch()
964 .unwrap();
965 let _ = channels
966 .signal_handler
967 .send_to_heating(InternalCommand::Terminate);
968 let _ = channels
969 .signal_handler
970 .send_to_refill(InternalCommand::Terminate);
971 let _ = tx_test_environment_to_tcp_for_test_case_finish.send(InternalCommand::Quit);
972 let _ = channels
973 .signal_handler
974 .send_to_tank_level_switch(InternalCommand::Terminate);
975 })
976 .unwrap();
977
978 let join_handle_test_object = create_test_object(
980 tank_level_switch,
981 channels.tank_level_switch,
982 mutex_tank_level_switch_signals,
983 );
984
985 join_handle_test_object
986 .join()
987 .expect("Test object thread did not finish.");
988 join_handle_mock_tcp
989 .join()
990 .expect("Mock TCP thread did not finish.");
991 join_handle_test_environment
992 .join()
993 .expect("Test environment thread did not finish.");
994
995 println!("* [TankLevelSwitch] checking delayed change true -> false for stabilized tank level position when invalid bit is false succeeded.");
996 }
997
998 #[test]
999 pub fn test_refill_calc_tank_level_switch_position_stabilized_is_true_with_toggling_input() {
1001 let mut channels = Channels::new(false, false, true);
1002 let (mut tx_test_environment_to_tcp, mut rx_tcp_from_test_environment) = channel(1);
1003
1004 let config: ConfigData =
1005 read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
1006 let duration_test_case_part: u64 = 2; #[cfg(all(target_os = "linux", feature = "target_hw"))]
1009 let mut tank_level_switch = TankLevelSwitch::new(
1010 config.tank_level_switch,
1011 true,
1012 None,
1013 config.gpio_handler.tank_level_switch,
1014 );
1015 #[cfg(any(not(target_os = "linux"), not(feature = "target_hw")))]
1016 let tank_level_switch = TankLevelSwitch::new(config.tank_level_switch, true).unwrap();
1017 let mutex_tank_level_switch_signals =
1018 Arc::new(Mutex::new(TankLevelSwitchSignals::new(true, true, false)));
1019
1020 let join_handle_mock_tcp = thread::Builder::new()
1022 .name("mock_tcp".to_string())
1023 .spawn(move || {
1024 mock_tcp_for_tank_level_switch(
1025 channels.tcp_communication,
1026 Some(&mut rx_tcp_from_test_environment),
1027 true,
1028 false,
1029 );
1030 })
1031 .unwrap();
1032
1033 let mut tx_test_environment_to_tcp_for_test_case_finish = channels
1034 .tank_level_switch
1035 .tx_tank_level_switch_to_tcp_opt
1036 .clone()
1037 .unwrap();
1038
1039 let join_handle_test_environment = thread::Builder::new()
1041 .name("test_environment".to_string())
1042 .spawn(move || {
1043 let sleep_duration_part = Duration::from_secs(duration_test_case_part.into());
1044 let sleep_duration_thread_processing = Duration::from_millis(50);
1045 let spin_sleeper = SpinSleeper::default();
1046 for _ in 0..2 {
1047 spin_sleeper.sleep(sleep_duration_part);
1048 let _ = tx_test_environment_to_tcp.send(TestCommand::UpdateSignal(
1049 MockSignal::TankLevelSwitchPosition(false),
1050 ));
1051 spin_sleeper.sleep(sleep_duration_part);
1052 let _ = tx_test_environment_to_tcp.send(TestCommand::UpdateSignal(
1053 MockSignal::TankLevelSwitchPosition(true),
1054 ));
1055 }
1056 let _ = channels
1057 .signal_handler
1058 .send_to_heating(InternalCommand::Quit);
1059 let _ = channels
1060 .signal_handler
1061 .send_to_refill(InternalCommand::Quit);
1062 spin_sleeper.sleep(sleep_duration_thread_processing);
1063 let _ = channels
1064 .signal_handler
1065 .send_to_tank_level_switch(InternalCommand::Quit);
1066 channels
1067 .signal_handler
1068 .receive_from_tank_level_switch()
1069 .unwrap();
1070 let _ = channels
1071 .signal_handler
1072 .send_to_heating(InternalCommand::Terminate);
1073 let _ = channels
1074 .signal_handler
1075 .send_to_refill(InternalCommand::Terminate);
1076 let _ = tx_test_environment_to_tcp_for_test_case_finish.send(InternalCommand::Quit);
1077 let _ = channels
1078 .signal_handler
1079 .send_to_tank_level_switch(InternalCommand::Terminate);
1080 })
1081 .unwrap();
1082
1083 let join_handle_test_object = create_test_object(
1085 tank_level_switch,
1086 channels.tank_level_switch,
1087 mutex_tank_level_switch_signals,
1088 );
1089
1090 join_handle_test_object
1091 .join()
1092 .expect("Test object thread did not finish.");
1093 join_handle_mock_tcp
1094 .join()
1095 .expect("Mock TCP thread did not finish.");
1096 join_handle_test_environment
1097 .join()
1098 .expect("Test environment thread did not finish.");
1099
1100 println!("* [TankLevelSwitch] checking delayed change true -> false for stabilized tank level position when invalid bit is false succeeded.");
1101 }
1102
1103 #[cfg(all(target_os = "linux", feature = "target_hw"))]
1104 fn test_transition_stabilized_true_false(
1105 spin_sleeper: &SpinSleeper,
1106 sleep_duration_part: Duration,
1107 mutex_tank_level_switch_signals: &mut Arc<Mutex<TankLevelSwitchSignals>>,
1108 ) {
1109 for i in 0..20 {
1110 spin_sleeper.sleep(sleep_duration_part);
1112 {
1113 let tank_level_switch_signals = mutex_tank_level_switch_signals.lock().unwrap();
1114 println!(
1115 "Tank level switch signals at iteration {}: Position={} Invalid={} Stabilized={}",
1116 i,
1117 tank_level_switch_signals.tank_level_switch_position,
1118 tank_level_switch_signals.tank_level_switch_invalid,
1119 tank_level_switch_signals.tank_level_switch_position_stabilized
1120 );
1121 if i < 10 {
1122 assert_eq!(tank_level_switch_signals.tank_level_switch_position, false);
1123 assert_eq!(
1124 tank_level_switch_signals.tank_level_switch_position_stabilized,
1125 true
1126 );
1127 } else {
1128 assert_eq!(tank_level_switch_signals.tank_level_switch_position, false);
1129 assert_eq!(
1130 tank_level_switch_signals.tank_level_switch_position_stabilized,
1131 false
1132 );
1133 }
1134 }
1135 }
1136 }
1137
1138 #[test]
1139 #[ignore]
1140 #[cfg(all(target_os = "linux", feature = "target_hw"))]
1141 pub fn test_tank_level_switch_position_on_hardware() {
1143 let mut config: ConfigData =
1144 read_config_file("/config/aquarium_control_test_generic.toml".to_string());
1145 config.gpio_handler.use_simulator = false;
1146 config.tank_level_switch.use_simulator = false;
1147
1148 let (tx_tank_level_switch_to_signal_handler, rx_signal_handler_from_tank_level_switch) =
1149 channel(1);
1150 let (tx_signal_handler_to_tank_level_switch, rx_tank_level_switch_from_signal_handler) =
1151 channel(1);
1152
1153 let tank_level_switch_pin = config.gpio_handler.tank_level_switch;
1154
1155 let gpio_handler = GpioHandler::new(config.gpio_handler).unwrap();
1156
1157 let gpio_lib_handle = gpio_handler.gpio_lib_handle_opt.clone().unwrap();
1158
1159 let mut stimuli_pin =
1160 actuate_gpio::get_output_pin(&gpio_lib_handle, 21, "Tank level switch stimuli");
1161
1162 let mut tank_level_switch = TankLevelSwitch::new(
1163 config.tank_level_switch,
1164 true,
1165 Some(gpio_handler.gpio_lib_handle_opt.unwrap()),
1166 tank_level_switch_pin,
1167 );
1168
1169 let mut mutex_tank_level_switch_signals =
1170 Arc::new(Mutex::new(TankLevelSwitchSignals::new(true, true, false)));
1171 let mutex_tank_level_switch_signals_clone_for_test_object =
1172 mutex_tank_level_switch_signals.clone();
1173
1174 let join_handle_test_environment = thread::Builder::new()
1176 .name("test_environment".to_string())
1177 .spawn(move || {
1178 let sleep_duration_part = Duration::from_millis(500);
1179 let spin_sleeper = SpinSleeper::default();
1180
1181 test_transition_stabilized_true_false(
1182 &spin_sleeper,
1183 sleep_duration_part,
1184 &mut mutex_tank_level_switch_signals
1185 );
1186
1187 stimuli_pin.set_high();
1188
1189 for i in 0..20 {
1190 spin_sleeper.sleep(sleep_duration_part);
1192 {
1193 let tank_level_switch_signals =
1194 mutex_tank_level_switch_signals.lock().unwrap();
1195 println!(
1196 "Tank level switch signals at iteration {}: Position={} Invalid={} Stabilized={}",
1197 i,
1198 tank_level_switch_signals.tank_level_switch_position,
1199 tank_level_switch_signals.tank_level_switch_invalid,
1200 tank_level_switch_signals.tank_level_switch_position_stabilized
1201 );
1202 assert_eq!(tank_level_switch_signals.tank_level_switch_position, true);
1203 assert_eq!(tank_level_switch_signals.tank_level_switch_position_stabilized, true);
1204 }
1205 }
1206
1207 stimuli_pin.set_low();
1208
1209 test_transition_stabilized_true_false(
1210 &spin_sleeper,
1211 sleep_duration_part,
1212 &mut mutex_tank_level_switch_signals
1213 );
1214
1215 let _ = tx_signal_handler_to_tank_level_switch.send(InternalCommand::Quit);
1216 rx_signal_handler_from_tank_level_switch.recv().unwrap();
1217 let _ = tx_signal_handler_to_tank_level_switch.send(InternalCommand::Terminate);
1218 spin_sleeper.sleep(sleep_duration_part);
1219 })
1220 .unwrap();
1221
1222 let join_handle_test_object = thread::Builder::new()
1224 .name("test_object".to_string())
1225 .spawn(move || {
1226 tank_level_switch.execute(
1227 &None,
1228 &None,
1229 &tx_tank_level_switch_to_signal_handler,
1230 &rx_tank_level_switch_from_signal_handler,
1231 mutex_tank_level_switch_signals_clone_for_test_object,
1232 );
1233 })
1234 .unwrap();
1235
1236 join_handle_test_object
1237 .join()
1238 .expect("Test object thread did not finish.");
1239 join_handle_test_environment
1240 .join()
1241 .expect("Test environment thread did not finish.");
1242 }
1243 #[test]
1244 pub fn test_refill_calc_tank_level_switch_position_stabilized_false_true_with_mutex_blocked_too_long(
1246 ) {
1247 let mut channels = Channels::new(false, false, true);
1248
1249 let (mut tx_test_environment_to_tcp, mut rx_tcp_from_test_environment) = channel(1);
1250
1251 let mut config: ConfigData =
1252 read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
1253 config.refill.initial_wait_interval = 2;
1254 config.refill.check_interval = 60;
1255
1256 let duration_test_case_part_1: u64 = 3; let duration_test_case_part_2: u64 = 2; #[cfg(all(target_os = "linux", feature = "target_hw"))]
1260 let mut tank_level_switch = TankLevelSwitch::new(
1261 config.tank_level_switch,
1262 false,
1263 None,
1264 config.gpio_handler.tank_level_switch,
1265 );
1266 #[cfg(any(not(target_os = "linux"), not(feature = "target_hw")))]
1267 let mut tank_level_switch = TankLevelSwitch::new(config.tank_level_switch, false).unwrap();
1268 let mutex_tank_level_switch_signals =
1269 Arc::new(Mutex::new(TankLevelSwitchSignals::new(true, true, false)));
1270 let mutex_tank_level_switch_signals_clone_for_test_environment =
1271 mutex_tank_level_switch_signals.clone();
1272
1273 let join_handle_mock_tcp = thread::Builder::new()
1275 .name("mock_tcp".to_string())
1276 .spawn(move || {
1277 mock_tcp_for_tank_level_switch(
1278 channels.tcp_communication,
1279 Some(&mut rx_tcp_from_test_environment),
1280 false,
1281 false,
1282 );
1283 })
1284 .unwrap();
1285
1286 let mut tx_test_environment_to_tcp_for_test_case_finish = channels
1287 .tank_level_switch
1288 .tx_tank_level_switch_to_tcp_opt
1289 .clone()
1290 .unwrap();
1291
1292 let join_handle_test_environment = thread::Builder::new()
1294 .name("test_environment".to_string())
1295 .spawn(move || {
1296 let sleep_duration_part_1 = Duration::from_secs(duration_test_case_part_1.into());
1297 let sleep_duration_part_2 = Duration::from_secs(duration_test_case_part_2.into());
1298 let sleep_duration_thread_processing = Duration::from_millis(50);
1299 let spin_sleeper = SpinSleeper::default();
1300 spin_sleeper.sleep(sleep_duration_part_1);
1301 let _ = tx_test_environment_to_tcp.send(TestCommand::UpdateSignal(
1302 MockSignal::TankLevelSwitchPosition(true),
1303 ));
1304 {
1305 let _unused = mutex_tank_level_switch_signals_clone_for_test_environment.lock();
1307 spin_sleeper.sleep(sleep_duration_part_2);
1308 }
1309 let _ = channels
1310 .signal_handler
1311 .send_to_heating(InternalCommand::Quit);
1312 let _ = channels
1313 .signal_handler
1314 .send_to_refill(InternalCommand::Quit);
1315 spin_sleeper.sleep(sleep_duration_thread_processing);
1316 let _ = channels
1317 .signal_handler
1318 .send_to_tank_level_switch(InternalCommand::Quit);
1319 channels
1320 .signal_handler
1321 .receive_from_tank_level_switch()
1322 .unwrap();
1323 let _ = channels
1324 .signal_handler
1325 .send_to_heating(InternalCommand::Terminate);
1326 let _ = channels
1327 .signal_handler
1328 .send_to_refill(InternalCommand::Terminate);
1329 let _ = tx_test_environment_to_tcp_for_test_case_finish.send(InternalCommand::Quit);
1330 let _ = channels
1331 .signal_handler
1332 .send_to_tank_level_switch(InternalCommand::Terminate);
1333 })
1334 .unwrap();
1335
1336 let join_handle_test_object = thread::Builder::new()
1338 .name("test_object".to_string())
1339 .spawn(move || {
1340 let _ = tank_level_switch.execute(
1341 &mut channels.tank_level_switch,
1342 mutex_tank_level_switch_signals,
1343 );
1344
1345 assert_eq!(tank_level_switch.mutex_access_duration_exceeded, true);
1347 })
1348 .unwrap();
1349
1350 join_handle_test_object
1351 .join()
1352 .expect("Test object thread did not finish.");
1353 join_handle_mock_tcp
1354 .join()
1355 .expect("Mock TCP thread did not finish.");
1356 join_handle_test_environment
1357 .join()
1358 .expect("Test environment thread did not finish.");
1359
1360 println!("* [TankLevelSwitch] testing immediate change false -> true for stabilized tank level position when invalid bit is false succeeded while having mutex blocked");
1361 }
1362}