1use std::{
10 fmt,
11 ops::Deref,
12 sync::Arc,
13 time::{Duration, Instant},
14};
15
16use crate::{
17 conn::query_result::{Binary, Text},
18 prelude::*,
19 ChangeUserOpts, Conn, DriverError, LocalInfileHandler, Opts, Params, QueryResult, Result,
20 Statement, Transaction, TxOpts,
21};
22
23mod inner;
24
25#[derive(Clone)]
63pub struct Pool {
64 inner: Arc<inner::Inner>,
65}
66
67impl Pool {
68 fn _get_conn<T: AsRef<[u8]>>(
72 &self,
73 stmt: Option<T>,
74 timeout: Option<Duration>,
75 mut call_ping: bool,
76 ) -> Result<PooledConn> {
77 let times = timeout.map(|timeout| (Instant::now(), timeout));
78
79 let (protected, condvar) = self.inner.protected();
80
81 let conn = if !self.inner.opts().reset_connection() {
82 if let Some(ref query) = stmt {
84 protected.lock()?.take_by_query(query.as_ref())
85 } else {
86 None
87 }
88 } else {
89 None
90 };
91
92 let mut conn = if let Some(conn) = conn {
93 conn
94 } else {
95 let mut protected = protected.lock()?;
96 loop {
97 if let Some(conn) = protected.pop_front() {
98 drop(protected);
99 break conn;
100 } else if self.inner.is_full() {
101 protected = if let Some((start, timeout)) = times {
102 if start.elapsed() > timeout {
103 return Err(DriverError::Timeout.into());
104 }
105 condvar.wait_timeout(protected, timeout)?.0
106 } else {
107 condvar.wait(protected)?
108 }
109 } else {
110 protected.new_conn()?;
111 self.inner.increase();
112 call_ping = false;
114 }
115 }
116 };
117
118 if call_ping && self.inner.opts().check_health() && conn.ping().is_err() {
119 self.inner.decrease();
121 return self._get_conn(stmt, timeout, call_ping);
122 }
123
124 Ok(PooledConn {
125 pool: self.clone(),
126 conn: Some(conn),
127 })
128 }
129
130 pub fn new<T, E>(opts: T) -> Result<Pool>
132 where
133 Opts: TryFrom<T, Error = E>,
134 crate::Error: From<E>,
135 {
136 Ok(Pool {
137 inner: Arc::new(inner::Inner::new(Opts::try_from(opts)?)?),
138 })
139 }
140
141 pub fn get_conn(&self) -> Result<PooledConn> {
143 self._get_conn(None::<String>, None, true)
144 }
145
146 pub fn try_get_conn(&self, timeout: Duration) -> Result<PooledConn> {
152 self._get_conn(None::<String>, Some(timeout), true)
153 }
154
155 pub fn start_transaction(&self, tx_opts: TxOpts) -> Result<Transaction<'static>> {
157 let conn = self._get_conn(None::<String>, None, false)?;
158 let result = conn.pooled_start_transaction(tx_opts);
159 match result {
160 Ok(trans) => Ok(trans),
161 Err(ref e) if e.is_connectivity_error() => {
162 let conn = self._get_conn(None::<String>, None, true)?;
163 conn.pooled_start_transaction(tx_opts)
164 }
165 Err(e) => Err(e),
166 }
167 }
168}
169
170impl fmt::Debug for Pool {
171 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
172 write!(
173 f,
174 "Pool {{ constraints: {:?}, count: {} }}",
175 self.inner.opts().constraints(),
176 self.inner.count(),
177 )
178 }
179}
180
181#[derive(Debug)]
214pub struct PooledConn {
215 pool: Pool,
216 conn: Option<Conn>,
217}
218
219impl Deref for PooledConn {
220 type Target = Conn;
221
222 fn deref(&self) -> &Self::Target {
223 self.conn.as_ref().expect("deref after drop")
224 }
225}
226
227impl Drop for PooledConn {
228 fn drop(&mut self) {
229 if let Some(mut conn) = self.conn.take() {
230 match conn.cleanup_for_pool() {
231 Ok(_) => {
232 let (protected, condvar) = self.pool.inner.protected();
233 match protected.lock() {
234 Ok(mut protected) => {
235 protected.push_back(conn);
236 drop(protected);
237 condvar.notify_one();
238 }
239 Err(_) => {
240 self.pool.inner.decrease();
242 }
243 }
244 }
245 Err(_) => {
246 self.pool.inner.decrease();
248 }
249 }
250 }
251 }
252}
253
254impl PooledConn {
255 pub fn start_transaction(&mut self, tx_opts: TxOpts) -> Result<Transaction> {
258 self.conn.as_mut().unwrap().start_transaction(tx_opts)
259 }
260
261 #[cfg(feature = "binlog")]
263 #[cfg_attr(docsrs, doc(cfg(feature = "binlog")))]
264 pub fn get_binlog_stream(
265 mut self,
266 request: crate::BinlogRequest<'_>,
267 ) -> Result<crate::BinlogStream> {
268 self.conn.take().unwrap().get_binlog_stream(request)
269 }
270
271 pub fn unwrap(mut self) -> Conn {
273 self.conn.take().unwrap()
274 }
275
276 fn pooled_start_transaction(mut self, tx_opts: TxOpts) -> Result<Transaction<'static>> {
277 self.as_mut()._start_transaction(tx_opts)?;
278 Ok(Transaction::new(self.into()))
279 }
280
281 pub fn set_local_infile_handler(&mut self, handler: Option<LocalInfileHandler>) {
285 self.conn
286 .as_mut()
287 .unwrap()
288 .set_local_infile_handler(handler);
289 }
290
291 pub fn change_user(&mut self) -> Result<()> {
293 self.conn
294 .as_mut()
295 .unwrap()
296 .change_user(ChangeUserOpts::default())
297 }
298
299 pub fn reset_connection(&mut self, reset_connection: bool) {
303 if let Some(conn) = self.conn.as_mut() {
304 conn.0.reset_upon_return = reset_connection;
305 }
306 }
307}
308
309impl AsRef<Conn> for PooledConn {
310 fn as_ref(&self) -> &Conn {
311 self.conn.as_ref().unwrap()
312 }
313}
314
315impl AsMut<Conn> for PooledConn {
316 fn as_mut(&mut self) -> &mut Conn {
317 self.conn.as_mut().unwrap()
318 }
319}
320
321impl Queryable for PooledConn {
322 fn query_iter<T: AsRef<str>>(&mut self, query: T) -> Result<QueryResult<'_, '_, '_, Text>> {
323 self.conn.as_mut().unwrap().query_iter(query)
324 }
325
326 fn prep<T: AsRef<str>>(&mut self, query: T) -> Result<Statement> {
327 self.conn.as_mut().unwrap().prep(query)
328 }
329
330 fn close(&mut self, stmt: Statement) -> Result<()> {
331 self.conn.as_mut().unwrap().close(stmt)
332 }
333
334 fn exec_iter<S, P>(&mut self, stmt: S, params: P) -> Result<QueryResult<'_, '_, '_, Binary>>
335 where
336 S: AsStatement,
337 P: Into<Params>,
338 {
339 self.conn.as_mut().unwrap().exec_iter(stmt, params)
340 }
341}
342
343#[cfg(test)]
344#[allow(non_snake_case)]
345mod test {
346 mod pool {
347 use std::{thread, time::Duration};
348
349 use crate::{
350 from_value, prelude::*, test_misc::get_opts, DriverError, Error, OptsBuilder, Pool,
351 PoolConstraints, PoolOpts, TxOpts, Value,
352 };
353
/// Data written through one pool must be visible through a second pool that was opened
/// with the freshly created database as its default schema.
#[test]
fn multiple_pools_should_work() {
    let pool = Pool::new(get_opts()).unwrap();

    // Each statement runs on a connection that is checked out just for that statement.
    let run = |stmt: &str| {
        pool.get_conn().unwrap().exec_drop(stmt, ()).unwrap();
    };

    let setup = [
        "DROP DATABASE IF EXISTS A",
        "CREATE DATABASE A",
        "DROP TABLE IF EXISTS A.a",
        "CREATE TABLE IF NOT EXISTS A.a (id INT)",
        "INSERT INTO A.a VALUES (1)",
    ];
    for &stmt in setup.iter() {
        run(stmt);
    }

    let scoped_pool = Pool::new(OptsBuilder::from_opts(get_opts()).db_name(Some("A"))).unwrap();
    let mut scoped_conn = scoped_pool.get_conn().unwrap();
    let row_count: u8 = scoped_conn
        .exec_first("SELECT COUNT(*) FROM a", ())
        .unwrap()
        .unwrap();
    drop(scoped_conn);
    assert_eq!(1, row_count);

    run("DROP DATABASE A");
}
391
392 struct A {
393 pool: Pool,
394 x: u32,
395 }
396
397 impl A {
398 fn add(&mut self) {
399 self.x += 1;
400 }
401 }
402
/// After a connection has been killed, `prep` on a connection taken from the pool must
/// still succeed.
#[test]
fn should_fix_connectivity_errors_on_prepare() {
    let base_opts = get_opts();
    let pool_opts = PoolOpts::default().with_constraints(PoolConstraints::new_const::<2, 2>());
    let pool = Pool::new(base_opts.pool_opts(pool_opts)).unwrap();
    let mut killer = pool.get_conn().unwrap();

    // Ask a second connection for its id; it goes back to the pool at the end of the block.
    let victim_id: u32 = {
        let mut victim = pool.get_conn().unwrap();
        let id = victim
            .exec_first("SELECT CONNECTION_ID();", ())
            .unwrap()
            .unwrap();
        id
    };

    let kill_stmt = format!("KILL {}", victim_id);
    killer.query_drop(kill_stmt.as_str()).unwrap();
    thread::sleep(Duration::from_millis(250));

    let mut fresh = pool.get_conn().unwrap();
    fresh.prep("SHOW FULL PROCESSLIST").unwrap();
}
425
/// After a connection has been killed, `exec_drop` on a connection taken from the pool
/// must still succeed.
#[test]
fn should_fix_connectivity_errors_on_prep_exec() {
    let base_opts = get_opts();
    let pool_opts = PoolOpts::default().with_constraints(PoolConstraints::new_const::<2, 2>());
    let pool = Pool::new(base_opts.pool_opts(pool_opts)).unwrap();
    let mut killer = pool.get_conn().unwrap();

    // Ask a second connection for its id; it goes back to the pool at the end of the block.
    let victim_id: u32 = {
        let mut victim = pool.get_conn().unwrap();
        let id = victim
            .exec_first("SELECT CONNECTION_ID();", ())
            .unwrap()
            .unwrap();
        id
    };

    let kill_stmt = format!("KILL {}", victim_id);
    killer.query_drop(kill_stmt.as_str()).unwrap();
    thread::sleep(Duration::from_millis(250));

    let mut fresh = pool.get_conn().unwrap();
    fresh.exec_drop("SHOW FULL PROCESSLIST", ()).unwrap();
}
/// After a connection has been killed, `Pool::start_transaction` must still succeed.
#[test]
fn should_fix_connectivity_errors_on_start_transaction() {
    let base_opts = get_opts();
    let pool_opts = PoolOpts::default().with_constraints(PoolConstraints::new_const::<2, 2>());
    let pool = Pool::new(base_opts.pool_opts(pool_opts)).unwrap();
    let mut killer = pool.get_conn().unwrap();

    // Ask a second connection for its id; it goes back to the pool at the end of the block.
    let victim_id: u32 = {
        let mut victim = pool.get_conn().unwrap();
        let id = victim
            .exec_first("SELECT CONNECTION_ID();", ())
            .unwrap()
            .unwrap();
        id
    };

    let kill_stmt = format!("KILL {}", victim_id);
    killer.query_drop(kill_stmt.as_str()).unwrap();
    thread::sleep(Duration::from_millis(250));

    // Only the success of the call matters; the transaction itself is discarded right away.
    drop(pool.start_transaction(TxOpts::default()).unwrap());
}
/// Ten threads share one pool and each runs a text query on its own pooled connection.
#[test]
fn should_execute_queries_on_PooledConn() {
    let pool = Pool::new(get_opts()).unwrap();

    let workers: Vec<_> = (0usize..10)
        .map(|_| {
            let pool = pool.clone();
            thread::spawn(move || {
                let checkout = pool.get_conn();
                assert!(checkout.is_ok());
                let mut conn = checkout.unwrap();
                conn.query_drop("SELECT 1").unwrap();
            })
        })
        .collect();

    for worker in workers {
        assert!(worker.join().is_ok());
    }
}
/// With at most one connection allowed, a second checkout must time out while the first
/// connection is held and succeed once it has been released.
#[test]
fn should_timeout_if_no_connections_available() {
    let base_opts = get_opts();
    let pool_opts = PoolOpts::default().with_constraints(PoolConstraints::new_const::<0, 1>());
    let pool = Pool::new(base_opts.pool_opts(pool_opts)).unwrap();
    let wait = Duration::from_millis(357);

    let held = pool.try_get_conn(wait).unwrap();

    let blocked = pool.try_get_conn(wait);
    assert!(blocked.is_err());
    if !matches!(blocked, Err(Error::DriverError(DriverError::Timeout))) {
        panic!("Timeout error expected");
    }

    drop(held);
    assert!(pool.try_get_conn(wait).is_ok());
}
500
/// Ten threads per pool execute statements on pooled connections: first through an
/// explicitly prepared statement, then by passing the statement text with parameters.
#[test]
fn should_execute_statements_on_PooledConn() {
    let prepared_pool = Pool::new(get_opts()).unwrap();
    let prepared_workers: Vec<_> = (0usize..10)
        .map(|_| {
            let pool = prepared_pool.clone();
            thread::spawn(move || {
                let mut conn = pool.get_conn().unwrap();
                let stmt = conn.prep("SELECT 1").unwrap();
                conn.exec_drop(&stmt, ()).unwrap();
            })
        })
        .collect();
    for worker in prepared_workers {
        assert!(worker.join().is_ok());
    }

    let adhoc_pool = Pool::new(get_opts()).unwrap();
    let adhoc_workers: Vec<_> = (0usize..10)
        .map(|_| {
            let pool = adhoc_pool.clone();
            thread::spawn(move || {
                let mut conn = pool.get_conn().unwrap();
                conn.exec_drop("SELECT ?", (1,)).unwrap();
            })
        })
        .collect();
    for worker in adhoc_workers {
        assert!(worker.join().is_ok());
    }
}
530
/// Transactions started directly on the pool: a commit keeps its rows, while an explicit
/// rollback and a transaction dropped without commit both leave the row count unchanged.
#[test]
#[allow(unused_variables)]
fn should_start_transaction_on_Pool() {
    /// Inserts the two fixture rows through the given `Queryable`.
    fn insert_two_rows<Q: Queryable>(target: &mut Q) {
        target
            .query_drop("INSERT INTO mysql.tbl(a) VALUES(1)")
            .unwrap();
        target
            .query_drop("INSERT INTO mysql.tbl(a) VALUES(2)")
            .unwrap();
    }

    let base_opts = get_opts();
    let pool_opts = PoolOpts::default()
        .with_constraints(PoolConstraints::new_const::<1, 10>())
        .with_reset_connection(false);
    let pool = Pool::new(base_opts.pool_opts(pool_opts)).unwrap();

    // Counts the rows on a connection that is checked out just for this query.
    let row_count = || {
        let mut conn = pool.get_conn().unwrap();
        let count = conn
            .query_first::<u8, _>("SELECT COUNT(a) FROM mysql.tbl")
            .unwrap()
            .unwrap();
        count
    };

    pool.get_conn()
        .unwrap()
        .query_drop("CREATE TEMPORARY TABLE mysql.tbl(a INT)")
        .unwrap();

    // Committed transaction.
    let mut committed = pool.start_transaction(TxOpts::default()).unwrap();
    insert_two_rows(&mut committed);
    committed.commit().unwrap();
    assert_eq!(row_count(), 2_u8);

    // Explicit rollback.
    let mut rolled_back = pool.start_transaction(TxOpts::default()).unwrap();
    insert_two_rows(&mut rolled_back);
    rolled_back.rollback().unwrap();
    assert_eq!(row_count(), 2_u8);

    // Dropped without commit or rollback.
    let mut abandoned = pool.start_transaction(TxOpts::default()).unwrap();
    insert_two_rows(&mut abandoned);
    drop(abandoned);
    assert_eq!(row_count(), 2_u8);

    // The owner of the pool stays mutable while a transaction started from it is alive.
    let mut owner = A { pool, x: 0 };
    let held_tx = owner.pool.start_transaction(TxOpts::default()).unwrap();
    owner.add();
}
594
/// A pool limited to exactly one connection must hand out the same server connection on
/// every checkout (only asserted for servers that are not older than 5.7.2).
#[test]
fn should_reuse_connections() -> crate::Result<()> {
    let base_opts = get_opts();
    let pool_opts = PoolOpts::default().with_constraints(PoolConstraints::new_const::<1, 1>());
    let pool = Pool::new(base_opts.pool_opts(pool_opts))?;
    let mut conn = pool.get_conn()?;

    let server_version = conn.server_version();
    let expected_id = conn.connection_id();

    for _ in 0..16 {
        drop(conn);
        conn = pool.get_conn()?;
        let current_id = conn.connection_id();
        println!("CONN connection_id={}", current_id);
        let same_connection = current_id == expected_id;
        assert!(same_connection || server_version < (5, 7, 2));
    }

    Ok(())
}
614
/// Transactions started on a pooled connection: a commit keeps its rows, while an explicit
/// rollback and a transaction dropped without commit both leave the row count unchanged.
#[test]
fn should_start_transaction_on_PooledConn() {
    /// Inserts the two fixture rows through the given `Queryable`.
    fn insert_two_rows<Q: Queryable>(target: &mut Q) {
        target
            .query_drop("INSERT INTO mysql.tbl(a) VALUES(1)")
            .unwrap();
        target
            .query_drop("INSERT INTO mysql.tbl(a) VALUES(2)")
            .unwrap();
    }

    /// Checks that every row of the count query reports exactly two rows in the table.
    fn assert_two_rows<Q: Queryable>(source: &mut Q) {
        for row in source.query_iter("SELECT COUNT(a) FROM mysql.tbl").unwrap() {
            let mut row = row.unwrap();
            assert_eq!(from_value::<u8>(row.take(0).unwrap()), 2u8);
        }
    }

    let pool = Pool::new(get_opts()).unwrap();
    let mut conn = pool.get_conn().unwrap();
    conn.query_drop("CREATE TEMPORARY TABLE mysql.tbl(a INT)")
        .unwrap();

    // Committed transaction.
    let mut committed = conn.start_transaction(TxOpts::default()).unwrap();
    insert_two_rows(&mut committed);
    committed.commit().unwrap();
    assert_two_rows(&mut conn);

    // Explicit rollback.
    let mut rolled_back = conn.start_transaction(TxOpts::default()).unwrap();
    insert_two_rows(&mut rolled_back);
    rolled_back.rollback().unwrap();
    assert_two_rows(&mut conn);

    // Dropped without commit or rollback.
    let mut abandoned = conn.start_transaction(TxOpts::default()).unwrap();
    insert_two_rows(&mut abandoned);
    drop(abandoned);
    assert_two_rows(&mut conn);
}
654
/// A session variable is gone after a normal return to the pool, but survives when reset
/// is switched off either for a single connection or for the whole pool.
#[test]
fn should_opt_out_of_connection_reset() {
    /// Reads `@foo` without assuming that it is set.
    fn foo_raw<Q: Queryable>(source: &mut Q) -> Option<Value> {
        source.query_first::<Value, _>("SELECT @foo").unwrap()
    }

    /// Reads `@foo` as a string; panics when the query yields no row.
    fn foo_text<Q: Queryable>(source: &mut Q) -> String {
        source
            .query_first::<String, _>("SELECT @foo")
            .unwrap()
            .unwrap()
    }

    let pool_opts = PoolOpts::new().with_constraints(PoolConstraints::new_const::<1, 1>());
    let opts = get_opts().pool_opts(pool_opts.clone());

    let resetting_pool = Pool::new(opts.clone()).unwrap();

    // The variable starts out unset and is visible within the same checkout once assigned.
    let mut conn = resetting_pool.get_conn().unwrap();
    assert_eq!(foo_raw(&mut conn), Some(Value::NULL));
    conn.query_drop("SET @foo = 'foo'").unwrap();
    assert_eq!(foo_text(&mut conn), "foo",);
    drop(conn);

    // After a normal return the variable is unset again; now opt this connection out.
    conn = resetting_pool.get_conn().unwrap();
    assert_eq!(foo_raw(&mut conn), Some(Value::NULL));
    conn.query_drop("SET @foo = 'foo'").unwrap();
    conn.reset_connection(false);
    drop(conn);

    // The opted-out connection kept its session state.
    conn = resetting_pool.get_conn().unwrap();
    assert_eq!(foo_text(&mut conn), "foo",);
    drop(conn);

    // Same check with reset disabled through the pool options.
    let sticky_pool = Pool::new(opts.pool_opts(pool_opts.with_reset_connection(false))).unwrap();
    conn = sticky_pool.get_conn().unwrap();
    conn.query_drop("SET @foo = 'foo'").unwrap();
    drop(conn);
    conn = sticky_pool.get_conn().unwrap();
    assert_eq!(foo_text(&mut conn), "foo",);
    drop(conn);
}
707
708 #[cfg(feature = "nightly")]
709 mod bench {
710 use test;
711
712 use std::thread;
713
714 use crate::{prelude::*, test_misc::get_opts, Pool};
715
716 #[bench]
717 fn many_prepexecs(bencher: &mut test::Bencher) {
718 let pool = Pool::new(get_opts()).unwrap();
719 bencher.iter(|| {
720 "SELECT 1".with(()).run(&pool).unwrap();
721 });
722 }
723
724 #[bench]
725 fn many_prepares_threaded(bencher: &mut test::Bencher) {
726 let pool = Pool::new(get_opts()).unwrap();
727 bencher.iter(|| {
728 let mut threads = Vec::new();
729 for _ in 0..4 {
730 let pool = pool.clone();
731 threads.push(thread::spawn(move || {
732 for _ in 0..250 {
733 test::black_box(
734 "SELECT 1, 'hello world', 123.321, ?, ?, ?"
735 .with(("hello", "world", 65536))
736 .run(&pool)
737 .unwrap(),
738 );
739 }
740 }));
741 }
742 for t in threads {
743 t.join().unwrap();
744 }
745 });
746 }
747
748 #[bench]
749 fn many_prepares_threaded_no_cache(bencher: &mut test::Bencher) {
750 let mut pool = Pool::new(get_opts()).unwrap();
751 pool.use_cache(false);
752 bencher.iter(|| {
753 let mut threads = Vec::new();
754 for _ in 0..4 {
755 let pool = pool.clone();
756 threads.push(thread::spawn(move || {
757 for _ in 0..250 {
758 test::black_box(
759 "SELECT 1, 'hello world', 123.321, ?, ?, ?"
760 .with(("hello", "world", 65536))
761 .run(&pool)
762 .unwrap(),
763 );
764 }
765 }));
766 }
767 for t in threads {
768 t.join().unwrap();
769 }
770 });
771 }
772 }
773 }
774}