diff options
author | andy finch <andyfinch7@gmail.com> | 2019-07-11 11:05:49 -0500 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2019-07-11 09:05:49 -0700 |
commit | 795e02f4c6cfb577d6393221a8424c6e39e9d37c (patch) | |
tree | 068bab124bab3ded9bc5c3a4dc4674f47eaa2656 /tokio-fs/src | |
parent | 7ac8bfc82133a19cabc917e5ab55902e3b2441df (diff) |
fs: update to use `std::future` (#1269)
Diffstat (limited to 'tokio-fs/src')
27 files changed, 353 insertions, 186 deletions
diff --git a/tokio-fs/src/create_dir.rs b/tokio-fs/src/create_dir.rs index a07c1ad5..ac784075 100644 --- a/tokio-fs/src/create_dir.rs +++ b/tokio-fs/src/create_dir.rs @@ -1,7 +1,10 @@ -use futures::{Future, Poll}; use std::fs; +use std::future::Future; use std::io; use std::path::Path; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; /// Creates a new, empty directory at the provided path /// @@ -34,10 +37,9 @@ impl<P> Future for CreateDirFuture<P> where P: AsRef<Path>, { - type Item = (); - type Error = io::Error; + type Output = io::Result<()>; - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { crate::blocking_io(|| fs::create_dir(&self.path)) } } diff --git a/tokio-fs/src/create_dir_all.rs b/tokio-fs/src/create_dir_all.rs index 9f034fa4..914c0b17 100644 --- a/tokio-fs/src/create_dir_all.rs +++ b/tokio-fs/src/create_dir_all.rs @@ -1,7 +1,10 @@ -use futures::{Future, Poll}; use std::fs; +use std::future::Future; use std::io; use std::path::Path; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; /// Recursively create a directory and all of its parent components if they /// are missing. @@ -35,10 +38,9 @@ impl<P> Future for CreateDirAllFuture<P> where P: AsRef<Path>, { - type Item = (); - type Error = io::Error; + type Output = io::Result<()>; - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { crate::blocking_io(|| fs::create_dir_all(&self.path)) } } diff --git a/tokio-fs/src/file/clone.rs b/tokio-fs/src/file/clone.rs index 1bbb6cc9..24a44306 100644 --- a/tokio-fs/src/file/clone.rs +++ b/tokio-fs/src/file/clone.rs @@ -1,6 +1,9 @@ use super::File; -use futures::{Future, Poll}; +use std::future::Future; use std::io; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; /// Future returned by `File::try_clone`. /// @@ -21,15 +24,16 @@ impl CloneFuture { } impl Future for CloneFuture { - type Item = (File, File); - type Error = (File, io::Error); + type Output = Result<(File, File), (File, io::Error)>; - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - self.file + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { + let inner_self = Pin::get_mut(self); + inner_self + .file .as_mut() .expect("Cannot poll `CloneFuture` after it resolves") .poll_try_clone() - .map(|inner| inner.map(|cloned| (self.file.take().unwrap(), cloned))) - .map_err(|err| (self.file.take().unwrap(), err)) + .map(|inner| inner.map(|cloned| (inner_self.file.take().unwrap(), cloned))) + .map_err(|err| (inner_self.file.take().unwrap(), err)) } } diff --git a/tokio-fs/src/file/create.rs b/tokio-fs/src/file/create.rs index da03779e..bc4661cb 100644 --- a/tokio-fs/src/file/create.rs +++ b/tokio-fs/src/file/create.rs @@ -1,8 +1,11 @@ use super::File; -use futures::{try_ready, Future, Poll}; use std::fs::File as StdFile; +use std::future::Future; use std::io; use std::path::Path; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; /// Future returned by `File::create` and resolves to a `File` instance. #[derive(Debug)] @@ -12,7 +15,7 @@ pub struct CreateFuture<P> { impl<P> CreateFuture<P> where - P: AsRef<Path> + Send + 'static, + P: AsRef<Path> + Send + Unpin + 'static, { pub(crate) fn new(path: P) -> Self { CreateFuture { path } @@ -21,15 +24,14 @@ where impl<P> Future for CreateFuture<P> where - P: AsRef<Path> + Send + 'static, + P: AsRef<Path> + Send + Unpin + 'static, { - type Item = File; - type Error = io::Error; + type Output = io::Result<File>; - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - let std = try_ready!(crate::blocking_io(|| StdFile::create(&self.path))); + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { + let std = ready!(crate::blocking_io(|| StdFile::create(&self.path)))?; let file = File::from_std(std); - Ok(file.into()) + Poll::Ready(Ok(file.into())) } } diff --git a/tokio-fs/src/file/metadata.rs b/tokio-fs/src/file/metadata.rs index 7a3c5ef9..a5aa4683 100644 --- a/tokio-fs/src/file/metadata.rs +++ b/tokio-fs/src/file/metadata.rs @@ -1,8 +1,11 @@ use super::File; -use futures::{try_ready, Future, Poll}; use std::fs::File as StdFile; use std::fs::Metadata; +use std::future::Future; use std::io; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; const POLL_AFTER_RESOLVE: &str = "Cannot poll MetadataFuture after it resolves"; @@ -23,13 +26,13 @@ impl MetadataFuture { } impl Future for MetadataFuture { - type Item = (File, Metadata); - type Error = io::Error; + type Output = io::Result<(File, Metadata)>; - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - let metadata = try_ready!(crate::blocking_io(|| StdFile::metadata(self.std()))); + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { + let inner = Pin::get_mut(self); + let metadata = ready!(crate::blocking_io(|| StdFile::metadata(inner.std())))?; - let file = self.file.take().expect(POLL_AFTER_RESOLVE); - Ok((file, metadata).into()) + let file = inner.file.take().expect(POLL_AFTER_RESOLVE); + Poll::Ready(Ok((file, metadata).into())) } } diff --git a/tokio-fs/src/file/mod.rs b/tokio-fs/src/file/mod.rs index c51bf5b5..76b51ae8 100644 --- a/tokio-fs/src/file/mod.rs +++ b/tokio-fs/src/file/mod.rs @@ -16,10 +16,12 @@ pub use self::open::OpenFuture; pub use self::open_options::OpenOptions; pub use self::seek::SeekFuture; -use futures::Poll; use std::fs::{File as StdFile, Metadata, Permissions}; use std::io::{self, Read, Seek, Write}; use std::path::Path; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; use tokio_io::{AsyncRead, AsyncWrite}; /// A reference to an open file on the filesystem. @@ -103,7 +105,7 @@ impl File { /// ``` pub fn open<P>(path: P) -> OpenFuture<P> where - P: AsRef<Path> + Send + 'static, + P: AsRef<Path> + Send + Unpin + 'static, { OpenOptions::new().read(true).open(path) } @@ -142,7 +144,7 @@ impl File { /// ``` pub fn create<P>(path: P) -> CreateFuture<P> where - P: AsRef<Path> + Send + 'static, + P: AsRef<Path> + Send + Unpin + 'static, { CreateFuture::new(path) } @@ -191,7 +193,7 @@ impl File { /// /// tokio::run(task); /// ``` - pub fn poll_seek(&mut self, pos: io::SeekFrom) -> Poll<u64, io::Error> { + pub fn poll_seek(&mut self, pos: io::SeekFrom) -> Poll<io::Result<u64>> { crate::blocking_io(|| self.std().seek(pos)) } @@ -243,7 +245,7 @@ impl File { /// /// tokio::run(task); /// ``` - pub fn poll_sync_all(&mut self) -> Poll<(), io::Error> { + pub fn poll_sync_all(&mut self) -> Poll<io::Result<()>> { crate::blocking_io(|| self.std().sync_all()) } @@ -273,7 +275,7 @@ impl File { /// /// tokio::run(task); /// ``` - pub fn poll_sync_data(&mut self) -> Poll<(), io::Error> { + pub fn poll_sync_data(&mut self) -> Poll<io::Result<()>> { crate::blocking_io(|| self.std().sync_data()) } @@ -305,7 +307,7 @@ impl File { /// /// tokio::run(task); /// ``` - pub fn poll_set_len(&mut self, size: u64) -> Poll<(), io::Error> { + pub fn poll_set_len(&mut self, size: u64) -> Poll<io::Result<()>> { crate::blocking_io(|| self.std().set_len(size)) } @@ -344,7 +346,7 @@ impl File { /// /// tokio::run(task); /// ``` - pub fn poll_metadata(&mut self) -> Poll<Metadata, io::Error> { + pub fn poll_metadata(&mut self) -> Poll<io::Result<Metadata>> { crate::blocking_io(|| self.std().metadata()) } @@ -366,7 +368,7 @@ impl File { /// /// tokio::run(task); /// ``` - pub fn poll_try_clone(&mut self) -> Poll<File, io::Error> { + pub fn poll_try_clone(&mut self) -> Poll<io::Result<File>> { crate::blocking_io(|| { let std = self.std().try_clone()?; Ok(File::from_std(std)) @@ -437,7 +439,7 @@ impl File { /// /// tokio::run(task); /// ``` - pub fn poll_set_permissions(&mut self, perm: Permissions) -> Poll<(), io::Error> { + pub fn poll_set_permissions(&mut self, perm: Permissions) -> Poll<io::Result<()>> { crate::blocking_io(|| self.std().set_permissions(perm)) } @@ -479,8 +481,15 @@ impl Read for File { } impl AsyncRead for File { - unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { - false + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + match Pin::get_mut(self).read(buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Poll::Pending, + other => Poll::Ready(other), + } } } @@ -495,11 +504,26 @@ impl Write for File { } impl AsyncWrite for File { - fn shutdown(&mut self) -> Poll<(), io::Error> { - crate::blocking_io(|| { - self.std = None; - Ok(()) - }) + fn poll_write( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + match Pin::get_mut(self).write(buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Poll::Pending, + other => Poll::Ready(other), + } + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { + match Pin::get_mut(self).flush() { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Poll::Pending, + other => Poll::Ready(other), + } + } + + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { + Poll::Ready(Ok(())) } } diff --git a/tokio-fs/src/file/open.rs b/tokio-fs/src/file/open.rs index b95af160..5f7915ee 100644 --- a/tokio-fs/src/file/open.rs +++ b/tokio-fs/src/file/open.rs @@ -1,19 +1,22 @@ use super::File; -use futures::{try_ready, Future, Poll}; use std::fs::OpenOptions as StdOpenOptions; +use std::future::Future; use std::io; use std::path::Path; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; /// Future returned by `File::open` and resolves to a `File` instance. #[derive(Debug)] -pub struct OpenFuture<P> { +pub struct OpenFuture<P: Unpin> { options: StdOpenOptions, path: P, } impl<P> OpenFuture<P> where - P: AsRef<Path> + Send + 'static, + P: AsRef<Path> + Send + Unpin + 'static, { pub(crate) fn new(options: StdOpenOptions, path: P) -> Self { OpenFuture { options, path } @@ -22,15 +25,14 @@ where impl<P> Future for OpenFuture<P> where - P: AsRef<Path> + Send + 'static, + P: AsRef<Path> + Send + Unpin + 'static, { - type Item = File; - type Error = io::Error; + type Output = io::Result<File>; - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - let std = try_ready!(crate::blocking_io(|| self.options.open(&self.path))); + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { + let std = ready!(crate::blocking_io(|| self.options.open(&self.path)))?; let file = File::from_std(std); - Ok(file.into()) + Poll::Ready(Ok(file.into())) } } diff --git a/tokio-fs/src/file/open_options.rs b/tokio-fs/src/file/open_options.rs index 70b78211..41f232e1 100644 --- a/tokio-fs/src/file/open_options.rs +++ b/tokio-fs/src/file/open_options.rs @@ -90,7 +90,7 @@ impl OpenOptions { /// [`open`]: https://doc.rust-lang.org/std/fs/struct.OpenOptions.html#method.open pub fn open<P>(&self, path: P) -> OpenFuture<P> where - P: AsRef<Path> + Send + 'static, + P: AsRef<Path> + Send + Unpin + 'static, { OpenFuture::new(self.0.clone(), path) } diff --git a/tokio-fs/src/file/seek.rs b/tokio-fs/src/file/seek.rs index 16f30e1b..be0d997d 100644 --- a/tokio-fs/src/file/seek.rs +++ b/tokio-fs/src/file/seek.rs @@ -1,6 +1,9 @@ use super::File; -use futures::{try_ready, Future, Poll}; +use std::future::Future; use std::io; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; /// Future returned by `File::seek`. #[derive(Debug)] @@ -19,16 +22,16 @@ impl SeekFuture { } impl Future for SeekFuture { - type Item = (File, u64); - type Error = io::Error; + type Output = io::Result<(File, u64)>; - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - let pos = try_ready!(self + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { + let inner_self = Pin::get_mut(self); + let pos = ready!(inner_self .inner .as_mut() .expect("Cannot poll `SeekFuture` after it resolves") - .poll_seek(self.pos)); - let inner = self.inner.take().unwrap(); - Ok((inner, pos).into()) + .poll_seek(inner_self.pos))?; + let inner = inner_self.inner.take().unwrap(); + Poll::Ready(Ok((inner, pos).into())) } } diff --git a/tokio-fs/src/hard_link.rs b/tokio-fs/src/hard_link.rs index 277eedbe..b4a023b3 100644 --- a/tokio-fs/src/hard_link.rs +++ b/tokio-fs/src/hard_link.rs @@ -1,7 +1,10 @@ -use futures::{Future, Poll}; use std::fs; +use std::future::Future; use std::io; use std::path::Path; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; /// Creates a new hard link on the filesystem. /// @@ -41,10 +44,9 @@ where P: AsRef<Path>, Q: AsRef<Path>, { - type Item = (); - type Error = io::Error; + type Output = io::Result<()>; - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { crate::blocking_io(|| fs::hard_link(&self.src, &self.dst)) } } diff --git a/tokio-fs/src/lib.rs b/tokio-fs/src/lib.rs index 89b95623..2070059d 100644 --- a/tokio-fs/src/lib.rs +++ b/tokio-fs/src/lib.rs @@ -30,6 +30,9 @@ //! [`AsyncRead`]: https://docs.rs/tokio-io/0.1/tokio_io/trait.AsyncRead.html //! [tokio-threadpool]: https://docs.rs/tokio-threadpool/0.1/tokio_threadpool +#[macro_use] +extern crate tokio_futures; + mod create_dir; mod create_dir_all; pub mod file; @@ -68,20 +71,19 @@ pub use crate::stdout::{stdout, Stdout}; pub use crate::symlink_metadata::{symlink_metadata, SymlinkMetadataFuture}; pub use crate::write::{write, WriteFile}; -use futures::Async::*; -use futures::Poll; use std::io; use std::io::ErrorKind::{Other, WouldBlock}; +use std::task::Poll; +use std::task::Poll::*; -fn blocking_io<F, T>(f: F) -> Poll<T, io::Error> +fn blocking_io<F, T>(f: F) -> Poll<io::Result<T>> where F: FnOnce() -> io::Result<T>, { match tokio_threadpool::blocking(f) { - Ok(Ready(Ok(v))) => Ok(v.into()), - Ok(Ready(Err(err))) => Err(err), - Ok(NotReady) => Ok(NotReady), - Err(_) => Err(blocking_err()), + Ready(Ok(v)) => Ready(v), + Ready(Err(_)) => Ready(Err(blocking_err())), + Pending => Pending, } } @@ -90,13 +92,13 @@ where F: FnOnce() -> io::Result<T>, { match tokio_threadpool::blocking(f) { - Ok(Ready(Ok(v))) => Ok(v), - Ok(Ready(Err(err))) => { + Ready(Ok(Ok(v))) => Ok(v), + Ready(Ok(Err(err))) => { debug_assert_ne!(err.kind(), WouldBlock); Err(err) } - Ok(NotReady) => Err(WouldBlock.into()), - Err(_) => Err(blocking_err()), + Ready(Err(_)) => Err(blocking_err()), + Pending => Err(blocking_err()), } } diff --git a/tokio-fs/src/metadata.rs b/tokio-fs/src/metadata.rs index ece1e05a..11a25249 100644 --- a/tokio-fs/src/metadata.rs +++ b/tokio-fs/src/metadata.rs @@ -1,8 +1,11 @@ use super::blocking_io; -use futures::{Future, Poll}; use std::fs::{self, Metadata}; +use std::future::Future; use std::io; use std::path::Path; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; /// Queries the file system metadata for a path. pub fn metadata<P>(path: P) -> MetadataFuture<P> @@ -34,10 +37,9 @@ impl<P> Future for MetadataFuture<P> where P: AsRef<Path> + Send + 'static, { - type Item = Metadata; - type Error = io::Error; + type Output = io::Result<Metadata>; - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { blocking_io(|| fs::metadata(&self.path)) } } diff --git a/tokio-fs/src/os/unix.rs b/tokio-fs/src/os/unix.rs index 84b62c8f..b6f8e0be 100644 --- a/tokio-fs/src/os/unix.rs +++ b/tokio-fs/src/os/unix.rs @@ -1,9 +1,12 @@ //! Unix-specific extensions to primitives in the `tokio_fs` module. -use futures::{Future, Poll}; +use std::future::Future; use std::io; use std::os::unix::fs; use std::path::Path; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; /// Creates a new symbolic link on the filesystem. /// @@ -42,10 +45,9 @@ where P: AsRef<Path>, Q: AsRef<Path>, { - type Item = (); - type Error = io::Error; + type Output = io::Result<()>; - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { crate::blocking_io(|| fs::symlink(&self.src, &self.dst)) } } diff --git a/tokio-fs/src/os/windows/symlink_dir.rs b/tokio-fs/src/os/windows/symlink_dir.rs index 90f4a04f..e322deb9 100644 --- a/tokio-fs/src/os/windows/symlink_dir.rs +++ b/tokio-fs/src/os/windows/symlink_dir.rs @@ -1,7 +1,10 @@ -use futures::{Future, Poll}; +use std::future::Future; use std::io; use std::os::windows::fs; use std::path::Path; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; /// Creates a new directory symlink on the filesystem. /// @@ -41,10 +44,9 @@ where P: AsRef<Path>, Q: AsRef<Path>, { - type Item = (); - type Error = io::Error; + type Output = io::Result<()>; - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { crate::blocking_io(|| fs::symlink_dir(&self.src, &self.dst)) } } diff --git a/tokio-fs/src/os/windows/symlink_file.rs b/tokio-fs/src/os/windows/symlink_file.rs index eb7c9d04..afd6b229 100644 --- a/tokio-fs/src/os/windows/symlink_file.rs +++ b/tokio-fs/src/os/windows/symlink_file.rs @@ -1,7 +1,10 @@ -use futures::{Future, Poll}; +use std::future::Future; use std::io; use std::os::windows::fs; use std::path::Path; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; /// Creates a new file symbolic link on the filesystem. /// @@ -41,10 +44,9 @@ where P: AsRef<Path>, Q: AsRef<Path>, { - type Item = (); - type Error = io::Error; + type Output = io::Result<()>; - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { crate::blocking_io(|| fs::symlink_file(&self.src, &self.dst)) } } diff --git a/tokio-fs/src/read.rs b/tokio-fs/src/read.rs index fbcce035..cfd0afe3 100644 --- a/tokio-fs/src/read.rs +++ b/tokio-fs/src/read.rs @@ -1,7 +1,11 @@ use crate::{file, File}; -use futures::{try_ready, Async, Future, Poll}; +use std::future::Future; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; use std::{io, mem, path::Path}; use tokio_io; +use tokio_io::AsyncRead; /// Creates a future which will open a file for reading and read the entire /// contents into a buffer and return said buffer. @@ -25,7 +29,7 @@ use tokio_io; /// ``` pub fn read<P>(path: P) -> ReadFile<P> where - P: AsRef<Path> + Send + 'static, + P: AsRef<Path> + Send + Unpin + 'static, { ReadFile { state: State::Open(File::open(path)), @@ -34,41 +38,50 @@ where /// A future used to open a file and read its entire contents into a buffer. #[derive(Debug)] -pub struct ReadFile<P: AsRef<Path> + Send + 'static> { +pub struct ReadFile<P: AsRef<Path> + Send + Unpin + 'static> { state: State<P>, } #[derive(Debug)] -enum State<P: AsRef<Path> + Send + 'static> { +enum State<P: AsRef<Path> + Send + Unpin + 'static> { Open(file::OpenFuture<P>), Metadata(file::MetadataFuture), - Read(tokio_io::io::ReadToEnd<File>), + Reading(Vec<u8>, usize, File), + Empty, } -impl<P: AsRef<Path> + Send + 'static> Future for ReadFile<P> { - type Item = Vec<u8>; - type Error = io::Error; +impl<P: AsRef<Path> + Send + Unpin + 'static> Future for ReadFile<P> { + type Output = io::Result<Vec<u8>>; - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - let new_state = match &mut self.state { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let inner = Pin::get_mut(self); + match &mut inner.state { State::Open(ref mut open_file) => { - let file = try_ready!(open_file.poll()); - State::Metadata(file.metadata()) + let file = ready!(Pin::new(open_file).poll(cx))?; + let new_state = State::Metadata(file.metadata()); + mem::replace(&mut inner.state, new_state); + Pin::new(inner).poll(cx) } State::Metadata(read_metadata) => { - let (file, metadata) = try_ready!(read_metadata.poll()); + let (file, metadata) = ready!(Pin::new(read_metadata).poll(cx))?; let buf = Vec::with_capacity(metadata.len() as usize + 1); - let read = tokio_io::io::read_to_end(file, buf); - State::Read(read) + let new_state = State::Reading(buf, 0, file); + mem::replace(&mut inner.state, new_state); + Pin::new(inner).poll(cx) } - State::Read(ref mut read) => { - let (_, buf) = try_ready!(read.poll()); - return Ok(Async::Ready(buf)); + State::Reading(buf, ref mut pos, file) => { + let n = ready!(Pin::new(file).poll_read_buf(cx, buf))?; + *pos += n; + if *pos >= buf.len() { + match mem::replace(&mut inner.state, State::Empty) { + State::Reading(buf, _, _) => Poll::Ready(Ok(buf)), + _ => panic!(), + } + } else { + Poll::Pending + } } - }; - - mem::replace(&mut self.state, new_state); - // Getting here means we transitionsed state. Must poll the new state. - self.poll() + State::Empty => panic!("poll a WriteFile after it's done"), + } } } diff --git a/tokio-fs/src/read_dir.rs b/tokio-fs/src/read_dir.rs index 855e2332..01e36d01 100644 --- a/tokio-fs/src/read_dir.rs +++ b/tokio-fs/src/read_dir.rs @@ -1,10 +1,14 @@ -use futures::{Future, Poll, Stream}; +use futures_core::stream::Stream; use std::ffi::OsString; use std::fs::{self, DirEntry as StdDirEntry, FileType, Metadata, ReadDir as StdReadDir}; +use std::future::Future; use std::io; #[cfg(unix)] use std::os::unix::fs::DirEntryExt; use std::path::{Path, PathBuf}; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; /// Returns a stream over the entries within a directory. /// @@ -40,10 +44,9 @@ impl<P> Future for ReadDirFuture<P> where P: AsRef<Path> + Send + 'static, { - type Item = ReadDir; - type Error = io::Error; + type Output = io::Result<ReadDir>; - fn poll(&mut self) -> Poll<Self::Item, io::Error> { + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { crate::blocking_io(|| Ok(ReadDir(fs::read_dir(&self.path)?))) } } @@ -68,15 +71,19 @@ where pub struct ReadDir(StdReadDir); impl Stream for ReadDir { - type Item = DirEntry; - type Error = io::Error; + type Item = io::Result<DirEntry>; - fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { - crate::blocking_io(|| match self.0.next() { + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let inner = Pin::get_mut(self); + match crate::blocking_io(|| match inner.0.next() { Some(Err(err)) => Err(err), - Some(Ok(item)) => Ok(Some(DirEntry(item))), + Some(Ok(item)) => Ok(Some(Ok(DirEntry(item)))), None => Ok(None), - }) + }) { + Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))), + Poll::Ready(Ok(v)) => Poll::Ready(v), + Poll::Pending => Poll::Pending, + } } } @@ -181,7 +188,7 @@ impl DirEntry { /// /// tokio::run(fut); /// ``` - pub fn poll_metadata(&self) -> Poll<Metadata, io::Error> { + pub fn poll_metadata(&self) -> Poll<io::Result<Metadata>> { crate::blocking_io(|| self.0.metadata()) } @@ -213,7 +220,7 @@ impl DirEntry { /// /// tokio::run(fut); /// ``` - pub fn poll_file_type(&self) -> Poll<FileType, io::Error> { + pub fn poll_file_type(&self) -> Poll<io::Result<FileType>> { crate::blocking_io(|| self.0.file_type()) } } diff --git a/tokio-fs/src/read_link.rs b/tokio-fs/src/read_link.rs index a672c97d..d5b9fe2d 100644 --- a/tokio-fs/src/read_link.rs +++ b/tokio-fs/src/read_link.rs @@ -1,7 +1,10 @@ -use futures |