summaryrefslogtreecommitdiffstats
path: root/src/repository
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2020-03-25 15:05:47 +0100
committerMatthias Beyer <mail@beyermatthias.de>2020-03-25 19:50:48 +0100
commit442d148ad6e0f2df7e9e8acbd841a15c4347b0e1 (patch)
tree79df854e75081650ef57da1ff5588180e2246af7 /src/repository
parent42d741c1928264f9b4cb5ec8d990bde4b6feaed3 (diff)
Rewrite library code for async-await
Diffstat (limited to 'src/repository')
-rw-r--r--src/repository/client.rs29
-rw-r--r--src/repository/profile.rs41
-rw-r--r--src/repository/repository.rs20
3 files changed, 54 insertions, 36 deletions
diff --git a/src/repository/client.rs b/src/repository/client.rs
index 08ec15b..c21df0b 100644
--- a/src/repository/client.rs
+++ b/src/repository/client.rs
@@ -6,7 +6,10 @@ use ipfs_api::IpfsClient;
use failure::Error;
use failure::err_msg;
use futures::future::Future;
+use futures::future::FutureExt;
use futures::stream::Stream;
+use futures::stream::StreamExt;
+use futures::stream::TryStreamExt;
use serde_json::from_str as serde_json_from_str;
use serde_json::to_string as serde_json_to_str;
@@ -36,21 +39,23 @@ impl ClientFassade {
.map_err(Into::into)
}
- fn get<H: AsRef<IPFSHash>>(&self, hash: H) -> impl Future<Item = Vec<u8>, Error = Error> {
+ async fn get<H: AsRef<IPFSHash>>(&self, hash: H) -> Result<Vec<u8>, Error> {
debug!("Get: {}", hash.as_ref());
self.0
.clone()
.cat(hash.as_ref())
- .concat2()
- .map_err(Error::from)
- .map(|blob| blob.to_vec())
+ .map_ok(|b| b.to_vec())
+ .try_concat()
+ .map(|r| r.map_err(Error::from))
+ .await
}
- fn put(&self, data: Vec<u8>) -> impl Future<Item = IPFSHash, Error = Error> {
+ async fn put(&self, data: Vec<u8>) -> Result<IPFSHash, Error> {
debug!("Put: {:?}", data);
self.0
.clone()
.add(Cursor::new(data))
+ .await
.map(|res| IPFSHash::from(res.hash))
.map_err(Into::into)
}
@@ -65,19 +70,20 @@ impl TypedClientFassade {
ClientFassade::new(host, port).map(TypedClientFassade)
}
- pub fn get_raw_bytes<H>(&self, hash: H) -> impl Future<Item = Vec<u8>, Error = Error>
+ pub async fn get_raw_bytes<H>(&self, hash: H) -> Result<Vec<u8>, Error>
where H: AsRef<IPFSHash>
{
- self.0.get(hash)
+ self.0.get(hash).await
}
- pub fn get<H, D>(&self, hash: H) -> impl Future<Item = D, Error = Error>
+ pub async fn get<H, D>(&self, hash: H) -> Result<D, Error>
where H: AsRef<IPFSHash>,
D: DeserializeOwned
{
self.0
.clone()
.get(hash)
+ .await
.and_then(|data| {
debug!("Got data, building object: {:?}", data);
@@ -85,15 +91,14 @@ impl TypedClientFassade {
})
}
- pub fn put<S, Ser>(&self, data: &S) -> impl Future<Item = IPFSHash, Error = Error>
+ pub async fn put<S, Ser>(&self, data: &S) -> Result<IPFSHash, Error>
where S: AsRef<Ser>,
Ser: Serialize
{
let client = self.0.clone();
- ::futures::future::result(serde_json_to_str(data.as_ref()))
- .map_err(Into::into)
- .and_then(move |d| client.put(d.into_bytes()))
+ let data = serde_json_to_str(data.as_ref())?;
+ client.put(data.into_bytes()).await
}
}
diff --git a/src/repository/profile.rs b/src/repository/profile.rs
index 8d9a12b..bc62d9f 100644
--- a/src/repository/profile.rs
+++ b/src/repository/profile.rs
@@ -1,7 +1,8 @@
use failure::Error;
-use futures::stream::{self, Stream};
+use futures::stream::{self, Stream, StreamExt};
use futures::future;
use futures::Future;
+use futures::FutureExt;
use crate::types::util::IPNSHash;
use crate::types::util::IPFSHash;
@@ -40,19 +41,31 @@ impl Profile {
unimplemented!()
}
- pub fn blocks(&self) -> impl Stream<Item = Block, Error = Error> {
- let repo = self.repository.clone();
- stream::unfold(vec![self.head.clone()], move |mut state| {
- let repo = repo.clone();
- state.pop()
- .map(move |hash| {
- repo.get_block(hash).map(|block| {
- block.parents().iter().for_each(|parent| {
- state.push(parent.clone())
- });
- (block, state)
- })
- })
+ pub fn blocks(&self) -> impl Stream<Item = Result<Block, Error>> {
+ stream::unfold((self.repository.clone(), vec![self.head.clone()]), move |(repo, mut state)| {
+ async {
+ if let Some(hash) = state.pop() {
+ match repo
+ .get_block(hash)
+ .await
+ .map(move |block| {
+ block.parents().iter().for_each(|parent| {
+ state.push(parent.clone())
+ });
+
+ (block, state)
+ })
+ .map(Some)
+ .transpose()
+ {
+ Some(Ok((item, state))) => Some((Ok(item), (repo, state))),
+ Some(Err(e)) => Some((Err(e), (repo, vec![]))),
+ None => None,
+ }
+ } else {
+ None
+ }
+ }
})
}
}
diff --git a/src/repository/repository.rs b/src/repository/repository.rs
index 7386ba4..c162e65 100644
--- a/src/repository/repository.rs
+++ b/src/repository/repository.rs
@@ -36,34 +36,34 @@ impl Repository {
TypedClientFassade::new(host, port).map(Repository)
}
- pub fn get_raw_bytes<H>(&self, hash: H) -> impl Future<Item = Vec<u8>, Error = Error>
+ pub async fn get_raw_bytes<H>(&self, hash: H) -> Result<Vec<u8>, Error>
where H: AsRef<IPFSHash>
{
- self.0.get_raw_bytes(hash)
+ self.0.get_raw_bytes(hash).await
}
- pub fn get_block<H>(&self, hash: H) -> impl Future<Item = Block, Error = Error>
+ pub async fn get_block<H>(&self, hash: H) -> Result<Block, Error>
where H: AsRef<IPFSHash>
{
- self.0.get(hash)
+ self.0.get(hash).await
}
- pub fn put_block<B>(&self, b: B) -> impl Future<Item = IPFSHash, Error = Error>
+ pub async fn put_block<B>(&self, b: B) -> Result<IPFSHash, Error>
where B: AsRef<Block>
{
- self.0.put(b.as_ref())
+ self.0.put(b.as_ref()).await
}
- pub fn get_content<H>(&self, hash: H) -> impl Future<Item = Content, Error = Error>
+ pub async fn get_content<H>(&self, hash: H) -> Result<Content, Error>
where H: AsRef<IPFSHash>
{
- self.0.get(hash)
+ self.0.get(hash).await
}
- pub fn put_content<C>(&self, c: C) -> impl Future<Item = IPFSHash, Error = Error>
+ pub async fn put_content<C>(&self, c: C) -> Result<IPFSHash, Error>
where C: AsRef<Content>
{
- self.0.put(c.as_ref())
+ self.0.put(c.as_ref()).await
}
}