diff options
author | Carl Lerche <me@carllerche.com> | 2019-10-21 15:49:00 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-10-21 15:49:00 -0700 |
commit | 978013a215ebae63cd087139514de32bbd36ce11 (patch) | |
tree | dcf43cf2ac044ec9031a79901aa6956351c27ee4 /tokio/src | |
parent | 6aa6ebb5bce7b2b8c5b81814b6ea47994f0f54d9 (diff) |
fs: move into `tokio` (#1672)
A step towards collapsing Tokio sub crates into a single `tokio`
crate (#1318).
The `fs` implementation is now provided by the main `tokio` crate. The
`fs` functionality may still be excluded from the build by skipping the
`fs` feature flag.
Diffstat (limited to 'tokio/src')
30 files changed, 1832 insertions, 16 deletions
diff --git a/tokio/src/fs.rs b/tokio/src/fs.rs deleted file mode 100644 index cf902f8e..00000000 --- a/tokio/src/fs.rs +++ /dev/null @@ -1,14 +0,0 @@ -//! Asynchronous filesystem manipulation operations. -//! -//! This module contains basic methods and types for manipulating the contents -//! of the local filesystem from within the context of the Tokio runtime. -//! -//! Unlike *most* other Tokio APIs, the filesystem APIs **must** be used from -//! the context of the Tokio runtime as they require Tokio specific features to -//! function. - -pub use tokio_fs::{ - create_dir, create_dir_all, hard_link, metadata, os, read, read_dir, read_link, read_to_string, - remove_dir, remove_dir_all, remove_file, rename, set_permissions, symlink_metadata, write, - File, OpenOptions, -}; diff --git a/tokio/src/fs/blocking.rs b/tokio/src/fs/blocking.rs new file mode 100644 index 00000000..71b627c1 --- /dev/null +++ b/tokio/src/fs/blocking.rs @@ -0,0 +1,275 @@ +use crate::fs::sys; + +use tokio_io::{AsyncRead, AsyncWrite}; + +use futures_core::ready; +use std::cmp; +use std::future::Future; +use std::io; +use std::io::prelude::*; +use std::pin::Pin; +use std::task::Poll::*; +use std::task::{Context, Poll}; + +use self::State::*; + +/// `T` should not implement _both_ Read and Write. +#[derive(Debug)] +pub(crate) struct Blocking<T> { + inner: Option<T>, + state: State<T>, + /// true if the lower IO layer needs flushing + need_flush: bool, +} + +#[derive(Debug)] +pub(crate) struct Buf { + buf: Vec<u8>, + pos: usize, +} + +pub(crate) const MAX_BUF: usize = 16 * 1024; + +#[derive(Debug)] +enum State<T> { + Idle(Option<Buf>), + Busy(sys::Blocking<(io::Result<usize>, Buf, T)>), +} + +impl<T> Blocking<T> { + pub(crate) fn new(inner: T) -> Blocking<T> { + Blocking { + inner: Some(inner), + state: State::Idle(Some(Buf::with_capacity(0))), + need_flush: false, + } + } +} + +impl<T> AsyncRead for Blocking<T> +where + T: Read + Unpin + Send + 'static, +{ + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + dst: &mut [u8], + ) -> Poll<io::Result<usize>> { + loop { + match self.state { + Idle(ref mut buf_cell) => { + let mut buf = buf_cell.take().unwrap(); + + if !buf.is_empty() { + let n = buf.copy_to(dst); + *buf_cell = Some(buf); + return Ready(Ok(n)); + } + + buf.ensure_capacity_for(dst); + let mut inner = self.inner.take().unwrap(); + + self.state = Busy(sys::run(move || { + let res = buf.read_from(&mut inner); + (res, buf, inner) + })); + } + Busy(ref mut rx) => { + let (res, mut buf, inner) = ready!(Pin::new(rx).poll(cx)); + self.inner = Some(inner); + + match res { + Ok(_) => { + let n = buf.copy_to(dst); + self.state = Idle(Some(buf)); + return Ready(Ok(n)); + } + Err(e) => { + assert!(buf.is_empty()); + + self.state = Idle(Some(buf)); + return Ready(Err(e)); + } + } + } + } + } + } +} + +impl<T> AsyncWrite for Blocking<T> +where + T: Write + Unpin + Send + 'static, +{ + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + src: &[u8], + ) -> Poll<io::Result<usize>> { + loop { + match self.state { + Idle(ref mut buf_cell) => { + let mut buf = buf_cell.take().unwrap(); + + assert!(buf.is_empty()); + + let n = buf.copy_from(src); + let mut inner = self.inner.take().unwrap(); + + self.state = Busy(sys::run(move || { + let n = buf.len(); + let res = buf.write_to(&mut inner).map(|_| n); + + (res, buf, inner) + })); + self.need_flush = true; + + return Ready(Ok(n)); + } + Busy(ref mut rx) => { + let (res, buf, inner) = ready!(Pin::new(rx).poll(cx)); + self.state = Idle(Some(buf)); + self.inner = Some(inner); + + // If error, return + res?; + } + } + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { + loop { + let need_flush = self.need_flush; + match self.state { + // The buffer is not used here + Idle(ref mut buf_cell) => { + if need_flush { + let buf = buf_cell.take().unwrap(); + let mut inner = self.inner.take().unwrap(); + + self.state = Busy(sys::run(move || { + let res = inner.flush().map(|_| 0); + (res, buf, inner) + })); + + self.need_flush = false; + } else { + return Ready(Ok(())); + } + } + Busy(ref mut rx) => { + let (res, buf, inner) = ready!(Pin::new(rx).poll(cx)); + self.state = Idle(Some(buf)); + self.inner = Some(inner); + + // If error, return + res?; + } + } + } + } + + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { + Poll::Ready(Ok(())) + } +} + +/// Repeates operations that are interrupted +macro_rules! uninterruptibly { + ($e:expr) => {{ + loop { + match $e { + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} + res => break res, + } + } + }}; +} + +impl Buf { + pub(crate) fn with_capacity(n: usize) -> Buf { + Buf { + buf: Vec::with_capacity(n), + pos: 0, + } + } + + pub(crate) fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub(crate) fn len(&self) -> usize { + self.buf.len() - self.pos + } + + pub(crate) fn copy_to(&mut self, dst: &mut [u8]) -> usize { + let n = cmp::min(self.len(), dst.len()); + dst[..n].copy_from_slice(&self.bytes()[..n]); + self.pos += n; + + if self.pos == self.buf.len() { + self.buf.truncate(0); + self.pos = 0; + } + + n + } + + pub(crate) fn copy_from(&mut self, src: &[u8]) -> usize { + assert!(self.is_empty()); + + let n = cmp::min(src.len(), MAX_BUF); + + self.buf.extend_from_slice(&src[..n]); + n + } + + pub(crate) fn bytes(&self) -> &[u8] { + &self.buf[self.pos..] + } + + pub(crate) fn ensure_capacity_for(&mut self, bytes: &[u8]) { + assert!(self.is_empty()); + + let len = cmp::min(bytes.len(), MAX_BUF); + + if self.buf.len() < len { + self.buf.reserve(len - self.buf.len()); + } + + unsafe { + self.buf.set_len(len); + } + } + + pub(crate) fn read_from<T: Read>(&mut self, rd: &mut T) -> io::Result<usize> { + let res = uninterruptibly!(rd.read(&mut self.buf)); + + if let Ok(n) = res { + self.buf.truncate(n); + } else { + self.buf.clear(); + } + + assert_eq!(self.pos, 0); + + res + } + + pub(crate) fn write_to<T: Write>(&mut self, wr: &mut T) -> io::Result<()> { + assert_eq!(self.pos, 0); + + // `write_all` already ignores interrupts + let res = wr.write_all(&self.buf); + self.buf.clear(); + res + } + + pub(crate) fn discard_read(&mut self) -> i64 { + let ret = -(self.bytes().len() as i64); + self.pos = 0; + self.buf.truncate(0); + ret + } +} diff --git a/tokio/src/fs/create_dir.rs b/tokio/src/fs/create_dir.rs new file mode 100644 index 00000000..a74ca71d --- /dev/null +++ b/tokio/src/fs/create_dir.rs @@ -0,0 +1,14 @@ +use crate::fs::asyncify; + +use std::io; +use std::path::Path; + +/// Creates a new, empty directory at the provided path +/// +/// This is an async version of [`std::fs::create_dir`][std] +/// +/// [std]: https://doc.rust-lang.org/std/fs/fn.create_dir.html +pub async fn create_dir<P: AsRef<Path>>(path: P) -> io::Result<()> { + let path = path.as_ref().to_owned(); + asyncify(move || std::fs::create_dir(path)).await +} diff --git a/tokio/src/fs/create_dir_all.rs b/tokio/src/fs/create_dir_all.rs new file mode 100644 index 00000000..2a7374a3 --- /dev/null +++ b/tokio/src/fs/create_dir_all.rs @@ -0,0 +1,15 @@ +use crate::fs::asyncify; + +use std::io; +use std::path::Path; + +/// Recursively create a directory and all of its parent components if they +/// are missing. +/// +/// This is an async version of [`std::fs::create_dir_all`][std] +/// +/// [std]: https://doc.rust-lang.org/std/fs/fn.create_dir_all.html +pub async fn create_dir_all<P: AsRef<Path>>(path: P) -> io::Result<()> { + let path = path.as_ref().to_owned(); + asyncify(move || std::fs::create_dir_all(path)).await +} diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs new file mode 100644 index 00000000..7c3a6948 --- /dev/null +++ b/tokio/src/fs/file.rs @@ -0,0 +1,610 @@ +//! Types for working with [`File`]. +//! +//! [`File`]: file/struct.File.html + +use self::State::*; +use crate::fs::blocking::Buf; +use crate::fs::{asyncify, sys}; + +use tokio_io::{AsyncRead, AsyncWrite}; + +use futures_core::ready; +use std::fmt; +use std::fs::{Metadata, Permissions}; +use std::future::Future; +use std::io::{self, Seek, SeekFrom}; +use std::path::Path; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Context; +use std::task::Poll; +use std::task::Poll::*; + +/// A reference to an open file on the filesystem. +/// +/// This is a specialized version of [`std::fs::File`][std] for usage from the +/// Tokio runtime. +/// +/// An instance of a `File` can be read and/or written depending on what options +/// it was opened with. Files also implement Seek to alter the logical cursor +/// that the file contains internally. +/// +/// Files are automatically closed when they go out of scope. +/// +/// [std]: https://doc.rust-lang.org/std/fs/struct.File.html +/// +/// # Examples +/// +/// Create a new file and asynchronously write bytes to it: +/// +/// ```no_run +/// use tokio::fs::File; +/// use tokio::prelude::*; +/// +/// # async fn dox() -> std::io::Result<()> { +/// let mut file = File::create("foo.txt").await?; +/// file.write_all(b"hello, world!").await?; +/// # Ok(()) +/// # } +/// ``` +/// +/// Read the contents of a file into a buffer +/// +/// ```no_run +/// use tokio::fs::File; +/// use tokio::prelude::*; +/// +/// # async fn dox() -> std::io::Result<()> { +/// let mut file = File::open("foo.txt").await?; +/// +/// let mut contents = vec![]; +/// file.read_to_end(&mut contents).await?; +/// +/// println!("len = {}", contents.len()); +/// # Ok(()) +/// # } +/// ``` +pub struct File { + std: Arc<sys::File>, + state: State, + + /// Errors from writes/flushes are returned in write/flush calls. If a write + /// error is observed while performing a read, it is saved until the next + /// write / flush call. + last_write_err: Option<io::ErrorKind>, +} + +#[derive(Debug)] +enum State { + Idle(Option<Buf>), + Busy(sys::Blocking<(Operation, Buf)>), +} + +#[derive(Debug)] +enum Operation { + Read(io::Result<usize>), + Write(io::Result<()>), + Seek(io::Result<u64>), +} + +impl File { + /// Attempts to open a file in read-only mode. + /// + /// See [`OpenOptions`] for more details. + /// + /// [`OpenOptions`]: struct.OpenOptions.html + /// + /// # Errors + /// + /// This function will return an error if called from outside of the Tokio + /// runtime or if path does not already exist. Other errors may also be + /// returned according to OpenOptions::open. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// use tokio::prelude::*; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut file = File::open("foo.txt").await?; + /// + /// let mut contents = vec![]; + /// file.read_to_end(&mut contents).await?; + /// + /// println!("len = {}", contents.len()); + /// # Ok(()) + /// # } + /// ``` + pub async fn open<P>(path: P) -> io::Result<File> + where + P: AsRef<Path>, + { + let path = path.as_ref().to_owned(); + let std = asyncify(|| sys::File::open(path)).await?; + + Ok(File::from_std(std)) + } + + /// Opens a file in write-only mode. + /// + /// This function will create a file if it does not exist, and will truncate + /// it if it does. + /// + /// See [`OpenOptions`] for more details. + /// + /// [`OpenOptions`]: struct.OpenOptions.html + /// + /// # Errors + /// + /// Results in an error if called from outside of the Tokio runtime or if + /// the underlying [`create`] call results in an error. + /// + /// [`create`]: https://doc.rust-lang.org/std/fs/struct.File.html#method.create + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// use tokio::prelude::*; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut file = File::create("foo.txt").await?; + /// file.write_all(b"hello, world!").await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn create<P>(path: P) -> io::Result<File> + where + P: AsRef<Path>, + { + let path = path.as_ref().to_owned(); + let std_file = asyncify(move || sys::File::create(path)).await?; + Ok(File::from_std(std_file)) + } + + /// Convert a [`std::fs::File`][std] to a [`tokio_fs::File`][file]. + /// + /// [std]: https://doc.rust-lang.org/std/fs/struct.File.html + /// [file]: struct.File.html + /// + /// # Examples + /// + /// ```no_run + /// // This line could block. It is not recommended to do this on the Tokio + /// // runtime. + /// let std_file = std::fs::File::open("foo.txt").unwrap(); + /// let file = tokio::fs::File::from_std(std_file); + /// ``` + pub fn from_std(std: sys::File) -> File { + File { + std: Arc::new(std), + state: State::Idle(Some(Buf::with_capacity(0))), + last_write_err: None, + } + } + + /// Seek to an offset, in bytes, in a stream. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// use tokio::prelude::*; + /// + /// use std::io::SeekFrom; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut file = File::open("foo.txt").await?; + /// file.seek(SeekFrom::Start(6)).await?; + /// + /// let mut contents = vec![0u8; 10]; + /// file.read_exact(&mut contents).await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn seek(&mut self, mut pos: SeekFrom) -> io::Result<u64> { + self.complete_inflight().await; + + let mut buf = match self.state { + Idle(ref mut buf_cell) => buf_cell.take().unwrap(), + _ => unreachable!(), + }; + + // Factor in any unread data from the buf + if !buf.is_empty() { + let n = buf.discard_read(); + + if let SeekFrom::Current(ref mut offset) = pos { + *offset += n; + } + } + + let std = self.std.clone(); + + // Start the operation + self.state = Busy(sys::run(move || { + let res = (&*std).seek(pos); + (Operation::Seek(res), buf) + })); + + let (op, buf) = match self.state { + Idle(_) => unreachable!(), + Busy(ref mut rx) => rx.await, + }; + + self.state = Idle(Some(buf)); + + match op { + Operation::Seek(res) => res, + _ => unreachable!(), + } + } + + /// Attempts to sync all OS-internal metadata to disk. + /// + /// This function will attempt to ensure that all in-core data reaches the + /// filesystem before returning. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// use tokio::prelude::*; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut file = File::create("foo.txt").await?; + /// file.write_all(b"hello, world!").await?; + /// file.sync_all().await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn sync_all(&mut self) -> io::Result<()> { + self.complete_inflight().await; + + let std = self.std.clone(); + asyncify(move || std.sync_all()).await + } + + /// This function is similar to `sync_all`, except that it may not + /// synchronize file metadata to the filesystem. + /// + /// This is intended for use cases that must synchronize content, but don't + /// need the metadata on disk. The goal of this method is to reduce disk + /// operations. + /// + /// Note that some platforms may simply implement this in terms of `sync_all`. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// use tokio::prelude::*; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut file = File::create("foo.txt").await?; + /// file.write_all(b"hello, world!").await?; + /// file.sync_data().await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn sync_data(&mut self) -> io::Result<()> { + self.complete_inflight().await; + + let std = self.std.clone(); + asyncify(move || std.sync_data()).await + } + + /// Truncates or extends the underlying file, updating the size of this file to become size. + /// + /// If the size is less than the current file's size, then the file will be + /// shrunk. If it is greater than the current file's size, then the file + /// will be extended to size and have all of the intermediate data filled in + /// with 0s. + /// + /// # Errors + /// + /// This function will return an error if the file is not opened for + /// writing. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// use tokio::prelude::*; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut file = File::create("foo.txt").await?; + /// file.write_all(b"hello, world!").await?; + /// file.set_len(10).await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn set_len(&mut self, size: u64) -> io::Result<()> { + self.complete_inflight().await; + + let mut buf = match self.state { + Idle(ref mut buf_cell) => buf_cell.take().unwrap(), + _ => unreachable!(), + }; + + let seek = if !buf.is_empty() { + Some(SeekFrom::Current(buf.discard_read())) + } else { + None + }; + + let std = self.std.clone(); + + self.state = Busy(sys::run(move || { + let res = if let Some(seek) = seek { + (&*std).seek(seek).and_then(|_| std.set_len(size)) + } else { + std.set_len(size) + } + .map(|_| 0); // the value is discarded later + + // Return the result as a seek + (Operation::Seek(res), buf) + })); + + let (op, buf) = match self.state { + Idle(_) => unreachable!(), + Busy(ref mut rx) => rx.await, + }; + + self.state = Idle(Some(buf)); + + match op { + Operation::Seek(res) => res.map(|_| ()), + _ => unreachable!(), + } + } + + /// Queries metadata about the underlying file. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let file = File::open("foo.txt").await?; + /// let metadata = file.metadata().await?; + /// + /// println!("{:?}", metadata); + /// # Ok(()) + /// # } + /// ``` + pub async fn metadata(&self) -> io::Result<Metadata> { + let std = self.std.clone(); + asyncify(move || std.metadata()).await + } + + /// Create a new `File` instance that shares the same underlying file handle + /// as the existing `File` instance. Reads, writes, and seeks will affect both + /// File instances simultaneously. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let file = File::open("foo.txt").await?; + /// let file_clone = file.try_clone().await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn try_clone(&self) -> io::Result<File> { + let std = self.std.clone(); + let std_file = asyncify(move || std.try_clone()).await?; + Ok(File::from_std(std_file)) + } + + /// Changes the permissions on the underlying file. + /// + /// # Platform-specific behavior + /// + /// This function currently corresponds to the `fchmod` function on Unix and + /// the `SetFileInformationByHandle` function on Windows. Note that, this + /// [may change in the future][changes]. + /// + /// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior + /// + /// # Errors + /// + /// This function will return an error if the user lacks permission change + /// attributes on the underlying file. It may also return an error in other + /// os-specific unspecified cases. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let file = File::open("foo.txt").await?; + /// let mut perms = file.metadata().await?.permissions(); + /// perms.set_readonly(true); + /// file.set_permissions(perms).await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> { + let std = self.std.clone(); + asyncify(move || std.set_permissions(perm)).await + } + + async fn complete_inflight(&mut self) { + use futures_util::future::poll_fn; + + if let Err(e) = poll_fn(|cx| Pin::new(&mut *self).poll_flush(cx)).await { + self.last_write_err = Some(e.kind()); + } + } +} + +impl AsyncRead for File { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + dst: &mut [u8], + ) -> Poll<io::Result<usize>> { + loop { + match self.state { + Idle(ref mut buf_cell) => { + let mut buf = buf_cell.take().unwrap(); + + if !buf.is_empty() { + let n = buf.copy_to(dst); + *buf_cell = Some(buf); + return Ready(Ok(n)); + } + + buf.ensure_capacity_for(dst); + let std = self.std.clone(); + + self.state = Busy(sys::run(move || { + let res = buf.read_from(&mut &*std); + (Operation::Read(res), buf) + })); + } + Busy(ref mut rx) => { + let (op, mut buf) = ready!(Pin::new(rx).poll(cx)); + + match op { + Operation::Read(Ok(_)) => { + let n = buf.copy_to(dst); + self.state = Idle(Some(buf)); + return Ready(Ok(n)); + } + Operation::Read(Err(e)) => { + assert!(buf.is_empty()); + + self.state = Idle(Some(buf)); + return Ready(Err(e)); + } + Operation::Write(Ok(_)) => { + assert!(buf.is_empty()); + self.state = Idle(Some(buf)); + continue; + } + Operation::Write(Err(e)) => { + assert!(self.last_write_err.is_none()); + self.last_write_err = Some(e.kind()); + self.state = Idle(Some(buf)); + } + Operation::Seek(_) => { + assert!(buf.is_empty()); + self.state = Idle(Some(buf)); + continue; + } + } + } + } + } + } +} + +impl AsyncWrite for File { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + src: &[u8], + ) -> Poll<io::Result<usize>> { + if let Some(e) = self.last_write_err.take() { + return Ready(Err(e.into())); + } + + loop { + match self.state { + Idle(ref mut buf_cell) => { + let mut buf = buf_cell.take().unwrap(); + + let seek = if !buf.is_empty() { + Some(SeekFrom::Current(buf.discard_read())) + } else { + None + }; + + let n = buf.copy_from(src); + let std = self.std.clone(); + + self.state = Busy(sys::run(move || { + let res = if let Some(seek) = seek { + (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) + } else { + buf.write_to(&mut &*std) + }; + |