summaryrefslogtreecommitdiffstats
path: root/ui/src/plugins.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ui/src/plugins.rs')
-rw-r--r--ui/src/plugins.rs280
1 files changed, 173 insertions, 107 deletions
diff --git a/ui/src/plugins.rs b/ui/src/plugins.rs
index a661e853..75092f8e 100644
--- a/ui/src/plugins.rs
+++ b/ui/src/plugins.rs
@@ -19,23 +19,25 @@
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
-use crate::workers::WorkController;
use melib::error::{MeliError, Result};
-use rmpv::{Value, ValueRef};
-use std::any::TypeId;
+use rmpv::Value;
use std::collections::HashMap;
-use std::io::{self, BufRead, BufReader};
-use std::io::{Read, Write};
+use std::io::Write;
use std::os::unix::net::{UnixListener, UnixStream};
-use std::path::{Path, PathBuf};
-use std::process::{Command, Stdio};
-use std::thread;
-use std::thread::ThreadId;
+use std::process::Stdio;
+use uuid::Uuid;
-#[derive(Debug, Clone, Serialize, Deserialize)]
+pub mod backend;
+pub mod rpc;
+pub use rpc::*;
+
+pub const BACKEND_FN: i8 = 0;
+
+#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub enum PluginKind {
LongLived,
- Ephemeral,
+ Filter,
+ Backend,
}
impl Default for PluginKind {
@@ -49,13 +51,24 @@ pub struct Plugin {
kind: PluginKind,
executable: String,
name: String,
+ #[serde(default)]
+ hooks: Vec<String>,
+}
+
+impl Plugin {
+ pub fn kind(&self) -> PluginKind {
+ self.kind
+ }
}
#[derive(Debug)]
pub struct PluginManager {
plugins: HashMap<String, Plugin>,
- instances: HashMap<String, std::process::Child>,
+ sessions: HashMap<Uuid, String>,
+ instances: HashMap<Uuid, std::process::Child>,
+ streams: HashMap<Uuid, RpcChannel>,
hooks: HashMap<String, UIHook>,
+ listener: UnixListener,
}
impl Drop for PluginManager {
@@ -68,57 +81,81 @@ impl PluginManager {
pub fn new() -> Self {
let _ = std::fs::remove_file("./soworkfile");
let listener = UnixListener::bind("./soworkfile").unwrap();
- debug!("bound");
- // accept connections and process them, spawning a new thread for each one
- thread::spawn(move || {
- debug!("spawn");
- let stream = listener.accept();
- debug!("socket stream {:?}", &stream);
- match stream {
- Ok((mut stream, _)) => {
- debug!("socket stream {:?}", &stream);
- /* connection succeeded */
- thread::spawn(move || {
- debug!("socket listen {:?}", &stream);
- debug!(initialize(stream));
- //let mut response = Vec::new();
- //debug!(stream.read_to_end(&mut response));
- //loop {
- // debug!("pre-flush 1");
- // stream.flush();
- // debug!("post-flush 1");
- // if debug!(rmpv::decode::value::read_value(&mut stream)).is_err() {
- // return;
- // }
- // debug!("post-read_value");
- // //debug!("socket response {}", unsafe {
- // // String::from_utf8_lossy(&response)
- // //});
- // stream.flush();
- // debug!("post-flush 2");
- // if debug!(rmpv::encode::write_value(
- // &mut stream,
- // &rmpv::Value::String("hello 2 u 2".into())
- // ))
- // .is_err()
- // {
- // return;
- // }
- // debug!("post-write_value");
- //}
- });
- }
- Err(err) => {
- /* connection failed */
- debug!(err);
+ /*
+ debug!("bound");
+ // accept connections and process them, spawning a new thread for each one
+ thread::spawn(move || {
+ debug!("spawn");
+ let stream = listener.accept();
+ debug!("socket stream {:?}", &stream);
+ match stream {
+ Ok((mut stream, _)) => {
+ debug!("socket stream {:?}", &stream);
+ /* connection succeeded */
+ thread::spawn(move || {
+ debug!("socket listen {:?}", &stream);
+ debug!(initialize(stream));
+ //let mut response = Vec::new();
+ //debug!(stream.read_to_end(&mut response));
+ //loop {
+ // debug!("pre-flush 1");
+ // stream.flush();
+ // debug!("post-flush 1");
+ // if debug!(rmpv::decode::value::read_value(&mut stream)).is_err() {
+ // return;
+ // }
+ // debug!("post-read_value");
+ // //debug!("socket response {}", unsafe {
+ // // String::from_utf8_lossy(&response)
+ // //});
+ // stream.flush();
+ // debug!("post-flush 2");
+ // if debug!(rmpv::encode::write_value(
+ // &mut stream,
+ // &rmpv::Value::String("hello 2 u 2".into())
+ // ))
+ // .is_err()
+ // {
+ // return;
+ // }
+ // debug!("post-write_value");
+ //}
+ });
+ }
+ Err(err) => {
+ /* connection failed */
+ debug!(err);
+ }
}
- }
- });
+ });
+ */
+ let mut hooks: HashMap<String, UIHook> = Default::default();
+
+ hooks.insert(
+ "attachment-view".to_string(),
+ UIHook {
+ name: "attachment-view".to_string(),
+ wait_response: true,
+ listeners: Vec::new(),
+ },
+ );
+
+ hooks.insert(
+ "refresh-account".to_string(),
+ UIHook {
+ name: "refresh-account".to_string(),
+ wait_response: false,
+ listeners: Vec::new(),
+ },
+ );
PluginManager {
plugins: Default::default(),
+ sessions: Default::default(),
instances: Default::default(),
- hooks: Default::default(),
+ streams: Default::default(),
+ hooks,
+ listener,
}
}
@@ -128,19 +165,38 @@ impl PluginManager {
PluginKind::LongLived => {
/* spawn thread */
let parts = split_command!(&plugin.executable);
- let mut child = std::process::Command::new(&parts[0])
+ let child = std::process::Command::new(&parts[0])
.args(&parts[1..])
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()?;
+ let (mut stream, _) = self.listener.accept()?;
+ /* send init message to plugin to register hooks */
+ let session = Uuid::new_v4();
+ let channel = RpcChannel::new(stream, &session)?;
+
+ for h in &plugin.hooks {
+ self.add_listener(h, session.clone());
+ }
+
+ self.instances.insert(session.clone(), child);
+ self.sessions.insert(session.clone(), plugin.name.clone());
+ self.streams.insert(session.clone(), channel);
+ self.plugins.insert(plugin.name.clone(), plugin);
+ Ok(())
+ }
+ PluginKind::Filter => {
+ let session = Uuid::new_v4();
+ for h in &plugin.hooks {
+ self.add_listener(h, session.clone());
+ }
- /* add thread to workcontroller */
- self.instances.insert(plugin.name.clone(), child);
+ self.sessions.insert(session.clone(), plugin.name.clone());
self.plugins.insert(plugin.name.clone(), plugin);
/* send init message to plugin to register hooks */
Ok(())
}
- PluginKind::Ephemeral => {
+ PluginKind::Backend => {
self.plugins.insert(plugin.name.clone(), plugin);
/* send init message to plugin to register hooks */
Ok(())
@@ -151,57 +207,67 @@ impl PluginManager {
pub fn register_hook(&mut self, hook: UIHook) {
self.hooks.insert(hook.name.clone(), hook);
}
-}
-#[derive(Debug)]
-pub struct UIHook {
- name: String,
- listeners: Vec<String>,
- kind: TypeId,
-}
-
-#[derive(Debug, Serialize, Deserialize)]
-pub struct PluginGreeting {
- version: String,
-}
+ pub fn add_listener(&mut self, hook: &str, session: Uuid) {
+ self.hooks
+ .entry(hook.to_string())
+ .and_modify(|entry| entry.listeners.push(session));
+ }
-pub fn initialize(mut stream: UnixStream) -> Result<()> {
- let greeting: std::result::Result<PluginGreeting, _> =
- rmp_serde::decode::from_read(&mut stream);
- match debug!(greeting) {
- Ok(greeting) => {
- if greeting.version != "dev" {
- return Err("Plugin is not compatible with our API (dev)".into());
+ pub fn activate_hook(&mut self, hook: &str, bytes: Vec<u8>) -> Result<FilterResult> {
+ debug!("activate_hook {}", hook);
+ debug!("bytes {:?}", &bytes);
+ for l in &self.hooks[hook].listeners {
+ let plugin = &self.plugins[&self.sessions[l]];
+ debug!(&plugin);
+ match &plugin.kind {
+ PluginKind::LongLived => {
+ debug!("listener: {}", l);
+ let channel = self.streams.get_mut(l).unwrap();
+ channel.write_ref(&rmpv::ValueRef::Binary(bytes.as_slice()));
+ let reply: Result<FilterResult> = channel.from_read();
+ return reply;
+ }
+ PluginKind::Filter => {
+ let parts = split_command!(&plugin.executable);
+ let child = std::process::Command::new(&parts[0])
+ .args(&parts[1..])
+ .stdin(Stdio::piped())
+ .stdout(Stdio::piped())
+ .spawn()?;
+ let (mut stream, _) = self.listener.accept()?;
+ let mut channel = RpcChannel::new(stream, l)?;
+ channel.write_ref(&rmpv::ValueRef::Binary(bytes.as_slice()));
+ let reply: Result<FilterResult> = channel.from_read();
+ return reply;
+ }
+ k => {
+ debug!("got plugin kind {:?} in hook {}", k, hook);
+ }
}
}
- Err(err) => {
- return Err(MeliError::new(err.to_string()));
- }
+ Err(MeliError::new("no listeners for this hook"))
}
- loop {
- debug!("pre-flush 1");
- stream.flush();
- debug!("post-flush 1");
- if debug!(rmpv::decode::value::read_value(&mut stream)).is_err() {
- break;
- }
- debug!("post-read_value");
- //debug!("socket response {}", unsafe {
- // String::from_utf8_lossy(&response)
- //});
- stream.flush();
- debug!("post-flush 2");
- if debug!(rmpv::encode::write_value(
- &mut stream,
- &rmpv::Value::String("hello 2 u 2".into())
- ))
- .is_err()
- {
- break;
- }
- debug!("post-write_value");
+ pub fn listener(&self) -> UnixListener {
+ self.listener.try_clone().unwrap()
}
+}
- return Ok(());
+#[derive(Debug)]
+pub struct UIHook {
+ name: String,
+ wait_response: bool,
+ listeners: Vec<Uuid>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case")]
+#[serde(tag = "t", content = "c")]
+pub enum FilterResult {
+ UiMessage(String),
+ Text(String),
+ Ansi(String),
+ Binary(Vec<u8>),
+ Error(String),
}