summary | refs | log | tree | commit | diff | stats
path: root/database
diff options
context:
space:
mode:
author: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> 2022-04-01 18:12:50 +0300
committer: GitHub <noreply@github.com> 2022-04-01 18:12:50 +0300
commit: e816ee49237bfbdb289d7b0bba2bf2a8b94b0a5e (patch)
tree: e87774a60d4c2a549fe9cf0dfeeaa4eaf6cf375f /database
parent: 7bfc543172115ee8c042f605c3af1432c43d1b07 (diff)
Fix issue with charts not properly synchronized with the cloud (#12451)
* Add function to check a specific chart
* If a chart is not obsoleted, check if the liveness needs to be updated
* Calculate liveness based on a (constant * update_every) for each dimension
* Scan all dimensions when the retention message is constructed and update liveness if needed
* If initial state, set to computed live
* Set computed live state to dimension
* Add a maximum dimension cleanup on startup to prevent message flood
* Schedule chart updates if charts streaming is enabled
* Adjust live state for dimension
* The query executed will have a valid dimension uuid only if memory mode is dbengine
Diffstat (limited to 'database')
-rw-r--r-- database/rrdhost.c 4
-rw-r--r-- database/rrdset.c 4
-rw-r--r-- database/sqlite/sqlite_aclk_chart.c 119
-rw-r--r-- database/sqlite/sqlite_aclk_chart.h 9
4 files changed, 99 insertions, 37 deletions
diff --git a/database/rrdhost.c b/database/rrdhost.c
index 3c0d0d3a37..4bfb9519d8 100644
--- a/database/rrdhost.c
+++ b/database/rrdhost.c
@@ -1504,6 +1504,10 @@ restart_after_removal:
rrdset_free(st);
goto restart_after_removal;
}
+#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
+ else
+ sql_check_chart_liveness(st);
+#endif
}
}
diff --git a/database/rrdset.c b/database/rrdset.c
index 7a1ce74e1d..af951bd39d 100644
--- a/database/rrdset.c
+++ b/database/rrdset.c
@@ -1833,8 +1833,8 @@ after_second_database_work:
#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL)
if (likely(!st->state->is_ar_chart)) {
if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)) {
- int live = ((mark - rd->last_collected_time.tv_sec) <
- MAX(RRDSET_MINIMUM_LIVE_MULTIPLIER * rd->update_every, rrdset_free_obsolete_time));
+ int live =
+ ((mark - rd->last_collected_time.tv_sec) < RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every);
if (unlikely(live != rd->state->aclk_live_status)) {
if (likely(rrdset_flag_check(st, RRDSET_FLAG_ACLK))) {
if (likely(!queue_dimension_to_aclk(rd))) {
diff --git a/database/sqlite/sqlite_aclk_chart.c b/database/sqlite/sqlite_aclk_chart.c
index f715df5005..7afa1d451e 100644
--- a/database/sqlite/sqlite_aclk_chart.c
+++ b/database/sqlite/sqlite_aclk_chart.c
@@ -58,18 +58,15 @@ bind_fail:
return send_status;
}
-static int aclk_add_chart_payload(
- struct aclk_database_worker_config *wc,
- uuid_t *uuid,
- char *claim_id,
- ACLK_PAYLOAD_TYPE payload_type,
- void *payload,
- size_t payload_size)
+static int aclk_add_chart_payload(struct aclk_database_worker_config *wc, uuid_t *uuid, char *claim_id,
+ ACLK_PAYLOAD_TYPE payload_type, void *payload, size_t payload_size, int *send_status)
{
static __thread sqlite3_stmt *res_chart = NULL;
int rc;
rc = payload_sent(wc->uuid_str, uuid, payload, payload_size);
+ if (send_status)
+ *send_status = rc;
if (rc == 1)
return 0;
@@ -162,22 +159,16 @@ int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_dat
size_t size;
char *payload = generate_chart_instance_updated(&size, &chart_payload);
if (likely(payload))
- rc = aclk_add_chart_payload(wc, st->chart_uuid, claim_id, ACLK_PAYLOAD_CHART, (void *)payload, size);
+ rc = aclk_add_chart_payload(wc, st->chart_uuid, claim_id, ACLK_PAYLOAD_CHART, (void *) payload, size, NULL);
freez(payload);
chart_instance_updated_destroy(&chart_payload);
}
return rc;
}
-static inline int aclk_upd_dimension_event(
- struct aclk_database_worker_config *wc,
- char *claim_id,
- uuid_t *dim_uuid,
- const char *dim_id,
- const char *dim_name,
- const char *chart_type_id,
- time_t first_time,
- time_t last_time)
+static inline int aclk_upd_dimension_event(struct aclk_database_worker_config *wc, char *claim_id, uuid_t *dim_uuid,
+ const char *dim_id, const char *dim_name, const char *chart_type_id, time_t first_time, time_t last_time,
+ int *send_status)
{
int rc = 0;
size_t size;
@@ -190,13 +181,11 @@ static inline int aclk_upd_dimension_event(
#ifdef NETDATA_INTERNAL_CHECKS
if (!first_time)
- info(
- "Host %s (node %s) deleting dimension id=[%s] name=[%s] chart=[%s]",
- wc->host_guid,
- wc->node_id,
- dim_id,
- dim_name,
- chart_type_id);
+ info("Host %s (node %s) deleting dimension id=[%s] name=[%s] chart=[%s]",
+ wc->host_guid, wc->node_id, dim_id, dim_name, chart_type_id);
+ if (last_time)
+ info("Host %s (node %s) stopped collecting dimension id=[%s] name=[%s] chart=[%s] %ld seconds ago at %ld",
+ wc->host_guid, wc->node_id, dim_id, dim_name, chart_type_id, now_realtime_sec() - last_time, last_time);
#endif
dim_payload.node_id = wc->node_id;
@@ -208,7 +197,7 @@ static inline int aclk_upd_dimension_event(
dim_payload.last_timestamp.tv_sec = last_time;
char *payload = generate_chart_dimension_updated(&size, &dim_payload);
if (likely(payload))
- rc = aclk_add_chart_payload(wc, dim_uuid, claim_id, ACLK_PAYLOAD_DIMENSION, (void *)payload, size);
+ rc = aclk_add_chart_payload(wc, dim_uuid, claim_id, ACLK_PAYLOAD_DIMENSION, (void *)payload, size, send_status);
freez(payload);
return rc;
}
@@ -252,7 +241,7 @@ void aclk_process_dimension_deletion(struct aclk_database_worker_config *wc, str
unsigned count = 0;
while (sqlite3_step(res) == SQLITE_ROW) {
- (void)aclk_upd_dimension_event(
+ (void) aclk_upd_dimension_event(
wc,
claim_id,
(uuid_t *)sqlite3_column_text(res, 3),
@@ -260,7 +249,8 @@ void aclk_process_dimension_deletion(struct aclk_database_worker_config *wc, str
(const char *)sqlite3_column_text(res, 1),
(const char *)sqlite3_column_text(res, 2),
0,
- 0);
+ 0,
+ NULL);
count++;
}
@@ -289,12 +279,13 @@ int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk
RRDDIM *rd = cmd.data;
if (likely(claim_id)) {
+ int send_status = 0;
time_t now = now_realtime_sec();
time_t first_t = rd->state->query_ops.oldest_time(rd);
time_t last_t = rd->state->query_ops.latest_time(rd);
- int live = ((now - last_t) < MAX(RRDSET_MINIMUM_LIVE_MULTIPLIER * rd->update_every, rrdset_free_obsolete_time));
+ int live = ((now - last_t) < (RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every));
rc = aclk_upd_dimension_event(
wc,
@@ -304,7 +295,11 @@ int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk
rd->name,
rd->rrdset->id,
first_t,
- live ? 0 : last_t);
+ live ? 0 : last_t,
+ &send_status);
+
+ if (!send_status)
+ rd->state->aclk_live_status = live;
freez(claim_id);
}
@@ -891,6 +886,8 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d
time_t first_entry_t;
time_t last_entry_t;
uint32_t update_every = 0;
+ uint32_t dimension_update_count = 0;
+ int send_status;
struct retention_updated rotate_data;
@@ -906,7 +903,7 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d
rotate_data.claim_id = claim_id;
rotate_data.node_id = strdupz(wc->node_id);
- // time_t now = now_realtime_sec();
+ time_t now = now_realtime_sec();
while (sqlite3_step(res) == SQLITE_ROW) {
if (!update_every || update_every != (uint32_t)sqlite3_column_int(res, 1)) {
if (update_every) {
@@ -944,6 +941,24 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d
if (likely(!rc && first_entry_t))
start_time = MIN(start_time, first_entry_t);
+
+ if (memory_mode == RRD_MEMORY_MODE_DBENGINE && wc->chart_updates) {
+ int live = ((now - last_entry_t) < (RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * update_every));
+ if ((!live || !first_entry_t) && (dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP)) {
+ (void)aclk_upd_dimension_event(
+ wc,
+ claim_id,
+ (uuid_t *)sqlite3_column_blob(res, 0),
+ (const char *)(const char *)sqlite3_column_text(res, 3),
+ (const char *)(const char *)sqlite3_column_text(res, 4),
+ (const char *)(const char *)sqlite3_column_text(res, 2),
+ first_entry_t,
+ live ? 0 : last_entry_t,
+ &send_status);
+ if (!send_status)
+ dimension_update_count++;
+ }
+ }
}
if (update_every) {
debug(D_ACLK_SYNC, "Update %s for %u oldest time = %ld", wc->host_guid, update_every, start_time);
@@ -1053,8 +1068,7 @@ void aclk_send_dimension_update(RRDDIM *rd)
time_t last_entry_t = rrddim_last_entry_t(rd);
time_t now = now_realtime_sec();
- int live = ((now - rd->last_collected_time.tv_sec) <
- MAX(RRDSET_MINIMUM_LIVE_MULTIPLIER * rd->update_every, rrdset_free_obsolete_time));
+ int live = ((now - rd->last_collected_time.tv_sec) < (RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every));
if (!live || rd->state->aclk_live_status != live || !first_entry_t) {
(void)aclk_upd_dimension_event(
@@ -1065,7 +1079,8 @@ void aclk_send_dimension_update(RRDDIM *rd)
rd->name,
rd->rrdset->id,
first_entry_t,
- live ? 0 : last_entry_t);
+ live ? 0 : last_entry_t,
+ NULL);
if (!first_entry_t)
debug(
@@ -1180,6 +1195,44 @@ struct aclk_chart_sync_stats *aclk_get_chart_sync_stats(RRDHOST *host)
buffer_free(sql);
return aclk_statistics;
}
+
+void sql_check_chart_liveness(RRDSET *st) { // Re-check one chart: queue its definition if not yet flagged, then re-evaluate the live state of each visible dimension.
+ RRDDIM *rd; // iteration cursor used by rrddim_foreach_read() below
+
+ if (unlikely(st->state->is_ar_chart)) // charts marked is_ar_chart are skipped entirely
+ return; // early exit happens before rrdset_rdlock() is called
+
+ rrdset_rdlock(st); // paired with rrdset_unlock(st) at the end of the function
+ if (unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ACLK))) { // chart not yet flagged RRDSET_FLAG_ACLK
+ if (likely(st->dimensions && st->counter_done && !queue_chart_to_aclk(st))) { // needs dimensions, a non-zero counter_done, and queue_chart_to_aclk() returning 0
+ debug(D_ACLK_SYNC,"Check chart liveness [%s] submit chart definition", st->name);
+ rrdset_flag_set(st, RRDSET_FLAG_ACLK); // set only after the queue call returned 0
+ }
+ }
+ else
+ debug(D_ACLK_SYNC,"Check chart liveness [%s] chart definition already submitted", st->name);
+ time_t mark = now_realtime_sec(); // single timestamp shared by every dimension in the scan below
+
+ debug(D_ACLK_SYNC,"Check chart liveness [%s] scanning dimensions", st->name);
+ rrddim_foreach_read(rd, st) {
+ if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)) { // dimensions flagged RRDDIM_FLAG_HIDDEN are not evaluated
+ int live = (mark - rd->last_collected_time.tv_sec) < RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every; // live when seconds since last collection is below multiplier * update_every
+ if (unlikely(live != rd->state->aclk_live_status)) { // act only when the computed state differs from the stored one
+ if (likely(rrdset_flag_check(st, RRDSET_FLAG_ACLK))) { // dimension is queued only if the chart carries RRDSET_FLAG_ACLK
+ if (likely(!queue_dimension_to_aclk(rd))) { // stored state is updated only when the queue call returns 0
+ debug(D_ACLK_SYNC,"Dimension change [%s] on [%s] from live %d --> %d", rd->id, rd->rrdset->name, rd->state->aclk_live_status, live);
+ rd->state->aclk_live_status = live; // remember the state that was just queued
+ rrddim_flag_set(rd, RRDDIM_FLAG_ACLK);
+ }
+ }
+ }
+ else // pairs with the live != aclk_live_status test: nothing to queue
+ debug(D_ACLK_SYNC,"Dimension check [%s] on [%s] liveness matches", rd->id, st->name);
+ }
+ }
+ rrdset_unlock(st); // releases the lock taken by rrdset_rdlock(st) above
+}
+
#endif //ENABLE_NEW_CLOUD_PROTOCOL
// ST is read locked
diff --git a/database/sqlite/sqlite_aclk_chart.h b/database/sqlite/sqlite_aclk_chart.h
index fee5ecca2f..1d25de24ed 100644
--- a/database/sqlite/sqlite_aclk_chart.h
+++ b/database/sqlite/sqlite_aclk_chart.h
@@ -12,8 +12,12 @@ typedef enum payload_type {
extern sqlite3 *db_meta;
-#ifndef RRDSET_MINIMUM_LIVE_MULTIPLIER
-#define RRDSET_MINIMUM_LIVE_MULTIPLIER (1.5)
+#ifndef RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER
+#define RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER (3)
+#endif
+
+#ifndef ACLK_MAX_DIMENSION_CLEANUP
+#define ACLK_MAX_DIMENSION_CLEANUP (500)
#endif
struct aclk_chart_sync_stats {
@@ -52,4 +56,5 @@ void aclk_process_dimension_deletion(struct aclk_database_worker_config *wc, str
uint32_t sql_get_pending_count(struct aclk_database_worker_config *wc);
void aclk_send_dimension_update(RRDDIM *rd);
struct aclk_chart_sync_stats *aclk_get_chart_sync_stats(RRDHOST *host);
+void sql_check_chart_liveness(RRDSET *st);
#endif //NETDATA_SQLITE_ACLK_CHART_H