summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2016-09-02 11:07:52 -0700
committerAlex Crichton <alex@alexcrichton.com>2016-09-07 22:12:14 -0700
commit6c045d31ac78df7b6b190935c2024c90b420da28 (patch)
tree645a3e9e28959ea09ccd00809f3a226cb842bacd
parent93c61bb384b8097a4897661eb877fc6a8440a02a (diff)
Reorganize the entire crate:
Renamed APIs * Loop => reactor::Core * LoopHandle => reactor::Handle * LoopPin => reactor::Pinned * TcpStream => net::TcpStream * TcpListener => net::TcpListener * UdpSocket => net::UdpSocket * Sender => channel::Sender * Receiver => channel::Receiver * Timeout => reactor::Timeout * ReadinessStream => reactor::PollEvented * All `LoopHandle` methods to construct objects are now free functions on the associated types, e.g. `LoopHandle::tcp_listen` is now `TcpListener::bind` * All APIs taking a `Handle` now take a `Handle` as the last argument * All future-returning APIs now return concrete types instead of trait objects Added APIs * io::Io trait -- Read + Write + ability to poll Removed without replacement: * AddSource * AddTimeout * IoToken * TimeoutToken Closes #3 Closes #6
-rw-r--r--examples/echo.rs7
-rw-r--r--examples/sink.rs8
-rw-r--r--src/channel.rs72
-rw-r--r--src/io/mod.rs75
-rw-r--r--src/lib.rs114
-rw-r--r--src/net/mod.rs11
-rw-r--r--src/net/tcp.rs (renamed from src/tcp.rs)262
-rw-r--r--src/net/udp.rs (renamed from src/udp.rs)52
-rw-r--r--src/reactor/channel.rs (renamed from src/event_loop/channel.rs)0
-rw-r--r--src/reactor/io_token.rs (renamed from src/event_loop/source.rs)76
-rw-r--r--src/reactor/mod.rs (renamed from src/event_loop/mod.rs)97
-rw-r--r--src/reactor/poll_evented.rs (renamed from src/readiness_stream.rs)193
-rw-r--r--src/reactor/timeout.rs (renamed from src/timeout.rs)53
-rw-r--r--src/reactor/timeout_token.rs (renamed from src/event_loop/timeout.rs)77
-rw-r--r--tests/buffered.rs6
-rw-r--r--tests/chain.rs6
-rw-r--r--tests/echo.rs6
-rw-r--r--tests/limit.rs6
-rw-r--r--tests/poll.rs64
-rw-r--r--tests/spawn.rs6
-rw-r--r--tests/stream-buffered.rs6
-rw-r--r--tests/tcp.rs22
-rw-r--r--tests/timeout.rs5
-rw-r--r--tests/udp.rs9
24 files changed, 784 insertions, 449 deletions
diff --git a/examples/echo.rs b/examples/echo.rs
index 1116d683..4c8417c8 100644
--- a/examples/echo.rs
+++ b/examples/echo.rs
@@ -9,8 +9,9 @@ use std::net::SocketAddr;
use futures::Future;
use futures::stream::Stream;
-use tokio_core::Loop;
use tokio_core::io::{copy, TaskIo};
+use tokio_core::net::TcpListener;
+use tokio_core::reactor::Core;
fn main() {
env_logger::init().unwrap();
@@ -18,11 +19,11 @@ fn main() {
let addr = addr.parse::<SocketAddr>().unwrap();
// Create the event loop that will drive this server
- let mut l = Loop::new().unwrap();
+ let mut l = Core::new().unwrap();
let pin = l.pin();
// Create a TCP listener which will listen for incoming connections
- let server = l.handle().tcp_listen(&addr);
+ let server = TcpListener::bind(&addr, &l.handle());
let done = server.and_then(move |socket| {
// Once we've got the TCP listener, inform that we have it
diff --git a/examples/sink.rs b/examples/sink.rs
index 21ba71d2..c825274c 100644
--- a/examples/sink.rs
+++ b/examples/sink.rs
@@ -14,14 +14,16 @@ use std::net::SocketAddr;
use futures::Future;
use futures::stream::{self, Stream};
use tokio_core::io::IoFuture;
+use tokio_core::net::{TcpListener, TcpStream};
+use tokio_core::reactor::Core;
fn main() {
env_logger::init().unwrap();
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
- let mut l = tokio_core::Loop::new().unwrap();
- let server = l.handle().tcp_listen(&addr).and_then(|socket| {
+ let mut l = Core::new().unwrap();
+ let server = TcpListener::bind(&addr, &l.handle()).and_then(|socket| {
socket.incoming().and_then(|(socket, addr)| {
println!("got a socket: {}", addr);
write(socket).or_else(|_| Ok(()))
@@ -34,7 +36,7 @@ fn main() {
l.run(server).unwrap();
}
-fn write(socket: tokio_core::TcpStream) -> IoFuture<()> {
+fn write(socket: TcpStream) -> IoFuture<()> {
static BUF: &'static [u8] = &[0; 64 * 1024];
let iter = iter::repeat(()).map(|()| Ok(()));
stream::iter(iter).fold(socket, |socket, ()| {
diff --git a/src/channel.rs b/src/channel.rs
index c0c03b5e..151abf95 100644
--- a/src/channel.rs
+++ b/src/channel.rs
@@ -1,3 +1,8 @@
+//! In-memory evented channels.
+//!
+//! This module contains a `Sender` and `Receiver` pair types which can be used
+//! to send messages between different future tasks.
+
use std::io;
use std::sync::mpsc::TryRecvError;
@@ -5,15 +10,15 @@ use futures::{Future, Poll, Async};
use futures::stream::Stream;
use mio::channel;
-use {ReadinessStream, LoopHandle};
use io::IoFuture;
+use reactor::{Handle, PollEvented};
/// The transmission half of a channel used for sending messages to a receiver.
///
/// A `Sender` can be `clone`d to have multiple threads or instances sending
/// messages to one receiver.
///
-/// This type is created by the `LoopHandle::channel` method.
+/// This type is created by the `channel` function.
pub struct Sender<T> {
tx: channel::Sender<T>,
}
@@ -24,32 +29,36 @@ pub struct Sender<T> {
/// A `Receiver` cannot be cloned, so only one thread can receive messages at a
/// time.
///
-/// This type is created by the `LoopHandle::channel` method and implements the
-/// `Stream` trait to represent received messages.
+/// This type is created by the `channel` function and implements the `Stream`
+/// trait to represent received messages.
pub struct Receiver<T> {
- rx: ReadinessStream<channel::Receiver<T>>,
+ rx: PollEvented<channel::Receiver<T>>,
}
-impl LoopHandle {
- /// Creates a new in-memory channel used for sending data across `Send +
- /// 'static` boundaries, frequently threads.
- ///
- /// This type can be used to conveniently send messages between futures.
- /// Unlike the futures crate `channel` method and types, the returned tx/rx
- /// pair is a multi-producer single-consumer (mpsc) channel *with no
- /// backpressure*. Currently it's left up to the application to implement a
- /// mechanism, if necessary, to avoid messages piling up.
- ///
- /// The returned `Sender` can be used to send messages that are processed by
- /// the returned `Receiver`. The `Sender` can be cloned to send messages
- /// from multiple sources simultaneously.
- pub fn channel<T>(self) -> (Sender<T>, IoFuture<Receiver<T>>)
- where T: Send + 'static,
- {
- let (tx, rx) = channel::channel();
- let rx = ReadinessStream::new(self, rx).map(|rx| Receiver { rx: rx });
- (Sender { tx: tx }, rx.boxed())
- }
+/// Future returned by the `channel` function which will resolve to a
+/// `Receiver<T>`.
+pub struct ReceiverNew<T> {
+ inner: IoFuture<Receiver<T>>,
+}
+
+/// Creates a new in-memory channel used for sending data across `Send +
+/// 'static` boundaries, frequently threads.
+///
+/// This type can be used to conveniently send messages between futures.
+/// Unlike the futures crate `channel` method and types, the returned tx/rx
+/// pair is a multi-producer single-consumer (mpsc) channel *with no
+/// backpressure*. Currently it's left up to the application to implement a
+/// mechanism, if necessary, to avoid messages piling up.
+///
+/// The returned `Sender` can be used to send messages that are processed by
+/// the returned `Receiver`. The `Sender` can be cloned to send messages
+/// from multiple sources simultaneously.
+pub fn channel<T>(handle: &Handle) -> (Sender<T>, ReceiverNew<T>)
+ where T: Send + 'static,
+{
+ let (tx, rx) = channel::channel();
+ let rx = PollEvented::new(rx, handle).map(|rx| Receiver { rx: rx });
+ (Sender { tx: tx }, ReceiverNew { inner: rx.boxed() })
}
impl<T> Sender<T> {
@@ -87,7 +96,9 @@ impl<T> Stream for Receiver<T> {
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<T>, io::Error> {
- try_ready!(self.rx.poll_read());
+ if let Async::NotReady = self.rx.poll_read() {
+ return Ok(Async::NotReady)
+ }
match self.rx.get_ref().try_recv() {
Ok(t) => Ok(Async::Ready(Some(t))),
Err(TryRecvError::Empty) => {
@@ -98,3 +109,12 @@ impl<T> Stream for Receiver<T> {
}
}
}
+
+impl<T> Future for ReceiverNew<T> {
+ type Item = Receiver<T>;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<Receiver<T>, io::Error> {
+ self.inner.poll()
+ }
+}
diff --git a/src/io/mod.rs b/src/io/mod.rs
index 6eb97686..a9f3a8f2 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -3,9 +3,9 @@
//! Contains various combinators to work with I/O objects and type definitions
//! as well.
-use std::io;
+use std::io::{self, Read, Write};
-use futures::BoxFuture;
+use futures::{BoxFuture, Async};
use futures::stream::BoxStream;
/// A convenience typedef around a `Future` whose error component is `io::Error`
@@ -45,3 +45,74 @@ pub use self::read_to_end::{read_to_end, ReadToEnd};
pub use self::task::{TaskIo, TaskIoRead, TaskIoWrite};
pub use self::window::Window;
pub use self::write_all::{write_all, WriteAll};
+
+/// A trait for read/write I/O objects
+///
+/// This trait represents I/O object which are readable and writable.
+/// Additionally, they're associated with the ability to test whether they're
+/// readable or writable.
+///
+/// Imporantly, the methods of this trait are intended to be used in conjuction
+/// with the current task of a future. Namely whenever any of them return a
+/// value that indicates "would block" the current future's task is arranged to
+/// receive a notification when the method would otherwise not indicate that it
+/// would block.
+pub trait Io: Read + Write {
+ /// Tests to see if this I/O object may be readable.
+ ///
+ /// This method returns an `Async<()>` indicating whether the object
+ /// **might** be readable. It is possible that even if this method returns
+ /// `Async::Ready` that a call to `read` would return a `WouldBlock` error.
+ ///
+ /// There is a default implementation for this function which always
+ /// indicates that an I/O object is readable, but objects which can
+ /// implement a finer grained version of this are recommended to do so.
+ ///
+ /// If this function returns `Async::NotReady` then the current future's
+ /// task is arranged to receive a notification when it might not return
+ /// `NotReady`.
+ ///
+ /// # Panics
+ ///
+ /// This method is likely to panic if called from outside the context of a
+ /// future's task.
+ fn poll_read(&mut self) -> Async<()> {
+ Async::Ready(())
+ }
+
+ /// Tests to see if this I/O object may be writable.
+ ///
+ /// This method returns an `Async<()>` indicating whether the object
+ /// **might** be writable. It is possible that even if this method returns
+ /// `Async::Ready` that a call to `write` would return a `WouldBlock` error.
+ ///
+ /// There is a default implementation for this function which always
+ /// indicates that an I/O object is writable, but objects which can
+ /// implement a finer grained version of this are recommended to do so.
+ ///
+ /// If this function returns `Async::NotReady` then the current future's
+ /// task is arranged to receive a notification when it might not return
+ /// `NotReady`.
+ ///
+ /// # Panics
+ ///
+ /// This method is likely to panic if called from outside the context of a
+ /// future's task.
+ fn poll_write(&mut self) -> Async<()> {
+ Async::Ready(())
+ }
+
+ /// Helper method for splitting this read/write object into two halves.
+ ///
+ /// The two halves returned implement the `Read` and `Write` traits,
+ /// respectively, but are only usable on the current task.
+ ///
+ /// # Panics
+ ///
+ /// This method will panic if there is not currently an active future task.
+ fn task_split(self) -> (TaskIoRead<Self>, TaskIoWrite<Self>)
+ where Self: Sized
+ {
+ TaskIo::new(self).split()
+ }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 0b0864f6..c31e1504 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,7 +1,98 @@
-//! Mio bindings with streams and futures
+//! `Future`-powered I/O at the core of Tokio
//!
-//! This crate uses the `futures_io` and `futures` crates to provide a thin
-//! binding on top of mio of TCP and UDP sockets.
+//! This crate uses the `futures` crate to provide an event loop ("reactor
+//! core") which can be used to drive I/O like TCP and UDP, spawned future
+//! tasks, and other events like channels/timeouts. All asynchronous I/O is
+//! powered by the `mio` crate.
+//!
+//! The concrete types provided in this crate are relatively bare bones but are
+//! intended to be the essential foundation for further projects needing an
+//! event loop. In this crate you'll find:
+//!
+//! * TCP, both streams and listeners
+//! * UDP sockets
+//! * Message queues
+//! * Timeouts
+//!
+//! More functionality is likely to be added over time, but otherwise the crate
+//! is intended to be flexible with the `PollEvented` type which accepts any
+//! type which implements `mio::Evented`. Using this if you'd like Unix domain
+//! sockets, for example, the `tokio-uds` is built externally to offer this
+//! functionality.
+//!
+//! Some other important tasks covered by this crate are:
+//!
+//! * The ability to spawn futures into an even loop. The `Handle` and `Pinned`
+//! types have a `spawn` method which allows executing a future on an event
+//! loop. The `Pinned::spawn` method crucially does not require the future
+//! itself to be `Send`.
+//!
+//! * The `Io` trait serves as an abstraction for future crates to build on top
+//! of. This packages up `Read` and `Write` functionality as well as the
+//! ability to poll for readiness on both ends.
+//!
+//! * All I/O is futures-aware. If any action in this crate returns "not ready"
+//! or "would block", then the current future task is scheduled to receive a
+//! notification when it would otherwise make progress.
+//!
+//! # Examples
+//!
+//! A simple TCP echo server:
+//!
+//! ```no_run
+//! extern crate futures;
+//! extern crate tokio_core;
+//!
+//! use std::env;
+//! use std::net::SocketAddr;
+//!
+//! use futures::Future;
+//! use futures::stream::Stream;
+//! use tokio_core::io::{copy, Io};
+//! use tokio_core::net::TcpListener;
+//! use tokio_core::reactor::Core;
+//!
+//! fn main() {
+//! let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
+//! let addr = addr.parse::<SocketAddr>().unwrap();
+//!
+//! // Create the event loop that will drive this server
+//! let mut l = Core::new().unwrap();
+//! let pin = l.pin();
+//!
+//! // Create a TCP listener which will listen for incoming connections
+//! let server = TcpListener::bind(&addr, pin.handle());
+//!
+//! let done = server.and_then(|socket| {
+//! // Once we've got the TCP listener, inform that we have it
+//! println!("Listening on: {}", addr);
+//!
+//! // Pull out the stream of incoming connections and then for each new
+//! // one spin up a new task copying data.
+//! //
+//! // We use the `io::copy` future to copy all data from the
+//! // reading half onto the writing half.
+//! socket.incoming().for_each(|(socket, addr)| {
+//! let pair = futures::lazy(|| Ok(socket.task_split()));
+//! let amt = pair.and_then(|(reader, writer)| copy(reader, writer));
+//!
+//! // Once all that is done we print out how much we wrote, and then
+//! // critically we *spawn* this future which allows it to run
+//! // concurrently with other connections.
+//! pin.spawn(amt.then(move |result| {
+//! println!("wrote {:?} bytes to {}", result, addr);
+//! Ok(())
+//! }));
+//!
+//! Ok(())
+//! })
+//! });
+//!
+//! // Execute our server (modeled as a future) and wait for it to
+//! // complete.
+//! l.run(done).unwrap();
+//! }
+//! ```
#![deny(missing_docs)]
@@ -22,19 +113,8 @@ mod lock;
#[macro_use]
pub mod io;
-mod channel;
-mod event_loop;
mod mpsc_queue;
-mod readiness_stream;
-mod tcp;
-mod timeout;
mod timer_wheel;
-mod udp;
-
-pub use channel::{Sender, Receiver};
-pub use event_loop::{Loop, LoopPin, LoopHandle, AddSource, AddTimeout};
-pub use event_loop::{TimeoutToken, IoToken};
-pub use readiness_stream::ReadinessStream;
-pub use tcp::{TcpListener, TcpStream};
-pub use timeout::Timeout;
-pub use udp::UdpSocket;
+pub mod channel;
+pub mod net;
+pub mod reactor;
diff --git a/src/net/mod.rs b/src/net/mod.rs
new file mode 100644
index 00000000..296ff873
--- /dev/null
+++ b/src/net/mod.rs
@@ -0,0 +1,11 @@
+//! TCP/UDP bindings for `tokio-core`
+//!
+//! This module contains the TCP/UDP networking types, similar to the standard
+//! library, which can be used to implement networking protocols.
+
+mod tcp;
+mod udp;
+
+pub use self::tcp::{TcpStream, TcpStreamNew};
+pub use self::tcp::{TcpListener, TcpListenerNew, Incoming};
+pub use self::udp::{UdpSocket, UdpSocketNew};
diff --git a/src/tcp.rs b/src/net/tcp.rs
index be94931b..463a042d 100644
--- a/src/tcp.rs
+++ b/src/net/tcp.rs
@@ -7,30 +7,45 @@ use futures::stream::Stream;
use futures::{Future, IntoFuture, failed, Poll, Async};
use mio;
-use {ReadinessStream, LoopHandle};
-use io::{IoFuture, IoStream};
+use io::{Io, IoFuture, IoStream};
+use reactor::{Handle, PollEvented};
/// An I/O object representing a TCP socket listening for incoming connections.
///
/// This object can be converted into a stream of incoming connections for
/// various forms of processing.
pub struct TcpListener {
- io: ReadinessStream<mio::tcp::TcpListener>,
+ io: PollEvented<mio::tcp::TcpListener>,
+}
+
+/// Future which will resolve to a `TcpListener`
+pub struct TcpListenerNew {
+ inner: IoFuture<TcpListener>,
+}
+
+/// Stream returned by the `TcpListener::incoming` function representing the
+/// stream of sockets received from a listener.
+pub struct Incoming {
+ inner: IoStream<(TcpStream, SocketAddr)>,
}
impl TcpListener {
- fn new(listener: mio::tcp::TcpListener,
- handle: LoopHandle) -> IoFuture<TcpListener> {
- ReadinessStream::new(handle, listener).map(|io| {
- TcpListener {
- io: io,
- }
- }).boxed()
+ /// Create a new TCP listener associated with this event loop.
+ ///
+ /// The TCP listener will bind to the provided `addr` address, if available,
+ /// and will be returned as a future. The returned future, if resolved
+ /// successfully, can then be used to accept incoming connections.
+ pub fn bind(addr: &SocketAddr, handle: &Handle) -> TcpListenerNew {
+ let future = match mio::tcp::TcpListener::bind(addr) {
+ Ok(l) => TcpListener::new(l, handle),
+ Err(e) => failed(e).boxed(),
+ };
+ TcpListenerNew { inner: future }
}
/// Create a new TCP listener from the standard library's TCP listener.
///
- /// This method can be used when the `LoopHandle::tcp_listen` method isn't
+ /// This method can be used when the `Handle::tcp_listen` method isn't
/// sufficient because perhaps some more configuration is needed in terms of
/// before the calls to `bind` and `listen`.
///
@@ -57,15 +72,23 @@ impl TcpListener {
/// well (same for IPv6).
pub fn from_listener(listener: net::TcpListener,
addr: &SocketAddr,
- handle: LoopHandle) -> IoFuture<TcpListener> {
+ handle: &Handle) -> IoFuture<TcpListener> {
+ let handle = handle.clone();
mio::tcp::TcpListener::from_listener(listener, addr)
.into_future()
- .and_then(|l| TcpListener::new(l, handle))
+ .and_then(move |l| TcpListener::new(l, &handle))
.boxed()
}
+ fn new(listener: mio::tcp::TcpListener, handle: &Handle)
+ -> IoFuture<TcpListener> {
+ PollEvented::new(listener, handle).map(|io| {
+ TcpListener { io: io }
+ }).boxed()
+ }
+
/// Test whether this socket is ready to be read or not.
- pub fn poll_read(&self) -> Poll<(), io::Error> {
+ pub fn poll_read(&self) -> Async<()> {
self.io.poll_read()
}
@@ -82,17 +105,19 @@ impl TcpListener {
///
/// This method returns an implementation of the `Stream` trait which
/// resolves to the sockets the are accepted on this listener.
- pub fn incoming(self) -> IoStream<(TcpStream, SocketAddr)> {
- struct Incoming {
+ pub fn incoming(self) -> Incoming {
+ struct MyIncoming {
inner: TcpListener,
}
- impl Stream for Incoming {
+ impl Stream for MyIncoming {
type Item = (mio::tcp::TcpStream, SocketAddr);
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
- try_ready!(self.inner.io.poll_read());
+ if let Async::NotReady = self.inner.io.poll_read() {
+ return Ok(Async::NotReady)
+ }
match self.inner.io.get_ref().accept() {
Ok(pair) => Ok(Async::Ready(Some(pair))),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
@@ -104,13 +129,15 @@ impl TcpListener {
}
}
- let loop_handle = self.io.loop_handle().clone();
- Incoming { inner: self }
- .and_then(move |(tcp, addr)| {
- ReadinessStream::new(loop_handle.clone(), tcp).map(move |io| {
+ let handle = self.io.handle().clone();
+ let stream = MyIncoming { inner: self };
+ Incoming {
+ inner: stream.and_then(move |(tcp, addr)| {
+ PollEvented::new(tcp, &handle).map(move |io| {
(TcpStream { io: io }, addr)
})
- }).boxed()
+ }).boxed(),
+ }
}
/// Sets the value for the `IP_TTL` option on this socket.
@@ -158,6 +185,24 @@ impl fmt::Debug for TcpListener {
}
}
+impl Future for TcpListenerNew {
+ type Item = TcpListener;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<TcpListener, io::Error> {
+ self.inner.poll()
+ }
+}
+
+impl Stream for Incoming {
+ type Item = (TcpStream, SocketAddr);
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
+ self.inner.poll()
+ }
+}
+
/// An I/O object representing a TCP stream connected to a remote endpoint.
///
/// A TCP stream can either be created by connecting to an endpoint or by
@@ -165,27 +210,21 @@ impl fmt::Debug for TcpListener {
/// raw underlying I/O object as well as streams for the read/write
/// notifications on the stream itself.
pub struct TcpStream {
- io: ReadinessStream<mio::tcp::TcpStream>,
+ io: PollEvented<mio::tcp::TcpStream>,
}
-enum TcpStreamNew {
+/// Future returned by `TcpStream::connect` which will resolve to a `TcpStream`
+/// when the stream is connected.
+pub struct TcpStreamNew {
+ inner: IoFuture<TcpStream>,
+}
+
+enum TcpStreamConnect {
Waiting(TcpStream),
Empty,
}
-impl LoopHandle {
- /// Create a new TCP listener associated with this event loop.
- ///
- /// The TCP listener will bind to the provided `addr` address, if available,
- /// and will be returned as a future. The returned future, if resolved
- /// successfully, can then be used to accept incoming connections.
- pub fn tcp_listen(self, addr: &SocketAddr) -> IoFuture<TcpListener> {
- match mio::tcp::TcpListener::bind(addr) {
- Ok(l) => TcpListener::new(l, self),
- Err(e) => failed(e).boxed(),
- }
- }
-
+impl TcpStream {
/// Create a new TCP stream connected to the specified address.
///
/// This function will create a new TCP socket and attempt to connect it to
@@ -193,20 +232,18 @@ impl LoopHandle {
/// stream has successfully connected. If an error happens during the
/// connection or during the socket creation, that error will be returned to
/// the future instead.
- pub fn tcp_connect(self, addr: &SocketAddr) -> IoFuture<TcpStream> {
- match mio::tcp::TcpStream::connect(addr) {
- Ok(tcp) => TcpStream::new(tcp, self),
+ pub fn connect(addr: &SocketAddr, handle: &Handle) -> TcpStreamNew {
+ let future = match mio::tcp::TcpStream::connect(addr) {
+ Ok(tcp) => TcpStream::new(tcp, handle),
Err(e) => failed(e).boxed(),
- }
+ };
+ TcpStreamNew { inner: future }
}
-}
-impl TcpStream {
- fn new(connected_stream: mio::tcp::TcpStream,
- handle: LoopHandle)
+ fn new(connected_stream: mio::tcp::TcpStream, handle: &Handle)
-> IoFuture<TcpStream> {
- ReadinessStream::new(handle, connected_stream).and_then(|io| {
- TcpStreamNew::Waiting(TcpStream { io: io })
+ PollEvented::new(connected_stream, handle).and_then(|io| {
+ TcpStreamConnect::Waiting(TcpStream { io: io })
}).boxed()
}
@@ -230,7 +267,7 @@ impl TcpStream {
/// (perhaps to `INADDR_ANY`) before this method is called.
pub fn connect_stream(stream: net::TcpStream,
addr: &SocketAddr,
- handle: LoopHandle) -> IoFuture<TcpStream> {
+ handle: &Handle) -> IoFuture<TcpStream> {
match mio::tcp::TcpStream::connect_stream(stream, addr) {
Ok(tcp) => TcpStream::new(tcp, handle),
Err(e) => failed(e).boxed(),
@@ -243,7 +280,7 @@ impl TcpStream {
/// get a notification when the socket does become readable. That is, this
/// is only suitable for calling in a `Future::poll` method and will
/// automatically handle ensuring a retry once the socket is readable again.
- pub fn poll_read(&self) -> Poll<(), io::Error> {
+ pub fn poll_read(&self) -> Async<()> {
self.io.poll_read()
}
@@ -253,7 +290,7 @@ impl TcpStream {
/// get a notification when the socket does become writable. That is, this
/// is only suitable for calling in a `Future::poll` method and will
/// automatically handle ensuring a retry once the socket is writable again.
- pub fn poll_write(&self) -> Poll<(), io::Error> {
+ pub fn poll_write(&self) -> Async<()> {
self.io.poll_write()
}
@@ -340,91 +377,54 @@ impl TcpStream {
}
}
-impl Future for TcpStreamNew {
- type Item = TcpStream;
- type Error = io::Error;
-
- fn poll(&mut self) -> Poll<TcpStream, io::Error> {
- {
- let stream = match *self {
- TcpStreamNew::Waiting(ref s) => s,
- TcpStreamNew::Empty => panic!("can't poll TCP stream twice"),
- };
-
- // Once we've connected, wait for the stream to be writable as
- // that's when the actual connection has been initiated. Once we're
- // writable we check for `take_socket_error` to see if the connect
- // actually hit an error or not.
- //
- // If all that succeeded then we ship everything on up.
- try_ready!(stream.io.poll_write());
- if let Some(e) = try!(stream.io.get_ref().take_error()) {
- return Err(e)
- }
- }
- match mem::replace(self, TcpStreamNew::Empty) {
- TcpStreamNew::Waiting(stream) => Ok(Async::Ready(stream)),
- TcpStreamNew::Empty => panic!(),
- }
- }
-}
-
impl Read for TcpStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- <&TcpStream>::read(&mut &*self, buf)
+ self.io.read(buf)
}
}
impl Write for TcpStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- <&TcpStream>::write(&mut &*self, buf)
+ self.io.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
- <&TcpStream>::flush(&mut &*self)
+ self.io.flush()
+ }
+}
+
+impl Io for TcpStream {
+ fn poll_read(&mut self) -> Async<()> {
+ <TcpStream>::poll_read(self)
+ }
+
+ fn poll_write(&mut self) -> Async<()> {
+ <TcpStream>::poll_write(self)
}
}
impl<'a> Read for &'a TcpStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- if let Async::NotReady = try!(self.io.poll_read()) {
- return Err(mio::would_block())
- }
- let r = self.io.get_ref().read(buf);
- if is_wouldblock(&r) {
- self.io.need_read();
- }
- r
+ (&self.io).read(buf)
}
}
impl<'a> Write for &'a TcpStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- if let Async::NotReady = try!(self.io.poll_write()) {
- return Err(mio::would_block())
- }
- let r = self.io.get_ref().write(buf);
- if is_wouldblock(&r) {
- self.io.need_write();
- }
- r
+ (&self.io).write(buf)
}
fn flush(&mut self) -> io::Result<()> {
- if let Async::NotReady = try!(self.io.poll_write()) {
- return Err(mio::would_block())
- }
- let r = self.io.get_ref().flush();
- if is_wouldblock(&r) {
- self.io.need_write();
- }
- r
+ (&self.io).flush()
}
}
-fn is_wouldblock<T>(r: &io::Result<T>) -> bool {
- match *r {
- Ok(_) => false,
- Err(ref e) => e.kind() == io::ErrorKind::WouldBlock,
+impl<'a> Io for &'a TcpStream {
+ fn poll_read(&mut self) -> Async<()> {
+ <TcpStream>::poll_read(self)
+ }
+
+ fn poll_write(&mut self) -> Async<()> {
+ <TcpStream>::poll_write(self)
}
}
@@ -434,6 +434,46 @@ impl fmt::Debug for TcpStream {
}
}
+impl Future for TcpStreamNew {
+ type Item = TcpStream;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<TcpStream, io::Error> {
+ self.inner.poll()
+ }
+}
+
+impl Future for TcpStreamConnect {
+ type Item = TcpStream;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<TcpStream, io::Error> {
+ {
+ let stream = match *self {
+ TcpStreamConnect::Waiting(ref s) => s,
+ TcpStreamConnect::Empty => panic!("can't poll TCP stream twice"),
+ };
+
+ // Once we've connected, wait for the stream to be writable as
+ // that's when the actual connection has been initiated. Once we're
+ // writable we check for `take_socket_error` to see if the connect
+ // actually hit an error or not.
+ //
+ // If all that succeeded then we ship everything on up.
+ if let Async::NotReady = stream.io.poll_write() {
+ return Ok(Async::NotReady)
+ }
+ if let Some(e) = try!(stream.io.get_ref().take_error()) {
+ return Err(e)
+ }
+ }
+ match mem::replace(self, TcpStreamConnect::Empty) {
+ TcpStreamConnect::Waiting(stream) => Ok(Async::Ready(stream)),
+ TcpStreamConnect::Empty => panic!(),
+ }
+ }
+}
+
#[cfg(unix)]
mod sys {
use std::os::unix::prelude::*;
diff --git a/src/udp.rs b/src/net/udp.rs
index 73afcd66..e11ed873 100644
--- a/