summaryrefslogtreecommitdiffstats
path: root/tokio/src/fs
diff options
context:
space:
mode:
authorTaiki Endo <te316e89@gmail.com>2019-11-17 15:03:39 +0900
committerCarl Lerche <me@carllerche.com>2019-11-16 22:03:39 -0800
commit10dc659450d83c21de9661fc084ae83b4878098b (patch)
tree11334e3781fb67f76b44a3906f0ade4ab26c49a2 /tokio/src/fs
parent320c84a433e5e54e386c17098fc6d36d15e4acff (diff)
io: expose std{in, out, err} under io feature (#1759)
This exposes `std{in, out, err}` under io feature by moving `fs::blocking` module into `io::blocking`. As `fs` feature depends on `io-trait` feature, `fs` implementations can always access `io` module.
Diffstat (limited to 'tokio/src/fs')
-rw-r--r--tokio/src/fs/blocking.rs273
-rw-r--r--tokio/src/fs/file.rs2
-rw-r--r--tokio/src/fs/mod.rs2
3 files changed, 1 insertions, 276 deletions
diff --git a/tokio/src/fs/blocking.rs b/tokio/src/fs/blocking.rs
deleted file mode 100644
index 64398cbb..00000000
--- a/tokio/src/fs/blocking.rs
+++ /dev/null
@@ -1,273 +0,0 @@
-use crate::fs::sys;
-use crate::io::{AsyncRead, AsyncWrite};
-
-use std::cmp;
-use std::future::Future;
-use std::io;
-use std::io::prelude::*;
-use std::pin::Pin;
-use std::task::Poll::*;
-use std::task::{Context, Poll};
-
-use self::State::*;
-
-/// `T` should not implement _both_ Read and Write.
-#[derive(Debug)]
-pub(crate) struct Blocking<T> {
- inner: Option<T>,
- state: State<T>,
- /// true if the lower IO layer needs flushing
- need_flush: bool,
-}
-
-#[derive(Debug)]
-pub(crate) struct Buf {
- buf: Vec<u8>,
- pos: usize,
-}
-
-pub(crate) const MAX_BUF: usize = 16 * 1024;
-
-#[derive(Debug)]
-enum State<T> {
- Idle(Option<Buf>),
- Busy(sys::Blocking<(io::Result<usize>, Buf, T)>),
-}
-
-impl<T> Blocking<T> {
- pub(crate) fn new(inner: T) -> Blocking<T> {
- Blocking {
- inner: Some(inner),
- state: State::Idle(Some(Buf::with_capacity(0))),
- need_flush: false,
- }
- }
-}
-
-impl<T> AsyncRead for Blocking<T>
-where
- T: Read + Unpin + Send + 'static,
-{
- fn poll_read(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- dst: &mut [u8],
- ) -> Poll<io::Result<usize>> {
- loop {
- match self.state {
- Idle(ref mut buf_cell) => {
- let mut buf = buf_cell.take().unwrap();
-
- if !buf.is_empty() {
- let n = buf.copy_to(dst);
- *buf_cell = Some(buf);
- return Ready(Ok(n));
- }
-
- buf.ensure_capacity_for(dst);
- let mut inner = self.inner.take().unwrap();
-
- self.state = Busy(sys::run(move || {
- let res = buf.read_from(&mut inner);
- (res, buf, inner)
- }));
- }
- Busy(ref mut rx) => {
- let (res, mut buf, inner) = ready!(Pin::new(rx).poll(cx))?;
- self.inner = Some(inner);
-
- match res {
- Ok(_) => {
- let n = buf.copy_to(dst);
- self.state = Idle(Some(buf));
- return Ready(Ok(n));
- }
- Err(e) => {
- assert!(buf.is_empty());
-
- self.state = Idle(Some(buf));
- return Ready(Err(e));
- }
- }
- }
- }
- }
- }
-}
-
-impl<T> AsyncWrite for Blocking<T>
-where
- T: Write + Unpin + Send + 'static,
-{
- fn poll_write(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- src: &[u8],
- ) -> Poll<io::Result<usize>> {
- loop {
- match self.state {
- Idle(ref mut buf_cell) => {
- let mut buf = buf_cell.take().unwrap();
-
- assert!(buf.is_empty());
-
- let n = buf.copy_from(src);
- let mut inner = self.inner.take().unwrap();
-
- self.state = Busy(sys::run(move || {
- let n = buf.len();
- let res = buf.write_to(&mut inner).map(|_| n);
-
- (res, buf, inner)
- }));
- self.need_flush = true;
-
- return Ready(Ok(n));
- }
- Busy(ref mut rx) => {
- let (res, buf, inner) = ready!(Pin::new(rx).poll(cx))?;
- self.state = Idle(Some(buf));
- self.inner = Some(inner);
-
- // If error, return
- res?;
- }
- }
- }
- }
-
- fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
- loop {
- let need_flush = self.need_flush;
- match self.state {
- // The buffer is not used here
- Idle(ref mut buf_cell) => {
- if need_flush {
- let buf = buf_cell.take().unwrap();
- let mut inner = self.inner.take().unwrap();
-
- self.state = Busy(sys::run(move || {
- let res = inner.flush().map(|_| 0);
- (res, buf, inner)
- }));
-
- self.need_flush = false;
- } else {
- return Ready(Ok(()));
- }
- }
- Busy(ref mut rx) => {
- let (res, buf, inner) = ready!(Pin::new(rx).poll(cx))?;
- self.state = Idle(Some(buf));
- self.inner = Some(inner);
-
- // If error, return
- res?;
- }
- }
- }
- }
-
- fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
- Poll::Ready(Ok(()))
- }
-}
-
-/// Repeates operations that are interrupted
-macro_rules! uninterruptibly {
- ($e:expr) => {{
- loop {
- match $e {
- Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
- res => break res,
- }
- }
- }};
-}
-
-impl Buf {
- pub(crate) fn with_capacity(n: usize) -> Buf {
- Buf {
- buf: Vec::with_capacity(n),
- pos: 0,
- }
- }
-
- pub(crate) fn is_empty(&self) -> bool {
- self.len() == 0
- }
-
- pub(crate) fn len(&self) -> usize {
- self.buf.len() - self.pos
- }
-
- pub(crate) fn copy_to(&mut self, dst: &mut [u8]) -> usize {
- let n = cmp::min(self.len(), dst.len());
- dst[..n].copy_from_slice(&self.bytes()[..n]);
- self.pos += n;
-
- if self.pos == self.buf.len() {
- self.buf.truncate(0);
- self.pos = 0;
- }
-
- n
- }
-
- pub(crate) fn copy_from(&mut self, src: &[u8]) -> usize {
- assert!(self.is_empty());
-
- let n = cmp::min(src.len(), MAX_BUF);
-
- self.buf.extend_from_slice(&src[..n]);
- n
- }
-
- pub(crate) fn bytes(&self) -> &[u8] {
- &self.buf[self.pos..]
- }
-
- pub(crate) fn ensure_capacity_for(&mut self, bytes: &[u8]) {
- assert!(self.is_empty());
-
- let len = cmp::min(bytes.len(), MAX_BUF);
-
- if self.buf.len() < len {
- self.buf.reserve(len - self.buf.len());
- }
-
- unsafe {
- self.buf.set_len(len);
- }
- }
-
- pub(crate) fn read_from<T: Read>(&mut self, rd: &mut T) -> io::Result<usize> {
- let res = uninterruptibly!(rd.read(&mut self.buf));
-
- if let Ok(n) = res {
- self.buf.truncate(n);
- } else {
- self.buf.clear();
- }
-
- assert_eq!(self.pos, 0);
-
- res
- }
-
- pub(crate) fn write_to<T: Write>(&mut self, wr: &mut T) -> io::Result<()> {
- assert_eq!(self.pos, 0);
-
- // `write_all` already ignores interrupts
- let res = wr.write_all(&self.buf);
- self.buf.clear();
- res
- }
-
- pub(crate) fn discard_read(&mut self) -> i64 {
- let ret = -(self.bytes().len() as i64);
- self.pos = 0;
- self.buf.truncate(0);
- ret
- }
-}
diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs
index 0ff45025..af7be586 100644
--- a/tokio/src/fs/file.rs
+++ b/tokio/src/fs/file.rs
@@ -3,8 +3,8 @@
//! [`File`]: file/struct.File.html
use self::State::*;
-use crate::fs::blocking::Buf;
use crate::fs::{asyncify, sys};
+use crate::io::blocking::Buf;
use crate::io::{AsyncRead, AsyncWrite};
use std::fmt;
diff --git a/tokio/src/fs/mod.rs b/tokio/src/fs/mod.rs
index 9108116a..93724280 100644
--- a/tokio/src/fs/mod.rs
+++ b/tokio/src/fs/mod.rs
@@ -22,8 +22,6 @@
//!
//! [`AsyncRead`]: https://docs.rs/tokio-io/0.1/tokio_io/trait.AsyncRead.html
-pub(crate) mod blocking;
-
mod create_dir;
pub use self::create_dir::create_dir;