diff options
author | Ferris Tseng <ferristseng@fastmail.fm> | 2017-11-19 21:14:33 -0500 |
---|---|---|
committer | Ferris Tseng <ferristseng@fastmail.fm> | 2017-11-19 21:14:33 -0500 |
commit | d7955441038d61d7c66313fe7330f37a8b906518 (patch) | |
tree | 76b9dd3641d188a82d4e9d0fc513d5ec4c24e4b7 | |
parent | 1bb1691f617d7a2114150a4de382868b689d6dd8 (diff) |
finish implementing multipart api
-rw-r--r-- | ipfs-api/src/client.rs | 34 | ||||
-rw-r--r-- | ipfs-api/src/lib.rs | 2 | ||||
-rw-r--r-- | ipfs-api/src/multipart.rs (renamed from ipfs-api/src/form.rs) | 245 | ||||
-rw-r--r-- | ipfs-api/src/response/add.rs | 2 |
4 files changed, 241 insertions, 42 deletions
diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs index 221a1d2..53c2663 100644 --- a/ipfs-api/src/client.rs +++ b/ipfs-api/src/client.rs @@ -6,15 +6,15 @@ // copied, modified, or distributed except according to those terms. // -use form::Form; use futures::{stream, Stream}; use futures::future::{Future, IntoFuture}; use header::Trailer; +use multipart; use read::{JsonLineDecoder, LineDecoder, StreamReader}; use request::{self, ApiRequest}; use response::{self, Error, ErrorKind}; -use hyper::{self, Body, Chunk, Request, Response, Uri, Method, StatusCode}; -use hyper::client::{Client, HttpConnector}; +use hyper::{self, Chunk, Request, Response, Uri, Method, StatusCode}; +use hyper::client::{Client, Config, HttpConnector}; use serde::{Deserialize, Serialize}; use serde_json; use std::io::Read; @@ -36,7 +36,7 @@ type AsyncStreamResponse<T> = Box<Stream<Item = T, Error = Error>>; /// pub struct IpfsClient { base: Uri, - client: Client<HttpConnector, Body>, + client: Client<HttpConnector, multipart::Body>, } impl IpfsClient { @@ -52,7 +52,10 @@ impl IpfsClient { Ok(IpfsClient { base: base_path, - client: Client::new(handle), + client: Config::default() + .body::<multipart::Body>() + .keep_alive(true) + .build(handle), }) } @@ -70,7 +73,7 @@ impl IpfsClient { /// Builds the url for an api call. /// - fn build_base_request<Req>(&self, req: &Req) -> Result<Request, Error> + fn build_base_request<Req>(&self, req: &Req) -> Result<Request<multipart::Body>, Error> where Req: ApiRequest + Serialize, { @@ -154,7 +157,7 @@ impl IpfsClient { /// Methods prefixed with `send_` work on a raw reqwest `RequestBuilder` /// instance. /// - fn send_request(&self, req: Request) -> AsyncResponse<(StatusCode, Chunk)> { + fn send_request(&self, req: Request<multipart::Body>) -> AsyncResponse<(StatusCode, Chunk)> { let res = self.client .request(req) .and_then(|res| { @@ -172,7 +175,7 @@ impl IpfsClient { /// Methods prefixed with `send_` work on a raw reqwest `RequestBuilder` /// instance. /// - fn send_request_json<Res>(&self, req: Request) -> AsyncResponse<Res> + fn send_request_json<Res>(&self, req: Request<multipart::Body>) -> AsyncResponse<Res> where for<'de> Res: 'static + Deserialize<'de>, { @@ -218,15 +221,16 @@ impl IpfsClient { /// Generic method for making a request to the Ipfs server, and getting /// a deserializable response. /// - fn request_with_body<Req, Res, R>(&self, data: R, req: &Req) -> AsyncResponse<Res> + fn request_with_body<Req, Res>(&self, form: multipart::Form, req: &Req) -> AsyncResponse<Res> where Req: ApiRequest + Serialize, for<'de> Res: 'static + Deserialize<'de>, - R: 'static + Read + Send, { - let form = Form::default(); let res = self.build_base_request(req) - .map(|req| self.send_request_json(req)) + .map(move |mut req| { + form.set_body(&mut req); + self.send_request_json(req) + }) .into_future() .flatten(); @@ -313,7 +317,11 @@ impl IpfsClient { where R: 'static + Read + Send, { - self.request_with_body(data, &request::Add) + let mut form = multipart::Form::default(); + + form.add_reader("path", data); + + self.request_with_body(form, &request::Add) } /// Returns the current ledger for a peer. diff --git a/ipfs-api/src/lib.rs b/ipfs-api/src/lib.rs index 7c0c9f1..f693965 100644 --- a/ipfs-api/src/lib.rs +++ b/ipfs-api/src/lib.rs @@ -26,6 +26,6 @@ pub use request::{KeyType, Logger, LoggingLevel}; mod request; pub mod response; mod client; -mod form; mod header; +mod multipart; mod read; diff --git a/ipfs-api/src/form.rs b/ipfs-api/src/multipart.rs index 6bf0442..1ffac62 100644 --- a/ipfs-api/src/form.rs +++ b/ipfs-api/src/multipart.rs @@ -6,36 +6,161 @@ // copied, modified, or distributed except according to those terms. // -use hyper::header::{ContentDisposition, ContentType, DispositionParam, DispositionType}; +use bytes::{BufMut, BytesMut}; +use futures::{Poll, Async}; +use futures::stream::Stream; +use hyper::{self, Request}; +use hyper::header::{ContentDisposition, ContentType, DispositionParam, DispositionType, Header}; use hyper::mime::{self, Mime}; use rand::{self, Rng}; -use std::borrow::{Borrow, Cow}; +use std::borrow::Borrow; +use std::fmt::Display; use std::fs::File; -use std::io::{self, Read}; -use std::iter::FromIterator; +use std::io::{self, Cursor, Read, Write}; +use std::iter::{FromIterator, Peekable}; use std::path::Path; use std::str::FromStr; +use std::vec::IntoIter; -/// Random boundary string provider. +/// Converts a hyper Header into a String. /// -pub trait BoundaryGenerator { - /// Generates a String to use as a boundary. +fn header_to_string<H>(header: &H) -> String +where + H: Header + Display, +{ + format!("{}: {}", H::header_name(), header) +} + + +/// Writes a CLRF. +/// +fn write_crlf<W>(write: &mut W) -> io::Result<()> +where + W: Write, +{ + write.write_all(&[b'\r', b'\n']) +} + + +/// Multipart body that is compatible with Hyper. +/// +pub struct Body { + /// The amount of data to write with each chunk. /// - fn generate_boundary() -> String; + buf_size: usize, + + /// The active reader. + /// + current: Option<Box<'static + Read + Send>>, + + /// The parts as an iterator. When the iterator stops + /// yielding, the body is fully written. + /// + parts: Peekable<IntoIter<Part>>, + + /// The multipart boundary. + /// + boundary: String, } +impl Body { + /// Implements section 4.1. + /// + /// [See](https://tools.ietf.org/html/rfc7578#section-4.1). + /// + fn write_boundary<W>(&self, write: &mut W) -> io::Result<()> + where + W: Write, + { + write_crlf(write)?; + write.write_all(&[b'-', b'-'])?; + write.write_all(self.boundary.as_bytes()) + } -struct RandomAsciiGenerator; + fn write_final_boundary<W>(&self, write: &mut W) -> io::Result<()> + where + W: Write, + { + self.write_boundary(write)?; + write.write_all(&[b'-', b'-']) + } -impl BoundaryGenerator for RandomAsciiGenerator { - /// Creates a boundary of 6 ascii characters. + /// Writes the Content-Disposition, and Content-Type headers. /// - fn generate_boundary() -> String { - let mut rng = rand::weak_rng(); - let ascii = rng.gen_ascii_chars(); + fn write_headers<W>(&self, write: &mut W, part: &Part) -> io::Result<()> + where + W: Write, + { + write_crlf(write)?; + write.write_all( + header_to_string(&part.content_type).as_bytes(), + )?; + write_crlf(write)?; + write.write_all( + header_to_string(&part.content_disposition).as_bytes(), + )?; + write_crlf(write)?; + write_crlf(write) + } +} - String::from_iter(ascii.take(6)) +impl Stream for Body { + type Item = BytesMut; + + type Error = hyper::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + let bytes = BytesMut::with_capacity(self.buf_size); + let mut writer = bytes.writer(); + + if self.current.is_none() { + if let Some(part) = self.parts.next() { + self.write_boundary(&mut writer)?; + self.write_headers(&mut writer, &part)?; + + let read = match part.inner { + Inner::Read(read, _) => read, + Inner::Text(s) => Box::new(Cursor::new(s.into_bytes())), + }; + + self.current = Some(read); + } else { + // No current part, and no parts left means there is nothing + // left to write. + // + return Ok(Async::Ready(None)); + } + } + + let num = if let Some(ref mut read) = self.current { + // TODO: This should not write more bytes that are remaining in + // the buffer. + // + io::copy(read, &mut writer)? + } else { + 0 + }; + + if num == 0 { + // Wrote 0 bytes from the reader, so we reached the EOF for the + // current item. + // + self.current = None; + + // Peek to check if there are are any parts not yet written. + // If there is nothing, the final boundary can be written. + // + if self.parts.peek().is_none() { + self.write_final_boundary(&mut writer)?; + + Ok(Async::Ready(Some(writer.into_inner()))) + } else { + self.poll() + } + } else { + Ok(Async::Ready(Some(writer.into_inner()))) + } } } @@ -78,12 +203,38 @@ impl Form { } } - /// Implements section 4.1. + /// Updates a request instance with the multipart Content-Type header + /// and the payload data. /// - /// [See](https://tools.ietf.org/html/rfc7578#section-4.1). + pub fn set_body(self, req: &mut Request<Body>) { + let header = format!("multipart/form-data; boundary=\"{}\"", &self.boundary); + + { + let headers = req.headers_mut(); + + headers.set(ContentType(Mime::from_str(&header).expect( + "multipart mime type should parse", + ))); + } + + req.set_body(self); + } + + /// Adds a struct that implements Read. /// - fn write_boundary(&self) -> io::Result<()> { - Ok(()) + pub fn add_reader<F, R>(&mut self, name: F, read: R) + where + F: Into<String>, + R: 'static + Read + Send, + { + let read = Box::new(read); + + self.parts.push(Part::new::<_, String>( + Inner::Read(read, None), + name, + None, + None, + )); } /// Adds a file, and attempts to derive the mime type. @@ -124,7 +275,7 @@ impl Form { // If there is some metadata on the file, try to derive some // header values. // - Ok(ref meta) => Ok(meta.len()), + Ok(ref meta) => Ok(Some(meta.len())), // The file metadata could not be accessed. This MIGHT not be an // error, if the file could be opened. @@ -145,8 +296,22 @@ impl Form { } } +impl Into<Body> for Form { + #[inline] + fn into(self) -> Body { + Body { + buf_size: 2048, + current: None, + parts: self.parts.into_iter().peekable(), + boundary: self.boundary, + } + } +} + pub struct Part { + inner: Inner, + /// Each part can include a Content-Type header field. If this /// is not specified, it defaults to "text/plain", or /// "application/octet-stream" for file data. @@ -191,8 +356,11 @@ impl Part { disposition_params.push(DispositionParam::Ext("filename".into(), filename.into())); } + let content_type = ContentType(mime.unwrap_or_else(|| inner.default_content_type())); + Part { - content_type: ContentType(mime.unwrap_or_else(|| inner.default_content_type())), + inner: inner, + content_type: content_type, content_disposition: ContentDisposition { disposition: DispositionType::Ext("form-data".into()), parameters: disposition_params, @@ -214,11 +382,11 @@ enum Inner { /// and assigned the corresponding content type if not explicitly /// specified. /// - Read(Box<Read>, u64), + Read(Box<'static + Read + Send>, Option<u64>), /// The `String` variant handles "text/plain" form data payloads. /// - String(Cow<'static, str>), + Text(String), } impl Inner { @@ -229,18 +397,41 @@ impl Inner { #[inline] fn default_content_type(&self) -> Mime { match self { - &Inner::Read(_, _) => mime::TEXT_PLAIN, - &Inner::String(_) => mime::APPLICATION_OCTET_STREAM, + &Inner::Read(_, _) => mime::APPLICATION_OCTET_STREAM, + &Inner::Text(_) => mime::TEXT_PLAIN, } } /// Returns the length of the inner type. /// #[inline] - fn len(&self) -> u64 { + fn len(&self) -> Option<u64> { match self { &Inner::Read(_, len) => len, - &Inner::String(ref s) => s.len() as u64, + &Inner::Text(ref s) => Some(s.len() as u64), } } } + + +/// Random boundary string provider. +/// +pub trait BoundaryGenerator { + /// Generates a String to use as a boundary. + /// + fn generate_boundary() -> String; +} + + +struct RandomAsciiGenerator; + +impl BoundaryGenerator for RandomAsciiGenerator { + /// Creates a boundary of 6 ascii characters. + /// + fn generate_boundary() -> String { + let mut rng = rand::weak_rng(); + let ascii = rng.gen_ascii_chars(); + + String::from_iter(ascii.take(6)) + } +} diff --git a/ipfs-api/src/response/add.rs b/ipfs-api/src/response/add.rs index bb29a89..24c82ac 100644 --- a/ipfs-api/src/response/add.rs +++ b/ipfs-api/src/response/add.rs @@ -11,5 +11,5 @@ pub struct AddResponse { pub name: String, pub hash: String, - pub bytes: i64, + pub size: String, } |