diff options
Diffstat (limited to 'ipfs-api/examples/pubsub.rs')
-rw-r--r-- | ipfs-api/examples/pubsub.rs | 80 |
1 files changed, 45 insertions, 35 deletions
diff --git a/ipfs-api/examples/pubsub.rs b/ipfs-api/examples/pubsub.rs index 924eab6..465b86b 100644 --- a/ipfs-api/examples/pubsub.rs +++ b/ipfs-api/examples/pubsub.rs @@ -6,16 +6,18 @@ // copied, modified, or distributed except according to those terms. // -use futures::{Future, Stream}; +#[macro_use] +extern crate futures; + +use futures::{future, FutureExt, StreamExt, TryStreamExt}; use ipfs_api::IpfsClient; -use std::time::{Duration, Instant}; -use tokio::runtime::current_thread::Runtime; -use tokio_timer::Interval; +use std::time::Duration; +use tokio::time; static TOPIC: &'static str = "test"; fn get_client() -> IpfsClient { - println!("connecting to localhost:5001..."); + eprintln!("connecting to localhost:5001..."); IpfsClient::default() } @@ -23,50 +25,58 @@ fn get_client() -> IpfsClient { // Creates an Ipfs client, and simultaneously publishes and reads from a pubsub // topic. // -fn main() { - let mut rt = Runtime::new().expect("tokio runtime"); +#[cfg_attr(feature = "actix", actix_rt::main)] +#[cfg_attr(feature = "hyper", tokio::main)] +async fn main() { + eprintln!("note: ipfs must be run with the --enable-pubsub-experiment flag"); + + let publish_client = get_client(); // This block will execute a repeating function that sends // a message to the "test" topic. // - { - let client = get_client(); - let publish = Interval::new(Instant::now(), Duration::from_secs(1)) - .map_err(|e| eprintln!("{}", e)) - .for_each(move |_| { - println!(); - println!("publishing message..."); - - client - .pubsub_pub(TOPIC, "Hello World!") - .map_err(|e| eprintln!("{}", e)) - }); + let mut publish = time::interval(Duration::from_secs(1)) + .then(|_| future::ok(())) // Coerce the stream into a TryStream + .try_for_each(|_| { + eprintln!(); + eprintln!("publishing message..."); - println!(); - println!("starting task to publish messages to ({})...", TOPIC); - - rt.spawn(publish); - } + publish_client + .pubsub_pub(TOPIC, "Hello World!") + .boxed_local() + }) + .boxed_local() + .fuse(); // This block will execute a future that suscribes to a topic, // and reads any incoming messages. // - { + let mut subscribe = { let client = get_client(); - let req = client.pubsub_sub(TOPIC, false); - println!(); - println!("waiting for messages on ({})...", TOPIC); - let fut = req + client + .pubsub_sub(TOPIC, false) .take(5) - .for_each(|msg| { - println!(); - println!("received ({:?})", msg); + .try_for_each(|msg| { + eprintln!(); + eprintln!("received ({:?})", msg); - Ok(()) + future::ok(()) }) - .map_err(|e| eprintln!("{}", e)); + .fuse() + }; + + eprintln!(); + eprintln!("publish messages to ({})...", TOPIC); + eprintln!("waiting for messages from ({})...", TOPIC); - rt.block_on(fut).expect("successful response"); + select! { + res = publish => if let Err(e) = res { + eprintln!("error publishing messages: {}", e); + }, + res = subscribe => match res { + Ok(_) => eprintln!("done reading messages..."), + Err(e) => eprintln!("error reading messages: {}", e) + }, } } |