diff options
author | Emmanuel Vasilakis <mrzammler@mm.st> | 2022-05-02 18:36:56 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-02 18:36:56 +0300 |
commit | d6b1756ea7c0dc28b338aff67800e932d8fe97ec (patch) | |
tree | c11bd6aac932001abe3d1b4b697c6cd90f1abcef | |
parent | 90c2fcb838dc711e6706713510b0d07916d94829 (diff) | |
Reduce alert events sent to the cloud. (#12544)
* filter
* update filter
* queue removed directly
* more
* logging
* cleanup
* cleanup 2
* cleanup 3
* finalize instead of reset
-rw-r--r-- | database/sqlite/sqlite_aclk.c | 2 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk.h | 4 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_alert.c | 150 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_alert.h | 1 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_chart.h | 1 | ||||
-rw-r--r-- | health/health.c | 2 | ||||
-rw-r--r-- | health/health_log.c | 2 |
7 files changed, 147 insertions, 15 deletions
diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c index 9893280971..9945e18ad2 100644 --- a/database/sqlite/sqlite_aclk.c +++ b/database/sqlite/sqlite_aclk.c @@ -628,7 +628,7 @@ void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id) db_execute(buffer_tostring(sql)); buffer_flush(sql); - buffer_sprintf(sql, TABLE_ACLK_ALERT, uuid_str, uuid_str, uuid_str); + buffer_sprintf(sql, TABLE_ACLK_ALERT, uuid_str); db_execute(buffer_tostring(sql)); buffer_flush(sql); diff --git a/database/sqlite/sqlite_aclk.h b/database/sqlite/sqlite_aclk.h index 894d934897..792c2d21ed 100644 --- a/database/sqlite/sqlite_aclk.h +++ b/database/sqlite/sqlite_aclk.h @@ -103,9 +103,7 @@ static inline char *get_str_from_uuid(uuid_t *uuid) #define TABLE_ACLK_ALERT "CREATE TABLE IF NOT EXISTS aclk_alert_%s (sequence_id INTEGER PRIMARY KEY, " \ "alert_unique_id, date_created, date_submitted, date_cloud_ack, " \ - "unique(alert_unique_id)); " \ - "insert into aclk_alert_%s (alert_unique_id, date_created) " \ - "select unique_id alert_unique_id, strftime('%%s') date_created from health_log_%s where new_status <> 0 and new_status <> -2 order by unique_id asc on conflict (alert_unique_id) do nothing;" + "unique(alert_unique_id));" #define INDEX_ACLK_CHART "CREATE INDEX IF NOT EXISTS aclk_chart_index_%s ON aclk_chart_%s (unique_id);" diff --git a/database/sqlite/sqlite_aclk_alert.c b/database/sqlite/sqlite_aclk_alert.c index 54e8be4a79..f5618592fe 100644 --- a/database/sqlite/sqlite_aclk_alert.c +++ b/database/sqlite/sqlite_aclk_alert.c @@ -8,9 +8,120 @@ #include "../../aclk/aclk.h" #endif +time_t removed_when(uint32_t alarm_id, uint32_t before_unique_id, uint32_t after_unique_id, char *uuid_str) { + sqlite3_stmt *res = NULL; + int rc = 0; + time_t when = 0; + char sql[ACLK_SYNC_QUERY_SIZE]; + + snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "select when_key from health_log_%s where alarm_id = %u " \ + "and unique_id > %u and unique_id < %u " \ + "and new_status = 
-2;", uuid_str, alarm_id, after_unique_id, before_unique_id); + + rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); + if (rc != SQLITE_OK) { + error_report("Failed to prepare statement when trying to find removed gap."); + return 0; + } + + rc = sqlite3_step(res); + if (likely(rc == SQLITE_ROW)) { + when = (time_t) sqlite3_column_int64(res, 0); + } + + rc = sqlite3_finalize(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to finalize statement when trying to find removed gap, rc = %d", rc); + + return when; +} + +#define MAX_REMOVED_PERIOD 900 +//decide if some events should be sent or not +int should_send_to_cloud(RRDHOST *host, ALARM_ENTRY *ae) +{ + sqlite3_stmt *res = NULL; + char uuid_str[GUID_LEN + 1]; + uuid_unparse_lower_fix(&host->host_uuid, uuid_str); + int send = 1, rc = 0; + + if (ae->new_status == RRDCALC_STATUS_REMOVED || ae->new_status == RRDCALC_STATUS_UNINITIALIZED) { + return 0; + } + + if (unlikely(uuid_is_null(ae->config_hash_id))) + return 0; + + char sql[ACLK_SYNC_QUERY_SIZE]; + uuid_t config_hash_id; + RRDCALC_STATUS status; + uint32_t unique_id; + + //get the previous sent event of this alarm_id + snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "select hl.new_status, hl.config_hash_id, hl.unique_id from health_log_%s hl, aclk_alert_%s aa \ + where hl.unique_id = aa.alert_unique_id \ + and hl.alarm_id = %u and hl.unique_id <> %u \ + order by alarm_event_id desc LIMIT 1;", uuid_str, uuid_str, ae->alarm_id, ae->unique_id); + + rc = sqlite3_prepare_v2(db_meta, sql, -1, &res, 0); + if (rc != SQLITE_OK) { + error_report("Failed to prepare statement when trying to filter alert events."); + send = 1; + return send; + } + + rc = sqlite3_step(res); + if (likely(rc == SQLITE_ROW)) { + status = (RRDCALC_STATUS) sqlite3_column_int(res, 0); + if (sqlite3_column_type(res, 1) != SQLITE_NULL) + uuid_copy(config_hash_id, *((uuid_t *) sqlite3_column_blob(res, 1))); + unique_id = (uint32_t) sqlite3_column_int64(res, 2); + + } else { + send = 1; + goto done; 
+ } + + if (ae->new_status != (RRDCALC_STATUS)status) { + send = 1; + goto done; + } + + if (uuid_compare(ae->config_hash_id, config_hash_id)) { + send = 1; + goto done; + } + + //same status, same config + if (ae->new_status == RRDCALC_STATUS_CLEAR) { + send = 0; + goto done; + } + + //detect a long off period of the agent, TODO make global + if (ae->new_status == RRDCALC_STATUS_WARNING || ae->new_status == RRDCALC_STATUS_CRITICAL) { + time_t when = removed_when(ae->alarm_id, ae->unique_id, unique_id, uuid_str); + + if (when && (when + (time_t)MAX_REMOVED_PERIOD) < ae->when) { + send = 1; + goto done; + } else { + send = 0; + goto done; + } + } + +done: + rc = sqlite3_finalize(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to finalize statement when trying to filter alert events, rc = %d", rc); + + return send; +} + // will replace call to aclk_update_alarm in health/health_log.c // and handle both cases -int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae) +int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter) { //check aclk architecture and handle old json alarm update to cloud //include also the valid statuses for this case @@ -30,17 +141,18 @@ int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae) if (!claimed()) return 0; - if (ae->flags & HEALTH_ENTRY_FLAG_ACLK_QUEUED) - return 0; - - if (ae->new_status == RRDCALC_STATUS_REMOVED || ae->new_status == RRDCALC_STATUS_UNINITIALIZED) - return 0; - if (unlikely(!host->dbsync_worker)) return 1; - if (unlikely(uuid_is_null(ae->config_hash_id))) + if (ae->flags & HEALTH_ENTRY_FLAG_ACLK_QUEUED) { return 0; + } + + if (!skip_filter) { + if (!should_send_to_cloud(host, ae)) { + return 0; + } + } int rc = 0; @@ -296,6 +408,22 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d return; } +void sql_queue_existing_alerts_to_aclk(RRDHOST *host) +{ + char uuid_str[GUID_LEN + 1]; + uuid_unparse_lower_fix(&host->host_uuid, uuid_str); + BUFFER *sql 
= buffer_create(1024); + + buffer_sprintf(sql,"insert into aclk_alert_%s (alert_unique_id, date_created) " \ + "select unique_id alert_unique_id, strftime('%%s') date_created from health_log_%s " \ + "where new_status <> 0 and new_status <> -2 and config_hash_id is not null and updated_by_id = 0 " \ + "order by unique_id asc on conflict (alert_unique_id) do nothing;", uuid_str, uuid_str); + + db_execute(buffer_tostring(sql)); + + buffer_free(sql); +} + void aclk_send_alarm_health_log(char *node_id) { if (unlikely(!node_id)) @@ -593,6 +721,9 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start log_access("ACLK STA [%s (N/A)]: Ignoring request to stream alert state changes, health is disabled.", node_id); return; } + + if (unlikely(batch_id == 1) && unlikely(start_seq_id == 1)) + sql_queue_existing_alerts_to_aclk(host); } else wc = (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id); @@ -644,6 +775,9 @@ void sql_queue_removed_alerts_to_aclk(RRDHOST *host) if (unlikely(!host->dbsync_worker)) return; + if (!claimed()) + return; + struct aclk_database_cmd cmd; memset(&cmd, 0, sizeof(cmd)); cmd.opcode = ACLK_DATABASE_QUEUE_REMOVED_ALERTS; diff --git a/database/sqlite/sqlite_aclk_alert.h b/database/sqlite/sqlite_aclk_alert.h index 957cb94ac8..0181b48420 100644 --- a/database/sqlite/sqlite_aclk_alert.h +++ b/database/sqlite/sqlite_aclk_alert.h @@ -26,5 +26,6 @@ void sql_process_queue_removed_alerts_to_aclk(struct aclk_database_worker_config void aclk_push_alert_snapshot_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); void aclk_process_send_alarm_snapshot(char *node_id, char *claim_id, uint64_t snapshot_id, uint64_t sequence_id); int get_proto_alert_status(RRDHOST *host, struct proto_alert_status *proto_alert_status); +extern int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae, int skip_filter); #endif //NETDATA_SQLITE_ACLK_ALERT_H diff --git a/database/sqlite/sqlite_aclk_chart.h 
b/database/sqlite/sqlite_aclk_chart.h index 1d25de24ed..b698c58275 100644 --- a/database/sqlite/sqlite_aclk_chart.h +++ b/database/sqlite/sqlite_aclk_chart.h @@ -39,7 +39,6 @@ struct aclk_chart_sync_stats { extern int queue_chart_to_aclk(RRDSET *st); extern int queue_dimension_to_aclk(RRDDIM *rd); extern void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id); -extern int sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae); int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); int aclk_send_chart_config(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); diff --git a/health/health.c b/health/health.c index 5ac88fec71..e81c933303 100644 --- a/health/health.c +++ b/health/health.c @@ -804,7 +804,7 @@ void *health_main(void *ptr) { rc->value = NAN; #if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL) if (netdata_cloud_setting && likely(!aclk_alert_reloaded)) - sql_queue_removed_alerts_to_aclk(host); + sql_queue_alarm_to_aclk(host, ae, 1); #endif } } diff --git a/health/health_log.c b/health/health_log.c index 4959aa179a..54f6dc9fc4 100644 --- a/health/health_log.c +++ b/health/health_log.c @@ -162,7 +162,7 @@ inline void health_alarm_log_save(RRDHOST *host, ALARM_ENTRY *ae) { #ifdef ENABLE_ACLK if (netdata_cloud_setting) { - sql_queue_alarm_to_aclk(host, ae); + sql_queue_alarm_to_aclk(host, ae, 0); } #endif } |