diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2022-09-05 19:31:06 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-09-05 19:31:06 +0300 |
commit | 5e1b95cf92168c4df74586fb4430dc284806da82 (patch) | |
tree | f42077d8b02eaf316683453a7474bd1f599a833d /streaming | |
parent | 544aef1fde6e79ac57d2dea85d3f063076d7f885 (diff) |
Deduplicate all netdata strings (#13570)
* rrdfamily
* rrddim
* rrdset plugin and module names
* rrdset units
* rrdset type
* rrdset family
* rrdset title
* rrdset title more
* rrdset context
* rrdcalctemplate context and removal of context hash from rrdset
* strings statistics
* rrdset name
* rearranged members of rrdset
* eliminate rrdset name hash; rrdcalc chart converted to STRING
* rrdset id, eliminated rrdset hash
* rrdcalc, alarm_entry, alert_config and some of rrdcalctemplate
* rrdcalctemplate
* rrdvar
* eval_variable
* rrddimvar and rrdsetvar
* rrdhost hostname, os and tags
* fix master commits
* added thread cache; implemented string_dup without locks
* faster thread cache
* rrdset and rrddim now use dictionaries for indexing
* rrdhost now uses dictionary
* rrdfamily now uses DICTIONARY
* rrdvar using dictionary instead of AVL
* allocate the right size to rrdvar flag members
* rrdhost remaining char * members to STRING *
* better error handling on indexing
* strings now use a read/write lock to allow parallel searches to the index
* removed AVL support from dictionaries; implemented STRING with native Judy calls
* string releases should be negative
* only 31 bits are allowed for enum flags
* proper locking on strings
* string threading unittest and fixes
* fix lgtm finding
* fixed naming
* stream chart/dimension definitions at the beginning of a streaming session
* thread stack variable is undefined on thread cancel
* rrdcontext garbage collect per host on startup
* worker control in garbage collection
* relaxed deletion of rrdmetrics
* type checking on dictfe
* netdata chart to monitor rrdcontext triggers
* Group chart label updates
* rrdcontext better handling of collected rrdsets
* rrdpush incremental transmition of definitions should use as much buffer as possible
* require 1MB per chart
* empty the sender buffer before enabling metrics streaming
* fill up to 50% of buffer
* reset signaling metrics sending
* use the shared variable for status
* use separate host flag for enabling streaming of metrics
* make sure the flag is clear
* add logging for streaming
* add logging for streaming on buffer overflow
* circular_buffer proper sizing
* removed obsolete logs
* do not execute worker jobs if not necessary
* better messages about compression disabling
* proper use of flags and updating rrdset last access time every time the obsoletion flag is flipped
* monitor stream sender used buffer ratio
* Update exporting unit tests
* no need to compare label value with strcmp
* streaming send workers now monitor bandwidth
* workers now use strings
* streaming receiver monitors incoming bandwidth
* parser shift of worker ids
* minor fixes
* Group chart label updates
* Populate context with dimensions that have data
* Fix chart id
* better shift of parser worker ids
* fix for streaming compression
* properly count received bytes
* ensure LZ4 compression ring buffer does not wrap prematurely
* do not stream empty charts; do not process empty instances in rrdcontext
* need_to_send_chart_definition() does not need an rrdset lock any more
* rrdcontext objects are collected, after data have been written to the db
* better logging of RRDCONTEXT transitions
* always set all variables needed by the worker utilization charts
* implemented double linked list for most objects; eliminated alarm indexes from rrdhost; and many more fixes
* lockless strings design - string_dup() and string_freez() are totally lockless when they dont need to touch Judy - only Judy is protected with a read/write lock
* STRING code re-organization for clarity
* thread_cache improvements; double numbers precision on worker threads
* STRING_ENTRY now shadown STRING, so no duplicate definition is required; string_length() renamed to string_strlen() to follow the paradigm of all other functions, STRING internal statistics are now only compiled with NETDATA_INTERNAL_CHECKS
* rrdhost index by hostname now cleans up; aclk queries of archieved hosts do not index hosts
* Add index to speed up database context searches
* Removed last_updated optimization (was also buggy after latest merge with master)
Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Co-authored-by: Vladimir Kobal <vlad@prokk.net>
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/compression.c | 71 | ||||
-rw-r--r-- | streaming/receiver.c | 72 | ||||
-rw-r--r-- | streaming/rrdpush.c | 140 | ||||
-rw-r--r-- | streaming/rrdpush.h | 9 | ||||
-rw-r--r-- | streaming/sender.c | 307 |
5 files changed, 400 insertions, 199 deletions
diff --git a/streaming/compression.c b/streaming/compression.c index d6178d6c34..302b0b1809 100644 --- a/streaming/compression.c +++ b/streaming/compression.c @@ -5,9 +5,6 @@ #define STREAM_COMPRESSION_MSG "STREAM_COMPRESSION" -#define LZ4_MAX_MSG_SIZE 0x4000 -#define LZ4_STREAM_BUFFER_SIZE (0x10000 + LZ4_MAX_MSG_SIZE) - #define SIGNATURE ((uint32_t)('z' | 0x80) | (0x80 << 8) | (0x80 << 16) | ('\n' << 24)) #define SIGNATURE_MASK ((uint32_t)0xff | (0x80 << 8) | (0x80 << 16) | (0xff << 24)) #define SIGNATURE_SIZE 4 @@ -18,8 +15,9 @@ */ struct compressor_data { LZ4_stream_t *stream; - char *stream_buffer; - size_t stream_buffer_pos; + char *input_ring_buffer; + size_t input_ring_buffer_size; + size_t input_ring_buffer_pos; }; @@ -33,7 +31,7 @@ static void lz4_compressor_reset(struct compressor_state *state) LZ4_resetStream_fast(state->data->stream); info("%s: Compressor Reset", STREAM_COMPRESSION_MSG); } - state->data->stream_buffer_pos = 0; + state->data->input_ring_buffer_pos = 0; } } @@ -47,10 +45,10 @@ static void lz4_compressor_destroy(struct compressor_state **state) if (s->data) { if (s->data->stream) LZ4_freeStream(s->data->stream); - freez(s->data->stream_buffer); + freez(s->data->input_ring_buffer); freez(s->data); } - freez(s->buffer); + freez(s->compression_result_buffer); freez(s); *state = NULL; debug(D_STREAM, "%s: Compressor Destroyed.", STREAM_COMPRESSION_MSG); @@ -65,37 +63,53 @@ static void lz4_compressor_destroy(struct compressor_state **state) */ static size_t lz4_compressor_compress(struct compressor_state *state, const char *data, size_t size, char **out) { - if (!state || !size || !out) + if(unlikely(!state || !size || !out)) return 0; - if (size > LZ4_MAX_MSG_SIZE) { + + if(unlikely(size > LZ4_MAX_MSG_SIZE)) { error("%s: Compression Failed - Message size %lu above compression buffer limit: %d", STREAM_COMPRESSION_MSG, size, LZ4_MAX_MSG_SIZE); return 0; } + size_t max_dst_size = LZ4_COMPRESSBOUND(size); size_t data_size = max_dst_size + SIGNATURE_SIZE; - if (!state->buffer) { - state->buffer = mallocz(data_size); - state->buffer_size = data_size; - } else if (state->buffer_size < data_size) { - state->buffer = reallocz(state->buffer, data_size); - state->buffer_size = data_size; + if (!state->compression_result_buffer) { + state->compression_result_buffer = mallocz(data_size); + state->compression_result_buffer_size = data_size; + } + else if(unlikely(state->compression_result_buffer_size < data_size)) { + state->compression_result_buffer = reallocz(state->compression_result_buffer, data_size); + state->compression_result_buffer_size = data_size; } - memcpy(state->data->stream_buffer + state->data->stream_buffer_pos, data, size); - long int compressed_data_size = LZ4_compress_fast_continue(state->data->stream, - state->data->stream_buffer + state->data->stream_buffer_pos, - state->buffer + SIGNATURE_SIZE, size, max_dst_size, 1); + // the ring buffer always has space for LZ4_MAX_MSG_SIZE + memcpy(state->data->input_ring_buffer + state->data->input_ring_buffer_pos, data, size); + + // this call needs the last 64K of our previous data + // they are available in the ring buffer + long int compressed_data_size = LZ4_compress_fast_continue( + state->data->stream, + state->data->input_ring_buffer + state->data->input_ring_buffer_pos, + state->compression_result_buffer + SIGNATURE_SIZE, + size, + max_dst_size, + 1); + if (compressed_data_size < 0) { error("Data compression error: %ld", compressed_data_size); return 0; } - state->data->stream_buffer_pos += size; - if (state->data->stream_buffer_pos >= LZ4_STREAM_BUFFER_SIZE - LZ4_MAX_MSG_SIZE) - state->data->stream_buffer_pos = 0; + + // update the next writing position of the ring buffer + state->data->input_ring_buffer_pos += size; + if(unlikely(state->data->input_ring_buffer_pos >= state->data->input_ring_buffer_size - LZ4_MAX_MSG_SIZE)) + state->data->input_ring_buffer_pos = 0; + + // update the signature header uint32_t len = ((compressed_data_size & 0x7f) | 0x80 | (((compressed_data_size & (0x7f << 7)) << 1) | 0x8000)) << 8; - *(uint32_t *)state->buffer = len | SIGNATURE; - *out = state->buffer; + *(uint32_t *)state->compression_result_buffer = len | SIGNATURE; + *out = state->compression_result_buffer; debug(D_STREAM, "%s: Compressed data header: %ld", STREAM_COMPRESSION_MSG, compressed_data_size); return compressed_data_size + SIGNATURE_SIZE; } @@ -114,8 +128,9 @@ struct compressor_state *create_compressor() state->data = callocz(1, sizeof(struct compressor_data)); state->data->stream = LZ4_createStream(); - state->data->stream_buffer = callocz(1, LZ4_DECODER_RING_BUFFER_SIZE(LZ4_MAX_MSG_SIZE)); - state->buffer_size = LZ4_STREAM_BUFFER_SIZE; + state->data->input_ring_buffer_size = LZ4_DECODER_RING_BUFFER_SIZE(LZ4_MAX_MSG_SIZE * 2); + state->data->input_ring_buffer = callocz(1, state->data->input_ring_buffer_size); + state->compression_result_buffer_size = 0; state->reset(state); debug(D_STREAM, "%s: Initialize streaming compression!", STREAM_COMPRESSION_MSG); return state; @@ -281,6 +296,8 @@ static size_t lz4_decompressor_decompress(struct decompressor_state *state) size_t avg_saving = saving_percent(state->total_compressed, state->total_uncompressed); size_t avg_size = state->total_uncompressed / state->packet_count; + (void)saving; + if (old_avg_saving != avg_saving || old_avg_size != avg_size){ debug(D_STREAM, "%s: Saving: %lu%% (avg. %lu%%), avg.size: %lu", STREAM_COMPRESSION_MSG, saving, avg_saving, avg_size); } diff --git a/streaming/receiver.c b/streaming/receiver.c index 0890ebbcd0..a2852981a2 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -1,6 +1,13 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "rrdpush.h" +#include "parser/parser.h" + +#define WORKER_RECEIVER_JOB_BYTES_READ (WORKER_PARSER_FIRST_JOB - 1) + +#if WORKER_PARSER_FIRST_JOB < 1 +#error The define WORKER_PARSER_FIRST_JOB needs to be at least 1 +#endif extern struct config stream_config; @@ -66,7 +73,7 @@ PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host; struct plugind *cd = ((PARSER_USER_OBJECT *)user)->cd; if (cd->version < VERSION_GAP_FILLING ) { - error("STREAM %s from %s: Child negotiated version %u but sent TIMESTAMP!", host->hostname, cd->cmd, + error("STREAM %s from %s: Child negotiated version %u but sent TIMESTAMP!", rrdhost_hostname(host), cd->cmd, cd->version); return PARSER_RC_OK; // Ignore error and continue stream } @@ -78,7 +85,7 @@ PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins info( "STREAM %s from %s: Initial connection (no gap to check), " "remote=%"PRId64" local=%"PRId64" slew=%"PRId64"", - host->hostname, + rrdhost_hostname(host), cd->cmd, (int64_t)remote_time, (int64_t)now, @@ -88,7 +95,7 @@ PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins info( "STREAM %s from %s: Checking for gaps... " "remote=%"PRId64" local=%"PRId64"..%"PRId64" slew=%"PRId64" %"PRId64"-sec gap", - host->hostname, + rrdhost_hostname(host), cd->cmd, (int64_t)remote_time, (int64_t)prev, @@ -177,6 +184,7 @@ static int receiver_read(struct receiver_state *r, FILE *fp) { int ret = SSL_read(r->ssl.conn, r->read_buffer + r->read_len, desired); if (ret > 0 ) { r->read_len += ret; + worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, ret); return 0; } // Don't treat SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE differently on blocking socket @@ -192,6 +200,7 @@ static int receiver_read(struct receiver_state *r, FILE *fp) { if (!fgets(r->read_buffer, sizeof(r->read_buffer), fp)) return 1; r->read_len = strlen(r->read_buffer); + worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, r->read_len); return 0; } #else @@ -279,22 +288,24 @@ static int read_stream(struct receiver_state *r, FILE *fp, char* buffer, size_t */ static int receiver_read(struct receiver_state *r, FILE *fp) { // check any decompressed data present - if (r->decompressor && - r->decompressor->decompressed_bytes_in_buffer(r->decompressor)) { + if (r->decompressor && r->decompressor->decompressed_bytes_in_buffer(r->decompressor)) { size_t available = sizeof(r->read_buffer) - r->read_len; if (available) { - size_t len = r->decompressor->get(r->decompressor, - r->read_buffer + r->read_len, available); + size_t len = r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, available); if (!len) return 1; + r->read_len += len; } return 0; } + int ret = 0; if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1, &ret)) return 1; + worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, ret); + if (!is_compressed_data(r->read_buffer, ret)) { r->read_len += ret; return 0; @@ -303,8 +314,7 @@ static int receiver_read(struct receiver_state *r, FILE *fp) { if (unlikely(!r->decompressor)) r->decompressor = create_decompressor(); - size_t bytes_to_read = r->decompressor->start(r->decompressor, - r->read_buffer, ret); + size_t bytes_to_read = r->decompressor->start(r->decompressor, r->read_buffer, ret); // Read the entire block of compressed data because // we're unable to decompress incomplete block @@ -312,18 +322,23 @@ static int receiver_read(struct receiver_state *r, FILE *fp) { do { if (read_stream(r, fp, compressed, bytes_to_read, &ret)) return 1; + + worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, ret); + // Send input data to decompressor if (ret) r->decompressor->put(r->decompressor, compressed, ret); + bytes_to_read -= ret; } while (bytes_to_read > 0); + // Decompress size_t bytes_to_parse = r->decompressor->decompress(r->decompressor); if (!bytes_to_parse) return 1; + // Fill read buffer with decompressed data - r->read_len = r->decompressor->get(r->decompressor, - r->read_buffer, sizeof(r->read_buffer)); + r->read_len = r->decompressor->get(r->decompressor, r->read_buffer, sizeof(r->read_buffer)); return 0; } @@ -486,8 +501,8 @@ static int rrdpush_receive(struct receiver_state *rpt) #else if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) { #endif - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "FAILED - CANNOT REPLY"); - error("STREAM %s [receive from [%s]:%s]: cannot send command.", rpt->host->hostname, rpt->client_ip, rpt->client_port); + log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - CANNOT REPLY"); + error("STREAM %s [receive from [%s]:%s]: cannot send command.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); close(rpt->fd); return 0; } @@ -575,14 +590,14 @@ static int rrdpush_receive(struct receiver_state *rpt) , rpt->hostname , rpt->client_ip , rpt->client_port - , rpt->host->hostname + , rrdhost_hostname(rpt->host) , rpt->host->machine_guid , rpt->host->rrd_update_every , rpt->host->rrd_history_entries , rrd_memory_mode_name(rpt->host->rrd_memory_mode) , (health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto") , ssl ? " SSL," : "" - , rpt->host->tags?rpt->host->tags:"" + , rrdhost_tags(rpt->host) ); #endif // NETDATA_INTERNAL_CHECKS @@ -605,7 +620,7 @@ static int rrdpush_receive(struct receiver_state *rpt) snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", rpt->client_ip, rpt->client_port); snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", rpt->client_ip, rpt->client_port); - info("STREAM %s [receive from [%s]:%s]: initializing communication...", rpt->host->hostname, rpt->client_ip, rpt->client_port); + info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); char initial_response[HTTP_HEADER_SIZE]; if (rpt->stream_version > 1) { if(rpt->stream_version >= STREAM_VERSION_COMPRESSION){ @@ -618,13 +633,13 @@ static int rrdpush_receive(struct receiver_state *rpt) } #endif } - info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->stream_version); + info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->stream_version); sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->stream_version); } else if (rpt->stream_version == 1) { - info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->stream_version); + info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->stream_version); sprintf(initial_response, "%s", START_STREAMING_PROMPT_V2); } else { - info("STREAM %s [receive from [%s]:%s]: Netdata is using first stream protocol.", rpt->host->hostname, rpt->client_ip, rpt->client_port); + info("STREAM %s [receive from [%s]:%s]: Netdata is using first stream protocol.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); sprintf(initial_response, "%s", START_STREAMING_PROMPT); } debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response); @@ -635,27 +650,27 @@ static int rrdpush_receive(struct receiver_state *rpt) #else if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) { #endif - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "FAILED - CANNOT REPLY"); - error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", rpt->host->hostname, rpt->client_ip, rpt->client_port); + log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - CANNOT REPLY"); + error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); close(rpt->fd); return 0; } // remove the non-blocking flag from the socket if(sock_delnonblock(rpt->fd) < 0) - error("STREAM %s [receive from [%s]:%s]: cannot remove the non-blocking flag from socket %d", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->fd); + error("STREAM %s [receive from [%s]:%s]: cannot remove the non-blocking flag from socket %d", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd); struct timeval timeout; timeout.tv_sec = 120; timeout.tv_usec = 0; if (unlikely(setsockopt(rpt->fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof timeout) != 0)) - error("STREAM %s [receive from [%s]:%s]: cannot set timeout for socket %d", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->fd); + error("STREAM %s [receive from [%s]:%s]: cannot set timeout for socket %d", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd); // convert the socket to a FILE * FILE *fp = fdopen(rpt->fd, "r"); if(!fp) { - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "FAILED - SOCKET ERROR"); - error("STREAM %s [receive from [%s]:%s]: failed to get a FILE for FD %d.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->fd); + log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - SOCKET ERROR"); + error("STREAM %s [receive from [%s]:%s]: failed to get a FILE for FD %d.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd); close(rpt->fd); return 0; } @@ -686,7 +701,7 @@ static int rrdpush_receive(struct receiver_state *rpt) info( "Postponing health checks for %" PRId64 " seconds, on host '%s', because it was just connected.", (int64_t)alarms_delay, - rpt->host->hostname); + rrdhost_hostname(rpt->host)); } } rpt->host->senders_connect_time = now_realtime_sec(); @@ -695,8 +710,8 @@ static int rrdpush_receive(struct receiver_state *rpt) rrdhost_unlock(rpt->host); // call the plugins.d processor to receive the metrics - info("STREAM %s [receive from [%s]:%s]: receiving metrics...", rpt->host->hostname, rpt->client_ip, rpt->client_port); - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "CONNECTED"); + info("STREAM %s [receive from [%s]:%s]: receiving metrics...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); + log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "CONNECTED"); cd.version = rpt->stream_version; @@ -758,6 +773,7 @@ void *rrdpush_receiver_thread(void *ptr) { info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid()); worker_register("STREAMRCV"); + worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ, "received bytes", "bytes/s", WORKER_METRIC_INCREMENTAL); rrdpush_receive(rpt); worker_unregister(); diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index b73f24633c..67fc48aa09 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -140,8 +140,8 @@ static inline int should_send_chart_matching(RRDSET *st) { if(!rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE)) { RRDHOST *host = st->rrdhost; - if(simple_pattern_matches(host->rrdpush_send_charts_matching, st->id) || - simple_pattern_matches(host->rrdpush_send_charts_matching, st->name)) { + if(simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_id(st)) || + simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_name(st))) { rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_IGNORE); rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND); } @@ -175,20 +175,17 @@ int configured_as_parent() { // checks if the current chart definition has been sent static inline int need_to_send_chart_definition(RRDSET *st) { - rrdset_check_rdlock(st); - if(unlikely(!(rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_EXPOSED)))) return 1; RRDDIM *rd; - rrddim_foreach_read(rd, st) { + dfe_start_read(st->rrddim_root_index, rd) { if(unlikely(!rd->exposed)) { - #ifdef NETDATA_INTERNAL_CHECKS - info("host '%s', chart '%s', dimension '%s' flag 'exposed' triggered chart refresh to upstream", st->rrdhost->hostname, st->id, rd->id); - #endif + internal_error(true, "host '%s', chart '%s', dimension '%s' flag 'exposed' triggered chart refresh to upstream", rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd)); return 1; } } + dfe_done(rd); return 0; } @@ -216,9 +213,9 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) { // properly set the name for the remote end to parse it char *name = ""; if(likely(st->name)) { - if(unlikely(strcmp(st->id, st->name))) { + if(unlikely(st->id != st->name)) { // they differ - name = strchr(st->name, '.'); + name = strchr(rrdset_name(st), '.'); if(name) name++; else @@ -230,12 +227,12 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) { buffer_sprintf( host->sender->build , "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %ld %d \"%s %s %s %s\" \"%s\" \"%s\"\n" - , st->id + , rrdset_id(st) , name - , st->title - , st->units - , st->family - , st->context + , rrdset_title(st) + , rrdset_units(st) + , rrdset_family(st) + , rrdset_context(st) , rrdset_type_name(st->chart_type) , st->priority , st->update_every @@ -243,8 +240,8 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) { , rrdset_flag_check(st, RRDSET_FLAG_DETAIL)?"detail":"" , rrdset_flag_check(st, RRDSET_FLAG_STORE_FIRST)?"store_first":"" , rrdset_flag_check(st, RRDSET_FLAG_HIDDEN)?"hidden":"" - , (st->plugin_name)?st->plugin_name:"" - , (st->module_name)?st->module_name:"" + , rrdset_plugin_name(st) + , rrdset_module_name(st) ); // send the chart labels @@ -257,8 +254,8 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) { buffer_sprintf( host->sender->build , "DIMENSION \"%s\" \"%s\" \"%s\" " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " \"%s %s %s\"\n" - , rd->id - , rd->name + , rrddim_id(rd) + , rrddim_name(rd) , rrd_algorithm_name(rd->algorithm) , rd->multiplier , rd->divisor @@ -278,7 +275,7 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) { buffer_sprintf( host->sender->build , "VARIABLE CHART %s = " NETDATA_DOUBLE_FORMAT "\n" - , rs->variable + , string2str(rs->variable) , *value ); } @@ -288,40 +285,75 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) { } // sends the current chart dimensions -static inline void rrdpush_send_chart_metrics_nolock(RRDSET *st, struct sender_state *s) { +static inline bool rrdpush_send_chart_metrics_nolock(RRDSET *st, struct sender_state *s) { RRDHOST *host = st->rrdhost; - buffer_sprintf(host->sender->build, "BEGIN \"%s\" %llu", st->id, (st->last_collected_time.tv_sec > st->upstream_resync_time)?st->usec_since_last_update:0); + buffer_sprintf(host->sender->build, "BEGIN \"%s\" %llu", rrdset_id(st), (st->last_collected_time.tv_sec > st->upstream_resync_time)?st->usec_since_last_update:0); if (s->version >= VERSION_GAP_FILLING) buffer_sprintf(host->sender->build, " %"PRId64"\n", (int64_t)st->last_collected_time.tv_sec); else buffer_strcat(host->sender->build, "\n"); + size_t count_of_dimensions_written = 0; RRDDIM *rd; rrddim_foreach_read(rd, st) { - if(rd->updated && rd->exposed) - buffer_sprintf(host->sender->build - , "SET \"%s\" = " COLLECTED_NUMBER_FORMAT "\n" - , rd->id - , rd->collected_value - ); + if(rd->updated && rd->exposed) { + buffer_sprintf(host->sender->build, "SET \"%s\" = " COLLECTED_NUMBER_FORMAT "\n", rrddim_id(rd), rd->collected_value); + count_of_dimensions_written++; + } } buffer_strcat(host->sender->build, "END\n"); + + return count_of_dimensions_written != 0; } static void rrdpush_sender_thread_spawn(RRDHOST *host); // Called from the internal collectors to mark a chart obsolete. -void rrdset_push_chart_definition_now(RRDSET *st) { +bool rrdset_push_chart_definition_now(RRDSET *st) { RRDHOST *host = st->rrdhost; if(unlikely(!host->rrdpush_send_enabled || !should_send_chart_matching(st))) - return; + return false; rrdset_rdlock(st); sender_start(host->sender); rrdpush_send_chart_definition_nolock(st); sender_commit(host->sender); rrdset_unlock(st); + + return true; +} + +bool rrdpush_incremental_transmission_of_chart_definitions(RRDHOST *host, DICTFE *dictfe, bool restart, bool stop) { + if(stop || restart) + dictionary_foreach_done(dictfe); + + if(stop) + return false; + + RRDSET *st = NULL; + + if(unlikely(!dictfe->dict)) { + st = dictionary_foreach_start_rw(dictfe, host->rrdset_root_index, DICTIONARY_LOCK_REENTRANT); + } + else + st = dictionary_foreach_next(dictfe); + + do { + while(st && !need_to_send_chart_definition(st)) + st = dictionary_foreach_next(dictfe); + + if(st && rrdset_push_chart_definition_now(st)) + break; + + } while((st = dictionary_foreach_next(dictfe))); + + if (!st) { + dictionary_foreach_done(dictfe); + return false; + } + + return true; } void rrdset_done_push(RRDSET *st) { @@ -334,29 +366,37 @@ void rrdset_done_push(RRDSET *st) { rrdpush_sender_thread_spawn(host); // Handle non-connected case - if(unlikely(!__atomic_load_n(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST))) { + if(unlikely(!__atomic_load_n(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST) + || !rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_COLLECTED_METRICS))) { + if(unlikely(!host->rrdpush_sender_error_shown)) - error("STREAM %s [send]: not ready - discarding collected metrics.", host->hostname); + error("STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host)); host->rrdpush_sender_error_shown = 1; + return; } else if(unlikely(host->rrdpush_sender_error_shown)) { - info("STREAM %s [send]: sending metrics...", host->hostname); + info("STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host)); host->rrdpush_sender_error_shown = 0; } + if(dictionary_stats_entries(st->rrddim_root_index) == 0) + return; + sender_start(host->sender); if(need_to_send_chart_definition(st)) rrdpush_send_chart_definition_nolock(st); - rrdpush_send_chart_metrics_nolock(st, host->sender); - - // signal the sender there are more data - if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1) - error("STREAM %s [send]: cannot write to internal pipe", host->hostname); + if(rrdpush_send_chart_metrics_nolock(st, host->sender)) { + // signal the sender there are more data + if (host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1) + error("STREAM %s [send]: cannot write to internal pipe", rrdhost_hostname(host)); - sender_commit(host->sender); + sender_commit(host->sender); + } + else + sender_cancel(host->sender); } // labels @@ -376,7 +416,7 @@ void rrdpush_send_labels(RRDHOST *host) { sender_commit(host->sender); if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1) - error("STREAM %s [send]: cannot write to internal pipe", host->hostname); + error("STREAM %s [send]: cannot write to internal pipe", rrdhost_hostname(host)); rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE); } @@ -399,7 +439,7 @@ void rrdpush_claimed_id(RRDHOST *host) // signal the sender there are more data if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1) - error("STREAM %s [send]: cannot write to internal pipe", host->hostname); + error("STREAM %s [send]: cannot write to internal pipe", rrdhost_hostname(host)); } int connect_to_one_of_destinations( @@ -496,7 +536,7 @@ void rrdpush_sender_thread_stop(RRDHOST *host) { netdata_thread_t thr = 0; if(host->rrdpush_sender_spawn) { - info("STREAM %s [send]: signaling sending thread to stop...", host->hostname); + info("STREAM %s [send]: signaling sending thread to stop...", rrdhost_hostname(host)); // signal the thread that we want to join it host->rrdpush_sender_join = 1; @@ -512,10 +552,10 @@ void rrdpush_sender_thread_stop(RRDHOST *host) { netdata_mutex_unlock(&host->sender->mutex); if(thr != 0) { - info("STREAM %s [send]: waiting for the sending thread to stop...", host->hostname); + info("STREAM %s [send]: waiting for the sending thread to stop...", rrdhost_hostname(host)); void *result; netdata_thread_join(thr, &result); - info("STREAM %s [send]: sending thread has exited.", host->hostname); + info("STREAM %s [send]: sending thread has exited.", rrdhost_hostname(host)); } } @@ -533,10 +573,10 @@ static void rrdpush_sender_thread_spawn(RRDHOST *host) { if(!host->rrdpush_sender_spawn) { char tag[NETDATA_THREAD_TAG_MAX + 1]; - snprintfz(tag, NETDATA_THREAD_TAG_MAX, "STREAM_SENDER[%s]", host->hostname); + snprintfz(tag, NETDATA_THREAD_TAG_MAX, "STREAM_SENDER[%s]", rrdhost_hostname(host)); if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, rrdpush_sender_thread, (void *) host->sender)) - error("STREAM %s [send]: failed to create new thread for client.", host->hostname); + error("STREAM %s [send]: failed to create new thread for client.", rrdhost_hostname(host)); else host->rrdpush_sender_spawn = 1; } @@ -746,7 +786,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { struct receiver_state *rpt = callocz(1, sizeof(*rpt)); rrd_rdlock(); - RRDHOST *host = rrdhost_find_by_guid(machine_guid, 0); + RRDHOST *host = rrdhost_find_by_guid(machine_guid); if (unlikely(host && rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))) /* Ignore archived hosts. */ host = NULL; if (host) { @@ -763,7 +803,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { info( "STREAM %s [receive from [%s]:%s]: multiple connections for same host detected - " "existing connection is dead (%"PRId64" sec), accepting new connection.", - host->hostname, + rrdhost_hostname(host), w->client_ip, w->client_port, (int64_t)age); @@ -772,12 +812,12 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { netdata_mutex_unlock(&host->receiver_lock); rrdhost_unlock(host); rrd_unlock(); - log_stream_connection(w->client_ip, w->client_port, key, host->machine_guid, host->hostname, + log_stream_connection(w->client_ip, w->client_port, key, host->machine_guid, rrdhost_hostname(host), |