summaryrefslogtreecommitdiffstats
path: root/src/repository/profile.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/repository/profile.rs')
-rw-r--r--src/repository/profile.rs41
1 files changed, 27 insertions, 14 deletions
diff --git a/src/repository/profile.rs b/src/repository/profile.rs
index 8d9a12b..bc62d9f 100644
--- a/src/repository/profile.rs
+++ b/src/repository/profile.rs
@@ -1,7 +1,8 @@
use failure::Error;
-use futures::stream::{self, Stream};
+use futures::stream::{self, Stream, StreamExt};
use futures::future;
use futures::Future;
+use futures::FutureExt;
use crate::types::util::IPNSHash;
use crate::types::util::IPFSHash;
@@ -40,19 +41,31 @@ impl Profile {
unimplemented!()
}
- pub fn blocks(&self) -> impl Stream<Item = Block, Error = Error> {
- let repo = self.repository.clone();
- stream::unfold(vec![self.head.clone()], move |mut state| {
- let repo = repo.clone();
- state.pop()
- .map(move |hash| {
- repo.get_block(hash).map(|block| {
- block.parents().iter().for_each(|parent| {
- state.push(parent.clone())
- });
- (block, state)
- })
- })
+ pub fn blocks(&self) -> impl Stream<Item = Result<Block, Error>> {
+ stream::unfold((self.repository.clone(), vec![self.head.clone()]), move |(repo, mut state)| {
+ async {
+ if let Some(hash) = state.pop() {
+ match repo
+ .get_block(hash)
+ .await
+ .map(move |block| {
+ block.parents().iter().for_each(|parent| {
+ state.push(parent.clone())
+ });
+
+ (block, state)
+ })
+ .map(Some)
+ .transpose()
+ {
+ Some(Ok((item, state))) => Some((Ok(item), (repo, state))),
+ Some(Err(e)) => Some((Err(e), (repo, vec![]))),
+ None => None,
+ }
+ } else {
+ None
+ }
+ }
})
}
}