diff options
author | Rick Richardson <rick.richardson@gmail.com> | 2016-11-20 09:08:03 -0800 |
---|---|---|
committer | Rick Richardson <rick.richardson@gmail.com> | 2016-11-20 09:08:03 -0800 |
commit | 71d8672aab2b6c4712942783920e01db578eac9c (patch) | |
tree | e27160d255539b3bb45bee0d57179569708d074a /src/io | |
parent | 592a99bca4e760d00057fc3927c3c6f164e353e2 (diff) |
implemented moste of udp frames test
Diffstat (limited to 'src/io')
-rw-r--r-- | src/io/mod.rs | 2 | ||||
-rw-r--r-- | src/io/udp_frame.rs | 128 |
2 files changed, 62 insertions, 68 deletions
diff --git a/src/io/mod.rs b/src/io/mod.rs index 15aba3b7..69417489 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -33,6 +33,7 @@ macro_rules! try_nb { mod copy; mod frame; +mod udp_frame; mod flush; mod read_exact; mod read_to_end; @@ -43,6 +44,7 @@ mod window; mod write_all; pub use self::copy::{copy, Copy}; pub use self::frame::{EasyBuf, EasyBufMut, FramedRead, FramedWrite, Framed, Codec}; +pub use self::udp_frame::{FramedUdp, framed_udp, FramedUdpRead, FramedUdpWrite, CodecUdp}; pub use self::flush::{flush, Flush}; pub use self::read_exact::{read_exact, ReadExact}; pub use self::read_to_end::{read_to_end, ReadToEnd}; diff --git a/src/io/udp_frame.rs b/src/io/udp_frame.rs index 9c1eeb95..a1b64db0 100644 --- a/src/io/udp_frame.rs +++ b/src/io/udp_frame.rs @@ -1,12 +1,9 @@ use std::io; -use std::ops::{Deref, DerefMut}; -use std::sync::Arc; -use net::udp::UdpSocket +use std::net::SocketAddr; +use net::UdpSocket; use futures::{Async, Poll, Stream, Sink, StartSend, AsyncSink}; use futures::sync::BiLock; -use io::Io; - /// Encoding of frames via buffers. /// /// This trait is used when constructing an instance of `FramedUdp`. It provides @@ -20,10 +17,13 @@ use io::Io; /// or encoding, which is particularly useful for streaming parsers. In many /// cases, though, this type will simply be a unit struct (e.g. `struct /// HttpCodec`). -pub trait EncodeUdp { +pub trait CodecUdp { /// The type of frames to be encoded. type Out; + + /// The type of decoded frames. + type In; /// Encodes a frame into the buffer provided. @@ -32,87 +32,64 @@ pub trait EncodeUdp { /// The `buf` provided is an internal buffer of the `Framed` instance and /// will be written out when possible. /// - /// The codec also determines the destination to which the buffer should + /// The encode method also determines the destination to which the buffer should /// be directed, which will be returned as a SocketAddr; fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> SocketAddr; -} - -/// Decoding of frames via buffers. -/// -/// This trait is used when constructing an instance of `FramedUdp`. It provides -/// one type: `In` for decoding incoming frames from a Datagram -/// -/// Because UDP is a connectionless protocol, the decode method will also be -/// supplied with a SocketAddr of the remote host which sent the datagram -/// -/// The trait itself is implemented on a type that can track state for decoding -/// or encoding, which is particularly useful for streaming parsers. In many -/// cases, though, this type will simply be a unit struct (e.g. `struct -/// HttpCodec`). -pub trait DecodeUdp { - /// The type of decoded frames. - type In; - + /// Attempts to decode a frame from the provided buffer of bytes. /// /// This method is called by `FramedUdp` on a single datagram which has been /// read from a socket. /// - /// It is required that the Decoder empty the read buffer in every call to + /// It is required that the decode method empty the read buffer in every call to /// decode, as the next poll_read that occurs will write the next datagram /// into the buffer, without regard for what is already there. /// /// If the bytes look valid, but a frame isn't fully available yet, then /// `Ok(None)` is returned. This indicates to the `Framed` instance that /// it needs to read some more bytes before calling this method again. - /// In such a case, it is the decoder's responsibility to copy the data + /// In such a case, it is decode's responsibility to copy the data /// into their own internal buffer for future use. /// /// Finally, if the bytes in the buffer are malformed then an error is /// returned indicating why. This informs `Framed` that the stream is now /// corrupt and should be terminated. /// - /// When dealing with connectionless streams, there will likely be some sort - /// of state machine. fn decode(&mut self, src: &SocketAddr, buf: &mut Vec<u8>) -> Result<Option<Self::In>, io::Error>; } /// A unified `Stream` and `Sink` interface to an underlying `Io` object, using -/// the `Encode` and `Decode` traits to encode and decode frames. +/// the `CodecUdp` trait to encode and decode frames. /// /// You can acquire a `Framed` instance by using the `Io::framed` adapter. -pub struct FramedUdp<D, E> { +pub struct FramedUdp<C> { socket: UdpSocket, - encoder: E, - decoder: D, + codec: C, out_addr : Option<SocketAddr>, rd: Vec<u8>, wr: Vec<u8>, } -impl<D : DecodeUdp, E : EncodeUdp> Stream for Framed<D, E> { - type Item = D::In; +impl<C : CodecUdp> Stream for FramedUdp<C> { + type Item = C::In; type Error = io::Error; fn poll(&mut self) -> Poll<Option<C::In>, io::Error> { loop { - let before = self.rd.len(); - let ret = self.socket.recv_from(self.rd.mut_bytes(), &mut inaddr); + let ret = self.socket.recv_from(self.rd.as_mut_slice()); match ret { Ok((n, addr)) => { trace!("read {} bytes", n); trace!("attempting to decode a frame"); - if let Some(frame) = try!(self.decoder.decode(&addr, &mut self.rd)) { + if let Some(frame) = try!(self.codec.decode(&addr, &mut self.rd)) { trace!("frame decoded from buffer"); self.rd.clear(); return Ok(Async::Ready(Some(frame))); } } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - if self.rd.len() == before { - return Ok(Async::NotReady) - } + return Ok(Async::NotReady) } Err(e) => return Err(e), } @@ -120,11 +97,11 @@ impl<D : DecodeUdp, E : EncodeUdp> Stream for Framed<D, E> { } } -impl<D : DecodeUdp, E : EncodeUdp> Sink for Framed<D, E> { - type SinkItem = E::Out; +impl<C : CodecUdp> Sink for FramedUdp<C> { + type SinkItem = C::Out; type SinkError = io::Error; - fn start_send(&mut self, item: C::Out) -> StartSend<E::Out, io::Error> { + fn start_send(&mut self, item: C::Out) -> StartSend<C::Out, io::Error> { if self.wr.len() > 0 { try!(self.poll_complete()); if self.wr.len() > 0 { @@ -140,9 +117,9 @@ impl<D : DecodeUdp, E : EncodeUdp> Sink for Framed<D, E> { trace!("flushing framed transport"); while !self.wr.is_empty() { - if let Some(outaddr) = self.out_addr.ref() { + if let Some(outaddr) = self.out_addr { trace!("writing; remaining={}", self.wr.len()); - let n = try_nb!(self.socket.send_to(&self.wr, outaddr)); + let n = try_nb!(self.socket.send_to(&self.wr, &outaddr)); self.wr.clear(); self.out_addr = None; if n != self.wr.len() { @@ -160,22 +137,37 @@ impl<D : DecodeUdp, E : EncodeUdp> Sink for Framed<D, E> { } } -pub fn framed_udp<D, E>(socket : UdpSocket, decoder : D, encoder : E) -> Framed<D, E> { - Framed { - socket: socket, - encoder: encoder, - decoder: decoder, - rd: Vec::with_capacity(64 * 1024), - wr: Vec::with_capacity(64 * 1024) - } +/// Helper function that Creates a new FramedUdp object. It moves the supplied socket, codec +/// into the resulting FramedUdp +pub fn framed_udp<C>(socket : UdpSocket, codec : C) -> FramedUdp<C> { + FramedUdp::new( + socket, + codec, + Vec::with_capacity(64 * 1024), + Vec::with_capacity(64 * 1024) + ) } -impl<D, E> FramedUdp<D, E> { +impl<C> FramedUdp<C> { + + /// Creates a new FramedUdp object. It moves the supplied socket, codec + /// supplied vecs. + pub fn new(sock : UdpSocket, codec : C, rd_buf : Vec<u8>, wr_buf : Vec<u8>) -> FramedUdp<C> { + FramedUdp { + socket: sock, + codec : codec, + out_addr: None, + rd: rd_buf, + wr: wr_buf + } + } + + /// Splits this `Stream + Sink` object into separate `Stream` and `Sink` /// objects, which can be useful when you want to split ownership between /// tasks, or allow direct interaction between the two objects (e.g. via /// `Sink::send_all`). - pub fn split(self) -> (FramedRead<D>, FramedWrite<E>) { + pub fn split(self) -> (FramedUdpRead<C>, FramedUdpWrite<C>) { let (a, b) = BiLock::new(self); let read = FramedUdpRead { framed: a }; let write = FramedUdpWrite { framed: b }; @@ -210,17 +202,17 @@ impl<D, E> FramedUdp<D, E> { self.socket } } -/// A `Stream` interface to an underlying `Io` object, using the `Decode` trait +/// A `Stream` interface to an underlying `Io` object, using the `CodecUdp` trait /// to decode frames. -pub struct FramedRead<D, E> { - framed: BiLock<Framed<D, E>>, +pub struct FramedUdpRead<C> { + framed: BiLock<FramedUdp<C>>, } -impl<D : DecodeUdp, E : EncodeUdp> Stream for FramedRead<D, E> { - type Item = D::In; +impl<C : CodecUdp> Stream for FramedUdpRead<C> { + type Item = C::In; type Error = io::Error; - fn poll(&mut self) -> Poll<Option<D::In>, io::Error> { + fn poll(&mut self) -> Poll<Option<C::In>, io::Error> { if let Async::Ready(mut guard) = self.framed.poll_lock() { guard.poll() } else { @@ -229,17 +221,17 @@ impl<D : DecodeUdp, E : EncodeUdp> Stream for FramedRead<D, E> { } } -/// A `Sink` interface to an underlying `Io` object, using the `Encode` trait +/// A `Sink` interface to an underlying `Io` object, using the `CodecUdp` trait /// to encode frames. -pub struct FramedWrite<D, E> { - framed: BiLock<Framed<D, E>>, +pub struct FramedUdpWrite<C> { + framed: BiLock<FramedUdp<C>>, } -impl<D : DecodeUdp, E : EncodeUdp> Sink for FramedWrite<D, E> { - type SinkItem = E::Out; +impl<C : CodecUdp> Sink for FramedUdpWrite<C> { + type SinkItem = C::Out; type SinkError = io::Error; - fn start_send(&mut self, item: E::Out) -> StartSend<E::Out, io::Error> { + fn start_send(&mut self, item: C::Out) -> StartSend<C::Out, io::Error> { if let Async::Ready(mut guard) = self.framed.poll_lock() { guard.start_send(item) } else { |