diff options
author | Michael Howell <michael@notriddle.com> | 2019-12-10 22:48:24 -0700 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2019-12-10 21:48:24 -0800 |
commit | 24cd6d67f76f122f67cbbb101d555018fc27820b (patch) | |
tree | 50fa1165a920a0c6b67de6335b745d92b6b5a2e7 /tokio/src/io | |
parent | 975576952f33c64e4faaa616f67ae9d6b596e4aa (diff) |
io: add AsyncSeek trait (#1924)
Co-authored-by: Taiki Endo <te316e89@gmail.com>
Diffstat (limited to 'tokio/src/io')
-rw-r--r-- | tokio/src/io/async_seek.rs | 97 | ||||
-rw-r--r-- | tokio/src/io/mod.rs | 10 | ||||
-rw-r--r-- | tokio/src/io/seek.rs | 56 | ||||
-rw-r--r-- | tokio/src/io/util/async_seek_ext.rs | 38 | ||||
-rw-r--r-- | tokio/src/io/util/mod.rs | 3 |
5 files changed, 202 insertions, 2 deletions
diff --git a/tokio/src/io/async_seek.rs b/tokio/src/io/async_seek.rs new file mode 100644 index 00000000..d36d404f --- /dev/null +++ b/tokio/src/io/async_seek.rs @@ -0,0 +1,97 @@ +use std::io::{self, SeekFrom}; +use std::ops::DerefMut; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Seek bytes asynchronously. +/// +/// This trait is analogous to the `std::io::Seek` trait, but integrates +/// with the asynchronous task system. In particular, the `start_seek` +/// method, unlike `Seek::seek`, will not block the calling thread. +pub trait AsyncSeek { + /// Attempt to seek to an offset, in bytes, in a stream. + /// + /// A seek beyond the end of a stream is allowed, but behavior is defined + /// by the implementation. + /// + /// If this function returns successfully, then the job has been submitted. + /// To find out when it completes, call `poll_complete`. + fn start_seek( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + position: SeekFrom, + ) -> Poll<io::Result<()>>; + + /// Wait for a seek operation to complete. + /// + /// If the seek operation completed successfully, + /// this method returns the new position from the start of the stream. + /// That position can be used later with [`SeekFrom::Start`]. + /// + /// # Errors + /// + /// Seeking to a negative offset is considered an error. + /// + /// # Panics + /// + /// Calling this method without calling `start_seek` first is an error. + fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>>; +} + +macro_rules! deref_async_seek { + () => { + fn start_seek( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll<io::Result<()>> { + Pin::new(&mut **self).start_seek(cx, pos) + } + + fn poll_complete( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<io::Result<u64>> { + Pin::new(&mut **self).poll_complete(cx) + } + } +} + +impl<T: ?Sized + AsyncSeek + Unpin> AsyncSeek for Box<T> { + deref_async_seek!(); +} + +impl<T: ?Sized + AsyncSeek + Unpin> AsyncSeek for &mut T { + deref_async_seek!(); +} + +impl<P> AsyncSeek for Pin<P> +where + P: DerefMut + Unpin, + P::Target: AsyncSeek, +{ + fn start_seek( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll<io::Result<()>> { + self.get_mut().as_mut().start_seek(cx, pos) + } + + fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> { + self.get_mut().as_mut().poll_complete(cx) + } +} + +impl<T: AsRef<[u8]> + Unpin> AsyncSeek for io::Cursor<T> { + fn start_seek( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll<io::Result<()>> { + Poll::Ready(io::Seek::seek(&mut *self, pos).map(drop)) + } + fn poll_complete(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<u64>> { + Poll::Ready(Ok(self.get_mut().position())) + } +} diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index 29d700b1..dd48acda 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -164,6 +164,9 @@ pub use self::async_buf_read::AsyncBufRead; mod async_read; pub use self::async_read::AsyncRead; +mod async_seek; +pub use self::async_seek::AsyncSeek; + mod async_write; pub use self::async_write::AsyncWrite; @@ -192,10 +195,13 @@ cfg_io_util! { mod split; pub use split::{split, ReadHalf, WriteHalf}; + pub(crate) mod seek; + pub use self::seek::Seek; + pub(crate) mod util; pub use util::{ - copy, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufStream, - BufWriter, Copy, Empty, Lines, Repeat, Sink, Split, Take, + copy, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, + BufStream, BufWriter, Copy, Empty, Lines, Repeat, Sink, Split, Take, }; // Re-export io::Error so that users don't have to deal with conflicts when diff --git a/tokio/src/io/seek.rs b/tokio/src/io/seek.rs new file mode 100644 index 00000000..080141f0 --- /dev/null +++ b/tokio/src/io/seek.rs @@ -0,0 +1,56 @@ +use crate::io::AsyncSeek; +use std::future::Future; +use std::io::{self, SeekFrom}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Future for the [`seek`](crate::io::AsyncSeekExt::seek) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Seek<'a, S: ?Sized> { + seek: &'a mut S, + pos: Option<SeekFrom>, +} + +pub(crate) fn seek<S>(seek: &mut S, pos: SeekFrom) -> Seek<'_, S> +where + S: AsyncSeek + ?Sized + Unpin, +{ + Seek { + seek, + pos: Some(pos), + } +} + +impl<S> Future for Seek<'_, S> +where + S: AsyncSeek + ?Sized + Unpin, +{ + type Output = io::Result<u64>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let me = &mut *self; + match me.pos { + Some(pos) => { + match Pin::new(&mut me.seek).start_seek(cx, pos) { + Poll::Ready(Ok(())) => me.pos = None, + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Pending => (), + }; + Poll::Pending + } + None => Pin::new(&mut me.seek).poll_complete(cx), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn assert_unpin() { + use std::marker::PhantomPinned; + crate::is_unpin::<Seek<'_, PhantomPinned>>(); + } +} diff --git a/tokio/src/io/util/async_seek_ext.rs b/tokio/src/io/util/async_seek_ext.rs new file mode 100644 index 00000000..aeae4cbd --- /dev/null +++ b/tokio/src/io/util/async_seek_ext.rs @@ -0,0 +1,38 @@ +use crate::io::seek::{seek, Seek}; +use crate::io::AsyncSeek; +use std::io::SeekFrom; + +/// An extension trait which adds utility methods to `AsyncSeek` types. +pub trait AsyncSeekExt: AsyncSeek { + /// Creates a future which will seek an IO object, and then yield the + /// new position in the object and the object itself. + /// + /// In the case of an error the buffer and the object will be discarded, with + /// the error yielded. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// use tokio::prelude::*; + /// + /// use std::io::SeekFrom; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut file = File::open("foo.txt").await?; + /// file.seek(SeekFrom::Start(6)).await?; + /// + /// let mut contents = vec![0u8; 10]; + /// file.read_exact(&mut contents).await?; + /// # Ok(()) + /// # } + /// ``` + fn seek(&mut self, pos: SeekFrom) -> Seek<'_, Self> + where + Self: Unpin, + { + seek(self, pos) + } +} + +impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}
\ No newline at end of file diff --git a/tokio/src/io/util/mod.rs b/tokio/src/io/util/mod.rs index c06c070d..d239e1a2 100644 --- a/tokio/src/io/util/mod.rs +++ b/tokio/src/io/util/mod.rs @@ -7,6 +7,9 @@ cfg_io_util! { mod async_read_ext; pub use async_read_ext::AsyncReadExt; + mod async_seek_ext; + pub use async_seek_ext::AsyncSeekExt; + mod async_write_ext; pub use async_write_ext::AsyncWriteExt; |