summaryrefslogtreecommitdiffstats
path: root/examples/connect.rs
diff options
context:
space:
mode:
Diffstat (limited to 'examples/connect.rs')
-rw-r--r--examples/connect.rs13
1 files changed, 8 insertions, 5 deletions
diff --git a/examples/connect.rs b/examples/connect.rs
index d51af88c..75640c62 100644
--- a/examples/connect.rs
+++ b/examples/connect.rs
@@ -55,19 +55,21 @@ async fn main() -> Result<(), Box<dyn Error>> {
mod tcp {
use super::codec;
- use futures::{future, Sink, SinkExt, Stream, StreamExt};
+ use futures::StreamExt;
+ use futures::{future, Sink, SinkExt};
use std::{error::Error, io, net::SocketAddr};
use tokio::net::TcpStream;
+ use tokio::stream::Stream;
use tokio_util::codec::{FramedRead, FramedWrite};
pub async fn connect(
addr: &SocketAddr,
- stdin: impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin,
+ mut stdin: impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin,
mut stdout: impl Sink<Vec<u8>, Error = io::Error> + Unpin,
) -> Result<(), Box<dyn Error>> {
let mut stream = TcpStream::connect(addr).await?;
let (r, w) = stream.split();
- let sink = FramedWrite::new(w, codec::Bytes);
+ let mut sink = FramedWrite::new(w, codec::Bytes);
let mut stream = FramedRead::new(r, codec::Bytes)
.filter_map(|i| match i {
Ok(i) => future::ready(Some(i)),
@@ -78,7 +80,7 @@ mod tcp {
})
.map(Ok);
- match future::join(stdin.forward(sink), stdout.send_all(&mut stream)).await {
+ match future::join(sink.send_all(&mut stdin), stdout.send_all(&mut stream)).await {
(Err(e), _) | (_, Err(e)) => Err(e.into()),
_ => Ok(()),
}
@@ -88,8 +90,9 @@ mod tcp {
mod udp {
use tokio::net::udp::{RecvHalf, SendHalf};
use tokio::net::UdpSocket;
+ use tokio::stream::{Stream, StreamExt};
- use futures::{future, Sink, SinkExt, Stream, StreamExt};
+ use futures::{future, Sink, SinkExt};
use std::error::Error;
use std::io;
use std::net::SocketAddr;