diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2021-12-18 22:04:31 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-12-18 22:14:37 +0100 |
commit | 542e1e9dc50a96a36ab9d4236293cd0a4f5d22c3 (patch) | |
tree | 89905044c44e1af21ab79b150b85011f96efa8fe /lib/src/gossip/deserializer.rs | |
parent | 7675eba9244475c3847fd48eb13b5d8b54cf271c (diff) |
Shrink idea of "Reactors"
A Reactor can be waaay less complex if we simply use it as "map"-helper
for mapping over `Stream`s.
If we map over a stream of Vec<u8> and deserialize them to
GossipMessages in one step, and handle them appropriately in the next
step, it is way less complex to implement these things and we do not
have to care about this whole "how do I shut down the thing" because we
can simply drop() everything and let the destructors do their job.
This patch removes the Reactor nonsense.
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'lib/src/gossip/deserializer.rs')
-rw-r--r-- | lib/src/gossip/deserializer.rs | 55 |
1 files changed, 55 insertions, 0 deletions
diff --git a/lib/src/gossip/deserializer.rs b/lib/src/gossip/deserializer.rs new file mode 100644 index 0000000..a50644f --- /dev/null +++ b/lib/src/gossip/deserializer.rs @@ -0,0 +1,55 @@ +use anyhow::Result; +use futures::Stream; +use futures::StreamExt; + +use crate::gossip::GossipMessage; + +pub struct GossipDeserializer<ErrStrategy = LogStrategy> + where ErrStrategy: GossipDeserializerErrorStrategy +{ + strategy: std::marker::PhantomData<ErrStrategy>, +} + +impl<ErrStrategy> GossipDeserializer<ErrStrategy> + where ErrStrategy: GossipDeserializerErrorStrategy +{ + pub fn new() -> Self { + Self { + strategy: std::marker::PhantomData, + } + } + + pub fn run<S>(mut self, input: S) -> impl Stream<Item = GossipMessage> + where S: Stream<Item = ipfs::PubsubMessage> + { + input.filter_map(|message| async move { + log::trace!("Received gossip message"); + + match serde_json::from_slice(&message.data).map_err(anyhow::Error::from) { + Ok(m) => Some(m), + Err(e) => { + ErrStrategy::handle_error(e); + None + } + } + }) + } +} + +pub trait GossipDeserializerErrorStrategy { + fn handle_error(err: anyhow::Error); +} + +pub struct LogStrategy; +impl GossipDeserializerErrorStrategy for LogStrategy { + fn handle_error(err: anyhow::Error) { + log::trace!("Error: {}", err); + } +} + +pub struct IgnoreStrategy; +impl GossipDeserializerErrorStrategy for IgnoreStrategy { + fn handle_error(_: anyhow::Error) { + () + } +} |