diff options
author | Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> | 2023-11-21 12:00:51 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-11-21 12:00:51 +0200 |
commit | 85d43694355dfd31f1a57fcec68adf27327d5868 (patch) | |
tree | fad13b6b8f6ae1e473f2ad0129bb7992f8b0e9dc /database | |
parent | c0feaec456e474313e827f489b0fcf9a7c5491e8 (diff) |
Remove queue limit from ACLK sync event loop (#16411)
Code cleanup
Diffstat (limited to 'database')
-rw-r--r-- | database/rrd.h | 2 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk.c | 134 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk.h | 25 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_alert.c | 350 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_alert.h | 2 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_node.c | 6 | ||||
-rw-r--r-- | database/sqlite/sqlite_functions.c | 2 |
7 files changed, 235 insertions, 286 deletions
diff --git a/database/rrd.h b/database/rrd.h index 3687497ffd..341a29e501 100644 --- a/database/rrd.h +++ b/database/rrd.h @@ -1273,7 +1273,7 @@ struct rrdhost { struct sender_state *sender; netdata_thread_t rrdpush_sender_thread; // the sender thread size_t rrdpush_sender_replicating_charts; // the number of charts currently being replicated to a parent - void *aclk_sync_host_config; + struct aclk_sync_cfg_t *aclk_config; uint32_t rrdpush_receiver_connection_counter; // the number of times this receiver has connected uint32_t rrdpush_sender_connection_counter; // the number of times this sender has connected diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c index 3d3caeff21..3a702738aa 100644 --- a/database/sqlite/sqlite_aclk.c +++ b/database/sqlite/sqlite_aclk.c @@ -11,60 +11,46 @@ struct aclk_sync_config_s { uv_timer_t timer_req; time_t cleanup_after; // Start a cleanup after this timestamp uv_async_t async; - /* FIFO command queue */ - uv_mutex_t cmd_mutex; - uv_cond_t cmd_cond; bool initialized; - volatile unsigned queue_size; - struct aclk_database_cmdqueue cmd_queue; + SPINLOCK cmd_queue_lock; + struct aclk_database_cmd *cmd_base; } aclk_sync_config = { 0 }; - void sanity_check(void) { // make sure the compiler will stop on misconfigurations BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < ACLK_MAX_ENUMERATIONS_DEFINED); } - -int aclk_database_enq_cmd_noblock(struct aclk_database_cmd *cmd) +static struct aclk_database_cmd aclk_database_deq_cmd(void) { - unsigned queue_size; + struct aclk_database_cmd ret; - /* wait for free space in queue */ - uv_mutex_lock(&aclk_sync_config.cmd_mutex); - if ((queue_size = aclk_sync_config.queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) { - uv_mutex_unlock(&aclk_sync_config.cmd_mutex); - return 1; + spinlock_lock(&aclk_sync_config.cmd_queue_lock); + if(aclk_sync_config.cmd_base) { + struct aclk_database_cmd *t = aclk_sync_config.cmd_base; + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(aclk_sync_config.cmd_base, t, prev, next); + ret = *t; + freez(t); + } + else { + ret.opcode = ACLK_DATABASE_NOOP; + ret.completion = NULL; } + spinlock_unlock(&aclk_sync_config.cmd_queue_lock); - fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE); - /* enqueue command */ - aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.tail] = *cmd; - aclk_sync_config.cmd_queue.tail = aclk_sync_config.cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ? - aclk_sync_config.cmd_queue.tail + 1 : 0; - aclk_sync_config.queue_size = queue_size + 1; - uv_mutex_unlock(&aclk_sync_config.cmd_mutex); - return 0; + return ret; } static void aclk_database_enq_cmd(struct aclk_database_cmd *cmd) { - unsigned queue_size; + struct aclk_database_cmd *t = mallocz(sizeof(*t)); + *t = *cmd; + t->prev = t->next = NULL; + + spinlock_lock(&aclk_sync_config.cmd_queue_lock); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(aclk_sync_config.cmd_base, t, prev, next); + spinlock_unlock(&aclk_sync_config.cmd_queue_lock); - /* wait for free space in queue */ - uv_mutex_lock(&aclk_sync_config.cmd_mutex); - while ((queue_size = aclk_sync_config.queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) { - uv_cond_wait(&aclk_sync_config.cmd_cond, &aclk_sync_config.cmd_mutex); - } - fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE); - /* enqueue command */ - aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.tail] = *cmd; - aclk_sync_config.cmd_queue.tail = aclk_sync_config.cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ? - aclk_sync_config.cmd_queue.tail + 1 : 0; - aclk_sync_config.queue_size = queue_size + 1; - uv_mutex_unlock(&aclk_sync_config.cmd_mutex); - - /* wake up event loop */ (void) uv_async_send(&aclk_sync_config.async); } @@ -145,35 +131,6 @@ static int create_host_callback(void *data, int argc, char **argv, char **column } #ifdef ENABLE_ACLK -static struct aclk_database_cmd aclk_database_deq_cmd(void) -{ - struct aclk_database_cmd ret; - unsigned queue_size; - - uv_mutex_lock(&aclk_sync_config.cmd_mutex); - queue_size = aclk_sync_config.queue_size; - if (queue_size == 0) { - memset(&ret, 0, sizeof(ret)); - ret.opcode = ACLK_DATABASE_NOOP; - ret.completion = NULL; - - } else { - /* dequeue command */ - ret = aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.head]; - if (queue_size == 1) { - aclk_sync_config.cmd_queue.head = aclk_sync_config.cmd_queue.tail = 0; - } else { - aclk_sync_config.cmd_queue.head = aclk_sync_config.cmd_queue.head != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ? - aclk_sync_config.cmd_queue.head + 1 : 0; - } - aclk_sync_config.queue_size = queue_size - 1; - /* wake up producers */ - uv_cond_signal(&aclk_sync_config.cmd_cond); - } - uv_mutex_unlock(&aclk_sync_config.cmd_mutex); - - return ret; -} #define SQL_SELECT_HOST_BY_UUID "SELECT host_id FROM host WHERE host_id = @host_id;" static int is_host_available(uuid_t *host_id) @@ -264,7 +221,7 @@ static int sql_check_aclk_table(void *data __maybe_unused, int argc __maybe_unus memset(&cmd, 0, sizeof(cmd)); cmd.opcode = ACLK_DATABASE_DELETE_HOST; cmd.param[0] = strdupz((char *) argv[0]); - aclk_database_enq_cmd_noblock(&cmd); + aclk_database_enq_cmd(&cmd); return 0; } @@ -292,7 +249,6 @@ static int sql_maint_aclk_sync_database(void *data __maybe_unused, int argc __ma return 0; } - #define SQL_SELECT_ACLK_ALERT_LIST "SELECT SUBSTR(name,12) FROM sqlite_schema WHERE name LIKE 'aclk_alert_%' AND type IN ('table');" static void sql_maint_aclk_sync_database_all(void) @@ -307,7 +263,7 @@ static void sql_maint_aclk_sync_database_all(void) static int aclk_config_parameters(void *data __maybe_unused, int argc __maybe_unused, char **argv, char **column __maybe_unused) { - char uuid_str[GUID_LEN + 1]; + char uuid_str[UUID_STR_LEN]; uuid_unparse_lower(*((uuid_t *) argv[0]), uuid_str); RRDHOST *host = rrdhost_find_by_guid(uuid_str); @@ -337,16 +293,15 @@ static void timer_cb(uv_timer_t *handle) time_t now = now_realtime_sec(); - if (config->cleanup_after && config->cleanup_after < now) { + if (config->cleanup_after < now) { cmd.opcode = ACLK_DATABASE_CLEANUP; - if (!aclk_database_enq_cmd_noblock(&cmd)) - config->cleanup_after += ACLK_DATABASE_CLEANUP_INTERVAL; + aclk_database_enq_cmd(&cmd); + config->cleanup_after += ACLK_DATABASE_CLEANUP_INTERVAL; } if (aclk_connected) { cmd.opcode = ACLK_DATABASE_PUSH_ALERT; - aclk_database_enq_cmd_noblock(&cmd); - + aclk_database_enq_cmd(&cmd); aclk_check_node_info_and_collectors(); } } @@ -417,7 +372,7 @@ static void aclk_synchronization(void *arg __maybe_unused) case ACLK_DATABASE_NODE_STATE:; RRDHOST *host = cmd.param[0]; int live = (host == localhost || host->receiver || !(rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN))) ? 1 : 0; - struct aclk_sync_host_config *ahc = host->aclk_sync_host_config; + struct aclk_sync_cfg_t *ahc = host->aclk_config; if (unlikely(!ahc)) sql_create_aclk_table(host, &host->host_uuid, host->node_id); aclk_host_state_update(host, live); @@ -447,8 +402,6 @@ static void aclk_synchronization(void *arg __maybe_unused) uv_close((uv_handle_t *)&config->timer_req, NULL); uv_close((uv_handle_t *)&config->async, NULL); -// uv_close((uv_handle_t *)&config->async_exit, NULL); - uv_cond_destroy(&config->cmd_cond); (void) uv_loop_close(loop); worker_unregister(); @@ -458,11 +411,7 @@ static void aclk_synchronization(void *arg __maybe_unused) static void aclk_synchronization_init(void) { - aclk_sync_config.cmd_queue.head = aclk_sync_config.cmd_queue.tail = 0; - aclk_sync_config.queue_size = 0; - fatal_assert(0 == uv_cond_init(&aclk_sync_config.cmd_cond)); - fatal_assert(0 == uv_mutex_init(&aclk_sync_config.cmd_mutex)); - + memset(&aclk_sync_config, 0, sizeof(aclk_sync_config)); fatal_assert(0 == uv_thread_create(&aclk_sync_config.thread, aclk_synchronization, &aclk_sync_config)); } #endif @@ -472,8 +421,8 @@ static void aclk_synchronization_init(void) void sql_create_aclk_table(RRDHOST *host __maybe_unused, uuid_t *host_uuid __maybe_unused, uuid_t *node_id __maybe_unused) { #ifdef ENABLE_ACLK - char uuid_str[GUID_LEN + 1]; - char host_guid[GUID_LEN + 1]; + char uuid_str[UUID_STR_LEN]; + char host_guid[UUID_STR_LEN]; int rc; uuid_unparse_lower_fix(host_uuid, uuid_str); @@ -496,17 +445,17 @@ void sql_create_aclk_table(RRDHOST *host __maybe_unused, uuid_t *host_uuid __may if (unlikely(rc)) error_report("Failed to create ACLK alert table index 2 for host %s", host ? string2str(host->hostname) : host_guid); } - if (likely(host) && unlikely(host->aclk_sync_host_config)) + if (likely(host) && unlikely(host->aclk_config)) return; if (unlikely(!host)) return; - struct aclk_sync_host_config *wc = callocz(1, sizeof(struct aclk_sync_host_config)); + struct aclk_sync_cfg_t *wc = callocz(1, sizeof(struct aclk_sync_cfg_t)); if (node_id && !uuid_is_null(*node_id)) uuid_unparse_lower(*node_id, wc->node_id); - host->aclk_sync_host_config = (void *)wc; + host->aclk_config = wc; if (node_id && !host->node_id) { host->node_id = mallocz(sizeof(*host->node_id)); uuid_copy(*host->node_id, *node_id); @@ -520,12 +469,15 @@ void sql_create_aclk_table(RRDHOST *host __maybe_unused, uuid_t *host_uuid __may #endif } -#define SQL_FETCH_ALL_HOSTS "SELECT host_id, hostname, registry_hostname, update_every, os, " \ - "timezone, tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, " \ +#define SQL_FETCH_ALL_HOSTS \ + "SELECT host_id, hostname, registry_hostname, update_every, os, " \ + "timezone, tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, " \ "program_version, entries, health_enabled, last_connected FROM host WHERE hops >0;" -#define SQL_FETCH_ALL_INSTANCES "SELECT ni.host_id, ni.node_id FROM host h, node_instance ni " \ - "WHERE h.host_id = ni.host_id AND ni.node_id IS NOT NULL; " +#define SQL_FETCH_ALL_INSTANCES \ + "SELECT ni.host_id, ni.node_id FROM host h, node_instance ni " \ + "WHERE h.host_id = ni.host_id AND ni.node_id IS NOT NULL; " + void sql_aclk_sync_init(void) { char *err_msg = NULL; diff --git a/database/sqlite/sqlite_aclk.h b/database/sqlite/sqlite_aclk.h index 850ca434e2..a8d0a274a9 100644 --- a/database/sqlite/sqlite_aclk.h +++ b/database/sqlite/sqlite_aclk.h @@ -5,7 +5,6 @@ #include "sqlite3.h" - #ifndef ACLK_MAX_CHART_BATCH #define ACLK_MAX_CHART_BATCH (200) #endif @@ -41,11 +40,11 @@ static inline int claimed() return localhost->aclk_state.claimed_id != NULL; } -#define TABLE_ACLK_ALERT "CREATE TABLE IF NOT EXISTS aclk_alert_%s (sequence_id INTEGER PRIMARY KEY, " \ - "alert_unique_id, date_created, date_submitted, date_cloud_ack, filtered_alert_unique_id NOT NULL, " \ - "unique(alert_unique_id));" +#define TABLE_ACLK_ALERT \ + "CREATE TABLE IF NOT EXISTS aclk_alert_%s (sequence_id INTEGER PRIMARY KEY, " \ + "alert_unique_id, date_created, date_submitted, date_cloud_ack, filtered_alert_unique_id NOT NULL, " \ + "UNIQUE(alert_unique_id));" -#define INDEX_ACLK_ALERT "CREATE INDEX IF NOT EXISTS aclk_alert_index_%s ON aclk_alert_%s (alert_unique_id);" #define INDEX_ACLK_ALERT1 "CREATE INDEX IF NOT EXISTS aclk_alert_index1_%s ON aclk_alert_%s (filtered_alert_unique_id);" #define INDEX_ACLK_ALERT2 "CREATE INDEX IF NOT EXISTS aclk_alert_index2_%s ON aclk_alert_%s (date_submitted);" @@ -71,16 +70,10 @@ struct aclk_database_cmd { enum aclk_database_opcode opcode; void *param[2]; struct completion *completion; + struct aclk_database_cmd *prev, *next; }; -#define ACLK_DATABASE_CMD_Q_MAX_SIZE (1024) - -struct aclk_database_cmdqueue { - unsigned head, tail; - struct aclk_database_cmd cmd_array[ACLK_DATABASE_CMD_Q_MAX_SIZE]; -}; - -struct aclk_sync_host_config { +typedef struct aclk_sync_cfg_t { RRDHOST *host; int alert_updates; int alert_checkpoint_req; @@ -92,16 +85,12 @@ struct aclk_sync_host_config { char *alerts_snapshot_uuid; // will contain the snapshot_uuid value if snapshot was requested uint64_t alerts_log_first_sequence_id; uint64_t alerts_log_last_sequence_id; -}; - -extern sqlite3 *db_meta; +} aclk_sync_cfg_t; -int aclk_database_enq_cmd_noblock(struct aclk_database_cmd *cmd); void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id); void sql_aclk_sync_init(void); void aclk_push_alert_config(const char *node_id, const char *config_hash); void aclk_push_node_alert_snapshot(const char *node_id); -void aclk_push_node_health_log(const char *node_id); void aclk_push_node_removed_alerts(const char *node_id); void schedule_node_info_update(RRDHOST *host); diff --git a/database/sqlite/sqlite_aclk_alert.c b/database/sqlite/sqlite_aclk_alert.c index 89c75f7a89..62ffab78a5 100644 --- a/database/sqlite/sqlite_aclk_alert.c +++ b/database/sqlite/sqlite_aclk_alert.c @@ -13,16 +13,42 @@ sqlite3_column_bytes((res), (_param)) ? strdupz((char *)sqlite3_column_text((res), (_param))) : NULL; \ }) - #define SQL_UPDATE_FILTERED_ALERT \ - "UPDATE aclk_alert_%s SET filtered_alert_unique_id = %u, date_created = unixepoch() where filtered_alert_unique_id = %u" + "UPDATE aclk_alert_%s SET filtered_alert_unique_id = @new_alert, date_created = UNIXEPOCH() " \ + "WHERE filtered_alert_unique_id = @old_alert" -static void update_filtered(ALARM_ENTRY *ae, uint32_t unique_id, char *uuid_str) +static void update_filtered(ALARM_ENTRY *ae, int64_t unique_id, char *uuid_str) { + sqlite3_stmt *res = NULL; + char sql[ACLK_SYNC_QUERY_SIZE]; - snprintfz(sql, ACLK_SYNC_QUERY_SIZE-1, SQL_UPDATE_FILTERED_ALERT, uuid_str, ae->unique_id, unique_id); - sqlite3_exec_monitored(db_meta, sql, 0, 0, NULL); - ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED; + snprintfz(sql, ACLK_SYNC_QUERY_SIZE-1, SQL_UPDATE_FILTERED_ALERT, uuid_str); + int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); + if (rc != SQLITE_OK) { + error_report("Failed to prepare statement when trying to check for alert variables."); + return; + } + + rc = sqlite3_bind_int64(res, 1, ae->unique_id); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to bind ae unique_id for update_filtered"); + goto done; + } + + rc = sqlite3_bind_int64(res, 2, unique_id); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to bind unique_id for update_filtered"); + goto done; + } + + rc = sqlite3_step_monitored(res); + if (likely(rc == SQLITE_DONE)) + ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED; + +done: + rc = sqlite3_finalize(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to finalize statement when trying to update_filtered, rc = %d", rc); } #define SQL_SELECT_VARIABLE_ALERT_BY_UNIQUE_ID \ @@ -30,35 +56,35 @@ static void update_filtered(ALARM_ENTRY *ae, uint32_t unique_id, char *uuid_str) "WHERE hld.unique_id = @unique_id AND hl.config_hash_id = ah.hash_id AND hld.health_log_id = hl.health_log_id " \ "AND hl.host_id = @host_id AND ah.warn IS NULL AND ah.crit IS NULL" -static inline bool is_event_from_alert_variable_config(uint32_t unique_id, uuid_t *host_id) +static inline bool is_event_from_alert_variable_config(int64_t unique_id, uuid_t *host_id) { sqlite3_stmt *res = NULL; - int rc = 0; - bool ret = false; - rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_VARIABLE_ALERT_BY_UNIQUE_ID, -1, &res, 0); + int rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_VARIABLE_ALERT_BY_UNIQUE_ID, -1, &res, 0); if (rc != SQLITE_OK) { error_report("Failed to prepare statement when trying to check for alert variables."); return false; } - rc = sqlite3_bind_int(res, 1, (int) unique_id); + bool ret = false; + + rc = sqlite3_bind_int64(res, 1, unique_id); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind unique_id for checking alert variable."); - goto fail; + goto done; } rc = sqlite3_bind_blob(res, 2, host_id, sizeof(*host_id), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind host_id for checking alert variable."); - goto fail; + goto done; } rc = sqlite3_step_monitored(res); if (likely(rc == SQLITE_ROW)) ret = true; -fail: +done: rc = sqlite3_finalize(res); if (unlikely(rc != SQLITE_OK)) error_report("Failed to finalize statement when trying to check for alert variables, rc = %d", rc); @@ -78,25 +104,18 @@ fail: static bool should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) { sqlite3_stmt *res = NULL; - char uuid_str[UUID_STR_LEN]; - uuid_unparse_lower_fix(&host->host_uuid, uuid_str); - - bool send = false; if (ae->new_status == RRDCALC_STATUS_REMOVED || ae->new_status == RRDCALC_STATUS_UNINITIALIZED) return 0; - if (unlikely(uuid_is_null(ae->config_hash_id))) + if (unlikely(uuid_is_null(ae->config_hash_id) || !host->aclk_config)) return 0; char sql[ACLK_SYNC_QUERY_SIZE]; - uuid_t config_hash_id; - RRDCALC_STATUS status; - uint32_t unique_id; //get the previous sent event of this alarm_id //base the search on the last filtered event - snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_SELECT_ALERT_BY_ID, uuid_str); + snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_SELECT_ALERT_BY_ID, host->aclk_config->uuid_str); int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); if (rc != SQLITE_OK) { @@ -104,6 +123,8 @@ static bool should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) return true; } + bool send = false; + rc = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind host_id for checking should_send_to_cloud"); @@ -119,17 +140,18 @@ static bool should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) rc = sqlite3_step_monitored(res); if (likely(rc == SQLITE_ROW)) { - status = (RRDCALC_STATUS)sqlite3_column_int(res, 0); + uuid_t config_hash_id; + RRDCALC_STATUS status = (RRDCALC_STATUS)sqlite3_column_int(res, 0); if (sqlite3_column_type(res, 1) != SQLITE_NULL) uuid_copy(config_hash_id, *((uuid_t *)sqlite3_column_blob(res, 1))); - unique_id = (uint32_t)sqlite3_column_int64(res, 2); + int64_t unique_id = sqlite3_column_int64(res, 2); if (ae->new_status != (RRDCALC_STATUS)status || uuid_memcmp(&ae->config_hash_id, &config_hash_id)) send = true; else - update_filtered(ae, unique_id, uuid_str); + update_filtered(ae, unique_id, host->aclk_config->uuid_str); } else send = true; @@ -149,7 +171,6 @@ void sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, bool skip_filter) { sqlite3_stmt *res_alert = NULL; char sql[ACLK_SYNC_QUERY_SIZE]; - char uuid_str[UUID_STR_LEN]; if (!service_running(SERVICE_ACLK)) return; @@ -163,8 +184,7 @@ void sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, bool skip_filter) if (is_event_from_alert_variable_config(ae->unique_id, &host->host_uuid)) return; - uuid_unparse_lower_fix(&host->host_uuid, uuid_str); - snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_QUEUE_ALERT_TO_CLOUD, uuid_str); + snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_QUEUE_ALERT_TO_CLOUD, host->aclk_config->uuid_str); int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res_alert, 0); if (unlikely(rc != SQLITE_OK)) { @@ -172,18 +192,18 @@ void sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, bool skip_filter) return; } - rc = sqlite3_bind_int(res_alert, 1, (int) ae->unique_id); + rc = sqlite3_bind_int64(res_alert, 1, ae->unique_id); if (unlikely(rc != SQLITE_OK)) - goto bind_fail; + goto done; rc = execute_insert(res_alert); if (unlikely(rc == SQLITE_DONE)) { ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED; rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); } else - error_report("Failed to store alert event %u, rc = %d", ae->unique_id, rc); + error_report("Failed to store alert event %"PRId64", rc = %d", ae->unique_id, rc); -bind_fail: +done: if (unlikely(sqlite3_finalize(res_alert) != SQLITE_OK)) error_report("Failed to reset statement in store alert event, rc = %d", rc); } @@ -239,7 +259,7 @@ static inline char *sqlite3_text_strdupz_empty(sqlite3_stmt *res, int iCol) { } -void aclk_push_alert_event(struct aclk_sync_host_config *wc) +void aclk_push_alert_event(struct aclk_sync_cfg_t *wc) { #ifndef ENABLE_ACLK UNUSED(wc); @@ -388,9 +408,13 @@ void aclk_push_alert_event(struct aclk_sync_host_config *wc) if (first_sequence_id) { buffer_flush(sql); - buffer_sprintf(sql, "UPDATE aclk_alert_%s SET date_submitted=unixepoch() " - "WHERE +date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64 ";", - wc->uuid_str, first_sequence_id, last_sequence_id); + buffer_sprintf( + sql, + "UPDATE aclk_alert_%s SET date_submitted=unixepoch() " + "WHERE +date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64 ";", + wc->uuid_str, + first_sequence_id, + last_sequence_id); if (unlikely(db_execute(db_meta, buffer_tostring(sql)))) error_report("Failed to mark ACLK alert entries as submitted for host %s", rrdhost_hostname(wc->host)); @@ -430,7 +454,7 @@ void aclk_push_alert_events_for_all_hosts(void) rrdhost_flag_clear(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); - struct aclk_sync_host_config *wc = host->aclk_sync_host_config; + struct aclk_sync_cfg_t *wc = host->aclk_config; if (likely(wc)) aclk_push_alert_event(wc); } @@ -439,59 +463,54 @@ void aclk_push_alert_events_for_all_hosts(void) void sql_queue_existing_alerts_to_aclk(RRDHOST *host) { - char uuid_str[UUID_STR_LEN]; - uuid_unparse_lower_fix(&host->host_uuid, uuid_str); - BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); sqlite3_stmt *res = NULL; int rc; + struct aclk_sync_cfg_t *wc = host->aclk_config; + + BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); + rw_spinlock_write_lock(&host->health_log.spinlock); - buffer_sprintf(sql, "delete from aclk_alert_%s; ", uuid_str); - if (unlikely(db_execute(db_meta, buffer_tostring(sql)))) { - rw_spinlock_write_unlock(&host->health_log.spinlock); - buffer_free(sql); - return; - } + buffer_sprintf(sql, "DELETE FROM aclk_alert_%s", wc->uuid_str); + if (unlikely(db_execute(db_meta, buffer_tostring(sql)))) + goto skip; buffer_flush(sql); + buffer_sprintf( sql, "insert into aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " "select hld.unique_id alert_unique_id, unixepoch(), hld.unique_id alert_unique_id from health_log_detail hld, health_log hl " "where hld.new_status <> 0 and hld.new_status <> -2 and hl.health_log_id = hld.health_log_id and hl.config_hash_id is not null " "and hld.updated_by_id = 0 and hl.host_id = @host_id order by hld.unique_id asc on conflict (alert_unique_id) do nothing;", - uuid_str); + wc->uuid_str); rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0); if (rc != SQLITE_OK) { error_report("Failed to prepare statement when trying to queue existing alerts."); - rw_spinlock_write_unlock(&host->health_log.spinlock); - buffer_free(sql); - return; + goto skip; } rc = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind host_id for when trying to queue existing alerts."); - sqlite3_finalize(res); - rw_spinlock_write_unlock(&host->health_log.spinlock); - buffer_free(sql); - return; + goto done; } rc = execute_insert(res); if (unlikely(rc != SQLITE_DONE)) error_report("Failed to queue existing alerts, rc = %d", rc); - + else + rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); +done: rc = sqlite3_finalize(res); if (unlikely(rc != SQLITE_OK)) error_report("Failed to finalize statement to queue existing alerts, rc = %d", rc); +skip: rw_spinlock_write_unlock(&host->health_log.spinlock); - buffer_free(sql); - rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); } void aclk_send_alarm_configuration(char *config_hash) @@ -499,16 +518,13 @@ void aclk_send_alarm_configuration(char *config_hash) if (unlikely(!config_hash)) return; - struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *) localhost->aclk_sync_host_config; + struct aclk_sync_cfg_t *wc = localhost->aclk_config; if (unlikely(!wc)) return; netdata_log_access( - "ACLK REQ [%s (%s)]: Request to send alert config %s.", - wc->node_id, - wc->host ? rrdhost_hostname(wc->host) : "N/A", - config_hash); + "ACLK REQ [%s (%s)]: Request to send alert config %s.", wc->node_id, rrdhost_hostname(wc->host), config_hash); aclk_push_alert_config(wc->node_id, config_hash); } @@ -521,25 +537,18 @@ void aclk_send_alarm_configuration(char *config_hash) int aclk_push_alert_config_event(char *node_id __maybe_unused, char *config_hash __maybe_unused) { - int rc = 0; + int rc; #ifdef ENABLE_ACLK CHECK_SQLITE_CONNECTION(db_meta); sqlite3_stmt *res = NULL; + struct aclk_sync_cfg_t *wc; - struct aclk_sync_host_config *wc = NULL; RRDHOST *host = find_host_by_node_id(node_id); - if (unlikely(!host)) { - freez(config_hash); - freez(node_id); - return 1; - } - - wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config; - if (unlikely(!wc)) { + if (unlikely(!host || !(wc = host->aclk_config))) { freez(config_hash); freez(node_id); return 1; @@ -653,45 +662,48 @@ void aclk_start_alert_streaming(char *node_id, bool resets) if (unlikely(!node_id || uuid_parse(node_id, node_uuid))) return; - RRDHOST *host = find_host_by_node_id(node_id); - - if (unlikely(!host)) - return; - - struct aclk_sync_host_config *wc = host->aclk_sync_host_config; + struct aclk_sync_cfg_t *wc; - if (unlikely(!wc)) + RRDHOST *host = find_host_by_node_id(node_id); + if (unlikely(!host || !(wc = host->aclk_config))) return; if (unlikely(!host->health.health_enabled)) { - netdata_log_access("ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id); + netdata_log_access( + "ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id); return; } if (resets) { - netdata_log_access("ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED (RESET REQUESTED)", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); + netdata_log_access( + "ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED (RESET REQUESTED)", + node_id, + wc->host ? rrdhost_hostname(wc->host) : "N/A"); sql_queue_existing_alerts_to_aclk(host); } else - netdata_log_access("ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); + netdata_log_access( + "ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A"); wc->alert_updates = 1; wc->alert_queue_removed = SEND_REMOVED_AFTER_HEALTH_LOOPS; } -#define SQL_QUEUE_REMOVE_ALERTS "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \ +#define SQL_QUEUE_REMOVE_ALERTS \ + "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \ "SELECT hld.unique_id alert_unique_id, UNIXEPOCH(), hld.unique_id alert_unique_id FROM health_log hl, health_log_detail hld " \ - "WHERE hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id AND hld.new_status = -2 AND hld.updated_by_id = 0 " \ - "AND hld.unique_id NOT IN (SELECT alert_unique_id FROM aclk_alert_%s) " \ - "AND hl.config_hash_id NOT IN (select hash_id from alert_hash where warn is null and crit is null) " \ - "AND hl.name || hl.chart NOT IN (select name || chart from health_log where name = hl.name and chart = hl.chart and alarm_id > hl.alarm_id and host_id = hl.host_id) " \ + "WHERE hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id AND hld.new_status = -2 AND hld.updated_by_id = 0 " \ + "AND hld.unique_id NOT IN (SELECT alert_unique_id FROM aclk_alert_%s) " \ + "AND hl.config_hash_id NOT IN (select hash_id from alert_hash where warn is null and crit is null) " \ + "AND hl.name || hl.chart NOT IN (select name || chart from health_log where name = hl.name and " \ + "chart = hl.chart and alarm_id > hl.alarm_id and host_id = hl.host_id) " \ "ORDER BY hld.unique_id ASC ON CONFLICT (alert_unique_id) DO NOTHING;" void sql_process_queue_removed_alerts_to_aclk(char *node_id) { - struct aclk_sync_host_config *wc; + struct aclk_sync_cfg_t *wc; RRDHOST *host = find_host_by_node_id(node_id); freez(node_id); - if (unlikely(!host || !(wc = host->aclk_sync_host_config))) + if (unlikely(!host || !(wc = host->aclk_config))) return; char sql[ACLK_SYNC_QUERY_SIZE * 2]; @@ -708,33 +720,25 @@ void sql_process_queue_removed_alerts_to_aclk(char *node_id) rc = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC); if (unlikely(rc != SQLITE_OK)) { error_report("Failed to bind host_id for when trying to queue remvoed alerts."); - sqlite3_finalize(res); - return; + goto skip; } rc = execute_insert(res); - if (unlikely(rc != SQLITE_DONE)) { - sqlite3_finalize(res); - error_report("Failed to queue removed alerts, rc = %d", rc); - return; + if (likely(rc == SQLITE_DONE)) { + netdata_log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, rrdhost_hostname(wc->host)); + rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); + wc->alert_queue_removed = 0; } +skip: rc = sqlite3_finalize(res); if (unlikely(rc != SQLITE_OK)) error_report("Failed to finalize statement to queue removed alerts, rc = %d", rc); - - netdata_log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, rrdhost_hostname(wc->host)); - - rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS); - wc->alert_queue_removed = 0; } void sql_queue_removed_alerts_to_aclk(RRDHOST *host) { - if (unlikely(!host->aclk_sync_host_config)) - return; - - if (!claimed() || !host->node_id) + if (unlikely(!host->aclk_config || !claimed() || !host->node_id)) return; char node_id[UUID_STR_LEN]; @@ -746,32 +750,28 @@ void sql_queue_removed_alerts_to_aclk(RRDHOST *host) void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id __maybe_unused, char *snapshot_uuid) { uuid_t node_uuid; + if (unlikely(!node_id || uuid_parse(node_id, node_uuid))) return; + struct aclk_sync_cfg_t *wc; + RRDHOST *host = find_host_by_node_id(node_id); - if (unlikely(!host)) { + if (unlikely(!host || !(wc = host->aclk_config))) { netdata_log_access("ACLK STA [%s (N/A)]: ACLK node id does not exist", node_id); return; } - struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config; - - if (unlikely(!wc)) { - netdata_log_access("ACLK STA [%s (N/A)]: ACLK node id does not exist", node_id); - return; - } - netdata_log_access( - "IN [%s (%s)]: Request to send alerts snapshot, snapshot_uuid %s", - node_id, - wc->h |