From 8a7e57786a5dca139f5b4261685e22991ded0859 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 15 Nov 2019 22:11:13 -0800 Subject: Limit `futures` dependency to `Stream` via feature flag (#1774) In an effort to reach API stability, the `tokio` crate is shedding its _public_ dependencies on crates that are either a) do not provide a stable (1.0+) release with longevity guarantees or b) match the `tokio` release cadence. Of course, implementing `std` traits fits the requirements. The on exception, for now, is the `Stream` trait found in `futures_core`. It is expected that this trait will not change much and be moved into `std. Since Tokio is not yet going reaching 1.0, I feel that it is acceptable to maintain a dependency on this trait given how foundational it is. Since the `Stream` implementation is optional, types that are logically streams provide `async fn next_*` functions to obtain the next value. Avoiding the `next()` name prevents fn conflicts with `StreamExt::next()`. Additionally, some misc cleanup is also done: - `tokio::io::io` -> `tokio::io::util`. - `delay` -> `delay_until`. - `Timeout::new` -> `timeout(...)`. - `signal::ctrl_c()` returns a future instead of a stream. - `{tcp,unix}::Incoming` is removed (due to lack of `Stream` trait). - `time::Throttle` is removed (due to lack of `Stream` trait). - Fix: `mpsc::UnboundedSender::send(&self)` (no more conflict with `Sink` fns). --- examples/chat.rs | 16 +++++----------- examples/connect.rs | 10 ++++++---- examples/print_each_packet.rs | 2 +- examples/proxy.rs | 6 +++--- examples/udp-codec.rs | 5 ++--- 5 files changed, 17 insertions(+), 22 deletions(-) (limited to 'examples') diff --git a/examples/chat.rs b/examples/chat.rs index e0213afd..2553cc5e 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -116,18 +116,12 @@ impl Shared { /// Send a `LineCodec` encoded message to every peer, except /// for the sender. - async fn broadcast( - &mut self, - sender: SocketAddr, - message: &str, - ) -> Result<(), mpsc::error::UnboundedSendError> { + async fn broadcast(&mut self, sender: SocketAddr, message: &str) { for peer in self.peers.iter_mut() { if *peer.0 != sender { - peer.1.send(message.into()).await?; + let _ = peer.1.send(message.into()); } } - - Ok(()) } } @@ -218,7 +212,7 @@ async fn process( let mut state = state.lock().await; let msg = format!("{} has joined the chat", username); println!("{}", msg); - state.broadcast(addr, &msg).await?; + state.broadcast(addr, &msg).await; } // Process incoming messages until our stream is exhausted by a disconnect. @@ -230,7 +224,7 @@ async fn process( let mut state = state.lock().await; let msg = format!("{}: {}", username, msg); - state.broadcast(addr, &msg).await?; + state.broadcast(addr, &msg).await; } // A message was received from a peer. Send it to the // current user. @@ -254,7 +248,7 @@ async fn process( let msg = format!("{} has left the chat", username); println!("{}", msg); - state.broadcast(addr, &msg).await?; + state.broadcast(addr, &msg).await; } Ok(()) diff --git a/examples/connect.rs b/examples/connect.rs index 38d81229..cb003d9d 100644 --- a/examples/connect.rs +++ b/examples/connect.rs @@ -20,7 +20,7 @@ use tokio::io; use tokio::sync::{mpsc, oneshot}; use tokio_util::codec::{FramedRead, FramedWrite}; -use futures::{SinkExt, Stream, StreamExt}; +use futures::{Stream, StreamExt}; use std::env; use std::error::Error; use std::net::SocketAddr; @@ -69,12 +69,14 @@ async fn run() -> Result<(), Box> { // Temporary work around for stdin blocking the stream fn stdin() -> impl Stream, io::Error>> + Unpin { - let mut stdin = FramedRead::new(io::stdin(), codec::Bytes).map(Ok); + let mut stdin = FramedRead::new(io::stdin(), codec::Bytes); - let (mut tx, rx) = mpsc::unbounded_channel(); + let (tx, rx) = mpsc::unbounded_channel(); tokio::spawn(async move { - tx.send_all(&mut stdin).await.unwrap(); + while let Some(res) = stdin.next().await { + let _ = tx.send(res); + } }); rx diff --git a/examples/print_each_packet.rs b/examples/print_each_packet.rs index 0a275545..f056db4a 100644 --- a/examples/print_each_packet.rs +++ b/examples/print_each_packet.rs @@ -55,9 +55,9 @@ #![warn(rust_2018_idioms)] use tokio::net::TcpListener; -use tokio::prelude::*; use tokio_util::codec::{BytesCodec, Decoder}; +use futures::StreamExt; use std::env; #[tokio::main] diff --git a/examples/proxy.rs b/examples/proxy.rs index 6886a813..4314d1b9 100644 --- a/examples/proxy.rs +++ b/examples/proxy.rs @@ -22,7 +22,7 @@ #![warn(rust_2018_idioms)] -use futures::{future::try_join, FutureExt, StreamExt}; +use futures::{future::try_join, FutureExt}; use std::{env, error::Error}; use tokio::{ io::AsyncReadExt, @@ -37,9 +37,9 @@ async fn main() -> Result<(), Box> { println!("Listening on: {}", listen_addr); println!("Proxying to: {}", server_addr); - let mut incoming = TcpListener::bind(listen_addr).await?.incoming(); + let mut listener = TcpListener::bind(listen_addr).await?; - while let Some(Ok(inbound)) = incoming.next().await { + while let Ok((inbound, _)) = listener.accept().await { let transfer = transfer(inbound, server_addr.clone()).map(|r| { if let Err(e) = r { println!("Failed to transfer; error={}", e); diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs index baf64886..0c9dbf76 100644 --- a/examples/udp-codec.rs +++ b/examples/udp-codec.rs @@ -8,9 +8,8 @@ #![warn(rust_2018_idioms)] -use tokio::future::FutureExt as TokioFutureExt; -use tokio::io; use tokio::net::UdpSocket; +use tokio::{io, time}; use tokio_util::codec::BytesCodec; use tokio_util::udp::UdpFramed; @@ -68,7 +67,7 @@ async fn ping(socket: &mut UdpFramed, b_addr: SocketAddr) -> Result< async fn pong(socket: &mut UdpFramed) -> Result<(), io::Error> { let timeout = Duration::from_millis(200); - while let Ok(Some(Ok((bytes, addr)))) = socket.next().timeout(timeout).await { + while let Ok(Some(Ok((bytes, addr)))) = time::timeout(timeout, socket.next()).await { println!("[b] recv: {}", String::from_utf8_lossy(&bytes)); socket.send((Bytes::from(&b"PONG"[..]), addr)).await?; -- cgit v1.2.3