summaryrefslogtreecommitdiffstats
path: root/examples
diff options
context:
space:
mode:
authorRoman <humbug@deeptown.org>2018-02-07 01:41:31 +0400
committerCarl Lerche <me@carllerche.com>2018-02-06 13:41:31 -0800
commitad8338e4da63f659acce89284381d08a2474f85b (patch)
treee98a11f5aed7663c88956eb9635747389aae144e /examples
parent73b763f69fe517fdbbb0360bd9c0a50db8f8f62c (diff)
Remove UdpCodec (#109)
`UdpFramed` is updated to use the `Encoder` and `Decoder` traits from `tokio-io`.
Diffstat (limited to 'examples')
-rw-r--r--examples/README.md4
-rw-r--r--examples/connect.rs105
-rw-r--r--examples/udp-codec.rs40
3 files changed, 59 insertions, 90 deletions
diff --git a/examples/README.md b/examples/README.md
index 688984b9..3f4734c6 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -39,8 +39,8 @@ A high level description of each example is:
showcasing running on multiple cores, working with futures and spawning
tasks, and finally framing a TCP connection to discrete request/response
objects.
-* `udp-codec` - an example of using the `UdpCodec` trait along with a small
- ping-pong protocol happening locally.
+* `udp-codec` - an example of using the `Encoder`/`Decoder` traits for UDP
+ along with a small ping-pong protocol happening locally.
* `compress` - an echo-like server where instead of echoing back everything read
it echos back a gzip-compressed version of everything read! All compression
occurs on a CPU pool to offload work from the event loop.
diff --git a/examples/connect.rs b/examples/connect.rs
index a4160449..275864d1 100644
--- a/examples/connect.rs
+++ b/examples/connect.rs
@@ -76,17 +76,55 @@ fn main() {
}).wait().unwrap();
}
+mod codec {
+ use std::io;
+ use bytes::{BufMut, BytesMut};
+ use tokio_io::codec::{Encoder, Decoder};
+ /// A simple `Codec` implementation that just ships bytes around.
+ ///
+ /// This type is used for "framing" a TCP/UDP stream of bytes but it's really
+ /// just a convenient method for us to work with streams/sinks for now.
+ /// This'll just take any data read and interpret it as a "frame" and
+ /// conversely just shove data into the output location without looking at
+ /// it.
+ pub struct Bytes;
+
+ impl Decoder for Bytes {
+ type Item = BytesMut;
+ type Error = io::Error;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>> {
+ if buf.len() > 0 {
+ let len = buf.len();
+ Ok(Some(buf.split_to(len)))
+ } else {
+ Ok(None)
+ }
+ }
+ }
+
+ impl Encoder for Bytes {
+ type Item = Vec<u8>;
+ type Error = io::Error;
+
+ fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> io::Result<()> {
+ buf.put(&data[..]);
+ Ok(())
+ }
+ }
+}
+
mod tcp {
use std::io;
use std::net::SocketAddr;
- use bytes::{BufMut, BytesMut};
+ use bytes::BytesMut;
use futures::{Future, Stream};
use futures::future::Executor;
use futures_cpupool::CpuPool;
use tokio::net::TcpStream;
use tokio_io::AsyncRead;
- use tokio_io::codec::{Encoder, Decoder};
+ use codec::Bytes;
pub fn connect(addr: &SocketAddr,
pool: &CpuPool,
@@ -122,43 +160,6 @@ mod tcp {
stream
}).flatten_stream())
}
-
- /// A simple `Codec` implementation that just ships bytes around.
- ///
- /// This type is used for "framing" a TCP stream of bytes but it's really
- /// just a convenient method for us to work with streams/sinks for now.
- /// This'll just take any data read and interpret it as a "frame" and
- /// conversely just shove data into the output location without looking at
- /// it.
- struct Bytes;
-
- impl Decoder for Bytes {
- type Item = BytesMut;
- type Error = io::Error;
-
- fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>> {
- if buf.len() > 0 {
- let len = buf.len();
- Ok(Some(buf.split_to(len)))
- } else {
- Ok(None)
- }
- }
-
- fn decode_eof(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>> {
- self.decode(buf)
- }
- }
-
- impl Encoder for Bytes {
- type Item = Vec<u8>;
- type Error = io::Error;
-
- fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> io::Result<()> {
- buf.put(&data[..]);
- Ok(())
- }
- }
}
mod udp {
@@ -169,7 +170,8 @@ mod udp {
use futures::{Future, Stream};
use futures::future::Executor;
use futures_cpupool::CpuPool;
- use tokio::net::{UdpCodec, UdpSocket};
+ use tokio::net::UdpSocket;
+ use codec::Bytes;
pub fn connect(&addr: &SocketAddr,
pool: &CpuPool,
@@ -186,7 +188,7 @@ mod udp {
let udp = UdpSocket::bind(&addr_to_bind)
.expect("failed to bind socket");
- // Like above with TCP we use an instance of `UdpCodec` to transform
+ // Like above with TCP we use an instance of `Bytes` codec to transform
// this UDP socket into a framed sink/stream which operates over
// discrete values. In this case we're working with *pairs* of socket
// addresses and byte buffers.
@@ -195,7 +197,7 @@ mod udp {
// All bytes from `stdin` will go to the `addr` specified in our
// argument list. Like with TCP this is spawned concurrently
pool.execute(stdin.map(move |chunk| {
- (addr, chunk)
+ (chunk, addr)
}).forward(sink).then(|result| {
if let Err(e) = result {
panic!("failed to write to socket: {}", e)
@@ -205,7 +207,7 @@ mod udp {
// With UDP we could receive data from any source, so filter out
// anything coming from a different address
- Box::new(stream.filter_map(move |(src, chunk)| {
+ Box::new(stream.filter_map(move |(chunk, src)| {
if src == addr {
Some(chunk.into())
} else {
@@ -213,23 +215,6 @@ mod udp {
}
}))
}
-
- struct Bytes;
-
- impl UdpCodec for Bytes {
- type In = (SocketAddr, Vec<u8>);
- type Out = (SocketAddr, Vec<u8>);
- type Error = io::Error;
-
- fn decode(&mut self, addr: &SocketAddr, buf: &[u8]) -> io::Result<Self::In> {
- Ok((*addr, buf.to_vec()))
- }
-
- fn encode(&mut self, (addr, buf): Self::Out, into: &mut Vec<u8>) -> io::Result<SocketAddr> {
- into.extend(buf);
- Ok(addr)
- }
- }
}
// Our helper method which will read data from stdin and send it along the
diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs
index c874ebd7..a2429e49 100644
--- a/examples/udp-codec.rs
+++ b/examples/udp-codec.rs
@@ -1,40 +1,24 @@
-//! This is a basic example of leveraging `UdpCodec` to create a simple UDP
+//! This is a basic example of leveraging `BytesCodec` to create a simple UDP
//! client and server which speak a custom protocol.
//!
-//! Here we're using the a custom codec to convert a UDP socket to a stream of
+//! Here we're using the codec from tokio-io to convert a UDP socket to a stream of
//! client messages. These messages are then processed and returned back as a
//! new message with a new destination. Overall, we then use this to construct a
//! "ping pong" pair where two sockets are sending messages back and forth.
extern crate tokio;
+extern crate tokio_io;
extern crate env_logger;
extern crate futures;
extern crate futures_cpupool;
-use std::io;
use std::net::SocketAddr;
use futures::{Future, Stream, Sink};
use futures::future::Executor;
use futures_cpupool::CpuPool;
-use tokio::net::{UdpSocket, UdpCodec};
-
-pub struct LineCodec;
-
-impl UdpCodec for LineCodec {
- type In = (SocketAddr, Vec<u8>);
- type Out = (SocketAddr, Vec<u8>);
- type Error = io::Error;
-
- fn decode(&mut self, addr: &SocketAddr, buf: &[u8]) -> io::Result<Self::In> {
- Ok((*addr, buf.to_vec()))
- }
-
- fn encode(&mut self, (addr, buf): Self::Out, into: &mut Vec<u8>) -> io::Result<SocketAddr> {
- into.extend(buf);
- Ok(addr)
- }
-}
+use tokio::net::UdpSocket;
+use tokio_io::codec::BytesCodec;
fn main() {
drop(env_logger::init());
@@ -50,27 +34,27 @@ fn main() {
// We're parsing each socket with the `LineCodec` defined above, and then we
// `split` each codec into the sink/stream halves.
- let (a_sink, a_stream) = a.framed(LineCodec).split();
- let (b_sink, b_stream) = b.framed(LineCodec).split();
+ let (a_sink, a_stream) = a.framed(BytesCodec::new()).split();
+ let (b_sink, b_stream) = b.framed(BytesCodec::new()).split();
// Start off by sending a ping from a to b, afterwards we just print out
// what they send us and continually send pings
// let pings = stream::iter((0..5).map(Ok));
- let a = a_sink.send((b_addr, b"PING".to_vec())).and_then(|a_sink| {
+ let a = a_sink.send(("PING".into(), b_addr)).and_then(|a_sink| {
let mut i = 0;
- let a_stream = a_stream.take(4).map(move |(addr, msg)| {
+ let a_stream = a_stream.take(4).map(move |(msg, addr)| {
i += 1;
println!("[a] recv: {}", String::from_utf8_lossy(&msg));
- (addr, format!("PING {}", i).into_bytes())
+ (format!("PING {}", i).into(), addr)
});
a_sink.send_all(a_stream)
});
// The second client we have will receive the pings from `a` and then send
// back pongs.
- let b_stream = b_stream.map(|(addr, msg)| {
+ let b_stream = b_stream.map(|(msg, addr)| {
println!("[b] recv: {}", String::from_utf8_lossy(&msg));
- (addr, b"PONG".to_vec())
+ ("PONG".into(), addr)
});
let b = b_sink.send_all(b_stream);