diff options
author | Aram Drevekenin <aram@poor.dev> | 2024-01-09 21:14:53 +0100 |
---|---|---|
committer | Aram Drevekenin <aram@poor.dev> | 2024-01-09 21:14:53 +0100 |
commit | b9c3d89970c1af295511a82d74e88de5807875c1 (patch) | |
tree | baf8991edd7ffaff90535c062e44d457ad4e1bef | |
parent | 3538ce5e54c42da544342aac75724a90975c076b (diff) |
refactor: update(event) -> pipe
18 files changed, 552 insertions, 334 deletions
diff --git a/default-plugins/fixture-plugin-for-tests/src/main.rs b/default-plugins/fixture-plugin-for-tests/src/main.rs index 3524f5421..973d0eca5 100644 --- a/default-plugins/fixture-plugin-for-tests/src/main.rs +++ b/default-plugins/fixture-plugin-for-tests/src/main.rs @@ -35,9 +35,12 @@ impl<'de> ZellijWorker<'de> for TestWorker { } } +#[cfg(target_family = "wasm")] register_plugin!(State); +#[cfg(target_family = "wasm")] register_worker!(TestWorker, test_worker, TEST_WORKER); +#[cfg(target_family = "wasm")] impl ZellijPlugin for State { fn load(&mut self, configuration: BTreeMap<String, String>) { request_permission(&[ @@ -62,8 +65,6 @@ impl ZellijPlugin for State { EventType::FileSystemCreate, EventType::FileSystemUpdate, EventType::FileSystemDelete, - EventType::CliMessage, - EventType::MessageFromPlugin, ]); } @@ -286,20 +287,6 @@ impl ZellijPlugin for State { self.received_payload = Some(payload.clone()); } }, - Event::CliMessage{name, payload, ..} => { - if name == "message_name" && payload == &Some("message_payload".to_owned()) { - unblock_cli_pipe_input(name); - } else if name == "pipe_output" { - cli_pipe_output(name, "this_is_my_output"); - } else if name == "send_message_to_plugin" { - send_message_to_plugin(MessageToPlugin::new("message_to_plugin").with_payload("my_cool_payload")); - } - }, - Event::MessageFromPlugin {name, payload, ..} => { - if name == "message_to_plugin" { - self.message_to_plugin_payload = payload.clone(); - } - } Event::SystemClipboardFailure => { // this is just to trigger the worker message post_message_to(PluginMessage { @@ -314,6 +301,27 @@ impl ZellijPlugin for State { self.received_events.push(event); should_render } + fn pipe(&mut self, pipe_message: PipeMessage) -> bool { + let input_pipe_id = match pipe_message.source { + PipeSource::Cli(id) => id.clone(), + PipeSource::Plugin(id) => format!("{}", id) + }; + let name = pipe_message.name; + let payload = pipe_message.payload; + if name == "message_name" && payload == Some("message_payload".to_owned()) { + unblock_cli_pipe_input(&input_pipe_id); + } else if name == "message_name_block" { + block_cli_pipe_input(&input_pipe_id); + } else if name == "pipe_output" { + cli_pipe_output(&name, "this_is_my_output"); + } else if name == "send_message_to_plugin" { + send_message_to_plugin(MessageToPlugin::new("message_to_plugin").with_payload("my_cool_payload")); + } else if name == "message_to_plugin" { + self.message_to_plugin_payload = payload.clone(); + } + let should_render = true; + should_render + } fn render(&mut self, rows: usize, cols: usize) { if let Some(payload) = self.received_payload.as_ref() { diff --git a/zellij-server/src/plugins/mod.rs b/zellij-server/src/plugins/mod.rs index e74e0126c..88845bba6 100644 --- a/zellij-server/src/plugins/mod.rs +++ b/zellij-server/src/plugins/mod.rs @@ -23,7 +23,7 @@ use wasm_bridge::WasmBridge; use zellij_utils::{ async_std::{channel, future::timeout, task}, - data::{Event, EventType, PermissionStatus, PermissionType, PluginCapabilities, MessageToPlugin}, + data::{Event, EventType, PermissionStatus, PermissionType, PluginCapabilities, MessageToPlugin, PipeMessage, PipeSource}, errors::{prelude::*, ContextType, PluginContext}, input::{ command::TerminalAction, @@ -116,7 +116,10 @@ pub enum PluginInstruction { cli_client_id: ClientId, }, CachePluginEvents { plugin_id: PluginId }, - MessageFromPlugin(MessageToPlugin), + MessageFromPlugin{ + source_plugin_id: u32, + message: MessageToPlugin + }, Exit, } @@ -438,7 +441,7 @@ pub(crate) fn plugin_thread_main( } => { let should_float = floating.unwrap_or(true); let size = Size::default(); // TODO: why?? - let mut updates = vec![]; + let mut pipe_messages = vec![]; match plugin { Some(plugin_url) => { match RunPlugin::from_url(&plugin_url) { @@ -458,7 +461,11 @@ pub(crate) fn plugin_thread_main( Some(cli_client_id), ); for (plugin_id, client_id) in all_plugin_ids { - updates.push((Some(plugin_id), client_id, Event::CliMessage {input_pipe_id: input_pipe_id.clone(), name: name.clone(), payload: payload.clone(), args: args.clone() })); + pipe_messages.push(( + Some(plugin_id), + client_id, + PipeMessage::new(PipeSource::Cli(input_pipe_id.clone()), &name, &payload, &args) + )); } }, Err(e) => { @@ -472,36 +479,40 @@ pub(crate) fn plugin_thread_main( // send to all plugins let all_plugin_ids = wasm_bridge.all_plugin_ids(); for (plugin_id, client_id) in all_plugin_ids { - updates.push((Some(plugin_id), Some(client_id), Event::CliMessage{ input_pipe_id: input_pipe_id.clone(), name: name.clone(), payload: payload.clone(), args: args.clone()})); + pipe_messages.push(( + Some(plugin_id), + Some(client_id), + PipeMessage::new(PipeSource::Cli(input_pipe_id.clone()), &name, &payload, &args) + )); } } } - wasm_bridge.update_plugins(updates, shutdown_send.clone())?; + wasm_bridge.pipe_messages(pipe_messages, shutdown_send.clone())?; }, PluginInstruction::CachePluginEvents { plugin_id } => { wasm_bridge.cache_plugin_events(plugin_id); } - PluginInstruction::MessageFromPlugin(message_to_plugin) => { - let cwd = message_to_plugin.new_plugin_args.as_ref().and_then(|n| n.cwd.clone()); + PluginInstruction::MessageFromPlugin{source_plugin_id, message} => { + let cwd = message.new_plugin_args.as_ref().and_then(|n| n.cwd.clone()); let size = Size::default(); - let mut updates = vec![]; - let skip_cache = message_to_plugin.new_plugin_args.as_ref().map(|n| n.skip_cache).unwrap_or(false); - let should_float = message_to_plugin.new_plugin_args.as_ref().and_then(|n| n.should_float).unwrap_or(true); - let should_be_open_in_place = message_to_plugin + let mut pipe_messages = vec![]; + let skip_cache = message.new_plugin_args.as_ref().map(|n| n.skip_cache).unwrap_or(false); + let should_float = message.new_plugin_args.as_ref().and_then(|n| n.should_float).unwrap_or(true); + let should_be_open_in_place = message .new_plugin_args .as_ref() .and_then(|n| n.pane_id_to_replace) .is_some(); - let pane_title = message_to_plugin.new_plugin_args.as_ref().and_then(|n| n.pane_title.clone()); - let pane_id_to_replace = message_to_plugin + let pane_title = message.new_plugin_args.as_ref().and_then(|n| n.pane_title.clone()); + let pane_id_to_replace = message .new_plugin_args .as_ref() .and_then(|n| n.pane_id_to_replace); - match message_to_plugin.plugin_url { + match message.plugin_url { Some(plugin_url) => { match RunPlugin::from_url(&plugin_url) { Ok(mut run_plugin) => { - run_plugin.configuration = PluginUserConfiguration::new(message_to_plugin.plugin_config); + run_plugin.configuration = PluginUserConfiguration::new(message.plugin_config); let all_plugin_ids = wasm_bridge.get_or_load_plugins( run_plugin, size, @@ -514,11 +525,11 @@ pub(crate) fn plugin_thread_main( None, ); for (plugin_id, client_id) in all_plugin_ids { - updates.push((Some(plugin_id), client_id, Event::MessageFromPlugin { - name: message_to_plugin.message_name.clone(), - payload: message_to_plugin.message_payload.clone(), - args: Some(message_to_plugin.message_args.clone()), - })); + pipe_messages.push(( + Some(plugin_id), + client_id, + PipeMessage::new(PipeSource::Plugin(source_plugin_id), &message.message_name, &message.message_payload, &Some(message.message_args.clone())), + )) } }, Err(e) => { @@ -530,15 +541,15 @@ pub(crate) fn plugin_thread_main( // send to all plugins let all_plugin_ids = wasm_bridge.all_plugin_ids(); for (plugin_id, client_id) in all_plugin_ids { - updates.push((Some(plugin_id), Some(client_id), Event::MessageFromPlugin { - name: message_to_plugin.message_name.clone(), - payload: message_to_plugin.message_payload.clone(), - args: Some(message_to_plugin.message_args.clone()), - })); + pipe_messages.push(( + Some(plugin_id), + Some(client_id), + PipeMessage::new(PipeSource::Plugin(source_plugin_id), &message.message_name, &message.message_payload, &Some(message.message_args.clone())) + )); } } } - wasm_bridge.update_plugins(updates, shutdown_send.clone())?; + wasm_bridge.pipe_messages(pipe_messages, shutdown_send.clone())?; } PluginInstruction::Exit => { break; diff --git a/zellij-server/src/plugins/plugin_loader.rs b/zellij-server/src/plugins/plugin_loader.rs index 4ed81786b..c14a324b7 100644 --- a/zellij-server/src/plugins/plugin_loader.rs +++ b/zellij-server/src/plugins/plugin_loader.rs @@ -833,6 +833,7 @@ impl<'a> PluginLoader<'a> { default_layout: self.default_layout.clone(), plugin_cwd: self.zellij_cwd.clone(), input_pipes_to_unblock: Arc::new(Mutex::new(HashSet::new())), + input_pipes_to_block: Arc::new(Mutex::new(HashSet::new())), }; let subscriptions = Arc::new(Mutex::new(HashSet::new())); diff --git a/zellij-server/src/plugins/plugin_map.rs b/zellij-server/src/plugins/plugin_map.rs index 705f96e69..5805cc562 100644 --- a/zellij-server/src/plugins/plugin_map.rs +++ b/zellij-server/src/plugins/plugin_map.rs @@ -285,6 +285,7 @@ pub struct PluginEnv { pub default_layout: Box<Layout>, pub plugin_cwd: PathBuf, pub input_pipes_to_unblock: Arc<Mutex<HashSet<String>>>, + pub input_pipes_to_block: Arc<Mutex<HashSet<String>>>, } impl PluginEnv { diff --git a/zellij-server/src/plugins/unit/plugin_tests.rs b/zellij-server/src/plugins/unit/plugin_tests.rs index e07e1ab73..0a1c09f67 100644 --- a/zellij-server/src/plugins/unit/plugin_tests.rs +++ b/zellij-server/src/plugins/unit/plugin_tests.rs @@ -5669,11 +5669,102 @@ pub fn unblock_input_plugin_command() { false, )); std::thread::sleep(std::time::Duration::from_millis(500)); - let _ = plugin_thread_sender.send(PluginInstruction::Update(vec![( + + let _ = plugin_thread_sender.send(PluginInstruction::CliMessage{ + input_pipe_id: "input_pipe_id".to_owned(), + name: "message_name".to_owned(), + payload: Some("message_payload".to_owned()), + plugin: None, // broadcast + args: None, + configuration: None, + floating: None, + pane_id_to_replace: None, + pane_title: None, + cwd: None, + skip_cache: false, + cli_client_id: client_id + }); + screen_thread.join().unwrap(); // this might take a while if the cache is cold + teardown(); + let plugin_bytes_events = received_screen_instructions + .lock() + .unwrap() + .iter() + .rev() + .find_map(|i| { + if let ScreenInstruction::PluginBytes(..) = i { + Some(i.clone()) + } else { + None + } + }) + .clone(); + assert_snapshot!(format!("{:#?}", plugin_bytes_events)); +} + +#[test] +#[ignore] +pub fn block_input_plugin_command() { + let temp_folder = tempdir().unwrap(); // placed explicitly in the test scope because its + // destructor removes the directory + let plugin_host_folder = PathBuf::from(temp_folder.path()); + let cache_path = plugin_host_folder.join("permissions_test.kdl"); + let (plugin_thread_sender, screen_receiver, teardown) = + create_plugin_thread(Some(plugin_host_folder)); + let plugin_should_float = Some(false); + let plugin_title = Some("test_plugin".to_owned()); + let run_plugin = RunPlugin { + _allow_exec_host_cmd: false, + location: RunPluginLocation::File(PathBuf::from(&*PLUGIN_FIXTURE)), + configuration: Default::default(), + }; + let tab_index = 1; + let client_id = 1; + let size = Size { + cols: 121, + rows: 20, + }; + let received_screen_instructions = Arc::new(Mutex::new(vec![])); + let screen_thread = grant_permissions_and_log_actions_in_thread!( + received_screen_instructions, + ScreenInstruction::PluginBytes, + screen_receiver, + 1, + &PermissionType::ReadCliMessages, + cache_path, + plugin_thread_sender, + client_id + ); + + let _ = plugin_thread_sender.send(PluginInstruction::AddClient(client_id)); + let _ = plugin_thread_sender.send(PluginInstruction::Load( + plugin_should_float, + false, + plugin_title, + run_plugin, + tab_index, None, - Some(client_id), - Event::CliMessage { input_pipe_id: "input_pipe_id".to_owned(), name: "message_name".to_owned(), payload: Some("message_payload".to_owned()), args: None} - )])); + client_id, + size, + None, + false, + )); + std::thread::sleep(std::time::Duration::from_millis(500)); + + let _ = plugin_thread_sender.send(PluginInstruction::CliMessage{ + input_pipe_id: "input_pipe_id".to_owned(), + name: "message_name_block".to_owned(), + payload: Some("message_payload".to_owned()), + plugin: None, // broadcast + args: None, + configuration: None, + floating: None, + pane_id_to_replace: None, + pane_title: None, + cwd: None, + skip_cache: false, + cli_client_id: client_id + }); screen_thread.join().unwrap(); // this might take a while if the cache is cold teardown(); let plugin_bytes_events = received_screen_instructions @@ -5747,11 +5838,29 @@ pub fn pipe_output_plugin_command() { false, )); std::thread::sleep(std::time::Duration::from_millis(500)); - let _ = plugin_thread_sender.send(PluginInstruction::Update(vec![( - None, - Some(client_id), - Event::CliMessage { input_pipe_id: "input_pipe_id".to_owned(), name: "pipe_output".to_owned(), payload: Some("message_payload".to_owned()), args: None} - )])); +// let _ = plugin_thread_sender.send(PluginInstruction::Update(vec![( +// None, +// Some(client_id), +// Event::CliMessage { input_pipe_id: "input_pipe_id".to_owned(), name: "pipe_output".to_owned(), payload: Some("message_payload".to_owned()), args: None} +// )])); + + let _ = plugin_thread_sender.send(PluginInstruction::CliMessage{ + input_pipe_id: "input_pipe_id".to_owned(), + name: "pipe_output".to_owned(), + payload: Some("message_payload".to_owned()), + plugin: None, // broadcast + args: None, + configuration: None, + floating: None, + pane_id_to_replace: None, + pane_title: None, + cwd: None, + skip_cache: false, + cli_client_id: client_id + }); + + + std::thread::sleep(std::time::Duration::from_millis(500)); teardown(); server_thread.join().unwrap(); // this might take a while if the cache is cold @@ -5794,7 +5903,6 @@ pub fn send_message_to_plugin_plugin_command() { rows: 20, }; let received_screen_instructions = Arc::new(Mutex::new(vec![])); - // let screen_thread = grant_permissions_and_log_actions_in_thread_naked_variant!( let screen_thread = grant_permissions_and_log_actions_in_thread!( received_screen_instructions, ScreenInstruction::PluginBytes, @@ -5820,13 +5928,30 @@ pub fn send_message_to_plugin_plugin_command() { false, )); std::thread::sleep(std::time::Duration::from_millis(500)); - let _ = plugin_thread_sender.send(PluginInstruction::Update(vec![( - None, - Some(client_id), - // this will trigger the fixture plugin to send a message to all plugins and then receive - // it itself - Event::CliMessage { input_pipe_id: "input_pipe_id".to_owned(), name: "send_message_to_plugin".to_owned(), payload: Some("payload_sent_to_self".to_owned()), args: None} - )])); + let _ = plugin_thread_sender.send(PluginInstruction::CliMessage{ + input_pipe_id: "input_pipe_id".to_owned(), + name: "send_message_to_plugin".to_owned(), + payload: Some("payload_sent_to_self".to_owned()), + plugin: None, // broadcast + args: None, + configuration: None, + floating: None, + pane_id_to_replace: None, + pane_title: None, + cwd: None, + skip_cache: false, + cli_client_id: client_id + }); + + + +// vec![( +// None, +// Some(client_id), +// // this will trigger the fixture plugin to send a message to all plugins and then receive +// // it itself +// Event::CliMessage { input_pipe_id: "input_pipe_id".to_owned(), name: "send_message_to_plugin".to_owned(), payload: Some("payload_sent_to_self".to_owned()), args: None} +// )])); std::thread::sleep(std::time::Duration::from_millis(500)); teardown(); screen_thread.join().unwrap(); // this might take a while if the cache is cold diff --git a/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__block_input_plugin_command.snap b/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__block_input_plugin_command.snap new file mode 100644 index 000000000..cd8d854b6 --- /dev/null +++ b/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__block_input_plugin_command.snap @@ -0,0 +1,62 @@ +--- +source: zellij-server/src/plugins/./unit/plugin_tests.rs +assertion_line: 5783 +expression: "format!(\"{:#?}\", plugin_bytes_events)" +--- +Some( + PluginBytes( + [ + ( + 0, + 1, + [ + 82, + 111, + 119, + 115, + 58, + 32, + 50, + 48, + 44, + 32, + 67, + 111, + 108, + 115, + 58, + 32, + 49, + 50, + 49, + 44, + 32, + 82, + 101, + 99, + 101, + 105, + 118, + 101, + 100, + 32, + 101, + 118, + 101, + 110, + 116, + 115, + 58, + 32, + 91, + 93, + 10, + 13, + ], + ), + ], + Some( + {}, + ), + ), +) diff --git a/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__unblock_input_plugin_command.snap b/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__unblock_input_plugin_command.snap index a61803c61..a36ea0518 100644 --- a/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__unblock_input_plugin_command.snap +++ b/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__unblock_input_plugin_command.snap @@ -1,6 +1,6 @@ --- source: zellij-server/src/plugins/./unit/plugin_tests.rs -assertion_line: 5695 +assertion_line: 5702 expression: "format!(\"{:#?}\", plugin_bytes_events)" --- Some( @@ -49,123 +49,6 @@ Some( 58, 32, 91, - 67, - 108, - 105, - 77, - 101, - 115, - 115, - 97, - 103, - 101, - 32, - 123, - 32, - 105, - 110, - 112, - 117, - 116, - 95, - 112, - 105, - 112, - 101, - 95, - 105, - 100, - 58, - 32, - 34, - 105, - 110, - 112, - 117, - 116, - 95, - 112, - 105, - 112, |