mysql/conn/pool/
mod.rs

1// Copyright (c) 2020 rust-mysql-simple contributors
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9use std::{
10    fmt,
11    ops::Deref,
12    sync::Arc,
13    time::{Duration, Instant},
14};
15
16use crate::{
17    conn::query_result::{Binary, Text},
18    prelude::*,
19    ChangeUserOpts, Conn, DriverError, LocalInfileHandler, Opts, Params, QueryResult, Result,
20    Statement, Transaction, TxOpts,
21};
22
23mod inner;
24
/// Thread-safe cloneable smart pointer to a connection pool.
///
/// However you can prepare statements directly on `Pool` without
/// invoking [`Pool::get_conn`](struct.Pool.html#method.get_conn).
///
/// `Pool` will hold at least `min` connections and will create as many as `max`
/// connections with possible overhead of one connection per alive thread.
///
/// Example of multithreaded `Pool` usage:
///
/// ```rust
/// # mysql::doctest_wrapper!(__result, {
/// # use mysql::*;
/// # use mysql::prelude::*;
/// # let mut conn = Conn::new(get_opts())?;
/// # let pool_opts = PoolOpts::new().with_constraints(PoolConstraints::new_const::<5, 10>());
/// # let opts = get_opts().pool_opts(pool_opts);
/// let pool = Pool::new(opts).unwrap();
/// let mut threads = Vec::new();
///
/// for _ in 0..1000 {
///     let pool = pool.clone();
///     threads.push(std::thread::spawn(move || {
///         let mut conn = pool.get_conn().unwrap();
///         let result: u8 = conn.query_first("SELECT 1").unwrap().unwrap();
///         assert_eq!(result, 1_u8);
///     }));
/// }
///
/// for t in threads.into_iter() {
///     assert!(t.join().is_ok());
/// }
/// # });
/// ```
///
/// For more information on how to work with a MySQL connection, see the
/// [`PooledConn`](struct.PooledConn.html) documentation.
#[derive(Clone)]
pub struct Pool {
    // Shared pool state. Cloning a `Pool` clones only this `Arc`, so every clone
    // refers to the same underlying state.
    inner: Arc<inner::Inner>,
}
66
67impl Pool {
68    /// Will return connection taken from a pool.
69    ///
70    /// Will wait til timeout if `timeout_ms` is `Some(_)`
71    fn _get_conn<T: AsRef<[u8]>>(
72        &self,
73        stmt: Option<T>,
74        timeout: Option<Duration>,
75        mut call_ping: bool,
76    ) -> Result<PooledConn> {
77        let times = timeout.map(|timeout| (Instant::now(), timeout));
78
79        let (protected, condvar) = self.inner.protected();
80
81        let conn = if !self.inner.opts().reset_connection() {
82            // stmt cache considered enabled if reset_connection is false
83            if let Some(ref query) = stmt {
84                protected.lock()?.take_by_query(query.as_ref())
85            } else {
86                None
87            }
88        } else {
89            None
90        };
91
92        let mut conn = if let Some(conn) = conn {
93            conn
94        } else {
95            let mut protected = protected.lock()?;
96            loop {
97                if let Some(conn) = protected.pop_front() {
98                    drop(protected);
99                    break conn;
100                } else if self.inner.is_full() {
101                    protected = if let Some((start, timeout)) = times {
102                        if start.elapsed() > timeout {
103                            return Err(DriverError::Timeout.into());
104                        }
105                        condvar.wait_timeout(protected, timeout)?.0
106                    } else {
107                        condvar.wait(protected)?
108                    }
109                } else {
110                    protected.new_conn()?;
111                    self.inner.increase();
112                    // we do not have to call ping for a fresh connection
113                    call_ping = false;
114                }
115            }
116        };
117
118        if call_ping && self.inner.opts().check_health() && conn.ping().is_err() {
119            // existing connection seem to be dead, retrying..
120            self.inner.decrease();
121            return self._get_conn(stmt, timeout, call_ping);
122        }
123
124        Ok(PooledConn {
125            pool: self.clone(),
126            conn: Some(conn),
127        })
128    }
129
130    /// Creates new pool with the given options (see [`Opts`]).
131    pub fn new<T, E>(opts: T) -> Result<Pool>
132    where
133        Opts: TryFrom<T, Error = E>,
134        crate::Error: From<E>,
135    {
136        Ok(Pool {
137            inner: Arc::new(inner::Inner::new(Opts::try_from(opts)?)?),
138        })
139    }
140
141    /// Gives you a [`PooledConn`](struct.PooledConn.html).
142    pub fn get_conn(&self) -> Result<PooledConn> {
143        self._get_conn(None::<String>, None, true)
144    }
145
146    /// Will try to get connection for the duration of `timeout`.
147    ///
148    /// # Failure
149    /// This function will return `Error::DriverError(DriverError::Timeout)` if timeout was
150    /// reached while waiting for new connection to become available.
151    pub fn try_get_conn(&self, timeout: Duration) -> Result<PooledConn> {
152        self._get_conn(None::<String>, Some(timeout), true)
153    }
154
155    /// Shortcut for `pool.get_conn()?.start_transaction(..)`.
156    pub fn start_transaction(&self, tx_opts: TxOpts) -> Result<Transaction<'static>> {
157        let conn = self._get_conn(None::<String>, None, false)?;
158        let result = conn.pooled_start_transaction(tx_opts);
159        match result {
160            Ok(trans) => Ok(trans),
161            Err(ref e) if e.is_connectivity_error() => {
162                let conn = self._get_conn(None::<String>, None, true)?;
163                conn.pooled_start_transaction(tx_opts)
164            }
165            Err(e) => Err(e),
166        }
167    }
168}
169
170impl fmt::Debug for Pool {
171    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
172        write!(
173            f,
174            "Pool {{ constraints: {:?}, count: {} }}",
175            self.inner.opts().constraints(),
176            self.inner.count(),
177        )
178    }
179}
180
/// Pooled MySQL connection.
///
/// You should prefer using `prep` along `exec` instead of `query` from the Queryable trait where
/// possible, except cases when statement has no params and when it has no return values or return
/// values which evaluates to `Value::Bytes`.
///
/// `query` is a part of the MySQL text protocol, so under the hood you will always receive
/// `Value::Bytes` as a result, and `from_value` will need to parse it if you want, for example,
/// an `i64`.
///
/// ```rust
/// # mysql::doctest_wrapper!(__result, {
/// # use mysql::*;
/// # use mysql::prelude::*;
/// # let mut conn = Conn::new(get_opts())?;
/// let pool = Pool::new(get_opts()).unwrap();
/// let mut conn = pool.get_conn().unwrap();
///
/// conn.query_first("SELECT 42").map(|result: Option<Value>| {
///     let result = result.unwrap();
///     assert_eq!(result, Value::Bytes(b"42".to_vec()));
///     assert_eq!(from_value::<i64>(result), 42i64);
/// }).unwrap();
/// conn.exec_iter("SELECT 42", ()).map(|mut result| {
///     let cell = result.next().unwrap().unwrap().take(0).unwrap();
///     assert_eq!(cell, Value::Int(42i64));
///     assert_eq!(from_value::<i64>(cell), 42i64);
/// }).unwrap();
/// # });
/// ```
///
/// For more information on how to work with query results, see the
/// [`QueryResult`](../struct.QueryResult.html) documentation.
#[derive(Debug)]
pub struct PooledConn {
    // The pool this connection was taken from; `Drop` hands the connection back to it.
    pool: Pool,
    // `Some` while the wrapper owns the connection. It becomes `None` once the
    // connection has been taken out (by `Drop`, `unwrap` or `get_binlog_stream`).
    conn: Option<Conn>,
}
218
219impl Deref for PooledConn {
220    type Target = Conn;
221
222    fn deref(&self) -> &Self::Target {
223        self.conn.as_ref().expect("deref after drop")
224    }
225}
226
227impl Drop for PooledConn {
228    fn drop(&mut self) {
229        if let Some(mut conn) = self.conn.take() {
230            match conn.cleanup_for_pool() {
231                Ok(_) => {
232                    let (protected, condvar) = self.pool.inner.protected();
233                    match protected.lock() {
234                        Ok(mut protected) => {
235                            protected.push_back(conn);
236                            drop(protected);
237                            condvar.notify_one();
238                        }
239                        Err(_) => {
240                            // everything is broken
241                            self.pool.inner.decrease();
242                        }
243                    }
244                }
245                Err(_) => {
246                    // the connection is broken
247                    self.pool.inner.decrease();
248                }
249            }
250        }
251    }
252}
253
254impl PooledConn {
255    /// Redirects to
256    /// [`Conn#start_transaction`](struct.Conn.html#method.start_transaction)
257    pub fn start_transaction(&mut self, tx_opts: TxOpts) -> Result<Transaction> {
258        self.conn.as_mut().unwrap().start_transaction(tx_opts)
259    }
260
261    /// Turns this connection into a binlog stream (see [`Conn::get_binlog_stream`]).
262    #[cfg(feature = "binlog")]
263    #[cfg_attr(docsrs, doc(cfg(feature = "binlog")))]
264    pub fn get_binlog_stream(
265        mut self,
266        request: crate::BinlogRequest<'_>,
267    ) -> Result<crate::BinlogStream> {
268        self.conn.take().unwrap().get_binlog_stream(request)
269    }
270
271    /// Unwraps wrapped [`Conn`](struct.Conn.html).
272    pub fn unwrap(mut self) -> Conn {
273        self.conn.take().unwrap()
274    }
275
276    fn pooled_start_transaction(mut self, tx_opts: TxOpts) -> Result<Transaction<'static>> {
277        self.as_mut()._start_transaction(tx_opts)?;
278        Ok(Transaction::new(self.into()))
279    }
280
281    /// A way to override default local infile handler for this pooled connection. Destructor will
282    /// restore original handler before returning connection to a pool.
283    /// See [`Conn::set_local_infile_handler`](struct.Conn.html#method.set_local_infile_handler).
284    pub fn set_local_infile_handler(&mut self, handler: Option<LocalInfileHandler>) {
285        self.conn
286            .as_mut()
287            .unwrap()
288            .set_local_infile_handler(handler);
289    }
290
291    /// Invokes `COM_CHANGE_USER` (see [`Conn::change_user`] docs).
292    pub fn change_user(&mut self) -> Result<()> {
293        self.conn
294            .as_mut()
295            .unwrap()
296            .change_user(ChangeUserOpts::default())
297    }
298
299    /// Turns on/off automatic connection reset upon return to a pool (see [`Opts::get_pool_opts`]).
300    ///
301    /// Initial value is taken from [`crate::PoolOpts::reset_connection`].
302    pub fn reset_connection(&mut self, reset_connection: bool) {
303        if let Some(conn) = self.conn.as_mut() {
304            conn.0.reset_upon_return = reset_connection;
305        }
306    }
307}
308
309impl AsRef<Conn> for PooledConn {
310    fn as_ref(&self) -> &Conn {
311        self.conn.as_ref().unwrap()
312    }
313}
314
315impl AsMut<Conn> for PooledConn {
316    fn as_mut(&mut self) -> &mut Conn {
317        self.conn.as_mut().unwrap()
318    }
319}
320
321impl Queryable for PooledConn {
322    fn query_iter<T: AsRef<str>>(&mut self, query: T) -> Result<QueryResult<'_, '_, '_, Text>> {
323        self.conn.as_mut().unwrap().query_iter(query)
324    }
325
326    fn prep<T: AsRef<str>>(&mut self, query: T) -> Result<Statement> {
327        self.conn.as_mut().unwrap().prep(query)
328    }
329
330    fn close(&mut self, stmt: Statement) -> Result<()> {
331        self.conn.as_mut().unwrap().close(stmt)
332    }
333
334    fn exec_iter<S, P>(&mut self, stmt: S, params: P) -> Result<QueryResult<'_, '_, '_, Binary>>
335    where
336        S: AsStatement,
337        P: Into<Params>,
338    {
339        self.conn.as_mut().unwrap().exec_iter(stmt, params)
340    }
341}
342
343#[cfg(test)]
344#[allow(non_snake_case)]
345mod test {
346    mod pool {
347        use std::{thread, time::Duration};
348
349        use crate::{
350            from_value, prelude::*, test_misc::get_opts, DriverError, Error, OptsBuilder, Pool,
351            PoolConstraints, PoolOpts, TxOpts, Value,
352        };
353
354        #[test]
355        fn multiple_pools_should_work() {
356            let pool = Pool::new(get_opts()).unwrap();
357            pool.get_conn()
358                .unwrap()
359                .exec_drop("DROP DATABASE IF EXISTS A", ())
360                .unwrap();
361            pool.get_conn()
362                .unwrap()
363                .exec_drop("CREATE DATABASE A", ())
364                .unwrap();
365            pool.get_conn()
366                .unwrap()
367                .exec_drop("DROP TABLE IF EXISTS A.a", ())
368                .unwrap();
369            pool.get_conn()
370                .unwrap()
371                .exec_drop("CREATE TABLE IF NOT EXISTS A.a (id INT)", ())
372                .unwrap();
373            pool.get_conn()
374                .unwrap()
375                .exec_drop("INSERT INTO A.a VALUES (1)", ())
376                .unwrap();
377            let opts = OptsBuilder::from_opts(get_opts()).db_name(Some("A"));
378            let pool2 = Pool::new(opts).unwrap();
379            let count: u8 = pool2
380                .get_conn()
381                .unwrap()
382                .exec_first("SELECT COUNT(*) FROM a", ())
383                .unwrap()
384                .unwrap();
385            assert_eq!(1, count);
386            pool.get_conn()
387                .unwrap()
388                .exec_drop("DROP DATABASE A", ())
389                .unwrap();
390        }
391
392        struct A {
393            pool: Pool,
394            x: u32,
395        }
396
397        impl A {
398            fn add(&mut self) {
399                self.x += 1;
400            }
401        }
402
403        #[test]
404        fn should_fix_connectivity_errors_on_prepare() {
405            let pool = Pool::new(get_opts().pool_opts(
406                PoolOpts::default().with_constraints(PoolConstraints::new_const::<2, 2>()),
407            ))
408            .unwrap();
409            let mut conn = pool.get_conn().unwrap();
410
411            let id: u32 = pool
412                .get_conn()
413                .unwrap()
414                .exec_first("SELECT CONNECTION_ID();", ())
415                .unwrap()
416                .unwrap();
417
418            conn.query_drop(&*format!("KILL {}", id)).unwrap();
419            thread::sleep(Duration::from_millis(250));
420            pool.get_conn()
421                .unwrap()
422                .prep("SHOW FULL PROCESSLIST")
423                .unwrap();
424        }
425
426        #[test]
427        fn should_fix_connectivity_errors_on_prep_exec() {
428            let pool = Pool::new(get_opts().pool_opts(
429                PoolOpts::default().with_constraints(PoolConstraints::new_const::<2, 2>()),
430            ))
431            .unwrap();
432            let mut conn = pool.get_conn().unwrap();
433
434            let id: u32 = pool
435                .get_conn()
436                .unwrap()
437                .exec_first("SELECT CONNECTION_ID();", ())
438                .unwrap()
439                .unwrap();
440
441            conn.query_drop(&*format!("KILL {}", id)).unwrap();
442            thread::sleep(Duration::from_millis(250));
443            pool.get_conn()
444                .unwrap()
445                .exec_drop("SHOW FULL PROCESSLIST", ())
446                .unwrap();
447        }
448        #[test]
449        fn should_fix_connectivity_errors_on_start_transaction() {
450            let pool = Pool::new(get_opts().pool_opts(
451                PoolOpts::default().with_constraints(PoolConstraints::new_const::<2, 2>()),
452            ))
453            .unwrap();
454            let mut conn = pool.get_conn().unwrap();
455
456            let id: u32 = pool
457                .get_conn()
458                .unwrap()
459                .exec_first("SELECT CONNECTION_ID();", ())
460                .unwrap()
461                .unwrap();
462
463            conn.query_drop(&*format!("KILL {}", id)).unwrap();
464            thread::sleep(Duration::from_millis(250));
465            pool.start_transaction(TxOpts::default()).unwrap();
466        }
467        #[test]
468        fn should_execute_queries_on_PooledConn() {
469            let pool = Pool::new(get_opts()).unwrap();
470            let mut threads = Vec::new();
471            for _ in 0usize..10 {
472                let pool = pool.clone();
473                threads.push(thread::spawn(move || {
474                    let conn = pool.get_conn();
475                    assert!(conn.is_ok());
476                    let mut conn = conn.unwrap();
477                    conn.query_drop("SELECT 1").unwrap();
478                }));
479            }
480            for t in threads.into_iter() {
481                assert!(t.join().is_ok());
482            }
483        }
484        #[test]
485        fn should_timeout_if_no_connections_available() {
486            let pool = Pool::new(get_opts().pool_opts(
487                PoolOpts::default().with_constraints(PoolConstraints::new_const::<0, 1>()),
488            ))
489            .unwrap();
490            let conn1 = pool.try_get_conn(Duration::from_millis(357)).unwrap();
491            let conn2 = pool.try_get_conn(Duration::from_millis(357));
492            assert!(conn2.is_err());
493            match conn2 {
494                Err(Error::DriverError(DriverError::Timeout)) => (),
495                _ => panic!("Timeout error expected"),
496            }
497            drop(conn1);
498            assert!(pool.try_get_conn(Duration::from_millis(357)).is_ok());
499        }
500
501        #[test]
502        fn should_execute_statements_on_PooledConn() {
503            let pool = Pool::new(get_opts()).unwrap();
504            let mut threads = Vec::new();
505            for _ in 0usize..10 {
506                let pool = pool.clone();
507                threads.push(thread::spawn(move || {
508                    let mut conn = pool.get_conn().unwrap();
509                    let stmt = conn.prep("SELECT 1").unwrap();
510                    conn.exec_drop(&stmt, ()).unwrap();
511                }));
512            }
513            for t in threads.into_iter() {
514                assert!(t.join().is_ok());
515            }
516
517            let pool = Pool::new(get_opts()).unwrap();
518            let mut threads = Vec::new();
519            for _ in 0usize..10 {
520                let pool = pool.clone();
521                threads.push(thread::spawn(move || {
522                    let mut conn = pool.get_conn().unwrap();
523                    conn.exec_drop("SELECT ?", (1,)).unwrap();
524                }));
525            }
526            for t in threads.into_iter() {
527                assert!(t.join().is_ok());
528            }
529        }
530
531        #[test]
532        #[allow(unused_variables)]
533        fn should_start_transaction_on_Pool() {
534            let pool = Pool::new(
535                get_opts().pool_opts(
536                    PoolOpts::default()
537                        .with_constraints(PoolConstraints::new_const::<1, 10>())
538                        .with_reset_connection(false),
539                ),
540            )
541            .unwrap();
542            pool.get_conn()
543                .unwrap()
544                .query_drop("CREATE TEMPORARY TABLE mysql.tbl(a INT)")
545                .unwrap();
546            pool.start_transaction(TxOpts::default())
547                .and_then(|mut t| {
548                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(1)").unwrap();
549                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(2)").unwrap();
550                    t.commit()
551                })
552                .unwrap();
553            assert_eq!(
554                pool.get_conn()
555                    .unwrap()
556                    .query_first::<u8, _>("SELECT COUNT(a) FROM mysql.tbl")
557                    .unwrap()
558                    .unwrap(),
559                2_u8
560            );
561            pool.start_transaction(TxOpts::default())
562                .and_then(|mut t| {
563                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(1)").unwrap();
564                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(2)").unwrap();
565                    t.rollback()
566                })
567                .unwrap();
568            assert_eq!(
569                pool.get_conn()
570                    .unwrap()
571                    .query_first::<u8, _>("SELECT COUNT(a) FROM mysql.tbl")
572                    .unwrap()
573                    .unwrap(),
574                2_u8
575            );
576            pool.start_transaction(TxOpts::default())
577                .map(|mut t| {
578                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(1)").unwrap();
579                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(2)").unwrap();
580                })
581                .unwrap();
582            assert_eq!(
583                pool.get_conn()
584                    .unwrap()
585                    .query_first::<u8, _>("SELECT COUNT(a) FROM mysql.tbl")
586                    .unwrap()
587                    .unwrap(),
588                2_u8
589            );
590            let mut a = A { pool, x: 0 };
591            let transaction = a.pool.start_transaction(TxOpts::default()).unwrap();
592            a.add();
593        }
594
595        #[test]
596        fn should_reuse_connections() -> crate::Result<()> {
597            let pool = Pool::new(get_opts().pool_opts(
598                PoolOpts::default().with_constraints(PoolConstraints::new_const::<1, 1>()),
599            ))?;
600            let mut conn = pool.get_conn()?;
601
602            let server_version = conn.server_version();
603            let connection_id = conn.connection_id();
604
605            for _ in 0..16 {
606                drop(conn);
607                conn = pool.get_conn()?;
608                println!("CONN connection_id={}", conn.connection_id());
609                assert!(conn.connection_id() == connection_id || server_version < (5, 7, 2));
610            }
611
612            Ok(())
613        }
614
615        #[test]
616        fn should_start_transaction_on_PooledConn() {
617            let pool = Pool::new(get_opts()).unwrap();
618            let mut conn = pool.get_conn().unwrap();
619            conn.query_drop("CREATE TEMPORARY TABLE mysql.tbl(a INT)")
620                .unwrap();
621            conn.start_transaction(TxOpts::default())
622                .and_then(|mut t| {
623                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(1)").unwrap();
624                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(2)").unwrap();
625                    t.commit()
626                })
627                .unwrap();
628            for x in conn.query_iter("SELECT COUNT(a) FROM mysql.tbl").unwrap() {
629                let mut x = x.unwrap();
630                assert_eq!(from_value::<u8>(x.take(0).unwrap()), 2u8);
631            }
632            conn.start_transaction(TxOpts::default())
633                .and_then(|mut t| {
634                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(1)").unwrap();
635                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(2)").unwrap();
636                    t.rollback()
637                })
638                .unwrap();
639            for x in conn.query_iter("SELECT COUNT(a) FROM mysql.tbl").unwrap() {
640                let mut x = x.unwrap();
641                assert_eq!(from_value::<u8>(x.take(0).unwrap()), 2u8);
642            }
643            conn.start_transaction(TxOpts::default())
644                .map(|mut t| {
645                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(1)").unwrap();
646                    t.query_drop("INSERT INTO mysql.tbl(a) VALUES(2)").unwrap();
647                })
648                .unwrap();
649            for x in conn.query_iter("SELECT COUNT(a) FROM mysql.tbl").unwrap() {
650                let mut x = x.unwrap();
651                assert_eq!(from_value::<u8>(x.take(0).unwrap()), 2u8);
652            }
653        }
654
655        #[test]
656        fn should_opt_out_of_connection_reset() {
657            let pool_opts = PoolOpts::new().with_constraints(PoolConstraints::new_const::<1, 1>());
658            let opts = get_opts().pool_opts(pool_opts.clone());
659
660            let pool = Pool::new(opts.clone()).unwrap();
661
662            let mut conn = pool.get_conn().unwrap();
663            assert_eq!(
664                conn.query_first::<Value, _>("SELECT @foo").unwrap(),
665                Some(Value::NULL)
666            );
667            conn.query_drop("SET @foo = 'foo'").unwrap();
668            assert_eq!(
669                conn.query_first::<String, _>("SELECT @foo")
670                    .unwrap()
671                    .unwrap(),
672                "foo",
673            );
674            drop(conn);
675
676            conn = pool.get_conn().unwrap();
677            assert_eq!(
678                conn.query_first::<Value, _>("SELECT @foo").unwrap(),
679                Some(Value::NULL)
680            );
681            conn.query_drop("SET @foo = 'foo'").unwrap();
682            conn.reset_connection(false);
683            drop(conn);
684
685            conn = pool.get_conn().unwrap();
686            assert_eq!(
687                conn.query_first::<String, _>("SELECT @foo")
688                    .unwrap()
689                    .unwrap(),
690                "foo",
691            );
692            drop(conn);
693
694            let pool = Pool::new(opts.pool_opts(pool_opts.with_reset_connection(false))).unwrap();
695            conn = pool.get_conn().unwrap();
696            conn.query_drop("SET @foo = 'foo'").unwrap();
697            drop(conn);
698            conn = pool.get_conn().unwrap();
699            assert_eq!(
700                conn.query_first::<String, _>("SELECT @foo")
701                    .unwrap()
702                    .unwrap(),
703                "foo",
704            );
705            drop(conn);
706        }
707
708        #[cfg(feature = "nightly")]
709        mod bench {
710            use test;
711
712            use std::thread;
713
714            use crate::{prelude::*, test_misc::get_opts, Pool};
715
716            #[bench]
717            fn many_prepexecs(bencher: &mut test::Bencher) {
718                let pool = Pool::new(get_opts()).unwrap();
719                bencher.iter(|| {
720                    "SELECT 1".with(()).run(&pool).unwrap();
721                });
722            }
723
724            #[bench]
725            fn many_prepares_threaded(bencher: &mut test::Bencher) {
726                let pool = Pool::new(get_opts()).unwrap();
727                bencher.iter(|| {
728                    let mut threads = Vec::new();
729                    for _ in 0..4 {
730                        let pool = pool.clone();
731                        threads.push(thread::spawn(move || {
732                            for _ in 0..250 {
733                                test::black_box(
734                                    "SELECT 1, 'hello world', 123.321, ?, ?, ?"
735                                        .with(("hello", "world", 65536))
736                                        .run(&pool)
737                                        .unwrap(),
738                                );
739                            }
740                        }));
741                    }
742                    for t in threads {
743                        t.join().unwrap();
744                    }
745                });
746            }
747
748            #[bench]
749            fn many_prepares_threaded_no_cache(bencher: &mut test::Bencher) {
750                let mut pool = Pool::new(get_opts()).unwrap();
751                pool.use_cache(false);
752                bencher.iter(|| {
753                    let mut threads = Vec::new();
754                    for _ in 0..4 {
755                        let pool = pool.clone();
756                        threads.push(thread::spawn(move || {
757                            for _ in 0..250 {
758                                test::black_box(
759                                    "SELECT 1, 'hello world', 123.321, ?, ?, ?"
760                                        .with(("hello", "world", 65536))
761                                        .run(&pool)
762                                        .unwrap(),
763                                );
764                            }
765                        }));
766                    }
767                    for t in threads {
768                        t.join().unwrap();
769                    }
770                });
771            }
772        }
773    }
774}