summaryrefslogtreecommitdiffstats
path: root/streaming/rrdpush.c
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/rrdpush.c')
-rw-r--r--streaming/rrdpush.c126
1 files changed, 5 insertions, 121 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index e74c06ef49..bc967c007d 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -302,7 +302,7 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
// send the chart functions
if(stream_has_capability(host->sender, STREAM_CAP_FUNCTIONS))
- rrd_functions_expose_rrdpush(st, wb);
+ rrd_chart_functions_expose_rrdpush(st, wb);
// send the chart local custom variables
rrdsetvar_print_to_streaming_custom_chart_variables(st, wb);
@@ -485,40 +485,6 @@ void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) {
*rsb = (RRDSET_STREAM_BUFFER){ .wb = NULL, };
}
-#define dyncfg_can_push(host) (rrdhost_can_send_definitions_to_parent(host) && stream_has_capability((host)->sender, STREAM_CAP_DYNCFG))
-
-// assumes job is locked and acquired!!!
-void rrdpush_send_job_status_update(RRDHOST *host, const char *plugin_name, const char *module_name, struct job *job) {
- if(!dyncfg_can_push(host)) return;
-
- BUFFER *wb = sender_start(host->sender);
-
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d", plugin_name, module_name, job->name, job_status2str(job->status), job->state);
-
- if (job->reason && strlen(job->reason))
- buffer_sprintf(wb, " \"%s\"", job->reason);
-
- buffer_strcat(wb, "\n");
-
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG);
-
- sender_thread_buffer_free();
-
- job->dirty = 0;
-}
-
-void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name) {
- if(!dyncfg_can_push(host)) return;
-
- BUFFER *wb = sender_start(host->sender);
-
- buffer_sprintf(wb, PLUGINSD_KEYWORD_DELETE_JOB " %s %s %s\n", plugin_name, module_name, job_name);
-
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG);
-
- sender_thread_buffer_free();
-}
-
RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time) {
RRDHOST *host = st->rrdhost;
@@ -545,7 +511,7 @@ RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock
if(unlikely(host_flags & RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED)) {
BUFFER *wb = sender_start(host->sender);
- rrd_functions_expose_global_rrdpush(host, wb);
+ rrd_global_functions_expose_rrdpush(host, wb, stream_has_capability(host->sender, STREAM_CAP_DYNCFG));
sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS);
}
@@ -605,94 +571,13 @@ void rrdpush_send_global_functions(RRDHOST *host) {
BUFFER *wb = sender_start(host->sender);
- rrd_functions_expose_global_rrdpush(host, wb);
+ rrd_global_functions_expose_rrdpush(host, wb, stream_has_capability(host->sender, STREAM_CAP_DYNCFG));
sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS);
sender_thread_buffer_free();
}
-void rrdpush_send_dyncfg(RRDHOST *host) {
- if(!dyncfg_can_push(host)) return;
-
- BUFFER *wb = sender_start(host->sender);
-
- DICTIONARY *plugins_dict = host->configurable_plugins;
-
- struct configurable_plugin *plug;
- dfe_start_read(plugins_dict, plug) {
- buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_ENABLE " %s\n", plug->name);
- struct module *mod;
- dfe_start_read(plug->modules, mod) {
- buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE " %s %s %s\n", plug->name, mod->name, module_type2str(mod->type));
- struct job *job;
- dfe_start_read(mod->jobs, job) {
- pthread_mutex_lock(&job->lock);
- buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB " %s %s %s %s %"PRIu32"\n", plug->name, mod->name, job->name, job_type2str(job->type), job->flags);
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d", plug->name, mod->name, job->name, job_status2str(job->status), job->state);
- if (job->reason)
- buffer_sprintf(wb, " \"%s\"", job->reason);
- buffer_sprintf(wb, "\n");
- job->dirty = 0;
- pthread_mutex_unlock(&job->lock);
- } dfe_done(job);
- } dfe_done(mod);
- }
- dfe_done(plug);
-
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG);
-
- sender_thread_buffer_free();
-}
-
-void rrdpush_send_dyncfg_enable(RRDHOST *host, const char *plugin_name) {
- if(!dyncfg_can_push(host)) return;
-
- BUFFER *wb = sender_start(host->sender);
-
- buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_ENABLE " %s\n", plugin_name);
-
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
-
- sender_thread_buffer_free();
-}
-
-void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, const char *module_name, enum module_type type) {
- if(!dyncfg_can_push(host)) return;
-
- BUFFER *wb = sender_start(host->sender);
-
- buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE " %s %s %s\n", plugin_name, module_name, module_type2str(type));
-
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG);
-
- sender_thread_buffer_free();
-}
-
-void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags) {
- if(!dyncfg_can_push(host)) return;
-
- BUFFER *wb = sender_start(host->sender);
-
- buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB " %s %s %s %s %"PRIu32"\n", plugin_name, module_name, job_name, job_type2str(type), flags);
-
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG);
-
- sender_thread_buffer_free();
-}
-
-void rrdpush_send_dyncfg_reset(RRDHOST *host, const char *plugin_name) {
- if(!dyncfg_can_push(host)) return;
-
- BUFFER *wb = sender_start(host->sender);
-
- buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_RESET " %s\n", plugin_name);
-
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
-
- sender_thread_buffer_free();
-}
-
void rrdpush_send_claimed_id(RRDHOST *host) {
if(!stream_has_capability(host->sender, STREAM_CAP_CLAIM))
return;
@@ -1486,11 +1371,10 @@ STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) {
STREAM_CAP_REPLICATION |
STREAM_CAP_BINARY |
STREAM_CAP_INTERPOLATED |
- STREAM_CAP_SLOTS | STREAM_CAP_PROGRESS |
+ STREAM_CAP_SLOTS |
+ STREAM_CAP_PROGRESS |
STREAM_CAP_COMPRESSIONS_AVAILABLE |
- #ifdef NETDATA_TEST_DYNCFG
STREAM_CAP_DYNCFG |
- #endif
STREAM_CAP_IEEE754 |
STREAM_CAP_DATA_WITH_ML |
0) & ~disabled_capabilities;