diff options
author | Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> | 2021-10-06 20:55:31 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-10-06 20:55:31 +0300 |
commit | 12f16063f51e51a8e3c0e0ae727a634258b95219 (patch) | |
tree | 24a7eae9d61f6f5009a9d6d1e1b5d7e2870d64ca /database | |
parent | af93cc31eda9a2b7058c4b02d8f984331e5f544b (diff) |
Enable additional functionality for the new cloud architecture (#11579)
Diffstat (limited to 'database')
-rw-r--r-- | database/engine/rrdengine.c | 1 | ||||
-rw-r--r-- | database/rrd.h | 5 | ||||
-rw-r--r-- | database/rrddim.c | 17 | ||||
-rw-r--r-- | database/rrdhost.c | 2 | ||||
-rw-r--r-- | database/rrdset.c | 32 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk.c | 144 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk.h | 16 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_alert.c | 112 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_chart.c | 119 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_node.c | 160 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_node.h | 1 | ||||
-rw-r--r-- | database/sqlite/sqlite_functions.c | 208 | ||||
-rw-r--r-- | database/sqlite/sqlite_functions.h | 5 | ||||
-rw-r--r-- | database/sqlite/sqlite_health.c | 15 |
14 files changed, 644 insertions, 193 deletions
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index 0c4a401cb4..f48181d29a 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -861,6 +861,7 @@ static void after_delete_old_data(struct rrdengine_worker_config* wc) wc->now_deleting_files = NULL; wc->cleanup_thread_deleting_files = 0; + aclk_data_rotated(); /* interrupt event loop */ uv_stop(wc->loop); diff --git a/database/rrd.h b/database/rrd.h index ba59eac16b..95c49eb21e 100644 --- a/database/rrd.h +++ b/database/rrd.h @@ -50,6 +50,7 @@ struct context_param { uint8_t flags; }; +#define RRDSET_MINIMUM_LIVE_COUNT 3 #define META_CHART_UPDATED 1 #define META_PLUGIN_UPDATED 2 #define META_MODULE_UPDATED 4 @@ -1359,4 +1360,8 @@ extern void set_host_properties( #endif #include "sqlite/sqlite_functions.h" #include "sqlite/sqlite_aclk.h" +#include "sqlite/sqlite_aclk_chart.h" +#include "sqlite/sqlite_aclk_alert.h" +#include "sqlite/sqlite_aclk_node.h" +#include "sqlite/sqlite_health.h" #endif /* NETDATA_RRD_H */ diff --git a/database/rrddim.c b/database/rrddim.c index 66650ffbf2..51c4428c72 100644 --- a/database/rrddim.c +++ b/database/rrddim.c @@ -210,7 +210,7 @@ void rrdcalc_link_to_rrddim(RRDDIM *rd, RRDSET *st, RRDHOST *host) { } } #ifdef ENABLE_ACLK - rrdset_flag_set(st, RRDSET_FLAG_ACLK); + rrdset_flag_clear(st, RRDSET_FLAG_ACLK); #endif } @@ -387,6 +387,9 @@ RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collecte rd->last_collected_time.tv_usec = 0; rd->rrdset = st; rd->state = mallocz(sizeof(*rd->state)); +#ifdef ENABLE_ACLK + rd->state->aclk_live_status = -1; +#endif (void) find_dimension_uuid(st, rd, &(rd->state->metric_uuid)); if(memory_mode == RRD_MEMORY_MODE_DBENGINE) { #ifdef ENABLE_DBENGINE @@ -453,7 +456,7 @@ RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collecte rrdset_unlock(st); #ifdef ENABLE_ACLK - rrdset_flag_set(st, RRDSET_FLAG_ACLK); + rrdset_flag_clear(st, RRDSET_FLAG_ACLK); #endif return(rd); } @@ -521,7 +524,7 @@ void rrddim_free_custom(RRDSET *st, RRDDIM *rd, int db_rotated) } #ifdef ENABLE_ACLK if (db_rotated || RRD_MEMORY_MODE_DBENGINE != rrd_memory_mode) - rrdset_flag_set(st, RRDSET_FLAG_ACLK); + rrdset_flag_clear(st, RRDSET_FLAG_ACLK); #endif } @@ -542,7 +545,7 @@ int rrddim_hide(RRDSET *st, const char *id) { rrddim_flag_set(rd, RRDDIM_FLAG_HIDDEN); #ifdef ENABLE_ACLK - rrdset_flag_set(st, RRDSET_FLAG_ACLK); + rrdset_flag_clear(st, RRDSET_FLAG_ACLK); #endif return 0; } @@ -559,7 +562,7 @@ int rrddim_unhide(RRDSET *st, const char *id) { rrddim_flag_clear(rd, RRDDIM_FLAG_HIDDEN); #ifdef ENABLE_ACLK - rrdset_flag_set(st, RRDSET_FLAG_ACLK); + rrdset_flag_clear(st, RRDSET_FLAG_ACLK); #endif return 0; } @@ -574,7 +577,7 @@ inline void rrddim_is_obsolete(RRDSET *st, RRDDIM *rd) { rrddim_flag_set(rd, RRDDIM_FLAG_OBSOLETE); rrdset_flag_set(st, RRDSET_FLAG_OBSOLETE_DIMENSIONS); #ifdef ENABLE_ACLK - rrdset_flag_set(st, RRDSET_FLAG_ACLK); + rrdset_flag_clear(st, RRDSET_FLAG_ACLK); #endif } @@ -583,7 +586,7 @@ inline void rrddim_isnot_obsolete(RRDSET *st __maybe_unused, RRDDIM *rd) { rrddim_flag_clear(rd, RRDDIM_FLAG_OBSOLETE); #ifdef ENABLE_ACLK - rrdset_flag_set(st, RRDSET_FLAG_ACLK); + rrdset_flag_clear(st, RRDSET_FLAG_ACLK); #endif } diff --git a/database/rrdhost.c b/database/rrdhost.c index 59e2c6d77c..f6b1834aa0 100644 --- a/database/rrdhost.c +++ b/database/rrdhost.c @@ -746,7 +746,9 @@ int rrd_init(char *hostname, struct rrdhost_system_info *system_info) { fatal("Failed to initialize dbengine"); } #endif +#ifdef ACLK_NEWARCH_DEVMODE sql_aclk_sync_init(); +#endif rrd_unlock(); web_client_api_v1_management_init(); diff --git a/database/rrdset.c b/database/rrdset.c index 0af189f77c..95e7eb0c2b 100644 --- a/database/rrdset.c +++ b/database/rrdset.c @@ -649,7 +649,7 @@ RRDSET *rrdset_create_custom( aclk_add_collector(host, st->plugin_name, st->module_name); } } - rrdset_flag_set(st, RRDSET_FLAG_ACLK); + rrdset_flag_clear(st, RRDSET_FLAG_ACLK); } #endif freez(old_plugin); @@ -935,12 +935,13 @@ RRDSET *rrdset_create_custom( update_chart_metadata(st->chart_uuid, st, id, name); store_active_chart(st->chart_uuid); + compute_chart_hash(st); rrdhost_unlock(host); #ifdef ENABLE_ACLK if (netdata_cloud_setting) aclk_add_collector(host, plugin, module); - rrdset_flag_set(st, RRDSET_FLAG_ACLK); + rrdset_flag_clear(st, RRDSET_FLAG_ACLK); #endif return(st); } @@ -1377,10 +1378,19 @@ void rrdset_done(RRDSET *st) { rrdset_rdlock(st); #ifdef ENABLE_ACLK - if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_ACLK))) { - rrdset_flag_clear(st, RRDSET_FLAG_ACLK); + #ifdef ENABLE_NEW_CLOUD_PROTOCOL + if (unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ACLK))) { + if (st->counter_done >= RRDSET_MINIMUM_LIVE_COUNT) { + if (likely(!sql_queue_chart_to_aclk(st))) + rrdset_flag_set(st, RRDSET_FLAG_ACLK); + } + } + #else + if (unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ACLK))) { + rrdset_flag_set(st, RRDSET_FLAG_ACLK); aclk_update_chart(st->rrdhost, st->id, 1); } + #endif #endif if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE))) { @@ -1783,9 +1793,23 @@ after_first_database_work: after_second_database_work: st->last_collected_total = st->collected_total; +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + time_t mark = now_realtime_sec(); +#endif rrddim_foreach_read(rd, st) { if (rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED)) continue; + +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + int live = ((mark - rd->last_collected_time.tv_sec) < (RRDSET_MINIMUM_LIVE_COUNT * rd->update_every)); + if (unlikely(live != rd->state->aclk_live_status)) { + if (likely(rrdset_flag_check(st, RRDSET_FLAG_ACLK))) { + if (likely(!sql_queue_dimension_to_aclk(rd))) { + rd->state->aclk_live_status = live; + } + } + } +#endif if(unlikely(!rd->updated)) continue; diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c index db4b4008e6..78c42678ec 100644 --- a/database/sqlite/sqlite_aclk.c +++ b/database/sqlite/sqlite_aclk.c @@ -3,17 +3,17 @@ #include "sqlite_functions.h" #include "sqlite_aclk.h" -// TODO: To be added #include "sqlite_aclk_chart.h" -//#include "sqlite_aclk_alert.h" #include "sqlite_aclk_node.h" +#ifdef ENABLE_NEW_CLOUD_PROTOCOL +#include "../../aclk/aclk.h" +#endif + const char *aclk_sync_config[] = { NULL, }; -int aclk_architecture = 0; - uv_mutex_t aclk_async_lock; struct aclk_database_worker_config *aclk_thread_head = NULL; @@ -78,24 +78,6 @@ void aclk_database_init_cmd_queue(struct aclk_database_worker_config *wc) fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex)); } -void aclk_database_enq_cmd_nowake(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd) -{ - unsigned queue_size; - - /* wait for free space in queue */ - uv_mutex_lock(&wc->cmd_mutex); - while ((queue_size = wc->queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) { - uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex); - } - fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE); - /* enqueue command */ - wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd; - wc->cmd_queue.tail = wc->cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ? - wc->cmd_queue.tail + 1 : 0; - wc->queue_size = queue_size + 1; - uv_mutex_unlock(&wc->cmd_mutex); -} - int aclk_database_enq_cmd_noblock(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd) { unsigned queue_size; @@ -207,7 +189,7 @@ int aclk_start_sync_thread(void *data, int argc, char **argv, char **column) void sql_aclk_sync_init(void) { -#ifdef ACLK_NEWARCH_DEVMODE +#ifdef ENABLE_NEW_CLOUD_PROTOCOL char *err_msg = NULL; int rc; @@ -251,35 +233,49 @@ static void async_cb(uv_async_t *handle) static void timer_cb(uv_timer_t* handle) { - struct aclk_database_worker_config *wc = handle->data; uv_stop(handle->loop); uv_update_time(handle->loop); +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + struct aclk_database_worker_config *wc = handle->data; struct aclk_database_cmd cmd; memset(&cmd, 0, sizeof(cmd)); cmd.opcode = ACLK_DATABASE_TIMER; aclk_database_enq_cmd_noblock(wc, &cmd); - if (wc->cleanup_after && wc->cleanup_after < now_realtime_sec()) { + time_t now = now_realtime_sec(); + + if (wc->cleanup_after && wc->cleanup_after < now) { cmd.opcode = ACLK_DATABASE_CLEANUP; if (!aclk_database_enq_cmd_noblock(wc, &cmd)) wc->cleanup_after += ACLK_DATABASE_CLEANUP_INTERVAL; } - if (wc->chart_updates && !wc->chart_pending) { - cmd.opcode = ACLK_DATABASE_PUSH_CHART; - cmd.count = ACLK_MAX_CHART_BATCH; - cmd.completion = NULL; - cmd.param1 = ACLK_MAX_CHART_BATCH_COUNT; - if (!aclk_database_enq_cmd_noblock(wc, &cmd)) - wc->chart_pending = 1; - } + if (aclk_use_new_cloud_arch && aclk_connected) { + if (wc->rotation_after && wc->rotation_after < now) { + cmd.opcode = ACLK_DATABASE_NODE_INFO; + aclk_database_enq_cmd_noblock(wc, &cmd); - if (wc->alert_updates) { - cmd.opcode = ACLK_DATABASE_PUSH_ALERT; - cmd.count = ACLK_MAX_ALERT_UPDATES; - aclk_database_enq_cmd_noblock(wc, &cmd); + cmd.opcode = ACLK_DATABASE_UPD_RETENTION; + if (!aclk_database_enq_cmd_noblock(wc, &cmd)) + wc->rotation_after += ACLK_DATABASE_ROTATION_INTERVAL; + } + + if (wc->chart_updates && !wc->chart_pending) { + cmd.opcode = ACLK_DATABASE_PUSH_CHART; + cmd.count = ACLK_MAX_CHART_BATCH; + cmd.param1 = ACLK_MAX_CHART_BATCH_COUNT; + if (!aclk_database_enq_cmd_noblock(wc, &cmd)) + wc->chart_pending = 1; + } + + if (wc->alert_updates) { + cmd.opcode = ACLK_DATABASE_PUSH_ALERT; + cmd.count = ACLK_MAX_ALERT_UPDATES; + aclk_database_enq_cmd_noblock(wc, &cmd); + } } +#endif } #define MAX_CMD_BATCH_SIZE (256) @@ -334,8 +330,14 @@ void aclk_database_worker(void *arg) wc->node_info_send = (wc->host && !localhost); aclk_add_worker_thread(wc); info("Starting ACLK sync thread for host %s -- scratch area %lu bytes", wc->host_guid, sizeof(*wc)); -// TODO: To be added -// sql_get_last_chart_sequence(wc, cmd); + + memset(&cmd, 0, sizeof(cmd)); + sql_get_last_chart_sequence(wc, cmd); + wc->chart_updates = 0; + wc->alert_updates = 0; + wc->startup_time = now_realtime_sec(); + wc->cleanup_after = wc->startup_time + ACLK_DATABASE_CLEANUP_FIRST; + wc->rotation_after = wc->startup_time + ACLK_DATABASE_ROTATION_DELAY; while (likely(shutdown == 0)) { uv_run(loop, UV_RUN_DEFAULT); @@ -345,11 +347,8 @@ void aclk_database_worker(void *arg) /* wait for commands */ cmd_batch_size = 0; do { - if (unlikely(cmd_batch_size >= MAX_CMD_BATCH_SIZE)) { - info("DEBUG: %s Processed %u commands, current queue about %u", - wc->uuid_str, cmd_batch_size, wc->queue_size); + if (unlikely(cmd_batch_size >= MAX_CMD_BATCH_SIZE)) break; - } cmd = aclk_database_deq_cmd(wc); opcode = cmd.opcode; ++cmd_batch_size; @@ -365,14 +364,6 @@ void aclk_database_worker(void *arg) if (wc->host == localhost) sql_check_aclk_table_list(wc); break; - case ACLK_DATABASE_CHECK: - debug(D_ACLK_SYNC, "Checking database dimensions for %s", wc->host_guid); -// sql_check_dimension_state(wc, cmd); - break; - case ACLK_DATABASE_CHECK_ROTATION: - debug(D_ACLK_SYNC, "Checking database for rotation %s", wc->host_guid); -// sql_check_rotation_state(wc, cmd); - break; case ACLK_DATABASE_DELETE_HOST: debug(D_ACLK_SYNC,"Cleaning ACLK tables for %s", (char *) cmd.data); sql_delete_aclk_table_list(wc, cmd); @@ -407,19 +398,19 @@ void aclk_database_worker(void *arg) // ALERTS case ACLK_DATABASE_ADD_ALERT: debug(D_ACLK_SYNC,"Adding alert event for %s", wc->host_guid); -// aclk_add_alert_event(wc, cmd); + aclk_add_alert_event(wc, cmd); break; case ACLK_DATABASE_PUSH_ALERT_CONFIG: debug(D_ACLK_SYNC,"Pushing chart config info to the cloud for %s", wc->host_guid); -// aclk_push_alert_config_event(wc, cmd); + aclk_push_alert_config_event(wc, cmd); break; case ACLK_DATABASE_PUSH_ALERT: debug(D_ACLK_SYNC, "Pushing alert info to the cloud for %s", wc->host_guid); -// aclk_push_alert_event(wc, cmd); + aclk_push_alert_event(wc, cmd); break; case ACLK_DATABASE_ALARM_HEALTH_LOG: debug(D_ACLK_SYNC, "Pushing alarm health log to the cloud for %s", wc->host_guid); -// aclk_push_alarm_health_log(wc, cmd); + aclk_push_alarm_health_log(wc, cmd); break; // NODE OPERATIONS @@ -427,8 +418,9 @@ void aclk_database_worker(void *arg) debug(D_ACLK_SYNC,"Sending node info for %s", wc->uuid_str); sql_build_node_info(wc, cmd); break; - case ACLK_DATABASE_UPD_STATS: -// sql_update_metric_statistics(wc, cmd); + case ACLK_DATABASE_UPD_RETENTION: + debug(D_ACLK_SYNC,"Sending retention info for %s", wc->uuid_str); + aclk_update_retention(wc, cmd); break; // NODE_INSTANCE DETECTION @@ -446,7 +438,7 @@ void aclk_database_worker(void *arg) } } } - if (wc->node_info_send && wc->host && localhost && claimed()) { + if (wc->node_info_send && wc->host && localhost && claimed() && aclk_connected) { cmd.opcode = ACLK_DATABASE_NODE_INFO; cmd.completion = NULL; wc->node_info_send = aclk_database_enq_cmd_noblock(wc, &cmd); @@ -471,7 +463,7 @@ void aclk_database_worker(void *arg) /* * uv_async_send after uv_close does not seem to crash in linux at the moment, - * it is however undocumented behaviour and we need to be aware if this becomes + * it is however undocumented behaviour we need to be aware if this becomes * an issue in the future. */ uv_close((uv_handle_t *)&wc->async, NULL); @@ -509,13 +501,9 @@ error_after_loop_init: // ------------------------------------------------------------- -void aclk_set_architecture(int mode) -{ - aclk_architecture = mode; -} - void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id) { +#ifdef ENABLE_ACLK char uuid_str[GUID_LEN + 1]; char host_guid[GUID_LEN + 1]; @@ -568,15 +556,17 @@ void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id) if (likely(host)) host->dbsync_worker = (void *) wc; wc->host = host; - wc->chart_updates = 0; - wc->alert_updates = 0; - wc->startup_time = now_realtime_sec(); - wc->cleanup_after = wc->startup_time + ACLK_DATABASE_CLEANUP_FIRST; strcpy(wc->uuid_str, uuid_str); strcpy(wc->host_guid, host_guid); if (node_id && !uuid_is_null(*node_id)) uuid_unparse_lower(*node_id, wc->node_id); fatal_assert(0 == uv_thread_create(&(wc->thread), aclk_database_worker, wc)); +#else + UNUSED(host); + UNUSED(host_uuid); + UNUSED(node_id); +#endif + return; } void sql_maint_aclk_sync_database(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) @@ -722,19 +712,20 @@ void sql_check_aclk_table_list(struct aclk_database_worker_config *wc) return; } -void aclk_data_rotated(RRDHOST *host) +void aclk_data_rotated(void) { - UNUSED(host); +#ifdef ENABLE_NEW_CLOUD_PROTOCOL - debug(D_ACLK_SYNC,"Processing data base rotation event"); - struct aclk_database_cmd cmd; - memset(&cmd, 0, sizeof(cmd)); - cmd.opcode = ACLK_DATABASE_UPD_STATS; + if (!aclk_use_new_cloud_arch || !aclk_connected) + return; + time_t next_rotation_time = now_realtime_sec()+ACLK_DATABASE_ROTATION_DELAY; rrd_wrlock(); RRDHOST *this_host = localhost; while (this_host) { - aclk_database_enq_cmd((struct aclk_database_worker_config *)this_host->dbsync_worker, &cmd); + struct aclk_database_worker_config *wc = this_host->dbsync_worker; + if (wc) + wc->rotation_after = next_rotation_time; this_host = this_host->next; } rrd_unlock(); @@ -743,9 +734,10 @@ void aclk_data_rotated(RRDHOST *host) uv_mutex_lock(&aclk_async_lock); while (tmp) { - aclk_database_enq_cmd(tmp, &cmd); + tmp->rotation_after = next_rotation_time; tmp = tmp->next; } uv_mutex_unlock(&aclk_async_lock); +#endif return; } diff --git a/database/sqlite/sqlite_aclk.h b/database/sqlite/sqlite_aclk.h index 02a4322ed0..91ed065419 100644 --- a/database/sqlite/sqlite_aclk.h +++ b/database/sqlite/sqlite_aclk.h @@ -15,9 +15,10 @@ #define ACLK_MAX_CHART_BATCH_COUNT (10) #endif #define ACLK_MAX_ALERT_UPDATES (5) -#define ACLK_SYNC_RETRY_COUNT "10" #define ACLK_DATABASE_CLEANUP_FIRST (60) +#define ACLK_DATABASE_ROTATION_DELAY (60) #define ACLK_DATABASE_CLEANUP_INTERVAL (3600) +#define ACLK_DATABASE_ROTATION_INTERVAL (3600) #define ACLK_DELETE_ACK_INTERNAL (600) #define ACLK_SYNC_QUERY_SIZE 512 @@ -60,8 +61,6 @@ static inline void aclk_complete(struct aclk_completion *p) extern uv_mutex_t aclk_async_lock; -extern int aclk_architecture; - static inline void uuid_unparse_lower_fix(uuid_t *uuid, char *out) { uuid_unparse_lower(*uuid, out); @@ -120,8 +119,6 @@ enum aclk_database_opcode { ACLK_DATABASE_ADD_DIMENSION, ACLK_DATABASE_ALARM_HEALTH_LOG, ACLK_DATABASE_CHART_ACK, - ACLK_DATABASE_CHECK, - ACLK_DATABASE_CHECK_ROTATION, ACLK_DATABASE_CLEANUP, ACLK_DATABASE_DELETE_HOST, ACLK_DATABASE_NODE_INFO, @@ -130,11 +127,9 @@ enum aclk_database_opcode { ACLK_DATABASE_PUSH_CHART, ACLK_DATABASE_PUSH_CHART_CONFIG, ACLK_DATABASE_RESET_CHART, - ACLK_DATABASE_RESET_NODE, ACLK_DATABASE_SHUTDOWN, ACLK_DATABASE_TIMER, - ACLK_DATABASE_UPD_STATS, - ACLK_DATABASE_MAX_OPCODE + ACLK_DATABASE_UPD_RETENTION }; struct aclk_chart_payload_t { @@ -170,6 +165,7 @@ struct aclk_database_worker_config { time_t chart_timestamp; // last chart timestamp time_t cleanup_after; // Start a cleanup after this timestamp time_t startup_time; // When the sync thread started + time_t rotation_after; uint64_t batch_id; // batch id to use uint64_t alerts_batch_id; // batch id for alerts to use uint64_t alerts_start_seq_id; // cloud has asked to start streaming from @@ -215,13 +211,11 @@ extern sqlite3 *db_meta; extern int aclk_database_enq_cmd_noblock(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd); extern void aclk_database_enq_cmd(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd); -extern void aclk_set_architecture(int mode); extern void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id); int aclk_worker_enq_cmd(char *node_id, struct aclk_database_cmd *cmd); -void aclk_data_rotated(RRDHOST *host); +void aclk_data_rotated(void); void sql_aclk_sync_init(void); void sql_check_aclk_table_list(struct aclk_database_worker_config *wc); void sql_delete_aclk_table_list(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); -void sql_drop_host_aclk_table_list(uuid_t *host_uuid); void sql_maint_aclk_sync_database(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); #endif //NETDATA_SQLITE_ACLK_H diff --git a/database/sqlite/sqlite_aclk_alert.c b/database/sqlite/sqlite_aclk_alert.c index ce957c7823..846d9bdf2d 100644 --- a/database/sqlite/sqlite_aclk_alert.c +++ b/database/sqlite/sqlite_aclk_alert.c @@ -3,7 +3,10 @@ #include "sqlite_functions.h" #include "sqlite_aclk_alert.h" +#ifdef ENABLE_ACLK #include "../../aclk/aclk_alarm_api.h" +#include "../../aclk/aclk.h" +#endif // will replace call to aclk_update_alarm in health/health_log.c // and handle both cases @@ -11,8 +14,14 @@ void sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae) { //check aclk architecture and handle old json alarm update to cloud //include also the valid statuses for this case - /* if (!aclk_architecture) - aclk_update_alarm(host, ae); */ +#ifdef ENABLE_ACLK + if (!aclk_use_new_cloud_arch) { + if ((ae->new_status == RRDCALC_STATUS_WARNING || ae->new_status == RRDCALC_STATUS_CRITICAL) || + ((ae->old_status == RRDCALC_STATUS_WARNING || ae->old_status == RRDCALC_STATUS_CRITICAL))) { + aclk_update_alarm(host, ae); + } + return; + } if (ae->flags & HEALTH_ENTRY_FLAG_ACLK_QUEUED) return; @@ -33,6 +42,10 @@ void sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae) cmd.completion = NULL; aclk_database_enq_cmd((struct aclk_database_worker_config *) host->dbsync_worker, &cmd); ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED; +#else + UNUSED(host); + UNUSED(ae); +#endif return; } @@ -79,6 +92,7 @@ bind_fail: int rrdcalc_status_to_proto_enum(RRDCALC_STATUS status) { +#ifdef ENABLE_ACLK switch(status) { case RRDCALC_STATUS_REMOVED: return ALARM_STATUS_REMOVED; @@ -98,6 +112,10 @@ int rrdcalc_status_to_proto_enum(RRDCALC_STATUS status) default: return ALARM_STATUS_UNKNOWN; } +#else + UNUSED(status); + return 1; +#endif } void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) @@ -122,8 +140,8 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d if (wc->alerts_start_seq_id != 0) { buffer_sprintf( sql, - "UPDATE aclk_alert_%s SET date_submitted = NULL, date_cloud_ack = NULL WHERE sequence_id >= %" PRIu64 - "; UPDATE aclk_alert_%s SET date_cloud_ack = strftime('%%s','now') WHERE sequence_id < %" PRIu64 + "UPDATE aclk_alert_%s SET date_submitted = NULL, date_cloud_ack = NULL WHERE sequence_id >= %"PRIu64 + "; UPDATE aclk_alert_%s SET date_cloud_ack = strftime('%%s','now') WHERE sequence_id < %"PRIu64 " and date_cloud_ack is null", wc->uuid_str, wc->alerts_start_seq_id, @@ -163,7 +181,7 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d char old_value_string[100 + 1]; char new_value_string[100 + 1]; - alarm_log.node_id = strdupz(wc->node_id); + alarm_log.node_id = wc->node_id; alarm_log.claim_id = claim_id; alarm_log.chart = strdupz((char *)sqlite3_column_text(res, 12)); @@ -179,10 +197,13 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d alarm_log.utc_offset = wc->host->utc_offset; alarm_log.timezone = strdupz((char *)wc->host->abbrev_timezone); - alarm_log.exec_path = sqlite3_column_bytes(res, 14) > 0 ? strdupz((char *)sqlite3_column_text(res, 14)) : strdupz((char *)wc->host->health_default_exec); + alarm_log.exec_path = sqlite3_column_bytes(res, 14) > 0 ? strdupz((char *)sqlite3_column_text(res, 14)) : + strdupz((char *)wc->host->health_default_exec); alarm_log.conf_source = strdupz((char *)sqlite3_column_text(res, 16)); - char *edit_command = sqlite3_column_bytes(res, 16) > 0 ? health_edit_command_from_source((char *)sqlite3_column_text(res, 16)) : strdupz("UNKNOWN=0"); + char *edit_command = sqlite3_column_bytes(res, 16) > 0 ? + health_edit_command_from_source((char *)sqlite3_column_text(res, 16)) : + strdupz("UNKNOWN=0"); alarm_log.command = strdupz(edit_command); alarm_log.duration = (time_t) sqlite3_column_int64(res, 6); @@ -193,10 +214,23 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d alarm_log.delay_up_to_timestamp = (time_t) sqlite3_column_int64(res, 10); alarm_log.last_repeat = (time_t) sqlite3_column_int64(res, 25); - alarm_log.silenced = ( (sqlite3_column_int64(res, 8) & HEALTH_ENTRY_FLAG_SILENCED) || ( sqlite3_column_type(res, 15) != SQLITE_NULL && !strncmp((char *)sqlite3_column_text(res,15), "silent", 6)) ) ? 1 : 0; + alarm_log.silenced = ((sqlite3_column_int64(res, 8) & HEALTH_ENTRY_FLAG_SILENCED) || + (sqlite3_column_type(res, 15) != SQLITE_NULL && + !strncmp((char *)sqlite3_column_text(res, 15), "silent", 6))) ? + 1 : + 0; + + alarm_log.value_string = + sqlite3_column_type(res, 23) == SQLITE_NULL ? + strdupz((char *)"-") : + strdupz((char *)format_value_and_unit( + new_value_string, 100, sqlite3_column_double(res, 23), (char *)sqlite3_column_text(res, 17), -1)); - alarm_log.value_string = sqlite3_column_type(res, 23) == SQLITE_NULL ? strdupz((char *)"-") : strdupz((char *)format_value_and_unit(new_value_string, 100, sqlite3_column_double(res, 23), (char *) sqlite3_column_text(res, 17), -1)); - alarm_log.old_value_string = sqlite3_column_type(res, 24) == SQLITE_NULL ? strdupz((char *)"-") : strdupz((char *)format_value_and_unit(old_value_string, 100, sqlite3_column_double(res, 24), (char *) sqlite3_column_text(res, 17), -1)); + alarm_log.old_value_string = + sqlite3_column_type(res, 24) == SQLITE_NULL ? + strdupz((char *)"-") : + strdupz((char *)format_value_and_unit( + old_value_string, 100, sqlite3_column_double(res, 24), (char *)sqlite3_column_text(res, 17), -1)); alarm_log.value = (calculated_number) sqlite3_column_double(res, 23); alarm_log.old_value = (calculated_number) sqlite3_column_double(res, 24); @@ -204,7 +238,6 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d alarm_log.updated = (sqlite3_column_int64(res, 8) & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0; alarm_log.rendered_info = strdupz((char *)sqlite3_column_text(res, 18)); - info("DEBUG: %s pushing alert seq %" PRIu64 " - %" PRIu64"", wc->uuid_str, (uint64_t) sqlite3_column_int64(res, 0), (uint64_t) sqlite3_column_int64(res, 1)); aclk_send_alarm_log_entry(&alarm_log); if (first_sequence_id == 0) @@ -214,12 +247,14 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d destroy_alarm_log_entry(&alarm_log); freez(edit_command); } - buffer_flush(sql); - buffer_sprintf(sql, "UPDATE aclk_alert_%s SET date_submitted=strftime('%%s') " - "WHERE date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64 ";", - wc->uuid_str, first_sequence_id, last_sequence_id); - db_execute(buffer_tostring(sql)); + if (first_sequence_id) { + buffer_flush(sql); + buffer_sprintf(sql, "UPDATE aclk_alert_%s SET date_submitted=strftime('%%s') " + "WHERE date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64 ";", + wc->uuid_str, first_sequence_id, last_sequence_id); + db_execute(buffer_tostring(sql)); + } rc = sqlite3_finalize(res); if (unlikely(rc != SQLITE_OK)) @@ -278,12 +313,8 @@ void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct a sqlite3_stmt *res = NULL; //TODO: make this better: include info from health log too - buffer_sprintf(sql, "select aa.sequence_id, aa.date_created, \ - (select laa.sequence_id from aclk_alert_%s laa \ - order by laa.sequence_id desc limit 1), \ - (select laa.date_created from aclk_alert_%s laa \ - order by laa.sequence_id desc limit 1) \ - from aclk_alert_%s aa order by aa.sequence_id asc limit 1;", wc->uuid_str, wc->uuid_str, wc->uuid_str); + buffer_sprintf(sql, "SELECT MIN(sequence_id), MIN(date_created), MAX(sequence_id), MAX(date_created) " \ + "FROM aclk_alert_%s;", wc->uuid_str); rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0); if (rc != SQLITE_OK) { @@ -318,7 +349,7 @@ void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct a struct alarm_log_health alarm_log; alarm_log.claim_id = claim_id; - alarm_log.node_id = strdupz(wc->node_id); + alarm_log.node_id = wc->node_id; alarm_log.log_entries = log_entries; alarm_log.status = wc->alert_updates == 0 ? 2 : 1; @@ -330,7 +361,6 @@ void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct a if (unlikely(rc != SQLITE_OK)) error_report("Failed to reset statement to get health log statistics from the database, rc = %d", rc); - freez((char *)alarm_log.node_id); freez(claim_id); buffer_free(sql); #endif @@ -359,6 +389,11 @@ void aclk_send_alarm_configuration(char *config_hash) return; } +#define SQL_SELECT_ALERT_CONFIG "SELECT alarm, template, on_key, class, type, component, os, hosts, plugin," \ + "module, charts, families, lookup, every, units, green, red, calc, warn, crit, to_key, exec, delay, repeat, info," \ + "options, host_labels, p_db_lookup_dimensions, p_db_lookup_method, p_db_lookup_options, p_db_lookup_after," \ + "p_db_lookup_before, p_update_every FROM alert_hash WHERE hash_id = @hash_id;" + int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) { UNUSED(wc); @@ -372,20 +407,16 @@ int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct sqlite3_stmt *res = NULL; char *config_hash = (char *) cmd.data_param; - BUFFER *sql = buffer_create(1024); - buffer_sprintf( - sql, - "SELECT alarm, template, on_key, class, type, component, os, hosts, plugin, module, charts, families, lookup, every, units, green, red, calc, warn, crit, to_key, exec, delay, repeat, info, options, host_labels, p_db_lookup_dimensions, p_db_lookup_method, p_db_lookup_options, p_db_lookup_after, p_db_lookup_before, p_update_every FROM alert_hash WHERE hash_id = @hash_id;"); - rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0); + rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_ALERT_CONFIG, -1, &res, 0);< |