summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2016-08-04 16:04:24 -0700
committerAlex Crichton <alex@alexcrichton.com>2016-08-04 20:34:54 -0700
commite7f4313cf40f52907806bfab2ca35b8c30d6095f (patch)
treecbb853388e703ae3543210393467ed3dfca0903e /src
parent04bd33e3901b5bf994b043f194aacdb8433345bb (diff)
Implementing LoopData
This type acts for a handle to storage of non-`Send` data. The handle itself is sendable across threads and is therefore suitable for storage in a `Future`. This data uses communication internally and a new method on `Task` to ensure that when the data needs to be accessed the future will find its way to the right thread. More on this type coming soon!
Diffstat (limited to 'src')
-rw-r--r--src/channel.rs118
-rw-r--r--src/event_loop.rs341
-rw-r--r--src/lib.rs5
-rw-r--r--src/mpsc_queue.rs148
4 files changed, 596 insertions, 16 deletions
diff --git a/src/channel.rs b/src/channel.rs
new file mode 100644
index 00000000..503baef5
--- /dev/null
+++ b/src/channel.rs
@@ -0,0 +1,118 @@
+//! A thin wrapper around a mpsc queue and mio-based channel information
+//!
+//! Normally the standard library's channels would suffice but we unfortunately
+//! need the `Sender<T>` half to be `Sync`, so to accomplish this for now we
+//! just vendor the same mpsc queue as the one in the standard library and then
+//! we pair that with the `mio::channel` module's Ctl pairs to control the
+//! readiness notifications on the channel.
+
+use std::cell::Cell;
+use std::io;
+use std::marker;
+use std::sync::Arc;
+
+use mio;
+use mio::channel::{ctl_pair, SenderCtl, ReceiverCtl};
+
+use mpsc_queue::{Queue, PopResult};
+
+pub struct Sender<T> {
+ ctl: SenderCtl,
+ inner: Arc<Queue<T>>,
+}
+
+pub struct Receiver<T> {
+ ctl: ReceiverCtl,
+ inner: Arc<Queue<T>>,
+ _marker: marker::PhantomData<Cell<()>>, // this type is not Sync
+}
+
+pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
+ let inner = Arc::new(Queue::new());
+ let (tx, rx) = ctl_pair();
+
+ let tx = Sender {
+ ctl: tx,
+ inner: inner.clone(),
+ };
+ let rx = Receiver {
+ ctl: rx,
+ inner: inner.clone(),
+ _marker: marker::PhantomData,
+ };
+ (tx, rx)
+}
+
+impl<T> Sender<T> {
+ pub fn send(&self, data: T) -> io::Result<()> {
+ self.inner.push(data);
+ self.ctl.inc()
+ }
+}
+
+impl<T> Receiver<T> {
+ pub fn recv(&self) -> io::Result<Option<T>> {
+ // Note that the underlying method is `unsafe` because it's only safe
+ // if one thread accesses it at a time.
+ //
+ // We, however, are the only thread with a `Receiver<T>` because this
+ // type is not `Sync`. and we never handed out another instance.
+ match unsafe { self.inner.pop() } {
+ PopResult::Data(t) => {
+ try!(self.ctl.dec());
+ Ok(Some(t))
+ }
+
+ // If the queue is either in an inconsistent or empty state, then
+ // we return `None` for both instances. Note that the standard
+ // library performs a yield loop in the event of `Inconsistent`,
+ // which means that there's data in the queue but a sender hasn't
+ // finished their operation yet.
+ //
+ // We do this because the queue will continue to be readable as
+ // the thread performing the push will eventually call `inc`, so
+ // if we return `None` and the event loop just loops aruond calling
+ // this method then we'll eventually get back to the same spot
+ // and due the retry.
+ //
+ // Basically, the inconsistent state doesn't mean we need to busy
+ // wait, but instead we can forge ahead and assume by the time we
+ // go to the kernel and come back we'll no longer be in an
+ // inconsistent state.
+ PopResult::Empty |
+ PopResult::Inconsistent => Ok(None),
+ }
+ }
+}
+
+// Just delegate everything to `self.ctl`
+impl<T> mio::Evented for Receiver<T> {
+ fn register(&self,
+ poll: &mio::Poll,
+ token: mio::Token,
+ interest: mio::EventSet,
+ opts: mio::PollOpt) -> io::Result<()> {
+ self.ctl.register(poll, token, interest, opts)
+ }
+
+ fn reregister(&self,
+ poll: &mio::Poll,
+ token: mio::Token,
+ interest: mio::EventSet,
+ opts: mio::PollOpt) -> io::Result<()> {
+ self.ctl.reregister(poll, token, interest, opts)
+ }
+
+ fn deregister(&self, poll: &mio::Poll) -> io::Result<()> {
+ self.ctl.deregister(poll)
+ }
+}
+
+impl<T> Clone for Sender<T> {
+ fn clone(&self) -> Sender<T> {
+ Sender {
+ ctl: self.ctl.clone(),
+ inner: self.inner.clone(),
+ }
+ }
+}
diff --git a/src/event_loop.rs b/src/event_loop.rs
index b41ebc47..42ca012f 100644
--- a/src/event_loop.rs
+++ b/src/event_loop.rs
@@ -1,5 +1,7 @@
+use std::any::Any;
use std::cell::{Cell, RefCell};
use std::io::{self, ErrorKind};
+use std::marker;
use std::mem;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
@@ -7,11 +9,13 @@ use std::sync::mpsc;
use std::time::{Instant, Duration};
use futures::{Future, Task, TaskHandle, Poll};
+use futures::executor::{ExecuteCallback, Executor};
use futures_io::Ready;
-use mio::channel::SendError;
use mio;
use slab::Slab;
+use channel::{Sender, Receiver, channel};
+use event_loop::dropbox::DropBox;
use slot::{self, Slot};
use timer_wheel::{TimerWheel, Timeout};
@@ -31,8 +35,8 @@ pub struct Loop {
id: usize,
active: Cell<bool>,
io: mio::Poll,
- tx: mio::channel::Sender<Message>,
- rx: mio::channel::Receiver<Message>,
+ tx: Arc<MioSender>,
+ rx: Receiver<Message>,
dispatch: RefCell<Slab<Scheduled, usize>>,
// Timer wheel keeping track of all timeouts. The `usize` stored in the
@@ -45,6 +49,10 @@ pub struct Loop {
timeouts: RefCell<Slab<(Timeout, TimeoutState), usize>>,
}
+struct MioSender {
+ inner: Sender<Message>,
+}
+
/// Handle to an event loop, used to construct I/O objects, send messages, and
/// otherwise interact indirectly with the event loop itself.
///
@@ -53,7 +61,7 @@ pub struct Loop {
#[derive(Clone)]
pub struct LoopHandle {
id: usize,
- tx: mio::channel::Sender<Message>,
+ tx: Arc<MioSender>,
}
struct Scheduled {
@@ -75,6 +83,8 @@ enum Message {
AddTimeout(Instant, Arc<Slot<io::Result<TimeoutToken>>>),
UpdateTimeout(TimeoutToken, TaskHandle),
CancelTimeout(TimeoutToken),
+ Run(Box<ExecuteCallback>),
+ Drop(DropBox<Any>),
Shutdown,
}
@@ -103,7 +113,7 @@ impl Loop {
/// Creates a new event loop, returning any error that happened during the
/// creation.
pub fn new() -> io::Result<Loop> {
- let (tx, rx) = mio::channel::from_std_channel(mpsc::channel());
+ let (tx, rx) = channel();
let io = try!(mio::Poll::new());
try!(io.register(&rx,
mio::Token(0),
@@ -113,7 +123,7 @@ impl Loop {
id: NEXT_LOOP_ID.fetch_add(1, Ordering::Relaxed),
active: Cell::new(true),
io: io,
- tx: tx,
+ tx: Arc::new(MioSender { inner: tx }),
rx: rx,
dispatch: RefCell::new(Slab::new_starting_at(1, SLAB_CAPACITY)),
timeouts: RefCell::new(Slab::new_starting_at(0, SLAB_CAPACITY)),
@@ -314,7 +324,8 @@ impl Loop {
}
fn consume_queue(&self) {
- while let Ok(msg) = self.rx.try_recv() {
+ // TODO: can we do better than `.unwrap()` here?
+ while let Some(msg) = self.rx.recv().unwrap() {
self.notify(msg);
}
}
@@ -337,6 +348,8 @@ impl Loop {
}
Message::UpdateTimeout(t, handle) => self.update_timeout(&t, handle),
Message::CancelTimeout(t) => self.cancel_timeout(&t),
+ Message::Run(f) => f.call(),
+ Message::Drop(data) => drop(data),
}
}
}
@@ -352,21 +365,15 @@ impl LoopHandle {
lp.notify(msg);
}
None => {
- match self.tx.send(msg) {
+ match self.tx.inner.send(msg) {
Ok(()) => {}
// This should only happen when there was an error
// writing to the pipe to wake up the event loop,
// hopefully that never happens
- Err(SendError::Io(e)) => {
+ Err(e) => {
panic!("error sending message to event loop: {}", e)
}
-
- // If we're still sending a message to the event loop
- // after it's closed, then that's bad!
- Err(SendError::Disconnected(_)) => {
- panic!("event loop is no longer available")
- }
}
}
}
@@ -511,6 +518,38 @@ impl LoopHandle {
self.send(Message::CancelTimeout(timeout))
}
+ /// Schedules a closure to add some data to event loop thread itself.
+ ///
+ /// This function is useful for when storing non-`Send` data inside of a
+ /// future. This returns a future which will resolve to a `LoopData<A>`
+ /// handle, which is itself `Send + 'static` regardless of the underlying
+ /// `A`. That is, for example, you can create a handle to some data that
+ /// contains an `Rc`, for example.
+ ///
+ /// This function takes a closure which may be sent to the event loop to
+ /// generate an instance of type `A`. The closure itself is required to be
+ /// `Send + 'static`, but the data it produces is only required to adhere to
+ /// `Any`.
+ ///
+ /// If the returned future is polled on the event loop thread itself it will
+ /// very cheaply resolve to a handle to the data, but if it's not polled on
+ /// the event loop then it will send a message to the event loop to run the
+ /// closure `f`, generate a handle, and then the future will yield it back.
+ // TODO: more with examples
+ pub fn add_loop_data<F, A>(&self, f: F) -> AddLoopData<F, A>
+ where F: FnOnce() -> A + Send + 'static,
+ A: Any,
+ {
+ AddLoopData {
+ _marker: marker::PhantomData,
+ inner: LoopFuture {
+ loop_handle: self.clone(),
+ data: Some(f),
+ result: None,
+ },
+ }
+ }
+
/// Send a message to the associated event loop that it should shut down, or
/// otherwise break out of its current loop of iteration.
///
@@ -574,6 +613,271 @@ impl Future for AddTimeout {
}
}
+/// A handle to data that is owned by an event loop thread, and is only
+/// accessible on that thread itself.
+///
+/// This structure is created by the `LoopHandle::add_loop_data` method which
+/// will return a future resolving to one of these references. A `LoopData<A>`
+/// handle is `Send` regardless of what `A` is, but the internal data can only
+/// be accessed on the event loop thread itself.
+///
+/// Internally this reference also stores a handle to the event loop that the
+/// data originated on, so it knows how to go back to the event loop to access
+/// the data itself.
+// TODO: write more once it's implemented
+pub struct LoopData<A: Any> {
+ data: DropBox<A>,
+ handle: LoopHandle,
+}
+
+/// Future returned from the `LoopHandle::add_loop_data` method.
+///
+/// This future will resolve to a `LoopData<A>` reference when completed, which
+/// represents a handle to data that is "owned" by the event loop thread but can
+/// migrate among threads temporarily so travel with a future itself.
+pub struct AddLoopData<F, A> {
+ inner: LoopFuture<DropBox<Any>, F>,
+ _marker: marker::PhantomData<fn() -> A>,
+}
+
+fn _assert() {
+ fn _assert_send<T: Send>() {}
+ _assert_send::<LoopData<()>>();
+}
+
+impl<F, A> Future for AddLoopData<F, A>
+ where F: FnOnce() -> A + Send + 'static,
+ A: Any,
+{
+ type Item = LoopData<A>;
+ type Error = io::Error;
+
+ fn poll(&mut self, _task: &mut Task) -> Poll<LoopData<A>, io::Error> {
+ let ret = self.inner.poll(|_lp, f| {
+ Ok(DropBox::new(f()))
+ });
+
+ ret.map(|mut data| {
+ match data.downcast::<A>() {
+ Some(data) => {
+ LoopData {
+ data: data,
+ handle: self.inner.loop_handle.clone(),
+ }
+ }
+ None => panic!("data mixed up?"),
+ }
+ })
+ }
+
+ fn schedule(&mut self, task: &mut Task) {
+ self.inner.schedule(task, |f, slot| {
+ Message::Run(Box::new(move || {
+ slot.try_produce(Ok(DropBox::new(f()))).ok()
+ .expect("add loop data try_produce intereference");
+ }))
+ })
+ }
+}
+
+impl<A: Any> LoopData<A> {
+ /// Gets a shared reference to the underlying data in this handle.
+ ///
+ /// Returns `None` if it is not called from the event loop thread that this
+ /// `LoopData<A>` is associated with, or `Some` with a reference to the data
+ /// if we are indeed on the event loop thread.
+ pub fn get(&self) -> Option<&A> {
+ self.data.get()
+ }
+
+ /// Gets a mutable reference to the underlying data in this handle.
+ ///
+ /// Returns `None` if it is not called from the event loop thread that this
+ /// `LoopData<A>` is associated with, or `Some` with a reference to the data
+ /// if we are indeed on the event loop thread.
+ pub fn get_mut(&mut self) -> Option<&mut A> {
+ self.data.get_mut()
+ }
+
+ /// Acquire the executor associated with the thread that owns this
+ /// `LoopData<A>`'s data.
+ ///
+ /// If the `get` and `get_mut` functions above return `None`, then this data
+ /// is being polled on the wrong thread to access the data, and to make
+ /// progress a future may need to migrate to the actual thread which owns
+ /// the relevant data.
+ ///
+ /// This executor can in turn be passed to `Task::poll_on`, which will then
+ /// move the entire future to be polled on the right thread.
+ pub fn executor(&self) -> Arc<Executor> {
+ self.handle.tx.clone()
+ }
+}
+
+impl<A: Any> Drop for LoopData<A> {
+ fn drop(&mut self) {
+ // The `DropBox` we store internally will cause a memory leak if it's
+ // dropped on the wrong thread. While necessary for safety, we don't
+ // actually want a memory leak, so for all normal circumstances we take
+ // out the `DropBox<A>` as a `DropBox<Any>` and then we send it off to
+ // the event loop.
+ //
+ // TODO: possible optimization is to do none of this if we're on the
+ // event loop thread itself
+ if let Some(data) = self.data.take_any() {
+ self.handle.send(Message::Drop(data));
+ }
+ }
+}
+
+/// A curious inner module with one `unsafe` keyword, yet quite an important
+/// one!
+///
+/// The purpose of this module is to define a type, `DropBox<A>`, which is able
+/// to be sent across thread event when the underlying data `A` is itself not
+/// sendable across threads. This is then in turn used to build up the
+/// `LoopData` abstraction above.
+///
+/// A `DropBox` currently contains two major components, an identification of
+/// the thread that it originated from as well as the data itself. Right now the
+/// data is stored in a `Box` as we'll transition between it and `Box<Any>`, but
+/// this is perhaps optimizable.
+///
+/// The `DropBox<A>` itself only provides a few safe methods, all of which are
+/// safe to call from any thread. Access to the underlying data is only granted
+/// if we're on the right thread, and otherwise the methods don't access the
+/// data itself.
+///
+/// Finally, one crucial piece, if the data is dropped it may run code that
+/// assumes it's on the original thread. For this reason we have to be sure that
+/// the data is only dropped on the originating thread itself. It's currently
+/// the job of the outer `LoopData` to ensure that a `DropBox` is dropped on the
+/// right thread, so we don't attempt to perform any communication in this
+/// `Drop` implementation. Instead, if a `DropBox` is dropped on the wrong
+/// thread, it simply leaks its contents.
+///
+/// All that's really just a lot of words in an attempt to justify the `unsafe`
+/// impl of `Send` below. The idea is that the data is only ever accessed on the
+/// originating thread, even during `Drop`.
+///
+/// Note that this is a private module to have a visibility boundary around the
+/// unsafe internals. Although there's not any unsafe blocks here, the code
+/// itself is quite unsafe as it has to make sure that the data is dropped in
+/// the right place, if ever.
+mod dropbox {
+ use std::any::Any;
+ use std::mem;
+ use super::CURRENT_LOOP;
+
+ pub struct DropBox<A: ?Sized> {
+ id: usize,
+ inner: Option<Box<A>>,
+ }
+
+ unsafe impl<A: ?Sized> Send for DropBox<A> {}
+
+ impl DropBox<Any> {
+ /// Creates a new `DropBox` pinned to the current threads.
+ ///
+ /// Will panic if `CURRENT_LOOP` isn't set.
+ pub fn new<A: Any>(a: A) -> DropBox<Any> {
+ DropBox {
+ id: CURRENT_LOOP.with(|lp| lp.id),
+ inner: Some(Box::new(a) as Box<Any>),
+ }
+ }
+
+ /// Downcasts this `DropBox` to the type specified.
+ ///
+ /// Normally this always succeeds as it's a static assertion that we
+ /// already have all the types matched up, but an `Option` is returned
+ /// here regardless.
+ pub fn downcast<A: Any>(&mut self) -> Option<DropBox<A>> {
+ self.inner.take().and_then(|data| {
+ match data.downcast::<A>() {
+ Ok(a) => Some(DropBox { id: self.id, inner: Some(a) }),
+
+ // Note that we're careful that when a downcast fails we put
+ // the data back into ourselves, because we may be
+ // downcasting on any thread. This will ensure that if we
+ // drop accidentally we'll forget the data correctly.
+ Err(obj) => {
+ self.inner = Some(obj);
+ None
+ }
+ }
+ })
+ }
+ }
+
+ impl<A: Any> DropBox<A> {
+ /// Consumes the contents of this `DropBox<A>`, returning a new
+ /// `DropBox<Any>`.
+ ///
+ /// This is just intended to be a simple and cheap conversion, should
+ /// almost always return `Some`.
+ pub fn take_any(&mut self) -> Option<DropBox<Any>> {
+ self.inner.take().map(|d| {
+ DropBox { id: self.id, inner: Some(d as Box<Any>) }
+ })
+ }
+ }
+
+ impl<A: ?Sized> DropBox<A> {
+ /// Returns a shared reference to the data if we're on the right
+ /// thread.
+ pub fn get(&self) -> Option<&A> {
+ if CURRENT_LOOP.is_set() {
+ CURRENT_LOOP.with(|lp| {
+ if lp.id == self.id {
+ self.inner.as_ref().map(|b| &**b)
+ } else {
+ None
+ }
+ })
+ } else {
+ None
+ }
+ }
+
+ /// Returns a mutable reference to the data if we're on the right
+ /// thread.
+ pub fn get_mut(&mut self) -> Option<&mut A> {
+ if CURRENT_LOOP.is_set() {
+ CURRENT_LOOP.with(move |lp| {
+ if lp.id == self.id {
+ self.inner.as_mut().map(|b| &mut **b)
+ } else {
+ None
+ }
+ })
+ } else {
+ None
+ }
+ }
+ }
+
+ impl<A: ?Sized> Drop for DropBox<A> {
+ fn drop(&mut self) {
+ // Try our safe accessor first, and if it works then we know that
+ // we're on the right thread. In that case we can simply drop as
+ // usual.
+ if let Some(a) = self.get_mut().take() {
+ return drop(a)
+ }
+
+ // If we're on the wrong thread but we actually have some data, then
+ // something in theory horrible has gone awry. Prevent memory safety
+ // issues by forgetting the data and then also warn about this odd
+ // event.
+ if let Some(data) = self.inner.take() {
+ mem::forget(data);
+ warn!("forgetting some data on an event loop");
+ }
+ }
+ }
+}
+
struct LoopFuture<T, U> {
loop_handle: LoopHandle,
data: Option<U>,
@@ -671,3 +975,10 @@ impl<E: ?Sized> Source<E> {
&self.io
}
}
+
+impl Executor for MioSender {
+ fn execute_boxed(&self, callback: Box<ExecuteCallback>) {
+ self.inner.send(Message::Run(callback))
+ .expect("error sending a message to the event loop")
+ }
+}
diff --git a/src/lib.rs b/src/lib.rs
index d0888963..0967c6ad 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -29,8 +29,11 @@ pub mod timer_wheel;
mod slot;
#[path = "../../src/lock.rs"]
mod lock;
+mod mpsc_queue;
+mod channel;
-pub use event_loop::{Loop, LoopHandle};
+pub use event_loop::{Loop, LoopHandle, AddSource, AddTimeout};
+pub use event_loop::{LoopData, AddLoopData, TimeoutToken};
pub use readiness_stream::ReadinessStream;
pub use tcp::{TcpListener, TcpStream};
pub use timeout::Timeout;
diff --git a/src/mpsc_queue.rs b/src/mpsc_queue.rs
new file mode 100644
index 00000000..7893bbb7
--- /dev/null
+++ b/src/mpsc_queue.rs
@@ -0,0 +1,148 @@
+/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
+ * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
+ * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
+ * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and documentation are
+ * those of the authors and should not be interpreted as representing official
+ * policies, either expressed or implied, of Dmitry Vyukov.
+ */
+
+//! A mostly lock-free multi-producer, single consumer queue.
+//!
+//! This module contains an implementation of a concurrent MPSC queue. This
+//! queue can be used to share data between threads, and is also used as the
+//! building block of channels in rust.
+//!
+//! Note that the current implementation of this queue has a caveat of the `pop`
+//! method, and see the method for more information about it. Due to this
+//! caveat, this queue may not be appropriate for all use-cases.
+
+// http://www.1024cores.net/home/lock-free-algorithms
+// /queues/non-intrusive-mpsc-node-based-queue
+
+// NOTE: this implementation is lifted from the standard library and only
+// slightly modified
+
+pub use self::PopResult::*;
+
+use std::cell::UnsafeCell;
+use std::ptr;
+use std::sync::atomic::{AtomicPtr, Ordering};
+
+/// A result of the `pop` function.
+pub enum PopResult<T> {
+ /// Some data has been popped
+ Data(T),
+ /// The queue is empty
+ Empty,
+ /// The queue is in an inconsistent state. Popping data should succeed, but
+ /// some pushers have yet to make enough progress in order allow a pop to
+ /// succeed. It is recommended that a pop() occur "in the near future" in
+ /// order to see if the sender has made progress or not
+ Inconsistent,
+}
+
+struct Node<T> {
+ next: AtomicPtr<Node<T>>,
+ value: Option<T>,
+}
+
+/// The multi-producer single-consumer structure. This is not cloneable, but it
+/// may be safely shared so long as it is guaranteed that there is only one
+/// popper at a time (many pushers are allowed).
+pub struct Queue<T> {
+ head: AtomicPtr<Node<T>>,
+ tail: UnsafeCell<*mut Node<T>>,
+}
+
+unsafe impl<T: Send> Send for Queue<T> { }
+unsafe impl<T: Send> Sync for Queue<T> { }
+
+impl<T> Node<T> {
+ unsafe fn new(v: Option<T>) -> *mut Node<T> {
+ Box::into_raw(Box::new(Node {
+ next: AtomicPtr::new(ptr::null_mut()),
+ value: v,
+ }))
+ }
+}
+
+impl<T> Queue<T> {
+ /// Creates a new queue that is safe to share among multiple producers and
+ /// one consumer.
+ pub fn new() -> Queue<T> {
+ let stub = unsafe { Node::new(None) };
+ Queue {
+ head: AtomicPtr::new(stub),
+ tail: UnsafeCell::new(stub),
+ }
+ }
+
+ /// Pushes a new value onto this queue.
+ pub fn push(&self, t: T) {
+ unsafe {
+ let n = Node::new(Some(t));
+ let prev = self.head.swap(n, Ordering::AcqRel);
+ (*prev).next.store(n, Ordering::Release);
+ }
+ }
+
+ /// Pops some data from this queue.
+ ///
+ /// Note that the current implementation means that this function cannot
+ /// return `Option<T>`. It is possible for this queue to be in an
+ /// inconsistent state where many pushes have succeeded and completely
+ /// finished, but pops cannot return `Some(t)`. This inconsistent state
+ /// happens when a pusher is pre-empted at an inopportune moment.
+ ///
+ /// This inconsistent state means that this queue does indeed have data, but
+ /// it does not currently have access to it at this time.
+ ///
+ /// This function is unsafe because only one thread can call it at a time.
+ pub unsafe fn pop(&self) -> PopResult<T> {
+ let tail = *self.tail.get();
+ let next = (*tail).next.load(Ordering::Acquire);
+
+ if !next.is_null() {
+ *self.tail.get() = next;
+ assert!((*tail).value.is_none());
+ assert!((*next).value.is_some());
+ let ret = (*next).value.take().unwrap();
+ drop(Box::from_raw(tail));
+ return Data(ret);
+ }
+
+ if self.head.load(Ordering::Acquire) == tail {Empty} else {Inconsistent}
+ }
+}
+
+impl<T> Drop for Queue<T> {
+ fn drop(&mut self) {
+ unsafe {
+ let mut cur = *self.tail.get();
+ while !cur.is_null() {
+ let next = (*cur).next.load(Ordering::Relaxed);
+ drop(Box::from_raw(cur));
+ cur = next;
+ }
+ }
+ }
+}