diff options
-rw-r--r-- | examples/udp-codec.rs | 10 | ||||
-rw-r--r-- | src/io/udp_frame.rs | 11 | ||||
-rw-r--r-- | src/net/udp.rs | 39 |
3 files changed, 22 insertions, 38 deletions
diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs index 230c98ff..f899180a 100644 --- a/examples/udp-codec.rs +++ b/examples/udp-codec.rs @@ -69,12 +69,10 @@ fn main() { let client = UdpSocket::bind(&clientaddr, &handle).unwrap(); //start things off by sending a ping from the client to the server - //This doesn't go through the codec to encode the message, but rather - //it sends raw data with the send_dgram future - { - let job = client.send_dgram(b"PING\n", &srvaddr); - core.run(job).unwrap(); - } + //This doesn't utilize the codec to encode the message, but rather + //it sends raw data directly to the remote peer with the send_dgram future + let job = client.send_dgram(b"PING\n", srvaddr); + let (client, _buf) = core.run(job).unwrap(); //We create a FramedUdp instance, which associates a socket //with a codec. We then immediate split that into the diff --git a/src/io/udp_frame.rs b/src/io/udp_frame.rs index 1996ac59..a9b89cb8 100644 --- a/src/io/udp_frame.rs +++ b/src/io/udp_frame.rs @@ -115,7 +115,7 @@ impl<C : CodecUdp> Sink for FramedUdp<C> { fn poll_complete(&mut self) -> Poll<(), io::Error> { trace!("flushing framed transport"); - while !self.wr.is_empty() { + if !self.wr.is_empty() { if let Some(outaddr) = self.out_addr { let remaining = self.wr.len(); trace!("writing; remaining={}", remaining); @@ -124,13 +124,12 @@ impl<C : CodecUdp> Sink for FramedUdp<C> { self.wr.clear(); self.out_addr = None; if n != remaining { - return Err(io::Error::new(io::ErrorKind::WriteZero, - "failed to write frame datagram to socket")); + return Err(io::Error::new(io::ErrorKind::Other, + "failed to write entire datagram to socket")); } } else { - return Err(io::Error::new(io::ErrorKind::Other, - "outbound stream in invalid state: out_addr is not known")); + panic!("outbound stream in invalid state: out_addr is not known"); } } @@ -153,7 +152,7 @@ impl<C> FramedUdp<C> { /// Creates a new FramedUdp object. It moves the supplied socket, codec /// supplied vecs. - pub fn new(sock : UdpSocket, codec : C, rd_buf : Vec<u8>, wr_buf : Vec<u8>) -> FramedUdp<C> { + fn new(sock : UdpSocket, codec : C, rd_buf : Vec<u8>, wr_buf : Vec<u8>) -> FramedUdp<C> { FramedUdp { socket: sock, codec : codec, diff --git a/src/net/udp.rs b/src/net/udp.rs index 109b759a..7fb6b9a7 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -1,7 +1,7 @@ use std::io; use std::net::{self, SocketAddr, Ipv4Addr, Ipv6Addr}; use std::fmt; -use io::FramedUdp; +use io::{FramedUdp, framed_udp}; use futures::{Async, Future, Poll}; use mio; use std::mem; @@ -47,12 +47,7 @@ impl UdpSocket { /// and `DecodeUdp` to implement `Stream` and `Sink` /// This moves the socket into the newly created FramedUdp object pub fn framed<C>(self, codec : C) -> FramedUdp<C> { - FramedUdp::new( - self, - codec, - vec![0; 64 * 1024], - Vec::with_capacity(64 * 1024) - ) + framed_udp(self, codec) } /// Returns the local address that this stream is bound to. @@ -114,7 +109,7 @@ impl UdpSocket { /// be broadly applicable to accepting data which can be converted to a slice. /// The `Window` struct is also available in this crate to provide a different /// window into a slice if necessary. - pub fn send_dgram<'a, T>(&'a self, buf: T, addr : &'a SocketAddr) -> SendDGram<T> + pub fn send_dgram<T>(self, buf: T, addr : SocketAddr) -> SendDGram<T> where T: AsRef<[u8]>, { SendDGram { @@ -294,51 +289,43 @@ impl fmt::Debug for UdpSocket { /// This is created by the [`write_all`] top-level method. /// /// [`write_all`]: fn.write_all.html -pub struct SendDGram<'a, T> { - state: UdpState<'a, T>, +pub struct SendDGram<T> { + state: UdpState<T>, } -enum UdpState<'a, T> { +enum UdpState<T> { Writing { - sock: &'a UdpSocket, + sock: UdpSocket, buf: T, - addr: &'a SocketAddr, + addr: SocketAddr, }, Empty, } - -fn zero_write() -> io::Error { - io::Error::new(io::ErrorKind::WriteZero, "zero-length write") -} - fn incomplete_write(reason : &str) -> io::Error { io::Error::new(io::ErrorKind::Other, reason) } -impl<'a, T> Future for SendDGram<'a, T> +impl<T> Future for SendDGram<T> where T: AsRef<[u8]>, { - type Item = T; + type Item = (UdpSocket, T); type Error = io::Error; - fn poll(&mut self) -> Poll<T, io::Error> { + fn poll(&mut self) -> Poll<(UdpSocket, T), io::Error> { match self.state { UdpState::Writing { ref sock, ref buf, ref addr} => { let buf = buf.as_ref(); let n = try_nb!(sock.send_to(&buf, addr)); - if n == 0 { - return Err(zero_write()) - } if n != buf.len() { return Err(incomplete_write("Failed to send entire message in datagram")) } } - UdpState::Empty => panic!("poll a SendAllTo after it's done"), + UdpState::Empty => panic!("poll a SendDGram after it's done"), } match mem::replace(&mut self.state, UdpState::Empty) { - UdpState::Writing { buf, .. } => Ok((buf).into()), + UdpState::Writing { sock, buf, .. } => Ok(Async::Ready((sock, (buf).into()))), UdpState::Empty => panic!(), } } |