diff options
author | Aram Drevekenin <aram@poor.dev> | 2024-01-16 09:49:16 +0100 |
---|---|---|
committer | Aram Drevekenin <aram@poor.dev> | 2024-01-16 09:49:16 +0100 |
commit | c3e81217b77df1d3de4fd2d136410baf5a4c8341 (patch) | |
tree | 130a9335366b324bec90c2917f414655f26fe13b | |
parent | 4915877f5791a7d18952c8c8161d58150eabfe81 (diff) |
fix(pipes): backpressure across multiple plugins
-rw-r--r-- | zellij-server/src/lib.rs | 19 | ||||
-rw-r--r-- | zellij-server/src/plugins/mod.rs | 13 | ||||
-rw-r--r-- | zellij-server/src/plugins/pipes.rs | 220 | ||||
-rw-r--r-- | zellij-server/src/plugins/plugin_map.rs | 18 | ||||
-rw-r--r-- | zellij-server/src/plugins/unit/plugin_tests.rs | 55 | ||||
-rw-r--r-- | zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__block_input_plugin_command.snap | 18 | ||||
-rw-r--r-- | zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__unblock_input_plugin_command.snap | 20 | ||||
-rw-r--r-- | zellij-server/src/plugins/wasm_bridge.rs | 234 | ||||
-rw-r--r-- | zellij-server/src/route.rs | 2 | ||||
-rw-r--r-- | zellij-server/src/screen.rs | 50 | ||||
-rw-r--r-- | zellij-server/src/tab/mod.rs | 1 | ||||
-rw-r--r-- | zellij-server/src/unit/screen_tests.rs | 2 | ||||
-rw-r--r-- | zellij-utils/src/errors.rs | 1 |
13 files changed, 414 insertions, 239 deletions
diff --git a/zellij-server/src/lib.rs b/zellij-server/src/lib.rs index ccf0ab4c3..437f465bd 100644 --- a/zellij-server/src/lib.rs +++ b/zellij-server/src/lib.rs @@ -68,8 +68,7 @@ pub enum ServerInstruction { ClientId, Option<PluginsConfig>, ), - Render(Option<HashMap<ClientId, String>>, Option<HashSet<String>>), // 2nd argument is - // input_pipes_to_unblock + Render(Option<HashMap<ClientId, String>>), UnblockInputThread, ClientExit(ClientId), RemoveClient(ClientId), @@ -228,7 +227,7 @@ impl SessionState { } pub fn remove_client(&mut self, client_id: ClientId) { self.clients.remove(&client_id); - self.pipes.retain(|p_id, c_id| c_id != &client_id); + self.pipes.retain(|_p_id, c_id| c_id != &client_id); } pub fn set_client_size(&mut self, client_id: ClientId, size: Size) { self.clients.insert(client_id, Some(size)); @@ -685,7 +684,7 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) { .unwrap(); } }, - ServerInstruction::Render(serialized_output, input_pipes_to_unblock) => { + ServerInstruction::Render(serialized_output) => { let client_ids = session_state.read().unwrap().client_ids(); // If `Some(_)`- unwrap it and forward it to the clients to render. // If `None`- Send an exit instruction. This is the case when a user closes the last Tab/Pane. @@ -701,18 +700,6 @@ pub fn start_server(mut os_input: Box<dyn ServerOsApi>, socket_path: PathBuf) { session_state ); } - if let Some(input_pipes_to_unblock) = input_pipes_to_unblock { - for pipe_name in input_pipes_to_unblock { - for client_id in session_state.read().unwrap().clients.keys() { - send_to_client!( - *client_id, - os_input, - ServerToClientMsg::UnblockCliPipeInput(pipe_name.clone()), - session_state - ); - } - } - } } else { for client_id in client_ids { let _ = os_input diff --git a/zellij-server/src/plugins/mod.rs b/zellij-server/src/plugins/mod.rs index 6d99e5818..fcdeec0be 100644 --- a/zellij-server/src/plugins/mod.rs +++ b/zellij-server/src/plugins/mod.rs @@ -2,6 +2,7 @@ mod plugin_loader; mod plugin_map; mod plugin_worker; mod wasm_bridge; +mod pipes; mod watch_filesystem; mod zellij_exports; use log::info; @@ -20,6 +21,7 @@ use crate::session_layout_metadata::SessionLayoutMetadata; use crate::{pty::PtyInstruction, thread_bus::Bus, ClientId, ServerInstruction}; use wasm_bridge::WasmBridge; +pub use wasm_bridge::PluginRenderAsset; use zellij_utils::{ async_std::{channel, future::timeout, task}, @@ -127,6 +129,7 @@ pub enum PluginInstruction { source_plugin_id: u32, message: MessageToPlugin, }, + UnblockCliPipes(Vec<PluginRenderAsset>), Exit, } @@ -161,6 +164,7 @@ impl From<&PluginInstruction> for PluginContext { PluginInstruction::CliPipe { .. } => PluginContext::CliPipe, PluginInstruction::CachePluginEvents { .. } => PluginContext::CachePluginEvents, PluginInstruction::MessageFromPlugin { .. } => PluginContext::MessageFromPlugin, + PluginInstruction::UnblockCliPipes { .. } => PluginContext::UnblockCliPipes, } } } @@ -562,6 +566,15 @@ pub(crate) fn plugin_thread_main( } wasm_bridge.pipe_messages(pipe_messages, shutdown_send.clone())?; }, + PluginInstruction::UnblockCliPipes(pipes_to_unblock) => { + let pipes_to_unblock = wasm_bridge.update_cli_pipe_state(pipes_to_unblock); + for pipe_name in pipes_to_unblock { + let _ = bus + .senders + .send_to_server(ServerInstruction::UnblockCliPipeInput(pipe_name)) + .context("failed to unblock input pipe"); + } + } PluginInstruction::Exit => { break; }, diff --git a/zellij-server/src/plugins/pipes.rs b/zellij-server/src/plugins/pipes.rs new file mode 100644 index 000000000..778523a23 --- /dev/null +++ b/zellij-server/src/plugins/pipes.rs @@ -0,0 +1,220 @@ +use super::{PluginId, PluginInstruction}; +use crate::plugins::wasm_bridge::PluginRenderAsset; +use crate::plugins::plugin_map::RunningPlugin; +use crate::plugins::zellij_exports::{wasi_read_string, wasi_write_object}; +use std::collections::{HashMap, HashSet}; +use wasmer::Value; +use zellij_utils::data::{PipeMessage, PipeSource}; +use zellij_utils::plugin_api::pipe_message::ProtobufPipeMessage; + +use zellij_utils::prost::Message; +use zellij_utils::errors::prelude::*; + +use crate::{ + thread_bus::ThreadSenders, + ClientId +}; + +#[derive(Debug, Clone)] +pub enum PipeStateChange { + NoChange, + Block, + Unblock +} + +#[derive(Debug, Clone, Default)] +pub struct PendingPipes { + pipes: HashMap<String, PendingPipeInfo> +} + +impl PendingPipes { + pub fn mark_being_processed(&mut self, pipe_id: &str, plugin_id: &PluginId, client_id: &ClientId) { + if self.pipes.contains_key(pipe_id) { + self.pipes.get_mut(pipe_id).map(|pending_pipe_info| pending_pipe_info.add_processing_plugin(plugin_id, client_id)); + } else { + self.pipes.insert(pipe_id.to_owned(), PendingPipeInfo::new(plugin_id, client_id)); + } + } + // returns a list of pipes that are no longer pending and should be unblocked + pub fn update_pipe_state_change(&mut self, cli_pipe_name: &str, pipe_state_change: PipeStateChange, plugin_id: &PluginId, client_id: &ClientId) -> Vec<String> { + let mut pipe_names_to_unblock = vec![]; + match self.pipes.get_mut(cli_pipe_name) { + Some(pending_pipe_info) => { + let should_unblock_this_pipe = pending_pipe_info.update_state_change(pipe_state_change, plugin_id, client_id); + if should_unblock_this_pipe { + pipe_names_to_unblock.push(cli_pipe_name.to_owned()); + } + } + None => { + // state somehow corrupted, let's recover... + pipe_names_to_unblock.push(cli_pipe_name.to_owned()); + } + } + for pipe_name in &pipe_names_to_unblock { + self.pipes.remove(pipe_name); + } + pipe_names_to_unblock + } + // returns a list of pipes that are no longer pending and should be unblocked + pub fn unload_plugin(&mut self, plugin_id: &PluginId) -> Vec<String> { + let mut pipe_names_to_unblock = vec![]; + for (pipe_name, pending_pipe_info) in self.pipes.iter_mut() { + let should_unblock_this_pipe = pending_pipe_info.unload_plugin(plugin_id); + if should_unblock_this_pipe { + pipe_names_to_unblock.push(pipe_name.to_owned()); + } + } + for pipe_name in &pipe_names_to_unblock { + self.pipes.remove(pipe_name); + } + pipe_names_to_unblock + + } +} + +#[derive(Debug, Clone, Default)] +pub struct PendingPipeInfo { + is_explicitly_blocked: bool, + currently_being_processed_by: HashSet<(PluginId, ClientId)> +} + +impl PendingPipeInfo { + pub fn new(plugin_id: &PluginId, client_id: &ClientId) -> Self { + let mut currently_being_processed_by = HashSet::new(); + currently_being_processed_by.insert((*plugin_id, *client_id)); + PendingPipeInfo { + currently_being_processed_by, + ..Default::default() + } + } + pub fn add_processing_plugin(&mut self, plugin_id: &PluginId, client_id: &ClientId) { + self.currently_being_processed_by.insert((*plugin_id, *client_id)); + } + // returns true if this pipe should be unblocked + pub fn update_state_change(&mut self, pipe_state_change: PipeStateChange, plugin_id: &PluginId, client_id: &ClientId) -> bool { + match pipe_state_change { + PipeStateChange::Block => { + self.is_explicitly_blocked = true; + }, + PipeStateChange::Unblock => { + self.is_explicitly_blocked = false; + }, + _ => {} + }; + self.currently_being_processed_by.remove(&(*plugin_id, *client_id)); + let pipe_should_be_unblocked = self.currently_being_processed_by.is_empty() && !self.is_explicitly_blocked; + pipe_should_be_unblocked + } + // returns true if this pipe should be unblocked + pub fn unload_plugin(&mut self, plugin_id_to_unload: &PluginId) -> bool { + self.currently_being_processed_by.retain(|(plugin_id, _)| plugin_id != plugin_id_to_unload); + if self.currently_being_processed_by.is_empty() && !self.is_explicitly_blocked { + true + } else { + false + } + } +} + +pub fn apply_pipe_message_to_plugin( + plugin_id: PluginId, + client_id: ClientId, + running_plugin: &mut RunningPlugin, + pipe_message: &PipeMessage, + plugin_render_assets: &mut Vec<PluginRenderAsset>, + senders: &ThreadSenders, +) -> Result<()> { + let instance = &running_plugin.instance; + let plugin_env = &running_plugin.plugin_env; + let rows = running_plugin.rows; + let columns = running_plugin.columns; + + let err_context = || format!("Failed to apply event to plugin {plugin_id}"); + let protobuf_pipe_message: ProtobufPipeMessage = pipe_message + .clone() + .try_into() + .map_err(|e| anyhow!("Failed to convert to protobuf: {:?}", e))?; + match instance.exports.get_function("pipe") { + Ok(pipe) => { + wasi_write_object(&plugin_env.wasi_env, &protobuf_pipe_message.encode_to_vec()) + .with_context(err_context)?; + let pipe_return = pipe + .call(&mut running_plugin.store, &[]) + .with_context(err_context)?; + let should_render = match pipe_return.get(0) { + Some(Value::I32(n)) => *n == 1, + _ => false, + }; + if rows > 0 && columns > 0 && should_render { + let rendered_bytes = instance + .exports + .get_function("render") + .map_err(anyError::new) + .and_then(|render| { + render + .call( + &mut running_plugin.store, + &[Value::I32(rows as i32), Value::I32(columns as i32)], + ) + .map_err(anyError::new) + }) + .and_then(|_| wasi_read_string(&plugin_env.wasi_env)) + .with_context(err_context)?; + let pipes_to_block_or_unblock = pipes_to_block_or_unblock(running_plugin, Some(&pipe_message.source)); + let plugin_render_asset = PluginRenderAsset::new(plugin_id, client_id, rendered_bytes.as_bytes().to_vec()) + .with_pipes(pipes_to_block_or_unblock); + plugin_render_assets.push(plugin_render_asset); + } else { + let pipes_to_block_or_unblock = pipes_to_block_or_unblock(running_plugin, Some(&pipe_message.source)); + let plugin_render_asset = PluginRenderAsset::new(plugin_id, client_id, vec![]) + .with_pipes(pipes_to_block_or_unblock); + let _ = senders + .send_to_plugin(PluginInstruction::UnblockCliPipes(vec![plugin_render_asset])) + .context("failed to unblock input pipe"); + } + }, + Err(_e) => { + // no-op, this is probably an old plugin that does not have this interface + // we don't log this error because if we do the logs will be super crowded + let pipes_to_block_or_unblock = pipes_to_block_or_unblock(running_plugin, Some(&pipe_message.source)); + let plugin_render_asset = PluginRenderAsset::new( + plugin_id, + client_id, + vec![], // nothing to render + ).with_pipes(pipes_to_block_or_unblock); + let _ = senders + .send_to_plugin(PluginInstruction::UnblockCliPipes(vec![plugin_render_asset])) + .context("failed to unblock input pipe"); + }, + } + Ok(()) +} + +pub fn pipes_to_block_or_unblock(running_plugin: &mut RunningPlugin, current_pipe: Option<&PipeSource>) -> HashMap<String, PipeStateChange> { + let mut pipe_state_changes = HashMap::new(); + let mut input_pipes_to_unblock: HashSet<String> = running_plugin + .plugin_env + .input_pipes_to_unblock + .lock() + .unwrap() + .drain() + .collect(); + let mut input_pipes_to_block: HashSet<String> = running_plugin + .plugin_env + .input_pipes_to_block + .lock() + .unwrap() + .drain() + .collect(); + if let Some(PipeSource::Cli(current_pipe)) = current_pipe { + pipe_state_changes.insert(current_pipe.to_owned(), PipeStateChange::NoChange); + } + for pipe in input_pipes_to_block.drain() { + pipe_state_changes.insert(pipe, PipeStateChange::Block); + } + for pipe in input_pipes_to_unblock.drain() { + // unblock has priority over block if they happened simultaneously + pipe_state_changes.insert(pipe, PipeStateChange::Unblock); + } + pipe_state_changes +} diff --git a/zellij-server/src/plugins/plugin_map.rs b/zellij-server/src/plugins/plugin_map.rs index 9e96c1011..e03cf1728 100644 --- a/zellij-server/src/plugins/plugin_map.rs +++ b/zellij-server/src/plugins/plugin_map.rs @@ -178,24 +178,6 @@ impl PluginMap { } Ok(plugin_ids) } - pub fn all_plugin_and_client_ids_for_plugin_location( - &self, - plugin_location: &RunPluginLocation, - plugin_configuration: &PluginUserConfiguration, - ) -> Vec<(PluginId, ClientId)> { - self.plugin_assets - .iter() - .filter(|(_, (running_plugin, _subscriptions, _workers))| { - let running_plugin = running_plugin.lock().unwrap(); - let running_plugin_location = &running_plugin.plugin_env.plugin.location; - let running_plugin_configuration = - &running_plugin.plugin_env.plugin.userspace_configuration; - running_plugin_location == plugin_location - && running_plugin_configuration == plugin_configuration - }) - .map(|((plugin_id, client_id), _)| (*plugin_id, *client_id)) - .collect() - } pub fn clone_plugin_assets( &self, ) -> HashMap<RunPluginLocation, HashMap<PluginUserConfiguration, Vec<(PluginId, ClientId)>>> diff --git a/zellij-server/src/plugins/unit/plugin_tests.rs b/zellij-server/src/plugins/unit/plugin_tests.rs index b8ea6d566..6791213c7 100644 --- a/zellij-server/src/plugins/unit/plugin_tests.rs +++ b/zellij-server/src/plugins/unit/plugin_tests.rs @@ -603,11 +603,14 @@ pub fn load_new_plugin_from_hd() { .unwrap() .iter() .find_map(|i| { - if let ScreenInstruction::PluginBytes(plugin_bytes, _) = i { - for (plugin_id, client_id, plugin_bytes) in plugin_bytes { - let plugin_bytes = String::from_utf8_lossy(plugin_bytes).to_string(); + if let ScreenInstruction::PluginBytes(plugin_render_assets) = i { + for plugin_render_asset in plugin_render_assets { + let plugin_id = plugin_render_asset.plugin_id; + let client_id = plugin_render_asset.client_id; + let plugin_bytes = plugin_render_asset.bytes.clone(); + let plugin_bytes = String::from_utf8_lossy(plugin_bytes.as_slice()).to_string(); if plugin_bytes.contains("InputReceived") { - return Some((*plugin_id, *client_id, plugin_bytes)); + return Some((plugin_id, client_id, plugin_bytes)); } } } @@ -679,11 +682,14 @@ pub fn plugin_workers() { .unwrap() .iter() .find_map(|i| { - if let ScreenInstruction::PluginBytes(plugin_bytes, _) = i { - for (plugin_id, client_id, plugin_bytes) in plugin_bytes { - let plugin_bytes = String::from_utf8_lossy(plugin_bytes).to_string(); + if let ScreenInstruction::PluginBytes(plugin_render_assets) = i { + for plugin_render_asset in plugin_render_assets { + let plugin_id = plugin_render_asset.plugin_id; + let client_id = plugin_render_asset.client_id; + let plugin_bytes = plugin_render_asset.bytes.clone(); + let plugin_bytes = String::from_utf8_lossy(plugin_bytes.as_slice()).to_string(); if plugin_bytes.contains("Payload from worker") { - return Some((*plugin_id, *client_id, plugin_bytes)); + return Some((plugin_id, client_id, plugin_bytes)); } } } @@ -764,11 +770,14 @@ pub fn plugin_workers_persist_state() { .unwrap() .iter() .find_map(|i| { - if let ScreenInstruction::PluginBytes(plugin_bytes, _) = i { - for (plugin_id, client_id, plugin_bytes) in plugin_bytes { - let plugin_bytes = String::from_utf8_lossy(plugin_bytes).to_string(); + if let ScreenInstruction::PluginBytes(plugin_render_assets) = i { + for plugin_render_asset in plugin_render_assets { + let plugin_bytes = plugin_render_asset.bytes.clone(); + let plugin_id = plugin_render_asset.plugin_id; + let client_id = plugin_render_asset.client_id; + let plugin_bytes = String::from_utf8_lossy(plugin_bytes.as_slice()).to_string(); if plugin_bytes.contains("received 2 messages") { - return Some((*plugin_id, *client_id, plugin_bytes)); + return Some((plugin_id, client_id, plugin_bytes)); } } } @@ -838,11 +847,14 @@ pub fn can_subscribe_to_hd_events() { .unwrap() .iter() .find_map(|i| { - if let ScreenInstruction::PluginBytes(plugin_bytes, _) = i { - for (plugin_id, client_id, plugin_bytes) in plugin_bytes { - let plugin_bytes = String::from_utf8_lossy(plugin_bytes).to_string(); + if let ScreenInstruction::PluginBytes(plugin_render_assets) = i { + for plugin_render_asset in plugin_render_assets { + let plugin_id = plugin_render_asset.plugin_id; + let client_id = plugin_render_asset.client_id; + let plugin_bytes = plugin_render_asset.bytes.clone(); + let plugin_bytes = String::from_utf8_lossy(plugin_bytes.as_slice()).to_string(); if plugin_bytes.contains("FileSystemCreate") { - return Some((*plugin_id, *client_id, plugin_bytes)); + return Some((plugin_id, client_id, plugin_bytes)); } } } @@ -5943,11 +5955,14 @@ pub fn pipe_message_to_plugin_plugin_command() { .unwrap() .iter() .find_map(|i| { - if let ScreenInstruction::PluginBytes(plugin_bytes, _) = i { - for (plugin_id, client_id, plugin_bytes) in plugin_bytes { - let plugin_bytes = String::from_utf8_lossy(plugin_bytes).to_string(); + if let ScreenInstruction::PluginBytes(plugin_render_assets) = i { + for plugin_render_asset in plugin_render_assets { + let plugin_id = plugin_render_asset.plugin_id; + let client_id = plugin_render_asset.client_id; + let plugin_bytes = plugin_render_asset.bytes.clone(); + let plugin_bytes = String::from_utf8_lossy(plugin_bytes.as_slice()).to_string(); if plugin_bytes.contains("Payload from self:") { - return Some((*plugin_id, *client_id, plugin_bytes)); + return Some((plugin_id, client_id, plugin_bytes)); } } } diff --git a/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__block_input_plugin_command.snap b/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__block_input_plugin_command.snap index cd8d854b6..2034c6a0a 100644 --- a/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__block_input_plugin_command.snap +++ b/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__block_input_plugin_command.snap @@ -1,15 +1,15 @@ --- source: zellij-server/src/plugins/./unit/plugin_tests.rs -assertion_line: 5783 +assertion_line: 5812 expression: "format!(\"{:#?}\", plugin_bytes_events)" --- Some( PluginBytes( [ - ( - 0, - 1, - [ + PluginRenderAsset { + client_id: 1, + plugin_id: 0, + bytes: [ 82, 111, 119, @@ -53,10 +53,10 @@ Some( 10, 13, ], - ), + cli_pipes: { + "input_pipe_id": Block, + }, + }, ], - Some( - {}, - ), ), ) diff --git a/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__unblock_input_plugin_command.snap b/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__unblock_input_plugin_command.snap index a36ea0518..ce6f0344a 100644 --- a/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__unblock_input_plugin_command.snap +++ b/zellij-server/src/plugins/unit/snapshots/zellij_server__plugins__plugin_tests__unblock_input_plugin_command.snap @@ -1,15 +1,15 @@ --- source: zellij-server/src/plugins/./unit/plugin_tests.rs -assertion_line: 5702 +assertion_line: 5730 expression: "format!(\"{:#?}\", plugin_bytes_events)" --- Some( PluginBytes( [ - ( - 0, - 1, - [ + PluginRenderAsset { + client_id: 1, + plugin_id: 0, + bytes: [ 82, 111, 119, @@ -53,12 +53,10 @@ Some( 10, 13, ], - ), - ], - Some( - { - "input_pipe_id", + cli_pipes: { + "input_pipe_id": Unblock, + }, }, - ), + ], ), ) diff --git a/zellij-server/src/plugins/wasm_bridge.rs b/zellij-server/src/plugins/wasm_bridge.rs index 8b50ded0b..3dfd9aab4 100644 --- a/zellij-server/src/plugins/wasm_bridge.rs +++ b/zellij-server/src/plugins/wasm_bridge.rs @@ -1,5 +1,6 @@ use super::{PluginId, PluginInstruction}; use crate::plugins::plugin_loader::PluginLoader; +use crate::plugins::pipes::{PipeStateChange, PendingPipes, apply_pipe_message_to_plugin, pipes_to_block_or_unblock}; use crate::plugins::plugin_map::{AtomicEvent, PluginEnv, PluginMap, RunningPlugin, Subscriptions}; use crate::plugins::plugin_worker::MessageToWorker; use crate::plugins::watch_filesystem::watch_filesystem; @@ -22,7 +23,6 @@ use zellij_utils::downloader::Downloader; use zellij_utils::input::permission::PermissionCache; use zellij_utils::notify_debouncer_full::{notify::RecommendedWatcher, Debouncer, FileIdMap}; use zellij_utils::plugin_api::event::ProtobufEvent; -use zellij_utils::plugin_api::pipe_message::ProtobufPipeMessage; use zellij_utils::prost::Message; @@ -43,12 +43,35 @@ use zellij_utils::{ pane_size::Size, }; -#[derive(Clone)] +#[derive(Debug, Clone)] pub enum EventOrPipeMessage { Event(Event), PipeMessage(PipeMessage), } +#[derive(Debug, Clone, Default)] +pub struct PluginRenderAsset { // TODO: naming + pub client_id: ClientId, + pub plugin_id: PluginId, |