diff options
author | ebroto <ebroto@tutanota.com> | 2019-12-23 09:01:14 +0100 |
---|---|---|
committer | Aram Drevekenin <aram@poor.dev> | 2019-12-23 09:01:14 +0100 |
commit | 5a6c4a46f2918b628b762b3184d282028e3bfbd6 (patch) | |
tree | 6d347a74a47000dc11ce62a83239e77087284d04 | |
parent | 6ba0d2c0ba38cab433c8a98735eeaf841fcedbaf (diff) |
fix(dns): keep track of addresses being resolved (#28)
Before we could spawn a lot of name resolution queries while waiting for the
response, which could lead to a "too many open files" error.
-rw-r--r-- | src/network/dns/client.rs | 27 |
1 files changed, 22 insertions, 5 deletions
diff --git a/src/network/dns/client.rs b/src/network/dns/client.rs index bf479d4..1f3fe17 100644 --- a/src/network/dns/client.rs +++ b/src/network/dns/client.rs @@ -1,5 +1,6 @@ use crate::network::dns::{resolver::Lookup, IpTable}; use std::{ + collections::HashSet, future::Future, net::Ipv4Addr, sync::{Arc, Mutex}, @@ -10,10 +11,13 @@ use tokio::{ sync::mpsc::{self, Sender}, }; +type PendingAddrs = HashSet<Ipv4Addr>; + const CHANNEL_SIZE: usize = 1_000; pub struct Client { cache: Arc<Mutex<IpTable>>, + pending: Arc<Mutex<PendingAddrs>>, tx: Option<Sender<Vec<Ipv4Addr>>>, handle: Option<JoinHandle<()>>, } @@ -25,11 +29,13 @@ impl Client { B: Future<Output = ()> + Send + 'static, { let cache = Arc::new(Mutex::new(IpTable::new())); + let pending = Arc::new(Mutex::new(PendingAddrs::new())); let mut runtime = Runtime::new()?; let (tx, mut rx) = mpsc::channel::<Vec<Ipv4Addr>>(CHANNEL_SIZE); let handle = Builder::new().name("resolver".into()).spawn({ let cache = cache.clone(); + let pending = pending.clone(); move || { runtime.block_on(async { let resolver = Arc::new(resolver); @@ -40,11 +46,13 @@ impl Client { tokio::spawn({ let resolver = resolver.clone(); let cache = cache.clone(); + let pending = pending.clone(); + async move { if let Some(name) = resolver.lookup(ip).await { - let mut cache = cache.lock().unwrap(); - cache.insert(ip, name); + cache.lock().unwrap().insert(ip, name); } + pending.lock().unwrap().remove(&ip); } }); } @@ -55,14 +63,23 @@ impl Client { Ok(Self { cache, + pending, tx: Some(tx), handle: Some(handle), }) } pub fn resolve(&mut self, ips: Vec<Ipv4Addr>) { - // Discard the message if the channel is full; it will be retried eventually. - let _ = self.tx.as_mut().unwrap().try_send(ips); + // Remove ips that are already being resolved + let ips = ips + .into_iter() + .filter(|ip| self.pending.lock().unwrap().insert(ip.clone())) + .collect::<Vec<_>>(); + + if !ips.is_empty() { + // Discard the message if the channel is full; it will be retried eventually + let _ = self.tx.as_mut().unwrap().try_send(ips); + } } pub fn cache(&mut self) -> IpTable { @@ -73,7 +90,7 @@ impl Client { impl Drop for Client { fn drop(&mut self) { - // Do the Option dance to be able to drop the sender so that the receiver finishes and the thread can be joined. + // Do the Option dance to be able to drop the sender so that the receiver finishes and the thread can be joined drop(self.tx.take().unwrap()); self.handle.take().unwrap().join().unwrap(); } |