diff options
author | kxt <ktamas@fastmail.fm> | 2021-05-24 15:00:49 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-05-24 15:00:49 +0200 |
commit | 2168793dc75380a485faa4d470489d3d8baed0dc (patch) | |
tree | 3360d4aae1b08e5576ab51adef7cd58138b92670 | |
parent | 193331062655c4a0b79cf9433157ec8526c4b29f (diff) |
fix(pty): use async io to avoid polling (#523)
This patch fixes #509 by using async read instead of polling a
non-blocking fd. This reduces CPU usage when the ptys are idle.
-rw-r--r-- | Cargo.lock | 12 | ||||
-rw-r--r-- | src/tests/fakes.rs | 36 | ||||
-rw-r--r-- | zellij-server/Cargo.toml | 1 | ||||
-rw-r--r-- | zellij-server/src/os_input_output.rs | 44 | ||||
-rw-r--r-- | zellij-server/src/pty.rs | 160 |
5 files changed, 158 insertions, 95 deletions
diff --git a/Cargo.lock b/Cargo.lock index 48aa2bf2c..0dda83734 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -171,6 +171,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0" [[package]] +name = "async-trait" +version = "0.1.50" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b98e84bbb4cbcdd97da190ba0c58a1bb0de2c1fdf67d159e192ed766aeca722" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] name = "atomic-waker" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2308,6 +2319,7 @@ name = "zellij-server" version = "0.12.0" dependencies = [ "ansi_term 0.12.1", + "async-trait", "daemonize", "insta", "serde_json", diff --git a/src/tests/fakes.rs b/src/tests/fakes.rs index e9042ed73..a2f338ba6 100644 --- a/src/tests/fakes.rs +++ b/src/tests/fakes.rs @@ -10,9 +10,10 @@ use zellij_utils::{nix, zellij_tile}; use crate::tests::possible_tty_inputs::{get_possible_tty_inputs, Bytes}; use crate::tests::utils::commands::{QUIT, SLEEP}; use zellij_client::os_input_output::ClientOsApi; -use zellij_server::os_input_output::{Pid, ServerOsApi}; +use zellij_server::os_input_output::{async_trait, AsyncReader, Pid, ServerOsApi}; use zellij_tile::data::Palette; use zellij_utils::{ + async_std, channels::{ChannelWithContext, SenderType, SenderWithContext}, errors::ErrorContext, interprocess::local_socket::LocalSocketStream, @@ -227,6 +228,33 @@ impl ClientOsApi for FakeInputOutput { } } +struct FakeAsyncReader { + fd: RawFd, + os_api: Box<dyn ServerOsApi>, +} + +#[async_trait] +impl AsyncReader for FakeAsyncReader { + async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> { + // simulates async semantics: EAGAIN is not propagated to caller + loop { + let res = self.os_api.read_from_tty_stdout(self.fd, buf); + match res { + Err(nix::Error::Sys(nix::errno::Errno::EAGAIN)) => { + async_std::task::sleep(Duration::from_millis(10)).await; + continue; + } + Err(e) => { + break Err(std::io::Error::from_raw_os_error( + e.as_errno().unwrap() as i32 + )) + } + Ok(n_bytes) => break Ok(n_bytes), + } + } + } +} + impl ServerOsApi for FakeInputOutput { fn set_terminal_size_using_fd(&self, pid: RawFd, cols: u16, rows: u16) { let terminal_input = self @@ -277,6 +305,12 @@ impl ServerOsApi for FakeInputOutput { None => Err(nix::Error::Sys(nix::errno::Errno::EAGAIN)), } } + fn async_file_reader(&self, fd: RawFd) -> Box<dyn AsyncReader> { + Box::new(FakeAsyncReader { + fd, + os_api: ServerOsApi::box_clone(self), + }) + } fn tcdrain(&self, pid: RawFd) -> Result<(), nix::Error> { self.io_events.lock().unwrap().push(IoEvent::TcDrain(pid)); Ok(()) diff --git a/zellij-server/Cargo.toml b/zellij-server/Cargo.toml index 532ef9317..388a3e5b9 100644 --- a/zellij-server/Cargo.toml +++ b/zellij-server/Cargo.toml @@ -10,6 +10,7 @@ license = "MIT" [dependencies] ansi_term = "0.12.1" +async-trait = "0.1.50" daemonize = "0.4.1" serde_json = "1.0" unicode-width = "0.1.8" diff --git a/zellij-server/src/os_input_output.rs b/zellij-server/src/os_input_output.rs index be51878ce..2ac35b69b 100644 --- a/zellij-server/src/os_input_output.rs +++ b/zellij-server/src/os_input_output.rs @@ -4,10 +4,11 @@ use std::path::PathBuf; use std::process::{Child, Command}; use std::sync::{Arc, Mutex}; -use zellij_utils::{interprocess, libc, nix, signal_hook, zellij_tile}; +use zellij_utils::{async_std, interprocess, libc, nix, signal_hook, zellij_tile}; +use async_std::fs::File as AsyncFile; +use async_std::os::unix::io::FromRawFd; use interprocess::local_socket::LocalSocketStream; -use nix::fcntl::{fcntl, FcntlArg, OFlag}; use nix::pty::{forkpty, Winsize}; use nix::sys::signal::{kill, Signal}; use nix::sys::termios; @@ -21,7 +22,11 @@ use zellij_utils::{ shared::default_palette, }; +use async_std::io::ReadExt; +pub use async_trait::async_trait; + pub use nix::unistd::Pid; + pub(crate) fn set_terminal_size_using_fd(fd: RawFd, columns: u16, rows: u16) { // TODO: do this with the nix ioctl use libc::ioctl; @@ -88,8 +93,6 @@ fn spawn_terminal(file_to_open: Option<PathBuf>, orig_termios: termios::Termios) let pid_secondary = match fork_pty_res.fork_result { ForkResult::Parent { child } => { // fcntl(pid_primary, FcntlArg::F_SETFL(OFlag::empty())).expect("could not fcntl"); - fcntl(pid_primary, FcntlArg::F_SETFL(OFlag::O_NONBLOCK)) - .expect("could not fcntl"); child } ForkResult::Child => match file_to_open { @@ -133,6 +136,34 @@ pub struct ServerOsInputOutput { send_instructions_to_client: Arc<Mutex<Option<IpcSenderWithContext<ServerToClientMsg>>>>, } +// async fn in traits is not supported by rust, so dtolnay's excellent async_trait macro is being +// used. See https://smallcultfollowing.com/babysteps/blog/2019/10/26/async-fn-in-traits-are-hard/ +#[async_trait] +pub trait AsyncReader: Send + Sync { + async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error>; +} + +/// An `AsyncReader` that wraps a `RawFd` +struct RawFdAsyncReader { + fd: async_std::fs::File, +} + +impl RawFdAsyncReader { + fn new(fd: RawFd) -> RawFdAsyncReader { + RawFdAsyncReader { + /// The supplied `RawFd` is consumed by the created `RawFdAsyncReader`, closing it when dropped + fd: unsafe { AsyncFile::from_raw_fd(fd) }, + } + } +} + +#[async_trait] +impl AsyncReader for RawFdAsyncReader { + async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> { + self.fd.read(buf).await + } +} + /// The `ServerOsApi` trait represents an abstract interface to the features of an operating system that /// Zellij server requires. pub trait ServerOsApi: Send + Sync { @@ -142,6 +173,8 @@ pub trait ServerOsApi: Send + Sync { fn spawn_terminal(&self, file_to_open: Option<PathBuf>) -> (RawFd, Pid); /// Read bytes from the standard output of the virtual terminal referred to by `fd`. fn read_from_tty_stdout(&self, fd: RawFd, buf: &mut [u8]) -> Result<usize, nix::Error>; + /// Creates an `AsyncReader` that can be used to read from `fd` in an async context + fn async_file_reader(&self, fd: RawFd) -> Box<dyn AsyncReader>; /// Write bytes to the standard input of the virtual terminal referred to by `fd`. fn write_to_tty_stdin(&self, fd: RawFd, buf: &[u8]) -> Result<usize, nix::Error>; /// Wait until all output written to the object referred to by `fd` has been transmitted. @@ -172,6 +205,9 @@ impl ServerOsApi for ServerOsInputOutput { fn read_from_tty_stdout(&self, fd: RawFd, buf: &mut [u8]) -> Result<usize, nix::Error> { unistd::read(fd, buf) } + fn async_file_reader(&self, fd: RawFd) -> Box<dyn AsyncReader> { + Box::new(RawFdAsyncReader::new(fd)) + } fn write_to_tty_stdin(&self, fd: RawFd, buf: &[u8]) -> Result<usize, nix::Error> { unistd::write(fd, buf) } diff --git a/zellij-server/src/pty.rs b/zellij-server/src/pty.rs index ecaef460a..803c04248 100644 --- a/zellij-server/src/pty.rs +++ b/zellij-server/src/pty.rs @@ -1,16 +1,14 @@ -use zellij_utils::{async_std, nix}; +use zellij_utils::async_std; -use async_std::stream::*; -use async_std::task; -use async_std::task::*; +use async_std::future::timeout as async_timeout; +use async_std::task::{self, JoinHandle}; use std::collections::HashMap; use std::os::unix::io::RawFd; use std::path::PathBuf; -use std::pin::*; use std::time::{Duration, Instant}; use crate::{ - os_input_output::{Pid, ServerOsApi}, + os_input_output::{AsyncReader, Pid, ServerOsApi}, panes::PaneId, screen::ScreenInstruction, thread_bus::{Bus, ThreadSenders}, @@ -23,52 +21,6 @@ use zellij_utils::{ logging::debug_to_file, }; -pub struct ReadFromPid { - pid: RawFd, - os_input: Box<dyn ServerOsApi>, -} - -impl ReadFromPid { - pub fn new(pid: &RawFd, os_input: Box<dyn ServerOsApi>) -> ReadFromPid { - ReadFromPid { - pid: *pid, - os_input, - } - } -} - -impl Stream for ReadFromPid { - type Item = Vec<u8>; - fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - let mut read_buffer = [0; 65535]; - let pid = self.pid; - let read_result = &self.os_input.read_from_tty_stdout(pid, &mut read_buffer); - match read_result { - Ok(res) => { - if *res == 0 { - // indicates end of file - Poll::Ready(None) - } else { - let res = Some(read_buffer[..*res].to_vec()); - Poll::Ready(res) - } - } - Err(e) => { - match e { - nix::Error::Sys(errno) => { - if *errno == nix::errno::Errno::EAGAIN { - Poll::Ready(Some(vec![])) // TODO: better with timeout waker somehow - } else { - Poll::Ready(None) - } - } - _ => Poll::Ready(None), - } - } - } - } -} - pub type VteBytes = Vec<u8>; /// Instructions related to PTYs (pseudoterminals). @@ -160,6 +112,42 @@ pub(crate) fn pty_thread_main(mut pty: Pty, maybe_layout: Option<Layout>) { } } +enum ReadResult { + Ok(usize), + Timeout, + Err(std::io::Error), +} + +impl From<std::io::Result<usize>> for ReadResult { + fn from(e: std::io::Result<usize>) -> ReadResult { + match e { + Err(e) => ReadResult::Err(e), + Ok(n) => ReadResult::Ok(n), + } + } +} + +async fn deadline_read( + reader: &mut dyn AsyncReader, + deadline: Option<Instant>, + buf: &mut [u8], +) -> ReadResult { + if let Some(deadline) = deadline { + let timeout = deadline.checked_duration_since(Instant::now()); + if let Some(timeout) = timeout { + match async_timeout(timeout, reader.read(buf)).await { + Ok(res) => res.into(), + _ => ReadResult::Timeout, + } + } else { + // deadline has already elapsed + ReadResult::Timeout + } + } else { + reader.read(buf).await.into() + } +} + fn stream_terminal_bytes( pid: RawFd, senders: ThreadSenders, @@ -170,50 +158,42 @@ fn stream_terminal_bytes( task::spawn({ async move { err_ctx.add_call(ContextType::AsyncTask); - let mut terminal_bytes = ReadFromPid::new(&pid, os_input); - let mut last_byte_receive_time: Option<Instant> = None; - let mut pending_render = false; - let max_render_pause = Duration::from_millis(30); + // After a successful read, we keep on reading additional data up to a duration of + // `render_pause`. This is in order to batch up PtyBytes before rendering them. + // Once `render_deadline` has elapsed, we send Render. + let render_pause = Duration::from_millis(30); + let mut render_deadline = None; - while let Some(bytes) = terminal_bytes.next().await { - let bytes_is_empty = bytes.is_empty(); - if debug { - debug_to_file(&bytes, pid).unwrap(); - } - if !bytes_is_empty { - let _ = senders.send_to_screen(ScreenInstruction::PtyBytes(pid, bytes)); - // for UX reasons, if we got something on the wire, we only send the render notice if: - // 1. there aren't any more bytes on the wire afterwards - // 2. a certain period (currently 30ms) has elapsed since the last render - // (otherwise if we get a large amount of data, the display would hang - // until it's done) - // 3. the stream has ended, and so we render 1 last time - match last_byte_receive_time.as_mut() { - Some(receive_time) => { - if receive_time.elapsed() > max_render_pause { - pending_render = false; - let _ = senders.send_to_screen(ScreenInstruction::Render); - last_byte_receive_time = Some(Instant::now()); - } else { - pending_render = true; - } - } - None => { - last_byte_receive_time = Some(Instant::now()); - pending_render = true; - } - }; - } else { - if pending_render { - pending_render = false; + let mut buf = [0u8; 65536]; + let mut async_reader = os_input.async_file_reader(pid); + loop { + match deadline_read(async_reader.as_mut(), render_deadline, &mut buf).await { + ReadResult::Ok(0) | ReadResult::Err(_) => break, // EOF or error + ReadResult::Timeout => { let _ = senders.send_to_screen(ScreenInstruction::Render); + // next read does not need a deadline as we just rendered everything + render_deadline = None; + + // yield so Screen thread has some time to render before send additional + // PtyBytes. + task::sleep(Duration::from_millis(10)).await; + } + ReadResult::Ok(n_bytes) => { + let bytes = &buf[..n_bytes]; + if debug { + let _ = debug_to_file(bytes, pid); + } + let _ = senders + .send_to_screen(ScreenInstruction::PtyBytes(pid, bytes.to_vec())); + // if we already have a render_deadline we keep it, otherwise we set it + // to the duration of `render_pause`. + render_deadline.get_or_insert(Instant::now() + render_pause); } - last_byte_receive_time = None; - task::sleep(::std::time::Duration::from_millis(10)).await; } } - senders.send_to_screen(ScreenInstruction::Render).unwrap(); + let _ = senders.send_to_screen(ScreenInstruction::Render); + #[cfg(not(any(feature = "test", test)))] // this is a little hacky, and is because the tests end the file as soon as // we read everything, rather than hanging until there is new data |