diff options
author | Timotej S <6674623+underhood@users.noreply.github.com> | 2023-09-29 17:13:42 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-09-29 17:13:42 +0200 |
commit | 6dfc99a2e0cf893c9ac2fd56d7986982738a9579 (patch) | |
tree | d5fb21199d99cc681641d7f6f8f33a7f083f435d /streaming | |
parent | dd17442f81ca4bbb97c5b93c7a5525ce4946fbb1 (diff) |
Dyncfg add streaming support (#15791)
* dyncfg fncnames as constants
* add helper macros to know parser streaming/plugin
* plugins dictionary per RRDHOST
* api_request_v2_config add support for /host/
* streamify pluginsd_register_plugin
* streamify pluginsd_register_module
* streamify report_job_status
* streamify dyncfg get functions
* module_type2str
* add job type and flags
* add DYNCFG_REGISTER_JOB
* implement register job
* push all to parent at startup
* add helper function is_dyncfg_function
* forward virtual functions trough streaming
* separate job2json
* add api/v2/job_statuses
* do cleanup on streaming
* streamify set functions
* support FUNCTION_PAYLOAD trough streaming
* WIP tests
* dont attempt loading non-localhost configs
* move cfg persistence to proper place
* prevent race
* properly update job state at runtime
* cleanup 1
* job2json add missing reason
* add tests
* correct HTTP code
* add test
* streamify delete_job_cb
* add DELETE_JOB keyword
* job delete over streaming
* add tests for create and delete job over parent
* rrdpush common checks to macro
* add missing forwarders
* fix jobs according to test results
* more tests
* review comment 1
* codacy remove valid warning
* codacy ruby fixes
* fix wrong rc check
* minimal test plugin for child
* add test
* dict walk insted of master lock
* minor - english spelling fixes
* thiago comments 1
* minor - rename folder to dynconf
* enable only when built with -DNETDATA_TEST_DYNCFG
* minor - compiler warning
* create dir post daemonization
* stricter URL check
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/rrdpush.c | 117 | ||||
-rw-r--r-- | streaming/rrdpush.h | 19 | ||||
-rw-r--r-- | streaming/sender.c | 59 |
3 files changed, 190 insertions, 5 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index df7db6ed8f..19519d187d 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -96,6 +96,9 @@ STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) { STREAM_CAP_BINARY | STREAM_CAP_INTERPOLATED | STREAM_HAS_COMPRESSION | +#ifdef NETDATA_TEST_DYNCFG + STREAM_CAP_DYNCFG | +#endif (ieee754_doubles ? STREAM_CAP_IEEE754 : 0) | (ml_capability ? STREAM_CAP_DATA_WITH_ML : 0) | 0; @@ -465,6 +468,47 @@ void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) { *rsb = (RRDSET_STREAM_BUFFER){ .wb = NULL, }; } +// TODO enable this macro before release +#define bail_if_no_cap(cap) \ + if(unlikely(!stream_has_capability(host->sender, cap))) { \ + netdata_log_error("STREAM %s [send]: cannot send job status update - parent does not support it.", rrdhost_hostname(host)); \ + return; \ + } + +#define dyncfg_check_can_push(host) \ + if(unlikely(!rrdhost_can_send_definitions_to_parent(host))) \ + return; \ + bail_if_no_cap(STREAM_CAP_DYNCFG) + +// assumes job is locked and acquired!!! +void rrdpush_send_job_status_update(RRDHOST *host, const char *plugin_name, const char *module_name, struct job *job) { + dyncfg_check_can_push(host); + + BUFFER *wb = sender_start(host->sender); + + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d\n", plugin_name, module_name, job->name, job_status2str(job->status), job->state); + if (job->reason) + buffer_sprintf(wb, " \"%s\"", job->reason); + + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + + sender_thread_buffer_free(); + + job->dirty = 0; +} + +void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name) { + dyncfg_check_can_push(host); + + BUFFER *wb = sender_start(host->sender); + + buffer_sprintf(wb, PLUGINSD_KEYWORD_DELETE_JOB " %s %s %s\n", plugin_name, module_name, job_name); + + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + + sender_thread_buffer_free(); +} + RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time) { RRDHOST *host = st->rrdhost; @@ -559,6 +603,78 @@ void rrdpush_send_global_functions(RRDHOST *host) { sender_thread_buffer_free(); } +void rrdpush_send_dyncfg(RRDHOST *host) { + dyncfg_check_can_push(host); + + BUFFER *wb = sender_start(host->sender); + + DICTIONARY *plugins_dict = host->configurable_plugins; + + struct configurable_plugin *plug; + dfe_start_read(plugins_dict, plug) { + buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_ENABLE " %s\n", plug->name); + struct module *mod; + dfe_start_read(plug->modules, mod) { + buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE " %s %s %s\n", plug->name, mod->name, module_type2str(mod->type)); + struct job *job; + dfe_start_read(mod->jobs, job) { + pthread_mutex_lock(&job->lock); + buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB " %s %s %s %s %"PRIu32"\n", plug->name, mod->name, job->name, job_type2str(job->type), job->flags); + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d", plug->name, mod->name, job->name, job_status2str(job->status), job->state); + if (job->reason) + buffer_sprintf(wb, " \"%s\"", job->reason); + buffer_sprintf(wb, "\n"); + job->dirty = 0; + pthread_mutex_unlock(&job->lock); + } dfe_done(job); + } dfe_done(mod); + } + dfe_done(plug); + + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + + sender_thread_buffer_free(); +} + +void rrdpush_send_dyncfg_enable(RRDHOST *host, const char *plugin_name) +{ + dyncfg_check_can_push(host); + + BUFFER *wb = sender_start(host->sender); + + buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_ENABLE " %s\n", plugin_name); + + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + + sender_thread_buffer_free(); +} + +void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, const char *module_name, enum module_type type) +{ + dyncfg_check_can_push(host); + + BUFFER *wb = sender_start(host->sender); + + buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE " %s %s %s\n", plugin_name, module_name, module_type2str(type)); + + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + + sender_thread_buffer_free(); +} + +void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags) +{ + dyncfg_check_can_push(host); + + BUFFER *wb = sender_start(host->sender); + + buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB " %s %s %s %s %"PRIu32"\n", plugin_name, module_name, job_name, job_type2str(type), flags); + + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + + sender_thread_buffer_free(); +} + void rrdpush_send_claimed_id(RRDHOST *host) { if(!stream_has_capability(host->sender, STREAM_CAP_CLAIM)) return; @@ -1288,6 +1404,7 @@ static struct { { STREAM_CAP_INTERPOLATED, "INTERPOLATED" }, { STREAM_CAP_IEEE754, "IEEE754" }, { STREAM_CAP_DATA_WITH_ML, "ML" }, + { STREAM_CAP_DYNCFG, "DYN_CFG" }, { 0 , NULL }, }; diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index f8d6926039..c3c14233fa 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -45,6 +45,7 @@ typedef enum { STREAM_CAP_INTERPOLATED = (1 << 14), // streaming supports interpolated streaming of values STREAM_CAP_IEEE754 = (1 << 15), // streaming supports binary/hex transfer of double values STREAM_CAP_DATA_WITH_ML = (1 << 16), // streaming supports transferring anomaly bit + STREAM_CAP_DYNCFG = (1 << 17), // dynamic configuration of plugins trough streaming STREAM_CAP_INVALID = (1 << 30), // used as an invalid value for capabilities when this is set // this must be signed int, so don't use the last bit @@ -232,6 +233,13 @@ typedef enum __attribute__((packed)) { SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression } SENDER_FLAGS; +struct function_payload_state { + BUFFER *payload; + char *txid; + char *fn_name; + char *timeout; +}; + struct sender_state { RRDHOST *host; pid_t tid; // the thread id of the sender, from gettid() @@ -261,6 +269,9 @@ struct sender_state { int rrdpush_sender_pipe[2]; // collector to sender thread signaling int rrdpush_sender_socket; + int receiving_function_payload; + struct function_payload_state function_payload; // state when receiving function with payload + uint16_t hops; #ifdef ENABLE_RRDPUSH_COMPRESSION @@ -482,6 +493,7 @@ void *rrdpush_sender_thread(void *ptr); void rrdpush_send_host_labels(RRDHOST *host); void rrdpush_send_claimed_id(RRDHOST *host); void rrdpush_send_global_functions(RRDHOST *host); +void rrdpush_send_dyncfg(RRDHOST *host); #define THREAD_TAG_STREAM_RECEIVER "RCVR" // "[host]" is appended #define THREAD_TAG_STREAM_SENDER "SNDR" // "[host]" is appended @@ -764,4 +776,11 @@ typedef struct rrdhost_status { void rrdhost_status(RRDHOST *host, time_t now, RRDHOST_STATUS *s); bool rrdhost_state_cloud_emulation(RRDHOST *host); +void rrdpush_send_job_status_update(RRDHOST *host, const char *plugin_name, const char *module_name, struct job *job); +void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name); + +void rrdpush_send_dyncfg_enable(RRDHOST *host, const char *plugin_name); +void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, const char *module_name, enum module_type type); +void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags);//x + #endif //NETDATA_RRDPUSH_H diff --git a/streaming/sender.c b/streaming/sender.c index 2ae3b0cde8..6318464b9a 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -955,6 +955,12 @@ void execute_commands(struct sender_state *s) { while( start < end && (newline = strchr(start, '\n')) ) { *newline = '\0'; + if (s->receiving_function_payload && unlikely(strcmp(start, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) != 0)) { + buffer_strcat(s->function_payload.payload, start); + start = newline + 1; + continue; + } + netdata_log_access("STREAM: %d from '%s' for host '%s': %s", gettid(), s->connected_to, rrdhost_hostname(s->host), start); @@ -965,12 +971,12 @@ void execute_commands(struct sender_state *s) { const char *keyword = get_word(words, num_words, 0); - if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) { + if(keyword && (strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0 || strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) == 0)) { worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST); - char *transaction = get_word(words, num_words, 1); - char *timeout_s = get_word(words, num_words, 2); - char *function = get_word(words, num_words, 3); + char *transaction = s->receiving_function_payload ? s->function_payload.txid : get_word(words, num_words, 1); + char *timeout_s = s->receiving_function_payload ? s->function_payload.timeout : get_word(words, num_words, 2); + char *function = s->receiving_function_payload ? s->function_payload.fn_name : get_word(words, num_words, 3); if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", @@ -990,8 +996,9 @@ void execute_commands(struct sender_state *s) { tmp->transaction = string_strdupz(transaction); BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX + 1, &netdata_buffers_statistics.buffers_functions); + char *payload = s->receiving_function_payload ? (char *)buffer_tostring(s->function_payload.payload) : NULL; int code = rrd_function_run(s->host, wb, timeout, function, false, transaction, - stream_execute_function_callback, tmp, NULL, NULL); + stream_execute_function_callback, tmp, NULL, NULL, payload); if(code != HTTP_RESP_OK) { if (!buffer_strlen(wb)) @@ -1000,6 +1007,47 @@ void execute_commands(struct sender_state *s) { stream_execute_function_callback(wb, code, tmp); } } + + if (s->receiving_function_payload) { + s->receiving_function_payload = false; + + buffer_free(s->function_payload.payload); + freez(s->function_payload.txid); + freez(s->function_payload.timeout); + freez(s->function_payload.fn_name); + + memset(&s->function_payload, 0, sizeof(struct function_payload_state)); + } + } + else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD) == 0) { + if (s->receiving_function_payload) { + netdata_log_error("STREAM %s [send to %s] received %s command while already receiving function payload", rrdhost_hostname(s->host), s->connected_to, keyword); + s->receiving_function_payload = false; + buffer_free(s->function_payload.payload); + s->function_payload.payload = NULL; + + // TODO send error response + } + + char *transaction = get_word(words, num_words, 1); + char *timeout_s = get_word(words, num_words, 2); + char *function = get_word(words, num_words, 3); + + if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { + netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", + rrdhost_hostname(s->host), s->connected_to, + keyword, + transaction?transaction:"(unset)", + timeout_s?timeout_s:"(unset)", + function?function:"(unset)"); + } + + s->receiving_function_payload = true; + s->function_payload.payload = buffer_create(4096, &netdata_buffers_statistics.buffers_functions); + + s->function_payload.txid = strdupz(get_word(words, num_words, 1)); + s->function_payload.timeout = strdupz(get_word(words, num_words, 2)); + s->function_payload.fn_name = strdupz(get_word(words, num_words, 3)); } else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) { worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST); @@ -1343,6 +1391,7 @@ void *rrdpush_sender_thread(void *ptr) { rrdpush_send_claimed_id(s->host); rrdpush_send_host_labels(s->host); rrdpush_send_global_functions(s->host); + rrdpush_send_dyncfg(s->host); s->replication.oldest_request_after_t = 0; rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); |