cgit commit view (navigation: summary | refs | log | tree | commit | diff | stats)
diff options: context / space / mode (values not captured in extraction)
authorvkalintiris <vasilis@netdata.cloud>2024-02-26 11:16:40 +0200
committerGitHub <noreply@github.com>2024-02-26 11:16:40 +0200
commitac6a2f6563084948332beba5c71d0204760e77a5 (patch)
tree1abd0dc6474c81bf15a7736ff33ab57008b4908e
parent5f68469ee56bff4f9f10d9a94c4461e76a7b2c52 (diff)
Protect type anomaly rate map (#17044)
* Add spinlock API to skip cancelability state changes. * Protect type anomaly rate map
-rw-r--r--src/libnetdata/locks/locks.c49
-rw-r--r--src/libnetdata/locks/locks.h4
-rw-r--r--src/ml/ad_charts.cc2
-rw-r--r--src/ml/ml-private.h1
-rw-r--r--src/ml/ml.cc31
5 files changed, 67 insertions, 20 deletions
diff --git a/src/libnetdata/locks/locks.c b/src/libnetdata/locks/locks.c
index 625dd052ce..adf683af23 100644
--- a/src/libnetdata/locks/locks.c
+++ b/src/libnetdata/locks/locks.c
@@ -297,14 +297,15 @@ void spinlock_init(SPINLOCK *spinlock) {
memset(spinlock, 0, sizeof(SPINLOCK));
}
-void spinlock_lock(SPINLOCK *spinlock) {
+static inline void spinlock_lock_internal(SPINLOCK *spinlock, bool cancelable) {
static const struct timespec ns = { .tv_sec = 0, .tv_nsec = 1 };
#ifdef NETDATA_INTERNAL_CHECKS
size_t spins = 0;
#endif
- netdata_thread_disable_cancelability();
+ if (!cancelable)
+ netdata_thread_disable_cancelability();
for(int i = 1;
__atomic_load_n(&spinlock->locked, __ATOMIC_RELAXED) ||
@@ -329,16 +330,19 @@ void spinlock_lock(SPINLOCK *spinlock) {
#endif
}
-void spinlock_unlock(SPINLOCK *spinlock) {
+static inline void spinlock_unlock_internal(SPINLOCK *spinlock, bool cancelable) {
#ifdef NETDATA_INTERNAL_CHECKS
spinlock->locker_pid = 0;
#endif
__atomic_clear(&spinlock->locked, __ATOMIC_RELEASE);
- netdata_thread_enable_cancelability();
+
+ if (!cancelable)
+ netdata_thread_enable_cancelability();
}
-bool spinlock_trylock(SPINLOCK *spinlock) {
- netdata_thread_disable_cancelability();
+static inline bool spinlock_trylock_internal(SPINLOCK *spinlock, bool cancelable) {
+ if (!cancelable)
+ netdata_thread_disable_cancelability();
if(!__atomic_load_n(&spinlock->locked, __ATOMIC_RELAXED) &&
!__atomic_test_and_set(&spinlock->locked, __ATOMIC_ACQUIRE))
@@ -346,10 +350,41 @@ bool spinlock_trylock(SPINLOCK *spinlock) {
return true;
// we didn't get the lock
- netdata_thread_enable_cancelability();
+ if (!cancelable)
+ netdata_thread_enable_cancelability();
return false;
}
+void spinlock_lock(SPINLOCK *spinlock)
+{
+ spinlock_lock_internal(spinlock, false);
+}
+
+void spinlock_unlock(SPINLOCK *spinlock)
+{
+ spinlock_unlock_internal(spinlock, false);
+}
+
+bool spinlock_trylock(SPINLOCK *spinlock)
+{
+ return spinlock_trylock_internal(spinlock, false);
+}
+
+void spinlock_lock_cancelable(SPINLOCK *spinlock)
+{
+ spinlock_lock_internal(spinlock, true);
+}
+
+void spinlock_unlock_cancelable(SPINLOCK *spinlock)
+{
+ spinlock_unlock_internal(spinlock, true);
+}
+
+bool spinlock_trylock_cancelable(SPINLOCK *spinlock)
+{
+ return spinlock_trylock_internal(spinlock, true);
+}
+
// ----------------------------------------------------------------------------
// rw_spinlock implementation
diff --git a/src/libnetdata/locks/locks.h b/src/libnetdata/locks/locks.h
index 6b492ae47c..09adfb41fc 100644
--- a/src/libnetdata/locks/locks.h
+++ b/src/libnetdata/locks/locks.h
@@ -25,6 +25,10 @@ void spinlock_lock(SPINLOCK *spinlock);
void spinlock_unlock(SPINLOCK *spinlock);
bool spinlock_trylock(SPINLOCK *spinlock);
+void spinlock_lock_cancelable(SPINLOCK *spinlock);
+void spinlock_unlock_cancelable(SPINLOCK *spinlock);
+bool spinlock_trylock_cancelable(SPINLOCK *spinlock);
+
typedef struct netdata_rw_spinlock {
int32_t readers;
SPINLOCK spinlock;
diff --git a/src/ml/ad_charts.cc b/src/ml/ad_charts.cc
index 4b70cb43f3..f70d009c4b 100644
--- a/src/ml/ad_charts.cc
+++ b/src/ml/ad_charts.cc
@@ -288,6 +288,7 @@ void ml_update_host_and_detection_rate_charts(ml_host_t *host, collected_number
rrdset_flag_set(host->type_anomaly_rate_rs, RRDSET_FLAG_ANOMALY_DETECTION);
}
+ spinlock_lock_cancelable(&host->type_anomaly_rate_spinlock);
for (auto &entry : host->type_anomaly_rate) {
ml_type_anomaly_rate_t &type_anomaly_rate = entry.second;
@@ -304,6 +305,7 @@ void ml_update_host_and_detection_rate_charts(ml_host_t *host, collected_number
type_anomaly_rate.anomalous_dimensions = 0;
type_anomaly_rate.normal_dimensions = 0;
}
+ spinlock_unlock_cancelable(&host->type_anomaly_rate_spinlock);
rrdset_done(host->type_anomaly_rate_rs);
}
diff --git a/src/ml/ml-private.h b/src/ml/ml-private.h
index 96b27d60ff..646a5cb17d 100644
--- a/src/ml/ml-private.h
+++ b/src/ml/ml-private.h
@@ -264,6 +264,7 @@ typedef struct {
RRDDIM *detector_events_new_anomaly_event_rd;
RRDSET *type_anomaly_rate_rs;
+ SPINLOCK type_anomaly_rate_spinlock;
std::unordered_map<STRING *, ml_type_anomaly_rate_t> type_anomaly_rate;
} ml_host_t;
diff --git a/src/ml/ml.cc b/src/ml/ml.cc
index 5844b0414d..057b8820b5 100644
--- a/src/ml/ml.cc
+++ b/src/ml/ml.cc
@@ -1090,20 +1090,24 @@ ml_host_detect_once(ml_host_t *host)
host->mls.num_anomalous_dimensions += chart_mls.num_anomalous_dimensions;
host->mls.num_normal_dimensions += chart_mls.num_normal_dimensions;
- STRING *key = rs->parts.type;
- auto &um = host->type_anomaly_rate;
- auto it = um.find(key);
- if (it == um.end()) {
- um[key] = ml_type_anomaly_rate_t {
- .rd = NULL,
- .normal_dimensions = 0,
- .anomalous_dimensions = 0
- };
- it = um.find(key);
- }
+ if (spinlock_trylock_cancelable(&host->type_anomaly_rate_spinlock))
+ {
+ STRING *key = rs->parts.type;
+ auto &um = host->type_anomaly_rate;
+ auto it = um.find(key);
+ if (it == um.end()) {
+ um[key] = ml_type_anomaly_rate_t {
+ .rd = NULL,
+ .normal_dimensions = 0,
+ .anomalous_dimensions = 0
+ };
+ it = um.find(key);
+ }
- it->second.anomalous_dimensions += chart_mls.num_anomalous_dimensions;
- it->second.normal_dimensions += chart_mls.num_normal_dimensions;
+ it->second.anomalous_dimensions += chart_mls.num_anomalous_dimensions;
+ it->second.normal_dimensions += chart_mls.num_normal_dimensions;
+ spinlock_unlock_cancelable(&host->type_anomaly_rate_spinlock);
+ }
}
rrdset_foreach_done(rsp);
@@ -1310,6 +1314,7 @@ void ml_host_new(RRDHOST *rh)
host->training_queue = Cfg.training_threads[times_called++ % Cfg.num_training_threads].training_queue;
netdata_mutex_init(&host->mutex);
+ spinlock_init(&host->type_anomaly_rate_spinlock);
host->ml_running = true;
rh->ml_host = (rrd_ml_host_t *) host;