diff options
author | Artem Vorotnikov <artem@vorotnikov.me> | 2019-12-18 22:57:22 +0300 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2019-12-18 11:57:22 -0800 |
commit | 4c645866ef4ea5b0ef8c7852281a09b2f96d969b (patch) | |
tree | fe10e6fffea1033c595b920935dc723be3cc3ac4 /examples | |
parent | b0836ece7aa5219e9e40355d0eb784baffc7b6c6 (diff) |
stream: add `next` and `map` utility fn (#1962)
Introduces `StreamExt` trait. This trait will be used to add utility functions
to make working with streams easier. This patch includes two functions:
* `next`: a future returning the item in the stream.
* `map`: transform each item in the stream.
Diffstat (limited to 'examples')
-rw-r--r-- | examples/chat.rs | 7 | ||||
-rw-r--r-- | examples/connect.rs | 13 | ||||
-rw-r--r-- | examples/print_each_packet.rs | 2 | ||||
-rw-r--r-- | examples/tinydb.rs | 3 | ||||
-rw-r--r-- | examples/tinyhttp.rs | 3 | ||||
-rw-r--r-- | examples/udp-codec.rs | 3 |
6 files changed, 19 insertions, 12 deletions
diff --git a/examples/chat.rs b/examples/chat.rs index 91589072..e1da5f32 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -27,10 +27,11 @@ #![warn(rust_2018_idioms)] use tokio::net::{TcpListener, TcpStream}; +use tokio::stream::{Stream, StreamExt}; use tokio::sync::{mpsc, Mutex}; use tokio_util::codec::{Framed, LinesCodec, LinesCodecError}; -use futures::{SinkExt, Stream, StreamExt}; +use futures::SinkExt; use std::collections::HashMap; use std::env; use std::error::Error; @@ -163,12 +164,12 @@ impl Stream for Peer { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { // First poll the `UnboundedReceiver`. - if let Poll::Ready(Some(v)) = self.rx.poll_next_unpin(cx) { + if let Poll::Ready(Some(v)) = Pin::new(&mut self.rx).poll_next(cx) { return Poll::Ready(Some(Ok(Message::Received(v)))); } // Secondly poll the `Framed` stream. - let result: Option<_> = futures::ready!(self.lines.poll_next_unpin(cx)); + let result: Option<_> = futures::ready!(Pin::new(&mut self.lines).poll_next(cx)); Poll::Ready(match result { // We've received a message we should broadcast to others. diff --git a/examples/connect.rs b/examples/connect.rs index d51af88c..75640c62 100644 --- a/examples/connect.rs +++ b/examples/connect.rs @@ -55,19 +55,21 @@ async fn main() -> Result<(), Box<dyn Error>> { mod tcp { use super::codec; - use futures::{future, Sink, SinkExt, Stream, StreamExt}; + use futures::StreamExt; + use futures::{future, Sink, SinkExt}; use std::{error::Error, io, net::SocketAddr}; use tokio::net::TcpStream; + use tokio::stream::Stream; use tokio_util::codec::{FramedRead, FramedWrite}; pub async fn connect( addr: &SocketAddr, - stdin: impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin, + mut stdin: impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin, mut stdout: impl Sink<Vec<u8>, Error = io::Error> + Unpin, ) -> Result<(), Box<dyn Error>> { let mut stream = TcpStream::connect(addr).await?; let (r, w) = stream.split(); - let sink = FramedWrite::new(w, codec::Bytes); + let mut sink = FramedWrite::new(w, codec::Bytes); let mut stream = FramedRead::new(r, codec::Bytes) .filter_map(|i| match i { Ok(i) => future::ready(Some(i)), @@ -78,7 +80,7 @@ mod tcp { }) .map(Ok); - match future::join(stdin.forward(sink), stdout.send_all(&mut stream)).await { + match future::join(sink.send_all(&mut stdin), stdout.send_all(&mut stream)).await { (Err(e), _) | (_, Err(e)) => Err(e.into()), _ => Ok(()), } @@ -88,8 +90,9 @@ mod tcp { mod udp { use tokio::net::udp::{RecvHalf, SendHalf}; use tokio::net::UdpSocket; + use tokio::stream::{Stream, StreamExt}; - use futures::{future, Sink, SinkExt, Stream, StreamExt}; + use futures::{future, Sink, SinkExt}; use std::error::Error; use std::io; use std::net::SocketAddr; diff --git a/examples/print_each_packet.rs b/examples/print_each_packet.rs index 4604139b..d650b5bd 100644 --- a/examples/print_each_packet.rs +++ b/examples/print_each_packet.rs @@ -55,9 +55,9 @@ #![warn(rust_2018_idioms)] use tokio::net::TcpListener; +use tokio::stream::StreamExt; use tokio_util::codec::{BytesCodec, Decoder}; -use futures::StreamExt; use std::env; #[tokio::main] diff --git a/examples/tinydb.rs b/examples/tinydb.rs index cf867a0a..7c71dedf 100644 --- a/examples/tinydb.rs +++ b/examples/tinydb.rs @@ -42,9 +42,10 @@ #![warn(rust_2018_idioms)] use tokio::net::TcpListener; +use tokio::stream::StreamExt; use tokio_util::codec::{Framed, LinesCodec}; -use futures::{SinkExt, StreamExt}; +use futures::SinkExt; use std::collections::HashMap; use std::env; use std::error::Error; diff --git a/examples/tinyhttp.rs b/examples/tinyhttp.rs index 5ddf0d48..9ac2806e 100644 --- a/examples/tinyhttp.rs +++ b/examples/tinyhttp.rs @@ -14,13 +14,14 @@ #![warn(rust_2018_idioms)] use bytes::BytesMut; -use futures::{SinkExt, StreamExt}; +use futures::SinkExt; use http::{header::HeaderValue, Request, Response, StatusCode}; #[macro_use] extern crate serde_derive; use serde_json; use std::{env, error::Error, fmt, io}; use tokio::net::{TcpListener, TcpStream}; +use tokio::stream::StreamExt; use tokio_util::codec::{Decoder, Encoder, Framed}; #[tokio::main] diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs index 6b3f84a0..dc30394f 100644 --- a/examples/udp-codec.rs +++ b/examples/udp-codec.rs @@ -9,12 +9,13 @@ #![warn(rust_2018_idioms)] use tokio::net::UdpSocket; +use tokio::stream::StreamExt; use tokio::{io, time}; use tokio_util::codec::BytesCodec; use tokio_util::udp::UdpFramed; use bytes::Bytes; -use futures::{FutureExt, SinkExt, StreamExt}; +use futures::{FutureExt, SinkExt}; use std::env; use std::error::Error; use std::net::SocketAddr; |