From 1a48b79474402435f371c31f3121f9ec6cd9aab7 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Tue, 22 Nov 2016 12:35:30 -0800 Subject: Touch up examples to ensure consistency --- examples/udp-codec.rs | 153 +++++++++++++++++++------------------------------- 1 file changed, 58 insertions(+), 95 deletions(-) (limited to 'examples/udp-codec.rs') diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs index 894d21b4..d43160c8 100644 --- a/examples/udp-codec.rs +++ b/examples/udp-codec.rs @@ -1,59 +1,38 @@ +//! This is a basic example of leveraging `UdpCodec` to create a simple UDP +//! client and server which speak a custom protocol. +//! +//! Here we're using the a custom codec to convert a UDP socket to a stream of +//! client messages. These messages are then processed and returned back as a +//! new message with a new destination. Overall, we then use this to construct a +//! "ping pong" pair where two sockets are sending messages back and forth. + extern crate tokio_core; extern crate env_logger; extern crate futures; -#[macro_use] -extern crate log; - use std::io; -use std::net::{SocketAddr}; -use futures::{future, Future, Stream, Sink}; -use tokio_core::net::{UdpSocket, UdpCodec}; -use tokio_core::reactor::{Core, Timeout}; -use std::time::Duration; +use std::net::SocketAddr; use std::str; -/// This is a basic example of leveraging `FramedUdp` to create -/// a simple UDP client and server which speak a custom Protocol. -/// `FramedUdp` applies a `Codec` to the input and output of an -/// `Evented` +use futures::{Future, Stream, Sink}; +use tokio_core::net::{UdpSocket, UdpCodec}; +use tokio_core::reactor::Core; -/// Simple Newline based parser, -/// This is for a connectionless server, it must keep track -/// of the Socket address of the last peer to contact it -/// so that it can respond back. -/// In the real world, one would probably -/// want an associative of remote peers to their state -/// -/// Note that this takes a pretty draconian stance by returning -/// an error if it can't find a newline in the datagram it received -pub struct LineCodec { - addr : Option -} +pub struct LineCodec; impl UdpCodec for LineCodec { - type In = Vec>; - type Out = Vec; + type In = (SocketAddr, Vec); + type Out = (SocketAddr, Vec); - fn decode(&mut self, addr : &SocketAddr, buf: &[u8]) -> Result { - trace!("decoding {} - {}", str::from_utf8(buf).unwrap(), addr); - self.addr = Some(*addr); - let res : Vec> = buf.split(|c| *c == b'\n').map(|s| s.into()).collect(); - if res.len() > 0 { - Ok(res) - } - else { - Err(io::Error::new(io::ErrorKind::Other, - "failed to find newline in datagram")) - } + fn decode(&mut self, addr: &SocketAddr, buf: &[u8]) -> io::Result { + Ok((*addr, buf.to_vec())) } - fn encode(&mut self, item: Vec, into: &mut Vec) -> SocketAddr { - trace!("encoding {}", str::from_utf8(item.as_slice()).unwrap()); - into.extend_from_slice(item.as_slice()); - into.push('\n' as u8); - - self.addr.unwrap() + fn encode(&mut self, + (addr, buf): (SocketAddr, Vec), + into: &mut Vec) -> SocketAddr { + into.extend(buf); + return addr } } @@ -63,56 +42,40 @@ fn main() { let mut core = Core::new().unwrap(); let handle = core.handle(); - //create the line codec parser for each - let srvcodec = LineCodec { addr : None }; - let clicodec = LineCodec { addr : None }; - - let srvaddr : SocketAddr = "127.0.0.1:31999".parse().unwrap(); - let clientaddr : SocketAddr = "127.0.0.1:32000".parse().unwrap(); - - //We bind each socket to a specific port - let server = UdpSocket::bind(&srvaddr, &handle).unwrap(); - let client = UdpSocket::bind(&clientaddr, &handle).unwrap(); - - //start things off by sending a ping from the client to the server - //This doesn't utilize the codec to encode the message, but rather - //it sends raw data directly to the remote peer with the send_dgram future - let job = client.send_dgram(b"PING\n", srvaddr); - let (client, _buf) = core.run(job).unwrap(); - - //We create a FramedUdp instance, which associates a socket - //with a codec. We then immediate split that into the - //receiving side `Stream` and the writing side `Sink` - let (srvsink, srvstream) = server.framed(srvcodec).split(); - - //`Stream::fold` runs once per every received datagram. - //Note that we pass srvsink into fold, so that it can be - //supplied to every iteration. The reason for this is - //sink.send moves itself into `send` and then returns itself - let srvloop = srvstream.fold(srvsink, move |sink, lines| { - println!("{}", str::from_utf8(lines[0].as_slice()).unwrap()); - sink.send(b"PONG".to_vec()) - }).map(|_| ()); - - //We create another FramedUdp instance, this time for the client socket - let (clisink, clistream) = client.framed(clicodec).split(); - - //And another infinite iteration - let cliloop = clistream.fold(clisink, move |sink, lines| { - println!("{}", str::from_utf8(lines[0].as_slice()).unwrap()); - sink.send(b"PING".to_vec()) - }).map(|_| ()); - - let timeout = Timeout::new(Duration::from_millis(500), &handle).unwrap(); - - //`select_all` takes an `Iterable` of `Future` and returns a future itself - //This future waits until the first `Future` completes, it then returns - //that result. - let wait = future::select_all(vec![timeout.boxed(), srvloop.boxed(), cliloop.boxed()]); - - //Now we instruct `reactor::Core` to iterate, processing events until its future, `SelectAll` - //has completed - if let Err(e) = core.run(wait) { - error!("{}", e.0); - } + let addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); + + // Bind both our sockets and then figure out what ports we got. + let a = UdpSocket::bind(&addr, &handle).unwrap(); + let b = UdpSocket::bind(&addr, &handle).unwrap(); + let b_addr = b.local_addr().unwrap(); + + // We're parsing each socket with the `LineCodec` defined above, and then we + // `split` each codec into the sink/stream halves. + let (a_sink, a_stream) = a.framed(LineCodec).split(); + let (b_sink, b_stream) = b.framed(LineCodec).split(); + + // Start off by sending a ping from a to b, afterwards we just print out + // what they send us and continually send pings + // let pings = stream::iter((0..5).map(Ok)); + let a = a_sink.send((b_addr, b"PING".to_vec())).and_then(|a_sink| { + let mut i = 0; + let a_stream = a_stream.take(4).map(move |(addr, msg)| { + i += 1; + println!("[a] recv: {}", String::from_utf8_lossy(&msg)); + (addr, format!("PING {}", i).into_bytes()) + }); + a_sink.send_all(a_stream) + }); + + // The second client we have will receive the pings from `a` and then send + // back pongs. + let b_stream = b_stream.map(|(addr, msg)| { + println!("[b] recv: {}", String::from_utf8_lossy(&msg)); + (addr, b"PONG".to_vec()) + }); + let b = b_sink.send_all(b_stream); + + // Spawn the sender of pongs and then wait for our pinger to finish. + handle.spawn(b.then(|_| Ok(()))); + drop(core.run(a)); } -- cgit v1.2.3