summaryrefslogtreecommitdiffstats
path: root/tokio-net
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-10-22 10:13:49 -0700
committerGitHub <noreply@github.com>2019-10-22 10:13:49 -0700
commitcfc15617a5247ea780c32c85b7134b88b6de5845 (patch)
treeef0a46c61c51505a60f386c9760acac9d1f9b7b1 /tokio-net
parentb8cee1a60ad99ef28ec494ae4230e2ef4399fcf9 (diff)
codec: move into tokio-util (#1675)
Related to #1318, Tokio APIs that are "less stable" are moved into a new `tokio-util` crate. This crate will mirror `tokio` and provide additional APIs that may require a greater rate of breaking changes. As examples require `tokio-util`, they are moved into a separate crate (`examples`). This has the added advantage of being able to avoid example only dependencies in the `tokio` crate.
Diffstat (limited to 'tokio-net')
-rw-r--r--tokio-net/Cargo.toml1
-rw-r--r--tokio-net/src/process/mod.rs9
-rw-r--r--tokio-net/src/udp/frame.rs189
-rw-r--r--tokio-net/src/udp/mod.rs2
-rw-r--r--tokio-net/src/udp/socket.rs29
-rw-r--r--tokio-net/src/udp/split.rs8
-rw-r--r--tokio-net/src/uds/frame.rs175
-rw-r--r--tokio-net/src/uds/mod.rs17
-rw-r--r--tokio-net/tests/process_stdio.rs13
-rw-r--r--tokio-net/tests/udp.rs76
10 files changed, 39 insertions, 480 deletions
diff --git a/tokio-net/Cargo.toml b/tokio-net/Cargo.toml
index d7df3397..f19fd759 100644
--- a/tokio-net/Cargo.toml
+++ b/tokio-net/Cargo.toml
@@ -62,7 +62,6 @@ uds = [
log = ["tracing/log"]
[dependencies]
-tokio-codec = { version = "=0.2.0-alpha.6", path = "../tokio-codec" }
tokio-executor = { version = "=0.2.0-alpha.6", features = ["blocking"], path = "../tokio-executor" }
tokio-io = { version = "=0.2.0-alpha.6", path = "../tokio-io" }
tokio-sync = { version = "=0.2.0-alpha.6", path = "../tokio-sync" }
diff --git a/tokio-net/src/process/mod.rs b/tokio-net/src/process/mod.rs
index 140daa5d..fbe905ad 100644
--- a/tokio-net/src/process/mod.rs
+++ b/tokio-net/src/process/mod.rs
@@ -55,10 +55,11 @@
//! We can also read input line by line.
//!
//! ```no_run
+//! use tokio::io::{BufReader, AsyncBufReadExt};
+//! use tokio::process::Command;
+//!
//! use futures_util::stream::StreamExt;
-//! use std::process::{Stdio};
-//! use tokio::codec::{FramedRead, LinesCodec};
-//! use tokio_net::process::Command;
+//! use std::process::Stdio;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -77,7 +78,7 @@
//! let stdout = child.stdout().take()
//! .expect("child did not have a handle to stdout");
//!
-//! let mut reader = FramedRead::new(stdout, LinesCodec::new());
+//! let mut reader = BufReader::new(stdout).lines();
//!
//! // Ensure the child process is spawned in the runtime so it can
//! // make progress on its own while we await for any output.
diff --git a/tokio-net/src/udp/frame.rs b/tokio-net/src/udp/frame.rs
deleted file mode 100644
index d694e2fe..00000000
--- a/tokio-net/src/udp/frame.rs
+++ /dev/null
@@ -1,189 +0,0 @@
-use super::UdpSocket;
-
-use tokio_codec::{Decoder, Encoder};
-
-use bytes::{BufMut, BytesMut};
-use core::task::{Context, Poll};
-use futures_core::{ready, Stream};
-use futures_sink::Sink;
-use std::io;
-use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
-use std::pin::Pin;
-
-/// A unified `Stream` and `Sink` interface to an underlying `UdpSocket`, using
-/// the `Encoder` and `Decoder` traits to encode and decode frames.
-///
-/// Raw UDP sockets work with datagrams, but higher-level code usually wants to
-/// batch these into meaningful chunks, called "frames". This method layers
-/// framing on top of this socket by using the `Encoder` and `Decoder` traits to
-/// handle encoding and decoding of messages frames. Note that the incoming and
-/// outgoing frame types may be distinct.
-///
-/// This function returns a *single* object that is both `Stream` and `Sink`;
-/// grouping this into a single object is often useful for layering things which
-/// require both read and write access to the underlying object.
-///
-/// If you want to work more directly with the streams and sink, consider
-/// calling `split` on the `UdpFramed` returned by this method, which will break
-/// them into separate objects, allowing them to interact more easily.
-#[must_use = "sinks do nothing unless polled"]
-#[derive(Debug)]
-pub struct UdpFramed<C> {
- socket: UdpSocket,
- codec: C,
- rd: BytesMut,
- wr: BytesMut,
- out_addr: SocketAddr,
- flushed: bool,
-}
-
-impl<C: Decoder + Unpin> Stream for UdpFramed<C> {
- type Item = Result<(C::Item, SocketAddr), C::Error>;
-
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- let pin = self.get_mut();
-
- pin.rd.reserve(INITIAL_RD_CAPACITY);
-
- let (_n, addr) = unsafe {
- // Read into the buffer without having to initialize the memory.
- let res = ready!(Pin::new(&mut pin.socket).poll_recv_from_priv(cx, pin.rd.bytes_mut()));
- let (n, addr) = res?;
- pin.rd.advance_mut(n);
- (n, addr)
- };
-
- let span = trace_span!("decoding", from.addr = %addr, dgram.length = _n);
- let _e = span.enter();
- trace!("trying to decode a frame...");
-
- let frame_res = pin.codec.decode(&mut pin.rd);
- pin.rd.clear();
- let frame = frame_res?;
- let result = frame.map(|frame| Ok((frame, addr))); // frame -> (frame, addr)
-
- trace!("frame decoded from buffer");
- Poll::Ready(result)
- }
-}
-
-impl<C: Encoder + Unpin> Sink<(C::Item, SocketAddr)> for UdpFramed<C> {
- type Error = C::Error;
-
- fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- if !self.flushed {
- match self.poll_flush(cx)? {
- Poll::Ready(()) => {}
- Poll::Pending => return Poll::Pending,
- }
- }
-
- Poll::Ready(Ok(()))
- }
-
- fn start_send(self: Pin<&mut Self>, item: (C::Item, SocketAddr)) -> Result<(), Self::Error> {
- let (frame, out_addr) = item;
-
- let span = trace_span!("sending", to.addr = %out_addr);
- let _e = span.enter();
- trace!("encoding frame...");
-
- let pin = self.get_mut();
-
- pin.codec.encode(frame, &mut pin.wr)?;
- pin.out_addr = out_addr;
- pin.flushed = false;
- trace!(message = "frame encoded", frame.length = pin.wr.len());
-
- Ok(())
- }
-
- fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- if self.flushed {
- return Poll::Ready(Ok(()));
- }
-
- let Self {
- ref mut socket,
- ref mut out_addr,
- ref mut wr,
- ..
- } = *self;
-
- let span = trace_span!("flushing", to.addr = %out_addr, frame.length = wr.len());
- let _e = span.enter();
- trace!("flushing frame...");
-
- let n = ready!(socket.poll_send_to_priv(cx, &wr, &out_addr))?;
-
- let wrote_all = n == self.wr.len();
- self.wr.clear();
- self.flushed = true;
-
- trace!(written.length = n, written.complete = wrote_all);
-
- let res = if wrote_all {
- Ok(())
- } else {
- Err(io::Error::new(
- io::ErrorKind::Other,
- "failed to write entire datagram to socket",
- )
- .into())
- };
-
- Poll::Ready(res)
- }
-
- fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- ready!(self.poll_flush(cx))?;
- Poll::Ready(Ok(()))
- }
-}
-
-const INITIAL_RD_CAPACITY: usize = 64 * 1024;
-const INITIAL_WR_CAPACITY: usize = 8 * 1024;
-
-impl<C> UdpFramed<C> {
- /// Create a new `UdpFramed` backed by the given socket and codec.
- ///
- /// See struct level documentation for more details.
- pub fn new(socket: UdpSocket, codec: C) -> UdpFramed<C> {
- UdpFramed {
- socket,
- codec,
- out_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0)),
- rd: BytesMut::with_capacity(INITIAL_RD_CAPACITY),
- wr: BytesMut::with_capacity(INITIAL_WR_CAPACITY),
- flushed: true,
- }
- }
-
- /// Returns a reference to the underlying I/O stream wrapped by `Framed`.
- ///
- /// # Note
- ///
- /// Care should be taken to not tamper with the underlying stream of data
- /// coming in as it may corrupt the stream of frames otherwise being worked
- /// with.
- pub fn get_ref(&self) -> &UdpSocket {
- &self.socket
- }
-
- /// Returns a mutable reference to the underlying I/O stream wrapped by
- /// `Framed`.
- ///
- /// # Note
- ///
- /// Care should be taken to not tamper with the underlying stream of data
- /// coming in as it may corrupt the stream of frames otherwise being worked
- /// with.
- pub fn get_mut(&mut self) -> &mut UdpSocket {
- &mut self.socket
- }
-
- /// Consumes the `Framed`, returning its underlying I/O stream.
- pub fn into_inner(self) -> UdpSocket {
- self.socket
- }
-}
diff --git a/tokio-net/src/udp/mod.rs b/tokio-net/src/udp/mod.rs
index e5c06585..45656773 100644
--- a/tokio-net/src/udp/mod.rs
+++ b/tokio-net/src/udp/mod.rs
@@ -7,9 +7,7 @@
//!
//! [`UdpSocket`]: struct.UdpSocket
-mod frame;
mod socket;
pub mod split;
-pub use self::frame::UdpFramed;
pub use self::socket::UdpSocket;
diff --git a/tokio-net/src/udp/socket.rs b/tokio-net/src/udp/socket.rs
index c59b564b..90f212a2 100644
--- a/tokio-net/src/udp/socket.rs
+++ b/tokio-net/src/udp/socket.rs
@@ -108,7 +108,7 @@ impl UdpSocket {
///
/// [`connect`]: #method.connect
pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
- poll_fn(|cx| self.poll_send_priv(cx, buf)).await
+ poll_fn(|cx| self.poll_send(cx, buf)).await
}
// Poll IO functions that takes `&self` are provided for the split API.
@@ -121,11 +121,8 @@ impl UdpSocket {
// While violating this requirement is "safe" from a Rust memory model point
// of view, it will result in unexpected behavior in the form of lost
// notifications and tasks hanging.
- pub(crate) fn poll_send_priv(
- &self,
- cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<io::Result<usize>> {
+ #[doc(hidden)]
+ pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
ready!(self.io.poll_write_ready(cx))?;
match self.io.get_ref().send(buf) {
@@ -150,14 +147,11 @@ impl UdpSocket {
///
/// [`connect`]: #method.connect
pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- poll_fn(|cx| self.poll_recv_priv(cx, buf)).await
+ poll_fn(|cx| self.poll_recv(cx, buf)).await
}
- pub(crate) fn poll_recv_priv(
- &self,
- cx: &mut Context<'_>,
- buf: &mut [u8],
- ) -> Poll<io::Result<usize>> {
+ #[doc(hidden)]
+ pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
match self.io.get_ref().recv(buf) {
@@ -178,7 +172,7 @@ impl UdpSocket {
let mut addrs = target.to_socket_addrs().await?;
match addrs.next() {
- Some(target) => poll_fn(|cx| self.poll_send_to_priv(cx, buf, &target)).await,
+ Some(target) => poll_fn(|cx| self.poll_send_to(cx, buf, &target)).await,
None => Err(io::Error::new(
io::ErrorKind::InvalidInput,
"no addresses to send data to",
@@ -186,7 +180,9 @@ impl UdpSocket {
}
}
- pub(crate) fn poll_send_to_priv(
+ // TODO: Public or not?
+ #[doc(hidden)]
+ pub fn poll_send_to(
&self,
cx: &mut Context<'_>,
buf: &[u8],
@@ -210,10 +206,11 @@ impl UdpSocket {
/// to hold the message bytes. If a message is too long to fit in the supplied
/// buffer, excess bytes may be discarded.
pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
- poll_fn(|cx| self.poll_recv_from_priv(cx, buf)).await
+ poll_fn(|cx| self.poll_recv_from(cx, buf)).await
}
- pub(crate) fn poll_recv_from_priv(
+ #[doc(hidden)]
+ pub fn poll_recv_from(
&self,
cx: &mut Context<'_>,
buf: &mut [u8],
diff --git a/tokio-net/src/udp/split.rs b/tokio-net/src/udp/split.rs
index e58f9276..ad8ce061 100644
--- a/tokio-net/src/udp/split.rs
+++ b/tokio-net/src/udp/split.rs
@@ -86,7 +86,7 @@ impl UdpSocketRecvHalf {
/// to hold the message bytes. If a message is too long to fit in the supplied
/// buffer, excess bytes may be discarded.
pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
- poll_fn(|cx| self.0.poll_recv_from_priv(cx, buf)).await
+ poll_fn(|cx| self.0.poll_recv_from(cx, buf)).await
}
/// Returns a future that receives a single datagram message on the socket from
@@ -102,7 +102,7 @@ impl UdpSocketRecvHalf {
///
/// [`connect`]: super::UdpSocket::connect
pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- poll_fn(|cx| self.0.poll_recv_priv(cx, buf)).await
+ poll_fn(|cx| self.0.poll_recv(cx, buf)).await
}
}
@@ -120,7 +120,7 @@ impl UdpSocketSendHalf {
/// The future will resolve to an error if the IP version of the socket does
/// not match that of `target`.
pub async fn send_to(&mut self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> {
- poll_fn(|cx| self.0.poll_send_to_priv(cx, buf, target)).await
+ poll_fn(|cx| self.0.poll_send_to(cx, buf, target)).await
}
/// Returns a future that sends data on the socket to the remote address to which it is connected.
@@ -131,7 +131,7 @@ impl UdpSocketSendHalf {
///
/// [`connect`]: super::UdpSocket::connect
pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
- poll_fn(|cx| self.0.poll_send_priv(cx, buf)).await
+ poll_fn(|cx| self.0.poll_send(cx, buf)).await
}
}
diff --git a/tokio-net/src/uds/frame.rs b/tokio-net/src/uds/frame.rs
deleted file mode 100644
index 584da8ae..00000000
--- a/tokio-net/src/uds/frame.rs
+++ /dev/null
@@ -1,175 +0,0 @@
-use super::UnixDatagram;
-use bytes::{BufMut, BytesMut};
-use futures::{try_ready, Async, AsyncSink, Poll, Sink, StartSend, Stream};
-use std::io;
-use std::os::unix::net::SocketAddr;
-use std::path::Path;
-use tokio_codec::{Decoder, Encoder};
-
-/// A unified `Stream` and `Sink` interface to an underlying `UnixDatagram`, using
-/// the `Encoder` and `Decoder` traits to encode and decode frames.
-///
-/// Unix datagram sockets work with datagrams, but higher-level code may wants to
-/// batch these into meaningful chunks, called "frames". This method layers
-/// framing on top of this socket by using the `Encoder` and `Decoder` traits to
-/// handle encoding and decoding of messages frames. Note that the incoming and
-/// outgoing frame types may be distinct.
-///
-/// This function returns a *single* object that is both `Stream` and `Sink`;
-/// grouping this into a single object is often useful for layering things which
-/// require both read and write access to the underlying object.
-///
-/// If you want to work more directly with the streams and sink, consider
-/// calling `split` on the `UnixDatagramFramed` returned by this method, which will break
-/// them into separate objects, allowing them to interact more easily.
-#[must_use = "sinks do nothing unless polled"]
-#[derive(Debug)]
-pub struct UnixDatagramFramed<A, C> {
- socket: UnixDatagram,
- codec: C,
- rd: BytesMut,
- wr: BytesMut,
- out_addr: Option<A>,
- flushed: bool,
-}
-
-impl<A, C: Decoder> Stream for UnixDatagramFramed<A, C> {
- type Item = (C::Item, SocketAddr);
- type Error = C::Error;
-
- fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
- self.rd.reserve(INITIAL_RD_CAPACITY);
-
- let (_n, addr) = unsafe {
- let (n, addr) = try_ready!(self.socket.poll_recv_from(self.rd.bytes_mut()));
- self.rd.advance_mut(n);
- (n, addr)
- };
-
- let span = trace_span!("decoding", from.addr = %addr, dgram.length = _n);
- let _e = span.enter();
- trace!("trying to decode a frame...");
-
- let frame_res = self.codec.decode(&mut self.rd);
- self.rd.clear();
- let frame = frame_res?;
- let result = frame.map(|frame| (frame, addr));
- trace!("frame decoded from buffer");
- Ok(Async::Ready(result))
- }
-}
-
-impl<A: AsRef<Path>, C: Encoder> Sink for UnixDatagramFramed<A, C> {
- type SinkItem = (C::Item, A);
- type SinkError = C::Error;
-
- fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
- let span = trace_span!("sending", to.addr = %item.0, flushed = self.flushed);
- let _e = span.enter();
-
- trace!("sending frame...");
-
- if !self.flushed {
- match self.poll_complete()? {
- Async::Ready(()) => {}
- Async::NotReady => return Ok(AsyncSink::NotReady(item)),
- }
- }
-
- let (frame, out_addr) = item;
- self.codec.encode(frame, &mut self.wr)?;
- self.out_addr = Some(out_addr);
- self.flushed = false;
- trace!(message = "frame encoded", frame.length = pin.wr.len());
-
- Ok(AsyncSink::Ready)
- }
-
- fn poll_complete(&mut self) -> Poll<(), C::Error> {
- if self.flushed {
- return Ok(Async::Ready(()));
- }
-
- let span = trace_span!("flushing", to.addr = %self.out_addr);
- let _e = span.enter();
-
- let n = {
- let out_path = match self.out_addr {
- Some(ref out_path) => out_path.as_ref(),
- None => {
- return Err(io::Error::new(
- io::ErrorKind::Other,
- "internal error: addr not available while data not flushed",
- )
- .into());
- }
- };
-
- trace!(message = "flushing frame", frame.length = self.wr.len());
- try_ready!(self.socket.poll_send_to(&self.wr, out_path))
- };
-
- let wrote_all = n == self.wr.len();
- self.wr.clear();
- self.flushed = true;
-
- trace!(written.length = n, written.complete = wrote_all);
-
- if wrote_all {
- self.out_addr = None;
- Ok(Async::Ready(()))
- } else {
- Err(io::Error::new(
- io::ErrorKind::Other,
- "failed to write entire datagram to socket",
- )
- .into())
- }
- }
-
- fn close(&mut self) -> Poll<(), C::Error> {
- self.poll_complete()
- }
-}
-
-const INITIAL_RD_CAPACITY: usize = 64 * 1024;
-const INITIAL_WR_CAPACITY: usize = 8 * 1024;
-
-impl<A, C> UnixDatagramFramed<A, C> {
- /// Create a new `UnixDatagramFramed` backed by the given socket and codec.
- ///
- /// See struct level documentation for more details.
- pub fn new(socket: UnixDatagram, codec: C) -> UnixDatagramFramed<A, C> {
- UnixDatagramFramed {
- socket: socket,
- codec: codec,
- out_addr: None,
- rd: BytesMut::with_capacity(INITIAL_RD_CAPACITY),
- wr: BytesMut::with_capacity(INITIAL_WR_CAPACITY),
- flushed: true,
- }
- }
-
- /// Returns a reference to the underlying I/O stream wrapped by `Framed`.
- ///
- /// # Note
- ///
- /// Care should be taken to not tamper with the underlying stream of data
- /// coming in as it may corrupt the stream of frames otherwise being worked
- /// with.
- pub fn get_ref(&self) -> &UnixDatagram {
- &self.socket
- }
-
- /// Returns a mutable reference to the underlying I/O stream wrapped by
- /// `Framed`.
- ///
- /// # Note
- ///
- /// Care should be taken to not tamper with the underlying stream of data
- /// coming in as it may corrupt the stream of frames otherwise being worked
- /// with.
- pub fn get_mut(&mut self) -> &mut UnixDatagram {
- &mut self.socket
- }
-}
diff --git a/tokio-net/src/uds/mod.rs b/tokio-net/src/uds/mod.rs
index 3b684992..4447ca5c 100644
--- a/tokio-net/src/uds/mod.rs
+++ b/tokio-net/src/uds/mod.rs
@@ -3,16 +3,19 @@
//! This crate provides APIs for using Unix Domain Sockets with Tokio.
mod datagram;
-// mod frame;
-mod incoming;
-mod listener;
-pub mod split;
-mod stream;
-mod ucred;
-
pub use self::datagram::UnixDatagram;
+
+mod incoming;
#[cfg(feature = "async-traits")]
pub use self::incoming::Incoming;
+
+mod listener;
pub use self::listener::UnixListener;
+
+pub mod split;
+
+mod stream;
pub use self::stream::UnixStream;
+
+mod ucred;
pub use self::ucred::UCred;
diff --git a/tokio-net/tests/process_stdio.rs b/tokio-net/tests/process_stdio.rs
index b7f7c2af..e98162d4 100644
--- a/tokio-net/tests/process_stdio.rs
+++ b/tokio-net/tests/process_stdio.rs
@@ -4,16 +4,15 @@
#[macro_use]
extern crate tracing;
-use std::env;
-use std::io;
-use std::process::{ExitStatus, Stdio};
+use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
+use tokio_net::process::{Child, Command};
use futures_util::future;
use futures_util::future::FutureExt;
use futures_util::stream::StreamExt;
-use tokio::codec::{FramedRead, LinesCodec};
-use tokio::io::AsyncWriteExt;
-use tokio_net::process::{Child, Command};
+use std::env;
+use std::io;
+use std::process::{ExitStatus, Stdio};
mod support;
use support::*;
@@ -51,7 +50,7 @@ async fn feed_cat(mut cat: Child, n: usize) -> io::Result<ExitStatus> {
};
let read = async {
- let mut reader = FramedRead::new(stdout, LinesCodec::new());
+ let mut reader = BufReader::new(stdout).lines();
let mut num_lines = 0;
// Try to read `n + 1` lines, ensuring the last one is empty
diff --git a/tokio-net/tests/udp.rs b/tokio-net/tests/udp.rs
index fb8f068a..124d1387 100644
--- a/tokio-net/tests/udp.rs
+++ b/tokio-net/tests/udp.rs
@@ -1,11 +1,6 @@
#![warn(rust_2018_idioms)]
-use tokio_codec::{Decoder, Encoder};
-use tokio_net::udp::{UdpFramed, UdpSocket};
-
-use bytes::{BufMut, BytesMut};
-use futures_util::{future::FutureExt, sink::SinkExt, stream::StreamExt, try_future::try_join};
-use std::io;
+use tokio_net::udp::UdpSocket;
#[tokio::test]
async fn send_recv() -> std::io::Result<()> {
@@ -75,72 +70,3 @@ async fn reunite_error() -> std::io::Result<()> {
assert!(s.reunite(r1).is_err());
Ok(())
}
-
-pub struct ByteCodec;
-
-impl Decoder for ByteCodec {
- type Item = Vec<u8>;
- type Error = io::Error;
-
- fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Vec<u8>>, io::Error> {
- let len = buf.len();
- Ok(Some(buf.split_to(len).to_vec()))
- }
-}
-
-impl Encoder for ByteCodec {
- type Item = Vec<u8>;
- type Error = io::Error;
-
- fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> Result<(), io::Error> {
- buf.reserve(data.len());
- buf.put(data);
- Ok(())
- }
-}
-
-#[tokio::test]
-async fn send_framed() -> std::io::Result<()> {
- let mut a_soc = UdpSocket::bind("127.0.0.1:0").await?;
- let mut b_soc = UdpSocket::bind("127.0.0.1:0").await?;
-
- let a_addr = a_soc.local_addr()?;
- let b_addr = b_soc.local_addr()?;
-
- // test sending & receiving bytes
- {
- let mut a = UdpFramed::new(a_soc, ByteCodec);
- let mut b = UdpFramed::new(b_soc, ByteCodec);
-
- let msg = b"4567".to_vec();
-
- let send = a.send((msg.clone(), b_addr));
- let recv = b.next().map(|e| e.unwrap());
- let (_, received) = try_join(send, recv).await.unwrap();
-
- let (data, addr) = received;
- assert_eq!(msg, data);
- assert_eq!(a_addr, addr);
-
- a_soc = a.into_inner();
- b_soc = b.into_inner();
- }
-
- // test sending & receiving an empty message
- {
- let mut a = UdpFramed::new(a_soc, ByteCodec);
- let mut b = UdpFramed::new(b_soc, ByteCodec);
-
- let msg = b"".to_vec();
-
- let send = a.send((msg.clone(), b_addr));
- let recv = b.next().map(|e| e.unwrap());
- let (_, received) = try_join(send, recv).await.unwrap();
-
- let (data, addr) = received;
- assert_eq!(msg, data);
- assert_eq!(a_addr, addr);
- }
-
- Ok(())
-}