diff options
author | João Oliveira <hello@jxs.pt> | 2019-12-19 00:54:06 +0000 |
---|---|---|
committer | Lucio Franco <luciofranco14@gmail.com> | 2019-12-18 19:54:06 -0500 |
commit | 58b5abdb99f113152e9953b0576c4c2fd0aaab99 (patch) | |
tree | d1de44ec17d64d03cd6444f6b82f514f271e60b5 /examples | |
parent | 2d78cfe56ac14f384e26278951b52099d33bd797 (diff) |
update connect example (#1787)
Diffstat (limited to 'examples')
-rw-r--r-- | examples/connect.rs | 87 |
1 files changed, 25 insertions, 62 deletions
diff --git a/examples/connect.rs b/examples/connect.rs index 75640c62..5d0515a7 100644 --- a/examples/connect.rs +++ b/examples/connect.rs @@ -16,8 +16,9 @@ #![warn(rust_2018_idioms)] +use futures::StreamExt; use tokio::io; -use tokio_util::codec::{FramedRead, FramedWrite}; +use tokio_util::codec::{BytesCodec, FramedRead, FramedWrite}; use std::env; use std::error::Error; @@ -41,8 +42,9 @@ async fn main() -> Result<(), Box<dyn Error>> { .ok_or("this program requires at least one argument")?; let addr = addr.parse::<SocketAddr>()?; - let stdin = FramedRead::new(io::stdin(), codec::Bytes); - let stdout = FramedWrite::new(io::stdout(), codec::Bytes); + let stdin = FramedRead::new(io::stdin(), BytesCodec::new()); + let stdin = stdin.map(|i| i.map(|bytes| bytes.freeze())); + let stdout = FramedWrite::new(io::stdout(), BytesCodec::new()); if tcp { tcp::connect(&addr, stdin, stdout).await?; @@ -54,25 +56,26 @@ async fn main() -> Result<(), Box<dyn Error>> { } mod tcp { - use super::codec; - use futures::StreamExt; - use futures::{future, Sink, SinkExt}; + use bytes::Bytes; + use futures::{future, Sink, SinkExt, Stream, StreamExt}; use std::{error::Error, io, net::SocketAddr}; use tokio::net::TcpStream; - use tokio::stream::Stream; - use tokio_util::codec::{FramedRead, FramedWrite}; + use tokio_util::codec::{BytesCodec, FramedRead, FramedWrite}; pub async fn connect( addr: &SocketAddr, - mut stdin: impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin, - mut stdout: impl Sink<Vec<u8>, Error = io::Error> + Unpin, + mut stdin: impl Stream<Item = Result<Bytes, io::Error>> + Unpin, + mut stdout: impl Sink<Bytes, Error = io::Error> + Unpin, ) -> Result<(), Box<dyn Error>> { let mut stream = TcpStream::connect(addr).await?; let (r, w) = stream.split(); - let mut sink = FramedWrite::new(w, codec::Bytes); - let mut stream = FramedRead::new(r, codec::Bytes) + let mut sink = FramedWrite::new(w, BytesCodec::new()); + // filter map Result<BytesMut, Error> stream into just a Bytes stream to match stdout Sink + // on the event of an Error, log the error and end the stream + let mut stream = FramedRead::new(r, BytesCodec::new()) .filter_map(|i| match i { - Ok(i) => future::ready(Some(i)), + //BytesMut into Bytes + Ok(i) => future::ready(Some(i.freeze())), Err(e) => { println!("failed to read from socket; error={}", e); future::ready(None) @@ -88,19 +91,18 @@ mod tcp { } mod udp { - use tokio::net::udp::{RecvHalf, SendHalf}; - use tokio::net::UdpSocket; - use tokio::stream::{Stream, StreamExt}; - - use futures::{future, Sink, SinkExt}; + use bytes::Bytes; + use futures::{future, Sink, SinkExt, Stream, StreamExt}; use std::error::Error; use std::io; use std::net::SocketAddr; + use tokio::net::udp::{RecvHalf, SendHalf}; + use tokio::net::UdpSocket; pub async fn connect( addr: &SocketAddr, - stdin: impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin, - stdout: impl Sink<Vec<u8>, Error = io::Error> + Unpin, + stdin: impl Stream<Item = Result<Bytes, io::Error>> + Unpin, + stdout: impl Sink<Bytes, Error = io::Error> + Unpin, ) -> Result<(), Box<dyn Error>> { // We'll bind our UDP socket to a local IP/port, but for now we // basically let the OS pick both of those. @@ -120,7 +122,7 @@ mod udp { } async fn send( - mut stdin: impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin, + mut stdin: impl Stream<Item = Result<Bytes, io::Error>> + Unpin, writer: &mut SendHalf, ) -> Result<(), io::Error> { while let Some(item) = stdin.next().await { @@ -132,7 +134,7 @@ mod udp { } async fn recv( - mut stdout: impl Sink<Vec<u8>, Error = io::Error> + Unpin, + mut stdout: impl Sink<Bytes, Error = io::Error> + Unpin, reader: &mut RecvHalf, ) -> Result<(), io::Error> { loop { @@ -140,47 +142,8 @@ mod udp { let n = reader.recv(&mut buf[..]).await?; if n > 0 { - stdout.send(buf).await?; + stdout.send(Bytes::from(buf)).await?; } } } } - -mod codec { - use bytes::{BufMut, BytesMut}; - use std::io; - use tokio_util::codec::{Decoder, Encoder}; - - /// A simple `Codec` implementation that just ships bytes around. - /// - /// This type is used for "framing" a TCP/UDP stream of bytes but it's really - /// just a convenient method for us to work with streams/sinks for now. - /// This'll just take any data read and interpret it as a "frame" and - /// conversely just shove data into the output location without looking at - /// it. - pub struct Bytes; - - impl Decoder for Bytes { - type Item = Vec<u8>; - type Error = io::Error; - - fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Vec<u8>>> { - if !buf.is_empty() { - let len = buf.len(); - Ok(Some(buf.split_to(len).into_iter().collect())) - } else { - Ok(None) - } - } - } - - impl Encoder for Bytes { - type Item = Vec<u8>; - type Error = io::Error; - - fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> io::Result<()> { - buf.put(&data[..]); - Ok(()) - } - } -} |