diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2022-11-29 16:03:40 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-29 16:03:40 +0200 |
commit | 462988dac901e95e765cd6be2dc24a5c33595526 (patch) | |
tree | 601474721ffe5cbe7885faa1b78947ce467d2e89 | |
parent | 009029052f54224b2387e652a6a81a9887008b15 (diff) |
replication fixes No 8 (#14061)
* replication requests with start_streaming=true are executed immediately upon reception, instead of being placed in the queue
* disable thread cancelability while workers cleanup
* remove obsolete worker from replication
* multi-threaded replication with netdata.conf option to set number of replication threads
* revert spinlock to mutex
* separate worker and main thread worker jobs
* restart the queue every 10 seconds only
* use atomic for sender buffer percentage
* reset the queue position after sleeping
* use sender resets to sleep properly
* fix condition
* cleanup sender members related to replication
-rw-r--r-- | daemon/global_statistics.c | 5 | ||||
-rw-r--r-- | daemon/static_threads.c | 4 | ||||
-rw-r--r-- | libnetdata/locks/locks.c | 4 | ||||
-rw-r--r-- | streaming/replication.c | 714 | ||||
-rw-r--r-- | streaming/replication.h | 1 | ||||
-rw-r--r-- | streaming/rrdpush.h | 45 | ||||
-rw-r--r-- | streaming/sender.c | 2 |
7 files changed, 489 insertions, 286 deletions
diff --git a/daemon/global_statistics.c b/daemon/global_statistics.c index 7ac4bdfa77..53fd6c45af 100644 --- a/daemon/global_statistics.c +++ b/daemon/global_statistics.c @@ -2474,6 +2474,8 @@ static int read_thread_cpu_time_from_proc_stat(pid_t pid __maybe_unused, kernel_ static Pvoid_t workers_by_pid_JudyL_array = NULL; static void workers_threads_cleanup(struct worker_utilization *wu) { + netdata_thread_disable_cancelability(); + struct worker_thread *t = wu->threads; while(t) { struct worker_thread *next = t->next; @@ -2483,9 +2485,10 @@ static void workers_threads_cleanup(struct worker_utilization *wu) { DOUBLE_LINKED_LIST_REMOVE_UNSAFE(wu->threads, t, prev, next); freez(t); } - t = next; } + + netdata_thread_enable_cancelability(); } static struct worker_thread *worker_thread_find(struct worker_utilization *wu __maybe_unused, pid_t pid) { diff --git a/daemon/static_threads.c b/daemon/static_threads.c index 3964fb1547..b7730bc310 100644 --- a/daemon/static_threads.c +++ b/daemon/static_threads.c @@ -134,7 +134,7 @@ const struct netdata_static_thread static_threads_common[] = { #endif { - .name = "rrdcontext", + .name = "RRDCONTEXT", .config_section = NULL, .config_name = NULL, .enabled = 1, @@ -144,7 +144,7 @@ const struct netdata_static_thread static_threads_common[] = { }, { - .name = "replication", + .name = "REPLICATION", .config_section = NULL, .config_name = NULL, .enabled = 1, diff --git a/libnetdata/locks/locks.c b/libnetdata/locks/locks.c index 6677e220bf..9bf56e9f58 100644 --- a/libnetdata/locks/locks.c +++ b/libnetdata/locks/locks.c @@ -287,6 +287,8 @@ void netdata_spinlock_init(SPINLOCK *spinlock) { } void netdata_spinlock_lock(SPINLOCK *spinlock) { + netdata_thread_disable_cancelability(); + static const struct timespec ns = { .tv_sec = 0, .tv_nsec = 1 }; bool expected = false, desired = true; @@ -306,6 +308,8 @@ void netdata_spinlock_lock(SPINLOCK *spinlock) { void netdata_spinlock_unlock(SPINLOCK *spinlock) { __atomic_store_n(&spinlock->locked, false, __ATOMIC_RELEASE); + + netdata_thread_enable_cancelability(); } #ifdef NETDATA_TRACE_RWLOCKS diff --git a/streaming/replication.c b/streaming/replication.c index 2dd662311c..8fa501061b 100644 --- a/streaming/replication.c +++ b/streaming/replication.c @@ -3,10 +3,34 @@ #include "replication.h" #include "Judy.h" +#define STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50 #define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 20 #define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10 +#define WORKER_JOB_FIND_NEXT 1 +#define WORKER_JOB_QUERYING 2 +#define WORKER_JOB_DELETE_ENTRY 3 +#define WORKER_JOB_FIND_CHART 4 +#define WORKER_JOB_CHECK_CONSISTENCY 5 +#define WORKER_JOB_BUFFER_COMMIT 6 +#define WORKER_JOB_CLEANUP 7 + +// master thread worker jobs +#define WORKER_JOB_STATISTICS 8 +#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 9 +#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 10 +#define WORKER_JOB_CUSTOM_METRIC_ADDED 11 +#define WORKER_JOB_CUSTOM_METRIC_DONE 12 +#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED 13 +#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 14 +#define WORKER_JOB_CUSTOM_METRIC_WAITS 15 +#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 16 + +#define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 30 +#define SECONDS_TO_RESET_POINT_IN_TIME 10 + static struct replication_query_statistics replication_queries = { + .spinlock = NETDATA_SPINLOCK_INITIALIZER, .queries_started = 0, .queries_finished = 0, .points_read = 0, @@ -14,7 +38,10 @@ static struct replication_query_statistics replication_queries = { }; struct replication_query_statistics replication_get_query_statistics(void) { - return replication_queries; + netdata_spinlock_lock(&replication_queries.spinlock); + struct replication_query_statistics ret = replication_queries; + netdata_spinlock_unlock(&replication_queries.spinlock); + return ret; } // ---------------------------------------------------------------------------- @@ -177,6 +204,7 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti // release all the dictionary items acquired // finalize the queries + size_t queries = 0; for(size_t i = 0; i < dimensions ;i++) { struct replication_dimension *d = &data[i]; if(unlikely(!d->enabled)) continue; @@ -186,12 +214,15 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti dictionary_acquired_item_release(d->dict, d->rda); // update global statistics - replication_queries.queries_started++; - replication_queries.queries_finished++; + queries++; } + netdata_spinlock_lock(&replication_queries.spinlock); + replication_queries.queries_started += queries; + replication_queries.queries_finished += queries; replication_queries.points_read += points_read; replication_queries.points_generated += points_generated; + netdata_spinlock_unlock(&replication_queries.spinlock); return before; } @@ -323,7 +354,9 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t , (unsigned long long)world_clock_time ); + worker_is_busy(WORKER_JOB_BUFFER_COMMIT); sender_commit(host->sender, wb); + worker_is_busy(WORKER_JOB_CLEANUP); return enable_streaming; } @@ -349,7 +382,7 @@ struct replication_request_details { struct { time_t first_entry_t; // the first entry time we have time_t last_entry_t; // the last entry time we have - bool last_entry_t_adjusted_to_now; // true, if the last entry time was in the future and we fixed + bool last_entry_t_adjusted_to_now; // true, if the last entry time was in the future, and we fixed time_t now; // the current local world clock time } local_db; @@ -484,7 +517,7 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST else { // we had sent a request - let's continue at the point we left it // for this we don't take into account the actual data in our db - // because the child may also have gaps and we need to get over it + // because the child may also have gaps, and we need to get over it r.gap.from = r.last_request.before; } @@ -534,7 +567,7 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST if(r.wanted.after > r.wanted.before) r.wanted.after = r.wanted.before; - // the child should start streaming immediately if the wanted duration is small or we reached the last entry of the child + // the child should start streaming immediately if the wanted duration is small, or we reached the last entry of the child r.wanted.start_streaming = (r.local_db.now - r.wanted.after <= host->rrdpush_replication_step || r.wanted.before == r.child_db.last_entry_t); // the wanted timeframe is now r.wanted.after -> r.wanted.before @@ -568,87 +601,137 @@ struct replication_sort_entry { size_t unique_id; // used as a key to identify the sort entry - we never access its contents }; +#define MAX_REPLICATION_THREADS 20 // + 1 for the main thread + // the global variables for the replication thread static struct replication_thread { netdata_mutex_t mutex; - size_t pending; - size_t added; - size_t executed; - size_t removed; - size_t last_executed; - time_t first_time_t; - Word_t next_unique_id; - struct replication_request *requests; + struct { + size_t pending; // number of requests pending in the queue + Word_t unique_id; // the last unique id we gave to a request (auto-increment, starting from 1) + + // statistics + size_t added; // number of requests added to the queue + size_t removed; // number of requests removed from the queue + size_t skipped_not_connected; // number of requests skipped, because the sender is not connected to a parent + size_t skipped_no_room; // number of requests skipped, because the sender has no room for responses +// size_t skipped_no_room_since_last_reset; + size_t sender_resets; // number of times a sender reset our last position in the queue + time_t first_time_t; // the minimum 'after' we encountered + + struct { + Word_t after; + Word_t unique_id; + Pvoid_t JudyL_array; + } queue; + + } unsafe; // protected from replication_recursive_lock() - Word_t last_after; - Word_t last_unique_id; + struct { + size_t executed; // the number of replication requests executed + size_t latest_first_time; // the 'after' timestamp of the last request we executed + } atomic; // access should be with atomic operations - size_t skipped_not_connected; - size_t skipped_no_room; - size_t sender_resets; - size_t waits; + struct { + size_t waits; + size_t last_executed; // caching of the atomic.executed to report number of requests executed since last time - size_t skipped_no_room_last_run; + netdata_thread_t **threads_ptrs; + size_t threads; + } main_thread; // access is allowed only by the main thread - Pvoid_t JudyL_array; } replication_globals = { .mutex = NETDATA_MUTEX_INITIALIZER, - .pending = 0, - .added = 0, - .executed = 0, - .last_executed = 0, - .first_time_t = 0, - .next_unique_id = 1, - .skipped_no_room = 0, - .skipped_no_room_last_run = 0, - .skipped_not_connected = 0, - .sender_resets = 0, - .waits = 0, - .requests = NULL, - .JudyL_array = NULL, + .unsafe = { + .pending = 0, + .unique_id = 0, + + .added = 0, + .removed = 0, + .skipped_not_connected = 0, + .skipped_no_room = 0, +// .skipped_no_room_since_last_reset = 0, + .sender_resets = 0, + + .first_time_t = 0, + + .queue = { + .after = 0, + .unique_id = 0, + .JudyL_array = NULL, + }, + }, + .atomic = { + .executed = 0, + .latest_first_time = 0, + }, + .main_thread = { + .waits = 0, + .last_executed = 0, + .threads = 0, + .threads_ptrs = NULL, + }, }; -static __thread int replication_recursive_mutex_recursions = 0; +#define replication_set_latest_first_time(t) __atomic_store_n(&replication_globals.atomic.latest_first_time, t, __ATOMIC_RELAXED) +#define replication_get_latest_first_time() __atomic_load_n(&replication_globals.atomic.latest_first_time, __ATOMIC_RELAXED) -static void replication_recursive_lock() { - if(++replication_recursive_mutex_recursions == 1) - netdata_mutex_lock(&replication_globals.mutex); +static inline bool replication_recursive_lock_mode(char mode) { + static __thread int recursions = 0; -#ifdef NETDATA_INTERNAL_CHECKS - if(replication_recursive_mutex_recursions < 0 || replication_recursive_mutex_recursions > 2) - fatal("REPLICATION: recursions is %d", replication_recursive_mutex_recursions); -#endif -} - -static void replication_recursive_unlock() { - if(--replication_recursive_mutex_recursions == 0) - netdata_mutex_unlock(&replication_globals.mutex); + if(mode == 'L') { // (L)ock + if(++recursions == 1) + netdata_mutex_lock(&replication_globals.mutex); + } + else if(mode == 'U') { // (U)nlock + if(--recursions == 0) + netdata_mutex_unlock(&replication_globals.mutex); + } + else if(mode == 'C') { // (C)heck + if(recursions > 0) + return true; + else + return false; + } + else + fatal("REPLICATION: unknown lock mode '%c'", mode); #ifdef NETDATA_INTERNAL_CHECKS - if(replication_recursive_mutex_recursions < 0 || replication_recursive_mutex_recursions > 2) - fatal("REPLICATION: recursions is %d", replication_recursive_mutex_recursions); + if(recursions < 0) + fatal("REPLICATION: recursions is %d", recursions); #endif + + return true; } +#define replication_recursive_lock() replication_recursive_lock_mode('L') +#define replication_recursive_unlock() replication_recursive_lock_mode('U') +#define fatal_when_replication_is_not_locked_for_me() do { \ + if(!replication_recursive_lock_mode('C')) \ + fatal("REPLICATION: reached %s, but replication is not locked by this thread.", __FUNCTION__); \ +} while(0) + void replication_set_next_point_in_time(time_t after, size_t unique_id) { replication_recursive_lock(); - replication_globals.last_after = after; - replication_globals.last_unique_id = unique_id; + replication_globals.unsafe.queue.after = after; + replication_globals.unsafe.queue.unique_id = unique_id; replication_recursive_unlock(); } // ---------------------------------------------------------------------------- // replication sort entry management -static struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) { +static struct replication_sort_entry *replication_sort_entry_create_unsafe(struct replication_request *rq) { + fatal_when_replication_is_not_locked_for_me(); + struct replication_sort_entry *rse = mallocz(sizeof(struct replication_sort_entry)); rrdpush_sender_pending_replication_requests_plus_one(rq->sender); // copy the request rse->rq = rq; - rse->unique_id = replication_globals.next_unique_id++; + rse->unique_id = ++replication_globals.unsafe.unique_id; // save the unique id into the request, to be able to delete it later rq->unique_id = rse->unique_id; @@ -663,30 +746,33 @@ static void replication_sort_entry_destroy(struct replication_sort_entry *rse) { static struct replication_sort_entry *replication_sort_entry_add(struct replication_request *rq) { replication_recursive_lock(); - struct replication_sort_entry *rse = replication_sort_entry_create(rq); + struct replication_sort_entry *rse = replication_sort_entry_create_unsafe(rq); - if(rq->after < (time_t)replication_globals.last_after && rq->sender->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED && !replication_globals.skipped_no_room_last_run) { - // make it find this request first - replication_set_next_point_in_time(rq->after, rq->unique_id); - } +// if(rq->after < (time_t)replication_globals.protected.queue.after && +// rq->sender->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED && +// !replication_globals.protected.skipped_no_room_since_last_reset) { +// +// // make it find this request first +// replication_set_next_point_in_time(rq->after, rq->unique_id); +// } - replication_globals.added++; - replication_globals.pending++; + replication_globals.unsafe.added++; + replication_globals.unsafe.pending++; Pvoid_t *inner_judy_ptr; // find the outer judy entry, using after as key - inner_judy_ptr = JudyLGet(replication_globals.JudyL_array, (Word_t) rq->after, PJE0); + inner_judy_ptr = JudyLGet(replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0); if(!inner_judy_ptr) - inner_judy_ptr = JudyLIns(&replication_globals.JudyL_array, (Word_t) rq->after, PJE0); + inner_judy_ptr = JudyLIns(&replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0); // add it to the inner judy, using unique_id as key Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0); *item = rse; rq->indexed_in_judy = true; - if(!replication_globals.first_time_t || rq->after < replication_globals.first_time_t) - replication_globals.first_time_t = rq->after; + if(!replication_globals.unsafe.first_time_t || rq->after < replication_globals.unsafe.first_time_t) + replication_globals.unsafe.first_time_t = rq->after; replication_recursive_unlock(); @@ -694,10 +780,12 @@ static struct replication_sort_entry *replication_sort_entry_add(struct replicat } static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr) { + fatal_when_replication_is_not_locked_for_me(); + bool inner_judy_deleted = false; - replication_globals.removed++; - replication_globals.pending--; + replication_globals.unsafe.removed++; + replication_globals.unsafe.pending--; rrdpush_sender_pending_replication_requests_minus_one(rse->rq->sender); @@ -708,7 +796,7 @@ static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sor // if no items left, delete it from the outer judy if(**inner_judy_ppptr == NULL) { - JudyLDel(&replication_globals.JudyL_array, rse->rq->after, PJE0); + JudyLDel(&replication_globals.unsafe.queue.JudyL_array, rse->rq->after, PJE0); inner_judy_deleted = true; } @@ -725,7 +813,7 @@ static void replication_sort_entry_del(struct replication_request *rq) { replication_recursive_lock(); if(rq->indexed_in_judy) { - inner_judy_pptr = JudyLGet(replication_globals.JudyL_array, rq->after, PJE0); + inner_judy_pptr = JudyLGet(replication_globals.unsafe.queue.JudyL_array, rq->after, PJE0); if (inner_judy_pptr) { Pvoid_t *our_item_pptr = JudyLGet(*inner_judy_pptr, rq->unique_id, PJE0); if (our_item_pptr) { @@ -754,68 +842,87 @@ static struct replication_request replication_request_get_first_available() { Pvoid_t *inner_judy_pptr; replication_recursive_lock(); - replication_globals.skipped_no_room_last_run = 0; struct replication_request rq_to_return = (struct replication_request){ .found = false }; - - if(unlikely(!replication_globals.last_after || !replication_globals.last_unique_id)) { - replication_globals.last_after = 0; - replication_globals.last_unique_id = 0; + if(unlikely(!replication_globals.unsafe.queue.after || !replication_globals.unsafe.queue.unique_id)) { + replication_globals.unsafe.queue.after = 0; + replication_globals.unsafe.queue.unique_id = 0; } - bool find_same_after = true; - while(!rq_to_return.found && (inner_judy_pptr = JudyLFirstOrNext(replication_globals.JudyL_array, &replication_globals.last_after, find_same_after))) { - Pvoid_t *our_item_pptr; + Word_t started_after = replication_globals.unsafe.queue.after; + + size_t round = 0; + while(!rq_to_return.found) { + round++; - while(!rq_to_return.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &replication_globals.last_unique_id, PJE0))) { - struct replication_sort_entry *rse = *our_item_pptr; - struct replication_request *rq = rse->rq; - struct sender_state *s = rq->sender; + if(round > 2) + break; - if(likely(s->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)) { - // there is room for this request in the sender buffer + if(round == 2) { + if(started_after == 0) + break; - bool sender_is_connected = - rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED); + replication_globals.unsafe.queue.after = 0; + replication_globals.unsafe.queue.unique_id = 0; + } - bool sender_has_been_flushed_since_this_request = - rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(s); + bool find_same_after = true; + while (!rq_to_return.found && (inner_judy_pptr = JudyLFirstOrNext(replication_globals.unsafe.queue.JudyL_array, &replication_globals.unsafe.queue.after, find_same_after))) { + Pvoid_t *our_item_pptr; - if (unlikely(!sender_is_connected || sender_has_been_flushed_since_this_request)) { - // skip this request, the sender is not connected or it has reconnected + if(unlikely(round == 2 && replication_globals.unsafe.queue.after > started_after)) + break; - replication_globals.skipped_not_connected++; - if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr)) - // we removed the item from the outer JudyL - break; - } - else { - // this request is good to execute + while (!rq_to_return.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &replication_globals.unsafe.queue.unique_id, PJE0))) { + struct replication_sort_entry *rse = *our_item_pptr; + struct replication_request *rq = rse->rq; + struct sender_state *s = rq->sender; - // copy the request to return it - rq_to_return = *rq; - rq_to_return.chart_id = string_dup(rq_to_return.chart_id); + if (likely(rrdpush_sender_get_buffer_used_percent(s) <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)) { + // there is room for this request in the sender buffer - // set the return result to found - rq_to_return.found = true; + bool sender_is_connected = + rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED); - if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr)) - // we removed the item from the outer JudyL - break; + bool sender_has_been_flushed_since_this_request = + rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(s); + + if (unlikely(!sender_is_connected || sender_has_been_flushed_since_this_request)) { + // skip this request, the sender is not connected, or it has reconnected + + replication_globals.unsafe.skipped_not_connected++; + if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr)) + // we removed the item from the outer JudyL + break; + } + else { + // this request is good to execute + + // copy the request to return it + rq_to_return = *rq; + rq_to_return.chart_id = string_dup(rq_to_return.chart_id); + + // set the return result to found + rq_to_return.found = true; + + if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr)) + // we removed the item from the outer JudyL + break; + } + } + else { + replication_globals.unsafe.skipped_no_room++; +// replication_globals.protected.skipped_no_room_since_last_reset++; } } - else { - replication_globals.skipped_no_room++; - replication_globals.skipped_no_room_last_run++; - } - } - // call JudyLNext from now on - find_same_after = false; + // call JudyLNext from now on + find_same_after = false; - // prepare for the next iteration on the outer loop - replication_globals.last_unique_id = 0; + // prepare for the next iteration on the outer loop + replication_globals.unsafe.queue.unique_id = 0; + } } replication_recursive_unlock(); @@ -889,6 +996,58 @@ static void replication_request_delete_callback(const DICTIONARY_ITEM *item __ma string_freez(rq->chart_id); } +static bool replication_execute_request(struct replication_request *rq, bool workers) { + bool ret = false; + + if(likely(workers)) + worker_is_busy(WORKER_JOB_FIND_CHART); + + RRDSET *st = rrdset_find(rq->sender->host, string2str(rq->chart_id)); + if(!st) { + internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' not found", + rrdhost_hostname(rq->sender->host), string2str(rq->chart_id)); + + goto cleanup; + } + + if(likely(workers)) + worker_is_busy(WORKER_JOB_QUERYING); + + netdata_thread_disable_cancelability(); + + // send the replication data + bool start_streaming = replicate_chart_response( + st->rrdhost, st, rq->start_streaming, rq->after, rq->before); + + netdata_thread_enable_cancelability(); + + if(start_streaming && rq->sender_last_flush_ut == rrdpush_sender_get_flush_time(rq->sender)) { + // enable normal streaming if we have to + // but only if the sender buffer has not been flushed since we started + + if(rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) { + rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS); + rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED); + rrdhost_sender_replicating_charts_minus_one(st->rrdhost); + +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' streaming starts", + rrdhost_hostname(st->rrdhost), rrdset_id(st)); +#endif + } + else + internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' received start streaming command, but the chart is not in progress replicating", + rrdhost_hostname(st->rrdhost), string2str(rq->chart_id)); + } + + __atomic_add_fetch(&replication_globals.atomic.executed, 1, __ATOMIC_RELAXED); + + ret = true; + +cleanup: + string_freez(rq->chart_id); + return ret; +} // ---------------------------------------------------------------------------- // public API @@ -903,27 +1062,31 @@ void replication_add_request(struct sender_state *sender, const char *chart_id, .sender_last_flush_ut = rrdpush_sender_get_flush_time(sender), }; - dictionary_set(sender->replication_requests, chart_id, &rq, sizeof(struct replication_request)); + if(start_streaming && rrdpush_sender_get_buffer_used_percent(sender) <= STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED) + replication_execute_request(&rq, false); + + else + dictionary_set(sender->replication.requests, chart_id, &rq, sizeof(struct replication_request)); } void replication_sender_delete_pending_requests(struct sender_state *sender) { // allow the dictionary destructor to go faster on locks replication_recursive_lock(); - dictionary_flush(sender->replication_requests); + dictionary_flush(sender->replication.requests); replication_recursive_unlock(); } void replication_init_sender(struct sender_state *sender) { - sender->replication_requests = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE); - dictionary_register_react_callback(sender->replication_requests, replication_request_react_callback, sender); - dictionary_register_conflict_callback(sender->replication_requests, replication_request_conflict_callback, sender); - dictionary_register_delete_callback(sender->replication_requests, replication_request_delete_callback, sender); + sender->replication.requests = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE); + dictionary_register_react_callback(sender->replication.requests, replication_request_react_callback, sender); + dictionary_register_conflict_callback(sender->replication.requests, replication_request_conflict_callback, sender); + dictionary_register_delete_callback(sender->replication.requests, replication_request_delete_callback, sender); } void replication_cleanup_sender(struct sender_state *sender) { // allow the dictionary destructor to go faster on locks replication_recursive_lock(); - dictionary_destroy(sender->replication_requests); + dictionary_destroy(sender->replication.requests); replication_recursive_unlock(); } @@ -932,61 +1095,32 @@ void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) { size_t percentage = (s->buffer->max_size - available) * 100 / s->buffer->max_size; if(percentage > MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED) - s->replication_reached_max = true; + s->replication.unsafe.reached_max = true; - if(s->replication_reached_max && + if(s->replication.unsafe.reached_max && percentage <= MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED) { - s->replication_reached_max = false; + s->replication.unsafe.reached_max = false; replication_recursive_lock(); - replication_set_next_point_in_time(0, 0); - replication_globals.sender_resets++; +// replication_set_next_point_in_time(0, 0); + replication_globals.unsafe.sender_resets++; replication_recursive_unlock(); } - s->buffer_used_percentage = percentage; + rrdpush_sender_set_buffer_used_percent(s, percentage); } // ---------------------------------------------------------------------------- // replication thread -static void replication_main_cleanup(void *ptr) { - struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; - static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; - - // custom code - worker_unregister(); - - static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; -} - -#define WORKER_JOB_FIND_NEXT 1 -#define WORKER_JOB_QUERYING 2 -#define WORKER_JOB_DELETE_ENTRY 3 -#define WORKER_JOB_FIND_CHART 4 -#define WORKER_JOB_STATISTICS 5 -#define WORKER_JOB_ACTIVATE_ENABLE_STREAMING 6 -#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 7 -#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 8 -#define WORKER_JOB_CUSTOM_METRIC_ADDED 9 -#define WORKER_JOB_CUSTOM_METRIC_DONE 10 -#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED 11 -#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 12 -#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 13 -#define WORKER_JOB_CUSTOM_METRIC_WAITS 14 -#define WORKER_JOB_CHECK_CONSISTENCY 15 - -#define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 10 -#define SECONDS_TO_RESET_POINT_IN_TIME 10 - static size_t verify_host_charts_are_streaming_now(RRDHOST *host) { internal_error( host->sender && - !host->sender->replication_pending_requests && - dictionary_entries(host->sender->replication_requests) != 0, + !rrdpush_sender_pending_replication_requests(host->sender) && + dictionary_entries(host->sender->replication.requests) != 0, "REPLICATION SUMMARY: 'host:%s' reports %zu pending replication requests, but its chart replication index says there are %zu charts pending replication", rrdhost_hostname(host), - host->sender->replication_pending_requests, - dictionary_entries(host->sender->replication_requests) + rrdpush_sender_pending_replication_requests(host->sender), + dictionary_entries(host->sender->replication.requests) ); size_t ok = 0; @@ -1039,42 +1173,131 @@ static void verify_all_hosts_charts_are_streaming_now(void) { errors += verify_host_charts_are_streaming_now(host); dfe_done(host); - size_t executed = replication_globals.executed; - info("REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication", executed - replication_globals.last_executed, errors); - replication_globals.last_executed = executed; + size_t executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED); + info("REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication", + executed - replication_globals.main_thread.last_executed, errors); + replication_globals.main_thread.last_executed = executed; } -void *replication_thread_main(void *ptr __maybe_unused) { - netdata_thread_cleanup_push(replication_main_cleanup, ptr); - +static void replication_initialize_workers(bool master) { worker_register("REPLICATION"); - worker_register_job_name(WORKER_JOB_FIND_NEXT, "find next"); worker_register_job_name(WORKER_JOB_QUERYING, "querying"); worker_register_job_name(WORKER_JOB_DELETE_ENTRY, "dict delete"); worker_register_job_name(WORKER_JOB_FIND_CHART, "find chart"); - worker_register_job_name(WORKER_JOB_ACTIVATE_ENABLE_STREAMING, "enable streaming"); worker_register_job_name(WORKER_JOB_CHECK_CONSISTENCY, "check consistency"); - worker_register_job_name(WORKER_JOB_STATISTICS, "statistics"); + worker_register_job_name(WORKER_JOB_BUFFER_COMMIT, "commit"); + worker_register_job_name(WORKER_JOB_CLEANUP, "cleanup"); + + if(master) { + worker_register_job_name(WORKER_JOB_STATISTICS, "statistics"); + worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, "pending requests", "requests", WORKER_METRIC_ABSOLUTE); + worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, "completion", "%", WORKER_METRIC_ABSOLUTE); + worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, "added requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL); + worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_DONE, "finished requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL); + worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED, "not connected requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL); + worker_register_job_custom_metric(WORKER_JOB_C |