summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkxt <ktamas@fastmail.fm>2021-05-24 15:00:49 +0200
committerGitHub <noreply@github.com>2021-05-24 15:00:49 +0200
commit2168793dc75380a485faa4d470489d3d8baed0dc (patch)
tree3360d4aae1b08e5576ab51adef7cd58138b92670
parent193331062655c4a0b79cf9433157ec8526c4b29f (diff)
fix(pty): use async io to avoid polling (#523)
This patch fixes #509 by using async read instead of polling a non-blocking fd. This reduces CPU usage when the ptys are idle.
-rw-r--r--Cargo.lock12
-rw-r--r--src/tests/fakes.rs36
-rw-r--r--zellij-server/Cargo.toml1
-rw-r--r--zellij-server/src/os_input_output.rs44
-rw-r--r--zellij-server/src/pty.rs160
5 files changed, 158 insertions, 95 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 48aa2bf2c..0dda83734 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -171,6 +171,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0"
[[package]]
+name = "async-trait"
+version = "0.1.50"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0b98e84bbb4cbcdd97da190ba0c58a1bb0de2c1fdf67d159e192ed766aeca722"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
name = "atomic-waker"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2308,6 +2319,7 @@ name = "zellij-server"
version = "0.12.0"
dependencies = [
"ansi_term 0.12.1",
+ "async-trait",
"daemonize",
"insta",
"serde_json",
diff --git a/src/tests/fakes.rs b/src/tests/fakes.rs
index e9042ed73..a2f338ba6 100644
--- a/src/tests/fakes.rs
+++ b/src/tests/fakes.rs
@@ -10,9 +10,10 @@ use zellij_utils::{nix, zellij_tile};
use crate::tests::possible_tty_inputs::{get_possible_tty_inputs, Bytes};
use crate::tests::utils::commands::{QUIT, SLEEP};
use zellij_client::os_input_output::ClientOsApi;
-use zellij_server::os_input_output::{Pid, ServerOsApi};
+use zellij_server::os_input_output::{async_trait, AsyncReader, Pid, ServerOsApi};
use zellij_tile::data::Palette;
use zellij_utils::{
+ async_std,
channels::{ChannelWithContext, SenderType, SenderWithContext},
errors::ErrorContext,
interprocess::local_socket::LocalSocketStream,
@@ -227,6 +228,33 @@ impl ClientOsApi for FakeInputOutput {
}
}
+struct FakeAsyncReader {
+ fd: RawFd,
+ os_api: Box<dyn ServerOsApi>,
+}
+
+#[async_trait]
+impl AsyncReader for FakeAsyncReader {
+ async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
+ // simulates async semantics: EAGAIN is not propagated to caller
+ loop {
+ let res = self.os_api.read_from_tty_stdout(self.fd, buf);
+ match res {
+ Err(nix::Error::Sys(nix::errno::Errno::EAGAIN)) => {
+ async_std::task::sleep(Duration::from_millis(10)).await;
+ continue;
+ }
+ Err(e) => {
+ break Err(std::io::Error::from_raw_os_error(
+ e.as_errno().unwrap() as i32
+ ))
+ }
+ Ok(n_bytes) => break Ok(n_bytes),
+ }
+ }
+ }
+}
+
impl ServerOsApi for FakeInputOutput {
fn set_terminal_size_using_fd(&self, pid: RawFd, cols: u16, rows: u16) {
let terminal_input = self
@@ -277,6 +305,12 @@ impl ServerOsApi for FakeInputOutput {
None => Err(nix::Error::Sys(nix::errno::Errno::EAGAIN)),
}
}
+ fn async_file_reader(&self, fd: RawFd) -> Box<dyn AsyncReader> {
+ Box::new(FakeAsyncReader {
+ fd,
+ os_api: ServerOsApi::box_clone(self),
+ })
+ }
fn tcdrain(&self, pid: RawFd) -> Result<(), nix::Error> {
self.io_events.lock().unwrap().push(IoEvent::TcDrain(pid));
Ok(())
diff --git a/zellij-server/Cargo.toml b/zellij-server/Cargo.toml
index 532ef9317..388a3e5b9 100644
--- a/zellij-server/Cargo.toml
+++ b/zellij-server/Cargo.toml
@@ -10,6 +10,7 @@ license = "MIT"
[dependencies]
ansi_term = "0.12.1"
+async-trait = "0.1.50"
daemonize = "0.4.1"
serde_json = "1.0"
unicode-width = "0.1.8"
diff --git a/zellij-server/src/os_input_output.rs b/zellij-server/src/os_input_output.rs
index be51878ce..2ac35b69b 100644
--- a/zellij-server/src/os_input_output.rs
+++ b/zellij-server/src/os_input_output.rs
@@ -4,10 +4,11 @@ use std::path::PathBuf;
use std::process::{Child, Command};
use std::sync::{Arc, Mutex};
-use zellij_utils::{interprocess, libc, nix, signal_hook, zellij_tile};
+use zellij_utils::{async_std, interprocess, libc, nix, signal_hook, zellij_tile};
+use async_std::fs::File as AsyncFile;
+use async_std::os::unix::io::FromRawFd;
use interprocess::local_socket::LocalSocketStream;
-use nix::fcntl::{fcntl, FcntlArg, OFlag};
use nix::pty::{forkpty, Winsize};
use nix::sys::signal::{kill, Signal};
use nix::sys::termios;
@@ -21,7 +22,11 @@ use zellij_utils::{
shared::default_palette,
};
+use async_std::io::ReadExt;
+pub use async_trait::async_trait;
+
pub use nix::unistd::Pid;
+
pub(crate) fn set_terminal_size_using_fd(fd: RawFd, columns: u16, rows: u16) {
// TODO: do this with the nix ioctl
use libc::ioctl;
@@ -88,8 +93,6 @@ fn spawn_terminal(file_to_open: Option<PathBuf>, orig_termios: termios::Termios)
let pid_secondary = match fork_pty_res.fork_result {
ForkResult::Parent { child } => {
// fcntl(pid_primary, FcntlArg::F_SETFL(OFlag::empty())).expect("could not fcntl");
- fcntl(pid_primary, FcntlArg::F_SETFL(OFlag::O_NONBLOCK))
- .expect("could not fcntl");
child
}
ForkResult::Child => match file_to_open {
@@ -133,6 +136,34 @@ pub struct ServerOsInputOutput {
send_instructions_to_client: Arc<Mutex<Option<IpcSenderWithContext<ServerToClientMsg>>>>,
}
+// async fn in traits is not supported by rust, so dtolnay's excellent async_trait macro is being
+// used. See https://smallcultfollowing.com/babysteps/blog/2019/10/26/async-fn-in-traits-are-hard/
+#[async_trait]
+pub trait AsyncReader: Send + Sync {
+ async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error>;
+}
+
+/// An `AsyncReader` that wraps a `RawFd`
+struct RawFdAsyncReader {
+ fd: async_std::fs::File,
+}
+
+impl RawFdAsyncReader {
+ fn new(fd: RawFd) -> RawFdAsyncReader {
+ RawFdAsyncReader {
+ /// The supplied `RawFd` is consumed by the created `RawFdAsyncReader`, closing it when dropped
+ fd: unsafe { AsyncFile::from_raw_fd(fd) },
+ }
+ }
+}
+
+#[async_trait]
+impl AsyncReader for RawFdAsyncReader {
+ async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
+ self.fd.read(buf).await
+ }
+}
+
/// The `ServerOsApi` trait represents an abstract interface to the features of an operating system that
/// Zellij server requires.
pub trait ServerOsApi: Send + Sync {
@@ -142,6 +173,8 @@ pub trait ServerOsApi: Send + Sync {
fn spawn_terminal(&self, file_to_open: Option<PathBuf>) -> (RawFd, Pid);
/// Read bytes from the standard output of the virtual terminal referred to by `fd`.
fn read_from_tty_stdout(&self, fd: RawFd, buf: &mut [u8]) -> Result<usize, nix::Error>;
+ /// Creates an `AsyncReader` that can be used to read from `fd` in an async context
+ fn async_file_reader(&self, fd: RawFd) -> Box<dyn AsyncReader>;
/// Write bytes to the standard input of the virtual terminal referred to by `fd`.
fn write_to_tty_stdin(&self, fd: RawFd, buf: &[u8]) -> Result<usize, nix::Error>;
/// Wait until all output written to the object referred to by `fd` has been transmitted.
@@ -172,6 +205,9 @@ impl ServerOsApi for ServerOsInputOutput {
fn read_from_tty_stdout(&self, fd: RawFd, buf: &mut [u8]) -> Result<usize, nix::Error> {
unistd::read(fd, buf)
}
+ fn async_file_reader(&self, fd: RawFd) -> Box<dyn AsyncReader> {
+ Box::new(RawFdAsyncReader::new(fd))
+ }
fn write_to_tty_stdin(&self, fd: RawFd, buf: &[u8]) -> Result<usize, nix::Error> {
unistd::write(fd, buf)
}
diff --git a/zellij-server/src/pty.rs b/zellij-server/src/pty.rs
index ecaef460a..803c04248 100644
--- a/zellij-server/src/pty.rs
+++ b/zellij-server/src/pty.rs
@@ -1,16 +1,14 @@
-use zellij_utils::{async_std, nix};
+use zellij_utils::async_std;
-use async_std::stream::*;
-use async_std::task;
-use async_std::task::*;
+use async_std::future::timeout as async_timeout;
+use async_std::task::{self, JoinHandle};
use std::collections::HashMap;
use std::os::unix::io::RawFd;
use std::path::PathBuf;
-use std::pin::*;
use std::time::{Duration, Instant};
use crate::{
- os_input_output::{Pid, ServerOsApi},
+ os_input_output::{AsyncReader, Pid, ServerOsApi},
panes::PaneId,
screen::ScreenInstruction,
thread_bus::{Bus, ThreadSenders},
@@ -23,52 +21,6 @@ use zellij_utils::{
logging::debug_to_file,
};
-pub struct ReadFromPid {
- pid: RawFd,
- os_input: Box<dyn ServerOsApi>,
-}
-
-impl ReadFromPid {
- pub fn new(pid: &RawFd, os_input: Box<dyn ServerOsApi>) -> ReadFromPid {
- ReadFromPid {
- pid: *pid,
- os_input,
- }
- }
-}
-
-impl Stream for ReadFromPid {
- type Item = Vec<u8>;
- fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- let mut read_buffer = [0; 65535];
- let pid = self.pid;
- let read_result = &self.os_input.read_from_tty_stdout(pid, &mut read_buffer);
- match read_result {
- Ok(res) => {
- if *res == 0 {
- // indicates end of file
- Poll::Ready(None)
- } else {
- let res = Some(read_buffer[..*res].to_vec());
- Poll::Ready(res)
- }
- }
- Err(e) => {
- match e {
- nix::Error::Sys(errno) => {
- if *errno == nix::errno::Errno::EAGAIN {
- Poll::Ready(Some(vec![])) // TODO: better with timeout waker somehow
- } else {
- Poll::Ready(None)
- }
- }
- _ => Poll::Ready(None),
- }
- }
- }
- }
-}
-
pub type VteBytes = Vec<u8>;
/// Instructions related to PTYs (pseudoterminals).
@@ -160,6 +112,42 @@ pub(crate) fn pty_thread_main(mut pty: Pty, maybe_layout: Option<Layout>) {
}
}
+enum ReadResult {
+ Ok(usize),
+ Timeout,
+ Err(std::io::Error),
+}
+
+impl From<std::io::Result<usize>> for ReadResult {
+ fn from(e: std::io::Result<usize>) -> ReadResult {
+ match e {
+ Err(e) => ReadResult::Err(e),
+ Ok(n) => ReadResult::Ok(n),
+ }
+ }
+}
+
+async fn deadline_read(
+ reader: &mut dyn AsyncReader,
+ deadline: Option<Instant>,
+ buf: &mut [u8],
+) -> ReadResult {
+ if let Some(deadline) = deadline {
+ let timeout = deadline.checked_duration_since(Instant::now());
+ if let Some(timeout) = timeout {
+ match async_timeout(timeout, reader.read(buf)).await {
+ Ok(res) => res.into(),
+ _ => ReadResult::Timeout,
+ }
+ } else {
+ // deadline has already elapsed
+ ReadResult::Timeout
+ }
+ } else {
+ reader.read(buf).await.into()
+ }
+}
+
fn stream_terminal_bytes(
pid: RawFd,
senders: ThreadSenders,
@@ -170,50 +158,42 @@ fn stream_terminal_bytes(
task::spawn({
async move {
err_ctx.add_call(ContextType::AsyncTask);
- let mut terminal_bytes = ReadFromPid::new(&pid, os_input);
- let mut last_byte_receive_time: Option<Instant> = None;
- let mut pending_render = false;
- let max_render_pause = Duration::from_millis(30);
+ // After a successful read, we keep on reading additional data up to a duration of
+ // `render_pause`. This is in order to batch up PtyBytes before rendering them.
+ // Once `render_deadline` has elapsed, we send Render.
+ let render_pause = Duration::from_millis(30);
+ let mut render_deadline = None;
- while let Some(bytes) = terminal_bytes.next().await {
- let bytes_is_empty = bytes.is_empty();
- if debug {
- debug_to_file(&bytes, pid).unwrap();
- }
- if !bytes_is_empty {
- let _ = senders.send_to_screen(ScreenInstruction::PtyBytes(pid, bytes));
- // for UX reasons, if we got something on the wire, we only send the render notice if:
- // 1. there aren't any more bytes on the wire afterwards
- // 2. a certain period (currently 30ms) has elapsed since the last render
- // (otherwise if we get a large amount of data, the display would hang
- // until it's done)
- // 3. the stream has ended, and so we render 1 last time
- match last_byte_receive_time.as_mut() {
- Some(receive_time) => {
- if receive_time.elapsed() > max_render_pause {
- pending_render = false;
- let _ = senders.send_to_screen(ScreenInstruction::Render);
- last_byte_receive_time = Some(Instant::now());
- } else {
- pending_render = true;
- }
- }
- None => {
- last_byte_receive_time = Some(Instant::now());
- pending_render = true;
- }
- };
- } else {
- if pending_render {
- pending_render = false;
+ let mut buf = [0u8; 65536];
+ let mut async_reader = os_input.async_file_reader(pid);
+ loop {
+ match deadline_read(async_reader.as_mut(), render_deadline, &mut buf).await {
+ ReadResult::Ok(0) | ReadResult::Err(_) => break, // EOF or error
+ ReadResult::Timeout => {
let _ = senders.send_to_screen(ScreenInstruction::Render);
+ // next read does not need a deadline as we just rendered everything
+ render_deadline = None;
+
+ // yield so Screen thread has some time to render before send additional
+ // PtyBytes.
+ task::sleep(Duration::from_millis(10)).await;
+ }
+ ReadResult::Ok(n_bytes) => {
+ let bytes = &buf[..n_bytes];
+ if debug {
+ let _ = debug_to_file(bytes, pid);
+ }
+ let _ = senders
+ .send_to_screen(ScreenInstruction::PtyBytes(pid, bytes.to_vec()));
+ // if we already have a render_deadline we keep it, otherwise we set it
+ // to the duration of `render_pause`.
+ render_deadline.get_or_insert(Instant::now() + render_pause);
}
- last_byte_receive_time = None;
- task::sleep(::std::time::Duration::from_millis(10)).await;
}
}
- senders.send_to_screen(ScreenInstruction::Render).unwrap();
+ let _ = senders.send_to_screen(ScreenInstruction::Render);
+
#[cfg(not(any(feature = "test", test)))]
// this is a little hacky, and is because the tests end the file as soon as
// we read everything, rather than hanging until there is new data