diff options
author | Manos Pitsidianakis <el13635@mail.ntua.gr> | 2019-09-09 12:53:39 +0300 |
---|---|---|
committer | Manos Pitsidianakis <el13635@mail.ntua.gr> | 2019-09-15 13:21:14 +0300 |
commit | 81a55abc7c106ce3dfbef8ab98b7532a722edb00 (patch) | |
tree | 320182d76bcd3da0407b8cc3c8809f606e5dfbe4 /ui | |
parent | ecb3fd7f3d5bab2a6f0f7592d2f45928d90fdf56 (diff) |
Update crossbeam to 0.7.2 and remove chan
Diffstat (limited to 'ui')
-rw-r--r-- | ui/Cargo.toml | 3 | ||||
-rw-r--r-- | ui/src/conf/accounts.rs | 10 | ||||
-rw-r--r-- | ui/src/lib.rs | 3 | ||||
-rw-r--r-- | ui/src/state.rs | 28 | ||||
-rw-r--r-- | ui/src/terminal/keys.rs | 15 | ||||
-rw-r--r-- | ui/src/workers.rs | 29 |
6 files changed, 45 insertions, 43 deletions
diff --git a/ui/Cargo.toml b/ui/Cargo.toml index 2816ff60..60364009 100644 --- a/ui/Cargo.toml +++ b/ui/Cargo.toml @@ -11,8 +11,7 @@ serde = "1.0.71" serde_derive = "1.0.71" serde_json = "1.0" toml = "0.5.3" -chan = "0.1.21" -chan-signal = "0.3.1" +crossbeam = "0.7.2" fnv = "1.0.3" # >:c linkify = "0.3.1" # >:c melib = { path = "../melib", version = "*" } diff --git a/ui/src/conf/accounts.rs b/ui/src/conf/accounts.rs index 3a0525ba..5f9bcee6 100644 --- a/ui/src/conf/accounts.rs +++ b/ui/src/conf/accounts.rs @@ -341,18 +341,18 @@ impl Account { debug!("LL"); match debug!(mailbox_handle.poll_block()) { Ok(s @ AsyncStatus::Payload(_)) => { - our_tx.send(s); + our_tx.send(s).unwrap(); debug!("notifying for {}", folder_hash); notify_fn.notify(folder_hash); } Ok(s @ AsyncStatus::Finished) => { - our_tx.send(s); + our_tx.send(s).unwrap(); notify_fn.notify(folder_hash); debug!("exiting"); return; } Ok(s) => { - our_tx.send(s); + our_tx.send(s).unwrap(); } Err(_) => { debug!("poll error"); @@ -368,7 +368,7 @@ impl Account { &mut self, event: RefreshEvent, folder_hash: FolderHash, - sender: &chan::Sender<crate::types::ThreadEvent>, + sender: &crossbeam::channel::Sender<crate::types::ThreadEvent>, ) -> Option<UIEvent> { if !self.folders[&folder_hash].is_available() { self.event_queue.push_back((folder_hash, event)); @@ -442,7 +442,7 @@ impl Account { debug!("RefreshEvent Failure: {}", e.to_string()); let sender = sender.clone(); self.watch(RefreshEventConsumer::new(Box::new(move |r| { - sender.send(crate::types::ThreadEvent::from(r)); + sender.send(crate::types::ThreadEvent::from(r)).unwrap(); }))); } } diff --git a/ui/src/lib.rs b/ui/src/lib.rs index 443d4932..d93213e1 100644 --- a/ui/src/lib.rs +++ b/ui/src/lib.rs @@ -30,9 +30,6 @@ extern crate notify_rust; extern crate text_processing; #[macro_use] extern crate serde_derive; -#[macro_use] -extern crate chan; -extern crate chan_signal; extern crate linkify; extern crate uuid; diff --git a/ui/src/state.rs b/ui/src/state.rs index 27084608..70342abf 100644 --- a/ui/src/state.rs +++ b/ui/src/state.rs @@ -31,7 +31,7 @@ Input is received in the main loop from threads which listen on the stdin for us use super::*; use melib::backends::{FolderHash, NotifyFn}; -use chan::{Receiver, Sender}; +use crossbeam::channel::{bounded, unbounded, Receiver, Sender}; use fnv::FnvHashMap; use std::env; use std::io::Write; @@ -58,10 +58,11 @@ impl InputHandler { get_events( stdin, |k| { - tx.send(ThreadEvent::Input(k)); + tx.send(ThreadEvent::Input(k)).unwrap(); }, || { - tx.send(ThreadEvent::UIEvent(UIEvent::ChangeMode(UIMode::Fork))); + tx.send(ThreadEvent::UIEvent(UIEvent::ChangeMode(UIMode::Fork))) + .unwrap(); }, &rx, ) @@ -69,7 +70,7 @@ impl InputHandler { .unwrap(); } fn kill(&self) { - self.tx.send(false); + self.tx.send(false).unwrap(); } } @@ -130,7 +131,7 @@ pub struct State { pub mode: UIMode, components: Vec<Box<dyn Component>>, pub context: Context, - threads: FnvHashMap<thread::ThreadId, (chan::Sender<bool>, thread::JoinHandle<()>)>, + threads: FnvHashMap<thread::ThreadId, (Sender<bool>, thread::JoinHandle<()>)>, work_controller: WorkController, } @@ -161,13 +162,13 @@ impl State { pub fn new() -> Self { /* Create a channel to communicate with other threads. The main process is the sole receiver. * */ - let (sender, receiver) = chan::sync(32 * ::std::mem::size_of::<ThreadEvent>()); + let (sender, receiver) = bounded(32 * ::std::mem::size_of::<ThreadEvent>()); /* * Create async channel to block the input-thread if we need to fork and stop it from reading * stdin, see get_events() for details * */ - let input_thread = chan::r#async(); + let input_thread = unbounded(); let backends = Backends::new(); let settings = Settings::new(); @@ -186,7 +187,9 @@ impl State { a_s.clone(), &backends, NotifyFn::new(Box::new(move |f: FolderHash| { - sender.send(ThreadEvent::UIEvent(UIEvent::StartupCheck(f))) + sender + .send(ThreadEvent::UIEvent(UIEvent::StartupCheck(f))) + .unwrap(); })), ) }) @@ -254,14 +257,14 @@ impl State { } let sender = s.context.sender.clone(); account.watch(RefreshEventConsumer::new(Box::new(move |r| { - sender.send(ThreadEvent::from(r)); + sender.send(ThreadEvent::from(r)).unwrap(); }))); } s.restore_input(); s } - pub fn worker_receiver(&mut self) -> chan::Receiver<bool> { + pub fn worker_receiver(&mut self) -> Receiver<bool> { self.work_controller.results_rx() } @@ -299,7 +302,7 @@ impl State { /// the thread from its list and `join` it. pub fn join(&mut self, id: thread::ThreadId) { let (tx, handle) = self.threads.remove(&id).unwrap(); - tx.send(true); + tx.send(true).unwrap(); handle.join().unwrap(); } @@ -555,7 +558,8 @@ impl State { UIEvent::ChangeMode(m) => { self.context .sender - .send(ThreadEvent::UIEvent(UIEvent::ChangeMode(m))); + .send(ThreadEvent::UIEvent(UIEvent::ChangeMode(m))) + .unwrap(); } _ => {} } diff --git a/ui/src/terminal/keys.rs b/ui/src/terminal/keys.rs index fc431870..3bf1561e 100644 --- a/ui/src/terminal/keys.rs +++ b/ui/src/terminal/keys.rs @@ -20,7 +20,7 @@ */ use super::*; -use chan; +use crossbeam::{channel::Receiver, select}; use serde::{Serialize, Serializer}; use std::fmt; use std::io; @@ -154,24 +154,23 @@ pub fn get_events( stdin: io::Stdin, mut closure: impl FnMut(Key), mut exit: impl FnMut(), - rx: &chan::Receiver<bool>, + rx: &Receiver<bool>, ) { let mut input_mode = InputMode::Normal; let mut paste_buf = String::with_capacity(256); for c in stdin.events() { - chan_select! { + select! { default => {}, - rx.recv() -> val => { - if let Some(true) = val { + recv(rx) -> val => { + if let Ok(true) = val { exit(); return; - } else if let Some(false) = val { + } else { return; } } - - }; + match c { Ok(TermionEvent::Key(k)) if input_mode == InputMode::Normal => { closure(Key::from(k)); diff --git a/ui/src/workers.rs b/ui/src/workers.rs index 1ea7f131..4bdd1325 100644 --- a/ui/src/workers.rs +++ b/ui/src/workers.rs @@ -1,4 +1,7 @@ -use chan; +use crossbeam::{ + channel::{bounded, unbounded, Receiver, Sender}, + select, +}; use melib::async_workers::Work; use std; @@ -8,13 +11,13 @@ const MAX_WORKER: usize = 4; pub struct WorkController { pub queue: WorkQueue<Work>, - thread_end_tx: chan::Sender<bool>, - results: Option<chan::Receiver<bool>>, + thread_end_tx: Sender<bool>, + results: Option<Receiver<bool>>, threads: Vec<std::thread::JoinHandle<()>>, } impl WorkController { - pub fn results_rx(&mut self) -> chan::Receiver<bool> { + pub fn results_rx(&mut self) -> Receiver<bool> { self.results.take().unwrap() } } @@ -22,7 +25,7 @@ impl WorkController { impl Drop for WorkController { fn drop(&mut self) { for _ in 0..self.threads.len() { - self.thread_end_tx.send(true); + self.thread_end_tx.send(true).unwrap(); } /* let threads = mem::replace(&mut self.threads, Vec::new()); @@ -141,7 +144,7 @@ impl<T: Send> WorkQueue<T> { // the internal VecDeque. queue.push_back(work); - self.new_jobs_tx.send(true); + self.new_jobs_tx.send(true).unwrap(); // Now return the length of the queue. queue.len() } else { @@ -152,7 +155,7 @@ impl<T: Send> WorkQueue<T> { impl WorkController { pub fn new() -> WorkController { - let (new_jobs_tx, new_jobs_rx) = chan::r#async(); + let (new_jobs_tx, new_jobs_rx) = unbounded(); // Create a new work queue to keep track of what work needs to be done. // Note that the queue is internally mutable (or, rather, the Mutex is), // but this binding doesn't need to be mutable. This isn't unsound because @@ -163,10 +166,10 @@ impl WorkController { // Create a MPSC (Multiple Producer, Single Consumer) channel. Every worker // is a producer, the main thread is a consumer; the producers put their // work into the channel when it's done. - let (results_tx, results_rx) = chan::r#async(); + let (results_tx, results_rx) = unbounded(); // Create a SyncFlag to share whether or not there are more jobs to be done. - let (thread_end_tx, thread_end_rx) = chan::sync(::std::mem::size_of::<bool>()); + let (thread_end_tx, thread_end_rx) = bounded(::std::mem::size_of::<bool>()); // This Vec will hold thread join handles to allow us to not exit while work // is still being done. These handles provide a .join() method which blocks @@ -196,12 +199,12 @@ impl WorkController { 'work_loop: loop { debug!("Waiting for work"); // Loop while there's expected to be work, looking for work. - chan_select! { - thread_end_rx.recv() -> _ => { + select! { + recv(thread_end_rx) -> _ => { debug!("received thread_end_rx, quitting"); break 'work_loop; }, - new_jobs_rx.recv() -> _ => { + recv(new_jobs_rx) -> _ => { // If work is available, do that work. while let Some(work) = thread_queue.get_work() { debug!("Got some work"); @@ -216,7 +219,7 @@ impl WorkController { // // Sending could fail. If so, there's no use in // doing any more work, so abort. - thread_results_tx.send(true); + thread_results_tx.send(true).unwrap(); // Signal to the operating system that now is a good time // to give another thread a chance to run. |