diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2023-02-09 20:27:05 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-02-09 20:27:05 +0200 |
commit | 414f416c5d290db3c3eed9073258c834fac7f2f7 (patch) | |
tree | aeed0a2619a1f42c60ec816311d1dd10c8e5ccc1 /libnetdata | |
parent | 8043106b60ec3d8d32b3a9ff3ef53991e73d9037 (diff) |
Virtual hosts for data collection (#14464)
* support multiple hosts at pluginsd structures
* cleanup obsolete code
* use a lookup hashtable to quickly find the keyword to execute, without traversing the whole linked list of keywords
* more cleanup
* move new hash function to inlined.h
* minimize comparisons, eliminate a pre-parsing of the first keyword for each line
* cleanup parser from old code
* move parser into libnetdata
* unique entries in parser keywords hashtable
* move all hashing functions to inlined.h, name their sources, simple_hash() now defaults to FNV1a, it was FNV1
* small_hash() for parser
* plugins.d now can switch hosts, and also create/update them
* update hash function and hashtable size
* updated message
* unittest all hashing functions
* reset the chart when setting a new host
* remove host tags
* enable archived hosts when a collector pushes host info
* do not need localhost to switch to localhost
* disable ARAL and OWA with -DFSANITIZE_ADDRESS=1
Diffstat (limited to 'libnetdata')
-rw-r--r-- | libnetdata/Makefile.am | 1 | ||||
-rw-r--r-- | libnetdata/aral/aral.c | 8 | ||||
-rw-r--r-- | libnetdata/inlined.h | 113 | ||||
-rw-r--r-- | libnetdata/libnetdata.c | 29 | ||||
-rw-r--r-- | libnetdata/libnetdata.h | 57 | ||||
-rw-r--r-- | libnetdata/onewayalloc/onewayalloc.c | 9 | ||||
-rw-r--r-- | libnetdata/parser/Makefile.am | 9 | ||||
-rw-r--r-- | libnetdata/parser/README.md | 156 | ||||
-rw-r--r-- | libnetdata/parser/parser.c | 225 | ||||
-rw-r--r-- | libnetdata/parser/parser.h | 101 | ||||
-rw-r--r-- | libnetdata/string/string.c | 8 | ||||
-rw-r--r-- | libnetdata/string/string.h | 1 |
12 files changed, 633 insertions, 84 deletions
diff --git a/libnetdata/Makefile.am b/libnetdata/Makefile.am index b81d620ba6..4bf7791368 100644 --- a/libnetdata/Makefile.am +++ b/libnetdata/Makefile.am @@ -20,6 +20,7 @@ SUBDIRS = \ locks \ log \ onewayalloc \ + parser \ popen \ procfile \ simple_pattern \ diff --git a/libnetdata/aral/aral.c b/libnetdata/aral/aral.c index 4505ee0f28..2a4b0687dd 100644 --- a/libnetdata/aral/aral.c +++ b/libnetdata/aral/aral.c @@ -465,6 +465,9 @@ static inline ARAL_PAGE *aral_acquire_a_free_slot(ARAL *ar TRACE_ALLOCATIONS_FUN } void *aral_mallocz_internal(ARAL *ar TRACE_ALLOCATIONS_FUNCTION_DEFINITION_PARAMS) { +#ifdef FSANITIZE_ADDRESS + return mallocz(ar->config.requested_element_size); +#endif ARAL_PAGE *page = aral_acquire_a_free_slot(ar TRACE_ALLOCATIONS_FUNCTION_CALL_PARAMS); @@ -614,6 +617,11 @@ static inline void aral_move_page_with_free_list___aral_lock_needed(ARAL *ar, AR } void aral_freez_internal(ARAL *ar, void *ptr TRACE_ALLOCATIONS_FUNCTION_DEFINITION_PARAMS) { +#ifdef FSANITIZE_ADDRESS + freez(ptr); + return; +#endif + if(unlikely(!ptr)) return; // get the page pointer diff --git a/libnetdata/inlined.h b/libnetdata/inlined.h index 94410cee44..730cd079b7 100644 --- a/libnetdata/inlined.h +++ b/libnetdata/inlined.h @@ -21,25 +21,118 @@ typedef uint64_t kernel_uint_t; // for faster execution, allow the compiler to inline // these functions that are called thousands of times per second -static inline uint32_t simple_hash(const char *name) { +static inline uint32_t djb2_hash32(const char* name) { unsigned char *s = (unsigned char *) name; - uint32_t hval = 0x811c9dc5; + uint32_t hash = 5381; + while (*s) + hash = ((hash << 5) + hash) + (uint32_t) *s++; // hash * 33 + char + return hash; +} + +static inline uint32_t pluginsd_parser_hash32(const char *name) { + unsigned char *s = (unsigned char *) name; + uint32_t hash = 0; while (*s) { - hval *= 16777619; - hval ^= (uint32_t) *s++; + hash <<= 5; + hash += *s++ - ' '; } - return hval; + return hash; } -static inline 
uint32_t simple_uhash(const char *name) { +// https://stackoverflow.com/a/107657 +static inline uint32_t larson_hash32(const char *name) { unsigned char *s = (unsigned char *) name; - uint32_t hval = 0x811c9dc5, c; + uint32_t hash = 0; + while (*s) + hash = hash * 101 + (uint32_t) *s++; + return hash; +} + +// http://isthe.com/chongo/tech/comp/fnv/ +static inline uint32_t fnv1_hash32(const char *name) { + unsigned char *s = (unsigned char *) name; + uint32_t hash = 0x811c9dc5; + while (*s) { + hash *= 0x01000193; // 16777619 + hash ^= (uint32_t) *s++; + } + return hash; +} + +// http://isthe.com/chongo/tech/comp/fnv/ +static inline uint32_t fnv1a_hash32(const char *name) { + unsigned char *s = (unsigned char *) name; + uint32_t hash = 0x811c9dc5; + while (*s) { + hash ^= (uint32_t) *s++; + hash *= 0x01000193; // 16777619 + } + return hash; +} + +static inline uint32_t fnv1a_uhash32(const char *name) { + unsigned char *s = (unsigned char *) name; + uint32_t hash = 0x811c9dc5, c; while ((c = *s++)) { if (unlikely(c >= 'A' && c <= 'Z')) c += 'a' - 'A'; - hval *= 16777619; - hval ^= c; + hash ^= c; + hash *= 0x01000193; // 16777619 } - return hval; + return hash; +} + +#define simple_hash(s) fnv1a_hash32(s) +#define simple_uhash(s) fnv1a_uhash32(s) + +static inline size_t indexing_partition_old(Word_t ptr, Word_t modulo) { + size_t total = 0; + + total += (ptr & 0xff) >> 0; + total += (ptr & 0xff00) >> 8; + total += (ptr & 0xff0000) >> 16; + total += (ptr & 0xff000000) >> 24; + + if(sizeof(Word_t) > 4) { + total += (ptr & 0xff00000000) >> 32; + total += (ptr & 0xff0000000000) >> 40; + total += (ptr & 0xff000000000000) >> 48; + total += (ptr & 0xff00000000000000) >> 56; + } + + return (total % modulo); +} + +static uint32_t murmur32(uint32_t k) __attribute__((const)); +static inline uint32_t murmur32(uint32_t k) { + k ^= k >> 16; + k *= 0x85ebca6b; + k ^= k >> 13; + k *= 0xc2b2ae35; + k ^= k >> 16; + + return k; +} + +static uint64_t murmur64(uint64_t k) 
__attribute__((const)); +static inline uint64_t murmur64(uint64_t k) { + k ^= k >> 33; + k *= 0xff51afd7ed558ccdUL; + k ^= k >> 33; + k *= 0xc4ceb9fe1a85ec53UL; + k ^= k >> 33; + + return k; +} + +static inline size_t indexing_partition(Word_t ptr, Word_t modulo) __attribute__((const)); +static inline size_t indexing_partition(Word_t ptr, Word_t modulo) { +#ifdef ENV64BIT + uint64_t hash = murmur64(ptr); + return hash % modulo; +#else + uint32_t hash = murmur32(ptr); + return hash % modulo; +#endif } static inline int str2i(const char *s) { diff --git a/libnetdata/libnetdata.c b/libnetdata/libnetdata.c index 5f5c710785..66d41bd546 100644 --- a/libnetdata/libnetdata.c +++ b/libnetdata/libnetdata.c @@ -1883,17 +1883,20 @@ inline int config_isspace(char c) } // split a text into words, respecting quotes -inline size_t quoted_strings_splitter(char *str, char **words, size_t max_words, int (*custom_isspace)(char), char *recover_input, char **recover_location, int max_recover) +inline size_t quoted_strings_splitter(char *str, char **words, size_t max_words, int (*custom_isspace)(char)) { char *s = str, quote = 0; size_t i = 0; - int rec = 0; - char *recover = recover_input; // skip all white space while (unlikely(custom_isspace(*s))) s++; + if(unlikely(!*s)) { + words[i] = NULL; + return 0; + } + // check for quote if (unlikely(*s == '\'' || *s == '"')) { quote = *s; // remember the quote @@ -1905,19 +1908,15 @@ inline size_t quoted_strings_splitter(char *str, char **words, size_t max_words, // while we have something while (likely(*s)) { - // if it is escape + // if it is an escape if (unlikely(*s == '\\' && s[1])) { s += 2; continue; } - // if it is quote + // if it is a quote else if (unlikely(*s == quote)) { quote = 0; - if (recover && rec < max_recover) { - recover_location[rec++] = s; - *recover++ = *s; - } *s = ' '; continue; } @@ -1925,19 +1924,13 @@ inline size_t quoted_strings_splitter(char *str, char **words, size_t max_words, // if it is a space else if 
(unlikely(quote == 0 && custom_isspace(*s))) { // terminate the word - if (recover && rec < max_recover) { - if (!rec || recover_location[rec-1] != s) { - recover_location[rec++] = s; - *recover++ = *s; - } - } *s++ = '\0'; // skip all white space while (likely(custom_isspace(*s))) s++; - // check for quote + // check for a quote if (unlikely(*s == '\'' || *s == '"')) { quote = *s; // remember the quote s++; // skip the quote @@ -1965,9 +1958,9 @@ inline size_t quoted_strings_splitter(char *str, char **words, size_t max_words, return i; } -inline size_t pluginsd_split_words(char *str, char **words, size_t max_words, char *recover_input, char **recover_location, int max_recover) +inline size_t pluginsd_split_words(char *str, char **words, size_t max_words) { - return quoted_strings_splitter(str, words, max_words, pluginsd_space, recover_input, recover_location, max_recover); + return quoted_strings_splitter(str, words, max_words, pluginsd_space); } bool bitmap256_get_bit(BITMAP256 *ptr, uint8_t idx) { diff --git a/libnetdata/libnetdata.h b/libnetdata/libnetdata.h index 8d50a55c57..65f1b5185a 100644 --- a/libnetdata/libnetdata.h +++ b/libnetdata/libnetdata.h @@ -485,8 +485,8 @@ void bitmap256_set_bit(BITMAP256 *ptr, uint8_t idx, bool value); int config_isspace(char c); int pluginsd_space(char c); -size_t quoted_strings_splitter(char *str, char **words, size_t max_words, int (*custom_isspace)(char), char *recover_input, char **recover_location, int max_recover); -size_t pluginsd_split_words(char *str, char **words, size_t max_words, char *recover_string, char **recover_location, int max_recover); +size_t quoted_strings_splitter(char *str, char **words, size_t max_words, int (*custom_isspace)(char)); +size_t pluginsd_split_words(char *str, char **words, size_t max_words); static inline char *get_word(char **words, size_t num_words, size_t index) { if (index >= num_words) @@ -547,6 +547,7 @@ extern char *netdata_configured_host_prefix; #include "libnetdata/aral/aral.h" 
#include "onewayalloc/onewayalloc.h" #include "worker_utilization/worker_utilization.h" +#include "parser/parser.h" // BEWARE: Outside of the C code this also exists in alarm-notify.sh #define DEFAULT_CLOUD_BASE_URL "https://api.netdata.cloud" @@ -609,58 +610,6 @@ static inline PPvoid_t JudyLLastThenPrev(Pcvoid_t PArray, Word_t * PIndex, bool return JudyLPrev(PArray, PIndex, PJE0); } -static inline size_t indexing_partition_old(Word_t ptr, Word_t modulo) { - size_t total = 0; - - total += (ptr & 0xff) >> 0; - total += (ptr & 0xff00) >> 8; - total += (ptr & 0xff0000) >> 16; - total += (ptr & 0xff000000) >> 24; - - if(sizeof(Word_t) > 4) { - total += (ptr & 0xff00000000) >> 32; - total += (ptr & 0xff0000000000) >> 40; - total += (ptr & 0xff000000000000) >> 48; - total += (ptr & 0xff00000000000000) >> 56; - } - - return (total % modulo); -} - -static uint32_t murmur32(uint32_t h) __attribute__((const)); -static inline uint32_t murmur32(uint32_t h) { - h ^= h >> 16; - h *= 0x85ebca6b; - h ^= h >> 13; - h *= 0xc2b2ae35; - h ^= h >> 16; - - return h; -} - -static uint64_t murmur64(uint64_t h) __attribute__((const)); -static inline uint64_t murmur64(uint64_t k) { - k ^= k >> 33; - k *= 0xff51afd7ed558ccdUL; - k ^= k >> 33; - k *= 0xc4ceb9fe1a85ec53UL; - k ^= k >> 33; - - return k; -} - -static inline size_t indexing_partition(Word_t ptr, Word_t modulo) __attribute__((const)); -static inline size_t indexing_partition(Word_t ptr, Word_t modulo) { - if(sizeof(Word_t) == 8) { - uint64_t hash = murmur64(ptr); - return hash % modulo; - } - else { - uint32_t hash = murmur32(ptr); - return hash % modulo; - } -} - typedef enum { TIMING_STEP_INTERNAL = 0, diff --git a/libnetdata/onewayalloc/onewayalloc.c b/libnetdata/onewayalloc/onewayalloc.c index 2f007b1898..489ce73d7e 100644 --- a/libnetdata/onewayalloc/onewayalloc.c +++ b/libnetdata/onewayalloc/onewayalloc.c @@ -97,6 +97,10 @@ ONEWAYALLOC *onewayalloc_create(size_t size_hint) { } void *onewayalloc_mallocz(ONEWAYALLOC *owa, 
size_t size) { +#ifdef FSANITIZE_ADDRESS + return mallocz(size); +#endif + OWA_PAGE *head = (OWA_PAGE *)owa; OWA_PAGE *page = head->last; @@ -142,6 +146,11 @@ void *onewayalloc_memdupz(ONEWAYALLOC *owa, const void *src, size_t size) { } void onewayalloc_freez(ONEWAYALLOC *owa __maybe_unused, const void *ptr __maybe_unused) { +#ifdef FSANITIZE_ADDRESS + freez((void *)ptr); + return; +#endif + #ifdef NETDATA_INTERNAL_CHECKS // allow the caller to call us for a mallocz() allocation // so try to find it in our memory and if it is not there diff --git a/libnetdata/parser/Makefile.am b/libnetdata/parser/Makefile.am new file mode 100644 index 0000000000..02fe3a314f --- /dev/null +++ b/libnetdata/parser/Makefile.am @@ -0,0 +1,9 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +AUTOMAKE_OPTIONS = subdir-objects +MAINTAINERCLEANFILES = $(srcdir)/Makefile.in + +dist_noinst_DATA = \ + README.md \ + $(NULL) + diff --git a/libnetdata/parser/README.md b/libnetdata/parser/README.md new file mode 100644 index 0000000000..cf59bfd767 --- /dev/null +++ b/libnetdata/parser/README.md @@ -0,0 +1,156 @@ +<!-- +title: "Parser" +custom_edit_url: https://github.com/netdata/netdata/blob/master/parser/README.md +sidebar_label: "Parser" +learn_status: "Published" +learn_topic_type: "References" +learn_rel_path: "Developers/Database" +--> + + +#### Introduction + +The parser will be used to process streaming and plugins input as well as metadata + +Usage + +1. Define a structure that will be used to share user state across calls +1. Initialize the parser using `parser_init` +2. Register keywords and associated callback function using `parser_add_keyword` +3. Register actions on the keywords +4. Start a loop until EOF + 1. Fetch the next line using `parser_next` + 2. Process the line using `parser_action` + 1. The registered callbacks are executed to parse the input + 2. The registered action for the callback is called for processing +4. Release the parser using `parser_destroy` +5. 
Release the user structure + +#### Functions + +TODO: + +##### parse_init(RRDHOST *host, void *user, void *input, int flags) + +Initialize an internal parser with the specified user defined data structure that will be shared across calls. + +Input +- Host + - The host this parser will be dealing with. For streaming with SSL enabled for this host +- user + - User defined structure that is passed in all the calls +- input + - Where the parser will get the input from +- flags + - flags to define processing on the input + +Output +- A parser structure + + + +##### parse_push(PARSER *parser, char *line) + +Push a new line for processing + +Input + +- parser + - The parser object as returned by the `parser_init` +- line + - The new line to process + + +Output +- The line will be injected into the stream and will be the next one to be processed + +Returns +- 0 line added +- 1 error detected + + +##### parse_add_keyword(PARSER *parser, char *keyword, keyword_function callback_function) + +The function will add callbacks for keywords. The callback function is defined as + +`typedef PARSER_RC (*keyword_function)(char **, void *);` + +Input + +- parser + - The parser object as returned by the `parser_init` +- keyword + - The keyword to register +- keyword_function + - The callback that will handle the keyword processing + * The callback function should return one of the following + * PARSER_RC_OK - Callback was successful (continue with other callbacks) + * PARSER_RC_STOP - Stop processing callbacks (return OK) + * PARSER_RC_ERROR - Callback failed, exit + +Output +- The corresponding keyword and callback will be registered + +Returns +- 0 maximum callbacks already registered for this keyword +- > 0 which is the number of callbacks associated with this keyword. 
+ + +##### parser_next(PARSER *parser) +Return the next item to parse + +Input +- parser + - The parser object as returned by the `parser_init` + +Output +- The parser will store internally the next item to parse + +Returns +- 0 Next item fetched successfully +- 1 No more items to parse + + +##### parser_action(PARSER *parser, char *input) +Return the next item to parse + +Input +- parser + - The parser object as returned by the `parser_init` +- input + - Process the input specified instead of using the internal buffer + +Output +- The current keyword will be processed by calling all the registered callbacks + +Returns +- 0 Callbacks called successfully +- 1 Failed + + +##### parser_destroy(PARSER *parser) +Cleanup a previously allocated parser + +Input +- parser + - The parser object as returned by the `parser_init` + +Output +- The parser is deallocated + +Returns +- none + + +##### parser_recover_input(PARSER *parser) +Cleanup a previously allocated parser + +Input +- parser + - The parser object as returned by the `parser_init` + +Output +- The parser is deallocated + +Returns +- none diff --git a/libnetdata/parser/parser.c b/libnetdata/parser/parser.c new file mode 100644 index 0000000000..c3eebcd163 --- /dev/null +++ b/libnetdata/parser/parser.c @@ -0,0 +1,225 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "parser.h" +#include "collectors/plugins.d/pluginsd_parser.h" + +static inline int find_first_keyword(const char *src, char *dst, int dst_size, int (*custom_isspace)(char)) { + const char *s = src, *keyword_start; + + while (unlikely(custom_isspace(*s))) s++; + keyword_start = s; + + while (likely(*s && !custom_isspace(*s)) && dst_size > 1) { + *dst++ = *s++; + dst_size--; + } + *dst = '\0'; + return dst_size == 0 ? 
0 : (int) (s - keyword_start); +} + +/* + * Initialize a parser + * user : as defined by the user, will be shared across calls + * input : main input stream (auto detect stream -- file, socket, pipe) + * buffer : This is the buffer to be used (if null a buffer of size will be allocated) + * size : buffer size either passed or will be allocated + * If the buffer is auto allocated, it will auto freed when the parser is destroyed + * + * + */ + +PARSER *parser_init(void *user, FILE *fp_input, FILE *fp_output, int fd, + PARSER_INPUT_TYPE flags, void *ssl __maybe_unused) +{ + PARSER *parser; + + parser = callocz(1, sizeof(*parser)); + parser->user = user; + parser->fd = fd; + parser->fp_input = fp_input; + parser->fp_output = fp_output; +#ifdef ENABLE_HTTPS + parser->ssl_output = ssl; +#endif + parser->flags = flags; + parser->worker_job_next_id = WORKER_PARSER_FIRST_JOB; + + return parser; +} + + +static inline PARSER_KEYWORD *parser_find_keyword(PARSER *parser, const char *command) { + uint32_t hash = parser_hash_function(command); + uint32_t slot = hash % PARSER_KEYWORDS_HASHTABLE_SIZE; + PARSER_KEYWORD *t = parser->keywords.hashtable[slot]; + + if(likely(t && strcmp(t->keyword, command) == 0)) + return t; + + return NULL; +} + +/* + * Add a keyword and the corresponding function that will be called + * Multiple functions may be added + * Input : keyword + * : callback function + * : flags + * Output: > 0 registered function number + * : 0 Error + */ + +void parser_add_keyword(PARSER *parser, char *keyword, keyword_function func) { + if(unlikely(!parser || !keyword || !*keyword || !func)) + fatal("PARSER: invalid parameters"); + + PARSER_KEYWORD *t = callocz(1, sizeof(*t)); + t->worker_job_id = parser->worker_job_next_id++; + t->keyword = strdupz(keyword); + t->func = func; + + uint32_t hash = parser_hash_function(keyword); + uint32_t slot = hash % PARSER_KEYWORDS_HASHTABLE_SIZE; + + if(unlikely(parser->keywords.hashtable[slot])) + fatal("PARSER: hashtable collision 
between keyword '%s' and '%s' on slot %u. " + "Change the hashtable size and / or the hashing function. " + "Run the unit test to find the optimal values.", + parser->keywords.hashtable[slot]->keyword, + t->keyword, + slot + ); + + parser->keywords.hashtable[slot] = t; + + worker_register_job_name(t->worker_job_id, t->keyword); +} + +/* + * Cleanup a previously allocated parser + */ + +void parser_destroy(PARSER *parser) +{ + if (unlikely(!parser)) + return; + + dictionary_destroy(parser->inflight.functions); + + // Remove keywords + for(size_t i = 0 ; i < PARSER_KEYWORDS_HASHTABLE_SIZE; i++) { + PARSER_KEYWORD *t = parser->keywords.hashtable[i]; + if (t) { + freez(t->keyword); + freez(t); + } + } + + freez(parser); +} + + +/* + * Fetch the next line to process + * + */ + +int parser_next(PARSER *parser, char *buffer, size_t buffer_size) +{ + char *tmp = fgets(buffer, (int)buffer_size, (FILE *)parser->fp_input); + + if (unlikely(!tmp)) { + if (feof((FILE *)parser->fp_input)) + error("PARSER: read failed: end of file"); + + else if (ferror((FILE *)parser->fp_input)) + error("PARSER: read failed: input error"); + + else + error("PARSER: read failed: unknown error"); + + return 1; + } + + return 0; +} + + +/* +* Takes an initialized parser object that has an unprocessed entry (by calling parser_next) +* and if it contains a valid keyword, it will execute all the callbacks +* +*/ + +inline int parser_action(PARSER *parser, char *input) +{ + parser->line++; + + if(unlikely(parser->flags & PARSER_DEFER_UNTIL_KEYWORD)) { + char command[PLUGINSD_LINE_MAX + 1]; + bool has_keyword = find_first_keyword(input, command, PLUGINSD_LINE_MAX, pluginsd_space); + + if(!has_keyword || strcmp(command, parser->defer.end_keyword) != 0) { + if(parser->defer.response) { + buffer_strcat(parser->defer.response, input); + if(buffer_strlen(parser->defer.response) > 10 * 1024 * 1024) { + // more than 10MB of data + // a bad plugin that did not send the end_keyword + internal_error(true, 
"PLUGINSD: deferred response is too big (%zu bytes). Stopping this plugin.", buffer_strlen(parser->defer.response)); + return 1; + } + } + return 0; + } + else { + // call the action + parser->defer.action(parser, parser->defer.action_data); + + // empty everything + parser->defer.action = NULL; + parser->defer.action_data = NULL; + parser->defer.end_keyword = NULL; + parser->defer.response = NULL; + parser->flags &= ~PARSER_DEFER_UNTIL_KEYWORD; + } + return 0; + } + + char *words[PLUGINSD_MAX_WORDS]; + size_t num_words = pluginsd_split_words(input, words, PLUGINSD_MAX_WORDS); + const char *command = get_word(words, num_words, 0); + + if(unlikely(!command)) + return 0; + + PARSER_RC rc; + PARSER_KEYWORD *t = parser_find_keyword(parser, command); + if(likely(t)) { + worker_is_busy(t->worker_job_id); + rc = (*t->func)(words, num_words, parser->user); + worker_is_idle(); + } + else + rc = PARSER_RC_ERROR; + +#ifdef NETDATA_INTERNAL_CHECKS + if(rc == PARSER_RC_ERROR) { + BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX, NULL); + for(size_t i = 0; i < num_words ;i++) { + if(i) buffer_fast_strcat(wb, " ", 1); + + buffer_fast_strcat(wb, "\"", 1); + const char *s = get_word(words, num_words, i); + buffer_strcat(wb, s?s:""); + buffer_fast_strcat(wb, "\"", 1); + } + + internal_error(true, "PLUGINSD: parser_action('%s') failed on line %zu: { %s } (quotes added to show parsing)", + command, parser->line, buffer_tostring(wb)); + + buffer_free(wb); + } +#endif + + return (rc == PARSER_RC_ERROR || rc == PARSER_RC_STOP); +} diff --git a/libnetdata/parser/parser.h b/libnetdata/parser/parser.h new file mode 100644 index 0000000000..9e0d3480de --- /dev/null +++ b/libnetdata/parser/parser.h @@ -0,0 +1,101 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_INCREMENTAL_PARSER_H +#define NETDATA_INCREMENTAL_PARSER_H 1 + +#include "../libnetdata.h" + +#define WORKER_PARSER_FIRST_JOB 3 + +// this has to be in-sync with the same at receiver.c +#define 
WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3) + +#define PARSER_KEYWORDS_HASHTABLE_SIZE 73 // unittest finds this magic number +//#define parser_hash_function(s) djb2_hash32(s) +//#define parser_hash_function(s) fnv1_hash32(s) +//#define parser_hash_function(s) fnv1a_hash32(s) +//#define parser_hash_function(s) larson_hash32(s) +#define parser_hash_function(s) pluginsd_parser_hash32(s) + +// PARSER return codes +typedef enum __attribute__ ((__packed__)) parser_rc { + PARSER_RC_OK, // Callback was successful, go on + PARSER_RC_STOP, // Callback says STOP + PARSER_RC_ERROR // Callback failed (abort rest of callbacks) +} PARSER_RC; + +typedef enum __attribute__ ((__packed__)) parser_input_type { + PARSER_INPUT_SPLIT = (1 << 1), + PARSER_DEFER_UNTIL_KEYWORD = (1 << 2), +} PARSER_INPUT_TYPE; + +typedef PARSER_RC (*keyword_function)(char **words, size_t num_words, void *user_data); + +typedef struct parser_keyword { + size_t worker_job_id; + char *keyword; + keyword_function func; +} PARSER_KEYWORD; + +typedef struct parser { + size_t worker_job_next_id; + uint8_t version; // Parser version + int fd; // Socket + FILE *fp_input; // Input source e.g. 
stream + FILE *fp_output; // Stream to send commands to plugin +#ifdef ENABLE_HTTPS + struct netdata_ssl *ssl_output; +#endif + void *user; // User defined structure to hold extra state between calls + uint32_t flags; + size_t line; + + struct { + PARSER_KEYWORD *hashtable[PARSER_KEYWORDS_HASHTABLE_SIZE]; + } keywords; + + struct { + const char *end_keyword; + BUFFER *response; + void (*action)(struct parser *parser, void *action_data); + void *action_data; + } defer; + + struct { + DICTIONARY *functions; + usec_t smaller_timeout; + } inflight; +} PARSER; + +PARSER *parser_init(void *user, FILE *fp_input, FILE *fp_output, int fd, PARSER_INPUT_TYPE flags, void *ssl); +void parser_add_keyword(PARSER *working_parser, char *keyword, keyword_function func); +int parser_next(PARSER *working_parser, char *buffer, size_t buffer_size); +int parser_action(PARSER *working_parser, char *input); +void parser_destroy(PARSER *working_parser); + +PARSER_RC pluginsd_set(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_end(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_flush(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_disable(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_label(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_overwrite(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_clabel_commit(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user); + +PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, void *user); +PARSER_RC 
pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user); + +PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_set_v2(char **words, size_t num_words, void *user); +PARSER_RC pluginsd_end_v2(char **words, size_t num_words, void *user); +void pluginsd_cleanup_v2(void *user); + +#endif diff --git a/libnetdata/string/string.c b/libnetdata/string/string.c index 4e232523c8..9385aa6e8d 100644 --- a/libnetdata/string/string.c +++ b/libnetdata/string/string.c @@ -300,16 +300,20 @@ void string_freez(STRING *string) { string_stats_atomic_increment(releases); } -size_t string_strlen(STRING *string) { +inline size_t string_strlen(STRING *string) { if(unlikely(!string)) return 0; return string->length - 1; } -const char *string2str(STRING *string) { +inline const char *string2str(STRING *string) { if(unlikely(!string)) return ""; return string->str; } +int string_strcmp(STRING *string, const char *s) { + return strcmp(string2str(string), s); +} + STRING *string_2way_merge(STRING *a, STRING *b) { static STRING *X = NULL; |