summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorZahari Dichev <zaharidichev@gmail.com>2020-10-08 11:56:01 +0300
committerGitHub <noreply@github.com>2020-10-08 10:56:01 +0200
commit43bd11bf2fa4eaee84383ddbe4c750868f1bb684 (patch)
tree29ab12a3ec77ff0dfc71c6f678a87516c5801456
parentd94ab62c54ac594f912a116965fecbdfb5e1d3e6 (diff)
io: remove Poll from the AsyncSeek::start_seek return value (#2885)
-rw-r--r--tokio-util/src/either.rs8
-rw-r--r--tokio/src/fs/file.rs45
-rw-r--r--tokio/src/io/async_seek.rs45
-rw-r--r--tokio/src/io/seek.rs17
4 files changed, 49 insertions, 66 deletions
diff --git a/tokio-util/src/either.rs b/tokio-util/src/either.rs
index 3c749c1b..f5246af2 100644
--- a/tokio-util/src/either.rs
+++ b/tokio-util/src/either.rs
@@ -125,12 +125,8 @@ where
L: AsyncSeek,
R: AsyncSeek,
{
- fn start_seek(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- position: SeekFrom,
- ) -> Poll<Result<()>> {
- delegate_call!(self.start_seek(cx, position))
+ fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> Result<()> {
+ delegate_call!(self.start_seek(position))
}
fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<u64>> {
diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs
index 319c2c7f..9556a22f 100644
--- a/tokio/src/fs/file.rs
+++ b/tokio/src/fs/file.rs
@@ -86,6 +86,8 @@ pub struct File {
/// error is observed while performing a read, it is saved until the next
/// write / flush call.
last_write_err: Option<io::ErrorKind>,
+
+ pos: u64,
}
#[derive(Debug)]
@@ -199,6 +201,7 @@ impl File {
std: Arc::new(std),
state: State::Idle(Some(Buf::with_capacity(0))),
last_write_err: None,
+ pos: 0,
}
}
@@ -332,7 +335,9 @@ impl File {
self.state = Idle(Some(buf));
match op {
- Operation::Seek(res) => res.map(|_| ()),
+ Operation::Seek(res) => res.map(|pos| {
+ self.pos = pos;
+ }),
_ => unreachable!(),
}
}
@@ -524,9 +529,12 @@ impl AsyncRead for File {
self.last_write_err = Some(e.kind());
self.state = Idle(Some(buf));
}
- Operation::Seek(_) => {
+ Operation::Seek(result) => {
assert!(buf.is_empty());
self.state = Idle(Some(buf));
+ if let Ok(pos) = result {
+ self.pos = pos;
+ }
continue;
}
}
@@ -537,13 +545,10 @@ impl AsyncRead for File {
}
impl AsyncSeek for File {
- fn start_seek(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- mut pos: SeekFrom,
- ) -> Poll<io::Result<()>> {
+ fn start_seek(mut self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()> {
loop {
match self.state {
+ Busy(_) => panic!("must wait for poll_complete before calling start_seek"),
Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();
@@ -562,22 +567,7 @@ impl AsyncSeek for File {
let res = (&*std).seek(pos);
(Operation::Seek(res), buf)
}));
-
- return Ready(Ok(()));
- }
- Busy(ref mut rx) => {
- let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
- self.state = Idle(Some(buf));
-
- match op {
- Operation::Read(_) => {}
- Operation::Write(Err(e)) => {
- assert!(self.last_write_err.is_none());
- self.last_write_err = Some(e.kind());
- }
- Operation::Write(_) => {}
- Operation::Seek(_) => {}
- }
+ return Ok(());
}
}
}
@@ -586,7 +576,7 @@ impl AsyncSeek for File {
fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
loop {
match self.state {
- Idle(_) => panic!("must call start_seek before calling poll_complete"),
+ Idle(_) => return Poll::Ready(Ok(self.pos)),
Busy(ref mut rx) => {
let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
self.state = Idle(Some(buf));
@@ -598,7 +588,12 @@ impl AsyncSeek for File {
self.last_write_err = Some(e.kind());
}
Operation::Write(_) => {}
- Operation::Seek(res) => return Ready(res),
+ Operation::Seek(res) => {
+ if let Ok(pos) = res {
+ self.pos = pos;
+ }
+ return Ready(res);
+ }
}
}
}
diff --git a/tokio/src/io/async_seek.rs b/tokio/src/io/async_seek.rs
index 32ed0a22..bd7a992e 100644
--- a/tokio/src/io/async_seek.rs
+++ b/tokio/src/io/async_seek.rs
@@ -23,36 +23,33 @@ pub trait AsyncSeek {
///
/// If this function returns successfully, then the job has been submitted.
/// To find out when it completes, call `poll_complete`.
- fn start_seek(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- position: SeekFrom,
- ) -> Poll<io::Result<()>>;
+ ///
+ /// # Errors
+ ///
+ /// This function can return [`io::ErrorKind::Other`] in case there is
+ /// another seek in progress. To avoid this, it is advisable that any call
+ /// to `start_seek` is preceded by a call to `poll_complete` to ensure all
+ /// pending seeks have completed.
+ fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()>;
/// Waits for a seek operation to complete.
///
/// If the seek operation completed successfully,
/// this method returns the new position from the start of the stream.
- /// That position can be used later with [`SeekFrom::Start`].
+ /// That position can be used later with [`SeekFrom::Start`]. Repeatedly
+ /// calling this function without calling `start_seek` might return the
+ /// same result.
///
/// # Errors
///
/// Seeking to a negative offset is considered an error.
- ///
- /// # Panics
- ///
- /// Calling this method without calling `start_seek` first is an error.
fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>>;
}
macro_rules! deref_async_seek {
() => {
- fn start_seek(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- pos: SeekFrom,
- ) -> Poll<io::Result<()>> {
- Pin::new(&mut **self).start_seek(cx, pos)
+ fn start_seek(mut self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> {
+ Pin::new(&mut **self).start_seek(pos)
}
fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
@@ -74,12 +71,8 @@ where
P: DerefMut + Unpin,
P::Target: AsyncSeek,
{
- fn start_seek(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- pos: SeekFrom,
- ) -> Poll<io::Result<()>> {
- self.get_mut().as_mut().start_seek(cx, pos)
+ fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> {
+ self.get_mut().as_mut().start_seek(pos)
}
fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
@@ -88,12 +81,8 @@ where
}
impl<T: AsRef<[u8]> + Unpin> AsyncSeek for io::Cursor<T> {
- fn start_seek(
- mut self: Pin<&mut Self>,
- _: &mut Context<'_>,
- pos: SeekFrom,
- ) -> Poll<io::Result<()>> {
- Poll::Ready(io::Seek::seek(&mut *self, pos).map(drop))
+ fn start_seek(mut self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> {
+ io::Seek::seek(&mut *self, pos).map(drop)
}
fn poll_complete(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<u64>> {
Poll::Ready(Ok(self.get_mut().position()))
diff --git a/tokio/src/io/seek.rs b/tokio/src/io/seek.rs
index c90330a5..e64205d9 100644
--- a/tokio/src/io/seek.rs
+++ b/tokio/src/io/seek.rs
@@ -40,14 +40,17 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();
match me.pos {
- Some(pos) => match Pin::new(&mut *me.seek).start_seek(cx, *pos) {
- Poll::Ready(Ok(())) => {
- *me.pos = None;
- Pin::new(&mut *me.seek).poll_complete(cx)
+ Some(pos) => {
+ // ensure no seek in progress
+ ready!(Pin::new(&mut *me.seek).poll_complete(cx))?;
+ match Pin::new(&mut *me.seek).start_seek(*pos) {
+ Ok(()) => {
+ *me.pos = None;
+ Pin::new(&mut *me.seek).poll_complete(cx)
+ }
+ Err(e) => Poll::Ready(Err(e)),
}
- Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
- Poll::Pending => Poll::Pending,
- },
+ }
None => Pin::new(&mut *me.seek).poll_complete(cx),
}
}