diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2024-01-11 16:56:45 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-11 16:56:45 +0200 |
commit | f2b250a1f53af00241522db35f8c85f19ed282e1 (patch) | |
tree | e813ca47880e3e2adf09583658efef59633ec6bd /streaming | |
parent | bead543ea52e51cf73f7e5b27de53197801399a7 (diff) |
dyncfg v2 (#16702)
* split rrdfunctions streaming and progress
* simplified internal inline functions API
* split rrdfunctions inflight management
* split rrd functions exporters
* renames
* base dyncfg structure
* config pluginsd
* intercept dyncfg function calls
* loading and saving of dyncfg metadata and data
* save metadata and payload to a single file; added code to update the plugins with jobs and saved configs
* basic working unit test
* added payload to functions execution
* removed old dyncfg code that is not needed any more
* more cleanup
* cleanup sender for functions with payload
* dyncfg functions are not exposed as functions
* remaining work to avoid indexing the \0 terminating character in dictionary keys
* added back old dyncfg plugins.d commands as noop, to allow plugins continue working
* working api; working streaming;
* updated plugins.d documentation
* aclk and http api requests share the same header parsing logic
* added source type internal
* fixed crashes
* added god mode for tests
* fixes
* fixed messages
* save host machine guids to configs
* cleaner manipulation of supported commands
* the functions event loop for external plugins can now process dyncfg requests
* unified internal and external plugins dyncfg API
* Netdata serves schema requests from /etc/netdata/schema.d and /var/lib/netdata/conf.d/schema.d
* cleanup and various fixes; fixed bug in previous dyncfg implementation on streaming that was sending the paylod in a way that allowed other streaming commands to be multiplexed
* internals go to a separate header file
* fix duplicate ACLK requests sent by aclk queue mechanism
* use fstat instead of stat
* working api
* plugin actions renamed to create and delete; dyncfg files are removed only from user actions
* prevent deadlock by using the react callback
* fix for string_strndupz()
* better dyncfg unittests
* more tests at the unittests
* properly detect dyncfg functions
* hide config functions from the UI
* tree response improvements
* send the initial update with payload
* determine tty using stdout, not stderr
* changes to statuses, cleanup and the code to bring all business logic into interception
* do not crash when the status is empty
* functions now propagate the source of the requests to plugins
* avoid warning about unused functions
* in the count at items for attention, do not count the orphan entries
* save source into dyncfg
* make the list null terminated
* fixed invalid comparison
* prevent memory leak on duplicated headers; log x-forwarded-for
* more unit tests
* added dyncfg unittests into the default unittests
* more unit tests and fixes
* more unit tests and fixes
* fix dictionary unittests
* config functions require admin access
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/rrdpush.c | 126 | ||||
-rw-r--r-- | streaming/rrdpush.h | 51 | ||||
-rw-r--r-- | streaming/sender.c | 181 |
3 files changed, 126 insertions, 232 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index e74c06ef49..bc967c007d 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -302,7 +302,7 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { // send the chart functions if(stream_has_capability(host->sender, STREAM_CAP_FUNCTIONS)) - rrd_functions_expose_rrdpush(st, wb); + rrd_chart_functions_expose_rrdpush(st, wb); // send the chart local custom variables rrdsetvar_print_to_streaming_custom_chart_variables(st, wb); @@ -485,40 +485,6 @@ void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) { *rsb = (RRDSET_STREAM_BUFFER){ .wb = NULL, }; } -#define dyncfg_can_push(host) (rrdhost_can_send_definitions_to_parent(host) && stream_has_capability((host)->sender, STREAM_CAP_DYNCFG)) - -// assumes job is locked and acquired!!! -void rrdpush_send_job_status_update(RRDHOST *host, const char *plugin_name, const char *module_name, struct job *job) { - if(!dyncfg_can_push(host)) return; - - BUFFER *wb = sender_start(host->sender); - - buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d", plugin_name, module_name, job->name, job_status2str(job->status), job->state); - - if (job->reason && strlen(job->reason)) - buffer_sprintf(wb, " \"%s\"", job->reason); - - buffer_strcat(wb, "\n"); - - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG); - - sender_thread_buffer_free(); - - job->dirty = 0; -} - -void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name) { - if(!dyncfg_can_push(host)) return; - - BUFFER *wb = sender_start(host->sender); - - buffer_sprintf(wb, PLUGINSD_KEYWORD_DELETE_JOB " %s %s %s\n", plugin_name, module_name, job_name); - - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG); - - sender_thread_buffer_free(); -} - RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time) { RRDHOST *host = st->rrdhost; @@ -545,7 +511,7 @@ RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock if(unlikely(host_flags & RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED)) { BUFFER *wb = sender_start(host->sender); - rrd_functions_expose_global_rrdpush(host, wb); + rrd_global_functions_expose_rrdpush(host, wb, stream_has_capability(host->sender, STREAM_CAP_DYNCFG)); sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS); } @@ -605,94 +571,13 @@ void rrdpush_send_global_functions(RRDHOST *host) { BUFFER *wb = sender_start(host->sender); - rrd_functions_expose_global_rrdpush(host, wb); + rrd_global_functions_expose_rrdpush(host, wb, stream_has_capability(host->sender, STREAM_CAP_DYNCFG)); sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS); sender_thread_buffer_free(); } -void rrdpush_send_dyncfg(RRDHOST *host) { - if(!dyncfg_can_push(host)) return; - - BUFFER *wb = sender_start(host->sender); - - DICTIONARY *plugins_dict = host->configurable_plugins; - - struct configurable_plugin *plug; - dfe_start_read(plugins_dict, plug) { - buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_ENABLE " %s\n", plug->name); - struct module *mod; - dfe_start_read(plug->modules, mod) { - buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE " %s %s %s\n", plug->name, mod->name, module_type2str(mod->type)); - struct job *job; - dfe_start_read(mod->jobs, job) { - pthread_mutex_lock(&job->lock); - buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB " %s %s %s %s %"PRIu32"\n", plug->name, mod->name, job->name, job_type2str(job->type), job->flags); - buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d", plug->name, mod->name, job->name, job_status2str(job->status), job->state); - if (job->reason) - buffer_sprintf(wb, " \"%s\"", job->reason); - buffer_sprintf(wb, "\n"); - job->dirty = 0; - pthread_mutex_unlock(&job->lock); - } dfe_done(job); - } dfe_done(mod); - } - dfe_done(plug); - - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG); - - sender_thread_buffer_free(); -} - -void rrdpush_send_dyncfg_enable(RRDHOST *host, const char *plugin_name) { - if(!dyncfg_can_push(host)) return; - - BUFFER *wb = sender_start(host->sender); - - buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_ENABLE " %s\n", plugin_name); - - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); - - sender_thread_buffer_free(); -} - -void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, const char *module_name, enum module_type type) { - if(!dyncfg_can_push(host)) return; - - BUFFER *wb = sender_start(host->sender); - - buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE " %s %s %s\n", plugin_name, module_name, module_type2str(type)); - - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG); - - sender_thread_buffer_free(); -} - -void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags) { - if(!dyncfg_can_push(host)) return; - - BUFFER *wb = sender_start(host->sender); - - buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB " %s %s %s %s %"PRIu32"\n", plugin_name, module_name, job_name, job_type2str(type), flags); - - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG); - - sender_thread_buffer_free(); -} - -void rrdpush_send_dyncfg_reset(RRDHOST *host, const char *plugin_name) { - if(!dyncfg_can_push(host)) return; - - BUFFER *wb = sender_start(host->sender); - - buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_RESET " %s\n", plugin_name); - - sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); - - sender_thread_buffer_free(); -} - void rrdpush_send_claimed_id(RRDHOST *host) { if(!stream_has_capability(host->sender, STREAM_CAP_CLAIM)) return; @@ -1486,11 +1371,10 @@ STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) { STREAM_CAP_REPLICATION | STREAM_CAP_BINARY | STREAM_CAP_INTERPOLATED | - STREAM_CAP_SLOTS | STREAM_CAP_PROGRESS | + STREAM_CAP_SLOTS | + STREAM_CAP_PROGRESS | STREAM_CAP_COMPRESSIONS_AVAILABLE | - #ifdef NETDATA_TEST_DYNCFG STREAM_CAP_DYNCFG | - #endif STREAM_CAP_IEEE754 | STREAM_CAP_DATA_WITH_ML | 0) & ~disabled_capabilities; diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index d6cc66ba04..c03414191d 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -47,12 +47,13 @@ typedef enum { STREAM_CAP_INTERPOLATED = (1 << 14), // streaming supports interpolated streaming of values STREAM_CAP_IEEE754 = (1 << 15), // streaming supports binary/hex transfer of double values STREAM_CAP_DATA_WITH_ML = (1 << 16), // streaming supports transferring anomaly bit - STREAM_CAP_DYNCFG = (1 << 17), // dynamic configuration of plugins trough streaming + // STREAM_CAP_DYNCFG = (1 << 17), // leave this unused for as long as possible STREAM_CAP_SLOTS = (1 << 18), // the sender can appoint a unique slot for each chart STREAM_CAP_ZSTD = (1 << 19), // ZSTD compression supported STREAM_CAP_GZIP = (1 << 20), // GZIP compression supported STREAM_CAP_BROTLI = (1 << 21), // BROTLI compression supported STREAM_CAP_PROGRESS = (1 << 22), // Functions PROGRESS support + STREAM_CAP_DYNCFG = (1 << 23), // support for DYNCFG STREAM_CAP_INVALID = (1 << 30), // used as an invalid value for capabilities when this is set // this must be signed int, so don't use the last bit @@ -198,13 +199,6 @@ typedef enum __attribute__((packed)) { SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown } SENDER_FLAGS; -struct function_payload_state { - BUFFER *payload; - char *txid; - char *fn_name; - char *timeout; -}; - struct sender_state { RRDHOST *host; pid_t tid; // the thread id of the sender, from gettid() @@ -235,9 +229,6 @@ struct sender_state { int rrdpush_sender_pipe[2]; // collector to sender thread signaling int rrdpush_sender_socket; - int receiving_function_payload; - struct function_payload_state function_payload; // state when receiving function with payload - uint16_t hops; struct line_splitter line; @@ -276,6 +267,15 @@ struct sender_state { time_t last_buffer_recreate_s; // true when the sender buffer should be re-created } atomic; + struct { + bool intercept_input; + const char *transaction; + const char *timeout_s; + const char *function; + const char *source; + BUFFER *payload; + } functions; + int parent_using_h2o; }; @@ -457,7 +457,6 @@ void *rrdpush_sender_thread(void *ptr); void rrdpush_send_host_labels(RRDHOST *host); void rrdpush_send_claimed_id(RRDHOST *host); void rrdpush_send_global_functions(RRDHOST *host); -void rrdpush_send_dyncfg(RRDHOST *host); int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string, void *h2o_ctx); void rrdpush_sender_thread_stop(RRDHOST *host, STREAM_HANDSHAKE reason, bool wait); @@ -659,11 +658,31 @@ static inline const char *rrdhost_health_status_to_string(RRDHOST_HEALTH_STATUS } } +typedef enum __attribute__((packed)) { + RRDHOST_DYNCFG_STATUS_UNAVAILABLE = 0, + RRDHOST_DYNCFG_STATUS_AVAILABLE, +} RRDHOST_DYNCFG_STATUS; + +static inline const char *rrdhost_dyncfg_status_to_string(RRDHOST_DYNCFG_STATUS status) { + switch(status) { + default: + case RRDHOST_DYNCFG_STATUS_UNAVAILABLE: + return "unavailable"; + + case RRDHOST_DYNCFG_STATUS_AVAILABLE: + return "online"; + } +} + typedef struct rrdhost_status { RRDHOST *host; time_t now; struct { + RRDHOST_DYNCFG_STATUS status; + } dyncfg; + + struct { RRDHOST_DB_STATUS status; RRDHOST_DB_LIVENESS liveness; RRD_MEMORY_MODE mode; @@ -733,14 +752,6 @@ typedef struct rrdhost_status { void rrdhost_status(RRDHOST *host, time_t now, RRDHOST_STATUS *s); bool rrdhost_state_cloud_emulation(RRDHOST *host); -void rrdpush_send_job_status_update(RRDHOST *host, const char *plugin_name, const char *module_name, struct job *job); -void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name); - -void rrdpush_send_dyncfg_enable(RRDHOST *host, const char *plugin_name); -void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, const char *module_name, enum module_type type); -void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags); -void rrdpush_send_dyncfg_reset(RRDHOST *host, const char *plugin_name); - bool rrdpush_compression_initialize(struct sender_state *s); bool rrdpush_decompression_initialize(struct receiver_state *rpt); void rrdpush_parse_compression_order(struct receiver_state *rpt, const char *order); diff --git a/streaming/sender.c b/streaming/sender.c index 987dd65375..80144c6d16 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -1128,7 +1128,7 @@ static void stream_execute_function_callback(BUFFER *func_wb, int code, void *da pluginsd_function_result_begin_to_buffer(wb , string2str(tmp->transaction) , code - , functions_content_type_to_format(func_wb->content_type) + , content_type_id2string(func_wb->content_type) , func_wb->expires); buffer_fast_strcat(wb, buffer_tostring(func_wb), buffer_strlen(func_wb)); @@ -1163,6 +1163,60 @@ static void stream_execute_function_progress_callback(void *data, size_t done, s } } +static void execute_commands_function(struct sender_state *s, const char *command, const char *transaction, const char *timeout_s, const char *function, BUFFER *payload, const char *source) { + worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST); + nd_log(NDLS_ACCESS, NDLP_INFO, NULL); + + if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { + netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", + rrdhost_hostname(s->host), s->connected_to, + command, + transaction?transaction:"(unset)", + timeout_s?timeout_s:"(unset)", + function?function:"(unset)"); + } + else { + int timeout = str2i(timeout_s); + if(timeout <= 0) timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT; + + struct inflight_stream_function *tmp = callocz(1, sizeof(struct inflight_stream_function)); + tmp->received_ut = now_realtime_usec(); + tmp->sender = s; + tmp->transaction = string_strdupz(transaction); + BUFFER *wb = buffer_create(1024, &netdata_buffers_statistics.buffers_functions); + + int code = rrd_function_run(s->host, wb, + timeout,HTTP_ACCESS_ADMIN, function, false, transaction, + stream_execute_function_callback, tmp, + stream_has_capability(s, STREAM_CAP_PROGRESS) ? stream_execute_function_progress_callback : NULL, + stream_has_capability(s, STREAM_CAP_PROGRESS) ? tmp : NULL, + NULL, NULL, payload, source); + + if(code != HTTP_RESP_OK) { + if (!buffer_strlen(wb)) + rrd_call_function_error(wb, "Failed to route request to collector", code); + } + } +} + +static void cleanup_intercepting_input(struct sender_state *s) { + freez((void *)s->functions.transaction); + freez((void *)s->functions.timeout_s); + freez((void *)s->functions.function); + freez((void *)s->functions.source); + buffer_free(s->functions.payload); + + s->functions.transaction = NULL; + s->functions.timeout_s = NULL; + s->functions.function = NULL; + s->functions.payload = NULL; + s->functions.intercept_input = false; +} + +static void execute_commands_cleanup(struct sender_state *s) { + cleanup_intercepting_input(s); +} + // This is just a placeholder until the gap filling state machine is inserted void execute_commands(struct sender_state *s) { worker_is_busy(WORKER_SENDER_JOB_EXECUTE); @@ -1176,105 +1230,49 @@ void execute_commands(struct sender_state *s) { char *start = s->read_buffer, *end = &s->read_buffer[s->read_len], *newline; *end = 0; while( start < end && (newline = strchr(start, '\n')) ) { - *newline = '\0'; + s->line.count++; + + if(s->functions.intercept_input) { + if(strcmp(start, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END "\n") == 0) { + execute_commands_function(s, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END, + s->functions.transaction, s->functions.timeout_s, + s->functions.function, s->functions.payload, s->functions.source); + + cleanup_intercepting_input(s); + } + else + buffer_strcat(s->functions.payload, start); - if (s->receiving_function_payload && unlikely(strcmp(start, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) != 0)) { - if (buffer_strlen(s->function_payload.payload) != 0) - buffer_strcat(s->function_payload.payload, "\n"); - buffer_strcat(s->function_payload.payload, start); start = newline + 1; continue; } - s->line.count++; + *newline = '\0'; s->line.num_words = quoted_strings_splitter_pluginsd(start, s->line.words, PLUGINSD_MAX_WORDS); const char *command = get_word(s->line.words, s->line.num_words, 0); - if(command && (strcmp(command, PLUGINSD_KEYWORD_FUNCTION) == 0 || strcmp(command, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) == 0)) { - worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST); - nd_log(NDLS_ACCESS, NDLP_INFO, NULL); - - char *transaction = s->receiving_function_payload ? s->function_payload.txid : get_word(s->line.words, s->line.num_words, 1); - char *timeout_s = s->receiving_function_payload ? s->function_payload.timeout : get_word(s->line.words, s->line.num_words, 2); - char *function = s->receiving_function_payload ? s->function_payload.fn_name : get_word(s->line.words, s->line.num_words, 3); - - if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { - netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", - rrdhost_hostname(s->host), s->connected_to, - command, - transaction?transaction:"(unset)", - timeout_s?timeout_s:"(unset)", - function?function:"(unset)"); - } - else { - int timeout = str2i(timeout_s); - if(timeout <= 0) timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT; - - struct inflight_stream_function *tmp = callocz(1, sizeof(struct inflight_stream_function)); - tmp->received_ut = now_realtime_usec(); - tmp->sender = s; - tmp->transaction = string_strdupz(transaction); - BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX + 1, &netdata_buffers_statistics.buffers_functions); - - char *payload = s->receiving_function_payload ? (char *)buffer_tostring(s->function_payload.payload) : NULL; - int code = rrd_function_run(s->host, wb, - timeout, HTTP_ACCESS_ADMINS, function, false, transaction, - stream_execute_function_callback, tmp, - stream_has_capability(s, STREAM_CAP_PROGRESS) ? stream_execute_function_progress_callback : NULL, - stream_has_capability(s, STREAM_CAP_PROGRESS) ? tmp : NULL, - NULL, NULL, payload); - - if(code != HTTP_RESP_OK) { - if (!buffer_strlen(wb)) - rrd_call_function_error(wb, "Failed to route request to collector", code); - - stream_execute_function_callback(wb, code, tmp); - } - } - - if (s->receiving_function_payload) { - s->receiving_function_payload = false; - - buffer_free(s->function_payload.payload); - freez(s->function_payload.txid); - freez(s->function_payload.timeout); - freez(s->function_payload.fn_name); + if(command && strcmp(command, PLUGINSD_KEYWORD_FUNCTION) == 0) { + char *transaction = get_word(s->line.words, s->line.num_words, 1); + char *timeout_s = get_word(s->line.words, s->line.num_words, 2); + char *function = get_word(s->line.words, s->line.num_words, 3); + char *source = get_word(s->line.words, s->line.num_words, 4); - memset(&s->function_payload, 0, sizeof(struct function_payload_state)); - } + execute_commands_function(s, command, transaction, timeout_s, function, NULL, source); } - else if (command && strcmp(command, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD) == 0) { - nd_log(NDLS_ACCESS, NDLP_INFO, NULL); - - if (s->receiving_function_payload) { - netdata_log_error("STREAM %s [send to %s] received %s command while already receiving function payload", - rrdhost_hostname(s->host), s->connected_to, command); - s->receiving_function_payload = false; - buffer_free(s->function_payload.payload); - s->function_payload.payload = NULL; - - // TODO send error response - } - - char *transaction = get_word(s->line.words, s->line.num_words, 1); - char *timeout_s = get_word(s->line.words, s->line.num_words, 2); - char *function = get_word(s->line.words, s->line.num_words, 3); - - if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { - netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", - rrdhost_hostname(s->host), s->connected_to, - command, - transaction?transaction:"(unset)", - timeout_s?timeout_s:"(unset)", - function?function:"(unset)"); - } - - s->receiving_function_payload = true; - s->function_payload.payload = buffer_create(4096, &netdata_buffers_statistics.buffers_functions); - - s->function_payload.txid = strdupz(get_word(s->line.words, s->line.num_words, 1)); - s->function_payload.timeout = strdupz(get_word(s->line.words, s->line.num_words, 2)); - s->function_payload.fn_name = strdupz(get_word(s->line.words, s->line.num_words, 3)); + else if(command && strcmp(command, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD) == 0) { + char *transaction = get_word(s->line.words, s->line.num_words, 1); + char *timeout_s = get_word(s->line.words, s->line.num_words, 2); + char *function = get_word(s->line.words, s->line.num_words, 3); + char *source = get_word(s->line.words, s->line.num_words, 4); + char *content_type = get_word(s->line.words, s->line.num_words, 5); + + s->functions.transaction = strdupz(transaction ? transaction : ""); + s->functions.timeout_s = strdupz(timeout_s ? timeout_s : ""); + s->functions.function = strdupz(function ? function : ""); + s->functions.source = strdupz(source ? source : ""); + s->functions.payload = buffer_create(0, NULL); + s->functions.payload->content_type = content_type_string2id(content_type); + s->functions.intercept_input = true; } else if(command && strcmp(command, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) { worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST); @@ -1480,6 +1478,7 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) { rrdpush_sender_thread_close_socket(host); rrdpush_sender_pipe_close(host, host->sender->rrdpush_sender_pipe, false); + execute_commands_cleanup(host->sender); rrdhost_clear_sender___while_having_sender_mutex(host); @@ -1680,6 +1679,7 @@ void *rrdpush_sender_thread(void *ptr) { now_s = now_monotonic_sec(); rrdpush_sender_cbuffer_recreate_timed(s, now_s, false, true); + execute_commands_cleanup(s); rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); s->flags &= ~SENDER_FLAG_OVERFLOW; @@ -1697,7 +1697,6 @@ void *rrdpush_sender_thread(void *ptr) { rrdpush_send_claimed_id(s->host); rrdpush_send_host_labels(s->host); rrdpush_send_global_functions(s->host); - rrdpush_send_dyncfg(s->host); s->replication.oldest_request_after_t = 0; rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); |