diff options
author | Ferris Tseng <ferristseng@fastmail.fm> | 2017-10-11 18:12:45 -0400 |
---|---|---|
committer | Ferris Tseng <ferristseng@fastmail.fm> | 2017-10-11 18:12:45 -0400 |
commit | 96b5dee89ffa7d868c5145c5de55e8be94fccbc5 (patch) | |
tree | db9984988db56bea562687ddf8a552f63a60aa59 | |
parent | 9b93207e13a39e3667f26859deaadaa1640316e8 (diff) |
add ping command and example
-rw-r--r-- | ipfs-api/examples/ping_peer.rs | 53 | ||||
-rw-r--r-- | ipfs-api/src/client.rs | 92 | ||||
-rw-r--r-- | ipfs-api/src/request/mod.rs | 2 | ||||
-rw-r--r-- | ipfs-api/src/request/ping.rs | 17 | ||||
-rw-r--r-- | ipfs-api/src/response/ping.rs | 2 |
5 files changed, 146 insertions, 20 deletions
diff --git a/ipfs-api/examples/ping_peer.rs b/ipfs-api/examples/ping_peer.rs new file mode 100644 index 0000000..8bd1346 --- /dev/null +++ b/ipfs-api/examples/ping_peer.rs @@ -0,0 +1,53 @@ +extern crate futures; +extern crate ipfs_api; +extern crate tokio_core; + +use futures::stream::Stream; +use ipfs_api::IpfsClient; +use tokio_core::reactor::Core; + + +// Creates an Ipfs client, discovers a connected peer, and pings it using the +// streaming Api, and by collecting it into a collection. +// +fn main() { + if let Ok(mut core) = Core::new() { + println!("connecting to localhost:5001..."); + + let client = + IpfsClient::new(&core.handle(), "localhost", 5001).expect("expected a valid url"); + + println!(""); + println!("discovering connected peers..."); + + let connected = client.swarm_peers(); + let connected = core.run(connected).expect("expected a valid response"); + + let peer = connected.peers.iter().next().expect( + "expected at least one peer", + ); + + println!(""); + println!("discovered peer ({})", peer.peer); + println!(""); + println!("streaming 10 pings..."); + let req = client.ping(&peer.peer[..], Some(10)); + + core.run(req.for_each(|ping| { + println!("{:?}", ping); + Ok(()) + })).expect("expected a valid response"); + + println!(""); + println!("gathering 15 pings..."); + + let req = client.ping(&peer.peer[..], Some(15)); + let pings: Vec<_> = core.run(req.collect()).expect("expected a valid response"); + + for ping in pings.iter() { + println!("got response ({:?}) in ({})...", ping.text, ping.time); + } + } else { + println!("failed to create event loop"); + } +} diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs index 5bf95a9..a7b89db 100644 --- a/ipfs-api/src/client.rs +++ b/ipfs-api/src/client.rs @@ -8,6 +8,7 @@ use serde::{Deserialize, Serialize}; use serde_json; use std::io::Read; use tokio_core::reactor::Handle; +use tokio_io::{AsyncRead, io as async_io}; /// A future response returned by the reqwest HTTP client. @@ -15,6 +16,11 @@ use tokio_core::reactor::Handle; type AsyncResponse<T> = Box<Future<Item = T, Error = Error>>; +/// A future that returns a stream of responses. +/// +type AsyncStreamResponse<T> = Box<Stream<Item = T, Error = Error>>; + + /// Asynchronous Ipfs client. /// pub struct IpfsClient { @@ -47,18 +53,20 @@ impl IpfsClient { /// Builds the url for an api call. /// - fn build_url<Req>(&self, req: &Req) -> Result<Url, Error> + fn build_base_request<Req>(&self, req: &Req) -> Result<async::RequestBuilder, Error> where Req: ApiRequest + Serialize, { - let uri = format!( + let url = format!( "{}{}?{}", self.base, Req::path(), ::serde_urlencoded::to_string(req)? ); - uri.parse().map_err(From::from) + url.parse::<Url>() + .map(|url| self.client.request(Method::Get, url)) + .map_err(From::from) } /// Processes a response, returning an error or a deserialized json response. @@ -120,10 +128,9 @@ impl IpfsClient { where Req: ApiRequest + Serialize, { - let res = self.build_url(req) - .map(|url| self.client.request(Method::Get, url)) - .into_future() - .and_then(|req| IpfsClient::send_request(req)); + let res = self.build_base_request(req).into_future().and_then(|req| { + IpfsClient::send_request(req) + }); Box::new(res) } @@ -136,10 +143,9 @@ impl IpfsClient { Req: ApiRequest + Serialize, for<'de> Res: 'static + Deserialize<'de>, { - let res = self.build_url(req) - .map(|url| self.client.request(Method::Get, url)) - .into_future() - .and_then(|req| IpfsClient::send_request_json(req)); + let res = self.build_base_request(req).into_future().and_then(|req| { + IpfsClient::send_request_json(req) + }); Box::new(res) } @@ -153,13 +159,30 @@ impl IpfsClient { for<'de> Res: 'static + Deserialize<'de>, R: 'static + Read + Send, { - let res = self.build_url(req) - .map(|url| self.client.request(Method::Get, url)) - .into_future() - .and_then(move |req| { + let res = self.build_base_request(req).into_future().and_then( + move |req| { let form = multipart::Form::new().part("file", multipart::Part::reader(data)); IpfsClient::send_request_json(req) - }); + }, + ); + + Box::new(res) + } + + /// Generic method to return a streaming response of deserialized json + /// objects delineated by new line separators. + /// + fn request_stream<Req, Res>(&self, req: &Req) -> AsyncStreamResponse<Res> + where + Req: ApiRequest + Serialize, + for<'de> Res: 'static + Deserialize<'de>, + { + let res = self.build_base_request(req) + .into_future() + .and_then(|mut req| req.send().from_err()) + .map(|res| res.into_body().from_err()) + .flatten_stream() + .and_then(|chunk| serde_json::from_slice(&chunk).map_err(From::from)); Box::new(res) } @@ -236,7 +259,11 @@ impl IpfsClient { /// Returns the diff of two Ipfs objects. /// - pub fn object_diff(&self, key0: &str, key1: &str) -> AsyncResponse<response::ObjectDiffResponse> { + pub fn object_diff( + &self, + key0: &str, + key1: &str, + ) -> AsyncResponse<response::ObjectDiffResponse> { self.request(&request::ObjectDiff { key0, key1 }) } @@ -270,10 +297,24 @@ impl IpfsClient { /// Removes a pinned object from local storage. /// - pub fn pin_rm(&self, key: &str, recursive: Option<bool>) -> AsyncResponse<response::PinRmResponse> { + pub fn pin_rm( + &self, + key: &str, + recursive: Option<bool>, + ) -> AsyncResponse<response::PinRmResponse> { self.request(&request::PinRm { key, recursive }) } + /// Pings a peer. + /// + pub fn ping( + &self, + peer: &str, + count: Option<usize>, + ) -> AsyncStreamResponse<response::PingResponse> { + self.request_stream(&request::Ping { peer, count }) + } + /// List subscribed pubsub topics. /// pub fn pubsub_ls(&self) -> AsyncResponse<response::PubsubLsResponse> { @@ -282,10 +323,23 @@ impl IpfsClient { /// List peers that are being published to. /// - pub fn pubsub_peers(&self, topic: Option<&str>) -> AsyncResponse<response::PubsubPeersResponse> { + pub fn pubsub_peers( + &self, + topic: Option<&str>, + ) -> AsyncResponse<response::PubsubPeersResponse> { self.request(&request::PubsubPeers { topic }) } + /// Publish a message to a topic. + /// + pub fn pubsub_pub( + &self, + topic: &str, + payload: &str, + ) -> AsyncResponse<response::PubsubPubResponse> { + self.request(&request::PubsubPub { topic, payload }) + } + /// Returns bitswap stats. /// pub fn stats_bitswap(&self) -> AsyncResponse<response::StatsBitswapResponse> { diff --git a/ipfs-api/src/request/mod.rs b/ipfs-api/src/request/mod.rs index 73bb206..79daa32 100644 --- a/ipfs-api/src/request/mod.rs +++ b/ipfs-api/src/request/mod.rs @@ -6,6 +6,7 @@ pub use self::dag::*; pub use self::ls::*; pub use self::object::*; pub use self::pin::*; +pub use self::ping::*; pub use self::pubsub::*; pub use self::stats::*; pub use self::swarm::*; @@ -54,6 +55,7 @@ mod dag; mod ls; mod object; mod pin; +mod ping; mod pubsub; mod stats; mod swarm; diff --git a/ipfs-api/src/request/ping.rs b/ipfs-api/src/request/ping.rs new file mode 100644 index 0000000..89407d3 --- /dev/null +++ b/ipfs-api/src/request/ping.rs @@ -0,0 +1,17 @@ +use request::ApiRequest; + + +#[derive(Serialize)] +pub struct Ping<'a> { + #[serde(rename = "arg")] + pub peer: &'a str, + + pub count: Option<usize>, +} + +impl<'a> ApiRequest for Ping<'a> { + #[inline] + fn path() -> &'static str { + "/ping" + } +} diff --git a/ipfs-api/src/response/ping.rs b/ipfs-api/src/response/ping.rs index 76ecc5c..b5740b3 100644 --- a/ipfs-api/src/response/ping.rs +++ b/ipfs-api/src/response/ping.rs @@ -1,4 +1,4 @@ -#[derive(Deserialize)] +#[derive(Debug, Deserialize)] #[serde(rename_all = "PascalCase")] pub struct PingResponse { pub success: bool, |