diff options
author | Roman <humbug@deeptown.org> | 2018-02-07 01:41:31 +0400 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2018-02-06 13:41:31 -0800 |
commit | ad8338e4da63f659acce89284381d08a2474f85b (patch) | |
tree | e98a11f5aed7663c88956eb9635747389aae144e /examples | |
parent | 73b763f69fe517fdbbb0360bd9c0a50db8f8f62c (diff) |
Remove UdpCodec (#109)
`UdpFramed` is updated to use the `Encoder` and
`Decoder` traits from `tokio-io`.
Diffstat (limited to 'examples')
-rw-r--r-- | examples/README.md | 4 | ||||
-rw-r--r-- | examples/connect.rs | 105 | ||||
-rw-r--r-- | examples/udp-codec.rs | 40 |
3 files changed, 59 insertions, 90 deletions
diff --git a/examples/README.md b/examples/README.md index 688984b9..3f4734c6 100644 --- a/examples/README.md +++ b/examples/README.md @@ -39,8 +39,8 @@ A high level description of each example is: showcasing running on multiple cores, working with futures and spawning tasks, and finally framing a TCP connection to discrete request/response objects. -* `udp-codec` - an example of using the `UdpCodec` trait along with a small - ping-pong protocol happening locally. +* `udp-codec` - an example of using the `Encoder`/`Decoder` traits for UDP + along with a small ping-pong protocol happening locally. * `compress` - an echo-like server where instead of echoing back everything read it echos back a gzip-compressed version of everything read! All compression occurs on a CPU pool to offload work from the event loop. diff --git a/examples/connect.rs b/examples/connect.rs index a4160449..275864d1 100644 --- a/examples/connect.rs +++ b/examples/connect.rs @@ -76,17 +76,55 @@ fn main() { }).wait().unwrap(); } +mod codec { + use std::io; + use bytes::{BufMut, BytesMut}; + use tokio_io::codec::{Encoder, Decoder}; + /// A simple `Codec` implementation that just ships bytes around. + /// + /// This type is used for "framing" a TCP/UDP stream of bytes but it's really + /// just a convenient method for us to work with streams/sinks for now. + /// This'll just take any data read and interpret it as a "frame" and + /// conversely just shove data into the output location without looking at + /// it. + pub struct Bytes; + + impl Decoder for Bytes { + type Item = BytesMut; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>> { + if buf.len() > 0 { + let len = buf.len(); + Ok(Some(buf.split_to(len))) + } else { + Ok(None) + } + } + } + + impl Encoder for Bytes { + type Item = Vec<u8>; + type Error = io::Error; + + fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> io::Result<()> { + buf.put(&data[..]); + Ok(()) + } + } +} + mod tcp { use std::io; use std::net::SocketAddr; - use bytes::{BufMut, BytesMut}; + use bytes::BytesMut; use futures::{Future, Stream}; use futures::future::Executor; use futures_cpupool::CpuPool; use tokio::net::TcpStream; use tokio_io::AsyncRead; - use tokio_io::codec::{Encoder, Decoder}; + use codec::Bytes; pub fn connect(addr: &SocketAddr, pool: &CpuPool, @@ -122,43 +160,6 @@ mod tcp { stream }).flatten_stream()) } - - /// A simple `Codec` implementation that just ships bytes around. - /// - /// This type is used for "framing" a TCP stream of bytes but it's really - /// just a convenient method for us to work with streams/sinks for now. - /// This'll just take any data read and interpret it as a "frame" and - /// conversely just shove data into the output location without looking at - /// it. - struct Bytes; - - impl Decoder for Bytes { - type Item = BytesMut; - type Error = io::Error; - - fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>> { - if buf.len() > 0 { - let len = buf.len(); - Ok(Some(buf.split_to(len))) - } else { - Ok(None) - } - } - - fn decode_eof(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>> { - self.decode(buf) - } - } - - impl Encoder for Bytes { - type Item = Vec<u8>; - type Error = io::Error; - - fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> io::Result<()> { - buf.put(&data[..]); - Ok(()) - } - } } mod udp { @@ -169,7 +170,8 @@ mod udp { use futures::{Future, Stream}; use futures::future::Executor; use futures_cpupool::CpuPool; - use tokio::net::{UdpCodec, UdpSocket}; + use tokio::net::UdpSocket; + use codec::Bytes; pub fn connect(&addr: &SocketAddr, pool: &CpuPool, @@ -186,7 +188,7 @@ mod udp { let udp = UdpSocket::bind(&addr_to_bind) .expect("failed to bind socket"); - // Like above with TCP we use an instance of `UdpCodec` to transform + // Like above with TCP we use an instance of `Bytes` codec to transform // this UDP socket into a framed sink/stream which operates over // discrete values. In this case we're working with *pairs* of socket // addresses and byte buffers. @@ -195,7 +197,7 @@ mod udp { // All bytes from `stdin` will go to the `addr` specified in our // argument list. Like with TCP this is spawned concurrently pool.execute(stdin.map(move |chunk| { - (addr, chunk) + (chunk, addr) }).forward(sink).then(|result| { if let Err(e) = result { panic!("failed to write to socket: {}", e) @@ -205,7 +207,7 @@ mod udp { // With UDP we could receive data from any source, so filter out // anything coming from a different address - Box::new(stream.filter_map(move |(src, chunk)| { + Box::new(stream.filter_map(move |(chunk, src)| { if src == addr { Some(chunk.into()) } else { @@ -213,23 +215,6 @@ mod udp { } })) } - - struct Bytes; - - impl UdpCodec for Bytes { - type In = (SocketAddr, Vec<u8>); - type Out = (SocketAddr, Vec<u8>); - type Error = io::Error; - - fn decode(&mut self, addr: &SocketAddr, buf: &[u8]) -> io::Result<Self::In> { - Ok((*addr, buf.to_vec())) - } - - fn encode(&mut self, (addr, buf): Self::Out, into: &mut Vec<u8>) -> io::Result<SocketAddr> { - into.extend(buf); - Ok(addr) - } - } } // Our helper method which will read data from stdin and send it along the diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs index c874ebd7..a2429e49 100644 --- a/examples/udp-codec.rs +++ b/examples/udp-codec.rs @@ -1,40 +1,24 @@ -//! This is a basic example of leveraging `UdpCodec` to create a simple UDP +//! This is a basic example of leveraging `BytesCodec` to create a simple UDP //! client and server which speak a custom protocol. //! -//! Here we're using the a custom codec to convert a UDP socket to a stream of +//! Here we're using the codec from tokio-io to convert a UDP socket to a stream of //! client messages. These messages are then processed and returned back as a //! new message with a new destination. Overall, we then use this to construct a //! "ping pong" pair where two sockets are sending messages back and forth. extern crate tokio; +extern crate tokio_io; extern crate env_logger; extern crate futures; extern crate futures_cpupool; -use std::io; use std::net::SocketAddr; use futures::{Future, Stream, Sink}; use futures::future::Executor; use futures_cpupool::CpuPool; -use tokio::net::{UdpSocket, UdpCodec}; - -pub struct LineCodec; - -impl UdpCodec for LineCodec { - type In = (SocketAddr, Vec<u8>); - type Out = (SocketAddr, Vec<u8>); - type Error = io::Error; - - fn decode(&mut self, addr: &SocketAddr, buf: &[u8]) -> io::Result<Self::In> { - Ok((*addr, buf.to_vec())) - } - - fn encode(&mut self, (addr, buf): Self::Out, into: &mut Vec<u8>) -> io::Result<SocketAddr> { - into.extend(buf); - Ok(addr) - } -} +use tokio::net::UdpSocket; +use tokio_io::codec::BytesCodec; fn main() { drop(env_logger::init()); @@ -50,27 +34,27 @@ fn main() { // We're parsing each socket with the `LineCodec` defined above, and then we // `split` each codec into the sink/stream halves. - let (a_sink, a_stream) = a.framed(LineCodec).split(); - let (b_sink, b_stream) = b.framed(LineCodec).split(); + let (a_sink, a_stream) = a.framed(BytesCodec::new()).split(); + let (b_sink, b_stream) = b.framed(BytesCodec::new()).split(); // Start off by sending a ping from a to b, afterwards we just print out // what they send us and continually send pings // let pings = stream::iter((0..5).map(Ok)); - let a = a_sink.send((b_addr, b"PING".to_vec())).and_then(|a_sink| { + let a = a_sink.send(("PING".into(), b_addr)).and_then(|a_sink| { let mut i = 0; - let a_stream = a_stream.take(4).map(move |(addr, msg)| { + let a_stream = a_stream.take(4).map(move |(msg, addr)| { i += 1; println!("[a] recv: {}", String::from_utf8_lossy(&msg)); - (addr, format!("PING {}", i).into_bytes()) + (format!("PING {}", i).into(), addr) }); a_sink.send_all(a_stream) }); // The second client we have will receive the pings from `a` and then send // back pongs. - let b_stream = b_stream.map(|(addr, msg)| { + let b_stream = b_stream.map(|(msg, addr)| { println!("[b] recv: {}", String::from_utf8_lossy(&msg)); - (addr, b"PONG".to_vec()) + ("PONG".into(), addr) }); let b = b_sink.send_all(b_stream); |