summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMichael Howell <michael@notriddle.com>2019-12-10 22:48:24 -0700
committerCarl Lerche <me@carllerche.com>2019-12-10 21:48:24 -0800
commit24cd6d67f76f122f67cbbb101d555018fc27820b (patch)
tree50fa1165a920a0c6b67de6335b745d92b6b5a2e7
parent975576952f33c64e4faaa616f67ae9d6b596e4aa (diff)
io: add AsyncSeek trait (#1924)
Co-authored-by: Taiki Endo <te316e89@gmail.com>
-rw-r--r--tokio/src/fs/file.rs135
-rw-r--r--tokio/src/io/async_seek.rs97
-rw-r--r--tokio/src/io/mod.rs10
-rw-r--r--tokio/src/io/seek.rs56
-rw-r--r--tokio/src/io/util/async_seek_ext.rs38
-rw-r--r--tokio/src/io/util/mod.rs3
-rw-r--r--tokio/src/prelude.rs2
7 files changed, 280 insertions, 61 deletions
diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs
index bb1084a1..20139ec1 100644
--- a/tokio/src/fs/file.rs
+++ b/tokio/src/fs/file.rs
@@ -5,7 +5,7 @@
use self::State::*;
use crate::fs::{asyncify, sys};
use crate::io::blocking::Buf;
-use crate::io::{AsyncRead, AsyncWrite};
+use crate::io::{AsyncRead, AsyncSeek, AsyncWrite};
use std::fmt;
use std::fs::{Metadata, Permissions};
@@ -176,63 +176,6 @@ impl File {
}
}
- /// Seek to an offset, in bytes, in a stream.
- ///
- /// # Examples
- ///
- /// ```no_run
- /// use tokio::fs::File;
- /// use tokio::prelude::*;
- ///
- /// use std::io::SeekFrom;
- ///
- /// # async fn dox() -> std::io::Result<()> {
- /// let mut file = File::open("foo.txt").await?;
- /// file.seek(SeekFrom::Start(6)).await?;
- ///
- /// let mut contents = vec![0u8; 10];
- /// file.read_exact(&mut contents).await?;
- /// # Ok(())
- /// # }
- /// ```
- pub async fn seek(&mut self, mut pos: SeekFrom) -> io::Result<u64> {
- self.complete_inflight().await;
-
- let mut buf = match self.state {
- Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
- _ => unreachable!(),
- };
-
- // Factor in any unread data from the buf
- if !buf.is_empty() {
- let n = buf.discard_read();
-
- if let SeekFrom::Current(ref mut offset) = pos {
- *offset += n;
- }
- }
-
- let std = self.std.clone();
-
- // Start the operation
- self.state = Busy(sys::run(move || {
- let res = (&*std).seek(pos);
- (Operation::Seek(res), buf)
- }));
-
- let (op, buf) = match self.state {
- Idle(_) => unreachable!(),
- Busy(ref mut rx) => rx.await.unwrap(),
- };
-
- self.state = Idle(Some(buf));
-
- match op {
- Operation::Seek(res) => res,
- _ => unreachable!(),
- }
- }
-
/// Attempts to sync all OS-internal metadata to disk.
///
/// This function will attempt to ensure that all in-core data reaches the
@@ -548,6 +491,82 @@ impl AsyncRead for File {
}
}
+impl AsyncSeek for File {
+ fn start_seek(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ mut pos: SeekFrom,
+ ) -> Poll<io::Result<()>> {
+ if let Some(e) = self.last_write_err.take() {
+ return Ready(Err(e.into()));
+ }
+
+ loop {
+ match self.state {
+ Idle(ref mut buf_cell) => {
+ let mut buf = buf_cell.take().unwrap();
+
+ // Factor in any unread data from the buf
+ if !buf.is_empty() {
+ let n = buf.discard_read();
+
+ if let SeekFrom::Current(ref mut offset) = pos {
+ *offset += n;
+ }
+ }
+
+ let std = self.std.clone();
+
+ self.state = Busy(sys::run(move || {
+ let res = (&*std).seek(pos);
+ (Operation::Seek(res), buf)
+ }));
+
+ return Ready(Ok(()));
+ }
+ Busy(ref mut rx) => {
+ let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
+ self.state = Idle(Some(buf));
+
+ match op {
+ Operation::Read(_) => {}
+ Operation::Write(Err(e)) => {
+ self.last_write_err = Some(e.kind());
+ }
+ Operation::Write(_) => {}
+ Operation::Seek(_) => {}
+ }
+ }
+ }
+ }
+ }
+
+ fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
+ if let Some(e) = self.last_write_err.take() {
+ return Ready(Err(e.into()));
+ }
+
+ loop {
+ match self.state {
+ Idle(_) => panic!("must call start_seek before calling poll_complete"),
+ Busy(ref mut rx) => {
+ let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
+ self.state = Idle(Some(buf));
+
+ match op {
+ Operation::Read(_) => {}
+ Operation::Write(Err(e)) => {
+ self.last_write_err = Some(e.kind());
+ }
+ Operation::Write(_) => {}
+ Operation::Seek(res) => return Ready(res),
+ }
+ }
+ }
+ }
+ }
+}
+
impl AsyncWrite for File {
fn poll_write(
mut self: Pin<&mut Self>,
diff --git a/tokio/src/io/async_seek.rs b/tokio/src/io/async_seek.rs
new file mode 100644
index 00000000..d36d404f
--- /dev/null
+++ b/tokio/src/io/async_seek.rs
@@ -0,0 +1,97 @@
+use std::io::{self, SeekFrom};
+use std::ops::DerefMut;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+/// Seek bytes asynchronously.
+///
+/// This trait is analogous to the `std::io::Seek` trait, but integrates
+/// with the asynchronous task system. In particular, the `start_seek`
+/// method, unlike `Seek::seek`, will not block the calling thread.
+pub trait AsyncSeek {
+ /// Attempt to seek to an offset, in bytes, in a stream.
+ ///
+ /// A seek beyond the end of a stream is allowed, but behavior is defined
+ /// by the implementation.
+ ///
+ /// If this function returns successfully, then the job has been submitted.
+ /// To find out when it completes, call `poll_complete`.
+ fn start_seek(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ position: SeekFrom,
+ ) -> Poll<io::Result<()>>;
+
+ /// Wait for a seek operation to complete.
+ ///
+ /// If the seek operation completed successfully,
+ /// this method returns the new position from the start of the stream.
+ /// That position can be used later with [`SeekFrom::Start`].
+ ///
+ /// # Errors
+ ///
+ /// Seeking to a negative offset is considered an error.
+ ///
+ /// # Panics
+ ///
+ /// Calling this method without calling `start_seek` first is an error.
+ fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>>;
+}
+
+macro_rules! deref_async_seek {
+ () => {
+ fn start_seek(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ pos: SeekFrom,
+ ) -> Poll<io::Result<()>> {
+ Pin::new(&mut **self).start_seek(cx, pos)
+ }
+
+ fn poll_complete(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<io::Result<u64>> {
+ Pin::new(&mut **self).poll_complete(cx)
+ }
+ }
+}
+
+impl<T: ?Sized + AsyncSeek + Unpin> AsyncSeek for Box<T> {
+ deref_async_seek!();
+}
+
+impl<T: ?Sized + AsyncSeek + Unpin> AsyncSeek for &mut T {
+ deref_async_seek!();
+}
+
+impl<P> AsyncSeek for Pin<P>
+where
+ P: DerefMut + Unpin,
+ P::Target: AsyncSeek,
+{
+ fn start_seek(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ pos: SeekFrom,
+ ) -> Poll<io::Result<()>> {
+ self.get_mut().as_mut().start_seek(cx, pos)
+ }
+
+ fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
+ self.get_mut().as_mut().poll_complete(cx)
+ }
+}
+
+impl<T: AsRef<[u8]> + Unpin> AsyncSeek for io::Cursor<T> {
+ fn start_seek(
+ mut self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ pos: SeekFrom,
+ ) -> Poll<io::Result<()>> {
+ Poll::Ready(io::Seek::seek(&mut *self, pos).map(drop))
+ }
+ fn poll_complete(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<u64>> {
+ Poll::Ready(Ok(self.get_mut().position()))
+ }
+}
diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs
index 29d700b1..dd48acda 100644
--- a/tokio/src/io/mod.rs
+++ b/tokio/src/io/mod.rs
@@ -164,6 +164,9 @@ pub use self::async_buf_read::AsyncBufRead;
mod async_read;
pub use self::async_read::AsyncRead;
+mod async_seek;
+pub use self::async_seek::AsyncSeek;
+
mod async_write;
pub use self::async_write::AsyncWrite;
@@ -192,10 +195,13 @@ cfg_io_util! {
mod split;
pub use split::{split, ReadHalf, WriteHalf};
+ pub(crate) mod seek;
+ pub use self::seek::Seek;
+
pub(crate) mod util;
pub use util::{
- copy, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufStream,
- BufWriter, Copy, Empty, Lines, Repeat, Sink, Split, Take,
+ copy, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader,
+ BufStream, BufWriter, Copy, Empty, Lines, Repeat, Sink, Split, Take,
};
// Re-export io::Error so that users don't have to deal with conflicts when
diff --git a/tokio/src/io/seek.rs b/tokio/src/io/seek.rs
new file mode 100644
index 00000000..080141f0
--- /dev/null
+++ b/tokio/src/io/seek.rs
@@ -0,0 +1,56 @@
+use crate::io::AsyncSeek;
+use std::future::Future;
+use std::io::{self, SeekFrom};
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+/// Future for the [`seek`](crate::io::AsyncSeekExt::seek) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Seek<'a, S: ?Sized> {
+ seek: &'a mut S,
+ pos: Option<SeekFrom>,
+}
+
+pub(crate) fn seek<S>(seek: &mut S, pos: SeekFrom) -> Seek<'_, S>
+where
+ S: AsyncSeek + ?Sized + Unpin,
+{
+ Seek {
+ seek,
+ pos: Some(pos),
+ }
+}
+
+impl<S> Future for Seek<'_, S>
+where
+ S: AsyncSeek + ?Sized + Unpin,
+{
+ type Output = io::Result<u64>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let me = &mut *self;
+ match me.pos {
+ Some(pos) => {
+ match Pin::new(&mut me.seek).start_seek(cx, pos) {
+ Poll::Ready(Ok(())) => me.pos = None,
+ Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
+ Poll::Pending => (),
+ };
+ Poll::Pending
+ }
+ None => Pin::new(&mut me.seek).poll_complete(cx),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn assert_unpin() {
+ use std::marker::PhantomPinned;
+ crate::is_unpin::<Seek<'_, PhantomPinned>>();
+ }
+}
diff --git a/tokio/src/io/util/async_seek_ext.rs b/tokio/src/io/util/async_seek_ext.rs
new file mode 100644
index 00000000..aeae4cbd
--- /dev/null
+++ b/tokio/src/io/util/async_seek_ext.rs
@@ -0,0 +1,38 @@
+use crate::io::seek::{seek, Seek};
+use crate::io::AsyncSeek;
+use std::io::SeekFrom;
+
+/// An extension trait which adds utility methods to `AsyncSeek` types.
+pub trait AsyncSeekExt: AsyncSeek {
+ /// Creates a future which will seek an IO object, and then yield the
+ /// new position in the object and the object itself.
+ ///
+ /// In the case of an error the buffer and the object will be discarded, with
+ /// the error yielded.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::fs::File;
+ /// use tokio::prelude::*;
+ ///
+ /// use std::io::SeekFrom;
+ ///
+ /// # async fn dox() -> std::io::Result<()> {
+ /// let mut file = File::open("foo.txt").await?;
+ /// file.seek(SeekFrom::Start(6)).await?;
+ ///
+ /// let mut contents = vec![0u8; 10];
+ /// file.read_exact(&mut contents).await?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ fn seek(&mut self, pos: SeekFrom) -> Seek<'_, Self>
+ where
+ Self: Unpin,
+ {
+ seek(self, pos)
+ }
+}
+
+impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {} \ No newline at end of file
diff --git a/tokio/src/io/util/mod.rs b/tokio/src/io/util/mod.rs
index c06c070d..d239e1a2 100644
--- a/tokio/src/io/util/mod.rs
+++ b/tokio/src/io/util/mod.rs
@@ -7,6 +7,9 @@ cfg_io_util! {
mod async_read_ext;
pub use async_read_ext::AsyncReadExt;
+ mod async_seek_ext;
+ pub use async_seek_ext::AsyncSeekExt;
+
mod async_write_ext;
pub use async_write_ext::AsyncWriteExt;
diff --git a/tokio/src/prelude.rs b/tokio/src/prelude.rs
index d4e790be..1909f9da 100644
--- a/tokio/src/prelude.rs
+++ b/tokio/src/prelude.rs
@@ -17,5 +17,5 @@ pub use crate::io::{self, AsyncBufRead, AsyncRead, AsyncWrite};
cfg_io_util! {
#[doc(no_inline)]
- pub use crate::io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _};
+ pub use crate::io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncSeekExt as _, AsyncWriteExt as _};
}