diff options
author | Taiki Endo <te316e89@gmail.com> | 2019-11-17 15:03:39 +0900 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2019-11-16 22:03:39 -0800 |
commit | 10dc659450d83c21de9661fc084ae83b4878098b (patch) | |
tree | 11334e3781fb67f76b44a3906f0ade4ab26c49a2 /tokio/src/fs | |
parent | 320c84a433e5e54e386c17098fc6d36d15e4acff (diff) |
io: expose std{in, out, err} under io feature (#1759)
This exposes `std{in, out, err}` under io feature by moving
`fs::blocking` module into `io::blocking`.
As `fs` feature depends on `io-trait` feature, `fs` implementations can
always access `io` module.
Diffstat (limited to 'tokio/src/fs')
-rw-r--r-- | tokio/src/fs/blocking.rs | 273 | ||||
-rw-r--r-- | tokio/src/fs/file.rs | 2 | ||||
-rw-r--r-- | tokio/src/fs/mod.rs | 2 |
3 files changed, 1 insertions, 276 deletions
diff --git a/tokio/src/fs/blocking.rs b/tokio/src/fs/blocking.rs deleted file mode 100644 index 64398cbb..00000000 --- a/tokio/src/fs/blocking.rs +++ /dev/null @@ -1,273 +0,0 @@ -use crate::fs::sys; -use crate::io::{AsyncRead, AsyncWrite}; - -use std::cmp; -use std::future::Future; -use std::io; -use std::io::prelude::*; -use std::pin::Pin; -use std::task::Poll::*; -use std::task::{Context, Poll}; - -use self::State::*; - -/// `T` should not implement _both_ Read and Write. -#[derive(Debug)] -pub(crate) struct Blocking<T> { - inner: Option<T>, - state: State<T>, - /// true if the lower IO layer needs flushing - need_flush: bool, -} - -#[derive(Debug)] -pub(crate) struct Buf { - buf: Vec<u8>, - pos: usize, -} - -pub(crate) const MAX_BUF: usize = 16 * 1024; - -#[derive(Debug)] -enum State<T> { - Idle(Option<Buf>), - Busy(sys::Blocking<(io::Result<usize>, Buf, T)>), -} - -impl<T> Blocking<T> { - pub(crate) fn new(inner: T) -> Blocking<T> { - Blocking { - inner: Some(inner), - state: State::Idle(Some(Buf::with_capacity(0))), - need_flush: false, - } - } -} - -impl<T> AsyncRead for Blocking<T> -where - T: Read + Unpin + Send + 'static, -{ - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - dst: &mut [u8], - ) -> Poll<io::Result<usize>> { - loop { - match self.state { - Idle(ref mut buf_cell) => { - let mut buf = buf_cell.take().unwrap(); - - if !buf.is_empty() { - let n = buf.copy_to(dst); - *buf_cell = Some(buf); - return Ready(Ok(n)); - } - - buf.ensure_capacity_for(dst); - let mut inner = self.inner.take().unwrap(); - - self.state = Busy(sys::run(move || { - let res = buf.read_from(&mut inner); - (res, buf, inner) - })); - } - Busy(ref mut rx) => { - let (res, mut buf, inner) = ready!(Pin::new(rx).poll(cx))?; - self.inner = Some(inner); - - match res { - Ok(_) => { - let n = buf.copy_to(dst); - self.state = Idle(Some(buf)); - return Ready(Ok(n)); - } - Err(e) => { - assert!(buf.is_empty()); - - self.state = Idle(Some(buf)); - return Ready(Err(e)); - } - } - } - } - } - } -} - -impl<T> AsyncWrite for Blocking<T> -where - T: Write + Unpin + Send + 'static, -{ - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - src: &[u8], - ) -> Poll<io::Result<usize>> { - loop { - match self.state { - Idle(ref mut buf_cell) => { - let mut buf = buf_cell.take().unwrap(); - - assert!(buf.is_empty()); - - let n = buf.copy_from(src); - let mut inner = self.inner.take().unwrap(); - - self.state = Busy(sys::run(move || { - let n = buf.len(); - let res = buf.write_to(&mut inner).map(|_| n); - - (res, buf, inner) - })); - self.need_flush = true; - - return Ready(Ok(n)); - } - Busy(ref mut rx) => { - let (res, buf, inner) = ready!(Pin::new(rx).poll(cx))?; - self.state = Idle(Some(buf)); - self.inner = Some(inner); - - // If error, return - res?; - } - } - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { - loop { - let need_flush = self.need_flush; - match self.state { - // The buffer is not used here - Idle(ref mut buf_cell) => { - if need_flush { - let buf = buf_cell.take().unwrap(); - let mut inner = self.inner.take().unwrap(); - - self.state = Busy(sys::run(move || { - let res = inner.flush().map(|_| 0); - (res, buf, inner) - })); - - self.need_flush = false; - } else { - return Ready(Ok(())); - } - } - Busy(ref mut rx) => { - let (res, buf, inner) = ready!(Pin::new(rx).poll(cx))?; - self.state = Idle(Some(buf)); - self.inner = Some(inner); - - // If error, return - res?; - } - } - } - } - - fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { - Poll::Ready(Ok(())) - } -} - -/// Repeates operations that are interrupted -macro_rules! uninterruptibly { - ($e:expr) => {{ - loop { - match $e { - Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} - res => break res, - } - } - }}; -} - -impl Buf { - pub(crate) fn with_capacity(n: usize) -> Buf { - Buf { - buf: Vec::with_capacity(n), - pos: 0, - } - } - - pub(crate) fn is_empty(&self) -> bool { - self.len() == 0 - } - - pub(crate) fn len(&self) -> usize { - self.buf.len() - self.pos - } - - pub(crate) fn copy_to(&mut self, dst: &mut [u8]) -> usize { - let n = cmp::min(self.len(), dst.len()); - dst[..n].copy_from_slice(&self.bytes()[..n]); - self.pos += n; - - if self.pos == self.buf.len() { - self.buf.truncate(0); - self.pos = 0; - } - - n - } - - pub(crate) fn copy_from(&mut self, src: &[u8]) -> usize { - assert!(self.is_empty()); - - let n = cmp::min(src.len(), MAX_BUF); - - self.buf.extend_from_slice(&src[..n]); - n - } - - pub(crate) fn bytes(&self) -> &[u8] { - &self.buf[self.pos..] - } - - pub(crate) fn ensure_capacity_for(&mut self, bytes: &[u8]) { - assert!(self.is_empty()); - - let len = cmp::min(bytes.len(), MAX_BUF); - - if self.buf.len() < len { - self.buf.reserve(len - self.buf.len()); - } - - unsafe { - self.buf.set_len(len); - } - } - - pub(crate) fn read_from<T: Read>(&mut self, rd: &mut T) -> io::Result<usize> { - let res = uninterruptibly!(rd.read(&mut self.buf)); - - if let Ok(n) = res { - self.buf.truncate(n); - } else { - self.buf.clear(); - } - - assert_eq!(self.pos, 0); - - res - } - - pub(crate) fn write_to<T: Write>(&mut self, wr: &mut T) -> io::Result<()> { - assert_eq!(self.pos, 0); - - // `write_all` already ignores interrupts - let res = wr.write_all(&self.buf); - self.buf.clear(); - res - } - - pub(crate) fn discard_read(&mut self) -> i64 { - let ret = -(self.bytes().len() as i64); - self.pos = 0; - self.buf.truncate(0); - ret - } -} diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index 0ff45025..af7be586 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -3,8 +3,8 @@ //! [`File`]: file/struct.File.html use self::State::*; -use crate::fs::blocking::Buf; use crate::fs::{asyncify, sys}; +use crate::io::blocking::Buf; use crate::io::{AsyncRead, AsyncWrite}; use std::fmt; diff --git a/tokio/src/fs/mod.rs b/tokio/src/fs/mod.rs index 9108116a..93724280 100644 --- a/tokio/src/fs/mod.rs +++ b/tokio/src/fs/mod.rs @@ -22,8 +22,6 @@ //! //! [`AsyncRead`]: https://docs.rs/tokio-io/0.1/tokio_io/trait.AsyncRead.html -pub(crate) mod blocking; - mod create_dir; pub use self::create_dir::create_dir; |