summaryrefslogtreecommitdiffstats
path: root/ipfs-api/examples/pubsub.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ipfs-api/examples/pubsub.rs')
-rw-r--r--ipfs-api/examples/pubsub.rs80
1 files changed, 45 insertions, 35 deletions
diff --git a/ipfs-api/examples/pubsub.rs b/ipfs-api/examples/pubsub.rs
index 924eab6..465b86b 100644
--- a/ipfs-api/examples/pubsub.rs
+++ b/ipfs-api/examples/pubsub.rs
@@ -6,16 +6,18 @@
// copied, modified, or distributed except according to those terms.
//
-use futures::{Future, Stream};
+#[macro_use]
+extern crate futures;
+
+use futures::{future, FutureExt, StreamExt, TryStreamExt};
use ipfs_api::IpfsClient;
-use std::time::{Duration, Instant};
-use tokio::runtime::current_thread::Runtime;
-use tokio_timer::Interval;
+use std::time::Duration;
+use tokio::time;
static TOPIC: &'static str = "test";
fn get_client() -> IpfsClient {
- println!("connecting to localhost:5001...");
+ eprintln!("connecting to localhost:5001...");
IpfsClient::default()
}
@@ -23,50 +25,58 @@ fn get_client() -> IpfsClient {
// Creates an Ipfs client, and simultaneously publishes and reads from a pubsub
// topic.
//
-fn main() {
- let mut rt = Runtime::new().expect("tokio runtime");
+#[cfg_attr(feature = "actix", actix_rt::main)]
+#[cfg_attr(feature = "hyper", tokio::main)]
+async fn main() {
+ eprintln!("note: ipfs must be run with the --enable-pubsub-experiment flag");
+
+ let publish_client = get_client();
// This block will execute a repeating function that sends
// a message to the "test" topic.
//
- {
- let client = get_client();
- let publish = Interval::new(Instant::now(), Duration::from_secs(1))
- .map_err(|e| eprintln!("{}", e))
- .for_each(move |_| {
- println!();
- println!("publishing message...");
-
- client
- .pubsub_pub(TOPIC, "Hello World!")
- .map_err(|e| eprintln!("{}", e))
- });
+ let mut publish = time::interval(Duration::from_secs(1))
+ .then(|_| future::ok(())) // Coerce the stream into a TryStream
+ .try_for_each(|_| {
+ eprintln!();
+ eprintln!("publishing message...");
- println!();
- println!("starting task to publish messages to ({})...", TOPIC);
-
- rt.spawn(publish);
- }
+ publish_client
+ .pubsub_pub(TOPIC, "Hello World!")
+ .boxed_local()
+ })
+ .boxed_local()
+ .fuse();
// This block will execute a future that suscribes to a topic,
// and reads any incoming messages.
//
- {
+ let mut subscribe = {
let client = get_client();
- let req = client.pubsub_sub(TOPIC, false);
- println!();
- println!("waiting for messages on ({})...", TOPIC);
- let fut = req
+ client
+ .pubsub_sub(TOPIC, false)
.take(5)
- .for_each(|msg| {
- println!();
- println!("received ({:?})", msg);
+ .try_for_each(|msg| {
+ eprintln!();
+ eprintln!("received ({:?})", msg);
- Ok(())
+ future::ok(())
})
- .map_err(|e| eprintln!("{}", e));
+ .fuse()
+ };
+
+ eprintln!();
+ eprintln!("publish messages to ({})...", TOPIC);
+ eprintln!("waiting for messages from ({})...", TOPIC);
- rt.block_on(fut).expect("successful response");
+ select! {
+ res = publish => if let Err(e) = res {
+ eprintln!("error publishing messages: {}", e);
+ },
+ res = subscribe => match res {
+ Ok(_) => eprintln!("done reading messages..."),
+ Err(e) => eprintln!("error reading messages: {}", e)
+ },
}
}