summaryrefslogtreecommitdiffstats
path: root/zellij-server/src/plugins/wasm_bridge.rs
diff options
context:
space:
mode:
Diffstat (limited to 'zellij-server/src/plugins/wasm_bridge.rs')
-rw-r--r--zellij-server/src/plugins/wasm_bridge.rs622
1 files changed, 170 insertions, 452 deletions
diff --git a/zellij-server/src/plugins/wasm_bridge.rs b/zellij-server/src/plugins/wasm_bridge.rs
index 119a1d24b..45a516d3e 100644
--- a/zellij-server/src/plugins/wasm_bridge.rs
+++ b/zellij-server/src/plugins/wasm_bridge.rs
@@ -1,79 +1,36 @@
use super::PluginInstruction;
use crate::plugins::plugin_loader::{PluginLoader, VersionMismatchError};
-use log::{debug, info, warn};
-use serde::{de::DeserializeOwned, Serialize};
+use crate::plugins::plugin_map::{AtomicEvent, PluginEnv, PluginMap};
+use crate::plugins::zellij_exports::{wasi_read_string, wasi_write_object};
+use log::info;
use std::{
collections::{HashMap, HashSet},
fmt::Display,
path::PathBuf,
- process,
str::FromStr,
sync::{Arc, Mutex},
- thread,
- time::{Duration, Instant},
};
-use wasmer::{
- imports, ChainableNamedResolver, Function, ImportObject, Instance, Module, Store, Value,
- WasmerEnv,
-};
-use wasmer_wasi::WasiEnv;
+use wasmer::{Instance, Module, Store, Value};
use zellij_utils::async_std::task::{self, JoinHandle};
use crate::{
- background_jobs::BackgroundJob,
- panes::PaneId,
- pty::{ClientOrTabIndex, PtyInstruction},
- screen::ScreenInstruction,
- thread_bus::ThreadSenders,
- ui::loading_indication::LoadingIndication,
- ClientId,
+ background_jobs::BackgroundJob, screen::ScreenInstruction, thread_bus::ThreadSenders,
+ ui::loading_indication::LoadingIndication, ClientId,
};
use zellij_utils::{
consts::VERSION,
- data::{Event, EventType, PluginIds},
+ data::{Event, EventType},
errors::prelude::*,
errors::ZellijError,
input::{
- command::TerminalAction,
layout::{RunPlugin, RunPluginLocation},
- plugins::{PluginConfig, PluginType, PluginsConfig},
+ plugins::PluginsConfig,
},
pane_size::Size,
- serde,
};
-type PluginId = u32;
-
-#[derive(WasmerEnv, Clone)]
-pub struct PluginEnv {
- pub plugin_id: u32,
- pub plugin: PluginConfig,
- pub senders: ThreadSenders,
- pub wasi_env: WasiEnv,
- pub subscriptions: Arc<Mutex<HashSet<EventType>>>,
- pub tab_index: usize,
- pub client_id: ClientId,
- #[allow(dead_code)]
- pub plugin_own_data_dir: PathBuf,
-}
-
-impl PluginEnv {
- // Get the name (path) of the containing plugin
- pub fn name(&self) -> String {
- format!(
- "{} (ID {})",
- self.plugin.path.display().to_string(),
- self.plugin_id
- )
- }
-}
-
-pub type PluginMap = HashMap<(u32, ClientId), (Instance, PluginEnv, (usize, usize))>; // u32 =>
- // plugin_id,
- // (usize, usize)
- // => (rows,
- // columns)
+pub type PluginId = u32;
pub struct WasmBridge {
connected_clients: Arc<Mutex<Vec<ClientId>>>,
@@ -121,10 +78,24 @@ impl WasmBridge {
run: &RunPlugin,
tab_index: usize,
size: Size,
- client_id: ClientId,
+ client_id: Option<ClientId>,
) -> Result<u32> {
// returns the plugin id
- let err_context = move || format!("failed to load plugin for client {client_id}");
+ let err_context = move || format!("failed to load plugin");
+
+ let client_id = client_id
+ .or_else(|| {
+ self.connected_clients
+ .lock()
+ .unwrap()
+ .iter()
+ .next()
+ .copied()
+ })
+ .with_context(|| {
+ "Plugins must have a client id, none was provided and none are connected"
+ })?;
+
let plugin_id = self.next_plugin_id;
let plugin = self
@@ -277,48 +248,30 @@ impl WasmBridge {
Ok(())
}
pub fn add_client(&mut self, client_id: ClientId) -> Result<()> {
- let err_context = || format!("failed to add plugins for client {client_id}");
-
- self.connected_clients.lock().unwrap().push(client_id);
-
- let mut seen = HashSet::new();
- let mut new_plugins = HashMap::new();
- let mut plugin_map = self.plugin_map.lock().unwrap();
- for (&(plugin_id, _), (instance, plugin_env, (rows, columns))) in &*plugin_map {
- if seen.contains(&plugin_id) {
- continue;
- }
- seen.insert(plugin_id);
- let mut new_plugin_env = plugin_env.clone();
-
- new_plugin_env.client_id = client_id;
- new_plugins.insert(
- plugin_id,
- (instance.module().clone(), new_plugin_env, (*rows, *columns)),
- );
+ let mut loading_indication = LoadingIndication::new("".into());
+ match PluginLoader::add_client(
+ client_id,
+ self.plugin_dir.clone(),
+ self.plugin_cache.clone(),
+ self.senders.clone(),
+ self.store.clone(),
+ self.plugin_map.clone(),
+ self.connected_clients.clone(),
+ &mut loading_indication,
+ ) {
+ Ok(_) => {
+ let _ = self
+ .senders
+ .send_to_screen(ScreenInstruction::RequestStateUpdateForPlugins);
+ Ok(())
+ },
+ Err(e) => Err(e),
}
- for (plugin_id, (module, mut new_plugin_env, (rows, columns))) in new_plugins.drain() {
- let wasi = new_plugin_env
- .wasi_env
- .import_object(&module)
- .with_context(err_context)?;
- let zellij = zellij_exports(&self.store, &new_plugin_env);
- let mut instance =
- Instance::new(&module, &zellij.chain_back(wasi)).with_context(err_context)?;
- load_plugin_instance(&mut instance).with_context(err_context)?;
- plugin_map.insert(
- (plugin_id, client_id),
- (instance, new_plugin_env, (rows, columns)),
- );
- }
- Ok(())
}
pub fn resize_plugin(&mut self, pid: u32, new_columns: usize, new_rows: usize) -> Result<()> {
- let err_context = || format!("failed to resize plugin {pid}");
- let mut plugin_bytes = vec![];
- let mut plugin_map = self.plugin_map.lock().unwrap();
- for ((plugin_id, client_id), (instance, plugin_env, (current_rows, current_columns))) in
- plugin_map.iter_mut()
+ let err_context = move || format!("failed to resize plugin {pid}");
+ for ((plugin_id, client_id), (running_plugin, _subscriptions)) in
+ self.plugin_map.lock().unwrap().iter_mut()
{
if self
.cached_resizes_for_pending_plugins
@@ -327,26 +280,53 @@ impl WasmBridge {
continue;
}
if *plugin_id == pid {
- *current_rows = new_rows;
- *current_columns = new_columns;
-
- // TODO: consolidate with above render function
- let rendered_bytes = instance
- .exports
- .get_function("render")
- .map_err(anyError::new)
- .and_then(|render| {
- render
- .call(&[
- Value::I32(*current_rows as i32),
- Value::I32(*current_columns as i32),
- ])
- .map_err(anyError::new)
- })
- .and_then(|_| wasi_read_string(&plugin_env.wasi_env))
- .with_context(err_context)?;
-
- plugin_bytes.push((*plugin_id, *client_id, rendered_bytes.as_bytes().to_vec()));
+ let event_id = running_plugin
+ .lock()
+ .unwrap()
+ .next_event_id(AtomicEvent::Resize);
+ task::spawn({
+ let senders = self.senders.clone();
+ let running_plugin = running_plugin.clone();
+ let plugin_id = *plugin_id;
+ let client_id = *client_id;
+ async move {
+ let mut running_plugin = running_plugin.lock().unwrap();
+ if running_plugin.apply_event_id(AtomicEvent::Resize, event_id) {
+ running_plugin.rows = new_rows;
+ running_plugin.columns = new_columns;
+ let rendered_bytes = running_plugin
+ .instance
+ .exports
+ .get_function("render")
+ .map_err(anyError::new)
+ .and_then(|render| {
+ render
+ .call(&[
+ Value::I32(running_plugin.rows as i32),
+ Value::I32(running_plugin.columns as i32),
+ ])
+ .map_err(anyError::new)
+ })
+ .and_then(|_| wasi_read_string(&running_plugin.plugin_env.wasi_env))
+ .with_context(err_context);
+ match rendered_bytes {
+ Ok(rendered_bytes) => {
+ let plugin_bytes = vec![(
+ plugin_id,
+ client_id,
+ rendered_bytes.as_bytes().to_vec(),
+ )];
+ senders
+ .send_to_screen(ScreenInstruction::PluginBytes(
+ plugin_bytes,
+ ))
+ .unwrap();
+ },
+ Err(e) => log::error!("{}", e),
+ }
+ }
+ }
+ });
}
}
for (plugin_id, mut current_size) in self.cached_resizes_for_pending_plugins.iter_mut() {
@@ -355,9 +335,6 @@ impl WasmBridge {
current_size.1 = new_columns;
}
}
- let _ = self
- .senders
- .send_to_screen(ScreenInstruction::PluginBytes(plugin_bytes));
Ok(())
}
pub fn update_plugins(
@@ -366,21 +343,17 @@ impl WasmBridge {
) -> Result<()> {
let err_context = || "failed to update plugin state".to_string();
- let plugin_map = self.plugin_map.lock().unwrap();
- let mut plugin_bytes = vec![];
for (pid, cid, event) in updates.drain(..) {
- for (&(plugin_id, client_id), (instance, plugin_env, (rows, columns))) in &*plugin_map {
+ for (&(plugin_id, client_id), (running_plugin, subscriptions)) in
+ &*self.plugin_map.lock().unwrap()
+ {
if self
.cached_events_for_pending_plugins
.contains_key(&plugin_id)
{
continue;
}
- let subs = plugin_env
- .subscriptions
- .lock()
- .to_anyhow()
- .with_context(err_context)?;
+ let subs = subscriptions.lock().unwrap().clone();
// FIXME: This is very janky... Maybe I should write my own macro for Event -> EventType?
let event_type =
EventType::from_str(&event.to_string()).with_context(err_context)?;
@@ -390,16 +363,34 @@ impl WasmBridge {
|| (cid.is_none() && pid == Some(plugin_id))
|| (cid == Some(client_id) && pid == Some(plugin_id)))
{
- apply_event_to_plugin(
- plugin_id,
- client_id,
- &instance,
- &plugin_env,
- &event,
- *rows,
- *columns,
- &mut plugin_bytes,
- )?;
+ task::spawn({
+ let senders = self.senders.clone();
+ let running_plugin = running_plugin.clone();
+ let event = event.clone();
+ async move {
+ let running_plugin = running_plugin.lock().unwrap();
+ let mut plugin_bytes = vec![];
+ match apply_event_to_plugin(
+ plugin_id,
+ client_id,
+ &running_plugin.instance,
+ &running_plugin.plugin_env,
+ &event,
+ running_plugin.rows,
+ running_plugin.columns,
+ &mut plugin_bytes,
+ ) {
+ Ok(()) => {
+ let _ = senders.send_to_screen(ScreenInstruction::PluginBytes(
+ plugin_bytes,
+ ));
+ },
+ Err(e) => {
+ log::error!("{}", e);
+ },
+ }
+ }
+ });
}
}
for (plugin_id, cached_events) in self.cached_events_for_pending_plugins.iter_mut() {
@@ -408,9 +399,6 @@ impl WasmBridge {
}
}
}
- let _ = self
- .senders
- .send_to_screen(ScreenInstruction::PluginBytes(plugin_bytes));
Ok(())
}
pub fn apply_cached_events(&mut self, plugin_ids: Vec<u32>) -> Result<()> {
@@ -450,7 +438,6 @@ impl WasmBridge {
fn apply_cached_events_and_resizes_for_plugin(&mut self, plugin_id: PluginId) -> Result<()> {
let err_context = || format!("Failed to apply cached events to plugin");
if let Some(events) = self.cached_events_for_pending_plugins.remove(&plugin_id) {
- let mut plugin_map = self.plugin_map.lock().unwrap();
let all_connected_clients: Vec<ClientId> = self
.connected_clients
.lock()
@@ -459,35 +446,48 @@ impl WasmBridge {
.copied()
.collect();
for client_id in &all_connected_clients {
- let mut plugin_bytes = vec![];
- if let Some((instance, plugin_env, (rows, columns))) =
- plugin_map.get_mut(&(plugin_id, *client_id))
+ if let Some((running_plugin, subscriptions)) = self
+ .plugin_map
+ .lock()
+ .unwrap()
+ .get_mut(&(plugin_id, *client_id))
{
- let subs = plugin_env
- .subscriptions
- .lock()
- .to_anyhow()
- .with_context(err_context)?;
+ let subs = subscriptions.lock().unwrap().clone();
for event in events.clone() {
let event_type =
EventType::from_str(&event.to_string()).with_context(err_context)?;
if !subs.contains(&event_type) {
continue;
}
- apply_event_to_plugin(
- plugin_id,
- *client_id,
- &instance,
- &plugin_env,
- &event,
- *rows,
- *columns,
- &mut plugin_bytes,
- )?;
+ task::spawn({
+ let senders = self.senders.clone();
+ let running_plugin = running_plugin.clone();
+ let client_id = *client_id;
+ async move {
+ let running_plugin = running_plugin.lock().unwrap();
+ let mut plugin_bytes = vec![];
+ match apply_event_to_plugin(
+ plugin_id,
+ client_id,
+ &running_plugin.instance,
+ &running_plugin.plugin_env,
+ &event,
+ running_plugin.rows,
+ running_plugin.columns,
+ &mut plugin_bytes,
+ ) {
+ Ok(()) => {
+ let _ = senders.send_to_screen(
+ ScreenInstruction::PluginBytes(plugin_bytes),
+ );
+ },
+ Err(e) => {
+ log::error!("{}", e);
+ },
+ }
+ }
+ });
}
- let _ = self
- .senders
- .send_to_screen(ScreenInstruction::PluginBytes(plugin_bytes));
}
}
}
@@ -512,11 +512,11 @@ impl WasmBridge {
.lock()
.unwrap()
.iter()
- .filter(
- |((_plugin_id, _client_id), (_instance, plugin_env, _size))| {
- &plugin_env.plugin.location == plugin_location
- },
- )
+ .filter(|(_, (running_plugin, _subscriptions))| {
+ &running_plugin.lock().unwrap().plugin_env.plugin.location == plugin_location
+ // TODO:
+ // better
+ })
.map(|((plugin_id, _client_id), _)| *plugin_id)
.collect();
if plugin_ids.is_empty() {
@@ -530,8 +530,11 @@ impl WasmBridge {
.lock()
.unwrap()
.iter()
- .find(|((p_id, _client_id), (_instance, _plugin_env, _size))| *p_id == plugin_id)
- .map(|((_p_id, _client_id), (_instance, _plugin_env, size))| *size)
+ .find(|((p_id, _client_id), _)| *p_id == plugin_id)
+ .map(|(_, (running_plugin, _subscriptions))| {
+ let running_plugin = running_plugin.lock().unwrap();
+ (running_plugin.rows, running_plugin.columns)
+ })
}
fn start_plugin_loading_indication(
&self,
@@ -563,6 +566,7 @@ fn handle_plugin_loading_failure(
loading_indication: &mut LoadingIndication,
error: impl Display,
) {
+ log::error!("{}", error);
let _ = senders.send_to_background_jobs(BackgroundJob::StopPluginLoadingAnimation(plugin_id));
loading_indication.indicate_loading_error(error.to_string());
let _ = senders.send_to_screen(ScreenInstruction::UpdatePluginLoadingStage(
@@ -571,292 +575,6 @@ fn handle_plugin_loading_failure(
));
}
-fn load_plugin_instance(instance: &mut Instance) -> Result<()> {
- let err_context = || format!("failed to load plugin from instance {instance:#?}");
-
- let load_function = instance
- .exports
- .get_function("_start")
- .with_context(err_context)?;
- // This eventually calls the `.load()` method
- load_function.call(&[]).with_context(err_context)?;
- Ok(())
-}
-
-pub(crate) fn zellij_exports(store: &Store, plugin_env: &PluginEnv) -> ImportObject {
- macro_rules! zellij_export {
- ($($host_function:ident),+ $(,)?) => {
- imports! {
- "zellij" => {
- $(stringify!($host_function) =>
- Function::new_native_with_env(store, plugin_env.clone(), $host_function),)+
- }
- }
- }
- }
-
- zellij_export! {
- host_subscribe,
- host_unsubscribe,
- host_set_selectable,
- host_get_plugin_ids,
- host_get_zellij_version,
- host_open_file,
- host_switch_tab_to,
- host_set_timeout,
- host_exec_cmd,
- host_report_panic,
- }
-}
-
-fn host_subscribe(plugin_env: &PluginEnv) {
- wasi_read_object::<HashSet<EventType>>(&plugin_env.wasi_env)
- .and_then(|new| {
- plugin_env.subscriptions.lock().to_anyhow()?.extend(new);
- Ok(())
- })
- .with_context(|| format!("failed to subscribe for plugin {}", plugin_env.name()))
- .fatal();
-}
-
-fn host_unsubscribe(plugin_env: &PluginEnv) {
- wasi_read_object::<HashSet<EventType>>(&plugin_env.wasi_env)
- .and_then(|old| {
- plugin_env
- .subscriptions
- .lock()
- .to_anyhow()?
- .retain(|k| !old.contains(k));
- Ok(())
- })
- .with_context(|| format!("failed to unsubscribe for plugin {}", plugin_env.name()))
- .fatal();
-}
-
-fn host_set_selectable(plugin_env: &PluginEnv, selectable: i32) {
- match plugin_env.plugin.run {
- PluginType::Pane(Some(tab_index)) => {
- let selectable = selectable != 0;
- plugin_env
- .senders
- .send_to_screen(ScreenInstruction::SetSelectable(
- PaneId::Plugin(plugin_env.plugin_id),
- selectable,
- tab_index,
- ))
- .with_context(|| {
- format!(
- "failed to set plugin {} selectable from plugin {}",
- selectable,
- plugin_env.name()
- )
- })
- .non_fatal();
- },
- _ => {
- debug!(
- "{} - Calling method 'host_set_selectable' does nothing for headless plugins",
- plugin_env.plugin.location
- )
- },
- }
-}
-
-fn host_get_plugin_ids(plugin_env: &PluginEnv) {
- let ids = PluginIds {
- plugin_id: plugin_env.plugin_id,
- zellij_pid: process::id(),
- };
- wasi_write_object(&plugin_env.wasi_env, &ids)
- .with_context(|| {
- format!(
- "failed to query plugin IDs from host for plugin {}",
- plugin_env.name()
- )
- })
- .non_fatal();
-}
-
-fn host_get_zellij_version(plugin_env: &PluginEnv) {
- wasi_write_object(&plugin_env.wasi_env, VERSION)
- .with_context(|| {
- format!(
- "failed to request zellij version from host for plugin {}",
- plugin_env.name()
- )
- })
- .non_fatal();
-}
-
-fn host_open_file(plugin_env: &PluginEnv) {
- wasi_read_object::<PathBuf>(&plugin_env.wasi_env)
- .and_then(|path| {
- plugin_env
- .senders
- .send_to_pty(PtyInstruction::SpawnTerminal(
- Some(TerminalAction::OpenFile(path, None, None)),
- None,
- None,
- ClientOrTabIndex::TabIndex(plugin_env.tab_index),
- ))
- })
- .with_context(|| {
- format!(
- "failed to open file on host from plugin {}",
- plugin_env.name()
- )
- })
- .non_fatal();
-}
-
-fn host_switch_tab_to(plugin_env: &PluginEnv, tab_idx: u32) {
- plugin_env
- .senders
- .send_to_screen(ScreenInstruction::GoToTab(
- tab_idx,
- Some(plugin_env.client_id),
- ))
- .with_context(|| {
- format!(
- "failed to switch host to tab {tab_idx} from plugin {}",
- plugin_env.name()
- )
- })
- .non_fatal();
-}
-
-fn host_set_timeout(plugin_env: &PluginEnv, secs: f64) {
- // There is a fancy, high-performance way to do this with zero additional threads:
- // If the plugin thread keeps a BinaryHeap of timer structs, it can manage multiple and easily `.peek()` at the
- // next time to trigger in O(1) time. Once the wake-up time is known, the `wasm` thread can use `recv_timeout()`
- // to wait for an event with the timeout set to be the time of the next wake up. If events come in in the meantime,
- // they are handled, but if the timeout triggers, we replace the event from `recv()` with an
- // `Update(pid, TimerEvent)` and pop the timer from the Heap (or reschedule it). No additional threads for as many
- // timers as we'd like.
- //
- // But that's a lot of code, and this is a few lines:
- let send_plugin_instructions = plugin_env.senders.to_plugin.clone();
- let update_target = Some(plugin_env.plugin_id);
- let client_id = plugin_env.client_id;
- let plugin_name = plugin_env.name();
- thread::spawn(move || {
- let start_time = Instant::now();
- thread::sleep(Duration::from_secs_f64(secs));
- // FIXME: The way that elapsed time is being calculated here is not exact; it doesn't take into account the
- // time it takes an event to actually reach the plugin after it's sent to the `wasm` thread.
- let elapsed_time = Instant::now().duration_since(start_time).as_secs_f64();
-
- send_plugin_instructions
- .ok_or(anyhow!("found no sender to send plugin instruction to"))
- .and_then(|sender| {
- sender
- .send(PluginInstruction::Update(vec![(
- update_target,
- Some(client_id),
- Event::Timer(elapsed_time),
- )]))
- .to_anyhow()
- })
- .with_context(|| {
- format!(
- "failed to set host timeout of {secs} s for plugin {}",
- plugin_name
- )
- })
- .non_fatal();
- });
-}
-
-fn host_exec_cmd(plugin_env: &PluginEnv) {
- let err_context = || {
- format!(
- "failed to execute command on host for plugin '{}'",
- plugin_env.name()
- )
- };
-
- let mut cmdline: Vec<String> = wasi_read_object(&plugin_env.wasi_env)
- .with_context(err_context)
- .fatal();
- let command = cmdline.remove(0);
-
- // Bail out if we're forbidden to run command
- if !plugin_env.plugin._allow_exec_host_cmd {
- warn!("This plugin isn't allow to run command in host side, skip running this command: '{cmd} {args}'.",
- cmd = command, args = cmdline.join(" "));
- return;
- }
-
- // Here, we don't wait the command to finish
- process::Command::new(command)
- .args(cmdline)
- .spawn()
- .with_context(err_context)
- .non_fatal();
-}
-
-// Custom panic handler for plugins.