diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2023-02-07 22:26:16 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-02-07 22:26:16 +0200 |
commit | 8d3c3356ddeb6d62fa76b197e086e3e7fc5eb3dd (patch) | |
tree | e7661d49d0a0044cf1a5f1d3e0e6cc7dbc27f7a6 /libnetdata | |
parent | 12d92fe308f4107f67149ec9105b69ce2610a4f2 (diff) |
Streaming interpolated values (#14431)
* first commit - untested
* fix wrong begin command
* added set v2 too
* debug to log stream buffer
* debug to log stream buffer
* faster streaming printing
* mark charts and dimensions as collected
* use stream points even if sender is not enabled
* comment out stream debug log
* parse null as nan
* custom begin v2
* custom set v2; replication now copies the anomalous flag too
* custom end v2
* enabled stream log test
* renamed to BEGIN2, SET2, END2
* dont mix up replay and v2 members in user object
* fix typo
* cleanup
* support to v2 to v1 proxying
* mark updated dimensions as such
* do not log unknown flags
* comment out stream debug log
* send also the chart id on BEGIN2, v2 to v2
* update the data collections counter
* v2 values are transferred in hex
* faster hex parsing
* a little more generic hex and dec printing and parsing
* fix hex parsing
* minor optimization in dbengine api
* turn debugging into info message
* generalized the timings tracking, so that it can be used in more places
* commented out debug info
* renamed conflicting variable with macro
* remove wrong edits
* integrated ML and added cleanup in case parsing is interrupted
* disable data collection locking during v2
* cleanup stale ML locks; send updated chart variables during v2; add info to find stale locks
* inject an END2 between repeated BEGIN2 from rrdset_done()
* test: remove lockless single-threaded logic from dictionary and aral and apply the right acquire/release memory order to reference counters
* more fine grained dictionary atomics
* remove unecessary return values
* pointer validation under NETDATA_DICTIONARY_VALIDATE_POINTERS
* Revert "pointer validation under NETDATA_DICTIONARY_VALIDATE_POINTERS"
This reverts commit 846cdf2713e2a7ee2ff797f38db11714228800e9.
* Revert "remove unecessary return values"
This reverts commit 8c87d30f4d86f0f5d6b4562cf74fe7447138bbff.
* Revert "more fine grained dictionary atomics"
This reverts commit 984aec4234a340d197d45239ff9a10fd479fcf3c.
* Revert "test: remove lockless single-threaded logic from dictionary and aral and apply the right acquire/release memory order to reference counters"
This reverts commit c460b3d0ad497d2641bd0ea1d63cec7c052e74e4.
* Apply again "pointer validation under NETDATA_DICTIONARY_VALIDATE_POINTERS" while keeping the improved atomic operations.
This reverts commit f158d009
* fix last commit
* fix last commit again
* optimizations in dbengine
* do not send anomaly bit on non-supporting agents (send it when the INTERPOLATED capability is available)
* break long empty-points-loops in rrdset_done()
* decide page alignment on new page allocation, not on every point collected
* create max size pages but no smaller than 1/3
* Fix compilation when --disable-ml is specified
* Return false
* fixes for NETDATA_LOG_REPLICATION_REQUESTS
* added compile option NETDATA_WITHOUT_WORKERS_LATENCY
* put timings in BEGIN2, SET2, END2
* isolate begin2 ml
* revert repositioning data collection lock
* fixed multi-threading of statistics
* do not lookup dimensions all the time if they come in the same order
* update used on iteration, not on every points; also do better error handling
---------
Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'libnetdata')
-rw-r--r-- | libnetdata/buffer/buffer.c | 88 | ||||
-rw-r--r-- | libnetdata/buffer/buffer.h | 3 | ||||
-rw-r--r-- | libnetdata/dictionary/dictionary.c | 243 | ||||
-rw-r--r-- | libnetdata/inlined.h | 31 | ||||
-rw-r--r-- | libnetdata/libnetdata.c | 113 | ||||
-rw-r--r-- | libnetdata/libnetdata.h | 54 | ||||
-rw-r--r-- | libnetdata/storage_number/storage_number.h | 4 | ||||
-rw-r--r-- | libnetdata/worker_utilization/worker_utilization.c | 16 |
8 files changed, 401 insertions, 151 deletions
diff --git a/libnetdata/buffer/buffer.c b/libnetdata/buffer/buffer.c index eeb283209f..6cd6ae38a5 100644 --- a/libnetdata/buffer/buffer.c +++ b/libnetdata/buffer/buffer.c @@ -108,21 +108,44 @@ void buffer_print_llu(BUFFER *wb, unsigned long long uvalue) { buffer_need_bytes(wb, 50); + switch(uvalue) { + case 0: + buffer_fast_strcat(wb, "0", 1); + return; + + case 1: + buffer_fast_strcat(wb, "1", 1); + return; + + case 5: + buffer_fast_strcat(wb, "5", 1); + return; + + case 10: + buffer_fast_strcat(wb, "10", 2); + return; + + default: + break; + } + char *str = &wb->buffer[wb->len]; char *wstr = str; switch (sizeof(void *)) { - case 4: - wstr = (uvalue > (unsigned long long) 0xffffffff) ? print_number_llu_r(wstr, uvalue) : - print_number_lu_r(wstr, uvalue); - break; - case 8: - do { - *wstr++ = (char) ('0' + (uvalue % 10)); - } while (uvalue /= 10); - break; - default: - fatal("Netdata supports only 32-bit & 64-bit systems."); + case 8: + do { + *wstr++ = (char) ('0' + (uvalue % 10)); + } while (uvalue /= 10); + break; + + case 4: + wstr = (uvalue > (unsigned long long) 0xffffffff) ? print_number_llu_r(wstr, uvalue) : + print_number_lu_r(wstr, uvalue); + break; + + default: + fatal("Netdata supports only 32-bit & 64-bit systems."); } // terminate it @@ -132,8 +155,9 @@ void buffer_print_llu(BUFFER *wb, unsigned long long uvalue) char *begin = str, *end = wstr - 1, aux; while (end > begin) aux = *end, *end-- = *begin, *begin++ = aux; + size_t len = wstr - str; // return the buffer length - wb->len += wstr - str; + wb->len += len; } void buffer_print_ll(BUFFER *wb, long long value) @@ -167,7 +191,7 @@ static unsigned char bits03_to_hex[16] = { [15] = 'F' }; -void buffer_print_llu_hex(BUFFER *wb, unsigned long long value) +inline void buffer_print_llu_hex(BUFFER *wb, unsigned long long value) { unsigned char buffer[sizeof(unsigned long long) * 2 + 2 + 1]; // 8 bytes * 2 + '0x' + '\0' unsigned char *e = &buffer[sizeof(unsigned long long) * 2 + 2]; @@ -196,7 +220,16 @@ void buffer_print_llu_hex(BUFFER *wb, unsigned long long value) buffer_fast_strcat(wb, (char *)p, e - p); } -void buffer_fast_strcat(BUFFER *wb, const char *txt, size_t len) { +void buffer_print_ll_hex(BUFFER *wb, long long value) { + if(value < 0) { + buffer_fast_strcat(wb, "-", 1); + value = -value; + } + + buffer_print_llu_hex(wb, value); +} + +inline void buffer_fast_strcat(BUFFER *wb, const char *txt, size_t len) { if(unlikely(!txt || !*txt)) return; buffer_need_bytes(wb, len + 1); @@ -214,6 +247,27 @@ void buffer_fast_strcat(BUFFER *wb, const char *txt, size_t len) { wb->buffer[wb->len] = '\0'; } +void buffer_print_sn_flags(BUFFER *wb, SN_FLAGS flags, bool send_anomaly_bit) { + if(unlikely(flags == SN_EMPTY_SLOT)) { + buffer_fast_strcat(wb, "E", 1); + return; + } + + size_t printed = 0; + if(likely(send_anomaly_bit && (flags & SN_FLAG_NOT_ANOMALOUS))) { + buffer_fast_strcat(wb, "A", 1); + printed++; + } + + if(unlikely(flags & SN_FLAG_RESET)) { + buffer_fast_strcat(wb, "R", 1); + printed++; + } + + if(!printed) + buffer_fast_strcat(wb, "''", 2); +} + void buffer_strcat(BUFFER *wb, const char *txt) { // buffer_sprintf(wb, "%s", txt); @@ -234,7 +288,7 @@ void buffer_strcat(BUFFER *wb, const char *txt) wb->len = len; buffer_overflow_check(wb); - if(*txt) { + if(unlikely(*txt)) { debug(D_WEB_BUFFER, "strcat(): increasing web_buffer at position %zu, size = %zu\n", wb->len, wb->size); len = strlen(txt); buffer_fast_strcat(wb, txt, len); @@ -242,7 +296,7 @@ void buffer_strcat(BUFFER *wb, const char *txt) else { // terminate the string // without increasing the length - buffer_need_bytes(wb, (size_t)1); + buffer_need_bytes(wb, 1); wb->buffer[wb->len] = '\0'; } } @@ -361,7 +415,7 @@ void buffer_sprintf(BUFFER *wb, const char *fmt, ...) void buffer_rrd_value(BUFFER *wb, NETDATA_DOUBLE value) { - buffer_need_bytes(wb, 50); + buffer_need_bytes(wb, 512); if(isnan(value) || isinf(value)) { buffer_strcat(wb, "null"); diff --git a/libnetdata/buffer/buffer.h b/libnetdata/buffer/buffer.h index 0fa3495b4a..0b12c81752 100644 --- a/libnetdata/buffer/buffer.h +++ b/libnetdata/buffer/buffer.h @@ -74,6 +74,8 @@ void buffer_strcat_htmlescape(BUFFER *wb, const char *txt); void buffer_char_replace(BUFFER *wb, char from, char to); +void buffer_print_sn_flags(BUFFER *wb, SN_FLAGS flags, bool send_anomaly_bit); + char *print_number_lu_r(char *str, unsigned long uvalue); char *print_number_llu_r(char *str, unsigned long long uvalue); char *print_number_llu_r_smart(char *str, unsigned long long uvalue); @@ -81,6 +83,7 @@ char *print_number_llu_r_smart(char *str, unsigned long long uvalue); void buffer_print_llu(BUFFER *wb, unsigned long long uvalue); void buffer_print_ll(BUFFER *wb, long long value); void buffer_print_llu_hex(BUFFER *wb, unsigned long long value); +void buffer_print_ll_hex(BUFFER *wb, long long value); static inline void buffer_need_bytes(BUFFER *buffer, size_t needed_free_size) { if(unlikely(buffer->size - buffer->len < needed_free_size)) diff --git a/libnetdata/dictionary/dictionary.c b/libnetdata/dictionary/dictionary.c index 061b671abf..cb8aef9e08 100644 --- a/libnetdata/dictionary/dictionary.c +++ b/libnetdata/dictionary/dictionary.c @@ -10,9 +10,9 @@ typedef enum __attribute__ ((__packed__)) { DICT_FLAG_DESTROYED = (1 << 0), // this dictionary has been destroyed } DICT_FLAGS; -#define dict_flag_check(dict, flag) (__atomic_load_n(&((dict)->flags), __ATOMIC_SEQ_CST) & (flag)) -#define dict_flag_set(dict, flag) __atomic_or_fetch(&((dict)->flags), flag, __ATOMIC_SEQ_CST) -#define dict_flag_clear(dict, flag) __atomic_and_fetch(&((dict)->flags), ~(flag), __ATOMIC_SEQ_CST) +#define dict_flag_check(dict, flag) (__atomic_load_n(&((dict)->flags), __ATOMIC_RELAXED) & (flag)) +#define dict_flag_set(dict, flag) __atomic_or_fetch(&((dict)->flags), flag, __ATOMIC_RELAXED) +#define dict_flag_clear(dict, flag) __atomic_and_fetch(&((dict)->flags), ~(flag), __ATOMIC_RELAXED) // flags macros #define is_dictionary_destroyed(dict) dict_flag_check(dict, DICT_FLAG_DESTROYED) @@ -37,13 +37,13 @@ typedef enum __attribute__ ((__packed__)) item_flags { // IMPORTANT: This is 8-bit } ITEM_FLAGS; -#define item_flag_check(item, flag) (__atomic_load_n(&((item)->flags), __ATOMIC_SEQ_CST) & (flag)) -#define item_flag_set(item, flag) __atomic_or_fetch(&((item)->flags), flag, __ATOMIC_SEQ_CST) -#define item_flag_clear(item, flag) __atomic_and_fetch(&((item)->flags), ~(flag), __ATOMIC_SEQ_CST) +#define item_flag_check(item, flag) (__atomic_load_n(&((item)->flags), __ATOMIC_RELAXED) & (flag)) +#define item_flag_set(item, flag) __atomic_or_fetch(&((item)->flags), flag, __ATOMIC_RELAXED) +#define item_flag_clear(item, flag) __atomic_and_fetch(&((item)->flags), ~(flag), __ATOMIC_RELAXED) -#define item_shared_flag_check(item, flag) (__atomic_load_n(&((item)->shared->flags), __ATOMIC_SEQ_CST) & (flag)) -#define item_shared_flag_set(item, flag) __atomic_or_fetch(&((item)->shared->flags), flag, __ATOMIC_SEQ_CST) -#define item_shared_flag_clear(item, flag) __atomic_and_fetch(&((item)->shared->flags), ~(flag), __ATOMIC_SEQ_CST) +#define item_shared_flag_check(item, flag) (__atomic_load_n(&((item)->shared->flags), __ATOMIC_RELAXED) & (flag)) +#define item_shared_flag_set(item, flag) __atomic_or_fetch(&((item)->shared->flags), flag, __ATOMIC_RELAXED) +#define item_shared_flag_clear(item, flag) __atomic_and_fetch(&((item)->shared->flags), ~(flag), __ATOMIC_RELAXED) #define REFCOUNT_DELETING (-100) @@ -175,7 +175,7 @@ struct dictionary { long int referenced_items; // how many items of the dictionary are currently being used by 3rd parties long int pending_deletion_items; // how many items of the dictionary have been deleted, but have not been removed yet -#ifdef NETDATA_INTERNAL_CHECKS +#ifdef NETDATA_DICTIONARY_VALIDATE_POINTERS netdata_mutex_t global_pointer_registry_mutex; Pvoid_t global_pointer_registry; #endif @@ -205,59 +205,47 @@ static inline int item_is_not_referenced_and_can_be_removed_advanced(DICTIONARY // ---------------------------------------------------------------------------- // validate each pointer is indexed once - internal checks only +#ifdef NETDATA_DICTIONARY_VALIDATE_POINTERS static inline void pointer_index_init(DICTIONARY *dict __maybe_unused) { -#ifdef NETDATA_INTERNAL_CHECKS netdata_mutex_init(&dict->global_pointer_registry_mutex); -#else - ; -#endif } static inline void pointer_destroy_index(DICTIONARY *dict __maybe_unused) { -#ifdef NETDATA_INTERNAL_CHECKS netdata_mutex_lock(&dict->global_pointer_registry_mutex); JudyHSFreeArray(&dict->global_pointer_registry, PJE0); netdata_mutex_unlock(&dict->global_pointer_registry_mutex); -#else - ; -#endif } static inline void pointer_add(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM *item __maybe_unused) { -#ifdef NETDATA_INTERNAL_CHECKS netdata_mutex_lock(&dict->global_pointer_registry_mutex); Pvoid_t *PValue = JudyHSIns(&dict->global_pointer_registry, &item, sizeof(void *), PJE0); if(*PValue != NULL) fatal("pointer already exists in registry"); *PValue = item; netdata_mutex_unlock(&dict->global_pointer_registry_mutex); -#else - ; -#endif } static inline void pointer_check(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM *item __maybe_unused) { -#ifdef NETDATA_INTERNAL_CHECKS netdata_mutex_lock(&dict->global_pointer_registry_mutex); Pvoid_t *PValue = JudyHSGet(dict->global_pointer_registry, &item, sizeof(void *)); if(PValue == NULL) fatal("pointer is not found in registry"); netdata_mutex_unlock(&dict->global_pointer_registry_mutex); -#else - ; -#endif } static inline void pointer_del(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM *item __maybe_unused) { -#ifdef NETDATA_INTERNAL_CHECKS netdata_mutex_lock(&dict->global_pointer_registry_mutex); int ret = JudyHSDel(&dict->global_pointer_registry, &item, sizeof(void *), PJE0); if(!ret) fatal("pointer to be deleted does not exist in registry"); netdata_mutex_unlock(&dict->global_pointer_registry_mutex); -#else - ; -#endif } +#else // !NETDATA_DICTIONARY_VALIDATE_POINTERS +#define pointer_index_init(dict) debug_dummy() +#define pointer_destroy_index(dict) debug_dummy() +#define pointer_add(dict, item) debug_dummy() +#define pointer_check(dict, item) debug_dummy() +#define pointer_del(dict, item) debug_dummy() +#endif // !NETDATA_DICTIONARY_VALIDATE_POINTERS // ---------------------------------------------------------------------------- // memory statistics @@ -298,7 +286,7 @@ static inline void dictionary_hooks_allocate(DICTIONARY *dict) { static inline size_t dictionary_hooks_free(DICTIONARY *dict) { if(!dict->hooks) return 0; - REFCOUNT links = __atomic_sub_fetch(&dict->hooks->links, 1, __ATOMIC_SEQ_CST); + REFCOUNT links = __atomic_sub_fetch(&dict->hooks->links, 1, __ATOMIC_ACQUIRE); if(links == 0) { freez(dict->hooks); dict->hooks = NULL; @@ -358,7 +346,7 @@ size_t dictionary_version(DICTIONARY *dict) { // this is required for views to return the right number garbage_collect_pending_deletes(dict); - return __atomic_load_n(&dict->version, __ATOMIC_SEQ_CST); + return __atomic_load_n(&dict->version, __ATOMIC_RELAXED); } size_t dictionary_entries(DICTIONARY *dict) { if(unlikely(!dict)) return 0; @@ -366,7 +354,7 @@ size_t dictionary_entries(DICTIONARY *dict) { // this is required for views to return the right number garbage_collect_pending_deletes(dict); - long int entries = __atomic_load_n(&dict->entries, __ATOMIC_SEQ_CST); + long int entries = __atomic_load_n(&dict->entries, __ATOMIC_RELAXED); if(entries < 0) fatal("DICTIONARY: entries is negative: %ld", entries); @@ -375,7 +363,7 @@ size_t dictionary_entries(DICTIONARY *dict) { size_t dictionary_referenced_items(DICTIONARY *dict) { if(unlikely(!dict)) return 0; - long int referenced_items = __atomic_load_n(&dict->referenced_items, __ATOMIC_SEQ_CST); + long int referenced_items = __atomic_load_n(&dict->referenced_items, __ATOMIC_RELAXED); if(referenced_items < 0) fatal("DICTIONARY: referenced items is negative: %ld", referenced_items); @@ -387,7 +375,7 @@ long int dictionary_stats_for_registry(DICTIONARY *dict) { return (dict->stats->memory.index + dict->stats->memory.dict); } void dictionary_version_increment(DICTIONARY *dict) { - __atomic_fetch_add(&dict->version, 1, __ATOMIC_SEQ_CST); + __atomic_fetch_add(&dict->version, 1, __ATOMIC_RELAXED); } // ---------------------------------------------------------------------------- @@ -409,9 +397,9 @@ static inline void DICTIONARY_ENTRIES_PLUS1(DICTIONARY *dict) { } else { - __atomic_fetch_add(&dict->version, 1, __ATOMIC_SEQ_CST); - __atomic_fetch_add(&dict->entries, 1, __ATOMIC_SEQ_CST); - __atomic_fetch_add(&dict->referenced_items, 1, __ATOMIC_SEQ_CST); + __atomic_fetch_add(&dict->version, 1, __ATOMIC_RELAXED); + __atomic_fetch_add(&dict->entries, 1, __ATOMIC_RELAXED); + __atomic_fetch_add(&dict->referenced_items, 1, __ATOMIC_RELAXED); } } static inline void DICTIONARY_ENTRIES_MINUS1(DICTIONARY *dict) { @@ -425,17 +413,15 @@ static inline void DICTIONARY_ENTRIES_MINUS1(DICTIONARY *dict) { entries = dict->entries++; } else { - __atomic_fetch_add(&dict->version, 1, __ATOMIC_SEQ_CST); - entries = __atomic_fetch_sub(&dict->entries, 1, __ATOMIC_SEQ_CST); + __atomic_fetch_add(&dict->version, 1, __ATOMIC_RELAXED); + entries = __atomic_fetch_sub(&dict->entries, 1, __ATOMIC_RELAXED); } -#ifdef NETDATA_INTERNAL_CHECKS - if(unlikely(entries == 0)) - fatal("DICT: negative number of entries in dictionary created from %s() (%zu@%s)", - dict->creation_function, - dict->creation_line, - dict->creation_file); -#endif + internal_fatal(entries == 0, + "DICT: negative number of entries in dictionary created from %s() (%zu@%s)", + dict->creation_function, + dict->creation_line, + dict->creation_file); } static inline void DICTIONARY_VALUE_RESETS_PLUS1(DICTIONARY *dict) { __atomic_fetch_add(&dict->stats->ops.resets, 1, __ATOMIC_RELAXED); @@ -443,7 +429,7 @@ static inline void DICTIONARY_VALUE_RESETS_PLUS1(DICTIONARY *dict) { if(unlikely(is_dictionary_single_threaded(dict))) dict->version++; else - __atomic_fetch_add(&dict->version, 1, __ATOMIC_SEQ_CST); + __atomic_fetch_add(&dict->version, 1, __ATOMIC_RELAXED); } static inline void DICTIONARY_STATS_TRAVERSALS_PLUS1(DICTIONARY *dict) { __atomic_fetch_add(&dict->stats->ops.traversals, 1, __ATOMIC_RELAXED); @@ -464,16 +450,16 @@ static inline void DICTIONARY_STATS_SEARCH_IGNORES_PLUS1(DICTIONARY *dict) { __atomic_fetch_add(&dict->stats->spin_locks.search_spins, 1, __ATOMIC_RELAXED); } static inline void DICTIONARY_STATS_CALLBACK_INSERTS_PLUS1(DICTIONARY *dict) { - __atomic_fetch_add(&dict->stats->callbacks.inserts, 1, __ATOMIC_RELAXED); + __atomic_fetch_add(&dict->stats->callbacks.inserts, 1, __ATOMIC_RELEASE); } static inline void DICTIONARY_STATS_CALLBACK_CONFLICTS_PLUS1(DICTIONARY *dict) { - __atomic_fetch_add(&dict->stats->callbacks.conflicts, 1, __ATOMIC_RELAXED); + __atomic_fetch_add(&dict->stats->callbacks.conflicts, 1, __ATOMIC_RELEASE); } static inline void DICTIONARY_STATS_CALLBACK_REACTS_PLUS1(DICTIONARY *dict) { - __atomic_fetch_add(&dict->stats->callbacks.reacts, 1, __ATOMIC_RELAXED); + __atomic_fetch_add(&dict->stats->callbacks.reacts, 1, __ATOMIC_RELEASE); } static inline void DICTIONARY_STATS_CALLBACK_DELETES_PLUS1(DICTIONARY *dict) { - __atomic_fetch_add(&dict->stats->callbacks.deletes, 1, __ATOMIC_RELAXED); + __atomic_fetch_add(&dict->stats->callbacks.deletes, 1, __ATOMIC_RELEASE); } static inline void DICTIONARY_STATS_GARBAGE_COLLECTIONS_PLUS1(DICTIONARY *dict) { __atomic_fetch_add(&dict->stats->ops.garbage_collections, 1, __ATOMIC_RELAXED); @@ -496,52 +482,48 @@ static inline void DICTIONARY_STATS_DICT_FLUSHES_PLUS1(DICTIONARY *dict) { __atomic_fetch_add(&dict->stats->ops.flushes, 1, __ATOMIC_RELAXED); } -static inline long int DICTIONARY_REFERENCED_ITEMS_PLUS1(DICTIONARY *dict) { +static inline void DICTIONARY_REFERENCED_ITEMS_PLUS1(DICTIONARY *dict) { __atomic_fetch_add(&dict->stats->items.referenced, 1, __ATOMIC_RELAXED); if(unlikely(is_dictionary_single_threaded(dict))) - return ++dict->referenced_items; + ++dict->referenced_items; else - return __atomic_add_fetch(&dict->referenced_items, 1, __ATOMIC_SEQ_CST); + __atomic_add_fetch(&dict->referenced_items, 1, __ATOMIC_RELAXED); } -static inline long int DICTIONARY_REFERENCED_ITEMS_MINUS1(DICTIONARY *dict) { +static inline void DICTIONARY_REFERENCED_ITEMS_MINUS1(DICTIONARY *dict) { __atomic_fetch_sub(&dict->stats->items.referenced, 1, __ATOMIC_RELAXED); - long int referenced_items; + long int referenced_items; (void)referenced_items; if(unlikely(is_dictionary_single_threaded(dict))) referenced_items = --dict->referenced_items; else referenced_items = __atomic_sub_fetch(&dict->referenced_items, 1, __ATOMIC_SEQ_CST); -#ifdef NETDATA_INTERNAL_CHECKS - if(unlikely(referenced_items < 0)) - fatal("DICT: negative number of referenced items (%ld) in dictionary created from %s() (%zu@%s)", - referenced_items, - dict->creation_function, - dict->creation_line, - dict->creation_file); -#endif - - return referenced_items; + internal_fatal(referenced_items < 0, + "DICT: negative number of referenced items (%ld) in dictionary created from %s() (%zu@%s)", + referenced_items, + dict->creation_function, + dict->creation_line, + dict->creation_file); } -static inline long int DICTIONARY_PENDING_DELETES_PLUS1(DICTIONARY *dict) { +static inline void DICTIONARY_PENDING_DELETES_PLUS1(DICTIONARY *dict) { __atomic_fetch_add(&dict->stats->items.pending_deletion, 1, __ATOMIC_RELAXED); if(unlikely(is_dictionary_single_threaded(dict))) - return ++dict->pending_deletion_items; + ++dict->pending_deletion_items; else - return __atomic_add_fetch(&dict->pending_deletion_items, 1, __ATOMIC_SEQ_CST); + __atomic_add_fetch(&dict->pending_deletion_items, 1, __ATOMIC_RELEASE); } static inline long int DICTIONARY_PENDING_DELETES_MINUS1(DICTIONARY *dict) { - __atomic_fetch_sub(&dict->stats->items.pending_deletion, 1, __ATOMIC_RELAXED); + __atomic_fetch_sub(&dict->stats->items.pending_deletion, 1, __ATOMIC_RELEASE); if(unlikely(is_dictionary_single_threaded(dict))) return --dict->pending_deletion_items; else - return __atomic_sub_fetch(&dict->pending_deletion_items, 1, __ATOMIC_SEQ_CST); + return __atomic_sub_fetch(&dict->pending_deletion_items, 1, __ATOMIC_ACQUIRE); } static inline long int DICTIONARY_PENDING_DELETES_GET(DICTIONARY *dict) { @@ -555,11 +537,11 @@ static inline REFCOUNT DICTIONARY_ITEM_REFCOUNT_GET(DICTIONARY *dict, DICTIONARY if(unlikely(dict && is_dictionary_single_threaded(dict))) // this is an exception, dict can be null return item->refcount; else - return (REFCOUNT)__atomic_load_n(&item->refcount, __ATOMIC_SEQ_CST); + return (REFCOUNT)__atomic_load_n(&item->refcount, __ATOMIC_ACQUIRE); } static inline REFCOUNT DICTIONARY_ITEM_REFCOUNT_GET_SOLE(DICTIONARY_ITEM *item) { - return (REFCOUNT)__atomic_load_n(&item->refcount, __ATOMIC_SEQ_CST); + return (REFCOUNT)__atomic_load_n(&item->refcount, __ATOMIC_ACQUIRE); } // ---------------------------------------------------------------------------- @@ -579,8 +561,8 @@ static void dictionary_execute_insert_callback(DICTIONARY *dict, DICTIONARY_ITEM dict->creation_line, dict->creation_file); - DICTIONARY_STATS_CALLBACK_INSERTS_PLUS1(dict); dict->hooks->ins_callback(item, item->shared->value, constructor_data?constructor_data:dict->hooks->ins_callback_data); + DICTIONARY_STATS_CALLBACK_INSERTS_PLUS1(dict); } static bool dictionary_execute_conflict_callback(DICTIONARY *dict, DICTIONARY_ITEM *item, void *new_value, void *constructor_data) { @@ -597,10 +579,13 @@ static bool dictionary_execute_conflict_callback(DICTIONARY *dict, DICTIONARY_IT dict->creation_line, dict->creation_file); - DICTIONARY_STATS_CALLBACK_CONFLICTS_PLUS1(dict); - return dict->hooks->conflict_callback( + bool ret = dict->hooks->conflict_callback( item, item->shared->value, new_value, constructor_data ? constructor_data : dict->hooks->conflict_callback_data); + + DICTIONARY_STATS_CALLBACK_CONFLICTS_PLUS1(dict); + + return ret; } static void dictionary_execute_react_callback(DICTIONARY *dict, DICTIONARY_ITEM *item, void *constructor_data) { @@ -617,9 +602,10 @@ static void dictionary_execute_react_callback(DICTIONARY *dict, DICTIONARY_ITEM dict->creation_line, dict->creation_file); - DICTIONARY_STATS_CALLBACK_REACTS_PLUS1(dict); dict->hooks->react_callback(item, item->shared->value, constructor_data?constructor_data:dict->hooks->react_callback_data); + + DICTIONARY_STATS_CALLBACK_REACTS_PLUS1(dict); } static void dictionary_execute_delete_callback(DICTIONARY *dict, DICTIONARY_ITEM *item) { @@ -637,8 +623,9 @@ static void dictionary_execute_delete_callback(DICTIONARY *dict, DICTIONARY_ITEM dict->creation_line, dict->creation_file); - DICTIONARY_STATS_CALLBACK_DELETES_PLUS1(dict); dict->hooks->del_callback(item, item->shared->value, dict->hooks->del_callback_data); + + DICTIONARY_STATS_CALLBACK_DELETES_PLUS1(dict); } // ---------------------------------------------------------------------------- @@ -648,8 +635,8 @@ static inline size_t dictionary_locks_init(DICTIONARY *dict) { if(likely(!is_dictionary_single_threaded(dict))) { netdata_rwlock_init(&dict->index.rwlock); netdata_rwlock_init(&dict->items.rwlock); - return 0; } + return 0; } @@ -657,29 +644,29 @@ static inline size_t dictionary_locks_destroy(DICTIONARY *dict) { if(likely(!is_dictionary_single_threaded(dict))) { netdata_rwlock_destroy(&dict->index.rwlock); netdata_rwlock_destroy(&dict->items.rwlock); - return 0; } + return 0; } static inline void ll_recursive_lock_set_thread_as_writer(DICTIONARY *dict) { pid_t expected = 0, desired = gettid(); - if(!__atomic_compare_exchange_n(&dict->items.writer_pid, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) - fatal("DICTIONARY: Cannot set thread %d as exclusive writer, expected %d, desired %d, found %d.", gettid(), expected, desired, __atomic_load_n(&dict->items.writer_pid, __ATOMIC_SEQ_CST)); + if(!__atomic_compare_exchange_n(&dict->items.writer_pid, &expected, desired, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) + fatal("DICTIONARY: Cannot set thread %d as exclusive writer, expected %d, desired %d, found %d.", gettid(), expected, desired, __atomic_load_n(&dict->items.writer_pid, __ATOMIC_RELAXED)); } static inline void ll_recursive_unlock_unset_thread_writer(DICTIONARY *dict) { pid_t expected = gettid(), desired = 0; - if(!__atomic_compare_exchange_n(&dict->items.writer_pid, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) - fatal("DICTIONARY: Cannot unset thread %d as exclusive writer, expected %d, desired %d, found %d.", gettid(), expected, desired, __atomic_load_n(&dict->items.writer_pid, __ATOMIC_SEQ_CST)); + if(!__atomic_compare_exchange_n(&dict->items.writer_pid, &expected, desired, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) + fatal("DICTIONARY: Cannot unset thread %d as exclusive writer, expected %d, desired %d, found %d.", gettid(), expected, desired, __atomic_load_n(&dict->items.writer_pid, __ATOMIC_RELAXED)); } static inline bool ll_recursive_lock_is_thread_the_writer(DICTIONARY *dict) { pid_t tid = gettid(); - return tid > 0 && tid == __atomic_load_n(&dict->items.writer_pid, __ATOMIC_SEQ_CST); + return tid > 0 && tid == __atomic_load_n(&dict->items.writer_pid, __ATOMIC_RELAXED); } -static void ll_recursive_lock(DICTIONARY *dict, char rw) { +static inline void ll_recursive_lock(DICTIONARY *dict, char rw) { if(unlikely(is_dictionary_single_threaded(dict))) return; @@ -699,7 +686,7 @@ static void ll_recursive_lock(DICTIONARY *dict, char rw) { } } -static void ll_recursive_unlock(DICTIONARY *dict, char rw) { +static inline void ll_recursive_unlock(DICTIONARY *dict, char rw) { if(unlikely(is_dictionary_single_threaded(dict))) return; @@ -722,10 +709,10 @@ static void ll_recursive_unlock(DICTIONARY *dict, char rw) { } } -void dictionary_write_lock(DICTIONARY *dict) { +inline void dictionary_write_lock(DICTIONARY *dict) { ll_recursive_lock(dict, DICTIONARY_LOCK_WRITE); } -void dictionary_write_unlock(DICTIONARY *dict) { +inline void dictionary_write_unlock(DICTIONARY *dict) { ll_recursive_unlock(dict, DICTIONARY_LOCK_WRITE); } @@ -760,8 +747,8 @@ static inline void dictionary_index_wrlock_unlock(DICTIONARY *dict) { // items garbage collector static void garbage_collect_pending_deletes(DICTIONARY *dict) { - usec_t last_master_deletion_us = dict->hooks?__atomic_load_n(&dict->hooks->last_master_deletion_us, __ATOMIC_SEQ_CST):0; - usec_t last_gc_run_us = __atomic_load_n(&dict->last_gc_run_us, __ATOMIC_SEQ_CST); + usec_t last_master_deletion_us = dict->hooks?__atomic_load_n(&dict->hooks->last_master_deletion_us, __ATOMIC_RELAXED):0; + usec_t last_gc_run_us = __atomic_load_n(&dict->last_gc_run_us, __ATOMIC_RELAXED); bool is_view = is_view_dictionary(dict); @@ -773,7 +760,7 @@ static void garbage_collect_pending_deletes(DICTIONARY *dict) { ll_recursive_lock(dict, DICTIONARY_LOCK_WRITE); - __atomic_store_n(&dict->last_gc_run_us, now_realtime_usec(), __ATOMIC_SEQ_CST); + __atomic_store_n(&dict->last_gc_run_us, now_realtime_usec(), __ATOMIC_RELAXED); if(is_view) dictionary_index_lock_wrlock(dict); @@ -819,25 +806,22 @@ static void garbage_collect_pending_deletes(DICTIONARY *dict) { (void)deleted; (void)examined; - internal_error(false, "DICTIONARY: garbage collected dictionary created by %s (%zu@%s), examined %zu items, deleted %zu items, still pending %zu items", - dict->creation_function, dict->creation_line, dict->creation_file, examined, deleted, pending); - + internal_error(false, "DICTIONARY: garbage collected dictionary created by %s (%zu@%s), " + "examined %zu items, deleted %zu items, still pending %zu items", + dict->creation_function, dict->creation_line, dict->creation_file, + examined, deleted, pending); } // ---------------------------------------------------------------------------- // reference counters -static inline size_t reference_counter_init(DICTIONARY *dict) { - (void)dict; - +static inline size_t reference_counter_init(DICTIONARY *dict __maybe_unused) { // allocate memory required for reference counters // return number of bytes return 0; } -static inline size_t reference_counter_free(DICTIONARY *dict) { - (void)dict; - +static inline size_t reference_counter_free(DICTIONARY *dict __maybe_unused) { // free memory required for reference counters // return number of bytes return 0; @@ -846,13 +830,13 @@ static inline size_t reference_counter_free(DICTIONARY *dict) { static void item_acquire(DICTIONARY *dict, DICTIONARY_ITEM *item) { REFCOUNT refcount; - if(unlikely(is_dictionary_single_threaded(dict))) { + if(unlikely(is_dictionary_single_threaded(dict))) refcount = ++item->refcount; - } - else { + + else // increment the refcount refcount = __atomic_add_fetch(&item->refcount, 1, __ATOMIC_SEQ_CST); - } + if(refcount <= 0) { internal_error( @@ -900,7 +884,7 @@ static void item_release(DICTIONARY *dict, DICTIONARY_ITEM *item) { is_deleted = item_flag_check(item, ITEM_FLAG_DELETED); // decrement the refcount - refcount = __atomic_sub_fetch(&item->refcount, 1, __ATOMIC_SEQ_CST); + refcount = __atomic_sub_fetch(&item->refcount, 1, __ATOMIC_RELEASE); } if(refcount < 0) { @@ -956,14 +940,14 @@ static int item_check_and_acquire_advanced(DICTIONARY *dict, DICTIONARY_ITEM *it desired = refcount + 1; - } while(!__atomic_compare_exchange_n(&item->refcount, &refcount, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)); + } while(!__atomic_compare_exchange_n(&item->refcount, &refcount, desired, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)); // if ret == ITEM_OK, we acquired the item if(ret == RC_ITEM_OK) { - if (is_view_dictionary(dict) && + if (unlikely(is_view_dictionary(dict) && item_shared_flag_check(item, ITEM_FLAG_DELETED) && - !item_flag_check(item, ITEM_FLAG_DELETED)) { + !item_flag_check(item, ITEM_FLAG_DELETED))) { // but, we can't use this item if (having_index_lock) { @@ -979,7 +963,7 @@ static int item_check_and_acquire_advanced(DICTIONARY *dict, DICTIONARY_ITEM *it dict_item_set_deleted(dict, item); // decrement the refcount we incremented above - if (__atomic_sub_fetch(&item->refcount, 1, __ATOMIC_SEQ_CST) == 0) { + if (__atomic_sub_fetch(&item->refcount, 1, __ATOMIC_RELEASE) == 0) { // this is a deleted item, and we are the last one DICTIONARY_PENDING_DELETES_PLUS1(dict); } @@ -988,7 +972,7 @@ static int item_check_and_acquire_advanced(DICTIONARY *dict, DICTIONARY_ITEM *it } else { // this is traversal / walkthrough |