summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEvan Cameron <cameron.evan@gmail.com>2020-11-06 10:59:15 -0500
committerGitHub <noreply@github.com>2020-11-06 16:59:15 +0100
commit47658a6da5a6cf2d7db4727e61915709727cd632 (patch)
treef39936d87559c4a4a195df948ee6cb86791d882f
parentd7e3fcb9ee472d40337776cd5f5ffd51bc50272c (diff)
util: resurrect UdpFramed (#3044)
-rw-r--r--tokio-util/Cargo.toml3
-rw-r--r--tokio-util/src/cfg.rs8
-rw-r--r--tokio-util/src/lib.rs7
-rw-r--r--tokio-util/src/udp/frame.rs66
-rw-r--r--tokio-util/src/udp/mod.rs2
-rw-r--r--tokio-util/tests/udp.rs2
6 files changed, 56 insertions, 32 deletions
diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml
index ba2f3a4f..c089b805 100644
--- a/tokio-util/Cargo.toml
+++ b/tokio-util/Cargo.toml
@@ -24,8 +24,9 @@ categories = ["asynchronous"]
default = []
# Shorthand for enabling everything
-full = ["codec", "compat", "io", "time"]
+full = ["codec", "compat", "io", "time", "net"]
+net = ["tokio/net"]
compat = ["futures-io",]
codec = ["tokio/stream"]
time = ["tokio/time","slab"]
diff --git a/tokio-util/src/cfg.rs b/tokio-util/src/cfg.rs
index ee423d43..dad0c444 100644
--- a/tokio-util/src/cfg.rs
+++ b/tokio-util/src/cfg.rs
@@ -18,17 +18,15 @@ macro_rules! cfg_compat {
}
}
-/*
-macro_rules! cfg_udp {
+macro_rules! cfg_net {
($($item:item)*) => {
$(
- #[cfg(all(feature = "udp", feature = "codec"))]
- #[cfg_attr(docsrs, doc(cfg(all(feature = "udp", feature = "codec"))))]
+ #[cfg(all(feature = "net", feature = "codec"))]
+ #[cfg_attr(docsrs, doc(cfg(all(feature = "net", feature = "codec"))))]
$item
)*
}
}
-*/
macro_rules! cfg_io {
($($item:item)*) => {
diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs
index 09dd5a10..c4d80440 100644
--- a/tokio-util/src/lib.rs
+++ b/tokio-util/src/lib.rs
@@ -30,14 +30,9 @@ cfg_codec! {
pub mod codec;
}
-/*
-Disabled due to removal of poll_ functions on UdpSocket.
-
-See https://github.com/tokio-rs/tokio/issues/2830
-cfg_udp! {
+cfg_net! {
pub mod udp;
}
-*/
cfg_compat! {
pub mod compat;
diff --git a/tokio-util/src/udp/frame.rs b/tokio-util/src/udp/frame.rs
index 560f35c9..0c711a86 100644
--- a/tokio-util/src/udp/frame.rs
+++ b/tokio-util/src/udp/frame.rs
@@ -1,17 +1,16 @@
use crate::codec::{Decoder, Encoder};
-use tokio::{net::UdpSocket, stream::Stream};
+use tokio::{io::ReadBuf, net::UdpSocket, stream::Stream};
use bytes::{BufMut, BytesMut};
use futures_core::ready;
use futures_sink::Sink;
-use std::io;
-use std::mem::MaybeUninit;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::pin::Pin;
use std::task::{Context, Poll};
+use std::{io, mem::MaybeUninit};
-/// A unified `Stream` and `Sink` interface to an underlying `UdpSocket`, using
+/// A unified [`Stream`] and [`Sink`] interface to an underlying `UdpSocket`, using
/// the `Encoder` and `Decoder` traits to encode and decode frames.
///
/// Raw UDP sockets work with datagrams, but higher-level code usually wants to
@@ -20,13 +19,17 @@ use std::task::{Context, Poll};
/// handle encoding and decoding of messages frames. Note that the incoming and
/// outgoing frame types may be distinct.
///
-/// This function returns a *single* object that is both `Stream` and `Sink`;
+/// This function returns a *single* object that is both [`Stream`] and [`Sink`];
/// grouping this into a single object is often useful for layering things which
/// require both read and write access to the underlying object.
///
/// If you want to work more directly with the streams and sink, consider
-/// calling `split` on the `UdpFramed` returned by this method, which will break
+/// calling [`split`] on the `UdpFramed` returned by this method, which will break
/// them into separate objects, allowing them to interact more easily.
+///
+/// [`Stream`]: tokio::stream::Stream
+/// [`Sink`]: futures_sink::Sink
+/// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
#[must_use = "sinks do nothing unless polled"]
#[cfg_attr(docsrs, doc(all(feature = "codec", feature = "udp")))]
#[derive(Debug)]
@@ -41,6 +44,9 @@ pub struct UdpFramed<C> {
current_addr: Option<SocketAddr>,
}
+const INITIAL_RD_CAPACITY: usize = 64 * 1024;
+const INITIAL_WR_CAPACITY: usize = 8 * 1024;
+
impl<C: Decoder + Unpin> Stream for UdpFramed<C> {
type Item = Result<(C::Item, SocketAddr), C::Error>;
@@ -69,13 +75,14 @@ impl<C: Decoder + Unpin> Stream for UdpFramed<C> {
let addr = unsafe {
// Convert `&mut [MaybeUnit<u8>]` to `&mut [u8]` because we will be
// writing to it via `poll_recv_from` and therefore initializing the memory.
- let buf: &mut [u8] =
- &mut *(pin.rd.bytes_mut() as *mut [MaybeUninit<u8>] as *mut [u8]);
-
- let res = ready!(Pin::new(&mut pin.socket).poll_recv_from(cx, buf));
-
- let (n, addr) = res?;
- pin.rd.advance_mut(n);
+ let buf = &mut *(pin.rd.bytes_mut() as *mut _ as *mut [MaybeUninit<u8>]);
+ let mut read = ReadBuf::uninit(buf);
+ let ptr = read.filled().as_ptr();
+ let res = ready!(Pin::new(&mut pin.socket).poll_recv_from(cx, &mut read));
+
+ assert_eq!(ptr, read.filled().as_ptr());
+ let addr = res?;
+ pin.rd.advance_mut(read.filled().len());
addr
};
@@ -148,15 +155,12 @@ impl<I, C: Encoder<I> + Unpin> Sink<(I, SocketAddr)> for UdpFramed<C> {
}
}
-const INITIAL_RD_CAPACITY: usize = 64 * 1024;
-const INITIAL_WR_CAPACITY: usize = 8 * 1024;
-
impl<C> UdpFramed<C> {
/// Create a new `UdpFramed` backed by the given socket and codec.
///
/// See struct level documentation for more details.
pub fn new(socket: UdpSocket, codec: C) -> UdpFramed<C> {
- UdpFramed {
+ Self {
socket,
codec,
out_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0)),
@@ -195,4 +199,32 @@ impl<C> UdpFramed<C> {
pub fn into_inner(self) -> UdpSocket {
self.socket
}
+
+ /// Returns a reference to the underlying codec wrapped by
+ /// `Framed`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying codec
+ /// as it may corrupt the stream of frames otherwise being worked with.
+ pub fn codec(&self) -> &C {
+ &self.codec
+ }
+
+ /// Returns a mutable reference to the underlying codec wrapped by
+ /// `UdpFramed`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying codec
+ /// as it may corrupt the stream of frames otherwise being worked with.
+ pub fn codec_mut(&mut self) -> &mut C {
+ &mut self.codec
+ }
+
+ /// Returns a reference to the read buffer.
+ pub fn read_buffer(&self) -> &BytesMut {
+ &self.rd
+ }
+
+ /// Returns a mutable reference to the read buffer.
+ pub fn read_buffer_mut(&mut self) -> &mut BytesMut {
+ &mut self.rd
+ }
}
diff --git a/tokio-util/src/udp/mod.rs b/tokio-util/src/udp/mod.rs
index 7c4bb2b3..f88ea030 100644
--- a/tokio-util/src/udp/mod.rs
+++ b/tokio-util/src/udp/mod.rs
@@ -1,4 +1,4 @@
//! UDP framing
mod frame;
-pub use self::frame::UdpFramed;
+pub use frame::UdpFramed;
diff --git a/tokio-util/tests/udp.rs b/tokio-util/tests/udp.rs
index 99763854..4820ac72 100644
--- a/tokio-util/tests/udp.rs
+++ b/tokio-util/tests/udp.rs
@@ -1,4 +1,3 @@
-/*
#![warn(rust_2018_idioms)]
use tokio::{net::UdpSocket, stream::StreamExt};
@@ -101,4 +100,3 @@ async fn send_framed_lines_codec() -> std::io::Result<()> {
Ok(())
}
-*/