From e816ee49237bfbdb289d7b0bba2bf2a8b94b0a5e Mon Sep 17 00:00:00 2001 From: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> Date: Fri, 1 Apr 2022 18:12:50 +0300 Subject: Fix issue with charts not properly synchronized with the cloud (#12451) * Add function to check a specific chart * If a chart is not obsoleted, check if the liveness needs to be updated * Calculate liveness based on a (constant * update_every) for each dimension * Scan all dimensions when the retention message is constructed and update liveness if needed * If initial state, set to computed live * Set computed live state to dimension * Add a maximum dimension cleanup on startup to prevent message flood * Schedule chart updates if charts streaming is enabled * Adjust live state for dimension * The query executed will have a valid dimension uuid only if memory mode is dbengine --- database/rrdhost.c | 4 ++ database/rrdset.c | 4 +- database/sqlite/sqlite_aclk_chart.c | 119 ++++++++++++++++++++++++++---------- database/sqlite/sqlite_aclk_chart.h | 9 ++- 4 files changed, 99 insertions(+), 37 deletions(-) (limited to 'database') diff --git a/database/rrdhost.c b/database/rrdhost.c index 3c0d0d3a37..4bfb9519d8 100644 --- a/database/rrdhost.c +++ b/database/rrdhost.c @@ -1504,6 +1504,10 @@ restart_after_removal: rrdset_free(st); goto restart_after_removal; } +#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL) + else + sql_check_chart_liveness(st); +#endif } } diff --git a/database/rrdset.c b/database/rrdset.c index 7a1ce74e1d..af951bd39d 100644 --- a/database/rrdset.c +++ b/database/rrdset.c @@ -1833,8 +1833,8 @@ after_second_database_work: #if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL) if (likely(!st->state->is_ar_chart)) { if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)) { - int live = ((mark - rd->last_collected_time.tv_sec) < - MAX(RRDSET_MINIMUM_LIVE_MULTIPLIER * rd->update_every, rrdset_free_obsolete_time)); + int live = + ((mark - rd->last_collected_time.tv_sec) < RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every); if (unlikely(live != rd->state->aclk_live_status)) { if (likely(rrdset_flag_check(st, RRDSET_FLAG_ACLK))) { if (likely(!queue_dimension_to_aclk(rd))) { diff --git a/database/sqlite/sqlite_aclk_chart.c b/database/sqlite/sqlite_aclk_chart.c index f715df5005..7afa1d451e 100644 --- a/database/sqlite/sqlite_aclk_chart.c +++ b/database/sqlite/sqlite_aclk_chart.c @@ -58,18 +58,15 @@ bind_fail: return send_status; } -static int aclk_add_chart_payload( - struct aclk_database_worker_config *wc, - uuid_t *uuid, - char *claim_id, - ACLK_PAYLOAD_TYPE payload_type, - void *payload, - size_t payload_size) +static int aclk_add_chart_payload(struct aclk_database_worker_config *wc, uuid_t *uuid, char *claim_id, + ACLK_PAYLOAD_TYPE payload_type, void *payload, size_t payload_size, int *send_status) { static __thread sqlite3_stmt *res_chart = NULL; int rc; rc = payload_sent(wc->uuid_str, uuid, payload, payload_size); + if (send_status) + *send_status = rc; if (rc == 1) return 0; @@ -162,22 +159,16 @@ int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_dat size_t size; char *payload = generate_chart_instance_updated(&size, &chart_payload); if (likely(payload)) - rc = aclk_add_chart_payload(wc, st->chart_uuid, claim_id, ACLK_PAYLOAD_CHART, (void *)payload, size); + rc = aclk_add_chart_payload(wc, st->chart_uuid, claim_id, ACLK_PAYLOAD_CHART, (void *) payload, size, NULL); freez(payload); chart_instance_updated_destroy(&chart_payload); } return rc; } -static inline int aclk_upd_dimension_event( - struct aclk_database_worker_config *wc, - char *claim_id, - uuid_t *dim_uuid, - const char *dim_id, - const char *dim_name, - const char *chart_type_id, - time_t first_time, - time_t last_time) +static inline int aclk_upd_dimension_event(struct aclk_database_worker_config *wc, char *claim_id, uuid_t *dim_uuid, + const char *dim_id, const char *dim_name, const char *chart_type_id, time_t first_time, time_t last_time, + int *send_status) { int rc = 0; size_t size; @@ -190,13 +181,11 @@ static inline int aclk_upd_dimension_event( #ifdef NETDATA_INTERNAL_CHECKS if (!first_time) - info( - "Host %s (node %s) deleting dimension id=[%s] name=[%s] chart=[%s]", - wc->host_guid, - wc->node_id, - dim_id, - dim_name, - chart_type_id); + info("Host %s (node %s) deleting dimension id=[%s] name=[%s] chart=[%s]", + wc->host_guid, wc->node_id, dim_id, dim_name, chart_type_id); + if (last_time) + info("Host %s (node %s) stopped collecting dimension id=[%s] name=[%s] chart=[%s] %ld seconds ago at %ld", + wc->host_guid, wc->node_id, dim_id, dim_name, chart_type_id, now_realtime_sec() - last_time, last_time); #endif dim_payload.node_id = wc->node_id; @@ -208,7 +197,7 @@ static inline int aclk_upd_dimension_event( dim_payload.last_timestamp.tv_sec = last_time; char *payload = generate_chart_dimension_updated(&size, &dim_payload); if (likely(payload)) - rc = aclk_add_chart_payload(wc, dim_uuid, claim_id, ACLK_PAYLOAD_DIMENSION, (void *)payload, size); + rc = aclk_add_chart_payload(wc, dim_uuid, claim_id, ACLK_PAYLOAD_DIMENSION, (void *)payload, size, send_status); freez(payload); return rc; } @@ -252,7 +241,7 @@ void aclk_process_dimension_deletion(struct aclk_database_worker_config *wc, str unsigned count = 0; while (sqlite3_step(res) == SQLITE_ROW) { - (void)aclk_upd_dimension_event( + (void) aclk_upd_dimension_event( wc, claim_id, (uuid_t *)sqlite3_column_text(res, 3), @@ -260,7 +249,8 @@ void aclk_process_dimension_deletion(struct aclk_database_worker_config *wc, str (const char *)sqlite3_column_text(res, 1), (const char *)sqlite3_column_text(res, 2), 0, - 0); + 0, + NULL); count++; } @@ -289,12 +279,13 @@ int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk RRDDIM *rd = cmd.data; if (likely(claim_id)) { + int send_status = 0; time_t now = now_realtime_sec(); time_t first_t = rd->state->query_ops.oldest_time(rd); time_t last_t = rd->state->query_ops.latest_time(rd); - int live = ((now - last_t) < MAX(RRDSET_MINIMUM_LIVE_MULTIPLIER * rd->update_every, rrdset_free_obsolete_time)); + int live = ((now - last_t) < (RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every)); rc = aclk_upd_dimension_event( wc, @@ -304,7 +295,11 @@ int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk rd->name, rd->rrdset->id, first_t, - live ? 0 : last_t); + live ? 0 : last_t, + &send_status); + + if (!send_status) + rd->state->aclk_live_status = live; freez(claim_id); } @@ -891,6 +886,8 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d time_t first_entry_t; time_t last_entry_t; uint32_t update_every = 0; + uint32_t dimension_update_count = 0; + int send_status; struct retention_updated rotate_data; @@ -906,7 +903,7 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d rotate_data.claim_id = claim_id; rotate_data.node_id = strdupz(wc->node_id); - // time_t now = now_realtime_sec(); + time_t now = now_realtime_sec(); while (sqlite3_step(res) == SQLITE_ROW) { if (!update_every || update_every != (uint32_t)sqlite3_column_int(res, 1)) { if (update_every) { @@ -944,6 +941,24 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d if (likely(!rc && first_entry_t)) start_time = MIN(start_time, first_entry_t); + + if (memory_mode == RRD_MEMORY_MODE_DBENGINE && wc->chart_updates) { + int live = ((now - last_entry_t) < (RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * update_every)); + if ((!live || !first_entry_t) && (dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP)) { + (void)aclk_upd_dimension_event( + wc, + claim_id, + (uuid_t *)sqlite3_column_blob(res, 0), + (const char *)(const char *)sqlite3_column_text(res, 3), + (const char *)(const char *)sqlite3_column_text(res, 4), + (const char *)(const char *)sqlite3_column_text(res, 2), + first_entry_t, + live ? 0 : last_entry_t, + &send_status); + if (!send_status) + dimension_update_count++; + } + } } if (update_every) { debug(D_ACLK_SYNC, "Update %s for %u oldest time = %ld", wc->host_guid, update_every, start_time); @@ -1053,8 +1068,7 @@ void aclk_send_dimension_update(RRDDIM *rd) time_t last_entry_t = rrddim_last_entry_t(rd); time_t now = now_realtime_sec(); - int live = ((now - rd->last_collected_time.tv_sec) < - MAX(RRDSET_MINIMUM_LIVE_MULTIPLIER * rd->update_every, rrdset_free_obsolete_time)); + int live = ((now - rd->last_collected_time.tv_sec) < (RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every)); if (!live || rd->state->aclk_live_status != live || !first_entry_t) { (void)aclk_upd_dimension_event( @@ -1065,7 +1079,8 @@ void aclk_send_dimension_update(RRDDIM *rd) rd->name, rd->rrdset->id, first_entry_t, - live ? 0 : last_entry_t); + live ? 0 : last_entry_t, + NULL); if (!first_entry_t) debug( @@ -1180,6 +1195,44 @@ struct aclk_chart_sync_stats *aclk_get_chart_sync_stats(RRDHOST *host) buffer_free(sql); return aclk_statistics; } + +void sql_check_chart_liveness(RRDSET *st) { + RRDDIM *rd; + + if (unlikely(st->state->is_ar_chart)) + return; + + rrdset_rdlock(st); + if (unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ACLK))) { + if (likely(st->dimensions && st->counter_done && !queue_chart_to_aclk(st))) { + debug(D_ACLK_SYNC,"Check chart liveness [%s] submit chart definition", st->name); + rrdset_flag_set(st, RRDSET_FLAG_ACLK); + } + } + else + debug(D_ACLK_SYNC,"Check chart liveness [%s] chart definition already submitted", st->name); + time_t mark = now_realtime_sec(); + + debug(D_ACLK_SYNC,"Check chart liveness [%s] scanning dimensions", st->name); + rrddim_foreach_read(rd, st) { + if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)) { + int live = (mark - rd->last_collected_time.tv_sec) < RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every; + if (unlikely(live != rd->state->aclk_live_status)) { + if (likely(rrdset_flag_check(st, RRDSET_FLAG_ACLK))) { + if (likely(!queue_dimension_to_aclk(rd))) { + debug(D_ACLK_SYNC,"Dimension change [%s] on [%s] from live %d --> %d", rd->id, rd->rrdset->name, rd->state->aclk_live_status, live); + rd->state->aclk_live_status = live; + rrddim_flag_set(rd, RRDDIM_FLAG_ACLK); + } + } + } + else + debug(D_ACLK_SYNC,"Dimension check [%s] on [%s] liveness matches", rd->id, st->name); + } + } + rrdset_unlock(st); +} + #endif //ENABLE_NEW_CLOUD_PROTOCOL // ST is read locked diff --git a/database/sqlite/sqlite_aclk_chart.h b/database/sqlite/sqlite_aclk_chart.h index fee5ecca2f..1d25de24ed 100644 --- a/database/sqlite/sqlite_aclk_chart.h +++ b/database/sqlite/sqlite_aclk_chart.h @@ -12,8 +12,12 @@ typedef enum payload_type { extern sqlite3 *db_meta; -#ifndef RRDSET_MINIMUM_LIVE_MULTIPLIER -#define RRDSET_MINIMUM_LIVE_MULTIPLIER (1.5) +#ifndef RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER +#define RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER (3) +#endif + +#ifndef ACLK_MAX_DIMENSION_CLEANUP +#define ACLK_MAX_DIMENSION_CLEANUP (500) #endif struct aclk_chart_sync_stats { @@ -52,4 +56,5 @@ void aclk_process_dimension_deletion(struct aclk_database_worker_config *wc, str uint32_t sql_get_pending_count(struct aclk_database_worker_config *wc); void aclk_send_dimension_update(RRDDIM *rd); struct aclk_chart_sync_stats *aclk_get_chart_sync_stats(RRDHOST *host); +void sql_check_chart_liveness(RRDSET *st); #endif //NETDATA_SQLITE_ACLK_CHART_H -- cgit v1.2.3