summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAram Drevekenin <aram@poor.dev>2024-01-16 09:49:16 +0100
committerAram Drevekenin <aram@poor.dev>2024-01-16 09:49:16 +0100
commitc3e81217b77df1d3de4fd2d136410baf5a4c8341 (patch)
tree130a9335366b324bec90c2917f414655f26fe13b
parent4915877f5791a7d18952c8c8161d58150eabfe81 (diff)
fix(pipes): backpressure across multiple plugins
-rw-r--r--zellij-server/src/lib.rs19
-rw-r--r--zellij-server/src/plugins/mod.rs13
-rw-r--r--zellij-server/src/plugins/pipes.rs220
-rw-r--r--zellij-server/src/plugins/plugin_map.rs18
-rw-r--r--zellij-server/src/plugins/unit/plugin_tests.rs55
-rw-r--r--zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__block_input_plugin_command.snap18
-rw-r--r--zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__unblock_input_plugin_command.snap20
-rw-r--r--zellij-server/src/plugins/wasm_bridge.rs234
-rw-r--r--zellij-server/src/route.rs2
-rw-r--r--zellij-server/src/screen.rs50
-rw-r--r--zellij-server/src/tab/mod.rs1
-rw-r--r--zellij-server/src/unit/screen_tests.rs2
-rw-r--r--zellij-utils/src/errors.rs1
13 files changed, 414 insertions, 239 deletions
diff --git a/zellij-server/src/lib.rs b/zellij-server/src/lib.rs
index ccf0ab4c3..437f465bd 100644
--- a/zellij-server/src/lib.rs
+++ b/zellij-server/src/lib.rs
@@ -68,8 +68,7 @@ pub enum ServerInstruction {
ClientId,
Option<PluginsConfig>,
),
- Render(Option<HashMap<ClientId, String>>, Option<HashSet<String>>), // 2nd argument is
- // input_pipes_to_unblock
+ Render(Option<HashMap<ClientId, String>>),
UnblockInputThread,
ClientExit(ClientId),
RemoveClient(ClientId),
@@ -228,7 +227,7 @@ impl SessionState {
}
pub fn remove_client(&mut self, client_id: ClientId) {
self.clients.remove(&client_id);
- self.pipes.retain(|p_id, c_id| c_id != &client_id);
+ self.pipes.retain(|_p_id, c_id| c_id != &client_id);
}
pub fn set_client_size(&mut self, client_id: ClientId, size: Size) {
self.clients.insert(client_id, Some(size));
@@ -685,7 +684,7 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) {
.unwrap();
}
},
- ServerInstruction::Render(serialized_output, input_pipes_to_unblock) => {
+ ServerInstruction::Render(serialized_output) => {
let client_ids = session_state.read().unwrap().client_ids();
// If `Some(_)`- unwrap it and forward it to the clients to render.
// If `None`- Send an exit instruction. This is the case when a user closes the last Tab/Pane.
@@ -701,18 +700,6 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) {
session_state
);
}
- if let Some(input_pipes_to_unblock) = input_pipes_to_unblock {
- for pipe_name in input_pipes_to_unblock {
- for client_id in session_state.read().unwrap().clients.keys() {
- send_to_client!(
- *client_id,
- os_input,
- ServerToClientMsg::UnblockCliPipeInput(pipe_name.clone()),
- session_state
- );
- }
- }
- }
} else {
for client_id in client_ids {
let _ = os_input
diff --git a/zellij-server/src/plugins/mod.rs b/zellij-server/src/plugins/mod.rs
index 6d99e5818..fcdeec0be 100644
--- a/zellij-server/src/plugins/mod.rs
+++ b/zellij-server/src/plugins/mod.rs
@@ -2,6 +2,7 @@ mod plugin_loader;
mod plugin_map;
mod plugin_worker;
mod wasm_bridge;
+mod pipes;
mod watch_filesystem;
mod zellij_exports;
use log::info;
@@ -20,6 +21,7 @@ use crate::session_layout_metadata::SessionLayoutMetadata;
use crate::{pty::PtyInstruction, thread_bus::Bus, ClientId, ServerInstruction};
use wasm_bridge::WasmBridge;
+pub use wasm_bridge::PluginRenderAsset;
use zellij_utils::{
async_std::{channel, future::timeout, task},
@@ -127,6 +129,7 @@ pub enum PluginInstruction {
source_plugin_id: u32,
message: MessageToPlugin,
},
+ UnblockCliPipes(Vec<PluginRenderAsset>),
Exit,
}
@@ -161,6 +164,7 @@ impl From<&PluginInstruction> for PluginContext {
PluginInstruction::CliPipe { .. } => PluginContext::CliPipe,
PluginInstruction::CachePluginEvents { .. } => PluginContext::CachePluginEvents,
PluginInstruction::MessageFromPlugin { .. } => PluginContext::MessageFromPlugin,
+ PluginInstruction::UnblockCliPipes { .. } => PluginContext::UnblockCliPipes,
}
}
}
@@ -562,6 +566,15 @@ pub(crate) fn plugin_thread_main(
}
wasm_bridge.pipe_messages(pipe_messages, shutdown_send.clone())?;
},
+ PluginInstruction::UnblockCliPipes(pipes_to_unblock) => {
+ let pipes_to_unblock = wasm_bridge.update_cli_pipe_state(pipes_to_unblock);
+ for pipe_name in pipes_to_unblock {
+ let _ = bus
+ .senders
+ .send_to_server(ServerInstruction::UnblockCliPipeInput(pipe_name))
+ .context("failed to unblock input pipe");
+ }
+ }
PluginInstruction::Exit => {
break;
},
diff --git a/zellij-server/src/plugins/pipes.rs b/zellij-server/src/plugins/pipes.rs
new file mode 100644
index 000000000..778523a23
--- /dev/null
+++ b/zellij-server/src/plugins/pipes.rs
@@ -0,0 +1,220 @@
+use super::{PluginId, PluginInstruction};
+use crate::plugins::wasm_bridge::PluginRenderAsset;
+use crate::plugins::plugin_map::RunningPlugin;
+use crate::plugins::zellij_exports::{wasi_read_string, wasi_write_object};
+use std::collections::{HashMap, HashSet};
+use wasmer::Value;
+use zellij_utils::data::{PipeMessage, PipeSource};
+use zellij_utils::plugin_api::pipe_message::ProtobufPipeMessage;
+
+use zellij_utils::prost::Message;
+use zellij_utils::errors::prelude::*;
+
+use crate::{
+ thread_bus::ThreadSenders,
+ ClientId
+};
+
+#[derive(Debug, Clone)]
+pub enum PipeStateChange {
+ NoChange,
+ Block,
+ Unblock
+}
+
+#[derive(Debug, Clone, Default)]
+pub struct PendingPipes {
+ pipes: HashMap<String, PendingPipeInfo>
+}
+
+impl PendingPipes {
+ pub fn mark_being_processed(&mut self, pipe_id: &str, plugin_id: &PluginId, client_id: &ClientId) {
+ if self.pipes.contains_key(pipe_id) {
+ self.pipes.get_mut(pipe_id).map(|pending_pipe_info| pending_pipe_info.add_processing_plugin(plugin_id, client_id));
+ } else {
+ self.pipes.insert(pipe_id.to_owned(), PendingPipeInfo::new(plugin_id, client_id));
+ }
+ }
+ // returns a list of pipes that are no longer pending and should be unblocked
+ pub fn update_pipe_state_change(&mut self, cli_pipe_name: &str, pipe_state_change: PipeStateChange, plugin_id: &PluginId, client_id: &ClientId) -> Vec<String> {
+ let mut pipe_names_to_unblock = vec![];
+ match self.pipes.get_mut(cli_pipe_name) {
+ Some(pending_pipe_info) => {
+ let should_unblock_this_pipe = pending_pipe_info.update_state_change(pipe_state_change, plugin_id, client_id);
+ if should_unblock_this_pipe {
+ pipe_names_to_unblock.push(cli_pipe_name.to_owned());
+ }
+ }
+ None => {
+ // state somehow corrupted, let's recover...
+ pipe_names_to_unblock.push(cli_pipe_name.to_owned());
+ }
+ }
+ for pipe_name in &pipe_names_to_unblock {
+ self.pipes.remove(pipe_name);
+ }
+ pipe_names_to_unblock
+ }
+ // returns a list of pipes that are no longer pending and should be unblocked
+ pub fn unload_plugin(&mut self, plugin_id: &PluginId) -> Vec<String> {
+ let mut pipe_names_to_unblock = vec![];
+ for (pipe_name, pending_pipe_info) in self.pipes.iter_mut() {
+ let should_unblock_this_pipe = pending_pipe_info.unload_plugin(plugin_id);
+ if should_unblock_this_pipe {
+ pipe_names_to_unblock.push(pipe_name.to_owned());
+ }
+ }
+ for pipe_name in &pipe_names_to_unblock {
+ self.pipes.remove(pipe_name);
+ }
+ pipe_names_to_unblock
+
+ }
+}
+
+#[derive(Debug, Clone, Default)]
+pub struct PendingPipeInfo {
+ is_explicitly_blocked: bool,
+ currently_being_processed_by: HashSet<(PluginId, ClientId)>
+}
+
+impl PendingPipeInfo {
+ pub fn new(plugin_id: &PluginId, client_id: &ClientId) -> Self {
+ let mut currently_being_processed_by = HashSet::new();
+ currently_being_processed_by.insert((*plugin_id, *client_id));
+ PendingPipeInfo {
+ currently_being_processed_by,
+ ..Default::default()
+ }
+ }
+ pub fn add_processing_plugin(&mut self, plugin_id: &PluginId, client_id: &ClientId) {
+ self.currently_being_processed_by.insert((*plugin_id, *client_id));
+ }
+ // returns true if this pipe should be unblocked
+ pub fn update_state_change(&mut self, pipe_state_change: PipeStateChange, plugin_id: &PluginId, client_id: &ClientId) -> bool {
+ match pipe_state_change {
+ PipeStateChange::Block => {
+ self.is_explicitly_blocked = true;
+ },
+ PipeStateChange::Unblock => {
+ self.is_explicitly_blocked = false;
+ },
+ _ => {}
+ };
+ self.currently_being_processed_by.remove(&(*plugin_id, *client_id));
+ let pipe_should_be_unblocked = self.currently_being_processed_by.is_empty() && !self.is_explicitly_blocked;
+ pipe_should_be_unblocked
+ }
+ // returns true if this pipe should be unblocked
+ pub fn unload_plugin(&mut self, plugin_id_to_unload: &PluginId) -> bool {
+ self.currently_being_processed_by.retain(|(plugin_id, _)| plugin_id != plugin_id_to_unload);
+ if self.currently_being_processed_by.is_empty() && !self.is_explicitly_blocked {
+ true
+ } else {
+ false
+ }
+ }
+}
+
+pub fn apply_pipe_message_to_plugin(
+ plugin_id: PluginId,
+ client_id: ClientId,
+ running_plugin: &mut RunningPlugin,
+ pipe_message: &PipeMessage,
+ plugin_render_assets: &mut Vec<PluginRenderAsset>,
+ senders: &ThreadSenders,
+) -> Result<()> {
+ let instance = &running_plugin.instance;
+ let plugin_env = &running_plugin.plugin_env;
+ let rows = running_plugin.rows;
+ let columns = running_plugin.columns;
+
+ let err_context = || format!("Failed to apply event to plugin {plugin_id}");
+ let protobuf_pipe_message: ProtobufPipeMessage = pipe_message
+ .clone()
+ .try_into()
+ .map_err(|e| anyhow!("Failed to convert to protobuf: {:?}", e))?;
+ match instance.exports.get_function("pipe") {
+ Ok(pipe) => {
+ wasi_write_object(&plugin_env.wasi_env, &protobuf_pipe_message.encode_to_vec())
+ .with_context(err_context)?;
+ let pipe_return = pipe
+ .call(&mut running_plugin.store, &[])
+ .with_context(err_context)?;
+ let should_render = match pipe_return.get(0) {
+ Some(Value::I32(n)) => *n == 1,
+ _ => false,
+ };
+ if rows > 0 && columns > 0 && should_render {
+ let rendered_bytes = instance
+ .exports
+ .get_function("render")
+ .map_err(anyError::new)
+ .and_then(|render| {
+ render
+ .call(
+ &mut running_plugin.store,
+ &[Value::I32(rows as i32), Value::I32(columns as i32)],
+ )
+ .map_err(anyError::new)
+ })
+ .and_then(|_| wasi_read_string(&plugin_env.wasi_env))
+ .with_context(err_context)?;
+ let pipes_to_block_or_unblock = pipes_to_block_or_unblock(running_plugin, Some(&pipe_message.source));
+ let plugin_render_asset = PluginRenderAsset::new(plugin_id, client_id, rendered_bytes.as_bytes().to_vec())
+ .with_pipes(pipes_to_block_or_unblock);
+ plugin_render_assets.push(plugin_render_asset);
+ } else {
+ let pipes_to_block_or_unblock = pipes_to_block_or_unblock(running_plugin, Some(&pipe_message.source));
+ let plugin_render_asset = PluginRenderAsset::new(plugin_id, client_id, vec![])
+ .with_pipes(pipes_to_block_or_unblock);
+ let _ = senders
+ .send_to_plugin(PluginInstruction::UnblockCliPipes(vec![plugin_render_asset]))
+ .context("failed to unblock input pipe");
+ }
+ },
+ Err(_e) => {
+ // no-op, this is probably an old plugin that does not have this interface
+ // we don't log this error because if we do the logs will be super crowded
+ let pipes_to_block_or_unblock = pipes_to_block_or_unblock(running_plugin, Some(&pipe_message.source));
+ let plugin_render_asset = PluginRenderAsset::new(
+ plugin_id,
+ client_id,
+ vec![], // nothing to render
+ ).with_pipes(pipes_to_block_or_unblock);
+ let _ = senders
+ .send_to_plugin(PluginInstruction::UnblockCliPipes(vec![plugin_render_asset]))
+ .context("failed to unblock input pipe");
+ },
+ }
+ Ok(())
+}
+
+pub fn pipes_to_block_or_unblock(running_plugin: &mut RunningPlugin, current_pipe: Option<&PipeSource>) -> HashMap<String, PipeStateChange> {
+ let mut pipe_state_changes = HashMap::new();
+ let mut input_pipes_to_unblock: HashSet<String> = running_plugin
+ .plugin_env
+ .input_pipes_to_unblock
+ .lock()
+ .unwrap()
+ .drain()
+ .collect();
+ let mut input_pipes_to_block: HashSet<String> = running_plugin
+ .plugin_env
+ .input_pipes_to_block
+ .lock()
+ .unwrap()
+ .drain()
+ .collect();
+ if let Some(PipeSource::Cli(current_pipe)) = current_pipe {
+ pipe_state_changes.insert(current_pipe.to_owned(), PipeStateChange::NoChange);
+ }
+ for pipe in input_pipes_to_block.drain() {
+ pipe_state_changes.insert(pipe, PipeStateChange::Block);
+ }
+ for pipe in input_pipes_to_unblock.drain() {
+ // unblock has priority over block if they happened simultaneously
+ pipe_state_changes.insert(pipe, PipeStateChange::Unblock);
+ }
+ pipe_state_changes
+}
diff --git a/zellij-server/src/plugins/plugin_map.rs b/zellij-server/src/plugins/plugin_map.rs
index 9e96c1011..e03cf1728 100644
--- a/zellij-server/src/plugins/plugin_map.rs
+++ b/zellij-server/src/plugins/plugin_map.rs
@@ -178,24 +178,6 @@ impl PluginMap {
}
Ok(plugin_ids)
}
- pub fn all_plugin_and_client_ids_for_plugin_location(
- &self,
- plugin_location: &RunPluginLocation,
- plugin_configuration: &PluginUserConfiguration,
- ) -> Vec<(PluginId, ClientId)> {
- self.plugin_assets
- .iter()
- .filter(|(_, (running_plugin, _subscriptions, _workers))| {
- let running_plugin = running_plugin.lock().unwrap();
- let running_plugin_location = &running_plugin.plugin_env.plugin.location;
- let running_plugin_configuration =
- &running_plugin.plugin_env.plugin.userspace_configuration;
- running_plugin_location == plugin_location
- && running_plugin_configuration == plugin_configuration
- })
- .map(|((plugin_id, client_id), _)| (*plugin_id, *client_id))
- .collect()
- }
pub fn clone_plugin_assets(
&self,
) -> HashMap<RunPluginLocation, HashMap<PluginUserConfiguration, Vec<(PluginId, ClientId)>>>
diff --git a/zellij-server/src/plugins/unit/plugin_tests.rs b/zellij-server/src/plugins/unit/plugin_tests.rs
index b8ea6d566..6791213c7 100644
--- a/zellij-server/src/plugins/unit/plugin_tests.rs
+++ b/zellij-server/src/plugins/unit/plugin_tests.rs
@@ -603,11 +603,14 @@ pub fn load_new_plugin_from_hd() {
.unwrap()
.iter()
.find_map(|i| {
- if let ScreenInstruction::PluginBytes(plugin_bytes, _) = i {
- for (plugin_id, client_id, plugin_bytes) in plugin_bytes {
- let plugin_bytes = String::from_utf8_lossy(plugin_bytes).to_string();
+ if let ScreenInstruction::PluginBytes(plugin_render_assets) = i {
+ for plugin_render_asset in plugin_render_assets {
+ let plugin_id = plugin_render_asset.plugin_id;
+ let client_id = plugin_render_asset.client_id;
+ let plugin_bytes = plugin_render_asset.bytes.clone();
+ let plugin_bytes = String::from_utf8_lossy(plugin_bytes.as_slice()).to_string();
if plugin_bytes.contains("InputReceived") {
- return Some((*plugin_id, *client_id, plugin_bytes));
+ return Some((plugin_id, client_id, plugin_bytes));
}
}
}
@@ -679,11 +682,14 @@ pub fn plugin_workers() {
.unwrap()
.iter()
.find_map(|i| {
- if let ScreenInstruction::PluginBytes(plugin_bytes, _) = i {
- for (plugin_id, client_id, plugin_bytes) in plugin_bytes {
- let plugin_bytes = String::from_utf8_lossy(plugin_bytes).to_string();
+ if let ScreenInstruction::PluginBytes(plugin_render_assets) = i {
+ for plugin_render_asset in plugin_render_assets {
+ let plugin_id = plugin_render_asset.plugin_id;
+ let client_id = plugin_render_asset.client_id;
+ let plugin_bytes = plugin_render_asset.bytes.clone();
+ let plugin_bytes = String::from_utf8_lossy(plugin_bytes.as_slice()).to_string();
if plugin_bytes.contains("Payload from worker") {
- return Some((*plugin_id, *client_id, plugin_bytes));
+ return Some((plugin_id, client_id, plugin_bytes));
}
}
}
@@ -764,11 +770,14 @@ pub fn plugin_workers_persist_state() {
.unwrap()
.iter()
.find_map(|i| {
- if let ScreenInstruction::PluginBytes(plugin_bytes, _) = i {
- for (plugin_id, client_id, plugin_bytes) in plugin_bytes {
- let plugin_bytes = String::from_utf8_lossy(plugin_bytes).to_string();
+ if let ScreenInstruction::PluginBytes(plugin_render_assets) = i {
+ for plugin_render_asset in plugin_render_assets {
+ let plugin_bytes = plugin_render_asset.bytes.clone();
+ let plugin_id = plugin_render_asset.plugin_id;
+ let client_id = plugin_render_asset.client_id;
+ let plugin_bytes = String::from_utf8_lossy(plugin_bytes.as_slice()).to_string();
if plugin_bytes.contains("received 2 messages") {
- return Some((*plugin_id, *client_id, plugin_bytes));
+ return Some((plugin_id, client_id, plugin_bytes));
}
}
}
@@ -838,11 +847,14 @@ pub fn can_subscribe_to_hd_events() {
.unwrap()
.iter()
.find_map(|i| {
- if let ScreenInstruction::PluginBytes(plugin_bytes, _) = i {
- for (plugin_id, client_id, plugin_bytes) in plugin_bytes {
- let plugin_bytes = String::from_utf8_lossy(plugin_bytes).to_string();
+ if let ScreenInstruction::PluginBytes(plugin_render_assets) = i {
+ for plugin_render_asset in plugin_render_assets {
+ let plugin_id = plugin_render_asset.plugin_id;
+ let client_id = plugin_render_asset.client_id;
+ let plugin_bytes = plugin_render_asset.bytes.clone();
+ let plugin_bytes = String::from_utf8_lossy(plugin_bytes.as_slice()).to_string();
if plugin_bytes.contains("FileSystemCreate") {
- return Some((*plugin_id, *client_id, plugin_bytes));
+ return Some((plugin_id, client_id, plugin_bytes));
}
}
}
@@ -5943,11 +5955,14 @@ pub fn pipe_message_to_plugin_plugin_command() {
.unwrap()
.iter()
.find_map(|i| {
- if let ScreenInstruction::PluginBytes(plugin_bytes, _) = i {
- for (plugin_id, client_id, plugin_bytes) in plugin_bytes {
- let plugin_bytes = String::from_utf8_lossy(plugin_bytes).to_string();
+ if let ScreenInstruction::PluginBytes(plugin_render_assets) = i {
+ for plugin_render_asset in plugin_render_assets {
+ let plugin_id = plugin_render_asset.plugin_id;
+ let client_id = plugin_render_asset.client_id;
+ let plugin_bytes = plugin_render_asset.bytes.clone();
+ let plugin_bytes = String::from_utf8_lossy(plugin_bytes.as_slice()).to_string();
if plugin_bytes.contains("Payload from self:") {
- return Some((*plugin_id, *client_id, plugin_bytes));
+ return Some((plugin_id, client_id, plugin_bytes));
}
}
}
diff --git a/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__block_input_plugin_command.snap b/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__block_input_plugin_command.snap
index cd8d854b6..2034c6a0a 100644
--- a/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__block_input_plugin_command.snap
+++ b/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__block_input_plugin_command.snap
@@ -1,15 +1,15 @@
---
source: zellij-server/src/plugins/./unit/plugin_tests.rs
-assertion_line: 5783
+assertion_line: 5812
expression: "format!(\"{:#?}\", plugin_bytes_events)"
---
Some(
PluginBytes(
[
- (
- 0,
- 1,
- [
+ PluginRenderAsset {
+ client_id: 1,
+ plugin_id: 0,
+ bytes: [
82,
111,
119,
@@ -53,10 +53,10 @@ Some(
10,
13,
],
- ),
+ cli_pipes: {
+ "input_pipe_id": Block,
+ },
+ },
],
- Some(
- {},
- ),
),
)
diff --git a/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__unblock_input_plugin_command.snap b/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__unblock_input_plugin_command.snap
index a36ea0518..ce6f0344a 100644
--- a/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__unblock_input_plugin_command.snap
+++ b/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__unblock_input_plugin_command.snap
@@ -1,15 +1,15 @@
---
source: zellij-server/src/plugins/./unit/plugin_tests.rs
-assertion_line: 5702
+assertion_line: 5730
expression: "format!(\"{:#?}\", plugin_bytes_events)"
---
Some(
PluginBytes(
[
- (
- 0,
- 1,
- [
+ PluginRenderAsset {
+ client_id: 1,
+ plugin_id: 0,
+ bytes: [
82,
111,
119,
@@ -53,12 +53,10 @@ Some(
10,
13,
],
- ),
- ],
- Some(
- {
- "input_pipe_id",
+ cli_pipes: {
+ "input_pipe_id": Unblock,
+ },
},
- ),
+ ],
),
)
diff --git a/zellij-server/src/plugins/wasm_bridge.rs b/zellij-server/src/plugins/wasm_bridge.rs
index 8b50ded0b..3dfd9aab4 100644
--- a/zellij-server/src/plugins/wasm_bridge.rs
+++ b/zellij-server/src/plugins/wasm_bridge.rs
@@ -1,5 +1,6 @@
use super::{PluginId, PluginInstruction};
use crate::plugins::plugin_loader::PluginLoader;
+use crate::plugins::pipes::{PipeStateChange, PendingPipes, apply_pipe_message_to_plugin, pipes_to_block_or_unblock};
use crate::plugins::plugin_map::{AtomicEvent, PluginEnv, PluginMap, RunningPlugin, Subscriptions};
use crate::plugins::plugin_worker::MessageToWorker;
use crate::plugins::watch_filesystem::watch_filesystem;
@@ -22,7 +23,6 @@ use zellij_utils::downloader::Downloader;
use zellij_utils::input::permission::PermissionCache;
use zellij_utils::notify_debouncer_full::{notify::RecommendedWatcher, Debouncer, FileIdMap};
use zellij_utils::plugin_api::event::ProtobufEvent;
-use zellij_utils::plugin_api::pipe_message::ProtobufPipeMessage;
use zellij_utils::prost::Message;
@@ -43,12 +43,35 @@ use zellij_utils::{
pane_size::Size,
};
-#[derive(Clone)]
+#[derive(Debug, Clone)]
pub enum EventOrPipeMessage {
Event(Event),
PipeMessage(PipeMessage),
}
+#[derive(Debug, Clone, Default)]
+pub struct PluginRenderAsset { // TODO: naming
+ pub client_id: ClientId,
+ pub plugin_id: PluginId,