summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-10-16 18:13:57 +0100
committerGitHub <noreply@github.com>2023-10-16 20:13:57 +0300
commit063c4179b3956b0e22367c4f00b421c438d9b656 (patch)
tree1e653e40325b247c804adf77878282bae07d2983
parent4ea851ed63e0b2c82e11153dcb60198e4ddabc16 (diff)
dynamic meta queue size (#16218)
* dynamic meta queue size * meta cleanup
-rw-r--r--database/sqlite/sqlite_metadata.c105
1 files changed, 28 insertions, 77 deletions
diff --git a/database/sqlite/sqlite_metadata.c b/database/sqlite/sqlite_metadata.c
index 71d0a15b89..143783163c 100644
--- a/database/sqlite/sqlite_metadata.c
+++ b/database/sqlite/sqlite_metadata.c
@@ -95,11 +95,11 @@ struct metadata_cmd {
enum metadata_opcode opcode;
struct completion *completion;
const void *param[MAX_PARAM_LIST];
+ struct metadata_cmd *prev, *next;
};
struct metadata_database_cmdqueue {
- unsigned head, tail;
- struct metadata_cmd cmd_array[METADATA_CMD_Q_MAX_SIZE];
+ struct metadata_cmd *cmd_base;
};
typedef enum {
@@ -120,7 +120,6 @@ struct metadata_wc {
struct completion *scan_complete;
/* FIFO command queue */
uv_mutex_t cmd_mutex;
- uv_cond_t cmd_cond;
struct metadata_database_cmdqueue cmd_queue;
};
@@ -1062,103 +1061,57 @@ static void cleanup_health_log(struct metadata_wc *wc)
static void metadata_init_cmd_queue(struct metadata_wc *wc)
{
- wc->cmd_queue.head = wc->cmd_queue.tail = 0;
- wc->queue_size = 0;
- fatal_assert(0 == uv_cond_init(&wc->cmd_cond));
+ wc->cmd_queue.cmd_base = NULL;
fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex));
}
-int metadata_enq_cmd_noblock(struct metadata_wc *wc, struct metadata_cmd *cmd)
+static void metadata_free_cmd_queue(struct metadata_wc *wc)
{
- unsigned queue_size;
-
- /* wait for free space in queue */
uv_mutex_lock(&wc->cmd_mutex);
-
- if (cmd->opcode == METADATA_SYNC_SHUTDOWN) {
- metadata_flag_set(wc, METADATA_FLAG_SHUTDOWN);
- uv_mutex_unlock(&wc->cmd_mutex);
- return 0;
+ while(wc->cmd_queue.cmd_base) {
+ struct metadata_cmd *t = wc->cmd_queue.cmd_base;
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wc->cmd_queue.cmd_base, t, prev, next);
+ freez(t);
}
-
- if (unlikely((queue_size = wc->queue_size) == METADATA_CMD_Q_MAX_SIZE ||
- metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) {
- uv_mutex_unlock(&wc->cmd_mutex);
- return 1;
- }
-
- fatal_assert(queue_size < METADATA_CMD_Q_MAX_SIZE);
- /* enqueue command */
- wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd;
- wc->cmd_queue.tail = wc->cmd_queue.tail != METADATA_CMD_Q_MAX_SIZE - 1 ?
- wc->cmd_queue.tail + 1 : 0;
- wc->queue_size = queue_size + 1;
uv_mutex_unlock(&wc->cmd_mutex);
- return 0;
}
static void metadata_enq_cmd(struct metadata_wc *wc, struct metadata_cmd *cmd)
{
- unsigned queue_size;
-
- /* wait for free space in queue */
- uv_mutex_lock(&wc->cmd_mutex);
- if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) {
- uv_mutex_unlock(&wc->cmd_mutex);
- (void) uv_async_send(&wc->async);
- return;
- }
-
if (cmd->opcode == METADATA_SYNC_SHUTDOWN) {
metadata_flag_set(wc, METADATA_FLAG_SHUTDOWN);
- uv_mutex_unlock(&wc->cmd_mutex);
- (void) uv_async_send(&wc->async);
- return;
+ goto wakeup_event_loop;
}
- while ((queue_size = wc->queue_size) == METADATA_CMD_Q_MAX_SIZE) {
- if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) {
- uv_mutex_unlock(&wc->cmd_mutex);
- return;
- }
- uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex);
- }
- fatal_assert(queue_size < METADATA_CMD_Q_MAX_SIZE);
- /* enqueue command */
- wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd;
- wc->cmd_queue.tail = wc->cmd_queue.tail != METADATA_CMD_Q_MAX_SIZE - 1 ?
- wc->cmd_queue.tail + 1 : 0;
- wc->queue_size = queue_size + 1;
+ if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN)))
+ goto wakeup_event_loop;
+
+ struct metadata_cmd *t = mallocz(sizeof(*t));
+ *t = *cmd;
+ t->prev = t->next = NULL;
+
+ uv_mutex_lock(&wc->cmd_mutex);
+ DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wc->cmd_queue.cmd_base, t, prev, next);
uv_mutex_unlock(&wc->cmd_mutex);
- /* wake up event loop */
+wakeup_event_loop:
(void) uv_async_send(&wc->async);
}
static struct metadata_cmd metadata_deq_cmd(struct metadata_wc *wc)
{
struct metadata_cmd ret;
- unsigned queue_size;
uv_mutex_lock(&wc->cmd_mutex);
- queue_size = wc->queue_size;
- if (queue_size == 0) {
- memset(&ret, 0, sizeof(ret));
+ if(wc->cmd_queue.cmd_base) {
+ struct metadata_cmd *t = wc->cmd_queue.cmd_base;
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wc->cmd_queue.cmd_base, t, prev, next);
+ ret = *t;
+ freez(t);
+ }
+ else {
ret.opcode = METADATA_DATABASE_NOOP;
ret.completion = NULL;
- } else {
- /* dequeue command */
- ret = wc->cmd_queue.cmd_array[wc->cmd_queue.head];
-
- if (queue_size == 1) {
- wc->cmd_queue.head = wc->cmd_queue.tail = 0;
- } else {
- wc->cmd_queue.head = wc->cmd_queue.head != METADATA_CMD_Q_MAX_SIZE - 1 ?
- wc->cmd_queue.head + 1 : 0;
- }
- wc->queue_size = queue_size - 1;
- /* wake up producers */
- uv_cond_signal(&wc->cmd_cond);
}
uv_mutex_unlock(&wc->cmd_mutex);
@@ -1187,8 +1140,7 @@ static void timer_cb(uv_timer_t* handle)
if (wc->metadata_check_after && wc->metadata_check_after < now) {
cmd.opcode = METADATA_SCAN_HOSTS;
- if (!metadata_enq_cmd_noblock(wc, &cmd))
- wc->metadata_check_after = now + METADATA_HOST_CHECK_INTERVAL;
+ metadata_enq_cmd(wc, &cmd);
}
}
@@ -1794,7 +1746,6 @@ static void metadata_event_loop(void *arg)
uv_close((uv_handle_t *)&wc->timer_req, NULL);
uv_close((uv_handle_t *)&wc->async, NULL);
- uv_cond_destroy(&wc->cmd_cond);
int rc;
do {
rc = uv_loop_close(loop);
@@ -1808,6 +1759,7 @@ static void metadata_event_loop(void *arg)
completion_mark_complete(&wc->init_complete);
completion_destroy(wc->scan_complete);
freez(wc->scan_complete);
+ metadata_free_cmd_queue(wc);
return;
error_after_timer_init:
@@ -2024,7 +1976,6 @@ int metadata_unittest(void)
// Queue items for a specific period of time
metadata_unittest_threads();
- fprintf(stderr, "Items still in queue %u\n", metasync_worker.queue_size);
metadata_sync_shutdown();
return 0;