From 8d3c3356ddeb6d62fa76b197e086e3e7fc5eb3dd Mon Sep 17 00:00:00 2001
From: Costa Tsaousis
Date: Tue, 7 Feb 2023 22:26:16 +0200
Subject: Streaming interpolated values (#14431)

* first commit - untested
* fix wrong begin command
* added set v2 too
* debug to log stream buffer
* debug to log stream buffer
* faster streaming printing
* mark charts and dimensions as collected
* use stream points even if sender is not enabled
* comment out stream debug log
* parse null as nan
* custom begin v2
* custom set v2; replication now copies the anomalous flag too
* custom end v2
* enabled stream log test
* renamed to BEGIN2, SET2, END2
* dont mix up replay and v2 members in user object
* fix typo
* cleanup
* support to v2 to v1 proxying
* mark updated dimensions as such
* do not log unknown flags
* comment out stream debug log
* send also the chart id on BEGIN2, v2 to v2
* update the data collections counter
* v2 values are transferred in hex
* faster hex parsing
* a little more generic hex and dec printing and parsing
* fix hex parsing
* minor optimization in dbengine api
* turn debugging into info message
* generalized the timings tracking, so that it can be used in more places
* commented out debug info
* renamed conflicting variable with macro
* remove wrong edits
* integrated ML and added cleanup in case parsing is interrupted
* disable data collection locking during v2
* cleanup stale ML locks; send updated chart variables during v2; add info to find stale locks
* inject an END2 between repeated BEGIN2 from rrdset_done()
* test: remove lockless single-threaded logic from dictionary and aral and apply the right acquire/release memory order to reference counters
* more fine grained dictionary atomics
* remove unecessary return values
* pointer validation under NETDATA_DICTIONARY_VALIDATE_POINTERS
* Revert "pointer validation under NETDATA_DICTIONARY_VALIDATE_POINTERS"
This reverts commit 846cdf2713e2a7ee2ff797f38db11714228800e9.
* Revert "remove unecessary return values"
This reverts commit 8c87d30f4d86f0f5d6b4562cf74fe7447138bbff.
* Revert "more fine grained dictionary atomics"
This reverts commit 984aec4234a340d197d45239ff9a10fd479fcf3c.
* Revert "test: remove lockless single-threaded logic from dictionary and aral and apply the right acquire/release memory order to reference counters"
This reverts commit c460b3d0ad497d2641bd0ea1d63cec7c052e74e4.
* Apply again "pointer validation under NETDATA_DICTIONARY_VALIDATE_POINTERS" while keeping the improved atomic operations.
This reverts commit f158d009 * fix last commit * fix last commit again * optimizations in dbengine * do not send anomaly bit on non-supporting agents (send it when the INTERPOLATED capability is available) * break long empty-points-loops in rrdset_done() * decide page alignment on new page allocation, not on every point collected * create max size pages but no smaller than 1/3 * Fix compilation when --disable-ml is specified * Return false * fixes for NETDATA_LOG_REPLICATION_REQUESTS * added compile option NETDATA_WITHOUT_WORKERS_LATENCY * put timings in BEGIN2, SET2, END2 * isolate begin2 ml * revert repositioning data collection lock * fixed multi-threading of statistics * do not lookup dimensions all the time if they come in the same order * update used on iteration, not on every points; also do better error handling --------- Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> --- libnetdata/buffer/buffer.c | 88 ++++++-- libnetdata/buffer/buffer.h | 3 + libnetdata/dictionary/dictionary.c | 243 ++++++++++----------- libnetdata/inlined.h | 31 +++ libnetdata/libnetdata.c | 113 ++++++++++ libnetdata/libnetdata.h | 54 +++++ libnetdata/storage_number/storage_number.h | 4 + libnetdata/worker_utilization/worker_utilization.c | 16 +- 8 files changed, 401 insertions(+), 151 deletions(-) (limited to 'libnetdata') diff --git a/libnetdata/buffer/buffer.c b/libnetdata/buffer/buffer.c index eeb283209f..6cd6ae38a5 100644 --- a/libnetdata/buffer/buffer.c +++ b/libnetdata/buffer/buffer.c @@ -108,21 +108,44 @@ void buffer_print_llu(BUFFER *wb, unsigned long long uvalue) { buffer_need_bytes(wb, 50); + switch(uvalue) { + case 0: + buffer_fast_strcat(wb, "0", 1); + return; + + case 1: + buffer_fast_strcat(wb, "1", 1); + return; + + case 5: + buffer_fast_strcat(wb, "5", 1); + return; + + case 10: + buffer_fast_strcat(wb, "10", 2); + return; + + default: + break; + } + char *str = &wb->buffer[wb->len]; char *wstr = str; switch (sizeof(void *)) { - case 4: - wstr = (uvalue > (unsigned long long) 0xffffffff) ? print_number_llu_r(wstr, uvalue) : - print_number_lu_r(wstr, uvalue); - break; - case 8: - do { - *wstr++ = (char) ('0' + (uvalue % 10)); - } while (uvalue /= 10); - break; - default: - fatal("Netdata supports only 32-bit & 64-bit systems."); + case 8: + do { + *wstr++ = (char) ('0' + (uvalue % 10)); + } while (uvalue /= 10); + break; + + case 4: + wstr = (uvalue > (unsigned long long) 0xffffffff) ? 
print_number_llu_r(wstr, uvalue) : + print_number_lu_r(wstr, uvalue); + break; + + default: + fatal("Netdata supports only 32-bit & 64-bit systems."); } // terminate it @@ -132,8 +155,9 @@ void buffer_print_llu(BUFFER *wb, unsigned long long uvalue) char *begin = str, *end = wstr - 1, aux; while (end > begin) aux = *end, *end-- = *begin, *begin++ = aux; + size_t len = wstr - str; // return the buffer length - wb->len += wstr - str; + wb->len += len; } void buffer_print_ll(BUFFER *wb, long long value) @@ -167,7 +191,7 @@ static unsigned char bits03_to_hex[16] = { [15] = 'F' }; -void buffer_print_llu_hex(BUFFER *wb, unsigned long long value) +inline void buffer_print_llu_hex(BUFFER *wb, unsigned long long value) { unsigned char buffer[sizeof(unsigned long long) * 2 + 2 + 1]; // 8 bytes * 2 + '0x' + '\0' unsigned char *e = &buffer[sizeof(unsigned long long) * 2 + 2]; @@ -196,7 +220,16 @@ void buffer_print_llu_hex(BUFFER *wb, unsigned long long value) buffer_fast_strcat(wb, (char *)p, e - p); } -void buffer_fast_strcat(BUFFER *wb, const char *txt, size_t len) { +void buffer_print_ll_hex(BUFFER *wb, long long value) { + if(value < 0) { + buffer_fast_strcat(wb, "-", 1); + value = -value; + } + + buffer_print_llu_hex(wb, value); +} + +inline void buffer_fast_strcat(BUFFER *wb, const char *txt, size_t len) { if(unlikely(!txt || !*txt)) return; buffer_need_bytes(wb, len + 1); @@ -214,6 +247,27 @@ void buffer_fast_strcat(BUFFER *wb, const char *txt, size_t len) { wb->buffer[wb->len] = '\0'; } +void buffer_print_sn_flags(BUFFER *wb, SN_FLAGS flags, bool send_anomaly_bit) { + if(unlikely(flags == SN_EMPTY_SLOT)) { + buffer_fast_strcat(wb, "E", 1); + return; + } + + size_t printed = 0; + if(likely(send_anomaly_bit && (flags & SN_FLAG_NOT_ANOMALOUS))) { + buffer_fast_strcat(wb, "A", 1); + printed++; + } + + if(unlikely(flags & SN_FLAG_RESET)) { + buffer_fast_strcat(wb, "R", 1); + printed++; + } + + if(!printed) + buffer_fast_strcat(wb, "''", 2); +} + void buffer_strcat(BUFFER *wb, const char *txt) { // buffer_sprintf(wb, "%s", txt); @@ -234,7 +288,7 @@ void buffer_strcat(BUFFER *wb, const char *txt) wb->len = len; buffer_overflow_check(wb); - if(*txt) { + if(unlikely(*txt)) { debug(D_WEB_BUFFER, "strcat(): increasing web_buffer at position %zu, size = %zu\n", wb->len, wb->size); len = strlen(txt); buffer_fast_strcat(wb, txt, len); @@ -242,7 +296,7 @@ void buffer_strcat(BUFFER *wb, const char *txt) else { // terminate the string // without increasing the length - buffer_need_bytes(wb, (size_t)1); + buffer_need_bytes(wb, 1); wb->buffer[wb->len] = '\0'; } } @@ -361,7 +415,7 @@ void buffer_sprintf(BUFFER *wb, const char *fmt, ...) 
void buffer_rrd_value(BUFFER *wb, NETDATA_DOUBLE value) { - buffer_need_bytes(wb, 50); + buffer_need_bytes(wb, 512); if(isnan(value) || isinf(value)) { buffer_strcat(wb, "null"); diff --git a/libnetdata/buffer/buffer.h b/libnetdata/buffer/buffer.h index 0fa3495b4a..0b12c81752 100644 --- a/libnetdata/buffer/buffer.h +++ b/libnetdata/buffer/buffer.h @@ -74,6 +74,8 @@ void buffer_strcat_htmlescape(BUFFER *wb, const char *txt); void buffer_char_replace(BUFFER *wb, char from, char to); +void buffer_print_sn_flags(BUFFER *wb, SN_FLAGS flags, bool send_anomaly_bit); + char *print_number_lu_r(char *str, unsigned long uvalue); char *print_number_llu_r(char *str, unsigned long long uvalue); char *print_number_llu_r_smart(char *str, unsigned long long uvalue); @@ -81,6 +83,7 @@ char *print_number_llu_r_smart(char *str, unsigned long long uvalue); void buffer_print_llu(BUFFER *wb, unsigned long long uvalue); void buffer_print_ll(BUFFER *wb, long long value); void buffer_print_llu_hex(BUFFER *wb, unsigned long long value); +void buffer_print_ll_hex(BUFFER *wb, long long value); static inline void buffer_need_bytes(BUFFER *buffer, size_t needed_free_size) { if(unlikely(buffer->size - buffer->len < needed_free_size)) diff --git a/libnetdata/dictionary/dictionary.c b/libnetdata/dictionary/dictionary.c index 061b671abf..cb8aef9e08 100644 --- a/libnetdata/dictionary/dictionary.c +++ b/libnetdata/dictionary/dictionary.c @@ -10,9 +10,9 @@ typedef enum __attribute__ ((__packed__)) { DICT_FLAG_DESTROYED = (1 << 0), // this dictionary has been destroyed } DICT_FLAGS; -#define dict_flag_check(dict, flag) (__atomic_load_n(&((dict)->flags), __ATOMIC_SEQ_CST) & (flag)) -#define dict_flag_set(dict, flag) __atomic_or_fetch(&((dict)->flags), flag, __ATOMIC_SEQ_CST) -#define dict_flag_clear(dict, flag) __atomic_and_fetch(&((dict)->flags), ~(flag), __ATOMIC_SEQ_CST) +#define dict_flag_check(dict, flag) (__atomic_load_n(&((dict)->flags), __ATOMIC_RELAXED) & (flag)) +#define dict_flag_set(dict, flag) __atomic_or_fetch(&((dict)->flags), flag, __ATOMIC_RELAXED) +#define dict_flag_clear(dict, flag) __atomic_and_fetch(&((dict)->flags), ~(flag), __ATOMIC_RELAXED) // flags macros #define is_dictionary_destroyed(dict) dict_flag_check(dict, DICT_FLAG_DESTROYED) @@ -37,13 +37,13 @@ typedef enum __attribute__ ((__packed__)) item_flags { // IMPORTANT: This is 8-bit } ITEM_FLAGS; -#define item_flag_check(item, flag) (__atomic_load_n(&((item)->flags), __ATOMIC_SEQ_CST) & (flag)) -#define item_flag_set(item, flag) __atomic_or_fetch(&((item)->flags), flag, __ATOMIC_SEQ_CST) -#define item_flag_clear(item, flag) __atomic_and_fetch(&((item)->flags), ~(flag), __ATOMIC_SEQ_CST) +#define item_flag_check(item, flag) (__atomic_load_n(&((item)->flags), __ATOMIC_RELAXED) & (flag)) +#define item_flag_set(item, flag) __atomic_or_fetch(&((item)->flags), flag, __ATOMIC_RELAXED) +#define item_flag_clear(item, flag) __atomic_and_fetch(&((item)->flags), ~(flag), __ATOMIC_RELAXED) -#define item_shared_flag_check(item, flag) (__atomic_load_n(&((item)->shared->flags), __ATOMIC_SEQ_CST) & (flag)) -#define item_shared_flag_set(item, flag) __atomic_or_fetch(&((item)->shared->flags), flag, __ATOMIC_SEQ_CST) -#define item_shared_flag_clear(item, flag) __atomic_and_fetch(&((item)->shared->flags), ~(flag), __ATOMIC_SEQ_CST) +#define item_shared_flag_check(item, flag) (__atomic_load_n(&((item)->shared->flags), __ATOMIC_RELAXED) & (flag)) +#define item_shared_flag_set(item, flag) __atomic_or_fetch(&((item)->shared->flags), flag, __ATOMIC_RELAXED) +#define 
item_shared_flag_clear(item, flag) __atomic_and_fetch(&((item)->shared->flags), ~(flag), __ATOMIC_RELAXED) #define REFCOUNT_DELETING (-100) @@ -175,7 +175,7 @@ struct dictionary { long int referenced_items; // how many items of the dictionary are currently being used by 3rd parties long int pending_deletion_items; // how many items of the dictionary have been deleted, but have not been removed yet -#ifdef NETDATA_INTERNAL_CHECKS +#ifdef NETDATA_DICTIONARY_VALIDATE_POINTERS netdata_mutex_t global_pointer_registry_mutex; Pvoid_t global_pointer_registry; #endif @@ -205,59 +205,47 @@ static inline int item_is_not_referenced_and_can_be_removed_advanced(DICTIONARY // ---------------------------------------------------------------------------- // validate each pointer is indexed once - internal checks only +#ifdef NETDATA_DICTIONARY_VALIDATE_POINTERS static inline void pointer_index_init(DICTIONARY *dict __maybe_unused) { -#ifdef NETDATA_INTERNAL_CHECKS netdata_mutex_init(&dict->global_pointer_registry_mutex); -#else - ; -#endif } static inline void pointer_destroy_index(DICTIONARY *dict __maybe_unused) { -#ifdef NETDATA_INTERNAL_CHECKS netdata_mutex_lock(&dict->global_pointer_registry_mutex); JudyHSFreeArray(&dict->global_pointer_registry, PJE0); netdata_mutex_unlock(&dict->global_pointer_registry_mutex); -#else - ; -#endif } static inline void pointer_add(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM *item __maybe_unused) { -#ifdef NETDATA_INTERNAL_CHECKS netdata_mutex_lock(&dict->global_pointer_registry_mutex); Pvoid_t *PValue = JudyHSIns(&dict->global_pointer_registry, &item, sizeof(void *), PJE0); if(*PValue != NULL) fatal("pointer already exists in registry"); *PValue = item; netdata_mutex_unlock(&dict->global_pointer_registry_mutex); -#else - ; -#endif } static inline void pointer_check(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM *item __maybe_unused) { -#ifdef NETDATA_INTERNAL_CHECKS netdata_mutex_lock(&dict->global_pointer_registry_mutex); Pvoid_t *PValue = JudyHSGet(dict->global_pointer_registry, &item, sizeof(void *)); if(PValue == NULL) fatal("pointer is not found in registry"); netdata_mutex_unlock(&dict->global_pointer_registry_mutex); -#else - ; -#endif } static inline void pointer_del(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM *item __maybe_unused) { -#ifdef NETDATA_INTERNAL_CHECKS netdata_mutex_lock(&dict->global_pointer_registry_mutex); int ret = JudyHSDel(&dict->global_pointer_registry, &item, sizeof(void *), PJE0); if(!ret) fatal("pointer to be deleted does not exist in registry"); netdata_mutex_unlock(&dict->global_pointer_registry_mutex); -#else - ; -#endif } +#else // !NETDATA_DICTIONARY_VALIDATE_POINTERS +#define pointer_index_init(dict) debug_dummy() +#define pointer_destroy_index(dict) debug_dummy() +#define pointer_add(dict, item) debug_dummy() +#define pointer_check(dict, item) debug_dummy() +#define pointer_del(dict, item) debug_dummy() +#endif // !NETDATA_DICTIONARY_VALIDATE_POINTERS // ---------------------------------------------------------------------------- // memory statistics @@ -298,7 +286,7 @@ static inline void dictionary_hooks_allocate(DICTIONARY *dict) { static inline size_t dictionary_hooks_free(DICTIONARY *dict) { if(!dict->hooks) return 0; - REFCOUNT links = __atomic_sub_fetch(&dict->hooks->links, 1, __ATOMIC_SEQ_CST); + REFCOUNT links = __atomic_sub_fetch(&dict->hooks->links, 1, __ATOMIC_ACQUIRE); if(links == 0) { freez(dict->hooks); dict->hooks = NULL; @@ -358,7 +346,7 @@ size_t dictionary_version(DICTIONARY *dict) { // this is required 
for views to return the right number garbage_collect_pending_deletes(dict); - return __atomic_load_n(&dict->version, __ATOMIC_SEQ_CST); + return __atomic_load_n(&dict->version, __ATOMIC_RELAXED); } size_t dictionary_entries(DICTIONARY *dict) { if(unlikely(!dict)) return 0; @@ -366,7 +354,7 @@ size_t dictionary_entries(DICTIONARY *dict) { // this is required for views to return the right number garbage_collect_pending_deletes(dict); - long int entries = __atomic_load_n(&dict->entries, __ATOMIC_SEQ_CST); + long int entries = __atomic_load_n(&dict->entries, __ATOMIC_RELAXED); if(entries < 0) fatal("DICTIONARY: entries is negative: %ld", entries); @@ -375,7 +363,7 @@ size_t dictionary_entries(DICTIONARY *dict) { size_t dictionary_referenced_items(DICTIONARY *dict) { if(unlikely(!dict)) return 0; - long int referenced_items = __atomic_load_n(&dict->referenced_items, __ATOMIC_SEQ_CST); + long int referenced_items = __atomic_load_n(&dict->referenced_items, __ATOMIC_RELAXED); if(referenced_items < 0) fatal("DICTIONARY: referenced items is negative: %ld", referenced_items); @@ -387,7 +375,7 @@ long int dictionary_stats_for_registry(DICTIONARY *dict) { return (dict->stats->memory.index + dict->stats->memory.dict); } void dictionary_version_increment(DICTIONARY *dict) { - __atomic_fetch_add(&dict->version, 1, __ATOMIC_SEQ_CST); + __atomic_fetch_add(&dict->version, 1, __ATOMIC_RELAXED); } // ---------------------------------------------------------------------------- @@ -409,9 +397,9 @@ static inline void DICTIONARY_ENTRIES_PLUS1(DICTIONARY *dict) { } else { - __atomic_fetch_add(&dict->version, 1, __ATOMIC_SEQ_CST); - __atomic_fetch_add(&dict->entries, 1, __ATOMIC_SEQ_CST); - __atomic_fetch_add(&dict->referenced_items, 1, __ATOMIC_SEQ_CST); + __atomic_fetch_add(&dict->version, 1, __ATOMIC_RELAXED); + __atomic_fetch_add(&dict->entries, 1, __ATOMIC_RELAXED); + __atomic_fetch_add(&dict->referenced_items, 1, __ATOMIC_RELAXED); } } static inline void DICTIONARY_ENTRIES_MINUS1(DICTIONARY *dict) { @@ -425,17 +413,15 @@ static inline void DICTIONARY_ENTRIES_MINUS1(DICTIONARY *dict) { entries = dict->entries++; } else { - __atomic_fetch_add(&dict->version, 1, __ATOMIC_SEQ_CST); - entries = __atomic_fetch_sub(&dict->entries, 1, __ATOMIC_SEQ_CST); + __atomic_fetch_add(&dict->version, 1, __ATOMIC_RELAXED); + entries = __atomic_fetch_sub(&dict->entries, 1, __ATOMIC_RELAXED); } -#ifdef NETDATA_INTERNAL_CHECKS - if(unlikely(entries == 0)) - fatal("DICT: negative number of entries in dictionary created from %s() (%zu@%s)", - dict->creation_function, - dict->creation_line, - dict->creation_file); -#endif + internal_fatal(entries == 0, + "DICT: negative number of entries in dictionary created from %s() (%zu@%s)", + dict->creation_function, + dict->creation_line, + dict->creation_file); } static inline void DICTIONARY_VALUE_RESETS_PLUS1(DICTIONARY *dict) { __atomic_fetch_add(&dict->stats->ops.resets, 1, __ATOMIC_RELAXED); @@ -443,7 +429,7 @@ static inline void DICTIONARY_VALUE_RESETS_PLUS1(DICTIONARY *dict) { if(unlikely(is_dictionary_single_threaded(dict))) dict->version++; else - __atomic_fetch_add(&dict->version, 1, __ATOMIC_SEQ_CST); + __atomic_fetch_add(&dict->version, 1, __ATOMIC_RELAXED); } static inline void DICTIONARY_STATS_TRAVERSALS_PLUS1(DICTIONARY *dict) { __atomic_fetch_add(&dict->stats->ops.traversals, 1, __ATOMIC_RELAXED); @@ -464,16 +450,16 @@ static inline void DICTIONARY_STATS_SEARCH_IGNORES_PLUS1(DICTIONARY *dict) { __atomic_fetch_add(&dict->stats->spin_locks.search_spins, 1, __ATOMIC_RELAXED); } 
static inline void DICTIONARY_STATS_CALLBACK_INSERTS_PLUS1(DICTIONARY *dict) { - __atomic_fetch_add(&dict->stats->callbacks.inserts, 1, __ATOMIC_RELAXED); + __atomic_fetch_add(&dict->stats->callbacks.inserts, 1, __ATOMIC_RELEASE); } static inline void DICTIONARY_STATS_CALLBACK_CONFLICTS_PLUS1(DICTIONARY *dict) { - __atomic_fetch_add(&dict->stats->callbacks.conflicts, 1, __ATOMIC_RELAXED); + __atomic_fetch_add(&dict->stats->callbacks.conflicts, 1, __ATOMIC_RELEASE); } static inline void DICTIONARY_STATS_CALLBACK_REACTS_PLUS1(DICTIONARY *dict) { - __atomic_fetch_add(&dict->stats->callbacks.reacts, 1, __ATOMIC_RELAXED); + __atomic_fetch_add(&dict->stats->callbacks.reacts, 1, __ATOMIC_RELEASE); } static inline void DICTIONARY_STATS_CALLBACK_DELETES_PLUS1(DICTIONARY *dict) { - __atomic_fetch_add(&dict->stats->callbacks.deletes, 1, __ATOMIC_RELAXED); + __atomic_fetch_add(&dict->stats->callbacks.deletes, 1, __ATOMIC_RELEASE); } static inline void DICTIONARY_STATS_GARBAGE_COLLECTIONS_PLUS1(DICTIONARY *dict) { __atomic_fetch_add(&dict->stats->ops.garbage_collections, 1, __ATOMIC_RELAXED); @@ -496,52 +482,48 @@ static inline void DICTIONARY_STATS_DICT_FLUSHES_PLUS1(DICTIONARY *dict) { __atomic_fetch_add(&dict->stats->ops.flushes, 1, __ATOMIC_RELAXED); } -static inline long int DICTIONARY_REFERENCED_ITEMS_PLUS1(DICTIONARY *dict) { +static inline void DICTIONARY_REFERENCED_ITEMS_PLUS1(DICTIONARY *dict) { __atomic_fetch_add(&dict->stats->items.referenced, 1, __ATOMIC_RELAXED); if(unlikely(is_dictionary_single_threaded(dict))) - return ++dict->referenced_items; + ++dict->referenced_items; else - return __atomic_add_fetch(&dict->referenced_items, 1, __ATOMIC_SEQ_CST); + __atomic_add_fetch(&dict->referenced_items, 1, __ATOMIC_RELAXED); } -static inline long int DICTIONARY_REFERENCED_ITEMS_MINUS1(DICTIONARY *dict) { +static inline void DICTIONARY_REFERENCED_ITEMS_MINUS1(DICTIONARY *dict) { __atomic_fetch_sub(&dict->stats->items.referenced, 1, __ATOMIC_RELAXED); - long int referenced_items; + long int referenced_items; (void)referenced_items; if(unlikely(is_dictionary_single_threaded(dict))) referenced_items = --dict->referenced_items; else referenced_items = __atomic_sub_fetch(&dict->referenced_items, 1, __ATOMIC_SEQ_CST); -#ifdef NETDATA_INTERNAL_CHECKS - if(unlikely(referenced_items < 0)) - fatal("DICT: negative number of referenced items (%ld) in dictionary created from %s() (%zu@%s)", - referenced_items, - dict->creation_function, - dict->creation_line, - dict->creation_file); -#endif - - return referenced_items; + internal_fatal(referenced_items < 0, + "DICT: negative number of referenced items (%ld) in dictionary created from %s() (%zu@%s)", + referenced_items, + dict->creation_function, + dict->creation_line, + dict->creation_file); } -static inline long int DICTIONARY_PENDING_DELETES_PLUS1(DICTIONARY *dict) { +static inline void DICTIONARY_PENDING_DELETES_PLUS1(DICTIONARY *dict) { __atomic_fetch_add(&dict->stats->items.pending_deletion, 1, __ATOMIC_RELAXED); if(unlikely(is_dictionary_single_threaded(dict))) - return ++dict->pending_deletion_items; + ++dict->pending_deletion_items; else - return __atomic_add_fetch(&dict->pending_deletion_items, 1, __ATOMIC_SEQ_CST); + __atomic_add_fetch(&dict->pending_deletion_items, 1, __ATOMIC_RELEASE); } static inline long int DICTIONARY_PENDING_DELETES_MINUS1(DICTIONARY *dict) { - __atomic_fetch_sub(&dict->stats->items.pending_deletion, 1, __ATOMIC_RELAXED); + __atomic_fetch_sub(&dict->stats->items.pending_deletion, 1, __ATOMIC_RELEASE); 
if(unlikely(is_dictionary_single_threaded(dict))) return --dict->pending_deletion_items; else - return __atomic_sub_fetch(&dict->pending_deletion_items, 1, __ATOMIC_SEQ_CST); + return __atomic_sub_fetch(&dict->pending_deletion_items, 1, __ATOMIC_ACQUIRE); } static inline long int DICTIONARY_PENDING_DELETES_GET(DICTIONARY *dict) { @@ -555,11 +537,11 @@ static inline REFCOUNT DICTIONARY_ITEM_REFCOUNT_GET(DICTIONARY *dict, DICTIONARY if(unlikely(dict && is_dictionary_single_threaded(dict))) // this is an exception, dict can be null return item->refcount; else - return (REFCOUNT)__atomic_load_n(&item->refcount, __ATOMIC_SEQ_CST); + return (REFCOUNT)__atomic_load_n(&item->refcount, __ATOMIC_ACQUIRE); } static inline REFCOUNT DICTIONARY_ITEM_REFCOUNT_GET_SOLE(DICTIONARY_ITEM *item) { - return (REFCOUNT)__atomic_load_n(&item->refcount, __ATOMIC_SEQ_CST); + return (REFCOUNT)__atomic_load_n(&item->refcount, __ATOMIC_ACQUIRE); } // ---------------------------------------------------------------------------- @@ -579,8 +561,8 @@ static void dictionary_execute_insert_callback(DICTIONARY *dict, DICTIONARY_ITEM dict->creation_line, dict->creation_file); - DICTIONARY_STATS_CALLBACK_INSERTS_PLUS1(dict); dict->hooks->ins_callback(item, item->shared->value, constructor_data?constructor_data:dict->hooks->ins_callback_data); + DICTIONARY_STATS_CALLBACK_INSERTS_PLUS1(dict); } static bool dictionary_execute_conflict_callback(DICTIONARY *dict, DICTIONARY_ITEM *item, void *new_value, void *constructor_data) { @@ -597,10 +579,13 @@ static bool dictionary_execute_conflict_callback(DICTIONARY *dict, DICTIONARY_IT dict->creation_line, dict->creation_file); - DICTIONARY_STATS_CALLBACK_CONFLICTS_PLUS1(dict); - return dict->hooks->conflict_callback( + bool ret = dict->hooks->conflict_callback( item, item->shared->value, new_value, constructor_data ? 
constructor_data : dict->hooks->conflict_callback_data); + + DICTIONARY_STATS_CALLBACK_CONFLICTS_PLUS1(dict); + + return ret; } static void dictionary_execute_react_callback(DICTIONARY *dict, DICTIONARY_ITEM *item, void *constructor_data) { @@ -617,9 +602,10 @@ static void dictionary_execute_react_callback(DICTIONARY *dict, DICTIONARY_ITEM dict->creation_line, dict->creation_file); - DICTIONARY_STATS_CALLBACK_REACTS_PLUS1(dict); dict->hooks->react_callback(item, item->shared->value, constructor_data?constructor_data:dict->hooks->react_callback_data); + + DICTIONARY_STATS_CALLBACK_REACTS_PLUS1(dict); } static void dictionary_execute_delete_callback(DICTIONARY *dict, DICTIONARY_ITEM *item) { @@ -637,8 +623,9 @@ static void dictionary_execute_delete_callback(DICTIONARY *dict, DICTIONARY_ITEM dict->creation_line, dict->creation_file); - DICTIONARY_STATS_CALLBACK_DELETES_PLUS1(dict); dict->hooks->del_callback(item, item->shared->value, dict->hooks->del_callback_data); + + DICTIONARY_STATS_CALLBACK_DELETES_PLUS1(dict); } // ---------------------------------------------------------------------------- @@ -648,8 +635,8 @@ static inline size_t dictionary_locks_init(DICTIONARY *dict) { if(likely(!is_dictionary_single_threaded(dict))) { netdata_rwlock_init(&dict->index.rwlock); netdata_rwlock_init(&dict->items.rwlock); - return 0; } + return 0; } @@ -657,29 +644,29 @@ static inline size_t dictionary_locks_destroy(DICTIONARY *dict) { if(likely(!is_dictionary_single_threaded(dict))) { netdata_rwlock_destroy(&dict->index.rwlock); netdata_rwlock_destroy(&dict->items.rwlock); - return 0; } + return 0; } static inline void ll_recursive_lock_set_thread_as_writer(DICTIONARY *dict) { pid_t expected = 0, desired = gettid(); - if(!__atomic_compare_exchange_n(&dict->items.writer_pid, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) - fatal("DICTIONARY: Cannot set thread %d as exclusive writer, expected %d, desired %d, found %d.", gettid(), expected, desired, __atomic_load_n(&dict->items.writer_pid, __ATOMIC_SEQ_CST)); + if(!__atomic_compare_exchange_n(&dict->items.writer_pid, &expected, desired, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) + fatal("DICTIONARY: Cannot set thread %d as exclusive writer, expected %d, desired %d, found %d.", gettid(), expected, desired, __atomic_load_n(&dict->items.writer_pid, __ATOMIC_RELAXED)); } static inline void ll_recursive_unlock_unset_thread_writer(DICTIONARY *dict) { pid_t expected = gettid(), desired = 0; - if(!__atomic_compare_exchange_n(&dict->items.writer_pid, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) - fatal("DICTIONARY: Cannot unset thread %d as exclusive writer, expected %d, desired %d, found %d.", gettid(), expected, desired, __atomic_load_n(&dict->items.writer_pid, __ATOMIC_SEQ_CST)); + if(!__atomic_compare_exchange_n(&dict->items.writer_pid, &expected, desired, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) + fatal("DICTIONARY: Cannot unset thread %d as exclusive writer, expected %d, desired %d, found %d.", gettid(), expected, desired, __atomic_load_n(&dict->items.writer_pid, __ATOMIC_RELAXED)); } static inline bool ll_recursive_lock_is_thread_the_writer(DICTIONARY *dict) { pid_t tid = gettid(); - return tid > 0 && tid == __atomic_load_n(&dict->items.writer_pid, __ATOMIC_SEQ_CST); + return tid > 0 && tid == __atomic_load_n(&dict->items.writer_pid, __ATOMIC_RELAXED); } -static void ll_recursive_lock(DICTIONARY *dict, char rw) { +static inline void ll_recursive_lock(DICTIONARY *dict, char rw) { 
if(unlikely(is_dictionary_single_threaded(dict))) return; @@ -699,7 +686,7 @@ static void ll_recursive_lock(DICTIONARY *dict, char rw) { } } -static void ll_recursive_unlock(DICTIONARY *dict, char rw) { +static inline void ll_recursive_unlock(DICTIONARY *dict, char rw) { if(unlikely(is_dictionary_single_threaded(dict))) return; @@ -722,10 +709,10 @@ static void ll_recursive_unlock(DICTIONARY *dict, char rw) { } } -void dictionary_write_lock(DICTIONARY *dict) { +inline void dictionary_write_lock(DICTIONARY *dict) { ll_recursive_lock(dict, DICTIONARY_LOCK_WRITE); } -void dictionary_write_unlock(DICTIONARY *dict) { +inline void dictionary_write_unlock(DICTIONARY *dict) { ll_recursive_unlock(dict, DICTIONARY_LOCK_WRITE); } @@ -760,8 +747,8 @@ static inline void dictionary_index_wrlock_unlock(DICTIONARY *dict) { // items garbage collector static void garbage_collect_pending_deletes(DICTIONARY *dict) { - usec_t last_master_deletion_us = dict->hooks?__atomic_load_n(&dict->hooks->last_master_deletion_us, __ATOMIC_SEQ_CST):0; - usec_t last_gc_run_us = __atomic_load_n(&dict->last_gc_run_us, __ATOMIC_SEQ_CST); + usec_t last_master_deletion_us = dict->hooks?__atomic_load_n(&dict->hooks->last_master_deletion_us, __ATOMIC_RELAXED):0; + usec_t last_gc_run_us = __atomic_load_n(&dict->last_gc_run_us, __ATOMIC_RELAXED); bool is_view = is_view_dictionary(dict); @@ -773,7 +760,7 @@ static void garbage_collect_pending_deletes(DICTIONARY *dict) { ll_recursive_lock(dict, DICTIONARY_LOCK_WRITE); - __atomic_store_n(&dict->last_gc_run_us, now_realtime_usec(), __ATOMIC_SEQ_CST); + __atomic_store_n(&dict->last_gc_run_us, now_realtime_usec(), __ATOMIC_RELAXED); if(is_view) dictionary_index_lock_wrlock(dict); @@ -819,25 +806,22 @@ static void garbage_collect_pending_deletes(DICTIONARY *dict) { (void)deleted; (void)examined; - internal_error(false, "DICTIONARY: garbage collected dictionary created by %s (%zu@%s), examined %zu items, deleted %zu items, still pending %zu items", - dict->creation_function, dict->creation_line, dict->creation_file, examined, deleted, pending); - + internal_error(false, "DICTIONARY: garbage collected dictionary created by %s (%zu@%s), " + "examined %zu items, deleted %zu items, still pending %zu items", + dict->creation_function, dict->creation_line, dict->creation_file, + examined, deleted, pending); } // ---------------------------------------------------------------------------- // reference counters -static inline size_t reference_counter_init(DICTIONARY *dict) { - (void)dict; - +static inline size_t reference_counter_init(DICTIONARY *dict __maybe_unused) { // allocate memory required for reference counters // return number of bytes return 0; } -static inline size_t reference_counter_free(DICTIONARY *dict) { - (void)dict; - +static inline size_t reference_counter_free(DICTIONARY *dict __maybe_unused) { // free memory required for reference counters // return number of bytes return 0; @@ -846,13 +830,13 @@ static inline size_t reference_counter_free(DICTIONARY *dict) { static void item_acquire(DICTIONARY *dict, DICTIONARY_ITEM *item) { REFCOUNT refcount; - if(unlikely(is_dictionary_single_threaded(dict))) { + if(unlikely(is_dictionary_single_threaded(dict))) refcount = ++item->refcount; - } - else { + + else // increment the refcount refcount = __atomic_add_fetch(&item->refcount, 1, __ATOMIC_SEQ_CST); - } + if(refcount <= 0) { internal_error( @@ -900,7 +884,7 @@ static void item_release(DICTIONARY *dict, DICTIONARY_ITEM *item) { is_deleted = item_flag_check(item, ITEM_FLAG_DELETED); // 
decrement the refcount - refcount = __atomic_sub_fetch(&item->refcount, 1, __ATOMIC_SEQ_CST); + refcount = __atomic_sub_fetch(&item->refcount, 1, __ATOMIC_RELEASE); } if(refcount < 0) { @@ -956,14 +940,14 @@ static int item_check_and_acquire_advanced(DICTIONARY *dict, DICTIONARY_ITEM *it desired = refcount + 1; - } while(!__atomic_compare_exchange_n(&item->refcount, &refcount, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)); + } while(!__atomic_compare_exchange_n(&item->refcount, &refcount, desired, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)); // if ret == ITEM_OK, we acquired the item if(ret == RC_ITEM_OK) { - if (is_view_dictionary(dict) && + if (unlikely(is_view_dictionary(dict) && item_shared_flag_check(item, ITEM_FLAG_DELETED) && - !item_flag_check(item, ITEM_FLAG_DELETED)) { + !item_flag_check(item, ITEM_FLAG_DELETED))) { // but, we can't use this item if (having_index_lock) { @@ -979,7 +963,7 @@ static int item_check_and_acquire_advanced(DICTIONARY *dict, DICTIONARY_ITEM *it dict_item_set_deleted(dict, item); // decrement the refcount we incremented above - if (__atomic_sub_fetch(&item->refcount, 1, __ATOMIC_SEQ_CST) == 0) { + if (__atomic_sub_fetch(&item->refcount, 1, __ATOMIC_RELEASE) == 0) { // this is a deleted item, and we are the last one DICTIONARY_PENDING_DELETES_PLUS1(dict); } @@ -988,7 +972,7 @@ static int item_check_and_acquire_advanced(DICTIONARY *dict, DICTIONARY_ITEM *it } else { // this is traversal / walkthrough // decrement the refcount we incremented above - __atomic_sub_fetch(&item->refcount, 1, __ATOMIC_SEQ_CST); + __atomic_sub_fetch(&item->refcount, 1, __ATOMIC_RELEASE); } return RC_ITEM_MARKED_FOR_DELETION; @@ -998,7 +982,6 @@ static int item_check_and_acquire_advanced(DICTIONARY *dict, DICTIONARY_ITEM *it DICTIONARY_REFERENCED_ITEMS_PLUS1(dict); } - if(unlikely(spins > 1 && dict->stats)) DICTIONARY_STATS_CHECK_SPINS_PLUS(dict, spins - 1); @@ -1037,7 +1020,7 @@ static inline int item_is_not_referenced_and_can_be_removed_advanced(DICTIONARY ret = RC_ITEM_IS_CURRENTLY_BEING_CREATED; break; } - } while(!__atomic_compare_exchange_n(&item->refcount, &refcount, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)); + } while(!__atomic_compare_exchange_n(&item->refcount, &refcount, desired, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)); #ifdef NETDATA_INTERNAL_CHECKS if(ret == RC_ITEM_OK) @@ -1055,8 +1038,8 @@ static inline int item_is_not_referenced_and_can_be_removed_advanced(DICTIONARY static inline bool item_shared_release_and_check_if_it_can_be_freed(DICTIONARY *dict __maybe_unused, DICTIONARY_ITEM *item) { // if we can set refcount to REFCOUNT_DELETING, we can delete this item - REFCOUNT links = __atomic_sub_fetch(&item->shared->links, 1, __ATOMIC_SEQ_CST); - if(links == 0 && __atomic_compare_exchange_n(&item->shared->links, &links, REFCOUNT_DELETING, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { + REFCOUNT links = __atomic_sub_fetch(&item->shared->links, 1, __ATOMIC_RELEASE); + if(links == 0 && __atomic_compare_exchange_n(&item->shared->links, &links, REFCOUNT_DELETING, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) { // we can delete it return true; @@ -1290,7 +1273,7 @@ static DICTIONARY_ITEM *dict_item_create(DICTIONARY *dict __maybe_unused, size_t if(master_item) { item->shared = master_item->shared; - if(unlikely(__atomic_add_fetch(&item->shared->links, 1, __ATOMIC_SEQ_CST) <= 1)) + if(unlikely(__atomic_add_fetch(&item->shared->links, 1, __ATOMIC_ACQUIRE) <= 1)) fatal("DICTIONARY: attempted to link to a shared item structure that had zero references"); } 
else { @@ -1478,7 +1461,7 @@ static void dict_item_shared_set_deleted(DICTIONARY *dict, DICTIONARY_ITEM *item item_shared_flag_set(item, ITEM_FLAG_DELETED); if(dict->hooks) - __atomic_store_n(&dict->hooks->last_master_deletion_us, now_realtime_usec(), __ATOMIC_SEQ_CST); + __atomic_store_n(&dict->hooks->last_master_deletion_us, now_realtime_usec(), __ATOMIC_RELAXED); } } @@ -1486,7 +1469,7 @@ static void dict_item_shared_set_deleted(DICTIONARY *dict, DICTIONARY_ITEM *item static bool dict_item_set_deleted(DICTIONARY *dict, DICTIONARY_ITEM *item) { ITEM_FLAGS expected, desired; - expected = __atomic_load_n(&item->flags, __ATOMIC_SEQ_CST); + expected = __atomic_load_n(&item->flags, __ATOMIC_RELAXED); do { @@ -1495,7 +1478,7 @@ static bool dict_item_set_deleted(DICTIONARY *dict, DICTIONARY_ITEM *item) { desired = expected | ITEM_FLAG_DELETED; - } while(!__atomic_compare_exchange_n(&item->flags, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)); + } while(!__atomic_compare_exchange_n(&item->flags, &expected, desired, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)); DICTIONARY_ENTRIES_MINUS1(dict); return true; @@ -2063,11 +2046,11 @@ DICTIONARY *dictionary_create_view(DICTIONARY *master) { dictionary_hooks_allocate(master); - if(unlikely(__atomic_load_n(&master->hooks->links, __ATOMIC_SEQ_CST)) < 1) + if(unlikely(__atomic_load_n(&master->hooks->links, __ATOMIC_RELAXED)) < 1) fatal("DICTIONARY: attempted to create a view that has %d links", master->hooks->links); dict->hooks = master->hooks; - __atomic_add_fetch(&master->hooks->links, 1, __ATOMIC_SEQ_CST); + __atomic_add_fetch(&master->hooks->links, 1, __ATOMIC_ACQUIRE); #ifdef NETDATA_INTERNAL_CHECKS dict->creation_function = function; @@ -2472,7 +2455,7 @@ int dictionary_sorted_walkthrough_rw(DICTIONARY *dict, char rw, int (*callback)( DICTIONARY_STATS_WALKTHROUGHS_PLUS1(dict); ll_recursive_lock(dict, rw); - size_t entries = __atomic_load_n(&dict->entries, __ATOMIC_SEQ_CST); + size_t entries = __atomic_load_n(&dict->entries, __ATOMIC_RELAXED); DICTIONARY_ITEM **array = mallocz(sizeof(DICTIONARY_ITEM *) * entries); size_t i; @@ -3293,12 +3276,12 @@ static void *unittest_dict_master_thread(void *arg) { DICTIONARY_ITEM *item = NULL; int loops = 0; - while(!__atomic_load_n(&tv->join, __ATOMIC_SEQ_CST)) { + while(!__atomic_load_n(&tv->join, __ATOMIC_RELAXED)) { if(!item) item = dictionary_set_and_acquire_item(tv->master, "ITEM1", "123", strlen("123") + 1); - if(__atomic_load_n(&tv->item_master, __ATOMIC_SEQ_CST) != NULL) { + if(__atomic_load_n(&tv->item_master, __ATOMIC_RELAXED) != NULL) { dictionary_acquired_item_release(tv->master, item); dictionary_del(tv->master, "ITEM1"); item = NULL; @@ -3307,7 +3290,7 @@ static void *unittest_dict_master_thread(void *arg) { } dictionary_acquired_item_dup(tv->master, item); // for the view thread - __atomic_store_n(&tv->item_master, item, __ATOMIC_SEQ_CST); + __atomic_store_n(&tv->item_master, item, __ATOMIC_RELAXED); dictionary_del(tv->master, "ITEM1"); @@ -3333,13 +3316,13 @@ static void *unittest_dict_view_thread(void *arg) { DICTIONARY_ITEM *m_item = NULL; - while(!__atomic_load_n(&tv->join, __ATOMIC_SEQ_CST)) { - if(!(m_item = __atomic_load_n(&tv->item_master, __ATOMIC_SEQ_CST))) + while(!__atomic_load_n(&tv->join, __ATOMIC_RELAXED)) { + if(!(m_item = __atomic_load_n(&tv->item_master, __ATOMIC_RELAXED))) continue; DICTIONARY_ITEM *v_item = dictionary_view_set_and_acquire_item(tv->view, "ITEM2", m_item); dictionary_acquired_item_release(tv->master, m_item); - __atomic_store_n(&tv->item_master, 
NULL, __ATOMIC_SEQ_CST); + __atomic_store_n(&tv->item_master, NULL, __ATOMIC_RELAXED); for(int i = 0; i < tv->dups ; i++) { dictionary_acquired_item_dup(tv->view, v_item); @@ -3351,7 +3334,7 @@ static void *unittest_dict_view_thread(void *arg) { dictionary_del(tv->view, "ITEM2"); - while(!__atomic_load_n(&tv->join, __ATOMIC_SEQ_CST) && !(m_item = __atomic_load_n(&tv->item_master, __ATOMIC_SEQ_CST))) { + while(!__atomic_load_n(&tv->join, __ATOMIC_RELAXED) && !(m_item = __atomic_load_n(&tv->item_master, __ATOMIC_RELAXED))) { dictionary_acquired_item_dup(tv->view, v_item); dictionary_acquired_item_release(tv->view, v_item); } diff --git a/libnetdata/inlined.h b/libnetdata/inlined.h index aa7f3c2137..94410cee44 100644 --- a/libnetdata/inlined.h +++ b/libnetdata/inlined.h @@ -122,6 +122,37 @@ static inline unsigned long long str2ull(const char *s) { return n; } +static inline unsigned long long str2ull_hex_or_dec(const char *s) { + unsigned long long n = 0; + char c; + + if(likely(s[0] == '0' && s[1] == 'x')) { + const char *e = &s[sizeof(unsigned long long) * 2 + 2 + 1]; // max number of character to iterate: 8 bytes * 2 + '0x' + '\0' + + // skip 0x + s += 2; + + for (c = *s; ((c >= '0' && c <= '9') || (c >= 'A' && c <= 'F')) && s < e; c = *(++s)) { + n = n << 4; + + if (c <= '9') + n += c - '0'; + else + n += c - 'A' + 10; + } + return n; + } + else + return str2ull(s); +} + +static inline long long str2ll_hex_or_dec(const char *s) { + if(*s == '-') + return -(long long)str2ull_hex_or_dec(&s[1]); + else + return (long long)str2ull_hex_or_dec(s); +} + static inline long long str2ll(const char *s, char **endptr) { int negative = 0; diff --git a/libnetdata/libnetdata.c b/libnetdata/libnetdata.c index f6b6b026a9..efdc4e5226 100644 --- a/libnetdata/libnetdata.c +++ b/libnetdata/libnetdata.c @@ -2069,3 +2069,116 @@ void for_each_open_fd(OPEN_FD_ACTION action, OPEN_FD_EXCLUDE excluded_fds){ closedir(dir); } } + +struct timing_steps { + const char *name; + usec_t time; + size_t count; +} timing_steps[TIMING_STEP_MAX + 1] = { + [TIMING_STEP_INTERNAL] = { .name = "internal", .time = 0, }, + + [TIMING_STEP_BEGIN2_PREPARE] = { .name = "BEGIN2 prepare", .time = 0, }, + [TIMING_STEP_BEGIN2_FIND_CHART] = { .name = "BEGIN2 find chart", .time = 0, }, + [TIMING_STEP_BEGIN2_PARSE] = { .name = "BEGIN2 parse", .time = 0, }, + [TIMING_STEP_BEGIN2_ML] = { .name = "BEGIN2 ml", .time = 0, }, + [TIMING_STEP_BEGIN2_PROPAGATE] = { .name = "BEGIN2 propagate", .time = 0, }, + [TIMING_STEP_BEGIN2_STORE] = { .name = "BEGIN2 store", .time = 0, }, + + [TIMING_STEP_SET2_PREPARE] = { .name = "SET2 prepare", .time = 0, }, + [TIMING_STEP_SET2_LOOKUP_DIMENSION] = { .name = "SET2 find dimension", .time = 0, }, + [TIMING_STEP_SET2_PARSE] = { .name = "SET2 parse", .time = 0, }, + [TIMING_STEP_SET2_ML] = { .name = "SET2 ml", .time = 0, }, + [TIMING_STEP_SET2_PROPAGATE] = { .name = "SET2 propagate", .time = 0, }, + [TIMING_STEP_RRDSET_STORE_METRIC] = { .name = "SET2 rrdset store", .time = 0, }, + [TIMING_STEP_DBENGINE_FIRST_CHECK] = { .name = "db 1st check", .time = 0, }, + [TIMING_STEP_DBENGINE_CHECK_DATA] = { .name = "db check data", .time = 0, }, + [TIMING_STEP_DBENGINE_PACK] = { .name = "db pack", .time = 0, }, + [TIMING_STEP_DBENGINE_PAGE_FIN] = { .name = "db page fin", .time = 0, }, + [TIMING_STEP_DBENGINE_MRG_UPDATE] = { .name = "db mrg update", .time = 0, }, + [TIMING_STEP_DBENGINE_PAGE_ALLOC] = { .name = "db page alloc", .time = 0, }, + [TIMING_STEP_DBENGINE_CREATE_NEW_PAGE] = { .name = "db new page", .time = 0, }, + 
[TIMING_STEP_DBENGINE_FLUSH_PAGE] = { .name = "db page flush", .time = 0, }, + [TIMING_STEP_SET2_STORE] = { .name = "SET2 store", .time = 0, }, + + [TIMING_STEP_END2_PREPARE] = { .name = "END2 prepare", .time = 0, }, + [TIMING_STEP_END2_PUSH_V1] = { .name = "END2 push v1", .time = 0, }, + [TIMING_STEP_END2_ML] = { .name = "END2 ml", .time = 0, }, + [TIMING_STEP_END2_RRDSET] = { .name = "END2 rrdset", .time = 0, }, + [TIMING_STEP_END2_PROPAGATE] = { .name = "END2 propagate", .time = 0, }, + [TIMING_STEP_END2_STORE] = { .name = "END2 store", .time = 0, }, + + // terminator + [TIMING_STEP_MAX] = { .name = NULL, .time = 0, }, +}; + +void timing_action(TIMING_ACTION action, TIMING_STEP step) { + static __thread usec_t last_action_time = 0; + static struct timing_steps timings2[TIMING_STEP_MAX + 1] = {}; + + switch(action) { + case TIMING_ACTION_INIT: + last_action_time = now_monotonic_usec(); + break; + + case TIMING_ACTION_STEP: { + if(!last_action_time) + return; + + usec_t now = now_monotonic_usec(); + __atomic_add_fetch(&timing_steps[step].time, now - last_action_time, __ATOMIC_RELAXED); + __atomic_add_fetch(&timing_steps[step].count, 1, __ATOMIC_RELAXED); + last_action_time = now; + break; + } + + case TIMING_ACTION_FINISH: { + if(!last_action_time) + return; + + usec_t expected = __atomic_load_n(&timing_steps[TIMING_STEP_INTERNAL].time, __ATOMIC_RELAXED); + if(last_action_time - expected < 10 * USEC_PER_SEC) { + last_action_time = 0; + return; + } + + if(!__atomic_compare_exchange_n(&timing_steps[TIMING_STEP_INTERNAL].time, &expected, last_action_time, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { + last_action_time = 0; + return; + } + + struct timing_steps timings3[TIMING_STEP_MAX + 1]; + memcpy(timings3, timing_steps, sizeof(timings3)); + + size_t total_reqs = 0; + usec_t total_usec = 0; + for(size_t t = 1; t < TIMING_STEP_MAX ; t++) { + total_usec += timings3[t].time - timings2[t].time; + total_reqs += timings3[t].count - timings2[t].count; + } + + BUFFER *wb = buffer_create(1024, NULL); + + for(size_t t = 1; t < TIMING_STEP_MAX ; t++) { + size_t requests = timings3[t].count - timings2[t].count; + if(!requests) continue; + + buffer_sprintf(wb, "TIMINGS REPORT: [%3zu. %-20s]: # %10zu, t %11.2f ms (%6.2f %%), avg %6.2f usec/run\n", + t, + timing_steps[t].name ? 
timing_steps[t].name : "x", + requests, + (double) (timings3[t].time - timings2[t].time) / (double)USEC_PER_MS, + (double) (timings3[t].time - timings2[t].time) * 100.0 / (double) total_usec, + (double) (timings3[t].time - timings2[t].time) / (double)requests + ); + } + + info("TIMINGS REPORT:\n%sTIMINGS REPORT: total # %10zu, t %11.2f ms", + buffer_tostring(wb), total_reqs, (double)total_usec / USEC_PER_MS); + + memcpy(timings2, timings3, sizeof(timings2)); + + last_action_time = 0; + buffer_free(wb); + } + } +} diff --git a/libnetdata/libnetdata.h b/libnetdata/libnetdata.h index c504bd4bdb..8d50a55c57 100644 --- a/libnetdata/libnetdata.h +++ b/libnetdata/libnetdata.h @@ -661,6 +661,60 @@ static inline size_t indexing_partition(Word_t ptr, Word_t modulo) { } } +typedef enum { + TIMING_STEP_INTERNAL = 0, + + TIMING_STEP_BEGIN2_PREPARE, + TIMING_STEP_BEGIN2_FIND_CHART, + TIMING_STEP_BEGIN2_PARSE, + TIMING_STEP_BEGIN2_ML, + TIMING_STEP_BEGIN2_PROPAGATE, + TIMING_STEP_BEGIN2_STORE, + + TIMING_STEP_SET2_PREPARE, + TIMING_STEP_SET2_LOOKUP_DIMENSION, + TIMING_STEP_SET2_PARSE, + TIMING_STEP_SET2_ML, + TIMING_STEP_SET2_PROPAGATE, + TIMING_STEP_RRDSET_STORE_METRIC, + TIMING_STEP_DBENGINE_FIRST_CHECK, + TIMING_STEP_DBENGINE_CHECK_DATA, + TIMING_STEP_DBENGINE_PACK, + TIMING_STEP_DBENGINE_PAGE_FIN, + TIMING_STEP_DBENGINE_MRG_UPDATE, + TIMING_STEP_DBENGINE_PAGE_ALLOC, + TIMING_STEP_DBENGINE_CREATE_NEW_PAGE, + TIMING_STEP_DBENGINE_FLUSH_PAGE, + TIMING_STEP_SET2_STORE, + + TIMING_STEP_END2_PREPARE, + TIMING_STEP_END2_PUSH_V1, + TIMING_STEP_END2_ML, + TIMING_STEP_END2_RRDSET, + TIMING_STEP_END2_PROPAGATE, + TIMING_STEP_END2_STORE, + + // terminator + TIMING_STEP_MAX, +} TIMING_STEP; + +typedef enum { + TIMING_ACTION_INIT, + TIMING_ACTION_STEP, + TIMING_ACTION_FINISH, +} TIMING_ACTION; + +#ifdef NETDATA_TIMING_REPORT +#define timing_init() timing_action(TIMING_ACTION_INIT, TIMING_STEP_INTERNAL) +#define timing_step(step) timing_action(TIMING_ACTION_STEP, step) +#define timing_report() timing_action(TIMING_ACTION_FINISH, TIMING_STEP_INTERNAL) +#else +#define timing_init() debug_dummy() +#define timing_step(step) debug_dummy() +#define timing_report() debug_dummy() +#endif +void timing_action(TIMING_ACTION action, TIMING_STEP step); + # ifdef __cplusplus } # endif diff --git a/libnetdata/storage_number/storage_number.h b/libnetdata/storage_number/storage_number.h index faea477511..a8ad352c3d 100644 --- a/libnetdata/storage_number/storage_number.h +++ b/libnetdata/storage_number/storage_number.h @@ -180,6 +180,10 @@ static inline NETDATA_DOUBLE str2ndd(const char *s, char **endptr) { if(endptr) *endptr = (char *)&s[3]; return NAN; } + if(s[1] == 'u' && s[2] == 'l' && s[3] == 'l') { + if(endptr) *endptr = (char *)&s[3]; + return NAN; + } break; case 'i': diff --git a/libnetdata/worker_utilization/worker_utilization.c b/libnetdata/worker_utilization/worker_utilization.c index 8028e3a21d..d47d81c4e5 100644 --- a/libnetdata/worker_utilization/worker_utilization.c +++ b/libnetdata/worker_utilization/worker_utilization.c @@ -61,6 +61,14 @@ static struct workers_globals { static __thread struct worker *worker = NULL; // the current thread worker +static inline usec_t worker_now_monotonic_usec(void) { +#ifdef NETDATA_WITHOUT_WORKERS_LATENCY + return 0; +#else + return now_monotonic_usec(); +#endif +} + size_t workers_allocated_memory(void) { netdata_spinlock_lock(&workers_globals.spinlock); size_t memory = workers_globals.memory; @@ -77,7 +85,7 @@ void worker_register(const char *name) { worker->tag = 
strdupz(netdata_thread_tag()); worker->workname = strdupz(name); - usec_t now = now_monotonic_usec(); + usec_t now = worker_now_monotonic_usec(); worker->statistics_last_checkpoint = now; worker->last_action_timestamp = now; worker->last_action = WORKER_IDLE; @@ -181,14 +189,14 @@ static inline void worker_is_idle_with_time(usec_t now) { void worker_is_idle(void) { if(unlikely(!worker || worker->last_action != WORKER_BUSY)) return; - worker_is_idle_with_time(now_monotonic_usec()); + worker_is_idle_with_time(worker_now_monotonic_usec()); } void worker_is_busy(size_t job_id) { if(unlikely(!worker || job_id >= WORKER_UTILIZATION_MAX_JOB_TYPES)) return; - usec_t now = now_monotonic_usec(); + usec_t now = worker_now_monotonic_usec(); if(worker->last_action == WORKER_BUSY) worker_is_idle_with_time(now); @@ -260,7 +268,7 @@ void workers_foreach(const char *name, void (*callback)( struct worker *p; DOUBLE_LINKED_LIST_FOREACH_FORWARD(workname->base, p, prev, next) { - usec_t now = now_monotonic_usec(); + usec_t now = worker_now_monotonic_usec(); // find per job type statistics STRING *per_job_type_name[WORKER_UTILIZATION_MAX_JOB_TYPES]; -- cgit v1.2.3
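
Notes on a few of the techniques in this patch follow, each with a small standalone C sketch. The helper names in the sketches are invented for illustration; none of this code is part of the patch itself.

The buffer.c hunk keeps buffer_print_llu()'s emit-digits-backwards-then-reverse loop and adds early exits for the values 0, 1, 5 and 10 before it. The sketch below shows the same loop outside the BUFFER API; print_ull() and its caller-supplied destination buffer are assumptions made for this example.

// Standalone sketch of the emit-then-reverse decimal printing kept by
// buffer_print_llu(); print_ull() is an invented name for this example.
#include <stdio.h>

static size_t print_ull(char *dst, unsigned long long value) {
    char *str = dst, *wstr = dst;

    // emit the digits least-significant first
    do {
        *wstr++ = (char)('0' + (value % 10));
    } while (value /= 10);

    // terminate it
    *wstr = '\0';

    // reverse the digits in place
    char *begin = str, *end = wstr - 1, aux;
    while (end > begin)
        aux = *end, *end-- = *begin, *begin++ = aux;

    // return the number of characters written
    return (size_t)(wstr - str);
}

int main(void) {
    char buf[32];                       // 20 digits + '\0' is enough for 64-bit
    size_t len = print_ull(buf, 18446744073709551615ULL);
    printf("%s (%zu digits)\n", buf, len);
    return 0;
}

The switch added at the top of buffer_print_llu() short-circuits 0, 1, 5 and 10 before this loop runs, presumably because they are among the most frequent collected values.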
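The v2 streaming commands transfer values in hexadecimal, printed with buffer_print_llu_hex()/buffer_print_ll_hex() and parsed back with str2ull_hex_or_dec()/str2ll_hex_or_dec(). Below is a minimal round-trip sketch of the same idea: ull_to_hex() and hex_to_ull() are invented names, the sender is assumed to always emit uppercase digits with a "0x" prefix (as the patch does), and the length guard of the real parser is omitted.

// Round-trip sketch of the hex wire format used by the v2 commands.
// ull_to_hex()/hex_to_ull() are illustrative names, not the Netdata API.
#include <stdio.h>
#include <string.h>

static const char nibble[] = "0123456789ABCDEF";

// print backwards into a small stack buffer, then copy out -
// the same shape as buffer_print_llu_hex()
static size_t ull_to_hex(unsigned long long value, char *out, size_t out_size) {
    char tmp[sizeof(unsigned long long) * 2 + 2 + 1];  // digits + "0x" + '\0'
    char *e = &tmp[sizeof(tmp) - 1], *p = e;

    *p = '\0';
    do {
        *--p = nibble[value & 0xF];
        value >>= 4;
    } while (value);
    *--p = 'x';
    *--p = '0';

    size_t len = (size_t)(e - p);
    if (len + 1 > out_size) return 0;
    memcpy(out, p, len + 1);
    return len;
}

// shift-and-add parse, accepting only the uppercase digits the sender emits
static unsigned long long hex_to_ull(const char *s) {
    unsigned long long n = 0;

    if (s[0] == '0' && s[1] == 'x') s += 2;

    for (char c = *s; (c >= '0' && c <= '9') || (c >= 'A' && c <= 'F'); c = *++s) {
        n <<= 4;
        n += (c <= '9') ? (unsigned long long)(c - '0')
                        : (unsigned long long)(c - 'A' + 10);
    }

    return n;
}

int main(void) {
    char buf[32];
    unsigned long long v = 1234567890123456789ULL;

    ull_to_hex(v, buf, sizeof(buf));
    printf("%llu -> %s -> %llu\n", v, buf, hex_to_ull(buf));
    return 0;
}

A 64-bit value needs at most 16 hex digits instead of 20 decimal ones, and parsing needs only shifts and adds, which is the point of the "faster hex parsing" commits.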
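Much of the dictionary.c churn relaxes memory ordering: counters that are only ever read for reporting (the DICTIONARY_STATS_* counters, entries, version) move from __ATOMIC_SEQ_CST to __ATOMIC_RELAXED, while counters that gate an object's lifetime (item and shared-structure reference counts) use acquire/release. The sketch shows that split in its textbook form with the same GCC __atomic builtins; struct object and its functions are invented and do not reproduce the dictionary's exact refcount states.

// Relaxed ordering for report-only statistics, acquire/release where the
// counter decides when an object may be freed. Illustrative only.
#include <stdlib.h>

struct object {
    int refcount;           // lifetime: needs acquire/release
    size_t stat_lookups;    // statistics: relaxed is enough
};

static inline void object_stat_lookup(struct object *o) {
    // nothing synchronizes-with this counter; it is only summed for reports
    __atomic_fetch_add(&o->stat_lookups, 1, __ATOMIC_RELAXED);
}

static inline void object_acquire(struct object *o) {
    __atomic_add_fetch(&o->refcount, 1, __ATOMIC_ACQUIRE);
}

static inline void object_release(struct object *o) {
    // the release decrement orders this thread's prior writes before the
    // decrement; the thread that sees zero takes an acquire fence so it
    // cannot observe a stale object before freeing it
    if (__atomic_sub_fetch(&o->refcount, 1, __ATOMIC_RELEASE) == 0) {
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        free(o);
    }
}

int main(void) {
    struct object *o = calloc(1, sizeof(*o));
    object_acquire(o);
    object_stat_lookup(o);
    object_release(o);      // refcount drops to zero here, so o is freed
    return 0;
}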
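The new timing_init()/timing_step()/timing_report() macros (compiled in with NETDATA_TIMING_REPORT) are backed by timing_action(): a per-thread timestamp of the last step plus global per-step accumulators updated with relaxed atomics, reported roughly every ten seconds. The cut-down version below keeps only that core idea, with an explicit dump instead of the periodic report; the step names and function names here are placeholders, not the TIMING_STEP values added in libnetdata.h.

// Cut-down sketch of the per-step timing accumulators added in libnetdata.c.
#include <stdio.h>
#include <time.h>

typedef enum { STEP_PARSE = 0, STEP_STORE, STEP_MAX } step_t;

static struct { unsigned long long usec, count; } steps[STEP_MAX];
static __thread unsigned long long last_usec;   // per-thread, as in the patch

static unsigned long long now_usec(void) {
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return (unsigned long long)ts.tv_sec * 1000000ULL
         + (unsigned long long)ts.tv_nsec / 1000ULL;
}

static void timing_begin(void) { last_usec = now_usec(); }

static void timing_mark(step_t step) {
    unsigned long long now = now_usec();
    // relaxed is enough - the totals are only read when a report is printed
    __atomic_add_fetch(&steps[step].usec, now - last_usec, __ATOMIC_RELAXED);
    __atomic_add_fetch(&steps[step].count, 1, __ATOMIC_RELAXED);
    last_usec = now;
}

static void timing_dump(void) {
    static const char *names[STEP_MAX] = { "parse", "store" };
    for (int i = 0; i < STEP_MAX; i++)
        printf("%-6s: %llu runs, %llu usec total\n",
               names[i], steps[i].count, steps[i].usec);
}

int main(void) {
    timing_begin();
    for (volatile int i = 0; i < 1000000; i++) ;    // stand-in for parsing work
    timing_mark(STEP_PARSE);
    for (volatile int i = 0; i < 1000000; i++) ;    // stand-in for storing work
    timing_mark(STEP_STORE);
    timing_dump();
    return 0;
}

In the patch, the BEGIN2, SET2 and END2 handlers call timing_step() between their phases ("put timings in BEGIN2, SET2, END2"), so the periodic report shows where the per-point collection cost goes.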