author | Costa Tsaousis <costa@netdata.cloud> | 2022-10-05 14:13:46 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-10-05 14:13:46 +0300 |
commit | 8fc3b351a2e7fc96eced8f924de2e9cec9842128 (patch) | |
tree | bde41c66573ccaf8876c280e00742cc6096b587c /streaming/receiver.c | |
parent | 6850878e697d66dc90b9af1e750b22238c63c292 (diff) |
Allow netdata plugins to expose functions for querying more information about specific charts (#13720)
* function renames and code cleanup in popen.c; no actual code changes
* netdata popen() now opens both child process stdin and stdout and returns FILE * for both
* pass both input and output to parser structures
* updated rrdset to call custom functions
* RRDSET FUNCTION leading calls for both sync and async operation
* put RRDSET functions to a separate file
* added format and timeout at function definition
* support for synchronous (internal plugins) and asynchronous (external plugins and children) functions
* /api/v1/function endpoint
* functions are now attached to the host and there is a dictionary view per chart
* functions implemented at plugins.d
* remove the defer until keyword hook from plugins.d when it is done
* stream sender implementation of functions
* sanitization of all functions so that certain characters are only allowed
* stricter sanitization
* common max size
* 1st working plugins.d example
* always init inflight dictionary
* properly destroy dictionaries to avoid parallel insertion of items
* add more debugging on disconnection reasons
* add more debugging on disconnection reasons again
* streaming receiver respects newlines
* don't use the same fp for both streaming receive and send
* don't free dbengine memory with internal checks
* make sender proceed in the buffer
* added timing info and garbage collection at plugins.d
* added info about routing nodes
* added info about routing nodes with delay
* added more info about delays
* added more info about delays again
* signal sending thread to wake up
* streaming version labeling and commented code to support capabilities
* added functions to /api/v1/data, /api/v1/charts, /api/v1/chart, /api/v1/info
* redirect top output to stdout
* address coverity findings
* fix resource leaks of popen
* log attempts to connect to individual destinations
* better messages
* properly parse destinations
* try to find a function from the most matching to the least matching
* log added streaming destinations
* rotate destinations bypassing a node in the middle that does not accept our connection
* break the loops properly
* use typedef to define callbacks
* capabilities negotiation during streaming
* functions exposed upstream based on capabilities; compression disabled per node persisting reconnects; always try to connect with all capabilities
* restore functionality to lookup functions
* better logging of capabilities
* remove old versions from capabilities when a newer version is there
* fix formatting
* optimization for plugins.d rrdlabels to avoid creating and destructing dictionaries all the time
* delayed health initialization for rrddim and rrdset
* cleanup health initialization
* fix for popen() not returning the right value
* add health worker jobs for initializing rrdset and rrddim
* added content type support for functions; apps.plugin permanent function to display all the processes
* fixes for functions parameters parsing in apps.plugin
* fix for process matching in apps.plugin
* first working function for apps.plugin
* Dashboard ACL is disabled for functions; Function errors are all in JSON format
* apps.plugin function processes returns json table
* use json_escape_string() to escape message
* fix formatting
* apps.plugin exposes all its metrics to function processes
* fix json formatting when filtering out some rows
* reopen the internal pipe of rrdpush in case of errors
* misplaced statement
* do not use buffer->len
* support for GLOBAL functions (functions that are not linked to a chart)
* added /api/v1/functions endpoint; removed format from the FUNCTIONS api;
* swagger documentation about the new api end points
* added plugins.d documentation about functions
* never re-close a file
* remove unnecessary ifdef
* fixed issues identified by codacy
* fix for null label value
* make edit-config copy-and-paste friendly
* Revert "make edit-config copy-and-paste friendly"
This reverts commit 54500c0e0a97f65a0c66c4d34e966f6a9056698e.
* reworked sender handshake to fix coverity findings
* timeout is zero, for both send_timeout() and recv_timeout()
* properly detect that parent closed the socket
* support caching of function responses; limit function response to 10MB; added protection from malformed function responses
* disabled excessive logging
* added units to apps.plugin function processes and normalized all values to be human readable
* shorter field names
* fixed issues reported
* fixed apps.plugin error response; tested that pluginsd can properly handle faulty responses
* use double linked list macros for double linked list management
* faster apps.plugin function printing by minimizing file operations
* added memory percentage
* fix compatibility issues with older compilers and FreeBSD
* rrdpush sender code cleanup; rrdhost structure cleanup from sender flags and variables
* fix leftover variable in ifdef
* apps.plugin: do not call detach from the thread; exit immediately when input is broken
* exclude AR charts from health
* flush cleaner; prefer sender output
* clarity
* do not fill the cbuffer if not connected
* fix
* don't enable host->sender if streaming is not enabled; send host label updates to parent
* functions are only available through ACLK
* Prepared statement reports only in dev mode
* fix AR chart detection
* fix for streaming not enabling itself
* more cleanup of sender and receiver structures
* moved read-only flags and configuration options to rrdhost->options
* fixed merge with master
* fix for incomplete rename
* prevent service thread from working on charts that are being collected
Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
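Several items in the commit message (capabilities negotiation during streaming, disabling compression per node, exposing functions upstream based on capabilities) revolve around replacing a single stream version number with a set of capability bits. The following is a minimal sketch of that idea, not the code of this commit: the type `caps_t`, the `CAP_*` constants and their values, and the three helper functions are all hypothetical names, and the assumption that negotiation is a plain bitwise AND of both sides' sets is ours.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical capability bits; names and values are illustrative only. */
typedef uint32_t caps_t;

#define CAP_V1          (1u << 0)
#define CAP_V2          (1u << 1)
#define CAP_LABELS      (1u << 2)
#define CAP_COMPRESSION (1u << 3)
#define CAP_FUNCTIONS   (1u << 4)

/* Assumption: the agreed set is whatever both peers advertise. */
static caps_t caps_negotiate(caps_t local, caps_t remote) {
    return local & remote;
}

/* True only if every bit of `cap` is present in `caps`. */
static bool caps_has(caps_t caps, caps_t cap) {
    return (caps & cap) == cap;
}

/* Remove a capability, e.g. when compression is disabled by configuration. */
static caps_t caps_drop(caps_t caps, caps_t cap) {
    return caps & ~cap;
}
```

With a bitmask, a feature can be switched off for one connection without affecting the others, which a single monotonically increasing version number cannot express.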
Diffstat (limited to 'streaming/receiver.c')
-rw-r--r-- | streaming/receiver.c | 206 |
1 files changed, 125 insertions, 81 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index a2852981a2..6890f8b2d9 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -72,9 +72,8 @@ PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins
     time_t remote_time = 0;
     RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
     struct plugind *cd = ((PARSER_USER_OBJECT *)user)->cd;
-    if (cd->version < VERSION_GAP_FILLING ) {
-        error("STREAM %s from %s: Child negotiated version %u but sent TIMESTAMP!", rrdhost_hostname(host), cd->cmd,
-               cd->version);
+    if (!(cd->capabilities & STREAM_CAP_GAP_FILLING)) {
+        error("STREAM %s from %s: Child negotiated version %u but sent TIMESTAMP!", rrdhost_hostname(host), cd->cmd, cd->capabilities);
         return PARSER_RC_OK;    // Ignore error and continue stream
     }
     if (remote_time_txt && *remote_time_txt) {
@@ -111,8 +110,8 @@ PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins
                   (int64_t)remote_time);
         int ret;
 #ifdef ENABLE_HTTPS
-        SSL *conn = host->stream_ssl.conn ;
-        if(conn && !host->stream_ssl.flags) {
+        SSL *conn = host->receiver->ssl.conn ;
+        if(conn && !host->receiver->ssl.flags) {
             ret = SSL_write(conn, message, strlen(message));
         } else {
             ret = send(host->receiver->fd, message, strlen(message), MSG_DONTWAIT);
@@ -292,8 +291,10 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {
         size_t available = sizeof(r->read_buffer) - r->read_len;
         if (available) {
             size_t len = r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, available);
-            if (!len)
+            if (!len) {
+                internal_error(true, "decompressor returned zero length");
                 return 1;
+            }

             r->read_len += len;
         }
@@ -301,8 +302,10 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {
     }

     int ret = 0;
-    if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1, &ret))
+    if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1, &ret)) {
+        internal_error(true, "read_stream() failed (1).");
         return 1;
+    }

     worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, ret);

@@ -320,8 +323,10 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {
     // we're unable to decompress incomplete block
     char compressed[bytes_to_read];
     do {
-        if (read_stream(r, fp, compressed, bytes_to_read, &ret))
+        if (read_stream(r, fp, compressed, bytes_to_read, &ret)) {
+            internal_error(true, "read_stream() failed (2).");
             return 1;
+        }

         worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, ret);

@@ -334,8 +339,10 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {

     // Decompress
     size_t bytes_to_parse = r->decompressor->decompress(r->decompressor);
-    if (!bytes_to_parse)
+    if (!bytes_to_parse) {
+        internal_error(true, "no bytes to parse.");
         return 1;
+    }

     // Fill read buffer with decompressed data
     r->read_len = r->decompressor->get(r->decompressor, r->read_buffer, sizeof(r->read_buffer));
@@ -347,30 +354,56 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {
 /* Produce a full line if one exists, statefully return where we start next time.
  * When we hit the end of the buffer with a partial line move it to the beginning for the next fill.
  */
-static char *receiver_next_line(struct receiver_state *r, int *pos) {
-    int start = *pos, scan = *pos;
-    if (scan >= r->read_len) {
+static char *receiver_next_line(struct receiver_state *r, char *buffer, size_t buffer_length, size_t *pos) {
+    size_t start = *pos;
+
+    char *ss = &r->read_buffer[start];
+    char *se = &r->read_buffer[r->read_len];
+    char *ds = buffer;
+    char *de = &buffer[buffer_length - 2];
+
+    if(ss >= se) {
         r->read_len = 0;
         return NULL;
     }
-    while (scan < r->read_len && r->read_buffer[scan] != '\n')
-        scan++;
-    if (scan < r->read_len && r->read_buffer[scan] == '\n') {
-        *pos = scan+1;
-        r->read_buffer[scan] = 0;
-        return &r->read_buffer[start];
+
+    // copy all bytes to buffer
+    while(ss < se && ds < de && *ss != '\n')
+        *ds++ = *ss++;
+
+    // if we have a newline, return the buffer
+    if(ss < se && ds < de && *ss == '\n') {
+        // newline found in the r->read_buffer
+
+        *ds++ = *ss++; // copy the newline too
+        *ds = '\0';
+
+        *pos = ss - r->read_buffer;
+        return buffer;
     }
+
+    // if the destination is full, oops!
+    if(ds == de) {
+        error("STREAM: received line exceeds %d bytes. Truncating it.", PLUGINSD_LINE_MAX);
+        *ds = '\0';
+        *pos = ss - r->read_buffer;
+        return buffer;
+    }
+
+    // no newline found in the r->read_buffer
+    // move everything to the beginning
     memmove(r->read_buffer, &r->read_buffer[start], r->read_len - start);
-    r->read_len -= start;
+    r->read_len -= (int)start;
     return NULL;
 }

 static void streaming_parser_thread_cleanup(void *ptr) {
     PARSER *parser = (PARSER *)ptr;
+    rrd_collector_finished();
     parser_destroy(parser);
 }

-size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp) {
+size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp_in, FILE *fp_out) {
     size_t result;

     PARSER_USER_OBJECT user = {
@@ -381,7 +414,9 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp
         .trust_durations = 1
     };

-    PARSER *parser = parser_init(rpt->host, &user, fp, PARSER_INPUT_SPLIT);
+    PARSER *parser = parser_init(rpt->host, &user, fp_in, fp_out, PARSER_INPUT_SPLIT);
+
+    rrd_collector_started();

     // this keeps the parser with its current value
     // so, parser needs to be allocated before pushing it
@@ -390,19 +425,6 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp
     parser_add_keyword(parser, "TIMESTAMP", streaming_timestamp);
     parser_add_keyword(parser, "CLAIMED_ID", streaming_claimed_id);

-    parser->plugins_action->begin_action = &pluginsd_begin_action;
-    parser->plugins_action->flush_action = &pluginsd_flush_action;
-    parser->plugins_action->end_action = &pluginsd_end_action;
-    parser->plugins_action->disable_action = &pluginsd_disable_action;
-    parser->plugins_action->variable_action = &pluginsd_variable_action;
-    parser->plugins_action->dimension_action = &pluginsd_dimension_action;
-    parser->plugins_action->label_action = &pluginsd_label_action;
-    parser->plugins_action->overwrite_action = &pluginsd_overwrite_action;
-    parser->plugins_action->chart_action = &pluginsd_chart_action;
-    parser->plugins_action->set_action = &pluginsd_set_action;
-    parser->plugins_action->clabel_commit_action = &pluginsd_clabel_commit_action;
-    parser->plugins_action->clabel_action = &pluginsd_clabel_action;
-
     user.parser = parser;

 #ifdef ENABLE_COMPRESSION
@@ -410,15 +432,26 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp
         rpt->decompressor->reset(rpt->decompressor);
 #endif

-    do{
-        if (receiver_read(rpt, fp))
-            break;
-        int pos = 0;
-        char *line;
-        while ((line = receiver_next_line(rpt, &pos))) {
-            if (unlikely(netdata_exit || rpt->shutdown || parser_action(parser, line)))
+    char buffer[PLUGINSD_LINE_MAX + 2];
+    do {
+        if(receiver_read(rpt, fp_in)) break;
+
+        size_t pos = 0;
+        while(receiver_next_line(rpt, buffer, PLUGINSD_LINE_MAX + 2, &pos)) {
+            if(unlikely(netdata_exit)) {
+                internal_error(true, "exiting...");
+                goto done;
+            }
+            if(unlikely(rpt->shutdown)) {
+                internal_error(true, "parser shutdown...");
                 goto done;
+            }
+            if (unlikely(parser_action(parser, buffer))) {
+                internal_error(true, "parser_action() failed...");
+                goto done;
+            }
         }
+        rpt->last_msg_t = now_realtime_sec();
     }
     while(!netdata_exit);

@@ -495,8 +528,6 @@ static int rrdpush_receive(struct receiver_state *rpt)
         char initial_response[HTTP_HEADER_SIZE + 1];
         snprintfz(initial_response, HTTP_HEADER_SIZE, "%s", START_STREAMING_ERROR_SAME_LOCALHOST);
 #ifdef ENABLE_HTTPS
-        rpt->host->stream_ssl.conn = rpt->ssl.conn;
-        rpt->host->stream_ssl.flags = rpt->ssl.flags;
         if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
 #else
         if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) {
@@ -611,7 +642,7 @@ static int rrdpush_receive(struct receiver_state *rpt)
             .obsolete = 0,
             .started_t = now_realtime_sec(),
             .next = NULL,
-            .version = 0,
+            .capabilities = 0,
     };

     // put the client IP and port into the buffers used by plugins.d
@@ -620,32 +651,31 @@ static int rrdpush_receive(struct receiver_state *rpt)
     snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
     snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", rpt->client_ip, rpt->client_port);

-    info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
-    char initial_response[HTTP_HEADER_SIZE];
-    if (rpt->stream_version > 1) {
-        if(rpt->stream_version >= STREAM_VERSION_COMPRESSION){
 #ifdef ENABLE_COMPRESSION
-            if(!rpt->rrdpush_compression)
-                rpt->stream_version = STREAM_VERSION_CLABELS;
-#else
-            if(STREAMING_PROTOCOL_CURRENT_VERSION < rpt->stream_version) {
-                rpt->stream_version = STREAMING_PROTOCOL_CURRENT_VERSION;
-            }
+    if (stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) {
+        if (!rpt->rrdpush_compression)
+            rpt->capabilities &= ~STREAM_CAP_COMPRESSION;
+    }
 #endif
-        }
-        info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->stream_version);
-        sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->stream_version);
-    } else if (rpt->stream_version == 1) {
-        info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->stream_version);
+
+    info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
+    char initial_response[HTTP_HEADER_SIZE];
+    if (stream_has_capability(rpt, STREAM_CAP_VCAPS)) {
+        log_receiver_capabilities(rpt);
+        sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->capabilities);
+    }
+    else if (stream_has_capability(rpt, STREAM_CAP_VN)) {
+        log_receiver_capabilities(rpt);
+        sprintf(initial_response, "%s%d", START_STREAMING_PROMPT_VN, stream_capabilities_to_vn(rpt->capabilities));
+    } else if (stream_has_capability(rpt, STREAM_CAP_V2)) {
+        log_receiver_capabilities(rpt);
         sprintf(initial_response, "%s", START_STREAMING_PROMPT_V2);
-    } else {
-        info("STREAM %s [receive from [%s]:%s]: Netdata is using first stream protocol.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
-        sprintf(initial_response, "%s", START_STREAMING_PROMPT);
+    } else { // stream_has_capability(rpt, STREAM_CAP_V1)
+        log_receiver_capabilities(rpt);
+        sprintf(initial_response, "%s", START_STREAMING_PROMPT_V1);
     }
     debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response);
-    #ifdef ENABLE_HTTPS
-    rpt->host->stream_ssl.conn = rpt->ssl.conn;
-    rpt->host->stream_ssl.flags = rpt->ssl.flags;
+#ifdef ENABLE_HTTPS
     if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
 #else
     if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) {
@@ -667,11 +697,33 @@ static int rrdpush_receive(struct receiver_state *rpt)
         error("STREAM %s [receive from [%s]:%s]: cannot set timeout for socket %d", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd);

     // convert the socket to a FILE *
-    FILE *fp = fdopen(rpt->fd, "r");
-    if(!fp) {
+    // It seems that the same FILE * cannot be used for both reading and writing.
+    // (reads and writes seem to interfere with each other, with undefined results).
+
+    int fd_in = rpt->fd;
+    int fd_out = fcntl(rpt->fd, F_DUPFD_CLOEXEC, 0);
+    if(fd_out == -1) {
+        error("STREAM %s [receive from [%s]:%s]: failed to duplicate FD %d.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd);
         log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - SOCKET ERROR");
-        error("STREAM %s [receive from [%s]:%s]: failed to get a FILE for FD %d.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd);
-        close(rpt->fd);
+        close(fd_in);
+        return 0;
+    }
+
+    FILE *fp_out = fdopen(fd_out, "w");
+    if(!fp_out) {
+        error("STREAM %s [receive from [%s]:%s]: failed to get a FILE pointer for fd_out %d.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd);
+        log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - SOCKET ERROR");
+        close(fd_in);
+        close(fd_out);
+        return 0;
+    }
+
+    FILE *fp_in = fdopen(fd_in, "r");
+    if(!fp_in) {
+        error("STREAM %s [receive from [%s]:%s]: failed to get a FILE pointer for fd_in %d.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd);
+        log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - SOCKET ERROR");
+        close(fd_in);
+        fclose(fp_out);
         return 0;
     }

@@ -686,15 +738,6 @@ static int rrdpush_receive(struct receiver_state *rpt)
      */
     // rpt->host->connected_senders++;

-    if(rpt->stream_version > 0) {
-        rrdhost_flag_set(rpt->host, RRDHOST_FLAG_STREAM_LABELS_UPDATE);
-        rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_STREAM_LABELS_STOP);
-    }
-    else {
-        rrdhost_flag_set(rpt->host, RRDHOST_FLAG_STREAM_LABELS_STOP);
-        rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_STREAM_LABELS_UPDATE);
-    }
-
     if(health_enabled != CONFIG_BOOLEAN_NO) {
         if(alarms_delay > 0) {
             rpt->host->health_delay_up_to = now_realtime_sec() + alarms_delay;
@@ -713,7 +756,7 @@ static int rrdpush_receive(struct receiver_state *rpt)
     info("STREAM %s [receive from [%s]:%s]: receiving metrics...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
     log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "CONNECTED");

-    cd.version = rpt->stream_version;
+    cd.capabilities = rpt->capabilities;

 #ifdef ENABLE_ACLK
     // in case we have cloud connection we inform cloud
@@ -724,7 +767,7 @@ static int rrdpush_receive(struct receiver_state *rpt)

     rrdcontext_host_child_connected(rpt->host);

-    size_t count = streaming_parser(rpt, &cd, fp);
+    size_t count = streaming_parser(rpt, &cd, fp_in, fp_out);

     log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->hostname, "DISCONNECTED");

@@ -762,7 +805,8 @@ static int rrdpush_receive(struct receiver_state *rpt)
     }

     // cleanup
-    fclose(fp);
+    fclose(fp_in);
+    fclose(fp_out);

     return (int)count;
 }
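The reworked `receiver_next_line()` in this diff no longer terminates lines in place inside the read buffer. It copies each line, newline included, into a caller-supplied buffer and truncates lines that do not fit. The sketch below isolates that copying logic so it can be read and tested on its own. It is a simplified illustration rather than the committed function: `copy_next_line` and its parameters are hypothetical, there is no `struct receiver_state`, and the step that moves a partial line to the start of the read buffer is left to the caller. Unlike the diff, it also accepts a newline that arrives exactly when the destination is full.

```c
#include <stddef.h>

/* Copy the next line (including its '\n') from src[*pos .. src_len) into dst.
 * Returns dst when a complete line was copied, or when dst filled up and the
 * line had to be truncated. Returns NULL when only a partial line remains,
 * in which case *pos is left untouched so the caller can read more data. */
static char *copy_next_line(const char *src, size_t src_len, size_t *pos,
                            char *dst, size_t dst_size) {
    if (dst_size < 2 || *pos >= src_len)
        return NULL;

    const char *ss = src + *pos;        /* source cursor */
    const char *se = src + src_len;     /* source end */
    char *ds = dst;                     /* destination cursor */
    char *de = dst + dst_size - 2;      /* keep room for '\n' and '\0' */

    /* copy bytes until a newline, the end of input, or a full destination */
    while (ss < se && ds < de && *ss != '\n')
        *ds++ = *ss++;

    if (ss < se && *ss == '\n') {       /* complete line: keep the newline */
        *ds++ = *ss++;
        *ds = '\0';
        *pos = (size_t)(ss - src);
        return dst;
    }

    if (ds == de) {                     /* destination full: truncate */
        *ds = '\0';
        *pos = (size_t)(ss - src);
        return dst;
    }

    return NULL;                        /* partial line, wait for more input */
}
```

Copying into a separate buffer costs one extra pass over the data, but it leaves the read buffer unmodified and gives the parser a bounded, NUL-terminated string regardless of what the peer sends.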