summaryrefslogtreecommitdiffstats
path: root/examples/udp-codec.rs
diff options
context:
space:
mode:
authorRick Richardson <rick.richardson@gmail.com>2016-11-20 11:40:43 -0800
committerRick Richardson <rick.richardson@gmail.com>2016-11-20 11:40:43 -0800
commit161811de8b59b66f156599807c779c89ba261817 (patch)
tree515a14be1d6fa0ddd83aefd8be129be5e565b26c /examples/udp-codec.rs
parent71d8672aab2b6c4712942783920e01db578eac9c (diff)
moved udp test to examples, optimized buffer handling
Diffstat (limited to 'examples/udp-codec.rs')
-rw-r--r--examples/udp-codec.rs114
1 files changed, 114 insertions, 0 deletions
diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs
new file mode 100644
index 00000000..fd6d9874
--- /dev/null
+++ b/examples/udp-codec.rs
@@ -0,0 +1,114 @@
+extern crate tokio_core;
+extern crate env_logger;
+extern crate futures;
+
+#[macro_use]
+extern crate log;
+
+use std::io;
+use std::net::{SocketAddr};
+use futures::{future, Future, Stream, Sink};
+use tokio_core::io::{CodecUdp};
+use tokio_core::net::{UdpSocket};
+use tokio_core::reactor::{Core, Timeout};
+use std::time::Duration;
+use std::str;
+
+/// This is a basic example of leveraging `FramedUdp` to create
+/// a simple UDP client and server which speak a custom Protocol.
+/// `FramedUdp` applies a `Codec` to the input and output of an
+/// `Evented`
+
+/// Simple Newline based parser,
+/// This is for a connectionless server, it must keep track
+/// of the Socket address of the last peer to contact it
+/// so that it can respond back.
+/// In the real world, one would probably
+/// want an associative of remote peers to their state
+pub struct LineCodec {
+ addr : Option<SocketAddr>
+}
+
+impl CodecUdp for LineCodec {
+ type In = Vec<u8>;
+ type Out = Vec<u8>;
+
+ fn decode(&mut self, addr : &SocketAddr, buf: &[u8]) -> Result<Option<Self::In>, io::Error> {
+ trace!("decoding {} - {}", str::from_utf8(buf).unwrap(), addr);
+ self.addr = Some(*addr);
+ match buf.iter().position(|&b| b == b'\n') {
+ Some(i) => Ok(Some(buf[.. i].into())),
+ None => Ok(None),
+ }
+ }
+
+ fn encode(&mut self, item: &Vec<u8>, into: &mut Vec<u8>) -> SocketAddr {
+ trace!("encoding {}", str::from_utf8(item.as_slice()).unwrap());
+ into.extend_from_slice(item.as_slice());
+ into.push('\n' as u8);
+
+ self.addr.unwrap()
+ }
+}
+
+fn main() {
+ drop(env_logger::init());
+
+ let mut core = Core::new().unwrap();
+ let handle = core.handle();
+
+ //create the line codec parser for each
+ let srvcodec = LineCodec { addr : None };
+ let clicodec = LineCodec { addr : None };
+
+ let srvaddr : SocketAddr = "127.0.0.1:31999".parse().unwrap();
+ let clientaddr : SocketAddr = "127.0.0.1:32000".parse().unwrap();
+
+ //We bind each socket to a specific port
+ let server = UdpSocket::bind(&srvaddr, &handle).unwrap();
+ let client = UdpSocket::bind(&clientaddr, &handle).unwrap();
+
+ //start things off by sending a ping from the client to the server
+ //This doesn't go through the codec to encode the message, but rather
+ //it sends raw data with the send_dgram future
+ {
+ let job = client.send_dgram(b"PING\n", &srvaddr);
+ core.run(job).unwrap();
+ }
+
+ //We create a FramedUdp instance, which associates a socket
+ //with a codec. We then immediate split that into the
+ //receiving side `Stream` and the writing side `Sink`
+ let (srvstream, srvsink) = server.framed(srvcodec).split();
+
+ //`Stream::fold` runs once per every received datagram.
+ //Note that we pass srvsink into fold, so that it can be
+ //supplied to every iteration. The reason for this is
+ //sink.send moves itself into `send` and then returns itself
+ let srvloop = srvstream.fold(srvsink, move |sink, buf| {
+ println!("{}", str::from_utf8(buf.as_slice()).unwrap());
+ sink.send(b"PONG".to_vec())
+ }).map(|_| ());
+
+ //We create another FramedUdp instance, this time for the client socket
+ let (clistream, clisink) = client.framed(clicodec).split();
+
+ //And another infinite iteration
+ let cliloop = clistream.fold(clisink, move |sink, buf| {
+ println!("{}", str::from_utf8(buf.as_slice()).unwrap());
+ sink.send(b"PING".to_vec())
+ }).map(|_| ());
+
+ let timeout = Timeout::new(Duration::from_millis(500), &handle).unwrap();
+
+ //`select_all` takes an `Iterable` of `Future` and returns a future itself
+ //This future waits until the first `Future` completes, it then returns
+ //that result.
+ let wait = future::select_all(vec![timeout.boxed(), srvloop.boxed(), cliloop.boxed()]);
+
+ //Now we instruct `reactor::Core` to iterate, processing events until its future, `SelectAll`
+ //has completed
+ if let Err(e) core.run(wait) {
+ error!("{}", e.0);
+ }
+}