/*
* meli - plugins
*
* Copyright 2019 Manos Pitsidianakis
*
* This file is part of meli.
*
* meli is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* meli is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with meli. If not, see .
*/
use super::*;
use fnv::FnvHashMap;
use melib::async_workers::{Async, AsyncBuilder, AsyncStatus, WorkContext};
use melib::backends::FolderHash;
use melib::backends::{
Backend, BackendOp, Backends, Folder, MailBackend, RefreshEvent, RefreshEventConsumer,
};
use melib::conf::AccountSettings;
use melib::email::{Envelope, EnvelopeHash, Flag};
use melib::error::{MeliError, Result};
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex, RwLock};
#[derive(Debug)]
pub struct PluginBackend {
plugin: Plugin,
child: std::process::Child,
channel: Arc>,
is_online: Arc)>>,
}
impl MailBackend for PluginBackend {
fn is_online(&self) -> Result<()> {
if let Ok(mut is_online) = self.is_online.try_lock() {
let now = std::time::Instant::now();
if now.duration_since(is_online.0) >= std::time::Duration::new(2, 0) {
let mut channel = self.channel.lock().unwrap();
channel.write_ref(&rmpv::ValueRef::Ext(BACKEND_FN, b"is_online"))?;
debug!(channel.expect_ack())?;
let ret: PluginResult<()> = debug!(channel.from_read())?;
is_online.0 = now;
is_online.1 = ret.into();
}
is_online.1.clone()
} else {
Err(MeliError::new("busy"))
}
}
fn connect(&mut self) {}
fn get(&mut self, folder: &Folder) -> Async>> {
let mut w = AsyncBuilder::new();
let folder_hash = folder.hash();
let channel = self.channel.clone();
let handle = {
let tx = w.tx();
let closure = move |_work_context| {
let mut channel = channel.lock().unwrap();
channel
.write_ref(&rmpv::ValueRef::Ext(BACKEND_FN, b"get"))
.unwrap();
channel.expect_ack().unwrap();
loop {
let read_val: Result>>> =
debug!(channel.from_read());
match read_val.map(Into::into).and_then(std::convert::identity) {
Ok(Some(a)) => {
tx.send(AsyncStatus::Payload(Ok(a
.into_iter()
.filter_map(|s| Envelope::from_bytes(s.as_bytes(), None).ok())
.collect::>())))
.unwrap();
}
Ok(None) => {
tx.send(AsyncStatus::Finished).unwrap();
return;
}
Err(err) => {
tx.send(AsyncStatus::Payload(Err(err))).unwrap();
tx.send(AsyncStatus::Finished).unwrap();
return;
}
};
}
};
Box::new(closure)
};
w.build(handle)
}
fn refresh(
&mut self,
_folder_hash: FolderHash,
_sender: RefreshEventConsumer,
) -> Result>>> {
Err(MeliError::new("Unimplemented."))
}
fn watch(
&self,
sender: RefreshEventConsumer,
work_context: WorkContext,
) -> Result {
Err(MeliError::new("Unimplemented."))
}
fn folders(&self) -> Result> {
let mut ret: FnvHashMap = Default::default();
ret.insert(0, Folder::default());
Ok(ret)
}
fn operation(&self, hash: EnvelopeHash) -> Box {
unimplemented!()
}
fn save(&self, bytes: &[u8], folder: &str, flags: Option) -> Result<()> {
Err(MeliError::new("Unimplemented."))
}
fn create_folder(&mut self, name: String) -> Result {
Err(MeliError::new("Unimplemented."))
}
fn tags(&self) -> Option>>> {
None
}
fn as_any(&self) -> &dyn::std::any::Any {
self
}
}
impl PluginBackend {
pub fn new(
listener: UnixListener,
plugin: Plugin,
_s: &AccountSettings,
_is_subscribed: Box bool>,
) -> Result> {
if plugin.kind != PluginKind::Backend {
return Err(MeliError::new(format!(
"Error: Plugin `{}` is not a mail backend plugin, it's `{:?}`",
&plugin.name, &plugin.kind
)));
}
let parts = split_command!(&plugin.executable);
let child = std::process::Command::new(&parts[0])
.args(&parts[1..])
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()?;
let (mut stream, _) = listener.accept()?;
/* send init message to plugin to register hooks */
let session = Uuid::new_v4();
let channel = RpcChannel::new(stream, &session)?;
let now = std::time::Instant::now() - std::time::Duration::from_secs(5);
Ok(Box::new(PluginBackend {
child,
plugin,
channel: Arc::new(Mutex::new(channel)),
is_online: Arc::new(Mutex::new((now, Err(MeliError::new("Unitialized"))))),
}))
}
pub fn register(listener: UnixListener, plugin: Plugin, backends: &mut Backends) {
backends.register(
plugin.name.clone(),
Backend {
create_fn: Box::new(move || {
let plugin = plugin.clone();
let listener = listener.try_clone().unwrap();
Box::new(move |f, i| {
let plugin = plugin.clone();
let listener = listener.try_clone().unwrap();
PluginBackend::new(listener, plugin, f, i)
})
}),
validate_conf_fn: Box::new(|_| Ok(())),
},
);
}
}