diff options
author | Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> | 2022-05-03 21:38:12 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-03 21:38:12 +0300 |
commit | 154cf74d6a97e0712f0d6b6a970d3e5aaf439f0e (patch) | |
tree | 4745e1ef767fe097e0379bbcbe9ed85f51fd2d9b | |
parent | cbff54ac71f2a76d1f7e7c80d85328159857314a (diff) |
Improve agent cloud chart synchronization (#12655)
* Try to queue dimension always when:
Trying to clean obsolete charts
If chart has been sent and liveness apparently changed
* delay rotation and skip chart check if not send to cloud
* No need to CLEAR flag during database rotation
Do not clear chart ACLK status for dimension requests
* Change payload_sent to return timestamp of submitted message
* Clear the dimension ACLK flag if we are processing all the charts again
* Check if dimension is already queued to ACLK and ignore it
If queue fails then reset it to retry
Already try to queue the dimension
* Improve dimension cleanup during the retention message calculation
* Change queue_dimension_to_aclk to return void
* If no time range for this dimension then assume it is deleted
* Start streaming for inactive nodes
* Remove dead code
* Correctly report hostname in the access log
* Schedule a dimension deletion without trying to submit a message immediately
* Enable dimension cleanup -- also delete dimension if not found in the dbengine files
Free hostname
-rw-r--r-- | database/rrddim.c | 22 | ||||
-rw-r--r-- | database/rrdhost.c | 5 | ||||
-rw-r--r-- | database/rrdset.c | 23 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk.h | 2 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_chart.c | 174 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_chart.h | 2 |
6 files changed, 125 insertions, 103 deletions
diff --git a/database/rrddim.c b/database/rrddim.c index bca86b7373..c958d25615 100644 --- a/database/rrddim.c +++ b/database/rrddim.c @@ -194,9 +194,6 @@ void rrdcalc_link_to_rrddim(RRDDIM *rd, RRDSET *st, RRDHOST *host) { } } } -#ifdef ENABLE_ACLK - rrdset_flag_clear(st, RRDSET_FLAG_ACLK); -#endif } RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collected_number multiplier, @@ -441,9 +438,6 @@ RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collecte ml_new_dimension(rd); rrdset_unlock(st); -#ifdef ENABLE_ACLK - rrdset_flag_clear(st, RRDSET_FLAG_ACLK); -#endif return(rd); } @@ -516,10 +510,6 @@ void rrddim_free_custom(RRDSET *st, RRDDIM *rd, int db_rotated) freez(rd); break; } -#ifdef ENABLE_ACLK - if (db_rotated || RRD_MEMORY_MODE_DBENGINE != rrd_memory_mode) - rrdset_flag_clear(st, RRDSET_FLAG_ACLK); -#endif } @@ -539,9 +529,6 @@ int rrddim_hide(RRDSET *st, const char *id) { (void) sql_set_dimension_option(&rd->state->metric_uuid, "hidden"); rrddim_flag_set(rd, RRDDIM_FLAG_HIDDEN); -#ifdef ENABLE_ACLK - rrdset_flag_clear(st, RRDSET_FLAG_ACLK); -#endif return 0; } @@ -557,9 +544,6 @@ int rrddim_unhide(RRDSET *st, const char *id) { (void) sql_set_dimension_option(&rd->state->metric_uuid, NULL); rrddim_flag_clear(rd, RRDDIM_FLAG_HIDDEN); -#ifdef ENABLE_ACLK - rrdset_flag_clear(st, RRDSET_FLAG_ACLK); -#endif return 0; } @@ -572,18 +556,12 @@ inline void rrddim_is_obsolete(RRDSET *st, RRDDIM *rd) { } rrddim_flag_set(rd, RRDDIM_FLAG_OBSOLETE); rrdset_flag_set(st, RRDSET_FLAG_OBSOLETE_DIMENSIONS); -#ifdef ENABLE_ACLK - rrdset_flag_clear(st, RRDSET_FLAG_ACLK); -#endif } inline void rrddim_isnot_obsolete(RRDSET *st __maybe_unused, RRDDIM *rd) { debug(D_RRD_CALLS, "rrddim_isnot_obsolete() for chart %s, dimension %s", st->name, rd->name); rrddim_flag_clear(rd, RRDDIM_FLAG_OBSOLETE); -#ifdef ENABLE_ACLK - rrdset_flag_clear(st, RRDSET_FLAG_ACLK); -#endif } // ---------------------------------------------------------------------------- diff --git a/database/rrdhost.c b/database/rrdhost.c index b0874e3d3d..45ee2d08d8 100644 --- a/database/rrdhost.c +++ b/database/rrdhost.c @@ -1488,9 +1488,8 @@ restart_after_removal: continue; } #if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL) - else { - aclk_send_dimension_update(rd); - } + else + queue_dimension_to_aclk(rd); #endif } last = rd; diff --git a/database/rrdset.c b/database/rrdset.c index 820c958f1b..af36174e78 100644 --- a/database/rrdset.c +++ b/database/rrdset.c @@ -1368,8 +1368,9 @@ void rrdset_done(RRDSET *st) { #ifdef ENABLE_ACLK if (likely(!st->state->is_ar_chart)) { if (unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ACLK))) { - if (likely(st->dimensions && st->counter_done && !queue_chart_to_aclk(st))) + if (likely(st->dimensions && st->counter_done && !queue_chart_to_aclk(st))) { rrdset_flag_set(st, RRDSET_FLAG_ACLK); + } } } #endif @@ -1796,20 +1797,14 @@ after_second_database_work: continue; #if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL) - if (likely(!st->state->is_ar_chart)) { - if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)) { - int live = - ((mark - rd->last_collected_time.tv_sec) < RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every); - if (unlikely(live != rd->state->aclk_live_status)) { - if (likely(rrdset_flag_check(st, RRDSET_FLAG_ACLK))) { - if (likely(!queue_dimension_to_aclk(rd))) { - rd->state->aclk_live_status = live; - rrddim_flag_set(rd, RRDDIM_FLAG_ACLK); - } - } + if (likely(!st->state->is_ar_chart)) { + if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN) && likely(rrdset_flag_check(st, RRDSET_FLAG_ACLK))) { + int live = + ((mark - rd->last_collected_time.tv_sec) < RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every); + if (unlikely(live != rd->state->aclk_live_status)) + queue_dimension_to_aclk(rd); } } - } #endif if(unlikely(!rd->updated)) continue; @@ -1911,7 +1906,7 @@ after_second_database_work: } else { /* Do not delete this dimension */ #if defined(ENABLE_ACLK) && defined(ENABLE_NEW_CLOUD_PROTOCOL) - aclk_send_dimension_update(rd); + queue_dimension_to_aclk(rd); #endif last = rd; rd = rd->next; diff --git a/database/sqlite/sqlite_aclk.h b/database/sqlite/sqlite_aclk.h index 792c2d21ed..77cda30908 100644 --- a/database/sqlite/sqlite_aclk.h +++ b/database/sqlite/sqlite_aclk.h @@ -16,7 +16,7 @@ #endif #define ACLK_MAX_ALERT_UPDATES (5) #define ACLK_DATABASE_CLEANUP_FIRST (60) -#define ACLK_DATABASE_ROTATION_DELAY (60) +#define ACLK_DATABASE_ROTATION_DELAY (180) #define ACLK_DATABASE_CLEANUP_INTERVAL (3600) #define ACLK_DATABASE_ROTATION_INTERVAL (3600) #define ACLK_DELETE_ACK_INTERNAL (600) diff --git a/database/sqlite/sqlite_aclk_chart.c b/database/sqlite/sqlite_aclk_chart.c index 7afa1d451e..faebe5af3c 100644 --- a/database/sqlite/sqlite_aclk_chart.c +++ b/database/sqlite/sqlite_aclk_chart.c @@ -22,20 +22,20 @@ sql_queue_chart_payload(struct aclk_database_worker_config *wc, void *data, enum return rc; } -static int payload_sent(char *uuid_str, uuid_t *uuid, void *payload, size_t payload_size) +static time_t payload_sent(char *uuid_str, uuid_t *uuid, void *payload, size_t payload_size) { static __thread sqlite3_stmt *res = NULL; int rc; - int send_status = 0; + time_t send_status = 0; if (unlikely(!res)) { char sql[ACLK_SYNC_QUERY_SIZE]; - snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "SELECT 1 FROM aclk_chart_latest_%s acl, aclk_chart_payload_%s acp " - "WHERE acl.unique_id = acp.unique_id AND acl.uuid = @uuid AND acp.payload = @payload;", - uuid_str, uuid_str); + snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, "SELECT acl.date_submitted FROM aclk_chart_latest_%s acl, aclk_chart_payload_%s acp " + "WHERE acl.unique_id = acp.unique_id AND acl.uuid = @uuid AND acp.payload = @payload;", + uuid_str, uuid_str); rc = prepare_statement(db_meta, sql, &res); if (rc != SQLITE_OK) { - error_report("Failed to prepare statement to check payload data"); + error_report("Failed to prepare statement to check payload data on %s", sql); return 0; } } @@ -49,7 +49,7 @@ static int payload_sent(char *uuid_str, uuid_t *uuid, void *payload, size_t payl goto bind_fail; while (sqlite3_step(res) == SQLITE_ROW) { - send_status = sqlite3_column_int(res, 0); + send_status = (time_t) sqlite3_column_int64(res, 0); } bind_fail: @@ -59,22 +59,23 @@ bind_fail: } static int aclk_add_chart_payload(struct aclk_database_worker_config *wc, uuid_t *uuid, char *claim_id, - ACLK_PAYLOAD_TYPE payload_type, void *payload, size_t payload_size, int *send_status) + ACLK_PAYLOAD_TYPE payload_type, void *payload, size_t payload_size, time_t *send_status) { static __thread sqlite3_stmt *res_chart = NULL; int rc; + time_t date_submitted; - rc = payload_sent(wc->uuid_str, uuid, payload, payload_size); + date_submitted = payload_sent(wc->uuid_str, uuid, payload, payload_size); if (send_status) - *send_status = rc; - if (rc == 1) + *send_status = date_submitted; + if (date_submitted) return 0; if (unlikely(!res_chart)) { char sql[ACLK_SYNC_QUERY_SIZE]; snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1, - "INSERT INTO aclk_chart_payload_%s (unique_id, uuid, claim_id, date_created, type, payload) " \ - "VALUES (@unique_id, @uuid, @claim_id, strftime('%%s','now'), @type, @payload);", wc->uuid_str); + "INSERT INTO aclk_chart_payload_%s (unique_id, uuid, claim_id, date_created, type, payload) " \ + "VALUES (@unique_id, @uuid, @claim_id, strftime('%%s','now'), @type, @payload);", wc->uuid_str); rc = prepare_statement(db_meta, sql, &res_chart); if (rc != SQLITE_OK) { error_report("Failed to prepare statement to store chart payload data"); @@ -168,7 +169,7 @@ int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_dat static inline int aclk_upd_dimension_event(struct aclk_database_worker_config *wc, char *claim_id, uuid_t *dim_uuid, const char *dim_id, const char *dim_name, const char *chart_type_id, time_t first_time, time_t last_time, - int *send_status) + time_t *send_status) { int rc = 0; size_t size; @@ -279,7 +280,7 @@ int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk RRDDIM *rd = cmd.data; if (likely(claim_id)) { - int send_status = 0; + time_t send_status = 0; time_t now = now_realtime_sec(); time_t first_t = rd->state->query_ops.oldest_time(rd); @@ -337,6 +338,12 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d char sql[ACLK_SYNC_QUERY_SIZE]; static __thread sqlite3_stmt *res = NULL; + char *hostname = NULL; + if (wc->host) + hostname = strdupz(wc->host->hostname); + else + hostname = get_hostname_by_node_id(wc->node_id); + if (unlikely(!res)) { snprintfz(sql,ACLK_SYNC_QUERY_SIZE-1,"SELECT ac.sequence_id, acp.payload, ac.date_created, ac.type, ac.uuid " \ "FROM aclk_chart_%s ac, aclk_chart_payload_%s acp " \ @@ -419,7 +426,7 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d log_access( "ACLK RES [%s (%s)]: CHARTS SENT from %" PRIu64 " to %" PRIu64 " batch=%" PRIu64, wc->node_id, - wc->host ? wc->host->hostname : "N/A", + hostname ? hostname : "N/A", first_sequence, last_sequence, wc->batch_id); @@ -440,7 +447,7 @@ void aclk_send_chart_event(struct aclk_database_worker_config *wc, struct aclk_d log_access( "ACLK STA [%s (%s)]: Sync of charts and dimensions done in %ld seconds.", wc->node_id, - wc->host ? wc->host->hostname : "N/A", + hostname ? hostname : "N/A", now_realtime_sec() - wc->startup_time); } @@ -459,6 +466,7 @@ bind_fail: error_report("Failed to reset statement when pushing chart events, rc = %d", rc); freez(claim_id); + freez(hostname); return; } @@ -583,8 +591,13 @@ void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct acl cmd.param1); db_execute(buffer_tostring(sql)); if (cmd.param1 == 1) { + char *hostname = NULL; + if (wc->host) + hostname = strdupz(wc->host->hostname); + else + hostname = get_hostname_by_node_id(wc->node_id); buffer_flush(sql); - log_access("ACLK REQ [%s (%s)]: Received chart full resync.", wc->node_id, wc->host ? wc->host->hostname : "N/A"); + log_access("ACLK REQ [%s (%s)]: Received chart full resync.", wc->node_id, hostname? hostname : "N/A"); buffer_sprintf(sql, "DELETE FROM aclk_chart_payload_%s; DELETE FROM aclk_chart_%s; " \ "DELETE FROM aclk_chart_latest_%s;", wc->uuid_str, wc->uuid_str, wc->uuid_str); db_lock(); @@ -609,6 +622,7 @@ void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct acl RRDDIM *rd; rrddim_foreach_read(rd, st) { + rrddim_flag_clear(rd, RRDDIM_FLAG_ACLK); rd->state->aclk_live_status = (rd->state->aclk_live_status == 0); } rrdset_unlock(st); @@ -616,6 +630,7 @@ void aclk_receive_chart_reset(struct aclk_database_worker_config *wc, struct acl rrdhost_unlock(host); } else error_report("ACLK synchronization thread for %s is not linked to HOST", wc->host_guid); + freez(hostname); } else { log_access( "ACLK STA [%s (%s)]: Restarting chart sync from sequence %" PRIu64, @@ -705,25 +720,28 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at if (unlikely(!node_id)) return; - // log_access("ACLK REQ [%s (N/A)]: CHARTS STREAM from %"PRIu64" t=%ld batch=%"PRIu64, node_id, - // sequence_id, created_at, batch_id); - uuid_t node_uuid; if (uuid_parse(node_id, node_uuid)) { log_access("ACLK REQ [%s (N/A)]: CHARTS STREAM ignored, invalid node id", node_id); return; } - struct aclk_database_worker_config *wc = NULL; + struct aclk_database_worker_config *wc = find_inactive_wc_by_node_id(node_id); rrd_rdlock(); RRDHOST *host = localhost; while(host) { - if (host->node_id && !(uuid_compare(*host->node_id, node_uuid))) { + if (wc || (host->node_id && !(uuid_compare(*host->node_id, node_uuid)))) { rrd_unlock(); - wc = (struct aclk_database_worker_config *)host->dbsync_worker ? - (struct aclk_database_worker_config *)host->dbsync_worker : - (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id); + if (!wc) + wc = (struct aclk_database_worker_config *)host->dbsync_worker ? + (struct aclk_database_worker_config *)host->dbsync_worker : + (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id); + char *hostname = NULL; if (likely(wc)) { + if (wc->host) + hostname = strdupz(wc->host->hostname); + else + hostname = get_hostname_by_node_id(node_id); wc->chart_reset_count++; __sync_synchronize(); wc->chart_updates = 0; @@ -733,7 +751,7 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at log_access( "ACLK REQ [%s (%s)]: CHARTS STREAM from %" PRIu64 " t=%ld resets=%d", wc->node_id, - wc->host ? wc->host->hostname : "N/A", + hostname ? hostname : "N/A", wc->chart_sequence_id, wc->chart_timestamp, wc->chart_reset_count); @@ -742,7 +760,7 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at "ACLK RES [%s (%s)]: CHARTS FULL RESYNC REQUEST " "remote_seq=%" PRIu64 " local_seq=%" PRIu64 " resets=%d ", wc->node_id, - wc->host ? wc->host->hostname : "N/A", + hostname ? hostname : "N/A", sequence_id, wc->chart_sequence_id, wc->chart_reset_count); @@ -766,7 +784,7 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at log_access( "ACLK REQ [%s (%s)]: CHART RESET from %" PRIu64 " t=%ld batch=%" PRIu64, wc->node_id, - wc->host ? wc->host->hostname : "N/A", + hostname ? hostname : "N/A", wc->chart_sequence_id, wc->chart_timestamp, wc->batch_id); @@ -775,20 +793,16 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at cmd.completion = NULL; aclk_database_enq_cmd(wc, &cmd); } else { -// log_access( -// "ACLK RES [%s (%s)]: CHARTS STREAM from %" PRIu64 -// " t=%ld resets=%d", -// wc->node_id, -// wc->host ? wc->host->hostname : "N/A", -// wc->chart_sequence_id, -// wc->chart_timestamp, -// wc->chart_reset_count); wc->chart_reset_count = 0; wc->chart_updates = 1; } } - } else - log_access("ACLK STA [%s (N/A)]: ACLK synchronization thread is not active.", node_id); + freez(hostname); + } else { + hostname = get_hostname_by_node_id(node_id); + log_access("ACLK STA [%s (%s)]: ACLK synchronization thread is not active.", node_id, hostname ? hostname : "N/A"); + freez(hostname); + } return; } host = host->next; @@ -887,7 +901,10 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d time_t last_entry_t; uint32_t update_every = 0; uint32_t dimension_update_count = 0; - int send_status; + uint32_t total_checked = 0; + uint32_t total_deleted= 0; + uint32_t total_stopped= 0; + time_t send_status; struct retention_updated rotate_data; @@ -942,23 +959,40 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d if (likely(!rc && first_entry_t)) start_time = MIN(start_time, first_entry_t); - if (memory_mode == RRD_MEMORY_MODE_DBENGINE && wc->chart_updates) { + if (memory_mode == RRD_MEMORY_MODE_DBENGINE && wc->chart_updates && (dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP)) { int live = ((now - last_entry_t) < (RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * update_every)); - if ((!live || !first_entry_t) && (dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP)) { - (void)aclk_upd_dimension_event( - wc, - claim_id, - (uuid_t *)sqlite3_column_blob(res, 0), - (const char *)(const char *)sqlite3_column_text(res, 3), - (const char *)(const char *)sqlite3_column_text(res, 4), - (const char *)(const char *)sqlite3_column_text(res, 2), - first_entry_t, - live ? 0 : last_entry_t, - &send_status); - if (!send_status) + if (rc) { + first_entry_t = 0; + last_entry_t = 0; + live = 0; + } + if (!wc->host || !first_entry_t) { + if (!first_entry_t) { + delete_dimension_uuid((uuid_t *)sqlite3_column_blob(res, 0)); + total_deleted++; dimension_update_count++; + } + else { + (void)aclk_upd_dimension_event( + wc, + claim_id, + (uuid_t *)sqlite3_column_blob(res, 0), + (const char *)(const char *)sqlite3_column_text(res, 3), + (const char *)(const char *)sqlite3_column_text(res, 4), + (const char *)(const char *)sqlite3_column_text(res, 2), + first_entry_t, + live ? 0 : last_entry_t, + &send_status); + + if (!send_status) { + if (last_entry_t) + total_stopped++; + dimension_update_count++; + } + } } } + total_checked++; } if (update_every) { debug(D_ACLK_SYNC, "Update %s for %u oldest time = %ld", wc->host_guid, update_every, start_time); @@ -970,7 +1004,16 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d rotate_data.interval_duration_count++; } + char *hostname = NULL; + if (!wc->host) + hostname = get_hostname_by_node_id(wc->node_id); + + log_access("ACLK STA [%s (%s)]: UPDATES %d RETENTION MESSAGE SENT. CHECKED %u DIMENSIONS. %u DELETED, %u STOPPED COLLECTING", + wc->node_id, wc->host ? wc->host->hostname : hostname ? hostname : "N/A", wc->chart_updates, total_checked, total_deleted, total_stopped); + freez(hostname); + #ifdef NETDATA_INTERNAL_CHECKS + info("Retention update for %s (chart updates = %d)", wc->host_guid, wc->chart_updates); for (int i = 0; i < rotate_data.interval_duration_count; ++i) info( "Update for host %s (node %s) for %u Retention = %u", @@ -1048,11 +1091,17 @@ void sql_get_last_chart_sequence(struct aclk_database_worker_config *wc) return; } -int queue_dimension_to_aclk(RRDDIM *rd) +void queue_dimension_to_aclk(RRDDIM *rd) { + if (rrddim_flag_check(rd, RRDDIM_FLAG_ACLK)) + return; + + rrddim_flag_set(rd, RRDDIM_FLAG_ACLK); int rc = sql_queue_chart_payload((struct aclk_database_worker_config *) rd->rrdset->rrdhost->dbsync_worker, rd, ACLK_DATABASE_ADD_DIMENSION); - return rc; + if (unlikely(rc)) + rrddim_flag_clear(rd, RRDDIM_FLAG_ACLK); + return; } void aclk_send_dimension_update(RRDDIM *rd) @@ -1203,6 +1252,12 @@ void sql_check_chart_liveness(RRDSET *st) { return; rrdset_rdlock(st); + + if (unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ACLK))) { + rrdset_unlock(st); + return; + } + if (unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ACLK))) { if (likely(st->dimensions && st->counter_done && !queue_chart_to_aclk(st))) { debug(D_ACLK_SYNC,"Check chart liveness [%s] submit chart definition", st->name); @@ -1218,13 +1273,8 @@ void sql_check_chart_liveness(RRDSET *st) { if (!rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)) { int live = (mark - rd->last_collected_time.tv_sec) < RRDSET_MINIMUM_DIM_LIVE_MULTIPLIER * rd->update_every; if (unlikely(live != rd->state->aclk_live_status)) { - if (likely(rrdset_flag_check(st, RRDSET_FLAG_ACLK))) { - if (likely(!queue_dimension_to_aclk(rd))) { - debug(D_ACLK_SYNC,"Dimension change [%s] on [%s] from live %d --> %d", rd->id, rd->rrdset->name, rd->state->aclk_live_status, live); - rd->state->aclk_live_status = live; - rrddim_flag_set(rd, RRDDIM_FLAG_ACLK); - } - } + debug(D_ACLK_SYNC,"Dimension change [%s] on [%s] from live %d --> %d", rd->id, rd->rrdset->name, rd->state->aclk_live_status, live); + queue_dimension_to_aclk(rd); } else debug(D_ACLK_SYNC,"Dimension check [%s] on [%s] liveness matches", rd->id, st->name); diff --git a/database/sqlite/sqlite_aclk_chart.h b/database/sqlite/sqlite_aclk_chart.h index b698c58275..75aff3af30 100644 --- a/database/sqlite/sqlite_aclk_chart.h +++ b/database/sqlite/sqlite_aclk_chart.h @@ -37,7 +37,7 @@ struct aclk_chart_sync_stats { }; extern int queue_chart_to_aclk(RRDSET *st); -extern int queue_dimension_to_aclk(RRDDIM *rd); +extern void queue_dimension_to_aclk(RRDDIM *rd); extern void sql_create_aclk_table(RRDHOST *host, uuid_t *host_uuid, uuid_t *node_id); int aclk_add_chart_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); int aclk_add_dimension_event(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); |