diff options
Diffstat (limited to 'lib/src/gossip/deserializer.rs')
-rw-r--r-- | lib/src/gossip/deserializer.rs | 57 |
1 files changed, 57 insertions, 0 deletions
diff --git a/lib/src/gossip/deserializer.rs b/lib/src/gossip/deserializer.rs new file mode 100644 index 0000000..dcd5082 --- /dev/null +++ b/lib/src/gossip/deserializer.rs @@ -0,0 +1,57 @@ +use std::sync::Arc; + +use anyhow::Result; +use futures::Stream; +use futures::StreamExt; + +use crate::gossip::GossipMessage; + +pub struct GossipDeserializer<ErrStrategy = LogStrategy> + where ErrStrategy: GossipDeserializerErrorStrategy +{ + strategy: std::marker::PhantomData<ErrStrategy>, +} + +impl<ErrStrategy> GossipDeserializer<ErrStrategy> + where ErrStrategy: GossipDeserializerErrorStrategy +{ + pub fn new() -> Self { + Self { + strategy: std::marker::PhantomData, + } + } + + pub fn run<S>(self, input: S) -> impl Stream<Item = (ipfs::PeerId, GossipMessage)> + where S: Stream<Item = Arc<ipfs::PubsubMessage>> + { + input.filter_map(|message| async move { + log::trace!("Received gossip message"); + + match serde_json::from_slice(&message.data).map_err(anyhow::Error::from) { + Ok(m) => Some((message.source, m)), + Err(e) => { + ErrStrategy::handle_error(e); + None + } + } + }) + } +} + +pub trait GossipDeserializerErrorStrategy { + fn handle_error(err: anyhow::Error); +} + +pub struct LogStrategy; +impl GossipDeserializerErrorStrategy for LogStrategy { + fn handle_error(err: anyhow::Error) { + log::trace!("Error: {}", err); + } +} + +pub struct IgnoreStrategy; +impl GossipDeserializerErrorStrategy for IgnoreStrategy { + fn handle_error(_: anyhow::Error) { + () + } +} |