summaryrefslogtreecommitdiffstats
path: root/zellij-server/src/pty.rs
diff options
context:
space:
mode:
Diffstat (limited to 'zellij-server/src/pty.rs')
-rw-r--r--zellij-server/src/pty.rs160
1 files changed, 70 insertions, 90 deletions
diff --git a/zellij-server/src/pty.rs b/zellij-server/src/pty.rs
index ecaef460a..803c04248 100644
--- a/zellij-server/src/pty.rs
+++ b/zellij-server/src/pty.rs
@@ -1,16 +1,14 @@
-use zellij_utils::{async_std, nix};
+use zellij_utils::async_std;
-use async_std::stream::*;
-use async_std::task;
-use async_std::task::*;
+use async_std::future::timeout as async_timeout;
+use async_std::task::{self, JoinHandle};
use std::collections::HashMap;
use std::os::unix::io::RawFd;
use std::path::PathBuf;
-use std::pin::*;
use std::time::{Duration, Instant};
use crate::{
- os_input_output::{Pid, ServerOsApi},
+ os_input_output::{AsyncReader, Pid, ServerOsApi},
panes::PaneId,
screen::ScreenInstruction,
thread_bus::{Bus, ThreadSenders},
@@ -23,52 +21,6 @@ use zellij_utils::{
logging::debug_to_file,
};
-pub struct ReadFromPid {
- pid: RawFd,
- os_input: Box<dyn ServerOsApi>,
-}
-
-impl ReadFromPid {
- pub fn new(pid: &RawFd, os_input: Box<dyn ServerOsApi>) -> ReadFromPid {
- ReadFromPid {
- pid: *pid,
- os_input,
- }
- }
-}
-
-impl Stream for ReadFromPid {
- type Item = Vec<u8>;
- fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- let mut read_buffer = [0; 65535];
- let pid = self.pid;
- let read_result = &self.os_input.read_from_tty_stdout(pid, &mut read_buffer);
- match read_result {
- Ok(res) => {
- if *res == 0 {
- // indicates end of file
- Poll::Ready(None)
- } else {
- let res = Some(read_buffer[..*res].to_vec());
- Poll::Ready(res)
- }
- }
- Err(e) => {
- match e {
- nix::Error::Sys(errno) => {
- if *errno == nix::errno::Errno::EAGAIN {
- Poll::Ready(Some(vec![])) // TODO: better with timeout waker somehow
- } else {
- Poll::Ready(None)
- }
- }
- _ => Poll::Ready(None),
- }
- }
- }
- }
-}
-
pub type VteBytes = Vec<u8>;
/// Instructions related to PTYs (pseudoterminals).
@@ -160,6 +112,42 @@ pub(crate) fn pty_thread_main(mut pty: Pty, maybe_layout: Option<Layout>) {
}
}
+enum ReadResult {
+ Ok(usize),
+ Timeout,
+ Err(std::io::Error),
+}
+
+impl From<std::io::Result<usize>> for ReadResult {
+ fn from(e: std::io::Result<usize>) -> ReadResult {
+ match e {
+ Err(e) => ReadResult::Err(e),
+ Ok(n) => ReadResult::Ok(n),
+ }
+ }
+}
+
+async fn deadline_read(
+ reader: &mut dyn AsyncReader,
+ deadline: Option<Instant>,
+ buf: &mut [u8],
+) -> ReadResult {
+ if let Some(deadline) = deadline {
+ let timeout = deadline.checked_duration_since(Instant::now());
+ if let Some(timeout) = timeout {
+ match async_timeout(timeout, reader.read(buf)).await {
+ Ok(res) => res.into(),
+ _ => ReadResult::Timeout,
+ }
+ } else {
+ // deadline has already elapsed
+ ReadResult::Timeout
+ }
+ } else {
+ reader.read(buf).await.into()
+ }
+}
+
fn stream_terminal_bytes(
pid: RawFd,
senders: ThreadSenders,
@@ -170,50 +158,42 @@ fn stream_terminal_bytes(
task::spawn({
async move {
err_ctx.add_call(ContextType::AsyncTask);
- let mut terminal_bytes = ReadFromPid::new(&pid, os_input);
- let mut last_byte_receive_time: Option<Instant> = None;
- let mut pending_render = false;
- let max_render_pause = Duration::from_millis(30);
+ // After a successful read, we keep on reading additional data up to a duration of
+ // `render_pause`. This is in order to batch up PtyBytes before rendering them.
+ // Once `render_deadline` has elapsed, we send Render.
+ let render_pause = Duration::from_millis(30);
+ let mut render_deadline = None;
- while let Some(bytes) = terminal_bytes.next().await {
- let bytes_is_empty = bytes.is_empty();
- if debug {
- debug_to_file(&bytes, pid).unwrap();
- }
- if !bytes_is_empty {
- let _ = senders.send_to_screen(ScreenInstruction::PtyBytes(pid, bytes));
- // for UX reasons, if we got something on the wire, we only send the render notice if:
- // 1. there aren't any more bytes on the wire afterwards
- // 2. a certain period (currently 30ms) has elapsed since the last render
- // (otherwise if we get a large amount of data, the display would hang
- // until it's done)
- // 3. the stream has ended, and so we render 1 last time
- match last_byte_receive_time.as_mut() {
- Some(receive_time) => {
- if receive_time.elapsed() > max_render_pause {
- pending_render = false;
- let _ = senders.send_to_screen(ScreenInstruction::Render);
- last_byte_receive_time = Some(Instant::now());
- } else {
- pending_render = true;
- }
- }
- None => {
- last_byte_receive_time = Some(Instant::now());
- pending_render = true;
- }
- };
- } else {
- if pending_render {
- pending_render = false;
+ let mut buf = [0u8; 65536];
+ let mut async_reader = os_input.async_file_reader(pid);
+ loop {
+ match deadline_read(async_reader.as_mut(), render_deadline, &mut buf).await {
+ ReadResult::Ok(0) | ReadResult::Err(_) => break, // EOF or error
+ ReadResult::Timeout => {
let _ = senders.send_to_screen(ScreenInstruction::Render);
+ // next read does not need a deadline as we just rendered everything
+ render_deadline = None;
+
+ // yield so Screen thread has some time to render before send additional
+ // PtyBytes.
+ task::sleep(Duration::from_millis(10)).await;
+ }
+ ReadResult::Ok(n_bytes) => {
+ let bytes = &buf[..n_bytes];
+ if debug {
+ let _ = debug_to_file(bytes, pid);
+ }
+ let _ = senders
+ .send_to_screen(ScreenInstruction::PtyBytes(pid, bytes.to_vec()));
+ // if we already have a render_deadline we keep it, otherwise we set it
+ // to the duration of `render_pause`.
+ render_deadline.get_or_insert(Instant::now() + render_pause);
}
- last_byte_receive_time = None;
- task::sleep(::std::time::Duration::from_millis(10)).await;
}
}
- senders.send_to_screen(ScreenInstruction::Render).unwrap();
+ let _ = senders.send_to_screen(ScreenInstruction::Render);
+
#[cfg(not(any(feature = "test", test)))]
// this is a little hacky, and is because the tests end the file as soon as
// we read everything, rather than hanging until there is new data