diff options
author | Aram Drevekenin <aram@poor.dev> | 2023-04-28 15:26:39 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-04-28 15:26:39 +0200 |
commit | 1289643f89b8ea42a8adcfbbcee34d3dbf5e7176 (patch) | |
tree | 11f49470d3e9d9db16f2537b91416c6eca7490da /zellij-server | |
parent | a29c6533850d0543c2966d0743d3775d72657a15 (diff) |
feat(plugins): update and render plugins asynchronously (#2410)
* working-ish minus a few race conditions
* relax atomicity
* various refactoringz
* remove commented out code
* clarify some stuffs
* refactor(plugins): move PluginMap and friends to a different file
* refactor(plugins): move zellij_exports and friends to a different file
* style(fmt): rustfmt
* fix(e2e): adjust tests for flakiness async loading
Diffstat (limited to 'zellij-server')
-rw-r--r-- | zellij-server/src/plugins/mod.rs | 70 | ||||
-rw-r--r-- | zellij-server/src/plugins/plugin_loader.rs | 244 | ||||
-rw-r--r-- | zellij-server/src/plugins/plugin_map.rs | 94 | ||||
-rw-r--r-- | zellij-server/src/plugins/wasm_bridge.rs | 622 | ||||
-rw-r--r-- | zellij-server/src/plugins/zellij_exports.rs | 326 | ||||
-rw-r--r-- | zellij-server/src/route.rs | 1 | ||||
-rw-r--r-- | zellij-server/src/screen.rs | 9 |
7 files changed, 793 insertions, 573 deletions
diff --git a/zellij-server/src/plugins/mod.rs b/zellij-server/src/plugins/mod.rs index eeb19c201..aaf6164b3 100644 --- a/zellij-server/src/plugins/mod.rs +++ b/zellij-server/src/plugins/mod.rs @@ -1,5 +1,7 @@ mod plugin_loader; +mod plugin_map; mod wasm_bridge; +mod zellij_exports; use log::info; use std::{collections::HashMap, fs, path::PathBuf}; use wasmer::Store; @@ -37,7 +39,6 @@ pub enum PluginInstruction { Option<String>, // pane title RunPlugin, usize, // tab index - ClientId, Size, ), Resize(u32, usize, usize), // plugin_id, columns, rows @@ -91,7 +92,7 @@ pub(crate) fn plugin_thread_main( err_ctx.add_call(ContextType::Plugin((&event).into())); match event { PluginInstruction::Load(should_float, pane_title, run, tab_index, client_id, size) => { - match wasm_bridge.load_plugin(&run, tab_index, size, client_id) { + match wasm_bridge.load_plugin(&run, tab_index, size, Some(client_id)) { Ok(plugin_id) => { drop(bus.senders.send_to_screen(ScreenInstruction::AddPlugin( should_float, @@ -112,41 +113,38 @@ pub(crate) fn plugin_thread_main( PluginInstruction::Unload(pid) => { wasm_bridge.unload_plugin(pid)?; }, - PluginInstruction::Reload( - should_float, - pane_title, - run, - tab_index, - client_id, - size, - ) => match wasm_bridge.reload_plugin(&run) { - Ok(_) => { - let _ = bus - .senders - .send_to_server(ServerInstruction::UnblockInputThread); - }, - Err(err) => match err.downcast_ref::<ZellijError>() { - Some(ZellijError::PluginDoesNotExist) => { - log::warn!("Plugin {} not found, starting it instead", run.location); - match wasm_bridge.load_plugin(&run, tab_index, size, client_id) { - Ok(plugin_id) => { - drop(bus.senders.send_to_screen(ScreenInstruction::AddPlugin( - should_float, - run, - pane_title, - tab_index, - plugin_id, - ))); - }, - Err(e) => { - log::error!("Failed to load plugin: {e}"); - }, - }; + PluginInstruction::Reload(should_float, pane_title, run, tab_index, size) => { + match wasm_bridge.reload_plugin(&run) { + Ok(_) => { + let _ = bus + .senders + .send_to_server(ServerInstruction::UnblockInputThread); }, - _ => { - return Err(err); + Err(err) => match err.downcast_ref::<ZellijError>() { + Some(ZellijError::PluginDoesNotExist) => { + log::warn!("Plugin {} not found, starting it instead", run.location); + // we intentionally do not provide the client_id here because it belongs to + // the cli who spawned the command and is not an existing client_id + match wasm_bridge.load_plugin(&run, tab_index, size, None) { + Ok(plugin_id) => { + drop(bus.senders.send_to_screen(ScreenInstruction::AddPlugin( + should_float, + run, + pane_title, + tab_index, + plugin_id, + ))); + }, + Err(e) => { + log::error!("Failed to load plugin: {e}"); + }, + }; + }, + _ => { + return Err(err); + }, }, - }, + } }, PluginInstruction::Resize(pid, new_columns, new_rows) => { wasm_bridge.resize_plugin(pid, new_columns, new_rows)?; @@ -184,7 +182,7 @@ pub(crate) fn plugin_thread_main( for run_instruction in extracted_run_instructions { if let Some(Run::Plugin(run)) = run_instruction { let plugin_id = - wasm_bridge.load_plugin(&run, tab_index, size, client_id)?; + wasm_bridge.load_plugin(&run, tab_index, size, Some(client_id))?; plugin_ids.entry(run.location).or_default().push(plugin_id); } } diff --git a/zellij-server/src/plugins/plugin_loader.rs b/zellij-server/src/plugins/plugin_loader.rs index c54133889..86afd8ca5 100644 --- a/zellij-server/src/plugins/plugin_loader.rs +++ b/zellij-server/src/plugins/plugin_loader.rs @@ -1,4 +1,5 @@ -use crate::plugins::wasm_bridge::{wasi_read_string, zellij_exports, PluginEnv, PluginMap}; +use crate::plugins::plugin_map::{PluginEnv, PluginMap, RunningPlugin, Subscriptions}; +use crate::plugins::zellij_exports::{wasi_read_string, zellij_exports}; use highway::{HighwayHash, PortableHash}; use log::info; use semver::Version; @@ -146,21 +147,8 @@ fn assert_plugin_version(instance: &Instance, plugin_env: &PluginEnv) -> Result< Ok(()) } -fn load_plugin_instance(instance: &mut Instance) -> Result<()> { - let err_context = || format!("failed to load plugin from instance {instance:#?}"); - - let load_function = instance - .exports - .get_function("_start") - .with_context(err_context)?; - // This eventually calls the `.load()` method - load_function.call(&[]).with_context(err_context)?; - Ok(()) -} - pub struct PluginLoader<'a> { plugin_cache: Arc<Mutex<HashMap<PathBuf, Module>>>, - plugin_map: Arc<Mutex<PluginMap>>, plugin_path: PathBuf, loading_indication: &'a mut LoadingIndication, senders: ThreadSenders, @@ -206,15 +194,20 @@ impl<'a> PluginLoader<'a> { )?; plugin_loader .load_module_from_memory() - .and_then(|module| plugin_loader.create_plugin_instance_and_environment(module)) - .and_then(|(instance, plugin_env)| { - plugin_loader.load_plugin_instance(&instance, &plugin_env)?; - plugin_loader.clone_instance_for_other_clients( + .and_then(|module| { + plugin_loader.create_plugin_instance_environment_and_subscriptions(module) + }) + .and_then(|(instance, plugin_env, subscriptions)| { + plugin_loader.load_plugin_instance( &instance, &plugin_env, - &connected_clients, + &plugin_map, + &subscriptions, ) }) + .and_then(|_| { + plugin_loader.clone_instance_for_other_clients(&connected_clients, &plugin_map) + }) .with_context(err_context)?; display_loading_stage!(end, loading_indication, senders, plugin_id); Ok(()) @@ -237,7 +230,6 @@ impl<'a> PluginLoader<'a> { let err_context = || format!("failed to start plugin {plugin:#?} for client {client_id}"); let mut plugin_loader = PluginLoader::new( &plugin_cache, - &plugin_map, loading_indication, &senders, plugin_id, @@ -252,19 +244,69 @@ impl<'a> PluginLoader<'a> { .load_module_from_memory() .or_else(|_e| plugin_loader.load_module_from_hd_cache()) .or_else(|_e| plugin_loader.compile_module()) - .and_then(|module| plugin_loader.create_plugin_instance_and_environment(module)) - .and_then(|(instance, plugin_env)| { - plugin_loader.load_plugin_instance(&instance, &plugin_env)?; - plugin_loader.clone_instance_for_other_clients( + .and_then(|module| { + plugin_loader.create_plugin_instance_environment_and_subscriptions(module) + }) + .and_then(|(instance, plugin_env, subscriptions)| { + plugin_loader.load_plugin_instance( &instance, &plugin_env, + &plugin_map, + &subscriptions, + ) + }) + .and_then(|_| { + plugin_loader.clone_instance_for_other_clients( &connected_clients.lock().unwrap(), + &plugin_map, ) }) .with_context(err_context)?; display_loading_stage!(end, loading_indication, senders, plugin_id); Ok(()) } + pub fn add_client( + client_id: ClientId, + plugin_dir: PathBuf, + plugin_cache: Arc<Mutex<HashMap<PathBuf, Module>>>, + senders: ThreadSenders, + store: Store, + plugin_map: Arc<Mutex<PluginMap>>, + connected_clients: Arc<Mutex<Vec<ClientId>>>, + loading_indication: &mut LoadingIndication, + ) -> Result<()> { + let mut new_plugins = HashSet::new(); + for (&(plugin_id, _), _) in &*plugin_map.lock().unwrap() { + new_plugins.insert((plugin_id, client_id)); + } + for (plugin_id, existing_client_id) in new_plugins { + let mut plugin_loader = PluginLoader::new_from_different_client_id( + &plugin_cache, + &plugin_map, + loading_indication, + &senders, + plugin_id, + existing_client_id, + &store, + &plugin_dir, + )?; + plugin_loader + .load_module_from_memory() + .and_then(|module| { + plugin_loader.create_plugin_instance_environment_and_subscriptions(module) + }) + .and_then(|(instance, plugin_env, subscriptions)| { + plugin_loader.load_plugin_instance( + &instance, + &plugin_env, + &plugin_map, + &subscriptions, + ) + })? + } + connected_clients.lock().unwrap().push(client_id); + Ok(()) + } pub fn reload_plugin( plugin_id: u32, @@ -297,22 +339,26 @@ impl<'a> PluginLoader<'a> { )?; plugin_loader .compile_module() - .and_then(|module| plugin_loader.create_plugin_instance_and_environment(module)) - .and_then(|(instance, plugin_env)| { - plugin_loader.load_plugin_instance(&instance, &plugin_env)?; - plugin_loader.clone_instance_for_other_clients( + .and_then(|module| { + plugin_loader.create_plugin_instance_environment_and_subscriptions(module) + }) + .and_then(|(instance, plugin_env, subscriptions)| { + plugin_loader.load_plugin_instance( &instance, &plugin_env, - &connected_clients, + &plugin_map, + &subscriptions, ) }) + .and_then(|_| { + plugin_loader.clone_instance_for_other_clients(&connected_clients, &plugin_map) + }) .with_context(err_context)?; display_loading_stage!(end, loading_indication, senders, plugin_id); Ok(()) } pub fn new( plugin_cache: &Arc<Mutex<HashMap<PathBuf, Module>>>, - plugin_map: &Arc<Mutex<PluginMap>>, loading_indication: &'a mut LoadingIndication, senders: &ThreadSenders, plugin_id: u32, @@ -328,7 +374,6 @@ impl<'a> PluginLoader<'a> { let plugin_path = plugin.path.clone(); Ok(PluginLoader { plugin_cache: plugin_cache.clone(), - plugin_map: plugin_map.clone(), plugin_path, loading_indication, senders: senders.clone(), @@ -354,19 +399,63 @@ impl<'a> PluginLoader<'a> { plugin_dir: &'a PathBuf, ) -> Result<Self> { let err_context = || "Failed to find existing plugin"; - let (_old_instance, old_user_env, (rows, cols)) = { + let (running_plugin, _subscriptions) = { let mut plugin_map = plugin_map.lock().unwrap(); plugin_map .remove(&(plugin_id, client_id)) .with_context(err_context)? }; - let tab_index = old_user_env.tab_index; - let size = Size { rows, cols }; - let plugin_config = old_user_env.plugin.clone(); - loading_indication.set_name(old_user_env.name()); + let running_plugin = running_plugin.lock().unwrap(); + let tab_index = running_plugin.plugin_env.tab_index; + let size = Size { + rows: running_plugin.rows, + cols: running_plugin.columns, + }; + let plugin_config = running_plugin.plugin_env.plugin.clone(); + loading_indication.set_name(running_plugin.plugin_env.name()); + PluginLoader::new( + plugin_cache, + loading_indication, + senders, + plugin_id, + client_id, + store, + plugin_config, + plugin_dir, + tab_index, + size, + ) + } + pub fn new_from_different_client_id( + plugin_cache: &Arc<Mutex<HashMap<PathBuf, Module>>>, + plugin_map: &Arc<Mutex<PluginMap>>, + loading_indication: &'a mut LoadingIndication, + senders: &ThreadSenders, + plugin_id: u32, + client_id: ClientId, + store: &Store, + plugin_dir: &'a PathBuf, + ) -> Result<Self> { + let err_context = || "Failed to find existing plugin"; + let (running_plugin, _subscriptions) = { + let plugin_map = plugin_map.lock().unwrap(); + plugin_map + .iter() + .find(|((p_id, _c_id), _)| p_id == &plugin_id) + .with_context(err_context)? + .1 + .clone() + }; + let running_plugin = running_plugin.lock().unwrap(); + let tab_index = running_plugin.plugin_env.tab_index; + let size = Size { + rows: running_plugin.rows, + cols: running_plugin.columns, + }; + let plugin_config = running_plugin.plugin_env.plugin.clone(); + loading_indication.set_name(running_plugin.plugin_env.name()); PluginLoader::new( plugin_cache, - plugin_map, loading_indication, senders, plugin_id, @@ -464,10 +553,10 @@ impl<'a> PluginLoader<'a> { .with_context(err_context)?; Ok(module) } - pub fn create_plugin_instance_and_environment( + pub fn create_plugin_instance_environment_and_subscriptions( &mut self, module: Module, - ) -> Result<(Instance, PluginEnv)> { + ) -> Result<(Instance, PluginEnv, Arc<Mutex<Subscriptions>>)> { let err_context = || { format!( "Failed to create instance and plugin env for plugin {}", @@ -499,12 +588,12 @@ impl<'a> PluginLoader<'a> { plugin: mut_plugin, senders: self.senders.clone(), wasi_env, - subscriptions: Arc::new(Mutex::new(HashSet::new())), plugin_own_data_dir: self.plugin_own_data_dir.clone(), tab_index: self.tab_index, }; - let zellij = zellij_exports(&self.store, &plugin_env); + let subscriptions = Arc::new(Mutex::new(HashSet::new())); + let zellij = zellij_exports(&self.store, &plugin_env, &subscriptions); let instance = Instance::new(&module, &zellij.chain_back(wasi)).with_context(err_context)?; assert_plugin_version(&instance, &plugin_env).with_context(err_context)?; @@ -514,12 +603,14 @@ impl<'a> PluginLoader<'a> { .lock() .unwrap() .insert(cloned_plugin.path, module); - Ok((instance, plugin_env)) + Ok((instance, plugin_env, subscriptions)) } pub fn load_plugin_instance( &mut self, instance: &Instance, plugin_env: &PluginEnv, + plugin_map: &Arc<Mutex<PluginMap>>, + subscriptions: &Arc<Mutex<Subscriptions>>, ) -> Result<()> { let err_context = || format!("failed to load plugin from instance {instance:#?}"); let main_user_instance = instance.clone(); @@ -548,13 +639,16 @@ impl<'a> PluginLoader<'a> { self.senders, self.plugin_id ); - let mut plugin_map = self.plugin_map.lock().unwrap(); - plugin_map.insert( + plugin_map.lock().unwrap().insert( (self.plugin_id, self.client_id), ( - main_user_instance, - main_user_env, - (self.size.rows, self.size.cols), + Arc::new(Mutex::new(RunningPlugin::new( + main_user_instance, + main_user_env, + self.size.rows, + self.size.cols, + ))), + subscriptions.clone(), ), ); display_loading_stage!( @@ -567,9 +661,8 @@ impl<'a> PluginLoader<'a> { } pub fn clone_instance_for_other_clients( &mut self, - instance: &Instance, - plugin_env: &PluginEnv, connected_clients: &[ClientId], + plugin_map: &Arc<Mutex<PluginMap>>, ) -> Result<()> { if !connected_clients.is_empty() { display_loading_stage!( @@ -578,14 +671,32 @@ impl<'a> PluginLoader<'a> { self.senders, self.plugin_id ); - let mut plugin_map = self.plugin_map.lock().unwrap(); for client_id in connected_clients { - let (instance, new_plugin_env) = - clone_plugin_for_client(&plugin_env, *client_id, &instance, &self.store)?; - plugin_map.insert( - (self.plugin_id, *client_id), - (instance, new_plugin_env, (self.size.rows, self.size.cols)), - ); + let mut loading_indication = LoadingIndication::new("".into()); + let mut plugin_loader_for_client = PluginLoader::new_from_different_client_id( + &self.plugin_cache.clone(), + &plugin_map, + &mut loading_indication, + &self.senders.clone(), + self.plugin_id, + *client_id, + &self.store, + &self.plugin_dir, + )?; + plugin_loader_for_client + .load_module_from_memory() + .and_then(|module| { + plugin_loader_for_client + .create_plugin_instance_environment_and_subscriptions(module) + }) + .and_then(|(instance, plugin_env, subscriptions)| { + plugin_loader_for_client.load_plugin_instance( + &instance, + &plugin_env, + plugin_map, + &subscriptions, + ) + })? } display_loading_stage!( indicate_cloning_plugin_for_other_clients_success, @@ -633,24 +744,3 @@ fn create_plugin_fs_entries(plugin_own_data_dir: &PathBuf) -> Result<()> { .with_context(err_context)?; Ok(()) } - -fn clone_plugin_for_client( - plugin_env: &PluginEnv, - client_id: ClientId, - instance: &Instance, - store: &Store, -) -> Result<(Instance, PluginEnv)> { - let err_context = || format!("Failed to clone plugin for client {client_id}"); - let mut new_plugin_env = plugin_env.clone(); - new_plugin_env.client_id = client_id; - let module = instance.module().clone(); - let wasi = new_plugin_env - .wasi_env - .import_object(&module) - .with_context(err_context)?; - let zellij = zellij_exports(store, &new_plugin_env); - let mut instance = - Instance::new(&module, &zellij.chain_back(wasi)).with_context(err_context)?; - load_plugin_instance(&mut instance).with_context(err_context)?; - Ok((instance, new_plugin_env)) -} diff --git a/zellij-server/src/plugins/plugin_map.rs b/zellij-server/src/plugins/plugin_map.rs new file mode 100644 index 000000000..4cb5ebc9d --- /dev/null +++ b/zellij-server/src/plugins/plugin_map.rs @@ -0,0 +1,94 @@ +use crate::plugins::wasm_bridge::PluginId; +use std::{ + collections::{HashMap, HashSet}, + path::PathBuf, + sync::{Arc, Mutex}, +}; +use wasmer::Instance; +use wasmer_wasi::WasiEnv; + +use crate::{thread_bus::ThreadSenders, ClientId}; + +use zellij_utils::{data::EventType, input::plugins::PluginConfig}; + +// the idea here is to provide atomicity when adding/removing plugins from the map (eg. when a new +// client connects) but to also allow updates/renders not to block each other +// so when adding/removing from the map - everything is halted, that's life +// but when cloning the internal RunningPlugin and Subscriptions atomics, we can call methods on +// them without blocking other instances +pub type PluginMap = + HashMap<(PluginId, ClientId), (Arc<Mutex<RunningPlugin>>, Arc<Mutex<Subscriptions>>)>; +pub type Subscriptions = HashSet<EventType>; + +#[derive(Clone)] +pub struct PluginEnv { + pub plugin_id: u32, + pub plugin: PluginConfig, + pub senders: ThreadSenders, + pub wasi_env: WasiEnv, + pub tab_index: usize, + pub client_id: ClientId, + #[allow(dead_code)] + pub plugin_own_data_dir: PathBuf, +} + +impl PluginEnv { + // Get the name (path) of the containing plugin + pub fn name(&self) -> String { + format!( + "{} (ID {})", + self.plugin.path.display().to_string(), + self.plugin_id + ) + } +} + +#[derive(Eq, PartialEq, Hash)] +pub enum AtomicEvent { + Resize, +} + +pub struct RunningPlugin { + pub instance: Instance, + pub plugin_env: PluginEnv, + pub rows: usize, + pub columns: usize, + next_event_ids: HashMap<AtomicEvent, usize>, + last_applied_event_ids: HashMap<AtomicEvent, usize>, +} + +impl RunningPlugin { + pub fn new(instance: Instance, plugin_env: PluginEnv, rows: usize, columns: usize) -> Self { + RunningPlugin { + instance, + plugin_env, + rows, + columns, + next_event_ids: HashMap::new(), + last_applied_event_ids: HashMap::new(), + } + } + pub fn next_event_id(&mut self, atomic_event: AtomicEvent) -> usize { + // TODO: probably not usize... + let current_event_id = *self.next_event_ids.get(&atomic_event).unwrap_or(&0); + if current_event_id < usize::MAX { + let next_event_id = current_event_id + 1; + self.next_event_ids.insert(atomic_event, next_event_id); + current_event_id + } else { + let current_event_id = 0; + let next_event_id = 1; + self.last_applied_event_ids.remove(&atomic_event); + self.next_event_ids.insert(atomic_event, next_event_id); + current_event_id + } + } + pub fn apply_event_id(&mut self, atomic_event: AtomicEvent, event_id: usize) -> bool { + if &event_id >= self.last_applied_event_ids.get(&atomic_event).unwrap_or(&0) { + self.last_applied_event_ids.insert(atomic_event, event_id); + true + } else { + false + } + } +} diff --git a/zellij-server/src/plugins/wasm_bridge.rs b/zellij-server/src/plugins/wasm_bridge.rs index 119a1d24b..45a516d3e 100644 --- a/zellij-server/src/plugins/wasm_bridge.rs +++ b/zellij-server/src/plugins/wasm_bridge.rs @@ -1,79 +1,36 @@ use super::PluginInstruction; use crate::plugins::plugin_loader::{PluginLoader, VersionMismatchError}; -use log::{debug, info, warn}; -use serde::{de::DeserializeOwned, Serialize}; +use crate::plugins::plugin_map::{AtomicEvent, PluginEnv, PluginMap}; +use crate::plugins::zellij_exports::{wasi_read_string, wasi_write_object}; +use log::info; use std::{ collections::{HashMap, HashSet}, fmt::Display, path::PathBuf, - process, str::FromStr, sync::{Arc, Mutex}, - thread, - time::{Duration, Instant}, |