posixmq/
posixmq.rs

1/* posixmq 1.0.0 - Idiomatic rust library for using posix message queues
2 * Copyright 2019, 2020 Torbjørn Birch Moltu
3 *
4 * Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
5 * http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
6 * http://opensource.org/licenses/MIT>, at your option. This file may not be
7 * copied, modified, or distributed except according to those terms.
8 */
9
10//! Posix message queue wrapper with optional mio integration.
11//!
12//! Posix message queues are like pipes, but message-oriented which makes them
13//! safe to read by multiple processes. Messages are sorted based on an
//! additional priority parameter. Queues are not placed in the normal file
//! system, but use a separate, flat namespace. Normal file permissions still
16//! apply though.
17//! For a longer introduction, see [`man mq_overview`](http://man7.org/linux/man-pages/man7/mq_overview.7.html)
18//! or [`man mq`](https://www.unix.com/man-page/netbsd/3/mq/).
19//!
20//! They are not all that useful, as only Linux and some BSDs implement them,
21//! and even there you might be limited to creating queues with a capacity of
22//! no more than 10 messages at a time.
23//!
24//! # Examples
25//!
26//! Send a couple messages:
27//! ```ignore
28//! use posixmq::PosixMq;
29//!
30//! // open the message queue if it exists, or create it if it doesn't.
31//! // names should start with a slash and have no more slashes.
32//! let mq = PosixMq::create("/hello_posixmq").unwrap();
33//! mq.send(0, b"message").unwrap();
34//! // messages with equal priority will be received in order
35//! mq.send(0, b"queue").unwrap();
36//! // but this message has higher priority and will be received first
37//! mq.send(10, b"Hello,").unwrap();
38//! ```
39//!
40//! and receive them:
41//! ```ignore
42//! use posixmq::PosixMq;
43//!
44//! // open the queue read-only, or fail if it doesn't exist.
45//! let mq = PosixMq::open("/hello_posixmq").unwrap();
46//! // delete the message queue when you don't need to open it again.
//! // otherwise it will remain until the system is rebooted, consuming resources.
48//! posixmq::remove_queue("/hello_posixmq").unwrap();
49//!
50//! // the receive buffer must be at least as big as the biggest possible
51//! // message, or you will not be allowed to receive anything.
52//! let mut buf = vec![0; mq.attributes().unwrap().max_msg_len];
53//! assert_eq!(mq.recv(&mut buf).unwrap(), (10, "Hello,".len()));
54//! assert_eq!(mq.recv(&mut buf).unwrap(), (0, "message".len()));
55//! assert_eq!(mq.recv(&mut buf).unwrap(), (0, "queue".len()));
56//! assert_eq!(&buf[..5], b"queue");
57//!
58//! // check that there are no more messages
59//! assert_eq!(mq.attributes().unwrap().current_messages, 0);
60//! // note that acting on this value is race-prone. A better way to do this
61//! // would be to switch our descriptor to non-blocking mode, and check for
62//! // an error of type `ErrorKind::WouldBlock`.
63//! ```
64//!
65//! With mio (and `features = ["mio_07"]` in Cargo.toml):
66#![cfg_attr(feature="mio_07", doc="```")]
67#![cfg_attr(not(feature="mio_07"), doc="```compile_fail")]
68//! # extern crate mio_07 as mio;
69//! # use mio::{Events, Poll, Interest, Token};
70//! # use std::io::ErrorKind;
71//! # use std::thread;
72//! // set up queue
73//! let mut receiver = posixmq::OpenOptions::readonly()
74//!     .nonblocking()
75//!     .capacity(3)
76//!     .max_msg_len(100)
77//!     .create_new()
78//!     .open("/mio")
79//!     .unwrap();
80//!
81//! // send something from another thread (or process)
82//! let sender = thread::spawn(move|| {
83//!     let sender = posixmq::OpenOptions::writeonly().open("/mio").unwrap();
84//!     posixmq::remove_queue("/mio").unwrap();
85//!     sender.send(0, b"async").unwrap();
86//! });
87//!
88//! // set up mio and register
89//! let mut poll = Poll::new().unwrap();
90//! poll.registry().register(&mut receiver, Token(0), Interest::READABLE).unwrap();
91//! let mut events = Events::with_capacity(10);
92//!
93//! poll.poll(&mut events, None).unwrap();
94//! for event in &events {
95//!     if event.token() == Token(0) {
96//!         loop {
97//!             let mut buf = [0; 100];
98//!             match receiver.recv(&mut buf) {
99//!                 Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
100//!                 Err(e) => panic!("Error receiving message: {}", e),
101//!                 Ok((priority, len)) => {
102//!                     assert_eq!(priority, 0);
103//!                     assert_eq!(&buf[..len], b"async");
104//!                 }
105//!             }
106//!         }
107//!     }
108//! }
109//!
110//! sender.join().unwrap();
111//! ```
112//!
113//! See the examples/ directory for more.
114//!
115//! # Portability
116//!
117//! While the p in POSIX stands for Portable, that is not a fitting description
118//! of their message queues; Support is spotty even among *nix OSes.
//! **Windows, macOS, OpenBSD, Android, iOS, Rumprun, Fuchsia and Emscripten
//! don't support posix message queues at all.**
121//!
122//! ## Compatible operating systems and features
123//!
124//! &nbsp; | Linux | FreeBSD 11+ | NetBSD | DragonFly BSD | Illumos | Solaris | VxWorks
125//! -|-|-|-|-|-|-|-
//! core features | Yes | Yes | Yes | Yes | Yes | Yes | Yes
127//! mio `Source` & `Evented` | Yes | Yes | unusable | Yes | No | No | No
128//! `FromRawFd`+`IntoRawFd`+[`try_clone()`](struct.PosixMq.html#method.try_clone) | Yes | No | Yes | Yes | No | No | No
129//! `AsRawFd`+[`set_cloexec()`](struct.PosixMq.html#method.set_cloexec) | Yes | Yes | Yes | Yes | No | No | No
130//! Tested? | Manually+CI | Manually+CI | Manually | Manually | Manually (on OmniOSce) | Cross-`check`ed on CI | No
131//!
132//! This library will fail to compile if the target OS doesn't have posix
133//! message queues.
134//!
135//! Feature explanations:
136//!
137//! * `FromRawFd`+`IntoRawFd`+[`try_clone()`](struct.PosixMq.html#method.try_clone):
//!   For these to work, the inner `mqd_t` type must be an `int`/`RawFd` typedef,
139//!   and known to represent a file descriptor.  
140//!   These impls are only available on OSes where this is known to be the case,
//!   to increase the likelihood that the core features will compile on an
142//!   unknown OS.
143//! * `AsRawFd`+[`set_cloexec()`](struct.PosixMq.html#method.set_cloexec):
144//!   Similar to `FromRawFd` and `IntoRawFd`, but FreeBSD 11+ has [a function](https://svnweb.freebsd.org/base/head/include/mqueue.h?revision=306588&view=markup#l54)
145//!   which lets one get a file descriptor from a `mqd_t`.  
//!   Changing or querying close-on-exec requires `AsRawFd`, and is
//!   only meaningful on operating systems that have the concept of `exec()`.  
148//!   [`is_cloexec()`](struct.PosixMq.html#method.is_cloexec) is always present
149//!   and returns `true` on OSes where close-on-exec cannot be disabled or one
150//!   cannot `exec()`. (posix message queue descriptors should have
151//!   close-on-exec set by default).
152//! * mio `Source` & `Evented`: The impls require both `AsRawFd`
153//!   and that mio compiles on the OS.
154//!   This does not guarantee that the event notification mechanism used by mio
155//!   supports posix message queues though. (registering fails on NetBSD)
156//!
157//! On Linux, message queues and their permissions can be viewed in
158//! `/dev/mqueue/`. The kernel *can* be compiled to not support posix message
159//! queues, so it's not guaranteed to always work. (such as on Android)
160//!
161//! On FreeBSD, the kernel module responsible for posix message queues
162//! is not loaded by default; Run `kldload mqueuefs` as root to enable it.
163//! To list queues, the file system must additionally be mounted first:
164//! `mount -t mqueuefs null $somewhere`.  
165//! Versions before 11 do not have the function used to get a file descriptor,
166//! so this library will not compile there.
167//!
168//! On NetBSD, re-opening message queues multiple times can eventually make all
169//! further opens fail. This does not affect programs that open a single
170//! queue once.  
171//! The mio integration compiles, but registering message queues with mio fails.  
172//! Because NetBSD ignores cloexec when opening or cloning descriptors, there
173//! is a race condition with other threads exec'ing before this library can
174//! enable close-on-exec for the descriptor.
175//!
176//! DragonFly BSD doesn't set cloexec when opening either, but does when
177//! cloning.
178//!
179//! ## OS-dependent restrictions and default values
180//!
181//! Not even limiting oneself to the core features is enough to guarantee
182//! portability!
183//!
184//! &nbsp; | Linux | FreeBSD | NetBSD | DragonFly BSD | Illumos
185//! -|-|-|-|-|-
186//! max priority | 32767 | 63 | **31** | 31 | 31
187//! default capacity | 10 | 10 | 32 | 32 | 128
188//! default max_msg_len | 8192 | 1024 | 992 | 992 | 1024
189//! max capacity | **10**\* | 100 | 512 | 512 | No limit
190//! max max_msg_len | **8192**\* | 16384 | 16384 | 16384 | No limit
191//! allows empty messages | Yes | Yes | No | No | Yes
192//! enforces name rules | Yes | Yes | No | No | Yes
193//! allows "/.", "/.." and "/" | No | No | Yes | Yes | Yes
194//!
195//! On Linux the listed size limits only apply to unprivileged processes.
196//! As root there instead appears to be a combined limit on memory usage of the
//! form `capacity*(max_msg_len+k)`, but it is several times higher than 10*8192.
198//!
199//! # Differences from the C API
200//!
201//! * [`send()`](struct.PosixMq.html#method.send),
202//!   [`recv()`](struct.PosixMq.html#method.recv) and the timed equivalents
//!   try again when EINTR / `ErrorKind::Interrupted` is returned.
204//!   (Consistent with how std does IO)
205//! * `open()` and all other methods which take `AsRef<[u8]>` prepends `'/'` to
206//!   the name if missing.
207//!   (They have to copy the name anyway, to append a terminating `'\0'`)
208//!   Use [`open_c()`](struct.OpenOptions.html#method.open_c) and
209//!   [`remove_queue_c()`](fn.remove_queue_c.html) if you need to interact with
//!   queues on NetBSD or DragonFly that don't have a leading `'/'`.
211//!
212//! # Minimum supported Rust version
213//!
214//! The minimum supported Rust version for posixmq 1.0.z releases is 1.31.1.  
215//! Later 1.y.0 releases might increase this. Until rustup has builds for
216//! DragonFly BSD and Illumos, the minimum version will not be increased past
217//! what is available in the repositories for those operating systems.
218
219// # Why this crate requires `std`
220//
221// The libc crate doesn't expose `errno` in a portable way,
222// so `std::io::Error::last_os_error()` is required to give errors
223// more specific than "something went wrong".
224// Depending on std also means that functions can use `io::Error` and
225// `SystemTime` instead of custom types.
226
227#![allow(clippy::needless_return, clippy::redundant_closure, clippy::needless_lifetimes)] // style
228#![allow(clippy::range_plus_one)] // edge case: I think 1..x+1 is clearer than 1..=x
229#![allow(clippy::cast_lossless)] // improves portability when values are limited by the OS anyway
230// feel free to disable more lints
231
232use std::{io, mem, ptr};
233use std::ffi::CStr;
234use std::io::ErrorKind;
235use std::fmt::{self, Debug, Formatter};
236#[cfg(any(
237    target_os="linux", target_os="freebsd",
238    target_os="netbsd", target_os="dragonfly",
239))]
240use std::os::unix::io::{AsRawFd, RawFd};
241#[cfg(any(target_os="linux", target_os="netbsd", target_os="dragonfly"))]
242use std::os::unix::io::{FromRawFd, IntoRawFd};
243use std::time::{Duration, SystemTime};
244
245extern crate libc;
246use libc::{c_int, c_uint, c_char};
247#[cfg(not(all(target_arch="x86_64", target_os="linux", target_pointer_width="32")))]
248use libc::c_long;
249use libc::{mqd_t, mq_open, mq_close, mq_unlink, mq_send, mq_receive};
250use libc::{mq_attr, mq_getattr, mq_setattr};
251use libc::{timespec, time_t, mq_timedsend, mq_timedreceive};
252#[cfg(target_os="freebsd")]
253use libc::mq_getfd_np;
254use libc::{mode_t, O_ACCMODE, O_RDONLY, O_WRONLY, O_RDWR, O_CREAT, O_EXCL, O_NONBLOCK};
255#[cfg(any(
256    target_os="linux", target_os="freebsd",
257    target_os="netbsd", target_os="dragonfly",
258))]
259use libc::{fcntl, F_GETFD, FD_CLOEXEC, ioctl, FIOCLEX, FIONCLEX};
260#[cfg(any(target_os="linux", target_os="netbsd", target_os="dragonfly"))]
261use libc::F_DUPFD_CLOEXEC;
262
263#[cfg(feature="mio_06")]
264extern crate mio_06;
265#[cfg(feature="mio_06")]
266use mio_06::{event::Evented, unix::EventedFd, Ready, Poll, PollOpt};
267#[cfg(feature="mio_07")]
268extern crate mio_07;
269#[cfg(feature="mio_07")]
270use mio_07::{event::Source, unix::SourceFd, Registry, Interest};
271
272
/// Names that fit in this many bytes (including the added `'/'` and `'\0'`)
/// are converted on the stack; longer ones use a heap buffer.
const CSTR_BUF_SIZE: usize = 48;

/// Calls `f` with `name` converted to a nul-terminated C string that starts
/// with exactly one added `'/'`.
///
/// A single leading `'/'` in `name` is dropped before one is prepended, so
/// `"abc"` and `"/abc"` both become `"/abc\0"`.
///
/// # Errors
///
/// Returns `ErrorKind::InvalidInput` without calling `f` if `name` contains
/// a nul byte; otherwise returns whatever `f` returns.
fn with_name_as_cstr<F: FnOnce(&CStr)->Result<R,io::Error>, R>(mut name: &[u8],  f: F)
-> Result<R,io::Error> {
    // drop one leading slash so that it isn't doubled when re-added below
    if let Some((&b'/', rest)) = name.split_first() {
        name = rest;
    }

    // room for the slash in front and the nul terminator behind
    let needed = name.len() + 2;
    let mut stack_storage = [0u8; CSTR_BUF_SIZE];
    let mut heap_storage: Vec<u8>;
    let buf: &mut [u8] = if needed > CSTR_BUF_SIZE {
        heap_storage = vec![0; needed];
        &mut heap_storage[..]
    } else {
        &mut stack_storage[..needed]
    };

    // the final byte is left as zero and becomes the terminator
    buf[0] = b'/';
    buf[1..needed-1].copy_from_slice(name);

    CStr::from_bytes_with_nul(buf)
        .map_err(|_| io::Error::new(ErrorKind::InvalidInput, "contains nul byte"))
        .and_then(f)
}
296
297
298// Cannot use std::fs's because it doesn't expose getters,
299// and rolling our own means we can also use it for mq-specific capacities.
300/// Flags and parameters which control how a [`PosixMq`](struct.PosixMq.html)
301/// message queue is opened or created.
302#[derive(Clone,Copy, PartialEq,Eq)]
303pub struct OpenOptions {
304    flags: c_int,
305    mode: mode_t,
306    capacity: usize,
307    max_msg_len: usize,
308}
309
310impl Debug for OpenOptions {
311    fn fmt(&self,  fmtr: &mut Formatter) -> fmt::Result {
312        fmtr.debug_struct("OpenOptions")
313            .field(
314                "read",
315                &((self.flags & O_ACCMODE) == O_RDWR  ||  (self.flags & O_ACCMODE) == O_RDONLY)
316             )
317            .field(
318                "write",
319                &((self.flags & O_ACCMODE) == O_RDWR  ||  (self.flags & O_ACCMODE) == O_WRONLY)
320             )
321            .field("create", &(self.flags & O_CREAT != 0))
322            .field("open", &(self.flags & O_EXCL == 0))
323            .field("mode", &format_args!("{:03o}", self.mode))
324            .field("capacity", &self.capacity)
325            .field("max_msg_len", &self.max_msg_len)
326            .field("nonblocking", &((self.flags & O_NONBLOCK) != 0))
327            .finish()
328    }
329}
330
331impl OpenOptions {
332    fn new(flags: c_int) -> Self {
333        OpenOptions {
334            flags,
335            // default permissions to only accessible for owner
336            mode: 0o600,
337            capacity: 0,
338            max_msg_len: 0,
339        }
340    }
341
342    /// Open message queue for receiving only.
343    pub fn readonly() -> Self {
344        OpenOptions::new(O_RDONLY)
345    }
346
347    /// Open message queue for sending only.
348    pub fn writeonly() -> Self {
349        OpenOptions::new(O_WRONLY)
350    }
351
352    /// Open message queue both for sending and receiving.
353    pub fn readwrite() -> Self {
354        OpenOptions::new(O_RDWR)
355    }
356
357    /// Set permissions to create the queue with.
358    ///
359    /// Some bits might be cleared by the process's umask when creating the
360    /// queue, and unknown bits are ignored.
361    ///
362    /// This field is ignored if the queue already exists or should not be created.
363    /// If this method is not called, queues are created with mode 600.
364    pub fn mode(&mut self,  mode: u32) -> &mut Self {
365        // 32bit value for consistency with std::os::unix even though only 12
366        // bits are needed. Truncate if necessary because the OS ignores
367        // unknown bits anyway. (and they're probably always zero as well).
368        self.mode = mode as mode_t;
369        return self;
370    }
371
372    /// Set the maximum size of each message.
373    ///
374    /// `recv()` will fail if given a buffer smaller than this value.
375    ///
376    /// If max_msg_len and capacity are both zero (or not set), the queue
377    /// will be created with a maximum length and capacity decided by the
378    /// operating system.  
379    /// If this value is specified, capacity should also be, or opening the
380    /// message queue might fail.
381    pub fn max_msg_len(&mut self,  max_msg_len: usize) -> &mut Self {
382        self.max_msg_len = max_msg_len;
383        return self;
384    }
385
386    /// Set the maximum number of messages in the queue.
387    ///
388    /// When the queue is full, further `send()`s will either block
389    /// or fail with an error of type `ErrorKind::WouldBlock`.
390    ///
391    /// If both capacity and max_msg_len are zero (or not set), the queue
392    /// will be created with a maximum length and capacity decided by the
393    /// operating system.  
394    /// If this value is specified, max_msg_len should also be, or opening the
395    /// message queue might fail.
396    pub fn capacity(&mut self,  capacity: usize) -> &mut Self {
397        self.capacity = capacity;
398        return self;
399    }
400
401    /// Create message queue if it doesn't exist.
402    pub fn create(&mut self) -> &mut Self {
403        self.flags |= O_CREAT;
404        self.flags &= !O_EXCL;
405        return self;
406    }
407
408    /// Create a new queue, failing if the queue already exists.
409    pub fn create_new(&mut self) -> &mut Self {
410        self.flags |= O_CREAT | O_EXCL;
411        return self;
412    }
413
414    /// Require the queue to already exist, failing if it doesn't.
415    pub fn existing(&mut self) -> &mut Self {
416        self.flags &= !(O_CREAT | O_EXCL);
417        return self;
418    }
419
420    /// Open the message queue in non-blocking mode.
421    ///
422    /// This must be done if you want to use the message queue with mio.
423    pub fn nonblocking(&mut self) -> &mut Self {
424        self.flags |= O_NONBLOCK;
425        return self;
426    }
427
428    /// Open a queue with the specified options.
429    ///
430    /// If the name doesn't start with a '/', one will be prepended.
431    ///
432    /// # Errors
433    ///
434    /// * Queue doesn't exist (ENOENT) => `ErrorKind::NotFound`
435    /// * Name is just "/" (ENOENT) or is empty => `ErrorKind::NotFound`
436    /// * Queue already exists (EEXISTS) => `ErrorKind::AlreadyExists`
437    /// * Not permitted to open in this mode (EACCESS) => `ErrorKind::PermissionDenied`
438    /// * More than one '/' in name (EACCESS) => `ErrorKind::PermissionDenied`
439    /// * Invalid capacities (EINVAL) => `ErrorKind::InvalidInput`
440    /// * Capacities too high (EMFILE) => `ErrorKind::Other`
441    /// * Posix message queues are disabled (ENOSYS) => `ErrorKind::Other`
442    /// * Name contains '\0' => `ErrorKind::InvalidInput`
443    /// * Name is too long (ENAMETOOLONG) => `ErrorKind::Other`
444    /// * Unlikely (ENFILE, EMFILE, ENOMEM, ENOSPC) => `ErrorKind::Other`
445    /// * Possibly other
446    pub fn open<N: AsRef<[u8]> + ?Sized>(&self,  name: &N) -> Result<PosixMq, io::Error> {
447        pub fn open_slice(opts: &OpenOptions,  name: &[u8]) -> Result<PosixMq, io::Error> {
448            with_name_as_cstr(name, |name| opts.open_c(&name) )
449        }
450        open_slice(self, name.as_ref())
451    }
452
453    /// Open a queue with the specified options and without inspecting `name`
454    /// or allocating.
455    ///
456    /// This can on NetBSD be used to access message queues with names that
457    /// doesn't start with a '/'.
458    ///
459    /// # Errors
460    ///
461    /// * Queue doesn't exist (ENOENT) => `ErrorKind::NotFound`
462    /// * Name is just "/" (ENOENT) => `ErrorKind::NotFound`
463    /// * Queue already exists (EEXISTS) => `ErrorKind::AlreadyExists`
464    /// * Not permitted to open in this mode (EACCESS) => `ErrorKind::PermissionDenied`
465    /// * More than one '/' in name (EACCESS) => `ErrorKind::PermissionDenied`
466    /// * Invalid capacities (EINVAL) => `ErrorKind::InvalidInput`
467    /// * Posix message queues are disabled (ENOSYS) => `ErrorKind::Other`
468    /// * Name is empty (EINVAL) => `ErrorKind::InvalidInput`
469    /// * Name is too long (ENAMETOOLONG) => `ErrorKind::Other`
470    /// * Unlikely (ENFILE, EMFILE, ENOMEM, ENOSPC) => `ErrorKind::Other`
471    /// * Possibly other
472    pub fn open_c(&self,  name: &CStr) -> Result<PosixMq, io::Error> {
473        let opts = self;
474
475        // because mq_open is a vararg function, mode_t cannot be passed
476        // directly on FreeBSD where it's smaller than c_int.
477        let permissions = opts.mode as c_int;
478
479        let mut capacities = unsafe { mem::zeroed::<mq_attr>() };
480        let capacities_ptr = if opts.capacity != 0  ||  opts.max_msg_len != 0 {
481            capacities.mq_maxmsg = opts.capacity as KernelLong;
482            capacities.mq_msgsize = opts.max_msg_len as KernelLong;
483            &mut capacities as *mut mq_attr
484        } else {
485            ptr::null_mut::<mq_attr>()
486        };
487
488        let mqd = unsafe { mq_open(name.as_ptr(), opts.flags, permissions, capacities_ptr) };
489        // even when mqd_t is a pointer, -1 is the return value for error
490        if mqd == -1isize as mqd_t {
491            return Err(io::Error::last_os_error());
492        }
493        let mq = PosixMq{mqd};
494
495        // NetBSD and DragonFly BSD doesn't set cloexec by default and
496        // ignores O_CLOEXEC. Setting it with FIOCLEX works though.
497        // Propagate error if setting cloexec somehow fails, even though
498        // close-on-exec won't matter in most cases.
499        #[cfg(any(target_os="netbsd", target_os="dragonfly"))]
500        mq.set_cloexec(true)?;
501
502        Ok(mq)
503    }
504}
505
506
507/// Delete a posix message queue.
508///
509/// A `'/'` is prepended to the name if it doesn't start with one already.
510/// (it would have to append a `'\0'` and therefore allocate or copy anyway.)
511///
512/// Processes that have it open will still be able to use it.
513///
514/// # Errors
515///
516/// * Queue doesn't exist (ENOENT) => `ErrorKind::NotFound`
517/// * Name is invalid (ENOENT or EACCESS) => `ErrorKind::NotFound` or `ErrorKind::PermissionDenied`
518/// * Not permitted to delete the queue (EACCES) => `ErrorKind::PermissionDenied`
519/// * Posix message queues are disabled (ENOSYS) => `ErrorKind::Other`
520/// * Name contains '\0' bytes => `ErrorKind::InvalidInput`
521/// * Name is too long (ENAMETOOLONG) => `ErrorKind::Other`
522/// * Possibly other
523pub fn remove_queue<N: AsRef<[u8]> + ?Sized>(name: &N) -> Result<(), io::Error> {
524    fn remove_queue_slice(name: &[u8]) -> Result<(), io::Error> {
525        with_name_as_cstr(name, |name| remove_queue_c(&name) )
526    }
527    remove_queue_slice(name.as_ref())
528}
529
530/// Delete a posix message queue, without inspecting `name` or allocating.
531///
532/// This function is on NetBSD necessary to remove queues with names that
533/// doesn't start with a '/'.
534///
535/// # Errors
536///
537/// * Queue doesn't exist (ENOENT) => `ErrorKind::NotFound`
538/// * Not permitted to delete the queue (EACCES) => `ErrorKind::PermissionDenied`
539/// * Posix message queues are disabled (ENOSYS) => `ErrorKind::Other`
540/// * More than one '/' in name (EACCESS) => `ErrorKind::PermissionDenied`
541/// * Name is empty (EINVAL) => `ErrorKind::InvalidInput`
542/// * Name is invalid (ENOENT, EACCESS or EINVAL) => `ErrorKind::NotFound`
543///   `ErrorKind::PermissionDenied` or `ErrorKind::InvalidInput`
544/// * Name is too long (ENAMETOOLONG) => `ErrorKind::Other`
545/// * Possibly other
546pub fn remove_queue_c(name: &CStr) -> Result<(), io::Error> {
547    let name = name.as_ptr();
548    let ret = unsafe { mq_unlink(name) };
549    if ret != 0 {
550        return Err(io::Error::last_os_error());
551    }
552    Ok(())
553}
554
555
556// The fields of `mq_attr` and `timespec` are of type `long` on all targets
557// except x86_64-unknown-linux-gnux32, where they are `long long` (to match up
558// with normal x86_64 `long`).
559// Rusts lack of implicit widening makes this peculiarity annoying.
560#[cfg(all(target_arch="x86_64", target_os="linux", target_pointer_width="32"))]
561type KernelLong = i64;
562#[cfg(not(all(target_arch="x86_64", target_os="linux", target_pointer_width="32")))]
563type KernelLong = c_long;
564
/// Contains information about the capacities and state of a posix message queue.
///
/// Created by [`PosixMq::attributes()`](struct.PosixMq.html#method.attributes).
#[derive(Clone, Copy, PartialEq, Eq, Default)]
pub struct Attributes {
    /// The maximum size of messages that can be stored in the queue.
    pub max_msg_len: usize,
    /// The maximum number of messages in the queue.
    pub capacity: usize,
    /// The number of messages currently in the queue at the time the
    /// attributes were retrieved.
    pub current_messages: usize,
    /// Whether the descriptor was set to nonblocking mode when
    /// the attributes were retrieved.
    pub nonblocking: bool,
    // private field; the struct can't be built with a literal outside this module
    _private: (),
}

// Hand-written so that the `_private` marker is left out of the output.
impl Debug for Attributes {
    fn fmt(&self, fmtr: &mut fmt::Formatter) -> fmt::Result {
        // destructure so that adding a field without printing it is an error
        let Attributes {
            max_msg_len,
            capacity,
            current_messages,
            nonblocking,
            _private: (),
        } = *self;

        let mut out = fmtr.debug_struct("Attributes");
        out.field("max_msg_len", &max_msg_len);
        out.field("capacity", &capacity);
        out.field("current_messages", &current_messages);
        out.field("nonblocking", &nonblocking);
        out.finish()
    }
}
593
594
// Evaluates `$call` until it returns something other than -1, or fails with
// an error other than EINTR / `ErrorKind::Interrupted`.
// Expands to the successful return value; on any other error it returns
// `Err(..)` from the *enclosing function*, which must therefore return a
// `Result<_, io::Error>`.
macro_rules! retry_if_interrupted {($call:expr) => {{
    loop {
        let result = $call;
        if result == -1 {
            let error = io::Error::last_os_error();
            if error.kind() == ErrorKind::Interrupted {
                continue; // interrupted by a signal: try again
            }
            return Err(error);
        }
        break result;
    }
}}}
607
608/// Returns saturated timespec as err if systemtime cannot be represented
609fn deadline_to_realtime(deadline: SystemTime) -> Result<timespec, timespec> {
610    /// Don't use struct literal in case timespec has extra fields on some platform.
611    fn new_timespec(secs: time_t,  nsecs: KernelLong) -> timespec {
612        let mut ts: timespec = unsafe { mem::zeroed() };
613        ts.tv_sec = secs;
614        ts.tv_nsec = nsecs;
615        return ts;
616    }
617
618    // mq_timedsend() and mq_timedreceive() takes an absolute point in time,
619    // based on CLOCK_REALTIME aka SystemTime.
620    match deadline.duration_since(SystemTime::UNIX_EPOCH) {
621        // Currently SystemTime has the same range as the C types, but
622        // avoid truncation in case this changes.
623        Ok(expires) if expires.as_secs() > time_t::max_value() as u64
624            => Err(new_timespec(time_t::max_value(), 0)),
625        Ok(expires)
626            => Ok(new_timespec(expires.as_secs() as time_t, expires.subsec_nanos() as KernelLong)),
627        // A pre-1970 deadline is probably a bug, but handle it anyway.
628        // Based on https://github.com/solemnwarning/timespec/blob/master/README.md
629        // the subsecond part of timespec should be positive and counts toward
630        // positive infinity; (-1, 0) < (-1, 999999999) < (0, 0). This has the
631        // advantage of simplifying addition and subtraction, but is the
632        // opposite of Duration which counts away from zero.
633        // The minimum representable value is therefore (-min_value(), 0)
634        Err(ref earlier) if earlier.duration() > Duration::new(time_t::max_value() as u64 + 1, 0)
635            => Err(new_timespec(time_t::min_value()+1, 0)), // add one to avoid negation bugs
636        Err(ref earlier) if earlier.duration().subsec_nanos() == 0
637            => Ok(new_timespec(-(earlier.duration().as_secs() as time_t), 0)),
638        Err(earlier) => {
639            // convert fractional part from counting away from zero to counting
640            // toward positive infinity
641            let before = earlier.duration();
642            let secs = -(before.as_secs() as time_t) - 1;
643            let nsecs = 1_000_000_000 - before.subsec_nanos() as KernelLong;
644            Ok(new_timespec(secs, nsecs))
645        }
646    }
647}
648
649/// Returns an error if timeout is not representable or the produced deadline
650/// overflows.
651fn timeout_to_realtime(timeout: Duration) -> Result<timespec, io::Error> {
652    if let Ok(now) = deadline_to_realtime(SystemTime::now()) {
653        let mut expires = now;
654        expires.tv_sec = expires.tv_sec.wrapping_add(timeout.as_secs() as time_t);
655        // nanosecond values only use 30 bits, so adding two together is safe
656        // even if tv_nsec is an i32
657        expires.tv_nsec += timeout.subsec_nanos() as KernelLong;
658        const NANO: KernelLong = 1_000_000_000;
659        expires.tv_sec = expires.tv_sec.wrapping_add(expires.tv_nsec / NANO);
660        expires.tv_nsec %= NANO;
661        // check that the unsigned timeout is representable as a signed and
662        // possibly smaller time_t, and the additions didn't overflow.
663        // The second check will fail to catch Duration::new(!0, 999_999_999)
664        // (which makes tv_sec wrap completely to the original value), but
665        // the unsigned max value is not representable as a signed value and
666        // will be caught by the first check.
667        // Using wrapping_add and catching overflow afterwards avoids repeating
668        // the error creation and also handles negative system time.
669        if timeout.as_secs() > time_t::max_value() as u64  ||  expires.tv_sec < now.tv_sec {
670            Err(io::Error::new(ErrorKind::InvalidInput, "timeout is too long"))
671        } else {
672            Ok(expires)
673        }
674    } else {
675        Err(io::Error::new(ErrorKind::Other, "system time is not representable"))
676    }
677}
678
679
680/// A descriptor for an open posix message queue.
681///
682/// Message queues can be sent to and / or received from depending on the
683/// options it was opened with.
684///
685/// The descriptor is closed when this struct is dropped.
686///
687/// See [the documentation in the crate root](index.html) for examples,
688/// portability notes and OS details.
689pub struct PosixMq {
690    mqd: mqd_t
691}
692
693impl PosixMq {
694    /// Open an existing message queue in read-write mode.
695    ///
696    /// See [`OpenOptions::open()`](struct.OpenOptions.html#method.open) for
697    /// details and possible errors.
698    pub fn open<N: AsRef<[u8]> + ?Sized>(name: &N) -> Result<Self, io::Error> {
699        OpenOptions::readwrite().open(name)
700    }
701
702    /// Open a message queue in read-write mode, creating it if it doesn't exists.
703    ///
704    /// See [`OpenOptions::open()`](struct.OpenOptions.html#method.open) for
705    /// details and possible errors.
706    pub fn create<N: AsRef<[u8]> + ?Sized>(name: &N) -> Result<Self, io::Error> {
707        OpenOptions::readwrite().create().open(name)
708    }
709
710
711    /// Add a message to the queue.
712    ///
713    /// For maximum portability, avoid using priorities >= 32 or sending
714    /// zero-length messages.
715    ///
716    /// # Errors
717    ///
718    /// * Queue is full and opened in nonblocking mode (EAGAIN) => `ErrorKind::WouldBlock`
719    /// * Message is too big for the queue (EMSGSIZE) => `ErrorKind::Other`
720    /// * Message is zero-length and the OS doesn't allow this (EMSGSIZE) => `ErrorKind::Other`
721    /// * Priority is too high (EINVAL) => `ErrorKind::InvalidInput`
722    /// * Queue is opened in read-only mode (EBADF) => `ErrorKind::Other`
723    /// * Possibly other => `ErrorKind::Other`
724    pub fn send(&self,  priority: u32,  msg: &[u8]) -> Result<(), io::Error> {
725        let mptr = msg.as_ptr() as *const c_char;
726        retry_if_interrupted!(unsafe { mq_send(self.mqd, mptr, msg.len(), priority as c_uint) });
727        Ok(())
728    }
729
730    /// Take the message with the highest priority from the queue.
731    ///
732    /// The buffer must be at least as big as the maximum message length.
733    ///
734    /// # Errors
735    ///
736    /// * Queue is empty and opened in nonblocking mode (EAGAIN) => `ErrorKind::WouldBlock`
737    /// * The receive buffer is smaller than the queue's maximum message size (EMSGSIZE) => `ErrorKind::Other`
738    /// * Queue is opened in write-only mode (EBADF) => `ErrorKind::Other`
739    /// * Possibly other => `ErrorKind::Other`
740    pub fn recv(&self,  msgbuf: &mut [u8]) -> Result<(u32, usize), io::Error> {
741        let bptr = msgbuf.as_mut_ptr() as *mut c_char;
742        let mut priority = 0 as c_uint;
743        let len = retry_if_interrupted!(
744            unsafe { mq_receive(self.mqd, bptr, msgbuf.len(), &mut priority) }
745        );
746        // c_uint is unlikely to differ from u32, but even if it's bigger, the
747        // range of supported values will likely be far smaller.
748        Ok((priority as u32, len as usize))
749    }
750
751    /// Returns an `Iterator` which calls [`recv()`](#method.recv) repeatedly
752    /// with an appropriately sized buffer.
753    ///
754    /// If the message queue is opened in non-blocking mode the iterator can be
755    /// used to drain the queue. Otherwise it will block and never end.
756    pub fn iter<'a>(&'a self) -> Iter<'a> {
757        self.into_iter()
758    }
759
760
761    fn timedsend(&self,  priority: u32,  msg: &[u8],  deadline: &timespec)
762    -> Result<(), io::Error> {
763        let mptr = msg.as_ptr() as *const c_char;
764        retry_if_interrupted!(unsafe {
765            mq_timedsend(self.mqd, mptr, msg.len(), priority as c_uint, deadline)
766        });
767        Ok(())
768    }
769
770    /// Add a message to the queue or cancel if it's still full after a given
771    /// duration.
772    ///
773    /// Returns immediately if opened in nonblocking mode, and the timeout has
774    /// no effect.
775    ///
776    /// For maximum portability, avoid using priorities >= 32 or sending
777    /// zero-length messages.
778    ///
779    /// # Errors
780    ///
781    /// * Timeout expired (ETIMEDOUT) => `ErrorKind::TimedOut`
782    /// * Message is too big for the queue (EMSGSIZE) => `ErrorKind::Other`
783    /// * OS doesn't allow empty messages (EMSGSIZE) => `ErrorKind::Other`
784    /// * Priority is too high (EINVAL) => `ErrorKind::InvalidInput`
785    /// * Queue is full and opened in nonblocking mode (EAGAIN) => `ErrorKind::WouldBlock`
786    /// * Queue is opened in write-only mode (EBADF) => `ErrorKind::Other`
787    /// * Timeout is too long / not representable => `ErrorKind::InvalidInput`
788    /// * Possibly other => `ErrorKind::Other`
789    pub fn send_timeout(&self,  priority: u32,  msg: &[u8],  timeout: Duration)
790    -> Result<(), io::Error> {
791        timeout_to_realtime(timeout).and_then(|expires| self.timedsend(priority, msg, &expires) )
792    }
793
794    /// Add a message to the queue or cancel if the queue is still full at a
795    /// certain point in time.
796    ///
797    /// Returns immediately if opened in nonblocking mode, and the timeout has
798    /// no effect.  
799    /// The deadline is a `SystemTime` because the queues are intended for
800    /// inter-process commonication, and `Instant` might be process-specific.
801    ///
802    /// For maximum portability, avoid using priorities >= 32 or sending
803    /// zero-length messages.
804    ///
805    /// # Errors
806    ///
807    /// * Deadline reached (ETIMEDOUT) => `ErrorKind::TimedOut`
808    /// * Message is too big for the queue (EMSGSIZE) => `ErrorKind::Other`
809    /// * OS doesn't allow empty messages (EMSGSIZE) => `ErrorKind::Other`
810    /// * Priority is too high (EINVAL) => `ErrorKind::InvalidInput`
811    /// * Queue is full and opened in nonblocking mode (EAGAIN) => `ErrorKind::WouldBlock`
812    /// * Queue is opened in write-only mode (EBADF) => `ErrorKind::Other`
813    /// * Possibly other => `ErrorKind::Other`
814    pub fn send_deadline(&self,  priority: u32,  msg: &[u8],  deadline: SystemTime)
815    -> Result<(), io::Error> {
816        match deadline_to_realtime(deadline) {
817            Ok(expires) => self.timedsend(priority, msg, &expires),
818            Err(_) => Err(io::Error::new(ErrorKind::InvalidInput, "deadline is not representable"))
819        }
820    }
821
822    fn timedreceive(&self,  msgbuf: &mut[u8],  deadline: &timespec)
823    -> Result<(u32, usize), io::Error> {
824        let bptr = msgbuf.as_mut_ptr() as *mut c_char;
825        let mut priority: c_uint = 0;
826        let len = retry_if_interrupted!(
827            unsafe { mq_timedreceive(self.mqd, bptr, msgbuf.len(), &mut priority, deadline) }
828        );
829        Ok((priority as u32, len as usize))
830    }
831
832    /// Take the message with the highest priority from the queue or cancel if
833    /// the queue still empty after a given duration.
834    ///
835    /// Returns immediately if opened in nonblocking mode, and the timeout has
836    /// no effect.
837    ///
838    /// # Errors
839    ///
840    /// * Timeout expired (ETIMEDOUT) => `ErrorKind::TimedOut`
841    /// * The receive buffer is smaller than the queue's maximum message size (EMSGSIZE) => `ErrorKind::Other`
842    /// * Queue is empty and opened in nonblocking mode (EAGAIN) => `ErrorKind::WouldBlock`
843    /// * Queue is opened in read-only mode (EBADF) => `ErrorKind::Other`
844    /// * Timeout is too long / not representable => `ErrorKind::InvalidInput`
845    /// * Possibly other => `ErrorKind::Other`
846    pub fn recv_timeout(&self,  msgbuf: &mut[u8],  timeout: Duration)
847    -> Result<(u32, usize), io::Error> {
848        timeout_to_realtime(timeout).and_then(|expires| self.timedreceive(msgbuf, &expires) )
849    }
850
851    /// Take the message with the highest priority from the queue or cancel if
852    /// the queue is still empty at a point in time.
853    ///
854    /// Returns immediately if opened in nonblocking mode, and the timeout has
855    /// no effect.  
856    /// The deadline is a `SystemTime` because the queues are intended for
857    /// inter-process commonication, and `Instant` might be process-specific.
858    ///
859    /// # Errors
860    ///
861    /// * Deadline reached (ETIMEDOUT) => `ErrorKind::TimedOut`
862    /// * The receive buffer is smaller than the queue's maximum message size (EMSGSIZE) => `ErrorKind::Other`
863    /// * Queue is empty and opened in nonblocking mode (EAGAIN) => `ErrorKind::WouldBlock`
864    /// * Queue is opened in read-only mode (EBADF) => `ErrorKind::Other`
865    /// * Possibly other => `ErrorKind::Other`
866    pub fn recv_deadline(&self,  msgbuf: &mut[u8],  deadline: SystemTime)
867    -> Result<(u32, usize), io::Error> {
868        match deadline_to_realtime(deadline) {
869            Ok(expires) => self.timedreceive(msgbuf, &expires),
870            Err(_) => Err(io::Error::new(ErrorKind::InvalidInput, "deadline is not representable"))
871        }
872    }
873
874
875    /// Get information about the state of the message queue.
876    ///
877    /// # Errors
878    ///
879    /// Retrieving these attributes should only fail if the underlying
880    /// descriptor has been closed or is not a message queue.
881    ///
882    /// On operating systems where the descriptor is a pointer, such as on
883    /// FreeBSD and Illumos, such bugs will enable undefined behavior
884    /// and this call will dereference freed or uninitialized memory.  
885    /// (That doesn't make this function unsafe though -
886    /// [`PosixMq::from_raw_mqd()`](#method.from_raw_mqd) and `mq_close()` are.)
887    ///
888    /// While a `send()` or `recv()` ran in place of this call would also have
889    /// failed immediately and therefore not blocked, The descriptor might have
890    /// become used for another queue when a *later* `send()` or `recv()` is
891    /// performed. The descriptor might then be in blocking mode.
892    ///
893    /// # Examples
894    ///
895    /// ```
896    /// # let _ = posixmq::remove_queue("/with_custom_capacity");
897    /// let mq = posixmq::OpenOptions::readwrite()
898    ///     .create_new()
899    ///     .max_msg_len(100)
900    ///     .capacity(3)
901    ///     .open("/with_custom_capacity")
902    ///     .expect("create queue");
903    /// let attrs = mq.attributes().expect("get attributes for queue");
904    /// assert_eq!(attrs.max_msg_len, 100);
905    /// assert_eq!(attrs.capacity, 3);
906    /// assert_eq!(attrs.current_messages, 0);
907    /// assert!(!attrs.nonblocking);
908    /// ```
909    ///
910    /// Ignore the error:
911    ///
912    /// (Will only happen with buggy code (incorrect usage of
913    /// [`from_raw_fd()`](#method.from_raw_fd) or similar)).
914    ///
915    #[cfg_attr(
916        any(target_os="linux", target_os="android", target_os="netbsd", target_os="dragonfly"),
917        doc="```"
918    )]
919    #[cfg_attr(
920        not(any(target_os="linux", target_os="android", target_os="netbsd", target_os="dragonfly")),
921        doc="```no_compile"
922    )]
923    /// # use std::os::unix::io::FromRawFd;
924    /// # let bad = unsafe { posixmq::PosixMq::from_raw_fd(-1) };
925    /// let attrs = bad.attributes().unwrap_or_default();
926    /// assert_eq!(attrs.max_msg_len, 0);
927    /// assert_eq!(attrs.capacity, 0);
928    /// assert_eq!(attrs.current_messages, 0);
929    /// assert!(!attrs.nonblocking);
930    /// ```
931    pub fn attributes(&self) -> Result<Attributes, io::Error> {
932        let mut attrs: mq_attr = unsafe { mem::zeroed() };
933        if unsafe { mq_getattr(self.mqd, &mut attrs) } == -1 {
934            Err(io::Error::last_os_error())
935        } else {
936            Ok(Attributes {
937                max_msg_len: attrs.mq_msgsize as usize,
938                capacity: attrs.mq_maxmsg as usize,
939                current_messages: attrs.mq_curmsgs as usize,
940                nonblocking: (attrs.mq_flags & (O_NONBLOCK as KernelLong)) != 0,
941                _private: ()
942            })
943        }
944    }
945
946    /// Check whether this descriptor is in nonblocking mode.
947    ///
948    /// # Errors
949    ///
950    /// Should only fail as result of buggy code that either created this
951    /// descriptor from something that is not a queue, or has already closed
952    /// the underlying descriptor.  
953    /// (This function will not silently succeed if the fd points to anything
954    /// other than a queue (for example a socket), as this function
955    /// is a wrapper around [`attributes()`][#method.attributes].)  
956    /// To ignore failure, one can write `.is_nonblocking().unwrap_or(false)`.
957    ///
958    /// ## An error doesn't guarantee that any further [`send()`](#method.send) or [`recv()`](#method.recv) wont block.
959    ///
960    /// While a `send()` or `recv()` ran in place of this call would also have
961    /// failed immediately and therefore not blocked, the descriptor might have
962    /// become used for another queue when a *later* `send()` or `recv()` is
963    /// performed. The descriptor might then be in blocking mode.
964    pub fn is_nonblocking(&self) -> Result<bool, io::Error> {
965        match self.attributes() {
966            Ok(attrs) => Ok(attrs.nonblocking),
967            Err(e) => Err(e),
968        }
969    }
970
971    /// Enable or disable nonblocking mode for this descriptor.
972    ///
973    /// This can also be set when opening the message queue,
974    /// with [`OpenOptions::nonblocking()`](struct.OpenOptions.html#method.nonblocking).
975    ///
976    /// # Errors
977    ///
978    /// Setting nonblocking mode should only fail due to incorrect usage of
979    /// `from_raw_fd()` or `as_raw_fd()`, see the documentation on
980    /// [`attributes()`](struct.PosixMq.html#method.attributes) for details.
981    pub fn set_nonblocking(&self,  nonblocking: bool) -> Result<(), io::Error> {
982        let mut attrs: mq_attr = unsafe { mem::zeroed() };
983        attrs.mq_flags = if nonblocking {O_NONBLOCK as KernelLong} else {0};
984        let res = unsafe { mq_setattr(self.mqd, &attrs, ptr::null_mut()) };
985        if res == -1 {
986            return Err(io::Error::last_os_error());
987        }
988        Ok(())
989    }
990
991
992    /// Create a new descriptor for the same message queue.
993    ///
994    /// The new descriptor will have close-on-exec set.
995    ///
996    /// This function is not available on FreeBSD, Illumos or Solaris.
997    #[cfg(any(target_os="linux", target_os="dragonfly", target_os="netbsd"))]
998    pub fn try_clone(&self) -> Result<Self, io::Error> {
999        let mq = match unsafe { fcntl(self.mqd, F_DUPFD_CLOEXEC, 0) } {
1000            -1 => return Err(io::Error::last_os_error()),
1001            fd => PosixMq{mqd: fd},
1002        };
1003        // NetBSD ignores the cloexec part of F_DUPFD_CLOEXEC
1004        // (but DragonFly BSD respects it here)
1005        #[cfg(target_os="netbsd")]
1006        mq.set_cloexec(true)?;
1007        Ok(mq)
1008    }
1009
1010
1011    /// Check whether this descriptor will be closed if the process `exec`s
1012    /// into another program.
1013    ///
1014    /// Posix message queues are closed on exec by default,
1015    /// but this can be changed with [`set_cloexec()`](#method.set_cloexec).
1016    ///
1017    /// This function is not available on Illumos, Solaris or VxWorks.
1018    ///
1019    /// # Errors
1020    ///
1021    /// Retrieving this flag should only fail if the descriptor
1022    /// is already closed.  
1023    /// In that case it will obviously not be open after execing,
1024    /// so treating errors as `true` should be safe.
1025    ///
1026    /// # Examples
1027    ///
1028    /// ```
1029    /// let queue = posixmq::PosixMq::create("is_cloexec").expect("open queue");
1030    /// # posixmq::remove_queue("is_cloexec").expect("delete queue");
1031    /// assert!(queue.is_cloexec().unwrap_or(true));
1032    /// ```
1033    pub fn is_cloexec(&self) -> Result<bool, io::Error> {
1034        #[cfg(any(
1035            target_os="linux", target_os="freebsd",
1036            target_os="netbsd", target_os="dragonfly",
1037        ))]
1038        match unsafe { fcntl(self.as_raw_fd(), F_GETFD) } {
1039            -1 => Err(io::Error::last_os_error()),
1040            flags => Ok((flags & FD_CLOEXEC) != 0),
1041        }
1042        #[cfg(not(any(
1043            target_os="linux", target_os="freebsd",
1044            target_os="netbsd", target_os="dragonfly",
1045        )))]
1046        Err(io::Error::new(
1047            ErrorKind::Other,
1048            "close-on-exec information is not available"
1049        ))
1050    }
1051
1052    /// Change close-on-exec for this descriptor.
1053    ///
1054    /// It is on by default, so this method should only be called when one
1055    /// wants the descriptor to remain open afte `exec`ing.
1056    ///
1057    /// This function is not available on Illumos, Solaris or VxWorks.
1058    ///
1059    /// # Errors
1060    ///
1061    /// This function should only fail if the underlying file descriptor has
1062    /// been closed (due to incorrect usage of `from_raw_fd()` or similar),
1063    /// and not reused for something else yet.
1064    #[cfg(any(
1065        target_os="linux", target_os="freebsd",
1066        target_os="netbsd", target_os="dragonfly",
1067    ))]
1068    pub fn set_cloexec(&self,  cloexec: bool) -> Result<(), io::Error> {
1069        let op = if cloexec {FIOCLEX} else {FIONCLEX};
1070        match unsafe { ioctl(self.as_raw_fd(), op) } {
1071            // Don't hide the error here, because callers can ignore the
1072            // returned value if they want.
1073            -1 => Err(io::Error::last_os_error()),
1074            _ => Ok(())
1075        }
1076    }
1077
1078
1079    /// Create a `PosixMq` from an already opened message queue descriptor.
1080    ///
1081    /// This function should only be used for ffi or if calling `mq_open()`
1082    /// directly for some reason.  
1083    /// Use [`from_raw_fd()`](#method.from_raw_fd) instead if the surrounding
1084    /// code requires `mqd_t` to be a file descriptor.
1085    ///
1086    /// # Safety
1087    ///
1088    /// On some operating systems `mqd_t` is a pointer, which means that the
1089    /// safety of most other methods depend on it being correct.
1090    pub unsafe fn from_raw_mqd(mqd: mqd_t) -> Self {
1091        PosixMq{mqd}
1092    }
1093
1094    /// Get the raw message queue descriptor.
1095    ///
1096    /// This function should only be used for passing to ffi code or to access
1097    /// portable features not exposed by this wrapper (such as calling
1098    /// `mq_notify()` or not automatically retrying on EINTR /
1099    /// `ErrorKind::Interrupted` when sending or receiving).
1100    ///
1101    /// If you need a file descriptor, use `as_raw_fd()` instead for increased
1102    /// portability.
1103    /// ([`as_raw_fd()`](#method.as_raw_fd) can sometimes retrieve an
1104    /// underlying file descriptor even if `mqd_t` is not an `int`.)
1105    pub fn as_raw_mqd(&self) -> mqd_t {
1106        self.mqd
1107    }
1108
1109    /// Convert this wrapper into the raw message queue descriptor without
1110    /// closing it.
1111    ///
1112    /// This function should only be used for ffi; If you need a file
1113    /// descriptor use [`into_raw_fd()`](#method.into_raw_fd) instead.
1114    pub fn into_raw_mqd(self) -> mqd_t {
1115        let mqd = self.mqd;
1116        mem::forget(self);
1117        return mqd;
1118    }
1119}
1120
1121/// Get an underlying file descriptor for the message queue.
1122///
1123/// If you just need the raw `mqd_t`, use
1124/// [`as_raw_mqd()`](struct.PosixMq.html#method.as_raw_mqd)
1125/// instead for increased portability.
1126///
1127/// This impl is not available on Illumos, Solaris or VxWorks.
1128#[cfg(any(
1129    target_os="linux", target_os="freebsd",
1130    target_os="netbsd", target_os="dragonfly",
1131))]
1132impl AsRawFd for PosixMq {
1133    // On Linux, NetBSD and DragonFly BSD, `mqd_t` is a plain file descriptor
1134    // and can trivially be convverted, but this is not guaranteed, nor the
1135    // case on FreeBSD, Illumos and Solaris.
1136    #[cfg(not(target_os="freebsd"))]
1137    fn as_raw_fd(&self) -> RawFd {
1138        self.mqd
1139    }
1140
1141    // FreeBSD has mq_getfd_np() (where _np stands for non-portable)
1142    #[cfg(target_os="freebsd")]
1143    fn as_raw_fd(&self) -> RawFd {
1144        unsafe { mq_getfd_np(self.mqd) }
1145    }
1146}
1147
1148/// Create a `PosixMq` wrapper from a raw file descriptor.
1149///
1150/// Note that the message queue will be closed when the returned `PosixMq` goes
1151/// out of scope / is dropped.
1152///
1153/// This impl is not available on FreeBSD, Illumos or Solaris; If you got a
1154/// `mqd_t` in a portable fashion (from FFI code or by calling `mq_open()`
1155/// yourself for some reason), use
1156/// [`from_raw_mqd()`](struct.PosixMq.html#method.from_raw_mqd) instead.
1157#[cfg(any(target_os="linux", target_os="netbsd", target_os="dragonfly"))]
1158impl FromRawFd for PosixMq {
1159    unsafe fn from_raw_fd(fd: RawFd) -> Self {
1160        PosixMq{mqd: fd}
1161    }
1162}
1163
1164/// Convert the `PosixMq` into a raw file descriptor without closing the
1165/// message queue.
1166///
1167/// This impl is not available on FreeBSD, Illumos or Solaris. If you need to
1168/// transfer ownership to FFI code accepting a `mqd_t`, use
1169/// [`into_raw_mqd()`](struct.PosixMq.html#method.into_raw_mqd) instead.
1170#[cfg(any(target_os="linux", target_os="netbsd", target_os="dragonfly"))]
1171impl IntoRawFd for PosixMq {
1172    fn into_raw_fd(self) -> RawFd {
1173        let fd = self.mqd;
1174        mem::forget(self);
1175        return fd;
1176    }
1177}
1178
1179
1180impl IntoIterator for PosixMq {
1181    type Item = (u32, Vec<u8>);
1182    type IntoIter = IntoIter;
1183    fn into_iter(self) -> IntoIter {
1184        IntoIter {
1185            max_msg_len: match self.attributes() {
1186                Ok(attrs) => attrs.max_msg_len,
1187                Err(_) => 0,
1188            },
1189            mq: self,
1190        }
1191    }
1192}
1193
1194impl<'a> IntoIterator for &'a PosixMq {
1195    type Item = (u32, Vec<u8>);
1196    type IntoIter = Iter<'a>;
1197    fn into_iter(self) -> Iter<'a> {
1198        Iter {
1199            max_msg_len: match self.attributes() {
1200                Ok(attrs) => attrs.max_msg_len,
1201                Err(_) => 0,
1202            },
1203            mq: self,
1204        }
1205    }
1206}
1207
1208
1209impl Debug for PosixMq {
1210    fn fmt(&self,  fmtr: &mut Formatter) -> fmt::Result {
1211        let mut representation = fmtr.debug_struct("PosixMq");
1212        // display raw value and name unless we know it's a plain fd
1213        #[cfg(not(any(
1214            target_os="linux", target_os="netbsd", target_os="dragonfly",
1215        )))]
1216        representation.field("mqd", &self.mqd);
1217        // show file descriptor where we have one
1218        #[cfg(any(
1219            target_os="linux", target_os="freebsd",
1220            target_os="netbsd", target_os="dragonfly",
1221        ))]
1222        representation.field("fd", &self.as_raw_fd());
1223        return representation.finish();
1224    }
1225}
1226
1227impl Drop for PosixMq {
1228    fn drop(&mut self) {
1229        unsafe { mq_close(self.mqd) };
1230    }
1231}
1232
1233// On some platforms mqd_t is a pointer, so Send and Sync aren't
1234// auto-implemented there. While I don't feel certain enough to
1235// blanket-implement Sync, I can't see why an implementation would make it UB
1236// to move operations to another thread.
1237unsafe impl Send for PosixMq {}
1238
// Why Sync is implemented for the targets where mqd_t is a pointer:
// * FreeBSD: mqd_t is a `struct{int fd, struct sigev_node* node}*`,
//   but the sigevent is only accessed by `mq_notify()`, so it's thread-safe
//   as long as that function requires `&mut self` or isn't exposed.
//    src: https://svnweb.freebsd.org/base/head/lib/librt/mq.c?view=markup
// * Illumos: mqd_t points to a rather complex struct, but the functions use
//   mutexes and semaphores, so I assume they're totally thread-safe.
//    src: https://github.com/illumos/illumos-gate/blob/master/usr/src/lib/libc/port/rt/mqueue.c
// * Solaris: assumed to be equivalent to Illumos, because the Illumos code
//   has barely been modified after the initial source code release.
// Linux, NetBSD and DragonFly BSD get Sync auto-implemented because
// mqd_t is an int.
#[cfg(any(target_os="freebsd", target_os="illumos", target_os="solaris"))]
unsafe impl Sync for PosixMq {}
1252
1253
/// Allow receiving event notifications through mio (version 0.6).
///
/// The `mio_06` feature must be enabled for this impl to exist:
///
/// ```toml
/// [dependencies]
/// posixmq = {version="1.0", features=["mio_06"]}
/// ```
///
/// Remember to open the queue in non-blocking mode. (with `OpenOptions.nonblocking()`)
#[cfg(feature="mio_06")]
impl Evented for PosixMq {
    fn register(&self,  poll: &Poll,  token: mio_06::Token,  interest: Ready,  opts: PollOpt)
    -> Result<(), io::Error> {
        let fd = self.as_raw_fd();
        EventedFd(&fd).register(poll, token, interest, opts)
    }

    fn reregister(&self,  poll: &Poll,  token: mio_06::Token,  interest: Ready,  opts: PollOpt)
    -> Result<(), io::Error> {
        let fd = self.as_raw_fd();
        EventedFd(&fd).reregister(poll, token, interest, opts)
    }

    fn deregister(&self,  poll: &Poll) -> Result<(), io::Error> {
        let fd = self.as_raw_fd();
        EventedFd(&fd).deregister(poll)
    }
}
1280
1281
/// Allow receiving event notifications through mio (version 0.7).
///
/// The `mio_07` feature must be enabled for this impl to exist:
///
/// ```toml
/// [dependencies]
/// posixmq = {version="1.0", features=["mio_07"]}
/// ```
///
/// Due to a long-lived bug in cargo this will currently enable
/// the os_reactor feature of mio. This is not intended, and can change in the
/// future.
///
/// You probably want to make the queue non-blocking: Either use
/// [`OpenOptions.nonblocking()`](struct.OpenOptions.html#method.nonblocking)
/// when preparing to open the queue, or call [`set_nonblocking(true)`](struct.PosixMq.html#method.set_nonblocking).
#[cfg(feature="mio_07")]
impl Source for &PosixMq {
    fn register(&mut self,  registry: &Registry,  token: mio_07::Token,  interest: Interest)
    -> Result<(), io::Error> {
        let fd = self.as_raw_fd();
        SourceFd(&fd).register(registry, token, interest)
    }

    fn reregister(&mut self,  registry: &Registry,  token: mio_07::Token,  interest: Interest)
    -> Result<(), io::Error> {
        let fd = self.as_raw_fd();
        SourceFd(&fd).reregister(registry, token, interest)
    }

    fn deregister(&mut self,  registry: &Registry) -> Result<(), io::Error> {
        let fd = self.as_raw_fd();
        SourceFd(&fd).deregister(registry)
    }
}
1314
// Registration never needs exclusive access to the queue, so the owned impl
// simply delegates to the `Source` impl for `&PosixMq`.
#[cfg(feature="mio_07")]
impl Source for PosixMq {
    fn register(&mut self,  registry: &Registry,  token: mio_07::Token,  interest: Interest)
    -> Result<(), io::Error> {
        <&PosixMq as Source>::register(&mut &*self, registry, token, interest)
    }

    fn reregister(&mut self,  registry: &Registry,  token: mio_07::Token,  interest: Interest)
    -> Result<(), io::Error> {
        <&PosixMq as Source>::reregister(&mut &*self, registry, token, interest)
    }

    fn deregister(&mut self,  registry: &Registry) -> Result<(), io::Error> {
        <&PosixMq as Source>::deregister(&mut &*self, registry)
    }
}
1331
1332
1333/// An `Iterator` that calls [`recv()`](struct.PosixMq.html#method.recv) on a borrowed [`PosixMq`](struct.PosixMq.html).
1334///
1335/// Iteration ends when a `recv()` fails with an `ErrorKind::WouldBlock` error,
1336/// but is infinite if the descriptor is in blocking mode.
1337///
1338/// # Panics
1339///
1340/// `next()` will panic if an error of type other than `ErrorKind::WouldBlock`
1341/// or `ErrorKind::Interrupted` occurs.
1342#[derive(Clone)]
1343pub struct Iter<'a> {
1344    mq: &'a PosixMq,
1345    /// Cached
1346    max_msg_len: usize,
1347}
1348
1349impl<'a> Iterator for Iter<'a> {
1350    type Item = (u32, Vec<u8>);
1351    fn next(&mut self) -> Option<(u32, Vec<u8>)> {
1352        let mut buf = vec![0; self.max_msg_len];
1353        match self.mq.recv(&mut buf) {
1354            Err(ref e) if e.kind() == ErrorKind::WouldBlock => None,
1355            Err(e) => panic!("Cannot receive from posix message queue: {}", e),
1356            Ok((priority, len)) => {
1357                buf.truncate(len);
1358                Some((priority, buf))
1359            }
1360        }
1361    }
1362}
1363
1364/// An `Iterator` that [`recv()`](struct.PosixMq.html#method.recv)s
1365/// messages from an owned [`PosixMq`](struct.PosixMq.html).
1366///
1367/// Iteration ends when a `recv()` fails with an `ErrorKind::WouldBlock` error,
1368/// but is infinite if the descriptor is in blocking mode.
1369///
1370/// # Panics
1371///
1372/// `next()` will panic if an error of type other than `ErrorKind::WouldBlock`
1373/// or `ErrorKind::Interrupted` occurs.
1374pub struct IntoIter {
1375    mq: PosixMq,
1376    max_msg_len: usize,
1377}
1378
1379impl Iterator for IntoIter {
1380    type Item = (u32, Vec<u8>);
1381    fn next(&mut self) -> Option<(u32, Vec<u8>)> {
1382        Iter{mq: &self.mq, max_msg_len: self.max_msg_len}.next()
1383    }
1384}
1385
1386
1387#[cfg(debug_assertions)]
1388mod doctest_md_files {
1389    macro_rules! mdfile {($content:expr, $(#[$meta:meta])* $attach_to:ident) => {
1390        #[doc=$content]
1391        #[allow(unused)]
1392        $(#[$meta])* // can't #[cfg_attr(, doc=)] in .md file
1393        enum $attach_to {}
1394    }}
1395    mdfile!{include_str!("README.md"), Readme}
1396}