From 47658a6da5a6cf2d7db4727e61915709727cd632 Mon Sep 17 00:00:00 2001 From: Evan Cameron Date: Fri, 6 Nov 2020 10:59:15 -0500 Subject: util: resurrect UdpFramed (#3044) --- tokio-util/Cargo.toml | 3 ++- tokio-util/src/cfg.rs | 8 +++--- tokio-util/src/lib.rs | 7 +---- tokio-util/src/udp/frame.rs | 66 +++++++++++++++++++++++++++++++++------------ tokio-util/src/udp/mod.rs | 2 +- tokio-util/tests/udp.rs | 2 -- 6 files changed, 56 insertions(+), 32 deletions(-) diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index ba2f3a4f..c089b805 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -24,8 +24,9 @@ categories = ["asynchronous"] default = [] # Shorthand for enabling everything -full = ["codec", "compat", "io", "time"] +full = ["codec", "compat", "io", "time", "net"] +net = ["tokio/net"] compat = ["futures-io",] codec = ["tokio/stream"] time = ["tokio/time","slab"] diff --git a/tokio-util/src/cfg.rs b/tokio-util/src/cfg.rs index ee423d43..dad0c444 100644 --- a/tokio-util/src/cfg.rs +++ b/tokio-util/src/cfg.rs @@ -18,17 +18,15 @@ macro_rules! cfg_compat { } } -/* -macro_rules! cfg_udp { +macro_rules! cfg_net { ($($item:item)*) => { $( - #[cfg(all(feature = "udp", feature = "codec"))] - #[cfg_attr(docsrs, doc(cfg(all(feature = "udp", feature = "codec"))))] + #[cfg(all(feature = "net", feature = "codec"))] + #[cfg_attr(docsrs, doc(cfg(all(feature = "net", feature = "codec"))))] $item )* } } -*/ macro_rules! cfg_io { ($($item:item)*) => { diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index 09dd5a10..c4d80440 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -30,14 +30,9 @@ cfg_codec! { pub mod codec; } -/* -Disabled due to removal of poll_ functions on UdpSocket. - -See https://github.com/tokio-rs/tokio/issues/2830 -cfg_udp! { +cfg_net! { pub mod udp; } -*/ cfg_compat! { pub mod compat; diff --git a/tokio-util/src/udp/frame.rs b/tokio-util/src/udp/frame.rs index 560f35c9..0c711a86 100644 --- a/tokio-util/src/udp/frame.rs +++ b/tokio-util/src/udp/frame.rs @@ -1,17 +1,16 @@ use crate::codec::{Decoder, Encoder}; -use tokio::{net::UdpSocket, stream::Stream}; +use tokio::{io::ReadBuf, net::UdpSocket, stream::Stream}; use bytes::{BufMut, BytesMut}; use futures_core::ready; use futures_sink::Sink; -use std::io; -use std::mem::MaybeUninit; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::pin::Pin; use std::task::{Context, Poll}; +use std::{io, mem::MaybeUninit}; -/// A unified `Stream` and `Sink` interface to an underlying `UdpSocket`, using +/// A unified [`Stream`] and [`Sink`] interface to an underlying `UdpSocket`, using /// the `Encoder` and `Decoder` traits to encode and decode frames. /// /// Raw UDP sockets work with datagrams, but higher-level code usually wants to @@ -20,13 +19,17 @@ use std::task::{Context, Poll}; /// handle encoding and decoding of messages frames. Note that the incoming and /// outgoing frame types may be distinct. /// -/// This function returns a *single* object that is both `Stream` and `Sink`; +/// This function returns a *single* object that is both [`Stream`] and [`Sink`]; /// grouping this into a single object is often useful for layering things which /// require both read and write access to the underlying object. /// /// If you want to work more directly with the streams and sink, consider -/// calling `split` on the `UdpFramed` returned by this method, which will break +/// calling [`split`] on the `UdpFramed` returned by this method, which will break /// them into separate objects, allowing them to interact more easily. +/// +/// [`Stream`]: tokio::stream::Stream +/// [`Sink`]: futures_sink::Sink +/// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split #[must_use = "sinks do nothing unless polled"] #[cfg_attr(docsrs, doc(all(feature = "codec", feature = "udp")))] #[derive(Debug)] @@ -41,6 +44,9 @@ pub struct UdpFramed { current_addr: Option, } +const INITIAL_RD_CAPACITY: usize = 64 * 1024; +const INITIAL_WR_CAPACITY: usize = 8 * 1024; + impl Stream for UdpFramed { type Item = Result<(C::Item, SocketAddr), C::Error>; @@ -69,13 +75,14 @@ impl Stream for UdpFramed { let addr = unsafe { // Convert `&mut [MaybeUnit]` to `&mut [u8]` because we will be // writing to it via `poll_recv_from` and therefore initializing the memory. - let buf: &mut [u8] = - &mut *(pin.rd.bytes_mut() as *mut [MaybeUninit] as *mut [u8]); - - let res = ready!(Pin::new(&mut pin.socket).poll_recv_from(cx, buf)); - - let (n, addr) = res?; - pin.rd.advance_mut(n); + let buf = &mut *(pin.rd.bytes_mut() as *mut _ as *mut [MaybeUninit]); + let mut read = ReadBuf::uninit(buf); + let ptr = read.filled().as_ptr(); + let res = ready!(Pin::new(&mut pin.socket).poll_recv_from(cx, &mut read)); + + assert_eq!(ptr, read.filled().as_ptr()); + let addr = res?; + pin.rd.advance_mut(read.filled().len()); addr }; @@ -148,15 +155,12 @@ impl + Unpin> Sink<(I, SocketAddr)> for UdpFramed { } } -const INITIAL_RD_CAPACITY: usize = 64 * 1024; -const INITIAL_WR_CAPACITY: usize = 8 * 1024; - impl UdpFramed { /// Create a new `UdpFramed` backed by the given socket and codec. /// /// See struct level documentation for more details. pub fn new(socket: UdpSocket, codec: C) -> UdpFramed { - UdpFramed { + Self { socket, codec, out_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0)), @@ -195,4 +199,32 @@ impl UdpFramed { pub fn into_inner(self) -> UdpSocket { self.socket } + + /// Returns a reference to the underlying codec wrapped by + /// `Framed`. + /// + /// Note that care should be taken to not tamper with the underlying codec + /// as it may corrupt the stream of frames otherwise being worked with. + pub fn codec(&self) -> &C { + &self.codec + } + + /// Returns a mutable reference to the underlying codec wrapped by + /// `UdpFramed`. + /// + /// Note that care should be taken to not tamper with the underlying codec + /// as it may corrupt the stream of frames otherwise being worked with. + pub fn codec_mut(&mut self) -> &mut C { + &mut self.codec + } + + /// Returns a reference to the read buffer. + pub fn read_buffer(&self) -> &BytesMut { + &self.rd + } + + /// Returns a mutable reference to the read buffer. + pub fn read_buffer_mut(&mut self) -> &mut BytesMut { + &mut self.rd + } } diff --git a/tokio-util/src/udp/mod.rs b/tokio-util/src/udp/mod.rs index 7c4bb2b3..f88ea030 100644 --- a/tokio-util/src/udp/mod.rs +++ b/tokio-util/src/udp/mod.rs @@ -1,4 +1,4 @@ //! UDP framing mod frame; -pub use self::frame::UdpFramed; +pub use frame::UdpFramed; diff --git a/tokio-util/tests/udp.rs b/tokio-util/tests/udp.rs index 99763854..4820ac72 100644 --- a/tokio-util/tests/udp.rs +++ b/tokio-util/tests/udp.rs @@ -1,4 +1,3 @@ -/* #![warn(rust_2018_idioms)] use tokio::{net::UdpSocket, stream::StreamExt}; @@ -101,4 +100,3 @@ async fn send_framed_lines_codec() -> std::io::Result<()> { Ok(()) } -*/ -- cgit v1.2.3