diff options
author | Aram Drevekenin <aram@poor.dev> | 2024-01-17 12:10:49 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-17 12:10:49 +0100 |
commit | d780bd91052d8282ba5a7f06c6fb7faa7ca7cc18 (patch) | |
tree | ca08219a38b9e6a3b1c027682359074c86e0dbb5 | |
parent | f6d57295a02393e26c74afb007bf673bcbb454e8 (diff) |
feat(plugins): introduce 'pipes', allowing users to pipe data to and control plugins from the command line (#3066)
* prototype - working with message from the cli
* prototype - pipe from the CLI to plugins
* prototype - pipe from the CLI to plugins and back again
* prototype - working with better cli interface
* prototype - working after removing unused stuff
* prototype - working with launching plugin if it is not launched, also fixed event ordering
* refactor: change message to cli-message
* prototype - allow plugins to send messages to each other
* fix: allow cli messages to send plugin parameters (and implement backpressure)
* fix: use input_pipe_id to identify cli pipes instead of their message name
* fix: come cleanups and add skip_cache parameter
* fix: pipe/client-server communication robustness
* fix: leaking messages between plugins while loading
* feat: allow plugins to specify how a new plugin instance is launched when sending messages
* fix: add permissions
* refactor: adjust cli api
* fix: improve cli plugin loading error messages
* docs: cli pipe
* fix: take plugin configuration into account when messaging between plugins
* refactor: pipe message protobuf interface
* refactor: update(event) -> pipe
* refactor - rename CliMessage to CliPipe
* fix: add is_private to pipes and change some naming
* refactor - cli client
* refactor: various cleanups
* style(fmt): rustfmt
* fix(pipes): backpressure across multiple plugins
* style: some cleanups
* style(fmt): rustfmt
* style: fix merge conflict mistake
* style(wording): clarify pipe permission
48 files changed, 3071 insertions, 305 deletions
diff --git a/default-plugins/fixture-plugin-for-tests/src/main.rs b/default-plugins/fixture-plugin-for-tests/src/main.rs index 79783a1be..fcdc363f7 100644 --- a/default-plugins/fixture-plugin-for-tests/src/main.rs +++ b/default-plugins/fixture-plugin-for-tests/src/main.rs @@ -11,6 +11,7 @@ struct State { received_events: Vec<Event>, received_payload: Option<String>, configuration: BTreeMap<String, String>, + message_to_plugin_payload: Option<String>, } #[derive(Default, Serialize, Deserialize)] @@ -34,9 +35,12 @@ impl<'de> ZellijWorker<'de> for TestWorker { } } +#[cfg(target_family = "wasm")] register_plugin!(State); +#[cfg(target_family = "wasm")] register_worker!(TestWorker, test_worker, TEST_WORKER); +#[cfg(target_family = "wasm")] impl ZellijPlugin for State { fn load(&mut self, configuration: BTreeMap<String, String>) { request_permission(&[ @@ -49,6 +53,8 @@ impl ZellijPlugin for State { PermissionType::OpenTerminalsOrPlugins, PermissionType::WriteToStdin, PermissionType::WebAccess, + PermissionType::ReadCliPipes, + PermissionType::MessageAndLaunchOtherPlugins, ]); self.configuration = configuration; subscribe(&[ @@ -295,10 +301,35 @@ impl ZellijPlugin for State { self.received_events.push(event); should_render } + fn pipe(&mut self, pipe_message: PipeMessage) -> bool { + let input_pipe_id = match pipe_message.source { + PipeSource::Cli(id) => id.clone(), + PipeSource::Plugin(id) => format!("{}", id), + }; + let name = pipe_message.name; + let payload = pipe_message.payload; + if name == "message_name" && payload == Some("message_payload".to_owned()) { + unblock_cli_pipe_input(&input_pipe_id); + } else if name == "message_name_block" { + block_cli_pipe_input(&input_pipe_id); + } else if name == "pipe_output" { + cli_pipe_output(&name, "this_is_my_output"); + } else if name == "pipe_message_to_plugin" { + pipe_message_to_plugin( + MessageToPlugin::new("message_to_plugin").with_payload("my_cool_payload"), + ); + } else if name == "message_to_plugin" { + self.message_to_plugin_payload = payload.clone(); + } + let should_render = true; + should_render + } fn render(&mut self, rows: usize, cols: usize) { if let Some(payload) = self.received_payload.as_ref() { println!("Payload from worker: {:?}", payload); + } else if let Some(payload) = self.message_to_plugin_payload.take() { + println!("Payload from self: {:?}", payload); } else { println!( "Rows: {:?}, Cols: {:?}, Received events: {:?}", diff --git a/src/main.rs b/src/main.rs index 955a982b6..7481bfc58 100644 --- a/src/main.rs +++ b/src/main.rs @@ -111,6 +111,31 @@ fn main() { commands::convert_old_theme_file(old_theme_file); std::process::exit(0); } + if let Some(Command::Sessions(Sessions::Pipe { + name, + payload, + args, + plugin, + plugin_configuration, + })) = opts.command + { + let command_cli_action = CliAction::Pipe { + name, + payload, + args, + plugin, + plugin_configuration, + + force_launch_plugin: false, + skip_plugin_cache: false, + floating_plugin: None, + in_place_plugin: None, + plugin_cwd: None, + plugin_title: None, + }; + commands::send_action_to_session(command_cli_action, opts.session, config); + std::process::exit(0); + } } if let Some(Command::Sessions(Sessions::ListSessions { diff --git a/zellij-client/src/cli_client.rs b/zellij-client/src/cli_client.rs index ef9f9122f..38d73a4cc 100644 --- a/zellij-client/src/cli_client.rs +++ b/zellij-client/src/cli_client.rs @@ -1,15 +1,23 @@ //! The `[cli_client]` is used to attach to a running server session //! and dispatch actions, that are specified through the command line. +use std::collections::BTreeMap; +use std::io::BufRead; use std::process; use std::{fs, path::PathBuf}; use crate::os_input_output::ClientOsApi; use zellij_utils::{ + errors::prelude::*, input::actions::Action, - ipc::{ClientToServerMsg, ServerToClientMsg}, + ipc::{ClientToServerMsg, ExitReason, ServerToClientMsg}, + uuid::Uuid, }; -pub fn start_cli_client(os_input: Box<dyn ClientOsApi>, session_name: &str, actions: Vec<Action>) { +pub fn start_cli_client( + mut os_input: Box<dyn ClientOsApi>, + session_name: &str, + actions: Vec<Action>, +) { let zellij_ipc_pipe: PathBuf = { let mut sock_dir = zellij_utils::consts::ZELLIJ_SOCK_DIR.clone(); fs::create_dir_all(&sock_dir).unwrap(); @@ -21,10 +29,166 @@ pub fn start_cli_client(os_input: Box<dyn ClientOsApi>, session_name: &str, acti let pane_id = os_input .env_variable("ZELLIJ_PANE_ID") .and_then(|e| e.trim().parse().ok()); + for action in actions { - let msg = ClientToServerMsg::Action(action, pane_id, None); - os_input.send_to_server(msg); + match action { + Action::CliPipe { + pipe_id, + name, + payload, + plugin, + args, + configuration, + launch_new, + skip_cache, + floating, + in_place, + cwd, + pane_title, + } => { + pipe_client( + &mut os_input, + pipe_id, + name, + payload, + plugin, + args, + configuration, + launch_new, + skip_cache, + floating, + in_place, + pane_id, + cwd, + pane_title, + ); + }, + action => { + single_message_client(&mut os_input, action, pane_id); + }, + } } +} + +fn pipe_client( + os_input: &mut Box<dyn ClientOsApi>, + pipe_id: String, + mut name: Option<String>, + payload: Option<String>, + plugin: Option<String>, + args: Option<BTreeMap<String, String>>, + mut configuration: Option<BTreeMap<String, String>>, + launch_new: bool, + skip_cache: bool, + floating: Option<bool>, + in_place: Option<bool>, + pane_id: Option<u32>, + cwd: Option<PathBuf>, + pane_title: Option<String>, +) { + let mut stdin = os_input.get_stdin_reader(); + let name = name.take().or_else(|| Some(Uuid::new_v4().to_string())); + if launch_new { + // we do this to make sure the plugin is unique (has a unique configuration parameter) so + // that a new one would be launched, but we'll still send it to the same instance rather + // than launching a new one in every iteration of the loop + configuration + .get_or_insert_with(BTreeMap::new) + .insert("_zellij_id".to_owned(), Uuid::new_v4().to_string()); + } + let create_msg = |payload: Option<String>| -> ClientToServerMsg { + ClientToServerMsg::Action( + Action::CliPipe { + pipe_id: pipe_id.clone(), + name: name.clone(), + payload, + args: args.clone(), + plugin: plugin.clone(), + configuration: configuration.clone(), + floating, + in_place, + launch_new, + skip_cache, + cwd: cwd.clone(), + pane_title: pane_title.clone(), + }, + pane_id, + None, + ) + }; + loop { + if payload.is_some() { + // we got payload from the command line, we should use it and not wait for more + let msg = create_msg(payload); + os_input.send_to_server(msg); + break; + } + // we didn't get payload from the command line, meaning we listen on STDIN because this + // signifies the user is about to pipe more (eg. cat my-large-file | zellij pipe ...) + let mut buffer = String::new(); + let _ = stdin.read_line(&mut buffer); + if buffer.is_empty() { + // end of pipe, send an empty message down the pipe + let msg = create_msg(None); + os_input.send_to_server(msg); + break; + } else { + // we've got data! send it down the pipe (most common) + let msg = create_msg(Some(buffer)); + os_input.send_to_server(msg); + } + loop { + // wait for a response and act accordingly + match os_input.recv_from_server() { + Some((ServerToClientMsg::UnblockCliPipeInput(pipe_name), _)) => { + // unblock this pipe, meaning we need to stop waiting for a response and read + // once more from STDIN + if pipe_name == pipe_id { + break; + } + }, + Some((ServerToClientMsg::CliPipeOutput(pipe_name, output), _)) => { + // send data to STDOUT, this *does not* mean we need to unblock the input + let err_context = "Failed to write to stdout"; + if pipe_name == pipe_id { + let mut stdout = os_input.get_stdout_writer(); + stdout + .write_all(output.as_bytes()) + .context(err_context) + .non_fatal(); + stdout.flush().context(err_context).non_fatal(); + } + }, + Some((ServerToClientMsg::Log(log_lines), _)) => { + log_lines.iter().for_each(|line| println!("{line}")); + process::exit(0); + }, + Some((ServerToClientMsg::LogError(log_lines), _)) => { + log_lines.iter().for_each(|line| eprintln!("{line}")); + process::exit(2); + }, + Some((ServerToClientMsg::Exit(exit_reason), _)) => match exit_reason { + ExitReason::Error(e) => { + eprintln!("{}", e); + process::exit(2); + }, + _ => { + process::exit(0); + }, + }, + _ => {}, + } + } + } +} + +fn single_message_client( + os_input: &mut Box<dyn ClientOsApi>, + action: Action, + pane_id: Option<u32>, +) { + let msg = ClientToServerMsg::Action(action, pane_id, None); + os_input.send_to_server(msg); loop { match os_input.recv_from_server() { Some((ServerToClientMsg::UnblockInputThread, _)) => { @@ -39,6 +203,15 @@ pub fn start_cli_client(os_input: Box<dyn ClientOsApi>, session_name: &str, acti log_lines.iter().for_each(|line| eprintln!("{line}")); process::exit(2); }, + Some((ServerToClientMsg::Exit(exit_reason), _)) => match exit_reason { + ExitReason::Error(e) => { + eprintln!("{}", e); + process::exit(2); + }, + _ => { + process::exit(0); + }, + }, _ => {}, } } diff --git a/zellij-client/src/lib.rs b/zellij-client/src/lib.rs index c704ad84a..640cbcd80 100644 --- a/zellij-client/src/lib.rs +++ b/zellij-client/src/lib.rs @@ -49,6 +49,8 @@ pub(crate) enum ClientInstruction { LogError(Vec<String>), SwitchSession(ConnectToSession), SetSynchronizedOutput(Option<SyncOutput>), + UnblockCliPipeInput(String), // String -> pipe name + CliPipeOutput(String, String), // String -> pipe name, String -> output } impl From<ServerToClientMsg> for ClientInstruction { @@ -67,6 +69,12 @@ impl From<ServerToClientMsg> for ClientInstruction { ServerToClientMsg::SwitchSession(connect_to_session) => { ClientInstruction::SwitchSession(connect_to_session) }, < |