summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAram Drevekenin <aram@poor.dev>2022-11-17 19:40:09 +0100
committerAram Drevekenin <aram@poor.dev>2022-11-17 19:40:09 +0100
commitbca41afe1500df1cde9fad8b9520e4cf79aee672 (patch)
tree02bb568f0a9a8138af138a0f2a620aa045644b4c
parent6c152926675fd28d898b70d20d470a752be6eec8 (diff)
fix(router): handle client buffer overflow
-rw-r--r--zellij-server/src/os_input_output.rs45
1 files changed, 41 insertions, 4 deletions
diff --git a/zellij-server/src/os_input_output.rs b/zellij-server/src/os_input_output.rs
index 24a2ace21..f6a0c9b44 100644
--- a/zellij-server/src/os_input_output.rs
+++ b/zellij-server/src/os_input_output.rs
@@ -13,12 +13,13 @@ use nix::{
use signal_hook::consts::*;
use sysinfo::{ProcessExt, ProcessRefreshKind, System, SystemExt};
use zellij_utils::{
+ channels,
async_std,
data::Palette,
errors::prelude::*,
input::command::{RunCommand, TerminalAction},
interprocess,
- ipc::{ClientToServerMsg, IpcReceiverWithContext, IpcSenderWithContext, ServerToClientMsg},
+ ipc::{ClientToServerMsg, IpcReceiverWithContext, IpcSenderWithContext, ServerToClientMsg, ExitReason},
libc, nix,
shared::default_palette,
signal_hook,
@@ -323,10 +324,46 @@ fn spawn_terminal(
handle_terminal(cmd, failover_cmd, orig_termios, quit_cb, terminal_id)
}
+// The ClientSender is in charge of sending messages to the client on a special thread
+// This is done so that when the unix socket buffer is full, we won't block the entire router
+// thread
+// When the above happens, the ClientSender buffers messages in hopes that the congestion will be
+// freed until we runs out of buffer space.
+// If we run out of buffer space, we bubble up an error sot hat the router thread will give up on
+// this client and we'll stop sending messages to it.
+// If the client ever becomes responsive again, we'll send one final "Buffer full" message so it
+// knows what happened.
+#[derive(Clone)]
+struct ClientSender {
+ client_id: ClientId,
+ client_buffer_sender: channels::Sender<ServerToClientMsg>,
+}
+
+impl ClientSender {
+ pub fn new(client_id: ClientId, mut sender: IpcSenderWithContext<ServerToClientMsg>) -> Self {
+ let (client_buffer_sender, client_buffer_receiver) = channels::bounded(50);
+ std::thread::spawn(move || {
+ let err_context = || format!("failed to send message to client {client_id}");
+ for msg in client_buffer_receiver.iter() {
+ let _ = sender.send(msg).with_context(err_context);
+ }
+ let _ = sender.send(ServerToClientMsg::Exit(ExitReason::Error("Buffer full".to_string())));
+ });
+ ClientSender {
+ client_id,
+ client_buffer_sender
+ }
+ }
+ pub fn send_or_buffer(&self, msg: ServerToClientMsg) -> Result<()> {
+ let err_context = || format!("Client {} send buffer full", self.client_id);
+ self.client_buffer_sender.try_send(msg).with_context(err_context)
+ }
+}
+
#[derive(Clone)]
pub struct ServerOsInputOutput {
orig_termios: Arc<Mutex<termios::Termios>>,
- client_senders: Arc<Mutex<HashMap<ClientId, IpcSenderWithContext<ServerToClientMsg>>>>,
+ client_senders: Arc<Mutex<HashMap<ClientId, ClientSender>>>,
terminal_id_to_raw_fd: Arc<Mutex<BTreeMap<u32, Option<RawFd>>>>, // A value of None means the
// terminal_id exists but is
// not connected to an fd (eg.
@@ -589,7 +626,7 @@ impl ServerOsApi for ServerOsInputOutput {
.with_context(err_context)?
.get_mut(&client_id)
{
- sender.send(msg).with_context(err_context)
+ sender.send_or_buffer(msg).with_context(err_context)
} else {
Ok(())
}
@@ -601,7 +638,7 @@ impl ServerOsApi for ServerOsInputOutput {
stream: LocalSocketStream,
) -> Result<IpcReceiverWithContext<ClientToServerMsg>> {
let receiver = IpcReceiverWithContext::new(stream);
- let sender = receiver.get_sender();
+ let sender = ClientSender::new(client_id, receiver.get_sender());
self.client_senders
.lock()
.to_anyhow()