summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorManos Pitsidianakis <el13635@mail.ntua.gr>2019-08-25 10:46:25 +0300
committerManos Pitsidianakis <el13635@mail.ntua.gr>2019-09-15 13:21:13 +0300
commitc9f7b41e4720b2aef0c68f5c6e3361a859c1f4e3 (patch)
treee388b3168f0b37fabad6cd48fa03243183038168
parentc561814cd64d55e379e65263bfdff4bf17a5b907 (diff)
imap: continuous payload delivery in async workers
-rw-r--r--melib/src/async_workers.rs115
-rw-r--r--melib/src/backends/imap.rs24
-rw-r--r--melib/src/backends/imap/operations.rs4
-rw-r--r--melib/src/backends/maildir/backend.rs1
-rw-r--r--ui/src/conf/accounts.rs58
5 files changed, 87 insertions, 115 deletions
diff --git a/melib/src/async_workers.rs b/melib/src/async_workers.rs
index fc0a16fe..bc7571a6 100644
--- a/melib/src/async_workers.rs
+++ b/melib/src/async_workers.rs
@@ -72,36 +72,20 @@ impl<T> fmt::Debug for AsyncStatus<T> {
}
/// A builder object for `Async<T>`
-#[derive(Clone)]
+#[derive(Debug, Clone)]
pub struct AsyncBuilder<T: Send + Sync> {
- payload_hook: Option<Arc<Fn() -> () + Send + Sync>>,
tx: chan::Sender<AsyncStatus<T>>,
rx: chan::Receiver<AsyncStatus<T>>,
}
-#[derive(Clone)]
+#[derive(Clone, Debug)]
pub struct Async<T: Send + Sync> {
- pub value: Option<T>,
work: Work,
active: bool,
- payload_hook: Option<Arc<dyn Fn() -> () + Send + Sync>>,
- link: Option<T>,
tx: chan::Sender<AsyncStatus<T>>,
rx: chan::Receiver<AsyncStatus<T>>,
}
-impl<T: Send + Sync> std::fmt::Debug for Async<T> {
- fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- write!(
- f,
- "Async<{}> {{ active: {}, payload_hook: {} }}",
- stringify!(T),
- self.active,
- self.payload_hook.is_some()
- )
- }
-}
-
impl<T: Send + Sync> Default for AsyncBuilder<T> {
fn default() -> Self {
AsyncBuilder::<T>::new()
@@ -117,7 +101,6 @@ where
AsyncBuilder {
tx: sender,
rx: receiver,
- payload_hook: None,
}
}
/// Returns the sender object of the promise's channel.
@@ -132,32 +115,17 @@ where
pub fn build(self, work: Box<dyn Fn() -> () + Send + Sync>) -> Async<T> {
Async {
work: Work(Arc::new(work)),
- value: None,
tx: self.tx,
rx: self.rx,
- link: None,
- payload_hook: None,
active: false,
}
}
-
- pub fn add_payload_hook(
- &mut self,
- payload_hook: Option<Arc<dyn Fn() -> () + Send + Sync>>,
- ) -> &mut Self {
- self.payload_hook = payload_hook;
- self
- }
}
impl<T> Async<T>
where
T: Send + Sync,
{
- /// Consumes `self` and returns the computed value. Will panic if computation hasn't finished.
- pub fn extract(self) -> T {
- self.value.unwrap()
- }
pub fn work(&mut self) -> Option<Work> {
if !self.active {
self.active = true;
@@ -175,21 +143,21 @@ where
self.rx.clone()
}
/// Polls worker thread and returns result.
- pub fn poll(&mut self) -> Result<AsyncStatus<T>, ()> {
- if self.value.is_some() {
+ pub fn poll_block(&mut self) -> Result<AsyncStatus<T>, ()> {
+ if !self.active {
return Ok(AsyncStatus::Finished);
}
- //self.tx.send(true);
+
let rx = &self.rx;
- let result: T;
chan_select! {
- default => {
- return Ok(AsyncStatus::NoUpdate);
- },
rx.recv() -> r => {
match r {
- Some(AsyncStatus::Payload(payload)) => {
- result = payload;
+ Some(p @ AsyncStatus::Payload(_)) => {
+ return Ok(p);
+ },
+ Some(f @ AsyncStatus::Finished) => {
+ self.active = false;
+ return Ok(f);
},
Some(a) => {
return Ok(a);
@@ -200,46 +168,35 @@ where
}
},
};
- self.value = Some(result);
- if let Some(hook) = self.payload_hook.as_ref() {
- hook();
+ }
+ /// Polls worker thread and returns result.
+ pub fn poll(&mut self) -> Result<AsyncStatus<T>, ()> {
+ if !self.active {
+ return Ok(AsyncStatus::Finished);
}
- Ok(AsyncStatus::Finished)
- }
- /// Blocks until thread joins.
- pub fn join(&mut self) {
- let result: T;
let rx = &self.rx;
- loop {
- chan_select! {
- rx.recv() -> r => {
- match r {
- Some(AsyncStatus::Payload(payload)) => {
- result = payload;
- break;
- },
- _ => continue,
+ chan_select! {
+ default => {
+ return Ok(AsyncStatus::NoUpdate);
+ },
+ rx.recv() -> r => {
+ match r {
+ Some(p @ AsyncStatus::Payload(_)) => {
+ return Ok(p);
+ },
+ Some(f @ AsyncStatus::Finished) => {
+ self.active = false;
+ return Ok(f);
+ },
+ Some(a) => {
+ return Ok(a);
}
+ _ => {
+ return Err(());
+ },
}
-
- }
- }
- self.value = Some(result);
- }
-
- pub fn link(&mut self, other: Async<T>) -> &mut Self {
- let Async {
- rx,
- tx,
- work,
- value,
- ..
- } = other;
- self.rx = rx;
- self.tx = tx;
- self.work = work;
- self.value = value;
- self
+ },
+ };
}
}
diff --git a/melib/src/backends/imap.rs b/melib/src/backends/imap.rs
index 36b1e51d..7307336c 100644
--- a/melib/src/backends/imap.rs
+++ b/melib/src/backends/imap.rs
@@ -569,6 +569,18 @@ macro_rules! get_conf_val {
};
}
+macro_rules! exit_on_error {
+ ($s:ident, $($result:expr)+) => {
+ $(if let Err(e) = $result {
+ eprintln!(
+ "IMAP error ({}): {}",
+ $s.name.as_str(),
+ e.to_string(),
+ );
+ std::process::exit(1);
+ })+
+ };
+}
impl ImapType {
pub fn new(s: &AccountSettings) -> Self {
use std::io::prelude::*;
@@ -591,11 +603,15 @@ impl ImapType {
std::process::exit(1);
};
- let mut socket = TcpStream::connect(&addr).unwrap();
+ let mut socket = TcpStream::connect(&addr);
let cmd_id = 0;
- socket
- .write_all(format!("M{} STARTTLS\r\n", cmd_id).as_bytes())
- .unwrap();
+ exit_on_error!(
+ s,
+ socket
+ socket.as_mut().unwrap().write_all(format!("M{} STARTTLS\r\n", cmd_id).as_bytes())
+ );
+ let mut socket = socket.unwrap();
+ // FIXME handle response properly
let mut buf = vec![0; 1024];
let mut response = String::with_capacity(1024);
let mut cap_flag = false;
diff --git a/melib/src/backends/imap/operations.rs b/melib/src/backends/imap/operations.rs
index c653a19f..266bb14d 100644
--- a/melib/src/backends/imap/operations.rs
+++ b/melib/src/backends/imap/operations.rs
@@ -63,8 +63,8 @@ impl BackendOp for ImapOp {
let mut response = String::with_capacity(8 * 1024);
{
let mut conn = self.connection.lock().unwrap();
- conn.send_command(format!("SELECT {}", self.folder_path).as_bytes());
- conn.read_response(&mut response);
+ conn.send_command(format!("SELECT {}", self.folder_path).as_bytes())?;
+ conn.read_response(&mut response)?;
conn.send_command(format!("UID FETCH {} (FLAGS RFC822)", self.uid).as_bytes())?;
conn.read_response(&mut response)?;
}
diff --git a/melib/src/backends/maildir/backend.rs b/melib/src/backends/maildir/backend.rs
index 615f1cea..60b7d263 100644
--- a/melib/src/backends/maildir/backend.rs
+++ b/melib/src/backends/maildir/backend.rs
@@ -804,6 +804,7 @@ impl MaildirType {
};
let result = thunk();
tx_final.send(AsyncStatus::Payload(result));
+ tx_final.send(AsyncStatus::Finished);
};
Box::new(closure)
};
diff --git a/ui/src/conf/accounts.rs b/ui/src/conf/accounts.rs
index 77002ac1..29ced12d 100644
--- a/ui/src/conf/accounts.rs
+++ b/ui/src/conf/accounts.rs
@@ -221,6 +221,7 @@ impl Account {
let mut stack: StackVec<FolderHash> = StackVec::new();
let mut tree: Vec<FolderNode> = Vec::new();
+ let mut collection: Collection = Collection::new(Default::default());
for (h, f) in ref_folders.iter() {
if !settings.folder_confs.contains_key(f.path())
|| settings.folder_confs[f.path()].subscribe.is_false()
@@ -258,6 +259,7 @@ impl Account {
*h,
Account::new_worker(f.clone(), &mut backend, notify_fn.clone()),
);
+ collection.threads.insert(*h, Threads::default());
}
tree.sort_unstable_by_key(|f| ref_folders[&f.hash].path());
@@ -298,7 +300,7 @@ impl Account {
tree,
address_book,
sent_folder,
- collection: Collection::new(Default::default()),
+ collection,
workers,
settings: settings.clone(),
runtime_settings: settings,
@@ -320,37 +322,39 @@ impl Account {
let w = builder.build(Box::new(move || {
let mut mailbox_handle = mailbox_handle.clone();
let work = mailbox_handle.work().unwrap();
- let rx = mailbox_handle.rx();
- let tx = mailbox_handle.tx();
-
+ debug!("AA");
std::thread::Builder::new()
.spawn(move || {
+ debug!("A");
work.compute();
+ debug!("B");
})
.unwrap();
+ debug!("BB");
loop {
- debug!("looping");
- chan_select! {
- rx.recv() -> r => {
- debug!("got {:?}", r);
- match r {
- Some(s @ AsyncStatus::Payload(_)) => {
- our_tx.send(s);
- debug!("notifying for {}", folder_hash);
- notify_fn.notify(folder_hash);
- }
- Some(AsyncStatus::Finished) => {
- debug!("exiting");
- return;
- }
- Some(s) => {
- our_tx.send(s);
- }
- None => return,
- }
+ debug!("LL");
+ match debug!(mailbox_handle.poll_block()) {
+ Ok(s @ AsyncStatus::Payload(_)) => {
+ our_tx.send(s);
+ debug!("notifying for {}", folder_hash);
+ notify_fn.notify(folder_hash);
+ }
+ Ok(s @ AsyncStatus::Finished) => {
+ our_tx.send(s);
+ notify_fn.notify(folder_hash);
+ debug!("exiting");
+ return;
+ }
+ Ok(s) => {
+ our_tx.send(s);
+ }
+ Err(_) => {
+ debug!("poll error");
+ return;
}
}
+ debug!("DD");
}
}));
Some(w)
@@ -535,7 +539,7 @@ impl Account {
debug!("got payload in status for {}", folder_hash);
self.load_mailbox(folder_hash, envs);
}
- Ok(AsyncStatus::Finished) if w.value.is_none() => {
+ Ok(AsyncStatus::Finished) => {
debug!("got finished in status for {}", folder_hash);
self.folders.entry(folder_hash).and_modify(|f| {
let m = if let MailboxEntry::Parsing(m, _, _) = f {
@@ -548,11 +552,6 @@ impl Account {
self.workers.insert(folder_hash, None);
}
- Ok(AsyncStatus::Finished) if w.value.is_some() => {
- let envs = w.value.take().unwrap();
- debug!("got payload in status for {}", folder_hash);
- self.load_mailbox(folder_hash, envs);
- }
Ok(AsyncStatus::ProgressReport(n)) => {
self.folders.entry(folder_hash).and_modify(|f| {
if let MailboxEntry::Parsing(_, ref mut d, _) = f {
@@ -565,7 +564,6 @@ impl Account {
//return Err(0);
}
},
- Some(_) => return Ok(()),
};
if self.folders[&folder_hash].is_available()
|| (self.folders[&folder_hash].is_parsing()