diff options
Diffstat (limited to 'examples/connect.rs')
-rw-r--r-- | examples/connect.rs | 13 |
1 files changed, 8 insertions, 5 deletions
diff --git a/examples/connect.rs b/examples/connect.rs index d51af88c..75640c62 100644 --- a/examples/connect.rs +++ b/examples/connect.rs @@ -55,19 +55,21 @@ async fn main() -> Result<(), Box<dyn Error>> { mod tcp { use super::codec; - use futures::{future, Sink, SinkExt, Stream, StreamExt}; + use futures::StreamExt; + use futures::{future, Sink, SinkExt}; use std::{error::Error, io, net::SocketAddr}; use tokio::net::TcpStream; + use tokio::stream::Stream; use tokio_util::codec::{FramedRead, FramedWrite}; pub async fn connect( addr: &SocketAddr, - stdin: impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin, + mut stdin: impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin, mut stdout: impl Sink<Vec<u8>, Error = io::Error> + Unpin, ) -> Result<(), Box<dyn Error>> { let mut stream = TcpStream::connect(addr).await?; let (r, w) = stream.split(); - let sink = FramedWrite::new(w, codec::Bytes); + let mut sink = FramedWrite::new(w, codec::Bytes); let mut stream = FramedRead::new(r, codec::Bytes) .filter_map(|i| match i { Ok(i) => future::ready(Some(i)), @@ -78,7 +80,7 @@ mod tcp { }) .map(Ok); - match future::join(stdin.forward(sink), stdout.send_all(&mut stream)).await { + match future::join(sink.send_all(&mut stdin), stdout.send_all(&mut stream)).await { (Err(e), _) | (_, Err(e)) => Err(e.into()), _ => Ok(()), } @@ -88,8 +90,9 @@ mod tcp { mod udp { use tokio::net::udp::{RecvHalf, SendHalf}; use tokio::net::UdpSocket; + use tokio::stream::{Stream, StreamExt}; - use futures::{future, Sink, SinkExt, Stream, StreamExt}; + use futures::{future, Sink, SinkExt}; use std::error::Error; use std::io; use std::net::SocketAddr; |