author     Costa Tsaousis <costa@netdata.cloud>   2023-01-27 01:32:20 +0200
committer  GitHub <noreply@github.com>            2023-01-27 01:32:20 +0200
commit     57eab742c88093c89d5d46deb495558ad726e6f0 (patch)
tree       e8a01519a8f9df7beba4d0be7be53a9be3f1fdfd /streaming
parent     c4f5524ea8279be492eb527a67242b408543382e (diff)
DBENGINE v2 - improvements part 10 (#14332)
* replication cancels pending queries on exit
* log when waiting for inflight queries
* when there are collected and not-collected metrics, use the context priority from the collected ones only
* write metadata at a faster pace
* remove the journal file size limit, set sync mode to 0 and drop the WAL checkpoint for now
* wrap the remaining metadata writes in one big transaction (test 1; see the SQLite sketch after this message)
* fix higher tiers when tiering iterations = 2
* dbengine always returns db-aligned points; the query engine expands queries by 2 points in every direction to have enough data for interpolation
* wrap the metadata writes in one big transaction (test 2)
* replication cancelling fix
* do not send the first and last entry in replication when the db has no retention
* fix an internal check condition
* increase the metadata write batch size
* always apply the error limit to dbengine logs
* remove the code that processes the obsolete health.db files
* cleanup in query.c
* do not allow queries to go beyond db boundaries
* prevent an internal log for a +1 delta in timestamps
* detect gap pages in conflicts
* double protection against gap injection in the main cache
* add a checkpoint to prevent a large WAL while running; remove unused and duplicate functions
* do not allocate the chart cache dir when it is not needed
* add more info to unittests
* revert the query expansion to satisfy the unittests

Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
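Several of the metadata items above (the big write transactions, the faster write pace, and the checkpoint that bounds WAL growth while the agent runs) revolve around standard SQLite mechanics. The following is a minimal sketch of both idioms against the public SQLite C API; it is not Netdata's actual metadata code, and the function names are invented for illustration:

#include <sqlite3.h>
#include <stdio.h>

/* Batch many metadata writes into one transaction: one fsync for the
 * whole batch instead of one per statement. */
static void metadata_write_batch(sqlite3 *db) {
    sqlite3_exec(db, "BEGIN TRANSACTION;", NULL, NULL, NULL);
    /* ... many INSERT/UPDATE statements would run here ... */
    sqlite3_exec(db, "COMMIT;", NULL, NULL, NULL);
}

/* Periodically checkpoint the WAL so the -wal file cannot grow without
 * bound while the process runs; PASSIVE never blocks readers or writers. */
static void metadata_checkpoint(sqlite3 *db) {
    int frames = 0, checkpointed = 0;
    if (sqlite3_wal_checkpoint_v2(db, NULL, SQLITE_CHECKPOINT_PASSIVE,
                                  &frames, &checkpointed) != SQLITE_OK)
        fprintf(stderr, "WAL checkpoint failed: %s\n", sqlite3_errmsg(db));
}

int main(void) {
    sqlite3 *db = NULL;
    if (sqlite3_open(":memory:", &db) != SQLITE_OK) return 1;
    metadata_write_batch(db);
    metadata_checkpoint(db);   /* harmless no-op when the db is not in WAL mode */
    sqlite3_close(db);
    return 0;
}

Note that batching and checkpointing pull in opposite directions (bigger batches grow the WAL faster), which is presumably why this commit both enlarges the write batches and re-adds a running checkpoint.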
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/replication.c  65
1 file changed, 58 insertions(+), 7 deletions(-)
diff --git a/streaming/replication.c b/streaming/replication.c
index 4abd20cc3d..1a7be31eea 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -1328,6 +1328,7 @@ static bool replication_execute_request(struct replication_request *rq, bool wor
replication_response_execute_and_finalize(
rq->q, (size_t)((unsigned long long)rq->sender->host->sender->buffer->max_size * MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER / 100ULL));
+ rq->q = NULL;
netdata_thread_enable_cancelability();
__atomic_add_fetch(&replication_globals.atomic.executed, 1, __ATOMIC_RELAXED);
@@ -1335,6 +1336,11 @@ static bool replication_execute_request(struct replication_request *rq, bool wor
ret = true;
cleanup:
+ if(rq->q) {
+ replication_response_cancel_and_finalize(rq->q);
+ rq->q = NULL;
+ }
+
string_freez(rq->chart_id);
worker_is_idle();
return ret;
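The two hunks above establish an ownership rule for rq->q: whichever path executes or cancels the query also finalizes it and clears the pointer, so the new cleanup block can tell "still inflight" from "already consumed" and never finalizes twice. A minimal sketch of the idiom, with stand-in types and names (none of these exist in Netdata):

#include <stdlib.h>

typedef struct query { int placeholder; } query_t;

static void query_finalize(query_t *q) { free(q); }   /* consumes q */

struct request { query_t *q; };

static void request_execute(struct request *rq) {
    query_finalize(rq->q);   /* execute-and-finalize consumes the query */
    rq->q = NULL;            /* clear immediately: the slot no longer owns it */
}

static void request_cleanup(struct request *rq) {
    if (rq->q) {             /* non-NULL means still inflight: cancel it */
        query_finalize(rq->q);
        rq->q = NULL;
    }
}

int main(void) {
    struct request rq = { .q = malloc(sizeof(query_t)) };
    request_execute(&rq);
    request_cleanup(&rq);    /* safe no-op: rq.q is already NULL */
    return 0;
}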
@@ -1518,13 +1524,41 @@ static void replication_initialize_workers(bool master) {
#define REQUEST_QUEUE_EMPTY (-1)
#define REQUEST_CHART_NOT_FOUND (-2)
-static int replication_execute_next_pending_request(void) {
+static int replication_execute_next_pending_request(bool cancel) {
static __thread int max_requests_ahead = 0;
static __thread struct replication_request *rqs = NULL;
static __thread int rqs_last_executed = 0, rqs_last_prepared = 0;
static __thread size_t queue_rounds = 0; (void)queue_rounds;
struct replication_request *rq;
+ if(unlikely(cancel)) {
+ if(rqs) {
+ size_t cancelled = 0;
+ do {
+ if (++rqs_last_executed >= max_requests_ahead)
+ rqs_last_executed = 0;
+
+ rq = &rqs[rqs_last_executed];
+
+ if (rq->q) {
+ internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!");
+ internal_fatal(!rq->found, "REPLAY FATAL: orphan q in rq");
+
+ replication_response_cancel_and_finalize(rq->q);
+ rq->q = NULL;
+ cancelled++;
+ }
+
+ rq->executed = true;
+ rq->found = false;
+
+ } while (rqs_last_executed != rqs_last_prepared);
+
+ internal_error(true, "REPLICATION: cancelled %zu inflight queries", cancelled);
+ }
+ return REQUEST_QUEUE_EMPTY;
+ }
+
if(unlikely(!rqs)) {
max_requests_ahead = get_system_cpus() / 2;
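The new cancel path drains the per-thread circular buffer of prepared requests: the executed index chases the prepared index once around the ring, cancelling every slot that still holds a query. A toy version of the same drain loop (SLOTS, the types and the stand-in for cancel-and-finalize are illustrative only):

#include <stdio.h>
#include <string.h>

#define SLOTS 4

struct slot { void *q; };   /* q != NULL means a prepared, unsent query */

static size_t drain_ring(struct slot *rqs, int last_executed, int last_prepared) {
    size_t cancelled = 0;
    do {
        if (++last_executed >= SLOTS)
            last_executed = 0;            /* wrap, as in the hunk above */
        if (rqs[last_executed].q) {
            rqs[last_executed].q = NULL;  /* cancel-and-finalize stand-in */
            cancelled++;
        }
    } while (last_executed != last_prepared);
    return cancelled;
}

int main(void) {
    struct slot rqs[SLOTS];
    memset(rqs, 0, sizeof(rqs));
    rqs[1].q = rqs;                       /* pretend slot 1 is inflight */
    printf("cancelled %zu inflight queries\n", drain_ring(rqs, 0, 0));
    return 0;
}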
@@ -1545,8 +1579,8 @@ static int replication_execute_next_pending_request(void) {
queue_rounds++;
}
- internal_fatal(queue_rounds > 1 && !rqs[rqs_last_prepared].executed,
- "REPLAY FATAL: query has not been executed!");
+ internal_fatal(rqs[rqs_last_prepared].q,
+ "REPLAY FATAL: slot is used by query that has not been executed!");
worker_is_busy(WORKER_JOB_FIND_NEXT);
rqs[rqs_last_prepared] = replication_request_get_first_available();
@@ -1562,6 +1596,8 @@ static int replication_execute_next_pending_request(void) {
worker_is_busy(WORKER_JOB_PREPARE_QUERY);
rq->q = replication_response_prepare(rq->st, rq->start_streaming, rq->after, rq->before);
}
+
+ rq->executed = false;
}
} while(rq->found && rqs_last_prepared != rqs_last_executed);
@@ -1572,14 +1608,17 @@ static int replication_execute_next_pending_request(void) {
rqs_last_executed = 0;
rq = &rqs[rqs_last_executed];
- rq->executed = true;
if(rq->found) {
+ internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!");
+
if (rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(rq->sender)) {
// the sender has reconnected since this request was queued,
// we can safely throw it away, since the parent will resend it
replication_response_cancel_and_finalize(rq->q);
+ rq->executed = true;
rq->found = false;
+ rq->q = NULL;
}
else if (rrdpush_sender_replication_buffer_full_get(rq->sender)) {
// the sender buffer is full, so we can ignore this request,
@@ -1587,7 +1626,9 @@ static int replication_execute_next_pending_request(void) {
// and the sender will put it back in when there is
// enough room in the buffer for processing replication requests
replication_response_cancel_and_finalize(rq->q);
+ rq->executed = true;
rq->found = false;
+ rq->q = NULL;
}
else {
// we can execute this,
@@ -1596,6 +1637,8 @@ static int replication_execute_next_pending_request(void) {
dictionary_del(rq->sender->replication.requests, string2str(rq->chart_id));
}
}
+ else
+ internal_fatal(rq->q, "REPLAY FATAL: slot status says slot is empty, but it has a pending query!");
} while(!rq->found && rqs_last_executed != rqs_last_prepared);
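This hunk tightens the bookkeeping on the two discard paths: a request whose sender reconnected since it was queued (the captured flush time no longer matches) or whose sender buffer is full gets cancelled, and the slot is fully marked consumed (executed set, found cleared, q NULLed) so the new assertions hold. A simplified sketch of the staleness check, with stand-in types:

#include <stdbool.h>
#include <stdint.h>

struct sender { uint64_t last_flush_ut; };     /* changes on reconnect */

struct request {
    struct sender *sender;
    uint64_t sender_last_flush_ut;             /* captured when queued */
    void *q;
    bool executed, found;
};

/* true when the sender reconnected after this request was queued, so the
 * prepared response belongs to a connection that no longer exists */
static bool request_is_stale(const struct request *rq) {
    return rq->sender_last_flush_ut != rq->sender->last_flush_ut;
}

static void request_discard(struct request *rq) {
    /* cancel-and-finalize would run here, then account the slot fully */
    rq->q = NULL;
    rq->executed = true;
    rq->found = false;
}

int main(void) {
    struct sender s = { .last_flush_ut = 2 };
    struct request rq = { .sender = &s, .sender_last_flush_ut = 1,
                          .q = &s, .executed = false, .found = true };
    if (request_is_stale(&rq))
        request_discard(&rq);                  /* the parent will resend it */
    return 0;
}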
@@ -1606,7 +1649,12 @@ static int replication_execute_next_pending_request(void) {
replication_set_latest_first_time(rq->after);
- if(unlikely(!replication_execute_request(rq, true))) {
+ bool chart_found = replication_execute_request(rq, true);
+ rq->executed = true;
+ rq->found = false;
+ rq->q = NULL;
+
+ if(unlikely(!chart_found)) {
worker_is_idle();
return REQUEST_CHART_NOT_FOUND;
}
@@ -1616,6 +1664,7 @@ static int replication_execute_next_pending_request(void) {
}
static void replication_worker_cleanup(void *ptr __maybe_unused) {
+ replication_execute_next_pending_request(true);
worker_unregister();
}
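Hooking the cancel pass into the cleanup handler matters because netdata_thread_cleanup_push appears to mirror the pthread cleanup-handler mechanism: the handler fires on thread cancellation as well as on normal exit, so inflight queries are released on every exit path. A generic illustration with plain pthreads; nothing below is Netdata code:

#include <pthread.h>
#include <stdio.h>

static void worker_cleanup(void *arg) {
    (void)arg;
    /* stands in for replication_execute_next_pending_request(true) */
    puts("cancelling inflight queries");
}

static void *worker(void *arg) {
    pthread_cleanup_push(worker_cleanup, arg);
    /* ... main loop: runs until the service stops or the thread is cancelled ... */
    pthread_cleanup_pop(1);   /* 1 = run the handler on normal exit too */
    return NULL;
}

int main(void) {
    pthread_t t;
    pthread_create(&t, NULL, worker, NULL);
    pthread_join(t, NULL);    /* prints the cleanup message once */
    return 0;
}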
@@ -1625,7 +1674,7 @@ static void *replication_worker_thread(void *ptr) {
netdata_thread_cleanup_push(replication_worker_cleanup, ptr);
while(service_running(SERVICE_REPLICATION)) {
- if(unlikely(replication_execute_next_pending_request() == REQUEST_QUEUE_EMPTY)) {
+ if(unlikely(replication_execute_next_pending_request(false) == REQUEST_QUEUE_EMPTY)) {
sender_thread_buffer_free();
worker_is_busy(WORKER_JOB_WAIT);
worker_is_idle();
@@ -1641,6 +1690,8 @@ static void replication_main_cleanup(void *ptr) {
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
+ replication_execute_next_pending_request(true);
+
int threads = (int)replication_globals.main_thread.threads;
for(int i = 0; i < threads ;i++) {
netdata_thread_join(*replication_globals.main_thread.threads_ptrs[i], NULL);
@@ -1756,7 +1807,7 @@ void *replication_thread_main(void *ptr __maybe_unused) {
worker_is_idle();
}
- if(unlikely(replication_execute_next_pending_request() == REQUEST_QUEUE_EMPTY)) {
+ if(unlikely(replication_execute_next_pending_request(false) == REQUEST_QUEUE_EMPTY)) {
worker_is_busy(WORKER_JOB_WAIT);
replication_recursive_lock();