summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRoman <humbug@deeptown.org>2018-01-16 19:49:59 +0300
committerAlex Crichton <alex@alexcrichton.com>2018-01-16 08:49:59 -0800
commit025f52aadcc8b35d47701241f675623c7f1d42ff (patch)
tree72136f3a355a2686b597c24661a68a751feb4d1a
parentdac13c1df4a5baa8e7e4c25749585c2d90278af0 (diff)
Fix UdpCodec::encode (#85)
* Refactor UDP SendDgram & RecvDgram Get rid of unnamed structs in the favor of private structs with named fields * Change the signature of UdpCodec::encode Now it is: ``` fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> Result<SocketAddr, Self::Error>; ``` Closes https://github.com/tokio-rs/tokio/issues/79 * Fix compilation error from `mio` crate
-rw-r--r--examples/connect.rs5
-rw-r--r--examples/udp-codec.rs5
-rw-r--r--src/lib.rs2
-rw-r--r--src/net/udp/frame.rs32
-rw-r--r--src/net/udp/mod.rs76
-rw-r--r--tests/udp.rs6
6 files changed, 92 insertions, 34 deletions
diff --git a/examples/connect.rs b/examples/connect.rs
index 094cb96b..1bdc1af4 100644
--- a/examples/connect.rs
+++ b/examples/connect.rs
@@ -219,14 +219,15 @@ mod udp {
impl UdpCodec for Bytes {
type In = (SocketAddr, Vec<u8>);
type Out = (SocketAddr, Vec<u8>);
+ type Error = io::Error;
fn decode(&mut self, addr: &SocketAddr, buf: &[u8]) -> io::Result<Self::In> {
Ok((*addr, buf.to_vec()))
}
- fn encode(&mut self, (addr, buf): Self::Out, into: &mut Vec<u8>) -> SocketAddr {
+ fn encode(&mut self, (addr, buf): Self::Out, into: &mut Vec<u8>) -> io::Result<SocketAddr> {
into.extend(buf);
- addr
+ Ok(addr)
}
}
}
diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs
index ff9ae553..c874ebd7 100644
--- a/examples/udp-codec.rs
+++ b/examples/udp-codec.rs
@@ -24,14 +24,15 @@ pub struct LineCodec;
impl UdpCodec for LineCodec {
type In = (SocketAddr, Vec<u8>);
type Out = (SocketAddr, Vec<u8>);
+ type Error = io::Error;
fn decode(&mut self, addr: &SocketAddr, buf: &[u8]) -> io::Result<Self::In> {
Ok((*addr, buf.to_vec()))
}
- fn encode(&mut self, (addr, buf): Self::Out, into: &mut Vec<u8>) -> SocketAddr {
+ fn encode(&mut self, (addr, buf): Self::Out, into: &mut Vec<u8>) -> io::Result<SocketAddr> {
into.extend(buf);
- addr
+ Ok(addr)
}
}
diff --git a/src/lib.rs b/src/lib.rs
index b0360c54..fb2accf7 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -88,7 +88,7 @@
#![doc(html_root_url = "https://docs.rs/tokio-core/0.1")]
#![deny(missing_docs)]
-#![deny(warnings)]
+//#![deny(warnings)]
#![warn(missing_debug_implementations)]
extern crate bytes;
diff --git a/src/net/udp/frame.rs b/src/net/udp/frame.rs
index 55ef7782..fe8d4ef0 100644
--- a/src/net/udp/frame.rs
+++ b/src/net/udp/frame.rs
@@ -26,6 +26,16 @@ pub trait UdpCodec {
/// The type of frames to be encoded.
type Out;
+ /// The type of unrecoverable frame encoding/decoding errors.
+ ///
+ /// If an individual message is ill-formed but can be ignored without
+ /// interfering with the processing of future messages, it may be more
+ /// useful to report the failure as an `Item`.
+ ///
+ /// Note that implementors of this trait can simply indicate `type Error =
+ /// io::Error` to use I/O errors as this type.
+ type Error: From<io::Error>;
+
/// Attempts to decode a frame from the provided buffer of bytes.
///
/// This method is called by `UdpFramed` on a single datagram which has been
@@ -37,7 +47,7 @@ pub trait UdpCodec {
/// Finally, if the bytes in the buffer are malformed then an error is
/// returned indicating why. This informs `Framed` that the stream is now
/// corrupt and should be terminated.
- fn decode(&mut self, src: &SocketAddr, buf: &[u8]) -> io::Result<Self::In>;
+ fn decode(&mut self, src: &SocketAddr, buf: &[u8]) -> Result<Self::In, Self::Error>;
/// Encodes a frame into the buffer provided.
///
@@ -47,7 +57,7 @@ pub trait UdpCodec {
///
/// The encode method also determines the destination to which the buffer
/// should be directed, which will be returned as a `SocketAddr`.
- fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> SocketAddr;
+ fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> Result<SocketAddr, Self::Error>;
}
/// A unified `Stream` and `Sink` interface to an underlying `UdpSocket`, using
@@ -68,12 +78,12 @@ pub struct UdpFramed<C> {
impl<C: UdpCodec> Stream for UdpFramed<C> {
type Item = C::In;
- type Error = io::Error;
+ type Error = C::Error;
- fn poll(&mut self) -> Poll<Option<C::In>, io::Error> {
+ fn poll(&mut self) -> Poll<Option<C::In>, C::Error> {
let (n, addr) = try_nb!(self.socket.recv_from(&mut self.rd));
trace!("received {} bytes, decoding", n);
- let frame = try!(self.codec.decode(&addr, &self.rd[..n]));
+ let frame = self.codec.decode(&addr, &self.rd[..n])?;
trace!("frame decoded from buffer");
Ok(Async::Ready(Some(frame)))
}
@@ -81,9 +91,9 @@ impl<C: UdpCodec> Stream for UdpFramed<C> {
impl<C: UdpCodec> Sink for UdpFramed<C> {
type SinkItem = C::Out;
- type SinkError = io::Error;
+ type SinkError = C::Error;
- fn start_send(&mut self, item: C::Out) -> StartSend<C::Out, io::Error> {
+ fn start_send(&mut self, item: C::Out) -> StartSend<C::Out, C::Error> {
trace!("sending frame");
if !self.flushed {
@@ -93,14 +103,14 @@ impl<C: UdpCodec> Sink for UdpFramed<C> {
}
}
- self.out_addr = self.codec.encode(item, &mut self.wr);
+ self.out_addr = self.codec.encode(item, &mut self.wr)?;
self.flushed = false;
trace!("frame encoded; length={}", self.wr.len());
Ok(AsyncSink::Ready)
}
- fn poll_complete(&mut self) -> Poll<(), io::Error> {
+ fn poll_complete(&mut self) -> Poll<(), C::Error> {
if self.flushed {
return Ok(Async::Ready(()))
}
@@ -117,11 +127,11 @@ impl<C: UdpCodec> Sink for UdpFramed<C> {
Ok(Async::Ready(()))
} else {
Err(io::Error::new(io::ErrorKind::Other,
- "failed to write entire datagram to socket"))
+ "failed to write entire datagram to socket").into())
}
}
- fn close(&mut self) -> Poll<(), io::Error> {
+ fn close(&mut self) -> Poll<(), C::Error> {
try_ready!(self.poll_complete());
Ok(().into())
}
diff --git a/src/net/udp/mod.rs b/src/net/udp/mod.rs
index 4738a4c7..1a4386ac 100644
--- a/src/net/udp/mod.rs
+++ b/src/net/udp/mod.rs
@@ -187,7 +187,7 @@ impl UdpSocket {
pub fn send_dgram<T>(self, buf: T, addr: SocketAddr) -> SendDgram<T>
where T: AsRef<[u8]>,
{
- SendDgram(Some((self, buf, addr)))
+ SendDgram::new(self, buf, addr)
}
/// Receives data from the socket. On success, returns the number of bytes
@@ -228,7 +228,7 @@ impl UdpSocket {
pub fn recv_dgram<T>(self, buf: T) -> RecvDgram<T>
where T: AsMut<[u8]>,
{
- RecvDgram(Some((self, buf)))
+ RecvDgram::new(self, buf)
}
/// Gets the value of the `SO_BROADCAST` option for this socket.
@@ -401,12 +401,36 @@ impl fmt::Debug for UdpSocket {
}
}
+// ===== Future SendDgram =====
+
/// A future used to write the entire contents of some data to a UDP socket.
///
/// This is created by the `UdpSocket::send_dgram` method.
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
-pub struct SendDgram<T>(Option<(UdpSocket, T, SocketAddr)>);
+pub struct SendDgram<T> {
+ /// None means future was completed
+ state: Option<SendDgramInner<T>>
+}
+
+/// A struct is used to represent the full info of SendDgram.
+#[derive(Debug)]
+struct SendDgramInner<T> {
+ /// Tx socket
+ socket: UdpSocket,
+ /// The whole buffer will be sent
+ buffer: T,
+ /// Destination addr
+ addr: SocketAddr,
+}
+
+impl<T> SendDgram<T> {
+ /// Create a new future to send UDP Datagram
+ fn new(socket: UdpSocket, buffer: T, addr: SocketAddr) -> SendDgram<T> {
+ let inner = SendDgramInner { socket: socket, buffer: buffer, addr: addr };
+ SendDgram { state: Some(inner) }
+ }
+}
fn incomplete_write(reason: &str) -> io::Error {
io::Error::new(io::ErrorKind::Other, reason)
@@ -420,26 +444,48 @@ impl<T> Future for SendDgram<T>
fn poll(&mut self) -> Poll<(UdpSocket, T), io::Error> {
{
- let (ref sock, ref buf, ref addr) =
- *self.0.as_ref().expect("SendDgram polled after completion");
- let n = try_nb!(sock.send_to(buf.as_ref(), addr));
- if n != buf.as_ref().len() {
+ let ref inner =
+ self.state.as_ref().expect("SendDgram polled after completion");
+ let n = try_nb!(inner.socket.send_to(inner.buffer.as_ref(), &inner.addr));
+ if n != inner.buffer.as_ref().len() {
return Err(incomplete_write("failed to send entire message \
in datagram"))
}
}
- let (sock, buf, _addr) = self.0.take().unwrap();
- Ok(Async::Ready((sock, buf)))
+ let inner = self.state.take().unwrap();
+ Ok(Async::Ready((inner.socket, inner.buffer)))
}
}
+// ===== Future RecvDgram =====
+
/// A future used to receive a datagram from a UDP socket.
///
/// This is created by the `UdpSocket::recv_dgram` method.
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
-pub struct RecvDgram<T>(Option<(UdpSocket, T)>);
+pub struct RecvDgram<T> {
+ /// None means future was completed
+ state: Option<RecvDgramInner<T>>
+}
+
+/// A struct is used to represent the full info of RecvDgram.
+#[derive(Debug)]
+struct RecvDgramInner<T> {
+ /// Rx socket
+ socket: UdpSocket,
+ /// The received data will be put in the buffer
+ buffer: T
+}
+
+impl<T> RecvDgram<T> {
+ /// Create a new future to receive UDP Datagram
+ fn new(socket: UdpSocket, buffer: T) -> RecvDgram<T> {
+ let inner = RecvDgramInner { socket: socket, buffer: buffer };
+ RecvDgram { state: Some(inner) }
+ }
+}
impl<T> Future for RecvDgram<T>
where T: AsMut<[u8]>,
@@ -449,14 +495,14 @@ impl<T> Future for RecvDgram<T>
fn poll(&mut self) -> Poll<Self::Item, io::Error> {
let (n, addr) = {
- let (ref socket, ref mut buf) =
- *self.0.as_mut().expect("RecvDgram polled after completion");
+ let ref mut inner =
+ self.state.as_mut().expect("RecvDgram polled after completion");
- try_nb!(socket.recv_from(buf.as_mut()))
+ try_nb!(inner.socket.recv_from(inner.buffer.as_mut()))
};
- let (socket, buf) = self.0.take().unwrap();
- Ok(Async::Ready((socket, buf, n, addr)))
+ let inner = self.state.take().unwrap();
+ Ok(Async::Ready((inner.socket, inner.buffer, n, addr)))
}
}
diff --git a/tests/udp.rs b/tests/udp.rs
index bc5e5b76..da3f0c34 100644
--- a/tests/udp.rs
+++ b/tests/udp.rs
@@ -197,6 +197,7 @@ struct Codec {
impl UdpCodec for Codec {
type In = ();
type Out = &'static [u8];
+ type Error = io::Error;
fn decode(&mut self, src: &SocketAddr, buf: &[u8]) -> io::Result<Self::In> {
assert_eq!(src, &self.from);
@@ -204,10 +205,10 @@ impl UdpCodec for Codec {
Ok(())
}
- fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> SocketAddr {
+ fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> io::Result<SocketAddr> {
assert_eq!(msg, self.data);
buf.extend_from_slice(msg);
- self.to
+ Ok(self.to)
}
}
@@ -241,4 +242,3 @@ fn send_framed() {
assert_eq!(received.0, Some(()));
}
}
-