diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2022-10-05 14:13:46 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-10-05 14:13:46 +0300 |
commit | 8fc3b351a2e7fc96eced8f924de2e9cec9842128 (patch) | |
tree | bde41c66573ccaf8876c280e00742cc6096b587c /streaming | |
parent | 6850878e697d66dc90b9af1e750b22238c63c292 (diff) |
Allow netdata plugins to expose functions for querying more information about specific charts (#13720)
* function renames and code cleanup in popen.c; no actual code changes
* netdata popen() now opens both child process stdin and stdout and returns FILE * for both
* pass both input and output to parser structures
* updated rrdset to call custom functions
* RRDSET FUNCTION leading calls for both sync and async operation
* put RRDSET functions to a separate file
* added format and timeout at function definition
* support for synchronous (internal plugins) and asynchronous (external plugins and children) functions
* /api/v1/function endpoint
* functions are now attached to the host and there is a dictionary view per chart
* functions implemented at plugins.d
* remove the defer until keyword hook from plugins.d when it is done
* stream sender implementation of functions
* sanitization of all functions so that certain characters are only allowed
* strictier sanitization
* common max size
* 1st working plugins.d example
* always init inflight dictionary
* properly destroy dictionaries to avoid parallel insertion of items
* add more debugging on disconnection reasons
* add more debugging on disconnection reasons again
* streaming receiver respects newlines
* dont use the same fp for both streaming receive and send
* dont free dbengine memory with internal checks
* make sender proceed in the buffer
* added timing info and garbage collection at plugins.d
* added info about routing nodes
* added info about routing nodes with delay
* added more info about delays
* added more info about delays again
* signal sending thread to wake up
* streaming version labeling and commented code to support capabilities
* added functions to /api/v1/data, /api/v1/charts, /api/v1/chart, /api/v1/info
* redirect top output to stdout
* address coverity findings
* fix resource leaks of popen
* log attempts to connect to individual destinations
* better messages
* properly parse destinations
* try to find a function from the most matching to the least matching
* log added streaming destinations
* rotate destinations bypassing a node in the middle that does not accept our connection
* break the loops properly
* use typedef to define callbacks
* capabilities negotiation during streaming
* functions exposed upstream based on capabilities; compression disabled per node persisting reconnects; always try to connect with all capabilities
* restore functionality to lookup functions
* better logging of capabilities
* remove old versions from capabilities when a newer version is there
* fix formatting
* optimization for plugins.d rrdlabels to avoid creating and destructing dictionaries all the time
* delayed health initialization for rrddim and rrdset
* cleanup health initialization
* fix for popen() not returning the right value
* add health worker jobs for initializing rrdset and rrddim
* added content type support for functions; apps.plugin permanent function to display all the processes
* fixes for functions parameters parsing in apps.plugin
* fix for process matching in apps.plugiin
* first working function for apps.plugin
* Dashboard ACL is disabled for functions; Function errors are all in JSON format
* apps.plugin function processes returns json table
* use json_escape_string() to escape message
* fix formatting
* apps.plugin exposes all its metrics to function processes
* fix json formatting when filtering out some rows
* reopen the internal pipe of rrdpush in case of errors
* misplaced statement
* do not use buffer->len
* support for GLOBAL functions (functions that are not linked to a chart
* added /api/v1/functions endpoint; removed format from the FUNCTIONS api;
* swagger documentation about the new api end points
* added plugins.d documentation about functions
* never re-close a file
* remove uncessesary ifdef
* fixed issues identified by codacy
* fix for null label value
* make edit-config copy-and-paste friendly
* Revert "make edit-config copy-and-paste friendly"
This reverts commit 54500c0e0a97f65a0c66c4d34e966f6a9056698e.
* reworked sender handshake to fix coverity findings
* timeout is zero, for both send_timeout() and recv_timeout()
* properly detect that parent closed the socket
* support caching of function responses; limit function response to 10MB; added protection from malformed function responses
* disabled excessive logging
* added units to apps.plugin function processes and normalized all values to be human readable
* shorter field names
* fixed issues reported
* fixed apps.plugin error response; tested that pluginsd can properly handle faulty responses
* use double linked list macros for double linked list management
* faster apps.plugin function printing by minimizing file operations
* added memory percentage
* fix compatibility issues with older compilers and FreeBSD
* rrdpush sender code cleanup; rrhost structure cleanup from sender flags and variables;
* fix letftover variable in ifdef
* apps.plugin: do not call detach from the thread; exit immediately when input is broken
* exclude AR charts from health
* flush cleaner; prefer sender output
* clarity
* do not fill the cbuffer if not connected
* fix
* dont enabled host->sender if streaming is not enabled; send host label updates to parent;
* functions are only available through ACLK
* Prepared statement reports only in dev mode
* fix AR chart detection
* fix for streaming not being enabling itself
* more cleanup of sender and receiver structures
* moved read-only flags and configuration options to rrdhost->options
* fixed merge with master
* fix for incomplete rename
* prevent service thread from working on charts that are being collected
Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/compression.c | 12 | ||||
-rw-r--r-- | streaming/receiver.c | 206 | ||||
-rw-r--r-- | streaming/rrdpush.c | 376 | ||||
-rw-r--r-- | streaming/rrdpush.h | 134 | ||||
-rw-r--r-- | streaming/sender.c | 1018 |
5 files changed, 1108 insertions, 638 deletions
diff --git a/streaming/compression.c b/streaming/compression.c index cae370817a..1fddc02b91 100644 --- a/streaming/compression.c +++ b/streaming/compression.c @@ -66,8 +66,8 @@ static size_t lz4_compressor_compress(struct compressor_state *state, const char if(unlikely(!state || !size || !out)) return 0; - if(unlikely(size > LZ4_MAX_MSG_SIZE)) { - error("%s: Compression Failed - Message size %lu above compression buffer limit: %d", STREAM_COMPRESSION_MSG, (long unsigned int) size, LZ4_MAX_MSG_SIZE); + if(unlikely(size > COMPRESSION_MAX_MSG_SIZE)) { + error("%s: Compression Failed - Message size %lu above compression buffer limit: %d", STREAM_COMPRESSION_MSG, (long unsigned int)size, COMPRESSION_MAX_MSG_SIZE); return 0; } @@ -103,7 +103,7 @@ static size_t lz4_compressor_compress(struct compressor_state *state, const char // update the next writing position of the ring buffer state->data->input_ring_buffer_pos += size; - if(unlikely(state->data->input_ring_buffer_pos >= state->data->input_ring_buffer_size - LZ4_MAX_MSG_SIZE)) + if(unlikely(state->data->input_ring_buffer_pos >= state->data->input_ring_buffer_size - COMPRESSION_MAX_MSG_SIZE)) state->data->input_ring_buffer_pos = 0; // update the signature header @@ -128,7 +128,7 @@ struct compressor_state *create_compressor() state->data = callocz(1, sizeof(struct compressor_data)); state->data->stream = LZ4_createStream(); - state->data->input_ring_buffer_size = LZ4_DECODER_RING_BUFFER_SIZE(LZ4_MAX_MSG_SIZE * 2); + state->data->input_ring_buffer_size = LZ4_DECODER_RING_BUFFER_SIZE(COMPRESSION_MAX_MSG_SIZE * 2); state->data->input_ring_buffer = callocz(1, state->data->input_ring_buffer_size); state->compression_result_buffer_size = 0; state->reset(state); @@ -280,7 +280,7 @@ static size_t lz4_decompressor_decompress(struct decompressor_state *state) state->out_buffer = state->data->stream_buffer + state->data->stream_buffer_pos; state->data->stream_buffer_pos += decompressed_size; - if (state->data->stream_buffer_pos >= state->data->stream_buffer_size - LZ4_MAX_MSG_SIZE) + if (state->data->stream_buffer_pos >= state->data->stream_buffer_size - COMPRESSION_MAX_MSG_SIZE) state->data->stream_buffer_pos = 0; state->out_buffer_len = decompressed_size; state->out_buffer_pos = 0; @@ -358,7 +358,7 @@ struct decompressor_state *create_decompressor() state->data = callocz(1, sizeof(struct decompressor_data)); fatal_assert(state->data); state->data->stream = LZ4_createStreamDecode(); - state->data->stream_buffer_size = LZ4_decoderRingBufferSize(LZ4_MAX_MSG_SIZE); + state->data->stream_buffer_size = LZ4_decoderRingBufferSize(COMPRESSION_MAX_MSG_SIZE); state->data->stream_buffer = mallocz(state->data->stream_buffer_size); fatal_assert(state->data->stream_buffer); state->reset(state); diff --git a/streaming/receiver.c b/streaming/receiver.c index a2852981a2..6890f8b2d9 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -72,9 +72,8 @@ PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins time_t remote_time = 0; RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host; struct plugind *cd = ((PARSER_USER_OBJECT *)user)->cd; - if (cd->version < VERSION_GAP_FILLING ) { - error("STREAM %s from %s: Child negotiated version %u but sent TIMESTAMP!", rrdhost_hostname(host), cd->cmd, - cd->version); + if (!(cd->capabilities & STREAM_CAP_GAP_FILLING)) { + error("STREAM %s from %s: Child negotiated version %u but sent TIMESTAMP!", rrdhost_hostname(host), cd->cmd, cd->capabilities); return PARSER_RC_OK; // Ignore error and continue stream } if (remote_time_txt && *remote_time_txt) { @@ -111,8 +110,8 @@ PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins (int64_t)remote_time); int ret; #ifdef ENABLE_HTTPS - SSL *conn = host->stream_ssl.conn ; - if(conn && !host->stream_ssl.flags) { + SSL *conn = host->receiver->ssl.conn ; + if(conn && !host->receiver->ssl.flags) { ret = SSL_write(conn, message, strlen(message)); } else { ret = send(host->receiver->fd, message, strlen(message), MSG_DONTWAIT); @@ -292,8 +291,10 @@ static int receiver_read(struct receiver_state *r, FILE *fp) { size_t available = sizeof(r->read_buffer) - r->read_len; if (available) { size_t len = r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, available); - if (!len) + if (!len) { + internal_error(true, "decompressor returned zero length"); return 1; + } r->read_len += len; } @@ -301,8 +302,10 @@ static int receiver_read(struct receiver_state *r, FILE *fp) { } int ret = 0; - if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1, &ret)) + if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1, &ret)) { + internal_error(true, "read_stream() failed (1)."); return 1; + } worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, ret); @@ -320,8 +323,10 @@ static int receiver_read(struct receiver_state *r, FILE *fp) { // we're unable to decompress incomplete block char compressed[bytes_to_read]; do { - if (read_stream(r, fp, compressed, bytes_to_read, &ret)) + if (read_stream(r, fp, compressed, bytes_to_read, &ret)) { + internal_error(true, "read_stream() failed (2)."); return 1; + } worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, ret); @@ -334,8 +339,10 @@ static int receiver_read(struct receiver_state *r, FILE *fp) { // Decompress size_t bytes_to_parse = r->decompressor->decompress(r->decompressor); - if (!bytes_to_parse) + if (!bytes_to_parse) { + internal_error(true, "no bytes to parse."); return 1; + } // Fill read buffer with decompressed data r->read_len = r->decompressor->get(r->decompressor, r->read_buffer, sizeof(r->read_buffer)); @@ -347,30 +354,56 @@ static int receiver_read(struct receiver_state *r, FILE *fp) { /* Produce a full line if one exists, statefully return where we start next time. * When we hit the end of the buffer with a partial line move it to the beginning for the next fill. */ -static char *receiver_next_line(struct receiver_state *r, int *pos) { - int start = *pos, scan = *pos; - if (scan >= r->read_len) { +static char *receiver_next_line(struct receiver_state *r, char *buffer, size_t buffer_length, size_t *pos) { + size_t start = *pos; + + char *ss = &r->read_buffer[start]; + char *se = &r->read_buffer[r->read_len]; + char *ds = buffer; + char *de = &buffer[buffer_length - 2]; + + if(ss >= se) { r->read_len = 0; return NULL; } - while (scan < r->read_len && r->read_buffer[scan] != '\n') - scan++; - if (scan < r->read_len && r->read_buffer[scan] == '\n') { - *pos = scan+1; - r->read_buffer[scan] = 0; - return &r->read_buffer[start]; + + // copy all bytes to buffer + while(ss < se && ds < de && *ss != '\n') + *ds++ = *ss++; + + // if we have a newline, return the buffer + if(ss < se && ds < de && *ss == '\n') { + // newline found in the r->read_buffer + + *ds++ = *ss++; // copy the newline too + *ds = '\0'; + + *pos = ss - r->read_buffer; + return buffer; } + + // if the destination is full, oops! + if(ds == de) { + error("STREAM: received line exceeds %d bytes. Truncating it.", PLUGINSD_LINE_MAX); + *ds = '\0'; + *pos = ss - r->read_buffer; + return buffer; + } + + // no newline found in the r->read_buffer + // move everything to the beginning memmove(r->read_buffer, &r->read_buffer[start], r->read_len - start); - r->read_len -= start; + r->read_len -= (int)start; return NULL; } static void streaming_parser_thread_cleanup(void *ptr) { PARSER *parser = (PARSER *)ptr; + rrd_collector_finished(); parser_destroy(parser); } -size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp) { +size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp_in, FILE *fp_out) { size_t result; PARSER_USER_OBJECT user = { @@ -381,7 +414,9 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp .trust_durations = 1 }; - PARSER *parser = parser_init(rpt->host, &user, fp, PARSER_INPUT_SPLIT); + PARSER *parser = parser_init(rpt->host, &user, fp_in, fp_out, PARSER_INPUT_SPLIT); + + rrd_collector_started(); // this keeps the parser with its current value // so, parser needs to be allocated before pushing it @@ -390,19 +425,6 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp parser_add_keyword(parser, "TIMESTAMP", streaming_timestamp); parser_add_keyword(parser, "CLAIMED_ID", streaming_claimed_id); - parser->plugins_action->begin_action = &pluginsd_begin_action; - parser->plugins_action->flush_action = &pluginsd_flush_action; - parser->plugins_action->end_action = &pluginsd_end_action; - parser->plugins_action->disable_action = &pluginsd_disable_action; - parser->plugins_action->variable_action = &pluginsd_variable_action; - parser->plugins_action->dimension_action = &pluginsd_dimension_action; - parser->plugins_action->label_action = &pluginsd_label_action; - parser->plugins_action->overwrite_action = &pluginsd_overwrite_action; - parser->plugins_action->chart_action = &pluginsd_chart_action; - parser->plugins_action->set_action = &pluginsd_set_action; - parser->plugins_action->clabel_commit_action = &pluginsd_clabel_commit_action; - parser->plugins_action->clabel_action = &pluginsd_clabel_action; - user.parser = parser; #ifdef ENABLE_COMPRESSION @@ -410,15 +432,26 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp rpt->decompressor->reset(rpt->decompressor); #endif - do{ - if (receiver_read(rpt, fp)) - break; - int pos = 0; - char *line; - while ((line = receiver_next_line(rpt, &pos))) { - if (unlikely(netdata_exit || rpt->shutdown || parser_action(parser, line))) + char buffer[PLUGINSD_LINE_MAX + 2]; + do { + if(receiver_read(rpt, fp_in)) break; + + size_t pos = 0; + while(receiver_next_line(rpt, buffer, PLUGINSD_LINE_MAX + 2, &pos)) { + if(unlikely(netdata_exit)) { + internal_error(true, "exiting..."); + goto done; + } + if(unlikely(rpt->shutdown)) { + internal_error(true, "parser shutdown..."); goto done; + } + if (unlikely(parser_action(parser, buffer))) { + internal_error(true, "parser_action() failed..."); + goto done; + } } + rpt->last_msg_t = now_realtime_sec(); } while(!netdata_exit); @@ -495,8 +528,6 @@ static int rrdpush_receive(struct receiver_state *rpt) char initial_response[HTTP_HEADER_SIZE + 1]; snprintfz(initial_response, HTTP_HEADER_SIZE, "%s", START_STREAMING_ERROR_SAME_LOCALHOST); #ifdef ENABLE_HTTPS - rpt->host->stream_ssl.conn = rpt->ssl.conn; - rpt->host->stream_ssl.flags = rpt->ssl.flags; if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) { #else if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) { @@ -611,7 +642,7 @@ static int rrdpush_receive(struct receiver_state *rpt) .obsolete = 0, .started_t = now_realtime_sec(), .next = NULL, - .version = 0, + .capabilities = 0, }; // put the client IP and port into the buffers used by plugins.d @@ -620,32 +651,31 @@ static int rrdpush_receive(struct receiver_state *rpt) snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", rpt->client_ip, rpt->client_port); snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", rpt->client_ip, rpt->client_port); - info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); - char initial_response[HTTP_HEADER_SIZE]; - if (rpt->stream_version > 1) { - if(rpt->stream_version >= STREAM_VERSION_COMPRESSION){ #ifdef ENABLE_COMPRESSION - if(!rpt->rrdpush_compression) - rpt->stream_version = STREAM_VERSION_CLABELS; -#else - if(STREAMING_PROTOCOL_CURRENT_VERSION < rpt->stream_version) { - rpt->stream_version = STREAMING_PROTOCOL_CURRENT_VERSION; - } + if (stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) { + if (!rpt->rrdpush_compression) + rpt->capabilities &= ~STREAM_CAP_COMPRESSION; + } #endif - } - info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->stream_version); - sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->stream_version); - } else if (rpt->stream_version == 1) { - info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->stream_version); + + info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); + char initial_response[HTTP_HEADER_SIZE]; + if (stream_has_capability(rpt, STREAM_CAP_VCAPS)) { + log_receiver_capabilities(rpt); + sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->capabilities); + } + else if (stream_has_capability(rpt, STREAM_CAP_VN)) { + log_receiver_capabilities(rpt); + sprintf(initial_response, "%s%d", START_STREAMING_PROMPT_VN, stream_capabilities_to_vn(rpt->capabilities)); + } else if (stream_has_capability(rpt, STREAM_CAP_V2)) { + log_receiver_capabilities(rpt); sprintf(initial_response, "%s", START_STREAMING_PROMPT_V2); - } else { - info("STREAM %s [receive from [%s]:%s]: Netdata is using first stream protocol.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); - sprintf(initial_response, "%s", START_STREAMING_PROMPT); + } else { // stream_has_capability(rpt, STREAM_CAP_V1) + log_receiver_capabilities(rpt); + sprintf(initial_response, "%s", START_STREAMING_PROMPT_V1); } debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response); - #ifdef ENABLE_HTTPS - rpt->host->stream_ssl.conn = rpt->ssl.conn; - rpt->host->stream_ssl.flags = rpt->ssl.flags; +#ifdef ENABLE_HTTPS if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) { #else if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) { @@ -667,11 +697,33 @@ static int rrdpush_receive(struct receiver_state *rpt) error("STREAM %s [receive from [%s]:%s]: cannot set timeout for socket %d", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd); // convert the socket to a FILE * - FILE *fp = fdopen(rpt->fd, "r"); - if(!fp) { + // It seems that the same FILE * cannot be used for both reading and writing. + // (reads and writes seem to interfere with each other, with undefined results). + + int fd_in = rpt->fd; + int fd_out = fcntl(rpt->fd, F_DUPFD_CLOEXEC, 0); + if(fd_out == -1) { + error("STREAM %s [receive from [%s]:%s]: failed to duplicate FD %d.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd); log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - SOCKET ERROR"); - error("STREAM %s [receive from [%s]:%s]: failed to get a FILE for FD %d.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd); - close(rpt->fd); + close(fd_in); + return 0; + } + + FILE *fp_out = fdopen(fd_out, "w"); + if(!fp_out) { + error("STREAM %s [receive from [%s]:%s]: failed to get a FILE pointer for fd_out %d.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd); + log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - SOCKET ERROR"); + close(fd_in); + close(fd_out); + return 0; + } + + FILE *fp_in = fdopen(fd_in, "r"); + if(!fp_in) { + error("STREAM %s [receive from [%s]:%s]: failed to get a FILE pointer for fd_in %d.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd); + log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - SOCKET ERROR"); + close(fd_in); + fclose(fp_out); return 0; } @@ -686,15 +738,6 @@ static int rrdpush_receive(struct receiver_state *rpt) */ // rpt->host->connected_senders++; - if(rpt->stream_version > 0) { - rrdhost_flag_set(rpt->host, RRDHOST_FLAG_STREAM_LABELS_UPDATE); - rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_STREAM_LABELS_STOP); - } - else { - rrdhost_flag_set(rpt->host, RRDHOST_FLAG_STREAM_LABELS_STOP); - rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_STREAM_LABELS_UPDATE); - } - if(health_enabled != CONFIG_BOOLEAN_NO) { if(alarms_delay > 0) { rpt->host->health_delay_up_to = now_realtime_sec() + alarms_delay; @@ -713,7 +756,7 @@ static int rrdpush_receive(struct receiver_state *rpt) info("STREAM %s [receive from [%s]:%s]: receiving metrics...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "CONNECTED"); - cd.version = rpt->stream_version; + cd.capabilities = rpt->capabilities; #ifdef ENABLE_ACLK // in case we have cloud connection we inform cloud @@ -724,7 +767,7 @@ static int rrdpush_receive(struct receiver_state *rpt) rrdcontext_host_child_connected(rpt->host); - size_t count = streaming_parser(rpt, &cd, fp); + size_t count = streaming_parser(rpt, &cd, fp_in, fp_out); log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->hostname, "DISCONNECTED"); @@ -762,7 +805,8 @@ static int rrdpush_receive(struct receiver_state *rpt) } // cleanup - fclose(fp); + fclose(fp_in); + fclose(fp_out); return (int)count; } diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index 9fa9793de2..018b29a033 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -11,8 +11,8 @@ * 1. a random data collection thread, calling rrdset_done_push() * this is called for each chart. * - * the output of this work is kept in a BUFFER in RRDHOST - * the sender thread is signalled via a pipe (also in RRDHOST) + * the output of this work is kept in a thread BUFFER + * the sender thread is signalled via a pipe (in RRDHOST) * * 2. a sender thread running at the sending netdata * this is spawned automatically on the first chart to be pushed @@ -101,9 +101,9 @@ int rrdpush_init() { bool invalid_certificate = appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", CONFIG_BOOLEAN_NO); if(invalid_certificate == CONFIG_BOOLEAN_YES){ - if(netdata_validate_server == NETDATA_SSL_VALID_CERTIFICATE){ + if(netdata_ssl_validate_server == NETDATA_SSL_VALID_CERTIFICATE){ info("Netdata is configured to accept invalid SSL certificate."); - netdata_validate_server = NETDATA_SSL_INVALID_CERTIFICATE; + netdata_ssl_validate_server = NETDATA_SSL_INVALID_CERTIFICATE; } } @@ -130,40 +130,35 @@ unsigned int remote_clock_resync_iterations = 60; static inline bool should_send_chart_matching(RRDSET *st) { - RRDSET_FLAGS flags = rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE); + // get all the flags we need to check, with one atomic operation + RRDSET_FLAGS flags = rrdset_flag_check(st, + RRDSET_FLAG_UPSTREAM_SEND + |RRDSET_FLAG_UPSTREAM_IGNORE + |RRDSET_FLAG_ANOMALY_RATE_CHART + |RRDSET_FLAG_ANOMALY_DETECTION); if(unlikely(!flags)) { RRDHOST *host = st->rrdhost; // Do not stream anomaly rates charts. - if (unlikely(rrdset_is_ar_chart(st))) { - rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND); + if (unlikely(flags & RRDSET_FLAG_ANOMALY_RATE_CHART)) rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE); - flags = RRDSET_FLAG_UPSTREAM_IGNORE; - } - else if (rrdset_flag_check(st, RRDSET_FLAG_ANOMALY_DETECTION)) { - if(ml_streaming_enabled()) { - rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_IGNORE); + + else if (flags & RRDSET_FLAG_ANOMALY_DETECTION) { + if(ml_streaming_enabled()) rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND); - flags = RRDSET_FLAG_UPSTREAM_SEND; - } - else { - rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND); + else rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE); - flags = RRDSET_FLAG_UPSTREAM_IGNORE; - } } else if(simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_id(st)) || - simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_name(st))) { - rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_IGNORE); + simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_name(st))) + rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND); - flags = RRDSET_FLAG_UPSTREAM_SEND; - } - else { - rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND); + else rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE); - flags = RRDSET_FLAG_UPSTREAM_IGNORE; - } + + // get the flags again, to know how to respond + flags = rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE); } return flags & RRDSET_FLAG_UPSTREAM_SEND; @@ -196,16 +191,17 @@ static int send_clabels_callback(const char *name, const char *value, RRDLABEL_S buffer_sprintf(wb, "CLABEL \"%s\" \"%s\" %d\n", name, value, ls); return 1; } -void rrdpush_send_clabels(RRDHOST *host, RRDSET *st) { + +static void rrdpush_send_clabels(BUFFER *wb, RRDSET *st) { if (st->rrdlabels) { - if(rrdlabels_walkthrough_read(st->rrdlabels, send_clabels_callback, host->sender->build) > 0) - buffer_sprintf(host->sender->build,"CLABEL_COMMIT\n"); + if(rrdlabels_walkthrough_read(st->rrdlabels, send_clabels_callback, wb) > 0) + buffer_sprintf(wb, "CLABEL_COMMIT\n"); } } // Send the current chart definition. // Assumes that collector thread has already called sender_start for mutex / buffer state. -static inline void rrdpush_send_chart_definition(RRDSET *st) { +static inline void rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { RRDHOST *host = st->rrdhost; rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_EXPOSED); @@ -225,7 +221,7 @@ static inline void rrdpush_send_chart_definition(RRDSET *st) { // send the chart buffer_sprintf( - host->sender->build + wb , "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %ld %d \"%s %s %s %s\" \"%s\" \"%s\"\n" , rrdset_id(st) , name @@ -245,14 +241,14 @@ static inline void rrdpush_send_chart_definition(RRDSET *st) { ); // send the chart labels - if (host->sender->version >= STREAM_VERSION_CLABELS) - rrdpush_send_clabels(host, st); + if (stream_has_capability(host->sender, STREAM_CAP_CLABELS)) + rrdpush_send_clabels(wb, st); // send the dimensions RRDDIM *rd; rrddim_foreach_read(rd, st) { buffer_sprintf( - host->sender->build + wb , "DIMENSION \"%s\" \"%s\" \"%s\" " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " \"%s %s %s\"\n" , rrddim_id(rd) , rrddim_name(rd) @@ -267,30 +263,30 @@ static inline void rrdpush_send_chart_definition(RRDSET *st) { } rrddim_foreach_done(rd); + // send the chart functions + if(stream_has_capability(host->sender, STREAM_CAP_FUNCTIONS)) + rrd_functions_expose_rrdpush(st, wb); + // send the chart local custom variables - rrdsetvar_print_to_streaming_custom_chart_variables(st, host->sender->build); + rrdsetvar_print_to_streaming_custom_chart_variables(st, wb); st->upstream_resync_time = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every); } // sends the current chart dimensions -static inline bool rrdpush_send_chart_metrics_nolock(RRDSET *st, struct sender_state *s) { - RRDHOST *host = st->rrdhost; - BUFFER *wb = host->sender->build; - +static inline void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s) { buffer_fast_strcat(wb, "BEGIN \"", 7); buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id)); buffer_fast_strcat(wb, "\" ", 2); buffer_print_llu(wb, (st->last_collected_time.tv_sec > st->upstream_resync_time)?st->usec_since_last_update:0); - if (s->version >= VERSION_GAP_FILLING) { + if (stream_has_capability(s, STREAM_CAP_GAP_FILLING)) { buffer_fast_strcat(wb, " ", 1); buffer_print_ll(wb, st->last_collected_time.tv_sec); } buffer_fast_strcat(wb, "\n", 1); - size_t count_of_dimensions_written = 0; RRDDIM *rd; rrddim_foreach_read(rd, st) { if(unlikely(!rd->updated)) @@ -302,7 +298,6 @@ static inline bool rrdpush_send_chart_metrics_nolock(RRDSET *st, struct sender_s buffer_fast_strcat(wb, "\" = ", 4); buffer_print_ll(wb, rd->collected_value); buffer_fast_strcat(wb, "\n", 1); - count_of_dimensions_written++; } else { internal_error(true, "host '%s', chart '%s', dimension '%s' flag 'exposed' is updated but not exposed", rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd)); @@ -312,8 +307,6 @@ static inline bool rrdpush_send_chart_metrics_nolock(RRDSET *st, struct sender_s } rrddim_foreach_done(rd); buffer_fast_strcat(wb, "END\n", 4); - - return count_of_dimensions_written != 0; } static void rrdpush_sender_thread_spawn(RRDHOST *host); @@ -322,12 +315,12 @@ static void rrdpush_sender_thread_spawn(RRDHOST *host); bool rrdset_push_chart_definition_now(RRDSET *st) { RRDHOST *host = st->rrdhost; - if(unlikely(!host->rrdpush_send_enabled || !should_send_chart_matching(st))) + if(unlikely(!rrdhost_can_send_definitions_to_parent(host) || !should_send_chart_matching(st))) return false; - sender_start(host->sender); - rrdpush_send_chart_definition(st); - sender_commit(host->sender); + BUFFER *wb = sender_start(host->sender); + rrdpush_send_chart_definition(wb, st); + sender_commit(host->sender, wb); return true; } @@ -365,44 +358,44 @@ bool rrdpush_incremental_transmission_of_chart_definitions(RRDHOST *host, DICTFE } void rrdset_done_push(RRDSET *st) { - if(unlikely(!should_send_chart_matching(st))) - return; - RRDHOST *host = st->rrdhost; - // Handle non-connected case - if(unlikely(!__atomic_load_n(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST) - || !rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_COLLECTED_METRICS))) { + // fetch the flags we need to check with one atomic operation + RRDHOST_FLAGS flags = rrdhost_flag_check(host, + RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS + | RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS + | RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN + ); + + // check if we are not connected + if(unlikely(!(flags & RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS))) { - if(unlikely(host->rrdpush_send_enabled && !host->rrdpush_sender_spawn)) + if(unlikely(!(flags & RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN))) rrdpush_sender_thread_spawn(host); - if(unlikely(!host->rrdpush_sender_error_shown)) + if(unlikely(!(flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS))) { + rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS); error("STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host)); - - host->rrdpush_sender_error_shown = 1; + } return; } - else if(unlikely(host->rrdpush_sender_error_shown)) { + else if(unlikely(flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS)) { info("STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host)); - host->rrdpush_sender_error_shown = 0; + rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS); } - sender_start(host->sender); + if(unlikely(!should_send_chart_matching(st))) + return; + + BUFFER *wb = sender_start(host->sender); if(unlikely(need_to_send_chart_definition(st))) - rrdpush_send_chart_definition(st); + rrdpush_send_chart_definition(wb, st); - if(likely(rrdpush_send_chart_metrics_nolock(st, host->sender))) { - // signal the sender there are more data - if (host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1) - error("STREAM %s [send]: cannot write to internal pipe", rrdhost_hostname(host)); + rrdpush_send_chart_metrics(wb, st, host->sender); - sender_commit(host->sender); - } - else - sender_cancel(host->sender); + sender_commit(host->sender, wb); } // labels @@ -411,45 +404,38 @@ static int send_labels_callback(const char *name, const char *value, RRDLABEL_SR buffer_sprintf(wb, "LABEL \"%s\" = %d \"%s\"\n", name, ls, value); return 1; } -void rrdpush_send_labels(RRDHOST *host) { - if (!host->rrdlabels || !rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE) || (rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_LABELS_STOP |