//! The "model" module implements the Database-layer of the application use std::io::Cursor; use std::ops::Deref; use std::result::Result as RResult; use std::sync::Arc; use anyhow::Error; use chrono::NaiveDateTime; use failure::Fail; use futures::future::Future; use futures::future::FutureExt; use futures::stream::Stream; use futures::stream::StreamExt; use futures::stream::TryStreamExt; use ipfs_api::IpfsClient; use ipfs_api::TryFromUri; use serde::Serialize; use serde::de::DeserializeOwned; use serde_json::from_str as serde_json_from_str; use serde_json::to_string as serde_json_to_str; use crate::types::block::Block; use crate::types::content::Content; use crate::types::payload::Payload; use crate::types::util::IPFSHash; use crate::types::util::IPNSHash; #[derive(Clone)] pub struct Model(Arc); impl std::fmt::Debug for Model{ fn fmt(&self, f: &mut std::fmt::Formatter) -> RResult<(), std::fmt::Error> { write!(f, "Model") } } impl Model { pub fn new(host: &str, port: u16) -> Result { debug!("Creating new Model object: {}:{}", host, port); IpfsClient::from_str(&format!("{}:{}", host, port)) .map(Arc::new) .map(|c| Model(c)) .map_err(|e| Error::from(e.compat())) } // // // Low-level interface to the ipfs-api // // pub(crate) async fn get_raw_bytes>(&self, hash: H) -> Result, Error> { debug!("Get: {}", hash.as_ref()); self.0 .clone() .cat(hash.as_ref()) .map_ok(|b| b.to_vec()) .try_concat() .map(|r| r.map_err(|e| anyhow!("UNIMPLEMENTED!()"))) .await } pub(crate) async fn put_raw_bytes(&self, data: Vec) -> Result { debug!("Put: {:?}", data); self.0 .clone() .add(Cursor::new(data)) .await .map(|res| IPFSHash::from(res.hash)) .map_err(|e| anyhow!("UNIMPLEMENTED!()")) } pub(crate) async fn publish(&self, key: &str, hash: &str) -> Result { debug!("Publish: {:?} -> {:?}", key, hash); self.0 .clone() .name_publish(hash, false, None, None, Some(key)) .await .map(|res| IPNSHash::from(res.value)) .map_err(|e| anyhow!("UNIMPLEMENTED!()")) } pub(crate) async fn resolve(&self, ipns: IPNSHash) -> Result { self.0 .clone() .name_resolve(Some(&ipns), true, false) .await .map(|res| IPFSHash::from(res.path)) .map_err(|e| anyhow!("UNIMPLEMENTED!()")) } // // // Generic typed interface // // pub(crate) async fn get_typed(&self, hash: H) -> Result where H: AsRef, D: DeserializeOwned { self.get_raw_bytes(hash) .await .and_then(|data| { debug!("Got data, building object: {:?}", data); serde_json::from_slice(&data).map_err(|e| Error::from(e.compat())) }) } pub(crate) async fn put_typed(&self, data: &S) -> Result where S: AsRef, Ser: Serialize { let data = serde_json_to_str(data.as_ref())?; self.put_raw_bytes(data.into_bytes()).await } // // // Typed interface // // pub async fn get_block(&self, hash: H) -> Result where H: AsRef { self.get_typed(hash).await } pub async fn put_block(&self, b: B) -> Result where B: AsRef { self.put_typed(b.as_ref()).await } pub async fn get_content(&self, hash: H) -> Result where H: AsRef { self.get_typed(hash).await } pub async fn put_content(&self, c: C) -> Result where C: AsRef { self.put_typed(c.as_ref()).await } }