From fd1173f06e746995db066222b7be76879fe53d22 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Fri, 17 Dec 2021 22:20:45 +0100 Subject: Fix impl of gossipping test Signed-off-by: Matthias Beyer --- lib/src/reactor/gossip/mod.rs | 63 ++++++++++++++++++++++++++----------------- 1 file changed, 38 insertions(+), 25 deletions(-) diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs index 689bf37..11cdc61 100644 --- a/lib/src/reactor/gossip/mod.rs +++ b/lib/src/reactor/gossip/mod.rs @@ -40,7 +40,6 @@ impl ReactorBuilder for GossipReactorBuilder { fn build_with_receiver(self, rr: ReactorReceiver) -> Self::Reactor { GossipReactor { - running: false, profile: self.profile, gossip_topic_name: self.gossip_topic_name, receiver: rr, @@ -50,7 +49,6 @@ impl ReactorBuilder for GossipReactorBuilder { #[derive(Debug)] pub struct GossipReactor { - running: bool, profile: Arc>, gossip_topic_name: String, receiver: ReactorReceiver, @@ -99,6 +97,7 @@ impl GossipReactor { } async fn connect(&self, addr: ipfs::MultiaddrWithPeerId) -> Result<()> { + log::trace!("Connecting GossipReactor with {:?}", addr); self.profile.read().await.client().connect(addr).await } @@ -143,7 +142,7 @@ impl Reactor for GossipReactor { async fn run(mut self) -> Result<()> { use futures::stream::StreamExt; - self.running = true; + log::trace!("Booting {:?}", self); let mut subscription_stream = self.profile .read() .await @@ -152,9 +151,11 @@ impl Reactor for GossipReactor { .pubsub_subscribe(self.gossip_topic_name.clone()) .await?; + log::trace!("{:?} main loop", self); loop { tokio::select! { next_control_msg = self.receiver.recv() => { + log::trace!("Received control message"); match next_control_msg { None => break, Some((GossipRequest::Exit, reply_channel)) => { @@ -183,16 +184,14 @@ impl Reactor for GossipReactor { next_gossip_message = subscription_stream.next() => { if let Some(next_gossip_message) = next_gossip_message { + log::trace!("Received gossip message"); self.handle_gossip_message(next_gossip_message).await?; } else { + log::trace!("Gossip stream closed, breaking reactor loop"); break; } } } - - if !self.running { - break; - } } Ok(()) } @@ -221,7 +220,7 @@ mod tests { let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name).build_with_receiver(tx); let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel(); - rx.send((GossipRequest::Ping, reply_sender)); + rx.send((GossipRequest::Ping, reply_sender)).unwrap(); let mut pong_received = false; tokio::select! { @@ -230,7 +229,7 @@ mod tests { Some(GossipReply::Pong) => { pong_received = true; let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel(); - rx.send((GossipRequest::Exit, reply_sender)); + rx.send((GossipRequest::Exit, reply_sender)).unwrap(); } Some(r) => { assert!(false, "Expected ReactorReply::Pong, got: {:?}", r); @@ -260,7 +259,7 @@ mod tests { let _ = env_logger::try_init(); let gossip_topic_name = String::from("test-gossip-reactor-gossipping-topic"); - let (left_profile, left_reactor, left_tx) = { + let (left_profile, left_reactor, left_rx) = { let profile = Profile::new_inmemory(Config::default(), "test-gossip-reactor-simple-left").await; assert!(profile.is_ok()); let profile = Arc::new(RwLock::new(profile.unwrap())); @@ -269,8 +268,9 @@ mod tests { let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx); (profile, reactor, rx) }; + log::trace!("Built left GossipReactor"); - let (right_profile, right_reactor, right_tx) = { + let (right_profile, right_reactor, right_rx) = { let profile = Profile::new_inmemory(Config::default(), "test-gossip-reactor-simple-right").await; assert!(profile.is_ok()); let profile = Arc::new(RwLock::new(profile.unwrap())); @@ -279,6 +279,7 @@ mod tests { let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx); (profile, reactor, rx) }; + log::trace!("Built right GossipReactor"); async fn get_peer_id(profile: Arc>) -> Result { profile.read() @@ -296,26 +297,38 @@ mod tests { }) } + let left_running_reactor = tokio::spawn(async move { + left_reactor.run().await + }); + + let right_running_reactor = tokio::spawn(async move { + right_reactor.run().await + }); + let left_peer_id = get_peer_id(left_profile.clone()).await; + log::trace!("Left GossipReactor = {:?}", left_peer_id); assert!(left_peer_id.is_ok(), "Not ok: {:?}", left_peer_id); let left_peer_id = left_peer_id.unwrap(); - let (right_reply_sender, mut right_reply_receiver) = tokio::sync::mpsc::unbounded_channel(); - right_tx.send((GossipRequest::Connect(left_peer_id), right_reply_sender)); - - if let Some(reply) = right_reply_receiver.recv().await { - match reply { - GossipReply::ConnectResult(Ok(())) => assert!(true), - other => assert!(false, "Expected ConnectResult(Ok(())), recv: {:?}", other), - } - } else { - assert!(false, "No reply from right reactor received"); - } let right_peer_id = get_peer_id(right_profile.clone()).await; + log::trace!("Right GossipReactor = {:?}", right_peer_id); assert!(right_peer_id.is_ok(), "Not ok: {:?}", right_peer_id); let right_peer_id = right_peer_id.unwrap(); - let connected = left_reactor.is_connected_to(right_peer_id).await; - assert!(connected.is_ok()); - assert!(connected.unwrap(), "Peers are not connected"); + + let (right_reply_sender, mut right_reply_receiver) = tokio::sync::mpsc::unbounded_channel(); + + log::trace!("Right GossipReactor should now connect to left GossipReactor"); + right_rx.send((GossipRequest::Connect(left_peer_id), right_reply_sender)).unwrap(); + + log::trace!("Right GossipReactor should now connect to left GossipReactor... waiting for reply"); + match tokio::time::timeout(std::time::Duration::from_secs(5), right_reply_receiver.recv()).await { + Err(_) => assert!(false, "Timeout elapsed when waiting for connection status"), + Ok(Some(GossipReply::ConnectResult(Ok(())))) => { + log::trace!("Right GossipReactor is connected"); + assert!(true) + }, + Ok(Some(other)) => assert!(false, "Expected ConnectResult(Ok(())), recv: {:?}", other), + Ok(None) => assert!(false, "No reply from right reactor received"), + } } } -- cgit v1.2.3