From bfc63947702ffb3ca1dca6e8184f0998fb98ed13 Mon Sep 17 00:00:00 2001 From: Romeo Disca Date: Tue, 18 Aug 2020 00:11:09 +0200 Subject: chore: non-blocking read stream --- Cargo.lock | 134 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 1 + src/client.rs | 54 +++++++++++------------ src/main.rs | 10 ++--- 4 files changed, 165 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 64df848..0e0402d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -66,12 +66,101 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" +[[package]] +name = "futures" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e05b85ec287aac0dc34db7d4a569323df697f9c55b99b15d6b4ef8cde49f613" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f366ad74c28cca6ba456d95e6422883cfb4b252a83bed929c83abfdbbf2967d5" +dependencies = [ + "futures-core", + "futures-sink", +] + [[package]] name = "futures-core" version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399" +[[package]] +name = "futures-executor" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10d6bb888be1153d3abeb9006b11b02cf5e9b209fda28693c31ae1e4e012e314" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789" + +[[package]] +name = "futures-macro" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0b5a30a4328ab5473878237c447333c093297bded83a4983d10f4deea240d39" +dependencies = [ + "proc-macro-hack", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f2032893cb734c7a05d85ce0cc8b8c4075278e93b24b66f9de99d6eb0fa8acc" + +[[package]] +name = "futures-task" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdb66b5f09e22019b1ab0830f7785bcea8e7a42148683f99214f73f8ec21a626" +dependencies = [ + "once_cell", +] + +[[package]] +name = "futures-util" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8764574ff08b701a084482c3c7031349104b07ac897393010494beaa18ce32c6" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project", + "pin-utils", + "proc-macro-hack", + "proc-macro-nested", + "slab", +] + [[package]] name = "hermit-abi" version = "0.1.15" @@ -234,12 +323,44 @@ dependencies = [ "syn", ] +[[package]] +name = "once_cell" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d" + +[[package]] +name = "pin-project" +version = "0.4.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca4433fff2ae79342e497d9f8ee990d174071408f28f726d6d83af93e58e48aa" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "0.4.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c0e815c3ee9a031fdf5af21c10aa17c573c9c6a566328d99e3936c34e36461f" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282adbf10f2698a7a77f8e983a74b2d18176c19a7fd32a45446139ae7b02b715" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "proc-macro-crate" version = "0.1.5" @@ -249,6 +370,18 @@ dependencies = [ "toml", ] +[[package]] +name = "proc-macro-hack" +version = "0.5.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99c605b9a0adc77b7211c6b1f722dcb613d68d66859a44f3d485a6da332b0598" + +[[package]] +name = "proc-macro-nested" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eba180dafb9038b050a4c280019bbedf9f2467b61e5d892dcad585bb57aadc5a" + [[package]] name = "proc-macro2" version = "1.0.19" @@ -312,6 +445,7 @@ name = "simpleclient" version = "0.1.0" dependencies = [ "bytes", + "futures", "num_enum", "regex", "tokio", diff --git a/Cargo.toml b/Cargo.toml index cfacbb8..77a14a5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ edition = "2018" [dependencies] bytes = "0.5" +futures = "0.3.5" num_enum = "0.4.2" regex = "1" tokio = { version = "0.2", features = ["full"] } diff --git a/src/client.rs b/src/client.rs index 8cf6bca..a1a63f7 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,15 +1,12 @@ -use bytes::BufMut; +use futures::future::poll_fn; +use futures::task::Poll; use tokio::net::TcpStream; use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use tokio::io::*; use tokio::sync::Mutex; -use std::net::Shutdown; -//use std::sync::Arc; -//use std::time::Duration; - use super::events::Event; use super::events::stream_mapper::*; use super::commands::stream_mapper::CommandToByteMapper; @@ -52,45 +49,48 @@ impl FlicClient { } } - pub async fn register_event_handler(mut self, event: EventClosureMutex) -> Self { + pub async fn register_event_handler(self, event: EventClosureMutex) -> Self { self.map.lock().await.push(event); self } pub async fn listen(&self) { - let mut buffer = vec![]; -// if let Some(size) = self.reader.lock().await.peek(&mut buffer).await.ok() { - // if size > 0 { - if let Some(_) = self.reader.lock().await.read_buf(&mut buffer).await.ok() { - for b in buffer.iter() { - match self.event_mapper.lock().await.map(*b) { - EventResult::Some(Event::NoOp) => {} - EventResult::Some(event) => { - let mut map = self.map.lock().await; - for ref mut f in &mut *map { - f(event.clone()); + while *self.is_running.lock().await { + let mut reader = self.reader.lock().await; + if let Ok(size) = poll_fn(|cx| { + let mut buf = [0; 1]; + match reader.poll_peek(cx, &mut buf) { + Poll::Pending => Poll::Ready(Ok(0_usize)), + Poll::Ready(all) => Poll::Ready(all), + } + }).await{ + if size > 0 { + let mut buffer = vec![]; + if let Some(_) = reader.read_buf(&mut buffer).await.ok() { + for b in buffer.iter() { + match self.event_mapper.lock().await.map(*b) { + EventResult::Some(Event::NoOp) => {} + EventResult::Some(event) => { + let mut map = self.map.lock().await; + for ref mut f in &mut *map { + f(event.clone()); + } } + _ => {} } - _ => {} } } - // } - // } + } + } } } - pub async fn is_running(&self) -> bool { - return *self.is_running.lock().await - } pub async fn stop(&self) { *self.is_running.lock().await = false; - //self.reader.lock().await.shutdown(); - //self.writer.lock().await.shutdown(); } pub async fn submit(&self, cmd: Command) { let mut writer = self.writer.lock().await; for b in self.command_mapper.lock().await.map(cmd) { - writer.write_u8(b).await; - println!("{:?}", b); + let _ = writer.write_u8(b).await; } } } diff --git a/src/main.rs b/src/main.rs index 28eb8dc..dd068fe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,8 +8,6 @@ use std::error::Error; use std::time::Duration; use std::sync::Arc; -use tokio::sync::Mutex; - use client::*; use commands::Command; @@ -34,14 +32,12 @@ async fn main() -> Result<(), Box> { client1.stop().await; }); let lst = tokio::spawn(async move { - while client2.is_running().await { - client2.listen().await; - } + client2.listen().await; println!("stop"); }); - lst.await; - cmd.await; + lst.await?; + cmd.await?; Ok(()) } -- cgit v1.2.3