summaryrefslogtreecommitdiffstats
path: root/libnetdata
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-02-07 22:26:16 +0200
committerGitHub <noreply@github.com>2023-02-07 22:26:16 +0200
commit8d3c3356ddeb6d62fa76b197e086e3e7fc5eb3dd (patch)
treee7661d49d0a0044cf1a5f1d3e0e6cc7dbc27f7a6 /libnetdata
parent12d92fe308f4107f67149ec9105b69ce2610a4f2 (diff)
Streaming interpolated values (#14431)
* first commit - untested * fix wrong begin command * added set v2 too * debug to log stream buffer * debug to log stream buffer * faster streaming printing * mark charts and dimensions as collected * use stream points even if sender is not enabled * comment out stream debug log * parse null as nan * custom begin v2 * custom set v2; replication now copies the anomalous flag too * custom end v2 * enabled stream log test * renamed to BEGIN2, SET2, END2 * dont mix up replay and v2 members in user object * fix typo * cleanup * support to v2 to v1 proxying * mark updated dimensions as such * do not log unknown flags * comment out stream debug log * send also the chart id on BEGIN2, v2 to v2 * update the data collections counter * v2 values are transferred in hex * faster hex parsing * a little more generic hex and dec printing and parsing * fix hex parsing * minor optimization in dbengine api * turn debugging into info message * generalized the timings tracking, so that it can be used in more places * commented out debug info * renamed conflicting variable with macro * remove wrong edits * integrated ML and added cleanup in case parsing is interrupted * disable data collection locking during v2 * cleanup stale ML locks; send updated chart variables during v2; add info to find stale locks * inject an END2 between repeated BEGIN2 from rrdset_done() * test: remove lockless single-threaded logic from dictionary and aral and apply the right acquire/release memory order to reference counters * more fine grained dictionary atomics * remove unecessary return values * pointer validation under NETDATA_DICTIONARY_VALIDATE_POINTERS * Revert "pointer validation under NETDATA_DICTIONARY_VALIDATE_POINTERS" This reverts commit 846cdf2713e2a7ee2ff797f38db11714228800e9. * Revert "remove unecessary return values" This reverts commit 8c87d30f4d86f0f5d6b4562cf74fe7447138bbff. * Revert "more fine grained dictionary atomics" This reverts commit 984aec4234a340d197d45239ff9a10fd479fcf3c. * Revert "test: remove lockless single-threaded logic from dictionary and aral and apply the right acquire/release memory order to reference counters" This reverts commit c460b3d0ad497d2641bd0ea1d63cec7c052e74e4. * Apply again "pointer validation under NETDATA_DICTIONARY_VALIDATE_POINTERS" while keeping the improved atomic operations. This reverts commit f158d009 * fix last commit * fix last commit again * optimizations in dbengine * do not send anomaly bit on non-supporting agents (send it when the INTERPOLATED capability is available) * break long empty-points-loops in rrdset_done() * decide page alignment on new page allocation, not on every point collected * create max size pages but no smaller than 1/3 * Fix compilation when --disable-ml is specified * Return false * fixes for NETDATA_LOG_REPLICATION_REQUESTS * added compile option NETDATA_WITHOUT_WORKERS_LATENCY * put timings in BEGIN2, SET2, END2 * isolate begin2 ml * revert repositioning data collection lock * fixed multi-threading of statistics * do not lookup dimensions all the time if they come in the same order * update used on iteration, not on every points; also do better error handling --------- Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'libnetdata')
-rw-r--r--libnetdata/buffer/buffer.c88
-rw-r--r--libnetdata/buffer/buffer.h3
-rw-r--r--libnetdata/dictionary/dictionary.c243
-rw-r--r--libnetdata/inlined.h31
-rw-r--r--libnetdata/libnetdata.c113
-rw-r--r--libnetdata/libnetdata.h54
-rw-r--r--libnetdata/storage_number/storage_number.h4
-rw-r--r--libnetdata/worker_utilization/worker_utilization.c16
8 files changed, 401 insertions, 151 deletions
diff --git a/libnetdata/buffer/buffer.c b/libnetdata/buffer/buffer.c
index eeb283209f..6cd6ae38a5 100644
--- a/libnetdata/buffer/buffer.c
+++ b/libnetdata/buffer/buffer.c
@@ -108,21 +108,44 @@ void buffer_print_llu(BUFFER *wb, unsigned long long uvalue)
{
buffer_need_bytes(wb, 50);
+ switch(uvalue) {
+ case 0:
+ buffer_fast_strcat(wb, "0", 1);
+ return;
+
+ case 1:
+ buffer_fast_strcat(wb, "1", 1);
+ return;
+
+ case 5:
+ buffer_fast_strcat(wb, "5", 1);
+ return;
+
+ case 10:
+ buffer_fast_strcat(wb, "10", 2);
+ return;
+
+ default:
+ break;
+ }
+
char *str = &wb->buffer[wb->len];
char *wstr = str;
switch (sizeof(void *)) {
- case 4:
- wstr = (uvalue > (unsigned long long) 0xffffffff) ? print_number_llu_r(wstr, uvalue) :
- print_number_lu_r(wstr, uvalue);
- break;
- case 8:
- do {
- *wstr++ = (char) ('0' + (uvalue % 10));
- } while (uvalue /= 10);
- break;
- default:
- fatal("Netdata supports only 32-bit & 64-bit systems.");
+ case 8:
+ do {
+ *wstr++ = (char) ('0' + (uvalue % 10));
+ } while (uvalue /= 10);
+ break;
+
+ case 4:
+ wstr = (uvalue > (unsigned long long) 0xffffffff) ? print_number_llu_r(wstr, uvalue) :
+ print_number_lu_r(wstr, uvalue);
+ break;
+
+ default:
+ fatal("Netdata supports only 32-bit & 64-bit systems.");
}
// terminate it
@@ -132,8 +155,9 @@ void buffer_print_llu(BUFFER *wb, unsigned long long uvalue)
char *begin = str, *end = wstr - 1, aux;
while (end > begin) aux = *end, *end-- = *begin, *begin++ = aux;
+ size_t len = wstr - str;
// return the buffer length
- wb->len += wstr - str;
+ wb->len += len;
}
void buffer_print_ll(BUFFER *wb, long long value)
@@ -167,7 +191,7 @@ static unsigned char bits03_to_hex[16] = {
[15] = 'F'
};
-void buffer_print_llu_hex(BUFFER *wb, unsigned long long value)
+inline void buffer_print_llu_hex(BUFFER *wb, unsigned long long value)
{
unsigned char buffer[sizeof(unsigned long long) * 2 + 2 + 1]; // 8 bytes * 2 + '0x' + '\0'
unsigned char *e = &buffer[sizeof(unsigned long long) * 2 + 2];
@@ -196,7 +220,16 @@ void buffer_print_llu_hex(BUFFER *wb, unsigned long long value)
buffer_fast_strcat(wb, (char *)p, e - p);
}
-void buffer_fast_strcat(BUFFER *wb, const char *txt, size_t len) {
+void buffer_print_ll_hex(BUFFER *wb, long long value) {
+ if(value < 0) {
+ buffer_fast_strcat(wb, "-", 1);
+ value = -value;
+ }
+
+ buffer_print_llu_hex(wb, value);
+}
+
+inline void buffer_fast_strcat(BUFFER *wb, const char *txt, size_t len) {
if(unlikely(!txt || !*txt)) return;
buffer_need_bytes(wb, len + 1);
@@ -214,6 +247,27 @@ void buffer_fast_strcat(BUFFER *wb, const char *txt, size_t len) {
wb->buffer[wb->len] = '\0';
}
+void buffer_print_sn_flags(BUFFER *wb, SN_FLAGS flags, bool send_anomaly_bit) {
+ if(unlikely(flags == SN_EMPTY_SLOT)) {
+ buffer_fast_strcat(wb, "E", 1);
+ return;
+ }
+
+ size_t printed = 0;
+ if(likely(send_anomaly_bit && (flags & SN_FLAG_NOT_ANOMALOUS))) {
+ buffer_fast_strcat(wb, "A", 1);
+ printed++;
+ }
+
+ if(unlikely(flags & SN_FLAG_RESET)) {
+ buffer_fast_strcat(wb, "R", 1);
+ printed++;
+ }
+
+ if(!printed)
+ buffer_fast_strcat(wb, "''", 2);
+}
+
void buffer_strcat(BUFFER *wb, const char *txt)
{
// buffer_sprintf(wb, "%s", txt);
@@ -234,7 +288,7 @@ void buffer_strcat(BUFFER *wb, const char *txt)
wb->len = len;
buffer_overflow_check(wb);
- if(*txt) {
+ if(unlikely(*txt)) {
debug(D_WEB_BUFFER, "strcat(): increasing web_buffer at position %zu, size = %zu\n", wb->len, wb->size);
len = strlen(txt);
buffer_fast_strcat(wb, txt, len);
@@ -242,7 +296,7 @@ void buffer_strcat(BUFFER *wb, const char *txt)
else {
// terminate the string
// without increasing the length
- buffer_need_bytes(wb, (size_t)1);
+ buffer_need_bytes(wb, 1);
wb->buffer[wb->len] = '\0';
}
}
@@ -361,7 +415,7 @@ void buffer_sprintf(BUFFER *wb, const char *fmt, ...)
void buffer_rrd_value(BUFFER *wb, NETDATA_DOUBLE value)
{
- buffer_need_bytes(wb, 50);
+ buffer_need_bytes(wb, 512);
if(isnan(value) || isinf(value)) {
buffer_strcat(wb, "null");
diff --git a/libnetdata/buffer/buffer.h b/libnetdata/buffer/buffer.h
index 0fa3495b4a..0b12c81752 100644
--- a/libnetdata/buffer/buffer.h
+++ b/libnetdata/buffer/buffer.h
@@ -74,6 +74,8 @@ void buffer_strcat_htmlescape(BUFFER *wb, const char *txt);
void buffer_char_replace(BUFFER *wb, char from, char to);
+void buffer_print_sn_flags(BUFFER *wb, SN_FLAGS flags, bool send_anomaly_bit);
+
char *print_number_lu_r(char *str, unsigned long uvalue);
char *print_number_llu_r(char *str, unsigned long long uvalue);
char *print_number_llu_r_smart(char *str, unsigned long long uvalue);
@@ -81,6 +83,7 @@ char *print_number_llu_r_smart(char *str, unsigned long long uvalue);
void buffer_print_llu(BUFFER *wb, unsigned long long uvalue);
void buffer_print_ll(BUFFER *wb, long long value);
void buffer_print_llu_hex(BUFFER *wb, unsigned long long value);
+void buffer_print_ll_hex(BUFFER *wb, long long value);
static inline void buffer_need_bytes(BUFFER *buffer, size_t needed_free_size) {
if(unlikely(buffer->size - buffer->len < needed_free_size))
diff --git a/libnetdata/dictionary/dictionary.c b/libnetdata/dictionary/dictionary.c
index 061b671abf..cb8aef9e08 100644
--- a/libnetdata/dictionary/dictionary.c
+++ b/libnetdata/dictionary/dictionary.c
@@ -10,9 +10,9 @@ typedef enum __attribute__ ((__packed__)) {
DICT_FLAG_DESTROYED = (1 << 0), // this dictionary has been destroyed
} DICT_FLAGS;
-#define dict_flag_check(dict, flag) (__atomic_load_n(&((dict)->flags), __ATOMIC_SEQ_CST) & (flag))
-#define dict_flag_set(dict, flag) __atomic_or_fetch(&((dict)->flags), flag, __ATOMIC_SEQ_CST)
-#define dict_flag_clear(dict, flag) __atomic_and_fetch(&((dict)->flags), ~(flag), __ATOMIC_SEQ_CST)
+#define dict_flag_check(dict, flag) (__atomic_load_n(&((dict)->flags), __ATOMIC_RELAXED) & (flag))
+#define dict_flag_set(dict, flag) __atomic_or_fetch(&((dict)->flags), flag, __ATOMIC_RELAXED)
+#define dict_flag_clear(dict, flag) __atomic_and_fetch(&((dict)->flags), ~(flag), __ATOMIC_RELAXED)
// flags macros
#define is_dictionary_destroyed(dict) dict_flag_check(dict, DICT_FLAG_DESTROYED)
@@ -37,13 +37,13 @@ typedef enum __attribute__ ((__packed__)) item_flags {
// IMPORTANT: This is 8-bit
} ITEM_FLAGS;
-#define item_flag_check(item, flag) (__atomic_load_n(&((item)->flags), __ATOMIC_SEQ_CST) & (flag))
-#define item_flag_set(item, flag) __atomic_or_fetch(&((item)->flags), flag, __ATOMIC_SEQ_CST)
-#define item_flag_clear(item, flag) __atomic_and_fetch(&((item)->flags), ~(flag), __ATOMIC_SEQ_CST)
+#define item_flag_check(item, flag) (__atomic_load_n(&((item)->flags), __ATOMIC_RELAXED) & (flag))
+#define item_flag_set(item, flag) __atomic_or_fetch(&((item)->flags), flag, __ATOMIC_RELAXED)
+#define item_flag_clear(item, flag) __atomic_and_fetch(&((item)->flags), ~(flag), __ATOMIC_RELAXED)
-#define item_shared_flag_check(item, flag) (__atomic_load_n(&((item)->shared->flags), __ATOMIC_SEQ_CST) & (flag))
-#define item_shared_flag_set(item, flag) __atomic_or_fetch(&((item)->shared->flags), flag, __ATOMIC_SEQ_CST)
-#define item_shared_flag_clear(item, flag) __atomic_and_fetch(&((item)->shared->flags), ~(flag), __ATOMIC_SEQ_CST)
+#define item_shared_flag_check(item, flag) (__atomic_load_n(&((item)->shared->flags), __ATOMIC_RELAXED) & (flag))
+#define item_shared_flag_set(item, flag) __atomic_or_fetch(&((item)->shared->flags), flag, __ATOMIC_RELAXED)
+#define item_shared_flag_clear(item, flag) __atomic_and_fetch(&((item)->shared->flags), ~(flag), __ATOMIC_RELAXED)
#define REFCOUNT_DELETING (-100)
@@ -175,7 +175,7 @@ struct dictionary {
long int referenced_items; // how many items of the dictionary are currently being used by 3rd parties
long int pending_deletion_items; // how many items of the dictionary have been deleted, but have not been removed yet
-#ifdef NETDATA_INTERNAL_CHECKS
+#ifdef NETDATA_DICTIONARY_VALIDATE_POINTERS
netdata_mutex_t global_pointer_registry_mutex;
Pvoid_t global_pointer_registry;
#endif
@@ -205,59 +205,47 @@ static inline int item_is_not_referenced_and_can_be_removed_advanced(DICTIONARY
// ----------------------------------------------------------------------------
// validate each pointer is indexed once - internal checks only
+#ifdef NETDATA_DICTIONARY_VALIDATE_POINTERS
static inline void pointer_index_init(DICTIONARY *dict __maybe_unused) {
-#ifdef NETDATA_INTERNAL_CHECKS
netdata_mutex_init(&dict->global_pointer_registry_mutex);
-#else
- ;
-#endif
}
static inline void pointer_destroy_index(DICTIONARY *dict __maybe_unused) {
-#ifdef NETDATA_INTERNAL_CHECKS
netdata_mutex_lock(&dict->global_pointer_registry_mutex);
JudyHSFreeArray(&dict->global_pointer_registry, PJE0);
netdata_mutex_unlock(&dict->global_pointer_registry_mutex);
-#else
- ;
-#endif
}
static inline void pointer_add(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM *item __maybe_unused) {
-#ifdef NETDATA_INTERNAL_CHECKS
netdata_mutex_lock(&dict->global_pointer_registry_mutex);
Pvoid_t *PValue = JudyHSIns(&dict->global_pointer_registry, &item, sizeof(void *), PJE0);
if(*PValue != NULL)
fatal("pointer already exists in registry");
*PValue = item;
netdata_mutex_unlock(&dict->global_pointer_registry_mutex);
-#else
- ;
-#endif
}
static inline void pointer_check(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM *item __maybe_unused) {
-#ifdef NETDATA_INTERNAL_CHECKS
netdata_mutex_lock(&dict->global_pointer_registry_mutex);
Pvoid_t *PValue = JudyHSGet(dict->global_pointer_registry, &item, sizeof(void *));
if(PValue == NULL)
fatal("pointer is not found in registry");
netdata_mutex_unlock(&dict->global_pointer_registry_mutex);
-#else
- ;
-#endif
}
static inline void pointer_del(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM *item __maybe_unused) {
-#ifdef NETDATA_INTERNAL_CHECKS
netdata_mutex_lock(&dict->global_pointer_registry_mutex);
int ret = JudyHSDel(&dict->global_pointer_registry, &item, sizeof(void *), PJE0);
if(!ret)
fatal("pointer to be deleted does not exist in registry");
netdata_mutex_unlock(&dict->global_pointer_registry_mutex);
-#else
- ;
-#endif
}
+#else // !NETDATA_DICTIONARY_VALIDATE_POINTERS
+#define pointer_index_init(dict) debug_dummy()
+#define pointer_destroy_index(dict) debug_dummy()
+#define pointer_add(dict, item) debug_dummy()
+#define pointer_check(dict, item) debug_dummy()
+#define pointer_del(dict, item) debug_dummy()
+#endif // !NETDATA_DICTIONARY_VALIDATE_POINTERS
// ----------------------------------------------------------------------------
// memory statistics
@@ -298,7 +286,7 @@ static inline void dictionary_hooks_allocate(DICTIONARY *dict) {
static inline size_t dictionary_hooks_free(DICTIONARY *dict) {
if(!dict->hooks) return 0;
- REFCOUNT links = __atomic_sub_fetch(&dict->hooks->links, 1, __ATOMIC_SEQ_CST);
+ REFCOUNT links = __atomic_sub_fetch(&dict->hooks->links, 1, __ATOMIC_ACQUIRE);
if(links == 0) {
freez(dict->hooks);
dict->hooks = NULL;
@@ -358,7 +346,7 @@ size_t dictionary_version(DICTIONARY *dict) {
// this is required for views to return the right number
garbage_collect_pending_deletes(dict);
- return __atomic_load_n(&dict->version, __ATOMIC_SEQ_CST);
+ return __atomic_load_n(&dict->version, __ATOMIC_RELAXED);
}
size_t dictionary_entries(DICTIONARY *dict) {
if(unlikely(!dict)) return 0;
@@ -366,7 +354,7 @@ size_t dictionary_entries(DICTIONARY *dict) {
// this is required for views to return the right number
garbage_collect_pending_deletes(dict);
- long int entries = __atomic_load_n(&dict->entries, __ATOMIC_SEQ_CST);
+ long int entries = __atomic_load_n(&dict->entries, __ATOMIC_RELAXED);
if(entries < 0)
fatal("DICTIONARY: entries is negative: %ld", entries);
@@ -375,7 +363,7 @@ size_t dictionary_entries(DICTIONARY *dict) {
size_t dictionary_referenced_items(DICTIONARY *dict) {
if(unlikely(!dict)) return 0;
- long int referenced_items = __atomic_load_n(&dict->referenced_items, __ATOMIC_SEQ_CST);
+ long int referenced_items = __atomic_load_n(&dict->referenced_items, __ATOMIC_RELAXED);
if(referenced_items < 0)
fatal("DICTIONARY: referenced items is negative: %ld", referenced_items);
@@ -387,7 +375,7 @@ long int dictionary_stats_for_registry(DICTIONARY *dict) {
return (dict->stats->memory.index + dict->stats->memory.dict);
}
void dictionary_version_increment(DICTIONARY *dict) {
- __atomic_fetch_add(&dict->version, 1, __ATOMIC_SEQ_CST);
+ __atomic_fetch_add(&dict->version, 1, __ATOMIC_RELAXED);
}
// ----------------------------------------------------------------------------
@@ -409,9 +397,9 @@ static inline void DICTIONARY_ENTRIES_PLUS1(DICTIONARY *dict) {
}
else {
- __atomic_fetch_add(&dict->version, 1, __ATOMIC_SEQ_CST);
- __atomic_fetch_add(&dict->entries, 1, __ATOMIC_SEQ_CST);
- __atomic_fetch_add(&dict->referenced_items, 1, __ATOMIC_SEQ_CST);
+ __atomic_fetch_add(&dict->version, 1, __ATOMIC_RELAXED);
+ __atomic_fetch_add(&dict->entries, 1, __ATOMIC_RELAXED);
+ __atomic_fetch_add(&dict->referenced_items, 1, __ATOMIC_RELAXED);
}
}
static inline void DICTIONARY_ENTRIES_MINUS1(DICTIONARY *dict) {
@@ -425,17 +413,15 @@ static inline void DICTIONARY_ENTRIES_MINUS1(DICTIONARY *dict) {
entries = dict->entries++;
}
else {
- __atomic_fetch_add(&dict->version, 1, __ATOMIC_SEQ_CST);
- entries = __atomic_fetch_sub(&dict->entries, 1, __ATOMIC_SEQ_CST);
+ __atomic_fetch_add(&dict->version, 1, __ATOMIC_RELAXED);
+ entries = __atomic_fetch_sub(&dict->entries, 1, __ATOMIC_RELAXED);
}
-#ifdef NETDATA_INTERNAL_CHECKS
- if(unlikely(entries == 0))
- fatal("DICT: negative number of entries in dictionary created from %s() (%zu@%s)",
- dict->creation_function,
- dict->creation_line,
- dict->creation_file);
-#endif
+ internal_fatal(entries == 0,
+ "DICT: negative number of entries in dictionary created from %s() (%zu@%s)",
+ dict->creation_function,
+ dict->creation_line,
+ dict->creation_file);
}
static inline void DICTIONARY_VALUE_RESETS_PLUS1(DICTIONARY *dict) {
__atomic_fetch_add(&dict->stats->ops.resets, 1, __ATOMIC_RELAXED);
@@ -443,7 +429,7 @@ static inline void DICTIONARY_VALUE_RESETS_PLUS1(DICTIONARY *dict) {
if(unlikely(is_dictionary_single_threaded(dict)))
dict->version++;
else
- __atomic_fetch_add(&dict->version, 1, __ATOMIC_SEQ_CST);
+ __atomic_fetch_add(&dict->version, 1, __ATOMIC_RELAXED);
}
static inline void DICTIONARY_STATS_TRAVERSALS_PLUS1(DICTIONARY *dict) {
__atomic_fetch_add(&dict->stats->ops.traversals, 1, __ATOMIC_RELAXED);
@@ -464,16 +450,16 @@ static inline void DICTIONARY_STATS_SEARCH_IGNORES_PLUS1(DICTIONARY *dict) {
__atomic_fetch_add(&dict->stats->spin_locks.search_spins, 1, __ATOMIC_RELAXED);
}
static inline void DICTIONARY_STATS_CALLBACK_INSERTS_PLUS1(DICTIONARY *dict) {
- __atomic_fetch_add(&dict->stats->callbacks.inserts, 1, __ATOMIC_RELAXED);
+ __atomic_fetch_add(&dict->stats->callbacks.inserts, 1, __ATOMIC_RELEASE);
}
static inline void DICTIONARY_STATS_CALLBACK_CONFLICTS_PLUS1(DICTIONARY *dict) {
- __atomic_fetch_add(&dict->stats->callbacks.conflicts, 1, __ATOMIC_RELAXED);
+ __atomic_fetch_add(&dict->stats->callbacks.conflicts, 1, __ATOMIC_RELEASE);
}
static inline void DICTIONARY_STATS_CALLBACK_REACTS_PLUS1(DICTIONARY *dict) {
- __atomic_fetch_add(&dict->stats->callbacks.reacts, 1, __ATOMIC_RELAXED);
+ __atomic_fetch_add(&dict->stats->callbacks.reacts, 1, __ATOMIC_RELEASE);
}
static inline void DICTIONARY_STATS_CALLBACK_DELETES_PLUS1(DICTIONARY *dict) {
- __atomic_fetch_add(&dict->stats->callbacks.deletes, 1, __ATOMIC_RELAXED);
+ __atomic_fetch_add(&dict->stats->callbacks.deletes, 1, __ATOMIC_RELEASE);
}
static inline void DICTIONARY_STATS_GARBAGE_COLLECTIONS_PLUS1(DICTIONARY *dict) {
__atomic_fetch_add(&dict->stats->ops.garbage_collections, 1, __ATOMIC_RELAXED);
@@ -496,52 +482,48 @@ static inline void DICTIONARY_STATS_DICT_FLUSHES_PLUS1(DICTIONARY *dict) {
__atomic_fetch_add(&dict->stats->ops.flushes, 1, __ATOMIC_RELAXED);
}
-static inline long int DICTIONARY_REFERENCED_ITEMS_PLUS1(DICTIONARY *dict) {
+static inline void DICTIONARY_REFERENCED_ITEMS_PLUS1(DICTIONARY *dict) {
__atomic_fetch_add(&dict->stats->items.referenced, 1, __ATOMIC_RELAXED);
if(unlikely(is_dictionary_single_threaded(dict)))
- return ++dict->referenced_items;
+ ++dict->referenced_items;
else
- return __atomic_add_fetch(&dict->referenced_items, 1, __ATOMIC_SEQ_CST);
+ __atomic_add_fetch(&dict->referenced_items, 1, __ATOMIC_RELAXED);
}
-static inline long int DICTIONARY_REFERENCED_ITEMS_MINUS1(DICTIONARY *dict) {
+static inline void DICTIONARY_REFERENCED_ITEMS_MINUS1(DICTIONARY *dict) {
__atomic_fetch_sub(&dict->stats->items.referenced, 1, __ATOMIC_RELAXED);
- long int referenced_items;
+ long int referenced_items; (void)referenced_items;
if(unlikely(is_dictionary_single_threaded(dict)))
referenced_items = --dict->referenced_items;
else
referenced_items = __atomic_sub_fetch(&dict->referenced_items, 1, __ATOMIC_SEQ_CST);
-#ifdef NETDATA_INTERNAL_CHECKS
- if(unlikely(referenced_items < 0))
- fatal("DICT: negative number of referenced items (%ld) in dictionary created from %s() (%zu@%s)",
- referenced_items,
- dict->creation_function,
- dict->creation_line,
- dict->creation_file);
-#endif
-
- return referenced_items;
+ internal_fatal(referenced_items < 0,
+ "DICT: negative number of referenced items (%ld) in dictionary created from %s() (%zu@%s)",
+ referenced_items,
+ dict->creation_function,
+ dict->creation_line,
+ dict->creation_file);
}
-static inline long int DICTIONARY_PENDING_DELETES_PLUS1(DICTIONARY *dict) {
+static inline void DICTIONARY_PENDING_DELETES_PLUS1(DICTIONARY *dict) {
__atomic_fetch_add(&dict->stats->items.pending_deletion, 1, __ATOMIC_RELAXED);
if(unlikely(is_dictionary_single_threaded(dict)))
- return ++dict->pending_deletion_items;
+ ++dict->pending_deletion_items;
else
- return __atomic_add_fetch(&dict->pending_deletion_items, 1, __ATOMIC_SEQ_CST);
+ __atomic_add_fetch(&dict->pending_deletion_items, 1, __ATOMIC_RELEASE);
}
static inline long int DICTIONARY_PENDING_DELETES_MINUS1(DICTIONARY *dict) {
- __atomic_fetch_sub(&dict->stats->items.pending_deletion, 1, __ATOMIC_RELAXED);
+ __atomic_fetch_sub(&dict->stats->items.pending_deletion, 1, __ATOMIC_RELEASE);
if(unlikely(is_dictionary_single_threaded(dict)))
return --dict->pending_deletion_items;
else
- return __atomic_sub_fetch(&dict->pending_deletion_items, 1, __ATOMIC_SEQ_CST);
+ return __atomic_sub_fetch(&dict->pending_deletion_items, 1, __ATOMIC_ACQUIRE);
}
static inline long int DICTIONARY_PENDING_DELETES_GET(DICTIONARY *dict) {
@@ -555,11 +537,11 @@ static inline REFCOUNT DICTIONARY_ITEM_REFCOUNT_GET(DICTIONARY *dict, DICTIONARY
if(unlikely(dict && is_dictionary_single_threaded(dict))) // this is an exception, dict can be null
return item->refcount;
else
- return (REFCOUNT)__atomic_load_n(&item->refcount, __ATOMIC_SEQ_CST);
+ return (REFCOUNT)__atomic_load_n(&item->refcount, __ATOMIC_ACQUIRE);
}
static inline REFCOUNT DICTIONARY_ITEM_REFCOUNT_GET_SOLE(DICTIONARY_ITEM *item) {
- return (REFCOUNT)__atomic_load_n(&item->refcount, __ATOMIC_SEQ_CST);
+ return (REFCOUNT)__atomic_load_n(&item->refcount, __ATOMIC_ACQUIRE);
}
// ----------------------------------------------------------------------------
@@ -579,8 +561,8 @@ static void dictionary_execute_insert_callback(DICTIONARY *dict, DICTIONARY_ITEM
dict->creation_line,
dict->creation_file);
- DICTIONARY_STATS_CALLBACK_INSERTS_PLUS1(dict);
dict->hooks->ins_callback(item, item->shared->value, constructor_data?constructor_data:dict->hooks->ins_callback_data);
+ DICTIONARY_STATS_CALLBACK_INSERTS_PLUS1(dict);
}
static bool dictionary_execute_conflict_callback(DICTIONARY *dict, DICTIONARY_ITEM *item, void *new_value, void *constructor_data) {
@@ -597,10 +579,13 @@ static bool dictionary_execute_conflict_callback(DICTIONARY *dict, DICTIONARY_IT
dict->creation_line,
dict->creation_file);
- DICTIONARY_STATS_CALLBACK_CONFLICTS_PLUS1(dict);
- return dict->hooks->conflict_callback(
+ bool ret = dict->hooks->conflict_callback(
item, item->shared->value, new_value,
constructor_data ? constructor_data : dict->hooks->conflict_callback_data);
+
+ DICTIONARY_STATS_CALLBACK_CONFLICTS_PLUS1(dict);
+
+ return ret;
}
static void dictionary_execute_react_callback(DICTIONARY *dict, DICTIONARY_ITEM *item, void *constructor_data) {
@@ -617,9 +602,10 @@ static void dictionary_execute_react_callback(DICTIONARY *dict, DICTIONARY_ITEM
dict->creation_line,
dict->creation_file);
- DICTIONARY_STATS_CALLBACK_REACTS_PLUS1(dict);
dict->hooks->react_callback(item, item->shared->value,
constructor_data?constructor_data:dict->hooks->react_callback_data);
+
+ DICTIONARY_STATS_CALLBACK_REACTS_PLUS1(dict);
}
static void dictionary_execute_delete_callback(DICTIONARY *dict, DICTIONARY_ITEM *item) {
@@ -637,8 +623,9 @@ static void dictionary_execute_delete_callback(DICTIONARY *dict, DICTIONARY_ITEM
dict->creation_line,
dict->creation_file);
- DICTIONARY_STATS_CALLBACK_DELETES_PLUS1(dict);
dict->hooks->del_callback(item, item->shared->value, dict->hooks->del_callback_data);
+
+ DICTIONARY_STATS_CALLBACK_DELETES_PLUS1(dict);
}
// ----------------------------------------------------------------------------
@@ -648,8 +635,8 @@ static inline size_t dictionary_locks_init(DICTIONARY *dict) {
if(likely(!is_dictionary_single_threaded(dict))) {
netdata_rwlock_init(&dict->index.rwlock);
netdata_rwlock_init(&dict->items.rwlock);
- return 0;
}
+
return 0;
}
@@ -657,29 +644,29 @@ static inline size_t dictionary_locks_destroy(DICTIONARY *dict) {
if(likely(!is_dictionary_single_threaded(dict))) {
netdata_rwlock_destroy(&dict->index.rwlock);
netdata_rwlock_destroy(&dict->items.rwlock);
- return 0;
}
+
return 0;
}
static inline void ll_recursive_lock_set_thread_as_writer(DICTIONARY *dict) {
pid_t expected = 0, desired = gettid();
- if(!__atomic_compare_exchange_n(&dict->items.writer_pid, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
- fatal("DICTIONARY: Cannot set thread %d as exclusive writer, expected %d, desired %d, found %d.", gettid(), expected, desired, __atomic_load_n(&dict->items.writer_pid, __ATOMIC_SEQ_CST));
+ if(!__atomic_compare_exchange_n(&dict->items.writer_pid, &expected, desired, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED))
+ fatal("DICTIONARY: Cannot set thread %d as exclusive writer, expected %d, desired %d, found %d.", gettid(), expected, desired, __atomic_load_n(&dict->items.writer_pid, __ATOMIC_RELAXED));
}
static inline void ll_recursive_unlock_unset_thread_writer(DICTIONARY *dict) {
pid_t expected = gettid(), desired = 0;
- if(!__atomic_compare_exchange_n(&dict->items.writer_pid, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
- fatal("DICTIONARY: Cannot unset thread %d as exclusive writer, expected %d, desired %d, found %d.", gettid(), expected, desired, __atomic_load_n(&dict->items.writer_pid, __ATOMIC_SEQ_CST));
+ if(!__atomic_compare_exchange_n(&dict->items.writer_pid, &expected, desired, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED))
+ fatal("DICTIONARY: Cannot unset thread %d as exclusive writer, expected %d, desired %d, found %d.", gettid(), expected, desired, __atomic_load_n(&dict->items.writer_pid, __ATOMIC_RELAXED));
}
static inline bool ll_recursive_lock_is_thread_the_writer(DICTIONARY *dict) {
pid_t tid = gettid();
- return tid > 0 && tid == __atomic_load_n(&dict->items.writer_pid, __ATOMIC_SEQ_CST);
+ return tid > 0 && tid == __atomic_load_n(&dict->items.writer_pid, __ATOMIC_RELAXED);
}
-static void ll_recursive_lock(DICTIONARY *dict, char rw) {
+static inline void ll_recursive_lock(DICTIONARY *dict, char rw) {
if(unlikely(is_dictionary_single_threaded(dict)))
return;
@@ -699,7 +686,7 @@ static void ll_recursive_lock(DICTIONARY *dict, char rw) {
}
}
-static void ll_recursive_unlock(DICTIONARY *dict, char rw) {
+static inline void ll_recursive_unlock(DICTIONARY *dict, char rw) {
if(unlikely(is_dictionary_single_threaded(dict)))
return;
@@ -722,10 +709,10 @@ static void ll_recursive_unlock(DICTIONARY *dict, char rw) {
}
}
-void dictionary_write_lock(DICTIONARY *dict) {
+inline void dictionary_write_lock(DICTIONARY *dict) {
ll_recursive_lock(dict, DICTIONARY_LOCK_WRITE);
}
-void dictionary_write_unlock(DICTIONARY *dict) {
+inline void dictionary_write_unlock(DICTIONARY *dict) {
ll_recursive_unlock(dict, DICTIONARY_LOCK_WRITE);
}
@@ -760,8 +747,8 @@ static inline void dictionary_index_wrlock_unlock(DICTIONARY *dict) {
// items garbage collector
static void garbage_collect_pending_deletes(DICTIONARY *dict) {
- usec_t last_master_deletion_us = dict->hooks?__atomic_load_n(&dict->hooks->last_master_deletion_us, __ATOMIC_SEQ_CST):0;
- usec_t last_gc_run_us = __atomic_load_n(&dict->last_gc_run_us, __ATOMIC_SEQ_CST);
+ usec_t last_master_deletion_us = dict->hooks?__atomic_load_n(&dict->hooks->last_master_deletion_us, __ATOMIC_RELAXED):0;
+ usec_t last_gc_run_us = __atomic_load_n(&dict->last_gc_run_us, __ATOMIC_RELAXED);
bool is_view = is_view_dictionary(dict);
@@ -773,7 +760,7 @@ static void garbage_collect_pending_deletes(DICTIONARY *dict) {
ll_recursive_lock(dict, DICTIONARY_LOCK_WRITE);
- __atomic_store_n(&dict->last_gc_run_us, now_realtime_usec(), __ATOMIC_SEQ_CST);
+ __atomic_store_n(&dict->last_gc_run_us, now_realtime_usec(), __ATOMIC_RELAXED);
if(is_view)
dictionary_index_lock_wrlock(dict);
@@ -819,25 +806,22 @@ static void garbage_collect_pending_deletes(DICTIONARY *dict) {
(void)deleted;
(void)examined;
- internal_error(false, "DICTIONARY: garbage collected dictionary created by %s (%zu@%s), examined %zu items, deleted %zu items, still pending %zu items",
- dict->creation_function, dict->creation_line, dict->creation_file, examined, deleted, pending);
-
+ internal_error(false, "DICTIONARY: garbage collected dictionary created by %s (%zu@%s), "
+ "examined %zu items, deleted %zu items, still pending %zu items",
+ dict->creation_function, dict->creation_line, dict->creation_file,
+ examined, deleted, pending);
}
// ----------------------------------------------------------------------------
// reference counters
-static inline size_t reference_counter_init(DICTIONARY *dict) {
- (void)dict;
-
+static inline size_t reference_counter_init(DICTIONARY *dict __maybe_unused) {
// allocate memory required for reference counters
// return number of bytes
return 0;
}
-static inline size_t reference_counter_free(DICTIONARY *dict) {
- (void)dict;
-
+static inline size_t reference_counter_free(DICTIONARY *dict __maybe_unused) {
// free memory required for reference counters
// return number of bytes
return 0;
@@ -846,13 +830,13 @@ static inline size_t reference_counter_free(DICTIONARY *dict) {
static void item_acquire(DICTIONARY *dict, DICTIONARY_ITEM *item) {
REFCOUNT refcount;
- if(unlikely(is_dictionary_single_threaded(dict))) {
+ if(unlikely(is_dictionary_single_threaded(dict)))
refcount = ++item->refcount;
- }
- else {
+
+ else
// increment the refcount
refcount = __atomic_add_fetch(&item->refcount, 1, __ATOMIC_SEQ_CST);
- }
+
if(refcount <= 0) {
internal_error(
@@ -900,7 +884,7 @@ static void item_release(DICTIONARY *dict, DICTIONARY_ITEM *item) {
is_deleted = item_flag_check(item, ITEM_FLAG_DELETED);
// decrement the refcount
- refcount = __atomic_sub_fetch(&item->refcount, 1, __ATOMIC_SEQ_CST);
+ refcount = __atomic_sub_fetch(&item->refcount, 1, __ATOMIC_RELEASE);
}
if(refcount < 0) {
@@ -956,14 +940,14 @@ static int item_check_and_acquire_advanced(DICTIONARY *dict, DICTIONARY_ITEM *it
desired = refcount + 1;
- } while(!__atomic_compare_exchange_n(&item->refcount, &refcount, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));
+ } while(!__atomic_compare_exchange_n(&item->refcount, &refcount, desired, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED));
// if ret == ITEM_OK, we acquired the item
if(ret == RC_ITEM_OK) {
- if (is_view_dictionary(dict) &&
+ if (unlikely(is_view_dictionary(dict) &&
item_shared_flag_check(item, ITEM_FLAG_DELETED) &&
- !item_flag_check(item, ITEM_FLAG_DELETED)) {
+ !item_flag_check(item, ITEM_FLAG_DELETED))) {
// but, we can't use this item
if (having_index_lock) {
@@ -979,7 +963,7 @@ static int item_check_and_acquire_advanced(DICTIONARY *dict, DICTIONARY_ITEM *it
dict_item_set_deleted(dict, item);
// decrement the refcount we incremented above
- if (__atomic_sub_fetch(&item->refcount, 1, __ATOMIC_SEQ_CST) == 0) {
+ if (__atomic_sub_fetch(&item->refcount, 1, __ATOMIC_RELEASE) == 0) {
// this is a deleted item, and we are the last one
DICTIONARY_PENDING_DELETES_PLUS1(dict);
}
@@ -988,7 +972,7 @@ static int item_check_and_acquire_advanced(DICTIONARY *dict, DICTIONARY_ITEM *it
} else {
// this is traversal / walkthrough