posixmq/posixmq.rs
1/* posixmq 1.0.0 - Idiomatic rust library for using posix message queues
2 * Copyright 2019, 2020 Torbjørn Birch Moltu
3 *
4 * Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
5 * http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
6 * http://opensource.org/licenses/MIT>, at your option. This file may not be
7 * copied, modified, or distributed except according to those terms.
8 */
9
10//! Posix message queue wrapper with optional mio integration.
11//!
12//! Posix message queues are like pipes, but message-oriented which makes them
13//! safe to read by multiple processes. Messages are sorted based on an
14//! additional priority parameter. Queues are not placed in the normal file
//! system, but use a separate, flat namespace. Normal file permissions still
16//! apply though.
17//! For a longer introduction, see [`man mq_overview`](http://man7.org/linux/man-pages/man7/mq_overview.7.html)
18//! or [`man mq`](https://www.unix.com/man-page/netbsd/3/mq/).
19//!
20//! They are not all that useful, as only Linux and some BSDs implement them,
21//! and even there you might be limited to creating queues with a capacity of
22//! no more than 10 messages at a time.
23//!
24//! # Examples
25//!
26//! Send a couple messages:
27//! ```ignore
28//! use posixmq::PosixMq;
29//!
30//! // open the message queue if it exists, or create it if it doesn't.
31//! // names should start with a slash and have no more slashes.
32//! let mq = PosixMq::create("/hello_posixmq").unwrap();
33//! mq.send(0, b"message").unwrap();
34//! // messages with equal priority will be received in order
35//! mq.send(0, b"queue").unwrap();
36//! // but this message has higher priority and will be received first
37//! mq.send(10, b"Hello,").unwrap();
38//! ```
39//!
40//! and receive them:
41//! ```ignore
42//! use posixmq::PosixMq;
43//!
44//! // open the queue read-only, or fail if it doesn't exist.
45//! let mq = PosixMq::open("/hello_posixmq").unwrap();
46//! // delete the message queue when you don't need to open it again.
//! // otherwise it will remain until the system is rebooted, consuming resources.
48//! posixmq::remove_queue("/hello_posixmq").unwrap();
49//!
50//! // the receive buffer must be at least as big as the biggest possible
51//! // message, or you will not be allowed to receive anything.
52//! let mut buf = vec![0; mq.attributes().unwrap().max_msg_len];
53//! assert_eq!(mq.recv(&mut buf).unwrap(), (10, "Hello,".len()));
54//! assert_eq!(mq.recv(&mut buf).unwrap(), (0, "message".len()));
55//! assert_eq!(mq.recv(&mut buf).unwrap(), (0, "queue".len()));
56//! assert_eq!(&buf[..5], b"queue");
57//!
58//! // check that there are no more messages
59//! assert_eq!(mq.attributes().unwrap().current_messages, 0);
60//! // note that acting on this value is race-prone. A better way to do this
61//! // would be to switch our descriptor to non-blocking mode, and check for
62//! // an error of type `ErrorKind::WouldBlock`.
63//! ```
64//!
65//! With mio (and `features = ["mio_07"]` in Cargo.toml):
66#![cfg_attr(feature="mio_07", doc="```")]
67#![cfg_attr(not(feature="mio_07"), doc="```compile_fail")]
68//! # extern crate mio_07 as mio;
69//! # use mio::{Events, Poll, Interest, Token};
70//! # use std::io::ErrorKind;
71//! # use std::thread;
72//! // set up queue
73//! let mut receiver = posixmq::OpenOptions::readonly()
74//! .nonblocking()
75//! .capacity(3)
76//! .max_msg_len(100)
77//! .create_new()
78//! .open("/mio")
79//! .unwrap();
80//!
81//! // send something from another thread (or process)
82//! let sender = thread::spawn(move|| {
83//! let sender = posixmq::OpenOptions::writeonly().open("/mio").unwrap();
84//! posixmq::remove_queue("/mio").unwrap();
85//! sender.send(0, b"async").unwrap();
86//! });
87//!
88//! // set up mio and register
89//! let mut poll = Poll::new().unwrap();
90//! poll.registry().register(&mut receiver, Token(0), Interest::READABLE).unwrap();
91//! let mut events = Events::with_capacity(10);
92//!
93//! poll.poll(&mut events, None).unwrap();
94//! for event in &events {
95//! if event.token() == Token(0) {
96//! loop {
97//! let mut buf = [0; 100];
98//! match receiver.recv(&mut buf) {
99//! Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
100//! Err(e) => panic!("Error receiving message: {}", e),
101//! Ok((priority, len)) => {
102//! assert_eq!(priority, 0);
103//! assert_eq!(&buf[..len], b"async");
104//! }
105//! }
106//! }
107//! }
108//! }
109//!
110//! sender.join().unwrap();
111//! ```
112//!
113//! See the examples/ directory for more.
114//!
115//! # Portability
116//!
117//! While the p in POSIX stands for Portable, that is not a fitting description
118//! of their message queues; Support is spotty even among *nix OSes.
//! **Windows, macOS, OpenBSD, Android, iOS, Rumprun, Fuchsia and Emscripten
//! don't support posix message queues at all.**
121//!
122//! ## Compatible operating systems and features
123//!
//! &nbsp; | Linux | FreeBSD 11+ | NetBSD | DragonFly BSD | Illumos | Solaris | VxWorks
//! -|-|-|-|-|-|-|-
//! core features | Yes | Yes | Yes | Yes | Yes | Yes | Yes
127//! mio `Source` & `Evented` | Yes | Yes | unusable | Yes | No | No | No
128//! `FromRawFd`+`IntoRawFd`+[`try_clone()`](struct.PosixMq.html#method.try_clone) | Yes | No | Yes | Yes | No | No | No
129//! `AsRawFd`+[`set_cloexec()`](struct.PosixMq.html#method.set_cloexec) | Yes | Yes | Yes | Yes | No | No | No
130//! Tested? | Manually+CI | Manually+CI | Manually | Manually | Manually (on OmniOSce) | Cross-`check`ed on CI | No
131//!
132//! This library will fail to compile if the target OS doesn't have posix
133//! message queues.
134//!
135//! Feature explanations:
136//!
137//! * `FromRawFd`+`IntoRawFd`+[`try_clone()`](struct.PosixMq.html#method.try_clone):
//! For these to work, the inner `mqd_t` type must be an `int`/`RawFd` typedef,
//! and known to represent a file descriptor.
//! These impls are only available on OSes where this is known to be the case,
//! to increase the likelihood that the core features will compile on an
142//! unknown OS.
143//! * `AsRawFd`+[`set_cloexec()`](struct.PosixMq.html#method.set_cloexec):
144//! Similar to `FromRawFd` and `IntoRawFd`, but FreeBSD 11+ has [a function](https://svnweb.freebsd.org/base/head/include/mqueue.h?revision=306588&view=markup#l54)
145//! which lets one get a file descriptor from a `mqd_t`.
//! Changing or querying close-on-exec requires `AsRawFd`, and is
//! only meaningful on operating systems that have the concept of `exec()`.
148//! [`is_cloexec()`](struct.PosixMq.html#method.is_cloexec) is always present
149//! and returns `true` on OSes where close-on-exec cannot be disabled or one
150//! cannot `exec()`. (posix message queue descriptors should have
151//! close-on-exec set by default).
152//! * mio `Source` & `Evented`: The impls require both `AsRawFd`
153//! and that mio compiles on the OS.
154//! This does not guarantee that the event notification mechanism used by mio
155//! supports posix message queues though. (registering fails on NetBSD)
156//!
157//! On Linux, message queues and their permissions can be viewed in
158//! `/dev/mqueue/`. The kernel *can* be compiled to not support posix message
159//! queues, so it's not guaranteed to always work. (such as on Android)
160//!
161//! On FreeBSD, the kernel module responsible for posix message queues
162//! is not loaded by default; Run `kldload mqueuefs` as root to enable it.
163//! To list queues, the file system must additionally be mounted first:
164//! `mount -t mqueuefs null $somewhere`.
165//! Versions before 11 do not have the function used to get a file descriptor,
166//! so this library will not compile there.
167//!
168//! On NetBSD, re-opening message queues multiple times can eventually make all
169//! further opens fail. This does not affect programs that open a single
170//! queue once.
171//! The mio integration compiles, but registering message queues with mio fails.
172//! Because NetBSD ignores cloexec when opening or cloning descriptors, there
173//! is a race condition with other threads exec'ing before this library can
174//! enable close-on-exec for the descriptor.
175//!
176//! DragonFly BSD doesn't set cloexec when opening either, but does when
177//! cloning.
178//!
179//! ## OS-dependent restrictions and default values
180//!
181//! Not even limiting oneself to the core features is enough to guarantee
182//! portability!
183//!
//! &nbsp; | Linux | FreeBSD | NetBSD | DragonFly BSD | Illumos
185//! -|-|-|-|-|-
186//! max priority | 32767 | 63 | **31** | 31 | 31
187//! default capacity | 10 | 10 | 32 | 32 | 128
188//! default max_msg_len | 8192 | 1024 | 992 | 992 | 1024
189//! max capacity | **10**\* | 100 | 512 | 512 | No limit
190//! max max_msg_len | **8192**\* | 16384 | 16384 | 16384 | No limit
191//! allows empty messages | Yes | Yes | No | No | Yes
192//! enforces name rules | Yes | Yes | No | No | Yes
193//! allows "/.", "/.." and "/" | No | No | Yes | Yes | Yes
194//!
195//! On Linux the listed size limits only apply to unprivileged processes.
196//! As root there instead appears to be a combined limit on memory usage of the
197//! form `capacity*(max_msg_len+k)`, but is several times higher than 10*8192.
198//!
199//! # Differences from the C API
200//!
201//! * [`send()`](struct.PosixMq.html#method.send),
202//! [`recv()`](struct.PosixMq.html#method.recv) and the timed equivalents
//! try again when EINTR / `ErrorKind::Interrupted` is returned.
204//! (Consistent with how std does IO)
//! * `open()` and all other methods which take `AsRef<[u8]>` prepend `'/'` to
//! the name if missing.
//! (They have to copy the name anyway, to append a terminating `'\0'`)
//! Use [`open_c()`](struct.OpenOptions.html#method.open_c) and
//! [`remove_queue_c()`](fn.remove_queue_c.html) if you need to interact with
//! queues on NetBSD or DragonFly that don't have a leading `'/'`.
211//!
212//! # Minimum supported Rust version
213//!
214//! The minimum supported Rust version for posixmq 1.0.z releases is 1.31.1.
215//! Later 1.y.0 releases might increase this. Until rustup has builds for
216//! DragonFly BSD and Illumos, the minimum version will not be increased past
217//! what is available in the repositories for those operating systems.
218
219// # Why this crate requires `std`
220//
221// The libc crate doesn't expose `errno` in a portable way,
222// so `std::io::Error::last_os_error()` is required to give errors
223// more specific than "something went wrong".
224// Depending on std also means that functions can use `io::Error` and
225// `SystemTime` instead of custom types.
226
227#![allow(clippy::needless_return, clippy::redundant_closure, clippy::needless_lifetimes)] // style
228#![allow(clippy::range_plus_one)] // edge case: I think 1..x+1 is clearer than 1..=x
229#![allow(clippy::cast_lossless)] // improves portability when values are limited by the OS anyway
230// feel free to disable more lints
231
232use std::{io, mem, ptr};
233use std::ffi::CStr;
234use std::io::ErrorKind;
235use std::fmt::{self, Debug, Formatter};
236#[cfg(any(
237 target_os="linux", target_os="freebsd",
238 target_os="netbsd", target_os="dragonfly",
239))]
240use std::os::unix::io::{AsRawFd, RawFd};
241#[cfg(any(target_os="linux", target_os="netbsd", target_os="dragonfly"))]
242use std::os::unix::io::{FromRawFd, IntoRawFd};
243use std::time::{Duration, SystemTime};
244
245extern crate libc;
246use libc::{c_int, c_uint, c_char};
247#[cfg(not(all(target_arch="x86_64", target_os="linux", target_pointer_width="32")))]
248use libc::c_long;
249use libc::{mqd_t, mq_open, mq_close, mq_unlink, mq_send, mq_receive};
250use libc::{mq_attr, mq_getattr, mq_setattr};
251use libc::{timespec, time_t, mq_timedsend, mq_timedreceive};
252#[cfg(target_os="freebsd")]
253use libc::mq_getfd_np;
254use libc::{mode_t, O_ACCMODE, O_RDONLY, O_WRONLY, O_RDWR, O_CREAT, O_EXCL, O_NONBLOCK};
255#[cfg(any(
256 target_os="linux", target_os="freebsd",
257 target_os="netbsd", target_os="dragonfly",
258))]
259use libc::{fcntl, F_GETFD, FD_CLOEXEC, ioctl, FIOCLEX, FIONCLEX};
260#[cfg(any(target_os="linux", target_os="netbsd", target_os="dragonfly"))]
261use libc::F_DUPFD_CLOEXEC;
262
263#[cfg(feature="mio_06")]
264extern crate mio_06;
265#[cfg(feature="mio_06")]
266use mio_06::{event::Evented, unix::EventedFd, Ready, Poll, PollOpt};
267#[cfg(feature="mio_07")]
268extern crate mio_07;
269#[cfg(feature="mio_07")]
270use mio_07::{event::Source, unix::SourceFd, Registry, Interest};
271
272
/// Names that fit in this many bytes, including the added `'/'` and the
/// terminating `'\0'`, are converted on the stack without allocating.
const CSTR_BUF_SIZE: usize = 48;

/// Calls `f` with `name` converted to a nul-terminated C string.
///
/// One leading `'/'` is removed from `name` if present, and then a `'/'` is
/// always put in front, so `"queue"` and `"/queue"` are both passed to `f`
/// as `"/queue"`.
///
/// Returns the result of `f`, or an `ErrorKind::InvalidInput` error without
/// calling `f` if `name` contains a `'\0'` byte.
fn with_name_as_cstr<F: FnOnce(&CStr)->Result<R,io::Error>, R>(name: &[u8], f: F)
-> Result<R,io::Error> {
    // drop at most one leading slash; it is added back unconditionally below
    let bare = match name.split_first() {
        Some((&b'/', rest)) => rest,
        _ => name,
    };
    // slash + name + terminator
    let total = bare.len() + 2;

    let mut stack_buf = [0u8; CSTR_BUF_SIZE];
    let mut heap_buf: Vec<u8>;
    let buf: &mut [u8] = if total > CSTR_BUF_SIZE {
        heap_buf = vec![0; total];
        &mut heap_buf[..]
    } else {
        &mut stack_buf[..total]
    };

    buf[0] = b'/';
    // the last byte is left as zero and becomes the terminator
    buf[1..total-1].copy_from_slice(bare);

    // fails if there is a nul byte anywhere before the terminator
    CStr::from_bytes_with_nul(buf)
        .map_err(|_| io::Error::new(ErrorKind::InvalidInput, "contains nul byte"))
        .and_then(f)
}
296
297
298// Cannot use std::fs's because it doesn't expose getters,
299// and rolling our own means we can also use it for mq-specific capacities.
300/// Flags and parameters which control how a [`PosixMq`](struct.PosixMq.html)
301/// message queue is opened or created.
302#[derive(Clone,Copy, PartialEq,Eq)]
303pub struct OpenOptions {
304 flags: c_int,
305 mode: mode_t,
306 capacity: usize,
307 max_msg_len: usize,
308}
309
310impl Debug for OpenOptions {
311 fn fmt(&self, fmtr: &mut Formatter) -> fmt::Result {
312 fmtr.debug_struct("OpenOptions")
313 .field(
314 "read",
315 &((self.flags & O_ACCMODE) == O_RDWR || (self.flags & O_ACCMODE) == O_RDONLY)
316 )
317 .field(
318 "write",
319 &((self.flags & O_ACCMODE) == O_RDWR || (self.flags & O_ACCMODE) == O_WRONLY)
320 )
321 .field("create", &(self.flags & O_CREAT != 0))
322 .field("open", &(self.flags & O_EXCL == 0))
323 .field("mode", &format_args!("{:03o}", self.mode))
324 .field("capacity", &self.capacity)
325 .field("max_msg_len", &self.max_msg_len)
326 .field("nonblocking", &((self.flags & O_NONBLOCK) != 0))
327 .finish()
328 }
329}
330
331impl OpenOptions {
332 fn new(flags: c_int) -> Self {
333 OpenOptions {
334 flags,
335 // default permissions to only accessible for owner
336 mode: 0o600,
337 capacity: 0,
338 max_msg_len: 0,
339 }
340 }
341
342 /// Open message queue for receiving only.
343 pub fn readonly() -> Self {
344 OpenOptions::new(O_RDONLY)
345 }
346
347 /// Open message queue for sending only.
348 pub fn writeonly() -> Self {
349 OpenOptions::new(O_WRONLY)
350 }
351
352 /// Open message queue both for sending and receiving.
353 pub fn readwrite() -> Self {
354 OpenOptions::new(O_RDWR)
355 }
356
357 /// Set permissions to create the queue with.
358 ///
359 /// Some bits might be cleared by the process's umask when creating the
360 /// queue, and unknown bits are ignored.
361 ///
362 /// This field is ignored if the queue already exists or should not be created.
363 /// If this method is not called, queues are created with mode 600.
364 pub fn mode(&mut self, mode: u32) -> &mut Self {
365 // 32bit value for consistency with std::os::unix even though only 12
366 // bits are needed. Truncate if necessary because the OS ignores
367 // unknown bits anyway. (and they're probably always zero as well).
368 self.mode = mode as mode_t;
369 return self;
370 }
371
372 /// Set the maximum size of each message.
373 ///
374 /// `recv()` will fail if given a buffer smaller than this value.
375 ///
376 /// If max_msg_len and capacity are both zero (or not set), the queue
377 /// will be created with a maximum length and capacity decided by the
378 /// operating system.
379 /// If this value is specified, capacity should also be, or opening the
380 /// message queue might fail.
381 pub fn max_msg_len(&mut self, max_msg_len: usize) -> &mut Self {
382 self.max_msg_len = max_msg_len;
383 return self;
384 }
385
386 /// Set the maximum number of messages in the queue.
387 ///
388 /// When the queue is full, further `send()`s will either block
389 /// or fail with an error of type `ErrorKind::WouldBlock`.
390 ///
391 /// If both capacity and max_msg_len are zero (or not set), the queue
392 /// will be created with a maximum length and capacity decided by the
393 /// operating system.
394 /// If this value is specified, max_msg_len should also be, or opening the
395 /// message queue might fail.
396 pub fn capacity(&mut self, capacity: usize) -> &mut Self {
397 self.capacity = capacity;
398 return self;
399 }
400
401 /// Create message queue if it doesn't exist.
402 pub fn create(&mut self) -> &mut Self {
403 self.flags |= O_CREAT;
404 self.flags &= !O_EXCL;
405 return self;
406 }
407
408 /// Create a new queue, failing if the queue already exists.
409 pub fn create_new(&mut self) -> &mut Self {
410 self.flags |= O_CREAT | O_EXCL;
411 return self;
412 }
413
414 /// Require the queue to already exist, failing if it doesn't.
415 pub fn existing(&mut self) -> &mut Self {
416 self.flags &= !(O_CREAT | O_EXCL);
417 return self;
418 }
419
420 /// Open the message queue in non-blocking mode.
421 ///
422 /// This must be done if you want to use the message queue with mio.
423 pub fn nonblocking(&mut self) -> &mut Self {
424 self.flags |= O_NONBLOCK;
425 return self;
426 }
427
428 /// Open a queue with the specified options.
429 ///
430 /// If the name doesn't start with a '/', one will be prepended.
431 ///
432 /// # Errors
433 ///
434 /// * Queue doesn't exist (ENOENT) => `ErrorKind::NotFound`
435 /// * Name is just "/" (ENOENT) or is empty => `ErrorKind::NotFound`
436 /// * Queue already exists (EEXISTS) => `ErrorKind::AlreadyExists`
437 /// * Not permitted to open in this mode (EACCESS) => `ErrorKind::PermissionDenied`
438 /// * More than one '/' in name (EACCESS) => `ErrorKind::PermissionDenied`
439 /// * Invalid capacities (EINVAL) => `ErrorKind::InvalidInput`
440 /// * Capacities too high (EMFILE) => `ErrorKind::Other`
441 /// * Posix message queues are disabled (ENOSYS) => `ErrorKind::Other`
442 /// * Name contains '\0' => `ErrorKind::InvalidInput`
443 /// * Name is too long (ENAMETOOLONG) => `ErrorKind::Other`
444 /// * Unlikely (ENFILE, EMFILE, ENOMEM, ENOSPC) => `ErrorKind::Other`
445 /// * Possibly other
446 pub fn open<N: AsRef<[u8]> + ?Sized>(&self, name: &N) -> Result<PosixMq, io::Error> {
447 pub fn open_slice(opts: &OpenOptions, name: &[u8]) -> Result<PosixMq, io::Error> {
448 with_name_as_cstr(name, |name| opts.open_c(&name) )
449 }
450 open_slice(self, name.as_ref())
451 }
452
453 /// Open a queue with the specified options and without inspecting `name`
454 /// or allocating.
455 ///
456 /// This can on NetBSD be used to access message queues with names that
457 /// doesn't start with a '/'.
458 ///
459 /// # Errors
460 ///
461 /// * Queue doesn't exist (ENOENT) => `ErrorKind::NotFound`
462 /// * Name is just "/" (ENOENT) => `ErrorKind::NotFound`
463 /// * Queue already exists (EEXISTS) => `ErrorKind::AlreadyExists`
464 /// * Not permitted to open in this mode (EACCESS) => `ErrorKind::PermissionDenied`
465 /// * More than one '/' in name (EACCESS) => `ErrorKind::PermissionDenied`
466 /// * Invalid capacities (EINVAL) => `ErrorKind::InvalidInput`
467 /// * Posix message queues are disabled (ENOSYS) => `ErrorKind::Other`
468 /// * Name is empty (EINVAL) => `ErrorKind::InvalidInput`
469 /// * Name is too long (ENAMETOOLONG) => `ErrorKind::Other`
470 /// * Unlikely (ENFILE, EMFILE, ENOMEM, ENOSPC) => `ErrorKind::Other`
471 /// * Possibly other
472 pub fn open_c(&self, name: &CStr) -> Result<PosixMq, io::Error> {
473 let opts = self;
474
475 // because mq_open is a vararg function, mode_t cannot be passed
476 // directly on FreeBSD where it's smaller than c_int.
477 let permissions = opts.mode as c_int;
478
479 let mut capacities = unsafe { mem::zeroed::<mq_attr>() };
480 let capacities_ptr = if opts.capacity != 0 || opts.max_msg_len != 0 {
481 capacities.mq_maxmsg = opts.capacity as KernelLong;
482 capacities.mq_msgsize = opts.max_msg_len as KernelLong;
483 &mut capacities as *mut mq_attr
484 } else {
485 ptr::null_mut::<mq_attr>()
486 };
487
488 let mqd = unsafe { mq_open(name.as_ptr(), opts.flags, permissions, capacities_ptr) };
489 // even when mqd_t is a pointer, -1 is the return value for error
490 if mqd == -1isize as mqd_t {
491 return Err(io::Error::last_os_error());
492 }
493 let mq = PosixMq{mqd};
494
495 // NetBSD and DragonFly BSD doesn't set cloexec by default and
496 // ignores O_CLOEXEC. Setting it with FIOCLEX works though.
497 // Propagate error if setting cloexec somehow fails, even though
498 // close-on-exec won't matter in most cases.
499 #[cfg(any(target_os="netbsd", target_os="dragonfly"))]
500 mq.set_cloexec(true)?;
501
502 Ok(mq)
503 }
504}
505
506
507/// Delete a posix message queue.
508///
509/// A `'/'` is prepended to the name if it doesn't start with one already.
510/// (it would have to append a `'\0'` and therefore allocate or copy anyway.)
511///
512/// Processes that have it open will still be able to use it.
513///
514/// # Errors
515///
516/// * Queue doesn't exist (ENOENT) => `ErrorKind::NotFound`
517/// * Name is invalid (ENOENT or EACCESS) => `ErrorKind::NotFound` or `ErrorKind::PermissionDenied`
518/// * Not permitted to delete the queue (EACCES) => `ErrorKind::PermissionDenied`
519/// * Posix message queues are disabled (ENOSYS) => `ErrorKind::Other`
520/// * Name contains '\0' bytes => `ErrorKind::InvalidInput`
521/// * Name is too long (ENAMETOOLONG) => `ErrorKind::Other`
522/// * Possibly other
523pub fn remove_queue<N: AsRef<[u8]> + ?Sized>(name: &N) -> Result<(), io::Error> {
524 fn remove_queue_slice(name: &[u8]) -> Result<(), io::Error> {
525 with_name_as_cstr(name, |name| remove_queue_c(&name) )
526 }
527 remove_queue_slice(name.as_ref())
528}
529
530/// Delete a posix message queue, without inspecting `name` or allocating.
531///
532/// This function is on NetBSD necessary to remove queues with names that
533/// doesn't start with a '/'.
534///
535/// # Errors
536///
537/// * Queue doesn't exist (ENOENT) => `ErrorKind::NotFound`
538/// * Not permitted to delete the queue (EACCES) => `ErrorKind::PermissionDenied`
539/// * Posix message queues are disabled (ENOSYS) => `ErrorKind::Other`
540/// * More than one '/' in name (EACCESS) => `ErrorKind::PermissionDenied`
541/// * Name is empty (EINVAL) => `ErrorKind::InvalidInput`
542/// * Name is invalid (ENOENT, EACCESS or EINVAL) => `ErrorKind::NotFound`
543/// `ErrorKind::PermissionDenied` or `ErrorKind::InvalidInput`
544/// * Name is too long (ENAMETOOLONG) => `ErrorKind::Other`
545/// * Possibly other
546pub fn remove_queue_c(name: &CStr) -> Result<(), io::Error> {
547 let name = name.as_ptr();
548 let ret = unsafe { mq_unlink(name) };
549 if ret != 0 {
550 return Err(io::Error::last_os_error());
551 }
552 Ok(())
553}
554
555
556// The fields of `mq_attr` and `timespec` are of type `long` on all targets
557// except x86_64-unknown-linux-gnux32, where they are `long long` (to match up
558// with normal x86_64 `long`).
559// Rusts lack of implicit widening makes this peculiarity annoying.
560#[cfg(all(target_arch="x86_64", target_os="linux", target_pointer_width="32"))]
561type KernelLong = i64;
562#[cfg(not(all(target_arch="x86_64", target_os="linux", target_pointer_width="32")))]
563type KernelLong = c_long;
564
/// Contains information about the capacities and state of a posix message queue.
///
/// Created by [`PosixMq::attributes()`](struct.PosixMq.html#method.attributes).
#[derive(Clone, Copy, PartialEq, Eq, Default)]
pub struct Attributes {
    /// The maximum size of messages that can be stored in the queue.
    pub max_msg_len: usize,
    /// The maximum number of messages in the queue.
    pub capacity: usize,
    /// The number of messages currently in the queue at the time the
    /// attributes were retrieved.
    pub current_messages: usize,
    /// Whether the descriptor was set to nonblocking mode when
    /// the attributes were retrieved.
    pub nonblocking: bool,
    // private field: the struct cannot be built with a literal outside this module
    _private: (),
}
582
583impl Debug for Attributes {
584 fn fmt(&self, fmtr: &mut fmt::Formatter) -> fmt::Result {
585 fmtr.debug_struct("Attributes")
586 .field("max_msg_len", &self.max_msg_len)
587 .field("capacity", &self.capacity)
588 .field("current_messages", &self.current_messages)
589 .field("nonblocking", &self.nonblocking)
590 .finish()
591 }
592}
593
594
/// Evaluates `$call` repeatedly until it returns something other than -1,
/// and produces that value.
///
/// When `$call` returns -1 the last OS error is inspected: EINTR
/// (`ErrorKind::Interrupted`) leads to another attempt, any other error is
/// returned from the *enclosing function* as `Err`.
macro_rules! retry_if_interrupted {($call:expr) => {{
    loop {
        match $call {
            -1 => {
                let err = io::Error::last_os_error();
                if err.kind() != ErrorKind::Interrupted {
                    return Err(err);
                }
                // interrupted: fall through and evaluate the call again
            }
            ret => break ret,
        }
    }
}}}
607
608/// Returns saturated timespec as err if systemtime cannot be represented
609fn deadline_to_realtime(deadline: SystemTime) -> Result<timespec, timespec> {
610 /// Don't use struct literal in case timespec has extra fields on some platform.
611 fn new_timespec(secs: time_t, nsecs: KernelLong) -> timespec {
612 let mut ts: timespec = unsafe { mem::zeroed() };
613 ts.tv_sec = secs;
614 ts.tv_nsec = nsecs;
615 return ts;
616 }
617
618 // mq_timedsend() and mq_timedreceive() takes an absolute point in time,
619 // based on CLOCK_REALTIME aka SystemTime.
620 match deadline.duration_since(SystemTime::UNIX_EPOCH) {
621 // Currently SystemTime has the same range as the C types, but
622 // avoid truncation in case this changes.
623 Ok(expires) if expires.as_secs() > time_t::max_value() as u64
624 => Err(new_timespec(time_t::max_value(), 0)),
625 Ok(expires)
626 => Ok(new_timespec(expires.as_secs() as time_t, expires.subsec_nanos() as KernelLong)),
627 // A pre-1970 deadline is probably a bug, but handle it anyway.
628 // Based on https://github.com/solemnwarning/timespec/blob/master/README.md
629 // the subsecond part of timespec should be positive and counts toward
630 // positive infinity; (-1, 0) < (-1, 999999999) < (0, 0). This has the
631 // advantage of simplifying addition and subtraction, but is the
632 // opposite of Duration which counts away from zero.
633 // The minimum representable value is therefore (-min_value(), 0)
634 Err(ref earlier) if earlier.duration() > Duration::new(time_t::max_value() as u64 + 1, 0)
635 => Err(new_timespec(time_t::min_value()+1, 0)), // add one to avoid negation bugs
636 Err(ref earlier) if earlier.duration().subsec_nanos() == 0
637 => Ok(new_timespec(-(earlier.duration().as_secs() as time_t), 0)),
638 Err(earlier) => {
639 // convert fractional part from counting away from zero to counting
640 // toward positive infinity
641 let before = earlier.duration();
642 let secs = -(before.as_secs() as time_t) - 1;
643 let nsecs = 1_000_000_000 - before.subsec_nanos() as KernelLong;
644 Ok(new_timespec(secs, nsecs))
645 }
646 }
647}
648
649/// Returns an error if timeout is not representable or the produced deadline
650/// overflows.
651fn timeout_to_realtime(timeout: Duration) -> Result<timespec, io::Error> {
652 if let Ok(now) = deadline_to_realtime(SystemTime::now()) {
653 let mut expires = now;
654 expires.tv_sec = expires.tv_sec.wrapping_add(timeout.as_secs() as time_t);
655 // nanosecond values only use 30 bits, so adding two together is safe
656 // even if tv_nsec is an i32
657 expires.tv_nsec += timeout.subsec_nanos() as KernelLong;
658 const NANO: KernelLong = 1_000_000_000;
659 expires.tv_sec = expires.tv_sec.wrapping_add(expires.tv_nsec / NANO);
660 expires.tv_nsec %= NANO;
661 // check that the unsigned timeout is representable as a signed and
662 // possibly smaller time_t, and the additions didn't overflow.
663 // The second check will fail to catch Duration::new(!0, 999_999_999)
664 // (which makes tv_sec wrap completely to the original value), but
665 // the unsigned max value is not representable as a signed value and
666 // will be caught by the first check.
667 // Using wrapping_add and catching overflow afterwards avoids repeating
668 // the error creation and also handles negative system time.
669 if timeout.as_secs() > time_t::max_value() as u64 || expires.tv_sec < now.tv_sec {
670 Err(io::Error::new(ErrorKind::InvalidInput, "timeout is too long"))
671 } else {
672 Ok(expires)
673 }
674 } else {
675 Err(io::Error::new(ErrorKind::Other, "system time is not representable"))
676 }
677}
678
679
680/// A descriptor for an open posix message queue.
681///
682/// Message queues can be sent to and / or received from depending on the
683/// options it was opened with.
684///
685/// The descriptor is closed when this struct is dropped.
686///
687/// See [the documentation in the crate root](index.html) for examples,
688/// portability notes and OS details.
689pub struct PosixMq {
690 mqd: mqd_t
691}
692
693impl PosixMq {
694 /// Open an existing message queue in read-write mode.
695 ///
696 /// See [`OpenOptions::open()`](struct.OpenOptions.html#method.open) for
697 /// details and possible errors.
698 pub fn open<N: AsRef<[u8]> + ?Sized>(name: &N) -> Result<Self, io::Error> {
699 OpenOptions::readwrite().open(name)
700 }
701
702 /// Open a message queue in read-write mode, creating it if it doesn't exists.
703 ///
704 /// See [`OpenOptions::open()`](struct.OpenOptions.html#method.open) for
705 /// details and possible errors.
706 pub fn create<N: AsRef<[u8]> + ?Sized>(name: &N) -> Result<Self, io::Error> {
707 OpenOptions::readwrite().create().open(name)
708 }
709
710
711 /// Add a message to the queue.
712 ///
713 /// For maximum portability, avoid using priorities >= 32 or sending
714 /// zero-length messages.
715 ///
716 /// # Errors
717 ///
718 /// * Queue is full and opened in nonblocking mode (EAGAIN) => `ErrorKind::WouldBlock`
719 /// * Message is too big for the queue (EMSGSIZE) => `ErrorKind::Other`
720 /// * Message is zero-length and the OS doesn't allow this (EMSGSIZE) => `ErrorKind::Other`
721 /// * Priority is too high (EINVAL) => `ErrorKind::InvalidInput`
722 /// * Queue is opened in read-only mode (EBADF) => `ErrorKind::Other`
723 /// * Possibly other => `ErrorKind::Other`
724 pub fn send(&self, priority: u32, msg: &[u8]) -> Result<(), io::Error> {
725 let mptr = msg.as_ptr() as *const c_char;
726 retry_if_interrupted!(unsafe { mq_send(self.mqd, mptr, msg.len(), priority as c_uint) });
727 Ok(())
728 }
729
730 /// Take the message with the highest priority from the queue.
731 ///
732 /// The buffer must be at least as big as the maximum message length.
733 ///
734 /// # Errors
735 ///
736 /// * Queue is empty and opened in nonblocking mode (EAGAIN) => `ErrorKind::WouldBlock`
737 /// * The receive buffer is smaller than the queue's maximum message size (EMSGSIZE) => `ErrorKind::Other`
738 /// * Queue is opened in write-only mode (EBADF) => `ErrorKind::Other`
739 /// * Possibly other => `ErrorKind::Other`
740 pub fn recv(&self, msgbuf: &mut [u8]) -> Result<(u32, usize), io::Error> {
741 let bptr = msgbuf.as_mut_ptr() as *mut c_char;
742 let mut priority = 0 as c_uint;
743 let len = retry_if_interrupted!(
744 unsafe { mq_receive(self.mqd, bptr, msgbuf.len(), &mut priority) }
745 );
746 // c_uint is unlikely to differ from u32, but even if it's bigger, the
747 // range of supported values will likely be far smaller.
748 Ok((priority as u32, len as usize))
749 }
750
751 /// Returns an `Iterator` which calls [`recv()`](#method.recv) repeatedly
752 /// with an appropriately sized buffer.
753 ///
754 /// If the message queue is opened in non-blocking mode the iterator can be
755 /// used to drain the queue. Otherwise it will block and never end.
756 pub fn iter<'a>(&'a self) -> Iter<'a> {
757 self.into_iter()
758 }
759
760
761 fn timedsend(&self, priority: u32, msg: &[u8], deadline: ×pec)
762 -> Result<(), io::Error> {
763 let mptr = msg.as_ptr() as *const c_char;
764 retry_if_interrupted!(unsafe {
765 mq_timedsend(self.mqd, mptr, msg.len(), priority as c_uint, deadline)
766 });
767 Ok(())
768 }
769
770 /// Add a message to the queue or cancel if it's still full after a given
771 /// duration.
772 ///
773 /// Returns immediately if opened in nonblocking mode, and the timeout has
774 /// no effect.
775 ///
776 /// For maximum portability, avoid using priorities >= 32 or sending
777 /// zero-length messages.
778 ///
779 /// # Errors
780 ///
781 /// * Timeout expired (ETIMEDOUT) => `ErrorKind::TimedOut`
782 /// * Message is too big for the queue (EMSGSIZE) => `ErrorKind::Other`
783 /// * OS doesn't allow empty messages (EMSGSIZE) => `ErrorKind::Other`
784 /// * Priority is too high (EINVAL) => `ErrorKind::InvalidInput`
785 /// * Queue is full and opened in nonblocking mode (EAGAIN) => `ErrorKind::WouldBlock`
786 /// * Queue is opened in write-only mode (EBADF) => `ErrorKind::Other`
787 /// * Timeout is too long / not representable => `ErrorKind::InvalidInput`
788 /// * Possibly other => `ErrorKind::Other`
789 pub fn send_timeout(&self, priority: u32, msg: &[u8], timeout: Duration)
790 -> Result<(), io::Error> {
791 timeout_to_realtime(timeout).and_then(|expires| self.timedsend(priority, msg, &expires) )
792 }
793
794 /// Add a message to the queue or cancel if the queue is still full at a
795 /// certain point in time.
796 ///
797 /// Returns immediately if opened in nonblocking mode, and the timeout has
798 /// no effect.
799 /// The deadline is a `SystemTime` because the queues are intended for
800 /// inter-process commonication, and `Instant` might be process-specific.
801 ///
802 /// For maximum portability, avoid using priorities >= 32 or sending
803 /// zero-length messages.
804 ///
805 /// # Errors
806 ///
807 /// * Deadline reached (ETIMEDOUT) => `ErrorKind::TimedOut`
808 /// * Message is too big for the queue (EMSGSIZE) => `ErrorKind::Other`
809 /// * OS doesn't allow empty messages (EMSGSIZE) => `ErrorKind::Other`
810 /// * Priority is too high (EINVAL) => `ErrorKind::InvalidInput`
811 /// * Queue is full and opened in nonblocking mode (EAGAIN) => `ErrorKind::WouldBlock`
812 /// * Queue is opened in write-only mode (EBADF) => `ErrorKind::Other`
813 /// * Possibly other => `ErrorKind::Other`
814 pub fn send_deadline(&self, priority: u32, msg: &[u8], deadline: SystemTime)
815 -> Result<(), io::Error> {
816 match deadline_to_realtime(deadline) {
817 Ok(expires) => self.timedsend(priority, msg, &expires),
818 Err(_) => Err(io::Error::new(ErrorKind::InvalidInput, "deadline is not representable"))
819 }
820 }
821
822 fn timedreceive(&self, msgbuf: &mut[u8], deadline: ×pec)
823 -> Result<(u32, usize), io::Error> {
824 let bptr = msgbuf.as_mut_ptr() as *mut c_char;
825 let mut priority: c_uint = 0;
826 let len = retry_if_interrupted!(
827 unsafe { mq_timedreceive(self.mqd, bptr, msgbuf.len(), &mut priority, deadline) }
828 );
829 Ok((priority as u32, len as usize))
830 }
831
832 /// Take the message with the highest priority from the queue or cancel if
833 /// the queue still empty after a given duration.
834 ///
835 /// Returns immediately if opened in nonblocking mode, and the timeout has
836 /// no effect.
837 ///
838 /// # Errors
839 ///
840 /// * Timeout expired (ETIMEDOUT) => `ErrorKind::TimedOut`
841 /// * The receive buffer is smaller than the queue's maximum message size (EMSGSIZE) => `ErrorKind::Other`
842 /// * Queue is empty and opened in nonblocking mode (EAGAIN) => `ErrorKind::WouldBlock`
843 /// * Queue is opened in read-only mode (EBADF) => `ErrorKind::Other`
844 /// * Timeout is too long / not representable => `ErrorKind::InvalidInput`
845 /// * Possibly other => `ErrorKind::Other`
846 pub fn recv_timeout(&self, msgbuf: &mut[u8], timeout: Duration)
847 -> Result<(u32, usize), io::Error> {
848 timeout_to_realtime(timeout).and_then(|expires| self.timedreceive(msgbuf, &expires) )
849 }
850
851 /// Take the message with the highest priority from the queue or cancel if
852 /// the queue is still empty at a point in time.
853 ///
854 /// Returns immediately if opened in nonblocking mode, and the timeout has
855 /// no effect.
856 /// The deadline is a `SystemTime` because the queues are intended for
857 /// inter-process commonication, and `Instant` might be process-specific.
858 ///
859 /// # Errors
860 ///
861 /// * Deadline reached (ETIMEDOUT) => `ErrorKind::TimedOut`
862 /// * The receive buffer is smaller than the queue's maximum message size (EMSGSIZE) => `ErrorKind::Other`
863 /// * Queue is empty and opened in nonblocking mode (EAGAIN) => `ErrorKind::WouldBlock`
864 /// * Queue is opened in read-only mode (EBADF) => `ErrorKind::Other`
865 /// * Possibly other => `ErrorKind::Other`
866 pub fn recv_deadline(&self, msgbuf: &mut[u8], deadline: SystemTime)
867 -> Result<(u32, usize), io::Error> {
868 match deadline_to_realtime(deadline) {
869 Ok(expires) => self.timedreceive(msgbuf, &expires),
870 Err(_) => Err(io::Error::new(ErrorKind::InvalidInput, "deadline is not representable"))
871 }
872 }
873
874
875 /// Get information about the state of the message queue.
876 ///
877 /// # Errors
878 ///
879 /// Retrieving these attributes should only fail if the underlying
880 /// descriptor has been closed or is not a message queue.
881 ///
882 /// On operating systems where the descriptor is a pointer, such as on
883 /// FreeBSD and Illumos, such bugs will enable undefined behavior
884 /// and this call will dereference freed or uninitialized memory.
885 /// (That doesn't make this function unsafe though -
886 /// [`PosixMq::from_raw_mqd()`](#method.from_raw_mqd) and `mq_close()` are.)
887 ///
888 /// While a `send()` or `recv()` ran in place of this call would also have
889 /// failed immediately and therefore not blocked, The descriptor might have
890 /// become used for another queue when a *later* `send()` or `recv()` is
891 /// performed. The descriptor might then be in blocking mode.
892 ///
893 /// # Examples
894 ///
895 /// ```
896 /// # let _ = posixmq::remove_queue("/with_custom_capacity");
897 /// let mq = posixmq::OpenOptions::readwrite()
898 /// .create_new()
899 /// .max_msg_len(100)
900 /// .capacity(3)
901 /// .open("/with_custom_capacity")
902 /// .expect("create queue");
903 /// let attrs = mq.attributes().expect("get attributes for queue");
904 /// assert_eq!(attrs.max_msg_len, 100);
905 /// assert_eq!(attrs.capacity, 3);
906 /// assert_eq!(attrs.current_messages, 0);
907 /// assert!(!attrs.nonblocking);
908 /// ```
909 ///
910 /// Ignore the error:
911 ///
912 /// (Will only happen with buggy code (incorrect usage of
913 /// [`from_raw_fd()`](#method.from_raw_fd) or similar)).
914 ///
915 #[cfg_attr(
916 any(target_os="linux", target_os="android", target_os="netbsd", target_os="dragonfly"),
917 doc="```"
918 )]
919 #[cfg_attr(
920 not(any(target_os="linux", target_os="android", target_os="netbsd", target_os="dragonfly")),
921 doc="```no_compile"
922 )]
923 /// # use std::os::unix::io::FromRawFd;
924 /// # let bad = unsafe { posixmq::PosixMq::from_raw_fd(-1) };
925 /// let attrs = bad.attributes().unwrap_or_default();
926 /// assert_eq!(attrs.max_msg_len, 0);
927 /// assert_eq!(attrs.capacity, 0);
928 /// assert_eq!(attrs.current_messages, 0);
929 /// assert!(!attrs.nonblocking);
930 /// ```
931 pub fn attributes(&self) -> Result<Attributes, io::Error> {
932 let mut attrs: mq_attr = unsafe { mem::zeroed() };
933 if unsafe { mq_getattr(self.mqd, &mut attrs) } == -1 {
934 Err(io::Error::last_os_error())
935 } else {
936 Ok(Attributes {
937 max_msg_len: attrs.mq_msgsize as usize,
938 capacity: attrs.mq_maxmsg as usize,
939 current_messages: attrs.mq_curmsgs as usize,
940 nonblocking: (attrs.mq_flags & (O_NONBLOCK as KernelLong)) != 0,
941 _private: ()
942 })
943 }
944 }
945
946 /// Check whether this descriptor is in nonblocking mode.
947 ///
948 /// # Errors
949 ///
950 /// Should only fail as result of buggy code that either created this
951 /// descriptor from something that is not a queue, or has already closed
952 /// the underlying descriptor.
953 /// (This function will not silently succeed if the fd points to anything
954 /// other than a queue (for example a socket), as this function
955 /// is a wrapper around [`attributes()`][#method.attributes].)
956 /// To ignore failure, one can write `.is_nonblocking().unwrap_or(false)`.
957 ///
958 /// ## An error doesn't guarantee that any further [`send()`](#method.send) or [`recv()`](#method.recv) wont block.
959 ///
960 /// While a `send()` or `recv()` ran in place of this call would also have
961 /// failed immediately and therefore not blocked, the descriptor might have
962 /// become used for another queue when a *later* `send()` or `recv()` is
963 /// performed. The descriptor might then be in blocking mode.
964 pub fn is_nonblocking(&self) -> Result<bool, io::Error> {
965 match self.attributes() {
966 Ok(attrs) => Ok(attrs.nonblocking),
967 Err(e) => Err(e),
968 }
969 }
970
971 /// Enable or disable nonblocking mode for this descriptor.
972 ///
973 /// This can also be set when opening the message queue,
974 /// with [`OpenOptions::nonblocking()`](struct.OpenOptions.html#method.nonblocking).
975 ///
976 /// # Errors
977 ///
978 /// Setting nonblocking mode should only fail due to incorrect usage of
979 /// `from_raw_fd()` or `as_raw_fd()`, see the documentation on
980 /// [`attributes()`](struct.PosixMq.html#method.attributes) for details.
981 pub fn set_nonblocking(&self, nonblocking: bool) -> Result<(), io::Error> {
982 let mut attrs: mq_attr = unsafe { mem::zeroed() };
983 attrs.mq_flags = if nonblocking {O_NONBLOCK as KernelLong} else {0};
984 let res = unsafe { mq_setattr(self.mqd, &attrs, ptr::null_mut()) };
985 if res == -1 {
986 return Err(io::Error::last_os_error());
987 }
988 Ok(())
989 }
990
991
992 /// Create a new descriptor for the same message queue.
993 ///
994 /// The new descriptor will have close-on-exec set.
995 ///
996 /// This function is not available on FreeBSD, Illumos or Solaris.
997 #[cfg(any(target_os="linux", target_os="dragonfly", target_os="netbsd"))]
998 pub fn try_clone(&self) -> Result<Self, io::Error> {
999 let mq = match unsafe { fcntl(self.mqd, F_DUPFD_CLOEXEC, 0) } {
1000 -1 => return Err(io::Error::last_os_error()),
1001 fd => PosixMq{mqd: fd},
1002 };
1003 // NetBSD ignores the cloexec part of F_DUPFD_CLOEXEC
1004 // (but DragonFly BSD respects it here)
1005 #[cfg(target_os="netbsd")]
1006 mq.set_cloexec(true)?;
1007 Ok(mq)
1008 }
1009
1010
/// Check whether this descriptor will be closed if the process `exec`s
/// into another program.
///
/// Posix message queues are closed on exec by default,
/// but this can be changed with [`set_cloexec()`](#method.set_cloexec).
///
/// This information is not available on Illumos, Solaris or VxWorks:
/// on platforms other than Linux, FreeBSD, NetBSD and DragonFly BSD this
/// function always returns an `ErrorKind::Other` error.
///
/// # Errors
///
/// On the supported platforms, retrieving this flag should only fail if
/// the descriptor is already closed.
/// In that case it will obviously not be open after execing,
/// so treating errors as `true` should be safe.
///
/// # Examples
///
/// ```
/// let queue = posixmq::PosixMq::create("is_cloexec").expect("open queue");
/// # posixmq::remove_queue("is_cloexec").expect("delete queue");
/// assert!(queue.is_cloexec().unwrap_or(true));
/// ```
pub fn is_cloexec(&self) -> Result<bool, io::Error> {
    // Exactly one of the two cfg-gated expressions below survives
    // compilation and becomes the function's return value.
    #[cfg(any(
        target_os="linux", target_os="freebsd",
        target_os="netbsd", target_os="dragonfly",
    ))]
    match unsafe { fcntl(self.as_raw_fd(), F_GETFD) } {
        -1 => Err(io::Error::last_os_error()),
        // Any other value is the descriptor's flag set.
        flags => Ok((flags & FD_CLOEXEC) != 0),
    }
    // Fallback for every other platform: report the flag as unavailable.
    #[cfg(not(any(
        target_os="linux", target_os="freebsd",
        target_os="netbsd", target_os="dragonfly",
    )))]
    Err(io::Error::new(
        ErrorKind::Other,
        "close-on-exec information is not available"
    ))
}
1051
1052 /// Change close-on-exec for this descriptor.
1053 ///
1054 /// It is on by default, so this method should only be called when one
1055 /// wants the descriptor to remain open afte `exec`ing.
1056 ///
1057 /// This function is not available on Illumos, Solaris or VxWorks.
1058 ///
1059 /// # Errors
1060 ///
1061 /// This function should only fail if the underlying file descriptor has
1062 /// been closed (due to incorrect usage of `from_raw_fd()` or similar),
1063 /// and not reused for something else yet.
1064 #[cfg(any(
1065 target_os="linux", target_os="freebsd",
1066 target_os="netbsd", target_os="dragonfly",
1067 ))]
1068 pub fn set_cloexec(&self, cloexec: bool) -> Result<(), io::Error> {
1069 let op = if cloexec {FIOCLEX} else {FIONCLEX};
1070 match unsafe { ioctl(self.as_raw_fd(), op) } {
1071 // Don't hide the error here, because callers can ignore the
1072 // returned value if they want.
1073 -1 => Err(io::Error::last_os_error()),
1074 _ => Ok(())
1075 }
1076 }
1077
1078
1079 /// Create a `PosixMq` from an already opened message queue descriptor.
1080 ///
1081 /// This function should only be used for ffi or if calling `mq_open()`
1082 /// directly for some reason.
1083 /// Use [`from_raw_fd()`](#method.from_raw_fd) instead if the surrounding
1084 /// code requires `mqd_t` to be a file descriptor.
1085 ///
1086 /// # Safety
1087 ///
1088 /// On some operating systems `mqd_t` is a pointer, which means that the
1089 /// safety of most other methods depend on it being correct.
1090 pub unsafe fn from_raw_mqd(mqd: mqd_t) -> Self {
1091 PosixMq{mqd}
1092 }
1093
/// Get the raw message queue descriptor.
///
/// The returned value is a copy of the descriptor stored in this wrapper;
/// the wrapper itself is left untouched.
///
/// This function should only be used for passing to ffi code or to access
/// portable features not exposed by this wrapper (such as calling
/// `mq_notify()` or not automatically retrying on EINTR /
/// `ErrorKind::Interrupted` when sending or receiving).
///
/// If you need a file descriptor, use `as_raw_fd()` instead for increased
/// portability.
/// ([`as_raw_fd()`](#method.as_raw_fd) can sometimes retrieve an
/// underlying file descriptor even if `mqd_t` is not an `int`.)
pub fn as_raw_mqd(&self) -> mqd_t {
    self.mqd
}
1108
1109 /// Convert this wrapper into the raw message queue descriptor without
1110 /// closing it.
1111 ///
1112 /// This function should only be used for ffi; If you need a file
1113 /// descriptor use [`into_raw_fd()`](#method.into_raw_fd) instead.
1114 pub fn into_raw_mqd(self) -> mqd_t {
1115 let mqd = self.mqd;
1116 mem::forget(self);
1117 return mqd;
1118 }
1119}
1120
1121/// Get an underlying file descriptor for the message queue.
1122///
1123/// If you just need the raw `mqd_t`, use
1124/// [`as_raw_mqd()`](struct.PosixMq.html#method.as_raw_mqd)
1125/// instead for increased portability.
1126///
1127/// This impl is not available on Illumos, Solaris or VxWorks.
1128#[cfg(any(
1129 target_os="linux", target_os="freebsd",
1130 target_os="netbsd", target_os="dragonfly",
1131))]
1132impl AsRawFd for PosixMq {
1133 // On Linux, NetBSD and DragonFly BSD, `mqd_t` is a plain file descriptor
1134 // and can trivially be convverted, but this is not guaranteed, nor the
1135 // case on FreeBSD, Illumos and Solaris.
1136 #[cfg(not(target_os="freebsd"))]
1137 fn as_raw_fd(&self) -> RawFd {
1138 self.mqd
1139 }
1140
1141 // FreeBSD has mq_getfd_np() (where _np stands for non-portable)
1142 #[cfg(target_os="freebsd")]
1143 fn as_raw_fd(&self) -> RawFd {
1144 unsafe { mq_getfd_np(self.mqd) }
1145 }
1146}
1147
1148/// Create a `PosixMq` wrapper from a raw file descriptor.
1149///
1150/// Note that the message queue will be closed when the returned `PosixMq` goes
1151/// out of scope / is dropped.
1152///
1153/// This impl is not available on FreeBSD, Illumos or Solaris; If you got a
1154/// `mqd_t` in a portable fashion (from FFI code or by calling `mq_open()`
1155/// yourself for some reason), use
1156/// [`from_raw_mqd()`](struct.PosixMq.html#method.from_raw_mqd) instead.
1157#[cfg(any(target_os="linux", target_os="netbsd", target_os="dragonfly"))]
1158impl FromRawFd for PosixMq {
1159 unsafe fn from_raw_fd(fd: RawFd) -> Self {
1160 PosixMq{mqd: fd}
1161 }
1162}
1163
1164/// Convert the `PosixMq` into a raw file descriptor without closing the
1165/// message queue.
1166///
1167/// This impl is not available on FreeBSD, Illumos or Solaris. If you need to
1168/// transfer ownership to FFI code accepting a `mqd_t`, use
1169/// [`into_raw_mqd()`](struct.PosixMq.html#method.into_raw_mqd) instead.
1170#[cfg(any(target_os="linux", target_os="netbsd", target_os="dragonfly"))]
1171impl IntoRawFd for PosixMq {
1172 fn into_raw_fd(self) -> RawFd {
1173 let fd = self.mqd;
1174 mem::forget(self);
1175 return fd;
1176 }
1177}
1178
1179
1180impl IntoIterator for PosixMq {
1181 type Item = (u32, Vec<u8>);
1182 type IntoIter = IntoIter;
1183 fn into_iter(self) -> IntoIter {
1184 IntoIter {
1185 max_msg_len: match self.attributes() {
1186 Ok(attrs) => attrs.max_msg_len,
1187 Err(_) => 0,
1188 },
1189 mq: self,
1190 }
1191 }
1192}
1193
1194impl<'a> IntoIterator for &'a PosixMq {
1195 type Item = (u32, Vec<u8>);
1196 type IntoIter = Iter<'a>;
1197 fn into_iter(self) -> Iter<'a> {
1198 Iter {
1199 max_msg_len: match self.attributes() {
1200 Ok(attrs) => attrs.max_msg_len,
1201 Err(_) => 0,
1202 },
1203 mq: self,
1204 }
1205 }
1206}
1207
1208
1209impl Debug for PosixMq {
1210 fn fmt(&self, fmtr: &mut Formatter) -> fmt::Result {
1211 let mut representation = fmtr.debug_struct("PosixMq");
1212 // display raw value and name unless we know it's a plain fd
1213 #[cfg(not(any(
1214 target_os="linux", target_os="netbsd", target_os="dragonfly",
1215 )))]
1216 representation.field("mqd", &self.mqd);
1217 // show file descriptor where we have one
1218 #[cfg(any(
1219 target_os="linux", target_os="freebsd",
1220 target_os="netbsd", target_os="dragonfly",
1221 ))]
1222 representation.field("fd", &self.as_raw_fd());
1223 return representation.finish();
1224 }
1225}
1226
1227impl Drop for PosixMq {
1228 fn drop(&mut self) {
1229 unsafe { mq_close(self.mqd) };
1230 }
1231}
1232
// `Send` is implemented manually for every platform.
// On some platforms mqd_t is a pointer, so Send and Sync aren't
// auto-implemented there. While I don't feel certain enough to
// blanket-implement Sync, I can't see why an implementation would make it UB
// to move operations to another thread.
unsafe impl Send for PosixMq {}
1238
// `Sync` is implemented manually only on the platforms reasoned about below.
//
// On FreeBSD, mqd_t is a `struct{int fd, struct sigev_node* node}*`,
// but the sigevent is only accessed by `mq_notify()`, so it's thread-safe
// as long as that function requires `&mut self` or isn't exposed.
// src: https://svnweb.freebsd.org/base/head/lib/librt/mq.c?view=markup
// On Illumos, mqd_t points to a rather complex struct, but the functions use
// mutexes and semaphores, so I assume they're totally thread-safe.
// src: https://github.com/illumos/illumos-gate/blob/master/usr/src/lib/libc/port/rt/mqueue.c
// Solaris I assume is equivalent to Illumos, because the Illumos code has
// barely been modified after the initial source code release.
// Linux, NetBSD and DragonFly BSD get Sync auto-implemented because
// mqd_t is an int.
#[cfg(any(target_os="freebsd", target_os="illumos", target_os="solaris"))]
unsafe impl Sync for PosixMq {}
1252
1253
/// Allow receiving event notifications through mio (version 0.6).
///
/// This impl requires the `mio_06` feature to be enabled:
///
/// ```toml
/// [dependencies]
/// posixmq = {version="1.0", features=["mio_06"]}
/// ```
///
/// Remember to open the queue in non-blocking mode. (with `OpenOptions::nonblocking()`)
#[cfg(feature="mio_06")]
impl Evented for PosixMq {
    // Each method forwards to mio's `EventedFd` adapter for the queue's
    // file descriptor.
    fn register(&self, poll: &Poll, token: mio_06::Token, interest: Ready, opts: PollOpt)
    -> Result<(), io::Error> {
        let fd = self.as_raw_fd();
        EventedFd(&fd).register(poll, token, interest, opts)
    }

    fn reregister(&self, poll: &Poll, token: mio_06::Token, interest: Ready, opts: PollOpt)
    -> Result<(), io::Error> {
        let fd = self.as_raw_fd();
        EventedFd(&fd).reregister(poll, token, interest, opts)
    }

    fn deregister(&self, poll: &Poll) -> Result<(), io::Error> {
        let fd = self.as_raw_fd();
        EventedFd(&fd).deregister(poll)
    }
}
1280
1281
/// Allow receiving event notifications through mio (version 0.7).
///
/// This impl requires the `mio_07` feature to be enabled:
///
/// ```toml
/// [dependencies]
/// posixmq = {version="1.0", features=["mio_07"]}
/// ```
///
/// Due to a long-lived bug in cargo this will currently enable
/// the os_reactor feature of mio. This is not intended, and can change in the
/// future.
///
/// You probably want to make the queue non-blocking: Either use
/// [`OpenOptions::nonblocking()`](struct.OpenOptions.html#method.nonblocking)
/// when preparing to open the queue, or call [`set_nonblocking(true)`](struct.PosixMq.html#method.set_nonblocking).
#[cfg(feature="mio_07")]
impl Source for &PosixMq {
    // Each method forwards to mio's `SourceFd` adapter for the queue's
    // file descriptor.
    fn register(&mut self, registry: &Registry, token: mio_07::Token, interest: Interest)
    -> Result<(), io::Error> {
        let fd = self.as_raw_fd();
        SourceFd(&fd).register(registry, token, interest)
    }

    fn reregister(&mut self, registry: &Registry, token: mio_07::Token, interest: Interest)
    -> Result<(), io::Error> {
        let fd = self.as_raw_fd();
        SourceFd(&fd).reregister(registry, token, interest)
    }

    fn deregister(&mut self, registry: &Registry) -> Result<(), io::Error> {
        let fd = self.as_raw_fd();
        SourceFd(&fd).deregister(registry)
    }
}
1314
/// Allow registering an owned `PosixMq` with mio (version 0.7).
///
/// Every method delegates to the `Source` impl for `&PosixMq`.
#[cfg(feature="mio_07")]
impl Source for PosixMq {
    fn register(&mut self, registry: &Registry, token: mio_07::Token, interest: Interest)
    -> Result<(), io::Error> {
        // The by-reference impl takes `&mut &PosixMq`, so reborrow as shared
        // and take a mutable reference to that temporary.
        <&PosixMq as Source>::register(&mut &*self, registry, token, interest)
    }

    fn reregister(&mut self, registry: &Registry, token: mio_07::Token, interest: Interest)
    -> Result<(), io::Error> {
        <&PosixMq as Source>::reregister(&mut &*self, registry, token, interest)
    }

    fn deregister(&mut self, registry: &Registry) -> Result<(), io::Error> {
        <&PosixMq as Source>::deregister(&mut &*self, registry)
    }
}
1331
1332
/// An `Iterator` that calls [`recv()`](struct.PosixMq.html#method.recv) on a borrowed [`PosixMq`](struct.PosixMq.html).
///
/// Each item is a `(priority, message)` pair, with the message in a newly
/// allocated `Vec<u8>`.
///
/// Iteration ends when a `recv()` fails with an `ErrorKind::WouldBlock` error,
/// but is infinite if the descriptor is in blocking mode.
///
/// # Panics
///
/// `next()` will panic if an error of type other than `ErrorKind::WouldBlock`
/// or `ErrorKind::Interrupted` occurs.
#[derive(Clone)]
pub struct Iter<'a> {
    /// The queue that messages are received from.
    mq: &'a PosixMq,
    /// Cached maximum message length of the queue; `next()` allocates a
    /// receive buffer of this size for every call.
    max_msg_len: usize,
}
1348
1349impl<'a> Iterator for Iter<'a> {
1350 type Item = (u32, Vec<u8>);
1351 fn next(&mut self) -> Option<(u32, Vec<u8>)> {
1352 let mut buf = vec![0; self.max_msg_len];
1353 match self.mq.recv(&mut buf) {
1354 Err(ref e) if e.kind() == ErrorKind::WouldBlock => None,
1355 Err(e) => panic!("Cannot receive from posix message queue: {}", e),
1356 Ok((priority, len)) => {
1357 buf.truncate(len);
1358 Some((priority, buf))
1359 }
1360 }
1361 }
1362}
1363
/// An `Iterator` that [`recv()`](struct.PosixMq.html#method.recv)s
/// messages from an owned [`PosixMq`](struct.PosixMq.html).
///
/// Each item is a `(priority, message)` pair.
///
/// Iteration ends when a `recv()` fails with an `ErrorKind::WouldBlock` error,
/// but is infinite if the descriptor is in blocking mode.
///
/// # Panics
///
/// `next()` will panic if an error of type other than `ErrorKind::WouldBlock`
/// or `ErrorKind::Interrupted` occurs.
pub struct IntoIter {
    /// The owned queue that messages are received from.
    mq: PosixMq,
    /// Cached maximum message length of the queue, used as the size of the
    /// receive buffer.
    max_msg_len: usize,
}
1378
1379impl Iterator for IntoIter {
1380 type Item = (u32, Vec<u8>);
1381 fn next(&mut self) -> Option<(u32, Vec<u8>)> {
1382 Iter{mq: &self.mq, max_msg_len: self.max_msg_len}.next()
1383 }
1384}
1385
1386
// Attaches the content of markdown files as documentation on private, unused
// items; presumably so that the code examples in those files are compiled and
// run as doctests. Only compiled when debug assertions are enabled.
#[cfg(debug_assertions)]
mod doctest_md_files {
    // `mdfile!{content, [attributes] Name}` declares an empty enum `Name`
    // with `content` as its documentation and any extra attributes applied.
    macro_rules! mdfile {($content:expr, $(#[$meta:meta])* $attach_to:ident) => {
        #[doc=$content]
        #[allow(unused)]
        $(#[$meta])* // can't #[cfg_attr(, doc=)] in .md file
        enum $attach_to {}
    }}
    // The path is resolved relative to this source file.
    mdfile!{include_str!("README.md"), Readme}
}