summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSean McArthur <sean@seanmonstar.com>2018-03-22 09:59:13 -0700
committerCarl Lerche <me@carllerche.com>2018-03-22 09:59:13 -0700
commite5ebd02885da0b3927d26a4a23bd0e5297d3d262 (patch)
tree74a1672c549323039db4a76183275fe134a4820a
parent08c21e7bac0c0163d236943aaa3c5738e4b5bc2e (diff)
implement poll_vectored_* and initializer method for futures2 (#242)
-rw-r--r--tokio-tcp/src/stream.rs62
1 files changed, 62 insertions, 0 deletions
diff --git a/tokio-tcp/src/stream.rs b/tokio-tcp/src/stream.rs
index d402952c..a8e67c0a 100644
--- a/tokio-tcp/src/stream.rs
+++ b/tokio-tcp/src/stream.rs
@@ -411,6 +411,16 @@ impl futures2::io::AsyncRead for TcpStream {
{
futures2::io::AsyncRead::poll_read(&mut self.io, cx, buf)
}
+
+ fn poll_vectored_read(&mut self, cx: &mut futures2::task::Context, vec: &mut [&mut IoVec])
+ -> futures2::Poll<usize, io::Error>
+ {
+ futures2::io::AsyncRead::poll_vectored_read(&mut &*self, cx, vec)
+ }
+
+ unsafe fn initializer(&self) -> futures2::io::Initializer {
+ futures2::io::Initializer::nop()
+ }
}
impl AsyncWrite for TcpStream {
@@ -431,6 +441,12 @@ impl futures2::io::AsyncWrite for TcpStream {
futures2::io::AsyncWrite::poll_write(&mut self.io, cx, buf)
}
+ fn poll_vectored_write(&mut self, cx: &mut futures2::task::Context, vec: &[&IoVec])
+ -> futures2::Poll<usize, io::Error>
+ {
+ futures2::io::AsyncWrite::poll_vectored_write(&mut &*self, cx, vec)
+ }
+
fn poll_flush(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error> {
futures2::io::AsyncWrite::poll_flush(&mut self.io, cx)
}
@@ -519,6 +535,31 @@ impl<'a> futures2::io::AsyncRead for &'a TcpStream {
{
futures2::io::AsyncRead::poll_read(&mut &self.io, cx, buf)
}
+
+ fn poll_vectored_read(&mut self, cx: &mut futures2::task::Context, vec: &mut [&mut IoVec])
+ -> futures2::Poll<usize, io::Error>
+ {
+ if let futures2::Async::Pending = self.io.poll_read_ready2(cx, mio::Ready::readable())? {
+ return Ok(futures2::Async::Pending)
+ }
+
+ let r = self.io.get_ref().read_bufs(vec);
+
+ match r {
+ Ok(n) => {
+ Ok(futures2::Async::Ready(n))
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_read_ready2(cx, mio::Ready::readable())?;
+ Ok(futures2::Async::Pending)
+ }
+ Err(e) => Err(e),
+ }
+ }
+
+ unsafe fn initializer(&self) -> futures2::io::Initializer {
+ futures2::io::Initializer::nop()
+ }
}
impl<'a> AsyncWrite for &'a TcpStream {
@@ -563,6 +604,27 @@ impl<'a> futures2::io::AsyncWrite for &'a TcpStream {
futures2::io::AsyncWrite::poll_write(&mut &self.io, cx, buf)
}
+ fn poll_vectored_write(&mut self, cx: &mut futures2::task::Context, vec: &[&IoVec])
+ -> futures2::Poll<usize, io::Error>
+ {
+ if let futures2::Async::Pending = self.io.poll_write_ready2(cx)? {
+ return Ok(futures2::Async::Pending)
+ }
+
+ let r = self.io.get_ref().write_bufs(vec);
+
+ match r {
+ Ok(n) => {
+ Ok(futures2::Async::Ready(n))
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_write_ready()?;
+ Ok(futures2::Async::Pending)
+ }
+ Err(e) => Err(e),
+ }
+ }
+
fn poll_flush(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error> {
futures2::io::AsyncWrite::poll_flush(&mut &self.io, cx)
}