summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
author Costa Tsaousis <costa@netdata.cloud> 2022-10-05 14:13:46 +0300
committer GitHub <noreply@github.com> 2022-10-05 14:13:46 +0300
commit 8fc3b351a2e7fc96eced8f924de2e9cec9842128 (patch)
tree bde41c66573ccaf8876c280e00742cc6096b587c /streaming
parent 6850878e697d66dc90b9af1e750b22238c63c292 (diff)
Allow netdata plugins to expose functions for querying more information about specific charts (#13720)
* function renames and code cleanup in popen.c; no actual code changes * netdata popen() now opens both child process stdin and stdout and returns FILE * for both * pass both input and output to parser structures * updated rrdset to call custom functions * RRDSET FUNCTION leading calls for both sync and async operation * put RRDSET functions to a separate file * added format and timeout at function definition * support for synchronous (internal plugins) and asynchronous (external plugins and children) functions * /api/v1/function endpoint * functions are now attached to the host and there is a dictionary view per chart * functions implemented at plugins.d * remove the defer until keyword hook from plugins.d when it is done * stream sender implementation of functions * sanitization of all functions so that only certain characters are allowed * stricter sanitization * common max size * 1st working plugins.d example * always init inflight dictionary * properly destroy dictionaries to avoid parallel insertion of items * add more debugging on disconnection reasons * add more debugging on disconnection reasons again * streaming receiver respects newlines * don't use the same fp for both streaming receive and send * don't free dbengine memory with internal checks * make sender proceed in the buffer * added timing info and garbage collection at plugins.d * added info about routing nodes * added info about routing nodes with delay * added more info about delays * added more info about delays again * signal sending thread to wake up * streaming version labeling and commented code to support capabilities * added functions to /api/v1/data, /api/v1/charts, /api/v1/chart, /api/v1/info * redirect top output to stdout * address coverity findings * fix resource leaks of popen * log attempts to connect to individual destinations * better messages * properly parse destinations * try to find a function from the most matching to the least matching * log added streaming destinations * 
rotate destinations bypassing a node in the middle that does not accept our connection * break the loops properly * use typedef to define callbacks * capabilities negotiation during streaming * functions exposed upstream based on capabilities; compression disabled per node persisting reconnects; always try to connect with all capabilities * restore functionality to lookup functions * better logging of capabilities * remove old versions from capabilities when a newer version is there * fix formatting * optimization for plugins.d rrdlabels to avoid creating and destructing dictionaries all the time * delayed health initialization for rrddim and rrdset * cleanup health initialization * fix for popen() not returning the right value * add health worker jobs for initializing rrdset and rrddim * added content type support for functions; apps.plugin permanent function to display all the processes * fixes for functions parameters parsing in apps.plugin * fix for process matching in apps.plugin * first working function for apps.plugin * Dashboard ACL is disabled for functions; Function errors are all in JSON format * apps.plugin function processes returns json table * use json_escape_string() to escape message * fix formatting * apps.plugin exposes all its metrics to function processes * fix json formatting when filtering out some rows * reopen the internal pipe of rrdpush in case of errors * misplaced statement * do not use buffer->len * support for GLOBAL functions (functions that are not linked to a chart) * added /api/v1/functions endpoint; removed format from the FUNCTIONS api; * swagger documentation about the new api end points * added plugins.d documentation about functions * never re-close a file * remove unnecessary ifdef * fixed issues identified by codacy * fix for null label value * make edit-config copy-and-paste friendly * Revert "make edit-config copy-and-paste friendly" This reverts commit 54500c0e0a97f65a0c66c4d34e966f6a9056698e. 
* reworked sender handshake to fix coverity findings * timeout is zero, for both send_timeout() and recv_timeout() * properly detect that parent closed the socket * support caching of function responses; limit function response to 10MB; added protection from malformed function responses * disabled excessive logging * added units to apps.plugin function processes and normalized all values to be human readable * shorter field names * fixed issues reported * fixed apps.plugin error response; tested that pluginsd can properly handle faulty responses * use double linked list macros for double linked list management * faster apps.plugin function printing by minimizing file operations * added memory percentage * fix compatibility issues with older compilers and FreeBSD * rrdpush sender code cleanup; rrdhost structure cleanup from sender flags and variables; * fix leftover variable in ifdef * apps.plugin: do not call detach from the thread; exit immediately when input is broken * exclude AR charts from health * flush cleaner; prefer sender output * clarity * do not fill the cbuffer if not connected * fix * don't enable host->sender if streaming is not enabled; send host label updates to parent; * functions are only available through ACLK * Prepared statement reports only in dev mode * fix AR chart detection * fix for streaming not enabling itself * more cleanup of sender and receiver structures * moved read-only flags and configuration options to rrdhost->options * fixed merge with master * fix for incomplete rename * prevent service thread from working on charts that are being collected Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'streaming')
-rw-r--r--streaming/compression.c12
-rw-r--r--streaming/receiver.c206
-rw-r--r--streaming/rrdpush.c376
-rw-r--r--streaming/rrdpush.h134
-rw-r--r--streaming/sender.c1018
5 files changed, 1108 insertions, 638 deletions
diff --git a/streaming/compression.c b/streaming/compression.c
index cae370817a..1fddc02b91 100644
--- a/streaming/compression.c
+++ b/streaming/compression.c
@@ -66,8 +66,8 @@ static size_t lz4_compressor_compress(struct compressor_state *state, const char
if(unlikely(!state || !size || !out))
return 0;
- if(unlikely(size > LZ4_MAX_MSG_SIZE)) {
- error("%s: Compression Failed - Message size %lu above compression buffer limit: %d", STREAM_COMPRESSION_MSG, (long unsigned int) size, LZ4_MAX_MSG_SIZE);
+ if(unlikely(size > COMPRESSION_MAX_MSG_SIZE)) {
+ error("%s: Compression Failed - Message size %lu above compression buffer limit: %d", STREAM_COMPRESSION_MSG, (long unsigned int)size, COMPRESSION_MAX_MSG_SIZE);
return 0;
}
@@ -103,7 +103,7 @@ static size_t lz4_compressor_compress(struct compressor_state *state, const char
// update the next writing position of the ring buffer
state->data->input_ring_buffer_pos += size;
- if(unlikely(state->data->input_ring_buffer_pos >= state->data->input_ring_buffer_size - LZ4_MAX_MSG_SIZE))
+ if(unlikely(state->data->input_ring_buffer_pos >= state->data->input_ring_buffer_size - COMPRESSION_MAX_MSG_SIZE))
state->data->input_ring_buffer_pos = 0;
// update the signature header
@@ -128,7 +128,7 @@ struct compressor_state *create_compressor()
state->data = callocz(1, sizeof(struct compressor_data));
state->data->stream = LZ4_createStream();
- state->data->input_ring_buffer_size = LZ4_DECODER_RING_BUFFER_SIZE(LZ4_MAX_MSG_SIZE * 2);
+ state->data->input_ring_buffer_size = LZ4_DECODER_RING_BUFFER_SIZE(COMPRESSION_MAX_MSG_SIZE * 2);
state->data->input_ring_buffer = callocz(1, state->data->input_ring_buffer_size);
state->compression_result_buffer_size = 0;
state->reset(state);
@@ -280,7 +280,7 @@ static size_t lz4_decompressor_decompress(struct decompressor_state *state)
state->out_buffer = state->data->stream_buffer + state->data->stream_buffer_pos;
state->data->stream_buffer_pos += decompressed_size;
- if (state->data->stream_buffer_pos >= state->data->stream_buffer_size - LZ4_MAX_MSG_SIZE)
+ if (state->data->stream_buffer_pos >= state->data->stream_buffer_size - COMPRESSION_MAX_MSG_SIZE)
state->data->stream_buffer_pos = 0;
state->out_buffer_len = decompressed_size;
state->out_buffer_pos = 0;
@@ -358,7 +358,7 @@ struct decompressor_state *create_decompressor()
state->data = callocz(1, sizeof(struct decompressor_data));
fatal_assert(state->data);
state->data->stream = LZ4_createStreamDecode();
- state->data->stream_buffer_size = LZ4_decoderRingBufferSize(LZ4_MAX_MSG_SIZE);
+ state->data->stream_buffer_size = LZ4_decoderRingBufferSize(COMPRESSION_MAX_MSG_SIZE);
state->data->stream_buffer = mallocz(state->data->stream_buffer_size);
fatal_assert(state->data->stream_buffer);
state->reset(state);
diff --git a/streaming/receiver.c b/streaming/receiver.c
index a2852981a2..6890f8b2d9 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -72,9 +72,8 @@ PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins
time_t remote_time = 0;
RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
struct plugind *cd = ((PARSER_USER_OBJECT *)user)->cd;
- if (cd->version < VERSION_GAP_FILLING ) {
- error("STREAM %s from %s: Child negotiated version %u but sent TIMESTAMP!", rrdhost_hostname(host), cd->cmd,
- cd->version);
+ if (!(cd->capabilities & STREAM_CAP_GAP_FILLING)) {
+ error("STREAM %s from %s: Child negotiated version %u but sent TIMESTAMP!", rrdhost_hostname(host), cd->cmd, cd->capabilities);
return PARSER_RC_OK; // Ignore error and continue stream
}
if (remote_time_txt && *remote_time_txt) {
@@ -111,8 +110,8 @@ PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins
(int64_t)remote_time);
int ret;
#ifdef ENABLE_HTTPS
- SSL *conn = host->stream_ssl.conn ;
- if(conn && !host->stream_ssl.flags) {
+ SSL *conn = host->receiver->ssl.conn ;
+ if(conn && !host->receiver->ssl.flags) {
ret = SSL_write(conn, message, strlen(message));
} else {
ret = send(host->receiver->fd, message, strlen(message), MSG_DONTWAIT);
@@ -292,8 +291,10 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {
size_t available = sizeof(r->read_buffer) - r->read_len;
if (available) {
size_t len = r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, available);
- if (!len)
+ if (!len) {
+ internal_error(true, "decompressor returned zero length");
return 1;
+ }
r->read_len += len;
}
@@ -301,8 +302,10 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {
}
int ret = 0;
- if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1, &ret))
+ if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1, &ret)) {
+ internal_error(true, "read_stream() failed (1).");
return 1;
+ }
worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, ret);
@@ -320,8 +323,10 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {
// we're unable to decompress incomplete block
char compressed[bytes_to_read];
do {
- if (read_stream(r, fp, compressed, bytes_to_read, &ret))
+ if (read_stream(r, fp, compressed, bytes_to_read, &ret)) {
+ internal_error(true, "read_stream() failed (2).");
return 1;
+ }
worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, ret);
@@ -334,8 +339,10 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {
// Decompress
size_t bytes_to_parse = r->decompressor->decompress(r->decompressor);
- if (!bytes_to_parse)
+ if (!bytes_to_parse) {
+ internal_error(true, "no bytes to parse.");
return 1;
+ }
// Fill read buffer with decompressed data
r->read_len = r->decompressor->get(r->decompressor, r->read_buffer, sizeof(r->read_buffer));
@@ -347,30 +354,56 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {
/* Produce a full line if one exists, statefully return where we start next time.
* When we hit the end of the buffer with a partial line move it to the beginning for the next fill.
*/
-static char *receiver_next_line(struct receiver_state *r, int *pos) {
- int start = *pos, scan = *pos;
- if (scan >= r->read_len) {
+static char *receiver_next_line(struct receiver_state *r, char *buffer, size_t buffer_length, size_t *pos) {
+ size_t start = *pos;
+
+ char *ss = &r->read_buffer[start];
+ char *se = &r->read_buffer[r->read_len];
+ char *ds = buffer;
+ char *de = &buffer[buffer_length - 2];
+
+ if(ss >= se) {
r->read_len = 0;
return NULL;
}
- while (scan < r->read_len && r->read_buffer[scan] != '\n')
- scan++;
- if (scan < r->read_len && r->read_buffer[scan] == '\n') {
- *pos = scan+1;
- r->read_buffer[scan] = 0;
- return &r->read_buffer[start];
+
+ // copy all bytes to buffer
+ while(ss < se && ds < de && *ss != '\n')
+ *ds++ = *ss++;
+
+ // if we have a newline, return the buffer
+ if(ss < se && ds < de && *ss == '\n') {
+ // newline found in the r->read_buffer
+
+ *ds++ = *ss++; // copy the newline too
+ *ds = '\0';
+
+ *pos = ss - r->read_buffer;
+ return buffer;
}
+
+ // if the destination is full, oops!
+ if(ds == de) {
+ error("STREAM: received line exceeds %d bytes. Truncating it.", PLUGINSD_LINE_MAX);
+ *ds = '\0';
+ *pos = ss - r->read_buffer;
+ return buffer;
+ }
+
+ // no newline found in the r->read_buffer
+ // move everything to the beginning
memmove(r->read_buffer, &r->read_buffer[start], r->read_len - start);
- r->read_len -= start;
+ r->read_len -= (int)start;
return NULL;
}
static void streaming_parser_thread_cleanup(void *ptr) {
PARSER *parser = (PARSER *)ptr;
+ rrd_collector_finished();
parser_destroy(parser);
}
-size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp) {
+size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp_in, FILE *fp_out) {
size_t result;
PARSER_USER_OBJECT user = {
@@ -381,7 +414,9 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp
.trust_durations = 1
};
- PARSER *parser = parser_init(rpt->host, &user, fp, PARSER_INPUT_SPLIT);
+ PARSER *parser = parser_init(rpt->host, &user, fp_in, fp_out, PARSER_INPUT_SPLIT);
+
+ rrd_collector_started();
// this keeps the parser with its current value
// so, parser needs to be allocated before pushing it
@@ -390,19 +425,6 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp
parser_add_keyword(parser, "TIMESTAMP", streaming_timestamp);
parser_add_keyword(parser, "CLAIMED_ID", streaming_claimed_id);
- parser->plugins_action->begin_action = &pluginsd_begin_action;
- parser->plugins_action->flush_action = &pluginsd_flush_action;
- parser->plugins_action->end_action = &pluginsd_end_action;
- parser->plugins_action->disable_action = &pluginsd_disable_action;
- parser->plugins_action->variable_action = &pluginsd_variable_action;
- parser->plugins_action->dimension_action = &pluginsd_dimension_action;
- parser->plugins_action->label_action = &pluginsd_label_action;
- parser->plugins_action->overwrite_action = &pluginsd_overwrite_action;
- parser->plugins_action->chart_action = &pluginsd_chart_action;
- parser->plugins_action->set_action = &pluginsd_set_action;
- parser->plugins_action->clabel_commit_action = &pluginsd_clabel_commit_action;
- parser->plugins_action->clabel_action = &pluginsd_clabel_action;
-
user.parser = parser;
#ifdef ENABLE_COMPRESSION
@@ -410,15 +432,26 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp
rpt->decompressor->reset(rpt->decompressor);
#endif
- do{
- if (receiver_read(rpt, fp))
- break;
- int pos = 0;
- char *line;
- while ((line = receiver_next_line(rpt, &pos))) {
- if (unlikely(netdata_exit || rpt->shutdown || parser_action(parser, line)))
+ char buffer[PLUGINSD_LINE_MAX + 2];
+ do {
+ if(receiver_read(rpt, fp_in)) break;
+
+ size_t pos = 0;
+ while(receiver_next_line(rpt, buffer, PLUGINSD_LINE_MAX + 2, &pos)) {
+ if(unlikely(netdata_exit)) {
+ internal_error(true, "exiting...");
+ goto done;
+ }
+ if(unlikely(rpt->shutdown)) {
+ internal_error(true, "parser shutdown...");
goto done;
+ }
+ if (unlikely(parser_action(parser, buffer))) {
+ internal_error(true, "parser_action() failed...");
+ goto done;
+ }
}
+
rpt->last_msg_t = now_realtime_sec();
}
while(!netdata_exit);
@@ -495,8 +528,6 @@ static int rrdpush_receive(struct receiver_state *rpt)
char initial_response[HTTP_HEADER_SIZE + 1];
snprintfz(initial_response, HTTP_HEADER_SIZE, "%s", START_STREAMING_ERROR_SAME_LOCALHOST);
#ifdef ENABLE_HTTPS
- rpt->host->stream_ssl.conn = rpt->ssl.conn;
- rpt->host->stream_ssl.flags = rpt->ssl.flags;
if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
#else
if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) {
@@ -611,7 +642,7 @@ static int rrdpush_receive(struct receiver_state *rpt)
.obsolete = 0,
.started_t = now_realtime_sec(),
.next = NULL,
- .version = 0,
+ .capabilities = 0,
};
// put the client IP and port into the buffers used by plugins.d
@@ -620,32 +651,31 @@ static int rrdpush_receive(struct receiver_state *rpt)
snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
- info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
- char initial_response[HTTP_HEADER_SIZE];
- if (rpt->stream_version > 1) {
- if(rpt->stream_version >= STREAM_VERSION_COMPRESSION){
#ifdef ENABLE_COMPRESSION
- if(!rpt->rrdpush_compression)
- rpt->stream_version = STREAM_VERSION_CLABELS;
-#else
- if(STREAMING_PROTOCOL_CURRENT_VERSION < rpt->stream_version) {
- rpt->stream_version = STREAMING_PROTOCOL_CURRENT_VERSION;
- }
+ if (stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) {
+ if (!rpt->rrdpush_compression)
+ rpt->capabilities &= ~STREAM_CAP_COMPRESSION;
+ }
#endif
- }
- info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->stream_version);
- sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->stream_version);
- } else if (rpt->stream_version == 1) {
- info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->stream_version);
+
+ info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
+ char initial_response[HTTP_HEADER_SIZE];
+ if (stream_has_capability(rpt, STREAM_CAP_VCAPS)) {
+ log_receiver_capabilities(rpt);
+ sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->capabilities);
+ }
+ else if (stream_has_capability(rpt, STREAM_CAP_VN)) {
+ log_receiver_capabilities(rpt);
+ sprintf(initial_response, "%s%d", START_STREAMING_PROMPT_VN, stream_capabilities_to_vn(rpt->capabilities));
+ } else if (stream_has_capability(rpt, STREAM_CAP_V2)) {
+ log_receiver_capabilities(rpt);
sprintf(initial_response, "%s", START_STREAMING_PROMPT_V2);
- } else {
- info("STREAM %s [receive from [%s]:%s]: Netdata is using first stream protocol.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
- sprintf(initial_response, "%s", START_STREAMING_PROMPT);
+ } else { // stream_has_capability(rpt, STREAM_CAP_V1)
+ log_receiver_capabilities(rpt);
+ sprintf(initial_response, "%s", START_STREAMING_PROMPT_V1);
}
debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response);
- #ifdef ENABLE_HTTPS
- rpt->host->stream_ssl.conn = rpt->ssl.conn;
- rpt->host->stream_ssl.flags = rpt->ssl.flags;
+#ifdef ENABLE_HTTPS
if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
#else
if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) {
@@ -667,11 +697,33 @@ static int rrdpush_receive(struct receiver_state *rpt)
error("STREAM %s [receive from [%s]:%s]: cannot set timeout for socket %d", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd);
// convert the socket to a FILE *
- FILE *fp = fdopen(rpt->fd, "r");
- if(!fp) {
+ // It seems that the same FILE * cannot be used for both reading and writing.
+ // (reads and writes seem to interfere with each other, with undefined results).
+
+ int fd_in = rpt->fd;
+ int fd_out = fcntl(rpt->fd, F_DUPFD_CLOEXEC, 0);
+ if(fd_out == -1) {
+ error("STREAM %s [receive from [%s]:%s]: failed to duplicate FD %d.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd);
log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - SOCKET ERROR");
- error("STREAM %s [receive from [%s]:%s]: failed to get a FILE for FD %d.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd);
- close(rpt->fd);
+ close(fd_in);
+ return 0;
+ }
+
+ FILE *fp_out = fdopen(fd_out, "w");
+ if(!fp_out) {
+ error("STREAM %s [receive from [%s]:%s]: failed to get a FILE pointer for fd_out %d.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd);
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - SOCKET ERROR");
+ close(fd_in);
+ close(fd_out);
+ return 0;
+ }
+
+ FILE *fp_in = fdopen(fd_in, "r");
+ if(!fp_in) {
+ error("STREAM %s [receive from [%s]:%s]: failed to get a FILE pointer for fd_in %d.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd);
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - SOCKET ERROR");
+ close(fd_in);
+ fclose(fp_out);
return 0;
}
@@ -686,15 +738,6 @@ static int rrdpush_receive(struct receiver_state *rpt)
*/
// rpt->host->connected_senders++;
- if(rpt->stream_version > 0) {
- rrdhost_flag_set(rpt->host, RRDHOST_FLAG_STREAM_LABELS_UPDATE);
- rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_STREAM_LABELS_STOP);
- }
- else {
- rrdhost_flag_set(rpt->host, RRDHOST_FLAG_STREAM_LABELS_STOP);
- rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_STREAM_LABELS_UPDATE);
- }
-
if(health_enabled != CONFIG_BOOLEAN_NO) {
if(alarms_delay > 0) {
rpt->host->health_delay_up_to = now_realtime_sec() + alarms_delay;
@@ -713,7 +756,7 @@ static int rrdpush_receive(struct receiver_state *rpt)
info("STREAM %s [receive from [%s]:%s]: receiving metrics...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "CONNECTED");
- cd.version = rpt->stream_version;
+ cd.capabilities = rpt->capabilities;
#ifdef ENABLE_ACLK
// in case we have cloud connection we inform cloud
@@ -724,7 +767,7 @@ static int rrdpush_receive(struct receiver_state *rpt)
rrdcontext_host_child_connected(rpt->host);
- size_t count = streaming_parser(rpt, &cd, fp);
+ size_t count = streaming_parser(rpt, &cd, fp_in, fp_out);
log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->hostname,
"DISCONNECTED");
@@ -762,7 +805,8 @@ static int rrdpush_receive(struct receiver_state *rpt)
}
// cleanup
- fclose(fp);
+ fclose(fp_in);
+ fclose(fp_out);
return (int)count;
}
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 9fa9793de2..018b29a033 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -11,8 +11,8 @@
* 1. a random data collection thread, calling rrdset_done_push()
* this is called for each chart.
*
- * the output of this work is kept in a BUFFER in RRDHOST
- * the sender thread is signalled via a pipe (also in RRDHOST)
+ * the output of this work is kept in a thread BUFFER
+ * the sender thread is signalled via a pipe (in RRDHOST)
*
* 2. a sender thread running at the sending netdata
* this is spawned automatically on the first chart to be pushed
@@ -101,9 +101,9 @@ int rrdpush_init() {
bool invalid_certificate = appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", CONFIG_BOOLEAN_NO);
if(invalid_certificate == CONFIG_BOOLEAN_YES){
- if(netdata_validate_server == NETDATA_SSL_VALID_CERTIFICATE){
+ if(netdata_ssl_validate_server == NETDATA_SSL_VALID_CERTIFICATE){
info("Netdata is configured to accept invalid SSL certificate.");
- netdata_validate_server = NETDATA_SSL_INVALID_CERTIFICATE;
+ netdata_ssl_validate_server = NETDATA_SSL_INVALID_CERTIFICATE;
}
}
@@ -130,40 +130,35 @@ unsigned int remote_clock_resync_iterations = 60;
static inline bool should_send_chart_matching(RRDSET *st) {
- RRDSET_FLAGS flags = rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE);
+ // get all the flags we need to check, with one atomic operation
+ RRDSET_FLAGS flags = rrdset_flag_check(st,
+ RRDSET_FLAG_UPSTREAM_SEND
+ |RRDSET_FLAG_UPSTREAM_IGNORE
+ |RRDSET_FLAG_ANOMALY_RATE_CHART
+ |RRDSET_FLAG_ANOMALY_DETECTION);
if(unlikely(!flags)) {
RRDHOST *host = st->rrdhost;
// Do not stream anomaly rates charts.
- if (unlikely(rrdset_is_ar_chart(st))) {
- rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND);
+ if (unlikely(flags & RRDSET_FLAG_ANOMALY_RATE_CHART))
rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
- flags = RRDSET_FLAG_UPSTREAM_IGNORE;
- }
- else if (rrdset_flag_check(st, RRDSET_FLAG_ANOMALY_DETECTION)) {
- if(ml_streaming_enabled()) {
- rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_IGNORE);
+
+ else if (flags & RRDSET_FLAG_ANOMALY_DETECTION) {
+ if(ml_streaming_enabled())
rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND);
- flags = RRDSET_FLAG_UPSTREAM_SEND;
- }
- else {
- rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND);
+ else
rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
- flags = RRDSET_FLAG_UPSTREAM_IGNORE;
- }
}
else if(simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_id(st)) ||
- simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_name(st))) {
- rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_IGNORE);
+ simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_name(st)))
+
rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND);
- flags = RRDSET_FLAG_UPSTREAM_SEND;
- }
- else {
- rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND);
+ else
rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
- flags = RRDSET_FLAG_UPSTREAM_IGNORE;
- }
+
+ // get the flags again, to know how to respond
+ flags = rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE);
}
return flags & RRDSET_FLAG_UPSTREAM_SEND;
@@ -196,16 +191,17 @@ static int send_clabels_callback(const char *name, const char *value, RRDLABEL_S
buffer_sprintf(wb, "CLABEL \"%s\" \"%s\" %d\n", name, value, ls);
return 1;
}
-void rrdpush_send_clabels(RRDHOST *host, RRDSET *st) {
+
+static void rrdpush_send_clabels(BUFFER *wb, RRDSET *st) {
if (st->rrdlabels) {
- if(rrdlabels_walkthrough_read(st->rrdlabels, send_clabels_callback, host->sender->build) > 0)
- buffer_sprintf(host->sender->build,"CLABEL_COMMIT\n");
+ if(rrdlabels_walkthrough_read(st->rrdlabels, send_clabels_callback, wb) > 0)
+ buffer_sprintf(wb, "CLABEL_COMMIT\n");
}
}
// Send the current chart definition.
// Assumes that collector thread has already called sender_start for mutex / buffer state.
-static inline void rrdpush_send_chart_definition(RRDSET *st) {
+static inline void rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
RRDHOST *host = st->rrdhost;
rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
@@ -225,7 +221,7 @@ static inline void rrdpush_send_chart_definition(RRDSET *st) {
// send the chart
buffer_sprintf(
- host->sender->build
+ wb
, "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %ld %d \"%s %s %s %s\" \"%s\" \"%s\"\n"
, rrdset_id(st)
, name
@@ -245,14 +241,14 @@ static inline void rrdpush_send_chart_definition(RRDSET *st) {
);
// send the chart labels
- if (host->sender->version >= STREAM_VERSION_CLABELS)
- rrdpush_send_clabels(host, st);
+ if (stream_has_capability(host->sender, STREAM_CAP_CLABELS))
+ rrdpush_send_clabels(wb, st);
// send the dimensions
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
buffer_sprintf(
- host->sender->build
+ wb
, "DIMENSION \"%s\" \"%s\" \"%s\" " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " \"%s %s %s\"\n"
, rrddim_id(rd)
, rrddim_name(rd)
@@ -267,30 +263,30 @@ static inline void rrdpush_send_chart_definition(RRDSET *st) {
}
rrddim_foreach_done(rd);
+ // send the chart functions
+ if(stream_has_capability(host->sender, STREAM_CAP_FUNCTIONS))
+ rrd_functions_expose_rrdpush(st, wb);
+
// send the chart local custom variables
- rrdsetvar_print_to_streaming_custom_chart_variables(st, host->sender->build);
+ rrdsetvar_print_to_streaming_custom_chart_variables(st, wb);
st->upstream_resync_time = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every);
}
// sends the current chart dimensions
-static inline bool rrdpush_send_chart_metrics_nolock(RRDSET *st, struct sender_state *s) {
- RRDHOST *host = st->rrdhost;
- BUFFER *wb = host->sender->build;
-
+static inline void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s) {
buffer_fast_strcat(wb, "BEGIN \"", 7);
buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
buffer_fast_strcat(wb, "\" ", 2);
buffer_print_llu(wb, (st->last_collected_time.tv_sec > st->upstream_resync_time)?st->usec_since_last_update:0);
- if (s->version >= VERSION_GAP_FILLING) {
+ if (stream_has_capability(s, STREAM_CAP_GAP_FILLING)) {
buffer_fast_strcat(wb, " ", 1);
buffer_print_ll(wb, st->last_collected_time.tv_sec);
}
buffer_fast_strcat(wb, "\n", 1);
- size_t count_of_dimensions_written = 0;
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
if(unlikely(!rd->updated))
@@ -302,7 +298,6 @@ static inline bool rrdpush_send_chart_metrics_nolock(RRDSET *st, struct sender_s
buffer_fast_strcat(wb, "\" = ", 4);
buffer_print_ll(wb, rd->collected_value);
buffer_fast_strcat(wb, "\n", 1);
- count_of_dimensions_written++;
}
else {
internal_error(true, "host '%s', chart '%s', dimension '%s' flag 'exposed' is updated but not exposed", rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd));
@@ -312,8 +307,6 @@ static inline bool rrdpush_send_chart_metrics_nolock(RRDSET *st, struct sender_s
}
rrddim_foreach_done(rd);
buffer_fast_strcat(wb, "END\n", 4);
-
- return count_of_dimensions_written != 0;
}
static void rrdpush_sender_thread_spawn(RRDHOST *host);
@@ -322,12 +315,12 @@ static void rrdpush_sender_thread_spawn(RRDHOST *host);
bool rrdset_push_chart_definition_now(RRDSET *st) {
RRDHOST *host = st->rrdhost;
- if(unlikely(!host->rrdpush_send_enabled || !should_send_chart_matching(st)))
+ if(unlikely(!rrdhost_can_send_definitions_to_parent(host) || !should_send_chart_matching(st)))
return false;
- sender_start(host->sender);
- rrdpush_send_chart_definition(st);
- sender_commit(host->sender);
+ BUFFER *wb = sender_start(host->sender);
+ rrdpush_send_chart_definition(wb, st);
+ sender_commit(host->sender, wb);
return true;
}
@@ -365,44 +358,44 @@ bool rrdpush_incremental_transmission_of_chart_definitions(RRDHOST *host, DICTFE
}
void rrdset_done_push(RRDSET *st) {
- if(unlikely(!should_send_chart_matching(st)))
- return;
-
RRDHOST *host = st->rrdhost;
- // Handle non-connected case
- if(unlikely(!__atomic_load_n(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST)
- || !rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_COLLECTED_METRICS))) {
+ // fetch the flags we need to check with one atomic operation
+ RRDHOST_FLAGS flags = rrdhost_flag_check(host,
+ RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS
+ | RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS
+ | RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN
+ );
+
+ // check if we are not connected
+ if(unlikely(!(flags & RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS))) {
- if(unlikely(host->rrdpush_send_enabled && !host->rrdpush_sender_spawn))
+ if(unlikely(!(flags & RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)))
rrdpush_sender_thread_spawn(host);
- if(unlikely(!host->rrdpush_sender_error_shown))
+ if(unlikely(!(flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS))) {
+ rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS);
error("STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host));
-
- host->rrdpush_sender_error_shown = 1;
+ }
return;
}
- else if(unlikely(host->rrdpush_sender_error_shown)) {
+ else if(unlikely(flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS)) {
info("STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host));
- host->rrdpush_sender_error_shown = 0;
+ rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS);
}
- sender_start(host->sender);
+ if(unlikely(!should_send_chart_matching(st)))
+ return;
+
+ BUFFER *wb = sender_start(host->sender);
if(unlikely(need_to_send_chart_definition(st)))
- rrdpush_send_chart_definition(st);
+ rrdpush_send_chart_definition(wb, st);
- if(likely(rrdpush_send_chart_metrics_nolock(st, host->sender))) {
- // signal the sender there are more data
- if (host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1)
- error("STREAM %s [send]: cannot write to internal pipe", rrdhost_hostname(host));
+ rrdpush_send_chart_metrics(wb, st, host->sender);
- sender_commit(host->sender);
- }
- else
- sender_cancel(host->sender);
+ sender_commit(host->sender, wb);
}
// labels
@@ -411,45 +404,38 @@ static int send_labels_callback(const char *name, const char *value, RRDLABEL_SR
buffer_sprintf(wb, "LABEL \"%s\" = %d \"%s\"\n", name, ls, value);
return 1;
}
-void rrdpush_send_labels(RRDHOST *host) {
- if (!host->rrdlabels || !rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE) || (rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_LABELS_STOP