diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2019-06-15 18:19:26 +0200 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2019-06-15 18:22:05 +0200 |
commit | d31967d74840877b1233350a09ffc427e16d9cfa (patch) | |
tree | 1d72646d506807cf1e007cc9d6a9ed0b21da64c3 | |
parent | ec57a3cc0ee58383838f8957f19d9cab4d375f12 (diff) |
Rewrite repository
-rw-r--r-- | src/repository/client.rs | 354 | ||||
-rw-r--r-- | src/repository/iter.rs | 41 | ||||
-rw-r--r-- | src/repository/mod.rs | 267 | ||||
-rw-r--r-- | src/repository/profile.rs | 65 | ||||
-rw-r--r-- | src/repository/repository.rs | 33 |
5 files changed, 106 insertions, 654 deletions
diff --git a/src/repository/client.rs b/src/repository/client.rs index 005a33c..db15d91 100644 --- a/src/repository/client.rs +++ b/src/repository/client.rs @@ -3,303 +3,91 @@ use std::sync::Arc; use std::ops::Deref; use ipfs_api::IpfsClient; -use ipfs_api::KeyType; use failure::Error; use failure::err_msg; use futures::future::Future; use futures::stream::Stream; + use serde_json::from_str as serde_json_from_str; use serde_json::to_string as serde_json_to_str; +use serde::Serialize; +use serde::de::DeserializeOwned; use chrono::NaiveDateTime; -use itertools::Itertools; use crate::types::block::Block; use crate::types::content::Content; use crate::types::content::Payload; use crate::types::util::IPFSHash; use crate::types::util::IPNSHash; -use crate::types::util::Timestamp; -use crate::repository::{ProfileName, ProfileKey}; -use crate::version::protocol_version; - -pub fn deref_ipns_hash(client: Arc<IpfsClient>, - hash: &IPNSHash) - -> impl Future<Item = IPFSHash, Error = Error> -{ - client - .name_resolve(Some(hash.deref()), false, false) - .map_err(Error::from) - .map(|resp| IPFSHash::from(resp.path)) -} - -pub fn resolve_plain(client: Arc<IpfsClient>, hash: &IPFSHash) - -> impl Future<Item = Vec<u8>, Error = Error> -{ - client - .cat(hash) - .concat2() - .map_err(Error::from) - .map(|blob| blob.to_vec()) -} - -pub fn resolve_block(client: Arc<IpfsClient>, hash: &IPFSHash) - -> impl Future<Item = Block, Error = Error> -{ - client - .cat(hash) - .concat2() - .map_err(Error::from) - .and_then(|block| { - debug!("Got Block data, building Block object"); - String::from_utf8(block.to_vec()) - .map_err(Error::from) - .and_then(|s| serde_json_from_str(&s).map_err(Error::from)) - }) -} - -pub fn get_key_id_from_key_name(client: Arc<IpfsClient>, name: ProfileName) - -> impl Future<Item = ProfileKey, Error = Error> -{ - client.key_list() - .map_err(Error::from) - .and_then(move |list| { - list.keys - .into_iter() - .filter(|pair| pair.name == *name.deref()) - .next() - .map(|pair| ProfileKey::from(pair.id)) - .ok_or_else(|| err_msg("No Key")) - }) -} -pub fn resolve_latest_block(client: Arc<IpfsClient>, hash: &IPNSHash) - -> impl Future<Item = Block, Error = Error> -{ - deref_ipns_hash(client.clone(), hash) - .map_err(Error::from) - .and_then(|ipfs_hash| resolve_block(client, &ipfs_hash)) -} - -pub fn resolve_content(client: Arc<IpfsClient>, hash: &IPFSHash) - -> impl Future<Item = Content, Error = Error> -{ - client - .cat(hash) - .concat2() - .map_err(Error::from) - .and_then(|content| { - debug!("Got Content data, building Content object"); +/// Internal ClientFassade types +/// +/// Abstracts the procedural interface of IpfsClient calls. +#[derive(Clone)] +struct ClientFassade(Arc<IpfsClient>); + +impl ClientFassade { + fn new(host: &str, port: u16) -> Result<ClientFassade, Error> { + debug!("Creating new ClientFassade object: {}:{}", host, port); + IpfsClient::new(host, port) + .map(Arc::new) + .map(|c| ClientFassade(c)) + .map_err(Into::into) + } + + fn get<H: AsRef<IPFSHash>>(&self, hash: H) -> impl Future<Item = Vec<u8>, Error = Error> { + debug!("Get: {}", hash.as_ref()); + self.0 + .clone() + .cat(hash.as_ref()) + .concat2() + .map_err(Error::from) + .map(|blob| blob.to_vec()) + } + + fn put(&self, data: Vec<u8>) -> impl Future<Item = IPFSHash, Error = Error> { + debug!("Put: {:?}", data); + self.0 + .clone() + .add(Cursor::new(data)) + .map(|res| IPFSHash::from(res.hash)) + .map_err(Into::into) + } +} + +/// Client wrapper for working with types directly on the client +#[derive(Clone)] +pub struct TypedClientFassade(ClientFassade); + +impl TypedClientFassade { + pub fn new(host: &str, port: u16) -> Result<TypedClientFassade, Error> { + ClientFassade::new(host, port).map(TypedClientFassade) + } + + pub fn get<H, D>(&self, hash: H) -> impl Future<Item = D, Error = Error> + where H: AsRef<IPFSHash>, + D: DeserializeOwned + { + self.0 + .clone() + .get(hash) + .and_then(|data| { + debug!("Got data, building object: {:?}", data); + + serde_json::from_slice(&data).map_err(Error::from) + }) + } + + pub fn put<S, Ser>(&self, data: &S) -> impl Future<Item = IPFSHash, Error = Error> + where S: AsRef<Ser>, + Ser: Serialize + { + let client = self.0.clone(); + + ::futures::future::result(serde_json_to_str(data.as_ref())) + .map_err(Into::into) + .and_then(move |d| client.put(d.into_bytes())) + } - String::from_utf8(content.to_vec()) - .map_err(Error::from) - .and_then(|s| serde_json_from_str(&s).map_err(Error::from)) - }) } - -pub fn resolve_content_none(client: Arc<IpfsClient>, hash: &IPFSHash) - -> impl Future<Item = Content, Error = Error> -{ - resolve_content(client, hash).and_then(|content| { - debug!("Got Content object, checking whether it is None"); - match content.payload() { - &Payload::None => Ok(content), - _ => Err(err_msg("Content is not None")), - } - }) -} - -pub fn resolve_content_post(client: Arc<IpfsClient>, hash: &IPFSHash) - -> impl Future<Item = Content, Error = Error> -{ - resolve_content(client, hash) - .and_then(|content| { - debug!("Got Content object, checking whether it is Post"); - match content.payload() { - &Payload::Post {..} => Ok(content), - _ => Err(err_msg("Content is not a Post")), - } - }) -} - -pub fn resolve_content_attached_post_comments(client: Arc<IpfsClient>, hash: &IPFSHash) - -> impl Future<Item = Content, Error = Error> -{ - resolve_content(client, hash) - .and_then(|content| { - debug!("Got Content object, checking whether it is AttachedPostComments"); - match content.payload() { - &Payload::AttachedPostComments {..} => Ok(content), - _ => Err(err_msg("Content is not AttachedPostComments")), - } - }) -} - -pub fn resolve_content_profile(client: Arc<IpfsClient>, hash: &IPFSHash) - -> impl Future<Item = Content, Error = Error> -{ - resolve_content(client, hash) - .and_then(|content| { - debug!("Got Content object, checking whether it is Profile"); - match content.payload() { - &Payload::Profile {..} => Ok(content), - _ => Err(err_msg("Content is not a Profile")), - } - }) -} - -pub fn announce_block(client: Arc<IpfsClient>, - key: ProfileKey, - state: &IPFSHash, - lifetime: Option<String>, - ttl: Option<String>) - -> impl Future<Item = (), Error = Error> -{ - let name = format!("/ipfs/{}", state); - - resolve_block(client.clone(), state) - .and_then(move |_| { - debug!("Publishing block."); - client.name_publish(&name, - false, - lifetime.as_ref().map(String::deref), - ttl.as_ref().map(String::deref), - Some(&key)) - .map_err(From::from) - .map(|_| ()) - }) -} - - -pub fn put_plain(client: Arc<IpfsClient>, data: Vec<u8>) - -> impl Future<Item = IPFSHash, Error = Error> -{ - client - .add(Cursor::new(data)) - .map(|res| IPFSHash::from(res.hash)) - .map_err(Into::into) -} - -pub fn put_block(client: Arc<IpfsClient>, block: &Block) - -> impl Future<Item = IPFSHash, Error = Error> -{ - let data = serde_json_to_str(block); - - ::futures::future::result(data) - .map_err(Into::into) - .and_then(move |data| put_plain(client, data.into_bytes())) -} - -pub fn put_content(client: Arc<IpfsClient>, content: &Content) - -> impl Future<Item = IPFSHash, Error = Error> -{ - let data = serde_json_to_str(content); - ::futures::future::result(data) - .map_err(Into::into) - .and_then(move |data| put_plain(client, data.into_bytes())) -} - -pub fn new_profile(client: Arc<IpfsClient>, - keyname: String, - profile: Content, - lifetime: Option<String>, - ttl: Option<String>) - -> impl Future<Item = (ProfileName, ProfileKey), Error = Error> -{ - let client1 = client.clone(); - let client2 = client.clone(); - let client3 = client.clone(); - - client - .key_gen(&keyname, KeyType::Rsa, 4096) - .map_err(Error::from) - .map(|kp| (kp.name, kp.id)) - .and_then(move |(key_name, key_id)| { // put the content into IPFS - let mut prof = profile; - prof.push_device(IPNSHash::from(key_id.clone())); - - put_content(client1, &prof) - .map(move |content_hash| (content_hash, key_name, key_id)) - .map_err(Error::from) - }) - .map(|(content_hash, key_name, key_id)| { - let block = Block::new(protocol_version(), - vec![], // no parents for new profile - content_hash); - - (block, key_name, key_id) - }) - .and_then(move |(block, key_name, key_id)| { // put the content into a new block - put_block(client2, &block) - .map(|block_hash| (block_hash, key_name, key_id)) - .map_err(Error::from) - }) - .map(|(block_hash, key_name, key_id)| { - (format!("/ipfs/{}", block_hash), key_name, key_id) - }) - .and_then(move |(path, key_name, key_id)| { - client3 - .name_publish(&path, - false, - lifetime.as_ref().map(String::deref), - ttl.as_ref().map(String::deref), - Some(&key_name)) - .map(|_publish_response| { - (ProfileName(key_name), ProfileKey::from(key_id)) - }) - .map_err(Error::from) - }) -} - -pub fn new_text_post(client: Arc<IpfsClient>, - _publish_key_id: ProfileKey, - parent_blocks: Vec<IPFSHash>, - text: String, - time: Option<NaiveDateTime>) - -> impl Future<Item = IPFSHash, Error = Error> -{ - let client3 = client.clone(); - let client4 = client.clone(); - let client5 = client.clone(); - - let iterator = parent_blocks - .clone() - .into_iter() - .map(move |parent_block| { - let client1 = client.clone(); - let client2 = client.clone(); - resolve_block(client1, &parent_block) - .and_then(move |block| { - resolve_content(client2, block.content()) - }) - .map(|content| content.devices().to_vec()) - }); - - ::futures::future::join_all(iterator) - .and_then(move |devices| { - let devices = Iterator::flatten(devices.into_iter()).unique().collect(); - - put_plain(client3, text.into_bytes()) - .and_then(move |content_hash| { - let post = Payload::Post { - content_format: ::mime::TEXT_PLAIN.into(), - content: content_hash, - reply_to: None, - - comments_will_be_propagated: None, - comments_propagated_until: None, - }; - - let ts = time.map(Timestamp::from); - let content_obj = Content::new(devices, ts, post); - - put_content(client4, &content_obj) - }) - }) - .and_then(move |content_obj_hash| { - let block = Block::new(protocol_version(), parent_blocks, content_obj_hash); - put_block(client5, &block) - }) -} - - diff --git a/src/repository/iter.rs b/src/repository/iter.rs deleted file mode 100644 index 0606769..0000000 --- a/src/repository/iter.rs +++ /dev/null @@ -1,41 +0,0 @@ -use std::collcetions::VecDeque; - -use failure::Error; - -use types::block::Block; -use types::util::IPFSHash; -use repository::Repository; - -/// An iterator for iterating over a chain of blocks -/// -pub struct BlockIter<'a> { - repository: &'a Repository, - queue: VecDeque<Future<Item = IPFSHash, Error = Error>>, -} - -impl<'a> BlockIter<'a> { - pub fn new(repository: &'a Repository, head: IPFSHash) -> BlockIter<'a> { - let mut queue = VecDeque::new(); - queue.push_back(head); - BlockIter { repository, queue } - } -} - -impl<'a> Iterator for BlockIter<'a> { - type Item = Future<Item = Block, Error = Error>; - - fn next(&mut self) -> Option<Self::Item> { - while let Some(next) = self.queue.pop_front() { - self.repository - .resolve_block(&next) - .then(|block| { - self.queue.extend(block.parents().iter().cloned()); - block - }) - } - - None - } - -} - diff --git a/src/repository/mod.rs b/src/repository/mod.rs index b1fc2f3..d2e5dd3 100644 --- a/src/repository/mod.rs +++ b/src/repository/mod.rs @@ -1,268 +1,5 @@ -//! TODO: Finalize, this is only an idea -//! - -// pub mod iter; // TODO: Implement. Complicated stuff though! -pub mod profile; - -use std::io::Cursor; -use std::sync::Arc; -use std::ops::Deref; - -use ipfs_api::IpfsClient; -use failure::Error; -use failure::err_msg; -use futures::future::Future; -use futures::stream::Stream; - -use serde_json::to_string as serde_json_to_str; -use serde::Serialize; -use chrono::NaiveDateTime; - -use crate::types::block::Block; -use crate::types::content::Content; -use crate::types::content::Payload; -use crate::types::util::IPFSHash; -use crate::types::util::IPNSHash; - - -// use repository::iter::BlockIter; -// use repository::profile::Profile; - mod client; +mod repository; -pub struct Repository { - client: Arc<IpfsClient>, -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct ProfileName(String); - -impl Deref for ProfileName { - type Target = String; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl From<String> for ProfileName { - fn from(s: String) -> Self { - ProfileName(s) - } -} - -pub type ProfileKey = IPNSHash; - - -impl Repository { - - pub fn new(host: &str, port: u16) -> Result<Repository, Error> { - debug!("Creating new Repository object: {}:{}", host, port); - IpfsClient::new(host, port) - .map(|c| Repository { client: Arc::new(c) }) - .map_err(Into::into) - } - - // pub fn get_profile<H: AsRef<IPNSHash>>(&self, hash: H, chain_depth: usize) - // -> Result<Profile, Error> - // { - // BlockIter::new(self, (*hash.as_ref()).clone()) - // .take(chain_depth) - // .collect::<Result<LinkedList<Block, Error>>>() - // .map(Profile::new) - // } - - pub fn resolve_plain(&self, hash: &IPFSHash) - -> impl Future<Item = Vec<u8>, Error = Error> - { - debug!("Resolving plain: {}", hash); - crate::repository::client::resolve_plain(self.client.clone(), hash) - } - - /// Gets a types::Block from a hash or fails - pub fn resolve_block(&self, hash: &IPFSHash) - -> impl Future<Item = Block, Error = Error> - { - debug!("Resolving block: {}", hash); - crate::repository::client::resolve_block(self.client.clone(), hash) - } - - pub fn resolve_latest_block(&self, hash: &IPNSHash) - -> impl Future<Item = Block, Error = Error> - { - debug!("Resolving latest block: {}", hash); - crate::repository::client::resolve_latest_block(self.client.clone(), hash) - } - - /// Gets a types::Content from a hash or fails - pub fn resolve_content(&self, hash: &IPFSHash) - -> impl Future<Item = Content, Error = Error> - { - debug!("Resolving content: {}", hash); - crate::repository::client::resolve_content(self.client.clone(), hash) - } - - /// Helper over Self::resolve_content() which ensures that the Content payload is None - #[inline] - pub fn resolve_content_none(&self, hash: &IPFSHash) - -> impl Future<Item = Content, Error = Error> - { - debug!("Resolving content (none): {}", hash); - crate::repository::client::resolve_content_none(self.client.clone(), hash) - } - - /// Helper over Self::resolve_content() which ensures that the Content payload is Post - #[inline] - pub fn resolve_content_post(&self, hash: &IPFSHash) - -> impl Future<Item = Content, Error = Error> - { - debug!("Resolving content (post): {}", hash); - crate::repository::client::resolve_content_post(self.client.clone(), hash) - } - - /// Helper over Self::resolve_content() which ensures that the Content payload is AttachedPostComments - #[inline] - pub fn resolve_content_attached_post_comments(&self, hash: &IPFSHash) - -> impl Future<Item = Content, Error = Error> - { - debug!("Resolving content (attached post comments): {}", hash); - crate::repository::client::resolve_content_attached_post_comments(self.client.clone(), hash) - } - - /// Helper over Self::resolve_content() which ensures that the Content payload is Profile - #[inline] - pub fn resolve_content_profile(&self, hash: &IPFSHash) - -> impl Future<Item = Content, Error = Error> - { - debug!("Resolving content (profile): {}", hash); - crate::repository::client::resolve_content_profile(self.client.clone(), hash) - } - - - // - // PUT - // - - pub fn put_plain(&self, data: Vec<u8>) - -> impl Future<Item = IPFSHash, Error = Error> - { - debug!("Putting plain"); - crate::repository::client::put_plain(self.client.clone(), data) - } - - fn put_serialized<'a, S>(&'a self, s: &'a S) - -> impl Future<Item = IPFSHash, Error = Error> - where S: Serialize - { - debug!("Putting serializable object"); - let client = self.client.clone(); - let data = serde_json_to_str(&s); - - ::futures::future::result(data) - .map_err(Into::into) - .and_then(move |data| { - client - .add(Cursor::new(data)) - .map(|res| IPFSHash::from(res.hash)) - .map_err(Into::into) - }) - } - - pub fn put_block<'a>(&'a self, block: &'a Block) - -> impl Future<Item = IPFSHash, Error = Error> - { - debug!("Putting block: {:?}", block); - crate::repository::client::put_block(self.client.clone(), block) - } - - pub fn put_content<'a>(&'a self, content: &'a Content) - -> impl Future<Item = IPFSHash, Error = Error> - { - debug!("Putting content: {:?}", content); - crate::repository::client::put_content(self.client.clone(), content) - } - - /// The default lifetime for name publishing (profile announcements) - /// - /// 10 minutes - pub fn profile_announce_default_lifetime() -> &'static str { - "10m" - } - - /// The default TTL for name publishing (profile announcements) - /// - /// 10 minutes - pub fn profile_announce_default_ttl() -> &'static str { - "10m" - } - - /// Announce a block as current - /// - /// Block identified by IPFS hash. - /// - /// Lifetime and TTL are _not_ set to the default in the implementation of this function, but - /// the IPFS defaults apply (set by the IPFS daemon) - /// - pub fn announce_block<'a>(&'a self, - key: ProfileKey, - state: &IPFSHash, - lifetime: Option<String>, - ttl: Option<String>) - -> impl Future<Item = (), Error = Error> - { - debug!("Announcing profile: key: {key:?}, state: {state:?}, lifetime: {lifetime:?}, ttl: {ttl:?}", - key = key, state = state, lifetime = lifetime, ttl = ttl); - crate::repository::client::announce_block(self.client.clone(), key, state, lifetime, ttl) - } - - pub fn new_profile<'a>(&'a self, - keyname: String, - profile: Content, - lifetime: Option<String>, - ttl: Option<String>) - -> impl Future<Item = (ProfileName, ProfileKey), Error = Error> - { - - - debug!("Creating new profile: key: {key:?}, profile: {profile:?}, lifetime: {lifetime:?}, ttl: {ttl:?}", - key = keyname, profile = profile, lifetime = lifetime, ttl = ttl); - - if !is_match!(profile.payload(), Payload::Profile { .. }) { - let out = ::futures::future::err(err_msg(format!("Not a Profile: {:?}", profile))); - return ::futures::future::Either::B(out) - } - - let client = self.client.clone(); - let result = crate::repository::client::new_profile(client, keyname, profile, lifetime, ttl); - - ::futures::future::Either::A(result) - } - - pub fn new_text_post<'a>(&'a self, - publish_key_id: ProfileKey, - parent_blocks: Vec<IPFSHash>, - text: String, - time: Option<NaiveDateTime>) - -> impl Future<Item = IPFSHash, Error = Error> - { - debug!("New text post under {:?}, after blocks {:?}", publish_key_id, parent_blocks); - crate::repository::client::new_text_post(self.client.clone(), - publish_key_id, - parent_blocks, - text, - time) - } - - pub fn get_key_id_from_key_name<'a>(&'a self, name: ProfileName) - -> impl Future<Item = ProfileKey, Error = Error> - { - crate::repository::client::get_key_id_from_key_name(self.client.clone(), name) - } - - pub fn deref_ipns_hash<'a>(&'a self, hash: &IPNSHash) - -> impl Future<Item = IPFSHash, Error = Error> - { - crate::repository::client::deref_ipns_hash(self.client.clone(), hash) - } +pub use repository::Repository; -} diff --git a/src/repository/profile.rs b/src/repository/profile.rs deleted file mode 100644 index 9d27280..0000000 --- a/src/repository/profile.rs +++ /dev/null @@ -1,65 +0,0 @@ -use std::collections::LinkedList; - -//use futures::future::Future; -//use failure::Error; - -// use repository::Repository; -use crate::types::block::Block; -// use types::content::Payload; -// use types::content::Content; -// use types::util::IPFSHash; - -pub struct Profile { - chain: LinkedList<Block>, - - // Accumulated - - // not yet, we do not do in-memory caching in the first prototype -} - -impl Profile { - pub fn new(chain: LinkedList<Block>) -> Self { - Profile { chain } - } - - //pub fn find_current_profile_information(&self, repo: &Repository) - // -> Option<impl Future<Item = Content, Error = Error>> - //{ - // self.chain - // .iter() - // .map(|obj| repo.resolve_content_profile(obj.content())) - // .next() - //} - - //pub fn posts(&self) -> impl Iterator<Item = &Payload> + Sized { - // self.chain - // .iter() - // .map(Block::content) - // .map(Content::payload) - // .filter(|pl| is_match!(pl, Payload::Post(..))) - //} - - //pub fn comments_on_post<'a, H: AsRef<IPFSHash>>(&self, post: H, repo: &Repository) - // -> impl Iterator<Item = Future<Item = Content, Error = Error> + Sized> - //{ - // self.chain - // .iter() - // .map(|obj| repo.resolve_content_attached_post_comments(obj.content())) - // .filter_map(|cmts| { - // cmts.map(|c| { - // match c.payload() { - // &Payload::AttachedPostComments { - // ref comments_for, - // ref refs - // } => if comments_for == post.as_ref() { - // Some(refs) - // } else { - // None - // }, - // _ => None - // } - // }) - // }) - //} - -} diff --git a/src/repository/repository.rs b/src/repository/repository.rs new file mode 100644 index 0000000..7381176 --- /dev/null +++ b/src/repository/repository.rs @@ -0,0 +1,33 @@ +use std::io::Cursor; +use std::sync::Arc; +use std::ops::Deref; + +use ipfs_api::IpfsClient; +use failure::Error; +use failure::err_msg; +use futures::future::Future; +use futures::stream::Stream; + +use serde_json::from_str as serde_json_from_str; +use serde_json::to_string as serde_json_to_str; +use serde::Serialize; +use serde::de::DeserializeOwned; +use chrono::NaiveDateTime; + +use crate::types::block::Block; +use crate::types::content::Content; +use crate::types::content::Payload; +use crate::types::util::IPFSHash; +use crate::types::util::IPNSHash; + + +/// High-level Client abstraction +#[derive(Clone)] +pub struct Repository(TypedClientFassade); + +impl Repository { + pub fn new(host: &str, port: u16) -> Result<Repository, Error> { + TypedClientFassade::new(host, port).map(Repository) + } + +} |