diff options
author | Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> | 2022-05-17 16:58:49 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-17 16:58:49 +0300 |
commit | 3b8d4c21e5dd7abbc0b8b5c5b5b0bc826c229abc (patch) | |
tree | 3c54ededf79d562348d81604bfef9896aebcbd64 /database | |
parent | 108bfb7918a86255bf79119f9b8eee604943ae76 (diff) |
Adjust the dimension liveness status check (#12933)
* Mark a chart to be exposed only if dimension is created or metadata changes
* Add a calculate liveness for the dimension for collected to non collected (live -> stale) and vice versa
* queue_dimension_to_aclk will have the rrdset and either 0 or last collected time
If 0 then it will be marked as live else it will be marked as stale and last collected time will be sent to the cloud
* Add an extra parameter to indicate if the payload check should be done in the database or it has been done already
* Queue dimension sets dimension liveness and queues the exact payload to store in the database
* Fix compilation error when --disable-cloud is specified
Diffstat (limited to 'database')
-rw-r--r-- | database/rrd.h | 4 | ||||
-rw-r--r-- | database/rrddim.c | 30 | ||||
-rw-r--r-- | database/rrdhost.c | 2 | ||||
-rw-r--r-- | database/rrdset.c | 10 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_chart.c | 139 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_chart.h | 12 |
6 files changed, 132 insertions, 65 deletions
diff --git a/database/rrd.h b/database/rrd.h index a2faf572d3..9e67e93212 100644 --- a/database/rrd.h +++ b/database/rrd.h @@ -1274,7 +1274,9 @@ extern void rrddim_isnot_obsolete(RRDSET *st, RRDDIM *rd); extern collected_number rrddim_set_by_pointer(RRDSET *st, RRDDIM *rd, collected_number value); extern collected_number rrddim_set(RRDSET *st, const char *id, collected_number value); - +#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL) +extern time_t calc_dimension_liveness(RRDDIM *rd, time_t now); +#endif extern long align_entries_to_pagesize(RRD_MEMORY_MODE mode, long entries); // ---------------------------------------------------------------------------- diff --git a/database/rrddim.c b/database/rrddim.c index 0a25e17939..31982c38fe 100644 --- a/database/rrddim.c +++ b/database/rrddim.c @@ -135,15 +135,31 @@ void rrdcalc_link_to_rrddim(RRDDIM *rd, RRDSET *st, RRDHOST *host) { } } +// Return either +// 0 : Dimension is live +// last collected time : Dimension is not live + +#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL) +time_t calc_dimension_liveness(RRDDIM *rd, time_t now) +{ + time_t last_updated = rd->last_collected_time.tv_sec; + int live; + if (rd->state->aclk_live_status == 1) + live = + ((now - last_updated) < + MIN(rrdset_free_obsolete_time, RRDSET_MINIMUM_DIM_OFFLINE_MULTIPLIER * rd->update_every)); + else + live = ((now - last_updated) < RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every); + return live ? 0 : last_updated; +} +#endif + RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collected_number multiplier, collected_number divisor, RRD_ALGORITHM algorithm, RRD_MEMORY_MODE memory_mode) { RRDHOST *host = st->rrdhost; rrdset_wrlock(st); - rrdset_flag_set(st, RRDSET_FLAG_SYNC_CLOCK); - rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED); - RRDDIM *rd = rrddim_find(st, id); if(unlikely(rd)) { debug(D_RRD_CALLS, "Cannot create rrd dimension '%s/%s', it already exists.", st->id, name?name:"<NONAME>"); @@ -168,11 +184,19 @@ RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collecte debug(D_METADATALOG, "DIMENSION [%s] metadata updated", rd->id); (void)sql_store_dimension(&rd->state->metric_uuid, rd->rrdset->chart_uuid, rd->id, rd->name, rd->multiplier, rd->divisor, rd->algorithm); +#if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL) + queue_dimension_to_aclk(rd, calc_dimension_liveness(rd, now_realtime_sec())); +#endif + rrdset_flag_set(st, RRDSET_FLAG_SYNC_CLOCK); + rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED); } rrdset_unlock(st); return rd; } + rrdset_flag_set(st, RRDSET_FLAG_SYNC_CLOCK); + rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED); + char filename[FILENAME_MAX + 1]; char fullfilename[FILENAME_MAX + 1]; diff --git a/database/rrdhost.c b/database/rrdhost.c index be18faf2ce..08503a4854 100644 --- a/database/rrdhost.c +++ b/database/rrdhost.c @@ -1489,7 +1489,7 @@ restart_after_removal: } #if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL) else - queue_dimension_to_aclk(rd); + queue_dimension_to_aclk(rd, rd->last_collected_time.tv_sec); #endif } last = rd; diff --git a/database/rrdset.c b/database/rrdset.c index 3f1ec9fde4..0d512f8361 100644 --- a/database/rrdset.c +++ b/database/rrdset.c @@ -1798,12 +1798,8 @@ after_second_database_work: #if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL) if (likely(!st->state->is_ar_chart)) { - if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN) && likely(rrdset_flag_check(st, RRDSET_FLAG_ACLK))) { - int live = - ((mark - rd->last_collected_time.tv_sec) < RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every); - if (unlikely(live != rd->state->aclk_live_status)) - queue_dimension_to_aclk(rd); - } + if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN) && likely(rrdset_flag_check(st, RRDSET_FLAG_ACLK))) + queue_dimension_to_aclk(rd, calc_dimension_liveness(rd, mark)); } #endif if(unlikely(!rd->updated)) @@ -1906,7 +1902,7 @@ after_second_database_work: } else { /* Do not delete this dimension */ #if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL) - queue_dimension_to_aclk(rd); + queue_dimension_to_aclk(rd, calc_dimension_liveness(rd, mark)); #endif last = rd; rd = rd->next; diff --git a/database/sqlite/sqlite_aclk_chart.c b/database/sqlite/sqlite_aclk_chart.c index 17f1a29a2d..c69d04e657 100644 --- a/database/sqlite/sqlite_aclk_chart.c +++ b/database/sqlite/sqlite_aclk_chart.c @@ -58,19 +58,31 @@ bind_fail: return send_status; } -static int aclk_add_chart_payload(struct aclk_database_worker_config *wc, uuid_t *uuid, char *claim_id, - ACLK_PAYLOAD_TYPE payload_type, void *payload, size_t payload_size, time_t *send_status) +static int aclk_add_chart_payload( + struct aclk_database_worker_config *wc, + uuid_t *uuid, + char *claim_id, + ACLK_PAYLOAD_TYPE payload_type, + void *payload, + size_t payload_size, + time_t *send_status, + int check_sent) { static __thread sqlite3_stmt *res_chart = NULL; int rc; time_t date_submitted; - date_submitted = payload_sent(wc->uuid_str, uuid, payload, payload_size); - if (send_status) - *send_status = date_submitted; - if (date_submitted) + if (unlikely(!payload)) return 0; + if (check_sent) { + date_submitted = payload_sent(wc->uuid_str, uuid, payload, payload_size); + if (send_status) + *send_status = date_submitted; + if (date_submitted) + return 0; + } + if (unlikely(!res_chart)) { char sql[ACLK_SYNC_QUERY_SIZE]; snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, @@ -160,7 +172,7 @@ int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_dat size_t size; char *payload = generate_chart_instance_updated(&size, &chart_payload); if (likely(payload)) - rc = aclk_add_chart_payload(wc, st->chart_uuid, claim_id, ACLK_PAYLOAD_CHART, (void *) payload, size, NULL); + rc = aclk_add_chart_payload(wc, st->chart_uuid, claim_id, ACLK_PAYLOAD_CHART, (void *) payload, size, NULL, 1); freez(payload); chart_instance_updated_destroy(&chart_payload); } @@ -198,7 +210,7 @@ static inline int aclk_upd_dimension_event(struct aclk_database_worker_config *w dim_payload.last_timestamp.tv_sec = last_time; char *payload = generate_chart_dimension_updated(&size, &dim_payload); if (likely(payload)) - rc = aclk_add_chart_payload(wc, dim_uuid, claim_id, ACLK_PAYLOAD_DIMENSION, (void *)payload, size, send_status); + rc = aclk_add_chart_payload(wc, dim_uuid, claim_id, ACLK_PAYLOAD_DIMENSION, (void *)payload, size, send_status, 1); freez(payload); return rc; } @@ -272,39 +284,22 @@ bind_fail: int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) { - int rc = 0; + int rc = 1; CHECK_SQLITE_CONNECTION(db_meta); - char *claim_id = is_agent_claimed(); - - RRDDIM *rd = cmd.data; - - if (likely(claim_id)) { - time_t send_status = 0; - time_t now = now_realtime_sec(); - - time_t first_t = rd->state->query_ops.oldest_time(rd); - time_t last_t = rd->state->query_ops.latest_time(rd); - - int live = ((now - last_t) < (RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every)); + struct aclk_chart_dimension_data *aclk_cd_data = cmd.data; - rc = aclk_upd_dimension_event( - wc, - claim_id, - &rd->state->metric_uuid, - rd->id, - rd->name, - rd->rrdset->id, - first_t, - live ? 0 : last_t, - &send_status); + char *claim_id = is_agent_claimed(); + if (!claim_id) + goto cleanup; - if (!send_status) - rd->state->aclk_live_status = live; + rc = aclk_add_chart_payload(wc, &aclk_cd_data->uuid, claim_id, ACLK_PAYLOAD_DIMENSION, + (void *) aclk_cd_data->payload, aclk_cd_data->payload_size, NULL, 0); - freez(claim_id); - } - rrddim_flag_clear(rd, RRDDIM_FLAG_ACLK); + freez(claim_id); +cleanup: + freez(aclk_cd_data->payload); + freez(aclk_cd_data); return rc; } @@ -1091,16 +1086,63 @@ void sql_get_last_chart_sequence(struct aclk_database_worker_config *wc) return; } -void queue_dimension_to_aclk(RRDDIM *rd) +void queue_dimension_to_aclk(RRDDIM *rd, time_t last_updated) { - if (rrddim_flag_check(rd, RRDDIM_FLAG_ACLK)) + int live = !last_updated; + + if (likely(rd->state->aclk_live_status == live)) + return; + + rd->state->aclk_live_status = live; + + struct aclk_database_worker_config *wc = rd->rrdset->rrdhost->dbsync_worker; + if (unlikely(!wc)) return; - rrddim_flag_set(rd, RRDDIM_FLAG_ACLK); - int rc = sql_queue_chart_payload((struct aclk_database_worker_config *) rd->rrdset->rrdhost->dbsync_worker, - rd, ACLK_DATABASE_ADD_DIMENSION); - if (unlikely(rc)) - rrddim_flag_clear(rd, RRDDIM_FLAG_ACLK); + char *claim_id = is_agent_claimed(); + if (unlikely(!claim_id)) + return; + + struct chart_dimension_updated dim_payload; + memset(&dim_payload, 0, sizeof(dim_payload)); + dim_payload.node_id = wc->node_id; + dim_payload.claim_id = claim_id; + dim_payload.name = rd->name; + dim_payload.id = rd->id; + dim_payload.chart_id = rd->rrdset->id; + dim_payload.created_at.tv_sec = rd->state->query_ops.oldest_time(rd); + dim_payload.last_timestamp.tv_sec = last_updated; + + size_t size = 0; + char *payload = generate_chart_dimension_updated(&size, &dim_payload); + + freez(claim_id); + if (unlikely(!payload)) + return; + + time_t date_submitted = payload_sent(wc->uuid_str, &rd->state->metric_uuid, payload, size); + if (date_submitted) { + freez(payload); + return; + } + + struct aclk_chart_dimension_data *aclk_cd_data = mallocz(sizeof(*aclk_cd_data)); + uuid_copy(aclk_cd_data->uuid, rd->state->metric_uuid); + aclk_cd_data->payload = payload; + aclk_cd_data->payload_size = size; + + struct aclk_database_cmd cmd; + memset(&cmd, 0, sizeof(cmd)); + + cmd.opcode = ACLK_DATABASE_ADD_DIMENSION; + cmd.data = aclk_cd_data; + int rc = aclk_database_enq_cmd_noblock(wc, &cmd); + + if (unlikely(rc)) { + freez(aclk_cd_data->payload); + freez(aclk_cd_data); + rd->state->aclk_live_status = !live; + } return; } @@ -1270,15 +1312,8 @@ void sql_check_chart_liveness(RRDSET *st) { debug(D_ACLK_SYNC,"Check chart liveness [%s] scanning dimensions", st->name); rrddim_foreach_read(rd, st) { - if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)) { - int live = (mark - rd->last_collected_time.tv_sec) < RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every; - if (unlikely(live != rd->state->aclk_live_status)) { - debug(D_ACLK_SYNC,"Dimension change [%s] on [%s] from live %d --> %d", rd->id, rd->rrdset->name, rd->state->aclk_live_status, live); - queue_dimension_to_aclk(rd); - } - else - debug(D_ACLK_SYNC,"Dimension check [%s] on [%s] liveness matches", rd->id, st->name); - } + if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)) + queue_dimension_to_aclk(rd, calc_dimension_liveness(rd, mark)); } rrdset_unlock(st); } diff --git a/database/sqlite/sqlite_aclk_chart.h b/database/sqlite/sqlite_aclk_chart.h index 75aff3af30..f98cf55c5c 100644 --- a/database/sqlite/sqlite_aclk_chart.h +++ b/database/sqlite/sqlite_aclk_chart.h @@ -16,10 +16,20 @@ extern sqlite3 *db_meta; #define RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER (3) #endif +#ifndef RRDSET_MINIMUM_DIM_OFFLINE_MULTIPLIER +#define RRDSET_MINIMUM_DIM_OFFLINE_MULTIPLIER (30) +#endif + #ifndef ACLK_MAX_DIMENSION_CLEANUP #define ACLK_MAX_DIMENSION_CLEANUP (500) #endif +struct aclk_chart_dimension_data { + uuid_t uuid; + char *payload; + size_t payload_size; +}; + struct aclk_chart_sync_stats { int updates; uint64_t batch_id; @@ -37,7 +47,7 @@ struct aclk_chart_sync_stats { }; extern int queue_chart_to_aclk(RRDSET *st); -extern void queue_dimension_to_aclk(RRDDIM *rd); +extern void queue_dimension_to_aclk(RRDDIM *rd, time_t last_updated); extern void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id); int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); |