summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2016-08-26 14:30:46 -0700
committerAlex Crichton <alex@alexcrichton.com>2016-08-26 14:39:47 -0700
commitf107c8d860137b41e509b179d605db30082cb0da (patch)
tree063e7b0c86785f70a63458e17183602e58d9c7f3 /src
parente71d509fee767d6b796ba18a5501f80f0fb4babc (diff)
Rename to tokio-core, add in futures-io
Renames the futures-mio crate to tokio-core, pulls in the futures-io crate under an `io` module, and gets everything compiling.
Diffstat (limited to 'src')
-rw-r--r--src/bin/echo.rs7
-rw-r--r--src/bin/sink.rs11
-rw-r--r--src/channel.rs2
-rw-r--r--src/io/copy.rs82
-rw-r--r--src/io/flush.rs39
-rw-r--r--src/io/mod.rs47
-rw-r--r--src/io/read_exact.rs77
-rw-r--r--src/io/read_to_end.rs62
-rw-r--r--src/io/task.rs102
-rw-r--r--src/io/window.rs116
-rw-r--r--src/io/write_all.rs80
-rw-r--r--src/lib.rs5
-rw-r--r--src/lock.rs106
-rw-r--r--src/slot.rs691
-rw-r--r--src/tcp.rs2
-rw-r--r--src/timeout.rs2
-rw-r--r--src/udp.rs2
17 files changed, 1416 insertions, 17 deletions
diff --git a/src/bin/echo.rs b/src/bin/echo.rs
index ccd94752..710db79d 100644
--- a/src/bin/echo.rs
+++ b/src/bin/echo.rs
@@ -1,22 +1,21 @@
//! An echo server that just writes back everything that's written to it.
extern crate futures;
-extern crate futures_io;
-extern crate futures_mio;
+extern crate tokio_core;
use std::env;
use std::net::SocketAddr;
use futures::Future;
use futures::stream::Stream;
-use futures_io::{copy, TaskIo};
+use tokio_core::io::{copy, TaskIo};
fn main() {
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
// Create the event loop that will drive this server
- let mut l = futures_mio::Loop::new().unwrap();
+ let mut l = tokio_core::Loop::new().unwrap();
// Create a TCP listener which will listen for incoming connections
let server = l.handle().tcp_listen(&addr);
diff --git a/src/bin/sink.rs b/src/bin/sink.rs
index 49feed35..baf96b8e 100644
--- a/src/bin/sink.rs
+++ b/src/bin/sink.rs
@@ -5,8 +5,7 @@
#[macro_use]
extern crate futures;
-extern crate futures_io;
-extern crate futures_mio;
+extern crate tokio_core;
use std::env;
use std::iter;
@@ -14,13 +13,13 @@ use std::net::SocketAddr;
use futures::Future;
use futures::stream::{self, Stream};
-use futures_io::IoFuture;
+use tokio_core::io::IoFuture;
fn main() {
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
- let mut l = futures_mio::Loop::new().unwrap();
+ let mut l = tokio_core::Loop::new().unwrap();
let server = l.handle().tcp_listen(&addr).and_then(|socket| {
socket.incoming().and_then(|(socket, addr)| {
println!("got a socket: {}", addr);
@@ -34,10 +33,10 @@ fn main() {
l.run(server).unwrap();
}
-fn write(socket: futures_mio::TcpStream) -> IoFuture<()> {
+fn write(socket: tokio_core::TcpStream) -> IoFuture<()> {
static BUF: &'static [u8] = &[0; 64 * 1024];
let iter = iter::repeat(()).map(|()| Ok(()));
stream::iter(iter).fold(socket, |socket, ()| {
- futures_io::write_all(socket, BUF).map(|(socket, _)| socket)
+ tokio_core::io::write_all(socket, BUF).map(|(socket, _)| socket)
}).map(|_| ()).boxed()
}
diff --git a/src/channel.rs b/src/channel.rs
index cd5fef46..3da44b53 100644
--- a/src/channel.rs
+++ b/src/channel.rs
@@ -3,10 +3,10 @@ use std::sync::mpsc::TryRecvError;
use futures::{Future, Poll};
use futures::stream::Stream;
-use futures_io::IoFuture;
use mio::channel;
use {ReadinessStream, LoopHandle};
+use io::IoFuture;
/// The transmission half of a channel used for sending messages to a receiver.
///
diff --git a/src/io/copy.rs b/src/io/copy.rs
new file mode 100644
index 00000000..042e819b
--- /dev/null
+++ b/src/io/copy.rs
@@ -0,0 +1,82 @@
+use std::io::{self, Read, Write};
+
+use futures::{Future, Poll};
+
+/// A future which will copy all data from a reader into a writer.
+///
+/// Created by the `copy` function, this future will resolve to the number of
+/// bytes copied or an error if one happens.
+pub struct Copy<R, W> {
+ reader: R,
+ read_done: bool,
+ writer: W,
+ pos: usize,
+ cap: usize,
+ amt: u64,
+ buf: Box<[u8]>,
+}
+
+/// Creates a future which represents copying all the bytes from one object to
+/// another.
+///
+/// The returned future will copy all the bytes read from `reader` into the
+/// `writer` specified. This future will only complete once the `reader` has hit
+/// EOF and all bytes have been written to and flushed from the `writer`
+/// provided.
+///
+/// On success the number of bytes is returned and the `reader` and `writer` are
+/// consumed. On error the error is returned and the I/O objects are consumed as
+/// well.
+pub fn copy<R, W>(reader: R, writer: W) -> Copy<R, W>
+ where R: Read,
+ W: Write,
+{
+ Copy {
+ reader: reader,
+ read_done: false,
+ writer: writer,
+ amt: 0,
+ pos: 0,
+ cap: 0,
+ buf: Box::new([0; 2048]),
+ }
+}
+
+impl<R, W> Future for Copy<R, W>
+ where R: Read,
+ W: Write,
+{
+ type Item = u64;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<u64, io::Error> {
+ loop {
+ // If our buffer is empty, then we need to read some data to
+ // continue.
+ if self.pos == self.cap && !self.read_done {
+ let n = try_nb!(self.reader.read(&mut self.buf));
+ if n == 0 {
+ self.read_done = true;
+ } else {
+ self.pos = 0;
+ self.cap = n;
+ }
+ }
+
+ // If our buffer has some data, let's write it out!
+ while self.pos < self.cap {
+ let i = try_nb!(self.writer.write(&self.buf[self.pos..self.cap]));
+ self.pos += i;
+ self.amt += i as u64;
+ }
+
+ // If we've written al the data and we've seen EOF, flush out the
+ // data and finish the transfer.
+ // done with the entire transfer.
+ if self.pos == self.cap && self.read_done {
+ try_nb!(self.writer.flush());
+ return Poll::Ok(self.amt)
+ }
+ }
+ }
+}
diff --git a/src/io/flush.rs b/src/io/flush.rs
new file mode 100644
index 00000000..159a178b
--- /dev/null
+++ b/src/io/flush.rs
@@ -0,0 +1,39 @@
+use std::io::{self, Write};
+
+use futures::{Poll, Future};
+
+/// A future used to fully flush an I/O object.
+///
+/// Resolves to the underlying I/O object once the flush operation is complete.
+///
+/// Created by the `flush` function.
+pub struct Flush<A> {
+ a: Option<A>,
+}
+
+/// Creates a future which will entirely flush an I/O object and then yield the
+/// object itself.
+///
+/// This function will consume the object provided if an error happens, and
+/// otherwise it will repeatedly call `flush` until it sees `Ok(())`, scheduling
+/// a retry if `WouldBlock` is seen along the way.
+pub fn flush<A>(a: A) -> Flush<A>
+ where A: Write,
+{
+ Flush {
+ a: Some(a),
+ }
+}
+
+impl<A> Future for Flush<A>
+ where A: Write,
+{
+ type Item = A;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<A, io::Error> {
+ try_nb!(self.a.as_mut().unwrap().flush());
+ Poll::Ok(self.a.take().unwrap())
+ }
+}
+
diff --git a/src/io/mod.rs b/src/io/mod.rs
new file mode 100644
index 00000000..41a73978
--- /dev/null
+++ b/src/io/mod.rs
@@ -0,0 +1,47 @@
+//! I/O conveniences when working with primitives in `tokio-core`
+//!
+//! Contains various combinators to work with I/O objects and type definitions
+//! as well.
+
+use std::io;
+
+use futures::BoxFuture;
+use futures::stream::BoxStream;
+
+/// A convenience typedef around a `Future` whose error component is `io::Error`
+pub type IoFuture<T> = BoxFuture<T, io::Error>;
+
+/// A convenience typedef around a `Stream` whose error component is `io::Error`
+pub type IoStream<T> = BoxStream<T, io::Error>;
+
+/// A convenience macro for working with `io::Result<T>` from the `Read` and
+/// `Write` traits.
+///
+/// This macro takes `io::Result<T>` as input, and returns `T` as the output. If
+/// the input type is of the `Err` variant, then `Poll::NotReady` is returned if
+/// it indicates `WouldBlock` or otherwise `Err` is returned.
+#[macro_export]
+macro_rules! try_nb {
+ ($e:expr) => (match $e {
+ Ok(t) => t,
+ Err(ref e) if e.kind() == ::std::io::ErrorKind::WouldBlock => {
+ return ::futures::Poll::NotReady
+ }
+ Err(e) => return ::futures::Poll::Err(e.into()),
+ })
+}
+
+mod copy;
+mod flush;
+mod read_exact;
+mod read_to_end;
+mod task;
+mod window;
+mod write_all;
+pub use self::copy::{copy, Copy};
+pub use self::flush::{flush, Flush};
+pub use self::read_exact::{read_exact, ReadExact};
+pub use self::read_to_end::{read_to_end, ReadToEnd};
+pub use self::task::{TaskIo, TaskIoRead, TaskIoWrite};
+pub use self::window::Window;
+pub use self::write_all::{write_all, WriteAll};
diff --git a/src/io/read_exact.rs b/src/io/read_exact.rs
new file mode 100644
index 00000000..b251bfea
--- /dev/null
+++ b/src/io/read_exact.rs
@@ -0,0 +1,77 @@
+use std::io::{self, Read};
+use std::mem;
+
+use futures::{Poll, Future};
+
+/// A future which can be used to easily read the entire contents of a stream
+/// into a vector.
+///
+/// Created by the `read_exact` function.
+pub struct ReadExact<A, T> {
+ state: State<A, T>,
+}
+
+enum State<A, T> {
+ Reading {
+ a: A,
+ buf: T,
+ pos: usize,
+ },
+ Empty,
+}
+
+/// Creates a future which will read exactly enough bytes to fill `buf`,
+/// returning an error if EOF is hit sooner.
+///
+/// The returned future will resolve to both the I/O stream as well as the
+/// buffer once the read operation is completed.
+///
+/// In the case of an error the buffer and the object will be discarded, with
+/// the error yielded. In the case of success the object will be destroyed and
+/// the buffer will be returned, with all data read from the stream appended to
+/// the buffer.
+pub fn read_exact<A, T>(a: A, buf: T) -> ReadExact<A, T>
+ where A: Read,
+ T: AsMut<[u8]>,
+{
+ ReadExact {
+ state: State::Reading {
+ a: a,
+ buf: buf,
+ pos: 0,
+ },
+ }
+}
+
+fn eof() -> io::Error {
+ io::Error::new(io::ErrorKind::UnexpectedEof, "early eof")
+}
+
+impl<A, T> Future for ReadExact<A, T>
+ where A: Read,
+ T: AsMut<[u8]>,
+{
+ type Item = (A, T);
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(A, T), io::Error> {
+ match self.state {
+ State::Reading { ref mut a, ref mut buf, ref mut pos } => {
+ let buf = buf.as_mut();
+ while *pos < buf.len() {
+ let n = try_nb!(a.read(&mut buf[*pos..]));
+ *pos += n;
+ if n == 0 {
+ return Poll::Err(eof())
+ }
+ }
+ }
+ State::Empty => panic!("poll a WriteAll after it's done"),
+ }
+
+ match mem::replace(&mut self.state, State::Empty) {
+ State::Reading { a, buf, .. } => Poll::Ok((a, buf)),
+ State::Empty => panic!(),
+ }
+ }
+}
diff --git a/src/io/read_to_end.rs b/src/io/read_to_end.rs
new file mode 100644
index 00000000..80e7cd6c
--- /dev/null
+++ b/src/io/read_to_end.rs
@@ -0,0 +1,62 @@
+use std::io::{self, Read};
+use std::mem;
+
+use futures::{Poll, Future};
+
+/// A future which can be used to easily read the entire contents of a stream
+/// into a vector.
+///
+/// Created by the `read_to_end` function.
+pub struct ReadToEnd<A> {
+ state: State<A>,
+}
+
+enum State<A> {
+ Reading {
+ a: A,
+ buf: Vec<u8>,
+ },
+ Empty,
+}
+
+/// Creates a future which will read all the bytes associated with the I/O
+/// object `A` into the buffer provided.
+///
+/// In the case of an error the buffer and the object will be discarded, with
+/// the error yielded. In the case of success the object will be destroyed and
+/// the buffer will be returned, with all data read from the stream appended to
+/// the buffer.
+pub fn read_to_end<A>(a: A, buf: Vec<u8>) -> ReadToEnd<A>
+ where A: Read,
+{
+ ReadToEnd {
+ state: State::Reading {
+ a: a,
+ buf: buf,
+ }
+ }
+}
+
+impl<A> Future for ReadToEnd<A>
+ where A: Read,
+{
+ type Item = (A, Vec<u8>);
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(A, Vec<u8>), io::Error> {
+ match self.state {
+ State::Reading { ref mut a, ref mut buf } => {
+ // If we get `Ok`, then we know the stream hit EOF and we're done. If we
+ // hit "would block" then all the read data so far is in our buffer, and
+ // otherwise we propagate errors
+ try_nb!(a.read_to_end(buf));
+ },
+ State::Empty => panic!("poll ReadToEnd after it's done"),
+ }
+
+ match mem::replace(&mut self.state, State::Empty) {
+ State::Reading { a, buf } => Poll::Ok((a, buf)),
+ State::Empty => unreachable!(),
+ }
+ }
+}
diff --git a/src/io/task.rs b/src/io/task.rs
new file mode 100644
index 00000000..3c87a32e
--- /dev/null
+++ b/src/io/task.rs
@@ -0,0 +1,102 @@
+use std::cell::RefCell;
+use std::io::{self, Read, Write};
+
+use futures::task::TaskData;
+
+/// Abstraction that allows inserting an I/O object into task-local storage,
+/// returning a handle that can be split.
+///
+/// A `TaskIo<T>` handle implements the `ReadTask` and `WriteTask` and will only
+/// work with the same task that the associated object was inserted into. The
+/// handle may then be optionally `split` into the read/write halves so they can
+/// be worked with independently.
+///
+/// Note that it is important that the future returned from `TaskIo::new`, when
+/// polled, will pin the yielded `TaskIo<T>` object to that specific task. Any
+/// attempt to read or write the object on other tasks will result in a panic.
+pub struct TaskIo<T> {
+ handle: TaskData<RefCell<T>>,
+}
+
+/// The readable half of a `TaskIo<T>` instance returned from `TaskIo::split`.
+///
+/// This handle implements the `ReadTask` trait and can be used to split up an
+/// I/O object into two distinct halves.
+pub struct TaskIoRead<T> {
+ handle: TaskData<RefCell<T>>,
+}
+
+/// The writable half of a `TaskIo<T>` instance returned from `TaskIo::split`.
+///
+/// This handle implements the `WriteTask` trait and can be used to split up an
+/// I/O object into two distinct halves.
+pub struct TaskIoWrite<T> {
+ handle: TaskData<RefCell<T>>,
+}
+
+impl<T> TaskIo<T> {
+ /// Returns a new future which represents the insertion of the I/O object
+ /// `T` into task local storage, returning a `TaskIo<T>` handle to it.
+ ///
+ /// The returned future will never resolve to an error.
+ pub fn new(t: T) -> TaskIo<T> {
+ TaskIo {
+ handle: TaskData::new(RefCell::new(t)),
+ }
+ }
+}
+
+impl<T> TaskIo<T>
+ where T: Read + Write,
+{
+ /// For an I/O object which is both readable and writable, this method can
+ /// be used to split the handle into two independently owned halves.
+ ///
+ /// The returned pair implements the `ReadTask` and `WriteTask` traits,
+ /// respectively, and can be used to pass around the object to different
+ /// combinators if necessary.
+ pub fn split(self) -> (TaskIoRead<T>, TaskIoWrite<T>) {
+ (TaskIoRead { handle: self.handle.clone() },
+ TaskIoWrite { handle: self.handle })
+ }
+}
+
+impl<T> Read for TaskIo<T>
+ where T: io::Read,
+{
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ self.handle.with(|t| t.borrow_mut().read(buf))
+ }
+}
+
+impl<T> Write for TaskIo<T>
+ where T: io::Write,
+{
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.handle.with(|t| t.borrow_mut().write(buf))
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ self.handle.with(|t| t.borrow_mut().flush())
+ }
+}
+
+impl<T> Read for TaskIoRead<T>
+ where T: io::Read,
+{
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ self.handle.with(|t| t.borrow_mut().read(buf))
+ }
+}
+
+impl<T> Write for TaskIoWrite<T>
+ where T: io::Write,
+{
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.handle.with(|t| t.borrow_mut().write(buf))
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ self.handle.with(|t| t.borrow_mut().flush())
+ }
+}
diff --git a/src/io/window.rs b/src/io/window.rs
new file mode 100644
index 00000000..c6136099
--- /dev/null
+++ b/src/io/window.rs
@@ -0,0 +1,116 @@
+use std::ops;
+
+/// A owned window around an underlying buffer.
+///
+/// Normally slices work great for considering sub-portions of a buffer, but
+/// unfortunately a slice is a *borrowed* type in Rust which has an associated
+/// lifetime. When working with future and async I/O these lifetimes are not
+/// always appropriate, and are sometimes difficult to store in tasks. This
+/// type strives to fill this gap by providing an "owned slice" around an
+/// underlying buffer of bytes.
+///
+/// A `Window<T>` wraps an underlying buffer, `T`, and has configurable
+/// start/end indexes to alter the behavior of the `AsRef<[u8]>` implementation
+/// that this type carries.
+///
+/// This type can be particularly useful when working with the `write_all`
+/// combinator in this crate. Data can be sliced via `Window`, consumed by
+/// `write_all`, and then earned back once the write operation finishes through
+/// the `into_inner` method on this type.
+pub struct Window<T> {
+ inner: T,
+ range: ops::Range<usize>,
+}
+
+impl<T: AsRef<[u8]>> Window<T> {
+ /// Creates a new window around the buffer `t` defaulting to the entire
+ /// slice.
+ ///
+ /// Further methods can be called on the returned `Window<T>` to alter the
+ /// window into the data provided.
+ pub fn new(t: T) -> Window<T> {
+ Window {
+ range: 0..t.as_ref().len(),
+ inner: t,
+ }
+ }
+
+ /// Gets a shared reference to the underlying buffer inside of this
+ /// `Window`.
+ pub fn get_ref(&self) -> &T {
+ &self.inner
+ }
+
+ /// Gets a mutable reference to the underlying buffer inside of this
+ /// `Window`.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner
+ }
+
+ /// Consumes this `Window`, returning the underlying buffer.
+ pub fn into_inner(self) -> T {
+ self.inner
+ }
+
+ /// Returns the starting index of this window into the underlying buffer
+ /// `T`.
+ pub fn start(&self) -> usize {
+ self.range.start
+ }
+
+ /// Returns the end index of this window into the underlying buffer
+ /// `T`.
+ pub fn end(&self) -> usize {
+ self.range.end
+ }
+
+ /// Changes the starting index of this window to the index specified.
+ ///
+ /// Returns the windows back to chain multiple calls to this method.
+ ///
+ /// # Panics
+ ///
+ /// This method will panic if `start` is out of bounds for the underlying
+ /// slice or if it comes after the `end` configured in this window.
+ pub fn set_start(&mut self, start: usize) -> &mut Window<T> {
+ assert!(start < self.inner.as_ref().len());
+ assert!(start <= self.range.end);
+ self.range.start = start;
+ self
+ }
+
+ /// Changes the end index of this window to the index specified.
+ ///
+ /// Returns the windows back to chain multiple calls to this method.
+ ///
+ /// # Panics
+ ///
+ /// This method will panic if `end` is out of bounds for the underlying
+ /// slice or if it comes after the `end` configured in this window.
+ pub fn set_end(&mut self, end: usize) -> &mut Window<T> {
+ assert!(end < self.inner.as_ref().len());
+ assert!(self.range.start <= end);
+ self.range.end = end;
+ self
+ }
+
+ // TODO: how about a generic set() method along the lines of:
+ //
+ // buffer.set(..3)
+ // .set(0..2)
+ // .set(4..)
+ //
+ // etc.
+}
+
+impl<T: AsRef<[u8]>> AsRef<[u8]> for Window<T> {
+ fn as_ref(&self) -> &[u8] {
+ &self.inner.as_ref()[self.range.start..self.range.end]
+ }
+}
+
+impl<T: AsMut<[u8]>> AsMut<[u8]> for Window<T> {
+ fn as_mut(&mut self) -> &mut [u8] {
+ &mut self.inner.as_mut()[self.range.start..self.range.end]
+ }
+}
diff --git a/src/io/write_all.rs b/src/io/write_all.rs
new file mode 100644
index 00000000..df4751df
--- /dev/null
+++ b/src/io/write_all.rs
@@ -0,0 +1,80 @@
+use std::io::{self, Write};
+use std::mem;
+
+use futures::{Poll, Future};
+
+/// A future used to write the entire contents of some data to a stream.
+///
+/// This is created by the `write_all` top-level method.
+pub struct WriteAll<A, T> {
+ state: State<A, T>,
+}
+
+enum State<A, T> {
+ Writing {
+ a: A,
+ buf: T,
+ pos: usize,
+ },
+ Empty,
+}
+
+/// Creates a future that will write the entire contents of the buffer `buf` to
+/// the stream `a` provided.
+///
+/// The returned future will not return until all the data has been written, and
+/// the future will resolve to the stream as well as the buffer (for reuse if
+/// needed).
+///
+/// Any error which happens during writing will cause both the stream and the
+/// buffer to get destroyed.
+///
+/// The `buf` parameter here only requires the `AsRef<[u8]>` trait, which should
+/// be broadly applicable to accepting data which can be converted to a slice.
+/// The `Window` struct is also available in this crate to provide a different
+/// window into a slice if necessary.
+pub fn write_all<A, T>(a: A, buf: T) -> WriteAll<A, T>
+ where A: Write,
+ T: AsRef<[u8]>,
+{
+ WriteAll {
+ state: State::Writing {
+ a: a,
+ buf: buf,
+ pos: 0,
+ },
+ }
+}
+
+fn zero_write() -> io::Error {
+ io::Error::new(io::ErrorKind::WriteZero, "zero-length write")
+}
+
+impl<A, T> Future for WriteAll<A, T>
+ where A: Write,
+ T: AsRef<[u8]>,
+{
+ type Item = (A, T);
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(A, T), io::Error> {
+ match self.state {
+ State::Writing { ref mut a, ref buf, ref mut pos } => {
+ let buf = buf.as_ref();
+ while *pos < buf.len() {
+ let n = try_nb!(a.write(&buf[*pos..]));
+ *pos += n;
+ if n == 0 {
+ return Poll::Err(zero_write())
+ }
+ }
+ }
+ State::Empty => panic!("poll a WriteAll after it's done"),
+ }
+
+ match mem::replace(&mut self.state, State::Empty) {
+ State::Writing { a, buf, .. } => Poll::Ok((a, buf)),
+ State::Empty => panic!(),
+ }
+ }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 449dc600..3250a0dc 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -6,7 +6,6 @@
#![deny(missing_docs)]
extern crate futures;
-extern crate futures_io;
extern crate mio;
extern crate slab;
@@ -16,9 +15,7 @@ extern crate scoped_tls;
#[macro_use]
extern crate log;
-#[path = "../../src/slot.rs"]
mod slot;
-#[path = "../../src/lock.rs"]
mod lock;
mod channel;
@@ -30,6 +27,8 @@ mod timeout;
mod timer_wheel;
mod udp;
+pub mod io;
+
pub use channel::{Sender, Receiver};
pub use event_loop::{Loop, LoopPin, LoopHandle, AddSource, AddTimeout};
pub use event_loop::{LoopData, AddLoopData, TimeoutToken, IoToken};
diff --git a/src/lock.rs b/src/lock.rs
new file mode 100644
index 00000000..e5bc6b2b
--- /dev/null
+++ b/src/lock.rs
@@ -0,0 +1,106 @@
+//! A "mutex" which only supports try_lock
+//!
+//! As a futures library the eventual call to an event loop should be the only
+//! thing that ever blocks, so this is assisted with a fast user-space
+//! implementation of a lock that can only have a `try_lock` operation.
+
+extern crate core;
+
+use self::core::cell::UnsafeCell;
+use self::core::ops::{Deref, DerefMut};
+use self::core::sync::atomic::Ordering::{Acquire, Release};
+use self::core::sync::atomic::AtomicBool;
+
+/// A "mutex" around a value, similar to `std::sync::Mutex<T>`.
+///
+/// This lock only supports the `try_lock` operation, however, and does not
+/// implement poisoning.
+pub struct Lock<T> {
+ locked: AtomicBool,
+ data: UnsafeCell<T>,
+}
+
+/// Sentinel representing an acquired lock through which the data can be
+/// accessed.
+pub struct TryLock<'a, T: 'a> {
+ __ptr: &'a Lock<T>,
+}
+
+// The `Lock` structure is basically just a `Mutex<T>`, and these two impls are
+// intended to mirror the standard library's corresponding impls for `Mutex<T>`.
+//
+// If a `T` is sendable across threads, so is the lock, and `T` must be sendable
+// across threads to be `Sync` because it allows mutable access from multiple
+// threads.
+unsafe impl<T: Send> Send for Lock<T> {}
+unsafe impl<T: Send> Sync for Lock<T> {}
+
+impl<T> Lock<T> {
+ /// Creates a new lock around the given value.
+ pub fn new(t: T) -> Lock<T> {
+ Lock {
+ locked: AtomicBool::new(false),
+ data: UnsafeCell::new(t),
+ }
+ }
+
+ /// Attempts to acquire this lock, returning whether the lock was acquired or
+ /// not.
+ ///
+ /// If `Some` is returned then the data this lock protects can be accessed
+ /// through the sentinel. This sentinel allows both mutable and immutable
+ /// access.
+ ///
+ /// If `None` is returned then the lock is already locked, either elsewhere
+ /// on this thread or on another thread.
+ pub fn try_lock(&self) -> Option<TryLock<T>> {
+ if !self.locked.swap(true, Acquire) {
+ Some(TryLock { __ptr: self })
+ } else {
+ None
+ }
+ }
+}
+
+impl<'a, T> Deref for TryLock<'a, T> {
+ type Target = T;
+ fn deref(&self) -> &T {
+ // The existence of `TryLock` represents that we own the lock, so we
+ // can safely access the data here.
+ unsafe { &*self.__ptr.data.get() }
+ }
+}
+
+impl<'a, T> DerefMut for TryLock<'a, T> {
+ fn deref_mut(&mut self) -> &mut T {
+ // The existence of `TryLock` represents that we own the lock, so we
+ // can safely access the data here.
+ //
+ // Additionally, we're the *only* `TryLock` in existence so mutable
+ // access should be ok.
+ unsafe { &mut *self.__ptr.data.get() }
+ }
+}
+
+impl<'a, T> Drop for TryLock<'a, T> {
+ fn drop(&mut self) {
+ self.__ptr.locked.store(false, Release);
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::Lock;
+
+ #[test]
+ fn smoke() {
+ let a = Lock::new(1);
+ let mut a1 = a.try_lock().unwrap();
+ assert!(a.try_lock().is_none());
+ assert_eq!(*a1, 1);
+ *a1 = 2;
+ drop(a1);
+ assert_eq!(*a.try_lock().unwrap(), 2);
+ assert_eq!(*a.try_lock().unwrap(), 2);
+ }
+}
diff --git a/src/slot.rs b/src/slot.rs
new file mode 100644
index 00000000..d802c987
--- /dev/null
+++ b/src/slot.rs
@@ -0,0 +1,691 @@
+//! A slot in memory for communicating between a producer and a consumer.
+//!
+//! This module contains an implementation detail of this library for a type
+//! which is only intended to be shared between one consumer and one producer of
+//! a value. It is unlikely that this module will survive stabilization of this
+//! library, so it is not recommended to rely on it.
+
+#![allow(dead_code)] // imported in a few places
+
+use std::prelude::v1::*;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+use lock::Lock;
+
+/// A slot in memory intended to represent the communication channel between one
+/// producer and one consumer.
+///
+/// Each slot contains space for a piece of data of type `T`, and can have
+/// callbacks registered to run when the slot is either full or empty.
+///
+/// Slots are only intended to be shared between exactly one producer and
+/// exactly one consumer. If there are multiple concurrent producers or
+/// consumers then this is still memory safe but will have unpredictable results
+/// (and maybe panics). Note that this does not require that the "consumer" is
+/// the same for the entire lifetime of a slot, simply that there is only one
+/// consumer at a time.
+///
+/// # Registering callbacks
+///
+/// [`on_empty`](#method.on_empty) registers a callback to run when the slot
+/// becomes empty, and [`on_full`](#method.on_full) registers one to run when it
+/// becomes full. In both cases, the callback will run immediately if possible.
+///
+/// At most one callback can be registered at any given time: it is an error to
+/// attempt to register a callback with `on_full` if one is currently registered
+/// via `on_empty`, or any other combination.
+///
+/// # Cancellation
+///
+/// Registering a callback returns a `Token` which can be used to
+/// [`cancel`](#method.cancel) the callback. Only callbacks that have not yet
+/// started running can be canceled. Canceling a callback that has already run
+/// is not an error, and `cancel` does not signal whether or not the callback
+/// was actually canceled to the caller.
+pub struct Slot<T> {
+ // The purpose of this data type is to communicate when a value becomes
+ // available and coordinate between a producer and consumer about that
+ // value. Slots end up being at the core of many futures as they handle
+ // values transferring between producers and consumers, which means that
+ // they can never block.
+ //
+ // As a result, this `Slot` is a lock-free implementation in terms of not
+ // actually blocking at any point in time. The `Lock` types are
+ // half-optional and half-not-optional. They aren't actually mutexes as they