diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2024-01-11 16:56:45 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-11 16:56:45 +0200 |
commit | f2b250a1f53af00241522db35f8c85f19ed282e1 (patch) | |
tree | e813ca47880e3e2adf09583658efef59633ec6bd /streaming/rrdpush.c | |
parent | bead543ea52e51cf73f7e5b27de53197801399a7 (diff) |
dyncfg v2 (#16702)
* split rrdfunctions streaming and progress
* simplified internal inline functions API
* split rrdfunctions inflight management
* split rrd functions exporters
* renames
* base dyncfg structure
* config pluginsd
* intercept dyncfg function calls
* loading and saving of dyncfg metadata and data
* save metadata and payload to a single file; added code to update the plugins with jobs and saved configs
* basic working unit test
* added payload to functions execution
* removed old dyncfg code that is not needed any more
* more cleanup
* cleanup sender for functions with payload
* dyncfg functions are not exposed as functions
* remaining work to avoid indexing the \0 terminating character in dictionary keys
* added back old dyncfg plugins.d commands as noop, to allow plugins continue working
* working api; working streaming;
* updated plugins.d documentation
* aclk and http api requests share the same header parsing logic
* added source type internal
* fixed crashes
* added god mode for tests
* fixes
* fixed messages
* save host machine guids to configs
* cleaner manipulation of supported commands
* the functions event loop for external plugins can now process dyncfg requests
* unified internal and external plugins dyncfg API
* Netdata serves schema requests from /etc/netdata/schema.d and /var/lib/netdata/conf.d/schema.d
* cleanup and various fixes; fixed bug in previous dyncfg implementation on streaming that was sending the paylod in a way that allowed other streaming commands to be multiplexed
* internals go to a separate header file
* fix duplicate ACLK requests sent by aclk queue mechanism
* use fstat instead of stat
* working api
* plugin actions renamed to create and delete; dyncfg files are removed only from user actions
* prevent deadlock by using the react callback
* fix for string_strndupz()
* better dyncfg unittests
* more tests at the unittests
* properly detect dyncfg functions
* hide config functions from the UI
* tree response improvements
* send the initial update with payload
* determine tty using stdout, not stderr
* changes to statuses, cleanup and the code to bring all business logic into interception
* do not crash when the status is empty
* functions now propagate the source of the requests to plugins
* avoid warning about unused functions
* in the count at items for attention, do not count the orphan entries
* save source into dyncfg
* make the list null terminated
* fixed invalid comparison
* prevent memory leak on duplicated headers; log x-forwarded-for
* more unit tests
* added dyncfg unittests into the default unittests
* more unit tests and fixes
* more unit tests and fixes
* fix dictionary unittests
* config functions require admin access
Diffstat (limited to 'streaming/rrdpush.c')
-rw-r--r-- | streaming/rrdpush.c | 126 |
1 files changed, 5 insertions, 121 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index e74c06ef49..bc967c007d 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -302,7 +302,7 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { // send the chart functions if(stream_has_capability(host->sender, STREAM_CAP_FUNCTIONS)) - rrd_functions_expose_rrdpush(st, wb); + rrd_chart_functions_expose_rrdpush(st, wb); // send the chart local custom variables rrdsetvar_print_to_streaming_custom_chart_variables(st, wb); @@ -485,40 +485,6 @@ void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) { *rsb = (RRDSET_STREAM_BUFFER){ .wb = NULL, }; } -#define dyncfg_can_push(host) (rrdhost_can_send_definitions_to_parent(host) && stream_has_capability((host)->sender, STREAM_CAP_DYNCFG)) - -// assumes job is locked and acquired!!! -void rrdpush_send_job_status_update(RRDHOST *host, const char *plugin_name, const char *module_name, struct job *job) { - if(!dyncfg_can_push(host)) return; - - BUFFER *wb = sender_start(host->sender); - - buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d", plugin_name, module_name, job->name, job_status2str(job->status), job->state); - - if (job->reason && strlen(job->reason)) - buffer_sprintf(wb, " \"%s\"", job->reason); - - buffer_strcat(wb, "\n"); - - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG); - - sender_thread_buffer_free(); - - job->dirty = 0; -} - -void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name) { - if(!dyncfg_can_push(host)) return; - - BUFFER *wb = sender_start(host->sender); - - buffer_sprintf(wb, PLUGINSD_KEYWORD_DELETE_JOB " %s %s %s\n", plugin_name, module_name, job_name); - - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG); - - sender_thread_buffer_free(); -} - RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time) { RRDHOST *host = st->rrdhost; @@ -545,7 +511,7 @@ RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock if(unlikely(host_flags & RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED)) { BUFFER *wb = sender_start(host->sender); - rrd_functions_expose_global_rrdpush(host, wb); + rrd_global_functions_expose_rrdpush(host, wb, stream_has_capability(host->sender, STREAM_CAP_DYNCFG)); sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS); } @@ -605,94 +571,13 @@ void rrdpush_send_global_functions(RRDHOST *host) { BUFFER *wb = sender_start(host->sender); - rrd_functions_expose_global_rrdpush(host, wb); + rrd_global_functions_expose_rrdpush(host, wb, stream_has_capability(host->sender, STREAM_CAP_DYNCFG)); sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS); sender_thread_buffer_free(); } -void rrdpush_send_dyncfg(RRDHOST *host) { - if(!dyncfg_can_push(host)) return; - - BUFFER *wb = sender_start(host->sender); - - DICTIONARY *plugins_dict = host->configurable_plugins; - - struct configurable_plugin *plug; - dfe_start_read(plugins_dict, plug) { - buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_ENABLE " %s\n", plug->name); - struct module *mod; - dfe_start_read(plug->modules, mod) { - buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE " %s %s %s\n", plug->name, mod->name, module_type2str(mod->type)); - struct job *job; - dfe_start_read(mod->jobs, job) { - pthread_mutex_lock(&job->lock); - buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB " %s %s %s %s %"PRIu32"\n", plug->name, mod->name, job->name, job_type2str(job->type), job->flags); - buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d", plug->name, mod->name, job->name, job_status2str(job->status), job->state); - if (job->reason) - buffer_sprintf(wb, " \"%s\"", job->reason); - buffer_sprintf(wb, "\n"); - job->dirty = 0; - pthread_mutex_unlock(&job->lock); - } dfe_done(job); - } dfe_done(mod); - } - dfe_done(plug); - - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG); - - sender_thread_buffer_free(); -} - -void rrdpush_send_dyncfg_enable(RRDHOST *host, const char *plugin_name) { - if(!dyncfg_can_push(host)) return; - - BUFFER *wb = sender_start(host->sender); - - buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_ENABLE " %s\n", plugin_name); - - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); - - sender_thread_buffer_free(); -} - -void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, const char *module_name, enum module_type type) { - if(!dyncfg_can_push(host)) return; - - BUFFER *wb = sender_start(host->sender); - - buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE " %s %s %s\n", plugin_name, module_name, module_type2str(type)); - - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG); - - sender_thread_buffer_free(); -} - -void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags) { - if(!dyncfg_can_push(host)) return; - - BUFFER *wb = sender_start(host->sender); - - buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB " %s %s %s %s %"PRIu32"\n", plugin_name, module_name, job_name, job_type2str(type), flags); - - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG); - - sender_thread_buffer_free(); -} - -void rrdpush_send_dyncfg_reset(RRDHOST *host, const char *plugin_name) { - if(!dyncfg_can_push(host)) return; - - BUFFER *wb = sender_start(host->sender); - - buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_RESET " %s\n", plugin_name); - - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); - - sender_thread_buffer_free(); -} - void rrdpush_send_claimed_id(RRDHOST *host) { if(!stream_has_capability(host->sender, STREAM_CAP_CLAIM)) return; @@ -1486,11 +1371,10 @@ STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) { STREAM_CAP_REPLICATION | STREAM_CAP_BINARY | STREAM_CAP_INTERPOLATED | - STREAM_CAP_SLOTS | STREAM_CAP_PROGRESS | + STREAM_CAP_SLOTS | + STREAM_CAP_PROGRESS | STREAM_CAP_COMPRESSIONS_AVAILABLE | - #ifdef NETDATA_TEST_DYNCFG STREAM_CAP_DYNCFG | - #endif STREAM_CAP_IEEE754 | STREAM_CAP_DATA_WITH_ML | 0) & ~disabled_capabilities; |