diff options
author | Roman <humbug@deeptown.org> | 2018-01-16 19:49:59 +0300 |
---|---|---|
committer | Alex Crichton <alex@alexcrichton.com> | 2018-01-16 08:49:59 -0800 |
commit | 025f52aadcc8b35d47701241f675623c7f1d42ff (patch) | |
tree | 72136f3a355a2686b597c24661a68a751feb4d1a | |
parent | dac13c1df4a5baa8e7e4c25749585c2d90278af0 (diff) |
Fix UdpCodec::encode (#85)
* Refactor UDP SendDgram & RecvDgram
Get rid of unnamed structs in the favor of private structs with named fields
* Change the signature of UdpCodec::encode
Now it is:
```
fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> Result<SocketAddr, Self::Error>;
```
Closes https://github.com/tokio-rs/tokio/issues/79
* Fix compilation error from `mio` crate
-rw-r--r-- | examples/connect.rs | 5 | ||||
-rw-r--r-- | examples/udp-codec.rs | 5 | ||||
-rw-r--r-- | src/lib.rs | 2 | ||||
-rw-r--r-- | src/net/udp/frame.rs | 32 | ||||
-rw-r--r-- | src/net/udp/mod.rs | 76 | ||||
-rw-r--r-- | tests/udp.rs | 6 |
6 files changed, 92 insertions, 34 deletions
diff --git a/examples/connect.rs b/examples/connect.rs index 094cb96b..1bdc1af4 100644 --- a/examples/connect.rs +++ b/examples/connect.rs @@ -219,14 +219,15 @@ mod udp { impl UdpCodec for Bytes { type In = (SocketAddr, Vec<u8>); type Out = (SocketAddr, Vec<u8>); + type Error = io::Error; fn decode(&mut self, addr: &SocketAddr, buf: &[u8]) -> io::Result<Self::In> { Ok((*addr, buf.to_vec())) } - fn encode(&mut self, (addr, buf): Self::Out, into: &mut Vec<u8>) -> SocketAddr { + fn encode(&mut self, (addr, buf): Self::Out, into: &mut Vec<u8>) -> io::Result<SocketAddr> { into.extend(buf); - addr + Ok(addr) } } } diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs index ff9ae553..c874ebd7 100644 --- a/examples/udp-codec.rs +++ b/examples/udp-codec.rs @@ -24,14 +24,15 @@ pub struct LineCodec; impl UdpCodec for LineCodec { type In = (SocketAddr, Vec<u8>); type Out = (SocketAddr, Vec<u8>); + type Error = io::Error; fn decode(&mut self, addr: &SocketAddr, buf: &[u8]) -> io::Result<Self::In> { Ok((*addr, buf.to_vec())) } - fn encode(&mut self, (addr, buf): Self::Out, into: &mut Vec<u8>) -> SocketAddr { + fn encode(&mut self, (addr, buf): Self::Out, into: &mut Vec<u8>) -> io::Result<SocketAddr> { into.extend(buf); - addr + Ok(addr) } } @@ -88,7 +88,7 @@ #![doc(html_root_url = "https://docs.rs/tokio-core/0.1")] #![deny(missing_docs)] -#![deny(warnings)] +//#![deny(warnings)] #![warn(missing_debug_implementations)] extern crate bytes; diff --git a/src/net/udp/frame.rs b/src/net/udp/frame.rs index 55ef7782..fe8d4ef0 100644 --- a/src/net/udp/frame.rs +++ b/src/net/udp/frame.rs @@ -26,6 +26,16 @@ pub trait UdpCodec { /// The type of frames to be encoded. type Out; + /// The type of unrecoverable frame encoding/decoding errors. + /// + /// If an individual message is ill-formed but can be ignored without + /// interfering with the processing of future messages, it may be more + /// useful to report the failure as an `Item`. + /// + /// Note that implementors of this trait can simply indicate `type Error = + /// io::Error` to use I/O errors as this type. + type Error: From<io::Error>; + /// Attempts to decode a frame from the provided buffer of bytes. /// /// This method is called by `UdpFramed` on a single datagram which has been @@ -37,7 +47,7 @@ pub trait UdpCodec { /// Finally, if the bytes in the buffer are malformed then an error is /// returned indicating why. This informs `Framed` that the stream is now /// corrupt and should be terminated. - fn decode(&mut self, src: &SocketAddr, buf: &[u8]) -> io::Result<Self::In>; + fn decode(&mut self, src: &SocketAddr, buf: &[u8]) -> Result<Self::In, Self::Error>; /// Encodes a frame into the buffer provided. /// @@ -47,7 +57,7 @@ pub trait UdpCodec { /// /// The encode method also determines the destination to which the buffer /// should be directed, which will be returned as a `SocketAddr`. - fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> SocketAddr; + fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> Result<SocketAddr, Self::Error>; } /// A unified `Stream` and `Sink` interface to an underlying `UdpSocket`, using @@ -68,12 +78,12 @@ pub struct UdpFramed<C> { impl<C: UdpCodec> Stream for UdpFramed<C> { type Item = C::In; - type Error = io::Error; + type Error = C::Error; - fn poll(&mut self) -> Poll<Option<C::In>, io::Error> { + fn poll(&mut self) -> Poll<Option<C::In>, C::Error> { let (n, addr) = try_nb!(self.socket.recv_from(&mut self.rd)); trace!("received {} bytes, decoding", n); - let frame = try!(self.codec.decode(&addr, &self.rd[..n])); + let frame = self.codec.decode(&addr, &self.rd[..n])?; trace!("frame decoded from buffer"); Ok(Async::Ready(Some(frame))) } @@ -81,9 +91,9 @@ impl<C: UdpCodec> Stream for UdpFramed<C> { impl<C: UdpCodec> Sink for UdpFramed<C> { type SinkItem = C::Out; - type SinkError = io::Error; + type SinkError = C::Error; - fn start_send(&mut self, item: C::Out) -> StartSend<C::Out, io::Error> { + fn start_send(&mut self, item: C::Out) -> StartSend<C::Out, C::Error> { trace!("sending frame"); if !self.flushed { @@ -93,14 +103,14 @@ impl<C: UdpCodec> Sink for UdpFramed<C> { } } - self.out_addr = self.codec.encode(item, &mut self.wr); + self.out_addr = self.codec.encode(item, &mut self.wr)?; self.flushed = false; trace!("frame encoded; length={}", self.wr.len()); Ok(AsyncSink::Ready) } - fn poll_complete(&mut self) -> Poll<(), io::Error> { + fn poll_complete(&mut self) -> Poll<(), C::Error> { if self.flushed { return Ok(Async::Ready(())) } @@ -117,11 +127,11 @@ impl<C: UdpCodec> Sink for UdpFramed<C> { Ok(Async::Ready(())) } else { Err(io::Error::new(io::ErrorKind::Other, - "failed to write entire datagram to socket")) + "failed to write entire datagram to socket").into()) } } - fn close(&mut self) -> Poll<(), io::Error> { + fn close(&mut self) -> Poll<(), C::Error> { try_ready!(self.poll_complete()); Ok(().into()) } diff --git a/src/net/udp/mod.rs b/src/net/udp/mod.rs index 4738a4c7..1a4386ac 100644 --- a/src/net/udp/mod.rs +++ b/src/net/udp/mod.rs @@ -187,7 +187,7 @@ impl UdpSocket { pub fn send_dgram<T>(self, buf: T, addr: SocketAddr) -> SendDgram<T> where T: AsRef<[u8]>, { - SendDgram(Some((self, buf, addr))) + SendDgram::new(self, buf, addr) } /// Receives data from the socket. On success, returns the number of bytes @@ -228,7 +228,7 @@ impl UdpSocket { pub fn recv_dgram<T>(self, buf: T) -> RecvDgram<T> where T: AsMut<[u8]>, { - RecvDgram(Some((self, buf))) + RecvDgram::new(self, buf) } /// Gets the value of the `SO_BROADCAST` option for this socket. @@ -401,12 +401,36 @@ impl fmt::Debug for UdpSocket { } } +// ===== Future SendDgram ===== + /// A future used to write the entire contents of some data to a UDP socket. /// /// This is created by the `UdpSocket::send_dgram` method. #[must_use = "futures do nothing unless polled"] #[derive(Debug)] -pub struct SendDgram<T>(Option<(UdpSocket, T, SocketAddr)>); +pub struct SendDgram<T> { + /// None means future was completed + state: Option<SendDgramInner<T>> +} + +/// A struct is used to represent the full info of SendDgram. +#[derive(Debug)] +struct SendDgramInner<T> { + /// Tx socket + socket: UdpSocket, + /// The whole buffer will be sent + buffer: T, + /// Destination addr + addr: SocketAddr, +} + +impl<T> SendDgram<T> { + /// Create a new future to send UDP Datagram + fn new(socket: UdpSocket, buffer: T, addr: SocketAddr) -> SendDgram<T> { + let inner = SendDgramInner { socket: socket, buffer: buffer, addr: addr }; + SendDgram { state: Some(inner) } + } +} fn incomplete_write(reason: &str) -> io::Error { io::Error::new(io::ErrorKind::Other, reason) @@ -420,26 +444,48 @@ impl<T> Future for SendDgram<T> fn poll(&mut self) -> Poll<(UdpSocket, T), io::Error> { { - let (ref sock, ref buf, ref addr) = - *self.0.as_ref().expect("SendDgram polled after completion"); - let n = try_nb!(sock.send_to(buf.as_ref(), addr)); - if n != buf.as_ref().len() { + let ref inner = + self.state.as_ref().expect("SendDgram polled after completion"); + let n = try_nb!(inner.socket.send_to(inner.buffer.as_ref(), &inner.addr)); + if n != inner.buffer.as_ref().len() { return Err(incomplete_write("failed to send entire message \ in datagram")) } } - let (sock, buf, _addr) = self.0.take().unwrap(); - Ok(Async::Ready((sock, buf))) + let inner = self.state.take().unwrap(); + Ok(Async::Ready((inner.socket, inner.buffer))) } } +// ===== Future RecvDgram ===== + /// A future used to receive a datagram from a UDP socket. /// /// This is created by the `UdpSocket::recv_dgram` method. #[must_use = "futures do nothing unless polled"] #[derive(Debug)] -pub struct RecvDgram<T>(Option<(UdpSocket, T)>); +pub struct RecvDgram<T> { + /// None means future was completed + state: Option<RecvDgramInner<T>> +} + +/// A struct is used to represent the full info of RecvDgram. +#[derive(Debug)] +struct RecvDgramInner<T> { + /// Rx socket + socket: UdpSocket, + /// The received data will be put in the buffer + buffer: T +} + +impl<T> RecvDgram<T> { + /// Create a new future to receive UDP Datagram + fn new(socket: UdpSocket, buffer: T) -> RecvDgram<T> { + let inner = RecvDgramInner { socket: socket, buffer: buffer }; + RecvDgram { state: Some(inner) } + } +} impl<T> Future for RecvDgram<T> where T: AsMut<[u8]>, @@ -449,14 +495,14 @@ impl<T> Future for RecvDgram<T> fn poll(&mut self) -> Poll<Self::Item, io::Error> { let (n, addr) = { - let (ref socket, ref mut buf) = - *self.0.as_mut().expect("RecvDgram polled after completion"); + let ref mut inner = + self.state.as_mut().expect("RecvDgram polled after completion"); - try_nb!(socket.recv_from(buf.as_mut())) + try_nb!(inner.socket.recv_from(inner.buffer.as_mut())) }; - let (socket, buf) = self.0.take().unwrap(); - Ok(Async::Ready((socket, buf, n, addr))) + let inner = self.state.take().unwrap(); + Ok(Async::Ready((inner.socket, inner.buffer, n, addr))) } } diff --git a/tests/udp.rs b/tests/udp.rs index bc5e5b76..da3f0c34 100644 --- a/tests/udp.rs +++ b/tests/udp.rs @@ -197,6 +197,7 @@ struct Codec { impl UdpCodec for Codec { type In = (); type Out = &'static [u8]; + type Error = io::Error; fn decode(&mut self, src: &SocketAddr, buf: &[u8]) -> io::Result<Self::In> { assert_eq!(src, &self.from); @@ -204,10 +205,10 @@ impl UdpCodec for Codec { Ok(()) } - fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> SocketAddr { + fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> io::Result<SocketAddr> { assert_eq!(msg, self.data); buf.extend_from_slice(msg); - self.to + Ok(self.to) } } @@ -241,4 +242,3 @@ fn send_framed() { assert_eq!(received.0, Some(())); } } - |