summaryrefslogtreecommitdiffstats
path: root/src/repository/client.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/repository/client.rs')
-rw-r--r--src/repository/client.rs29
1 files changed, 17 insertions, 12 deletions
diff --git a/src/repository/client.rs b/src/repository/client.rs
index 08ec15b..c21df0b 100644
--- a/src/repository/client.rs
+++ b/src/repository/client.rs
@@ -6,7 +6,10 @@ use ipfs_api::IpfsClient;
use failure::Error;
use failure::err_msg;
use futures::future::Future;
+use futures::future::FutureExt;
use futures::stream::Stream;
+use futures::stream::StreamExt;
+use futures::stream::TryStreamExt;
use serde_json::from_str as serde_json_from_str;
use serde_json::to_string as serde_json_to_str;
@@ -36,21 +39,23 @@ impl ClientFassade {
.map_err(Into::into)
}
- fn get<H: AsRef<IPFSHash>>(&self, hash: H) -> impl Future<Item = Vec<u8>, Error = Error> {
+ async fn get<H: AsRef<IPFSHash>>(&self, hash: H) -> Result<Vec<u8>, Error> {
debug!("Get: {}", hash.as_ref());
self.0
.clone()
.cat(hash.as_ref())
- .concat2()
- .map_err(Error::from)
- .map(|blob| blob.to_vec())
+ .map_ok(|b| b.to_vec())
+ .try_concat()
+ .map(|r| r.map_err(Error::from))
+ .await
}
- fn put(&self, data: Vec<u8>) -> impl Future<Item = IPFSHash, Error = Error> {
+ async fn put(&self, data: Vec<u8>) -> Result<IPFSHash, Error> {
debug!("Put: {:?}", data);
self.0
.clone()
.add(Cursor::new(data))
+ .await
.map(|res| IPFSHash::from(res.hash))
.map_err(Into::into)
}
@@ -65,19 +70,20 @@ impl TypedClientFassade {
ClientFassade::new(host, port).map(TypedClientFassade)
}
- pub fn get_raw_bytes<H>(&self, hash: H) -> impl Future<Item = Vec<u8>, Error = Error>
+ pub async fn get_raw_bytes<H>(&self, hash: H) -> Result<Vec<u8>, Error>
where H: AsRef<IPFSHash>
{
- self.0.get(hash)
+ self.0.get(hash).await
}
- pub fn get<H, D>(&self, hash: H) -> impl Future<Item = D, Error = Error>
+ pub async fn get<H, D>(&self, hash: H) -> Result<D, Error>
where H: AsRef<IPFSHash>,
D: DeserializeOwned
{
self.0
.clone()
.get(hash)
+ .await
.and_then(|data| {
debug!("Got data, building object: {:?}", data);
@@ -85,15 +91,14 @@ impl TypedClientFassade {
})
}
- pub fn put<S, Ser>(&self, data: &S) -> impl Future<Item = IPFSHash, Error = Error>
+ pub async fn put<S, Ser>(&self, data: &S) -> Result<IPFSHash, Error>
where S: AsRef<Ser>,
Ser: Serialize
{
let client = self.0.clone();
- ::futures::future::result(serde_json_to_str(data.as_ref()))
- .map_err(Into::into)
- .and_then(move |d| client.put(d.into_bytes()))
+ let data = serde_json_to_str(data.as_ref())?;
+ client.put(data.into_bytes()).await
}
}