summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorebroto <ebroto@tutanota.com>2019-12-23 09:01:14 +0100
committerAram Drevekenin <aram@poor.dev>2019-12-23 09:01:14 +0100
commit5a6c4a46f2918b628b762b3184d282028e3bfbd6 (patch)
tree6d347a74a47000dc11ce62a83239e77087284d04
parent6ba0d2c0ba38cab433c8a98735eeaf841fcedbaf (diff)
fix(dns): keep track of addresses being resolved (#28)
Before we could spawn a lot of name resolution queries while waiting for the response, which could lead to a "too many open files" error.
-rw-r--r--src/network/dns/client.rs27
1 files changed, 22 insertions, 5 deletions
diff --git a/src/network/dns/client.rs b/src/network/dns/client.rs
index bf479d4..1f3fe17 100644
--- a/src/network/dns/client.rs
+++ b/src/network/dns/client.rs
@@ -1,5 +1,6 @@
use crate::network::dns::{resolver::Lookup, IpTable};
use std::{
+ collections::HashSet,
future::Future,
net::Ipv4Addr,
sync::{Arc, Mutex},
@@ -10,10 +11,13 @@ use tokio::{
sync::mpsc::{self, Sender},
};
+type PendingAddrs = HashSet<Ipv4Addr>;
+
const CHANNEL_SIZE: usize = 1_000;
pub struct Client {
cache: Arc<Mutex<IpTable>>,
+ pending: Arc<Mutex<PendingAddrs>>,
tx: Option<Sender<Vec<Ipv4Addr>>>,
handle: Option<JoinHandle<()>>,
}
@@ -25,11 +29,13 @@ impl Client {
B: Future<Output = ()> + Send + 'static,
{
let cache = Arc::new(Mutex::new(IpTable::new()));
+ let pending = Arc::new(Mutex::new(PendingAddrs::new()));
let mut runtime = Runtime::new()?;
let (tx, mut rx) = mpsc::channel::<Vec<Ipv4Addr>>(CHANNEL_SIZE);
let handle = Builder::new().name("resolver".into()).spawn({
let cache = cache.clone();
+ let pending = pending.clone();
move || {
runtime.block_on(async {
let resolver = Arc::new(resolver);
@@ -40,11 +46,13 @@ impl Client {
tokio::spawn({
let resolver = resolver.clone();
let cache = cache.clone();
+ let pending = pending.clone();
+
async move {
if let Some(name) = resolver.lookup(ip).await {
- let mut cache = cache.lock().unwrap();
- cache.insert(ip, name);
+ cache.lock().unwrap().insert(ip, name);
}
+ pending.lock().unwrap().remove(&ip);
}
});
}
@@ -55,14 +63,23 @@ impl Client {
Ok(Self {
cache,
+ pending,
tx: Some(tx),
handle: Some(handle),
})
}
pub fn resolve(&mut self, ips: Vec<Ipv4Addr>) {
- // Discard the message if the channel is full; it will be retried eventually.
- let _ = self.tx.as_mut().unwrap().try_send(ips);
+ // Remove ips that are already being resolved
+ let ips = ips
+ .into_iter()
+ .filter(|ip| self.pending.lock().unwrap().insert(ip.clone()))
+ .collect::<Vec<_>>();
+
+ if !ips.is_empty() {
+ // Discard the message if the channel is full; it will be retried eventually
+ let _ = self.tx.as_mut().unwrap().try_send(ips);
+ }
}
pub fn cache(&mut self) -> IpTable {
@@ -73,7 +90,7 @@ impl Client {
impl Drop for Client {
fn drop(&mut self) {
- // Do the Option dance to be able to drop the sender so that the receiver finishes and the thread can be joined.
+ // Do the Option dance to be able to drop the sender so that the receiver finishes and the thread can be joined
drop(self.tx.take().unwrap());
self.handle.take().unwrap().join().unwrap();
}