diff options
Diffstat (limited to 'tokio-net/src/uds/frame.rs')
-rw-r--r-- | tokio-net/src/uds/frame.rs | 175 |
1 files changed, 0 insertions, 175 deletions
diff --git a/tokio-net/src/uds/frame.rs b/tokio-net/src/uds/frame.rs deleted file mode 100644 index 584da8ae..00000000 --- a/tokio-net/src/uds/frame.rs +++ /dev/null @@ -1,175 +0,0 @@ -use super::UnixDatagram; -use bytes::{BufMut, BytesMut}; -use futures::{try_ready, Async, AsyncSink, Poll, Sink, StartSend, Stream}; -use std::io; -use std::os::unix::net::SocketAddr; -use std::path::Path; -use tokio_codec::{Decoder, Encoder}; - -/// A unified `Stream` and `Sink` interface to an underlying `UnixDatagram`, using -/// the `Encoder` and `Decoder` traits to encode and decode frames. -/// -/// Unix datagram sockets work with datagrams, but higher-level code may wants to -/// batch these into meaningful chunks, called "frames". This method layers -/// framing on top of this socket by using the `Encoder` and `Decoder` traits to -/// handle encoding and decoding of messages frames. Note that the incoming and -/// outgoing frame types may be distinct. -/// -/// This function returns a *single* object that is both `Stream` and `Sink`; -/// grouping this into a single object is often useful for layering things which -/// require both read and write access to the underlying object. -/// -/// If you want to work more directly with the streams and sink, consider -/// calling `split` on the `UnixDatagramFramed` returned by this method, which will break -/// them into separate objects, allowing them to interact more easily. -#[must_use = "sinks do nothing unless polled"] -#[derive(Debug)] -pub struct UnixDatagramFramed<A, C> { - socket: UnixDatagram, - codec: C, - rd: BytesMut, - wr: BytesMut, - out_addr: Option<A>, - flushed: bool, -} - -impl<A, C: Decoder> Stream for UnixDatagramFramed<A, C> { - type Item = (C::Item, SocketAddr); - type Error = C::Error; - - fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { - self.rd.reserve(INITIAL_RD_CAPACITY); - - let (_n, addr) = unsafe { - let (n, addr) = try_ready!(self.socket.poll_recv_from(self.rd.bytes_mut())); - self.rd.advance_mut(n); - (n, addr) - }; - - let span = trace_span!("decoding", from.addr = %addr, dgram.length = _n); - let _e = span.enter(); - trace!("trying to decode a frame..."); - - let frame_res = self.codec.decode(&mut self.rd); - self.rd.clear(); - let frame = frame_res?; - let result = frame.map(|frame| (frame, addr)); - trace!("frame decoded from buffer"); - Ok(Async::Ready(result)) - } -} - -impl<A: AsRef<Path>, C: Encoder> Sink for UnixDatagramFramed<A, C> { - type SinkItem = (C::Item, A); - type SinkError = C::Error; - - fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> { - let span = trace_span!("sending", to.addr = %item.0, flushed = self.flushed); - let _e = span.enter(); - - trace!("sending frame..."); - - if !self.flushed { - match self.poll_complete()? { - Async::Ready(()) => {} - Async::NotReady => return Ok(AsyncSink::NotReady(item)), - } - } - - let (frame, out_addr) = item; - self.codec.encode(frame, &mut self.wr)?; - self.out_addr = Some(out_addr); - self.flushed = false; - trace!(message = "frame encoded", frame.length = pin.wr.len()); - - Ok(AsyncSink::Ready) - } - - fn poll_complete(&mut self) -> Poll<(), C::Error> { - if self.flushed { - return Ok(Async::Ready(())); - } - - let span = trace_span!("flushing", to.addr = %self.out_addr); - let _e = span.enter(); - - let n = { - let out_path = match self.out_addr { - Some(ref out_path) => out_path.as_ref(), - None => { - return Err(io::Error::new( - io::ErrorKind::Other, - "internal error: addr not available while data not flushed", - ) - .into()); - } - }; - - trace!(message = "flushing frame", frame.length = self.wr.len()); - try_ready!(self.socket.poll_send_to(&self.wr, out_path)) - }; - - let wrote_all = n == self.wr.len(); - self.wr.clear(); - self.flushed = true; - - trace!(written.length = n, written.complete = wrote_all); - - if wrote_all { - self.out_addr = None; - Ok(Async::Ready(())) - } else { - Err(io::Error::new( - io::ErrorKind::Other, - "failed to write entire datagram to socket", - ) - .into()) - } - } - - fn close(&mut self) -> Poll<(), C::Error> { - self.poll_complete() - } -} - -const INITIAL_RD_CAPACITY: usize = 64 * 1024; -const INITIAL_WR_CAPACITY: usize = 8 * 1024; - -impl<A, C> UnixDatagramFramed<A, C> { - /// Create a new `UnixDatagramFramed` backed by the given socket and codec. - /// - /// See struct level documentation for more details. - pub fn new(socket: UnixDatagram, codec: C) -> UnixDatagramFramed<A, C> { - UnixDatagramFramed { - socket: socket, - codec: codec, - out_addr: None, - rd: BytesMut::with_capacity(INITIAL_RD_CAPACITY), - wr: BytesMut::with_capacity(INITIAL_WR_CAPACITY), - flushed: true, - } - } - - /// Returns a reference to the underlying I/O stream wrapped by `Framed`. - /// - /// # Note - /// - /// Care should be taken to not tamper with the underlying stream of data - /// coming in as it may corrupt the stream of frames otherwise being worked - /// with. - pub fn get_ref(&self) -> &UnixDatagram { - &self.socket - } - - /// Returns a mutable reference to the underlying I/O stream wrapped by - /// `Framed`. - /// - /// # Note - /// - /// Care should be taken to not tamper with the underlying stream of data - /// coming in as it may corrupt the stream of frames otherwise being worked - /// with. - pub fn get_mut(&mut self) -> &mut UnixDatagram { - &mut self.socket - } -} |