summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-02-15 21:16:29 +0200
committerGitHub <noreply@github.com>2023-02-15 21:16:29 +0200
commitd2daa19bf53c9a8cb781c8e50a86b9961b0503a9 (patch)
tree8d8b744138c28e010a24456aee55447d31a719bd /streaming
parent37a918ae2bc996fc881ab60042ae5a8f434f4c52 (diff)
JSON internal API, IEEE754 base64/hex streaming, weights endpoint optimization (#14493)
* first work on standardizing json formatting * renamed old grouping to time_grouping and added group_by * add dummy functions to enable compilation * buffer json api work * jsonwrap opening with buffer_json_X() functions * cleanup * storage for quotes * optimize buffer printing for both numbers and strings * removed ; from define * contexts json generation using the new json functions * fix buffer overflow at unit test * weights endpoint using new json api * fixes to weights endpoint * check buffer overflow on all buffer functions * do synchronous queries for weights * buffer_flush() now resets json state too * content type typedef * print double values that are above the max 64-bit value * str2ndd() can now parse values above UINT64_MAX * faster number parsing by avoiding double calculations as much as possible * faster number parsing * faster hex parsing * accurate printing and parsing of double values, even for very large numbers that cannot fit in 64bit integers * full printing and parsing without using library functions - and related unit tests * added IEEE754 streaming capability to enable streaming of double values in hex * streaming and replication to transfer all values in hex * use our own str2ndd for set2 * remove subnormal check from ieee * base64 encoding for numbers, instead of hex * when increasing double precision, also make sure the fractional number printed is aligned to the wanted precision * str2ndd_encoded() parses all encoding formats, including integers * prevent uninitialized use * /api/v1/info using the new json API * Fix error when compiling with --disable-ml * Remove redundant 'buffer_unittest' declaration * Fix formatting * Fix formatting * Fix formatting * fix buffer unit test * apps.plugin using the new JSON API * make sure the metrics registry does not accept negative timestamps * do not allow pages with negative timestamps to be loaded from db files; do not accept pages with negative timestamps in the cache * Fix more formatting --------- Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'streaming')
-rw-r--r--streaming/receiver.c3
-rw-r--r--streaming/replication.c99
-rw-r--r--streaming/rrdpush.c37
-rw-r--r--streaming/rrdpush.h10
-rw-r--r--streaming/sender.c4
5 files changed, 81 insertions, 72 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 0eed7b0fb2..ce2b4869c7 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -339,7 +339,8 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
.host = rpt->host,
.opaque = rpt,
.cd = cd,
- .trust_durations = 1
+ .trust_durations = 1,
+ .capabilities = rpt->capabilities,
};
PARSER *parser = parser_init(&user, NULL, NULL, fd,
diff --git a/streaming/replication.c b/streaming/replication.c
index a1767c234b..5a2ffee53c 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -88,7 +88,7 @@ struct replication_query {
bool locked_data_collection;
bool execute;
bool interrupted;
- bool send_anomaly_bit;
+ STREAM_CAPABILITIES capabilities;
} query;
time_t wall_clock_time;
@@ -114,7 +114,7 @@ static struct replication_query *replication_query_prepare(
time_t query_before,
bool query_enable_streaming,
time_t wall_clock_time,
- bool send_anomaly_bit
+ STREAM_CAPABILITIES capabilities
) {
size_t dimensions = rrdset_number_of_dimensions(st);
struct replication_query *q = callocz(1, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension));
@@ -133,7 +133,7 @@ static struct replication_query *replication_query_prepare(
q->query.after = query_after;
q->query.before = query_before;
q->query.enable_streaming = query_enable_streaming;
- q->query.send_anomaly_bit = send_anomaly_bit;
+ q->query.capabilities = capabilities;
q->wall_clock_time = wall_clock_time;
@@ -212,29 +212,32 @@ static struct replication_query *replication_query_prepare(
return q;
}
-void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st) {
+static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st, STREAM_CAPABILITIES capabilities) {
+ NUMBER_ENCODING encoding = (capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
RRDDIM *rd;
- rrddim_foreach_read(rd, st) {
- if(!rd->exposed) continue;
-
- buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " '", sizeof(PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE) - 1 + 2);
- buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
- buffer_fast_strcat(wb, "' ", 2);
- buffer_print_llu(wb, (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec);
- buffer_fast_strcat(wb, " ", 1);
- buffer_print_ll(wb, rd->last_collected_value);
- buffer_fast_strcat(wb, " ", 1);
- buffer_rrd_value(wb, rd->last_calculated_value);
- buffer_fast_strcat(wb, " ", 1);
- buffer_rrd_value(wb, rd->last_stored_value);
- buffer_fast_strcat(wb, "\n", 1);
- }
+ rrddim_foreach_read(rd, st){
+ if (!rd->exposed) continue;
+
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " '",
+ sizeof(PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE) - 1 + 2);
+ buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
+ buffer_fast_strcat(wb, "' ", 2);
+ buffer_print_uint64_encoded(wb, encoding, (usec_t) rd->last_collected_time.tv_sec * USEC_PER_SEC +
+ (usec_t) rd->last_collected_time.tv_usec);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_int64_encoded(wb, encoding, rd->last_collected_value);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_netdata_double_encoded(wb, encoding, rd->last_calculated_value);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_netdata_double_encoded(wb, encoding, rd->last_stored_value);
+ buffer_fast_strcat(wb, "\n", 1);
+ }
rrddim_foreach_done(rd);
buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " ", sizeof(PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE) - 1 + 1);
- buffer_print_llu(wb, (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec);
+ buffer_print_uint64_encoded(wb, encoding, (usec_t) st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t) st->last_collected_time.tv_usec);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_llu(wb, (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec);
+ buffer_print_uint64_encoded(wb, encoding, (usec_t) st->last_updated.tv_sec * USEC_PER_SEC + (usec_t) st->last_updated.tv_usec);
buffer_fast_strcat(wb, "\n", 1);
}
@@ -242,7 +245,7 @@ static void replication_query_finalize(BUFFER *wb, struct replication_query *q,
size_t dimensions = q->dimensions;
if(wb && q->query.enable_streaming)
- replication_send_chart_collection_state(wb, q->st);
+ replication_send_chart_collection_state(wb, q->st, q->query.capabilities);
if(q->query.locked_data_collection) {
netdata_spinlock_unlock(&q->st->data_collection_lock);
@@ -304,6 +307,7 @@ static void replication_query_align_to_optimal_before(struct replication_query *
static bool replication_query_execute(BUFFER *wb, struct replication_query *q, size_t max_msg_size) {
replication_query_align_to_optimal_before(q);
+ NUMBER_ENCODING encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
time_t after = q->query.after;
time_t before = q->query.before;
size_t dimensions = q->dimensions;
@@ -436,11 +440,11 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
last_end_time_in_buffer = min_end_time;
buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' ", sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1 + 4);
- buffer_print_llu(wb, min_start_time);
+ buffer_print_uint64_encoded(wb, encoding, min_start_time);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_llu(wb, min_end_time);
+ buffer_print_uint64_encoded(wb, encoding, min_end_time);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_llu(wb, wall_clock_time);
+ buffer_print_uint64_encoded(wb, encoding, wall_clock_time);
buffer_fast_strcat(wb, "\n", 1);
// output the replay values for this time
@@ -456,9 +460,9 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"", sizeof(PLUGINSD_KEYWORD_REPLAY_SET) - 1 + 2);
buffer_fast_strcat(wb, rrddim_id(d->rd), string_strlen(d->rd->id));
buffer_fast_strcat(wb, "\" ", 2);
- buffer_rrd_value(wb, d->sp.sum);
+ buffer_print_netdata_double_encoded(wb, encoding, d->sp.sum);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_sn_flags(wb, d->sp.flags, q->query.send_anomaly_bit);
+ buffer_print_sn_flags(wb, d->sp.flags, q->query.capabilities & STREAM_CAP_INTERPOLATED);
buffer_fast_strcat(wb, "\n", 1);
points_generated++;
@@ -514,7 +518,7 @@ static struct replication_query *replication_response_prepare(
bool requested_enable_streaming,
time_t requested_after,
time_t requested_before,
- bool send_anomaly_bit
+ STREAM_CAPABILITIES capabilities
) {
time_t wall_clock_time = now_realtime_sec();
@@ -576,7 +580,7 @@ static struct replication_query *replication_response_prepare(
db_first_entry, db_last_entry,
requested_after, requested_before, requested_enable_streaming,
query_after, query_before, query_enable_streaming,
- wall_clock_time, send_anomaly_bit);
+ wall_clock_time, capabilities);
}
void replication_response_cancel_and_finalize(struct replication_query *q) {
@@ -586,6 +590,7 @@ void replication_response_cancel_and_finalize(struct replication_query *q) {
static bool sender_is_still_connected_for_this_request(struct replication_request *rq);
bool replication_response_execute_and_finalize(struct replication_query *q, size_t max_msg_size) {
+ NUMBER_ENCODING encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
struct replication_request *rq = q->rq;
RRDSET *st = q->st;
RRDHOST *host = st->rrdhost;
@@ -624,37 +629,21 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size
// last end time of the data we sent
buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_END " ", sizeof(PLUGINSD_KEYWORD_REPLAY_END) - 1 + 1);
- buffer_print_ll(wb, st->update_every);
+ buffer_print_int64_encoded(wb, encoding, st->update_every);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_llu(wb, db_first_entry);
+ buffer_print_uint64_encoded(wb, encoding, db_first_entry);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_llu(wb, db_last_entry);
+ buffer_print_uint64_encoded(wb, encoding, db_last_entry);
+
buffer_fast_strcat(wb, enable_streaming ? " true " : " false ", 7);
- buffer_print_llu(wb, after);
+
+ buffer_print_uint64_encoded(wb, encoding, after);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_llu(wb, before);
+ buffer_print_uint64_encoded(wb, encoding, before);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_llu(wb, wall_clock_time);
+ buffer_print_uint64_encoded(wb, encoding, wall_clock_time);
buffer_fast_strcat(wb, "\n", 1);
-// buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_END " %d %llu %llu %s %llu %llu %llu\n",
-//
-// // current chart update every
-// (int)st->update_every
-//
-// // child first db time, child end db time
-// , (unsigned long long)db_first_entry, (unsigned long long)db_last_entry
-//
-// // start streaming boolean
-// , enable_streaming ? "true" : "false"
-//
-// // after requested, before requested ('before' can be altered by the child when the request had enable_streaming true)
-// , (unsigned long long)after, (unsigned long long)before
-//
-// // child world clock time
-// , (unsigned long long)wall_clock_time
-// );
-
worker_is_busy(WORKER_JOB_BUFFER_COMMIT);
sender_commit(host->sender, wb);
worker_is_busy(WORKER_JOB_CLEANUP);
@@ -1428,7 +1417,7 @@ static bool replication_execute_request(struct replication_request *rq, bool wor
rq->start_streaming,
rq->after,
rq->before,
- stream_has_capability(rq->sender, STREAM_CAP_INTERPOLATED));
+ rq->sender->capabilities);
}
if(likely(workers))
@@ -1712,7 +1701,7 @@ static int replication_execute_next_pending_request(bool cancel) {
rq->start_streaming,
rq->after,
rq->before,
- stream_has_capability(rq->sender, STREAM_CAP_INTERPOLATED));
+ rq->sender->capabilities);
}
rq->executed = false;
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 0fbe80257d..d8ad802578 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -68,6 +68,23 @@ static void load_stream_conf() {
freez(filename);
}
+STREAM_CAPABILITIES stream_our_capabilities() {
+ return STREAM_CAP_V1 |
+ STREAM_CAP_V2 |
+ STREAM_CAP_VN |
+ STREAM_CAP_VCAPS |
+ STREAM_CAP_HLABELS |
+ STREAM_CAP_CLAIM |
+ STREAM_CAP_CLABELS |
+ STREAM_CAP_FUNCTIONS |
+ STREAM_CAP_REPLICATION |
+ STREAM_CAP_BINARY |
+ STREAM_CAP_INTERPOLATED |
+ STREAM_HAS_COMPRESSION |
+ (ieee754_doubles ? STREAM_CAP_IEEE754 : 0) |
+ 0;
+}
+
bool rrdpush_receiver_needs_dbengine() {
struct section *co;
@@ -326,7 +343,7 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta
buffer_fast_strcat(wb, "\" ", 2);
if(st->last_collected_time.tv_sec > st->upstream_resync_time_s)
- buffer_print_llu(wb, st->usec_since_last_update);
+ buffer_print_uint64(wb, st->usec_since_last_update);
else
buffer_fast_strcat(wb, "0", 1);
@@ -341,7 +358,7 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta
buffer_fast_strcat(wb, "SET \"", 5);
buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
buffer_fast_strcat(wb, "\" = ", 4);
- buffer_print_ll(wb, rd->collected_value);
+ buffer_print_int64(wb, rd->collected_value);
buffer_fast_strcat(wb, "\n", 1);
}
else {
@@ -386,6 +403,8 @@ void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_
if(!rsb->wb || !rsb->v2 || !netdata_double_isnumber(n) || !does_storage_number_exist(flags))
return;
+ NUMBER_ENCODING integer_encoding = stream_has_capability(rsb, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
+ NUMBER_ENCODING doubles_encoding = stream_has_capability(rsb, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
BUFFER *wb = rsb->wb;
time_t point_end_time_s = (time_t)(point_end_time_ut / USEC_PER_SEC);
if(unlikely(rsb->last_point_end_time_s != point_end_time_s)) {
@@ -396,14 +415,14 @@ void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_
buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2 " '", sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1 + 2);
buffer_fast_strcat(wb, rrdset_id(rd->rrdset), string_strlen(rd->rrdset->id));
buffer_fast_strcat(wb, "' ", 2);
- buffer_print_llu_hex(wb, rd->rrdset->update_every);
+ buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdset->update_every);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_llu_hex(wb, point_end_time_s);
+ buffer_print_uint64_encoded(wb, integer_encoding, point_end_time_s);
buffer_fast_strcat(wb, " ", 1);
if(point_end_time_s == rsb->wall_clock_time)
buffer_fast_strcat(wb, "#", 1);
else
- buffer_print_llu_hex(wb, rsb->wall_clock_time);
+ buffer_print_uint64_encoded(wb, integer_encoding, rsb->wall_clock_time);
buffer_fast_strcat(wb, "\n", 1);
rsb->last_point_end_time_s = point_end_time_s;
@@ -413,13 +432,13 @@ void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_
buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2 " '", sizeof(PLUGINSD_KEYWORD_SET_V2) - 1 + 2);
buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
buffer_fast_strcat(wb, "' ", 2);
- buffer_print_ll_hex(wb, rd->last_collected_value);
+ buffer_print_int64_encoded(wb, integer_encoding, rd->last_collected_value);
buffer_fast_strcat(wb, " ", 1);
if((NETDATA_DOUBLE)rd->last_collected_value == n)
buffer_fast_strcat(wb, "#", 1);
else
- buffer_rrd_value(wb, n);
+ buffer_print_netdata_double_encoded(wb, doubles_encoding, n);
buffer_fast_strcat(wb, " ", 1);
buffer_print_sn_flags(wb, flags, true);
@@ -484,6 +503,7 @@ RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock
return (RRDSET_STREAM_BUFFER) { .wb = NULL, };
return (RRDSET_STREAM_BUFFER) {
+ .capabilities = host->sender->capabilities,
.v2 = stream_has_capability(host->sender, STREAM_CAP_INTERPOLATED),
.rrdset_flags = rrdset_flags,
.wb = sender_start(host->sender),
@@ -1148,6 +1168,7 @@ static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps)
if(caps & STREAM_CAP_REPLICATION) buffer_strcat(wb, "REPLICATION ");
if(caps & STREAM_CAP_BINARY) buffer_strcat(wb, "BINARY ");
if(caps & STREAM_CAP_INTERPOLATED) buffer_strcat(wb, "INTERPOLATED ");
+ if(caps & STREAM_CAP_IEEE754) buffer_strcat(wb, "IEEE754 ");
}
void log_receiver_capabilities(struct receiver_state *rpt) {
@@ -1189,7 +1210,7 @@ STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version) {
if(caps & STREAM_CAP_V2)
caps &= ~(STREAM_CAP_V1);
- return caps & STREAM_OUR_CAPABILITIES;
+ return caps & stream_our_capabilities();
}
int32_t stream_capabilities_to_vn(uint32_t caps) {
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index 50e2359564..a775b94992 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -42,6 +42,7 @@ typedef enum {
STREAM_CAP_REPLICATION = (1 << 12), // replication supported
STREAM_CAP_BINARY = (1 << 13), // streaming supports binary data
STREAM_CAP_INTERPOLATED = (1 << 14), // streaming supports interpolated streaming of values
+ STREAM_CAP_IEEE754 = (1 << 15), // streaming supports binary/hex transfer of double values
STREAM_CAP_INVALID = (1 << 30), // used as an invalid value for capabilities when this is set
// this must be signed int, so don't use the last bit
@@ -54,13 +55,9 @@ typedef enum {
#define STREAM_HAS_COMPRESSION 0
#endif // ENABLE_COMPRESSION
-#define STREAM_OUR_CAPABILITIES ( \
- STREAM_CAP_V1 | STREAM_CAP_V2 | STREAM_CAP_VN | STREAM_CAP_VCAPS | \
- STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | \
- STREAM_HAS_COMPRESSION | STREAM_CAP_FUNCTIONS | STREAM_CAP_REPLICATION | STREAM_CAP_BINARY | \
- STREAM_CAP_INTERPOLATED)
+STREAM_CAPABILITIES stream_our_capabilities();
-#define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability)))
+#define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability)) == (capability))
// ----------------------------------------------------------------------------
// stream handshake
@@ -307,6 +304,7 @@ bool rrdpush_receiver_needs_dbengine();
int configured_as_parent();
typedef struct rrdset_stream_buffer {
+ STREAM_CAPABILITIES capabilities;
bool v2;
bool begin_v2_added;
time_t wall_clock_time;
diff --git a/streaming/sender.c b/streaming/sender.c
index 1bae576e34..51c373f636 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -529,7 +529,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
#endif
// reset our capabilities to default
- s->capabilities = STREAM_OUR_CAPABILITIES;
+ s->capabilities = stream_our_capabilities();
#ifdef ENABLE_COMPRESSION
// If we don't want compression, remove it from our capabilities
@@ -901,7 +901,7 @@ void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) {
pluginsd_function_result_begin_to_buffer(wb
, string2str(tmp->transaction)
, code
- , functions_content_type_to_format(func_wb->contenttype)
+ , functions_content_type_to_format(func_wb->content_type)
, func_wb->expires);
buffer_fast_strcat(wb, buffer_tostring(func_wb), buffer_strlen(func_wb));