summary | refs | log | tree | commit | diff | stats
path: root/database
diff options
context:
space:
mode:
author: Emmanuel Vasilakis <mrzammler@mm.st> 2021-10-19 11:30:10 +0300
committer: GitHub <noreply@github.com> 2021-10-19 11:30:10 +0300
commit: 0882ed03b4000b6f9e1f64743321e4cd6e2aa39f (patch)
tree: 4faececbbd9434ec18ae9a068b605edb29285320 /database
parent: bc8b5a8add5abf772a2ad291bad4448099677ea8 (diff)
Add snapshot message and calls to sql_queue_removed_alerts_to_aclk (#11664)
Diffstat (limited to 'database')
-rw-r--r-- database/sqlite/sqlite_aclk.c 4
-rw-r--r-- database/sqlite/sqlite_aclk.h 3
-rw-r--r-- database/sqlite/sqlite_aclk_alert.c 246
-rw-r--r-- database/sqlite/sqlite_aclk_alert.h 2
4 files changed, 254 insertions, 1 deletions
diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c
index 78c42678ec..73493a7e87 100644
--- a/database/sqlite/sqlite_aclk.c
+++ b/database/sqlite/sqlite_aclk.c
@@ -412,6 +412,10 @@ void aclk_database_worker(void *arg)
debug(D_ACLK_SYNC, "Pushing alarm health log to the cloud for %s", wc->host_guid);
aclk_push_alarm_health_log(wc, cmd);
break;
+ case ACLK_DATABASE_PUSH_ALERT_SNAPSHOT:
+ debug(D_ACLK_SYNC, "Pushing alert snapshot to the cloud for node %s", wc->host_guid);
+ aclk_push_alert_snapshot_event(wc, cmd);
+ break;
// NODE OPERATIONS
case ACLK_DATABASE_NODE_INFO:
diff --git a/database/sqlite/sqlite_aclk.h b/database/sqlite/sqlite_aclk.h
index 91ed065419..5616635c26 100644
--- a/database/sqlite/sqlite_aclk.h
+++ b/database/sqlite/sqlite_aclk.h
@@ -124,6 +124,7 @@ enum aclk_database_opcode {
ACLK_DATABASE_NODE_INFO,
ACLK_DATABASE_PUSH_ALERT,
ACLK_DATABASE_PUSH_ALERT_CONFIG,
+ ACLK_DATABASE_PUSH_ALERT_SNAPSHOT,
ACLK_DATABASE_PUSH_CHART,
ACLK_DATABASE_PUSH_CHART_CONFIG,
ACLK_DATABASE_RESET_CHART,
@@ -170,6 +171,8 @@ struct aclk_database_worker_config {
uint64_t alerts_batch_id; // batch id for alerts to use
uint64_t alerts_start_seq_id; // cloud has asked to start streaming from
uint64_t alert_sequence_id; // last alert sequence_id
+ uint64_t alerts_snapshot_id; //will contain the snapshot_id value if snapshot was requested
+ uint64_t alerts_ack_sequence_id; //last sequence_id ack'ed from cloud via sendsnapshot message
uv_loop_t *loop;
RRDHOST *host;
uv_async_t async;
diff --git a/database/sqlite/sqlite_aclk_alert.c b/database/sqlite/sqlite_aclk_alert.c
index dfc948b529..6193de5e18 100644
--- a/database/sqlite/sqlite_aclk_alert.c
+++ b/database/sqlite/sqlite_aclk_alert.c
@@ -147,7 +147,11 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
sql,
"UPDATE aclk_alert_%s SET date_submitted = NULL, date_cloud_ack = NULL WHERE sequence_id >= %"PRIu64
"; UPDATE aclk_alert_%s SET date_cloud_ack = strftime('%%s','now') WHERE sequence_id < %"PRIu64
- " and date_cloud_ack is null",
+ " and date_cloud_ack is null "
+ "; UPDATE aclk_alert_%s SET date_submitted = strftime('%%s','now') WHERE sequence_id < %"PRIu64
+ " and date_submitted is null",
+ wc->uuid_str,
+ wc->alerts_start_seq_id,
wc->uuid_str,
wc->alerts_start_seq_id,
wc->uuid_str,
@@ -556,6 +560,11 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start
int sql_queue_removed_alerts_to_aclk(RRDHOST *host)
{
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ if (!aclk_use_new_cloud_arch) {
+ return 0;
+ }
+
CHECK_SQLITE_CONNECTION(db_meta);
struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *) host->dbsync_worker;
@@ -574,6 +583,241 @@ int sql_queue_removed_alerts_to_aclk(RRDHOST *host)
db_execute(buffer_tostring(sql));
buffer_free(sql);
+#else
+ UNUSED(host);
+#endif
+ return 0;
+}
+
+/*
+ * Handle a "send alerts snapshot" request for the node identified by node_id.
+ *
+ * Stores snapshot_id and sequence_id in the node's ACLK sync worker config
+ * (alerts_snapshot_id / alerts_ack_sequence_id) and enqueues an
+ * ACLK_DATABASE_PUSH_ALERT_SNAPSHOT command for that worker.
+ *
+ * Returns silently when node_id is NULL or does not parse as a UUID, and only
+ * logs an error when no sync worker config is found for the node.
+ * claim_id is unused. Without ENABLE_NEW_CLOUD_PROTOCOL the function does nothing.
+ */
+void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t snapshot_id, uint64_t sequence_id)
+{
+    UNUSED(claim_id);
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+    if (unlikely(!node_id))
+        return;
+
+    // node_uuid is used only to check that node_id is a well-formed UUID
+    uuid_t node_uuid;
+    if (uuid_parse(node_id, node_uuid))
+        return;
+
+    struct aclk_database_worker_config *wc = NULL;
+    rrd_wrlock();
+    RRDHOST *host = find_host_by_node_id(node_id);
+    if (likely(host))
+        wc = (struct aclk_database_worker_config *)host->dbsync_worker;
+    rrd_unlock();
+
+    if (unlikely(!wc)) {
+        // wc is also NULL when the host lookup itself failed, so host must not
+        // be dereferenced unconditionally; fall back to the node id in that case.
+        error("ACLK synchronization thread is not active for host %s", host ? host->hostname : node_id);
+        return;
+    }
+
+    info(
+        "Send alerts snapshot requested for %s with snapshot_id %" PRIu64 " and ack sequence_id %" PRIu64,
+        node_id,
+        snapshot_id,
+        sequence_id);
+
+    // Full barriers around the two stores that the worker reads later
+    __sync_synchronize();
+    wc->alerts_snapshot_id = snapshot_id;
+    wc->alerts_ack_sequence_id = sequence_id;
+    __sync_synchronize();
+
+    struct aclk_database_cmd cmd;
+    memset(&cmd, 0, sizeof(cmd));
+    cmd.opcode = ACLK_DATABASE_PUSH_ALERT_SNAPSHOT;
+    cmd.data_param = NULL;
+    cmd.completion = NULL;
+    aclk_database_enq_cmd(wc, &cmd);
+#else
+    UNUSED(node_id);
+    UNUSED(snapshot_id);
+    UNUSED(sequence_id);
+#endif
+}
+
+/*
+ * Set date_cloud_ack to the current time on every row of the
+ * aclk_alert_<uuid_str> table whose sequence_id is <= alerts_ack_sequence_id.
+ *
+ * When alerts_ack_sequence_id is 0 no statement is executed.
+ */
+void aclk_mark_alert_cloud_ack(char *uuid_str, uint64_t alerts_ack_sequence_id)
+{
+    // Bail out before allocating a buffer that would never be used
+    if (alerts_ack_sequence_id == 0)
+        return;
+
+    BUFFER *sql = buffer_create(1024);
+
+    buffer_sprintf(
+        sql,
+        "UPDATE aclk_alert_%s SET date_cloud_ack = strftime('%%s','now') WHERE sequence_id <= %" PRIu64,
+        uuid_str,
+        alerts_ack_sequence_id);
+    db_execute(buffer_tostring(sql));
+
+    buffer_free(sql);
+}
+
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+/*
+ * Fill alarm_log from the health log entry ae and its host.
+ *
+ * - Every string field written here is a fresh strdupz() copy.
+ * - node_id and claim_id are not written by this function.
+ * - batch_id and sequence_id are set to 0.
+ * - NaN new/old values are reported as 0.
+ * - exec_path falls back to host->health_default_exec when ae->exec is NULL.
+ *
+ * No lock is taken here.
+ * NOTE(review): the "_nolock" suffix suggests the caller is expected to hold
+ * the health log lock - confirm against callers.
+ */
+void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_ENTRY *ae, RRDHOST *host)
+{
+    char *edit_command = ae->source ? health_edit_command_from_source(ae->source) : strdupz("UNKNOWN=0");
+    char config_hash_id[GUID_LEN + 1];
+    uuid_unparse_lower(ae->config_hash_id, config_hash_id);
+
+    alarm_log->chart = strdupz((char *)ae->chart);
+    alarm_log->name = strdupz((char *)ae->name);
+    alarm_log->family = strdupz((char *)ae->family);
+
+    alarm_log->batch_id = 0;
+    alarm_log->sequence_id = 0;
+    alarm_log->when = (time_t)ae->when;
+
+    alarm_log->config_hash = strdupz((char *)config_hash_id);
+
+    alarm_log->utc_offset = host->utc_offset;
+    alarm_log->timezone = strdupz((char *)host->abbrev_timezone);
+    alarm_log->exec_path = ae->exec ? strdupz((char *)ae->exec) : strdupz((char *)host->health_default_exec);
+
+    // ae->source is treated as optional when building edit_command above, so
+    // guard it here too instead of handing a NULL pointer to strdupz().
+    // NOTE(review): an empty string is used as the fallback - confirm this is
+    // what the receiving side expects for an unknown source.
+    alarm_log->conf_source = strdupz(ae->source ? ae->source : "");
+
+    alarm_log->command = strdupz(edit_command);
+
+    alarm_log->duration = (time_t)ae->duration;
+    alarm_log->non_clear_duration = (time_t)ae->non_clear_duration;
+    alarm_log->status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)ae->new_status);
+    alarm_log->old_status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)ae->old_status);
+    alarm_log->delay = (int)ae->delay;
+    alarm_log->delay_up_to_timestamp = (time_t)ae->delay_up_to_timestamp;
+    alarm_log->last_repeat = (time_t)ae->last_repeat;
+
+    // Silenced when flagged as such, or when the recipient starts with "silent"
+    alarm_log->silenced =
+        ((ae->flags & HEALTH_ENTRY_FLAG_SILENCED) || (ae->recipient && !strncmp((char *)ae->recipient, "silent", 6))) ?
+            1 :
+            0;
+
+    alarm_log->value_string = strdupz(ae->new_value_string);
+    alarm_log->old_value_string = strdupz(ae->old_value_string);
+
+    alarm_log->value = (!isnan(ae->new_value)) ? (calculated_number)ae->new_value : 0;
+    alarm_log->old_value = (!isnan(ae->old_value)) ? (calculated_number)ae->old_value : 0;
+
+    alarm_log->updated = (ae->flags & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0;
+    alarm_log->rendered_info = strdupz(ae->info);
+
+    // alarm_log->command holds its own copy, so the temporary can go
+    freez(edit_command);
+}
+#endif
+
+/*
+ * Walk the host's in-memory health log looking for an entry that has the
+ * given alarm_id, a unique_id greater than mark, and a new_status that is
+ * neither WARNING nor CRITICAL.
+ *
+ * Returns 1 as soon as such an entry is found, 0 when the list is exhausted.
+ * No lock is taken here.
+ */
+static int have_recent_alarm(RRDHOST *host, uint32_t alarm_id, time_t mark)
+{
+    for (ALARM_ENTRY *entry = host->health_log.alarms; entry; entry = entry->next) {
+        // Different alarm, or not newer than the reference entry
+        if (entry->alarm_id != alarm_id || entry->unique_id <= mark)
+            continue;
+
+        // Newer entries that are still raised do not count
+        if (entry->new_status == RRDCALC_STATUS_WARNING || entry->new_status == RRDCALC_STATUS_CRITICAL)
+            continue;
+
+        return 1;
+    }
return 0;
}
+
+#define ALARM_EVENTS_PER_CHUNK 10
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+/*
+ * Decide whether a health log entry is included in an alert snapshot.
+ *
+ * An entry is skipped when it has been superseded (updated_by_id is set),
+ * when its new_status is UNINITIALIZED, or when have_recent_alarm() reports
+ * a later qualifying entry for the same alarm_id. Returns 1 to include, 0 to skip.
+ * Shared by the counting and emitting passes so both always agree.
+ */
+static int alert_entry_in_snapshot(RRDHOST *host, ALARM_ENTRY *ae)
+{
+    if (likely(ae->updated_by_id))
+        return 0;
+
+    if (unlikely(ae->new_status == RRDCALC_STATUS_UNINITIALIZED))
+        return 0;
+
+    if (have_recent_alarm(host, ae->alarm_id, ae->unique_id))
+        return 0;
+
+    return 1;
+}
+#endif
+
+/*
+ * Build and send an alert snapshot for the node served by wc.
+ *
+ * Steps:
+ *  1. Return early when alert updates are off for the node or the agent is
+ *     not claimed.
+ *  2. Mark alerts up to wc->alerts_ack_sequence_id as acknowledged.
+ *  3. Under the health log read lock, count the qualifying entries, then emit
+ *     them in chunks of at most ALARM_EVENTS_PER_CHUNK entries, each chunk
+ *     sent with aclk_send_alarm_snapshot().
+ *  4. Reset wc->alerts_snapshot_id to 0.
+ *
+ * cmd is unused. Without ENABLE_NEW_CLOUD_PROTOCOL the function does nothing.
+ */
+void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
+{
+#ifndef ENABLE_NEW_CLOUD_PROTOCOL
+    UNUSED(wc);
+    UNUSED(cmd);
+#else
+    UNUSED(cmd);
+    // NOTE(review): this gate may not be needed for snapshots
+    if (unlikely(!wc->alert_updates)) {
+        debug(D_ACLK_SYNC, "Ignoring alert push snapshot event, updates have been turned off for node %s", wc->node_id);
+        return;
+    }
+
+    char *claim_id = is_agent_claimed();
+    if (unlikely(!claim_id))
+        return;
+
+    aclk_mark_alert_cloud_ack(wc->uuid_str, wc->alerts_ack_sequence_id);
+
+    RRDHOST *host = wc->host;
+
+    netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
+
+    // First pass: count qualifying entries so the chunk total is known up front
+    uint32_t total = 0;
+    for (ALARM_ENTRY *ae = host->health_log.alarms; ae; ae = ae->next) {
+        if (alert_entry_in_snapshot(host, ae))
+            total++;
+    }
+
+    if (total) {
+        // Round up: a partially filled last chunk still counts as a chunk
+        uint32_t chunks = (total / ALARM_EVENTS_PER_CHUNK) + (total % ALARM_EVENTS_PER_CHUNK != 0);
+        uint32_t chunk = 1;
+
+        // One descriptor is reused for every chunk; only .chunk changes, so all
+        // chunks of this snapshot carry the same snapshot_id.
+        struct alarm_snapshot alarm_snap;
+        memset(&alarm_snap, 0, sizeof(alarm_snap));
+        alarm_snap.node_id = wc->node_id;
+        alarm_snap.claim_id = claim_id;
+        alarm_snap.snapshot_id = wc->alerts_snapshot_id;
+        alarm_snap.chunks = chunks;
+        alarm_snap.chunk = chunk;
+
+        alarm_snapshot_proto_ptr_t snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap);
+
+        // Second pass: emit the same entries, flushing every full chunk
+        uint32_t in_chunk = 0;
+        for (ALARM_ENTRY *ae = host->health_log.alarms; ae; ae = ae->next) {
+            if (!alert_entry_in_snapshot(host, ae))
+                continue;
+
+            // Zeroed first so no field is left indeterminate for the encoder
+            struct alarm_log_entry alarm_log;
+            memset(&alarm_log, 0, sizeof(alarm_log));
+            alarm_log.node_id = wc->node_id;
+            alarm_log.claim_id = claim_id;
+
+            health_alarm_entry2proto_nolock(&alarm_log, ae, host);
+            add_alarm_log_entry2snapshot(snapshot_proto, &alarm_log);
+
+            in_chunk++;
+
+            if (in_chunk == ALARM_EVENTS_PER_CHUNK) {
+                aclk_send_alarm_snapshot(snapshot_proto);
+
+                in_chunk = 0;
+
+                // Start the next chunk only if one is still expected
+                if (chunk < chunks) {
+                    chunk++;
+                    alarm_snap.chunk = chunk;
+                    snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap);
+                }
+            }
+            destroy_alarm_log_entry(&alarm_log);
+        }
+
+        // Flush the trailing, partially filled chunk
+        if (in_chunk)
+            aclk_send_alarm_snapshot(snapshot_proto);
+    }
+
+    netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);
+    wc->alerts_snapshot_id = 0;
+
+    freez(claim_id);
+#endif
+}
diff --git a/database/sqlite/sqlite_aclk_alert.h b/database/sqlite/sqlite_aclk_alert.h
index 1c2a4a6266..a60f9660af 100644
--- a/database/sqlite/sqlite_aclk_alert.h
+++ b/database/sqlite/sqlite_aclk_alert.h
@@ -13,5 +13,7 @@ void aclk_send_alarm_configuration (char *config_hash);
int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start_seq_id);
int sql_queue_removed_alerts_to_aclk(RRDHOST *host);
+void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
+void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t snapshot_id, uint64_t sequence_id);
#endif //NETDATA_SQLITE_ACLK_ALERT_H