summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2024-01-11 16:56:45 +0200
committerGitHub <noreply@github.com>2024-01-11 16:56:45 +0200
commitf2b250a1f53af00241522db35f8c85f19ed282e1 (patch)
treee813ca47880e3e2adf09583658efef59633ec6bd /streaming
parentbead543ea52e51cf73f7e5b27de53197801399a7 (diff)
dyncfg v2 (#16702)
* split rrdfunctions streaming and progress * simplified internal inline functions API * split rrdfunctions inflight management * split rrd functions exporters * renames * base dyncfg structure * config pluginsd * intercept dyncfg function calls * loading and saving of dyncfg metadata and data * save metadata and payload to a single file; added code to update the plugins with jobs and saved configs * basic working unit test * added payload to functions execution * removed old dyncfg code that is not needed any more * more cleanup * cleanup sender for functions with payload * dyncfg functions are not exposed as functions * remaining work to avoid indexing the \0 terminating character in dictionary keys * added back old dyncfg plugins.d commands as noop, to allow plugins continue working * working api; working streaming; * updated plugins.d documentation * aclk and http api requests share the same header parsing logic * added source type internal * fixed crashes * added god mode for tests * fixes * fixed messages * save host machine guids to configs * cleaner manipulation of supported commands * the functions event loop for external plugins can now process dyncfg requests * unified internal and external plugins dyncfg API * Netdata serves schema requests from /etc/netdata/schema.d and /var/lib/netdata/conf.d/schema.d * cleanup and various fixes; fixed bug in previous dyncfg implementation on streaming that was sending the paylod in a way that allowed other streaming commands to be multiplexed * internals go to a separate header file * fix duplicate ACLK requests sent by aclk queue mechanism * use fstat instead of stat * working api * plugin actions renamed to create and delete; dyncfg files are removed only from user actions * prevent deadlock by using the react callback * fix for string_strndupz() * better dyncfg unittests * more tests at the unittests * properly detect dyncfg functions * hide config functions from the UI * tree response improvements * send the initial update with payload * determine tty using stdout, not stderr * changes to statuses, cleanup and the code to bring all business logic into interception * do not crash when the status is empty * functions now propagate the source of the requests to plugins * avoid warning about unused functions * in the count at items for attention, do not count the orphan entries * save source into dyncfg * make the list null terminated * fixed invalid comparison * prevent memory leak on duplicated headers; log x-forwarded-for * more unit tests * added dyncfg unittests into the default unittests * more unit tests and fixes * more unit tests and fixes * fix dictionary unittests * config functions require admin access
Diffstat (limited to 'streaming')
-rw-r--r--streaming/rrdpush.c126
-rw-r--r--streaming/rrdpush.h51
-rw-r--r--streaming/sender.c181
3 files changed, 126 insertions, 232 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index e74c06ef49..bc967c007d 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -302,7 +302,7 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
// send the chart functions
if(stream_has_capability(host->sender, STREAM_CAP_FUNCTIONS))
- rrd_functions_expose_rrdpush(st, wb);
+ rrd_chart_functions_expose_rrdpush(st, wb);
// send the chart local custom variables
rrdsetvar_print_to_streaming_custom_chart_variables(st, wb);
@@ -485,40 +485,6 @@ void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) {
*rsb = (RRDSET_STREAM_BUFFER){ .wb = NULL, };
}
-#define dyncfg_can_push(host) (rrdhost_can_send_definitions_to_parent(host) && stream_has_capability((host)->sender, STREAM_CAP_DYNCFG))
-
-// assumes job is locked and acquired!!!
-void rrdpush_send_job_status_update(RRDHOST *host, const char *plugin_name, const char *module_name, struct job *job) {
- if(!dyncfg_can_push(host)) return;
-
- BUFFER *wb = sender_start(host->sender);
-
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d", plugin_name, module_name, job->name, job_status2str(job->status), job->state);
-
- if (job->reason && strlen(job->reason))
- buffer_sprintf(wb, " \"%s\"", job->reason);
-
- buffer_strcat(wb, "\n");
-
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG);
-
- sender_thread_buffer_free();
-
- job->dirty = 0;
-}
-
-void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name) {
- if(!dyncfg_can_push(host)) return;
-
- BUFFER *wb = sender_start(host->sender);
-
- buffer_sprintf(wb, PLUGINSD_KEYWORD_DELETE_JOB " %s %s %s\n", plugin_name, module_name, job_name);
-
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG);
-
- sender_thread_buffer_free();
-}
-
RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time) {
RRDHOST *host = st->rrdhost;
@@ -545,7 +511,7 @@ RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock
if(unlikely(host_flags & RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED)) {
BUFFER *wb = sender_start(host->sender);
- rrd_functions_expose_global_rrdpush(host, wb);
+ rrd_global_functions_expose_rrdpush(host, wb, stream_has_capability(host->sender, STREAM_CAP_DYNCFG));
sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS);
}
@@ -605,94 +571,13 @@ void rrdpush_send_global_functions(RRDHOST *host) {
BUFFER *wb = sender_start(host->sender);
- rrd_functions_expose_global_rrdpush(host, wb);
+ rrd_global_functions_expose_rrdpush(host, wb, stream_has_capability(host->sender, STREAM_CAP_DYNCFG));
sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS);
sender_thread_buffer_free();
}
-void rrdpush_send_dyncfg(RRDHOST *host) {
- if(!dyncfg_can_push(host)) return;
-
- BUFFER *wb = sender_start(host->sender);
-
- DICTIONARY *plugins_dict = host->configurable_plugins;
-
- struct configurable_plugin *plug;
- dfe_start_read(plugins_dict, plug) {
- buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_ENABLE " %s\n", plug->name);
- struct module *mod;
- dfe_start_read(plug->modules, mod) {
- buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE " %s %s %s\n", plug->name, mod->name, module_type2str(mod->type));
- struct job *job;
- dfe_start_read(mod->jobs, job) {
- pthread_mutex_lock(&job->lock);
- buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB " %s %s %s %s %"PRIu32"\n", plug->name, mod->name, job->name, job_type2str(job->type), job->flags);
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d", plug->name, mod->name, job->name, job_status2str(job->status), job->state);
- if (job->reason)
- buffer_sprintf(wb, " \"%s\"", job->reason);
- buffer_sprintf(wb, "\n");
- job->dirty = 0;
- pthread_mutex_unlock(&job->lock);
- } dfe_done(job);
- } dfe_done(mod);
- }
- dfe_done(plug);
-
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG);
-
- sender_thread_buffer_free();
-}
-
-void rrdpush_send_dyncfg_enable(RRDHOST *host, const char *plugin_name) {
- if(!dyncfg_can_push(host)) return;
-
- BUFFER *wb = sender_start(host->sender);
-
- buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_ENABLE " %s\n", plugin_name);
-
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
-
- sender_thread_buffer_free();
-}
-
-void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, const char *module_name, enum module_type type) {
- if(!dyncfg_can_push(host)) return;
-
- BUFFER *wb = sender_start(host->sender);
-
- buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE " %s %s %s\n", plugin_name, module_name, module_type2str(type));
-
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG);
-
- sender_thread_buffer_free();
-}
-
-void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags) {
- if(!dyncfg_can_push(host)) return;
-
- BUFFER *wb = sender_start(host->sender);
-
- buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB " %s %s %s %s %"PRIu32"\n", plugin_name, module_name, job_name, job_type2str(type), flags);
-
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_DYNCFG);
-
- sender_thread_buffer_free();
-}
-
-void rrdpush_send_dyncfg_reset(RRDHOST *host, const char *plugin_name) {
- if(!dyncfg_can_push(host)) return;
-
- BUFFER *wb = sender_start(host->sender);
-
- buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_RESET " %s\n", plugin_name);
-
- sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
-
- sender_thread_buffer_free();
-}
-
void rrdpush_send_claimed_id(RRDHOST *host) {
if(!stream_has_capability(host->sender, STREAM_CAP_CLAIM))
return;
@@ -1486,11 +1371,10 @@ STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) {
STREAM_CAP_REPLICATION |
STREAM_CAP_BINARY |
STREAM_CAP_INTERPOLATED |
- STREAM_CAP_SLOTS | STREAM_CAP_PROGRESS |
+ STREAM_CAP_SLOTS |
+ STREAM_CAP_PROGRESS |
STREAM_CAP_COMPRESSIONS_AVAILABLE |
- #ifdef NETDATA_TEST_DYNCFG
STREAM_CAP_DYNCFG |
- #endif
STREAM_CAP_IEEE754 |
STREAM_CAP_DATA_WITH_ML |
0) & ~disabled_capabilities;
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index d6cc66ba04..c03414191d 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -47,12 +47,13 @@ typedef enum {
STREAM_CAP_INTERPOLATED = (1 << 14), // streaming supports interpolated streaming of values
STREAM_CAP_IEEE754 = (1 << 15), // streaming supports binary/hex transfer of double values
STREAM_CAP_DATA_WITH_ML = (1 << 16), // streaming supports transferring anomaly bit
- STREAM_CAP_DYNCFG = (1 << 17), // dynamic configuration of plugins trough streaming
+ // STREAM_CAP_DYNCFG = (1 << 17), // leave this unused for as long as possible
STREAM_CAP_SLOTS = (1 << 18), // the sender can appoint a unique slot for each chart
STREAM_CAP_ZSTD = (1 << 19), // ZSTD compression supported
STREAM_CAP_GZIP = (1 << 20), // GZIP compression supported
STREAM_CAP_BROTLI = (1 << 21), // BROTLI compression supported
STREAM_CAP_PROGRESS = (1 << 22), // Functions PROGRESS support
+ STREAM_CAP_DYNCFG = (1 << 23), // support for DYNCFG
STREAM_CAP_INVALID = (1 << 30), // used as an invalid value for capabilities when this is set
// this must be signed int, so don't use the last bit
@@ -198,13 +199,6 @@ typedef enum __attribute__((packed)) {
SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown
} SENDER_FLAGS;
-struct function_payload_state {
- BUFFER *payload;
- char *txid;
- char *fn_name;
- char *timeout;
-};
-
struct sender_state {
RRDHOST *host;
pid_t tid; // the thread id of the sender, from gettid()
@@ -235,9 +229,6 @@ struct sender_state {
int rrdpush_sender_pipe[2]; // collector to sender thread signaling
int rrdpush_sender_socket;
- int receiving_function_payload;
- struct function_payload_state function_payload; // state when receiving function with payload
-
uint16_t hops;
struct line_splitter line;
@@ -276,6 +267,15 @@ struct sender_state {
time_t last_buffer_recreate_s; // true when the sender buffer should be re-created
} atomic;
+ struct {
+ bool intercept_input;
+ const char *transaction;
+ const char *timeout_s;
+ const char *function;
+ const char *source;
+ BUFFER *payload;
+ } functions;
+
int parent_using_h2o;
};
@@ -457,7 +457,6 @@ void *rrdpush_sender_thread(void *ptr);
void rrdpush_send_host_labels(RRDHOST *host);
void rrdpush_send_claimed_id(RRDHOST *host);
void rrdpush_send_global_functions(RRDHOST *host);
-void rrdpush_send_dyncfg(RRDHOST *host);
int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_string, void *h2o_ctx);
void rrdpush_sender_thread_stop(RRDHOST *host, STREAM_HANDSHAKE reason, bool wait);
@@ -659,11 +658,31 @@ static inline const char *rrdhost_health_status_to_string(RRDHOST_HEALTH_STATUS
}
}
+typedef enum __attribute__((packed)) {
+ RRDHOST_DYNCFG_STATUS_UNAVAILABLE = 0,
+ RRDHOST_DYNCFG_STATUS_AVAILABLE,
+} RRDHOST_DYNCFG_STATUS;
+
+static inline const char *rrdhost_dyncfg_status_to_string(RRDHOST_DYNCFG_STATUS status) {
+ switch(status) {
+ default:
+ case RRDHOST_DYNCFG_STATUS_UNAVAILABLE:
+ return "unavailable";
+
+ case RRDHOST_DYNCFG_STATUS_AVAILABLE:
+ return "online";
+ }
+}
+
typedef struct rrdhost_status {
RRDHOST *host;
time_t now;
struct {
+ RRDHOST_DYNCFG_STATUS status;
+ } dyncfg;
+
+ struct {
RRDHOST_DB_STATUS status;
RRDHOST_DB_LIVENESS liveness;
RRD_MEMORY_MODE mode;
@@ -733,14 +752,6 @@ typedef struct rrdhost_status {
void rrdhost_status(RRDHOST *host, time_t now, RRDHOST_STATUS *s);
bool rrdhost_state_cloud_emulation(RRDHOST *host);
-void rrdpush_send_job_status_update(RRDHOST *host, const char *plugin_name, const char *module_name, struct job *job);
-void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name);
-
-void rrdpush_send_dyncfg_enable(RRDHOST *host, const char *plugin_name);
-void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, const char *module_name, enum module_type type);
-void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags);
-void rrdpush_send_dyncfg_reset(RRDHOST *host, const char *plugin_name);
-
bool rrdpush_compression_initialize(struct sender_state *s);
bool rrdpush_decompression_initialize(struct receiver_state *rpt);
void rrdpush_parse_compression_order(struct receiver_state *rpt, const char *order);
diff --git a/streaming/sender.c b/streaming/sender.c
index 987dd65375..80144c6d16 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -1128,7 +1128,7 @@ static void stream_execute_function_callback(BUFFER *func_wb, int code, void *da
pluginsd_function_result_begin_to_buffer(wb
, string2str(tmp->transaction)
, code
- , functions_content_type_to_format(func_wb->content_type)
+ , content_type_id2string(func_wb->content_type)
, func_wb->expires);
buffer_fast_strcat(wb, buffer_tostring(func_wb), buffer_strlen(func_wb));
@@ -1163,6 +1163,60 @@ static void stream_execute_function_progress_callback(void *data, size_t done, s
}
}
+static void execute_commands_function(struct sender_state *s, const char *command, const char *transaction, const char *timeout_s, const char *function, BUFFER *payload, const char *source) {
+ worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST);
+ nd_log(NDLS_ACCESS, NDLP_INFO, NULL);
+
+ if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
+ netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
+ rrdhost_hostname(s->host), s->connected_to,
+ command,
+ transaction?transaction:"(unset)",
+ timeout_s?timeout_s:"(unset)",
+ function?function:"(unset)");
+ }
+ else {
+ int timeout = str2i(timeout_s);
+ if(timeout <= 0) timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
+
+ struct inflight_stream_function *tmp = callocz(1, sizeof(struct inflight_stream_function));
+ tmp->received_ut = now_realtime_usec();
+ tmp->sender = s;
+ tmp->transaction = string_strdupz(transaction);
+ BUFFER *wb = buffer_create(1024, &netdata_buffers_statistics.buffers_functions);
+
+ int code = rrd_function_run(s->host, wb,
+ timeout,HTTP_ACCESS_ADMIN, function, false, transaction,
+ stream_execute_function_callback, tmp,
+ stream_has_capability(s, STREAM_CAP_PROGRESS) ? stream_execute_function_progress_callback : NULL,
+ stream_has_capability(s, STREAM_CAP_PROGRESS) ? tmp : NULL,
+ NULL, NULL, payload, source);
+
+ if(code != HTTP_RESP_OK) {
+ if (!buffer_strlen(wb))
+ rrd_call_function_error(wb, "Failed to route request to collector", code);
+ }
+ }
+}
+
+static void cleanup_intercepting_input(struct sender_state *s) {
+ freez((void *)s->functions.transaction);
+ freez((void *)s->functions.timeout_s);
+ freez((void *)s->functions.function);
+ freez((void *)s->functions.source);
+ buffer_free(s->functions.payload);
+
+ s->functions.transaction = NULL;
+ s->functions.timeout_s = NULL;
+ s->functions.function = NULL;
+ s->functions.payload = NULL;
+ s->functions.intercept_input = false;
+}
+
+static void execute_commands_cleanup(struct sender_state *s) {
+ cleanup_intercepting_input(s);
+}
+
// This is just a placeholder until the gap filling state machine is inserted
void execute_commands(struct sender_state *s) {
worker_is_busy(WORKER_SENDER_JOB_EXECUTE);
@@ -1176,105 +1230,49 @@ void execute_commands(struct sender_state *s) {
char *start = s->read_buffer, *end = &s->read_buffer[s->read_len], *newline;
*end = 0;
while( start < end && (newline = strchr(start, '\n')) ) {
- *newline = '\0';
+ s->line.count++;
+
+ if(s->functions.intercept_input) {
+ if(strcmp(start, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END "\n") == 0) {
+ execute_commands_function(s, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END,
+ s->functions.transaction, s->functions.timeout_s,
+ s->functions.function, s->functions.payload, s->functions.source);
+
+ cleanup_intercepting_input(s);
+ }
+ else
+ buffer_strcat(s->functions.payload, start);
- if (s->receiving_function_payload && unlikely(strcmp(start, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) != 0)) {
- if (buffer_strlen(s->function_payload.payload) != 0)
- buffer_strcat(s->function_payload.payload, "\n");
- buffer_strcat(s->function_payload.payload, start);
start = newline + 1;
continue;
}
- s->line.count++;
+ *newline = '\0';
s->line.num_words = quoted_strings_splitter_pluginsd(start, s->line.words, PLUGINSD_MAX_WORDS);
const char *command = get_word(s->line.words, s->line.num_words, 0);
- if(command && (strcmp(command, PLUGINSD_KEYWORD_FUNCTION) == 0 || strcmp(command, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) == 0)) {
- worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST);
- nd_log(NDLS_ACCESS, NDLP_INFO, NULL);
-
- char *transaction = s->receiving_function_payload ? s->function_payload.txid : get_word(s->line.words, s->line.num_words, 1);
- char *timeout_s = s->receiving_function_payload ? s->function_payload.timeout : get_word(s->line.words, s->line.num_words, 2);
- char *function = s->receiving_function_payload ? s->function_payload.fn_name : get_word(s->line.words, s->line.num_words, 3);
-
- if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
- netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
- rrdhost_hostname(s->host), s->connected_to,
- command,
- transaction?transaction:"(unset)",
- timeout_s?timeout_s:"(unset)",
- function?function:"(unset)");
- }
- else {
- int timeout = str2i(timeout_s);
- if(timeout <= 0) timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
-
- struct inflight_stream_function *tmp = callocz(1, sizeof(struct inflight_stream_function));
- tmp->received_ut = now_realtime_usec();
- tmp->sender = s;
- tmp->transaction = string_strdupz(transaction);
- BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX + 1, &netdata_buffers_statistics.buffers_functions);
-
- char *payload = s->receiving_function_payload ? (char *)buffer_tostring(s->function_payload.payload) : NULL;
- int code = rrd_function_run(s->host, wb,
- timeout, HTTP_ACCESS_ADMINS, function, false, transaction,
- stream_execute_function_callback, tmp,
- stream_has_capability(s, STREAM_CAP_PROGRESS) ? stream_execute_function_progress_callback : NULL,
- stream_has_capability(s, STREAM_CAP_PROGRESS) ? tmp : NULL,
- NULL, NULL, payload);
-
- if(code != HTTP_RESP_OK) {
- if (!buffer_strlen(wb))
- rrd_call_function_error(wb, "Failed to route request to collector", code);
-
- stream_execute_function_callback(wb, code, tmp);
- }
- }
-
- if (s->receiving_function_payload) {
- s->receiving_function_payload = false;
-
- buffer_free(s->function_payload.payload);
- freez(s->function_payload.txid);
- freez(s->function_payload.timeout);
- freez(s->function_payload.fn_name);
+ if(command && strcmp(command, PLUGINSD_KEYWORD_FUNCTION) == 0) {
+ char *transaction = get_word(s->line.words, s->line.num_words, 1);
+ char *timeout_s = get_word(s->line.words, s->line.num_words, 2);
+ char *function = get_word(s->line.words, s->line.num_words, 3);
+ char *source = get_word(s->line.words, s->line.num_words, 4);
- memset(&s->function_payload, 0, sizeof(struct function_payload_state));
- }
+ execute_commands_function(s, command, transaction, timeout_s, function, NULL, source);
}
- else if (command && strcmp(command, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD) == 0) {
- nd_log(NDLS_ACCESS, NDLP_INFO, NULL);
-
- if (s->receiving_function_payload) {
- netdata_log_error("STREAM %s [send to %s] received %s command while already receiving function payload",
- rrdhost_hostname(s->host), s->connected_to, command);
- s->receiving_function_payload = false;
- buffer_free(s->function_payload.payload);
- s->function_payload.payload = NULL;
-
- // TODO send error response
- }
-
- char *transaction = get_word(s->line.words, s->line.num_words, 1);
- char *timeout_s = get_word(s->line.words, s->line.num_words, 2);
- char *function = get_word(s->line.words, s->line.num_words, 3);
-
- if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
- netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
- rrdhost_hostname(s->host), s->connected_to,
- command,
- transaction?transaction:"(unset)",
- timeout_s?timeout_s:"(unset)",
- function?function:"(unset)");
- }
-
- s->receiving_function_payload = true;
- s->function_payload.payload = buffer_create(4096, &netdata_buffers_statistics.buffers_functions);
-
- s->function_payload.txid = strdupz(get_word(s->line.words, s->line.num_words, 1));
- s->function_payload.timeout = strdupz(get_word(s->line.words, s->line.num_words, 2));
- s->function_payload.fn_name = strdupz(get_word(s->line.words, s->line.num_words, 3));
+ else if(command && strcmp(command, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD) == 0) {
+ char *transaction = get_word(s->line.words, s->line.num_words, 1);
+ char *timeout_s = get_word(s->line.words, s->line.num_words, 2);
+ char *function = get_word(s->line.words, s->line.num_words, 3);
+ char *source = get_word(s->line.words, s->line.num_words, 4);
+ char *content_type = get_word(s->line.words, s->line.num_words, 5);
+
+ s->functions.transaction = strdupz(transaction ? transaction : "");
+ s->functions.timeout_s = strdupz(timeout_s ? timeout_s : "");
+ s->functions.function = strdupz(function ? function : "");
+ s->functions.source = strdupz(source ? source : "");
+ s->functions.payload = buffer_create(0, NULL);
+ s->functions.payload->content_type = content_type_string2id(content_type);
+ s->functions.intercept_input = true;
}
else if(command && strcmp(command, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) {
worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST);
@@ -1480,6 +1478,7 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
rrdpush_sender_thread_close_socket(host);
rrdpush_sender_pipe_close(host, host->sender->rrdpush_sender_pipe, false);
+ execute_commands_cleanup(host->sender);
rrdhost_clear_sender___while_having_sender_mutex(host);
@@ -1680,6 +1679,7 @@ void *rrdpush_sender_thread(void *ptr) {
now_s = now_monotonic_sec();
rrdpush_sender_cbuffer_recreate_timed(s, now_s, false, true);
+ execute_commands_cleanup(s);
rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
s->flags &= ~SENDER_FLAG_OVERFLOW;
@@ -1697,7 +1697,6 @@ void *rrdpush_sender_thread(void *ptr) {
rrdpush_send_claimed_id(s->host);
rrdpush_send_host_labels(s->host);
rrdpush_send_global_functions(s->host);
- rrdpush_send_dyncfg(s->host);
s->replication.oldest_request_after_t = 0;
rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);