summaryrefslogtreecommitdiffstats
path: root/streaming/sender.c
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-06-08 16:33:22 +0300
committerGitHub <noreply@github.com>2023-06-08 16:33:22 +0300
commit80d83b7bd1eca5872ed3ac5c34eb8bcb5fbd56e8 (patch)
tree8afd42a59cc6b114b26fdd3e38b137db99357154 /streaming/sender.c
parent028e26a194f5421432b577ee67afed8b66c0f6b7 (diff)
api v2 nodes for streaming statuses (#15162)
* api v2 nodes for streaming statuses * remove test * move parts of the output * in api/v2/data return 5 values per point when aggregation=percentage and raw option is given; return final values when aggregation=percentage is not the final grouping
Diffstat (limited to 'streaming/sender.c')
-rw-r--r--streaming/sender.c170
1 files changed, 159 insertions, 11 deletions
diff --git a/streaming/sender.c b/streaming/sender.c
index cedf58ce2d..b207039b5e 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -84,7 +84,7 @@ static inline void deactivate_compression(struct sender_state *s) {
#define SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE 3
// Collector thread finishing a transmission
-void sender_commit(struct sender_state *s, BUFFER *wb) {
+void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) {
if(unlikely(wb != sender_thread_buffer))
fatal("STREAMING: sender is trying to commit a buffer that is not this thread's buffer.");
@@ -163,6 +163,8 @@ void sender_commit(struct sender_state *s, BUFFER *wb) {
if(cbuffer_add_unsafe(s->buffer, dst, dst_len))
s->flags |= SENDER_FLAG_OVERFLOW;
+ else
+ s->sent_bytes_on_this_connection_per_type[type] += dst_len;
src = src + size_to_compress;
src_len -= size_to_compress;
@@ -170,9 +172,13 @@ void sender_commit(struct sender_state *s, BUFFER *wb) {
}
else if(cbuffer_add_unsafe(s->buffer, src, src_len))
s->flags |= SENDER_FLAG_OVERFLOW;
+ else
+ s->sent_bytes_on_this_connection_per_type[type] += src_len;
#else
if(cbuffer_add_unsafe(s->buffer, src, src_len))
s->flags |= SENDER_FLAG_OVERFLOW;
+ else
+ s->sent_bytes_on_this_connection_per_type[type] += src_len;
#endif
replication_recalculate_buffer_used_ratio_unsafe(s);
@@ -204,7 +210,7 @@ void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQU
if(rrdhost_can_send_definitions_to_parent(host)) {
BUFFER *wb = sender_start(host->sender);
rrdpush_sender_add_host_variable_to_buffer(wb, rva);
- sender_commit(host->sender, wb);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
sender_thread_buffer_free();
}
}
@@ -233,7 +239,7 @@ static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) {
};
int ret = rrdvar_walkthrough_read(host->rrdvars, rrdpush_sender_thread_custom_host_variables_callback, &tmp);
(void)ret;
- sender_commit(host->sender, wb);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
sender_thread_buffer_free();
debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret);
@@ -426,6 +432,33 @@ struct {
.worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
.postpone_reconnect_seconds = 1 * 60, // 1 minute
},
+ {
+ .response = START_STREAMING_ERROR_BUSY_TRY_LATER,
+ .length = sizeof(START_STREAMING_ERROR_BUSY_TRY_LATER) - 1,
+ .version = STREAM_HANDSHAKE_BUSY_TRY_LATER,
+ .dynamic = false,
+ .error = "remote server is currently busy, we should try later",
+ .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
+ .postpone_reconnect_seconds = 2 * 60, // 2 minutes
+ },
+ {
+ .response = START_STREAMING_ERROR_INTERNAL_ERROR,
+ .length = sizeof(START_STREAMING_ERROR_INTERNAL_ERROR) - 1,
+ .version = STREAM_HANDSHAKE_INTERNAL_ERROR,
+ .dynamic = false,
+ .error = "remote server is encountered an internal error, we should try later",
+ .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
+ .postpone_reconnect_seconds = 5 * 60, // 5 minutes
+ },
+ {
+ .response = START_STREAMING_ERROR_INITIALIZATION,
+ .length = sizeof(START_STREAMING_ERROR_INITIALIZATION) - 1,
+ .version = STREAM_HANDSHAKE_INITIALIZATION,
+ .dynamic = false,
+ .error = "remote server is initializing, we should try later",
+ .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
+ .postpone_reconnect_seconds = 2 * 60, // 2 minute
+ },
// terminator
{
@@ -750,6 +783,10 @@ static bool attempt_to_connect(struct sender_state *state)
{
state->send_attempts = 0;
+ // reset the bytes we have sent for this session
+ state->sent_bytes_on_this_connection = 0;
+ memset(state->sent_bytes_on_this_connection_per_type, 0, sizeof(state->sent_bytes_on_this_connection_per_type));
+
if(rrdpush_sender_thread_connect_to_parent(state->host, state->default_port, state->timeout, state)) {
// reset the buffer, to properly send charts and metrics
rrdpush_sender_on_connect(state->host);
@@ -760,9 +797,6 @@ static bool attempt_to_connect(struct sender_state *state)
// make sure the next reconnection will be immediate
state->not_connected_loops = 0;
- // reset the bytes we have sent for this session
- state->sent_bytes_on_this_connection = 0;
-
// let the data collection threads know we are ready
rrdhost_flag_set(state->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
@@ -776,9 +810,6 @@ static bool attempt_to_connect(struct sender_state *state)
// increase the failed connections counter
state->not_connected_loops++;
- // reset the number of bytes sent
- state->sent_bytes_on_this_connection = 0;
-
// slow re-connection on repeating errors
usec_t now_ut = now_monotonic_usec();
usec_t end_ut = now_ut + USEC_PER_SEC * state->reconnect_delay;
@@ -899,7 +930,7 @@ void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) {
buffer_fast_strcat(wb, buffer_tostring(func_wb), buffer_strlen(func_wb));
pluginsd_function_result_end_to_buffer(wb);
- sender_commit(s, wb);
+ sender_commit(s, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS);
sender_thread_buffer_free();
internal_error(true, "STREAM %s [send to %s] FUNCTION transaction %s sending back response (%zu bytes, %llu usec).",
@@ -1067,6 +1098,120 @@ void rrdpush_signal_sender_to_wake_up(struct sender_state *s) {
}
}
+static NETDATA_DOUBLE rrdhost_sender_replication_completion(RRDHOST *host, time_t now, size_t *instances) {
+ size_t charts = rrdhost_sender_replicating_charts(host);
+ NETDATA_DOUBLE completion;
+ if(!charts || !host->sender->replication.oldest_request_after_t)
+ completion = 100.0;
+ else if(!host->sender->replication.latest_completed_before_t || host->sender->replication.latest_completed_before_t < host->sender->replication.oldest_request_after_t)
+ completion = 0.0;
+ else {
+ time_t total = now - host->sender->replication.oldest_request_after_t;
+ time_t current = host->sender->replication.latest_completed_before_t - host->sender->replication.oldest_request_after_t;
+ completion = (NETDATA_DOUBLE) current * 100.0 / (NETDATA_DOUBLE) total;
+ }
+
+ *instances = charts;
+
+ return completion;
+}
+
+void rrdhost_sender_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused) {
+ bool online = rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
+ buffer_json_member_add_object(wb, key);
+
+ if(host->sender)
+ buffer_json_member_add_uint64(wb, "hops", host->sender->hops);
+
+ buffer_json_member_add_boolean(wb, "online", online);
+
+ if(host->sender && host->sender->last_state_since_t) {
+ buffer_json_member_add_time_t(wb, "since", host->sender->last_state_since_t);
+ buffer_json_member_add_time_t(wb, "age", now - host->sender->last_state_since_t);
+ }
+
+ if(!online && host->sender && host->sender->exit.reason)
+ buffer_json_member_add_string(wb, "reason", host->sender->exit.reason);
+
+ buffer_json_member_add_object(wb, "replication");
+ {
+ size_t instances;
+ NETDATA_DOUBLE completion = rrdhost_sender_replication_completion(host, now, &instances);
+ buffer_json_member_add_boolean(wb, "in_progress", instances);
+ buffer_json_member_add_double(wb, "completion", completion);
+ buffer_json_member_add_uint64(wb, "instances", instances);
+ }
+ buffer_json_object_close(wb);
+
+ if(host->sender) {
+ netdata_mutex_lock(&host->sender->mutex);
+
+ buffer_json_member_add_object(wb, "destination");
+ {
+ char buf[1024 + 1];
+ if(online && host->sender->rrdpush_sender_socket != -1) {
+ SOCKET_PEERS peers = socket_peers(host->sender->rrdpush_sender_socket);
+ bool ssl = SSL_connection(&host->sender->ssl);
+
+ snprintfz(buf, 1024, "[%s]:%d%s", peers.local.ip, peers.local.port, ssl ? ":SSL" : "");
+ buffer_json_member_add_string(wb, "local", buf);
+
+ snprintfz(buf, 1024, "[%s]:%d%s", peers.peer.ip, peers.peer.port, ssl ? ":SSL" : "");
+ buffer_json_member_add_string(wb, "remote", buf);
+
+ stream_capabilities_to_json_array(wb, online ? host->sender->capabilities : 0,
+ "capabilities");
+
+ buffer_json_member_add_object(wb, "traffic");
+ {
+ bool compression = false;
+#ifdef ENABLE_COMPRESSION
+ compression = (stream_has_capability(host->sender, STREAM_CAP_COMPRESSION) && host->sender->compressor);
+#endif
+ buffer_json_member_add_boolean(wb, "compression", compression);
+ buffer_json_member_add_uint64(wb, "data", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_DATA]);
+ buffer_json_member_add_uint64(wb, "metadata", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_METADATA]);
+ buffer_json_member_add_uint64(wb, "functions", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_FUNCTIONS]);
+ buffer_json_member_add_uint64(wb, "replication", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_REPLICATION]);
+ }
+ buffer_json_object_close(wb); // traffic
+ }
+
+ buffer_json_member_add_array(wb, "candidates");
+ struct rrdpush_destinations *d;
+ for (d = host->destinations; d; d = d->next) {
+ buffer_json_add_array_item_object(wb);
+ {
+
+ if (d->ssl) {
+ snprintfz(buf, 1024, "%s:SSL", string2str(d->destination));
+ buffer_json_member_add_string(wb, "destination", buf);
+ }
+ else
+ buffer_json_member_add_string(wb, "destination", string2str(d->destination));
+
+ buffer_json_member_add_time_t(wb, "last_check", d->last_attempt);
+ buffer_json_member_add_time_t(wb, "age", now - d->last_attempt);
+ buffer_json_member_add_string(wb, "last_error", d->last_error);
+ buffer_json_member_add_string(wb, "last_handshake",
+ stream_handshake_error_to_string(d->last_handshake));
+ buffer_json_member_add_time_t(wb, "next_check", d->postpone_reconnection_until);
+ buffer_json_member_add_time_t(wb, "next_in",
+ (d->postpone_reconnection_until > now) ?
+ d->postpone_reconnection_until - now : 0);
+ }
+ buffer_json_object_close(wb); // each candidate
+ }
+ buffer_json_array_close(wb); // candidates
+ }
+ buffer_json_object_close(wb); // destination
+
+ netdata_mutex_unlock(&host->sender->mutex);
+ }
+
+ buffer_json_object_close(wb); // streaming
+}
+
static bool rrdhost_set_sender(RRDHOST *host) {
if(unlikely(!host->sender)) return false;
@@ -1076,6 +1221,8 @@ static bool rrdhost_set_sender(RRDHOST *host) {
rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN);
host->sender->tid = gettid();
+ host->sender->last_state_since_t = now_realtime_sec();
+ host->sender->exit.reason = NULL;
ret = true;
}
netdata_mutex_unlock(&host->sender->mutex);
@@ -1091,8 +1238,8 @@ static void rrdhost_clear_sender___while_having_sender_mutex(RRDHOST *host) {
if(host->sender->tid == gettid()) {
host->sender->tid = 0;
host->sender->exit.shutdown = false;
- host->sender->exit.reason = NULL;
rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN | RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
+ host->sender->last_state_since_t = now_realtime_sec();
}
rrdpush_reset_destinations_postpone_time(host);
@@ -1291,6 +1438,7 @@ void *rrdpush_sender_thread(void *ptr) {
now_s = s->last_traffic_seen_t = now_monotonic_sec();
rrdpush_claimed_id(s->host);
rrdpush_send_host_labels(s->host);
+ s->replication.oldest_request_after_t = 0;
rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to);