summaryrefslogtreecommitdiffstats
path: root/database
diff options
context:
space:
mode:
authorStelios Fragkakis <52996999+stelfrag@users.noreply.github.com>2021-10-06 20:55:31 +0300
committerGitHub <noreply@github.com>2021-10-06 20:55:31 +0300
commit12f16063f51e51a8e3c0e0ae727a634258b95219 (patch)
tree24a7eae9d61f6f5009a9d6d1e1b5d7e2870d64ca /database
parentaf93cc31eda9a2b7058c4b02d8f984331e5f544b (diff)
Enable additional functionality for the new cloud architecture (#11579)
Diffstat (limited to 'database')
-rw-r--r--database/engine/rrdengine.c1
-rw-r--r--database/rrd.h5
-rw-r--r--database/rrddim.c17
-rw-r--r--database/rrdhost.c2
-rw-r--r--database/rrdset.c32
-rw-r--r--database/sqlite/sqlite_aclk.c144
-rw-r--r--database/sqlite/sqlite_aclk.h16
-rw-r--r--database/sqlite/sqlite_aclk_alert.c112
-rw-r--r--database/sqlite/sqlite_aclk_chart.c119
-rw-r--r--database/sqlite/sqlite_aclk_node.c160
-rw-r--r--database/sqlite/sqlite_aclk_node.h1
-rw-r--r--database/sqlite/sqlite_functions.c208
-rw-r--r--database/sqlite/sqlite_functions.h5
-rw-r--r--database/sqlite/sqlite_health.c15
14 files changed, 644 insertions, 193 deletions
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index 0c4a401cb4..f48181d29a 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -861,6 +861,7 @@ static void after_delete_old_data(struct rrdengine_worker_config* wc)
wc->now_deleting_files = NULL;
wc->cleanup_thread_deleting_files = 0;
+ aclk_data_rotated();
/* interrupt event loop */
uv_stop(wc->loop);
diff --git a/database/rrd.h b/database/rrd.h
index ba59eac16b..95c49eb21e 100644
--- a/database/rrd.h
+++ b/database/rrd.h
@@ -50,6 +50,7 @@ struct context_param {
uint8_t flags;
};
+#define RRDSET_MINIMUM_LIVE_COUNT 3
#define META_CHART_UPDATED 1
#define META_PLUGIN_UPDATED 2
#define META_MODULE_UPDATED 4
@@ -1359,4 +1360,8 @@ extern void set_host_properties(
#endif
#include "sqlite/sqlite_functions.h"
#include "sqlite/sqlite_aclk.h"
+#include "sqlite/sqlite_aclk_chart.h"
+#include "sqlite/sqlite_aclk_alert.h"
+#include "sqlite/sqlite_aclk_node.h"
+#include "sqlite/sqlite_health.h"
#endif /* NETDATA_RRD_H */
diff --git a/database/rrddim.c b/database/rrddim.c
index 66650ffbf2..51c4428c72 100644
--- a/database/rrddim.c
+++ b/database/rrddim.c
@@ -210,7 +210,7 @@ void rrdcalc_link_to_rrddim(RRDDIM *rd, RRDSET *st, RRDHOST *host) {
}
}
#ifdef ENABLE_ACLK
- rrdset_flag_set(st, RRDSET_FLAG_ACLK);
+ rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
#endif
}
@@ -387,6 +387,9 @@ RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collecte
rd->last_collected_time.tv_usec = 0;
rd->rrdset = st;
rd->state = mallocz(sizeof(*rd->state));
+#ifdef ENABLE_ACLK
+ rd->state->aclk_live_status = -1;
+#endif
(void) find_dimension_uuid(st, rd, &(rd->state->metric_uuid));
if(memory_mode == RRD_MEMORY_MODE_DBENGINE) {
#ifdef ENABLE_DBENGINE
@@ -453,7 +456,7 @@ RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collecte
rrdset_unlock(st);
#ifdef ENABLE_ACLK
- rrdset_flag_set(st, RRDSET_FLAG_ACLK);
+ rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
#endif
return(rd);
}
@@ -521,7 +524,7 @@ void rrddim_free_custom(RRDSET *st, RRDDIM *rd, int db_rotated)
}
#ifdef ENABLE_ACLK
if (db_rotated || RRD_MEMORY_MODE_DBENGINE != rrd_memory_mode)
- rrdset_flag_set(st, RRDSET_FLAG_ACLK);
+ rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
#endif
}
@@ -542,7 +545,7 @@ int rrddim_hide(RRDSET *st, const char *id) {
rrddim_flag_set(rd, RRDDIM_FLAG_HIDDEN);
#ifdef ENABLE_ACLK
- rrdset_flag_set(st, RRDSET_FLAG_ACLK);
+ rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
#endif
return 0;
}
@@ -559,7 +562,7 @@ int rrddim_unhide(RRDSET *st, const char *id) {
rrddim_flag_clear(rd, RRDDIM_FLAG_HIDDEN);
#ifdef ENABLE_ACLK
- rrdset_flag_set(st, RRDSET_FLAG_ACLK);
+ rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
#endif
return 0;
}
@@ -574,7 +577,7 @@ inline void rrddim_is_obsolete(RRDSET *st, RRDDIM *rd) {
rrddim_flag_set(rd, RRDDIM_FLAG_OBSOLETE);
rrdset_flag_set(st, RRDSET_FLAG_OBSOLETE_DIMENSIONS);
#ifdef ENABLE_ACLK
- rrdset_flag_set(st, RRDSET_FLAG_ACLK);
+ rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
#endif
}
@@ -583,7 +586,7 @@ inline void rrddim_isnot_obsolete(RRDSET *st __maybe_unused, RRDDIM *rd) {
rrddim_flag_clear(rd, RRDDIM_FLAG_OBSOLETE);
#ifdef ENABLE_ACLK
- rrdset_flag_set(st, RRDSET_FLAG_ACLK);
+ rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
#endif
}
diff --git a/database/rrdhost.c b/database/rrdhost.c
index 59e2c6d77c..f6b1834aa0 100644
--- a/database/rrdhost.c
+++ b/database/rrdhost.c
@@ -746,7 +746,9 @@ int rrd_init(char *hostname, struct rrdhost_system_info *system_info) {
fatal("Failed to initialize dbengine");
}
#endif
+#ifdef ACLK_NEWARCH_DEVMODE
sql_aclk_sync_init();
+#endif
rrd_unlock();
web_client_api_v1_management_init();
diff --git a/database/rrdset.c b/database/rrdset.c
index 0af189f77c..95e7eb0c2b 100644
--- a/database/rrdset.c
+++ b/database/rrdset.c
@@ -649,7 +649,7 @@ RRDSET *rrdset_create_custom(
aclk_add_collector(host, st->plugin_name, st->module_name);
}
}
- rrdset_flag_set(st, RRDSET_FLAG_ACLK);
+ rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
}
#endif
freez(old_plugin);
@@ -935,12 +935,13 @@ RRDSET *rrdset_create_custom(
update_chart_metadata(st->chart_uuid, st, id, name);
store_active_chart(st->chart_uuid);
+ compute_chart_hash(st);
rrdhost_unlock(host);
#ifdef ENABLE_ACLK
if (netdata_cloud_setting)
aclk_add_collector(host, plugin, module);
- rrdset_flag_set(st, RRDSET_FLAG_ACLK);
+ rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
#endif
return(st);
}
@@ -1377,10 +1378,19 @@ void rrdset_done(RRDSET *st) {
rrdset_rdlock(st);
#ifdef ENABLE_ACLK
- if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_ACLK))) {
- rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
+ #ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ if (unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ACLK))) {
+ if (st->counter_done >= RRDSET_MINIMUM_LIVE_COUNT) {
+ if (likely(!sql_queue_chart_to_aclk(st)))
+ rrdset_flag_set(st, RRDSET_FLAG_ACLK);
+ }
+ }
+ #else
+ if (unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ACLK))) {
+ rrdset_flag_set(st, RRDSET_FLAG_ACLK);
aclk_update_chart(st->rrdhost, st->id, 1);
}
+ #endif
#endif
if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE))) {
@@ -1783,9 +1793,23 @@ after_first_database_work:
after_second_database_work:
st->last_collected_total = st->collected_total;
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ time_t mark = now_realtime_sec();
+#endif
rrddim_foreach_read(rd, st) {
if (rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED))
continue;
+
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ int live = ((mark - rd->last_collected_time.tv_sec) < (RRDSET_MINIMUM_LIVE_COUNT * rd->update_every));
+ if (unlikely(live != rd->state->aclk_live_status)) {
+ if (likely(rrdset_flag_check(st, RRDSET_FLAG_ACLK))) {
+ if (likely(!sql_queue_dimension_to_aclk(rd))) {
+ rd->state->aclk_live_status = live;
+ }
+ }
+ }
+#endif
if(unlikely(!rd->updated))
continue;
diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c
index db4b4008e6..78c42678ec 100644
--- a/database/sqlite/sqlite_aclk.c
+++ b/database/sqlite/sqlite_aclk.c
@@ -3,17 +3,17 @@
#include "sqlite_functions.h"
#include "sqlite_aclk.h"
-// TODO: To be added
#include "sqlite_aclk_chart.h"
-//#include "sqlite_aclk_alert.h"
#include "sqlite_aclk_node.h"
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+#include "../../aclk/aclk.h"
+#endif
+
const char *aclk_sync_config[] = {
NULL,
};
-int aclk_architecture = 0;
-
uv_mutex_t aclk_async_lock;
struct aclk_database_worker_config *aclk_thread_head = NULL;
@@ -78,24 +78,6 @@ void aclk_database_init_cmd_queue(struct aclk_database_worker_config *wc)
fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex));
}
-void aclk_database_enq_cmd_nowake(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd)
-{
- unsigned queue_size;
-
- /* wait for free space in queue */
- uv_mutex_lock(&wc->cmd_mutex);
- while ((queue_size = wc->queue_size) == ACLK_DATABASE_CMD_Q_MAX_SIZE) {
- uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex);
- }
- fatal_assert(queue_size < ACLK_DATABASE_CMD_Q_MAX_SIZE);
- /* enqueue command */
- wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd;
- wc->cmd_queue.tail = wc->cmd_queue.tail != ACLK_DATABASE_CMD_Q_MAX_SIZE - 1 ?
- wc->cmd_queue.tail + 1 : 0;
- wc->queue_size = queue_size + 1;
- uv_mutex_unlock(&wc->cmd_mutex);
-}
-
int aclk_database_enq_cmd_noblock(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd)
{
unsigned queue_size;
@@ -207,7 +189,7 @@ int aclk_start_sync_thread(void *data, int argc, char **argv, char **column)
void sql_aclk_sync_init(void)
{
-#ifdef ACLK_NEWARCH_DEVMODE
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
char *err_msg = NULL;
int rc;
@@ -251,35 +233,49 @@ static void async_cb(uv_async_t *handle)
static void timer_cb(uv_timer_t* handle)
{
- struct aclk_database_worker_config *wc = handle->data;
uv_stop(handle->loop);
uv_update_time(handle->loop);
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ struct aclk_database_worker_config *wc = handle->data;
struct aclk_database_cmd cmd;
memset(&cmd, 0, sizeof(cmd));
cmd.opcode = ACLK_DATABASE_TIMER;
aclk_database_enq_cmd_noblock(wc, &cmd);
- if (wc->cleanup_after && wc->cleanup_after < now_realtime_sec()) {
+ time_t now = now_realtime_sec();
+
+ if (wc->cleanup_after && wc->cleanup_after < now) {
cmd.opcode = ACLK_DATABASE_CLEANUP;
if (!aclk_database_enq_cmd_noblock(wc, &cmd))
wc->cleanup_after += ACLK_DATABASE_CLEANUP_INTERVAL;
}
- if (wc->chart_updates && !wc->chart_pending) {
- cmd.opcode = ACLK_DATABASE_PUSH_CHART;
- cmd.count = ACLK_MAX_CHART_BATCH;
- cmd.completion = NULL;
- cmd.param1 = ACLK_MAX_CHART_BATCH_COUNT;
- if (!aclk_database_enq_cmd_noblock(wc, &cmd))
- wc->chart_pending = 1;
- }
+ if (aclk_use_new_cloud_arch && aclk_connected) {
+ if (wc->rotation_after && wc->rotation_after < now) {
+ cmd.opcode = ACLK_DATABASE_NODE_INFO;
+ aclk_database_enq_cmd_noblock(wc, &cmd);
- if (wc->alert_updates) {
- cmd.opcode = ACLK_DATABASE_PUSH_ALERT;
- cmd.count = ACLK_MAX_ALERT_UPDATES;
- aclk_database_enq_cmd_noblock(wc, &cmd);
+ cmd.opcode = ACLK_DATABASE_UPD_RETENTION;
+ if (!aclk_database_enq_cmd_noblock(wc, &cmd))
+ wc->rotation_after += ACLK_DATABASE_ROTATION_INTERVAL;
+ }
+
+ if (wc->chart_updates && !wc->chart_pending) {
+ cmd.opcode = ACLK_DATABASE_PUSH_CHART;
+ cmd.count = ACLK_MAX_CHART_BATCH;
+ cmd.param1 = ACLK_MAX_CHART_BATCH_COUNT;
+ if (!aclk_database_enq_cmd_noblock(wc, &cmd))
+ wc->chart_pending = 1;
+ }
+
+ if (wc->alert_updates) {
+ cmd.opcode = ACLK_DATABASE_PUSH_ALERT;
+ cmd.count = ACLK_MAX_ALERT_UPDATES;
+ aclk_database_enq_cmd_noblock(wc, &cmd);
+ }
}
+#endif
}
#define MAX_CMD_BATCH_SIZE (256)
@@ -334,8 +330,14 @@ void aclk_database_worker(void *arg)
wc->node_info_send = (wc->host && !localhost);
aclk_add_worker_thread(wc);
info("Starting ACLK sync thread for host %s -- scratch area %lu bytes", wc->host_guid, sizeof(*wc));
-// TODO: To be added
-// sql_get_last_chart_sequence(wc, cmd);
+
+ memset(&cmd, 0, sizeof(cmd));
+ sql_get_last_chart_sequence(wc, cmd);
+ wc->chart_updates = 0;
+ wc->alert_updates = 0;
+ wc->startup_time = now_realtime_sec();
+ wc->cleanup_after = wc->startup_time + ACLK_DATABASE_CLEANUP_FIRST;
+ wc->rotation_after = wc->startup_time + ACLK_DATABASE_ROTATION_DELAY;
while (likely(shutdown == 0)) {
uv_run(loop, UV_RUN_DEFAULT);
@@ -345,11 +347,8 @@ void aclk_database_worker(void *arg)
/* wait for commands */
cmd_batch_size = 0;
do {
- if (unlikely(cmd_batch_size >= MAX_CMD_BATCH_SIZE)) {
- info("DEBUG: %s Processed %u commands, current queue about %u",
- wc->uuid_str, cmd_batch_size, wc->queue_size);
+ if (unlikely(cmd_batch_size >= MAX_CMD_BATCH_SIZE))
break;
- }
cmd = aclk_database_deq_cmd(wc);
opcode = cmd.opcode;
++cmd_batch_size;
@@ -365,14 +364,6 @@ void aclk_database_worker(void *arg)
if (wc->host == localhost)
sql_check_aclk_table_list(wc);
break;
- case ACLK_DATABASE_CHECK:
- debug(D_ACLK_SYNC, "Checking database dimensions for %s", wc->host_guid);
-// sql_check_dimension_state(wc, cmd);
- break;
- case ACLK_DATABASE_CHECK_ROTATION:
- debug(D_ACLK_SYNC, "Checking database for rotation %s", wc->host_guid);
-// sql_check_rotation_state(wc, cmd);
- break;
case ACLK_DATABASE_DELETE_HOST:
debug(D_ACLK_SYNC,"Cleaning ACLK tables for %s", (char *) cmd.data);
sql_delete_aclk_table_list(wc, cmd);
@@ -407,19 +398,19 @@ void aclk_database_worker(void *arg)
// ALERTS
case ACLK_DATABASE_ADD_ALERT:
debug(D_ACLK_SYNC,"Adding alert event for %s", wc->host_guid);
-// aclk_add_alert_event(wc, cmd);
+ aclk_add_alert_event(wc, cmd);
break;
case ACLK_DATABASE_PUSH_ALERT_CONFIG:
debug(D_ACLK_SYNC,"Pushing chart config info to the cloud for %s", wc->host_guid);
-// aclk_push_alert_config_event(wc, cmd);
+ aclk_push_alert_config_event(wc, cmd);
break;
case ACLK_DATABASE_PUSH_ALERT:
debug(D_ACLK_SYNC, "Pushing alert info to the cloud for %s", wc->host_guid);
-// aclk_push_alert_event(wc, cmd);
+ aclk_push_alert_event(wc, cmd);
break;
case ACLK_DATABASE_ALARM_HEALTH_LOG:
debug(D_ACLK_SYNC, "Pushing alarm health log to the cloud for %s", wc->host_guid);
-// aclk_push_alarm_health_log(wc, cmd);
+ aclk_push_alarm_health_log(wc, cmd);
break;
// NODE OPERATIONS
@@ -427,8 +418,9 @@ void aclk_database_worker(void *arg)
debug(D_ACLK_SYNC,"Sending node info for %s", wc->uuid_str);
sql_build_node_info(wc, cmd);
break;
- case ACLK_DATABASE_UPD_STATS:
-// sql_update_metric_statistics(wc, cmd);
+ case ACLK_DATABASE_UPD_RETENTION:
+ debug(D_ACLK_SYNC,"Sending retention info for %s", wc->uuid_str);
+ aclk_update_retention(wc, cmd);
break;
// NODE_INSTANCE DETECTION
@@ -446,7 +438,7 @@ void aclk_database_worker(void *arg)
}
}
}
- if (wc->node_info_send && wc->host && localhost && claimed()) {
+ if (wc->node_info_send && wc->host && localhost && claimed() && aclk_connected) {
cmd.opcode = ACLK_DATABASE_NODE_INFO;
cmd.completion = NULL;
wc->node_info_send = aclk_database_enq_cmd_noblock(wc, &cmd);
@@ -471,7 +463,7 @@ void aclk_database_worker(void *arg)
/*
* uv_async_send after uv_close does not seem to crash in linux at the moment,
- * it is however undocumented behaviour and we need to be aware if this becomes
+ * it is however undocumented behaviour we need to be aware if this becomes
* an issue in the future.
*/
uv_close((uv_handle_t *)&wc->async, NULL);
@@ -509,13 +501,9 @@ error_after_loop_init:
// -------------------------------------------------------------
-void aclk_set_architecture(int mode)
-{
- aclk_architecture = mode;
-}
-
void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id)
{
+#ifdef ENABLE_ACLK
char uuid_str[GUID_LEN + 1];
char host_guid[GUID_LEN + 1];
@@ -568,15 +556,17 @@ void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id)
if (likely(host))
host->dbsync_worker = (void *) wc;
wc->host = host;
- wc->chart_updates = 0;
- wc->alert_updates = 0;
- wc->startup_time = now_realtime_sec();
- wc->cleanup_after = wc->startup_time + ACLK_DATABASE_CLEANUP_FIRST;
strcpy(wc->uuid_str, uuid_str);
strcpy(wc->host_guid, host_guid);
if (node_id && !uuid_is_null(*node_id))
uuid_unparse_lower(*node_id, wc->node_id);
fatal_assert(0 == uv_thread_create(&(wc->thread), aclk_database_worker, wc));
+#else
+ UNUSED(host);
+ UNUSED(host_uuid);
+ UNUSED(node_id);
+#endif
+ return;
}
void sql_maint_aclk_sync_database(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
@@ -722,19 +712,20 @@ void sql_check_aclk_table_list(struct aclk_database_worker_config *wc)
return;
}
-void aclk_data_rotated(RRDHOST *host)
+void aclk_data_rotated(void)
{
- UNUSED(host);
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- debug(D_ACLK_SYNC,"Processing data base rotation event");
- struct aclk_database_cmd cmd;
- memset(&cmd, 0, sizeof(cmd));
- cmd.opcode = ACLK_DATABASE_UPD_STATS;
+ if (!aclk_use_new_cloud_arch || !aclk_connected)
+ return;
+ time_t next_rotation_time = now_realtime_sec()+ACLK_DATABASE_ROTATION_DELAY;
rrd_wrlock();
RRDHOST *this_host = localhost;
while (this_host) {
- aclk_database_enq_cmd((struct aclk_database_worker_config *)this_host->dbsync_worker, &cmd);
+ struct aclk_database_worker_config *wc = this_host->dbsync_worker;
+ if (wc)
+ wc->rotation_after = next_rotation_time;
this_host = this_host->next;
}
rrd_unlock();
@@ -743,9 +734,10 @@ void aclk_data_rotated(RRDHOST *host)
uv_mutex_lock(&aclk_async_lock);
while (tmp) {
- aclk_database_enq_cmd(tmp, &cmd);
+ tmp->rotation_after = next_rotation_time;
tmp = tmp->next;
}
uv_mutex_unlock(&aclk_async_lock);
+#endif
return;
}
diff --git a/database/sqlite/sqlite_aclk.h b/database/sqlite/sqlite_aclk.h
index 02a4322ed0..91ed065419 100644
--- a/database/sqlite/sqlite_aclk.h
+++ b/database/sqlite/sqlite_aclk.h
@@ -15,9 +15,10 @@
#define ACLK_MAX_CHART_BATCH_COUNT (10)
#endif
#define ACLK_MAX_ALERT_UPDATES (5)
-#define ACLK_SYNC_RETRY_COUNT "10"
#define ACLK_DATABASE_CLEANUP_FIRST (60)
+#define ACLK_DATABASE_ROTATION_DELAY (60)
#define ACLK_DATABASE_CLEANUP_INTERVAL (3600)
+#define ACLK_DATABASE_ROTATION_INTERVAL (3600)
#define ACLK_DELETE_ACK_INTERNAL (600)
#define ACLK_SYNC_QUERY_SIZE 512
@@ -60,8 +61,6 @@ static inline void aclk_complete(struct aclk_completion *p)
extern uv_mutex_t aclk_async_lock;
-extern int aclk_architecture;
-
static inline void uuid_unparse_lower_fix(uuid_t *uuid, char *out)
{
uuid_unparse_lower(*uuid, out);
@@ -120,8 +119,6 @@ enum aclk_database_opcode {
ACLK_DATABASE_ADD_DIMENSION,
ACLK_DATABASE_ALARM_HEALTH_LOG,
ACLK_DATABASE_CHART_ACK,
- ACLK_DATABASE_CHECK,
- ACLK_DATABASE_CHECK_ROTATION,
ACLK_DATABASE_CLEANUP,
ACLK_DATABASE_DELETE_HOST,
ACLK_DATABASE_NODE_INFO,
@@ -130,11 +127,9 @@ enum aclk_database_opcode {
ACLK_DATABASE_PUSH_CHART,
ACLK_DATABASE_PUSH_CHART_CONFIG,
ACLK_DATABASE_RESET_CHART,
- ACLK_DATABASE_RESET_NODE,
ACLK_DATABASE_SHUTDOWN,
ACLK_DATABASE_TIMER,
- ACLK_DATABASE_UPD_STATS,
- ACLK_DATABASE_MAX_OPCODE
+ ACLK_DATABASE_UPD_RETENTION
};
struct aclk_chart_payload_t {
@@ -170,6 +165,7 @@ struct aclk_database_worker_config {
time_t chart_timestamp; // last chart timestamp
time_t cleanup_after; // Start a cleanup after this timestamp
time_t startup_time; // When the sync thread started
+ time_t rotation_after;
uint64_t batch_id; // batch id to use
uint64_t alerts_batch_id; // batch id for alerts to use
uint64_t alerts_start_seq_id; // cloud has asked to start streaming from
@@ -215,13 +211,11 @@ extern sqlite3 *db_meta;
extern int aclk_database_enq_cmd_noblock(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd);
extern void aclk_database_enq_cmd(struct aclk_database_worker_config *wc, struct aclk_database_cmd *cmd);
-extern void aclk_set_architecture(int mode);
extern void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id);
int aclk_worker_enq_cmd(char *node_id, struct aclk_database_cmd *cmd);
-void aclk_data_rotated(RRDHOST *host);
+void aclk_data_rotated(void);
void sql_aclk_sync_init(void);
void sql_check_aclk_table_list(struct aclk_database_worker_config *wc);
void sql_delete_aclk_table_list(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
-void sql_drop_host_aclk_table_list(uuid_t *host_uuid);
void sql_maint_aclk_sync_database(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
#endif //NETDATA_SQLITE_ACLK_H
diff --git a/database/sqlite/sqlite_aclk_alert.c b/database/sqlite/sqlite_aclk_alert.c
index ce957c7823..846d9bdf2d 100644
--- a/database/sqlite/sqlite_aclk_alert.c
+++ b/database/sqlite/sqlite_aclk_alert.c
@@ -3,7 +3,10 @@
#include "sqlite_functions.h"
#include "sqlite_aclk_alert.h"
+#ifdef ENABLE_ACLK
#include "../../aclk/aclk_alarm_api.h"
+#include "../../aclk/aclk.h"
+#endif
// will replace call to aclk_update_alarm in health/health_log.c
// and handle both cases
@@ -11,8 +14,14 @@ void sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae)
{
//check aclk architecture and handle old json alarm update to cloud
//include also the valid statuses for this case
- /* if (!aclk_architecture)
- aclk_update_alarm(host, ae); */
+#ifdef ENABLE_ACLK
+ if (!aclk_use_new_cloud_arch) {
+ if ((ae->new_status == RRDCALC_STATUS_WARNING || ae->new_status == RRDCALC_STATUS_CRITICAL) ||
+ ((ae->old_status == RRDCALC_STATUS_WARNING || ae->old_status == RRDCALC_STATUS_CRITICAL))) {
+ aclk_update_alarm(host, ae);
+ }
+ return;
+ }
if (ae->flags & HEALTH_ENTRY_FLAG_ACLK_QUEUED)
return;
@@ -33,6 +42,10 @@ void sql_queue_alarm_to_aclk(RRDHOST *host, ALARM_ENTRY *ae)
cmd.completion = NULL;
aclk_database_enq_cmd((struct aclk_database_worker_config *) host->dbsync_worker, &cmd);
ae->flags |= HEALTH_ENTRY_FLAG_ACLK_QUEUED;
+#else
+ UNUSED(host);
+ UNUSED(ae);
+#endif
return;
}
@@ -79,6 +92,7 @@ bind_fail:
int rrdcalc_status_to_proto_enum(RRDCALC_STATUS status)
{
+#ifdef ENABLE_ACLK
switch(status) {
case RRDCALC_STATUS_REMOVED:
return ALARM_STATUS_REMOVED;
@@ -98,6 +112,10 @@ int rrdcalc_status_to_proto_enum(RRDCALC_STATUS status)
default:
return ALARM_STATUS_UNKNOWN;
}
+#else
+ UNUSED(status);
+ return 1;
+#endif
}
void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
@@ -122,8 +140,8 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
if (wc->alerts_start_seq_id != 0) {
buffer_sprintf(
sql,
- "UPDATE aclk_alert_%s SET date_submitted = NULL, date_cloud_ack = NULL WHERE sequence_id >= %" PRIu64
- "; UPDATE aclk_alert_%s SET date_cloud_ack = strftime('%%s','now') WHERE sequence_id < %" PRIu64
+ "UPDATE aclk_alert_%s SET date_submitted = NULL, date_cloud_ack = NULL WHERE sequence_id >= %"PRIu64
+ "; UPDATE aclk_alert_%s SET date_cloud_ack = strftime('%%s','now') WHERE sequence_id < %"PRIu64
" and date_cloud_ack is null",
wc->uuid_str,
wc->alerts_start_seq_id,
@@ -163,7 +181,7 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
char old_value_string[100 + 1];
char new_value_string[100 + 1];
- alarm_log.node_id = strdupz(wc->node_id);
+ alarm_log.node_id = wc->node_id;
alarm_log.claim_id = claim_id;
alarm_log.chart = strdupz((char *)sqlite3_column_text(res, 12));
@@ -179,10 +197,13 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
alarm_log.utc_offset = wc->host->utc_offset;
alarm_log.timezone = strdupz((char *)wc->host->abbrev_timezone);
- alarm_log.exec_path = sqlite3_column_bytes(res, 14) > 0 ? strdupz((char *)sqlite3_column_text(res, 14)) : strdupz((char *)wc->host->health_default_exec);
+ alarm_log.exec_path = sqlite3_column_bytes(res, 14) > 0 ? strdupz((char *)sqlite3_column_text(res, 14)) :
+ strdupz((char *)wc->host->health_default_exec);
alarm_log.conf_source = strdupz((char *)sqlite3_column_text(res, 16));
- char *edit_command = sqlite3_column_bytes(res, 16) > 0 ? health_edit_command_from_source((char *)sqlite3_column_text(res, 16)) : strdupz("UNKNOWN=0");
+ char *edit_command = sqlite3_column_bytes(res, 16) > 0 ?
+ health_edit_command_from_source((char *)sqlite3_column_text(res, 16)) :
+ strdupz("UNKNOWN=0");
alarm_log.command = strdupz(edit_command);
alarm_log.duration = (time_t) sqlite3_column_int64(res, 6);
@@ -193,10 +214,23 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
alarm_log.delay_up_to_timestamp = (time_t) sqlite3_column_int64(res, 10);
alarm_log.last_repeat = (time_t) sqlite3_column_int64(res, 25);
- alarm_log.silenced = ( (sqlite3_column_int64(res, 8) & HEALTH_ENTRY_FLAG_SILENCED) || ( sqlite3_column_type(res, 15) != SQLITE_NULL && !strncmp((char *)sqlite3_column_text(res,15), "silent", 6)) ) ? 1 : 0;
+ alarm_log.silenced = ((sqlite3_column_int64(res, 8) & HEALTH_ENTRY_FLAG_SILENCED) ||
+ (sqlite3_column_type(res, 15) != SQLITE_NULL &&
+ !strncmp((char *)sqlite3_column_text(res, 15), "silent", 6))) ?
+ 1 :
+ 0;
+
+ alarm_log.value_string =
+ sqlite3_column_type(res, 23) == SQLITE_NULL ?
+ strdupz((char *)"-") :
+ strdupz((char *)format_value_and_unit(
+ new_value_string, 100, sqlite3_column_double(res, 23), (char *)sqlite3_column_text(res, 17), -1));
- alarm_log.value_string = sqlite3_column_type(res, 23) == SQLITE_NULL ? strdupz((char *)"-") : strdupz((char *)format_value_and_unit(new_value_string, 100, sqlite3_column_double(res, 23), (char *) sqlite3_column_text(res, 17), -1));
- alarm_log.old_value_string = sqlite3_column_type(res, 24) == SQLITE_NULL ? strdupz((char *)"-") : strdupz((char *)format_value_and_unit(old_value_string, 100, sqlite3_column_double(res, 24), (char *) sqlite3_column_text(res, 17), -1));
+ alarm_log.old_value_string =
+ sqlite3_column_type(res, 24) == SQLITE_NULL ?
+ strdupz((char *)"-") :
+ strdupz((char *)format_value_and_unit(
+ old_value_string, 100, sqlite3_column_double(res, 24), (char *)sqlite3_column_text(res, 17), -1));
alarm_log.value = (calculated_number) sqlite3_column_double(res, 23);
alarm_log.old_value = (calculated_number) sqlite3_column_double(res, 24);
@@ -204,7 +238,6 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
alarm_log.updated = (sqlite3_column_int64(res, 8) & HEALTH_ENTRY_FLAG_UPDATED) ? 1 : 0;
alarm_log.rendered_info = strdupz((char *)sqlite3_column_text(res, 18));
- info("DEBUG: %s pushing alert seq %" PRIu64 " - %" PRIu64"", wc->uuid_str, (uint64_t) sqlite3_column_int64(res, 0), (uint64_t) sqlite3_column_int64(res, 1));
aclk_send_alarm_log_entry(&alarm_log);
if (first_sequence_id == 0)
@@ -214,12 +247,14 @@ void aclk_push_alert_event(struct aclk_database_worker_config *wc, struct aclk_d
destroy_alarm_log_entry(&alarm_log);
freez(edit_command);
}
- buffer_flush(sql);
- buffer_sprintf(sql, "UPDATE aclk_alert_%s SET date_submitted=strftime('%%s') "
- "WHERE date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64 ";",
- wc->uuid_str, first_sequence_id, last_sequence_id);
- db_execute(buffer_tostring(sql));
+ if (first_sequence_id) {
+ buffer_flush(sql);
+ buffer_sprintf(sql, "UPDATE aclk_alert_%s SET date_submitted=strftime('%%s') "
+ "WHERE date_submitted IS NULL AND sequence_id BETWEEN %" PRIu64 " AND %" PRIu64 ";",
+ wc->uuid_str, first_sequence_id, last_sequence_id);
+ db_execute(buffer_tostring(sql));
+ }
rc = sqlite3_finalize(res);
if (unlikely(rc != SQLITE_OK))
@@ -278,12 +313,8 @@ void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct a
sqlite3_stmt *res = NULL;
//TODO: make this better: include info from health log too
- buffer_sprintf(sql, "select aa.sequence_id, aa.date_created, \
- (select laa.sequence_id from aclk_alert_%s laa \
- order by laa.sequence_id desc limit 1), \
- (select laa.date_created from aclk_alert_%s laa \
- order by laa.sequence_id desc limit 1) \
- from aclk_alert_%s aa order by aa.sequence_id asc limit 1;", wc->uuid_str, wc->uuid_str, wc->uuid_str);
+ buffer_sprintf(sql, "SELECT MIN(sequence_id), MIN(date_created), MAX(sequence_id), MAX(date_created) " \
+ "FROM aclk_alert_%s;", wc->uuid_str);
rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
if (rc != SQLITE_OK) {
@@ -318,7 +349,7 @@ void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct a
struct alarm_log_health alarm_log;
alarm_log.claim_id = claim_id;
- alarm_log.node_id = strdupz(wc->node_id);
+ alarm_log.node_id = wc->node_id;
alarm_log.log_entries = log_entries;
alarm_log.status = wc->alert_updates == 0 ? 2 : 1;
@@ -330,7 +361,6 @@ void aclk_push_alarm_health_log(struct aclk_database_worker_config *wc, struct a
if (unlikely(rc != SQLITE_OK))
error_report("Failed to reset statement to get health log statistics from the database, rc = %d", rc);
- freez((char *)alarm_log.node_id);
freez(claim_id);
buffer_free(sql);
#endif
@@ -359,6 +389,11 @@ void aclk_send_alarm_configuration(char *config_hash)
return;
}
+#define SQL_SELECT_ALERT_CONFIG "SELECT alarm, template, on_key, class, type, component, os, hosts, plugin," \
+ "module, charts, families, lookup, every, units, green, red, calc, warn, crit, to_key, exec, delay, repeat, info," \
+ "options, host_labels, p_db_lookup_dimensions, p_db_lookup_method, p_db_lookup_options, p_db_lookup_after," \
+ "p_db_lookup_before, p_update_every FROM alert_hash WHERE hash_id = @hash_id;"
+
int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
{
UNUSED(wc);
@@ -372,20 +407,16 @@ int aclk_push_alert_config_event(struct aclk_database_worker_config *wc, struct
sqlite3_stmt *res = NULL;
char *config_hash = (char *) cmd.data_param;
- BUFFER *sql = buffer_create(1024);
- buffer_sprintf(
- sql,
- "SELECT alarm, template, on_key, class, type, component, os, hosts, plugin, module, charts, families, lookup, every, units, green, red, calc, warn, crit, to_key, exec, delay, repeat, info, options, host_labels, p_db_lookup_dimensions, p_db_lookup_method, p_db_lookup_options, p_db_lookup_after, p_db_lookup_before, p_update_every FROM alert_hash WHERE hash_id = @hash_id;");
- rc = sqlite3_prepare_v2(db_meta, buffer_tostring(sql), -1, &res, 0);
+ rc = sqlite3_prepare_v2(db_meta, SQL_SELECT_ALERT_CONFIG, -1, &res, 0);<