summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAram Drevekenin <aram@poor.dev>2022-11-18 10:21:59 +0100
committerGitHub <noreply@github.com>2022-11-18 10:21:59 +0100
commit2afb355e4856b1819af0c54e57d6476656630eca (patch)
treea4c8ec006f4222bd0dcc9caaf61b9e3856836724
parent84a931ad82d9e259e7a17240be0d5a8e3136852a (diff)
fix(router): handle client buffer overflow (#1955)
* fix(router): handle client buffer overflow * style(fmt): rustfmt
-rw-r--r--zellij-server/src/os_input_output.rs53
1 files changed, 48 insertions, 5 deletions
diff --git a/zellij-server/src/os_input_output.rs b/zellij-server/src/os_input_output.rs
index 24a2ace21..07bd7c0f4 100644
--- a/zellij-server/src/os_input_output.rs
+++ b/zellij-server/src/os_input_output.rs
@@ -13,12 +13,15 @@ use nix::{
use signal_hook::consts::*;
use sysinfo::{ProcessExt, ProcessRefreshKind, System, SystemExt};
use zellij_utils::{
- async_std,
+ async_std, channels,
data::Palette,
errors::prelude::*,
input::command::{RunCommand, TerminalAction},
interprocess,
- ipc::{ClientToServerMsg, IpcReceiverWithContext, IpcSenderWithContext, ServerToClientMsg},
+ ipc::{
+ ClientToServerMsg, ExitReason, IpcReceiverWithContext, IpcSenderWithContext,
+ ServerToClientMsg,
+ },
libc, nix,
shared::default_palette,
signal_hook,
@@ -323,10 +326,50 @@ fn spawn_terminal(
handle_terminal(cmd, failover_cmd, orig_termios, quit_cb, terminal_id)
}
+// The ClientSender is in charge of sending messages to the client on a special thread
+// This is done so that when the unix socket buffer is full, we won't block the entire router
+// thread
+// When the above happens, the ClientSender buffers messages in hopes that the congestion will be
+// freed until we runs out of buffer space.
+// If we run out of buffer space, we bubble up an error sot hat the router thread will give up on
+// this client and we'll stop sending messages to it.
+// If the client ever becomes responsive again, we'll send one final "Buffer full" message so it
+// knows what happened.
+#[derive(Clone)]
+struct ClientSender {
+ client_id: ClientId,
+ client_buffer_sender: channels::Sender<ServerToClientMsg>,
+}
+
+impl ClientSender {
+ pub fn new(client_id: ClientId, mut sender: IpcSenderWithContext<ServerToClientMsg>) -> Self {
+ let (client_buffer_sender, client_buffer_receiver) = channels::bounded(50);
+ std::thread::spawn(move || {
+ let err_context = || format!("failed to send message to client {client_id}");
+ for msg in client_buffer_receiver.iter() {
+ let _ = sender.send(msg).with_context(err_context);
+ }
+ let _ = sender.send(ServerToClientMsg::Exit(ExitReason::Error(
+ "Buffer full".to_string(),
+ )));
+ });
+ ClientSender {
+ client_id,
+ client_buffer_sender,
+ }
+ }
+ pub fn send_or_buffer(&self, msg: ServerToClientMsg) -> Result<()> {
+ let err_context = || format!("Client {} send buffer full", self.client_id);
+ self.client_buffer_sender
+ .try_send(msg)
+ .with_context(err_context)
+ }
+}
+
#[derive(Clone)]
pub struct ServerOsInputOutput {
orig_termios: Arc<Mutex<termios::Termios>>,
- client_senders: Arc<Mutex<HashMap<ClientId, IpcSenderWithContext<ServerToClientMsg>>>>,
+ client_senders: Arc<Mutex<HashMap<ClientId, ClientSender>>>,
terminal_id_to_raw_fd: Arc<Mutex<BTreeMap<u32, Option<RawFd>>>>, // A value of None means the
// terminal_id exists but is
// not connected to an fd (eg.
@@ -589,7 +632,7 @@ impl ServerOsApi for ServerOsInputOutput {
.with_context(err_context)?
.get_mut(&client_id)
{
- sender.send(msg).with_context(err_context)
+ sender.send_or_buffer(msg).with_context(err_context)
} else {
Ok(())
}
@@ -601,7 +644,7 @@ impl ServerOsApi for ServerOsInputOutput {
stream: LocalSocketStream,
) -> Result<IpcReceiverWithContext<ClientToServerMsg>> {
let receiver = IpcReceiverWithContext::new(stream);
- let sender = receiver.get_sender();
+ let sender = ClientSender::new(client_id, receiver.get_sender());
self.client_senders
.lock()
.to_anyhow()