diff options
author | Carl Lerche <me@carllerche.com> | 2019-10-22 10:13:49 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-10-22 10:13:49 -0700 |
commit | cfc15617a5247ea780c32c85b7134b88b6de5845 (patch) | |
tree | ef0a46c61c51505a60f386c9760acac9d1f9b7b1 /tokio-net | |
parent | b8cee1a60ad99ef28ec494ae4230e2ef4399fcf9 (diff) |
codec: move into tokio-util (#1675)
Related to #1318, Tokio APIs that are "less stable" are moved into a new
`tokio-util` crate. This crate will mirror `tokio` and provide
additional APIs that may require a greater rate of breaking changes.
As examples require `tokio-util`, they are moved into a separate
crate (`examples`). This has the added advantage of being able to avoid
example only dependencies in the `tokio` crate.
Diffstat (limited to 'tokio-net')
-rw-r--r-- | tokio-net/Cargo.toml | 1 | ||||
-rw-r--r-- | tokio-net/src/process/mod.rs | 9 | ||||
-rw-r--r-- | tokio-net/src/udp/frame.rs | 189 | ||||
-rw-r--r-- | tokio-net/src/udp/mod.rs | 2 | ||||
-rw-r--r-- | tokio-net/src/udp/socket.rs | 29 | ||||
-rw-r--r-- | tokio-net/src/udp/split.rs | 8 | ||||
-rw-r--r-- | tokio-net/src/uds/frame.rs | 175 | ||||
-rw-r--r-- | tokio-net/src/uds/mod.rs | 17 | ||||
-rw-r--r-- | tokio-net/tests/process_stdio.rs | 13 | ||||
-rw-r--r-- | tokio-net/tests/udp.rs | 76 |
10 files changed, 39 insertions, 480 deletions
diff --git a/tokio-net/Cargo.toml b/tokio-net/Cargo.toml index d7df3397..f19fd759 100644 --- a/tokio-net/Cargo.toml +++ b/tokio-net/Cargo.toml @@ -62,7 +62,6 @@ uds = [ log = ["tracing/log"] [dependencies] -tokio-codec = { version = "=0.2.0-alpha.6", path = "../tokio-codec" } tokio-executor = { version = "=0.2.0-alpha.6", features = ["blocking"], path = "../tokio-executor" } tokio-io = { version = "=0.2.0-alpha.6", path = "../tokio-io" } tokio-sync = { version = "=0.2.0-alpha.6", path = "../tokio-sync" } diff --git a/tokio-net/src/process/mod.rs b/tokio-net/src/process/mod.rs index 140daa5d..fbe905ad 100644 --- a/tokio-net/src/process/mod.rs +++ b/tokio-net/src/process/mod.rs @@ -55,10 +55,11 @@ //! We can also read input line by line. //! //! ```no_run +//! use tokio::io::{BufReader, AsyncBufReadExt}; +//! use tokio::process::Command; +//! //! use futures_util::stream::StreamExt; -//! use std::process::{Stdio}; -//! use tokio::codec::{FramedRead, LinesCodec}; -//! use tokio_net::process::Command; +//! use std::process::Stdio; //! //! #[tokio::main] //! async fn main() -> Result<(), Box<dyn std::error::Error>> { @@ -77,7 +78,7 @@ //! let stdout = child.stdout().take() //! .expect("child did not have a handle to stdout"); //! -//! let mut reader = FramedRead::new(stdout, LinesCodec::new()); +//! let mut reader = BufReader::new(stdout).lines(); //! //! // Ensure the child process is spawned in the runtime so it can //! // make progress on its own while we await for any output. diff --git a/tokio-net/src/udp/frame.rs b/tokio-net/src/udp/frame.rs deleted file mode 100644 index d694e2fe..00000000 --- a/tokio-net/src/udp/frame.rs +++ /dev/null @@ -1,189 +0,0 @@ -use super::UdpSocket; - -use tokio_codec::{Decoder, Encoder}; - -use bytes::{BufMut, BytesMut}; -use core::task::{Context, Poll}; -use futures_core::{ready, Stream}; -use futures_sink::Sink; -use std::io; -use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; -use std::pin::Pin; - -/// A unified `Stream` and `Sink` interface to an underlying `UdpSocket`, using -/// the `Encoder` and `Decoder` traits to encode and decode frames. -/// -/// Raw UDP sockets work with datagrams, but higher-level code usually wants to -/// batch these into meaningful chunks, called "frames". This method layers -/// framing on top of this socket by using the `Encoder` and `Decoder` traits to -/// handle encoding and decoding of messages frames. Note that the incoming and -/// outgoing frame types may be distinct. -/// -/// This function returns a *single* object that is both `Stream` and `Sink`; -/// grouping this into a single object is often useful for layering things which -/// require both read and write access to the underlying object. -/// -/// If you want to work more directly with the streams and sink, consider -/// calling `split` on the `UdpFramed` returned by this method, which will break -/// them into separate objects, allowing them to interact more easily. -#[must_use = "sinks do nothing unless polled"] -#[derive(Debug)] -pub struct UdpFramed<C> { - socket: UdpSocket, - codec: C, - rd: BytesMut, - wr: BytesMut, - out_addr: SocketAddr, - flushed: bool, -} - -impl<C: Decoder + Unpin> Stream for UdpFramed<C> { - type Item = Result<(C::Item, SocketAddr), C::Error>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - let pin = self.get_mut(); - - pin.rd.reserve(INITIAL_RD_CAPACITY); - - let (_n, addr) = unsafe { - // Read into the buffer without having to initialize the memory. - let res = ready!(Pin::new(&mut pin.socket).poll_recv_from_priv(cx, pin.rd.bytes_mut())); - let (n, addr) = res?; - pin.rd.advance_mut(n); - (n, addr) - }; - - let span = trace_span!("decoding", from.addr = %addr, dgram.length = _n); - let _e = span.enter(); - trace!("trying to decode a frame..."); - - let frame_res = pin.codec.decode(&mut pin.rd); - pin.rd.clear(); - let frame = frame_res?; - let result = frame.map(|frame| Ok((frame, addr))); // frame -> (frame, addr) - - trace!("frame decoded from buffer"); - Poll::Ready(result) - } -} - -impl<C: Encoder + Unpin> Sink<(C::Item, SocketAddr)> for UdpFramed<C> { - type Error = C::Error; - - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - if !self.flushed { - match self.poll_flush(cx)? { - Poll::Ready(()) => {} - Poll::Pending => return Poll::Pending, - } - } - - Poll::Ready(Ok(())) - } - - fn start_send(self: Pin<&mut Self>, item: (C::Item, SocketAddr)) -> Result<(), Self::Error> { - let (frame, out_addr) = item; - - let span = trace_span!("sending", to.addr = %out_addr); - let _e = span.enter(); - trace!("encoding frame..."); - - let pin = self.get_mut(); - - pin.codec.encode(frame, &mut pin.wr)?; - pin.out_addr = out_addr; - pin.flushed = false; - trace!(message = "frame encoded", frame.length = pin.wr.len()); - - Ok(()) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - if self.flushed { - return Poll::Ready(Ok(())); - } - - let Self { - ref mut socket, - ref mut out_addr, - ref mut wr, - .. - } = *self; - - let span = trace_span!("flushing", to.addr = %out_addr, frame.length = wr.len()); - let _e = span.enter(); - trace!("flushing frame..."); - - let n = ready!(socket.poll_send_to_priv(cx, &wr, &out_addr))?; - - let wrote_all = n == self.wr.len(); - self.wr.clear(); - self.flushed = true; - - trace!(written.length = n, written.complete = wrote_all); - - let res = if wrote_all { - Ok(()) - } else { - Err(io::Error::new( - io::ErrorKind::Other, - "failed to write entire datagram to socket", - ) - .into()) - }; - - Poll::Ready(res) - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - ready!(self.poll_flush(cx))?; - Poll::Ready(Ok(())) - } -} - -const INITIAL_RD_CAPACITY: usize = 64 * 1024; -const INITIAL_WR_CAPACITY: usize = 8 * 1024; - -impl<C> UdpFramed<C> { - /// Create a new `UdpFramed` backed by the given socket and codec. - /// - /// See struct level documentation for more details. - pub fn new(socket: UdpSocket, codec: C) -> UdpFramed<C> { - UdpFramed { - socket, - codec, - out_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0)), - rd: BytesMut::with_capacity(INITIAL_RD_CAPACITY), - wr: BytesMut::with_capacity(INITIAL_WR_CAPACITY), - flushed: true, - } - } - - /// Returns a reference to the underlying I/O stream wrapped by `Framed`. - /// - /// # Note - /// - /// Care should be taken to not tamper with the underlying stream of data - /// coming in as it may corrupt the stream of frames otherwise being worked - /// with. - pub fn get_ref(&self) -> &UdpSocket { - &self.socket - } - - /// Returns a mutable reference to the underlying I/O stream wrapped by - /// `Framed`. - /// - /// # Note - /// - /// Care should be taken to not tamper with the underlying stream of data - /// coming in as it may corrupt the stream of frames otherwise being worked - /// with. - pub fn get_mut(&mut self) -> &mut UdpSocket { - &mut self.socket - } - - /// Consumes the `Framed`, returning its underlying I/O stream. - pub fn into_inner(self) -> UdpSocket { - self.socket - } -} diff --git a/tokio-net/src/udp/mod.rs b/tokio-net/src/udp/mod.rs index e5c06585..45656773 100644 --- a/tokio-net/src/udp/mod.rs +++ b/tokio-net/src/udp/mod.rs @@ -7,9 +7,7 @@ //! //! [`UdpSocket`]: struct.UdpSocket -mod frame; mod socket; pub mod split; -pub use self::frame::UdpFramed; pub use self::socket::UdpSocket; diff --git a/tokio-net/src/udp/socket.rs b/tokio-net/src/udp/socket.rs index c59b564b..90f212a2 100644 --- a/tokio-net/src/udp/socket.rs +++ b/tokio-net/src/udp/socket.rs @@ -108,7 +108,7 @@ impl UdpSocket { /// /// [`connect`]: #method.connect pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> { - poll_fn(|cx| self.poll_send_priv(cx, buf)).await + poll_fn(|cx| self.poll_send(cx, buf)).await } // Poll IO functions that takes `&self` are provided for the split API. @@ -121,11 +121,8 @@ impl UdpSocket { // While violating this requirement is "safe" from a Rust memory model point // of view, it will result in unexpected behavior in the form of lost // notifications and tasks hanging. - pub(crate) fn poll_send_priv( - &self, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll<io::Result<usize>> { + #[doc(hidden)] + pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> { ready!(self.io.poll_write_ready(cx))?; match self.io.get_ref().send(buf) { @@ -150,14 +147,11 @@ impl UdpSocket { /// /// [`connect`]: #method.connect pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> { - poll_fn(|cx| self.poll_recv_priv(cx, buf)).await + poll_fn(|cx| self.poll_recv(cx, buf)).await } - pub(crate) fn poll_recv_priv( - &self, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll<io::Result<usize>> { + #[doc(hidden)] + pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> { ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; match self.io.get_ref().recv(buf) { @@ -178,7 +172,7 @@ impl UdpSocket { let mut addrs = target.to_socket_addrs().await?; match addrs.next() { - Some(target) => poll_fn(|cx| self.poll_send_to_priv(cx, buf, &target)).await, + Some(target) => poll_fn(|cx| self.poll_send_to(cx, buf, &target)).await, None => Err(io::Error::new( io::ErrorKind::InvalidInput, "no addresses to send data to", @@ -186,7 +180,9 @@ impl UdpSocket { } } - pub(crate) fn poll_send_to_priv( + // TODO: Public or not? + #[doc(hidden)] + pub fn poll_send_to( &self, cx: &mut Context<'_>, buf: &[u8], @@ -210,10 +206,11 @@ impl UdpSocket { /// to hold the message bytes. If a message is too long to fit in the supplied /// buffer, excess bytes may be discarded. pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - poll_fn(|cx| self.poll_recv_from_priv(cx, buf)).await + poll_fn(|cx| self.poll_recv_from(cx, buf)).await } - pub(crate) fn poll_recv_from_priv( + #[doc(hidden)] + pub fn poll_recv_from( &self, cx: &mut Context<'_>, buf: &mut [u8], diff --git a/tokio-net/src/udp/split.rs b/tokio-net/src/udp/split.rs index e58f9276..ad8ce061 100644 --- a/tokio-net/src/udp/split.rs +++ b/tokio-net/src/udp/split.rs @@ -86,7 +86,7 @@ impl UdpSocketRecvHalf { /// to hold the message bytes. If a message is too long to fit in the supplied /// buffer, excess bytes may be discarded. pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - poll_fn(|cx| self.0.poll_recv_from_priv(cx, buf)).await + poll_fn(|cx| self.0.poll_recv_from(cx, buf)).await } /// Returns a future that receives a single datagram message on the socket from @@ -102,7 +102,7 @@ impl UdpSocketRecvHalf { /// /// [`connect`]: super::UdpSocket::connect pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> { - poll_fn(|cx| self.0.poll_recv_priv(cx, buf)).await + poll_fn(|cx| self.0.poll_recv(cx, buf)).await } } @@ -120,7 +120,7 @@ impl UdpSocketSendHalf { /// The future will resolve to an error if the IP version of the socket does /// not match that of `target`. pub async fn send_to(&mut self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> { - poll_fn(|cx| self.0.poll_send_to_priv(cx, buf, target)).await + poll_fn(|cx| self.0.poll_send_to(cx, buf, target)).await } /// Returns a future that sends data on the socket to the remote address to which it is connected. @@ -131,7 +131,7 @@ impl UdpSocketSendHalf { /// /// [`connect`]: super::UdpSocket::connect pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> { - poll_fn(|cx| self.0.poll_send_priv(cx, buf)).await + poll_fn(|cx| self.0.poll_send(cx, buf)).await } } diff --git a/tokio-net/src/uds/frame.rs b/tokio-net/src/uds/frame.rs deleted file mode 100644 index 584da8ae..00000000 --- a/tokio-net/src/uds/frame.rs +++ /dev/null @@ -1,175 +0,0 @@ -use super::UnixDatagram; -use bytes::{BufMut, BytesMut}; -use futures::{try_ready, Async, AsyncSink, Poll, Sink, StartSend, Stream}; -use std::io; -use std::os::unix::net::SocketAddr; -use std::path::Path; -use tokio_codec::{Decoder, Encoder}; - -/// A unified `Stream` and `Sink` interface to an underlying `UnixDatagram`, using -/// the `Encoder` and `Decoder` traits to encode and decode frames. -/// -/// Unix datagram sockets work with datagrams, but higher-level code may wants to -/// batch these into meaningful chunks, called "frames". This method layers -/// framing on top of this socket by using the `Encoder` and `Decoder` traits to -/// handle encoding and decoding of messages frames. Note that the incoming and -/// outgoing frame types may be distinct. -/// -/// This function returns a *single* object that is both `Stream` and `Sink`; -/// grouping this into a single object is often useful for layering things which -/// require both read and write access to the underlying object. -/// -/// If you want to work more directly with the streams and sink, consider -/// calling `split` on the `UnixDatagramFramed` returned by this method, which will break -/// them into separate objects, allowing them to interact more easily. -#[must_use = "sinks do nothing unless polled"] -#[derive(Debug)] -pub struct UnixDatagramFramed<A, C> { - socket: UnixDatagram, - codec: C, - rd: BytesMut, - wr: BytesMut, - out_addr: Option<A>, - flushed: bool, -} - -impl<A, C: Decoder> Stream for UnixDatagramFramed<A, C> { - type Item = (C::Item, SocketAddr); - type Error = C::Error; - - fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { - self.rd.reserve(INITIAL_RD_CAPACITY); - - let (_n, addr) = unsafe { - let (n, addr) = try_ready!(self.socket.poll_recv_from(self.rd.bytes_mut())); - self.rd.advance_mut(n); - (n, addr) - }; - - let span = trace_span!("decoding", from.addr = %addr, dgram.length = _n); - let _e = span.enter(); - trace!("trying to decode a frame..."); - - let frame_res = self.codec.decode(&mut self.rd); - self.rd.clear(); - let frame = frame_res?; - let result = frame.map(|frame| (frame, addr)); - trace!("frame decoded from buffer"); - Ok(Async::Ready(result)) - } -} - -impl<A: AsRef<Path>, C: Encoder> Sink for UnixDatagramFramed<A, C> { - type SinkItem = (C::Item, A); - type SinkError = C::Error; - - fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> { - let span = trace_span!("sending", to.addr = %item.0, flushed = self.flushed); - let _e = span.enter(); - - trace!("sending frame..."); - - if !self.flushed { - match self.poll_complete()? { - Async::Ready(()) => {} - Async::NotReady => return Ok(AsyncSink::NotReady(item)), - } - } - - let (frame, out_addr) = item; - self.codec.encode(frame, &mut self.wr)?; - self.out_addr = Some(out_addr); - self.flushed = false; - trace!(message = "frame encoded", frame.length = pin.wr.len()); - - Ok(AsyncSink::Ready) - } - - fn poll_complete(&mut self) -> Poll<(), C::Error> { - if self.flushed { - return Ok(Async::Ready(())); - } - - let span = trace_span!("flushing", to.addr = %self.out_addr); - let _e = span.enter(); - - let n = { - let out_path = match self.out_addr { - Some(ref out_path) => out_path.as_ref(), - None => { - return Err(io::Error::new( - io::ErrorKind::Other, - "internal error: addr not available while data not flushed", - ) - .into()); - } - }; - - trace!(message = "flushing frame", frame.length = self.wr.len()); - try_ready!(self.socket.poll_send_to(&self.wr, out_path)) - }; - - let wrote_all = n == self.wr.len(); - self.wr.clear(); - self.flushed = true; - - trace!(written.length = n, written.complete = wrote_all); - - if wrote_all { - self.out_addr = None; - Ok(Async::Ready(())) - } else { - Err(io::Error::new( - io::ErrorKind::Other, - "failed to write entire datagram to socket", - ) - .into()) - } - } - - fn close(&mut self) -> Poll<(), C::Error> { - self.poll_complete() - } -} - -const INITIAL_RD_CAPACITY: usize = 64 * 1024; -const INITIAL_WR_CAPACITY: usize = 8 * 1024; - -impl<A, C> UnixDatagramFramed<A, C> { - /// Create a new `UnixDatagramFramed` backed by the given socket and codec. - /// - /// See struct level documentation for more details. - pub fn new(socket: UnixDatagram, codec: C) -> UnixDatagramFramed<A, C> { - UnixDatagramFramed { - socket: socket, - codec: codec, - out_addr: None, - rd: BytesMut::with_capacity(INITIAL_RD_CAPACITY), - wr: BytesMut::with_capacity(INITIAL_WR_CAPACITY), - flushed: true, - } - } - - /// Returns a reference to the underlying I/O stream wrapped by `Framed`. - /// - /// # Note - /// - /// Care should be taken to not tamper with the underlying stream of data - /// coming in as it may corrupt the stream of frames otherwise being worked - /// with. - pub fn get_ref(&self) -> &UnixDatagram { - &self.socket - } - - /// Returns a mutable reference to the underlying I/O stream wrapped by - /// `Framed`. - /// - /// # Note - /// - /// Care should be taken to not tamper with the underlying stream of data - /// coming in as it may corrupt the stream of frames otherwise being worked - /// with. - pub fn get_mut(&mut self) -> &mut UnixDatagram { - &mut self.socket - } -} diff --git a/tokio-net/src/uds/mod.rs b/tokio-net/src/uds/mod.rs index 3b684992..4447ca5c 100644 --- a/tokio-net/src/uds/mod.rs +++ b/tokio-net/src/uds/mod.rs @@ -3,16 +3,19 @@ //! This crate provides APIs for using Unix Domain Sockets with Tokio. mod datagram; -// mod frame; -mod incoming; -mod listener; -pub mod split; -mod stream; -mod ucred; - pub use self::datagram::UnixDatagram; + +mod incoming; #[cfg(feature = "async-traits")] pub use self::incoming::Incoming; + +mod listener; pub use self::listener::UnixListener; + +pub mod split; + +mod stream; pub use self::stream::UnixStream; + +mod ucred; pub use self::ucred::UCred; diff --git a/tokio-net/tests/process_stdio.rs b/tokio-net/tests/process_stdio.rs index b7f7c2af..e98162d4 100644 --- a/tokio-net/tests/process_stdio.rs +++ b/tokio-net/tests/process_stdio.rs @@ -4,16 +4,15 @@ #[macro_use] extern crate tracing; -use std::env; -use std::io; -use std::process::{ExitStatus, Stdio}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio_net::process::{Child, Command}; use futures_util::future; use futures_util::future::FutureExt; use futures_util::stream::StreamExt; -use tokio::codec::{FramedRead, LinesCodec}; -use tokio::io::AsyncWriteExt; -use tokio_net::process::{Child, Command}; +use std::env; +use std::io; +use std::process::{ExitStatus, Stdio}; mod support; use support::*; @@ -51,7 +50,7 @@ async fn feed_cat(mut cat: Child, n: usize) -> io::Result<ExitStatus> { }; let read = async { - let mut reader = FramedRead::new(stdout, LinesCodec::new()); + let mut reader = BufReader::new(stdout).lines(); let mut num_lines = 0; // Try to read `n + 1` lines, ensuring the last one is empty diff --git a/tokio-net/tests/udp.rs b/tokio-net/tests/udp.rs index fb8f068a..124d1387 100644 --- a/tokio-net/tests/udp.rs +++ b/tokio-net/tests/udp.rs @@ -1,11 +1,6 @@ #![warn(rust_2018_idioms)] -use tokio_codec::{Decoder, Encoder}; -use tokio_net::udp::{UdpFramed, UdpSocket}; - -use bytes::{BufMut, BytesMut}; -use futures_util::{future::FutureExt, sink::SinkExt, stream::StreamExt, try_future::try_join}; -use std::io; +use tokio_net::udp::UdpSocket; #[tokio::test] async fn send_recv() -> std::io::Result<()> { @@ -75,72 +70,3 @@ async fn reunite_error() -> std::io::Result<()> { assert!(s.reunite(r1).is_err()); Ok(()) } - -pub struct ByteCodec; - -impl Decoder for ByteCodec { - type Item = Vec<u8>; - type Error = io::Error; - - fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Vec<u8>>, io::Error> { - let len = buf.len(); - Ok(Some(buf.split_to(len).to_vec())) - } -} - -impl Encoder for ByteCodec { - type Item = Vec<u8>; - type Error = io::Error; - - fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> Result<(), io::Error> { - buf.reserve(data.len()); - buf.put(data); - Ok(()) - } -} - -#[tokio::test] -async fn send_framed() -> std::io::Result<()> { - let mut a_soc = UdpSocket::bind("127.0.0.1:0").await?; - let mut b_soc = UdpSocket::bind("127.0.0.1:0").await?; - - let a_addr = a_soc.local_addr()?; - let b_addr = b_soc.local_addr()?; - - // test sending & receiving bytes - { - let mut a = UdpFramed::new(a_soc, ByteCodec); - let mut b = UdpFramed::new(b_soc, ByteCodec); - - let msg = b"4567".to_vec(); - - let send = a.send((msg.clone(), b_addr)); - let recv = b.next().map(|e| e.unwrap()); - let (_, received) = try_join(send, recv).await.unwrap(); - - let (data, addr) = received; - assert_eq!(msg, data); - assert_eq!(a_addr, addr); - - a_soc = a.into_inner(); - b_soc = b.into_inner(); - } - - // test sending & receiving an empty message - { - let mut a = UdpFramed::new(a_soc, ByteCodec); - let mut b = UdpFramed::new(b_soc, ByteCodec); - - let msg = b"".to_vec(); - - let send = a.send((msg.clone(), b_addr)); - let recv = b.next().map(|e| e.unwrap()); - let (_, received) = try_join(send, recv).await.unwrap(); - - let (data, addr) = received; - assert_eq!(msg, data); - assert_eq!(a_addr, addr); - } - - Ok(()) -} |