summaryrefslogtreecommitdiffstats
path: root/streaming/sender.c
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-09-05 19:31:06 +0300
committerGitHub <noreply@github.com>2022-09-05 19:31:06 +0300
commit5e1b95cf92168c4df74586fb4430dc284806da82 (patch)
treef42077d8b02eaf316683453a7474bd1f599a833d /streaming/sender.c
parent544aef1fde6e79ac57d2dea85d3f063076d7f885 (diff)
Deduplicate all netdata strings (#13570)
* rrdfamily * rrddim * rrdset plugin and module names * rrdset units * rrdset type * rrdset family * rrdset title * rrdset title more * rrdset context * rrdcalctemplate context and removal of context hash from rrdset * strings statistics * rrdset name * rearranged members of rrdset * eliminate rrdset name hash; rrdcalc chart converted to STRING * rrdset id, eliminated rrdset hash * rrdcalc, alarm_entry, alert_config and some of rrdcalctemplate * rrdcalctemplate * rrdvar * eval_variable * rrddimvar and rrdsetvar * rrdhost hostname, os and tags * fix master commits * added thread cache; implemented string_dup without locks * faster thread cache * rrdset and rrddim now use dictionaries for indexing * rrdhost now uses dictionary * rrdfamily now uses DICTIONARY * rrdvar using dictionary instead of AVL * allocate the right size to rrdvar flag members * rrdhost remaining char * members to STRING * * better error handling on indexing * strings now use a read/write lock to allow parallel searches to the index * removed AVL support from dictionaries; implemented STRING with native Judy calls * string releases should be negative * only 31 bits are allowed for enum flags * proper locking on strings * string threading unittest and fixes * fix lgtm finding * fixed naming * stream chart/dimension definitions at the beginning of a streaming session * thread stack variable is undefined on thread cancel * rrdcontext garbage collect per host on startup * worker control in garbage collection * relaxed deletion of rrdmetrics * type checking on dictfe * netdata chart to monitor rrdcontext triggers * Group chart label updates * rrdcontext better handling of collected rrdsets * rrdpush incremental transmission of definitions should use as much buffer as possible * require 1MB per chart * empty the sender buffer before enabling metrics streaming * fill up to 50% of buffer * reset signaling metrics sending * use the shared variable for status * use separate host flag for enabling streaming 
of metrics * make sure the flag is clear * add logging for streaming * add logging for streaming on buffer overflow * circular_buffer proper sizing * removed obsolete logs * do not execute worker jobs if not necessary * better messages about compression disabling * proper use of flags and updating rrdset last access time every time the obsoletion flag is flipped * monitor stream sender used buffer ratio * Update exporting unit tests * no need to compare label value with strcmp * streaming send workers now monitor bandwidth * workers now use strings * streaming receiver monitors incoming bandwidth * parser shift of worker ids * minor fixes * Group chart label updates * Populate context with dimensions that have data * Fix chart id * better shift of parser worker ids * fix for streaming compression * properly count received bytes * ensure LZ4 compression ring buffer does not wrap prematurely * do not stream empty charts; do not process empty instances in rrdcontext * need_to_send_chart_definition() does not need an rrdset lock any more * rrdcontext objects are collected, after data have been written to the db * better logging of RRDCONTEXT transitions * always set all variables needed by the worker utilization charts * implemented double linked list for most objects; eliminated alarm indexes from rrdhost; and many more fixes * lockless strings design - string_dup() and string_freez() are totally lockless when they don't need to touch Judy - only Judy is protected with a read/write lock * STRING code re-organization for clarity * thread_cache improvements; double numbers precision on worker threads * STRING_ENTRY now shadows STRING, so no duplicate definition is required; string_length() renamed to string_strlen() to follow the paradigm of all other functions, STRING internal statistics are now only compiled with NETDATA_INTERNAL_CHECKS * rrdhost index by hostname now cleans up; aclk queries of archived hosts do not index hosts * Add index to speed up database context 
searches * Removed last_updated optimization (was also buggy after latest merge with master) Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> Co-authored-by: Vladimir Kobal <vlad@prokk.net>
Diffstat (limited to 'streaming/sender.c')
-rw-r--r--streaming/sender.c307
1 files changed, 216 insertions, 91 deletions
diff --git a/streaming/sender.c b/streaming/sender.c
index c4836aeafb..a4d7b78dfd 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -17,9 +17,12 @@
#define WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR 12
#define WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR 13
#define WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION 14
+#define WORKER_SENDER_JOB_BUFFER_RATIO 15
+#define WORKER_SENDER_JOB_BYTES_RECEIVED 16
+#define WORKER_SENDER_JOB_BYTES_SENT 17
-#if WORKER_UTILIZATION_MAX_JOB_TYPES < 15
-#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 15
+#if WORKER_UTILIZATION_MAX_JOB_TYPES < 18
+#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 18
#endif
extern struct config stream_config;
@@ -33,6 +36,11 @@ void sender_start(struct sender_state *s) {
buffer_flush(s->build);
}
+void sender_cancel(struct sender_state *s) {
+ buffer_flush(s->build);
+ netdata_mutex_unlock(&s->mutex);
+}
+
static inline void rrdpush_sender_thread_close_socket(RRDHOST *host);
#ifdef ENABLE_COMPRESSION
@@ -43,11 +51,11 @@ static inline void rrdpush_sender_thread_close_socket(RRDHOST *host);
*/
static inline void deactivate_compression(struct sender_state *s) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION);
- error("STREAM_COMPRESSION: Deactivating compression to avoid stream corruption");
+ error("STREAM_COMPRESSION: Compression returned error, disabling it.");
default_compression_enabled = 0;
s->rrdpush_compression = 0;
s->version = STREAM_VERSION_CLABELS;
- error("STREAM_COMPRESSION %s [send to %s]: Restarting connection without compression", s->host->hostname, s->connected_to);
+ error("STREAM %s [send to %s]: Restarting connection without compression.", rrdhost_hostname(s->host), s->connected_to);
rrdpush_sender_thread_close_socket(s->host);
}
#endif
@@ -59,27 +67,55 @@ void sender_commit(struct sender_state *s) {
#ifdef ENABLE_COMPRESSION
if (src && src_len) {
if (s->compressor && s->rrdpush_compression) {
- src_len = s->compressor->compress(s->compressor, src, src_len, &src);
- if (!src_len) {
- deactivate_compression(s);
- buffer_flush(s->build);
- netdata_mutex_unlock(&s->mutex);
- return;
+ while(src_len) {
+ size_t size_to_compress = src_len;
+
+ if(size_to_compress > LZ4_MAX_MSG_SIZE) {
+ // we need to find the last newline
+ // so that the decompressor will have a whole line to work with
+
+ const char *t = &src[LZ4_MAX_MSG_SIZE - 1];
+ while(t-- > src)
+ if(*t == '\n')
+ break;
+
+ if(t == src)
+ size_to_compress = LZ4_MAX_MSG_SIZE;
+ else
+ size_to_compress = t - src + 1;
+ }
+
+ char *dst;
+ size_t dst_len = s->compressor->compress(s->compressor, src, size_to_compress, &dst);
+ if (!dst_len) {
+ deactivate_compression(s);
+ buffer_flush(s->build);
+ netdata_mutex_unlock(&s->mutex);
+ return;
+ }
+
+ if(cbuffer_add_unsafe(s->host->sender->buffer, dst, dst_len))
+ s->overflow = 1;
+
+ src = src + size_to_compress;
+ src_len -= size_to_compress;
}
}
- if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len))
+ else if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len))
s->overflow = 1;
}
#else
if(cbuffer_add_unsafe(s->host->sender->buffer, src, src_len))
s->overflow = 1;
#endif
+
buffer_flush(s->build);
netdata_mutex_unlock(&s->mutex);
}
static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) {
+ rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_COLLECTED_METRICS);
__atomic_clear(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST);
if(host->rrdpush_sender_socket != -1) {
@@ -94,11 +130,11 @@ static inline void rrdpush_sender_add_host_variable_to_buffer_nolock(RRDHOST *ho
buffer_sprintf(
host->sender->build
, "VARIABLE HOST %s = " NETDATA_DOUBLE_FORMAT "\n"
- , rv->name
+ , rrdvar_name(rv)
, *value
);
- debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " NETDATA_DOUBLE_FORMAT, rv->name, *value);
+ debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " NETDATA_DOUBLE_FORMAT, rrdvar_name(rv), *value);
}
void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, RRDVAR *rv) {
@@ -110,7 +146,7 @@ void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, RRDVAR *rv) {
}
-static int rrdpush_sender_thread_custom_host_variables_callback(void *rrdvar_ptr, void *host_ptr) {
+static int rrdpush_sender_thread_custom_host_variables_callback(const char *name __maybe_unused, void *rrdvar_ptr, void *host_ptr) {
RRDVAR *rv = (RRDVAR *)rrdvar_ptr;
RRDHOST *host = (RRDHOST *)host_ptr;
@@ -127,7 +163,7 @@ static int rrdpush_sender_thread_custom_host_variables_callback(void *rrdvar_ptr
static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) {
sender_start(host->sender);
- int ret = rrdvar_callback_for_all_host_variables(host, rrdpush_sender_thread_custom_host_variables_callback, host);
+ int ret = rrdvar_walkthrough_read(host->rrdvar_root_index, rrdpush_sender_thread_custom_host_variables_callback, host);
(void)ret;
sender_commit(host->sender);
@@ -162,7 +198,7 @@ static inline void rrdpush_sender_thread_data_flush(RRDHOST *host) {
size_t len = cbuffer_next_unsafe(host->sender->buffer, NULL);
if (len)
- error("STREAM %s [send]: discarding %zu bytes of metrics already in the buffer.", host->hostname, len);
+ error("STREAM %s [send]: discarding %zu bytes of metrics already in the buffer.", rrdhost_hostname(host), len);
cbuffer_remove_unsafe(host->sender->buffer, len);
netdata_mutex_unlock(&host->sender->mutex);
@@ -259,7 +295,7 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
rrdpush_sender_thread_close_socket(host);
debug(D_STREAM, "STREAM: Attempting to connect...");
- info("STREAM %s [send to %s]: connecting...", host->hostname, host->rrdpush_send_destination);
+ info("STREAM %s [send to %s]: connecting...", rrdhost_hostname(host), host->rrdpush_send_destination);
host->rrdpush_sender_socket = connect_to_one_of_destinations(
host->destinations
@@ -272,11 +308,11 @@ static int rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_po
);
if(unlikely(host->rrdpush_sender_socket == -1)) {
- error("STREAM %s [send to %s]: failed to connect", host->hostname, host->rrdpush_send_destination);
+ error("STREAM %s [send to %s]: failed to connect", rrdhost_hostname(host), host->rrdpush_send_destination);
return 0;
}
- info("STREAM %s [send to %s]: initializing communication...", host->hostname, s->connected_to);
+ info("STREAM %s [send to %s]: initializing communication...", rrdhost_hostname(host), s->connected_to);
#ifdef ENABLE_HTTPS
if( netdata_client_ctx ){
@@ -370,19 +406,19 @@ if(!s->rrdpush_compression)
"User-Agent: %s/%s\r\n"
"Accept: */*\r\n\r\n"
, host->rrdpush_send_api_key
- , host->hostname
- , host->registry_hostname
+ , rrdhost_hostname(host)
+ , rrdhost_registry_hostname(host)
, host->machine_guid
, default_rrd_update_every
- , host->os
- , host->timezone
- , host->abbrev_timezone
+ , rrdhost_os(host)
+ , rrdhost_timezone(host)
+ , rrdhost_abbrev_timezone(host)
, host->utc_offset
, host->system_info->hops + 1
, host->system_info->ml_capable
, host->system_info->ml_enabled
, host->system_info->mc_version
- , (host->tags) ? host->tags : ""
+ , rrdhost_tags(host)
, s->version
, (host->system_info->cloud_provider_type) ? host->system_info->cloud_provider_type : ""
, (host->system_info->cloud_instance_type) ? host->system_info->cloud_instance_type : ""
@@ -412,8 +448,8 @@ if(!s->rrdpush_compression)
, (host->system_info->host_ram_total) ? host->system_info->host_ram_total : ""
, (host->system_info->host_disk_space) ? host->system_info->host_disk_space : ""
, STREAMING_PROTOCOL_VERSION
- , host->program_name
- , host->program_version
+ , rrdhost_program_name(host)
+ , rrdhost_program_version(host)
);
http[eol] = 0x00;
rrdpush_clean_encoded(&se);
@@ -456,12 +492,12 @@ if(!s->rrdpush_compression)
if(send_timeout(host->rrdpush_sender_socket, http, strlen(http), 0, timeout) == -1) {
#endif
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
- error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", host->hostname, s->connected_to);
+ error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", rrdhost_hostname(host), s->connected_to);
rrdpush_sender_thread_close_socket(host);
return 0;
}
- info("STREAM %s [send to %s]: waiting response from remote netdata...", host->hostname, s->connected_to);
+ info("STREAM %s [send to %s]: waiting response from remote netdata...", rrdhost_hostname(host), s->connected_to);
ssize_t received;
#ifdef ENABLE_HTTPS
@@ -472,7 +508,7 @@ if(!s->rrdpush_compression)
if(received == -1) {
#endif
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
- error("STREAM %s [send to %s]: remote netdata does not respond.", host->hostname, s->connected_to);
+ error("STREAM %s [send to %s]: remote netdata does not respond.", rrdhost_hostname(host), s->connected_to);
rrdpush_sender_thread_close_socket(host);
return 0;
}
@@ -482,7 +518,7 @@ if(!s->rrdpush_compression)
int32_t version = (int32_t)parse_stream_version(host, http);
if(version == -1) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE);
- error("STREAM %s [send to %s]: server is not replying properly (is it a netdata?).", host->hostname, s->connected_to);
+ error("STREAM %s [send to %s]: server is not replying properly (is it a netdata?).", rrdhost_hostname(host), s->connected_to);
rrdpush_sender_thread_close_socket(host);
//catch other reject reasons and force to check other destinations
if (host->destination->next)
@@ -490,19 +526,19 @@ if(!s->rrdpush_compression)
return 0;
}
else if(version == -2) {
- error("STREAM %s [send to %s]: remote server is the localhost for [%s].", host->hostname, s->connected_to, host->hostname);
+ error("STREAM %s [send to %s]: remote server is the localhost for [%s].", rrdhost_hostname(host), s->connected_to, rrdhost_hostname(host));
rrdpush_sender_thread_close_socket(host);
host->destination->disabled_because_of_localhost = 1;
return 0;
}
else if(version == -3) {
- error("STREAM %s [send to %s]: remote server already receives metrics for [%s].", host->hostname, s->connected_to, host->hostname);
+ error("STREAM %s [send to %s]: remote server already receives metrics for [%s].", rrdhost_hostname(host), s->connected_to, rrdhost_hostname(host));
rrdpush_sender_thread_close_socket(host);
host->destination->disabled_already_streaming = now_realtime_sec();
return 0;
}
else if(version == -4) {
- error("STREAM %s [send to %s]: remote server denied access for [%s].", host->hostname, s->connected_to, host->hostname);
+ error("STREAM %s [send to %s]: remote server denied access for [%s].", rrdhost_hostname(host), s->connected_to, rrdhost_hostname(host));
rrdpush_sender_thread_close_socket(host);
if (host->destination->next)
host->destination->disabled_because_of_denied_access = 1;
@@ -520,23 +556,23 @@ if(!s->rrdpush_compression)
}
else {
//parent does not support compression or has compression disabled
- debug(D_STREAM, "Stream is uncompressed! One of the agents (%s <-> %s) does not support compression OR compression is disabled.", s->connected_to, s->host->hostname);
- infoerr("Stream is uncompressed! One of the agents (%s <-> %s) does not support compression OR compression is disabled.", s->connected_to, s->host->hostname);
+ debug(D_STREAM, "Stream is uncompressed! One of the agents (%s <-> %s) does not support compression OR compression is disabled.", s->connected_to, rrdhost_hostname(s->host));
+ infoerr("Stream is uncompressed! One of the agents (%s <-> %s) does not support compression OR compression is disabled.", s->connected_to, rrdhost_hostname(s->host));
s->version = STREAM_VERSION_CLABELS;
}
#endif //ENABLE_COMPRESSION
- info("STREAM %s [send to %s]: established communication with a parent using protocol version %d - ready to send metrics..."
- , host->hostname
+ info("STREAM %s [send to %s]: established communication with a parent using protocol version %d"
+ , rrdhost_hostname(host)
, s->connected_to
, s->version);
if(sock_setnonblock(host->rrdpush_sender_socket) < 0)
- error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", host->hostname, s->connected_to);
+ error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to);
if(sock_enlarge_out(host->rrdpush_sender_socket) < 0)
- error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", host->hostname, s->connected_to);
+ error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", rrdhost_hostname(host), s->connected_to);
debug(D_STREAM, "STREAM: Connected on fd %d...", host->rrdpush_sender_socket);
@@ -578,7 +614,8 @@ static void attempt_to_connect(struct sender_state *state)
}
// TCP window is open and we have data to transmit.
-void attempt_to_send(struct sender_state *s) {
+static ssize_t attempt_to_send(struct sender_state *s) {
+ ssize_t ret = 0;
rrdpush_send_labels(s->host);
@@ -591,42 +628,44 @@ void attempt_to_send(struct sender_state *s) {
char *chunk;
size_t outstanding = cbuffer_next_unsafe(s->buffer, &chunk);
debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding);
- ssize_t ret;
+
#ifdef ENABLE_HTTPS
SSL *conn = s->host->ssl.conn ;
- if(conn && !s->host->ssl.flags) {
+ if(conn && !s->host->ssl.flags)
ret = SSL_write(conn, chunk, outstanding);
- } else {
+ else
ret = send(s->host->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT);
- }
#else
ret = send(s->host->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT);
#endif
+
if (likely(ret > 0)) {
cbuffer_remove_unsafe(s->buffer, ret);
s->sent_bytes_on_this_connection += ret;
s->sent_bytes += ret;
- debug(D_STREAM, "STREAM %s [send to %s]: Sent %zd bytes", s->host->hostname, s->connected_to, ret);
+ debug(D_STREAM, "STREAM %s [send to %s]: Sent %zd bytes", rrdhost_hostname(s->host), s->connected_to, ret);
s->last_sent_t = now_monotonic_sec();
}
else if (ret == -1 && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK))
- debug(D_STREAM, "STREAM %s [send to %s]: unavailable after polling POLLOUT", s->host->hostname, s->connected_to);
+ debug(D_STREAM, "STREAM %s [send to %s]: unavailable after polling POLLOUT", rrdhost_hostname(s->host), s->connected_to);
else if (ret == -1) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR);
debug(D_STREAM, "STREAM: Send failed - closing socket...");
- error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", s->host->hostname, s->connected_to, s->sent_bytes_on_this_connection);
+ error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", rrdhost_hostname(s->host), s->connected_to, s->sent_bytes_on_this_connection);
rrdpush_sender_thread_close_socket(s->host);
}
- else {
+ else
debug(D_STREAM, "STREAM: send() returned 0 -> no error but no transmission");
- }
netdata_mutex_unlock(&s->mutex);
netdata_thread_enable_cancelability();
+
+ return ret;
}
-void attempt_read(struct sender_state *s) {
-int ret;
+static ssize_t attempt_read(struct sender_state *s) {
+ ssize_t ret = 0;
+
#ifdef ENABLE_HTTPS
if (s->host->ssl.conn && !s->host->stream_ssl.flags) {
ERR_clear_error();
@@ -634,44 +673,46 @@ int ret;
ret = SSL_read(s->host->ssl.conn, s->read_buffer, desired);
if (ret > 0 ) {
s->read_len += ret;
- return;
+ return ret;
}
int sslerrno = SSL_get_error(s->host->ssl.conn, desired);
if (sslerrno == SSL_ERROR_WANT_READ || sslerrno == SSL_ERROR_WANT_WRITE)
- return;
+ return ret;
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
u_long err;
char buf[256];
while ((err = ERR_get_error()) != 0) {
ERR_error_string_n(err, buf, sizeof(buf));
- error("STREAM %s [send to %s] ssl error: %s", s->host->hostname, s->connected_to, buf);
+ error("STREAM %s [send to %s] ssl error: %s", rrdhost_hostname(s->host), s->connected_to, buf);
}
error("Restarting connection");
rrdpush_sender_thread_close_socket(s->host);
- return;
+ return ret;
}
#endif
ret = recv(s->host->rrdpush_sender_socket, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1,MSG_DONTWAIT);
- if (ret>0) {
+ if (ret > 0) {
s->read_len += ret;
- return;
+ return ret;
}
- debug(D_STREAM, "Socket was POLLIN, but req %zu bytes gave %d", sizeof(s->read_buffer) - s->read_len - 1, ret);
+ debug(D_STREAM, "Socket was POLLIN, but req %zu bytes gave %zd", sizeof(s->read_buffer) - s->read_len - 1, ret);
- if (ret<0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))
- return;
+ if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))
+ return ret;
- if (ret==0) {
+ if (ret == 0) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED);
- error("STREAM %s [send to %s]: connection closed by far end. Restarting connection", s->host->hostname, s->connected_to);
+ error("STREAM %s [send to %s]: connection closed by far end. Restarting connection", rrdhost_hostname(s->host), s->connected_to);
}
else {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR);
- error("STREAM %s [send to %s]: error during receive (%d). Restarting connection", s->host->hostname, s->connected_to, ret);
+ error("STREAM %s [send to %s]: error during receive (%zd). Restarting connection", rrdhost_hostname(s->host), s->connected_to, ret);
}
rrdpush_sender_thread_close_socket(s->host);
+
+ return ret;
}
// This is just a placeholder until the gap filling state machine is inserted
@@ -680,7 +721,7 @@ void execute_commands(struct sender_state *s) {
*end = 0;
while( start<end && (newline=strchr(start, '\n')) ) {
*newline = 0;
- info("STREAM %s [send to %s] received command over connection: %s", s->host->hostname, s->connected_to, start);
+ info("STREAM %s [send to %s] received command over connection: %s", rrdhost_hostname(s->host), s->connected_to, start);
start = newline+1;
}
if (start<end) {
@@ -689,15 +730,57 @@ void execute_commands(struct sender_state *s) {
}
}
+struct rrdpush_sender_thread_data {
+ struct sender_state *sender_state;
+ RRDHOST *host;
+ DICTFE dictfe;
+ enum {
+ SENDING_DEFINITIONS_RESTART,
+ SENDING_DEFINITIONS_CONTINUE,
+ SENDING_DEFINITIONS_DONE,
+ } sending_definitions_status;
+};
+
+static size_t cbuffer_available_bytes_with_lock(struct rrdpush_sender_thread_data *thread_data) {
+ netdata_mutex_lock(&thread_data->sender_state->mutex);
+ size_t outstanding = cbuffer_available_size_unsafe(thread_data->sender_state->host->sender->buffer);
+ netdata_mutex_unlock(&thread_data->sender_state->mutex);
+ return outstanding;
+}
+
+static void rrdpush_queue_incremental_definitions(struct rrdpush_sender_thread_data *thread_data) {
+
+ while(__atomic_load_n(&thread_data->host->rrdpush_sender_connected, __ATOMIC_SEQ_CST)
+ && thread_data->sending_definitions_status != SENDING_DEFINITIONS_DONE
+ && cbuffer_available_bytes_with_lock(thread_data) > (thread_data->sender_state->buffer->max_size / 2)) {
+
+ if(thread_data->sending_definitions_status == SENDING_DEFINITIONS_RESTART)
+ info("STREAM %s [send to %s]: sending metric definitions...", rrdhost_hostname(thread_data->host), thread_data->sender_state->connected_to);
+
+ bool more_defs_available = rrdpush_incremental_transmission_of_chart_definitions(
+ thread_data->sender_state->host, &thread_data->dictfe,
+ thread_data->sending_definitions_status == SENDING_DEFINITIONS_RESTART, false);
+
+ if (unlikely(!more_defs_available)) {
+ thread_data->sending_definitions_status = SENDING_DEFINITIONS_DONE;
+ info("STREAM %s [send to %s]: sending metric definitions finished.", rrdhost_hostname(thread_data->host), thread_data->sender_state->connected_to);
+ }
+ else
+ thread_data->sending_definitions_status = SENDING_DEFINITIONS_CONTINUE;
+ }
+}
static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
+ struct rrdpush_sender_thread_data *data = ptr;
worker_unregister();
- RRDHOST *host = (RRDHOST *)ptr;
+ RRDHOST *host = data->host;
+
+ rrdpush_incremental_transmission_of_chart_definitions(host, &data->dictfe, false, true);
netdata_mutex_lock(&host->sender->mutex);
- info("STREAM %s [send]: sending thread cleans up...", host->hostname);
+ info("STREAM %s [send]: sending thread cleans up...", rrdhost_hostname(host));
rrdpush_sender_thread_close_socket(host);
@@ -713,15 +796,17 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
}
if(!host->rrdpush_sender_join) {
- info("STREAM %s [send]: sending thread detaches itself.", host->hostname);
+ info("STREAM %s [send]: sending thread detaches itself.", rrdhost_hostname(host));
netdata_thread_detach(netdata_thread_self());
}
host->rrdpush_sender_spawn = 0;
- info("STREAM %s [send]: sending thread now exits.", host->hostname);
+ info("STREAM %s [send]: sending thread now exits.", rrdhost_hostname(host));
netdata_mutex_unlock(&host->sender->mutex);
+
+ freez(data);
}
void sender_init(RRDHOST *parent)
@@ -749,7 +834,7 @@ void *rrdpush_sender_thread(void *ptr) {
!*s->host->rrdpush_send_destination || !s->host->rrdpush_send_api_key ||
!*s->host->rrdpush_send_api_key) {
error("STREAM %s [send]: thread created (task id %d), but host has streaming disabled.",
- s->host->hostname, s->task_id);
+ rrdhost_hostname(s->host), s->task_id);
return NULL;
}
@@ -760,7 +845,7 @@ void *rrdpush_sender_thread(void *ptr) {
}
#endif
- info("STREAM %s [send]: thread created (task id %d)", s->host->hostname, s->task_id);
+ info("STREAM %s [send]: thread created (task id %d)", rrdhost_hostname(s->host), s->task_id);
s->timeout = (int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "timeout seconds", 60);
s->default_port = (int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "default port", 19999);
@@ -774,9 +859,10 @@ void *rrdpush_sender_thread(void *ptr) {
remote_clock_resync_iterations); // TODO: REMOVE FOR SLEW / GAPFILLING
// initialize rrdpush globals
+ rrdhost_flag_clear(s->host, RRDHOST_FLAG_STREAM_COLLECTED_METRICS);
__atomic_clear(&s->host->rrdpush_sender_connected, __ATOMIC_SEQ_CST);
if(pipe(s->host->rrdpush_sender_pipe) == -1) {
- error("STREAM %s [send]: cannot create required pipe. DISABLING STREAMING THREAD", s->host->hostname);
+ error("STREAM %s [send]: cannot create required pipe. DISABLING STREAMING THREAD", rrdhost_hostname(s->host));
return NULL;
}
s->version = STREAMING_PROTOCOL_CURRENT_VERSION;
@@ -808,7 +894,17 @@ void *rrdpush_sender_thread(void *ptr) {
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION, "disconnect no compression");
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, "disconnect bad handshake");
- netdata_thread_cleanup_push(rrdpush_sender_thread_cleanup_callback, s->host);
+ worker_register_job_custom_metric(WORKER_SENDER_JOB_BUFFER_RATIO, "used buffer ratio", "%", WORKER_METRIC_ABSOLUTE);
+ worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, "bytes received", "bytes/s", WORKER_METRIC_INCREMENTAL);
+ worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_SENT, "bytes sent", "bytes/s", WORKER_METRIC_INCREMENTAL);
+
+ struct rrdpush_sender_thread_data *thread_data = callocz(1, sizeof(struct rrdpush_sender_thread_data));
+ thread_data->sender_state = s;
+ thread_data->host = s->host;
+ thread_data->sending_definitions_status = SENDING_DEFINITIONS_RESTART;
+
+ netdata_thread_cleanup_push(rrdpush_sender_thread_cleanup_callback, thread_data);
+
for(; s->host->rrdpush_send_enabled && !netdata_exit ;) {
// check for outstanding cancellation requests
netdata_thread_testcancel();
@@ -816,6 +912,8 @@ void *rrdpush_sender_thread(void *ptr) {
// The connection attempt blocks (after which we use the socket in nonblocking)
if(unlikely(s->host->rrdpush_sender_socket == -1)) {
worker_is_busy(WORKER_SENDER_JOB_CONNECT);
+ thread_data->sending_definitions_status = SENDING_DEFINITIONS_RESTART;
+ rrdhost_flag_clear(s->host, RRDHOST_FLAG_STREAM_COLLECTED_METRICS);
s->overflow = 0;
s->read_len = 0;
s->buffer->read = 0;
@@ -828,17 +926,27 @@ void *rrdpush_sender_thread(void *ptr) {
sender_commit(s);
}
rrdpush_claimed_id(s->host);
+
+ // TO PUSH METRICS WITH DEFINITIONS:
+ //if(unlikely(s->host->rrdpush_sender_socket != -1 && __atomic_load_n(&s->host->rrdpush_sender_connected, __ATOMIC_SEQ_CST))) {
+ // thread_data->sending_definitions_status = SENDING_DEFINITIONS_DONE;
+ // rrdhost_flag_set(s->host, RRDHOST_FLAG_STREAM_COLLECTED_METRICS);
+ //}
+
continue;
}
// If the TCP window never opened then something is wrong, restart connection
if(unlikely(now_monotonic_sec() - s->last_sent_t > s->timeout)) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
- error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", s->host->hostname, s->connected_to, s->timeout, s->sent_bytes_on_this_connection, s->send_attempts);
+ error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", rrdhost_hostname(s->host), s->connected_to, s->timeout, s->sent_bytes_on_this_connection, s->send_attempts);
rrdpush_sender_thread_close_socket(s->host);
continue;
}
+ if(unlikely(thread_data->sending_definitions_status != SENDING_DEFINITIONS_DONE))
+ rrdpush_queue_incremental_definitions(thread_data);
+
worker_is_idle();
// Wait until buffer opens in the socket or a rrdset_done_push wakes us
@@ -847,16 +955,27 @@ void *rrdpush_sender_thread(void *ptr) {
fds[Socket].fd = s->host->rrdpush_sender_socket;
netdata_mutex_lock(&s->mutex);
- char *chunk;
- size_t outstanding = cbuffer_next_unsafe(s->host->sender->buffer, &chunk);
- chunk = NULL; // Do not cache pointer outside of region - could be invalidated
+ size_t outstanding = cbuffer_next_unsafe(s->host->sender->buffer, NULL);
+ size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer);
netdata_mutex_unlock(&s->mutex);
+
+ worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->host->sender->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->host->sender->buffer->max_size);
+
if(outstanding) {
s->send_attempts++;
fds[Socket].events = POLLIN | POLLOUT;
}
else {
fds[Socket].events = POLLIN;
+
+ if(unlikely(thread_data->sending_definitions_status == SENDING_DEFINITIONS_DONE
+ && __atomic_load_n(&s->host->rrdpush_sender_connected, __ATOMIC_SEQ_CST)
+ && !rrdhost_flag_check(s->host, RRDHOST_FLAG_STREAM_COLLECTED_METRICS)
+ )) {
+ // let the data collection threads know we are ready to push metrics
+ rrdhost_flag_set(s->host, RRDHOST_FLAG_STREAM_COLLECTED_METRICS);
+ info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to);
+ }
}
int retval = poll(fds, 2, 1000);
@@ -874,7 +993,7 @@ void *rrdpush_sender_thread(void *ptr) {
// Only errors from poll() are internal, but try restarting the connection
if(unlikely(retval == -1)) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR);
- error("STREAM %s [send to %s]: failed to poll(). Closing socket.", s->host->hostname, s->connected_to);
+ error("STREAM %s [send to %s]: failed to poll(). Closing socket.", rrdhost_hostname(s->host), s->connected_to);
rrdpush_sender_thread_close_socket(s->host);
continue;
}
@@ -884,28 +1003,34 @@ void *rrdpush_sender_thread(void *ptr) {
worker_is_busy(WORKER_SENDER_JOB_PIPE_READ);
debug(D_STREAM, "STREAM: Data added to send buffer (current buffer chunk %zu bytes)...", outstanding);
- char buffer[1000 + 1];
- if (read(s->host->rrdpush_sender_pipe[PIPE_READ], buffer, 1000) == -1)
- error("STREAM %s [send to %s]: cannot read from internal pipe.", s->host->hostname, s->connected_to);
+ char buffer[10000 + 1];
+ if (read(s->host->rrdpush_sender_pipe[PIPE_READ], buffer, 10000) == -1)
+ error("STREAM %s [send to %s]: cannot read from internal pipe.", rrdhost_hostname(s->host), s->connected_to);
}
// Read as much as possible to fill the buffer, split into full lines for execution.
if (fds[Socket].revents & POLLIN) {
worker_is_busy(WORKER_SENDER_JOB_SOCKET_RECEIVE);
- attempt_read(s);
+ ssize_t bytes = attempt_read(s);
+ if(bytes > 0)
+ worker_set_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, bytes);
}
- worker_is_busy(WORKER_SENDER_JOB_EXECUTE);
- execute_commands(s);
+ if(unlikely(s->read_len)) {
+ worker_is_busy(WORKER_SENDER_JOB_EXECUTE);
+ execute_commands(s);
+ }
// If we have data and have seen the TCP window open then try to close it by a transmission.
- if (outstanding && fds[Socket].revents & POLLOUT) {
+ if(likely(outstanding && fds[Socket].revents & POLLOUT)) {
worker_is_busy(WORKER_SENDER_JOB_SOCKET_SEND);
- attempt_to_send(s);
+ ssize_t bytes = attempt_to_send(s);
+ if(bytes > 0)
+ worker_set_metric(WORKER_SENDER_JOB_BYTES_SENT, bytes);
}
// TODO-GAPS - why do we only check this on the socket, not the pipe?
- if (outstanding) {
+ if(outstanding) {
char *error = NULL;
if (unlikely(fds[Socket].revents & POLLERR))
error = "socket reports errors (POLLERR)";
@@ -915,18 +1040,18 @@ void *rrdpush_sender_thread(void *ptr) {
error = "connection is invalid (POLLNVAL)";
if(unlikely(error)) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SOCKER_ERROR);
- error("STREAM %s [send to %s]: restart stream because %s - %zu bytes transmitted.", s->host->hostname,
+ error("STREAM %s [send to %s]: restart stream because %s - %zu bytes transmitted.", rrdhost_hostname(s->host),
s->connected_to, error, s->sent_bytes_on_this_connection);
rrdpush_sender_thread_close_socket(s->host);
}
}
// protection from overflow
- if (s->overflow) {
+ if(unlikely(s->overflow)) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_OVERFLOW);
errno = 0;
- error("STREAM %s [send to %s]: buffer full (%zu-bytes) after %zu bytes. Restarting connection",
- s->host->hostname, s->connected_to, s->buffer->size, s->sent_bytes_on_this_connection);
+ error("STREAM %s [send to %s]: buffer full (allocated %zu bytes) after sending %zu bytes. Res