diff options
author | Aram Drevekenin <aram@poor.dev> | 2024-01-11 10:34:24 +0100 |
---|---|---|
committer | Aram Drevekenin <aram@poor.dev> | 2024-01-11 10:34:24 +0100 |
commit | 7dd6b4d9c2cd62e5501463607dfd4aa65e71ef1d (patch) | |
tree | 494d49131ad0a3ef30b27c4d3979a0f45b16a704 | |
parent | 6a8d5f6db87db5bb40a7303f8093df223e86321a (diff) |
refactor: various cleanups
-rw-r--r-- | zellij-server/src/lib.rs | 1 | ||||
-rw-r--r-- | zellij-server/src/plugins/mod.rs | 209 | ||||
-rw-r--r-- | zellij-server/src/plugins/unit/plugin_tests.rs | 3 | ||||
-rw-r--r-- | zellij-server/src/plugins/wasm_bridge.rs | 38 | ||||
-rw-r--r-- | zellij-server/src/screen.rs | 9 |
5 files changed, 132 insertions, 128 deletions
diff --git a/zellij-server/src/lib.rs b/zellij-server/src/lib.rs index 9bbd107b5..3f4d18f65 100644 --- a/zellij-server/src/lib.rs +++ b/zellij-server/src/lib.rs @@ -928,7 +928,6 @@ fn init_session( .spawn({ let plugin_bus = Bus::new( vec![plugin_receiver], - // Some(&to_screen), Some(&to_screen_bounded), Some(&to_pty), Some(&to_plugin), diff --git a/zellij-server/src/plugins/mod.rs b/zellij-server/src/plugins/mod.rs index 3503791ba..59ba18154 100644 --- a/zellij-server/src/plugins/mod.rs +++ b/zellij-server/src/plugins/mod.rs @@ -35,7 +35,6 @@ use zellij_utils::{ }, ipc::ClientAttributes, pane_size::Size, - uuid::Uuid, }; pub type PluginId = u32; @@ -257,7 +256,7 @@ pub(crate) fn plugin_thread_main( match wasm_bridge .load_plugin(&run, Some(tab_index), size, None, skip_cache, None, None) { - Ok((plugin_id, client_id)) => { + Ok((plugin_id, _client_id)) => { let should_be_open_in_place = false; drop(bus.senders.send_to_screen(ScreenInstruction::AddPlugin( should_float, @@ -322,7 +321,7 @@ pub(crate) fn plugin_thread_main( for run_instruction in extracted_run_instructions { if let Some(Run::Plugin(run)) = run_instruction { let skip_cache = false; - let (plugin_id, client_id) = wasm_bridge.load_plugin( + let (plugin_id, _client_id) = wasm_bridge.load_plugin( &run, Some(tab_index), size, @@ -431,7 +430,7 @@ pub(crate) fn plugin_thread_main( payload, plugin, args, - mut configuration, + configuration, floating, pane_id_to_replace, pane_title, @@ -440,54 +439,31 @@ pub(crate) fn plugin_thread_main( cli_client_id, } => { let should_float = floating.unwrap_or(true); - let size = Size::default(); // TODO: why?? let mut pipe_messages = vec![]; match plugin { Some(plugin_url) => { // send to specific plugin(s) - let is_private = true; - match RunPlugin::from_url(&plugin_url) { - Ok(mut run_plugin) => { - if let Some(configuration) = configuration.take() { - run_plugin.configuration = PluginUserConfiguration::new(configuration); - } - let all_plugin_ids = wasm_bridge.get_or_load_plugins( - run_plugin, - size, - cwd, - skip_cache, - should_float, - pane_id_to_replace.is_some(), - pane_title, - pane_id_to_replace, - Some(cli_client_id), - ); - for (plugin_id, client_id) in all_plugin_ids { - pipe_messages.push(( - Some(plugin_id), - client_id, - PipeMessage::new(PipeSource::Cli(pipe_id.clone()), &name, &payload, &args, is_private) - )); - } - }, - Err(e) => { - let _ = bus.senders.send_to_server( - ServerInstruction::LogError(vec![format!("Failed to parse plugin url: {}", e)], cli_client_id) - ); - } - } + pipe_to_specific_plugins( + PipeSource::Cli(pipe_id.clone()), + &plugin_url, + &configuration, + &cwd, + skip_cache, + should_float, + &pane_id_to_replace, + &pane_title, + Some(cli_client_id), + &mut pipe_messages, + &name, + &payload, + &args, + &bus, + &mut wasm_bridge, + ); }, None => { - // send to all plugins - let is_private = false; - let all_plugin_ids = wasm_bridge.all_plugin_ids(); - for (plugin_id, client_id) in all_plugin_ids { - pipe_messages.push(( - Some(plugin_id), - Some(client_id), - PipeMessage::new(PipeSource::Cli(pipe_id.clone()), &name, &payload, &args, is_private) - )); - } + // no specific destination, send to all plugins + pipe_to_all_plugins(PipeSource::Cli(pipe_id.clone()), &name, &payload, &args, &mut wasm_bridge, &mut pipe_messages); } } wasm_bridge.pipe_messages(pipe_messages, shutdown_send.clone())?; @@ -497,15 +473,9 @@ pub(crate) fn plugin_thread_main( } PluginInstruction::MessageFromPlugin{source_plugin_id, message} => { let cwd = message.new_plugin_args.as_ref().and_then(|n| n.cwd.clone()); - let size = Size::default(); let mut pipe_messages = vec![]; let skip_cache = message.new_plugin_args.as_ref().map(|n| n.skip_cache).unwrap_or(false); let should_float = message.new_plugin_args.as_ref().and_then(|n| n.should_float).unwrap_or(true); - let should_be_open_in_place = message - .new_plugin_args - .as_ref() - .and_then(|n| n.pane_id_to_replace) - .is_some(); let pane_title = message.new_plugin_args.as_ref().and_then(|n| n.pane_title.clone()); let pane_id_to_replace = message .new_plugin_args @@ -514,45 +484,27 @@ pub(crate) fn plugin_thread_main( match message.plugin_url { Some(plugin_url) => { // send to specific plugin(s) - let is_private = true; - match RunPlugin::from_url(&plugin_url) { - Ok(mut run_plugin) => { - run_plugin.configuration = PluginUserConfiguration::new(message.plugin_config); - let all_plugin_ids = wasm_bridge.get_or_load_plugins( - run_plugin, - size, - cwd, - skip_cache, - should_float, - should_be_open_in_place, - pane_title, - pane_id_to_replace.map(|p| p.into()), - None, - ); - for (plugin_id, client_id) in all_plugin_ids { - pipe_messages.push(( - Some(plugin_id), - client_id, - PipeMessage::new(PipeSource::Plugin(source_plugin_id), &message.message_name, &message.message_payload, &Some(message.message_args.clone()), is_private), - )) - } - }, - Err(e) => { - log::error!("Failed to parse plugin url: {:?}", e); - } - } + pipe_to_specific_plugins( + PipeSource::Plugin(source_plugin_id), + &plugin_url, + &Some(message.plugin_config), + &cwd, + skip_cache, + should_float, + &pane_id_to_replace.map(|p| p.into()), + &pane_title, + None, + &mut pipe_messages, + &message.message_name, + &message.message_payload, + &Some(message.message_args), + &bus, + &mut wasm_bridge, + ); }, None => { // send to all plugins - let is_private = false; - let all_plugin_ids = wasm_bridge.all_plugin_ids(); - for (plugin_id, client_id) in all_plugin_ids { - pipe_messages.push(( - Some(plugin_id), - Some(client_id), - PipeMessage::new(PipeSource::Plugin(source_plugin_id), &message.message_name, &message.message_payload, &Some(message.message_args.clone()), is_private) - )); - } + pipe_to_all_plugins(PipeSource::Plugin(source_plugin_id), &message.message_name, &message.message_payload, &Some(message.message_args), &mut wasm_bridge, &mut pipe_messages); } } wasm_bridge.pipe_messages(pipe_messages, shutdown_send.clone())?; @@ -607,6 +559,87 @@ fn populate_session_layout_metadata( session_layout_metadata.update_plugin_cmds(plugin_ids_to_cmds); } +fn pipe_to_all_plugins( + pipe_source: PipeSource, + name: &str, + payload: &Option<String>, + args: &Option<BTreeMap<String, String>>, + wasm_bridge: &mut WasmBridge, + pipe_messages: &mut Vec<(Option<PluginId>, Option<ClientId>, PipeMessage)> +) { + let is_private = false; + let all_plugin_ids = wasm_bridge.all_plugin_ids(); + for (plugin_id, client_id) in all_plugin_ids { + pipe_messages.push(( + Some(plugin_id), + Some(client_id), + PipeMessage::new(pipe_source.clone(), name, payload, &args, is_private) + // PipeMessage::new(pipe_source, &message.message_name, &message.message_payload, &Some(message.message_args.clone()), is_private) + // PipeMessage::new(PipeSource::Plugin(source_plugin_id), &message.message_name, &message.message_payload, &Some(message.message_args.clone()), is_private) + )); + } +} + +fn pipe_to_specific_plugins( + pipe_source: PipeSource, + plugin_url: &str, + configuration: &Option<BTreeMap<String, String>>, + cwd: &Option<PathBuf>, + skip_cache: bool, + should_float: bool, + pane_id_to_replace: &Option<PaneId>, + pane_title: &Option<String>, + cli_client_id: Option<ClientId>, + pipe_messages: &mut Vec<(Option<PluginId>, Option<ClientId>, PipeMessage)>, + name: &str, + payload: &Option<String>, + args: &Option<BTreeMap<String, String>>, + bus: &Bus<PluginInstruction>, + wasm_bridge: &mut WasmBridge +) { + let is_private = true; + let size = Size::default(); + match RunPlugin::from_url(&plugin_url) { + Ok(mut run_plugin) => { + if let Some(configuration) = configuration { + run_plugin.configuration = PluginUserConfiguration::new(configuration.clone()); + } + let all_plugin_ids = wasm_bridge.get_or_load_plugins( + run_plugin, + size, + cwd.clone(), + skip_cache, + should_float, + pane_id_to_replace.is_some(), + pane_title.clone(), + pane_id_to_replace.clone(), + cli_client_id, + ); + for (plugin_id, client_id) in all_plugin_ids { + pipe_messages.push(( + Some(plugin_id), + client_id, + PipeMessage::new(pipe_source.clone(), name, payload, args, is_private) + // PipeMessage::new(PipeSource::Cli(pipe_id.clone()), &name, &payload, &args, is_private) + )); + } + }, + Err(e) => { + + match cli_client_id { + Some(cli_client_id) => { + let _ = bus.senders.send_to_server( + ServerInstruction::LogError(vec![format!("Failed to parse plugin url: {}", e)], cli_client_id) + ); + }, + None => { + log::error!("Failed to parse plugin url: {}", e); + } + } + } + } +} + const EXIT_TIMEOUT: Duration = Duration::from_secs(3); #[path = "./unit/plugin_tests.rs"] diff --git a/zellij-server/src/plugins/unit/plugin_tests.rs b/zellij-server/src/plugins/unit/plugin_tests.rs index 651422d75..8d2f31c14 100644 --- a/zellij-server/src/plugins/unit/plugin_tests.rs +++ b/zellij-server/src/plugins/unit/plugin_tests.rs @@ -5854,9 +5854,6 @@ pub fn pipe_output_plugin_command() { skip_cache: false, cli_client_id: client_id }); - - - std::thread::sleep(std::time::Duration::from_millis(500)); teardown(); server_thread.join().unwrap(); // this might take a while if the cache is cold diff --git a/zellij-server/src/plugins/wasm_bridge.rs b/zellij-server/src/plugins/wasm_bridge.rs index fc4053f93..dc7cef7da 100644 --- a/zellij-server/src/plugins/wasm_bridge.rs +++ b/zellij-server/src/plugins/wasm_bridge.rs @@ -436,9 +436,7 @@ impl WasmBridge { running_plugin.rows = new_rows; running_plugin.columns = new_columns; - // TODO: better, right now the plugin doesn't render on first render? if old_rows != new_rows || old_columns != new_columns { - // if true { let rendered_bytes = running_plugin .instance .clone() @@ -519,11 +517,7 @@ impl WasmBridge { let event_type = EventType::from_str(&event.to_string()).with_context(err_context)?; if (subs.contains(&event_type) || event_type == EventType::PermissionRequestResult) - && ((pid.is_none() && cid.is_none()) - || (pid.is_none() && cid == Some(*client_id)) - || (cid.is_none() && pid == Some(*plugin_id)) - || (cid == Some(*client_id) && pid == Some(*plugin_id))) - { + && self.message_is_directed_at_plugin(pid, cid, plugin_id, client_id) { task::spawn({ let mut senders = self.senders.clone(); let running_plugin = running_plugin.clone(); @@ -576,8 +570,6 @@ impl WasmBridge { mut messages: Vec<(Option<PluginId>, Option<ClientId>, PipeMessage)>, shutdown_sender: Sender<()>, ) -> Result<()> { - let err_context = || "failed to update plugin state".to_string(); - let plugins_to_update: Vec<( PluginId, ClientId, @@ -597,12 +589,8 @@ impl WasmBridge { }) .collect(); for (pid, cid, pipe_message) in messages.drain(..) { - for (plugin_id, client_id, running_plugin, subscriptions) in &plugins_to_update { - // TODO: break this conditional out to self.is_directed_at_plugin or some such - if (pid.is_none() && cid.is_none()) - || (pid.is_none() && cid == Some(*client_id)) - || (cid.is_none() && pid == Some(*plugin_id)) - || (cid == Some(*client_id) && pid == Some(*plugin_id)) { + for (plugin_id, client_id, running_plugin, _subscriptions) in &plugins_to_update { + if self.message_is_directed_at_plugin(pid, cid, plugin_id, client_id) { task::spawn({ let mut senders = self.senders.clone(); let running_plugin = running_plugin.clone(); @@ -736,6 +724,7 @@ impl WasmBridge { let events_or_pipe_messages = events_or_pipe_messages.clone(); async move { let subs = subscriptions.lock().unwrap().clone(); + let _s = _s; // guard to allow the task to complete before cleanup/shutdown for event_or_pipe_message in events_or_pipe_messages { match event_or_pipe_message { EventOrPipeMessage::Event(event) => { @@ -746,7 +735,6 @@ impl WasmBridge { } let mut running_plugin = running_plugin.lock().unwrap(); let mut plugin_bytes = vec![]; - // let _s = _s; // guard to allow the task to complete before cleanup/shutdown match apply_event_to_plugin( plugin_id, client_id, @@ -770,7 +758,6 @@ impl WasmBridge { EventOrPipeMessage::PipeMessage(pipe_message) => { let mut running_plugin = running_plugin.lock().unwrap(); let mut plugin_bytes = vec![]; - // let _s = _s; // guard to allow the task to complete before cleanup/shutdown match apply_pipe_message_to_plugin( plugin_id, @@ -855,13 +842,6 @@ impl WasmBridge { if self.cached_plugin_map.is_empty() { self.cached_plugin_map = self.plugin_map.lock().unwrap().clone_plugin_assets(); } -// self.plugin_map -// .lock() -// .unwrap() -// .all_plugin_and_client_ids_for_plugin_location(plugin_location, plugin_configuration) -// .into_iter() -// .map(|(p_id, c_id)| (p_id, Some(c_id))) -// .collect() match self.cached_plugin_map.get(plugin_location).and_then(|m| m.get(plugin_configuration)) { Some(plugin_and_client_ids) => plugin_and_client_ids.iter().map(|(plugin_id, client_id)| (*plugin_id, Some(*client_id))).collect(), None => vec![] @@ -983,8 +963,6 @@ impl WasmBridge { permission_cache.write_to_file().with_context(err_context) } - // TODO: change name to reflect that this is only for waiting for permission request result - // thing pub fn cache_plugin_events(&mut self, plugin_id: PluginId) { self.plugin_ids_waiting_for_permission_request.insert(plugin_id); self.cached_events_for_pending_plugins.entry(plugin_id).or_insert_with(Default::default); @@ -1038,6 +1016,12 @@ impl WasmBridge { pub fn clear_plugin_map_cache(&mut self) { self.cached_plugin_map.clear(); } + fn message_is_directed_at_plugin(&self, message_pid: Option<PluginId>, message_cid: Option<ClientId>, plugin_id: &PluginId, client_id: &ClientId) -> bool { + message_pid.is_none() && message_cid.is_none() + || (message_pid.is_none() && message_cid == Some(*client_id)) + || (message_cid.is_none() && message_pid == Some(*plugin_id)) + || (message_cid == Some(*client_id) && message_pid == Some(*plugin_id)) + } } fn handle_plugin_successful_loading(senders: &ThreadSenders, plugin_id: PluginId) { @@ -1211,7 +1195,7 @@ pub fn apply_pipe_message_to_plugin( plugin_bytes.push((plugin_id, client_id, rendered_bytes.as_bytes().to_vec())); } }, - Err(e) => { + Err(_e) => { // no-op, this is probably an old plugin that does not have this interface // we don't log this error because if we do the logs will be super crowded } diff --git a/zellij-server/src/screen.rs b/zellij-server/src/screen.rs index 72dcf9048..fae34bcb3 100644 --- a/zellij-server/src/screen.rs +++ b/zellij-server/src/screen.rs @@ -2177,15 +2177,6 @@ pub(crate) fn screen_thread_main( } } screen.render(input_pipes_to_unblock)?; -// if let Some(input_pipes_to_unblock) = input_pipes_to_unblock { -// // TODO: add the pipe id as an Option to PluginBytes -// for pipe_name in input_pipes_to_unblock { -// screen.bus -// .senders -// .send_to_server(ServerInstruction::UnblockCliPipeInput(pipe_name)) -// .context("failed to unblock input pipe"); -// } -// } }, ScreenInstruction::Render => { screen.render(None)?; |