diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2023-02-07 22:26:16 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-02-07 22:26:16 +0200 |
commit | 8d3c3356ddeb6d62fa76b197e086e3e7fc5eb3dd (patch) | |
tree | e7661d49d0a0044cf1a5f1d3e0e6cc7dbc27f7a6 | |
parent | 12d92fe308f4107f67149ec9105b69ce2610a4f2 (diff) |
Streaming interpolated values (#14431)
* first commit - untested
* fix wrong begin command
* added set v2 too
* debug to log stream buffer
* debug to log stream buffer
* faster streaming printing
* mark charts and dimensions as collected
* use stream points even if sender is not enabled
* comment out stream debug log
* parse null as nan
* custom begin v2
* custom set v2; replication now copies the anomalous flag too
* custom end v2
* enabled stream log test
* renamed to BEGIN2, SET2, END2
* dont mix up replay and v2 members in user object
* fix typo
* cleanup
* support to v2 to v1 proxying
* mark updated dimensions as such
* do not log unknown flags
* comment out stream debug log
* send also the chart id on BEGIN2, v2 to v2
* update the data collections counter
* v2 values are transferred in hex
* faster hex parsing
* a little more generic hex and dec printing and parsing
* fix hex parsing
* minor optimization in dbengine api
* turn debugging into info message
* generalized the timings tracking, so that it can be used in more places
* commented out debug info
* renamed conflicting variable with macro
* remove wrong edits
* integrated ML and added cleanup in case parsing is interrupted
* disable data collection locking during v2
* cleanup stale ML locks; send updated chart variables during v2; add info to find stale locks
* inject an END2 between repeated BEGIN2 from rrdset_done()
* test: remove lockless single-threaded logic from dictionary and aral and apply the right acquire/release memory order to reference counters
* more fine grained dictionary atomics
* remove unecessary return values
* pointer validation under NETDATA_DICTIONARY_VALIDATE_POINTERS
* Revert "pointer validation under NETDATA_DICTIONARY_VALIDATE_POINTERS"
This reverts commit 846cdf2713e2a7ee2ff797f38db11714228800e9.
* Revert "remove unecessary return values"
This reverts commit 8c87d30f4d86f0f5d6b4562cf74fe7447138bbff.
* Revert "more fine grained dictionary atomics"
This reverts commit 984aec4234a340d197d45239ff9a10fd479fcf3c.
* Revert "test: remove lockless single-threaded logic from dictionary and aral and apply the right acquire/release memory order to reference counters"
This reverts commit c460b3d0ad497d2641bd0ea1d63cec7c052e74e4.
* Apply again "pointer validation under NETDATA_DICTIONARY_VALIDATE_POINTERS" while keeping the improved atomic operations.
This reverts commit f158d009
* fix last commit
* fix last commit again
* optimizations in dbengine
* do not send anomaly bit on non-supporting agents (send it when the INTERPOLATED capability is available)
* break long empty-points-loops in rrdset_done()
* decide page alignment on new page allocation, not on every point collected
* create max size pages but no smaller than 1/3
* Fix compilation when --disable-ml is specified
* Return false
* fixes for NETDATA_LOG_REPLICATION_REQUESTS
* added compile option NETDATA_WITHOUT_WORKERS_LATENCY
* put timings in BEGIN2, SET2, END2
* isolate begin2 ml
* revert repositioning data collection lock
* fixed multi-threading of statistics
* do not lookup dimensions all the time if they come in the same order
* update used on iteration, not on every points; also do better error handling
---------
Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
-rw-r--r-- | collectors/plugins.d/plugins_d.h | 4 | ||||
-rw-r--r-- | collectors/plugins.d/pluginsd_parser.c | 638 | ||||
-rw-r--r-- | collectors/plugins.d/pluginsd_parser.h | 16 | ||||
-rw-r--r-- | database/engine/pagecache.h | 2 | ||||
-rw-r--r-- | database/engine/rrdengine.h | 6 | ||||
-rwxr-xr-x | database/engine/rrdengineapi.c | 249 | ||||
-rw-r--r-- | database/rrd.h | 7 | ||||
-rw-r--r-- | database/rrdset.c | 44 | ||||
-rw-r--r-- | libnetdata/buffer/buffer.c | 88 | ||||
-rw-r--r-- | libnetdata/buffer/buffer.h | 3 | ||||
-rw-r--r-- | libnetdata/dictionary/dictionary.c | 243 | ||||
-rw-r--r-- | libnetdata/inlined.h | 31 | ||||
-rw-r--r-- | libnetdata/libnetdata.c | 113 | ||||
-rw-r--r-- | libnetdata/libnetdata.h | 54 | ||||
-rw-r--r-- | libnetdata/storage_number/storage_number.h | 4 | ||||
-rw-r--r-- | libnetdata/worker_utilization/worker_utilization.c | 16 | ||||
-rw-r--r-- | ml/ml-dummy.c | 3 | ||||
-rw-r--r-- | ml/ml.cc | 6 | ||||
-rw-r--r-- | ml/ml.h | 2 | ||||
-rw-r--r-- | parser/parser.c | 9 | ||||
-rw-r--r-- | parser/parser.h | 12 | ||||
-rw-r--r-- | streaming/receiver.c | 8 | ||||
-rw-r--r-- | streaming/replication.c | 157 | ||||
-rw-r--r-- | streaming/rrdpush.c | 90 | ||||
-rw-r--r-- | streaming/rrdpush.h | 20 | ||||
-rw-r--r-- | streaming/sender.c | 8 |
26 files changed, 1305 insertions, 528 deletions
diff --git a/collectors/plugins.d/plugins_d.h b/collectors/plugins.d/plugins_d.h index 35af9fe583..5c7da73361 100644 --- a/collectors/plugins.d/plugins_d.h +++ b/collectors/plugins.d/plugins_d.h @@ -34,6 +34,10 @@ #define PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE "RSSTATE" #define PLUGINSD_KEYWORD_REPLAY_END "REND" +#define PLUGINSD_KEYWORD_BEGIN_V2 "BEGIN2" +#define PLUGINSD_KEYWORD_SET_V2 "SET2" +#define PLUGINSD_KEYWORD_END_V2 "END2" + #define PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT 10 // seconds #define PLUGINSD_LINE_MAX_SSL_READ 512 diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c index 2c0f2cbc60..448e13de28 100644 --- a/collectors/plugins.d/pluginsd_parser.c +++ b/collectors/plugins.d/pluginsd_parser.c @@ -71,20 +71,109 @@ static inline RRDSET *pluginsd_require_chart_from_parent(void *user, const char return st; } -static inline RRDDIM_ACQUIRED *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, const char *cmd) { +static inline RRDSET *pluginsd_get_chart_from_parent(void *user) { + return ((PARSER_USER_OBJECT *) user)->st; +} + +static inline void pluginsd_lock_rrdset_data_collection(void *user) { + PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user; + if(u->st && !u->v2.locked_data_collection) { + netdata_spinlock_lock(&u->st->data_collection_lock); + u->v2.locked_data_collection = true; + } +} + +static inline bool pluginsd_unlock_rrdset_data_collection(void *user) { + PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user; + if(u->st && u->v2.locked_data_collection) { + netdata_spinlock_unlock(&u->st->data_collection_lock); + u->v2.locked_data_collection = false; + return true; + } + + return false; +} + +void pluginsd_rrdset_cleanup(RRDSET *st) { + for(size_t i = 0; i < st->pluginsd.used ; i++) { + if (st->pluginsd.rda[i]) { + rrddim_acquired_release(st->pluginsd.rda[i]); + st->pluginsd.rda[i] = NULL; + } + } + freez(st->pluginsd.rda); + st->pluginsd.rda = NULL; + st->pluginsd.size = 0; + st->pluginsd.used = 0; + st->pluginsd.pos = 0; +} + +static inline void pluginsd_set_chart_from_parent(void *user, RRDSET *st, const char *keyword) { + PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user; + + if(unlikely(pluginsd_unlock_rrdset_data_collection(user))) { + error("PLUGINSD: 'host:%s/chart:%s/' stale data collection lock found during %s; it has been unlocked", + rrdhost_hostname(u->st->rrdhost), rrdset_id(u->st), keyword); + } + + if(unlikely(u->v2.ml_locked)) { + ml_chart_update_end(u->st); + u->v2.ml_locked = false; + + error("PLUGINSD: 'host:%s/chart:%s/' stale ML lock found during %s, it has been unlocked", + rrdhost_hostname(u->st->rrdhost), rrdset_id(u->st), keyword); + } + + if(st) { + size_t dims = dictionary_entries(st->rrddim_root_index); + if(unlikely(st->pluginsd.size < dims)) { + st->pluginsd.rda = reallocz(st->pluginsd.rda, dims * sizeof(RRDDIM_ACQUIRED *)); + st->pluginsd.size = dims; + } + + if(st->pluginsd.pos > st->pluginsd.used && st->pluginsd.pos <= st->pluginsd.size) + st->pluginsd.used = st->pluginsd.pos; + + st->pluginsd.pos = 0; + } + + u->st = st; +} + +static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, const char *cmd) { if (unlikely(!dimension || !*dimension)) { error("PLUGINSD: 'host:%s/chart:%s' got a %s, without a dimension.", rrdhost_hostname(host), rrdset_id(st), cmd); return NULL; } - RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension); + RRDDIM_ACQUIRED *rda; - if (unlikely(!rda)) + if(likely(st->pluginsd.pos < st->pluginsd.used)) { + rda = st->pluginsd.rda[st->pluginsd.pos]; + RRDDIM *rd = rrddim_acquired_to_rrddim(rda); + if (likely(rd && strcmp(rrddim_id(rd), dimension) == 0)) { + st->pluginsd.pos++; + return rd; + } + else { + rrddim_acquired_release(rda); + st->pluginsd.rda[st->pluginsd.pos] = NULL; + } + } + + rda = rrddim_find_and_acquire(st, dimension); + if (unlikely(!rda)) { error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s but dimension does not exist.", rrdhost_hostname(host), rrdset_id(st), dimension, cmd); - return rda; + return NULL; + } + + if(likely(st->pluginsd.pos < st->pluginsd.size)) + st->pluginsd.rda[st->pluginsd.pos++] = rda; + + return rrddim_acquired_to_rrddim(rda); } static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, const char *cmd) { @@ -102,8 +191,14 @@ static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, cons return st; } -static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(void *user) { +static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(void *user, const char *keyword, const char *msg) { ((PARSER_USER_OBJECT *) user)->enabled = 0; + + if(keyword && msg) { + error_limit_static_global_var(erl, 1, 0); + error_limit(&erl, "PLUGINSD: keyword %s: %s", keyword, msg); + } + return PARSER_RC_ERROR; } @@ -113,24 +208,21 @@ PARSER_RC pluginsd_set(char **words, size_t num_words, void *user) char *value = get_word(words, num_words, 2); RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_SET); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_SET, PLUGINSD_KEYWORD_CHART); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); - RRDDIM_ACQUIRED *rda = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET); - if(!rda) return PLUGINSD_DISABLE_PLUGIN(user); - - RRDDIM *rd = rrddim_acquired_to_rrddim(rda); + RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET); + if(!rd) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) debug(D_PLUGINSD, "PLUGINSD: 'host:%s/chart:%s/dim:%s' SET is setting value to '%s'", rrdhost_hostname(host), rrdset_id(st), dimension, value && *value ? value : "UNSET"); if (value && *value) - rrddim_set_by_pointer(st, rd, strtoll(value, NULL, 0)); + rrddim_set_by_pointer(st, rd, str2ll_hex_or_dec(value)); - rrddim_acquired_release(rda); return PARSER_RC_OK; } @@ -140,12 +232,12 @@ PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user) char *microseconds_txt = get_word(words, num_words, 2); RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_BEGIN); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); - ((PARSER_USER_OBJECT *)user)->st = st; + pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_BEGIN); usec_t microseconds = 0; if (microseconds_txt && *microseconds_txt) { @@ -187,16 +279,16 @@ PARSER_RC pluginsd_end(char **words, size_t num_words, void *user) UNUSED(num_words); RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_END); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_END, PLUGINSD_KEYWORD_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) debug(D_PLUGINSD, "requested an END on chart '%s'", rrdset_id(st)); - ((PARSER_USER_OBJECT *) user)->st = NULL; - ((PARSER_USER_OBJECT *) user)->count++; + pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_END); + ((PARSER_USER_OBJECT *) user)->data_collections_count++; struct timeval now; now_realtime_timeval(&now); @@ -208,7 +300,7 @@ PARSER_RC pluginsd_end(char **words, size_t num_words, void *user) PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user) { RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); char *type = get_word(words, num_words, 1); char *name = get_word(words, num_words, 2); @@ -231,19 +323,14 @@ PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user) } // make sure we have the required variables - if (unlikely((!type || !*type || !id || !*id))) { - error("PLUGINSD: 'host:%s' requested a CHART, without a type.id. Disabling it.", - rrdhost_hostname(host)); - - ((PARSER_USER_OBJECT *) user)->enabled = 0; - return PARSER_RC_ERROR; - } + if (unlikely((!type || !*type || !id || !*id))) + return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_CHART, "missing parameters"); // parse the name, and make sure it does not include 'type.' if (unlikely(name && *name)) { // when data are streamed from child nodes // name will be type.name - // so we have to remove 'type.' from name too + // so, we have to remove 'type.' from name too size_t len = strlen(type); if (strncmp(type, name, len) == 0 && name[len] == '.') name = &name[len + 1]; @@ -320,7 +407,7 @@ PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user) rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST); } } - ((PARSER_USER_OBJECT *)user)->st = st; + pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_CHART); return PARSER_RC_OK; } @@ -332,10 +419,10 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us const char *wall_clock_time_txt = get_word(words, num_words, 3); RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END, PLUGINSD_KEYWORD_CHART); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); time_t first_entry_child = (first_entry_txt && *first_entry_txt) ? (time_t)str2ul(first_entry_txt) : 0; time_t last_entry_child = (last_entry_txt && *last_entry_txt) ? (time_t)str2ul(last_entry_txt) : 0; @@ -379,33 +466,24 @@ PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user) char *options = get_word(words, num_words, 6); RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_DIMENSION); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_DIMENSION, PLUGINSD_KEYWORD_CHART); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); - if (unlikely(!id)) { - error("PLUGINSD: 'host:%s/chart:%s' got a DIMENSION, without an id. Disabling it.", - rrdhost_hostname(host), st ? rrdset_id(st) : "UNSET"); - return PLUGINSD_DISABLE_PLUGIN(user); - } - - if (unlikely(!st && !((PARSER_USER_OBJECT *) user)->st_exists)) { - error("PLUGINSD: 'host:%s' got a DIMENSION, without a CHART. Disabling it.", - rrdhost_hostname(host)); - return PLUGINSD_DISABLE_PLUGIN(user); - } + if (unlikely(!id)) + return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_DIMENSION, "missing dimension id"); long multiplier = 1; if (multiplier_s && *multiplier_s) { - multiplier = strtol(multiplier_s, NULL, 0); + multiplier = str2ll_hex_or_dec(multiplier_s); if (unlikely(!multiplier)) multiplier = 1; } long divisor = 1; if (likely(divisor_s && *divisor_s)) { - divisor = strtol(divisor_s, NULL, 0); + divisor = str2ll_hex_or_dec(divisor_s); if (unlikely(!divisor)) divisor = 1; } @@ -712,9 +790,9 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user) NETDATA_DOUBLE v; RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_VARIABLE); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); - RRDSET *st = ((PARSER_USER_OBJECT *) user)->st; + RRDSET *st = pluginsd_get_chart_from_parent(user); int global = (st) ? 0 : 1; @@ -730,13 +808,8 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user) } } - if (unlikely(!name || !*name)) { - error("PLUGINSD: 'host:%s/chart:%s' got a VARIABLE without a variable name. Disabling it.", - rrdhost_hostname(host), st ? rrdset_id(st):"UNSET"); - - ((PARSER_USER_OBJECT *)user)->enabled = 0; - return PLUGINSD_DISABLE_PLUGIN(user); - } + if (unlikely(!name || !*name)) + return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_VARIABLE, "missing variable name"); if (unlikely(!value || !*value)) value = NULL; @@ -750,14 +823,8 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user) return PARSER_RC_OK; } - if (!global && !st) { - error("PLUGINSD: 'host:%s/chart:%s' cannot update CHART VARIABLE '%s' without a chart", - rrdhost_hostname(host), - st ? rrdset_id(st):"UNSET", - name - ); - return PLUGINSD_DISABLE_PLUGIN(user); - } + if (!global && !st) + return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_VARIABLE, "no chart is defined and no GLOBAL is given"); char *endptr = NULL; v = (NETDATA_DOUBLE)str2ndd(value, &endptr); @@ -803,8 +870,8 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user) PARSER_RC pluginsd_flush(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) { - debug(D_PLUGINSD, "requested a FLUSH"); - ((PARSER_USER_OBJECT *) user)->st = NULL; + debug(D_PLUGINSD, "requested a " PLUGINSD_KEYWORD_FLUSH); + pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_FLUSH); ((PARSER_USER_OBJECT *) user)->replay.start_time = 0; ((PARSER_USER_OBJECT *) user)->replay.end_time = 0; ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0; @@ -825,10 +892,8 @@ PARSER_RC pluginsd_label(char **words, size_t num_words, void *user) const char *label_source = get_word(words, num_words, 2); const char *value = get_word(words, num_words, 3); - if (!name || !label_source || !value) { - error("PLUGINSD: ignoring malformed or empty LABEL command."); - return PLUGINSD_DISABLE_PLUGIN(user); - } + if (!name || !label_source || !value) + return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_LABEL, "missing parameters"); char *store = (char *)value; bool allocated_store = false; @@ -874,7 +939,7 @@ PARSER_RC pluginsd_label(char **words, size_t num_words, void *user) PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) { RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_OVERWRITE); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); debug(D_PLUGINSD, "requested to OVERWRITE host labels"); @@ -898,11 +963,12 @@ PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user) if (!name || !value || !*label_source) { error("Ignoring malformed or empty CHART LABEL command."); - return PLUGINSD_DISABLE_PLUGIN(user); + return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); } if(unlikely(!((PARSER_USER_OBJECT *) user)->chart_rrdlabels_linked_temporarily)) { - ((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily = ((PARSER_USER_OBJECT *)user)->st->rrdlabels; + RRDSET *st = pluginsd_get_chart_from_parent(user); + ((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily = st->rrdlabels; rrdlabels_unmark_all(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily); } @@ -915,17 +981,17 @@ PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user) PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) { RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); debug(D_PLUGINSD, "requested to commit chart labels"); if(!((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily) { error("PLUGINSD: 'host:%s' got CLABEL_COMMIT, without a CHART or BEGIN. Ignoring it.", rrdhost_hostname(host)); - return PLUGINSD_DISABLE_PLUGIN(user); + return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); } rrdlabels_remove_all_unmarked(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily); @@ -937,15 +1003,14 @@ PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words _ return PARSER_RC_OK; } -PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *user) -{ +PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, void *user) { char *id = get_word(words, num_words, 1); char *start_time_str = get_word(words, num_words, 2); char *end_time_str = get_word(words, num_words, 3); char *child_now_str = get_word(words, num_words, 4); RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_BEGIN); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); RRDSET *st; if (likely(!id || !*id)) @@ -953,8 +1018,8 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use else st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_REPLAY_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user); - ((PARSER_USER_OBJECT *) user)->st = st; + if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_REPLAY_BEGIN); if(start_time_str && end_time_str) { time_t start_time = (time_t)str2ul(start_time_str); @@ -1016,7 +1081,9 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use return PARSER_RC_OK; } - error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, but timestamps are invalid (now is %ld [%s], tolerance %ld). Ignoring " PLUGINSD_KEYWORD_REPLAY_SET, + error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN + " from %ld to %ld, but timestamps are invalid " + "(now is %ld [%s], tolerance %ld). Ignoring " PLUGINSD_KEYWORD_REPLAY_SET, rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time, wall_clock_time, wall_clock_comes_from_child ? "child wall clock" : "parent wall clock", tolerance); } @@ -1033,6 +1100,33 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use return PARSER_RC_OK; } +static inline SN_FLAGS pluginsd_parse_storage_number_flags(const char *flags_str) { + SN_FLAGS flags = SN_FLAG_NONE; + + char c; + while ((c = *flags_str++)) { + switch (c) { + case 'A': + flags |= SN_FLAG_NOT_ANOMALOUS; + break; + + case 'R': + flags |= SN_FLAG_RESET; + break; + + case 'E': + flags = SN_EMPTY_SLOT; + return flags; + + default: + internal_error(true, "Unknown SN_FLAGS flag '%c'", c); + break; + } + } + + return flags; +} + PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) { char *dimension = get_word(words, num_words, 1); @@ -1040,31 +1134,34 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) char *flags_str = get_word(words, num_words, 3); RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_SET); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); - if(!((PARSER_USER_OBJECT *) user)->replay.rset_enabled) { + PARSER_USER_OBJECT *u = user; + if(!u->replay.rset_enabled) { error_limit_static_thread_var(erl, 1, 0); - error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_SET " but it is disabled by " PLUGINSD_KEYWORD_REPLAY_BEGIN " errors", - rrdhost_hostname(host), rrdset_id(st)); + error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s' got a %s but it is disabled by %s errors", + rrdhost_hostname(host), rrdset_id(st), PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN); // we have to return OK here return PARSER_RC_OK; } - RRDDIM_ACQUIRED *rda = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_SET); - if(!rda) return PLUGINSD_DISABLE_PLUGIN(user); + RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_SET); + if(!rd) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); - if (unlikely(!((PARSER_USER_OBJECT *) user)->replay.start_time || !((PARSER_USER_OBJECT *) user)->replay.end_time)) { - error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a " PLUGINSD_KEYWORD_REPLAY_SET " with invalid timestamps %ld to %ld from a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.", + if (unlikely(!u->replay.start_time || !u->replay.end_time)) { + error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s with invalid timestamps %ld to %ld from a %s. Disabling it.", rrdhost_hostname(host), rrdset_id(st), dimension, - ((PARSER_USER_OBJECT *) user)->replay.start_time, - ((PARSER_USER_OBJECT *) user)->replay.end_time); - return PLUGINSD_DISABLE_PLUGIN(user); + PLUGINSD_KEYWORD_REPLAY_SET, + u->replay.start_time, + u->replay.end_time, + PLUGINSD_KEYWORD_REPLAY_BEGIN); + return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); } if (unlikely(!value_str || !*value_str)) @@ -1074,39 +1171,19 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) flags_str = ""; if (likely(value_str)) { - RRDDIM *rd = rrddim_acquired_to_rrddim(rda); - RRDDIM_FLAGS rd_flags = rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED); if(!(rd_flags & RRDDIM_FLAG_ARCHIVED)) { NETDATA_DOUBLE value = strtondd(value_str, NULL); - SN_FLAGS flags = SN_FLAG_NONE; - - char c; - while ((c = *flags_str++)) { - switch (c) { - case 'R': - flags |= SN_FLAG_RESET; - break; - - case 'E': - flags |= SN_EMPTY_SLOT; - value = NAN; - break; - - default: - error("unknown flag '%c'", c); - break; - } - } + SN_FLAGS flags = pluginsd_parse_storage_number_flags(flags_str); - if (!netdata_double_isnumber(value)) { + if (!netdata_double_isnumber(value) || (flags == SN_EMPTY_SLOT)) { value = NAN; flags = SN_EMPTY_SLOT; } - rrddim_store_metric(rd, ((PARSER_USER_OBJECT *) user)->replay.end_time_ut, value, flags); - rd->last_collected_time.tv_sec = ((PARSER_USER_OBJECT *) user)->replay.end_time; + rrddim_store_metric(rd, u->replay.end_time_ut, value, flags); + rd->last_collected_time.tv_sec = u->replay.end_time; rd->last_collected_time.tv_usec = 0; rd->collections_counter++; } @@ -1117,7 +1194,6 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) } } - rrddim_acquired_release(rda); return PARSER_RC_OK; } @@ -1133,26 +1209,25 @@ PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words char *last_stored_value_str = get_word(words, num_words, 5); RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); - RRDDIM_ACQUIRED *rda = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE); - if(!rda) return PLUGINSD_DISABLE_PLUGIN(user); + RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE); + if(!rd) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); - RRDDIM *rd = rrddim_acquired_to_rrddim(rda); usec_t dim_last_collected_ut = (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec; usec_t last_collected_ut = last_collected_ut_str ? str2ull(last_collected_ut_str) : 0; if(last_collected_ut > dim_last_collected_ut) { - rd->last_collected_time.tv_sec = last_collected_ut / USEC_PER_SEC; - rd->last_collected_time.tv_usec = last_collected_ut % USEC_PER_SEC; + rd->last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC); + rd->last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC); } rd->last_collected_value = last_collected_value_str ? str2ll(last_collected_value_str, |