diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/bin/echo.rs | 7 | ||||
-rw-r--r-- | src/bin/sink.rs | 11 | ||||
-rw-r--r-- | src/channel.rs | 2 | ||||
-rw-r--r-- | src/io/copy.rs | 82 | ||||
-rw-r--r-- | src/io/flush.rs | 39 | ||||
-rw-r--r-- | src/io/mod.rs | 47 | ||||
-rw-r--r-- | src/io/read_exact.rs | 77 | ||||
-rw-r--r-- | src/io/read_to_end.rs | 62 | ||||
-rw-r--r-- | src/io/task.rs | 102 | ||||
-rw-r--r-- | src/io/window.rs | 116 | ||||
-rw-r--r-- | src/io/write_all.rs | 80 | ||||
-rw-r--r-- | src/lib.rs | 5 | ||||
-rw-r--r-- | src/lock.rs | 106 | ||||
-rw-r--r-- | src/slot.rs | 691 | ||||
-rw-r--r-- | src/tcp.rs | 2 | ||||
-rw-r--r-- | src/timeout.rs | 2 | ||||
-rw-r--r-- | src/udp.rs | 2 |
17 files changed, 1416 insertions, 17 deletions
diff --git a/src/bin/echo.rs b/src/bin/echo.rs index ccd94752..710db79d 100644 --- a/src/bin/echo.rs +++ b/src/bin/echo.rs @@ -1,22 +1,21 @@ //! An echo server that just writes back everything that's written to it. extern crate futures; -extern crate futures_io; -extern crate futures_mio; +extern crate tokio_core; use std::env; use std::net::SocketAddr; use futures::Future; use futures::stream::Stream; -use futures_io::{copy, TaskIo}; +use tokio_core::io::{copy, TaskIo}; fn main() { let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); let addr = addr.parse::<SocketAddr>().unwrap(); // Create the event loop that will drive this server - let mut l = futures_mio::Loop::new().unwrap(); + let mut l = tokio_core::Loop::new().unwrap(); // Create a TCP listener which will listen for incoming connections let server = l.handle().tcp_listen(&addr); diff --git a/src/bin/sink.rs b/src/bin/sink.rs index 49feed35..baf96b8e 100644 --- a/src/bin/sink.rs +++ b/src/bin/sink.rs @@ -5,8 +5,7 @@ #[macro_use] extern crate futures; -extern crate futures_io; -extern crate futures_mio; +extern crate tokio_core; use std::env; use std::iter; @@ -14,13 +13,13 @@ use std::net::SocketAddr; use futures::Future; use futures::stream::{self, Stream}; -use futures_io::IoFuture; +use tokio_core::io::IoFuture; fn main() { let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); let addr = addr.parse::<SocketAddr>().unwrap(); - let mut l = futures_mio::Loop::new().unwrap(); + let mut l = tokio_core::Loop::new().unwrap(); let server = l.handle().tcp_listen(&addr).and_then(|socket| { socket.incoming().and_then(|(socket, addr)| { println!("got a socket: {}", addr); @@ -34,10 +33,10 @@ fn main() { l.run(server).unwrap(); } -fn write(socket: futures_mio::TcpStream) -> IoFuture<()> { +fn write(socket: tokio_core::TcpStream) -> IoFuture<()> { static BUF: &'static [u8] = &[0; 64 * 1024]; let iter = iter::repeat(()).map(|()| Ok(())); stream::iter(iter).fold(socket, |socket, ()| { - futures_io::write_all(socket, BUF).map(|(socket, _)| socket) + tokio_core::io::write_all(socket, BUF).map(|(socket, _)| socket) }).map(|_| ()).boxed() } diff --git a/src/channel.rs b/src/channel.rs index cd5fef46..3da44b53 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -3,10 +3,10 @@ use std::sync::mpsc::TryRecvError; use futures::{Future, Poll}; use futures::stream::Stream; -use futures_io::IoFuture; use mio::channel; use {ReadinessStream, LoopHandle}; +use io::IoFuture; /// The transmission half of a channel used for sending messages to a receiver. /// diff --git a/src/io/copy.rs b/src/io/copy.rs new file mode 100644 index 00000000..042e819b --- /dev/null +++ b/src/io/copy.rs @@ -0,0 +1,82 @@ +use std::io::{self, Read, Write}; + +use futures::{Future, Poll}; + +/// A future which will copy all data from a reader into a writer. +/// +/// Created by the `copy` function, this future will resolve to the number of +/// bytes copied or an error if one happens. +pub struct Copy<R, W> { + reader: R, + read_done: bool, + writer: W, + pos: usize, + cap: usize, + amt: u64, + buf: Box<[u8]>, +} + +/// Creates a future which represents copying all the bytes from one object to +/// another. +/// +/// The returned future will copy all the bytes read from `reader` into the +/// `writer` specified. This future will only complete once the `reader` has hit +/// EOF and all bytes have been written to and flushed from the `writer` +/// provided. +/// +/// On success the number of bytes is returned and the `reader` and `writer` are +/// consumed. On error the error is returned and the I/O objects are consumed as +/// well. +pub fn copy<R, W>(reader: R, writer: W) -> Copy<R, W> + where R: Read, + W: Write, +{ + Copy { + reader: reader, + read_done: false, + writer: writer, + amt: 0, + pos: 0, + cap: 0, + buf: Box::new([0; 2048]), + } +} + +impl<R, W> Future for Copy<R, W> + where R: Read, + W: Write, +{ + type Item = u64; + type Error = io::Error; + + fn poll(&mut self) -> Poll<u64, io::Error> { + loop { + // If our buffer is empty, then we need to read some data to + // continue. + if self.pos == self.cap && !self.read_done { + let n = try_nb!(self.reader.read(&mut self.buf)); + if n == 0 { + self.read_done = true; + } else { + self.pos = 0; + self.cap = n; + } + } + + // If our buffer has some data, let's write it out! + while self.pos < self.cap { + let i = try_nb!(self.writer.write(&self.buf[self.pos..self.cap])); + self.pos += i; + self.amt += i as u64; + } + + // If we've written al the data and we've seen EOF, flush out the + // data and finish the transfer. + // done with the entire transfer. + if self.pos == self.cap && self.read_done { + try_nb!(self.writer.flush()); + return Poll::Ok(self.amt) + } + } + } +} diff --git a/src/io/flush.rs b/src/io/flush.rs new file mode 100644 index 00000000..159a178b --- /dev/null +++ b/src/io/flush.rs @@ -0,0 +1,39 @@ +use std::io::{self, Write}; + +use futures::{Poll, Future}; + +/// A future used to fully flush an I/O object. +/// +/// Resolves to the underlying I/O object once the flush operation is complete. +/// +/// Created by the `flush` function. +pub struct Flush<A> { + a: Option<A>, +} + +/// Creates a future which will entirely flush an I/O object and then yield the +/// object itself. +/// +/// This function will consume the object provided if an error happens, and +/// otherwise it will repeatedly call `flush` until it sees `Ok(())`, scheduling +/// a retry if `WouldBlock` is seen along the way. +pub fn flush<A>(a: A) -> Flush<A> + where A: Write, +{ + Flush { + a: Some(a), + } +} + +impl<A> Future for Flush<A> + where A: Write, +{ + type Item = A; + type Error = io::Error; + + fn poll(&mut self) -> Poll<A, io::Error> { + try_nb!(self.a.as_mut().unwrap().flush()); + Poll::Ok(self.a.take().unwrap()) + } +} + diff --git a/src/io/mod.rs b/src/io/mod.rs new file mode 100644 index 00000000..41a73978 --- /dev/null +++ b/src/io/mod.rs @@ -0,0 +1,47 @@ +//! I/O conveniences when working with primitives in `tokio-core` +//! +//! Contains various combinators to work with I/O objects and type definitions +//! as well. + +use std::io; + +use futures::BoxFuture; +use futures::stream::BoxStream; + +/// A convenience typedef around a `Future` whose error component is `io::Error` +pub type IoFuture<T> = BoxFuture<T, io::Error>; + +/// A convenience typedef around a `Stream` whose error component is `io::Error` +pub type IoStream<T> = BoxStream<T, io::Error>; + +/// A convenience macro for working with `io::Result<T>` from the `Read` and +/// `Write` traits. +/// +/// This macro takes `io::Result<T>` as input, and returns `T` as the output. If +/// the input type is of the `Err` variant, then `Poll::NotReady` is returned if +/// it indicates `WouldBlock` or otherwise `Err` is returned. +#[macro_export] +macro_rules! try_nb { + ($e:expr) => (match $e { + Ok(t) => t, + Err(ref e) if e.kind() == ::std::io::ErrorKind::WouldBlock => { + return ::futures::Poll::NotReady + } + Err(e) => return ::futures::Poll::Err(e.into()), + }) +} + +mod copy; +mod flush; +mod read_exact; +mod read_to_end; +mod task; +mod window; +mod write_all; +pub use self::copy::{copy, Copy}; +pub use self::flush::{flush, Flush}; +pub use self::read_exact::{read_exact, ReadExact}; +pub use self::read_to_end::{read_to_end, ReadToEnd}; +pub use self::task::{TaskIo, TaskIoRead, TaskIoWrite}; +pub use self::window::Window; +pub use self::write_all::{write_all, WriteAll}; diff --git a/src/io/read_exact.rs b/src/io/read_exact.rs new file mode 100644 index 00000000..b251bfea --- /dev/null +++ b/src/io/read_exact.rs @@ -0,0 +1,77 @@ +use std::io::{self, Read}; +use std::mem; + +use futures::{Poll, Future}; + +/// A future which can be used to easily read the entire contents of a stream +/// into a vector. +/// +/// Created by the `read_exact` function. +pub struct ReadExact<A, T> { + state: State<A, T>, +} + +enum State<A, T> { + Reading { + a: A, + buf: T, + pos: usize, + }, + Empty, +} + +/// Creates a future which will read exactly enough bytes to fill `buf`, +/// returning an error if EOF is hit sooner. +/// +/// The returned future will resolve to both the I/O stream as well as the +/// buffer once the read operation is completed. +/// +/// In the case of an error the buffer and the object will be discarded, with +/// the error yielded. In the case of success the object will be destroyed and +/// the buffer will be returned, with all data read from the stream appended to +/// the buffer. +pub fn read_exact<A, T>(a: A, buf: T) -> ReadExact<A, T> + where A: Read, + T: AsMut<[u8]>, +{ + ReadExact { + state: State::Reading { + a: a, + buf: buf, + pos: 0, + }, + } +} + +fn eof() -> io::Error { + io::Error::new(io::ErrorKind::UnexpectedEof, "early eof") +} + +impl<A, T> Future for ReadExact<A, T> + where A: Read, + T: AsMut<[u8]>, +{ + type Item = (A, T); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(A, T), io::Error> { + match self.state { + State::Reading { ref mut a, ref mut buf, ref mut pos } => { + let buf = buf.as_mut(); + while *pos < buf.len() { + let n = try_nb!(a.read(&mut buf[*pos..])); + *pos += n; + if n == 0 { + return Poll::Err(eof()) + } + } + } + State::Empty => panic!("poll a WriteAll after it's done"), + } + + match mem::replace(&mut self.state, State::Empty) { + State::Reading { a, buf, .. } => Poll::Ok((a, buf)), + State::Empty => panic!(), + } + } +} diff --git a/src/io/read_to_end.rs b/src/io/read_to_end.rs new file mode 100644 index 00000000..80e7cd6c --- /dev/null +++ b/src/io/read_to_end.rs @@ -0,0 +1,62 @@ +use std::io::{self, Read}; +use std::mem; + +use futures::{Poll, Future}; + +/// A future which can be used to easily read the entire contents of a stream +/// into a vector. +/// +/// Created by the `read_to_end` function. +pub struct ReadToEnd<A> { + state: State<A>, +} + +enum State<A> { + Reading { + a: A, + buf: Vec<u8>, + }, + Empty, +} + +/// Creates a future which will read all the bytes associated with the I/O +/// object `A` into the buffer provided. +/// +/// In the case of an error the buffer and the object will be discarded, with +/// the error yielded. In the case of success the object will be destroyed and +/// the buffer will be returned, with all data read from the stream appended to +/// the buffer. +pub fn read_to_end<A>(a: A, buf: Vec<u8>) -> ReadToEnd<A> + where A: Read, +{ + ReadToEnd { + state: State::Reading { + a: a, + buf: buf, + } + } +} + +impl<A> Future for ReadToEnd<A> + where A: Read, +{ + type Item = (A, Vec<u8>); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(A, Vec<u8>), io::Error> { + match self.state { + State::Reading { ref mut a, ref mut buf } => { + // If we get `Ok`, then we know the stream hit EOF and we're done. If we + // hit "would block" then all the read data so far is in our buffer, and + // otherwise we propagate errors + try_nb!(a.read_to_end(buf)); + }, + State::Empty => panic!("poll ReadToEnd after it's done"), + } + + match mem::replace(&mut self.state, State::Empty) { + State::Reading { a, buf } => Poll::Ok((a, buf)), + State::Empty => unreachable!(), + } + } +} diff --git a/src/io/task.rs b/src/io/task.rs new file mode 100644 index 00000000..3c87a32e --- /dev/null +++ b/src/io/task.rs @@ -0,0 +1,102 @@ +use std::cell::RefCell; +use std::io::{self, Read, Write}; + +use futures::task::TaskData; + +/// Abstraction that allows inserting an I/O object into task-local storage, +/// returning a handle that can be split. +/// +/// A `TaskIo<T>` handle implements the `ReadTask` and `WriteTask` and will only +/// work with the same task that the associated object was inserted into. The +/// handle may then be optionally `split` into the read/write halves so they can +/// be worked with independently. +/// +/// Note that it is important that the future returned from `TaskIo::new`, when +/// polled, will pin the yielded `TaskIo<T>` object to that specific task. Any +/// attempt to read or write the object on other tasks will result in a panic. +pub struct TaskIo<T> { + handle: TaskData<RefCell<T>>, +} + +/// The readable half of a `TaskIo<T>` instance returned from `TaskIo::split`. +/// +/// This handle implements the `ReadTask` trait and can be used to split up an +/// I/O object into two distinct halves. +pub struct TaskIoRead<T> { + handle: TaskData<RefCell<T>>, +} + +/// The writable half of a `TaskIo<T>` instance returned from `TaskIo::split`. +/// +/// This handle implements the `WriteTask` trait and can be used to split up an +/// I/O object into two distinct halves. +pub struct TaskIoWrite<T> { + handle: TaskData<RefCell<T>>, +} + +impl<T> TaskIo<T> { + /// Returns a new future which represents the insertion of the I/O object + /// `T` into task local storage, returning a `TaskIo<T>` handle to it. + /// + /// The returned future will never resolve to an error. + pub fn new(t: T) -> TaskIo<T> { + TaskIo { + handle: TaskData::new(RefCell::new(t)), + } + } +} + +impl<T> TaskIo<T> + where T: Read + Write, +{ + /// For an I/O object which is both readable and writable, this method can + /// be used to split the handle into two independently owned halves. + /// + /// The returned pair implements the `ReadTask` and `WriteTask` traits, + /// respectively, and can be used to pass around the object to different + /// combinators if necessary. + pub fn split(self) -> (TaskIoRead<T>, TaskIoWrite<T>) { + (TaskIoRead { handle: self.handle.clone() }, + TaskIoWrite { handle: self.handle }) + } +} + +impl<T> Read for TaskIo<T> + where T: io::Read, +{ + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.handle.with(|t| t.borrow_mut().read(buf)) + } +} + +impl<T> Write for TaskIo<T> + where T: io::Write, +{ + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.handle.with(|t| t.borrow_mut().write(buf)) + } + + fn flush(&mut self) -> io::Result<()> { + self.handle.with(|t| t.borrow_mut().flush()) + } +} + +impl<T> Read for TaskIoRead<T> + where T: io::Read, +{ + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.handle.with(|t| t.borrow_mut().read(buf)) + } +} + +impl<T> Write for TaskIoWrite<T> + where T: io::Write, +{ + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.handle.with(|t| t.borrow_mut().write(buf)) + } + + fn flush(&mut self) -> io::Result<()> { + self.handle.with(|t| t.borrow_mut().flush()) + } +} diff --git a/src/io/window.rs b/src/io/window.rs new file mode 100644 index 00000000..c6136099 --- /dev/null +++ b/src/io/window.rs @@ -0,0 +1,116 @@ +use std::ops; + +/// A owned window around an underlying buffer. +/// +/// Normally slices work great for considering sub-portions of a buffer, but +/// unfortunately a slice is a *borrowed* type in Rust which has an associated +/// lifetime. When working with future and async I/O these lifetimes are not +/// always appropriate, and are sometimes difficult to store in tasks. This +/// type strives to fill this gap by providing an "owned slice" around an +/// underlying buffer of bytes. +/// +/// A `Window<T>` wraps an underlying buffer, `T`, and has configurable +/// start/end indexes to alter the behavior of the `AsRef<[u8]>` implementation +/// that this type carries. +/// +/// This type can be particularly useful when working with the `write_all` +/// combinator in this crate. Data can be sliced via `Window`, consumed by +/// `write_all`, and then earned back once the write operation finishes through +/// the `into_inner` method on this type. +pub struct Window<T> { + inner: T, + range: ops::Range<usize>, +} + +impl<T: AsRef<[u8]>> Window<T> { + /// Creates a new window around the buffer `t` defaulting to the entire + /// slice. + /// + /// Further methods can be called on the returned `Window<T>` to alter the + /// window into the data provided. + pub fn new(t: T) -> Window<T> { + Window { + range: 0..t.as_ref().len(), + inner: t, + } + } + + /// Gets a shared reference to the underlying buffer inside of this + /// `Window`. + pub fn get_ref(&self) -> &T { + &self.inner + } + + /// Gets a mutable reference to the underlying buffer inside of this + /// `Window`. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner + } + + /// Consumes this `Window`, returning the underlying buffer. + pub fn into_inner(self) -> T { + self.inner + } + + /// Returns the starting index of this window into the underlying buffer + /// `T`. + pub fn start(&self) -> usize { + self.range.start + } + + /// Returns the end index of this window into the underlying buffer + /// `T`. + pub fn end(&self) -> usize { + self.range.end + } + + /// Changes the starting index of this window to the index specified. + /// + /// Returns the windows back to chain multiple calls to this method. + /// + /// # Panics + /// + /// This method will panic if `start` is out of bounds for the underlying + /// slice or if it comes after the `end` configured in this window. + pub fn set_start(&mut self, start: usize) -> &mut Window<T> { + assert!(start < self.inner.as_ref().len()); + assert!(start <= self.range.end); + self.range.start = start; + self + } + + /// Changes the end index of this window to the index specified. + /// + /// Returns the windows back to chain multiple calls to this method. + /// + /// # Panics + /// + /// This method will panic if `end` is out of bounds for the underlying + /// slice or if it comes after the `end` configured in this window. + pub fn set_end(&mut self, end: usize) -> &mut Window<T> { + assert!(end < self.inner.as_ref().len()); + assert!(self.range.start <= end); + self.range.end = end; + self + } + + // TODO: how about a generic set() method along the lines of: + // + // buffer.set(..3) + // .set(0..2) + // .set(4..) + // + // etc. +} + +impl<T: AsRef<[u8]>> AsRef<[u8]> for Window<T> { + fn as_ref(&self) -> &[u8] { + &self.inner.as_ref()[self.range.start..self.range.end] + } +} + +impl<T: AsMut<[u8]>> AsMut<[u8]> for Window<T> { + fn as_mut(&mut self) -> &mut [u8] { + &mut self.inner.as_mut()[self.range.start..self.range.end] + } +} diff --git a/src/io/write_all.rs b/src/io/write_all.rs new file mode 100644 index 00000000..df4751df --- /dev/null +++ b/src/io/write_all.rs @@ -0,0 +1,80 @@ +use std::io::{self, Write}; +use std::mem; + +use futures::{Poll, Future}; + +/// A future used to write the entire contents of some data to a stream. +/// +/// This is created by the `write_all` top-level method. +pub struct WriteAll<A, T> { + state: State<A, T>, +} + +enum State<A, T> { + Writing { + a: A, + buf: T, + pos: usize, + }, + Empty, +} + +/// Creates a future that will write the entire contents of the buffer `buf` to +/// the stream `a` provided. +/// +/// The returned future will not return until all the data has been written, and +/// the future will resolve to the stream as well as the buffer (for reuse if +/// needed). +/// +/// Any error which happens during writing will cause both the stream and the +/// buffer to get destroyed. +/// +/// The `buf` parameter here only requires the `AsRef<[u8]>` trait, which should +/// be broadly applicable to accepting data which can be converted to a slice. +/// The `Window` struct is also available in this crate to provide a different +/// window into a slice if necessary. +pub fn write_all<A, T>(a: A, buf: T) -> WriteAll<A, T> + where A: Write, + T: AsRef<[u8]>, +{ + WriteAll { + state: State::Writing { + a: a, + buf: buf, + pos: 0, + }, + } +} + +fn zero_write() -> io::Error { + io::Error::new(io::ErrorKind::WriteZero, "zero-length write") +} + +impl<A, T> Future for WriteAll<A, T> + where A: Write, + T: AsRef<[u8]>, +{ + type Item = (A, T); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(A, T), io::Error> { + match self.state { + State::Writing { ref mut a, ref buf, ref mut pos } => { + let buf = buf.as_ref(); + while *pos < buf.len() { + let n = try_nb!(a.write(&buf[*pos..])); + *pos += n; + if n == 0 { + return Poll::Err(zero_write()) + } + } + } + State::Empty => panic!("poll a WriteAll after it's done"), + } + + match mem::replace(&mut self.state, State::Empty) { + State::Writing { a, buf, .. } => Poll::Ok((a, buf)), + State::Empty => panic!(), + } + } +} @@ -6,7 +6,6 @@ #![deny(missing_docs)] extern crate futures; -extern crate futures_io; extern crate mio; extern crate slab; @@ -16,9 +15,7 @@ extern crate scoped_tls; #[macro_use] extern crate log; -#[path = "../../src/slot.rs"] mod slot; -#[path = "../../src/lock.rs"] mod lock; mod channel; @@ -30,6 +27,8 @@ mod timeout; mod timer_wheel; mod udp; +pub mod io; + pub use channel::{Sender, Receiver}; pub use event_loop::{Loop, LoopPin, LoopHandle, AddSource, AddTimeout}; pub use event_loop::{LoopData, AddLoopData, TimeoutToken, IoToken}; diff --git a/src/lock.rs b/src/lock.rs new file mode 100644 index 00000000..e5bc6b2b --- /dev/null +++ b/src/lock.rs @@ -0,0 +1,106 @@ +//! A "mutex" which only supports try_lock +//! +//! As a futures library the eventual call to an event loop should be the only +//! thing that ever blocks, so this is assisted with a fast user-space +//! implementation of a lock that can only have a `try_lock` operation. + +extern crate core; + +use self::core::cell::UnsafeCell; +use self::core::ops::{Deref, DerefMut}; +use self::core::sync::atomic::Ordering::{Acquire, Release}; +use self::core::sync::atomic::AtomicBool; + +/// A "mutex" around a value, similar to `std::sync::Mutex<T>`. +/// +/// This lock only supports the `try_lock` operation, however, and does not +/// implement poisoning. +pub struct Lock<T> { + locked: AtomicBool, + data: UnsafeCell<T>, +} + +/// Sentinel representing an acquired lock through which the data can be +/// accessed. +pub struct TryLock<'a, T: 'a> { + __ptr: &'a Lock<T>, +} + +// The `Lock` structure is basically just a `Mutex<T>`, and these two impls are +// intended to mirror the standard library's corresponding impls for `Mutex<T>`. +// +// If a `T` is sendable across threads, so is the lock, and `T` must be sendable +// across threads to be `Sync` because it allows mutable access from multiple +// threads. +unsafe impl<T: Send> Send for Lock<T> {} +unsafe impl<T: Send> Sync for Lock<T> {} + +impl<T> Lock<T> { + /// Creates a new lock around the given value. + pub fn new(t: T) -> Lock<T> { + Lock { + locked: AtomicBool::new(false), + data: UnsafeCell::new(t), + } + } + + /// Attempts to acquire this lock, returning whether the lock was acquired or + /// not. + /// + /// If `Some` is returned then the data this lock protects can be accessed + /// through the sentinel. This sentinel allows both mutable and immutable + /// access. + /// + /// If `None` is returned then the lock is already locked, either elsewhere + /// on this thread or on another thread. + pub fn try_lock(&self) -> Option<TryLock<T>> { + if !self.locked.swap(true, Acquire) { + Some(TryLock { __ptr: self }) + } else { + None + } + } +} + +impl<'a, T> Deref for TryLock<'a, T> { + type Target = T; + fn deref(&self) -> &T { + // The existence of `TryLock` represents that we own the lock, so we + // can safely access the data here. + unsafe { &*self.__ptr.data.get() } + } +} + +impl<'a, T> DerefMut for TryLock<'a, T> { + fn deref_mut(&mut self) -> &mut T { + // The existence of `TryLock` represents that we own the lock, so we + // can safely access the data here. + // + // Additionally, we're the *only* `TryLock` in existence so mutable + // access should be ok. + unsafe { &mut *self.__ptr.data.get() } + } +} + +impl<'a, T> Drop for TryLock<'a, T> { + fn drop(&mut self) { + self.__ptr.locked.store(false, Release); + } +} + +#[cfg(test)] +mod tests { + use super::Lock; + + #[test] + fn smoke() { + let a = Lock::new(1); + let mut a1 = a.try_lock().unwrap(); + assert!(a.try_lock().is_none()); + assert_eq!(*a1, 1); + *a1 = 2; + drop(a1); + assert_eq!(*a.try_lock().unwrap(), 2); + assert_eq!(*a.try_lock().unwrap(), 2); + } +} diff --git a/src/slot.rs b/src/slot.rs new file mode 100644 index 00000000..d802c987 --- /dev/null +++ b/src/slot.rs @@ -0,0 +1,691 @@ +//! A slot in memory for communicating between a producer and a consumer. +//! +//! This module contains an implementation detail of this library for a type +//! which is only intended to be shared between one consumer and one producer of +//! a value. It is unlikely that this module will survive stabilization of this +//! library, so it is not recommended to rely on it. + +#![allow(dead_code)] // imported in a few places + +use std::prelude::v1::*; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use lock::Lock; + +/// A slot in memory intended to represent the communication channel between one +/// producer and one consumer. +/// +/// Each slot contains space for a piece of data of type `T`, and can have +/// callbacks registered to run when the slot is either full or empty. +/// +/// Slots are only intended to be shared between exactly one producer and +/// exactly one consumer. If there are multiple concurrent producers or +/// consumers then this is still memory safe but will have unpredictable results +/// (and maybe panics). Note that this does not require that the "consumer" is +/// the same for the entire lifetime of a slot, simply that there is only one +/// consumer at a time. +/// +/// # Registering callbacks +/// +/// [`on_empty`](#method.on_empty) registers a callback to run when the slot +/// becomes empty, and [`on_full`](#method.on_full) registers one to run when it +/// becomes full. In both cases, the callback will run immediately if possible. +/// +/// At most one callback can be registered at any given time: it is an error to +/// attempt to register a callback with `on_full` if one is currently registered +/// via `on_empty`, or any other combination. +/// +/// # Cancellation +/// +/// Registering a callback returns a `Token` which can be used to +/// [`cancel`](#method.cancel) the callback. Only callbacks that have not yet +/// started running can be canceled. Canceling a callback that has already run +/// is not an error, and `cancel` does not signal whether or not the callback +/// was actually canceled to the caller. +pub struct Slot<T> { + // The purpose of this data type is to communicate when a value becomes + // available and coordinate between a producer and consumer about that + // value. Slots end up being at the core of many futures as they handle + // values transferring between producers and consumers, which means that + // they can never block. + // + // As a result, this `Slot` is a lock-free implementation in terms of not + // actually blocking at any point in time. The `Lock` types are + // half-optional and half-not-optional. They aren't actually mutexes as they + // only support a `try_lock` operation, and all the methods below ensure + // that progress can always be made without |