From 79d65c286025c551a775c0964d168e6feb4b3409 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antoine=20B=C3=BCsch?= Date: Wed, 14 Nov 2018 20:36:14 +1100 Subject: Async api (#128) * Refactored Transport for better async use Still a bit rough, but it now builds a big future using combinators. It still does one `Runtime::block_on()` to keep the existing API, but this is a first up before making the whole API async. * Migrate most APIs to be Future-based I still need to finish a few of the more tricky ones that I've commented out for now, but most of it compiles and some examples work. In particular, `Docker::stats()` now properly returns an async stream of stats. * Fix events and containerinspect examples * Fix imageinspect, images, info and top examples * Fix containercreate, imagedelete and imagepull examples * Fix more examples * Add back debug statement in Transport::request * De-glob imports in examples * Remove unused imports in examples * Fix NetworkCreateOptions serialization * Add back error message extraction in Transport * Fix Container::create serialization of options * Add containerdelete example * Simplify result * Fix some error handling to remove unwrap() * Fix Image::export() * Fix imagebuild example * Add adapter from Stream of Chunks to AsyncRead Having an `AsyncRead` is required to be able to use the `FramedRead` and `Decoder` stuff from tokio_codec. This code is "borrowed" from https:/github.com/ferristseng/rust-ipfs-api though should probably be moved to its own crate or to tokio_codec. * Fix Container::logs() It now properly demuxes stdout/stderr, and returns a `Stream`. * Fix Container::export() * Use LineCodec for streaming JSON Although in my limited testing it seemed to work fine, there is no guarantee that 1 chunk == 1 piece of valid JSON. However, each JSON structure seems to be serialized on one line, so use LineCodec to turn the body into a stream of lines, then deserialize over this. * Fix serialization of ExecContainerOptions * Fix Container::exec() (kind of...) * Simplify deserialisation in Image::delete() * Small clean-ups * More clean ups * Fix rustdoc + remove extraneous "extern crate" * Fix doc example * Fix formatting --- src/read.rs | 103 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 103 insertions(+) create mode 100644 src/read.rs (limited to 'src/read.rs') diff --git a/src/read.rs b/src/read.rs new file mode 100644 index 0000000..847ee21 --- /dev/null +++ b/src/read.rs @@ -0,0 +1,103 @@ +use errors::Error; +use futures::{Async, Stream}; +use hyper::Chunk; +use std::cmp; +use std::io::{self, Read}; +use tokio_io::AsyncRead; + +/* + * The following is taken from + * https://github.com/ferristseng/rust-ipfs-api/blob/master/ipfs-api/src/read.rs. + * TODO: see with upstream author to move to a separate crate. + */ + +/// The state of a stream returning Chunks. +/// +enum ReadState { + /// A chunk is ready to be read from. + /// + Ready(Chunk, usize), + + /// The next chunk isn't ready yet. + /// + NotReady, +} + +/// Reads from a stream of chunks asynchronously. +/// +pub struct StreamReader { + stream: S, + state: ReadState, +} + +impl StreamReader +where + S: Stream, +{ + #[inline] + pub fn new(stream: S) -> StreamReader { + StreamReader { + stream, + state: ReadState::NotReady, + } + } +} + +impl Read for StreamReader +where + S: Stream, +{ + fn read( + &mut self, + buf: &mut [u8], + ) -> io::Result { + loop { + let ret; + + match self.state { + // Stream yielded a Chunk to read. + // + ReadState::Ready(ref mut chunk, ref mut pos) => { + let chunk_start = *pos; + let len = cmp::min(buf.len(), chunk.len() - chunk_start); + let chunk_end = chunk_start + len; + + buf[..len].copy_from_slice(&chunk[chunk_start..chunk_end]); + *pos += len; + + if *pos == chunk.len() { + ret = len; + } else { + return Ok(len); + } + } + // Stream is not ready, and a Chunk needs to be read. + // + ReadState::NotReady => { + match self.stream.poll() { + // Polling stream yielded a Chunk that can be read from. + // + Ok(Async::Ready(Some(chunk))) => { + self.state = ReadState::Ready(chunk, 0); + + continue; + } + // Polling stream yielded EOF. + // + Ok(Async::Ready(None)) => return Ok(0), + // Stream could not be read from. + // + Ok(Async::NotReady) => return Err(io::ErrorKind::WouldBlock.into()), + Err(e) => return Err(io::Error::new(io::ErrorKind::Other, e.to_string())), + } + } + } + + self.state = ReadState::NotReady; + + return Ok(ret); + } + } +} + +impl AsyncRead for StreamReader where S: Stream {} -- cgit v1.2.3