diff options
Diffstat (limited to 'zellij-server')
11 files changed, 1031 insertions, 169 deletions
diff --git a/zellij-server/Cargo.toml b/zellij-server/Cargo.toml index 0fa9fe1e3..725b36fce 100644 --- a/zellij-server/Cargo.toml +++ b/zellij-server/Cargo.toml @@ -35,6 +35,8 @@ semver = "0.11.0" [dev-dependencies] insta = "1.6.0" +tempfile = "3.2.0" +wasmer = { version = "2.3.0", features = [ "singlepass" ] } [features] singlepass = ["wasmer/singlepass"] diff --git a/zellij-server/src/logging_pipe.rs b/zellij-server/src/logging_pipe.rs index 6081b283f..4f35db4fb 100644 --- a/zellij-server/src/logging_pipe.rs +++ b/zellij-server/src/logging_pipe.rs @@ -3,6 +3,7 @@ use std::{ io::{Read, Seek, Write}, }; +use crate::plugins::PluginId; use log::{debug, error}; use wasmer_wasi::{WasiFile, WasiFsError}; use zellij_utils::{errors::prelude::*, serde}; @@ -17,11 +18,11 @@ const ZELLIJ_MAX_PIPE_BUFFER_SIZE: usize = 16_384; pub struct LoggingPipe { buffer: VecDeque<u8>, plugin_name: String, - plugin_id: u32, + plugin_id: PluginId, } impl LoggingPipe { - pub fn new(plugin_name: &str, plugin_id: u32) -> LoggingPipe { + pub fn new(plugin_name: &str, plugin_id: PluginId) -> LoggingPipe { LoggingPipe { buffer: VecDeque::new(), plugin_name: String::from(plugin_name), diff --git a/zellij-server/src/plugins/mod.rs b/zellij-server/src/plugins/mod.rs index aaf6164b3..9384b3f62 100644 --- a/zellij-server/src/plugins/mod.rs +++ b/zellij-server/src/plugins/mod.rs @@ -22,6 +22,8 @@ use zellij_utils::{ pane_size::Size, }; +pub type PluginId = u32; + #[derive(Clone, Debug)] pub enum PluginInstruction { Load( @@ -32,8 +34,8 @@ pub enum PluginInstruction { ClientId, Size, ), - Update(Vec<(Option<u32>, Option<ClientId>, Event)>), // Focused plugin / broadcast, client_id, event data - Unload(u32), // plugin_id + Update(Vec<(Option<PluginId>, Option<ClientId>, Event)>), // Focused plugin / broadcast, client_id, event data + Unload(PluginId), // plugin_id Reload( Option<bool>, // should float Option<String>, // pane title @@ -41,7 +43,7 @@ pub enum PluginInstruction { usize, // tab index Size, ), - Resize(u32, usize, usize), // plugin_id, columns, rows + Resize(PluginId, usize, usize), // plugin_id, columns, rows AddClient(ClientId), RemoveClient(ClientId), NewTab( @@ -52,7 +54,23 @@ pub enum PluginInstruction { usize, // tab_index ClientId, ), - ApplyCachedEvents(Vec<u32>), // a list of plugin id + ApplyCachedEvents(Vec<PluginId>), + ApplyCachedWorkerMessages(PluginId), + PostMessagesToPluginWorker( + PluginId, + ClientId, + String, // worker name + Vec<( + String, // serialized message name + String, // serialized payload + )>, + ), + PostMessageToPlugin( + PluginId, + ClientId, + String, // serialized message + String, // serialized payload + ), Exit, } @@ -69,6 +87,13 @@ impl From<&PluginInstruction> for PluginContext { PluginInstruction::RemoveClient(_) => PluginContext::RemoveClient, PluginInstruction::NewTab(..) => PluginContext::NewTab, PluginInstruction::ApplyCachedEvents(..) => PluginContext::ApplyCachedEvents, + PluginInstruction::ApplyCachedWorkerMessages(..) => { + PluginContext::ApplyCachedWorkerMessages + }, + PluginInstruction::PostMessagesToPluginWorker(..) => { + PluginContext::PostMessageToPluginWorker + }, + PluginInstruction::PostMessageToPlugin(..) => PluginContext::PostMessageToPlugin, } } } @@ -163,7 +188,7 @@ pub(crate) fn plugin_thread_main( tab_index, client_id, ) => { - let mut plugin_ids: HashMap<RunPluginLocation, Vec<u32>> = HashMap::new(); + let mut plugin_ids: HashMap<RunPluginLocation, Vec<PluginId>> = HashMap::new(); let mut extracted_run_instructions = tab_layout .clone() .unwrap_or_else(|| layout.new_tab().0) @@ -199,6 +224,30 @@ pub(crate) fn plugin_thread_main( PluginInstruction::ApplyCachedEvents(plugin_id) => { wasm_bridge.apply_cached_events(plugin_id)?; }, + PluginInstruction::ApplyCachedWorkerMessages(plugin_id) => { + wasm_bridge.apply_cached_worker_messages(plugin_id)?; + }, + PluginInstruction::PostMessagesToPluginWorker( + plugin_id, + client_id, + worker_name, + messages, + ) => { + wasm_bridge.post_messages_to_plugin_worker( + plugin_id, + client_id, + worker_name, + messages, + )?; + }, + PluginInstruction::PostMessageToPlugin(plugin_id, client_id, message, payload) => { + let updates = vec![( + Some(plugin_id), + Some(client_id), + Event::CustomMessage(message, payload), + )]; + wasm_bridge.update_plugins(updates)?; + }, PluginInstruction::Exit => { wasm_bridge.cleanup(); break; @@ -218,3 +267,7 @@ pub(crate) fn plugin_thread_main( }) .context("failed to cleanup plugin data directory") } + +#[path = "./unit/plugin_tests.rs"] +#[cfg(test)] +mod plugin_tests; diff --git a/zellij-server/src/plugins/plugin_loader.rs b/zellij-server/src/plugins/plugin_loader.rs index 86afd8ca5..91eb6c93e 100644 --- a/zellij-server/src/plugins/plugin_loader.rs +++ b/zellij-server/src/plugins/plugin_loader.rs @@ -1,5 +1,8 @@ -use crate::plugins::plugin_map::{PluginEnv, PluginMap, RunningPlugin, Subscriptions}; +use crate::plugins::plugin_map::{ + PluginEnv, PluginMap, RunningPlugin, RunningWorker, Subscriptions, +}; use crate::plugins::zellij_exports::{wasi_read_string, zellij_exports}; +use crate::plugins::PluginId; use highway::{HighwayHash, PortableHash}; use log::info; use semver::Version; @@ -19,7 +22,7 @@ use crate::{ }; use zellij_utils::{ - consts::{VERSION, ZELLIJ_CACHE_DIR, ZELLIJ_TMP_DIR}, + consts::{VERSION, ZELLIJ_CACHE_DIR, ZELLIJ_SESSION_CACHE_DIR, ZELLIJ_TMP_DIR}, errors::prelude::*, input::plugins::PluginConfig, pane_size::Size, @@ -152,7 +155,7 @@ pub struct PluginLoader<'a> { plugin_path: PathBuf, loading_indication: &'a mut LoadingIndication, senders: ThreadSenders, - plugin_id: u32, + plugin_id: PluginId, client_id: ClientId, store: Store, plugin: PluginConfig, @@ -165,7 +168,7 @@ pub struct PluginLoader<'a> { impl<'a> PluginLoader<'a> { pub fn reload_plugin_from_memory( - plugin_id: u32, + plugin_id: PluginId, plugin_dir: PathBuf, plugin_cache: Arc<Mutex<HashMap<PathBuf, Module>>>, senders: ThreadSenders, @@ -194,9 +197,7 @@ impl<'a> PluginLoader<'a> { )?; plugin_loader .load_module_from_memory() - .and_then(|module| { - plugin_loader.create_plugin_instance_environment_and_subscriptions(module) - }) + .and_then(|module| plugin_loader.create_plugin_environment(module)) .and_then(|(instance, plugin_env, subscriptions)| { plugin_loader.load_plugin_instance( &instance, @@ -214,7 +215,7 @@ impl<'a> PluginLoader<'a> { } pub fn start_plugin( - plugin_id: u32, + plugin_id: PluginId, client_id: ClientId, plugin: &PluginConfig, tab_index: usize, @@ -227,7 +228,7 @@ impl<'a> PluginLoader<'a> { connected_clients: Arc<Mutex<Vec<ClientId>>>, loading_indication: &mut LoadingIndication, ) -> Result<()> { - let err_context = || format!("failed to start plugin {plugin:#?} for client {client_id}"); + let err_context = || format!("failed to start plugin {plugin_id} for client {client_id}"); let mut plugin_loader = PluginLoader::new( &plugin_cache, loading_indication, @@ -244,9 +245,7 @@ impl<'a> PluginLoader<'a> { .load_module_from_memory() .or_else(|_e| plugin_loader.load_module_from_hd_cache()) .or_else(|_e| plugin_loader.compile_module()) - .and_then(|module| { - plugin_loader.create_plugin_instance_environment_and_subscriptions(module) - }) + .and_then(|module| plugin_loader.create_plugin_environment(module)) .and_then(|(instance, plugin_env, subscriptions)| { plugin_loader.load_plugin_instance( &instance, @@ -276,7 +275,7 @@ impl<'a> PluginLoader<'a> { loading_indication: &mut LoadingIndication, ) -> Result<()> { let mut new_plugins = HashSet::new(); - for (&(plugin_id, _), _) in &*plugin_map.lock().unwrap() { + for plugin_id in plugin_map.lock().unwrap().plugin_ids() { new_plugins.insert((plugin_id, client_id)); } for (plugin_id, existing_client_id) in new_plugins { @@ -292,9 +291,7 @@ impl<'a> PluginLoader<'a> { )?; plugin_loader .load_module_from_memory() - .and_then(|module| { - plugin_loader.create_plugin_instance_environment_and_subscriptions(module) - }) + .and_then(|module| plugin_loader.create_plugin_environment(module)) .and_then(|(instance, plugin_env, subscriptions)| { plugin_loader.load_plugin_instance( &instance, @@ -309,7 +306,7 @@ impl<'a> PluginLoader<'a> { } pub fn reload_plugin( - plugin_id: u32, + plugin_id: PluginId, plugin_dir: PathBuf, plugin_cache: Arc<Mutex<HashMap<PathBuf, Module>>>, senders: ThreadSenders, @@ -339,9 +336,7 @@ impl<'a> PluginLoader<'a> { )?; plugin_loader .compile_module() - .and_then(|module| { - plugin_loader.create_plugin_instance_environment_and_subscriptions(module) - }) + .and_then(|module| plugin_loader.create_plugin_environment(module)) .and_then(|(instance, plugin_env, subscriptions)| { plugin_loader.load_plugin_instance( &instance, @@ -361,7 +356,7 @@ impl<'a> PluginLoader<'a> { plugin_cache: &Arc<Mutex<HashMap<PathBuf, Module>>>, loading_indication: &'a mut LoadingIndication, senders: &ThreadSenders, - plugin_id: u32, + plugin_id: PluginId, client_id: ClientId, store: &Store, plugin: PluginConfig, @@ -369,7 +364,9 @@ impl<'a> PluginLoader<'a> { tab_index: usize, size: Size, ) -> Result<Self> { - let plugin_own_data_dir = ZELLIJ_CACHE_DIR.join(Url::from(&plugin.location).to_string()); + let plugin_own_data_dir = ZELLIJ_SESSION_CACHE_DIR + .join(Url::from(&plugin.location).to_string()) + .join(format!("{}-{}", plugin_id, client_id)); create_plugin_fs_entries(&plugin_own_data_dir)?; let plugin_path = plugin.path.clone(); Ok(PluginLoader { @@ -393,16 +390,16 @@ impl<'a> PluginLoader<'a> { plugin_map: &Arc<Mutex<PluginMap>>, loading_indication: &'a mut LoadingIndication, senders: &ThreadSenders, - plugin_id: u32, + plugin_id: PluginId, client_id: ClientId, store: &Store, plugin_dir: &'a PathBuf, ) -> Result<Self> { let err_context = || "Failed to find existing plugin"; - let (running_plugin, _subscriptions) = { + let (running_plugin, _subscriptions, _workers) = { let mut plugin_map = plugin_map.lock().unwrap(); plugin_map - .remove(&(plugin_id, client_id)) + .remove_single_plugin(plugin_id, client_id) .with_context(err_context)? }; let running_plugin = running_plugin.lock().unwrap(); @@ -431,19 +428,17 @@ impl<'a> PluginLoader<'a> { plugin_map: &Arc<Mutex<PluginMap>>, loading_indication: &'a mut LoadingIndication, senders: &ThreadSenders, - plugin_id: u32, + plugin_id: PluginId, client_id: ClientId, store: &Store, plugin_dir: &'a PathBuf, ) -> Result<Self> { let err_context = || "Failed to find existing plugin"; - let (running_plugin, _subscriptions) = { + let running_plugin = { let plugin_map = plugin_map.lock().unwrap(); plugin_map - .iter() - .find(|((p_id, _c_id), _)| p_id == &plugin_id) + .get_running_plugin(plugin_id, None) .with_context(err_context)? - .1 .clone() }; let running_plugin = running_plugin.lock().unwrap(); @@ -553,49 +548,13 @@ impl<'a> PluginLoader<'a> { .with_context(err_context)?; Ok(module) } - pub fn create_plugin_instance_environment_and_subscriptions( + pub fn create_plugin_environment( &mut self, module: Module, ) -> Result<(Instance, PluginEnv, Arc<Mutex<Subscriptions>>)> { - let err_context = || { - format!( - "Failed to create instance and plugin env for plugin {}", - self.plugin_id - ) - }; - let mut wasi_env = WasiState::new("Zellij") - .env("CLICOLOR_FORCE", "1") - .map_dir("/host", ".") - .and_then(|wasi| wasi.map_dir("/data", &self.plugin_own_data_dir)) - .and_then(|wasi| wasi.map_dir("/tmp", ZELLIJ_TMP_DIR.as_path())) - .and_then(|wasi| { - wasi.stdin(Box::new(Pipe::new())) - .stdout(Box::new(Pipe::new())) - .stderr(Box::new(LoggingPipe::new( - &self.plugin.location.to_string(), - self.plugin_id, - ))) - .finalize() - }) - .with_context(err_context)?; - let wasi = wasi_env.import_object(&module).with_context(err_context)?; - - let mut mut_plugin = self.plugin.clone(); - mut_plugin.set_tab_index(self.tab_index); - let plugin_env = PluginEnv { - plugin_id: self.plugin_id, - client_id: self.client_id, - plugin: mut_plugin, - senders: self.senders.clone(), - wasi_env, - plugin_own_data_dir: self.plugin_own_data_dir.clone(), - tab_index: self.tab_index, - }; - - let subscriptions = Arc::new(Mutex::new(HashSet::new())); - let zellij = zellij_exports(&self.store, &plugin_env, &subscriptions); - let instance = - Instance::new(&module, &zellij.chain_back(wasi)).with_context(err_context)?; + let err_context = || format!("Failed to create environment for plugin"); + let (instance, plugin_env, subscriptions) = + self.create_plugin_instance_env_and_subscriptions(&module)?; assert_plugin_version(&instance, &plugin_env).with_context(err_context)?; // Only do an insert when everything went well! let cloned_plugin = self.plugin.clone(); @@ -605,6 +564,26 @@ impl<'a> PluginLoader<'a> { .insert(cloned_plugin.path, module); Ok((instance, plugin_env, subscriptions)) } + pub fn create_plugin_instance_and_wasi_env_for_worker( + &mut self, + ) -> Result<(Instance, PluginEnv)> { + let err_context = || { + format!( + "Failed to create instance and plugin env for worker {}", + self.plugin_id + ) + }; + let module = self + .plugin_cache + .lock() + .unwrap() + .get(&self.plugin.path) + .with_context(err_context)? + .clone(); + let (instance, plugin_env, _subscriptions) = + self.create_plugin_instance_env_and_subscriptions(&module)?; + Ok((instance, plugin_env)) + } pub fn load_plugin_instance( &mut self, instance: &Instance, @@ -621,11 +600,35 @@ impl<'a> PluginLoader<'a> { self.senders, self.plugin_id ); - let load_function = instance + let start_function = instance .exports .get_function("_start") .with_context(err_context)?; - // This eventually calls the `.load()` method + let load_function = instance + .exports + .get_function("load") + .with_context(err_context)?; + let mut workers = HashMap::new(); + for (function_name, _exported_function) in instance.exports.iter().functions() { + if function_name.ends_with("_worker") { + let plugin_config = self.plugin.clone(); + let (instance, plugin_env) = + self.create_plugin_instance_and_wasi_env_for_worker()?; + + let start_function_for_worker = instance + .exports + .get_function("_start") + .with_context(err_context)?; + start_function_for_worker + .call(&[]) + .with_context(err_context)?; + + let worker = + RunningWorker::new(instance, &function_name, plugin_config, plugin_env); + workers.insert(function_name.into(), Arc::new(Mutex::new(worker))); + } + } + start_function.call(&[]).with_context(err_context)?; load_function.call(&[]).with_context(err_context)?; display_loading_stage!( indicate_starting_plugin_success, @@ -640,16 +643,16 @@ impl<'a> PluginLoader<'a> { self.plugin_id ); plugin_map.lock().unwrap().insert( - (self.plugin_id, self.client_id), - ( - Arc::new(Mutex::new(RunningPlugin::new( - main_user_instance, - main_user_env, - self.size.rows, - self.size.cols, - ))), - subscriptions.clone(), - ), + self.plugin_id, + self.client_id, + Arc::new(Mutex::new(RunningPlugin::new( + main_user_instance, + main_user_env, + self.size.rows, + self.size.cols, + ))), + subscriptions.clone(), + workers, ); display_loading_stage!( indicate_writing_plugin_to_cache_success, @@ -672,6 +675,10 @@ impl<'a> PluginLoader<'a> { self.plugin_id ); for client_id in connected_clients { + if client_id == &self.client_id { + // don't reload the plugin once more for ourselves + continue; + } let mut loading_indication = LoadingIndication::new("".into()); let mut plugin_loader_for_client = PluginLoader::new_from_different_client_id( &self.plugin_cache.clone(), @@ -685,10 +692,7 @@ impl<'a> PluginLoader<'a> { )?; plugin_loader_for_client .load_module_from_memory() - .and_then(|module| { - plugin_loader_for_client - .create_plugin_instance_environment_and_subscriptions(module) - }) + .and_then(|module| plugin_loader_for_client.create_plugin_environment(module)) .and_then(|(instance, plugin_env, subscriptions)| { plugin_loader_for_client.load_plugin_instance( &instance, @@ -730,6 +734,51 @@ impl<'a> PluginLoader<'a> { }, } } + fn create_plugin_instance_env_and_subscriptions( + &self, + module: &Module, + ) -> Result<(Instance, PluginEnv, Arc<Mutex<Subscriptions>>)> { + let err_context = || { + format!( + "Failed to create instance, plugin env and subscriptions for plugin {}", + self.plugin_id + ) + }; + let mut wasi_env = WasiState::new("Zellij") + .env("CLICOLOR_FORCE", "1") + .map_dir("/host", ".") + .and_then(|wasi| wasi.map_dir("/data", &self.plugin_own_data_dir)) + .and_then(|wasi| wasi.map_dir("/tmp", ZELLIJ_TMP_DIR.as_path())) + .and_then(|wasi| { + wasi.stdin(Box::new(Pipe::new())) + .stdout(Box::new(Pipe::new())) + .stderr(Box::new(LoggingPipe::new( + &self.plugin.location.to_string(), + self.plugin_id, + ))) + .finalize() + }) + .with_context(err_context)?; + let wasi = wasi_env.import_object(&module).with_context(err_context)?; + + let mut mut_plugin = self.plugin.clone(); + mut_plugin.set_tab_index(self.tab_index); + let plugin_env = PluginEnv { + plugin_id: self.plugin_id, + client_id: self.client_id, + plugin: mut_plugin, + senders: self.senders.clone(), + wasi_env, + plugin_own_data_dir: self.plugin_own_data_dir.clone(), + tab_index: self.tab_index, + }; + + let subscriptions = Arc::new(Mutex::new(HashSet::new())); + let zellij = zellij_exports(&self.store, &plugin_env, &subscriptions); + let instance = + Instance::new(&module, &zellij.chain_back(wasi)).with_context(err_context)?; + Ok((instance, plugin_env, subscriptions)) + } } fn create_plugin_fs_entries(plugin_own_data_dir: &PathBuf) -> Result<()> { diff --git a/zellij-server/src/plugins/plugin_map.rs b/zellij-server/src/plugins/plugin_map.rs index 4cb5ebc9d..0c3df931e 100644 --- a/zellij-server/src/plugins/plugin_map.rs +++ b/zellij-server/src/plugins/plugin_map.rs @@ -1,4 +1,6 @@ -use crate::plugins::wasm_bridge::PluginId; +use crate::plugins::plugin_loader::{PluginLoader, VersionMismatchError}; +use crate::plugins::zellij_exports::wasi_write_object; +use crate::plugins::PluginId; use std::{ collections::{HashMap, HashSet}, path::PathBuf, @@ -9,20 +11,183 @@ use wasmer_wasi::WasiEnv; use crate::{thread_bus::ThreadSenders, ClientId}; -use zellij_utils::{data::EventType, input::plugins::PluginConfig}; +use zellij_utils::errors::prelude::*; +use zellij_utils::{ + consts::VERSION, data::EventType, input::layout::RunPluginLocation, + input::plugins::PluginConfig, +}; // the idea here is to provide atomicity when adding/removing plugins from the map (eg. when a new // client connects) but to also allow updates/renders not to block each other // so when adding/removing from the map - everything is halted, that's life // but when cloning the internal RunningPlugin and Subscriptions atomics, we can call methods on // them without blocking other instances -pub type PluginMap = - HashMap<(PluginId, ClientId), (Arc<Mutex<RunningPlugin>>, Arc<Mutex<Subscriptions>>)>; +#[derive(Default)] +pub struct PluginMap { + plugin_assets: HashMap< + (PluginId, ClientId), + ( + Arc<Mutex<RunningPlugin>>, + Arc<Mutex<Subscriptions>>, + HashMap<String, Arc<Mutex<RunningWorker>>>, + ), + >, +} + +impl PluginMap { + pub fn remove_plugins( + &mut self, + pid: PluginId, + ) -> Vec& |