summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--examples/udp-codec.rs10
-rw-r--r--src/io/udp_frame.rs11
-rw-r--r--src/net/udp.rs39
3 files changed, 22 insertions, 38 deletions
diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs
index 230c98ff..f899180a 100644
--- a/examples/udp-codec.rs
+++ b/examples/udp-codec.rs
@@ -69,12 +69,10 @@ fn main() {
let client = UdpSocket::bind(&clientaddr, &handle).unwrap();
//start things off by sending a ping from the client to the server
- //This doesn't go through the codec to encode the message, but rather
- //it sends raw data with the send_dgram future
- {
- let job = client.send_dgram(b"PING\n", &srvaddr);
- core.run(job).unwrap();
- }
+ //This doesn't utilize the codec to encode the message, but rather
+ //it sends raw data directly to the remote peer with the send_dgram future
+ let job = client.send_dgram(b"PING\n", srvaddr);
+ let (client, _buf) = core.run(job).unwrap();
//We create a FramedUdp instance, which associates a socket
//with a codec. We then immediate split that into the
diff --git a/src/io/udp_frame.rs b/src/io/udp_frame.rs
index 1996ac59..a9b89cb8 100644
--- a/src/io/udp_frame.rs
+++ b/src/io/udp_frame.rs
@@ -115,7 +115,7 @@ impl<C : CodecUdp> Sink for FramedUdp<C> {
fn poll_complete(&mut self) -> Poll<(), io::Error> {
trace!("flushing framed transport");
- while !self.wr.is_empty() {
+ if !self.wr.is_empty() {
if let Some(outaddr) = self.out_addr {
let remaining = self.wr.len();
trace!("writing; remaining={}", remaining);
@@ -124,13 +124,12 @@ impl<C : CodecUdp> Sink for FramedUdp<C> {
self.wr.clear();
self.out_addr = None;
if n != remaining {
- return Err(io::Error::new(io::ErrorKind::WriteZero,
- "failed to write frame datagram to socket"));
+ return Err(io::Error::new(io::ErrorKind::Other,
+ "failed to write entire datagram to socket"));
}
}
else {
- return Err(io::Error::new(io::ErrorKind::Other,
- "outbound stream in invalid state: out_addr is not known"));
+ panic!("outbound stream in invalid state: out_addr is not known");
}
}
@@ -153,7 +152,7 @@ impl<C> FramedUdp<C> {
/// Creates a new FramedUdp object. It moves the supplied socket, codec
/// supplied vecs.
- pub fn new(sock : UdpSocket, codec : C, rd_buf : Vec<u8>, wr_buf : Vec<u8>) -> FramedUdp<C> {
+ fn new(sock : UdpSocket, codec : C, rd_buf : Vec<u8>, wr_buf : Vec<u8>) -> FramedUdp<C> {
FramedUdp {
socket: sock,
codec : codec,
diff --git a/src/net/udp.rs b/src/net/udp.rs
index 109b759a..7fb6b9a7 100644
--- a/src/net/udp.rs
+++ b/src/net/udp.rs
@@ -1,7 +1,7 @@
use std::io;
use std::net::{self, SocketAddr, Ipv4Addr, Ipv6Addr};
use std::fmt;
-use io::FramedUdp;
+use io::{FramedUdp, framed_udp};
use futures::{Async, Future, Poll};
use mio;
use std::mem;
@@ -47,12 +47,7 @@ impl UdpSocket {
/// and `DecodeUdp` to implement `Stream` and `Sink`
/// This moves the socket into the newly created FramedUdp object
pub fn framed<C>(self, codec : C) -> FramedUdp<C> {
- FramedUdp::new(
- self,
- codec,
- vec![0; 64 * 1024],
- Vec::with_capacity(64 * 1024)
- )
+ framed_udp(self, codec)
}
/// Returns the local address that this stream is bound to.
@@ -114,7 +109,7 @@ impl UdpSocket {
/// be broadly applicable to accepting data which can be converted to a slice.
/// The `Window` struct is also available in this crate to provide a different
/// window into a slice if necessary.
- pub fn send_dgram<'a, T>(&'a self, buf: T, addr : &'a SocketAddr) -> SendDGram<T>
+ pub fn send_dgram<T>(self, buf: T, addr : SocketAddr) -> SendDGram<T>
where T: AsRef<[u8]>,
{
SendDGram {
@@ -294,51 +289,43 @@ impl fmt::Debug for UdpSocket {
/// This is created by the [`write_all`] top-level method.
///
/// [`write_all`]: fn.write_all.html
-pub struct SendDGram<'a, T> {
- state: UdpState<'a, T>,
+pub struct SendDGram<T> {
+ state: UdpState<T>,
}
-enum UdpState<'a, T> {
+enum UdpState<T> {
Writing {
- sock: &'a UdpSocket,
+ sock: UdpSocket,
buf: T,
- addr: &'a SocketAddr,
+ addr: SocketAddr,
},
Empty,
}
-
-fn zero_write() -> io::Error {
- io::Error::new(io::ErrorKind::WriteZero, "zero-length write")
-}
-
fn incomplete_write(reason : &str) -> io::Error {
io::Error::new(io::ErrorKind::Other, reason)
}
-impl<'a, T> Future for SendDGram<'a, T>
+impl<T> Future for SendDGram<T>
where T: AsRef<[u8]>,
{
- type Item = T;
+ type Item = (UdpSocket, T);
type Error = io::Error;
- fn poll(&mut self) -> Poll<T, io::Error> {
+ fn poll(&mut self) -> Poll<(UdpSocket, T), io::Error> {
match self.state {
UdpState::Writing { ref sock, ref buf, ref addr} => {
let buf = buf.as_ref();
let n = try_nb!(sock.send_to(&buf, addr));
- if n == 0 {
- return Err(zero_write())
- }
if n != buf.len() {
return Err(incomplete_write("Failed to send entire message in datagram"))
}
}
- UdpState::Empty => panic!("poll a SendAllTo after it's done"),
+ UdpState::Empty => panic!("poll a SendDGram after it's done"),
}
match mem::replace(&mut self.state, UdpState::Empty) {
- UdpState::Writing { buf, .. } => Ok((buf).into()),
+ UdpState::Writing { sock, buf, .. } => Ok(Async::Ready((sock, (buf).into()))),
UdpState::Empty => panic!(),
}
}