summaryrefslogtreecommitdiffstats
path: root/streaming/rrdpush.h
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-06-19 20:52:35 +0300
committerGitHub <noreply@github.com>2023-06-19 20:52:35 +0300
commit0b4f820e9d42d10f64c3305d9c084261bc9880cf (patch)
tree641fcb81e9c84e08fbe08ca80776c6b593b218ba /streaming/rrdpush.h
parent35884c7a8447fbeb699cae6a2a20dc0a2137c659 (diff)
/api/v2/nodes and streaming function (#15168)
* dummy streaming function * expose global functions upstream * separate function for pushing global functions * add missing conditions * allow streaming function to run async * started internal API for functions * cache host retention and expose it to /api/v2/nodes * internal API for function table fields; more progress on streaming status * abstracted and unified rrdhost status * port old coverity warning fix - although it is not needed * add ML information to rrdhost status * add ML capability to streaming to signal the transmission of ML information; added ML information to host status * protect host->receiver * count metrics and instances per host * exposed all inbound and outbound streaming * fix for ML status and dependency of DATA_WITH_ML to INTERPOLATED, not IEEE754 * update ML dummy * added all fields * added streaming group by and cleaned up accepted values by cloud * removed type * Revert "removed type" This reverts commit faae4177e603d4f85b7433f33f92ef3ccd23976e. * added context to db summary * new /api/v2/nodes schema * added ML type * change default function charts * log to trace new capa * add more debug * removed debugging code * retry on receive interrupted read; respect sender reconnect delay in all cases * set disconnected host flag and manipulate localhost child count atomically, inside set/clear receiver * fix infinite loop * send_to_plugin() now has a spinlock to ensure that only 1 thread is writing to the plugin/child at the same time * global cloud_status() call * cloud should be a section, since it will contain error information * put cloud capabilities into cloud * aclk status in /api/v2 agents sections * keep aclk_connection_counter * updates on /api/v2/nodes * final /api/v2/nodes and addition of /api/v2/nodes_instances * parametrize all /api/v2/xxx output to control which info is outputed per endpoint * always accept nodes selector * st needs to be per instance, not per node * fix merging of contexts; fix cups plugin priorities * add after and before parameters to /api/v2/contexts/nodes/nodes_instances/q * give each libuv worker a unique id * aclk http_api_v2 version 4
Diffstat (limited to 'streaming/rrdpush.h')
-rw-r--r--streaming/rrdpush.h313
1 files changed, 292 insertions, 21 deletions
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index f97c8ddfb3..d9b2f062b8 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -43,6 +43,7 @@ typedef enum {
STREAM_CAP_BINARY = (1 << 13), // streaming supports binary data
STREAM_CAP_INTERPOLATED = (1 << 14), // streaming supports interpolated streaming of values
STREAM_CAP_IEEE754 = (1 << 15), // streaming supports binary/hex transfer of double values
+ STREAM_CAP_DATA_WITH_ML = (1 << 16), // streaming supports transferring anomaly bit
STREAM_CAP_INVALID = (1 << 30), // used as an invalid value for capabilities when this is set
// this must be signed int, so don't use the last bit
@@ -55,7 +56,7 @@ typedef enum {
#define STREAM_HAS_COMPRESSION 0
#endif // ENABLE_COMPRESSION
-STREAM_CAPABILITIES stream_our_capabilities();
+STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender);
#define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability)) == (capability))
@@ -99,18 +100,6 @@ typedef enum {
// ----------------------------------------------------------------------------
-typedef enum __attribute__((packed)) {
- STREAM_TRAFFIC_TYPE_REPLICATION,
- STREAM_TRAFFIC_TYPE_FUNCTIONS,
- STREAM_TRAFFIC_TYPE_METADATA,
- STREAM_TRAFFIC_TYPE_DATA,
-
- // terminator
- STREAM_TRAFFIC_TYPE_MAX,
-} STREAM_TRAFFIC_TYPE;
-
-// ----------------------------------------------------------------------------
-
typedef struct {
char *os_name;
char *os_id;
@@ -145,9 +134,19 @@ struct decompressor_state {
#endif
// Thread-local storage
- // Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.
+// Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.
-typedef enum {
+typedef enum __attribute__((packed)) {
+ STREAM_TRAFFIC_TYPE_REPLICATION = 0,
+ STREAM_TRAFFIC_TYPE_FUNCTIONS,
+ STREAM_TRAFFIC_TYPE_METADATA,
+ STREAM_TRAFFIC_TYPE_DATA,
+
+ // terminator
+ STREAM_TRAFFIC_TYPE_MAX,
+} STREAM_TRAFFIC_TYPE;
+
+typedef enum __attribute__((packed)) {
SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown
SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression
} SENDER_FLAGS;
@@ -158,7 +157,7 @@ struct sender_state {
SENDER_FLAGS flags;
int timeout;
int default_port;
- usec_t reconnect_delay;
+ uint32_t reconnect_delay;
char connected_to[CONNECTED_TO_SIZE + 1]; // We don't know which proxy we connect to, passed back from socket.c
size_t begin;
size_t reconnects_counter;
@@ -242,6 +241,31 @@ struct sender_state {
#define rrdpush_sender_pending_replication_requests_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED)
#define rrdpush_sender_pending_replication_requests_zero(sender) __atomic_store_n(&((sender)->replication.atomic.pending_requests), 0, __ATOMIC_RELAXED)
+/*
+typedef enum {
+ STREAM_NODE_INSTANCE_FEATURE_CLOUD_ONLINE = (1 << 0),
+ STREAM_NODE_INSTANCE_FEATURE_VIRTUAL_HOST = (1 << 1),
+ STREAM_NODE_INSTANCE_FEATURE_HEALTH_ENABLED = (1 << 2),
+ STREAM_NODE_INSTANCE_FEATURE_ML_SELF = (1 << 3),
+ STREAM_NODE_INSTANCE_FEATURE_ML_RECEIVED = (1 << 4),
+ STREAM_NODE_INSTANCE_FEATURE_SSL = (1 << 5),
+} STREAM_NODE_INSTANCE_FEATURES;
+
+typedef struct stream_node_instance {
+ uuid_t uuid;
+ STRING *agent;
+ STREAM_NODE_INSTANCE_FEATURES features;
+ uint32_t hops;
+
+ // receiver information on that agent
+ int32_t capabilities;
+ uint32_t local_port;
+ uint32_t remote_port;
+ STRING *local_ip;
+ STRING *remote_ip;
+} STREAM_NODE_INSTANCE;
+*/
+
struct receiver_state {
RRDHOST *host;
pid_t tid;
@@ -298,6 +322,13 @@ struct receiver_state {
#endif
time_t replication_first_time_t;
+
+/*
+ struct {
+ uint32_t count;
+ STREAM_NODE_INSTANCE *array;
+ } instances;
+*/
};
struct rrdpush_destinations {
@@ -352,7 +383,8 @@ void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_
bool rrdset_push_chart_definition_now(RRDSET *st);
void *rrdpush_sender_thread(void *ptr);
void rrdpush_send_host_labels(RRDHOST *host);
-void rrdpush_claimed_id(RRDHOST *host);
+void rrdpush_send_claimed_id(RRDHOST *host);
+void rrdpush_send_global_functions(RRDHOST *host);
#define THREAD_TAG_STREAM_RECEIVER "RCVR" // "[host]" is appended
#define THREAD_TAG_STREAM_SENDER "SNDR" // "[host]" is appended
@@ -383,7 +415,7 @@ void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, con
void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status);
void log_receiver_capabilities(struct receiver_state *rpt);
void log_sender_capabilities(struct sender_state *s);
-STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version);
+STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDHOST *host, bool sender);
int32_t stream_capabilities_to_vn(uint32_t caps);
void receiver_state_free(struct receiver_state *rpt);
@@ -391,9 +423,248 @@ bool stop_streaming_receiver(RRDHOST *host, const char *reason);
void sender_thread_buffer_free(void);
-void rrdhost_receiver_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused);
-void rrdhost_sender_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused);
-
#include "replication.h"
+typedef enum __attribute__((packed)) {
+ RRDHOST_DB_STATUS_INITIALIZING = 0,
+ RRDHOST_DB_STATUS_QUERYABLE,
+} RRDHOST_DB_STATUS;
+
+static inline const char *rrdhost_db_status_to_string(RRDHOST_DB_STATUS status) {
+ switch(status) {
+ default:
+ case RRDHOST_DB_STATUS_INITIALIZING:
+ return "initializing";
+
+ case RRDHOST_DB_STATUS_QUERYABLE:
+ return "online";
+ }
+}
+
+typedef enum __attribute__((packed)) {
+ RRDHOST_DB_LIVENESS_STALE = 0,
+ RRDHOST_DB_LIVENESS_LIVE,
+} RRDHOST_DB_LIVENESS;
+
+static inline const char *rrdhost_db_liveness_to_string(RRDHOST_DB_LIVENESS status) {
+ switch(status) {
+ default:
+ case RRDHOST_DB_LIVENESS_STALE:
+ return "stale";
+
+ case RRDHOST_DB_LIVENESS_LIVE:
+ return "live";
+ }
+}
+
+typedef enum __attribute__((packed)) {
+ RRDHOST_INGEST_STATUS_ARCHIVED = 0,
+ RRDHOST_INGEST_STATUS_INITIALIZING,
+ RRDHOST_INGEST_STATUS_REPLICATING,
+ RRDHOST_INGEST_STATUS_ONLINE,
+ RRDHOST_INGEST_STATUS_OFFLINE,
+} RRDHOST_INGEST_STATUS;
+
+static inline const char *rrdhost_ingest_status_to_string(RRDHOST_INGEST_STATUS status) {
+ switch(status) {
+ case RRDHOST_INGEST_STATUS_ARCHIVED:
+ return "archived";
+
+ case RRDHOST_INGEST_STATUS_INITIALIZING:
+ return "initializing";
+
+ case RRDHOST_INGEST_STATUS_REPLICATING:
+ return "replicating";
+
+ case RRDHOST_INGEST_STATUS_ONLINE:
+ return "online";
+
+ default:
+ case RRDHOST_INGEST_STATUS_OFFLINE:
+ return "offline";
+ }
+}
+
+typedef enum __attribute__((packed)) {
+ RRDHOST_INGEST_TYPE_LOCALHOST = 0,
+ RRDHOST_INGEST_TYPE_VIRTUAL,
+ RRDHOST_INGEST_TYPE_CHILD,
+ RRDHOST_INGEST_TYPE_ARCHIVED,
+} RRDHOST_INGEST_TYPE;
+
+static inline const char *rrdhost_ingest_type_to_string(RRDHOST_INGEST_TYPE type) {
+ switch(type) {
+ case RRDHOST_INGEST_TYPE_LOCALHOST:
+ return "localhost";
+
+ case RRDHOST_INGEST_TYPE_VIRTUAL:
+ return "virtual";
+
+ case RRDHOST_INGEST_TYPE_CHILD:
+ return "child";
+
+ default:
+ case RRDHOST_INGEST_TYPE_ARCHIVED:
+ return "archived";
+ }
+}
+
+typedef enum __attribute__((packed)) {
+ RRDHOST_STREAM_STATUS_DISABLED = 0,
+ RRDHOST_STREAM_STATUS_REPLICATING,
+ RRDHOST_STREAM_STATUS_ONLINE,
+ RRDHOST_STREAM_STATUS_OFFLINE,
+} RRDHOST_STREAMING_STATUS;
+
+static inline const char *rrdhost_streaming_status_to_string(RRDHOST_STREAMING_STATUS status) {
+ switch(status) {
+ case RRDHOST_STREAM_STATUS_DISABLED:
+ return "disabled";
+
+ case RRDHOST_STREAM_STATUS_REPLICATING:
+ return "replicating";
+
+ case RRDHOST_STREAM_STATUS_ONLINE:
+ return "online";
+
+ default:
+ case RRDHOST_STREAM_STATUS_OFFLINE:
+ return "offline";
+ }
+}
+
+typedef enum __attribute__((packed)) {
+ RRDHOST_ML_STATUS_DISABLED = 0,
+ RRDHOST_ML_STATUS_OFFLINE,
+ RRDHOST_ML_STATUS_RUNNING,
+} RRDHOST_ML_STATUS;
+
+static inline const char *rrdhost_ml_status_to_string(RRDHOST_ML_STATUS status) {
+ switch(status) {
+ case RRDHOST_ML_STATUS_RUNNING:
+ return "online";
+
+ case RRDHOST_ML_STATUS_OFFLINE:
+ return "offline";
+
+ default:
+ case RRDHOST_ML_STATUS_DISABLED:
+ return "disabled";
+ }
+}
+
+typedef enum __attribute__((packed)) {
+ RRDHOST_ML_TYPE_DISABLED = 0,
+ RRDHOST_ML_TYPE_SELF,
+ RRDHOST_ML_TYPE_RECEIVED,
+} RRDHOST_ML_TYPE;
+
+static inline const char *rrdhost_ml_type_to_string(RRDHOST_ML_TYPE type) {
+ switch(type) {
+ case RRDHOST_ML_TYPE_SELF:
+ return "self";
+
+ case RRDHOST_ML_TYPE_RECEIVED:
+ return "received";
+
+ default:
+ case RRDHOST_ML_TYPE_DISABLED:
+ return "disabled";
+ }
+}
+
+typedef enum __attribute__((packed)) {
+ RRDHOST_HEALTH_STATUS_DISABLED = 0,
+ RRDHOST_HEALTH_STATUS_INITIALIZING,
+ RRDHOST_HEALTH_STATUS_RUNNING,
+} RRDHOST_HEALTH_STATUS;
+
+static inline const char *rrdhost_health_status_to_string(RRDHOST_HEALTH_STATUS status) {
+ switch(status) {
+ default:
+ case RRDHOST_HEALTH_STATUS_DISABLED:
+ return "disabled";
+
+ case RRDHOST_HEALTH_STATUS_INITIALIZING:
+ return "initializing";
+
+ case RRDHOST_HEALTH_STATUS_RUNNING:
+ return "online";
+ }
+}
+
+typedef struct rrdhost_status {
+ RRDHOST *host;
+ time_t now;
+
+ struct {
+ RRDHOST_DB_STATUS status;
+ RRDHOST_DB_LIVENESS liveness;
+ RRD_MEMORY_MODE mode;
+ time_t first_time_s;
+ time_t last_time_s;
+ size_t metrics;
+ size_t instances;
+ size_t contexts;
+ } db;
+
+ struct {
+ RRDHOST_ML_STATUS status;
+ RRDHOST_ML_TYPE type;
+ struct ml_metrics_statistics metrics;
+ } ml;
+
+ struct {
+ size_t hops;
+ RRDHOST_INGEST_TYPE type;
+ RRDHOST_INGEST_STATUS status;
+ SOCKET_PEERS peers;
+ bool ssl;
+ STREAM_CAPABILITIES capabilities;
+ uint32_t id;
+ time_t since;
+ const char *reason;
+
+ struct {
+ bool in_progress;
+ NETDATA_DOUBLE completion;
+ size_t instances;
+ } replication;
+ } ingest;
+
+ struct {
+ size_t hops;
+ RRDHOST_STREAMING_STATUS status;
+ SOCKET_PEERS peers;
+ bool ssl;
+ bool compression;
+ STREAM_CAPABILITIES capabilities;
+ uint32_t id;
+ time_t since;
+ const char *reason;
+
+ struct {
+ bool in_progress;
+ NETDATA_DOUBLE completion;
+ size_t instances;
+ } replication;
+
+ size_t sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_MAX];
+ } stream;
+
+ struct {
+ RRDHOST_HEALTH_STATUS status;
+ struct {
+ uint32_t undefined;
+ uint32_t uninitialized;
+ uint32_t clear;
+ uint32_t warning;
+ uint32_t critical;
+ } alerts;
+ } health;
+} RRDHOST_STATUS;
+
+void rrdhost_status(RRDHOST *host, time_t now, RRDHOST_STATUS *s);
+bool rrdhost_state_cloud_emulation(RRDHOST *host);
+
#endif //NETDATA_RRDPUSH_H