diff options
author | Zahari Dichev <zaharidichev@gmail.com> | 2020-10-08 11:56:01 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-08 10:56:01 +0200 |
commit | 43bd11bf2fa4eaee84383ddbe4c750868f1bb684 (patch) | |
tree | 29ab12a3ec77ff0dfc71c6f678a87516c5801456 | |
parent | d94ab62c54ac594f912a116965fecbdfb5e1d3e6 (diff) |
io: remove Poll from the AsyncSeek::start_seek return value (#2885)
-rw-r--r-- | tokio-util/src/either.rs | 8 | ||||
-rw-r--r-- | tokio/src/fs/file.rs | 45 | ||||
-rw-r--r-- | tokio/src/io/async_seek.rs | 45 | ||||
-rw-r--r-- | tokio/src/io/seek.rs | 17 |
4 files changed, 49 insertions, 66 deletions
diff --git a/tokio-util/src/either.rs b/tokio-util/src/either.rs index 3c749c1b..f5246af2 100644 --- a/tokio-util/src/either.rs +++ b/tokio-util/src/either.rs @@ -125,12 +125,8 @@ where L: AsyncSeek, R: AsyncSeek, { - fn start_seek( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - position: SeekFrom, - ) -> Poll<Result<()>> { - delegate_call!(self.start_seek(cx, position)) + fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> Result<()> { + delegate_call!(self.start_seek(position)) } fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<u64>> { diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index 319c2c7f..9556a22f 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -86,6 +86,8 @@ pub struct File { /// error is observed while performing a read, it is saved until the next /// write / flush call. last_write_err: Option<io::ErrorKind>, + + pos: u64, } #[derive(Debug)] @@ -199,6 +201,7 @@ impl File { std: Arc::new(std), state: State::Idle(Some(Buf::with_capacity(0))), last_write_err: None, + pos: 0, } } @@ -332,7 +335,9 @@ impl File { self.state = Idle(Some(buf)); match op { - Operation::Seek(res) => res.map(|_| ()), + Operation::Seek(res) => res.map(|pos| { + self.pos = pos; + }), _ => unreachable!(), } } @@ -524,9 +529,12 @@ impl AsyncRead for File { self.last_write_err = Some(e.kind()); self.state = Idle(Some(buf)); } - Operation::Seek(_) => { + Operation::Seek(result) => { assert!(buf.is_empty()); self.state = Idle(Some(buf)); + if let Ok(pos) = result { + self.pos = pos; + } continue; } } @@ -537,13 +545,10 @@ impl AsyncRead for File { } impl AsyncSeek for File { - fn start_seek( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - mut pos: SeekFrom, - ) -> Poll<io::Result<()>> { + fn start_seek(mut self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()> { loop { match self.state { + Busy(_) => panic!("must wait for poll_complete before calling start_seek"), Idle(ref mut buf_cell) => { let mut buf = buf_cell.take().unwrap(); @@ -562,22 +567,7 @@ impl AsyncSeek for File { let res = (&*std).seek(pos); (Operation::Seek(res), buf) })); - - return Ready(Ok(())); - } - Busy(ref mut rx) => { - let (op, buf) = ready!(Pin::new(rx).poll(cx))?; - self.state = Idle(Some(buf)); - - match op { - Operation::Read(_) => {} - Operation::Write(Err(e)) => { - assert!(self.last_write_err.is_none()); - self.last_write_err = Some(e.kind()); - } - Operation::Write(_) => {} - Operation::Seek(_) => {} - } + return Ok(()); } } } @@ -586,7 +576,7 @@ impl AsyncSeek for File { fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> { loop { match self.state { - Idle(_) => panic!("must call start_seek before calling poll_complete"), + Idle(_) => return Poll::Ready(Ok(self.pos)), Busy(ref mut rx) => { let (op, buf) = ready!(Pin::new(rx).poll(cx))?; self.state = Idle(Some(buf)); @@ -598,7 +588,12 @@ impl AsyncSeek for File { self.last_write_err = Some(e.kind()); } Operation::Write(_) => {} - Operation::Seek(res) => return Ready(res), + Operation::Seek(res) => { + if let Ok(pos) = res { + self.pos = pos; + } + return Ready(res); + } } } } diff --git a/tokio/src/io/async_seek.rs b/tokio/src/io/async_seek.rs index 32ed0a22..bd7a992e 100644 --- a/tokio/src/io/async_seek.rs +++ b/tokio/src/io/async_seek.rs @@ -23,36 +23,33 @@ pub trait AsyncSeek { /// /// If this function returns successfully, then the job has been submitted. /// To find out when it completes, call `poll_complete`. - fn start_seek( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - position: SeekFrom, - ) -> Poll<io::Result<()>>; + /// + /// # Errors + /// + /// This function can return [`io::ErrorKind::Other`] in case there is + /// another seek in progress. To avoid this, it is advisable that any call + /// to `start_seek` is preceded by a call to `poll_complete` to ensure all + /// pending seeks have completed. + fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()>; /// Waits for a seek operation to complete. /// /// If the seek operation completed successfully, /// this method returns the new position from the start of the stream. - /// That position can be used later with [`SeekFrom::Start`]. + /// That position can be used later with [`SeekFrom::Start`]. Repeatedly + /// calling this function without calling `start_seek` might return the + /// same result. /// /// # Errors /// /// Seeking to a negative offset is considered an error. - /// - /// # Panics - /// - /// Calling this method without calling `start_seek` first is an error. fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>>; } macro_rules! deref_async_seek { () => { - fn start_seek( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - pos: SeekFrom, - ) -> Poll<io::Result<()>> { - Pin::new(&mut **self).start_seek(cx, pos) + fn start_seek(mut self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> { + Pin::new(&mut **self).start_seek(pos) } fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> { @@ -74,12 +71,8 @@ where P: DerefMut + Unpin, P::Target: AsyncSeek, { - fn start_seek( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - pos: SeekFrom, - ) -> Poll<io::Result<()>> { - self.get_mut().as_mut().start_seek(cx, pos) + fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> { + self.get_mut().as_mut().start_seek(pos) } fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> { @@ -88,12 +81,8 @@ where } impl<T: AsRef<[u8]> + Unpin> AsyncSeek for io::Cursor<T> { - fn start_seek( - mut self: Pin<&mut Self>, - _: &mut Context<'_>, - pos: SeekFrom, - ) -> Poll<io::Result<()>> { - Poll::Ready(io::Seek::seek(&mut *self, pos).map(drop)) + fn start_seek(mut self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> { + io::Seek::seek(&mut *self, pos).map(drop) } fn poll_complete(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<u64>> { Poll::Ready(Ok(self.get_mut().position())) diff --git a/tokio/src/io/seek.rs b/tokio/src/io/seek.rs index c90330a5..e64205d9 100644 --- a/tokio/src/io/seek.rs +++ b/tokio/src/io/seek.rs @@ -40,14 +40,17 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let me = self.project(); match me.pos { - Some(pos) => match Pin::new(&mut *me.seek).start_seek(cx, *pos) { - Poll::Ready(Ok(())) => { - *me.pos = None; - Pin::new(&mut *me.seek).poll_complete(cx) + Some(pos) => { + // ensure no seek in progress + ready!(Pin::new(&mut *me.seek).poll_complete(cx))?; + match Pin::new(&mut *me.seek).start_seek(*pos) { + Ok(()) => { + *me.pos = None; + Pin::new(&mut *me.seek).poll_complete(cx) + } + Err(e) => Poll::Ready(Err(e)), } - Poll::Ready(Err(e)) => Poll::Ready(Err(e)), - Poll::Pending => Poll::Pending, - }, + } None => Pin::new(&mut *me.seek).poll_complete(cx), } } |