mysql/conn/
local_infile.rs

1// Copyright (c) 2020 rust-mysql-simple contributors
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9use std::{
10    fmt, io,
11    sync::{Arc, Mutex},
12};
13
14use crate::Conn;
15
16pub(crate) type LocalInfileInner =
17    Arc<Mutex<dyn for<'a> FnMut(&'a [u8], &'a mut LocalInfile<'_>) -> io::Result<()> + Send>>;
18
19/// Callback to handle requests for local files.
20/// Consult [Mysql documentation](https://dev.mysql.com/doc/refman/5.7/en/load-data.html) for the
21/// format of local infile data.
22///
23/// # Support
24///
25/// Note that older versions of Mysql server may not support this functionality.
26///
27/// ```rust
28/// # mysql::doctest_wrapper!(__result, {
29/// use mysql::*;
30/// use mysql::prelude::*;
31///
32/// use std::io::Write;
33///
34/// let pool = Pool::new(get_opts())?;
35/// let mut conn = pool.get_conn().unwrap();
36///
37/// conn.query_drop("CREATE TEMPORARY TABLE mysql.tbl(a TEXT)").unwrap();
38/// conn.set_local_infile_handler(Some(
39///     LocalInfileHandler::new(|file_name, writer| {
40///         writer.write_all(b"row1: file name is ")?;
41///         writer.write_all(file_name)?;
42///         writer.write_all(b"\n")?;
43///
44///         writer.write_all(b"row2: foobar\n")
45///     })
46/// ));
47///
48/// match conn.query_drop("LOAD DATA LOCAL INFILE 'file_name' INTO TABLE mysql.tbl") {
49///     Ok(_) => (),
50///     Err(Error::MySqlError(ref e)) if e.code == 1148 => {
51///         // functionality is not supported by the server
52///         return Ok(());
53///     }
54///     err => {
55///         err.unwrap();
56///     }
57/// }
58///
59/// let mut row_num = 0;
60/// let result: Vec<String> = conn.query("SELECT * FROM mysql.tbl").unwrap();
61/// assert_eq!(
62///     result,
63///     vec!["row1: file name is file_name".to_string(), "row2: foobar".to_string()],
64/// );
65/// # });
66/// ```
67#[derive(Clone)]
68pub struct LocalInfileHandler(pub(crate) LocalInfileInner);
69
70impl LocalInfileHandler {
71    pub fn new<F>(f: F) -> Self
72    where
73        F: for<'a> FnMut(&'a [u8], &'a mut LocalInfile<'_>) -> io::Result<()> + Send + 'static,
74    {
75        LocalInfileHandler(Arc::new(Mutex::new(f)))
76    }
77}
78
79impl PartialEq for LocalInfileHandler {
80    fn eq(&self, other: &LocalInfileHandler) -> bool {
81        std::ptr::eq(&*self.0, &*other.0)
82    }
83}
84
85impl Eq for LocalInfileHandler {}
86
87impl fmt::Debug for LocalInfileHandler {
88    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
89        write!(f, "LocalInfileHandler(...)")
90    }
91}
92
93/// Local in-file stream.
94/// The callback will be passed a reference to this stream, which it
95/// should use to write the contents of the requested file.
96/// See [LocalInfileHandler](struct.LocalInfileHandler.html) documentation for example.
97#[derive(Debug)]
98pub struct LocalInfile<'a> {
99    buffer: io::Cursor<&'a mut [u8]>,
100    conn: &'a mut Conn,
101}
102
103impl<'a> LocalInfile<'a> {
104    pub(crate) const BUFFER_SIZE: usize = 4096;
105
106    pub(crate) fn new(buffer: &'a mut [u8; LocalInfile::BUFFER_SIZE], conn: &'a mut Conn) -> Self {
107        Self {
108            buffer: io::Cursor::new(buffer),
109            conn,
110        }
111    }
112}
113
114impl io::Write for LocalInfile<'_> {
115    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
116        if self.buffer.position() == Self::BUFFER_SIZE as u64 {
117            self.flush()?;
118        }
119        self.buffer.write(buf)
120    }
121
122    fn flush(&mut self) -> io::Result<()> {
123        let n = self.buffer.position() as usize;
124        if n > 0 {
125            let mut range = &self.buffer.get_ref()[..n];
126            self.conn
127                .write_packet(&mut range)
128                .map_err(|e| io::Error::new(io::ErrorKind::Other, Box::new(e)))?;
129        }
130        self.buffer.set_position(0);
131        Ok(())
132    }
133}