diff options
author | Aram Drevekenin <aram@poor.dev> | 2023-12-05 11:02:11 +0100 |
---|---|---|
committer | Aram Drevekenin <aram@poor.dev> | 2023-12-05 11:02:11 +0100 |
commit | dfe90ccbc2f6683471ffa520f37ba91d1575a5ad (patch) | |
tree | 56577a0864b835003123bc985efa880729ecfd66 | |
parent | 6e3ac243275753c2380c4e44e7e205081478e56f (diff) |
prototype - pipe from the CLI to plugins and back again
-rw-r--r-- | zellij-client/src/cli_client.rs | 20 | ||||
-rw-r--r-- | zellij-client/src/lib.rs | 13 | ||||
-rw-r--r-- | zellij-server/src/lib.rs | 20 | ||||
-rw-r--r-- | zellij-server/src/plugins/plugin_loader.rs | 1 | ||||
-rw-r--r-- | zellij-server/src/plugins/plugin_map.rs | 1 | ||||
-rw-r--r-- | zellij-server/src/plugins/wasm_bridge.rs | 7 | ||||
-rw-r--r-- | zellij-server/src/plugins/zellij_exports.rs | 25 | ||||
-rw-r--r-- | zellij-server/src/screen.rs | 20 | ||||
-rw-r--r-- | zellij-tile/src/shim.rs | 16 | ||||
-rw-r--r-- | zellij-utils/assets/prost/api.plugin_command.rs | 20 | ||||
-rw-r--r-- | zellij-utils/src/data.rs | 2 | ||||
-rw-r--r-- | zellij-utils/src/errors.rs | 6 | ||||
-rw-r--r-- | zellij-utils/src/ipc.rs | 3 | ||||
-rw-r--r-- | zellij-utils/src/plugin_api/plugin_command.proto | 9 | ||||
-rw-r--r-- | zellij-utils/src/plugin_api/plugin_command.rs | 22 |
15 files changed, 162 insertions, 23 deletions
diff --git a/zellij-client/src/cli_client.rs b/zellij-client/src/cli_client.rs index 2860e898c..eb659f55a 100644 --- a/zellij-client/src/cli_client.rs +++ b/zellij-client/src/cli_client.rs @@ -45,8 +45,24 @@ pub fn start_cli_client(os_input: Box<dyn ClientOsApi>, session_name: &str, acti loop { match os_input.recv_from_server() { - Some((ServerToClientMsg::ContinuePipe, _)) => { - break; + // TODO: CONTINUE HERE (05/12) - convert this to UnblockPipeInput(pipe_name), then + // handle PipeOutput(pipe_name, output), then see what else we need and do + // some tests with a plugin + Some((ServerToClientMsg::UnblockPipeInput(pipe_name), _)) => { + if pipe_name == name { + break; + } + }, + Some((ServerToClientMsg::PipeOutput(pipe_name, output), _)) => { + // TODO: handle errors + let err_context = "Failed to write to stdout"; + if pipe_name == name { + let mut stdout = os_input.get_stdout_writer(); + stdout + .write_all(output.as_bytes()); + // .context(err_context); + stdout.flush();//.context(err_context); + } }, _ => {}, } diff --git a/zellij-client/src/lib.rs b/zellij-client/src/lib.rs index ffa515ede..73d3f5fab 100644 --- a/zellij-client/src/lib.rs +++ b/zellij-client/src/lib.rs @@ -39,7 +39,6 @@ pub(crate) enum ClientInstruction { Error(String), Render(String), UnblockInputThread, - ContinuePipe, Exit(ExitReason), SwitchToMode(InputMode), Connected, @@ -50,6 +49,8 @@ pub(crate) enum ClientInstruction { LogError(Vec<String>), SwitchSession(ConnectToSession), SetSynchronizedOutput(Option<SyncOutput>), + UnblockPipeInput(String), // String -> pipe name + PipeOutput(String, String), // String -> pipe name, String -> output } impl From<ServerToClientMsg> for ClientInstruction { @@ -58,7 +59,6 @@ impl From<ServerToClientMsg> for ClientInstruction { ServerToClientMsg::Exit(e) => ClientInstruction::Exit(e), ServerToClientMsg::Render(buffer) => ClientInstruction::Render(buffer), ServerToClientMsg::UnblockInputThread => ClientInstruction::UnblockInputThread, - ServerToClientMsg::ContinuePipe => ClientInstruction::ContinuePipe, ServerToClientMsg::SwitchToMode(input_mode) => { ClientInstruction::SwitchToMode(input_mode) }, @@ -69,6 +69,12 @@ impl From<ServerToClientMsg> for ClientInstruction { ServerToClientMsg::SwitchSession(connect_to_session) => { ClientInstruction::SwitchSession(connect_to_session) }, + ServerToClientMsg::UnblockPipeInput(pipe_name) => { + ClientInstruction::UnblockPipeInput(pipe_name) + }, + ServerToClientMsg::PipeOutput(pipe_name, output) => { + ClientInstruction::PipeOutput(pipe_name, output) + }, } } } @@ -80,7 +86,6 @@ impl From<&ClientInstruction> for ClientContext { ClientInstruction::Error(_) => ClientContext::Error, ClientInstruction::Render(_) => ClientContext::Render, ClientInstruction::UnblockInputThread => ClientContext::UnblockInputThread, - ClientInstruction::ContinuePipe => ClientContext::ContinuePipe, ClientInstruction::SwitchToMode(_) => ClientContext::SwitchToMode, ClientInstruction::Connected => ClientContext::Connected, ClientInstruction::ActiveClients(_) => ClientContext::ActiveClients, @@ -90,6 +95,8 @@ impl From<&ClientInstruction> for ClientContext { ClientInstruction::DoneParsingStdinQuery => ClientContext::DoneParsingStdinQuery, ClientInstruction::SwitchSession(..) => ClientContext::SwitchSession, ClientInstruction::SetSynchronizedOutput(..) => ClientContext::SetSynchronisedOutput, + ClientInstruction::UnblockPipeInput(..) => ClientContext::UnblockPipeInput, + ClientInstruction::PipeOutput(..) => ClientContext::PipeOutput, } } } diff --git a/zellij-server/src/lib.rs b/zellij-server/src/lib.rs index 7a2b65e92..df802b17b 100644 --- a/zellij-server/src/lib.rs +++ b/zellij-server/src/lib.rs @@ -70,7 +70,6 @@ pub enum ServerInstruction { ), Render(Option<HashMap<ClientId, String>>), UnblockInputThread, - ContinuePipe, ClientExit(ClientId), RemoveClient(ClientId), Error(String), @@ -87,6 +86,8 @@ pub enum ServerInstruction { ActiveClients(ClientId), Log(Vec<String>, ClientId), SwitchSession(ConnectToSession, ClientId), + UnblockPipeInput(String), // String -> Pipe name + PipeOutput(String, String), // String -> Pipe name, String -> Output } impl From<&ServerInstruction> for ServerContext { @@ -95,7 +96,6 @@ impl From<&ServerInstruction> for ServerContext { ServerInstruction::NewClient(..) => ServerContext::NewClient, ServerInstruction::Render(_) => ServerContext::Render, ServerInstruction::UnblockInputThread => ServerContext::UnblockInputThread, - ServerInstruction::ContinuePipe => ServerContext::ContinuePipe, ServerInstruction::ClientExit(..) => ServerContext::ClientExit, ServerInstruction::RemoveClient(..) => ServerContext::RemoveClient, ServerInstruction::Error(_) => ServerContext::Error, @@ -106,6 +106,8 @@ impl From<&ServerInstruction> for ServerContext { ServerInstruction::ActiveClients(_) => ServerContext::ActiveClients, ServerInstruction::Log(..) => ServerContext::Log, ServerInstruction::SwitchSession(..) => ServerContext::SwitchSession, + ServerInstruction::UnblockPipeInput(..) => ServerContext::UnblockPipeInput, + ServerInstruction::PipeOutput(..) => ServerContext::PipeOutput, } } } @@ -492,12 +494,22 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) { ); } }, - ServerInstruction::ContinuePipe => { + ServerInstruction::UnblockPipeInput(pipe_name) => { for client_id in session_state.read().unwrap().clients.keys() { send_to_client!( *client_id, os_input, - ServerToClientMsg::ContinuePipe, + ServerToClientMsg::UnblockPipeInput(pipe_name.clone()), + session_state + ); + } + }, + ServerInstruction::PipeOutput(pipe_name, output) => { + for client_id in session_state.read().unwrap().clients.keys() { + send_to_client!( + *client_id, + os_input, + ServerToClientMsg::PipeOutput(pipe_name.clone(), output.clone()), session_state ); } diff --git a/zellij-server/src/plugins/plugin_loader.rs b/zellij-server/src/plugins/plugin_loader.rs index 1f001db1a..2a8901163 100644 --- a/zellij-server/src/plugins/plugin_loader.rs +++ b/zellij-server/src/plugins/plugin_loader.rs @@ -830,6 +830,7 @@ impl<'a> PluginLoader<'a> { default_shell: self.default_shell.clone(), default_layout: self.default_layout.clone(), plugin_cwd: self.zellij_cwd.clone(), + input_pipes_to_unblock: Arc::new(Mutex::new(HashSet::new())), }; let subscriptions = Arc::new(Mutex::new(HashSet::new())); diff --git a/zellij-server/src/plugins/plugin_map.rs b/zellij-server/src/plugins/plugin_map.rs index 067c178fa..9308db117 100644 --- a/zellij-server/src/plugins/plugin_map.rs +++ b/zellij-server/src/plugins/plugin_map.rs @@ -228,6 +228,7 @@ pub struct PluginEnv { pub default_shell: Option<TerminalAction>, pub default_layout: Box<Layout>, pub plugin_cwd: PathBuf, + pub input_pipes_to_unblock: Arc<Mutex<HashSet<String>>>, } impl PluginEnv { diff --git a/zellij-server/src/plugins/wasm_bridge.rs b/zellij-server/src/plugins/wasm_bridge.rs index a1cca4331..341d8cf67 100644 --- a/zellij-server/src/plugins/wasm_bridge.rs +++ b/zellij-server/src/plugins/wasm_bridge.rs @@ -445,7 +445,7 @@ impl WasmBridge { )]; senders .send_to_screen(ScreenInstruction::PluginBytes( - plugin_bytes, + plugin_bytes, None )) .unwrap(); }, @@ -521,8 +521,10 @@ impl WasmBridge { &mut plugin_bytes, ) { Ok(()) => { + let input_pipes_to_unblock = running_plugin.plugin_env.input_pipes_to_unblock.lock().unwrap().drain().collect(); let _ = senders.send_to_screen(ScreenInstruction::PluginBytes( plugin_bytes, + Some(input_pipes_to_unblock), // TODO: optimize )); }, Err(e) => { @@ -647,8 +649,9 @@ impl WasmBridge { &mut plugin_bytes, ) { Ok(()) => { + let input_pipes_to_unblock = running_plugin.plugin_env.input_pipes_to_unblock.lock().unwrap().drain().collect(); let _ = senders.send_to_screen( - ScreenInstruction::PluginBytes(plugin_bytes), + ScreenInstruction::PluginBytes(plugin_bytes, Some(input_pipes_to_unblock)), ); }, Err(e) => { diff --git a/zellij-server/src/plugins/zellij_exports.rs b/zellij-server/src/plugins/zellij_exports.rs index 3a7c851ef..52e0e5fce 100644 --- a/zellij-server/src/plugins/zellij_exports.rs +++ b/zellij-server/src/plugins/zellij_exports.rs @@ -243,6 +243,12 @@ fn host_run_plugin_command(env: FunctionEnvMut<ForeignFunctionEnv>) { PluginCommand::SubscribeToCustomMessage(custom_message_name) => { subscribe_to_custom_message(env, custom_message_name)? }, + PluginCommand::UnblockPipeInput(pipe_name) => { // TODO: permissions! + unblock_pipe_input(env, pipe_name)? + }, + PluginCommand::PipeOutput(pipe_name, output) => { // TODO: permissions! + pipe_output(env, pipe_name, output)? + }, }, (PermissionStatus::Denied, permission) => { log::error!( @@ -286,6 +292,25 @@ fn subscribe_to_custom_message(env: &ForeignFunctionEnv, custom_message_name: St )) } +// TODO: permissions!!!111oneoneone +fn unblock_pipe_input(env: &ForeignFunctionEnv, pipe_name: String) -> Result<()> { + // TODO: do this after rendering somehow? or is this ok with backpressure? + env.plugin_env.input_pipes_to_unblock.lock().unwrap().insert(pipe_name); + Ok(()) // TODO: no result return +// env.plugin_env +// .senders +// .send_to_server(ServerInstruction::UnblockPipeInput(pipe_name)) +// .context("failed to unblock pipe input") +} + +// TODO: permissions!!!111oneoneone +fn pipe_output(env: &ForeignFunctionEnv, pipe_name: String, output: String) -> Result<()> { + env.plugin_env + .senders + .send_to_server(ServerInstruction::PipeOutput(pipe_name, output)) + .context("failed to send pipe output") +} + fn unsubscribe(env: &ForeignFunctionEnv, event_list: HashSet<EventType>) -> Result<()> { env.subscriptions .lock() diff --git a/zellij-server/src/screen.rs b/zellij-server/src/screen.rs index 2671ab057..dacd9f607 100644 --- a/zellij-server/src/screen.rs +++ b/zellij-server/src/screen.rs @@ -138,7 +138,9 @@ type HoldForCommand = Option<RunCommand>; #[derive(Debug, Clone)] pub enum ScreenInstruction { PtyBytes(u32, VteBytes), - PluginBytes(Vec<(u32, ClientId, VteBytes)>), // u32 is plugin_id + PluginBytes(Vec<(u32, ClientId, VteBytes)>, Option<HashSet<String>>), // u32 is plugin_id, + // HashSet -> input pipes + // to unblock Render, NewPane( PaneId, @@ -2155,7 +2157,7 @@ pub(crate) fn screen_thread_main( } } }, - ScreenInstruction::PluginBytes(mut plugin_bytes) => { + ScreenInstruction::PluginBytes(mut plugin_bytes, input_pipes_to_unblock) => { for (pid, client_id, vte_bytes) in plugin_bytes.drain(..) { let all_tabs = screen.get_tabs_mut(); for tab in all_tabs.values_mut() { @@ -2167,11 +2169,15 @@ pub(crate) fn screen_thread_main( } } screen.render()?; - // TODO: add the pipe id as an Option to PluginBytes - screen.bus - .senders - .send_to_server(ServerInstruction::ContinuePipe) - .context("failed to unblock input"); + if let Some(input_pipes_to_unblock) = input_pipes_to_unblock { + // TODO: add the pipe id as an Option to PluginBytes + for pipe_name in input_pipes_to_unblock { + screen.bus + .senders + .send_to_server(ServerInstruction::UnblockPipeInput(pipe_name)) + .context("failed to unblock input pipe"); + } + } }, ScreenInstruction::Render => { screen.render()?; diff --git a/zellij-tile/src/shim.rs b/zellij-tile/src/shim.rs index c43d4b237..1bf8df794 100644 --- a/zellij-tile/src/shim.rs +++ b/zellij-tile/src/shim.rs @@ -702,6 +702,22 @@ pub fn subscribe_to_custom_message(name: &str) { unsafe { host_run_plugin_command() }; } +/// Unblock the input side of a pipe, requesting the next message be sent if there is one +pub fn unblock_pipe_input(pipe_name: &str) { + let plugin_command = PluginCommand::UnblockPipeInput(pipe_name.to_owned()); + let protobuf_plugin_command: ProtobufPluginCommand = plugin_command.try_into().unwrap(); + object_to_stdout(&protobuf_plugin_command.encode_to_vec()); + unsafe { host_run_plugin_command() }; +} + +/// Send output to the output side of a pipe, ths does not affect the input side of same pipe +pub fn pipe_output(pipe_name: &str, output: &str) { + let plugin_command = PluginCommand::PipeOutput(pipe_name.to_owned(), output.to_owned()); + let protobuf_plugin_command: ProtobufPluginCommand = plugin_command.try_into().unwrap(); + object_to_stdout(&protobuf_plugin_command.encode_to_vec()); + unsafe { host_run_plugin_command() }; +} + // Utility Functions #[allow(unused)] diff --git a/zellij-utils/assets/prost/api.plugin_command.rs b/zellij-utils/assets/prost/api.plugin_command.rs index ec29e42d5..9659e231e 100644 --- a/zellij-utils/assets/prost/api.plugin_command.rs +++ b/zellij-utils/assets/prost/api.plugin_command.rs @@ -5,7 +5,7 @@ pub struct PluginCommand { pub name: i32, #[prost( oneof = "plugin_command::Payload", - tags = "2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47" + tags = "2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49" )] pub payload: ::core::option::Option<plugin_command::Payload>, } @@ -106,10 +106,22 @@ pub mod plugin_command { RenameSessionPayload(::prost::alloc::string::String), #[prost(string, tag = "47")] SubscribeToCustomMessagePayload(::prost::alloc::string::String), + #[prost(string, tag = "48")] + UnblockPipeInputPayload(::prost::alloc::string::String), + #[prost(message, tag = "49")] + PipeOutputPayload(super::PipeOutputPayload), } } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct PipeOutputPayload { + #[prost(string, tag = "1")] + pub pipe_name: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub output: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct SwitchSessionPayload { #[prost(string, optional, tag = "1")] pub name: ::core::option::Option<::prost::alloc::string::String>, @@ -321,6 +333,8 @@ pub enum CommandName { DeleteAllDeadSessions = 74, RenameSession = 75, SubscribeToCustomMessage = 76, + UnblockPipeInput = 77, + PipeOutput = 78, } impl CommandName { /// String value of the enum field names used in the ProtoBuf definition. @@ -406,6 +420,8 @@ impl CommandName { CommandName::DeleteAllDeadSessions => "DeleteAllDeadSessions", CommandName::RenameSession => "RenameSession", CommandName::SubscribeToCustomMessage => "SubscribeToCustomMessage", + CommandName::UnblockPipeInput => "UnblockPipeInput", + CommandName::PipeOutput => "PipeOutput", } } /// Creates an enum from field names used in the ProtoBuf definition. @@ -488,6 +504,8 @@ impl CommandName { "DeleteAllDeadSessions" => Some(Self::DeleteAllDeadSessions), "RenameSession" => Some(Self::RenameSession), "SubscribeToCustomMessage" => Some(Self::SubscribeToCustomMessage), + "UnblockPipeInput" => Some(Self::UnblockPipeInput), + "PipeOutput" => Some(Self::PipeOutput), _ => None, } } diff --git a/zellij-utils/src/data.rs b/zellij-utils/src/data.rs index 7286f5988..b295c7bff 100644 --- a/zellij-utils/src/data.rs +++ b/zellij-utils/src/data.rs @@ -1110,4 +1110,6 @@ pub enum PluginCommand { ), RenameSession(String), // String -> new session name SubscribeToCustomMessage(String), // String -> message name + UnblockPipeInput(String), // String => pipe name + PipeOutput(String, String), // String => pipe name, String => output } diff --git a/zellij-utils/src/errors.rs b/zellij-utils/src/errors.rs index d9817e1e9..2ddb34895 100644 --- a/zellij-utils/src/errors.rs +++ b/zellij-utils/src/errors.rs @@ -403,7 +403,6 @@ pub enum ClientContext { Exit, Error, UnblockInputThread, - ContinuePipe, Render, ServerError, SwitchToMode, @@ -416,6 +415,8 @@ pub enum ClientContext { DoneParsingStdinQuery, SwitchSession, SetSynchronisedOutput, + UnblockPipeInput, + PipeOutput, } /// Stack call representations corresponding to the different types of [`ServerInstruction`]s. @@ -424,7 +425,6 @@ pub enum ServerContext { NewClient, Render, UnblockInputThread, - ContinuePipe, ClientExit, RemoveClient, Error, @@ -435,6 +435,8 @@ pub enum ServerContext { ActiveClients, Log, SwitchSession, + UnblockPipeInput, + PipeOutput, } #[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] diff --git a/zellij-utils/src/ipc.rs b/zellij-utils/src/ipc.rs index 5cbe6b57a..7fdd4e5c0 100644 --- a/zellij-utils/src/ipc.rs +++ b/zellij-utils/src/ipc.rs @@ -96,7 +96,6 @@ pub enum ClientToServerMsg { pub enum ServerToClientMsg { Render(String), UnblockInputThread, - ContinuePipe, Exit(ExitReason), SwitchToMode(InputMode), Connected, @@ -104,6 +103,8 @@ pub enum ServerToClientMsg { Log(Vec<String>), LogError(Vec<String>), SwitchSession(ConnectToSession), + UnblockPipeInput(String), // String -> pipe name + PipeOutput(String, String), // String -> pipe name, String -> Output } #[derive(Serialize, Deserialize, Debug, Clone)] diff --git a/zellij-utils/src/plugin_api/plugin_command.proto b/zellij-utils/src/plugin_api/plugin_command.proto index 25ea79d8e..fb5c4d6bb 100644 --- a/zellij-utils/src/plugin_api/plugin_command.proto +++ b/zellij-utils/src/plugin_api/plugin_command.proto @@ -88,6 +88,8 @@ enum CommandName { DeleteAllDeadSessions = 74; RenameSession = 75; SubscribeToCustomMessage = 76; + UnblockPipeInput = 77; + PipeOutput = 78; } message PluginCommand { @@ -139,9 +141,16 @@ message PluginCommand { string delete_dead_session_payload = 45; string rename_session_payload = 46; string subscribe_to_custom_message_payload = 47; + string unblock_pipe_input_payload = 48; + PipeOutputPayload pipe_output_payload = 49; } } +message PipeOutputPayload { + string pipe_name = 1; + string output = 2; +} + message SwitchSessionPayload { optional string name = 1; optional uint32 tab_position = 2; diff --git a/zellij-utils/src/plugin_api/plugin_command.rs b/zellij-utils/src/plugin_api/plugin_command.rs index 6929b1821..1f14082f3 100644 --- a/zellij-utils/src/plugin_api/plugin_command.rs +++ b/zellij-utils/src/plugin_api/plugin_command.rs @@ -8,7 +8,7 @@ pub use super::generated_api::api::{ OpenFilePayload, PluginCommand as ProtobufPluginCommand, PluginMessagePayload, RequestPluginPermissionPayload, ResizePayload, RunCommandPayload, SetTimeoutPayload, SubscribePayload, SwitchSessionPayload, SwitchTabToPayload, UnsubscribePayload, - WebRequestPayload, + WebRequestPayload, PipeOutputPayload }, plugin_permission::PermissionType as ProtobufPermissionType, resize::ResizeAction as ProtobufResizeAction, @@ -647,6 +647,18 @@ impl TryFrom<ProtobufPluginCommand> for PluginCommand { }, _ => Err("Mismatched payload for SubscribeToCustomMessage"), }, + Some(CommandName::UnblockPipeInput) => match protobuf_plugin_command.payload { + Some(Payload::UnblockPipeInputPayload(pipe_name)) => { + Ok(PluginCommand::UnblockPipeInput(pipe_name)) + }, + _ => Err("Mismatched payload for UnblockPipeInput"), + }, + Some(CommandName::PipeOutput) => match protobuf_plugin_command.payload { + Some(Payload::PipeOutputPayload(PipeOutputPayload { pipe_name, output })) => { + Ok(PluginCommand::PipeOutput(pipe_name, output)) + }, + _ => Err("Mismatched payload for PipeOutput"), + }, None => Err("Unrecognized plugin command"), } } @@ -1079,6 +1091,14 @@ impl TryFrom<PluginCommand> for ProtobufPluginCommand { name: CommandName::SubscribeToCustomMessage as i32, |