diff options
author | Michael Howell <michael@notriddle.com> | 2019-12-10 22:48:24 -0700 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2019-12-10 21:48:24 -0800 |
commit | 24cd6d67f76f122f67cbbb101d555018fc27820b (patch) | |
tree | 50fa1165a920a0c6b67de6335b745d92b6b5a2e7 | |
parent | 975576952f33c64e4faaa616f67ae9d6b596e4aa (diff) |
io: add AsyncSeek trait (#1924)
Co-authored-by: Taiki Endo <te316e89@gmail.com>
-rw-r--r-- | tokio/src/fs/file.rs | 135 | ||||
-rw-r--r-- | tokio/src/io/async_seek.rs | 97 | ||||
-rw-r--r-- | tokio/src/io/mod.rs | 10 | ||||
-rw-r--r-- | tokio/src/io/seek.rs | 56 | ||||
-rw-r--r-- | tokio/src/io/util/async_seek_ext.rs | 38 | ||||
-rw-r--r-- | tokio/src/io/util/mod.rs | 3 | ||||
-rw-r--r-- | tokio/src/prelude.rs | 2 |
7 files changed, 280 insertions, 61 deletions
diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index bb1084a1..20139ec1 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -5,7 +5,7 @@ use self::State::*; use crate::fs::{asyncify, sys}; use crate::io::blocking::Buf; -use crate::io::{AsyncRead, AsyncWrite}; +use crate::io::{AsyncRead, AsyncSeek, AsyncWrite}; use std::fmt; use std::fs::{Metadata, Permissions}; @@ -176,63 +176,6 @@ impl File { } } - /// Seek to an offset, in bytes, in a stream. - /// - /// # Examples - /// - /// ```no_run - /// use tokio::fs::File; - /// use tokio::prelude::*; - /// - /// use std::io::SeekFrom; - /// - /// # async fn dox() -> std::io::Result<()> { - /// let mut file = File::open("foo.txt").await?; - /// file.seek(SeekFrom::Start(6)).await?; - /// - /// let mut contents = vec![0u8; 10]; - /// file.read_exact(&mut contents).await?; - /// # Ok(()) - /// # } - /// ``` - pub async fn seek(&mut self, mut pos: SeekFrom) -> io::Result<u64> { - self.complete_inflight().await; - - let mut buf = match self.state { - Idle(ref mut buf_cell) => buf_cell.take().unwrap(), - _ => unreachable!(), - }; - - // Factor in any unread data from the buf - if !buf.is_empty() { - let n = buf.discard_read(); - - if let SeekFrom::Current(ref mut offset) = pos { - *offset += n; - } - } - - let std = self.std.clone(); - - // Start the operation - self.state = Busy(sys::run(move || { - let res = (&*std).seek(pos); - (Operation::Seek(res), buf) - })); - - let (op, buf) = match self.state { - Idle(_) => unreachable!(), - Busy(ref mut rx) => rx.await.unwrap(), - }; - - self.state = Idle(Some(buf)); - - match op { - Operation::Seek(res) => res, - _ => unreachable!(), - } - } - /// Attempts to sync all OS-internal metadata to disk. /// /// This function will attempt to ensure that all in-core data reaches the @@ -548,6 +491,82 @@ impl AsyncRead for File { } } +impl AsyncSeek for File { + fn start_seek( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + mut pos: SeekFrom, + ) -> Poll<io::Result<()>> { + if let Some(e) = self.last_write_err.take() { + return Ready(Err(e.into())); + } + + loop { + match self.state { + Idle(ref mut buf_cell) => { + let mut buf = buf_cell.take().unwrap(); + + // Factor in any unread data from the buf + if !buf.is_empty() { + let n = buf.discard_read(); + + if let SeekFrom::Current(ref mut offset) = pos { + *offset += n; + } + } + + let std = self.std.clone(); + + self.state = Busy(sys::run(move || { + let res = (&*std).seek(pos); + (Operation::Seek(res), buf) + })); + + return Ready(Ok(())); + } + Busy(ref mut rx) => { + let (op, buf) = ready!(Pin::new(rx).poll(cx))?; + self.state = Idle(Some(buf)); + + match op { + Operation::Read(_) => {} + Operation::Write(Err(e)) => { + self.last_write_err = Some(e.kind()); + } + Operation::Write(_) => {} + Operation::Seek(_) => {} + } + } + } + } + } + + fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> { + if let Some(e) = self.last_write_err.take() { + return Ready(Err(e.into())); + } + + loop { + match self.state { + Idle(_) => panic!("must call start_seek before calling poll_complete"), + Busy(ref mut rx) => { + let (op, buf) = ready!(Pin::new(rx).poll(cx))?; + self.state = Idle(Some(buf)); + + match op { + Operation::Read(_) => {} + Operation::Write(Err(e)) => { + self.last_write_err = Some(e.kind()); + } + Operation::Write(_) => {} + Operation::Seek(res) => return Ready(res), + } + } + } + } + } +} + impl AsyncWrite for File { fn poll_write( mut self: Pin<&mut Self>, diff --git a/tokio/src/io/async_seek.rs b/tokio/src/io/async_seek.rs new file mode 100644 index 00000000..d36d404f --- /dev/null +++ b/tokio/src/io/async_seek.rs @@ -0,0 +1,97 @@ +use std::io::{self, SeekFrom}; +use std::ops::DerefMut; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Seek bytes asynchronously. +/// +/// This trait is analogous to the `std::io::Seek` trait, but integrates +/// with the asynchronous task system. In particular, the `start_seek` +/// method, unlike `Seek::seek`, will not block the calling thread. +pub trait AsyncSeek { + /// Attempt to seek to an offset, in bytes, in a stream. + /// + /// A seek beyond the end of a stream is allowed, but behavior is defined + /// by the implementation. + /// + /// If this function returns successfully, then the job has been submitted. + /// To find out when it completes, call `poll_complete`. + fn start_seek( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + position: SeekFrom, + ) -> Poll<io::Result<()>>; + + /// Wait for a seek operation to complete. + /// + /// If the seek operation completed successfully, + /// this method returns the new position from the start of the stream. + /// That position can be used later with [`SeekFrom::Start`]. + /// + /// # Errors + /// + /// Seeking to a negative offset is considered an error. + /// + /// # Panics + /// + /// Calling this method without calling `start_seek` first is an error. + fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>>; +} + +macro_rules! deref_async_seek { + () => { + fn start_seek( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll<io::Result<()>> { + Pin::new(&mut **self).start_seek(cx, pos) + } + + fn poll_complete( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<io::Result<u64>> { + Pin::new(&mut **self).poll_complete(cx) + } + } +} + +impl<T: ?Sized + AsyncSeek + Unpin> AsyncSeek for Box<T> { + deref_async_seek!(); +} + +impl<T: ?Sized + AsyncSeek + Unpin> AsyncSeek for &mut T { + deref_async_seek!(); +} + +impl<P> AsyncSeek for Pin<P> +where + P: DerefMut + Unpin, + P::Target: AsyncSeek, +{ + fn start_seek( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll<io::Result<()>> { + self.get_mut().as_mut().start_seek(cx, pos) + } + + fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> { + self.get_mut().as_mut().poll_complete(cx) + } +} + +impl<T: AsRef<[u8]> + Unpin> AsyncSeek for io::Cursor<T> { + fn start_seek( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll<io::Result<()>> { + Poll::Ready(io::Seek::seek(&mut *self, pos).map(drop)) + } + fn poll_complete(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<u64>> { + Poll::Ready(Ok(self.get_mut().position())) + } +} diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index 29d700b1..dd48acda 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -164,6 +164,9 @@ pub use self::async_buf_read::AsyncBufRead; mod async_read; pub use self::async_read::AsyncRead; +mod async_seek; +pub use self::async_seek::AsyncSeek; + mod async_write; pub use self::async_write::AsyncWrite; @@ -192,10 +195,13 @@ cfg_io_util! { mod split; pub use split::{split, ReadHalf, WriteHalf}; + pub(crate) mod seek; + pub use self::seek::Seek; + pub(crate) mod util; pub use util::{ - copy, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufStream, - BufWriter, Copy, Empty, Lines, Repeat, Sink, Split, Take, + copy, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, + BufStream, BufWriter, Copy, Empty, Lines, Repeat, Sink, Split, Take, }; // Re-export io::Error so that users don't have to deal with conflicts when diff --git a/tokio/src/io/seek.rs b/tokio/src/io/seek.rs new file mode 100644 index 00000000..080141f0 --- /dev/null +++ b/tokio/src/io/seek.rs @@ -0,0 +1,56 @@ +use crate::io::AsyncSeek; +use std::future::Future; +use std::io::{self, SeekFrom}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Future for the [`seek`](crate::io::AsyncSeekExt::seek) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Seek<'a, S: ?Sized> { + seek: &'a mut S, + pos: Option<SeekFrom>, +} + +pub(crate) fn seek<S>(seek: &mut S, pos: SeekFrom) -> Seek<'_, S> +where + S: AsyncSeek + ?Sized + Unpin, +{ + Seek { + seek, + pos: Some(pos), + } +} + +impl<S> Future for Seek<'_, S> +where + S: AsyncSeek + ?Sized + Unpin, +{ + type Output = io::Result<u64>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let me = &mut *self; + match me.pos { + Some(pos) => { + match Pin::new(&mut me.seek).start_seek(cx, pos) { + Poll::Ready(Ok(())) => me.pos = None, + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Pending => (), + }; + Poll::Pending + } + None => Pin::new(&mut me.seek).poll_complete(cx), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn assert_unpin() { + use std::marker::PhantomPinned; + crate::is_unpin::<Seek<'_, PhantomPinned>>(); + } +} diff --git a/tokio/src/io/util/async_seek_ext.rs b/tokio/src/io/util/async_seek_ext.rs new file mode 100644 index 00000000..aeae4cbd --- /dev/null +++ b/tokio/src/io/util/async_seek_ext.rs @@ -0,0 +1,38 @@ +use crate::io::seek::{seek, Seek}; +use crate::io::AsyncSeek; +use std::io::SeekFrom; + +/// An extension trait which adds utility methods to `AsyncSeek` types. +pub trait AsyncSeekExt: AsyncSeek { + /// Creates a future which will seek an IO object, and then yield the + /// new position in the object and the object itself. + /// + /// In the case of an error the buffer and the object will be discarded, with + /// the error yielded. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// use tokio::prelude::*; + /// + /// use std::io::SeekFrom; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut file = File::open("foo.txt").await?; + /// file.seek(SeekFrom::Start(6)).await?; + /// + /// let mut contents = vec![0u8; 10]; + /// file.read_exact(&mut contents).await?; + /// # Ok(()) + /// # } + /// ``` + fn seek(&mut self, pos: SeekFrom) -> Seek<'_, Self> + where + Self: Unpin, + { + seek(self, pos) + } +} + +impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}
\ No newline at end of file diff --git a/tokio/src/io/util/mod.rs b/tokio/src/io/util/mod.rs index c06c070d..d239e1a2 100644 --- a/tokio/src/io/util/mod.rs +++ b/tokio/src/io/util/mod.rs @@ -7,6 +7,9 @@ cfg_io_util! { mod async_read_ext; pub use async_read_ext::AsyncReadExt; + mod async_seek_ext; + pub use async_seek_ext::AsyncSeekExt; + mod async_write_ext; pub use async_write_ext::AsyncWriteExt; diff --git a/tokio/src/prelude.rs b/tokio/src/prelude.rs index d4e790be..1909f9da 100644 --- a/tokio/src/prelude.rs +++ b/tokio/src/prelude.rs @@ -17,5 +17,5 @@ pub use crate::io::{self, AsyncBufRead, AsyncRead, AsyncWrite}; cfg_io_util! { #[doc(no_inline)] - pub use crate::io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _}; + pub use crate::io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncSeekExt as _, AsyncWriteExt as _}; } |