summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFerris Tseng <ferristseng@gmail.com>2017-11-19 21:19:53 -0500
committerGitHub <noreply@github.com>2017-11-19 21:19:53 -0500
commite231cbbf2c131e8ac912c20864dce4ec259d1867 (patch)
tree76b9dd3641d188a82d4e9d0fc513d5ec4c24e4b7
parentc28e62d54e1794e55688cb6ad81ea48b62e202d1 (diff)
parentd7955441038d61d7c66313fe7330f37a8b906518 (diff)
Merge pull request #2 from ferristseng/multipart
Implement multipart form data
-rw-r--r--ipfs-api/Cargo.toml1
-rw-r--r--ipfs-api/examples/server.rs58
-rw-r--r--ipfs-api/src/client.rs32
-rw-r--r--ipfs-api/src/lib.rs2
-rw-r--r--ipfs-api/src/multipart.rs437
-rw-r--r--ipfs-api/src/response/add.rs2
6 files changed, 520 insertions, 12 deletions
diff --git a/ipfs-api/Cargo.toml b/ipfs-api/Cargo.toml
index 2bb7a7e..9c85c7a 100644
--- a/ipfs-api/Cargo.toml
+++ b/ipfs-api/Cargo.toml
@@ -9,6 +9,7 @@ bytes = "0.4"
error-chain = "0.11"
futures = "0.1"
hyper = "0.11"
+rand = "0.3"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
diff --git a/ipfs-api/examples/server.rs b/ipfs-api/examples/server.rs
new file mode 100644
index 0000000..bc02c90
--- /dev/null
+++ b/ipfs-api/examples/server.rs
@@ -0,0 +1,58 @@
+// Copyright 2017 rust-ipfs-api Developers
+//
+// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
+// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
+// http://opensource.org/licenses/MIT>, at your option. This file may not be
+// copied, modified, or distributed except according to those terms.
+//
+
+extern crate futures;
+extern crate hyper;
+
+use futures::future::Future;
+use futures::stream::Stream;
+use hyper::StatusCode;
+use hyper::server::{Http, Service, Request, Response};
+
+struct Debug;
+
+impl Service for Debug {
+ type Request = Request;
+
+ type Response = Response;
+
+ type Error = hyper::Error;
+
+ type Future = Box<Future<Item = Response, Error = hyper::Error>>;
+
+
+ fn call(&self, req: Request) -> Self::Future {
+ println!("{:?}", req);
+
+ let res = req.body().concat2().map(|bod| {
+ println!("{}", String::from_utf8_lossy(&bod));
+
+ Response::new().with_status(StatusCode::Ok)
+ });
+
+ Box::new(res)
+ }
+}
+
+
+/// This example runs a server on the default Ipfs port. All it does is
+/// print requests as it gets them. It is useful for debugging.
+///
+fn main() {
+ let addr = "127.0.0.1:5001".parse().unwrap();
+ let mut server = Http::new().bind(&addr, || Ok(Debug)).unwrap();
+
+ server.no_proto();
+
+ println!(
+ "Listening on http://{} with 1 thread.",
+ server.local_addr().unwrap()
+ );
+
+ server.run().unwrap();
+}
diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs
index 9640a54..53c2663 100644
--- a/ipfs-api/src/client.rs
+++ b/ipfs-api/src/client.rs
@@ -9,11 +9,12 @@
use futures::{stream, Stream};
use futures::future::{Future, IntoFuture};
use header::Trailer;
+use multipart;
use read::{JsonLineDecoder, LineDecoder, StreamReader};
use request::{self, ApiRequest};
use response::{self, Error, ErrorKind};
-use hyper::{self, Body, Chunk, Request, Response, Uri, Method, StatusCode};
-use hyper::client::{Client, HttpConnector};
+use hyper::{self, Chunk, Request, Response, Uri, Method, StatusCode};
+use hyper::client::{Client, Config, HttpConnector};
use serde::{Deserialize, Serialize};
use serde_json;
use std::io::Read;
@@ -35,7 +36,7 @@ type AsyncStreamResponse<T> = Box<Stream<Item = T, Error = Error>>;
///
pub struct IpfsClient {
base: Uri,
- client: Client<HttpConnector, Body>,
+ client: Client<HttpConnector, multipart::Body>,
}
impl IpfsClient {
@@ -51,7 +52,10 @@ impl IpfsClient {
Ok(IpfsClient {
base: base_path,
- client: Client::new(handle),
+ client: Config::default()
+ .body::<multipart::Body>()
+ .keep_alive(true)
+ .build(handle),
})
}
@@ -69,7 +73,7 @@ impl IpfsClient {
/// Builds the url for an api call.
///
- fn build_base_request<Req>(&self, req: &Req) -> Result<Request, Error>
+ fn build_base_request<Req>(&self, req: &Req) -> Result<Request<multipart::Body>, Error>
where
Req: ApiRequest + Serialize,
{
@@ -153,7 +157,7 @@ impl IpfsClient {
/// Methods prefixed with `send_` work on a raw reqwest `RequestBuilder`
/// instance.
///
- fn send_request(&self, req: Request) -> AsyncResponse<(StatusCode, Chunk)> {
+ fn send_request(&self, req: Request<multipart::Body>) -> AsyncResponse<(StatusCode, Chunk)> {
let res = self.client
.request(req)
.and_then(|res| {
@@ -171,7 +175,7 @@ impl IpfsClient {
/// Methods prefixed with `send_` work on a raw reqwest `RequestBuilder`
/// instance.
///
- fn send_request_json<Res>(&self, req: Request) -> AsyncResponse<Res>
+ fn send_request_json<Res>(&self, req: Request<multipart::Body>) -> AsyncResponse<Res>
where
for<'de> Res: 'static + Deserialize<'de>,
{
@@ -217,14 +221,16 @@ impl IpfsClient {
/// Generic method for making a request to the Ipfs server, and getting
/// a deserializable response.
///
- fn request_with_body<Req, Res, R>(&self, data: R, req: &Req) -> AsyncResponse<Res>
+ fn request_with_body<Req, Res>(&self, form: multipart::Form, req: &Req) -> AsyncResponse<Res>
where
Req: ApiRequest + Serialize,
for<'de> Res: 'static + Deserialize<'de>,
- R: 'static + Read + Send,
{
let res = self.build_base_request(req)
- .map(|req| self.send_request_json(req))
+ .map(move |mut req| {
+ form.set_body(&mut req);
+ self.send_request_json(req)
+ })
.into_future()
.flatten();
@@ -311,7 +317,11 @@ impl IpfsClient {
where
R: 'static + Read + Send,
{
- self.request_with_body(data, &request::Add)
+ let mut form = multipart::Form::default();
+
+ form.add_reader("path", data);
+
+ self.request_with_body(form, &request::Add)
}
/// Returns the current ledger for a peer.
diff --git a/ipfs-api/src/lib.rs b/ipfs-api/src/lib.rs
index ad8aee7..f693965 100644
--- a/ipfs-api/src/lib.rs
+++ b/ipfs-api/src/lib.rs
@@ -11,6 +11,7 @@ extern crate bytes;
extern crate error_chain;
extern crate futures;
extern crate hyper;
+extern crate rand;
extern crate serde;
#[macro_use]
extern crate serde_derive;
@@ -26,4 +27,5 @@ mod request;
pub mod response;
mod client;
mod header;
+mod multipart;
mod read;
diff --git a/ipfs-api/src/multipart.rs b/ipfs-api/src/multipart.rs
new file mode 100644
index 0000000..1ffac62
--- /dev/null
+++ b/ipfs-api/src/multipart.rs
@@ -0,0 +1,437 @@
+// Copyright 2017 rust-ipfs-api Developers
+//
+// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
+// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
+// http://opensource.org/licenses/MIT>, at your option. This file may not be
+// copied, modified, or distributed except according to those terms.
+//
+
+use bytes::{BufMut, BytesMut};
+use futures::{Poll, Async};
+use futures::stream::Stream;
+use hyper::{self, Request};
+use hyper::header::{ContentDisposition, ContentType, DispositionParam, DispositionType, Header};
+use hyper::mime::{self, Mime};
+use rand::{self, Rng};
+use std::borrow::Borrow;
+use std::fmt::Display;
+use std::fs::File;
+use std::io::{self, Cursor, Read, Write};
+use std::iter::{FromIterator, Peekable};
+use std::path::Path;
+use std::str::FromStr;
+use std::vec::IntoIter;
+
+
+/// Converts a hyper Header into a String.
+///
+fn header_to_string<H>(header: &H) -> String
+where
+ H: Header + Display,
+{
+ format!("{}: {}", H::header_name(), header)
+}
+
+
+/// Writes a CLRF.
+///
+fn write_crlf<W>(write: &mut W) -> io::Result<()>
+where
+ W: Write,
+{
+ write.write_all(&[b'\r', b'\n'])
+}
+
+
+/// Multipart body that is compatible with Hyper.
+///
+pub struct Body {
+ /// The amount of data to write with each chunk.
+ ///
+ buf_size: usize,
+
+ /// The active reader.
+ ///
+ current: Option<Box<'static + Read + Send>>,
+
+ /// The parts as an iterator. When the iterator stops
+ /// yielding, the body is fully written.
+ ///
+ parts: Peekable<IntoIter<Part>>,
+
+ /// The multipart boundary.
+ ///
+ boundary: String,
+}
+
+impl Body {
+ /// Implements section 4.1.
+ ///
+ /// [See](https://tools.ietf.org/html/rfc7578#section-4.1).
+ ///
+ fn write_boundary<W>(&self, write: &mut W) -> io::Result<()>
+ where
+ W: Write,
+ {
+ write_crlf(write)?;
+ write.write_all(&[b'-', b'-'])?;
+ write.write_all(self.boundary.as_bytes())
+ }
+
+ fn write_final_boundary<W>(&self, write: &mut W) -> io::Result<()>
+ where
+ W: Write,
+ {
+ self.write_boundary(write)?;
+ write.write_all(&[b'-', b'-'])
+ }
+
+ /// Writes the Content-Disposition, and Content-Type headers.
+ ///
+ fn write_headers<W>(&self, write: &mut W, part: &Part) -> io::Result<()>
+ where
+ W: Write,
+ {
+ write_crlf(write)?;
+ write.write_all(
+ header_to_string(&part.content_type).as_bytes(),
+ )?;
+ write_crlf(write)?;
+ write.write_all(
+ header_to_string(&part.content_disposition).as_bytes(),
+ )?;
+ write_crlf(write)?;
+ write_crlf(write)
+ }
+}
+
+impl Stream for Body {
+ type Item = BytesMut;
+
+ type Error = hyper::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ let bytes = BytesMut::with_capacity(self.buf_size);
+ let mut writer = bytes.writer();
+
+ if self.current.is_none() {
+ if let Some(part) = self.parts.next() {
+ self.write_boundary(&mut writer)?;
+ self.write_headers(&mut writer, &part)?;
+
+ let read = match part.inner {
+ Inner::Read(read, _) => read,
+ Inner::Text(s) => Box::new(Cursor::new(s.into_bytes())),
+ };
+
+ self.current = Some(read);
+ } else {
+ // No current part, and no parts left means there is nothing
+ // left to write.
+ //
+ return Ok(Async::Ready(None));
+ }
+ }
+
+ let num = if let Some(ref mut read) = self.current {
+ // TODO: This should not write more bytes that are remaining in
+ // the buffer.
+ //
+ io::copy(read, &mut writer)?
+ } else {
+ 0
+ };
+
+ if num == 0 {
+ // Wrote 0 bytes from the reader, so we reached the EOF for the
+ // current item.
+ //
+ self.current = None;
+
+ // Peek to check if there are are any parts not yet written.
+ // If there is nothing, the final boundary can be written.
+ //
+ if self.parts.peek().is_none() {
+ self.write_final_boundary(&mut writer)?;
+
+ Ok(Async::Ready(Some(writer.into_inner())))
+ } else {
+ self.poll()
+ }
+ } else {
+ Ok(Async::Ready(Some(writer.into_inner())))
+ }
+ }
+}
+
+
+/// Implements the multipart/form-data media type as described by
+/// RFC 7578.
+///
+/// [See](https://tools.ietf.org/html/rfc7578#section-1).
+///
+pub struct Form {
+ parts: Vec<Part>,
+
+ /// The auto-generated boundary as described by 4.1.
+ ///
+ /// [See](https://tools.ietf.org/html/rfc7578#section-4.1).
+ ///
+ boundary: String,
+}
+
+impl Default for Form {
+ /// Creates a new form with the default boundary generator.
+ ///
+ #[inline]
+ fn default() -> Form {
+ Form::new::<RandomAsciiGenerator>()
+ }
+}
+
+impl Form {
+ /// Creates a new form with the specified boundary generator function.
+ ///
+ #[inline]
+ pub fn new<G>() -> Form
+ where
+ G: BoundaryGenerator,
+ {
+ Form {
+ parts: vec![],
+ boundary: G::generate_boundary(),
+ }
+ }
+
+ /// Updates a request instance with the multipart Content-Type header
+ /// and the payload data.
+ ///
+ pub fn set_body(self, req: &mut Request<Body>) {
+ let header = format!("multipart/form-data; boundary=\"{}\"", &self.boundary);
+
+ {
+ let headers = req.headers_mut();
+
+ headers.set(ContentType(Mime::from_str(&header).expect(
+ "multipart mime type should parse",
+ )));
+ }
+
+ req.set_body(self);
+ }
+
+ /// Adds a struct that implements Read.
+ ///
+ pub fn add_reader<F, R>(&mut self, name: F, read: R)
+ where
+ F: Into<String>,
+ R: 'static + Read + Send,
+ {
+ let read = Box::new(read);
+
+ self.parts.push(Part::new::<_, String>(
+ Inner::Read(read, None),
+ name,
+ None,
+ None,
+ ));
+ }
+
+ /// Adds a file, and attempts to derive the mime type.
+ ///
+ #[inline]
+ pub fn add_file<P, F>(&mut self, name: F, path: P) -> io::Result<()>
+ where
+ P: AsRef<Path>,
+ F: Into<String>,
+ {
+ self.add_file_with_mime(name, path, None)
+ }
+
+ /// Adds a file with the specified mime type to the form.
+ /// If the mime type isn't specified, a mime type will try to
+ /// be derived.
+ ///
+ fn add_file_with_mime<P, F>(&mut self, name: F, path: P, mime: Option<Mime>) -> io::Result<()>
+ where
+ P: AsRef<Path>,
+ F: Into<String>,
+ {
+ let f = File::open(&path)?;
+ let mime = if let Some(ext) = path.as_ref().extension() {
+ Mime::from_str(ext.to_string_lossy().borrow()).ok()
+ } else {
+ mime
+ };
+ let len = match f.metadata() {
+ // If the path is not a file, it can't be uploaded because there
+ // is no content.
+ //
+ Ok(ref meta) if !meta.is_file() => Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "expected a file not directory",
+ )),
+
+ // If there is some metadata on the file, try to derive some
+ // header values.
+ //
+ Ok(ref meta) => Ok(Some(meta.len())),
+
+ // The file metadata could not be accessed. This MIGHT not be an
+ // error, if the file could be opened.
+ //
+ Err(e) => Err(e),
+ }?;
+
+ let read = Box::new(f);
+
+ self.parts.push(Part::new(
+ Inner::Read(read, len),
+ name,
+ mime,
+ Some(path.as_ref().as_os_str().to_string_lossy()),
+ ));
+
+ Ok(())
+ }
+}
+
+impl Into<Body> for Form {
+ #[inline]
+ fn into(self) -> Body {
+ Body {
+ buf_size: 2048,
+ current: None,
+ parts: self.parts.into_iter().peekable(),
+ boundary: self.boundary,
+ }
+ }
+}
+
+
+pub struct Part {
+ inner: Inner,
+
+ /// Each part can include a Content-Type header field. If this
+ /// is not specified, it defaults to "text/plain", or
+ /// "application/octet-stream" for file data.
+ ///
+ /// [See](https://tools.ietf.org/html/rfc7578#section-4.4)
+ ///
+ content_type: ContentType,
+
+ /// Each part must contain a Content-Disposition header field.
+ ///
+ /// [See](https://tools.ietf.org/html/rfc7578#section-4.2).
+ ///
+ content_disposition: ContentDisposition,
+}
+
+impl Part {
+ /// Internal method to build a new Part instance. Sets the disposition type,
+ /// content-type, and the disposition parameters for name, and optionally
+ /// for filename.
+ ///
+ /// Per [4.3](https://tools.ietf.org/html/rfc7578#section-4.3), if multiple
+ /// files need to be specified for one form field, they can all be specified
+ /// with the same name parameter.
+ ///
+ fn new<N, F>(inner: Inner, name: N, mime: Option<Mime>, filename: Option<F>) -> Part
+ where
+ N: Into<String>,
+ F: Into<String>,
+ {
+ // `name` disposition parameter is required. It should correspond to the
+ // name of a form field.
+ //
+ // [See 4.2](https://tools.ietf.org/html/rfc7578#section-4.2)
+ //
+ let mut disposition_params = vec![DispositionParam::Ext("name".into(), name.into())];
+
+ // `filename` can be supplied for files, but is totally optional.
+ //
+ // [See 4.2](https://tools.ietf.org/html/rfc7578#section-4.2)
+ //
+ if let Some(filename) = filename {
+ disposition_params.push(DispositionParam::Ext("filename".into(), filename.into()));
+ }
+
+ let content_type = ContentType(mime.unwrap_or_else(|| inner.default_content_type()));
+
+ Part {
+ inner: inner,
+ content_type: content_type,
+ content_disposition: ContentDisposition {
+ disposition: DispositionType::Ext("form-data".into()),
+ parameters: disposition_params,
+ },
+ }
+ }
+}
+
+
+enum Inner {
+ /// The `Read` variant captures multiple cases.
+ ///
+ /// * The first is it supports uploading a file, which is explicitly
+ /// described in RFC 7578.
+ ///
+ /// * The second (which is not described by RFC 7578), is it can handle
+ /// arbitrary input streams (for example, a server response).
+ /// Any arbitrary input stream is automatically considered a file,
+ /// and assigned the corresponding content type if not explicitly
+ /// specified.
+ ///
+ Read(Box<'static + Read + Send>, Option<u64>),
+
+ /// The `String` variant handles "text/plain" form data payloads.
+ ///
+ Text(String),
+}
+
+impl Inner {
+ /// Returns the default Content-Type header value as described in section 4.4.
+ ///
+ /// [See](https://tools.ietf.org/html/rfc7578#section-4.4)
+ ///
+ #[inline]
+ fn default_content_type(&self) -> Mime {
+ match self {
+ &Inner::Read(_, _) => mime::APPLICATION_OCTET_STREAM,
+ &Inner::Text(_) => mime::TEXT_PLAIN,
+ }
+ }
+
+ /// Returns the length of the inner type.
+ ///
+ #[inline]
+ fn len(&self) -> Option<u64> {
+ match self {
+ &Inner::Read(_, len) => len,
+ &Inner::Text(ref s) => Some(s.len() as u64),
+ }
+ }
+}
+
+
+/// Random boundary string provider.
+///
+pub trait BoundaryGenerator {
+ /// Generates a String to use as a boundary.
+ ///
+ fn generate_boundary() -> String;
+}
+
+
+struct RandomAsciiGenerator;
+
+impl BoundaryGenerator for RandomAsciiGenerator {
+ /// Creates a boundary of 6 ascii characters.
+ ///
+ fn generate_boundary() -> String {
+ let mut rng = rand::weak_rng();
+ let ascii = rng.gen_ascii_chars();
+
+ String::from_iter(ascii.take(6))
+ }
+}
diff --git a/ipfs-api/src/response/add.rs b/ipfs-api/src/response/add.rs
index bb29a89..24c82ac 100644
--- a/ipfs-api/src/response/add.rs
+++ b/ipfs-api/src/response/add.rs
@@ -11,5 +11,5 @@
pub struct AddResponse {
pub name: String,
pub hash: String,
- pub bytes: i64,
+ pub size: String,
}