diff options
author | Mikail Bagishov <bagishov.mikail@yandex.ru> | 2020-09-23 09:16:05 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-23 08:16:05 +0200 |
commit | 555b74c7cdc3a288b0ae16af6e6358f3e3922ca6 (patch) | |
tree | b63ed8382760b2b95ed583460d1a4e547f0d083b /tokio/src | |
parent | 7ae5b7bd4f93612f91ab504ffb63aa8241c1d7bb (diff) |
io: fix stdout and stderr buffering on windows (#2734)
Diffstat (limited to 'tokio/src')
-rw-r--r-- | tokio/src/io/mod.rs | 2 | ||||
-rw-r--r-- | tokio/src/io/stderr.rs | 5 | ||||
-rw-r--r-- | tokio/src/io/stdio_common.rs | 131 | ||||
-rw-r--r-- | tokio/src/io/stdout.rs | 6 |
4 files changed, 139 insertions, 5 deletions
diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index c4b4d7d3..e81054bf 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -214,6 +214,8 @@ cfg_io_driver! { } cfg_io_std! { + mod stdio_common; + mod stderr; pub use stderr::{stderr, Stderr}; diff --git a/tokio/src/io/stderr.rs b/tokio/src/io/stderr.rs index 2993dd2e..2f624fba 100644 --- a/tokio/src/io/stderr.rs +++ b/tokio/src/io/stderr.rs @@ -1,4 +1,5 @@ use crate::io::blocking::Blocking; +use crate::io::stdio_common::SplitByUtf8BoundaryIfWindows; use crate::io::AsyncWrite; use std::io; @@ -35,7 +36,7 @@ cfg_io_std! { /// ``` #[derive(Debug)] pub struct Stderr { - std: Blocking<std::io::Stderr>, + std: SplitByUtf8BoundaryIfWindows<Blocking<std::io::Stderr>>, } /// Constructs a new handle to the standard error of the current process. @@ -67,7 +68,7 @@ cfg_io_std! { pub fn stderr() -> Stderr { let std = io::stderr(); Stderr { - std: Blocking::new(std), + std: SplitByUtf8BoundaryIfWindows::new(Blocking::new(std)), } } } diff --git a/tokio/src/io/stdio_common.rs b/tokio/src/io/stdio_common.rs new file mode 100644 index 00000000..66094d12 --- /dev/null +++ b/tokio/src/io/stdio_common.rs @@ -0,0 +1,131 @@ +//! Contains utilities for stdout and stderr. +use crate::io::AsyncWrite; +use std::pin::Pin; +use std::task::{Context, Poll}; +/// # Windows +/// AsyncWrite adapter that finds last char boundary in given buffer and does not write the rest. +/// That's why, wrapped writer will always receive well-formed utf-8 bytes. +/// # Other platforms +/// passes data to `inner` as is +#[derive(Debug)] +pub(crate) struct SplitByUtf8BoundaryIfWindows<W> { + inner: W, +} + +impl<W> SplitByUtf8BoundaryIfWindows<W> { + pub(crate) fn new(inner: W) -> Self { + Self { inner } + } +} + +impl<W> crate::io::AsyncWrite for SplitByUtf8BoundaryIfWindows<W> +where + W: AsyncWrite + Unpin, +{ + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<Result<usize, std::io::Error>> { + // following two ifs are enabled only on windows targets, because + // on other targets we do not have problems with incomplete utf8 chars + + // ensure buffer is not longer than MAX_BUF + #[cfg(any(target_os = "windows", test))] + let buf = if buf.len() > crate::io::blocking::MAX_BUF { + &buf[..crate::io::blocking::MAX_BUF] + } else { + buf + }; + // now remove possible trailing incomplete character + #[cfg(any(target_os = "windows", test))] + let buf = match std::str::from_utf8(buf) { + // `buf` is already utf-8, no need to trim it futher + Ok(_) => buf, + Err(err) => { + let bad_bytes = buf.len() - err.valid_up_to(); + // TODO: this is too conservative + const MAX_BYTES_PER_CHAR: usize = 8; + + if bad_bytes <= MAX_BYTES_PER_CHAR && err.valid_up_to() > 0 { + // Input data is probably UTF-8, but last char was split + // after trimming. + // let's exclude this character from the buf + &buf[..err.valid_up_to()] + } else { + // UTF-8 violation could not be caused by trimming. + // Let's pass buffer to underlying writer as is. + // Why do not we return error here? It is possible + // that stdout is not console. Such streams allow + // non-utf8 data. That's why, let's defer to underlying + // writer and let it return error if needed + buf + } + } + }; + // now pass trimmed input buffer to inner writer + Pin::new(&mut self.inner).poll_write(cx, buf) + } + + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Result<(), std::io::Error>> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Result<(), std::io::Error>> { + Pin::new(&mut self.inner).poll_shutdown(cx) + } +} + +#[cfg(test)] +mod tests { + use crate::io::AsyncWriteExt; + use std::io; + use std::pin::Pin; + use std::task::Context; + use std::task::Poll; + + const MAX_BUF: usize = 16 * 1024; + struct MockWriter; + impl crate::io::AsyncWrite for MockWriter { + fn poll_write( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<Result<usize, io::Error>> { + assert!(buf.len() <= MAX_BUF); + assert!(std::str::from_utf8(buf).is_ok()); + Poll::Ready(Ok(buf.len())) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll<Result<(), io::Error>> { + Poll::Ready(Ok(())) + } + } + #[test] + #[cfg(not(loom))] + fn test_splitter() { + let data = str::repeat("█", MAX_BUF); + let mut wr = super::SplitByUtf8BoundaryIfWindows::new(MockWriter); + let fut = async move { + wr.write_all(data.as_bytes()).await.unwrap(); + }; + crate::runtime::Builder::new() + .basic_scheduler() + .build() + .unwrap() + .block_on(fut); + } +} diff --git a/tokio/src/io/stdout.rs b/tokio/src/io/stdout.rs index 5377993a..a08ed01e 100644 --- a/tokio/src/io/stdout.rs +++ b/tokio/src/io/stdout.rs @@ -1,6 +1,6 @@ use crate::io::blocking::Blocking; +use crate::io::stdio_common::SplitByUtf8BoundaryIfWindows; use crate::io::AsyncWrite; - use std::io; use std::pin::Pin; use std::task::Context; @@ -35,7 +35,7 @@ cfg_io_std! { /// ``` #[derive(Debug)] pub struct Stdout { - std: Blocking<std::io::Stdout>, + std: SplitByUtf8BoundaryIfWindows<Blocking<std::io::Stdout>>, } /// Constructs a new handle to the standard output of the current process. @@ -67,7 +67,7 @@ cfg_io_std! { pub fn stdout() -> Stdout { let std = io::stdout(); Stdout { - std: Blocking::new(std), + std: SplitByUtf8BoundaryIfWindows::new(Blocking::new(std)), } } } |