summaryrefslogtreecommitdiffstats
path: root/database
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2024-01-11 16:56:45 +0200
committerGitHub <noreply@github.com>2024-01-11 16:56:45 +0200
commitf2b250a1f53af00241522db35f8c85f19ed282e1 (patch)
treee813ca47880e3e2adf09583658efef59633ec6bd /database
parentbead543ea52e51cf73f7e5b27de53197801399a7 (diff)
dyncfg v2 (#16702)
* split rrdfunctions streaming and progress * simplified internal inline functions API * split rrdfunctions inflight management * split rrd functions exporters * renames * base dyncfg structure * config pluginsd * intercept dyncfg function calls * loading and saving of dyncfg metadata and data * save metadata and payload to a single file; added code to update the plugins with jobs and saved configs * basic working unit test * added payload to functions execution * removed old dyncfg code that is not needed any more * more cleanup * cleanup sender for functions with payload * dyncfg functions are not exposed as functions * remaining work to avoid indexing the \0 terminating character in dictionary keys * added back old dyncfg plugins.d commands as noop, to allow plugins continue working * working api; working streaming; * updated plugins.d documentation * aclk and http api requests share the same header parsing logic * added source type internal * fixed crashes * added god mode for tests * fixes * fixed messages * save host machine guids to configs * cleaner manipulation of supported commands * the functions event loop for external plugins can now process dyncfg requests * unified internal and external plugins dyncfg API * Netdata serves schema requests from /etc/netdata/schema.d and /var/lib/netdata/conf.d/schema.d * cleanup and various fixes; fixed bug in previous dyncfg implementation on streaming that was sending the paylod in a way that allowed other streaming commands to be multiplexed * internals go to a separate header file * fix duplicate ACLK requests sent by aclk queue mechanism * use fstat instead of stat * working api * plugin actions renamed to create and delete; dyncfg files are removed only from user actions * prevent deadlock by using the react callback * fix for string_strndupz() * better dyncfg unittests * more tests at the unittests * properly detect dyncfg functions * hide config functions from the UI * tree response improvements * send the initial update with payload * determine tty using stdout, not stderr * changes to statuses, cleanup and the code to bring all business logic into interception * do not crash when the status is empty * functions now propagate the source of the requests to plugins * avoid warning about unused functions * in the count at items for attention, do not count the orphan entries * save source into dyncfg * make the list null terminated * fixed invalid comparison * prevent memory leak on duplicated headers; log x-forwarded-for * more unit tests * added dyncfg unittests into the default unittests * more unit tests and fixes * more unit tests and fixes * fix dictionary unittests * config functions require admin access
Diffstat (limited to 'database')
-rw-r--r--database/contexts/api_v2.c13
-rw-r--r--database/rrd.h2
-rw-r--r--database/rrdcalc.c10
-rw-r--r--database/rrdcalctemplate.c2
-rw-r--r--database/rrdcollector-internals.h17
-rw-r--r--database/rrdcollector.c2
-rw-r--r--database/rrdcollector.h16
-rw-r--r--database/rrddimvar.c2
-rw-r--r--database/rrdfunctions-exporters.c164
-rw-r--r--database/rrdfunctions-exporters.h16
-rw-r--r--database/rrdfunctions-inflight.c641
-rw-r--r--database/rrdfunctions-inflight.h16
-rw-r--r--database/rrdfunctions-inline.c42
-rw-r--r--database/rrdfunctions-inline.h14
-rw-r--r--database/rrdfunctions-internals.h36
-rw-r--r--database/rrdfunctions-progress.c8
-rw-r--r--database/rrdfunctions-progress.h10
-rw-r--r--database/rrdfunctions-streaming.c626
-rw-r--r--database/rrdfunctions-streaming.h12
-rw-r--r--database/rrdfunctions.c1450
-rw-r--r--database/rrdfunctions.h101
-rw-r--r--database/rrdhost.c30
-rw-r--r--database/rrdlabels.c8
-rw-r--r--database/rrdlabels.h2
-rw-r--r--database/rrdsetvar.c2
-rw-r--r--database/rrdvar.c8
26 files changed, 1753 insertions, 1497 deletions
diff --git a/database/contexts/api_v2.c b/database/contexts/api_v2.c
index d8714f73f1..3d2a954547 100644
--- a/database/contexts/api_v2.c
+++ b/database/contexts/api_v2.c
@@ -729,6 +729,15 @@ static void agent_capabilities_to_json(BUFFER *wb, RRDHOST *host, const char *ke
freez(capas);
}
+static inline void host_dyncfg_to_json_v2(BUFFER *wb, const char *key, RRDHOST_STATUS *s) {
+ buffer_json_member_add_object(wb, key);
+ {
+ buffer_json_member_add_string(wb, "status", rrdhost_dyncfg_status_to_string(s->dyncfg.status));
+ }
+ buffer_json_object_close(wb); // health
+
+}
+
static inline void rrdhost_health_to_json_v2(BUFFER *wb, const char *key, RRDHOST_STATUS *s) {
buffer_json_member_add_object(wb, key);
{
@@ -841,6 +850,8 @@ static void rrdcontext_to_json_v2_rrdhost(BUFFER *wb, RRDHOST *host, struct rrdc
host_functions2json(host, wb); // functions
agent_capabilities_to_json(wb, host, "capabilities");
+
+ host_dyncfg_to_json_v2(wb, "dyncfg", &s);
}
buffer_json_object_close(wb); // this instance
buffer_json_array_close(wb); // instances
@@ -917,7 +928,7 @@ static ssize_t rrdcontext_to_json_v2_add_host(void *data, RRDHOST *host, bool qu
.node_ids = &ctl->nodes.ni,
.help = NULL,
.tags = NULL,
- .access = HTTP_ACCESS_MEMBERS,
+ .access = HTTP_ACCESS_MEMBER,
.priority = RRDFUNCTIONS_PRIORITY_DEFAULT,
};
host_functions_to_dict(host, ctl->functions.dict, &t, sizeof(t), &t.help, &t.tags, &t.access, &t.priority);
diff --git a/database/rrd.h b/database/rrd.h
index 63bafc6f8b..b63a7e8340 100644
--- a/database/rrd.h
+++ b/database/rrd.h
@@ -1358,8 +1358,6 @@ struct rrdhost {
netdata_mutex_t aclk_state_lock;
aclk_rrdhost_state aclk_state;
- DICTIONARY *configurable_plugins; // configurable plugins for this host
-
struct rrdhost *next;
struct rrdhost *prev;
};
diff --git a/database/rrdcalc.c b/database/rrdcalc.c
index 199d908030..2fabcb54c5 100644
--- a/database/rrdcalc.c
+++ b/database/rrdcalc.c
@@ -190,11 +190,11 @@ const RRDCALC_ACQUIRED *rrdcalc_from_rrdset_get(RRDSET *st, const char *alert_na
char key[RRDCALC_MAX_KEY_SIZE + 1];
size_t key_len = rrdcalc_key(key, RRDCALC_MAX_KEY_SIZE, rrdset_id(st), alert_name);
- const RRDCALC_ACQUIRED *rca = (const RRDCALC_ACQUIRED *)dictionary_get_and_acquire_item_advanced(st->rrdhost->rrdcalc_root_index, key, (ssize_t)(key_len + 1));
+ const RRDCALC_ACQUIRED *rca = (const RRDCALC_ACQUIRED *)dictionary_get_and_acquire_item_advanced(st->rrdhost->rrdcalc_root_index, key, (ssize_t)key_len);
if(!rca) {
key_len = rrdcalc_key(key, RRDCALC_MAX_KEY_SIZE, rrdset_name(st), alert_name);
- rca = (const RRDCALC_ACQUIRED *)dictionary_get_and_acquire_item_advanced(st->rrdhost->rrdcalc_root_index, key, (ssize_t)(key_len + 1));
+ rca = (const RRDCALC_ACQUIRED *)dictionary_get_and_acquire_item_advanced(st->rrdhost->rrdcalc_root_index, key, (ssize_t)key_len);
}
return rca;
@@ -727,7 +727,7 @@ void rrdcalc_add_from_rrdcalctemplate(RRDHOST *host, RRDCALCTEMPLATE *rt, RRDSET
.existing_from_template = false,
};
- dictionary_set_advanced(host->rrdcalc_root_index, key, (ssize_t)(key_len + 1), NULL, sizeof(RRDCALC), &tmp);
+ dictionary_set_advanced(host->rrdcalc_root_index, key, (ssize_t)key_len, NULL, sizeof(RRDCALC), &tmp);
if(tmp.react_action != RRDCALC_REACT_NEW && tmp.existing_from_template == false)
netdata_log_error("RRDCALC: from template '%s' on chart '%s' with key '%s', failed to be added to host '%s'. It is manually configured.",
string2str(rt->name), rrdset_id(st), key, rrdhost_hostname(host));
@@ -761,7 +761,7 @@ int rrdcalc_add_from_config(RRDHOST *host, RRDCALC *rc) {
};
int ret = 1;
- RRDCALC *t = dictionary_set_advanced(host->rrdcalc_root_index, key, (ssize_t)(key_len + 1), rc, sizeof(RRDCALC), &tmp);
+ RRDCALC *t = dictionary_set_advanced(host->rrdcalc_root_index, key, (ssize_t)key_len, rc, sizeof(RRDCALC), &tmp);
if(tmp.react_action == RRDCALC_REACT_NEW) {
// we copied rc into the dictionary, so we have to free the container here
freez(rc);
@@ -795,7 +795,7 @@ static void rrdcalc_unlink_and_delete(RRDHOST *host, RRDCALC *rc, bool having_ll
if(rc->rrdset)
rrdcalc_unlink_from_rrdset(rc, having_ll_wrlock);
- dictionary_del_advanced(host->rrdcalc_root_index, string2str(rc->key), (ssize_t)string_strlen(rc->key) + 1);
+ dictionary_del_advanced(host->rrdcalc_root_index, string2str(rc->key), (ssize_t)string_strlen(rc->key));
}
diff --git a/database/rrdcalctemplate.c b/database/rrdcalctemplate.c
index f0e5da80bf..27971857bc 100644
--- a/database/rrdcalctemplate.c
+++ b/database/rrdcalctemplate.c
@@ -231,7 +231,7 @@ void rrdcalctemplate_add_from_config(RRDHOST *host, RRDCALCTEMPLATE *rt) {
size_t key_len = snprintfz(key, RRDCALCTEMPLATE_MAX_KEY_SIZE, "%s", rrdcalctemplate_name(rt));
bool added = false;
- dictionary_set_advanced(host->rrdcalctemplate_root_index, key, (ssize_t)(key_len + 1), rt, sizeof(*rt), &added);
+ dictionary_set_advanced(host->rrdcalctemplate_root_index, key, (ssize_t)key_len, rt, sizeof(*rt), &added);
if(added)
freez(rt);
diff --git a/database/rrdcollector-internals.h b/database/rrdcollector-internals.h
new file mode 100644
index 0000000000..d63ef6a76b
--- /dev/null
+++ b/database/rrdcollector-internals.h
@@ -0,0 +1,17 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_RRDCOLLECTOR_INTERNALS_H
+#define NETDATA_RRDCOLLECTOR_INTERNALS_H
+
+#include "rrd.h"
+
+struct rrd_collector;
+struct rrd_collector *rrd_collector_acquire_current_thread(void);
+void rrd_collector_release(struct rrd_collector *rdc);
+extern __thread struct rrd_collector *thread_rrd_collector;
+bool rrd_collector_running(struct rrd_collector *rdc);
+pid_t rrd_collector_tid(struct rrd_collector *rdc);
+bool rrd_collector_dispatcher_acquire(struct rrd_collector *rdc);
+void rrd_collector_dispatcher_release(struct rrd_collector *rdc);
+
+#endif //NETDATA_RRDCOLLECTOR_INTERNALS_H
diff --git a/database/rrdcollector.c b/database/rrdcollector.c
index b776433c75..1a116c0c2e 100644
--- a/database/rrdcollector.c
+++ b/database/rrdcollector.c
@@ -1,7 +1,7 @@
// SPDX-License-Identifier: GPL-3.0-or-later
-#define NETDATA_RRDCOLLECTOR_INTERNALS
#include "rrdcollector.h"
+#include "rrdcollector-internals.h"
// Each function points to this collector structure
// so that when the collector exits, all of them will
diff --git a/database/rrdcollector.h b/database/rrdcollector.h
index aeab1de52e..f1bbcbb97b 100644
--- a/database/rrdcollector.h
+++ b/database/rrdcollector.h
@@ -5,22 +5,6 @@
#include "rrd.h"
-#ifdef NETDATA_RRDCOLLECTOR_INTERNALS
-
-// ----------------------------------------------------------------------------
-// private API
-
-struct rrd_collector;
-struct rrd_collector *rrd_collector_acquire_current_thread(void);
-void rrd_collector_release(struct rrd_collector *rdc);
-extern __thread struct rrd_collector *thread_rrd_collector;
-bool rrd_collector_running(struct rrd_collector *rdc);
-pid_t rrd_collector_tid(struct rrd_collector *rdc);
-bool rrd_collector_dispatcher_acquire(struct rrd_collector *rdc);
-void rrd_collector_dispatcher_release(struct rrd_collector *rdc);
-
-#endif // NETDATA_RRDCOLLECTOR_INTERNALS
-
// ----------------------------------------------------------------------------
// public API
diff --git a/database/rrddimvar.c b/database/rrddimvar.c
index 5035d70a55..63d1c84046 100644
--- a/database/rrddimvar.c
+++ b/database/rrddimvar.c
@@ -243,7 +243,7 @@ void rrddimvar_add_and_leave_released(RRDDIM *rd, RRDVAR_TYPE type, const char *
.value = value,
.rrddim = rd
};
- dictionary_set_advanced(rd->rrdset->rrddimvar_root_index, key, (ssize_t)(key_len + 1), NULL, sizeof(RRDDIMVAR), &tmp);
+ dictionary_set_advanced(rd->rrdset->rrddimvar_root_index, key, (ssize_t)key_len, NULL, sizeof(RRDDIMVAR), &tmp);
}
void rrddimvar_rename_all(RRDDIM *rd) {
diff --git a/database/rrdfunctions-exporters.c b/database/rrdfunctions-exporters.c
new file mode 100644
index 0000000000..bed17ed41a
--- /dev/null
+++ b/database/rrdfunctions-exporters.c
@@ -0,0 +1,164 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#define NETDATA_RRD_INTERNALS
+
+#include "rrdfunctions-internals.h"
+#include "rrdfunctions-exporters.h"
+
+void rrd_chart_functions_expose_rrdpush(RRDSET *st, BUFFER *wb) {
+ if(!st->functions_view)
+ return;
+
+ struct rrd_host_function *t;
+ dfe_start_read(st->functions_view, t) {
+ if(t->options & RRD_FUNCTION_DYNCFG) continue;
+
+ buffer_sprintf(wb
+ , PLUGINSD_KEYWORD_FUNCTION " \"%s\" %d \"%s\" \"%s\" \"%s\" %d\n"
+ , t_dfe.name
+ , t->timeout
+ , string2str(t->help)
+ , string2str(t->tags)
+ , http_id2access(t->access)
+ ,
+ t->priority
+ );
+ }
+ dfe_done(t);
+}
+
+void rrd_global_functions_expose_rrdpush(RRDHOST *host, BUFFER *wb, bool dyncfg) {
+ rrdhost_flag_clear(host, RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED);
+
+ size_t configs = 0;
+
+ struct rrd_host_function *tmp;
+ dfe_start_read(host->functions, tmp) {
+ if(tmp->options & RRD_FUNCTION_LOCAL) continue;
+ if(tmp->options & RRD_FUNCTION_DYNCFG) {
+ // we should not send dyncfg to this parent
+ configs++;
+ continue;
+ }
+
+ buffer_sprintf(wb
+ , PLUGINSD_KEYWORD_FUNCTION " GLOBAL \"%s\" %d \"%s\" \"%s\" \"%s\" %d\n"
+ , tmp_dfe.name
+ , tmp->timeout
+ , string2str(tmp->help)
+ , string2str(tmp->tags)
+ , http_id2access(tmp->access)
+ , tmp->priority
+ );
+ }
+ dfe_done(tmp);
+
+ if(dyncfg && configs)
+ dyncfg_add_streaming(wb);
+}
+
+static void functions2json(DICTIONARY *functions, BUFFER *wb) {
+ struct rrd_host_function *t;
+ dfe_start_read(functions, t) {
+ if (!rrd_collector_running(t->collector)) continue;
+ if(t->options & RRD_FUNCTION_DYNCFG) continue;
+
+ buffer_json_member_add_object(wb, t_dfe.name);
+ {
+ buffer_json_member_add_string_or_empty(wb, "help", string2str(t->help));
+ buffer_json_member_add_int64(wb, "timeout", (int64_t) t->timeout);
+
+ char options[65];
+ snprintfz(
+ options, 64
+ , "%s%s"
+ , (t->options & RRD_FUNCTION_LOCAL) ? "LOCAL " : ""
+ , (t->options & RRD_FUNCTION_GLOBAL) ? "GLOBAL" : ""
+ );
+
+ buffer_json_member_add_string_or_empty(wb, "options", options);
+ buffer_json_member_add_string_or_empty(wb, "tags", string2str(t->tags));
+ buffer_json_member_add_string(wb, "access", http_id2access(t->access));
+ buffer_json_member_add_uint64(wb, "priority", t->priority);
+ }
+ buffer_json_object_close(wb);
+ }
+ dfe_done(t);
+}
+
+void chart_functions2json(RRDSET *st, BUFFER *wb) {
+ if(!st || !st->functions_view) return;
+
+ functions2json(st->functions_view, wb);
+}
+
+void host_functions2json(RRDHOST *host, BUFFER *wb) {
+ if(!host || !host->functions) return;
+
+ buffer_json_member_add_object(wb, "functions");
+
+ struct rrd_host_function *t;
+ dfe_start_read(host->functions, t) {
+ if(!rrd_collector_running(t->collector)) continue;
+ if(t->options & RRD_FUNCTION_DYNCFG) continue;
+
+ buffer_json_member_add_object(wb, t_dfe.name);
+ {
+ buffer_json_member_add_string(wb, "help", string2str(t->help));
+ buffer_json_member_add_int64(wb, "timeout", t->timeout);
+ buffer_json_member_add_array(wb, "options");
+ {
+ if (t->options & RRD_FUNCTION_GLOBAL)
+ buffer_json_add_array_item_string(wb, "GLOBAL");
+ if (t->options & RRD_FUNCTION_LOCAL)
+ buffer_json_add_array_item_string(wb, "LOCAL");
+ }
+ buffer_json_array_close(wb);
+ buffer_json_member_add_string(wb, "tags", string2str(t->tags));
+ buffer_json_member_add_string(wb, "access", http_id2access(t->access));
+ buffer_json_member_add_uint64(wb, "priority", t->priority);
+ }
+ buffer_json_object_close(wb);
+ }
+ dfe_done(t);
+
+ buffer_json_object_close(wb);
+}
+
+void chart_functions_to_dict(DICTIONARY *rrdset_functions_view, DICTIONARY *dst, void *value, size_t value_size) {
+ if(!rrdset_functions_view || !dst) return;
+
+ struct rrd_host_function *t;
+ dfe_start_read(rrdset_functions_view, t) {
+ if(!rrd_collector_running(t->collector)) continue;
+ if(t->options & RRD_FUNCTION_DYNCFG) continue;
+
+ dictionary_set(dst, t_dfe.name, value, value_size);
+ }
+ dfe_done(t);
+}
+
+void host_functions_to_dict(RRDHOST *host, DICTIONARY *dst, void *value, size_t value_size, STRING **help, STRING **tags, HTTP_ACCESS *access, int *priority) {
+ if(!host || !host->functions || !dictionary_entries(host->functions) || !dst) return;
+
+ struct rrd_host_function *t;
+ dfe_start_read(host->functions, t) {
+ if(!rrd_collector_running(t->collector)) continue;
+ if(t->options & RRD_FUNCTION_DYNCFG) continue;
+
+ if(help)
+ *help = t->help;
+
+ if(tags)
+ *tags = t->tags;
+
+ if(access)
+ *access = t->access;
+
+ if(priority)
+ *priority = t->priority;
+
+ dictionary_set(dst, t_dfe.name, value, value_size);
+ }
+ dfe_done(t);
+}
diff --git a/database/rrdfunctions-exporters.h b/database/rrdfunctions-exporters.h
new file mode 100644
index 0000000000..9fb525cd98
--- /dev/null
+++ b/database/rrdfunctions-exporters.h
@@ -0,0 +1,16 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_RRDFUNCTIONS_EXPORTERS_H
+#define NETDATA_RRDFUNCTIONS_EXPORTERS_H
+
+#include "rrd.h"
+
+void rrd_chart_functions_expose_rrdpush(RRDSET *st, BUFFER *wb);
+void rrd_global_functions_expose_rrdpush(RRDHOST *host, BUFFER *wb, bool dyncfg);
+
+void chart_functions2json(RRDSET *st, BUFFER *wb);
+void chart_functions_to_dict(DICTIONARY *rrdset_functions_view, DICTIONARY *dst, void *value, size_t value_size);
+void host_functions_to_dict(RRDHOST *host, DICTIONARY *dst, void *value, size_t value_size, STRING **help, STRING **tags, HTTP_ACCESS *access, int *priority);
+void host_functions2json(RRDHOST *host, BUFFER *wb);
+
+#endif //NETDATA_RRDFUNCTIONS_EXPORTERS_H
diff --git a/database/rrdfunctions-inflight.c b/database/rrdfunctions-inflight.c
new file mode 100644
index 0000000000..a318c0d317
--- /dev/null
+++ b/database/rrdfunctions-inflight.c
@@ -0,0 +1,641 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#define NETDATA_RRD_INTERNALS
+
+#include "rrdcollector-internals.h"
+#include "rrdfunctions-internals.h"
+#include "rrdfunctions-inflight.h"
+
+struct rrd_function_inflight {
+ bool used;
+
+ RRDHOST *host;
+ uuid_t transaction_uuid;
+ const char *transaction;
+ const char *cmd;
+ const char *sanitized_cmd;
+ const char *source;
+ size_t sanitized_cmd_length;
+ int timeout;
+ bool cancelled;
+ usec_t stop_monotonic_ut;
+
+ BUFFER *payload;
+
+ const DICTIONARY_ITEM *host_function_acquired;
+
+ // the collector
+ // we acquire this structure at the beginning,
+ // and we release it at the end
+ struct rrd_host_function *rdcf;
+
+ struct {
+ BUFFER *wb;
+
+ // in async mode,
+ // the function to call to send the result back
+ rrd_function_result_callback_t cb;
+ void *data;
+ } result;
+
+ struct {
+ // to be called in sync mode
+ // while the function is running
+ // to check if the function has been canceled
+ rrd_function_is_cancelled_cb_t cb;
+ void *data;
+ } is_cancelled;
+
+ struct {
+ // to be registered by the function itself
+ // used to signal the function to cancel
+ rrd_function_cancel_cb_t cb;
+ void *data;
+ } canceller;
+
+ struct {
+ // callback to receive progress reports from function
+ rrd_function_progress_cb_t cb;
+ void *data;
+ } progress;
+
+ struct {
+ // to be registered by the function itself
+ // used to send progress requests to function
+ rrd_function_progresser_cb_t cb;
+ void *data;
+ } progresser;
+};
+
+static DICTIONARY *rrd_functions_inflight_requests = NULL;
+
+static void rrd_function_cancel_inflight(struct rrd_function_inflight *r);
+
+// ----------------------------------------------------------------------------
+
+static void rrd_functions_inflight_cleanup(struct rrd_function_inflight *r) {
+ buffer_free(r->payload);
+ freez((void *)r->transaction);
+ freez((void *)r->cmd);
+ freez((void *)r->sanitized_cmd);
+ freez((void *)r->source);
+
+ r->payload = NULL;
+ r->transaction = NULL;
+ r->cmd = NULL;
+ r->sanitized_cmd = NULL;
+}
+
+static void rrd_functions_inflight_delete_cb(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) {
+ struct rrd_function_inflight *r = value;
+
+ // internal_error(true, "FUNCTIONS: transaction '%s' finished", r->transaction);
+
+ rrd_functions_inflight_cleanup(r);
+ dictionary_acquired_item_release(r->host->functions, r->host_function_acquired);
+}
+
+void rrd_functions_inflight_init(void) {
+ if(rrd_functions_inflight_requests)
+ return;
+
+ rrd_functions_inflight_requests = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, NULL, sizeof(struct rrd_function_inflight));
+
+ dictionary_register_delete_callback(rrd_functions_inflight_requests, rrd_functions_inflight_delete_cb, NULL);
+}
+
+void rrd_functions_inflight_destroy(void) {
+ if(!rrd_functions_inflight_requests)
+ return;
+
+ dictionary_destroy(rrd_functions_inflight_requests);
+ rrd_functions_inflight_requests = NULL;
+}
+
+static void rrd_inflight_async_function_register_canceller_cb(void *register_canceller_cb_data, rrd_function_cancel_cb_t canceller_cb, void *canceller_cb_data) {
+ struct rrd_function_inflight *r = register_canceller_cb_data;
+ r->canceller.cb = canceller_cb;
+ r->canceller.data = canceller_cb_data;
+}
+
+static void rrd_inflight_async_function_register_progresser_cb(void *register_progresser_cb_data, rrd_function_progresser_cb_t progresser_cb, void *progresser_cb_data) {
+ struct rrd_function_inflight *r = register_progresser_cb_data;
+ r->progresser.cb = progresser_cb;
+ r->progresser.data = progresser_cb_data;
+}
+
+// ----------------------------------------------------------------------------
+// waiting for async function completion
+
+struct rrd_function_call_wait {
+ RRDHOST *host;
+ const DICTIONARY_ITEM *host_function_acquired;
+ char *transaction;
+
+ bool free_with_signal;
+ bool data_are_ready;
+ netdata_mutex_t mutex;
+ pthread_cond_t cond;
+ int code;
+};
+
+static void rrd_inflight_function_cleanup(RRDHOST *host __maybe_unused, const char *transaction) {
+ dictionary_del(rrd_functions_inflight_requests, transaction);
+ dictionary_garbage_collect(rrd_functions_inflight_requests);
+}
+
+static void rrd_function_call_wait_free(struct rrd_function_call_wait *tmp) {
+ rrd_inflight_function_cleanup(tmp->host, tmp->transaction);
+ freez(tmp->transaction);
+
+ pthread_cond_destroy(&tmp->cond);
+ netdata_mutex_destroy(&tmp->mutex);
+ freez(tmp);
+}
+
+static void rrd_async_function_signal_when_ready(BUFFER *temp_wb __maybe_unused, int code, void *callback_data) {
+ struct rrd_function_call_wait *tmp = callback_data;
+ bool we_should_free = false;
+
+ netdata_mutex_lock(&tmp->mutex);
+
+ // since we got the mutex,
+ // the waiting thread is either in pthread_cond_timedwait()
+ // or gave up and left.
+
+ tmp->code = code;
+ tmp->data_are_ready = true;
+
+ if(tmp->free_with_signal)
+ we_should_free = true;
+
+ pthread_cond_signal(&tmp->cond);
+
+ netdata_mutex_unlock(&tmp->mutex);
+
+ if(we_should_free) {
+ buffer_free(temp_wb);
+ rrd_function_call_wait_free(tmp);
+ }
+}
+
+static void rrd_inflight_async_function_nowait_finished(BUFFER *wb, int code, void *data) {
+ struct rrd_function_inflight *r = data;
+
+ if(r->result.cb)
+ r->result.cb(wb, code, r->result.data);
+
+ rrd_inflight_function_cleanup(r->host, r->transaction);
+}
+
+static bool rrd_inflight_async_function_is_cancelled(void *data) {
+ struct rrd_function_inflight *r = data;
+ return __atomic_load_n(&r->cancelled, __ATOMIC_RELAXED);
+}
+
+static inline int rrd_call_function_async_and_dont_wait(struct rrd_function_inflight *r) {
+ struct rrd_function_execute rfe = {
+ .transaction = &r->transaction_uuid,
+ .function = r->sanitized_cmd,
+ .payload = r->payload,
+ .source = r->source,
+ .stop_monotonic_ut = &r->stop_monotonic_ut,
+ .result = {
+ .wb = r->result.wb,
+ .cb = rrd_inflight_async_function_nowait_finished,
+ .data = r,
+ },
+ .progress = {
+ .cb = r->progress.cb,
+ .data = r->progress.data,
+ },
+ .is_cancelled = {
+ .cb = rrd_inflight_async_function_is_cancelled,
+ .data = r,
+ },
+ .register_canceller = {
+ .cb = rrd_inflight_async_function_register_canceller_cb,
+ .data = r,
+ },
+ .register_progresser = {
+ .cb = rrd_inflight_async_function_register_progresser_cb,
+ .data = r,
+ },
+ };
+ int code = r->rdcf->execute_cb(&rfe, r->rdcf->execute_cb_data);
+
+ return code;
+}
+
+static int rrd_call_function_async_and_wait(struct rrd_function_inflight *r) {
+ struct rrd_function_call_wait *tmp = mallocz(sizeof(struct rrd_function_call_wait));
+ tmp->free_with_signal = false;
+ tmp->data_are_ready = false;
+ tmp->host = r->host;
+ tmp->host_function_acquired = r->host_function_acquired;
+ tmp->transaction = strdupz(r->transaction);
+ netdata_mutex_init(&tmp->mutex);
+ pthread_cond_init(&tmp->cond, NULL);
+
+ // we need a temporary BUFFER, because we may time out and the caller supplied one may vanish,
+ // so we create a new one we guarantee will survive until the collector finishes...
+
+ bool we_should_free = false;
+ BUFFER *temp_wb = buffer_create(1024, &netdata_buffers_statistics.buffers_functions); // we need it because we may give up on it
+ temp_wb->content_type = r->result.wb->content_type;
+
+ struct rrd_function_execute rfe = {
+ .transaction = &r->transaction_uuid,
+ .function = r->sanitized_cmd,
+ .payload = r->payload,
+ .source = r->source,
+ .stop_monotonic_ut = &r->stop_monotonic_ut,
+ .result = {
+ .wb = temp_wb,
+
+ // we overwrite the result callbacks,
+ // so that we can clean up the allocations made
+ .cb = rrd_async_function_signal_when_ready,
+ .data = tmp,
+ },
+ .progress = {
+ .cb = r->progress.cb,
+ .data = r->progress.data,
+ },
+ .is_cancelled = {
+ .cb = rrd_inflight_async_function_is_cancelled,
+ .data = r,
+ },
+ .register_canceller = {
+ .cb = rrd_inflight_async_function_register_canceller_cb,
+ .data = r,
+ },
+ .register_progresser = {
+ .cb = rrd_inflight_async_function_register_progresser_cb,
+ .data = r,
+ },
+ };
+ int code = r->rdcf->execute_cb(&rfe, r->rdcf->execute_cb_data);
+
+ // this has to happen after we execute the callback
+ // because if an async call is responded in sync mode, there will be a deadlock.
+ netdata_mutex_lock(&tmp->mutex);
+
+ if (code == HTTP_RESP_OK || tmp->data_are_ready) {
+ bool cancelled = false;
+ int rc = 0;
+ while (rc == 0 && !cancelled && !tmp->data_are_ready) {
+ usec_t now_mono_ut = now_monotonic_usec();
+ usec_t stop_mono_ut = __atomic_load_n(&r->stop_monotonic_ut, __ATOMIC_RELAXED) + RRDFUNCTIONS_TIMEOUT_EXTENSION_UT;
+ if(now_mono_ut > stop_mono_ut) {
+ rc = ETIMEDOUT;
+ break;
+ }
+
+ // wait for 10ms, and loop again...
+ struct timespec tp;
+ clock_gettime(CLOCK_REALTIME, &tp);
+ tp.tv_nsec += 10 * NSEC_PER_MSEC;
+ if(tp.tv_nsec > (long)(1 * NSEC_PER_SEC)) {
+ tp.tv_sec++;
+ tp.tv_nsec -= 1 * NSEC_PER_SEC;
+ }
+
+ // the mutex is unlocked within pthread_cond_timedwait()
+ rc = pthread_cond_timedwait(&tmp->cond, &tmp->mutex, &tp);
+ // the mutex is again ours
+
+ if(rc == ETIMEDOUT) {
+ // 10ms have passed
+
+ rc = 0;
+