From d8e0af6e5c2f4c37763ed9e22adae6b2ce2bd016 Mon Sep 17 00:00:00 2001 From: Ferris Tseng Date: Thu, 12 Oct 2017 20:34:35 -0400 Subject: refactor examples; add pubsub example --- ipfs-api/examples/add_file.rs | 21 ++++----- ipfs-api/examples/bootstrap_default.rs | 70 ++++++++++++++-------------- ipfs-api/examples/get_commands.rs | 16 ++----- ipfs-api/examples/get_stats.rs | 83 ++++++++++++++++------------------ ipfs-api/examples/get_swarm.rs | 50 ++++++++++---------- ipfs-api/examples/get_version.rs | 17 +++---- ipfs-api/examples/ping_peer.rs | 55 ++++++++++------------ ipfs-api/examples/pubsub.rs | 76 +++++++++++++++++++++++++++++++ 8 files changed, 215 insertions(+), 173 deletions(-) create mode 100644 ipfs-api/examples/pubsub.rs (limited to 'ipfs-api/examples') diff --git a/ipfs-api/examples/add_file.rs b/ipfs-api/examples/add_file.rs index 2dccfa9..da991cf 100644 --- a/ipfs-api/examples/add_file.rs +++ b/ipfs-api/examples/add_file.rs @@ -5,22 +5,17 @@ use ipfs_api::IpfsClient; use std::fs::File; use tokio_core::reactor::Core; - // Creates an Ipfs client, and adds this source file to Ipfs. // fn main() { - if let Ok(mut core) = Core::new() { - println!("note: this must be run in the root of the project repository"); - println!("connecting to localhost:5001..."); + println!("note: this must be run in the root of the project repository"); + println!("connecting to localhost:5001..."); - let client = - IpfsClient::new(&core.handle(), "localhost", 5001).expect("expected a valid url"); - let file = File::open(file!()).expect("could not read source file"); - let req = client.add(file); - let add = core.run(req).expect("expected a valid response"); + let mut core = Core::new().expect("expected event loop"); + let client = IpfsClient::default(&core.handle()); + let file = File::open(file!()).expect("could not read source file"); + let req = client.add(file); + let add = core.run(req).expect("expected a valid response"); - println!("added file: {:?}", add); - } else { - println!("failed to create event loop"); - } + println!("added file: {:?}", add); } diff --git a/ipfs-api/examples/bootstrap_default.rs b/ipfs-api/examples/bootstrap_default.rs index d845d88..90126b5 100644 --- a/ipfs-api/examples/bootstrap_default.rs +++ b/ipfs-api/examples/bootstrap_default.rs @@ -4,46 +4,42 @@ extern crate tokio_core; use ipfs_api::IpfsClient; use tokio_core::reactor::Core; - // Lists clients in bootstrap list, then adds the default list, then removes // them, and readds them. // fn main() { - if let Ok(mut core) = Core::new() { - println!("connecting to localhost:5001..."); - - let client = - IpfsClient::new(&core.handle(), "localhost", 5001).expect("expected a valid url"); - let bootstrap = client.bootstrap_list(); - let bootstrap = core.run(bootstrap).expect("expected a valid response"); - - println!("current bootstrap peers:"); - for peer in bootstrap.peers { - println!(" {}", peer); - } - - println!(""); - println!("dropping all bootstrap peers..."); - - let drop = client.bootstrap_rm_all(); - let drop = core.run(drop).expect("expected a valid response"); - - println!("dropped:"); - for peer in drop.peers { - println!(" {}", peer); - } - - println!(""); - println!("adding default peers..."); - - let add = client.bootstrap_add_default(); - let add = core.run(add).expect("expected a valid response"); - - println!("added:"); - for peer in add.peers { - println!(" {}", peer); - } - } else { - println!("failed to create event loop"); + println!("connecting to localhost:5001..."); + + let mut core = Core::new().expect("expected event loop"); + let client = IpfsClient::default(&core.handle()); + + let bootstrap = client.bootstrap_list(); + let bootstrap = core.run(bootstrap).expect("expected a valid response"); + + println!("current bootstrap peers:"); + for peer in bootstrap.peers { + println!(" {}", peer); + } + + println!(""); + println!("dropping all bootstrap peers..."); + + let drop = client.bootstrap_rm_all(); + let drop = core.run(drop).expect("expected a valid response"); + + println!("dropped:"); + for peer in drop.peers { + println!(" {}", peer); + } + + println!(""); + println!("adding default peers..."); + + let add = client.bootstrap_add_default(); + let add = core.run(add).expect("expected a valid response"); + + println!("added:"); + for peer in add.peers { + println!(" {}", peer); } } diff --git a/ipfs-api/examples/get_commands.rs b/ipfs-api/examples/get_commands.rs index 1d849f0..ecf5b0e 100644 --- a/ipfs-api/examples/get_commands.rs +++ b/ipfs-api/examples/get_commands.rs @@ -4,7 +4,6 @@ extern crate tokio_core; use ipfs_api::{response, IpfsClient}; use tokio_core::reactor::Core; - fn print_recursive(indent: usize, cmd: &response::CommandsResponse) { let cmd_indent = " ".repeat(indent * 4); let opt_indent = " ".repeat((indent + 1) * 4); @@ -26,20 +25,15 @@ fn print_recursive(indent: usize, cmd: &response::CommandsResponse) { } } - // Creates an Ipfs client, and gets a list of available commands from the // Ipfs server. // fn main() { - if let Ok(mut core) = Core::new() { - println!("connecting to localhost:5001..."); + println!("connecting to localhost:5001..."); - let client = - IpfsClient::new(&core.handle(), "localhost", 5001).expect("expected a valid url"); - let req = client.commands(); + let mut core = Core::new().expect("expected event loop"); + let client = IpfsClient::default(&core.handle()); + let req = client.commands(); - print_recursive(0, &core.run(req).expect("expected a valid response")); - } else { - println!("failed to create event loop"); - } + print_recursive(0, &core.run(req).expect("expected a valid response")); } diff --git a/ipfs-api/examples/get_stats.rs b/ipfs-api/examples/get_stats.rs index a0cde97..d3c7ea1 100644 --- a/ipfs-api/examples/get_stats.rs +++ b/ipfs-api/examples/get_stats.rs @@ -4,51 +4,46 @@ extern crate tokio_core; use ipfs_api::IpfsClient; use tokio_core::reactor::Core; - // Creates an Ipfs client, and gets some stats about the Ipfs server. // fn main() { - if let Ok(mut core) = Core::new() { - println!("connecting to localhost:5001..."); - - let client = - IpfsClient::new(&core.handle(), "localhost", 5001).expect("expected a valid url"); - - let bitswap_stats = client.stats_bitswap(); - let bitswap_stats = core.run(bitswap_stats).expect("expected a valid response"); - - let bw_stats = client.stats_bw(); - let bw_stats = core.run(bw_stats).expect("expected a valid response"); - - let repo_stats = client.stats_repo(); - let repo_stats = core.run(repo_stats).expect("expected a valid response"); - - println!("bitswap stats:"); - println!(" blocks recv: {}", bitswap_stats.blocks_received); - println!(" data recv: {}", bitswap_stats.data_received); - println!(" blocks sent: {}", bitswap_stats.blocks_sent); - println!(" data sent: {}", bitswap_stats.data_sent); - println!( - " peers: {}", - bitswap_stats.peers.join("\n ") - ); - println!( - " wantlist: {}", - bitswap_stats.wantlist.join("\n ") - ); - println!(""); - println!("bandwidth stats:"); - println!(" total in: {}", bw_stats.total_in); - println!(" total out: {}", bw_stats.total_out); - println!(" rate in: {}", bw_stats.rate_in); - println!(" rate out: {}", bw_stats.rate_out); - println!(""); - println!("repo stats:"); - println!(" num objs: {}", repo_stats.num_objects); - println!(" repo size: {}", repo_stats.repo_size); - println!(" repo path: {}", repo_stats.repo_path); - println!(" version : {}", repo_stats.version); - } else { - println!("failed to create event loop"); - } + println!("connecting to localhost:5001..."); + + let mut core = Core::new().expect("expected event loop"); + let client = IpfsClient::default(&core.handle()); + + let bitswap_stats = client.stats_bitswap(); + let bitswap_stats = core.run(bitswap_stats).expect("expected a valid response"); + + let bw_stats = client.stats_bw(); + let bw_stats = core.run(bw_stats).expect("expected a valid response"); + + let repo_stats = client.stats_repo(); + let repo_stats = core.run(repo_stats).expect("expected a valid response"); + + println!("bitswap stats:"); + println!(" blocks recv: {}", bitswap_stats.blocks_received); + println!(" data recv: {}", bitswap_stats.data_received); + println!(" blocks sent: {}", bitswap_stats.blocks_sent); + println!(" data sent: {}", bitswap_stats.data_sent); + println!( + " peers: {}", + bitswap_stats.peers.join("\n ") + ); + println!( + " wantlist: {}", + bitswap_stats.wantlist.join("\n ") + ); + println!(""); + println!("bandwidth stats:"); + println!(" total in: {}", bw_stats.total_in); + println!(" total out: {}", bw_stats.total_out); + println!(" rate in: {}", bw_stats.rate_in); + println!(" rate out: {}", bw_stats.rate_out); + println!(""); + println!("repo stats:"); + println!(" num objs: {}", repo_stats.num_objects); + println!(" repo size: {}", repo_stats.repo_size); + println!(" repo path: {}", repo_stats.repo_path); + println!(" version : {}", repo_stats.version); } diff --git a/ipfs-api/examples/get_swarm.rs b/ipfs-api/examples/get_swarm.rs index 6ee2d1a..34a4096 100644 --- a/ipfs-api/examples/get_swarm.rs +++ b/ipfs-api/examples/get_swarm.rs @@ -4,40 +4,36 @@ extern crate tokio_core; use ipfs_api::IpfsClient; use tokio_core::reactor::Core; - // Creates an Ipfs client, and gets information about your local address, and // connected peers. // fn main() { - if let Ok(mut core) = Core::new() { - println!("connecting to localhost:5001..."); - - let client = - IpfsClient::new(&core.handle(), "localhost", 5001).expect("expected a valid url"); + println!("connecting to localhost:5001..."); - let local = client.swarm_addrs_local(); - let local = core.run(local).expect("expected a valid response"); + let mut core = Core::new().expect("expected event loop"); + let client = IpfsClient::default(&core.handle()); - println!("your addrs:"); - for addr in local.strings { - println!(" {}", addr); - } - println!(""); + let local = client.swarm_addrs_local(); + let local = core.run(local).expect("expected a valid response"); - let connected = client.swarm_peers(); - let connected = core.run(connected).expect("expected a valid response"); + println!(""); + println!("your addrs:"); + for addr in local.strings { + println!(" {}", addr); + } - println!("connected:"); - for peer in connected.peers { - let streams: Vec<&str> = peer.streams.iter().map(|s| &s.protocol[..]).collect(); - println!(" addr: {}", peer.addr); - println!(" peer: {}", peer.peer); - println!(" latency: {}", peer.latency); - println!(" muxer: {}", peer.muxer); - println!(" streams: {}", streams.join(", ")); - println!(""); - } - } else { - println!("failed to create event loop"); + let connected = client.swarm_peers(); + let connected = core.run(connected).expect("expected a valid response"); + + println!(""); + println!("connected:"); + for peer in connected.peers { + let streams: Vec<&str> = peer.streams.iter().map(|s| &s.protocol[..]).collect(); + println!(" addr: {}", peer.addr); + println!(" peer: {}", peer.peer); + println!(" latency: {}", peer.latency); + println!(" muxer: {}", peer.muxer); + println!(" streams: {}", streams.join(", ")); + println!(""); } } diff --git a/ipfs-api/examples/get_version.rs b/ipfs-api/examples/get_version.rs index 8f743ab..4eaf7cc 100644 --- a/ipfs-api/examples/get_version.rs +++ b/ipfs-api/examples/get_version.rs @@ -4,20 +4,15 @@ extern crate tokio_core; use ipfs_api::IpfsClient; use tokio_core::reactor::Core; - // Creates an Ipfs client, and gets the version of the Ipfs server. // fn main() { - if let Ok(mut core) = Core::new() { - println!("connecting to localhost:5001..."); + println!("connecting to localhost:5001..."); - let client = - IpfsClient::new(&core.handle(), "localhost", 5001).expect("expected a valid url"); - let req = client.version(); - let version = core.run(req).expect("expected a valid response"); + let mut core = Core::new().expect("expected event loop"); + let client = IpfsClient::default(&core.handle()); + let req = client.version(); + let version = core.run(req).expect("expected a valid response"); - println!("version: {:?}", version.version); - } else { - println!("failed to create event loop"); - } + println!("version: {:?}", version.version); } diff --git a/ipfs-api/examples/ping_peer.rs b/ipfs-api/examples/ping_peer.rs index 8bd1346..a4d5f5a 100644 --- a/ipfs-api/examples/ping_peer.rs +++ b/ipfs-api/examples/ping_peer.rs @@ -6,48 +6,43 @@ use futures::stream::Stream; use ipfs_api::IpfsClient; use tokio_core::reactor::Core; - // Creates an Ipfs client, discovers a connected peer, and pings it using the // streaming Api, and by collecting it into a collection. // fn main() { - if let Ok(mut core) = Core::new() { - println!("connecting to localhost:5001..."); + println!("connecting to localhost:5001..."); - let client = - IpfsClient::new(&core.handle(), "localhost", 5001).expect("expected a valid url"); + let mut core = Core::new().expect("expected event loop"); + let client = IpfsClient::default(&core.handle()); - println!(""); - println!("discovering connected peers..."); + println!(""); + println!("discovering connected peers..."); - let connected = client.swarm_peers(); - let connected = core.run(connected).expect("expected a valid response"); + let connected = client.swarm_peers(); + let connected = core.run(connected).expect("expected a valid response"); - let peer = connected.peers.iter().next().expect( - "expected at least one peer", - ); + let peer = connected.peers.iter().next().expect( + "expected at least one peer", + ); - println!(""); - println!("discovered peer ({})", peer.peer); - println!(""); - println!("streaming 10 pings..."); - let req = client.ping(&peer.peer[..], Some(10)); + println!(""); + println!("discovered peer ({})", peer.peer); + println!(""); + println!("streaming 10 pings..."); + let req = client.ping(&peer.peer[..], Some(10)); - core.run(req.for_each(|ping| { - println!("{:?}", ping); - Ok(()) - })).expect("expected a valid response"); + core.run(req.for_each(|ping| { + println!("{:?}", ping); + Ok(()) + })).expect("expected a valid response"); - println!(""); - println!("gathering 15 pings..."); + println!(""); + println!("gathering 15 pings..."); - let req = client.ping(&peer.peer[..], Some(15)); - let pings: Vec<_> = core.run(req.collect()).expect("expected a valid response"); + let req = client.ping(&peer.peer[..], Some(15)); + let pings: Vec<_> = core.run(req.collect()).expect("expected a valid response"); - for ping in pings.iter() { - println!("got response ({:?}) in ({})...", ping.text, ping.time); - } - } else { - println!("failed to create event loop"); + for ping in pings.iter() { + println!("got response ({:?}) in ({})...", ping.text, ping.time); } } diff --git a/ipfs-api/examples/pubsub.rs b/ipfs-api/examples/pubsub.rs new file mode 100644 index 0000000..9948fbb --- /dev/null +++ b/ipfs-api/examples/pubsub.rs @@ -0,0 +1,76 @@ +extern crate futures; +extern crate ipfs_api; +extern crate tokio_core; +extern crate tokio_timer; + +use futures::Future; +use futures::stream::Stream; +use ipfs_api::{response, IpfsClient}; +use std::thread; +use std::time::Duration; +use tokio_core::reactor::{Core, Handle}; +use tokio_timer::Timer; + +static TOPIC: &'static str = "test"; + +fn get_client(handle: &Handle) -> IpfsClient { + println!("connecting to localhost:5001..."); + + IpfsClient::default(handle) +} + +// Creates an Ipfs client, and simultaneously publishes and reads from a pubsub +// topic. +// +fn main() { + // This block will execute a repeating function that sends + // a message to the "test" topic. + // + thread::spawn(move || { + let mut event_loop = Core::new().expect("expected event loop"); + let handle = event_loop.handle(); + let timer = Timer::default(); + let publish = timer + .interval(Duration::from_secs(1)) + .map_err(|_| { + response::Error::Uncategorized("timeout error".to_string()) + }) + .for_each(move |_| { + println!(""); + println!("publishing message..."); + + get_client(&handle) + .pubsub_pub(TOPIC, "Hello World!") + .then(|_| { + println!("success"); + Ok(()) + }) + }); + + println!(""); + println!("starting task to publish messages to ({})...", TOPIC); + event_loop.run(publish).expect( + "expected the publish task to start", + ); + }); + + // This block will execute a future that suscribes to a topic, + // and reads any incoming messages. + // + { + let mut event_loop = Core::new().expect("expected event loop"); + let client = get_client(&event_loop.handle()); + let req = client.pubsub_sub(TOPIC, None); + + println!(""); + println!("waiting for messages on ({})...", TOPIC); + event_loop + .run(req.for_each(|msg| { + println!(""); + println!("received ({:?})", msg); + + Ok(()) + })) + .expect("expected a valid response"); + } +} -- cgit v1.2.3