summaryrefslogtreecommitdiffstats
path: root/ipfs-api/src/client.rs
diff options
context:
space:
mode:
authorFerris Tseng <ferristseng@fastmail.fm>2017-10-25 21:58:05 -0400
committerFerris Tseng <ferristseng@fastmail.fm>2017-10-25 21:58:05 -0400
commit4ea3bc9d4a3d62574800bf4aa1e7c28884ef9385 (patch)
treef276d183f60579ca8f10c3e023fea4494d3dd9d4 /ipfs-api/src/client.rs
parent0979c2bfdaa98039a8aced651941b50fa1f50707 (diff)
move stream reader to module
Diffstat (limited to 'ipfs-api/src/client.rs')
-rw-r--r--ipfs-api/src/client.rs151
1 files changed, 6 insertions, 145 deletions
diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs
index c17c548..436accd 100644
--- a/ipfs-api/src/client.rs
+++ b/ipfs-api/src/client.rs
@@ -1,155 +1,15 @@
-use bytes::BytesMut;
-use futures::{Async, Stream};
+use futures::Stream;
use futures::future::{Future, IntoFuture};
+use read::{JsonLineDecoder, StreamReader};
use request::{self, ApiRequest};
use reqwest::{self, multipart, Method, StatusCode, Url};
use reqwest::unstable::async::{self, Client, ClientBuilder};
use response::{self, ErrorKind, Error};
use serde::{Deserialize, Serialize};
use serde_json;
-use std::cmp;
-use std::io::{self, Read};
-use std::marker::PhantomData;
+use std::io::Read;
use tokio_core::reactor::Handle;
-use tokio_io::AsyncRead;
-use tokio_io::codec::{Decoder, FramedRead};
-
-
-/// A decoder for a response where each line is a full json object.
-///
-struct JsonLineDecoder<T> {
- ty: PhantomData<T>,
-}
-
-impl<T> JsonLineDecoder<T> {
- #[inline]
- fn new() -> JsonLineDecoder<T> {
- JsonLineDecoder { ty: PhantomData }
- }
-}
-
-impl<T> Decoder for JsonLineDecoder<T>
-where
- for<'de> T: Deserialize<'de>,
-{
- type Item = T;
-
- type Error = Error;
-
- /// Tries to find a new line character. If it does, it will split the buffer,
- /// and parse the first slice.
- ///
- fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
- let nl_index = src.iter().position(|b| *b == b'\n');
-
- if let Some(pos) = nl_index {
- let slice = src.split_to(pos + 1);
-
- serde_json::from_slice(&slice[..slice.len() - 1]).map_err(From::from)
- } else {
- Ok(None)
- }
- }
-}
-
-
-/// The state of a stream returning Chunks.
-///
-enum ReadState {
- /// A chunk is ready to be read from.
- ///
- Ready(async::Chunk, usize),
-
- /// The next chunk isn't ready yet.
- ///
- NotReady,
-}
-
-
-/// Reads from a stream of chunks asynchronously.
-///
-/// This is adapted from reqwest:
-///
-/// https://github.com/seanmonstar/reqwest/blob/99e7f7a1b05c952eef46372de70dfae2a732c374/src/async_impl/decoder.rs#L333
-///
-struct StreamReader<S> {
- stream: S,
- state: ReadState,
-}
-
-impl<S> StreamReader<S>
-where
- S: Stream<Item = async::Chunk, Error = Error>,
-{
- #[inline]
- fn new(stream: S) -> StreamReader<S> {
- StreamReader {
- stream: stream,
- state: ReadState::NotReady,
- }
- }
-}
-
-impl<S> Read for StreamReader<S>
-where
- S: Stream<Item = async::Chunk, Error = Error>,
-{
- fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- loop {
- let ret;
-
- match self.state {
- // Stream yielded a Chunk to read.
- //
- ReadState::Ready(ref mut chunk, ref mut pos) => {
- let chunk_start = *pos;
- let len = cmp::min(buf.len(), chunk.len() - chunk_start);
- let chunk_end = chunk_start + len;
-
- buf[..len].copy_from_slice(&chunk[chunk_start..chunk_end]);
-
- *pos += len;
-
- if *pos == chunk.len() {
- ret = len;
- } else {
- return Ok(len);
- }
- }
- // Stream is not ready, and a Chunk needs to be read.
- //
- ReadState::NotReady => {
- match self.stream.poll() {
- // Polling stream yielded a Chunk that can be read from.
- //
- Ok(Async::Ready(Some(chunk))) => {
- self.state = ReadState::Ready(chunk, 0);
-
- continue;
- }
- // Polling stream yielded EOF.
- //
- Ok(Async::Ready(None)) => return Ok(0),
- // Stream could not be read from.
- //
- Ok(Async::NotReady) => return Err(io::ErrorKind::WouldBlock.into()),
- Err(e) => return Err(io::Error::new(io::ErrorKind::Other, e.description())),
- }
- }
- }
-
- self.state = ReadState::NotReady;
-
- return Ok(ret);
- }
- }
-}
-
-impl<S> AsyncRead for StreamReader<S>
-where
- S: Stream<Item = async::Chunk, Error = Error>,
-{
-}
+use tokio_io::codec::FramedRead;
/// A future response returned by the reqwest HTTP client.
@@ -238,6 +98,7 @@ impl IpfsClient {
where
for<'de> Res: 'static + Deserialize<'de>,
{
+ println!("received '{}'", String::from_utf8_lossy(&chunk));
match status {
StatusCode::Ok => serde_json::from_slice(&chunk).map_err(From::from),
_ => Err(Self::build_error_from_body(chunk)),
@@ -663,7 +524,7 @@ impl IpfsClient {
topic: &str,
payload: &str,
) -> AsyncResponse<response::PubsubPubResponse> {
- self.request(&request::PubsubPub { topic, payload })
+ self.request_empty(&request::PubsubPub { topic, payload })
}
/// Subscribes to a pubsub topic.