summaryrefslogtreecommitdiffstats
path: root/collectors/plugins.d
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-02-15 21:16:29 +0200
committerGitHub <noreply@github.com>2023-02-15 21:16:29 +0200
commitd2daa19bf53c9a8cb781c8e50a86b9961b0503a9 (patch)
tree8d8b744138c28e010a24456aee55447d31a719bd /collectors/plugins.d
parent37a918ae2bc996fc881ab60042ae5a8f434f4c52 (diff)
JSON internal API, IEEE754 base64/hex streaming, weights endpoint optimization (#14493)
* first work on standardizing json formatting * renamed old grouping to time_grouping and added group_by * add dummy functions to enable compilation * buffer json api work * jsonwrap opening with buffer_json_X() functions * cleanup * storage for quotes * optimize buffer printing for both numbers and strings * removed ; from define * contexts json generation using the new json functions * fix buffer overflow at unit test * weights endpoint using new json api * fixes to weights endpoint * check buffer overflow on all buffer functions * do synchronous queries for weights * buffer_flush() now resets json state too * content type typedef * print double values that are above the max 64-bit value * str2ndd() can now parse values above UINT64_MAX * faster number parsing by avoiding double calculations as much as possible * faster number parsing * faster hex parsing * accurate printing and parsing of double values, even for very large numbers that cannot fit in 64bit integers * full printing and parsing without using library functions - and related unit tests * added IEEE754 streaming capability to enable streaming of double values in hex * streaming and replication to transfer all values in hex * use our own str2ndd for set2 * remove subnormal check from ieee * base64 encoding for numbers, instead of hex * when increasing double precision, also make sure the fractional number printed is aligned to the wanted precision * str2ndd_encoded() parses all encoding formats, including integers * prevent uninitialized use * /api/v1/info using the new json API * Fix error when compiling with --disable-ml * Remove redundant 'buffer_unittest' declaration * Fix formatting * Fix formatting * Fix formatting * fix buffer unit test * apps.plugin using the new JSON API * make sure the metrics registry does not accept negative timestamps * do not allow pages with negative timestamps to be loaded from db files; do not accept pages with negative timestamps in the cache * Fix more formatting --------- Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'collectors/plugins.d')
-rw-r--r--collectors/plugins.d/pluginsd_parser.c93
-rw-r--r--collectors/plugins.d/pluginsd_parser.h4
2 files changed, 65 insertions, 32 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c
index 891968e85e..3ce2d49117 100644
--- a/collectors/plugins.d/pluginsd_parser.c
+++ b/collectors/plugins.d/pluginsd_parser.c
@@ -221,7 +221,7 @@ PARSER_RC pluginsd_set(char **words, size_t num_words, void *user)
rrdhost_hostname(host), rrdset_id(st), dimension, value && *value ? value : "UNSET");
if (value && *value)
- rrddim_set_by_pointer(st, rd, str2ll_hex_or_dec(value));
+ rrddim_set_by_pointer(st, rd, str2ll_encoded(value));
return PARSER_RC_OK;
}
@@ -616,14 +616,14 @@ PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user)
long multiplier = 1;
if (multiplier_s && *multiplier_s) {
- multiplier = str2ll_hex_or_dec(multiplier_s);
+ multiplier = str2ll_encoded(multiplier_s);
if (unlikely(!multiplier))
multiplier = 1;
}
long divisor = 1;
if (likely(divisor_s && *divisor_s)) {
- divisor = str2ll_hex_or_dec(divisor_s);
+ divisor = str2ll_encoded(divisor_s);
if (unlikely(!divisor))
divisor = 1;
}
@@ -901,7 +901,7 @@ PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *u
}
else {
if(format && *format)
- pf->destination_wb->contenttype = functions_format_to_content_type(format);
+ pf->destination_wb->content_type = functions_format_to_content_type(format);
pf->code = code;
@@ -967,7 +967,7 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user)
return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_VARIABLE, "no chart is defined and no GLOBAL is given");
char *endptr = NULL;
- v = (NETDATA_DOUBLE)str2ndd(value, &endptr);
+ v = (NETDATA_DOUBLE) str2ndd_encoded(value, &endptr);
if (unlikely(endptr && *endptr)) {
if (endptr == value)
error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' cannot be parsed as a number",
@@ -1162,13 +1162,13 @@ PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, void *user) {
pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_REPLAY_BEGIN);
if(start_time_str && end_time_str) {
- time_t start_time = (time_t)str2ul(start_time_str);
- time_t end_time = (time_t)str2ul(end_time_str);
+ time_t start_time = (time_t) str2ull_encoded(start_time_str);
+ time_t end_time = (time_t) str2ull_encoded(end_time_str);
time_t wall_clock_time = 0, tolerance;
bool wall_clock_comes_from_child; (void)wall_clock_comes_from_child;
if(child_now_str) {
- wall_clock_time = (time_t)str2ul(child_now_str);
+ wall_clock_time = (time_t) str2ull_encoded(child_now_str);
tolerance = st->update_every + 1;
wall_clock_comes_from_child = true;
}
@@ -1314,7 +1314,7 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
RRDDIM_FLAGS rd_flags = rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED);
if(!(rd_flags & RRDDIM_FLAG_ARCHIVED)) {
- NETDATA_DOUBLE value = strtondd(value_str, NULL);
+ NETDATA_DOUBLE value = str2ndd_encoded(value_str, NULL);
SN_FLAGS flags = pluginsd_parse_storage_number_flags(flags_str);
if (!netdata_double_isnumber(value) || (flags == SN_EMPTY_SLOT)) {
@@ -1358,15 +1358,15 @@ PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words
if(!rd) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
usec_t dim_last_collected_ut = (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec;
- usec_t last_collected_ut = last_collected_ut_str ? str2ull(last_collected_ut_str) : 0;
+ usec_t last_collected_ut = last_collected_ut_str ? str2ull_encoded(last_collected_ut_str) : 0;
if(last_collected_ut > dim_last_collected_ut) {
rd->last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC);
rd->last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC);
}
- rd->last_collected_value = last_collected_value_str ? str2ll(last_collected_value_str, NULL) : 0;
- rd->last_calculated_value = last_calculated_value_str ? str2ndd(last_calculated_value_str, NULL) : 0;
- rd->last_stored_value = last_stored_value_str ? str2ndd(last_stored_value_str, NULL) : 0.0;
+ rd->last_collected_value = last_collected_value_str ? str2ll_encoded(last_collected_value_str) : 0;
+ rd->last_calculated_value = last_calculated_value_str ? str2ndd_encoded(last_calculated_value_str, NULL) : 0;
+ rd->last_stored_value = last_stored_value_str ? str2ndd_encoded(last_stored_value_str, NULL) : 0.0;
return PARSER_RC_OK;
}
@@ -1386,14 +1386,14 @@ PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words
if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
usec_t chart_last_collected_ut = (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec;
- usec_t last_collected_ut = last_collected_ut_str ? str2ull(last_collected_ut_str) : 0;
+ usec_t last_collected_ut = last_collected_ut_str ? str2ull_encoded(last_collected_ut_str) : 0;
if(last_collected_ut > chart_last_collected_ut) {
st->last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC);
st->last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC);
}
usec_t chart_last_updated_ut = (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec;
- usec_t last_updated_ut = last_updated_ut_str ? str2ull(last_updated_ut_str) : 0;
+ usec_t last_updated_ut = last_updated_ut_str ? str2ull_encoded(last_updated_ut_str) : 0;
if(last_updated_ut > chart_last_updated_ut) {
st->last_updated.tv_sec = (time_t)(last_updated_ut / USEC_PER_SEC);
st->last_updated.tv_usec = (last_updated_ut % USEC_PER_SEC);
@@ -1420,16 +1420,17 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
const char *last_entry_requested_txt = get_word(words, num_words, 6);
const char *child_world_time_txt = get_word(words, num_words, 7); // optional
- time_t update_every_child = (time_t)str2ul(update_every_child_txt);
- time_t first_entry_child = (time_t)str2ul(first_entry_child_txt);
- time_t last_entry_child = (time_t)str2ul(last_entry_child_txt);
+ time_t update_every_child = (time_t) str2ull_encoded(update_every_child_txt);
+ time_t first_entry_child = (time_t) str2ull_encoded(first_entry_child_txt);
+ time_t last_entry_child = (time_t) str2ull_encoded(last_entry_child_txt);
bool start_streaming = (strcmp(start_streaming_txt, "true") == 0);
- time_t first_entry_requested = (time_t)str2ul(first_entry_requested_txt);
- time_t last_entry_requested = (time_t)str2ul(last_entry_requested_txt);
+ time_t first_entry_requested = (time_t) str2ull_encoded(first_entry_requested_txt);
+ time_t last_entry_requested = (time_t) str2ull_encoded(last_entry_requested_txt);
// the optional child world time
- time_t child_world_time = (child_world_time_txt && *child_world_time_txt) ? (time_t)str2ul(child_world_time_txt) : now_realtime_sec();
+ time_t child_world_time = (child_world_time_txt && *child_world_time_txt) ? (time_t) str2ull_encoded(
+ child_world_time_txt) : now_realtime_sec();
PARSER_USER_OBJECT *user_object = user;
@@ -1543,14 +1544,14 @@ PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, void *user) {
// ------------------------------------------------------------------------
// parse the parameters
- time_t update_every = (time_t)str2ull_hex_or_dec(update_every_str);
- time_t end_time = (time_t)str2ull_hex_or_dec(end_time_str);
+ time_t update_every = (time_t) str2ull_encoded(update_every_str);
+ time_t end_time = (time_t) str2ull_encoded(end_time_str);
time_t wall_clock_time;
if(likely(*wall_clock_time_str == '#'))
wall_clock_time = end_time;
else
- wall_clock_time = (time_t)str2ull_hex_or_dec(wall_clock_time_str);
+ wall_clock_time = (time_t) str2ull_encoded(wall_clock_time_str);
if (unlikely(update_every != st->update_every))
rrdset_set_update_every_s(st, update_every);
@@ -1577,6 +1578,10 @@ PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, void *user) {
u->v2.stream_buffer = rrdset_push_metric_initialize(u->st, wall_clock_time);
if(u->v2.stream_buffer.v2 && u->v2.stream_buffer.wb) {
+ // check if receiver and sender have the same number parsing capabilities
+ bool can_copy = stream_has_capability(u, STREAM_CAP_IEEE754) == stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754);
+ NUMBER_ENCODING encoding = stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
+
BUFFER *wb = u->v2.stream_buffer.wb;
buffer_need_bytes(wb, 1024);
@@ -1587,11 +1592,26 @@ PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, void *user) {
buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2 " '", sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1 + 2);
buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
buffer_fast_strcat(wb, "' ", 2);
- buffer_strcat(wb, update_every_str);
+
+ if(can_copy)
+ buffer_strcat(wb, update_every_str);
+ else
+ buffer_print_uint64_encoded(wb, encoding, update_every);
+
buffer_fast_strcat(wb, " ", 1);
- buffer_strcat(wb, end_time_str);
+
+ if(can_copy)
+ buffer_strcat(wb, end_time_str);
+ else
+ buffer_print_uint64_encoded(wb, encoding, end_time);
+
buffer_fast_strcat(wb, " ", 1);
- buffer_strcat(wb, wall_clock_time_str);
+
+ if(can_copy)
+ buffer_strcat(wb, wall_clock_time_str);
+ else
+ buffer_print_uint64_encoded(wb, encoding, wall_clock_time);
+
buffer_fast_strcat(wb, "\n", 1);
u->v2.stream_buffer.last_point_end_time_s = end_time;
@@ -1652,13 +1672,13 @@ PARSER_RC pluginsd_set_v2(char **words, size_t num_words, void *user) {
// ------------------------------------------------------------------------
// parse the parameters
- collected_number collected_value = (collected_number)str2ll_hex_or_dec(collected_str);
+ collected_number collected_value = (collected_number) str2ll_encoded(collected_str);
NETDATA_DOUBLE value;
if(*value_str == '#')
value = (NETDATA_DOUBLE)collected_value;
else
- value = strtondd(value_str, NULL);
+ value = str2ndd_encoded(value_str, NULL);
SN_FLAGS flags = pluginsd_parse_storage_number_flags(flags_str);
@@ -1689,14 +1709,25 @@ PARSER_RC pluginsd_set_v2(char **words, size_t num_words, void *user) {
// propagate it forward in v2
if(u->v2.stream_buffer.v2 && u->v2.stream_buffer.begin_v2_added && u->v2.stream_buffer.wb) {
+ // check if receiver and sender have the same number parsing capabilities
+ bool can_copy = stream_has_capability(u, STREAM_CAP_IEEE754) == stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754);
+ NUMBER_ENCODING integer_encoding = stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
+ NUMBER_ENCODING doubles_encoding = stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
+
BUFFER *wb = u->v2.stream_buffer.wb;
buffer_need_bytes(wb, 1024);
buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2 " '", sizeof(PLUGINSD_KEYWORD_SET_V2) - 1 + 2);
buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
buffer_fast_strcat(wb, "' ", 2);
- buffer_strcat(wb, collected_str);
+ if(can_copy)
+ buffer_strcat(wb, collected_str);
+ else
+ buffer_print_int64_encoded(wb, integer_encoding, collected_value); // original v2 had hex
buffer_fast_strcat(wb, " ", 1);
- buffer_strcat(wb, value_str);
+ if(can_copy)
+ buffer_strcat(wb, value_str);
+ else
+ buffer_print_netdata_double_encoded(wb, doubles_encoding, value); // original v2 had decimal
buffer_fast_strcat(wb, " ", 1);
buffer_print_sn_flags(wb, flags, true);
buffer_fast_strcat(wb, "\n", 1);
diff --git a/collectors/plugins.d/pluginsd_parser.h b/collectors/plugins.d/pluginsd_parser.h
index 57829ca349..1fdc23a0eb 100644
--- a/collectors/plugins.d/pluginsd_parser.h
+++ b/collectors/plugins.d/pluginsd_parser.h
@@ -22,6 +22,8 @@ typedef struct parser_user_object {
size_t data_collections_count;
int enabled;
+ STREAM_CAPABILITIES capabilities; // receiver capabilities
+
struct {
bool parsing_host;
uuid_t machine_guid;
@@ -44,7 +46,7 @@ typedef struct parser_user_object {
struct parser_user_object_v2 {
bool locked_data_collection;
- RRDSET_STREAM_BUFFER stream_buffer;
+ RRDSET_STREAM_BUFFER stream_buffer; // sender capabilities in this
time_t update_every;
time_t end_time;
time_t wall_clock_time;