summaryrefslogtreecommitdiffstats
path: root/streaming/sender.c
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-06-19 20:52:35 +0300
committerGitHub <noreply@github.com>2023-06-19 20:52:35 +0300
commit0b4f820e9d42d10f64c3305d9c084261bc9880cf (patch)
tree641fcb81e9c84e08fbe08ca80776c6b593b218ba /streaming/sender.c
parent35884c7a8447fbeb699cae6a2a20dc0a2137c659 (diff)
/api/v2/nodes and streaming function (#15168)
* dummy streaming function * expose global functions upstream * separate function for pushing global functions * add missing conditions * allow streaming function to run async * started internal API for functions * cache host retention and expose it to /api/v2/nodes * internal API for function table fields; more progress on streaming status * abstracted and unified rrdhost status * port old coverity warning fix - although it is not needed * add ML information to rrdhost status * add ML capability to streaming to signal the transmission of ML information; added ML information to host status * protect host->receiver * count metrics and instances per host * exposed all inbound and outbound streaming * fix for ML status and dependency of DATA_WITH_ML to INTERPOLATED, not IEEE754 * update ML dummy * added all fields * added streaming group by and cleaned up accepted values by cloud * removed type * Revert "removed type" This reverts commit faae4177e603d4f85b7433f33f92ef3ccd23976e. * added context to db summary * new /api/v2/nodes schema * added ML type * change default function charts * log to trace new capa * add more debug * removed debugging code * retry on receive interrupted read; respect sender reconnect delay in all cases * set disconnected host flag and manipulate localhost child count atomically, inside set/clear receiver * fix infinite loop * send_to_plugin() now has a spinlock to ensure that only 1 thread is writing to the plugin/child at the same time * global cloud_status() call * cloud should be a section, since it will contain error information * put cloud capabilities into cloud * aclk status in /api/v2 agents sections * keep aclk_connection_counter * updates on /api/v2/nodes * final /api/v2/nodes and addition of /api/v2/nodes_instances * parametrize all /api/v2/xxx output to control which info is outputed per endpoint * always accept nodes selector * st needs to be per instance, not per node * fix merging of contexts; fix cups plugin priorities * add after and before parameters to /api/v2/contexts/nodes/nodes_instances/q * give each libuv worker a unique id * aclk http_api_v2 version 4
Diffstat (limited to 'streaming/sender.c')
-rw-r--r--streaming/sender.c130
1 files changed, 10 insertions, 120 deletions
diff --git a/streaming/sender.c b/streaming/sender.c
index c74c9b407c..3743889edb 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -490,18 +490,19 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender
break;
}
}
- const char *error = stream_responses[i].error;
- int worker_job_id = stream_responses[i].worker_job_id;
- time_t delay = stream_responses[i].postpone_reconnect_seconds;
if(version >= STREAM_HANDSHAKE_OK_V1) {
host->destination->last_error = NULL;
host->destination->last_handshake = version;
- host->destination->postpone_reconnection_until = 0;
- s->capabilities = convert_stream_version_to_capabilities(version);
+ host->destination->postpone_reconnection_until = now_realtime_sec() + s->reconnect_delay;
+ s->capabilities = convert_stream_version_to_capabilities(version, host, true);
return true;
}
+ const char *error = stream_responses[i].error;
+ int worker_job_id = stream_responses[i].worker_job_id;
+ time_t delay = stream_responses[i].postpone_reconnect_seconds;
+
worker_is_busy(worker_job_id);
rrdpush_sender_thread_close_socket(host);
host->destination->last_error = error;
@@ -591,7 +592,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
// info("STREAM %s [send to %s]: initializing communication...", rrdhost_hostname(host), s->connected_to);
// reset our capabilities to default
- s->capabilities = stream_our_capabilities();
+ s->capabilities = stream_our_capabilities(host, true);
#ifdef ENABLE_COMPRESSION
// If we don't want compression, remove it from our capabilities
@@ -1098,119 +1099,6 @@ void rrdpush_signal_sender_to_wake_up(struct sender_state *s) {
}
}
-static NETDATA_DOUBLE rrdhost_sender_replication_completion(RRDHOST *host, time_t now, size_t *instances) {
- size_t charts = rrdhost_sender_replicating_charts(host);
- NETDATA_DOUBLE completion;
- if(!charts || !host->sender || !host->sender->replication.oldest_request_after_t)
- completion = 100.0;
- else if(!host->sender->replication.latest_completed_before_t || host->sender->replication.latest_completed_before_t < host->sender->replication.oldest_request_after_t)
- completion = 0.0;
- else {
- time_t total = now - host->sender->replication.oldest_request_after_t;
- time_t current = host->sender->replication.latest_completed_before_t - host->sender->replication.oldest_request_after_t;
- completion = (NETDATA_DOUBLE) current * 100.0 / (NETDATA_DOUBLE) total;
- }
-
- *instances = charts;
-
- return completion;
-}
-
-void rrdhost_sender_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused) {
- bool online = rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
- buffer_json_member_add_object(wb, key);
-
- if(host->sender)
- buffer_json_member_add_uint64(wb, "hops", host->sender->hops);
-
- buffer_json_member_add_boolean(wb, "online", online);
-
- if(host->sender && host->sender->last_state_since_t) {
- buffer_json_member_add_time_t(wb, "since", host->sender->last_state_since_t);
- buffer_json_member_add_time_t(wb, "age", now - host->sender->last_state_since_t);
- }
-
- if(!online && host->sender && host->sender->exit.reason)
- buffer_json_member_add_string(wb, "reason", host->sender->exit.reason);
-
- buffer_json_member_add_object(wb, "replication");
- {
- size_t instances;
- NETDATA_DOUBLE completion = rrdhost_sender_replication_completion(host, now, &instances);
- buffer_json_member_add_boolean(wb, "in_progress", instances);
- buffer_json_member_add_double(wb, "completion", completion);
- buffer_json_member_add_uint64(wb, "instances", instances);
- }
- buffer_json_object_close(wb);
-
- if(host->sender) {
- netdata_mutex_lock(&host->sender->mutex);
-
- buffer_json_member_add_object(wb, "destination");
- {
- char buf[1024 + 1];
- if(online && host->sender->rrdpush_sender_socket != -1) {
- SOCKET_PEERS peers = socket_peers(host->sender->rrdpush_sender_socket);
- bool ssl = SSL_connection(&host->sender->ssl);
-
- snprintfz(buf, 1024, "[%s]:%d%s", peers.local.ip, peers.local.port, ssl ? ":SSL" : "");
- buffer_json_member_add_string(wb, "local", buf);
-
- snprintfz(buf, 1024, "[%s]:%d%s", peers.peer.ip, peers.peer.port, ssl ? ":SSL" : "");
- buffer_json_member_add_string(wb, "remote", buf);
-
- stream_capabilities_to_json_array(wb, host->sender->capabilities, "capabilities");
-
- buffer_json_member_add_object(wb, "traffic");
- {
- bool compression = false;
-#ifdef ENABLE_COMPRESSION
- compression = (stream_has_capability(host->sender, STREAM_CAP_COMPRESSION) && host->sender->compressor);
-#endif
- buffer_json_member_add_boolean(wb, "compression", compression);
- buffer_json_member_add_uint64(wb, "data", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_DATA]);
- buffer_json_member_add_uint64(wb, "metadata", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_METADATA]);
- buffer_json_member_add_uint64(wb, "functions", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_FUNCTIONS]);
- buffer_json_member_add_uint64(wb, "replication", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_REPLICATION]);
- }
- buffer_json_object_close(wb); // traffic
- }
-
- buffer_json_member_add_array(wb, "candidates");
- struct rrdpush_destinations *d;
- for (d = host->destinations; d; d = d->next) {
- buffer_json_add_array_item_object(wb);
- {
-
- if (d->ssl) {
- snprintfz(buf, 1024, "%s:SSL", string2str(d->destination));
- buffer_json_member_add_string(wb, "destination", buf);
- }
- else
- buffer_json_member_add_string(wb, "destination", string2str(d->destination));
-
- buffer_json_member_add_time_t(wb, "last_check", d->last_attempt);
- buffer_json_member_add_time_t(wb, "age", now - d->last_attempt);
- buffer_json_member_add_string(wb, "last_error", d->last_error);
- buffer_json_member_add_string(wb, "last_handshake",
- stream_handshake_error_to_string(d->last_handshake));
- buffer_json_member_add_time_t(wb, "next_check", d->postpone_reconnection_until);
- buffer_json_member_add_time_t(wb, "next_in",
- (d->postpone_reconnection_until > now) ?
- d->postpone_reconnection_until - now : 0);
- }
- buffer_json_object_close(wb); // each candidate
- }
- buffer_json_array_close(wb); // candidates
- }
- buffer_json_object_close(wb); // destination
-
- netdata_mutex_unlock(&host->sender->mutex);
- }
-
- buffer_json_object_close(wb); // streaming
-}
-
static bool rrdhost_set_sender(RRDHOST *host) {
if(unlikely(!host->sender)) return false;
@@ -1219,6 +1107,7 @@ static bool rrdhost_set_sender(RRDHOST *host) {
if(!host->sender->tid) {
rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN);
+ host->rrdpush_sender_connection_counter++;
host->sender->tid = gettid();
host->sender->last_state_since_t = now_realtime_sec();
host->sender->exit.reason = NULL;
@@ -1435,8 +1324,9 @@ void *rrdpush_sender_thread(void *ptr) {
break;
now_s = s->last_traffic_seen_t = now_monotonic_sec();
- rrdpush_claimed_id(s->host);
+ rrdpush_send_claimed_id(s->host);
rrdpush_send_host_labels(s->host);
+ rrdpush_send_global_functions(s->host);
s->replication.oldest_request_after_t = 0;
rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);