1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
use std::sync::Arc;
use anyhow::Result;
use crate::backend::Id;
use crate::backend::Node;
#[derive(Clone)]
pub struct IpfsEmbedBackend {
ipfs: Arc<ipfs_embed::Ipfs<ipfs_embed::DefaultParams>>,
}
impl IpfsEmbedBackend {
pub fn ipfs(&self) -> Arc<ipfs_embed::Ipfs<ipfs_embed::DefaultParams>> {
self.ipfs.clone()
}
}
#[async_trait::async_trait]
impl daglib::DagBackend<Id, Node> for IpfsEmbedBackend {
async fn get(&self, id: Id) -> Result<Option<(Id, Node)>> {
log::trace!("GET {:?}", id);
let block = self.ipfs.fetch(id.as_ref(), self.ipfs.peers()).await?;
let node = block.decode::<libipld::cbor::DagCborCodec, crate::backend::Node>()?;
Ok(Some((id, node)))
}
async fn put(&mut self, node: Node) -> Result<Id> {
log::trace!("PUT {:?}", node);
let block = libipld::block::Block::encode(libipld::cbor::DagCborCodec, libipld::multihash::Code::Blake3_256, &node)?;
let cid = Id::from(block.cid().clone());
self.ipfs
.insert(&block)
.map(|_| cid)
}
}
impl IpfsEmbedBackend {
pub async fn new_in_memory(cache_size: u64) -> Result<Self> {
let in_memory = None; // that's how it works...
let config = ipfs_embed::Config::new(in_memory, cache_size);
ipfs_embed::Ipfs::new(config).await.map(Arc::new).map(|ipfs| IpfsEmbedBackend { ipfs })
}
pub async fn new_with_config(cfg: ipfs_embed::Config) -> Result<Self> {
ipfs_embed::Ipfs::new(cfg)
.await
.map(Arc::new)
.map(|ipfs| IpfsEmbedBackend { ipfs })
}
pub async fn write_payload(&self, payload: &crate::backend::Payload) -> Result<cid::Cid> {
log::trace!("Write payload: {:?}", payload);
let block = libipld::block::Block::encode(libipld::cbor::DagCborCodec, libipld::multihash::Code::Blake3_256, &payload)?;
log::trace!("Block = {:?}", block);
let _ = self.ipfs.insert(&block)?;
log::trace!("Inserted. CID = {}", block.cid());
Ok(block.cid().clone())
}
pub async fn get_payload(&self, cid: &cid::Cid) -> Result<crate::backend::Payload> {
let block = self.ipfs.fetch(cid, self.ipfs.peers()).await?;
log::trace!("Block = {:?}", block);
let payload = block.decode::<libipld::cbor::DagCborCodec, crate::backend::Payload>()?;
log::trace!("Payload = {:?}", payload);
Ok(payload)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::backend::Payload;
#[tokio::test]
async fn test_roundtrip_payload() {
let backend = IpfsEmbedBackend::new_in_memory(1024).await.unwrap();
let cid = backend.write_payload(&Payload::now_from_text(String::from("test"))).await.unwrap();
let payload = backend.ipfs().fetch(&cid, backend.ipfs().peers()).await.unwrap();
let payload = payload.decode::<libipld::cbor::DagCborCodec, crate::backend::Payload>().unwrap();
assert_eq!(payload.content(), "test".as_bytes())
}
}
|