summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRomeo Disca <romeo.disca@gmail.com>2020-08-18 00:11:09 +0200
committerRomeo Disca <romeo.disca@gmail.com>2020-08-18 06:13:12 +0200
commitbfc63947702ffb3ca1dca6e8184f0998fb98ed13 (patch)
tree50483c4c1c7263968c947615f0cff855c6c73e4d
parentd50a95ba36c86d49545c9fe7c3bcb85530c48093 (diff)
downloadflicbtn-bfc63947702ffb3ca1dca6e8184f0998fb98ed13.tar.gz
flicbtn-bfc63947702ffb3ca1dca6e8184f0998fb98ed13.tar.xz
chore: non-blocking read stream
-rw-r--r--Cargo.lock134
-rw-r--r--Cargo.toml1
-rw-r--r--src/client.rs54
-rw-r--r--src/main.rs10
4 files changed, 165 insertions, 34 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 64df848..0e0402d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -67,12 +67,101 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
[[package]]
+name = "futures"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1e05b85ec287aac0dc34db7d4a569323df697f9c55b99b15d6b4ef8cde49f613"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-executor",
+ "futures-io",
+ "futures-sink",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
+name = "futures-channel"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f366ad74c28cca6ba456d95e6422883cfb4b252a83bed929c83abfdbbf2967d5"
+dependencies = [
+ "futures-core",
+ "futures-sink",
+]
+
+[[package]]
name = "futures-core"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399"
[[package]]
+name = "futures-executor"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "10d6bb888be1153d3abeb9006b11b02cf5e9b209fda28693c31ae1e4e012e314"
+dependencies = [
+ "futures-core",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
+name = "futures-io"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789"
+
+[[package]]
+name = "futures-macro"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d0b5a30a4328ab5473878237c447333c093297bded83a4983d10f4deea240d39"
+dependencies = [
+ "proc-macro-hack",
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "futures-sink"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3f2032893cb734c7a05d85ce0cc8b8c4075278e93b24b66f9de99d6eb0fa8acc"
+
+[[package]]
+name = "futures-task"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bdb66b5f09e22019b1ab0830f7785bcea8e7a42148683f99214f73f8ec21a626"
+dependencies = [
+ "once_cell",
+]
+
+[[package]]
+name = "futures-util"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8764574ff08b701a084482c3c7031349104b07ac897393010494beaa18ce32c6"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-io",
+ "futures-macro",
+ "futures-sink",
+ "futures-task",
+ "memchr",
+ "pin-project",
+ "pin-utils",
+ "proc-macro-hack",
+ "proc-macro-nested",
+ "slab",
+]
+
+[[package]]
name = "hermit-abi"
version = "0.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -235,12 +324,44 @@ dependencies = [
]
[[package]]
+name = "once_cell"
+version = "1.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d"
+
+[[package]]
+name = "pin-project"
+version = "0.4.23"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ca4433fff2ae79342e497d9f8ee990d174071408f28f726d6d83af93e58e48aa"
+dependencies = [
+ "pin-project-internal",
+]
+
+[[package]]
+name = "pin-project-internal"
+version = "0.4.23"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2c0e815c3ee9a031fdf5af21c10aa17c573c9c6a566328d99e3936c34e36461f"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
name = "pin-project-lite"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282adbf10f2698a7a77f8e983a74b2d18176c19a7fd32a45446139ae7b02b715"
[[package]]
+name = "pin-utils"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
+
+[[package]]
name = "proc-macro-crate"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -250,6 +371,18 @@ dependencies = [
]
[[package]]
+name = "proc-macro-hack"
+version = "0.5.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "99c605b9a0adc77b7211c6b1f722dcb613d68d66859a44f3d485a6da332b0598"
+
+[[package]]
+name = "proc-macro-nested"
+version = "0.1.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "eba180dafb9038b050a4c280019bbedf9f2467b61e5d892dcad585bb57aadc5a"
+
+[[package]]
name = "proc-macro2"
version = "1.0.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -312,6 +445,7 @@ name = "simpleclient"
version = "0.1.0"
dependencies = [
"bytes",
+ "futures",
"num_enum",
"regex",
"tokio",
diff --git a/Cargo.toml b/Cargo.toml
index cfacbb8..77a14a5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,6 +8,7 @@ edition = "2018"
[dependencies]
bytes = "0.5"
+futures = "0.3.5"
num_enum = "0.4.2"
regex = "1"
tokio = { version = "0.2", features = ["full"] }
diff --git a/src/client.rs b/src/client.rs
index 8cf6bca..a1a63f7 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -1,15 +1,12 @@
-use bytes::BufMut;
+use futures::future::poll_fn;
+use futures::task::Poll;
use tokio::net::TcpStream;
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::io::*;
use tokio::sync::Mutex;
-use std::net::Shutdown;
-//use std::sync::Arc;
-//use std::time::Duration;
-
use super::events::Event;
use super::events::stream_mapper::*;
use super::commands::stream_mapper::CommandToByteMapper;
@@ -52,45 +49,48 @@ impl FlicClient {
}
}
- pub async fn register_event_handler(mut self, event: EventClosureMutex) -> Self {
+ pub async fn register_event_handler(self, event: EventClosureMutex) -> Self {
self.map.lock().await.push(event);
self
}
pub async fn listen(&self) {
- let mut buffer = vec![];
-// if let Some(size) = self.reader.lock().await.peek(&mut buffer).await.ok() {
- // if size > 0 {
- if let Some(_) = self.reader.lock().await.read_buf(&mut buffer).await.ok() {
- for b in buffer.iter() {
- match self.event_mapper.lock().await.map(*b) {
- EventResult::Some(Event::NoOp) => {}
- EventResult::Some(event) => {
- let mut map = self.map.lock().await;
- for ref mut f in &mut *map {
- f(event.clone());
+ while *self.is_running.lock().await {
+ let mut reader = self.reader.lock().await;
+ if let Ok(size) = poll_fn(|cx| {
+ let mut buf = [0; 1];
+ match reader.poll_peek(cx, &mut buf) {
+ Poll::Pending => Poll::Ready(Ok(0_usize)),
+ Poll::Ready(all) => Poll::Ready(all),
+ }
+ }).await{
+ if size > 0 {
+ let mut buffer = vec![];
+ if let Some(_) = reader.read_buf(&mut buffer).await.ok() {
+ for b in buffer.iter() {
+ match self.event_mapper.lock().await.map(*b) {
+ EventResult::Some(Event::NoOp) => {}
+ EventResult::Some(event) => {
+ let mut map = self.map.lock().await;
+ for ref mut f in &mut *map {
+ f(event.clone());
+ }
}
+ _ => {}
}
- _ => {}
}
}
- // }
- // }
+ }
+ }
}
}
- pub async fn is_running(&self) -> bool {
- return *self.is_running.lock().await
- }
pub async fn stop(&self) {
*self.is_running.lock().await = false;
- //self.reader.lock().await.shutdown();
- //self.writer.lock().await.shutdown();
}
pub async fn submit(&self, cmd: Command) {
let mut writer = self.writer.lock().await;
for b in self.command_mapper.lock().await.map(cmd) {
- writer.write_u8(b).await;
- println!("{:?}", b);
+ let _ = writer.write_u8(b).await;
}
}
}
diff --git a/src/main.rs b/src/main.rs
index 28eb8dc..dd068fe 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -8,8 +8,6 @@ use std::error::Error;
use std::time::Duration;
use std::sync::Arc;
-use tokio::sync::Mutex;
-
use client::*;
use commands::Command;
@@ -34,14 +32,12 @@ async fn main() -> Result<(), Box<dyn Error>> {
client1.stop().await;
});
let lst = tokio::spawn(async move {
- while client2.is_running().await {
- client2.listen().await;
- }
+ client2.listen().await;
println!("stop");
});
- lst.await;
- cmd.await;
+ lst.await?;
+ cmd.await?;
Ok(())
}