summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-09-16 16:00:42 +0300
committerGitHub <noreply@github.com>2023-09-16 16:00:42 +0300
commit11de4e4ab77177bc1a4f9b6358151adf525f2ca0 (patch)
tree96cc31e5e41103e85f7935eb7bb04b571668c246
parent638d9b064d0f7818a3672f26163413efff9f30a3 (diff)
Functions: allow collectors to be restarted (#15983)
-rw-r--r--database/rrd.h2
-rw-r--r--database/rrdfunctions.c98
-rw-r--r--libnetdata/required_dummies.h1
-rw-r--r--libnetdata/threads/threads.c2
-rw-r--r--streaming/rrdpush.c6
5 files changed, 80 insertions, 29 deletions
diff --git a/database/rrd.h b/database/rrd.h
index 9ff838559c..19af417e03 100644
--- a/database/rrd.h
+++ b/database/rrd.h
@@ -931,6 +931,8 @@ typedef enum __attribute__ ((__packed__)) rrdhost_flags {
RRDHOST_FLAG_METADATA_CLAIMID = (1 << 28), // metadata needs to be stored in the database
RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED = (1 << 29), // set when the receiver part is disconnected
+
+ RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED = (1 << 30), // set when the host has updated global functions
} RRDHOST_FLAGS;
#define rrdhost_flag_check(host, flag) (__atomic_load_n(&((host)->flags), __ATOMIC_SEQ_CST) & (flag))
diff --git a/database/rrdfunctions.c b/database/rrdfunctions.c
index 7a097ab9ca..81a911c489 100644
--- a/database/rrdfunctions.c
+++ b/database/rrdfunctions.c
@@ -310,8 +310,11 @@ struct rrd_collector {
static __thread struct rrd_collector *thread_rrd_collector = NULL;
static void rrd_collector_free(struct rrd_collector *rdc) {
+ if(rdc->running)
+ return;
+
int32_t expected = 0;
- if(likely(!__atomic_compare_exchange_n(&rdc->refcount, &expected, -1, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))) {
+ if(!__atomic_compare_exchange_n(&rdc->refcount, &expected, -1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
// the collector is still referenced by charts.
// leave it hanging there, the last chart will actually free it.
return;
@@ -323,9 +326,9 @@ static void rrd_collector_free(struct rrd_collector *rdc) {
// called once per collector
void rrd_collector_started(void) {
- if(likely(thread_rrd_collector)) return;
+ if(!thread_rrd_collector)
+ thread_rrd_collector = callocz(1, sizeof(struct rrd_collector));
- thread_rrd_collector = callocz(1, sizeof(struct rrd_collector));
thread_rrd_collector->tid = gettid();
thread_rrd_collector->running = true;
}
@@ -341,43 +344,70 @@ void rrd_collector_finished(void) {
}
static struct rrd_collector *rrd_collector_acquire(void) {
- __atomic_add_fetch(&thread_rrd_collector->refcount, 1, __ATOMIC_SEQ_CST);
+ rrd_collector_started();
+
+ int32_t expected = __atomic_load_n(&thread_rrd_collector->refcount, __ATOMIC_RELAXED), wanted = 0;
+ do {
+ if(expected < 0 || !thread_rrd_collector->running) {
+ internal_fatal(true, "FUNCTIONS: Trying to acquire a collector that is exiting.");
+ return thread_rrd_collector;
+ }
+
+ wanted = expected + 1;
+
+ } while(!__atomic_compare_exchange_n(&thread_rrd_collector->refcount, &expected, wanted, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED));
+
return thread_rrd_collector;
}
static void rrd_collector_release(struct rrd_collector *rdc) {
if(unlikely(!rdc)) return;
- int32_t refcount = __atomic_sub_fetch(&rdc->refcount, 1, __ATOMIC_SEQ_CST);
- if(refcount == 0 && !rdc->running)
+ int32_t expected = __atomic_load_n(&rdc->refcount, __ATOMIC_RELAXED), wanted = 0;
+ do {
+ if(expected < 0) {
+ internal_fatal(true, "FUNCTIONS: Trying to release a collector that is exiting.");
+ return;
+ }
+
+ if(expected == 0) {
+ internal_fatal(true, "FUNCTIONS: Trying to release a collector that is not acquired.");
+ return;
+ }
+
+ wanted = expected - 1;
+
+ } while(!__atomic_compare_exchange_n(&rdc->refcount, &expected, wanted, false, __ATOMIC_RELEASE, __ATOMIC_RELAXED));
+
+ if(wanted == 0)
rrd_collector_free(rdc);
}
-static void rrd_functions_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused,
- void *rrdhost __maybe_unused) {
+static void rrd_functions_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *rrdhost) {
+ RRDHOST *host = rrdhost; (void)host;
struct rrd_collector_function *rdcf = func;
- if(!thread_rrd_collector)
- fatal("RRDSET_COLLECTOR: called %s() for function '%s' without calling rrd_collector_started() first.",
- __FUNCTION__, dictionary_acquired_item_name(item));
-
+ rrd_collector_started();
rdcf->collector = rrd_collector_acquire();
+
+// internal_error(true, "FUNCTIONS: adding function '%s' on host '%s', collection tid %d, %s",
+// dictionary_acquired_item_name(item), rrdhost_hostname(host),
+// rdcf->collector->tid, rdcf->collector->running ? "running" : "NOT running");
}
-static void rrd_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused,
+static void rrd_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func,
void *rrdhost __maybe_unused) {
struct rrd_collector_function *rdcf = func;
rrd_collector_release(rdcf->collector);
}
-static bool rrd_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused,
- void *new_func __maybe_unused, void *rrdhost __maybe_unused) {
+static bool rrd_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func,
+ void *new_func, void *rrdhost) {
+ RRDHOST *host = rrdhost; (void)host;
struct rrd_collector_function *rdcf = func;
struct rrd_collector_function *new_rdcf = new_func;
- if(!thread_rrd_collector)
- fatal("RRDSET_COLLECTOR: called %s() for function '%s' without calling rrd_collector_started() first.",
- __FUNCTION__, dictionary_acquired_item_name(item));
+ rrd_collector_started();
bool changed = false;
@@ -417,6 +447,10 @@ static bool rrd_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_
changed = true;
}
+// internal_error(true, "FUNCTIONS: adding function '%s' on host '%s', collection tid %d, %s",
+// dictionary_acquired_item_name(item), rrdhost_hostname(host),
+// rdcf->collector->tid, rdcf->collector->running ? "running" : "NOT running");
+
return changed;
}
@@ -460,6 +494,8 @@ void rrd_collector_add_function(RRDHOST *host, RRDSET *st, const char *name, int
if(st)
dictionary_view_set(st->functions_view, key, item);
+ else
+ rrdhost_flag_set(host, RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED);
dictionary_acquired_item_release(host->functions, item);
}
@@ -481,6 +517,8 @@ void rrd_functions_expose_rrdpush(RRDSET *st, BUFFER *wb) {
}
void rrd_functions_expose_global_rrdpush(RRDHOST *host, BUFFER *wb) {
+ rrdhost_flag_clear(host, RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED);
+
struct rrd_collector_function *tmp;
dfe_start_read(host->functions, tmp) {
if(!(tmp->options & RRD_FUNCTION_GLOBAL))
@@ -565,20 +603,22 @@ static int rrd_call_function_find(RRDHOST *host, BUFFER *wb, const char *name, s
char *s = NULL;
*rdcf = NULL;
- while(!(*rdcf) && buffer[0]) {
- *rdcf = dictionary_get(host->functions, buffer);
- if(*rdcf) break;
+ if(host->functions) {
+ while (!(*rdcf) && buffer[0]) {
+ *rdcf = dictionary_get(host->functions, buffer);
+ if (*rdcf) break;
- // if s == NULL, set it to the end of the buffer
- // this should happen only the first time
- if(unlikely(!s))
- s = &buffer[key_length - 1];
+ // if s == NULL, set it to the end of the buffer
+ // this should happen only the first time
+ if (unlikely(!s))
+ s = &buffer[key_length - 1];
- // skip a word from the end
- while(s >= buffer && !isspace(*s)) *s-- = '\0';
+ // skip a word from the end
+ while (s >= buffer && !isspace(*s)) *s-- = '\0';
- // skip all spaces
- while(s >= buffer && isspace(*s)) *s-- = '\0';
+ // skip all spaces
+ while (s >= buffer && isspace(*s)) *s-- = '\0';
+ }
}
buffer_flush(wb);
diff --git a/libnetdata/required_dummies.h b/libnetdata/required_dummies.h
index 5a0d4e0502..1ffe1e9e53 100644
--- a/libnetdata/required_dummies.h
+++ b/libnetdata/required_dummies.h
@@ -37,6 +37,7 @@ void rrdset_thread_rda_free(void){};
void sender_thread_buffer_free(void){};
void query_target_free(void){};
void service_exits(void){};
+void rrd_collector_finished(void){};
// required by get_system_cpus()
char *netdata_configured_host_prefix = "";
diff --git a/libnetdata/threads/threads.c b/libnetdata/threads/threads.c
index ae3c7106d9..a26eba7341 100644
--- a/libnetdata/threads/threads.c
+++ b/libnetdata/threads/threads.c
@@ -178,6 +178,7 @@ void rrdset_thread_rda_free(void);
void sender_thread_buffer_free(void);
void query_target_free(void);
void service_exits(void);
+void rrd_collector_finished(void);
static void thread_cleanup(void *ptr) {
if(netdata_thread != ptr) {
@@ -188,6 +189,7 @@ static void thread_cleanup(void *ptr) {
if(!(netdata_thread->options & NETDATA_THREAD_OPTION_DONT_LOG_CLEANUP))
netdata_log_info("thread with task id %d finished", gettid());
+ rrd_collector_finished();
sender_thread_buffer_free();
rrdset_thread_rda_free();
query_target_free();
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 72c7c6786f..df7db6ed8f 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -489,6 +489,12 @@ RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock
rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS);
}
+ if(unlikely(host_flags & RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED)) {
+ BUFFER *wb = sender_start(host->sender);
+ rrd_functions_expose_global_rrdpush(host, wb);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+ }
+
RRDSET_FLAGS rrdset_flags = __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST);
bool exposed_upstream = (rrdset_flags & RRDSET_FLAG_UPSTREAM_EXPOSED);
bool replication_in_progress = !(rrdset_flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED);