//! Types for working with [`File`]. //! //! [`File`]: file/struct.File.html use self::State::*; use crate::fs::blocking::Buf; use crate::fs::{asyncify, sys}; use crate::io::{AsyncRead, AsyncWrite}; use futures_core::ready; use std::fmt; use std::fs::{Metadata, Permissions}; use std::future::Future; use std::io::{self, Seek, SeekFrom}; use std::path::Path; use std::pin::Pin; use std::sync::Arc; use std::task::Context; use std::task::Poll; use std::task::Poll::*; /// A reference to an open file on the filesystem. /// /// This is a specialized version of [`std::fs::File`][std] for usage from the /// Tokio runtime. /// /// An instance of a `File` can be read and/or written depending on what options /// it was opened with. Files also implement Seek to alter the logical cursor /// that the file contains internally. /// /// Files are automatically closed when they go out of scope. /// /// [std]: https://doc.rust-lang.org/std/fs/struct.File.html /// /// # Examples /// /// Create a new file and asynchronously write bytes to it: /// /// ```no_run /// use tokio::fs::File; /// use tokio::prelude::*; /// /// # async fn dox() -> std::io::Result<()> { /// let mut file = File::create("foo.txt").await?; /// file.write_all(b"hello, world!").await?; /// # Ok(()) /// # } /// ``` /// /// Read the contents of a file into a buffer /// /// ```no_run /// use tokio::fs::File; /// use tokio::prelude::*; /// /// # async fn dox() -> std::io::Result<()> { /// let mut file = File::open("foo.txt").await?; /// /// let mut contents = vec![]; /// file.read_to_end(&mut contents).await?; /// /// println!("len = {}", contents.len()); /// # Ok(()) /// # } /// ``` pub struct File { std: Arc, state: State, /// Errors from writes/flushes are returned in write/flush calls. If a write /// error is observed while performing a read, it is saved until the next /// write / flush call. last_write_err: Option, } #[derive(Debug)] enum State { Idle(Option), Busy(sys::Blocking<(Operation, Buf)>), } #[derive(Debug)] enum Operation { Read(io::Result), Write(io::Result<()>), Seek(io::Result), } impl File { /// Attempts to open a file in read-only mode. /// /// See [`OpenOptions`] for more details. /// /// [`OpenOptions`]: struct.OpenOptions.html /// /// # Errors /// /// This function will return an error if called from outside of the Tokio /// runtime or if path does not already exist. Other errors may also be /// returned according to OpenOptions::open. /// /// # Examples /// /// ```no_run /// use tokio::fs::File; /// use tokio::prelude::*; /// /// # async fn dox() -> std::io::Result<()> { /// let mut file = File::open("foo.txt").await?; /// /// let mut contents = vec![]; /// file.read_to_end(&mut contents).await?; /// /// println!("len = {}", contents.len()); /// # Ok(()) /// # } /// ``` pub async fn open(path: impl AsRef) -> io::Result { let path = path.as_ref().to_owned(); let std = asyncify(|| sys::File::open(path)).await?; Ok(File::from_std(std)) } /// Opens a file in write-only mode. /// /// This function will create a file if it does not exist, and will truncate /// it if it does. /// /// See [`OpenOptions`] for more details. /// /// [`OpenOptions`]: struct.OpenOptions.html /// /// # Errors /// /// Results in an error if called from outside of the Tokio runtime or if /// the underlying [`create`] call results in an error. /// /// [`create`]: https://doc.rust-lang.org/std/fs/struct.File.html#method.create /// /// # Examples /// /// ```no_run /// use tokio::fs::File; /// use tokio::prelude::*; /// /// # async fn dox() -> std::io::Result<()> { /// let mut file = File::create("foo.txt").await?; /// file.write_all(b"hello, world!").await?; /// # Ok(()) /// # } /// ``` pub async fn create(path: impl AsRef) -> io::Result { let path = path.as_ref().to_owned(); let std_file = asyncify(move || sys::File::create(path)).await?; Ok(File::from_std(std_file)) } /// Convert a [`std::fs::File`][std] to a [`tokio_fs::File`][file]. /// /// [std]: https://doc.rust-lang.org/std/fs/struct.File.html /// [file]: struct.File.html /// /// # Examples /// /// ```no_run /// // This line could block. It is not recommended to do this on the Tokio /// // runtime. /// let std_file = std::fs::File::open("foo.txt").unwrap(); /// let file = tokio::fs::File::from_std(std_file); /// ``` pub fn from_std(std: sys::File) -> File { File { std: Arc::new(std), state: State::Idle(Some(Buf::with_capacity(0))), last_write_err: None, } } /// Seek to an offset, in bytes, in a stream. /// /// # Examples /// /// ```no_run /// use tokio::fs::File; /// use tokio::prelude::*; /// /// use std::io::SeekFrom; /// /// # async fn dox() -> std::io::Result<()> { /// let mut file = File::open("foo.txt").await?; /// file.seek(SeekFrom::Start(6)).await?; /// /// let mut contents = vec![0u8; 10]; /// file.read_exact(&mut contents).await?; /// # Ok(()) /// # } /// ``` pub async fn seek(&mut self, mut pos: SeekFrom) -> io::Result { self.complete_inflight().await; let mut buf = match self.state { Idle(ref mut buf_cell) => buf_cell.take().unwrap(), _ => unreachable!(), }; // Factor in any unread data from the buf if !buf.is_empty() { let n = buf.discard_read(); if let SeekFrom::Current(ref mut offset) = pos { *offset += n; } } let std = self.std.clone(); // Start the operation self.state = Busy(sys::run(move || { let res = (&*std).seek(pos); (Operation::Seek(res), buf) })); let (op, buf) = match self.state { Idle(_) => unreachable!(), Busy(ref mut rx) => rx.await.unwrap(), }; self.state = Idle(Some(buf)); match op { Operation::Seek(res) => res, _ => unreachable!(), } } /// Attempts to sync all OS-internal metadata to disk. /// /// This function will attempt to ensure that all in-core data reaches the /// filesystem before returning. /// /// # Examples /// /// ```no_run /// use tokio::fs::File; /// use tokio::prelude::*; /// /// # async fn dox() -> std::io::Result<()> { /// let mut file = File::create("foo.txt").await?; /// file.write_all(b"hello, world!").await?; /// file.sync_all().await?; /// # Ok(()) /// # } /// ``` pub async fn sync_all(&mut self) -> io::Result<()> { self.complete_inflight().await; let std = self.std.clone(); asyncify(move || std.sync_all()).await } /// This function is similar to `sync_all`, except that it may not /// synchronize file metadata to the filesystem. /// /// This is intended for use cases that must synchronize content, but don't /// need the metadata on disk. The goal of this method is to reduce disk /// operations. /// /// Note that some platforms may simply implement this in terms of `sync_all`. /// /// # Examples /// /// ```no_run /// use tokio::fs::File; /// use tokio::prelude::*; /// /// # async fn dox() -> std::io::Result<()> { /// let mut file = File::create("foo.txt").await?; /// file.write_all(b"hello, world!").await?; /// file.sync_data().await?; /// # Ok(()) /// # } /// ``` pub async fn sync_data(&mut self) -> io::Result<()> { self.complete_inflight().await; let std = self.std.clone(); asyncify(move || std.sync_data()).await } /// Truncates or extends the underlying file, updating the size of this file to become size. /// /// If the size is less than the current file's size, then the file will be /// shrunk. If it is greater than the current file's size, then the file /// will be extended to size and have all of the intermediate data filled in /// with 0s. /// /// # Errors /// /// This function will return an error if the file is not opened for /// writing. /// /// # Examples /// /// ```no_run /// use tokio::fs::File; /// use tokio::prelude::*; /// /// # async fn dox() -> std::io::Result<()> { /// let mut file = File::create("foo.txt").await?; /// file.write_all(b"hello, world!").await?; /// file.set_len(10).await?; /// # Ok(()) /// # } /// ``` pub async fn set_len(&mut self, size: u64) -> io::Result<()> { self.complete_inflight().await; let mut buf = match self.state { Idle(ref mut buf_cell) => buf_cell.take().unwrap(), _ => unreachable!(), }; let seek = if !buf.is_empty() { Some(SeekFrom::Current(buf.discard_read())) } else { None }; let std = self.std.clone(); self.state = Busy(sys::run(move || { let res = if let Some(seek) = seek { (&*std).seek(seek).and_then(|_| std.set_len(size)) } else { std.set_len(size) } .map(|_| 0); // the value is discarded later // Return the result as a seek (Operation::Seek(res), buf) })); let (op, buf) = match self.state { Idle(_) => unreachable!(), Busy(ref mut rx) => rx.await?, }; self.state = Idle(Some(buf)); match op { Operation::Seek(res) => res.map(|_| ()), _ => unreachable!(), } } /// Queries metadata about the underlying file. /// /// # Examples /// /// ```no_run /// use tokio::fs::File; /// /// # async fn dox() -> std::io::Result<()> { /// let file = File::open("foo.txt").await?; /// let metadata = file.metadata().await?; /// /// println!("{:?}", metadata); /// # Ok(()) /// # } /// ``` pub async fn metadata(&self) -> io::Result { let std = self.std.clone(); asyncify(move || std.metadata()).await } /// Create a new `File` instance that shares the same underlying file handle /// as the existing `File` instance. Reads, writes, and seeks will affect both /// File instances simultaneously. /// /// # Examples /// /// ```no_run /// use tokio::fs::File; /// /// # async fn dox() -> std::io::Result<()> { /// let file = File::open("foo.txt").await?; /// let file_clone = file.try_clone().await?; /// # Ok(()) /// # } /// ``` pub async fn try_clone(&self) -> io::Result { let std = self.std.clone(); let std_file = asyncify(move || std.try_clone()).await?; Ok(File::from_std(std_file)) } /// Changes the permissions on the underlying file. /// /// # Platform-specific behavior /// /// This function currently corresponds to the `fchmod` function on Unix and /// the `SetFileInformationByHandle` function on Windows. Note that, this /// [may change in the future][changes]. /// /// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior /// /// # Errors /// /// This function will return an error if the user lacks permission change /// attributes on the underlying file. It may also return an error in other /// os-specific unspecified cases. /// /// # Examples /// /// ```no_run /// use tokio::fs::File; /// /// # async fn dox() -> std::io::Result<()> { /// let file = File::open("foo.txt").await?; /// let mut perms = file.metadata().await?.permissions(); /// perms.set_readonly(true); /// file.set_permissions(perms).await?; /// # Ok(()) /// # } /// ``` pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> { let std = self.std.clone(); asyncify(move || std.set_permissions(perm)).await } async fn complete_inflight(&mut self) { use futures_util::future::poll_fn; if let Err(e) = poll_fn(|cx| Pin::new(&mut *self).poll_flush(cx)).await { self.last_write_err = Some(e.kind()); } } } impl AsyncRead for File { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, dst: &mut [u8], ) -> Poll> { loop { match self.state { Idle(ref mut buf_cell) => { let mut buf = buf_cell.take().unwrap(); if !buf.is_empty() { let n = buf.copy_to(dst); *buf_cell = Some(buf); return Ready(Ok(n)); } buf.ensure_capacity_for(dst); let std = self.std.clone(); self.state = Busy(sys::run(move || { let res = buf.read_from(&mut &*std); (Operation::Read(res), buf) })); } Busy(ref mut rx) => { let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?; match op { Operation::Read(Ok(_)) => { let n = buf.copy_to(dst); self.state = Idle(Some(buf)); return Ready(Ok(n)); } Operation::Read(Err(e)) => { assert!(buf.is_empty()); self.state = Idle(Some(buf)); return Ready(Err(e)); } Operation::Write(Ok(_)) => { assert!(buf.is_empty()); self.state = Idle(Some(buf)); continue; } Operation::Write(Err(e)) => { assert!(self.last_write_err.is_none()); self.last_write_err = Some(e.kind()); self.state = Idle(Some(buf)); } Operation::Seek(_) => { assert!(buf.is_empty()); self.state = Idle(Some(buf)); continue; } } } } } } } impl AsyncWrite for File { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, src: &[u8], ) -> Poll> { if let Some(e) = self.last_write_err.take() { return Ready(Err(e.into())); } loop { match self.state { Idle(ref mut buf_cell) => { let mut buf = buf_cell.take().unwrap(); let seek = if !buf.is_empty() { Some(SeekFrom::Current(buf.discard_read())) } else { None }; let n = buf.copy_from(src); let std = self.std.clone(); self.state = Busy(sys::run(move || { let res = if let Some(seek) = seek { (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std)) } else { buf.write_to(&mut &*std) }; (Operation::Write(res), buf) })); return Ready(Ok(n)); } Busy(ref mut rx) => { let (op, buf) = ready!(Pin::new(rx).poll(cx))?; self.state = Idle(Some(buf)); match op { Operation::Read(_) => { // We don't care about the result here. The fact // that the cursor has advanced will be reflected in // the next iteration of the loop continue; } Operation::Write(res) => { // If the previous write was successful, continue. // Otherwise, error. res?; continue; } Operation::Seek(_) => { // Ignore the seek continue; } } } } } } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if let Some(e) = self.last_write_err.take() { return Ready(Err(e.into())); } let (op, buf) = match self.state { Idle(_) => return Ready(Ok(())), Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?, }; // The buffer is not used here self.state = Idle(Some(buf)); match op { Operation::Read(_) => Ready(Ok(())), Operation::Write(res) => Ready(res), Operation::Seek(_) => Ready(Ok(())), } } fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } } impl From for File { fn from(std: sys::File) -> Self { Self::from_std(std) } } impl fmt::Debug for File { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("tokio::fs::File") .field("std", &self.std) .finish() } }