summaryrefslogtreecommitdiffstats
path: root/examples
diff options
context:
space:
mode:
authorArtem Vorotnikov <artem@vorotnikov.me>2019-12-18 22:57:22 +0300
committerCarl Lerche <me@carllerche.com>2019-12-18 11:57:22 -0800
commit4c645866ef4ea5b0ef8c7852281a09b2f96d969b (patch)
treefe10e6fffea1033c595b920935dc723be3cc3ac4 /examples
parentb0836ece7aa5219e9e40355d0eb784baffc7b6c6 (diff)
stream: add `next` and `map` utility fn (#1962)
Introduces `StreamExt` trait. This trait will be used to add utility functions to make working with streams easier. This patch includes two functions: * `next`: a future returning the item in the stream. * `map`: transform each item in the stream.
Diffstat (limited to 'examples')
-rw-r--r--examples/chat.rs7
-rw-r--r--examples/connect.rs13
-rw-r--r--examples/print_each_packet.rs2
-rw-r--r--examples/tinydb.rs3
-rw-r--r--examples/tinyhttp.rs3
-rw-r--r--examples/udp-codec.rs3
6 files changed, 19 insertions, 12 deletions
diff --git a/examples/chat.rs b/examples/chat.rs
index 91589072..e1da5f32 100644
--- a/examples/chat.rs
+++ b/examples/chat.rs
@@ -27,10 +27,11 @@
#![warn(rust_2018_idioms)]
use tokio::net::{TcpListener, TcpStream};
+use tokio::stream::{Stream, StreamExt};
use tokio::sync::{mpsc, Mutex};
use tokio_util::codec::{Framed, LinesCodec, LinesCodecError};
-use futures::{SinkExt, Stream, StreamExt};
+use futures::SinkExt;
use std::collections::HashMap;
use std::env;
use std::error::Error;
@@ -163,12 +164,12 @@ impl Stream for Peer {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// First poll the `UnboundedReceiver`.
- if let Poll::Ready(Some(v)) = self.rx.poll_next_unpin(cx) {
+ if let Poll::Ready(Some(v)) = Pin::new(&mut self.rx).poll_next(cx) {
return Poll::Ready(Some(Ok(Message::Received(v))));
}
// Secondly poll the `Framed` stream.
- let result: Option<_> = futures::ready!(self.lines.poll_next_unpin(cx));
+ let result: Option<_> = futures::ready!(Pin::new(&mut self.lines).poll_next(cx));
Poll::Ready(match result {
// We've received a message we should broadcast to others.
diff --git a/examples/connect.rs b/examples/connect.rs
index d51af88c..75640c62 100644
--- a/examples/connect.rs
+++ b/examples/connect.rs
@@ -55,19 +55,21 @@ async fn main() -> Result<(), Box<dyn Error>> {
mod tcp {
use super::codec;
- use futures::{future, Sink, SinkExt, Stream, StreamExt};
+ use futures::StreamExt;
+ use futures::{future, Sink, SinkExt};
use std::{error::Error, io, net::SocketAddr};
use tokio::net::TcpStream;
+ use tokio::stream::Stream;
use tokio_util::codec::{FramedRead, FramedWrite};
pub async fn connect(
addr: &SocketAddr,
- stdin: impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin,
+ mut stdin: impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin,
mut stdout: impl Sink<Vec<u8>, Error = io::Error> + Unpin,
) -> Result<(), Box<dyn Error>> {
let mut stream = TcpStream::connect(addr).await?;
let (r, w) = stream.split();
- let sink = FramedWrite::new(w, codec::Bytes);
+ let mut sink = FramedWrite::new(w, codec::Bytes);
let mut stream = FramedRead::new(r, codec::Bytes)
.filter_map(|i| match i {
Ok(i) => future::ready(Some(i)),
@@ -78,7 +80,7 @@ mod tcp {
})
.map(Ok);
- match future::join(stdin.forward(sink), stdout.send_all(&mut stream)).await {
+ match future::join(sink.send_all(&mut stdin), stdout.send_all(&mut stream)).await {
(Err(e), _) | (_, Err(e)) => Err(e.into()),
_ => Ok(()),
}
@@ -88,8 +90,9 @@ mod tcp {
mod udp {
use tokio::net::udp::{RecvHalf, SendHalf};
use tokio::net::UdpSocket;
+ use tokio::stream::{Stream, StreamExt};
- use futures::{future, Sink, SinkExt, Stream, StreamExt};
+ use futures::{future, Sink, SinkExt};
use std::error::Error;
use std::io;
use std::net::SocketAddr;
diff --git a/examples/print_each_packet.rs b/examples/print_each_packet.rs
index 4604139b..d650b5bd 100644
--- a/examples/print_each_packet.rs
+++ b/examples/print_each_packet.rs
@@ -55,9 +55,9 @@
#![warn(rust_2018_idioms)]
use tokio::net::TcpListener;
+use tokio::stream::StreamExt;
use tokio_util::codec::{BytesCodec, Decoder};
-use futures::StreamExt;
use std::env;
#[tokio::main]
diff --git a/examples/tinydb.rs b/examples/tinydb.rs
index cf867a0a..7c71dedf 100644
--- a/examples/tinydb.rs
+++ b/examples/tinydb.rs
@@ -42,9 +42,10 @@
#![warn(rust_2018_idioms)]
use tokio::net::TcpListener;
+use tokio::stream::StreamExt;
use tokio_util::codec::{Framed, LinesCodec};
-use futures::{SinkExt, StreamExt};
+use futures::SinkExt;
use std::collections::HashMap;
use std::env;
use std::error::Error;
diff --git a/examples/tinyhttp.rs b/examples/tinyhttp.rs
index 5ddf0d48..9ac2806e 100644
--- a/examples/tinyhttp.rs
+++ b/examples/tinyhttp.rs
@@ -14,13 +14,14 @@
#![warn(rust_2018_idioms)]
use bytes::BytesMut;
-use futures::{SinkExt, StreamExt};
+use futures::SinkExt;
use http::{header::HeaderValue, Request, Response, StatusCode};
#[macro_use]
extern crate serde_derive;
use serde_json;
use std::{env, error::Error, fmt, io};
use tokio::net::{TcpListener, TcpStream};
+use tokio::stream::StreamExt;
use tokio_util::codec::{Decoder, Encoder, Framed};
#[tokio::main]
diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs
index 6b3f84a0..dc30394f 100644
--- a/examples/udp-codec.rs
+++ b/examples/udp-codec.rs
@@ -9,12 +9,13 @@
#![warn(rust_2018_idioms)]
use tokio::net::UdpSocket;
+use tokio::stream::StreamExt;
use tokio::{io, time};
use tokio_util::codec::BytesCodec;
use tokio_util::udp::UdpFramed;
use bytes::Bytes;
-use futures::{FutureExt, SinkExt, StreamExt};
+use futures::{FutureExt, SinkExt};
use std::env;
use std::error::Error;
use std::net::SocketAddr;