diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2020-03-25 15:05:47 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2020-03-25 19:50:48 +0100 |
commit | 442d148ad6e0f2df7e9e8acbd841a15c4347b0e1 (patch) | |
tree | 79df854e75081650ef57da1ff5588180e2246af7 /src/repository | |
parent | 42d741c1928264f9b4cb5ec8d990bde4b6feaed3 (diff) |
Rewrite library code for async-await
Diffstat (limited to 'src/repository')
-rw-r--r-- | src/repository/client.rs | 29 | ||||
-rw-r--r-- | src/repository/profile.rs | 41 | ||||
-rw-r--r-- | src/repository/repository.rs | 20 |
3 files changed, 54 insertions, 36 deletions
diff --git a/src/repository/client.rs b/src/repository/client.rs index 08ec15b..c21df0b 100644 --- a/src/repository/client.rs +++ b/src/repository/client.rs @@ -6,7 +6,10 @@ use ipfs_api::IpfsClient; use failure::Error; use failure::err_msg; use futures::future::Future; +use futures::future::FutureExt; use futures::stream::Stream; +use futures::stream::StreamExt; +use futures::stream::TryStreamExt; use serde_json::from_str as serde_json_from_str; use serde_json::to_string as serde_json_to_str; @@ -36,21 +39,23 @@ impl ClientFassade { .map_err(Into::into) } - fn get<H: AsRef<IPFSHash>>(&self, hash: H) -> impl Future<Item = Vec<u8>, Error = Error> { + async fn get<H: AsRef<IPFSHash>>(&self, hash: H) -> Result<Vec<u8>, Error> { debug!("Get: {}", hash.as_ref()); self.0 .clone() .cat(hash.as_ref()) - .concat2() - .map_err(Error::from) - .map(|blob| blob.to_vec()) + .map_ok(|b| b.to_vec()) + .try_concat() + .map(|r| r.map_err(Error::from)) + .await } - fn put(&self, data: Vec<u8>) -> impl Future<Item = IPFSHash, Error = Error> { + async fn put(&self, data: Vec<u8>) -> Result<IPFSHash, Error> { debug!("Put: {:?}", data); self.0 .clone() .add(Cursor::new(data)) + .await .map(|res| IPFSHash::from(res.hash)) .map_err(Into::into) } @@ -65,19 +70,20 @@ impl TypedClientFassade { ClientFassade::new(host, port).map(TypedClientFassade) } - pub fn get_raw_bytes<H>(&self, hash: H) -> impl Future<Item = Vec<u8>, Error = Error> + pub async fn get_raw_bytes<H>(&self, hash: H) -> Result<Vec<u8>, Error> where H: AsRef<IPFSHash> { - self.0.get(hash) + self.0.get(hash).await } - pub fn get<H, D>(&self, hash: H) -> impl Future<Item = D, Error = Error> + pub async fn get<H, D>(&self, hash: H) -> Result<D, Error> where H: AsRef<IPFSHash>, D: DeserializeOwned { self.0 .clone() .get(hash) + .await .and_then(|data| { debug!("Got data, building object: {:?}", data); @@ -85,15 +91,14 @@ impl TypedClientFassade { }) } - pub fn put<S, Ser>(&self, data: &S) -> impl Future<Item = IPFSHash, Error = Error> + pub async fn put<S, Ser>(&self, data: &S) -> Result<IPFSHash, Error> where S: AsRef<Ser>, Ser: Serialize { let client = self.0.clone(); - ::futures::future::result(serde_json_to_str(data.as_ref())) - .map_err(Into::into) - .and_then(move |d| client.put(d.into_bytes())) + let data = serde_json_to_str(data.as_ref())?; + client.put(data.into_bytes()).await } } diff --git a/src/repository/profile.rs b/src/repository/profile.rs index 8d9a12b..bc62d9f 100644 --- a/src/repository/profile.rs +++ b/src/repository/profile.rs @@ -1,7 +1,8 @@ use failure::Error; -use futures::stream::{self, Stream}; +use futures::stream::{self, Stream, StreamExt}; use futures::future; use futures::Future; +use futures::FutureExt; use crate::types::util::IPNSHash; use crate::types::util::IPFSHash; @@ -40,19 +41,31 @@ impl Profile { unimplemented!() } - pub fn blocks(&self) -> impl Stream<Item = Block, Error = Error> { - let repo = self.repository.clone(); - stream::unfold(vec![self.head.clone()], move |mut state| { - let repo = repo.clone(); - state.pop() - .map(move |hash| { - repo.get_block(hash).map(|block| { - block.parents().iter().for_each(|parent| { - state.push(parent.clone()) - }); - (block, state) - }) - }) + pub fn blocks(&self) -> impl Stream<Item = Result<Block, Error>> { + stream::unfold((self.repository.clone(), vec![self.head.clone()]), move |(repo, mut state)| { + async { + if let Some(hash) = state.pop() { + match repo + .get_block(hash) + .await + .map(move |block| { + block.parents().iter().for_each(|parent| { + state.push(parent.clone()) + }); + + (block, state) + }) + .map(Some) + .transpose() + { + Some(Ok((item, state))) => Some((Ok(item), (repo, state))), + Some(Err(e)) => Some((Err(e), (repo, vec![]))), + None => None, + } + } else { + None + } + } }) } } diff --git a/src/repository/repository.rs b/src/repository/repository.rs index 7386ba4..c162e65 100644 --- a/src/repository/repository.rs +++ b/src/repository/repository.rs @@ -36,34 +36,34 @@ impl Repository { TypedClientFassade::new(host, port).map(Repository) } - pub fn get_raw_bytes<H>(&self, hash: H) -> impl Future<Item = Vec<u8>, Error = Error> + pub async fn get_raw_bytes<H>(&self, hash: H) -> Result<Vec<u8>, Error> where H: AsRef<IPFSHash> { - self.0.get_raw_bytes(hash) + self.0.get_raw_bytes(hash).await } - pub fn get_block<H>(&self, hash: H) -> impl Future<Item = Block, Error = Error> + pub async fn get_block<H>(&self, hash: H) -> Result<Block, Error> where H: AsRef<IPFSHash> { - self.0.get(hash) + self.0.get(hash).await } - pub fn put_block<B>(&self, b: B) -> impl Future<Item = IPFSHash, Error = Error> + pub async fn put_block<B>(&self, b: B) -> Result<IPFSHash, Error> where B: AsRef<Block> { - self.0.put(b.as_ref()) + self.0.put(b.as_ref()).await } - pub fn get_content<H>(&self, hash: H) -> impl Future<Item = Content, Error = Error> + pub async fn get_content<H>(&self, hash: H) -> Result<Content, Error> where H: AsRef<IPFSHash> { - self.0.get(hash) + self.0.get(hash).await } - pub fn put_content<C>(&self, c: C) -> impl Future<Item = IPFSHash, Error = Error> + pub async fn put_content<C>(&self, c: C) -> Result<IPFSHash, Error> where C: AsRef<Content> { - self.0.put(c.as_ref()) + self.0.put(c.as_ref()).await } } |