summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-09-05 19:31:06 +0300
committerGitHub <noreply@github.com>2022-09-05 19:31:06 +0300
commit5e1b95cf92168c4df74586fb4430dc284806da82 (patch)
treef42077d8b02eaf316683453a7474bd1f599a833d /streaming
parent544aef1fde6e79ac57d2dea85d3f063076d7f885 (diff)
Deduplicate all netdata strings (#13570)
* rrdfamily * rrddim * rrdset plugin and module names * rrdset units * rrdset type * rrdset family * rrdset title * rrdset title more * rrdset context * rrdcalctemplate context and removal of context hash from rrdset * strings statistics * rrdset name * rearranged members of rrdset * eliminate rrdset name hash; rrdcalc chart converted to STRING * rrdset id, eliminated rrdset hash * rrdcalc, alarm_entry, alert_config and some of rrdcalctemplate * rrdcalctemplate * rrdvar * eval_variable * rrddimvar and rrdsetvar * rrdhost hostname, os and tags * fix master commits * added thread cache; implemented string_dup without locks * faster thread cache * rrdset and rrddim now use dictionaries for indexing * rrdhost now uses dictionary * rrdfamily now uses DICTIONARY * rrdvar using dictionary instead of AVL * allocate the right size to rrdvar flag members * rrdhost remaining char * members to STRING * * better error handling on indexing * strings now use a read/write lock to allow parallel searches to the index * removed AVL support from dictionaries; implemented STRING with native Judy calls * string releases should be negative * only 31 bits are allowed for enum flags * proper locking on strings * string threading unittest and fixes * fix lgtm finding * fixed naming * stream chart/dimension definitions at the beginning of a streaming session * thread stack variable is undefined on thread cancel * rrdcontext garbage collect per host on startup * worker control in garbage collection * relaxed deletion of rrdmetrics * type checking on dictfe * netdata chart to monitor rrdcontext triggers * Group chart label updates * rrdcontext better handling of collected rrdsets * rrdpush incremental transmition of definitions should use as much buffer as possible * require 1MB per chart * empty the sender buffer before enabling metrics streaming * fill up to 50% of buffer * reset signaling metrics sending * use the shared variable for status * use separate host flag for enabling streaming of metrics * make sure the flag is clear * add logging for streaming * add logging for streaming on buffer overflow * circular_buffer proper sizing * removed obsolete logs * do not execute worker jobs if not necessary * better messages about compression disabling * proper use of flags and updating rrdset last access time every time the obsoletion flag is flipped * monitor stream sender used buffer ratio * Update exporting unit tests * no need to compare label value with strcmp * streaming send workers now monitor bandwidth * workers now use strings * streaming receiver monitors incoming bandwidth * parser shift of worker ids * minor fixes * Group chart label updates * Populate context with dimensions that have data * Fix chart id * better shift of parser worker ids * fix for streaming compression * properly count received bytes * ensure LZ4 compression ring buffer does not wrap prematurely * do not stream empty charts; do not process empty instances in rrdcontext * need_to_send_chart_definition() does not need an rrdset lock any more * rrdcontext objects are collected, after data have been written to the db * better logging of RRDCONTEXT transitions * always set all variables needed by the worker utilization charts * implemented double linked list for most objects; eliminated alarm indexes from rrdhost; and many more fixes * lockless strings design - string_dup() and string_freez() are totally lockless when they dont need to touch Judy - only Judy is protected with a read/write lock * STRING code re-organization for clarity * thread_cache improvements; double numbers precision on worker threads * STRING_ENTRY now shadown STRING, so no duplicate definition is required; string_length() renamed to string_strlen() to follow the paradigm of all other functions, STRING internal statistics are now only compiled with NETDATA_INTERNAL_CHECKS * rrdhost index by hostname now cleans up; aclk queries of archieved hosts do not index hosts * Add index to speed up database context searches * Removed last_updated optimization (was also buggy after latest merge with master) Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> Co-authored-by: Vladimir Kobal <vlad@prokk.net>
Diffstat (limited to 'streaming')
-rw-r--r--streaming/compression.c71
-rw-r--r--streaming/receiver.c72
-rw-r--r--streaming/rrdpush.c140
-rw-r--r--streaming/rrdpush.h9
-rw-r--r--streaming/sender.c307
5 files changed, 400 insertions, 199 deletions
diff --git a/streaming/compression.c b/streaming/compression.c
index d6178d6c34..302b0b1809 100644
--- a/streaming/compression.c
+++ b/streaming/compression.c
@@ -5,9 +5,6 @@
#define STREAM_COMPRESSION_MSG "STREAM_COMPRESSION"
-#define LZ4_MAX_MSG_SIZE 0x4000
-#define LZ4_STREAM_BUFFER_SIZE (0x10000 + LZ4_MAX_MSG_SIZE)
-
#define SIGNATURE ((uint32_t)('z' | 0x80) | (0x80 << 8) | (0x80 << 16) | ('\n' << 24))
#define SIGNATURE_MASK ((uint32_t)0xff | (0x80 << 8) | (0x80 << 16) | (0xff << 24))
#define SIGNATURE_SIZE 4
@@ -18,8 +15,9 @@
*/
struct compressor_data {
LZ4_stream_t *stream;
- char *stream_buffer;
- size_t stream_buffer_pos;
+ char *input_ring_buffer;
+ size_t input_ring_buffer_size;
+ size_t input_ring_buffer_pos;
};
@@ -33,7 +31,7 @@ static void lz4_compressor_reset(struct compressor_state *state)
LZ4_resetStream_fast(state->data->stream);
info("%s: Compressor Reset", STREAM_COMPRESSION_MSG);
}
- state->data->stream_buffer_pos = 0;
+ state->data->input_ring_buffer_pos = 0;
}
}
@@ -47,10 +45,10 @@ static void lz4_compressor_destroy(struct compressor_state **state)
if (s->data) {
if (s->data->stream)
LZ4_freeStream(s->data->stream);
- freez(s->data->stream_buffer);
+ freez(s->data->input_ring_buffer);
freez(s->data);
}
- freez(s->buffer);
+ freez(s->compression_result_buffer);
freez(s);
*state = NULL;
debug(D_STREAM, "%s: Compressor Destroyed.", STREAM_COMPRESSION_MSG);
@@ -65,37 +63,53 @@ static void lz4_compressor_destroy(struct compressor_state **state)
*/
static size_t lz4_compressor_compress(struct compressor_state *state, const char *data, size_t size, char **out)
{
- if (!state || !size || !out)
+ if(unlikely(!state || !size || !out))
return 0;
- if (size > LZ4_MAX_MSG_SIZE) {
+
+ if(unlikely(size > LZ4_MAX_MSG_SIZE)) {
error("%s: Compression Failed - Message size %lu above compression buffer limit: %d", STREAM_COMPRESSION_MSG, size, LZ4_MAX_MSG_SIZE);
return 0;
}
+
size_t max_dst_size = LZ4_COMPRESSBOUND(size);
size_t data_size = max_dst_size + SIGNATURE_SIZE;
- if (!state->buffer) {
- state->buffer = mallocz(data_size);
- state->buffer_size = data_size;
- } else if (state->buffer_size < data_size) {
- state->buffer = reallocz(state->buffer, data_size);
- state->buffer_size = data_size;
+ if (!state->compression_result_buffer) {
+ state->compression_result_buffer = mallocz(data_size);
+ state->compression_result_buffer_size = data_size;
+ }
+ else if(unlikely(state->compression_result_buffer_size < data_size)) {
+ state->compression_result_buffer = reallocz(state->compression_result_buffer, data_size);
+ state->compression_result_buffer_size = data_size;
}
- memcpy(state->data->stream_buffer + state->data->stream_buffer_pos, data, size);
- long int compressed_data_size = LZ4_compress_fast_continue(state->data->stream,
- state->data->stream_buffer + state->data->stream_buffer_pos,
- state->buffer + SIGNATURE_SIZE, size, max_dst_size, 1);
+ // the ring buffer always has space for LZ4_MAX_MSG_SIZE
+ memcpy(state->data->input_ring_buffer + state->data->input_ring_buffer_pos, data, size);
+
+ // this call needs the last 64K of our previous data
+ // they are available in the ring buffer
+ long int compressed_data_size = LZ4_compress_fast_continue(
+ state->data->stream,
+ state->data->input_ring_buffer + state->data->input_ring_buffer_pos,
+ state->compression_result_buffer + SIGNATURE_SIZE,
+ size,
+ max_dst_size,
+ 1);
+
if (compressed_data_size < 0) {
error("Data compression error: %ld", compressed_data_size);
return 0;
}
- state->data->stream_buffer_pos += size;
- if (state->data->stream_buffer_pos >= LZ4_STREAM_BUFFER_SIZE - LZ4_MAX_MSG_SIZE)
- state->data->stream_buffer_pos = 0;
+
+ // update the next writing position of the ring buffer
+ state->data->input_ring_buffer_pos += size;
+ if(unlikely(state->data->input_ring_buffer_pos >= state->data->input_ring_buffer_size - LZ4_MAX_MSG_SIZE))
+ state->data->input_ring_buffer_pos = 0;
+
+ // update the signature header
uint32_t len = ((compressed_data_size & 0x7f) | 0x80 | (((compressed_data_size & (0x7f << 7)) << 1) | 0x8000)) << 8;
- *(uint32_t *)state->buffer = len | SIGNATURE;
- *out = state->buffer;
+ *(uint32_t *)state->compression_result_buffer = len | SIGNATURE;
+ *out = state->compression_result_buffer;
debug(D_STREAM, "%s: Compressed data header: %ld", STREAM_COMPRESSION_MSG, compressed_data_size);
return compressed_data_size + SIGNATURE_SIZE;
}
@@ -114,8 +128,9 @@ struct compressor_state *create_compressor()
state->data = callocz(1, sizeof(struct compressor_data));
state->data->stream = LZ4_createStream();
- state->data->stream_buffer = callocz(1, LZ4_DECODER_RING_BUFFER_SIZE(LZ4_MAX_MSG_SIZE));
- state->buffer_size = LZ4_STREAM_BUFFER_SIZE;
+ state->data->input_ring_buffer_size = LZ4_DECODER_RING_BUFFER_SIZE(LZ4_MAX_MSG_SIZE * 2);
+ state->data->input_ring_buffer = callocz(1, state->data->input_ring_buffer_size);
+ state->compression_result_buffer_size = 0;
state->reset(state);
debug(D_STREAM, "%s: Initialize streaming compression!", STREAM_COMPRESSION_MSG);
return state;
@@ -281,6 +296,8 @@ static size_t lz4_decompressor_decompress(struct decompressor_state *state)
size_t avg_saving = saving_percent(state->total_compressed, state->total_uncompressed);
size_t avg_size = state->total_uncompressed / state->packet_count;
+ (void)saving;
+
if (old_avg_saving != avg_saving || old_avg_size != avg_size){
debug(D_STREAM, "%s: Saving: %lu%% (avg. %lu%%), avg.size: %lu", STREAM_COMPRESSION_MSG, saving, avg_saving, avg_size);
}
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 0890ebbcd0..a2852981a2 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -1,6 +1,13 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdpush.h"
+#include "parser/parser.h"
+
+#define WORKER_RECEIVER_JOB_BYTES_READ (WORKER_PARSER_FIRST_JOB - 1)
+
+#if WORKER_PARSER_FIRST_JOB < 1
+#error The define WORKER_PARSER_FIRST_JOB needs to be at least 1
+#endif
extern struct config stream_config;
@@ -66,7 +73,7 @@ PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins
RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
struct plugind *cd = ((PARSER_USER_OBJECT *)user)->cd;
if (cd->version < VERSION_GAP_FILLING ) {
- error("STREAM %s from %s: Child negotiated version %u but sent TIMESTAMP!", host->hostname, cd->cmd,
+ error("STREAM %s from %s: Child negotiated version %u but sent TIMESTAMP!", rrdhost_hostname(host), cd->cmd,
cd->version);
return PARSER_RC_OK; // Ignore error and continue stream
}
@@ -78,7 +85,7 @@ PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins
info(
"STREAM %s from %s: Initial connection (no gap to check), "
"remote=%"PRId64" local=%"PRId64" slew=%"PRId64"",
- host->hostname,
+ rrdhost_hostname(host),
cd->cmd,
(int64_t)remote_time,
(int64_t)now,
@@ -88,7 +95,7 @@ PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins
info(
"STREAM %s from %s: Checking for gaps... "
"remote=%"PRId64" local=%"PRId64"..%"PRId64" slew=%"PRId64" %"PRId64"-sec gap",
- host->hostname,
+ rrdhost_hostname(host),
cd->cmd,
(int64_t)remote_time,
(int64_t)prev,
@@ -177,6 +184,7 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {
int ret = SSL_read(r->ssl.conn, r->read_buffer + r->read_len, desired);
if (ret > 0 ) {
r->read_len += ret;
+ worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, ret);
return 0;
}
// Don't treat SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE differently on blocking socket
@@ -192,6 +200,7 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {
if (!fgets(r->read_buffer, sizeof(r->read_buffer), fp))
return 1;
r->read_len = strlen(r->read_buffer);
+ worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, r->read_len);
return 0;
}
#else
@@ -279,22 +288,24 @@ static int read_stream(struct receiver_state *r, FILE *fp, char* buffer, size_t
*/
static int receiver_read(struct receiver_state *r, FILE *fp) {
// check any decompressed data present
- if (r->decompressor &&
- r->decompressor->decompressed_bytes_in_buffer(r->decompressor)) {
+ if (r->decompressor && r->decompressor->decompressed_bytes_in_buffer(r->decompressor)) {
size_t available = sizeof(r->read_buffer) - r->read_len;
if (available) {
- size_t len = r->decompressor->get(r->decompressor,
- r->read_buffer + r->read_len, available);
+ size_t len = r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, available);
if (!len)
return 1;
+
r->read_len += len;
}
return 0;
}
+
int ret = 0;
if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1, &ret))
return 1;
+ worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, ret);
+
if (!is_compressed_data(r->read_buffer, ret)) {
r->read_len += ret;
return 0;
@@ -303,8 +314,7 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {
if (unlikely(!r->decompressor))
r->decompressor = create_decompressor();
- size_t bytes_to_read = r->decompressor->start(r->decompressor,
- r->read_buffer, ret);
+ size_t bytes_to_read = r->decompressor->start(r->decompressor, r->read_buffer, ret);
// Read the entire block of compressed data because
// we're unable to decompress incomplete block
@@ -312,18 +322,23 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {
do {
if (read_stream(r, fp, compressed, bytes_to_read, &ret))
return 1;
+
+ worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, ret);
+
// Send input data to decompressor
if (ret)
r->decompressor->put(r->decompressor, compressed, ret);
+
bytes_to_read -= ret;
} while (bytes_to_read > 0);
+
// Decompress
size_t bytes_to_parse = r->decompressor->decompress(r->decompressor);
if (!bytes_to_parse)
return 1;
+
// Fill read buffer with decompressed data
- r->read_len = r->decompressor->get(r->decompressor,
- r->read_buffer, sizeof(r->read_buffer));
+ r->read_len = r->decompressor->get(r->decompressor, r->read_buffer, sizeof(r->read_buffer));
return 0;
}
@@ -486,8 +501,8 @@ static int rrdpush_receive(struct receiver_state *rpt)
#else
if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) {
#endif
- log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "FAILED - CANNOT REPLY");
- error("STREAM %s [receive from [%s]:%s]: cannot send command.", rpt->host->hostname, rpt->client_ip, rpt->client_port);
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - CANNOT REPLY");
+ error("STREAM %s [receive from [%s]:%s]: cannot send command.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
close(rpt->fd);
return 0;
}
@@ -575,14 +590,14 @@ static int rrdpush_receive(struct receiver_state *rpt)
, rpt->hostname
, rpt->client_ip
, rpt->client_port
- , rpt->host->hostname
+ , rrdhost_hostname(rpt->host)
, rpt->host->machine_guid
, rpt->host->rrd_update_every
, rpt->host->rrd_history_entries
, rrd_memory_mode_name(rpt->host->rrd_memory_mode)
, (health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto")
, ssl ? " SSL," : ""
- , rpt->host->tags?rpt->host->tags:""
+ , rrdhost_tags(rpt->host)
);
#endif // NETDATA_INTERNAL_CHECKS
@@ -605,7 +620,7 @@ static int rrdpush_receive(struct receiver_state *rpt)
snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
- info("STREAM %s [receive from [%s]:%s]: initializing communication...", rpt->host->hostname, rpt->client_ip, rpt->client_port);
+ info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
char initial_response[HTTP_HEADER_SIZE];
if (rpt->stream_version > 1) {
if(rpt->stream_version >= STREAM_VERSION_COMPRESSION){
@@ -618,13 +633,13 @@ static int rrdpush_receive(struct receiver_state *rpt)
}
#endif
}
- info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->stream_version);
+ info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->stream_version);
sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->stream_version);
} else if (rpt->stream_version == 1) {
- info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->stream_version);
+ info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->stream_version);
sprintf(initial_response, "%s", START_STREAMING_PROMPT_V2);
} else {
- info("STREAM %s [receive from [%s]:%s]: Netdata is using first stream protocol.", rpt->host->hostname, rpt->client_ip, rpt->client_port);
+ info("STREAM %s [receive from [%s]:%s]: Netdata is using first stream protocol.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
sprintf(initial_response, "%s", START_STREAMING_PROMPT);
}
debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response);
@@ -635,27 +650,27 @@ static int rrdpush_receive(struct receiver_state *rpt)
#else
if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) {
#endif
- log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "FAILED - CANNOT REPLY");
- error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", rpt->host->hostname, rpt->client_ip, rpt->client_port);
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - CANNOT REPLY");
+ error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
close(rpt->fd);
return 0;
}
// remove the non-blocking flag from the socket
if(sock_delnonblock(rpt->fd) < 0)
- error("STREAM %s [receive from [%s]:%s]: cannot remove the non-blocking flag from socket %d", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->fd);
+ error("STREAM %s [receive from [%s]:%s]: cannot remove the non-blocking flag from socket %d", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd);
struct timeval timeout;
timeout.tv_sec = 120;
timeout.tv_usec = 0;
if (unlikely(setsockopt(rpt->fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof timeout) != 0))
- error("STREAM %s [receive from [%s]:%s]: cannot set timeout for socket %d", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->fd);
+ error("STREAM %s [receive from [%s]:%s]: cannot set timeout for socket %d", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd);
// convert the socket to a FILE *
FILE *fp = fdopen(rpt->fd, "r");
if(!fp) {
- log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "FAILED - SOCKET ERROR");
- error("STREAM %s [receive from [%s]:%s]: failed to get a FILE for FD %d.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->fd);
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - SOCKET ERROR");
+ error("STREAM %s [receive from [%s]:%s]: failed to get a FILE for FD %d.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, rpt->fd);
close(rpt->fd);
return 0;
}
@@ -686,7 +701,7 @@ static int rrdpush_receive(struct receiver_state *rpt)
info(
"Postponing health checks for %" PRId64 " seconds, on host '%s', because it was just connected.",
(int64_t)alarms_delay,
- rpt->host->hostname);
+ rrdhost_hostname(rpt->host));
}
}
rpt->host->senders_connect_time = now_realtime_sec();
@@ -695,8 +710,8 @@ static int rrdpush_receive(struct receiver_state *rpt)
rrdhost_unlock(rpt->host);
// call the plugins.d processor to receive the metrics
- info("STREAM %s [receive from [%s]:%s]: receiving metrics...", rpt->host->hostname, rpt->client_ip, rpt->client_port);
- log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "CONNECTED");
+ info("STREAM %s [receive from [%s]:%s]: receiving metrics...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "CONNECTED");
cd.version = rpt->stream_version;
@@ -758,6 +773,7 @@ void *rrdpush_receiver_thread(void *ptr) {
info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
worker_register("STREAMRCV");
+ worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ, "received bytes", "bytes/s", WORKER_METRIC_INCREMENTAL);
rrdpush_receive(rpt);
worker_unregister();
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index b73f24633c..67fc48aa09 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -140,8 +140,8 @@ static inline int should_send_chart_matching(RRDSET *st) {
if(!rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE)) {
RRDHOST *host = st->rrdhost;
- if(simple_pattern_matches(host->rrdpush_send_charts_matching, st->id) ||
- simple_pattern_matches(host->rrdpush_send_charts_matching, st->name)) {
+ if(simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_id(st)) ||
+ simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_name(st))) {
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_IGNORE);
rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND);
}
@@ -175,20 +175,17 @@ int configured_as_parent() {
// checks if the current chart definition has been sent
static inline int need_to_send_chart_definition(RRDSET *st) {
- rrdset_check_rdlock(st);
-
if(unlikely(!(rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_EXPOSED))))
return 1;
RRDDIM *rd;
- rrddim_foreach_read(rd, st) {
+ dfe_start_read(st->rrddim_root_index, rd) {
if(unlikely(!rd->exposed)) {
- #ifdef NETDATA_INTERNAL_CHECKS
- info("host '%s', chart '%s', dimension '%s' flag 'exposed' triggered chart refresh to upstream", st->rrdhost->hostname, st->id, rd->id);
- #endif
+ internal_error(true, "host '%s', chart '%s', dimension '%s' flag 'exposed' triggered chart refresh to upstream", rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd));
return 1;
}
}
+ dfe_done(rd);
return 0;
}
@@ -216,9 +213,9 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
// properly set the name for the remote end to parse it
char *name = "";
if(likely(st->name)) {
- if(unlikely(strcmp(st->id, st->name))) {
+ if(unlikely(st->id != st->name)) {
// they differ
- name = strchr(st->name, '.');
+ name = strchr(rrdset_name(st), '.');
if(name)
name++;
else
@@ -230,12 +227,12 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
buffer_sprintf(
host->sender->build
, "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %ld %d \"%s %s %s %s\" \"%s\" \"%s\"\n"
- , st->id
+ , rrdset_id(st)
, name
- , st->title
- , st->units
- , st->family
- , st->context
+ , rrdset_title(st)
+ , rrdset_units(st)
+ , rrdset_family(st)
+ , rrdset_context(st)
, rrdset_type_name(st->chart_type)
, st->priority
, st->update_every
@@ -243,8 +240,8 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
, rrdset_flag_check(st, RRDSET_FLAG_DETAIL)?"detail":""
, rrdset_flag_check(st, RRDSET_FLAG_STORE_FIRST)?"store_first":""
, rrdset_flag_check(st, RRDSET_FLAG_HIDDEN)?"hidden":""
- , (st->plugin_name)?st->plugin_name:""
- , (st->module_name)?st->module_name:""
+ , rrdset_plugin_name(st)
+ , rrdset_module_name(st)
);
// send the chart labels
@@ -257,8 +254,8 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
buffer_sprintf(
host->sender->build
, "DIMENSION \"%s\" \"%s\" \"%s\" " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " \"%s %s %s\"\n"
- , rd->id
- , rd->name
+ , rrddim_id(rd)
+ , rrddim_name(rd)
, rrd_algorithm_name(rd->algorithm)
, rd->multiplier
, rd->divisor
@@ -278,7 +275,7 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
buffer_sprintf(
host->sender->build
, "VARIABLE CHART %s = " NETDATA_DOUBLE_FORMAT "\n"
- , rs->variable
+ , string2str(rs->variable)
, *value
);
}
@@ -288,40 +285,75 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
}
// sends the current chart dimensions
-static inline void rrdpush_send_chart_metrics_nolock(RRDSET *st, struct sender_state *s) {
+static inline bool rrdpush_send_chart_metrics_nolock(RRDSET *st, struct sender_state *s) {
RRDHOST *host = st->rrdhost;
- buffer_sprintf(host->sender->build, "BEGIN \"%s\" %llu", st->id, (st->last_collected_time.tv_sec > st->upstream_resync_time)?st->usec_since_last_update:0);
+ buffer_sprintf(host->sender->build, "BEGIN \"%s\" %llu", rrdset_id(st), (st->last_collected_time.tv_sec > st->upstream_resync_time)?st->usec_since_last_update:0);
if (s->version >= VERSION_GAP_FILLING)
buffer_sprintf(host->sender->build, " %"PRId64"\n", (int64_t)st->last_collected_time.tv_sec);
else
buffer_strcat(host->sender->build, "\n");
+ size_t count_of_dimensions_written = 0;
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
- if(rd->updated && rd->exposed)
- buffer_sprintf(host->sender->build
- , "SET \"%s\" = " COLLECTED_NUMBER_FORMAT "\n"
- , rd->id
- , rd->collected_value
- );
+ if(rd->updated && rd->exposed) {
+ buffer_sprintf(host->sender->build, "SET \"%s\" = " COLLECTED_NUMBER_FORMAT "\n", rrddim_id(rd), rd->collected_value);
+ count_of_dimensions_written++;
+ }
}
buffer_strcat(host->sender->build, "END\n");
+
+ return count_of_dimensions_written != 0;
}
static void rrdpush_sender_thread_spawn(RRDHOST *host);
// Called from the internal collectors to mark a chart obsolete.
-void rrdset_push_chart_definition_now(RRDSET *st) {
+bool rrdset_push_chart_definition_now(RRDSET *st) {
RRDHOST *host = st->rrdhost;
if(unlikely(!host->rrdpush_send_enabled || !should_send_chart_matching(st)))
- return;
+ return false;
rrdset_rdlock(st);
sender_start(host->sender);
rrdpush_send_chart_definition_nolock(st);
sender_commit(host->sender);
rrdset_unlock(st);
+
+ return true;
+}
+
+bool rrdpush_incremental_transmission_of_chart_definitions(RRDHOST *host, DICTFE *dictfe, bool restart, bool stop) {
+ if(stop || restart)
+ dictionary_foreach_done(dictfe);
+
+ if(stop)
+ return false;
+
+ RRDSET *st = NULL;
+
+ if(unlikely(!dictfe->dict)) {
+ st = dictionary_foreach_start_rw(dictfe, host->rrdset_root_index, DICTIONARY_LOCK_REENTRANT);
+ }
+ else
+ st = dictionary_foreach_next(dictfe);
+
+ do {
+ while(st && !need_to_send_chart_definition(st))
+ st = dictionary_foreach_next(dictfe);
+
+ if(st && rrdset_push_chart_definition_now(st))
+ break;
+
+ } while((st = dictionary_foreach_next(dictfe)));
+
+ if (!st) {
+ dictionary_foreach_done(dictfe);
+ return false;
+ }
+
+ return true;
}
void rrdset_done_push(RRDSET *st) {
@@ -334,29 +366,37 @@ void rrdset_done_push(RRDSET *st) {
rrdpush_sender_thread_spawn(host);
// Handle non-connected case
- if(unlikely(!__atomic_load_n(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST))) {
+ if(unlikely(!__atomic_load_n(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST)
+ || !rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_COLLECTED_METRICS))) {
+
if(unlikely(!host->rrdpush_sender_error_shown))
- error("STREAM %s [send]: not ready - discarding collected metrics.", host->hostname);
+ error("STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host));
host->rrdpush_sender_error_shown = 1;
+
return;
}
else if(unlikely(host->rrdpush_sender_error_shown)) {
- info("STREAM %s [send]: sending metrics...", host->hostname);
+ info("STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host));
host->rrdpush_sender_error_shown = 0;
}
+ if(dictionary_stats_entries(st->rrddim_root_index) == 0)
+ return;
+
sender_start(host->sender);
if(need_to_send_chart_definition(st))
rrdpush_send_chart_definition_nolock(st);
- rrdpush_send_chart_metrics_nolock(st, host->sender);
-
- // signal the sender there are more data
- if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1)
- error("STREAM %s [send]: cannot write to internal pipe", host->hostname);
+ if(rrdpush_send_chart_metrics_nolock(st, host->sender)) {
+ // signal the sender there are more data
+ if (host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1)
+ error("STREAM %s [send]: cannot write to internal pipe", rrdhost_hostname(host));
- sender_commit(host->sender);
+ sender_commit(host->sender);
+ }
+ else
+ sender_cancel(host->sender);
}
// labels
@@ -376,7 +416,7 @@ void rrdpush_send_labels(RRDHOST *host) {
sender_commit(host->sender);
if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1)
- error("STREAM %s [send]: cannot write to internal pipe", host->hostname);
+ error("STREAM %s [send]: cannot write to internal pipe", rrdhost_hostname(host));
rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE);
}
@@ -399,7 +439,7 @@ void rrdpush_claimed_id(RRDHOST *host)
// signal the sender there are more data
if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1)
- error("STREAM %s [send]: cannot write to internal pipe", host->hostname);
+ error("STREAM %s [send]: cannot write to internal pipe", rrdhost_hostname(host));
}
int connect_to_one_of_destinations(
@@ -496,7 +536,7 @@ void rrdpush_sender_thread_stop(RRDHOST *host) {
netdata_thread_t thr = 0;
if(host->rrdpush_sender_spawn) {
- info("STREAM %s [send]: signaling sending thread to stop...", host->hostname);
+ info("STREAM %s [send]: signaling sending thread to stop...", rrdhost_hostname(host));
// signal the thread that we want to join it
host->rrdpush_sender_join = 1;
@@ -512,10 +552,10 @@ void rrdpush_sender_thread_stop(RRDHOST *host) {
netdata_mutex_unlock(&host->sender->mutex);
if(thr != 0) {
- info("STREAM %s [send]: waiting for the sending thread to stop...", host->hostname);
+ info("STREAM %s [send]: waiting for the sending thread to stop...", rrdhost_hostname(host));
void *result;
netdata_thread_join(thr, &result);
- info("STREAM %s [send]: sending thread has exited.", host->hostname);
+ info("STREAM %s [send]: sending thread has exited.", rrdhost_hostname(host));
}
}
@@ -533,10 +573,10 @@ static void rrdpush_sender_thread_spawn(RRDHOST *host) {
if(!host->rrdpush_sender_spawn) {
char tag[NETDATA_THREAD_TAG_MAX + 1];
- snprintfz(tag, NETDATA_THREAD_TAG_MAX, "STREAM_SENDER[%s]", host->hostname);
+ snprintfz(tag, NETDATA_THREAD_TAG_MAX, "STREAM_SENDER[%s]", rrdhost_hostname(host));
if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, rrdpush_sender_thread, (void *) host->sender))
- error("STREAM %s [send]: failed to create new thread for client.", host->hostname);
+ error("STREAM %s [send]: failed to create new thread for client.", rrdhost_hostname(host));
else
host->rrdpush_sender_spawn = 1;
}
@@ -746,7 +786,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
struct receiver_state *rpt = callocz(1, sizeof(*rpt));
rrd_rdlock();
- RRDHOST *host = rrdhost_find_by_guid(machine_guid, 0);
+ RRDHOST *host = rrdhost_find_by_guid(machine_guid);
if (unlikely(host && rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))) /* Ignore archived hosts. */
host = NULL;
if (host) {
@@ -763,7 +803,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
info(
"STREAM %s [receive from [%s]:%s]: multiple connections for same host detected - "
"existing connection is dead (%"PRId64" sec), accepting new connection.",
- host->hostname,
+ rrdhost_hostname(host),
w->client_ip,
w->client_port,
(int64_t)age);
@@ -772,12 +812,12 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
netdata_mutex_unlock(&host->receiver_lock);
rrdhost_unlock(host);
rrd_unlock();
- log_stream_connection(w->client_ip, w->client_port, key, host->machine_guid, host->hostname,
+ log_stream_connection(w->client_ip, w->client_port, key, host->machine_guid, rrdhost_hostname(host),