summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFerris Tseng <ferristseng@fastmail.fm>2017-10-11 18:12:45 -0400
committerFerris Tseng <ferristseng@fastmail.fm>2017-10-11 18:12:45 -0400
commit96b5dee89ffa7d868c5145c5de55e8be94fccbc5 (patch)
treedb9984988db56bea562687ddf8a552f63a60aa59
parent9b93207e13a39e3667f26859deaadaa1640316e8 (diff)
add ping command and example
-rw-r--r--ipfs-api/examples/ping_peer.rs53
-rw-r--r--ipfs-api/src/client.rs92
-rw-r--r--ipfs-api/src/request/mod.rs2
-rw-r--r--ipfs-api/src/request/ping.rs17
-rw-r--r--ipfs-api/src/response/ping.rs2
5 files changed, 146 insertions, 20 deletions
diff --git a/ipfs-api/examples/ping_peer.rs b/ipfs-api/examples/ping_peer.rs
new file mode 100644
index 0000000..8bd1346
--- /dev/null
+++ b/ipfs-api/examples/ping_peer.rs
@@ -0,0 +1,53 @@
+extern crate futures;
+extern crate ipfs_api;
+extern crate tokio_core;
+
+use futures::stream::Stream;
+use ipfs_api::IpfsClient;
+use tokio_core::reactor::Core;
+
+
+// Creates an Ipfs client, discovers a connected peer, and pings it using the
+// streaming Api, and by collecting it into a collection.
+//
+fn main() {
+ if let Ok(mut core) = Core::new() {
+ println!("connecting to localhost:5001...");
+
+ let client =
+ IpfsClient::new(&core.handle(), "localhost", 5001).expect("expected a valid url");
+
+ println!("");
+ println!("discovering connected peers...");
+
+ let connected = client.swarm_peers();
+ let connected = core.run(connected).expect("expected a valid response");
+
+ let peer = connected.peers.iter().next().expect(
+ "expected at least one peer",
+ );
+
+ println!("");
+ println!("discovered peer ({})", peer.peer);
+ println!("");
+ println!("streaming 10 pings...");
+ let req = client.ping(&peer.peer[..], Some(10));
+
+ core.run(req.for_each(|ping| {
+ println!("{:?}", ping);
+ Ok(())
+ })).expect("expected a valid response");
+
+ println!("");
+ println!("gathering 15 pings...");
+
+ let req = client.ping(&peer.peer[..], Some(15));
+ let pings: Vec<_> = core.run(req.collect()).expect("expected a valid response");
+
+ for ping in pings.iter() {
+ println!("got response ({:?}) in ({})...", ping.text, ping.time);
+ }
+ } else {
+ println!("failed to create event loop");
+ }
+}
diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs
index 5bf95a9..a7b89db 100644
--- a/ipfs-api/src/client.rs
+++ b/ipfs-api/src/client.rs
@@ -8,6 +8,7 @@ use serde::{Deserialize, Serialize};
use serde_json;
use std::io::Read;
use tokio_core::reactor::Handle;
+use tokio_io::{AsyncRead, io as async_io};
/// A future response returned by the reqwest HTTP client.
@@ -15,6 +16,11 @@ use tokio_core::reactor::Handle;
type AsyncResponse<T> = Box<Future<Item = T, Error = Error>>;
+/// A future that returns a stream of responses.
+///
+type AsyncStreamResponse<T> = Box<Stream<Item = T, Error = Error>>;
+
+
/// Asynchronous Ipfs client.
///
pub struct IpfsClient {
@@ -47,18 +53,20 @@ impl IpfsClient {
/// Builds the url for an api call.
///
- fn build_url<Req>(&self, req: &Req) -> Result<Url, Error>
+ fn build_base_request<Req>(&self, req: &Req) -> Result<async::RequestBuilder, Error>
where
Req: ApiRequest + Serialize,
{
- let uri = format!(
+ let url = format!(
"{}{}?{}",
self.base,
Req::path(),
::serde_urlencoded::to_string(req)?
);
- uri.parse().map_err(From::from)
+ url.parse::<Url>()
+ .map(|url| self.client.request(Method::Get, url))
+ .map_err(From::from)
}
/// Processes a response, returning an error or a deserialized json response.
@@ -120,10 +128,9 @@ impl IpfsClient {
where
Req: ApiRequest + Serialize,
{
- let res = self.build_url(req)
- .map(|url| self.client.request(Method::Get, url))
- .into_future()
- .and_then(|req| IpfsClient::send_request(req));
+ let res = self.build_base_request(req).into_future().and_then(|req| {
+ IpfsClient::send_request(req)
+ });
Box::new(res)
}
@@ -136,10 +143,9 @@ impl IpfsClient {
Req: ApiRequest + Serialize,
for<'de> Res: 'static + Deserialize<'de>,
{
- let res = self.build_url(req)
- .map(|url| self.client.request(Method::Get, url))
- .into_future()
- .and_then(|req| IpfsClient::send_request_json(req));
+ let res = self.build_base_request(req).into_future().and_then(|req| {
+ IpfsClient::send_request_json(req)
+ });
Box::new(res)
}
@@ -153,13 +159,30 @@ impl IpfsClient {
for<'de> Res: 'static + Deserialize<'de>,
R: 'static + Read + Send,
{
- let res = self.build_url(req)
- .map(|url| self.client.request(Method::Get, url))
- .into_future()
- .and_then(move |req| {
+ let res = self.build_base_request(req).into_future().and_then(
+ move |req| {
let form = multipart::Form::new().part("file", multipart::Part::reader(data));
IpfsClient::send_request_json(req)
- });
+ },
+ );
+
+ Box::new(res)
+ }
+
+ /// Generic method to return a streaming response of deserialized json
+ /// objects delineated by new line separators.
+ ///
+ fn request_stream<Req, Res>(&self, req: &Req) -> AsyncStreamResponse<Res>
+ where
+ Req: ApiRequest + Serialize,
+ for<'de> Res: 'static + Deserialize<'de>,
+ {
+ let res = self.build_base_request(req)
+ .into_future()
+ .and_then(|mut req| req.send().from_err())
+ .map(|res| res.into_body().from_err())
+ .flatten_stream()
+ .and_then(|chunk| serde_json::from_slice(&chunk).map_err(From::from));
Box::new(res)
}
@@ -236,7 +259,11 @@ impl IpfsClient {
/// Returns the diff of two Ipfs objects.
///
- pub fn object_diff(&self, key0: &str, key1: &str) -> AsyncResponse<response::ObjectDiffResponse> {
+ pub fn object_diff(
+ &self,
+ key0: &str,
+ key1: &str,
+ ) -> AsyncResponse<response::ObjectDiffResponse> {
self.request(&request::ObjectDiff { key0, key1 })
}
@@ -270,10 +297,24 @@ impl IpfsClient {
/// Removes a pinned object from local storage.
///
- pub fn pin_rm(&self, key: &str, recursive: Option<bool>) -> AsyncResponse<response::PinRmResponse> {
+ pub fn pin_rm(
+ &self,
+ key: &str,
+ recursive: Option<bool>,
+ ) -> AsyncResponse<response::PinRmResponse> {
self.request(&request::PinRm { key, recursive })
}
+ /// Pings a peer.
+ ///
+ pub fn ping(
+ &self,
+ peer: &str,
+ count: Option<usize>,
+ ) -> AsyncStreamResponse<response::PingResponse> {
+ self.request_stream(&request::Ping { peer, count })
+ }
+
/// List subscribed pubsub topics.
///
pub fn pubsub_ls(&self) -> AsyncResponse<response::PubsubLsResponse> {
@@ -282,10 +323,23 @@ impl IpfsClient {
/// List peers that are being published to.
///
- pub fn pubsub_peers(&self, topic: Option<&str>) -> AsyncResponse<response::PubsubPeersResponse> {
+ pub fn pubsub_peers(
+ &self,
+ topic: Option<&str>,
+ ) -> AsyncResponse<response::PubsubPeersResponse> {
self.request(&request::PubsubPeers { topic })
}
+ /// Publish a message to a topic.
+ ///
+ pub fn pubsub_pub(
+ &self,
+ topic: &str,
+ payload: &str,
+ ) -> AsyncResponse<response::PubsubPubResponse> {
+ self.request(&request::PubsubPub { topic, payload })
+ }
+
/// Returns bitswap stats.
///
pub fn stats_bitswap(&self) -> AsyncResponse<response::StatsBitswapResponse> {
diff --git a/ipfs-api/src/request/mod.rs b/ipfs-api/src/request/mod.rs
index 73bb206..79daa32 100644
--- a/ipfs-api/src/request/mod.rs
+++ b/ipfs-api/src/request/mod.rs
@@ -6,6 +6,7 @@ pub use self::dag::*;
pub use self::ls::*;
pub use self::object::*;
pub use self::pin::*;
+pub use self::ping::*;
pub use self::pubsub::*;
pub use self::stats::*;
pub use self::swarm::*;
@@ -54,6 +55,7 @@ mod dag;
mod ls;
mod object;
mod pin;
+mod ping;
mod pubsub;
mod stats;
mod swarm;
diff --git a/ipfs-api/src/request/ping.rs b/ipfs-api/src/request/ping.rs
new file mode 100644
index 0000000..89407d3
--- /dev/null
+++ b/ipfs-api/src/request/ping.rs
@@ -0,0 +1,17 @@
+use request::ApiRequest;
+
+
+#[derive(Serialize)]
+pub struct Ping<'a> {
+ #[serde(rename = "arg")]
+ pub peer: &'a str,
+
+ pub count: Option<usize>,
+}
+
+impl<'a> ApiRequest for Ping<'a> {
+ #[inline]
+ fn path() -> &'static str {
+ "/ping"
+ }
+}
diff --git a/ipfs-api/src/response/ping.rs b/ipfs-api/src/response/ping.rs
index 76ecc5c..b5740b3 100644
--- a/ipfs-api/src/response/ping.rs
+++ b/ipfs-api/src/response/ping.rs
@@ -1,4 +1,4 @@
-#[derive(Deserialize)]
+#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct PingResponse {
pub success: bool,