summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--examples/udp-codec.rs25
-rw-r--r--src/io/udp_frame.rs15
2 files changed, 20 insertions, 20 deletions
diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs
index f899180a..afc70d75 100644
--- a/examples/udp-codec.rs
+++ b/examples/udp-codec.rs
@@ -25,20 +25,27 @@ use std::str;
/// so that it can respond back.
/// In the real world, one would probably
/// want an associative of remote peers to their state
+///
+/// Note that this takes a pretty draconian stance by returning
+/// an error if it can't find a newline in the datagram it received
pub struct LineCodec {
addr : Option<SocketAddr>
}
impl CodecUdp for LineCodec {
- type In = Vec<u8>;
+ type In = Vec<Vec<u8>>;
type Out = Vec<u8>;
- fn decode(&mut self, addr : &SocketAddr, buf: &[u8]) -> Result<Option<Self::In>, io::Error> {
+ fn decode(&mut self, addr : &SocketAddr, buf: &[u8]) -> Result<Self::In, io::Error> {
trace!("decoding {} - {}", str::from_utf8(buf).unwrap(), addr);
self.addr = Some(*addr);
- match buf.iter().position(|&b| b == b'\n') {
- Some(i) => Ok(Some(buf[.. i].into())),
- None => Ok(None),
+ let res : Vec<Vec<u8>> = buf.split(|c| *c == b'\n').map(|s| s.into()).collect();
+ if res.len() > 0 {
+ Ok(res)
+ }
+ else {
+ Err(io::Error::new(io::ErrorKind::Other,
+ "failed to find newline in datagram"))
}
}
@@ -83,8 +90,8 @@ fn main() {
//Note that we pass srvsink into fold, so that it can be
//supplied to every iteration. The reason for this is
//sink.send moves itself into `send` and then returns itself
- let srvloop = srvstream.fold(srvsink, move |sink, buf| {
- println!("{}", str::from_utf8(buf.as_slice()).unwrap());
+ let srvloop = srvstream.fold(srvsink, move |sink, lines| {
+ println!("{}", str::from_utf8(lines[0].as_slice()).unwrap());
sink.send(b"PONG".to_vec())
}).map(|_| ());
@@ -92,8 +99,8 @@ fn main() {
let (clistream, clisink) = client.framed(clicodec).split();
//And another infinite iteration
- let cliloop = clistream.fold(clisink, move |sink, buf| {
- println!("{}", str::from_utf8(buf.as_slice()).unwrap());
+ let cliloop = clistream.fold(clisink, move |sink, lines| {
+ println!("{}", str::from_utf8(lines[0].as_slice()).unwrap());
sink.send(b"PING".to_vec())
}).map(|_| ());
diff --git a/src/io/udp_frame.rs b/src/io/udp_frame.rs
index a9b89cb8..d09d0a8f 100644
--- a/src/io/udp_frame.rs
+++ b/src/io/udp_frame.rs
@@ -45,17 +45,11 @@ pub trait CodecUdp {
/// decode, as the next poll_read that occurs will write the next datagram
/// into the buffer, without regard for what is already there.
///
- /// If the bytes look valid, but a frame isn't fully available yet, then
- /// `Ok(None)` is returned. This indicates to the `Framed` instance that
- /// it needs to read some more bytes before calling this method again.
- /// In such a case, it is decode's responsibility to copy the data
- /// into their own internal buffer for future use.
- ///
/// Finally, if the bytes in the buffer are malformed then an error is
/// returned indicating why. This informs `Framed` that the stream is now
/// corrupt and should be terminated.
///
- fn decode(&mut self, src: &SocketAddr, buf: &[u8]) -> Result<Option<Self::In>, io::Error>;
+ fn decode(&mut self, src: &SocketAddr, buf: &[u8]) -> Result<Self::In, io::Error>;
}
/// A unified `Stream` and `Sink` interface to an underlying `Io` object, using
@@ -82,10 +76,9 @@ impl<C : CodecUdp> Stream for FramedUdp<C> {
Ok((n, addr)) => {
trace!("read {} bytes", n);
trace!("attempting to decode a frame");
- if let Some(frame) = try!(self.codec.decode(&addr, & self.rd[.. n])) {
- trace!("frame decoded from buffer");
- return Ok(Async::Ready(Some(frame)));
- }
+ let frame = try!(self.codec.decode(&addr, & self.rd[.. n]));
+ trace!("frame decoded from buffer");
+ return Ok(Async::Ready(Some(frame)));
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
return Ok(Async::NotReady)