summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFerris Tseng <ferristseng@fastmail.fm>2017-11-19 21:14:33 -0500
committerFerris Tseng <ferristseng@fastmail.fm>2017-11-19 21:14:33 -0500
commitd7955441038d61d7c66313fe7330f37a8b906518 (patch)
tree76b9dd3641d188a82d4e9d0fc513d5ec4c24e4b7
parent1bb1691f617d7a2114150a4de382868b689d6dd8 (diff)
finish implementing multipart api
-rw-r--r--ipfs-api/src/client.rs34
-rw-r--r--ipfs-api/src/lib.rs2
-rw-r--r--ipfs-api/src/multipart.rs (renamed from ipfs-api/src/form.rs)245
-rw-r--r--ipfs-api/src/response/add.rs2
4 files changed, 241 insertions, 42 deletions
diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs
index 221a1d2..53c2663 100644
--- a/ipfs-api/src/client.rs
+++ b/ipfs-api/src/client.rs
@@ -6,15 +6,15 @@
// copied, modified, or distributed except according to those terms.
//
-use form::Form;
use futures::{stream, Stream};
use futures::future::{Future, IntoFuture};
use header::Trailer;
+use multipart;
use read::{JsonLineDecoder, LineDecoder, StreamReader};
use request::{self, ApiRequest};
use response::{self, Error, ErrorKind};
-use hyper::{self, Body, Chunk, Request, Response, Uri, Method, StatusCode};
-use hyper::client::{Client, HttpConnector};
+use hyper::{self, Chunk, Request, Response, Uri, Method, StatusCode};
+use hyper::client::{Client, Config, HttpConnector};
use serde::{Deserialize, Serialize};
use serde_json;
use std::io::Read;
@@ -36,7 +36,7 @@ type AsyncStreamResponse<T> = Box<Stream<Item = T, Error = Error>>;
///
pub struct IpfsClient {
base: Uri,
- client: Client<HttpConnector, Body>,
+ client: Client<HttpConnector, multipart::Body>,
}
impl IpfsClient {
@@ -52,7 +52,10 @@ impl IpfsClient {
Ok(IpfsClient {
base: base_path,
- client: Client::new(handle),
+ client: Config::default()
+ .body::<multipart::Body>()
+ .keep_alive(true)
+ .build(handle),
})
}
@@ -70,7 +73,7 @@ impl IpfsClient {
/// Builds the url for an api call.
///
- fn build_base_request<Req>(&self, req: &Req) -> Result<Request, Error>
+ fn build_base_request<Req>(&self, req: &Req) -> Result<Request<multipart::Body>, Error>
where
Req: ApiRequest + Serialize,
{
@@ -154,7 +157,7 @@ impl IpfsClient {
/// Methods prefixed with `send_` work on a raw reqwest `RequestBuilder`
/// instance.
///
- fn send_request(&self, req: Request) -> AsyncResponse<(StatusCode, Chunk)> {
+ fn send_request(&self, req: Request<multipart::Body>) -> AsyncResponse<(StatusCode, Chunk)> {
let res = self.client
.request(req)
.and_then(|res| {
@@ -172,7 +175,7 @@ impl IpfsClient {
/// Methods prefixed with `send_` work on a raw reqwest `RequestBuilder`
/// instance.
///
- fn send_request_json<Res>(&self, req: Request) -> AsyncResponse<Res>
+ fn send_request_json<Res>(&self, req: Request<multipart::Body>) -> AsyncResponse<Res>
where
for<'de> Res: 'static + Deserialize<'de>,
{
@@ -218,15 +221,16 @@ impl IpfsClient {
/// Generic method for making a request to the Ipfs server, and getting
/// a deserializable response.
///
- fn request_with_body<Req, Res, R>(&self, data: R, req: &Req) -> AsyncResponse<Res>
+ fn request_with_body<Req, Res>(&self, form: multipart::Form, req: &Req) -> AsyncResponse<Res>
where
Req: ApiRequest + Serialize,
for<'de> Res: 'static + Deserialize<'de>,
- R: 'static + Read + Send,
{
- let form = Form::default();
let res = self.build_base_request(req)
- .map(|req| self.send_request_json(req))
+ .map(move |mut req| {
+ form.set_body(&mut req);
+ self.send_request_json(req)
+ })
.into_future()
.flatten();
@@ -313,7 +317,11 @@ impl IpfsClient {
where
R: 'static + Read + Send,
{
- self.request_with_body(data, &request::Add)
+ let mut form = multipart::Form::default();
+
+ form.add_reader("path", data);
+
+ self.request_with_body(form, &request::Add)
}
/// Returns the current ledger for a peer.
diff --git a/ipfs-api/src/lib.rs b/ipfs-api/src/lib.rs
index 7c0c9f1..f693965 100644
--- a/ipfs-api/src/lib.rs
+++ b/ipfs-api/src/lib.rs
@@ -26,6 +26,6 @@ pub use request::{KeyType, Logger, LoggingLevel};
mod request;
pub mod response;
mod client;
-mod form;
mod header;
+mod multipart;
mod read;
diff --git a/ipfs-api/src/form.rs b/ipfs-api/src/multipart.rs
index 6bf0442..1ffac62 100644
--- a/ipfs-api/src/form.rs
+++ b/ipfs-api/src/multipart.rs
@@ -6,36 +6,161 @@
// copied, modified, or distributed except according to those terms.
//
-use hyper::header::{ContentDisposition, ContentType, DispositionParam, DispositionType};
+use bytes::{BufMut, BytesMut};
+use futures::{Poll, Async};
+use futures::stream::Stream;
+use hyper::{self, Request};
+use hyper::header::{ContentDisposition, ContentType, DispositionParam, DispositionType, Header};
use hyper::mime::{self, Mime};
use rand::{self, Rng};
-use std::borrow::{Borrow, Cow};
+use std::borrow::Borrow;
+use std::fmt::Display;
use std::fs::File;
-use std::io::{self, Read};
-use std::iter::FromIterator;
+use std::io::{self, Cursor, Read, Write};
+use std::iter::{FromIterator, Peekable};
use std::path::Path;
use std::str::FromStr;
+use std::vec::IntoIter;
-/// Random boundary string provider.
+/// Converts a hyper Header into a String.
///
-pub trait BoundaryGenerator {
- /// Generates a String to use as a boundary.
+fn header_to_string<H>(header: &H) -> String
+where
+ H: Header + Display,
+{
+ format!("{}: {}", H::header_name(), header)
+}
+
+
+/// Writes a CLRF.
+///
+fn write_crlf<W>(write: &mut W) -> io::Result<()>
+where
+ W: Write,
+{
+ write.write_all(&[b'\r', b'\n'])
+}
+
+
+/// Multipart body that is compatible with Hyper.
+///
+pub struct Body {
+ /// The amount of data to write with each chunk.
///
- fn generate_boundary() -> String;
+ buf_size: usize,
+
+ /// The active reader.
+ ///
+ current: Option<Box<'static + Read + Send>>,
+
+ /// The parts as an iterator. When the iterator stops
+ /// yielding, the body is fully written.
+ ///
+ parts: Peekable<IntoIter<Part>>,
+
+ /// The multipart boundary.
+ ///
+ boundary: String,
}
+impl Body {
+ /// Implements section 4.1.
+ ///
+ /// [See](https://tools.ietf.org/html/rfc7578#section-4.1).
+ ///
+ fn write_boundary<W>(&self, write: &mut W) -> io::Result<()>
+ where
+ W: Write,
+ {
+ write_crlf(write)?;
+ write.write_all(&[b'-', b'-'])?;
+ write.write_all(self.boundary.as_bytes())
+ }
-struct RandomAsciiGenerator;
+ fn write_final_boundary<W>(&self, write: &mut W) -> io::Result<()>
+ where
+ W: Write,
+ {
+ self.write_boundary(write)?;
+ write.write_all(&[b'-', b'-'])
+ }
-impl BoundaryGenerator for RandomAsciiGenerator {
- /// Creates a boundary of 6 ascii characters.
+ /// Writes the Content-Disposition, and Content-Type headers.
///
- fn generate_boundary() -> String {
- let mut rng = rand::weak_rng();
- let ascii = rng.gen_ascii_chars();
+ fn write_headers<W>(&self, write: &mut W, part: &Part) -> io::Result<()>
+ where
+ W: Write,
+ {
+ write_crlf(write)?;
+ write.write_all(
+ header_to_string(&part.content_type).as_bytes(),
+ )?;
+ write_crlf(write)?;
+ write.write_all(
+ header_to_string(&part.content_disposition).as_bytes(),
+ )?;
+ write_crlf(write)?;
+ write_crlf(write)
+ }
+}
- String::from_iter(ascii.take(6))
+impl Stream for Body {
+ type Item = BytesMut;
+
+ type Error = hyper::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ let bytes = BytesMut::with_capacity(self.buf_size);
+ let mut writer = bytes.writer();
+
+ if self.current.is_none() {
+ if let Some(part) = self.parts.next() {
+ self.write_boundary(&mut writer)?;
+ self.write_headers(&mut writer, &part)?;
+
+ let read = match part.inner {
+ Inner::Read(read, _) => read,
+ Inner::Text(s) => Box::new(Cursor::new(s.into_bytes())),
+ };
+
+ self.current = Some(read);
+ } else {
+ // No current part, and no parts left means there is nothing
+ // left to write.
+ //
+ return Ok(Async::Ready(None));
+ }
+ }
+
+ let num = if let Some(ref mut read) = self.current {
+ // TODO: This should not write more bytes that are remaining in
+ // the buffer.
+ //
+ io::copy(read, &mut writer)?
+ } else {
+ 0
+ };
+
+ if num == 0 {
+ // Wrote 0 bytes from the reader, so we reached the EOF for the
+ // current item.
+ //
+ self.current = None;
+
+ // Peek to check if there are are any parts not yet written.
+ // If there is nothing, the final boundary can be written.
+ //
+ if self.parts.peek().is_none() {
+ self.write_final_boundary(&mut writer)?;
+
+ Ok(Async::Ready(Some(writer.into_inner())))
+ } else {
+ self.poll()
+ }
+ } else {
+ Ok(Async::Ready(Some(writer.into_inner())))
+ }
}
}
@@ -78,12 +203,38 @@ impl Form {
}
}
- /// Implements section 4.1.
+ /// Updates a request instance with the multipart Content-Type header
+ /// and the payload data.
///
- /// [See](https://tools.ietf.org/html/rfc7578#section-4.1).
+ pub fn set_body(self, req: &mut Request<Body>) {
+ let header = format!("multipart/form-data; boundary=\"{}\"", &self.boundary);
+
+ {
+ let headers = req.headers_mut();
+
+ headers.set(ContentType(Mime::from_str(&header).expect(
+ "multipart mime type should parse",
+ )));
+ }
+
+ req.set_body(self);
+ }
+
+ /// Adds a struct that implements Read.
///
- fn write_boundary(&self) -> io::Result<()> {
- Ok(())
+ pub fn add_reader<F, R>(&mut self, name: F, read: R)
+ where
+ F: Into<String>,
+ R: 'static + Read + Send,
+ {
+ let read = Box::new(read);
+
+ self.parts.push(Part::new::<_, String>(
+ Inner::Read(read, None),
+ name,
+ None,
+ None,
+ ));
}
/// Adds a file, and attempts to derive the mime type.
@@ -124,7 +275,7 @@ impl Form {
// If there is some metadata on the file, try to derive some
// header values.
//
- Ok(ref meta) => Ok(meta.len()),
+ Ok(ref meta) => Ok(Some(meta.len())),
// The file metadata could not be accessed. This MIGHT not be an
// error, if the file could be opened.
@@ -145,8 +296,22 @@ impl Form {
}
}
+impl Into<Body> for Form {
+ #[inline]
+ fn into(self) -> Body {
+ Body {
+ buf_size: 2048,
+ current: None,
+ parts: self.parts.into_iter().peekable(),
+ boundary: self.boundary,
+ }
+ }
+}
+
pub struct Part {
+ inner: Inner,
+
/// Each part can include a Content-Type header field. If this
/// is not specified, it defaults to "text/plain", or
/// "application/octet-stream" for file data.
@@ -191,8 +356,11 @@ impl Part {
disposition_params.push(DispositionParam::Ext("filename".into(), filename.into()));
}
+ let content_type = ContentType(mime.unwrap_or_else(|| inner.default_content_type()));
+
Part {
- content_type: ContentType(mime.unwrap_or_else(|| inner.default_content_type())),
+ inner: inner,
+ content_type: content_type,
content_disposition: ContentDisposition {
disposition: DispositionType::Ext("form-data".into()),
parameters: disposition_params,
@@ -214,11 +382,11 @@ enum Inner {
/// and assigned the corresponding content type if not explicitly
/// specified.
///
- Read(Box<Read>, u64),
+ Read(Box<'static + Read + Send>, Option<u64>),
/// The `String` variant handles "text/plain" form data payloads.
///
- String(Cow<'static, str>),
+ Text(String),
}
impl Inner {
@@ -229,18 +397,41 @@ impl Inner {
#[inline]
fn default_content_type(&self) -> Mime {
match self {
- &Inner::Read(_, _) => mime::TEXT_PLAIN,
- &Inner::String(_) => mime::APPLICATION_OCTET_STREAM,
+ &Inner::Read(_, _) => mime::APPLICATION_OCTET_STREAM,
+ &Inner::Text(_) => mime::TEXT_PLAIN,
}
}
/// Returns the length of the inner type.
///
#[inline]
- fn len(&self) -> u64 {
+ fn len(&self) -> Option<u64> {
match self {
&Inner::Read(_, len) => len,
- &Inner::String(ref s) => s.len() as u64,
+ &Inner::Text(ref s) => Some(s.len() as u64),
}
}
}
+
+
+/// Random boundary string provider.
+///
+pub trait BoundaryGenerator {
+ /// Generates a String to use as a boundary.
+ ///
+ fn generate_boundary() -> String;
+}
+
+
+struct RandomAsciiGenerator;
+
+impl BoundaryGenerator for RandomAsciiGenerator {
+ /// Creates a boundary of 6 ascii characters.
+ ///
+ fn generate_boundary() -> String {
+ let mut rng = rand::weak_rng();
+ let ascii = rng.gen_ascii_chars();
+
+ String::from_iter(ascii.take(6))
+ }
+}
diff --git a/ipfs-api/src/response/add.rs b/ipfs-api/src/response/add.rs
index bb29a89..24c82ac 100644
--- a/ipfs-api/src/response/add.rs
+++ b/ipfs-api/src/response/add.rs
@@ -11,5 +11,5 @@
pub struct AddResponse {
pub name: String,
pub hash: String,
- pub bytes: i64,
+ pub size: String,
}