diff options
Diffstat (limited to 'tokio/src/io/async_buf_read.rs')
-rw-r--r-- | tokio/src/io/async_buf_read.rs | 109 |
1 files changed, 109 insertions, 0 deletions
diff --git a/tokio/src/io/async_buf_read.rs b/tokio/src/io/async_buf_read.rs new file mode 100644 index 00000000..1499bd4f --- /dev/null +++ b/tokio/src/io/async_buf_read.rs @@ -0,0 +1,109 @@ +use crate::io::AsyncRead; + +use std::io; +use std::ops::DerefMut; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Read bytes asynchronously. +/// +/// This trait inherits from `std::io::BufRead` and indicates that an I/O object is +/// **non-blocking**. All non-blocking I/O objects must return an error when +/// bytes are unavailable instead of blocking the current thread. +pub trait AsyncBufRead: AsyncRead { + /// Attempt to return the contents of the internal buffer, filling it with more data + /// from the inner reader if it is empty. + /// + /// On success, returns `Poll::Ready(Ok(buf))`. + /// + /// If no data is available for reading, the method returns + /// `Poll::Pending` and arranges for the current task (via + /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes + /// readable or is closed. + /// + /// This function is a lower-level call. It needs to be paired with the + /// [`consume`] method to function properly. When calling this + /// method, none of the contents will be "read" in the sense that later + /// calling [`poll_read`] may return the same contents. As such, [`consume`] must + /// be called with the number of bytes that are consumed from this buffer to + /// ensure that the bytes are never returned twice. + /// + /// An empty buffer returned indicates that the stream has reached EOF. + /// + /// [`poll_read`]: AsyncRead::poll_read + /// [`consume`]: AsyncBufRead::consume + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>; + + /// Tells this buffer that `amt` bytes have been consumed from the buffer, + /// so they should no longer be returned in calls to [`poll_read`]. + /// + /// This function is a lower-level call. It needs to be paired with the + /// [`poll_fill_buf`] method to function properly. This function does + /// not perform any I/O, it simply informs this object that some amount of + /// its buffer, returned from [`poll_fill_buf`], has been consumed and should + /// no longer be returned. As such, this function may do odd things if + /// [`poll_fill_buf`] isn't called before calling it. + /// + /// The `amt` must be `<=` the number of bytes in the buffer returned by + /// [`poll_fill_buf`]. + /// + /// [`poll_read`]: AsyncRead::poll_read + /// [`poll_fill_buf`]: AsyncBufRead::poll_fill_buf + fn consume(self: Pin<&mut Self>, amt: usize); +} + +macro_rules! deref_async_buf_read { + () => { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) + -> Poll<io::Result<&[u8]>> + { + Pin::new(&mut **self.get_mut()).poll_fill_buf(cx) + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + Pin::new(&mut **self).consume(amt) + } + } +} + +impl<T: ?Sized + AsyncBufRead + Unpin> AsyncBufRead for Box<T> { + deref_async_buf_read!(); +} + +impl<T: ?Sized + AsyncBufRead + Unpin> AsyncBufRead for &mut T { + deref_async_buf_read!(); +} + +impl<P> AsyncBufRead for Pin<P> +where + P: DerefMut + Unpin, + P::Target: AsyncBufRead, +{ + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { + self.get_mut().as_mut().poll_fill_buf(cx) + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + self.get_mut().as_mut().consume(amt) + } +} + +impl AsyncBufRead for &[u8] { + fn poll_fill_buf(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { + Poll::Ready(Ok(*self)) + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + *self = &self[amt..]; + } +} + +impl<T: AsRef<[u8]> + Unpin> AsyncBufRead for io::Cursor<T> { + fn poll_fill_buf(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { + Poll::Ready(io::BufRead::fill_buf(self.get_mut())) + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + io::BufRead::consume(self.get_mut(), amt) + } +} |