summary | refs | log | tree | commit | diff | stats
path: root/database/sqlite/sqlite_aclk.c
diff options
context:
space:
mode:
author: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> 2022-06-01 19:10:32 +0300
committer: GitHub <noreply@github.com> 2022-06-01 19:10:32 +0300
commit: c261a771cc0c93fe4e9fbb83e1be141406d314be (patch)
tree: 28bf18c89edb20711fe4d67ce15b797279cde38d /database/sqlite/sqlite_aclk.c
parent: 044d4c9d916fbc94a5a00324ab377d56b4937a29 (diff)
Schedule retention message calculation to a worker thread (#13039)
* Move aclk_update_retention to the proper header file
* Do a scan but avoid going through all the dimensions if we have too much to delete -- do not generate a retention message in that case
* Schedule the retention calculation to a worker
* Adjust messages in the access log
* Fix compilation errors with --disable-cloud
Diffstat (limited to 'database/sqlite/sqlite_aclk.c')
-rw-r--r-- database/sqlite/sqlite_aclk.c 75
1 files changed, 70 insertions, 5 deletions
diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c
index baeedd4e6e..950856d9a1 100644
--- a/database/sqlite/sqlite_aclk.c
+++ b/database/sqlite/sqlite_aclk.c
@@ -34,6 +34,28 @@ const char *aclk_sync_config[] = {
uv_mutex_t aclk_async_lock;
struct aclk_database_worker_config *aclk_thread_head = NULL;
+int retention_running = 0; /* non-zero while a retention run has been claimed; stop_retention_run() and request_retention_run() access it under aclk_async_lock */
+
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+static void stop_retention_run() /* Clears the file-scope retention_running flag while holding aclk_async_lock. */
+{
+ uv_mutex_lock(&aclk_async_lock);
+ retention_running = 0; /* a later request_retention_run() can claim the flag again */
+ uv_mutex_unlock(&aclk_async_lock);
+}
+
+static int request_retention_run() /* Tries to claim the file-scope retention_running flag under aclk_async_lock. Returns 0 when the flag was free and is now set, 1 when it was already set. */
+{
+ int rc = 0; /* 0 = flag claimed by this call */
+ uv_mutex_lock(&aclk_async_lock);
+ if (unlikely(retention_running))
+ rc = 1; /* already claimed: report busy and leave the flag untouched */
+ else
+ retention_running = 1; /* claim it; released again by stop_retention_run() */
+ uv_mutex_unlock(&aclk_async_lock);
+ return rc;
+}
+#endif
int claimed()
{
@@ -318,9 +340,6 @@ static void timer_cb(uv_timer_t* handle)
if (aclk_use_new_cloud_arch && aclk_connected) {
if (wc->rotation_after && wc->rotation_after < now) {
- cmd.opcode = ACLK_DATABASE_NODE_INFO;
- aclk_database_enq_cmd_noblock(wc, &cmd);
-
cmd.opcode = ACLK_DATABASE_UPD_RETENTION;
if (!aclk_database_enq_cmd_noblock(wc, &cmd))
wc->rotation_after += ACLK_DATABASE_ROTATION_INTERVAL;
@@ -353,6 +372,38 @@ static void timer_cb(uv_timer_t* handle)
#endif
}
+
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+void after_send_retention(uv_work_t *req, int status) /* Completion callback given to uv_queue_work() for the retention job: clears the running flags, then queues follow-up commands. NOTE(review): not static, unlike send_retention -- confirm external linkage is intended. */
+{
+ struct aclk_database_worker_config *wc = req->data; /* worker config stored in req->data by the code that queued the job */
+ (void)status; /* the completion status is explicitly discarded */
+ stop_retention_run(); /* release the file-scope retention_running flag */
+ wc->retention_running = 0; /* release the per-worker flag */
+
+ struct aclk_database_cmd cmd;
+ memset(&cmd, 0, sizeof(cmd));
+ cmd.opcode = ACLK_DATABASE_DIM_DELETION;
+ if (aclk_database_enq_cmd_noblock(wc, &cmd)) /* a non-zero return is treated as "not queued" and only logged */
+ info("Failed to queue a dimension deletion message");
+
+ cmd.opcode = ACLK_DATABASE_NODE_INFO; /* the same cmd struct is reused with a new opcode */
+ if (aclk_database_enq_cmd_noblock(wc, &cmd))
+ info("Failed to queue a node update info message");
+}
+
+
+static void send_retention(uv_work_t *req) /* Work callback given to uv_queue_work(): runs aclk_update_retention() for the worker config found in req->data. */
+{
+ struct aclk_database_worker_config *wc = req->data;
+
+ if (unlikely(wc->is_shutting_down)) /* skip the retention update when the worker is flagged as shutting down */
+ return;
+
+ aclk_update_retention(wc);
+}
+#endif
+
#define MAX_CMD_BATCH_SIZE (256)
void aclk_database_worker(void *arg)
@@ -429,6 +480,7 @@ void aclk_database_worker(void *arg)
memset(&cmd, 0, sizeof(cmd));
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ uv_work_t retention_work;
sql_get_last_chart_sequence(wc);
wc->chart_payload_count = sql_get_pending_count(wc);
if (!wc->chart_payload_count)
@@ -440,6 +492,7 @@ void aclk_database_worker(void *arg)
wc->rotation_after = wc->startup_time + ACLK_DATABASE_ROTATION_DELAY;
debug(D_ACLK_SYNC,"Node %s reports pending message count = %u", wc->node_id, wc->chart_payload_count);
+
while (likely(!netdata_exit)) {
worker_is_idle();
uv_run(loop, UV_RUN_DEFAULT);
@@ -538,9 +591,21 @@ void aclk_database_worker(void *arg)
aclk_process_dimension_deletion(wc, cmd);
break;
case ACLK_DATABASE_UPD_RETENTION:
+ if (unlikely(wc->retention_running))
+ break;
+
+ if (unlikely(request_retention_run())) {
+ wc->rotation_after = now_realtime_sec() + ACLK_DATABASE_RETENTION_RETRY;
+ break;
+ }
+
debug(D_ACLK_SYNC,"Sending retention info for %s", wc->uuid_str);
- aclk_update_retention(wc, cmd);
- aclk_process_dimension_deletion(wc, cmd);
+ retention_work.data = wc;
+ wc->retention_running = 1;
+ if (unlikely(uv_queue_work(loop, &retention_work, send_retention, after_send_retention))) {
+ wc->retention_running = 0;
+ stop_retention_run();
+ }
break;
// NODE_INSTANCE DETECTION