diff options
author | Emmanuel Vasilakis <mrzammler@mm.st> | 2021-10-19 11:30:10 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-10-19 11:30:10 +0300 |
commit | 0882ed03b4000b6f9e1f64743321e4cd6e2aa39f (patch) | |
tree | 4faececbbd9434ec18ae9a068b605edb29285320 /database | |
parent | bc8b5a8add5abf772a2ad291bad4448099677ea8 (diff) |
Add snapshot message and calls to sql_queue_removed_alerts_to_aclk (#11664)
Diffstat (limited to 'database')
-rw-r--r-- | database/sqlite/sqlite_aclk.c | 4 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk.h | 3 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_alert.c | 246 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_alert.h | 2 |
4 files changed, 254 insertions, 1 deletions
diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c index 78c42678ec..73493a7e87 100644 --- a/database/sqlite/sqlite_aclk.c +++ b/database/sqlite/sqlite_aclk.c @@ -412,6 +412,10 @@ void aclk_database_worker(void *arg) debug(D_ACLK_SYNC, "Pushing alarm health log to the cloud for %s", wc->host_guid); aclk_push_alarm_health_log(wc, cmd); break; + case ACLK_DATABASE_PUSH_ALERT_SNAPSHOT: + debug(D_ACLK_SYNC, "Pushing alert snapshot to the cloud for node %s", wc->host_guid); + aclk_push_alert_snapshot_event(wc, cmd); + break; // NODE OPERATIONS case ACLK_DATABASE_NODE_INFO: diff --git a/database/sqlite/sqlite_aclk.h b/database/sqlite/sqlite_aclk.h index 91ed065419..5616635c26 100644 --- a/database/sqlite/sqlite_aclk.h +++ b/database/sqlite/sqlite_aclk.h @@ -124,6 +124,7 @@ enum aclk_database_opcode { ACLK_DATABASE_NODE_INFO, ACLK_DATABASE_PUSH_ALERT, ACLK_DATABASE_PUSH_ALERT_CONFIG, + ACLK_DATABASE_PUSH_ALERT_SNAPSHOT, ACLK_DATABASE_PUSH_CHART, ACLK_DATABASE_PUSH_CHART_CONFIG, ACLK_DATABASE_RESET_CHART, @@ -170,6 +171,8 @@ struct aclk_database_worker_config { uint64_t alerts_batch_id; // batch id for alerts to use uint64_t alerts_start_seq_id; // cloud has asked to start streaming from uint64_t alert_sequence_id; // last alert sequence_id + uint64_t alerts_snapshot_id; //will contain the snapshot_id value if snapshot was requested + uint64_t alerts_ack_sequence_id; //last sequence_id ack'ed from cloud via sendsnapshot message uv_loop_t *loop; RRDHOST *host; uv_async_t async; diff --git a/database/sqlite/sqlite_aclk_alert.c b/database/sqlite/sqlite_aclk_alert.c index dfc948b529..6193de5e18 100644 --- a/database/sqlite/sqlite_aclk_alert.c +++ b/database/sqlite/sqlite_aclk_alert.c @@ -147,7 +147,11 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d sql, "UPDATE aclk_alert_%s SET date_submitted = NULL, date_cloud_ack = NULL WHERE sequence_id >= %"PRIu64 "; UPDATE aclk_alert_%s SET date_cloud_ack = strftime('%%s','now') WHERE sequence_id < %"PRIu64 - " and date_cloud_ack is null", + " and date_cloud_ack is null " + "; UPDATE aclk_alert_%s SET date_submitted = strftime('%%s','now') WHERE sequence_id < %"PRIu64 + " and date_submitted is null", + wc->uuid_str, + wc->alerts_start_seq_id, wc->uuid_str, wc->alerts_start_seq_id, wc->uuid_str, @@ -556,6 +560,11 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start int sql_queue_removed_alerts_to_aclk(RRDHOST *host) { +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + if (!aclk_use_new_cloud_arch) { + return 0; + } + CHECK_SQLITE_CONNECTION(db_meta); struct aclk_database_worker_config *wc = (struct aclk_database_worker_config *) host->dbsync_worker; @@ -574,6 +583,241 @@ int sql_queue_removed_alerts_to_aclk(RRDHOST *host) db_execute(buffer_tostring(sql)); buffer_free(sql); +#else + UNUSED(host); +#endif + return 0; +} + +void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t snapshot_id, uint64_t sequence_id) +{ + UNUSED(claim_id); +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + if (unlikely(!node_id)) + return; + + uuid_t node_uuid; + if (uuid_parse(node_id, node_uuid)) + return; + + struct aclk_database_worker_config *wc = NULL; + rrd_wrlock(); + RRDHOST *host = find_host_by_node_id(node_id); + if (likely(host)) + wc = (struct aclk_database_worker_config *)host->dbsync_worker; + rrd_unlock(); + + if (likely(wc)) { + info( + "Send alerts snapshot requested for %s with snapshot_id %" PRIu64 " and ack sequence_id %" PRIu64, + node_id, + snapshot_id, + sequence_id); + __sync_synchronize(); + wc->alerts_snapshot_id = snapshot_id; + wc->alerts_ack_sequence_id = sequence_id; + __sync_synchronize(); + + struct aclk_database_cmd cmd; + memset(&cmd, 0, sizeof(cmd)); + cmd.opcode = ACLK_DATABASE_PUSH_ALERT_SNAPSHOT; + cmd.data_param = NULL; + cmd.completion = NULL; + aclk_database_enq_cmd(wc, &cmd); + } else + error("ACLK synchronization thread is not active for host %s", host->hostname); +#else + UNUSED(node_id); + UNUSED(snapshot_id); + UNUSED(sequence_id); +#endif + return; +} + +void aclk_mark_alert_cloud_ack(char *uuid_str, uint64_t alerts_ack_sequence_id) +{ + BUFFER *sql = buffer_create(1024); + + if (alerts_ack_sequence_id != 0) { + buffer_sprintf( + sql, + "UPDATE aclk_alert_%s SET date_cloud_ack = strftime('%%s','now') WHERE sequence_id <= %" PRIu64 "", + uuid_str, + alerts_ack_sequence_id); + db_execute(buffer_tostring(sql)); + } + + buffer_free(sql); +} + +#ifdef ENABLE_NEW_CLOUD_PROTOCOL +void health_alarm_entry2proto_nolock(struct alarm_log_entry *alarm_log, ALARM_ENTRY *ae, RRDHOST *host) +{ + char *edit_command = ae->source ? health_edit_command_from_source(ae->source) : strdupz("UNKNOWN=0"); + char config_hash_id[GUID_LEN + 1]; + uuid_unparse_lower(ae->config_hash_id, config_hash_id); + + alarm_log->chart = strdupz((char *)ae->chart); + alarm_log->name = strdupz((char *)ae->name); + alarm_log->family = strdupz((char *)ae->family); + + alarm_log->batch_id = 0; + alarm_log->sequence_id = 0; + alarm_log->when = (time_t)ae->when; + + alarm_log->config_hash = strdupz((char *)config_hash_id); + + alarm_log->utc_offset = host->utc_offset; + alarm_log->timezone = strdupz((char *)host->abbrev_timezone); + alarm_log->exec_path = ae->exec ? strdupz((char *)ae->exec) : strdupz((char *)host->health_default_exec); + alarm_log->conf_source = strdupz(ae->source); + + alarm_log->command = strdupz(edit_command); + + alarm_log->duration = (time_t)ae->duration; + alarm_log->non_clear_duration = (time_t)ae->non_clear_duration; + alarm_log->status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)ae->new_status); + alarm_log->old_status = rrdcalc_status_to_proto_enum((RRDCALC_STATUS)ae->old_status); + alarm_log->delay = (int)ae->delay; + alarm_log->delay_up_to_timestamp = (time_t)ae->delay_up_to_timestamp; + alarm_log->last_repeat = (time_t)ae->last_repeat; + + alarm_log->silenced = + ((ae->flags & HEALTH_ENTRY_FLAG_SILENCED) || (ae->recipient && !strncmp((char *)ae->recipient, "silent", 6))) ? + 1 : + 0; + + alarm_log->value_string = strdupz(ae->new_value_string); + alarm_log->old_value_string = strdupz(ae->old_value_string); + + alarm_log->value = (!isnan(ae->new_value)) ? (calculated_number)ae->new_value : 0; + alarm_log->old_value = (!isnan(ae->old_value)) ? (calculated_number)ae->old_value : 0; + + alarm_log->updated = (ae->flags & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0; + alarm_log->rendered_info = strdupz(ae->info); + + freez(edit_command); +} +#endif + +static int have_recent_alarm(RRDHOST *host, uint32_t alarm_id, time_t mark) +{ + ALARM_ENTRY *ae = host->health_log.alarms; + + while (ae) { + if (ae->alarm_id == alarm_id && ae->unique_id > mark && + (ae->new_status != RRDCALC_STATUS_WARNING && ae->new_status != RRDCALC_STATUS_CRITICAL)) + return 1; + ae = ae->next; + } return 0; } + +#define ALARM_EVENTS_PER_CHUNK 10 +void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) +{ +#ifndef ENABLE_NEW_CLOUD_PROTOCOL + UNUSED(wc); + UNUSED(cmd); +#else + UNUSED(cmd); + // we perhaps we don't need this for snapshots + if (unlikely(!wc->alert_updates)) { + debug(D_ACLK_SYNC, "Ignoring alert push snapshot event, updates have been turned off for node %s", wc->node_id); + return; + } + + char *claim_id = is_agent_claimed(); + if (unlikely(!claim_id)) + return; + + aclk_mark_alert_cloud_ack(wc->uuid_str, wc->alerts_ack_sequence_id); + + RRDHOST *host = wc->host; + uint32_t cnt = 0; + + netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock); + + ALARM_ENTRY *ae = host->health_log.alarms; + + for (; ae; ae = ae->next) { + if (likely(ae->updated_by_id)) + continue; + + if (unlikely(ae->new_status == RRDCALC_STATUS_UNINITIALIZED)) + continue; + + if (have_recent_alarm(host, ae->alarm_id, ae->unique_id)) + continue; + + cnt++; + } + + if (cnt) { + uint32_t chunk = 1, chunks = 0; + + chunks = (cnt / ALARM_EVENTS_PER_CHUNK) + (cnt % ALARM_EVENTS_PER_CHUNK != 0); + ae = host->health_log.alarms; + + cnt = 0; + struct alarm_snapshot alarm_snap; + alarm_snap.node_id = wc->node_id; + alarm_snap.claim_id = claim_id; + alarm_snap.snapshot_id = wc->alerts_snapshot_id; + alarm_snap.chunks = chunks; + alarm_snap.chunk = chunk; + + alarm_snapshot_proto_ptr_t snapshot_proto; + snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap); + + for (; ae; ae = ae->next) { + if (likely(ae->updated_by_id)) + continue; + + if (unlikely(ae->new_status == RRDCALC_STATUS_UNINITIALIZED)) + continue; + + if (have_recent_alarm(host, ae->alarm_id, ae->unique_id)) + continue; + + struct alarm_log_entry alarm_log; + alarm_log.node_id = wc->node_id; + alarm_log.claim_id = claim_id; + + health_alarm_entry2proto_nolock(&alarm_log, ae, host); + add_alarm_log_entry2snapshot(snapshot_proto, &alarm_log); + + cnt++; + + if (cnt == ALARM_EVENTS_PER_CHUNK) { + aclk_send_alarm_snapshot(snapshot_proto); + + cnt = 0; + + if (chunk < chunks) { + chunk++; + + struct alarm_snapshot alarm_snap; + alarm_snap.node_id = wc->node_id; + alarm_snap.claim_id = claim_id; + alarm_snap.snapshot_id = wc->alerts_snapshot_id; + alarm_snap.chunks = chunks; + alarm_snap.chunk = chunk; + + snapshot_proto = generate_alarm_snapshot_proto(&alarm_snap); + } + } + destroy_alarm_log_entry(&alarm_log); + } + if (cnt) + aclk_send_alarm_snapshot(snapshot_proto); + } + + netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock); + wc->alerts_snapshot_id = 0; + + freez(claim_id); +#endif + return; +} diff --git a/database/sqlite/sqlite_aclk_alert.h b/database/sqlite/sqlite_aclk_alert.h index 1c2a4a6266..a60f9660af 100644 --- a/database/sqlite/sqlite_aclk_alert.h +++ b/database/sqlite/sqlite_aclk_alert.h @@ -13,5 +13,7 @@ void aclk_send_alarm_configuration (char *config_hash); int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start_seq_id); int sql_queue_removed_alerts_to_aclk(RRDHOST *host); +void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); +void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t snapshot_id, uint64_t sequence_id); #endif //NETDATA_SQLITE_ACLK_ALERT_H |