summaryrefslogtreecommitdiffstats
path: root/libnetdata
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-02-09 20:27:05 +0200
committerGitHub <noreply@github.com>2023-02-09 20:27:05 +0200
commit414f416c5d290db3c3eed9073258c834fac7f2f7 (patch)
treeaeed0a2619a1f42c60ec816311d1dd10c8e5ccc1 /libnetdata
parent8043106b60ec3d8d32b3a9ff3ef53991e73d9037 (diff)
Virtual hosts for data collection (#14464)
* support multiple hosts at pluginsd structures * cleanup obsolete code * use a lookup hashtable to quickly find the keyword to execute, without traversing the whole linked list of keywords * more cleanup * move new hash function to inlined.h * minimize comparisons, eliminate a pre-parsing of the first keyword for each line * cleanup parser from old code * move parser into libnetdata * unique entries in parser keywords hashtable * move all hashing functions to inlined.h, name their sources, simple_hash() now defaults to FNV1a, it was FNV1 * small_hash() for parser * plugins.d now can switch hosts, and also create/update them * update hash function and hashtable size * updated message * unittest all hashing functions * reset the chart when setting a new host * remove host tags * enable archived hosts when a collector pushes host info * do not need localhost to switch to localhost * disable ARAL and OWA with -DFSANITIZE_ADDRESS=1
Diffstat (limited to 'libnetdata')
-rw-r--r--libnetdata/Makefile.am1
-rw-r--r--libnetdata/aral/aral.c8
-rw-r--r--libnetdata/inlined.h113
-rw-r--r--libnetdata/libnetdata.c29
-rw-r--r--libnetdata/libnetdata.h57
-rw-r--r--libnetdata/onewayalloc/onewayalloc.c9
-rw-r--r--libnetdata/parser/Makefile.am9
-rw-r--r--libnetdata/parser/README.md156
-rw-r--r--libnetdata/parser/parser.c225
-rw-r--r--libnetdata/parser/parser.h101
-rw-r--r--libnetdata/string/string.c8
-rw-r--r--libnetdata/string/string.h1
12 files changed, 633 insertions, 84 deletions
diff --git a/libnetdata/Makefile.am b/libnetdata/Makefile.am
index b81d620ba6..4bf7791368 100644
--- a/libnetdata/Makefile.am
+++ b/libnetdata/Makefile.am
@@ -20,6 +20,7 @@ SUBDIRS = \
locks \
log \
onewayalloc \
+ parser \
popen \
procfile \
simple_pattern \
diff --git a/libnetdata/aral/aral.c b/libnetdata/aral/aral.c
index 4505ee0f28..2a4b0687dd 100644
--- a/libnetdata/aral/aral.c
+++ b/libnetdata/aral/aral.c
@@ -465,6 +465,9 @@ static inline ARAL_PAGE *aral_acquire_a_free_slot(ARAL *ar TRACE_ALLOCATIONS_FUN
}
void *aral_mallocz_internal(ARAL *ar TRACE_ALLOCATIONS_FUNCTION_DEFINITION_PARAMS) {
+#ifdef FSANITIZE_ADDRESS
+ return mallocz(ar->config.requested_element_size);
+#endif
ARAL_PAGE *page = aral_acquire_a_free_slot(ar TRACE_ALLOCATIONS_FUNCTION_CALL_PARAMS);
@@ -614,6 +617,11 @@ static inline void aral_move_page_with_free_list___aral_lock_needed(ARAL *ar, AR
}
void aral_freez_internal(ARAL *ar, void *ptr TRACE_ALLOCATIONS_FUNCTION_DEFINITION_PARAMS) {
+#ifdef FSANITIZE_ADDRESS
+ freez(ptr);
+ return;
+#endif
+
if(unlikely(!ptr)) return;
// get the page pointer
diff --git a/libnetdata/inlined.h b/libnetdata/inlined.h
index 94410cee44..730cd079b7 100644
--- a/libnetdata/inlined.h
+++ b/libnetdata/inlined.h
@@ -21,25 +21,118 @@ typedef uint64_t kernel_uint_t;
// for faster execution, allow the compiler to inline
// these functions that are called thousands of times per second
-static inline uint32_t simple_hash(const char *name) {
+static inline uint32_t djb2_hash32(const char* name) {
unsigned char *s = (unsigned char *) name;
- uint32_t hval = 0x811c9dc5;
+ uint32_t hash = 5381;
+ while (*s)
+ hash = ((hash << 5) + hash) + (uint32_t) *s++; // hash * 33 + char
+ return hash;
+}
+
+static inline uint32_t pluginsd_parser_hash32(const char *name) {
+ unsigned char *s = (unsigned char *) name;
+ uint32_t hash = 0;
while (*s) {
- hval *= 16777619;
- hval ^= (uint32_t) *s++;
+ hash <<= 5;
+ hash += *s++ - ' ';
}
- return hval;
+ return hash;
}
-static inline uint32_t simple_uhash(const char *name) {
+// https://stackoverflow.com/a/107657
+static inline uint32_t larson_hash32(const char *name) {
unsigned char *s = (unsigned char *) name;
- uint32_t hval = 0x811c9dc5, c;
+ uint32_t hash = 0;
+ while (*s)
+ hash = hash * 101 + (uint32_t) *s++;
+ return hash;
+}
+
+// 32-bit FNV-1 hash of the NUL-terminated string `name` - http://isthe.com/chongo/tech/comp/fnv/
+static inline uint32_t fnv1_hash32(const char *name) {
+ unsigned char *s = (unsigned char *) name; // hash the bytes as unsigned values
+ uint32_t hash = 0x811c9dc5; // starting value (the FNV 32-bit offset basis); returned as-is for ""
+ while (*s) {
+ hash *= 0x01000193; // 16777619 - FNV-1 multiplies first...
+ hash ^= (uint32_t) *s++; // ...and then xors the next byte in
+ }
+ return hash;
+}
+
+// 32-bit FNV-1a hash of the NUL-terminated string `name` (simple_hash() maps to this) - http://isthe.com/chongo/tech/comp/fnv/
+static inline uint32_t fnv1a_hash32(const char *name) {
+ unsigned char *s = (unsigned char *) name; // hash the bytes as unsigned values
+ uint32_t hash = 0x811c9dc5; // starting value (the FNV 32-bit offset basis); returned as-is for ""
+ while (*s) {
+ hash ^= (uint32_t) *s++; // FNV-1a xors the next byte in first...
+ hash *= 0x01000193; // 16777619 - ...and then multiplies
+ }
+ return hash;
+}
+
+static inline uint32_t fnv1a_uhash32(const char *name) {
+ unsigned char *s = (unsigned char *) name;
+ uint32_t hash = 0x811c9dc5, c;
while ((c = *s++)) {
if (unlikely(c >= 'A' && c <= 'Z')) c += 'a' - 'A';
- hval *= 16777619;
- hval ^= c;
+ hash ^= c;
+ hash *= 0x01000193; // 16777619
}
- return hval;
+ return hash;
+}
+
+#define simple_hash(s) fnv1a_hash32(s)
+#define simple_uhash(s) fnv1a_uhash32(s)
+
+static inline size_t indexing_partition_old(Word_t ptr, Word_t modulo) { // maps ptr to a partition in [0, modulo) by summing its bytes
+ size_t total = 0;
+
+ total += (ptr & 0xff) >> 0; // add the low four bytes, one at a time
+ total += (ptr & 0xff00) >> 8;
+ total += (ptr & 0xff0000) >> 16;
+ total += (ptr & 0xff000000) >> 24;
+
+ if(sizeof(Word_t) > 4) { // the upper four bytes are added only when Word_t is wider than 4 bytes
+ total += (ptr & 0xff00000000) >> 32;
+ total += (ptr & 0xff0000000000) >> 40;
+ total += (ptr & 0xff000000000000) >> 48;
+ total += (ptr & 0xff00000000000000) >> 56;
+ }
+
+ return (total % modulo); // NOTE: modulo is not checked here; it must be non-zero
+}
+
+static uint32_t murmur32(uint32_t k) __attribute__((const)); // declared const: the result depends only on k
+static inline uint32_t murmur32(uint32_t k) { // mixes the bits of a 32-bit value with xor-shift and multiply steps
+ k ^= k >> 16;
+ k *= 0x85ebca6b; // the constants match the 32-bit finalizer of MurmurHash3
+ k ^= k >> 13;
+ k *= 0xc2b2ae35;
+ k ^= k >> 16;
+
+ return k;
+}
+
+static uint64_t murmur64(uint64_t k) __attribute__((const)); // declared const: the result depends only on k
+static inline uint64_t murmur64(uint64_t k) { // mixes the bits of a 64-bit value with xor-shift and multiply steps
+ k ^= k >> 33;
+ k *= 0xff51afd7ed558ccdUL; // the constants match the 64-bit finalizer of MurmurHash3
+ k ^= k >> 33;
+ k *= 0xc4ceb9fe1a85ec53UL;
+ k ^= k >> 33;
+
+ return k;
+}
+
+static inline size_t indexing_partition(Word_t ptr, Word_t modulo) __attribute__((const));
+static inline size_t indexing_partition(Word_t ptr, Word_t modulo) { // maps ptr to a partition in [0, modulo) using a murmur mix of its value
+#ifdef ENV64BIT
+ uint64_t hash = murmur64(ptr); // 64-bit mix when ENV64BIT is defined
+ return hash % modulo; // NOTE: modulo is not checked here; it must be non-zero
+#else
+ uint32_t hash = murmur32(ptr); // 32-bit mix otherwise
+ return hash % modulo;
+#endif
}
static inline int str2i(const char *s) {
diff --git a/libnetdata/libnetdata.c b/libnetdata/libnetdata.c
index 5f5c710785..66d41bd546 100644
--- a/libnetdata/libnetdata.c
+++ b/libnetdata/libnetdata.c
@@ -1883,17 +1883,20 @@ inline int config_isspace(char c)
}
// split a text into words, respecting quotes
-inline size_t quoted_strings_splitter(char *str, char **words, size_t max_words, int (*custom_isspace)(char), char *recover_input, char **recover_location, int max_recover)
+inline size_t quoted_strings_splitter(char *str, char **words, size_t max_words, int (*custom_isspace)(char))
{
char *s = str, quote = 0;
size_t i = 0;
- int rec = 0;
- char *recover = recover_input;
// skip all white space
while (unlikely(custom_isspace(*s)))
s++;
+ if(unlikely(!*s)) {
+ words[i] = NULL;
+ return 0;
+ }
+
// check for quote
if (unlikely(*s == '\'' || *s == '"')) {
quote = *s; // remember the quote
@@ -1905,19 +1908,15 @@ inline size_t quoted_strings_splitter(char *str, char **words, size_t max_words,
// while we have something
while (likely(*s)) {
- // if it is escape
+ // if it is an escape
if (unlikely(*s == '\\' && s[1])) {
s += 2;
continue;
}
- // if it is quote
+ // if it is a quote
else if (unlikely(*s == quote)) {
quote = 0;
- if (recover && rec < max_recover) {
- recover_location[rec++] = s;
- *recover++ = *s;
- }
*s = ' ';
continue;
}
@@ -1925,19 +1924,13 @@ inline size_t quoted_strings_splitter(char *str, char **words, size_t max_words,
// if it is a space
else if (unlikely(quote == 0 && custom_isspace(*s))) {
// terminate the word
- if (recover && rec < max_recover) {
- if (!rec || recover_location[rec-1] != s) {
- recover_location[rec++] = s;
- *recover++ = *s;
- }
- }
*s++ = '\0';
// skip all white space
while (likely(custom_isspace(*s)))
s++;
- // check for quote
+ // check for a quote
if (unlikely(*s == '\'' || *s == '"')) {
quote = *s; // remember the quote
s++; // skip the quote
@@ -1965,9 +1958,9 @@ inline size_t quoted_strings_splitter(char *str, char **words, size_t max_words,
return i;
}
-inline size_t pluginsd_split_words(char *str, char **words, size_t max_words, char *recover_input, char **recover_location, int max_recover)
+inline size_t pluginsd_split_words(char *str, char **words, size_t max_words)
{
- return quoted_strings_splitter(str, words, max_words, pluginsd_space, recover_input, recover_location, max_recover);
+ return quoted_strings_splitter(str, words, max_words, pluginsd_space);
}
bool bitmap256_get_bit(BITMAP256 *ptr, uint8_t idx) {
diff --git a/libnetdata/libnetdata.h b/libnetdata/libnetdata.h
index 8d50a55c57..65f1b5185a 100644
--- a/libnetdata/libnetdata.h
+++ b/libnetdata/libnetdata.h
@@ -485,8 +485,8 @@ void bitmap256_set_bit(BITMAP256 *ptr, uint8_t idx, bool value);
int config_isspace(char c);
int pluginsd_space(char c);
-size_t quoted_strings_splitter(char *str, char **words, size_t max_words, int (*custom_isspace)(char), char *recover_input, char **recover_location, int max_recover);
-size_t pluginsd_split_words(char *str, char **words, size_t max_words, char *recover_string, char **recover_location, int max_recover);
+size_t quoted_strings_splitter(char *str, char **words, size_t max_words, int (*custom_isspace)(char));
+size_t pluginsd_split_words(char *str, char **words, size_t max_words);
static inline char *get_word(char **words, size_t num_words, size_t index) {
if (index >= num_words)
@@ -547,6 +547,7 @@ extern char *netdata_configured_host_prefix;
#include "libnetdata/aral/aral.h"
#include "onewayalloc/onewayalloc.h"
#include "worker_utilization/worker_utilization.h"
+#include "parser/parser.h"
// BEWARE: Outside of the C code this also exists in alarm-notify.sh
#define DEFAULT_CLOUD_BASE_URL "https://api.netdata.cloud"
@@ -609,58 +610,6 @@ static inline PPvoid_t JudyLLastThenPrev(Pcvoid_t PArray, Word_t * PIndex, bool
return JudyLPrev(PArray, PIndex, PJE0);
}
-static inline size_t indexing_partition_old(Word_t ptr, Word_t modulo) {
- size_t total = 0;
-
- total += (ptr & 0xff) >> 0;
- total += (ptr & 0xff00) >> 8;
- total += (ptr & 0xff0000) >> 16;
- total += (ptr & 0xff000000) >> 24;
-
- if(sizeof(Word_t) > 4) {
- total += (ptr & 0xff00000000) >> 32;
- total += (ptr & 0xff0000000000) >> 40;
- total += (ptr & 0xff000000000000) >> 48;
- total += (ptr & 0xff00000000000000) >> 56;
- }
-
- return (total % modulo);
-}
-
-static uint32_t murmur32(uint32_t h) __attribute__((const));
-static inline uint32_t murmur32(uint32_t h) {
- h ^= h >> 16;
- h *= 0x85ebca6b;
- h ^= h >> 13;
- h *= 0xc2b2ae35;
- h ^= h >> 16;
-
- return h;
-}
-
-static uint64_t murmur64(uint64_t h) __attribute__((const));
-static inline uint64_t murmur64(uint64_t k) {
- k ^= k >> 33;
- k *= 0xff51afd7ed558ccdUL;
- k ^= k >> 33;
- k *= 0xc4ceb9fe1a85ec53UL;
- k ^= k >> 33;
-
- return k;
-}
-
-static inline size_t indexing_partition(Word_t ptr, Word_t modulo) __attribute__((const));
-static inline size_t indexing_partition(Word_t ptr, Word_t modulo) {
- if(sizeof(Word_t) == 8) {
- uint64_t hash = murmur64(ptr);
- return hash % modulo;
- }
- else {
- uint32_t hash = murmur32(ptr);
- return hash % modulo;
- }
-}
-
typedef enum {
TIMING_STEP_INTERNAL = 0,
diff --git a/libnetdata/onewayalloc/onewayalloc.c b/libnetdata/onewayalloc/onewayalloc.c
index 2f007b1898..489ce73d7e 100644
--- a/libnetdata/onewayalloc/onewayalloc.c
+++ b/libnetdata/onewayalloc/onewayalloc.c
@@ -97,6 +97,10 @@ ONEWAYALLOC *onewayalloc_create(size_t size_hint) {
}
void *onewayalloc_mallocz(ONEWAYALLOC *owa, size_t size) {
+#ifdef FSANITIZE_ADDRESS
+ return mallocz(size);
+#endif
+
OWA_PAGE *head = (OWA_PAGE *)owa;
OWA_PAGE *page = head->last;
@@ -142,6 +146,11 @@ void *onewayalloc_memdupz(ONEWAYALLOC *owa, const void *src, size_t size) {
}
void onewayalloc_freez(ONEWAYALLOC *owa __maybe_unused, const void *ptr __maybe_unused) {
+#ifdef FSANITIZE_ADDRESS
+ freez((void *)ptr);
+ return;
+#endif
+
#ifdef NETDATA_INTERNAL_CHECKS
// allow the caller to call us for a mallocz() allocation
// so try to find it in our memory and if it is not there
diff --git a/libnetdata/parser/Makefile.am b/libnetdata/parser/Makefile.am
new file mode 100644
index 0000000000..02fe3a314f
--- /dev/null
+++ b/libnetdata/parser/Makefile.am
@@ -0,0 +1,9 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+dist_noinst_DATA = \
+ README.md \
+ $(NULL)
+
diff --git a/libnetdata/parser/README.md b/libnetdata/parser/README.md
new file mode 100644
index 0000000000..cf59bfd767
--- /dev/null
+++ b/libnetdata/parser/README.md
@@ -0,0 +1,156 @@
+<!--
+title: "Parser"
+custom_edit_url: https://github.com/netdata/netdata/blob/master/parser/README.md
+sidebar_label: "Parser"
+learn_status: "Published"
+learn_topic_type: "References"
+learn_rel_path: "Developers/Database"
+-->
+
+
+#### Introduction
+
+The parser will be used to process streaming and plugins input as well as metadata
+
+Usage
+
+1. Define a structure that will be used to share user state across calls
+2. Initialize the parser using `parser_init`
+3. Register keywords and their associated callback functions using `parser_add_keyword`
+4. Register actions on the keywords
+5. Start a loop until EOF
+    1. Fetch the next line using `parser_next`
+    2. Process the line using `parser_action`
+        1. The registered callbacks are executed to parse the input
+        2. The registered action for the callback is called for processing
+6. Release the parser using `parser_destroy`
+7. Release the user structure
+
+#### Functions
+
+TODO:
+
+##### parser_init(RRDHOST *host, void *user, void *input, int flags)
+
+Initialize an internal parser with the specified user defined data structure that will be shared across calls.
+
+Input
+- Host
+ - The host this parser will be dealing with. For streaming with SSL enabled for this host
+- user
+ - User defined structure that is passed in all the calls
+- input
+ - Where the parser will get the input from
+- flags
+ - flags to define processing on the input
+
+Output
+- A parser structure
+
+
+
+##### parse_push(PARSER *parser, char *line)
+
+Push a new line for processing
+
+Input
+
+- parser
+ - The parser object as returned by the `parser_init`
+- line
+ - The new line to process
+
+
+Output
+- The line will be injected into the stream and will be the next one to be processed
+
+Returns
+- 0 line added
+- 1 error detected
+
+
+##### parser_add_keyword(PARSER *parser, char *keyword, keyword_function callback_function)
+
+The function registers the callback for a keyword. The callback function is defined as
+
+`typedef PARSER_RC (*keyword_function)(char **words, size_t num_words, void *user_data);`
+
+Input
+
+- parser
+ - The parser object as returned by the `parser_init`
+- keyword
+ - The keyword to register
+- keyword_function
+ - The callback that will handle the keyword processing
+ * The callback function should return one of the following
+ * PARSER_RC_OK - Callback was successful (continue with other callbacks)
+ * PARSER_RC_STOP - Stop processing callbacks (return OK)
+ * PARSER_RC_ERROR - Callback failed, exit
+
+Output
+- The corresponding keyword and callback will be registered
+
+Returns
+- 0 maximum callbacks already registered for this keyword
+- > 0 which is the number of callbacks associated with this keyword.
+
+
+##### parser_next(PARSER *parser)
+Return the next item to parse
+
+Input
+- parser
+ - The parser object as returned by the `parser_init`
+
+Output
+- The parser will store internally the next item to parse
+
+Returns
+- 0 Next item fetched successfully
+- 1 No more items to parse
+
+
+##### parser_action(PARSER *parser, char *input)
+Process a line of input by calling the callback registered for its keyword
+
+Input
+- parser
+ - The parser object as returned by the `parser_init`
+- input
+ - Process the input specified instead of using the internal buffer
+
+Output
+- The current keyword will be processed by calling all the registered callbacks
+
+Returns
+- 0 Callbacks called successfully
+- 1 Failed
+
+
+##### parser_destroy(PARSER *parser)
+Cleanup a previously allocated parser
+
+Input
+- parser
+ - The parser object as returned by the `parser_init`
+
+Output
+- The parser is deallocated
+
+Returns
+- none
+
+
+##### parser_recover_input(PARSER *parser)
+Cleanup a previously allocated parser
+
+Input
+- parser
+ - The parser object as returned by the `parser_init`
+
+Output
+- The parser is deallocated
+
+Returns
+- none
diff --git a/libnetdata/parser/parser.c b/libnetdata/parser/parser.c
new file mode 100644
index 0000000000..c3eebcd163
--- /dev/null
+++ b/libnetdata/parser/parser.c
@@ -0,0 +1,225 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "parser.h"
+#include "collectors/plugins.d/pluginsd_parser.h"
+
+/*
+ * Copy the first keyword of src into dst
+ * Leading separators (as decided by custom_isspace) are skipped and the
+ * keyword ends at the next separator or at the end of the string.
+ *
+ * src            : the NUL-terminated line to scan
+ * dst            : receives the keyword, always NUL-terminated when dst_size > 0
+ * dst_size       : the size of dst in bytes, including the terminator
+ * custom_isspace : returns non-zero for separator characters
+ *
+ * Returns the length of the keyword, or 0 when there is no keyword or when
+ * it does not fit in dst (dst then holds only the truncated prefix).
+ */
+static inline int find_first_keyword(const char *src, char *dst, int dst_size, int (*custom_isspace)(char)) {
+    // without room for at least the terminator, nothing can be stored
+    if (unlikely(!dst || dst_size <= 0))
+        return 0;
+
+    const char *s = src;
+
+    while (unlikely(custom_isspace(*s))) s++;
+    const char *keyword_start = s;
+
+    // copy up to the next separator, keeping one byte for the terminator
+    while (likely(*s && !custom_isspace(*s)) && dst_size > 1) {
+        *dst++ = *s++;
+        dst_size--;
+    }
+    *dst = '\0';
+
+    // dst filled up while the keyword still continues: the copy loop never
+    // lets dst_size reach zero, so truncation has to be detected on the input
+    if (unlikely(*s && !custom_isspace(*s)))
+        return 0;
+
+    return (int) (s - keyword_start);
+}
+
+/*
+ * Initialize a parser
+ * user      : as defined by the user, will be shared across calls (passed to the keyword callbacks)
+ * fp_input  : the stream parser_next() reads lines from
+ * fp_output : stored in the parser; described in parser.h as the stream to send commands to the plugin
+ * fd        : stored in the parser; described in parser.h as the socket
+ * flags     : PARSER_INPUT_TYPE bits, stored in parser->flags
+ * ssl       : stored as parser->ssl_output when ENABLE_HTTPS is defined, otherwise unused
+ *
+ * Returns a parser allocated with callocz(); release it with parser_destroy()
+ */
+
+PARSER *parser_init(void *user, FILE *fp_input, FILE *fp_output, int fd,
+ PARSER_INPUT_TYPE flags, void *ssl __maybe_unused)
+{
+ PARSER *parser;
+
+ parser = callocz(1, sizeof(*parser));
+ parser->user = user;
+ parser->fd = fd;
+ parser->fp_input = fp_input;
+ parser->fp_output = fp_output;
+#ifdef ENABLE_HTTPS
+ parser->ssl_output = ssl;
+#endif
+ parser->flags = flags;
+ parser->worker_job_next_id = WORKER_PARSER_FIRST_JOB; // ids below this one are not given to keywords
+
+ return parser;
+}
+
+
+static inline PARSER_KEYWORD *parser_find_keyword(PARSER *parser, const char *command) { // returns the keyword entry registered for command, or NULL
+ uint32_t hash = parser_hash_function(command);
+ uint32_t slot = hash % PARSER_KEYWORDS_HASHTABLE_SIZE; // each slot holds at most one keyword, so there is no chain to walk
+ PARSER_KEYWORD *t = parser->keywords.hashtable[slot];
+
+ if(likely(t && strcmp(t->keyword, command) == 0)) // an unregistered command may land on an occupied slot, so the text is compared too
+ return t;
+
+ return NULL;
+}
+
+/*
+ * Register a keyword and the function that will be called for it
+ * Each hashtable slot holds a single keyword, so every keyword must hash to its own slot
+ * Input : parser
+ *       : keyword (a copy is made with strdupz)
+ *       : callback function
+ * Output: none
+ *       : invalid parameters and slot collisions are reported with fatal()
+ */
+
+void parser_add_keyword(PARSER *parser, char *keyword, keyword_function func) {
+ if(unlikely(!parser || !keyword || !*keyword || !func))
+ fatal("PARSER: invalid parameters");
+
+ PARSER_KEYWORD *t = callocz(1, sizeof(*t));
+ t->worker_job_id = parser->worker_job_next_id++; // every keyword gets its own worker job id
+ t->keyword = strdupz(keyword);
+ t->func = func;
+
+ uint32_t hash = parser_hash_function(keyword);
+ uint32_t slot = hash % PARSER_KEYWORDS_HASHTABLE_SIZE;
+
+ if(unlikely(parser->keywords.hashtable[slot])) // registering the same keyword twice also ends up here
+ fatal("PARSER: hashtable collision between keyword '%s' and '%s' on slot %u. "
+ "Change the hashtable size and / or the hashing function. "
+ "Run the unit test to find the optimal values.",
+ parser->keywords.hashtable[slot]->keyword,
+ t->keyword,
+ slot
+ );
+
+ parser->keywords.hashtable[slot] = t;
+
+ worker_register_job_name(t->worker_job_id, t->keyword);
+}
+
+/*
+ * Cleanup a previously allocated parser
+ * Destroys the inflight functions dictionary, frees every registered keyword
+ * and finally the parser itself. A NULL parser is ignored.
+ * The user structure given to parser_init() is not touched here.
+ */
+
+void parser_destroy(PARSER *parser)
+{
+ if (unlikely(!parser))
+ return;
+
+ dictionary_destroy(parser->inflight.functions);
+
+ // Remove keywords
+ for(size_t i = 0 ; i < PARSER_KEYWORDS_HASHTABLE_SIZE; i++) {
+ PARSER_KEYWORD *t = parser->keywords.hashtable[i];
+ if (t) { // empty slots are NULL
+ freez(t->keyword); // the copy made by parser_add_keyword()
+ freez(t);
+ }
+ }
+
+ freez(parser);
+}
+
+
+/*
+ * Fetch the next line to process
+ * Reads one line from parser->fp_input into buffer with fgets()
+ * (at most buffer_size - 1 characters, a trailing newline is kept).
+ *
+ * Returns 0 when a line was read
+ *         1 on end of file or read failure (the reason is logged)
+ */
+
+int parser_next(PARSER *parser, char *buffer, size_t buffer_size)
+{
+ char *tmp = fgets(buffer, (int)buffer_size, (FILE *)parser->fp_input);
+
+ if (unlikely(!tmp)) {
+ if (feof((FILE *)parser->fp_input)) // end of input is logged like any other failure
+ error("PARSER: read failed: end of file");
+
+ else if (ferror((FILE *)parser->fp_input))
+ error("PARSER: read failed: input error");
+
+ else
+ error("PARSER: read failed: unknown error");
+
+ return 1;
+ }
+
+ return 0;
+}
+
+
+/*
+* Takes an initialized parser object and a line of input (as fetched by parser_next)
+* and if the line starts with a registered keyword, it executes the callback of that keyword
+*
+* While PARSER_DEFER_UNTIL_KEYWORD is set, lines are not parsed: they are appended to
+* defer.response (when set) until a line starting with defer.end_keyword arrives, at which
+* point defer.action is called and the deferred state is cleared.
+*
+* Returns 0 to continue (also for empty lines)
+*         1 when the callback returned PARSER_RC_ERROR or PARSER_RC_STOP, when the keyword
+*           is not registered, or when a deferred response grows beyond 10MB
+*/
+
+inline int parser_action(PARSER *parser, char *input)
+{
+ parser->line++; // counts every line given to the parser, deferred ones included
+
+ if(unlikely(parser->flags & PARSER_DEFER_UNTIL_KEYWORD)) {
+ char command[PLUGINSD_LINE_MAX + 1];
+ bool has_keyword = find_first_keyword(input, command, PLUGINSD_LINE_MAX, pluginsd_space); // a zero length converts to false
+
+ if(!has_keyword || strcmp(command, parser->defer.end_keyword) != 0) { // not the end keyword: keep collecting
+ if(parser->defer.response) {
+ buffer_strcat(parser->defer.response, input);
+ if(buffer_strlen(parser->defer.response) > 10 * 1024 * 1024) {
+ // more than 10MB of data
+ // a bad plugin that did not send the end_keyword
+ internal_error(true, "PLUGINSD: deferred response is too big (%zu bytes). Stopping this plugin.", buffer_strlen(parser->defer.response));
+ return 1;
+ }
+ }
+ return 0;
+ }
+ else {
+ // call the action
+ parser->defer.action(parser, parser->defer.action_data);
+
+ // empty everything
+ parser->defer.action = NULL;
+ parser->defer.action_data = NULL;
+ parser->defer.end_keyword = NULL;
+ parser->defer.response = NULL; // only the pointer is cleared here, the buffer is not freed
+ parser->flags &= ~PARSER_DEFER_UNTIL_KEYWORD;
+ }
+ return 0;
+ }
+
+ char *words[PLUGINSD_MAX_WORDS];
+ size_t num_words = pluginsd_split_words(input, words, PLUGINSD_MAX_WORDS);
+ const char *command = get_word(words, num_words, 0); // the first word is the keyword
+
+ if(unlikely(!command)) // nothing to execute on this line
+ return 0;
+
+ PARSER_RC rc;
+ PARSER_KEYWORD *t = parser_find_keyword(parser, command);
+ if(likely(t)) {
+ worker_is_busy(t->worker_job_id);
+ rc = (*t->func)(words, num_words, parser->user);
+ worker_is_idle();
+ }
+ else
+ rc = PARSER_RC_ERROR; // unknown keywords are treated as errors
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(rc == PARSER_RC_ERROR) {
+ BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX, NULL);
+ for(size_t i = 0; i < num_words ;i++) { // rebuild the line with every word quoted, for the log
+ if(i) buffer_fast_strcat(wb, " ", 1);
+
+ buffer_fast_strcat(wb, "\"", 1);
+ const char *s = get_word(words, num_words, i);
+ buffer_strcat(wb, s?s:"");
+ buffer_fast_strcat(wb, "\"", 1);
+ }
+
+ internal_error(true, "PLUGINSD: parser_action('%s') failed on line %zu: { %s } (quotes added to show parsing)",
+ command, parser->line, buffer_tostring(wb));
+
+ buffer_free(wb);
+ }
+#endif
+
+ return (rc == PARSER_RC_ERROR || rc == PARSER_RC_STOP); // both STOP and ERROR tell the caller to stop
+}
diff --git a/libnetdata/parser/parser.h b/libnetdata/parser/parser.h
new file mode 100644
index 0000000000..9e0d3480de
--- /dev/null
+++ b/libnetdata/parser/parser.h
@@ -0,0 +1,101 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_INCREMENTAL_PARSER_H
+#define NETDATA_INCREMENTAL_PARSER_H 1
+
+#include "../libnetdata.h"
+
+#define WORKER_PARSER_FIRST_JOB 3
+
+// this has to be in-sync with the same at receiver.c
+#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3)
+
+#define PARSER_KEYWORDS_HASHTABLE_SIZE 73 // unittest finds this magic number
+//#define parser_hash_function(s) djb2_hash32(s)
+//#define parser_hash_function(s) fnv1_hash32(s)
+//#define parser_hash_function(s) fnv1a_hash32(s)
+//#define parser_hash_function(s) larson_hash32(s)
+#define parser_hash_function(s) pluginsd_parser_hash32(s)
+
+// PARSER return codes, as returned by the keyword callbacks
+typedef enum __attribute__ ((__packed__)) parser_rc {
+ PARSER_RC_OK, // Callback was successful, go on
+ PARSER_RC_STOP, // Callback says STOP (parser_action() returns non-zero)
+ PARSER_RC_ERROR // Callback failed (parser_action() returns non-zero); also used for unknown keywords
+} PARSER_RC;
+
+typedef enum __attribute__ ((__packed__)) parser_input_type { // bit flags kept in PARSER.flags
+ PARSER_INPUT_SPLIT = (1 << 1), // NOTE(review): not referenced by parser.c - confirm where it is used
+ PARSER_DEFER_UNTIL_KEYWORD = (1 << 2), // while set, parser_action() collects lines until defer.end_keyword
+} PARSER_INPUT_TYPE;
+
+typedef PARSER_RC (*keyword_function)(char **words, size_t num_words, void *user_data);
+
+typedef struct parser_keyword { // one registered keyword, stored in a slot of PARSER.keywords.hashtable
+ size_t worker_job_id; // worker job id reported while the callback runs
+ char *keyword; // copy of the keyword text, freed by parser_destroy()
+ keyword_function func; // callback executed for lines starting with this keyword
+} PARSER_KEYWORD;
+
+typedef struct parser {
+ size_t worker_job_next_id; // worker job id given to the next keyword added with parser_add_keyword()
+ uint8_t version; // Parser version
+ int fd; // Socket
+ FILE *fp_input; // Input source e.g. stream
+ FILE *fp_output; // Stream to send commands to plugin
+#ifdef ENABLE_HTTPS
+ struct netdata_ssl *ssl_output;
+#endif
+ void *user; // User defined structure to hold extra state between calls
+ uint32_t flags; // PARSER_INPUT_TYPE bits
+ size_t line; // incremented on every parser_action() call
+
+ struct {
+ PARSER_KEYWORD *hashtable[PARSER_KEYWORDS_HASHTABLE_SIZE]; // one keyword per slot, at parser_hash_function(keyword) % size
+ } keywords;
+
+ struct { // state used while PARSER_DEFER_UNTIL_KEYWORD is set in flags
+ const char *end_keyword; // the keyword that ends the deferred section
+ BUFFER *response; // when set, the lines before end_keyword are appended here
+ void (*action)(struct parser *parser, void *action_data); // called when end_keyword is seen
+ void *action_data; // passed to action
+ } defer;
+
+ struct {
+ DICTIONARY *functions; // destroyed by parser_destroy()
+ usec_t smaller_timeout;
+ } inflight;
+} PARSER;
+
+PARSER *parser_init(void *user, FILE *fp_input, FILE *fp_output, int fd, PARSER_INPUT_TYPE flags, void *ssl);
+void parser_add_keyword(PARSER *working_parser, char *keyword, keyword_function func);
+int parser_next(PARSER *working_parser, char *buffer, size_t buffer_size);
+int parser_action(PARSER *working_parser, char *input);
+void parser_destroy(PARSER *working_parser);
+
+PARSER_RC pluginsd_set(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_end(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_flush(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_disable(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_label(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_overwrite(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_clabel_commit(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user);
+
+PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user);
+
+PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_set_v2(char **words, size_t num_words, void *user);
+PARSER_RC pluginsd_end_v2(char **words, size_t num_words, void *user);
+void pluginsd_cleanup_v2(void *user);
+
+#endif
diff --git a/libnetdata/string/string.c b/libnetdata/string/string.c
index 4e232523c8..9385aa6e8d 100644
--- a/libnetdata/string/string.c
+++ b/libnetdata/string/string.c
@@ -300,16 +300,20 @@ void string_freez(STRING *string) {
string_stats_atomic_increment(releases);
}
-size_t string_strlen(STRING *string) {
+inline size_t string_strlen(STRING *string) {
if(unlikely(!string)) return 0;
return string->length - 1;
}
-const char *string2str(STRING *string) {
+inline const char *string2str(STRING *string) {
if(unlikely(!string)) return "";
return string->str;
}
+int string_strcmp(STRING *string, const char *s) { // strcmp() between a STRING and a C string; s must not be NULL
+ return strcmp(string2str(string), s); // string2str() returns "" for a NULL STRING, so NULL compares as empty
+}
+
STRING *string_2way_merge(STRING *a, STRING *b) {
static STRING *X = NULL;
diff --git a/libnetdata/string/string.h b/libnetdata/string/string.h
index cec44ebd99..70840ee9a2 100644
--- a/