diff options
author | Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> | 2022-06-01 19:10:32 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-06-01 19:10:32 +0300 |
commit | c261a771cc0c93fe4e9fbb83e1be141406d314be (patch) | |
tree | 28bf18c89edb20711fe4d67ce15b797279cde38d | |
parent | 044d4c9d916fbc94a5a00324ab377d56b4937a29 (diff) |
Schedule retention message calculation to a worker thread (#13039)
* Move aclk_update_retention to the proper header file
* Do a scan but avoid going through all the dimensions if we have too much to delete -- do not generate a retention message in that case
* Schedule the retention calculation to a worker
* Adjust messages in the access log
* Fix compilation errors with --disable-cloud
-rw-r--r-- | database/sqlite/sqlite_aclk.c | 75 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk.h | 2 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_alert.c | 2 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_chart.c | 20 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_chart.h | 1 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_node.h | 1 |
6 files changed, 87 insertions, 14 deletions
diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c index baeedd4e6e..950856d9a1 100644 --- a/database/sqlite/sqlite_aclk.c +++ b/database/sqlite/sqlite_aclk.c @@ -34,6 +34,28 @@ const char *aclk_sync_config[] = { uv_mutex_t aclk_async_lock; struct aclk_database_worker_config *aclk_thread_head = NULL; +int retention_running = 0; + +#ifdef ENABLE_NEW_CLOUD_PROTOCOL +static void stop_retention_run() +{ + uv_mutex_lock(&aclk_async_lock); + retention_running = 0; + uv_mutex_unlock(&aclk_async_lock); +} + +static int request_retention_run() +{ + int rc = 0; + uv_mutex_lock(&aclk_async_lock); + if (unlikely(retention_running)) + rc = 1; + else + retention_running = 1; + uv_mutex_unlock(&aclk_async_lock); + return rc; +} +#endif int claimed() { @@ -318,9 +340,6 @@ static void timer_cb(uv_timer_t* handle) if (aclk_use_new_cloud_arch && aclk_connected) { if (wc->rotation_after && wc->rotation_after < now) { - cmd.opcode = ACLK_DATABASE_NODE_INFO; - aclk_database_enq_cmd_noblock(wc, &cmd); - cmd.opcode = ACLK_DATABASE_UPD_RETENTION; if (!aclk_database_enq_cmd_noblock(wc, &cmd)) wc->rotation_after += ACLK_DATABASE_ROTATION_INTERVAL; @@ -353,6 +372,38 @@ static void timer_cb(uv_timer_t* handle) #endif } + +#ifdef ENABLE_NEW_CLOUD_PROTOCOL +void after_send_retention(uv_work_t *req, int status) +{ + struct aclk_database_worker_config *wc = req->data; + (void)status; + stop_retention_run(); + wc->retention_running = 0; + + struct aclk_database_cmd cmd; + memset(&cmd, 0, sizeof(cmd)); + cmd.opcode = ACLK_DATABASE_DIM_DELETION; + if (aclk_database_enq_cmd_noblock(wc, &cmd)) + info("Failed to queue a dimension deletion message"); + + cmd.opcode = ACLK_DATABASE_NODE_INFO; + if (aclk_database_enq_cmd_noblock(wc, &cmd)) + info("Failed to queue a node update info message"); +} + + +static void send_retention(uv_work_t *req) +{ + struct aclk_database_worker_config *wc = req->data; + + if (unlikely(wc->is_shutting_down)) + return; + + aclk_update_retention(wc); +} +#endif + #define MAX_CMD_BATCH_SIZE (256) void aclk_database_worker(void *arg) @@ -429,6 +480,7 @@ void aclk_database_worker(void *arg) memset(&cmd, 0, sizeof(cmd)); #ifdef ENABLE_NEW_CLOUD_PROTOCOL + uv_work_t retention_work; sql_get_last_chart_sequence(wc); wc->chart_payload_count = sql_get_pending_count(wc); if (!wc->chart_payload_count) @@ -440,6 +492,7 @@ void aclk_database_worker(void *arg) wc->rotation_after = wc->startup_time + ACLK_DATABASE_ROTATION_DELAY; debug(D_ACLK_SYNC,"Node %s reports pending message count = %u", wc->node_id, wc->chart_payload_count); + while (likely(!netdata_exit)) { worker_is_idle(); uv_run(loop, UV_RUN_DEFAULT); @@ -538,9 +591,21 @@ void aclk_database_worker(void *arg) aclk_process_dimension_deletion(wc, cmd); break; case ACLK_DATABASE_UPD_RETENTION: + if (unlikely(wc->retention_running)) + break; + + if (unlikely(request_retention_run())) { + wc->rotation_after = now_realtime_sec() + ACLK_DATABASE_RETENTION_RETRY; + break; + } + debug(D_ACLK_SYNC,"Sending retention info for %s", wc->uuid_str); - aclk_update_retention(wc, cmd); - aclk_process_dimension_deletion(wc, cmd); + retention_work.data = wc; + wc->retention_running = 1; + if (unlikely(uv_queue_work(loop, &retention_work, send_retention, after_send_retention))) { + wc->retention_running = 0; + stop_retention_run(); + } break; // NODE_INSTANCE DETECTION diff --git a/database/sqlite/sqlite_aclk.h b/database/sqlite/sqlite_aclk.h index 6854a05b86..37e3d4530d 100644 --- a/database/sqlite/sqlite_aclk.h +++ b/database/sqlite/sqlite_aclk.h @@ -17,6 +17,7 @@ #define ACLK_MAX_ALERT_UPDATES (5) #define ACLK_DATABASE_CLEANUP_FIRST (60) #define ACLK_DATABASE_ROTATION_DELAY (180) +#define ACLK_DATABASE_RETENTION_RETRY (60) #define ACLK_DATABASE_CLEANUP_INTERVAL (3600) #define ACLK_DATABASE_ROTATION_INTERVAL (3600) #define ACLK_DELETE_ACK_INTERNAL (600) @@ -197,6 +198,7 @@ struct aclk_database_worker_config { int node_info_send; int chart_pending; int chart_reset_count; + int retention_running; volatile unsigned is_shutting_down; volatile unsigned is_orphan; struct aclk_database_worker_config *next; diff --git a/database/sqlite/sqlite_aclk_alert.c b/database/sqlite/sqlite_aclk_alert.c index 0003a69109..53c6c2a651 100644 --- a/database/sqlite/sqlite_aclk_alert.c +++ b/database/sqlite/sqlite_aclk_alert.c @@ -773,7 +773,7 @@ void sql_process_queue_removed_alerts_to_aclk(struct aclk_database_worker_config db_execute(buffer_tostring(sql)); - log_access("ACLK STA [%s (%s)]: Queued removed alerts.", wc->node_id, wc->host ? wc->host->hostname : "N/A"); + log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, wc->host ? wc->host->hostname : "N/A"); buffer_free(sql); diff --git a/database/sqlite/sqlite_aclk_chart.c b/database/sqlite/sqlite_aclk_chart.c index c303c89ace..d506d53acc 100644 --- a/database/sqlite/sqlite_aclk_chart.c +++ b/database/sqlite/sqlite_aclk_chart.c @@ -566,7 +566,7 @@ void aclk_receive_chart_ack(struct aclk_database_worker_config *wc, struct aclk_ error_report("Failed to ACK sequence id, rc = %d", rc); else log_access( - "ACLK STA [%s (%s)]: CHARTS ACKNOWLEDGED in the database upto %" PRIu64, + "ACLK STA [%s (%s)]: CHARTS ACKNOWLEDGED IN THE DATABASE UP TO %" PRIu64, wc->node_id, wc->host ? wc->host->hostname : "N/A", cmd.param1); @@ -847,9 +847,8 @@ failed: "SELECT distinct h.host_id, c.update_every, c.type||'.'||c.id FROM chart c, host h " \ "WHERE c.host_id = h.host_id AND c.host_id = @host_id ORDER BY c.update_every ASC;" -void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd) +void aclk_update_retention(struct aclk_database_worker_config *wc) { - UNUSED(cmd); int rc; if (!aclk_use_new_cloud_arch || !aclk_connected) @@ -916,7 +915,9 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d rotate_data.node_id = strdupz(wc->node_id); time_t now = now_realtime_sec(); - while (sqlite3_step(res) == SQLITE_ROW) { + while (sqlite3_step(res) == SQLITE_ROW && dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP) { + if (unlikely(netdata_exit)) + break; if (!update_every || update_every != (uint32_t)sqlite3_column_int(res, 1)) { if (update_every) { debug(D_ACLK_SYNC, "Update %s for %u oldest time = %ld", wc->host_guid, update_every, start_time); @@ -1003,8 +1004,12 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d if (!wc->host) hostname = get_hostname_by_node_id(wc->node_id); - log_access("ACLK STA [%s (%s)]: UPDATES %d RETENTION MESSAGE SENT. CHECKED %u DIMENSIONS. %u DELETED, %u STOPPED COLLECTING", - wc->node_id, wc->host ? wc->host->hostname : hostname ? hostname : "N/A", wc->chart_updates, total_checked, total_deleted, total_stopped); + if (dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP && !netdata_exit) + log_access("ACLK STA [%s (%s)]: UPDATES %d RETENTION MESSAGE SENT. CHECKED %u DIMENSIONS. %u DELETED, %u STOPPED COLLECTING", + wc->node_id, wc->host ? wc->host->hostname : hostname ? hostname : "N/A", wc->chart_updates, total_checked, total_deleted, total_stopped); + else + log_access("ACLK STA [%s (%s)]: UPDATES %d RETENTION MESSAGE NOT SENT. CHECKED %u DIMENSIONS. %u DELETED, %u STOPPED COLLECTING", + wc->node_id, wc->host ? wc->host->hostname : hostname ? hostname : "N/A", wc->chart_updates, total_checked, total_deleted, total_stopped); freez(hostname); #ifdef NETDATA_INTERNAL_CHECKS @@ -1017,7 +1022,8 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d rotate_data.interval_durations[i].update_every, rotate_data.interval_durations[i].retention); #endif - aclk_retention_updated(&rotate_data); + if (dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP && !netdata_exit) + aclk_retention_updated(&rotate_data); freez(rotate_data.node_id); freez(rotate_data.interval_durations); diff --git a/database/sqlite/sqlite_aclk_chart.h b/database/sqlite/sqlite_aclk_chart.h index fd8c139741..84325bf6c6 100644 --- a/database/sqlite/sqlite_aclk_chart.h +++ b/database/sqlite/sqlite_aclk_chart.h @@ -67,4 +67,5 @@ uint32_t sql_get_pending_count(struct aclk_database_worker_config *wc); void aclk_send_dimension_update(RRDDIM *rd); struct aclk_chart_sync_stats *aclk_get_chart_sync_stats(RRDHOST *host); void sql_check_chart_liveness(RRDSET *st); +void aclk_update_retention(struct aclk_database_worker_config *wc); #endif //NETDATA_SQLITE_ACLK_CHART_H diff --git a/database/sqlite/sqlite_aclk_node.h b/database/sqlite/sqlite_aclk_node.h index 9cb4115866..b8f8c6bbf4 100644 --- a/database/sqlite/sqlite_aclk_node.h +++ b/database/sqlite/sqlite_aclk_node.h @@ -4,5 +4,4 @@ #define NETDATA_SQLITE_ACLK_NODE_H void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); -void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd); #endif //NETDATA_SQLITE_ACLK_NODE_H |