author | Costa Tsaousis <costa@netdata.cloud> | 2022-09-05 19:31:06 +0300
---|---|---
committer | GitHub <noreply@github.com> | 2022-09-05 19:31:06 +0300
commit | 5e1b95cf92168c4df74586fb4430dc284806da82 (patch) |
tree | f42077d8b02eaf316683453a7474bd1f599a833d /libnetdata/dictionary |
parent | 544aef1fde6e79ac57d2dea85d3f063076d7f885 (diff) |
Deduplicate all netdata strings (#13570)
* rrdfamily
* rrddim
* rrdset plugin and module names
* rrdset units
* rrdset type
* rrdset family
* rrdset title
* rrdset title more
* rrdset context
* rrdcalctemplate context and removal of context hash from rrdset
* strings statistics
* rrdset name
* rearranged members of rrdset
* eliminate rrdset name hash; rrdcalc chart converted to STRING
* rrdset id, eliminated rrdset hash
* rrdcalc, alarm_entry, alert_config and some of rrdcalctemplate
* rrdcalctemplate
* rrdvar
* eval_variable
* rrddimvar and rrdsetvar
* rrdhost hostname, os and tags
* fix master commits
* added thread cache; implemented string_dup without locks
* faster thread cache
* rrdset and rrddim now use dictionaries for indexing
* rrdhost now uses dictionary
* rrdfamily now uses DICTIONARY
* rrdvar using dictionary instead of AVL
* allocate the right size to rrdvar flag members
* rrdhost remaining char * members to STRING *
* better error handling on indexing
* strings now use a read/write lock to allow parallel searches to the index
* removed AVL support from dictionaries; implemented STRING with native Judy calls
* string releases should be negative
* only 31 bits are allowed for enum flags
* proper locking on strings
* string threading unittest and fixes
* fix lgtm finding
* fixed naming
* stream chart/dimension definitions at the beginning of a streaming session
* thread stack variable is undefined on thread cancel
* rrdcontext garbage collect per host on startup
* worker control in garbage collection
* relaxed deletion of rrdmetrics
* type checking on dictfe
* netdata chart to monitor rrdcontext triggers
* Group chart label updates
* rrdcontext better handling of collected rrdsets
* rrdpush incremental transmission of definitions should use as much buffer as possible
* require 1MB per chart
* empty the sender buffer before enabling metrics streaming
* fill up to 50% of buffer
* reset signaling metrics sending
* use the shared variable for status
* use separate host flag for enabling streaming of metrics
* make sure the flag is clear
* add logging for streaming
* add logging for streaming on buffer overflow
* circular_buffer proper sizing
* removed obsolete logs
* do not execute worker jobs if not necessary
* better messages about compression disabling
* proper use of flags and updating rrdset last access time every time the obsoletion flag is flipped
* monitor stream sender used buffer ratio
* Update exporting unit tests
* no need to compare label value with strcmp
* streaming send workers now monitor bandwidth
* workers now use strings
* streaming receiver monitors incoming bandwidth
* parser shift of worker ids
* minor fixes
* Group chart label updates
* Populate context with dimensions that have data
* Fix chart id
* better shift of parser worker ids
* fix for streaming compression
* properly count received bytes
* ensure LZ4 compression ring buffer does not wrap prematurely
* do not stream empty charts; do not process empty instances in rrdcontext
* need_to_send_chart_definition() does not need an rrdset lock any more
* rrdcontext objects are collected, after data have been written to the db
* better logging of RRDCONTEXT transitions
* always set all variables needed by the worker utilization charts
* implemented double linked list for most objects; eliminated alarm indexes from rrdhost; and many more fixes
* lockless strings design - string_dup() and string_freez() are totally lockless when they don't need to touch Judy - only Judy is protected with a read/write lock
* STRING code re-organization for clarity
* thread_cache improvements; double numbers precision on worker threads
* STRING_ENTRY now shadows STRING, so no duplicate definition is required; string_length() renamed to string_strlen() to follow the naming of all other functions; STRING internal statistics are now only compiled with NETDATA_INTERNAL_CHECKS
* rrdhost index by hostname now cleans up; aclk queries of archived hosts do not index hosts
* Add index to speed up database context searches
* Removed last_updated optimization (was also buggy after latest merge with master)
Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Co-authored-by: Vladimir Kobal <vlad@prokk.net>
Diffstat (limited to 'libnetdata/dictionary')
-rw-r--r-- | libnetdata/dictionary/dictionary.c | 608
-rw-r--r-- | libnetdata/dictionary/dictionary.h | 33
2 files changed, 438 insertions, 203 deletions
diff --git a/libnetdata/dictionary/dictionary.c b/libnetdata/dictionary/dictionary.c index c1325ecb54..6e37554ea4 100644 --- a/libnetdata/dictionary/dictionary.c +++ b/libnetdata/dictionary/dictionary.c @@ -1,28 +1,17 @@ // SPDX-License-Identifier: GPL-3.0-or-later // NOT TO BE USED BY USERS -#define DICTIONARY_FLAG_EXCLUSIVE_ACCESS (1 << 29) // there is only one thread accessing the dictionary -#define DICTIONARY_FLAG_DESTROYED (1 << 30) // this dictionary has been destroyed -#define DICTIONARY_FLAG_DEFER_ALL_DELETIONS (1 << 31) // defer all deletions of items in the dictionary +#define DICTIONARY_FLAG_EXCLUSIVE_ACCESS (1 << 28) // there is only one thread accessing the dictionary +#define DICTIONARY_FLAG_DESTROYED (1 << 29) // this dictionary has been destroyed +#define DICTIONARY_FLAG_DEFER_ALL_DELETIONS (1 << 30) // defer all deletions of items in the dictionary // our reserved flags that cannot be set by users #define DICTIONARY_FLAGS_RESERVED (DICTIONARY_FLAG_EXCLUSIVE_ACCESS|DICTIONARY_FLAG_DESTROYED|DICTIONARY_FLAG_DEFER_ALL_DELETIONS) -typedef struct dictionary DICTIONARY; #define DICTIONARY_INTERNALS #include "../libnetdata.h" - -#ifndef ENABLE_DBENGINE -#define DICTIONARY_WITH_AVL -#warning Compiling DICTIONARY with an AVL index -#else -#define DICTIONARY_WITH_JUDYHS -#endif - -#ifdef DICTIONARY_WITH_JUDYHS #include <Judy.h> -#endif typedef enum name_value_flags { NAME_VALUE_FLAG_NONE = 0, @@ -38,10 +27,6 @@ typedef enum name_value_flags { */ typedef struct name_value { -#ifdef DICTIONARY_WITH_AVL - avl_t avl_node; -#endif - #ifdef NETDATA_INTERNAL_CHECKS DICTIONARY *dict; #endif @@ -72,15 +57,7 @@ struct dictionary { NAME_VALUE *first_item; // the double linked list base pointers NAME_VALUE *last_item; -#ifdef DICTIONARY_WITH_AVL - avl_tree_type values_index; - NAME_VALUE *hash_base; - void *(*get_thread_static_name_value)(const char *name); -#endif - -#ifdef DICTIONARY_WITH_JUDYHS Pvoid_t JudyHSArray; // the hash table -#endif netdata_rwlock_t rwlock; 
// the r/w lock when DICTIONARY_FLAG_SINGLE_THREADED is not set @@ -315,9 +292,9 @@ static inline size_t dictionary_lock_free(DICTIONARY *dict) { } static void dictionary_lock(DICTIONARY *dict, char rw) { - if(rw == 'u' || rw == 'U') return; + if(rw == DICTIONARY_LOCK_NONE || rw == 'U') return; - if(rw == 'r' || rw == 'R') { + if(rw == DICTIONARY_LOCK_READ || rw == DICTIONARY_LOCK_REENTRANT || rw == 'R') { // read lock __atomic_add_fetch(&dict->readers, 1, __ATOMIC_RELAXED); } @@ -329,7 +306,7 @@ static void dictionary_lock(DICTIONARY *dict, char rw) { if(likely(dict->flags & DICTIONARY_FLAG_SINGLE_THREADED)) return; - if(rw == 'r' || rw == 'R') { + if(rw == DICTIONARY_LOCK_READ || rw == DICTIONARY_LOCK_REENTRANT || rw == 'R') { // read lock netdata_rwlock_rdlock(&dict->rwlock); @@ -347,9 +324,9 @@ static void dictionary_lock(DICTIONARY *dict, char rw) { } static void dictionary_unlock(DICTIONARY *dict, char rw) { - if(rw == 'u' || rw == 'U') return; + if(rw == DICTIONARY_LOCK_NONE || rw == 'U') return; - if(rw == 'r' || rw == 'R') { + if(rw == DICTIONARY_LOCK_READ || rw == DICTIONARY_LOCK_REENTRANT || rw == 'R') { // read unlock __atomic_sub_fetch(&dict->readers, 1, __ATOMIC_RELAXED); } @@ -480,68 +457,6 @@ static uint32_t reference_counter_release(DICTIONARY *dict, NAME_VALUE *nv, bool // ---------------------------------------------------------------------------- // hash table -#ifdef DICTIONARY_WITH_AVL -static inline const char *namevalue_get_name(NAME_VALUE *nv); - -static int name_value_compare(void* a, void* b) { - return strcmp(namevalue_get_name((NAME_VALUE *)a), namevalue_get_name((NAME_VALUE *)b)); -} - -static void *get_thread_static_name_value(const char *name) { - static __thread NAME_VALUE tmp = { 0 }; - tmp.flags = NAME_VALUE_FLAG_NONE; - tmp.caller_name = (char *)name; - return &tmp; -} - -static void hashtable_init_unsafe(DICTIONARY *dict) { - avl_init(&dict->values_index, name_value_compare); - dict->get_thread_static_name_value = 
get_thread_static_name_value; -} - -static size_t hashtable_destroy_unsafe(DICTIONARY *dict) { - (void)dict; - return 0; -} - -static inline int hashtable_delete_unsafe(DICTIONARY *dict, const char *name, size_t name_len, void *nv) { - (void)name; - (void)name_len; - - if(unlikely(avl_remove(&(dict->values_index), (avl_t *)(nv)) != (avl_t *)nv)) - return 0; - - return 1; -} - -static inline NAME_VALUE *hashtable_get_unsafe(DICTIONARY *dict, const char *name, size_t name_len) { - (void)name_len; - - void *tmp = dict->get_thread_static_name_value(name); - return (NAME_VALUE *)avl_search(&(dict->values_index), (avl_t *)tmp); -} - -static inline NAME_VALUE **hashtable_insert_unsafe(DICTIONARY *dict, const char *name, size_t name_len) { - // AVL needs a NAME_VALUE to insert into the dictionary but we don't have it yet. - // So, the only thing we can do, is return an existing one if it is already there. - // Returning NULL will make the caller thing we added it, will allocate one - // and will call hashtable_inserted_name_value_unsafe(), at which we will do - // the actual indexing. - - dict->hash_base = hashtable_get_unsafe(dict, name, name_len); - return &dict->hash_base; -} - -static inline void hashtable_inserted_name_value_unsafe(DICTIONARY *dict, void *nv) { - // we have our new NAME_VALUE object. - // Let's index it. 
- - if(unlikely(avl_insert(&((dict)->values_index), (avl_t *)(nv)) != (avl_t *)nv)) - error("dictionary: INTERNAL ERROR: duplicate insertion to dictionary."); -} -#endif - -#ifdef DICTIONARY_WITH_JUDYHS static void hashtable_init_unsafe(DICTIONARY *dict) { dict->JudyHSArray = NULL; } @@ -562,7 +477,7 @@ static size_t hashtable_destroy_unsafe(DICTIONARY *dict) { return (size_t)ret; } -static inline NAME_VALUE **hashtable_insert_unsafe(DICTIONARY *dict, const char *name, size_t name_len) { +static inline void **hashtable_insert_unsafe(DICTIONARY *dict, const char *name, size_t name_len) { internal_error(!(dict->flags & DICTIONARY_FLAG_EXCLUSIVE_ACCESS), "DICTIONARY: inserting item from the index without exclusive access to the dictionary created by %s() (%zu@%s)", dict->creation_function, dict->creation_line, dict->creation_file); JError_t J_Error; @@ -579,7 +494,7 @@ static inline NAME_VALUE **hashtable_insert_unsafe(DICTIONARY *dict, const char // put anything needed at the value of the index. // The pointer to pointer we return has to be used before // any other operation that may change the index (insert/delete). - return (NAME_VALUE **)Rc; + return Rc; } static inline int hashtable_delete_unsafe(DICTIONARY *dict, const char *name, size_t name_len, void *nv) { @@ -632,8 +547,6 @@ static inline void hashtable_inserted_name_value_unsafe(DICTIONARY *dict, void * ; } -#endif // DICTIONARY_WITH_JUDYHS - // ---------------------------------------------------------------------------- // linked list management @@ -979,7 +892,7 @@ static NAME_VALUE *dictionary_set_name_value_unsafe(DICTIONARY *dict, const char // But the caller has the option to do this on his/her own. // So, let's do the fastest here and let the caller decide the flow of calls. 
- NAME_VALUE *nv, **pnv = hashtable_insert_unsafe(dict, name, name_len); + NAME_VALUE *nv, **pnv = (NAME_VALUE **)hashtable_insert_unsafe(dict, name, name_len); if(likely(*pnv == 0)) { // a new item added to the index nv = *pnv = namevalue_create_unsafe(dict, name, name_len, value, value_len); @@ -1280,6 +1193,9 @@ void *dictionary_foreach_start_rw(DICTFE *dfe, DICTIONARY *dict, char rw) { dfe->value = NULL; } + if(unlikely(dfe->rw == DICTIONARY_LOCK_REENTRANT)) + dictionary_unlock(dfe->dict, dfe->rw); + return dfe->value; } @@ -1294,6 +1210,9 @@ void *dictionary_foreach_next(DICTFE *dfe) { return NULL; } + if(unlikely(dfe->rw == DICTIONARY_LOCK_REENTRANT)) + dictionary_lock(dfe->dict, dfe->rw); + // the item we just did NAME_VALUE *nv = (NAME_VALUE *)dfe->last_item; @@ -1320,6 +1239,9 @@ void *dictionary_foreach_next(DICTFE *dfe) { dfe->value = NULL; } + if(unlikely(dfe->rw == DICTIONARY_LOCK_REENTRANT)) + dictionary_unlock(dfe->dict, dfe->rw); + return dfe->value; } @@ -1338,7 +1260,9 @@ usec_t dictionary_foreach_done(DICTFE *dfe) { if(likely(nv)) reference_counter_release(dfe->dict, nv, false); - dictionary_unlock(dfe->dict, dfe->rw); + if(likely(dfe->rw != DICTIONARY_LOCK_REENTRANT)) + dictionary_unlock(dfe->dict, dfe->rw); + dfe->dict = NULL; dfe->last_item = NULL; dfe->name = NULL; @@ -1383,8 +1307,14 @@ int dictionary_walkthrough_rw(DICTIONARY *dict, char rw, int (*callback)(const c // while we are using it reference_counter_acquire(dict, nv); + if(unlikely(rw == DICTIONARY_LOCK_REENTRANT)) + dictionary_unlock(dict, rw); + int r = callback(namevalue_get_name(nv), nv->value, data); + if(unlikely(rw == DICTIONARY_LOCK_REENTRANT)) + dictionary_lock(dict, rw); + // since we have a reference counter, this item cannot be deleted // until we release the reference counter, so the pointers are there nv_next = nv->next; @@ -1449,7 +1379,15 @@ int dictionary_sorted_walkthrough_rw(DICTIONARY *dict, char rw, int (*callback)( nv = array[i]; if(likely(!(nv->flags & 
NAME_VALUE_FLAG_DELETED))) { reference_counter_acquire(dict, nv); + + if(unlikely(rw == DICTIONARY_LOCK_REENTRANT)) + dictionary_unlock(dict, rw); + int r = callback(namevalue_get_name(nv), nv->value, data); + + if(unlikely(rw == DICTIONARY_LOCK_REENTRANT)) + dictionary_lock(dict, rw); + reference_counter_release(dict, nv, false); if (r < 0) { ret = r; @@ -1469,132 +1407,293 @@ int dictionary_sorted_walkthrough_rw(DICTIONARY *dict, char rw, int (*callback)( // ---------------------------------------------------------------------------- // STRING implementation - dedup all STRINGs -typedef struct string_entry { -#ifdef DICTIONARY_WITH_AVL - avl_t avl_node; +struct netdata_string { + uint32_t length; // the string length including the terminating '\0' + + int32_t refcount; // how many times this string is used + // We use a signed number to be able to detect duplicate frees of a string. + // If at any point this goes below zero, we have a duplicate free. + + const char str[]; // the string itself, is appended to this structure +}; + +static struct string_hashtable { + Pvoid_t JudyHSArray; // the Judy array - hashtable + netdata_rwlock_t rwlock; // the R/W lock to protect the Judy array + + long int entries; // the number of entries in the index + long int active_references; // the number of active references alive + long int memory; // the memory used, without the JudyHS index + + size_t inserts; // the number of successful inserts to the index + size_t deletes; // the number of successful deleted from the index + size_t searches; // the number of successful searches in the index + size_t duplications; // when a string is referenced + size_t releases; // when a string is unreferenced + +#ifdef NETDATA_INTERNAL_CHECKS + // internal statistics + size_t found_deleted_on_search; + size_t found_available_on_search; + size_t found_deleted_on_insert; + size_t found_available_on_insert; + size_t spins; +#endif + +} string_base = { + .JudyHSArray = NULL, + .rwlock = 
NETDATA_RWLOCK_INITIALIZER, +}; + +#ifdef NETDATA_INTERNAL_CHECKS +#define string_internal_stats_add(var, val) __atomic_add_fetch(&string_base.var, val, __ATOMIC_RELAXED) +#else +#define string_internal_stats_add(var, val) do {;} while(0) #endif - uint32_t length; // the string length with the terminating '\0' - uint32_t refcount; // how many times this string is used - const char str[]; // the string itself -} STRING_ENTRY; -#ifdef DICTIONARY_WITH_AVL -static int string_entry_compare(void* a, void* b) { - return strcmp(((STRING_ENTRY *)a)->str, ((STRING_ENTRY *)b)->str); +#define string_stats_atomic_increment(var) __atomic_add_fetch(&string_base.var, 1, __ATOMIC_RELAXED) +#define string_stats_atomic_decrement(var) __atomic_sub_fetch(&string_base.var, 1, __ATOMIC_RELAXED) + +void string_statistics(size_t *inserts, size_t *deletes, size_t *searches, size_t *entries, size_t *references, size_t *memory, size_t *duplications, size_t *releases) { + *inserts = string_base.inserts; + *deletes = string_base.deletes; + *searches = string_base.searches; + *entries = (size_t)string_base.entries; + *references = (size_t)string_base.active_references; + *memory = (size_t)string_base.memory; + *duplications = string_base.duplications; + *releases = string_base.releases; } -static void *get_thread_static_string_entry(const char *name) { - static __thread size_t _length = 0; - static __thread STRING_ENTRY *_tmp = NULL; +#define string_entry_acquire(se) __atomic_add_fetch(&((se)->refcount), 1, __ATOMIC_SEQ_CST); +#define string_entry_release(se) __atomic_sub_fetch(&((se)->refcount), 1, __ATOMIC_SEQ_CST); - size_t size = sizeof(STRING_ENTRY) + strlen(name) + 1; - if(likely(_tmp && _length < size)) { - freez(_tmp); - _tmp = NULL; - _length = 0; - } +static inline bool string_entry_check_and_acquire(STRING *se) { + int32_t expected, desired, count = 0; + do { + count++; + + expected = se->refcount; + + if(expected <= 0) { + // We cannot use this. 
+ // The reference counter reached value zero, + // so another thread is deleting this. + string_internal_stats_add(spins, count - 1); + return false; + } - if(unlikely(!_tmp)) { - _tmp = callocz(1, size); - _length = size; + desired = expected + 1; } + while(!__atomic_compare_exchange_n(&se->refcount, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)); + + string_internal_stats_add(spins, count - 1); + + // statistics + // string_base.active_references is altered at the in string_strdupz() and string_freez() + string_stats_atomic_increment(duplications); - strcpy((char *)&_tmp->str[0], name); - return _tmp; + return true; } -#endif -DICTIONARY string_dictionary = { -#ifdef DICTIONARY_WITH_AVL - .values_index = { - .root = NULL, - .compar = string_entry_compare - }, - .get_thread_static_name_value = get_thread_static_string_entry, +STRING *string_dup(STRING *string) { + if(unlikely(!string)) return NULL; + +#ifdef NETDATA_INTERNAL_CHECKS + if(unlikely(__atomic_load_n(&string->refcount, __ATOMIC_SEQ_CST) <= 0)) + fatal("STRING: tried to %s() a string that is freed (it has %d references).", __FUNCTION__, string->refcount); #endif - .flags = DICTIONARY_FLAG_EXCLUSIVE_ACCESS, - .rwlock = NETDATA_RWLOCK_INITIALIZER -}; + string_entry_acquire(string); -static netdata_mutex_t string_mutex = NETDATA_MUTEX_INITIALIZER; + // statistics + string_stats_atomic_increment(active_references); + string_stats_atomic_increment(duplications); -STRING *string_dup(STRING *string) { - if(unlikely(!string)) return NULL; + return string; +} + +// Search the index and return an ACQUIRED string entry, or NULL +static inline STRING *string_index_search(const char *str, size_t length) { + if(unlikely(!string_base.JudyHSArray)) + return NULL; + + STRING *string; + + // Find the string in the index + // With a read-lock so that multiple readers can use the index concurrently. 
+ + netdata_rwlock_rdlock(&string_base.rwlock); + + Pvoid_t *Rc; + Rc = JudyHSGet(string_base.JudyHSArray, (void *)str, length); + if(likely(Rc)) { + // found in the hash table + string = *Rc; + + if(string_entry_check_and_acquire(string)) { + // we can use this entry + string_internal_stats_add(found_available_on_search, 1); + } + else { + // this entry is about to be deleted by another thread + // do not touch it, let it go... + string = NULL; + string_internal_stats_add(found_deleted_on_search, 1); + } + } + else { + // not found in the hash table + string = NULL; + } + + string_stats_atomic_increment(searches); + netdata_rwlock_unlock(&string_base.rwlock); - STRING_ENTRY *se = (STRING_ENTRY *)string; - netdata_mutex_lock(&string_mutex); - se->refcount++; - netdata_mutex_unlock(&string_mutex); return string; } -STRING *string_strdupz(const char *str) { - if(unlikely(!str || !*str)) return NULL; +// Insert a string to the index and return an ACQUIRED string entry, +// or NULL if the call needs to be retried (a deleted entry with the same key is still in the index) +// The returned entry is ACQUIRED and it can either be: +// 1. a new item inserted, or +// 2. 
an item found in the index that is not currently deleted +static inline STRING *string_index_insert(const char *str, size_t length) { + STRING *string; - netdata_mutex_lock(&string_mutex); + netdata_rwlock_wrlock(&string_base.rwlock); - size_t length = strlen(str) + 1; - STRING_ENTRY *se; - STRING_ENTRY **ptr = (STRING_ENTRY **)hashtable_insert_unsafe(&string_dictionary, str, length); - if(unlikely(*ptr == 0)) { + STRING **ptr; + { + JError_t J_Error; + Pvoid_t *Rc = JudyHSIns(&string_base.JudyHSArray, (void *)str, length, &J_Error); + if (unlikely(Rc == PJERR)) { + fatal( + "STRING: Cannot insert entry with name '%s' to JudyHS, JU_ERRNO_* == %u, ID == %d", + str, + JU_ERRNO(&J_Error), + JU_ERRID(&J_Error)); + } + ptr = (STRING **)Rc; + } + + if (likely(*ptr == 0)) { // a new item added to the index - size_t mem_size = sizeof(STRING_ENTRY) + length; - se = mallocz(mem_size); - strcpy((char *)se->str, str); - se->length = length; - se->refcount = 1; - *ptr = se; - hashtable_inserted_name_value_unsafe(&string_dictionary, se); - string_dictionary.version++; - string_dictionary.inserts++; - string_dictionary.entries++; - string_dictionary.memory += (long)mem_size; + size_t mem_size = sizeof(STRING) + length; + string = mallocz(mem_size); + strcpy((char *)string->str, str); + string->length = length; + string->refcount = 1; + *ptr = string; + string_base.inserts++; + string_base.entries++; + string_base.memory += (long)mem_size; } else { // the item is already in the index - se = *ptr; - se->refcount++; - string_dictionary.searches++; + string = *ptr; + + if(string_entry_check_and_acquire(string)) { + // we can use this entry + string_internal_stats_add(found_available_on_insert, 1); + } + else { + // this entry is about to be deleted by another thread + // do not touch it, let it go... 
+ string = NULL; + string_internal_stats_add(found_deleted_on_insert, 1); + } + + string_stats_atomic_increment(searches); } - netdata_mutex_unlock(&string_mutex); - return (STRING *)se; + netdata_rwlock_unlock(&string_base.rwlock); + return string; } -void string_freez(STRING *string) { - if(unlikely(!string)) return; - netdata_mutex_lock(&string_mutex); +// delete an entry from the index +static inline void string_index_delete(STRING *string) { + netdata_rwlock_wrlock(&string_base.rwlock); - STRING_ENTRY *se = (STRING_ENTRY *)string; +#ifdef NETDATA_INTERNAL_CHECKS + if(unlikely(__atomic_load_n(&string->refcount, __ATOMIC_SEQ_CST) != 0)) + fatal("STRING: tried to delete a string at %s() that is already freed (it has %d references).", __FUNCTION__, string->refcount); +#endif - if(se->refcount == 0) - fatal("STRING: tried to free string that has zero references."); + bool deleted = false; - se->refcount--; - if(unlikely(se->refcount == 0)) { - if(hashtable_delete_unsafe(&string_dictionary, se->str, se->length, se) == 0) - error("STRING: INTERNAL ERROR: tried to delete '%s' that is not in the index", se->str); + if (likely(string_base.JudyHSArray)) { + JError_t J_Error; + int ret = JudyHSDel(&string_base.JudyHSArray, (void *)string->str, string->length, &J_Error); + if (unlikely(ret == JERR)) { + error( + "STRING: Cannot delete entry with name '%s' from JudyHS, JU_ERRNO_* == %u, ID == %d", + string->str, + JU_ERRNO(&J_Error), + JU_ERRID(&J_Error)); + } else + deleted = true; + } - size_t mem_size = sizeof(STRING_ENTRY) + se->length; - freez(se); - string_dictionary.version++; - string_dictionary.deletes++; - string_dictionary.entries--; - string_dictionary.memory -= (long)mem_size; + if (unlikely(!deleted)) + error("STRING: tried to delete '%s' that is not in the index. 
Ignoring it.", string->str); + else { + size_t mem_size = sizeof(STRING) + string->length; + string_base.deletes++; + string_base.entries--; + string_base.memory -= (long)mem_size; + freez(string); } - netdata_mutex_unlock(&string_mutex); + netdata_rwlock_unlock(&string_base.rwlock); } -size_t string_length(STRING *string) { +STRING *string_strdupz(const char *str) { + if(unlikely(!str || !*str)) return NULL; + + size_t length = strlen(str) + 1; + STRING *string = string_index_search(str, length); + + while(!string) { + // The search above did not find anything, + // We loop here, because during insert we may find an entry that is being deleted by another thread. + // So, we have to let it go and retry to insert it again. + + string = string_index_insert(str, length); + } + + // statistics + string_stats_atomic_increment(active_references); + + return string; +} + +void string_freez(STRING *string) { + if(unlikely(!string)) return; + + int32_t refcount = string_entry_release(string); + +#ifdef NETDATA_INTERNAL_CHECKS + if(unlikely(refcount < 0)) + fatal("STRING: tried to %s() a string that is already freed (it has %d references).", __FUNCTION__, string->refcount); +#endif + + if(unlikely(refcount == 0)) + string_index_delete(string); + + // statistics + string_stats_atomic_decrement(active_references); + string_stats_atomic_increment(releases); +} + +size_t string_strlen(STRING *string) { if(unlikely(!string)) return 0; - return ((STRING_ENTRY *)string)->length - 1; + return string->length - 1; } const char *string2str(STRING *string) { if(unlikely(!string)) return ""; - return ((STRING_ENTRY *)string)->str; + return string->str; } STRING *string_2way_merge(STRING *a, STRING *b) { @@ -1610,9 +1709,9 @@ STRING *string_2way_merge(STRING *a, STRING *b) { if(unlikely(!a)) return string_dup(X); if(unlikely(!b)) return string_dup(X); - size_t alen = string_length(a); - size_t blen = string_length(b); - size_t length = alen + blen + string_length(X) + 1; + size_t alen = 
string_strlen(a); + size_t blen = string_strlen(b); + size_t length = alen + blen + string_strlen(X) + 1; char buf1[length + 1], buf2[length + 1], *dst1; const char *s1, *s2; @@ -1643,6 +1742,52 @@ STRING *string_2way_merge(STRING *a, STRING *b) { } // ---------------------------------------------------------------------------- +// THREAD_CACHE + +static __thread Pvoid_t thread_cache_judy_array = NULL; + +void *thread_cache_entry_get_or_set(void *key, + ssize_t key_length, + void *value, + void *(*transform_the_value_before_insert)(void *key, size_t key_length, void *value) + ) { + if(unlikely(!key || !key_length)) return NULL; + + if(key_length == -1) + key_length = (ssize_t)strlen((char *)key) + 1; + + JError_t J_Error; + Pvoid_t *Rc = JudyHSIns(&thread_cache_judy_array, key, key_length, &J_Error); + if (unlikely(Rc == PJERR)) { + fatal("THREAD_CACHE: Cannot insert entry to JudyHS, JU_ERRNO_* == %u, ID == %d", + JU_ERRNO(&J_Error), JU_ERRID(&J_Error)); + } + + if(*Rc == 0) { + // new item added + + *Rc = (transform_the_value_before_insert) ? 
transform_the_value_before_insert(key, key_length, value) : value; + } + + return *Rc; +} + +void thread_cache_destroy(void) { + if(unlikely(!thread_cache_judy_array)) return; + + JError_t J_Error; + Word_t ret = JudyHSFreeArray(&thread_cache_judy_array, &J_Error); + if(unlikely(ret == (Word_t) JERR)) { + error("THREAD_CACHE: Cannot destroy JudyHS, JU_ERRNO_* == %u, ID == %d", + JU_ERRNO(&J_Error), JU_ERRID(&J_Error)); + } + + internal_error(true, "THREAD_CACHE: hash table freed %lu bytes", ret); + + thread_cache_judy_array = NULL; +} + +// ---------------------------------------------------------------------------- // unit test static void dictionary_unittest_free_char_pp(char **pp, size_t entries) { @@ -2153,6 +2298,27 @@ static size_t check_name_value_deleted_flag(DICTIONARY *dict, NAME_VALUE *nv, co return errors; } +static int string_threads_join = 0; +static void *string_thread(void *arg __maybe_unused) { + int dups = 1; //(gettid() % 10); + for(; 1 ;) { + if(string_threads_join) + break; + + STRING *s = string_strdupz("string thread checking 1234567890"); + + for(int i = 0; i < dups ; i++) + string_dup(s); + + for(int i = 0; i < dups ; i++) + string_freez(s); + + string_freez(s); + } + + return arg; +} + int dictionary_unittest(size_t entries) { if(entries < 10) entries = 10; @@ -2291,7 +2457,7 @@ int dictionary_unittest(size_t entries) { // check string { - long string_entries_starting = dictionary_stats_entries(&string_dictionary); + long int string_entries_starting = string_base.entries; fprintf(stderr, "\nChecking strings...\n"); @@ -2312,8 +2478,7 @@ int dictionary_unittest(size_t entries) { else fprintf(stderr, "OK: cloning string are deduplicated\n"); - STRING_ENTRY *se = (STRING_ENTRY *)s1; - if(se->refcount != 3) { + if(s1->refcount != 3) { errors++; fprintf(stderr, "ERROR: string refcount is not 3\n"); } @@ -2355,9 +2520,9 @@ int dictionary_unittest(size_t entries) { freez(strings); - if(dictionary_stats_entries(&string_dictionary) != 
string_entries_starting + 2) { + if(string_base.entries != string_entries_starting + 2) { errors++; - fprintf(stderr, "ERROR: strings dictionary should have %ld items but it has %ld\n", string_entries_starting + 2, dictionary_stats_entries(&string_dictionary)); + fprintf(stderr, "ERROR: strings dictionary should have %ld items but it has %ld\n", string_entries_starting + 2, string_base.entries); } else fprintf(stderr, "OK: strings dictionary has 2 items\n"); @@ -2405,6 +2570,65 @@ int dictionary_unittest(size_t entries) { dictionary_unittest_free_char_pp(names, entries); dictionary_unittest_free_char_pp(values, entries); + { +#ifdef NETDATA_INTERNAL_CHECKS + size_t ofound_deleted_on_search = string_base.found_deleted_on_search, + ofound_available_on_search = string_base.found_available_on_search, + ofound_deleted_on_insert = string_base.found_deleted_on_insert, + ofound_available_on_insert = string_base.found_available_on_insert, + ospins = string_base.spins; +#endif + + size_t oinserts, odeletes, osearches, oentries, oreferences, omemory, oduplications, oreleases; + string_statistics(&oinserts, &odeletes, &osearches, &oentries, &oreferences, &omemory, &oduplications, &oreleases); + + time_t seconds_to_run = 5; + int threads_to_create = 2; + fprintf( + stderr, + "Checking string concurrency with %d threads for %ld seconds...\n", + threads_to_create, + seconds_to_run); + // check string concurrency + netdata_thread_t threads[threads_to_create]; + string_threads_join = 0; + for (int i = 0; i < threads_to_create; i++) { + char buf[100 + 1]; + snprintf(buf, 100, "string%d", i); + netdata_thread_create( + &threads[i], buf, NETDATA_THREAD_OPTION_DONT_LOG | NETDATA_THREAD_OPTION_JOINABLE, string_thread, NULL); + } + sleep_usec(seconds_to_run * USEC_PER_SEC); + + string_threads_join = 1; + for (int i = 0; i < threads_to_create; i++) { + void *retval; + netdata_thread_join(threads[i], &retval); + } + + size_t inserts, deletes, searches, sentries, references, memory, 
duplications, releases; + string_statistics(&inserts, &deletes, &searches, &sentries, &references, &memory, &duplications, &releases); + + fprintf(stderr, "inserts %zu, deletes %zu, searches %zu, entries %zu, references %zu, memory %zu, duplications %zu, releases %zu\n", + inserts - oinserts, deletes - odeletes, searches - osearches, sentries - oentries, references - oreferences, memory - omemory, duplications - oduplications, releases - oreleases); + +#ifdef NETDATA_INTERNAL_CHECKS + size_t found_deleted_on_search = string_base.found_deleted_on_search, + found_available_on_search = string_base.found_available_on_search, + found_deleted_on_insert = string_base.found_deleted_on_insert, + found_available_on_insert = string_base.found_available_on_insert, + spins = string_base.spins; + + fprintf(stderr, "on insert: %zu ok + %zu deleted\non search: %zu ok + %zu deleted\nspins: %zu\n", + found_available_on_insert - ofound_available_on_insert, + found_deleted_on_insert - ofound_deleted_on_insert, + found_available_on_search - ofound_available_on_search, + found_deleted_on_search - ofound_deleted_on_search, + spins - ospins + ); +#endif + } |