summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-06-19 20:52:35 +0300
committerGitHub <noreply@github.com>2023-06-19 20:52:35 +0300
commit0b4f820e9d42d10f64c3305d9c084261bc9880cf (patch)
tree641fcb81e9c84e08fbe08ca80776c6b593b218ba /streaming
parent35884c7a8447fbeb699cae6a2a20dc0a2137c659 (diff)
/api/v2/nodes and streaming function (#15168)
* dummy streaming function * expose global functions upstream * separate function for pushing global functions * add missing conditions * allow streaming function to run async * started internal API for functions * cache host retention and expose it to /api/v2/nodes * internal API for function table fields; more progress on streaming status * abstracted and unified rrdhost status * port old coverity warning fix - although it is not needed * add ML information to rrdhost status * add ML capability to streaming to signal the transmission of ML information; added ML information to host status * protect host->receiver * count metrics and instances per host * exposed all inbound and outbound streaming * fix for ML status and dependency of DATA_WITH_ML to INTERPOLATED, not IEEE754 * update ML dummy * added all fields * added streaming group by and cleaned up accepted values by cloud * removed type * Revert "removed type" This reverts commit faae4177e603d4f85b7433f33f92ef3ccd23976e. * added context to db summary * new /api/v2/nodes schema * added ML type * change default function charts * log to trace new capa * add more debug * removed debugging code * retry on receive interrupted read; respect sender reconnect delay in all cases * set disconnected host flag and manipulate localhost child count atomically, inside set/clear receiver * fix infinite loop * send_to_plugin() now has a spinlock to ensure that only 1 thread is writing to the plugin/child at the same time * global cloud_status() call * cloud should be a section, since it will contain error information * put cloud capabilities into cloud * aclk status in /api/v2 agents sections * keep aclk_connection_counter * updates on /api/v2/nodes * final /api/v2/nodes and addition of /api/v2/nodes_instances * parametrize all /api/v2/xxx output to control which info is output per endpoint * always accept nodes selector * st needs to be per instance, not per node * fix merging of contexts; fix cups plugin priorities * add 
after and before parameters to /api/v2/contexts/nodes/nodes_instances/q * give each libuv worker a unique id * aclk http_api_v2 version 4
Diffstat (limited to 'streaming')
-rw-r--r--streaming/receiver.c97
-rw-r--r--streaming/replication.c4
-rw-r--r--streaming/replication.h2
-rw-r--r--streaming/rrdpush.c92
-rw-r--r--streaming/rrdpush.h313
-rw-r--r--streaming/sender.c130
6 files changed, 398 insertions, 240 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 709f15bd58..006cbb67d0 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -93,7 +93,7 @@ PARSER_RC streaming_claimed_id(char **words, size_t num_words, void *user)
rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID |RRDHOST_FLAG_METADATA_UPDATE);
- rrdpush_claimed_id(host);
+ rrdpush_send_claimed_id(host);
return PARSER_RC_OK;
}
@@ -104,17 +104,23 @@ static int read_stream(struct receiver_state *r, char* buffer, size_t size) {
return 0;
}
+ int tries = 100;
ssize_t bytes_read;
+ do {
+ errno = 0;
+
#ifdef ENABLE_HTTPS
- if (SSL_connection(&r->ssl))
- bytes_read = netdata_ssl_read(&r->ssl, buffer, size);
- else
- bytes_read = read(r->fd, buffer, size);
+ if (SSL_connection(&r->ssl))
+ bytes_read = netdata_ssl_read(&r->ssl, buffer, size);
+ else
+ bytes_read = read(r->fd, buffer, size);
#else
- bytes_read = read(r->fd, buffer, size);
+ bytes_read = read(r->fd, buffer, size);
#endif
+ } while(bytes_read < 0 && errno == EINTR && tries--);
+
if((bytes_read == 0 || bytes_read == -1) && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS)) {
error("STREAM: %s(): timeout while waiting for data on socket!", __FUNCTION__);
bytes_read = -3;
@@ -423,67 +429,18 @@ static void rrdpush_receiver_replication_reset(RRDHOST *host) {
rrdhost_receiver_replicating_charts_zero(host);
}
-void rrdhost_receiver_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused) {
- size_t receiver_hops = host->system_info ? host->system_info->hops : (host == localhost) ? 0 : 1;
-
- netdata_mutex_lock(&host->receiver_lock);
-
- buffer_json_member_add_object(wb, key);
- buffer_json_member_add_uint64(wb, "hops", receiver_hops);
-
- bool online = host == localhost || !rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN | RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED);
- buffer_json_member_add_boolean(wb, "online", online);
-
- if(host->child_connect_time || host->child_disconnected_time) {
- time_t since = MAX(host->child_connect_time, host->child_disconnected_time);
- buffer_json_member_add_time_t(wb, "since", since);
- buffer_json_member_add_time_t(wb, "age", now - since);
- }
-
- if(!online && host->rrdpush_last_receiver_exit_reason)
- buffer_json_member_add_string(wb, "reason", host->rrdpush_last_receiver_exit_reason);
-
- if(host != localhost && host->receiver) {
- buffer_json_member_add_object(wb, "replication");
- {
- size_t instances = rrdhost_receiver_replicating_charts(host);
- buffer_json_member_add_boolean(wb, "in_progress", instances);
- buffer_json_member_add_double(wb, "completion", host->rrdpush_receiver_replication_percent);
- buffer_json_member_add_uint64(wb, "instances", instances);
- }
- buffer_json_object_close(wb); // replication
-
- buffer_json_member_add_object(wb, "source");
- {
-
- char buf[1024 + 1];
- SOCKET_PEERS peers = socket_peers(host->receiver->fd);
- bool ssl = SSL_connection(&host->receiver->ssl);
-
- snprintfz(buf, 1024, "[%s]:%d%s", peers.local.ip, peers.local.port, ssl ? ":SSL" : "");
- buffer_json_member_add_string(wb, "local", buf);
-
- snprintfz(buf, 1024, "[%s]:%d%s", peers.peer.ip, peers.peer.port, ssl ? ":SSL" : "");
- buffer_json_member_add_string(wb, "remote", buf);
-
- stream_capabilities_to_json_array(wb, host->receiver->capabilities, "capabilities");
- }
- buffer_json_object_close(wb); // source
- }
- buffer_json_object_close(wb); // collection
-
- netdata_mutex_unlock(&host->receiver_lock);
-}
-
static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) {
bool signal_rrdcontext = false;
bool set_this = false;
netdata_mutex_lock(&host->receiver_lock);
- if (!host->receiver || host->receiver == rpt) {
+ if (!host->receiver) {
rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
+ host->rrdpush_receiver_connection_counter++;
+ __atomic_add_fetch(&localhost->connected_children_count, 1, __ATOMIC_RELAXED);
+
host->receiver = rpt;
rpt->host = host;
@@ -534,6 +491,9 @@ static void rrdhost_clear_receiver(struct receiver_state *rpt) {
// Make sure that we detach this thread and don't kill a freshly arriving receiver
if(host->receiver == rpt) {
+ __atomic_sub_fetch(&localhost->connected_children_count, 1, __ATOMIC_RELAXED);
+ rrdhost_flag_set(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED);
+
host->trigger_chart_obsoletion_check = 0;
host->child_connect_time = 0;
host->child_disconnected_time = now_realtime_sec();
@@ -633,11 +593,6 @@ void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, con
}
-static void rrdhost_reset_destinations(RRDHOST *host) {
- for (struct rrdpush_destinations *d = host->destinations; d; d = d->next)
- d->postpone_reconnection_until = 0;
-}
-
static void rrdpush_receive(struct receiver_state *rpt)
{
rpt->config.mode = default_rrd_memory_mode;
@@ -867,14 +822,14 @@ static void rrdpush_receive(struct receiver_state *rpt)
#ifdef ENABLE_ACLK
// in case we have cloud connection we inform cloud
// new child connected
- if (netdata_cloud_setting)
+ if (netdata_cloud_enabled)
aclk_host_state_update(rpt->host, 1);
#endif
- rrdhost_set_is_parent_label(++localhost->connected_children_count);
+ rrdhost_set_is_parent_label();
// let it reconnect to parent immediately
- rrdhost_reset_destinations(rpt->host);
+ rrdpush_reset_destinations_postpone_time(rpt->host);
size_t count = streaming_parser(rpt, &cd, rpt->fd,
#ifdef ENABLE_HTTPS
@@ -884,8 +839,6 @@ static void rrdpush_receive(struct receiver_state *rpt)
#endif
);
- rrdhost_flag_set(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED);
-
if(!rpt->exit.reason)
rpt->exit.reason = "PARSER EXIT";
@@ -898,12 +851,10 @@ static void rrdpush_receive(struct receiver_state *rpt)
#ifdef ENABLE_ACLK
// in case we have cloud connection we inform cloud
// a child disconnected
- if (netdata_cloud_setting)
+ if (netdata_cloud_enabled)
aclk_host_state_update(rpt->host, 0);
#endif
- rrdhost_set_is_parent_label(--localhost->connected_children_count);
-
cleanup:
;
}
@@ -921,6 +872,8 @@ static void rrdpush_receiver_thread_cleanup(void *ptr) {
, gettid());
receiver_state_free(rpt);
+
+ rrdhost_set_is_parent_label();
}
void *rrdpush_receiver_thread(void *ptr) {
diff --git a/streaming/replication.c b/streaming/replication.c
index c6fafc357b..753c72d8bd 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -797,9 +797,9 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c
rrdset_id(st), r->wanted.start_streaming ? "true" : "false",
(unsigned long long)r->wanted.after, (unsigned long long)r->wanted.before);
- int ret = r->caller.callback(buffer, r->caller.data);
+ ssize_t ret = r->caller.callback(buffer, r->caller.data);
if (ret < 0) {
- error("REPLAY ERROR: 'host:%s/chart:%s' failed to send replication request to child (error %d)",
+ error("REPLAY ERROR: 'host:%s/chart:%s' failed to send replication request to child (error %zd)",
rrdhost_hostname(r->host), rrdset_id(r->st), ret);
return false;
}
diff --git a/streaming/replication.h b/streaming/replication.h
index f5b64706c8..507b7c32f7 100644
--- a/streaming/replication.h
+++ b/streaming/replication.h
@@ -17,7 +17,7 @@ struct replication_query_statistics replication_get_query_statistics(void);
bool replicate_chart_response(RRDHOST *rh, RRDSET *rs, bool start_streaming, time_t after, time_t before);
-typedef int (*send_command)(const char *txt, void *data);
+typedef ssize_t (*send_command)(const char *txt, void *data);
bool replicate_chart_request(send_command callback, void *callback_data,
RRDHOST *rh, RRDSET *rs,
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index c481871cc7..d12d1761d3 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -67,20 +67,37 @@ static void load_stream_conf() {
freez(filename);
}
-STREAM_CAPABILITIES stream_our_capabilities() {
- return STREAM_CAP_V1 |
- STREAM_CAP_V2 |
- STREAM_CAP_VN |
- STREAM_CAP_VCAPS |
- STREAM_CAP_HLABELS |
- STREAM_CAP_CLAIM |
- STREAM_CAP_CLABELS |
- STREAM_CAP_FUNCTIONS |
- STREAM_CAP_REPLICATION |
- STREAM_CAP_BINARY |
- STREAM_CAP_INTERPOLATED |
- STREAM_HAS_COMPRESSION |
+STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) {
+
+ // we can have DATA_WITH_ML when INTERPOLATED is available
+ bool ml_capability = true;
+
+ if(host && sender) {
+ // we have DATA_WITH_ML capability
+ // we should remove the DATA_WITH_ML capability if our database does not have anomaly info
+ // this can happen under these conditions: 1. we don't run ML, and 2. we don't receive ML
+ netdata_mutex_lock(&host->receiver_lock);
+
+ if(!ml_host_running(host) && !stream_has_capability(host->receiver, STREAM_CAP_DATA_WITH_ML))
+ ml_capability = false;
+
+ netdata_mutex_unlock(&host->receiver_lock);
+ }
+
+ return STREAM_CAP_V1 |
+ STREAM_CAP_V2 |
+ STREAM_CAP_VN |
+ STREAM_CAP_VCAPS |
+ STREAM_CAP_HLABELS |
+ STREAM_CAP_CLAIM |
+ STREAM_CAP_CLABELS |
+ STREAM_CAP_FUNCTIONS |
+ STREAM_CAP_REPLICATION |
+ STREAM_CAP_BINARY |
+ STREAM_CAP_INTERPOLATED |
+ STREAM_HAS_COMPRESSION |
(ieee754_doubles ? STREAM_CAP_IEEE754 : 0) |
+ (ml_capability ? STREAM_CAP_DATA_WITH_ML : 0) |
0;
}
@@ -504,6 +521,7 @@ static int send_labels_callback(const char *name, const char *value, RRDLABEL_SR
buffer_sprintf(wb, "LABEL \"%s\" = %d \"%s\"\n", name, ls, value);
return 1;
}
+
void rrdpush_send_host_labels(RRDHOST *host) {
if(unlikely(!rrdhost_can_send_definitions_to_parent(host)
|| !stream_has_capability(host->sender, STREAM_CAP_HLABELS)))
@@ -519,8 +537,23 @@ void rrdpush_send_host_labels(RRDHOST *host) {
sender_thread_buffer_free();
}
-void rrdpush_claimed_id(RRDHOST *host)
-{
+void rrdpush_send_global_functions(RRDHOST *host) {
+ if(!stream_has_capability(host->sender, STREAM_CAP_FUNCTIONS))
+ return;
+
+ if(unlikely(!rrdhost_can_send_definitions_to_parent(host)))
+ return;
+
+ BUFFER *wb = sender_start(host->sender);
+
+ rrd_functions_expose_global_rrdpush(host, wb);
+
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+
+ sender_thread_buffer_free();
+}
+
+void rrdpush_send_claimed_id(RRDHOST *host) {
if(!stream_has_capability(host->sender, STREAM_CAP_CLAIM))
return;
@@ -823,7 +856,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
rpt->tags = strdupz(value);
else if(!strcmp(name, "ver") && (rpt->capabilities & STREAM_CAP_INVALID))
- rpt->capabilities = convert_stream_version_to_capabilities(strtoul(value, NULL, 0));
+ rpt->capabilities = convert_stream_version_to_capabilities(strtoul(value, NULL, 0), NULL, false);
else {
// An old Netdata child does not have a compatible streaming protocol, map to something sane.
@@ -846,7 +879,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
name = "NETDATA_HOST_OS_DETECTION";
else if(!strcmp(name, "NETDATA_PROTOCOL_VERSION") && (rpt->capabilities & STREAM_CAP_INVALID))
- rpt->capabilities = convert_stream_version_to_capabilities(1);
+ rpt->capabilities = convert_stream_version_to_capabilities(1, NULL, false);
if (unlikely(rrdhost_set_system_info_variable(rpt->system_info, name, value))) {
info("STREAM '%s' [receive from [%s]:%s]: "
@@ -860,7 +893,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if (rpt->capabilities & STREAM_CAP_INVALID)
// no version is supplied, assume version 0;
- rpt->capabilities = convert_stream_version_to_capabilities(0);
+ rpt->capabilities = convert_stream_version_to_capabilities(0, NULL, false);
// find the program name and version
if(w->user_agent && w->user_agent[0]) {
@@ -1177,9 +1210,10 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
}
void rrdpush_reset_destinations_postpone_time(RRDHOST *host) {
- struct rrdpush_destinations *d;
- for (d = host->destinations; d; d = d->next)
- d->postpone_reconnection_until = 0;
+ uint32_t wait = (host->sender) ? host->sender->reconnect_delay : 5;
+ time_t now = now_realtime_sec();
+ for (struct rrdpush_destinations *d = host->destinations; d; d = d->next)
+ d->postpone_reconnection_until = now + wait;
}
static struct {
@@ -1232,6 +1266,7 @@ static struct {
{ STREAM_CAP_BINARY, "BINARY" },
{ STREAM_CAP_INTERPOLATED, "INTERPOLATED" },
{ STREAM_CAP_IEEE754, "IEEE754" },
+ { STREAM_CAP_DATA_WITH_ML, "ML" },
{ 0 , NULL },
};
@@ -1245,7 +1280,10 @@ static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps)
}
void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, const char *key) {
- buffer_json_member_add_array(wb, key);
+ if(key)
+ buffer_json_member_add_array(wb, key);
+ else
+ buffer_json_add_array_item_array(wb);
for(size_t i = 0; capability_names[i].str ; i++) {
if(caps & capability_names[i].cap)
@@ -1275,7 +1313,7 @@ void log_sender_capabilities(struct sender_state *s) {
buffer_free(wb);
}
-STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version) {
+STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDHOST *host, bool sender) {
STREAM_CAPABILITIES caps = 0;
if(version <= 1) caps = STREAM_CAP_V1;
@@ -1294,7 +1332,13 @@ STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version) {
if(caps & STREAM_CAP_V2)
caps &= ~(STREAM_CAP_V1);
- return caps & stream_our_capabilities();
+ STREAM_CAPABILITIES common_caps = caps & stream_our_capabilities(host, sender);
+
+ if(!(common_caps & STREAM_CAP_INTERPOLATED))
+ // DATA WITH ML requires INTERPOLATED
+ common_caps &= ~STREAM_CAP_DATA_WITH_ML;
+
+ return common_caps;
}
int32_t stream_capabilities_to_vn(uint32_t caps) {
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index f97c8ddfb3..d9b2f062b8 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -43,6 +43,7 @@ typedef enum {
STREAM_CAP_BINARY = (1 << 13), // streaming supports binary data
STREAM_CAP_INTERPOLATED = (1 << 14), // streaming supports interpolated streaming of values
STREAM_CAP_IEEE754 = (1 << 15), // streaming supports binary/hex transfer of double values
+ STREAM_CAP_DATA_WITH_ML = (1 << 16), // streaming supports transferring anomaly bit
STREAM_CAP_INVALID = (1 << 30), // used as an invalid value for capabilities when this is set
// this must be signed int, so don't use the last bit
@@ -55,7 +56,7 @@ typedef enum {
#define STREAM_HAS_COMPRESSION 0
#endif // ENABLE_COMPRESSION
-STREAM_CAPABILITIES stream_our_capabilities();
+STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender);
#define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability)) == (capability))
@@ -99,18 +100,6 @@ typedef enum {
// ----------------------------------------------------------------------------
-typedef enum __attribute__((packed)) {
- STREAM_TRAFFIC_TYPE_REPLICATION,
- STREAM_TRAFFIC_TYPE_FUNCTIONS,
- STREAM_TRAFFIC_TYPE_METADATA,
- STREAM_TRAFFIC_TYPE_DATA,
-
- // terminator
- STREAM_TRAFFIC_TYPE_MAX,
-} STREAM_TRAFFIC_TYPE;
-
-// ----------------------------------------------------------------------------
-
typedef struct {
char *os_name;
char *os_id;
@@ -145,9 +134,19 @@ struct decompressor_state {
#endif
// Thread-local storage
- // Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.
+// Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.
-typedef enum {
+typedef enum __attribute__((packed)) {
+ STREAM_TRAFFIC_TYPE_REPLICATION = 0,
+ STREAM_TRAFFIC_TYPE_FUNCTIONS,
+ STREAM_TRAFFIC_TYPE_METADATA,
+ STREAM_TRAFFIC_TYPE_DATA,
+
+ // terminator
+ STREAM_TRAFFIC_TYPE_MAX,
+} STREAM_TRAFFIC_TYPE;
+
+typedef enum __attribute__((packed)) {
SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown
SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression
} SENDER_FLAGS;
@@ -158,7 +157,7 @@ struct sender_state {
SENDER_FLAGS flags;
int timeout;
int default_port;
- usec_t reconnect_delay;
+ uint32_t reconnect_delay;
char connected_to[CONNECTED_TO_SIZE + 1]; // We don't know which proxy we connect to, passed back from socket.c
size_t begin;
size_t reconnects_counter;
@@ -242,6 +241,31 @@ struct sender_state {
#define rrdpush_sender_pending_replication_requests_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED)
#define rrdpush_sender_pending_replication_requests_zero(sender) __atomic_store_n(&((sender)->replication.atomic.pending_requests), 0, __ATOMIC_RELAXED)
+/*
+typedef enum {
+ STREAM_NODE_INSTANCE_FEATURE_CLOUD_ONLINE = (1 << 0),
+ STREAM_NODE_INSTANCE_FEATURE_VIRTUAL_HOST = (1 << 1),
+ STREAM_NODE_INSTANCE_FEATURE_HEALTH_ENABLED = (1 << 2),
+ STREAM_NODE_INSTANCE_FEATURE_ML_SELF = (1 << 3),
+ STREAM_NODE_INSTANCE_FEATURE_ML_RECEIVED = (1 << 4),
+ STREAM_NODE_INSTANCE_FEATURE_SSL = (1 << 5),
+} STREAM_NODE_INSTANCE_FEATURES;
+
+typedef struct stream_node_instance {
+ uuid_t uuid;
+ STRING *agent;
+ STREAM_NODE_INSTANCE_FEATURES features;
+ uint32_t hops;
+
+ // receiver information on that agent
+ int32_t capabilities;
+ uint32_t local_port;
+ uint32_t remote_port;
+ STRING *local_ip;
+ STRING *remote_ip;
+} STREAM_NODE_INSTANCE;
+*/
+
struct receiver_state {
RRDHOST *host;
pid_t tid;
@@ -298,6 +322,13 @@ struct receiver_state {
#endif
time_t replication_first_time_t;
+
+/*
+ struct {
+ uint32_t count;
+ STREAM_NODE_INSTANCE *array;
+ } instances;
+*/
};
struct rrdpush_destinations {
@@ -352,7 +383,8 @@ void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_
bool rrdset_push_chart_definition_now(RRDSET *st);
void *rrdpush_sender_thread(void *ptr);
void rrdpush_send_host_labels(RRDHOST *host);
-void rrdpush_claimed_id(RRDHOST *host);
+void rrdpush_send_claimed_id(RRDHOST *host);
+void rrdpush_send_global_functions(RRDHOST *host);
#define THREAD_TAG_STREAM_RECEIVER "RCVR" // "[host]" is appended
#define THREAD_TAG_STREAM_SENDER "SNDR" // "[host]" is appended
@@ -383,7 +415,7 @@ void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, con
void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status);
void log_receiver_capabilities(struct receiver_state *rpt);
void log_sender_capabilities(struct sender_state *s);
-STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version);
+STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDHOST *host, bool sender);
int32_t stream_capabilities_to_vn(uint32_t caps);
void receiver_state_free(struct receiver_state *rpt);
@@ -391,9 +423,248 @@ bool stop_streaming_receiver(RRDHOST *host, const char *reason);
void sender_thread_buffer_free(void);
-void rrdhost_receiver_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused);
-void rrdhost_sender_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused);
-
#include "replication.h"
+typedef enum __attribute__((packed)) {
+ RRDHOST_DB_STATUS_INITIALIZING = 0,
+ RRDHOST_DB_STATUS_QUERYABLE,
+} RRDHOST_DB_STATUS;
+
+static inline const char *rrdhost_db_status_to_string(RRDHOST_DB_STATUS status) {
+ switch(status) {
+ default:
+ case RRDHOST_DB_STATUS_INITIALIZING:
+ return "initializing";
+
+ case RRDHOST_DB_STATUS_QUERYABLE:
+ return "online";
+ }
+}
+
+typedef enum __attribute__((packed)) {
+ RRDHOST_DB_LIVENESS_STALE = 0,
+ RRDHOST_DB_LIVENESS_LIVE,
+} RRDHOST_DB_LIVENESS;
+
+static inline const char *rrdhost_db_liveness_to_string(RRDHOST_DB_LIVENESS status) {
+ switch(status) {
+ default:
+ case RRDHOST_DB_LIVENESS_STALE:
+ return "stale";
+
+ case RRDHOST_DB_LIVENESS_LIVE:
+ return "live";
+ }
+}
+
+typedef enum __attribute__((packed)) {
+ RRDHOST_INGEST_STATUS_ARCHIVED = 0,
+ RRDHOST_INGEST_STATUS_INITIALIZING,
+ RRDHOST_INGEST_STATUS_REPLICATING,
+ RRDHOST_INGEST_STATUS_ONLINE,
+ RRDHOST_INGEST_STATUS_OFFLINE,
+} RRDHOST_INGEST_STATUS;
+
+static inline const char *rrdhost_ingest_status_to_string(RRDHOST_INGEST_STATUS status) {
+ switch(status) {
+ case RRDHOST_INGEST_STATUS_ARCHIVED:
+ return "archived";
+
+ case RRDHOST_INGEST_STATUS_INITIALIZING:
+ return "initializing";
+
+ case RRDHOST_INGEST_STATUS_REPLICATING:
+ return "replicating";
+
+ case RRDHOST_INGEST_STATUS_ONLINE:
+ return "online";
+
+ default:
+ case RRDHOST_INGEST_STATUS_OFFLINE:
+ return "offline";
+ }
+}
+
+typedef enum __attribute__((packed)) {
+ RRDHOST_INGEST_TYPE_LOCALHOST = 0,
+ RRDHOST_INGEST_TYPE_VIRTUAL,
+ RRDHOST_INGEST_TYPE_CHILD,
+ RRDHOST_INGEST_TYPE_ARCHIVED,
+} RRDHOST_INGEST_TYPE;
+
+static inline const char *rrdhost_ingest_type_to_string(RRDHOST_INGEST_TYPE type) {
+ switch(type) {
+ case RRDHOST_INGEST_TYPE_LOCALHOST:
+ return "localhost";
+
+ case RRDHOST_INGEST_TYPE_VIRTUAL:
+ return "virtual";
+
+ case RRDHOST_INGEST_TYPE_CHILD:
+ return "child";
+
+ default:
+ case RRDHOST_INGEST_TYPE_ARCHIVED:
+ return "archived";
+ }
+}
+
+typedef enum __attribute__((packed)) {
+ RRDHOST_STREAM_STATUS_DISABLED = 0,
+ RRDHOST_STREAM_STATUS_REPLICATING,
+ RRDHOST_STREAM_STATUS_ONLINE,
+ RRDHOST_STREAM_STATUS_OFFLINE,
+} RRDHOST_STREAMING_STATUS;
+
+static inline const char *rrdhost_streaming_status_to_string(RRDHOST_STREAMING_STATUS status) {
+ switch(status) {
+ case RRDHOST_STREAM_STATUS_DISABLED:
+ return "disabled";
+
+ case RRDHOST_STREAM_STATUS_REPLICATING:
+ return "replicating";
+
+ case RRDHOST_STREAM_STATUS_ONLINE:
+ return "online";
+
+ default:
+ case RRDHOST_STREAM_STATUS_OFFLINE:
+ return "offline";
+ }
+}
+
+typedef enum __attribute__((packed)) {
+ RRDHOST_ML_STATUS_DISABLED = 0,
+ RRDHOST_ML_STATUS_OFFLINE,
+ RRDHOST_ML_STATUS_RUNNING,
+} RRDHOST_ML_STATUS;
+
+static inline const char *rrdhost_ml_status_to_string(RRDHOST_ML_STATUS status) {
+ switch(status) {
+ case RRDHOST_ML_STATUS_RUNNING:
+ return "online";
+
+ case RRDHOST_ML_STATUS_OFFLINE:
+ return "offline";
+
+ default:
+ case RRDHOST_ML_STATUS_DISABLED:
+ return "disabled";
+ }
+}
+
+typedef enum __attribute__((packed)) {
+ RRDHOST_ML_TYPE_DISABLED = 0,
+ RRDHOST_ML_TYPE_SELF,
+ RRDHOST_ML_TYPE_RECEIVED,
+} RRDHOST_ML_TYPE;
+
+static inline const char *rrdhost_ml_type_to_string(RRDHOST_ML_TYPE type) {
+ switch(type) {
+ case RRDHOST_ML_TYPE_SELF:
+ return "self";
+
+ case RRDHOST_ML_TYPE_RECEIVED:
+ return "received";
+
+ default:
+ case RRDHOST_ML_TYPE_DISABLED:
+ return "disabled";
+ }
+}
+
+typedef enum __attribute__((packed)) {
+ RRDHOST_HEALTH_STATUS_DISABLED = 0,
+ RRDHOST_HEALTH_STATUS_INITIALIZING,
+ RRDHOST_HEALTH_STATUS_RUNNING,
+} RRDHOST_HEALTH_STATUS;
+
+static inline const char *rrdhost_health_status_to_string(RRDHOST_HEALTH_STATUS status) {
+ switch(status) {
+ default:
+ case RRDHOST_HEALTH_STATUS_DISABLED:
+ return "disabled";
+
+ case RRDHOST_HEALTH_STATUS_INITIALIZING:
+ return "initializing";
+
+ case RRDHOST_HEALTH_STATUS_RUNNING:
+ return "online";
+ }
+}
+
+typedef struct rrdhost_status {
+ RRDHOST *host;
+ time_t now;
+
+ struct {
+ RRDHOST_DB_STATUS status;
+ RRDHOST_DB_LIVENESS liveness;
+ RRD_MEMORY_MODE mode;
+ time_t first_time_s;
+ time_t last_time_s;
+ size_t metrics;
+ size_t instances;
+ size_t contexts;
+ } db;
+
+ struct {
+ RRDHOST_ML_STATUS status;
+ RRDHOST_ML_TYPE type;
+ struct ml_metrics_statistics metrics;
+ } ml;
+
+ struct {
+ size_t hops;
+ RRDHOST_INGEST_TYPE type;
+ RRDHOST_INGEST_STATUS status;
+ SOCKET_PEERS peers;
+ bool ssl;
+ STREAM_CAPABILITIES capabilities;
+ uint32_t id;
+ time_t since;
+ const char *reason;
+
+ struct {
+ bool in_progress;
+ NETDATA_DOUBLE completion;
+ size_t instances;
+ } replication;
+ } ingest;
+
+ struct {
+ size_t hops;
+ RRDHOST_STREAMING_STATUS status;
+ SOCKET_PEERS peers;
+ bool ssl;
+ bool compression;
+ STREAM_CAPABILITIES capabilities;
+ uint32_t id;
+ time_t since;
+ const char *reason;
+
+ struct {
+ bool in_progress;
+ NETDATA_DOUBLE completion;
+ size_t instances;
+ } replication;
+
+ size_t sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_MAX];
+ } stream;
+
+ struct {
+ RRDHOST_HEALTH_STATUS status;
+ struct {
+ uint32_t undefined;
+ uint32_t uninitialized;
+ uint32_t clear;
+ uint32_t warning;
+ uint32_t critical;
+ } alerts;
+ } health;
+} RRDHOST_STATUS;
+
+void rrdhost_status(RRDHOST *host, time_t now, RRDHOST_STATUS *s);
+bool rrdhost_state_cloud_emulation(RRDHOST *host);
+
#endif //NETDATA_RRDPUSH_H
diff --git a/streaming/sender.c b/streaming/sender.c
index c74c9b407c..3743889edb 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -490,18 +490,19 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender
break;
}
}
- const char *error = stream_responses[i].error;
- int worker_job_id = stream_responses[i].worker_job_id;
- time_t delay = stream_responses[i].postpone_reconnect_seconds;
if(version >= STREAM_HANDSHAKE_OK_V1) {
host->destination->last_error = NULL;
host->destination->last_handshake = version;
- host->destination->postpone_reconnection_until = 0;
- s->capabilities = convert_stream_version_to_capabilities(version);
+ host->destination->postpone_reconnection_until = now_realtime_sec() + s->reconnect_delay;
+ s->capabilities = convert_stream_version_to_capabilities(version, host, true);
return true;
}
+ const char *error = stream_responses[i].error;
+ int worker_job_id = stream_responses[i].worker_job_id;
+ time_t delay = stream_responses[i].postpone_reconnect_seconds;
+
worker_is_busy(worker_job_id);
rrdpush_sender_thread_close_socket(host);
host->destination->last_error = error;
@@ -591,7 +592,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
// info("STREAM %s [send to %s]: initializing communication...", rrdhost_hostname(host), s->connected_to);
// reset our capabilities to default
- s->capabilities = stream_our_capabilities();
+ s->capabilities = stream_our_capabilities(host, true);
#ifdef ENABLE_COMPRESSION
// If we don't want compression, remove it from our capabilities
@@ -1098,119 +1099,6 @@ void rrdpush_signal_sender_to_wake_up(struct sender_state *s) {
}
}
-static NETDATA_DOUBLE rrdhost_sender_replication_completion(RRDHOST *host, time_t now, size_t *instances) {
- size_t charts = rrdhost_sender_replicating_charts(host);
- NETDATA_DOUBLE completion;
- if(!charts || !host->sender || !host->sender->replication.oldest_request_after_t)
- completion = 100.0;
- else if(!host->sender->replication.latest_completed_before_t || host->sender->replication.latest_completed_before_t < host->sender->replication.oldest_request_after_t)
- completion = 0.0;
- else {
- time_t total = now - host->sender->replication.oldest_request_after_t;
- time_t current = host->sender->replication.latest_completed_before_t - host->sender->replication.oldest_request_after_t;
- completion = (NETDATA_DOUBLE) current * 100.0 / (NETDATA_DOUBLE) total;
- }
-
- *instances = charts;
-
- return completion;
-}
-
-void rrdhost_sender_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused) {
- bool online = rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
- buffer_json_member_add_object(wb, key);
-
- if(host->sender)
- buffer_json_member_add_uint64(wb, "hops", host->sender->hops);
-
- buffer_json_member_add_boolean(wb, "online", online);
-
- if(host->sender && host->sender->last_state_since_t) {
- buffer_json_member_add_time_t(wb, "since", host->sender->last_state_since_t);
- buffer_json_member_add_time_t(wb, "age", now - host->sender->last_state_since_t);
- }
-
- if(!online && host->sender && host->sender->exit.reason)
- buffer_json_member_add_string(wb, "reason", host->sender->exit.reason);
-
- buffer_json_member_add_object(wb, "replication");
- {
- size_t instances;
- NETDATA_DOUBLE completion = rrdhost_sender_replication_completion(host, now, &instances);
- buffer_json_member_add_boolean(wb, "in_progress", instances);
- buffer_json_member_add_double(wb, "completion", completion);
- buffer_json_member_add_uint64(wb, "instances", instances);
- }
- buffer_json_object_close(wb);
-
- if(host->sender) {