diff options
author | Roman <humbug@deeptown.org> | 2018-02-12 21:52:05 +0400 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2018-02-12 09:52:05 -0800 |
commit | 88a7030f730a4ebf66f82460558bbb0e13c02bae (patch) | |
tree | b279495f356f95100c67ecac10f11fe05bdeab37 /tokio-io | |
parent | 35aeabd3ffda78fea756b786bbf0fe38f7e71d83 (diff) |
Split io code (#129)
* move src/io.rs -> src/io/mod.rs
* move src/read.rs -> src/io/read.rs
* move src/read_exact.rs -> src/io/read_exact.rs
* move src/read_until.rs -> src/io/read_until.rs
* move src/read_to_end.rs -> src/io/read_to_end.rs
* move src/flush.rs -> src/io/flush.rs
* move src/copy.rs -> src/io/copy.rs
* move src/shutdown.rs -> src/io/shutdown.rs
* move src/write_all.rs -> src/io/write_all.rs
* move Async{Read,Write} => src/io/async_{read,write}.rs
* move Async{Read,Write} => src/async_{read,write}.rs
Diffstat (limited to 'tokio-io')
-rw-r--r-- | tokio-io/src/async_read.rs | 145 | ||||
-rw-r--r-- | tokio-io/src/async_write.rs | 181 | ||||
-rw-r--r-- | tokio-io/src/io/copy.rs (renamed from tokio-io/src/copy.rs) | 0 | ||||
-rw-r--r-- | tokio-io/src/io/flush.rs (renamed from tokio-io/src/flush.rs) | 0 | ||||
-rw-r--r-- | tokio-io/src/io/mod.rs (renamed from tokio-io/src/io.rs) | 25 | ||||
-rw-r--r-- | tokio-io/src/io/read.rs (renamed from tokio-io/src/read.rs) | 0 | ||||
-rw-r--r-- | tokio-io/src/io/read_exact.rs (renamed from tokio-io/src/read_exact.rs) | 0 | ||||
-rw-r--r-- | tokio-io/src/io/read_to_end.rs (renamed from tokio-io/src/read_to_end.rs) | 0 | ||||
-rw-r--r-- | tokio-io/src/io/read_until.rs (renamed from tokio-io/src/read_until.rs) | 0 | ||||
-rw-r--r-- | tokio-io/src/io/shutdown.rs (renamed from tokio-io/src/shutdown.rs) | 0 | ||||
-rw-r--r-- | tokio-io/src/io/write_all.rs (renamed from tokio-io/src/write_all.rs) | 0 | ||||
-rw-r--r-- | tokio-io/src/lib.rs | 331 |
12 files changed, 348 insertions, 334 deletions
diff --git a/tokio-io/src/async_read.rs b/tokio-io/src/async_read.rs new file mode 100644 index 00000000..fb72c14b --- /dev/null +++ b/tokio-io/src/async_read.rs @@ -0,0 +1,145 @@ +use std::io as std_io; +use bytes::BufMut; +use futures::{Async, Poll}; + +use {framed, split, AsyncWrite}; +use codec::{Decoder, Encoder, Framed}; +use split::{ReadHalf, WriteHalf}; + +/// A trait for readable objects which operated in an asynchronous and +/// futures-aware fashion. +/// +/// This trait inherits from `io::Read` and indicates as a marker that an I/O +/// object is **nonblocking**, meaning that it will return an error instead of +/// blocking when bytes are unavailable, but the stream hasn't reached EOF. +/// Specifically this means that the `read` function for types that implement +/// this trait can have a few return values: +/// +/// * `Ok(n)` means that `n` bytes of data was immediately read and placed into +/// the output buffer, where `n` == 0 implies that EOF has been reached. +/// * `Err(e) if e.kind() == ErrorKind::WouldBlock` means that no data was read +/// into the buffer provided. The I/O object is not currently readable but may +/// become readable in the future. Most importantly, **the current future's +/// task is scheduled to get unparked when the object is readable**. This +/// means that like `Future::poll` you'll receive a notification when the I/O +/// object is readable again. +/// * `Err(e)` for other errors are standard I/O errors coming from the +/// underlying object. +/// +/// This trait importantly means that the `read` method only works in the +/// context of a future's task. The object may panic if used outside of a task. +pub trait AsyncRead: std_io::Read { + /// Prepares an uninitialized buffer to be safe to pass to `read`. Returns + /// `true` if the supplied buffer was zeroed out. + /// + /// While it would be highly unusual, implementations of [`io::Read`] are + /// able to read data from the buffer passed as an argument. Because of + /// this, the buffer passed to [`io::Read`] must be initialized memory. In + /// situations where large numbers of buffers are used, constantly having to + /// zero out buffers can be expensive. + /// + /// This function does any necessary work to prepare an uninitialized buffer + /// to be safe to pass to `read`. If `read` guarantees to never attempt read + /// data out of the supplied buffer, then `prepare_uninitialized_buffer` + /// doesn't need to do any work. + /// + /// If this function returns `true`, then the memory has been zeroed out. + /// This allows implementations of `AsyncRead` which are composed of + /// multiple sub implementations to efficiently implement + /// `prepare_uninitialized_buffer`. + /// + /// This function isn't actually `unsafe` to call but `unsafe` to implement. + /// The implementor must ensure that either the whole `buf` has been zeroed + /// or `read_buf()` overwrites the buffer without reading it and returns + /// correct value. + /// + /// This function is called from [`read_buf`]. + /// + /// [`io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html + /// [`read_buf`]: #method.read_buf + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + for i in 0..buf.len() { + buf[i] = 0; + } + + true + } + + /// Pull some bytes from this source into the specified `Buf`, returning + /// how many bytes were read. + /// + /// The `buf` provided will have bytes read into it and the internal cursor + /// will be advanced if any bytes were read. Note that this method typically + /// will not reallocate the buffer provided. + fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, std_io::Error> + where Self: Sized, + { + if !buf.has_remaining_mut() { + return Ok(Async::Ready(0)); + } + + unsafe { + let n = { + let b = buf.bytes_mut(); + + self.prepare_uninitialized_buffer(b); + + try_nb!(self.read(b)) + }; + + buf.advance_mut(n); + Ok(Async::Ready(n)) + } + } + + /// Provides a `Stream` and `Sink` interface for reading and writing to this + /// `Io` object, using `Decode` and `Encode` to read and write the raw data. + /// + /// Raw I/O objects work with byte sequences, but higher-level code usually + /// wants to batch these into meaningful chunks, called "frames". This + /// method layers framing on top of an I/O object, by using the `Codec` + /// traits to handle encoding and decoding of messages frames. Note that + /// the incoming and outgoing frame types may be distinct. + /// + /// This function returns a *single* object that is both `Stream` and + /// `Sink`; grouping this into a single object is often useful for layering + /// things like gzip or TLS, which require both read and write access to the + /// underlying object. + /// + /// If you want to work more directly with the streams and sink, consider + /// calling `split` on the `Framed` returned by this method, which will + /// break them into separate objects, allowing them to interact more easily. + fn framed<T: Encoder + Decoder>(self, codec: T) -> Framed<Self, T> + where Self: AsyncWrite + Sized, + { + framed::framed(self, codec) + } + + /// Helper method for splitting this read/write object into two halves. + /// + /// The two halves returned implement the `Read` and `Write` traits, + /// respectively. + fn split(self) -> (ReadHalf<Self>, WriteHalf<Self>) + where Self: AsyncWrite + Sized, + { + split::split(self) + } +} + +impl<T: ?Sized + AsyncRead> AsyncRead for Box<T> { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + (**self).prepare_uninitialized_buffer(buf) + } +} + +impl<'a, T: ?Sized + AsyncRead> AsyncRead for &'a mut T { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + (**self).prepare_uninitialized_buffer(buf) + } +} + +impl<'a> AsyncRead for &'a [u8] { + unsafe fn prepare_uninitialized_buffer(&self, _buf: &mut [u8]) -> bool { + false + } +} diff --git a/tokio-io/src/async_write.rs b/tokio-io/src/async_write.rs new file mode 100644 index 00000000..e152cd5a --- /dev/null +++ b/tokio-io/src/async_write.rs @@ -0,0 +1,181 @@ +use std::io as std_io; +use std::io::Write; +use bytes::Buf; +use futures::{Async, Poll}; + +use AsyncRead; + +/// A trait for writable objects which operated in an asynchronous and +/// futures-aware fashion. +/// +/// This trait inherits from `io::Write` and indicates that an I/O object is +/// **nonblocking**, meaning that it will return an error instead of blocking +/// when bytes cannot currently be written, but hasn't closed. Specifically +/// this means that the `write` function for types that implement this trait +/// can have a few return values: +/// +/// * `Ok(n)` means that `n` bytes of data was immediately written . +/// * `Err(e) if e.kind() == ErrorKind::WouldBlock` means that no data was +/// written from the buffer provided. The I/O object is not currently +/// writable but may become writable in the future. Most importantly, **the +/// current future's task is scheduled to get unparked when the object is +/// readable**. This means that like `Future::poll` you'll receive a +/// notification when the I/O object is writable again. +/// * `Err(e)` for other errors are standard I/O errors coming from the +/// underlying object. +/// +/// This trait importantly means that the `write` method only works in the +/// context of a future's task. The object may panic if used outside of a task. +/// +/// Note that this trait also represents that the `Write::flush` method works +/// very similarly to the `write` method, notably that `Ok(())` means that the +/// writer has successfully been flushed, a "would block" error means that the +/// current task is ready to receive a notification when flushing can make more +/// progress, and otherwise normal errors can happen as well. +pub trait AsyncWrite: std_io::Write { + /// Initiates or attempts to shut down this writer, returning success when + /// the I/O connection has completely shut down. + /// + /// This method is intended to be used for asynchronous shutdown of I/O + /// connections. For example this is suitable for implementing shutdown of a + /// TLS connection or calling `TcpStream::shutdown` on a proxied connection. + /// Protocols sometimes need to flush out final pieces of data or otherwise + /// perform a graceful shutdown handshake, reading/writing more data as + /// appropriate. This method is the hook for such protocols to implement the + /// graceful shutdown logic. + /// + /// This `shutdown` method is required by implementors of the + /// `AsyncWrite` trait. Wrappers typically just want to proxy this call + /// through to the wrapped type, and base types will typically implement + /// shutdown logic here or just return `Ok(().into())`. Note that if you're + /// wrapping an underlying `AsyncWrite` a call to `shutdown` implies that + /// transitively the entire stream has been shut down. After your wrapper's + /// shutdown logic has been executed you should shut down the underlying + /// stream. + /// + /// Invocation of a `shutdown` implies an invocation of `flush`. Once this + /// method returns `Ready` it implies that a flush successfully happened + /// before the shutdown happened. That is, callers don't need to call + /// `flush` before calling `shutdown`. They can rely that by calling + /// `shutdown` any pending buffered data will be written out. + /// + /// # Return value + /// + /// This function returns a `Poll<(), io::Error>` classified as such: + /// + /// * `Ok(Async::Ready(()))` - indicates that the connection was + /// successfully shut down and is now safe to deallocate/drop/close + /// resources associated with it. This method means that the current task + /// will no longer receive any notifications due to this method and the + /// I/O object itself is likely no longer usable. + /// + /// * `Ok(Async::NotReady)` - indicates that shutdown is initiated but could + /// not complete just yet. This may mean that more I/O needs to happen to + /// continue this shutdown operation. The current task is scheduled to + /// receive a notification when it's otherwise ready to continue the + /// shutdown operation. When woken up this method should be called again. + /// + /// * `Err(e)` - indicates a fatal error has happened with shutdown, + /// indicating that the shutdown operation did not complete successfully. + /// This typically means that the I/O object is no longer usable. + /// + /// # Errors + /// + /// This function can return normal I/O errors through `Err`, described + /// above. Additionally this method may also render the underlying + /// `Write::write` method no longer usable (e.g. will return errors in the + /// future). It's recommended that once `shutdown` is called the + /// `write` method is no longer called. + /// + /// # Panics + /// + /// This function will panic if not called within the context of a future's + /// task. + fn shutdown(&mut self) -> Poll<(), std_io::Error>; + + /// Write a `Buf` into this value, returning how many bytes were written. + /// + /// Note that this method will advance the `buf` provided automatically by + /// the number of bytes written. + fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, std_io::Error> + where Self: Sized, + { + if !buf.has_remaining() { + return Ok(Async::Ready(0)); + } + + let n = try_nb!(self.write(buf.bytes())); + buf.advance(n); + Ok(Async::Ready(n)) + } +} + +impl<T: ?Sized + AsyncWrite> AsyncWrite for Box<T> { + fn shutdown(&mut self) -> Poll<(), std_io::Error> { + (**self).shutdown() + } +} +impl<'a, T: ?Sized + AsyncWrite> AsyncWrite for &'a mut T { + fn shutdown(&mut self) -> Poll<(), std_io::Error> { + (**self).shutdown() + } +} + +impl AsyncRead for std_io::Repeat { + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { + false + } +} + +impl AsyncWrite for std_io::Sink { + fn shutdown(&mut self) -> Poll<(), std_io::Error> { + Ok(().into()) + } +} + +// TODO: Implement `prepare_uninitialized_buffer` for `io::Take`. +// This is blocked on rust-lang/rust#27269 +impl<T: AsyncRead> AsyncRead for std_io::Take<T> { +} + +// TODO: Implement `prepare_uninitialized_buffer` when upstream exposes inner +// parts +impl<T, U> AsyncRead for std_io::Chain<T, U> + where T: AsyncRead, + U: AsyncRead, +{ +} + +impl<T: AsyncWrite> AsyncWrite for std_io::BufWriter<T> { + fn shutdown(&mut self) -> Poll<(), std_io::Error> { + try_nb!(self.flush()); + self.get_mut().shutdown() + } +} + +impl<T: AsyncRead> AsyncRead for std_io::BufReader<T> { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.get_ref().prepare_uninitialized_buffer(buf) + } +} + +impl<T: AsRef<[u8]>> AsyncRead for std_io::Cursor<T> { +} + +impl<'a> AsyncWrite for std_io::Cursor<&'a mut [u8]> { + fn shutdown(&mut self) -> Poll<(), std_io::Error> { + Ok(().into()) + } +} + +impl AsyncWrite for std_io::Cursor<Vec<u8>> { + fn shutdown(&mut self) -> Poll<(), std_io::Error> { + Ok(().into()) + } +} + +impl AsyncWrite for std_io::Cursor<Box<[u8]>> { + fn shutdown(&mut self) -> Poll<(), std_io::Error> { + Ok(().into()) + } +} diff --git a/tokio-io/src/copy.rs b/tokio-io/src/io/copy.rs index 8b8c0fe4..8b8c0fe4 100644 --- a/tokio-io/src/copy.rs +++ b/tokio-io/src/io/copy.rs diff --git a/tokio-io/src/flush.rs b/tokio-io/src/io/flush.rs index 29065f9f..29065f9f 100644 --- a/tokio-io/src/flush.rs +++ b/tokio-io/src/io/flush.rs diff --git a/tokio-io/src/io.rs b/tokio-io/src/io/mod.rs index 0d8b5e98..95eea095 100644 --- a/tokio-io/src/io.rs +++ b/tokio-io/src/io/mod.rs @@ -9,15 +9,24 @@ //! [found online]: https://tokio.rs/docs/getting-started/core/ //! [low level details]: https://tokio.rs/docs/going-deeper-tokio/core-low-level/ +mod copy; +mod flush; +mod read; +mod read_exact; +mod read_to_end; +mod read_until; +mod shutdown; +mod write_all; + pub use allow_std::AllowStdIo; -pub use copy::{copy, Copy}; -pub use flush::{flush, Flush}; +pub use self::copy::{copy, Copy}; +pub use self::flush::{flush, Flush}; pub use lines::{lines, Lines}; -pub use read::{read, Read}; -pub use read_exact::{read_exact, ReadExact}; -pub use read_to_end::{read_to_end, ReadToEnd}; -pub use read_until::{read_until, ReadUntil}; -pub use shutdown::{shutdown, Shutdown}; +pub use self::read::{read, Read}; +pub use self::read_exact::{read_exact, ReadExact}; +pub use self::read_to_end::{read_to_end, ReadToEnd}; +pub use self::read_until::{read_until, ReadUntil}; +pub use self::shutdown::{shutdown, Shutdown}; pub use split::{ReadHalf, WriteHalf}; pub use window::Window; -pub use write_all::{write_all, WriteAll}; +pub use self::write_all::{write_all, WriteAll}; diff --git a/tokio-io/src/read.rs b/tokio-io/src/io/read.rs index abfb459c..abfb459c 100644 --- a/tokio-io/src/read.rs +++ b/tokio-io/src/io/read.rs diff --git a/tokio-io/src/read_exact.rs b/tokio-io/src/io/read_exact.rs index 14251242..14251242 100644 --- a/tokio-io/src/read_exact.rs +++ b/tokio-io/src/io/read_exact.rs diff --git a/tokio-io/src/read_to_end.rs b/tokio-io/src/io/read_to_end.rs index 519f8a76..519f8a76 100644 --- a/tokio-io/src/read_to_end.rs +++ b/tokio-io/src/io/read_to_end.rs diff --git a/tokio-io/src/read_until.rs b/tokio-io/src/io/read_until.rs index 73c98292..73c98292 100644 --- a/tokio-io/src/read_until.rs +++ b/tokio-io/src/io/read_until.rs diff --git a/tokio-io/src/shutdown.rs b/tokio-io/src/io/shutdown.rs index 96a8886d..96a8886d 100644 --- a/tokio-io/src/shutdown.rs +++ b/tokio-io/src/io/shutdown.rs diff --git a/tokio-io/src/write_all.rs b/tokio-io/src/io/write_all.rs index d000f1b9..d000f1b9 100644 --- a/tokio-io/src/write_all.rs +++ b/tokio-io/src/io/write_all.rs diff --git a/tokio-io/src/lib.rs b/tokio-io/src/lib.rs index 58dbeedf..75344e9a 100644 --- a/tokio-io/src/lib.rs +++ b/tokio-io/src/lib.rs @@ -17,10 +17,8 @@ extern crate futures; extern crate bytes; use std::io as std_io; -use std::io::Write; -use futures::{Async, Future, Poll, Stream}; -use bytes::{Buf, BufMut}; +use futures::{Future, Stream}; /// A convenience typedef around a `Future` whose error component is `io::Error` pub type IoFuture<T> = Box<Future<Item = T, Error = std_io::Error> + Send>; @@ -49,337 +47,18 @@ pub mod io; pub mod codec; mod allow_std; -mod copy; -mod flush; +mod async_read; +mod async_write; mod framed; mod framed_read; mod framed_write; mod length_delimited; mod lines; -mod read; -mod read_exact; -mod read_to_end; -mod read_until; -mod shutdown; mod split; mod window; -mod write_all; -use codec::{Decoder, Encoder, Framed}; -use split::{ReadHalf, WriteHalf}; - -/// A trait for readable objects which operated in an asynchronous and -/// futures-aware fashion. -/// -/// This trait inherits from `io::Read` and indicates as a marker that an I/O -/// object is **nonblocking**, meaning that it will return an error instead of -/// blocking when bytes are unavailable, but the stream hasn't reached EOF. -/// Specifically this means that the `read` function for types that implement -/// this trait can have a few return values: -/// -/// * `Ok(n)` means that `n` bytes of data was immediately read and placed into -/// the output buffer, where `n` == 0 implies that EOF has been reached. -/// * `Err(e) if e.kind() == ErrorKind::WouldBlock` means that no data was read -/// into the buffer provided. The I/O object is not currently readable but may -/// become readable in the future. Most importantly, **the current future's -/// task is scheduled to get unparked when the object is readable**. This -/// means that like `Future::poll` you'll receive a notification when the I/O -/// object is readable again. -/// * `Err(e)` for other errors are standard I/O errors coming from the -/// underlying object. -/// -/// This trait importantly means that the `read` method only works in the -/// context of a future's task. The object may panic if used outside of a task. -pub trait AsyncRead: std_io::Read { - /// Prepares an uninitialized buffer to be safe to pass to `read`. Returns - /// `true` if the supplied buffer was zeroed out. - /// - /// While it would be highly unusual, implementations of [`io::Read`] are - /// able to read data from the buffer passed as an argument. Because of - /// this, the buffer passed to [`io::Read`] must be initialized memory. In - /// situations where large numbers of buffers are used, constantly having to - /// zero out buffers can be expensive. - /// - /// This function does any necessary work to prepare an uninitialized buffer - /// to be safe to pass to `read`. If `read` guarantees to never attempt read - /// data out of the supplied buffer, then `prepare_uninitialized_buffer` - /// doesn't need to do any work. - /// - /// If this function returns `true`, then the memory has been zeroed out. - /// This allows implementations of `AsyncRead` which are composed of - /// multiple sub implementations to efficiently implement - /// `prepare_uninitialized_buffer`. - /// - /// This function isn't actually `unsafe` to call but `unsafe` to implement. - /// The implementor must ensure that either the whole `buf` has been zeroed - /// or `read_buf()` overwrites the buffer without reading it and returns - /// correct value. - /// - /// This function is called from [`read_buf`]. - /// - /// [`io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html - /// [`read_buf`]: #method.read_buf - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - for i in 0..buf.len() { - buf[i] = 0; - } - - true - } - - /// Pull some bytes from this source into the specified `Buf`, returning - /// how many bytes were read. - /// - /// The `buf` provided will have bytes read into it and the internal cursor - /// will be advanced if any bytes were read. Note that this method typically - /// will not reallocate the buffer provided. - fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, std_io::Error> - where Self: Sized, - { - if !buf.has_remaining_mut() { - return Ok(Async::Ready(0)); - } - - unsafe { - let n = { - let b = buf.bytes_mut(); - - self.prepare_uninitialized_buffer(b); - - try_nb!(self.read(b)) - }; - - buf.advance_mut(n); - Ok(Async::Ready(n)) - } - } - - /// Provides a `Stream` and `Sink` interface for reading and writing to this - /// `Io` object, using `Decode` and `Encode` to read and write the raw data. - /// - /// Raw I/O objects work with byte sequences, but higher-level code usually - /// wants to batch these into meaningful chunks, called "frames". This - /// method layers framing on top of an I/O object, by using the `Codec` - /// traits to handle encoding and decoding of messages frames. Note that - /// the incoming and outgoing frame types may be distinct. - /// - /// This function returns a *single* object that is both `Stream` and - /// `Sink`; grouping this into a single object is often useful for layering - /// things like gzip or TLS, which require both read and write access to the - /// underlying object. - /// - /// If you want to work more directly with the streams and sink, consider - /// calling `split` on the `Framed` returned by this method, which will - /// break them into separate objects, allowing them to interact more easily. - fn framed<T: Encoder + Decoder>(self, codec: T) -> Framed<Self, T> - where Self: AsyncWrite + Sized, - { - framed::framed(self, codec) - } - - /// Helper method for splitting this read/write object into two halves. - /// - /// The two halves returned implement the `Read` and `Write` traits, - /// respectively. - fn split(self) -> (ReadHalf<Self>, WriteHalf<Self>) - where Self: AsyncWrite + Sized, - { - split::split(self) - } -} - -impl<T: ?Sized + AsyncRead> AsyncRead for Box<T> { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - (**self).prepare_uninitialized_buffer(buf) - } -} - -impl<'a, T: ?Sized + AsyncRead> AsyncRead for &'a mut T { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - (**self).prepare_uninitialized_buffer(buf) - } -} - -impl<'a> AsyncRead for &'a [u8] { - unsafe fn prepare_uninitialized_buffer(&self, _buf: &mut [u8]) -> bool { - false - } -} - -/// A trait for writable objects which operated in an asynchronous and -/// futures-aware fashion. -/// -/// This trait inherits from `io::Write` and indicates that an I/O object is -/// **nonblocking**, meaning that it will return an error instead of blocking -/// when bytes cannot currently be written, but hasn't closed. Specifically -/// this means that the `write` function for types that implement this trait -/// can have a few return values: -/// -/// * `Ok(n)` means that `n` bytes of data was immediately written . -/// * `Err(e) if e.kind() == ErrorKind::WouldBlock` means that no data was -/// written from the buffer provided. The I/O object is not currently -/// writable but may become writable in the future. Most importantly, **the -/// current future's task is scheduled to get unparked when the object is -/// readable**. This means that like `Future::poll` you'll receive a -/// notification when the I/O object is writable again. -/// * `Err(e)` for other errors are standard I/O errors coming from the -/// underlying object. -/// -/// This trait importantly means that the `write` method only works in the -/// context of a future's task. The object may panic if used outside of a task. -/// -/// Note that this trait also represents that the `Write::flush` method works -/// very similarly to the `write` method, notably that `Ok(())` means that the -/// writer has successfully been flushed, a "would block" error means that the -/// current task is ready to receive a notification when flushing can make more -/// progress, and otherwise normal errors can happen as well. -pub trait AsyncWrite: std_io::Write { - /// Initiates or attempts to shut down this writer, returning success when - /// the I/O connection has completely shut down. - /// - /// This method is intended to be used for asynchronous shutdown of I/O - /// connections. For example this is suitable for implementing shutdown of a - /// TLS connection or calling `TcpStream::shutdown` on a proxied connection. - /// Protocols sometimes need to flush out final pieces of data or otherwise - /// perform a graceful shutdown handshake, reading/writing more data as - /// appropriate. This method is the hook for such protocols to implement the - /// graceful shutdown logic. - /// - /// This `shutdown` method is required by implementors of the - /// `AsyncWrite` trait. Wrappers typically just want to proxy this call - /// through to the wrapped type, and base types will typically implement - /// shutdown logic here or just return `Ok(().into())`. Note that if you're - /// wrapping an underlying `AsyncWrite` a call to `shutdown` implies that - /// transitively the entire stream has been shut down. After your wrapper's - /// shutdown logic has been executed you should shut down the underlying - /// stream. - /// - /// Invocation of a `shutdown` implies an invocation of `flush`. Once this - /// method returns `Ready` it implies that a flush successfully happened - /// before the shutdown happened. That is, callers don't need to call - /// `flush` before calling `shutdown`. They can rely that by calling - /// `shutdown` any pending buffered data will be written out. - /// - /// # Return value - /// - /// This function returns a `Poll<(), io::Error>` classified as such: - /// - /// * `Ok(Async::Ready(()))` - indicates that the connection was - /// successfully shut down and is now safe to deallocate/drop/close - /// resources associated with it. This method means that the current task - /// will no longer receive any notifications due to this method and the - /// I/O object itself is likely no longer usable. - /// - /// * `Ok(Async::NotReady)` - indicates that shutdown is initiated but could - /// not complete just yet. This may mean that more I/O needs to happen to - /// continue this shutdown operation. The current task is scheduled to - /// receive a notification when it's otherwise ready to continue the - /// shutdown operation. When woken up this method should be called again. - /// - /// * `Err(e)` - indicates a fatal error has happened with shutdown, - /// indicating that the shutdown operation did not complete successfully. - /// This typically means that the I/O object is no longer usable. - /// - /// # Errors - /// - /// This function can return normal I/O errors through `Err`, described - /// above. Additionally this method may also render the underlying - /// `Write::write` method no longer usable (e.g. will return errors in the - /// future). It's recommended that once `shutdown` is called the - /// `write` method is no longer called. - /// - /// # Panics - /// - /// This function will panic if not called within the context of a future's - /// task. - fn shutdown(&mut self) -> Poll<(), std_io::Error>; - - /// Write a `Buf` into this value, returning how many bytes were written. - /// - /// Note that this method will advance the `buf` provided automatically by - /// the number of bytes written. - fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, std_io::Error> - where Self: Sized, - { - if !buf.has_remaining() { - return Ok(Async::Ready(0)); - } - - let n = try_nb!(self.write(buf.bytes())); - buf.advance(n); - Ok(Async::Ready(n)) - } -} - -impl<T: ?Sized + AsyncWrite> AsyncWrite for Box<T> { - fn shutdown(&mut self) -> Poll<(), std_io::Error> { - (**self).shutdown() - } -} -impl<'a, T: ?Sized + AsyncWrite> AsyncWrite for &'a mut T { - fn shutdown(&mut self) -> Poll<(), std_io::Error> { - (**self).shutdown() - } -} - -impl AsyncRead for std_io::Repeat { - unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { - false - } -} - -impl AsyncWrite for std_io::Sink { - fn shutdown(&mut self) -> Poll<(), std_io::Error> { - Ok(().into()) - } -} - -// TODO: Implement `prepare_uninitialized_buffer` for `io::Take`. -// This is blocked on rust-lang/rust#27269 -impl<T: AsyncRead> AsyncRead for std_io::Take<T> { -} - -// TODO: Implement `prepare_uninitialized_buffer` when upstream exposes inner -// parts -impl<T, U> AsyncRead for std_io::Chain<T, U> - where T: AsyncRead, - U: AsyncRead, -{ -} - -impl<T: AsyncWrite> AsyncWrite for std_io::BufWriter<T> { - fn shutdown(&mut self) -> Poll<(), std_io::Error> { - try_nb!(self.flush()); - self.get_mut().shutdown() - } -} - -impl<T: AsyncRead> AsyncRead for std_io::BufReader<T> { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - self.get_ref().prepare_uninitialized_buffer(buf) - } -} - -impl<T: AsRef<[u8]>> AsyncRead for std_io::Cursor<T> { -} - -impl<'a> AsyncWrite for std_io::Cursor<&'a mut [u8]> { - fn shutdown(&mut self) -> Poll<(), std_io::Error> { - Ok(().into()) - } -} - -impl AsyncWrite for std_io::Cursor<Vec<u8>> { - fn shutdown(&mut self) -> Poll<(), std_io::Error> { - Ok(().into()) - } -} - -impl AsyncWrite for std_io::Cursor<Box<[u8]>> { - fn shutdown(&mut self) -> Poll<(), std_io::Error> { - Ok(().into()) - } -} +pub use self::async_read::AsyncRead; +pub use self::async_write::AsyncWrite; fn _assert_objects() { fn _assert<T>() {} |