summaryrefslogtreecommitdiffstats
path: root/src/read.rs
diff options
context:
space:
mode:
authorAntoine Büsch <antoine.busch@gmail.com>2018-11-14 20:36:14 +1100
committerdoug tangren <d.tangren@gmail.com>2018-11-14 18:36:14 +0900
commit79d65c286025c551a775c0964d168e6feb4b3409 (patch)
tree34b49a0f97f6f851f47711be1cad0c002b8b78f7 /src/read.rs
parent29bd95b42cd2b3c364f0be1f3e07e4b654e0ccf3 (diff)
Async api (#128)
* Refactored Transport for better async use Still a bit rough, but it now builds a big future using combinators. It still does one `Runtime::block_on()` to keep the existing API, but this is a first up before making the whole API async. * Migrate most APIs to be Future-based I still need to finish a few of the more tricky ones that I've commented out for now, but most of it compiles and some examples work. In particular, `Docker::stats()` now properly returns an async stream of stats. * Fix events and containerinspect examples * Fix imageinspect, images, info and top examples * Fix containercreate, imagedelete and imagepull examples * Fix more examples * Add back debug statement in Transport::request * De-glob imports in examples * Remove unused imports in examples * Fix NetworkCreateOptions serialization * Add back error message extraction in Transport * Fix Container::create serialization of options * Add containerdelete example * Simplify result * Fix some error handling to remove unwrap() * Fix Image::export() * Fix imagebuild example * Add adapter from Stream of Chunks to AsyncRead Having an `AsyncRead` is required to be able to use the `FramedRead` and `Decoder` stuff from tokio_codec. This code is "borrowed" from https:/github.com/ferristseng/rust-ipfs-api though should probably be moved to its own crate or to tokio_codec. * Fix Container::logs() It now properly demuxes stdout/stderr, and returns a `Stream<Item = TtyLine>`. * Fix Container::export() * Use LineCodec for streaming JSON Although in my limited testing it seemed to work fine, there is no guarantee that 1 chunk == 1 piece of valid JSON. However, each JSON structure seems to be serialized on one line, so use LineCodec to turn the body into a stream of lines, then deserialize over this. * Fix serialization of ExecContainerOptions * Fix Container::exec() (kind of...) * Simplify deserialisation in Image::delete() * Small clean-ups * More clean ups * Fix rustdoc + remove extraneous "extern crate" * Fix doc example * Fix formatting
Diffstat (limited to 'src/read.rs')
-rw-r--r--src/read.rs103
1 files changed, 103 insertions, 0 deletions
diff --git a/src/read.rs b/src/read.rs
new file mode 100644
index 0000000..847ee21
--- /dev/null
+++ b/src/read.rs
@@ -0,0 +1,103 @@
+use errors::Error;
+use futures::{Async, Stream};
+use hyper::Chunk;
+use std::cmp;
+use std::io::{self, Read};
+use tokio_io::AsyncRead;
+
+/*
+ * The following is taken from
+ * https://github.com/ferristseng/rust-ipfs-api/blob/master/ipfs-api/src/read.rs.
+ * TODO: see with upstream author to move to a separate crate.
+ */
+
+/// The state of a stream returning Chunks.
+///
+enum ReadState {
+ /// A chunk is ready to be read from.
+ ///
+ Ready(Chunk, usize),
+
+ /// The next chunk isn't ready yet.
+ ///
+ NotReady,
+}
+
+/// Reads from a stream of chunks asynchronously.
+///
+pub struct StreamReader<S> {
+ stream: S,
+ state: ReadState,
+}
+
+impl<S> StreamReader<S>
+where
+ S: Stream<Item = Chunk, Error = Error>,
+{
+ #[inline]
+ pub fn new(stream: S) -> StreamReader<S> {
+ StreamReader {
+ stream,
+ state: ReadState::NotReady,
+ }
+ }
+}
+
+impl<S> Read for StreamReader<S>
+where
+ S: Stream<Item = Chunk, Error = Error>,
+{
+ fn read(
+ &mut self,
+ buf: &mut [u8],
+ ) -> io::Result<usize> {
+ loop {
+ let ret;
+
+ match self.state {
+ // Stream yielded a Chunk to read.
+ //
+ ReadState::Ready(ref mut chunk, ref mut pos) => {
+ let chunk_start = *pos;
+ let len = cmp::min(buf.len(), chunk.len() - chunk_start);
+ let chunk_end = chunk_start + len;
+
+ buf[..len].copy_from_slice(&chunk[chunk_start..chunk_end]);
+ *pos += len;
+
+ if *pos == chunk.len() {
+ ret = len;
+ } else {
+ return Ok(len);
+ }
+ }
+ // Stream is not ready, and a Chunk needs to be read.
+ //
+ ReadState::NotReady => {
+ match self.stream.poll() {
+ // Polling stream yielded a Chunk that can be read from.
+ //
+ Ok(Async::Ready(Some(chunk))) => {
+ self.state = ReadState::Ready(chunk, 0);
+
+ continue;
+ }
+ // Polling stream yielded EOF.
+ //
+ Ok(Async::Ready(None)) => return Ok(0),
+ // Stream could not be read from.
+ //
+ Ok(Async::NotReady) => return Err(io::ErrorKind::WouldBlock.into()),
+ Err(e) => return Err(io::Error::new(io::ErrorKind::Other, e.to_string())),
+ }
+ }
+ }
+
+ self.state = ReadState::NotReady;
+
+ return Ok(ret);
+ }
+ }
+}
+
+impl<S> AsyncRead for StreamReader<S> where S: Stream<Item = Chunk, Error = Error> {}