summaryrefslogtreecommitdiffstats
path: root/melib/src/async.rs
diff options
context:
space:
mode:
authorManos Pitsidianakis <el13635@mail.ntua.gr>2018-10-14 19:49:16 +0300
committerManos Pitsidianakis <el13635@mail.ntua.gr>2019-06-10 19:40:36 +0300
commit5a283200047f00d45d894b2873c583be7f00ad8b (patch)
tree536ff537b4cc26d96816e0ee70b9e2be9b214a5f /melib/src/async.rs
parent21a918e4c0a20c2c7dce594979f4419e06ee16b1 (diff)
WIP
Diffstat (limited to 'melib/src/async.rs')
-rw-r--r--melib/src/async.rs115
1 files changed, 88 insertions, 27 deletions
diff --git a/melib/src/async.rs b/melib/src/async.rs
index 0235d93f..41c2c613 100644
--- a/melib/src/async.rs
+++ b/melib/src/async.rs
@@ -32,61 +32,94 @@
*/
use chan;
-use std::thread;
+use std::fmt;
+use std::sync::Arc;
+
+#[derive(Clone)]
+pub struct Work(pub Arc<Box<dyn Fn() -> ()>>);
+
+impl Work {
+ pub fn compute(&self) {
+ (self.0)();
+ }
+}
+
+impl fmt::Debug for Work {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "Work object")
+ }
+}
+
+unsafe impl Send for Work {}
+unsafe impl Sync for Work {}
/// Messages to pass between `Async<T>` owner and its worker thread.
-#[derive(Debug)]
-pub enum AsyncStatus {
+#[derive(Clone)]
+pub enum AsyncStatus<T> {
NoUpdate,
+ Payload(T),
Finished,
///The number may hold whatever meaning the user chooses.
ProgressReport(usize),
}
+impl<T> fmt::Debug for AsyncStatus<T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ match self {
+ AsyncStatus::NoUpdate => write!(f, "AsyncStatus<T>::NoUpdate"),
+ AsyncStatus::Payload(_) => write!(f, "AsyncStatus<T>::Payload(_)"),
+ AsyncStatus::Finished => write!(f, "AsyncStatus<T>::Finished"),
+ AsyncStatus::ProgressReport(u) => write!(f, "AsyncStatus<T>::ProgressReport({})", u),
+ }
+ }
+}
+
/// A builder object for `Async<T>`
-#[derive(Debug)]
-pub struct AsyncBuilder {
- tx: chan::Sender<AsyncStatus>,
- rx: chan::Receiver<AsyncStatus>,
+#[derive(Debug, Clone)]
+pub struct AsyncBuilder<T> {
+ tx: chan::Sender<AsyncStatus<T>>,
+ rx: chan::Receiver<AsyncStatus<T>>,
}
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct Async<T> {
value: Option<T>,
- worker: Option<thread::JoinHandle<T>>,
- tx: chan::Sender<AsyncStatus>,
- rx: chan::Receiver<AsyncStatus>,
+ work: Work,
+ active: bool,
+ tx: chan::Sender<AsyncStatus<T>>,
+ rx: chan::Receiver<AsyncStatus<T>>,
}
-impl Default for AsyncBuilder {
+impl<T> Default for AsyncBuilder<T> {
fn default() -> Self {
- AsyncBuilder::new()
+ AsyncBuilder::<T>::new()
}
}
-impl AsyncBuilder {
+impl<T> AsyncBuilder<T> {
pub fn new() -> Self {
- let (sender, receiver) = chan::sync(::std::mem::size_of::<AsyncStatus>());
+ let (sender, receiver) = chan::sync(8 * ::std::mem::size_of::<AsyncStatus<T>>());
AsyncBuilder {
tx: sender,
rx: receiver,
}
}
/// Returns the sender object of the promise's channel.
- pub fn tx(&mut self) -> chan::Sender<AsyncStatus> {
+ pub fn tx(&mut self) -> chan::Sender<AsyncStatus<T>> {
self.tx.clone()
}
/// Returns the receiver object of the promise's channel.
- pub fn rx(&mut self) -> chan::Receiver<AsyncStatus> {
+ pub fn rx(&mut self) -> chan::Receiver<AsyncStatus<T>> {
self.rx.clone()
}
/// Returns an `Async<T>` object that contains a `Thread` join handle that returns a `T`
- pub fn build<T: Clone>(self, worker: thread::JoinHandle<T>) -> Async<T> {
+ pub fn build(self, work: Box<dyn Fn() -> ()>) -> Async<T> {
Async {
- worker: Some(worker),
+ work: Work(Arc::new(work)),
value: None,
tx: self.tx,
rx: self.rx,
+ active: false,
}
}
}
@@ -96,20 +129,34 @@ impl<T> Async<T> {
pub fn extract(self) -> T {
self.value.unwrap()
}
+ pub fn work(&mut self) -> Option<Work> {
+ if !self.active {
+ self.active = true;
+ Some(self.work.clone())
+ } else {
+ None
+ }
+ }
+ /// Returns the sender object of the promise's channel.
+ pub fn tx(&mut self) -> chan::Sender<AsyncStatus<T>> {
+ self.tx.clone()
+ }
/// Polls worker thread and returns result.
- pub fn poll(&mut self) -> Result<AsyncStatus, ()> {
+ pub fn poll(&mut self) -> Result<AsyncStatus<T>, ()> {
if self.value.is_some() {
return Ok(AsyncStatus::Finished);
}
//self.tx.send(true);
let rx = &self.rx;
+ let result: T;
chan_select! {
default => {
return Ok(AsyncStatus::NoUpdate);
},
rx.recv() -> r => {
match r {
- Some(AsyncStatus::Finished) => {
+ Some(AsyncStatus::Payload(payload)) => {
+ result = payload;
},
Some(a) => {
return Ok(a);
@@ -118,15 +165,29 @@ impl<T> Async<T> {
return Err(());
},
}
-
},
- }
- let v = self.worker.take().unwrap().join().unwrap();
- self.value = Some(v);
+ };
+ self.value = Some(result);
Ok(AsyncStatus::Finished)
}
/// Blocks until thread joins.
- pub fn join(mut self) -> T {
- self.worker.take().unwrap().join().unwrap()
+ pub fn join(&mut self) {
+ let result: T;
+ let rx = &self.rx;
+ loop {
+ chan_select! {
+ rx.recv() -> r => {
+ match r {
+ Some(AsyncStatus::Payload(payload)) => {
+ result = payload;
+ break;
+ },
+ _ => continue,
+ }
+ }
+
+ }
+ }
+ self.value = Some(result);
}
}