aquarium_control/simulator/tcp_communication.rs
1/* Copyright 2024 Uwe Martin
2
3Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
4
5The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
6
7THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
8*/
9
10#[cfg(all(not(test), target_os = "linux"))]
11use log::info;
12use log::{debug, error};
13
14use spin_sleep::SpinSleeper;
15use std::{
16 io::{prelude::*, BufReader},
17 net::TcpStream,
18 time::Duration,
19};
20
21use crate::launch::channels::{AquaChannelError, AquaReceiver, AquaSender};
22use crate::launch::execution_config::ExecutionConfig;
23use crate::simulator::tcp_communication_channels::TcpCommunicationChannels;
24use crate::simulator::tcp_communication_config::TcpCommunicationConfig;
25pub(crate) use crate::simulator::tcp_communication_error::TcpCommunicationError;
26use crate::simulator::tcp_data::*;
27use crate::utilities::acknowledge_signal_handler::AcknowledgeSignalHandlerTrait;
28use crate::utilities::channel_content::{AquariumSignal, InternalCommand};
29use crate::utilities::logger::log_error_chain;
30use crate::utilities::proc_ext_req::ProcessExternalRequestTrait;
31#[cfg(all(not(test), target_os = "linux"))]
32use nix::unistd::gettid;
33
34#[cfg_attr(doc, aquamarine::aquamarine)]
35/// Contains configuration of implementation for opening, reading, and closing TCP channel.
36/// Thread communication is as follows:
37/// ```mermaid
38/// graph LR
39/// relay_manager[Relay Manager] --> tcp_communication[TCP Communication]
40/// sensor_manager[Sensor Manager] --> tcp_communication
41/// tcp_communication --> sensor_manager
42/// tank_level_switch[Tank Level Switch] --> tcp_communication
43/// tcp_communication --> tank_level_switch
44/// tcp_communication --> signal_handler[Signal Handler]
45/// signal_handler --> tcp_communication
46/// ```
47pub struct TcpCommunication {
48 /// the raw TCP stream
49 write_stream: TcpStream,
50
51 /// buffered reader for TCP stream
52 buf_reader: BufReader<TcpStream>,
53
54 /// information about which threads are started
55 execution_config: ExecutionConfig,
56}
57
58impl TcpCommunication {
59 /// Creates a new `TcpCommunication` instance and establishes a TCP connection to a simulator.
60 ///
61 /// This constructor attempts to connect to a TCP server (acting as a simulator)
62 /// at the specified IP address and port. Upon a successful connection, it prepares
63 /// both a `TcpStream` for writing and a `BufReader` for efficient reading from the stream.
64 ///
65 /// # Arguments
66 /// * `tcp_connection` - Configuration data for the TCP connection, including the
67 /// target `ip_address` and `port` of the simulator.
68 /// * `execution_config` - Information about which threads are started
69 ///
70 /// # Returns
71 /// A `Result` containing a new `TcpCommunication` struct with an established
72 /// TCP connection on success.
73 ///
74 /// # Errors
75 /// Returns a `TcpCommunicationError` if the connection or setup fails:
76 /// - `TcpCommunicationError::ConnectionFailed`: If the initial TCP connection to the
77 /// specified address and port cannot be established. This could be due to the
78 /// simulator not running, a firewall blocking the connection, or an incorrect address/port.
79 /// - `TcpCommunicationError::StreamCloneFailed`: If the underlying `TcpStream` cannot
80 /// be cloned, which is a necessary step for creating separate read and write handles.
81 pub fn new(
82 tcp_connection: &TcpCommunicationConfig,
83 execution_config: ExecutionConfig,
84 ) -> Result<TcpCommunication, TcpCommunicationError> {
85 let stream = TcpStream::connect((tcp_connection.ip_address.as_str(), tcp_connection.port))
86 .map_err(|e| TcpCommunicationError::ConnectionFailed {
87 location: module_path!().to_string(),
88 ip_address: tcp_connection.ip_address.clone(),
89 port: tcp_connection.port,
90 source: e,
91 })?;
92
93 let write_stream =
94 stream
95 .try_clone()
96 .map_err(|e| TcpCommunicationError::StreamCloneFailed {
97 location: module_path!().to_string(),
98 source: e,
99 })?;
100
101 Ok(TcpCommunication {
102 write_stream,
103 buf_reader: BufReader::new(stream),
104 execution_config,
105 })
106 }
107
/// Returns the final whitespace-delimited word of `sequence`.
///
/// Simulator replies carry the interesting value as their last token, so this
/// helper picks that token without allocating: the whitespace splitter is
/// consumed from its back end and the first item found there is returned.
///
/// # Arguments
/// * `sequence` - The text to search.
///
/// # Returns
/// - `Some(&str)` borrowing the last word of `sequence`.
/// - `None` if `sequence` is empty or consists of whitespace only.
fn get_last_word(sequence: &str) -> Option<&str> {
    let mut words = sequence.split_whitespace();
    words.next_back()
}
124
125 /// Writes a string request to the TCP stream and handles potential errors.
126 ///
127 /// This private helper function sends a `request` string as bytes over the TCP stream.
128 /// It verifies that the entire request was written successfully and flushes the stream
129 /// to ensure the data is sent immediately.
130 ///
131 /// # Arguments
132 /// * `request` - The `String` containing the data to be written to the TCP stream.
133 ///
134 /// # Returns
135 /// An empty `Result` (`Ok(())`) on a successful write-operation and flush.
136 ///
137 /// # Errors
138 /// Returns a `TcpCommunicationError` if the write or flush operation fails:
139 /// - `TcpCommunicationError::WritingToStreamFailed`: If the underlying OS returns an
140 /// error while writing to the TCP socket. This usually indicates a connection issue.
141 /// - `TcpCommunicationError::FlushFailed`: If the OS cannot flush its write buffer
142 /// to the socket, also indicating a connection problem.
143 fn write_stream_check_for_error(
144 &mut self,
145 request: String,
146 ) -> Result<(), TcpCommunicationError> {
147 self.write_stream
148 .write_all(request.as_bytes())
149 .map_err(|e| TcpCommunicationError::WritingToStreamFailed {
150 location: module_path!().to_string(),
151 source: e,
152 })?;
153 self.write_stream
154 .flush()
155 .map_err(|e| TcpCommunicationError::FlushFailed {
156 location: module_path!().to_string(),
157 source: e,
158 })
159 }
160
161 /// Communicates with the simulator to change the state of a specific relay.
162 ///
163 /// This private helper function constructs a TCP message to command a relay
164 /// to either switch `ON` or `OFF`. It then writes this command to the TCP stream
165 /// and waits for a response from the simulator to confirm the command was received.
166 ///
167 /// # Arguments
168 /// * `relay_id` - The numeric ID of the relay whose state is to be changed.
169 /// * `state` - A boolean value indicating the desired state: `true` for ON, `false` for OFF.
170 ///
171 /// # Returns
172 /// An empty `Result` (`Ok(())`) if the command was sent and a response was received successfully.
173 ///
174 /// # Errors
175 /// Returns a `TcpCommunicationError` if any part of the process fails:
176 /// - `TcpCommunicationError::ReadingFromStreamFailed`: If a response line cannot be
177 /// read from the simulator, which likely indicates the connection was dropped.
178 /// - It will also propagate any errors from `write_stream_check_for_error`.
179 fn change_relay(&mut self, relay_id: u16, state: bool) -> Result<(), TcpCommunicationError> {
180 let command = if state {
181 debug!(target: module_path!(), "switching on relay #{relay_id}");
182 command::SETSINGLE
183 } else {
184 debug!(target: module_path!(), "switching off relay #{relay_id}");
185 command::UNSETSINGLE
186 };
187
188 let request = format!("{} {} {}\n", category::CONTROLLINO, command, relay_id);
189
190 self.write_stream_check_for_error(request)?;
191
192 let mut response = String::new();
193 self.buf_reader.read_line(&mut response).map_err(|e| {
194 TcpCommunicationError::ReadingFromStreamFailed {
195 location: module_path!().to_string(),
196 source: e,
197 }
198 })?;
199 Ok(())
200 }
201
202 /// Communicates with the simulator to request and retrieve a sensor signal value.
203 ///
204 /// This private helper function constructs a TCP message to ask the simulator
205 /// for the current reading of a specific `AquariumSignal`. It then writes this
206 /// request to the TCP stream, waits for a response, and parses the received
207 /// string into an `f32` value.
208 ///
209 /// # Arguments
210 /// * `aquarium_signal` - The `AquariumSignal` enum variant describing the sensor
211 /// signal (e.g., water temperature, pH) for which the value is requested.
212 ///
213 /// # Returns
214 /// A `Result` containing the retrieved sensor value as an `f32` on success.
215 ///
216 /// # Errors
217 /// Returns a `TcpCommunicationError` if any part of the process fails:
218 /// - `TcpCommunicationError::IllegalSignalRequestToSimulator`: If an unsupported
219 /// `AquariumSignal` is requested.
220 /// - `TcpCommunicationError::ReadingFromStreamFailed`: If a response cannot be read
221 /// from the TCP stream.
222 /// - `TcpCommunicationError::ResponseContainsNoWords`: If the simulator's response
223 /// is empty or contains no parsable words.
224 /// - `TcpCommunicationError::LastWordOfResponseEmpty`: If the last word of the response
225 /// is an empty string.
226 /// - `TcpCommunicationError::ResponseConversionError`: If the last word of the response
227 /// cannot be parsed into an `f32`.
228 /// - It will also propagate any errors from `write_stream_check_for_error`.
229 fn get_signal(
230 &mut self,
231 aquarium_signal: AquariumSignal,
232 ) -> Result<f32, TcpCommunicationError> {
233 let (category, tcp_signal_identifier) = match aquarium_signal {
234 AquariumSignal::WaterTemperature => (
235 category::ATLASSCIENTIFIC,
236 signal::ATLASSCIENTIFICTEMPERATURE,
237 ),
238 AquariumSignal::pH => (category::ATLASSCIENTIFIC, signal::ATLASSCIENTIFICPH),
239 AquariumSignal::Conductivity => (
240 category::ATLASSCIENTIFIC,
241 signal::ATLASSCIENTIFICCONDUCTIVITY,
242 ),
243 AquariumSignal::AmbientTemperature => (category::DHT22, signal::AMBIENTTEMPERATURE),
244 AquariumSignal::AmbientHumidity => (category::DHT22, signal::AMBIENTHUMIDITY),
245 AquariumSignal::TankLevelSwitchPosition => {
246 (category::TANKLEVELSWITCH, signal::TANKLEVELSWITCHPOSITION)
247 }
248 AquariumSignal::TankLevelSwitchInvalid => {
249 (category::TANKLEVELSWITCH, signal::TANKLEVELSWITCHINVALID)
250 }
251 _ => {
252 return Err(TcpCommunicationError::IllegalSignalRequestToSimulator(
253 module_path!().to_string(),
254 aquarium_signal,
255 ));
256 }
257 };
258
259 let request = format!("{} {} {}\n", category, command::READ, tcp_signal_identifier);
260 self.write_stream_check_for_error(request)?;
261
262 let mut response = String::new();
263 self.buf_reader.read_line(&mut response).map_err(|e| {
264 TcpCommunicationError::ReadingFromStreamFailed {
265 location: module_path!().to_string(),
266 source: e,
267 }
268 })?;
269
270 // Flattened and more robust parsing logic
271 Self::get_last_word(&response)
272 .ok_or_else(|| {
273 TcpCommunicationError::ResponseContainsNoWords(module_path!().to_string())
274 })
275 .and_then(|last_word| {
276 let trimmed_word = last_word.trim();
277 if trimmed_word.is_empty() {
278 Err(TcpCommunicationError::LastWordOfResponseEmpty(
279 module_path!().to_string(),
280 ))
281 } else {
282 // Parse directly from the trimmed slice.
283 trimmed_word.parse::<f32>().map_err(|e| {
284 TcpCommunicationError::ResponseConversionError {
285 location: module_path!().to_string(),
286 last_word: trimmed_word.to_string(), // Only allocate string for the error case.
287 source: e,
288 }
289 })
290 }
291 })
292 }
293
294 /// Receives a `RequestSignal` command without blocking from a channel and sends back the corresponding sensor value via TCP.
295 ///
296 /// This private helper function is used to service requests from various modules
297 /// that are configured to run in simulator mode. It checks the provided receiver channel (`rx`) for an
298 /// `InternalCommand::RequestSignal`. If such a request is found, it uses `self.get_signal()`
299 /// to fetch the simulated sensor value and sends it back through the provided sender channel (`tx`).
300 /// Other command types are ignored.
301 ///
302 /// # Arguments
303 /// * `rx` - A reference to the receiver channel from which `InternalCommand` requests are received.
304 /// * `tx` - A reference to the sender channel through which the `f32` sensor response is sent back.
305 ///
306 /// # Returns
307 /// A `Result` which is:
308 /// - `Ok(f32)`: In the case of success
309 /// - `Err(TcpCommunicationError:)`: in case of failure
310 fn receive_and_answer(
311 &mut self,
312 rx: &mut AquaReceiver<InternalCommand>,
313 tx: &mut AquaSender<Result<f32, TcpCommunicationError>>,
314 ) -> Result<(), TcpCommunicationError> {
315 match rx.try_recv() {
316 Ok(InternalCommand::RequestSignal(s)) => {
317 // A signal was requested. Get it from the simulator.
318 let response = self.get_signal(s);
319 // Send the response (Ok or Err) back to the caller.
320 tx.send(response).map_err(|_| {
321 TcpCommunicationError::SendingResponseViaChannelFailure(
322 module_path!().to_string(),
323 )
324 })
325 }
326 Ok(_) => {
327 // Any other command was received, which we can safely ignore.
328 Ok(())
329 }
330 #[cfg(feature = "debug_channels")]
331 Err(AquaChannelError::Full) => {
332 // Not applicable. Do nothing.
333 Ok(())
334 }
335 Err(AquaChannelError::Empty) => {
336 // No command in the channel, which is a normal, non-error state.
337 Ok(())
338 }
339 Err(AquaChannelError::Disconnected) => {
340 // The channel is broken, which is a critical error.
341 Err(TcpCommunicationError::ChannelDisconnected(
342 module_path!().to_string(),
343 ))
344 }
345 }
346 }
347
348 /// Handles a request-response cycle for a simulated sensor if it's active.
349 fn handle_simulated_sensor(
350 &mut self,
351 is_active: bool,
352 rx_opt: &mut Option<AquaReceiver<InternalCommand>>,
353 tx_opt: &mut Option<AquaSender<Result<f32, TcpCommunicationError>>>,
354 module_name: &str,
355 ) {
356 if is_active {
357 if let (Some(rx), Some(tx)) = (rx_opt, tx_opt) {
358 if let Err(e) = self.receive_and_answer(rx, tx) {
359 log_error_chain(
360 module_path!(),
361 &format!("Error in communication with {module_name}"),
362 e,
363 );
364 }
365 }
366 }
367 }
368
369 /// Executes the main control loop for the TCP communication module.
370 ///
371 /// This function runs continuously as a server, processing incoming requests
372 /// from various client threads (e.g., Ambient, Atlas Scientific, Tank Level Switch, Relay Manager).
373 /// It dispatches these requests to the connected TCP simulator and sends back the responses.
374 ///
375 /// The loop remains active until a `Quit` command is received from the signal handler.
376 /// Upon receiving `Quit`, it breaks out of the loop and sends a confirmation back to the signal handler.
377 ///
378 /// # Arguments
379 /// * `tcp_communication_channels` - A mutable reference to a struct containing the
380 /// channels for communication with other threads,
381 /// including senders/receivers for SensorManager, Tank Level Switch,
382 /// Relay Manager, and the Signal Handler.
383 pub fn execute(&mut self, tcp_communication_channels: &mut TcpCommunicationChannels) {
384 #[cfg(all(target_os = "linux", not(test)))]
385 info!(target: module_path!(), "Thread started with TID: {}", gettid());
386
387 #[cfg(all(not(test), feature = "debug_channels"))]
388 tcp_communication_channels.report_status();
389
390 let spin_sleeper = SpinSleeper::default();
391 let sleep_interval_millis = 10;
392 let sleep_duration_ten_milli_sec = Duration::from_millis(sleep_interval_millis);
393 let mut lock_relay_manager_channel_disconnected = false;
394
395 loop {
396 let (quit_command_received, _, _) = self.process_external_request(
397 &mut tcp_communication_channels.rx_tcp_communication_from_signal_handler,
398 None,
399 );
400 if quit_command_received {
401 break;
402 }
403
404 // *** communication with sensor manager ***
405 self.handle_simulated_sensor(
406 self.execution_config.sensor_manager,
407 &mut tcp_communication_channels.rx_tcp_communication_from_sensor_manager_opt,
408 &mut tcp_communication_channels.tx_tcp_communication_to_sensor_manager_opt,
409 "sensor manager",
410 );
411
412 // *** communication with Tank level switch ***
413 self.handle_simulated_sensor(
414 self.execution_config.tank_level_switch,
415 &mut tcp_communication_channels.rx_tcp_communication_from_tank_level_switch_opt,
416 &mut tcp_communication_channels.tx_tcp_communication_to_tank_level_switch_opt,
417 "tank level switch",
418 );
419
420 // *** communication with relay manager - no response required ***
421 if !lock_relay_manager_channel_disconnected && self.execution_config.relay_manager {
422 match tcp_communication_channels.rx_tcp_communication_from_relay_manager_opt {
423 Some(ref mut rx_tcp_communication_from_relay_manager) => {
424 match rx_tcp_communication_from_relay_manager.try_recv() {
425 Ok(c) => match c {
426 InternalCommand::SetRelay(c) => {
427 let response = self.change_relay(c, true);
428 if let Err(response_err) = response {
429 log_error_chain(
430 module_path!(),
431 "Error in communication to relay manager",
432 response_err,
433 );
434 }
435 }
436 InternalCommand::UnsetRelay(c) => {
437 let response = self.change_relay(c, false);
438 if let Err(response_err) = response {
439 log_error_chain(
440 module_path!(),
441 "Error in communication to relay manager",
442 response_err,
443 );
444 }
445 }
446 _ => { /* ignore all other commands */ }
447 },
448 #[cfg(feature = "debug_channels")]
449 Err(AquaChannelError::Full) => { /* Not applicable - do nothing */ }
450 Err(AquaChannelError::Empty) => { /* empty buffer - no action required */
451 }
452 Err(AquaChannelError::Disconnected) => {
453 lock_relay_manager_channel_disconnected = true;
454 error!(
455 target: module_path!(),
456 "Error in trying to receive command from relay manager"
457 );
458 }
459 }
460 }
461 None => {
462 // no value provided - do nothing
463 }
464 }
465 }
466 // **********************************
467 spin_sleeper.sleep(sleep_duration_ten_milli_sec);
468 }
469
470 tcp_communication_channels.acknowledge_signal_handler();
471 }
472}