summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRomeo Disca <romeo.disca@gmail.com>2020-08-17 20:11:14 +0200
committerRomeo Disca <romeo.disca@gmail.com>2020-08-17 20:11:14 +0200
commitd50a95ba36c86d49545c9fe7c3bcb85530c48093 (patch)
treef70556ec3fa98073134e9e2e9ff13fd45ec1ac8b
parentf2630bbb255dab230348dff8cedfcef40787e57e (diff)
chore: create event loop
-rw-r--r--src/client.rs88
-rw-r--r--src/main.rs28
2 files changed, 72 insertions, 44 deletions
diff --git a/src/client.rs b/src/client.rs
index 320ae86..8cf6bca 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -1,12 +1,14 @@
-use tokio::io::*;
use bytes::BufMut;
use tokio::net::TcpStream;
+use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
+use tokio::io::*;
+use tokio::sync::Mutex;
use std::net::Shutdown;
-use std::sync::{Arc, Mutex, MutexGuard};
-use std::time::Duration;
+//use std::sync::Arc;
+//use std::time::Duration;
use super::events::Event;
use super::events::stream_mapper::*;
@@ -23,63 +25,73 @@ pub fn event_handler<F>(f: F) -> EventClosureMutex
}
pub struct FlicClient {
- stream: TcpStream,
- command_mapper: CommandToByteMapper,
- map: Vec<EventClosureMutex>,
- is_running: bool,
+ reader: Mutex<OwnedReadHalf>,
+ writer: Mutex<OwnedWriteHalf>,
+ is_running: Mutex<bool>,
+ command_mapper: Mutex<CommandToByteMapper>,
+ event_mapper: Mutex<ByteToEventMapper>,
+ map: Mutex<Vec<EventClosureMutex>>,
}
impl FlicClient {
pub async fn new(conn: &str) -> Result<FlicClient> {
match TcpStream::connect(conn).await {
+
Ok(stream) => {
- println!("stream open");
+ let (reader, writer) = stream.into_split();
Ok(FlicClient{
- stream,
- command_mapper: CommandToByteMapper::new(),
- map: vec![],
- is_running: true,
+ reader: Mutex::new(reader),
+ writer: Mutex::new(writer),
+ is_running: Mutex::new(true),
+ command_mapper: Mutex::new(CommandToByteMapper::new()),
+ event_mapper: Mutex::new(ByteToEventMapper::new()),
+ map: Mutex::new(vec![]),
})
}
Err(err) => Err(err)
}
}
- pub fn register_event_handler(mut self, event: EventClosureMutex) -> Self {
- self.map.push(event);
+ pub async fn register_event_handler(mut self, event: EventClosureMutex) -> Self {
+ self.map.lock().await.push(event);
self
}
- pub async fn listen(&mut self) {
- let mut mapper = ByteToEventMapper::new();
- let (mut reader, _writer) = self.stream.split();
+ pub async fn listen(&self) {
let mut buffer = vec![];
- while self.is_running {
- if let Some(size) = reader.read_buf(&mut buffer).await.ok() {
- for b in buffer.iter() {
- match mapper.map(*b) {
- EventResult::Some(Event::NoOp) => {}
- EventResult::Some(event) => for ref mut f in &mut self.map {
- f(event.clone());
+// if let Some(size) = self.reader.lock().await.peek(&mut buffer).await.ok() {
+ // if size > 0 {
+ if let Some(_) = self.reader.lock().await.read_buf(&mut buffer).await.ok() {
+ for b in buffer.iter() {
+ match self.event_mapper.lock().await.map(*b) {
+ EventResult::Some(Event::NoOp) => {}
+ EventResult::Some(event) => {
+ let mut map = self.map.lock().await;
+ for ref mut f in &mut *map {
+ f(event.clone());
+ }
+ }
+ _ => {}
}
- _ => {}
}
- }
-
- }
+ // }
+ // }
}
}
- pub async fn stop(&mut self) {
- self.is_running = false;
- self.stream.shutdown(Shutdown::Both);
- println!("stopped");
+ pub async fn is_running(&self) -> bool {
+ return *self.is_running.lock().await
+ }
+ pub async fn stop(&self) {
+ *self.is_running.lock().await = false;
+ //self.reader.lock().await.shutdown();
+ //self.writer.lock().await.shutdown();
}
- pub async fn submit(&mut self, cmd: Command) {
- let (_reader, mut writer) = self.stream.split();
- for b in self.command_mapper.map(cmd) {
- writer.write_u8(b).await;
- println!("{:?}", b);
- }
+ pub async fn submit(&self, cmd: Command) {
+ let mut writer = self.writer.lock().await;
+ for b in self.command_mapper.lock().await.map(cmd) {
+ writer.write_u8(b).await;
+ println!("{:?}", b);
+ }
}
}
diff --git a/src/main.rs b/src/main.rs
index 581547b..28eb8dc 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -19,13 +19,29 @@ async fn main() -> Result<(), Box<dyn Error>> {
let event = event_handler(|event| { println!("ping response: {:?}", event); });
let event2 = event_handler(|event| { println!("ping response: {:?}", event); });
- let mut client = FlicClient::new("127.0.0.1:5551").await?
- .register_event_handler(event)
- .register_event_handler(event2)
+ let client = FlicClient::new("127.0.0.1:5551").await?
+ .register_event_handler(event).await
+ .register_event_handler(event2).await
;
-
- client.submit(Command::GetInfo).await;
- client.listen().await;
+ let client1 = Arc::new(client);
+ let client2 = client1.clone();
+
+ let cmd = tokio::spawn(async move {
+ client1.submit(Command::GetInfo).await;
+ tokio::time::delay_for(Duration::from_secs(3)).await;
+ client1.submit(Command::GetInfo).await;
+ tokio::time::delay_for(Duration::from_secs(3)).await;
+ client1.stop().await;
+ });
+ let lst = tokio::spawn(async move {
+ while client2.is_running().await {
+ client2.listen().await;
+ }
+ println!("stop");
+ });
+
+ lst.await;
+ cmd.await;
Ok(())
}