diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2021-12-20 19:13:18 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-12-20 19:13:18 +0100 |
commit | 1034a454d0900130796fd8941bbc9696bcc41b29 (patch) | |
tree | 085bf79d2d6a98144619f16906147f1331565786 | |
parent | a23b897b5c3c9ee721b793e26401d1863d97f84e (diff) | |
parent | 9a22b05a96f5ed066d588ab3aadf620f242d7ae5 (diff) |
Merge branch 'gossipping-cli'
-rw-r--r-- | cli/src/cli.rs | 10 | ||||
-rw-r--r-- | cli/src/profile.rs | 72 | ||||
-rw-r--r-- | lib/src/client.rs | 16 | ||||
-rw-r--r-- | lib/src/profile/mod.rs | 4 |
4 files changed, 95 insertions, 7 deletions
diff --git a/cli/src/cli.rs b/cli/src/cli.rs index ee8d608..4b52ad6 100644 --- a/cli/src/cli.rs +++ b/cli/src/cli.rs @@ -46,9 +46,19 @@ pub fn app<'a>() -> App<'a> { .long("connect") .required(false) .takes_value(true) + .multiple(true) .value_name("MULTIADDR") .about("Connect to MULTIADDR as well") ) + + .arg(Arg::new("listen") + .long("listen") + .required(false) + .takes_value(true) + .multiple(true) + .value_name("MULTIADDR") + .about("Listen on MULTIADDR, e.g. '/ip4/127.0.0.1/tcp/10000'") + ) ) .subcommand(App::new("cat") diff --git a/cli/src/profile.rs b/cli/src/profile.rs index 87b1e54..cf1a2f0 100644 --- a/cli/src/profile.rs +++ b/cli/src/profile.rs @@ -36,10 +36,20 @@ async fn profile_serve(matches: &ArgMatches) -> Result<()> { use ipfs::MultiaddrWithPeerId; let name = matches.value_of("name").map(String::from).unwrap(); // required - let connect_peer = matches.value_of("connect").map(|s| { - s.parse::<MultiaddrWithPeerId>() - .map_err(anyhow::Error::from) - }).transpose()?; + let listen_addrs = matches.values_of("listen") + .map(|v| { + v.map(|s| s.parse::<ipfs::Multiaddr>().map_err(anyhow::Error::from)) + .collect::<Result<Vec<_>>>() + }) + .transpose()?; + let connect_peer = matches.values_of("connect") + .map(|v| { + v.map(|s| { + s.parse::<MultiaddrWithPeerId>().map_err(anyhow::Error::from) + }) + .collect::<Result<Vec<_>>>() + }) + .transpose()?; let state_dir = Profile::state_dir_path(&name)?; @@ -50,21 +60,69 @@ async fn profile_serve(matches: &ArgMatches) -> Result<()> { log::info!("Profile HEAD = {}", head); } + if let Some(listen) = listen_addrs { + for l in listen { + log::debug!("Adding listening address: {}", l); + profile.listen_on(l).await?; + } + } + + { + let addrs = profile.client().own_addresses().await?; + if addrs.is_empty() { + log::error!("No own address"); + } else { + for addr in addrs { + log::info!("Own addr: {}", addr); + } + } + } + if let Some(connect_to) = connect_peer { - log::info!("Connecting to {:?}", connect_to); - profile.connect(connect_to).await?; + for c in connect_to { + log::info!("Connecting to {:?}", c); + profile.connect(c).await?; + } } + let mut gossip_channel = Box::pin({ + profile.client() + .pubsub_subscribe("distrox".to_string()) + .await + .map(|stream| { + use distrox_lib::gossip::deserializer::GossipDeserializer; + use distrox_lib::gossip::deserializer::LogStrategy; + + GossipDeserializer::<LogStrategy>::new().run(stream) + })? + }); + let running = Arc::new(AtomicBool::new(true)); let r = running.clone(); + let own_peer_id = profile.client().own_id().await?; + ctrlc::set_handler(move || { r.store(false, Ordering::SeqCst); }).context("Error setting Ctrl-C handler")?; log::info!("Serving..."); while running.load(Ordering::SeqCst) { - tokio::time::sleep(std::time::Duration::from_millis(500)).await // sleep not so busy + use futures::stream::StreamExt; + use distrox_lib::gossip::GossipMessage; + + tokio::time::sleep(std::time::Duration::from_millis(500)).await; // sleep not so busy + + tokio::select! { + own = profile.gossip_own_state("distrox".to_string()) => own?, + other = gossip_channel.next() => { + let gossip_myself = other.as_ref().map(|(source, _)| *source == own_peer_id).unwrap_or(false); + + if !gossip_myself { + log::trace!("Received gossip: {:?}", other); + } + } + } } log::info!("Shutting down..."); profile.exit().await diff --git a/lib/src/client.rs b/lib/src/client.rs index 2128f76..fadcd77 100644 --- a/lib/src/client.rs +++ b/lib/src/client.rs @@ -37,6 +37,22 @@ impl Client { .map_err(anyhow::Error::from) } + pub async fn own_addresses(&self) -> Result<Vec<ipfs::Multiaddr>> { + self.ipfs + .identity() + .await + .map(|(_, addrs)| addrs) + .map_err(anyhow::Error::from) + } + + pub async fn listen_on(&self, addr: ipfs::Multiaddr) -> Result<()> { + self.ipfs + .add_listening_address(addr) + .await + .map(|_| ()) + .map_err(anyhow::Error::from) + } + pub async fn connect(&self, peer: ipfs::MultiaddrWithPeerId) -> Result<()> { self.ipfs.connect(peer).await } diff --git a/lib/src/profile/mod.rs b/lib/src/profile/mod.rs index 3b1e063..00087f3 100644 --- a/lib/src/profile/mod.rs +++ b/lib/src/profile/mod.rs @@ -63,6 +63,10 @@ impl Profile { self.state.profile_head().as_ref() } + pub async fn listen_on(&self, addr: ipfs::Multiaddr) -> Result<()> { + self.client.listen_on(addr).await + } + pub async fn connect(&self, peer: ipfs::MultiaddrWithPeerId) -> Result<()> { self.client.connect(peer).await } |