summaryrefslogtreecommitdiffstats
path: root/database/rrdhost.c
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-10-05 14:13:46 +0300
committerGitHub <noreply@github.com>2022-10-05 14:13:46 +0300
commit8fc3b351a2e7fc96eced8f924de2e9cec9842128 (patch)
treebde41c66573ccaf8876c280e00742cc6096b587c /database/rrdhost.c
parent6850878e697d66dc90b9af1e750b22238c63c292 (diff)
Allow netdata plugins to expose functions for querying more information about specific charts (#13720)
* function renames and code cleanup in popen.c; no actual code changes * netdata popen() now opens both child process stdin and stdout and returns FILE * for both * pass both input and output to parser structures * updated rrdset to call custom functions * RRDSET FUNCTION leading calls for both sync and async operation * put RRDSET functions to a separate file * added format and timeout at function definition * support for synchronous (internal plugins) and asynchronous (external plugins and children) functions * /api/v1/function endpoint * functions are now attached to the host and there is a dictionary view per chart * functions implemented at plugins.d * remove the defer until keyword hook from plugins.d when it is done * stream sender implementation of functions * sanitization of all functions so that certain characters are only allowed * strictier sanitization * common max size * 1st working plugins.d example * always init inflight dictionary * properly destroy dictionaries to avoid parallel insertion of items * add more debugging on disconnection reasons * add more debugging on disconnection reasons again * streaming receiver respects newlines * dont use the same fp for both streaming receive and send * dont free dbengine memory with internal checks * make sender proceed in the buffer * added timing info and garbage collection at plugins.d * added info about routing nodes * added info about routing nodes with delay * added more info about delays * added more info about delays again * signal sending thread to wake up * streaming version labeling and commented code to support capabilities * added functions to /api/v1/data, /api/v1/charts, /api/v1/chart, /api/v1/info * redirect top output to stdout * address coverity findings * fix resource leaks of popen * log attempts to connect to individual destinations * better messages * properly parse destinations * try to find a function from the most matching to the least matching * log added streaming destinations * rotate destinations bypassing a node in the middle that does not accept our connection * break the loops properly * use typedef to define callbacks * capabilities negotiation during streaming * functions exposed upstream based on capabilities; compression disabled per node persisting reconnects; always try to connect with all capabilities * restore functionality to lookup functions * better logging of capabilities * remove old versions from capabilities when a newer version is there * fix formatting * optimization for plugins.d rrdlabels to avoid creating and destructing dictionaries all the time * delayed health initialization for rrddim and rrdset * cleanup health initialization * fix for popen() not returning the right value * add health worker jobs for initializing rrdset and rrddim * added content type support for functions; apps.plugin permanent function to display all the processes * fixes for functions parameters parsing in apps.plugin * fix for process matching in apps.plugiin * first working function for apps.plugin * Dashboard ACL is disabled for functions; Function errors are all in JSON format * apps.plugin function processes returns json table * use json_escape_string() to escape message * fix formatting * apps.plugin exposes all its metrics to function processes * fix json formatting when filtering out some rows * reopen the internal pipe of rrdpush in case of errors * misplaced statement * do not use buffer->len * support for GLOBAL functions (functions that are not linked to a chart * added /api/v1/functions endpoint; removed format from the FUNCTIONS api; * swagger documentation about the new api end points * added plugins.d documentation about functions * never re-close a file * remove uncessesary ifdef * fixed issues identified by codacy * fix for null label value * make edit-config copy-and-paste friendly * Revert "make edit-config copy-and-paste friendly" This reverts commit 54500c0e0a97f65a0c66c4d34e966f6a9056698e. * reworked sender handshake to fix coverity findings * timeout is zero, for both send_timeout() and recv_timeout() * properly detect that parent closed the socket * support caching of function responses; limit function response to 10MB; added protection from malformed function responses * disabled excessive logging * added units to apps.plugin function processes and normalized all values to be human readable * shorter field names * fixed issues reported * fixed apps.plugin error response; tested that pluginsd can properly handle faulty responses * use double linked list macros for double linked list management * faster apps.plugin function printing by minimizing file operations * added memory percentage * fix compatibility issues with older compilers and FreeBSD * rrdpush sender code cleanup; rrhost structure cleanup from sender flags and variables; * fix letftover variable in ifdef * apps.plugin: do not call detach from the thread; exit immediately when input is broken * exclude AR charts from health * flush cleaner; prefer sender output * clarity * do not fill the cbuffer if not connected * fix * dont enabled host->sender if streaming is not enabled; send host label updates to parent; * functions are only available through ACLK * Prepared statement reports only in dev mode * fix AR chart detection * fix for streaming not being enabling itself * more cleanup of sender and receiver structures * moved read-only flags and configuration options to rrdhost->options * fixed merge with master * fix for incomplete rename * prevent service thread from working on charts that are being collected Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'database/rrdhost.c')
-rw-r--r--database/rrdhost.c109
1 files changed, 48 insertions, 61 deletions
diff --git a/database/rrdhost.c b/database/rrdhost.c
index 1daaadf2a7..4b9aec96d3 100644
--- a/database/rrdhost.c
+++ b/database/rrdhost.c
@@ -74,9 +74,9 @@ inline RRDHOST *rrdhost_find_by_guid(const char *guid) {
static inline RRDHOST *rrdhost_index_add_by_guid(RRDHOST *host) {
RRDHOST *ret_machine_guid = dictionary_set(rrdhost_root_index, host->machine_guid, host, sizeof(RRDHOST));
if(ret_machine_guid == host)
- rrdhost_flag_set(host, RRDHOST_FLAG_INDEXED_MACHINE_GUID);
+ rrdhost_option_set(host, RRDHOST_OPTION_INDEXED_MACHINE_GUID);
else {
- rrdhost_flag_clear(host, RRDHOST_FLAG_INDEXED_MACHINE_GUID);
+ rrdhost_option_clear(host, RRDHOST_OPTION_INDEXED_MACHINE_GUID);
error("RRDHOST: %s() host with machine guid '%s' is already indexed", __FUNCTION__, host->machine_guid);
}
@@ -84,11 +84,11 @@ static inline RRDHOST *rrdhost_index_add_by_guid(RRDHOST *host) {
}
static void rrdhost_index_del_by_guid(RRDHOST *host) {
- if(rrdhost_flag_check(host, RRDHOST_FLAG_INDEXED_MACHINE_GUID)) {
+ if(rrdhost_option_check(host, RRDHOST_OPTION_INDEXED_MACHINE_GUID)) {
if(!dictionary_del(rrdhost_root_index, host->machine_guid))
error("RRDHOST: %s() failed to delete machine guid '%s' from index", __FUNCTION__, host->machine_guid);
- rrdhost_flag_clear(host, RRDHOST_FLAG_INDEXED_MACHINE_GUID);
+ rrdhost_option_clear(host, RRDHOST_OPTION_INDEXED_MACHINE_GUID);
}
}
@@ -107,9 +107,9 @@ static inline RRDHOST *rrdhost_index_add_hostname(RRDHOST *host) {
RRDHOST *ret_hostname = dictionary_set(rrdhost_root_index_hostname, rrdhost_hostname(host), host, sizeof(RRDHOST));
if(ret_hostname == host)
- rrdhost_flag_set(host, RRDHOST_FLAG_INDEXED_HOSTNAME);
+ rrdhost_option_set(host, RRDHOST_OPTION_INDEXED_HOSTNAME);
else {
- rrdhost_flag_clear(host, RRDHOST_FLAG_INDEXED_HOSTNAME);
+ rrdhost_option_clear(host, RRDHOST_OPTION_INDEXED_HOSTNAME);
error("RRDHOST: %s() host with hostname '%s' is already indexed", __FUNCTION__, rrdhost_hostname(host));
}
@@ -119,11 +119,11 @@ static inline RRDHOST *rrdhost_index_add_hostname(RRDHOST *host) {
static inline void rrdhost_index_del_hostname(RRDHOST *host) {
if(unlikely(!host->hostname)) return;
- if(rrdhost_flag_check(host, RRDHOST_FLAG_INDEXED_HOSTNAME)) {
+ if(rrdhost_option_check(host, RRDHOST_OPTION_INDEXED_HOSTNAME)) {
if(!dictionary_del(rrdhost_root_index_hostname, rrdhost_hostname(host)))
error("RRDHOST: %s() failed to delete hostname '%s' from index", __FUNCTION__, rrdhost_hostname(host));
- rrdhost_flag_clear(host, RRDHOST_FLAG_INDEXED_HOSTNAME);
+ rrdhost_option_clear(host, RRDHOST_OPTION_INDEXED_HOSTNAME);
}
}
@@ -200,38 +200,34 @@ void set_host_properties(RRDHOST *host, int update_every, RRD_MEMORY_MODE memory
// ----------------------------------------------------------------------------
// RRDHOST - add a host
-static void rrdhost_initialize_rrdpush(RRDHOST *host,
+static void rrdhost_initialize_rrdpush_sender(RRDHOST *host,
unsigned int rrdpush_enabled,
char *rrdpush_destination,
char *rrdpush_api_key,
char *rrdpush_send_charts_matching
) {
- if(rrdhost_flag_check(host, RRDHOST_FLAG_INITIALIZED_RRDPUSH)) return;
- rrdhost_flag_set(host, RRDHOST_FLAG_INITIALIZED_RRDPUSH);
+ if(rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_INITIALIZED)) return;
- sender_init(host);
- netdata_mutex_init(&host->receiver_lock);
+ if(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key) {
+ rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_INITIALIZED);
- host->rrdpush_send_enabled = (rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key) ? 1 : 0;
- host->rrdpush_send_destination = (host->rrdpush_send_enabled)?strdupz(rrdpush_destination):NULL;
+ sender_init(host);
- if (host->rrdpush_send_destination)
- host->destinations = destinations_init(host->rrdpush_send_destination);
+#ifdef ENABLE_HTTPS
+ host->sender->ssl.conn = NULL;
+ host->sender->ssl.flags = NETDATA_SSL_START;
+#endif
- host->rrdpush_send_api_key = (host->rrdpush_send_enabled)?strdupz(rrdpush_api_key):NULL;
- host->rrdpush_send_charts_matching = simple_pattern_create(rrdpush_send_charts_matching, NULL, SIMPLE_PATTERN_EXACT);
+ host->rrdpush_send_destination = strdupz(rrdpush_destination);
+ rrdpush_destinations_init(host);
- host->rrdpush_sender_pipe[0] = -1;
- host->rrdpush_sender_pipe[1] = -1;
- host->rrdpush_sender_socket = -1;
+ host->rrdpush_send_api_key = strdupz(rrdpush_api_key);
+ host->rrdpush_send_charts_matching = simple_pattern_create(rrdpush_send_charts_matching, NULL, SIMPLE_PATTERN_EXACT);
- //host->stream_version = STREAMING_PROTOCOL_CURRENT_VERSION; Unused?
-#ifdef ENABLE_HTTPS
- host->ssl.conn = NULL;
- host->ssl.flags = NETDATA_SSL_START;
- host->stream_ssl.conn = NULL;
- host->stream_ssl.flags = NETDATA_SSL_START;
-#endif
+ rrdhost_option_set(host, RRDHOST_OPTION_SENDER_ENABLED);
+ }
+ else
+ rrdhost_option_clear(host, RRDHOST_OPTION_SENDER_ENABLED);
}
static void rrdhost_initialize_health(RRDHOST *host,
@@ -342,8 +338,7 @@ RRDHOST *rrdhost_create(const char *hostname,
int is_in_multihost = (memory_mode == RRD_MEMORY_MODE_DBENGINE && !is_legacy);
RRDHOST *host = callocz(1, sizeof(RRDHOST));
- strncpy(host->machine_guid, guid, GUID_LEN);
- host->machine_guid[GUID_LEN] = '\0';
+ strncpyz(host->machine_guid, guid, GUID_LEN + 1);
set_host_properties(host, (update_every > 0)?update_every:1, memory_mode, registry_hostname, os,
tags, timezone, abbrev_timezone, utc_offset, program_name, program_version);
@@ -354,23 +349,25 @@ RRDHOST *rrdhost_create(const char *hostname,
host->health_enabled = ((memory_mode == RRD_MEMORY_MODE_NONE)) ? 0 : health_enabled;
if (likely(!archived)) {
+ rrdfunctions_init(host);
host->rrdlabels = rrdlabels_create();
- rrdhost_initialize_rrdpush(
+ rrdhost_initialize_rrdpush_sender(
host, rrdpush_enabled, rrdpush_destination, rrdpush_api_key, rrdpush_send_charts_matching);
}
netdata_rwlock_init(&host->rrdhost_rwlock);
netdata_mutex_init(&host->aclk_state_lock);
+ netdata_mutex_init(&host->receiver_lock);
host->system_info = system_info;
rrdset_index_init(host);
if(config_get_boolean(CONFIG_SECTION_DB, "delete obsolete charts files", 1))
- rrdhost_flag_set(host, RRDHOST_FLAG_DELETE_OBSOLETE_CHARTS);
+ rrdhost_option_set(host, RRDHOST_OPTION_DELETE_OBSOLETE_CHARTS);
if(config_get_boolean(CONFIG_SECTION_DB, "delete orphan hosts files", 1) && !is_localhost)
- rrdhost_flag_set(host, RRDHOST_FLAG_DELETE_ORPHAN_HOST);
+ rrdhost_option_set(host, RRDHOST_OPTION_DELETE_ORPHAN_HOST);
char filename[FILENAME_MAX + 1];
if(is_localhost) {
@@ -527,7 +524,7 @@ RRDHOST *rrdhost_create(const char *hostname,
, host->rrd_update_every
, rrd_memory_mode_name(host->rrd_memory_mode)
, host->rrd_history_entries
- , host->rrdpush_send_enabled?"enabled":"disabled"
+ , rrdhost_has_rrdpush_sender_enabled(host)?"enabled":"disabled"
, host->rrdpush_send_destination?host->rrdpush_send_destination:""
, host->rrdpush_send_api_key?host->rrdpush_send_api_key:""
, host->health_enabled?"enabled":"disabled"
@@ -574,10 +571,6 @@ void rrdhost_update(RRDHOST *host
)
{
UNUSED(guid);
- UNUSED(rrdpush_enabled);
- UNUSED(rrdpush_destination);
- UNUSED(rrdpush_api_key);
- UNUSED(rrdpush_send_charts_matching);
host->health_enabled = (mode == RRD_MEMORY_MODE_NONE) ? 0 : health_enabled;
//host->stream_version = STREAMING_PROTOCOL_CURRENT_VERSION; Unused?
@@ -629,20 +622,21 @@ void rrdhost_update(RRDHOST *host
if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED)) {
rrdhost_flag_clear(host, RRDHOST_FLAG_ARCHIVED);
+ rrdfunctions_init(host);
+
if(!host->rrdlabels)
host->rrdlabels = rrdlabels_create();
if (!host->rrdset_root_index)
rrdset_index_init(host);
- rrdhost_initialize_rrdpush(host,
+ rrdhost_initialize_rrdpush_sender(host,
rrdpush_enabled,
rrdpush_destination,
rrdpush_api_key,
rrdpush_send_charts_matching);
- rrdhost_initialize_health(host,
- host == localhost);
+ rrdhost_initialize_health(host, host == localhost);
rrd_hosts_available++;
ml_new_host(host);
@@ -1015,12 +1009,13 @@ void destroy_receiver_state(struct receiver_state *rpt);
void stop_streaming_sender(RRDHOST *host)
{
+ rrdhost_option_clear(host, RRDHOST_OPTION_SENDER_ENABLED);
+
if (unlikely(!host->sender))
return;
rrdpush_sender_thread_stop(host); // stop a possibly running thread
cbuffer_free(host->sender->buffer);
- buffer_free(host->sender->build);
#ifdef ENABLE_COMPRESSION
if (host->sender->compressor)
host->sender->compressor->destroy(&host->sender->compressor);
@@ -1156,12 +1151,7 @@ void rrdhost_free(RRDHOST *host, bool force) {
freez(host->varlib_dir);
freez(host->rrdpush_send_api_key);
freez(host->rrdpush_send_destination);
- struct rrdpush_destinations *tmp_destination;
- while (host->destinations) {
- tmp_destination = host->destinations->next;
- freez(host->destinations);
- host->destinations = tmp_destination;
- }
+ rrdpush_destinations_free(host);
string_freez(host->health_default_exec);
string_freez(host->health_default_recipient);
freez(host->health_log_filename);
@@ -1173,6 +1163,7 @@ void rrdhost_free(RRDHOST *host, bool force) {
freez(host->node_id);
rrdfamily_index_destroy(host);
+ rrdfunctions_destroy(host);
rrdvariables_destroy(host->rrdvars);
rrdhost_destroy_rrdcontexts(host);
@@ -1318,16 +1309,17 @@ static void rrdhost_load_kubernetes_labels(void) {
debug(D_RRDHOST, "Attempting to fetch external labels via %s", label_script);
pid_t pid;
- FILE *fp = mypopen(label_script, &pid);
- if(!fp) return;
+ FILE *fp_child_input;
+ FILE *fp_child_output = netdata_popen(label_script, &pid, &fp_child_input);
+ if(!fp_child_output) return;
char buffer[1000 + 1];
- while (fgets(buffer, 1000, fp) != NULL)
+ while (fgets(buffer, 1000, fp_child_output) != NULL)
rrdlabels_add_pair(localhost->rrdlabels, buffer, RRDLABEL_SRC_AUTO|RRDLABEL_SRC_K8S);
// Non-zero exit code means that all the script output is error messages. We've shown already any message that didn't include a ':'
// Here we'll inform with an ERROR that the script failed, show whatever (if anything) was added to the list of labels, free the memory and set the return to null
- int rc = mypclose(fp, pid);
+ int rc = netdata_pclose(fp_child_input, fp_child_output, pid);
if(rc) error("%s exited abnormally. Failed to get kubernetes labels.", label_script);
}
@@ -1347,12 +1339,7 @@ void reload_host_labels(void) {
health_label_log_save(localhost);
-/* TODO-GAPS - fix this so that it looks properly at the state and version of the sender
- if(localhost->rrdpush_send_enabled && localhost->rrdpush_sender_buffer){
- localhost->labels.labels_flag |= RRDHOST_FLAG_STREAM_LABELS_UPDATE;
- rrdpush_send_labels(localhost);
- }
-*/
+ rrdpush_send_host_labels(localhost);
health_reload();
}
@@ -1385,7 +1372,7 @@ void rrdhost_cleanup_charts(RRDHOST *host) {
info("Cleaning up database of host '%s'...", rrdhost_hostname(host));
RRDSET *st;
- uint32_t rrdhost_delete_obsolete_charts = rrdhost_flag_check(host, RRDHOST_FLAG_DELETE_OBSOLETE_CHARTS);
+ uint32_t rrdhost_delete_obsolete_charts = rrdhost_option_check(host, RRDHOST_OPTION_DELETE_OBSOLETE_CHARTS);
// we get a write lock
// to ensure only one thread is saving the database
@@ -1430,7 +1417,7 @@ void rrdhost_cleanup_all(void) {
RRDHOST *host;
rrdhost_foreach_read(host) {
- if (host != localhost && rrdhost_flag_check(host, RRDHOST_FLAG_DELETE_ORPHAN_HOST) && !host->receiver
+ if (host != localhost && rrdhost_option_check(host, RRDHOST_OPTION_DELETE_ORPHAN_HOST) && !host->receiver
#ifdef ENABLE_DBENGINE
/* don't delete multi-host DB host files */
&& !(host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE && is_storage_engine_shared(host->storage_instance[0]))