summaryrefslogtreecommitdiffstats
path: root/src/client.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/client.rs')
-rw-r--r--src/client.rs88
1 files changed, 50 insertions, 38 deletions
diff --git a/src/client.rs b/src/client.rs
index 320ae86..8cf6bca 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -1,12 +1,14 @@
-use tokio::io::*;
use bytes::BufMut;
use tokio::net::TcpStream;
+use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
+use tokio::io::*;
+use tokio::sync::Mutex;
use std::net::Shutdown;
-use std::sync::{Arc, Mutex, MutexGuard};
-use std::time::Duration;
+//use std::sync::Arc;
+//use std::time::Duration;
use super::events::Event;
use super::events::stream_mapper::*;
@@ -23,63 +25,73 @@ pub fn event_handler<F>(f: F) -> EventClosureMutex
}
pub struct FlicClient {
- stream: TcpStream,
- command_mapper: CommandToByteMapper,
- map: Vec<EventClosureMutex>,
- is_running: bool,
+ reader: Mutex<OwnedReadHalf>,
+ writer: Mutex<OwnedWriteHalf>,
+ is_running: Mutex<bool>,
+ command_mapper: Mutex<CommandToByteMapper>,
+ event_mapper: Mutex<ByteToEventMapper>,
+ map: Mutex<Vec<EventClosureMutex>>,
}
impl FlicClient {
pub async fn new(conn: &str) -> Result<FlicClient> {
match TcpStream::connect(conn).await {
+
Ok(stream) => {
- println!("stream open");
+ let (reader, writer) = stream.into_split();
Ok(FlicClient{
- stream,
- command_mapper: CommandToByteMapper::new(),
- map: vec![],
- is_running: true,
+ reader: Mutex::new(reader),
+ writer: Mutex::new(writer),
+ is_running: Mutex::new(true),
+ command_mapper: Mutex::new(CommandToByteMapper::new()),
+ event_mapper: Mutex::new(ByteToEventMapper::new()),
+ map: Mutex::new(vec![]),
})
}
Err(err) => Err(err)
}
}
- pub fn register_event_handler(mut self, event: EventClosureMutex) -> Self {
- self.map.push(event);
+ pub async fn register_event_handler(mut self, event: EventClosureMutex) -> Self {
+ self.map.lock().await.push(event);
self
}
- pub async fn listen(&mut self) {
- let mut mapper = ByteToEventMapper::new();
- let (mut reader, _writer) = self.stream.split();
+ pub async fn listen(&self) {
let mut buffer = vec![];
- while self.is_running {
- if let Some(size) = reader.read_buf(&mut buffer).await.ok() {
- for b in buffer.iter() {
- match mapper.map(*b) {
- EventResult::Some(Event::NoOp) => {}
- EventResult::Some(event) => for ref mut f in &mut self.map {
- f(event.clone());
+// if let Some(size) = self.reader.lock().await.peek(&mut buffer).await.ok() {
+ // if size > 0 {
+ if let Some(_) = self.reader.lock().await.read_buf(&mut buffer).await.ok() {
+ for b in buffer.iter() {
+ match self.event_mapper.lock().await.map(*b) {
+ EventResult::Some(Event::NoOp) => {}
+ EventResult::Some(event) => {
+ let mut map = self.map.lock().await;
+ for ref mut f in &mut *map {
+ f(event.clone());
+ }
+ }
+ _ => {}
}
- _ => {}
}
- }
-
- }
+ // }
+ // }
}
}
- pub async fn stop(&mut self) {
- self.is_running = false;
- self.stream.shutdown(Shutdown::Both);
- println!("stopped");
+ pub async fn is_running(&self) -> bool {
+ return *self.is_running.lock().await
+ }
+ pub async fn stop(&self) {
+ *self.is_running.lock().await = false;
+ //self.reader.lock().await.shutdown();
+ //self.writer.lock().await.shutdown();
}
- pub async fn submit(&mut self, cmd: Command) {
- let (_reader, mut writer) = self.stream.split();
- for b in self.command_mapper.map(cmd) {
- writer.write_u8(b).await;
- println!("{:?}", b);
- }
+ pub async fn submit(&self, cmd: Command) {
+ let mut writer = self.writer.lock().await;
+ for b in self.command_mapper.lock().await.map(cmd) {
+ writer.write_u8(b).await;
+ println!("{:?}", b);
+ }
}
}