summaryrefslogtreecommitdiffstats
path: root/examples/udp-codec.rs
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2016-11-22 12:35:30 -0800
committerAlex Crichton <alex@alexcrichton.com>2016-11-22 12:35:30 -0800
commit1a48b79474402435f371c31f3121f9ec6cd9aab7 (patch)
tree16f5a62a38808277f9564d049e95730eee7a9d7e /examples/udp-codec.rs
parent9b62ade9629a44a792ba0243af52613d7283cec0 (diff)
Touch up examples to ensure consistency
Diffstat (limited to 'examples/udp-codec.rs')
-rw-r--r--examples/udp-codec.rs153
1 files changed, 58 insertions, 95 deletions
diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs
index 894d21b4..d43160c8 100644
--- a/examples/udp-codec.rs
+++ b/examples/udp-codec.rs
@@ -1,59 +1,38 @@
+//! This is a basic example of leveraging `UdpCodec` to create a simple UDP
+//! client and server which speak a custom protocol.
+//!
+//! Here we're using the a custom codec to convert a UDP socket to a stream of
+//! client messages. These messages are then processed and returned back as a
+//! new message with a new destination. Overall, we then use this to construct a
+//! "ping pong" pair where two sockets are sending messages back and forth.
+
extern crate tokio_core;
extern crate env_logger;
extern crate futures;
-#[macro_use]
-extern crate log;
-
use std::io;
-use std::net::{SocketAddr};
-use futures::{future, Future, Stream, Sink};
-use tokio_core::net::{UdpSocket, UdpCodec};
-use tokio_core::reactor::{Core, Timeout};
-use std::time::Duration;
+use std::net::SocketAddr;
use std::str;
-/// This is a basic example of leveraging `FramedUdp` to create
-/// a simple UDP client and server which speak a custom Protocol.
-/// `FramedUdp` applies a `Codec` to the input and output of an
-/// `Evented`
+use futures::{Future, Stream, Sink};
+use tokio_core::net::{UdpSocket, UdpCodec};
+use tokio_core::reactor::Core;
-/// Simple Newline based parser,
-/// This is for a connectionless server, it must keep track
-/// of the Socket address of the last peer to contact it
-/// so that it can respond back.
-/// In the real world, one would probably
-/// want an associative of remote peers to their state
-///
-/// Note that this takes a pretty draconian stance by returning
-/// an error if it can't find a newline in the datagram it received
-pub struct LineCodec {
- addr : Option<SocketAddr>
-}
+pub struct LineCodec;
impl UdpCodec for LineCodec {
- type In = Vec<Vec<u8>>;
- type Out = Vec<u8>;
+ type In = (SocketAddr, Vec<u8>);
+ type Out = (SocketAddr, Vec<u8>);
- fn decode(&mut self, addr : &SocketAddr, buf: &[u8]) -> Result<Self::In, io::Error> {
- trace!("decoding {} - {}", str::from_utf8(buf).unwrap(), addr);
- self.addr = Some(*addr);
- let res : Vec<Vec<u8>> = buf.split(|c| *c == b'\n').map(|s| s.into()).collect();
- if res.len() > 0 {
- Ok(res)
- }
- else {
- Err(io::Error::new(io::ErrorKind::Other,
- "failed to find newline in datagram"))
- }
+ fn decode(&mut self, addr: &SocketAddr, buf: &[u8]) -> io::Result<Self::In> {
+ Ok((*addr, buf.to_vec()))
}
- fn encode(&mut self, item: Vec<u8>, into: &mut Vec<u8>) -> SocketAddr {
- trace!("encoding {}", str::from_utf8(item.as_slice()).unwrap());
- into.extend_from_slice(item.as_slice());
- into.push('\n' as u8);
-
- self.addr.unwrap()
+ fn encode(&mut self,
+ (addr, buf): (SocketAddr, Vec<u8>),
+ into: &mut Vec<u8>) -> SocketAddr {
+ into.extend(buf);
+ return addr
}
}
@@ -63,56 +42,40 @@ fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
- //create the line codec parser for each
- let srvcodec = LineCodec { addr : None };
- let clicodec = LineCodec { addr : None };
-
- let srvaddr : SocketAddr = "127.0.0.1:31999".parse().unwrap();
- let clientaddr : SocketAddr = "127.0.0.1:32000".parse().unwrap();
-
- //We bind each socket to a specific port
- let server = UdpSocket::bind(&srvaddr, &handle).unwrap();
- let client = UdpSocket::bind(&clientaddr, &handle).unwrap();
-
- //start things off by sending a ping from the client to the server
- //This doesn't utilize the codec to encode the message, but rather
- //it sends raw data directly to the remote peer with the send_dgram future
- let job = client.send_dgram(b"PING\n", srvaddr);
- let (client, _buf) = core.run(job).unwrap();
-
- //We create a FramedUdp instance, which associates a socket
- //with a codec. We then immediate split that into the
- //receiving side `Stream` and the writing side `Sink`
- let (srvsink, srvstream) = server.framed(srvcodec).split();
-
- //`Stream::fold` runs once per every received datagram.
- //Note that we pass srvsink into fold, so that it can be
- //supplied to every iteration. The reason for this is
- //sink.send moves itself into `send` and then returns itself
- let srvloop = srvstream.fold(srvsink, move |sink, lines| {
- println!("{}", str::from_utf8(lines[0].as_slice()).unwrap());
- sink.send(b"PONG".to_vec())
- }).map(|_| ());
-
- //We create another FramedUdp instance, this time for the client socket
- let (clisink, clistream) = client.framed(clicodec).split();
-
- //And another infinite iteration
- let cliloop = clistream.fold(clisink, move |sink, lines| {
- println!("{}", str::from_utf8(lines[0].as_slice()).unwrap());
- sink.send(b"PING".to_vec())
- }).map(|_| ());
-
- let timeout = Timeout::new(Duration::from_millis(500), &handle).unwrap();
-
- //`select_all` takes an `Iterable` of `Future` and returns a future itself
- //This future waits until the first `Future` completes, it then returns
- //that result.
- let wait = future::select_all(vec![timeout.boxed(), srvloop.boxed(), cliloop.boxed()]);
-
- //Now we instruct `reactor::Core` to iterate, processing events until its future, `SelectAll`
- //has completed
- if let Err(e) = core.run(wait) {
- error!("{}", e.0);
- }
+ let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
+
+ // Bind both our sockets and then figure out what ports we got.
+ let a = UdpSocket::bind(&addr, &handle).unwrap();
+ let b = UdpSocket::bind(&addr, &handle).unwrap();
+ let b_addr = b.local_addr().unwrap();
+
+ // We're parsing each socket with the `LineCodec` defined above, and then we
+ // `split` each codec into the sink/stream halves.
+ let (a_sink, a_stream) = a.framed(LineCodec).split();
+ let (b_sink, b_stream) = b.framed(LineCodec).split();
+
+ // Start off by sending a ping from a to b, afterwards we just print out
+ // what they send us and continually send pings
+ // let pings = stream::iter((0..5).map(Ok));
+ let a = a_sink.send((b_addr, b"PING".to_vec())).and_then(|a_sink| {
+ let mut i = 0;
+ let a_stream = a_stream.take(4).map(move |(addr, msg)| {
+ i += 1;
+ println!("[a] recv: {}", String::from_utf8_lossy(&msg));
+ (addr, format!("PING {}", i).into_bytes())
+ });
+ a_sink.send_all(a_stream)
+ });
+
+ // The second client we have will receive the pings from `a` and then send
+ // back pongs.
+ let b_stream = b_stream.map(|(addr, msg)| {
+ println!("[b] recv: {}", String::from_utf8_lossy(&msg));
+ (addr, b"PONG".to_vec())
+ });
+ let b = b_sink.send_all(b_stream);
+
+ // Spawn the sender of pongs and then wait for our pinger to finish.
+ handle.spawn(b.then(|_| Ok(())));
+ drop(core.run(a));
}