summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2019-06-15 20:00:09 +0200
committerMatthias Beyer <mail@beyermatthias.de>2020-03-25 10:50:14 +0100
commitc1a7b40f9610c552d6f6d0d51503d3011fd85841 (patch)
tree15e82ce0827005f2c4a78e9e7bfb4c035b6a35fc /src
parent6d4b52c24a768bcc620dbc06f500a9d5bb71eea7 (diff)
Implement iterator over all blocks of a profile
Diffstat (limited to 'src')
-rw-r--r--src/main.rs263
-rw-r--r--src/repository/client.rs6
-rw-r--r--src/repository/iter/block.rs52
-rw-r--r--src/repository/iter/mod.rs1
-rw-r--r--src/repository/mod.rs1
-rw-r--r--src/repository/profile.rs34
-rw-r--r--src/repository/repository.rs6
7 files changed, 116 insertions, 247 deletions
diff --git a/src/main.rs b/src/main.rs
index b51bc11..a9b8f8c 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -101,7 +101,7 @@ fn main() {
debug!("Working with hash: {}", hash);
hyper::rt::run(repo
- .resolve_block(&hash)
+ .get_block(hash)
.map_err(|e| {
let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..)));
@@ -122,7 +122,7 @@ fn main() {
debug!("Working with hash: {}", hash);
hyper::rt::run(repo
- .resolve_content(&hash)
+ .get_content(hash)
.map_err(|e| {
let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..)));
@@ -142,8 +142,9 @@ fn main() {
let hash = IPFSHash::from(mtch.value_of("HASH").unwrap()); // safe by clap
debug!("Working with hash: {}", hash);
+ let (tx, rx) = ::std::sync::mpsc::channel();
hyper::rt::run(repo
- .resolve_content_post(&hash)
+ .get_content(hash)
.map_err(|e| {
let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..)));
@@ -154,7 +155,8 @@ fn main() {
exit(1)
})
- .map(|_| ()));
+ .map(move |content| tx.send(content.payload().is_post()).unwrap()));
+ exit(if rx.recv().unwrap() { 0 } else { 1 });
}
("is-reply", Some(mtch)) => {
@@ -163,8 +165,9 @@ fn main() {
let hash = IPFSHash::from(mtch.value_of("HASH").unwrap()); // safe by clap
debug!("Working with hash: {}", hash);
+ let (tx, rx) = ::std::sync::mpsc::channel();
hyper::rt::run(repo
- .resolve_content_post(&hash)
+ .get_content(hash)
.map_err(|e| {
let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..)));
@@ -175,16 +178,19 @@ fn main() {
exit(1)
})
- .map(|content| {
- let is_reply = match content.payload() {
- Payload::Post { reply_to, .. } => reply_to.is_some(),
- _ => false,
+ .map(move |content| {
+ let is_reply = if !content.payload().is_post() {
+ false
+ } else {
+ match content.payload() {
+ Payload::Post { reply_to, .. } => reply_to.is_some(),
+ _ => false,
+ }
};
- if !is_reply {
- exit(1)
- }
+ tx.send(is_reply).unwrap()
}));
+ exit(if rx.recv().unwrap() { 0 } else { 1 });
}
("is-profile", Some(mtch)) => {
@@ -193,8 +199,9 @@ fn main() {
let hash = IPFSHash::from(mtch.value_of("HASH").unwrap()); // safe by clap
debug!("Working with hash: {}", hash);
+ let (tx, rx) = ::std::sync::mpsc::channel();
hyper::rt::run(repo
- .resolve_content_profile(&hash)
+ .get_content(hash)
.map_err(|e| {
let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..)));
@@ -205,7 +212,10 @@ fn main() {
exit(1)
})
- .map(|_| ()));
+ .map(move |content| {
+ tx.send(content.payload().is_profile()).unwrap()
+ }));
+ exit(if rx.recv().unwrap() { 0 } else { 1 });
}
("get-parent-blocks", Some(mtch)) => {
@@ -214,8 +224,9 @@ fn main() {
let hash = IPFSHash::from(mtch.value_of("HASH").unwrap()); // safe by clap
debug!("Working with hash: {}", hash);
+ let (tx, rx) = ::std::sync::mpsc::channel();
hyper::rt::run(repo
- .resolve_block(&hash)
+ .get_block(hash)
.map_err(|e| {
let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..)));
@@ -226,10 +237,11 @@ fn main() {
exit(1)
})
- .map(|block| block.parents().iter().for_each(|hash| {
- println!("{}", hash);
- })));
+ .map(move |block| tx.send(block.parents().clone()).unwrap()));
+ for parent in rx.recv().unwrap() {
+ println!("{}", parent);
+ }
}
("get-devices", Some(mtch)) => {
@@ -238,8 +250,9 @@ fn main() {
let hash = IPFSHash::from(mtch.value_of("HASH").unwrap()); // safe by clap
debug!("Working with hash: {}", hash);
+ let (tx, rx) = ::std::sync::mpsc::channel();
hyper::rt::run(repo
- .resolve_content(&hash)
+ .get_content(hash)
.map_err(|e| {
let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..)));
@@ -250,7 +263,11 @@ fn main() {
exit(1)
})
- .map(|c| c.devices().iter().for_each(|d| println!("{}", d))));
+ .map(move |c| tx.send(c.devices().clone()).unwrap()));
+
+ for device in rx.recv().unwrap() {
+ println!("{}", device);
+ }
}
("get-payload-type", Some(mtch)) => {
@@ -259,8 +276,9 @@ fn main() {
let hash = IPFSHash::from(mtch.value_of("HASH").unwrap()); // safe by clap
debug!("Working with hash: {}", hash);
+ let (tx, rx) = ::std::sync::mpsc::channel();
hyper::rt::run(repo
- .resolve_content_post(&hash)
+ .get_content(hash)
.map_err(|e| {
let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..)));
@@ -271,12 +289,14 @@ fn main() {
exit(1)
})
- .map(|c| println!("{}", match c.payload() {
+ .map(move |c| tx.send(match c.payload() {
Payload::None => "None",
Payload::Post { .. } => "Post",
Payload::AttachedPostComments { .. } => "AttachedPostComments",
Payload::Profile { .. } => "Profile",
- })));
+ }).unwrap()));
+
+ println!("{}", rx.recv().unwrap());
}
("get-payload", Some(mtch)) => {
@@ -285,8 +305,9 @@ fn main() {
let hash = IPFSHash::from(mtch.value_of("HASH").unwrap()); // safe by clap
debug!("Working with hash: {}", hash);
+ let (tx, rx) = ::std::sync::mpsc::channel();
hyper::rt::run(repo
- .resolve_content_post(&hash)
+ .get_content(hash)
.map_err(|e| {
let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..)));
@@ -302,7 +323,8 @@ fn main() {
error!("Error building JSON: {:?}", e);
exit(1)
})
- .map(|j| println!("{}", j)));
+ .map(move |j| tx.send(j).unwrap()));
+ println!("{}", rx.recv().unwrap());
}
("get-post-content", Some(mtch)) => {
@@ -311,8 +333,9 @@ fn main() {
let hash = IPFSHash::from(mtch.value_of("HASH").unwrap()); // safe by clap
debug!("Working with hash: {}", hash);
+ let (tx, rx) = ::std::sync::mpsc::channel();
hyper::rt::run(repo
- .resolve_content_post(&hash)
+ .get_content(hash.clone())
.map_err(|e| {
let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..)));
@@ -323,18 +346,12 @@ fn main() {
exit(1)
})
- .and_then(move |content| {
- match content.payload() {
+ .map(move |c| {
+ match c.payload() {
Payload::Post { content, content_format, .. } => {
match (content_format.type_(), content_format.subtype()) {
(mime::TEXT, _) => { // plain text will be printed
- repo.resolve_plain(&content)
- .and_then(|blob| String::from_utf8(blob).map_err(Into::into))
- .map_err(move |e| {
- error!("Content is not UTF-8: {:?}", e);
- exit(1)
- })
- .map(|blob| println!("{}", blob))
+ content.clone()
},
(_, _) => {
@@ -351,7 +368,19 @@ fn main() {
exit(1)
}
}
- }));
+ })
+ .and_then(move |content_hash| {
+ repo.get_raw_bytes(content_hash)
+ .and_then(|blob| String::from_utf8(blob).map_err(Into::into))
+ .map_err(|e| {
+ error!("Content is not UTF-8: {:?}", e);
+ exit(1)
+ })
+ })
+ .map(move |blob| tx.send(blob).unwrap())
+ );
+
+ println!("{}", rx.recv().unwrap());
}
("get-post-content-format", Some(mtch)) => {
@@ -361,7 +390,7 @@ fn main() {
debug!("Working with hash: {}", hash);
hyper::rt::run(repo
- .resolve_content_post(&hash)
+ .get_content(hash)
.map_err(|e| {
let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..)));
@@ -390,7 +419,7 @@ fn main() {
debug!("Working with hash: {}", hash);
hyper::rt::run(repo
- .resolve_content_post(&hash)
+ .get_content(hash)
.map_err(|e| {
let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..)));
@@ -421,7 +450,7 @@ fn main() {
debug!("Working with hash: {}", hash);
hyper::rt::run(repo
- .resolve_content_post(&hash)
+ .get_content(hash)
.map_err(|e| {
let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..)));
@@ -461,7 +490,7 @@ fn main() {
debug!("Working with hash: {}", hash);
hyper::rt::run(repo
- .resolve_content_profile(&hash)
+ .get_content(hash)
.map_err(|e| {
let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..)));
@@ -493,7 +522,7 @@ fn main() {
debug!("Working with hash: {}", hash);
hyper::rt::run(repo
- .resolve_content_profile(&hash)
+ .get_content(hash)
.map_err(|e| {
let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..)));
@@ -528,7 +557,7 @@ fn main() {
debug!("Working with hash: {}", hash);
hyper::rt::run(repo
- .resolve_content_profile(&hash)
+ .get_content(hash)
.map_err(|e| {
let ignore_err = is_match!(e.downcast_ref(), Some(&ipfs_api::response::Error::Api(..)));
@@ -542,7 +571,7 @@ fn main() {
.map(|content| {
match content.payload() {
Payload::Profile { more, .. } => {
- match serde_json_to_string_pretty(more) {
+ match serde_json_to_string_pretty(&more) {
Err(e) => {
error!("Error building JSON: {:?}", e);
exit(1)
@@ -613,7 +642,7 @@ fn main() {
hyper::rt::run({
repo
- .put_content(&content)
+ .put_content(content)
.map_err(|e| {
error!("Error running: {:?}", e);
print_error_details(e);
@@ -661,7 +690,7 @@ fn main() {
hyper::rt::run({
repo
- .put_content(&content)
+ .put_content(content)
.map_err(|e| {
error!("Error running: {:?}", e);
print_error_details(e);
@@ -715,7 +744,7 @@ fn main() {
hyper::rt::run({
repo
- .put_content(&content)
+ .put_content(content)
.map_err(|e| {
error!("Error running: {:?}", e);
print_error_details(e);
@@ -749,7 +778,7 @@ fn main() {
hyper::rt::run({
repo
- .put_block(&block)
+ .put_block(block)
.map_err(|e| {
error!("Error running: {:?}", e);
print_error_details(e);
@@ -759,146 +788,8 @@ fn main() {
});
}
- ("create-profile", Some(mtch)) => {
- debug!("Calling: create-profile");
- let (_config, repo) = boot();
-
- let name = mtch.value_of("name").map(String::from).unwrap(); // safe by clap
- let keyname = format!("distrox-{}", name);
- let timestamp = Timestamp::from(::chrono::offset::Local::now().naive_local());
- let payload = Payload::Profile {
- names: vec![name],
- picture: None,
- more: BTreeMap::new(),
- };
- let profile = Content::new(vec![], Some(timestamp), payload);
-
- hyper::rt::run({
- // use ipfs defaults for lifetime and ttl
- repo.new_profile(keyname, profile, None, None)
- .map_err(|e| {
- error!("Error running: {:?}", e);
- print_error_details(e);
- exit(1)
- })
- .map(|(profile_name, profile_key)| {
- println!("{}, {}", profile_name.deref(), profile_key.deref());
- })
- });
- },
-
- ("post", Some(mtch)) => {
- use crate::repository::ProfileName;
-
- debug!("Calling: post");
- let (_config, repo) = boot();
- let repo = Arc::new(repo);
- let publish_key_name = mtch
- .value_of("profile-name")
- .map(String::from)
- .map(ProfileName::from)
- .unwrap(); // safe by clap
-
- let parent_block_hashes = mtch
- .values_of("parents")
- .unwrap() // safe by clap
- .map(String::from)
- .map(IPFSHash::from)
- .collect::<Vec<_>>();
-
- let text = mtch
- .value_of("text")
- .map(String::from)
- .unwrap(); // safe by clap
-
- let time = ::chrono::offset::Local::now().naive_local();
-
- let _repo2 = repo.clone();
-
- hyper::rt::run({
- repo.clone()
- .get_key_id_from_key_name(publish_key_name.clone())
- .and_then(move |key_id| {
- repo.new_text_post(key_id, parent_block_hashes, text, Some(time))
- })
- .map(|hash| {
- println!("{}", hash);
- })
- .map_err(|e| {
- error!("Error running: {:?}", e);
- print_error_details(e);
- exit(1)
- })
- });
- },
-
- ("publish", Some(mtch)) => {
- use crate::repository::ProfileName;
-
- debug!("Calling: publish");
- let (_config, repo) = boot();
- let repo = Arc::new(repo);
- let publish_key_name = mtch
- .value_of("profile_name")
- .map(String::from)
- .map(ProfileName::from)
- .unwrap(); // safe by clap
- let blockhash = mtch
- .value_of("blockhash")
- .map(String::from)
- .map(IPFSHash::from)
- .unwrap(); // safe by clap
-
- let _repo2 = repo.clone();
-
- hyper::rt::run({
- repo.clone()
- .get_key_id_from_key_name(publish_key_name)
- .and_then(move |publish_key_id| {
- repo.announce_block(publish_key_id, &blockhash, None, None)
- })
- .map_err(|e| {
- error!("Error running: {:?}", e);
- print_error_details(e);
- exit(1)
- })
- });
- },
-
- ("get-profile-state", Some(mtch)) => {
- use crate::repository::ProfileName;
-
- debug!("Calling: get-profile-state");
- let (_config, repo) = boot();
- let repo = Arc::new(repo);
- let publish_key_name = mtch
- .value_of("profile_name")
- .map(String::from)
- .map(ProfileName::from)
- .unwrap(); // safe by clap
-
- let repo2 = repo.clone();
-
- hyper::rt::run({
- repo.clone()
- .get_key_id_from_key_name(publish_key_name.clone())
- .and_then(move |key_id| {
- let key_id = key_id.into();
- repo2.deref_ipns_hash(&key_id)
- })
- .map(|hash| {
- println!("{}", hash);
- })
- .map_err(|e| {
- error!("Error running: {:?}", e);
- print_error_details(e);
- exit(1)
- })
- });
- },
-
(other, _mtch) => {
- error!("Unknown command: {}", other);
+ error!("Unknown or unimplemented command: {}", other);
exit(1)
}
}
diff --git a/src/repository/client.rs b/src/repository/client.rs
index db15d91..08ec15b 100644
--- a/src/repository/client.rs
+++ b/src/repository/client.rs
@@ -65,6 +65,12 @@ impl TypedClientFassade {
ClientFassade::new(host, port).map(TypedClientFassade)
}
+ pub fn get_raw_bytes<H>(&self, hash: H) -> impl Future<Item = Vec<u8>, Error = Error>
+ where H: AsRef<IPFSHash>
+ {
+ self.0.get(hash)
+ }
+
pub fn get<H, D>(&self, hash: H) -> impl Future<Item = D, Error = Error>
where H: AsRef<IPFSHash>,
D: DeserializeOwned
diff --git a/src/repository/iter/block.rs b/src/repository/iter/block.rs
deleted file mode 100644
index fb7e609..0000000
--- a/src/repository/iter/block.rs
+++ /dev/null
@@ -1,52 +0,0 @@
-use queues::Queue;
-use failure::Error;
-use futures::Future;
-
-use crate::repository::Repository;
-use crate::types::block::Block;
-
-/// An iterator that iterates over `Block`s using a `Repository`
-///
-pub struct BlockIterator {
- repo: Repository,
- queue: Queue<impl Future<Block, Error>>,
-}
-
-impl BlockIterator {
- pub fn new(repo: Repository, initial: IPFSHash) -> Self {
- Repository {
- queue: {
- let q = Queue::default();
- q.add(repo.get_block(initial));
- q
- },
- repo
- }
- }
-}
-
-impl Iterator for BlockIterator {
- type Item = Result<Block, Error>;
-
- fn next(&mut self) -> Option<Self::Item> {
- if let Ok(next_block) = self.queue.remove() {
- match next_block.wait() {
- Some(block) => {
- block.parents().iter().for_each(|parent| {
- self.queue.add({
- self.repo.get_block(parent)
- });
- })
-
- Some(block)
- },
-
- Err(e) => return Some(Err(e)),
- }
-
-
- } else {
- None
- }
- }
-}
diff --git a/src/repository/iter/mod.rs b/src/repository/iter/mod.rs
deleted file mode 100644
index a863eaa..0000000
--- a/src/repository/iter/mod.rs
+++ /dev/null
@@ -1 +0,0 @@
-pub mod block;
diff --git a/src/repository/mod.rs b/src/repository/mod.rs
index aee6a01..f837ecc 100644
--- a/src/repository/mod.rs
+++ b/src/repository/mod.rs
@@ -1,7 +1,6 @@
mod client;
mod repository;
mod profile;
-mod iter;
pub use repository::Repository;
pub use profile::ProfileName;
diff --git a/src/repository/profile.rs b/src/repository/profile.rs
index 991a0bf..8d9a12b 100644
--- a/src/repository/profile.rs
+++ b/src/repository/profile.rs
@@ -1,3 +1,13 @@
+use failure::Error;
+use futures::stream::{self, Stream};
+use futures::future;
+use futures::Future;
+
+use crate::types::util::IPNSHash;
+use crate::types::util::IPFSHash;
+use crate::types::block::Block;
+use crate::repository::Repository;
+
#[derive(Clone, Debug, Hash, PartialOrd, Ord, PartialEq, Eq)]
pub struct ProfileName(String);
@@ -10,9 +20,9 @@ impl From<String> for ProfileName {
/// A profile
///
/// A profile can be _any_ profile, not only the profile of the user
-#[derive(Debug)]
pub struct Profile {
repository: Repository,
+ head: IPFSHash,
}
impl Profile {
@@ -26,13 +36,24 @@ impl Profile {
}
/// Load a profile from the repository
- pub fn load(repository: Repository, key: Key) -> Result<Self, Error> {
+ pub fn load(repository: Repository, key: IPNSHash) -> Result<Self, Error> {
unimplemented!()
}
- pub fn blocks(&self) -> impl Iterator<Item = Result<Block, Error>> {
- use crate::repository::iter::block::BlockIterator;
- BlockIterator::new(&self.repository)
+ pub fn blocks(&self) -> impl Stream<Item = Block, Error = Error> {
+ let repo = self.repository.clone();
+ stream::unfold(vec![self.head.clone()], move |mut state| {
+ let repo = repo.clone();
+ state.pop()
+ .map(move |hash| {
+ repo.get_block(hash).map(|block| {
+ block.parents().iter().for_each(|parent| {
+ state.push(parent.clone())
+ });
+ (block, state)
+ })
+ })
+ })
}
}
@@ -42,7 +63,6 @@ impl Profile {
/// Internally this wraps the `Profile` type, but it provides more functionality, for example
/// posting new content.
///
-#[derive(Debug)]
pub struct UserProfile {
profile: Profile
}
@@ -60,7 +80,7 @@ impl UserProfile {
}
/// Load a profile from the repository
- pub fn load(repository: Repository, key: Key) -> Result<Self, Error> {
+ pub fn load(repository: Repository, key: IPNSHash) -> Result<Self, Error> {
Ok(UserProfile {
profile: Profile::load(repository, key)?,
})
diff --git a/src/repository/repository.rs b/src/repository/repository.rs
index 5ed1244..7386ba4 100644
--- a/src/repository/repository.rs
+++ b/src/repository/repository.rs
@@ -36,6 +36,12 @@ impl Repository {
TypedClientFassade::new(host, port).map(Repository)
}
+ pub fn get_raw_bytes<H>(&self, hash: H) -> impl Future<Item = Vec<u8>, Error = Error>
+ where H: AsRef<IPFSHash>
+ {
+ self.0.get_raw_bytes(hash)
+ }
+
pub fn get_block<H>(&self, hash: H) -> impl Future<Item = Block, Error = Error>
where H: AsRef<IPFSHash>
{