summaryrefslogtreecommitdiffstats
path: root/src/repository/client.rs
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2019-06-15 18:19:26 +0200
committerMatthias Beyer <mail@beyermatthias.de>2019-06-15 18:22:05 +0200
commitd31967d74840877b1233350a09ffc427e16d9cfa (patch)
tree1d72646d506807cf1e007cc9d6a9ed0b21da64c3 /src/repository/client.rs
parentec57a3cc0ee58383838f8957f19d9cab4d375f12 (diff)
Rewrite repository
Diffstat (limited to 'src/repository/client.rs')
-rw-r--r--src/repository/client.rs354
1 files changed, 71 insertions, 283 deletions
diff --git a/src/repository/client.rs b/src/repository/client.rs
index 005a33c..db15d91 100644
--- a/src/repository/client.rs
+++ b/src/repository/client.rs
@@ -3,303 +3,91 @@ use std::sync::Arc;
use std::ops::Deref;
use ipfs_api::IpfsClient;
-use ipfs_api::KeyType;
use failure::Error;
use failure::err_msg;
use futures::future::Future;
use futures::stream::Stream;
+
use serde_json::from_str as serde_json_from_str;
use serde_json::to_string as serde_json_to_str;
+use serde::Serialize;
+use serde::de::DeserializeOwned;
use chrono::NaiveDateTime;
-use itertools::Itertools;
use crate::types::block::Block;
use crate::types::content::Content;
use crate::types::content::Payload;
use crate::types::util::IPFSHash;
use crate::types::util::IPNSHash;
-use crate::types::util::Timestamp;
-use crate::repository::{ProfileName, ProfileKey};
-use crate::version::protocol_version;
-
-pub fn deref_ipns_hash(client: Arc<IpfsClient>,
- hash: &IPNSHash)
- -> impl Future<Item = IPFSHash, Error = Error>
-{
- client
- .name_resolve(Some(hash.deref()), false, false)
- .map_err(Error::from)
- .map(|resp| IPFSHash::from(resp.path))
-}
-
-pub fn resolve_plain(client: Arc<IpfsClient>, hash: &IPFSHash)
- -> impl Future<Item = Vec<u8>, Error = Error>
-{
- client
- .cat(hash)
- .concat2()
- .map_err(Error::from)
- .map(|blob| blob.to_vec())
-}
-
-pub fn resolve_block(client: Arc<IpfsClient>, hash: &IPFSHash)
- -> impl Future<Item = Block, Error = Error>
-{
- client
- .cat(hash)
- .concat2()
- .map_err(Error::from)
- .and_then(|block| {
- debug!("Got Block data, building Block object");
- String::from_utf8(block.to_vec())
- .map_err(Error::from)
- .and_then(|s| serde_json_from_str(&s).map_err(Error::from))
- })
-}
-
-pub fn get_key_id_from_key_name(client: Arc<IpfsClient>, name: ProfileName)
- -> impl Future<Item = ProfileKey, Error = Error>
-{
- client.key_list()
- .map_err(Error::from)
- .and_then(move |list| {
- list.keys
- .into_iter()
- .filter(|pair| pair.name == *name.deref())
- .next()
- .map(|pair| ProfileKey::from(pair.id))
- .ok_or_else(|| err_msg("No Key"))
- })
-}
-pub fn resolve_latest_block(client: Arc<IpfsClient>, hash: &IPNSHash)
- -> impl Future<Item = Block, Error = Error>
-{
- deref_ipns_hash(client.clone(), hash)
- .map_err(Error::from)
- .and_then(|ipfs_hash| resolve_block(client, &ipfs_hash))
-}
-
-pub fn resolve_content(client: Arc<IpfsClient>, hash: &IPFSHash)
- -> impl Future<Item = Content, Error = Error>
-{
- client
- .cat(hash)
- .concat2()
- .map_err(Error::from)
- .and_then(|content| {
- debug!("Got Content data, building Content object");
+/// Internal ClientFassade types
+///
+/// Abstracts the procedural interface of IpfsClient calls.
+#[derive(Clone)]
+struct ClientFassade(Arc<IpfsClient>);
+
+impl ClientFassade {
+ fn new(host: &str, port: u16) -> Result<ClientFassade, Error> {
+ debug!("Creating new ClientFassade object: {}:{}", host, port);
+ IpfsClient::new(host, port)
+ .map(Arc::new)
+ .map(|c| ClientFassade(c))
+ .map_err(Into::into)
+ }
+
+ fn get<H: AsRef<IPFSHash>>(&self, hash: H) -> impl Future<Item = Vec<u8>, Error = Error> {
+ debug!("Get: {}", hash.as_ref());
+ self.0
+ .clone()
+ .cat(hash.as_ref())
+ .concat2()
+ .map_err(Error::from)
+ .map(|blob| blob.to_vec())
+ }
+
+ fn put(&self, data: Vec<u8>) -> impl Future<Item = IPFSHash, Error = Error> {
+ debug!("Put: {:?}", data);
+ self.0
+ .clone()
+ .add(Cursor::new(data))
+ .map(|res| IPFSHash::from(res.hash))
+ .map_err(Into::into)
+ }
+}
+
+/// Client wrapper for working with types directly on the client
+#[derive(Clone)]
+pub struct TypedClientFassade(ClientFassade);
+
+impl TypedClientFassade {
+ pub fn new(host: &str, port: u16) -> Result<TypedClientFassade, Error> {
+ ClientFassade::new(host, port).map(TypedClientFassade)
+ }
+
+ pub fn get<H, D>(&self, hash: H) -> impl Future<Item = D, Error = Error>
+ where H: AsRef<IPFSHash>,
+ D: DeserializeOwned
+ {
+ self.0
+ .clone()
+ .get(hash)
+ .and_then(|data| {
+ debug!("Got data, building object: {:?}", data);
+
+ serde_json::from_slice(&data).map_err(Error::from)
+ })
+ }
+
+ pub fn put<S, Ser>(&self, data: &S) -> impl Future<Item = IPFSHash, Error = Error>
+ where S: AsRef<Ser>,
+ Ser: Serialize
+ {
+ let client = self.0.clone();
+
+ ::futures::future::result(serde_json_to_str(data.as_ref()))
+ .map_err(Into::into)
+ .and_then(move |d| client.put(d.into_bytes()))
+ }
- String::from_utf8(content.to_vec())
- .map_err(Error::from)
- .and_then(|s| serde_json_from_str(&s).map_err(Error::from))
- })
}
-
-pub fn resolve_content_none(client: Arc<IpfsClient>, hash: &IPFSHash)
- -> impl Future<Item = Content, Error = Error>
-{
- resolve_content(client, hash).and_then(|content| {
- debug!("Got Content object, checking whether it is None");
- match content.payload() {
- &Payload::None => Ok(content),
- _ => Err(err_msg("Content is not None")),
- }
- })
-}
-
-pub fn resolve_content_post(client: Arc<IpfsClient>, hash: &IPFSHash)
- -> impl Future<Item = Content, Error = Error>
-{
- resolve_content(client, hash)
- .and_then(|content| {
- debug!("Got Content object, checking whether it is Post");
- match content.payload() {
- &Payload::Post {..} => Ok(content),
- _ => Err(err_msg("Content is not a Post")),
- }
- })
-}
-
-pub fn resolve_content_attached_post_comments(client: Arc<IpfsClient>, hash: &IPFSHash)
- -> impl Future<Item = Content, Error = Error>
-{
- resolve_content(client, hash)
- .and_then(|content| {
- debug!("Got Content object, checking whether it is AttachedPostComments");
- match content.payload() {
- &Payload::AttachedPostComments {..} => Ok(content),
- _ => Err(err_msg("Content is not AttachedPostComments")),
- }
- })
-}
-
-pub fn resolve_content_profile(client: Arc<IpfsClient>, hash: &IPFSHash)
- -> impl Future<Item = Content, Error = Error>
-{
- resolve_content(client, hash)
- .and_then(|content| {
- debug!("Got Content object, checking whether it is Profile");
- match content.payload() {
- &Payload::Profile {..} => Ok(content),
- _ => Err(err_msg("Content is not a Profile")),
- }
- })
-}
-
-pub fn announce_block(client: Arc<IpfsClient>,
- key: ProfileKey,
- state: &IPFSHash,
- lifetime: Option<String>,
- ttl: Option<String>)
- -> impl Future<Item = (), Error = Error>
-{
- let name = format!("/ipfs/{}", state);
-
- resolve_block(client.clone(), state)
- .and_then(move |_| {
- debug!("Publishing block.");
- client.name_publish(&name,
- false,
- lifetime.as_ref().map(String::deref),
- ttl.as_ref().map(String::deref),
- Some(&key))
- .map_err(From::from)
- .map(|_| ())
- })
-}
-
-
-pub fn put_plain(client: Arc<IpfsClient>, data: Vec<u8>)
- -> impl Future<Item = IPFSHash, Error = Error>
-{
- client
- .add(Cursor::new(data))
- .map(|res| IPFSHash::from(res.hash))
- .map_err(Into::into)
-}
-
-pub fn put_block(client: Arc<IpfsClient>, block: &Block)
- -> impl Future<Item = IPFSHash, Error = Error>
-{
- let data = serde_json_to_str(block);
-
- ::futures::future::result(data)
- .map_err(Into::into)
- .and_then(move |data| put_plain(client, data.into_bytes()))
-}
-
-pub fn put_content(client: Arc<IpfsClient>, content: &Content)
- -> impl Future<Item = IPFSHash, Error = Error>
-{
- let data = serde_json_to_str(content);
- ::futures::future::result(data)
- .map_err(Into::into)
- .and_then(move |data| put_plain(client, data.into_bytes()))
-}
-
-pub fn new_profile(client: Arc<IpfsClient>,
- keyname: String,
- profile: Content,
- lifetime: Option<String>,
- ttl: Option<String>)
- -> impl Future<Item = (ProfileName, ProfileKey), Error = Error>
-{
- let client1 = client.clone();
- let client2 = client.clone();
- let client3 = client.clone();
-
- client
- .key_gen(&keyname, KeyType::Rsa, 4096)
- .map_err(Error::from)
- .map(|kp| (kp.name, kp.id))
- .and_then(move |(key_name, key_id)| { // put the content into IPFS
- let mut prof = profile;
- prof.push_device(IPNSHash::from(key_id.clone()));
-
- put_content(client1, &prof)
- .map(move |content_hash| (content_hash, key_name, key_id))
- .map_err(Error::from)
- })
- .map(|(content_hash, key_name, key_id)| {
- let block = Block::new(protocol_version(),
- vec![], // no parents for new profile
- content_hash);
-
- (block, key_name, key_id)
- })
- .and_then(move |(block, key_name, key_id)| { // put the content into a new block
- put_block(client2, &block)
- .map(|block_hash| (block_hash, key_name, key_id))
- .map_err(Error::from)
- })
- .map(|(block_hash, key_name, key_id)| {
- (format!("/ipfs/{}", block_hash), key_name, key_id)
- })
- .and_then(move |(path, key_name, key_id)| {
- client3
- .name_publish(&path,
- false,
- lifetime.as_ref().map(String::deref),
- ttl.as_ref().map(String::deref),
- Some(&key_name))
- .map(|_publish_response| {
- (ProfileName(key_name), ProfileKey::from(key_id))
- })
- .map_err(Error::from)
- })
-}
-
-pub fn new_text_post(client: Arc<IpfsClient>,
- _publish_key_id: ProfileKey,
- parent_blocks: Vec<IPFSHash>,
- text: String,
- time: Option<NaiveDateTime>)
- -> impl Future<Item = IPFSHash, Error = Error>
-{
- let client3 = client.clone();
- let client4 = client.clone();
- let client5 = client.clone();
-
- let iterator = parent_blocks
- .clone()
- .into_iter()
- .map(move |parent_block| {
- let client1 = client.clone();
- let client2 = client.clone();
- resolve_block(client1, &parent_block)
- .and_then(move |block| {
- resolve_content(client2, block.content())
- })
- .map(|content| content.devices().to_vec())
- });
-
- ::futures::future::join_all(iterator)
- .and_then(move |devices| {
- let devices = Iterator::flatten(devices.into_iter()).unique().collect();
-
- put_plain(client3, text.into_bytes())
- .and_then(move |content_hash| {
- let post = Payload::Post {
- content_format: ::mime::TEXT_PLAIN.into(),
- content: content_hash,
- reply_to: None,
-
- comments_will_be_propagated: None,
- comments_propagated_until: None,
- };
-
- let ts = time.map(Timestamp::from);
- let content_obj = Content::new(devices, ts, post);
-
- put_content(client4, &content_obj)
- })
- })
- .and_then(move |content_obj_hash| {
- let block = Block::new(protocol_version(), parent_blocks, content_obj_hash);
- put_block(client5, &block)
- })
-}
-
-