summaryrefslogtreecommitdiffstats
path: root/ui/src/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'ui/src/plugins')
-rw-r--r--ui/src/plugins/backend.rs190
-rwxr-xr-xui/src/plugins/python3/ansi-plugin.py48
-rw-r--r--ui/src/plugins/python3/libmeliapi.py173
-rwxr-xr-xui/src/plugins/python3/nntp-backend.py92
-rw-r--r--ui/src/plugins/rpc.rs144
5 files changed, 647 insertions, 0 deletions
diff --git a/ui/src/plugins/backend.rs b/ui/src/plugins/backend.rs
new file mode 100644
index 00000000..2cd701c3
--- /dev/null
+++ b/ui/src/plugins/backend.rs
@@ -0,0 +1,190 @@
+/*
+ * meli - plugins
+ *
+ * Copyright 2019 Manos Pitsidianakis
+ *
+ * This file is part of meli.
+ *
+ * meli is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * meli is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with meli. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+use super::*;
+use fnv::FnvHashMap;
+use melib::async_workers::{Async, AsyncBuilder, AsyncStatus, WorkContext};
+use melib::backends::FolderHash;
+use melib::backends::{
+ Backend, BackendOp, Backends, Folder, MailBackend, RefreshEvent, RefreshEventConsumer,
+};
+use melib::conf::AccountSettings;
+use melib::email::{Envelope, EnvelopeHash, Flag};
+use melib::error::{MeliError, Result};
+use std::collections::BTreeMap;
+use std::sync::{Arc, Mutex, RwLock};
+
+#[derive(Debug)]
+pub struct PluginBackend {
+ plugin: Plugin,
+ child: std::process::Child,
+ channel: Arc<Mutex<RpcChannel>>,
+ is_online: Arc<Mutex<(std::time::Instant, Result<()>)>>,
+}
+
+impl MailBackend for PluginBackend {
+ fn is_online(&self) -> Result<()> {
+ if let Ok(mut is_online) = self.is_online.try_lock() {
+ let now = std::time::Instant::now();
+ if now.duration_since(is_online.0) >= std::time::Duration::new(2, 0) {
+ let mut channel = self.channel.lock().unwrap();
+ channel.write_ref(&rmpv::ValueRef::Ext(BACKEND_FN, b"is_online"))?;
+ debug!(channel.expect_ack())?;
+ let ret: PluginResult<()> = debug!(channel.from_read())?;
+ is_online.0 = now;
+ is_online.1 = ret.into();
+ }
+ is_online.1.clone()
+ } else {
+ Err(MeliError::new("busy"))
+ }
+ }
+
+ fn connect(&mut self) {}
+
+ fn get(&mut self, folder: &Folder) -> Async<Result<Vec<Envelope>>> {
+ let mut w = AsyncBuilder::new();
+ let folder_hash = folder.hash();
+ let channel = self.channel.clone();
+ let handle = {
+ let tx = w.tx();
+ let closure = move |_work_context| {
+ let mut channel = channel.lock().unwrap();
+ channel
+ .write_ref(&rmpv::ValueRef::Ext(BACKEND_FN, b"get"))
+ .unwrap();
+ channel.expect_ack().unwrap();
+ loop {
+ let read_val: Result<PluginResult<Option<Vec<String>>>> =
+ debug!(channel.from_read());
+ match read_val.map(Into::into).and_then(std::convert::identity) {
+ Ok(Some(a)) => {
+ tx.send(AsyncStatus::Payload(Ok(a
+ .into_iter()
+ .filter_map(|s| Envelope::from_bytes(s.as_bytes(), None).ok())
+ .collect::<Vec<Envelope>>())))
+ .unwrap();
+ }
+ Ok(None) => {
+ tx.send(AsyncStatus::Finished).unwrap();
+ return;
+ }
+ Err(err) => {
+ tx.send(AsyncStatus::Payload(Err(err))).unwrap();
+ tx.send(AsyncStatus::Finished).unwrap();
+ return;
+ }
+ };
+ }
+ };
+ Box::new(closure)
+ };
+ w.build(handle)
+ }
+
+ fn refresh(
+ &mut self,
+ _folder_hash: FolderHash,
+ _sender: RefreshEventConsumer,
+ ) -> Result<Async<Result<Vec<RefreshEvent>>>> {
+ Err(MeliError::new("Unimplemented."))
+ }
+ fn watch(
+ &self,
+ sender: RefreshEventConsumer,
+ work_context: WorkContext,
+ ) -> Result<std::thread::ThreadId> {
+ Err(MeliError::new("Unimplemented."))
+ }
+ fn folders(&self) -> Result<FnvHashMap<FolderHash, Folder>> {
+ let mut ret: FnvHashMap<FolderHash, Folder> = Default::default();
+ ret.insert(0, Folder::default());
+ Ok(ret)
+ }
+ fn operation(&self, hash: EnvelopeHash) -> Box<dyn BackendOp> {
+ unimplemented!()
+ }
+
+ fn save(&self, bytes: &[u8], folder: &str, flags: Option<Flag>) -> Result<()> {
+ Err(MeliError::new("Unimplemented."))
+ }
+ fn create_folder(&mut self, name: String) -> Result<Folder> {
+ Err(MeliError::new("Unimplemented."))
+ }
+ fn tags(&self) -> Option<Arc<RwLock<BTreeMap<u64, String>>>> {
+ None
+ }
+ fn as_any(&self) -> &dyn::std::any::Any {
+ self
+ }
+}
+
+impl PluginBackend {
+ pub fn new(
+ listener: UnixListener,
+ plugin: Plugin,
+ _s: &AccountSettings,
+ _is_subscribed: Box<dyn Fn(&str) -> bool>,
+ ) -> Result<Box<dyn MailBackend>> {
+ if plugin.kind != PluginKind::Backend {
+ return Err(MeliError::new(format!(
+ "Error: Plugin `{}` is not a mail backend plugin, it's `{:?}`",
+ &plugin.name, &plugin.kind
+ )));
+ }
+ let parts = split_command!(&plugin.executable);
+ let child = std::process::Command::new(&parts[0])
+ .args(&parts[1..])
+ .stdin(Stdio::piped())
+ .stdout(Stdio::piped())
+ .spawn()?;
+ let (mut stream, _) = listener.accept()?;
+ /* send init message to plugin to register hooks */
+ let session = Uuid::new_v4();
+ let channel = RpcChannel::new(stream, &session)?;
+ let now = std::time::Instant::now() - std::time::Duration::from_secs(5);
+
+ Ok(Box::new(PluginBackend {
+ child,
+ plugin,
+ channel: Arc::new(Mutex::new(channel)),
+ is_online: Arc::new(Mutex::new((now, Err(MeliError::new("Unitialized"))))),
+ }))
+ }
+
+ pub fn register(listener: UnixListener, plugin: Plugin, backends: &mut Backends) {
+ backends.register(
+ plugin.name.clone(),
+ Backend {
+ create_fn: Box::new(move || {
+ let plugin = plugin.clone();
+ let listener = listener.try_clone().unwrap();
+ Box::new(move |f, i| {
+ let plugin = plugin.clone();
+ let listener = listener.try_clone().unwrap();
+ PluginBackend::new(listener, plugin, f, i)
+ })
+ }),
+ validate_conf_fn: Box::new(|_| Ok(())),
+ },
+ );
+ }
+}
diff --git a/ui/src/plugins/python3/ansi-plugin.py b/ui/src/plugins/python3/ansi-plugin.py
new file mode 100755
index 00000000..507cae72
--- /dev/null
+++ b/ui/src/plugins/python3/ansi-plugin.py
@@ -0,0 +1,48 @@
+#! /usr/bin/env python3
+"""
+meli - sample plugin
+
+Copyright 2019 Manos Pitsidianakis
+
+This file is part of meli.
+
+meli is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+meli is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with meli. If not, see <http://www.gnu.org/licenses/>.
+"""
+
+import sys
+import subprocess
+print(sys.path, file=sys.stderr)
+from libmeliapi import Client
+
+if __name__ == "__main__":
+ server_address = './soworkfile'
+ client = Client(server_address)
+ client.connect()
+ try:
+ _bytes = client.read()
+ print('got bytes {!r}'.format(_bytes),file=sys.stderr, )
+
+ # run() returns a CompletedProcess object if it was successful
+ # errors in the created process are raised here too
+ process = subprocess.run(['tiv','-w', '120','-h', '40', _bytes[0]], check=True, stdout=subprocess.PIPE, universal_newlines=True)
+ output = process.stdout
+ print('tiv output len {}'.format(len(output)),file=sys.stderr, )
+ #print('tiv output bytes {!r}'.format(output),file=sys.stderr, )
+
+ message = { "t": "ansi", "c": output }
+ #print('sending {!r}'.format(message),file=sys.stderr, )
+ print('returned :', client.send(message), file=sys.stderr,)
+ except Exception as msg:
+ print(msg, file=sys.stderr,)
+
diff --git a/ui/src/plugins/python3/libmeliapi.py b/ui/src/plugins/python3/libmeliapi.py
new file mode 100644
index 00000000..044b1a9a
--- /dev/null
+++ b/ui/src/plugins/python3/libmeliapi.py
@@ -0,0 +1,173 @@
+"""
+meli - python3 api plugin
+
+Copyright 2019 Manos Pitsidianakis
+
+This file is part of meli.
+
+meli is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+meli is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with meli. If not, see <http://www.gnu.org/licenses/>.
+"""
+
+from collections import deque
+import errno
+import json
+import msgpack
+import socket
+import struct
+import sys
+import time
+
+class IPCError(Exception):
+ pass
+
+class UnknownMessageClass(IPCError):
+ pass
+
+class InvalidSerialization(IPCError):
+ pass
+
+class ConnectionClosed(IPCError):
+ pass
+
+
+def _read_objects(sock):
+ unpacker = msgpack.Unpacker()
+ ret = []
+ #reader = socket.socket.makefile(sock, 'rb')
+ counter = 0
+ while True:
+ print("[libmeliapi]: _read_objects loop = ", counter, flush=True, file=sys.stderr)
+ counter += 1
+ try:
+ buf = sock.recv(1024**2)
+ if not buf:
+ break
+ unpacker.feed(buf)
+ for o in unpacker:
+ ret.append(o)
+ except:
+ break
+ return ret
+
+ #try:
+ # for unpack in unpacker:
+ # return unpack
+ #except Exception as e:
+ # print("[libmeliapi]: ", "_read_objects error ", e, file=sys.stderr,)
+ # return None
+ #finally:
+ # reader.flush()
+
+def _write_objects(sock, objects):
+ sys.stderr.flush()
+ print("[libmeliapi]: ", "_write_objects ", objects, flush=True, file=sys.stderr, )
+ data = msgpack.packb(objects)
+ #print("[libmeliapi]: ", "_write_objects data ", data, flush=True, file=sys.stderr, )
+ sent = 0
+
+ while sent < len(data):
+ try:
+ _len = min(len(data[sent:]), 2048)
+ sent += sock.send(data[sent:sent+_len])
+ except IOError as e:
+ print("[libmeliapi]: IOError: ", e, e.errno, flush=True, file=sys.stderr, )
+ sys.stderr.flush()
+ if e.errno == errno.EWOULDBLOCK:
+ break
+ else:
+ raise
+
+class Client(object):
+ def __init__(self, server_address):
+ self.buffer = deque()
+ self.addr = server_address
+ address_family = socket.AF_UNIX
+ self.sock = socket.socket(address_family, socket.SOCK_STREAM)
+ self.sock.setblocking(0)
+
+ def connect(self):
+ try:
+ self.sock.connect(self.addr)
+
+ print("[libmeliapi]: ", "self.send({ \"version\": \"dev\" }) = ",self.send({ "version": "dev" }), flush=True, file=sys.stderr)
+ self.expect_ack()
+ self._session = self.read()
+ self.ack()
+ print("[libmeliapi]: ", "self.buffer =", self.buffer, flush=True, file=sys.stderr, )
+ print("[libmeliapi]: ", "connected, session id is", self._session, flush=True, file=sys.stderr)
+ except socket.error as msg:
+ print("[libmeliapi]: ", msg, flush=True, file=sys.stderr, )
+ sys.stderr.flush()
+ sys.exit(1)
+
+ def close(self):
+ self.sock.close()
+
+ def setblocking(self, new_val):
+ self.sock.setblocking(new_val)
+
+ def __enter__(self):
+ self.connect()
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.close()
+
+ def send(self, objects):
+ sys.stderr.flush()
+ print("[libmeliapi]: ", "stuck in send ", self.buffer, flush=True, file=sys.stderr, )
+ _write_objects(self.sock, objects)
+ print("[libmeliapi]: ", "unstuck wrote objs", flush=True, file=sys.stderr, )
+ #print("[libmeliapi]: ", "wrote object ", objects, file=sys.stderr)
+ time.sleep(0.1)
+
+ def ack(self):
+ sys.stderr.flush()
+ _write_objects(self.sock, 0x06)
+ time.sleep(0.1)
+
+ def expect_ack(self):
+ print("[libmeliapi]: expect_ack, ", self.buffer, flush=True, file=sys.stderr, )
+ read_list = _read_objects(self.sock)
+ time.sleep(0.1)
+ self.buffer.extend(read_list)
+ if len(self.buffer) > 0 and self.buffer.popleft() == 0x6:
+ print("[libmeliapi]: got_ack, ", self.buffer, flush=True, file=sys.stderr, )
+ return
+ else:
+ raise "ACK expected"
+
+ def read(self):
+ sys.stderr.flush()
+ print("[libmeliapi]: ", "stuck in read ", self.buffer, flush=True, file=sys.stderr, )
+ read_list = _read_objects(self.sock)
+ time.sleep(0.1)
+ self.buffer.extend(read_list)
+ print("[libmeliapi]: ", "unstuck read self.buffer =", self.buffer, flush=True, file=sys.stderr, )
+ if len(self.buffer) > 0:
+ return self.buffer.popleft()
+ else:
+ return None
+
+ @property
+ def backend_fn_type(self):
+ return 0
+
+ def backend_fn_ok_send(self, objects):
+ self.send({"t": "ok", "c": objects })
+ self.expect_ack()
+
+ def backend_fn_err_send(self, objects):
+ self.send({"t": "err", "c": objects })
+ self.expect_ack()
diff --git a/ui/src/plugins/python3/nntp-backend.py b/ui/src/plugins/python3/nntp-backend.py
new file mode 100755
index 00000000..37331eef
--- /dev/null
+++ b/ui/src/plugins/python3/nntp-backend.py
@@ -0,0 +1,92 @@
+#! /usr/bin/env python3
+"""
+meli - sample plugin
+
+Copyright 2019 Manos Pitsidianakis
+
+This file is part of meli.
+
+meli is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+meli is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with meli. If not, see <http://www.gnu.org/licenses/>.
+"""
+
+import sys
+import time
+import subprocess
+import msgpack
+import nntplib
+import libmeliapi
+import itertools
+
+def chunks(iterable, n):
+ while True:
+ try:
+ yield itertools.chain((next(iterable),), itertools.islice(iterable, n-1))
+ except:
+ break
+
+
+if __name__ == "__main__":
+ import importlib
+ importlib.reload(libmeliapi)
+ server_address = './soworkfile'
+ client = libmeliapi.Client(server_address)
+ client.connect()
+ #client.setblocking(True)
+ try:
+ counter = 0
+ while True:
+ print("[nntp-plugin]: loop = ", counter, flush=True, file=sys.stderr)
+ counter += 1
+ req = client.read()
+ if req is None:
+ time.sleep(0.15)
+ continue
+ #client.setblocking(True)
+ client.ack()
+ print("[nntp-plugin]: ", "req: ", req, flush=True, file=sys.stderr)
+ sys.stderr.flush()
+ if isinstance(req, msgpack.ExtType):
+ print("[nntp-plugin]: ", req, flush=True, file=sys.stderr)
+ if req.data == b'is_online':
+ client.backend_fn_ok_send(None)
+ elif req.data == b'get':
+ s = nntplib.NNTP('news.gmane.org')
+ resp, count, first, last, name = s.group('gmane.comp.python.committers')
+ print('Group', name, 'has', count, 'articles, range', first, 'to', last, flush=True, file=sys.stderr)
+
+ resp, overviews = s.over((last - 9, last))
+ ids = []
+ for id, over in overviews:
+ ids.append(id)
+ print(id, nntplib.decode_header(over['subject']), flush=True, file=sys.stderr)
+ for chunk in chunks(iter(ids), 2):
+ ret = []
+ for _id in chunk:
+ resp, info = s.article(_id)
+ #print(_id, " line0 = ", str(info.lines[0], 'utf-8', 'ignore'))
+ elem = b'\n'.join(info.lines)
+ ret.append(str(elem, 'utf-8', 'ignore'))
+ print("ret len = ", len(ret), flush=True,file=sys.stderr)
+ client.backend_fn_ok_send(ret)
+ time.sleep(0.85)
+ s.quit()
+ client.backend_fn_ok_send(None)
+ #client.setblocking(True)
+ time.sleep(0.15)
+
+
+ except Exception as msg:
+ print("[nntp-plugin]: ", msg, flush=True, file=sys.stderr,)
+ sys.stderr.flush()
+
diff --git a/ui/src/plugins/rpc.rs b/ui/src/plugins/rpc.rs
new file mode 100644
index 00000000..1d3b91a0
--- /dev/null
+++ b/ui/src/plugins/rpc.rs
@@ -0,0 +1,144 @@
+/*
+ * meli - plugins
+ *
+ * Copyright 2019 Manos Pitsidianakis
+ *
+ * This file is part of meli.
+ *
+ * meli is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * meli is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with meli. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+use super::*;
+use rmp_serde::Deserializer;
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug)]
+pub struct RpcChannel {
+ stream: UnixStream,
+ session: Uuid,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case")]
+pub struct PluginGreeting {
+ version: String,
+}
+
+impl RpcChannel {
+ pub fn new(mut stream: UnixStream, session: &Uuid) -> Result<RpcChannel> {
+ let mut ret = RpcChannel {
+ stream,
+ session: session.clone(),
+ };
+ let greeting: PluginGreeting = ret.from_read().map_err(|err| {
+ MeliError::new(format!("Could not get correct plugin greeting: {}", err))
+ })?;
+ debug!(&greeting);
+ //if greeting.version != "dev" {
+ // return Err("Plugin is not compatible with our API (dev)".into());
+ //}
+ ret.write_ref(&rmpv::ValueRef::String(session.to_string().as_str().into()))?;
+ debug!(ret.expect_ack())?;
+ Ok(ret)
+ }
+
+ pub fn expect_ack(&mut self) -> Result<()> {
+ debug!("expect_ack()");
+ let ack: u32 = debug!(rmp_serde::decode::from_read(&mut self.stream))
+ .map_err(|_| MeliError::new("Plugin did not return ACK."))?;
+ if 0x6 == ack {
+ Ok(())
+ } else {
+ Err(MeliError::new("Plugin did not return ACK."))
+ }
+ }
+
+ pub fn ack(&mut self) -> Result<()> {
+ debug!("ack()");
+ debug!(rmpv::encode::write_value_ref(
+ &mut self.stream,
+ &rmpv::ValueRef::Integer(0x6.into())
+ ))
+ .map_err(|err| MeliError::new(err.to_string()))?;
+ let _ = self.stream.flush();
+ Ok(())
+ }
+
+ pub fn write_ref(&mut self, value_ref: &rmpv::ValueRef) -> Result<()> {
+ debug!("write_ref() {:?}", value_ref);
+ debug!(rmpv::encode::write_value_ref(&mut self.stream, value_ref))
+ .map_err(|err| MeliError::new(err.to_string()))?;
+ let _ = self.stream.flush();
+ Ok(())
+ }
+
+ pub fn read(&mut self) -> Result<rmpv::Value> {
+ debug!("read()");
+ let ret: RpcResult = debug!(rmp_serde::decode::from_read(&mut self.stream))
+ .map_err(|err| MeliError::new(err.to_string()))?;
+ let _ = self.stream.flush();
+ self.ack();
+ debug!("read() ret={:?}", &ret);
+ ret.into()
+ }
+
+ pub fn from_read<T>(&mut self) -> Result<T>
+ where
+ T: core::fmt::Debug + serde::de::DeserializeOwned,
+ {
+ debug!("from_read()");
+ let ret: Result<T> = debug!(rmp_serde::decode::from_read(&mut self.stream))
+ .map_err(|err| MeliError::new(err.to_string()));
+ let _ = self.stream.flush();
+ self.ack();
+ debug!("read() ret={:?}", &ret);
+ ret
+ }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case")]
+#[serde(tag = "t", content = "c")]
+enum RpcResult {
+ Ok(rmpv::Value),
+ Err(String),
+}
+
+impl RpcResult {
+ fn into(self) -> Result<rmpv::Value> {
+ match self {
+ RpcResult::Ok(v) => Ok(v),
+ RpcResult::Err(err) => Err(MeliError::new(err)),
+ }
+ }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case")]
+#[serde(tag = "t", content = "c")]
+pub enum PluginResult<T: core::fmt::Debug + Clone> {
+ Ok(T),
+ Err(String),
+}
+
+impl<T: core::fmt::Debug + Clone + serde::Serialize + serde::de::DeserializeOwned> Into<Result<T>>
+ for PluginResult<T>
+{
+ fn into(self) -> Result<T> {
+ match self {
+ PluginResult::Ok(v) => Ok(v),
+ PluginResult::Err(err) => Err(MeliError::new(err)),
+ }
+ }
+}