diff options
-rw-r--r-- | examples/Cargo.toml | 2 | ||||
-rw-r--r-- | examples/chat.rs | 6 | ||||
-rw-r--r-- | examples/tinydb.rs | 2 | ||||
-rw-r--r-- | examples/tinyhttp.rs | 3 | ||||
-rw-r--r-- | tokio-tls/Cargo.toml | 2 | ||||
-rw-r--r-- | tokio-util/CHANGELOG.md | 11 | ||||
-rw-r--r-- | tokio-util/Cargo.toml | 4 | ||||
-rw-r--r-- | tokio-util/src/codec/bytes_codec.rs | 3 | ||||
-rw-r--r-- | tokio-util/src/codec/decoder.rs | 3 | ||||
-rw-r--r-- | tokio-util/src/codec/encoder.rs | 7 | ||||
-rw-r--r-- | tokio-util/src/codec/framed.rs | 15 | ||||
-rw-r--r-- | tokio-util/src/codec/framed_write.rs | 7 | ||||
-rw-r--r-- | tokio-util/src/codec/length_delimited.rs | 5 | ||||
-rw-r--r-- | tokio-util/src/codec/lines_codec.rs | 9 | ||||
-rw-r--r-- | tokio-util/src/udp/frame.rs | 4 | ||||
-rw-r--r-- | tokio-util/tests/codecs.rs | 4 | ||||
-rw-r--r-- | tokio-util/tests/framed.rs | 3 | ||||
-rw-r--r-- | tokio-util/tests/framed_write.rs | 3 | ||||
-rw-r--r-- | tokio-util/tests/udp.rs | 21 |
19 files changed, 59 insertions, 55 deletions
diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 423ffe02..a25f526f 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -6,7 +6,7 @@ edition = "2018" [dev-dependencies] tokio = { version = "0.2.0", path = "../tokio", features = ["full"] } -tokio-util = { version = "0.2.0", path = "../tokio-util", features = ["full"] } +tokio-util = { version = "0.3.0", path = "../tokio-util", features = ["full"] } bytes = "0.5" futures = "0.3.0" http = "0.2" diff --git a/examples/chat.rs b/examples/chat.rs index fac43cbf..b3fb727a 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -193,9 +193,7 @@ async fn process( let mut lines = Framed::new(stream, LinesCodec::new()); // Send a prompt to the client to enter their username. - lines - .send(String::from("Please enter your username:")) - .await?; + lines.send("Please enter your username:").await?; // Read the first line from the `LineCodec` stream to get the username. let username = match lines.next().await { @@ -232,7 +230,7 @@ async fn process( // A message was received from a peer. Send it to the // current user. Ok(Message::Received(msg)) => { - peer.lines.send(msg).await?; + peer.lines.send(&msg).await?; } Err(e) => { println!( diff --git a/examples/tinydb.rs b/examples/tinydb.rs index 7c71dedf..c1af2541 100644 --- a/examples/tinydb.rs +++ b/examples/tinydb.rs @@ -130,7 +130,7 @@ async fn main() -> Result<(), Box<dyn Error>> { let response = response.serialize(); - if let Err(e) = lines.send(response).await { + if let Err(e) = lines.send(response.as_str()).await { println!("error on sending response; error = {:?}", e); } } diff --git a/examples/tinyhttp.rs b/examples/tinyhttp.rs index 9ac2806e..732da0d6 100644 --- a/examples/tinyhttp.rs +++ b/examples/tinyhttp.rs @@ -96,8 +96,7 @@ struct Http; /// Implementation of encoding an HTTP response into a `BytesMut`, basically /// just writing out an HTTP/1.1 response. -impl Encoder for Http { - type Item = Response<String>; +impl Encoder<Response<String>> for Http { type Error = io::Error; fn encode(&mut self, item: Response<String>, dst: &mut BytesMut) -> io::Result<()> { diff --git a/tokio-tls/Cargo.toml b/tokio-tls/Cargo.toml index d5a5c6de..a9877926 100644 --- a/tokio-tls/Cargo.toml +++ b/tokio-tls/Cargo.toml @@ -30,7 +30,7 @@ tokio = { version = "0.2.0", path = "../tokio" } [dev-dependencies] tokio = { version = "0.2.0", path = "../tokio", features = ["macros", "stream", "rt-core", "io-util", "net"] } -tokio-util = { version = "0.2.0", path = "../tokio-util", features = ["full"] } +tokio-util = { version = "0.3.0", path = "../tokio-util", features = ["full"] } cfg-if = "0.1" env_logger = { version = "0.6", default-features = false } diff --git a/tokio-util/CHANGELOG.md b/tokio-util/CHANGELOG.md index 48022e34..aaabd106 100644 --- a/tokio-util/CHANGELOG.md +++ b/tokio-util/CHANGELOG.md @@ -1,3 +1,14 @@ +# 0.3.0 (February 28, 2020) + +Breaking changes: + +- Change codec::Encoder trait to take a generic Item parameter (#1746), which allows + codec writers to pass references into `Framed` and `FramedWrite` types. + +Other additions: + +- Add futures-io/tokio::io compatibility layer (#2117) + # 0.2.0 (November 26, 2019) - Initial release diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index e029de2f..9a537ba1 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -7,7 +7,7 @@ name = "tokio-util" # - Cargo.toml # - Update CHANGELOG.md. # - Create "v0.2.x" git tag. -version = "0.2.0" +version = "0.3.0" edition = "2018" authors = ["Tokio Contributors <team@tokio.rs>"] license = "MIT" @@ -38,7 +38,7 @@ futures-core = "0.3.0" futures-sink = "0.3.0" futures-io = { version = "0.3.0", optional = true } log = "0.4" -pin-project-lite = "0.1.1" +pin-project-lite = "0.1.4" [dev-dependencies] tokio = { version = "0.2.0", path = "../tokio", features = ["full"] } diff --git a/tokio-util/src/codec/bytes_codec.rs b/tokio-util/src/codec/bytes_codec.rs index 46b31aa3..a5e73749 100644 --- a/tokio-util/src/codec/bytes_codec.rs +++ b/tokio-util/src/codec/bytes_codec.rs @@ -65,8 +65,7 @@ impl Decoder for BytesCodec { } } -impl Encoder for BytesCodec { - type Item = Bytes; +impl Encoder<Bytes> for BytesCodec { type Error = io::Error; fn encode(&mut self, data: Bytes, buf: &mut BytesMut) -> Result<(), io::Error> { diff --git a/tokio-util/src/codec/decoder.rs b/tokio-util/src/codec/decoder.rs index 5f53ebc3..84d27fbf 100644 --- a/tokio-util/src/codec/decoder.rs +++ b/tokio-util/src/codec/decoder.rs @@ -1,4 +1,3 @@ -use crate::codec::encoder::Encoder; use crate::codec::Framed; use tokio::io::{AsyncRead, AsyncWrite}; @@ -159,7 +158,7 @@ pub trait Decoder { /// [`Framed`]: crate::codec::Framed fn framed<T: AsyncRead + AsyncWrite + Sized>(self, io: T) -> Framed<T, Self> where - Self: Encoder + Sized, + Self: Sized, { Framed::new(io, self) } diff --git a/tokio-util/src/codec/encoder.rs b/tokio-util/src/codec/encoder.rs index e33e6684..770a10fa 100644 --- a/tokio-util/src/codec/encoder.rs +++ b/tokio-util/src/codec/encoder.rs @@ -5,10 +5,7 @@ use std::io; /// [`FramedWrite`]. /// /// [`FramedWrite`]: crate::codec::FramedWrite -pub trait Encoder { - /// The type of items consumed by the `Encoder` - type Item; - +pub trait Encoder<Item> { /// The type of encoding errors. /// /// [`FramedWrite`] requires `Encoder`s errors to implement `From<io::Error>` @@ -24,5 +21,5 @@ pub trait Encoder { /// will be written out when possible. /// /// [`FramedWrite`]: crate::codec::FramedWrite - fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error>; + fn encode(&mut self, item: Item, dst: &mut BytesMut) -> Result<(), Self::Error>; } diff --git a/tokio-util/src/codec/framed.rs b/tokio-util/src/codec/framed.rs index e42ed28c..d2e7659e 100644 --- a/tokio-util/src/codec/framed.rs +++ b/tokio-util/src/codec/framed.rs @@ -67,7 +67,6 @@ impl<T, U> ProjectFuse for Fuse<T, U> { impl<T, U> Framed<T, U> where T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, { /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data. @@ -262,7 +261,7 @@ where impl<T, I, U> Sink<I> for Framed<T, U> where T: AsyncWrite, - U: Encoder<Item = I>, + U: Encoder<I>, U::Error: From<io::Error>, { type Error = U::Error; @@ -380,11 +379,10 @@ impl<T, U: Decoder> Decoder for Fuse<T, U> { } } -impl<T, U: Encoder> Encoder for Fuse<T, U> { - type Item = U::Item; +impl<T, I, U: Encoder<I>> Encoder<I> for Fuse<T, U> { type Error = U::Error; - fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + fn encode(&mut self, item: I, dst: &mut BytesMut) -> Result<(), Self::Error> { self.codec.encode(item, dst) } } @@ -414,8 +412,11 @@ pub struct FramedParts<T, U> { } impl<T, U> FramedParts<T, U> { - /// Create a new, default, `FramedParts`. - pub fn new(io: T, codec: U) -> FramedParts<T, U> { + /// Create a new, default, `FramedParts` + pub fn new<I>(io: T, codec: U) -> FramedParts<T, U> + where + U: Encoder<I>, + { FramedParts { io, codec, diff --git a/tokio-util/src/codec/framed_write.rs b/tokio-util/src/codec/framed_write.rs index 80aa21c6..c0049b2d 100644 --- a/tokio-util/src/codec/framed_write.rs +++ b/tokio-util/src/codec/framed_write.rs @@ -42,7 +42,6 @@ const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY; impl<T, E> FramedWrite<T, E> where T: AsyncWrite, - E: Encoder, { /// Creates a new `FramedWrite` with the given `encoder`. pub fn new(inner: T, encoder: E) -> FramedWrite<T, E> { @@ -100,7 +99,7 @@ impl<T, E> FramedWrite<T, E> { impl<T, I, E> Sink<I> for FramedWrite<T, E> where T: AsyncWrite, - E: Encoder<Item = I>, + E: Encoder<I>, E::Error: From<io::Error>, { type Error = E::Error; @@ -191,9 +190,9 @@ impl<T> FramedWrite2<T> { impl<I, T> Sink<I> for FramedWrite2<T> where T: ProjectFuse + AsyncWrite, - T::Codec: Encoder<Item = I>, + T::Codec: Encoder<I>, { - type Error = <T::Codec as Encoder>::Error; + type Error = <T::Codec as Encoder<I>>::Error; fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { // If the buffer is already over 8KiB, then attempt to flush it. If after flushing it's diff --git a/tokio-util/src/codec/length_delimited.rs b/tokio-util/src/codec/length_delimited.rs index 04061c0a..5e98d4a7 100644 --- a/tokio-util/src/codec/length_delimited.rs +++ b/tokio-util/src/codec/length_delimited.rs @@ -546,12 +546,11 @@ impl Decoder for LengthDelimitedCodec { } } -impl Encoder for LengthDelimitedCodec { - type Item = Bytes; +impl Encoder<Bytes> for LengthDelimitedCodec { type Error = io::Error; fn encode(&mut self, data: Bytes, dst: &mut BytesMut) -> Result<(), io::Error> { - let n = (&data).remaining(); + let n = data.len(); if n > self.builder.max_frame_len { return Err(io::Error::new( diff --git a/tokio-util/src/codec/lines_codec.rs b/tokio-util/src/codec/lines_codec.rs index e048b287..e1816d8c 100644 --- a/tokio-util/src/codec/lines_codec.rs +++ b/tokio-util/src/codec/lines_codec.rs @@ -182,11 +182,14 @@ impl Decoder for LinesCodec { } } -impl Encoder for LinesCodec { - type Item = String; +impl<T> Encoder<T> for LinesCodec +where + T: AsRef<str>, +{ type Error = LinesCodecError; - fn encode(&mut self, line: String, buf: &mut BytesMut) -> Result<(), LinesCodecError> { + fn encode(&mut self, line: T, buf: &mut BytesMut) -> Result<(), LinesCodecError> { + let line = line.as_ref(); buf.reserve(line.len() + 1); buf.put(line.as_bytes()); buf.put_u8(b'\n'); diff --git a/tokio-util/src/udp/frame.rs b/tokio-util/src/udp/frame.rs index c16e12f8..5b098bd4 100644 --- a/tokio-util/src/udp/frame.rs +++ b/tokio-util/src/udp/frame.rs @@ -70,7 +70,7 @@ impl<C: Decoder + Unpin> Stream for UdpFramed<C> { } } -impl<C: Encoder + Unpin> Sink<(C::Item, SocketAddr)> for UdpFramed<C> { +impl<I, C: Encoder<I> + Unpin> Sink<(I, SocketAddr)> for UdpFramed<C> { type Error = C::Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { @@ -84,7 +84,7 @@ impl<C: Encoder + Unpin> Sink<(C::Item, SocketAddr)> for UdpFramed<C> { Poll::Ready(Ok(())) } - fn start_send(self: Pin<&mut Self>, item: (C::Item, SocketAddr)) -> Result<(), Self::Error> { + fn start_send(self: Pin<&mut Self>, item: (I, SocketAddr)) -> Result<(), Self::Error> { let (frame, out_addr) = item; let pin = self.get_mut(); diff --git a/tokio-util/tests/codecs.rs b/tokio-util/tests/codecs.rs index d1212866..a22effbe 100644 --- a/tokio-util/tests/codecs.rs +++ b/tokio-util/tests/codecs.rs @@ -209,9 +209,9 @@ fn lines_encoder() { let mut codec = LinesCodec::new(); let mut buf = BytesMut::new(); - codec.encode(String::from("line 1"), &mut buf).unwrap(); + codec.encode("line 1", &mut buf).unwrap(); assert_eq!("line 1\n", buf); - codec.encode(String::from("line 2"), &mut buf).unwrap(); + codec.encode("line 2", &mut buf).unwrap(); assert_eq!("line 1\nline 2\n", buf); } diff --git a/tokio-util/tests/framed.rs b/tokio-util/tests/framed.rs index cb82f8df..d7ee3ef5 100644 --- a/tokio-util/tests/framed.rs +++ b/tokio-util/tests/framed.rs @@ -28,8 +28,7 @@ impl Decoder for U32Codec { } } -impl Encoder for U32Codec { - type Item = u32; +impl Encoder<u32> for U32Codec { type Error = io::Error; fn encode(&mut self, item: u32, dst: &mut BytesMut) -> io::Result<()> { diff --git a/tokio-util/tests/framed_write.rs b/tokio-util/tests/framed_write.rs index b2970c39..9ac6c1d1 100644 --- a/tokio-util/tests/framed_write.rs +++ b/tokio-util/tests/framed_write.rs @@ -28,8 +28,7 @@ macro_rules! pin { struct U32Encoder; -impl Encoder for U32Encoder { - type Item = u32; +impl Encoder<u32> for U32Encoder { type Error = io::Error; fn encode(&mut self, item: u32, dst: &mut BytesMut) -> io::Result<()> { diff --git a/tokio-util/tests/udp.rs b/tokio-util/tests/udp.rs index 51c65c63..0ba05742 100644 --- a/tokio-util/tests/udp.rs +++ b/tokio-util/tests/udp.rs @@ -8,6 +8,7 @@ use futures::future::FutureExt; use futures::sink::SinkExt; use std::io; +#[cfg_attr(any(target_os = "macos", target_os = "ios"), allow(unused_assignments))] #[tokio::test] async fn send_framed() -> std::io::Result<()> { let mut a_soc = UdpSocket::bind("127.0.0.1:0").await?; @@ -21,33 +22,34 @@ async fn send_framed() -> std::io::Result<()> { let mut a = UdpFramed::new(a_soc, ByteCodec); let mut b = UdpFramed::new(b_soc, ByteCodec); - let msg = b"4567".to_vec(); + let msg = b"4567"; - let send = a.send((msg.clone(), b_addr)); + let send = a.send((msg, b_addr)); let recv = b.next().map(|e| e.unwrap()); let (_, received) = try_join(send, recv).await.unwrap(); let (data, addr) = received; - assert_eq!(msg, data); + assert_eq!(msg, &*data); assert_eq!(a_addr, addr); a_soc = a.into_inner(); b_soc = b.into_inner(); } + #[cfg(not(any(target_os = "macos", target_os = "ios")))] // test sending & receiving an empty message { let mut a = UdpFramed::new(a_soc, ByteCodec); let mut b = UdpFramed::new(b_soc, ByteCodec); - let msg = b"".to_vec(); + let msg = b""; - let send = a.send((msg.clone(), b_addr)); + let send = a.send((msg, b_addr)); let recv = b.next().map(|e| e.unwrap()); let (_, received) = try_join(send, recv).await.unwrap(); let (data, addr) = received; - assert_eq!(msg, data); + assert_eq!(msg, &*data); assert_eq!(a_addr, addr); } @@ -66,13 +68,12 @@ impl Decoder for ByteCodec { } } -impl Encoder for ByteCodec { - type Item = Vec<u8>; +impl Encoder<&[u8]> for ByteCodec { type Error = io::Error; - fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> Result<(), io::Error> { + fn encode(&mut self, data: &[u8], buf: &mut BytesMut) -> Result<(), io::Error> { buf.reserve(data.len()); - buf.put_slice(&data); + buf.put_slice(data); Ok(()) } } |