From 685e35d302873d3047ef86dd8904e769202d23cf Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Wed, 20 May 2020 11:20:12 +0200 Subject: Unite ipfs-api client abstraction into one "Model" type Signed-off-by: Matthias Beyer --- src/model.rs | 151 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 151 insertions(+) create mode 100644 src/model.rs (limited to 'src/model.rs') diff --git a/src/model.rs b/src/model.rs new file mode 100644 index 0000000..b81ea17 --- /dev/null +++ b/src/model.rs @@ -0,0 +1,151 @@ +//! The "model" module implements the Database-layer of the application + +use std::io::Cursor; +use std::ops::Deref; +use std::result::Result as RResult; +use std::sync::Arc; + +use anyhow::Error; +use chrono::NaiveDateTime; +use failure::Fail; +use futures::future::Future; +use futures::future::FutureExt; +use futures::stream::Stream; +use futures::stream::StreamExt; +use futures::stream::TryStreamExt; +use ipfs_api::IpfsClient; +use ipfs_api::TryFromUri; +use serde::Serialize; +use serde::de::DeserializeOwned; +use serde_json::from_str as serde_json_from_str; +use serde_json::to_string as serde_json_to_str; + +use crate::types::block::Block; +use crate::types::content::Content; +use crate::types::payload::Payload; +use crate::types::util::IPFSHash; +use crate::types::util::IPNSHash; + +#[derive(Clone)] +pub struct Model(Arc); + +impl std::fmt::Debug for Model{ + fn fmt(&self, f: &mut std::fmt::Formatter) -> RResult<(), std::fmt::Error> { + write!(f, "Model") + } +} + +impl Model { + pub fn new(host: &str, port: u16) -> Result { + debug!("Creating new Model object: {}:{}", host, port); + IpfsClient::from_str(&format!("{}:{}", host, port)) + .map(Arc::new) + .map(|c| Model(c)) + .map_err(|e| Error::from(e.compat())) + } + + + // + // + // Low-level interface to the ipfs-api + // + // + + pub(crate) async fn get_raw_bytes>(&self, hash: H) -> Result, Error> { + debug!("Get: {}", hash.as_ref()); + self.0 + .clone() + .cat(hash.as_ref()) + .map_ok(|b| b.to_vec()) + .try_concat() + .map(|r| r.map_err(|e| anyhow!("UNIMPLEMENTED!()"))) + .await + } + + pub(crate) async fn put_raw_bytes(&self, data: Vec) -> Result { + debug!("Put: {:?}", data); + self.0 + .clone() + .add(Cursor::new(data)) + .await + .map(|res| IPFSHash::from(res.hash)) + .map_err(|e| anyhow!("UNIMPLEMENTED!()")) + } + + pub(crate) async fn publish(&self, key: &str, hash: &str) -> Result { + debug!("Publish: {:?} -> {:?}", key, hash); + self.0 + .clone() + .name_publish(hash, false, None, None, Some(key)) + .await + .map(|res| IPNSHash::from(res.value)) + .map_err(|e| anyhow!("UNIMPLEMENTED!()")) + } + + pub(crate) async fn resolve(&self, ipns: IPNSHash) -> Result { + self.0 + .clone() + .name_resolve(Some(&ipns), true, false) + .await + .map(|res| IPFSHash::from(res.path)) + .map_err(|e| anyhow!("UNIMPLEMENTED!()")) + } + + // + // + // Generic typed interface + // + // + + pub(crate) async fn get_typed(&self, hash: H) -> Result + where H: AsRef, + D: DeserializeOwned + { + self.get_raw_bytes(hash) + .await + .and_then(|data| { + debug!("Got data, building object: {:?}", data); + + serde_json::from_slice(&data).map_err(|e| Error::from(e.compat())) + }) + } + + pub(crate) async fn put_typed(&self, data: &S) -> Result + where S: AsRef, + Ser: Serialize + { + let data = serde_json_to_str(data.as_ref())?; + self.put_raw_bytes(data.into_bytes()).await + } + + // + // + // Typed interface + // + // + + pub async fn get_block(&self, hash: H) -> Result + where H: AsRef + { + self.get_typed(hash).await + } + + pub async fn put_block(&self, b: B) -> Result + where B: AsRef + { + self.put_typed(b.as_ref()).await + } + + pub async fn get_content(&self, hash: H) -> Result + where H: AsRef + { + self.get_typed(hash).await + } + + pub async fn put_content(&self, c: C) -> Result + where C: AsRef + { + self.put_typed(c.as_ref()).await + } + +} -- cgit v1.2.3