diff options
Diffstat (limited to 'src/repository/profile.rs')
-rw-r--r-- | src/repository/profile.rs | 41 |
1 files changed, 27 insertions, 14 deletions
diff --git a/src/repository/profile.rs b/src/repository/profile.rs index 8d9a12b..bc62d9f 100644 --- a/src/repository/profile.rs +++ b/src/repository/profile.rs @@ -1,7 +1,8 @@ use failure::Error; -use futures::stream::{self, Stream}; +use futures::stream::{self, Stream, StreamExt}; use futures::future; use futures::Future; +use futures::FutureExt; use crate::types::util::IPNSHash; use crate::types::util::IPFSHash; @@ -40,19 +41,31 @@ impl Profile { unimplemented!() } - pub fn blocks(&self) -> impl Stream<Item = Block, Error = Error> { - let repo = self.repository.clone(); - stream::unfold(vec![self.head.clone()], move |mut state| { - let repo = repo.clone(); - state.pop() - .map(move |hash| { - repo.get_block(hash).map(|block| { - block.parents().iter().for_each(|parent| { - state.push(parent.clone()) - }); - (block, state) - }) - }) + pub fn blocks(&self) -> impl Stream<Item = Result<Block, Error>> { + stream::unfold((self.repository.clone(), vec![self.head.clone()]), move |(repo, mut state)| { + async { + if let Some(hash) = state.pop() { + match repo + .get_block(hash) + .await + .map(move |block| { + block.parents().iter().for_each(|parent| { + state.push(parent.clone()) + }); + + (block, state) + }) + .map(Some) + .transpose() + { + Some(Ok((item, state))) => Some((Ok(item), (repo, state))), + Some(Err(e)) => Some((Err(e), (repo, vec![]))), + None => None, + } + } else { + None + } + } }) } } |