summaryrefslogtreecommitdiffstats
path: root/melib
diff options
context:
space:
mode:
authorManos Pitsidianakis <el13635@mail.ntua.gr>2019-08-25 10:46:25 +0300
committerManos Pitsidianakis <el13635@mail.ntua.gr>2019-09-15 13:21:13 +0300
commitc9f7b41e4720b2aef0c68f5c6e3361a859c1f4e3 (patch)
treee388b3168f0b37fabad6cd48fa03243183038168 /melib
parentc561814cd64d55e379e65263bfdff4bf17a5b907 (diff)
imap: continuous payload delivery in async workers
Diffstat (limited to 'melib')
-rw-r--r--melib/src/async_workers.rs115
-rw-r--r--melib/src/backends/imap.rs24
-rw-r--r--melib/src/backends/imap/operations.rs4
-rw-r--r--melib/src/backends/maildir/backend.rs1
4 files changed, 59 insertions, 85 deletions
diff --git a/melib/src/async_workers.rs b/melib/src/async_workers.rs
index fc0a16fe..bc7571a6 100644
--- a/melib/src/async_workers.rs
+++ b/melib/src/async_workers.rs
@@ -72,36 +72,20 @@ impl<T> fmt::Debug for AsyncStatus<T> {
}
/// A builder object for `Async<T>`
-#[derive(Clone)]
+#[derive(Debug, Clone)]
pub struct AsyncBuilder<T: Send + Sync> {
- payload_hook: Option<Arc<Fn() -> () + Send + Sync>>,
tx: chan::Sender<AsyncStatus<T>>,
rx: chan::Receiver<AsyncStatus<T>>,
}
-#[derive(Clone)]
+#[derive(Clone, Debug)]
pub struct Async<T: Send + Sync> {
- pub value: Option<T>,
work: Work,
active: bool,
- payload_hook: Option<Arc<dyn Fn() -> () + Send + Sync>>,
- link: Option<T>,
tx: chan::Sender<AsyncStatus<T>>,
rx: chan::Receiver<AsyncStatus<T>>,
}
-impl<T: Send + Sync> std::fmt::Debug for Async<T> {
- fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- write!(
- f,
- "Async<{}> {{ active: {}, payload_hook: {} }}",
- stringify!(T),
- self.active,
- self.payload_hook.is_some()
- )
- }
-}
-
impl<T: Send + Sync> Default for AsyncBuilder<T> {
fn default() -> Self {
AsyncBuilder::<T>::new()
@@ -117,7 +101,6 @@ where
AsyncBuilder {
tx: sender,
rx: receiver,
- payload_hook: None,
}
}
/// Returns the sender object of the promise's channel.
@@ -132,32 +115,17 @@ where
pub fn build(self, work: Box<dyn Fn() -> () + Send + Sync>) -> Async<T> {
Async {
work: Work(Arc::new(work)),
- value: None,
tx: self.tx,
rx: self.rx,
- link: None,
- payload_hook: None,
active: false,
}
}
-
- pub fn add_payload_hook(
- &mut self,
- payload_hook: Option<Arc<dyn Fn() -> () + Send + Sync>>,
- ) -> &mut Self {
- self.payload_hook = payload_hook;
- self
- }
}
impl<T> Async<T>
where
T: Send + Sync,
{
- /// Consumes `self` and returns the computed value. Will panic if computation hasn't finished.
- pub fn extract(self) -> T {
- self.value.unwrap()
- }
pub fn work(&mut self) -> Option<Work> {
if !self.active {
self.active = true;
@@ -175,21 +143,21 @@ where
self.rx.clone()
}
/// Polls worker thread and returns result.
- pub fn poll(&mut self) -> Result<AsyncStatus<T>, ()> {
- if self.value.is_some() {
+ pub fn poll_block(&mut self) -> Result<AsyncStatus<T>, ()> {
+ if !self.active {
return Ok(AsyncStatus::Finished);
}
- //self.tx.send(true);
+
let rx = &self.rx;
- let result: T;
chan_select! {
- default => {
- return Ok(AsyncStatus::NoUpdate);
- },
rx.recv() -> r => {
match r {
- Some(AsyncStatus::Payload(payload)) => {
- result = payload;
+ Some(p @ AsyncStatus::Payload(_)) => {
+ return Ok(p);
+ },
+ Some(f @ AsyncStatus::Finished) => {
+ self.active = false;
+ return Ok(f);
},
Some(a) => {
return Ok(a);
@@ -200,46 +168,35 @@ where
}
},
};
- self.value = Some(result);
- if let Some(hook) = self.payload_hook.as_ref() {
- hook();
+ }
+ /// Polls worker thread and returns result.
+ pub fn poll(&mut self) -> Result<AsyncStatus<T>, ()> {
+ if !self.active {
+ return Ok(AsyncStatus::Finished);
}
- Ok(AsyncStatus::Finished)
- }
- /// Blocks until thread joins.
- pub fn join(&mut self) {
- let result: T;
let rx = &self.rx;
- loop {
- chan_select! {
- rx.recv() -> r => {
- match r {
- Some(AsyncStatus::Payload(payload)) => {
- result = payload;
- break;
- },
- _ => continue,
+ chan_select! {
+ default => {
+ return Ok(AsyncStatus::NoUpdate);
+ },
+ rx.recv() -> r => {
+ match r {
+ Some(p @ AsyncStatus::Payload(_)) => {
+ return Ok(p);
+ },
+ Some(f @ AsyncStatus::Finished) => {
+ self.active = false;
+ return Ok(f);
+ },
+ Some(a) => {
+ return Ok(a);
}
+ _ => {
+ return Err(());
+ },
}
-
- }
- }
- self.value = Some(result);
- }
-
- pub fn link(&mut self, other: Async<T>) -> &mut Self {
- let Async {
- rx,
- tx,
- work,
- value,
- ..
- } = other;
- self.rx = rx;
- self.tx = tx;
- self.work = work;
- self.value = value;
- self
+ },
+ };
}
}
diff --git a/melib/src/backends/imap.rs b/melib/src/backends/imap.rs
index 36b1e51d..7307336c 100644
--- a/melib/src/backends/imap.rs
+++ b/melib/src/backends/imap.rs
@@ -569,6 +569,18 @@ macro_rules! get_conf_val {
};
}
+macro_rules! exit_on_error {
+ ($s:ident, $($result:expr)+) => {
+ $(if let Err(e) = $result {
+ eprintln!(
+ "IMAP error ({}): {}",
+ $s.name.as_str(),
+ e.to_string(),
+ );
+ std::process::exit(1);
+ })+
+ };
+}
impl ImapType {
pub fn new(s: &AccountSettings) -> Self {
use std::io::prelude::*;
@@ -591,11 +603,15 @@ impl ImapType {
std::process::exit(1);
};
- let mut socket = TcpStream::connect(&addr).unwrap();
+ let mut socket = TcpStream::connect(&addr);
let cmd_id = 0;
- socket
- .write_all(format!("M{} STARTTLS\r\n", cmd_id).as_bytes())
- .unwrap();
+ exit_on_error!(
+ s,
+ socket
+ socket.as_mut().unwrap().write_all(format!("M{} STARTTLS\r\n", cmd_id).as_bytes())
+ );
+ let mut socket = socket.unwrap();
+ // FIXME handle response properly
let mut buf = vec![0; 1024];
let mut response = String::with_capacity(1024);
let mut cap_flag = false;
diff --git a/melib/src/backends/imap/operations.rs b/melib/src/backends/imap/operations.rs
index c653a19f..266bb14d 100644
--- a/melib/src/backends/imap/operations.rs
+++ b/melib/src/backends/imap/operations.rs
@@ -63,8 +63,8 @@ impl BackendOp for ImapOp {
let mut response = String::with_capacity(8 * 1024);
{
let mut conn = self.connection.lock().unwrap();
- conn.send_command(format!("SELECT {}", self.folder_path).as_bytes());
- conn.read_response(&mut response);
+ conn.send_command(format!("SELECT {}", self.folder_path).as_bytes())?;
+ conn.read_response(&mut response)?;
conn.send_command(format!("UID FETCH {} (FLAGS RFC822)", self.uid).as_bytes())?;
conn.read_response(&mut response)?;
}
diff --git a/melib/src/backends/maildir/backend.rs b/melib/src/backends/maildir/backend.rs
index 615f1cea..60b7d263 100644
--- a/melib/src/backends/maildir/backend.rs
+++ b/melib/src/backends/maildir/backend.rs
@@ -804,6 +804,7 @@ impl MaildirType {
};
let result = thunk();
tx_final.send(AsyncStatus::Payload(result));
+ tx_final.send(AsyncStatus::Finished);
};
Box::new(closure)
};