summaryrefslogtreecommitdiffstats
path: root/src/client.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/client.rs')
-rw-r--r--src/client.rs54
1 files changed, 27 insertions, 27 deletions
diff --git a/src/client.rs b/src/client.rs
index 8cf6bca..a1a63f7 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -1,15 +1,12 @@
-use bytes::BufMut;
+use futures::future::poll_fn;
+use futures::task::Poll;
use tokio::net::TcpStream;
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::io::*;
use tokio::sync::Mutex;
-use std::net::Shutdown;
-//use std::sync::Arc;
-//use std::time::Duration;
-
use super::events::Event;
use super::events::stream_mapper::*;
use super::commands::stream_mapper::CommandToByteMapper;
@@ -52,45 +49,48 @@ impl FlicClient {
}
}
- pub async fn register_event_handler(mut self, event: EventClosureMutex) -> Self {
+ pub async fn register_event_handler(self, event: EventClosureMutex) -> Self {
self.map.lock().await.push(event);
self
}
pub async fn listen(&self) {
- let mut buffer = vec![];
-// if let Some(size) = self.reader.lock().await.peek(&mut buffer).await.ok() {
- // if size > 0 {
- if let Some(_) = self.reader.lock().await.read_buf(&mut buffer).await.ok() {
- for b in buffer.iter() {
- match self.event_mapper.lock().await.map(*b) {
- EventResult::Some(Event::NoOp) => {}
- EventResult::Some(event) => {
- let mut map = self.map.lock().await;
- for ref mut f in &mut *map {
- f(event.clone());
+ while *self.is_running.lock().await {
+ let mut reader = self.reader.lock().await;
+ if let Ok(size) = poll_fn(|cx| {
+ let mut buf = [0; 1];
+ match reader.poll_peek(cx, &mut buf) {
+ Poll::Pending => Poll::Ready(Ok(0_usize)),
+ Poll::Ready(all) => Poll::Ready(all),
+ }
+ }).await{
+ if size > 0 {
+ let mut buffer = vec![];
+ if let Some(_) = reader.read_buf(&mut buffer).await.ok() {
+ for b in buffer.iter() {
+ match self.event_mapper.lock().await.map(*b) {
+ EventResult::Some(Event::NoOp) => {}
+ EventResult::Some(event) => {
+ let mut map = self.map.lock().await;
+ for ref mut f in &mut *map {
+ f(event.clone());
+ }
}
+ _ => {}
}
- _ => {}
}
}
- // }
- // }
+ }
+ }
}
}
- pub async fn is_running(&self) -> bool {
- return *self.is_running.lock().await
- }
pub async fn stop(&self) {
*self.is_running.lock().await = false;
- //self.reader.lock().await.shutdown();
- //self.writer.lock().await.shutdown();
}
pub async fn submit(&self, cmd: Command) {
let mut writer = self.writer.lock().await;
for b in self.command_mapper.lock().await.map(cmd) {
- writer.write_u8(b).await;
- println!("{:?}", b);
+ let _ = writer.write_u8(b).await;
}
}
}