diff options
author | Ferris Tseng <ferristseng@fastmail.fm> | 2021-02-23 23:26:48 -0500 |
---|---|---|
committer | Ferris Tseng <ferristseng@fastmail.fm> | 2021-02-23 23:26:48 -0500 |
commit | 1dbab207ece88a47f92b951a83ef8d95dc40fe1c (patch) | |
tree | e93fe962328364bde9e5ac2bbdb80c62746ff28a /ipfs-api-examples/examples/pubsub.rs | |
parent | fd4a1e309d565d88c5e394e9e8411f4968499d18 (diff) |
migrate examples to separate crate
Diffstat (limited to 'ipfs-api-examples/examples/pubsub.rs')
-rw-r--r-- | ipfs-api-examples/examples/pubsub.rs | 84 |
1 files changed, 84 insertions, 0 deletions
diff --git a/ipfs-api-examples/examples/pubsub.rs b/ipfs-api-examples/examples/pubsub.rs new file mode 100644 index 0000000..0311ed7 --- /dev/null +++ b/ipfs-api-examples/examples/pubsub.rs @@ -0,0 +1,84 @@ +// Copyright 2017 rust-ipfs-api Developers +// +// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or +// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or +// http://opensource.org/licenses/MIT>, at your option. This file may not be +// copied, modified, or distributed except according to those terms. +// + +use futures::{future, select, FutureExt, StreamExt, TryStreamExt}; +use ipfs_api_examples::{ + ipfs_api::{IpfsApi, IpfsClient}, + tokio::time, +}; +use std::time::Duration; +use tokio_stream::wrappers::IntervalStream; + +static TOPIC: &'static str = "test"; + +fn get_client() -> IpfsClient { + eprintln!("connecting to localhost:5001..."); + + IpfsClient::default() +} + +// Creates an Ipfs client, and simultaneously publishes and reads from a pubsub +// topic. +// +#[ipfs_api_examples::main] +async fn main() { + tracing_subscriber::fmt::init(); + + eprintln!("note: ipfs must be run with the --enable-pubsub-experiment flag"); + + let publish_client = get_client(); + + // This block will execute a repeating function that sends + // a message to the "test" topic. + // + let interval = time::interval(Duration::from_secs(1)); + let mut publish = IntervalStream::new(interval) + .then(|_| future::ok(())) // Coerce the stream into a TryStream + .try_for_each(|_| { + eprintln!(); + eprintln!("publishing message..."); + + publish_client + .pubsub_pub(TOPIC, "Hello World!") + .boxed_local() + }) + .boxed_local() + .fuse(); + + // This block will execute a future that suscribes to a topic, + // and reads any incoming messages. + // + let mut subscribe = { + let client = get_client(); + + client + .pubsub_sub(TOPIC, false) + .take(5) + .try_for_each(|msg| { + eprintln!(); + eprintln!("received ({:?})", msg); + + future::ok(()) + }) + .fuse() + }; + + eprintln!(); + eprintln!("publish messages to ({})...", TOPIC); + eprintln!("waiting for messages from ({})...", TOPIC); + + select! { + res = publish => if let Err(e) = res { + eprintln!("error publishing messages: {}", e); + }, + res = subscribe => match res { + Ok(_) => eprintln!("done reading messages..."), + Err(e) => eprintln!("error reading messages: {}", e) + }, + } +} |