diff options
Diffstat (limited to 'zellij-server/src/plugins/wasm_bridge.rs')
-rw-r--r-- | zellij-server/src/plugins/wasm_bridge.rs | 622 |
1 files changed, 170 insertions, 452 deletions
diff --git a/zellij-server/src/plugins/wasm_bridge.rs b/zellij-server/src/plugins/wasm_bridge.rs index 119a1d24b..45a516d3e 100644 --- a/zellij-server/src/plugins/wasm_bridge.rs +++ b/zellij-server/src/plugins/wasm_bridge.rs @@ -1,79 +1,36 @@ use super::PluginInstruction; use crate::plugins::plugin_loader::{PluginLoader, VersionMismatchError}; -use log::{debug, info, warn}; -use serde::{de::DeserializeOwned, Serialize}; +use crate::plugins::plugin_map::{AtomicEvent, PluginEnv, PluginMap}; +use crate::plugins::zellij_exports::{wasi_read_string, wasi_write_object}; +use log::info; use std::{ collections::{HashMap, HashSet}, fmt::Display, path::PathBuf, - process, str::FromStr, sync::{Arc, Mutex}, - thread, - time::{Duration, Instant}, }; -use wasmer::{ - imports, ChainableNamedResolver, Function, ImportObject, Instance, Module, Store, Value, - WasmerEnv, -}; -use wasmer_wasi::WasiEnv; +use wasmer::{Instance, Module, Store, Value}; use zellij_utils::async_std::task::{self, JoinHandle}; use crate::{ - background_jobs::BackgroundJob, - panes::PaneId, - pty::{ClientOrTabIndex, PtyInstruction}, - screen::ScreenInstruction, - thread_bus::ThreadSenders, - ui::loading_indication::LoadingIndication, - ClientId, + background_jobs::BackgroundJob, screen::ScreenInstruction, thread_bus::ThreadSenders, + ui::loading_indication::LoadingIndication, ClientId, }; use zellij_utils::{ consts::VERSION, - data::{Event, EventType, PluginIds}, + data::{Event, EventType}, errors::prelude::*, errors::ZellijError, input::{ - command::TerminalAction, layout::{RunPlugin, RunPluginLocation}, - plugins::{PluginConfig, PluginType, PluginsConfig}, + plugins::PluginsConfig, }, pane_size::Size, - serde, }; -type PluginId = u32; - -#[derive(WasmerEnv, Clone)] -pub struct PluginEnv { - pub plugin_id: u32, - pub plugin: PluginConfig, - pub senders: ThreadSenders, - pub wasi_env: WasiEnv, - pub subscriptions: Arc<Mutex<HashSet<EventType>>>, - pub tab_index: usize, - pub client_id: ClientId, - #[allow(dead_code)] - pub plugin_own_data_dir: PathBuf, -} - -impl PluginEnv { - // Get the name (path) of the containing plugin - pub fn name(&self) -> String { - format!( - "{} (ID {})", - self.plugin.path.display().to_string(), - self.plugin_id - ) - } -} - -pub type PluginMap = HashMap<(u32, ClientId), (Instance, PluginEnv, (usize, usize))>; // u32 => - // plugin_id, - // (usize, usize) - // => (rows, - // columns) +pub type PluginId = u32; pub struct WasmBridge { connected_clients: Arc<Mutex<Vec<ClientId>>>, @@ -121,10 +78,24 @@ impl WasmBridge { run: &RunPlugin, tab_index: usize, size: Size, - client_id: ClientId, + client_id: Option<ClientId>, ) -> Result<u32> { // returns the plugin id - let err_context = move || format!("failed to load plugin for client {client_id}"); + let err_context = move || format!("failed to load plugin"); + + let client_id = client_id + .or_else(|| { + self.connected_clients + .lock() + .unwrap() + .iter() + .next() + .copied() + }) + .with_context(|| { + "Plugins must have a client id, none was provided and none are connected" + })?; + let plugin_id = self.next_plugin_id; let plugin = self @@ -277,48 +248,30 @@ impl WasmBridge { Ok(()) } pub fn add_client(&mut self, client_id: ClientId) -> Result<()> { - let err_context = || format!("failed to add plugins for client {client_id}"); - - self.connected_clients.lock().unwrap().push(client_id); - - let mut seen = HashSet::new(); - let mut new_plugins = HashMap::new(); - let mut plugin_map = self.plugin_map.lock().unwrap(); - for (&(plugin_id, _), (instance, plugin_env, (rows, columns))) in &*plugin_map { - if seen.contains(&plugin_id) { - continue; - } - seen.insert(plugin_id); - let mut new_plugin_env = plugin_env.clone(); - - new_plugin_env.client_id = client_id; - new_plugins.insert( - plugin_id, - (instance.module().clone(), new_plugin_env, (*rows, *columns)), - ); + let mut loading_indication = LoadingIndication::new("".into()); + match PluginLoader::add_client( + client_id, + self.plugin_dir.clone(), + self.plugin_cache.clone(), + self.senders.clone(), + self.store.clone(), + self.plugin_map.clone(), + self.connected_clients.clone(), + &mut loading_indication, + ) { + Ok(_) => { + let _ = self + .senders + .send_to_screen(ScreenInstruction::RequestStateUpdateForPlugins); + Ok(()) + }, + Err(e) => Err(e), } - for (plugin_id, (module, mut new_plugin_env, (rows, columns))) in new_plugins.drain() { - let wasi = new_plugin_env - .wasi_env - .import_object(&module) - .with_context(err_context)?; - let zellij = zellij_exports(&self.store, &new_plugin_env); - let mut instance = - Instance::new(&module, &zellij.chain_back(wasi)).with_context(err_context)?; - load_plugin_instance(&mut instance).with_context(err_context)?; - plugin_map.insert( - (plugin_id, client_id), - (instance, new_plugin_env, (rows, columns)), - ); - } - Ok(()) } pub fn resize_plugin(&mut self, pid: u32, new_columns: usize, new_rows: usize) -> Result<()> { - let err_context = || format!("failed to resize plugin {pid}"); - let mut plugin_bytes = vec![]; - let mut plugin_map = self.plugin_map.lock().unwrap(); - for ((plugin_id, client_id), (instance, plugin_env, (current_rows, current_columns))) in - plugin_map.iter_mut() + let err_context = move || format!("failed to resize plugin {pid}"); + for ((plugin_id, client_id), (running_plugin, _subscriptions)) in + self.plugin_map.lock().unwrap().iter_mut() { if self .cached_resizes_for_pending_plugins @@ -327,26 +280,53 @@ impl WasmBridge { continue; } if *plugin_id == pid { - *current_rows = new_rows; - *current_columns = new_columns; - - // TODO: consolidate with above render function - let rendered_bytes = instance - .exports - .get_function("render") - .map_err(anyError::new) - .and_then(|render| { - render - .call(&[ - Value::I32(*current_rows as i32), - Value::I32(*current_columns as i32), - ]) - .map_err(anyError::new) - }) - .and_then(|_| wasi_read_string(&plugin_env.wasi_env)) - .with_context(err_context)?; - - plugin_bytes.push((*plugin_id, *client_id, rendered_bytes.as_bytes().to_vec())); + let event_id = running_plugin + .lock() + .unwrap() + .next_event_id(AtomicEvent::Resize); + task::spawn({ + let senders = self.senders.clone(); + let running_plugin = running_plugin.clone(); + let plugin_id = *plugin_id; + let client_id = *client_id; + async move { + let mut running_plugin = running_plugin.lock().unwrap(); + if running_plugin.apply_event_id(AtomicEvent::Resize, event_id) { + running_plugin.rows = new_rows; + running_plugin.columns = new_columns; + let rendered_bytes = running_plugin + .instance + .exports + .get_function("render") + .map_err(anyError::new) + .and_then(|render| { + render + .call(&[ + Value::I32(running_plugin.rows as i32), + Value::I32(running_plugin.columns as i32), + ]) + .map_err(anyError::new) + }) + .and_then(|_| wasi_read_string(&running_plugin.plugin_env.wasi_env)) + .with_context(err_context); + match rendered_bytes { + Ok(rendered_bytes) => { + let plugin_bytes = vec![( + plugin_id, + client_id, + rendered_bytes.as_bytes().to_vec(), + )]; + senders + .send_to_screen(ScreenInstruction::PluginBytes( + plugin_bytes, + )) + .unwrap(); + }, + Err(e) => log::error!("{}", e), + } + } + } + }); } } for (plugin_id, mut current_size) in self.cached_resizes_for_pending_plugins.iter_mut() { @@ -355,9 +335,6 @@ impl WasmBridge { current_size.1 = new_columns; } } - let _ = self - .senders - .send_to_screen(ScreenInstruction::PluginBytes(plugin_bytes)); Ok(()) } pub fn update_plugins( @@ -366,21 +343,17 @@ impl WasmBridge { ) -> Result<()> { let err_context = || "failed to update plugin state".to_string(); - let plugin_map = self.plugin_map.lock().unwrap(); - let mut plugin_bytes = vec![]; for (pid, cid, event) in updates.drain(..) { - for (&(plugin_id, client_id), (instance, plugin_env, (rows, columns))) in &*plugin_map { + for (&(plugin_id, client_id), (running_plugin, subscriptions)) in + &*self.plugin_map.lock().unwrap() + { if self .cached_events_for_pending_plugins .contains_key(&plugin_id) { continue; } - let subs = plugin_env - .subscriptions - .lock() - .to_anyhow() - .with_context(err_context)?; + let subs = subscriptions.lock().unwrap().clone(); // FIXME: This is very janky... Maybe I should write my own macro for Event -> EventType? let event_type = EventType::from_str(&event.to_string()).with_context(err_context)?; @@ -390,16 +363,34 @@ impl WasmBridge { || (cid.is_none() && pid == Some(plugin_id)) || (cid == Some(client_id) && pid == Some(plugin_id))) { - apply_event_to_plugin( - plugin_id, - client_id, - &instance, - &plugin_env, - &event, - *rows, - *columns, - &mut plugin_bytes, - )?; + task::spawn({ + let senders = self.senders.clone(); + let running_plugin = running_plugin.clone(); + let event = event.clone(); + async move { + let running_plugin = running_plugin.lock().unwrap(); + let mut plugin_bytes = vec![]; + match apply_event_to_plugin( + plugin_id, + client_id, + &running_plugin.instance, + &running_plugin.plugin_env, + &event, + running_plugin.rows, + running_plugin.columns, + &mut plugin_bytes, + ) { + Ok(()) => { + let _ = senders.send_to_screen(ScreenInstruction::PluginBytes( + plugin_bytes, + )); + }, + Err(e) => { + log::error!("{}", e); + }, + } + } + }); } } for (plugin_id, cached_events) in self.cached_events_for_pending_plugins.iter_mut() { @@ -408,9 +399,6 @@ impl WasmBridge { } } } - let _ = self - .senders - .send_to_screen(ScreenInstruction::PluginBytes(plugin_bytes)); Ok(()) } pub fn apply_cached_events(&mut self, plugin_ids: Vec<u32>) -> Result<()> { @@ -450,7 +438,6 @@ impl WasmBridge { fn apply_cached_events_and_resizes_for_plugin(&mut self, plugin_id: PluginId) -> Result<()> { let err_context = || format!("Failed to apply cached events to plugin"); if let Some(events) = self.cached_events_for_pending_plugins.remove(&plugin_id) { - let mut plugin_map = self.plugin_map.lock().unwrap(); let all_connected_clients: Vec<ClientId> = self .connected_clients .lock() @@ -459,35 +446,48 @@ impl WasmBridge { .copied() .collect(); for client_id in &all_connected_clients { - let mut plugin_bytes = vec![]; - if let Some((instance, plugin_env, (rows, columns))) = - plugin_map.get_mut(&(plugin_id, *client_id)) + if let Some((running_plugin, subscriptions)) = self + .plugin_map + .lock() + .unwrap() + .get_mut(&(plugin_id, *client_id)) { - let subs = plugin_env - .subscriptions - .lock() - .to_anyhow() - .with_context(err_context)?; + let subs = subscriptions.lock().unwrap().clone(); for event in events.clone() { let event_type = EventType::from_str(&event.to_string()).with_context(err_context)?; if !subs.contains(&event_type) { continue; } - apply_event_to_plugin( - plugin_id, - *client_id, - &instance, - &plugin_env, - &event, - *rows, - *columns, - &mut plugin_bytes, - )?; + task::spawn({ + let senders = self.senders.clone(); + let running_plugin = running_plugin.clone(); + let client_id = *client_id; + async move { + let running_plugin = running_plugin.lock().unwrap(); + let mut plugin_bytes = vec![]; + match apply_event_to_plugin( + plugin_id, + client_id, + &running_plugin.instance, + &running_plugin.plugin_env, + &event, + running_plugin.rows, + running_plugin.columns, + &mut plugin_bytes, + ) { + Ok(()) => { + let _ = senders.send_to_screen( + ScreenInstruction::PluginBytes(plugin_bytes), + ); + }, + Err(e) => { + log::error!("{}", e); + }, + } + } + }); } - let _ = self - .senders - .send_to_screen(ScreenInstruction::PluginBytes(plugin_bytes)); } } } @@ -512,11 +512,11 @@ impl WasmBridge { .lock() .unwrap() .iter() - .filter( - |((_plugin_id, _client_id), (_instance, plugin_env, _size))| { - &plugin_env.plugin.location == plugin_location - }, - ) + .filter(|(_, (running_plugin, _subscriptions))| { + &running_plugin.lock().unwrap().plugin_env.plugin.location == plugin_location + // TODO: + // better + }) .map(|((plugin_id, _client_id), _)| *plugin_id) .collect(); if plugin_ids.is_empty() { @@ -530,8 +530,11 @@ impl WasmBridge { .lock() .unwrap() .iter() - .find(|((p_id, _client_id), (_instance, _plugin_env, _size))| *p_id == plugin_id) - .map(|((_p_id, _client_id), (_instance, _plugin_env, size))| *size) + .find(|((p_id, _client_id), _)| *p_id == plugin_id) + .map(|(_, (running_plugin, _subscriptions))| { + let running_plugin = running_plugin.lock().unwrap(); + (running_plugin.rows, running_plugin.columns) + }) } fn start_plugin_loading_indication( &self, @@ -563,6 +566,7 @@ fn handle_plugin_loading_failure( loading_indication: &mut LoadingIndication, error: impl Display, ) { + log::error!("{}", error); let _ = senders.send_to_background_jobs(BackgroundJob::StopPluginLoadingAnimation(plugin_id)); loading_indication.indicate_loading_error(error.to_string()); let _ = senders.send_to_screen(ScreenInstruction::UpdatePluginLoadingStage( @@ -571,292 +575,6 @@ fn handle_plugin_loading_failure( )); } -fn load_plugin_instance(instance: &mut Instance) -> Result<()> { - let err_context = || format!("failed to load plugin from instance {instance:#?}"); - - let load_function = instance - .exports - .get_function("_start") - .with_context(err_context)?; - // This eventually calls the `.load()` method - load_function.call(&[]).with_context(err_context)?; - Ok(()) -} - -pub(crate) fn zellij_exports(store: &Store, plugin_env: &PluginEnv) -> ImportObject { - macro_rules! zellij_export { - ($($host_function:ident),+ $(,)?) => { - imports! { - "zellij" => { - $(stringify!($host_function) => - Function::new_native_with_env(store, plugin_env.clone(), $host_function),)+ - } - } - } - } - - zellij_export! { - host_subscribe, - host_unsubscribe, - host_set_selectable, - host_get_plugin_ids, - host_get_zellij_version, - host_open_file, - host_switch_tab_to, - host_set_timeout, - host_exec_cmd, - host_report_panic, - } -} - -fn host_subscribe(plugin_env: &PluginEnv) { - wasi_read_object::<HashSet<EventType>>(&plugin_env.wasi_env) - .and_then(|new| { - plugin_env.subscriptions.lock().to_anyhow()?.extend(new); - Ok(()) - }) - .with_context(|| format!("failed to subscribe for plugin {}", plugin_env.name())) - .fatal(); -} - -fn host_unsubscribe(plugin_env: &PluginEnv) { - wasi_read_object::<HashSet<EventType>>(&plugin_env.wasi_env) - .and_then(|old| { - plugin_env - .subscriptions - .lock() - .to_anyhow()? - .retain(|k| !old.contains(k)); - Ok(()) - }) - .with_context(|| format!("failed to unsubscribe for plugin {}", plugin_env.name())) - .fatal(); -} - -fn host_set_selectable(plugin_env: &PluginEnv, selectable: i32) { - match plugin_env.plugin.run { - PluginType::Pane(Some(tab_index)) => { - let selectable = selectable != 0; - plugin_env - .senders - .send_to_screen(ScreenInstruction::SetSelectable( - PaneId::Plugin(plugin_env.plugin_id), - selectable, - tab_index, - )) - .with_context(|| { - format!( - "failed to set plugin {} selectable from plugin {}", - selectable, - plugin_env.name() - ) - }) - .non_fatal(); - }, - _ => { - debug!( - "{} - Calling method 'host_set_selectable' does nothing for headless plugins", - plugin_env.plugin.location - ) - }, - } -} - -fn host_get_plugin_ids(plugin_env: &PluginEnv) { - let ids = PluginIds { - plugin_id: plugin_env.plugin_id, - zellij_pid: process::id(), - }; - wasi_write_object(&plugin_env.wasi_env, &ids) - .with_context(|| { - format!( - "failed to query plugin IDs from host for plugin {}", - plugin_env.name() - ) - }) - .non_fatal(); -} - -fn host_get_zellij_version(plugin_env: &PluginEnv) { - wasi_write_object(&plugin_env.wasi_env, VERSION) - .with_context(|| { - format!( - "failed to request zellij version from host for plugin {}", - plugin_env.name() - ) - }) - .non_fatal(); -} - -fn host_open_file(plugin_env: &PluginEnv) { - wasi_read_object::<PathBuf>(&plugin_env.wasi_env) - .and_then(|path| { - plugin_env - .senders - .send_to_pty(PtyInstruction::SpawnTerminal( - Some(TerminalAction::OpenFile(path, None, None)), - None, - None, - ClientOrTabIndex::TabIndex(plugin_env.tab_index), - )) - }) - .with_context(|| { - format!( - "failed to open file on host from plugin {}", - plugin_env.name() - ) - }) - .non_fatal(); -} - -fn host_switch_tab_to(plugin_env: &PluginEnv, tab_idx: u32) { - plugin_env - .senders - .send_to_screen(ScreenInstruction::GoToTab( - tab_idx, - Some(plugin_env.client_id), - )) - .with_context(|| { - format!( - "failed to switch host to tab {tab_idx} from plugin {}", - plugin_env.name() - ) - }) - .non_fatal(); -} - -fn host_set_timeout(plugin_env: &PluginEnv, secs: f64) { - // There is a fancy, high-performance way to do this with zero additional threads: - // If the plugin thread keeps a BinaryHeap of timer structs, it can manage multiple and easily `.peek()` at the - // next time to trigger in O(1) time. Once the wake-up time is known, the `wasm` thread can use `recv_timeout()` - // to wait for an event with the timeout set to be the time of the next wake up. If events come in in the meantime, - // they are handled, but if the timeout triggers, we replace the event from `recv()` with an - // `Update(pid, TimerEvent)` and pop the timer from the Heap (or reschedule it). No additional threads for as many - // timers as we'd like. - // - // But that's a lot of code, and this is a few lines: - let send_plugin_instructions = plugin_env.senders.to_plugin.clone(); - let update_target = Some(plugin_env.plugin_id); - let client_id = plugin_env.client_id; - let plugin_name = plugin_env.name(); - thread::spawn(move || { - let start_time = Instant::now(); - thread::sleep(Duration::from_secs_f64(secs)); - // FIXME: The way that elapsed time is being calculated here is not exact; it doesn't take into account the - // time it takes an event to actually reach the plugin after it's sent to the `wasm` thread. - let elapsed_time = Instant::now().duration_since(start_time).as_secs_f64(); - - send_plugin_instructions - .ok_or(anyhow!("found no sender to send plugin instruction to")) - .and_then(|sender| { - sender - .send(PluginInstruction::Update(vec![( - update_target, - Some(client_id), - Event::Timer(elapsed_time), - )])) - .to_anyhow() - }) - .with_context(|| { - format!( - "failed to set host timeout of {secs} s for plugin {}", - plugin_name - ) - }) - .non_fatal(); - }); -} - -fn host_exec_cmd(plugin_env: &PluginEnv) { - let err_context = || { - format!( - "failed to execute command on host for plugin '{}'", - plugin_env.name() - ) - }; - - let mut cmdline: Vec<String> = wasi_read_object(&plugin_env.wasi_env) - .with_context(err_context) - .fatal(); - let command = cmdline.remove(0); - - // Bail out if we're forbidden to run command - if !plugin_env.plugin._allow_exec_host_cmd { - warn!("This plugin isn't allow to run command in host side, skip running this command: '{cmd} {args}'.", - cmd = command, args = cmdline.join(" ")); - return; - } - - // Here, we don't wait the command to finish - process::Command::new(command) - .args(cmdline) - .spawn() - .with_context(err_context) - .non_fatal(); -} - -// Custom panic handler for plugins. |