summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAram Drevekenin <aram@poor.dev>2023-12-28 17:19:22 +0100
committerAram Drevekenin <aram@poor.dev>2023-12-28 17:19:22 +0100
commit124c6759168ad369b6bbfc0926fbcc7bf24c3894 (patch)
treed17651f77da53d70dc89c3d73e1ab8ba4f6f1e14
parente23ba43a312ced1f31ec63813d1118dec289beac (diff)
fix: pipe/client-server communication robustness
-rw-r--r--zellij-client/src/cli_client.rs25
-rw-r--r--zellij-server/src/lib.rs86
-rw-r--r--zellij-server/src/plugins/mod.rs4
-rw-r--r--zellij-server/src/plugins/unit/plugin_tests.rs6
-rw-r--r--zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__unblock_input_plugin_command.snap32
-rw-r--r--zellij-server/src/plugins/zellij_exports.rs1
-rw-r--r--zellij-server/src/route.rs15
-rw-r--r--zellij-server/src/unit/screen_tests.rs3
-rw-r--r--zellij-utils/src/errors.rs1
9 files changed, 149 insertions, 24 deletions
diff --git a/zellij-client/src/cli_client.rs b/zellij-client/src/cli_client.rs
index 4f0b472f8..423d38ba3 100644
--- a/zellij-client/src/cli_client.rs
+++ b/zellij-client/src/cli_client.rs
@@ -9,7 +9,7 @@ use zellij_utils::{
uuid::Uuid,
errors::prelude::*,
input::actions::Action,
- ipc::{ClientToServerMsg, ServerToClientMsg},
+ ipc::{ClientToServerMsg, ServerToClientMsg, ExitReason},
};
pub fn start_cli_client(mut os_input: Box<dyn ClientOsApi>, session_name: &str, actions: Vec<Action>) {
@@ -107,7 +107,6 @@ fn pipe_client(
},
Some((ServerToClientMsg::CliPipeOutput(pipe_name, output), _)) => {
let err_context = "Failed to write to stdout";
- // log::info!("CLI CLIENT, output to pipe: {:?}, input_pipe_id: {:?}", pipe_name, input_pipe_id);
if pipe_name == input_pipe_id {
let mut stdout = os_input.get_stdout_writer();
stdout
@@ -119,6 +118,17 @@ fn pipe_client(
.non_fatal();
}
},
+ Some((ServerToClientMsg::Exit(exit_reason), _)) => {
+ match exit_reason {
+ ExitReason::Error(e) => {
+ eprintln!("{}", e);
+ process::exit(2);
+ },
+ _ => {
+ process::exit(0);
+ }
+ }
+ }
_ => {},
}
}
@@ -142,6 +152,17 @@ fn single_message_client(os_input: &mut Box<dyn ClientOsApi>, action: Action, pa
log_lines.iter().for_each(|line| eprintln!("{line}"));
process::exit(2);
},
+ Some((ServerToClientMsg::Exit(exit_reason), _)) => {
+ match exit_reason {
+ ExitReason::Error(e) => {
+ eprintln!("{}", e);
+ process::exit(2);
+ },
+ _ => {
+ process::exit(0);
+ }
+ }
+ }
_ => {},
}
}
diff --git a/zellij-server/src/lib.rs b/zellij-server/src/lib.rs
index 161c6aee0..82ad52cb0 100644
--- a/zellij-server/src/lib.rs
+++ b/zellij-server/src/lib.rs
@@ -89,6 +89,7 @@ pub enum ServerInstruction {
SwitchSession(ConnectToSession, ClientId),
UnblockCliPipeInput(String), // String -> Pipe name
CliPipeOutput(String, String), // String -> Pipe name, String -> Output
+ AssociatePipeWithClient { pipe_id: String, client_id: ClientId },
}
impl From<&ServerInstruction> for ServerContext {
@@ -109,6 +110,7 @@ impl From<&ServerInstruction> for ServerContext {
ServerInstruction::SwitchSession(..) => ServerContext::SwitchSession,
ServerInstruction::UnblockCliPipeInput(..) => ServerContext::UnblockCliPipeInput,
ServerInstruction::CliPipeOutput(..) => ServerContext::CliPipeOutput,
+ ServerInstruction::AssociatePipeWithClient{..} => ServerContext::AssociatePipeWithClient,
}
}
}
@@ -191,12 +193,14 @@ macro_rules! send_to_client {
#[derive(Clone, Debug, PartialEq)]
pub(crate) struct SessionState {
clients: HashMap<ClientId, Option<Size>>,
+ pipes: HashMap<String, ClientId>, // String => pipe_id
}
impl SessionState {
pub fn new() -> Self {
SessionState {
clients: HashMap::new(),
+ pipes: HashMap::new(),
}
}
pub fn new_client(&mut self) -> ClientId {
@@ -212,8 +216,12 @@ impl SessionState {
self.clients.insert(next_client_id, None);
next_client_id
}
+ pub fn associate_pipe_with_client(&mut self, pipe_id: String, client_id: ClientId) {
+ self.pipes.insert(pipe_id, client_id);
+ }
pub fn remove_client(&mut self, client_id: ClientId) {
self.clients.remove(&client_id);
+ self.pipes.retain(|p_id, c_id| c_id != &client_id);
}
pub fn set_client_size(&mut self, client_id: ClientId, size: Size) {
self.clients.insert(client_id, Some(size));
@@ -245,6 +253,17 @@ impl SessionState {
pub fn client_ids(&self) -> Vec<ClientId> {
self.clients.keys().copied().collect()
}
+ pub fn active_clients_are_connected(&self) -> bool {
+ let ids_of_pipe_clients: HashSet<ClientId> = self.pipes.values().copied().collect();
+ let mut active_clients_connected = false;
+ for client_id in self.clients.keys() {
+ if ids_of_pipe_clients.contains(client_id) {
+ continue;
+ }
+ active_clients_connected = true;
+ }
+ active_clients_connected
+ }
}
pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) {
@@ -496,23 +515,49 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) {
}
},
ServerInstruction::UnblockCliPipeInput(pipe_name) => {
- for client_id in session_state.read().unwrap().clients.keys() {
- send_to_client!(
- *client_id,
- os_input,
- ServerToClientMsg::UnblockCliPipeInput(pipe_name.clone()),
- session_state
- );
+ match session_state.read().unwrap().pipes.get(&pipe_name) {
+ Some(client_id) => {
+ send_to_client!(
+ *client_id,
+ os_input,
+ ServerToClientMsg::UnblockCliPipeInput(pipe_name.clone()),
+ session_state
+ );
+ },
+ None => {
+ // send to all clients, this pipe might not have been associated yet
+ for client_id in session_state.read().unwrap().clients.keys() {
+ send_to_client!(
+ *client_id,
+ os_input,
+ ServerToClientMsg::UnblockCliPipeInput(pipe_name.clone()),
+ session_state
+ );
+ }
+ }
}
},
ServerInstruction::CliPipeOutput(pipe_name, output) => {
- for client_id in session_state.read().unwrap().clients.keys() {
- send_to_client!(
- *client_id,
- os_input,
- ServerToClientMsg::CliPipeOutput(pipe_name.clone(), output.clone()),
- session_state
- );
+ match session_state.read().unwrap().pipes.get(&pipe_name) {
+ Some(client_id) => {
+ send_to_client!(
+ *client_id,
+ os_input,
+ ServerToClientMsg::CliPipeOutput(pipe_name.clone(), output.clone()),
+ session_state
+ );
+ },
+ None => {
+ // send to all clients, this pipe might not have been associated yet
+ for client_id in session_state.read().unwrap().clients.keys() {
+ send_to_client!(
+ *client_id,
+ os_input,
+ ServerToClientMsg::CliPipeOutput(pipe_name.clone(), output.clone()),
+ session_state
+ );
+ }
+ }
}
},
ServerInstruction::ClientExit(client_id) => {
@@ -545,8 +590,13 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) {
.senders
.send_to_plugin(PluginInstruction::RemoveClient(client_id))
.unwrap();
- if session_state.read().unwrap().clients.is_empty() {
+ if !session_state.read().unwrap().active_clients_are_connected() {
*session_data.write().unwrap() = None;
+ let client_ids_to_cleanup: Vec<ClientId> = session_state.read().unwrap().clients.keys().copied().collect();
+ // these are just the pipes
+ for client_id in client_ids_to_cleanup {
+ remove_client!(client_id, os_input, session_state);
+ }
break;
}
},
@@ -726,6 +776,12 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) {
);
remove_client!(client_id, os_input, session_state);
},
+ ServerInstruction::AssociatePipeWithClient{ pipe_id, client_id } => {
+ session_state
+ .write()
+ .unwrap()
+ .associate_pipe_with_client(pipe_id, client_id);
+ }
}
}
diff --git a/zellij-server/src/plugins/mod.rs b/zellij-server/src/plugins/mod.rs
index f54a31b10..afc566fd6 100644
--- a/zellij-server/src/plugins/mod.rs
+++ b/zellij-server/src/plugins/mod.rs
@@ -461,12 +461,14 @@ pub(crate) fn plugin_thread_main(
// the message name - DONE
// - TODO: remove the launch_new from everything except the cli place thing - DONE
// - TODO: consider re-adding the skip_cache flag - DONE
- // - TODO: only send messages (unblockclipipeinput, clipipeoutput) to the relevant client and not all of them
+ // - TODO: only send messages (unblockclipipeinput, clipipeoutput) to the relevant client and not all of them - DONE
// - TODO: look into leaking messages (simultaneously piping to 2 instances of the
// plugin with --launch-new)
// * bring all the custo moverride stuff form the plugin messages for when
// launching a new plugin with a message (like we did through the cli)
// * add permissions
+ // * work on product side... do we need all parameters? does enforcing name make
+ // sense? now that we separated name and id? rethink (some of) the interface?
// * work on cli error messages, must be clearer
// TODO:
diff --git a/zellij-server/src/plugins/unit/plugin_tests.rs b/zellij-server/src/plugins/unit/plugin_tests.rs
index c38b7c268..e07e1ab73 100644
--- a/zellij-server/src/plugins/unit/plugin_tests.rs
+++ b/zellij-server/src/plugins/unit/plugin_tests.rs
@@ -5672,7 +5672,7 @@ pub fn unblock_input_plugin_command() {
let _ = plugin_thread_sender.send(PluginInstruction::Update(vec![(
None,
Some(client_id),
- Event::CliMessage { name: "message_name".to_owned(), payload: Some("message_payload".to_owned()), args: None}
+ Event::CliMessage { input_pipe_id: "input_pipe_id".to_owned(), name: "message_name".to_owned(), payload: Some("message_payload".to_owned()), args: None}
)]));
screen_thread.join().unwrap(); // this might take a while if the cache is cold
teardown();
@@ -5750,7 +5750,7 @@ pub fn pipe_output_plugin_command() {
let _ = plugin_thread_sender.send(PluginInstruction::Update(vec![(
None,
Some(client_id),
- Event::CliMessage { name: "pipe_output".to_owned(), payload: Some("message_payload".to_owned()), args: None}
+ Event::CliMessage { input_pipe_id: "input_pipe_id".to_owned(), name: "pipe_output".to_owned(), payload: Some("message_payload".to_owned()), args: None}
)]));
std::thread::sleep(std::time::Duration::from_millis(500));
teardown();
@@ -5825,7 +5825,7 @@ pub fn send_message_to_plugin_plugin_command() {
Some(client_id),
// this will trigger the fixture plugin to send a message to all plugins and then receive
// it itself
- Event::CliMessage { name: "send_message_to_plugin".to_owned(), payload: Some("payload_sent_to_self".to_owned()), args: None}
+ Event::CliMessage { input_pipe_id: "input_pipe_id".to_owned(), name: "send_message_to_plugin".to_owned(), payload: Some("payload_sent_to_self".to_owned()), args: None}
)]));
std::thread::sleep(std::time::Duration::from_millis(500));
teardown();
diff --git a/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__unblock_input_plugin_command.snap b/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__unblock_input_plugin_command.snap
index 0c8739ce1..a61803c61 100644
--- a/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__unblock_input_plugin_command.snap
+++ b/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__unblock_input_plugin_command.snap
@@ -62,6 +62,38 @@ Some(
32,
123,
32,
+ 105,
+ 110,
+ 112,
+ 117,
+ 116,
+ 95,
+ 112,
+ 105,
+ 112,
+ 101,
+ 95,
+ 105,
+ 100,
+ 58,
+ 32,
+ 34,
+ 105,
+ 110,
+ 112,
+ 117,
+ 116,
+ 95,
+ 112,
+ 105,
+ 112,
+ 101,
+ 95,
+ 105,
+ 100,
+ 34,
+ 44,
+ 32,
110,
97,
109,
diff --git a/zellij-server/src/plugins/zellij_exports.rs b/zellij-server/src/plugins/zellij_exports.rs
index a0b60a6df..d08f3e821 100644
--- a/zellij-server/src/plugins/zellij_exports.rs
+++ b/zellij-server/src/plugins/zellij_exports.rs
@@ -58,6 +58,7 @@ macro_rules! apply_action {
$env.plugin_env.client_attributes.clone(),
$env.plugin_env.default_shell.clone(),
$env.plugin_env.default_layout.clone(),
+ None,
) {
log::error!("{}: {:?}", $error_message(), e);
}
diff --git a/zellij-server/src/route.rs b/zellij-server/src/route.rs
index 01b362b83..375adb464 100644
--- a/zellij-server/src/route.rs
+++ b/zellij-server/src/route.rs
@@ -1,4 +1,4 @@
-use std::collections::VecDeque;
+use std::collections::{VecDeque, HashSet};
use std::sync::{Arc, RwLock};
use crate::thread_bus::ThreadSenders;
@@ -36,6 +36,7 @@ pub(crate) fn route_action(
client_attributes: ClientAttributes,
default_shell: Option<TerminalAction>,
default_layout: Box<Layout>,
+ mut seen_cli_pipes: Option<&mut HashSet<String>>,
) -> Result<bool> {
let mut should_break = false;
let err_context = || format!("failed to route action for client {client_id}");
@@ -804,6 +805,14 @@ pub(crate) fn route_action(
.with_context(err_context)?;
},
Action::CliMessage{ input_pipe_id, mut name, payload, plugin, args, configuration, floating, in_place, launch_new, skip_cache, cwd, pane_title } => {
+ if let Some(seen_cli_pipes) = seen_cli_pipes.as_mut() {
+ if !seen_cli_pipes.contains(&input_pipe_id) {
+ seen_cli_pipes.insert(input_pipe_id.clone());
+ senders
+ .send_to_server(ServerInstruction::AssociatePipeWithClient{ pipe_id: input_pipe_id.clone(), client_id })
+ .with_context(err_context)?;
+ }
+ }
if let Some(name) = name.take() {
let should_open_in_place = in_place.unwrap_or(false);
if should_open_in_place && pane_id.is_none() {
@@ -847,12 +856,13 @@ pub(crate) fn route_thread_main(
) -> Result<()> {
let mut retry_queue = VecDeque::new();
let err_context = || format!("failed to handle instruction for client {client_id}");
+ let mut seen_cli_pipes = HashSet::new();
'route_loop: loop {
match receiver.recv() {
Some((instruction, err_ctx)) => {
err_ctx.update_thread_ctx();
let rlocked_sessions = session_data.read().to_anyhow().with_context(err_context)?;
- let handle_instruction = |instruction: ClientToServerMsg,
+ let mut handle_instruction = |instruction: ClientToServerMsg,
mut retry_queue: Option<
&mut VecDeque<ClientToServerMsg>,
>|
@@ -882,6 +892,7 @@ pub(crate) fn route_thread_main(
rlocked_sessions.client_attributes.clone(),
rlocked_sessions.default_shell.clone(),
rlocked_sessions.layout.clone(),
+ Some(&mut seen_cli_pipes),
)? {
should_break = true;
}
diff --git a/zellij-server/src/unit/screen_tests.rs b/zellij-server/src/unit/screen_tests.rs
index 6296d60d8..fa939d96c 100644
--- a/zellij-server/src/unit/screen_tests.rs
+++ b/zellij-server/src/unit/screen_tests.rs
@@ -85,7 +85,7 @@ fn take_snapshots_and_cursor_coordinates_from_render_events<'a>(
let snapshots: Vec<(Option<(usize, usize)>, String)> = all_events
.filter_map(|server_instruction| {
match server_instruction {
- ServerInstruction::Render(output) => {
+ ServerInstruction::Render(output, _) => {
if let Some(output) = output {
// note this only takes a snapshot of the first client!
let raw_snapshot = output.get(&1).unwrap();
@@ -125,6 +125,7 @@ fn send_cli_action_to_server(
client_attributes.clone(),
default_shell.clone(),
default_layout.clone(),
+ None,
)
.unwrap();
}
diff --git a/zellij-utils/src/errors.rs b/zellij-utils/src/errors.rs
index 30883d7ba..2be6d89d2 100644
--- a/zellij-utils/src/errors.rs
+++ b/zellij-utils/src/errors.rs
@@ -439,6 +439,7 @@ pub enum ServerContext {
SwitchSession,
UnblockCliPipeInput,
CliPipeOutput,
+ AssociatePipeWithClient,
}
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]