summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--collectors/plugins.d/plugins_d.h4
-rw-r--r--collectors/plugins.d/pluginsd_parser.c638
-rw-r--r--collectors/plugins.d/pluginsd_parser.h16
-rw-r--r--database/engine/pagecache.h2
-rw-r--r--database/engine/rrdengine.h6
-rwxr-xr-xdatabase/engine/rrdengineapi.c249
-rw-r--r--database/rrd.h7
-rw-r--r--database/rrdset.c44
-rw-r--r--libnetdata/buffer/buffer.c88
-rw-r--r--libnetdata/buffer/buffer.h3
-rw-r--r--libnetdata/dictionary/dictionary.c243
-rw-r--r--libnetdata/inlined.h31
-rw-r--r--libnetdata/libnetdata.c113
-rw-r--r--libnetdata/libnetdata.h54
-rw-r--r--libnetdata/storage_number/storage_number.h4
-rw-r--r--libnetdata/worker_utilization/worker_utilization.c16
-rw-r--r--ml/ml-dummy.c3
-rw-r--r--ml/ml.cc6
-rw-r--r--ml/ml.h2
-rw-r--r--parser/parser.c9
-rw-r--r--parser/parser.h12
-rw-r--r--streaming/receiver.c8
-rw-r--r--streaming/replication.c157
-rw-r--r--streaming/rrdpush.c90
-rw-r--r--streaming/rrdpush.h20
-rw-r--r--streaming/sender.c8
26 files changed, 1305 insertions, 528 deletions
diff --git a/collectors/plugins.d/plugins_d.h b/collectors/plugins.d/plugins_d.h
index 35af9fe583..5c7da73361 100644
--- a/collectors/plugins.d/plugins_d.h
+++ b/collectors/plugins.d/plugins_d.h
@@ -34,6 +34,10 @@
#define PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE "RSSTATE"
#define PLUGINSD_KEYWORD_REPLAY_END "REND"
+#define PLUGINSD_KEYWORD_BEGIN_V2 "BEGIN2"
+#define PLUGINSD_KEYWORD_SET_V2 "SET2"
+#define PLUGINSD_KEYWORD_END_V2 "END2"
+
#define PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT 10 // seconds
#define PLUGINSD_LINE_MAX_SSL_READ 512
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c
index 2c0f2cbc60..448e13de28 100644
--- a/collectors/plugins.d/pluginsd_parser.c
+++ b/collectors/plugins.d/pluginsd_parser.c
@@ -71,20 +71,109 @@ static inline RRDSET *pluginsd_require_chart_from_parent(void *user, const char
return st;
}
-static inline RRDDIM_ACQUIRED *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, const char *cmd) {
+static inline RRDSET *pluginsd_get_chart_from_parent(void *user) {
+ return ((PARSER_USER_OBJECT *) user)->st;
+}
+
+static inline void pluginsd_lock_rrdset_data_collection(void *user) {
+ PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
+ if(u->st && !u->v2.locked_data_collection) {
+ netdata_spinlock_lock(&u->st->data_collection_lock);
+ u->v2.locked_data_collection = true;
+ }
+}
+
+static inline bool pluginsd_unlock_rrdset_data_collection(void *user) {
+ PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
+ if(u->st && u->v2.locked_data_collection) {
+ netdata_spinlock_unlock(&u->st->data_collection_lock);
+ u->v2.locked_data_collection = false;
+ return true;
+ }
+
+ return false;
+}
+
+void pluginsd_rrdset_cleanup(RRDSET *st) {
+ for(size_t i = 0; i < st->pluginsd.used ; i++) {
+ if (st->pluginsd.rda[i]) {
+ rrddim_acquired_release(st->pluginsd.rda[i]);
+ st->pluginsd.rda[i] = NULL;
+ }
+ }
+ freez(st->pluginsd.rda);
+ st->pluginsd.rda = NULL;
+ st->pluginsd.size = 0;
+ st->pluginsd.used = 0;
+ st->pluginsd.pos = 0;
+}
+
+static inline void pluginsd_set_chart_from_parent(void *user, RRDSET *st, const char *keyword) {
+ PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
+
+ if(unlikely(pluginsd_unlock_rrdset_data_collection(user))) {
+ error("PLUGINSD: 'host:%s/chart:%s/' stale data collection lock found during %s; it has been unlocked",
+ rrdhost_hostname(u->st->rrdhost), rrdset_id(u->st), keyword);
+ }
+
+ if(unlikely(u->v2.ml_locked)) {
+ ml_chart_update_end(u->st);
+ u->v2.ml_locked = false;
+
+ error("PLUGINSD: 'host:%s/chart:%s/' stale ML lock found during %s, it has been unlocked",
+ rrdhost_hostname(u->st->rrdhost), rrdset_id(u->st), keyword);
+ }
+
+ if(st) {
+ size_t dims = dictionary_entries(st->rrddim_root_index);
+ if(unlikely(st->pluginsd.size < dims)) {
+ st->pluginsd.rda = reallocz(st->pluginsd.rda, dims * sizeof(RRDDIM_ACQUIRED *));
+ st->pluginsd.size = dims;
+ }
+
+ if(st->pluginsd.pos > st->pluginsd.used && st->pluginsd.pos <= st->pluginsd.size)
+ st->pluginsd.used = st->pluginsd.pos;
+
+ st->pluginsd.pos = 0;
+ }
+
+ u->st = st;
+}
+
+static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, const char *cmd) {
if (unlikely(!dimension || !*dimension)) {
error("PLUGINSD: 'host:%s/chart:%s' got a %s, without a dimension.",
rrdhost_hostname(host), rrdset_id(st), cmd);
return NULL;
}
- RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension);
+ RRDDIM_ACQUIRED *rda;
- if (unlikely(!rda))
+ if(likely(st->pluginsd.pos < st->pluginsd.used)) {
+ rda = st->pluginsd.rda[st->pluginsd.pos];
+ RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
+ if (likely(rd && strcmp(rrddim_id(rd), dimension) == 0)) {
+ st->pluginsd.pos++;
+ return rd;
+ }
+ else {
+ rrddim_acquired_release(rda);
+ st->pluginsd.rda[st->pluginsd.pos] = NULL;
+ }
+ }
+
+ rda = rrddim_find_and_acquire(st, dimension);
+ if (unlikely(!rda)) {
error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s but dimension does not exist.",
rrdhost_hostname(host), rrdset_id(st), dimension, cmd);
- return rda;
+ return NULL;
+ }
+
+ if(likely(st->pluginsd.pos < st->pluginsd.size))
+ st->pluginsd.rda[st->pluginsd.pos++] = rda;
+
+ return rrddim_acquired_to_rrddim(rda);
}
static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, const char *cmd) {
@@ -102,8 +191,14 @@ static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, cons
return st;
}
-static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(void *user) {
+static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(void *user, const char *keyword, const char *msg) {
((PARSER_USER_OBJECT *) user)->enabled = 0;
+
+ if(keyword && msg) {
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl, "PLUGINSD: keyword %s: %s", keyword, msg);
+ }
+
return PARSER_RC_ERROR;
}
@@ -113,24 +208,21 @@ PARSER_RC pluginsd_set(char **words, size_t num_words, void *user)
char *value = get_word(words, num_words, 2);
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_SET);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_SET, PLUGINSD_KEYWORD_CHART);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
- RRDDIM_ACQUIRED *rda = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET);
- if(!rda) return PLUGINSD_DISABLE_PLUGIN(user);
-
- RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
+ RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET);
+ if(!rd) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
debug(D_PLUGINSD, "PLUGINSD: 'host:%s/chart:%s/dim:%s' SET is setting value to '%s'",
rrdhost_hostname(host), rrdset_id(st), dimension, value && *value ? value : "UNSET");
if (value && *value)
- rrddim_set_by_pointer(st, rd, strtoll(value, NULL, 0));
+ rrddim_set_by_pointer(st, rd, str2ll_hex_or_dec(value));
- rrddim_acquired_release(rda);
return PARSER_RC_OK;
}
@@ -140,12 +232,12 @@ PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user)
char *microseconds_txt = get_word(words, num_words, 2);
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_BEGIN);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
- ((PARSER_USER_OBJECT *)user)->st = st;
+ pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_BEGIN);
usec_t microseconds = 0;
if (microseconds_txt && *microseconds_txt) {
@@ -187,16 +279,16 @@ PARSER_RC pluginsd_end(char **words, size_t num_words, void *user)
UNUSED(num_words);
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_END);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_END, PLUGINSD_KEYWORD_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
debug(D_PLUGINSD, "requested an END on chart '%s'", rrdset_id(st));
- ((PARSER_USER_OBJECT *) user)->st = NULL;
- ((PARSER_USER_OBJECT *) user)->count++;
+ pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_END);
+ ((PARSER_USER_OBJECT *) user)->data_collections_count++;
struct timeval now;
now_realtime_timeval(&now);
@@ -208,7 +300,7 @@ PARSER_RC pluginsd_end(char **words, size_t num_words, void *user)
PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user)
{
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
char *type = get_word(words, num_words, 1);
char *name = get_word(words, num_words, 2);
@@ -231,19 +323,14 @@ PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user)
}
// make sure we have the required variables
- if (unlikely((!type || !*type || !id || !*id))) {
- error("PLUGINSD: 'host:%s' requested a CHART, without a type.id. Disabling it.",
- rrdhost_hostname(host));
-
- ((PARSER_USER_OBJECT *) user)->enabled = 0;
- return PARSER_RC_ERROR;
- }
+ if (unlikely((!type || !*type || !id || !*id)))
+ return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_CHART, "missing parameters");
// parse the name, and make sure it does not include 'type.'
if (unlikely(name && *name)) {
// when data are streamed from child nodes
// name will be type.name
- // so we have to remove 'type.' from name too
+ // so, we have to remove 'type.' from name too
size_t len = strlen(type);
if (strncmp(type, name, len) == 0 && name[len] == '.')
name = &name[len + 1];
@@ -320,7 +407,7 @@ PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user)
rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
}
}
- ((PARSER_USER_OBJECT *)user)->st = st;
+ pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_CHART);
return PARSER_RC_OK;
}
@@ -332,10 +419,10 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us
const char *wall_clock_time_txt = get_word(words, num_words, 3);
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END, PLUGINSD_KEYWORD_CHART);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
time_t first_entry_child = (first_entry_txt && *first_entry_txt) ? (time_t)str2ul(first_entry_txt) : 0;
time_t last_entry_child = (last_entry_txt && *last_entry_txt) ? (time_t)str2ul(last_entry_txt) : 0;
@@ -379,33 +466,24 @@ PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user)
char *options = get_word(words, num_words, 6);
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_DIMENSION);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_DIMENSION, PLUGINSD_KEYWORD_CHART);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
- if (unlikely(!id)) {
- error("PLUGINSD: 'host:%s/chart:%s' got a DIMENSION, without an id. Disabling it.",
- rrdhost_hostname(host), st ? rrdset_id(st) : "UNSET");
- return PLUGINSD_DISABLE_PLUGIN(user);
- }
-
- if (unlikely(!st && !((PARSER_USER_OBJECT *) user)->st_exists)) {
- error("PLUGINSD: 'host:%s' got a DIMENSION, without a CHART. Disabling it.",
- rrdhost_hostname(host));
- return PLUGINSD_DISABLE_PLUGIN(user);
- }
+ if (unlikely(!id))
+ return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_DIMENSION, "missing dimension id");
long multiplier = 1;
if (multiplier_s && *multiplier_s) {
- multiplier = strtol(multiplier_s, NULL, 0);
+ multiplier = str2ll_hex_or_dec(multiplier_s);
if (unlikely(!multiplier))
multiplier = 1;
}
long divisor = 1;
if (likely(divisor_s && *divisor_s)) {
- divisor = strtol(divisor_s, NULL, 0);
+ divisor = str2ll_hex_or_dec(divisor_s);
if (unlikely(!divisor))
divisor = 1;
}
@@ -712,9 +790,9 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user)
NETDATA_DOUBLE v;
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_VARIABLE);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
- RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
+ RRDSET *st = pluginsd_get_chart_from_parent(user);
int global = (st) ? 0 : 1;
@@ -730,13 +808,8 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user)
}
}
- if (unlikely(!name || !*name)) {
- error("PLUGINSD: 'host:%s/chart:%s' got a VARIABLE without a variable name. Disabling it.",
- rrdhost_hostname(host), st ? rrdset_id(st):"UNSET");
-
- ((PARSER_USER_OBJECT *)user)->enabled = 0;
- return PLUGINSD_DISABLE_PLUGIN(user);
- }
+ if (unlikely(!name || !*name))
+ return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_VARIABLE, "missing variable name");
if (unlikely(!value || !*value))
value = NULL;
@@ -750,14 +823,8 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user)
return PARSER_RC_OK;
}
- if (!global && !st) {
- error("PLUGINSD: 'host:%s/chart:%s' cannot update CHART VARIABLE '%s' without a chart",
- rrdhost_hostname(host),
- st ? rrdset_id(st):"UNSET",
- name
- );
- return PLUGINSD_DISABLE_PLUGIN(user);
- }
+ if (!global && !st)
+ return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_VARIABLE, "no chart is defined and no GLOBAL is given");
char *endptr = NULL;
v = (NETDATA_DOUBLE)str2ndd(value, &endptr);
@@ -803,8 +870,8 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user)
PARSER_RC pluginsd_flush(char **words __maybe_unused, size_t num_words __maybe_unused, void *user)
{
- debug(D_PLUGINSD, "requested a FLUSH");
- ((PARSER_USER_OBJECT *) user)->st = NULL;
+ debug(D_PLUGINSD, "requested a " PLUGINSD_KEYWORD_FLUSH);
+ pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_FLUSH);
((PARSER_USER_OBJECT *) user)->replay.start_time = 0;
((PARSER_USER_OBJECT *) user)->replay.end_time = 0;
((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0;
@@ -825,10 +892,8 @@ PARSER_RC pluginsd_label(char **words, size_t num_words, void *user)
const char *label_source = get_word(words, num_words, 2);
const char *value = get_word(words, num_words, 3);
- if (!name || !label_source || !value) {
- error("PLUGINSD: ignoring malformed or empty LABEL command.");
- return PLUGINSD_DISABLE_PLUGIN(user);
- }
+ if (!name || !label_source || !value)
+ return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_LABEL, "missing parameters");
char *store = (char *)value;
bool allocated_store = false;
@@ -874,7 +939,7 @@ PARSER_RC pluginsd_label(char **words, size_t num_words, void *user)
PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __maybe_unused, void *user)
{
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_OVERWRITE);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
debug(D_PLUGINSD, "requested to OVERWRITE host labels");
@@ -898,11 +963,12 @@ PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user)
if (!name || !value || !*label_source) {
error("Ignoring malformed or empty CHART LABEL command.");
- return PLUGINSD_DISABLE_PLUGIN(user);
+ return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
}
if(unlikely(!((PARSER_USER_OBJECT *) user)->chart_rrdlabels_linked_temporarily)) {
- ((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily = ((PARSER_USER_OBJECT *)user)->st->rrdlabels;
+ RRDSET *st = pluginsd_get_chart_from_parent(user);
+ ((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily = st->rrdlabels;
rrdlabels_unmark_all(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily);
}
@@ -915,17 +981,17 @@ PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user)
PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words __maybe_unused, void *user)
{
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
debug(D_PLUGINSD, "requested to commit chart labels");
if(!((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily) {
error("PLUGINSD: 'host:%s' got CLABEL_COMMIT, without a CHART or BEGIN. Ignoring it.",
rrdhost_hostname(host));
- return PLUGINSD_DISABLE_PLUGIN(user);
+ return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
}
rrdlabels_remove_all_unmarked(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily);
@@ -937,15 +1003,14 @@ PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words _
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *user)
-{
+PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, void *user) {
char *id = get_word(words, num_words, 1);
char *start_time_str = get_word(words, num_words, 2);
char *end_time_str = get_word(words, num_words, 3);
char *child_now_str = get_word(words, num_words, 4);
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_BEGIN);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
RRDSET *st;
if (likely(!id || !*id))
@@ -953,8 +1018,8 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use
else
st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_REPLAY_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
- ((PARSER_USER_OBJECT *) user)->st = st;
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_REPLAY_BEGIN);
if(start_time_str && end_time_str) {
time_t start_time = (time_t)str2ul(start_time_str);
@@ -1016,7 +1081,9 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use
return PARSER_RC_OK;
}
- error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, but timestamps are invalid (now is %ld [%s], tolerance %ld). Ignoring " PLUGINSD_KEYWORD_REPLAY_SET,
+ error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN
+ " from %ld to %ld, but timestamps are invalid "
+ "(now is %ld [%s], tolerance %ld). Ignoring " PLUGINSD_KEYWORD_REPLAY_SET,
rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time,
wall_clock_time, wall_clock_comes_from_child ? "child wall clock" : "parent wall clock", tolerance);
}
@@ -1033,6 +1100,33 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use
return PARSER_RC_OK;
}
+static inline SN_FLAGS pluginsd_parse_storage_number_flags(const char *flags_str) {
+ SN_FLAGS flags = SN_FLAG_NONE;
+
+ char c;
+ while ((c = *flags_str++)) {
+ switch (c) {
+ case 'A':
+ flags |= SN_FLAG_NOT_ANOMALOUS;
+ break;
+
+ case 'R':
+ flags |= SN_FLAG_RESET;
+ break;
+
+ case 'E':
+ flags = SN_EMPTY_SLOT;
+ return flags;
+
+ default:
+ internal_error(true, "Unknown SN_FLAGS flag '%c'", c);
+ break;
+ }
+ }
+
+ return flags;
+}
+
PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
{
char *dimension = get_word(words, num_words, 1);
@@ -1040,31 +1134,34 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
char *flags_str = get_word(words, num_words, 3);
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_SET);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
- if(!((PARSER_USER_OBJECT *) user)->replay.rset_enabled) {
+ PARSER_USER_OBJECT *u = user;
+ if(!u->replay.rset_enabled) {
error_limit_static_thread_var(erl, 1, 0);
- error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_SET " but it is disabled by " PLUGINSD_KEYWORD_REPLAY_BEGIN " errors",
- rrdhost_hostname(host), rrdset_id(st));
+ error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s' got a %s but it is disabled by %s errors",
+ rrdhost_hostname(host), rrdset_id(st), PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN);
// we have to return OK here
return PARSER_RC_OK;
}
- RRDDIM_ACQUIRED *rda = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_SET);
- if(!rda) return PLUGINSD_DISABLE_PLUGIN(user);
+ RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_SET);
+ if(!rd) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
- if (unlikely(!((PARSER_USER_OBJECT *) user)->replay.start_time || !((PARSER_USER_OBJECT *) user)->replay.end_time)) {
- error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a " PLUGINSD_KEYWORD_REPLAY_SET " with invalid timestamps %ld to %ld from a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
+ if (unlikely(!u->replay.start_time || !u->replay.end_time)) {
+ error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s with invalid timestamps %ld to %ld from a %s. Disabling it.",
rrdhost_hostname(host),
rrdset_id(st),
dimension,
- ((PARSER_USER_OBJECT *) user)->replay.start_time,
- ((PARSER_USER_OBJECT *) user)->replay.end_time);
- return PLUGINSD_DISABLE_PLUGIN(user);
+ PLUGINSD_KEYWORD_REPLAY_SET,
+ u->replay.start_time,
+ u->replay.end_time,
+ PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
}
if (unlikely(!value_str || !*value_str))
@@ -1074,39 +1171,19 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
flags_str = "";
if (likely(value_str)) {
- RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
-
RRDDIM_FLAGS rd_flags = rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED);
if(!(rd_flags & RRDDIM_FLAG_ARCHIVED)) {
NETDATA_DOUBLE value = strtondd(value_str, NULL);
- SN_FLAGS flags = SN_FLAG_NONE;
-
- char c;
- while ((c = *flags_str++)) {
- switch (c) {
- case 'R':
- flags |= SN_FLAG_RESET;
- break;
-
- case 'E':
- flags |= SN_EMPTY_SLOT;
- value = NAN;
- break;
-
- default:
- error("unknown flag '%c'", c);
- break;
- }
- }
+ SN_FLAGS flags = pluginsd_parse_storage_number_flags(flags_str);
- if (!netdata_double_isnumber(value)) {
+ if (!netdata_double_isnumber(value) || (flags == SN_EMPTY_SLOT)) {
value = NAN;
flags = SN_EMPTY_SLOT;
}
- rrddim_store_metric(rd, ((PARSER_USER_OBJECT *) user)->replay.end_time_ut, value, flags);
- rd->last_collected_time.tv_sec = ((PARSER_USER_OBJECT *) user)->replay.end_time;
+ rrddim_store_metric(rd, u->replay.end_time_ut, value, flags);
+ rd->last_collected_time.tv_sec = u->replay.end_time;
rd->last_collected_time.tv_usec = 0;
rd->collections_counter++;
}
@@ -1117,7 +1194,6 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
}
}
- rrddim_acquired_release(rda);
return PARSER_RC_OK;
}
@@ -1133,26 +1209,25 @@ PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words
char *last_stored_value_str = get_word(words, num_words, 5);
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
- RRDDIM_ACQUIRED *rda = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
- if(!rda) return PLUGINSD_DISABLE_PLUGIN(user);
+ RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
+ if(!rd) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
- RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
usec_t dim_last_collected_ut = (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec;
usec_t last_collected_ut = last_collected_ut_str ? str2ull(last_collected_ut_str) : 0;
if(last_collected_ut > dim_last_collected_ut) {
- rd->last_collected_time.tv_sec = last_collected_ut / USEC_PER_SEC;
- rd->last_collected_time.tv_usec = last_collected_ut % USEC_PER_SEC;
+ rd->last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC);
+ rd->last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC);
}
rd->last_collected_value = last_collected_value_str ? str2ll(last_collected_value_str, NULL) : 0;
rd->last_calculated_value = last_calculated_value_str ? str2ndd(last_calculated_value_str, NULL) : 0;
rd->last_stored_value = last_stored_value_str ? str2ndd(last_stored_value_str, NULL) : 0.0;
- rrddim_acquired_release(rda);
+
return PARSER_RC_OK;
}
@@ -1165,23 +1240,23 @@ PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words
char *last_updated_ut_str = get_word(words, num_words, 2);
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
usec_t chart_last_collected_ut = (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec;
usec_t last_collected_ut = last_collected_ut_str ? str2ull(last_collected_ut_str) : 0;
if(last_collected_ut > chart_last_collected_ut) {
- st->last_collected_time.tv_sec = last_collected_ut / USEC_PER_SEC;
- st->last_collected_time.tv_usec = last_collected_ut % USEC_PER_SEC;
+ st->last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC);
+ st->last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC);
}
usec_t chart_last_updated_ut = (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec;
usec_t last_updated_ut = last_updated_ut_str ? str2ull(last_updated_ut_str) : 0;
if(last_updated_ut > chart_last_updated_ut) {
- st->last_updated.tv_sec = last_updated_ut / USEC_PER_SEC;
- st->last_updated.tv_usec = last_updated_ut % USEC_PER_SEC;
+ st->last_updated.tv_sec = (time_t)(last_updated_ut / USEC_PER_SEC);
+ st->last_updated.tv_usec = (last_updated_ut % USEC_PER_SEC);
}
st->counter++;
@@ -1219,10 +1294,10 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
PARSER_USER_OBJECT *user_object = user;
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_END);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_END, PLUGINSD_KEYWORD_REPLAY_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
internal_error(true,
@@ -1235,8 +1310,7 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
);
#endif
- ((PARSER_USER_OBJECT *) user)->st = NULL;
- ((PARSER_USER_OBJECT *) user)->count++;
+ ((PARSER_USER_OBJECT *) user)->data_collections_count++;
if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled && st->rrdhost->receiver) {
time_t now = now_realtime_sec();
@@ -1282,11 +1356,16 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_END " with enable_streaming = true, but there is no replication in progress for this chart.",
rrdhost_hostname(host), rrdset_id(st));
#endif
+
+ pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_REPLAY_END);
+
worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, 100.0);
return PARSER_RC_OK;
}
+ pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_REPLAY_END);
+
rrdcontext_updated_retention_rrdset(st);
bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st,
@@ -1295,8 +1374,289 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
return ok ? PARSER_RC_OK : PARSER_R