diff options
author | Carl Lerche <me@carllerche.com> | 2018-03-04 10:46:54 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-03-04 10:46:54 -0800 |
commit | 9f7a98af3c4cbeb89b162c1faa4480553a3dc82e (patch) | |
tree | 13ba7eb34d118e32c40f9775592665b8ebe9bc9c | |
parent | 7db77194194851fcc7cad4d68f0481941fb8a285 (diff) |
Switch TCP/UDP fns to poll_ -> Poll<...> style (#175)
Tokio is moving away from using `WouldBlock`, instead favoring
`Async::NotReady`.
This patch updates the TCP and UDP types, deprecating any function that
returns `WouldBlock` and adding a poll_ prefixed equivalent.
-rw-r--r-- | examples/echo-udp.rs | 6 | ||||
-rw-r--r-- | src/lib.rs | 1 | ||||
-rw-r--r-- | src/net/tcp.rs | 103 | ||||
-rw-r--r-- | src/net/udp/frame.rs | 4 | ||||
-rw-r--r-- | src/net/udp/mod.rs | 166 | ||||
-rw-r--r-- | tests/udp.rs | 2 |
6 files changed, 191 insertions, 91 deletions
diff --git a/examples/echo-udp.rs b/examples/echo-udp.rs index f7e2bf09..18f2d393 100644 --- a/examples/echo-udp.rs +++ b/examples/echo-udp.rs @@ -10,9 +10,9 @@ //! //! Each line you type in to the `nc` terminal should be echo'd back to you! +#[macro_use] extern crate futures; extern crate tokio; -#[macro_use] extern crate tokio_io; use std::{env, io}; @@ -37,14 +37,14 @@ impl Future for Server { // If so then we try to send it back to the original source, waiting // until it's writable and we're able to do so. if let Some((size, peer)) = self.to_send { - let amt = try_nb!(self.socket.send_to(&self.buf[..size], &peer)); + let amt = try_ready!(self.socket.poll_send_to(&self.buf[..size], &peer)); println!("Echoed {}/{} bytes to {}", amt, size, peer); self.to_send = None; } // If we're here then `to_send` is `None`, so we take a look for the // next message we're going to echo back. - self.to_send = Some(try_nb!(self.socket.recv_from(&mut self.buf))); + self.to_send = Some(try_ready!(self.socket.poll_recv_from(&mut self.buf))); } } } @@ -71,7 +71,6 @@ extern crate futures; extern crate iovec; extern crate mio; extern crate slab; -#[macro_use] extern crate tokio_io; extern crate tokio_executor; extern crate tokio_reactor; diff --git a/src/net/tcp.rs b/src/net/tcp.rs index 35cd4f00..dada8f4d 100644 --- a/src/net/tcp.rs +++ b/src/net/tcp.rs @@ -39,32 +39,50 @@ impl TcpListener { Ok(TcpListener::new(l)) } + #[deprecated(since = "0.1.2", note = "use poll_accept instead")] + #[doc(hidden)] + pub fn accept(&mut self) -> io::Result<(TcpStream, SocketAddr)> { + match self.poll_accept()? { + Async::Ready(ret) => Ok(ret), + Async::NotReady => Err(io::ErrorKind::WouldBlock.into()), + } + } + /// Attempt to accept a connection and create a new connected `TcpStream` if /// successful. /// - /// This function will attempt an accept operation, but will not block - /// waiting for it to complete. If the operation would block then a "would - /// block" error is returned. Additionally, if this method would block, it - /// registers the current task to receive a notification when it would - /// otherwise not block. - /// /// Note that typically for simple usage it's easier to treat incoming /// connections as a `Stream` of `TcpStream`s with the `incoming` method /// below. /// + /// # Return + /// + /// On success, returns `Ok(Async::Ready((socket, addr)))`. + /// + /// If the listener is not ready to accept, the method returns + /// `Ok(Async::NotReady)` and arranges for the current task to receive a + /// notification when the listener becomes ready to accept. + /// /// # Panics /// - /// This function will panic if it is called outside the context of a - /// future's task. It's recommended to only call this from the - /// implementation of a `Future::poll`, if necessary. - pub fn accept(&mut self) -> io::Result<(TcpStream, SocketAddr)> { - let (io, addr) = self.accept_std()?; + /// This function will panic if called from outside of a task context. + pub fn poll_accept(&mut self) -> Poll<(TcpStream, SocketAddr), io::Error> { + let (io, addr) = try_ready!(self.poll_accept_std()); let io = mio::net::TcpStream::from_stream(io)?; let io = PollEvented2::new(io); let io = TcpStream { io }; - Ok((io, addr)) + Ok((io, addr).into()) + } + + #[deprecated(since = "0.1.2", note = "use poll_accept_std instead")] + #[doc(hidden)] + pub fn accept_std(&mut self) -> io::Result<(net::TcpStream, SocketAddr)> { + match self.poll_accept_std()? { + Async::Ready(ret) => Ok(ret), + Async::NotReady => Err(io::ErrorKind::WouldBlock.into()), + } } /// Attempt to accept a connection and create a new connected `TcpStream` if @@ -75,23 +93,27 @@ impl TcpListener { /// can then allow for the TCP stream to be assoiated with a different /// reactor than the one this `TcpListener` is associated with. /// + /// # Return + /// + /// On success, returns `Ok(Async::Ready((socket, addr)))`. + /// + /// If the listener is not ready to accept, the method returns + /// `Ok(Async::NotReady)` and arranges for the current task to receive a + /// notification when the listener becomes ready to accept. + /// /// # Panics /// - /// This function will panic for the same reasons as `accept`, notably if - /// called outside the context of a future. - pub fn accept_std(&mut self) -> io::Result<(net::TcpStream, SocketAddr)> { - if let Async::NotReady = self.io.poll_read_ready()? { - return Err(io::ErrorKind::WouldBlock.into()) - } + /// This function will panic if called from outside of a task context. + pub fn poll_accept_std(&mut self) -> Poll<(net::TcpStream, SocketAddr), io::Error> { + try_ready!(self.io.poll_read_ready()); match self.io.get_ref().accept_std() { - Ok(pair) => Ok(pair), - Err(e) => { - if e.kind() == io::ErrorKind::WouldBlock { - self.io.need_read()?; - } - Err(e) + Ok(pair) => Ok(pair.into()), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.need_read()?; + Ok(Async::NotReady) } + Err(e) => Err(e), } } @@ -181,7 +203,7 @@ impl Stream for Incoming { type Error = io::Error; fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> { - let (socket, _) = try_nb!(self.inner.accept()); + let (socket, _) = try_ready!(self.inner.poll_accept()); Ok(Async::Ready(Some(socket))) } } @@ -298,22 +320,41 @@ impl TcpStream { self.io.get_ref().peer_addr() } + #[deprecated(since = "0.1.2", note = "use poll_peek instead")] + #[doc(hidden)] + pub fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> { + match self.poll_peek(buf)? { + Async::Ready(n) => Ok(n), + Async::NotReady => Err(io::ErrorKind::WouldBlock.into()), + } + } + /// Receives data on the socket from the remote address to which it is /// connected, without removing that data from the queue. On success, /// returns the number of bytes peeked. /// /// Successive calls return the same data. This is accomplished by passing /// `MSG_PEEK` as a flag to the underlying recv system call. - pub fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> { - if let Async::NotReady = self.io.poll_read_ready()? { - return Err(io::ErrorKind::WouldBlock.into()) - } + /// + /// # Return + /// + /// On success, returns `Ok(Async::Ready(num_bytes_read))`. + /// + /// If no data is available for reading, the method returns + /// `Ok(Async::NotReady)` and arranges for the current task to receive a + /// notification when the socket becomes readable or is closed. + /// + /// # Panics + /// + /// This function will panic if called from outside of a task context. + pub fn poll_peek(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error> { + try_ready!(self.io.poll_read_ready()); match self.io.get_ref().peek(buf) { - Ok(v) => Ok(v), + Ok(ret) => Ok(ret.into()), Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { self.io.need_read()?; - Err(io::ErrorKind::WouldBlock.into()) + Ok(Async::NotReady) } Err(e) => Err(e), } diff --git a/src/net/udp/frame.rs b/src/net/udp/frame.rs index 77480a24..c06b2279 100644 --- a/src/net/udp/frame.rs +++ b/src/net/udp/frame.rs @@ -44,7 +44,7 @@ impl<C: Decoder> Stream for UdpFramed<C> { let (n, addr) = unsafe { // Read into the buffer without having to initialize the memory. - let (n, addr) = try_nb!(self.socket.recv_from(self.rd.bytes_mut())); + let (n, addr) = try_ready!(self.socket.poll_recv_from(self.rd.bytes_mut())); self.rd.advance_mut(n); (n, addr) }; @@ -87,7 +87,7 @@ impl<C: Encoder> Sink for UdpFramed<C> { } trace!("flushing frame; length={}", self.wr.len()); - let n = try_nb!(self.socket.send_to(&self.wr, &self.out_addr)); + let n = try_ready!(self.socket.poll_send_to(&self.wr, &self.out_addr)); trace!("written {}", n); let wrote_all = n == self.wr.len(); diff --git a/src/net/udp/mod.rs b/src/net/udp/mod.rs index bfac9404..2216d0a4 100644 --- a/src/net/udp/mod.rs +++ b/src/net/udp/mod.rs @@ -56,75 +56,127 @@ impl UdpSocket { self.io.get_ref().connect(*addr) } - /// Sends data on the socket to the address previously bound via connect(). - /// On success, returns the number of bytes written. + #[deprecated(since = "0.1.2", note = "use poll_send instead")] + #[doc(hidden)] + pub fn send(&mut self, buf: &[u8]) -> io::Result<usize> { + match self.poll_send(buf)? { + Async::Ready(n) => Ok(n), + Async::NotReady => Err(io::ErrorKind::WouldBlock.into()), + } + } + + /// Sends data on the socket to the remote address to which it is connected. + /// + /// The [`connect`] method will connect this socket to a remote address. This + /// method will fail if the socket is not connected. + /// + /// [`connect`]: #method.connect + /// + /// # Return + /// + /// On success, returns `Ok(Async::Ready(num_bytes_written))`. + /// + /// If the socket is not ready for writing, the method returns + /// `Ok(Async::NotReady)` and arranges for the current task to receive a + /// notification when the socket becomes writable. /// /// # Panics /// - /// This function will panic if called outside the context of a future's - /// task. - pub fn send(&mut self, buf: &[u8]) -> io::Result<usize> { - if let Async::NotReady = self.io.poll_write_ready()? { - return Err(io::ErrorKind::WouldBlock.into()) - } + /// This function will panic if called from outside of a task context. + pub fn poll_send(&mut self, buf: &[u8]) -> Poll<usize, io::Error> { + try_ready!(self.io.poll_write_ready()); match self.io.get_ref().send(buf) { - Ok(n) => Ok(n), - Err(e) => { - if e.kind() == io::ErrorKind::WouldBlock { - self.io.need_write()?; - } - Err(e) + Ok(n) => Ok(n.into()), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.need_write()?; + Ok(Async::NotReady) } + Err(e) => Err(e), + } + } + + #[deprecated(since = "0.1.2", note = "use poll_recv instead")] + #[doc(hidden)] + pub fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> { + match self.poll_recv(buf)? { + Async::Ready(n) => Ok(n), + Async::NotReady => Err(io::ErrorKind::WouldBlock.into()), } } - /// Receives data from the socket previously bound with connect(). - /// On success, returns the number of bytes read. + /// Receives a single datagram message on the socket from the remote address to + /// which it is connected. On success, returns the number of bytes read. + /// + /// The function must be called with valid byte array `buf` of sufficient size to + /// hold the message bytes. If a message is too long to fit in the supplied buffer, + /// excess bytes may be discarded. + /// + /// The [`connect`] method will connect this socket to a remote address. This + /// method will fail if the socket is not connected. + /// + /// [`connect`]: #method.connect + /// + /// # Return + /// + /// On success, returns `Ok(Async::Ready(num_bytes_read))`. + /// + /// If no data is available for reading, the method returns + /// `Ok(Async::NotReady)` and arranges for the current task to receive a + /// notification when the socket becomes receivable or is closed. /// /// # Panics /// - /// This function will panic if called outside the context of a future's - /// task. - pub fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> { - if let Async::NotReady = self.io.poll_read_ready()? { - return Err(io::ErrorKind::WouldBlock.into()) - } + /// This function will panic if called from outside of a task context. + pub fn poll_recv(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error> { + try_ready!(self.io.poll_read_ready()); match self.io.get_ref().recv(buf) { - Ok(n) => Ok(n), - Err(e) => { - if e.kind() == io::ErrorKind::WouldBlock { - self.io.need_read()?; - } - Err(e) + Ok(n) => Ok(n.into()), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.need_read()?; + Ok(Async::NotReady) } + Err(e) => Err(e), + } + } + + #[deprecated(since = "0.1.2", note = "use poll_send_to instead")] + #[doc(hidden)] + pub fn send_to(&mut self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> { + match self.poll_send_to(buf, target)? { + Async::Ready(n) => Ok(n), + Async::NotReady => Err(io::ErrorKind::WouldBlock.into()), } } /// Sends data on the socket to the given address. On success, returns the /// number of bytes written. /// - /// Address type can be any implementer of `ToSocketAddrs` trait. See its - /// documentation for concrete examples. + /// This will return an error when the IP version of the local socket + /// does not match that of `target`. + /// + /// # Return + /// + /// On success, returns `Ok(Async::Ready(num_bytes_written))`. + /// + /// If the socket is not ready for writing, the method returns + /// `Ok(Async::NotReady)` and arranges for the current task to receive a + /// notification when the socket becomes writable. /// /// # Panics /// - /// This function will panic if called outside the context of a future's - /// task. - pub fn send_to(&mut self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> { - if let Async::NotReady = self.io.poll_write_ready()? { - return Err(io::ErrorKind::WouldBlock.into()) - } + /// This function will panic if called from outside of a task context. + pub fn poll_send_to(&mut self, buf: &[u8], target: &SocketAddr) -> Poll<usize, io::Error> { + try_ready!(self.io.poll_write_ready()); match self.io.get_ref().send_to(buf, target) { - Ok(n) => Ok(n), - Err(e) => { - if e.kind() == io::ErrorKind::WouldBlock { - self.io.need_write()?; - } - Err(e) + Ok(n) => Ok(n.into()), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.need_write()?; + Ok(Async::NotReady) } + Err(e) => Err(e), } } @@ -148,6 +200,15 @@ impl UdpSocket { SendDgram::new(self, buf, *addr) } + #[deprecated(since = "0.1.2", note = "use poll_recv_from instead")] + #[doc(hidden)] + pub fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + match self.poll_recv_from(buf)? { + Async::Ready(ret) => Ok(ret), + Async::NotReady => Err(io::ErrorKind::WouldBlock.into()), + } + } + /// Receives data from the socket. On success, returns the number of bytes /// read and the address from whence the data came. /// @@ -155,19 +216,16 @@ impl UdpSocket { /// /// This function will panic if called outside the context of a future's /// task. - pub fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - if let Async::NotReady = self.io.poll_read_ready()? { - return Err(io::ErrorKind::WouldBlock.into()) - } + pub fn poll_recv_from(&mut self, buf: &mut [u8]) -> Poll<(usize, SocketAddr), io::Error> { + try_ready!(self.io.poll_read_ready()); match self.io.get_ref().recv_from(buf) { - Ok(n) => Ok(n), - Err(e) => { - if e.kind() == io::ErrorKind::WouldBlock { - self.io.need_read()?; - } - Err(e) + Ok(n) => Ok(n.into()), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.need_read()?; + Ok(Async::NotReady) } + Err(e) => Err(e), } } @@ -384,7 +442,7 @@ impl<T> Future for SendDgram<T> { let ref mut inner = self.state.as_mut().expect("SendDgram polled after completion"); - let n = try_nb!(inner.socket.send_to(inner.buffer.as_ref(), &inner.addr)); + let n = try_ready!(inner.socket.poll_send_to(inner.buffer.as_ref(), &inner.addr)); if n != inner.buffer.as_ref().len() { return Err(incomplete_write("failed to send entire message \ in datagram")) @@ -436,7 +494,7 @@ impl<T> Future for RecvDgram<T> let ref mut inner = self.state.as_mut().expect("RecvDgram polled after completion"); - try_nb!(inner.socket.recv_from(inner.buffer.as_mut())) + try_ready!(inner.socket.poll_recv_from(inner.buffer.as_mut())) }; let inner = self.state.take().unwrap(); diff --git a/tests/udp.rs b/tests/udp.rs index 162af700..8faebee6 100644 --- a/tests/udp.rs +++ b/tests/udp.rs @@ -1,3 +1,5 @@ +#![allow(deprecated)] + extern crate futures; extern crate tokio; #[macro_use] |