summaryrefslogtreecommitdiffstats
path: root/streaming/rrdpush.h
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-06-08 16:33:22 +0300
committerGitHub <noreply@github.com>2023-06-08 16:33:22 +0300
commit80d83b7bd1eca5872ed3ac5c34eb8bcb5fbd56e8 (patch)
tree8afd42a59cc6b114b26fdd3e38b137db99357154 /streaming/rrdpush.h
parent028e26a194f5421432b577ee67afed8b66c0f6b7 (diff)
api v2 nodes for streaming statuses (#15162)
* api v2 nodes for streaming statuses * remove test * move parts of the output * in api/v2/data return 5 values per point when aggregation=percentage and raw option is given; return final values when aggregation=percentage is not the final grouping
Diffstat (limited to 'streaming/rrdpush.h')
-rw-r--r--streaming/rrdpush.h30
1 files changed, 28 insertions, 2 deletions
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index cb33694cda..f97c8ddfb3 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -72,6 +72,9 @@ STREAM_CAPABILITIES stream_our_capabilities();
#define START_STREAMING_ERROR_SAME_LOCALHOST "Don't hit me baby, you are trying to stream my localhost back"
#define START_STREAMING_ERROR_ALREADY_STREAMING "This GUID is already streaming to this server"
#define START_STREAMING_ERROR_NOT_PERMITTED "You are not permitted to access this. Check the logs for more info."
+#define START_STREAMING_ERROR_BUSY_TRY_LATER "The server is too busy now to accept this request. Try later."
+#define START_STREAMING_ERROR_INTERNAL_ERROR "The server encountered an internal error. Try later."
+#define START_STREAMING_ERROR_INITIALIZATION "The server is initializing. Try later."
typedef enum {
STREAM_HANDSHAKE_OK_V5 = 5, // COMPRESSION
@@ -87,12 +90,27 @@ typedef enum {
STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT = -6,
STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE = -7,
STREAM_HANDSHAKE_ERROR_SSL_ERROR = -8,
- STREAM_HANDSHAKE_ERROR_CANT_CONNECT = -9
+ STREAM_HANDSHAKE_ERROR_CANT_CONNECT = -9,
+ STREAM_HANDSHAKE_BUSY_TRY_LATER = -10,
+ STREAM_HANDSHAKE_INTERNAL_ERROR = -11,
+ STREAM_HANDSHAKE_INITIALIZATION = -12,
} STREAM_HANDSHAKE;
// ----------------------------------------------------------------------------
+typedef enum __attribute__((packed)) {
+ STREAM_TRAFFIC_TYPE_REPLICATION,
+ STREAM_TRAFFIC_TYPE_FUNCTIONS,
+ STREAM_TRAFFIC_TYPE_METADATA,
+ STREAM_TRAFFIC_TYPE_DATA,
+
+ // terminator
+ STREAM_TRAFFIC_TYPE_MAX,
+} STREAM_TRAFFIC_TYPE;
+
+// ----------------------------------------------------------------------------
+
typedef struct {
char *os_name;
char *os_id;
@@ -148,6 +166,7 @@ struct sender_state {
size_t sent_bytes_on_this_connection;
size_t send_attempts;
time_t last_traffic_seen_t;
+ time_t last_state_since_t; // the timestamp of the last state (online/offline) change
size_t not_connected_loops;
// Metrics are collected asynchronously by collector threads calling rrdset_done_push(). This can also trigger
// the lazy creation of the sender thread - both cases (buffer access and thread creation) are guarded here.
@@ -157,6 +176,8 @@ struct sender_state {
int read_len;
STREAM_CAPABILITIES capabilities;
+ size_t sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_MAX];
+
int rrdpush_sender_pipe[2]; // collector to sender thread signaling
int rrdpush_sender_socket;
@@ -176,6 +197,8 @@ struct sender_state {
struct {
DICTIONARY *requests; // de-duplication of replication requests, per chart
+ time_t oldest_request_after_t; // the timestamp of the oldest replication request
+ time_t latest_completed_before_t; // the timestamp of the latest replication request
struct {
size_t pending_requests; // the currently outstanding replication requests
@@ -306,7 +329,7 @@ void rrdpush_destinations_init(RRDHOST *host);
void rrdpush_destinations_free(RRDHOST *host);
BUFFER *sender_start(struct sender_state *s);
-void sender_commit(struct sender_state *s, BUFFER *wb);
+void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type);
int rrdpush_init();
bool rrdpush_receiver_needs_dbengine();
int configured_as_parent();
@@ -368,6 +391,9 @@ bool stop_streaming_receiver(RRDHOST *host, const char *reason);
void sender_thread_buffer_free(void);
+void rrdhost_receiver_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused);
+void rrdhost_sender_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused);
+
#include "replication.h"
#endif //NETDATA_RRDPUSH_H