summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRick Richardson <rick.richardson@gmail.com>2016-11-20 11:40:43 -0800
committerRick Richardson <rick.richardson@gmail.com>2016-11-20 11:40:43 -0800
commit161811de8b59b66f156599807c779c89ba261817 (patch)
tree515a14be1d6fa0ddd83aefd8be129be5e565b26c
parent71d8672aab2b6c4712942783920e01db578eac9c (diff)
moved udp test to examples, optimized buffer handling
-rw-r--r--examples/udp-codec.rs114
-rw-r--r--src/io/udp_frame.rs19
-rw-r--r--src/net/udp.rs87
-rw-r--r--tests/udp-line-frames.rs74
4 files changed, 209 insertions, 85 deletions
diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs
new file mode 100644
index 00000000..fd6d9874
--- /dev/null
+++ b/examples/udp-codec.rs
@@ -0,0 +1,114 @@
+extern crate tokio_core;
+extern crate env_logger;
+extern crate futures;
+
+#[macro_use]
+extern crate log;
+
+use std::io;
+use std::net::{SocketAddr};
+use futures::{future, Future, Stream, Sink};
+use tokio_core::io::{CodecUdp};
+use tokio_core::net::{UdpSocket};
+use tokio_core::reactor::{Core, Timeout};
+use std::time::Duration;
+use std::str;
+
+/// This is a basic example of leveraging `FramedUdp` to create
+/// a simple UDP client and server which speak a custom Protocol.
+/// `FramedUdp` applies a `Codec` to the input and output of an
+/// `Evented`
+
+/// Simple Newline based parser,
+/// This is for a connectionless server, it must keep track
+/// of the Socket address of the last peer to contact it
+/// so that it can respond back.
+/// In the real world, one would probably
+/// want an associative of remote peers to their state
+pub struct LineCodec {
+ addr : Option<SocketAddr>
+}
+
+impl CodecUdp for LineCodec {
+ type In = Vec<u8>;
+ type Out = Vec<u8>;
+
+ fn decode(&mut self, addr : &SocketAddr, buf: &[u8]) -> Result<Option<Self::In>, io::Error> {
+ trace!("decoding {} - {}", str::from_utf8(buf).unwrap(), addr);
+ self.addr = Some(*addr);
+ match buf.iter().position(|&b| b == b'\n') {
+ Some(i) => Ok(Some(buf[.. i].into())),
+ None => Ok(None),
+ }
+ }
+
+ fn encode(&mut self, item: &Vec<u8>, into: &mut Vec<u8>) -> SocketAddr {
+ trace!("encoding {}", str::from_utf8(item.as_slice()).unwrap());
+ into.extend_from_slice(item.as_slice());
+ into.push('\n' as u8);
+
+ self.addr.unwrap()
+ }
+}
+
+fn main() {
+ drop(env_logger::init());
+
+ let mut core = Core::new().unwrap();
+ let handle = core.handle();
+
+ //create the line codec parser for each
+ let srvcodec = LineCodec { addr : None };
+ let clicodec = LineCodec { addr : None };
+
+ let srvaddr : SocketAddr = "127.0.0.1:31999".parse().unwrap();
+ let clientaddr : SocketAddr = "127.0.0.1:32000".parse().unwrap();
+
+ //We bind each socket to a specific port
+ let server = UdpSocket::bind(&srvaddr, &handle).unwrap();
+ let client = UdpSocket::bind(&clientaddr, &handle).unwrap();
+
+ //start things off by sending a ping from the client to the server
+ //This doesn't go through the codec to encode the message, but rather
+ //it sends raw data with the send_dgram future
+ {
+ let job = client.send_dgram(b"PING\n", &srvaddr);
+ core.run(job).unwrap();
+ }
+
+ //We create a FramedUdp instance, which associates a socket
+ //with a codec. We then immediate split that into the
+ //receiving side `Stream` and the writing side `Sink`
+ let (srvstream, srvsink) = server.framed(srvcodec).split();
+
+ //`Stream::fold` runs once per every received datagram.
+ //Note that we pass srvsink into fold, so that it can be
+ //supplied to every iteration. The reason for this is
+ //sink.send moves itself into `send` and then returns itself
+ let srvloop = srvstream.fold(srvsink, move |sink, buf| {
+ println!("{}", str::from_utf8(buf.as_slice()).unwrap());
+ sink.send(b"PONG".to_vec())
+ }).map(|_| ());
+
+ //We create another FramedUdp instance, this time for the client socket
+ let (clistream, clisink) = client.framed(clicodec).split();
+
+ //And another infinite iteration
+ let cliloop = clistream.fold(clisink, move |sink, buf| {
+ println!("{}", str::from_utf8(buf.as_slice()).unwrap());
+ sink.send(b"PING".to_vec())
+ }).map(|_| ());
+
+ let timeout = Timeout::new(Duration::from_millis(500), &handle).unwrap();
+
+ //`select_all` takes an `Iterable` of `Future` and returns a future itself
+ //This future waits until the first `Future` completes, it then returns
+ //that result.
+ let wait = future::select_all(vec![timeout.boxed(), srvloop.boxed(), cliloop.boxed()]);
+
+ //Now we instruct `reactor::Core` to iterate, processing events until its future, `SelectAll`
+ //has completed
+ if let Err(e) core.run(wait) {
+ error!("{}", e.0);
+ }
+}
diff --git a/src/io/udp_frame.rs b/src/io/udp_frame.rs
index a1b64db0..1996ac59 100644
--- a/src/io/udp_frame.rs
+++ b/src/io/udp_frame.rs
@@ -34,7 +34,7 @@ pub trait CodecUdp {
///
/// The encode method also determines the destination to which the buffer should
/// be directed, which will be returned as a SocketAddr;
- fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> SocketAddr;
+ fn encode(&mut self, msg: &Self::Out, buf: &mut Vec<u8>) -> SocketAddr;
/// Attempts to decode a frame from the provided buffer of bytes.
///
@@ -55,7 +55,7 @@ pub trait CodecUdp {
/// returned indicating why. This informs `Framed` that the stream is now
/// corrupt and should be terminated.
///
- fn decode(&mut self, src: &SocketAddr, buf: &mut Vec<u8>) -> Result<Option<Self::In>, io::Error>;
+ fn decode(&mut self, src: &SocketAddr, buf: &[u8]) -> Result<Option<Self::In>, io::Error>;
}
/// A unified `Stream` and `Sink` interface to an underlying `Io` object, using
@@ -82,9 +82,8 @@ impl<C : CodecUdp> Stream for FramedUdp<C> {
Ok((n, addr)) => {
trace!("read {} bytes", n);
trace!("attempting to decode a frame");
- if let Some(frame) = try!(self.codec.decode(&addr, &mut self.rd)) {
+ if let Some(frame) = try!(self.codec.decode(&addr, & self.rd[.. n])) {
trace!("frame decoded from buffer");
- self.rd.clear();
return Ok(Async::Ready(Some(frame)));
}
}
@@ -109,7 +108,7 @@ impl<C : CodecUdp> Sink for FramedUdp<C> {
}
}
- self.out_addr = Some(self.codec.encode(item, &mut self.wr));
+ self.out_addr = Some(self.codec.encode(&item, &mut self.wr));
Ok(AsyncSink::Ready)
}
@@ -118,11 +117,13 @@ impl<C : CodecUdp> Sink for FramedUdp<C> {
while !self.wr.is_empty() {
if let Some(outaddr) = self.out_addr {
- trace!("writing; remaining={}", self.wr.len());
- let n = try_nb!(self.socket.send_to(&self.wr, &outaddr));
+ let remaining = self.wr.len();
+ trace!("writing; remaining={}", remaining);
+ let n = try_nb!(self.socket.send_to(self.wr.as_slice(), &outaddr));
+ trace!("written {}", n);
self.wr.clear();
self.out_addr = None;
- if n != self.wr.len() {
+ if n != remaining {
return Err(io::Error::new(io::ErrorKind::WriteZero,
"failed to write frame datagram to socket"));
}
@@ -143,7 +144,7 @@ pub fn framed_udp<C>(socket : UdpSocket, codec : C) -> FramedUdp<C> {
FramedUdp::new(
socket,
codec,
- Vec::with_capacity(64 * 1024),
+ vec![0; 64 * 1024],
Vec::with_capacity(64 * 1024)
)
}
diff --git a/src/net/udp.rs b/src/net/udp.rs
index 34de052c..109b759a 100644
--- a/src/net/udp.rs
+++ b/src/net/udp.rs
@@ -2,8 +2,9 @@ use std::io;
use std::net::{self, SocketAddr, Ipv4Addr, Ipv6Addr};
use std::fmt;
use io::FramedUdp;
-use futures::Async;
+use futures::{Async, Future, Poll};
use mio;
+use std::mem;
use reactor::{Handle, PollEvented};
@@ -49,7 +50,7 @@ impl UdpSocket {
FramedUdp::new(
self,
codec,
- Vec::with_capacity(64 * 1024),
+ vec![0; 64 * 1024],
Vec::with_capacity(64 * 1024)
)
}
@@ -97,6 +98,33 @@ impl UdpSocket {
Err(e) => Err(e),
}
}
+
+ /// Creates a future that will write the entire contents of the buffer `buf` to
+ /// the stream `a` provided.
+ ///
+ /// The returned future will return after data has been written to the outbound
+ /// socket.
+ /// The future will resolve to the stream as well as the buffer (for reuse if
+ /// needed).
+ ///
+ /// Any error which happens during writing will cause both the stream and the
+ /// buffer to get destroyed.
+ ///
+ /// The `buf` parameter here only requires the `AsRef<[u8]>` trait, which should
+ /// be broadly applicable to accepting data which can be converted to a slice.
+ /// The `Window` struct is also available in this crate to provide a different
+ /// window into a slice if necessary.
+ pub fn send_dgram<'a, T>(&'a self, buf: T, addr : &'a SocketAddr) -> SendDGram<T>
+ where T: AsRef<[u8]>,
+ {
+ SendDGram {
+ state: UdpState::Writing {
+ sock: self,
+ addr: addr,
+ buf: buf,
+ },
+ }
+ }
/// Receives data from the socket. On success, returns the number of bytes
/// read and the address from whence the data came.
@@ -261,6 +289,61 @@ impl fmt::Debug for UdpSocket {
}
}
+/// A future used to write the entire contents of some data to a stream.
+///
+/// This is created by the [`write_all`] top-level method.
+///
+/// [`write_all`]: fn.write_all.html
+pub struct SendDGram<'a, T> {
+ state: UdpState<'a, T>,
+}
+
+enum UdpState<'a, T> {
+ Writing {
+ sock: &'a UdpSocket,
+ buf: T,
+ addr: &'a SocketAddr,
+ },
+ Empty,
+}
+
+
+fn zero_write() -> io::Error {
+ io::Error::new(io::ErrorKind::WriteZero, "zero-length write")
+}
+
+fn incomplete_write(reason : &str) -> io::Error {
+ io::Error::new(io::ErrorKind::Other, reason)
+}
+
+impl<'a, T> Future for SendDGram<'a, T>
+ where T: AsRef<[u8]>,
+{
+ type Item = T;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<T, io::Error> {
+ match self.state {
+ UdpState::Writing { ref sock, ref buf, ref addr} => {
+ let buf = buf.as_ref();
+ let n = try_nb!(sock.send_to(&buf, addr));
+ if n == 0 {
+ return Err(zero_write())
+ }
+ if n != buf.len() {
+ return Err(incomplete_write("Failed to send entire message in datagram"))
+ }
+ }
+ UdpState::Empty => panic!("poll a SendAllTo after it's done"),
+ }
+
+ match mem::replace(&mut self.state, UdpState::Empty) {
+ UdpState::Writing { buf, .. } => Ok((buf).into()),
+ UdpState::Empty => panic!(),
+ }
+ }
+}
+
#[cfg(unix)]
mod sys {
use std::os::unix::prelude::*;
diff --git a/tests/udp-line-frames.rs b/tests/udp-line-frames.rs
deleted file mode 100644
index 90e8968d..00000000
--- a/tests/udp-line-frames.rs
+++ /dev/null
@@ -1,74 +0,0 @@
-extern crate tokio_core;
-extern crate env_logger;
-extern crate futures;
-
-use std::io;
-use std::net::{SocketAddr};
-use futures::{future, Future, Stream, Sink, IntoFuture};
-use tokio_core::io::{write_all, read, FramedUdp, CodecUdp, Io};
-use tokio_core::net::{UdpSocket};
-use tokio_core::reactor::{Core, Timeout};
-use std::time::Duration;
-use std::str;
-
-pub struct LineCodec {
- addr : Option<SocketAddr>
-}
-
-impl CodecUdp for LineCodec {
- type In = Vec<u8>;
- type Out = Vec<u8>;
-
- fn decode(&mut self, addr : &SocketAddr, buf: &mut Vec<u8>) -> Result<Option<Self::In>, io::Error> {
- self.addr = Some(*addr);
- match buf.as_slice().iter().position(|&b| b == b'\n') {
- Some(i) => Ok(Some(buf[.. i + 1].into())),
- None => Ok(None),
- }
- }
-
- fn encode(&mut self, item: Vec<u8>, into: &mut Vec<u8>) -> SocketAddr {
- into.extend_from_slice(item.as_slice());
- into.push('\n' as u8);
-
- self.addr.unwrap()
- }
-}
-
-#[test]
-fn echo() {
- drop(env_logger::init());
-
- let mut core = Core::new().unwrap();
- let handle = core.handle();
-
- let srvcodec = LineCodec { addr : None };
- let clicodec = LineCodec { addr : None };
-
- let srvaddr : SocketAddr = "127.0.0.1:31999".parse().unwrap();
- let clientaddr : SocketAddr = "127.0.0.1:32000".parse().unwrap();
-
- let server = UdpSocket::bind(&srvaddr, &handle).unwrap();
- let client = UdpSocket::bind(&clientaddr, &handle).unwrap();
-
- let job = client.send_to(b"PING", &srvaddr);
- let _ = core.run(job.into_future()).unwrap();
-
- let (srvstream, srvsink) = server.framed(srvcodec).split();
- let srvloop = srvstream.for_each(move |buf| {
- println!("{}", str::from_utf8(buf.as_slice()).unwrap());
- srvsink.send(b"PONG".to_vec()).map(|_| ()).wait()
- });
-
- let (clistream, clisink) = client.framed(clicodec).split();
- let cliloop = clistream.for_each(move |buf| {
- println!("{}", str::from_utf8(buf.as_slice()).unwrap());
- clisink.send(b"PING".to_vec()).map(|_| ()).wait()
- });
-
- let timeout = Timeout::new(Duration::from_millis(500), &handle).unwrap();
-
- let wait = future::select_all(vec![timeout.boxed(), srvloop.boxed(), cliloop.boxed()]);
- core.run(wait);
-
-}