mysql/conn/
query_result.rs

1// Copyright (c) 2020 rust-mysql-simple contributors
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9pub use mysql_common::proto::{Binary, Text};
10
11use mysql_common::{io::ParseBuf, packets::OkPacket, row::RowDeserializer, value::ServerSide};
12
13use std::{borrow::Cow, marker::PhantomData, sync::Arc};
14
15use crate::{conn::ConnMut, Column, Conn, Error, Result, Row};
16
17/// Result set kind.
18pub trait Protocol: 'static + Send + Sync {
19    fn next(conn: &mut Conn, columns: Arc<[Column]>) -> Result<Option<Row>>;
20}
21
22impl Protocol for Text {
23    fn next(conn: &mut Conn, columns: Arc<[Column]>) -> Result<Option<Row>> {
24        match conn.next_row_packet()? {
25            Some(pld) => {
26                let row = ParseBuf(&pld).parse::<RowDeserializer<(), Text>>(columns)?;
27                Ok(Some(row.into()))
28            }
29            None => Ok(None),
30        }
31    }
32}
33
34impl Protocol for Binary {
35    fn next(conn: &mut Conn, columns: Arc<[Column]>) -> Result<Option<Row>> {
36        match conn.next_row_packet()? {
37            Some(pld) => {
38                let row = ParseBuf(&pld).parse::<RowDeserializer<ServerSide, Binary>>(columns)?;
39                Ok(Some(row.into()))
40            }
41            None => Ok(None),
42        }
43    }
44}
45
46/// State of a result set iterator.
47#[derive(Debug)]
48enum SetIteratorState {
49    /// Iterator is in a non-empty set.
50    InSet(Arc<[Column]>),
51    /// Iterator is in an empty set.
52    InEmptySet(OkPacket<'static>),
53    /// Iterator is in an errored result set.
54    Errored(Error),
55    /// Next result set isn't handled.
56    OnBoundary,
57    /// No more result sets.
58    Done,
59}
60
61impl SetIteratorState {
62    fn ok_packet(&self) -> Option<&OkPacket<'_>> {
63        if let Self::InEmptySet(ref ok) = self {
64            Some(ok)
65        } else {
66            None
67        }
68    }
69
70    fn columns(&self) -> Option<&Arc<[Column]>> {
71        if let Self::InSet(ref cols) = self {
72            Some(cols)
73        } else {
74            None
75        }
76    }
77}
78
79impl From<Arc<[Column]>> for SetIteratorState {
80    fn from(columns: Arc<[Column]>) -> Self {
81        Self::InSet(columns)
82    }
83}
84
85impl From<OkPacket<'static>> for SetIteratorState {
86    fn from(ok_packet: OkPacket<'static>) -> Self {
87        Self::InEmptySet(ok_packet)
88    }
89}
90
91impl From<Error> for SetIteratorState {
92    fn from(err: Error) -> Self {
93        Self::Errored(err)
94    }
95}
96
97impl From<ResultSetMeta> for SetIteratorState {
98    fn from(value: ResultSetMeta) -> Self {
99        match value {
100            ResultSetMeta::Empty(ok_packet) => Self::from(ok_packet),
101            ResultSetMeta::NonEmptyWithMeta(column) => Self::from(column),
102        }
103    }
104}
105
106pub(crate) enum ResultSetMeta {
107    Empty(OkPacket<'static>),
108    NonEmptyWithMeta(Arc<[Column]>),
109}
110
111/// Response to a query or statement execution.
112///
113/// It is an iterator:
114/// *   over result sets (via `Self::current_set`)
115/// *   over rows of a current result set (via `Iterator` impl)
116#[derive(Debug)]
117pub struct QueryResult<'c, 't, 'tc, T: crate::prelude::Protocol> {
118    conn: ConnMut<'c, 't, 'tc>,
119    state: SetIteratorState,
120    set_index: usize,
121    protocol: PhantomData<T>,
122}
123
124impl<'c, 't, 'tc, T: crate::prelude::Protocol> QueryResult<'c, 't, 'tc, T> {
125    fn from_state(
126        conn: ConnMut<'c, 't, 'tc>,
127        state: SetIteratorState,
128    ) -> QueryResult<'c, 't, 'tc, T> {
129        QueryResult {
130            conn,
131            state,
132            set_index: 0,
133            protocol: PhantomData,
134        }
135    }
136
137    pub(crate) fn new(
138        conn: ConnMut<'c, 't, 'tc>,
139        meta: ResultSetMeta,
140    ) -> QueryResult<'c, 't, 'tc, T> {
141        Self::from_state(conn, meta.into())
142    }
143
144    /// Updates state with the next result set, if any.
145    ///
146    /// Returns `false` if there is no next result set.
147    ///
148    /// **Requires:** `self.state == OnBoundary`
149    fn handle_next(&mut self) {
150        debug_assert!(
151            matches!(self.state, SetIteratorState::OnBoundary),
152            "self.state != OnBoundary"
153        );
154
155        if self.conn.more_results_exists() {
156            match self.conn.handle_result_set() {
157                Ok(info) => self.state = info.into_query_meta().into(),
158                Err(err) => self.state = err.into(),
159            }
160            self.set_index += 1;
161        } else {
162            self.state = SetIteratorState::Done;
163        }
164    }
165
166    /// Returns an iterator over the current result set.
167    #[deprecated = "Please use QueryResult::iter"]
168    pub fn next_set<'d>(&'d mut self) -> Option<ResultSet<'c, 't, 'tc, 'd, T>> {
169        self.iter()
170    }
171
172    /// Returns an iterator over the current result set.
173    ///
174    /// The returned iterator will be consumed either by the caller
175    /// or implicitly by the `ResultSet::drop`. This operation
176    /// will advance `self` to the next result set (if any).
177    ///
178    /// The following code describes the behavior:
179    ///
180    /// ```rust
181    /// # mysql::doctest_wrapper!(__result, {
182    /// # use mysql::*;
183    /// # use mysql::prelude::*;
184    /// # let pool = Pool::new(get_opts())?;
185    /// # let mut conn = pool.get_conn()?;
186    /// # conn.query_drop("CREATE TEMPORARY TABLE mysql.tbl(id INT NOT NULL PRIMARY KEY)")?;
187    ///
188    /// let mut query_result = conn.query_iter("\
189    ///     INSERT INTO mysql.tbl (id) VALUES (3, 4);\
190    ///     SELECT * FROM mysql.tbl;
191    ///     UPDATE mysql.tbl SET id = id + 1;")?;
192    ///
193    /// // query_result is on the first result set at the moment
194    /// {
195    ///     assert_eq!(query_result.affected_rows(), 2);
196    ///     assert_eq!(query_result.last_insert_id(), Some(4));
197    ///
198    ///     let first_result_set = query_result.iter().unwrap();
199    ///     assert_eq!(first_result_set.affected_rows(), 2);
200    ///     assert_eq!(first_result_set.last_insert_id(), Some(4));
201    /// }
202    ///
203    /// // the first result set is now dropped, so query_result is on the second result set
204    /// {
205    ///     assert_eq!(query_result.affected_rows(), 0);
206    ///     assert_eq!(query_result.last_insert_id(), None);
207    ///     
208    ///     let mut second_result_set = query_result.iter().unwrap();
209    ///
210    ///     let first_row = second_result_set.next().unwrap().unwrap();
211    ///     assert_eq!(from_row::<u8>(first_row), 3_u8);
212    ///     let second_row = second_result_set.next().unwrap().unwrap();
213    ///     assert_eq!(from_row::<u8>(second_row), 4_u8);
214    ///
215    ///     assert!(second_result_set.next().is_none());
216    ///
217    ///     // second_result_set is consumed but still represents the second result set
218    ///     assert_eq!(second_result_set.affected_rows(), 0);
219    /// }
220    ///
221    /// // the second result set is now dropped, so query_result is on the third result set
222    /// assert_eq!(query_result.affected_rows(), 2);
223    ///
224    /// // QueryResult::drop simply does the following:
225    /// while query_result.iter().is_some() {}
226    /// # });
227    /// ```
228    pub fn iter<'d>(&'d mut self) -> Option<ResultSet<'c, 't, 'tc, 'd, T>> {
229        use SetIteratorState::*;
230
231        if let OnBoundary | Done = &self.state {
232            debug_assert!(
233                !self.conn.more_results_exists(),
234                "the next state must be handled by the Iterator::next"
235            );
236
237            None
238        } else {
239            Some(ResultSet {
240                set_index: self.set_index,
241                inner: self,
242            })
243        }
244    }
245
246    /// Returns the number of affected rows for the current result set.
247    pub fn affected_rows(&self) -> u64 {
248        self.state
249            .ok_packet()
250            .map(|ok| ok.affected_rows())
251            .unwrap_or_default()
252    }
253
254    /// Returns the last insert id for the current result set.
255    pub fn last_insert_id(&self) -> Option<u64> {
256        self.state
257            .ok_packet()
258            .map(|ok| ok.last_insert_id())
259            .unwrap_or_default()
260    }
261
262    /// Returns the warnings count for the current result set.
263    pub fn warnings(&self) -> u16 {
264        self.state
265            .ok_packet()
266            .map(|ok| ok.warnings())
267            .unwrap_or_default()
268    }
269
270    /// [Info] for the current result set.
271    ///
272    /// Will be empty if not defined.
273    ///
274    /// [Info]: http://dev.mysql.com/doc/internals/en/packet-OK_Packet.html
275    pub fn info_ref(&self) -> &[u8] {
276        self.state
277            .ok_packet()
278            .and_then(|ok| ok.info_ref())
279            .unwrap_or_default()
280    }
281
282    /// [Info] for the current result set.
283    ///
284    /// Will be empty if not defined.
285    ///
286    /// [Info]: http://dev.mysql.com/doc/internals/en/packet-OK_Packet.html
287    pub fn info_str(&self) -> Cow<str> {
288        self.state
289            .ok_packet()
290            .and_then(|ok| ok.info_str())
291            .unwrap_or_else(|| "".into())
292    }
293
294    /// Returns columns of the current result rest.
295    pub fn columns(&self) -> SetColumns {
296        SetColumns {
297            inner: self.state.columns().map(Into::into),
298        }
299    }
300}
301
302impl<'c, 't, 'tc, T: crate::prelude::Protocol> Drop for QueryResult<'c, 't, 'tc, T> {
303    fn drop(&mut self) {
304        while self.iter().is_some() {}
305    }
306}
307
308#[derive(Debug)]
309pub struct ResultSet<'a, 'b, 'c, 'd, T: crate::prelude::Protocol> {
310    set_index: usize,
311    inner: &'d mut QueryResult<'a, 'b, 'c, T>,
312}
313
314impl<'a, 'b, 'c, T: crate::prelude::Protocol> std::ops::Deref for ResultSet<'a, 'b, 'c, '_, T> {
315    type Target = QueryResult<'a, 'b, 'c, T>;
316
317    fn deref(&self) -> &Self::Target {
318        &*self.inner
319    }
320}
321
322impl<T: crate::prelude::Protocol> Iterator for ResultSet<'_, '_, '_, '_, T> {
323    type Item = Result<Row>;
324
325    fn next(&mut self) -> Option<Self::Item> {
326        if self.set_index == self.inner.set_index {
327            self.inner.next()
328        } else {
329            None
330        }
331    }
332}
333
334impl<T: crate::prelude::Protocol> Iterator for QueryResult<'_, '_, '_, T> {
335    type Item = Result<Row>;
336
337    fn next(&mut self) -> Option<Self::Item> {
338        use SetIteratorState::*;
339
340        let state = std::mem::replace(&mut self.state, OnBoundary);
341
342        match state {
343            InSet(cols) => match T::next(&mut self.conn, cols.clone()) {
344                Ok(Some(row)) => {
345                    self.state = InSet(cols);
346                    Some(Ok(row))
347                }
348                Ok(None) => {
349                    self.handle_next();
350                    None
351                }
352                Err(e) => {
353                    self.handle_next();
354                    Some(Err(e))
355                }
356            },
357            InEmptySet(_) => {
358                self.handle_next();
359                None
360            }
361            Errored(err) => {
362                self.handle_next();
363                Some(Err(err))
364            }
365            OnBoundary => None,
366            Done => {
367                self.state = Done;
368                None
369            }
370        }
371    }
372}
373
374impl<T: crate::prelude::Protocol> Drop for ResultSet<'_, '_, '_, '_, T> {
375    fn drop(&mut self) {
376        while self.next().is_some() {}
377    }
378}
379
380#[derive(Debug, Clone, PartialEq)]
381pub struct SetColumns<'a> {
382    inner: Option<&'a Arc<[Column]>>,
383}
384
385impl<'a> SetColumns<'a> {
386    /// Returns an index of a column by its name.
387    pub fn column_index<U: AsRef<str>>(&self, name: U) -> Option<usize> {
388        let name = name.as_ref().as_bytes();
389        self.inner
390            .as_ref()
391            .and_then(|cols| cols.iter().position(|col| col.name_ref() == name))
392    }
393}
394
395impl AsRef<[Column]> for SetColumns<'_> {
396    fn as_ref(&self) -> &[Column] {
397        self.inner
398            .as_ref()
399            .map(|cols| &(*cols)[..])
400            .unwrap_or(&[][..])
401    }
402}