diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2024-01-11 16:56:45 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-11 16:56:45 +0200 |
commit | f2b250a1f53af00241522db35f8c85f19ed282e1 (patch) | |
tree | e813ca47880e3e2adf09583658efef59633ec6bd /database | |
parent | bead543ea52e51cf73f7e5b27de53197801399a7 (diff) |
dyncfg v2 (#16702)
* split rrdfunctions streaming and progress
* simplified internal inline functions API
* split rrdfunctions inflight management
* split rrd functions exporters
* renames
* base dyncfg structure
* config pluginsd
* intercept dyncfg function calls
* loading and saving of dyncfg metadata and data
* save metadata and payload to a single file; added code to update the plugins with jobs and saved configs
* basic working unit test
* added payload to functions execution
* removed old dyncfg code that is not needed any more
* more cleanup
* cleanup sender for functions with payload
* dyncfg functions are not exposed as functions
* remaining work to avoid indexing the \0 terminating character in dictionary keys
* added back old dyncfg plugins.d commands as noop, to allow plugins to continue working
* working api; working streaming;
* updated plugins.d documentation
* aclk and http api requests share the same header parsing logic
* added source type internal
* fixed crashes
* added god mode for tests
* fixes
* fixed messages
* save host machine guids to configs
* cleaner manipulation of supported commands
* the functions event loop for external plugins can now process dyncfg requests
* unified internal and external plugins dyncfg API
* Netdata serves schema requests from /etc/netdata/schema.d and /var/lib/netdata/conf.d/schema.d
* cleanup and various fixes; fixed bug in previous dyncfg implementation on streaming that was sending the payload in a way that allowed other streaming commands to be multiplexed
* internals go to a separate header file
* fix duplicate ACLK requests sent by aclk queue mechanism
* use fstat instead of stat
* working api
* plugin actions renamed to create and delete; dyncfg files are removed only from user actions
* prevent deadlock by using the react callback
* fix for string_strndupz()
* better dyncfg unittests
* more tests at the unittests
* properly detect dyncfg functions
* hide config functions from the UI
* tree response improvements
* send the initial update with payload
* determine tty using stdout, not stderr
* changes to statuses, cleanup and the code to bring all business logic into interception
* do not crash when the status is empty
* functions now propagate the source of the requests to plugins
* avoid warning about unused functions
* in the count of items for attention, do not count the orphan entries
* save source into dyncfg
* make the list null terminated
* fixed invalid comparison
* prevent memory leak on duplicated headers; log x-forwarded-for
* more unit tests
* added dyncfg unittests into the default unittests
* more unit tests and fixes
* more unit tests and fixes
* fix dictionary unittests
* config functions require admin access
Diffstat (limited to 'database')
26 files changed, 1753 insertions, 1497 deletions
diff --git a/database/contexts/api_v2.c b/database/contexts/api_v2.c index d8714f73f1..3d2a954547 100644 --- a/database/contexts/api_v2.c +++ b/database/contexts/api_v2.c @@ -729,6 +729,15 @@ static void agent_capabilities_to_json(BUFFER *wb, RRDHOST *host, const char *ke freez(capas); } +static inline void host_dyncfg_to_json_v2(BUFFER *wb, const char *key, RRDHOST_STATUS *s) { + buffer_json_member_add_object(wb, key); + { + buffer_json_member_add_string(wb, "status", rrdhost_dyncfg_status_to_string(s->dyncfg.status)); + } + buffer_json_object_close(wb); // health + +} + static inline void rrdhost_health_to_json_v2(BUFFER *wb, const char *key, RRDHOST_STATUS *s) { buffer_json_member_add_object(wb, key); { @@ -841,6 +850,8 @@ static void rrdcontext_to_json_v2_rrdhost(BUFFER *wb, RRDHOST *host, struct rrdc host_functions2json(host, wb); // functions agent_capabilities_to_json(wb, host, "capabilities"); + + host_dyncfg_to_json_v2(wb, "dyncfg", &s); } buffer_json_object_close(wb); // this instance buffer_json_array_close(wb); // instances @@ -917,7 +928,7 @@ static ssize_t rrdcontext_to_json_v2_add_host(void *data, RRDHOST *host, bool qu .node_ids = &ctl->nodes.ni, .help = NULL, .tags = NULL, - .access = HTTP_ACCESS_MEMBERS, + .access = HTTP_ACCESS_MEMBER, .priority = RRDFUNCTIONS_PRIORITY_DEFAULT, }; host_functions_to_dict(host, ctl->functions.dict, &t, sizeof(t), &t.help, &t.tags, &t.access, &t.priority); diff --git a/database/rrd.h b/database/rrd.h index 63bafc6f8b..b63a7e8340 100644 --- a/database/rrd.h +++ b/database/rrd.h @@ -1358,8 +1358,6 @@ struct rrdhost { netdata_mutex_t aclk_state_lock; aclk_rrdhost_state aclk_state; - DICTIONARY *configurable_plugins; // configurable plugins for this host - struct rrdhost *next; struct rrdhost *prev; }; diff --git a/database/rrdcalc.c b/database/rrdcalc.c index 199d908030..2fabcb54c5 100644 --- a/database/rrdcalc.c +++ b/database/rrdcalc.c @@ -190,11 +190,11 @@ const RRDCALC_ACQUIRED *rrdcalc_from_rrdset_get(RRDSET *st, 
const char *alert_na char key[RRDCALC_MAX_KEY_SIZE + 1]; size_t key_len = rrdcalc_key(key, RRDCALC_MAX_KEY_SIZE, rrdset_id(st), alert_name); - const RRDCALC_ACQUIRED *rca = (const RRDCALC_ACQUIRED *)dictionary_get_and_acquire_item_advanced(st->rrdhost->rrdcalc_root_index, key, (ssize_t)(key_len + 1)); + const RRDCALC_ACQUIRED *rca = (const RRDCALC_ACQUIRED *)dictionary_get_and_acquire_item_advanced(st->rrdhost->rrdcalc_root_index, key, (ssize_t)key_len); if(!rca) { key_len = rrdcalc_key(key, RRDCALC_MAX_KEY_SIZE, rrdset_name(st), alert_name); - rca = (const RRDCALC_ACQUIRED *)dictionary_get_and_acquire_item_advanced(st->rrdhost->rrdcalc_root_index, key, (ssize_t)(key_len + 1)); + rca = (const RRDCALC_ACQUIRED *)dictionary_get_and_acquire_item_advanced(st->rrdhost->rrdcalc_root_index, key, (ssize_t)key_len); } return rca; @@ -727,7 +727,7 @@ void rrdcalc_add_from_rrdcalctemplate(RRDHOST *host, RRDCALCTEMPLATE *rt, RRDSET .existing_from_template = false, }; - dictionary_set_advanced(host->rrdcalc_root_index, key, (ssize_t)(key_len + 1), NULL, sizeof(RRDCALC), &tmp); + dictionary_set_advanced(host->rrdcalc_root_index, key, (ssize_t)key_len, NULL, sizeof(RRDCALC), &tmp); if(tmp.react_action != RRDCALC_REACT_NEW && tmp.existing_from_template == false) netdata_log_error("RRDCALC: from template '%s' on chart '%s' with key '%s', failed to be added to host '%s'. 
It is manually configured.", string2str(rt->name), rrdset_id(st), key, rrdhost_hostname(host)); @@ -761,7 +761,7 @@ int rrdcalc_add_from_config(RRDHOST *host, RRDCALC *rc) { }; int ret = 1; - RRDCALC *t = dictionary_set_advanced(host->rrdcalc_root_index, key, (ssize_t)(key_len + 1), rc, sizeof(RRDCALC), &tmp); + RRDCALC *t = dictionary_set_advanced(host->rrdcalc_root_index, key, (ssize_t)key_len, rc, sizeof(RRDCALC), &tmp); if(tmp.react_action == RRDCALC_REACT_NEW) { // we copied rc into the dictionary, so we have to free the container here freez(rc); @@ -795,7 +795,7 @@ static void rrdcalc_unlink_and_delete(RRDHOST *host, RRDCALC *rc, bool having_ll if(rc->rrdset) rrdcalc_unlink_from_rrdset(rc, having_ll_wrlock); - dictionary_del_advanced(host->rrdcalc_root_index, string2str(rc->key), (ssize_t)string_strlen(rc->key) + 1); + dictionary_del_advanced(host->rrdcalc_root_index, string2str(rc->key), (ssize_t)string_strlen(rc->key)); } diff --git a/database/rrdcalctemplate.c b/database/rrdcalctemplate.c index f0e5da80bf..27971857bc 100644 --- a/database/rrdcalctemplate.c +++ b/database/rrdcalctemplate.c @@ -231,7 +231,7 @@ void rrdcalctemplate_add_from_config(RRDHOST *host, RRDCALCTEMPLATE *rt) { size_t key_len = snprintfz(key, RRDCALCTEMPLATE_MAX_KEY_SIZE, "%s", rrdcalctemplate_name(rt)); bool added = false; - dictionary_set_advanced(host->rrdcalctemplate_root_index, key, (ssize_t)(key_len + 1), rt, sizeof(*rt), &added); + dictionary_set_advanced(host->rrdcalctemplate_root_index, key, (ssize_t)key_len, rt, sizeof(*rt), &added); if(added) freez(rt); diff --git a/database/rrdcollector-internals.h b/database/rrdcollector-internals.h new file mode 100644 index 0000000000..d63ef6a76b --- /dev/null +++ b/database/rrdcollector-internals.h @@ -0,0 +1,17 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_RRDCOLLECTOR_INTERNALS_H +#define NETDATA_RRDCOLLECTOR_INTERNALS_H + +#include "rrd.h" + +struct rrd_collector; +struct rrd_collector 
*rrd_collector_acquire_current_thread(void); +void rrd_collector_release(struct rrd_collector *rdc); +extern __thread struct rrd_collector *thread_rrd_collector; +bool rrd_collector_running(struct rrd_collector *rdc); +pid_t rrd_collector_tid(struct rrd_collector *rdc); +bool rrd_collector_dispatcher_acquire(struct rrd_collector *rdc); +void rrd_collector_dispatcher_release(struct rrd_collector *rdc); + +#endif //NETDATA_RRDCOLLECTOR_INTERNALS_H diff --git a/database/rrdcollector.c b/database/rrdcollector.c index b776433c75..1a116c0c2e 100644 --- a/database/rrdcollector.c +++ b/database/rrdcollector.c @@ -1,7 +1,7 @@ // SPDX-License-Identifier: GPL-3.0-or-later -#define NETDATA_RRDCOLLECTOR_INTERNALS #include "rrdcollector.h" +#include "rrdcollector-internals.h" // Each function points to this collector structure // so that when the collector exits, all of them will diff --git a/database/rrdcollector.h b/database/rrdcollector.h index aeab1de52e..f1bbcbb97b 100644 --- a/database/rrdcollector.h +++ b/database/rrdcollector.h @@ -5,22 +5,6 @@ #include "rrd.h" -#ifdef NETDATA_RRDCOLLECTOR_INTERNALS - -// ---------------------------------------------------------------------------- -// private API - -struct rrd_collector; -struct rrd_collector *rrd_collector_acquire_current_thread(void); -void rrd_collector_release(struct rrd_collector *rdc); -extern __thread struct rrd_collector *thread_rrd_collector; -bool rrd_collector_running(struct rrd_collector *rdc); -pid_t rrd_collector_tid(struct rrd_collector *rdc); -bool rrd_collector_dispatcher_acquire(struct rrd_collector *rdc); -void rrd_collector_dispatcher_release(struct rrd_collector *rdc); - -#endif // NETDATA_RRDCOLLECTOR_INTERNALS - // ---------------------------------------------------------------------------- // public API diff --git a/database/rrddimvar.c b/database/rrddimvar.c index 5035d70a55..63d1c84046 100644 --- a/database/rrddimvar.c +++ b/database/rrddimvar.c @@ -243,7 +243,7 @@ void 
rrddimvar_add_and_leave_released(RRDDIM *rd, RRDVAR_TYPE type, const char * .value = value, .rrddim = rd }; - dictionary_set_advanced(rd->rrdset->rrddimvar_root_index, key, (ssize_t)(key_len + 1), NULL, sizeof(RRDDIMVAR), &tmp); + dictionary_set_advanced(rd->rrdset->rrddimvar_root_index, key, (ssize_t)key_len, NULL, sizeof(RRDDIMVAR), &tmp); } void rrddimvar_rename_all(RRDDIM *rd) { diff --git a/database/rrdfunctions-exporters.c b/database/rrdfunctions-exporters.c new file mode 100644 index 0000000000..bed17ed41a --- /dev/null +++ b/database/rrdfunctions-exporters.c @@ -0,0 +1,164 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#define NETDATA_RRD_INTERNALS + +#include "rrdfunctions-internals.h" +#include "rrdfunctions-exporters.h" + +void rrd_chart_functions_expose_rrdpush(RRDSET *st, BUFFER *wb) { + if(!st->functions_view) + return; + + struct rrd_host_function *t; + dfe_start_read(st->functions_view, t) { + if(t->options & RRD_FUNCTION_DYNCFG) continue; + + buffer_sprintf(wb + , PLUGINSD_KEYWORD_FUNCTION " \"%s\" %d \"%s\" \"%s\" \"%s\" %d\n" + , t_dfe.name + , t->timeout + , string2str(t->help) + , string2str(t->tags) + , http_id2access(t->access) + , + t->priority + ); + } + dfe_done(t); +} + +void rrd_global_functions_expose_rrdpush(RRDHOST *host, BUFFER *wb, bool dyncfg) { + rrdhost_flag_clear(host, RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED); + + size_t configs = 0; + + struct rrd_host_function *tmp; + dfe_start_read(host->functions, tmp) { + if(tmp->options & RRD_FUNCTION_LOCAL) continue; + if(tmp->options & RRD_FUNCTION_DYNCFG) { + // we should not send dyncfg to this parent + configs++; + continue; + } + + buffer_sprintf(wb + , PLUGINSD_KEYWORD_FUNCTION " GLOBAL \"%s\" %d \"%s\" \"%s\" \"%s\" %d\n" + , tmp_dfe.name + , tmp->timeout + , string2str(tmp->help) + , string2str(tmp->tags) + , http_id2access(tmp->access) + , tmp->priority + ); + } + dfe_done(tmp); + + if(dyncfg && configs) + dyncfg_add_streaming(wb); +} + +static void functions2json(DICTIONARY 
*functions, BUFFER *wb) { + struct rrd_host_function *t; + dfe_start_read(functions, t) { + if (!rrd_collector_running(t->collector)) continue; + if(t->options & RRD_FUNCTION_DYNCFG) continue; + + buffer_json_member_add_object(wb, t_dfe.name); + { + buffer_json_member_add_string_or_empty(wb, "help", string2str(t->help)); + buffer_json_member_add_int64(wb, "timeout", (int64_t) t->timeout); + + char options[65]; + snprintfz( + options, 64 + , "%s%s" + , (t->options & RRD_FUNCTION_LOCAL) ? "LOCAL " : "" + , (t->options & RRD_FUNCTION_GLOBAL) ? "GLOBAL" : "" + ); + + buffer_json_member_add_string_or_empty(wb, "options", options); + buffer_json_member_add_string_or_empty(wb, "tags", string2str(t->tags)); + buffer_json_member_add_string(wb, "access", http_id2access(t->access)); + buffer_json_member_add_uint64(wb, "priority", t->priority); + } + buffer_json_object_close(wb); + } + dfe_done(t); +} + +void chart_functions2json(RRDSET *st, BUFFER *wb) { + if(!st || !st->functions_view) return; + + functions2json(st->functions_view, wb); +} + +void host_functions2json(RRDHOST *host, BUFFER *wb) { + if(!host || !host->functions) return; + + buffer_json_member_add_object(wb, "functions"); + + struct rrd_host_function *t; + dfe_start_read(host->functions, t) { + if(!rrd_collector_running(t->collector)) continue; + if(t->options & RRD_FUNCTION_DYNCFG) continue; + + buffer_json_member_add_object(wb, t_dfe.name); + { + buffer_json_member_add_string(wb, "help", string2str(t->help)); + buffer_json_member_add_int64(wb, "timeout", t->timeout); + buffer_json_member_add_array(wb, "options"); + { + if (t->options & RRD_FUNCTION_GLOBAL) + buffer_json_add_array_item_string(wb, "GLOBAL"); + if (t->options & RRD_FUNCTION_LOCAL) + buffer_json_add_array_item_string(wb, "LOCAL"); + } + buffer_json_array_close(wb); + buffer_json_member_add_string(wb, "tags", string2str(t->tags)); + buffer_json_member_add_string(wb, "access", http_id2access(t->access)); + buffer_json_member_add_uint64(wb, 
"priority", t->priority); + } + buffer_json_object_close(wb); + } + dfe_done(t); + + buffer_json_object_close(wb); +} + +void chart_functions_to_dict(DICTIONARY *rrdset_functions_view, DICTIONARY *dst, void *value, size_t value_size) { + if(!rrdset_functions_view || !dst) return; + + struct rrd_host_function *t; + dfe_start_read(rrdset_functions_view, t) { + if(!rrd_collector_running(t->collector)) continue; + if(t->options & RRD_FUNCTION_DYNCFG) continue; + + dictionary_set(dst, t_dfe.name, value, value_size); + } + dfe_done(t); +} + +void host_functions_to_dict(RRDHOST *host, DICTIONARY *dst, void *value, size_t value_size, STRING **help, STRING **tags, HTTP_ACCESS *access, int *priority) { + if(!host || !host->functions || !dictionary_entries(host->functions) || !dst) return; + + struct rrd_host_function *t; + dfe_start_read(host->functions, t) { + if(!rrd_collector_running(t->collector)) continue; + if(t->options & RRD_FUNCTION_DYNCFG) continue; + + if(help) + *help = t->help; + + if(tags) + *tags = t->tags; + + if(access) + *access = t->access; + + if(priority) + *priority = t->priority; + + dictionary_set(dst, t_dfe.name, value, value_size); + } + dfe_done(t); +} diff --git a/database/rrdfunctions-exporters.h b/database/rrdfunctions-exporters.h new file mode 100644 index 0000000000..9fb525cd98 --- /dev/null +++ b/database/rrdfunctions-exporters.h @@ -0,0 +1,16 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_RRDFUNCTIONS_EXPORTERS_H +#define NETDATA_RRDFUNCTIONS_EXPORTERS_H + +#include "rrd.h" + +void rrd_chart_functions_expose_rrdpush(RRDSET *st, BUFFER *wb); +void rrd_global_functions_expose_rrdpush(RRDHOST *host, BUFFER *wb, bool dyncfg); + +void chart_functions2json(RRDSET *st, BUFFER *wb); +void chart_functions_to_dict(DICTIONARY *rrdset_functions_view, DICTIONARY *dst, void *value, size_t value_size); +void host_functions_to_dict(RRDHOST *host, DICTIONARY *dst, void *value, size_t value_size, STRING **help, STRING **tags, HTTP_ACCESS 
*access, int *priority); +void host_functions2json(RRDHOST *host, BUFFER *wb); + +#endif //NETDATA_RRDFUNCTIONS_EXPORTERS_H diff --git a/database/rrdfunctions-inflight.c b/database/rrdfunctions-inflight.c new file mode 100644 index 0000000000..a318c0d317 --- /dev/null +++ b/database/rrdfunctions-inflight.c @@ -0,0 +1,641 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#define NETDATA_RRD_INTERNALS + +#include "rrdcollector-internals.h" +#include "rrdfunctions-internals.h" +#include "rrdfunctions-inflight.h" + +struct rrd_function_inflight { + bool used; + + RRDHOST *host; + uuid_t transaction_uuid; + const char *transaction; + const char *cmd; + const char *sanitized_cmd; + const char *source; + size_t sanitized_cmd_length; + int timeout; + bool cancelled; + usec_t stop_monotonic_ut; + + BUFFER *payload; + + const DICTIONARY_ITEM *host_function_acquired; + + // the collector + // we acquire this structure at the beginning, + // and we release it at the end + struct rrd_host_function *rdcf; + + struct { + BUFFER *wb; + + // in async mode, + // the function to call to send the result back + rrd_function_result_callback_t cb; + void *data; + } result; + + struct { + // to be called in sync mode + // while the function is running + // to check if the function has been canceled + rrd_function_is_cancelled_cb_t cb; + void *data; + } is_cancelled; + + struct { + // to be registered by the function itself + // used to signal the function to cancel + rrd_function_cancel_cb_t cb; + void *data; + } canceller; + + struct { + // callback to receive progress reports from function + rrd_function_progress_cb_t cb; + void *data; + } progress; + + struct { + // to be registered by the function itself + // used to send progress requests to function + rrd_function_progresser_cb_t cb; + void *data; + } progresser; +}; + +static DICTIONARY *rrd_functions_inflight_requests = NULL; + +static void rrd_function_cancel_inflight(struct rrd_function_inflight *r); + +// 
---------------------------------------------------------------------------- + +static void rrd_functions_inflight_cleanup(struct rrd_function_inflight *r) { + buffer_free(r->payload); + freez((void *)r->transaction); + freez((void *)r->cmd); + freez((void *)r->sanitized_cmd); + freez((void *)r->source); + + r->payload = NULL; + r->transaction = NULL; + r->cmd = NULL; + r->sanitized_cmd = NULL; +} + +static void rrd_functions_inflight_delete_cb(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) { + struct rrd_function_inflight *r = value; + + // internal_error(true, "FUNCTIONS: transaction '%s' finished", r->transaction); + + rrd_functions_inflight_cleanup(r); + dictionary_acquired_item_release(r->host->functions, r->host_function_acquired); +} + +void rrd_functions_inflight_init(void) { + if(rrd_functions_inflight_requests) + return; + + rrd_functions_inflight_requests = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, NULL, sizeof(struct rrd_function_inflight)); + + dictionary_register_delete_callback(rrd_functions_inflight_requests, rrd_functions_inflight_delete_cb, NULL); +} + +void rrd_functions_inflight_destroy(void) { + if(!rrd_functions_inflight_requests) + return; + + dictionary_destroy(rrd_functions_inflight_requests); + rrd_functions_inflight_requests = NULL; +} + +static void rrd_inflight_async_function_register_canceller_cb(void *register_canceller_cb_data, rrd_function_cancel_cb_t canceller_cb, void *canceller_cb_data) { + struct rrd_function_inflight *r = register_canceller_cb_data; + r->canceller.cb = canceller_cb; + r->canceller.data = canceller_cb_data; +} + +static void rrd_inflight_async_function_register_progresser_cb(void *register_progresser_cb_data, rrd_function_progresser_cb_t progresser_cb, void *progresser_cb_data) { + struct rrd_function_inflight *r = register_progresser_cb_data; + r->progresser.cb = progresser_cb; + r->progresser.data = progresser_cb_data; +} + +// 
---------------------------------------------------------------------------- +// waiting for async function completion + +struct rrd_function_call_wait { + RRDHOST *host; + const DICTIONARY_ITEM *host_function_acquired; + char *transaction; + + bool free_with_signal; + bool data_are_ready; + netdata_mutex_t mutex; + pthread_cond_t cond; + int code; +}; + +static void rrd_inflight_function_cleanup(RRDHOST *host __maybe_unused, const char *transaction) { + dictionary_del(rrd_functions_inflight_requests, transaction); + dictionary_garbage_collect(rrd_functions_inflight_requests); +} + +static void rrd_function_call_wait_free(struct rrd_function_call_wait *tmp) { + rrd_inflight_function_cleanup(tmp->host, tmp->transaction); + freez(tmp->transaction); + + pthread_cond_destroy(&tmp->cond); + netdata_mutex_destroy(&tmp->mutex); + freez(tmp); +} + +static void rrd_async_function_signal_when_ready(BUFFER *temp_wb __maybe_unused, int code, void *callback_data) { + struct rrd_function_call_wait *tmp = callback_data; + bool we_should_free = false; + + netdata_mutex_lock(&tmp->mutex); + + // since we got the mutex, + // the waiting thread is either in pthread_cond_timedwait() + // or gave up and left. 
+ + tmp->code = code; + tmp->data_are_ready = true; + + if(tmp->free_with_signal) + we_should_free = true; + + pthread_cond_signal(&tmp->cond); + + netdata_mutex_unlock(&tmp->mutex); + + if(we_should_free) { + buffer_free(temp_wb); + rrd_function_call_wait_free(tmp); + } +} + +static void rrd_inflight_async_function_nowait_finished(BUFFER *wb, int code, void *data) { + struct rrd_function_inflight *r = data; + + if(r->result.cb) + r->result.cb(wb, code, r->result.data); + + rrd_inflight_function_cleanup(r->host, r->transaction); +} + +static bool rrd_inflight_async_function_is_cancelled(void *data) { + struct rrd_function_inflight *r = data; + return __atomic_load_n(&r->cancelled, __ATOMIC_RELAXED); +} + +static inline int rrd_call_function_async_and_dont_wait(struct rrd_function_inflight *r) { + struct rrd_function_execute rfe = { + .transaction = &r->transaction_uuid, + .function = r->sanitized_cmd, + .payload = r->payload, + .source = r->source, + .stop_monotonic_ut = &r->stop_monotonic_ut, + .result = { + .wb = r->result.wb, + .cb = rrd_inflight_async_function_nowait_finished, + .data = r, + }, + .progress = { + .cb = r->progress.cb, + .data = r->progress.data, + }, + .is_cancelled = { + .cb = rrd_inflight_async_function_is_cancelled, + .data = r, + }, + .register_canceller = { + .cb = rrd_inflight_async_function_register_canceller_cb, + .data = r, + }, + .register_progresser = { + .cb = rrd_inflight_async_function_register_progresser_cb, + .data = r, + }, + }; + int code = r->rdcf->execute_cb(&rfe, r->rdcf->execute_cb_data); + + return code; +} + +static int rrd_call_function_async_and_wait(struct rrd_function_inflight *r) { + struct rrd_function_call_wait *tmp = mallocz(sizeof(struct rrd_function_call_wait)); + tmp->free_with_signal = false; + tmp->data_are_ready = false; + tmp->host = r->host; + tmp->host_function_acquired = r->host_function_acquired; + tmp->transaction = strdupz(r->transaction); + netdata_mutex_init(&tmp->mutex); + 
pthread_cond_init(&tmp->cond, NULL); + + // we need a temporary BUFFER, because we may time out and the caller supplied one may vanish, + // so we create a new one we guarantee will survive until the collector finishes... + + bool we_should_free = false; + BUFFER *temp_wb = buffer_create(1024, &netdata_buffers_statistics.buffers_functions); // we need it because we may give up on it + temp_wb->content_type = r->result.wb->content_type; + + struct rrd_function_execute rfe = { + .transaction = &r->transaction_uuid, + .function = r->sanitized_cmd, + .payload = r->payload, + .source = r->source, + .stop_monotonic_ut = &r->stop_monotonic_ut, + .result = { + .wb = temp_wb, + + // we overwrite the result callbacks, + // so that we can clean up the allocations made + .cb = rrd_async_function_signal_when_ready, + .data = tmp, + }, + .progress = { + .cb = r->progress.cb, + .data = r->progress.data, + }, + .is_cancelled = { + .cb = rrd_inflight_async_function_is_cancelled, + .data = r, + }, + .register_canceller = { + .cb = rrd_inflight_async_function_register_canceller_cb, + .data = r, + }, + .register_progresser = { + .cb = rrd_inflight_async_function_register_progresser_cb, + .data = r, + }, + }; + int code = r->rdcf->execute_cb(&rfe, r->rdcf->execute_cb_data); + + // this has to happen after we execute the callback + // because if an async call is responded in sync mode, there will be a deadlock. + netdata_mutex_lock(&tmp->mutex); + + if (code == HTTP_RESP_OK || tmp->data_are_ready) { + bool cancelled = false; + int rc = 0; + while (rc == 0 && !cancelled && !tmp->data_are_ready) { + usec_t now_mono_ut = now_monotonic_usec(); + usec_t stop_mono_ut = __atomic_load_n(&r->stop_monotonic_ut, __ATOMIC_RELAXED) + RRDFUNCTIONS_TIMEOUT_EXTENSION_UT; + if(now_mono_ut > stop_mono_ut) { + rc = ETIMEDOUT; + break; + } + + // wait for 10ms, and loop again... 
+ struct timespec tp; + clock_gettime(CLOCK_REALTIME, &tp); + tp.tv_nsec += 10 * NSEC_PER_MSEC; + if(tp.tv_nsec > (long)(1 * NSEC_PER_SEC)) { + tp.tv_sec++; + tp.tv_nsec -= 1 * NSEC_PER_SEC; + } + + // the mutex is unlocked within pthread_cond_timedwait() + rc = pthread_cond_timedwait(&tmp->cond, &tmp->mutex, &tp); + // the mutex is again ours + + if(rc == ETIMEDOUT) { + // 10ms have passed + + rc = 0; + |