author     Costa Tsaousis <costa@netdata.cloud>      2022-11-28 12:22:38 +0200
committer  GitHub <noreply@github.com>               2022-11-28 12:22:38 +0200
commit     53a13ab8e110923d097968353a6bc1e22399480f (patch)
tree       a22fe110436844fcc0331073d37182adefbf0edc /libnetdata
parent     1e9f2c7a2a866be27203c528067adecd283e0ceb (diff)
replication fixes No 7 (#14053)
* move global statistics workers to a separate thread
* query statistics per query source
* query statistics for ML, exporters, backfilling
* reset replication point in time every 10 seconds, instead of every 1
* fix compilation warnings
* optimize the replication queries code
* prevent long tail of replication requests (big sleeps)
* provide query statistics about replication
* optimize replication sender when most senders are full
* optimize replication_request_get_first_available()
* reset replication completion calculation
* remove workers utilization from global statistics thread
Diffstat (limited to 'libnetdata')
-rw-r--r--  libnetdata/clocks/clocks.c            15
-rw-r--r--  libnetdata/dictionary/dictionary.c    15
-rw-r--r--  libnetdata/locks/locks.c              30
-rw-r--r--  libnetdata/locks/locks.h               8
4 files changed, 55 insertions(+), 13 deletions(-)
diff --git a/libnetdata/clocks/clocks.c b/libnetdata/clocks/clocks.c
index 9428c7e8b1..cabc0000e6 100644
--- a/libnetdata/clocks/clocks.c
+++ b/libnetdata/clocks/clocks.c
@@ -341,20 +341,21 @@ usec_t heartbeat_next(heartbeat_t *hb, usec_t tick) {
void sleep_usec(usec_t usec) {
// we expect microseconds (1.000.000 per second)
// but timespec is nanoseconds (1.000.000.000 per second)
- struct timespec rem, req = {
+ struct timespec rem = { 0, 0 }, req = {
.tv_sec = (time_t) (usec / USEC_PER_SEC),
.tv_nsec = (suseconds_t) ((usec % USEC_PER_SEC) * NSEC_PER_USEC)
};
#ifdef __linux__
- while ((errno = clock_nanosleep(CLOCK_REALTIME, 0, &req, &rem)) != 0) {
+ while (clock_nanosleep(CLOCK_REALTIME, 0, &req, &rem) != 0) {
#else
- while ((errno = nanosleep(&req, &rem)) != 0) {
+ while (nanosleep(&req, &rem) != 0) {
#endif
- if (likely(errno == EINTR)) {
- req.tv_sec = rem.tv_sec;
- req.tv_nsec = rem.tv_nsec;
- } else {
+ if (likely(errno == EINTR && (rem.tv_sec || rem.tv_nsec))) {
+ req = rem;
+ rem = (struct timespec){ 0, 0 };
+ }
+ else {
#ifdef __linux__
error("Cannot clock_nanosleep(CLOCK_REALTIME) for %llu microseconds.", usec);
#else
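
The sleep_usec() change above keeps the remaining time when a signal interrupts the sleep and resumes with it, instead of restarting the full request. Below is a minimal standalone sketch of the same resume-on-EINTR pattern; the helper name is illustrative and this is not netdata's code. It checks the return value of clock_nanosleep(), which reports errors directly rather than through errno.

#include <errno.h>
#include <time.h>

// Illustrative helper, not part of netdata: sleep for 'usec' microseconds,
// resuming with whatever time is left whenever a signal interrupts the sleep.
static void sleep_usec_sketch(unsigned long long usec) {
    struct timespec rem = { 0, 0 }, req = {
        .tv_sec  = (time_t) (usec / 1000000ULL),
        .tv_nsec = (long)  ((usec % 1000000ULL) * 1000ULL)
    };

    int ret;
    while ((ret = clock_nanosleep(CLOCK_REALTIME, 0, &req, &rem)) != 0) {
        if (ret == EINTR && (rem.tv_sec || rem.tv_nsec)) {
            req = rem;                        // continue with what is left
            rem = (struct timespec){ 0, 0 };  // so a clean wake-up exits the loop
        }
        else
            break;                            // real error, or nothing left to sleep
    }
}

int main(void) {
    sleep_usec_sketch(1000);  // sleep for 1ms, riding out any interruptions
    return 0;
}
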
diff --git a/libnetdata/dictionary/dictionary.c b/libnetdata/dictionary/dictionary.c
index c6b6b022f5..f03da25abf 100644
--- a/libnetdata/dictionary/dictionary.c
+++ b/libnetdata/dictionary/dictionary.c
@@ -199,7 +199,10 @@ static int item_check_and_acquire_advanced(DICTIONARY *dict, DICTIONARY_ITEM *it
#define item_is_not_referenced_and_can_be_removed(dict, item) (item_is_not_referenced_and_can_be_removed_advanced(dict, item) == RC_ITEM_OK)
static inline int item_is_not_referenced_and_can_be_removed_advanced(DICTIONARY *dict, DICTIONARY_ITEM *item);
-static inline void pointer_index_init(DICTIONARY *dict) {
+// ----------------------------------------------------------------------------
+// validate each pointer is indexed once - internal checks only
+
+static inline void pointer_index_init(DICTIONARY *dict __maybe_unused) {
#ifdef NETDATA_INTERNAL_CHECKS
netdata_mutex_init(&dict->global_pointer_registry_mutex);
#else
@@ -207,7 +210,7 @@ static inline void pointer_index_init(DICTIONARY *dict) {
#endif
}
-static inline void pointer_destroy_index(DICTIONARY *dict) {
+static inline void pointer_destroy_index(DICTIONARY *dict __maybe_unused) {
#ifdef NETDATA_INTERNAL_CHECKS
netdata_mutex_lock(&dict->global_pointer_registry_mutex);
JudyHSFreeArray(&dict->global_pointer_registry, PJE0);
@@ -216,7 +219,7 @@ static inline void pointer_destroy_index(DICTIONARY *dict) {
;
#endif
}
-static inline void pointer_add(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM *item) {
+static inline void pointer_add(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM *item __maybe_unused) {
#ifdef NETDATA_INTERNAL_CHECKS
netdata_mutex_lock(&dict->global_pointer_registry_mutex);
Pvoid_t *PValue = JudyHSIns(&dict->global_pointer_registry, &item, sizeof(void *), PJE0);
@@ -229,7 +232,7 @@ static inline void pointer_add(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM
#endif
}
-static inline void pointer_check(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM *item) {
+static inline void pointer_check(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM *item __maybe_unused) {
#ifdef NETDATA_INTERNAL_CHECKS
netdata_mutex_lock(&dict->global_pointer_registry_mutex);
Pvoid_t *PValue = JudyHSGet(dict->global_pointer_registry, &item, sizeof(void *));
@@ -241,7 +244,7 @@ static inline void pointer_check(DICTIONARY *dict __maybe_unused, DICTIONARY_ITE
#endif
}
-static inline void pointer_del(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM *item) {
+static inline void pointer_del(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM *item __maybe_unused) {
#ifdef NETDATA_INTERNAL_CHECKS
netdata_mutex_lock(&dict->global_pointer_registry_mutex);
int ret = JudyHSDel(&dict->global_pointer_registry, &item, sizeof(void *), PJE0);
@@ -413,7 +416,7 @@ static inline void DICTIONARY_ENTRIES_MINUS1(DICTIONARY *dict) {
__atomic_fetch_add(&dict->stats->ops.deletes, 1, __ATOMIC_RELAXED);
__atomic_fetch_sub(&dict->stats->items.entries, 1, __ATOMIC_RELAXED);
- size_t entries;
+ size_t entries; (void)entries;
if(unlikely(is_dictionary_single_threaded(dict))) {
dict->version++;
entries = dict->entries++;
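
The dictionary.c hunks are the compilation-warning fixes mentioned in the commit message: parameters that are only referenced when NETDATA_INTERNAL_CHECKS is defined gain __maybe_unused, and the entries local gets a (void) cast because it is only read inside that guard. A compact sketch of both idioms follows; the macro spelling, the guard name and the helper bodies here are stand-ins for illustration, not netdata's definitions.

#include <stddef.h>
#include <stdio.h>

// Stand-in for netdata's __maybe_unused macro (GCC/Clang attribute).
#define sketch_maybe_unused __attribute__((unused))

// The parameter is only touched when the (illustrative) checks guard is on,
// so the attribute keeps -Wunused-parameter quiet in normal builds.
static void pointer_check_sketch(void *item sketch_maybe_unused) {
#ifdef SKETCH_INTERNAL_CHECKS
    printf("registry check for %p\n", item);
#endif
}

// The local is only read inside the guard; the (void) cast marks it as
// intentionally unused in builds without the checks, mirroring the patch above.
static void entries_minus1_sketch(size_t *counter) {
    size_t entries; (void)entries;
    entries = --(*counter);
#ifdef SKETCH_INTERNAL_CHECKS
    if (entries == (size_t)-1)
        fprintf(stderr, "entries counter underflow\n");
#endif
}

int main(void) {
    size_t counter = 1;
    pointer_check_sketch(&counter);
    entries_minus1_sketch(&counter);
    printf("counter = %zu\n", counter);  // 0
    return 0;
}
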
diff --git a/libnetdata/locks/locks.c b/libnetdata/locks/locks.c
index 8b5348678d..6677e220bf 100644
--- a/libnetdata/locks/locks.c
+++ b/libnetdata/locks/locks.c
@@ -278,6 +278,36 @@ int __netdata_rwlock_trywrlock(netdata_rwlock_t *rwlock) {
return ret;
}
+// ----------------------------------------------------------------------------
+// spinlock implementation
+// https://www.youtube.com/watch?v=rmGJc9PXpuE&t=41s
+
+void netdata_spinlock_init(SPINLOCK *spinlock) {
+ *spinlock = NETDATA_SPINLOCK_INITIALIZER;
+}
+
+void netdata_spinlock_lock(SPINLOCK *spinlock) {
+ static const struct timespec ns = { .tv_sec = 0, .tv_nsec = 1 };
+ bool expected = false, desired = true;
+
+ for(int i = 1;
+ __atomic_load_n(&spinlock->locked, __ATOMIC_RELAXED) ||
+ !__atomic_compare_exchange_n(&spinlock->locked, &expected, desired, false, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE)
+ ; i++
+ ) {
+
+ if(unlikely(i == 8)) {
+ i = 0;
+ nanosleep(&ns, NULL);
+ }
+ }
+ // we have the lock
+}
+
+void netdata_spinlock_unlock(SPINLOCK *spinlock) {
+ __atomic_store_n(&spinlock->locked, false, __ATOMIC_RELEASE);
+}
+
#ifdef NETDATA_TRACE_RWLOCKS
// ----------------------------------------------------------------------------
diff --git a/libnetdata/locks/locks.h b/libnetdata/locks/locks.h
index a3aa421b9a..4d2d1655ca 100644
--- a/libnetdata/locks/locks.h
+++ b/libnetdata/locks/locks.h
@@ -9,6 +9,14 @@
typedef pthread_mutex_t netdata_mutex_t;
#define NETDATA_MUTEX_INITIALIZER PTHREAD_MUTEX_INITIALIZER
+typedef struct netdata_spinlock {
+ bool locked;
+} SPINLOCK;
+#define NETDATA_SPINLOCK_INITIALIZER (SPINLOCK){ .locked = false }
+void netdata_spinlock_init(SPINLOCK *spinlock);
+void netdata_spinlock_lock(SPINLOCK *spinlock);
+void netdata_spinlock_unlock(SPINLOCK *spinlock);
+
#ifdef NETDATA_TRACE_RWLOCKS
typedef struct netdata_rwlock_locker {
pid_t pid;
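
Taken together, the locks.c and locks.h hunks add a minimal spinlock: one atomic boolean taken with a compare-and-exchange under __ATOMIC_ACQUIRE, released with __ATOMIC_RELEASE, and backed off with a one-nanosecond nanosleep() after every eight failed spins so a waiter does not hog its core. The self-contained sketch below mirrors that pattern under those assumptions; the type, function names, worker threads and counter are illustrative, not the netdata API itself (which lives in libnetdata/locks/locks.h).

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <time.h>

typedef struct { bool locked; } SPINLOCK_SKETCH;

static void spinlock_sketch_init(SPINLOCK_SKETCH *s) {
    *s = (SPINLOCK_SKETCH){ .locked = false };
}

static void spinlock_sketch_lock(SPINLOCK_SKETCH *s) {
    static const struct timespec ns = { .tv_sec = 0, .tv_nsec = 1 };
    bool expected = false;

    // spin on a relaxed load first, then try to take the lock with acquire semantics
    for (int i = 1;
         __atomic_load_n(&s->locked, __ATOMIC_RELAXED) ||
         !__atomic_compare_exchange_n(&s->locked, &expected, true, false,
                                      __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE);
         i++) {
        expected = false;                            // a failed CAS rewrites it
        if (i == 8) { i = 0; nanosleep(&ns, NULL); } // brief back-off after 8 spins
    }
}

static void spinlock_sketch_unlock(SPINLOCK_SKETCH *s) {
    __atomic_store_n(&s->locked, false, __ATOMIC_RELEASE);
}

static SPINLOCK_SKETCH lock;
static long counter = 0;

static void *worker(void *arg) {
    (void)arg;
    for (int i = 0; i < 100000; i++) {
        spinlock_sketch_lock(&lock);
        counter++;                                   // protected by the spinlock
        spinlock_sketch_unlock(&lock);
    }
    return NULL;
}

int main(void) {                                     // build with: cc -O2 -pthread sketch.c
    pthread_t t[4];
    spinlock_sketch_init(&lock);
    for (int i = 0; i < 4; i++) pthread_create(&t[i], NULL, worker, NULL);
    for (int i = 0; i < 4; i++) pthread_join(t[i], NULL);
    printf("counter = %ld\n", counter);              // expected: 400000
    return 0;
}
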