diff options
author | Aram Drevekenin <aram@poor.dev> | 2022-11-17 19:40:09 +0100 |
---|---|---|
committer | Aram Drevekenin <aram@poor.dev> | 2022-11-17 19:40:09 +0100 |
commit | bca41afe1500df1cde9fad8b9520e4cf79aee672 (patch) | |
tree | 02bb568f0a9a8138af138a0f2a620aa045644b4c | |
parent | 6c152926675fd28d898b70d20d470a752be6eec8 (diff) |
fix(router): handle client buffer overflow
-rw-r--r-- | zellij-server/src/os_input_output.rs | 45 |
1 files changed, 41 insertions, 4 deletions
diff --git a/zellij-server/src/os_input_output.rs b/zellij-server/src/os_input_output.rs index 24a2ace21..f6a0c9b44 100644 --- a/zellij-server/src/os_input_output.rs +++ b/zellij-server/src/os_input_output.rs @@ -13,12 +13,13 @@ use nix::{ use signal_hook::consts::*; use sysinfo::{ProcessExt, ProcessRefreshKind, System, SystemExt}; use zellij_utils::{ + channels, async_std, data::Palette, errors::prelude::*, input::command::{RunCommand, TerminalAction}, interprocess, - ipc::{ClientToServerMsg, IpcReceiverWithContext, IpcSenderWithContext, ServerToClientMsg}, + ipc::{ClientToServerMsg, IpcReceiverWithContext, IpcSenderWithContext, ServerToClientMsg, ExitReason}, libc, nix, shared::default_palette, signal_hook, @@ -323,10 +324,46 @@ fn spawn_terminal( handle_terminal(cmd, failover_cmd, orig_termios, quit_cb, terminal_id) } +// The ClientSender is in charge of sending messages to the client on a special thread +// This is done so that when the unix socket buffer is full, we won't block the entire router +// thread +// When the above happens, the ClientSender buffers messages in hopes that the congestion will be +// freed until we runs out of buffer space. +// If we run out of buffer space, we bubble up an error sot hat the router thread will give up on +// this client and we'll stop sending messages to it. +// If the client ever becomes responsive again, we'll send one final "Buffer full" message so it +// knows what happened. +#[derive(Clone)] +struct ClientSender { + client_id: ClientId, + client_buffer_sender: channels::Sender<ServerToClientMsg>, +} + +impl ClientSender { + pub fn new(client_id: ClientId, mut sender: IpcSenderWithContext<ServerToClientMsg>) -> Self { + let (client_buffer_sender, client_buffer_receiver) = channels::bounded(50); + std::thread::spawn(move || { + let err_context = || format!("failed to send message to client {client_id}"); + for msg in client_buffer_receiver.iter() { + let _ = sender.send(msg).with_context(err_context); + } + let _ = sender.send(ServerToClientMsg::Exit(ExitReason::Error("Buffer full".to_string()))); + }); + ClientSender { + client_id, + client_buffer_sender + } + } + pub fn send_or_buffer(&self, msg: ServerToClientMsg) -> Result<()> { + let err_context = || format!("Client {} send buffer full", self.client_id); + self.client_buffer_sender.try_send(msg).with_context(err_context) + } +} + #[derive(Clone)] pub struct ServerOsInputOutput { orig_termios: Arc<Mutex<termios::Termios>>, - client_senders: Arc<Mutex<HashMap<ClientId, IpcSenderWithContext<ServerToClientMsg>>>>, + client_senders: Arc<Mutex<HashMap<ClientId, ClientSender>>>, terminal_id_to_raw_fd: Arc<Mutex<BTreeMap<u32, Option<RawFd>>>>, // A value of None means the // terminal_id exists but is // not connected to an fd (eg. @@ -589,7 +626,7 @@ impl ServerOsApi for ServerOsInputOutput { .with_context(err_context)? .get_mut(&client_id) { - sender.send(msg).with_context(err_context) + sender.send_or_buffer(msg).with_context(err_context) } else { Ok(()) } @@ -601,7 +638,7 @@ impl ServerOsApi for ServerOsInputOutput { stream: LocalSocketStream, ) -> Result<IpcReceiverWithContext<ClientToServerMsg>> { let receiver = IpcReceiverWithContext::new(stream); - let sender = receiver.get_sender(); + let sender = ClientSender::new(client_id, receiver.get_sender()); self.client_senders .lock() .to_anyhow() |