diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2023-10-16 18:13:57 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-10-16 20:13:57 +0300 |
commit | 063c4179b3956b0e22367c4f00b421c438d9b656 (patch) | |
tree | 1e653e40325b247c804adf77878282bae07d2983 | |
parent | 4ea851ed63e0b2c82e11153dcb60198e4ddabc16 (diff) |
dynamic meta queue size (#16218)
* dynamic meta queue size
* meta cleanup
-rw-r--r-- | database/sqlite/sqlite_metadata.c | 105 |
1 files changed, 28 insertions, 77 deletions
diff --git a/database/sqlite/sqlite_metadata.c b/database/sqlite/sqlite_metadata.c index 71d0a15b89..143783163c 100644 --- a/database/sqlite/sqlite_metadata.c +++ b/database/sqlite/sqlite_metadata.c @@ -95,11 +95,11 @@ struct metadata_cmd { enum metadata_opcode opcode; struct completion *completion; const void *param[MAX_PARAM_LIST]; + struct metadata_cmd *prev, *next; }; struct metadata_database_cmdqueue { - unsigned head, tail; - struct metadata_cmd cmd_array[METADATA_CMD_Q_MAX_SIZE]; + struct metadata_cmd *cmd_base; }; typedef enum { @@ -120,7 +120,6 @@ struct metadata_wc { struct completion *scan_complete; /* FIFO command queue */ uv_mutex_t cmd_mutex; - uv_cond_t cmd_cond; struct metadata_database_cmdqueue cmd_queue; }; @@ -1062,103 +1061,57 @@ static void cleanup_health_log(struct metadata_wc *wc) static void metadata_init_cmd_queue(struct metadata_wc *wc) { - wc->cmd_queue.head = wc->cmd_queue.tail = 0; - wc->queue_size = 0; - fatal_assert(0 == uv_cond_init(&wc->cmd_cond)); + wc->cmd_queue.cmd_base = NULL; fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex)); } -int metadata_enq_cmd_noblock(struct metadata_wc *wc, struct metadata_cmd *cmd) +static void metadata_free_cmd_queue(struct metadata_wc *wc) { - unsigned queue_size; - - /* wait for free space in queue */ uv_mutex_lock(&wc->cmd_mutex); - - if (cmd->opcode == METADATA_SYNC_SHUTDOWN) { - metadata_flag_set(wc, METADATA_FLAG_SHUTDOWN); - uv_mutex_unlock(&wc->cmd_mutex); - return 0; + while(wc->cmd_queue.cmd_base) { + struct metadata_cmd *t = wc->cmd_queue.cmd_base; + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wc->cmd_queue.cmd_base, t, prev, next); + freez(t); } - - if (unlikely((queue_size = wc->queue_size) == METADATA_CMD_Q_MAX_SIZE || - metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) { - uv_mutex_unlock(&wc->cmd_mutex); - return 1; - } - - fatal_assert(queue_size < METADATA_CMD_Q_MAX_SIZE); - /* enqueue command */ - wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd; - wc->cmd_queue.tail = wc->cmd_queue.tail != METADATA_CMD_Q_MAX_SIZE - 1 ? - wc->cmd_queue.tail + 1 : 0; - wc->queue_size = queue_size + 1; uv_mutex_unlock(&wc->cmd_mutex); - return 0; } static void metadata_enq_cmd(struct metadata_wc *wc, struct metadata_cmd *cmd) { - unsigned queue_size; - - /* wait for free space in queue */ - uv_mutex_lock(&wc->cmd_mutex); - if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) { - uv_mutex_unlock(&wc->cmd_mutex); - (void) uv_async_send(&wc->async); - return; - } - if (cmd->opcode == METADATA_SYNC_SHUTDOWN) { metadata_flag_set(wc, METADATA_FLAG_SHUTDOWN); - uv_mutex_unlock(&wc->cmd_mutex); - (void) uv_async_send(&wc->async); - return; + goto wakeup_event_loop; } - while ((queue_size = wc->queue_size) == METADATA_CMD_Q_MAX_SIZE) { - if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) { - uv_mutex_unlock(&wc->cmd_mutex); - return; - } - uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex); - } - fatal_assert(queue_size < METADATA_CMD_Q_MAX_SIZE); - /* enqueue command */ - wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd; - wc->cmd_queue.tail = wc->cmd_queue.tail != METADATA_CMD_Q_MAX_SIZE - 1 ? - wc->cmd_queue.tail + 1 : 0; - wc->queue_size = queue_size + 1; + if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) + goto wakeup_event_loop; + + struct metadata_cmd *t = mallocz(sizeof(*t)); + *t = *cmd; + t->prev = t->next = NULL; + + uv_mutex_lock(&wc->cmd_mutex); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wc->cmd_queue.cmd_base, t, prev, next); uv_mutex_unlock(&wc->cmd_mutex); - /* wake up event loop */ +wakeup_event_loop: (void) uv_async_send(&wc->async); } static struct metadata_cmd metadata_deq_cmd(struct metadata_wc *wc) { struct metadata_cmd ret; - unsigned queue_size; uv_mutex_lock(&wc->cmd_mutex); - queue_size = wc->queue_size; - if (queue_size == 0) { - memset(&ret, 0, sizeof(ret)); + if(wc->cmd_queue.cmd_base) { + struct metadata_cmd *t = wc->cmd_queue.cmd_base; + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wc->cmd_queue.cmd_base, t, prev, next); + ret = *t; + freez(t); + } + else { ret.opcode = METADATA_DATABASE_NOOP; ret.completion = NULL; - } else { - /* dequeue command */ - ret = wc->cmd_queue.cmd_array[wc->cmd_queue.head]; - - if (queue_size == 1) { - wc->cmd_queue.head = wc->cmd_queue.tail = 0; - } else { - wc->cmd_queue.head = wc->cmd_queue.head != METADATA_CMD_Q_MAX_SIZE - 1 ? - wc->cmd_queue.head + 1 : 0; - } - wc->queue_size = queue_size - 1; - /* wake up producers */ - uv_cond_signal(&wc->cmd_cond); } uv_mutex_unlock(&wc->cmd_mutex); @@ -1187,8 +1140,7 @@ static void timer_cb(uv_timer_t* handle) if (wc->metadata_check_after && wc->metadata_check_after < now) { cmd.opcode = METADATA_SCAN_HOSTS; - if (!metadata_enq_cmd_noblock(wc, &cmd)) - wc->metadata_check_after = now + METADATA_HOST_CHECK_INTERVAL; + metadata_enq_cmd(wc, &cmd); } } @@ -1794,7 +1746,6 @@ static void metadata_event_loop(void *arg) uv_close((uv_handle_t *)&wc->timer_req, NULL); uv_close((uv_handle_t *)&wc->async, NULL); - uv_cond_destroy(&wc->cmd_cond); int rc; do { rc = uv_loop_close(loop); @@ -1808,6 +1759,7 @@ static void metadata_event_loop(void *arg) completion_mark_complete(&wc->init_complete); completion_destroy(wc->scan_complete); freez(wc->scan_complete); + metadata_free_cmd_queue(wc); return; error_after_timer_init: @@ -2024,7 +1976,6 @@ int metadata_unittest(void) // Queue items for a specific period of time metadata_unittest_threads(); - fprintf(stderr, "Items still in queue %u\n", metasync_worker.queue_size); metadata_sync_shutdown(); return 0; |