summaryrefslogtreecommitdiffstats
path: root/database
diff options
context:
space:
mode:
authorStelios Fragkakis <52996999+stelfrag@users.noreply.github.com>2023-11-21 12:00:51 +0200
committerGitHub <noreply@github.com>2023-11-21 12:00:51 +0200
commit85d43694355dfd31f1a57fcec68adf27327d5868 (patch)
treefad13b6b8f6ae1e473f2ad0129bb7992f8b0e9dc /database
parentc0feaec456e474313e827f489b0fcf9a7c5491e8 (diff)
Remove queue limit from ACLK sync event loop (#16411)
Code cleanup
Diffstat (limited to 'database')
-rw-r--r--database/rrd.h2
-rw-r--r--database/sqlite/sqlite_aclk.c134
-rw-r--r--database/sqlite/sqlite_aclk.h25
-rw-r--r--database/sqlite/sqlite_aclk_alert.c350
-rw-r--r--database/sqlite/sqlite_aclk_alert.h2
-rw-r--r--database/sqlite/sqlite_aclk_node.c6
-rw-r--r--database/sqlite/sqlite_functions.c2
7 files changed, 235 insertions, 286 deletions
diff --git a/database/rrd.h b/database/rrd.h
index 3687497ffd..341a29e501 100644
--- a/database/rrd.h
+++ b/database/rrd.h
@@ -1273,7 +1273,7 @@ struct rrdhost {
struct sender_state *sender;
netdata_thread_t rrdpush_sender_thread; // the sender thread
size_t rrdpush_sender_replicating_charts; // the number of charts currently being replicated to a parent
- void *aclk_sync_host_config;
+ struct aclk_sync_cfg_t *aclk_config;
uint32_t rrdpush_receiver_connection_counter; // the number of times this receiver has connected
uint32_t rrdpush_sender_connection_counter; // the number of times this sender has connected
diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c
index 3d3caeff21..3a702738aa 100644
--- a/database/sqlite/sqlite_aclk.c
+++ b/database/sqlite/sqlite_aclk.c
@@ -11,60 +11,46 @@ struct aclk_sync_config_s {
uv_timer_t timer_req;
time_t cleanup_after; // Start a cleanup after this timestamp
uv_async_t async;
- /* FIFO command queue */
- uv_mutex_t cmd_mutex;
- uv_cond_t cmd_cond;
bool initialized;
- volatile unsigned queue_size;
- struct aclk_database_cmdqueue cmd_queue;
+ SPINLOCK cmd_queue_lock;
+ struct aclk_database_cmd *cmd_base;
} aclk_sync_config = { 0 };
-
void sanity_check(void) {
// make sure the compiler will stop on misconfigurations
BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < ACLK_MAX_ENUMERATIONS_DEFINED);
}
-
-int aclk_database_enq_cmd_noblock(struct aclk_database_cmd *cmd)
+static struct aclk_database_cmd aclk_database_deq_cmd(void)
{
- unsigned queue_size;
+ struct aclk_database_cmd ret;
- /* wait for free space in queue */
- uv_mutex_lock(&aclk_sync_config.cmd_mutex);
- if ((queue_size = aclk_sync_config.queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) {
- uv_mutex_unlock(&aclk_sync_config.cmd_mutex);
- return 1;
+ spinlock_lock(&aclk_sync_config.cmd_queue_lock);
+ if(aclk_sync_config.cmd_base) {
+ struct aclk_database_cmd *t = aclk_sync_config.cmd_base;
+ DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(aclk_sync_config.cmd_base, t, prev, next);
+ ret = *t;
+ freez(t);
+ }
+ else {
+ ret.opcode = ACLK_DATABASE_NOOP;
+ ret.completion = NULL;
}
+ spinlock_unlock(&aclk_sync_config.cmd_queue_lock);
- fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE);
- /* enqueue command */
- aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.tail] = *cmd;
- aclk_sync_config.cmd_queue.tail = aclk_sync_config.cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ?
- aclk_sync_config.cmd_queue.tail + 1 : 0;
- aclk_sync_config.queue_size = queue_size + 1;
- uv_mutex_unlock(&aclk_sync_config.cmd_mutex);
- return 0;
+ return ret;
}
static void aclk_database_enq_cmd(struct aclk_database_cmd *cmd)
{
- unsigned queue_size;
+ struct aclk_database_cmd *t = mallocz(sizeof(*t));
+ *t = *cmd;
+ t->prev = t->next = NULL;
+
+ spinlock_lock(&aclk_sync_config.cmd_queue_lock);
+ DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(aclk_sync_config.cmd_base, t, prev, next);
+ spinlock_unlock(&aclk_sync_config.cmd_queue_lock);
- /* wait for free space in queue */
- uv_mutex_lock(&aclk_sync_config.cmd_mutex);
- while ((queue_size = aclk_sync_config.queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) {
- uv_cond_wait(&aclk_sync_config.cmd_cond, &aclk_sync_config.cmd_mutex);
- }
- fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE);
- /* enqueue command */
- aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.tail] = *cmd;
- aclk_sync_config.cmd_queue.tail = aclk_sync_config.cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ?
- aclk_sync_config.cmd_queue.tail + 1 : 0;
- aclk_sync_config.queue_size = queue_size + 1;
- uv_mutex_unlock(&aclk_sync_config.cmd_mutex);
-
- /* wake up event loop */
(void) uv_async_send(&aclk_sync_config.async);
}
@@ -145,35 +131,6 @@ static int create_host_callback(void *data, int argc, char **argv, char **column
}
#ifdef ENABLE_ACLK
-static struct aclk_database_cmd aclk_database_deq_cmd(void)
-{
- struct aclk_database_cmd ret;
- unsigned queue_size;
-
- uv_mutex_lock(&aclk_sync_config.cmd_mutex);
- queue_size = aclk_sync_config.queue_size;
- if (queue_size == 0) {
- memset(&ret, 0, sizeof(ret));
- ret.opcode = ACLK_DATABASE_NOOP;
- ret.completion = NULL;
-
- } else {
- /* dequeue command */
- ret = aclk_sync_config.cmd_queue.cmd_array[aclk_sync_config.cmd_queue.head];
- if (queue_size == 1) {
- aclk_sync_config.cmd_queue.head = aclk_sync_config.cmd_queue.tail = 0;
- } else {
- aclk_sync_config.cmd_queue.head = aclk_sync_config.cmd_queue.head != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ?
- aclk_sync_config.cmd_queue.head + 1 : 0;
- }
- aclk_sync_config.queue_size = queue_size - 1;
- /* wake up producers */
- uv_cond_signal(&aclk_sync_config.cmd_cond);
- }
- uv_mutex_unlock(&aclk_sync_config.cmd_mutex);
-
- return ret;
-}
#define SQL_SELECT_HOST_BY_UUID "SELECT host_id FROM host WHERE host_id = @host_id;"
static int is_host_available(uuid_t *host_id)
@@ -264,7 +221,7 @@ static int sql_check_aclk_table(void *data __maybe_unused, int argc __maybe_unus
memset(&cmd, 0, sizeof(cmd));
cmd.opcode = ACLK_DATABASE_DELETE_HOST;
cmd.param[0] = strdupz((char *) argv[0]);
- aclk_database_enq_cmd_noblock(&cmd);
+ aclk_database_enq_cmd(&cmd);
return 0;
}
@@ -292,7 +249,6 @@ static int sql_maint_aclk_sync_database(void *data __maybe_unused, int argc __ma
return 0;
}
-
#define SQL_SELECT_ACLK_ALERT_LIST "SELECT SUBSTR(name,12) FROM sqlite_schema WHERE name LIKE 'aclk_alert_%' AND type IN ('table');"
static void sql_maint_aclk_sync_database_all(void)
@@ -307,7 +263,7 @@ static void sql_maint_aclk_sync_database_all(void)
static int aclk_config_parameters(void *data __maybe_unused, int argc __maybe_unused, char **argv, char **column __maybe_unused)
{
- char uuid_str[GUID_LEN + 1];
+ char uuid_str[UUID_STR_LEN];
uuid_unparse_lower(*((uuid_t *) argv[0]), uuid_str);
RRDHOST *host = rrdhost_find_by_guid(uuid_str);
@@ -337,16 +293,15 @@ static void timer_cb(uv_timer_t *handle)
time_t now = now_realtime_sec();
- if (config->cleanup_after && config->cleanup_after < now) {
+ if (config->cleanup_after < now) {
cmd.opcode = ACLK_DATABASE_CLEANUP;
- if (!aclk_database_enq_cmd_noblock(&cmd))
- config->cleanup_after += ACLK_DATABASE_CLEANUP_INTERVAL;
+ aclk_database_enq_cmd(&cmd);
+ config->cleanup_after += ACLK_DATABASE_CLEANUP_INTERVAL;
}
if (aclk_connected) {
cmd.opcode = ACLK_DATABASE_PUSH_ALERT;
- aclk_database_enq_cmd_noblock(&cmd);
-
+ aclk_database_enq_cmd(&cmd);
aclk_check_node_info_and_collectors();
}
}
@@ -417,7 +372,7 @@ static void aclk_synchronization(void *arg __maybe_unused)
case ACLK_DATABASE_NODE_STATE:;
RRDHOST *host = cmd.param[0];
int live = (host == localhost || host->receiver || !(rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN))) ? 1 : 0;
- struct aclk_sync_host_config *ahc = host->aclk_sync_host_config;
+ struct aclk_sync_cfg_t *ahc = host->aclk_config;
if (unlikely(!ahc))
sql_create_aclk_table(host, &host->host_uuid, host->node_id);
aclk_host_state_update(host, live);
@@ -447,8 +402,6 @@ static void aclk_synchronization(void *arg __maybe_unused)
uv_close((uv_handle_t *)&config->timer_req, NULL);
uv_close((uv_handle_t *)&config->async, NULL);
-// uv_close((uv_handle_t *)&config->async_exit, NULL);
- uv_cond_destroy(&config->cmd_cond);
(void) uv_loop_close(loop);
worker_unregister();
@@ -458,11 +411,7 @@ static void aclk_synchronization(void *arg __maybe_unused)
static void aclk_synchronization_init(void)
{
- aclk_sync_config.cmd_queue.head = aclk_sync_config.cmd_queue.tail = 0;
- aclk_sync_config.queue_size = 0;
- fatal_assert(0 == uv_cond_init(&aclk_sync_config.cmd_cond));
- fatal_assert(0 == uv_mutex_init(&aclk_sync_config.cmd_mutex));
-
+ memset(&aclk_sync_config, 0, sizeof(aclk_sync_config));
fatal_assert(0 == uv_thread_create(&aclk_sync_config.thread, aclk_synchronization, &aclk_sync_config));
}
#endif
@@ -472,8 +421,8 @@ static void aclk_synchronization_init(void)
void sql_create_aclk_table(RRDHOST *host __maybe_unused, uuid_t *host_uuid __maybe_unused, uuid_t *node_id __maybe_unused)
{
#ifdef ENABLE_ACLK
- char uuid_str[GUID_LEN + 1];
- char host_guid[GUID_LEN + 1];
+ char uuid_str[UUID_STR_LEN];
+ char host_guid[UUID_STR_LEN];
int rc;
uuid_unparse_lower_fix(host_uuid, uuid_str);
@@ -496,17 +445,17 @@ void sql_create_aclk_table(RRDHOST *host __maybe_unused, uuid_t *host_uuid __may
if (unlikely(rc))
error_report("Failed to create ACLK alert table index 2 for host %s", host ? string2str(host->hostname) : host_guid);
}
- if (likely(host) && unlikely(host->aclk_sync_host_config))
+ if (likely(host) && unlikely(host->aclk_config))
return;
if (unlikely(!host))
return;
- struct aclk_sync_host_config *wc = callocz(1, sizeof(struct aclk_sync_host_config));
+ struct aclk_sync_cfg_t *wc = callocz(1, sizeof(struct aclk_sync_cfg_t));
if (node_id && !uuid_is_null(*node_id))
uuid_unparse_lower(*node_id, wc->node_id);
- host->aclk_sync_host_config = (void *)wc;
+ host->aclk_config = wc;
if (node_id && !host->node_id) {
host->node_id = mallocz(sizeof(*host->node_id));
uuid_copy(*host->node_id, *node_id);
@@ -520,12 +469,15 @@ void sql_create_aclk_table(RRDHOST *host __maybe_unused, uuid_t *host_uuid __may
#endif
}
-#define SQL_FETCH_ALL_HOSTS "SELECT host_id, hostname, registry_hostname, update_every, os, " \
- "timezone, tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, " \
+#define SQL_FETCH_ALL_HOSTS \
+ "SELECT host_id, hostname, registry_hostname, update_every, os, " \
+ "timezone, tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, " \
"program_version, entries, health_enabled, last_connected FROM host WHERE hops >0;"
-#define SQL_FETCH_ALL_INSTANCES "SELECT ni.host_id, ni.node_id FROM host h, node_instance ni " \
- "WHERE h.host_id = ni.host_id AND ni.node_id IS NOT NULL; "
+#define SQL_FETCH_ALL_INSTANCES \
+ "SELECT ni.host_id, ni.node_id FROM host h, node_instance ni " \
+ "WHERE h.host_id = ni.host_id AND ni.node_id IS NOT NULL; "
+
void sql_aclk_sync_init(void)
{
char *err_msg = NULL;
diff --git a/database/sqlite/sqlite_aclk.h b/database/sqlite/sqlite_aclk.h
index 850ca434e2..a8d0a274a9 100644
--- a/database/sqlite/sqlite_aclk.h
+++ b/database/sqlite/sqlite_aclk.h
@@ -5,7 +5,6 @@
#include "sqlite3.h"
-
#ifndef ACLK_MAX_CHART_BATCH
#define ACLK_MAX_CHART_BATCH (200)
#endif
@@ -41,11 +40,11 @@ static inline int claimed()
return localhost->aclk_state.claimed_id != NULL;
}
-#define TABLE_ACLK_ALERT "CREATE TABLE IF NOT EXISTS aclk_alert_%s (sequence_id INTEGER PRIMARY KEY, " \
- "alert_unique_id, date_created, date_submitted, date_cloud_ack, filtered_alert_unique_id NOT NULL, " \
- "unique(alert_unique_id));"
+#define TABLE_ACLK_ALERT \
+ "CREATE TABLE IF NOT EXISTS aclk_alert_%s (sequence_id INTEGER PRIMARY KEY, " \
+ "alert_unique_id, date_created, date_submitted, date_cloud_ack, filtered_alert_unique_id NOT NULL, " \
+ "UNIQUE(alert_unique_id));"
-#define INDEX_ACLK_ALERT "CREATE INDEX IF NOT EXISTS aclk_alert_index_%s ON aclk_alert_%s (alert_unique_id);"
#define INDEX_ACLK_ALERT1 "CREATE INDEX IF NOT EXISTS aclk_alert_index1_%s ON aclk_alert_%s (filtered_alert_unique_id);"
#define INDEX_ACLK_ALERT2 "CREATE INDEX IF NOT EXISTS aclk_alert_index2_%s ON aclk_alert_%s (date_submitted);"
@@ -71,16 +70,10 @@ struct aclk_database_cmd {
enum aclk_database_opcode opcode;
void *param[2];
struct completion *completion;
+ struct aclk_database_cmd *prev, *next;
};
-#define ACLK_DATABASE_CMD_Q_MAX_SIZE (1024)
-
-struct aclk_database_cmdqueue {
- unsigned head, tail;
- struct aclk_database_cmd cmd_array[ACLK_DATABASE_CMD_Q_MAX_SIZE];
-};
-
-struct aclk_sync_host_config {
+typedef struct aclk_sync_cfg_t {
RRDHOST *host;
int alert_updates;
int alert_checkpoint_req;
@@ -92,16 +85,12 @@ struct aclk_sync_host_config {
char *alerts_snapshot_uuid; // will contain the snapshot_uuid value if snapshot was requested
uint64_t alerts_log_first_sequence_id;
uint64_t alerts_log_last_sequence_id;
-};
-
-extern sqlite3 *db_meta;
+} aclk_sync_cfg_t;
-int aclk_database_enq_cmd_noblock(struct aclk_database_cmd *cmd);
void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id);
void sql_aclk_sync_init(void);
void aclk_push_alert_config(const char *node_id, const char *config_hash);
void aclk_push_node_alert_snapshot(const char *node_id);
-void aclk_push_node_health_log(const char *node_id);
void aclk_push_node_removed_alerts(const char *node_id);
void schedule_node_info_update(RRDHOST *host);
diff --git a/database/sqlite/sqlite_aclk_alert.c b/database/sqlite/sqlite_aclk_alert.c
index 89c75f7a89..62ffab78a5 100644
--- a/database/sqlite/sqlite_aclk_alert.c
+++ b/database/sqlite/sqlite_aclk_alert.c
@@ -13,16 +13,42 @@
sqlite3_column_bytes((res), (_param)) ? strdupz((char *)sqlite3_column_text((res), (_param))) : NULL; \
})
-
#define SQL_UPDATE_FILTERED_ALERT \
- "UPDATE aclk_alert_%s SET filtered_alert_unique_id = %u, date_created = unixepoch() where filtered_alert_unique_id = %u"
+ "UPDATE aclk_alert_%s SET filtered_alert_unique_id = @new_alert, date_created = UNIXEPOCH() " \
+ "WHERE filtered_alert_unique_id = @old_alert"
-static void update_filtered(ALARM_ENTRY *ae, uint32_t unique_id, char *uuid_str)
+static void update_filtered(ALARM_ENTRY *ae, int64_t unique_id, char *uuid_str)
{
+ sqlite3_stmt *res = NULL;
+
char sql[ACLK_SYNC_QUERY_SIZE];
- snprintfz(sql, ACLK_SYNC_QUERY_SIZE-1, SQL_UPDATE_FILTERED_ALERT, uuid_str, ae->unique_id, unique_id);
- sqlite3_exec_monitored(db_meta, sql, 0, 0, NULL);
- ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED;
+ snprintfz(sql, ACLK_SYNC_QUERY_SIZE-1, SQL_UPDATE_FILTERED_ALERT, uuid_str);
+ int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
+ if (rc != SQLITE_OK) {
+ error_report("Failed to prepare statement when trying to check for alert variables.");
+ return;
+ }
+
+ rc = sqlite3_bind_int64(res, 1, ae->unique_id);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to bind ae unique_id for update_filtered");
+ goto done;
+ }
+
+ rc = sqlite3_bind_int64(res, 2, unique_id);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to bind unique_id for update_filtered");
+ goto done;
+ }
+
+ rc = sqlite3_step_monitored(res);
+ if (likely(rc == SQLITE_DONE))
+ ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED;
+
+done:
+ rc = sqlite3_finalize(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to finalize statement when trying to update_filtered, rc = %d", rc);
}
#define SQL_SELECT_VARIABLE_ALERT_BY_UNIQUE_ID \
@@ -30,35 +56,35 @@ static void update_filtered(ALARM_ENTRY *ae, uint32_t unique_id, char *uuid_str)
"WHERE hld.unique_id = @unique_id AND hl.config_hash_id = ah.hash_id AND hld.health_log_id = hl.health_log_id " \
"AND hl.host_id = @host_id AND ah.warn IS NULL AND ah.crit IS NULL"
-static inline bool is_event_from_alert_variable_config(uint32_t unique_id, uuid_t *host_id)
+static inline bool is_event_from_alert_variable_config(int64_t unique_id, uuid_t *host_id)
{
sqlite3_stmt *res = NULL;
- int rc = 0;
- bool ret = false;
- rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_VARIABLE_ALERT_BY_UNIQUE_ID, -1, &res, 0);
+ int rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_VARIABLE_ALERT_BY_UNIQUE_ID, -1, &res, 0);
if (rc != SQLITE_OK) {
error_report("Failed to prepare statement when trying to check for alert variables.");
return false;
}
- rc = sqlite3_bind_int(res, 1, (int) unique_id);
+ bool ret = false;
+
+ rc = sqlite3_bind_int64(res, 1, unique_id);
if (unlikely(rc != SQLITE_OK)) {
error_report("Failed to bind unique_id for checking alert variable.");
- goto fail;
+ goto done;
}
rc = sqlite3_bind_blob(res, 2, host_id, sizeof(*host_id), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK)) {
error_report("Failed to bind host_id for checking alert variable.");
- goto fail;
+ goto done;
}
rc = sqlite3_step_monitored(res);
if (likely(rc == SQLITE_ROW))
ret = true;
-fail:
+done:
rc = sqlite3_finalize(res);
if (unlikely(rc != SQLITE_OK))
error_report("Failed to finalize statement when trying to check for alert variables, rc = %d", rc);
@@ -78,25 +104,18 @@ fail:
static bool should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae)
{
sqlite3_stmt *res = NULL;
- char uuid_str[UUID_STR_LEN];
- uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
-
- bool send = false;
if (ae->new_status == RRDCALC_STATUS_REMOVED || ae->new_status == RRDCALC_STATUS_UNINITIALIZED)
return 0;
- if (unlikely(uuid_is_null(ae->config_hash_id)))
+ if (unlikely(uuid_is_null(ae->config_hash_id) || !host->aclk_config))
return 0;
char sql[ACLK_SYNC_QUERY_SIZE];
- uuid_t config_hash_id;
- RRDCALC_STATUS status;
- uint32_t unique_id;
//get the previous sent event of this alarm_id
//base the search on the last filtered event
- snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_SELECT_ALERT_BY_ID, uuid_str);
+ snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_SELECT_ALERT_BY_ID, host->aclk_config->uuid_str);
int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0);
if (rc != SQLITE_OK) {
@@ -104,6 +123,8 @@ static bool should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae)
return true;
}
+ bool send = false;
+
rc = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK)) {
error_report("Failed to bind host_id for checking should_send_to_cloud");
@@ -119,17 +140,18 @@ static bool should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae)
rc = sqlite3_step_monitored(res);
if (likely(rc == SQLITE_ROW)) {
- status = (RRDCALC_STATUS)sqlite3_column_int(res, 0);
+ uuid_t config_hash_id;
+ RRDCALC_STATUS status = (RRDCALC_STATUS)sqlite3_column_int(res, 0);
if (sqlite3_column_type(res, 1) != SQLITE_NULL)
uuid_copy(config_hash_id, *((uuid_t *)sqlite3_column_blob(res, 1)));
- unique_id = (uint32_t)sqlite3_column_int64(res, 2);
+ int64_t unique_id = sqlite3_column_int64(res, 2);
if (ae->new_status != (RRDCALC_STATUS)status || uuid_memcmp(&ae->config_hash_id, &config_hash_id))
send = true;
else
- update_filtered(ae, unique_id, uuid_str);
+ update_filtered(ae, unique_id, host->aclk_config->uuid_str);
} else
send = true;
@@ -149,7 +171,6 @@ void sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, bool skip_filter)
{
sqlite3_stmt *res_alert = NULL;
char sql[ACLK_SYNC_QUERY_SIZE];
- char uuid_str[UUID_STR_LEN];
if (!service_running(SERVICE_ACLK))
return;
@@ -163,8 +184,7 @@ void sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, bool skip_filter)
if (is_event_from_alert_variable_config(ae->unique_id, &host->host_uuid))
return;
- uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
- snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_QUEUE_ALERT_TO_CLOUD, uuid_str);
+ snprintfz(sql, ACLK_SYNC_QUERY_SIZE - 1, SQL_QUEUE_ALERT_TO_CLOUD, host->aclk_config->uuid_str);
int rc = sqlite3_prepare_v2(db_meta, sql, -1, &res_alert, 0);
if (unlikely(rc != SQLITE_OK)) {
@@ -172,18 +192,18 @@ void sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, bool skip_filter)
return;
}
- rc = sqlite3_bind_int(res_alert, 1, (int) ae->unique_id);
+ rc = sqlite3_bind_int64(res_alert, 1, ae->unique_id);
if (unlikely(rc != SQLITE_OK))
- goto bind_fail;
+ goto done;
rc = execute_insert(res_alert);
if (unlikely(rc == SQLITE_DONE)) {
ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED;
rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
} else
- error_report("Failed to store alert event %u, rc = %d", ae->unique_id, rc);
+ error_report("Failed to store alert event %"PRId64", rc = %d", ae->unique_id, rc);
-bind_fail:
+done:
if (unlikely(sqlite3_finalize(res_alert) != SQLITE_OK))
error_report("Failed to reset statement in store alert event, rc = %d", rc);
}
@@ -239,7 +259,7 @@ static inline char *sqlite3_text_strdupz_empty(sqlite3_stmt *res, int iCol) {
}
-void aclk_push_alert_event(struct aclk_sync_host_config *wc)
+void aclk_push_alert_event(struct aclk_sync_cfg_t *wc)
{
#ifndef ENABLE_ACLK
UNUSED(wc);
@@ -388,9 +408,13 @@ void aclk_push_alert_event(struct aclk_sync_host_config *wc)
if (first_sequence_id) {
buffer_flush(sql);
- buffer_sprintf(sql, "UPDATE aclk_alert_%s SET date_submitted=unixepoch() "
- "WHERE +date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64 ";",
- wc->uuid_str, first_sequence_id, last_sequence_id);
+ buffer_sprintf(
+ sql,
+ "UPDATE aclk_alert_%s SET date_submitted=unixepoch() "
+ "WHERE +date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64 ";",
+ wc->uuid_str,
+ first_sequence_id,
+ last_sequence_id);
if (unlikely(db_execute(db_meta, buffer_tostring(sql))))
error_report("Failed to mark ACLK alert entries as submitted for host %s", rrdhost_hostname(wc->host));
@@ -430,7 +454,7 @@ void aclk_push_alert_events_for_all_hosts(void)
rrdhost_flag_clear(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
- struct aclk_sync_host_config *wc = host->aclk_sync_host_config;
+ struct aclk_sync_cfg_t *wc = host->aclk_config;
if (likely(wc))
aclk_push_alert_event(wc);
}
@@ -439,59 +463,54 @@ void aclk_push_alert_events_for_all_hosts(void)
void sql_queue_existing_alerts_to_aclk(RRDHOST *host)
{
- char uuid_str[UUID_STR_LEN];
- uuid_unparse_lower_fix(&host->host_uuid, uuid_str);
- BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
sqlite3_stmt *res = NULL;
int rc;
+ struct aclk_sync_cfg_t *wc = host->aclk_config;
+
+ BUFFER *sql = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
+
rw_spinlock_write_lock(&host->health_log.spinlock);
- buffer_sprintf(sql, "delete from aclk_alert_%s; ", uuid_str);
- if (unlikely(db_execute(db_meta, buffer_tostring(sql)))) {
- rw_spinlock_write_unlock(&host->health_log.spinlock);
- buffer_free(sql);
- return;
- }
+ buffer_sprintf(sql, "DELETE FROM aclk_alert_%s", wc->uuid_str);
+ if (unlikely(db_execute(db_meta, buffer_tostring(sql))))
+ goto skip;
buffer_flush(sql);
+
buffer_sprintf(
sql,
"insert into aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) "
"select hld.unique_id alert_unique_id, unixepoch(), hld.unique_id alert_unique_id from health_log_detail hld, health_log hl "
"where hld.new_status <> 0 and hld.new_status <> -2 and hl.health_log_id = hld.health_log_id and hl.config_hash_id is not null "
"and hld.updated_by_id = 0 and hl.host_id = @host_id order by hld.unique_id asc on conflict (alert_unique_id) do nothing;",
- uuid_str);
+ wc->uuid_str);
rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
if (rc != SQLITE_OK) {
error_report("Failed to prepare statement when trying to queue existing alerts.");
- rw_spinlock_write_unlock(&host->health_log.spinlock);
- buffer_free(sql);
- return;
+ goto skip;
}
rc = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK)) {
error_report("Failed to bind host_id for when trying to queue existing alerts.");
- sqlite3_finalize(res);
- rw_spinlock_write_unlock(&host->health_log.spinlock);
- buffer_free(sql);
- return;
+ goto done;
}
rc = execute_insert(res);
if (unlikely(rc != SQLITE_DONE))
error_report("Failed to queue existing alerts, rc = %d", rc);
-
+ else
+ rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
+done:
rc = sqlite3_finalize(res);
if (unlikely(rc != SQLITE_OK))
error_report("Failed to finalize statement to queue existing alerts, rc = %d", rc);
+skip:
rw_spinlock_write_unlock(&host->health_log.spinlock);
-
buffer_free(sql);
- rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
}
void aclk_send_alarm_configuration(char *config_hash)
@@ -499,16 +518,13 @@ void aclk_send_alarm_configuration(char *config_hash)
if (unlikely(!config_hash))
return;
- struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *) localhost->aclk_sync_host_config;
+ struct aclk_sync_cfg_t *wc = localhost->aclk_config;
if (unlikely(!wc))
return;
netdata_log_access(
- "ACLK REQ [%s (%s)]: Request to send alert config %s.",
- wc->node_id,
- wc->host ? rrdhost_hostname(wc->host) : "N/A",
- config_hash);
+ "ACLK REQ [%s (%s)]: Request to send alert config %s.", wc->node_id, rrdhost_hostname(wc->host), config_hash);
aclk_push_alert_config(wc->node_id, config_hash);
}
@@ -521,25 +537,18 @@ void aclk_send_alarm_configuration(char *config_hash)
int aclk_push_alert_config_event(char *node_id __maybe_unused, char *config_hash __maybe_unused)
{
- int rc = 0;
+ int rc;
#ifdef ENABLE_ACLK
CHECK_SQLITE_CONNECTION(db_meta);
sqlite3_stmt *res = NULL;
+ struct aclk_sync_cfg_t *wc;
- struct aclk_sync_host_config *wc = NULL;
RRDHOST *host = find_host_by_node_id(node_id);
- if (unlikely(!host)) {
- freez(config_hash);
- freez(node_id);
- return 1;
- }
-
- wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config;
- if (unlikely(!wc)) {
+ if (unlikely(!host || !(wc = host->aclk_config))) {
freez(config_hash);
freez(node_id);
return 1;
@@ -653,45 +662,48 @@ void aclk_start_alert_streaming(char *node_id, bool resets)
if (unlikely(!node_id || uuid_parse(node_id, node_uuid)))
return;
- RRDHOST *host = find_host_by_node_id(node_id);
-
- if (unlikely(!host))
- return;
-
- struct aclk_sync_host_config *wc = host->aclk_sync_host_config;
+ struct aclk_sync_cfg_t *wc;
- if (unlikely(!wc))
+ RRDHOST *host = find_host_by_node_id(node_id);
+ if (unlikely(!host || !(wc = host->aclk_config)))
return;
if (unlikely(!host->health.health_enabled)) {
- netdata_log_access("ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id);
+ netdata_log_access(
+ "ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id);
return;
}
if (resets) {
- netdata_log_access("ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED (RESET REQUESTED)", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
+ netdata_log_access(
+ "ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED (RESET REQUESTED)",
+ node_id,
+ wc->host ? rrdhost_hostname(wc->host) : "N/A");
sql_queue_existing_alerts_to_aclk(host);
} else
- netdata_log_access("ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
+ netdata_log_access(
+ "ACLK REQ [%s (%s)]: STREAM ALERTS ENABLED", node_id, wc->host ? rrdhost_hostname(wc->host) : "N/A");
wc->alert_updates = 1;
wc->alert_queue_removed = SEND_REMOVED_AFTER_HEALTH_LOOPS;
}
-#define SQL_QUEUE_REMOVE_ALERTS "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \
+#define SQL_QUEUE_REMOVE_ALERTS \
+ "INSERT INTO aclk_alert_%s (alert_unique_id, date_created, filtered_alert_unique_id) " \
"SELECT hld.unique_id alert_unique_id, UNIXEPOCH(), hld.unique_id alert_unique_id FROM health_log hl, health_log_detail hld " \
- "WHERE hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id AND hld.new_status = -2 AND hld.updated_by_id = 0 " \
- "AND hld.unique_id NOT IN (SELECT alert_unique_id FROM aclk_alert_%s) " \
- "AND hl.config_hash_id NOT IN (select hash_id from alert_hash where warn is null and crit is null) " \
- "AND hl.name || hl.chart NOT IN (select name || chart from health_log where name = hl.name and chart = hl.chart and alarm_id > hl.alarm_id and host_id = hl.host_id) " \
+ "WHERE hl.host_id = @host_id AND hl.health_log_id = hld.health_log_id AND hld.new_status = -2 AND hld.updated_by_id = 0 " \
+ "AND hld.unique_id NOT IN (SELECT alert_unique_id FROM aclk_alert_%s) " \
+ "AND hl.config_hash_id NOT IN (select hash_id from alert_hash where warn is null and crit is null) " \
+ "AND hl.name || hl.chart NOT IN (select name || chart from health_log where name = hl.name and " \
+ "chart = hl.chart and alarm_id > hl.alarm_id and host_id = hl.host_id) " \
"ORDER BY hld.unique_id ASC ON CONFLICT (alert_unique_id) DO NOTHING;"
void sql_process_queue_removed_alerts_to_aclk(char *node_id)
{
- struct aclk_sync_host_config *wc;
+ struct aclk_sync_cfg_t *wc;
RRDHOST *host = find_host_by_node_id(node_id);
freez(node_id);
- if (unlikely(!host || !(wc = host->aclk_sync_host_config)))
+ if (unlikely(!host || !(wc = host->aclk_config)))
return;
char sql[ACLK_SYNC_QUERY_SIZE * 2];
@@ -708,33 +720,25 @@ void sql_process_queue_removed_alerts_to_aclk(char *node_id)
rc = sqlite3_bind_blob(res, 1, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK)) {
error_report("Failed to bind host_id for when trying to queue remvoed alerts.");
- sqlite3_finalize(res);
- return;
+ goto skip;
}
rc = execute_insert(res);
- if (unlikely(rc != SQLITE_DONE)) {
- sqlite3_finalize(res);
- error_report("Failed to queue removed alerts, rc = %d", rc);
- return;
+ if (likely(rc == SQLITE_DONE)) {
+ netdata_log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, rrdhost_hostname(wc->host));
+ rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
+ wc->alert_queue_removed = 0;
}
+skip:
rc = sqlite3_finalize(res);
if (unlikely(rc != SQLITE_OK))
error_report("Failed to finalize statement to queue removed alerts, rc = %d", rc);
-
- netdata_log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, rrdhost_hostname(wc->host));
-
- rrdhost_flag_set(wc->host, RRDHOST_FLAG_ACLK_STREAM_ALERTS);
- wc->alert_queue_removed = 0;
}
void sql_queue_removed_alerts_to_aclk(RRDHOST *host)
{
- if (unlikely(!host->aclk_sync_host_config))
- return;
-
- if (!claimed() || !host->node_id)
+ if (unlikely(!host->aclk_config || !claimed() || !host->node_id))
return;
char node_id[UUID_STR_LEN];
@@ -746,32 +750,28 @@ void sql_queue_removed_alerts_to_aclk(RRDHOST *host)
void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id __maybe_unused, char *snapshot_uuid)
{
uuid_t node_uuid;
+
if (unlikely(!node_id || uuid_parse(node_id, node_uuid)))
return;
+ struct aclk_sync_cfg_t *wc;
+
RRDHOST *host = find_host_by_node_id(node_id);
- if (unlikely(!host)) {
+ if (unlikely(!host || !(wc = host->aclk_config))) {
netdata_log_access("ACLK STA [%s (N/A)]: ACLK node id does not exist", node_id);
return;
}
- struct aclk_sync_host_config *wc = (struct aclk_sync_host_config *)host->aclk_sync_host_config;
-
- if (unlikely(!wc)) {
- netdata_log_access("ACLK STA [%s (N/A)]: ACLK node id does not exist", node_id);
- return;
- }
-
netdata_log_access(
- "IN [%s (%s)]: Request to send alerts snapshot, snapshot_uuid %s",
- node_id,
- wc->h