diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2023-01-27 01:32:20 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-01-27 01:32:20 +0200 |
commit | 57eab742c88093c89d5d46deb495558ad726e6f0 (patch) | |
tree | e8a01519a8f9df7beba4d0be7be53a9be3f1fdfd /streaming | |
parent | c4f5524ea8279be492eb527a67242b408543382e (diff) |
DBENGINE v2 - improvements part 10 (#14332)
* replication cancels pending queries on exit
* log when waiting for inflight queries
* when there are collected and not-collected metrics, use the context priority from the collected only
* Write metadata with a faster pace
* Remove journal file size limit and sync mode to 0 / Drop wal checkpoint for now
* Wrap in a big transaction remaining metadata writes (test 1)
* fix higher tiers when tiering iterations = 2
* dbengine always returns db-aligned points; query engine expands the queries by 2 points in every direction to have enough data for interpolation
* Wrap in a big transaction metadata writes (test 2)
* replication cancelling fix
* do not send first and last entry in replication when the db has no retention
* fix internal check condition
* Increase metadata write batch size
* always apply error limit to dbengine logs
* Remove code that processes the obsolete health.db files
* cleanup in query.c
* do not allow queries to go beyond db boundaries
* prevent internal log for +1 delta in timestamp
* detect gap pages in conflicts
* double protection for gap injection in main cache
* Add checkpoint to prevent large WAL while running
Remove unused and duplicate functions
* do not allocate chart cache dir if not needed
* add more info to unittests
* revert query expansion to satisfy unittests
Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/replication.c | 65 |
1 file changed, 58 insertions, 7 deletions
diff --git a/streaming/replication.c b/streaming/replication.c index 4abd20cc3d..1a7be31eea 100644 --- a/streaming/replication.c +++ b/streaming/replication.c @@ -1328,6 +1328,7 @@ static bool replication_execute_request(struct replication_request *rq, bool wor replication_response_execute_and_finalize( rq->q, (size_t)((unsigned long long)rq->sender->host->sender->buffer->max_size * MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER / 100ULL)); + rq->q = NULL; netdata_thread_enable_cancelability(); __atomic_add_fetch(&replication_globals.atomic.executed, 1, __ATOMIC_RELAXED); @@ -1335,6 +1336,11 @@ static bool replication_execute_request(struct replication_request *rq, bool wor ret = true; cleanup: + if(rq->q) { + replication_response_cancel_and_finalize(rq->q); + rq->q = NULL; + } + string_freez(rq->chart_id); worker_is_idle(); return ret; @@ -1518,13 +1524,41 @@ static void replication_initialize_workers(bool master) { #define REQUEST_QUEUE_EMPTY (-1) #define REQUEST_CHART_NOT_FOUND (-2) -static int replication_execute_next_pending_request(void) { +static int replication_execute_next_pending_request(bool cancel) { static __thread int max_requests_ahead = 0; static __thread struct replication_request *rqs = NULL; static __thread int rqs_last_executed = 0, rqs_last_prepared = 0; static __thread size_t queue_rounds = 0; (void)queue_rounds; struct replication_request *rq; + if(unlikely(cancel)) { + if(rqs) { + size_t cancelled = 0; + do { + if (++rqs_last_executed >= max_requests_ahead) + rqs_last_executed = 0; + + rq = &rqs[rqs_last_executed]; + + if (rq->q) { + internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!"); + internal_fatal(!rq->found, "REPLAY FATAL: orphan q in rq"); + + replication_response_cancel_and_finalize(rq->q); + rq->q = NULL; + cancelled++; + } + + rq->executed = true; + rq->found = false; + + } while (rqs_last_executed != rqs_last_prepared); + + internal_error(true, "REPLICATION: cancelled %zu inflight queries", cancelled); + 
} + return REQUEST_QUEUE_EMPTY; + } + if(unlikely(!rqs)) { max_requests_ahead = get_system_cpus() / 2; @@ -1545,8 +1579,8 @@ static int replication_execute_next_pending_request(void) { queue_rounds++; } - internal_fatal(queue_rounds > 1 && !rqs[rqs_last_prepared].executed, - "REPLAY FATAL: query has not been executed!"); + internal_fatal(rqs[rqs_last_prepared].q, + "REPLAY FATAL: slot is used by query that has not been executed!"); worker_is_busy(WORKER_JOB_FIND_NEXT); rqs[rqs_last_prepared] = replication_request_get_first_available(); @@ -1562,6 +1596,8 @@ static int replication_execute_next_pending_request(void) { worker_is_busy(WORKER_JOB_PREPARE_QUERY); rq->q = replication_response_prepare(rq->st, rq->start_streaming, rq->after, rq->before); } + + rq->executed = false; } } while(rq->found && rqs_last_prepared != rqs_last_executed); @@ -1572,14 +1608,17 @@ static int replication_execute_next_pending_request(void) { rqs_last_executed = 0; rq = &rqs[rqs_last_executed]; - rq->executed = true; if(rq->found) { + internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!"); + if (rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(rq->sender)) { // the sender has reconnected since this request was queued, // we can safely throw it away, since the parent will resend it replication_response_cancel_and_finalize(rq->q); + rq->executed = true; rq->found = false; + rq->q = NULL; } else if (rrdpush_sender_replication_buffer_full_get(rq->sender)) { // the sender buffer is full, so we can ignore this request, @@ -1587,7 +1626,9 @@ static int replication_execute_next_pending_request(void) { // and the sender will put it back in when there is // enough room in the buffer for processing replication requests replication_response_cancel_and_finalize(rq->q); + rq->executed = true; rq->found = false; + rq->q = NULL; } else { // we can execute this, @@ -1596,6 +1637,8 @@ static int replication_execute_next_pending_request(void) { 
dictionary_del(rq->sender->replication.requests, string2str(rq->chart_id)); } } + else + internal_fatal(rq->q, "REPLAY FATAL: slot status says slot is empty, but it has a pending query!"); } while(!rq->found && rqs_last_executed != rqs_last_prepared); @@ -1606,7 +1649,12 @@ static int replication_execute_next_pending_request(void) { replication_set_latest_first_time(rq->after); - if(unlikely(!replication_execute_request(rq, true))) { + bool chart_found = replication_execute_request(rq, true); + rq->executed = true; + rq->found = false; + rq->q = NULL; + + if(unlikely(!chart_found)) { worker_is_idle(); return REQUEST_CHART_NOT_FOUND; } @@ -1616,6 +1664,7 @@ static int replication_execute_next_pending_request(void) { } static void replication_worker_cleanup(void *ptr __maybe_unused) { + replication_execute_next_pending_request(true); worker_unregister(); } @@ -1625,7 +1674,7 @@ static void *replication_worker_thread(void *ptr) { netdata_thread_cleanup_push(replication_worker_cleanup, ptr); while(service_running(SERVICE_REPLICATION)) { - if(unlikely(replication_execute_next_pending_request() == REQUEST_QUEUE_EMPTY)) { + if(unlikely(replication_execute_next_pending_request(false) == REQUEST_QUEUE_EMPTY)) { sender_thread_buffer_free(); worker_is_busy(WORKER_JOB_WAIT); worker_is_idle(); @@ -1641,6 +1690,8 @@ static void replication_main_cleanup(void *ptr) { struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; + replication_execute_next_pending_request(true); + int threads = (int)replication_globals.main_thread.threads; for(int i = 0; i < threads ;i++) { netdata_thread_join(*replication_globals.main_thread.threads_ptrs[i], NULL); @@ -1756,7 +1807,7 @@ void *replication_thread_main(void *ptr __maybe_unused) { worker_is_idle(); } - if(unlikely(replication_execute_next_pending_request() == REQUEST_QUEUE_EMPTY)) { + if(unlikely(replication_execute_next_pending_request(false) == 
REQUEST_QUEUE_EMPTY)) { worker_is_busy(WORKER_JOB_WAIT); replication_recursive_lock(); |