summaryrefslogtreecommitdiffstats
path: root/examples/connect.rs
diff options
context:
space:
mode:
authorArtem Vorotnikov <artem@vorotnikov.me>2019-12-18 22:57:22 +0300
committerCarl Lerche <me@carllerche.com>2019-12-18 11:57:22 -0800
commit4c645866ef4ea5b0ef8c7852281a09b2f96d969b (patch)
treefe10e6fffea1033c595b920935dc723be3cc3ac4 /examples/connect.rs
parentb0836ece7aa5219e9e40355d0eb784baffc7b6c6 (diff)
stream: add `next` and `map` utility fn (#1962)
Introduces `StreamExt` trait. This trait will be used to add utility functions to make working with streams easier. This patch includes two functions: * `next`: a future returning the item in the stream. * `map`: transform each item in the stream.
Diffstat (limited to 'examples/connect.rs')
-rw-r--r--examples/connect.rs13
1 files changed, 8 insertions, 5 deletions
diff --git a/examples/connect.rs b/examples/connect.rs
index d51af88c..75640c62 100644
--- a/examples/connect.rs
+++ b/examples/connect.rs
@@ -55,19 +55,21 @@ async fn main() -> Result<(), Box<dyn Error>> {
mod tcp {
use super::codec;
- use futures::{future, Sink, SinkExt, Stream, StreamExt};
+ use futures::StreamExt;
+ use futures::{future, Sink, SinkExt};
use std::{error::Error, io, net::SocketAddr};
use tokio::net::TcpStream;
+ use tokio::stream::Stream;
use tokio_util::codec::{FramedRead, FramedWrite};
pub async fn connect(
addr: &SocketAddr,
- stdin: impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin,
+ mut stdin: impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin,
mut stdout: impl Sink<Vec<u8>, Error = io::Error> + Unpin,
) -> Result<(), Box<dyn Error>> {
let mut stream = TcpStream::connect(addr).await?;
let (r, w) = stream.split();
- let sink = FramedWrite::new(w, codec::Bytes);
+ let mut sink = FramedWrite::new(w, codec::Bytes);
let mut stream = FramedRead::new(r, codec::Bytes)
.filter_map(|i| match i {
Ok(i) => future::ready(Some(i)),
@@ -78,7 +80,7 @@ mod tcp {
})
.map(Ok);
- match future::join(stdin.forward(sink), stdout.send_all(&mut stream)).await {
+ match future::join(sink.send_all(&mut stdin), stdout.send_all(&mut stream)).await {
(Err(e), _) | (_, Err(e)) => Err(e.into()),
_ => Ok(()),
}
@@ -88,8 +90,9 @@ mod tcp {
mod udp {
use tokio::net::udp::{RecvHalf, SendHalf};
use tokio::net::UdpSocket;
+ use tokio::stream::{Stream, StreamExt};
- use futures::{future, Sink, SinkExt, Stream, StreamExt};
+ use futures::{future, Sink, SinkExt};
use std::error::Error;
use std::io;
use std::net::SocketAddr;