summaryrefslogtreecommitdiffstats
path: root/database/sqlite/sqlite_aclk.c
diff options
context:
space:
mode:
authorStelios Fragkakis <52996999+stelfrag@users.noreply.github.com>2023-11-21 12:00:51 +0200
committerGitHub <noreply@github.com>2023-11-21 12:00:51 +0200
commit85d43694355dfd31f1a57fcec68adf27327d5868 (patch)
treefad13b6b8f6ae1e473f2ad0129bb7992f8b0e9dc /database/sqlite/sqlite_aclk.c
parentc0feaec456e474313e827f489b0fcf9a7c5491e8 (diff)
Remove queue limit from ACLK sync event loop (#16411)
Code cleanup
Diffstat (limited to 'database/sqlite/sqlite_aclk.c')
-rw-r--r--database/sqlite/sqlite_aclk.c134
1 files changed, 43 insertions, 91 deletions
diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c
index 3d3caeff21..3a702738aa 100644
--- a/database/sqlite/sqlite_aclk.c
+++ b/database/sqlite/sqlite_aclk.c
@@ -11,60 +11,46 @@ struct aclk_sync_config_s {
uv_timer_t timer_req;
time_t cleanup_after; // Start a cleanup after this timestamp
uv_async_t async;
- /* FIFO command queue */
- uv_mutex_t cmd_mutex;
- uv_cond_t cmd_cond;
bool initialized;
- volatile unsigned queue_size;
- struct aclk_database_cmdqueue cmd_queue;
+ SPINLOCK cmd_queue_lock;
+ struct aclk_database_cmd *cmd_base;
} aclk_sync_config = { 0 };
-
void sanity_check(void) {
// make sure the compiler will stop on misconfigurations
BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < ACLK_MAX_ENUMERATIONS_DEFINED);
}
-
-int aclk_database_enq_cmd_noblock(struct aclk_database_cmd *cmd)
+static struct aclk_database_cmd aclk_database_deq_cmd(void)
{
- unsigned queue_size;
+ struct aclk_database_cmd ret;
- /* wait for free space in queue */
- uv_mutex_lock(&aclk_sync_config.cmd_mutex);
- if ((queue_size = aclk_sync_config.queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) {
- uv_mutex_unlock(&aclk_sync_config.cmd_mutex);
- return 1;
+ spinlock_lock(&aclk_sync_config.cmd_queue_lock);
+ if(aclk_sync_config.cmd_base) {
+ struct aclk_database_cmd *t = aclk_sync_config.cmd_base;
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(aclk_sync_config.cmd_base, t, prev, next);
+ ret = *t;
+ freez(t);
+ }
+ else {
+ ret.opcode = ACLK_DATABASE_NOOP;
+ ret.completion = NULL;
}
+ spinlock_unlock(&aclk_sync_config.cmd_queue_lock);
- fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE);
- /* enqueue command */
- aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.tail] = *cmd;
- aclk_sync_config.cmd_queue.tail = aclk_sync_config.cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ?
- aclk_sync_config.cmd_queue.tail + 1 : 0;
- aclk_sync_config.queue_size = queue_size + 1;
- uv_mutex_unlock(&aclk_sync_config.cmd_mutex);
- return 0;
+ return ret;
}
static void aclk_database_enq_cmd(struct aclk_database_cmd *cmd)
{
- unsigned queue_size;
+ struct aclk_database_cmd *t = mallocz(sizeof(*t));
+ *t = *cmd;
+ t->prev = t->next = NULL;
+
+ spinlock_lock(&aclk_sync_config.cmd_queue_lock);
+ DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(aclk_sync_config.cmd_base, t, prev, next);
+ spinlock_unlock(&aclk_sync_config.cmd_queue_lock);
- /* wait for free space in queue */
- uv_mutex_lock(&aclk_sync_config.cmd_mutex);
- while ((queue_size = aclk_sync_config.queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) {
- uv_cond_wait(&aclk_sync_config.cmd_cond, &aclk_sync_config.cmd_mutex);
- }
- fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE);
- /* enqueue command */
- aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.tail] = *cmd;
- aclk_sync_config.cmd_queue.tail = aclk_sync_config.cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ?
- aclk_sync_config.cmd_queue.tail + 1 : 0;
- aclk_sync_config.queue_size = queue_size + 1;
- uv_mutex_unlock(&aclk_sync_config.cmd_mutex);
-
- /* wake up event loop */
(void) uv_async_send(&aclk_sync_config.async);
}
@@ -145,35 +131,6 @@ static int create_host_callback(void *data, int argc, char **argv, char **column
}
#ifdef ENABLE_ACLK
-static struct aclk_database_cmd aclk_database_deq_cmd(void)
-{
- struct aclk_database_cmd ret;
- unsigned queue_size;
-
- uv_mutex_lock(&aclk_sync_config.cmd_mutex);
- queue_size = aclk_sync_config.queue_size;
- if (queue_size == 0) {
- memset(&ret, 0, sizeof(ret));
- ret.opcode = ACLK_DATABASE_NOOP;
- ret.completion = NULL;
-
- } else {
- /* dequeue command */
- ret = aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.head];
- if (queue_size == 1) {
- aclk_sync_config.cmd_queue.head = aclk_sync_config.cmd_queue.tail = 0;
- } else {
- aclk_sync_config.cmd_queue.head = aclk_sync_config.cmd_queue.head != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ?
- aclk_sync_config.cmd_queue.head + 1 : 0;
- }
- aclk_sync_config.queue_size = queue_size - 1;
- /* wake up producers */
- uv_cond_signal(&aclk_sync_config.cmd_cond);
- }
- uv_mutex_unlock(&aclk_sync_config.cmd_mutex);
-
- return ret;
-}
#define SQL_SELECT_HOST_BY_UUID "SELECT host_id FROM host WHERE host_id = @host_id;"
static int is_host_available(uuid_t *host_id)
@@ -264,7 +221,7 @@ static int sql_check_aclk_table(void *data __maybe_unused, int argc __maybe_unus
memset(&cmd, 0, sizeof(cmd));
cmd.opcode = ACLK_DATABASE_DELETE_HOST;
cmd.param[0] = strdupz((char *) argv[0]);
- aclk_database_enq_cmd_noblock(&cmd);
+ aclk_database_enq_cmd(&cmd);
return 0;
}
@@ -292,7 +249,6 @@ static int sql_maint_aclk_sync_database(void *data __maybe_unused, int argc __ma
return 0;
}
-
#define SQL_SELECT_ACLK_ALERT_LIST "SELECT SUBSTR(name,12) FROM sqlite_schema WHERE name LIKE 'aclk_alert_%' AND type IN ('table');"
static void sql_maint_aclk_sync_database_all(void)
@@ -307,7 +263,7 @@ static void sql_maint_aclk_sync_database_all(void)
static int aclk_config_parameters(void *data __maybe_unused, int argc __maybe_unused, char **argv, char **column __maybe_unused)
{
- char uuid_str[GUID_LEN + 1];
+ char uuid_str[UUID_STR_LEN];
uuid_unparse_lower(*((uuid_t *) argv[0]), uuid_str);
RRDHOST *host = rrdhost_find_by_guid(uuid_str);
@@ -337,16 +293,15 @@ static void timer_cb(uv_timer_t *handle)
time_t now = now_realtime_sec();
- if (config->cleanup_after && config->cleanup_after < now) {
+ if (config->cleanup_after < now) {
cmd.opcode = ACLK_DATABASE_CLEANUP;
- if (!aclk_database_enq_cmd_noblock(&cmd))
- config->cleanup_after += ACLK_DATABASE_CLEANUP_INTERVAL;
+ aclk_database_enq_cmd(&cmd);
+ config->cleanup_after += ACLK_DATABASE_CLEANUP_INTERVAL;
}
if (aclk_connected) {
cmd.opcode = ACLK_DATABASE_PUSH_ALERT;
- aclk_database_enq_cmd_noblock(&cmd);
-
+ aclk_database_enq_cmd(&cmd);
aclk_check_node_info_and_collectors();
}
}
@@ -417,7 +372,7 @@ static void aclk_synchronization(void *arg __maybe_unused)
case ACLK_DATABASE_NODE_STATE:;
RRDHOST *host = cmd.param[0];
int live = (host == localhost || host->receiver || !(rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN))) ? 1 : 0;
- struct aclk_sync_host_config *ahc = host->aclk_sync_host_config;
+ struct aclk_sync_cfg_t *ahc = host->aclk_config;
if (unlikely(!ahc))
sql_create_aclk_table(host, &host->host_uuid, host->node_id);
aclk_host_state_update(host, live);
@@ -447,8 +402,6 @@ static void aclk_synchronization(void *arg __maybe_unused)
uv_close((uv_handle_t *)&config->timer_req, NULL);
uv_close((uv_handle_t *)&config->async, NULL);
-// uv_close((uv_handle_t *)&config->async_exit, NULL);
- uv_cond_destroy(&config->cmd_cond);
(void) uv_loop_close(loop);
worker_unregister();
@@ -458,11 +411,7 @@ static void aclk_synchronization(void *arg __maybe_unused)
static void aclk_synchronization_init(void)
{
- aclk_sync_config.cmd_queue.head = aclk_sync_config.cmd_queue.tail = 0;
- aclk_sync_config.queue_size = 0;
- fatal_assert(0 == uv_cond_init(&aclk_sync_config.cmd_cond));
- fatal_assert(0 == uv_mutex_init(&aclk_sync_config.cmd_mutex));
-
+ memset(&aclk_sync_config, 0, sizeof(aclk_sync_config));
fatal_assert(0 == uv_thread_create(&aclk_sync_config.thread, aclk_synchronization, &aclk_sync_config));
}
#endif
@@ -472,8 +421,8 @@ static void aclk_synchronization_init(void)
void sql_create_aclk_table(RRDHOST *host __maybe_unused, uuid_t *host_uuid __maybe_unused, uuid_t *node_id __maybe_unused)
{
#ifdef ENABLE_ACLK
- char uuid_str[GUID_LEN + 1];
- char host_guid[GUID_LEN + 1];
+ char uuid_str[UUID_STR_LEN];
+ char host_guid[UUID_STR_LEN];
int rc;
uuid_unparse_lower_fix(host_uuid, uuid_str);
@@ -496,17 +445,17 @@ void sql_create_aclk_table(RRDHOST *host __maybe_unused, uuid_t *host_uuid __may
if (unlikely(rc))
error_report("Failed to create ACLK alert table index 2 for host %s", host ? string2str(host->hostname) : host_guid);
}
- if (likely(host) && unlikely(host->aclk_sync_host_config))
+ if (likely(host) && unlikely(host->aclk_config))
return;
if (unlikely(!host))
return;
- struct aclk_sync_host_config *wc = callocz(1, sizeof(struct aclk_sync_host_config));
+ struct aclk_sync_cfg_t *wc = callocz(1, sizeof(struct aclk_sync_cfg_t));
if (node_id && !uuid_is_null(*node_id))
uuid_unparse_lower(*node_id, wc->node_id);
- host->aclk_sync_host_config = (void *)wc;
+ host->aclk_config = wc;
if (node_id && !host->node_id) {
host->node_id = mallocz(sizeof(*host->node_id));
uuid_copy(*host->node_id, *node_id);
@@ -520,12 +469,15 @@ void sql_create_aclk_table(RRDHOST *host __maybe_unused, uuid_t *host_uuid __may
#endif
}
-#define SQL_FETCH_ALL_HOSTS "SELECT host_id, hostname, registry_hostname, update_every, os, " \
- "timezone, tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, " \
+#define SQL_FETCH_ALL_HOSTS \
+ "SELECT host_id, hostname, registry_hostname, update_every, os, " \
+ "timezone, tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, " \
"program_version, entries, health_enabled, last_connected FROM host WHERE hops >0;"
-#define SQL_FETCH_ALL_INSTANCES "SELECT ni.host_id, ni.node_id FROM host h, node_instance ni " \
- "WHERE h.host_id = ni.host_id AND ni.node_id IS NOT NULL; "
+#define SQL_FETCH_ALL_INSTANCES \
+ "SELECT ni.host_id, ni.node_id FROM host h, node_instance ni " \
+ "WHERE h.host_id = ni.host_id AND ni.node_id IS NOT NULL; "
+
void sql_aclk_sync_init(void)
{
char *err_msg = NULL;