diff options
author | Artem Vorotnikov <artem@vorotnikov.me> | 2019-12-18 22:57:22 +0300 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2019-12-18 11:57:22 -0800 |
commit | 4c645866ef4ea5b0ef8c7852281a09b2f96d969b (patch) | |
tree | fe10e6fffea1033c595b920935dc723be3cc3ac4 /examples/connect.rs | |
parent | b0836ece7aa5219e9e40355d0eb784baffc7b6c6 (diff) |
stream: add `next` and `map` utility fn (#1962)
Introduces `StreamExt` trait. This trait will be used to add utility functions
to make working with streams easier. This patch includes two functions:
* `next`: a future returning the item in the stream.
* `map`: transform each item in the stream.
Diffstat (limited to 'examples/connect.rs')
-rw-r--r-- | examples/connect.rs | 13 |
1 files changed, 8 insertions, 5 deletions
diff --git a/examples/connect.rs b/examples/connect.rs index d51af88c..75640c62 100644 --- a/examples/connect.rs +++ b/examples/connect.rs @@ -55,19 +55,21 @@ async fn main() -> Result<(), Box<dyn Error>> { mod tcp { use super::codec; - use futures::{future, Sink, SinkExt, Stream, StreamExt}; + use futures::StreamExt; + use futures::{future, Sink, SinkExt}; use std::{error::Error, io, net::SocketAddr}; use tokio::net::TcpStream; + use tokio::stream::Stream; use tokio_util::codec::{FramedRead, FramedWrite}; pub async fn connect( addr: &SocketAddr, - stdin: impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin, + mut stdin: impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin, mut stdout: impl Sink<Vec<u8>, Error = io::Error> + Unpin, ) -> Result<(), Box<dyn Error>> { let mut stream = TcpStream::connect(addr).await?; let (r, w) = stream.split(); - let sink = FramedWrite::new(w, codec::Bytes); + let mut sink = FramedWrite::new(w, codec::Bytes); let mut stream = FramedRead::new(r, codec::Bytes) .filter_map(|i| match i { Ok(i) => future::ready(Some(i)), @@ -78,7 +80,7 @@ mod tcp { }) .map(Ok); - match future::join(stdin.forward(sink), stdout.send_all(&mut stream)).await { + match future::join(sink.send_all(&mut stdin), stdout.send_all(&mut stream)).await { (Err(e), _) | (_, Err(e)) => Err(e.into()), _ => Ok(()), } @@ -88,8 +90,9 @@ mod tcp { mod udp { use tokio::net::udp::{RecvHalf, SendHalf}; use tokio::net::UdpSocket; + use tokio::stream::{Stream, StreamExt}; - use futures::{future, Sink, SinkExt, Stream, StreamExt}; + use futures::{future, Sink, SinkExt}; use std::error::Error; use std::io; use std::net::SocketAddr; |