diff options
author | Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> | 2023-11-21 12:00:51 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-11-21 12:00:51 +0200 |
commit | 85d43694355dfd31f1a57fcec68adf27327d5868 (patch) | |
tree | fad13b6b8f6ae1e473f2ad0129bb7992f8b0e9dc /database/sqlite/sqlite_aclk.c | |
parent | c0feaec456e474313e827f489b0fcf9a7c5491e8 (diff) |
Remove queue limit from ACLK sync event loop (#16411)
Code cleanup
Diffstat (limited to 'database/sqlite/sqlite_aclk.c')
-rw-r--r-- | database/sqlite/sqlite_aclk.c | 134 |
1 files changed, 43 insertions, 91 deletions
diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c index 3d3caeff21..3a702738aa 100644 --- a/database/sqlite/sqlite_aclk.c +++ b/database/sqlite/sqlite_aclk.c @@ -11,60 +11,46 @@ struct aclk_sync_config_s { uv_timer_t timer_req; time_t cleanup_after; // Start a cleanup after this timestamp uv_async_t async; - /* FIFO command queue */ - uv_mutex_t cmd_mutex; - uv_cond_t cmd_cond; bool initialized; - volatile unsigned queue_size; - struct aclk_database_cmdqueue cmd_queue; + SPINLOCK cmd_queue_lock; + struct aclk_database_cmd *cmd_base; } aclk_sync_config = { 0 }; - void sanity_check(void) { // make sure the compiler will stop on misconfigurations BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < ACLK_MAX_ENUMERATIONS_DEFINED); } - -int aclk_database_enq_cmd_noblock(struct aclk_database_cmd *cmd) +static struct aclk_database_cmd aclk_database_deq_cmd(void) { - unsigned queue_size; + struct aclk_database_cmd ret; - /* wait for free space in queue */ - uv_mutex_lock(&aclk_sync_config.cmd_mutex); - if ((queue_size = aclk_sync_config.queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) { - uv_mutex_unlock(&aclk_sync_config.cmd_mutex); - return 1; + spinlock_lock(&aclk_sync_config.cmd_queue_lock); + if(aclk_sync_config.cmd_base) { + struct aclk_database_cmd *t = aclk_sync_config.cmd_base; + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(aclk_sync_config.cmd_base, t, prev, next); + ret = *t; + freez(t); + } + else { + ret.opcode = ACLK_DATABASE_NOOP; + ret.completion = NULL; } + spinlock_unlock(&aclk_sync_config.cmd_queue_lock); - fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE); - /* enqueue command */ - aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.tail] = *cmd; - aclk_sync_config.cmd_queue.tail = aclk_sync_config.cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ? - aclk_sync_config.cmd_queue.tail + 1 : 0; - aclk_sync_config.queue_size = queue_size + 1; - uv_mutex_unlock(&aclk_sync_config.cmd_mutex); - return 0; + return ret; } static void aclk_database_enq_cmd(struct aclk_database_cmd *cmd) { - unsigned queue_size; + struct aclk_database_cmd *t = mallocz(sizeof(*t)); + *t = *cmd; + t->prev = t->next = NULL; + + spinlock_lock(&aclk_sync_config.cmd_queue_lock); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(aclk_sync_config.cmd_base, t, prev, next); + spinlock_unlock(&aclk_sync_config.cmd_queue_lock); - /* wait for free space in queue */ - uv_mutex_lock(&aclk_sync_config.cmd_mutex); - while ((queue_size = aclk_sync_config.queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) { - uv_cond_wait(&aclk_sync_config.cmd_cond, &aclk_sync_config.cmd_mutex); - } - fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE); - /* enqueue command */ - aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.tail] = *cmd; - aclk_sync_config.cmd_queue.tail = aclk_sync_config.cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ? - aclk_sync_config.cmd_queue.tail + 1 : 0; - aclk_sync_config.queue_size = queue_size + 1; - uv_mutex_unlock(&aclk_sync_config.cmd_mutex); - - /* wake up event loop */ (void) uv_async_send(&aclk_sync_config.async); } @@ -145,35 +131,6 @@ static int create_host_callback(void *data, int argc, char **argv, char **column } #ifdef ENABLE_ACLK -static struct aclk_database_cmd aclk_database_deq_cmd(void) -{ - struct aclk_database_cmd ret; - unsigned queue_size; - - uv_mutex_lock(&aclk_sync_config.cmd_mutex); - queue_size = aclk_sync_config.queue_size; - if (queue_size == 0) { - memset(&ret, 0, sizeof(ret)); - ret.opcode = ACLK_DATABASE_NOOP; - ret.completion = NULL; - - } else { - /* dequeue command */ - ret = aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.head]; - if (queue_size == 1) { - aclk_sync_config.cmd_queue.head = aclk_sync_config.cmd_queue.tail = 0; - } else { - aclk_sync_config.cmd_queue.head = aclk_sync_config.cmd_queue.head != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ? - aclk_sync_config.cmd_queue.head + 1 : 0; - } - aclk_sync_config.queue_size = queue_size - 1; - /* wake up producers */ - uv_cond_signal(&aclk_sync_config.cmd_cond); - } - uv_mutex_unlock(&aclk_sync_config.cmd_mutex); - - return ret; -} #define SQL_SELECT_HOST_BY_UUID "SELECT host_id FROM host WHERE host_id = @host_id;" static int is_host_available(uuid_t *host_id) @@ -264,7 +221,7 @@ static int sql_check_aclk_table(void *data __maybe_unused, int argc __maybe_unus memset(&cmd, 0, sizeof(cmd)); cmd.opcode = ACLK_DATABASE_DELETE_HOST; cmd.param[0] = strdupz((char *) argv[0]); - aclk_database_enq_cmd_noblock(&cmd); + aclk_database_enq_cmd(&cmd); return 0; } @@ -292,7 +249,6 @@ static int sql_maint_aclk_sync_database(void *data __maybe_unused, int argc __ma return 0; } - #define SQL_SELECT_ACLK_ALERT_LIST "SELECT SUBSTR(name,12) FROM sqlite_schema WHERE name LIKE 'aclk_alert_%' AND type IN ('table');" static void sql_maint_aclk_sync_database_all(void) @@ -307,7 +263,7 @@ static void sql_maint_aclk_sync_database_all(void) static int aclk_config_parameters(void *data __maybe_unused, int argc __maybe_unused, char **argv, char **column __maybe_unused) { - char uuid_str[GUID_LEN + 1]; + char uuid_str[UUID_STR_LEN]; uuid_unparse_lower(*((uuid_t *) argv[0]), uuid_str); RRDHOST *host = rrdhost_find_by_guid(uuid_str); @@ -337,16 +293,15 @@ static void timer_cb(uv_timer_t *handle) time_t now = now_realtime_sec(); - if (config->cleanup_after && config->cleanup_after < now) { + if (config->cleanup_after < now) { cmd.opcode = ACLK_DATABASE_CLEANUP; - if (!aclk_database_enq_cmd_noblock(&cmd)) - config->cleanup_after += ACLK_DATABASE_CLEANUP_INTERVAL; + aclk_database_enq_cmd(&cmd); + config->cleanup_after += ACLK_DATABASE_CLEANUP_INTERVAL; } if (aclk_connected) { cmd.opcode = ACLK_DATABASE_PUSH_ALERT; - aclk_database_enq_cmd_noblock(&cmd); - + aclk_database_enq_cmd(&cmd); aclk_check_node_info_and_collectors(); } } @@ -417,7 +372,7 @@ static void aclk_synchronization(void *arg __maybe_unused) case ACLK_DATABASE_NODE_STATE:; RRDHOST *host = cmd.param[0]; int live = (host == localhost || host->receiver || !(rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN))) ? 1 : 0; - struct aclk_sync_host_config *ahc = host->aclk_sync_host_config; + struct aclk_sync_cfg_t *ahc = host->aclk_config; if (unlikely(!ahc)) sql_create_aclk_table(host, &host->host_uuid, host->node_id); aclk_host_state_update(host, live); @@ -447,8 +402,6 @@ static void aclk_synchronization(void *arg __maybe_unused) uv_close((uv_handle_t *)&config->timer_req, NULL); uv_close((uv_handle_t *)&config->async, NULL); -// uv_close((uv_handle_t *)&config->async_exit, NULL); - uv_cond_destroy(&config->cmd_cond); (void) uv_loop_close(loop); worker_unregister(); @@ -458,11 +411,7 @@ static void aclk_synchronization(void *arg __maybe_unused) static void aclk_synchronization_init(void) { - aclk_sync_config.cmd_queue.head = aclk_sync_config.cmd_queue.tail = 0; - aclk_sync_config.queue_size = 0; - fatal_assert(0 == uv_cond_init(&aclk_sync_config.cmd_cond)); - fatal_assert(0 == uv_mutex_init(&aclk_sync_config.cmd_mutex)); - + memset(&aclk_sync_config, 0, sizeof(aclk_sync_config)); fatal_assert(0 == uv_thread_create(&aclk_sync_config.thread, aclk_synchronization, &aclk_sync_config)); } #endif @@ -472,8 +421,8 @@ static void aclk_synchronization_init(void) void sql_create_aclk_table(RRDHOST *host __maybe_unused, uuid_t *host_uuid __maybe_unused, uuid_t *node_id __maybe_unused) { #ifdef ENABLE_ACLK - char uuid_str[GUID_LEN + 1]; - char host_guid[GUID_LEN + 1]; + char uuid_str[UUID_STR_LEN]; + char host_guid[UUID_STR_LEN]; int rc; uuid_unparse_lower_fix(host_uuid, uuid_str); @@ -496,17 +445,17 @@ void sql_create_aclk_table(RRDHOST *host __maybe_unused, uuid_t *host_uuid __may if (unlikely(rc)) error_report("Failed to create ACLK alert table index 2 for host %s", host ? string2str(host->hostname) : host_guid); } - if (likely(host) && unlikely(host->aclk_sync_host_config)) + if (likely(host) && unlikely(host->aclk_config)) return; if (unlikely(!host)) return; - struct aclk_sync_host_config *wc = callocz(1, sizeof(struct aclk_sync_host_config)); + struct aclk_sync_cfg_t *wc = callocz(1, sizeof(struct aclk_sync_cfg_t)); if (node_id && !uuid_is_null(*node_id)) uuid_unparse_lower(*node_id, wc->node_id); - host->aclk_sync_host_config = (void *)wc; + host->aclk_config = wc; if (node_id && !host->node_id) { host->node_id = mallocz(sizeof(*host->node_id)); uuid_copy(*host->node_id, *node_id); @@ -520,12 +469,15 @@ void sql_create_aclk_table(RRDHOST *host __maybe_unused, uuid_t *host_uuid __may #endif } -#define SQL_FETCH_ALL_HOSTS "SELECT host_id, hostname, registry_hostname, update_every, os, " \ - "timezone, tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, " \ +#define SQL_FETCH_ALL_HOSTS \ + "SELECT host_id, hostname, registry_hostname, update_every, os, " \ + "timezone, tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, " \ "program_version, entries, health_enabled, last_connected FROM host WHERE hops >0;" -#define SQL_FETCH_ALL_INSTANCES "SELECT ni.host_id, ni.node_id FROM host h, node_instance ni " \ - "WHERE h.host_id = ni.host_id AND ni.node_id IS NOT NULL; " +#define SQL_FETCH_ALL_INSTANCES \ + "SELECT ni.host_id, ni.node_id FROM host h, node_instance ni " \ + "WHERE h.host_id = ni.host_id AND ni.node_id IS NOT NULL; " + void sql_aclk_sync_init(void) { char *err_msg = NULL; |