diff options
author | Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> | 2022-06-01 19:10:32 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-06-01 19:10:32 +0300 |
commit | c261a771cc0c93fe4e9fbb83e1be141406d314be (patch) | |
tree | 28bf18c89edb20711fe4d67ce15b797279cde38d /database/sqlite/sqlite_aclk.c | |
parent | 044d4c9d916fbc94a5a00324ab377d56b4937a29 (diff) |
Schedule retention message calculation to a worker thread (#13039)
* Move aclk_update_retention to the proper header file
* Do a scan but avoid going through all the dimensions if we have too much to delete -- do not generate a retention message in that case
* Schedule the retention calculation to a worker
* Adjust messages in the access log
* Fix compilation errors with --disable-cloud
Diffstat (limited to 'database/sqlite/sqlite_aclk.c')
-rw-r--r-- | database/sqlite/sqlite_aclk.c | 75 |
1 files changed, 70 insertions, 5 deletions
diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c index baeedd4e6e..950856d9a1 100644 --- a/database/sqlite/sqlite_aclk.c +++ b/database/sqlite/sqlite_aclk.c @@ -34,6 +34,28 @@ const char *aclk_sync_config[] = { uv_mutex_t aclk_async_lock; struct aclk_database_worker_config *aclk_thread_head = NULL; +int retention_running = 0; + +#ifdef ENABLE_NEW_CLOUD_PROTOCOL +static void stop_retention_run() +{ + uv_mutex_lock(&aclk_async_lock); + retention_running = 0; + uv_mutex_unlock(&aclk_async_lock); +} + +static int request_retention_run() +{ + int rc = 0; + uv_mutex_lock(&aclk_async_lock); + if (unlikely(retention_running)) + rc = 1; + else + retention_running = 1; + uv_mutex_unlock(&aclk_async_lock); + return rc; +} +#endif int claimed() { @@ -318,9 +340,6 @@ static void timer_cb(uv_timer_t* handle) if (aclk_use_new_cloud_arch && aclk_connected) { if (wc->rotation_after && wc->rotation_after < now) { - cmd.opcode = ACLK_DATABASE_NODE_INFO; - aclk_database_enq_cmd_noblock(wc, &cmd); - cmd.opcode = ACLK_DATABASE_UPD_RETENTION; if (!aclk_database_enq_cmd_noblock(wc, &cmd)) wc->rotation_after += ACLK_DATABASE_ROTATION_INTERVAL; @@ -353,6 +372,38 @@ static void timer_cb(uv_timer_t* handle) #endif } + +#ifdef ENABLE_NEW_CLOUD_PROTOCOL +void after_send_retention(uv_work_t *req, int status) +{ + struct aclk_database_worker_config *wc = req->data; + (void)status; + stop_retention_run(); + wc->retention_running = 0; + + struct aclk_database_cmd cmd; + memset(&cmd, 0, sizeof(cmd)); + cmd.opcode = ACLK_DATABASE_DIM_DELETION; + if (aclk_database_enq_cmd_noblock(wc, &cmd)) + info("Failed to queue a dimension deletion message"); + + cmd.opcode = ACLK_DATABASE_NODE_INFO; + if (aclk_database_enq_cmd_noblock(wc, &cmd)) + info("Failed to queue a node update info message"); +} + + +static void send_retention(uv_work_t *req) +{ + struct aclk_database_worker_config *wc = req->data; + + if (unlikely(wc->is_shutting_down)) + return; + + aclk_update_retention(wc); +} +#endif + #define MAX_CMD_BATCH_SIZE (256) void aclk_database_worker(void *arg) @@ -429,6 +480,7 @@ void aclk_database_worker(void *arg) memset(&cmd, 0, sizeof(cmd)); #ifdef ENABLE_NEW_CLOUD_PROTOCOL + uv_work_t retention_work; sql_get_last_chart_sequence(wc); wc->chart_payload_count = sql_get_pending_count(wc); if (!wc->chart_payload_count) @@ -440,6 +492,7 @@ void aclk_database_worker(void *arg) wc->rotation_after = wc->startup_time + ACLK_DATABASE_ROTATION_DELAY; debug(D_ACLK_SYNC,"Node %s reports pending message count = %u", wc->node_id, wc->chart_payload_count); + while (likely(!netdata_exit)) { worker_is_idle(); uv_run(loop, UV_RUN_DEFAULT); @@ -538,9 +591,21 @@ void aclk_database_worker(void *arg) aclk_process_dimension_deletion(wc, cmd); break; case ACLK_DATABASE_UPD_RETENTION: + if (unlikely(wc->retention_running)) + break; + + if (unlikely(request_retention_run())) { + wc->rotation_after = now_realtime_sec() + ACLK_DATABASE_RETENTION_RETRY; + break; + } + debug(D_ACLK_SYNC,"Sending retention info for %s", wc->uuid_str); - aclk_update_retention(wc, cmd); - aclk_process_dimension_deletion(wc, cmd); + retention_work.data = wc; + wc->retention_running = 1; + if (unlikely(uv_queue_work(loop, &retention_work, send_retention, after_send_retention))) { + wc->retention_running = 0; + stop_retention_run(); + } break; // NODE_INSTANCE DETECTION |