diff options
author | Manos Pitsidianakis <el13635@mail.ntua.gr> | 2019-09-09 12:53:39 +0300 |
---|---|---|
committer | Manos Pitsidianakis <el13635@mail.ntua.gr> | 2019-09-15 13:21:14 +0300 |
commit | 81a55abc7c106ce3dfbef8ab98b7532a722edb00 (patch) | |
tree | 320182d76bcd3da0407b8cc3c8809f606e5dfbe4 /melib | |
parent | ecb3fd7f3d5bab2a6f0f7592d2f45928d90fdf56 (diff) |
Update crossbeam to 0.7.2 and remove chan
Diffstat (limited to 'melib')
-rw-r--r-- | melib/Cargo.toml | 3 | ||||
-rw-r--r-- | melib/src/async_workers.rs | 48 | ||||
-rw-r--r-- | melib/src/backends/imap.rs | 8 | ||||
-rw-r--r-- | melib/src/backends/maildir/backend.rs | 21 | ||||
-rw-r--r-- | melib/src/backends/mbox.rs | 8 | ||||
-rw-r--r-- | melib/src/lib.rs | 2 |
6 files changed, 47 insertions, 43 deletions
diff --git a/melib/Cargo.toml b/melib/Cargo.toml index 9a85e4e3..c1072ec6 100644 --- a/melib/Cargo.toml +++ b/melib/Cargo.toml @@ -7,9 +7,8 @@ edition = "2018" [dependencies] bitflags = "1.0" -chan = "0.1.21" chrono = { version = "0.4", features = ["serde"] } -crossbeam = "^0.3.0" +crossbeam = "0.7.2" data-encoding = "2.1.1" encoding = "0.2.33" fnv = "1.0.3" diff --git a/melib/src/async_workers.rs b/melib/src/async_workers.rs index bc7571a6..3dfb36ef 100644 --- a/melib/src/async_workers.rs +++ b/melib/src/async_workers.rs @@ -31,7 +31,11 @@ * can be extracted with `extract`. */ -use chan; +use crossbeam::{ + bounded, + channel::{Receiver, Sender}, + select, +}; use std::fmt; use std::sync::Arc; @@ -74,16 +78,16 @@ impl<T> fmt::Debug for AsyncStatus<T> { /// A builder object for `Async<T>` #[derive(Debug, Clone)] pub struct AsyncBuilder<T: Send + Sync> { - tx: chan::Sender<AsyncStatus<T>>, - rx: chan::Receiver<AsyncStatus<T>>, + tx: Sender<AsyncStatus<T>>, + rx: Receiver<AsyncStatus<T>>, } #[derive(Clone, Debug)] pub struct Async<T: Send + Sync> { work: Work, active: bool, - tx: chan::Sender<AsyncStatus<T>>, - rx: chan::Receiver<AsyncStatus<T>>, + tx: Sender<AsyncStatus<T>>, + rx: Receiver<AsyncStatus<T>>, } impl<T: Send + Sync> Default for AsyncBuilder<T> { @@ -97,18 +101,18 @@ where T: Send + Sync, { pub fn new() -> Self { - let (sender, receiver) = chan::sync(8 * ::std::mem::size_of::<AsyncStatus<T>>()); + let (sender, receiver) = bounded(8 * ::std::mem::size_of::<AsyncStatus<T>>()); AsyncBuilder { tx: sender, rx: receiver, } } /// Returns the sender object of the promise's channel. - pub fn tx(&mut self) -> chan::Sender<AsyncStatus<T>> { + pub fn tx(&mut self) -> Sender<AsyncStatus<T>> { self.tx.clone() } /// Returns the receiver object of the promise's channel. - pub fn rx(&mut self) -> chan::Receiver<AsyncStatus<T>> { + pub fn rx(&mut self) -> Receiver<AsyncStatus<T>> { self.rx.clone() } /// Returns an `Async<T>` object that contains a `Thread` join handle that returns a `T` @@ -135,11 +139,11 @@ where } } /// Returns the sender object of the promise's channel. - pub fn tx(&mut self) -> chan::Sender<AsyncStatus<T>> { + pub fn tx(&mut self) -> Sender<AsyncStatus<T>> { self.tx.clone() } /// Returns the receiver object of the promise's channel. - pub fn rx(&mut self) -> chan::Receiver<AsyncStatus<T>> { + pub fn rx(&mut self) -> Receiver<AsyncStatus<T>> { self.rx.clone() } /// Polls worker thread and returns result. @@ -149,20 +153,20 @@ where } let rx = &self.rx; - chan_select! { - rx.recv() -> r => { + select! { + recv(rx) -> r => { match r { - Some(p @ AsyncStatus::Payload(_)) => { + Ok(p @ AsyncStatus::Payload(_)) => { return Ok(p); }, - Some(f @ AsyncStatus::Finished) => { + Ok(f @ AsyncStatus::Finished) => { self.active = false; return Ok(f); }, - Some(a) => { + Ok(a) => { return Ok(a); } - _ => { + Err(_) => { return Err(()); }, } @@ -176,23 +180,23 @@ where } let rx = &self.rx; - chan_select! { + select! { default => { return Ok(AsyncStatus::NoUpdate); }, - rx.recv() -> r => { + recv(rx) -> r => { match r { - Some(p @ AsyncStatus::Payload(_)) => { + Ok(p @ AsyncStatus::Payload(_)) => { return Ok(p); }, - Some(f @ AsyncStatus::Finished) => { + Ok(f @ AsyncStatus::Finished) => { self.active = false; return Ok(f); }, - Some(a) => { + Ok(a) => { return Ok(a); } - _ => { + Err(_) => { return Err(()); }, } diff --git a/melib/src/backends/imap.rs b/melib/src/backends/imap.rs index d68ff490..9e44c43c 100644 --- a/melib/src/backends/imap.rs +++ b/melib/src/backends/imap.rs @@ -74,7 +74,7 @@ impl MailBackend for ImapType { macro_rules! exit_on_error { ($tx:expr,$($result:expr)+) => { $(if let Err(e) = $result { - $tx.send(AsyncStatus::Payload(Err(e.into()))); + $tx.send(AsyncStatus::Payload(Err(e.into()))).unwrap(); std::process::exit(1); })+ }; @@ -147,15 +147,15 @@ impl MailBackend for ImapType { } Err(e) => { debug!(&e); - tx.send(AsyncStatus::Payload(Err(e))); + tx.send(AsyncStatus::Payload(Err(e))).unwrap(); } } exists = std::cmp::max(exists.saturating_sub(20000), 1); debug!("sending payload"); - tx.send(AsyncStatus::Payload(Ok(envelopes))); + tx.send(AsyncStatus::Payload(Ok(envelopes))).unwrap(); } drop(conn); - tx.send(AsyncStatus::Finished); + tx.send(AsyncStatus::Finished).unwrap(); }; Box::new(closure) }; diff --git a/melib/src/backends/maildir/backend.rs b/melib/src/backends/maildir/backend.rs index 46b51eec..d06bfff9 100644 --- a/melib/src/backends/maildir/backend.rs +++ b/melib/src/backends/maildir/backend.rs @@ -706,9 +706,9 @@ impl MaildirType { })?; files.push(e); } - let mut threads = Vec::with_capacity(cores); if !files.is_empty() { crossbeam::scope(|scope| { + let mut threads = Vec::with_capacity(cores); let cache_dir = cache_dir.clone(); let chunk_size = if count / cores > 0 { count / cores @@ -720,7 +720,7 @@ impl MaildirType { let tx = tx.clone(); let map = map.clone(); let root_path = root_path.clone(); - let s = scope.builder().name(name.clone()).spawn(move || { + let s = scope.builder().name(name.clone()).spawn(move |_| { let len = chunk.len(); let size = if len <= 100 { 100 } else { (len / 100) * 100 }; let mut local_r: Vec<Envelope> = @@ -789,23 +789,24 @@ impl MaildirType { continue; } } - tx.send(AsyncStatus::ProgressReport(len)); + tx.send(AsyncStatus::ProgressReport(len)).unwrap(); } local_r }); threads.push(s.unwrap()); } - }); - } - for t in threads { - let mut result = t.join(); - ret.append(&mut result); + for t in threads { + let mut result = t.join().unwrap(); + ret.append(&mut result); + } + }) + .unwrap(); } Ok(ret) }; let result = thunk(); - tx_final.send(AsyncStatus::Payload(result)); - tx_final.send(AsyncStatus::Finished); + tx_final.send(AsyncStatus::Payload(result)).unwrap(); + tx_final.send(AsyncStatus::Finished).unwrap(); }; Box::new(closure) }; diff --git a/melib/src/backends/mbox.rs b/melib/src/backends/mbox.rs index 51d6231f..a984602b 100644 --- a/melib/src/backends/mbox.rs +++ b/melib/src/backends/mbox.rs @@ -384,7 +384,8 @@ impl MailBackend for MboxType { { Ok(f) => f, Err(e) => { - tx.send(AsyncStatus::Payload(Err(MeliError::from(e)))); + tx.send(AsyncStatus::Payload(Err(MeliError::from(e)))) + .unwrap(); return; } }; @@ -392,7 +393,8 @@ impl MailBackend for MboxType { let mut buf_reader = BufReader::new(file); let mut contents = Vec::new(); if let Err(e) = buf_reader.read_to_end(&mut contents) { - tx.send(AsyncStatus::Payload(Err(MeliError::from(e)))); + tx.send(AsyncStatus::Payload(Err(MeliError::from(e)))) + .unwrap(); return; }; @@ -406,7 +408,7 @@ impl MailBackend for MboxType { .and_modify(|f| f.content = contents); } - tx.send(AsyncStatus::Payload(payload)); + tx.send(AsyncStatus::Payload(payload)).unwrap(); }; Box::new(closure) }; diff --git a/melib/src/lib.rs b/melib/src/lib.rs index d9cea92a..06d97dae 100644 --- a/melib/src/lib.rs +++ b/melib/src/lib.rs @@ -109,8 +109,6 @@ extern crate chrono; extern crate data_encoding; extern crate encoding; extern crate memmap; -#[macro_use] -extern crate chan; #[macro_use] extern crate bitflags; |