From 4c645866ef4ea5b0ef8c7852281a09b2f96d969b Mon Sep 17 00:00:00 2001 From: Artem Vorotnikov Date: Wed, 18 Dec 2019 22:57:22 +0300 Subject: stream: add `next` and `map` utility fn (#1962) Introduces `StreamExt` trait. This trait will be used to add utility functions to make working with streams easier. This patch includes two functions: * `next`: a future returning the item in the stream. * `map`: transform each item in the stream. --- examples/chat.rs | 7 ++++--- examples/connect.rs | 13 ++++++++----- examples/print_each_packet.rs | 2 +- examples/tinydb.rs | 3 ++- examples/tinyhttp.rs | 3 ++- examples/udp-codec.rs | 3 ++- 6 files changed, 19 insertions(+), 12 deletions(-) (limited to 'examples') diff --git a/examples/chat.rs b/examples/chat.rs index 91589072..e1da5f32 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -27,10 +27,11 @@ #![warn(rust_2018_idioms)] use tokio::net::{TcpListener, TcpStream}; +use tokio::stream::{Stream, StreamExt}; use tokio::sync::{mpsc, Mutex}; use tokio_util::codec::{Framed, LinesCodec, LinesCodecError}; -use futures::{SinkExt, Stream, StreamExt}; +use futures::SinkExt; use std::collections::HashMap; use std::env; use std::error::Error; @@ -163,12 +164,12 @@ impl Stream for Peer { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // First poll the `UnboundedReceiver`. - if let Poll::Ready(Some(v)) = self.rx.poll_next_unpin(cx) { + if let Poll::Ready(Some(v)) = Pin::new(&mut self.rx).poll_next(cx) { return Poll::Ready(Some(Ok(Message::Received(v)))); } // Secondly poll the `Framed` stream. - let result: Option<_> = futures::ready!(self.lines.poll_next_unpin(cx)); + let result: Option<_> = futures::ready!(Pin::new(&mut self.lines).poll_next(cx)); Poll::Ready(match result { // We've received a message we should broadcast to others. diff --git a/examples/connect.rs b/examples/connect.rs index d51af88c..75640c62 100644 --- a/examples/connect.rs +++ b/examples/connect.rs @@ -55,19 +55,21 @@ async fn main() -> Result<(), Box> { mod tcp { use super::codec; - use futures::{future, Sink, SinkExt, Stream, StreamExt}; + use futures::StreamExt; + use futures::{future, Sink, SinkExt}; use std::{error::Error, io, net::SocketAddr}; use tokio::net::TcpStream; + use tokio::stream::Stream; use tokio_util::codec::{FramedRead, FramedWrite}; pub async fn connect( addr: &SocketAddr, - stdin: impl Stream, io::Error>> + Unpin, + mut stdin: impl Stream, io::Error>> + Unpin, mut stdout: impl Sink, Error = io::Error> + Unpin, ) -> Result<(), Box> { let mut stream = TcpStream::connect(addr).await?; let (r, w) = stream.split(); - let sink = FramedWrite::new(w, codec::Bytes); + let mut sink = FramedWrite::new(w, codec::Bytes); let mut stream = FramedRead::new(r, codec::Bytes) .filter_map(|i| match i { Ok(i) => future::ready(Some(i)), @@ -78,7 +80,7 @@ mod tcp { }) .map(Ok); - match future::join(stdin.forward(sink), stdout.send_all(&mut stream)).await { + match future::join(sink.send_all(&mut stdin), stdout.send_all(&mut stream)).await { (Err(e), _) | (_, Err(e)) => Err(e.into()), _ => Ok(()), } @@ -88,8 +90,9 @@ mod tcp { mod udp { use tokio::net::udp::{RecvHalf, SendHalf}; use tokio::net::UdpSocket; + use tokio::stream::{Stream, StreamExt}; - use futures::{future, Sink, SinkExt, Stream, StreamExt}; + use futures::{future, Sink, SinkExt}; use std::error::Error; use std::io; use std::net::SocketAddr; diff --git a/examples/print_each_packet.rs b/examples/print_each_packet.rs index 4604139b..d650b5bd 100644 --- a/examples/print_each_packet.rs +++ b/examples/print_each_packet.rs @@ -55,9 +55,9 @@ #![warn(rust_2018_idioms)] use tokio::net::TcpListener; +use tokio::stream::StreamExt; use tokio_util::codec::{BytesCodec, Decoder}; -use futures::StreamExt; use std::env; #[tokio::main] diff --git a/examples/tinydb.rs b/examples/tinydb.rs index cf867a0a..7c71dedf 100644 --- a/examples/tinydb.rs +++ b/examples/tinydb.rs @@ -42,9 +42,10 @@ #![warn(rust_2018_idioms)] use tokio::net::TcpListener; +use tokio::stream::StreamExt; use tokio_util::codec::{Framed, LinesCodec}; -use futures::{SinkExt, StreamExt}; +use futures::SinkExt; use std::collections::HashMap; use std::env; use std::error::Error; diff --git a/examples/tinyhttp.rs b/examples/tinyhttp.rs index 5ddf0d48..9ac2806e 100644 --- a/examples/tinyhttp.rs +++ b/examples/tinyhttp.rs @@ -14,13 +14,14 @@ #![warn(rust_2018_idioms)] use bytes::BytesMut; -use futures::{SinkExt, StreamExt}; +use futures::SinkExt; use http::{header::HeaderValue, Request, Response, StatusCode}; #[macro_use] extern crate serde_derive; use serde_json; use std::{env, error::Error, fmt, io}; use tokio::net::{TcpListener, TcpStream}; +use tokio::stream::StreamExt; use tokio_util::codec::{Decoder, Encoder, Framed}; #[tokio::main] diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs index 6b3f84a0..dc30394f 100644 --- a/examples/udp-codec.rs +++ b/examples/udp-codec.rs @@ -9,12 +9,13 @@ #![warn(rust_2018_idioms)] use tokio::net::UdpSocket; +use tokio::stream::StreamExt; use tokio::{io, time}; use tokio_util::codec::BytesCodec; use tokio_util::udp::UdpFramed; use bytes::Bytes; -use futures::{FutureExt, SinkExt, StreamExt}; +use futures::{FutureExt, SinkExt}; use std::env; use std::error::Error; use std::net::SocketAddr; -- cgit v1.2.3