diff options
author | Colin Reeder <colin@vpzom.click> | 2020-10-14 16:51:47 -0600 |
---|---|---|
committer | Colin Reeder <colin@vpzom.click> | 2020-10-14 16:51:47 -0600 |
commit | 7127859a090c225b45d85ce403e187fc99fa6fae (patch) | |
tree | 37f7c3784fe0de35538afacc2b4d666246c9c7c9 | |
parent | 332d47f81ab0481757ee871f0cedd873154b1caf (diff) |
Move ratelimit implementation into external crate 'henry'
-rw-r--r-- | Cargo.lock | 16 | ||||
-rw-r--r-- | Cargo.toml | 2 | ||||
-rw-r--r-- | src/main.rs | 5 | ||||
-rw-r--r-- | src/ratelimit.rs | 82 |
4 files changed, 16 insertions, 89 deletions
@@ -703,6 +703,16 @@ dependencies = [ ] [[package]] +name = "henry" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6426cea37391e26fd222f13377b524426fc02eefb3d67f20f576cbdebc46dca" +dependencies = [ + "dashmap", + "tokio", +] + +[[package]] name = "hermit-abi" version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1000,7 +1010,6 @@ dependencies = [ "bumpalo", "bytes", "chrono", - "dashmap", "deadpool-postgres", "either", "fast_chemail", @@ -1009,6 +1018,7 @@ dependencies = [ "futures", "hancock", "headers", + "henry", "http", "hyper", "hyper-tls", @@ -1881,9 +1891,9 @@ checksum = "53953d2d3a5ad81d9f844a32f14ebb121f50b650cd59d0ee2a07cf13c617efed" [[package]] name = "tokio" -version = "0.2.21" +version = "0.2.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d099fa27b9702bed751524694adbe393e18b36b204da91eb1cbbbbb4a5ee2d58" +checksum = "5d34ca54d84bf2b5b4d7d31e901a8464f7b60ac145a284fba25ceb801f2ddccd" dependencies = [ "bytes", "fnv", @@ -47,7 +47,7 @@ rand = "0.7.3" bs58 = "0.3.1" bumpalo = "3.4.0" tokio-util = "0.3.1" -dashmap = "3.11.10" +henry = "0.1.0" [dev-dependencies] rand = "0.7.3" diff --git a/src/main.rs b/src/main.rs index bee4bec..972ee8a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,7 +8,6 @@ use std::sync::Arc; use trout::hyper::RoutingFailureExtHyper; mod apub_util; -mod ratelimit; mod routes; mod tasks; mod worker; @@ -140,7 +139,7 @@ pub struct BaseContext { pub http_client: HttpClient, pub apub_proxy_rewrites: bool, pub media_location: Option<std::path::PathBuf>, - pub api_ratelimit: ratelimit::RatelimitBucket<std::net::IpAddr>, + pub api_ratelimit: henry::RatelimitBucket<std::net::IpAddr>, pub local_hostname: String, } @@ -937,7 +936,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { host_url_apub, http_client: hyper::Client::builder().build(hyper_tls::HttpsConnector::new()), apub_proxy_rewrites, - api_ratelimit: ratelimit::RatelimitBucket::new(300), + api_ratelimit: henry::RatelimitBucket::new(300), }); let worker_trigger = worker::start_worker(base_context.clone()); diff --git a/src/ratelimit.rs b/src/ratelimit.rs deleted file mode 100644 index 93d7887..0000000 --- a/src/ratelimit.rs +++ /dev/null @@ -1,82 +0,0 @@ -use std::sync::atomic::AtomicU16; - -pub struct RatelimitBucket<K> { - cap: u16, - inner: tokio::sync::RwLock<Inner<K>>, -} - -impl<K: Eq + std::hash::Hash + std::fmt::Debug> RatelimitBucket<K> { - pub fn new(cap: u16) -> Self { - Self { - cap, - inner: tokio::sync::RwLock::new(Inner { - divider_time: std::time::Instant::now(), - last_minute: None, - current_minute: dashmap::DashMap::new(), - }), - } - } - - pub async fn try_call(&self, key: K) -> bool { - let now = std::time::Instant::now(); - let inner = self.inner.read().await; - let seconds_into = now.duration_since(inner.divider_time).as_secs(); - if seconds_into >= 60 { - std::mem::drop(inner); - let mut inner = self.inner.write().await; - - let seconds_into_new = now.duration_since(inner.divider_time).as_secs(); - - // check again - if seconds_into_new >= 120 { - // more than two minutes elapsed, reset - inner.last_minute = None; - inner.current_minute = dashmap::DashMap::new(); - inner.divider_time = now; - - self.try_for_current(0, &inner, key).await - } else if seconds_into_new >= 60 { - let mut tmp = dashmap::DashMap::new(); - std::mem::swap(&mut tmp, &mut inner.current_minute); - inner.last_minute = Some(tmp.into_read_only()); - inner.divider_time += std::time::Duration::new(60, 0); - - self.try_for_current(seconds_into_new - 60, &inner, key) - .await - } else { - self.try_for_current(seconds_into_new, &inner, key).await - } - } else { - self.try_for_current(seconds_into, &inner, key).await - } - } - - async fn try_for_current(&self, seconds_into: u64, inner: &Inner<K>, key: K) -> bool { - let prev_count = if let Some(last_minute) = &inner.last_minute { - if let Some(prev_count) = last_minute.get(&key) { - (u64::from(prev_count.load(std::sync::atomic::Ordering::Relaxed)) - * (60 - seconds_into) - / 60) as u16 - } else { - 0 - } - } else { - 0 - }; - - let count = prev_count - + inner - .current_minute - .entry(key) - .or_default() - .fetch_add(1, std::sync::atomic::Ordering::SeqCst); - - count < self.cap - } -} - -struct Inner<K> { - divider_time: std::time::Instant, - last_minute: Option<dashmap::ReadOnlyView<K, AtomicU16>>, - current_minute: dashmap::DashMap<K, AtomicU16>, -} |