From d50a95ba36c86d49545c9fe7c3bcb85530c48093 Mon Sep 17 00:00:00 2001 From: Romeo Disca Date: Mon, 17 Aug 2020 20:11:14 +0200 Subject: chore: create event loop --- src/client.rs | 88 +++++++++++++++++++++++++++++++++-------------------------- src/main.rs | 28 +++++++++++++++---- 2 files changed, 72 insertions(+), 44 deletions(-) diff --git a/src/client.rs b/src/client.rs index 320ae86..8cf6bca 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,12 +1,14 @@ -use tokio::io::*; use bytes::BufMut; use tokio::net::TcpStream; +use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; +use tokio::io::*; +use tokio::sync::Mutex; use std::net::Shutdown; -use std::sync::{Arc, Mutex, MutexGuard}; -use std::time::Duration; +//use std::sync::Arc; +//use std::time::Duration; use super::events::Event; use super::events::stream_mapper::*; @@ -23,63 +25,73 @@ pub fn event_handler(f: F) -> EventClosureMutex } pub struct FlicClient { - stream: TcpStream, - command_mapper: CommandToByteMapper, - map: Vec, - is_running: bool, + reader: Mutex, + writer: Mutex, + is_running: Mutex, + command_mapper: Mutex, + event_mapper: Mutex, + map: Mutex>, } impl FlicClient { pub async fn new(conn: &str) -> Result { match TcpStream::connect(conn).await { + Ok(stream) => { - println!("stream open"); + let (reader, writer) = stream.into_split(); Ok(FlicClient{ - stream, - command_mapper: CommandToByteMapper::new(), - map: vec![], - is_running: true, + reader: Mutex::new(reader), + writer: Mutex::new(writer), + is_running: Mutex::new(true), + command_mapper: Mutex::new(CommandToByteMapper::new()), + event_mapper: Mutex::new(ByteToEventMapper::new()), + map: Mutex::new(vec![]), }) } Err(err) => Err(err) } } - pub fn register_event_handler(mut self, event: EventClosureMutex) -> Self { - self.map.push(event); + pub async fn register_event_handler(mut self, event: EventClosureMutex) -> Self { + self.map.lock().await.push(event); self } - pub async fn listen(&mut self) { - let mut mapper = ByteToEventMapper::new(); - let (mut reader, _writer) = self.stream.split(); + pub async fn listen(&self) { let mut buffer = vec![]; - while self.is_running { - if let Some(size) = reader.read_buf(&mut buffer).await.ok() { - for b in buffer.iter() { - match mapper.map(*b) { - EventResult::Some(Event::NoOp) => {} - EventResult::Some(event) => for ref mut f in &mut self.map { - f(event.clone()); +// if let Some(size) = self.reader.lock().await.peek(&mut buffer).await.ok() { + // if size > 0 { + if let Some(_) = self.reader.lock().await.read_buf(&mut buffer).await.ok() { + for b in buffer.iter() { + match self.event_mapper.lock().await.map(*b) { + EventResult::Some(Event::NoOp) => {} + EventResult::Some(event) => { + let mut map = self.map.lock().await; + for ref mut f in &mut *map { + f(event.clone()); + } + } + _ => {} } - _ => {} } - } - - } + // } + // } } } - pub async fn stop(&mut self) { - self.is_running = false; - self.stream.shutdown(Shutdown::Both); - println!("stopped"); + pub async fn is_running(&self) -> bool { + return *self.is_running.lock().await + } + pub async fn stop(&self) { + *self.is_running.lock().await = false; + //self.reader.lock().await.shutdown(); + //self.writer.lock().await.shutdown(); } - pub async fn submit(&mut self, cmd: Command) { - let (_reader, mut writer) = self.stream.split(); - for b in self.command_mapper.map(cmd) { - writer.write_u8(b).await; - println!("{:?}", b); - } + pub async fn submit(&self, cmd: Command) { + let mut writer = self.writer.lock().await; + for b in self.command_mapper.lock().await.map(cmd) { + writer.write_u8(b).await; + println!("{:?}", b); + } } } diff --git a/src/main.rs b/src/main.rs index 581547b..28eb8dc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,13 +19,29 @@ async fn main() -> Result<(), Box> { let event = event_handler(|event| { println!("ping response: {:?}", event); }); let event2 = event_handler(|event| { println!("ping response: {:?}", event); }); - let mut client = FlicClient::new("127.0.0.1:5551").await? - .register_event_handler(event) - .register_event_handler(event2) + let client = FlicClient::new("127.0.0.1:5551").await? + .register_event_handler(event).await + .register_event_handler(event2).await ; - - client.submit(Command::GetInfo).await; - client.listen().await; + let client1 = Arc::new(client); + let client2 = client1.clone(); + + let cmd = tokio::spawn(async move { + client1.submit(Command::GetInfo).await; + tokio::time::delay_for(Duration::from_secs(3)).await; + client1.submit(Command::GetInfo).await; + tokio::time::delay_for(Duration::from_secs(3)).await; + client1.stop().await; + }); + let lst = tokio::spawn(async move { + while client2.is_running().await { + client2.listen().await; + } + println!("stop"); + }); + + lst.await; + cmd.await; Ok(()) } -- cgit v1.2.3