summaryrefslogtreecommitdiffstats
path: root/database
diff options
context:
space:
mode:
authorEmmanuel Vasilakis <mrzammler@mm.st>2021-09-23 17:34:34 +0300
committerGitHub <noreply@github.com>2021-09-23 17:34:34 +0300
commit4ae31993115f3d796e31e931441c4bfd371dcbfa (patch)
treee73846f2f6b05040d3b037cb95c1125d89e6f562 /database
parent3d94b13bb42e8215f75643657b2c73d286451fb8 (diff)
Add alert message support for ACLK new architecture (#11552)
* add alert messages * also clear date_cloud_ack * move buffer_create * remove include file * use wc->node_id
Diffstat (limited to 'database')
-rw-r--r--database/sqlite/sqlite_aclk.h2
-rw-r--r--database/sqlite/sqlite_aclk_alert.c538
-rw-r--r--database/sqlite/sqlite_aclk_alert.h17
-rw-r--r--database/sqlite/sqlite_aclk_chart.c9
-rw-r--r--database/sqlite/sqlite_functions.h10
5 files changed, 566 insertions, 10 deletions
diff --git a/database/sqlite/sqlite_aclk.h b/database/sqlite/sqlite_aclk.h
index 13605d6a71..02a4322ed0 100644
--- a/database/sqlite/sqlite_aclk.h
+++ b/database/sqlite/sqlite_aclk.h
@@ -102,7 +102,7 @@ static inline char *get_str_from_uuid(uuid_t *uuid)
"end;"
#define TABLE_ACLK_ALERT "CREATE TABLE IF NOT EXISTS aclk_alert_%s (sequence_id INTEGER PRIMARY KEY, " \
- "alert_unique_id, date_created, date_submitted, " \
+ "alert_unique_id, date_created, date_submitted, date_cloud_ack, " \
"unique(alert_unique_id)); " \
"insert into aclk_alert_%s (alert_unique_id, date_created) " \
"select unique_id alert_unique_id, strftime('%%s') date_created from health_log_%s where new_status <> 0 and new_status <> -2 order by unique_id asc on conflict (alert_unique_id) do nothing;"
diff --git a/database/sqlite/sqlite_aclk_alert.c b/database/sqlite/sqlite_aclk_alert.c
new file mode 100644
index 0000000000..a84db5986d
--- /dev/null
+++ b/database/sqlite/sqlite_aclk_alert.c
@@ -0,0 +1,538 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "sqlite_functions.h"
+#include "sqlite_aclk_alert.h"
+
+#include "../../aclk/aclk_alarm_api.h"
+
+// will replace call to aclk_update_alarm in health/health_log.c
+// and handle both cases
+void sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae)
+{
+ //check aclk architecture and handle old json alarm update to cloud
+ //include also the valid statuses for this case
+ /* if (!aclk_architecture)
+ aclk_update_alarm(host, ae); */
+
+ if (ae->flags & HEALTH_ENTRY_FLAG_ACLK_QUEUED)
+ return;
+
+ if (ae->new_status == RRDCALC_STATUS_REMOVED || ae->new_status == RRDCALC_STATUS_UNINITIALIZED)
+ return;
+
+ if (unlikely(!host->dbsync_worker))
+ return;
+
+ if (unlikely(uuid_is_null(ae->config_hash_id)))
+ return;
+
+ struct aclk_database_cmd cmd;
+ memset(&cmd, 0, sizeof(cmd));
+ cmd.opcode = ACLK_DATABASE_ADD_ALERT;
+ cmd.data = ae;
+ cmd.completion = NULL;
+ aclk_database_enq_cmd((struct aclk_database_worker_config *) host->dbsync_worker, &cmd);
+ ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED;
+ return;
+}
+
+// stores an alert entry to aclk_alert_ table
+int aclk_add_alert_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
+{
+ int rc = 0;
+
+ CHECK_SQLITE_CONNECTION(db_meta);
+
+ sqlite3_stmt *res_alert = NULL;
+ ALARM_ENTRY *ae = cmd.data;
+
+ BUFFER *sql = buffer_create(1024);
+
+ buffer_sprintf(
+ sql,
+ "INSERT INTO aclk_alert_%s (alert_unique_id, date_created) "
+ "VALUES (@alert_unique_id, strftime('%%s')); ",
+ wc->uuid_str);
+
+ rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res_alert, 0);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to prepare statement to store alert event");
+ buffer_free(sql);
+ return 1;
+ }
+
+ rc = sqlite3_bind_int(res_alert, 1, ae->unique_id);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ rc = execute_insert(res_alert);
+ if (unlikely(rc != SQLITE_DONE))
+ error_report("Failed to store alert event %u, rc = %d", ae->unique_id, rc);
+
+bind_fail:
+ if (unlikely(sqlite3_finalize(res_alert) != SQLITE_OK))
+ error_report("Failed to reset statement in store alert event, rc = %d", rc);
+
+ buffer_free(sql);
+ return (rc != SQLITE_DONE);
+}
+
+int rrdcalc_status_to_proto_enum(RRDCALC_STATUS status)
+{
+ switch(status) {
+ case RRDCALC_STATUS_REMOVED:
+ return ALARM_STATUS_REMOVED;
+
+ case RRDCALC_STATUS_UNDEFINED:
+ return ALARM_STATUS_NOT_A_NUMBER;
+
+ case RRDCALC_STATUS_CLEAR:
+ return ALARM_STATUS_CLEAR;
+
+ case RRDCALC_STATUS_WARNING:
+ return ALARM_STATUS_WARNING;
+
+ case RRDCALC_STATUS_CRITICAL:
+ return ALARM_STATUS_CRITICAL;
+
+ default:
+ return ALARM_STATUS_UNKNOWN;
+ }
+}
+
+void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
+{
+#ifndef ACLK_NG
+ UNUSED(wc);
+ UNUSED(cmd);
+#else
+ int rc;
+
+ if (unlikely(!wc->alert_updates)) {
+ debug(D_ACLK_SYNC,"Ignoring alert push event, updates have been turned off for node %s", wc->node_id);
+ return;
+ }
+
+ char *claim_id = is_agent_claimed();
+ if (unlikely(!claim_id))
+ return;
+
+ BUFFER *sql = buffer_create(1024);
+
+ if (wc->alerts_start_seq_id != 0) {
+ buffer_sprintf(
+ sql,
+ "UPDATE aclk_alert_%s SET date_submitted = NULL, date_cloud_ack = NULL WHERE sequence_id >= %" PRIu64
+ "; UPDATE aclk_alert_%s SET date_cloud_ack = strftime('%%s','now') WHERE sequence_id < %" PRIu64
+ " and date_cloud_ack is null",
+ wc->uuid_str,
+ wc->alerts_start_seq_id,
+ wc->uuid_str,
+ wc->alerts_start_seq_id);
+ db_execute(buffer_tostring(sql));
+ buffer_reset(sql);
+ wc->alerts_start_seq_id = 0;
+ }
+
+ int limit = cmd.count > 0 ? cmd.count : 1;
+
+ sqlite3_stmt *res = NULL;
+
+ buffer_sprintf(sql, "select aa.sequence_id, hl.unique_id, hl.alarm_id, hl.config_hash_id, hl.updated_by_id, hl.when_key, \
+ hl.duration, hl.non_clear_duration, hl.flags, hl.exec_run_timestamp, hl.delay_up_to_timestamp, hl.name, \
+ hl.chart, hl.family, hl.exec, hl.recipient, hl.source, hl.units, hl.info, hl.exec_code, hl.new_status, \
+ hl.old_status, hl.delay, hl.new_value, hl.old_value, hl.last_repeat \
+ from health_log_%s hl, aclk_alert_%s aa \
+ where hl.unique_id = aa.alert_unique_id and aa.date_submitted is null \
+ order by aa.sequence_id asc limit %d;", wc->uuid_str, wc->uuid_str, limit);
+
+ rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
+ if (rc != SQLITE_OK) {
+ error_report("Failed to prepare statement when trying to send an alert update via ACLK");
+ buffer_free(sql);
+ freez(claim_id);
+ return;
+ }
+
+ char uuid_str[GUID_LEN + 1];
+ uint64_t first_sequence_id = 0;
+ uint64_t last_sequence_id = 0;
+
+ while (sqlite3_step(res) == SQLITE_ROW) {
+ struct alarm_log_entry alarm_log;
+ char old_value_string[100 + 1];
+ char new_value_string[100 + 1];
+
+ alarm_log.node_id = strdupz(wc->node_id);
+ alarm_log.claim_id = claim_id;
+
+ alarm_log.chart = strdupz((char *)sqlite3_column_text(res, 12));
+ alarm_log.name = strdupz((char *)sqlite3_column_text(res, 11));
+ alarm_log.family = sqlite3_column_bytes(res, 13) > 0 ? strdupz((char *)sqlite3_column_text(res, 13)) : NULL;
+
+ alarm_log.batch_id = wc->alerts_batch_id;
+ alarm_log.sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
+ alarm_log.when = (time_t) sqlite3_column_int64(res, 5);
+
+ uuid_unparse_lower(*((uuid_t *) sqlite3_column_blob(res, 3)), uuid_str);
+ alarm_log.config_hash = strdupz((char *)uuid_str);
+
+ alarm_log.utc_offset = wc->host->utc_offset;
+ alarm_log.timezone = strdupz((char *)wc->host->abbrev_timezone);
+ alarm_log.exec_path = sqlite3_column_bytes(res, 14) > 0 ? strdupz((char *)sqlite3_column_text(res, 14)) : strdupz((char *)wc->host->health_default_exec);
+ alarm_log.conf_source = strdupz((char *)sqlite3_column_text(res, 16));
+
+ char *edit_command = sqlite3_column_bytes(res, 16) > 0 ? health_edit_command_from_source((char *)sqlite3_column_text(res, 16)) : strdupz("UNKNOWN=0");
+ alarm_log.command = strdupz(edit_command);
+
+ alarm_log.duration = (time_t) sqlite3_column_int64(res, 6);
+ alarm_log.non_clear_duration = (time_t) sqlite3_column_int64(res, 7);
+ alarm_log.status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS) sqlite3_column_int(res, 20));
+ alarm_log.old_status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS) sqlite3_column_int(res, 21));
+ alarm_log.delay = (int) sqlite3_column_int(res, 22);
+ alarm_log.delay_up_to_timestamp = (time_t) sqlite3_column_int64(res, 10);
+ alarm_log.last_repeat = (time_t) sqlite3_column_int64(res, 25);
+
+ alarm_log.silenced = ( (sqlite3_column_int64(res, 8) & HEALTH_ENTRY_FLAG_SILENCED) || ( sqlite3_column_type(res, 15) != SQLITE_NULL && !strncmp((char *)sqlite3_column_text(res,15), "silent", 6)) ) ? 1 : 0;
+
+ alarm_log.value_string = sqlite3_column_type(res, 23) == SQLITE_NULL ? strdupz((char *)"-") : strdupz((char *)format_value_and_unit(new_value_string, 100, sqlite3_column_double(res, 23), (char *) sqlite3_column_text(res, 17), -1));
+ alarm_log.old_value_string = sqlite3_column_type(res, 24) == SQLITE_NULL ? strdupz((char *)"-") : strdupz((char *)format_value_and_unit(old_value_string, 100, sqlite3_column_double(res, 24), (char *) sqlite3_column_text(res, 17), -1));
+
+ alarm_log.value = (calculated_number) sqlite3_column_double(res, 23);
+ alarm_log.old_value = (calculated_number) sqlite3_column_double(res, 24);
+
+ alarm_log.updated = (sqlite3_column_int64(res, 8) & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0;
+ alarm_log.rendered_info = strdupz((char *)sqlite3_column_text(res, 18));
+
+ info("DEBUG: %s pushing alert seq %" PRIu64 " - %" PRIu64"", wc->uuid_str, (uint64_t) sqlite3_column_int64(res, 0), (uint64_t) sqlite3_column_int64(res, 1));
+ aclk_send_alarm_log_entry(&alarm_log);
+
+ if (first_sequence_id == 0)
+ first_sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
+ last_sequence_id = (uint64_t) sqlite3_column_int64(res, 0);
+
+ destroy_alarm_log_entry(&alarm_log);
+ freez(edit_command);
+ }
+ buffer_flush(sql);
+
+ buffer_sprintf(sql, "UPDATE aclk_alert_%s SET date_submitted=strftime('%%s') "
+ "WHERE date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64 ";",
+ wc->uuid_str, first_sequence_id, last_sequence_id);
+ db_execute(buffer_tostring(sql));
+
+ rc = sqlite3_finalize(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to finalize statement to send alert entries from the database, rc = %d", rc);
+
+ freez(claim_id);
+ buffer_free(sql);
+#endif
+
+ return;
+}
+
+void aclk_send_alarm_health_log(char *node_id)
+{
+ if (unlikely(!node_id))
+ return;
+
+ struct aclk_database_worker_config *wc = NULL;
+ struct aclk_database_cmd cmd;
+ memset(&cmd, 0, sizeof(cmd));
+ cmd.opcode = ACLK_DATABASE_ALARM_HEALTH_LOG;
+
+ rrd_wrlock();
+ RRDHOST *host = find_host_by_node_id(node_id);
+ if (likely(host))
+ wc = (struct aclk_database_worker_config *)host->dbsync_worker;
+ rrd_unlock();
+ if (wc)
+ aclk_database_enq_cmd(wc, &cmd);
+ else {
+ if (aclk_worker_enq_cmd(node_id, &cmd))
+ error_report("ACLK synchronization thread is not active for node id %s", node_id);
+ }
+ return;
+}
+
+void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
+{
+ UNUSED(cmd);
+#ifndef ACLK_NG
+ UNUSED(wc);
+#else
+ int rc;
+
+ char *claim_id = is_agent_claimed();
+ if (unlikely(!claim_id))
+ return;
+
+ uint64_t first_sequence = 0;
+ uint64_t last_sequence = 0;
+ struct timeval first_timestamp;
+ struct timeval last_timestamp;
+
+ BUFFER *sql = buffer_create(1024);
+
+ sqlite3_stmt *res = NULL;
+
+ //TODO: make this better: include info from health log too
+ buffer_sprintf(sql, "select aa.sequence_id, aa.date_created, \
+ (select laa.sequence_id from aclk_alert_%s laa \
+ order by laa.sequence_id desc limit 1), \
+ (select laa.date_created from aclk_alert_%s laa \
+ order by laa.sequence_id desc limit 1) \
+ from aclk_alert_%s aa order by aa.sequence_id asc limit 1;", wc->uuid_str, wc->uuid_str, wc->uuid_str);
+
+ rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
+ if (rc != SQLITE_OK) {
+ error_report("Failed to prepare statement to get health log statistics from the database");
+ buffer_free(sql);
+ freez(claim_id);
+ return;
+ }
+
+ first_timestamp.tv_sec = 0;
+ first_timestamp.tv_usec = 0;
+ last_timestamp.tv_sec = 0;
+ last_timestamp.tv_usec = 0;
+
+ while (sqlite3_step(res) == SQLITE_ROW) {
+ first_sequence = sqlite3_column_bytes(res, 0) > 0 ? (uint64_t) sqlite3_column_int64(res, 0) : 0;
+ if (sqlite3_column_bytes(res, 1) > 0) {
+ first_timestamp.tv_sec = sqlite3_column_int64(res, 1);
+ }
+
+ last_sequence = sqlite3_column_bytes(res, 2) > 0 ? (uint64_t) sqlite3_column_int64(res, 2) : 0;
+ if (sqlite3_column_bytes(res, 3) > 0) {
+ last_timestamp.tv_sec = sqlite3_column_int64(res, 3);
+ }
+ }
+
+ struct alarm_log_entries log_entries;
+ log_entries.first_seq_id = first_sequence;
+ log_entries.first_when = first_timestamp;
+ log_entries.last_seq_id = last_sequence;
+ log_entries.last_when = last_timestamp;
+
+ struct alarm_log_health alarm_log;
+ alarm_log.claim_id = claim_id;
+ alarm_log.node_id = strdupz(wc->node_id);
+ alarm_log.log_entries = log_entries;
+ alarm_log.status = wc->alert_updates == 0 ? 2 : 1;
+
+ wc->alert_sequence_id = last_sequence;
+
+ aclk_send_alarm_log_health(&alarm_log);
+
+ rc = sqlite3_finalize(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to reset statement to get health log statistics from the database, rc = %d", rc);
+
+ freez((char *)alarm_log.node_id);
+ freez(claim_id);
+ buffer_free(sql);
+#endif
+
+ return;
+}
+
+void aclk_send_alarm_configuration(char *config_hash)
+{
+ if (unlikely(!config_hash))
+ return;
+
+ struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *) localhost->dbsync_worker;
+
+ if (unlikely(!wc)) {
+ return;
+ }
+
+ struct aclk_database_cmd cmd;
+ memset(&cmd, 0, sizeof(cmd));
+ cmd.opcode = ACLK_DATABASE_PUSH_ALERT_CONFIG;
+ cmd.data_param = (void *) strdupz(config_hash);
+ cmd.completion = NULL;
+ aclk_database_enq_cmd(wc, &cmd);
+
+ return;
+}
+
+int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
+{
+ UNUSED(wc);
+#ifndef ACLK_NG
+ UNUSED(cmd);
+#else
+ int rc = 0;
+
+ CHECK_SQLITE_CONNECTION(db_meta);
+
+ sqlite3_stmt *res = NULL;
+
+ char *config_hash = (char *) cmd.data_param;
+ BUFFER *sql = buffer_create(1024);
+ buffer_sprintf(
+ sql,
+ "SELECT alarm, template, on_key, class, type, component, os, hosts, plugin, module, charts, families, lookup, every, units, green, red, calc, warn, crit, to_key, exec, delay, repeat, info, options, host_labels, p_db_lookup_dimensions, p_db_lookup_method, p_db_lookup_options, p_db_lookup_after, p_db_lookup_before, p_update_every FROM alert_hash WHERE hash_id = @hash_id;");
+
+ rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
+ if (rc != SQLITE_OK) {
+ error_report("Failed to prepare statement when trying to fetch a chart hash configuration");
+ goto fail;
+ }
+
+ uuid_t hash_uuid;
+ if (uuid_parse(config_hash, hash_uuid))
+ goto fail;
+
+ rc = sqlite3_bind_blob(res, 1, &hash_uuid , sizeof(hash_uuid), SQLITE_STATIC);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ struct aclk_alarm_configuration alarm_config;
+ struct provide_alarm_configuration p_alarm_config;
+ p_alarm_config.cfg_hash = NULL;
+
+ if (sqlite3_step(res) == SQLITE_ROW) {
+
+ alarm_config.alarm = sqlite3_column_bytes(res, 0) > 0 ? strdupz((char *)sqlite3_column_text(res, 0)) : NULL;
+ alarm_config.tmpl = sqlite3_column_bytes(res, 1) > 0 ? strdupz((char *)sqlite3_column_text(res, 1)) : NULL;
+ alarm_config.on_chart = sqlite3_column_bytes(res, 2) > 0 ? strdupz((char *)sqlite3_column_text(res, 2)) : NULL;
+ alarm_config.classification = sqlite3_column_bytes(res, 3) > 0 ? strdupz((char *)sqlite3_column_text(res, 3)) : NULL;
+ alarm_config.type = sqlite3_column_bytes(res, 4) > 0 ? strdupz((char *)sqlite3_column_text(res, 4)) : NULL;
+ alarm_config.component = sqlite3_column_bytes(res, 5) > 0 ? strdupz((char *)sqlite3_column_text(res, 5)) : NULL;
+
+ alarm_config.os = sqlite3_column_bytes(res, 6) > 0 ? strdupz((char *)sqlite3_column_text(res, 6)) : NULL;
+ alarm_config.hosts = sqlite3_column_bytes(res, 7) > 0 ? strdupz((char *)sqlite3_column_text(res, 7)) : NULL;
+ alarm_config.plugin = sqlite3_column_bytes(res, 8) > 0 ? strdupz((char *)sqlite3_column_text(res, 8)) : NULL;
+ alarm_config.module = sqlite3_column_bytes(res, 9) > 0 ? strdupz((char *)sqlite3_column_text(res, 9)) : NULL;
+ alarm_config.charts = sqlite3_column_bytes(res, 10) > 0 ? strdupz((char *)sqlite3_column_text(res, 10)) : NULL;
+ alarm_config.families = sqlite3_column_bytes(res, 11) > 0 ? strdupz((char *)sqlite3_column_text(res, 11)) : NULL;
+ alarm_config.lookup = sqlite3_column_bytes(res, 12) > 0 ? strdupz((char *)sqlite3_column_text(res, 12)) : NULL;
+ alarm_config.every = sqlite3_column_bytes(res, 13) > 0 ? strdupz((char *)sqlite3_column_text(res, 13)) : NULL;
+ alarm_config.units = sqlite3_column_bytes(res, 14) > 0 ? strdupz((char *)sqlite3_column_text(res, 14)) : NULL;
+
+ alarm_config.green = sqlite3_column_bytes(res, 15) > 0 ? strdupz((char *)sqlite3_column_text(res, 15)) : NULL;
+ alarm_config.red = sqlite3_column_bytes(res, 16) > 0 ? strdupz((char *)sqlite3_column_text(res, 16)) : NULL;
+
+ alarm_config.calculation_expr = sqlite3_column_bytes(res, 17) > 0 ? strdupz((char *)sqlite3_column_text(res, 17)) : NULL;
+ alarm_config.warning_expr = sqlite3_column_bytes(res, 18) > 0 ? strdupz((char *)sqlite3_column_text(res, 18)) : NULL;
+ alarm_config.critical_expr = sqlite3_column_bytes(res, 19) > 0 ? strdupz((char *)sqlite3_column_text(res, 19)) : NULL;
+
+ alarm_config.recipient = sqlite3_column_bytes(res, 20) > 0 ? strdupz((char *)sqlite3_column_text(res, 20)) : NULL;
+ alarm_config.exec = sqlite3_column_bytes(res, 21) > 0 ? strdupz((char *)sqlite3_column_text(res, 21)) : NULL;
+ alarm_config.delay = sqlite3_column_bytes(res, 22) > 0 ? strdupz((char *)sqlite3_column_text(res, 22)) : NULL;
+ alarm_config.repeat = sqlite3_column_bytes(res, 23) > 0 ? strdupz((char *)sqlite3_column_text(res, 23)) : NULL;
+ alarm_config.info = sqlite3_column_bytes(res, 24) > 0 ? strdupz((char *)sqlite3_column_text(res, 24)) : NULL;
+ alarm_config.options = sqlite3_column_bytes(res, 25) > 0 ? strdupz((char *)sqlite3_column_text(res, 25)) : NULL;
+ alarm_config.host_labels = sqlite3_column_bytes(res, 26) > 0 ? strdupz((char *)sqlite3_column_text(res, 26)) : NULL;
+
+ alarm_config.p_db_lookup_dimensions = NULL;
+ alarm_config.p_db_lookup_method = NULL;
+ alarm_config.p_db_lookup_options = NULL;
+ alarm_config.p_db_lookup_after = 0;
+ alarm_config.p_db_lookup_before = 0;
+
+ if (sqlite3_column_bytes(res, 30) > 0) {
+
+ alarm_config.p_db_lookup_dimensions = sqlite3_column_bytes(res, 27) > 0 ? strdupz((char *)sqlite3_column_text(res, 27)) : NULL;
+ alarm_config.p_db_lookup_method = sqlite3_column_bytes(res, 28) > 0 ? strdupz((char *)sqlite3_column_text(res, 28)) : NULL;
+
+ BUFFER *tmp_buf = buffer_create(1024);
+ buffer_data_options2string(tmp_buf, sqlite3_column_int(res, 29));
+ alarm_config.p_db_lookup_options = strdupz((char *)buffer_tostring(tmp_buf));
+ buffer_free(tmp_buf);
+
+ alarm_config.p_db_lookup_after = sqlite3_column_int(res, 30);
+ alarm_config.p_db_lookup_before = sqlite3_column_int(res, 31);
+ }
+
+ alarm_config.p_update_every = sqlite3_column_int(res, 32);
+
+ p_alarm_config.cfg_hash = strdupz((char *) config_hash);
+ p_alarm_config.cfg = alarm_config;
+ }
+
+ if (likely(p_alarm_config.cfg_hash)) {
+ debug(D_ACLK_SYNC, "Sending alert config for %s", config_hash);
+ aclk_send_provide_alarm_cfg(&p_alarm_config);
+ freez((char *) cmd.data_param);
+ freez(p_alarm_config.cfg_hash);
+ destroy_aclk_alarm_configuration(&alarm_config);
+ }
+ else
+ info("DEBUG: Alert config for %s not found", config_hash);
+
+ bind_fail:
+ rc = sqlite3_finalize(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to reset statement when pushing alarm config hash, rc = %d", rc);
+
+ fail:
+ buffer_free(sql);
+
+ return rc;
+#endif
+ return 0;
+}
+
+
+// Start streaming alerts
+void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start_seq_id)
+{
+#ifdef ACLK_NG
+ if (unlikely(!node_id))
+ return;
+
+ uuid_t node_uuid;
+ if (uuid_parse(node_id, node_uuid))
+ return;
+
+ struct aclk_database_worker_config *wc = NULL;
+ rrd_wrlock();
+ RRDHOST *host = find_host_by_node_id(node_id);
+ if (likely(host))
+ wc = (struct aclk_database_worker_config *)host->dbsync_worker;
+ rrd_unlock();
+
+ if (likely(wc)) {
+ info("START streaming alerts for %s enabled with batch_id %"PRIu64" and start_seq_id %"PRIu64, node_id, batch_id, start_seq_id);
+ __sync_synchronize();
+ wc->alerts_batch_id = batch_id;
+ wc->alerts_start_seq_id = start_seq_id;
+ wc->alert_updates = 1;
+ __sync_synchronize();
+ }
+ else
+ error("ACLK synchronization thread is not active for host %s", host->hostname);
+
+#else
+ UNUSED(node_id);
+ UNUSED(start_seq_id);
+ UNUSED(batch_id);
+#endif
+ return;
+}
+
+int sql_queue_removed_alerts_to_aclk(RRDHOST *host)
+{
+ CHECK_SQLITE_CONNECTION(db_meta);
+
+ struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *) host->dbsync_worker;
+ if (unlikely(!wc)) {
+ return 1;
+ }
+
+ BUFFER *sql = buffer_create(1024);
+
+ buffer_sprintf(sql,"insert into aclk_alert_%s (alert_unique_id, date_created) " \
+ "select unique_id alert_unique_id, strftime('%%s') date_created from health_log_%s where new_status = -2 and updated_by_id = 0 and unique_id not in (select alert_unique_id from aclk_alert_%s) order by unique_id asc on conflict (alert_unique_id) do nothing;", wc->uuid_str, wc->uuid_str, wc->uuid_str);
+
+ db_execute(buffer_tostring(sql));
+
+ buffer_free(sql);
+
+ return 0;
+}
diff --git a/database/sqlite/sqlite_aclk_alert.h b/database/sqlite/sqlite_aclk_alert.h
new file mode 100644
index 0000000000..1c2a4a6266
--- /dev/null
+++ b/database/sqlite/sqlite_aclk_alert.h
@@ -0,0 +1,17 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_SQLITE_ACLK_ALERT_H
+#define NETDATA_SQLITE_ACLK_ALERT_H
+
+extern sqlite3 *db_meta;
+
+int aclk_add_alert_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
+void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
+void aclk_send_alarm_health_log(char *node_id);
+void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
+void aclk_send_alarm_configuration (char *config_hash);
+int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
+void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start_seq_id);
+int sql_queue_removed_alerts_to_aclk(RRDHOST *host);
+
+#endif //NETDATA_SQLITE_ACLK_ALERT_H
diff --git a/database/sqlite/sqlite_aclk_chart.c b/database/sqlite/sqlite_aclk_chart.c
index 87aebd4054..8da62fbb76 100644
--- a/database/sqlite/sqlite_aclk_chart.c
+++ b/database/sqlite/sqlite_aclk_chart.c
@@ -5,15 +5,6 @@
#include "../../aclk/aclk_charts_api.h"
-#define CHECK_SQLITE_CONNECTION(db_meta) \
- if (unlikely(!db_meta)) { \
- if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) { \
- return 1; \
- } \
- error_report("Database has not been initialized"); \
- return 1; \
- }
-
static inline int sql_queue_chart_payload(struct aclk_database_worker_config *wc,
void *data, enum aclk_database_opcode opcode)
{
diff --git a/database/sqlite/sqlite_functions.h b/database/sqlite/sqlite_functions.h
index 8389d17542..49cf854bfa 100644
--- a/database/sqlite/sqlite_functions.h
+++ b/database/sqlite/sqlite_functions.h
@@ -39,6 +39,16 @@ struct node_instance_list {
#define SQL_STORE_ACTIVE_DIMENSION \
"insert or replace into dimension_active (dim_id, date_created) values (@id, strftime('%s'));"
+
+#define CHECK_SQLITE_CONNECTION(db_meta) \
+ if (unlikely(!db_meta)) { \
+ if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) { \
+ return 1; \
+ } \
+ error_report("Database has not been initialized"); \
+ return 1; \
+ }
+
extern int sql_init_database(void);
extern void sql_close_database(void);