summaryrefslogtreecommitdiffstats
path: root/tokio/src/fs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-10-21 15:49:00 -0700
committerGitHub <noreply@github.com>2019-10-21 15:49:00 -0700
commit978013a215ebae63cd087139514de32bbd36ce11 (patch)
treedcf43cf2ac044ec9031a79901aa6956351c27ee4 /tokio/src/fs
parent6aa6ebb5bce7b2b8c5b81814b6ea47994f0f54d9 (diff)
fs: move into `tokio` (#1672)
A step towards collapsing Tokio sub crates into a single `tokio` crate (#1318). The `fs` implementation is now provided by the main `tokio` crate. The `fs` functionality may still be excluded from the build by skipping the `fs` feature flag.
Diffstat (limited to 'tokio/src/fs')
-rw-r--r--tokio/src/fs/blocking.rs275
-rw-r--r--tokio/src/fs/create_dir.rs14
-rw-r--r--tokio/src/fs/create_dir_all.rs15
-rw-r--r--tokio/src/fs/file.rs610
-rw-r--r--tokio/src/fs/hard_link.rs19
-rw-r--r--tokio/src/fs/metadata.rs14
-rw-r--r--tokio/src/fs/mod.rs98
-rw-r--r--tokio/src/fs/open_options.rs113
-rw-r--r--tokio/src/fs/os/mod.rs7
-rw-r--r--tokio/src/fs/os/unix/mod.rs4
-rw-r--r--tokio/src/fs/os/unix/symlink.rs18
-rw-r--r--tokio/src/fs/os/windows/mod.rs7
-rw-r--r--tokio/src/fs/os/windows/symlink_dir.rs19
-rw-r--r--tokio/src/fs/os/windows/symlink_file.rs19
-rw-r--r--tokio/src/fs/read.rs27
-rw-r--r--tokio/src/fs/read_dir.rs237
-rw-r--r--tokio/src/fs/read_link.rs14
-rw-r--r--tokio/src/fs/read_to_string.rs27
-rw-r--r--tokio/src/fs/remove_dir.rs12
-rw-r--r--tokio/src/fs/remove_dir_all.rs14
-rw-r--r--tokio/src/fs/remove_file.rs18
-rw-r--r--tokio/src/fs/rename.rs17
-rw-r--r--tokio/src/fs/set_permissions.rs15
-rw-r--r--tokio/src/fs/symlink_metadata.rs18
-rw-r--r--tokio/src/fs/write.rs28
25 files changed, 1659 insertions, 0 deletions
diff --git a/tokio/src/fs/blocking.rs b/tokio/src/fs/blocking.rs
new file mode 100644
index 00000000..71b627c1
--- /dev/null
+++ b/tokio/src/fs/blocking.rs
@@ -0,0 +1,275 @@
+use crate::fs::sys;
+
+use tokio_io::{AsyncRead, AsyncWrite};
+
+use futures_core::ready;
+use std::cmp;
+use std::future::Future;
+use std::io;
+use std::io::prelude::*;
+use std::pin::Pin;
+use std::task::Poll::*;
+use std::task::{Context, Poll};
+
+use self::State::*;
+
+/// `T` should not implement _both_ Read and Write.
+#[derive(Debug)]
+pub(crate) struct Blocking<T> {
+ inner: Option<T>,
+ state: State<T>,
+ /// true if the lower IO layer needs flushing
+ need_flush: bool,
+}
+
+#[derive(Debug)]
+pub(crate) struct Buf {
+ buf: Vec<u8>,
+ pos: usize,
+}
+
+pub(crate) const MAX_BUF: usize = 16 * 1024;
+
+#[derive(Debug)]
+enum State<T> {
+ Idle(Option<Buf>),
+ Busy(sys::Blocking<(io::Result<usize>, Buf, T)>),
+}
+
+impl<T> Blocking<T> {
+ pub(crate) fn new(inner: T) -> Blocking<T> {
+ Blocking {
+ inner: Some(inner),
+ state: State::Idle(Some(Buf::with_capacity(0))),
+ need_flush: false,
+ }
+ }
+}
+
+impl<T> AsyncRead for Blocking<T>
+where
+ T: Read + Unpin + Send + 'static,
+{
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ dst: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ loop {
+ match self.state {
+ Idle(ref mut buf_cell) => {
+ let mut buf = buf_cell.take().unwrap();
+
+ if !buf.is_empty() {
+ let n = buf.copy_to(dst);
+ *buf_cell = Some(buf);
+ return Ready(Ok(n));
+ }
+
+ buf.ensure_capacity_for(dst);
+ let mut inner = self.inner.take().unwrap();
+
+ self.state = Busy(sys::run(move || {
+ let res = buf.read_from(&mut inner);
+ (res, buf, inner)
+ }));
+ }
+ Busy(ref mut rx) => {
+ let (res, mut buf, inner) = ready!(Pin::new(rx).poll(cx));
+ self.inner = Some(inner);
+
+ match res {
+ Ok(_) => {
+ let n = buf.copy_to(dst);
+ self.state = Idle(Some(buf));
+ return Ready(Ok(n));
+ }
+ Err(e) => {
+ assert!(buf.is_empty());
+
+ self.state = Idle(Some(buf));
+ return Ready(Err(e));
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+impl<T> AsyncWrite for Blocking<T>
+where
+ T: Write + Unpin + Send + 'static,
+{
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ src: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ loop {
+ match self.state {
+ Idle(ref mut buf_cell) => {
+ let mut buf = buf_cell.take().unwrap();
+
+ assert!(buf.is_empty());
+
+ let n = buf.copy_from(src);
+ let mut inner = self.inner.take().unwrap();
+
+ self.state = Busy(sys::run(move || {
+ let n = buf.len();
+ let res = buf.write_to(&mut inner).map(|_| n);
+
+ (res, buf, inner)
+ }));
+ self.need_flush = true;
+
+ return Ready(Ok(n));
+ }
+ Busy(ref mut rx) => {
+ let (res, buf, inner) = ready!(Pin::new(rx).poll(cx));
+ self.state = Idle(Some(buf));
+ self.inner = Some(inner);
+
+ // If error, return
+ res?;
+ }
+ }
+ }
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
+ loop {
+ let need_flush = self.need_flush;
+ match self.state {
+ // The buffer is not used here
+ Idle(ref mut buf_cell) => {
+ if need_flush {
+ let buf = buf_cell.take().unwrap();
+ let mut inner = self.inner.take().unwrap();
+
+ self.state = Busy(sys::run(move || {
+ let res = inner.flush().map(|_| 0);
+ (res, buf, inner)
+ }));
+
+ self.need_flush = false;
+ } else {
+ return Ready(Ok(()));
+ }
+ }
+ Busy(ref mut rx) => {
+ let (res, buf, inner) = ready!(Pin::new(rx).poll(cx));
+ self.state = Idle(Some(buf));
+ self.inner = Some(inner);
+
+ // If error, return
+ res?;
+ }
+ }
+ }
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
+ Poll::Ready(Ok(()))
+ }
+}
+
+/// Repeates operations that are interrupted
+macro_rules! uninterruptibly {
+ ($e:expr) => {{
+ loop {
+ match $e {
+ Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
+ res => break res,
+ }
+ }
+ }};
+}
+
+impl Buf {
+ pub(crate) fn with_capacity(n: usize) -> Buf {
+ Buf {
+ buf: Vec::with_capacity(n),
+ pos: 0,
+ }
+ }
+
+ pub(crate) fn is_empty(&self) -> bool {
+ self.len() == 0
+ }
+
+ pub(crate) fn len(&self) -> usize {
+ self.buf.len() - self.pos
+ }
+
+ pub(crate) fn copy_to(&mut self, dst: &mut [u8]) -> usize {
+ let n = cmp::min(self.len(), dst.len());
+ dst[..n].copy_from_slice(&self.bytes()[..n]);
+ self.pos += n;
+
+ if self.pos == self.buf.len() {
+ self.buf.truncate(0);
+ self.pos = 0;
+ }
+
+ n
+ }
+
+ pub(crate) fn copy_from(&mut self, src: &[u8]) -> usize {
+ assert!(self.is_empty());
+
+ let n = cmp::min(src.len(), MAX_BUF);
+
+ self.buf.extend_from_slice(&src[..n]);
+ n
+ }
+
+ pub(crate) fn bytes(&self) -> &[u8] {
+ &self.buf[self.pos..]
+ }
+
+ pub(crate) fn ensure_capacity_for(&mut self, bytes: &[u8]) {
+ assert!(self.is_empty());
+
+ let len = cmp::min(bytes.len(), MAX_BUF);
+
+ if self.buf.len() < len {
+ self.buf.reserve(len - self.buf.len());
+ }
+
+ unsafe {
+ self.buf.set_len(len);
+ }
+ }
+
+ pub(crate) fn read_from<T: Read>(&mut self, rd: &mut T) -> io::Result<usize> {
+ let res = uninterruptibly!(rd.read(&mut self.buf));
+
+ if let Ok(n) = res {
+ self.buf.truncate(n);
+ } else {
+ self.buf.clear();
+ }
+
+ assert_eq!(self.pos, 0);
+
+ res
+ }
+
+ pub(crate) fn write_to<T: Write>(&mut self, wr: &mut T) -> io::Result<()> {
+ assert_eq!(self.pos, 0);
+
+ // `write_all` already ignores interrupts
+ let res = wr.write_all(&self.buf);
+ self.buf.clear();
+ res
+ }
+
+ pub(crate) fn discard_read(&mut self) -> i64 {
+ let ret = -(self.bytes().len() as i64);
+ self.pos = 0;
+ self.buf.truncate(0);
+ ret
+ }
+}
diff --git a/tokio/src/fs/create_dir.rs b/tokio/src/fs/create_dir.rs
new file mode 100644
index 00000000..a74ca71d
--- /dev/null
+++ b/tokio/src/fs/create_dir.rs
@@ -0,0 +1,14 @@
+use crate::fs::asyncify;
+
+use std::io;
+use std::path::Path;
+
+/// Creates a new, empty directory at the provided path
+///
+/// This is an async version of [`std::fs::create_dir`][std]
+///
+/// [std]: https://doc.rust-lang.org/std/fs/fn.create_dir.html
+pub async fn create_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
+ let path = path.as_ref().to_owned();
+ asyncify(move || std::fs::create_dir(path)).await
+}
diff --git a/tokio/src/fs/create_dir_all.rs b/tokio/src/fs/create_dir_all.rs
new file mode 100644
index 00000000..2a7374a3
--- /dev/null
+++ b/tokio/src/fs/create_dir_all.rs
@@ -0,0 +1,15 @@
+use crate::fs::asyncify;
+
+use std::io;
+use std::path::Path;
+
+/// Recursively create a directory and all of its parent components if they
+/// are missing.
+///
+/// This is an async version of [`std::fs::create_dir_all`][std]
+///
+/// [std]: https://doc.rust-lang.org/std/fs/fn.create_dir_all.html
+pub async fn create_dir_all<P: AsRef<Path>>(path: P) -> io::Result<()> {
+ let path = path.as_ref().to_owned();
+ asyncify(move || std::fs::create_dir_all(path)).await
+}
diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs
new file mode 100644
index 00000000..7c3a6948
--- /dev/null
+++ b/tokio/src/fs/file.rs
@@ -0,0 +1,610 @@
+//! Types for working with [`File`].
+//!
+//! [`File`]: file/struct.File.html
+
+use self::State::*;
+use crate::fs::blocking::Buf;
+use crate::fs::{asyncify, sys};
+
+use tokio_io::{AsyncRead, AsyncWrite};
+
+use futures_core::ready;
+use std::fmt;
+use std::fs::{Metadata, Permissions};
+use std::future::Future;
+use std::io::{self, Seek, SeekFrom};
+use std::path::Path;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::Context;
+use std::task::Poll;
+use std::task::Poll::*;
+
+/// A reference to an open file on the filesystem.
+///
+/// This is a specialized version of [`std::fs::File`][std] for usage from the
+/// Tokio runtime.
+///
+/// An instance of a `File` can be read and/or written depending on what options
+/// it was opened with. Files also implement Seek to alter the logical cursor
+/// that the file contains internally.
+///
+/// Files are automatically closed when they go out of scope.
+///
+/// [std]: https://doc.rust-lang.org/std/fs/struct.File.html
+///
+/// # Examples
+///
+/// Create a new file and asynchronously write bytes to it:
+///
+/// ```no_run
+/// use tokio::fs::File;
+/// use tokio::prelude::*;
+///
+/// # async fn dox() -> std::io::Result<()> {
+/// let mut file = File::create("foo.txt").await?;
+/// file.write_all(b"hello, world!").await?;
+/// # Ok(())
+/// # }
+/// ```
+///
+/// Read the contents of a file into a buffer
+///
+/// ```no_run
+/// use tokio::fs::File;
+/// use tokio::prelude::*;
+///
+/// # async fn dox() -> std::io::Result<()> {
+/// let mut file = File::open("foo.txt").await?;
+///
+/// let mut contents = vec![];
+/// file.read_to_end(&mut contents).await?;
+///
+/// println!("len = {}", contents.len());
+/// # Ok(())
+/// # }
+/// ```
+pub struct File {
+ std: Arc<sys::File>,
+ state: State,
+
+ /// Errors from writes/flushes are returned in write/flush calls. If a write
+ /// error is observed while performing a read, it is saved until the next
+ /// write / flush call.
+ last_write_err: Option<io::ErrorKind>,
+}
+
+#[derive(Debug)]
+enum State {
+ Idle(Option<Buf>),
+ Busy(sys::Blocking<(Operation, Buf)>),
+}
+
+#[derive(Debug)]
+enum Operation {
+ Read(io::Result<usize>),
+ Write(io::Result<()>),
+ Seek(io::Result<u64>),
+}
+
+impl File {
+ /// Attempts to open a file in read-only mode.
+ ///
+ /// See [`OpenOptions`] for more details.
+ ///
+ /// [`OpenOptions`]: struct.OpenOptions.html
+ ///
+ /// # Errors
+ ///
+ /// This function will return an error if called from outside of the Tokio
+ /// runtime or if path does not already exist. Other errors may also be
+ /// returned according to OpenOptions::open.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::fs::File;
+ /// use tokio::prelude::*;
+ ///
+ /// # async fn dox() -> std::io::Result<()> {
+ /// let mut file = File::open("foo.txt").await?;
+ ///
+ /// let mut contents = vec![];
+ /// file.read_to_end(&mut contents).await?;
+ ///
+ /// println!("len = {}", contents.len());
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub async fn open<P>(path: P) -> io::Result<File>
+ where
+ P: AsRef<Path>,
+ {
+ let path = path.as_ref().to_owned();
+ let std = asyncify(|| sys::File::open(path)).await?;
+
+ Ok(File::from_std(std))
+ }
+
+ /// Opens a file in write-only mode.
+ ///
+ /// This function will create a file if it does not exist, and will truncate
+ /// it if it does.
+ ///
+ /// See [`OpenOptions`] for more details.
+ ///
+ /// [`OpenOptions`]: struct.OpenOptions.html
+ ///
+ /// # Errors
+ ///
+ /// Results in an error if called from outside of the Tokio runtime or if
+ /// the underlying [`create`] call results in an error.
+ ///
+ /// [`create`]: https://doc.rust-lang.org/std/fs/struct.File.html#method.create
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::fs::File;
+ /// use tokio::prelude::*;
+ ///
+ /// # async fn dox() -> std::io::Result<()> {
+ /// let mut file = File::create("foo.txt").await?;
+ /// file.write_all(b"hello, world!").await?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub async fn create<P>(path: P) -> io::Result<File>
+ where
+ P: AsRef<Path>,
+ {
+ let path = path.as_ref().to_owned();
+ let std_file = asyncify(move || sys::File::create(path)).await?;
+ Ok(File::from_std(std_file))
+ }
+
+ /// Convert a [`std::fs::File`][std] to a [`tokio_fs::File`][file].
+ ///
+ /// [std]: https://doc.rust-lang.org/std/fs/struct.File.html
+ /// [file]: struct.File.html
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// // This line could block. It is not recommended to do this on the Tokio
+ /// // runtime.
+ /// let std_file = std::fs::File::open("foo.txt").unwrap();
+ /// let file = tokio::fs::File::from_std(std_file);
+ /// ```
+ pub fn from_std(std: sys::File) -> File {
+ File {
+ std: Arc::new(std),
+ state: State::Idle(Some(Buf::with_capacity(0))),
+ last_write_err: None,
+ }
+ }
+
+ /// Seek to an offset, in bytes, in a stream.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::fs::File;
+ /// use tokio::prelude::*;
+ ///
+ /// use std::io::SeekFrom;
+ ///
+ /// # async fn dox() -> std::io::Result<()> {
+ /// let mut file = File::open("foo.txt").await?;
+ /// file.seek(SeekFrom::Start(6)).await?;
+ ///
+ /// let mut contents = vec![0u8; 10];
+ /// file.read_exact(&mut contents).await?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub async fn seek(&mut self, mut pos: SeekFrom) -> io::Result<u64> {
+ self.complete_inflight().await;
+
+ let mut buf = match self.state {
+ Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
+ _ => unreachable!(),
+ };
+
+ // Factor in any unread data from the buf
+ if !buf.is_empty() {
+ let n = buf.discard_read();
+
+ if let SeekFrom::Current(ref mut offset) = pos {
+ *offset += n;
+ }
+ }
+
+ let std = self.std.clone();
+
+ // Start the operation
+ self.state = Busy(sys::run(move || {
+ let res = (&*std).seek(pos);
+ (Operation::Seek(res), buf)
+ }));
+
+ let (op, buf) = match self.state {
+ Idle(_) => unreachable!(),
+ Busy(ref mut rx) => rx.await,
+ };
+
+ self.state = Idle(Some(buf));
+
+ match op {
+ Operation::Seek(res) => res,
+ _ => unreachable!(),
+ }
+ }
+
+ /// Attempts to sync all OS-internal metadata to disk.
+ ///
+ /// This function will attempt to ensure that all in-core data reaches the
+ /// filesystem before returning.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::fs::File;
+ /// use tokio::prelude::*;
+ ///
+ /// # async fn dox() -> std::io::Result<()> {
+ /// let mut file = File::create("foo.txt").await?;
+ /// file.write_all(b"hello, world!").await?;
+ /// file.sync_all().await?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub async fn sync_all(&mut self) -> io::Result<()> {
+ self.complete_inflight().await;
+
+ let std = self.std.clone();
+ asyncify(move || std.sync_all()).await
+ }
+
+ /// This function is similar to `sync_all`, except that it may not
+ /// synchronize file metadata to the filesystem.
+ ///
+ /// This is intended for use cases that must synchronize content, but don't
+ /// need the metadata on disk. The goal of this method is to reduce disk
+ /// operations.
+ ///
+ /// Note that some platforms may simply implement this in terms of `sync_all`.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::fs::File;
+ /// use tokio::prelude::*;
+ ///
+ /// # async fn dox() -> std::io::Result<()> {
+ /// let mut file = File::create("foo.txt").await?;
+ /// file.write_all(b"hello, world!").await?;
+ /// file.sync_data().await?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub async fn sync_data(&mut self) -> io::Result<()> {
+ self.complete_inflight().await;
+
+ let std = self.std.clone();
+ asyncify(move || std.sync_data()).await
+ }
+
+ /// Truncates or extends the underlying file, updating the size of this file to become size.
+ ///
+ /// If the size is less than the current file's size, then the file will be
+ /// shrunk. If it is greater than the current file's size, then the file
+ /// will be extended to size and have all of the intermediate data filled in
+ /// with 0s.
+ ///
+ /// # Errors
+ ///
+ /// This function will return an error if the file is not opened for
+ /// writing.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::fs::File;
+ /// use tokio::prelude::*;
+ ///
+ /// # async fn dox() -> std::io::Result<()> {
+ /// let mut file = File::create("foo.txt").await?;
+ /// file.write_all(b"hello, world!").await?;
+ /// file.set_len(10).await?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub async fn set_len(&mut self, size: u64) -> io::Result<()> {
+ self.complete_inflight().await;
+
+ let mut buf = match self.state {
+ Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
+ _ => unreachable!(),
+ };
+
+ let seek = if !buf.is_empty() {
+ Some(SeekFrom::Current(buf.discard_read()))
+ } else {
+ None
+ };
+
+ let std = self.std.clone();
+
+ self.state = Busy(sys::run(move || {
+ let res = if let Some(seek) = seek {
+ (&*std).seek(seek).and_then(|_| std.set_len(size))
+ } else {
+ std.set_len(size)
+ }
+ .map(|_| 0); // the value is discarded later
+
+ // Return the result as a seek
+ (Operation::Seek(res), buf)
+ }));
+
+ let (op, buf) = match self.state {
+ Idle(_) => unreachable!(),
+ Busy(ref mut rx) => rx.await,
+ };
+
+ self.state = Idle(Some(buf));
+
+ match op {
+ Operation::Seek(res) => res.map(|_| ()),
+ _ => unreachable!(),
+ }
+ }
+
+ /// Queries metadata about the underlying file.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::fs::File;
+ ///
+ /// # async fn dox() -> std::io::Result<()> {
+ /// let file = File::open("foo.txt").await?;
+ /// let metadata = file.metadata().await?;
+ ///
+ /// println!("{:?}", metadata);
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub async fn metadata(&self) -> io::Result<Metadata> {
+ let std = self.std.clone();
+ asyncify(move || std.metadata()).await
+ }
+
+ /// Create a new `File` instance that shares the same underlying file handle
+ /// as the existing `File` instance. Reads, writes, and seeks will affect both
+ /// File instances simultaneously.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::fs::File;
+ ///
+ /// # async fn dox() -> std::io::Result<()> {
+ /// let file = File::open("foo.txt").await?;
+ /// let file_clone = file.try_clone().await?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub async fn try_clone(&self) -> io::Result<File> {
+ let std = self.std.clone();
+ let std_file = asyncify(move || std.try_clone()).await?;
+ Ok(File::from_std(std_file))
+ }
+
+ /// Changes the permissions on the underlying file.
+ ///
+ /// # Platform-specific behavior
+ ///
+ /// This function currently corresponds to the `fchmod` function on Unix and
+ /// the `SetFileInformationByHandle` function on Windows. Note that, this
+ /// [may change in the future][changes].
+ ///
+ /// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior
+ ///
+ /// # Errors
+ ///
+ /// This function will return an error if the user lacks permission change
+ /// attributes on the underlying file. It may also return an error in other
+ /// os-specific unspecified cases.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::fs::File;
+ ///
+ /// # async fn dox() -> std::io::Result<()> {
+ /// let file = File::open("foo.txt").await?;
+ /// let mut perms = file.metadata().await?.permissions();
+ /// perms.set_readonly(true);
+ /// file.set_permissions(perms).await?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> {
+ let std = self.std.clone();
+ asyncify(move || std.set_permissions(perm)).await
+ }
+
+ async fn complete_inflight(&mut self) {
+ use futures_util::future::poll_fn;
+
+ if let Err(e) = poll_fn(|cx| Pin::new(&mut *self).poll_flush(cx)).await {
+ self.last_write_err = Some(e.kind());
+ }
+ }
+}
+
+impl AsyncRead for File {
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ dst: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ loop {
+ match self.state {
+ Idle(ref mut buf_cell) => {
+ let mut buf = buf_cell.take().unwrap();
+
+ if !buf.is_empty() {
+ let n = buf.copy_to(dst);
+ *buf_cell = Some(buf);
+ return Ready(Ok(n));
+ }
+
+ buf.ensure_capacity_for(dst);
+ let std = self.std.clone();
+
+ self.state = Busy(sys::run(move || {
+ let res = buf.read_from(&mut &*std);
+ (Operation::Read(res), buf)
+ }));
+ }
+ Busy(ref mut rx) => {
+ let (op, mut buf) = ready!(Pin::new(rx).poll(cx));
+
+ match op {
+ Operation::Read(Ok(_)) => {
+ let n = buf.copy_to(dst);
+ self.state = Idle(Some(buf));
+ return Ready(Ok(n));
+ }
+ Operation::Read(Err(e)) => {
+ assert!(buf.is_empty());
+
+ self.state = Idle(Some(buf));
+ return Ready(Err(e));
+ }
+ Operation::Write(Ok(_)) => {
+ assert!(buf.is_empty());
+ self.state = Idle(Some(buf));
+ continue;
+ }
+ Operation::Write(Err(e)) => {
+ assert!(self.last_write_err.is_none());
+ self.last_write_err = Some(e.kind());
+ self.state = Idle(Some(buf));
+ }
+ Operation::Seek(_) => {
+ assert!(buf.is_empty());
+ self.state = Idle(Some(buf));
+ continue;
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+impl AsyncWrite for File {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ src: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ if let Some(e) = self.last_write_err.take() {
+ return Ready(Err(e.into()));
+ }
+
+ loop {
+ match self.state {
+ Idle(ref mut buf_cell) => {
+ let mut buf = buf_cell.take().unwrap();
+
+ let seek = if !buf.is_empty() {
+ Some(SeekFrom::Current(buf.discard_read()))
+ } else {
+ None
+ };
+
+ let n = buf.copy_from(src);
+ let std = self.std.clone();
+
+ self.state = Busy(sys::run(move || {
+ let res = if let Some(seek) = seek {
+ (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
+ } else {
+ buf.write_to(&mut &*std)
+ };
+
+ (Operation::Write(res), buf)
+ }));
+
+ return Ready(Ok(n));
+ }
+ Busy(ref mut rx) => {
+ let (op, buf) = ready!(Pin::new(rx).poll(cx));
+ self.state = Idle(Some(buf));
+
+ match op {
+ Operation::Read(_) => {
+ // We don't care about the result here. The fact
+ // that the cursor has advanced will be reflected in
+ // the next iteration of the loop
+ continue;
+ }
+ Operation::Write(res) => {
+ // If the previous write was successful, continue.
+ // Otherwise, error.
+ res?;
+ continue;
+ }
+ Operation::Seek(_) => {
+ // Ignore the seek
+ continue;
+ }
+ }
+ }
+ }
+ }
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
+ if let Some(e) = self.last_write_err.take() {
+ return Ready(Err(e.into()));
+ }
+
+ let (op, buf) = match self.state {
+ Idle(_) => return Ready(Ok(())),
+ Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx)),
+ };
+
+ // The buffer is not used here
+ self.state = Idle(Some(buf));
+
+ match op {
+ Operation::Read(_) => Ready(Ok(())),
+ Operation::Write(res) => Ready(res),
+ Operation::Seek(_) => Ready(Ok(())),
+ }
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>