author     Costa Tsaousis <costa@netdata.cloud>    2022-11-29 16:03:40 +0200
committer  GitHub <noreply@github.com>             2022-11-29 16:03:40 +0200
commit     462988dac901e95e765cd6be2dc24a5c33595526 (patch)
tree       601474721ffe5cbe7885faa1b78947ce467d2e89
parent     009029052f54224b2387e652a6a81a9887008b15 (diff)
replication fixes No 8 (#14061)
* replication requests with start_streaming=true are executed immediately upon reception, instead of being placed in the queue
* disable thread cancelability during workers cleanup
* remove obsolete worker from replication
* multi-threaded replication, with a netdata.conf option to set the number of replication threads
* revert spinlock to mutex
* separate worker and main thread worker jobs
* restart the queue every 10 seconds only
* use an atomic for the sender buffer percentage
* reset the queue position after sleeping
* use sender resets to sleep properly
* fix condition
* clean up sender members related to replication
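The first bullet is the headline change: a replication request that arrives with start_streaming=true is now served inline by the receiving thread, provided the sender buffer has room, instead of waiting in the queue. Below is a minimal standalone sketch of that dispatch decision; sender_t, request_t and the helper bodies are hypothetical stand-ins, while the 50% threshold and the execute-or-queue split mirror the replication.c hunk further down.

    #include <stdbool.h>
    #include <stdio.h>

    /* Threshold from the patch: a start_streaming request is executed inline
     * only while the sender buffer is at most 50% full. */
    #define STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50

    /* Hypothetical, simplified stand-ins for the netdata structures. */
    typedef struct sender  { unsigned buffer_used_percent; } sender_t;
    typedef struct request { bool start_streaming; }         request_t;

    static void execute_request(sender_t *s, request_t *rq) { (void)s; (void)rq; puts("executed inline"); }
    static void queue_request(sender_t *s, request_t *rq)   { (void)s; (void)rq; puts("queued"); }

    /* Mirrors the dispatch added to replication_add_request() by this patch. */
    static void add_replication_request(sender_t *s, request_t *rq) {
        if (rq->start_streaming &&
            s->buffer_used_percent <= STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)
            execute_request(s, rq);   /* run the query now, on the receiving thread */
        else
            queue_request(s, rq);     /* defer to the replication worker threads */
    }

    int main(void) {
        sender_t  s  = { .buffer_used_percent = 30 };
        request_t rq = { .start_streaming = true };
        add_replication_request(&s, &rq);   /* buffer at 30% <= 50%, so: "executed inline" */
        return 0;
    }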
-rw-r--r--  daemon/global_statistics.c    5
-rw-r--r--  daemon/static_threads.c       4
-rw-r--r--  libnetdata/locks/locks.c      4
-rw-r--r--  streaming/replication.c     714
-rw-r--r--  streaming/replication.h       1
-rw-r--r--  streaming/rrdpush.h          45
-rw-r--r--  streaming/sender.c            2
7 files changed, 489 insertions, 286 deletions
diff --git a/daemon/global_statistics.c b/daemon/global_statistics.c
index 7ac4bdfa77..53fd6c45af 100644
--- a/daemon/global_statistics.c
+++ b/daemon/global_statistics.c
@@ -2474,6 +2474,8 @@ static int read_thread_cpu_time_from_proc_stat(pid_t pid __maybe_unused, kernel_
static Pvoid_t workers_by_pid_JudyL_array = NULL;
static void workers_threads_cleanup(struct worker_utilization *wu) {
+ netdata_thread_disable_cancelability();
+
struct worker_thread *t = wu->threads;
while(t) {
struct worker_thread *next = t->next;
@@ -2483,9 +2485,10 @@ static void workers_threads_cleanup(struct worker_utilization *wu) {
DOUBLE_LINKED_LIST_REMOVE_UNSAFE(wu->threads, t, prev, next);
freez(t);
}
-
t = next;
}
+
+ netdata_thread_enable_cancelability();
}
static struct worker_thread *worker_thread_find(struct worker_utilization *wu __maybe_unused, pid_t pid) {
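The hunk above walks and frees a linked list of worker threads; disabling cancelability first ensures the walk cannot be interrupted by pthread_cancel() halfway through, which would leak nodes or leave the list half-unlinked. A generic sketch of the pattern, under the assumption that netdata_thread_disable/enable_cancelability() wrap pthread_setcancelstate() (the list type here is illustrative):

    #include <pthread.h>
    #include <stdlib.h>

    struct node { struct node *prev, *next; };

    /* Free a doubly-linked list without risking cancellation mid-walk. */
    void cleanup_list(struct node **head) {
        int oldstate;
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate);  /* cancellation points no longer fire */

        struct node *n = *head;
        while (n) {
            struct node *next = n->next;  /* save the link before freeing */
            free(n);
            n = next;
        }
        *head = NULL;

        pthread_setcancelstate(oldstate, NULL);  /* restore; a pending cancel may act now */
    }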
diff --git a/daemon/static_threads.c b/daemon/static_threads.c
index 3964fb1547..b7730bc310 100644
--- a/daemon/static_threads.c
+++ b/daemon/static_threads.c
@@ -134,7 +134,7 @@ const struct netdata_static_thread static_threads_common[] = {
#endif
{
- .name = "rrdcontext",
+ .name = "RRDCONTEXT",
.config_section = NULL,
.config_name = NULL,
.enabled = 1,
@@ -144,7 +144,7 @@ const struct netdata_static_thread static_threads_common[] = {
},
{
- .name = "replication",
+ .name = "REPLICATION",
.config_section = NULL,
.config_name = NULL,
.enabled = 1,
diff --git a/libnetdata/locks/locks.c b/libnetdata/locks/locks.c
index 6677e220bf..9bf56e9f58 100644
--- a/libnetdata/locks/locks.c
+++ b/libnetdata/locks/locks.c
@@ -287,6 +287,8 @@ void netdata_spinlock_init(SPINLOCK *spinlock) {
}
void netdata_spinlock_lock(SPINLOCK *spinlock) {
+ netdata_thread_disable_cancelability();
+
static const struct timespec ns = { .tv_sec = 0, .tv_nsec = 1 };
bool expected = false, desired = true;
@@ -306,6 +308,8 @@ void netdata_spinlock_lock(SPINLOCK *spinlock) {
void netdata_spinlock_unlock(SPINLOCK *spinlock) {
__atomic_store_n(&spinlock->locked, false, __ATOMIC_RELEASE);
+
+ netdata_thread_enable_cancelability();
}
#ifdef NETDATA_TRACE_RWLOCKS
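With cancelability disabled for the whole time a spinlock is held, a thread can no longer be cancelled inside a critical section, which would leave the lock set forever and deadlock every later locker. An illustrative cancel-safe spinlock in C11 atomics, in the spirit of this hunk (not the exact netdata code; the visible context shows the same compare-exchange loop with a 1 ns sleep as back-off):

    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdbool.h>
    #include <time.h>

    typedef struct { atomic_bool locked; } spinlock_t;

    void spinlock_lock(spinlock_t *sl) {
        int oldstate;
        /* Cancelling a thread that holds a spinlock would leave it locked
         * forever, so cancellation is off for the whole critical section. */
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate);

        static const struct timespec ns = { .tv_sec = 0, .tv_nsec = 1 };
        bool expected = false;
        while (!atomic_compare_exchange_weak(&sl->locked, &expected, true)) {
            expected = false;      /* CAS overwrote it with the current value */
            nanosleep(&ns, NULL);  /* back off instead of burning the CPU */
        }
    }

    void spinlock_unlock(spinlock_t *sl) {
        atomic_store_explicit(&sl->locked, false, memory_order_release);
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);  /* the window closes here */
    }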
diff --git a/streaming/replication.c b/streaming/replication.c
index 2dd662311c..8fa501061b 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -3,10 +3,34 @@
#include "replication.h"
#include "Judy.h"
+#define STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50
#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 20
#define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10
+#define WORKER_JOB_FIND_NEXT 1
+#define WORKER_JOB_QUERYING 2
+#define WORKER_JOB_DELETE_ENTRY 3
+#define WORKER_JOB_FIND_CHART 4
+#define WORKER_JOB_CHECK_CONSISTENCY 5
+#define WORKER_JOB_BUFFER_COMMIT 6
+#define WORKER_JOB_CLEANUP 7
+
+// master thread worker jobs
+#define WORKER_JOB_STATISTICS 8
+#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 9
+#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 10
+#define WORKER_JOB_CUSTOM_METRIC_ADDED 11
+#define WORKER_JOB_CUSTOM_METRIC_DONE 12
+#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED 13
+#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 14
+#define WORKER_JOB_CUSTOM_METRIC_WAITS 15
+#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 16
+
+#define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 30
+#define SECONDS_TO_RESET_POINT_IN_TIME 10
+
static struct replication_query_statistics replication_queries = {
+ .spinlock = NETDATA_SPINLOCK_INITIALIZER,
.queries_started = 0,
.queries_finished = 0,
.points_read = 0,
@@ -14,7 +38,10 @@ static struct replication_query_statistics replication_queries = {
};
struct replication_query_statistics replication_get_query_statistics(void) {
- return replication_queries;
+ netdata_spinlock_lock(&replication_queries.spinlock);
+ struct replication_query_statistics ret = replication_queries;
+ netdata_spinlock_unlock(&replication_queries.spinlock);
+ return ret;
}
// ----------------------------------------------------------------------------
@@ -177,6 +204,7 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
// release all the dictionary items acquired
// finalize the queries
+ size_t queries = 0;
for(size_t i = 0; i < dimensions ;i++) {
struct replication_dimension *d = &data[i];
if(unlikely(!d->enabled)) continue;
@@ -186,12 +214,15 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
dictionary_acquired_item_release(d->dict, d->rda);
// update global statistics
- replication_queries.queries_started++;
- replication_queries.queries_finished++;
+ queries++;
}
+ netdata_spinlock_lock(&replication_queries.spinlock);
+ replication_queries.queries_started += queries;
+ replication_queries.queries_finished += queries;
replication_queries.points_read += points_read;
replication_queries.points_generated += points_generated;
+ netdata_spinlock_unlock(&replication_queries.spinlock);
return before;
}
@@ -323,7 +354,9 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t
, (unsigned long long)world_clock_time
);
+ worker_is_busy(WORKER_JOB_BUFFER_COMMIT);
sender_commit(host->sender, wb);
+ worker_is_busy(WORKER_JOB_CLEANUP);
return enable_streaming;
}
@@ -349,7 +382,7 @@ struct replication_request_details {
struct {
time_t first_entry_t; // the first entry time we have
time_t last_entry_t; // the last entry time we have
- bool last_entry_t_adjusted_to_now; // true, if the last entry time was in the future and we fixed
+ bool last_entry_t_adjusted_to_now; // true, if the last entry time was in the future, and we fixed
time_t now; // the current local world clock time
} local_db;
@@ -484,7 +517,7 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
else {
// we had sent a request - let's continue at the point we left it
// for this we don't take into account the actual data in our db
- // because the child may also have gaps and we need to get over it
+ // because the child may also have gaps, and we need to get over it
r.gap.from = r.last_request.before;
}
@@ -534,7 +567,7 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
if(r.wanted.after > r.wanted.before)
r.wanted.after = r.wanted.before;
- // the child should start streaming immediately if the wanted duration is small or we reached the last entry of the child
+ // the child should start streaming immediately if the wanted duration is small, or we reached the last entry of the child
r.wanted.start_streaming = (r.local_db.now - r.wanted.after <= host->rrdpush_replication_step || r.wanted.before == r.child_db.last_entry_t);
// the wanted timeframe is now r.wanted.after -> r.wanted.before
@@ -568,87 +601,137 @@ struct replication_sort_entry {
size_t unique_id; // used as a key to identify the sort entry - we never access its contents
};
+#define MAX_REPLICATION_THREADS 20 // + 1 for the main thread
+
// the global variables for the replication thread
static struct replication_thread {
netdata_mutex_t mutex;
- size_t pending;
- size_t added;
- size_t executed;
- size_t removed;
- size_t last_executed;
- time_t first_time_t;
- Word_t next_unique_id;
- struct replication_request *requests;
+ struct {
+ size_t pending; // number of requests pending in the queue
+ Word_t unique_id; // the last unique id we gave to a request (auto-increment, starting from 1)
+
+ // statistics
+ size_t added; // number of requests added to the queue
+ size_t removed; // number of requests removed from the queue
+ size_t skipped_not_connected; // number of requests skipped, because the sender is not connected to a parent
+ size_t skipped_no_room; // number of requests skipped, because the sender has no room for responses
+// size_t skipped_no_room_since_last_reset;
+ size_t sender_resets; // number of times a sender reset our last position in the queue
+ time_t first_time_t; // the minimum 'after' we encountered
+
+ struct {
+ Word_t after;
+ Word_t unique_id;
+ Pvoid_t JudyL_array;
+ } queue;
+
+ } unsafe; // protected from replication_recursive_lock()
- Word_t last_after;
- Word_t last_unique_id;
+ struct {
+ size_t executed; // the number of replication requests executed
+ size_t latest_first_time; // the 'after' timestamp of the last request we executed
+ } atomic; // access should be with atomic operations
- size_t skipped_not_connected;
- size_t skipped_no_room;
- size_t sender_resets;
- size_t waits;
+ struct {
+ size_t waits;
+ size_t last_executed; // caching of the atomic.executed to report number of requests executed since last time
- size_t skipped_no_room_last_run;
+ netdata_thread_t **threads_ptrs;
+ size_t threads;
+ } main_thread; // access is allowed only by the main thread
- Pvoid_t JudyL_array;
} replication_globals = {
.mutex = NETDATA_MUTEX_INITIALIZER,
- .pending = 0,
- .added = 0,
- .executed = 0,
- .last_executed = 0,
- .first_time_t = 0,
- .next_unique_id = 1,
- .skipped_no_room = 0,
- .skipped_no_room_last_run = 0,
- .skipped_not_connected = 0,
- .sender_resets = 0,
- .waits = 0,
- .requests = NULL,
- .JudyL_array = NULL,
+ .unsafe = {
+ .pending = 0,
+ .unique_id = 0,
+
+ .added = 0,
+ .removed = 0,
+ .skipped_not_connected = 0,
+ .skipped_no_room = 0,
+// .skipped_no_room_since_last_reset = 0,
+ .sender_resets = 0,
+
+ .first_time_t = 0,
+
+ .queue = {
+ .after = 0,
+ .unique_id = 0,
+ .JudyL_array = NULL,
+ },
+ },
+ .atomic = {
+ .executed = 0,
+ .latest_first_time = 0,
+ },
+ .main_thread = {
+ .waits = 0,
+ .last_executed = 0,
+ .threads = 0,
+ .threads_ptrs = NULL,
+ },
};
-static __thread int replication_recursive_mutex_recursions = 0;
+#define replication_set_latest_first_time(t) __atomic_store_n(&replication_globals.atomic.latest_first_time, t, __ATOMIC_RELAXED)
+#define replication_get_latest_first_time() __atomic_load_n(&replication_globals.atomic.latest_first_time, __ATOMIC_RELAXED)
-static void replication_recursive_lock() {
- if(++replication_recursive_mutex_recursions == 1)
- netdata_mutex_lock(&replication_globals.mutex);
+static inline bool replication_recursive_lock_mode(char mode) {
+ static __thread int recursions = 0;
-#ifdef NETDATA_INTERNAL_CHECKS
- if(replication_recursive_mutex_recursions < 0 || replication_recursive_mutex_recursions > 2)
- fatal("REPLICATION: recursions is %d", replication_recursive_mutex_recursions);
-#endif
-}
-
-static void replication_recursive_unlock() {
- if(--replication_recursive_mutex_recursions == 0)
- netdata_mutex_unlock(&replication_globals.mutex);
+ if(mode == 'L') { // (L)ock
+ if(++recursions == 1)
+ netdata_mutex_lock(&replication_globals.mutex);
+ }
+ else if(mode == 'U') { // (U)nlock
+ if(--recursions == 0)
+ netdata_mutex_unlock(&replication_globals.mutex);
+ }
+ else if(mode == 'C') { // (C)heck
+ if(recursions > 0)
+ return true;
+ else
+ return false;
+ }
+ else
+ fatal("REPLICATION: unknown lock mode '%c'", mode);
#ifdef NETDATA_INTERNAL_CHECKS
- if(replication_recursive_mutex_recursions < 0 || replication_recursive_mutex_recursions > 2)
- fatal("REPLICATION: recursions is %d", replication_recursive_mutex_recursions);
+ if(recursions < 0)
+ fatal("REPLICATION: recursions is %d", recursions);
#endif
+
+ return true;
}
+#define replication_recursive_lock() replication_recursive_lock_mode('L')
+#define replication_recursive_unlock() replication_recursive_lock_mode('U')
+#define fatal_when_replication_is_not_locked_for_me() do { \
+ if(!replication_recursive_lock_mode('C')) \
+ fatal("REPLICATION: reached %s, but replication is not locked by this thread.", __FUNCTION__); \
+} while(0)
+
void replication_set_next_point_in_time(time_t after, size_t unique_id) {
replication_recursive_lock();
- replication_globals.last_after = after;
- replication_globals.last_unique_id = unique_id;
+ replication_globals.unsafe.queue.after = after;
+ replication_globals.unsafe.queue.unique_id = unique_id;
replication_recursive_unlock();
}
// ----------------------------------------------------------------------------
// replication sort entry management
-static struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) {
+static struct replication_sort_entry *replication_sort_entry_create_unsafe(struct replication_request *rq) {
+ fatal_when_replication_is_not_locked_for_me();
+
struct replication_sort_entry *rse = mallocz(sizeof(struct replication_sort_entry));
rrdpush_sender_pending_replication_requests_plus_one(rq->sender);
// copy the request
rse->rq = rq;
- rse->unique_id = replication_globals.next_unique_id++;
+ rse->unique_id = ++replication_globals.unsafe.unique_id;
// save the unique id into the request, to be able to delete it later
rq->unique_id = rse->unique_id;
@@ -663,30 +746,33 @@ static void replication_sort_entry_destroy(struct replication_sort_entry *rse) {
static struct replication_sort_entry *replication_sort_entry_add(struct replication_request *rq) {
replication_recursive_lock();
- struct replication_sort_entry *rse = replication_sort_entry_create(rq);
+ struct replication_sort_entry *rse = replication_sort_entry_create_unsafe(rq);
- if(rq->after < (time_t)replication_globals.last_after && rq->sender->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED && !replication_globals.skipped_no_room_last_run) {
- // make it find this request first
- replication_set_next_point_in_time(rq->after, rq->unique_id);
- }
+// if(rq->after < (time_t)replication_globals.protected.queue.after &&
+// rq->sender->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED &&
+// !replication_globals.protected.skipped_no_room_since_last_reset) {
+//
+// // make it find this request first
+// replication_set_next_point_in_time(rq->after, rq->unique_id);
+// }
- replication_globals.added++;
- replication_globals.pending++;
+ replication_globals.unsafe.added++;
+ replication_globals.unsafe.pending++;
Pvoid_t *inner_judy_ptr;
// find the outer judy entry, using after as key
- inner_judy_ptr = JudyLGet(replication_globals.JudyL_array, (Word_t) rq->after, PJE0);
+ inner_judy_ptr = JudyLGet(replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0);
if(!inner_judy_ptr)
- inner_judy_ptr = JudyLIns(&replication_globals.JudyL_array, (Word_t) rq->after, PJE0);
+ inner_judy_ptr = JudyLIns(&replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0);
// add it to the inner judy, using unique_id as key
Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0);
*item = rse;
rq->indexed_in_judy = true;
- if(!replication_globals.first_time_t || rq->after < replication_globals.first_time_t)
- replication_globals.first_time_t = rq->after;
+ if(!replication_globals.unsafe.first_time_t || rq->after < replication_globals.unsafe.first_time_t)
+ replication_globals.unsafe.first_time_t = rq->after;
replication_recursive_unlock();
@@ -694,10 +780,12 @@ static struct replication_sort_entry *replication_sort_entry_add(struct replicat
}
static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr) {
+ fatal_when_replication_is_not_locked_for_me();
+
bool inner_judy_deleted = false;
- replication_globals.removed++;
- replication_globals.pending--;
+ replication_globals.unsafe.removed++;
+ replication_globals.unsafe.pending--;
rrdpush_sender_pending_replication_requests_minus_one(rse->rq->sender);
@@ -708,7 +796,7 @@ static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sor
// if no items left, delete it from the outer judy
if(**inner_judy_ppptr == NULL) {
- JudyLDel(&replication_globals.JudyL_array, rse->rq->after, PJE0);
+ JudyLDel(&replication_globals.unsafe.queue.JudyL_array, rse->rq->after, PJE0);
inner_judy_deleted = true;
}
@@ -725,7 +813,7 @@ static void replication_sort_entry_del(struct replication_request *rq) {
replication_recursive_lock();
if(rq->indexed_in_judy) {
- inner_judy_pptr = JudyLGet(replication_globals.JudyL_array, rq->after, PJE0);
+ inner_judy_pptr = JudyLGet(replication_globals.unsafe.queue.JudyL_array, rq->after, PJE0);
if (inner_judy_pptr) {
Pvoid_t *our_item_pptr = JudyLGet(*inner_judy_pptr, rq->unique_id, PJE0);
if (our_item_pptr) {
@@ -754,68 +842,87 @@ static struct replication_request replication_request_get_first_available() {
Pvoid_t *inner_judy_pptr;
replication_recursive_lock();
- replication_globals.skipped_no_room_last_run = 0;
struct replication_request rq_to_return = (struct replication_request){ .found = false };
-
- if(unlikely(!replication_globals.last_after || !replication_globals.last_unique_id)) {
- replication_globals.last_after = 0;
- replication_globals.last_unique_id = 0;
+ if(unlikely(!replication_globals.unsafe.queue.after || !replication_globals.unsafe.queue.unique_id)) {
+ replication_globals.unsafe.queue.after = 0;
+ replication_globals.unsafe.queue.unique_id = 0;
}
- bool find_same_after = true;
- while(!rq_to_return.found && (inner_judy_pptr = JudyLFirstOrNext(replication_globals.JudyL_array, &replication_globals.last_after, find_same_after))) {
- Pvoid_t *our_item_pptr;
+ Word_t started_after = replication_globals.unsafe.queue.after;
+
+ size_t round = 0;
+ while(!rq_to_return.found) {
+ round++;
- while(!rq_to_return.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &replication_globals.last_unique_id, PJE0))) {
- struct replication_sort_entry *rse = *our_item_pptr;
- struct replication_request *rq = rse->rq;
- struct sender_state *s = rq->sender;
+ if(round > 2)
+ break;
- if(likely(s->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)) {
- // there is room for this request in the sender buffer
+ if(round == 2) {
+ if(started_after == 0)
+ break;
- bool sender_is_connected =
- rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
+ replication_globals.unsafe.queue.after = 0;
+ replication_globals.unsafe.queue.unique_id = 0;
+ }
- bool sender_has_been_flushed_since_this_request =
- rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(s);
+ bool find_same_after = true;
+ while (!rq_to_return.found && (inner_judy_pptr = JudyLFirstOrNext(replication_globals.unsafe.queue.JudyL_array, &replication_globals.unsafe.queue.after, find_same_after))) {
+ Pvoid_t *our_item_pptr;
- if (unlikely(!sender_is_connected || sender_has_been_flushed_since_this_request)) {
- // skip this request, the sender is not connected or it has reconnected
+ if(unlikely(round == 2 && replication_globals.unsafe.queue.after > started_after))
+ break;
- replication_globals.skipped_not_connected++;
- if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
- // we removed the item from the outer JudyL
- break;
- }
- else {
- // this request is good to execute
+ while (!rq_to_return.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &replication_globals.unsafe.queue.unique_id, PJE0))) {
+ struct replication_sort_entry *rse = *our_item_pptr;
+ struct replication_request *rq = rse->rq;
+ struct sender_state *s = rq->sender;
- // copy the request to return it
- rq_to_return = *rq;
- rq_to_return.chart_id = string_dup(rq_to_return.chart_id);
+ if (likely(rrdpush_sender_get_buffer_used_percent(s) <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)) {
+ // there is room for this request in the sender buffer
- // set the return result to found
- rq_to_return.found = true;
+ bool sender_is_connected =
+ rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
- if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
- // we removed the item from the outer JudyL
- break;
+ bool sender_has_been_flushed_since_this_request =
+ rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(s);
+
+ if (unlikely(!sender_is_connected || sender_has_been_flushed_since_this_request)) {
+ // skip this request, the sender is not connected, or it has reconnected
+
+ replication_globals.unsafe.skipped_not_connected++;
+ if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
+ // we removed the item from the outer JudyL
+ break;
+ }
+ else {
+ // this request is good to execute
+
+ // copy the request to return it
+ rq_to_return = *rq;
+ rq_to_return.chart_id = string_dup(rq_to_return.chart_id);
+
+ // set the return result to found
+ rq_to_return.found = true;
+
+ if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
+ // we removed the item from the outer JudyL
+ break;
+ }
+ }
+ else {
+ replication_globals.unsafe.skipped_no_room++;
+// replication_globals.protected.skipped_no_room_since_last_reset++;
}
}
- else {
- replication_globals.skipped_no_room++;
- replication_globals.skipped_no_room_last_run++;
- }
- }
- // call JudyLNext from now on
- find_same_after = false;
+ // call JudyLNext from now on
+ find_same_after = false;
- // prepare for the next iteration on the outer loop
- replication_globals.last_unique_id = 0;
+ // prepare for the next iteration on the outer loop
+ replication_globals.unsafe.queue.unique_id = 0;
+ }
}
replication_recursive_unlock();
@@ -889,6 +996,58 @@ static void replication_request_delete_callback(const DICTIONARY_ITEM *item __ma
string_freez(rq->chart_id);
}
+static bool replication_execute_request(struct replication_request *rq, bool workers) {
+ bool ret = false;
+
+ if(likely(workers))
+ worker_is_busy(WORKER_JOB_FIND_CHART);
+
+ RRDSET *st = rrdset_find(rq->sender->host, string2str(rq->chart_id));
+ if(!st) {
+ internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' not found",
+ rrdhost_hostname(rq->sender->host), string2str(rq->chart_id));
+
+ goto cleanup;
+ }
+
+ if(likely(workers))
+ worker_is_busy(WORKER_JOB_QUERYING);
+
+ netdata_thread_disable_cancelability();
+
+ // send the replication data
+ bool start_streaming = replicate_chart_response(
+ st->rrdhost, st, rq->start_streaming, rq->after, rq->before);
+
+ netdata_thread_enable_cancelability();
+
+ if(start_streaming && rq->sender_last_flush_ut == rrdpush_sender_get_flush_time(rq->sender)) {
+ // enable normal streaming if we have to
+ // but only if the sender buffer has not been flushed since we started
+
+ if(rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
+ rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
+ rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+ rrdhost_sender_replicating_charts_minus_one(st->rrdhost);
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' streaming starts",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st));
+#endif
+ }
+ else
+ internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' received start streaming command, but the chart is not in progress replicating",
+ rrdhost_hostname(st->rrdhost), string2str(rq->chart_id));
+ }
+
+ __atomic_add_fetch(&replication_globals.atomic.executed, 1, __ATOMIC_RELAXED);
+
+ ret = true;
+
+cleanup:
+ string_freez(rq->chart_id);
+ return ret;
+}
// ----------------------------------------------------------------------------
// public API
@@ -903,27 +1062,31 @@ void replication_add_request(struct sender_state *sender, const char *chart_id,
.sender_last_flush_ut = rrdpush_sender_get_flush_time(sender),
};
- dictionary_set(sender->replication_requests, chart_id, &rq, sizeof(struct replication_request));
+ if(start_streaming && rrdpush_sender_get_buffer_used_percent(sender) <= STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)
+ replication_execute_request(&rq, false);
+
+ else
+ dictionary_set(sender->replication.requests, chart_id, &rq, sizeof(struct replication_request));
}
void replication_sender_delete_pending_requests(struct sender_state *sender) {
// allow the dictionary destructor to go faster on locks
replication_recursive_lock();
- dictionary_flush(sender->replication_requests);
+ dictionary_flush(sender->replication.requests);
replication_recursive_unlock();
}
void replication_init_sender(struct sender_state *sender) {
- sender->replication_requests = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
- dictionary_register_react_callback(sender->replication_requests, replication_request_react_callback, sender);
- dictionary_register_conflict_callback(sender->replication_requests, replication_request_conflict_callback, sender);
- dictionary_register_delete_callback(sender->replication_requests, replication_request_delete_callback, sender);
+ sender->replication.requests = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
+ dictionary_register_react_callback(sender->replication.requests, replication_request_react_callback, sender);
+ dictionary_register_conflict_callback(sender->replication.requests, replication_request_conflict_callback, sender);
+ dictionary_register_delete_callback(sender->replication.requests, replication_request_delete_callback, sender);
}
void replication_cleanup_sender(struct sender_state *sender) {
// allow the dictionary destructor to go faster on locks
replication_recursive_lock();
- dictionary_destroy(sender->replication_requests);
+ dictionary_destroy(sender->replication.requests);
replication_recursive_unlock();
}
@@ -932,61 +1095,32 @@ void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) {
size_t percentage = (s->buffer->max_size - available) * 100 / s->buffer->max_size;
if(percentage > MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)
- s->replication_reached_max = true;
+ s->replication.unsafe.reached_max = true;
- if(s->replication_reached_max &&
+ if(s->replication.unsafe.reached_max &&
percentage <= MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED) {
- s->replication_reached_max = false;
+ s->replication.unsafe.reached_max = false;
replication_recursive_lock();
- replication_set_next_point_in_time(0, 0);
- replication_globals.sender_resets++;
+// replication_set_next_point_in_time(0, 0);
+ replication_globals.unsafe.sender_resets++;
replication_recursive_unlock();
}
- s->buffer_used_percentage = percentage;
+ rrdpush_sender_set_buffer_used_percent(s, percentage);
}
// ----------------------------------------------------------------------------
// replication thread
-static void replication_main_cleanup(void *ptr) {
- struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
- static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
-
- // custom code
- worker_unregister();
-
- static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
-}
-
-#define WORKER_JOB_FIND_NEXT 1
-#define WORKER_JOB_QUERYING 2
-#define WORKER_JOB_DELETE_ENTRY 3
-#define WORKER_JOB_FIND_CHART 4
-#define WORKER_JOB_STATISTICS 5
-#define WORKER_JOB_ACTIVATE_ENABLE_STREAMING 6
-#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 7
-#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 8
-#define WORKER_JOB_CUSTOM_METRIC_ADDED 9
-#define WORKER_JOB_CUSTOM_METRIC_DONE 10
-#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED 11
-#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 12
-#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 13
-#define WORKER_JOB_CUSTOM_METRIC_WAITS 14
-#define WORKER_JOB_CHECK_CONSISTENCY 15
-
-#define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 10
-#define SECONDS_TO_RESET_POINT_IN_TIME 10
-
static size_t verify_host_charts_are_streaming_now(RRDHOST *host) {
internal_error(
host->sender &&
- !host->sender->replication_pending_requests &&
- dictionary_entries(host->sender->replication_requests) != 0,
+ !rrdpush_sender_pending_replication_requests(host->sender) &&
+ dictionary_entries(host->sender->replication.requests) != 0,
"REPLICATION SUMMARY: 'host:%s' reports %zu pending replication requests, but its chart replication index says there are %zu charts pending replication",
rrdhost_hostname(host),
- host->sender->replication_pending_requests,
- dictionary_entries(host->sender->replication_requests)
+ rrdpush_sender_pending_replication_requests(host->sender),
+ dictionary_entries(host->sender->replication.requests)
);
size_t ok = 0;
@@ -1039,42 +1173,131 @@ static void verify_all_hosts_charts_are_streaming_now(void) {
errors += verify_host_charts_are_streaming_now(host);
dfe_done(host);
- size_t executed = replication_globals.executed;
- info("REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication", executed - replication_globals.last_executed, errors);
- replication_globals.last_executed = executed;
+ size_t executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED);
+ info("REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication",
+ executed - replication_globals.main_thread.last_executed, errors);
+ replication_globals.main_thread.last_executed = executed;
}
-void *replication_thread_main(void *ptr __maybe_unused) {
- netdata_thread_cleanup_push(replication_main_cleanup, ptr);
-
+static void replication_initialize_workers(bool master) {
worker_register("REPLICATION");
-
worker_register_job_name(WORKER_JOB_FIND_NEXT, "find next");
worker_register_job_name(WORKER_JOB_QUERYING, "querying");
worker_register_job_name(WORKER_JOB_DELETE_ENTRY, "dict delete");
worker_register_job_name(WORKER_JOB_FIND_CHART, "find chart");
- worker_register_job_name(WORKER_JOB_ACTIVATE_ENABLE_STREAMING, "enable streaming");
worker_register_job_name(WORKER_JOB_CHECK_CONSISTENCY, "check consistency");
- worker_register_job_name(WORKER_JOB_STATISTICS, "statistics");
+ worker_register_job_name(WORKER_JOB_BUFFER_COMMIT, "commit");
+ worker_register_job_name(WORKER_JOB_CLEANUP, "cleanup");
+
+ if(master) {
+ worker_register_job_name(WORKER_JOB_STATISTICS, "statistics");
+ worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, "pending requests", "requests", WORKER_METRIC_ABSOLUTE);
+ worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, "completion", "%", WORKER_METRIC_ABSOLUTE);
+ worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, "added requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
+ worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_DONE, "finished requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
+ worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED, "not connected requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
+ worker_register_job_custom_metric(WORKER_JOB_C