From bfc63947702ffb3ca1dca6e8184f0998fb98ed13 Mon Sep 17 00:00:00 2001 From: Romeo Disca Date: Tue, 18 Aug 2020 00:11:09 +0200 Subject: chore: non-blocking read stream --- src/client.rs | 54 +++++++++++++++++++++++++++--------------------------- 1 file changed, 27 insertions(+), 27 deletions(-) (limited to 'src/client.rs') diff --git a/src/client.rs b/src/client.rs index 8cf6bca..a1a63f7 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,15 +1,12 @@ -use bytes::BufMut; +use futures::future::poll_fn; +use futures::task::Poll; use tokio::net::TcpStream; use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use tokio::io::*; use tokio::sync::Mutex; -use std::net::Shutdown; -//use std::sync::Arc; -//use std::time::Duration; - use super::events::Event; use super::events::stream_mapper::*; use super::commands::stream_mapper::CommandToByteMapper; @@ -52,45 +49,48 @@ impl FlicClient { } } - pub async fn register_event_handler(mut self, event: EventClosureMutex) -> Self { + pub async fn register_event_handler(self, event: EventClosureMutex) -> Self { self.map.lock().await.push(event); self } pub async fn listen(&self) { - let mut buffer = vec![]; -// if let Some(size) = self.reader.lock().await.peek(&mut buffer).await.ok() { - // if size > 0 { - if let Some(_) = self.reader.lock().await.read_buf(&mut buffer).await.ok() { - for b in buffer.iter() { - match self.event_mapper.lock().await.map(*b) { - EventResult::Some(Event::NoOp) => {} - EventResult::Some(event) => { - let mut map = self.map.lock().await; - for ref mut f in &mut *map { - f(event.clone()); + while *self.is_running.lock().await { + let mut reader = self.reader.lock().await; + if let Ok(size) = poll_fn(|cx| { + let mut buf = [0; 1]; + match reader.poll_peek(cx, &mut buf) { + Poll::Pending => Poll::Ready(Ok(0_usize)), + Poll::Ready(all) => Poll::Ready(all), + } + }).await{ + if size > 0 { + let mut buffer = vec![]; + if let Some(_) = reader.read_buf(&mut buffer).await.ok() { + for b in buffer.iter() { + match self.event_mapper.lock().await.map(*b) { + EventResult::Some(Event::NoOp) => {} + EventResult::Some(event) => { + let mut map = self.map.lock().await; + for ref mut f in &mut *map { + f(event.clone()); + } } + _ => {} } - _ => {} } } - // } - // } + } + } } } - pub async fn is_running(&self) -> bool { - return *self.is_running.lock().await - } pub async fn stop(&self) { *self.is_running.lock().await = false; - //self.reader.lock().await.shutdown(); - //self.writer.lock().await.shutdown(); } pub async fn submit(&self, cmd: Command) { let mut writer = self.writer.lock().await; for b in self.command_mapper.lock().await.map(cmd) { - writer.write_u8(b).await; - println!("{:?}", b); + let _ = writer.write_u8(b).await; } } } -- cgit v1.2.3