summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorStelios Fragkakis <52996999+stelfrag@users.noreply.github.com>2022-06-01 19:10:32 +0300
committerGitHub <noreply@github.com>2022-06-01 19:10:32 +0300
commitc261a771cc0c93fe4e9fbb83e1be141406d314be (patch)
tree28bf18c89edb20711fe4d67ce15b797279cde38d
parent044d4c9d916fbc94a5a00324ab377d56b4937a29 (diff)
Schedule retention message calculation to a worker thread (#13039)
* Move aclk_update_retention to the proper header file * Do a scan but avoid going through all the dimensions if we have too much to delete -- do not generate a retention message in that case * Schedule the retention calculation to a worker * Adjust messages in the access log * Fix compilation errors with --disable-cloud
-rw-r--r--database/sqlite/sqlite_aclk.c75
-rw-r--r--database/sqlite/sqlite_aclk.h2
-rw-r--r--database/sqlite/sqlite_aclk_alert.c2
-rw-r--r--database/sqlite/sqlite_aclk_chart.c20
-rw-r--r--database/sqlite/sqlite_aclk_chart.h1
-rw-r--r--database/sqlite/sqlite_aclk_node.h1
6 files changed, 87 insertions, 14 deletions
diff --git a/database/sqlite/sqlite_aclk.c b/database/sqlite/sqlite_aclk.c
index baeedd4e6e..950856d9a1 100644
--- a/database/sqlite/sqlite_aclk.c
+++ b/database/sqlite/sqlite_aclk.c
@@ -34,6 +34,28 @@ const char *aclk_sync_config[] = {
uv_mutex_t aclk_async_lock;
struct aclk_database_worker_config *aclk_thread_head = NULL;
+int retention_running = 0;
+
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+static void stop_retention_run()
+{
+ uv_mutex_lock(&aclk_async_lock);
+ retention_running = 0;
+ uv_mutex_unlock(&aclk_async_lock);
+}
+
+static int request_retention_run()
+{
+ int rc = 0;
+ uv_mutex_lock(&aclk_async_lock);
+ if (unlikely(retention_running))
+ rc = 1;
+ else
+ retention_running = 1;
+ uv_mutex_unlock(&aclk_async_lock);
+ return rc;
+}
+#endif
int claimed()
{
@@ -318,9 +340,6 @@ static void timer_cb(uv_timer_t* handle)
if (aclk_use_new_cloud_arch && aclk_connected) {
if (wc->rotation_after && wc->rotation_after < now) {
- cmd.opcode = ACLK_DATABASE_NODE_INFO;
- aclk_database_enq_cmd_noblock(wc, &cmd);
-
cmd.opcode = ACLK_DATABASE_UPD_RETENTION;
if (!aclk_database_enq_cmd_noblock(wc, &cmd))
wc->rotation_after += ACLK_DATABASE_ROTATION_INTERVAL;
@@ -353,6 +372,38 @@ static void timer_cb(uv_timer_t* handle)
#endif
}
+
+#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+void after_send_retention(uv_work_t *req, int status)
+{
+ struct aclk_database_worker_config *wc = req->data;
+ (void)status;
+ stop_retention_run();
+ wc->retention_running = 0;
+
+ struct aclk_database_cmd cmd;
+ memset(&cmd, 0, sizeof(cmd));
+ cmd.opcode = ACLK_DATABASE_DIM_DELETION;
+ if (aclk_database_enq_cmd_noblock(wc, &cmd))
+ info("Failed to queue a dimension deletion message");
+
+ cmd.opcode = ACLK_DATABASE_NODE_INFO;
+ if (aclk_database_enq_cmd_noblock(wc, &cmd))
+ info("Failed to queue a node update info message");
+}
+
+
+static void send_retention(uv_work_t *req)
+{
+ struct aclk_database_worker_config *wc = req->data;
+
+ if (unlikely(wc->is_shutting_down))
+ return;
+
+ aclk_update_retention(wc);
+}
+#endif
+
#define MAX_CMD_BATCH_SIZE (256)
void aclk_database_worker(void *arg)
@@ -429,6 +480,7 @@ void aclk_database_worker(void *arg)
memset(&cmd, 0, sizeof(cmd));
#ifdef ENABLE_NEW_CLOUD_PROTOCOL
+ uv_work_t retention_work;
sql_get_last_chart_sequence(wc);
wc->chart_payload_count = sql_get_pending_count(wc);
if (!wc->chart_payload_count)
@@ -440,6 +492,7 @@ void aclk_database_worker(void *arg)
wc->rotation_after = wc->startup_time + ACLK_DATABASE_ROTATION_DELAY;
debug(D_ACLK_SYNC,"Node %s reports pending message count = %u", wc->node_id, wc->chart_payload_count);
+
while (likely(!netdata_exit)) {
worker_is_idle();
uv_run(loop, UV_RUN_DEFAULT);
@@ -538,9 +591,21 @@ void aclk_database_worker(void *arg)
aclk_process_dimension_deletion(wc, cmd);
break;
case ACLK_DATABASE_UPD_RETENTION:
+ if (unlikely(wc->retention_running))
+ break;
+
+ if (unlikely(request_retention_run())) {
+ wc->rotation_after = now_realtime_sec() + ACLK_DATABASE_RETENTION_RETRY;
+ break;
+ }
+
debug(D_ACLK_SYNC,"Sending retention info for %s", wc->uuid_str);
- aclk_update_retention(wc, cmd);
- aclk_process_dimension_deletion(wc, cmd);
+ retention_work.data = wc;
+ wc->retention_running = 1;
+ if (unlikely(uv_queue_work(loop, &retention_work, send_retention, after_send_retention))) {
+ wc->retention_running = 0;
+ stop_retention_run();
+ }
break;
// NODE_INSTANCE DETECTION
diff --git a/database/sqlite/sqlite_aclk.h b/database/sqlite/sqlite_aclk.h
index 6854a05b86..37e3d4530d 100644
--- a/database/sqlite/sqlite_aclk.h
+++ b/database/sqlite/sqlite_aclk.h
@@ -17,6 +17,7 @@
#define ACLK_MAX_ALERT_UPDATES (5)
#define ACLK_DATABASE_CLEANUP_FIRST (60)
#define ACLK_DATABASE_ROTATION_DELAY (180)
+#define ACLK_DATABASE_RETENTION_RETRY (60)
#define ACLK_DATABASE_CLEANUP_INTERVAL (3600)
#define ACLK_DATABASE_ROTATION_INTERVAL (3600)
#define ACLK_DELETE_ACK_INTERNAL (600)
@@ -197,6 +198,7 @@ struct aclk_database_worker_config {
int node_info_send;
int chart_pending;
int chart_reset_count;
+ int retention_running;
volatile unsigned is_shutting_down;
volatile unsigned is_orphan;
struct aclk_database_worker_config *next;
diff --git a/database/sqlite/sqlite_aclk_alert.c b/database/sqlite/sqlite_aclk_alert.c
index 0003a69109..53c6c2a651 100644
--- a/database/sqlite/sqlite_aclk_alert.c
+++ b/database/sqlite/sqlite_aclk_alert.c
@@ -773,7 +773,7 @@ void sql_process_queue_removed_alerts_to_aclk(struct aclk_database_worker_config
db_execute(buffer_tostring(sql));
- log_access("ACLK STA [%s (%s)]: Queued removed alerts.", wc->node_id, wc->host ? wc->host->hostname : "N/A");
+ log_access("ACLK STA [%s (%s)]: QUEUED REMOVED ALERTS", wc->node_id, wc->host ? wc->host->hostname : "N/A");
buffer_free(sql);
diff --git a/database/sqlite/sqlite_aclk_chart.c b/database/sqlite/sqlite_aclk_chart.c
index c303c89ace..d506d53acc 100644
--- a/database/sqlite/sqlite_aclk_chart.c
+++ b/database/sqlite/sqlite_aclk_chart.c
@@ -566,7 +566,7 @@ void aclk_receive_chart_ack(struct aclk_database_worker_config *wc, struct aclk_
error_report("Failed to ACK sequence id, rc = %d", rc);
else
log_access(
- "ACLK STA [%s (%s)]: CHARTS ACKNOWLEDGED in the database upto %" PRIu64,
+ "ACLK STA [%s (%s)]: CHARTS ACKNOWLEDGED IN THE DATABASE UP TO %" PRIu64,
wc->node_id,
wc->host ? wc->host->hostname : "N/A",
cmd.param1);
@@ -847,9 +847,8 @@ failed:
"SELECT distinct h.host_id, c.update_every, c.type||'.'||c.id FROM chart c, host h " \
"WHERE c.host_id = h.host_id AND c.host_id = @host_id ORDER BY c.update_every ASC;"
-void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd)
+void aclk_update_retention(struct aclk_database_worker_config *wc)
{
- UNUSED(cmd);
int rc;
if (!aclk_use_new_cloud_arch || !aclk_connected)
@@ -916,7 +915,9 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d
rotate_data.node_id = strdupz(wc->node_id);
time_t now = now_realtime_sec();
- while (sqlite3_step(res) == SQLITE_ROW) {
+ while (sqlite3_step(res) == SQLITE_ROW && dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP) {
+ if (unlikely(netdata_exit))
+ break;
if (!update_every || update_every != (uint32_t)sqlite3_column_int(res, 1)) {
if (update_every) {
debug(D_ACLK_SYNC, "Update %s for %u oldest time = %ld", wc->host_guid, update_every, start_time);
@@ -1003,8 +1004,12 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d
if (!wc->host)
hostname = get_hostname_by_node_id(wc->node_id);
- log_access("ACLK STA [%s (%s)]: UPDATES %d RETENTION MESSAGE SENT. CHECKED %u DIMENSIONS. %u DELETED, %u STOPPED COLLECTING",
- wc->node_id, wc->host ? wc->host->hostname : hostname ? hostname : "N/A", wc->chart_updates, total_checked, total_deleted, total_stopped);
+ if (dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP && !netdata_exit)
+ log_access("ACLK STA [%s (%s)]: UPDATES %d RETENTION MESSAGE SENT. CHECKED %u DIMENSIONS. %u DELETED, %u STOPPED COLLECTING",
+ wc->node_id, wc->host ? wc->host->hostname : hostname ? hostname : "N/A", wc->chart_updates, total_checked, total_deleted, total_stopped);
+ else
+ log_access("ACLK STA [%s (%s)]: UPDATES %d RETENTION MESSAGE NOT SENT. CHECKED %u DIMENSIONS. %u DELETED, %u STOPPED COLLECTING",
+ wc->node_id, wc->host ? wc->host->hostname : hostname ? hostname : "N/A", wc->chart_updates, total_checked, total_deleted, total_stopped);
freez(hostname);
#ifdef NETDATA_INTERNAL_CHECKS
@@ -1017,7 +1022,8 @@ void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_d
rotate_data.interval_durations[i].update_every,
rotate_data.interval_durations[i].retention);
#endif
- aclk_retention_updated(&rotate_data);
+ if (dimension_update_count < ACLK_MAX_DIMENSION_CLEANUP && !netdata_exit)
+ aclk_retention_updated(&rotate_data);
freez(rotate_data.node_id);
freez(rotate_data.interval_durations);
diff --git a/database/sqlite/sqlite_aclk_chart.h b/database/sqlite/sqlite_aclk_chart.h
index fd8c139741..84325bf6c6 100644
--- a/database/sqlite/sqlite_aclk_chart.h
+++ b/database/sqlite/sqlite_aclk_chart.h
@@ -67,4 +67,5 @@ uint32_t sql_get_pending_count(struct aclk_database_worker_config *wc);
void aclk_send_dimension_update(RRDDIM *rd);
struct aclk_chart_sync_stats *aclk_get_chart_sync_stats(RRDHOST *host);
void sql_check_chart_liveness(RRDSET *st);
+void aclk_update_retention(struct aclk_database_worker_config *wc);
#endif //NETDATA_SQLITE_ACLK_CHART_H
diff --git a/database/sqlite/sqlite_aclk_node.h b/database/sqlite/sqlite_aclk_node.h
index 9cb4115866..b8f8c6bbf4 100644
--- a/database/sqlite/sqlite_aclk_node.h
+++ b/database/sqlite/sqlite_aclk_node.h
@@ -4,5 +4,4 @@
#define NETDATA_SQLITE_ACLK_NODE_H
void sql_build_node_info(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
-void aclk_update_retention(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
#endif //NETDATA_SQLITE_ACLK_NODE_H