1use mysql::prelude::*;
49use mysql::*;
50
51use crate::database::sql_interface::SqlInterface;
52use crate::database::sql_query_strings::SQL_QUERY_CHECK_DATA_COUNT;
53use crate::database::{sql_interface_error::SqlInterfaceError, sql_query_strings};
54use crate::recorder::recorder_data_frame::RecorderDataFrame;
55
56#[derive(Debug)]
58pub struct SqlInterfaceData {
59 pub conn: PooledConn,
61}
62
63impl SqlInterfaceData {
64 pub fn new(
84 mut conn: PooledConn,
85 max_rows_data: u64,
86 ) -> Result<SqlInterfaceData, SqlInterfaceError> {
87 let count_rows =
88 SqlInterface::get_single_integer_from_database(&mut conn, SQL_QUERY_CHECK_DATA_COUNT)
89 .map_err(|e| SqlInterfaceError::DatabaseCheckDataFailure {
90 location: module_path!().to_string(),
91 source: Box::new(e),
92 })?;
93
94 if count_rows < 0 {
96 return Err(SqlInterfaceError::DatabaseDataTableNegativeValue(
97 module_path!().to_string(),
98 count_rows,
99 ));
100 }
101
102 if max_rows_data > 0 {
103 if count_rows as u64 > max_rows_data {
105 return Err(SqlInterfaceError::DatabaseDataTableContainsTooManyRows(
106 module_path!().to_string(),
107 count_rows as u64,
108 max_rows_data,
109 ));
110 }
111 }
112 Ok(SqlInterfaceData { conn })
113 }
114
115 pub fn write_data_frame_to_database(
140 &mut self,
141 data: &RecorderDataFrame,
142 ) -> Result<(), SqlInterfaceError> {
143 let params = params! {
144 "timestamp" => data.timestamp,
145 "temp" => data.water_temperature,
146 "temp_f" => data.water_temperature_filtered,
147 "ph" => data.ph,
148 "ph_f" => data.ph_filtered,
149 "cond" => data.conductivity,
150 "cond_f" => data.conductivity_filtered,
151 "cond_c" => data.conductivity_compensated,
152 "refill" => data.refill_in_progress,
153 "tank_pos" => data.tank_level_switch_position,
154 "tank_inv" => data.tank_level_switch_invalid,
155 "tank_stab" => data.tank_level_switch_position_stabilized,
156 "vent" => data.surface_ventilation_status,
157 "amb_temp" => data.ambient_temperature,
158 "amb_hum" => data.ambient_humidity,
159 "heater" => data.heater_status,
160 };
161
162 self.conn
164 .exec_drop::<_, _>(sql_query_strings::SQL_QUERY_WRITE_DATA, params)
165 .map_err(|e| SqlInterfaceError::InsertDataFrameFailure {
166 location: module_path!().to_string(),
167 query: sql_query_strings::SQL_QUERY_WRITE_DATA.to_string(),
168 source: e,
169 })
170 }
171}
172
173#[cfg(test)]
174pub mod tests {
175 use crate::database::sql_interface_data::SqlInterfaceData;
176 use crate::database::sql_query_strings::SQL_TABLE_DATA;
177 use crate::database::{
178 sql_interface::SqlInterface, sql_interface_error::SqlInterfaceError, sql_query_strings,
179 };
180 use crate::recorder::recorder_data_frame::RecorderDataFrame;
181 use crate::utilities::config::{read_config_file_with_test_database, ConfigData};
182 use chrono::NaiveDateTime;
183 use mysql::{prelude::Queryable, PooledConn, Row, Value};
184 use spin_sleep::SpinSleeper;
185 use std::time::Duration;
186
187 #[test]
188 fn test_sql_interface_data_new() {
195 let config: ConfigData = read_config_file_with_test_database(
197 "/config/aquarium_control_test_generic.toml".to_string(),
198 58, );
200 println!("Testing with database {}", config.sql_interface.db_name);
201 let mut sql_interface: SqlInterface = SqlInterface::new(config.sql_interface)
202 .expect("Initialization of SQL interface for test failed.");
203
204 println!("* Testing new() with valid row count (Happy Path)...");
206 SqlInterface::truncate_table(&mut sql_interface, SQL_TABLE_DATA.to_string()).unwrap();
207
208 let mut data_db = SqlInterfaceData {
210 conn: sql_interface.get_connection().unwrap(),
211 };
212 let data_frame = RecorderDataFrame {
213 timestamp: chrono::Local::now().naive_local(),
214 ..Default::default()
215 };
216 data_db.write_data_frame_to_database(&data_frame).unwrap();
217
218 let result = SqlInterfaceData::new(sql_interface.get_connection().unwrap(), 10);
220 assert!(
221 result.is_ok(),
222 "Expected new() to succeed on happy path, but it failed: {:?}",
223 result.err()
224 );
225 println!("* Succeeded: Happy path initialization is successful.");
226
227 println!("* Testing new() with too many rows in the data table...");
229 let mut data_db = SqlInterfaceData {
231 conn: sql_interface.get_connection().unwrap(),
232 };
233 let data_frame_2 = RecorderDataFrame {
234 timestamp: (chrono::Local::now() + chrono::Duration::seconds(1)).naive_local(),
235 ..Default::default()
236 };
237 data_db.write_data_frame_to_database(&data_frame_2).unwrap();
238
239 let result = SqlInterfaceData::new(sql_interface.get_connection().unwrap(), 1);
241 assert!(
242 matches!(
243 result,
244 Err(SqlInterfaceError::DatabaseDataTableContainsTooManyRows(
245 _,
246 _,
247 _
248 ))
249 ),
250 "Expected row limit error, but got {:?}",
251 result
252 );
253 println!("* Succeeded: Initialization fails if data table exceeds row limit.");
254
255 println!("* Testing new() with deactivated row limit check (limit = 0)...");
257 let result = SqlInterfaceData::new(sql_interface.get_connection().unwrap(), 0); assert!(
260 result.is_ok(),
261 "Expected new() to succeed with deactivated check, but it failed: {:?}",
262 result.err()
263 );
264 println!("* Succeeded: Initialization passes when row limit check is deactivated.");
265 }
266
267 pub struct SqlDataFrame1 {
272 pub timestamp: String,
273 pub water_temperature: f32,
274 pub water_temperature_filtered: f32,
275 pub ph: f32,
276 pub ph_filtered: f32,
277 pub conductivity: f32,
278 pub conductivity_filtered: f32,
279 pub conductivity_compensated: f32,
280 }
281
282 pub struct SqlDataFrame2 {
287 pub refill_in_progress: bool,
288 pub tank_level_switch_position: bool,
289 pub tank_level_switch_invalid: bool,
290 pub tank_level_switch_position_stabilized: bool,
291 pub surface_ventilation_status: bool,
292 pub ambient_temperature: f32,
293 pub ambient_humidity: f32,
294 pub heater_status: bool,
295 }
296
297 pub fn get_single_dataframe_from_database(
322 conn: &mut PooledConn,
323 timestamp: NaiveDateTime,
324 ) -> Result<RecorderDataFrame, SqlInterfaceError> {
325 let timestamp_string = timestamp.format("%Y-%m-%d %H:%M:%S").to_string();
326 let sql_query_string_1 = str::replace(
328 sql_query_strings::SQL_QUERY_READ_DATA_FRAME_1,
329 "#timestamp",
330 ×tamp_string,
331 );
332 let data_frame_1_array: Vec<SqlDataFrame1> = match conn.query_map(
333 sql_query_string_1.clone(),
334 |(
335 timestamp,
336 water_temp,
337 water_temp_filt,
338 ph,
339 ph_filt,
340 conduc,
341 conduc_filt,
342 conduc_comp,
343 )| SqlDataFrame1 {
344 timestamp,
345 water_temperature: water_temp,
346 water_temperature_filtered: water_temp_filt,
347 ph,
348 ph_filtered: ph_filt,
349 conductivity: conduc,
350 conductivity_filtered: conduc_filt,
351 conductivity_compensated: conduc_comp,
352 },
353 ) {
354 Ok(c) => c,
355 Err(e) => {
356 panic!("Could not get data frame part 1 from SQL database: {e:?} using query string {}", sql_query_string_1);
357 }
358 };
359 let sql_query_string_2 = str::replace(
360 sql_query_strings::SQL_QUERY_READ_DATA_FRAME_2,
361 "#timestamp",
362 ×tamp_string,
363 );
364 let data_frame_2_array: Vec<SqlDataFrame2> = match conn.query_map(
365 sql_query_string_2.clone(),
366 |(
367 refill_in_progress,
368 tank_level_switch_position,
369 tank_level_switch_invalid,
370 tank_level_switch_position_stabilized,
371 surface_ventilation_status,
372 ambient_temperature,
373 ambient_humidity,
374 heater_status,
375 )| SqlDataFrame2 {
376 refill_in_progress,
377 tank_level_switch_position,
378 tank_level_switch_invalid,
379 tank_level_switch_position_stabilized,
380 surface_ventilation_status,
381 ambient_temperature,
382 ambient_humidity,
383 heater_status,
384 },
385 ) {
386 Ok(c) => c,
387 Err(e) => {
388 panic!("Could not get data frame part 2 from SQL database: {e:?} using query string {}", sql_query_string_2);
389 }
390 };
391
392 if (data_frame_1_array.len() == 0) || (data_frame_2_array.len() == 0) {
394 return Err(SqlInterfaceError::SingleDataFrameRequestEmptyResponse(
395 module_path!().to_string(),
396 sql_query_string_1,
397 sql_query_string_2,
398 ));
399 }
400 if (data_frame_2_array.len() > 1) || (data_frame_2_array.len() > 1) {
402 return Err(SqlInterfaceError::SingleDataFrameRequestNoSingleResponse(
403 module_path!().to_string(),
404 sql_query_string_1,
405 sql_query_string_2,
406 ));
407 }
408 let data_frame_1 = &data_frame_1_array[0];
409 let data_frame_2 = &data_frame_2_array[0];
410
411 Ok(RecorderDataFrame {
412 timestamp: NaiveDateTime::parse_from_str(&data_frame_1.timestamp, "%Y-%m-%d %H:%M:%S")
413 .expect("Failed to convert string to timestamp of type NaiveDateTime."),
414 water_temperature: Some(data_frame_1.water_temperature),
415 water_temperature_filtered: Some(data_frame_1.water_temperature_filtered),
416 ph: Some(data_frame_1.ph),
417 ph_filtered: Some(data_frame_1.ph_filtered),
418 conductivity: Some(data_frame_1.conductivity),
419 conductivity_filtered: Some(data_frame_1.conductivity_filtered),
420 conductivity_compensated: Some(data_frame_1.conductivity_compensated),
421 refill_in_progress: Some(data_frame_2.refill_in_progress),
422 tank_level_switch_position: Some(data_frame_2.tank_level_switch_position),
423 tank_level_switch_invalid: Some(data_frame_2.tank_level_switch_invalid),
424 tank_level_switch_position_stabilized: Some(
425 data_frame_2.tank_level_switch_position_stabilized,
426 ),
427 surface_ventilation_status: Some(data_frame_2.surface_ventilation_status),
428 ambient_temperature: Some(data_frame_2.ambient_temperature),
429 ambient_humidity: Some(data_frame_2.ambient_humidity),
430 heater_status: Some(data_frame_2.heater_status),
431 })
432 }
433
434 pub fn check_database_row_if_all_null(conn: &mut PooledConn, timestamp: NaiveDateTime) {
441 let timestamp_string = timestamp.format("%Y-%m-%d %H:%M:%S").to_string();
442 let sql_query_string = str::replace(
444 sql_query_strings::SQL_QUERY_READ_NULL_DATA_FRAME,
445 "#timestamp",
446 ×tamp_string,
447 );
448 let row: Row = conn
449 .query_first(sql_query_string)
450 .unwrap()
451 .expect("Querying single row of NULL values failed.");
452
453 for cell in row.columns_ref() {
454 let cell_value = &row[cell.name_str().as_ref()];
455 match cell_value {
456 _val @ Value::NULL => {
457 println!("Identified NULL value.");
458 }
459 _ => {
460 panic!("Identified non-=NULL value");
461 }
462 }
463 }
464 }
465
466 #[test]
467 pub fn test_sql_interface_data_write_data_frame() {
471 let config: ConfigData = read_config_file_with_test_database(
472 "/config/aquarium_control_test_generic.toml".to_string(),
473 32,
474 );
475 println!("Testing with database {}", config.sql_interface.db_name);
476 let max_rows_data = config.sql_interface.max_rows_data;
477 let mut sql_interface: SqlInterface = SqlInterface::new(config.sql_interface)
478 .expect("Initialization of SQL interface for test failed.");
479 let mut sql_interface_data =
480 SqlInterfaceData::new(sql_interface.get_connection().unwrap(), max_rows_data).unwrap();
481
482 match SqlInterface::truncate_table(&mut sql_interface, SQL_TABLE_DATA.to_string()) {
484 Ok(_) => {}
485 Err(e) => panic!("Could not prepare test case: {e:?}"),
486 }
487 let current_timestamp = match SqlInterface::get_current_timestamp(&mut sql_interface.conn) {
488 Ok(c) => c,
489 Err(e) => {
490 panic!("Could not get current timestamp from database: {e:?}");
491 }
492 };
493 let dataframe_reference_1 = RecorderDataFrame {
494 timestamp: current_timestamp,
495 water_temperature: Some(25.5),
496 water_temperature_filtered: Some(25.0),
497 ph: Some(7.0),
498 ph_filtered: Some(7.5),
499 conductivity: Some(50000.0),
500 conductivity_filtered: Some(51000.0),
501 conductivity_compensated: Some(52000.0),
502 refill_in_progress: Some(false),
503 tank_level_switch_position: Some(false),
504 tank_level_switch_invalid: Some(false),
505 tank_level_switch_position_stabilized: Some(false),
506 surface_ventilation_status: Some(true),
507 ambient_temperature: Some(22.0),
508 ambient_humidity: Some(60.0),
509 heater_status: Some(true),
510 };
511 match sql_interface_data.write_data_frame_to_database(&dataframe_reference_1) {
512 Ok(()) => {}
513 Err(e) => {
514 panic!("Could not write dataframe to database: {e:?}");
515 }
516 }
517 let dataframe_test = match get_single_dataframe_from_database(
518 &mut sql_interface_data.conn,
519 dataframe_reference_1.timestamp,
520 ) {
521 Ok(c) => c,
522 Err(e) => {
523 panic!("Could not retrieve data frame from database: {e:?}");
524 }
525 };
526 assert_eq!(dataframe_test, dataframe_reference_1);
527 println!("* checking inserting data frame into empty table succeeded");
528 let dataframe_reference_2 = RecorderDataFrame {
532 timestamp: dataframe_reference_1.timestamp,
533 water_temperature: Some(26.5),
534 water_temperature_filtered: Some(26.0),
535 ph: Some(8.0),
536 ph_filtered: Some(8.5),
537 conductivity: Some(50001.0),
538 conductivity_filtered: Some(51001.0),
539 conductivity_compensated: Some(52001.0),
540 refill_in_progress: Some(true),
541 tank_level_switch_position: Some(true),
542 tank_level_switch_invalid: Some(true),
543 tank_level_switch_position_stabilized: Some(true),
544 surface_ventilation_status: Some(false),
545 ambient_temperature: Some(23.0),
546 ambient_humidity: Some(61.0),
547 heater_status: Some(false),
548 };
549 match sql_interface_data.write_data_frame_to_database(&dataframe_reference_2) {
550 Ok(()) => {}
551 Err(e) => {
552 panic!("Could not write dataframe to database: {e:?}");
553 }
554 }
555 let dataframe_test = match get_single_dataframe_from_database(
556 &mut sql_interface_data.conn,
557 dataframe_reference_2.timestamp,
558 ) {
559 Ok(c) => c,
560 Err(e) => {
561 panic!("Could not retrieve data frame from database: {e:?}");
562 }
563 };
564 assert_eq!(dataframe_test, dataframe_reference_2);
565 println!("* checking inserting already existing data frame into empty table succeeded");
566 match SqlInterface::truncate_table(&mut sql_interface, SQL_TABLE_DATA.to_string()) {
570 Ok(_) => {}
571 Err(e) => panic!("Could not prepare test case: {e:?}"),
572 }
573 let current_timestamp = match SqlInterface::get_current_timestamp(&mut sql_interface.conn) {
574 Ok(c) => c,
575 Err(e) => {
576 panic!("Could not get current timestamp from database: {e:?}");
577 }
578 };
579 let dataframe_reference_3 = RecorderDataFrame {
580 timestamp: current_timestamp,
581 water_temperature: None,
582 water_temperature_filtered: None,
583 ph: None,
584 ph_filtered: None,
585 conductivity: None,
586 conductivity_filtered: None,
587 conductivity_compensated: None,
588 refill_in_progress: None,
589 tank_level_switch_position: None,
590 tank_level_switch_invalid: None,
591 tank_level_switch_position_stabilized: None,
592 surface_ventilation_status: None,
593 ambient_temperature: None,
594 ambient_humidity: None,
595 heater_status: None,
596 };
597 match sql_interface_data.write_data_frame_to_database(&dataframe_reference_3) {
598 Ok(()) => {}
599 Err(e) => {
600 panic!("Could not write dataframe to database: {e:?}");
601 }
602 }
603 check_database_row_if_all_null(
604 &mut sql_interface_data.conn,
605 dataframe_reference_3.timestamp,
606 );
607 println!("* checking inserting data frame with NULL values into empty table succeeded");
608
609 let spin_sleeper = SpinSleeper::default();
611 let sleep_duration_one_second = Duration::from_secs(1);
612 spin_sleeper.sleep(sleep_duration_one_second);
613
614 let mut dataframe_reference_4 = dataframe_reference_3.clone();
616 let current_timestamp = match SqlInterface::get_current_timestamp(&mut sql_interface.conn) {
617 Ok(c) => c,
618 Err(e) => {
619 panic!("Could not get current timestamp from database: {e:?}");
620 }
621 };
622 dataframe_reference_4.timestamp = current_timestamp;
623
624 match sql_interface_data.write_data_frame_to_database(&dataframe_reference_4) {
626 Ok(()) => {}
627 Err(e) => {
628 panic!("Could not write dataframe to database: {e:?}");
629 }
630 }
631
632 let test_result = SqlInterfaceData::new(sql_interface.get_connection().unwrap(), 1);
633 assert!(matches!(
634 test_result,
635 Err(SqlInterfaceError::DatabaseDataTableContainsTooManyRows(
636 _,
637 _,
638 _
639 ))
640 ));
641 }
643}