summary | refs | log | tree | commit | diff | stats
path: root/database/engine/rrdengine.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine/rrdengine.c')
-rw-r--r-- database/engine/rrdengine.c 160
1 files changed, 110 insertions, 50 deletions
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index fae6656d93..7811a5eaaf 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -16,6 +16,24 @@ unsigned rrdeng_pages_per_extent = MAX_PAGES_PER_EXTENT;
#error Please increase WORKER_UTILIZATION_MAX_JOB_TYPES to at least (RRDENG_MAX_OPCODE + 2)
#endif
+struct rrdeng_cmd { // one queued dbengine command: an opcode plus its payload
+ struct rrdengine_instance *ctx; // engine instance the command applies to
+ enum rrdeng_opcode opcode; // what to do; RRDENG_OPCODE_NOOP is used for the "empty" command
+ void *data; // opcode-specific payload (an EPDL for EXTENT_READ, a PDC for QUERY)
+ struct completion *completion; // NOTE(review): presumably signalled when the command is done - confirm
+ enum storage_priority priority; // priority class; the command queue keeps one waiting list per priority
+ dequeue_callback_t dequeue_cb; // NOTE(review): presumably invoked when the command is dequeued - confirm
+
+ struct { // list links; NOTE(review): presumably for the per-priority waiting lists - confirm
+ struct rrdeng_cmd *prev;
+ struct rrdeng_cmd *next;
+ } queue;
+};
+
+static inline struct rrdeng_cmd rrdeng_deq_cmd(bool from_worker); // forward declarations: work_standard_worker() calls these before their definitions
+static inline void worker_dispatch_extent_read(struct rrdeng_cmd cmd, bool from_worker); // from_worker: true when called from a worker thread, false from the event loop
+static inline void worker_dispatch_query_prep(struct rrdeng_cmd cmd, bool from_worker);
+
struct rrdeng_main {
uv_thread_t thread;
uv_loop_t loop;
@@ -45,7 +63,6 @@ struct rrdeng_main {
struct {
size_t dispatched;
size_t executing;
- size_t pending_cb;
} atomics;
} work_cmd;
@@ -132,8 +149,22 @@ static void work_request_init(void) {
);
}
-static inline bool work_request_full(void) {
- return __atomic_load_n(&rrdeng_main.work_cmd.atomics.dispatched, __ATOMIC_RELAXED) >= (size_t)(libuv_worker_threads - RESERVED_LIBUV_WORKER_THREADS);
+enum LIBUV_WORKERS_STATUS { // load level of the libuv worker pool, as classified by work_request_full()
+ LIBUV_WORKERS_RELAXED, // dispatched jobs < (libuv_worker_threads - RESERVED_LIBUV_WORKER_THREADS)
+ LIBUV_WORKERS_STRESSED, // dispatched jobs reached (threads - reserved) but not yet libuv_worker_threads
+ LIBUV_WORKERS_CRITICAL, // dispatched jobs >= libuv_worker_threads
+};
+
+static inline enum LIBUV_WORKERS_STATUS work_request_full(void) { // classify the current worker load from the dispatched-jobs counter
+ size_t dispatched = __atomic_load_n(&rrdeng_main.work_cmd.atomics.dispatched, __ATOMIC_RELAXED); // relaxed atomic read of the counter; workers decrement it when they finish
+
+ if(dispatched >= (size_t)(libuv_worker_threads)) // at least as many dispatched jobs as worker threads
+ return LIBUV_WORKERS_CRITICAL;
+
+ else if(dispatched >= (size_t)(libuv_worker_threads - RESERVED_LIBUV_WORKER_THREADS)) // NOTE(review): the size_t cast wraps if this difference is negative - confirm it cannot be
+ return LIBUV_WORKERS_STRESSED;
+
+ return LIBUV_WORKERS_RELAXED;
}
static inline void work_done(struct rrdeng_work *work_request) {
@@ -147,12 +178,38 @@ static void work_standard_worker(uv_work_t *req) {
worker_is_busy(UV_EVENT_WORKER_INIT);
struct rrdeng_work *work_request = req->data;
+
work_request->data = work_request->work_cb(work_request->ctx, work_request->data, work_request->completion, req);
worker_is_idle();
+ if(work_request->opcode == RRDENG_OPCODE_EXTENT_READ || work_request->opcode == RRDENG_OPCODE_QUERY) {
+ internal_fatal(work_request->after_work_cb != NULL, "DBENGINE: opcodes with a callback should not boosted");
+
+ while(1) {
+ struct rrdeng_cmd cmd = rrdeng_deq_cmd(true);
+ if (cmd.opcode == RRDENG_OPCODE_NOOP)
+ break;
+
+ worker_is_busy(UV_EVENT_WORKER_INIT);
+ switch (cmd.opcode) {
+ case RRDENG_OPCODE_EXTENT_READ:
+ worker_dispatch_extent_read(cmd, true);
+ break;
+
+ case RRDENG_OPCODE_QUERY:
+ worker_dispatch_query_prep(cmd, true);
+ break;
+
+ default:
+ fatal("DBENGINE: Opcode should not be executed synchronously");
+ break;
+ }
+ worker_is_idle();
+ }
+ }
+
__atomic_sub_fetch(&rrdeng_main.work_cmd.atomics.dispatched, 1, __ATOMIC_RELAXED);
__atomic_sub_fetch(&rrdeng_main.work_cmd.atomics.executing, 1, __ATOMIC_RELAXED);
- __atomic_add_fetch(&rrdeng_main.work_cmd.atomics.pending_cb, 1, __ATOMIC_RELAXED);
// signal the event loop a worker is available
fatal_assert(0 == uv_async_send(&rrdeng_main.async));
@@ -167,7 +224,6 @@ static void after_work_standard_callback(uv_work_t* req, int status) {
work_request->after_work_cb(work_request->ctx, work_request->data, work_request->completion, req, status);
work_done(work_request);
- __atomic_sub_fetch(&rrdeng_main.work_cmd.atomics.pending_cb, 1, __ATOMIC_RELAXED);
worker_is_idle();
}
@@ -369,20 +425,6 @@ void wal_release(WAL *wal) {
// ----------------------------------------------------------------------------
// command queue cache
-struct rrdeng_cmd {
- struct rrdengine_instance *ctx;
- enum rrdeng_opcode opcode;
- void *data;
- struct completion *completion;
- enum storage_priority priority;
- dequeue_callback_t dequeue_cb;
-
- struct {
- struct rrdeng_cmd *prev;
- struct rrdeng_cmd *next;
- } queue;
-};
-
static void rrdeng_cmd_queue_init(void) {
rrdeng_main.cmd_queue.ar = aral_create("dbengine-opcodes",
sizeof(struct rrdeng_cmd),
@@ -465,14 +507,33 @@ static inline bool rrdeng_cmd_has_waiting_opcodes_in_lower_priorities(STORAGE_PR
return false;
}
-static inline struct rrdeng_cmd rrdeng_deq_cmd(void) {
+#define opcode_empty (struct rrdeng_cmd) { /* "nothing to execute" command; callers test opcode against RRDENG_OPCODE_NOOP */ \
+ .ctx = NULL, \
+ .opcode = RRDENG_OPCODE_NOOP, \
+ .priority = STORAGE_PRIORITY_BEST_EFFORT, \
+ .completion = NULL, \
+ .data = NULL, \
+} /* members not named above (dequeue_cb, queue) are zero-initialized by the compound literal */
+
+static inline struct rrdeng_cmd rrdeng_deq_cmd(bool from_worker) {
struct rrdeng_cmd *cmd = NULL;
+ enum LIBUV_WORKERS_STATUS status = work_request_full();
- STORAGE_PRIORITY max_priority = work_request_full() ? STORAGE_PRIORITY_INTERNAL_DBENGINE : STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE - 1;
+ STORAGE_PRIORITY min_priority, max_priority;
+ min_priority = STORAGE_PRIORITY_INTERNAL_DBENGINE;
+ max_priority = (status != LIBUV_WORKERS_RELAXED) ? STORAGE_PRIORITY_INTERNAL_DBENGINE : STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE - 1;
+
+ if(from_worker) {
+ if(status == LIBUV_WORKERS_CRITICAL)
+ return opcode_empty;
+
+ min_priority = STORAGE_PRIORITY_INTERNAL_QUERY_PREP;
+ max_priority = STORAGE_PRIORITY_BEST_EFFORT;
+ }
// find an opcode to execute from the queue
netdata_spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock);
- for(STORAGE_PRIORITY priority = STORAGE_PRIORITY_INTERNAL_DBENGINE; priority <= max_priority ; priority++) {
+ for(STORAGE_PRIORITY priority = min_priority; priority <= max_priority ; priority++) {
cmd = rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority];
if(cmd) {
@@ -508,13 +569,7 @@ static inline struct rrdeng_cmd rrdeng_deq_cmd(void) {
aral_freez(rrdeng_main.cmd_queue.ar, cmd);
}
else
- ret = (struct rrdeng_cmd) {
- .ctx = NULL,
- .opcode = RRDENG_OPCODE_NOOP,
- .priority = STORAGE_PRIORITY_BEST_EFFORT,
- .completion = NULL,
- .data = NULL,
- };
+ ret = opcode_empty;
return ret;
}
@@ -1353,14 +1408,9 @@ static void *cache_evict_tp_worker(struct rrdengine_instance *ctx __maybe_unused
return data;
}
-static void after_prep_query(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
- ;
-}
-
static void *query_prep_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *req __maybe_unused) { // work callback handed to work_dispatch() for RRDENG_OPCODE_QUERY
- worker_is_busy(UV_EVENT_DBENGINE_QUERY);
PDC *pdc = data; // data is the PDC that was passed to work_dispatch()
- rrdeng_prep_query(pdc);
+ rrdeng_prep_query(pdc, true);
return data; // handed back unchanged; work_standard_worker() stores it in work_request->data
}
@@ -1484,10 +1534,6 @@ static void after_do_cache_evict(struct rrdengine_instance *ctx __maybe_unused,
rrdeng_main.evictions_running--;
}
-static void after_extent_read(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
- ;
-}
-
static void after_journal_v2_indexing(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
__atomic_store_n(&ctx->atomic.migration_to_v2_running, false, __ATOMIC_RELAXED);
rrdeng_enq_cmd(ctx, RRDENG_OPCODE_DATABASE_ROTATE, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
@@ -1616,6 +1662,26 @@ bool rrdeng_dbengine_spawn(struct rrdengine_instance *ctx __maybe_unused) {
return true;
}
+static inline void worker_dispatch_extent_read(struct rrdeng_cmd cmd, bool from_worker) { // execute or schedule one RRDENG_OPCODE_EXTENT_READ command
+ struct rrdengine_instance *ctx = cmd.ctx;
+ EPDL *epdl = cmd.data; // payload of an EXTENT_READ command
+
+ if(from_worker) // already on a worker thread (called from work_standard_worker()): do the read inline
+ epdl_find_extent_and_populate_pages(ctx, epdl, true);
+ else // called from the event loop: go through work_dispatch(), with no completion and no after-work callback
+ work_dispatch(ctx, epdl, NULL, cmd.opcode, extent_read_tp_worker, NULL);
+}
+
+static inline void worker_dispatch_query_prep(struct rrdeng_cmd cmd, bool from_worker) { // execute or schedule one RRDENG_OPCODE_QUERY command
+ struct rrdengine_instance *ctx = cmd.ctx;
+ PDC *pdc = cmd.data; // payload of a QUERY command
+
+ if(from_worker) // already on a worker thread (called from work_standard_worker()): prepare the query inline
+ rrdeng_prep_query(pdc, true);
+ else // called from the event loop: go through work_dispatch(), with no completion and no after-work callback
+ work_dispatch(ctx, pdc, NULL, cmd.opcode, query_prep_tp_worker, NULL);
+}
+
void dbengine_event_loop(void* arg) {
sanity_check();
uv_thread_set_name_np(pthread_self(), "DBENGINE");
@@ -1673,25 +1739,19 @@ void dbengine_event_loop(void* arg) {
/* wait for commands */
do {
worker_is_busy(RRDENG_OPCODE_MAX);
- cmd = rrdeng_deq_cmd();
+ cmd = rrdeng_deq_cmd(RRDENG_OPCODE_NOOP);
opcode = cmd.opcode;
worker_is_busy(opcode);
switch (opcode) {
- case RRDENG_OPCODE_EXTENT_READ: {
- struct rrdengine_instance *ctx = cmd.ctx;
- EPDL *epdl = cmd.data;
- work_dispatch(ctx, epdl, NULL, opcode, extent_read_tp_worker, after_extent_read);
+ case RRDENG_OPCODE_EXTENT_READ:
+ worker_dispatch_extent_read(cmd, false);
break;
- }
- case RRDENG_OPCODE_QUERY: {
- struct rrdengine_instance *ctx = cmd.ctx;
- PDC *pdc = cmd.data;
- work_dispatch(ctx, pdc, NULL, opcode, query_prep_tp_worker, after_prep_query);
+ case RRDENG_OPCODE_QUERY:
+ worker_dispatch_query_prep(cmd, false);
break;
- }
case RRDENG_OPCODE_EXTENT_WRITE: {
struct rrdengine_instance *ctx = cmd.ctx;