diff options
author | Manos Pitsidianakis <el13635@mail.ntua.gr> | 2018-10-14 19:49:16 +0300 |
---|---|---|
committer | Manos Pitsidianakis <el13635@mail.ntua.gr> | 2019-06-10 19:40:36 +0300 |
commit | 5a283200047f00d45d894b2873c583be7f00ad8b (patch) | |
tree | 536ff537b4cc26d96816e0ee70b9e2be9b214a5f /melib/src/async.rs | |
parent | 21a918e4c0a20c2c7dce594979f4419e06ee16b1 (diff) |
WIP
Diffstat (limited to 'melib/src/async.rs')
-rw-r--r-- | melib/src/async.rs | 115 |
1 files changed, 88 insertions, 27 deletions
diff --git a/melib/src/async.rs b/melib/src/async.rs index 0235d93f..41c2c613 100644 --- a/melib/src/async.rs +++ b/melib/src/async.rs @@ -32,61 +32,94 @@ */ use chan; -use std::thread; +use std::fmt; +use std::sync::Arc; + +#[derive(Clone)] +pub struct Work(pub Arc<Box<dyn Fn() -> ()>>); + +impl Work { + pub fn compute(&self) { + (self.0)(); + } +} + +impl fmt::Debug for Work { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "Work object") + } +} + +unsafe impl Send for Work {} +unsafe impl Sync for Work {} /// Messages to pass between `Async<T>` owner and its worker thread. -#[derive(Debug)] -pub enum AsyncStatus { +#[derive(Clone)] +pub enum AsyncStatus<T> { NoUpdate, + Payload(T), Finished, ///The number may hold whatever meaning the user chooses. ProgressReport(usize), } +impl<T> fmt::Debug for AsyncStatus<T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + AsyncStatus::NoUpdate => write!(f, "AsyncStatus<T>::NoUpdate"), + AsyncStatus::Payload(_) => write!(f, "AsyncStatus<T>::Payload(_)"), + AsyncStatus::Finished => write!(f, "AsyncStatus<T>::Finished"), + AsyncStatus::ProgressReport(u) => write!(f, "AsyncStatus<T>::ProgressReport({})", u), + } + } +} + /// A builder object for `Async<T>` -#[derive(Debug)] -pub struct AsyncBuilder { - tx: chan::Sender<AsyncStatus>, - rx: chan::Receiver<AsyncStatus>, +#[derive(Debug, Clone)] +pub struct AsyncBuilder<T> { + tx: chan::Sender<AsyncStatus<T>>, + rx: chan::Receiver<AsyncStatus<T>>, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Async<T> { value: Option<T>, - worker: Option<thread::JoinHandle<T>>, - tx: chan::Sender<AsyncStatus>, - rx: chan::Receiver<AsyncStatus>, + work: Work, + active: bool, + tx: chan::Sender<AsyncStatus<T>>, + rx: chan::Receiver<AsyncStatus<T>>, } -impl Default for AsyncBuilder { +impl<T> Default for AsyncBuilder<T> { fn default() -> Self { - AsyncBuilder::new() + AsyncBuilder::<T>::new() } } -impl AsyncBuilder { +impl<T> AsyncBuilder<T> { pub fn new() -> Self { - let (sender, receiver) = chan::sync(::std::mem::size_of::<AsyncStatus>()); + let (sender, receiver) = chan::sync(8 * ::std::mem::size_of::<AsyncStatus<T>>()); AsyncBuilder { tx: sender, rx: receiver, } } /// Returns the sender object of the promise's channel. - pub fn tx(&mut self) -> chan::Sender<AsyncStatus> { + pub fn tx(&mut self) -> chan::Sender<AsyncStatus<T>> { self.tx.clone() } /// Returns the receiver object of the promise's channel. - pub fn rx(&mut self) -> chan::Receiver<AsyncStatus> { + pub fn rx(&mut self) -> chan::Receiver<AsyncStatus<T>> { self.rx.clone() } /// Returns an `Async<T>` object that contains a `Thread` join handle that returns a `T` - pub fn build<T: Clone>(self, worker: thread::JoinHandle<T>) -> Async<T> { + pub fn build(self, work: Box<dyn Fn() -> ()>) -> Async<T> { Async { - worker: Some(worker), + work: Work(Arc::new(work)), value: None, tx: self.tx, rx: self.rx, + active: false, } } } @@ -96,20 +129,34 @@ impl<T> Async<T> { pub fn extract(self) -> T { self.value.unwrap() } + pub fn work(&mut self) -> Option<Work> { + if !self.active { + self.active = true; + Some(self.work.clone()) + } else { + None + } + } + /// Returns the sender object of the promise's channel. + pub fn tx(&mut self) -> chan::Sender<AsyncStatus<T>> { + self.tx.clone() + } /// Polls worker thread and returns result. - pub fn poll(&mut self) -> Result<AsyncStatus, ()> { + pub fn poll(&mut self) -> Result<AsyncStatus<T>, ()> { if self.value.is_some() { return Ok(AsyncStatus::Finished); } //self.tx.send(true); let rx = &self.rx; + let result: T; chan_select! { default => { return Ok(AsyncStatus::NoUpdate); }, rx.recv() -> r => { match r { - Some(AsyncStatus::Finished) => { + Some(AsyncStatus::Payload(payload)) => { + result = payload; }, Some(a) => { return Ok(a); @@ -118,15 +165,29 @@ impl<T> Async<T> { return Err(()); }, } - }, - } - let v = self.worker.take().unwrap().join().unwrap(); - self.value = Some(v); + }; + self.value = Some(result); Ok(AsyncStatus::Finished) } /// Blocks until thread joins. - pub fn join(mut self) -> T { - self.worker.take().unwrap().join().unwrap() + pub fn join(&mut self) { + let result: T; + let rx = &self.rx; + loop { + chan_select! { + rx.recv() -> r => { + match r { + Some(AsyncStatus::Payload(payload)) => { + result = payload; + break; + }, + _ => continue, + } + } + + } + } + self.value = Some(result); } } |