diff options
Diffstat (limited to 'src/repository/client.rs')
-rw-r--r-- | src/repository/client.rs | 29 |
1 files changed, 17 insertions, 12 deletions
diff --git a/src/repository/client.rs b/src/repository/client.rs index 08ec15b..c21df0b 100644 --- a/src/repository/client.rs +++ b/src/repository/client.rs @@ -6,7 +6,10 @@ use ipfs_api::IpfsClient; use failure::Error; use failure::err_msg; use futures::future::Future; +use futures::future::FutureExt; use futures::stream::Stream; +use futures::stream::StreamExt; +use futures::stream::TryStreamExt; use serde_json::from_str as serde_json_from_str; use serde_json::to_string as serde_json_to_str; @@ -36,21 +39,23 @@ impl ClientFassade { .map_err(Into::into) } - fn get<H: AsRef<IPFSHash>>(&self, hash: H) -> impl Future<Item = Vec<u8>, Error = Error> { + async fn get<H: AsRef<IPFSHash>>(&self, hash: H) -> Result<Vec<u8>, Error> { debug!("Get: {}", hash.as_ref()); self.0 .clone() .cat(hash.as_ref()) - .concat2() - .map_err(Error::from) - .map(|blob| blob.to_vec()) + .map_ok(|b| b.to_vec()) + .try_concat() + .map(|r| r.map_err(Error::from)) + .await } - fn put(&self, data: Vec<u8>) -> impl Future<Item = IPFSHash, Error = Error> { + async fn put(&self, data: Vec<u8>) -> Result<IPFSHash, Error> { debug!("Put: {:?}", data); self.0 .clone() .add(Cursor::new(data)) + .await .map(|res| IPFSHash::from(res.hash)) .map_err(Into::into) } @@ -65,19 +70,20 @@ impl TypedClientFassade { ClientFassade::new(host, port).map(TypedClientFassade) } - pub fn get_raw_bytes<H>(&self, hash: H) -> impl Future<Item = Vec<u8>, Error = Error> + pub async fn get_raw_bytes<H>(&self, hash: H) -> Result<Vec<u8>, Error> where H: AsRef<IPFSHash> { - self.0.get(hash) + self.0.get(hash).await } - pub fn get<H, D>(&self, hash: H) -> impl Future<Item = D, Error = Error> + pub async fn get<H, D>(&self, hash: H) -> Result<D, Error> where H: AsRef<IPFSHash>, D: DeserializeOwned { self.0 .clone() .get(hash) + .await .and_then(|data| { debug!("Got data, building object: {:?}", data); @@ -85,15 +91,14 @@ impl TypedClientFassade { }) } - pub fn put<S, Ser>(&self, data: &S) -> impl Future<Item = IPFSHash, Error = Error> + pub async fn put<S, Ser>(&self, data: &S) -> Result<IPFSHash, Error> where S: AsRef<Ser>, Ser: Serialize { let client = self.0.clone(); - ::futures::future::result(serde_json_to_str(data.as_ref())) - .map_err(Into::into) - .and_then(move |d| client.put(d.into_bytes())) + let data = serde_json_to_str(data.as_ref())?; + client.put(data.into_bytes()).await } } |