summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-12-20 19:13:18 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-12-20 19:13:18 +0100
commit1034a454d0900130796fd8941bbc9696bcc41b29 (patch)
tree085bf79d2d6a98144619f16906147f1331565786
parenta23b897b5c3c9ee721b793e26401d1863d97f84e (diff)
parent9a22b05a96f5ed066d588ab3aadf620f242d7ae5 (diff)
Merge branch 'gossipping-cli'
-rw-r--r--cli/src/cli.rs10
-rw-r--r--cli/src/profile.rs72
-rw-r--r--lib/src/client.rs16
-rw-r--r--lib/src/profile/mod.rs4
4 files changed, 95 insertions, 7 deletions
diff --git a/cli/src/cli.rs b/cli/src/cli.rs
index ee8d608..4b52ad6 100644
--- a/cli/src/cli.rs
+++ b/cli/src/cli.rs
@@ -46,9 +46,19 @@ pub fn app<'a>() -> App<'a> {
.long("connect")
.required(false)
.takes_value(true)
+ .multiple(true)
.value_name("MULTIADDR")
.about("Connect to MULTIADDR as well")
)
+
+ .arg(Arg::new("listen")
+ .long("listen")
+ .required(false)
+ .takes_value(true)
+ .multiple(true)
+ .value_name("MULTIADDR")
+ .about("Listen on MULTIADDR, e.g. '/ip4/127.0.0.1/tcp/10000'")
+ )
)
.subcommand(App::new("cat")
diff --git a/cli/src/profile.rs b/cli/src/profile.rs
index 87b1e54..cf1a2f0 100644
--- a/cli/src/profile.rs
+++ b/cli/src/profile.rs
@@ -36,10 +36,20 @@ async fn profile_serve(matches: &ArgMatches) -> Result<()> {
use ipfs::MultiaddrWithPeerId;
let name = matches.value_of("name").map(String::from).unwrap(); // required
- let connect_peer = matches.value_of("connect").map(|s| {
- s.parse::<MultiaddrWithPeerId>()
- .map_err(anyhow::Error::from)
- }).transpose()?;
+ let listen_addrs = matches.values_of("listen")
+ .map(|v| {
+ v.map(|s| s.parse::<ipfs::Multiaddr>().map_err(anyhow::Error::from))
+ .collect::<Result<Vec<_>>>()
+ })
+ .transpose()?;
+ let connect_peer = matches.values_of("connect")
+ .map(|v| {
+ v.map(|s| {
+ s.parse::<MultiaddrWithPeerId>().map_err(anyhow::Error::from)
+ })
+ .collect::<Result<Vec<_>>>()
+ })
+ .transpose()?;
let state_dir = Profile::state_dir_path(&name)?;
@@ -50,21 +60,69 @@ async fn profile_serve(matches: &ArgMatches) -> Result<()> {
log::info!("Profile HEAD = {}", head);
}
+ if let Some(listen) = listen_addrs {
+ for l in listen {
+ log::debug!("Adding listening address: {}", l);
+ profile.listen_on(l).await?;
+ }
+ }
+
+ {
+ let addrs = profile.client().own_addresses().await?;
+ if addrs.is_empty() {
+ log::error!("No own address");
+ } else {
+ for addr in addrs {
+ log::info!("Own addr: {}", addr);
+ }
+ }
+ }
+
if let Some(connect_to) = connect_peer {
- log::info!("Connecting to {:?}", connect_to);
- profile.connect(connect_to).await?;
+ for c in connect_to {
+ log::info!("Connecting to {:?}", c);
+ profile.connect(c).await?;
+ }
}
+ let mut gossip_channel = Box::pin({
+ profile.client()
+ .pubsub_subscribe("distrox".to_string())
+ .await
+ .map(|stream| {
+ use distrox_lib::gossip::deserializer::GossipDeserializer;
+ use distrox_lib::gossip::deserializer::LogStrategy;
+
+ GossipDeserializer::<LogStrategy>::new().run(stream)
+ })?
+ });
+
let running = Arc::new(AtomicBool::new(true));
let r = running.clone();
+ let own_peer_id = profile.client().own_id().await?;
+
ctrlc::set_handler(move || {
r.store(false, Ordering::SeqCst);
}).context("Error setting Ctrl-C handler")?;
log::info!("Serving...");
while running.load(Ordering::SeqCst) {
- tokio::time::sleep(std::time::Duration::from_millis(500)).await // sleep not so busy
+ use futures::stream::StreamExt;
+ use distrox_lib::gossip::GossipMessage;
+
+ tokio::time::sleep(std::time::Duration::from_millis(500)).await; // sleep not so busy
+
+ tokio::select! {
+ own = profile.gossip_own_state("distrox".to_string()) => own?,
+ other = gossip_channel.next() => {
+ let gossip_myself = other.as_ref().map(|(source, _)| *source == own_peer_id).unwrap_or(false);
+
+ if !gossip_myself {
+ log::trace!("Received gossip: {:?}", other);
+ }
+ }
+ }
}
log::info!("Shutting down...");
profile.exit().await
diff --git a/lib/src/client.rs b/lib/src/client.rs
index 2128f76..fadcd77 100644
--- a/lib/src/client.rs
+++ b/lib/src/client.rs
@@ -37,6 +37,22 @@ impl Client {
.map_err(anyhow::Error::from)
}
+ pub async fn own_addresses(&self) -> Result<Vec<ipfs::Multiaddr>> {
+ self.ipfs
+ .identity()
+ .await
+ .map(|(_, addrs)| addrs)
+ .map_err(anyhow::Error::from)
+ }
+
+ pub async fn listen_on(&self, addr: ipfs::Multiaddr) -> Result<()> {
+ self.ipfs
+ .add_listening_address(addr)
+ .await
+ .map(|_| ())
+ .map_err(anyhow::Error::from)
+ }
+
pub async fn connect(&self, peer: ipfs::MultiaddrWithPeerId) -> Result<()> {
self.ipfs.connect(peer).await
}
diff --git a/lib/src/profile/mod.rs b/lib/src/profile/mod.rs
index 3b1e063..00087f3 100644
--- a/lib/src/profile/mod.rs
+++ b/lib/src/profile/mod.rs
@@ -63,6 +63,10 @@ impl Profile {
self.state.profile_head().as_ref()
}
+ pub async fn listen_on(&self, addr: ipfs::Multiaddr) -> Result<()> {
+ self.client.listen_on(addr).await
+ }
+
pub async fn connect(&self, peer: ipfs::MultiaddrWithPeerId) -> Result<()> {
self.client.connect(peer).await
}