summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorTimotej S <6674623+underhood@users.noreply.github.com>2023-09-29 17:13:42 +0200
committerGitHub <noreply@github.com>2023-09-29 17:13:42 +0200
commit6dfc99a2e0cf893c9ac2fd56d7986982738a9579 (patch)
treed5fb21199d99cc681641d7f6f8f33a7f083f435d /streaming
parentdd17442f81ca4bbb97c5b93c7a5525ce4946fbb1 (diff)
Dyncfg add streaming support (#15791)
* dyncfg fncnames as constants * add helper macros to know parser streaming/plugin * plugins dictionary per RRDHOST * api_request_v2_config add support for /host/ * streamify pluginsd_register_plugin * streamify pluginsd_register_module * streamify report_job_status * streamify dyncfg get functions * module_type2str * add job type and flags * add DYNCFG_REGISTER_JOB * implement register job * push all to parent at startup * add helper function is_dyncfg_function * forward virtual functions through streaming * separate job2json * add api/v2/job_statuses * do cleanup on streaming * streamify set functions * support FUNCTION_PAYLOAD through streaming * WIP tests * don't attempt loading non-localhost configs * move cfg persistence to proper place * prevent race * properly update job state at runtime * cleanup 1 * job2json add missing reason * add tests * correct HTTP code * add test * streamify delete_job_cb * add DELETE_JOB keyword * job delete over streaming * add tests for create and delete job over parent * rrdpush common checks to macro * add missing forwarders * fix jobs according to test results * more tests * review comment 1 * codacy remove valid warning * codacy ruby fixes * fix wrong rc check * minimal test plugin for child * add test * dict walk instead of master lock * minor - English spelling fixes * Thiago comments 1 * minor - rename folder to dynconf * enable only when built with -DNETDATA_TEST_DYNCFG * minor - compiler warning * create dir post daemonization * stricter URL check
Diffstat (limited to 'streaming')
-rw-r--r--streaming/rrdpush.c117
-rw-r--r--streaming/rrdpush.h19
-rw-r--r--streaming/sender.c59
3 files changed, 190 insertions, 5 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index df7db6ed8f..19519d187d 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -96,6 +96,9 @@ STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) {
STREAM_CAP_BINARY |
STREAM_CAP_INTERPOLATED |
STREAM_HAS_COMPRESSION |
+#ifdef NETDATA_TEST_DYNCFG
+ STREAM_CAP_DYNCFG |
+#endif
(ieee754_doubles ? STREAM_CAP_IEEE754 : 0) |
(ml_capability ? STREAM_CAP_DATA_WITH_ML : 0) |
0;
@@ -465,6 +468,47 @@ void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) {
*rsb = (RRDSET_STREAM_BUFFER){ .wb = NULL, };
}
+// Guard macros shared by the dyncfg senders in this file.
+// Both may expand to a plain `return;`, so they can only be used inside
+// functions returning void. They are wrapped in do { } while(0) so that
+// `macro(...);` behaves as exactly one statement, even as the body of an
+// unbraced if/else.
+//
+// bail_if_no_cap(cap): logs an error and returns from the calling function
+// when stream_has_capability() reports that `cap` is not available on
+// host->sender. NOTE: it reads the caller's local variable named `host`.
+// TODO enable this macro before release
+#define bail_if_no_cap(cap) \
+    do { \
+        if(unlikely(!stream_has_capability(host->sender, cap))) { \
+            netdata_log_error("STREAM %s [send]: cannot send dynamic configuration data - parent does not support it.", rrdhost_hostname(host)); \
+            return; \
+        } \
+    } while(0)
+
+// dyncfg_check_can_push(host): returns from the calling function unless
+// definitions can currently be sent to the parent and the sender has
+// STREAM_CAP_DYNCFG.
+#define dyncfg_check_can_push(host) \
+    do { \
+        if(unlikely(!rrdhost_can_send_definitions_to_parent(host))) \
+            return; \
+        bail_if_no_cap(STREAM_CAP_DYNCFG); \
+    } while(0)
+
+// Streams one REPORT_JOB_STATUS command describing `job` to the parent and
+// clears the job's dirty flag. Returns without sending anything when
+// dyncfg_check_can_push() bails out.
+//
+// host        - host whose sender is used
+// plugin_name - plugin owning the module
+// module_name - module owning the job
+// job         - job whose name, status, state and optional reason are sent
+//
+// assumes job is locked and acquired!!!
+void rrdpush_send_job_status_update(RRDHOST *host, const char *plugin_name, const char *module_name, struct job *job) {
+    dyncfg_check_can_push(host);
+
+    BUFFER *wb = sender_start(host->sender);
+
+    // the whole command has to stay on ONE line: the optional quoted reason
+    // is appended first and the terminating newline is written last
+    // (same layout rrdpush_send_dyncfg() produces for this keyword)
+    buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d", plugin_name, module_name, job->name, job_status2str(job->status), job->state);
+    if (job->reason)
+        buffer_sprintf(wb, " \"%s\"", job->reason);
+    buffer_sprintf(wb, "\n");
+
+    sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+
+    sender_thread_buffer_free();
+
+    // the current status has been handed over to the sender
+    job->dirty = 0;
+}
+
+// Streams a DELETE_JOB command for job `job_name` of module `module_name`
+// (plugin `plugin_name`) to the parent of `host`.
+// Nothing is sent when dyncfg_check_can_push() bails out.
+void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name) {
+    dyncfg_check_can_push(host);
+
+    BUFFER *out = sender_start(host->sender);
+    buffer_sprintf(out,
+                   PLUGINSD_KEYWORD_DELETE_JOB " %s %s %s\n",
+                   plugin_name,
+                   module_name,
+                   job_name);
+    sender_commit(host->sender, out, STREAM_TRAFFIC_TYPE_METADATA);
+
+    sender_thread_buffer_free();
+}
+
RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time) {
RRDHOST *host = st->rrdhost;
@@ -559,6 +603,78 @@ void rrdpush_send_global_functions(RRDHOST *host) {
sender_thread_buffer_free();
}
+// Streams the whole dyncfg tree of `host` to its parent in one commit:
+// for every configurable plugin a DYNCFG_ENABLE line, for every module of it
+// a DYNCFG_REGISTER_MODULE line, and for every job of that module a
+// DYNCFG_REGISTER_JOB line followed by a REPORT_JOB_STATUS line.
+// Nothing is sent when dyncfg_check_can_push() bails out.
+void rrdpush_send_dyncfg(RRDHOST *host) {
+    dyncfg_check_can_push(host);
+
+    BUFFER *out = sender_start(host->sender);
+
+    struct configurable_plugin *plugin;
+    dfe_start_read(host->configurable_plugins, plugin) {
+        buffer_sprintf(out, PLUGINSD_KEYWORD_DYNCFG_ENABLE " %s\n", plugin->name);
+
+        struct module *module;
+        dfe_start_read(plugin->modules, module) {
+            buffer_sprintf(out,
+                           PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE " %s %s %s\n",
+                           plugin->name, module->name, module_type2str(module->type));
+
+            struct job *job;
+            dfe_start_read(module->jobs, job) {
+                // the job is read and its dirty flag reset under its own lock
+                pthread_mutex_lock(&job->lock);
+
+                buffer_sprintf(out,
+                               PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB " %s %s %s %s %"PRIu32"\n",
+                               plugin->name, module->name, job->name, job_type2str(job->type), job->flags);
+
+                buffer_sprintf(out,
+                               PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d",
+                               plugin->name, module->name, job->name, job_status2str(job->status), job->state);
+
+                // the quoted reason is optional; the line is always newline terminated
+                if (job->reason)
+                    buffer_sprintf(out, " \"%s\"\n", job->reason);
+                else
+                    buffer_sprintf(out, "\n");
+
+                job->dirty = 0;
+
+                pthread_mutex_unlock(&job->lock);
+            }
+            dfe_done(job);
+        }
+        dfe_done(module);
+    }
+    dfe_done(plugin);
+
+    sender_commit(host->sender, out, STREAM_TRAFFIC_TYPE_METADATA);
+
+    sender_thread_buffer_free();
+}
+
+// Streams a single DYNCFG_ENABLE command for plugin `plugin_name` to the
+// parent of `host`. Nothing is sent when dyncfg_check_can_push() bails out.
+void rrdpush_send_dyncfg_enable(RRDHOST *host, const char *plugin_name)
+{
+    dyncfg_check_can_push(host);
+
+    BUFFER *out = sender_start(host->sender);
+    buffer_sprintf(out,
+                   PLUGINSD_KEYWORD_DYNCFG_ENABLE " %s\n",
+                   plugin_name);
+    sender_commit(host->sender, out, STREAM_TRAFFIC_TYPE_METADATA);
+
+    sender_thread_buffer_free();
+}
+
+// Streams a single DYNCFG_REGISTER_MODULE command (plugin name, module name
+// and the textual module type) to the parent of `host`.
+// Nothing is sent when dyncfg_check_can_push() bails out.
+void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, const char *module_name, enum module_type type)
+{
+    dyncfg_check_can_push(host);
+
+    BUFFER *out = sender_start(host->sender);
+    buffer_sprintf(out,
+                   PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE " %s %s %s\n",
+                   plugin_name,
+                   module_name,
+                   module_type2str(type));
+    sender_commit(host->sender, out, STREAM_TRAFFIC_TYPE_METADATA);
+
+    sender_thread_buffer_free();
+}
+
+// Streams a single DYNCFG_REGISTER_JOB command (plugin, module and job name,
+// the textual job type and the numeric flags) to the parent of `host`.
+// Nothing is sent when dyncfg_check_can_push() bails out.
+void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags)
+{
+    dyncfg_check_can_push(host);
+
+    BUFFER *out = sender_start(host->sender);
+    buffer_sprintf(out,
+                   PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB " %s %s %s %s %"PRIu32"\n",
+                   plugin_name,
+                   module_name,
+                   job_name,
+                   job_type2str(type),
+                   flags);
+    sender_commit(host->sender, out, STREAM_TRAFFIC_TYPE_METADATA);
+
+    sender_thread_buffer_free();
+}
+
void rrdpush_send_claimed_id(RRDHOST *host) {
if(!stream_has_capability(host->sender, STREAM_CAP_CLAIM))
return;
@@ -1288,6 +1404,7 @@ static struct {
{ STREAM_CAP_INTERPOLATED, "INTERPOLATED" },
{ STREAM_CAP_IEEE754, "IEEE754" },
{ STREAM_CAP_DATA_WITH_ML, "ML" },
+ { STREAM_CAP_DYNCFG, "DYN_CFG" },
{ 0 , NULL },
};
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index f8d6926039..c3c14233fa 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -45,6 +45,7 @@ typedef enum {
STREAM_CAP_INTERPOLATED = (1 << 14), // streaming supports interpolated streaming of values
STREAM_CAP_IEEE754 = (1 << 15), // streaming supports binary/hex transfer of double values
STREAM_CAP_DATA_WITH_ML = (1 << 16), // streaming supports transferring anomaly bit
+ STREAM_CAP_DYNCFG = (1 << 17), // dynamic configuration of plugins through streaming
STREAM_CAP_INVALID = (1 << 30), // used as an invalid value for capabilities when this is set
// this must be signed int, so don't use the last bit
@@ -232,6 +233,13 @@ typedef enum __attribute__((packed)) {
SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression
} SENDER_FLAGS;
+struct function_payload_state { // pending FUNCTION_PAYLOAD request kept by the sender until FUNCTION_PAYLOAD_END arrives
+ BUFFER *payload; // payload lines appended by execute_commands(); released with buffer_free()
+ char *txid; // strdupz() copy of word 1 of the FUNCTION_PAYLOAD line (transaction); released with freez()
+ char *fn_name; // strdupz() copy of word 3 of the FUNCTION_PAYLOAD line (function); released with freez()
+ char *timeout; // strdupz() copy of word 2 of the FUNCTION_PAYLOAD line (timeout, kept as text); released with freez()
+};
+
struct sender_state {
RRDHOST *host;
pid_t tid; // the thread id of the sender, from gettid()
@@ -261,6 +269,9 @@ struct sender_state {
int rrdpush_sender_pipe[2]; // collector to sender thread signaling
int rrdpush_sender_socket;
+ int receiving_function_payload;
+ struct function_payload_state function_payload; // state when receiving function with payload
+
uint16_t hops;
#ifdef ENABLE_RRDPUSH_COMPRESSION
@@ -482,6 +493,7 @@ void *rrdpush_sender_thread(void *ptr);
void rrdpush_send_host_labels(RRDHOST *host);
void rrdpush_send_claimed_id(RRDHOST *host);
void rrdpush_send_global_functions(RRDHOST *host);
+void rrdpush_send_dyncfg(RRDHOST *host);
#define THREAD_TAG_STREAM_RECEIVER "RCVR" // "[host]" is appended
#define THREAD_TAG_STREAM_SENDER "SNDR" // "[host]" is appended
@@ -764,4 +776,11 @@ typedef struct rrdhost_status {
void rrdhost_status(RRDHOST *host, time_t now, RRDHOST_STATUS *s);
bool rrdhost_state_cloud_emulation(RRDHOST *host);
+void rrdpush_send_job_status_update(RRDHOST *host, const char *plugin_name, const char *module_name, struct job *job); // streams REPORT_JOB_STATUS; job must be locked and acquired by the caller
+void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name); // streams DELETE_JOB
+
+void rrdpush_send_dyncfg_enable(RRDHOST *host, const char *plugin_name); // streams DYNCFG_ENABLE
+void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, const char *module_name, enum module_type type); // streams DYNCFG_REGISTER_MODULE
+void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags); // streams DYNCFG_REGISTER_JOB
+
#endif //NETDATA_RRDPUSH_H
diff --git a/streaming/sender.c b/streaming/sender.c
index 2ae3b0cde8..6318464b9a 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -955,6 +955,12 @@ void execute_commands(struct sender_state *s) {
while( start < end && (newline = strchr(start, '\n')) ) {
*newline = '\0';
+ if (s->receiving_function_payload && unlikely(strcmp(start, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) != 0)) {
+ buffer_strcat(s->function_payload.payload, start);
+ start = newline + 1;
+ continue;
+ }
+
netdata_log_access("STREAM: %d from '%s' for host '%s': %s",
gettid(), s->connected_to, rrdhost_hostname(s->host), start);
@@ -965,12 +971,12 @@ void execute_commands(struct sender_state *s) {
const char *keyword = get_word(words, num_words, 0);
- if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) {
+ if(keyword && (strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0 || strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) == 0)) {
worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST);
- char *transaction = get_word(words, num_words, 1);
- char *timeout_s = get_word(words, num_words, 2);
- char *function = get_word(words, num_words, 3);
+ char *transaction = s->receiving_function_payload ? s->function_payload.txid : get_word(words, num_words, 1);
+ char *timeout_s = s->receiving_function_payload ? s->function_payload.timeout : get_word(words, num_words, 2);
+ char *function = s->receiving_function_payload ? s->function_payload.fn_name : get_word(words, num_words, 3);
if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
@@ -990,8 +996,9 @@ void execute_commands(struct sender_state *s) {
tmp->transaction = string_strdupz(transaction);
BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX + 1, &netdata_buffers_statistics.buffers_functions);
+ char *payload = s->receiving_function_payload ? (char *)buffer_tostring(s->function_payload.payload) : NULL;
int code = rrd_function_run(s->host, wb, timeout, function, false, transaction,
- stream_execute_function_callback, tmp, NULL, NULL);
+ stream_execute_function_callback, tmp, NULL, NULL, payload);
if(code != HTTP_RESP_OK) {
if (!buffer_strlen(wb))
@@ -1000,6 +1007,47 @@ void execute_commands(struct sender_state *s) {
stream_execute_function_callback(wb, code, tmp);
}
}
+
+ if (s->receiving_function_payload) {
+ s->receiving_function_payload = false;
+
+ buffer_free(s->function_payload.payload);
+ freez(s->function_payload.txid);
+ freez(s->function_payload.timeout);
+ freez(s->function_payload.fn_name);
+
+ memset(&s->function_payload, 0, sizeof(struct function_payload_state));
+ }
+ }
+ else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD) == 0) {
+ if (s->receiving_function_payload) {
+ netdata_log_error("STREAM %s [send to %s] received %s command while already receiving function payload", rrdhost_hostname(s->host), s->connected_to, keyword);
+ s->receiving_function_payload = false;
+ buffer_free(s->function_payload.payload);
+ s->function_payload.payload = NULL;
+
+ // TODO send error response
+ }
+
+ char *transaction = get_word(words, num_words, 1);
+ char *timeout_s = get_word(words, num_words, 2);
+ char *function = get_word(words, num_words, 3);
+
+ if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
+ netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
+ rrdhost_hostname(s->host), s->connected_to,
+ keyword,
+ transaction?transaction:"(unset)",
+ timeout_s?timeout_s:"(unset)",
+ function?function:"(unset)");
+ }
+
+ s->receiving_function_payload = true;
+ s->function_payload.payload = buffer_create(4096, &netdata_buffers_statistics.buffers_functions);
+
+ s->function_payload.txid = strdupz(get_word(words, num_words, 1));
+ s->function_payload.timeout = strdupz(get_word(words, num_words, 2));
+ s->function_payload.fn_name = strdupz(get_word(words, num_words, 3));
}
else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) {
worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST);
@@ -1343,6 +1391,7 @@ void *rrdpush_sender_thread(void *ptr) {
rrdpush_send_claimed_id(s->host);
rrdpush_send_host_labels(s->host);
rrdpush_send_global_functions(s->host);
+ rrdpush_send_dyncfg(s->host);
s->replication.oldest_request_after_t = 0;
rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);