diff options
author | Aram Drevekenin <aram@poor.dev> | 2023-05-10 19:26:55 +0200 |
---|---|---|
committer | Aram Drevekenin <aram@poor.dev> | 2023-05-10 19:26:55 +0200 |
commit | db53d0a56f85f1d2b55d9abb3667648d7a4e5616 (patch) | |
tree | 7b85f8496535532f96ad8cca7ccfe3815f56f682 | |
parent | a0ec6e0a83f6921018206ff0978591298be67372 (diff) |
moar refactoring
-rw-r--r-- | Cargo.lock | 1 | ||||
-rw-r--r-- | default-plugins/strider/src/main.rs | 15 | ||||
-rw-r--r-- | default-plugins/strider/src/search.rs | 55 | ||||
-rw-r--r-- | zellij-server/src/plugins/mod.rs | 19 | ||||
-rw-r--r-- | zellij-server/src/plugins/plugin_loader.rs | 47 | ||||
-rw-r--r-- | zellij-server/src/plugins/plugin_map.rs | 105 | ||||
-rw-r--r-- | zellij-server/src/plugins/wasm_bridge.rs | 194 | ||||
-rw-r--r-- | zellij-server/src/plugins/zellij_exports.rs | 5 | ||||
-rw-r--r-- | zellij-tile/src/lib.rs | 77 | ||||
-rw-r--r-- | zellij-utils/Cargo.toml | 1 | ||||
-rw-r--r-- | zellij-utils/src/consts.rs | 2 | ||||
-rw-r--r-- | zellij-utils/src/errors.rs | 1 |
12 files changed, 323 insertions, 199 deletions
diff --git a/Cargo.lock b/Cargo.lock index 64381f371..7f2555423 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4129,6 +4129,7 @@ dependencies = [ "thiserror", "unicode-width", "url", + "uuid", "vte 0.11.0", ] diff --git a/default-plugins/strider/src/main.rs b/default-plugins/strider/src/main.rs index 75aa46bd4..cb8e07f46 100644 --- a/default-plugins/strider/src/main.rs +++ b/default-plugins/strider/src/main.rs @@ -14,20 +14,7 @@ use zellij_tile::prelude::*; use serde_json; register_plugin!(State); - -thread_local! { - static SEARCH_WORKER: std::cell::RefCell<SearchWorker> = std::cell::RefCell::new(SearchWorker::new()); -} - -#[no_mangle] -pub fn search_worker() { - let mut json = String::new(); - std::io::stdin().read_line(&mut json).unwrap(); - let (message, payload): (String, String) = serde_json::from_str(&json).unwrap(); // TODO: no unwrap - SEARCH_WORKER.with(|search_worker| { - search_worker.borrow_mut().on_message(message, payload); - }); -} +register_worker!(SearchWorker, search_worker); impl ZellijPlugin for State { fn load(&mut self) { diff --git a/default-plugins/strider/src/search.rs b/default-plugins/strider/src/search.rs index b5346d6e7..8d516990d 100644 --- a/default-plugins/strider/src/search.rs +++ b/default-plugins/strider/src/search.rs @@ -170,20 +170,11 @@ pub struct SearchWorker { skip_hidden_files: bool, } -impl SearchWorker { - pub fn new() -> Self { - SearchWorker { - search_paths: vec![], - search_file_contents: vec![], - skip_hidden_files: true, - } - } - pub fn on_message(&mut self, message: String, payload: String) { +impl <'de> ZellijWorker<'de> for SearchWorker { + // TODO: handle out of order messages, likely when rendering + fn on_message(&mut self, message: String, payload: String) { match message.as_str() { // TODO: deserialize to type "scan_folder" => { - if let Err(e) = std::fs::remove_file("/data/search_data") { - eprintln!("Warning: failed to remove cache file: {:?}", e); - } self.populate_search_paths(); post_message_to_plugin("done_scanning_folder".into(), "".into()); } @@ -206,6 +197,16 @@ impl SearchWorker { _ => {} } } +} + +impl SearchWorker { + pub fn new() -> Self { + SearchWorker { + search_paths: vec![], + search_file_contents: vec![], + skip_hidden_files: true, + } + } fn search(&mut self, search_term: String) -> (String, Vec<SearchResult>) { if self.search_paths.is_empty() { self.populate_search_paths(); @@ -226,14 +227,12 @@ impl SearchWorker { (search_term, matches) } fn populate_search_paths(&mut self) { - // TODO: CONTINUE HERE - when we start, check to see if /data/search_data exists, if it is - // deserialize it and place it in our own state, if not, do the below and then write to it - if let Ok(search_data) = std::fs::read("/data/search_data") { // TODO: add cwd to here - if let Ok(mut existing_state) = serde_json::from_str::<Self>(&String::from_utf8_lossy(&search_data)) { - std::mem::swap(self, &mut existing_state); - return; - } - } +// if let Ok(search_data) = std::fs::read("/data/search_data") { // TODO: add cwd to here +// if let Ok(mut existing_state) = serde_json::from_str::<Self>(&String::from_utf8_lossy(&search_data)) { +// std::mem::swap(self, &mut existing_state); +// return; +// } +// } for entry in WalkDir::new(ROOT).into_iter().filter_map(|e| e.ok()) { if self.skip_hidden_files && entry.file_name().to_str().map(|s| s.starts_with('.')).unwrap_or(false) { continue; @@ -258,14 +257,14 @@ impl SearchWorker { self.search_paths.push(file_path); } - let serialized_state = serde_json::to_string(&self).unwrap(); // TODO: unwrap city - std::fs::write("/data/search_data", serialized_state.as_bytes()).unwrap(); - if let Ok(search_data) = std::fs::read("/data/search_data") { - if let Ok(mut existing_state) = serde_json::from_str::<Self>(&String::from_utf8_lossy(&search_data)) { - std::mem::swap(self, &mut existing_state); - return; - } - } +// let serialized_state = serde_json::to_string(&self).unwrap(); // TODO: unwrap city +// std::fs::write("/data/search_data", serialized_state.as_bytes()).unwrap(); +// if let Ok(search_data) = std::fs::read("/data/search_data") { +// if let Ok(mut existing_state) = serde_json::from_str::<Self>(&String::from_utf8_lossy(&search_data)) { +// std::mem::swap(self, &mut existing_state); +// return; +// } +// } } fn search_file_names(&self, search_term: &str, matcher: &mut SkimMatcherV2, matches: &mut Vec<SearchResult>) { for entry in &self.search_paths { diff --git a/zellij-server/src/plugins/mod.rs b/zellij-server/src/plugins/mod.rs index f4bed9be4..67a8c96c2 100644 --- a/zellij-server/src/plugins/mod.rs +++ b/zellij-server/src/plugins/mod.rs @@ -55,12 +55,15 @@ pub enum PluginInstruction { ClientId, ), ApplyCachedEvents(Vec<PluginId>), - PostMessageToPluginWorker( + ApplyCachedWorkerMessages(PluginId), + PostMessagesToPluginWorker( PluginId, ClientId, String, // worker name - String, // serialized message - String, // serialized payload + Vec<( + String, // serialized message name + String, // serialized payload + )> ), PostMessageToPlugin( PluginId, @@ -84,7 +87,8 @@ impl From<&PluginInstruction> for PluginContext { PluginInstruction::RemoveClient(_) => PluginContext::RemoveClient, PluginInstruction::NewTab(..) => PluginContext::NewTab, PluginInstruction::ApplyCachedEvents(..) => PluginContext::ApplyCachedEvents, - PluginInstruction::PostMessageToPluginWorker(..) => PluginContext::PostMessageToPluginWorker, + PluginInstruction::ApplyCachedWorkerMessages(..) => PluginContext::ApplyCachedWorkerMessages, + PluginInstruction::PostMessagesToPluginWorker(..) => PluginContext::PostMessageToPluginWorker, PluginInstruction::PostMessageToPlugin(..) => PluginContext::PostMessageToPlugin, } } @@ -216,8 +220,11 @@ pub(crate) fn plugin_thread_main( PluginInstruction::ApplyCachedEvents(plugin_id) => { wasm_bridge.apply_cached_events(plugin_id)?; }, - PluginInstruction::PostMessageToPluginWorker(plugin_id, client_id, worker_name, message, payload) => { - wasm_bridge.post_message_to_plugin_worker(plugin_id, client_id, worker_name, message, payload)?; + PluginInstruction::ApplyCachedWorkerMessages(plugin_id) => { + wasm_bridge.apply_cached_worker_messages(plugin_id)?; + }, + PluginInstruction::PostMessagesToPluginWorker(plugin_id, client_id, worker_name, messages) => { + wasm_bridge.post_messages_to_plugin_worker(plugin_id, client_id, worker_name, messages)?; }, PluginInstruction::PostMessageToPlugin(plugin_id, client_id, message, payload) => { let updates = vec![( diff --git a/zellij-server/src/plugins/plugin_loader.rs b/zellij-server/src/plugins/plugin_loader.rs index 2afffa81f..64314dfe0 100644 --- a/zellij-server/src/plugins/plugin_loader.rs +++ b/zellij-server/src/plugins/plugin_loader.rs @@ -20,7 +20,7 @@ use crate::{ }; use zellij_utils::{ - consts::{VERSION, ZELLIJ_CACHE_DIR, ZELLIJ_TMP_DIR}, + consts::{VERSION, ZELLIJ_CACHE_DIR, ZELLIJ_SESSION_CACHE_DIR, ZELLIJ_TMP_DIR}, errors::prelude::*, input::plugins::PluginConfig, pane_size::Size, @@ -199,7 +199,6 @@ impl<'a> PluginLoader<'a> { plugin_loader.create_plugin_environment(module) }) .and_then(|(instance, plugin_env, subscriptions)| { - log::info!("load_plugin_instance reload_plugin_from_memory"); plugin_loader.load_plugin_instance( &instance, &plugin_env, @@ -250,7 +249,6 @@ impl<'a> PluginLoader<'a> { plugin_loader.create_plugin_environment(module) }) .and_then(|(instance, plugin_env, subscriptions)| { - log::info!("load_plugin_instance start plugin"); plugin_loader.load_plugin_instance( &instance, &plugin_env, @@ -279,7 +277,7 @@ impl<'a> PluginLoader<'a> { loading_indication: &mut LoadingIndication, ) -> Result<()> { let mut new_plugins = HashSet::new(); - for (&(plugin_id, _), _) in &*plugin_map.lock().unwrap() { + for plugin_id in plugin_map.lock().unwrap().plugin_ids() { new_plugins.insert((plugin_id, client_id)); } for (plugin_id, existing_client_id) in new_plugins { @@ -299,7 +297,6 @@ impl<'a> PluginLoader<'a> { plugin_loader.create_plugin_environment(module) }) .and_then(|(instance, plugin_env, subscriptions)| { - log::info!("load_plugin_instance add_client"); plugin_loader.load_plugin_instance( &instance, &plugin_env, @@ -347,7 +344,6 @@ impl<'a> PluginLoader<'a> { plugin_loader.create_plugin_environment(module) }) .and_then(|(instance, plugin_env, subscriptions)| { - log::info!("load_plugin_instance reload plugin"); plugin_loader.load_plugin_instance( &instance, &plugin_env, @@ -374,7 +370,7 @@ impl<'a> PluginLoader<'a> { tab_index: usize, size: Size, ) -> Result<Self> { - let plugin_own_data_dir = ZELLIJ_CACHE_DIR.join(Url::from(&plugin.location).to_string()); + let plugin_own_data_dir = ZELLIJ_SESSION_CACHE_DIR.join(Url::from(&plugin.location).to_string()).join(format!("{}-{}", plugin_id, client_id)); create_plugin_fs_entries(&plugin_own_data_dir)?; let plugin_path = plugin.path.clone(); Ok(PluginLoader { @@ -407,7 +403,7 @@ impl<'a> PluginLoader<'a> { let (running_plugin, _subscriptions, workers) = { let mut plugin_map = plugin_map.lock().unwrap(); plugin_map - .remove(&(plugin_id, client_id)) + .remove_single_plugin(plugin_id, client_id) .with_context(err_context)? }; let running_plugin = running_plugin.lock().unwrap(); @@ -442,13 +438,14 @@ impl<'a> PluginLoader<'a> { plugin_dir: &'a PathBuf, ) -> Result<Self> { let err_context = || "Failed to find existing plugin"; - let (running_plugin, _subscriptions, workers) = { + let running_plugin = { let plugin_map = plugin_map.lock().unwrap(); plugin_map - .iter() - .find(|((p_id, _c_id), _)| p_id == &plugin_id) - .with_context(err_context)? - .1 + .get_running_plugin(plugin_id, None) +// .iter() +// .find(|((p_id, _c_id), _)| p_id == &plugin_id) + .with_context(err_context)? +// .1 .clone() }; let running_plugin = running_plugin.lock().unwrap(); @@ -596,7 +593,6 @@ impl<'a> PluginLoader<'a> { plugin_map: &Arc<Mutex<PluginMap>>, subscriptions: &Arc<Mutex<Subscriptions>>, ) -> Result<()> { - log::info!("load_plugin_instance"); let err_context = || format!("failed to load plugin from instance {instance:#?}"); let main_user_instance = instance.clone(); let main_user_env = plugin_env.clone(); @@ -637,17 +633,16 @@ impl<'a> PluginLoader<'a> { self.plugin_id ); plugin_map.lock().unwrap().insert( - (self.plugin_id, self.client_id), - ( - Arc::new(Mutex::new(RunningPlugin::new( - main_user_instance, - main_user_env, - self.size.rows, - self.size.cols, - ))), - subscriptions.clone(), - workers, - ), + self.plugin_id, + self.client_id, + Arc::new(Mutex::new(RunningPlugin::new( + main_user_instance, + main_user_env, + self.size.rows, + self.size.cols, + ))), + subscriptions.clone(), + workers, ); display_loading_stage!( indicate_writing_plugin_to_cache_success, @@ -662,7 +657,6 @@ impl<'a> PluginLoader<'a> { connected_clients: &[ClientId], plugin_map: &Arc<Mutex<PluginMap>>, ) -> Result<()> { - log::info!("own client id in clone_instance_for_other_clients: {:?}", self.client_id); if !connected_clients.is_empty() { display_loading_stage!( indicate_cloning_plugin_for_other_clients, @@ -693,7 +687,6 @@ impl<'a> PluginLoader<'a> { .create_plugin_environment(module) }) .and_then(|(instance, plugin_env, subscriptions)| { - log::info!("load_plugin_instance for other clients..."); plugin_loader_for_client.load_plugin_instance( &instance, &plugin_env, diff --git a/zellij-server/src/plugins/plugin_map.rs b/zellij-server/src/plugins/plugin_map.rs index 8bbf0bca1..f8c083ed1 100644 --- a/zellij-server/src/plugins/plugin_map.rs +++ b/zellij-server/src/plugins/plugin_map.rs @@ -12,6 +12,7 @@ use wasmer_wasi::WasiEnv; use crate::{thread_bus::ThreadSenders, ClientId}; use zellij_utils::{ + input::layout::RunPluginLocation, consts::VERSION, data::EventType, input::plugins::PluginConfig }; @@ -22,8 +23,108 @@ use zellij_utils::errors::prelude::*; // so when adding/removing from the map - everything is halted, that's life // but when cloning the internal RunningPlugin and Subscriptions atomics, we can call methods on // them without blocking other instances -pub type PluginMap = - HashMap<(PluginId, ClientId), (Arc<Mutex<RunningPlugin>>, Arc<Mutex<Subscriptions>>, HashMap<String, Arc<Mutex<RunningWorker>>>)>; +#[derive(Default)] +pub struct PluginMap { + // TODO: types + plugin_assets: HashMap<(PluginId, ClientId), (Arc<Mutex<RunningPlugin>>, Arc<Mutex<Subscriptions>>, HashMap<String, Arc<Mutex<RunningWorker>>>)>, +} + +impl PluginMap { + pub fn remove_plugins(&mut self, pid: PluginId) -> Vec<(Arc<Mutex<RunningPlugin>>, Arc<Mutex<Subscriptions>>, HashMap<String, Arc<Mutex<RunningWorker>>>)> { + let mut removed = vec![]; + let ids_in_plugin_map: Vec<(PluginId, ClientId)> = self.plugin_assets.keys().copied().collect(); + for (plugin_id, client_id) in ids_in_plugin_map { + if pid == plugin_id { + if let Some(plugin_asset) = self.plugin_assets.remove(&(plugin_id, client_id)) { + removed.push(plugin_asset); + } + } + } + removed + } + // TODO: CONTINUE HERE - implement these + pub fn remove_single_plugin(&mut self, plugin_id: PluginId, client_id: ClientId) -> Option<(Arc<Mutex<RunningPlugin>>, Arc<Mutex<Subscriptions>>, HashMap<String, Arc<Mutex<RunningWorker>>>)> { + self.plugin_assets.remove(&(plugin_id, client_id)) + } + pub fn plugin_ids(&self) -> Vec<PluginId> { + let mut unique_plugins: HashSet<PluginId> = self.plugin_assets.keys().map(|(plugin_id, _client_id)| *plugin_id).collect(); + unique_plugins.drain().into_iter().collect() + } + pub fn running_plugins(&mut self) -> Vec<(PluginId, ClientId, Arc<Mutex<RunningPlugin>>)> { + self.plugin_assets + .iter() + .map(|((plugin_id, client_id), (running_plugin, _, _))| (*plugin_id, *client_id, running_plugin.clone())) + .collect() + } + pub fn running_plugins_and_subscriptions(&mut self) -> Vec<(PluginId, ClientId, Arc<Mutex<RunningPlugin>>, Arc<Mutex<Subscriptions>>)> { + self.plugin_assets + .iter() + .map(|((plugin_id, client_id), (running_plugin, subscriptions, _))| (*plugin_id, *client_id, running_plugin.clone(), subscriptions.clone())) + .collect() + } + pub fn get_running_plugin_and_subscriptions(&self, plugin_id: PluginId, client_id: ClientId) -> Option<(Arc<Mutex<RunningPlugin>>, Arc<Mutex<Subscriptions>>)> { + self.plugin_assets + .get(&(plugin_id, client_id)) + .and_then(|(running_plugin, subscriptions, _)| Some((running_plugin.clone(), subscriptions.clone()))) + } + pub fn get_running_plugin(&self, plugin_id: PluginId, client_id: Option<ClientId>) -> Option<Arc<Mutex<RunningPlugin>>> { + match client_id { + Some(client_id) => { + self.plugin_assets + .get(&(plugin_id, client_id)) + .and_then(|(running_plugin, _, _)| Some(running_plugin.clone())) + }, + None => { + self.plugin_assets + .iter() + .find(|((p_id, _), _)| *p_id == plugin_id) + .and_then(|(_, (running_plugin, _, _))| Some(running_plugin.clone())) + } + } + } + pub fn clone_worker(&self, plugin_id: PluginId, client_id: ClientId, worker_name: &str) -> Option<Arc<Mutex<RunningWorker>>> { + self.plugin_assets + .iter() + .find(|((p_id, c_id), _)| p_id == &plugin_id && c_id == &client_id) + .and_then(|(_, (_running_plugin, _subscriptions, workers))| { + if let Some(worker) = workers.get(&format!("{}_worker", worker_name)) { + Some(worker.clone()) + } else { + None + } + }).clone() + } + pub fn all_plugin_ids_for_plugin_location( + &self, + plugin_location: &RunPluginLocation, + ) -> Result<Vec<PluginId>> { + let err_context = || format!("Failed to get plugin ids for location {plugin_location}"); + let plugin_ids: Vec<PluginId> = self + .plugin_assets + .iter() + .filter(|(_, (running_plugin, _subscriptions, _workers))| { + &running_plugin.lock().unwrap().plugin_env.plugin.location == plugin_location + }) + .map(|((plugin_id, _client_id), _)| *plugin_id) + .collect(); + if plugin_ids.is_empty() { + return Err(ZellijError::PluginDoesNotExist).with_context(err_context); + } + Ok(plugin_ids) + } + pub fn insert( + &mut self, + plugin_id: PluginId, + client_id: ClientId, + running_plugin: Arc<Mutex<RunningPlugin>>, + subscriptions: Arc<Mutex<Subscriptions>>, + running_workers: HashMap<String, Arc<Mutex<RunningWorker>>> + ) { + self.plugin_assets.insert((plugin_id, client_id), (running_plugin, subscriptions, running_workers)); + } + +} + pub type Subscriptions = HashSet<EventType>; #[derive(Clone)] diff --git a/zellij-server/src/plugins/wasm_bridge.rs b/zellij-server/src/plugins/wasm_bridge.rs index 30a8026d1..36042c0db 100644 --- a/zellij-server/src/plugins/wasm_bridge.rs +++ b/zellij-server/src/plugins/wasm_bridge.rs @@ -1,6 +1,6 @@ use super::{PluginInstruction, PluginId}; use crate::plugins::plugin_loader::{PluginLoader, VersionMismatchError}; -use crate::plugins::plugin_map::{AtomicEvent, PluginEnv, PluginMap}; +use crate::plugins::plugin_map::{AtomicEvent, PluginEnv, PluginMap, RunningWorker, RunningPlugin, Subscriptions}; use crate::plugins::zellij_exports::{wasi_read_string, wasi_write_object}; use log::info; use std::{ @@ -8,7 +8,7 @@ use std::{ fmt::Display, path::PathBuf, str::FromStr, - sync::{Arc, Mutex}, + sync::{Arc, Mutex, TryLockError}, }; use wasmer::{Instance, Module, Store, Value}; use zellij_utils::async_std::task::{self, JoinHandle}; @@ -22,7 +22,6 @@ use zellij_utils::{ consts::VERSION, data::{Event, EventType}, errors::prelude::*, - errors::ZellijError, input::{ layout::{RunPlugin, RunPluginLocation}, plugins::PluginsConfig, @@ -30,6 +29,8 @@ use zellij_utils::{ pane_size::Size, }; +const RETRY_INTERVAL_MS: u64 = 100; + pub struct WasmBridge { connected_clients: Arc<Mutex<Vec<ClientId>>>, plugins: PluginsConfig, @@ -56,7 +57,7 @@ impl WasmBridge { store: Store, plugin_dir: PathBuf, ) -> Self { - let plugin_map = Arc::new(Mutex::new(HashMap::new())); + let plugin_map = Arc::new(Mutex::new(PluginMap::default())); let connected_clients: Arc<Mutex<Vec<ClientId>>> = Arc::new(Mutex::new(vec![])); let plugin_cache: Arc<Mutex<HashMap<PathBuf, Module>>> = Arc::new(Mutex::new(HashMap::new())); @@ -156,13 +157,14 @@ impl WasmBridge { Ok(plugin_id) } pub fn unload_plugin(&mut self, pid: PluginId) -> Result<()> { + // TODO: these should also be called on Zellij exit info!("Bye from plugin {}", &pid); - // TODO: remove plugin's own data directory let mut plugin_map = self.plugin_map.lock().unwrap(); - let ids_in_plugin_map: Vec<(PluginId, ClientId)> = plugin_map.keys().copied().collect(); - for (plugin_id, client_id) in ids_in_plugin_map { - if pid == plugin_id { - drop(plugin_map.remove(&(plugin_id, client_id))); + for (running_plugin, _, _) in plugin_map.remove_plugins(pid) { + let running_plugin = running_plugin.lock().unwrap(); + let cache_dir = running_plugin.plugin_env.plugin_own_data_dir.clone(); + if let Err(e) = std::fs::remove_dir_all(cache_dir) { + log::error!("Failed to remove cache dir for plugin: {:?}", e); } } Ok(()) @@ -273,16 +275,17 @@ impl WasmBridge { } pub fn resize_plugin(&mut self, pid: PluginId, new_columns: usize, new_rows: usize) -> Result<()> { let err_context = move || format!("failed to resize plugin {pid}"); - for ((plugin_id, client_id), (running_plugin, _subscriptions, workers)) in - self.plugin_map.lock().unwrap().iter_mut() - { - if self - .cached_resizes_for_pending_plugins - .contains_key(&plugin_id) - { - continue; - } - if *plugin_id == pid { + + let plugins_to_resize: Vec<(PluginId, ClientId, Arc<Mutex<RunningPlugin>>)> = self.plugin_map + .lock() + .unwrap() + .running_plugins() + .iter() + .cloned() + .filter(|(plugin_id, _client_id, _running_plugin)| !self.cached_resizes_for_pending_plugins.contains_key(&plugin_id)) + .collect(); + for (plugin_id, client_id, running_plugin) in plugins_to_resize { + if plugin_id == pid { let event_id = running_plugin .lock() .unwrap() @@ -290,8 +293,8 @@ impl WasmBridge { task::spawn({ let senders = self.senders.clone(); let running_plugin = running_plugin.clone(); - let plugin_id = *plugin_id; - let client_id = *client_id; + let plugin_id = plugin_id; + let client_id = client_id; async move { let mut running_plugin = running_plugin.lock().unwrap(); if running_plugin.apply_event_id(AtomicEvent::Resize, event_id) { @@ -346,30 +349,32 @@ impl WasmBridge { ) -> Result<()> { let err_context = || "failed to update plugin state".to_string(); + let plugins_to_update: Vec<(PluginId, ClientId, Arc<Mutex<RunningPlugin>>, Arc<Mutex<Subscriptions>>)> = self.plugin_map + .lock() + .unwrap() + .running_plugins_and_subscriptions() + .iter() + .cloned() + .filter(|(plugin_id, _client_id, _running_plugin, _subscriptions)| !&self.cached_events_for_pending_plugins.contains_key(&plugin_id)) + .collect(); for (pid, cid, event) in updates.drain(..) { - for (&(plugin_id, client_id), (running_plugin, subscriptions, workers)) in - &*self.plugin_map.lock().unwrap() - { - if self - .cached_events_for_pending_plugins - .contains_key(&plugin_id) - { |