diff options
author | Ferris Tseng <ferristseng@fastmail.fm> | 2017-10-25 21:58:05 -0400 |
---|---|---|
committer | Ferris Tseng <ferristseng@fastmail.fm> | 2017-10-25 21:58:05 -0400 |
commit | 4ea3bc9d4a3d62574800bf4aa1e7c28884ef9385 (patch) | |
tree | f276d183f60579ca8f10c3e023fea4494d3dd9d4 /ipfs-api/src/client.rs | |
parent | 0979c2bfdaa98039a8aced651941b50fa1f50707 (diff) |
move stream reader to module
Diffstat (limited to 'ipfs-api/src/client.rs')
-rw-r--r-- | ipfs-api/src/client.rs | 151 |
1 files changed, 6 insertions, 145 deletions
diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs index c17c548..436accd 100644 --- a/ipfs-api/src/client.rs +++ b/ipfs-api/src/client.rs @@ -1,155 +1,15 @@ -use bytes::BytesMut; -use futures::{Async, Stream}; +use futures::Stream; use futures::future::{Future, IntoFuture}; +use read::{JsonLineDecoder, StreamReader}; use request::{self, ApiRequest}; use reqwest::{self, multipart, Method, StatusCode, Url}; use reqwest::unstable::async::{self, Client, ClientBuilder}; use response::{self, ErrorKind, Error}; use serde::{Deserialize, Serialize}; use serde_json; -use std::cmp; -use std::io::{self, Read}; -use std::marker::PhantomData; +use std::io::Read; use tokio_core::reactor::Handle; -use tokio_io::AsyncRead; -use tokio_io::codec::{Decoder, FramedRead}; - - -/// A decoder for a response where each line is a full json object. -/// -struct JsonLineDecoder<T> { - ty: PhantomData<T>, -} - -impl<T> JsonLineDecoder<T> { - #[inline] - fn new() -> JsonLineDecoder<T> { - JsonLineDecoder { ty: PhantomData } - } -} - -impl<T> Decoder for JsonLineDecoder<T> -where - for<'de> T: Deserialize<'de>, -{ - type Item = T; - - type Error = Error; - - /// Tries to find a new line character. If it does, it will split the buffer, - /// and parse the first slice. - /// - fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { - let nl_index = src.iter().position(|b| *b == b'\n'); - - if let Some(pos) = nl_index { - let slice = src.split_to(pos + 1); - - serde_json::from_slice(&slice[..slice.len() - 1]).map_err(From::from) - } else { - Ok(None) - } - } -} - - -/// The state of a stream returning Chunks. -/// -enum ReadState { - /// A chunk is ready to be read from. - /// - Ready(async::Chunk, usize), - - /// The next chunk isn't ready yet. - /// - NotReady, -} - - -/// Reads from a stream of chunks asynchronously. -/// -/// This is adapted from reqwest: -/// -/// https://github.com/seanmonstar/reqwest/blob/99e7f7a1b05c952eef46372de70dfae2a732c374/src/async_impl/decoder.rs#L333 -/// -struct StreamReader<S> { - stream: S, - state: ReadState, -} - -impl<S> StreamReader<S> -where - S: Stream<Item = async::Chunk, Error = Error>, -{ - #[inline] - fn new(stream: S) -> StreamReader<S> { - StreamReader { - stream: stream, - state: ReadState::NotReady, - } - } -} - -impl<S> Read for StreamReader<S> -where - S: Stream<Item = async::Chunk, Error = Error>, -{ - fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { - loop { - let ret; - - match self.state { - // Stream yielded a Chunk to read. - // - ReadState::Ready(ref mut chunk, ref mut pos) => { - let chunk_start = *pos; - let len = cmp::min(buf.len(), chunk.len() - chunk_start); - let chunk_end = chunk_start + len; - - buf[..len].copy_from_slice(&chunk[chunk_start..chunk_end]); - - *pos += len; - - if *pos == chunk.len() { - ret = len; - } else { - return Ok(len); - } - } - // Stream is not ready, and a Chunk needs to be read. - // - ReadState::NotReady => { - match self.stream.poll() { - // Polling stream yielded a Chunk that can be read from. - // - Ok(Async::Ready(Some(chunk))) => { - self.state = ReadState::Ready(chunk, 0); - - continue; - } - // Polling stream yielded EOF. - // - Ok(Async::Ready(None)) => return Ok(0), - // Stream could not be read from. - // - Ok(Async::NotReady) => return Err(io::ErrorKind::WouldBlock.into()), - Err(e) => return Err(io::Error::new(io::ErrorKind::Other, e.description())), - } - } - } - - self.state = ReadState::NotReady; - - return Ok(ret); - } - } -} - -impl<S> AsyncRead for StreamReader<S> -where - S: Stream<Item = async::Chunk, Error = Error>, -{ -} +use tokio_io::codec::FramedRead; /// A future response returned by the reqwest HTTP client. @@ -238,6 +98,7 @@ impl IpfsClient { where for<'de> Res: 'static + Deserialize<'de>, { + println!("received '{}'", String::from_utf8_lossy(&chunk)); match status { StatusCode::Ok => serde_json::from_slice(&chunk).map_err(From::from), _ => Err(Self::build_error_from_body(chunk)), @@ -663,7 +524,7 @@ impl IpfsClient { topic: &str, payload: &str, ) -> AsyncResponse<response::PubsubPubResponse> { - self.request(&request::PubsubPub { topic, payload }) + self.request_empty(&request::PubsubPub { topic, payload }) } /// Subscribes to a pubsub topic. |