diff options
author | Carl Lerche <me@carllerche.com> | 2019-10-22 10:13:49 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-10-22 10:13:49 -0700 |
commit | cfc15617a5247ea780c32c85b7134b88b6de5845 (patch) | |
tree | ef0a46c61c51505a60f386c9760acac9d1f9b7b1 | |
parent | b8cee1a60ad99ef28ec494ae4230e2ef4399fcf9 (diff) |
codec: move into tokio-util (#1675)
Related to #1318, Tokio APIs that are "less stable" are moved into a new
`tokio-util` crate. This crate will mirror `tokio` and provide
additional APIs that may require a greater rate of breaking changes.
As examples require `tokio-util`, they are moved into a separate
crate (`examples`). This has the added advantage of being able to avoid
example only dependencies in the `tokio` crate.
-rw-r--r-- | Cargo.toml | 5 | ||||
-rw-r--r-- | README.md | 2 | ||||
-rw-r--r-- | azure-pipelines.yml | 4 | ||||
-rw-r--r-- | ci/patch.toml | 2 | ||||
-rw-r--r-- | examples/Cargo.toml | 52 | ||||
-rw-r--r-- | examples/README.md (renamed from tokio/examples/README.md) | 0 | ||||
-rw-r--r-- | examples/chat.rs (renamed from tokio/examples/chat.rs) | 22 | ||||
-rw-r--r-- | examples/connect.rs (renamed from tokio/examples/connect.rs) | 21 | ||||
-rw-r--r-- | examples/echo-udp.rs (renamed from tokio/examples/echo-udp.rs) | 0 | ||||
-rw-r--r-- | examples/echo.rs (renamed from tokio/examples/echo.rs) | 0 | ||||
-rw-r--r-- | examples/hello_world.rs (renamed from tokio/examples/hello_world.rs) | 0 | ||||
-rw-r--r-- | examples/print_each_packet.rs (renamed from tokio/examples/print_each_packet.rs) | 3 | ||||
-rw-r--r-- | examples/proxy.rs (renamed from tokio/examples/proxy.rs) | 0 | ||||
-rw-r--r-- | examples/tinydb.rs (renamed from tokio/examples/tinydb.rs) | 10 | ||||
-rw-r--r-- | examples/tinyhttp.rs (renamed from tokio/examples/tinyhttp.rs) | 0 | ||||
-rw-r--r-- | examples/udp-client.rs (renamed from tokio/examples/udp-client.rs) | 0 | ||||
-rw-r--r-- | examples/udp-codec.rs (renamed from tokio/examples/udp-codec.rs) | 19 | ||||
-rw-r--r-- | tokio-codec/CHANGELOG.md | 35 | ||||
-rw-r--r-- | tokio-net/Cargo.toml | 1 | ||||
-rw-r--r-- | tokio-net/src/process/mod.rs | 9 | ||||
-rw-r--r-- | tokio-net/src/udp/mod.rs | 2 | ||||
-rw-r--r-- | tokio-net/src/udp/socket.rs | 29 | ||||
-rw-r--r-- | tokio-net/src/udp/split.rs | 8 | ||||
-rw-r--r-- | tokio-net/src/uds/frame.rs | 175 | ||||
-rw-r--r-- | tokio-net/src/uds/mod.rs | 17 | ||||
-rw-r--r-- | tokio-net/tests/process_stdio.rs | 13 | ||||
-rw-r--r-- | tokio-net/tests/udp.rs | 76 | ||||
-rw-r--r-- | tokio-util/CHANGELOG.md | 0 | ||||
-rw-r--r-- | tokio-util/Cargo.toml (renamed from tokio-codec/Cargo.toml) | 7 | ||||
-rw-r--r-- | tokio-util/LICENSE (renamed from tokio-codec/LICENSE) | 0 | ||||
-rw-r--r-- | tokio-util/README.md (renamed from tokio-codec/README.md) | 2 | ||||
-rw-r--r-- | tokio-util/src/codec/bytes_codec.rs (renamed from tokio-codec/src/bytes_codec.rs) | 5 | ||||
-rw-r--r-- | tokio-util/src/codec/decoder.rs (renamed from tokio-codec/src/decoder.rs) | 12 | ||||
-rw-r--r-- | tokio-util/src/codec/encoder.rs (renamed from tokio-codec/src/encoder.rs) | 0 | ||||
-rw-r--r-- | tokio-util/src/codec/framed.rs (renamed from tokio-codec/src/framed.rs) | 10 | ||||
-rw-r--r-- | tokio-util/src/codec/framed_read.rs (renamed from tokio-codec/src/framed_read.rs) | 4 | ||||
-rw-r--r-- | tokio-util/src/codec/framed_write.rs (renamed from tokio-codec/src/framed_write.rs) | 8 | ||||
-rw-r--r-- | tokio-util/src/codec/length_delimited.rs (renamed from tokio-codec/src/length_delimited.rs) | 50 | ||||
-rw-r--r-- | tokio-util/src/codec/lines_codec.rs (renamed from tokio-codec/src/lines_codec.rs) | 9 | ||||
-rw-r--r-- | tokio-util/src/codec/macros.rs (renamed from tokio-codec/src/macros.rs) | 0 | ||||
-rw-r--r-- | tokio-util/src/codec/mod.rs (renamed from tokio-codec/src/lib.rs) | 37 | ||||
-rw-r--r-- | tokio-util/src/lib.rs | 17 | ||||
-rw-r--r-- | tokio-util/src/udp/frame.rs (renamed from tokio-net/src/udp/frame.rs) | 26 | ||||
-rw-r--r-- | tokio-util/src/udp/mod.rs | 4 | ||||
-rw-r--r-- | tokio-util/tests/codecs.rs (renamed from tokio-codec/tests/codecs.rs) | 3 | ||||
-rw-r--r-- | tokio-util/tests/framed.rs (renamed from tokio-codec/tests/framed.rs) | 2 | ||||
-rw-r--r-- | tokio-util/tests/framed_read.rs (renamed from tokio-codec/tests/framed_read.rs) | 2 | ||||
-rw-r--r-- | tokio-util/tests/framed_write.rs (renamed from tokio-codec/tests/framed_write.rs) | 2 | ||||
-rw-r--r-- | tokio-util/tests/length_delimited.rs (renamed from tokio-codec/tests/length_delimited.rs) | 2 | ||||
-rw-r--r-- | tokio-util/tests/udp.rs | 79 | ||||
-rw-r--r-- | tokio/Cargo.toml | 5 | ||||
-rw-r--r-- | tokio/README.md | 2 | ||||
-rw-r--r-- | tokio/src/net.rs | 4 |
53 files changed, 318 insertions, 479 deletions
@@ -2,7 +2,6 @@ members = [ "tokio", - "tokio-codec", "tokio-executor", "tokio-io", "tokio-macros", @@ -10,5 +9,9 @@ members = [ "tokio-sync", "tokio-test", "tokio-tls", + "tokio-util", + + # Internal + "examples", "build-tests", ] @@ -94,7 +94,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { ``` -More examples can be found [here](tokio/examples). Note that the `master` branch +More examples can be found [here](examples). Note that the `master` branch is currently being updated to use `async` / `await`. The examples are not fully ported. Examples for stable Tokio can be found [here](https://github.com/tokio-rs/tokio/tree/v0.1.x/tokio/examples). diff --git a/azure-pipelines.yml b/azure-pipelines.yml index b943bfe6..2acdb522 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -26,7 +26,6 @@ jobs: cross: true crates: tokio: - - codec - fs - io - net @@ -61,7 +60,6 @@ jobs: displayName: Test sub crates - rust: beta crates: - tokio-codec: [] tokio-executor: - current-thread - thread-pool @@ -71,6 +69,8 @@ jobs: - async-traits tokio-macros: [] tokio-test: [] + tokio-util: [] + examples: [] # Test compilation failure - template: ci/azure-test-stable.yml diff --git a/ci/patch.toml b/ci/patch.toml index e6dc9148..6d739341 100644 --- a/ci/patch.toml +++ b/ci/patch.toml @@ -2,10 +2,10 @@ # repository. [patch.crates-io] tokio = { path = "tokio" } -tokio-codec = { path = "tokio-codec" } tokio-executor = { path = "tokio-executor" } tokio-io = { path = "tokio-io" } tokio-macros = { path = "tokio-macros" } tokio-net = { path = "tokio-net" } tokio-sync = { path = "tokio-sync" } tokio-tls = { path = "tokio-tls" } +tokio-util = { path = "tokio-util" } diff --git a/examples/Cargo.toml b/examples/Cargo.toml new file mode 100644 index 00000000..84a546f7 --- /dev/null +++ b/examples/Cargo.toml @@ -0,0 +1,52 @@ +[package] +name = "examples" +version = "0.0.0" +publish = false +edition = "2018" + +[dev-dependencies] +tokio = { version = "=0.2.0-alpha.6", path = "../tokio" } +tokio-util = { version = "=0.2.0-alpha.6", path = "../tokio-util" } + +bytes = "0.4.12" +futures-preview = "=0.3.0-alpha.19" + +[[example]] +name = "chat" +path = "chat.rs" + +[[example]] +name = "connect" +path = "connect.rs" + +[[example]] +name = "echo-udp" +path = "echo-udp.rs" + +[[example]] +name = "echo" +path = "echo.rs" + +[[example]] +name = "hello_world" +path = "hello_world.rs" + +[[example]] +name = "print_each_packet" +path = "print_each_packet.rs" + +[[example]] +name = "proxy" +path = "proxy.rs" + +[[example]] +name = "tinydb" +path = "tinydb.rs" + +[[example]] +name = "udp-client" +path = "udp-client.rs" + +[[example]] +name = "udp-codec" +path = "udp-codec.rs" diff --git a/tokio/examples/README.md b/examples/README.md index 802d0aa4..802d0aa4 100644 --- a/tokio/examples/README.md +++ b/examples/README.md diff --git a/tokio/examples/chat.rs b/examples/chat.rs index 7fc317d8..0a3976d5 100644 --- a/tokio/examples/chat.rs +++ b/examples/chat.rs @@ -26,17 +26,19 @@ #![warn(rust_2018_idioms)] +use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::{mpsc, Mutex}; +use tokio_util::codec::{Framed, LinesCodec, LinesCodecError}; + use futures::{Poll, SinkExt, Stream, StreamExt}; -use std::{ - collections::HashMap, env, error::Error, io, net::SocketAddr, pin::Pin, sync::Arc, - task::Context, -}; -use tokio::{ - self, - codec::{Framed, LinesCodec, LinesCodecError}, - net::{TcpListener, TcpStream}, - sync::{mpsc, Mutex}, -}; +use std::collections::HashMap; +use std::env; +use std::error::Error; +use std::io; +use std::net::SocketAddr; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Context; #[tokio::main] async fn main() -> Result<(), Box<dyn Error>> { diff --git a/tokio/examples/connect.rs b/examples/connect.rs index c53471d8..0dd14ef2 100644 --- a/tokio/examples/connect.rs +++ b/examples/connect.rs @@ -16,13 +16,14 @@ #![warn(rust_2018_idioms)] +use tokio::io; +use tokio::sync::{mpsc, oneshot}; +use tokio_util::codec::{FramedRead, FramedWrite}; + use futures::{SinkExt, Stream}; -use std::{env, error::Error, net::SocketAddr}; -use tokio::{ - codec::{FramedRead, FramedWrite}, - io, - sync::{mpsc, oneshot}, -}; +use std::env; +use std::error::Error; +use std::net::SocketAddr; #[tokio::main] async fn main() -> Result<(), Box<dyn Error>> { @@ -83,10 +84,8 @@ mod tcp { use super::codec; use futures::{future, Sink, SinkExt, Stream, StreamExt}; use std::{error::Error, io, net::SocketAddr}; - use tokio::{ - codec::{FramedRead, FramedWrite}, - net::TcpStream, - }; + use tokio::net::TcpStream; + use tokio_util::codec::{FramedRead, FramedWrite}; pub async fn connect( addr: &SocketAddr, @@ -171,7 +170,7 @@ mod udp { mod codec { use bytes::{BufMut, BytesMut}; use std::io; - use tokio::codec::{Decoder, Encoder}; + use tokio_util::codec::{Decoder, Encoder}; /// A simple `Codec` implementation that just ships bytes around. /// diff --git a/tokio/examples/echo-udp.rs b/examples/echo-udp.rs index f1e8134d..f1e8134d 100644 --- a/tokio/examples/echo-udp.rs +++ b/examples/echo-udp.rs diff --git a/tokio/examples/echo.rs b/examples/echo.rs index 455aebde..455aebde 100644 --- a/tokio/examples/echo.rs +++ b/examples/echo.rs diff --git a/tokio/examples/hello_world.rs b/examples/hello_world.rs index 8ff40902..8ff40902 100644 --- a/tokio/examples/hello_world.rs +++ b/examples/hello_world.rs diff --git a/tokio/examples/print_each_packet.rs b/examples/print_each_packet.rs index 3729c3e9..0a275545 100644 --- a/tokio/examples/print_each_packet.rs +++ b/examples/print_each_packet.rs @@ -54,10 +54,9 @@ #![warn(rust_2018_idioms)] -use tokio; -use tokio::codec::{BytesCodec, Decoder}; use tokio::net::TcpListener; use tokio::prelude::*; +use tokio_util::codec::{BytesCodec, Decoder}; use std::env; diff --git a/tokio/examples/proxy.rs b/examples/proxy.rs index 6886a813..6886a813 100644 --- a/tokio/examples/proxy.rs +++ b/examples/proxy.rs diff --git a/tokio/examples/tinydb.rs b/examples/tinydb.rs index be4951dc..3fc88f6b 100644 --- a/tokio/examples/tinydb.rs +++ b/examples/tinydb.rs @@ -41,17 +41,15 @@ #![warn(rust_2018_idioms)] +use tokio::net::TcpListener; +use tokio_util::codec::{Framed, LinesCodec}; + +use futures::{SinkExt, StreamExt}; use std::collections::HashMap; use std::env; use std::error::Error; use std::sync::{Arc, Mutex}; -use tokio; -use tokio::codec::{Framed, LinesCodec}; -use tokio::net::TcpListener; - -use futures::{SinkExt, StreamExt}; - /// The in-memory database shared amongst all clients. /// /// This database will be shared via `Arc`, so to mutate the internal map we're diff --git a/tokio/examples/tinyhttp.rs b/examples/tinyhttp.rs index 65074018..65074018 100644 --- a/tokio/examples/tinyhttp.rs +++ b/examples/tinyhttp.rs diff --git a/tokio/examples/udp-client.rs b/examples/udp-client.rs index 5437daf6..5437daf6 100644 --- a/tokio/examples/udp-client.rs +++ b/examples/udp-client.rs diff --git a/tokio/examples/udp-codec.rs b/examples/udp-codec.rs index 7d0aaf69..baf64886 100644 --- a/tokio/examples/udp-codec.rs +++ b/examples/udp-codec.rs @@ -6,26 +6,23 @@ //! new message with a new destination. Overall, we then use this to construct a //! "ping pong" pair where two sockets are sending messages back and forth. -#![cfg(feature = "rt-full")] #![warn(rust_2018_idioms)] +use tokio::future::FutureExt as TokioFutureExt; +use tokio::io; +use tokio::net::UdpSocket; +use tokio_util::codec::BytesCodec; +use tokio_util::udp::UdpFramed; + +use bytes::Bytes; +use futures::{FutureExt, SinkExt, StreamExt}; use std::env; use std::error::Error; use std::net::SocketAddr; use std::time::Duration; -use bytes::Bytes; - -use futures::{FutureExt, SinkExt, StreamExt}; -use tokio::codec::BytesCodec; -use tokio::future::FutureExt as TokioFutureExt; -use tokio::io; -use tokio::net::{UdpFramed, UdpSocket}; - #[tokio::main] async fn main() -> Result<(), Box<dyn Error>> { - let _ = env_logger::init(); - let addr = env::args().nth(1).unwrap_or("127.0.0.1:0".to_string()); // Bind both our sockets and then figure out what ports we got. diff --git a/tokio-codec/CHANGELOG.md b/tokio-codec/CHANGELOG.md deleted file mode 100644 index 2f0ca5f5..00000000 --- a/tokio-codec/CHANGELOG.md +++ /dev/null @@ -1,35 +0,0 @@ -# 0.2.0-alpha.6 (September 30, 2019) - -- Move to `futures-*-preview 0.3.0-alpha.19` -- Move to `pin-project 0.4` - -# 0.2.0-alpha.5 (September 19, 2019) - -- Track tokio release - -# 0.2.0-alpha.4 (August 29, 2019) - -- Track tokio release. - -# 0.2.0-alpha.3 (August 28, 2019) - -### Fix -- Infinite loop in `LinesCodec` (#1489). - -# 0.2.0-alpha.2 (August 17, 2019) - -### Changed -- Update `futures` dependency to 0.3.0-alpha.18. - -# 0.2.0-alpha.1 (August 8, 2019) - -### Changed -- Switch to `async`, `await`, and `std::future`. - -# 0.1.1 (September 26, 2018) - -* Allow setting max line length with `LinesCodec` (#632) - -# 0.1.0 (June 13, 2018) - -* Initial release (#353) diff --git a/tokio-net/Cargo.toml b/tokio-net/Cargo.toml index d7df3397..f19fd759 100644 --- a/tokio-net/Cargo.toml +++ b/tokio-net/Cargo.toml @@ -62,7 +62,6 @@ uds = [ log = ["tracing/log"] [dependencies] -tokio-codec = { version = "=0.2.0-alpha.6", path = "../tokio-codec" } tokio-executor = { version = "=0.2.0-alpha.6", features = ["blocking"], path = "../tokio-executor" } tokio-io = { version = "=0.2.0-alpha.6", path = "../tokio-io" } tokio-sync = { version = "=0.2.0-alpha.6", path = "../tokio-sync" } diff --git a/tokio-net/src/process/mod.rs b/tokio-net/src/process/mod.rs index 140daa5d..fbe905ad 100644 --- a/tokio-net/src/process/mod.rs +++ b/tokio-net/src/process/mod.rs @@ -55,10 +55,11 @@ //! We can also read input line by line. //! //! ```no_run +//! use tokio::io::{BufReader, AsyncBufReadExt}; +//! use tokio::process::Command; +//! //! use futures_util::stream::StreamExt; -//! use std::process::{Stdio}; -//! use tokio::codec::{FramedRead, LinesCodec}; -//! use tokio_net::process::Command; +//! use std::process::Stdio; //! //! #[tokio::main] //! async fn main() -> Result<(), Box<dyn std::error::Error>> { @@ -77,7 +78,7 @@ //! let stdout = child.stdout().take() //! .expect("child did not have a handle to stdout"); //! -//! let mut reader = FramedRead::new(stdout, LinesCodec::new()); +//! let mut reader = BufReader::new(stdout).lines(); //! //! // Ensure the child process is spawned in the runtime so it can //! // make progress on its own while we await for any output. diff --git a/tokio-net/src/udp/mod.rs b/tokio-net/src/udp/mod.rs index e5c06585..45656773 100644 --- a/tokio-net/src/udp/mod.rs +++ b/tokio-net/src/udp/mod.rs @@ -7,9 +7,7 @@ //! //! [`UdpSocket`]: struct.UdpSocket -mod frame; mod socket; pub mod split; -pub use self::frame::UdpFramed; pub use self::socket::UdpSocket; diff --git a/tokio-net/src/udp/socket.rs b/tokio-net/src/udp/socket.rs index c59b564b..90f212a2 100644 --- a/tokio-net/src/udp/socket.rs +++ b/tokio-net/src/udp/socket.rs @@ -108,7 +108,7 @@ impl UdpSocket { /// /// [`connect`]: #method.connect pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> { - poll_fn(|cx| self.poll_send_priv(cx, buf)).await + poll_fn(|cx| self.poll_send(cx, buf)).await } // Poll IO functions that takes `&self` are provided for the split API. @@ -121,11 +121,8 @@ impl UdpSocket { // While violating this requirement is "safe" from a Rust memory model point // of view, it will result in unexpected behavior in the form of lost // notifications and tasks hanging. - pub(crate) fn poll_send_priv( - &self, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll<io::Result<usize>> { + #[doc(hidden)] + pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> { ready!(self.io.poll_write_ready(cx))?; match self.io.get_ref().send(buf) { @@ -150,14 +147,11 @@ impl UdpSocket { /// /// [`connect`]: #method.connect pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> { - poll_fn(|cx| self.poll_recv_priv(cx, buf)).await + poll_fn(|cx| self.poll_recv(cx, buf)).await } - pub(crate) fn poll_recv_priv( - &self, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll<io::Result<usize>> { + #[doc(hidden)] + pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> { ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; match self.io.get_ref().recv(buf) { @@ -178,7 +172,7 @@ impl UdpSocket { let mut addrs = target.to_socket_addrs().await?; match addrs.next() { - Some(target) => poll_fn(|cx| self.poll_send_to_priv(cx, buf, &target)).await, + Some(target) => poll_fn(|cx| self.poll_send_to(cx, buf, &target)).await, None => Err(io::Error::new( io::ErrorKind::InvalidInput, "no addresses to send data to", @@ -186,7 +180,9 @@ impl UdpSocket { } } - pub(crate) fn poll_send_to_priv( + // TODO: Public or not? + #[doc(hidden)] + pub fn poll_send_to( &self, cx: &mut Context<'_>, buf: &[u8], @@ -210,10 +206,11 @@ impl UdpSocket { /// to hold the message bytes. If a message is too long to fit in the supplied /// buffer, excess bytes may be discarded. pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - poll_fn(|cx| self.poll_recv_from_priv(cx, buf)).await + poll_fn(|cx| self.poll_recv_from(cx, buf)).await } - pub(crate) fn poll_recv_from_priv( + #[doc(hidden)] + pub fn poll_recv_from( &self, cx: &mut Context<'_>, buf: &mut [u8], diff --git a/tokio-net/src/udp/split.rs b/tokio-net/src/udp/split.rs index e58f9276..ad8ce061 100644 --- a/tokio-net/src/udp/split.rs +++ b/tokio-net/src/udp/split.rs @@ -86,7 +86,7 @@ impl UdpSocketRecvHalf { /// to hold the message bytes. If a message is too long to fit in the supplied /// buffer, excess bytes may be discarded. pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - poll_fn(|cx| self.0.poll_recv_from_priv(cx, buf)).await + poll_fn(|cx| self.0.poll_recv_from(cx, buf)).await } /// Returns a future that receives a single datagram message on the socket from @@ -102,7 +102,7 @@ impl UdpSocketRecvHalf { /// /// [`connect`]: super::UdpSocket::connect pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> { - poll_fn(|cx| self.0.poll_recv_priv(cx, buf)).await + poll_fn(|cx| self.0.poll_recv(cx, buf)).await } } @@ -120,7 +120,7 @@ impl UdpSocketSendHalf { /// The future will resolve to an error if the IP version of the socket does /// not match that of `target`. pub async fn send_to(&mut self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> { - poll_fn(|cx| self.0.poll_send_to_priv(cx, buf, target)).await + poll_fn(|cx| self.0.poll_send_to(cx, buf, target)).await } /// Returns a future that sends data on the socket to the remote address to which it is connected. @@ -131,7 +131,7 @@ impl UdpSocketSendHalf { /// /// [`connect`]: super::UdpSocket::connect pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> { - poll_fn(|cx| self.0.poll_send_priv(cx, buf)).await + poll_fn(|cx| self.0.poll_send(cx, buf)).await } } diff --git a/tokio-net/src/uds/frame.rs b/tokio-net/src/uds/frame.rs deleted file mode 100644 index 584da8ae..00000000 --- a/tokio-net/src/uds/frame.rs +++ /dev/null @@ -1,175 +0,0 @@ -use super::UnixDatagram; -use bytes::{BufMut, BytesMut}; -use futures::{try_ready, Async, AsyncSink, Poll, Sink, StartSend, Stream}; -use std::io; -use std::os::unix::net::SocketAddr; -use std::path::Path; -use tokio_codec::{Decoder, Encoder}; - -/// A unified `Stream` and `Sink` interface to an underlying `UnixDatagram`, using -/// the `Encoder` and `Decoder` traits to encode and decode frames. -/// -/// Unix datagram sockets work with datagrams, but higher-level code may wants to -/// batch these into meaningful chunks, called "frames". This method layers -/// framing on top of this socket by using the `Encoder` and `Decoder` traits to -/// handle encoding and decoding of messages frames. Note that the incoming and -/// outgoing frame types may be distinct. -/// -/// This function returns a *single* object that is both `Stream` and `Sink`; -/// grouping this into a single object is often useful for layering things which -/// require both read and write access to the underlying object. -/// -/// If you want to work more directly with the streams and sink, consider -/// calling `split` on the `UnixDatagramFramed` returned by this method, which will break -/// them into separate objects, allowing them to interact more easily. -#[must_use = "sinks do nothing unless polled"] -#[derive(Debug)] -pub struct UnixDatagramFramed<A, C> { - socket: UnixDatagram, - codec: C, - rd: BytesMut, - wr: BytesMut, - out_addr: Option<A>, - flushed: bool, -} - -impl<A, C: Decoder> Stream for UnixDatagramFramed<A, C> { - type Item = (C::Item, SocketAddr); - type Error = C::Error; - - fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { - self.rd.reserve(INITIAL_RD_CAPACITY); - - let (_n, addr) = unsafe { - let (n, addr) = try_ready!(self.socket.poll_recv_from(self.rd.bytes_mut())); - self.rd.advance_mut(n); - (n, addr) - }; - - let span = trace_span!("decoding", from.addr = %addr, dgram.length = _n); - let _e = span.enter(); - trace!("trying to decode a frame..."); - - let frame_res = self.codec.decode(&mut self.rd); - self.rd.clear(); - let frame = frame_res?; - let result = frame.map(|frame| (frame, addr)); - trace!("frame decoded from buffer"); - Ok(Async::Ready(result)) - } -} - -impl<A: AsRef<Path>, C: Encoder> Sink for UnixDatagramFramed<A, C> { - type SinkItem = (C::Item, A); - type SinkError = C::Error; - - fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> { - let span = trace_span!("sending", to.addr = %item.0, flushed = self.flushed); - let _e = span.enter(); - - trace!("sending frame..."); - - if !self.flushed { - match self.poll_complete()? { - Async::Ready(()) => {} - Async::NotReady => return Ok(AsyncSink::NotReady(item)), - } - } - - let (frame, out_addr) = item; - self.codec.encode(frame, &mut self.wr)?; - self.out_addr = Some(out_addr); - self.flushed = false; - trace!(message = "frame encoded", frame.length = pin.wr.len()); - - Ok(AsyncSink::Ready) - } - - fn poll_complete(&mut self) -> Poll<(), C::Error> { |