summaryrefslogtreecommitdiffstats
path: root/tokio/src
diff options
context:
space:
mode:
authorMikail Bagishov <bagishov.mikail@yandex.ru>2020-09-23 09:16:05 +0300
committerGitHub <noreply@github.com>2020-09-23 08:16:05 +0200
commit555b74c7cdc3a288b0ae16af6e6358f3e3922ca6 (patch)
treeb63ed8382760b2b95ed583460d1a4e547f0d083b /tokio/src
parent7ae5b7bd4f93612f91ab504ffb63aa8241c1d7bb (diff)
io: fix stdout and stderr buffering on windows (#2734)
Diffstat (limited to 'tokio/src')
-rw-r--r--tokio/src/io/mod.rs2
-rw-r--r--tokio/src/io/stderr.rs5
-rw-r--r--tokio/src/io/stdio_common.rs131
-rw-r--r--tokio/src/io/stdout.rs6
4 files changed, 139 insertions, 5 deletions
diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs
index c4b4d7d3..e81054bf 100644
--- a/tokio/src/io/mod.rs
+++ b/tokio/src/io/mod.rs
@@ -214,6 +214,8 @@ cfg_io_driver! {
}
cfg_io_std! {
+ mod stdio_common;
+
mod stderr;
pub use stderr::{stderr, Stderr};
diff --git a/tokio/src/io/stderr.rs b/tokio/src/io/stderr.rs
index 2993dd2e..2f624fba 100644
--- a/tokio/src/io/stderr.rs
+++ b/tokio/src/io/stderr.rs
@@ -1,4 +1,5 @@
use crate::io::blocking::Blocking;
+use crate::io::stdio_common::SplitByUtf8BoundaryIfWindows;
use crate::io::AsyncWrite;
use std::io;
@@ -35,7 +36,7 @@ cfg_io_std! {
/// ```
#[derive(Debug)]
pub struct Stderr {
- std: Blocking<std::io::Stderr>,
+ std: SplitByUtf8BoundaryIfWindows<Blocking<std::io::Stderr>>,
}
/// Constructs a new handle to the standard error of the current process.
@@ -67,7 +68,7 @@ cfg_io_std! {
pub fn stderr() -> Stderr {
let std = io::stderr();
Stderr {
- std: Blocking::new(std),
+ std: SplitByUtf8BoundaryIfWindows::new(Blocking::new(std)),
}
}
}
diff --git a/tokio/src/io/stdio_common.rs b/tokio/src/io/stdio_common.rs
new file mode 100644
index 00000000..66094d12
--- /dev/null
+++ b/tokio/src/io/stdio_common.rs
@@ -0,0 +1,131 @@
+//! Contains utilities for stdout and stderr.
+use crate::io::AsyncWrite;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+/// # Windows
+/// AsyncWrite adapter that finds last char boundary in given buffer and does not write the rest.
+/// That's why, wrapped writer will always receive well-formed utf-8 bytes.
+/// # Other platforms
+/// passes data to `inner` as is
+#[derive(Debug)]
+pub(crate) struct SplitByUtf8BoundaryIfWindows<W> {
+ inner: W,
+}
+
+impl<W> SplitByUtf8BoundaryIfWindows<W> {
+ pub(crate) fn new(inner: W) -> Self {
+ Self { inner }
+ }
+}
+
+impl<W> crate::io::AsyncWrite for SplitByUtf8BoundaryIfWindows<W>
+where
+ W: AsyncWrite + Unpin,
+{
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<Result<usize, std::io::Error>> {
+ // following two ifs are enabled only on windows targets, because
+ // on other targets we do not have problems with incomplete utf8 chars
+
+ // ensure buffer is not longer than MAX_BUF
+ #[cfg(any(target_os = "windows", test))]
+ let buf = if buf.len() > crate::io::blocking::MAX_BUF {
+ &buf[..crate::io::blocking::MAX_BUF]
+ } else {
+ buf
+ };
+ // now remove possible trailing incomplete character
+ #[cfg(any(target_os = "windows", test))]
+ let buf = match std::str::from_utf8(buf) {
+ // `buf` is already utf-8, no need to trim it futher
+ Ok(_) => buf,
+ Err(err) => {
+ let bad_bytes = buf.len() - err.valid_up_to();
+ // TODO: this is too conservative
+ const MAX_BYTES_PER_CHAR: usize = 8;
+
+ if bad_bytes <= MAX_BYTES_PER_CHAR && err.valid_up_to() > 0 {
+ // Input data is probably UTF-8, but last char was split
+ // after trimming.
+ // let's exclude this character from the buf
+ &buf[..err.valid_up_to()]
+ } else {
+ // UTF-8 violation could not be caused by trimming.
+ // Let's pass buffer to underlying writer as is.
+ // Why do not we return error here? It is possible
+ // that stdout is not console. Such streams allow
+ // non-utf8 data. That's why, let's defer to underlying
+ // writer and let it return error if needed
+ buf
+ }
+ }
+ };
+ // now pass trimmed input buffer to inner writer
+ Pin::new(&mut self.inner).poll_write(cx, buf)
+ }
+
+ fn poll_flush(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), std::io::Error>> {
+ Pin::new(&mut self.inner).poll_flush(cx)
+ }
+
+ fn poll_shutdown(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), std::io::Error>> {
+ Pin::new(&mut self.inner).poll_shutdown(cx)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::io::AsyncWriteExt;
+ use std::io;
+ use std::pin::Pin;
+ use std::task::Context;
+ use std::task::Poll;
+
+ const MAX_BUF: usize = 16 * 1024;
+ struct MockWriter;
+ impl crate::io::AsyncWrite for MockWriter {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<Result<usize, io::Error>> {
+ assert!(buf.len() <= MAX_BUF);
+ assert!(std::str::from_utf8(buf).is_ok());
+ Poll::Ready(Ok(buf.len()))
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_shutdown(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ ) -> Poll<Result<(), io::Error>> {
+ Poll::Ready(Ok(()))
+ }
+ }
+ #[test]
+ #[cfg(not(loom))]
+ fn test_splitter() {
+ let data = str::repeat("█", MAX_BUF);
+ let mut wr = super::SplitByUtf8BoundaryIfWindows::new(MockWriter);
+ let fut = async move {
+ wr.write_all(data.as_bytes()).await.unwrap();
+ };
+ crate::runtime::Builder::new()
+ .basic_scheduler()
+ .build()
+ .unwrap()
+ .block_on(fut);
+ }
+}
diff --git a/tokio/src/io/stdout.rs b/tokio/src/io/stdout.rs
index 5377993a..a08ed01e 100644
--- a/tokio/src/io/stdout.rs
+++ b/tokio/src/io/stdout.rs
@@ -1,6 +1,6 @@
use crate::io::blocking::Blocking;
+use crate::io::stdio_common::SplitByUtf8BoundaryIfWindows;
use crate::io::AsyncWrite;
-
use std::io;
use std::pin::Pin;
use std::task::Context;
@@ -35,7 +35,7 @@ cfg_io_std! {
/// ```
#[derive(Debug)]
pub struct Stdout {
- std: Blocking<std::io::Stdout>,
+ std: SplitByUtf8BoundaryIfWindows<Blocking<std::io::Stdout>>,
}
/// Constructs a new handle to the standard output of the current process.
@@ -67,7 +67,7 @@ cfg_io_std! {
pub fn stdout() -> Stdout {
let std = io::stdout();
Stdout {
- std: Blocking::new(std),
+ std: SplitByUtf8BoundaryIfWindows::new(Blocking::new(std)),
}
}
}