summaryrefslogtreecommitdiffstats
path: root/lib/src/gossip/deserializer.rs
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-12-18 22:04:31 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-12-18 22:14:37 +0100
commit542e1e9dc50a96a36ab9d4236293cd0a4f5d22c3 (patch)
tree89905044c44e1af21ab79b150b85011f96efa8fe /lib/src/gossip/deserializer.rs
parent7675eba9244475c3847fd48eb13b5d8b54cf271c (diff)
Shrink idea of "Reactors"
A Reactor can be waaay less complex if we simply use it as "map"-helper for mapping over `Stream`s. If we map over a stream of Vec<u8> and deserialize them to GossipMessages in one step, and handle them appropriately in the next step, it is way less complex to implement these things and we do not have to care about this whole "how do I shut down the thing" because we can simply drop() everything and let the destructors do their job. This patch removes the Reactor nonsense. Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'lib/src/gossip/deserializer.rs')
-rw-r--r--lib/src/gossip/deserializer.rs55
1 files changed, 55 insertions, 0 deletions
diff --git a/lib/src/gossip/deserializer.rs b/lib/src/gossip/deserializer.rs
new file mode 100644
index 0000000..a50644f
--- /dev/null
+++ b/lib/src/gossip/deserializer.rs
@@ -0,0 +1,55 @@
+use anyhow::Result;
+use futures::Stream;
+use futures::StreamExt;
+
+use crate::gossip::GossipMessage;
+
+pub struct GossipDeserializer<ErrStrategy = LogStrategy>
+ where ErrStrategy: GossipDeserializerErrorStrategy
+{
+ strategy: std::marker::PhantomData<ErrStrategy>,
+}
+
+impl<ErrStrategy> GossipDeserializer<ErrStrategy>
+ where ErrStrategy: GossipDeserializerErrorStrategy
+{
+ pub fn new() -> Self {
+ Self {
+ strategy: std::marker::PhantomData,
+ }
+ }
+
+ pub fn run<S>(mut self, input: S) -> impl Stream<Item = GossipMessage>
+ where S: Stream<Item = ipfs::PubsubMessage>
+ {
+ input.filter_map(|message| async move {
+ log::trace!("Received gossip message");
+
+ match serde_json::from_slice(&message.data).map_err(anyhow::Error::from) {
+ Ok(m) => Some(m),
+ Err(e) => {
+ ErrStrategy::handle_error(e);
+ None
+ }
+ }
+ })
+ }
+}
+
+pub trait GossipDeserializerErrorStrategy {
+ fn handle_error(err: anyhow::Error);
+}
+
+pub struct LogStrategy;
+impl GossipDeserializerErrorStrategy for LogStrategy {
+ fn handle_error(err: anyhow::Error) {
+ log::trace!("Error: {}", err);
+ }
+}
+
+pub struct IgnoreStrategy;
+impl GossipDeserializerErrorStrategy for IgnoreStrategy {
+ fn handle_error(_: anyhow::Error) {
+ ()
+ }
+}