summaryrefslogtreecommitdiffstats
path: root/src/io
diff options
context:
space:
mode:
authorRick Richardson <rick.richardson@gmail.com>2016-11-20 09:08:03 -0800
committerRick Richardson <rick.richardson@gmail.com>2016-11-20 09:08:03 -0800
commit71d8672aab2b6c4712942783920e01db578eac9c (patch)
treee27160d255539b3bb45bee0d57179569708d074a /src/io
parent592a99bca4e760d00057fc3927c3c6f164e353e2 (diff)
implemented moste of udp frames test
Diffstat (limited to 'src/io')
-rw-r--r--src/io/mod.rs2
-rw-r--r--src/io/udp_frame.rs128
2 files changed, 62 insertions, 68 deletions
diff --git a/src/io/mod.rs b/src/io/mod.rs
index 15aba3b7..69417489 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -33,6 +33,7 @@ macro_rules! try_nb {
mod copy;
mod frame;
+mod udp_frame;
mod flush;
mod read_exact;
mod read_to_end;
@@ -43,6 +44,7 @@ mod window;
mod write_all;
pub use self::copy::{copy, Copy};
pub use self::frame::{EasyBuf, EasyBufMut, FramedRead, FramedWrite, Framed, Codec};
+pub use self::udp_frame::{FramedUdp, framed_udp, FramedUdpRead, FramedUdpWrite, CodecUdp};
pub use self::flush::{flush, Flush};
pub use self::read_exact::{read_exact, ReadExact};
pub use self::read_to_end::{read_to_end, ReadToEnd};
diff --git a/src/io/udp_frame.rs b/src/io/udp_frame.rs
index 9c1eeb95..a1b64db0 100644
--- a/src/io/udp_frame.rs
+++ b/src/io/udp_frame.rs
@@ -1,12 +1,9 @@
use std::io;
-use std::ops::{Deref, DerefMut};
-use std::sync::Arc;
-use net::udp::UdpSocket
+use std::net::SocketAddr;
+use net::UdpSocket;
use futures::{Async, Poll, Stream, Sink, StartSend, AsyncSink};
use futures::sync::BiLock;
-use io::Io;
-
/// Encoding of frames via buffers.
///
/// This trait is used when constructing an instance of `FramedUdp`. It provides
@@ -20,10 +17,13 @@ use io::Io;
/// or encoding, which is particularly useful for streaming parsers. In many
/// cases, though, this type will simply be a unit struct (e.g. `struct
/// HttpCodec`).
-pub trait EncodeUdp {
+pub trait CodecUdp {
/// The type of frames to be encoded.
type Out;
+
+ /// The type of decoded frames.
+ type In;
/// Encodes a frame into the buffer provided.
@@ -32,87 +32,64 @@ pub trait EncodeUdp {
/// The `buf` provided is an internal buffer of the `Framed` instance and
/// will be written out when possible.
///
- /// The codec also determines the destination to which the buffer should
+ /// The encode method also determines the destination to which the buffer should
/// be directed, which will be returned as a SocketAddr;
fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> SocketAddr;
-}
-
-/// Decoding of frames via buffers.
-///
-/// This trait is used when constructing an instance of `FramedUdp`. It provides
-/// one type: `In` for decoding incoming frames from a Datagram
-///
-/// Because UDP is a connectionless protocol, the decode method will also be
-/// supplied with a SocketAddr of the remote host which sent the datagram
-///
-/// The trait itself is implemented on a type that can track state for decoding
-/// or encoding, which is particularly useful for streaming parsers. In many
-/// cases, though, this type will simply be a unit struct (e.g. `struct
-/// HttpCodec`).
-pub trait DecodeUdp {
- /// The type of decoded frames.
- type In;
-
+
/// Attempts to decode a frame from the provided buffer of bytes.
///
/// This method is called by `FramedUdp` on a single datagram which has been
/// read from a socket.
///
- /// It is required that the Decoder empty the read buffer in every call to
+ /// It is required that the decode method empty the read buffer in every call to
/// decode, as the next poll_read that occurs will write the next datagram
/// into the buffer, without regard for what is already there.
///
/// If the bytes look valid, but a frame isn't fully available yet, then
/// `Ok(None)` is returned. This indicates to the `Framed` instance that
/// it needs to read some more bytes before calling this method again.
- /// In such a case, it is the decoder's responsibility to copy the data
+ /// In such a case, it is decode's responsibility to copy the data
/// into their own internal buffer for future use.
///
/// Finally, if the bytes in the buffer are malformed then an error is
/// returned indicating why. This informs `Framed` that the stream is now
/// corrupt and should be terminated.
///
- /// When dealing with connectionless streams, there will likely be some sort
- /// of state machine.
fn decode(&mut self, src: &SocketAddr, buf: &mut Vec<u8>) -> Result<Option<Self::In>, io::Error>;
}
/// A unified `Stream` and `Sink` interface to an underlying `Io` object, using
-/// the `Encode` and `Decode` traits to encode and decode frames.
+/// the `CodecUdp` trait to encode and decode frames.
///
/// You can acquire a `Framed` instance by using the `Io::framed` adapter.
-pub struct FramedUdp<D, E> {
+pub struct FramedUdp<C> {
socket: UdpSocket,
- encoder: E,
- decoder: D,
+ codec: C,
out_addr : Option<SocketAddr>,
rd: Vec<u8>,
wr: Vec<u8>,
}
-impl<D : DecodeUdp, E : EncodeUdp> Stream for Framed<D, E> {
- type Item = D::In;
+impl<C : CodecUdp> Stream for FramedUdp<C> {
+ type Item = C::In;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<C::In>, io::Error> {
loop {
- let before = self.rd.len();
- let ret = self.socket.recv_from(self.rd.mut_bytes(), &mut inaddr);
+ let ret = self.socket.recv_from(self.rd.as_mut_slice());
match ret {
Ok((n, addr)) => {
trace!("read {} bytes", n);
trace!("attempting to decode a frame");
- if let Some(frame) = try!(self.decoder.decode(&addr, &mut self.rd)) {
+ if let Some(frame) = try!(self.codec.decode(&addr, &mut self.rd)) {
trace!("frame decoded from buffer");
self.rd.clear();
return Ok(Async::Ready(Some(frame)));
}
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- if self.rd.len() == before {
- return Ok(Async::NotReady)
- }
+ return Ok(Async::NotReady)
}
Err(e) => return Err(e),
}
@@ -120,11 +97,11 @@ impl<D : DecodeUdp, E : EncodeUdp> Stream for Framed<D, E> {
}
}
-impl<D : DecodeUdp, E : EncodeUdp> Sink for Framed<D, E> {
- type SinkItem = E::Out;
+impl<C : CodecUdp> Sink for FramedUdp<C> {
+ type SinkItem = C::Out;
type SinkError = io::Error;
- fn start_send(&mut self, item: C::Out) -> StartSend<E::Out, io::Error> {
+ fn start_send(&mut self, item: C::Out) -> StartSend<C::Out, io::Error> {
if self.wr.len() > 0 {
try!(self.poll_complete());
if self.wr.len() > 0 {
@@ -140,9 +117,9 @@ impl<D : DecodeUdp, E : EncodeUdp> Sink for Framed<D, E> {
trace!("flushing framed transport");
while !self.wr.is_empty() {
- if let Some(outaddr) = self.out_addr.ref() {
+ if let Some(outaddr) = self.out_addr {
trace!("writing; remaining={}", self.wr.len());
- let n = try_nb!(self.socket.send_to(&self.wr, outaddr));
+ let n = try_nb!(self.socket.send_to(&self.wr, &outaddr));
self.wr.clear();
self.out_addr = None;
if n != self.wr.len() {
@@ -160,22 +137,37 @@ impl<D : DecodeUdp, E : EncodeUdp> Sink for Framed<D, E> {
}
}
-pub fn framed_udp<D, E>(socket : UdpSocket, decoder : D, encoder : E) -> Framed<D, E> {
- Framed {
- socket: socket,
- encoder: encoder,
- decoder: decoder,
- rd: Vec::with_capacity(64 * 1024),
- wr: Vec::with_capacity(64 * 1024)
- }
+/// Helper function that Creates a new FramedUdp object. It moves the supplied socket, codec
+/// into the resulting FramedUdp
+pub fn framed_udp<C>(socket : UdpSocket, codec : C) -> FramedUdp<C> {
+ FramedUdp::new(
+ socket,
+ codec,
+ Vec::with_capacity(64 * 1024),
+ Vec::with_capacity(64 * 1024)
+ )
}
-impl<D, E> FramedUdp<D, E> {
+impl<C> FramedUdp<C> {
+
+ /// Creates a new FramedUdp object. It moves the supplied socket, codec
+ /// supplied vecs.
+ pub fn new(sock : UdpSocket, codec : C, rd_buf : Vec<u8>, wr_buf : Vec<u8>) -> FramedUdp<C> {
+ FramedUdp {
+ socket: sock,
+ codec : codec,
+ out_addr: None,
+ rd: rd_buf,
+ wr: wr_buf
+ }
+ }
+
+
/// Splits this `Stream + Sink` object into separate `Stream` and `Sink`
/// objects, which can be useful when you want to split ownership between
/// tasks, or allow direct interaction between the two objects (e.g. via
/// `Sink::send_all`).
- pub fn split(self) -> (FramedRead<D>, FramedWrite<E>) {
+ pub fn split(self) -> (FramedUdpRead<C>, FramedUdpWrite<C>) {
let (a, b) = BiLock::new(self);
let read = FramedUdpRead { framed: a };
let write = FramedUdpWrite { framed: b };
@@ -210,17 +202,17 @@ impl<D, E> FramedUdp<D, E> {
self.socket
}
}
-/// A `Stream` interface to an underlying `Io` object, using the `Decode` trait
+/// A `Stream` interface to an underlying `Io` object, using the `CodecUdp` trait
/// to decode frames.
-pub struct FramedRead<D, E> {
- framed: BiLock<Framed<D, E>>,
+pub struct FramedUdpRead<C> {
+ framed: BiLock<FramedUdp<C>>,
}
-impl<D : DecodeUdp, E : EncodeUdp> Stream for FramedRead<D, E> {
- type Item = D::In;
+impl<C : CodecUdp> Stream for FramedUdpRead<C> {
+ type Item = C::In;
type Error = io::Error;
- fn poll(&mut self) -> Poll<Option<D::In>, io::Error> {
+ fn poll(&mut self) -> Poll<Option<C::In>, io::Error> {
if let Async::Ready(mut guard) = self.framed.poll_lock() {
guard.poll()
} else {
@@ -229,17 +221,17 @@ impl<D : DecodeUdp, E : EncodeUdp> Stream for FramedRead<D, E> {
}
}
-/// A `Sink` interface to an underlying `Io` object, using the `Encode` trait
+/// A `Sink` interface to an underlying `Io` object, using the `CodecUdp` trait
/// to encode frames.
-pub struct FramedWrite<D, E> {
- framed: BiLock<Framed<D, E>>,
+pub struct FramedUdpWrite<C> {
+ framed: BiLock<FramedUdp<C>>,
}
-impl<D : DecodeUdp, E : EncodeUdp> Sink for FramedWrite<D, E> {
- type SinkItem = E::Out;
+impl<C : CodecUdp> Sink for FramedUdpWrite<C> {
+ type SinkItem = C::Out;
type SinkError = io::Error;
- fn start_send(&mut self, item: E::Out) -> StartSend<E::Out, io::Error> {
+ fn start_send(&mut self, item: C::Out) -> StartSend<C::Out, io::Error> {
if let Async::Ready(mut guard) = self.framed.poll_lock() {
guard.start_send(item)
} else {