summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAram Drevekenin <aram@poor.dev>2023-05-10 19:26:55 +0200
committerAram Drevekenin <aram@poor.dev>2023-05-10 19:26:55 +0200
commitdb53d0a56f85f1d2b55d9abb3667648d7a4e5616 (patch)
tree7b85f8496535532f96ad8cca7ccfe3815f56f682
parenta0ec6e0a83f6921018206ff0978591298be67372 (diff)
moar refactoring
-rw-r--r--Cargo.lock1
-rw-r--r--default-plugins/strider/src/main.rs15
-rw-r--r--default-plugins/strider/src/search.rs55
-rw-r--r--zellij-server/src/plugins/mod.rs19
-rw-r--r--zellij-server/src/plugins/plugin_loader.rs47
-rw-r--r--zellij-server/src/plugins/plugin_map.rs105
-rw-r--r--zellij-server/src/plugins/wasm_bridge.rs194
-rw-r--r--zellij-server/src/plugins/zellij_exports.rs5
-rw-r--r--zellij-tile/src/lib.rs77
-rw-r--r--zellij-utils/Cargo.toml1
-rw-r--r--zellij-utils/src/consts.rs2
-rw-r--r--zellij-utils/src/errors.rs1
12 files changed, 323 insertions, 199 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 64381f371..7f2555423 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4129,6 +4129,7 @@ dependencies = [
"thiserror",
"unicode-width",
"url",
+ "uuid",
"vte 0.11.0",
]
diff --git a/default-plugins/strider/src/main.rs b/default-plugins/strider/src/main.rs
index 75aa46bd4..cb8e07f46 100644
--- a/default-plugins/strider/src/main.rs
+++ b/default-plugins/strider/src/main.rs
@@ -14,20 +14,7 @@ use zellij_tile::prelude::*;
use serde_json;
register_plugin!(State);
-
-thread_local! {
- static SEARCH_WORKER: std::cell::RefCell<SearchWorker> = std::cell::RefCell::new(SearchWorker::new());
-}
-
-#[no_mangle]
-pub fn search_worker() {
- let mut json = String::new();
- std::io::stdin().read_line(&mut json).unwrap();
- let (message, payload): (String, String) = serde_json::from_str(&json).unwrap(); // TODO: no unwrap
- SEARCH_WORKER.with(|search_worker| {
- search_worker.borrow_mut().on_message(message, payload);
- });
-}
+register_worker!(SearchWorker, search_worker);
impl ZellijPlugin for State {
fn load(&mut self) {
diff --git a/default-plugins/strider/src/search.rs b/default-plugins/strider/src/search.rs
index b5346d6e7..8d516990d 100644
--- a/default-plugins/strider/src/search.rs
+++ b/default-plugins/strider/src/search.rs
@@ -170,20 +170,11 @@ pub struct SearchWorker {
skip_hidden_files: bool,
}
-impl SearchWorker {
- pub fn new() -> Self {
- SearchWorker {
- search_paths: vec![],
- search_file_contents: vec![],
- skip_hidden_files: true,
- }
- }
- pub fn on_message(&mut self, message: String, payload: String) {
+impl <'de> ZellijWorker<'de> for SearchWorker {
+ // TODO: handle out of order messages, likely when rendering
+ fn on_message(&mut self, message: String, payload: String) {
match message.as_str() { // TODO: deserialize to type
"scan_folder" => {
- if let Err(e) = std::fs::remove_file("/data/search_data") {
- eprintln!("Warning: failed to remove cache file: {:?}", e);
- }
self.populate_search_paths();
post_message_to_plugin("done_scanning_folder".into(), "".into());
}
@@ -206,6 +197,16 @@ impl SearchWorker {
_ => {}
}
}
+}
+
+impl SearchWorker {
+ pub fn new() -> Self {
+ SearchWorker {
+ search_paths: vec![],
+ search_file_contents: vec![],
+ skip_hidden_files: true,
+ }
+ }
fn search(&mut self, search_term: String) -> (String, Vec<SearchResult>) {
if self.search_paths.is_empty() {
self.populate_search_paths();
@@ -226,14 +227,12 @@ impl SearchWorker {
(search_term, matches)
}
fn populate_search_paths(&mut self) {
- // TODO: CONTINUE HERE - when we start, check to see if /data/search_data exists, if it is
- // deserialize it and place it in our own state, if not, do the below and then write to it
- if let Ok(search_data) = std::fs::read("/data/search_data") { // TODO: add cwd to here
- if let Ok(mut existing_state) = serde_json::from_str::<Self>(&String::from_utf8_lossy(&search_data)) {
- std::mem::swap(self, &mut existing_state);
- return;
- }
- }
+// if let Ok(search_data) = std::fs::read("/data/search_data") { // TODO: add cwd to here
+// if let Ok(mut existing_state) = serde_json::from_str::<Self>(&String::from_utf8_lossy(&search_data)) {
+// std::mem::swap(self, &mut existing_state);
+// return;
+// }
+// }
for entry in WalkDir::new(ROOT).into_iter().filter_map(|e| e.ok()) {
if self.skip_hidden_files && entry.file_name().to_str().map(|s| s.starts_with('.')).unwrap_or(false) {
continue;
@@ -258,14 +257,14 @@ impl SearchWorker {
self.search_paths.push(file_path);
}
- let serialized_state = serde_json::to_string(&self).unwrap(); // TODO: unwrap city
- std::fs::write("/data/search_data", serialized_state.as_bytes()).unwrap();
- if let Ok(search_data) = std::fs::read("/data/search_data") {
- if let Ok(mut existing_state) = serde_json::from_str::<Self>(&String::from_utf8_lossy(&search_data)) {
- std::mem::swap(self, &mut existing_state);
- return;
- }
- }
+// let serialized_state = serde_json::to_string(&self).unwrap(); // TODO: unwrap city
+// std::fs::write("/data/search_data", serialized_state.as_bytes()).unwrap();
+// if let Ok(search_data) = std::fs::read("/data/search_data") {
+// if let Ok(mut existing_state) = serde_json::from_str::<Self>(&String::from_utf8_lossy(&search_data)) {
+// std::mem::swap(self, &mut existing_state);
+// return;
+// }
+// }
}
fn search_file_names(&self, search_term: &str, matcher: &mut SkimMatcherV2, matches: &mut Vec<SearchResult>) {
for entry in &self.search_paths {
diff --git a/zellij-server/src/plugins/mod.rs b/zellij-server/src/plugins/mod.rs
index f4bed9be4..67a8c96c2 100644
--- a/zellij-server/src/plugins/mod.rs
+++ b/zellij-server/src/plugins/mod.rs
@@ -55,12 +55,15 @@ pub enum PluginInstruction {
ClientId,
),
ApplyCachedEvents(Vec<PluginId>),
- PostMessageToPluginWorker(
+ ApplyCachedWorkerMessages(PluginId),
+ PostMessagesToPluginWorker(
PluginId,
ClientId,
String, // worker name
- String, // serialized message
- String, // serialized payload
+ Vec<(
+ String, // serialized message name
+ String, // serialized payload
+ )>
),
PostMessageToPlugin(
PluginId,
@@ -84,7 +87,8 @@ impl From<&PluginInstruction> for PluginContext {
PluginInstruction::RemoveClient(_) => PluginContext::RemoveClient,
PluginInstruction::NewTab(..) => PluginContext::NewTab,
PluginInstruction::ApplyCachedEvents(..) => PluginContext::ApplyCachedEvents,
- PluginInstruction::PostMessageToPluginWorker(..) => PluginContext::PostMessageToPluginWorker,
+ PluginInstruction::ApplyCachedWorkerMessages(..) => PluginContext::ApplyCachedWorkerMessages,
+ PluginInstruction::PostMessagesToPluginWorker(..) => PluginContext::PostMessageToPluginWorker,
PluginInstruction::PostMessageToPlugin(..) => PluginContext::PostMessageToPlugin,
}
}
@@ -216,8 +220,11 @@ pub(crate) fn plugin_thread_main(
PluginInstruction::ApplyCachedEvents(plugin_id) => {
wasm_bridge.apply_cached_events(plugin_id)?;
},
- PluginInstruction::PostMessageToPluginWorker(plugin_id, client_id, worker_name, message, payload) => {
- wasm_bridge.post_message_to_plugin_worker(plugin_id, client_id, worker_name, message, payload)?;
+ PluginInstruction::ApplyCachedWorkerMessages(plugin_id) => {
+ wasm_bridge.apply_cached_worker_messages(plugin_id)?;
+ },
+ PluginInstruction::PostMessagesToPluginWorker(plugin_id, client_id, worker_name, messages) => {
+ wasm_bridge.post_messages_to_plugin_worker(plugin_id, client_id, worker_name, messages)?;
},
PluginInstruction::PostMessageToPlugin(plugin_id, client_id, message, payload) => {
let updates = vec![(
diff --git a/zellij-server/src/plugins/plugin_loader.rs b/zellij-server/src/plugins/plugin_loader.rs
index 2afffa81f..64314dfe0 100644
--- a/zellij-server/src/plugins/plugin_loader.rs
+++ b/zellij-server/src/plugins/plugin_loader.rs
@@ -20,7 +20,7 @@ use crate::{
};
use zellij_utils::{
- consts::{VERSION, ZELLIJ_CACHE_DIR, ZELLIJ_TMP_DIR},
+ consts::{VERSION, ZELLIJ_CACHE_DIR, ZELLIJ_SESSION_CACHE_DIR, ZELLIJ_TMP_DIR},
errors::prelude::*,
input::plugins::PluginConfig,
pane_size::Size,
@@ -199,7 +199,6 @@ impl<'a> PluginLoader<'a> {
plugin_loader.create_plugin_environment(module)
})
.and_then(|(instance, plugin_env, subscriptions)| {
- log::info!("load_plugin_instance reload_plugin_from_memory");
plugin_loader.load_plugin_instance(
&instance,
&plugin_env,
@@ -250,7 +249,6 @@ impl<'a> PluginLoader<'a> {
plugin_loader.create_plugin_environment(module)
})
.and_then(|(instance, plugin_env, subscriptions)| {
- log::info!("load_plugin_instance start plugin");
plugin_loader.load_plugin_instance(
&instance,
&plugin_env,
@@ -279,7 +277,7 @@ impl<'a> PluginLoader<'a> {
loading_indication: &mut LoadingIndication,
) -> Result<()> {
let mut new_plugins = HashSet::new();
- for (&(plugin_id, _), _) in &*plugin_map.lock().unwrap() {
+ for plugin_id in plugin_map.lock().unwrap().plugin_ids() {
new_plugins.insert((plugin_id, client_id));
}
for (plugin_id, existing_client_id) in new_plugins {
@@ -299,7 +297,6 @@ impl<'a> PluginLoader<'a> {
plugin_loader.create_plugin_environment(module)
})
.and_then(|(instance, plugin_env, subscriptions)| {
- log::info!("load_plugin_instance add_client");
plugin_loader.load_plugin_instance(
&instance,
&plugin_env,
@@ -347,7 +344,6 @@ impl<'a> PluginLoader<'a> {
plugin_loader.create_plugin_environment(module)
})
.and_then(|(instance, plugin_env, subscriptions)| {
- log::info!("load_plugin_instance reload plugin");
plugin_loader.load_plugin_instance(
&instance,
&plugin_env,
@@ -374,7 +370,7 @@ impl<'a> PluginLoader<'a> {
tab_index: usize,
size: Size,
) -> Result<Self> {
- let plugin_own_data_dir = ZELLIJ_CACHE_DIR.join(Url::from(&plugin.location).to_string());
+ let plugin_own_data_dir = ZELLIJ_SESSION_CACHE_DIR.join(Url::from(&plugin.location).to_string()).join(format!("{}-{}", plugin_id, client_id));
create_plugin_fs_entries(&plugin_own_data_dir)?;
let plugin_path = plugin.path.clone();
Ok(PluginLoader {
@@ -407,7 +403,7 @@ impl<'a> PluginLoader<'a> {
let (running_plugin, _subscriptions, workers) = {
let mut plugin_map = plugin_map.lock().unwrap();
plugin_map
- .remove(&(plugin_id, client_id))
+ .remove_single_plugin(plugin_id, client_id)
.with_context(err_context)?
};
let running_plugin = running_plugin.lock().unwrap();
@@ -442,13 +438,14 @@ impl<'a> PluginLoader<'a> {
plugin_dir: &'a PathBuf,
) -> Result<Self> {
let err_context = || "Failed to find existing plugin";
- let (running_plugin, _subscriptions, workers) = {
+ let running_plugin = {
let plugin_map = plugin_map.lock().unwrap();
plugin_map
- .iter()
- .find(|((p_id, _c_id), _)| p_id == &plugin_id)
- .with_context(err_context)?
- .1
+ .get_running_plugin(plugin_id, None)
+// .iter()
+// .find(|((p_id, _c_id), _)| p_id == &plugin_id)
+ .with_context(err_context)?
+// .1
.clone()
};
let running_plugin = running_plugin.lock().unwrap();
@@ -596,7 +593,6 @@ impl<'a> PluginLoader<'a> {
plugin_map: &Arc<Mutex<PluginMap>>,
subscriptions: &Arc<Mutex<Subscriptions>>,
) -> Result<()> {
- log::info!("load_plugin_instance");
let err_context = || format!("failed to load plugin from instance {instance:#?}");
let main_user_instance = instance.clone();
let main_user_env = plugin_env.clone();
@@ -637,17 +633,16 @@ impl<'a> PluginLoader<'a> {
self.plugin_id
);
plugin_map.lock().unwrap().insert(
- (self.plugin_id, self.client_id),
- (
- Arc::new(Mutex::new(RunningPlugin::new(
- main_user_instance,
- main_user_env,
- self.size.rows,
- self.size.cols,
- ))),
- subscriptions.clone(),
- workers,
- ),
+ self.plugin_id,
+ self.client_id,
+ Arc::new(Mutex::new(RunningPlugin::new(
+ main_user_instance,
+ main_user_env,
+ self.size.rows,
+ self.size.cols,
+ ))),
+ subscriptions.clone(),
+ workers,
);
display_loading_stage!(
indicate_writing_plugin_to_cache_success,
@@ -662,7 +657,6 @@ impl<'a> PluginLoader<'a> {
connected_clients: &[ClientId],
plugin_map: &Arc<Mutex<PluginMap>>,
) -> Result<()> {
- log::info!("own client id in clone_instance_for_other_clients: {:?}", self.client_id);
if !connected_clients.is_empty() {
display_loading_stage!(
indicate_cloning_plugin_for_other_clients,
@@ -693,7 +687,6 @@ impl<'a> PluginLoader<'a> {
.create_plugin_environment(module)
})
.and_then(|(instance, plugin_env, subscriptions)| {
- log::info!("load_plugin_instance for other clients...");
plugin_loader_for_client.load_plugin_instance(
&instance,
&plugin_env,
diff --git a/zellij-server/src/plugins/plugin_map.rs b/zellij-server/src/plugins/plugin_map.rs
index 8bbf0bca1..f8c083ed1 100644
--- a/zellij-server/src/plugins/plugin_map.rs
+++ b/zellij-server/src/plugins/plugin_map.rs
@@ -12,6 +12,7 @@ use wasmer_wasi::WasiEnv;
use crate::{thread_bus::ThreadSenders, ClientId};
use zellij_utils::{
+ input::layout::RunPluginLocation,
consts::VERSION,
data::EventType, input::plugins::PluginConfig
};
@@ -22,8 +23,108 @@ use zellij_utils::errors::prelude::*;
// so when adding/removing from the map - everything is halted, that's life
// but when cloning the internal RunningPlugin and Subscriptions atomics, we can call methods on
// them without blocking other instances
-pub type PluginMap =
- HashMap<(PluginId, ClientId), (Arc<Mutex<RunningPlugin>>, Arc<Mutex<Subscriptions>>, HashMap<String, Arc<Mutex<RunningWorker>>>)>;
+#[derive(Default)]
+pub struct PluginMap {
+ // TODO: types
+ plugin_assets: HashMap<(PluginId, ClientId), (Arc<Mutex<RunningPlugin>>, Arc<Mutex<Subscriptions>>, HashMap<String, Arc<Mutex<RunningWorker>>>)>,
+}
+
+impl PluginMap {
+ pub fn remove_plugins(&mut self, pid: PluginId) -> Vec<(Arc<Mutex<RunningPlugin>>, Arc<Mutex<Subscriptions>>, HashMap<String, Arc<Mutex<RunningWorker>>>)> {
+ let mut removed = vec![];
+ let ids_in_plugin_map: Vec<(PluginId, ClientId)> = self.plugin_assets.keys().copied().collect();
+ for (plugin_id, client_id) in ids_in_plugin_map {
+ if pid == plugin_id {
+ if let Some(plugin_asset) = self.plugin_assets.remove(&(plugin_id, client_id)) {
+ removed.push(plugin_asset);
+ }
+ }
+ }
+ removed
+ }
+ // TODO: CONTINUE HERE - implement these
+ pub fn remove_single_plugin(&mut self, plugin_id: PluginId, client_id: ClientId) -> Option<(Arc<Mutex<RunningPlugin>>, Arc<Mutex<Subscriptions>>, HashMap<String, Arc<Mutex<RunningWorker>>>)> {
+ self.plugin_assets.remove(&(plugin_id, client_id))
+ }
+ pub fn plugin_ids(&self) -> Vec<PluginId> {
+ let mut unique_plugins: HashSet<PluginId> = self.plugin_assets.keys().map(|(plugin_id, _client_id)| *plugin_id).collect();
+ unique_plugins.drain().into_iter().collect()
+ }
+ pub fn running_plugins(&mut self) -> Vec<(PluginId, ClientId, Arc<Mutex<RunningPlugin>>)> {
+ self.plugin_assets
+ .iter()
+ .map(|((plugin_id, client_id), (running_plugin, _, _))| (*plugin_id, *client_id, running_plugin.clone()))
+ .collect()
+ }
+ pub fn running_plugins_and_subscriptions(&mut self) -> Vec<(PluginId, ClientId, Arc<Mutex<RunningPlugin>>, Arc<Mutex<Subscriptions>>)> {
+ self.plugin_assets
+ .iter()
+ .map(|((plugin_id, client_id), (running_plugin, subscriptions, _))| (*plugin_id, *client_id, running_plugin.clone(), subscriptions.clone()))
+ .collect()
+ }
+ pub fn get_running_plugin_and_subscriptions(&self, plugin_id: PluginId, client_id: ClientId) -> Option<(Arc<Mutex<RunningPlugin>>, Arc<Mutex<Subscriptions>>)> {
+ self.plugin_assets
+ .get(&(plugin_id, client_id))
+ .and_then(|(running_plugin, subscriptions, _)| Some((running_plugin.clone(), subscriptions.clone())))
+ }
+ pub fn get_running_plugin(&self, plugin_id: PluginId, client_id: Option<ClientId>) -> Option<Arc<Mutex<RunningPlugin>>> {
+ match client_id {
+ Some(client_id) => {
+ self.plugin_assets
+ .get(&(plugin_id, client_id))
+ .and_then(|(running_plugin, _, _)| Some(running_plugin.clone()))
+ },
+ None => {
+ self.plugin_assets
+ .iter()
+ .find(|((p_id, _), _)| *p_id == plugin_id)
+ .and_then(|(_, (running_plugin, _, _))| Some(running_plugin.clone()))
+ }
+ }
+ }
+ pub fn clone_worker(&self, plugin_id: PluginId, client_id: ClientId, worker_name: &str) -> Option<Arc<Mutex<RunningWorker>>> {
+ self.plugin_assets
+ .iter()
+ .find(|((p_id, c_id), _)| p_id == &plugin_id && c_id == &client_id)
+ .and_then(|(_, (_running_plugin, _subscriptions, workers))| {
+ if let Some(worker) = workers.get(&format!("{}_worker", worker_name)) {
+ Some(worker.clone())
+ } else {
+ None
+ }
+ }).clone()
+ }
+ pub fn all_plugin_ids_for_plugin_location(
+ &self,
+ plugin_location: &RunPluginLocation,
+ ) -> Result<Vec<PluginId>> {
+ let err_context = || format!("Failed to get plugin ids for location {plugin_location}");
+ let plugin_ids: Vec<PluginId> = self
+ .plugin_assets
+ .iter()
+ .filter(|(_, (running_plugin, _subscriptions, _workers))| {
+ &running_plugin.lock().unwrap().plugin_env.plugin.location == plugin_location
+ })
+ .map(|((plugin_id, _client_id), _)| *plugin_id)
+ .collect();
+ if plugin_ids.is_empty() {
+ return Err(ZellijError::PluginDoesNotExist).with_context(err_context);
+ }
+ Ok(plugin_ids)
+ }
+ pub fn insert(
+ &mut self,
+ plugin_id: PluginId,
+ client_id: ClientId,
+ running_plugin: Arc<Mutex<RunningPlugin>>,
+ subscriptions: Arc<Mutex<Subscriptions>>,
+ running_workers: HashMap<String, Arc<Mutex<RunningWorker>>>
+ ) {
+ self.plugin_assets.insert((plugin_id, client_id), (running_plugin, subscriptions, running_workers));
+ }
+
+}
+
pub type Subscriptions = HashSet<EventType>;
#[derive(Clone)]
diff --git a/zellij-server/src/plugins/wasm_bridge.rs b/zellij-server/src/plugins/wasm_bridge.rs
index 30a8026d1..36042c0db 100644
--- a/zellij-server/src/plugins/wasm_bridge.rs
+++ b/zellij-server/src/plugins/wasm_bridge.rs
@@ -1,6 +1,6 @@
use super::{PluginInstruction, PluginId};
use crate::plugins::plugin_loader::{PluginLoader, VersionMismatchError};
-use crate::plugins::plugin_map::{AtomicEvent, PluginEnv, PluginMap};
+use crate::plugins::plugin_map::{AtomicEvent, PluginEnv, PluginMap, RunningWorker, RunningPlugin, Subscriptions};
use crate::plugins::zellij_exports::{wasi_read_string, wasi_write_object};
use log::info;
use std::{
@@ -8,7 +8,7 @@ use std::{
fmt::Display,
path::PathBuf,
str::FromStr,
- sync::{Arc, Mutex},
+ sync::{Arc, Mutex, TryLockError},
};
use wasmer::{Instance, Module, Store, Value};
use zellij_utils::async_std::task::{self, JoinHandle};
@@ -22,7 +22,6 @@ use zellij_utils::{
consts::VERSION,
data::{Event, EventType},
errors::prelude::*,
- errors::ZellijError,
input::{
layout::{RunPlugin, RunPluginLocation},
plugins::PluginsConfig,
@@ -30,6 +29,8 @@ use zellij_utils::{
pane_size::Size,
};
+const RETRY_INTERVAL_MS: u64 = 100;
+
pub struct WasmBridge {
connected_clients: Arc<Mutex<Vec<ClientId>>>,
plugins: PluginsConfig,
@@ -56,7 +57,7 @@ impl WasmBridge {
store: Store,
plugin_dir: PathBuf,
) -> Self {
- let plugin_map = Arc::new(Mutex::new(HashMap::new()));
+ let plugin_map = Arc::new(Mutex::new(PluginMap::default()));
let connected_clients: Arc<Mutex<Vec<ClientId>>> = Arc::new(Mutex::new(vec![]));
let plugin_cache: Arc<Mutex<HashMap<PathBuf, Module>>> =
Arc::new(Mutex::new(HashMap::new()));
@@ -156,13 +157,14 @@ impl WasmBridge {
Ok(plugin_id)
}
pub fn unload_plugin(&mut self, pid: PluginId) -> Result<()> {
+ // TODO: these should also be called on Zellij exit
info!("Bye from plugin {}", &pid);
- // TODO: remove plugin's own data directory
let mut plugin_map = self.plugin_map.lock().unwrap();
- let ids_in_plugin_map: Vec<(PluginId, ClientId)> = plugin_map.keys().copied().collect();
- for (plugin_id, client_id) in ids_in_plugin_map {
- if pid == plugin_id {
- drop(plugin_map.remove(&(plugin_id, client_id)));
+ for (running_plugin, _, _) in plugin_map.remove_plugins(pid) {
+ let running_plugin = running_plugin.lock().unwrap();
+ let cache_dir = running_plugin.plugin_env.plugin_own_data_dir.clone();
+ if let Err(e) = std::fs::remove_dir_all(cache_dir) {
+ log::error!("Failed to remove cache dir for plugin: {:?}", e);
}
}
Ok(())
@@ -273,16 +275,17 @@ impl WasmBridge {
}
pub fn resize_plugin(&mut self, pid: PluginId, new_columns: usize, new_rows: usize) -> Result<()> {
let err_context = move || format!("failed to resize plugin {pid}");
- for ((plugin_id, client_id), (running_plugin, _subscriptions, workers)) in
- self.plugin_map.lock().unwrap().iter_mut()
- {
- if self
- .cached_resizes_for_pending_plugins
- .contains_key(&plugin_id)
- {
- continue;
- }
- if *plugin_id == pid {
+
+ let plugins_to_resize: Vec<(PluginId, ClientId, Arc<Mutex<RunningPlugin>>)> = self.plugin_map
+ .lock()
+ .unwrap()
+ .running_plugins()
+ .iter()
+ .cloned()
+ .filter(|(plugin_id, _client_id, _running_plugin)| !self.cached_resizes_for_pending_plugins.contains_key(&plugin_id))
+ .collect();
+ for (plugin_id, client_id, running_plugin) in plugins_to_resize {
+ if plugin_id == pid {
let event_id = running_plugin
.lock()
.unwrap()
@@ -290,8 +293,8 @@ impl WasmBridge {
task::spawn({
let senders = self.senders.clone();
let running_plugin = running_plugin.clone();
- let plugin_id = *plugin_id;
- let client_id = *client_id;
+ let plugin_id = plugin_id;
+ let client_id = client_id;
async move {
let mut running_plugin = running_plugin.lock().unwrap();
if running_plugin.apply_event_id(AtomicEvent::Resize, event_id) {
@@ -346,30 +349,32 @@ impl WasmBridge {
) -> Result<()> {
let err_context = || "failed to update plugin state".to_string();
+ let plugins_to_update: Vec<(PluginId, ClientId, Arc<Mutex<RunningPlugin>>, Arc<Mutex<Subscriptions>>)> = self.plugin_map
+ .lock()
+ .unwrap()
+ .running_plugins_and_subscriptions()
+ .iter()
+ .cloned()
+ .filter(|(plugin_id, _client_id, _running_plugin, _subscriptions)| !&self.cached_events_for_pending_plugins.contains_key(&plugin_id))
+ .collect();
for (pid, cid, event) in updates.drain(..) {
- for (&(plugin_id, client_id), (running_plugin, subscriptions, workers)) in
- &*self.plugin_map.lock().unwrap()
- {
- if self
- .cached_events_for_pending_plugins
- .contains_key(&plugin_id)
- {