summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-02-07 22:26:16 +0200
committerGitHub <noreply@github.com>2023-02-07 22:26:16 +0200
commit8d3c3356ddeb6d62fa76b197e086e3e7fc5eb3dd (patch)
treee7661d49d0a0044cf1a5f1d3e0e6cc7dbc27f7a6 /streaming
parent12d92fe308f4107f67149ec9105b69ce2610a4f2 (diff)
Streaming interpolated values (#14431)
* first commit - untested * fix wrong begin command * added set v2 too * debug to log stream buffer * debug to log stream buffer * faster streaming printing * mark charts and dimensions as collected * use stream points even if sender is not enabled * comment out stream debug log * parse null as nan * custom begin v2 * custom set v2; replication now copies the anomalous flag too * custom end v2 * enabled stream log test * renamed to BEGIN2, SET2, END2 * dont mix up replay and v2 members in user object * fix typo * cleanup * support to v2 to v1 proxying * mark updated dimensions as such * do not log unknown flags * comment out stream debug log * send also the chart id on BEGIN2, v2 to v2 * update the data collections counter * v2 values are transferred in hex * faster hex parsing * a little more generic hex and dec printing and parsing * fix hex parsing * minor optimization in dbengine api * turn debugging into info message * generalized the timings tracking, so that it can be used in more places * commented out debug info * renamed conflicting variable with macro * remove wrong edits * integrated ML and added cleanup in case parsing is interrupted * disable data collection locking during v2 * cleanup stale ML locks; send updated chart variables during v2; add info to find stale locks * inject an END2 between repeated BEGIN2 from rrdset_done() * test: remove lockless single-threaded logic from dictionary and aral and apply the right acquire/release memory order to reference counters * more fine grained dictionary atomics * remove unecessary return values * pointer validation under NETDATA_DICTIONARY_VALIDATE_POINTERS * Revert "pointer validation under NETDATA_DICTIONARY_VALIDATE_POINTERS" This reverts commit 846cdf2713e2a7ee2ff797f38db11714228800e9. * Revert "remove unecessary return values" This reverts commit 8c87d30f4d86f0f5d6b4562cf74fe7447138bbff. * Revert "more fine grained dictionary atomics" This reverts commit 984aec4234a340d197d45239ff9a10fd479fcf3c. * Revert "test: remove lockless single-threaded logic from dictionary and aral and apply the right acquire/release memory order to reference counters" This reverts commit c460b3d0ad497d2641bd0ea1d63cec7c052e74e4. * Apply again "pointer validation under NETDATA_DICTIONARY_VALIDATE_POINTERS" while keeping the improved atomic operations. This reverts commit f158d009 * fix last commit * fix last commit again * optimizations in dbengine * do not send anomaly bit on non-supporting agents (send it when the INTERPOLATED capability is available) * break long empty-points-loops in rrdset_done() * decide page alignment on new page allocation, not on every point collected * create max size pages but no smaller than 1/3 * Fix compilation when --disable-ml is specified * Return false * fixes for NETDATA_LOG_REPLICATION_REQUESTS * added compile option NETDATA_WITHOUT_WORKERS_LATENCY * put timings in BEGIN2, SET2, END2 * isolate begin2 ml * revert repositioning data collection lock * fixed multi-threading of statistics * do not lookup dimensions all the time if they come in the same order * update used on iteration, not on every points; also do better error handling --------- Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'streaming')
-rw-r--r--streaming/receiver.c8
-rw-r--r--streaming/replication.c157
-rw-r--r--streaming/rrdpush.c90
-rw-r--r--streaming/rrdpush.h20
-rw-r--r--streaming/sender.c8
5 files changed, 219 insertions, 64 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 95652942e9..9378d2d825 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -332,6 +332,10 @@ static void streaming_parser_thread_cleanup(void *ptr) {
bool plugin_is_enabled(struct plugind *cd);
+void streaming_parser_cleanup(void *user) {
+ pluginsd_cleanup_v2(user);
+}
+
static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, int fd, void *ssl) {
size_t result;
@@ -343,7 +347,7 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
.trust_durations = 1
};
- PARSER *parser = parser_init(rpt->host, &user, NULL, NULL, fd, PARSER_INPUT_SPLIT, ssl);
+ PARSER *parser = parser_init(rpt->host, &user, streaming_parser_cleanup, NULL, NULL, fd, PARSER_INPUT_SPLIT, ssl);
rrd_collector_started();
@@ -416,7 +420,7 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
}
done:
- result = user.count;
+ result = user.data_collections_count;
// free parser with the pop function
netdata_thread_cleanup_pop(1);
diff --git a/streaming/replication.c b/streaming/replication.c
index 7c1f16b4c7..2b96dc6cb0 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -88,6 +88,7 @@ struct replication_query {
bool locked_data_collection;
bool execute;
bool interrupted;
+ bool send_anomaly_bit;
} query;
time_t wall_clock_time;
@@ -112,7 +113,8 @@ static struct replication_query *replication_query_prepare(
time_t query_after,
time_t query_before,
bool query_enable_streaming,
- time_t wall_clock_time
+ time_t wall_clock_time,
+ bool send_anomaly_bit
) {
size_t dimensions = rrdset_number_of_dimensions(st);
struct replication_query *q = callocz(1, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension));
@@ -131,6 +133,7 @@ static struct replication_query *replication_query_prepare(
q->query.after = query_after;
q->query.before = query_before;
q->query.enable_streaming = query_enable_streaming;
+ q->query.send_anomaly_bit = send_anomaly_bit;
q->wall_clock_time = wall_clock_time;
@@ -209,25 +212,30 @@ static struct replication_query *replication_query_prepare(
return q;
}
-static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st) {
+void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st) {
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
if(!rd->exposed) continue;
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " \"%s\" %llu %lld " NETDATA_DOUBLE_FORMAT " " NETDATA_DOUBLE_FORMAT "\n",
- rrddim_id(rd),
- (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec,
- rd->last_collected_value,
- rd->last_calculated_value,
- rd->last_stored_value
- );
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " '", sizeof(PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE) - 1 + 2);
+ buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
+ buffer_fast_strcat(wb, "' ", 2);
+ buffer_print_llu(wb, (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_ll(wb, rd->last_collected_value);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_rrd_value(wb, rd->last_calculated_value);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_rrd_value(wb, rd->last_stored_value);
+ buffer_fast_strcat(wb, "\n", 1);
}
rrddim_foreach_done(rd);
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " %llu %llu\n",
- (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec,
- (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec
- );
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " ", sizeof(PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE) - 1 + 1);
+ buffer_print_llu(wb, (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_llu(wb, (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec);
+ buffer_fast_strcat(wb, "\n", 1);
}
static void replication_query_finalize(BUFFER *wb, struct replication_query *q, bool executed) {
@@ -328,9 +336,10 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
error_limit_static_global_var(erl, 1, 0);
error_limit(&erl,
- "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query beyond time %llu (tried 1000 times to get the next point and always got back a point in the past)",
- rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), rrddim_id(d->rd),
- (unsigned long long) now);
+ "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query "
+ "beyond time %llu (tried 1000 times to get the next point and always got back a point in the past)",
+ rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), rrddim_id(d->rd),
+ (unsigned long long) now);
continue;
}
@@ -374,9 +383,10 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
else
fix_min_start_time = min_end_time - min_update_every;
+#ifdef NETDATA_INTERNAL_CHECKS
error_limit_static_global_var(erl, 1, 0);
error_limit(&erl, "REPLAY WARNING: 'host:%s/chart:%s' "
- "misaligned dimensions "
+ "misaligned dimensions, "
"update every (min: %ld, max: %ld), "
"start time (min: %ld, max: %ld), "
"end time (min %ld, max %ld), "
@@ -389,6 +399,7 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
now, last_end_time_in_buffer,
fix_min_start_time
);
+#endif
min_start_time = fix_min_start_time;
}
@@ -410,7 +421,8 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
q->query.before = last_end_time_in_buffer;
q->query.enable_streaming = false;
- internal_error(true, "REPLICATION: buffer size %zu is more than the max message size %zu for chart '%s' of host '%s'. "
+ internal_error(true, "REPLICATION: current buffer size %zu is more than the "
+ "max message size %zu for chart '%s' of host '%s'. "
"Interrupting replication request (%ld to %ld, %s) at %ld to %ld, %s.",
buffer_strlen(wb), max_msg_size, rrdset_id(q->st), rrdhost_hostname(q->st->rrdhost),
q->request.after, q->request.before, q->request.enable_streaming?"true":"false",
@@ -422,11 +434,13 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
}
last_end_time_in_buffer = min_end_time;
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' %llu %llu %llu\n",
- (unsigned long long) min_start_time,
- (unsigned long long) min_end_time,
- (unsigned long long) wall_clock_time
- );
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' ", sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1 + 4);
+ buffer_print_llu(wb, min_start_time);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_llu(wb, min_end_time);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_llu(wb, wall_clock_time);
+ buffer_fast_strcat(wb, "\n", 1);
// output the replay values for this time
for (size_t i = 0; i < dimensions; i++) {
@@ -438,8 +452,13 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
!storage_point_is_unset(d->sp) &&
!storage_point_is_gap(d->sp))) {
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" " NETDATA_DOUBLE_FORMAT " \"%s\"\n",
- rrddim_id(d->rd), d->sp.sum, d->sp.flags & SN_FLAG_RESET ? "R" : "");
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"", sizeof(PLUGINSD_KEYWORD_REPLAY_SET) - 1 + 2);
+ buffer_fast_strcat(wb, rrddim_id(d->rd), string_strlen(d->rd->id));
+ buffer_fast_strcat(wb, "\" ", 2);
+ buffer_rrd_value(wb, d->sp.sum);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_sn_flags(wb, d->sp.flags, q->query.send_anomaly_bit);
+ buffer_fast_strcat(wb, "\n", 1);
points_generated++;
}
@@ -462,14 +481,14 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
log_date(actual_before_buf, LOG_DATE_LENGTH, actual_before);
internal_error(true,
"STREAM_SENDER REPLAY: 'host:%s/chart:%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st),
(unsigned long long)actual_after, actual_after_buf, (unsigned long long)actual_before, actual_before_buf,
(unsigned long long)after, (long long)(actual_after - after), (unsigned long long)before, (long long)(actual_before - before));
}
else
internal_error(true,
"STREAM_SENDER REPLAY: 'host:%s/chart:%s': nothing to send (requested %llu to %llu)",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st),
(unsigned long long)after, (unsigned long long)before);
#endif // NETDATA_LOG_REPLICATION_REQUESTS
@@ -483,7 +502,13 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
return finished_with_gap;
}
-static struct replication_query *replication_response_prepare(RRDSET *st, bool requested_enable_streaming, time_t requested_after, time_t requested_before) {
+static struct replication_query *replication_response_prepare(
+ RRDSET *st,
+ bool requested_enable_streaming,
+ time_t requested_after,
+ time_t requested_before,
+ bool send_anomaly_bit
+ ) {
time_t wall_clock_time = now_realtime_sec();
if(requested_after > requested_before) {
@@ -509,7 +534,8 @@ static struct replication_query *replication_response_prepare(RRDSET *st, bool r
bool query_enable_streaming = requested_enable_streaming;
time_t db_first_entry = 0, db_last_entry = 0;
- rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_entry, &db_last_entry, wall_clock_time, 0);
+ rrdset_get_retention_of_tier_for_collected_chart(
+ st, &db_first_entry, &db_last_entry, wall_clock_time, 0);
if(requested_after == 0 && requested_before == 0 && requested_enable_streaming == true) {
// no data requested - just enable streaming
@@ -543,7 +569,7 @@ static struct replication_query *replication_response_prepare(RRDSET *st, bool r
db_first_entry, db_last_entry,
requested_after, requested_before, requested_enable_streaming,
query_after, query_before, query_enable_streaming,
- wall_clock_time);
+ wall_clock_time, send_anomaly_bit);
}
void replication_response_cancel_and_finalize(struct replication_query *q) {
@@ -562,7 +588,11 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size
// holding the host's buffer lock for too long
BUFFER *wb = sender_start(host->sender);
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st));
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '", sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1 + 2);
+ buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
+ buffer_fast_strcat(wb, "'\n", 2);
+
+// buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st));
bool locked_data_collection = q->query.locked_data_collection;
q->query.locked_data_collection = false;
@@ -585,23 +615,38 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size
// end with first/last entries we have, and the first start time and
// last end time of the data we sent
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_END " %d %llu %llu %s %llu %llu %llu\n",
-
- // current chart update every
- (int)st->update_every
- // child first db time, child end db time
- , (unsigned long long)db_first_entry, (unsigned long long)db_last_entry
-
- // start streaming boolean
- , enable_streaming ? "true" : "false"
-
- // after requested, before requested ('before' can be altered by the child when the request had enable_streaming true)
- , (unsigned long long)after, (unsigned long long)before
-
- // child world clock time
- , (unsigned long long)wall_clock_time
- );
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_END " ", sizeof(PLUGINSD_KEYWORD_REPLAY_END) - 1 + 1);
+ buffer_print_ll(wb, st->update_every);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_llu(wb, db_first_entry);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_llu(wb, db_last_entry);
+ buffer_fast_strcat(wb, enable_streaming ? " true " : " false ", 7);
+ buffer_print_llu(wb, after);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_llu(wb, before);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_llu(wb, wall_clock_time);
+ buffer_fast_strcat(wb, "\n", 1);
+
+// buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_END " %d %llu %llu %s %llu %llu %llu\n",
+//
+// // current chart update every
+// (int)st->update_every
+//
+// // child first db time, child end db time
+// , (unsigned long long)db_first_entry, (unsigned long long)db_last_entry
+//
+// // start streaming boolean
+// , enable_streaming ? "true" : "false"
+//
+// // after requested, before requested ('before' can be altered by the child when the request had enable_streaming true)
+// , (unsigned long long)after, (unsigned long long)before
+//
+// // child world clock time
+// , (unsigned long long)wall_clock_time
+// );
worker_is_busy(WORKER_JOB_BUFFER_COMMIT);
sender_commit(host->sender, wb);
@@ -733,9 +778,9 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c
, msg
, r->last_request.after, r->last_request.before
, r->child_db.first_entry_t, r->child_db.last_entry_t
- , r->child_db.world_time_t, (r->child_db.world_time_t == r->local_db.now) ? "SAME" : (r->child_db.world_time_t < r->local_db.now) ? "BEHIND" : "AHEAD"
+ , r->child_db.wall_clock_time, (r->child_db.wall_clock_time == r->local_db.wall_clock_time) ? "SAME" : (r->child_db.wall_clock_time < r->local_db.wall_clock_time) ? "BEHIND" : "AHEAD"
, r->local_db.first_entry_t, r->local_db.last_entry_t
- , r->local_db.now
+ , r->local_db.wall_clock_time
, r->gap.from, r->gap.to
, (r->gap.from == r->wanted.after) ? "FULL" : "PARTIAL"
, (st->replay.after != 0 || st->replay.before != 0) ? "OVERLAPPING" : ""
@@ -1371,7 +1416,12 @@ static bool replication_execute_request(struct replication_request *rq, bool wor
if(likely(workers))
worker_is_busy(WORKER_JOB_PREPARE_QUERY);
- rq->q = replication_response_prepare(rq->st, rq->start_streaming, rq->after, rq->before);
+ rq->q = replication_response_prepare(
+ rq->st,
+ rq->start_streaming,
+ rq->after,
+ rq->before,
+ stream_has_capability(rq->sender, STREAM_CAP_INTERPOLATED));
}
if(likely(workers))
@@ -1650,7 +1700,12 @@ static int replication_execute_next_pending_request(bool cancel) {
if (rq->st && !rq->q) {
worker_is_busy(WORKER_JOB_PREPARE_QUERY);
- rq->q = replication_response_prepare(rq->st, rq->start_streaming, rq->after, rq->before);
+ rq->q = replication_response_prepare(
+ rq->st,
+ rq->start_streaming,
+ rq->after,
+ rq->before,
+ stream_has_capability(rq->sender, STREAM_CAP_INTERPOLATED));
}
rq->executed = false;
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 256fa82821..3ba9a1da84 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -378,7 +378,72 @@ bool rrdset_push_chart_definition_now(RRDSET *st) {
return true;
}
-void rrdset_done_push(RRDSET *st) {
+void rrdset_push_metrics_v1(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) {
+ RRDHOST *host = st->rrdhost;
+ rrdpush_send_chart_metrics(rsb->wb, st, host->sender, rsb->rrdset_flags);
+}
+
+void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags) {
+ if(!rsb->wb || !rsb->v2 || !netdata_double_isnumber(n) || !does_storage_number_exist(flags))
+ return;
+
+ BUFFER *wb = rsb->wb;
+ time_t point_end_time_s = (time_t)(point_end_time_ut / USEC_PER_SEC);
+ if(unlikely(rsb->last_point_end_time_s != point_end_time_s)) {
+
+ if(unlikely(rsb->begin_v2_added))
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
+
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2 " '", sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1 + 2);
+ buffer_fast_strcat(wb, rrdset_id(rd->rrdset), string_strlen(rd->rrdset->id));
+ buffer_fast_strcat(wb, "' ", 2);
+ buffer_print_llu_hex(wb, rd->rrdset->update_every);
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_llu_hex(wb, point_end_time_s);
+ buffer_fast_strcat(wb, " ", 1);
+ if(point_end_time_s == rsb->wall_clock_time)
+ buffer_fast_strcat(wb, "#", 1);
+ else
+ buffer_print_llu_hex(wb, rsb->wall_clock_time);
+ buffer_fast_strcat(wb, "\n", 1);
+
+ rsb->last_point_end_time_s = point_end_time_s;
+ rsb->begin_v2_added = true;
+ }
+
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2 " '", sizeof(PLUGINSD_KEYWORD_SET_V2) - 1 + 2);
+ buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
+ buffer_fast_strcat(wb, "' ", 2);
+ buffer_print_ll_hex(wb, rd->last_collected_value);
+ buffer_fast_strcat(wb, " ", 1);
+
+ if((NETDATA_DOUBLE)rd->last_collected_value == n)
+ buffer_fast_strcat(wb, "#", 1);
+ else
+ buffer_rrd_value(wb, n);
+
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_sn_flags(wb, flags, true);
+ buffer_fast_strcat(wb, "\n", 1);
+}
+
+void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) {
+ if(!rsb->wb)
+ return;
+
+ if(rsb->v2 && rsb->begin_v2_added) {
+ if(unlikely(rsb->rrdset_flags & RRDSET_FLAG_UPSTREAM_SEND_VARIABLES))
+ rrdsetvar_print_to_streaming_custom_chart_variables(st, rsb->wb);
+
+ buffer_fast_strcat(rsb->wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
+ }
+
+ sender_commit(st->rrdhost->sender, rsb->wb);
+
+ *rsb = (RRDSET_STREAM_BUFFER){ .wb = NULL, };
+}
+
+RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time) {
RRDHOST *host = st->rrdhost;
// fetch the flags we need to check with one atomic operation
@@ -395,7 +460,7 @@ void rrdset_done_push(RRDSET *st) {
error("STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host));
}
- return;
+ return (RRDSET_STREAM_BUFFER) { .wb = NULL, };
}
else if(unlikely(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS)) {
info("STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host));
@@ -408,17 +473,23 @@ void rrdset_done_push(RRDSET *st) {
if(unlikely((exposed_upstream && replication_in_progress) ||
!should_send_chart_matching(st, rrdset_flags)))
- return;
-
- BUFFER *wb = sender_start(host->sender);
+ return (RRDSET_STREAM_BUFFER) { .wb = NULL, };
- if(unlikely(!exposed_upstream))
+ if(unlikely(!exposed_upstream)) {
+ BUFFER *wb = sender_start(host->sender);
replication_in_progress = rrdpush_send_chart_definition(wb, st);
+ sender_commit(host->sender, wb);
+ }
- if (likely(!replication_in_progress))
- rrdpush_send_chart_metrics(wb, st, host->sender, rrdset_flags);
+ if(replication_in_progress)
+ return (RRDSET_STREAM_BUFFER) { .wb = NULL, };
- sender_commit(host->sender, wb);
+ return (RRDSET_STREAM_BUFFER) {
+ .v2 = stream_has_capability(host->sender, STREAM_CAP_INTERPOLATED),
+ .rrdset_flags = rrdset_flags,
+ .wb = sender_start(host->sender),
+ .wall_clock_time = wall_clock_time,
+ };
}
// labels
@@ -1077,6 +1148,7 @@ static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps)
if(caps & STREAM_CAP_FUNCTIONS) buffer_strcat(wb, "FUNCTIONS ");
if(caps & STREAM_CAP_REPLICATION) buffer_strcat(wb, "REPLICATION ");
if(caps & STREAM_CAP_BINARY) buffer_strcat(wb, "BINARY ");
+ if(caps & STREAM_CAP_INTERPOLATED) buffer_strcat(wb, "INTERPOLATED ");
}
void log_receiver_capabilities(struct receiver_state *rpt) {
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index 94c1320e76..50e2359564 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -41,6 +41,7 @@ typedef enum {
STREAM_CAP_FUNCTIONS = (1 << 11), // plugin functions supported
STREAM_CAP_REPLICATION = (1 << 12), // replication supported
STREAM_CAP_BINARY = (1 << 13), // streaming supports binary data
+ STREAM_CAP_INTERPOLATED = (1 << 14), // streaming supports interpolated streaming of values
STREAM_CAP_INVALID = (1 << 30), // used as an invalid value for capabilities when this is set
// this must be signed int, so don't use the last bit
@@ -56,7 +57,8 @@ typedef enum {
#define STREAM_OUR_CAPABILITIES ( \
STREAM_CAP_V1 | STREAM_CAP_V2 | STREAM_CAP_VN | STREAM_CAP_VCAPS | \
STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | \
- STREAM_HAS_COMPRESSION | STREAM_CAP_FUNCTIONS | STREAM_CAP_REPLICATION | STREAM_CAP_BINARY )
+ STREAM_HAS_COMPRESSION | STREAM_CAP_FUNCTIONS | STREAM_CAP_REPLICATION | STREAM_CAP_BINARY | \
+ STREAM_CAP_INTERPOLATED)
#define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability)))
@@ -303,7 +305,21 @@ void sender_commit(struct sender_state *s, BUFFER *wb);
int rrdpush_init();
bool rrdpush_receiver_needs_dbengine();
int configured_as_parent();
-void rrdset_done_push(RRDSET *st);
+
+typedef struct rrdset_stream_buffer {
+ bool v2;
+ bool begin_v2_added;
+ time_t wall_clock_time;
+ uint64_t rrdset_flags; // RRDSET_FLAGS
+ time_t last_point_end_time_s;
+ BUFFER *wb;
+} RRDSET_STREAM_BUFFER;
+
+RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time);
+void rrdset_push_metrics_v1(RRDSET_STREAM_BUFFER *rsb, RRDSET *st);
+void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st);
+void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags);
+
bool rrdset_push_chart_definition_now(RRDSET *st);
void *rrdpush_sender_thread(void *ptr);
void rrdpush_send_host_labels(RRDHOST *host);
diff --git a/streaming/sender.c b/streaming/sender.c
index 854b57fc5a..d1ce9b0f86 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -104,6 +104,14 @@ void sender_commit(struct sender_state *s, BUFFER *wb) {
netdata_mutex_lock(&s->mutex);
+// FILE *fp = fopen("/tmp/stream.txt", "a");
+// fprintf(fp,
+// "\n--- SEND BEGIN: %s ----\n"
+// "%s"
+// "--- SEND END ----------------------------------------\n"
+// , rrdhost_hostname(s->host), src);
+// fclose(fp);
+
if(unlikely(s->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) {
info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.",
rrdhost_hostname(s->host), s->connected_to, s->buffer->max_size, buffer_strlen(wb) + 1, SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE);