diff options
author | Aram Drevekenin <aram@poor.dev> | 2023-12-28 17:19:22 +0100 |
---|---|---|
committer | Aram Drevekenin <aram@poor.dev> | 2023-12-28 17:19:22 +0100 |
commit | 124c6759168ad369b6bbfc0926fbcc7bf24c3894 (patch) | |
tree | d17651f77da53d70dc89c3d73e1ab8ba4f6f1e14 | |
parent | e23ba43a312ced1f31ec63813d1118dec289beac (diff) |
fix: pipe/client-server communication robustness
-rw-r--r-- | zellij-client/src/cli_client.rs | 25 | ||||
-rw-r--r-- | zellij-server/src/lib.rs | 86 | ||||
-rw-r--r-- | zellij-server/src/plugins/mod.rs | 4 | ||||
-rw-r--r-- | zellij-server/src/plugins/unit/plugin_tests.rs | 6 | ||||
-rw-r--r-- | zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__unblock_input_plugin_command.snap | 32 | ||||
-rw-r--r-- | zellij-server/src/plugins/zellij_exports.rs | 1 | ||||
-rw-r--r-- | zellij-server/src/route.rs | 15 | ||||
-rw-r--r-- | zellij-server/src/unit/screen_tests.rs | 3 | ||||
-rw-r--r-- | zellij-utils/src/errors.rs | 1 |
9 files changed, 149 insertions, 24 deletions
diff --git a/zellij-client/src/cli_client.rs b/zellij-client/src/cli_client.rs index 4f0b472f8..423d38ba3 100644 --- a/zellij-client/src/cli_client.rs +++ b/zellij-client/src/cli_client.rs @@ -9,7 +9,7 @@ use zellij_utils::{ uuid::Uuid, errors::prelude::*, input::actions::Action, - ipc::{ClientToServerMsg, ServerToClientMsg}, + ipc::{ClientToServerMsg, ServerToClientMsg, ExitReason}, }; pub fn start_cli_client(mut os_input: Box<dyn ClientOsApi>, session_name: &str, actions: Vec<Action>) { @@ -107,7 +107,6 @@ fn pipe_client( }, Some((ServerToClientMsg::CliPipeOutput(pipe_name, output), _)) => { let err_context = "Failed to write to stdout"; - // log::info!("CLI CLIENT, output to pipe: {:?}, input_pipe_id: {:?}", pipe_name, input_pipe_id); if pipe_name == input_pipe_id { let mut stdout = os_input.get_stdout_writer(); stdout @@ -119,6 +118,17 @@ fn pipe_client( .non_fatal(); } }, + Some((ServerToClientMsg::Exit(exit_reason), _)) => { + match exit_reason { + ExitReason::Error(e) => { + eprintln!("{}", e); + process::exit(2); + }, + _ => { + process::exit(0); + } + } + } _ => {}, } } @@ -142,6 +152,17 @@ fn single_message_client(os_input: &mut Box<dyn ClientOsApi>, action: Action, pa log_lines.iter().for_each(|line| eprintln!("{line}")); process::exit(2); }, + Some((ServerToClientMsg::Exit(exit_reason), _)) => { + match exit_reason { + ExitReason::Error(e) => { + eprintln!("{}", e); + process::exit(2); + }, + _ => { + process::exit(0); + } + } + } _ => {}, } } diff --git a/zellij-server/src/lib.rs b/zellij-server/src/lib.rs index 161c6aee0..82ad52cb0 100644 --- a/zellij-server/src/lib.rs +++ b/zellij-server/src/lib.rs @@ -89,6 +89,7 @@ pub enum ServerInstruction { SwitchSession(ConnectToSession, ClientId), UnblockCliPipeInput(String), // String -> Pipe name CliPipeOutput(String, String), // String -> Pipe name, String -> Output + AssociatePipeWithClient { pipe_id: String, client_id: ClientId }, } impl From<&ServerInstruction> for ServerContext { @@ -109,6 +110,7 @@ impl From<&ServerInstruction> for ServerContext { ServerInstruction::SwitchSession(..) => ServerContext::SwitchSession, ServerInstruction::UnblockCliPipeInput(..) => ServerContext::UnblockCliPipeInput, ServerInstruction::CliPipeOutput(..) => ServerContext::CliPipeOutput, + ServerInstruction::AssociatePipeWithClient{..} => ServerContext::AssociatePipeWithClient, } } } @@ -191,12 +193,14 @@ macro_rules! send_to_client { #[derive(Clone, Debug, PartialEq)] pub(crate) struct SessionState { clients: HashMap<ClientId, Option<Size>>, + pipes: HashMap<String, ClientId>, // String => pipe_id } impl SessionState { pub fn new() -> Self { SessionState { clients: HashMap::new(), + pipes: HashMap::new(), } } pub fn new_client(&mut self) -> ClientId { @@ -212,8 +216,12 @@ impl SessionState { self.clients.insert(next_client_id, None); next_client_id } + pub fn associate_pipe_with_client(&mut self, pipe_id: String, client_id: ClientId) { + self.pipes.insert(pipe_id, client_id); + } pub fn remove_client(&mut self, client_id: ClientId) { self.clients.remove(&client_id); + self.pipes.retain(|p_id, c_id| c_id != &client_id); } pub fn set_client_size(&mut self, client_id: ClientId, size: Size) { self.clients.insert(client_id, Some(size)); @@ -245,6 +253,17 @@ impl SessionState { pub fn client_ids(&self) -> Vec<ClientId> { self.clients.keys().copied().collect() } + pub fn active_clients_are_connected(&self) -> bool { + let ids_of_pipe_clients: HashSet<ClientId> = self.pipes.values().copied().collect(); + let mut active_clients_connected = false; + for client_id in self.clients.keys() { + if ids_of_pipe_clients.contains(client_id) { + continue; + } + active_clients_connected = true; + } + active_clients_connected + } } pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) { @@ -496,23 +515,49 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) { } }, ServerInstruction::UnblockCliPipeInput(pipe_name) => { - for client_id in session_state.read().unwrap().clients.keys() { - send_to_client!( - *client_id, - os_input, - ServerToClientMsg::UnblockCliPipeInput(pipe_name.clone()), - session_state - ); + match session_state.read().unwrap().pipes.get(&pipe_name) { + Some(client_id) => { + send_to_client!( + *client_id, + os_input, + ServerToClientMsg::UnblockCliPipeInput(pipe_name.clone()), + session_state + ); + }, + None => { + // send to all clients, this pipe might not have been associated yet + for client_id in session_state.read().unwrap().clients.keys() { + send_to_client!( + *client_id, + os_input, + ServerToClientMsg::UnblockCliPipeInput(pipe_name.clone()), + session_state + ); + } + } } }, ServerInstruction::CliPipeOutput(pipe_name, output) => { - for client_id in session_state.read().unwrap().clients.keys() { - send_to_client!( - *client_id, - os_input, - ServerToClientMsg::CliPipeOutput(pipe_name.clone(), output.clone()), - session_state - ); + match session_state.read().unwrap().pipes.get(&pipe_name) { + Some(client_id) => { + send_to_client!( + *client_id, + os_input, + ServerToClientMsg::CliPipeOutput(pipe_name.clone(), output.clone()), + session_state + ); + }, + None => { + // send to all clients, this pipe might not have been associated yet + for client_id in session_state.read().unwrap().clients.keys() { + send_to_client!( + *client_id, + os_input, + ServerToClientMsg::CliPipeOutput(pipe_name.clone(), output.clone()), + session_state + ); + } + } } }, ServerInstruction::ClientExit(client_id) => { @@ -545,8 +590,13 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) { .senders .send_to_plugin(PluginInstruction::RemoveClient(client_id)) .unwrap(); - if session_state.read().unwrap().clients.is_empty() { + if !session_state.read().unwrap().active_clients_are_connected() { *session_data.write().unwrap() = None; + let client_ids_to_cleanup: Vec<ClientId> = session_state.read().unwrap().clients.keys().copied().collect(); + // these are just the pipes + for client_id in client_ids_to_cleanup { + remove_client!(client_id, os_input, session_state); + } break; } }, @@ -726,6 +776,12 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) { ); remove_client!(client_id, os_input, session_state); }, + ServerInstruction::AssociatePipeWithClient{ pipe_id, client_id } => { + session_state + .write() + .unwrap() + .associate_pipe_with_client(pipe_id, client_id); + } } } diff --git a/zellij-server/src/plugins/mod.rs b/zellij-server/src/plugins/mod.rs index f54a31b10..afc566fd6 100644 --- a/zellij-server/src/plugins/mod.rs +++ b/zellij-server/src/plugins/mod.rs @@ -461,12 +461,14 @@ pub(crate) fn plugin_thread_main( // the message name - DONE // - TODO: remove the launch_new from everything except the cli place thing - DONE // - TODO: consider re-adding the skip_cache flag - DONE - // - TODO: only send messages (unblockclipipeinput, clipipeoutput) to the relevant client and not all of them + // - TODO: only send messages (unblockclipipeinput, clipipeoutput) to the relevant client and not all of them - DONE // - TODO: look into leaking messages (simultaneously piping to 2 instances of the // plugin with --launch-new) // * bring all the custo moverride stuff form the plugin messages for when // launching a new plugin with a message (like we did through the cli) // * add permissions + // * work on product side... do we need all parameters? does enforcing name make + // sense? now that we separated name and id? rethink (some of) the interface? // * work on cli error messages, must be clearer // TODO: diff --git a/zellij-server/src/plugins/unit/plugin_tests.rs b/zellij-server/src/plugins/unit/plugin_tests.rs index c38b7c268..e07e1ab73 100644 --- a/zellij-server/src/plugins/unit/plugin_tests.rs +++ b/zellij-server/src/plugins/unit/plugin_tests.rs @@ -5672,7 +5672,7 @@ pub fn unblock_input_plugin_command() { let _ = plugin_thread_sender.send(PluginInstruction::Update(vec![( None, Some(client_id), - Event::CliMessage { name: "message_name".to_owned(), payload: Some("message_payload".to_owned()), args: None} + Event::CliMessage { input_pipe_id: "input_pipe_id".to_owned(), name: "message_name".to_owned(), payload: Some("message_payload".to_owned()), args: None} )])); screen_thread.join().unwrap(); // this might take a while if the cache is cold teardown(); @@ -5750,7 +5750,7 @@ pub fn pipe_output_plugin_command() { let _ = plugin_thread_sender.send(PluginInstruction::Update(vec![( None, Some(client_id), - Event::CliMessage { name: "pipe_output".to_owned(), payload: Some("message_payload".to_owned()), args: None} + Event::CliMessage { input_pipe_id: "input_pipe_id".to_owned(), name: "pipe_output".to_owned(), payload: Some("message_payload".to_owned()), args: None} )])); std::thread::sleep(std::time::Duration::from_millis(500)); teardown(); @@ -5825,7 +5825,7 @@ pub fn send_message_to_plugin_plugin_command() { Some(client_id), // this will trigger the fixture plugin to send a message to all plugins and then receive // it itself - Event::CliMessage { name: "send_message_to_plugin".to_owned(), payload: Some("payload_sent_to_self".to_owned()), args: None} + Event::CliMessage { input_pipe_id: "input_pipe_id".to_owned(), name: "send_message_to_plugin".to_owned(), payload: Some("payload_sent_to_self".to_owned()), args: None} )])); std::thread::sleep(std::time::Duration::from_millis(500)); teardown(); diff --git a/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__unblock_input_plugin_command.snap b/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__unblock_input_plugin_command.snap index 0c8739ce1..a61803c61 100644 --- a/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__unblock_input_plugin_command.snap +++ b/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__unblock_input_plugin_command.snap @@ -62,6 +62,38 @@ Some( 32, 123, 32, + 105, + 110, + 112, + 117, + 116, + 95, + 112, + 105, + 112, + 101, + 95, + 105, + 100, + 58, + 32, + 34, + 105, + 110, + 112, + 117, + 116, + 95, + 112, + 105, + 112, + 101, + 95, + 105, + 100, + 34, + 44, + 32, 110, 97, 109, diff --git a/zellij-server/src/plugins/zellij_exports.rs b/zellij-server/src/plugins/zellij_exports.rs index a0b60a6df..d08f3e821 100644 --- a/zellij-server/src/plugins/zellij_exports.rs +++ b/zellij-server/src/plugins/zellij_exports.rs @@ -58,6 +58,7 @@ macro_rules! apply_action { $env.plugin_env.client_attributes.clone(), $env.plugin_env.default_shell.clone(), $env.plugin_env.default_layout.clone(), + None, ) { log::error!("{}: {:?}", $error_message(), e); } diff --git a/zellij-server/src/route.rs b/zellij-server/src/route.rs index 01b362b83..375adb464 100644 --- a/zellij-server/src/route.rs +++ b/zellij-server/src/route.rs @@ -1,4 +1,4 @@ -use std::collections::VecDeque; +use std::collections::{VecDeque, HashSet}; use std::sync::{Arc, RwLock}; use crate::thread_bus::ThreadSenders; @@ -36,6 +36,7 @@ pub(crate) fn route_action( client_attributes: ClientAttributes, default_shell: Option<TerminalAction>, default_layout: Box<Layout>, + mut seen_cli_pipes: Option<&mut HashSet<String>>, ) -> Result<bool> { let mut should_break = false; let err_context = || format!("failed to route action for client {client_id}"); @@ -804,6 +805,14 @@ pub(crate) fn route_action( .with_context(err_context)?; }, Action::CliMessage{ input_pipe_id, mut name, payload, plugin, args, configuration, floating, in_place, launch_new, skip_cache, cwd, pane_title } => { + if let Some(seen_cli_pipes) = seen_cli_pipes.as_mut() { + if !seen_cli_pipes.contains(&input_pipe_id) { + seen_cli_pipes.insert(input_pipe_id.clone()); + senders + .send_to_server(ServerInstruction::AssociatePipeWithClient{ pipe_id: input_pipe_id.clone(), client_id }) + .with_context(err_context)?; + } + } if let Some(name) = name.take() { let should_open_in_place = in_place.unwrap_or(false); if should_open_in_place && pane_id.is_none() { @@ -847,12 +856,13 @@ pub(crate) fn route_thread_main( ) -> Result<()> { let mut retry_queue = VecDeque::new(); let err_context = || format!("failed to handle instruction for client {client_id}"); + let mut seen_cli_pipes = HashSet::new(); 'route_loop: loop { match receiver.recv() { Some((instruction, err_ctx)) => { err_ctx.update_thread_ctx(); let rlocked_sessions = session_data.read().to_anyhow().with_context(err_context)?; - let handle_instruction = |instruction: ClientToServerMsg, + let mut handle_instruction = |instruction: ClientToServerMsg, mut retry_queue: Option< &mut VecDeque<ClientToServerMsg>, >| @@ -882,6 +892,7 @@ pub(crate) fn route_thread_main( rlocked_sessions.client_attributes.clone(), rlocked_sessions.default_shell.clone(), rlocked_sessions.layout.clone(), + Some(&mut seen_cli_pipes), )? { should_break = true; } diff --git a/zellij-server/src/unit/screen_tests.rs b/zellij-server/src/unit/screen_tests.rs index 6296d60d8..fa939d96c 100644 --- a/zellij-server/src/unit/screen_tests.rs +++ b/zellij-server/src/unit/screen_tests.rs @@ -85,7 +85,7 @@ fn take_snapshots_and_cursor_coordinates_from_render_events<'a>( let snapshots: Vec<(Option<(usize, usize)>, String)> = all_events .filter_map(|server_instruction| { match server_instruction { - ServerInstruction::Render(output) => { + ServerInstruction::Render(output, _) => { if let Some(output) = output { // note this only takes a snapshot of the first client! let raw_snapshot = output.get(&1).unwrap(); @@ -125,6 +125,7 @@ fn send_cli_action_to_server( client_attributes.clone(), default_shell.clone(), default_layout.clone(), + None, ) .unwrap(); } diff --git a/zellij-utils/src/errors.rs b/zellij-utils/src/errors.rs index 30883d7ba..2be6d89d2 100644 --- a/zellij-utils/src/errors.rs +++ b/zellij-utils/src/errors.rs @@ -439,6 +439,7 @@ pub enum ServerContext { SwitchSession, UnblockCliPipeInput, CliPipeOutput, + AssociatePipeWithClient, } #[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] |