diff options
author | Rick Richardson <rick.richardson@gmail.com> | 2016-11-20 11:40:43 -0800 |
---|---|---|
committer | Rick Richardson <rick.richardson@gmail.com> | 2016-11-20 11:40:43 -0800 |
commit | 161811de8b59b66f156599807c779c89ba261817 (patch) | |
tree | 515a14be1d6fa0ddd83aefd8be129be5e565b26c | |
parent | 71d8672aab2b6c4712942783920e01db578eac9c (diff) |
moved udp test to examples, optimized buffer handling
-rw-r--r-- | examples/udp-codec.rs | 114 | ||||
-rw-r--r-- | src/io/udp_frame.rs | 19 | ||||
-rw-r--r-- | src/net/udp.rs | 87 | ||||
-rw-r--r-- | tests/udp-line-frames.rs | 74 |
4 files changed, 209 insertions, 85 deletions
diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs new file mode 100644 index 00000000..fd6d9874 --- /dev/null +++ b/examples/udp-codec.rs @@ -0,0 +1,114 @@ +extern crate tokio_core; +extern crate env_logger; +extern crate futures; + +#[macro_use] +extern crate log; + +use std::io; +use std::net::{SocketAddr}; +use futures::{future, Future, Stream, Sink}; +use tokio_core::io::{CodecUdp}; +use tokio_core::net::{UdpSocket}; +use tokio_core::reactor::{Core, Timeout}; +use std::time::Duration; +use std::str; + +/// This is a basic example of leveraging `FramedUdp` to create +/// a simple UDP client and server which speak a custom Protocol. +/// `FramedUdp` applies a `Codec` to the input and output of an +/// `Evented` + +/// Simple Newline based parser, +/// This is for a connectionless server, it must keep track +/// of the Socket address of the last peer to contact it +/// so that it can respond back. +/// In the real world, one would probably +/// want an associative of remote peers to their state +pub struct LineCodec { + addr : Option<SocketAddr> +} + +impl CodecUdp for LineCodec { + type In = Vec<u8>; + type Out = Vec<u8>; + + fn decode(&mut self, addr : &SocketAddr, buf: &[u8]) -> Result<Option<Self::In>, io::Error> { + trace!("decoding {} - {}", str::from_utf8(buf).unwrap(), addr); + self.addr = Some(*addr); + match buf.iter().position(|&b| b == b'\n') { + Some(i) => Ok(Some(buf[.. i].into())), + None => Ok(None), + } + } + + fn encode(&mut self, item: &Vec<u8>, into: &mut Vec<u8>) -> SocketAddr { + trace!("encoding {}", str::from_utf8(item.as_slice()).unwrap()); + into.extend_from_slice(item.as_slice()); + into.push('\n' as u8); + + self.addr.unwrap() + } +} + +fn main() { + drop(env_logger::init()); + + let mut core = Core::new().unwrap(); + let handle = core.handle(); + + //create the line codec parser for each + let srvcodec = LineCodec { addr : None }; + let clicodec = LineCodec { addr : None }; + + let srvaddr : SocketAddr = "127.0.0.1:31999".parse().unwrap(); + let clientaddr : SocketAddr = "127.0.0.1:32000".parse().unwrap(); + + //We bind each socket to a specific port + let server = UdpSocket::bind(&srvaddr, &handle).unwrap(); + let client = UdpSocket::bind(&clientaddr, &handle).unwrap(); + + //start things off by sending a ping from the client to the server + //This doesn't go through the codec to encode the message, but rather + //it sends raw data with the send_dgram future + { + let job = client.send_dgram(b"PING\n", &srvaddr); + core.run(job).unwrap(); + } + + //We create a FramedUdp instance, which associates a socket + //with a codec. We then immediate split that into the + //receiving side `Stream` and the writing side `Sink` + let (srvstream, srvsink) = server.framed(srvcodec).split(); + + //`Stream::fold` runs once per every received datagram. + //Note that we pass srvsink into fold, so that it can be + //supplied to every iteration. The reason for this is + //sink.send moves itself into `send` and then returns itself + let srvloop = srvstream.fold(srvsink, move |sink, buf| { + println!("{}", str::from_utf8(buf.as_slice()).unwrap()); + sink.send(b"PONG".to_vec()) + }).map(|_| ()); + + //We create another FramedUdp instance, this time for the client socket + let (clistream, clisink) = client.framed(clicodec).split(); + + //And another infinite iteration + let cliloop = clistream.fold(clisink, move |sink, buf| { + println!("{}", str::from_utf8(buf.as_slice()).unwrap()); + sink.send(b"PING".to_vec()) + }).map(|_| ()); + + let timeout = Timeout::new(Duration::from_millis(500), &handle).unwrap(); + + //`select_all` takes an `Iterable` of `Future` and returns a future itself + //This future waits until the first `Future` completes, it then returns + //that result. + let wait = future::select_all(vec![timeout.boxed(), srvloop.boxed(), cliloop.boxed()]); + + //Now we instruct `reactor::Core` to iterate, processing events until its future, `SelectAll` + //has completed + if let Err(e) core.run(wait) { + error!("{}", e.0); + } +} diff --git a/src/io/udp_frame.rs b/src/io/udp_frame.rs index a1b64db0..1996ac59 100644 --- a/src/io/udp_frame.rs +++ b/src/io/udp_frame.rs @@ -34,7 +34,7 @@ pub trait CodecUdp { /// /// The encode method also determines the destination to which the buffer should /// be directed, which will be returned as a SocketAddr; - fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> SocketAddr; + fn encode(&mut self, msg: &Self::Out, buf: &mut Vec<u8>) -> SocketAddr; /// Attempts to decode a frame from the provided buffer of bytes. /// @@ -55,7 +55,7 @@ pub trait CodecUdp { /// returned indicating why. This informs `Framed` that the stream is now /// corrupt and should be terminated. /// - fn decode(&mut self, src: &SocketAddr, buf: &mut Vec<u8>) -> Result<Option<Self::In>, io::Error>; + fn decode(&mut self, src: &SocketAddr, buf: &[u8]) -> Result<Option<Self::In>, io::Error>; } /// A unified `Stream` and `Sink` interface to an underlying `Io` object, using @@ -82,9 +82,8 @@ impl<C : CodecUdp> Stream for FramedUdp<C> { Ok((n, addr)) => { trace!("read {} bytes", n); trace!("attempting to decode a frame"); - if let Some(frame) = try!(self.codec.decode(&addr, &mut self.rd)) { + if let Some(frame) = try!(self.codec.decode(&addr, & self.rd[.. n])) { trace!("frame decoded from buffer"); - self.rd.clear(); return Ok(Async::Ready(Some(frame))); } } @@ -109,7 +108,7 @@ impl<C : CodecUdp> Sink for FramedUdp<C> { } } - self.out_addr = Some(self.codec.encode(item, &mut self.wr)); + self.out_addr = Some(self.codec.encode(&item, &mut self.wr)); Ok(AsyncSink::Ready) } @@ -118,11 +117,13 @@ impl<C : CodecUdp> Sink for FramedUdp<C> { while !self.wr.is_empty() { if let Some(outaddr) = self.out_addr { - trace!("writing; remaining={}", self.wr.len()); - let n = try_nb!(self.socket.send_to(&self.wr, &outaddr)); + let remaining = self.wr.len(); + trace!("writing; remaining={}", remaining); + let n = try_nb!(self.socket.send_to(self.wr.as_slice(), &outaddr)); + trace!("written {}", n); self.wr.clear(); self.out_addr = None; - if n != self.wr.len() { + if n != remaining { return Err(io::Error::new(io::ErrorKind::WriteZero, "failed to write frame datagram to socket")); } @@ -143,7 +144,7 @@ pub fn framed_udp<C>(socket : UdpSocket, codec : C) -> FramedUdp<C> { FramedUdp::new( socket, codec, - Vec::with_capacity(64 * 1024), + vec![0; 64 * 1024], Vec::with_capacity(64 * 1024) ) } diff --git a/src/net/udp.rs b/src/net/udp.rs index 34de052c..109b759a 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -2,8 +2,9 @@ use std::io; use std::net::{self, SocketAddr, Ipv4Addr, Ipv6Addr}; use std::fmt; use io::FramedUdp; -use futures::Async; +use futures::{Async, Future, Poll}; use mio; +use std::mem; use reactor::{Handle, PollEvented}; @@ -49,7 +50,7 @@ impl UdpSocket { FramedUdp::new( self, codec, - Vec::with_capacity(64 * 1024), + vec![0; 64 * 1024], Vec::with_capacity(64 * 1024) ) } @@ -97,6 +98,33 @@ impl UdpSocket { Err(e) => Err(e), } } + + /// Creates a future that will write the entire contents of the buffer `buf` to + /// the stream `a` provided. + /// + /// The returned future will return after data has been written to the outbound + /// socket. + /// The future will resolve to the stream as well as the buffer (for reuse if + /// needed). + /// + /// Any error which happens during writing will cause both the stream and the + /// buffer to get destroyed. + /// + /// The `buf` parameter here only requires the `AsRef<[u8]>` trait, which should + /// be broadly applicable to accepting data which can be converted to a slice. + /// The `Window` struct is also available in this crate to provide a different + /// window into a slice if necessary. + pub fn send_dgram<'a, T>(&'a self, buf: T, addr : &'a SocketAddr) -> SendDGram<T> + where T: AsRef<[u8]>, + { + SendDGram { + state: UdpState::Writing { + sock: self, + addr: addr, + buf: buf, + }, + } + } /// Receives data from the socket. On success, returns the number of bytes /// read and the address from whence the data came. @@ -261,6 +289,61 @@ impl fmt::Debug for UdpSocket { } } +/// A future used to write the entire contents of some data to a stream. +/// +/// This is created by the [`write_all`] top-level method. +/// +/// [`write_all`]: fn.write_all.html +pub struct SendDGram<'a, T> { + state: UdpState<'a, T>, +} + +enum UdpState<'a, T> { + Writing { + sock: &'a UdpSocket, + buf: T, + addr: &'a SocketAddr, + }, + Empty, +} + + +fn zero_write() -> io::Error { + io::Error::new(io::ErrorKind::WriteZero, "zero-length write") +} + +fn incomplete_write(reason : &str) -> io::Error { + io::Error::new(io::ErrorKind::Other, reason) +} + +impl<'a, T> Future for SendDGram<'a, T> + where T: AsRef<[u8]>, +{ + type Item = T; + type Error = io::Error; + + fn poll(&mut self) -> Poll<T, io::Error> { + match self.state { + UdpState::Writing { ref sock, ref buf, ref addr} => { + let buf = buf.as_ref(); + let n = try_nb!(sock.send_to(&buf, addr)); + if n == 0 { + return Err(zero_write()) + } + if n != buf.len() { + return Err(incomplete_write("Failed to send entire message in datagram")) + } + } + UdpState::Empty => panic!("poll a SendAllTo after it's done"), + } + + match mem::replace(&mut self.state, UdpState::Empty) { + UdpState::Writing { buf, .. } => Ok((buf).into()), + UdpState::Empty => panic!(), + } + } +} + #[cfg(unix)] mod sys { use std::os::unix::prelude::*; diff --git a/tests/udp-line-frames.rs b/tests/udp-line-frames.rs deleted file mode 100644 index 90e8968d..00000000 --- a/tests/udp-line-frames.rs +++ /dev/null @@ -1,74 +0,0 @@ -extern crate tokio_core; -extern crate env_logger; -extern crate futures; - -use std::io; -use std::net::{SocketAddr}; -use futures::{future, Future, Stream, Sink, IntoFuture}; -use tokio_core::io::{write_all, read, FramedUdp, CodecUdp, Io}; -use tokio_core::net::{UdpSocket}; -use tokio_core::reactor::{Core, Timeout}; -use std::time::Duration; -use std::str; - -pub struct LineCodec { - addr : Option<SocketAddr> -} - -impl CodecUdp for LineCodec { - type In = Vec<u8>; - type Out = Vec<u8>; - - fn decode(&mut self, addr : &SocketAddr, buf: &mut Vec<u8>) -> Result<Option<Self::In>, io::Error> { - self.addr = Some(*addr); - match buf.as_slice().iter().position(|&b| b == b'\n') { - Some(i) => Ok(Some(buf[.. i + 1].into())), - None => Ok(None), - } - } - - fn encode(&mut self, item: Vec<u8>, into: &mut Vec<u8>) -> SocketAddr { - into.extend_from_slice(item.as_slice()); - into.push('\n' as u8); - - self.addr.unwrap() - } -} - -#[test] -fn echo() { - drop(env_logger::init()); - - let mut core = Core::new().unwrap(); - let handle = core.handle(); - - let srvcodec = LineCodec { addr : None }; - let clicodec = LineCodec { addr : None }; - - let srvaddr : SocketAddr = "127.0.0.1:31999".parse().unwrap(); - let clientaddr : SocketAddr = "127.0.0.1:32000".parse().unwrap(); - - let server = UdpSocket::bind(&srvaddr, &handle).unwrap(); - let client = UdpSocket::bind(&clientaddr, &handle).unwrap(); - - let job = client.send_to(b"PING", &srvaddr); - let _ = core.run(job.into_future()).unwrap(); - - let (srvstream, srvsink) = server.framed(srvcodec).split(); - let srvloop = srvstream.for_each(move |buf| { - println!("{}", str::from_utf8(buf.as_slice()).unwrap()); - srvsink.send(b"PONG".to_vec()).map(|_| ()).wait() - }); - - let (clistream, clisink) = client.framed(clicodec).split(); - let cliloop = clistream.for_each(move |buf| { - println!("{}", str::from_utf8(buf.as_slice()).unwrap()); - clisink.send(b"PING".to_vec()).map(|_| ()).wait() - }); - - let timeout = Timeout::new(Duration::from_millis(500), &handle).unwrap(); - - let wait = future::select_all(vec![timeout.boxed(), srvloop.boxed(), cliloop.boxed()]); - core.run(wait); - -} |