summaryrefslogtreecommitdiffstats
path: root/streaming/receiver.c
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-10-05 14:13:46 +0300
committerGitHub <noreply@github.com>2022-10-05 14:13:46 +0300
commit8fc3b351a2e7fc96eced8f924de2e9cec9842128 (patch)
treebde41c66573ccaf8876c280e00742cc6096b587c /streaming/receiver.c
parent6850878e697d66dc90b9af1e750b22238c63c292 (diff)
Allow netdata plugins to expose functions for querying more information about specific charts (#13720)
* function renames and code cleanup in popen.c; no actual code changes * netdata popen() now opens both child process stdin and stdout and returns FILE * for both * pass both input and output to parser structures * updated rrdset to call custom functions * RRDSET FUNCTION leading calls for both sync and async operation * put RRDSET functions to a separate file * added format and timeout at function definition * support for synchronous (internal plugins) and asynchronous (external plugins and children) functions * /api/v1/function endpoint * functions are now attached to the host and there is a dictionary view per chart * functions implemented at plugins.d * remove the defer until keyword hook from plugins.d when it is done * stream sender implementation of functions * sanitization of all functions so that only certain characters are allowed * stricter sanitization * common max size * 1st working plugins.d example * always init inflight dictionary * properly destroy dictionaries to avoid parallel insertion of items * add more debugging on disconnection reasons * add more debugging on disconnection reasons again * streaming receiver respects newlines * don't use the same fp for both streaming receive and send * don't free dbengine memory with internal checks * make sender proceed in the buffer * added timing info and garbage collection at plugins.d * added info about routing nodes * added info about routing nodes with delay * added more info about delays * added more info about delays again * signal sending thread to wake up * streaming version labeling and commented code to support capabilities * added functions to /api/v1/data, /api/v1/charts, /api/v1/chart, /api/v1/info * redirect top output to stdout * address coverity findings * fix resource leaks of popen * log attempts to connect to individual destinations * better messages * properly parse destinations * try to find a function from the most matching to the least matching * log added streaming destinations * 
rotate destinations bypassing a node in the middle that does not accept our connection * break the loops properly * use typedef to define callbacks * capabilities negotiation during streaming * functions exposed upstream based on capabilities; compression disabled per node persisting reconnects; always try to connect with all capabilities * restore functionality to lookup functions * better logging of capabilities * remove old versions from capabilities when a newer version is there * fix formatting * optimization for plugins.d rrdlabels to avoid creating and destructing dictionaries all the time * delayed health initialization for rrddim and rrdset * cleanup health initialization * fix for popen() not returning the right value * add health worker jobs for initializing rrdset and rrddim * added content type support for functions; apps.plugin permanent function to display all the processes * fixes for functions parameters parsing in apps.plugin * fix for process matching in apps.plugin * first working function for apps.plugin * Dashboard ACL is disabled for functions; Function errors are all in JSON format * apps.plugin function processes returns json table * use json_escape_string() to escape message * fix formatting * apps.plugin exposes all its metrics to function processes * fix json formatting when filtering out some rows * reopen the internal pipe of rrdpush in case of errors * misplaced statement * do not use buffer->len * support for GLOBAL functions (functions that are not linked to a chart) * added /api/v1/functions endpoint; removed format from the FUNCTIONS api; * swagger documentation about the new API endpoints * added plugins.d documentation about functions * never re-close a file * remove unnecessary ifdef * fixed issues identified by codacy * fix for null label value * make edit-config copy-and-paste friendly * Revert "make edit-config copy-and-paste friendly" This reverts commit 54500c0e0a97f65a0c66c4d34e966f6a9056698e. 
* reworked sender handshake to fix coverity findings * timeout is zero, for both send_timeout() and recv_timeout() * properly detect that parent closed the socket * support caching of function responses; limit function response to 10MB; added protection from malformed function responses * disabled excessive logging * added units to apps.plugin function processes and normalized all values to be human readable * shorter field names * fixed issues reported * fixed apps.plugin error response; tested that pluginsd can properly handle faulty responses * use double linked list macros for double linked list management * faster apps.plugin function printing by minimizing file operations * added memory percentage * fix compatibility issues with older compilers and FreeBSD * rrdpush sender code cleanup; rrdhost structure cleanup from sender flags and variables; * fix leftover variable in ifdef * apps.plugin: do not call detach from the thread; exit immediately when input is broken * exclude AR charts from health * flush cleaner; prefer sender output * clarity * do not fill the cbuffer if not connected * fix * don't enable host->sender if streaming is not enabled; send host label updates to parent; * functions are only available through ACLK * Prepared statement reports only in dev mode * fix AR chart detection * fix for streaming not enabling itself * more cleanup of sender and receiver structures * moved read-only flags and configuration options to rrdhost->options * fixed merge with master * fix for incomplete rename * prevent service thread from working on charts that are being collected Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'streaming/receiver.c')
-rw-r--r--streaming/receiver.c206
1 files changed, 125 insertions, 81 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index a2852981a2..6890f8b2d9 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -72,9 +72,8 @@ PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins
time_t remote_time = 0;
RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
struct plugind *cd = ((PARSER_USER_OBJECT *)user)->cd;
- if (cd->version < VERSION_GAP_FILLING ) {
- error("STREAM %s from %s: Child negotiated version %u but sent TIMESTAMP!", rrdhost_hostname(host), cd->cmd,
- cd->version);
+ if (!(cd->capabilities & STREAM_CAP_GAP_FILLING)) {
+ error("STREAM %s from %s: Child negotiated version %u but sent TIMESTAMP!", rrdhost_hostname(host), cd->cmd, cd->capabilities);
return PARSER_RC_OK; // Ignore error and continue stream
}
if (remote_time_txt && *remote_time_txt) {
@@ -111,8 +110,8 @@ PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins
(int64_t)remote_time);
int ret;
#ifdef ENABLE_HTTPS
- SSL *conn = host->stream_ssl.conn ;
- if(conn && !host->stream_ssl.flags) {
+ SSL *conn = host->receiver->ssl.conn ;
+ if(conn && !host->receiver->ssl.flags) {
ret = SSL_write(conn, message, strlen(message));
} else {
ret = send(host->receiver->fd, message, strlen(message), MSG_DONTWAIT);
@@ -292,8 +291,10 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {
size_t available = sizeof(r->read_buffer) - r->read_len;
if (available) {
size_t len = r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, available);
- if (!len)
+ if (!len) {
+ internal_error(true, "decompressor returned zero length");
return 1;
+ }
r->read_len += len;
}
@@ -301,8 +302,10 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {
}
int ret = 0;
- if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1, &ret))
+ if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1, &ret)) {
+ internal_error(true, "read_stream() failed (1).");
return 1;
+ }
worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, ret);
@@ -320,8 +323,10 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {
// we're unable to decompress incomplete block
char compressed[bytes_to_read];
do {
- if (read_stream(r, fp, compressed, bytes_to_read, &ret))
+ if (read_stream(r, fp, compressed, bytes_to_read, &ret)) {
+ internal_error(true, "read_stream() failed (2).");
return 1;
+ }
worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, ret);
@@ -334,8 +339,10 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {
// Decompress
size_t bytes_to_parse = r->decompressor->decompress(r->decompressor);
- if (!bytes_to_parse)
+ if (!bytes_to_parse) {
+ internal_error(true, "no bytes to parse.");
return 1;
+ }
// Fill read buffer with decompressed data
r->read_len = r->decompressor->get(r->decompressor, r->read_buffer, sizeof(r->read_buffer));
@@ -347,30 +354,56 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {
/* Produce a full line if one exists, statefully return where we start next time.
* When we hit the end of the buffer with a partial line move it to the beginning for the next fill.
*/
-static char *receiver_next_line(struct receiver_state *r, int *pos) {
- int start = *pos, scan = *pos;
- if (scan >= r->read_len) {
+static char *receiver_next_line(struct receiver_state *r, char *buffer, size_t buffer_length, size_t *pos) {
+ size_t start = *pos;
+
+ char *ss = &r->read_buffer[start];
+ char *se = &r->read_buffer[r->read_len];
+ char *ds = buffer;
+ char *de = &buffer[buffer_length - 2];
+
+ if(ss >= se) {
r->read_len = 0;
return NULL;
}
- while (scan < r->read_len && r->read_buffer[scan] != '\n')
- scan++;
- if (scan < r->read_len && r->read_buffer[scan] == '\n') {
- *pos = scan+1;
- r->read_buffer[scan] = 0;
- return &r->read_buffer[start];
+
+ // copy all bytes to buffer
+ while(ss < se && ds < de && *ss != '\n')
+ *ds++ = *ss++;
+
+ // if we have a newline, return the buffer
+ if(ss < se && ds < de && *ss == '\n') {
+ // newline found in the r->read_buffer
+
+ *ds++ = *ss++; // copy the newline too
+ *ds = '\0';
+
+ *pos = ss - r->read_buffer;
+ return buffer;
}
+
+ // if the destination is full, oops!
+ if(ds == de) {
+ error("STREAM: received line exceeds %d bytes. Truncating it.", PLUGINSD_LINE_MAX);
+ *ds = '\0';
+ *pos = ss - r->read_buffer;
+ return buffer;
+ }
+
+ // no newline found in the r->read_buffer
+ // move everything to the beginning
memmove(r->read_buffer, &r->read_buffer[start], r->read_len - start);
- r->read_len -= start;
+ r->read_len -= (int)start;
return NULL;
}
static void streaming_parser_thread_cleanup(void *ptr) {
PARSER *parser = (PARSER *)ptr;
+ rrd_collector_finished();
parser_destroy(parser);
}
-size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp) {
+size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp_in, FILE *fp_out) {
size_t result;
PARSER_USER_OBJECT user = {
@@ -381,7 +414,9 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp
.trust_durations = 1
};
- PARSER *parser = parser_init(rpt->host, &user, fp, PARSER_INPUT_SPLIT);
+ PARSER *parser = parser_init(rpt->host, &user, fp_in, fp_out, PARSER_INPUT_SPLIT);
+
+ rrd_collector_started();
// this keeps the parser with its current value
// so, parser needs to be allocated before pushing it
@@ -390,19 +425,6 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp
parser_add_keyword(parser, "TIMESTAMP", streaming_timestamp);
parser_add_keyword(parser, "CLAIMED_ID", streaming_claimed_id);
- parser->plugins_action->begin_action = &pluginsd_begin_action;
- parser->plugins_action->flush_action = &pluginsd_flush_action;
- parser->plugins_action->end_action = &pluginsd_end_action;
- parser->plugins_action->disable_action = &pluginsd_disable_action;
- parser->plugins_action->variable_action = &pluginsd_variable_action;
- parser->plugins_action->dimension_action = &pluginsd_dimension_action;
- parser->plugins_action->label_action = &pluginsd_label_action;
- parser->plugins_action->overwrite_action = &pluginsd_overwrite_action;
- parser->plugins_action->chart_action = &pluginsd_chart_action;
- parser->plugins_action->set_action = &pluginsd_set_action;
- parser->plugins_action->clabel_commit_action = &pluginsd_clabel_commit_action;
- parser->plugins_action->clabel_action = &pluginsd_clabel_action;
-
user.parser = parser;
#ifdef ENABLE_COMPRESSION
@@ -410,15 +432,26 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp
rpt->decompressor->reset(rpt->decompressor);
#endif
- do{
- if (receiver_read(rpt, fp))
- break;
- int pos = 0;
- char *line;
- while ((line = receiver_next_line(rpt, &pos))) {
- if (unlikely(netdata_exit || rpt->shutdown || parser_action(parser, line)))
+ char buffer[PLUGINSD_LINE_MAX + 2];
+ do {
+ if(receiver_read(rpt, fp_in)) break;
+
+ size_t pos = 0;
+ while(receiver_next_line(rpt, buffer, PLUGINSD_LINE_MAX + 2, &pos)) {
+ if(unlikely(netdata_exit)) {
+ internal_error(true, "exiting...");
+ goto done;
+ }
+ if(unlikely(rpt->shutdown)) {
+ internal_error(true, "parser shutdown...");
goto done;
+ }
+ if (unlikely(parser_action(parser, buffer))) {
+ internal_error(true, "parser_action() failed...");
+ goto done;
+ }
}
+
rpt->last_msg_t = now_realtime_sec();
}
while(!netdata_exit);
@@ -495,8 +528,6 @@ static int rrdpush_receive(struct receiver_state *rpt)
char initial_response[HTTP_HEADER_SIZE + 1];
snprintfz(initial_response, HTTP_HEADER_SIZE, "%s", START_STREAMING_ERROR_SAME_LOCALHOST);
#ifdef ENABLE_HTTPS
- rpt->host->stream_ssl.conn = rpt->ssl.conn;
- rpt->host->stream_ssl.flags = rpt->ssl.flags;
if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
#else
if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) {
@@ -611,7 +642,7 @@ static int rrdpush_receive(struct receiver_state *rpt)
.obsolete = 0,
.started_t = now_realtime_sec(),
.next = NULL,
- .version = 0,
+ .capabilities = 0,
};
// put the client IP and port into the buffers used by plugins.d
@@ -620,32 +651,31 @@ static int rrdpush_receive(struct receiver_state *rpt)
snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
- info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
- char initial_response[HTTP_HEADER_SIZE];
- if (rpt->stream_version > 1) {
- if(rpt->stream_version >= STREAM_VERSION_COMPRESSION){
#ifdef ENABLE_COMPRESSION
- if(!rpt->rrdpush_compression)
- rpt->stream_version = STREAM_VERSION_CLABELS;
-#else
- if(STREAMING_PROTOCOL_CURRENT_VERSION < rpt->stream_version) {
- rpt->stream_version = STREAMING_PROTOCOL_CURRENT_VERSION;
- }
+ if (stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) {
+ if (!rpt->rrdpush_compression)
+ rpt->capabilities &= ~STREAM_CAP_COMPRESSION;
+ }
#endif
- }
- info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->stream_version);
- sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->stream_version);
- } else if (rpt->stream_version == 1) {
- info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->stream_version);
+
+ info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
+ char initial_response[HTTP_HEADER_SIZE];
+ if (stream_has_capability(rpt, STREAM_CAP_VCAPS)) {
+ log_receiver_capabilities(rpt);
+ sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->capabilities);
+ }
+ else if (stream_has_capability(rpt, STREAM_CAP_VN)) {
+ log_receiver_capabilities(rpt);
+ sprintf(initial_response, "%s%d", START_STREAMING_PROMPT_VN, stream_capabilities_to_vn(rpt->capabilities));
+ } else if (stream_has_capability(rpt, STREAM_CAP_V2)) {
+ log_receiver_capabilities(rpt);
sprintf(initial_response, "%s", START_STREAMING_PROMPT_V2);
- } else {
- info("STREAM %s [receive from [%s]:%s]: Netdata is using first stream protocol.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
- sprintf(initial_response, "%s", START_STREAMING_PROMPT);
+ } else { // stream_has_capability(rpt, STREAM_CAP_V1)
+ log_receiver_capabilities(rpt);
+ sprintf(initial_response, "%s", START_STREAMING_PROMPT_V1);
}
debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response);
- #ifdef ENABLE_HTTPS
- rpt->host->stream_ssl.conn = rpt->ssl.conn;
- rpt->host->stream_ssl.flags = rpt->ssl.flags;
+#ifdef ENABLE_HTTPS
if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
#else
if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) {
@@ -667,11 +697,33 @@ static int rrdpush_receive(struct receiver_state *rpt)
error("STREAM %s [receive from [%s]:%s]: cannot set timeout for socket %d", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd);
// convert the socket to a FILE *
- FILE *fp = fdopen(rpt->fd, "r");
- if(!fp) {
+ // It seems that the same FILE * cannot be used for both reading and writing.
+ // (reads and writes seem to interfere with each other, with undefined results).
+
+ int fd_in = rpt->fd;
+ int fd_out = fcntl(rpt->fd, F_DUPFD_CLOEXEC, 0);
+ if(fd_out == -1) {
+ error("STREAM %s [receive from [%s]:%s]: failed to duplicate FD %d.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd);
log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - SOCKET ERROR");
- error("STREAM %s [receive from [%s]:%s]: failed to get a FILE for FD %d.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd);
- close(rpt->fd);
+ close(fd_in);
+ return 0;
+ }
+
+ FILE *fp_out = fdopen(fd_out, "w");
+ if(!fp_out) {
+ error("STREAM %s [receive from [%s]:%s]: failed to get a FILE pointer for fd_out %d.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd);
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - SOCKET ERROR");
+ close(fd_in);
+ close(fd_out);
+ return 0;
+ }
+
+ FILE *fp_in = fdopen(fd_in, "r");
+ if(!fp_in) {
+ error("STREAM %s [receive from [%s]:%s]: failed to get a FILE pointer for fd_in %d.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd);
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - SOCKET ERROR");
+ close(fd_in);
+ fclose(fp_out);
return 0;
}
@@ -686,15 +738,6 @@ static int rrdpush_receive(struct receiver_state *rpt)
*/
// rpt->host->connected_senders++;
- if(rpt->stream_version > 0) {
- rrdhost_flag_set(rpt->host, RRDHOST_FLAG_STREAM_LABELS_UPDATE);
- rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_STREAM_LABELS_STOP);
- }
- else {
- rrdhost_flag_set(rpt->host, RRDHOST_FLAG_STREAM_LABELS_STOP);
- rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_STREAM_LABELS_UPDATE);
- }
-
if(health_enabled != CONFIG_BOOLEAN_NO) {
if(alarms_delay > 0) {
rpt->host->health_delay_up_to = now_realtime_sec() + alarms_delay;
@@ -713,7 +756,7 @@ static int rrdpush_receive(struct receiver_state *rpt)
info("STREAM %s [receive from [%s]:%s]: receiving metrics...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "CONNECTED");
- cd.version = rpt->stream_version;
+ cd.capabilities = rpt->capabilities;
#ifdef ENABLE_ACLK
// in case we have cloud connection we inform cloud
@@ -724,7 +767,7 @@ static int rrdpush_receive(struct receiver_state *rpt)
rrdcontext_host_child_connected(rpt->host);
- size_t count = streaming_parser(rpt, &cd, fp);
+ size_t count = streaming_parser(rpt, &cd, fp_in, fp_out);
log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->hostname,
"DISCONNECTED");
@@ -762,7 +805,8 @@ static int rrdpush_receive(struct receiver_state *rpt)
}
// cleanup
- fclose(fp);
+ fclose(fp_in);
+ fclose(fp_out);
return (int)count;
}