summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-12-17 22:20:45 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-12-18 16:35:48 +0100
commitfd1173f06e746995db066222b7be76879fe53d22 (patch)
tree075a15a4b9efaddacd98b44f7ae4a7e51c54e8c9
parentdbf9acf5f3cdf9c2c84040f9408872b3ee1e69a0 (diff)
Fix impl of gossipping test
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r--lib/src/reactor/gossip/mod.rs63
1 files changed, 38 insertions, 25 deletions
diff --git a/lib/src/reactor/gossip/mod.rs b/lib/src/reactor/gossip/mod.rs
index 689bf37..11cdc61 100644
--- a/lib/src/reactor/gossip/mod.rs
+++ b/lib/src/reactor/gossip/mod.rs
@@ -40,7 +40,6 @@ impl ReactorBuilder for GossipReactorBuilder {
fn build_with_receiver(self, rr: ReactorReceiver<GossipRequest, GossipReply>) -> Self::Reactor {
GossipReactor {
- running: false,
profile: self.profile,
gossip_topic_name: self.gossip_topic_name,
receiver: rr,
@@ -50,7 +49,6 @@ impl ReactorBuilder for GossipReactorBuilder {
#[derive(Debug)]
pub struct GossipReactor {
- running: bool,
profile: Arc<RwLock<Profile>>,
gossip_topic_name: String,
receiver: ReactorReceiver<GossipRequest, GossipReply>,
@@ -99,6 +97,7 @@ impl GossipReactor {
}
async fn connect(&self, addr: ipfs::MultiaddrWithPeerId) -> Result<()> {
+ log::trace!("Connecting GossipReactor with {:?}", addr);
self.profile.read().await.client().connect(addr).await
}
@@ -143,7 +142,7 @@ impl Reactor for GossipReactor {
async fn run(mut self) -> Result<()> {
use futures::stream::StreamExt;
- self.running = true;
+ log::trace!("Booting {:?}", self);
let mut subscription_stream = self.profile
.read()
.await
@@ -152,9 +151,11 @@ impl Reactor for GossipReactor {
.pubsub_subscribe(self.gossip_topic_name.clone())
.await?;
+ log::trace!("{:?} main loop", self);
loop {
tokio::select! {
next_control_msg = self.receiver.recv() => {
+ log::trace!("Received control message");
match next_control_msg {
None => break,
Some((GossipRequest::Exit, reply_channel)) => {
@@ -183,16 +184,14 @@ impl Reactor for GossipReactor {
next_gossip_message = subscription_stream.next() => {
if let Some(next_gossip_message) = next_gossip_message {
+ log::trace!("Received gossip message");
self.handle_gossip_message(next_gossip_message).await?;
} else {
+ log::trace!("Gossip stream closed, breaking reactor loop");
break;
}
}
}
-
- if !self.running {
- break;
- }
}
Ok(())
}
@@ -221,7 +220,7 @@ mod tests {
let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name).build_with_receiver(tx);
let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel();
- rx.send((GossipRequest::Ping, reply_sender));
+ rx.send((GossipRequest::Ping, reply_sender)).unwrap();
let mut pong_received = false;
tokio::select! {
@@ -230,7 +229,7 @@ mod tests {
Some(GossipReply::Pong) => {
pong_received = true;
let (reply_sender, mut reply_receiver) = tokio::sync::mpsc::unbounded_channel();
- rx.send((GossipRequest::Exit, reply_sender));
+ rx.send((GossipRequest::Exit, reply_sender)).unwrap();
}
Some(r) => {
assert!(false, "Expected ReactorReply::Pong, got: {:?}", r);
@@ -260,7 +259,7 @@ mod tests {
let _ = env_logger::try_init();
let gossip_topic_name = String::from("test-gossip-reactor-gossipping-topic");
- let (left_profile, left_reactor, left_tx) = {
+ let (left_profile, left_reactor, left_rx) = {
let profile = Profile::new_inmemory(Config::default(), "test-gossip-reactor-simple-left").await;
assert!(profile.is_ok());
let profile = Arc::new(RwLock::new(profile.unwrap()));
@@ -269,8 +268,9 @@ mod tests {
let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx);
(profile, reactor, rx)
};
+ log::trace!("Built left GossipReactor");
- let (right_profile, right_reactor, right_tx) = {
+ let (right_profile, right_reactor, right_rx) = {
let profile = Profile::new_inmemory(Config::default(), "test-gossip-reactor-simple-right").await;
assert!(profile.is_ok());
let profile = Arc::new(RwLock::new(profile.unwrap()));
@@ -279,6 +279,7 @@ mod tests {
let reactor = GossipReactorBuilder::new(profile.clone(), gossip_topic_name.clone()).build_with_receiver(tx);
(profile, reactor, rx)
};
+ log::trace!("Built right GossipReactor");
async fn get_peer_id(profile: Arc<RwLock<Profile>>) -> Result<ipfs::MultiaddrWithPeerId> {
profile.read()
@@ -296,26 +297,38 @@ mod tests {
})
}
+ let left_running_reactor = tokio::spawn(async move {
+ left_reactor.run().await
+ });
+
+ let right_running_reactor = tokio::spawn(async move {
+ right_reactor.run().await
+ });
+
let left_peer_id = get_peer_id(left_profile.clone()).await;
+ log::trace!("Left GossipReactor = {:?}", left_peer_id);
assert!(left_peer_id.is_ok(), "Not ok: {:?}", left_peer_id);
let left_peer_id = left_peer_id.unwrap();
- let (right_reply_sender, mut right_reply_receiver) = tokio::sync::mpsc::unbounded_channel();
- right_tx.send((GossipRequest::Connect(left_peer_id), right_reply_sender));
-
- if let Some(reply) = right_reply_receiver.recv().await {
- match reply {
- GossipReply::ConnectResult(Ok(())) => assert!(true),
- other => assert!(false, "Expected ConnectResult(Ok(())), recv: {:?}", other),
- }
- } else {
- assert!(false, "No reply from right reactor received");
- }
let right_peer_id = get_peer_id(right_profile.clone()).await;
+ log::trace!("Right GossipReactor = {:?}", right_peer_id);
assert!(right_peer_id.is_ok(), "Not ok: {:?}", right_peer_id);
let right_peer_id = right_peer_id.unwrap();
- let connected = left_reactor.is_connected_to(right_peer_id).await;
- assert!(connected.is_ok());
- assert!(connected.unwrap(), "Peers are not connected");
+
+ let (right_reply_sender, mut right_reply_receiver) = tokio::sync::mpsc::unbounded_channel();
+
+ log::trace!("Right GossipReactor should now connect to left GossipReactor");
+ right_rx.send((GossipRequest::Connect(left_peer_id), right_reply_sender)).unwrap();
+
+ log::trace!("Right GossipReactor should now connect to left GossipReactor... waiting for reply");
+ match tokio::time::timeout(std::time::Duration::from_secs(5), right_reply_receiver.recv()).await {
+ Err(_) => assert!(false, "Timeout elapsed when waiting for connection status"),
+ Ok(Some(GossipReply::ConnectResult(Ok(())))) => {
+ log::trace!("Right GossipReactor is connected");
+ assert!(true)
+ },
+ Ok(Some(other)) => assert!(false, "Expected ConnectResult(Ok(())), recv: {:?}", other),
+ Ok(None) => assert!(false, "No reply from right reactor received"),
+ }
}
}