aquarium_control/simulator/
tcp_communication.rs

1/* Copyright 2024 Uwe Martin
2
3Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
4
5The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
6
7THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
8*/
9
10#[cfg(all(not(test), target_os = "linux"))]
11use log::info;
12use log::{debug, error};
13
14use spin_sleep::SpinSleeper;
15use std::{
16    io::{prelude::*, BufReader},
17    net::TcpStream,
18    time::Duration,
19};
20
21use crate::launch::channels::{AquaChannelError, AquaReceiver, AquaSender};
22use crate::launch::execution_config::ExecutionConfig;
23use crate::simulator::tcp_communication_channels::TcpCommunicationChannels;
24use crate::simulator::tcp_communication_config::TcpCommunicationConfig;
25pub(crate) use crate::simulator::tcp_communication_error::TcpCommunicationError;
26use crate::simulator::tcp_data::*;
27use crate::utilities::acknowledge_signal_handler::AcknowledgeSignalHandlerTrait;
28use crate::utilities::channel_content::{AquariumSignal, InternalCommand};
29use crate::utilities::logger::log_error_chain;
30use crate::utilities::proc_ext_req::ProcessExternalRequestTrait;
31#[cfg(all(not(test), target_os = "linux"))]
32use nix::unistd::gettid;
33
34#[cfg_attr(doc, aquamarine::aquamarine)]
35/// Contains configuration of implementation for opening, reading, and closing TCP channel.
36/// Thread communication is as follows:
37/// ```mermaid
38/// graph LR
39///     relay_manager[Relay Manager] --> tcp_communication[TCP Communication]
40///     sensor_manager[Sensor Manager] --> tcp_communication
41///     tcp_communication --> sensor_manager
42///     tank_level_switch[Tank Level Switch] --> tcp_communication
43///     tcp_communication --> tank_level_switch
44///     tcp_communication --> signal_handler[Signal Handler]
45///     signal_handler --> tcp_communication
46/// ```
47pub struct TcpCommunication {
48    /// the raw TCP stream
49    write_stream: TcpStream,
50
51    /// buffered reader for TCP stream
52    buf_reader: BufReader<TcpStream>,
53
54    /// information about which threads are started
55    execution_config: ExecutionConfig,
56}
57
58impl TcpCommunication {
59    /// Creates a new `TcpCommunication` instance and establishes a TCP connection to a simulator.
60    ///
61    /// This constructor attempts to connect to a TCP server (acting as a simulator)
62    /// at the specified IP address and port. Upon a successful connection, it prepares
63    /// both a `TcpStream` for writing and a `BufReader` for efficient reading from the stream.
64    ///
65    /// # Arguments
66    /// * `tcp_connection` - Configuration data for the TCP connection, including the
67    ///   target `ip_address` and `port` of the simulator.
68    /// * `execution_config` - Information about which threads are started
69    ///
70    /// # Returns
71    /// A `Result` containing a new `TcpCommunication` struct with an established
72    /// TCP connection on success.
73    ///
74    /// # Errors
75    /// Returns a `TcpCommunicationError` if the connection or setup fails:
76    /// - `TcpCommunicationError::ConnectionFailed`: If the initial TCP connection to the
77    ///   specified address and port cannot be established. This could be due to the
78    ///   simulator not running, a firewall blocking the connection, or an incorrect address/port.
79    /// - `TcpCommunicationError::StreamCloneFailed`: If the underlying `TcpStream` cannot
80    ///   be cloned, which is a necessary step for creating separate read and write handles.
81    pub fn new(
82        tcp_connection: &TcpCommunicationConfig,
83        execution_config: ExecutionConfig,
84    ) -> Result<TcpCommunication, TcpCommunicationError> {
85        let stream = TcpStream::connect((tcp_connection.ip_address.as_str(), tcp_connection.port))
86            .map_err(|e| TcpCommunicationError::ConnectionFailed {
87                location: module_path!().to_string(),
88                ip_address: tcp_connection.ip_address.clone(),
89                port: tcp_connection.port,
90                source: e,
91            })?;
92
93        let write_stream =
94            stream
95                .try_clone()
96                .map_err(|e| TcpCommunicationError::StreamCloneFailed {
97                    location: module_path!().to_string(),
98                    source: e,
99                })?;
100
101        Ok(TcpCommunication {
102            write_stream,
103            buf_reader: BufReader::new(stream),
104            execution_config,
105        })
106    }
107
108    /// Extracts the last word from a given string slice, using whitespace as the delimiter.
109    ///
110    /// This private helper function is useful for parsing responses where the desired
111    /// data (e.g., a numeric value) is consistently the final token in a space-separated sequence.
112    /// This implementation avoids heap allocation by using a reverse iterator.
113    ///
114    /// # Arguments
115    /// * `sequence` - The string slice from which to extract the last word.
116    ///
117    /// # Returns
118    /// An `Option<&str>`:
119    /// - `Some(&str)` containing a reference to the last word if the string contains words.
120    /// - `None` if the input string is empty or contains only whitespace.
121    fn get_last_word(sequence: &str) -> Option<&str> {
122        sequence.split_whitespace().last()
123    }
124
125    /// Writes a string request to the TCP stream and handles potential errors.
126    ///
127    /// This private helper function sends a `request` string as bytes over the TCP stream.
128    /// It verifies that the entire request was written successfully and flushes the stream
129    /// to ensure the data is sent immediately.
130    ///
131    /// # Arguments
132    /// * `request` - The `String` containing the data to be written to the TCP stream.
133    ///
134    /// # Returns
135    /// An empty `Result` (`Ok(())`) on a successful write-operation and flush.
136    ///
137    /// # Errors
138    /// Returns a `TcpCommunicationError` if the write or flush operation fails:
139    /// - `TcpCommunicationError::WritingToStreamFailed`: If the underlying OS returns an
140    ///   error while writing to the TCP socket. This usually indicates a connection issue.
141    /// - `TcpCommunicationError::FlushFailed`: If the OS cannot flush its write buffer
142    ///   to the socket, also indicating a connection problem.
143    fn write_stream_check_for_error(
144        &mut self,
145        request: String,
146    ) -> Result<(), TcpCommunicationError> {
147        self.write_stream
148            .write_all(request.as_bytes())
149            .map_err(|e| TcpCommunicationError::WritingToStreamFailed {
150                location: module_path!().to_string(),
151                source: e,
152            })?;
153        self.write_stream
154            .flush()
155            .map_err(|e| TcpCommunicationError::FlushFailed {
156                location: module_path!().to_string(),
157                source: e,
158            })
159    }
160
161    /// Communicates with the simulator to change the state of a specific relay.
162    ///
163    /// This private helper function constructs a TCP message to command a relay
164    /// to either switch `ON` or `OFF`. It then writes this command to the TCP stream
165    /// and waits for a response from the simulator to confirm the command was received.
166    ///
167    /// # Arguments
168    /// * `relay_id` - The numeric ID of the relay whose state is to be changed.
169    /// * `state` - A boolean value indicating the desired state: `true` for ON, `false` for OFF.
170    ///
171    /// # Returns
172    /// An empty `Result` (`Ok(())`) if the command was sent and a response was received successfully.
173    ///
174    /// # Errors
175    /// Returns a `TcpCommunicationError` if any part of the process fails:
176    /// - `TcpCommunicationError::ReadingFromStreamFailed`: If a response line cannot be
177    ///   read from the simulator, which likely indicates the connection was dropped.
178    /// - It will also propagate any errors from `write_stream_check_for_error`.
179    fn change_relay(&mut self, relay_id: u16, state: bool) -> Result<(), TcpCommunicationError> {
180        let command = if state {
181            debug!(target: module_path!(), "switching on relay #{relay_id}");
182            command::SETSINGLE
183        } else {
184            debug!(target: module_path!(), "switching off relay #{relay_id}");
185            command::UNSETSINGLE
186        };
187
188        let request = format!("{} {} {}\n", category::CONTROLLINO, command, relay_id);
189
190        self.write_stream_check_for_error(request)?;
191
192        let mut response = String::new();
193        self.buf_reader.read_line(&mut response).map_err(|e| {
194            TcpCommunicationError::ReadingFromStreamFailed {
195                location: module_path!().to_string(),
196                source: e,
197            }
198        })?;
199        Ok(())
200    }
201
202    /// Communicates with the simulator to request and retrieve a sensor signal value.
203    ///
204    /// This private helper function constructs a TCP message to ask the simulator
205    /// for the current reading of a specific `AquariumSignal`. It then writes this
206    /// request to the TCP stream, waits for a response, and parses the received
207    /// string into an `f32` value.
208    ///
209    /// # Arguments
210    /// * `aquarium_signal` - The `AquariumSignal` enum variant describing the sensor
211    ///   signal (e.g., water temperature, pH) for which the value is requested.
212    ///
213    /// # Returns
214    /// A `Result` containing the retrieved sensor value as an `f32` on success.
215    ///
216    /// # Errors
217    /// Returns a `TcpCommunicationError` if any part of the process fails:
218    /// - `TcpCommunicationError::IllegalSignalRequestToSimulator`: If an unsupported
219    ///   `AquariumSignal` is requested.
220    /// - `TcpCommunicationError::ReadingFromStreamFailed`: If a response cannot be read
221    ///   from the TCP stream.
222    /// - `TcpCommunicationError::ResponseContainsNoWords`: If the simulator's response
223    ///   is empty or contains no parsable words.
224    /// - `TcpCommunicationError::LastWordOfResponseEmpty`: If the last word of the response
225    ///   is an empty string.
226    /// - `TcpCommunicationError::ResponseConversionError`: If the last word of the response
227    ///   cannot be parsed into an `f32`.
228    /// - It will also propagate any errors from `write_stream_check_for_error`.
229    fn get_signal(
230        &mut self,
231        aquarium_signal: AquariumSignal,
232    ) -> Result<f32, TcpCommunicationError> {
233        let (category, tcp_signal_identifier) = match aquarium_signal {
234            AquariumSignal::WaterTemperature => (
235                category::ATLASSCIENTIFIC,
236                signal::ATLASSCIENTIFICTEMPERATURE,
237            ),
238            AquariumSignal::pH => (category::ATLASSCIENTIFIC, signal::ATLASSCIENTIFICPH),
239            AquariumSignal::Conductivity => (
240                category::ATLASSCIENTIFIC,
241                signal::ATLASSCIENTIFICCONDUCTIVITY,
242            ),
243            AquariumSignal::AmbientTemperature => (category::DHT22, signal::AMBIENTTEMPERATURE),
244            AquariumSignal::AmbientHumidity => (category::DHT22, signal::AMBIENTHUMIDITY),
245            AquariumSignal::TankLevelSwitchPosition => {
246                (category::TANKLEVELSWITCH, signal::TANKLEVELSWITCHPOSITION)
247            }
248            AquariumSignal::TankLevelSwitchInvalid => {
249                (category::TANKLEVELSWITCH, signal::TANKLEVELSWITCHINVALID)
250            }
251            _ => {
252                return Err(TcpCommunicationError::IllegalSignalRequestToSimulator(
253                    module_path!().to_string(),
254                    aquarium_signal,
255                ));
256            }
257        };
258
259        let request = format!("{} {} {}\n", category, command::READ, tcp_signal_identifier);
260        self.write_stream_check_for_error(request)?;
261
262        let mut response = String::new();
263        self.buf_reader.read_line(&mut response).map_err(|e| {
264            TcpCommunicationError::ReadingFromStreamFailed {
265                location: module_path!().to_string(),
266                source: e,
267            }
268        })?;
269
270        // Flattened and more robust parsing logic
271        Self::get_last_word(&response)
272            .ok_or_else(|| {
273                TcpCommunicationError::ResponseContainsNoWords(module_path!().to_string())
274            })
275            .and_then(|last_word| {
276                let trimmed_word = last_word.trim();
277                if trimmed_word.is_empty() {
278                    Err(TcpCommunicationError::LastWordOfResponseEmpty(
279                        module_path!().to_string(),
280                    ))
281                } else {
282                    // Parse directly from the trimmed slice.
283                    trimmed_word.parse::<f32>().map_err(|e| {
284                        TcpCommunicationError::ResponseConversionError {
285                            location: module_path!().to_string(),
286                            last_word: trimmed_word.to_string(), // Only allocate string for the error case.
287                            source: e,
288                        }
289                    })
290                }
291            })
292    }
293
294    /// Receives a `RequestSignal` command without blocking from a channel and sends back the corresponding sensor value via TCP.
295    ///
296    /// This private helper function is used to service requests from various modules
297    /// that are configured to run in simulator mode. It checks the provided receiver channel (`rx`) for an
298    /// `InternalCommand::RequestSignal`. If such a request is found, it uses `self.get_signal()`
299    /// to fetch the simulated sensor value and sends it back through the provided sender channel (`tx`).
300    /// Other command types are ignored.
301    ///
302    /// # Arguments
303    /// * `rx` - A reference to the receiver channel from which `InternalCommand` requests are received.
304    /// * `tx` - A reference to the sender channel through which the `f32` sensor response is sent back.
305    ///
306    /// # Returns
307    /// A `Result` which is:
308    /// - `Ok(f32)`: In the case of success
309    /// - `Err(TcpCommunicationError:)`: in case of failure
310    fn receive_and_answer(
311        &mut self,
312        rx: &mut AquaReceiver<InternalCommand>,
313        tx: &mut AquaSender<Result<f32, TcpCommunicationError>>,
314    ) -> Result<(), TcpCommunicationError> {
315        match rx.try_recv() {
316            Ok(InternalCommand::RequestSignal(s)) => {
317                // A signal was requested. Get it from the simulator.
318                let response = self.get_signal(s);
319                // Send the response (Ok or Err) back to the caller.
320                tx.send(response).map_err(|_| {
321                    TcpCommunicationError::SendingResponseViaChannelFailure(
322                        module_path!().to_string(),
323                    )
324                })
325            }
326            Ok(_) => {
327                // Any other command was received, which we can safely ignore.
328                Ok(())
329            }
330            #[cfg(feature = "debug_channels")]
331            Err(AquaChannelError::Full) => {
332                // Not applicable. Do nothing.
333                Ok(())
334            }
335            Err(AquaChannelError::Empty) => {
336                // No command in the channel, which is a normal, non-error state.
337                Ok(())
338            }
339            Err(AquaChannelError::Disconnected) => {
340                // The channel is broken, which is a critical error.
341                Err(TcpCommunicationError::ChannelDisconnected(
342                    module_path!().to_string(),
343                ))
344            }
345        }
346    }
347
348    /// Handles a request-response cycle for a simulated sensor if it's active.
349    fn handle_simulated_sensor(
350        &mut self,
351        is_active: bool,
352        rx_opt: &mut Option<AquaReceiver<InternalCommand>>,
353        tx_opt: &mut Option<AquaSender<Result<f32, TcpCommunicationError>>>,
354        module_name: &str,
355    ) {
356        if is_active {
357            if let (Some(rx), Some(tx)) = (rx_opt, tx_opt) {
358                if let Err(e) = self.receive_and_answer(rx, tx) {
359                    log_error_chain(
360                        module_path!(),
361                        &format!("Error in communication with {module_name}"),
362                        e,
363                    );
364                }
365            }
366        }
367    }
368
369    /// Executes the main control loop for the TCP communication module.
370    ///
371    /// This function runs continuously as a server, processing incoming requests
372    /// from various client threads (e.g., Ambient, Atlas Scientific, Tank Level Switch, Relay Manager).
373    /// It dispatches these requests to the connected TCP simulator and sends back the responses.
374    ///
375    /// The loop remains active until a `Quit` command is received from the signal handler.
376    /// Upon receiving `Quit`, it breaks out of the loop and sends a confirmation back to the signal handler.
377    ///
378    /// # Arguments
379    /// * `tcp_communication_channels` - A mutable reference to a struct containing the
380    ///   channels for communication with other threads,
381    ///   including senders/receivers for SensorManager, Tank Level Switch,
382    ///   Relay Manager, and the Signal Handler.
383    pub fn execute(&mut self, tcp_communication_channels: &mut TcpCommunicationChannels) {
384        #[cfg(all(target_os = "linux", not(test)))]
385        info!(target: module_path!(), "Thread started with TID: {}", gettid());
386
387        #[cfg(all(not(test), feature = "debug_channels"))]
388        tcp_communication_channels.report_status();
389
390        let spin_sleeper = SpinSleeper::default();
391        let sleep_interval_millis = 10;
392        let sleep_duration_ten_milli_sec = Duration::from_millis(sleep_interval_millis);
393        let mut lock_relay_manager_channel_disconnected = false;
394
395        loop {
396            let (quit_command_received, _, _) = self.process_external_request(
397                &mut tcp_communication_channels.rx_tcp_communication_from_signal_handler,
398                None,
399            );
400            if quit_command_received {
401                break;
402            }
403
404            // *** communication with sensor manager ***
405            self.handle_simulated_sensor(
406                self.execution_config.sensor_manager,
407                &mut tcp_communication_channels.rx_tcp_communication_from_sensor_manager_opt,
408                &mut tcp_communication_channels.tx_tcp_communication_to_sensor_manager_opt,
409                "sensor manager",
410            );
411
412            // *** communication with Tank level switch ***
413            self.handle_simulated_sensor(
414                self.execution_config.tank_level_switch,
415                &mut tcp_communication_channels.rx_tcp_communication_from_tank_level_switch_opt,
416                &mut tcp_communication_channels.tx_tcp_communication_to_tank_level_switch_opt,
417                "tank level switch",
418            );
419
420            // *** communication with relay manager - no response required ***
421            if !lock_relay_manager_channel_disconnected && self.execution_config.relay_manager {
422                match tcp_communication_channels.rx_tcp_communication_from_relay_manager_opt {
423                    Some(ref mut rx_tcp_communication_from_relay_manager) => {
424                        match rx_tcp_communication_from_relay_manager.try_recv() {
425                            Ok(c) => match c {
426                                InternalCommand::SetRelay(c) => {
427                                    let response = self.change_relay(c, true);
428                                    if let Err(response_err) = response {
429                                        log_error_chain(
430                                            module_path!(),
431                                            "Error in communication to relay manager",
432                                            response_err,
433                                        );
434                                    }
435                                }
436                                InternalCommand::UnsetRelay(c) => {
437                                    let response = self.change_relay(c, false);
438                                    if let Err(response_err) = response {
439                                        log_error_chain(
440                                            module_path!(),
441                                            "Error in communication to relay manager",
442                                            response_err,
443                                        );
444                                    }
445                                }
446                                _ => { /* ignore all other commands */ }
447                            },
448                            #[cfg(feature = "debug_channels")]
449                            Err(AquaChannelError::Full) => { /* Not applicable - do nothing */ }
450                            Err(AquaChannelError::Empty) => { /* empty buffer - no action required */
451                            }
452                            Err(AquaChannelError::Disconnected) => {
453                                lock_relay_manager_channel_disconnected = true;
454                                error!(
455                                    target: module_path!(),
456                                    "Error in trying to receive command from relay manager"
457                                );
458                            }
459                        }
460                    }
461                    None => {
462                        // no value provided - do nothing
463                    }
464                }
465            }
466            // **********************************
467            spin_sleeper.sleep(sleep_duration_ten_milli_sec);
468        }
469
470        tcp_communication_channels.acknowledge_signal_handler();
471    }
472}