author | Costa Tsaousis <costa@netdata.cloud> | 2022-10-05 14:13:46 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-10-05 14:13:46 +0300 |
commit | 8fc3b351a2e7fc96eced8f924de2e9cec9842128 (patch) | |
tree | bde41c66573ccaf8876c280e00742cc6096b587c /streaming/rrdpush.h | |
parent | 6850878e697d66dc90b9af1e750b22238c63c292 (diff) |
Allow netdata plugins to expose functions for querying more information about specific charts (#13720)
* function renames and code cleanup in popen.c; no actual code changes
* netdata popen() now opens both child process stdin and stdout and returns FILE * for both
* pass both input and output to parser structures
* updated rrdset to call custom functions
* RRDSET FUNCTION leading calls for both sync and async operation
* put RRDSET functions to a separate file
* added format and timeout at function definition
* support for synchronous (internal plugins) and asynchronous (external plugins and children) functions
* /api/v1/function endpoint
* functions are now attached to the host and there is a dictionary view per chart
* functions implemented at plugins.d
* remove the defer until keyword hook from plugins.d when it is done
* stream sender implementation of functions
* sanitization of all functions so that certain characters are only allowed
* stricter sanitization
* common max size
* 1st working plugins.d example
* always init inflight dictionary
* properly destroy dictionaries to avoid parallel insertion of items
* add more debugging on disconnection reasons
* add more debugging on disconnection reasons again
* streaming receiver respects newlines
* don't use the same fp for both streaming receive and send
* don't free dbengine memory with internal checks
* make sender proceed in the buffer
* added timing info and garbage collection at plugins.d
* added info about routing nodes
* added info about routing nodes with delay
* added more info about delays
* added more info about delays again
* signal sending thread to wake up
* streaming version labeling and commented code to support capabilities
* added functions to /api/v1/data, /api/v1/charts, /api/v1/chart, /api/v1/info
* redirect top output to stdout
* address coverity findings
* fix resource leaks of popen
* log attempts to connect to individual destinations
* better messages
* properly parse destinations
* try to find a function from the most matching to the least matching
* log added streaming destinations
* rotate destinations bypassing a node in the middle that does not accept our connection
* break the loops properly
* use typedef to define callbacks
* capabilities negotiation during streaming
* functions exposed upstream based on capabilities; compression disabled per node persisting reconnects; always try to connect with all capabilities
* restore functionality to lookup functions
* better logging of capabilities
* remove old versions from capabilities when a newer version is there
* fix formatting
* optimization for plugins.d rrdlabels to avoid creating and destructing dictionaries all the time
* delayed health initialization for rrddim and rrdset
* cleanup health initialization
* fix for popen() not returning the right value
* add health worker jobs for initializing rrdset and rrddim
* added content type support for functions; apps.plugin permanent function to display all the processes
* fixes for functions parameters parsing in apps.plugin
* fix for process matching in apps.plugin
* first working function for apps.plugin
* Dashboard ACL is disabled for functions; Function errors are all in JSON format
* apps.plugin function processes returns json table
* use json_escape_string() to escape message
* fix formatting
* apps.plugin exposes all its metrics to function processes
* fix json formatting when filtering out some rows
* reopen the internal pipe of rrdpush in case of errors
* misplaced statement
* do not use buffer->len
* support for GLOBAL functions (functions that are not linked to a chart)
* added /api/v1/functions endpoint; removed format from the FUNCTIONS api;
* swagger documentation about the new api end points
* added plugins.d documentation about functions
* never re-close a file
* remove unnecessary ifdef
* fixed issues identified by codacy
* fix for null label value
* make edit-config copy-and-paste friendly
* Revert "make edit-config copy-and-paste friendly"
This reverts commit 54500c0e0a97f65a0c66c4d34e966f6a9056698e.
* reworked sender handshake to fix coverity findings
* timeout is zero, for both send_timeout() and recv_timeout()
* properly detect that parent closed the socket
* support caching of function responses; limit function response to 10MB; added protection from malformed function responses
* disabled excessive logging
* added units to apps.plugin function processes and normalized all values to be human readable
* shorter field names
* fixed issues reported
* fixed apps.plugin error response; tested that pluginsd can properly handle faulty responses
* use double linked list macros for double linked list management
* faster apps.plugin function printing by minimizing file operations
* added memory percentage
* fix compatibility issues with older compilers and FreeBSD
* rrdpush sender code cleanup; rrdhost structure cleanup from sender flags and variables;
* fix leftover variable in ifdef
* apps.plugin: do not call detach from the thread; exit immediately when input is broken
* exclude AR charts from health
* flush cleaner; prefer sender output
* clarity
* do not fill the cbuffer if not connected
* fix
* don't enable host->sender if streaming is not enabled; send host label updates to parent;
* functions are only available through ACLK
* Prepared statement reports only in dev mode
* fix AR chart detection
* fix for streaming not enabling itself
* more cleanup of sender and receiver structures
* moved read-only flags and configuration options to rrdhost->options
* fixed merge with master
* fix for incomplete rename
* prevent service thread from working on charts that are being collected
Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
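The commit message mentions capabilities negotiation during streaming and functions being exposed upstream based on capabilities. The following is a minimal sketch of how a bitmask negotiation of that kind could work, assuming the agreed set is simply the intersection of what both ends advertise. The helper names `sketch_negotiate_caps()` and `sketch_negotiation_example()` are hypothetical and are not part of this commit; only the bit values are taken from the diff below.

```c
#include <assert.h>
#include <stdint.h>

// Bit values copied from the STREAM_CAPABILITIES enum in streaming/rrdpush.h.
enum {
    STREAM_CAP_V1          = (1 << 3),
    STREAM_CAP_V2          = (1 << 4),
    STREAM_CAP_VN          = (1 << 5),
    STREAM_CAP_VCAPS       = (1 << 6),
    STREAM_CAP_HLABELS     = (1 << 7),
    STREAM_CAP_CLAIM       = (1 << 8),
    STREAM_CAP_CLABELS     = (1 << 9),
    STREAM_CAP_COMPRESSION = (1 << 10),
    STREAM_CAP_FUNCTIONS   = (1 << 11),
};

// ASSUMPTION: hypothetical helper, not part of the commit.
// Keeps only the capabilities that both ends advertise.
static uint32_t sketch_negotiate_caps(uint32_t ours, uint32_t theirs) {
    return ours & theirs;
}

// Example: a child that supports functions connects to a parent that does not.
static void sketch_negotiation_example(void) {
    uint32_t child  = STREAM_CAP_VCAPS | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM |
                      STREAM_CAP_CLABELS | STREAM_CAP_FUNCTIONS;
    uint32_t parent = STREAM_CAP_VCAPS | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM |
                      STREAM_CAP_CLABELS;

    uint32_t agreed = sketch_negotiate_caps(child, parent);

    assert(agreed & STREAM_CAP_CLABELS);       // both ends support chart labels
    assert(!(agreed & STREAM_CAP_FUNCTIONS));  // parent lacks functions, so none go upstream
}
```

With one bit per feature, a feature can be added or dropped per connection without bumping a single linear protocol version, which is presumably why the old `STREAM_VERSION_*` numbers are marked obsolete in this change.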
Diffstat (limited to 'streaming/rrdpush.h')
-rw-r--r-- | streaming/rrdpush.h | 134 |
1 files changed, 101 insertions, 33 deletions
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index a6ff5ef030..e6566d25c9 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -10,32 +10,79 @@

 #define CONNECTED_TO_SIZE 100

-#define STREAM_VERSION_CLAIM 3
-#define STREAM_VERSION_CLABELS 4
-#define STREAM_VERSION_COMPRESSION 5
-#define VERSION_GAP_FILLING 6
+// ----------------------------------------------------------------------------
+// obsolete versions - do not use anymore
+
+#define STREAM_OLD_VERSION_CLAIM 3
+#define STREAM_OLD_VERSION_CLABELS 4
+#define STREAM_OLD_VERSION_COMPRESSION 5 // this is production
+
+// ----------------------------------------------------------------------------
+// capabilities negotiation
+
+typedef enum {
+    // do not use the first 3 bits
+    STREAM_CAP_V1 = (1 << 3), // v1 = the oldest protocol
+    STREAM_CAP_V2 = (1 << 4), // v2 = the second version of the protocol (with host labels)
+    STREAM_CAP_VN = (1 << 5), // version negotiation supported (for versions 3, 4, 5 of the protocol)
+                              // v3 = claiming supported
+                              // v4 = chart labels supported
+                              // v5 = lz4 compression supported
+    STREAM_CAP_VCAPS = (1 << 6), // capabilities negotiation supported
+    STREAM_CAP_HLABELS = (1 << 7), // host labels supported
+    STREAM_CAP_CLAIM = (1 << 8), // claiming supported
+    STREAM_CAP_CLABELS = (1 << 9), // chart labels supported
+    STREAM_CAP_COMPRESSION = (1 << 10), // lz4 compression supported
+    STREAM_CAP_FUNCTIONS = (1 << 11), // plugin functions supported
+    STREAM_CAP_GAP_FILLING = (1 << 12), // gap filling supported
+
+    // this must be signed int, so don't use the last bit
+    // needed for negotiating errors between parent and child
+} STREAM_CAPABILITIES;

 #ifdef ENABLE_COMPRESSION
-#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)(STREAM_VERSION_COMPRESSION)
+#define STREAM_HAS_COMPRESSION STREAM_CAP_COMPRESSION
 #else
-#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)(STREAM_VERSION_CLABELS)
+#define STREAM_HAS_COMPRESSION 0
 #endif //ENABLE_COMPRESSION

+#define STREAM_OUR_CAPABILITIES (STREAM_CAP_V1 | STREAM_CAP_V2 | STREAM_CAP_VN | STREAM_CAP_VCAPS | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | STREAM_HAS_COMPRESSION | STREAM_CAP_FUNCTIONS)
+
+#define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability)))
+
+// ----------------------------------------------------------------------------
+// stream handshake
+
+#define HTTP_HEADER_SIZE 8192
+
 #define STREAMING_PROTOCOL_VERSION "1.1"
-#define START_STREAMING_PROMPT "Hit me baby, push them over..."
-#define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..."
+#define START_STREAMING_PROMPT_V1 "Hit me baby, push them over..."
+#define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..."
 #define START_STREAMING_PROMPT_VN "Hit me baby, push them over with the version="

 #define START_STREAMING_ERROR_SAME_LOCALHOST "Don't hit me baby, you are trying to stream my localhost back"
 #define START_STREAMING_ERROR_ALREADY_STREAMING "This GUID is already streaming to this server"
 #define START_STREAMING_ERROR_NOT_PERMITTED "You are not permitted to access this. Check the logs for more info."

-#define HTTP_HEADER_SIZE 8192
-
 typedef enum {
-    RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW,
-    RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW
-} RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY;
+    STREAM_HANDSHAKE_OK_V5 = 5, // COMPRESSION
+    STREAM_HANDSHAKE_OK_V4 = 4, // CLABELS
+    STREAM_HANDSHAKE_OK_V3 = 3, // CLAIM
+    STREAM_HANDSHAKE_OK_V2 = 2, // HLABELS
+    STREAM_HANDSHAKE_OK_V1 = 1,
+    STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE = -1,
+    STREAM_HANDSHAKE_ERROR_LOCALHOST = -2,
+    STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED = -3,
+    STREAM_HANDSHAKE_ERROR_DENIED = -4,
+    STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT = -5,
+    STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT = -6,
+    STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE = -7,
+    STREAM_HANDSHAKE_ERROR_SSL_ERROR = -8,
+    STREAM_HANDSHAKE_ERROR_CANT_CONNECT = -9
+} STREAM_HANDSHAKE;
+
+
+// ----------------------------------------------------------------------------

 typedef struct {
     char *os_name;
@@ -46,7 +93,6 @@ typedef struct {
 } stream_encoded_t;

 #ifdef ENABLE_COMPRESSION
-#define LZ4_MAX_MSG_SIZE 0x4000
 struct compressor_state {
     char *compression_result_buffer;
     size_t compression_result_buffer_size;
@@ -81,11 +127,17 @@ struct decompressor_state {
 // Thread-local storage
 // Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.

+typedef enum {
+    SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown
+    SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression
+} SENDER_FLAGS;
+
 struct sender_state {
     RRDHOST *host;
-    pid_t task_id;
-    unsigned int overflow:1;
-    int timeout, default_port;
+    pid_t tid; // the thread id of the sender, from gettid()
+    SENDER_FLAGS flags;
+    int timeout;
+    int default_port;
     usec_t reconnect_delay;
     char connected_to[CONNECTED_TO_SIZE + 1]; // We don't know which proxy we connect to, passed back from socket.c
     size_t begin;
@@ -99,14 +151,19 @@ struct sender_state {
     // the lazy creation of the sender thread - both cases (buffer access and thread creation) are guarded here.
     netdata_mutex_t mutex;
     struct circular_buffer *buffer;
-    BUFFER *build;
-    char read_buffer[512];
+    char read_buffer[PLUGINSD_LINE_MAX + 1];
     int read_len;
-    int32_t version;
+    STREAM_CAPABILITIES capabilities;
+
+    int rrdpush_sender_pipe[2]; // collector to sender thread signaling
+    int rrdpush_sender_socket;
+
 #ifdef ENABLE_COMPRESSION
-    unsigned int rrdpush_compression;
     struct compressor_state *compressor;
 #endif
+#ifdef ENABLE_HTTPS
+    struct netdata_ssl ssl; // Structure used to encrypt the connection
+#endif
 };

 struct receiver_state {
@@ -128,9 +185,9 @@ struct receiver_state {
     char *program_version;
     struct rrdhost_system_info *system_info;
     int update_every;
-    uint32_t stream_version;
+    STREAM_CAPABILITIES capabilities;
     time_t last_msg_t;
-    char read_buffer[1024]; // Need to allow RRD_ID_LENGTH_MAX * 4 + the other fields
+    char read_buffer[PLUGINSD_LINE_MAX + 1];
     int read_len;
     unsigned int shutdown:1; // Tell the thread to exit
     unsigned int exited; // Indicates that the thread has exited (NOT A BITFIELD!)
@@ -144,11 +201,13 @@ struct receiver_state {
 };

 struct rrdpush_destinations {
-    char destination[CONNECTED_TO_SIZE + 1];
-    int disabled_no_proper_reply;
-    int disabled_because_of_localhost;
-    time_t disabled_already_streaming;
-    int disabled_because_of_denied_access;
+    STRING *destination;
+
+    const char *last_error;
+    time_t postpone_reconnection_until;
+    STREAM_HANDSHAKE last_handshake;
+
+    struct rrdpush_destinations *prev;
     struct rrdpush_destinations *next;
 };

@@ -161,10 +220,12 @@ extern char *default_rrdpush_api_key;
 extern char *default_rrdpush_send_charts_matching;
 extern unsigned int remote_clock_resync_iterations;

+extern void rrdpush_destinations_init(RRDHOST *host);
+extern void rrdpush_destinations_free(RRDHOST *host);
+
 extern void sender_init(RRDHOST *parent);
-extern struct rrdpush_destinations *destinations_init(const char *destinations);
-void sender_start(struct sender_state *s);
-void sender_commit(struct sender_state *s);
+BUFFER *sender_start(struct sender_state *s);
+void sender_commit(struct sender_state *s, BUFFER *wb);
 void sender_cancel(struct sender_state *s);
 extern int rrdpush_init();
 extern int configured_as_parent();
@@ -172,7 +233,7 @@ extern void rrdset_done_push(RRDSET *st);
 extern bool rrdset_push_chart_definition_now(RRDSET *st);
 extern bool rrdpush_incremental_transmission_of_chart_definitions(RRDHOST *host, DICTFE *dictfe, bool restart, bool stop);
 extern void *rrdpush_sender_thread(void *ptr);
-extern void rrdpush_send_labels(RRDHOST *host);
+extern void rrdpush_send_host_labels(RRDHOST *host);
 extern void rrdpush_claimed_id(RRDHOST *host);

 extern int rrdpush_receiver_thread_spawn(struct web_client *w, char *url);
@@ -181,7 +242,7 @@ extern void rrdpush_sender_thread_stop(RRDHOST *host);
 extern void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva);
 extern void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg);
 extern int connect_to_one_of_destinations(
-    struct rrdpush_destinations *destinations,
+    RRDHOST *host,
     int default_port,
     struct timeval *timeout,
     size_t *reconnects_counter,
@@ -189,10 +250,17 @@ extern int connect_to_one_of_destinations(
     size_t connected_to_size,
     struct rrdpush_destinations **destination);

+extern void rrdpush_signal_sender_to_wake_up(struct sender_state *s);
+
 #ifdef ENABLE_COMPRESSION
 struct compressor_state *create_compressor();
 struct decompressor_state *create_decompressor();
 size_t is_compressed_data(const char *data, size_t data_size);
 #endif

+extern void log_receiver_capabilities(struct receiver_state *rpt);
+extern void log_sender_capabilities(struct sender_state *s);
+extern STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version);
+extern int32_t stream_capabilities_to_vn(uint32_t caps);
+
 #endif //NETDATA_RRDPUSH_H
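
The header declares `convert_stream_version_to_capabilities()` and the `stream_has_capability()` macro, but the conversion body is not part of this diff. The sketch below is one plausible reading of the enum comments (v2 = host labels, v3 = claiming, v4 = chart labels, v5 = lz4 compression), assuming each legacy version also implies the features of the versions before it. `sketch_version_to_caps()`, `struct sketch_peer` and `sketch_version_example()` are hypothetical names used only for illustration; the macro is copied from the header.

```c
#include <assert.h>
#include <stdint.h>

// Bit values copied from the STREAM_CAPABILITIES enum in streaming/rrdpush.h.
enum {
    STREAM_CAP_V1          = (1 << 3),
    STREAM_CAP_V2          = (1 << 4),
    STREAM_CAP_VN          = (1 << 5),
    STREAM_CAP_HLABELS     = (1 << 7),
    STREAM_CAP_CLAIM       = (1 << 8),
    STREAM_CAP_CLABELS     = (1 << 9),
    STREAM_CAP_COMPRESSION = (1 << 10),
};

// Macro copied from the header: NULL-safe test of a capability bit.
#define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability)))

// ASSUMPTION: hypothetical stand-in for a sender or receiver state.
struct sketch_peer {
    uint32_t capabilities;
};

// ASSUMPTION: hypothetical mapping, not the real convert_stream_version_to_capabilities().
// Treats legacy versions as cumulative: each version adds one feature.
static uint32_t sketch_version_to_caps(int32_t version) {
    if (version <= 1) return STREAM_CAP_V1;
    if (version == 2) return STREAM_CAP_V2 | STREAM_CAP_HLABELS;

    uint32_t caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM; // v3
    if (version >= 4) caps |= STREAM_CAP_CLABELS;                          // v4
    if (version >= 5) caps |= STREAM_CAP_COMPRESSION;                      // v5
    return caps;
}

// Example: a peer that negotiated legacy version 4.
static void sketch_version_example(void) {
    struct sketch_peer peer = { sketch_version_to_caps(4) };
    struct sketch_peer *none = 0;

    assert(stream_has_capability(&peer, STREAM_CAP_CLABELS));
    assert(!stream_has_capability(&peer, STREAM_CAP_COMPRESSION));
    assert(!stream_has_capability(none, STREAM_CAP_V1)); // NULL peer has no capabilities
}
```

Callers can then test individual bits with `stream_has_capability()` instead of comparing version numbers, which matches the switch from `int32_t version` and `uint32_t stream_version` to `STREAM_CAPABILITIES capabilities` in the sender and receiver structures.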