aquarium_control/dispatch/
messaging.rs1#[cfg(feature = "debug_messaging")]
61use log::debug;
62
63cfg_if::cfg_if! {
64 if #[cfg(target_os = "linux")] {
65 #[cfg(not(test))]
66 use log::info;
67
68 use log::{error, warn};
69 use posixmq::PosixMq;
70 use std::time::Duration;
71 use spin_sleep::SpinSleeper;
72 #[cfg(not(test))]
73 use nix::unistd::gettid;
74
75 use crate::utilities::channel_content::InternalCommand;
76 use crate::dispatch::messaging_channels::MessagingChannels;
77 use crate::dispatch::messaging_error::MessagingError;
78 use crate::utilities::proc_ext_req::ProcessExternalRequestTrait;
79 use crate::utilities::wait_for_termination::WaitForTerminationTrait;
80 use crate::utilities::acknowledge_signal_handler::AcknowledgeSignalHandlerTrait;
81 use crate::dispatch::messaging_domain::MessagingDomain;
82 use crate::dispatch::messaging_config::MessagingConfig;
83 use crate::launch::execution_config::ExecutionConfig;
84 use std::io::ErrorKind;
85 }
86}
87
/// In-memory form of a command received via the message queue.
///
/// All four fields are signed 32-bit integers. `domain` is mapped with
/// `MessagingDomain::from` and the message as a whole with `InternalCommand::from_msg`.
// NOTE(review): `allow(unused)` presumably silences dead-code warnings on targets where the
// Linux-only consumer of this type is compiled out — confirm before removing.
#[allow(unused)]
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct Message {
    /// Numeric identifier of the addressed domain.
    domain: i32,
    /// Numeric identifier of the command.
    pub command: i32,
    /// First command parameter.
    pub command_param1: i32,
    /// Second command parameter.
    command_param2: i32,
}

#[allow(unused)]
impl Message {
    /// Creates a message with every field set to zero.
    pub fn new() -> Message {
        Message::default()
    }

    /// Serializes the message into the 20-byte buffer layout used by the tests.
    ///
    /// Bytes 0..4 stay zero; `domain`, `command`, `command_param1` and `command_param2`
    /// follow in that order, each encoded as a little-endian `i32`.
    #[cfg(test)]
    pub fn create_message_buf(&self) -> Vec<u8> {
        let mut buf = vec![0u8; 20];
        let fields = [
            self.domain,
            self.command,
            self.command_param1,
            self.command_param2,
        ];

        // The payload starts after the 4-byte header; every field occupies one 4-byte slot.
        for (slot, value) in buf[4..].chunks_exact_mut(4).zip(fields.iter()) {
            slot.copy_from_slice(&value.to_le_bytes());
        }

        buf
    }
}
146
147#[cfg_attr(doc, aquamarine::aquamarine)]
148#[cfg(target_os = "linux")]
162pub struct Messaging {
163 config: MessagingConfig,
164
165 mq: PosixMq,
167 mq_max_msg_len: usize,
168
169 pub lock_warn_inapplicable_command_signal_handler: bool,
171
172 pub lock_error_channel_receive_termination: bool,
174
175 execution_config: ExecutionConfig,
177}
178
179#[cfg(target_os = "linux")]
180impl Messaging {
181 pub fn new(
204 config: MessagingConfig,
205 execution_config: ExecutionConfig,
206 ) -> Result<Messaging, MessagingError> {
207 let mq_file_name_clone_for_error = config.mq_filename.clone();
208
209 let mq = PosixMq::open(&config.mq_filename).map_err(|e| {
210 MessagingError::PosixMessageQueueOpeningError {
211 location: module_path!().to_string(),
212 mq_name: mq_file_name_clone_for_error,
213 source: e,
214 }
215 })?;
216
217 let mq_max_msg_len = match mq.attributes() {
220 Ok(c) => c.max_msg_len,
221 Err(e) => {
222 return Err(MessagingError::PosixMessageQueueAttributesError {
223 location: module_path!().to_string(),
224 source: e,
225 });
226 }
227 };
228 #[cfg(not(target_os = "linux"))]
229 let mq_max_msg_len = 0;
230
231 Ok(Messaging {
232 config,
233 lock_warn_inapplicable_command_signal_handler: false,
234 lock_error_channel_receive_termination: false,
235 mq,
236 mq_max_msg_len,
237 execution_config,
238 })
239 }
240
241 pub fn execute(&mut self, messaging_channels: &mut MessagingChannels) {
259 #[cfg(not(test))]
260 info!(target: module_path!(), "Thread started with TID: {}", gettid());
261 let duration_timeout = Duration::from_millis(self.config.timeout_millis);
262
263 let mut msgbuf = vec![0; self.mq_max_msg_len];
264 let mut msg = Message::new();
265
266 let sleep_duration = Duration::from_millis(10);
267 let spin_sleeper = SpinSleeper::default();
268
269 loop {
270 match self.mq.recv_timeout(&mut msgbuf, duration_timeout) {
271 Ok((_r1, _r2)) => {
272 if !msgbuf.is_empty() {
273 let mut domain_buf: [u8; 4] = [0; 4];
274 domain_buf[0] = msgbuf[4];
275 domain_buf[1] = msgbuf[5];
276 domain_buf[2] = msgbuf[6];
277 domain_buf[3] = msgbuf[7];
278 msg.domain = i32::from_le_bytes(domain_buf);
279
280 let mut command_buf: [u8; 4] = [0; 4];
281 command_buf[0] = msgbuf[8];
282 command_buf[1] = msgbuf[9];
283 command_buf[2] = msgbuf[10];
284 command_buf[3] = msgbuf[11];
285 msg.command = i32::from_le_bytes(command_buf);
286
287 let mut command_param1_buf: [u8; 4] = [0; 4];
288 command_param1_buf[0] = msgbuf[12];
289 command_param1_buf[1] = msgbuf[13];
290 command_param1_buf[2] = msgbuf[14];
291 command_param1_buf[3] = msgbuf[15];
292 msg.command_param1 = i32::from_le_bytes(command_param1_buf);
293
294 let mut command_param2_buf: [u8; 4] = [0; 4];
295 command_param2_buf[0] = msgbuf[16];
296 command_param2_buf[1] = msgbuf[17];
297 command_param2_buf[2] = msgbuf[18];
298 command_param2_buf[3] = msgbuf[19];
299 msg.command_param2 = i32::from_le_bytes(command_param2_buf);
300
301 #[cfg(not(test))]
302 info!(
303 target: module_path!(),
304 "received message: domain={}, cmd={}, param1={}, param2={}",
305 msg.domain,
306 msg.command,
307 msg.command_param1,
308 msg.command_param2
309 );
310
311 let domain = MessagingDomain::from(msg.domain);
313 if domain == MessagingDomain::Unknown {
314 warn!(
315 target: module_path!(),
316 "ignoring message for unknown domain {}",
317 msg.domain
318 );
319 }
320
321 let command = InternalCommand::from_msg(&msg);
323 if command == InternalCommand::Unknown {
324 warn!(
325 target: module_path!(),
326 "Messaging: ignoring message with unknown command {}",
327 msg.command
328 );
329 }
330
331 if (domain != MessagingDomain::Unknown)
333 && (command != InternalCommand::Unknown)
334 && (domain.is_domain_thread_executed(&self.execution_config))
335 {
336 #[cfg(not(test))]
337 info!(
338 target: module_path!(),
339 "forwarding message to domain {}",
340 domain
341 );
342
343 match messaging_channels.send_command_to_domain(command, domain.clone())
344 {
345 Ok(_) => { }
346 Err(e) => {
347 error!(
348 target: module_path!(),
349 "channel communication to domain {} failed ({e:?})",
350 domain
351 );
352 }
353 }
354 }
355 } else {
356 error!(
357 target: module_path!(),
358 "message has incorrect length ({}). ignoring it.",
359 msgbuf.len()
360 );
361 }
362 }
363 Err(e) => {
364 match e.kind() {
365 ErrorKind::TimedOut => { }
366 _ => {
367 error!(
368 target: module_path!(),
369 "Error occurred when reading POSIX message ({e:?})"
370 );
371 }
372 }
373 }
374 }
375 let (quit_command_received, _, _) = self.process_external_request(
376 &mut messaging_channels.rx_messaging_from_signal_handler,
377 None,
378 );
379 if quit_command_received {
380 #[cfg(feature = "debug_messaging")]
381 debug!(
382 target: module_path!(),
383 "received QUIT command from signal handler"
384 );
385
386 break;
387 }
388
389 spin_sleeper.sleep(sleep_duration);
390 }
391
392 messaging_channels.acknowledge_signal_handler();
393
394 self.wait_for_termination(
398 &mut messaging_channels.rx_messaging_from_signal_handler,
399 sleep_duration,
400 module_path!(),
401 );
402 }
403}
404
405#[cfg(test)]
406#[cfg(target_os = "linux")]
407pub mod tests {
408 use posixmq::PosixMq;
409 use spin_sleep::SpinSleeper;
410 use std::{thread, time::Duration};
411
412 use crate::dispatch::messaging::{Message, Messaging};
413 use crate::dispatch::messaging_channels::MessagingChannels;
414 use crate::dispatch::messaging_domain::MessagingDomain;
415 use crate::launch::channels::AquaSender;
416 use crate::launch::channels::Channels;
417 use crate::launch::execution_config::ExecutionConfig;
418 use crate::mocks::mock_message_command_receiver::tests::mock_message_command_receiver;
419 use crate::utilities::channel_content::InternalCommand;
420 use crate::utilities::config::{read_config_file, ConfigData};
421 use crate::utilities::signal_handler_channels::SignalHandlerChannels;
422
423 fn open_posixmq_test_file(filename: &str) -> PosixMq {
440 match PosixMq::open(filename) {
441 Ok(c) => c,
442 Err(e) => {
443 panic!(
444 "{}: failure to open message queue for test using {}, error={e:?}",
445 module_path!(),
446 filename
447 );
448 }
449 }
450 }
451
452 fn create_test_object(
468 mut messaging: Messaging,
469 mut messaging_channels: MessagingChannels,
470 ) -> thread::JoinHandle<()> {
471 thread::Builder::new()
472 .name("test_object".to_string())
473 .spawn(move || {
474 messaging.execute(&mut messaging_channels);
475 })
476 .unwrap()
477 }
478
479 fn create_test_environment(
500 msg_commands: Vec<Message>,
501 mut tx_environment_to_object: AquaSender<InternalCommand>,
502 signal_handler_channels: SignalHandlerChannels,
503 mq_filename: String,
504 ) -> thread::JoinHandle<()> {
505 thread::Builder::new()
506 .name("test_environment".to_string())
507 .spawn(move || {
508 let sleep_duration_100_millis = Duration::from_millis(100);
509 let spin_sleeper = SpinSleeper::default();
510
511 spin_sleeper.sleep(sleep_duration_100_millis);
512
513 let mq_stimuli = open_posixmq_test_file(&mq_filename);
514
515 for msg_command in msg_commands {
516 let _ = mq_stimuli.send(0, &msg_command.create_message_buf());
518
519 spin_sleeper.sleep(sleep_duration_100_millis);
520 }
521
522 let _ = tx_environment_to_object.send(InternalCommand::Quit);
524
525 let mut tx_signal_handler_to_messaging = signal_handler_channels
526 .tx_signal_handler_to_messaging_opt
527 .unwrap();
528 let mut rx_signal_handler_from_messaging = signal_handler_channels
529 .rx_signal_handler_from_messaging_opt
530 .unwrap();
531
532 let _ = tx_signal_handler_to_messaging.send(InternalCommand::Quit);
534
535 let _ = rx_signal_handler_from_messaging.recv();
537
538 let _ = tx_signal_handler_to_messaging.send(InternalCommand::Terminate);
539 })
540 .unwrap()
541 }
542
543 fn assert_stop_start_sequence(mut commands_received: Vec<InternalCommand>) {
560 let spin_sleeper = SpinSleeper::default();
561 spin_sleeper.sleep(Duration::from_millis(100));
562
563 for command in &commands_received {
564 println!("assert_stop_start_sequence received command: {command:?}");
565 }
566 assert_eq!(2, commands_received.len());
568
569 assert_eq!(InternalCommand::Start, commands_received.pop().unwrap());
571
572 assert_eq!(InternalCommand::Stop, commands_received.pop().unwrap());
574 }
575
/// Sends a stop and a start message for the ventilation domain through the queue
/// `/aquarium_control.5UXp` and checks that the mock ventilation receiver gets `Stop`
/// followed by `Start`.
#[test]
pub fn test_messaging_ventilation() {
    let mut config: ConfigData =
        read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
    config.messaging.mq_filename = "/aquarium_control.5UXp".to_string();
    let queue_name = config.messaging.mq_filename.clone();

    // Builds a ventilation message carrying `command`; both parameters stay zero.
    let ventilation_message = |command: InternalCommand| {
        let mut message = Message::new();
        message.domain = MessagingDomain::Ventilation.into();
        message.command = command.to_numeric();
        message
    };
    let stimuli = vec![
        ventilation_message(InternalCommand::Stop),
        ventilation_message(InternalCommand::Start),
    ];

    let messaging = Messaging::new(config.messaging, ExecutionConfig::default()).unwrap();

    let channels = Channels::new_for_test();

    // The clone lets the test environment send `Quit` to the mock receiver.
    let quit_sender = channels.messaging.tx_messaging_to_ventilation.clone();

    let mock_ventilation = thread::Builder::new()
        .name("mock_ventilation".to_string())
        .spawn(move || {
            let received = mock_message_command_receiver(
                channels
                    .ventilation
                    .rx_ventilation_from_messaging_opt
                    .unwrap(),
            );
            assert_stop_start_sequence(received);
        })
        .unwrap();

    let test_environment =
        create_test_environment(stimuli, quit_sender, channels.signal_handler, queue_name);

    let test_object = create_test_object(messaging, channels.messaging);

    mock_ventilation
        .join()
        .expect("Mock ventilation thread did not finish.");
    test_environment
        .join()
        .expect("Test environment thread did not finish.");
    test_object
        .join()
        .expect("Test object thread did not finish.");
}
632
/// Sends a stop and a start message for the heating domain through the queue
/// `/aquarium_control.6UXp` and checks that the mock heating receiver gets `Stop`
/// followed by `Start`.
#[test]
pub fn test_messaging_heating() {
    let mut config: ConfigData =
        read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
    config.messaging.mq_filename = "/aquarium_control.6UXp".to_string();
    let queue_name = config.messaging.mq_filename.clone();

    // Builds a heating message carrying `command`; both parameters stay zero.
    let heating_message = |command: InternalCommand| {
        let mut message = Message::new();
        message.domain = MessagingDomain::Heating.into();
        message.command = command.to_numeric();
        message
    };
    let stimuli = vec![
        heating_message(InternalCommand::Stop),
        heating_message(InternalCommand::Start),
    ];

    let messaging = Messaging::new(config.messaging, ExecutionConfig::default()).unwrap();

    let channels = Channels::new_for_test();

    // The clone lets the test environment send `Quit` to the mock receiver.
    let quit_sender = channels.messaging.tx_messaging_to_heating.clone();

    let mock_heating = thread::Builder::new()
        .name("mock_heating".to_string())
        .spawn(move || {
            let received = mock_message_command_receiver(
                channels.heating.rx_heating_from_messaging_opt.unwrap(),
            );
            assert_stop_start_sequence(received);
        })
        .unwrap();

    let test_environment =
        create_test_environment(stimuli, quit_sender, channels.signal_handler, queue_name);

    let test_object = create_test_object(messaging, channels.messaging);

    mock_heating
        .join()
        .expect("Mock heating thread did not finish.");
    test_environment
        .join()
        .expect("Test environment thread did not finish.");
    test_object
        .join()
        .expect("Test object thread did not finish.");
}
685
/// Sends reset-all-errors, stop and start messages for the refill domain through the
/// queue `/aquarium_control.7UXp` and checks that the mock refill receiver gets
/// `ResetAllErrors`, `Stop` and `Start` in that order.
#[test]
pub fn test_messaging_refill() {
    let mut config: ConfigData =
        read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
    config.messaging.mq_filename = "/aquarium_control.7UXp".to_string();
    let queue_name = config.messaging.mq_filename.clone();

    // Builds a refill message carrying `command`; both parameters stay zero.
    let refill_message = |command: InternalCommand| {
        let mut message = Message::new();
        message.domain = MessagingDomain::Refill.into();
        message.command = command.to_numeric();
        message
    };
    let stimuli = vec![
        refill_message(InternalCommand::ResetAllErrors),
        refill_message(InternalCommand::Stop),
        refill_message(InternalCommand::Start),
    ];

    let messaging = Messaging::new(config.messaging, ExecutionConfig::default()).unwrap();

    let channels = Channels::new_for_test();

    // The clone lets the test environment send `Quit` to the mock receiver.
    let quit_sender = channels.messaging.tx_messaging_to_refill.clone();

    let mock_refill = thread::Builder::new()
        .name("mock_refill".to_string())
        .spawn(move || {
            let mut received = mock_message_command_receiver(
                channels.refill.rx_refill_from_messaging_opt.unwrap(),
            );

            assert_eq!(3, received.len());

            // `pop` yields the newest entry first, so expectations run in reverse order.
            assert_eq!(InternalCommand::Start, received.pop().unwrap());
            assert_eq!(InternalCommand::Stop, received.pop().unwrap());
            assert_eq!(InternalCommand::ResetAllErrors, received.pop().unwrap());
        })
        .unwrap();

    let test_environment =
        create_test_environment(stimuli, quit_sender, channels.signal_handler, queue_name);

    let test_object = create_test_object(messaging, channels.messaging);

    mock_refill
        .join()
        .expect("Mock refill thread did not finish.");
    test_environment
        .join()
        .expect("Test environment thread did not finish.");
    test_object
        .join()
        .expect("Test object thread did not finish.");
}
758
/// Sends execute (with parameter 13), stop and start messages for the feed domain
/// through the queue `/aquarium_control.8UXp` and checks that the mock feed receiver
/// gets `Execute(13)`, `Stop` and `Start` in that order.
#[test]
pub fn test_messaging_feed() {
    let mut config: ConfigData =
        read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
    config.messaging.mq_filename = "/aquarium_control.8UXp".to_string();
    let queue_name = config.messaging.mq_filename.clone();

    // Builds a feed message carrying `command` and the first parameter `param1`.
    let feed_message = |command: InternalCommand, param1: i32| {
        let mut message = Message::new();
        message.domain = MessagingDomain::Feed.into();
        message.command = command.to_numeric();
        message.command_param1 = param1;
        message
    };
    let stimuli = vec![
        feed_message(InternalCommand::Execute(13), 13),
        feed_message(InternalCommand::Stop, 0),
        feed_message(InternalCommand::Start, 0),
    ];

    let messaging = Messaging::new(config.messaging, ExecutionConfig::default()).unwrap();

    let channels = Channels::new_for_test();

    // The clone lets the test environment send `Quit` to the mock receiver.
    let quit_sender = channels.messaging.tx_messaging_to_feed.clone();

    let mock_feed = thread::Builder::new()
        .name("mock_feed".to_string())
        .spawn(move || {
            let mut received =
                mock_message_command_receiver(channels.feed.rx_feed_from_messaging_opt.unwrap());

            assert_eq!(3, received.len());

            // `pop` yields the newest entry first, so expectations run in reverse order.
            assert_eq!(InternalCommand::Start, received.pop().unwrap());
            assert_eq!(InternalCommand::Stop, received.pop().unwrap());
            assert_eq!(InternalCommand::Execute(13), received.pop().unwrap());
        })
        .unwrap();

    let test_environment =
        create_test_environment(stimuli, quit_sender, channels.signal_handler, queue_name);

    let test_object = create_test_object(messaging, channels.messaging);

    mock_feed
        .join()
        .expect("Mock feed thread did not finish.");
    test_environment
        .join()
        .expect("Test environment thread did not finish.");
    test_object
        .join()
        .expect("Test object thread did not finish.");
}
831
/// Sends a stop and a start message for the Balling domain through the queue
/// `/aquarium_control.9UXp` and checks that the mock Balling receiver gets `Stop`
/// followed by `Start`.
#[test]
pub fn test_messaging_balling() {
    let mut config: ConfigData =
        read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
    config.messaging.mq_filename = "/aquarium_control.9UXp".to_string();
    let queue_name = config.messaging.mq_filename.clone();

    // Opens the queue once up front; the handle is dropped immediately.
    // Panics with a descriptive message if the queue cannot be opened.
    open_posixmq_test_file(&config.messaging.mq_filename);

    // Builds a Balling message carrying `command`; both parameters stay zero.
    let balling_message = |command: InternalCommand| {
        let mut message = Message::new();
        message.domain = MessagingDomain::Balling.into();
        message.command = command.to_numeric();
        message
    };
    let stimuli = vec![
        balling_message(InternalCommand::Stop),
        balling_message(InternalCommand::Start),
    ];

    let messaging = Messaging::new(config.messaging, ExecutionConfig::default()).unwrap();

    let channels = Channels::new_for_test();

    // The clone lets the test environment send `Quit` to the mock receiver.
    let quit_sender = channels.messaging.tx_messaging_to_balling.clone();

    let mock_balling = thread::Builder::new()
        .name("mock_balling".to_string())
        .spawn(move || {
            let received = mock_message_command_receiver(
                channels.balling.rx_balling_from_messaging_opt.unwrap(),
            );
            assert_stop_start_sequence(received);
        })
        .unwrap();

    let test_environment =
        create_test_environment(stimuli, quit_sender, channels.signal_handler, queue_name);

    let test_object = create_test_object(messaging, channels.messaging);

    mock_balling
        .join()
        .expect("Mock Balling thread did not finish.");
    test_environment
        .join()
        .expect("Test environment thread did not finish.");
    test_object
        .join()
        .expect("Test object thread did not finish.");
}
886
/// Sends a stop and a start message for the watchdog domain through the queue
/// `/aquarium_control.9UXp` and checks that the mock watchdog receiver gets `Stop`
/// followed by `Start`.
// NOTE(review): this queue name is identical to the one used by `test_messaging_balling`.
// If both tests run concurrently they read from the same queue and may consume each
// other's messages. Switch to a dedicated queue once one is provisioned for this test.
#[test]
pub fn test_messaging_watchdog() {
    let mut config: ConfigData =
        read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap();
    config.messaging.mq_filename = "/aquarium_control.9UXp".to_string();
    let queue_name = config.messaging.mq_filename.clone();

    // Builds a watchdog message carrying `command`; both parameters stay zero.
    let watchdog_message = |command: InternalCommand| {
        let mut message = Message::new();
        message.domain = MessagingDomain::Watchdog.into();
        message.command = command.to_numeric();
        message
    };
    let stimuli = vec![
        watchdog_message(InternalCommand::Stop),
        watchdog_message(InternalCommand::Start),
    ];

    let messaging = Messaging::new(config.messaging, ExecutionConfig::default()).unwrap();

    let channels = Channels::new_for_test();

    // The clone lets the test environment send `Quit` to the mock receiver.
    let quit_sender = channels.messaging.tx_messaging_to_watchdog.clone();

    let mock_watchdog = thread::Builder::new()
        .name("mock_watchdog".to_string())
        .spawn(move || {
            let received = mock_message_command_receiver(
                channels.watchdog.rx_watchdog_from_messaging_opt.unwrap(),
            );
            assert_stop_start_sequence(received);
        })
        .unwrap();

    let test_environment =
        create_test_environment(stimuli, quit_sender, channels.signal_handler, queue_name);

    let test_object = create_test_object(messaging, channels.messaging);

    // The failure message names the watchdog thread (it previously said "Balling").
    mock_watchdog
        .join()
        .expect("Mock watchdog thread did not finish.");
    test_environment
        .join()
        .expect("Test environment thread did not finish.");
    test_object
        .join()
        .expect("Test object thread did not finish.");
}
939}