diff options
-rw-r--r-- | examples/udp-codec.rs | 25 | ||||
-rw-r--r-- | src/io/udp_frame.rs | 15 |
2 files changed, 20 insertions, 20 deletions
diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs index f899180a..afc70d75 100644 --- a/examples/udp-codec.rs +++ b/examples/udp-codec.rs @@ -25,20 +25,27 @@ use std::str; /// so that it can respond back. /// In the real world, one would probably /// want an associative of remote peers to their state +/// +/// Note that this takes a pretty draconian stance by returning +/// an error if it can't find a newline in the datagram it received pub struct LineCodec { addr : Option<SocketAddr> } impl CodecUdp for LineCodec { - type In = Vec<u8>; + type In = Vec<Vec<u8>>; type Out = Vec<u8>; - fn decode(&mut self, addr : &SocketAddr, buf: &[u8]) -> Result<Option<Self::In>, io::Error> { + fn decode(&mut self, addr : &SocketAddr, buf: &[u8]) -> Result<Self::In, io::Error> { trace!("decoding {} - {}", str::from_utf8(buf).unwrap(), addr); self.addr = Some(*addr); - match buf.iter().position(|&b| b == b'\n') { - Some(i) => Ok(Some(buf[.. i].into())), - None => Ok(None), + let res : Vec<Vec<u8>> = buf.split(|c| *c == b'\n').map(|s| s.into()).collect(); + if res.len() > 0 { + Ok(res) + } + else { + Err(io::Error::new(io::ErrorKind::Other, + "failed to find newline in datagram")) } } @@ -83,8 +90,8 @@ fn main() { //Note that we pass srvsink into fold, so that it can be //supplied to every iteration. The reason for this is //sink.send moves itself into `send` and then returns itself - let srvloop = srvstream.fold(srvsink, move |sink, buf| { - println!("{}", str::from_utf8(buf.as_slice()).unwrap()); + let srvloop = srvstream.fold(srvsink, move |sink, lines| { + println!("{}", str::from_utf8(lines[0].as_slice()).unwrap()); sink.send(b"PONG".to_vec()) }).map(|_| ()); @@ -92,8 +99,8 @@ fn main() { let (clistream, clisink) = client.framed(clicodec).split(); //And another infinite iteration - let cliloop = clistream.fold(clisink, move |sink, buf| { - println!("{}", str::from_utf8(buf.as_slice()).unwrap()); + let cliloop = clistream.fold(clisink, move |sink, lines| { + println!("{}", str::from_utf8(lines[0].as_slice()).unwrap()); sink.send(b"PING".to_vec()) }).map(|_| ()); diff --git a/src/io/udp_frame.rs b/src/io/udp_frame.rs index a9b89cb8..d09d0a8f 100644 --- a/src/io/udp_frame.rs +++ b/src/io/udp_frame.rs @@ -45,17 +45,11 @@ pub trait CodecUdp { /// decode, as the next poll_read that occurs will write the next datagram /// into the buffer, without regard for what is already there. /// - /// If the bytes look valid, but a frame isn't fully available yet, then - /// `Ok(None)` is returned. This indicates to the `Framed` instance that - /// it needs to read some more bytes before calling this method again. - /// In such a case, it is decode's responsibility to copy the data - /// into their own internal buffer for future use. - /// /// Finally, if the bytes in the buffer are malformed then an error is /// returned indicating why. This informs `Framed` that the stream is now /// corrupt and should be terminated. /// - fn decode(&mut self, src: &SocketAddr, buf: &[u8]) -> Result<Option<Self::In>, io::Error>; + fn decode(&mut self, src: &SocketAddr, buf: &[u8]) -> Result<Self::In, io::Error>; } /// A unified `Stream` and `Sink` interface to an underlying `Io` object, using @@ -82,10 +76,9 @@ impl<C : CodecUdp> Stream for FramedUdp<C> { Ok((n, addr)) => { trace!("read {} bytes", n); trace!("attempting to decode a frame"); - if let Some(frame) = try!(self.codec.decode(&addr, & self.rd[.. n])) { - trace!("frame decoded from buffer"); - return Ok(Async::Ready(Some(frame))); - } + let frame = try!(self.codec.decode(&addr, & self.rd[.. n])); + trace!("frame decoded from buffer"); + return Ok(Async::Ready(Some(frame))); } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { return Ok(Async::NotReady) |