diff options
-rw-r--r-- | ipfs-api/src/client.rs | 4 | ||||
-rw-r--r-- | ipfs-api/src/read.rs | 21 | ||||
-rw-r--r-- | ipfs-api/src/response/dht.rs | 39 | ||||
-rw-r--r-- | ipfs-api/src/response/serde.rs | 53 | ||||
-rw-r--r-- | ipfs-cli/src/command/dht.rs | 44 |
5 files changed, 150 insertions, 11 deletions
diff --git a/ipfs-api/src/client.rs b/ipfs-api/src/client.rs index 486b9b4..78379d4 100644 --- a/ipfs-api/src/client.rs +++ b/ipfs-api/src/client.rs @@ -497,8 +497,8 @@ impl IpfsClient { /// Announce to the network that you are providing a given value. /// #[inline] - pub fn dht_provide(&self, key: &str) -> AsyncResponse<response::DhtProvideResponse> { - self.request(&request::DhtProvide { key }, None) + pub fn dht_provide(&self, key: &str) -> AsyncStreamResponse<response::DhtProvideResponse> { + self.request_stream(&request::DhtProvide { key }, None) } /// Write a key/value pair to the DHT. diff --git a/ipfs-api/src/read.rs b/ipfs-api/src/read.rs index 5396f46..5f60c69 100644 --- a/ipfs-api/src/read.rs +++ b/ipfs-api/src/read.rs @@ -69,13 +69,20 @@ where // Err(e) => { if self.parse_stream_error { - let raw = Raw::from(slice); - - match XStreamError::parse_header(&raw) { - Ok(stream_error) => Err( - ErrorKind::StreamError(stream_error.error).into(), - ), - Err(_) => Err(e.into()), + match slice.iter().position(|&x| x == b':') { + Some(colon) + if &slice[..colon] == XStreamError::header_name().as_bytes() => { + let raw = Raw::from(&slice[colon + 2..]); + + match XStreamError::parse_header(&raw) { + Ok(stream_error) => Err( + ErrorKind::StreamError(stream_error.error) + .into(), + ), + Err(_) => Err(e.into()), + } + } + _ => Err(e.into()), } } else { Err(e.into()) diff --git a/ipfs-api/src/response/dht.rs b/ipfs-api/src/response/dht.rs index b8493e7..5279299 100644 --- a/ipfs-api/src/response/dht.rs +++ b/ipfs-api/src/response/dht.rs @@ -7,6 +7,43 @@ // use response::serde; +use serde::de::{Deserialize, Deserializer, Error}; + + +/// See +/// [libp2p](https://github.com/libp2p/go-libp2p-routing/blob/master/notifications/query.go#L16). +/// +#[derive(Debug)] +pub enum DhtType { + SendingQuery, + PeerResponse, + FinalPeer, + QueryError, + Provider, + Value, + AddingPeer, + DialingPeer, +} + +impl<'de> Deserialize<'de> for DhtType { + #[inline] + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: Deserializer<'de>, + { + match deserializer.deserialize_i64(serde::IntegerVisitor)? { + 0 => Ok(DhtType::SendingQuery), + 1 => Ok(DhtType::PeerResponse), + 2 => Ok(DhtType::FinalPeer), + 3 => Ok(DhtType::QueryError), + 4 => Ok(DhtType::Provider), + 5 => Ok(DhtType::Value), + 6 => Ok(DhtType::AddingPeer), + 7 => Ok(DhtType::DialingPeer), + i => Err(D::Error::custom(format!("unknown dht type '{}'", i))), + } + } +} #[derive(Debug, Deserialize)] @@ -27,7 +64,7 @@ pub struct DhtMessage { pub id: String, #[serde(rename = "Type")] - pub typ: isize, + pub typ: DhtType, #[serde(deserialize_with = "serde::deserialize_vec")] pub responses: Vec<DhtPeerResponse>, diff --git a/ipfs-api/src/response/serde.rs b/ipfs-api/src/response/serde.rs index 307e49e..cb96ba2 100644 --- a/ipfs-api/src/response/serde.rs +++ b/ipfs-api/src/response/serde.rs @@ -13,6 +13,59 @@ use std::fmt; use std::marker::PhantomData; +pub struct IntegerVisitor; + +impl<'de> Visitor<'de> for IntegerVisitor { + type Value = i64; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("integer") + } + + fn visit_i8<E>(self, num: i8) -> Result<Self::Value, E> + where + E: Error, + { + Ok(num as i64) + } + + fn visit_i32<E>(self, num: i32) -> Result<Self::Value, E> + where + E: Error, + { + Ok(num as i64) + } + + fn visit_i64<E>(self, num: i64) -> Result<Self::Value, E> + where + E: Error, + { + Ok(num) + } + + fn visit_u8<E>(self, num: u8) -> Result<Self::Value, E> + where + E: Error, + { + Ok(num as i64) + } + + fn visit_u32<E>(self, num: u32) -> Result<Self::Value, E> + where + E: Error, + { + Ok(num as i64) + } + + fn visit_u64<E>(self, num: u64) -> Result<Self::Value, E> + where + E: Error, + { + Ok(num as i64) + } +} + + /// Deserializes a sequence or null values as a vec. /// pub fn deserialize_vec<'de, T, D>(deserializer: D) -> Result<Vec<T>, D::Error> diff --git a/ipfs-cli/src/command/dht.rs b/ipfs-cli/src/command/dht.rs index dc28188..7fd72e1 100644 --- a/ipfs-cli/src/command/dht.rs +++ b/ipfs-cli/src/command/dht.rs @@ -22,6 +22,18 @@ pub fn signature<'a, 'b>() -> App<'a, 'b> { (about: "Query the DHT for all of the multiaddresses associated with a Peer ID") (@arg PEER: +required "Peer to search for") ) + (@subcommand findprovs => + (about: "Find peers in the DHT that can provide the given key") + (@arg KEY: +required "Key to search for") + ) + (@subcommand get => + (about: "Given a key, query the DHT for its best value") + (@arg KEY: +required "The key search for") + ) + (@subcommand provide => + (about: "Announce to the network that you are providing the given values") + (@arg KEY: +required "The key you are providing") + ) ) } @@ -29,7 +41,7 @@ pub fn signature<'a, 'b>() -> App<'a, 'b> { fn print_dht_response(res: DhtMessage) { println!(""); println!(" id : {}", res.id); - println!(" type : {}", res.typ); + println!(" type : {:?}", res.typ); println!(" responses :"); for peer_res in res.responses { println!(" id : {}", peer_res.id); @@ -56,6 +68,36 @@ pub fn handle(core: &mut Core, client: &IpfsClient, args: &ArgMatches) { core.run(req).expect(EXPECTED_API); } + ("findprovs", Some(args)) => { + let key = args.value_of("KEY").unwrap(); + let req = client.dht_findprovs(&key).for_each(|peer| { + print_dht_response(peer); + + Ok(()) + }); + + core.run(req).expect(EXPECTED_API); + } + ("get", Some(args)) => { + let key = args.value_of("KEY").unwrap(); + let req = client.dht_get(&key).for_each(|peer| { + print_dht_response(peer); + + Ok(()) + }); + + core.run(req).expect(EXPECTED_API); + } + ("provide", Some(args)) => { + let key = args.value_of("KEY").unwrap(); + let req = client.dht_provide(&key).for_each(|peer| { + print_dht_response(peer); + + Ok(()) + }); + + core.run(req).expect(EXPECTED_API); + } _ => unreachable!(), } } |