summaryrefslogtreecommitdiffstats
path: root/libnetdata/dictionary
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-09-05 19:31:06 +0300
committerGitHub <noreply@github.com>2022-09-05 19:31:06 +0300
commit5e1b95cf92168c4df74586fb4430dc284806da82 (patch)
treef42077d8b02eaf316683453a7474bd1f599a833d /libnetdata/dictionary
parent544aef1fde6e79ac57d2dea85d3f063076d7f885 (diff)
Deduplicate all netdata strings (#13570)
* rrdfamily * rrddim * rrdset plugin and module names * rrdset units * rrdset type * rrdset family * rrdset title * rrdset title more * rrdset context * rrdcalctemplate context and removal of context hash from rrdset * strings statistics * rrdset name * rearranged members of rrdset * eliminate rrdset name hash; rrdcalc chart converted to STRING * rrdset id, eliminated rrdset hash * rrdcalc, alarm_entry, alert_config and some of rrdcalctemplate * rrdcalctemplate * rrdvar * eval_variable * rrddimvar and rrdsetvar * rrdhost hostname, os and tags * fix master commits * added thread cache; implemented string_dup without locks * faster thread cache * rrdset and rrddim now use dictionaries for indexing * rrdhost now uses dictionary * rrdfamily now uses DICTIONARY * rrdvar using dictionary instead of AVL * allocate the right size to rrdvar flag members * rrdhost remaining char * members to STRING * * better error handling on indexing * strings now use a read/write lock to allow parallel searches to the index * removed AVL support from dictionaries; implemented STRING with native Judy calls * string releases should be negative * only 31 bits are allowed for enum flags * proper locking on strings * string threading unittest and fixes * fix lgtm finding * fixed naming * stream chart/dimension definitions at the beginning of a streaming session * thread stack variable is undefined on thread cancel * rrdcontext garbage collect per host on startup * worker control in garbage collection * relaxed deletion of rrdmetrics * type checking on dictfe * netdata chart to monitor rrdcontext triggers * Group chart label updates * rrdcontext better handling of collected rrdsets * rrdpush incremental transmition of definitions should use as much buffer as possible * require 1MB per chart * empty the sender buffer before enabling metrics streaming * fill up to 50% of buffer * reset signaling metrics sending * use the shared variable for status * use separate host flag for enabling streaming of metrics * make sure the flag is clear * add logging for streaming * add logging for streaming on buffer overflow * circular_buffer proper sizing * removed obsolete logs * do not execute worker jobs if not necessary * better messages about compression disabling * proper use of flags and updating rrdset last access time every time the obsoletion flag is flipped * monitor stream sender used buffer ratio * Update exporting unit tests * no need to compare label value with strcmp * streaming send workers now monitor bandwidth * workers now use strings * streaming receiver monitors incoming bandwidth * parser shift of worker ids * minor fixes * Group chart label updates * Populate context with dimensions that have data * Fix chart id * better shift of parser worker ids * fix for streaming compression * properly count received bytes * ensure LZ4 compression ring buffer does not wrap prematurely * do not stream empty charts; do not process empty instances in rrdcontext * need_to_send_chart_definition() does not need an rrdset lock any more * rrdcontext objects are collected, after data have been written to the db * better logging of RRDCONTEXT transitions * always set all variables needed by the worker utilization charts * implemented double linked list for most objects; eliminated alarm indexes from rrdhost; and many more fixes * lockless strings design - string_dup() and string_freez() are totally lockless when they dont need to touch Judy - only Judy is protected with a read/write lock * STRING code re-organization for clarity * thread_cache improvements; double numbers precision on worker threads * STRING_ENTRY now shadown STRING, so no duplicate definition is required; string_length() renamed to string_strlen() to follow the paradigm of all other functions, STRING internal statistics are now only compiled with NETDATA_INTERNAL_CHECKS * rrdhost index by hostname now cleans up; aclk queries of archieved hosts do not index hosts * Add index to speed up database context searches * Removed last_updated optimization (was also buggy after latest merge with master) Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> Co-authored-by: Vladimir Kobal <vlad@prokk.net>
Diffstat (limited to 'libnetdata/dictionary')
-rw-r--r--libnetdata/dictionary/dictionary.c608
-rw-r--r--libnetdata/dictionary/dictionary.h33
2 files changed, 438 insertions, 203 deletions
diff --git a/libnetdata/dictionary/dictionary.c b/libnetdata/dictionary/dictionary.c
index c1325ecb54..6e37554ea4 100644
--- a/libnetdata/dictionary/dictionary.c
+++ b/libnetdata/dictionary/dictionary.c
@@ -1,28 +1,17 @@
// SPDX-License-Identifier: GPL-3.0-or-later
// NOT TO BE USED BY USERS
-#define DICTIONARY_FLAG_EXCLUSIVE_ACCESS (1 << 29) // there is only one thread accessing the dictionary
-#define DICTIONARY_FLAG_DESTROYED (1 << 30) // this dictionary has been destroyed
-#define DICTIONARY_FLAG_DEFER_ALL_DELETIONS (1 << 31) // defer all deletions of items in the dictionary
+#define DICTIONARY_FLAG_EXCLUSIVE_ACCESS (1 << 28) // there is only one thread accessing the dictionary
+#define DICTIONARY_FLAG_DESTROYED (1 << 29) // this dictionary has been destroyed
+#define DICTIONARY_FLAG_DEFER_ALL_DELETIONS (1 << 30) // defer all deletions of items in the dictionary
// our reserved flags that cannot be set by users
#define DICTIONARY_FLAGS_RESERVED (DICTIONARY_FLAG_EXCLUSIVE_ACCESS|DICTIONARY_FLAG_DESTROYED|DICTIONARY_FLAG_DEFER_ALL_DELETIONS)
-typedef struct dictionary DICTIONARY;
#define DICTIONARY_INTERNALS
#include "../libnetdata.h"
-
-#ifndef ENABLE_DBENGINE
-#define DICTIONARY_WITH_AVL
-#warning Compiling DICTIONARY with an AVL index
-#else
-#define DICTIONARY_WITH_JUDYHS
-#endif
-
-#ifdef DICTIONARY_WITH_JUDYHS
#include <Judy.h>
-#endif
typedef enum name_value_flags {
NAME_VALUE_FLAG_NONE = 0,
@@ -38,10 +27,6 @@ typedef enum name_value_flags {
*/
typedef struct name_value {
-#ifdef DICTIONARY_WITH_AVL
- avl_t avl_node;
-#endif
-
#ifdef NETDATA_INTERNAL_CHECKS
DICTIONARY *dict;
#endif
@@ -72,15 +57,7 @@ struct dictionary {
NAME_VALUE *first_item; // the double linked list base pointers
NAME_VALUE *last_item;
-#ifdef DICTIONARY_WITH_AVL
- avl_tree_type values_index;
- NAME_VALUE *hash_base;
- void *(*get_thread_static_name_value)(const char *name);
-#endif
-
-#ifdef DICTIONARY_WITH_JUDYHS
Pvoid_t JudyHSArray; // the hash table
-#endif
netdata_rwlock_t rwlock; // the r/w lock when DICTIONARY_FLAG_SINGLE_THREADED is not set
@@ -315,9 +292,9 @@ static inline size_t dictionary_lock_free(DICTIONARY *dict) {
}
static void dictionary_lock(DICTIONARY *dict, char rw) {
- if(rw == 'u' || rw == 'U') return;
+ if(rw == DICTIONARY_LOCK_NONE || rw == 'U') return;
- if(rw == 'r' || rw == 'R') {
+ if(rw == DICTIONARY_LOCK_READ || rw == DICTIONARY_LOCK_REENTRANT || rw == 'R') {
// read lock
__atomic_add_fetch(&dict->readers, 1, __ATOMIC_RELAXED);
}
@@ -329,7 +306,7 @@ static void dictionary_lock(DICTIONARY *dict, char rw) {
if(likely(dict->flags & DICTIONARY_FLAG_SINGLE_THREADED))
return;
- if(rw == 'r' || rw == 'R') {
+ if(rw == DICTIONARY_LOCK_READ || rw == DICTIONARY_LOCK_REENTRANT || rw == 'R') {
// read lock
netdata_rwlock_rdlock(&dict->rwlock);
@@ -347,9 +324,9 @@ static void dictionary_lock(DICTIONARY *dict, char rw) {
}
static void dictionary_unlock(DICTIONARY *dict, char rw) {
- if(rw == 'u' || rw == 'U') return;
+ if(rw == DICTIONARY_LOCK_NONE || rw == 'U') return;
- if(rw == 'r' || rw == 'R') {
+ if(rw == DICTIONARY_LOCK_READ || rw == DICTIONARY_LOCK_REENTRANT || rw == 'R') {
// read unlock
__atomic_sub_fetch(&dict->readers, 1, __ATOMIC_RELAXED);
}
@@ -480,68 +457,6 @@ static uint32_t reference_counter_release(DICTIONARY *dict, NAME_VALUE *nv, bool
// ----------------------------------------------------------------------------
// hash table
-#ifdef DICTIONARY_WITH_AVL
-static inline const char *namevalue_get_name(NAME_VALUE *nv);
-
-static int name_value_compare(void* a, void* b) {
- return strcmp(namevalue_get_name((NAME_VALUE *)a), namevalue_get_name((NAME_VALUE *)b));
-}
-
-static void *get_thread_static_name_value(const char *name) {
- static __thread NAME_VALUE tmp = { 0 };
- tmp.flags = NAME_VALUE_FLAG_NONE;
- tmp.caller_name = (char *)name;
- return &tmp;
-}
-
-static void hashtable_init_unsafe(DICTIONARY *dict) {
- avl_init(&dict->values_index, name_value_compare);
- dict->get_thread_static_name_value = get_thread_static_name_value;
-}
-
-static size_t hashtable_destroy_unsafe(DICTIONARY *dict) {
- (void)dict;
- return 0;
-}
-
-static inline int hashtable_delete_unsafe(DICTIONARY *dict, const char *name, size_t name_len, void *nv) {
- (void)name;
- (void)name_len;
-
- if(unlikely(avl_remove(&(dict->values_index), (avl_t *)(nv)) != (avl_t *)nv))
- return 0;
-
- return 1;
-}
-
-static inline NAME_VALUE *hashtable_get_unsafe(DICTIONARY *dict, const char *name, size_t name_len) {
- (void)name_len;
-
- void *tmp = dict->get_thread_static_name_value(name);
- return (NAME_VALUE *)avl_search(&(dict->values_index), (avl_t *)tmp);
-}
-
-static inline NAME_VALUE **hashtable_insert_unsafe(DICTIONARY *dict, const char *name, size_t name_len) {
- // AVL needs a NAME_VALUE to insert into the dictionary but we don't have it yet.
- // So, the only thing we can do, is return an existing one if it is already there.
- // Returning NULL will make the caller thing we added it, will allocate one
- // and will call hashtable_inserted_name_value_unsafe(), at which we will do
- // the actual indexing.
-
- dict->hash_base = hashtable_get_unsafe(dict, name, name_len);
- return &dict->hash_base;
-}
-
-static inline void hashtable_inserted_name_value_unsafe(DICTIONARY *dict, void *nv) {
- // we have our new NAME_VALUE object.
- // Let's index it.
-
- if(unlikely(avl_insert(&((dict)->values_index), (avl_t *)(nv)) != (avl_t *)nv))
- error("dictionary: INTERNAL ERROR: duplicate insertion to dictionary.");
-}
-#endif
-
-#ifdef DICTIONARY_WITH_JUDYHS
static void hashtable_init_unsafe(DICTIONARY *dict) {
dict->JudyHSArray = NULL;
}
@@ -562,7 +477,7 @@ static size_t hashtable_destroy_unsafe(DICTIONARY *dict) {
return (size_t)ret;
}
-static inline NAME_VALUE **hashtable_insert_unsafe(DICTIONARY *dict, const char *name, size_t name_len) {
+static inline void **hashtable_insert_unsafe(DICTIONARY *dict, const char *name, size_t name_len) {
internal_error(!(dict->flags & DICTIONARY_FLAG_EXCLUSIVE_ACCESS), "DICTIONARY: inserting item from the index without exclusive access to the dictionary created by %s() (%zu@%s)", dict->creation_function, dict->creation_line, dict->creation_file);
JError_t J_Error;
@@ -579,7 +494,7 @@ static inline NAME_VALUE **hashtable_insert_unsafe(DICTIONARY *dict, const char
// put anything needed at the value of the index.
// The pointer to pointer we return has to be used before
// any other operation that may change the index (insert/delete).
- return (NAME_VALUE **)Rc;
+ return Rc;
}
static inline int hashtable_delete_unsafe(DICTIONARY *dict, const char *name, size_t name_len, void *nv) {
@@ -632,8 +547,6 @@ static inline void hashtable_inserted_name_value_unsafe(DICTIONARY *dict, void *
;
}
-#endif // DICTIONARY_WITH_JUDYHS
-
// ----------------------------------------------------------------------------
// linked list management
@@ -979,7 +892,7 @@ static NAME_VALUE *dictionary_set_name_value_unsafe(DICTIONARY *dict, const char
// But the caller has the option to do this on his/her own.
// So, let's do the fastest here and let the caller decide the flow of calls.
- NAME_VALUE *nv, **pnv = hashtable_insert_unsafe(dict, name, name_len);
+ NAME_VALUE *nv, **pnv = (NAME_VALUE **)hashtable_insert_unsafe(dict, name, name_len);
if(likely(*pnv == 0)) {
// a new item added to the index
nv = *pnv = namevalue_create_unsafe(dict, name, name_len, value, value_len);
@@ -1280,6 +1193,9 @@ void *dictionary_foreach_start_rw(DICTFE *dfe, DICTIONARY *dict, char rw) {
dfe->value = NULL;
}
+ if(unlikely(dfe->rw == DICTIONARY_LOCK_REENTRANT))
+ dictionary_unlock(dfe->dict, dfe->rw);
+
return dfe->value;
}
@@ -1294,6 +1210,9 @@ void *dictionary_foreach_next(DICTFE *dfe) {
return NULL;
}
+ if(unlikely(dfe->rw == DICTIONARY_LOCK_REENTRANT))
+ dictionary_lock(dfe->dict, dfe->rw);
+
// the item we just did
NAME_VALUE *nv = (NAME_VALUE *)dfe->last_item;
@@ -1320,6 +1239,9 @@ void *dictionary_foreach_next(DICTFE *dfe) {
dfe->value = NULL;
}
+ if(unlikely(dfe->rw == DICTIONARY_LOCK_REENTRANT))
+ dictionary_unlock(dfe->dict, dfe->rw);
+
return dfe->value;
}
@@ -1338,7 +1260,9 @@ usec_t dictionary_foreach_done(DICTFE *dfe) {
if(likely(nv))
reference_counter_release(dfe->dict, nv, false);
- dictionary_unlock(dfe->dict, dfe->rw);
+ if(likely(dfe->rw != DICTIONARY_LOCK_REENTRANT))
+ dictionary_unlock(dfe->dict, dfe->rw);
+
dfe->dict = NULL;
dfe->last_item = NULL;
dfe->name = NULL;
@@ -1383,8 +1307,14 @@ int dictionary_walkthrough_rw(DICTIONARY *dict, char rw, int (*callback)(const c
// while we are using it
reference_counter_acquire(dict, nv);
+ if(unlikely(rw == DICTIONARY_LOCK_REENTRANT))
+ dictionary_unlock(dict, rw);
+
int r = callback(namevalue_get_name(nv), nv->value, data);
+ if(unlikely(rw == DICTIONARY_LOCK_REENTRANT))
+ dictionary_lock(dict, rw);
+
// since we have a reference counter, this item cannot be deleted
// until we release the reference counter, so the pointers are there
nv_next = nv->next;
@@ -1449,7 +1379,15 @@ int dictionary_sorted_walkthrough_rw(DICTIONARY *dict, char rw, int (*callback)(
nv = array[i];
if(likely(!(nv->flags & NAME_VALUE_FLAG_DELETED))) {
reference_counter_acquire(dict, nv);
+
+ if(unlikely(rw == DICTIONARY_LOCK_REENTRANT))
+ dictionary_unlock(dict, rw);
+
int r = callback(namevalue_get_name(nv), nv->value, data);
+
+ if(unlikely(rw == DICTIONARY_LOCK_REENTRANT))
+ dictionary_lock(dict, rw);
+
reference_counter_release(dict, nv, false);
if (r < 0) {
ret = r;
@@ -1469,132 +1407,293 @@ int dictionary_sorted_walkthrough_rw(DICTIONARY *dict, char rw, int (*callback)(
// ----------------------------------------------------------------------------
// STRING implementation - dedup all STRINGs
-typedef struct string_entry {
-#ifdef DICTIONARY_WITH_AVL
- avl_t avl_node;
+struct netdata_string {
+ uint32_t length; // the string length including the terminating '\0'
+
+ int32_t refcount; // how many times this string is used
+ // We use a signed number to be able to detect duplicate frees of a string.
+ // If at any point this goes below zero, we have a duplicate free.
+
+ const char str[]; // the string itself, is appended to this structure
+};
+
+static struct string_hashtable {
+ Pvoid_t JudyHSArray; // the Judy array - hashtable
+ netdata_rwlock_t rwlock; // the R/W lock to protect the Judy array
+
+ long int entries; // the number of entries in the index
+ long int active_references; // the number of active references alive
+ long int memory; // the memory used, without the JudyHS index
+
+ size_t inserts; // the number of successful inserts to the index
+ size_t deletes; // the number of successful deleted from the index
+ size_t searches; // the number of successful searches in the index
+ size_t duplications; // when a string is referenced
+ size_t releases; // when a string is unreferenced
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ // internal statistics
+ size_t found_deleted_on_search;
+ size_t found_available_on_search;
+ size_t found_deleted_on_insert;
+ size_t found_available_on_insert;
+ size_t spins;
+#endif
+
+} string_base = {
+ .JudyHSArray = NULL,
+ .rwlock = NETDATA_RWLOCK_INITIALIZER,
+};
+
+#ifdef NETDATA_INTERNAL_CHECKS
+#define string_internal_stats_add(var, val) __atomic_add_fetch(&string_base.var, val, __ATOMIC_RELAXED)
+#else
+#define string_internal_stats_add(var, val) do {;} while(0)
#endif
- uint32_t length; // the string length with the terminating '\0'
- uint32_t refcount; // how many times this string is used
- const char str[]; // the string itself
-} STRING_ENTRY;
-#ifdef DICTIONARY_WITH_AVL
-static int string_entry_compare(void* a, void* b) {
- return strcmp(((STRING_ENTRY *)a)->str, ((STRING_ENTRY *)b)->str);
+#define string_stats_atomic_increment(var) __atomic_add_fetch(&string_base.var, 1, __ATOMIC_RELAXED)
+#define string_stats_atomic_decrement(var) __atomic_sub_fetch(&string_base.var, 1, __ATOMIC_RELAXED)
+
+void string_statistics(size_t *inserts, size_t *deletes, size_t *searches, size_t *entries, size_t *references, size_t *memory, size_t *duplications, size_t *releases) {
+ *inserts = string_base.inserts;
+ *deletes = string_base.deletes;
+ *searches = string_base.searches;
+ *entries = (size_t)string_base.entries;
+ *references = (size_t)string_base.active_references;
+ *memory = (size_t)string_base.memory;
+ *duplications = string_base.duplications;
+ *releases = string_base.releases;
}
-static void *get_thread_static_string_entry(const char *name) {
- static __thread size_t _length = 0;
- static __thread STRING_ENTRY *_tmp = NULL;
+#define string_entry_acquire(se) __atomic_add_fetch(&((se)->refcount), 1, __ATOMIC_SEQ_CST);
+#define string_entry_release(se) __atomic_sub_fetch(&((se)->refcount), 1, __ATOMIC_SEQ_CST);
- size_t size = sizeof(STRING_ENTRY) + strlen(name) + 1;
- if(likely(_tmp && _length < size)) {
- freez(_tmp);
- _tmp = NULL;
- _length = 0;
- }
+static inline bool string_entry_check_and_acquire(STRING *se) {
+ int32_t expected, desired, count = 0;
+ do {
+ count++;
+
+ expected = se->refcount;
+
+ if(expected <= 0) {
+ // We cannot use this.
+ // The reference counter reached value zero,
+ // so another thread is deleting this.
+ string_internal_stats_add(spins, count - 1);
+ return false;
+ }
- if(unlikely(!_tmp)) {
- _tmp = callocz(1, size);
- _length = size;
+ desired = expected + 1;
}
+ while(!__atomic_compare_exchange_n(&se->refcount, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));
+
+ string_internal_stats_add(spins, count - 1);
+
+ // statistics
+ // string_base.active_references is altered at the in string_strdupz() and string_freez()
+ string_stats_atomic_increment(duplications);
- strcpy((char *)&_tmp->str[0], name);
- return _tmp;
+ return true;
}
-#endif
-DICTIONARY string_dictionary = {
-#ifdef DICTIONARY_WITH_AVL
- .values_index = {
- .root = NULL,
- .compar = string_entry_compare
- },
- .get_thread_static_name_value = get_thread_static_string_entry,
+STRING *string_dup(STRING *string) {
+ if(unlikely(!string)) return NULL;
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(unlikely(__atomic_load_n(&string->refcount, __ATOMIC_SEQ_CST) <= 0))
+ fatal("STRING: tried to %s() a string that is freed (it has %d references).", __FUNCTION__, string->refcount);
#endif
- .flags = DICTIONARY_FLAG_EXCLUSIVE_ACCESS,
- .rwlock = NETDATA_RWLOCK_INITIALIZER
-};
+ string_entry_acquire(string);
-static netdata_mutex_t string_mutex = NETDATA_MUTEX_INITIALIZER;
+ // statistics
+ string_stats_atomic_increment(active_references);
+ string_stats_atomic_increment(duplications);
-STRING *string_dup(STRING *string) {
- if(unlikely(!string)) return NULL;
+ return string;
+}
+
+// Search the index and return an ACQUIRED string entry, or NULL
+static inline STRING *string_index_search(const char *str, size_t length) {
+ if(unlikely(!string_base.JudyHSArray))
+ return NULL;
+
+ STRING *string;
+
+ // Find the string in the index
+ // With a read-lock so that multiple readers can use the index concurrently.
+
+ netdata_rwlock_rdlock(&string_base.rwlock);
+
+ Pvoid_t *Rc;
+ Rc = JudyHSGet(string_base.JudyHSArray, (void *)str, length);
+ if(likely(Rc)) {
+ // found in the hash table
+ string = *Rc;
+
+ if(string_entry_check_and_acquire(string)) {
+ // we can use this entry
+ string_internal_stats_add(found_available_on_search, 1);
+ }
+ else {
+ // this entry is about to be deleted by another thread
+ // do not touch it, let it go...
+ string = NULL;
+ string_internal_stats_add(found_deleted_on_search, 1);
+ }
+ }
+ else {
+ // not found in the hash table
+ string = NULL;
+ }
+
+ string_stats_atomic_increment(searches);
+ netdata_rwlock_unlock(&string_base.rwlock);
- STRING_ENTRY *se = (STRING_ENTRY *)string;
- netdata_mutex_lock(&string_mutex);
- se->refcount++;
- netdata_mutex_unlock(&string_mutex);
return string;
}
-STRING *string_strdupz(const char *str) {
- if(unlikely(!str || !*str)) return NULL;
+// Insert a string to the index and return an ACQUIRED string entry,
+// or NULL if the call needs to be retried (a deleted entry with the same key is still in the index)
+// The returned entry is ACQUIRED and it can either be:
+// 1. a new item inserted, or
+// 2. an item found in the index that is not currently deleted
+static inline STRING *string_index_insert(const char *str, size_t length) {
+ STRING *string;
- netdata_mutex_lock(&string_mutex);
+ netdata_rwlock_wrlock(&string_base.rwlock);
- size_t length = strlen(str) + 1;
- STRING_ENTRY *se;
- STRING_ENTRY **ptr = (STRING_ENTRY **)hashtable_insert_unsafe(&string_dictionary, str, length);
- if(unlikely(*ptr == 0)) {
+ STRING **ptr;
+ {
+ JError_t J_Error;
+ Pvoid_t *Rc = JudyHSIns(&string_base.JudyHSArray, (void *)str, length, &J_Error);
+ if (unlikely(Rc == PJERR)) {
+ fatal(
+ "STRING: Cannot insert entry with name '%s' to JudyHS, JU_ERRNO_* == %u, ID == %d",
+ str,
+ JU_ERRNO(&J_Error),
+ JU_ERRID(&J_Error));
+ }
+ ptr = (STRING **)Rc;
+ }
+
+ if (likely(*ptr == 0)) {
// a new item added to the index
- size_t mem_size = sizeof(STRING_ENTRY) + length;
- se = mallocz(mem_size);
- strcpy((char *)se->str, str);
- se->length = length;
- se->refcount = 1;
- *ptr = se;
- hashtable_inserted_name_value_unsafe(&string_dictionary, se);
- string_dictionary.version++;
- string_dictionary.inserts++;
- string_dictionary.entries++;
- string_dictionary.memory += (long)mem_size;
+ size_t mem_size = sizeof(STRING) + length;
+ string = mallocz(mem_size);
+ strcpy((char *)string->str, str);
+ string->length = length;
+ string->refcount = 1;
+ *ptr = string;
+ string_base.inserts++;
+ string_base.entries++;
+ string_base.memory += (long)mem_size;
}
else {
// the item is already in the index
- se = *ptr;
- se->refcount++;
- string_dictionary.searches++;
+ string = *ptr;
+
+ if(string_entry_check_and_acquire(string)) {
+ // we can use this entry
+ string_internal_stats_add(found_available_on_insert, 1);
+ }
+ else {
+ // this entry is about to be deleted by another thread
+ // do not touch it, let it go...
+ string = NULL;
+ string_internal_stats_add(found_deleted_on_insert, 1);
+ }
+
+ string_stats_atomic_increment(searches);
}
- netdata_mutex_unlock(&string_mutex);
- return (STRING *)se;
+ netdata_rwlock_unlock(&string_base.rwlock);
+ return string;
}
-void string_freez(STRING *string) {
- if(unlikely(!string)) return;
- netdata_mutex_lock(&string_mutex);
+// delete an entry from the index
+static inline void string_index_delete(STRING *string) {
+ netdata_rwlock_wrlock(&string_base.rwlock);
- STRING_ENTRY *se = (STRING_ENTRY *)string;
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(unlikely(__atomic_load_n(&string->refcount, __ATOMIC_SEQ_CST) != 0))
+ fatal("STRING: tried to delete a string at %s() that is already freed (it has %d references).", __FUNCTION__, string->refcount);
+#endif
- if(se->refcount == 0)
- fatal("STRING: tried to free string that has zero references.");
+ bool deleted = false;
- se->refcount--;
- if(unlikely(se->refcount == 0)) {
- if(hashtable_delete_unsafe(&string_dictionary, se->str, se->length, se) == 0)
- error("STRING: INTERNAL ERROR: tried to delete '%s' that is not in the index", se->str);
+ if (likely(string_base.JudyHSArray)) {
+ JError_t J_Error;
+ int ret = JudyHSDel(&string_base.JudyHSArray, (void *)string->str, string->length, &J_Error);
+ if (unlikely(ret == JERR)) {
+ error(
+ "STRING: Cannot delete entry with name '%s' from JudyHS, JU_ERRNO_* == %u, ID == %d",
+ string->str,
+ JU_ERRNO(&J_Error),
+ JU_ERRID(&J_Error));
+ } else
+ deleted = true;
+ }
- size_t mem_size = sizeof(STRING_ENTRY) + se->length;
- freez(se);
- string_dictionary.version++;
- string_dictionary.deletes++;
- string_dictionary.entries--;
- string_dictionary.memory -= (long)mem_size;
+ if (unlikely(!deleted))
+ error("STRING: tried to delete '%s' that is not in the index. Ignoring it.", string->str);
+ else {
+ size_t mem_size = sizeof(STRING) + string->length;
+ string_base.deletes++;
+ string_base.entries--;
+ string_base.memory -= (long)mem_size;
+ freez(string);
}
- netdata_mutex_unlock(&string_mutex);
+ netdata_rwlock_unlock(&string_base.rwlock);
}
-size_t string_length(STRING *string) {
+STRING *string_strdupz(const char *str) {
+ if(unlikely(!str || !*str)) return NULL;
+
+ size_t length = strlen(str) + 1;
+ STRING *string = string_index_search(str, length);
+
+ while(!string) {
+ // The search above did not find anything,
+ // We loop here, because during insert we may find an entry that is being deleted by another thread.
+ // So, we have to let it go and retry to insert it again.
+
+ string = string_index_insert(str, length);
+ }
+
+ // statistics
+ string_stats_atomic_increment(active_references);
+
+ return string;
+}
+
+void string_freez(STRING *string) {
+ if(unlikely(!string)) return;
+
+ int32_t refcount = string_entry_release(string);
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(unlikely(refcount < 0))
+ fatal("STRING: tried to %s() a string that is already freed (it has %d references).", __FUNCTION__, string->refcount);
+#endif
+
+ if(unlikely(refcount == 0))
+ string_index_delete(string);
+
+ // statistics
+ string_stats_atomic_decrement(active_references);
+ string_stats_atomic_increment(releases);
+}
+
+size_t string_strlen(STRING *string) {
if(unlikely(!string)) return 0;
- return ((STRING_ENTRY *)string)->length - 1;
+ return string->length - 1;
}
const char *string2str(STRING *string) {
if(unlikely(!string)) return "";
- return ((STRING_ENTRY *)string)->str;
+ return string->str;
}
STRING *string_2way_merge(STRING *a, STRING *b) {
@@ -1610,9 +1709,9 @@ STRING *string_2way_merge(STRING *a, STRING *b) {
if(unlikely(!a)) return string_dup(X);
if(unlikely(!b)) return string_dup(X);
- size_t alen = string_length(a);
- size_t blen = string_length(b);
- size_t length = alen + blen + string_length(X) + 1;
+ size_t alen = string_strlen(a);
+ size_t blen = string_strlen(b);
+ size_t length = alen + blen + string_strlen(X) + 1;
char buf1[length + 1], buf2[length + 1], *dst1;
const char *s1, *s2;
@@ -1643,6 +1742,52 @@ STRING *string_2way_merge(STRING *a, STRING *b) {
}
// ----------------------------------------------------------------------------
+// THREAD_CACHE
+
+static __thread Pvoid_t thread_cache_judy_array = NULL;
+
+void *thread_cache_entry_get_or_set(void *key,
+ ssize_t key_length,
+ void *value,
+ void *(*transform_the_value_before_insert)(void *key, size_t key_length, void *value)
+ ) {
+ if(unlikely(!key || !key_length)) return NULL;
+
+ if(key_length == -1)
+ key_length = (ssize_t)strlen((char *)key) + 1;
+
+ JError_t J_Error;
+ Pvoid_t *Rc = JudyHSIns(&thread_cache_judy_array, key, key_length, &J_Error);
+ if (unlikely(Rc == PJERR)) {
+ fatal("THREAD_CACHE: Cannot insert entry to JudyHS, JU_ERRNO_* == %u, ID == %d",
+ JU_ERRNO(&J_Error), JU_ERRID(&J_Error));
+ }
+
+ if(*Rc == 0) {
+ // new item added
+
+ *Rc = (transform_the_value_before_insert) ? transform_the_value_before_insert(key, key_length, value) : value;
+ }
+
+ return *Rc;
+}
+
+void thread_cache_destroy(void) {
+ if(unlikely(!thread_cache_judy_array)) return;
+
+ JError_t J_Error;
+ Word_t ret = JudyHSFreeArray(&thread_cache_judy_array, &J_Error);
+ if(unlikely(ret == (Word_t) JERR)) {
+ error("THREAD_CACHE: Cannot destroy JudyHS, JU_ERRNO_* == %u, ID == %d",
+ JU_ERRNO(&J_Error), JU_ERRID(&J_Error));
+ }
+
+ internal_error(true, "THREAD_CACHE: hash table freed %lu bytes", ret);
+
+ thread_cache_judy_array = NULL;
+}
+
+// ----------------------------------------------------------------------------
// unit test
static void dictionary_unittest_free_char_pp(char **pp, size_t entries) {
@@ -2153,6 +2298,27 @@ static size_t check_name_value_deleted_flag(DICTIONARY *dict, NAME_VALUE *nv, co
return errors;
}
+static int string_threads_join = 0;
+static void *string_thread(void *arg __maybe_unused) {
+ int dups = 1; //(gettid() % 10);
+ for(; 1 ;) {
+ if(string_threads_join)
+ break;
+
+ STRING *s = string_strdupz("string thread checking 1234567890");
+
+ for(int i = 0; i < dups ; i++)
+ string_dup(s);
+
+ for(int i = 0; i < dups ; i++)
+ string_freez(s);
+
+ string_freez(s);
+ }
+
+ return arg;
+}
+
int dictionary_unittest(size_t entries) {
if(entries < 10) entries = 10;
@@ -2291,7 +2457,7 @@ int dictionary_unittest(size_t entries) {
// check string
{
- long string_entries_starting = dictionary_stats_entries(&string_dictionary);
+ long int string_entries_starting = string_base.entries;
fprintf(stderr, "\nChecking strings...\n");
@@ -2312,8 +2478,7 @@ int dictionary_unittest(size_t entries) {
else
fprintf(stderr, "OK: cloning string are deduplicated\n");
- STRING_ENTRY *se = (STRING_ENTRY *)s1;
- if(se->refcount != 3) {
+ if(s1->refcount != 3) {
errors++;
fprintf(stderr, "ERROR: string refcount is not 3\n");
}
@@ -2355,9 +2520,9 @@ int dictionary_unittest(size_t entries) {
freez(strings);
- if(dictionary_stats_entries(&string_dictionary) != string_entries_starting + 2) {
+ if(string_base.entries != string_entries_starting + 2) {
errors++;
- fprintf(stderr, "ERROR: strings dictionary should have %ld items but it has %ld\n", string_entries_starting + 2, dictionary_stats_entries(&string_dictionary));
+ fprintf(stderr, "ERROR: strings dictionary should have %ld items but it has %ld\n", string_entries_starting + 2, string_base.entries);
}
else
fprintf(stderr, "OK: strings dictionary has 2 items\n");
@@ -2405,6 +2570,65 @@ int dictionary_unittest(size_t entries) {
dictionary_unittest_free_char_pp(names, entries);
dictionary_unittest_free_char_pp(values, entries);
+ {
+#ifdef NETDATA_INTERNAL_CHECKS
+ size_t ofound_deleted_on_search = string_base.found_deleted_on_search,
+ ofound_available_on_search = string_base.found_available_on_search,
+ ofound_deleted_on_insert = string_base.found_deleted_on_insert,
+ ofound_available_on_insert = string_base.found_available_on_insert,
+ ospins = string_base.spins;
+#endif
+
+ size_t oinserts, odeletes, osearches, oentries, oreferences, omemory, oduplications, oreleases;
+ string_statistics(&oinserts, &odeletes, &osearches, &oentries, &oreferences, &omemory, &oduplications, &oreleases);
+
+ time_t seconds_to_run = 5;
+ int threads_to_create = 2;
+ fprintf(
+ stderr,
+ "Checking string concurrency with %d threads for %ld seconds...\n",
+ threads_to_create,
+ seconds_to_run);
+ // check string concurrency
+ netdata_thread_t threads[threads_to_create];
+ string_threads_join = 0;
+ for (int i = 0; i < threads_to_create; i++) {
+ char buf[100 + 1];
+ snprintf(buf, 100, "string%d", i);
+ netdata_thread_create(
+ &threads[i], buf, NETDATA_THREAD_OPTION_DONT_LOG | NETDATA_THREAD_OPTION_JOINABLE, string_thread, NULL);
+ }
+ sleep_usec(seconds_to_run * USEC_PER_SEC);
+
+ string_threads_join = 1;
+ for (int i = 0; i < threads_to_create; i++) {
+ void *retval;
+ netdata_thread_join(threads[i], &retval);
+ }
+
+ size_t inserts, deletes, searches, sentries, references, memory, duplications, releases;
+ string_statistics(&inserts, &deletes, &searches, &sentries, &references, &memory, &duplications, &releases);
+
+ fprintf(stderr, "inserts %zu, deletes %zu, searches %zu, entries %zu, references %zu, memory %zu, duplications %zu, releases %zu\n",
+ inserts - oinserts, deletes - odeletes, searches - osearches, sentries - oentries, references - oreferences, memory - omemory, duplications - oduplications, releases - oreleases);
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ size_t found_deleted_on_search = string_base.found_deleted_on_search,
+ found_available_on_search = string_base.found_available_on_search,
+ found_deleted_on_insert = string_base.found_deleted_on_insert,
+ found_available_on_insert = string_base.found_available_on_insert,
+ spins = string_base.spins;
+
+ fprintf(stderr, "on insert: %zu ok + %zu deleted\non search: %zu ok + %zu deleted\nspins: %zu\n",
+ found_available_on_insert - ofound_available_on_insert,
+ found_deleted_on_insert - ofound_deleted_on_insert,
+ found_available_on_search - ofound_available_on_search,
+ found_deleted_on_search - ofound_deleted_on_search,
+ spins - ospins
+ );
+#endif
+ }