diff options
author | Alex Crichton <alex@alexcrichton.com> | 2016-09-02 11:07:52 -0700 |
---|---|---|
committer | Alex Crichton <alex@alexcrichton.com> | 2016-09-07 22:12:14 -0700 |
commit | 6c045d31ac78df7b6b190935c2024c90b420da28 (patch) | |
tree | 645a3e9e28959ea09ccd00809f3a226cb842bacd | |
parent | 93c61bb384b8097a4897661eb877fc6a8440a02a (diff) |
Reorganize the entire crate:
Renamed APIs
* Loop => reactor::Core
* LoopHandle => reactor::Handle
* LoopPin => reactor::Pinned
* TcpStream => net::TcpStream
* TcpListener => net::TcpListener
* UdpSocket => net::UdpSocket
* Sender => channel::Sender
* Receiver => channel::Receiver
* Timeout => reactor::Timeout
* ReadinessStream => reactor::PollEvented
* All `LoopHandle` methods to construct objects are now free functions on the
associated types, e.g. `LoopHandle::tcp_listen` is now `TcpListener::bind`
* All APIs taking a `Handle` now take a `Handle` as the last argument
* All future-returning APIs now return concrete types instead of trait objects
Added APIs
* io::Io trait -- Read + Write + ability to poll
Removed without replacement:
* AddSource
* AddTimeout
* IoToken
* TimeoutToken
Closes #3
Closes #6
-rw-r--r-- | examples/echo.rs | 7 | ||||
-rw-r--r-- | examples/sink.rs | 8 | ||||
-rw-r--r-- | src/channel.rs | 72 | ||||
-rw-r--r-- | src/io/mod.rs | 75 | ||||
-rw-r--r-- | src/lib.rs | 114 | ||||
-rw-r--r-- | src/net/mod.rs | 11 | ||||
-rw-r--r-- | src/net/tcp.rs (renamed from src/tcp.rs) | 262 | ||||
-rw-r--r-- | src/net/udp.rs (renamed from src/udp.rs) | 52 | ||||
-rw-r--r-- | src/reactor/channel.rs (renamed from src/event_loop/channel.rs) | 0 | ||||
-rw-r--r-- | src/reactor/io_token.rs (renamed from src/event_loop/source.rs) | 76 | ||||
-rw-r--r-- | src/reactor/mod.rs (renamed from src/event_loop/mod.rs) | 97 | ||||
-rw-r--r-- | src/reactor/poll_evented.rs (renamed from src/readiness_stream.rs) | 193 | ||||
-rw-r--r-- | src/reactor/timeout.rs (renamed from src/timeout.rs) | 53 | ||||
-rw-r--r-- | src/reactor/timeout_token.rs (renamed from src/event_loop/timeout.rs) | 77 | ||||
-rw-r--r-- | tests/buffered.rs | 6 | ||||
-rw-r--r-- | tests/chain.rs | 6 | ||||
-rw-r--r-- | tests/echo.rs | 6 | ||||
-rw-r--r-- | tests/limit.rs | 6 | ||||
-rw-r--r-- | tests/poll.rs | 64 | ||||
-rw-r--r-- | tests/spawn.rs | 6 | ||||
-rw-r--r-- | tests/stream-buffered.rs | 6 | ||||
-rw-r--r-- | tests/tcp.rs | 22 | ||||
-rw-r--r-- | tests/timeout.rs | 5 | ||||
-rw-r--r-- | tests/udp.rs | 9 |
24 files changed, 784 insertions, 449 deletions
diff --git a/examples/echo.rs b/examples/echo.rs index 1116d683..4c8417c8 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -9,8 +9,9 @@ use std::net::SocketAddr; use futures::Future; use futures::stream::Stream; -use tokio_core::Loop; use tokio_core::io::{copy, TaskIo}; +use tokio_core::net::TcpListener; +use tokio_core::reactor::Core; fn main() { env_logger::init().unwrap(); @@ -18,11 +19,11 @@ fn main() { let addr = addr.parse::<SocketAddr>().unwrap(); // Create the event loop that will drive this server - let mut l = Loop::new().unwrap(); + let mut l = Core::new().unwrap(); let pin = l.pin(); // Create a TCP listener which will listen for incoming connections - let server = l.handle().tcp_listen(&addr); + let server = TcpListener::bind(&addr, &l.handle()); let done = server.and_then(move |socket| { // Once we've got the TCP listener, inform that we have it diff --git a/examples/sink.rs b/examples/sink.rs index 21ba71d2..c825274c 100644 --- a/examples/sink.rs +++ b/examples/sink.rs @@ -14,14 +14,16 @@ use std::net::SocketAddr; use futures::Future; use futures::stream::{self, Stream}; use tokio_core::io::IoFuture; +use tokio_core::net::{TcpListener, TcpStream}; +use tokio_core::reactor::Core; fn main() { env_logger::init().unwrap(); let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); let addr = addr.parse::<SocketAddr>().unwrap(); - let mut l = tokio_core::Loop::new().unwrap(); - let server = l.handle().tcp_listen(&addr).and_then(|socket| { + let mut l = Core::new().unwrap(); + let server = TcpListener::bind(&addr, &l.handle()).and_then(|socket| { socket.incoming().and_then(|(socket, addr)| { println!("got a socket: {}", addr); write(socket).or_else(|_| Ok(())) @@ -34,7 +36,7 @@ fn main() { l.run(server).unwrap(); } -fn write(socket: tokio_core::TcpStream) -> IoFuture<()> { +fn write(socket: TcpStream) -> IoFuture<()> { static BUF: &'static [u8] = &[0; 64 * 1024]; let iter = iter::repeat(()).map(|()| Ok(())); stream::iter(iter).fold(socket, |socket, ()| { diff --git a/src/channel.rs b/src/channel.rs index c0c03b5e..151abf95 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -1,3 +1,8 @@ +//! In-memory evented channels. +//! +//! This module contains a `Sender` and `Receiver` pair types which can be used +//! to send messages between different future tasks. + use std::io; use std::sync::mpsc::TryRecvError; @@ -5,15 +10,15 @@ use futures::{Future, Poll, Async}; use futures::stream::Stream; use mio::channel; -use {ReadinessStream, LoopHandle}; use io::IoFuture; +use reactor::{Handle, PollEvented}; /// The transmission half of a channel used for sending messages to a receiver. /// /// A `Sender` can be `clone`d to have multiple threads or instances sending /// messages to one receiver. /// -/// This type is created by the `LoopHandle::channel` method. +/// This type is created by the `channel` function. pub struct Sender<T> { tx: channel::Sender<T>, } @@ -24,32 +29,36 @@ pub struct Sender<T> { /// A `Receiver` cannot be cloned, so only one thread can receive messages at a /// time. /// -/// This type is created by the `LoopHandle::channel` method and implements the -/// `Stream` trait to represent received messages. +/// This type is created by the `channel` function and implements the `Stream` +/// trait to represent received messages. pub struct Receiver<T> { - rx: ReadinessStream<channel::Receiver<T>>, + rx: PollEvented<channel::Receiver<T>>, } -impl LoopHandle { - /// Creates a new in-memory channel used for sending data across `Send + - /// 'static` boundaries, frequently threads. - /// - /// This type can be used to conveniently send messages between futures. - /// Unlike the futures crate `channel` method and types, the returned tx/rx - /// pair is a multi-producer single-consumer (mpsc) channel *with no - /// backpressure*. Currently it's left up to the application to implement a - /// mechanism, if necessary, to avoid messages piling up. - /// - /// The returned `Sender` can be used to send messages that are processed by - /// the returned `Receiver`. The `Sender` can be cloned to send messages - /// from multiple sources simultaneously. - pub fn channel<T>(self) -> (Sender<T>, IoFuture<Receiver<T>>) - where T: Send + 'static, - { - let (tx, rx) = channel::channel(); - let rx = ReadinessStream::new(self, rx).map(|rx| Receiver { rx: rx }); - (Sender { tx: tx }, rx.boxed()) - } +/// Future returned by the `channel` function which will resolve to a +/// `Receiver<T>`. +pub struct ReceiverNew<T> { + inner: IoFuture<Receiver<T>>, +} + +/// Creates a new in-memory channel used for sending data across `Send + +/// 'static` boundaries, frequently threads. +/// +/// This type can be used to conveniently send messages between futures. +/// Unlike the futures crate `channel` method and types, the returned tx/rx +/// pair is a multi-producer single-consumer (mpsc) channel *with no +/// backpressure*. Currently it's left up to the application to implement a +/// mechanism, if necessary, to avoid messages piling up. +/// +/// The returned `Sender` can be used to send messages that are processed by +/// the returned `Receiver`. The `Sender` can be cloned to send messages +/// from multiple sources simultaneously. +pub fn channel<T>(handle: &Handle) -> (Sender<T>, ReceiverNew<T>) + where T: Send + 'static, +{ + let (tx, rx) = channel::channel(); + let rx = PollEvented::new(rx, handle).map(|rx| Receiver { rx: rx }); + (Sender { tx: tx }, ReceiverNew { inner: rx.boxed() }) } impl<T> Sender<T> { @@ -87,7 +96,9 @@ impl<T> Stream for Receiver<T> { type Error = io::Error; fn poll(&mut self) -> Poll<Option<T>, io::Error> { - try_ready!(self.rx.poll_read()); + if let Async::NotReady = self.rx.poll_read() { + return Ok(Async::NotReady) + } match self.rx.get_ref().try_recv() { Ok(t) => Ok(Async::Ready(Some(t))), Err(TryRecvError::Empty) => { @@ -98,3 +109,12 @@ impl<T> Stream for Receiver<T> { } } } + +impl<T> Future for ReceiverNew<T> { + type Item = Receiver<T>; + type Error = io::Error; + + fn poll(&mut self) -> Poll<Receiver<T>, io::Error> { + self.inner.poll() + } +} diff --git a/src/io/mod.rs b/src/io/mod.rs index 6eb97686..a9f3a8f2 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -3,9 +3,9 @@ //! Contains various combinators to work with I/O objects and type definitions //! as well. -use std::io; +use std::io::{self, Read, Write}; -use futures::BoxFuture; +use futures::{BoxFuture, Async}; use futures::stream::BoxStream; /// A convenience typedef around a `Future` whose error component is `io::Error` @@ -45,3 +45,74 @@ pub use self::read_to_end::{read_to_end, ReadToEnd}; pub use self::task::{TaskIo, TaskIoRead, TaskIoWrite}; pub use self::window::Window; pub use self::write_all::{write_all, WriteAll}; + +/// A trait for read/write I/O objects +/// +/// This trait represents I/O object which are readable and writable. +/// Additionally, they're associated with the ability to test whether they're +/// readable or writable. +/// +/// Imporantly, the methods of this trait are intended to be used in conjuction +/// with the current task of a future. Namely whenever any of them return a +/// value that indicates "would block" the current future's task is arranged to +/// receive a notification when the method would otherwise not indicate that it +/// would block. +pub trait Io: Read + Write { + /// Tests to see if this I/O object may be readable. + /// + /// This method returns an `Async<()>` indicating whether the object + /// **might** be readable. It is possible that even if this method returns + /// `Async::Ready` that a call to `read` would return a `WouldBlock` error. + /// + /// There is a default implementation for this function which always + /// indicates that an I/O object is readable, but objects which can + /// implement a finer grained version of this are recommended to do so. + /// + /// If this function returns `Async::NotReady` then the current future's + /// task is arranged to receive a notification when it might not return + /// `NotReady`. + /// + /// # Panics + /// + /// This method is likely to panic if called from outside the context of a + /// future's task. + fn poll_read(&mut self) -> Async<()> { + Async::Ready(()) + } + + /// Tests to see if this I/O object may be writable. + /// + /// This method returns an `Async<()>` indicating whether the object + /// **might** be writable. It is possible that even if this method returns + /// `Async::Ready` that a call to `write` would return a `WouldBlock` error. + /// + /// There is a default implementation for this function which always + /// indicates that an I/O object is writable, but objects which can + /// implement a finer grained version of this are recommended to do so. + /// + /// If this function returns `Async::NotReady` then the current future's + /// task is arranged to receive a notification when it might not return + /// `NotReady`. + /// + /// # Panics + /// + /// This method is likely to panic if called from outside the context of a + /// future's task. + fn poll_write(&mut self) -> Async<()> { + Async::Ready(()) + } + + /// Helper method for splitting this read/write object into two halves. + /// + /// The two halves returned implement the `Read` and `Write` traits, + /// respectively, but are only usable on the current task. + /// + /// # Panics + /// + /// This method will panic if there is not currently an active future task. + fn task_split(self) -> (TaskIoRead<Self>, TaskIoWrite<Self>) + where Self: Sized + { + TaskIo::new(self).split() + } +} @@ -1,7 +1,98 @@ -//! Mio bindings with streams and futures +//! `Future`-powered I/O at the core of Tokio //! -//! This crate uses the `futures_io` and `futures` crates to provide a thin -//! binding on top of mio of TCP and UDP sockets. +//! This crate uses the `futures` crate to provide an event loop ("reactor +//! core") which can be used to drive I/O like TCP and UDP, spawned future +//! tasks, and other events like channels/timeouts. All asynchronous I/O is +//! powered by the `mio` crate. +//! +//! The concrete types provided in this crate are relatively bare bones but are +//! intended to be the essential foundation for further projects needing an +//! event loop. In this crate you'll find: +//! +//! * TCP, both streams and listeners +//! * UDP sockets +//! * Message queues +//! * Timeouts +//! +//! More functionality is likely to be added over time, but otherwise the crate +//! is intended to be flexible with the `PollEvented` type which accepts any +//! type which implements `mio::Evented`. Using this if you'd like Unix domain +//! sockets, for example, the `tokio-uds` is built externally to offer this +//! functionality. +//! +//! Some other important tasks covered by this crate are: +//! +//! * The ability to spawn futures into an even loop. The `Handle` and `Pinned` +//! types have a `spawn` method which allows executing a future on an event +//! loop. The `Pinned::spawn` method crucially does not require the future +//! itself to be `Send`. +//! +//! * The `Io` trait serves as an abstraction for future crates to build on top +//! of. This packages up `Read` and `Write` functionality as well as the +//! ability to poll for readiness on both ends. +//! +//! * All I/O is futures-aware. If any action in this crate returns "not ready" +//! or "would block", then the current future task is scheduled to receive a +//! notification when it would otherwise make progress. +//! +//! # Examples +//! +//! A simple TCP echo server: +//! +//! ```no_run +//! extern crate futures; +//! extern crate tokio_core; +//! +//! use std::env; +//! use std::net::SocketAddr; +//! +//! use futures::Future; +//! use futures::stream::Stream; +//! use tokio_core::io::{copy, Io}; +//! use tokio_core::net::TcpListener; +//! use tokio_core::reactor::Core; +//! +//! fn main() { +//! let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); +//! let addr = addr.parse::<SocketAddr>().unwrap(); +//! +//! // Create the event loop that will drive this server +//! let mut l = Core::new().unwrap(); +//! let pin = l.pin(); +//! +//! // Create a TCP listener which will listen for incoming connections +//! let server = TcpListener::bind(&addr, pin.handle()); +//! +//! let done = server.and_then(|socket| { +//! // Once we've got the TCP listener, inform that we have it +//! println!("Listening on: {}", addr); +//! +//! // Pull out the stream of incoming connections and then for each new +//! // one spin up a new task copying data. +//! // +//! // We use the `io::copy` future to copy all data from the +//! // reading half onto the writing half. +//! socket.incoming().for_each(|(socket, addr)| { +//! let pair = futures::lazy(|| Ok(socket.task_split())); +//! let amt = pair.and_then(|(reader, writer)| copy(reader, writer)); +//! +//! // Once all that is done we print out how much we wrote, and then +//! // critically we *spawn* this future which allows it to run +//! // concurrently with other connections. +//! pin.spawn(amt.then(move |result| { +//! println!("wrote {:?} bytes to {}", result, addr); +//! Ok(()) +//! })); +//! +//! Ok(()) +//! }) +//! }); +//! +//! // Execute our server (modeled as a future) and wait for it to +//! // complete. +//! l.run(done).unwrap(); +//! } +//! ``` #![deny(missing_docs)] @@ -22,19 +113,8 @@ mod lock; #[macro_use] pub mod io; -mod channel; -mod event_loop; mod mpsc_queue; -mod readiness_stream; -mod tcp; -mod timeout; mod timer_wheel; -mod udp; - -pub use channel::{Sender, Receiver}; -pub use event_loop::{Loop, LoopPin, LoopHandle, AddSource, AddTimeout}; -pub use event_loop::{TimeoutToken, IoToken}; -pub use readiness_stream::ReadinessStream; -pub use tcp::{TcpListener, TcpStream}; -pub use timeout::Timeout; -pub use udp::UdpSocket; +pub mod channel; +pub mod net; +pub mod reactor; diff --git a/src/net/mod.rs b/src/net/mod.rs new file mode 100644 index 00000000..296ff873 --- /dev/null +++ b/src/net/mod.rs @@ -0,0 +1,11 @@ +//! TCP/UDP bindings for `tokio-core` +//! +//! This module contains the TCP/UDP networking types, similar to the standard +//! library, which can be used to implement networking protocols. + +mod tcp; +mod udp; + +pub use self::tcp::{TcpStream, TcpStreamNew}; +pub use self::tcp::{TcpListener, TcpListenerNew, Incoming}; +pub use self::udp::{UdpSocket, UdpSocketNew}; diff --git a/src/tcp.rs b/src/net/tcp.rs index be94931b..463a042d 100644 --- a/src/tcp.rs +++ b/src/net/tcp.rs @@ -7,30 +7,45 @@ use futures::stream::Stream; use futures::{Future, IntoFuture, failed, Poll, Async}; use mio; -use {ReadinessStream, LoopHandle}; -use io::{IoFuture, IoStream}; +use io::{Io, IoFuture, IoStream}; +use reactor::{Handle, PollEvented}; /// An I/O object representing a TCP socket listening for incoming connections. /// /// This object can be converted into a stream of incoming connections for /// various forms of processing. pub struct TcpListener { - io: ReadinessStream<mio::tcp::TcpListener>, + io: PollEvented<mio::tcp::TcpListener>, +} + +/// Future which will resolve to a `TcpListener` +pub struct TcpListenerNew { + inner: IoFuture<TcpListener>, +} + +/// Stream returned by the `TcpListener::incoming` function representing the +/// stream of sockets received from a listener. +pub struct Incoming { + inner: IoStream<(TcpStream, SocketAddr)>, } impl TcpListener { - fn new(listener: mio::tcp::TcpListener, - handle: LoopHandle) -> IoFuture<TcpListener> { - ReadinessStream::new(handle, listener).map(|io| { - TcpListener { - io: io, - } - }).boxed() + /// Create a new TCP listener associated with this event loop. + /// + /// The TCP listener will bind to the provided `addr` address, if available, + /// and will be returned as a future. The returned future, if resolved + /// successfully, can then be used to accept incoming connections. + pub fn bind(addr: &SocketAddr, handle: &Handle) -> TcpListenerNew { + let future = match mio::tcp::TcpListener::bind(addr) { + Ok(l) => TcpListener::new(l, handle), + Err(e) => failed(e).boxed(), + }; + TcpListenerNew { inner: future } } /// Create a new TCP listener from the standard library's TCP listener. /// - /// This method can be used when the `LoopHandle::tcp_listen` method isn't + /// This method can be used when the `Handle::tcp_listen` method isn't /// sufficient because perhaps some more configuration is needed in terms of /// before the calls to `bind` and `listen`. /// @@ -57,15 +72,23 @@ impl TcpListener { /// well (same for IPv6). pub fn from_listener(listener: net::TcpListener, addr: &SocketAddr, - handle: LoopHandle) -> IoFuture<TcpListener> { + handle: &Handle) -> IoFuture<TcpListener> { + let handle = handle.clone(); mio::tcp::TcpListener::from_listener(listener, addr) .into_future() - .and_then(|l| TcpListener::new(l, handle)) + .and_then(move |l| TcpListener::new(l, &handle)) .boxed() } + fn new(listener: mio::tcp::TcpListener, handle: &Handle) + -> IoFuture<TcpListener> { + PollEvented::new(listener, handle).map(|io| { + TcpListener { io: io } + }).boxed() + } + /// Test whether this socket is ready to be read or not. - pub fn poll_read(&self) -> Poll<(), io::Error> { + pub fn poll_read(&self) -> Async<()> { self.io.poll_read() } @@ -82,17 +105,19 @@ impl TcpListener { /// /// This method returns an implementation of the `Stream` trait which /// resolves to the sockets the are accepted on this listener. - pub fn incoming(self) -> IoStream<(TcpStream, SocketAddr)> { - struct Incoming { + pub fn incoming(self) -> Incoming { + struct MyIncoming { inner: TcpListener, } - impl Stream for Incoming { + impl Stream for MyIncoming { type Item = (mio::tcp::TcpStream, SocketAddr); type Error = io::Error; fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> { - try_ready!(self.inner.io.poll_read()); + if let Async::NotReady = self.inner.io.poll_read() { + return Ok(Async::NotReady) + } match self.inner.io.get_ref().accept() { Ok(pair) => Ok(Async::Ready(Some(pair))), Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { @@ -104,13 +129,15 @@ impl TcpListener { } } - let loop_handle = self.io.loop_handle().clone(); - Incoming { inner: self } - .and_then(move |(tcp, addr)| { - ReadinessStream::new(loop_handle.clone(), tcp).map(move |io| { + let handle = self.io.handle().clone(); + let stream = MyIncoming { inner: self }; + Incoming { + inner: stream.and_then(move |(tcp, addr)| { + PollEvented::new(tcp, &handle).map(move |io| { (TcpStream { io: io }, addr) }) - }).boxed() + }).boxed(), + } } /// Sets the value for the `IP_TTL` option on this socket. @@ -158,6 +185,24 @@ impl fmt::Debug for TcpListener { } } +impl Future for TcpListenerNew { + type Item = TcpListener; + type Error = io::Error; + + fn poll(&mut self) -> Poll<TcpListener, io::Error> { + self.inner.poll() + } +} + +impl Stream for Incoming { + type Item = (TcpStream, SocketAddr); + type Error = io::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> { + self.inner.poll() + } +} + /// An I/O object representing a TCP stream connected to a remote endpoint. /// /// A TCP stream can either be created by connecting to an endpoint or by @@ -165,27 +210,21 @@ impl fmt::Debug for TcpListener { /// raw underlying I/O object as well as streams for the read/write /// notifications on the stream itself. pub struct TcpStream { - io: ReadinessStream<mio::tcp::TcpStream>, + io: PollEvented<mio::tcp::TcpStream>, } -enum TcpStreamNew { +/// Future returned by `TcpStream::connect` which will resolve to a `TcpStream` +/// when the stream is connected. +pub struct TcpStreamNew { + inner: IoFuture<TcpStream>, +} + +enum TcpStreamConnect { Waiting(TcpStream), Empty, } -impl LoopHandle { - /// Create a new TCP listener associated with this event loop. - /// - /// The TCP listener will bind to the provided `addr` address, if available, - /// and will be returned as a future. The returned future, if resolved - /// successfully, can then be used to accept incoming connections. - pub fn tcp_listen(self, addr: &SocketAddr) -> IoFuture<TcpListener> { - match mio::tcp::TcpListener::bind(addr) { - Ok(l) => TcpListener::new(l, self), - Err(e) => failed(e).boxed(), - } - } - +impl TcpStream { /// Create a new TCP stream connected to the specified address. /// /// This function will create a new TCP socket and attempt to connect it to @@ -193,20 +232,18 @@ impl LoopHandle { /// stream has successfully connected. If an error happens during the /// connection or during the socket creation, that error will be returned to /// the future instead. - pub fn tcp_connect(self, addr: &SocketAddr) -> IoFuture<TcpStream> { - match mio::tcp::TcpStream::connect(addr) { - Ok(tcp) => TcpStream::new(tcp, self), + pub fn connect(addr: &SocketAddr, handle: &Handle) -> TcpStreamNew { + let future = match mio::tcp::TcpStream::connect(addr) { + Ok(tcp) => TcpStream::new(tcp, handle), Err(e) => failed(e).boxed(), - } + }; + TcpStreamNew { inner: future } } -} -impl TcpStream { - fn new(connected_stream: mio::tcp::TcpStream, - handle: LoopHandle) + fn new(connected_stream: mio::tcp::TcpStream, handle: &Handle) -> IoFuture<TcpStream> { - ReadinessStream::new(handle, connected_stream).and_then(|io| { - TcpStreamNew::Waiting(TcpStream { io: io }) + PollEvented::new(connected_stream, handle).and_then(|io| { + TcpStreamConnect::Waiting(TcpStream { io: io }) }).boxed() } @@ -230,7 +267,7 @@ impl TcpStream { /// (perhaps to `INADDR_ANY`) before this method is called. pub fn connect_stream(stream: net::TcpStream, addr: &SocketAddr, - handle: LoopHandle) -> IoFuture<TcpStream> { + handle: &Handle) -> IoFuture<TcpStream> { match mio::tcp::TcpStream::connect_stream(stream, addr) { Ok(tcp) => TcpStream::new(tcp, handle), Err(e) => failed(e).boxed(), @@ -243,7 +280,7 @@ impl TcpStream { /// get a notification when the socket does become readable. That is, this /// is only suitable for calling in a `Future::poll` method and will /// automatically handle ensuring a retry once the socket is readable again. - pub fn poll_read(&self) -> Poll<(), io::Error> { + pub fn poll_read(&self) -> Async<()> { self.io.poll_read() } @@ -253,7 +290,7 @@ impl TcpStream { /// get a notification when the socket does become writable. That is, this /// is only suitable for calling in a `Future::poll` method and will /// automatically handle ensuring a retry once the socket is writable again. - pub fn poll_write(&self) -> Poll<(), io::Error> { + pub fn poll_write(&self) -> Async<()> { self.io.poll_write() } @@ -340,91 +377,54 @@ impl TcpStream { } } -impl Future for TcpStreamNew { - type Item = TcpStream; - type Error = io::Error; - - fn poll(&mut self) -> Poll<TcpStream, io::Error> { - { - let stream = match *self { - TcpStreamNew::Waiting(ref s) => s, - TcpStreamNew::Empty => panic!("can't poll TCP stream twice"), - }; - - // Once we've connected, wait for the stream to be writable as - // that's when the actual connection has been initiated. Once we're - // writable we check for `take_socket_error` to see if the connect - // actually hit an error or not. - // - // If all that succeeded then we ship everything on up. - try_ready!(stream.io.poll_write()); - if let Some(e) = try!(stream.io.get_ref().take_error()) { - return Err(e) - } - } - match mem::replace(self, TcpStreamNew::Empty) { - TcpStreamNew::Waiting(stream) => Ok(Async::Ready(stream)), - TcpStreamNew::Empty => panic!(), - } - } -} - impl Read for TcpStream { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { - <&TcpStream>::read(&mut &*self, buf) + self.io.read(buf) } } impl Write for TcpStream { fn write(&mut self, buf: &[u8]) -> io::Result<usize> { - <&TcpStream>::write(&mut &*self, buf) + self.io.write(buf) } fn flush(&mut self) -> io::Result<()> { - <&TcpStream>::flush(&mut &*self) + self.io.flush() + } +} + +impl Io for TcpStream { + fn poll_read(&mut self) -> Async<()> { + <TcpStream>::poll_read(self) + } + + fn poll_write(&mut self) -> Async<()> { + <TcpStream>::poll_write(self) } } impl<'a> Read for &'a TcpStream { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { - if let Async::NotReady = try!(self.io.poll_read()) { - return Err(mio::would_block()) - } - let r = self.io.get_ref().read(buf); - if is_wouldblock(&r) { - self.io.need_read(); - } - r + (&self.io).read(buf) } } impl<'a> Write for &'a TcpStream { fn write(&mut self, buf: &[u8]) -> io::Result<usize> { - if let Async::NotReady = try!(self.io.poll_write()) { - return Err(mio::would_block()) - } - let r = self.io.get_ref().write(buf); - if is_wouldblock(&r) { - self.io.need_write(); - } - r + (&self.io).write(buf) } fn flush(&mut self) -> io::Result<()> { - if let Async::NotReady = try!(self.io.poll_write()) { - return Err(mio::would_block()) - } - let r = self.io.get_ref().flush(); - if is_wouldblock(&r) { - self.io.need_write(); - } - r + (&self.io).flush() } } -fn is_wouldblock<T>(r: &io::Result<T>) -> bool { - match *r { - Ok(_) => false, - Err(ref e) => e.kind() == io::ErrorKind::WouldBlock, +impl<'a> Io for &'a TcpStream { + fn poll_read(&mut self) -> Async<()> { + <TcpStream>::poll_read(self) + } + + fn poll_write(&mut self) -> Async<()> { + <TcpStream>::poll_write(self) } } @@ -434,6 +434,46 @@ impl fmt::Debug for TcpStream { } } +impl Future for TcpStreamNew { + type Item = TcpStream; + type Error = io::Error; + + fn poll(&mut self) -> Poll<TcpStream, io::Error> { + self.inner.poll() + } +} + +impl Future for TcpStreamConnect { + type Item = TcpStream; + type Error = io::Error; + + fn poll(&mut self) -> Poll<TcpStream, io::Error> { + { + let stream = match *self { + TcpStreamConnect::Waiting(ref s) => s, + TcpStreamConnect::Empty => panic!("can't poll TCP stream twice"), + }; + + // Once we've connected, wait for the stream to be writable as + // that's when the actual connection has been initiated. Once we're + // writable we check for `take_socket_error` to see if the connect + // actually hit an error or not. + // + // If all that succeeded then we ship everything on up. + if let Async::NotReady = stream.io.poll_write() { + return Ok(Async::NotReady) + } + if let Some(e) = try!(stream.io.get_ref().take_error()) { + return Err(e) + } + } + match mem::replace(self, TcpStreamConnect::Empty) { + TcpStreamConnect::Waiting(stream) => Ok(Async::Ready(stream)), + TcpStreamConnect::Empty => panic!(), + } + } +} + #[cfg(unix)] mod sys { use std::os::unix::prelude::*; |