aquarium_control/mineral/
balling.rs

1/* Copyright 2024 Uwe Martin
2
3Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
4
5The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
6
7THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
8*/
9
10//! Implements the main control logic for automated Balling mineral dosing.
11//!
12//! This module contains the `Balling` struct, which runs as a dedicated thread to manage
13//! the scheduled dosing of up to four different mineral solutions. It is responsible for
14//! maintaining a precise dosing schedule, responding to external commands, and ensuring
15//! that dosing operations are performed safely and reliably.
16//!
17//! ## Key Components
18//!
//! - **`Balling` Struct**: The central state machine for the dosing subsystem. It holds the
//!   configuration and the internal state for each pump, most importantly the
//!   `durations_until_next_dosing` array, which holds the time until the next scheduled dose of each pump.
22//!
23//! - **`new()` Constructor**: A complex and critical initialization function. It performs
24//!   several pre-flight checks:
25//!   1.  Validates that all configured dosing intervals are logical and non-zero.
26//!   2.  Ensures that every pump marked as `active` in the configuration has a
27//!       corresponding entry in the database.
28//!   3.  Calculates the initial countdown for each pump by querying the database for the
29//!       time of its last dosing event.
30//!
31//! - **`execute()` Method**: The main thread loop. It continuously performs the following actions:
32//!   1.  Periodically decrements the countdown timers for each active pump.
33//!   2.  When a countdown reaches zero, it requests a `schedule_check` to ensure dosing
34//!       is permitted at the current time.
35//!   3.  If permitted, it calls `execute_dosing` to perform the physical action.
36//!   4.  Resets the pump's countdown to its configured interval.
37//!   5.  Listens for and processes external commands like `Start`, `Stop`, and `Execute(pump_id)`.
38//!
39//! - **`execute_dosing()` Method**: A private helper that orchestrates a single dosing event.
40//!   It retrieves pump parameters from the database, calculates the required pump run time,
41//!   invokes the `MineralInjectionTrait` to actuate the pump, and logs the completed
42//!   event back to the database.
43//!
44//! ## Design and Architecture
45//!
46//! The `Balling` module is designed as a robust, decoupled, and testable component.
47//!
48//! - **Time-Based Scheduling**: The core scheduling logic is based on simple countdown timers.
49//!   This is a robust and low-overhead way to manage recurring events without relying on
50//!   a complex external scheduler.
51//!
52//! - **Dependency Injection**: The module relies on a trait for its core dependency:
53//!   - `MineralInjectionTrait`: An abstraction for the physical act of dispensing a
54//!     mineral solution. This allows for mock implementations in tests.
55//!
//! - **Concurrency Control**: It uses an `Arc<Mutex<i32>>` to coordinate with other
//!   device-actuating modules (like `Feed`). This mutex acts as a semaphore, ensuring
//!   that only one major physical action occurs at a time across the entire system,
//!   preventing conflicts.
60//!
61//! - **State Management**: The `dosing_inhibited` flag allows external commands (`Start`/`Stop`)
62//!   to temporarily pause all scheduled dosing without stopping the thread or losing the
63//!   current countdown states.
64
65use chrono::Local;
66#[cfg(feature = "debug_balling")]
67use log::debug;
68use log::{error, info};
69use std::fmt;
70
71#[cfg(all(not(test), target_os = "linux"))]
72use nix::unistd::gettid;
73
74use spin_sleep::SpinSleeper;
75use std::sync::{Arc, Mutex};
76use std::time::{Duration, Instant};
77
78use crate::database::sql_interface_balling::{BallingSetVal, SqlInterfaceBalling};
79use crate::mineral::balling_channels::BallingChannels;
80use crate::mineral::balling_config::BallingConfig;
81use crate::mineral::balling_error::BallingError;
82use crate::mineral::mineral_injection::MineralInjectionTrait;
83use crate::utilities::acknowledge_signal_handler::AcknowledgeSignalHandlerTrait;
84use crate::utilities::channel_content::{AquariumDevice, InternalCommand};
85use crate::utilities::database_ping_trait::DatabasePingTrait;
86use crate::utilities::proc_ext_req::ProcessExternalRequestTrait;
87use crate::utilities::wait_for_termination::WaitForTerminationTrait;
88use crate::{check_quit_increment_counter_ping_database, perform_schedule_check};
89
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Contains the configuration and the implementation for the Balling dosing control.
///
/// Thread communication of this component is as follows:
/// ```mermaid
/// graph LR
///     balling[Balling Dosing Control] --> signal_handler[Signal Handler]
///     signal_handler --> balling
///     signal_handler --> mineral_injection[Mineral Injection]
///     balling --> mineral_injection
///     mineral_injection --> relay_manager[Relay Manager]
///     relay_manager --> mineral_injection
///     balling --> schedule_check[Schedule Check]
///     schedule_check --> balling
///     messaging[Messaging] --> balling
/// ```
/// The communication channel to and from the relay manager is forwarded to the implementation of `MineralInjectionTrait`.
/// The signal handler communicates both directly with Balling and with the implementation of `MineralInjectionTrait`.
pub struct Balling {
    /// Configuration data for the Balling dosing control.
    config: BallingConfig,

    /// Time remaining until the next dosing of each pump (index = pump id - 1).
    /// Calculated once in `new()`; `execute()` converts the values into target instants at start-up.
    durations_until_next_dosing: [Duration; 4],

    /// Activation flags of the four pumps, copied from the configuration (index = pump id - 1).
    pump_is_active: [bool; 4],

    /// Dosing intervals of the four pumps, converted from the configured seconds (index = pump id - 1).
    dosing_intervals: [Duration; 4],

    /// Inhibition flag to avoid flooding the log file with repeated messages about failing to send
    /// a request via the channel to the schedule check.
    lock_error_channel_send_schedule_check: bool,

    /// Inhibition flag to avoid flooding the log file with repeated messages about failing to receive
    /// a response via the channel from the schedule check.
    lock_error_channel_receive_schedule_check: bool,

    /// Communication from trait implementation: a request to execute a certain Balling mineral dosing
    /// has been received.
    pub execute_command_received: bool,

    /// Communication from trait implementation: id of the pump whose dosing was requested externally.
    /// Initialized to `0` in `new()`.
    pub pump_id_requested: i32,

    /// Instant at which the last database ping happened (set to the construction time in `new()`).
    pub last_ping_instant: Instant,

    /// Interval between two database pings.
    pub database_ping_interval: Duration,

    /// Inhibition flag to avoid flooding the log file with repeated messages about having received
    /// an inapplicable command via the channel.
    pub lock_warn_inapplicable_command_signal_handler: bool,

    /// Inhibition flag to avoid flooding the log file with repeated messages about failure to receive
    /// the termination signal via the channel.
    pub lock_error_channel_receive_termination: bool,
}
144
145impl fmt::Display for Balling {
146    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
147        write!(
148            f,
149            "Balling Controller (Active: {}), Initial Countdown (s): [P1: {}, P2: {}, P3: {}, P4: {}]",
150            self.config.active,
151            self.durations_until_next_dosing[0].as_secs(),
152            self.durations_until_next_dosing[1].as_secs(),
153            self.durations_until_next_dosing[2].as_secs(),
154            self.durations_until_next_dosing[3].as_secs()
155        )
156    }
157}
158
159impl Balling {
160    /// Creates a new `Balling` control instance.
161    ///
162    /// This constructor initializes the Balling dosing control module. It performs
163    /// several pre-flight checks, including validating the `schedule_check_interval`
164    /// and dosing intervals from the configuration, ensuring that a database entry
165    /// exists for every active pump, and calculating the initial countdowns for
166    /// each pump based on their last recorded dosing times.
167    ///
168    /// # Arguments
169    /// * `config` - Configuration data for the Balling dosing control, loaded from a TOML file.
170    /// * `sql_interface_balling` - A mutable reference to a `SqlInterfaceBalling` instance, providing the
171    ///   specific SQL interface for Balling dosing operations.
172    /// * `database_ping_interval` - A `Duration` instance, providing the interval to ping the database.
173    ///
174    /// # Returns
175    /// A `Result` containing a new, initialized `Balling` instance on success.
176    ///
177    /// # Errors
178    /// This function will return a `BallingError` if any of the initial setup steps fail:
179    /// - The `schedule_check_interval` in the configuration is zero (`ScheduleCheckIntervalZero`).
180    /// - Any configured dosing interval is zero or shorter than the `schedule_check_interval`
181    ///   (`InvalidDosingInterval`, `DosingIntervalShorterThanCheckInterval`).
182    /// - A pump is marked as active in the config, but no corresponding set values are found
183    ///   in the database (`SetValueRetrievalError`).
184    /// - It fails to calculate the initial countdown for any pump due to a database error
185    ///   or data conversion issue.
186    pub fn new(
187        config: BallingConfig,
188        database_ping_interval: Duration,
189        sql_interface_balling: &mut SqlInterfaceBalling,
190    ) -> Result<Balling, BallingError> {
191        if config.schedule_check_interval == 0 {
192            return Err(BallingError::ScheduleCheckIntervalZero(
193                module_path!().to_string(),
194            ));
195        }
196
197        Self::check_valid_dosing_intervals(&config)?;
198
199        Self::check_database_vs_config(sql_interface_balling, &config)?;
200
201        let mut durations_until_next_dosing = [Duration::from_secs(0); 4];
202
203        // Calculate the initial countdown for each pump when the next dosing shall happen
204        for pump_id in 1..=4 {
205            durations_until_next_dosing[pump_id - 1] = Self::calc_duration_until_next_dosing(
206                &config,
207                sql_interface_balling,
208                pump_id as i64,
209            )?;
210        }
211
212        // Transfer the pump configuration data into an array
213        let mut pump_is_active = [false; 4];
214        pump_is_active[0] = config.pump1_active;
215        pump_is_active[1] = config.pump2_active;
216        pump_is_active[2] = config.pump3_active;
217        pump_is_active[3] = config.pump4_active;
218
219        let mut dosing_intervals = [Duration::from_secs(0); 4];
220        dosing_intervals[0] = Duration::from_secs(config.dosing_interval_pump1 as u64);
221        dosing_intervals[1] = Duration::from_secs(config.dosing_interval_pump2 as u64);
222        dosing_intervals[2] = Duration::from_secs(config.dosing_interval_pump3 as u64);
223        dosing_intervals[3] = Duration::from_secs(config.dosing_interval_pump4 as u64);
224
225        Ok(Balling {
226            config,
227            durations_until_next_dosing,
228            pump_is_active,
229            dosing_intervals,
230            lock_error_channel_send_schedule_check: false,
231            lock_error_channel_receive_schedule_check: false,
232            execute_command_received: false,
233            pump_id_requested: 0,
234            last_ping_instant: Instant::now(),
235            database_ping_interval,
236            lock_warn_inapplicable_command_signal_handler: false,
237            lock_error_channel_receive_termination: false,
238        })
239    }
240
241    /// Checks that configured dosing intervals are valid and logical.
242    ///
243    /// This function validates two conditions for each active pump:
244    /// 1. The dosing interval must not be zero.
245    /// 2. The dosing interval must be greater than or equal to the `schedule_check_interval`.
246    ///
247    /// # Arguments
248    /// * `config` - A reference to the `BallingConfig` containing the dosing intervals and the check interval.
249    ///
250    /// # Returns
251    /// An empty `Result` (`Ok(())`) if all configured dosing intervals for active pumps are valid.
252    ///
253    /// # Errors
254    /// This function will return a `BallingError` if an invalid configuration is detected:
255    /// - `InvalidDosingInterval`: If an active pump's dosing interval is set to `0`.
256    /// - `DosingIntervalShorterThanCheckInterval`: If an active pump's dosing interval is
257    ///   shorter than the main `schedule_check_interval`, which would lead to illogical scheduling.
258    fn check_valid_dosing_intervals(config: &BallingConfig) -> Result<(), BallingError> {
259        let pumps = [
260            (1, config.pump1_active, config.dosing_interval_pump1),
261            (2, config.pump2_active, config.dosing_interval_pump2),
262            (3, config.pump3_active, config.dosing_interval_pump3),
263            (4, config.pump4_active, config.dosing_interval_pump4),
264        ];
265
266        for (pump_id, is_active, interval) in pumps {
267            if !is_active {
268                continue;
269            }
270            if interval == 0 {
271                return Err(BallingError::InvalidDosingInterval(
272                    module_path!().to_string(),
273                    pump_id,
274                ));
275            }
276            if interval < config.schedule_check_interval {
277                return Err(BallingError::DosingIntervalShorterThanCheckInterval(
278                    module_path!().to_string(),
279                    pump_id as u32,
280                    interval,
281                    config.schedule_check_interval,
282                ));
283            }
284        }
285        Ok(())
286    }
287
288    /// Verifies that a database entry exists for every pump marked as active in the configuration.
289    ///
290    /// This function ensures that the application doesn't try to operate a pump that
291    /// lacks the necessary set values (e.g., flow rate, volume) in the database.
292    ///
293    /// # Arguments
294    /// * `sql_interface_balling` - A mutable reference to the `SqlInterfaceBalling` instance.
295    /// * `config` - A reference to the `BallingConfig` to check which pumps are active.
296    ///
297    /// # Returns
298    /// An empty `Result` (`Ok(())`) if a database entry is found for every active pump.
299    ///
300    /// # Errors
301    /// Returns `BallingError::SetValueRetrievalError` if an active pump is missing its
302    /// corresponding configuration entry in the `ballingsetvals` table. The error will
303    /// wrap the underlying `SqlInterfaceError`.
304    fn check_database_vs_config(
305        sql_interface_balling: &mut SqlInterfaceBalling,
306        config: &BallingConfig,
307    ) -> Result<(), BallingError> {
308        let active_pumps = [
309            (1, config.pump1_active),
310            (2, config.pump2_active),
311            (3, config.pump3_active),
312            (4, config.pump4_active),
313        ];
314
315        for (pump_id, is_active) in active_pumps {
316            if is_active {
317                // The `?` operator makes this much cleaner than a match statement.
318                sql_interface_balling
319                    .get_single_balling_setval_from_database(pump_id)
320                    .map_err(|e| BallingError::SetValueRetrievalError {
321                        location: module_path!().to_string(),
322                        pump_id,
323                        source: Box::new(e),
324                    })?;
325            }
326        }
327        Ok(())
328    }
329
330    /// Retrieves the configured dosing interval for a specific pump.
331    ///
332    /// # Arguments
333    /// * `config` - A reference to the `BallingConfig` containing the dosing intervals.
334    /// * `pump_id` - The ID of the pump (1, 2, 3, or 4) for which to get the interval.
335    ///
336    /// # Returns
337    /// A `Result` containing the configured dosing interval (`u32`) in seconds for the specified pump.
338    ///
339    /// # Errors
340    /// Returns `BallingError::CountdownCalculationInvalidPumpId` if the provided `pump_id`
341    /// is not one of the recognized values (1, 2, 3, or 4).
342    fn get_target_dosing_interval(
343        config: &BallingConfig,
344        pump_id: i64,
345    ) -> Result<u32, BallingError> {
346        match pump_id {
347            1 => Ok(config.dosing_interval_pump1),
348            2 => Ok(config.dosing_interval_pump2),
349            3 => Ok(config.dosing_interval_pump3),
350            4 => Ok(config.dosing_interval_pump4),
351            _ => Err(BallingError::CountdownCalculationInvalidPumpId(
352                module_path!().to_string(),
353                pump_id,
354            )),
355        }
356    }
357
358    /// Calculates the remaining time (countdown) until the next scheduled dosing for a specific pump.
359    ///
360    /// This function determines how much time is left until a pump's next dosing event.
361    /// It retrieves the target interval from the configuration and the elapsed time since the
362    /// last dosing from the database. The duration is the interval minus the elapsed time.
363    /// If no previous dosing event exists, the duration is set to the target interval
364    /// applying a cautious strategy.
365    ///
366    /// # Arguments
367    /// * `config` - A reference to the `BallingConfig` containing the dosing intervals.
368    /// * `sql_interface_balling` - A mutable reference to the `SqlInterfaceBalling` instance.
369    /// * `pump_id` - The unique identifier of the pump (1-4) for which to calculate the countdown.
370    ///
371    /// # Returns
372    /// A `Result` containing the calculated duration until the next dosing. This value is `0` if the dosing is overdue.
373    ///
374    /// # Errors
375    /// This function will return a `BallingError` if:
376    /// - The `pump_id` is invalid (`CountdownCalculationInvalidPumpId`).
377    /// - The configured dosing interval for the pump is zero (`InvalidDosingInterval`).
378    /// - The database query to get the duration since the last dosing fails (`ReadDurationSinceLastDosingFailure`).
379    /// - The retrieved duration from the database is negative or cannot be converted to `u32` (`CalculateDurationSinceLastDosingFailure`).
380    fn calc_duration_until_next_dosing(
381        config: &BallingConfig,
382        sql_interface_balling: &mut SqlInterfaceBalling,
383        pump_id: i64,
384    ) -> Result<Duration, BallingError> {
385        let target_dosing_interval = Self::get_target_dosing_interval(config, pump_id)?;
386        if target_dosing_interval == 0 {
387            return Err(BallingError::InvalidDosingInterval(
388                module_path!().to_string(),
389                pump_id,
390            ));
391        }
392
393        let duration_since_balling_dosing_opt = sql_interface_balling
394            .get_duration_since_last_balling_dosing(pump_id)
395            .map_err(|e| BallingError::ReadDurationSinceLastDosingFailure {
396                location: module_path!().to_string(),
397                pump_id,
398                source: Box::new(e),
399            })?;
400
401        let duration_since_balling_dosing: Duration = match duration_since_balling_dosing_opt {
402            Some(duration_since_balling_dosing) => duration_since_balling_dosing,
403            None => {
404                // No entry found in the database.
405                // For avoiding overdosing, assume that storing dosing event failed.
406                Duration::from_secs(target_dosing_interval as u64)
407            }
408        };
409
410        #[cfg(test)]
411        println!(
412            "{}, Duration since last dosing for pump {} is {} seconds.",
413            module_path!(),
414            pump_id,
415            duration_since_balling_dosing.as_secs()
416        );
417
418        let target_dosing_interval_duration = Duration::from_secs(target_dosing_interval as u64);
419
420        let remaining_time_until_next_dosing =
421            if duration_since_balling_dosing >= target_dosing_interval_duration {
422                Duration::from_secs(0)
423            } else {
424                target_dosing_interval_duration - duration_since_balling_dosing
425            };
426
427        #[cfg(test)]
428        println!(
429            "{}: Duration until next dosing dosing of pump {} is {} seconds.",
430            module_path!(),
431            pump_id,
432            remaining_time_until_next_dosing.as_secs()
433        );
434
435        Ok(remaining_time_until_next_dosing)
436    }
437
438    /// Calculates the target dosing duration in milliseconds based on the pump's flow rate and desired volume.
439    ///
440    /// This private helper function determines how long a dosing pump needs to run
441    /// to dispense a specific volume of fluid, given its flow rate.
442    ///
443    /// # Arguments
444    /// * `balling_setval` - A reference to a `BallingSetVal` struct, which contains
445    ///   the `dosing_speed` (flow rate in ml/sec) and `dosing_volume` (target volume in ml).
446    ///
447    /// # Returns
448    /// A `u32` representing the calculated dosing duration in milliseconds.
449    /// Returns `0` if either `dosing_speed` or `dosing_volume` is zero or negative,
450    /// indicating that no dosing should occur due to invalid parameters.
451    fn calc_target_dosing_duration_millis(balling_setval: &BallingSetVal) -> u32 {
452        if balling_setval.dosing_speed > 0.0 && balling_setval.dosing_volume > 0.0 {
453            let dosing_duration_seconds =
454                balling_setval.dosing_volume / balling_setval.dosing_speed;
455            (dosing_duration_seconds * 1000.0) as u32
456        } else {
457            0 // user set an invalid flow rate or invalid dosing volume -> no dosing
458        }
459    }
460
461    /// Calculates the actual volume of fluid dosed based on the pump's flow rate and the actual run duration.
462    ///
463    /// This private helper function determines the precise volume dispensed, taking into account
464    /// the pump's calibrated flow rate and the actual time it was active.
465    ///
466    /// # Arguments
467    /// * `balling_setval` - A reference to a `BallingSetVal` struct, containing the pump's
468    ///   `dosing_speed` (flow rate in ml/sec).
469    /// * `actual_dosing_duration_millis` - The actual duration (in milliseconds) that the pump was active.
470    ///
471    /// # Returns
472    /// An `f32` representing the actual volume of fluid dosed (in milliliters).
473    /// Returns `0.0` if `dosing_speed` is zero or negative, or if `actual_dosing_duration_millis` is zero,
474    /// indicating that no fluid was dispensed.
475    fn calc_actual_dosing_volume(
476        balling_setval: &BallingSetVal,
477        actual_dosing_duration_millis: u32,
478    ) -> f32 {
479        if balling_setval.dosing_speed > 0.0 && actual_dosing_duration_millis > 0 {
480            (actual_dosing_duration_millis as f32) / 1000.0 * balling_setval.dosing_speed
481        } else {
482            0.0 // user set an invalid flow rate or invalid dosing volume -> no dosing
483        }
484    }
485
486    /// Orchestrates the dosing process for a specific peristaltic pump.
487    ///
488    /// This function performs the complete cycle of a dosing event:
489    /// 1. Retrieve the pump's set values (e.g., flow rate, target volume) from the database.
490    /// 2. Calculates the required dosing duration.
491    /// 3. Invokes the `mineral_injection` trait method to physically actuate the pump for the calculated duration.
492    /// 4. Calculate the actual volume dosed based on the actual run time.
493    /// 5. Log the completed Balling dosing event, including the actual volume, back to the database.
494    ///
495    /// The function also monitors for a `Quit` command from the signal handler during the injection process.
496    ///
497    /// # Arguments
498    /// * `pump_id` - The unique numerical ID of the peristaltic pump to be actuated (e.g., 1, 2, 3, 4).
499    /// * `pump_device` - The `AquariumDevice` enum variant corresponding to the `pump_id`,
500    ///   representing the physical device.
501    /// * `mineral_injection` - A mutable reference to an object implementing `MineralInjectionTrait`,
502    ///   responsible for the physical control of the pump.
503    /// * `balling_channels` - A mutable reference to `BallingChannels` struct containing all necessary `mpsc`
504    /// * `sql_interface_balling` - A mutable reference to a `SqlInterfaceBalling` instance, providing the
505    ///   specific SQL interface for Balling dosing operations. This is moved into the struct.
506    ///
507    /// # Returns
508    /// A `bool` which is `true` if a `Quit` command was received from the signal handler
509    /// during the dosing process, indicating that the application should shut down; otherwise `false`.
510    fn execute_dosing(
511        &mut self,
512        pump_id: u32,
513        pump_device: AquariumDevice,
514        mineral_injection: &mut impl MineralInjectionTrait,
515        balling_channels: &mut BallingChannels,
516        sql_interface_balling: &mut SqlInterfaceBalling,
517    ) -> bool {
518        let mut quit_command_received: bool = false;
519        let actual_dosing_duration_millis: u32;
520
521        match sql_interface_balling.get_single_balling_setval_from_database(pump_id.into()) {
522            Ok(balling_setval) => {
523                let dosing_duration_millis =
524                    Self::calc_target_dosing_duration_millis(&balling_setval);
525
526                #[cfg(test)]
527                println!(
528                    "{}: Calculated target dosing duration of {} ms for pump #{}",
529                    module_path!(),
530                    dosing_duration_millis,
531                    pump_id
532                );
533
534                if dosing_duration_millis > 0 {
535                    (quit_command_received, actual_dosing_duration_millis) = mineral_injection
536                        .inject_mineral(balling_channels, pump_device, dosing_duration_millis);
537                    let actual_dosing_volume = Self::calc_actual_dosing_volume(
538                        &balling_setval,
539                        actual_dosing_duration_millis,
540                    );
541                    if let Err(e) = sql_interface_balling.insert_balling_event(
542                        Local::now().naive_local(),
543                        pump_id.into(),
544                        actual_dosing_volume.into(),
545                    ) {
546                        error!(
547                            target: module_path!(),
548                            "Error occurred in database communication: {e:?}"
549                        );
550                    }
551                }
552            }
553            Err(e) => {
554                error!(
555                    target: module_path!(),
556                    "encountered error when reading set values of pump #{pump_id}: {e:?}"
557                );
558            }
559        }
560        quit_command_received
561    }
562
563    /// Selects the appropriate peristaltic pump and its corresponding device representation based on a given ID.
564    ///
565    /// This function acts as a mapping from a numerical pump ID
566    /// to a tuple containing the pump's `u32` ID and its `AquariumDevice` enum variant.
567    /// Numerical pump IDs are used in external commands and in the database.
568    ///
569    /// # Arguments
570    /// * `pump_id` - The numerical ID of the pump (e.g., 1, 2, 3, 4) to be selected.
571    ///
572    /// # Returns
573    /// An `Option<(u32, AquariumDevice)>`:
574    /// - `Some((u32, AquariumDevice))`: If the `pump_id` matches one of the known peristaltic pumps,
575    ///   it will return its `u32` ID and the corresponding `AquariumDevice` variant.
576    /// - `None`: If the provided `pump_id` does not correspond to a known peristaltic pump.
577    pub fn get_pump_from_id(pump_id: i32) -> Option<(u32, AquariumDevice)> {
578        match pump_id {
579            1 => Some((1, AquariumDevice::PeristalticPump1)),
580            2 => Some((2, AquariumDevice::PeristalticPump2)),
581            3 => Some((3, AquariumDevice::PeristalticPump3)),
582            4 => Some((4, AquariumDevice::PeristalticPump4)),
583            _ => None,
584        }
585    }
586
587    /// Executes the main control loop for the Balling dosing module.
588    ///
589    /// This function runs continuously, managing the automatic dosing schedule for
590    /// multiple peristaltic pumps, processing external commands, and ensuring a
591    /// graceful shutdown. It periodically checks the dosing countdowns for each pump
592    /// and, if due, initiates the dosing process, respecting schedule limitations.
593    ///
594    /// The loop continues until a `Quit` command is received from the signal handler.
595    /// After exiting the main loop, it sends a confirmation back to the signal handler
596    /// and then waits for a `Terminate` command to complete its shutdown sequence.
597    ///
598    /// # Arguments
599    /// * `mutex_device_scheduler_balling` - An `Arc<Mutex<i32>>` used for coordinating
600    ///   access to device scheduling, preventing parallel actuation across different
601    ///   control modules. It holds a counter of the completed actuation.
602    /// * `balling_channels` - A mutable reference to `BallingChannels` struct containing all necessary `mpsc`
603    ///   sender and receiver channels for inter-thread communication (e.g., with
604    ///   the signal handler, relay manager, and schedule checker).
605    /// * `mineral_injection` - A mutable reference to an object implementing the
606    ///   `MineralInjectionTrait`, responsible for the physical control of the dosing pumps.
607    /// * `sql_interface_balling` - A `SqlInterfaceBalling` instance, providing the
608    ///   specific SQL interface for Balling dosing operations.
609    pub fn execute(
610        &mut self,
611        mutex_device_scheduler_balling: Arc<Mutex<i32>>,
612        balling_channels: &mut BallingChannels,
613        mineral_injection: &mut impl MineralInjectionTrait,
614        mut sql_interface_balling: SqlInterfaceBalling,
615    ) {
616        #[cfg(all(target_os = "linux", not(test)))]
617        info!(target: module_path!(), "Thread started with TID: {}", gettid());
618
619        let sleep_duration_hundred_millis = Duration::from_millis(100); // used to reduce the frequency of channel polling
620        let sleep_duration_pump_switch =
621            Duration::from_millis(self.config.pump_switch_delay_millis.into()); // wait between actuation of pumps
622        let spin_sleeper = SpinSleeper::default();
623        let mut loop_counter = 0; // used to reduce the frequency of DB requests
624        let mut quit_command_received: bool; // the request to end the application has been received
625        let mut stop_command_received: bool; // the request to (temporarily) stop dosing has been received
626        let mut start_command_received: bool; // the request to (temporarily) stop dosing has been received
627        let mut dosing_inhibited: bool = false; // state of dosing control determined if the start/stop command has been received
628        let mut schedule_check_result: bool; // flag indicating if operation of Balling is permitted at this time of day
629        let mut target_instants_dosing = [Instant::now(); 4];
630
631        // Calculate the target instant for each pump when the next dosing shall happen
632        for pump_id in 1..=4 {
633            target_instants_dosing[pump_id - 1] =
634                Instant::now() + self.durations_until_next_dosing[pump_id - 1];
635        }
636
637        loop {
638            if self.config.active
639                && (loop_counter % (self.config.schedule_check_interval * 10) == 0)
640            {
641                // this code only executes when the .toml file is configured accordingly (.active)
642                // and the monitoring period has passed (usually every 60 seconds)
643
644                // schedule check to see if actuation is allowed
645                perform_schedule_check!(
646                    balling_channels,
647                    schedule_check_result,
648                    self.lock_error_channel_send_schedule_check,
649                    self.lock_error_channel_receive_schedule_check,
650                    module_path!() // Pass the current module path for accurate logging
651                );
652
653                // debug information
654                #[cfg(feature = "debug_balling")]
655                {
656                    // output countdowns every 10 seconds
657                    debug!(
658                        target: module_path!(),
659                        "countdown 1 = {}, countdown 2 = {}, countdown 3 = {}, countdown 4 = {}",
660                        self.countdown_dosing_pump1,
661                        self.countdown_dosing_pump2,
662                        self.countdown_dosing_pump3,
663                        self.countdown_dosing_pump4,
664                    );
665                }
666                #[allow(clippy::needless_range_loop)]
667                for pump_array_index in 0..=3 {
668                    if self.pump_is_active[pump_array_index]
669                        && target_instants_dosing[pump_array_index] <= Instant::now()
670                        && schedule_check_result
671                        && !dosing_inhibited
672                    {
673                        let pump_id = pump_array_index as u32 + 1;
674                        let pump_device = match pump_id {
675                            1 => AquariumDevice::PeristalticPump1,
676                            2 => AquariumDevice::PeristalticPump2,
677                            3 => AquariumDevice::PeristalticPump3,
678                            4 => AquariumDevice::PeristalticPump4,
679                            _ => unreachable!(),
680                        };
681                        // inner scope to limit the lifetime of unlocked mutex
682                        {
683                            let mut mutex_data = mutex_device_scheduler_balling.lock().unwrap();
684                            quit_command_received = self.execute_dosing(
685                                pump_id,
686                                pump_device,
687                                mineral_injection,
688                                balling_channels,
689                                &mut sql_interface_balling,
690                            );
691                            *mutex_data = mutex_data.saturating_add(1);
692                        }
693                        if quit_command_received {
694                            break; // Quit command received during dosing
695                        }
696                        // wait a little period between dosings of each pump
697                        spin_sleeper.sleep(sleep_duration_pump_switch);
698                        target_instants_dosing[pump_array_index] +=
699                            self.dosing_intervals[pump_array_index];
700                    }
701                }
702            }
703
704            // communication with other threads has to happen more frequently
705            (
706                quit_command_received,
707                start_command_received,
708                stop_command_received,
709            ) = self.process_external_request(
710                &mut balling_channels.rx_balling_from_signal_handler,
711                balling_channels.rx_balling_from_messaging_opt.as_mut(),
712            );
713
714            if self.execute_command_received && self.config.active {
715                let pump_device_opt = Self::get_pump_from_id(self.pump_id_requested);
716                match pump_device_opt {
717                    Some((pump_id, pump_device)) => {
718                        #[cfg(not(test))] // reduce terminal output when testing
719                        info!(
720                            target: module_path!(),
721                            "executing external request for actuation of pump #{}",
722                            self.pump_id_requested
723                        );
724                        // inner scope to limit the lifetime of unlocked mutex
725                        {
726                            let mut mutex_data = mutex_device_scheduler_balling.lock().unwrap();
727                            quit_command_received = self.execute_dosing(
728                                pump_id,
729                                pump_device,
730                                mineral_injection,
731                                balling_channels,
732                                &mut sql_interface_balling,
733                            );
734                            *mutex_data = mutex_data.saturating_add(1);
735                        }
736                    }
737                    None => {
738                        info!(
739                            target: module_path!(),
740                            "ignoring external request for actuation of pump #{}",
741                            self.pump_id_requested
742                        );
743                    }
744                }
745            }
746
747            if stop_command_received {
748                #[cfg(test)]
749                println!(
750                    "{}: received Stop command. Inhibiting dosing.",
751                    module_path!()
752                );
753
754                dosing_inhibited = true;
755            }
756            if start_command_received {
757                #[cfg(test)]
758                println!(
759                    "{}: received Start command. (Re-)starting dosing.",
760                    module_path!()
761                );
762
763                dosing_inhibited = false;
764            }
765
766            check_quit_increment_counter_ping_database!(
767                quit_command_received,
768                spin_sleeper,
769                sleep_duration_hundred_millis,
770                loop_counter,
771                self,
772                &mut sql_interface_balling
773            );
774        }
775
776        balling_channels.acknowledge_signal_handler();
777
778        // This thread has channel connections to underlying threads.
779        // Those threads have to stop receiving commands from this thread.
780        // The shutdown sequence is handled by the signal_handler module.
781        self.wait_for_termination(
782            &mut balling_channels.rx_balling_from_signal_handler,
783            sleep_duration_hundred_millis,
784            module_path!(),
785        );
786    }
787}
788
789#[cfg(test)]
790pub mod tests {
791    use crate::database::sql_interface::SqlInterface;
792    use crate::database::sql_interface_balling::{BallingSetVal, SqlInterfaceBalling};
793    use crate::database::sql_interface_error::SqlInterfaceError;
794    use crate::database::sql_query_strings::SQL_TABLE_BALLING_DOSING_LOG;
795    use crate::launch::channels::Channels;
796    use crate::mineral::balling::{Balling, BallingError};
797    use crate::mocks::mock_mineral_injection::tests::MockMineralInjection;
798    use crate::mocks::mock_schedule_check::tests::mock_schedule_check;
799    use crate::utilities::channel_content::AquariumDevice::{
800        PeristalticPump1, PeristalticPump2, PeristalticPump3, PeristalticPump4,
801    };
802    use crate::utilities::channel_content::InternalCommand;
803    use crate::utilities::config::{
804        read_config_file, read_config_file_with_test_database, ConfigData,
805    };
806    use spin_sleep::SpinSleeper;
807    use std::sync::{Arc, Mutex};
808    use std::thread::scope;
809    use std::time::Duration;
810
811    fn read_balling_config() -> ConfigData {
812        read_config_file("/config/aquarium_control_test_generic.toml".to_string()).unwrap()
813    }
814
815    fn read_balling_config_with_database(db_number: u32) -> ConfigData {
816        read_config_file_with_test_database(
817            "/config/aquarium_control_test_generic.toml".to_string(),
818            db_number,
819        )
820    }
821
822    #[test]
823    // This test case checks if the configuration file contains a valid value for the dosing interval.
824    // It does not operate on any data from SQL database.
825    pub fn test_balling_initialization_invalid_intervals() {
826        let mut config: ConfigData = read_balling_config();
827
828        config.balling.dosing_interval_pump1 = 0;
829        config.balling.dosing_interval_pump2 = 0;
830        config.balling.dosing_interval_pump3 = 0;
831        config.balling.dosing_interval_pump4 = 0;
832
833        println!("Testing with database {}", config.sql_interface.db_name);
834        let max_rows_balling_set_values = config.sql_interface.max_rows_balling_set_values;
835        let max_rows_balling_dosing_log = config.sql_interface.max_rows_balling_dosing_log;
836        let sql_interface = match SqlInterface::new(config.sql_interface) {
837            Ok(c) => c,
838            Err(e) => {
839                panic!("Could not connect to SQL database: {e:?}");
840            }
841        };
842        let mut sql_interface_balling = SqlInterfaceBalling::new(
843            sql_interface.get_connection().unwrap(),
844            max_rows_balling_set_values,
845            max_rows_balling_dosing_log,
846        )
847        .unwrap();
848
849        // create the test object
850        let test_result = Balling::new(
851            config.balling,
852            Duration::from_millis(100),
853            &mut sql_interface_balling,
854        );
855        match test_result {
856            Ok(balling_controller) => {
857                panic!(
858                    "Successfully initialized Balling dosing, although error expected: {}",
859                    balling_controller
860                );
861            }
862            Err(e) => {
863                println!("Balling initialization correctly returned error: {e:?}. Checking if error type is correct...");
864                assert!(matches!(e, BallingError::InvalidDosingInterval(_, _)));
865            }
866        }
867    }
868
869    #[test]
870    // When starting, the application checks when the last dosing events took place by analyzing
871    // data from the database. As a prerequisite for this test case, an empty dosing log is required.
872    // Test case uses test database #01.
873    pub fn test_balling_initialization_empty_dosing_log() {
874        let config: ConfigData = read_balling_config_with_database(1);
875
876        println!("Testing with database {}", config.sql_interface.db_name);
877        let max_rows_balling_set_values = config.sql_interface.max_rows_balling_set_values;
878        let max_rows_balling_dosing_log = config.sql_interface.max_rows_balling_dosing_log;
879        let mut sql_interface = match SqlInterface::new(config.sql_interface) {
880            Ok(c) => c,
881            Err(e) => {
882                panic!("Could not connect to SQL database: {e:?}");
883            }
884        };
885        let mut sql_interface_balling = SqlInterfaceBalling::new(
886            sql_interface.get_connection().unwrap(),
887            max_rows_balling_set_values,
888            max_rows_balling_dosing_log,
889        )
890        .unwrap();
891
892        // empty the Balling dosing log table
893        SqlInterface::truncate_table(&mut sql_interface, SQL_TABLE_BALLING_DOSING_LOG.to_string())
894            .expect("Could not prepare test case");
895
896        // insert balling pump configuration
897        insert_pump_configurations(
898            &mut sql_interface,
899            &mut sql_interface_balling,
900            true,
901            true,
902            true,
903            true,
904        );
905
906        // create the test object
907        let balling = Balling::new(
908            config.balling,
909            Duration::from_millis(100),
910            &mut sql_interface_balling,
911        )
912        .unwrap();
913
914        // check if the countdown calculation was correct
915        assert_eq!(balling.durations_until_next_dosing[0].as_secs(), 0);
916        assert_eq!(balling.durations_until_next_dosing[1].as_secs(), 0);
917        assert_eq!(balling.durations_until_next_dosing[2].as_secs(), 0);
918        assert_eq!(balling.durations_until_next_dosing[3].as_secs(), 0);
919    }
920
921    // Helper function to prepare for test cases which require a filled dosing log.
922    // This function inserts dosing events for each of the four pumps.
923    //
924    // Arguments:
925    // - `sql_interface`: A mutable reference to the main `SqlInterface` for database operations,
926    //                    used here specifically for truncating the dosing log table.
927    // - `sql_interface_balling`: A mutable reference to the `SqlInterfaceBalling`,
928    //                            used for inserting balling dosing events.
929    //
930    // The time of the dosing events is set to:
931    // * 100 seconds in the past for pump #1
932    // * 200 seconds in the past for pump #2
933    // * 300 seconds in the past for pump #3
934    // * 400 seconds in the past for pump #4
935    //
936    // All SQL operations are expected to succeed; any failure will cause a panic,
937    // indicating a test setup issue.
938    fn insert_past_dosing_events(
939        sql_interface: &mut SqlInterface,
940        sql_interface_balling: &mut SqlInterfaceBalling,
941    ) {
942        // empty the Balling dosing log table
943        match SqlInterface::truncate_table(sql_interface, SQL_TABLE_BALLING_DOSING_LOG.to_string())
944        {
945            Ok(_) => {}
946            Err(e) => {
947                panic!("Could not prepare test case: {e:?}")
948            }
949        }
950        let naive_datetime_100secs_ago = SqlInterface::get_naive_timestamp_from_past(0, 100);
951        let naive_datetime_200secs_ago = SqlInterface::get_naive_timestamp_from_past(0, 200);
952        let naive_datetime_300secs_ago = SqlInterface::get_naive_timestamp_from_past(0, 300);
953        let naive_datetime_400secs_ago = SqlInterface::get_naive_timestamp_from_past(0, 400);
954
955        // insert balling dosing events
956        sql_interface_balling
957            .insert_balling_event(naive_datetime_100secs_ago, 1, 0.5)
958            .expect("Could not insert Balling dosing event for pump #1 into database");
959
960        sql_interface_balling
961            .insert_balling_event(naive_datetime_200secs_ago, 2, 0.5)
962            .expect("Could not insert Balling dosing event for pump #2 into database");
963
964        sql_interface_balling
965            .insert_balling_event(naive_datetime_300secs_ago, 3, 0.5)
966            .expect("Could not insert Balling dosing event for pump #3 into database");
967
968        sql_interface_balling
969            .insert_balling_event(naive_datetime_400secs_ago, 4, 0.5)
970            .expect("Could not insert Balling dosing event for pump #4 into database");
971    }
972
973    #[test]
974    // Test case checks if the countdowns for each pump are calculated correctly using
975    // a dosing log in the database which has entries for all pumps.
976    // Additionally, the test case checks the maximum row limitation for the dosing log.
977    // Test case uses test database #02.
978    pub fn test_balling_initialization_full_dosing_log() {
979        let config: ConfigData = read_balling_config_with_database(2);
980
981        println!("Testing with database {}", config.sql_interface.db_name);
982        let max_rows_balling_set_values = config.sql_interface.max_rows_balling_set_values;
983        let max_rows_balling_dosing_log = config.sql_interface.max_rows_balling_dosing_log;
984        let mut sql_interface = match SqlInterface::new(config.sql_interface) {
985            Ok(c) => c,
986            Err(e) => {
987                panic!("Could not connect to SQL database: {e:?}");
988            }
989        };
990        let mut sql_interface_balling = SqlInterfaceBalling::new(
991            sql_interface.get_connection().unwrap(),
992            max_rows_balling_set_values,
993            max_rows_balling_dosing_log,
994        )
995        .unwrap();
996
997        insert_pump_configurations(
998            &mut sql_interface,
999            &mut sql_interface_balling,
1000            true,
1001            true,
1002            true,
1003            true,
1004        );
1005        insert_past_dosing_events(&mut sql_interface, &mut sql_interface_balling);
1006
1007        // create the test object
1008        let balling = Balling::new(
1009            config.balling,
1010            Duration::from_millis(100),
1011            &mut sql_interface_balling,
1012        )
1013        .unwrap();
1014
1015        // check if the countdown calculation was correct:
1016        // the countdown depends on the pump-specific dosing interval and the timing of the last dosing event.
1017        assert_eq!(balling.durations_until_next_dosing[0].as_secs(), 100);
1018        assert_eq!(balling.durations_until_next_dosing[1].as_secs(), 0);
1019        assert_eq!(balling.durations_until_next_dosing[2].as_secs(), 0);
1020        assert_eq!(balling.durations_until_next_dosing[3].as_secs(), 0);
1021
1022        // dosing log has four entries
1023        let test_result = SqlInterfaceBalling::new(sql_interface.get_connection().unwrap(), 0, 1);
1024        assert!(matches!(
1025            test_result,
1026            Err(SqlInterfaceError::DatabaseBallingDosingLogTableContainsTooManyRows(_, _, _))
1027        ));
1028    }
1029
1030    // Helper function to prepare for test cases which require balling pump configuration.
1031    // This function first truncates the pump configuration table and then
1032    // inserts a balling pump configuration into the database for each of the four pumps.
1033    //
1034    // Arguments:
1035    // - `sql_interface`: A mutable reference to the main `SqlInterface` for database operations,
1036    //                    used here specifically for truncating the balling set values table.
1037    // - `sql_interface_balling`: A mutable reference to the `SqlInterfaceBalling`,
1038    //                            used for inserting balling set values.
1039    // - `pump1`: flag indicating that a pump configuration for pump #1 should be inserted.
1040    // - `pump2`: flag indicating that a pump configuration for pump #2 should be inserted.
1041    // - `pump3`: flag indicating that a pump configuration for pump #3 should be inserted.
1042    // - `pump4`: flag indicating that a pump configuration for pump #4 should be inserted.
1043    //
1044    // All SQL operations are expected to succeed; any failure will cause a panic,
1045    // indicating a test setup issue.
1046    fn insert_pump_configurations(
1047        sql_interface: &mut SqlInterface,
1048        sql_interface_balling: &mut SqlInterfaceBalling,
1049        pump1: bool,
1050        pump2: bool,
1051        pump3: bool,
1052        pump4: bool,
1053    ) {
1054        // empty the Balling pump configuration table
1055        match SqlInterface::truncate_table(sql_interface, "ballingsetvals".to_string()) {
1056            Ok(_) => {}
1057            Err(e) => {
1058                panic!("Could not prepare test case: {e:?}")
1059            }
1060        }
1061        if pump1 {
1062            match sql_interface_balling.insert_pump_configuration(1, 4.0, 1.0, "pump1".to_string())
1063            {
1064                Ok(()) => { /* do nothing */ }
1065                Err(e) => {
1066                    panic!(
1067                        "{}: Could not insert pump 1 configuration ({e:?})",
1068                        module_path!(),
1069                    );
1070                }
1071            }
1072        }
1073        if pump2 {
1074            match sql_interface_balling.insert_pump_configuration(2, 4.0, 1.0, "pump2".to_string())
1075            {
1076                Ok(()) => { /* do nothing */ }
1077                Err(e) => {
1078                    panic!(
1079                        "{}: Could not insert pump 2 configuration ({e:?})",
1080                        module_path!(),
1081                    );
1082                }
1083            }
1084        }
1085        if pump3 {
1086            match sql_interface_balling.insert_pump_configuration(3, 4.0, 1.0, "pump3".to_string())
1087            {
1088                Ok(()) => { /* do nothing */ }
1089                Err(e) => {
1090                    panic!(
1091                        "{}: Could not insert pump 3 configuration ({e:?})",
1092                        module_path!(),
1093                    );
1094                }
1095            }
1096        }
1097        if pump4 {
1098            match sql_interface_balling.insert_pump_configuration(4, 4.0, 1.0, "pump4".to_string())
1099            {
1100                Ok(()) => { /* do nothing */ }
1101                Err(e) => {
1102                    panic!(
1103                        "{}: Could not insert pump 4 configuration ({e:?})",
1104                        module_path!(),
1105                    );
1106                }
1107            }
1108        }
1109    }
1110
1111    // This function executes the test object and test environment in separate threads.
1112    // The test case depends on the configuration and reference values provided for the asserts.
1113    pub fn test_balling_dosing(
1114        test_db_number: u32,
1115        schedule_allows: bool,
1116        execution_count_reference: i32,
1117        duration_millis: u64,
1118        pump1_engagement_count_reference: usize,
1119        pump2_engagement_count_reference: usize,
1120        pump3_engagement_count_reference: usize,
1121        pump4_engagement_count_reference: usize,
1122        schedule_check_interval: u32,
1123        dosing_intervals: Option<(u32, u32, u32, u32)>,
1124    ) {
1125        let sleep_duration_test_environment = Duration::from_millis(duration_millis);
1126        let spin_sleeper_test_environment = SpinSleeper::default();
1127        let mut config: ConfigData = read_balling_config_with_database(test_db_number);
1128
1129        config.balling.pump1_active = pump1_engagement_count_reference > 0;
1130        config.balling.pump2_active = pump2_engagement_count_reference > 0;
1131        config.balling.pump3_active = pump3_engagement_count_reference > 0;
1132        config.balling.pump4_active = pump4_engagement_count_reference > 0;
1133        config.balling.schedule_check_interval = schedule_check_interval;
1134
1135        match dosing_intervals {
1136            Some((
1137                dosing_interval_pump1,
1138                dosing_interval_pump2,
1139                dosing_interval_pump3,
1140                dosing_interval_pump4,
1141            )) => {
1142                config.balling.dosing_interval_pump1 = dosing_interval_pump1;
1143                config.balling.dosing_interval_pump2 = dosing_interval_pump2;
1144                config.balling.dosing_interval_pump3 = dosing_interval_pump3;
1145                config.balling.dosing_interval_pump4 = dosing_interval_pump4;
1146            }
1147            None => { /* do nothing */ }
1148        }
1149
1150        println!("Testing with database {}", config.sql_interface.db_name);
1151        let max_rows_balling_set_values = config.sql_interface.max_rows_balling_set_values;
1152        let max_rows_balling_dosing_log = config.sql_interface.max_rows_balling_dosing_log;
1153        let mut sql_interface = match SqlInterface::new(config.sql_interface) {
1154            Ok(c) => c,
1155            Err(e) => {
1156                panic!("Could not connect to SQL database: {e:?}");
1157            }
1158        };
1159        let mut sql_interface_balling = SqlInterfaceBalling::new(
1160            sql_interface.get_connection().unwrap(),
1161            max_rows_balling_set_values,
1162            max_rows_balling_dosing_log,
1163        )
1164        .unwrap();
1165
1166        insert_pump_configurations(
1167            &mut sql_interface,
1168            &mut sql_interface_balling,
1169            pump1_engagement_count_reference > 0,
1170            pump2_engagement_count_reference > 0,
1171            pump3_engagement_count_reference > 0,
1172            pump4_engagement_count_reference > 0,
1173        );
1174        insert_past_dosing_events(&mut sql_interface, &mut sql_interface_balling);
1175
1176        let mut channels = Channels::new_for_test();
1177
1178        let mutex_device_scheduler = Arc::new(Mutex::new(0));
1179        let mutex_device_scheduler_environment = mutex_device_scheduler.clone();
1180
1181        // create the test object
1182        let mut balling = Balling::new(
1183            config.balling,
1184            Duration::from_millis(100),
1185            &mut sql_interface_balling,
1186        )
1187        .unwrap();
1188
1189        let mut mock_mineral_injection = MockMineralInjection::new();
1190
1191        scope(|scope| {
1192            // thread for schedule check
1193            scope.spawn(move || {
1194                mock_schedule_check(
1195                    &mut channels.schedule_check.tx_schedule_check_to_balling,
1196                    &mut channels.schedule_check.rx_schedule_check_from_balling,
1197                    None,
1198                    schedule_allows,
1199                )
1200            });
1201
1202            // thread for the rest of test environment
1203            scope.spawn(move || {
1204                let execution_count: i32;
1205                spin_sleeper_test_environment.sleep(sleep_duration_test_environment);
1206
1207                // internal scope to for releasing mutex after read
1208                {
1209                    execution_count = *mutex_device_scheduler_environment.lock().unwrap();
1210                }
1211
1212                println!("test_balling_dosing: execution_count={}", execution_count);
1213                assert_eq!(execution_count, execution_count_reference);
1214
1215                // now we can send the quit command
1216                let _ = channels
1217                    .signal_handler
1218                    .send_to_balling(InternalCommand::Quit);
1219                channels.signal_handler.receive_from_balling().unwrap();
1220                println!(
1221                    "test_balling_dosing: Received confirmation of Quit command from test object."
1222                );
1223                let _ = channels
1224                    .signal_handler
1225                    .send_to_balling(InternalCommand::Terminate);
1226            });
1227
1228            // thread for the test object
1229            scope.spawn(move || {
1230                let mut tx_balling_to_schedule_check_for_test_case_finish =
1231                    channels.balling.tx_balling_to_schedule_check.clone();
1232
1233                balling.execute(
1234                    mutex_device_scheduler,
1235                    &mut channels.balling,
1236                    &mut mock_mineral_injection,
1237                    sql_interface_balling,
1238                );
1239
1240                let _ =
1241                    tx_balling_to_schedule_check_for_test_case_finish.send(InternalCommand::Quit);
1242
1243                println!("{}", mock_mineral_injection);
1244
1245                assert_eq!(
1246                    mock_mineral_injection.pump1_engagement_durations.len(),
1247                    pump1_engagement_count_reference
1248                );
1249                assert_eq!(
1250                    mock_mineral_injection.pump2_engagement_durations.len(),
1251                    pump2_engagement_count_reference
1252                );
1253                assert_eq!(
1254                    mock_mineral_injection.pump3_engagement_durations.len(),
1255                    pump3_engagement_count_reference
1256                );
1257                assert_eq!(
1258                    mock_mineral_injection.pump4_engagement_durations.len(),
1259                    pump4_engagement_count_reference
1260                );
1261            });
1262        });
1263    }
1264
1265    #[test]
1266    // The test case executes the test object with the following boundaries:
1267    // - pump #1 is active
1268    // - the schedule checker is allowing operation
1269    // Test case uses test database #03.
1270    pub fn test_balling_dosing_schedule_allows_single_dosing() {
1271        test_balling_dosing(
1272            3,                        // index for determining test the database
1273            true,                     // schedule allows
1274            1,                        // execution count
1275            10000,                    // maximum time in milliseconds for test execution
1276            1,                        // number of dosing events for pump #1
1277            0,                        // number of dosing events for pump #2
1278            0,                        // number of dosing events for pump #3
1279            0,                        // number of dosing events for pump #4
1280            40,                       // schedule check interval
1281            Some((50, 200, 100, 50)), // dosing intervals
1282        );
1283    }
1284
1285    #[test]
1286    // The test case executes the test object with the following boundaries:
1287    // - all pumps are active
1288    // - schedule checker is not allowing operation
1289    // Test case uses test database #04.
1290    pub fn test_balling_dosing_schedule_forbids() {
1291        test_balling_dosing(
1292            4,     // index for determining the test database
1293            false, // schedule does not allow
1294            0,     // execution count
1295            10000, // maximum time in milliseconds for test execution
1296            0,     // number of dosings for pump #1
1297            0,     // number of dosings for pump #2
1298            0,     // number of dosings for pump #3
1299            0,     // number of dosings for pump #4
1300            60,    // schedule check interval
1301            None,
1302        );
1303    }
1304
1305    #[test]
1306    // The test case executes the test object with the following boundaries:
1307    // - all pumps are active
1308    // - the schedule checker is allowing operation
1309    // - dosing intervals are short so that lots of dosing events are triggered
1310    // Test case uses test database #05.
1311    pub fn test_balling_dosing_schedule_allows_multiple_dosings() {
1312        test_balling_dosing(
1313            5,                  // index for determining test database
1314            true,               // schedule allows
1315            28,                 // execution count
1316            34000,              // maximum time in milliseconds for test execution
1317            12,                 // number of dosing events for pump #1
1318            7,                  // number of dosing events for pump #2
1319            5,                  // number of dosing events for pump #3
1320            4,                  // number of dosing events for pump #4
1321            1,                  // schedule check interval
1322            Some((3, 5, 7, 9)), // dosing intervals
1323        );
1324    }
1325
1326    #[test]
1327    // Test case checks if the target dosing duration is calculated correctly
1328    // considering only valid input values.
1329    // The test case does not require any communication with the database.
1330    pub fn test_calc_target_dosing_duration_millis_happy_case() {
1331        let balling_setval = BallingSetVal {
1332            pump_id: 1,
1333            dosing_speed: 5.0,
1334            dosing_volume: 5.0,
1335            label: "test_calc_target_dosing_duration_millis_happy_case".to_string(),
1336        };
1337
1338        assert_eq!(
1339            Balling::calc_target_dosing_duration_millis(&balling_setval),
1340            1000
1341        );
1342    }
1343
1344    #[test]
1345    // Test case checks if the target dosing duration is calculated correctly
1346    // using zero dosing speed as input.
1347    // The test case does not require any communication with the database.
1348    pub fn test_calc_target_dosing_duration_millis_zero_dosing_speed() {
1349        let balling_setval = BallingSetVal {
1350            pump_id: 1,
1351            dosing_speed: 0.0,
1352            dosing_volume: 5.0,
1353            label: "test_calc_target_dosing_duration_millis_zero_dosing_speed".to_string(),
1354        };
1355
1356        assert_eq!(
1357            Balling::calc_target_dosing_duration_millis(&balling_setval),
1358            0
1359        );
1360    }
1361
1362    #[test]
1363    // Test case checks if the target dosing duration is calculated correctly
1364    // using negative dosing speed as input.
1365    // The test case does not require any communication with the database.
1366    pub fn test_calc_target_dosing_duration_millis_negative_dosing_speed() {
1367        let balling_setval = BallingSetVal {
1368            pump_id: 1,
1369            dosing_speed: -5.0,
1370            dosing_volume: 5.0,
1371            label: "test_calc_target_dosing_duration_millis_negative_dosing_speed".to_string(),
1372        };
1373
1374        assert_eq!(
1375            Balling::calc_target_dosing_duration_millis(&balling_setval),
1376            0
1377        );
1378    }
1379
1380    #[test]
1381    // Test case checks if the target dosing duration is calculated correctly
1382    // using a zero dosing volume as input.
1383    // The test case does not require any communication with the database.
1384    pub fn test_calc_target_dosing_duration_millis_zero_dosing_volume() {
1385        let balling_setval = BallingSetVal {
1386            pump_id: 1,
1387            dosing_speed: 5.0,
1388            dosing_volume: 0.0,
1389            label: "test_calc_target_dosing_duration_millis_zero_dosing_volume".to_string(),
1390        };
1391
1392        assert_eq!(
1393            Balling::calc_target_dosing_duration_millis(&balling_setval),
1394            0
1395        );
1396    }
1397
1398    #[test]
1399    // Test case checks if the target dosing duration is calculated correctly
1400    // using negative dosing volume as input.
1401    // The test case does not require any communication with the database.
1402    pub fn test_calc_target_dosing_duration_millis_negative_dosing_volume() {
1403        let balling_setval = BallingSetVal {
1404            pump_id: 1,
1405            dosing_speed: 5.0,
1406            dosing_volume: -5.0,
1407            label: "test_calc_target_dosing_duration_millis_negative_dosing_volume".to_string(),
1408        };
1409
1410        assert_eq!(
1411            Balling::calc_target_dosing_duration_millis(&balling_setval),
1412            0
1413        );
1414    }
1415
1416    #[test]
1417    // Test case checks if the actual dosing duration is calculated correctly
1418    // considering only valid input values.
1419    // The test case does not require any communication with the database.
1420    pub fn test_calc_actual_dosing_volume_happy_case() {
1421        let balling_setval = BallingSetVal {
1422            pump_id: 1,
1423            dosing_speed: 5.0,
1424            dosing_volume: 5.0,
1425            label: "test_calc_actual_dosing_volume_happy_case".to_string(),
1426        };
1427
1428        assert_eq!(
1429            Balling::calc_actual_dosing_volume(&balling_setval, 1000),
1430            5.0
1431        );
1432    }
1433
1434    #[test]
1435    // Test case checks if the actual dosing duration is calculated correctly
1436    // using zero dosing speed as input.
1437    // The test case does not require any communication with the database.
1438    pub fn test_calc_actual_dosing_volume_zero_dosing_speed() {
1439        let balling_setval = BallingSetVal {
1440            pump_id: 1,
1441            dosing_speed: 0.0,
1442            dosing_volume: 5.0,
1443            label: "test_calc_actual_dosing_volume_zero_dosing_speed".to_string(),
1444        };
1445
1446        assert_eq!(
1447            Balling::calc_actual_dosing_volume(&balling_setval, 1000),
1448            0.0
1449        );
1450    }
1451
1452    #[test]
1453    // Test case checks if the actual dosing duration is calculated correctly
1454    // using negative dosing speed as input.
1455    // The test case does not require any communication with the database.
1456    pub fn test_calc_actual_dosing_volume_negative_dosing_speed() {
1457        let balling_setval = BallingSetVal {
1458            pump_id: 1,
1459            dosing_speed: 0.0,
1460            dosing_volume: 5.0,
1461            label: "test_calc_actual_dosing_volume_negative_dosing_speed".to_string(),
1462        };
1463
1464        assert_eq!(
1465            Balling::calc_actual_dosing_volume(&balling_setval, 1000),
1466            0.0
1467        );
1468    }
1469
1470    #[test]
1471    // Test case checks if the actual dosing duration is calculated correctly
1472    // using zero actual dosing duration as input.
1473    // The test case does not require any communication with the database.
1474    pub fn test_calc_actual_dosing_volume_zero_actual_dosing_duration() {
1475        let balling_setval = BallingSetVal {
1476            pump_id: 1,
1477            dosing_speed: 5.0,
1478            dosing_volume: 5.0,
1479            label: "test_calc_actual_dosing_volume_zero_actual_dosing_duration".to_string(),
1480        };
1481
1482        assert_eq!(Balling::calc_actual_dosing_volume(&balling_setval, 0), 0.0);
1483    }
1484
    #[test]
    // Test case runs Balling dosing control and triggers inhibition by sending the stop message.
    // After verifying that Balling dosing is stopped, the test case sends the start message
    // and verifies that Balling dosing control resumes operation.
    //
    // Three scoped threads cooperate:
    // 1. a mock schedule checker,
    // 2. a driver thread that sends Stop/Start/Quit/Terminate and checks the counter guarded
    //    by `mutex_device_scheduler` (read by this test as the number of actuations),
    // 3. the thread executing the test object and checking the recorded pump engagements.
    //
    // The assertions depend on wall-clock timing (sleep durations vs. dosing interval).
    // Test case uses test database #06.
    pub fn test_messaging_stops_starts_balling_dosing() {
        let sleep_duration_100_millis = Duration::from_millis(100);
        let sleep_duration_7_secs = Duration::from_secs(7);
        let sleep_duration_2_secs = Duration::from_secs(2);
        let spin_sleeper = SpinSleeper::default();

        let mut config: ConfigData = read_balling_config_with_database(6);

        // pumps #2 to #4 are deactivated; pump #1 keeps the state from the configuration file
        config.balling.pump2_active = false;
        config.balling.pump3_active = false;
        config.balling.pump4_active = false;
        // short intervals so that dosing events occur within the test's waiting periods
        config.balling.schedule_check_interval = 1;
        config.balling.dosing_interval_pump1 = 5;

        // read the limits before `config.sql_interface` is moved into the constructor
        let max_rows_balling_set_values = config.sql_interface.max_rows_balling_set_values;
        let max_rows_balling_dosing_log = config.sql_interface.max_rows_balling_dosing_log;
        let mut sql_interface = match SqlInterface::new(config.sql_interface) {
            Ok(c) => c,
            Err(e) => {
                panic!("Could not connect to SQL database: {e:?}");
            }
        };
        // empty the Balling dosing log table before creating the Balling SQL interface
        match SqlInterface::truncate_table(
            &mut sql_interface,
            SQL_TABLE_BALLING_DOSING_LOG.to_string(),
        ) {
            Ok(_) => {}
            Err(e) => {
                panic!("Could not prepare test case: {e:?}")
            }
        }

        let mut sql_interface_balling = SqlInterfaceBalling::new(
            sql_interface.get_connection().unwrap(),
            max_rows_balling_set_values,
            max_rows_balling_dosing_log,
        )
        .unwrap();

        // empty the Balling dosing log table
        // NOTE(review): the same table was already truncated a few statements above;
        // this second truncation appears redundant - confirm before removing.
        match SqlInterface::truncate_table(
            &mut sql_interface,
            SQL_TABLE_BALLING_DOSING_LOG.to_string(),
        ) {
            Ok(_) => {}
            Err(e) => {
                panic!("Could not prepare test case: {e:?}")
            }
        }

        // insert a valid pump configuration for pump #1
        insert_pump_configurations(
            &mut sql_interface,
            &mut sql_interface_balling,
            true,
            false,
            false,
            false,
        );

        // create test objects
        let mut balling = Balling::new(
            config.balling,
            Duration::from_millis(100),
            &mut sql_interface_balling,
        )
        .unwrap();

        let mut channels = Channels::new_for_test();

        // Sender clone used at the very end to tell the mock schedule check to quit.
        let mut tx_balling_to_schedule_check_for_test_case_finish =
            channels.balling.tx_balling_to_schedule_check.clone();

        // Shared counter: handed to the test object and read by the driver thread.
        let mutex_device_scheduler = Arc::new(Mutex::new(0));

        let mut mock_mineral_injection = MockMineralInjection::new();

        scope(|scope| {
            // thread for mock schedule check: runs until it receives the Quit command
            scope.spawn(move || {
                mock_schedule_check(
                    &mut channels.schedule_check.tx_schedule_check_to_balling,
                    &mut channels.schedule_check.rx_schedule_check_from_balling,
                    None,
                    true,
                )
            });

            let mutex_device_scheduler_balling_test_environment = mutex_device_scheduler.clone();

            // thread IPC messaging - this scope controls execution of the test case
            scope.spawn(move || {
                // wait for 2 seconds (20 x 100 ms) to make sure the test object can receive
                // the message
                for _ in 0..20 {
                    spin_sleeper.sleep(sleep_duration_100_millis);
                }

                let actuation_count_0 = *mutex_device_scheduler_balling_test_environment
                    .lock()
                    .unwrap();
                // check the initial amount of actuation
                assert_eq!(actuation_count_0, 1);

                // sending the message requesting stop of Balling dosing control
                match channels
                    .messaging
                    .tx_messaging_to_balling
                    .send(InternalCommand::Stop)
                {
                    Ok(()) => { /* do nothing */ }
                    Err(e) => {
                        panic!(
                            "{}: error when sending stop command to test object ({e:?})",
                            module_path!()
                        );
                    }
                }

                // Wait for 7 seconds. If the test object does not follow the stop request,
                // then dosings will happen in this period.
                spin_sleeper.sleep(sleep_duration_7_secs);

                let actuation_count_1 = *mutex_device_scheduler_balling_test_environment
                    .lock()
                    .unwrap();
                // check the amount of actuation (after stop request): must be unchanged
                assert_eq!(actuation_count_1, 1);

                // sending the message requesting restart of Balling dosing control
                match channels
                    .messaging
                    .tx_messaging_to_balling
                    .send(InternalCommand::Start)
                {
                    Ok(()) => { /* do nothing */ }
                    Err(e) => {
                        panic!(
                            "{}: error when sending start command to test object ({e:?})",
                            module_path!()
                        );
                    }
                }

                // Wait for 2 seconds. If the test object follows the start request,
                // then dosings will happen in this period.
                spin_sleeper.sleep(sleep_duration_2_secs);

                let actuation_count_2 = *mutex_device_scheduler_balling_test_environment
                    .lock()
                    .unwrap();
                // check the amount of actuation (after start request)
                assert_eq!(actuation_count_2, 3);

                // requesting Balling dosing control to quit and waiting for its confirmation
                let _ = channels
                    .signal_handler
                    .send_to_balling(InternalCommand::Quit);
                channels.signal_handler.receive_from_balling().unwrap();

                let actuation_count_3 = *mutex_device_scheduler_balling_test_environment
                    .lock()
                    .unwrap();
                // check the amount of actuation (after Quit command): no further actuation
                assert_eq!(actuation_count_3, 3);
                let _ = channels
                    .signal_handler
                    .send_to_balling(InternalCommand::Terminate);
            });

            // short delay before the test object thread is spawned
            spin_sleeper.sleep(sleep_duration_100_millis);

            // thread for the test object and assertions
            scope.spawn(move || {
                balling.execute(
                    mutex_device_scheduler,
                    &mut channels.balling,
                    &mut mock_mineral_injection,
                    sql_interface_balling,
                );
                println!("MockMineralInjection:\n{}", mock_mineral_injection);

                // only pump #1 may have been engaged, exactly three times
                assert_eq!(mock_mineral_injection.pump1_engagement_durations.len(), 3);
                assert_eq!(mock_mineral_injection.pump2_engagement_durations.len(), 0);
                assert_eq!(mock_mineral_injection.pump3_engagement_durations.len(), 0);
                assert_eq!(mock_mineral_injection.pump4_engagement_durations.len(), 0);

                // NOTE(review): only the two most recently recorded durations are checked;
                // the first of the three recorded durations is not verified.
                assert_eq!(
                    mock_mineral_injection
                        .pump1_engagement_durations
                        .pop()
                        .unwrap(),
                    250
                );
                assert_eq!(
                    mock_mineral_injection
                        .pump1_engagement_durations
                        .pop()
                        .unwrap(),
                    250
                );

                // tell the mock schedule check to quit so that its thread can finish
                let _ =
                    tx_balling_to_schedule_check_for_test_case_finish.send(InternalCommand::Quit);
            });

            // NOTE(review): this message is printed right after the threads are spawned,
            // i.e., before their assertions have actually run.
            println!(
                "* [Messaging] checking if messaging can stop and start balling dosing succeeded."
            );
        });
    }
1702
1703    #[test]
1704    // Test case checks if during instantiation of the struct, the implementation checks
1705    // if there is a database entry for a pump that is configured as active.
1706    // The code shall return an error of type BallingError if there is no matching database entry.
1707    // Test case uses test database #48.
1708    pub fn test_check_database_vs_config() {
1709        let mut config: ConfigData = read_balling_config_with_database(48);
1710
1711        config.balling.pump1_active = true;
1712        config.balling.pump2_active = false;
1713        config.balling.pump3_active = false;
1714        config.balling.pump4_active = false;
1715
1716        let max_rows_balling_set_values = config.sql_interface.max_rows_balling_set_values;
1717        let max_rows_balling_dosing_log = config.sql_interface.max_rows_balling_dosing_log;
1718        let mut sql_interface = match SqlInterface::new(config.sql_interface) {
1719            Ok(c) => c,
1720            Err(e) => {
1721                panic!("Could not connect to SQL database: {e:?}");
1722            }
1723        };
1724        match SqlInterface::truncate_table(
1725            &mut sql_interface,
1726            SQL_TABLE_BALLING_DOSING_LOG.to_string(),
1727        ) {
1728            Ok(_) => {}
1729            Err(e) => {
1730                panic!("Could not prepare test case: {e:?}")
1731            }
1732        }
1733        let mut sql_interface_balling = SqlInterfaceBalling::new(
1734            sql_interface.get_connection().unwrap(),
1735            max_rows_balling_set_values,
1736            max_rows_balling_dosing_log,
1737        )
1738        .unwrap();
1739
1740        // insert a valid pump configuration for pump #1
1741        insert_pump_configurations(
1742            &mut sql_interface,
1743            &mut sql_interface_balling,
1744            false,
1745            false,
1746            false,
1747            true, // a different pump is configured as active
1748        );
1749
1750        // create test objects
1751        let result = Balling::new(
1752            config.balling,
1753            Duration::from_millis(100),
1754            &mut sql_interface_balling,
1755        );
1756        assert!(matches!(
1757            result,
1758            Err(BallingError::SetValueRetrievalError {
1759                location: _,
1760                pump_id: _,
1761                source: _
1762            })
1763        ));
1764    }
1765
    #[test]
    // The test case executes the test object using dosing requests via the external interface.
    //
    // All four pumps are active, but their regular dosing intervals are set to one day so
    // that no scheduled dosing is expected during the test. The environment thread then
    // sends `Execute` commands for pumps 2, 4, 3 and 1 (in this order) and the test checks
    // - that the counter guarded by `mutex_device_scheduler` goes from 0 to 4, and
    // - that the mock recorded the pumps in exactly the requested order.
    // Test case uses test database #49.
    pub fn test_balling_dosing_external_requests() {
        let test_db_number = 49;
        let schedule_allows = true;
        let execution_count_reference: i32 = 4;
        let duration_millis = 5000;
        let sleep_duration_test_environment = Duration::from_millis(duration_millis);
        let spin_sleeper_test_environment = SpinSleeper::default();
        let mut config: ConfigData = read_balling_config_with_database(test_db_number);

        config.balling.pump1_active = true;
        config.balling.dosing_interval_pump1 = 3600 * 24; // long interval to avoid regular dosing
        config.balling.pump2_active = true;
        config.balling.dosing_interval_pump2 = 3600 * 24; // long interval to avoid regular dosing
        config.balling.pump3_active = true;
        config.balling.dosing_interval_pump3 = 3600 * 24; // long interval to avoid regular dosing
        config.balling.pump4_active = true;
        config.balling.dosing_interval_pump4 = 3600 * 24; // long interval to avoid regular dosing
        config.balling.schedule_check_interval = 100;

        println!("Testing with database {}", config.sql_interface.db_name);
        // read the limits before `config.sql_interface` is moved into the constructor
        let max_rows_balling_set_values = config.sql_interface.max_rows_balling_set_values;
        let max_rows_balling_dosing_log = config.sql_interface.max_rows_balling_dosing_log;
        let mut sql_interface = match SqlInterface::new(config.sql_interface) {
            Ok(c) => c,
            Err(e) => {
                panic!("Could not connect to SQL database: {e:?}");
            }
        };
        // empty the Balling dosing log table
        match SqlInterface::truncate_table(
            &mut sql_interface,
            SQL_TABLE_BALLING_DOSING_LOG.to_string(),
        ) {
            Ok(_) => {}
            Err(e) => {
                panic!("Could not prepare test case: {e:?}")
            }
        }
        let mut sql_interface_balling = SqlInterfaceBalling::new(
            sql_interface.get_connection().unwrap(),
            max_rows_balling_set_values,
            max_rows_balling_dosing_log,
        )
        .unwrap();

        // insert pump configurations for all four pumps and a history of past dosing events
        insert_pump_configurations(
            &mut sql_interface,
            &mut sql_interface_balling,
            true,
            true,
            true,
            true,
        );
        insert_past_dosing_events(&mut sql_interface, &mut sql_interface_balling);

        let mut channels = Channels::new_for_test();

        // Shared counter: one handle for the test object, one for the test environment.
        let mutex_device_scheduler = Arc::new(Mutex::new(0));
        let mutex_device_scheduler_environment = mutex_device_scheduler.clone();

        // create the test object
        let mut balling = Balling::new(
            config.balling,
            Duration::from_secs(1000),
            &mut sql_interface_balling,
        )
        .unwrap();

        let mut mock_mineral_injection = MockMineralInjection::new();

        scope(|scope| {
            // thread for schedule check
            scope.spawn(move || {
                mock_schedule_check(
                    &mut channels.schedule_check.tx_schedule_check_to_balling,
                    &mut channels.schedule_check.rx_schedule_check_from_balling,
                    None,
                    schedule_allows,
                )
            });

            // thread for the rest of test environment
            scope.spawn(move || {
                let execution_count1: i32;
                // first waiting period: no dosing request has been sent yet
                spin_sleeper_test_environment.sleep(sleep_duration_test_environment);
                // inner block around the read of the shared counter
                {
                    execution_count1 = *mutex_device_scheduler_environment.lock().unwrap();
                }
                println!("test_balling_dosing: execution_count={}", execution_count1);
                // without any request, no execution must have taken place
                assert_eq!(execution_count1, 0);

                // send the dosing requests to Balling (pump order: 2, 4, 3, 1)
                channels
                    .messaging
                    .tx_messaging_to_balling
                    .send(InternalCommand::Execute(2))
                    .unwrap();
                channels
                    .messaging
                    .tx_messaging_to_balling
                    .send(InternalCommand::Execute(4))
                    .unwrap();
                channels
                    .messaging
                    .tx_messaging_to_balling
                    .send(InternalCommand::Execute(3))
                    .unwrap();
                channels
                    .messaging
                    .tx_messaging_to_balling
                    .send(InternalCommand::Execute(1))
                    .unwrap();

                let execution_count2: i32;
                // second waiting period: gives the test object time to serve all requests
                spin_sleeper_test_environment.sleep(sleep_duration_test_environment);
                // inner block around the read of the shared counter
                {
                    execution_count2 = *mutex_device_scheduler_environment.lock().unwrap();
                }
                println!("test_balling_dosing: execution_count={}", execution_count2);
                // one execution per request is expected
                assert_eq!(execution_count2, execution_count_reference);

                // now we can send the quit command
                let _ = channels
                    .signal_handler
                    .send_to_balling(InternalCommand::Quit);
                channels.signal_handler.receive_from_balling().unwrap();
                println!(
                    "test_balling_dosing: Received confirmation of Quit command from test object."
                );
                let _ = channels
                    .signal_handler
                    .send_to_balling(InternalCommand::Terminate);
            });

            // thread for the test object
            scope.spawn(move || {
                // sender clone used after `execute` returns to stop the mock schedule check
                let mut tx_balling_to_schedule_check_for_test_case_finish =
                    channels.balling.tx_balling_to_schedule_check.clone();

                balling.execute(
                    mutex_device_scheduler,
                    &mut channels.balling,
                    &mut mock_mineral_injection,
                    sql_interface_balling,
                );

                // tell the mock schedule check to quit so that its thread can finish
                let _ =
                    tx_balling_to_schedule_check_for_test_case_finish.send(InternalCommand::Quit);

                println!("{}", mock_mineral_injection);

                // the pumps must have been engaged in the order of the Execute requests
                let reference_pump_protocol = vec![
                    PeristalticPump2,
                    PeristalticPump4,
                    PeristalticPump3,
                    PeristalticPump1,
                ];

                assert_eq!(
                    mock_mineral_injection.pump_protocol,
                    reference_pump_protocol
                );
            });
        });
    }
1934
1935    #[test]
1936    // This test case checks if the code checks the dosing interval against the check interval.
1937    // It does not operate on any data from SQL database.
1938    pub fn test_balling_initialization_invalid_dosing_check_interval() {
1939        let mut config: ConfigData = read_balling_config();
1940
1941        config.balling.dosing_interval_pump1 = 100;
1942        config.balling.dosing_interval_pump2 = 100;
1943        config.balling.dosing_interval_pump3 = 100;
1944        config.balling.dosing_interval_pump4 = 100;
1945        config.balling.schedule_check_interval = 200;
1946
1947        let max_rows_balling_set_values = config.sql_interface.max_rows_balling_set_values;
1948        let max_rows_balling_dosing_log = config.sql_interface.max_rows_balling_dosing_log;
1949        let mut sql_interface = match SqlInterface::new(config.sql_interface) {
1950            Ok(c) => c,
1951            Err(e) => {
1952                panic!("Could not connect to SQL database: {e:?}");
1953            }
1954        };
1955        match SqlInterface::truncate_table(
1956            &mut sql_interface,
1957            SQL_TABLE_BALLING_DOSING_LOG.to_string(),
1958        ) {
1959            Ok(_) => {}
1960            Err(e) => {
1961                panic!("Could not prepare test case: {e:?}")
1962            }
1963        }
1964        let mut sql_interface_balling = SqlInterfaceBalling::new(
1965            sql_interface.get_connection().unwrap(),
1966            max_rows_balling_set_values,
1967            max_rows_balling_dosing_log,
1968        )
1969        .unwrap();
1970
1971        // create the test object
1972        let test_result = Balling::new(
1973            config.balling,
1974            Duration::from_secs(1000),
1975            &mut sql_interface_balling,
1976        );
1977        match test_result {
1978            Ok(balling_controller) => {
1979                panic!(
1980                    "Successfully initialized Balling dosing, although error expected: {}",
1981                    balling_controller
1982                );
1983            }
1984            Err(e) => {
1985                println!("Balling initialization correctly returned error: {e:?}. Checking if error type is correct...");
1986                assert!(matches!(
1987                    e,
1988                    BallingError::DosingIntervalShorterThanCheckInterval(_, _, _, _)
1989                ));
1990            }
1991        }
1992    }
1993
1994    #[test]
1995    // This test case checks that `Balling::new` returns an error when the
1996    // schedule_check_interval is set to zero in the configuration.
1997    pub fn test_balling_initialization_zero_check_interval() {
1998        let mut config: ConfigData = read_balling_config();
1999        config.balling.schedule_check_interval = 0;
2000
2001        let max_rows_balling_set_values = config.sql_interface.max_rows_balling_set_values;
2002        let max_rows_balling_dosing_log = config.sql_interface.max_rows_balling_dosing_log;
2003        let mut sql_interface = SqlInterface::new(config.sql_interface).unwrap();
2004        match SqlInterface::truncate_table(
2005            &mut sql_interface,
2006            SQL_TABLE_BALLING_DOSING_LOG.to_string(),
2007        ) {
2008            Ok(_) => {}
2009            Err(e) => {
2010                panic!("Could not prepare test case: {e:?}")
2011            }
2012        }
2013        let mut sql_interface_balling = SqlInterfaceBalling::new(
2014            sql_interface.get_connection().unwrap(),
2015            max_rows_balling_set_values,
2016            max_rows_balling_dosing_log,
2017        )
2018        .unwrap();
2019
2020        // create the test object
2021        let test_result = Balling::new(
2022            config.balling,
2023            Duration::from_secs(1000),
2024            &mut sql_interface_balling,
2025        );
2026
2027        match test_result {
2028            Ok(balling_controller) => {
2029                panic!(
2030                    "Successfully initialized Balling dosing, although an error was expected: {}",
2031                    balling_controller
2032                );
2033            }
2034            Err(e) => {
2035                println!("Balling initialization correctly returned error: {e:?}. Checking if error type is correct...");
2036                assert!(matches!(e, BallingError::ScheduleCheckIntervalZero(_)));
2037            }
2038        }
2039    }
2040}