summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2018-03-04 10:46:54 -0800
committerGitHub <noreply@github.com>2018-03-04 10:46:54 -0800
commit9f7a98af3c4cbeb89b162c1faa4480553a3dc82e (patch)
tree13ba7eb34d118e32c40f9775592665b8ebe9bc9c
parent7db77194194851fcc7cad4d68f0481941fb8a285 (diff)
Switch TCP/UDP fns to poll_ -> Poll<...> style (#175)
Tokio is moving away from using `WouldBlock`, instead favoring `Async::NotReady`. This patch updates the TCP and UDP types, deprecating any function that returns `WouldBlock` and adding a poll_ prefixed equivalent.
-rw-r--r--examples/echo-udp.rs6
-rw-r--r--src/lib.rs1
-rw-r--r--src/net/tcp.rs103
-rw-r--r--src/net/udp/frame.rs4
-rw-r--r--src/net/udp/mod.rs166
-rw-r--r--tests/udp.rs2
6 files changed, 191 insertions, 91 deletions
diff --git a/examples/echo-udp.rs b/examples/echo-udp.rs
index f7e2bf09..18f2d393 100644
--- a/examples/echo-udp.rs
+++ b/examples/echo-udp.rs
@@ -10,9 +10,9 @@
//!
//! Each line you type in to the `nc` terminal should be echo'd back to you!
+#[macro_use]
extern crate futures;
extern crate tokio;
-#[macro_use]
extern crate tokio_io;
use std::{env, io};
@@ -37,14 +37,14 @@ impl Future for Server {
// If so then we try to send it back to the original source, waiting
// until it's writable and we're able to do so.
if let Some((size, peer)) = self.to_send {
- let amt = try_nb!(self.socket.send_to(&self.buf[..size], &peer));
+ let amt = try_ready!(self.socket.poll_send_to(&self.buf[..size], &peer));
println!("Echoed {}/{} bytes to {}", amt, size, peer);
self.to_send = None;
}
// If we're here then `to_send` is `None`, so we take a look for the
// next message we're going to echo back.
- self.to_send = Some(try_nb!(self.socket.recv_from(&mut self.buf)));
+ self.to_send = Some(try_ready!(self.socket.poll_recv_from(&mut self.buf)));
}
}
}
diff --git a/src/lib.rs b/src/lib.rs
index 9de17a9f..b821f67b 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -71,7 +71,6 @@ extern crate futures;
extern crate iovec;
extern crate mio;
extern crate slab;
-#[macro_use]
extern crate tokio_io;
extern crate tokio_executor;
extern crate tokio_reactor;
diff --git a/src/net/tcp.rs b/src/net/tcp.rs
index 35cd4f00..dada8f4d 100644
--- a/src/net/tcp.rs
+++ b/src/net/tcp.rs
@@ -39,32 +39,50 @@ impl TcpListener {
Ok(TcpListener::new(l))
}
+ #[deprecated(since = "0.1.2", note = "use poll_accept instead")]
+ #[doc(hidden)]
+ pub fn accept(&mut self) -> io::Result<(TcpStream, SocketAddr)> {
+ match self.poll_accept()? {
+ Async::Ready(ret) => Ok(ret),
+ Async::NotReady => Err(io::ErrorKind::WouldBlock.into()),
+ }
+ }
+
/// Attempt to accept a connection and create a new connected `TcpStream` if
/// successful.
///
- /// This function will attempt an accept operation, but will not block
- /// waiting for it to complete. If the operation would block then a "would
- /// block" error is returned. Additionally, if this method would block, it
- /// registers the current task to receive a notification when it would
- /// otherwise not block.
- ///
/// Note that typically for simple usage it's easier to treat incoming
/// connections as a `Stream` of `TcpStream`s with the `incoming` method
/// below.
///
+ /// # Return
+ ///
+ /// On success, returns `Ok(Async::Ready((socket, addr)))`.
+ ///
+ /// If the listener is not ready to accept, the method returns
+ /// `Ok(Async::NotReady)` and arranges for the current task to receive a
+ /// notification when the listener becomes ready to accept.
+ ///
/// # Panics
///
- /// This function will panic if it is called outside the context of a
- /// future's task. It's recommended to only call this from the
- /// implementation of a `Future::poll`, if necessary.
- pub fn accept(&mut self) -> io::Result<(TcpStream, SocketAddr)> {
- let (io, addr) = self.accept_std()?;
+ /// This function will panic if called from outside of a task context.
+ pub fn poll_accept(&mut self) -> Poll<(TcpStream, SocketAddr), io::Error> {
+ let (io, addr) = try_ready!(self.poll_accept_std());
let io = mio::net::TcpStream::from_stream(io)?;
let io = PollEvented2::new(io);
let io = TcpStream { io };
- Ok((io, addr))
+ Ok((io, addr).into())
+ }
+
+ #[deprecated(since = "0.1.2", note = "use poll_accept_std instead")]
+ #[doc(hidden)]
+ pub fn accept_std(&mut self) -> io::Result<(net::TcpStream, SocketAddr)> {
+ match self.poll_accept_std()? {
+ Async::Ready(ret) => Ok(ret),
+ Async::NotReady => Err(io::ErrorKind::WouldBlock.into()),
+ }
}
/// Attempt to accept a connection and create a new connected `TcpStream` if
@@ -75,23 +93,27 @@ impl TcpListener {
/// can then allow for the TCP stream to be assoiated with a different
/// reactor than the one this `TcpListener` is associated with.
///
+ /// # Return
+ ///
+ /// On success, returns `Ok(Async::Ready((socket, addr)))`.
+ ///
+ /// If the listener is not ready to accept, the method returns
+ /// `Ok(Async::NotReady)` and arranges for the current task to receive a
+ /// notification when the listener becomes ready to accept.
+ ///
/// # Panics
///
- /// This function will panic for the same reasons as `accept`, notably if
- /// called outside the context of a future.
- pub fn accept_std(&mut self) -> io::Result<(net::TcpStream, SocketAddr)> {
- if let Async::NotReady = self.io.poll_read_ready()? {
- return Err(io::ErrorKind::WouldBlock.into())
- }
+ /// This function will panic if called from outside of a task context.
+ pub fn poll_accept_std(&mut self) -> Poll<(net::TcpStream, SocketAddr), io::Error> {
+ try_ready!(self.io.poll_read_ready());
match self.io.get_ref().accept_std() {
- Ok(pair) => Ok(pair),
- Err(e) => {
- if e.kind() == io::ErrorKind::WouldBlock {
- self.io.need_read()?;
- }
- Err(e)
+ Ok(pair) => Ok(pair.into()),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.need_read()?;
+ Ok(Async::NotReady)
}
+ Err(e) => Err(e),
}
}
@@ -181,7 +203,7 @@ impl Stream for Incoming {
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
- let (socket, _) = try_nb!(self.inner.accept());
+ let (socket, _) = try_ready!(self.inner.poll_accept());
Ok(Async::Ready(Some(socket)))
}
}
@@ -298,22 +320,41 @@ impl TcpStream {
self.io.get_ref().peer_addr()
}
+ #[deprecated(since = "0.1.2", note = "use poll_peek instead")]
+ #[doc(hidden)]
+ pub fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ match self.poll_peek(buf)? {
+ Async::Ready(n) => Ok(n),
+ Async::NotReady => Err(io::ErrorKind::WouldBlock.into()),
+ }
+ }
+
/// Receives data on the socket from the remote address to which it is
/// connected, without removing that data from the queue. On success,
/// returns the number of bytes peeked.
///
/// Successive calls return the same data. This is accomplished by passing
/// `MSG_PEEK` as a flag to the underlying recv system call.
- pub fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- if let Async::NotReady = self.io.poll_read_ready()? {
- return Err(io::ErrorKind::WouldBlock.into())
- }
+ ///
+ /// # Return
+ ///
+ /// On success, returns `Ok(Async::Ready(num_bytes_read))`.
+ ///
+ /// If no data is available for reading, the method returns
+ /// `Ok(Async::NotReady)` and arranges for the current task to receive a
+ /// notification when the socket becomes readable or is closed.
+ ///
+ /// # Panics
+ ///
+ /// This function will panic if called from outside of a task context.
+ pub fn poll_peek(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error> {
+ try_ready!(self.io.poll_read_ready());
match self.io.get_ref().peek(buf) {
- Ok(v) => Ok(v),
+ Ok(ret) => Ok(ret.into()),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.need_read()?;
- Err(io::ErrorKind::WouldBlock.into())
+ Ok(Async::NotReady)
}
Err(e) => Err(e),
}
diff --git a/src/net/udp/frame.rs b/src/net/udp/frame.rs
index 77480a24..c06b2279 100644
--- a/src/net/udp/frame.rs
+++ b/src/net/udp/frame.rs
@@ -44,7 +44,7 @@ impl<C: Decoder> Stream for UdpFramed<C> {
let (n, addr) = unsafe {
// Read into the buffer without having to initialize the memory.
- let (n, addr) = try_nb!(self.socket.recv_from(self.rd.bytes_mut()));
+ let (n, addr) = try_ready!(self.socket.poll_recv_from(self.rd.bytes_mut()));
self.rd.advance_mut(n);
(n, addr)
};
@@ -87,7 +87,7 @@ impl<C: Encoder> Sink for UdpFramed<C> {
}
trace!("flushing frame; length={}", self.wr.len());
- let n = try_nb!(self.socket.send_to(&self.wr, &self.out_addr));
+ let n = try_ready!(self.socket.poll_send_to(&self.wr, &self.out_addr));
trace!("written {}", n);
let wrote_all = n == self.wr.len();
diff --git a/src/net/udp/mod.rs b/src/net/udp/mod.rs
index bfac9404..2216d0a4 100644
--- a/src/net/udp/mod.rs
+++ b/src/net/udp/mod.rs
@@ -56,75 +56,127 @@ impl UdpSocket {
self.io.get_ref().connect(*addr)
}
- /// Sends data on the socket to the address previously bound via connect().
- /// On success, returns the number of bytes written.
+ #[deprecated(since = "0.1.2", note = "use poll_send instead")]
+ #[doc(hidden)]
+ pub fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
+ match self.poll_send(buf)? {
+ Async::Ready(n) => Ok(n),
+ Async::NotReady => Err(io::ErrorKind::WouldBlock.into()),
+ }
+ }
+
+ /// Sends data on the socket to the remote address to which it is connected.
+ ///
+ /// The [`connect`] method will connect this socket to a remote address. This
+ /// method will fail if the socket is not connected.
+ ///
+ /// [`connect`]: #method.connect
+ ///
+ /// # Return
+ ///
+ /// On success, returns `Ok(Async::Ready(num_bytes_written))`.
+ ///
+ /// If the socket is not ready for writing, the method returns
+ /// `Ok(Async::NotReady)` and arranges for the current task to receive a
+ /// notification when the socket becomes writable.
///
/// # Panics
///
- /// This function will panic if called outside the context of a future's
- /// task.
- pub fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
- if let Async::NotReady = self.io.poll_write_ready()? {
- return Err(io::ErrorKind::WouldBlock.into())
- }
+ /// This function will panic if called from outside of a task context.
+ pub fn poll_send(&mut self, buf: &[u8]) -> Poll<usize, io::Error> {
+ try_ready!(self.io.poll_write_ready());
match self.io.get_ref().send(buf) {
- Ok(n) => Ok(n),
- Err(e) => {
- if e.kind() == io::ErrorKind::WouldBlock {
- self.io.need_write()?;
- }
- Err(e)
+ Ok(n) => Ok(n.into()),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.need_write()?;
+ Ok(Async::NotReady)
}
+ Err(e) => Err(e),
+ }
+ }
+
+ #[deprecated(since = "0.1.2", note = "use poll_recv instead")]
+ #[doc(hidden)]
+ pub fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ match self.poll_recv(buf)? {
+ Async::Ready(n) => Ok(n),
+ Async::NotReady => Err(io::ErrorKind::WouldBlock.into()),
}
}
- /// Receives data from the socket previously bound with connect().
- /// On success, returns the number of bytes read.
+ /// Receives a single datagram message on the socket from the remote address to
+ /// which it is connected. On success, returns the number of bytes read.
+ ///
+ /// The function must be called with valid byte array `buf` of sufficient size to
+ /// hold the message bytes. If a message is too long to fit in the supplied buffer,
+ /// excess bytes may be discarded.
+ ///
+ /// The [`connect`] method will connect this socket to a remote address. This
+ /// method will fail if the socket is not connected.
+ ///
+ /// [`connect`]: #method.connect
+ ///
+ /// # Return
+ ///
+ /// On success, returns `Ok(Async::Ready(num_bytes_read))`.
+ ///
+ /// If no data is available for reading, the method returns
+ /// `Ok(Async::NotReady)` and arranges for the current task to receive a
+ /// notification when the socket becomes receivable or is closed.
///
/// # Panics
///
- /// This function will panic if called outside the context of a future's
- /// task.
- pub fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- if let Async::NotReady = self.io.poll_read_ready()? {
- return Err(io::ErrorKind::WouldBlock.into())
- }
+ /// This function will panic if called from outside of a task context.
+ pub fn poll_recv(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error> {
+ try_ready!(self.io.poll_read_ready());
match self.io.get_ref().recv(buf) {
- Ok(n) => Ok(n),
- Err(e) => {
- if e.kind() == io::ErrorKind::WouldBlock {
- self.io.need_read()?;
- }
- Err(e)
+ Ok(n) => Ok(n.into()),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.need_read()?;
+ Ok(Async::NotReady)
}
+ Err(e) => Err(e),
+ }
+ }
+
+ #[deprecated(since = "0.1.2", note = "use poll_send_to instead")]
+ #[doc(hidden)]
+ pub fn send_to(&mut self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> {
+ match self.poll_send_to(buf, target)? {
+ Async::Ready(n) => Ok(n),
+ Async::NotReady => Err(io::ErrorKind::WouldBlock.into()),
}
}
/// Sends data on the socket to the given address. On success, returns the
/// number of bytes written.
///
- /// Address type can be any implementer of `ToSocketAddrs` trait. See its
- /// documentation for concrete examples.
+ /// This will return an error when the IP version of the local socket
+ /// does not match that of `target`.
+ ///
+ /// # Return
+ ///
+ /// On success, returns `Ok(Async::Ready(num_bytes_written))`.
+ ///
+ /// If the socket is not ready for writing, the method returns
+ /// `Ok(Async::NotReady)` and arranges for the current task to receive a
+ /// notification when the socket becomes writable.
///
/// # Panics
///
- /// This function will panic if called outside the context of a future's
- /// task.
- pub fn send_to(&mut self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> {
- if let Async::NotReady = self.io.poll_write_ready()? {
- return Err(io::ErrorKind::WouldBlock.into())
- }
+ /// This function will panic if called from outside of a task context.
+ pub fn poll_send_to(&mut self, buf: &[u8], target: &SocketAddr) -> Poll<usize, io::Error> {
+ try_ready!(self.io.poll_write_ready());
match self.io.get_ref().send_to(buf, target) {
- Ok(n) => Ok(n),
- Err(e) => {
- if e.kind() == io::ErrorKind::WouldBlock {
- self.io.need_write()?;
- }
- Err(e)
+ Ok(n) => Ok(n.into()),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.need_write()?;
+ Ok(Async::NotReady)
}
+ Err(e) => Err(e),
}
}
@@ -148,6 +200,15 @@ impl UdpSocket {
SendDgram::new(self, buf, *addr)
}
+ #[deprecated(since = "0.1.2", note = "use poll_recv_from instead")]
+ #[doc(hidden)]
+ pub fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
+ match self.poll_recv_from(buf)? {
+ Async::Ready(ret) => Ok(ret),
+ Async::NotReady => Err(io::ErrorKind::WouldBlock.into()),
+ }
+ }
+
/// Receives data from the socket. On success, returns the number of bytes
/// read and the address from whence the data came.
///
@@ -155,19 +216,16 @@ impl UdpSocket {
///
/// This function will panic if called outside the context of a future's
/// task.
- pub fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
- if let Async::NotReady = self.io.poll_read_ready()? {
- return Err(io::ErrorKind::WouldBlock.into())
- }
+ pub fn poll_recv_from(&mut self, buf: &mut [u8]) -> Poll<(usize, SocketAddr), io::Error> {
+ try_ready!(self.io.poll_read_ready());
match self.io.get_ref().recv_from(buf) {
- Ok(n) => Ok(n),
- Err(e) => {
- if e.kind() == io::ErrorKind::WouldBlock {
- self.io.need_read()?;
- }
- Err(e)
+ Ok(n) => Ok(n.into()),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.need_read()?;
+ Ok(Async::NotReady)
}
+ Err(e) => Err(e),
}
}
@@ -384,7 +442,7 @@ impl<T> Future for SendDgram<T>
{
let ref mut inner =
self.state.as_mut().expect("SendDgram polled after completion");
- let n = try_nb!(inner.socket.send_to(inner.buffer.as_ref(), &inner.addr));
+ let n = try_ready!(inner.socket.poll_send_to(inner.buffer.as_ref(), &inner.addr));
if n != inner.buffer.as_ref().len() {
return Err(incomplete_write("failed to send entire message \
in datagram"))
@@ -436,7 +494,7 @@ impl<T> Future for RecvDgram<T>
let ref mut inner =
self.state.as_mut().expect("RecvDgram polled after completion");
- try_nb!(inner.socket.recv_from(inner.buffer.as_mut()))
+ try_ready!(inner.socket.poll_recv_from(inner.buffer.as_mut()))
};
let inner = self.state.take().unwrap();
diff --git a/tests/udp.rs b/tests/udp.rs
index 162af700..8faebee6 100644
--- a/tests/udp.rs
+++ b/tests/udp.rs
@@ -1,3 +1,5 @@
+#![allow(deprecated)]
+
extern crate futures;
extern crate tokio;
#[macro_use]