diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2019-06-15 20:00:09 +0200 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2020-03-25 10:50:14 +0100 |
commit | c1a7b40f9610c552d6f6d0d51503d3011fd85841 (patch) | |
tree | 15e82ce0827005f2c4a78e9e7bfb4c035b6a35fc /src | |
parent | 6d4b52c24a768bcc620dbc06f500a9d5bb71eea7 (diff) |
Implement iterator over all blocks of a profile
Diffstat (limited to 'src')
-rw-r--r-- | src/main.rs | 263 | ||||
-rw-r--r-- | src/repository/client.rs | 6 | ||||
-rw-r--r-- | src/repository/iter/block.rs | 52 | ||||
-rw-r--r-- | src/repository/iter/mod.rs | 1 | ||||
-rw-r--r-- | src/repository/mod.rs | 1 | ||||
-rw-r--r-- | src/repository/profile.rs | 34 | ||||
-rw-r--r-- | src/repository/repository.rs | 6 |
7 files changed, 116 insertions, 247 deletions
diff --git a/src/main.rs b/src/main.rs index b51bc11..a9b8f8c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -101,7 +101,7 @@ fn main() { debug!("Working with hash: {}", hash); hyper::rt::run(repo - .resolve_block(&hash) + .get_block(hash) .map_err(|e| { let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); @@ -122,7 +122,7 @@ fn main() { debug!("Working with hash: {}", hash); hyper::rt::run(repo - .resolve_content(&hash) + .get_content(hash) .map_err(|e| { let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); @@ -142,8 +142,9 @@ fn main() { let hash = IPFSHash::from(mtch.value_of("HASH").unwrap()); // safe by clap debug!("Working with hash: {}", hash); + let (tx, rx) = ::std::sync::mpsc::channel(); hyper::rt::run(repo - .resolve_content_post(&hash) + .get_content(hash) .map_err(|e| { let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); @@ -154,7 +155,8 @@ fn main() { exit(1) }) - .map(|_| ())); + .map(move |content| tx.send(content.payload().is_post()).unwrap())); + exit(if rx.recv().unwrap() { 0 } else { 1 }); } ("is-reply", Some(mtch)) => { @@ -163,8 +165,9 @@ fn main() { let hash = IPFSHash::from(mtch.value_of("HASH").unwrap()); // safe by clap debug!("Working with hash: {}", hash); + let (tx, rx) = ::std::sync::mpsc::channel(); hyper::rt::run(repo - .resolve_content_post(&hash) + .get_content(hash) .map_err(|e| { let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); @@ -175,16 +178,19 @@ fn main() { exit(1) }) - .map(|content| { - let is_reply = match content.payload() { - Payload::Post { reply_to, .. } => reply_to.is_some(), - _ => false, + .map(move |content| { + let is_reply = if !content.payload().is_post() { + false + } else { + match content.payload() { + Payload::Post { reply_to, .. } => reply_to.is_some(), + _ => false, + } }; - if !is_reply { - exit(1) - } + tx.send(is_reply).unwrap() })); + exit(if rx.recv().unwrap() { 0 } else { 1 }); } ("is-profile", Some(mtch)) => { @@ -193,8 +199,9 @@ fn main() { let hash = IPFSHash::from(mtch.value_of("HASH").unwrap()); // safe by clap debug!("Working with hash: {}", hash); + let (tx, rx) = ::std::sync::mpsc::channel(); hyper::rt::run(repo - .resolve_content_profile(&hash) + .get_content(hash) .map_err(|e| { let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); @@ -205,7 +212,10 @@ fn main() { exit(1) }) - .map(|_| ())); + .map(move |content| { + tx.send(content.payload().is_profile()).unwrap() + })); + exit(if rx.recv().unwrap() { 0 } else { 1 }); } ("get-parent-blocks", Some(mtch)) => { @@ -214,8 +224,9 @@ fn main() { let hash = IPFSHash::from(mtch.value_of("HASH").unwrap()); // safe by clap debug!("Working with hash: {}", hash); + let (tx, rx) = ::std::sync::mpsc::channel(); hyper::rt::run(repo - .resolve_block(&hash) + .get_block(hash) .map_err(|e| { let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); @@ -226,10 +237,11 @@ fn main() { exit(1) }) - .map(|block| block.parents().iter().for_each(|hash| { - println!("{}", hash); - }))); + .map(move |block| tx.send(block.parents().clone()).unwrap())); + for parent in rx.recv().unwrap() { + println!("{}", parent); + } } ("get-devices", Some(mtch)) => { @@ -238,8 +250,9 @@ fn main() { let hash = IPFSHash::from(mtch.value_of("HASH").unwrap()); // safe by clap debug!("Working with hash: {}", hash); + let (tx, rx) = ::std::sync::mpsc::channel(); hyper::rt::run(repo - .resolve_content(&hash) + .get_content(hash) .map_err(|e| { let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); @@ -250,7 +263,11 @@ fn main() { exit(1) }) - .map(|c| c.devices().iter().for_each(|d| println!("{}", d)))); + .map(move |c| tx.send(c.devices().clone()).unwrap())); + + for device in rx.recv().unwrap() { + println!("{}", device); + } } ("get-payload-type", Some(mtch)) => { @@ -259,8 +276,9 @@ fn main() { let hash = IPFSHash::from(mtch.value_of("HASH").unwrap()); // safe by clap debug!("Working with hash: {}", hash); + let (tx, rx) = ::std::sync::mpsc::channel(); hyper::rt::run(repo - .resolve_content_post(&hash) + .get_content(hash) .map_err(|e| { let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); @@ -271,12 +289,14 @@ fn main() { exit(1) }) - .map(|c| println!("{}", match c.payload() { + .map(move |c| tx.send(match c.payload() { Payload::None => "None", Payload::Post { .. } => "Post", Payload::AttachedPostComments { .. } => "AttachedPostComments", Payload::Profile { .. } => "Profile", - }))); + }).unwrap())); + + println!("{}", rx.recv().unwrap()); } ("get-payload", Some(mtch)) => { @@ -285,8 +305,9 @@ fn main() { let hash = IPFSHash::from(mtch.value_of("HASH").unwrap()); // safe by clap debug!("Working with hash: {}", hash); + let (tx, rx) = ::std::sync::mpsc::channel(); hyper::rt::run(repo - .resolve_content_post(&hash) + .get_content(hash) .map_err(|e| { let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); @@ -302,7 +323,8 @@ fn main() { error!("Error building JSON: {:?}", e); exit(1) }) - .map(|j| println!("{}", j))); + .map(move |j| tx.send(j).unwrap())); + println!("{}", rx.recv().unwrap()); } ("get-post-content", Some(mtch)) => { @@ -311,8 +333,9 @@ fn main() { let hash = IPFSHash::from(mtch.value_of("HASH").unwrap()); // safe by clap debug!("Working with hash: {}", hash); + let (tx, rx) = ::std::sync::mpsc::channel(); hyper::rt::run(repo - .resolve_content_post(&hash) + .get_content(hash.clone()) .map_err(|e| { let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); @@ -323,18 +346,12 @@ fn main() { exit(1) }) - .and_then(move |content| { - match content.payload() { + .map(move |c| { + match c.payload() { Payload::Post { content, content_format, .. } => { match (content_format.type_(), content_format.subtype()) { (mime::TEXT, _) => { // plain text will be printed - repo.resolve_plain(&content) - .and_then(|blob| String::from_utf8(blob).map_err(Into::into)) - .map_err(move |e| { - error!("Content is not UTF-8: {:?}", e); - exit(1) - }) - .map(|blob| println!("{}", blob)) + content.clone() }, (_, _) => { @@ -351,7 +368,19 @@ fn main() { exit(1) } } - })); + }) + .and_then(move |content_hash| { + repo.get_raw_bytes(content_hash) + .and_then(|blob| String::from_utf8(blob).map_err(Into::into)) + .map_err(|e| { + error!("Content is not UTF-8: {:?}", e); + exit(1) + }) + }) + .map(move |blob| tx.send(blob).unwrap()) + ); + + println!("{}", rx.recv().unwrap()); } ("get-post-content-format", Some(mtch)) => { @@ -361,7 +390,7 @@ fn main() { debug!("Working with hash: {}", hash); hyper::rt::run(repo - .resolve_content_post(&hash) + .get_content(hash) .map_err(|e| { let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); @@ -390,7 +419,7 @@ fn main() { debug!("Working with hash: {}", hash); hyper::rt::run(repo - .resolve_content_post(&hash) + .get_content(hash) .map_err(|e| { let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); @@ -421,7 +450,7 @@ fn main() { debug!("Working with hash: {}", hash); hyper::rt::run(repo - .resolve_content_post(&hash) + .get_content(hash) .map_err(|e| { let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); @@ -461,7 +490,7 @@ fn main() { debug!("Working with hash: {}", hash); hyper::rt::run(repo - .resolve_content_profile(&hash) + .get_content(hash) .map_err(|e| { let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); @@ -493,7 +522,7 @@ fn main() { debug!("Working with hash: {}", hash); hyper::rt::run(repo - .resolve_content_profile(&hash) + .get_content(hash) .map_err(|e| { let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); @@ -528,7 +557,7 @@ fn main() { debug!("Working with hash: {}", hash); hyper::rt::run(repo - .resolve_content_profile(&hash) + .get_content(hash) .map_err(|e| { let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..))); @@ -542,7 +571,7 @@ fn main() { .map(|content| { match content.payload() { Payload::Profile { more, .. } => { - match serde_json_to_string_pretty(more) { + match serde_json_to_string_pretty(&more) { Err(e) => { error!("Error building JSON: {:?}", e); exit(1) @@ -613,7 +642,7 @@ fn main() { hyper::rt::run({ repo - .put_content(&content) + .put_content(content) .map_err(|e| { error!("Error running: {:?}", e); print_error_details(e); @@ -661,7 +690,7 @@ fn main() { hyper::rt::run({ repo - .put_content(&content) + .put_content(content) .map_err(|e| { error!("Error running: {:?}", e); print_error_details(e); @@ -715,7 +744,7 @@ fn main() { hyper::rt::run({ repo - .put_content(&content) + .put_content(content) .map_err(|e| { error!("Error running: {:?}", e); print_error_details(e); @@ -749,7 +778,7 @@ fn main() { hyper::rt::run({ repo - .put_block(&block) + .put_block(block) .map_err(|e| { error!("Error running: {:?}", e); print_error_details(e); @@ -759,146 +788,8 @@ fn main() { }); } - ("create-profile", Some(mtch)) => { - debug!("Calling: create-profile"); - let (_config, repo) = boot(); - - let name = mtch.value_of("name").map(String::from).unwrap(); // safe by clap - let keyname = format!("distrox-{}", name); - let timestamp = Timestamp::from(::chrono::offset::Local::now().naive_local()); - let payload = Payload::Profile { - names: vec![name], - picture: None, - more: BTreeMap::new(), - }; - let profile = Content::new(vec![], Some(timestamp), payload); - - hyper::rt::run({ - // use ipfs defaults for lifetime and ttl - repo.new_profile(keyname, profile, None, None) - .map_err(|e| { - error!("Error running: {:?}", e); - print_error_details(e); - exit(1) - }) - .map(|(profile_name, profile_key)| { - println!("{}, {}", profile_name.deref(), profile_key.deref()); - }) - }); - }, - - ("post", Some(mtch)) => { - use crate::repository::ProfileName; - - debug!("Calling: post"); - let (_config, repo) = boot(); - let repo = Arc::new(repo); - let publish_key_name = mtch - .value_of("profile-name") - .map(String::from) - .map(ProfileName::from) - .unwrap(); // safe by clap - - let parent_block_hashes = mtch - .values_of("parents") - .unwrap() // safe by clap - .map(String::from) - .map(IPFSHash::from) - .collect::<Vec<_>>(); - - let text = mtch - .value_of("text") - .map(String::from) - .unwrap(); // safe by clap - - let time = ::chrono::offset::Local::now().naive_local(); - - let _repo2 = repo.clone(); - - hyper::rt::run({ - repo.clone() - .get_key_id_from_key_name(publish_key_name.clone()) - .and_then(move |key_id| { - repo.new_text_post(key_id, parent_block_hashes, text, Some(time)) - }) - .map(|hash| { - println!("{}", hash); - }) - .map_err(|e| { - error!("Error running: {:?}", e); - print_error_details(e); - exit(1) - }) - }); - }, - - ("publish", Some(mtch)) => { - use crate::repository::ProfileName; - - debug!("Calling: publish"); - let (_config, repo) = boot(); - let repo = Arc::new(repo); - let publish_key_name = mtch - .value_of("profile_name") - .map(String::from) - .map(ProfileName::from) - .unwrap(); // safe by clap - let blockhash = mtch - .value_of("blockhash") - .map(String::from) - .map(IPFSHash::from) - .unwrap(); // safe by clap - - let _repo2 = repo.clone(); - - hyper::rt::run({ - repo.clone() - .get_key_id_from_key_name(publish_key_name) - .and_then(move |publish_key_id| { - repo.announce_block(publish_key_id, &blockhash, None, None) - }) - .map_err(|e| { - error!("Error running: {:?}", e); - print_error_details(e); - exit(1) - }) - }); - }, - - ("get-profile-state", Some(mtch)) => { - use crate::repository::ProfileName; - - debug!("Calling: get-profile-state"); - let (_config, repo) = boot(); - let repo = Arc::new(repo); - let publish_key_name = mtch - .value_of("profile_name") - .map(String::from) - .map(ProfileName::from) - .unwrap(); // safe by clap - - let repo2 = repo.clone(); - - hyper::rt::run({ - repo.clone() - .get_key_id_from_key_name(publish_key_name.clone()) - .and_then(move |key_id| { - let key_id = key_id.into(); - repo2.deref_ipns_hash(&key_id) - }) - .map(|hash| { - println!("{}", hash); - }) - .map_err(|e| { - error!("Error running: {:?}", e); - print_error_details(e); - exit(1) - }) - }); - }, - (other, _mtch) => { - error!("Unknown command: {}", other); + error!("Unknown or unimplemented command: {}", other); exit(1) } } diff --git a/src/repository/client.rs b/src/repository/client.rs index db15d91..08ec15b 100644 --- a/src/repository/client.rs +++ b/src/repository/client.rs @@ -65,6 +65,12 @@ impl TypedClientFassade { ClientFassade::new(host, port).map(TypedClientFassade) } + pub fn get_raw_bytes<H>(&self, hash: H) -> impl Future<Item = Vec<u8>, Error = Error> + where H: AsRef<IPFSHash> + { + self.0.get(hash) + } + pub fn get<H, D>(&self, hash: H) -> impl Future<Item = D, Error = Error> where H: AsRef<IPFSHash>, D: DeserializeOwned diff --git a/src/repository/iter/block.rs b/src/repository/iter/block.rs deleted file mode 100644 index fb7e609..0000000 --- a/src/repository/iter/block.rs +++ /dev/null @@ -1,52 +0,0 @@ -use queues::Queue; -use failure::Error; -use futures::Future; - -use crate::repository::Repository; -use crate::types::block::Block; - -/// An iterator that iterates over `Block`s using a `Repository` -/// -pub struct BlockIterator { - repo: Repository, - queue: Queue<impl Future<Block, Error>>, -} - -impl BlockIterator { - pub fn new(repo: Repository, initial: IPFSHash) -> Self { - Repository { - queue: { - let q = Queue::default(); - q.add(repo.get_block(initial)); - q - }, - repo - } - } -} - -impl Iterator for BlockIterator { - type Item = Result<Block, Error>; - - fn next(&mut self) -> Option<Self::Item> { - if let Ok(next_block) = self.queue.remove() { - match next_block.wait() { - Some(block) => { - block.parents().iter().for_each(|parent| { - self.queue.add({ - self.repo.get_block(parent) - }); - }) - - Some(block) - }, - - Err(e) => return Some(Err(e)), - } - - - } else { - None - } - } -} diff --git a/src/repository/iter/mod.rs b/src/repository/iter/mod.rs deleted file mode 100644 index a863eaa..0000000 --- a/src/repository/iter/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod block; diff --git a/src/repository/mod.rs b/src/repository/mod.rs index aee6a01..f837ecc 100644 --- a/src/repository/mod.rs +++ b/src/repository/mod.rs @@ -1,7 +1,6 @@ mod client; mod repository; mod profile; -mod iter; pub use repository::Repository; pub use profile::ProfileName; diff --git a/src/repository/profile.rs b/src/repository/profile.rs index 991a0bf..8d9a12b 100644 --- a/src/repository/profile.rs +++ b/src/repository/profile.rs @@ -1,3 +1,13 @@ +use failure::Error; +use futures::stream::{self, Stream}; +use futures::future; +use futures::Future; + +use crate::types::util::IPNSHash; +use crate::types::util::IPFSHash; +use crate::types::block::Block; +use crate::repository::Repository; + #[derive(Clone, Debug, Hash, PartialOrd, Ord, PartialEq, Eq)] pub struct ProfileName(String); @@ -10,9 +20,9 @@ impl From<String> for ProfileName { /// A profile /// /// A profile can be _any_ profile, not only the profile of the user -#[derive(Debug)] pub struct Profile { repository: Repository, + head: IPFSHash, } impl Profile { @@ -26,13 +36,24 @@ impl Profile { } /// Load a profile from the repository - pub fn load(repository: Repository, key: Key) -> Result<Self, Error> { + pub fn load(repository: Repository, key: IPNSHash) -> Result<Self, Error> { unimplemented!() } - pub fn blocks(&self) -> impl Iterator<Item = Result<Block, Error>> { - use crate::repository::iter::block::BlockIterator; - BlockIterator::new(&self.repository) + pub fn blocks(&self) -> impl Stream<Item = Block, Error = Error> { + let repo = self.repository.clone(); + stream::unfold(vec![self.head.clone()], move |mut state| { + let repo = repo.clone(); + state.pop() + .map(move |hash| { + repo.get_block(hash).map(|block| { + block.parents().iter().for_each(|parent| { + state.push(parent.clone()) + }); + (block, state) + }) + }) + }) } } @@ -42,7 +63,6 @@ impl Profile { /// Internally this wraps the `Profile` type, but it provides more functionality, for example /// posting new content. /// -#[derive(Debug)] pub struct UserProfile { profile: Profile } @@ -60,7 +80,7 @@ impl UserProfile { } /// Load a profile from the repository - pub fn load(repository: Repository, key: Key) -> Result<Self, Error> { + pub fn load(repository: Repository, key: IPNSHash) -> Result<Self, Error> { Ok(UserProfile { profile: Profile::load(repository, key)?, }) diff --git a/src/repository/repository.rs b/src/repository/repository.rs index 5ed1244..7386ba4 100644 --- a/src/repository/repository.rs +++ b/src/repository/repository.rs @@ -36,6 +36,12 @@ impl Repository { TypedClientFassade::new(host, port).map(Repository) } + pub fn get_raw_bytes<H>(&self, hash: H) -> impl Future<Item = Vec<u8>, Error = Error> + where H: AsRef<IPFSHash> + { + self.0.get_raw_bytes(hash) + } + pub fn get_block<H>(&self, hash: H) -> impl Future<Item = Block, Error = Error> where H: AsRef<IPFSHash> { |