diff options
Diffstat (limited to 'ipfs-api-examples/examples/pubsub.rs')
-rw-r--r-- | ipfs-api-examples/examples/pubsub.rs | 82 |
1 files changed, 82 insertions, 0 deletions
diff --git a/ipfs-api-examples/examples/pubsub.rs b/ipfs-api-examples/examples/pubsub.rs new file mode 100644 index 0000000..8944f3c --- /dev/null +++ b/ipfs-api-examples/examples/pubsub.rs @@ -0,0 +1,82 @@ +// Copyright 2017 rust-ipfs-api Developers +// +// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or +// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or +// http://opensource.org/licenses/MIT>, at your option. This file may not be +// copied, modified, or distributed except according to those terms. +// + +use futures::{future, select, FutureExt, StreamExt, TryStreamExt}; +use ipfs_api_examples::ipfs_api::{IpfsApi, IpfsClient}; +use std::time::Duration; +use tokio::time; +use tokio_stream::wrappers::IntervalStream; + +static TOPIC: &'static str = "test"; + +fn get_client() -> IpfsClient { + eprintln!("connecting to localhost:5001..."); + + IpfsClient::default() +} + +// Creates an Ipfs client, and simultaneously publishes and reads from a pubsub +// topic. +// +#[ipfs_api_examples::main] +async fn main() { + tracing_subscriber::fmt::init(); + + eprintln!("note: ipfs must be run with the --enable-pubsub-experiment flag"); + + let publish_client = get_client(); + + // This block will execute a repeating function that sends + // a message to the "test" topic. + // + let interval = time::interval(Duration::from_secs(1)); + let mut publish = IntervalStream::new(interval) + .then(|_| future::ok(())) // Coerce the stream into a TryStream + .try_for_each(|_| { + eprintln!(); + eprintln!("publishing message..."); + + publish_client + .pubsub_pub(TOPIC, "Hello World!") + .boxed_local() + }) + .boxed_local() + .fuse(); + + // This block will execute a future that suscribes to a topic, + // and reads any incoming messages. + // + let mut subscribe = { + let client = get_client(); + + client + .pubsub_sub(TOPIC, false) + .take(5) + .try_for_each(|msg| { + eprintln!(); + eprintln!("received ({:?})", msg); + + future::ok(()) + }) + .fuse() + }; + + eprintln!(); + eprintln!("publish messages to ({})...", TOPIC); + eprintln!("waiting for messages from ({})...", TOPIC); + + select! { + res = publish => if let Err(e) = res { + eprintln!("error publishing messages: {}", e); + }, + res = subscribe => match res { + Ok(_) => eprintln!("done reading messages..."), + Err(e) => eprintln!("error reading messages: {}", e) + }, + } +} |