summaryrefslogtreecommitdiffstats
path: root/streaming/rrdpush.h
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-10-05 14:13:46 +0300
committerGitHub <noreply@github.com>2022-10-05 14:13:46 +0300
commit8fc3b351a2e7fc96eced8f924de2e9cec9842128 (patch)
treebde41c66573ccaf8876c280e00742cc6096b587c /streaming/rrdpush.h
parent6850878e697d66dc90b9af1e750b22238c63c292 (diff)
Allow netdata plugins to expose functions for querying more information about specific charts (#13720)
* function renames and code cleanup in popen.c; no actual code changes * netdata popen() now opens both child process stdin and stdout and returns FILE * for both * pass both input and output to parser structures * updated rrdset to call custom functions * RRDSET FUNCTION leading calls for both sync and async operation * put RRDSET functions to a separate file * added format and timeout at function definition * support for synchronous (internal plugins) and asynchronous (external plugins and children) functions * /api/v1/function endpoint * functions are now attached to the host and there is a dictionary view per chart * functions implemented at plugins.d * remove the defer until keyword hook from plugins.d when it is done * stream sender implementation of functions * sanitization of all functions so that certain characters are only allowed * strictier sanitization * common max size * 1st working plugins.d example * always init inflight dictionary * properly destroy dictionaries to avoid parallel insertion of items * add more debugging on disconnection reasons * add more debugging on disconnection reasons again * streaming receiver respects newlines * dont use the same fp for both streaming receive and send * dont free dbengine memory with internal checks * make sender proceed in the buffer * added timing info and garbage collection at plugins.d * added info about routing nodes * added info about routing nodes with delay * added more info about delays * added more info about delays again * signal sending thread to wake up * streaming version labeling and commented code to support capabilities * added functions to /api/v1/data, /api/v1/charts, /api/v1/chart, /api/v1/info * redirect top output to stdout * address coverity findings * fix resource leaks of popen * log attempts to connect to individual destinations * better messages * properly parse destinations * try to find a function from the most matching to the least matching * log added streaming destinations * rotate destinations bypassing a node in the middle that does not accept our connection * break the loops properly * use typedef to define callbacks * capabilities negotiation during streaming * functions exposed upstream based on capabilities; compression disabled per node persisting reconnects; always try to connect with all capabilities * restore functionality to lookup functions * better logging of capabilities * remove old versions from capabilities when a newer version is there * fix formatting * optimization for plugins.d rrdlabels to avoid creating and destructing dictionaries all the time * delayed health initialization for rrddim and rrdset * cleanup health initialization * fix for popen() not returning the right value * add health worker jobs for initializing rrdset and rrddim * added content type support for functions; apps.plugin permanent function to display all the processes * fixes for functions parameters parsing in apps.plugin * fix for process matching in apps.plugiin * first working function for apps.plugin * Dashboard ACL is disabled for functions; Function errors are all in JSON format * apps.plugin function processes returns json table * use json_escape_string() to escape message * fix formatting * apps.plugin exposes all its metrics to function processes * fix json formatting when filtering out some rows * reopen the internal pipe of rrdpush in case of errors * misplaced statement * do not use buffer->len * support for GLOBAL functions (functions that are not linked to a chart * added /api/v1/functions endpoint; removed format from the FUNCTIONS api; * swagger documentation about the new api end points * added plugins.d documentation about functions * never re-close a file * remove uncessesary ifdef * fixed issues identified by codacy * fix for null label value * make edit-config copy-and-paste friendly * Revert "make edit-config copy-and-paste friendly" This reverts commit 54500c0e0a97f65a0c66c4d34e966f6a9056698e. * reworked sender handshake to fix coverity findings * timeout is zero, for both send_timeout() and recv_timeout() * properly detect that parent closed the socket * support caching of function responses; limit function response to 10MB; added protection from malformed function responses * disabled excessive logging * added units to apps.plugin function processes and normalized all values to be human readable * shorter field names * fixed issues reported * fixed apps.plugin error response; tested that pluginsd can properly handle faulty responses * use double linked list macros for double linked list management * faster apps.plugin function printing by minimizing file operations * added memory percentage * fix compatibility issues with older compilers and FreeBSD * rrdpush sender code cleanup; rrhost structure cleanup from sender flags and variables; * fix letftover variable in ifdef * apps.plugin: do not call detach from the thread; exit immediately when input is broken * exclude AR charts from health * flush cleaner; prefer sender output * clarity * do not fill the cbuffer if not connected * fix * dont enabled host->sender if streaming is not enabled; send host label updates to parent; * functions are only available through ACLK * Prepared statement reports only in dev mode * fix AR chart detection * fix for streaming not being enabling itself * more cleanup of sender and receiver structures * moved read-only flags and configuration options to rrdhost->options * fixed merge with master * fix for incomplete rename * prevent service thread from working on charts that are being collected Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'streaming/rrdpush.h')
-rw-r--r--streaming/rrdpush.h134
1 files changed, 101 insertions, 33 deletions
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index a6ff5ef030..e6566d25c9 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -10,32 +10,79 @@
#define CONNECTED_TO_SIZE 100
-#define STREAM_VERSION_CLAIM 3
-#define STREAM_VERSION_CLABELS 4
-#define STREAM_VERSION_COMPRESSION 5
-#define VERSION_GAP_FILLING 6
+// ----------------------------------------------------------------------------
+// obsolete versions - do not use anymore
+
+#define STREAM_OLD_VERSION_CLAIM 3
+#define STREAM_OLD_VERSION_CLABELS 4
+#define STREAM_OLD_VERSION_COMPRESSION 5 // this is production
+
+// ----------------------------------------------------------------------------
+// capabilities negotiation
+
+typedef enum {
+ // do not use the first 3 bits
+ STREAM_CAP_V1 = (1 << 3), // v1 = the oldest protocol
+ STREAM_CAP_V2 = (1 << 4), // v2 = the second version of the protocol (with host labels)
+ STREAM_CAP_VN = (1 << 5), // version negotiation supported (for versions 3, 4, 5 of the protocol)
+ // v3 = claiming supported
+ // v4 = chart labels supported
+ // v5 = lz4 compression supported
+ STREAM_CAP_VCAPS = (1 << 6), // capabilities negotiation supported
+ STREAM_CAP_HLABELS = (1 << 7), // host labels supported
+ STREAM_CAP_CLAIM = (1 << 8), // claiming supported
+ STREAM_CAP_CLABELS = (1 << 9), // chart labels supported
+ STREAM_CAP_COMPRESSION = (1 << 10), // lz4 compression supported
+ STREAM_CAP_FUNCTIONS = (1 << 11), // plugin functions supported
+ STREAM_CAP_GAP_FILLING = (1 << 12), // gap filling supported
+
+ // this must be signed int, so don't use the last bit
+ // needed for negotiating errors between parent and child
+} STREAM_CAPABILITIES;
#ifdef ENABLE_COMPRESSION
-#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)(STREAM_VERSION_COMPRESSION)
+#define STREAM_HAS_COMPRESSION STREAM_CAP_COMPRESSION
#else
-#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)(STREAM_VERSION_CLABELS)
+#define STREAM_HAS_COMPRESSION 0
#endif //ENABLE_COMPRESSION
+#define STREAM_OUR_CAPABILITIES (STREAM_CAP_V1 | STREAM_CAP_V2 | STREAM_CAP_VN | STREAM_CAP_VCAPS | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | STREAM_HAS_COMPRESSION | STREAM_CAP_FUNCTIONS)
+
+#define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability)))
+
+// ----------------------------------------------------------------------------
+// stream handshake
+
+#define HTTP_HEADER_SIZE 8192
+
#define STREAMING_PROTOCOL_VERSION "1.1"
-#define START_STREAMING_PROMPT "Hit me baby, push them over..."
-#define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..."
+#define START_STREAMING_PROMPT_V1 "Hit me baby, push them over..."
+#define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..."
#define START_STREAMING_PROMPT_VN "Hit me baby, push them over with the version="
#define START_STREAMING_ERROR_SAME_LOCALHOST "Don't hit me baby, you are trying to stream my localhost back"
#define START_STREAMING_ERROR_ALREADY_STREAMING "This GUID is already streaming to this server"
#define START_STREAMING_ERROR_NOT_PERMITTED "You are not permitted to access this. Check the logs for more info."
-#define HTTP_HEADER_SIZE 8192
-
typedef enum {
- RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW,
- RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW
-} RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY;
+ STREAM_HANDSHAKE_OK_V5 = 5, // COMPRESSION
+ STREAM_HANDSHAKE_OK_V4 = 4, // CLABELS
+ STREAM_HANDSHAKE_OK_V3 = 3, // CLAIM
+ STREAM_HANDSHAKE_OK_V2 = 2, // HLABELS
+ STREAM_HANDSHAKE_OK_V1 = 1,
+ STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE = -1,
+ STREAM_HANDSHAKE_ERROR_LOCALHOST = -2,
+ STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED = -3,
+ STREAM_HANDSHAKE_ERROR_DENIED = -4,
+ STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT = -5,
+ STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT = -6,
+ STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE = -7,
+ STREAM_HANDSHAKE_ERROR_SSL_ERROR = -8,
+ STREAM_HANDSHAKE_ERROR_CANT_CONNECT = -9
+} STREAM_HANDSHAKE;
+
+
+// ----------------------------------------------------------------------------
typedef struct {
char *os_name;
@@ -46,7 +93,6 @@ typedef struct {
} stream_encoded_t;
#ifdef ENABLE_COMPRESSION
-#define LZ4_MAX_MSG_SIZE 0x4000
struct compressor_state {
char *compression_result_buffer;
size_t compression_result_buffer_size;
@@ -81,11 +127,17 @@ struct decompressor_state {
// Thread-local storage
// Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.
+typedef enum {
+ SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown
+ SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression
+} SENDER_FLAGS;
+
struct sender_state {
RRDHOST *host;
- pid_t task_id;
- unsigned int overflow:1;
- int timeout, default_port;
+ pid_t tid; // the thread id of the sender, from gettid()
+ SENDER_FLAGS flags;
+ int timeout;
+ int default_port;
usec_t reconnect_delay;
char connected_to[CONNECTED_TO_SIZE + 1]; // We don't know which proxy we connect to, passed back from socket.c
size_t begin;
@@ -99,14 +151,19 @@ struct sender_state {
// the lazy creation of the sender thread - both cases (buffer access and thread creation) are guarded here.
netdata_mutex_t mutex;
struct circular_buffer *buffer;
- BUFFER *build;
- char read_buffer[512];
+ char read_buffer[PLUGINSD_LINE_MAX + 1];
int read_len;
- int32_t version;
+ STREAM_CAPABILITIES capabilities;
+
+ int rrdpush_sender_pipe[2]; // collector to sender thread signaling
+ int rrdpush_sender_socket;
+
#ifdef ENABLE_COMPRESSION
- unsigned int rrdpush_compression;
struct compressor_state *compressor;
#endif
+#ifdef ENABLE_HTTPS
+ struct netdata_ssl ssl; // Structure used to encrypt the connection
+#endif
};
struct receiver_state {
@@ -128,9 +185,9 @@ struct receiver_state {
char *program_version;
struct rrdhost_system_info *system_info;
int update_every;
- uint32_t stream_version;
+ STREAM_CAPABILITIES capabilities;
time_t last_msg_t;
- char read_buffer[1024]; // Need to allow RRD_ID_LENGTH_MAX * 4 + the other fields
+ char read_buffer[PLUGINSD_LINE_MAX + 1];
int read_len;
unsigned int shutdown:1; // Tell the thread to exit
unsigned int exited; // Indicates that the thread has exited (NOT A BITFIELD!)
@@ -144,11 +201,13 @@ struct receiver_state {
};
struct rrdpush_destinations {
- char destination[CONNECTED_TO_SIZE + 1];
- int disabled_no_proper_reply;
- int disabled_because_of_localhost;
- time_t disabled_already_streaming;
- int disabled_because_of_denied_access;
+ STRING *destination;
+
+ const char *last_error;
+ time_t postpone_reconnection_until;
+ STREAM_HANDSHAKE last_handshake;
+
+ struct rrdpush_destinations *prev;
struct rrdpush_destinations *next;
};
@@ -161,10 +220,12 @@ extern char *default_rrdpush_api_key;
extern char *default_rrdpush_send_charts_matching;
extern unsigned int remote_clock_resync_iterations;
+extern void rrdpush_destinations_init(RRDHOST *host);
+extern void rrdpush_destinations_free(RRDHOST *host);
+
extern void sender_init(RRDHOST *parent);
-extern struct rrdpush_destinations *destinations_init(const char *destinations);
-void sender_start(struct sender_state *s);
-void sender_commit(struct sender_state *s);
+BUFFER *sender_start(struct sender_state *s);
+void sender_commit(struct sender_state *s, BUFFER *wb);
void sender_cancel(struct sender_state *s);
extern int rrdpush_init();
extern int configured_as_parent();
@@ -172,7 +233,7 @@ extern void rrdset_done_push(RRDSET *st);
extern bool rrdset_push_chart_definition_now(RRDSET *st);
extern bool rrdpush_incremental_transmission_of_chart_definitions(RRDHOST *host, DICTFE *dictfe, bool restart, bool stop);
extern void *rrdpush_sender_thread(void *ptr);
-extern void rrdpush_send_labels(RRDHOST *host);
+extern void rrdpush_send_host_labels(RRDHOST *host);
extern void rrdpush_claimed_id(RRDHOST *host);
extern int rrdpush_receiver_thread_spawn(struct web_client *w, char *url);
@@ -181,7 +242,7 @@ extern void rrdpush_sender_thread_stop(RRDHOST *host);
extern void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva);
extern void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg);
extern int connect_to_one_of_destinations(
- struct rrdpush_destinations *destinations,
+ RRDHOST *host,
int default_port,
struct timeval *timeout,
size_t *reconnects_counter,
@@ -189,10 +250,17 @@ extern int connect_to_one_of_destinations(
size_t connected_to_size,
struct rrdpush_destinations **destination);
+extern void rrdpush_signal_sender_to_wake_up(struct sender_state *s);
+
#ifdef ENABLE_COMPRESSION
struct compressor_state *create_compressor();
struct decompressor_state *create_decompressor();
size_t is_compressed_data(const char *data, size_t data_size);
#endif
+extern void log_receiver_capabilities(struct receiver_state *rpt);
+extern void log_sender_capabilities(struct sender_state *s);
+extern STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version);
+extern int32_t stream_capabilities_to_vn(uint32_t caps);
+
#endif //NETDATA_RRDPUSH_H