summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFerris Tseng <ferristseng@fastmail.fm>2017-11-25 17:18:14 -0500
committerFerris Tseng <ferristseng@fastmail.fm>2017-11-25 17:18:14 -0500
commit5c3c9b170f1415fcbb4c0de28339467bcbca5e29 (patch)
treec871a52c7611df78519c8673e2a1a7034bf337ba
parentbe4d9527b0ee4830d5434e6e70415d2f18af0180 (diff)
add dht and better serialization
-rw-r--r--ipfs-api/src/client.rs4
-rw-r--r--ipfs-api/src/read.rs21
-rw-r--r--ipfs-api/src/response/dht.rs39
-rw-r--r--ipfs-api/src/response/serde.rs53
-rw-r--r--ipfs-cli/src/command/dht.rs44
5 files changed, 150 insertions, 11 deletions
diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs
index 486b9b4..78379d4 100644
--- a/ipfs-api/src/client.rs
+++ b/ipfs-api/src/client.rs
@@ -497,8 +497,8 @@ impl IpfsClient {
/// Announce to the network that you are providing a given value.
///
#[inline]
- pub fn dht_provide(&self, key: &str) -> AsyncResponse<response::DhtProvideResponse> {
- self.request(&request::DhtProvide { key }, None)
+ pub fn dht_provide(&self, key: &str) -> AsyncStreamResponse<response::DhtProvideResponse> {
+ self.request_stream(&request::DhtProvide { key }, None)
}
/// Write a key/value pair to the DHT.
diff --git a/ipfs-api/src/read.rs b/ipfs-api/src/read.rs
index 5396f46..5f60c69 100644
--- a/ipfs-api/src/read.rs
+++ b/ipfs-api/src/read.rs
@@ -69,13 +69,20 @@ where
//
Err(e) => {
if self.parse_stream_error {
- let raw = Raw::from(slice);
-
- match XStreamError::parse_header(&raw) {
- Ok(stream_error) => Err(
- ErrorKind::StreamError(stream_error.error).into(),
- ),
- Err(_) => Err(e.into()),
+ match slice.iter().position(|&x| x == b':') {
+ Some(colon)
+ if &slice[..colon] == XStreamError::header_name().as_bytes() => {
+ let raw = Raw::from(&slice[colon + 2..]);
+
+ match XStreamError::parse_header(&raw) {
+ Ok(stream_error) => Err(
+ ErrorKind::StreamError(stream_error.error)
+ .into(),
+ ),
+ Err(_) => Err(e.into()),
+ }
+ }
+ _ => Err(e.into()),
}
} else {
Err(e.into())
diff --git a/ipfs-api/src/response/dht.rs b/ipfs-api/src/response/dht.rs
index b8493e7..5279299 100644
--- a/ipfs-api/src/response/dht.rs
+++ b/ipfs-api/src/response/dht.rs
@@ -7,6 +7,43 @@
//
use response::serde;
+use serde::de::{Deserialize, Deserializer, Error};
+
+
+/// See
+/// [libp2p](https://github.com/libp2p/go-libp2p-routing/blob/master/notifications/query.go#L16).
+///
+#[derive(Debug)]
+pub enum DhtType {
+ SendingQuery,
+ PeerResponse,
+ FinalPeer,
+ QueryError,
+ Provider,
+ Value,
+ AddingPeer,
+ DialingPeer,
+}
+
+impl<'de> Deserialize<'de> for DhtType {
+ #[inline]
+ fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+ where
+ D: Deserializer<'de>,
+ {
+ match deserializer.deserialize_i64(serde::IntegerVisitor)? {
+ 0 => Ok(DhtType::SendingQuery),
+ 1 => Ok(DhtType::PeerResponse),
+ 2 => Ok(DhtType::FinalPeer),
+ 3 => Ok(DhtType::QueryError),
+ 4 => Ok(DhtType::Provider),
+ 5 => Ok(DhtType::Value),
+ 6 => Ok(DhtType::AddingPeer),
+ 7 => Ok(DhtType::DialingPeer),
+ i => Err(D::Error::custom(format!("unknown dht type '{}'", i))),
+ }
+ }
+}
#[derive(Debug, Deserialize)]
@@ -27,7 +64,7 @@ pub struct DhtMessage {
pub id: String,
#[serde(rename = "Type")]
- pub typ: isize,
+ pub typ: DhtType,
#[serde(deserialize_with = "serde::deserialize_vec")]
pub responses: Vec<DhtPeerResponse>,
diff --git a/ipfs-api/src/response/serde.rs b/ipfs-api/src/response/serde.rs
index 307e49e..cb96ba2 100644
--- a/ipfs-api/src/response/serde.rs
+++ b/ipfs-api/src/response/serde.rs
@@ -13,6 +13,59 @@ use std::fmt;
use std::marker::PhantomData;
+pub struct IntegerVisitor;
+
+impl<'de> Visitor<'de> for IntegerVisitor {
+ type Value = i64;
+
+ fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
+ formatter.write_str("integer")
+ }
+
+ fn visit_i8<E>(self, num: i8) -> Result<Self::Value, E>
+ where
+ E: Error,
+ {
+ Ok(num as i64)
+ }
+
+ fn visit_i32<E>(self, num: i32) -> Result<Self::Value, E>
+ where
+ E: Error,
+ {
+ Ok(num as i64)
+ }
+
+ fn visit_i64<E>(self, num: i64) -> Result<Self::Value, E>
+ where
+ E: Error,
+ {
+ Ok(num)
+ }
+
+ fn visit_u8<E>(self, num: u8) -> Result<Self::Value, E>
+ where
+ E: Error,
+ {
+ Ok(num as i64)
+ }
+
+ fn visit_u32<E>(self, num: u32) -> Result<Self::Value, E>
+ where
+ E: Error,
+ {
+ Ok(num as i64)
+ }
+
+ fn visit_u64<E>(self, num: u64) -> Result<Self::Value, E>
+ where
+ E: Error,
+ {
+ Ok(num as i64)
+ }
+}
+
+
/// Deserializes a sequence or null values as a vec.
///
pub fn deserialize_vec<'de, T, D>(deserializer: D) -> Result<Vec<T>, D::Error>
diff --git a/ipfs-cli/src/command/dht.rs b/ipfs-cli/src/command/dht.rs
index dc28188..7fd72e1 100644
--- a/ipfs-cli/src/command/dht.rs
+++ b/ipfs-cli/src/command/dht.rs
@@ -22,6 +22,18 @@ pub fn signature<'a, 'b>() -> App<'a, 'b> {
(about: "Query the DHT for all of the multiaddresses associated with a Peer ID")
(@arg PEER: +required "Peer to search for")
)
+ (@subcommand findprovs =>
+ (about: "Find peers in the DHT that can provide the given key")
+ (@arg KEY: +required "Key to search for")
+ )
+ (@subcommand get =>
+ (about: "Given a key, query the DHT for its best value")
+ (@arg KEY: +required "The key search for")
+ )
+ (@subcommand provide =>
+ (about: "Announce to the network that you are providing the given values")
+ (@arg KEY: +required "The key you are providing")
+ )
)
}
@@ -29,7 +41,7 @@ pub fn signature<'a, 'b>() -> App<'a, 'b> {
fn print_dht_response(res: DhtMessage) {
println!("");
println!(" id : {}", res.id);
- println!(" type : {}", res.typ);
+ println!(" type : {:?}", res.typ);
println!(" responses :");
for peer_res in res.responses {
println!(" id : {}", peer_res.id);
@@ -56,6 +68,36 @@ pub fn handle(core: &mut Core, client: &IpfsClient, args: &ArgMatches) {
core.run(req).expect(EXPECTED_API);
}
+ ("findprovs", Some(args)) => {
+ let key = args.value_of("KEY").unwrap();
+ let req = client.dht_findprovs(&key).for_each(|peer| {
+ print_dht_response(peer);
+
+ Ok(())
+ });
+
+ core.run(req).expect(EXPECTED_API);
+ }
+ ("get", Some(args)) => {
+ let key = args.value_of("KEY").unwrap();
+ let req = client.dht_get(&key).for_each(|peer| {
+ print_dht_response(peer);
+
+ Ok(())
+ });
+
+ core.run(req).expect(EXPECTED_API);
+ }
+ ("provide", Some(args)) => {
+ let key = args.value_of("KEY").unwrap();
+ let req = client.dht_provide(&key).for_each(|peer| {
+ print_dht_response(peer);
+
+ Ok(())
+ });
+
+ core.run(req).expect(EXPECTED_API);
+ }
_ => unreachable!(),
}
}