summaryrefslogtreecommitdiffstats
path: root/zellij-server
diff options
context:
space:
mode:
authorAram Drevekenin <aram@poor.dev>2023-04-28 15:26:39 +0200
committerGitHub <noreply@github.com>2023-04-28 15:26:39 +0200
commit1289643f89b8ea42a8adcfbbcee34d3dbf5e7176 (patch)
tree11f49470d3e9d9db16f2537b91416c6eca7490da /zellij-server
parenta29c6533850d0543c2966d0743d3775d72657a15 (diff)
feat(plugins): update and render plugins asynchronously (#2410)
* working-ish minus a few race conditions * relax atomicity * various refactoringz * remove commented out code * clarify some stuffs * refactor(plugins): move PluginMap and friends to a different file * refactor(plugins): move zellij_exports and friends to a different file * style(fmt): rustfmt * fix(e2e): adjust tests for flakiness async loading
Diffstat (limited to 'zellij-server')
-rw-r--r--zellij-server/src/plugins/mod.rs70
-rw-r--r--zellij-server/src/plugins/plugin_loader.rs244
-rw-r--r--zellij-server/src/plugins/plugin_map.rs94
-rw-r--r--zellij-server/src/plugins/wasm_bridge.rs622
-rw-r--r--zellij-server/src/plugins/zellij_exports.rs326
-rw-r--r--zellij-server/src/route.rs1
-rw-r--r--zellij-server/src/screen.rs9
7 files changed, 793 insertions, 573 deletions
diff --git a/zellij-server/src/plugins/mod.rs b/zellij-server/src/plugins/mod.rs
index eeb19c201..aaf6164b3 100644
--- a/zellij-server/src/plugins/mod.rs
+++ b/zellij-server/src/plugins/mod.rs
@@ -1,5 +1,7 @@
mod plugin_loader;
+mod plugin_map;
mod wasm_bridge;
+mod zellij_exports;
use log::info;
use std::{collections::HashMap, fs, path::PathBuf};
use wasmer::Store;
@@ -37,7 +39,6 @@ pub enum PluginInstruction {
Option<String>, // pane title
RunPlugin,
usize, // tab index
- ClientId,
Size,
),
Resize(u32, usize, usize), // plugin_id, columns, rows
@@ -91,7 +92,7 @@ pub(crate) fn plugin_thread_main(
err_ctx.add_call(ContextType::Plugin((&event).into()));
match event {
PluginInstruction::Load(should_float, pane_title, run, tab_index, client_id, size) => {
- match wasm_bridge.load_plugin(&run, tab_index, size, client_id) {
+ match wasm_bridge.load_plugin(&run, tab_index, size, Some(client_id)) {
Ok(plugin_id) => {
drop(bus.senders.send_to_screen(ScreenInstruction::AddPlugin(
should_float,
@@ -112,41 +113,38 @@ pub(crate) fn plugin_thread_main(
PluginInstruction::Unload(pid) => {
wasm_bridge.unload_plugin(pid)?;
},
- PluginInstruction::Reload(
- should_float,
- pane_title,
- run,
- tab_index,
- client_id,
- size,
- ) => match wasm_bridge.reload_plugin(&run) {
- Ok(_) => {
- let _ = bus
- .senders
- .send_to_server(ServerInstruction::UnblockInputThread);
- },
- Err(err) => match err.downcast_ref::<ZellijError>() {
- Some(ZellijError::PluginDoesNotExist) => {
- log::warn!("Plugin {} not found, starting it instead", run.location);
- match wasm_bridge.load_plugin(&run, tab_index, size, client_id) {
- Ok(plugin_id) => {
- drop(bus.senders.send_to_screen(ScreenInstruction::AddPlugin(
- should_float,
- run,
- pane_title,
- tab_index,
- plugin_id,
- )));
- },
- Err(e) => {
- log::error!("Failed to load plugin: {e}");
- },
- };
+ PluginInstruction::Reload(should_float, pane_title, run, tab_index, size) => {
+ match wasm_bridge.reload_plugin(&run) {
+ Ok(_) => {
+ let _ = bus
+ .senders
+ .send_to_server(ServerInstruction::UnblockInputThread);
},
- _ => {
- return Err(err);
+ Err(err) => match err.downcast_ref::<ZellijError>() {
+ Some(ZellijError::PluginDoesNotExist) => {
+ log::warn!("Plugin {} not found, starting it instead", run.location);
+ // we intentionally do not provide the client_id here because it belongs to
+ // the cli who spawned the command and is not an existing client_id
+ match wasm_bridge.load_plugin(&run, tab_index, size, None) {
+ Ok(plugin_id) => {
+ drop(bus.senders.send_to_screen(ScreenInstruction::AddPlugin(
+ should_float,
+ run,
+ pane_title,
+ tab_index,
+ plugin_id,
+ )));
+ },
+ Err(e) => {
+ log::error!("Failed to load plugin: {e}");
+ },
+ };
+ },
+ _ => {
+ return Err(err);
+ },
},
- },
+ }
},
PluginInstruction::Resize(pid, new_columns, new_rows) => {
wasm_bridge.resize_plugin(pid, new_columns, new_rows)?;
@@ -184,7 +182,7 @@ pub(crate) fn plugin_thread_main(
for run_instruction in extracted_run_instructions {
if let Some(Run::Plugin(run)) = run_instruction {
let plugin_id =
- wasm_bridge.load_plugin(&run, tab_index, size, client_id)?;
+ wasm_bridge.load_plugin(&run, tab_index, size, Some(client_id))?;
plugin_ids.entry(run.location).or_default().push(plugin_id);
}
}
diff --git a/zellij-server/src/plugins/plugin_loader.rs b/zellij-server/src/plugins/plugin_loader.rs
index c54133889..86afd8ca5 100644
--- a/zellij-server/src/plugins/plugin_loader.rs
+++ b/zellij-server/src/plugins/plugin_loader.rs
@@ -1,4 +1,5 @@
-use crate::plugins::wasm_bridge::{wasi_read_string, zellij_exports, PluginEnv, PluginMap};
+use crate::plugins::plugin_map::{PluginEnv, PluginMap, RunningPlugin, Subscriptions};
+use crate::plugins::zellij_exports::{wasi_read_string, zellij_exports};
use highway::{HighwayHash, PortableHash};
use log::info;
use semver::Version;
@@ -146,21 +147,8 @@ fn assert_plugin_version(instance: &Instance, plugin_env: &PluginEnv) -> Result<
Ok(())
}
-fn load_plugin_instance(instance: &mut Instance) -> Result<()> {
- let err_context = || format!("failed to load plugin from instance {instance:#?}");
-
- let load_function = instance
- .exports
- .get_function("_start")
- .with_context(err_context)?;
- // This eventually calls the `.load()` method
- load_function.call(&[]).with_context(err_context)?;
- Ok(())
-}
-
pub struct PluginLoader<'a> {
plugin_cache: Arc<Mutex<HashMap<PathBuf, Module>>>,
- plugin_map: Arc<Mutex<PluginMap>>,
plugin_path: PathBuf,
loading_indication: &'a mut LoadingIndication,
senders: ThreadSenders,
@@ -206,15 +194,20 @@ impl<'a> PluginLoader<'a> {
)?;
plugin_loader
.load_module_from_memory()
- .and_then(|module| plugin_loader.create_plugin_instance_and_environment(module))
- .and_then(|(instance, plugin_env)| {
- plugin_loader.load_plugin_instance(&instance, &plugin_env)?;
- plugin_loader.clone_instance_for_other_clients(
+ .and_then(|module| {
+ plugin_loader.create_plugin_instance_environment_and_subscriptions(module)
+ })
+ .and_then(|(instance, plugin_env, subscriptions)| {
+ plugin_loader.load_plugin_instance(
&instance,
&plugin_env,
- &connected_clients,
+ &plugin_map,
+ &subscriptions,
)
})
+ .and_then(|_| {
+ plugin_loader.clone_instance_for_other_clients(&connected_clients, &plugin_map)
+ })
.with_context(err_context)?;
display_loading_stage!(end, loading_indication, senders, plugin_id);
Ok(())
@@ -237,7 +230,6 @@ impl<'a> PluginLoader<'a> {
let err_context = || format!("failed to start plugin {plugin:#?} for client {client_id}");
let mut plugin_loader = PluginLoader::new(
&plugin_cache,
- &plugin_map,
loading_indication,
&senders,
plugin_id,
@@ -252,19 +244,69 @@ impl<'a> PluginLoader<'a> {
.load_module_from_memory()
.or_else(|_e| plugin_loader.load_module_from_hd_cache())
.or_else(|_e| plugin_loader.compile_module())
- .and_then(|module| plugin_loader.create_plugin_instance_and_environment(module))
- .and_then(|(instance, plugin_env)| {
- plugin_loader.load_plugin_instance(&instance, &plugin_env)?;
- plugin_loader.clone_instance_for_other_clients(
+ .and_then(|module| {
+ plugin_loader.create_plugin_instance_environment_and_subscriptions(module)
+ })
+ .and_then(|(instance, plugin_env, subscriptions)| {
+ plugin_loader.load_plugin_instance(
&instance,
&plugin_env,
+ &plugin_map,
+ &subscriptions,
+ )
+ })
+ .and_then(|_| {
+ plugin_loader.clone_instance_for_other_clients(
&connected_clients.lock().unwrap(),
+ &plugin_map,
)
})
.with_context(err_context)?;
display_loading_stage!(end, loading_indication, senders, plugin_id);
Ok(())
}
+ pub fn add_client(
+ client_id: ClientId,
+ plugin_dir: PathBuf,
+ plugin_cache: Arc<Mutex<HashMap<PathBuf, Module>>>,
+ senders: ThreadSenders,
+ store: Store,
+ plugin_map: Arc<Mutex<PluginMap>>,
+ connected_clients: Arc<Mutex<Vec<ClientId>>>,
+ loading_indication: &mut LoadingIndication,
+ ) -> Result<()> {
+ let mut new_plugins = HashSet::new();
+ for (&(plugin_id, _), _) in &*plugin_map.lock().unwrap() {
+ new_plugins.insert((plugin_id, client_id));
+ }
+ for (plugin_id, existing_client_id) in new_plugins {
+ let mut plugin_loader = PluginLoader::new_from_different_client_id(
+ &plugin_cache,
+ &plugin_map,
+ loading_indication,
+ &senders,
+ plugin_id,
+ existing_client_id,
+ &store,
+ &plugin_dir,
+ )?;
+ plugin_loader
+ .load_module_from_memory()
+ .and_then(|module| {
+ plugin_loader.create_plugin_instance_environment_and_subscriptions(module)
+ })
+ .and_then(|(instance, plugin_env, subscriptions)| {
+ plugin_loader.load_plugin_instance(
+ &instance,
+ &plugin_env,
+ &plugin_map,
+ &subscriptions,
+ )
+ })?
+ }
+ connected_clients.lock().unwrap().push(client_id);
+ Ok(())
+ }
pub fn reload_plugin(
plugin_id: u32,
@@ -297,22 +339,26 @@ impl<'a> PluginLoader<'a> {
)?;
plugin_loader
.compile_module()
- .and_then(|module| plugin_loader.create_plugin_instance_and_environment(module))
- .and_then(|(instance, plugin_env)| {
- plugin_loader.load_plugin_instance(&instance, &plugin_env)?;
- plugin_loader.clone_instance_for_other_clients(
+ .and_then(|module| {
+ plugin_loader.create_plugin_instance_environment_and_subscriptions(module)
+ })
+ .and_then(|(instance, plugin_env, subscriptions)| {
+ plugin_loader.load_plugin_instance(
&instance,
&plugin_env,
- &connected_clients,
+ &plugin_map,
+ &subscriptions,
)
})
+ .and_then(|_| {
+ plugin_loader.clone_instance_for_other_clients(&connected_clients, &plugin_map)
+ })
.with_context(err_context)?;
display_loading_stage!(end, loading_indication, senders, plugin_id);
Ok(())
}
pub fn new(
plugin_cache: &Arc<Mutex<HashMap<PathBuf, Module>>>,
- plugin_map: &Arc<Mutex<PluginMap>>,
loading_indication: &'a mut LoadingIndication,
senders: &ThreadSenders,
plugin_id: u32,
@@ -328,7 +374,6 @@ impl<'a> PluginLoader<'a> {
let plugin_path = plugin.path.clone();
Ok(PluginLoader {
plugin_cache: plugin_cache.clone(),
- plugin_map: plugin_map.clone(),
plugin_path,
loading_indication,
senders: senders.clone(),
@@ -354,19 +399,63 @@ impl<'a> PluginLoader<'a> {
plugin_dir: &'a PathBuf,
) -> Result<Self> {
let err_context = || "Failed to find existing plugin";
- let (_old_instance, old_user_env, (rows, cols)) = {
+ let (running_plugin, _subscriptions) = {
let mut plugin_map = plugin_map.lock().unwrap();
plugin_map
.remove(&(plugin_id, client_id))
.with_context(err_context)?
};
- let tab_index = old_user_env.tab_index;
- let size = Size { rows, cols };
- let plugin_config = old_user_env.plugin.clone();
- loading_indication.set_name(old_user_env.name());
+ let running_plugin = running_plugin.lock().unwrap();
+ let tab_index = running_plugin.plugin_env.tab_index;
+ let size = Size {
+ rows: running_plugin.rows,
+ cols: running_plugin.columns,
+ };
+ let plugin_config = running_plugin.plugin_env.plugin.clone();
+ loading_indication.set_name(running_plugin.plugin_env.name());
+ PluginLoader::new(
+ plugin_cache,
+ loading_indication,
+ senders,
+ plugin_id,
+ client_id,
+ store,
+ plugin_config,
+ plugin_dir,
+ tab_index,
+ size,
+ )
+ }
+ pub fn new_from_different_client_id(
+ plugin_cache: &Arc<Mutex<HashMap<PathBuf, Module>>>,
+ plugin_map: &Arc<Mutex<PluginMap>>,
+ loading_indication: &'a mut LoadingIndication,
+ senders: &ThreadSenders,
+ plugin_id: u32,
+ client_id: ClientId,
+ store: &Store,
+ plugin_dir: &'a PathBuf,
+ ) -> Result<Self> {
+ let err_context = || "Failed to find existing plugin";
+ let (running_plugin, _subscriptions) = {
+ let plugin_map = plugin_map.lock().unwrap();
+ plugin_map
+ .iter()
+ .find(|((p_id, _c_id), _)| p_id == &plugin_id)
+ .with_context(err_context)?
+ .1
+ .clone()
+ };
+ let running_plugin = running_plugin.lock().unwrap();
+ let tab_index = running_plugin.plugin_env.tab_index;
+ let size = Size {
+ rows: running_plugin.rows,
+ cols: running_plugin.columns,
+ };
+ let plugin_config = running_plugin.plugin_env.plugin.clone();
+ loading_indication.set_name(running_plugin.plugin_env.name());
PluginLoader::new(
plugin_cache,
- plugin_map,
loading_indication,
senders,
plugin_id,
@@ -464,10 +553,10 @@ impl<'a> PluginLoader<'a> {
.with_context(err_context)?;
Ok(module)
}
- pub fn create_plugin_instance_and_environment(
+ pub fn create_plugin_instance_environment_and_subscriptions(
&mut self,
module: Module,
- ) -> Result<(Instance, PluginEnv)> {
+ ) -> Result<(Instance, PluginEnv, Arc<Mutex<Subscriptions>>)> {
let err_context = || {
format!(
"Failed to create instance and plugin env for plugin {}",
@@ -499,12 +588,12 @@ impl<'a> PluginLoader<'a> {
plugin: mut_plugin,
senders: self.senders.clone(),
wasi_env,
- subscriptions: Arc::new(Mutex::new(HashSet::new())),
plugin_own_data_dir: self.plugin_own_data_dir.clone(),
tab_index: self.tab_index,
};
- let zellij = zellij_exports(&self.store, &plugin_env);
+ let subscriptions = Arc::new(Mutex::new(HashSet::new()));
+ let zellij = zellij_exports(&self.store, &plugin_env, &subscriptions);
let instance =
Instance::new(&module, &zellij.chain_back(wasi)).with_context(err_context)?;
assert_plugin_version(&instance, &plugin_env).with_context(err_context)?;
@@ -514,12 +603,14 @@ impl<'a> PluginLoader<'a> {
.lock()
.unwrap()
.insert(cloned_plugin.path, module);
- Ok((instance, plugin_env))
+ Ok((instance, plugin_env, subscriptions))
}
pub fn load_plugin_instance(
&mut self,
instance: &Instance,
plugin_env: &PluginEnv,
+ plugin_map: &Arc<Mutex<PluginMap>>,
+ subscriptions: &Arc<Mutex<Subscriptions>>,
) -> Result<()> {
let err_context = || format!("failed to load plugin from instance {instance:#?}");
let main_user_instance = instance.clone();
@@ -548,13 +639,16 @@ impl<'a> PluginLoader<'a> {
self.senders,
self.plugin_id
);
- let mut plugin_map = self.plugin_map.lock().unwrap();
- plugin_map.insert(
+ plugin_map.lock().unwrap().insert(
(self.plugin_id, self.client_id),
(
- main_user_instance,
- main_user_env,
- (self.size.rows, self.size.cols),
+ Arc::new(Mutex::new(RunningPlugin::new(
+ main_user_instance,
+ main_user_env,
+ self.size.rows,
+ self.size.cols,
+ ))),
+ subscriptions.clone(),
),
);
display_loading_stage!(
@@ -567,9 +661,8 @@ impl<'a> PluginLoader<'a> {
}
pub fn clone_instance_for_other_clients(
&mut self,
- instance: &Instance,
- plugin_env: &PluginEnv,
connected_clients: &[ClientId],
+ plugin_map: &Arc<Mutex<PluginMap>>,
) -> Result<()> {
if !connected_clients.is_empty() {
display_loading_stage!(
@@ -578,14 +671,32 @@ impl<'a> PluginLoader<'a> {
self.senders,
self.plugin_id
);
- let mut plugin_map = self.plugin_map.lock().unwrap();
for client_id in connected_clients {
- let (instance, new_plugin_env) =
- clone_plugin_for_client(&plugin_env, *client_id, &instance, &self.store)?;
- plugin_map.insert(
- (self.plugin_id, *client_id),
- (instance, new_plugin_env, (self.size.rows, self.size.cols)),
- );
+ let mut loading_indication = LoadingIndication::new("".into());
+ let mut plugin_loader_for_client = PluginLoader::new_from_different_client_id(
+ &self.plugin_cache.clone(),
+ &plugin_map,
+ &mut loading_indication,
+ &self.senders.clone(),
+ self.plugin_id,
+ *client_id,
+ &self.store,
+ &self.plugin_dir,
+ )?;
+ plugin_loader_for_client
+ .load_module_from_memory()
+ .and_then(|module| {
+ plugin_loader_for_client
+ .create_plugin_instance_environment_and_subscriptions(module)
+ })
+ .and_then(|(instance, plugin_env, subscriptions)| {
+ plugin_loader_for_client.load_plugin_instance(
+ &instance,
+ &plugin_env,
+ plugin_map,
+ &subscriptions,
+ )
+ })?
}
display_loading_stage!(
indicate_cloning_plugin_for_other_clients_success,
@@ -633,24 +744,3 @@ fn create_plugin_fs_entries(plugin_own_data_dir: &PathBuf) -> Result<()> {
.with_context(err_context)?;
Ok(())
}
-
-fn clone_plugin_for_client(
- plugin_env: &PluginEnv,
- client_id: ClientId,
- instance: &Instance,
- store: &Store,
-) -> Result<(Instance, PluginEnv)> {
- let err_context = || format!("Failed to clone plugin for client {client_id}");
- let mut new_plugin_env = plugin_env.clone();
- new_plugin_env.client_id = client_id;
- let module = instance.module().clone();
- let wasi = new_plugin_env
- .wasi_env
- .import_object(&module)
- .with_context(err_context)?;
- let zellij = zellij_exports(store, &new_plugin_env);
- let mut instance =
- Instance::new(&module, &zellij.chain_back(wasi)).with_context(err_context)?;
- load_plugin_instance(&mut instance).with_context(err_context)?;
- Ok((instance, new_plugin_env))
-}
diff --git a/zellij-server/src/plugins/plugin_map.rs b/zellij-server/src/plugins/plugin_map.rs
new file mode 100644
index 000000000..4cb5ebc9d
--- /dev/null
+++ b/zellij-server/src/plugins/plugin_map.rs
@@ -0,0 +1,94 @@
+use crate::plugins::wasm_bridge::PluginId;
+use std::{
+ collections::{HashMap, HashSet},
+ path::PathBuf,
+ sync::{Arc, Mutex},
+};
+use wasmer::Instance;
+use wasmer_wasi::WasiEnv;
+
+use crate::{thread_bus::ThreadSenders, ClientId};
+
+use zellij_utils::{data::EventType, input::plugins::PluginConfig};
+
+// the idea here is to provide atomicity when adding/removing plugins from the map (eg. when a new
+// client connects) but to also allow updates/renders not to block each other
+// so when adding/removing from the map - everything is halted, that's life
+// but when cloning the internal RunningPlugin and Subscriptions atomics, we can call methods on
+// them without blocking other instances
+pub type PluginMap =
+ HashMap<(PluginId, ClientId), (Arc<Mutex<RunningPlugin>>, Arc<Mutex<Subscriptions>>)>;
+pub type Subscriptions = HashSet<EventType>;
+
+#[derive(Clone)]
+pub struct PluginEnv {
+ pub plugin_id: u32,
+ pub plugin: PluginConfig,
+ pub senders: ThreadSenders,
+ pub wasi_env: WasiEnv,
+ pub tab_index: usize,
+ pub client_id: ClientId,
+ #[allow(dead_code)]
+ pub plugin_own_data_dir: PathBuf,
+}
+
+impl PluginEnv {
+ // Get the name (path) of the containing plugin
+ pub fn name(&self) -> String {
+ format!(
+ "{} (ID {})",
+ self.plugin.path.display().to_string(),
+ self.plugin_id
+ )
+ }
+}
+
+#[derive(Eq, PartialEq, Hash)]
+pub enum AtomicEvent {
+ Resize,
+}
+
+pub struct RunningPlugin {
+ pub instance: Instance,
+ pub plugin_env: PluginEnv,
+ pub rows: usize,
+ pub columns: usize,
+ next_event_ids: HashMap<AtomicEvent, usize>,
+ last_applied_event_ids: HashMap<AtomicEvent, usize>,
+}
+
+impl RunningPlugin {
+ pub fn new(instance: Instance, plugin_env: PluginEnv, rows: usize, columns: usize) -> Self {
+ RunningPlugin {
+ instance,
+ plugin_env,
+ rows,
+ columns,
+ next_event_ids: HashMap::new(),
+ last_applied_event_ids: HashMap::new(),
+ }
+ }
+ pub fn next_event_id(&mut self, atomic_event: AtomicEvent) -> usize {
+ // TODO: probably not usize...
+ let current_event_id = *self.next_event_ids.get(&atomic_event).unwrap_or(&0);
+ if current_event_id < usize::MAX {
+ let next_event_id = current_event_id + 1;
+ self.next_event_ids.insert(atomic_event, next_event_id);
+ current_event_id
+ } else {
+ let current_event_id = 0;
+ let next_event_id = 1;
+ self.last_applied_event_ids.remove(&atomic_event);
+ self.next_event_ids.insert(atomic_event, next_event_id);
+ current_event_id
+ }
+ }
+ pub fn apply_event_id(&mut self, atomic_event: AtomicEvent, event_id: usize) -> bool {
+ if &event_id >= self.last_applied_event_ids.get(&atomic_event).unwrap_or(&0) {
+ self.last_applied_event_ids.insert(atomic_event, event_id);
+ true
+ } else {
+ false
+ }
+ }
+}
diff --git a/zellij-server/src/plugins/wasm_bridge.rs b/zellij-server/src/plugins/wasm_bridge.rs
index 119a1d24b..45a516d3e 100644
--- a/zellij-server/src/plugins/wasm_bridge.rs
+++ b/zellij-server/src/plugins/wasm_bridge.rs
@@ -1,79 +1,36 @@
use super::PluginInstruction;
use crate::plugins::plugin_loader::{PluginLoader, VersionMismatchError};
-use log::{debug, info, warn};
-use serde::{de::DeserializeOwned, Serialize};
+use crate::plugins::plugin_map::{AtomicEvent, PluginEnv, PluginMap};
+use crate::plugins::zellij_exports::{wasi_read_string, wasi_write_object};
+use log::info;
use std::{
collections::{HashMap, HashSet},
fmt::Display,
path::PathBuf,
- process,
str::FromStr,
sync::{Arc, Mutex},
- thread,
- time::{Duration, Instant},