summaryrefslogtreecommitdiffstats
path: root/examples
diff options
context:
space:
mode:
authorJoão Oliveira <hello@jxs.pt>2019-12-19 00:54:06 +0000
committerLucio Franco <luciofranco14@gmail.com>2019-12-18 19:54:06 -0500
commit58b5abdb99f113152e9953b0576c4c2fd0aaab99 (patch)
treed1de44ec17d64d03cd6444f6b82f514f271e60b5 /examples
parent2d78cfe56ac14f384e26278951b52099d33bd797 (diff)
update connect example (#1787)
Diffstat (limited to 'examples')
-rw-r--r--examples/connect.rs87
1 files changed, 25 insertions, 62 deletions
diff --git a/examples/connect.rs b/examples/connect.rs
index 75640c62..5d0515a7 100644
--- a/examples/connect.rs
+++ b/examples/connect.rs
@@ -16,8 +16,9 @@
#![warn(rust_2018_idioms)]
+use futures::StreamExt;
use tokio::io;
-use tokio_util::codec::{FramedRead, FramedWrite};
+use tokio_util::codec::{BytesCodec, FramedRead, FramedWrite};
use std::env;
use std::error::Error;
@@ -41,8 +42,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
.ok_or("this program requires at least one argument")?;
let addr = addr.parse::<SocketAddr>()?;
- let stdin = FramedRead::new(io::stdin(), codec::Bytes);
- let stdout = FramedWrite::new(io::stdout(), codec::Bytes);
+ let stdin = FramedRead::new(io::stdin(), BytesCodec::new());
+ let stdin = stdin.map(|i| i.map(|bytes| bytes.freeze()));
+ let stdout = FramedWrite::new(io::stdout(), BytesCodec::new());
if tcp {
tcp::connect(&addr, stdin, stdout).await?;
@@ -54,25 +56,26 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
mod tcp {
- use super::codec;
- use futures::StreamExt;
- use futures::{future, Sink, SinkExt};
+ use bytes::Bytes;
+ use futures::{future, Sink, SinkExt, Stream, StreamExt};
use std::{error::Error, io, net::SocketAddr};
use tokio::net::TcpStream;
- use tokio::stream::Stream;
- use tokio_util::codec::{FramedRead, FramedWrite};
+ use tokio_util::codec::{BytesCodec, FramedRead, FramedWrite};
pub async fn connect(
addr: &SocketAddr,
- mut stdin: impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin,
- mut stdout: impl Sink<Vec<u8>, Error = io::Error> + Unpin,
+ mut stdin: impl Stream<Item = Result<Bytes, io::Error>> + Unpin,
+ mut stdout: impl Sink<Bytes, Error = io::Error> + Unpin,
) -> Result<(), Box<dyn Error>> {
let mut stream = TcpStream::connect(addr).await?;
let (r, w) = stream.split();
- let mut sink = FramedWrite::new(w, codec::Bytes);
- let mut stream = FramedRead::new(r, codec::Bytes)
+ let mut sink = FramedWrite::new(w, BytesCodec::new());
+ // filter map Result<BytesMut, Error> stream into just a Bytes stream to match stdout Sink
+ // on the event of an Error, log the error and end the stream
+ let mut stream = FramedRead::new(r, BytesCodec::new())
.filter_map(|i| match i {
- Ok(i) => future::ready(Some(i)),
+ //BytesMut into Bytes
+ Ok(i) => future::ready(Some(i.freeze())),
Err(e) => {
println!("failed to read from socket; error={}", e);
future::ready(None)
@@ -88,19 +91,18 @@ mod tcp {
}
mod udp {
- use tokio::net::udp::{RecvHalf, SendHalf};
- use tokio::net::UdpSocket;
- use tokio::stream::{Stream, StreamExt};
-
- use futures::{future, Sink, SinkExt};
+ use bytes::Bytes;
+ use futures::{future, Sink, SinkExt, Stream, StreamExt};
use std::error::Error;
use std::io;
use std::net::SocketAddr;
+ use tokio::net::udp::{RecvHalf, SendHalf};
+ use tokio::net::UdpSocket;
pub async fn connect(
addr: &SocketAddr,
- stdin: impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin,
- stdout: impl Sink<Vec<u8>, Error = io::Error> + Unpin,
+ stdin: impl Stream<Item = Result<Bytes, io::Error>> + Unpin,
+ stdout: impl Sink<Bytes, Error = io::Error> + Unpin,
) -> Result<(), Box<dyn Error>> {
// We'll bind our UDP socket to a local IP/port, but for now we
// basically let the OS pick both of those.
@@ -120,7 +122,7 @@ mod udp {
}
async fn send(
- mut stdin: impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin,
+ mut stdin: impl Stream<Item = Result<Bytes, io::Error>> + Unpin,
writer: &mut SendHalf,
) -> Result<(), io::Error> {
while let Some(item) = stdin.next().await {
@@ -132,7 +134,7 @@ mod udp {
}
async fn recv(
- mut stdout: impl Sink<Vec<u8>, Error = io::Error> + Unpin,
+ mut stdout: impl Sink<Bytes, Error = io::Error> + Unpin,
reader: &mut RecvHalf,
) -> Result<(), io::Error> {
loop {
@@ -140,47 +142,8 @@ mod udp {
let n = reader.recv(&mut buf[..]).await?;
if n > 0 {
- stdout.send(buf).await?;
+ stdout.send(Bytes::from(buf)).await?;
}
}
}
}
-
-mod codec {
- use bytes::{BufMut, BytesMut};
- use std::io;
- use tokio_util::codec::{Decoder, Encoder};
-
- /// A simple `Codec` implementation that just ships bytes around.
- ///
- /// This type is used for "framing" a TCP/UDP stream of bytes but it's really
- /// just a convenient method for us to work with streams/sinks for now.
- /// This'll just take any data read and interpret it as a "frame" and
- /// conversely just shove data into the output location without looking at
- /// it.
- pub struct Bytes;
-
- impl Decoder for Bytes {
- type Item = Vec<u8>;
- type Error = io::Error;
-
- fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Vec<u8>>> {
- if !buf.is_empty() {
- let len = buf.len();
- Ok(Some(buf.split_to(len).into_iter().collect()))
- } else {
- Ok(None)
- }
- }
- }
-
- impl Encoder for Bytes {
- type Item = Vec<u8>;
- type Error = io::Error;
-
- fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> io::Result<()> {
- buf.put(&data[..]);
- Ok(())
- }
- }
-}