diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2022-11-28 12:22:38 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-28 12:22:38 +0200 |
commit | 53a13ab8e110923d097968353a6bc1e22399480f (patch) | |
tree | a22fe110436844fcc0331073d37182adefbf0edc /libnetdata | |
parent | 1e9f2c7a2a866be27203c528067adecd283e0ceb (diff) |
replication fixes No 7 (#14053)
* move global statistics workers to a separate thread; query statistics per query source; query statistics for ML, exporters, backfilling; reset replication point in time every 10 seconds, instead of every 1; fix compilation warnings; optimize the replication queries code; prevent long tail of replication requests (big sleeps); provide query statistics about replication ; optimize replication sender when most senders are full; optimize replication_request_get_first_available(); reset replication completion calculation;
* remove workers utilization from global statistics thread
Diffstat (limited to 'libnetdata')
-rw-r--r-- | libnetdata/clocks/clocks.c | 15 | ||||
-rw-r--r-- | libnetdata/dictionary/dictionary.c | 15 | ||||
-rw-r--r-- | libnetdata/locks/locks.c | 30 | ||||
-rw-r--r-- | libnetdata/locks/locks.h | 8 |
4 files changed, 55 insertions, 13 deletions
diff --git a/libnetdata/clocks/clocks.c b/libnetdata/clocks/clocks.c index 9428c7e8b1..cabc0000e6 100644 --- a/libnetdata/clocks/clocks.c +++ b/libnetdata/clocks/clocks.c @@ -341,20 +341,21 @@ usec_t heartbeat_next(heartbeat_t *hb, usec_t tick) { void sleep_usec(usec_t usec) { // we expect microseconds (1.000.000 per second) // but timespec is nanoseconds (1.000.000.000 per second) - struct timespec rem, req = { + struct timespec rem = { 0, 0 }, req = { .tv_sec = (time_t) (usec / USEC_PER_SEC), .tv_nsec = (suseconds_t) ((usec % USEC_PER_SEC) * NSEC_PER_USEC) }; #ifdef __linux__ - while ((errno = clock_nanosleep(CLOCK_REALTIME, 0, &req, &rem)) != 0) { + while (clock_nanosleep(CLOCK_REALTIME, 0, &req, &rem) != 0) { #else - while ((errno = nanosleep(&req, &rem)) != 0) { + while (nanosleep(&req, &rem) != 0) { #endif - if (likely(errno == EINTR)) { - req.tv_sec = rem.tv_sec; - req.tv_nsec = rem.tv_nsec; - } else { + if (likely(errno == EINTR && (rem.tv_sec || rem.tv_nsec))) { + req = rem; + rem = (struct timespec){ 0, 0 }; + } + else { #ifdef __linux__ error("Cannot clock_nanosleep(CLOCK_REALTIME) for %llu microseconds.", usec); #else diff --git a/libnetdata/dictionary/dictionary.c b/libnetdata/dictionary/dictionary.c index c6b6b022f5..f03da25abf 100644 --- a/libnetdata/dictionary/dictionary.c +++ b/libnetdata/dictionary/dictionary.c @@ -199,7 +199,10 @@ static int item_check_and_acquire_advanced(DICTIONARY *dict, DICTIONARY_ITEM *it #define item_is_not_referenced_and_can_be_removed(dict, item) (item_is_not_referenced_and_can_be_removed_advanced(dict, item) == RC_ITEM_OK) static inline int item_is_not_referenced_and_can_be_removed_advanced(DICTIONARY *dict, DICTIONARY_ITEM *item); -static inline void pointer_index_init(DICTIONARY *dict) { +// ---------------------------------------------------------------------------- +// validate each pointer is indexed once - internal checks only + +static inline void pointer_index_init(DICTIONARY *dict __maybe_unused) { #ifdef NETDATA_INTERNAL_CHECKS netdata_mutex_init(&dict->global_pointer_registry_mutex); #else @@ -207,7 +210,7 @@ static inline void pointer_index_init(DICTIONARY *dict) { #endif } -static inline void pointer_destroy_index(DICTIONARY *dict) { +static inline void pointer_destroy_index(DICTIONARY *dict __maybe_unused) { #ifdef NETDATA_INTERNAL_CHECKS netdata_mutex_lock(&dict->global_pointer_registry_mutex); JudyHSFreeArray(&dict->global_pointer_registry, PJE0); @@ -216,7 +219,7 @@ static inline void pointer_destroy_index(DICTIONARY *dict) { ; #endif } -static inline void pointer_add(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM *item) { +static inline void pointer_add(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM *item __maybe_unused) { #ifdef NETDATA_INTERNAL_CHECKS netdata_mutex_lock(&dict->global_pointer_registry_mutex); Pvoid_t *PValue = JudyHSIns(&dict->global_pointer_registry, &item, sizeof(void *), PJE0); @@ -229,7 +232,7 @@ static inline void pointer_add(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM #endif } -static inline void pointer_check(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM *item) { +static inline void pointer_check(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM *item __maybe_unused) { #ifdef NETDATA_INTERNAL_CHECKS netdata_mutex_lock(&dict->global_pointer_registry_mutex); Pvoid_t *PValue = JudyHSGet(dict->global_pointer_registry, &item, sizeof(void *)); @@ -241,7 +244,7 @@ static inline void pointer_check(DICTIONARY *dict __maybe_unused, DICTIONARY_ITE #endif } -static inline void pointer_del(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM *item) { +static inline void pointer_del(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM *item __maybe_unused) { #ifdef NETDATA_INTERNAL_CHECKS netdata_mutex_lock(&dict->global_pointer_registry_mutex); int ret = JudyHSDel(&dict->global_pointer_registry, &item, sizeof(void *), PJE0); @@ -413,7 +416,7 @@ static inline void DICTIONARY_ENTRIES_MINUS1(DICTIONARY *dict) { __atomic_fetch_add(&dict->stats->ops.deletes, 1, __ATOMIC_RELAXED); __atomic_fetch_sub(&dict->stats->items.entries, 1, __ATOMIC_RELAXED); - size_t entries; + size_t entries; (void)entries; if(unlikely(is_dictionary_single_threaded(dict))) { dict->version++; entries = dict->entries++; diff --git a/libnetdata/locks/locks.c b/libnetdata/locks/locks.c index 8b5348678d..6677e220bf 100644 --- a/libnetdata/locks/locks.c +++ b/libnetdata/locks/locks.c @@ -278,6 +278,36 @@ int __netdata_rwlock_trywrlock(netdata_rwlock_t *rwlock) { return ret; } +// ---------------------------------------------------------------------------- +// spinlock implementation +// https://www.youtube.com/watch?v=rmGJc9PXpuE&t=41s + +void netdata_spinlock_init(SPINLOCK *spinlock) { + *spinlock = NETDATA_SPINLOCK_INITIALIZER; +} + +void netdata_spinlock_lock(SPINLOCK *spinlock) { + static const struct timespec ns = { .tv_sec = 0, .tv_nsec = 1 }; + bool expected = false, desired = true; + + for(int i = 1; + __atomic_load_n(&spinlock->locked, __ATOMIC_RELAXED) || + !__atomic_compare_exchange_n(&spinlock->locked, &expected, desired, false, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) + ; i++ + ) { + + if(unlikely(i == 8)) { + i = 0; + nanosleep(&ns, NULL); + } + } + // we have the lock +} + +void netdata_spinlock_unlock(SPINLOCK *spinlock) { + __atomic_store_n(&spinlock->locked, false, __ATOMIC_RELEASE); +} + #ifdef NETDATA_TRACE_RWLOCKS // ---------------------------------------------------------------------------- diff --git a/libnetdata/locks/locks.h b/libnetdata/locks/locks.h index a3aa421b9a..4d2d1655ca 100644 --- a/libnetdata/locks/locks.h +++ b/libnetdata/locks/locks.h @@ -9,6 +9,14 @@ typedef pthread_mutex_t netdata_mutex_t; #define NETDATA_MUTEX_INITIALIZER PTHREAD_MUTEX_INITIALIZER +typedef struct netdata_spinlock { + bool locked; +} SPINLOCK; +#define NETDATA_SPINLOCK_INITIALIZER (SPINLOCK){ .locked = false } +void netdata_spinlock_init(SPINLOCK *spinlock); +void netdata_spinlock_lock(SPINLOCK *spinlock); +void netdata_spinlock_unlock(SPINLOCK *spinlock); + #ifdef NETDATA_TRACE_RWLOCKS typedef struct netdata_rwlock_locker { pid_t pid; |