diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2019-06-15 18:19:26 +0200 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2019-06-15 18:22:05 +0200 |
commit | d31967d74840877b1233350a09ffc427e16d9cfa (patch) | |
tree | 1d72646d506807cf1e007cc9d6a9ed0b21da64c3 /src/repository/client.rs | |
parent | ec57a3cc0ee58383838f8957f19d9cab4d375f12 (diff) |
Rewrite repository
Diffstat (limited to 'src/repository/client.rs')
-rw-r--r-- | src/repository/client.rs | 354 |
1 files changed, 71 insertions, 283 deletions
diff --git a/src/repository/client.rs b/src/repository/client.rs index 005a33c..db15d91 100644 --- a/src/repository/client.rs +++ b/src/repository/client.rs @@ -3,303 +3,91 @@ use std::sync::Arc; use std::ops::Deref; use ipfs_api::IpfsClient; -use ipfs_api::KeyType; use failure::Error; use failure::err_msg; use futures::future::Future; use futures::stream::Stream; + use serde_json::from_str as serde_json_from_str; use serde_json::to_string as serde_json_to_str; +use serde::Serialize; +use serde::de::DeserializeOwned; use chrono::NaiveDateTime; -use itertools::Itertools; use crate::types::block::Block; use crate::types::content::Content; use crate::types::content::Payload; use crate::types::util::IPFSHash; use crate::types::util::IPNSHash; -use crate::types::util::Timestamp; -use crate::repository::{ProfileName, ProfileKey}; -use crate::version::protocol_version; - -pub fn deref_ipns_hash(client: Arc<IpfsClient>, - hash: &IPNSHash) - -> impl Future<Item = IPFSHash, Error = Error> -{ - client - .name_resolve(Some(hash.deref()), false, false) - .map_err(Error::from) - .map(|resp| IPFSHash::from(resp.path)) -} - -pub fn resolve_plain(client: Arc<IpfsClient>, hash: &IPFSHash) - -> impl Future<Item = Vec<u8>, Error = Error> -{ - client - .cat(hash) - .concat2() - .map_err(Error::from) - .map(|blob| blob.to_vec()) -} - -pub fn resolve_block(client: Arc<IpfsClient>, hash: &IPFSHash) - -> impl Future<Item = Block, Error = Error> -{ - client - .cat(hash) - .concat2() - .map_err(Error::from) - .and_then(|block| { - debug!("Got Block data, building Block object"); - String::from_utf8(block.to_vec()) - .map_err(Error::from) - .and_then(|s| serde_json_from_str(&s).map_err(Error::from)) - }) -} - -pub fn get_key_id_from_key_name(client: Arc<IpfsClient>, name: ProfileName) - -> impl Future<Item = ProfileKey, Error = Error> -{ - client.key_list() - .map_err(Error::from) - .and_then(move |list| { - list.keys - .into_iter() - .filter(|pair| pair.name == *name.deref()) - .next() - .map(|pair| ProfileKey::from(pair.id)) - .ok_or_else(|| err_msg("No Key")) - }) -} -pub fn resolve_latest_block(client: Arc<IpfsClient>, hash: &IPNSHash) - -> impl Future<Item = Block, Error = Error> -{ - deref_ipns_hash(client.clone(), hash) - .map_err(Error::from) - .and_then(|ipfs_hash| resolve_block(client, &ipfs_hash)) -} - -pub fn resolve_content(client: Arc<IpfsClient>, hash: &IPFSHash) - -> impl Future<Item = Content, Error = Error> -{ - client - .cat(hash) - .concat2() - .map_err(Error::from) - .and_then(|content| { - debug!("Got Content data, building Content object"); +/// Internal ClientFassade types +/// +/// Abstracts the procedural interface of IpfsClient calls. +#[derive(Clone)] +struct ClientFassade(Arc<IpfsClient>); + +impl ClientFassade { + fn new(host: &str, port: u16) -> Result<ClientFassade, Error> { + debug!("Creating new ClientFassade object: {}:{}", host, port); + IpfsClient::new(host, port) + .map(Arc::new) + .map(|c| ClientFassade(c)) + .map_err(Into::into) + } + + fn get<H: AsRef<IPFSHash>>(&self, hash: H) -> impl Future<Item = Vec<u8>, Error = Error> { + debug!("Get: {}", hash.as_ref()); + self.0 + .clone() + .cat(hash.as_ref()) + .concat2() + .map_err(Error::from) + .map(|blob| blob.to_vec()) + } + + fn put(&self, data: Vec<u8>) -> impl Future<Item = IPFSHash, Error = Error> { + debug!("Put: {:?}", data); + self.0 + .clone() + .add(Cursor::new(data)) + .map(|res| IPFSHash::from(res.hash)) + .map_err(Into::into) + } +} + +/// Client wrapper for working with types directly on the client +#[derive(Clone)] +pub struct TypedClientFassade(ClientFassade); + +impl TypedClientFassade { + pub fn new(host: &str, port: u16) -> Result<TypedClientFassade, Error> { + ClientFassade::new(host, port).map(TypedClientFassade) + } + + pub fn get<H, D>(&self, hash: H) -> impl Future<Item = D, Error = Error> + where H: AsRef<IPFSHash>, + D: DeserializeOwned + { + self.0 + .clone() + .get(hash) + .and_then(|data| { + debug!("Got data, building object: {:?}", data); + + serde_json::from_slice(&data).map_err(Error::from) + }) + } + + pub fn put<S, Ser>(&self, data: &S) -> impl Future<Item = IPFSHash, Error = Error> + where S: AsRef<Ser>, + Ser: Serialize + { + let client = self.0.clone(); + + ::futures::future::result(serde_json_to_str(data.as_ref())) + .map_err(Into::into) + .and_then(move |d| client.put(d.into_bytes())) + } - String::from_utf8(content.to_vec()) - .map_err(Error::from) - .and_then(|s| serde_json_from_str(&s).map_err(Error::from)) - }) } - -pub fn resolve_content_none(client: Arc<IpfsClient>, hash: &IPFSHash) - -> impl Future<Item = Content, Error = Error> -{ - resolve_content(client, hash).and_then(|content| { - debug!("Got Content object, checking whether it is None"); - match content.payload() { - &Payload::None => Ok(content), - _ => Err(err_msg("Content is not None")), - } - }) -} - -pub fn resolve_content_post(client: Arc<IpfsClient>, hash: &IPFSHash) - -> impl Future<Item = Content, Error = Error> -{ - resolve_content(client, hash) - .and_then(|content| { - debug!("Got Content object, checking whether it is Post"); - match content.payload() { - &Payload::Post {..} => Ok(content), - _ => Err(err_msg("Content is not a Post")), - } - }) -} - -pub fn resolve_content_attached_post_comments(client: Arc<IpfsClient>, hash: &IPFSHash) - -> impl Future<Item = Content, Error = Error> -{ - resolve_content(client, hash) - .and_then(|content| { - debug!("Got Content object, checking whether it is AttachedPostComments"); - match content.payload() { - &Payload::AttachedPostComments {..} => Ok(content), - _ => Err(err_msg("Content is not AttachedPostComments")), - } - }) -} - -pub fn resolve_content_profile(client: Arc<IpfsClient>, hash: &IPFSHash) - -> impl Future<Item = Content, Error = Error> -{ - resolve_content(client, hash) - .and_then(|content| { - debug!("Got Content object, checking whether it is Profile"); - match content.payload() { - &Payload::Profile {..} => Ok(content), - _ => Err(err_msg("Content is not a Profile")), - } - }) -} - -pub fn announce_block(client: Arc<IpfsClient>, - key: ProfileKey, - state: &IPFSHash, - lifetime: Option<String>, - ttl: Option<String>) - -> impl Future<Item = (), Error = Error> -{ - let name = format!("/ipfs/{}", state); - - resolve_block(client.clone(), state) - .and_then(move |_| { - debug!("Publishing block."); - client.name_publish(&name, - false, - lifetime.as_ref().map(String::deref), - ttl.as_ref().map(String::deref), - Some(&key)) - .map_err(From::from) - .map(|_| ()) - }) -} - - -pub fn put_plain(client: Arc<IpfsClient>, data: Vec<u8>) - -> impl Future<Item = IPFSHash, Error = Error> -{ - client - .add(Cursor::new(data)) - .map(|res| IPFSHash::from(res.hash)) - .map_err(Into::into) -} - -pub fn put_block(client: Arc<IpfsClient>, block: &Block) - -> impl Future<Item = IPFSHash, Error = Error> -{ - let data = serde_json_to_str(block); - - ::futures::future::result(data) - .map_err(Into::into) - .and_then(move |data| put_plain(client, data.into_bytes())) -} - -pub fn put_content(client: Arc<IpfsClient>, content: &Content) - -> impl Future<Item = IPFSHash, Error = Error> -{ - let data = serde_json_to_str(content); - ::futures::future::result(data) - .map_err(Into::into) - .and_then(move |data| put_plain(client, data.into_bytes())) -} - -pub fn new_profile(client: Arc<IpfsClient>, - keyname: String, - profile: Content, - lifetime: Option<String>, - ttl: Option<String>) - -> impl Future<Item = (ProfileName, ProfileKey), Error = Error> -{ - let client1 = client.clone(); - let client2 = client.clone(); - let client3 = client.clone(); - - client - .key_gen(&keyname, KeyType::Rsa, 4096) - .map_err(Error::from) - .map(|kp| (kp.name, kp.id)) - .and_then(move |(key_name, key_id)| { // put the content into IPFS - let mut prof = profile; - prof.push_device(IPNSHash::from(key_id.clone())); - - put_content(client1, &prof) - .map(move |content_hash| (content_hash, key_name, key_id)) - .map_err(Error::from) - }) - .map(|(content_hash, key_name, key_id)| { - let block = Block::new(protocol_version(), - vec![], // no parents for new profile - content_hash); - - (block, key_name, key_id) - }) - .and_then(move |(block, key_name, key_id)| { // put the content into a new block - put_block(client2, &block) - .map(|block_hash| (block_hash, key_name, key_id)) - .map_err(Error::from) - }) - .map(|(block_hash, key_name, key_id)| { - (format!("/ipfs/{}", block_hash), key_name, key_id) - }) - .and_then(move |(path, key_name, key_id)| { - client3 - .name_publish(&path, - false, - lifetime.as_ref().map(String::deref), - ttl.as_ref().map(String::deref), - Some(&key_name)) - .map(|_publish_response| { - (ProfileName(key_name), ProfileKey::from(key_id)) - }) - .map_err(Error::from) - }) -} - -pub fn new_text_post(client: Arc<IpfsClient>, - _publish_key_id: ProfileKey, - parent_blocks: Vec<IPFSHash>, - text: String, - time: Option<NaiveDateTime>) - -> impl Future<Item = IPFSHash, Error = Error> -{ - let client3 = client.clone(); - let client4 = client.clone(); - let client5 = client.clone(); - - let iterator = parent_blocks - .clone() - .into_iter() - .map(move |parent_block| { - let client1 = client.clone(); - let client2 = client.clone(); - resolve_block(client1, &parent_block) - .and_then(move |block| { - resolve_content(client2, block.content()) - }) - .map(|content| content.devices().to_vec()) - }); - - ::futures::future::join_all(iterator) - .and_then(move |devices| { - let devices = Iterator::flatten(devices.into_iter()).unique().collect(); - - put_plain(client3, text.into_bytes()) - .and_then(move |content_hash| { - let post = Payload::Post { - content_format: ::mime::TEXT_PLAIN.into(), - content: content_hash, - reply_to: None, - - comments_will_be_propagated: None, - comments_propagated_until: None, - }; - - let ts = time.map(Timestamp::from); - let content_obj = Content::new(devices, ts, post); - - put_content(client4, &content_obj) - }) - }) - .and_then(move |content_obj_hash| { - let block = Block::new(protocol_version(), parent_blocks, content_obj_hash); - put_block(client5, &block) - }) -} - - |