author    vkalintiris <vasilis@netdata.cloud>    2023-05-02 19:09:05 +0300
committer GitHub <noreply@github.com>            2023-05-02 19:09:05 +0300
commit    0a1ef218f00742758585eda5321e6630680eb1bb (patch)
tree      616e998eee0eac23fffaa9d2e7d51fda5c469794 /ml/ml.cc
parent    6bb3b56010ed3f03b0d0655069e7cf5f29e4dfb9 (diff)
Load/Store ML models (#14981)
* Pass DB connection in db_execute()
* Add support for loading/saving models.
* Fix ML stats when no training takes place.
* Make model flushing batch size configurable.
* Delete unused function
* Update ML config.
* Restore threshold for logs/period.
* Rm whitespace.
* Add missing dummy function.
* Update function call arguments
* Guard transactions with a lock when flushing ML models.
* Mark dimensions with loaded models as trained.
Diffstat (limited to 'ml/ml.cc')
-rw-r--r--    ml/ml.cc    138
1 file changed, 78 insertions(+), 60 deletions(-)
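
For context on the change described above: each training thread now accumulates freshly trained models in memory and flushes them to SQLite in batches, with the shared database handle guarded by a dedicated mutex. The following is a minimal, self-contained C++ sketch of that batching pattern only; ModelInfo, flush_pending_models and flush_batch_size are illustrative stand-ins rather than ml.cc identifiers, and the SQL work is reduced to printf placeholders.

// Minimal sketch of the batched model-flush pattern (simplified, hypothetical
// names; the real code binds SQLite statements and stores uuid_t/ml_kmeans_t).
#include <cstdio>
#include <mutex>
#include <vector>

struct ModelInfo {                      // stands in for ml_model_info_t
    int metric_id;                      // the real struct holds a uuid_t metric_uuid
    double model_payload;               // the real struct holds an ml_kmeans_t
};

static std::mutex db_mutex;             // guards the shared SQLite handle
static std::vector<ModelInfo> pending;  // per-thread pending_model_info

static void flush_pending_models(std::vector<ModelInfo> &batch) {
    // In ml.cc this wraps ml_dimension_add_model()/ml_dimension_delete_models()
    // in a BEGIN/COMMIT TRANSACTION pair on the shared db handle.
    std::printf("BEGIN TRANSACTION;\n");
    for (const ModelInfo &mi : batch)
        std::printf("INSERT OR REPLACE model for metric %d\n", mi.metric_id);
    std::printf("COMMIT TRANSACTION;\n");
    batch.clear();
}

int main() {
    const size_t flush_batch_size = 3;  // stands in for Cfg.flush_models_batch_size

    for (int i = 0; i < 10; i++) {
        pending.push_back({i, 0.0});    // training loop queues a newly trained model
        if (pending.size() >= flush_batch_size) {
            std::lock_guard<std::mutex> lock(db_mutex);
            flush_pending_models(pending);
        }
    }
    return 0;
}

Batching the inserts and deletes inside one transaction keeps per-model SQLite work off the hot training path, which the previously #if 0'd per-dimension update code in the diff below had been disabled to avoid.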
diff --git a/ml/ml.cc b/ml/ml.cc
index 0a4bdf563c..93a2d3b71a 100644
--- a/ml/ml.cc
+++ b/ml/ml.cc
@@ -16,9 +16,10 @@
#define WORKER_TRAIN_UPDATE_MODELS 4
#define WORKER_TRAIN_RELEASE_DIMENSION 5
#define WORKER_TRAIN_UPDATE_HOST 6
-#define WORKER_TRAIN_LOAD_MODELS 7
+#define WORKER_TRAIN_FLUSH_MODELS 7
static sqlite3 *db = NULL;
+static netdata_mutex_t db_mutex = NETDATA_MUTEX_INITIALIZER;
/*
* Functions to convert enums to strings
@@ -406,7 +407,7 @@ ml_dimension_calculated_numbers(ml_training_thread_t *training_thread, ml_dimens
const char *db_models_create_table =
"CREATE TABLE IF NOT EXISTS models("
- " dim_id BLOB, dim_str TEXT, after INT, before INT,"
+ " dim_id BLOB, after INT, before INT,"
" min_dist REAL, max_dist REAL,"
" c00 REAL, c01 REAL, c02 REAL, c03 REAL, c04 REAL, c05 REAL,"
" c10 REAL, c11 REAL, c12 REAL, c13 REAL, c14 REAL, c15 REAL,"
@@ -415,26 +416,26 @@ const char *db_models_create_table =
const char *db_models_add_model =
"INSERT OR REPLACE INTO models("
- " dim_id, dim_str, after, before,"
+ " dim_id, after, before,"
" min_dist, max_dist,"
" c00, c01, c02, c03, c04, c05,"
" c10, c11, c12, c13, c14, c15)"
"VALUES("
- " @dim_id, @dim_str, @after, @before,"
+ " @dim_id, @after, @before,"
" @min_dist, @max_dist,"
" @c00, @c01, @c02, @c03, @c04, @c05,"
" @c10, @c11, @c12, @c13, @c14, @c15);";
const char *db_models_load =
"SELECT * FROM models "
- "WHERE dim_id == @dim_id AND after >= @after ORDER BY before ASC;";
+ "WHERE dim_id = @dim_id AND after >= @after ORDER BY before ASC;";
const char *db_models_delete =
"DELETE FROM models "
"WHERE dim_id = @dim_id AND before < @before;";
static int
-ml_dimension_add_model(ml_dimension_t *dim)
+ml_dimension_add_model(const uuid_t *metric_uuid, const ml_kmeans_t *km)
{
static __thread sqlite3_stmt *res = NULL;
int param = 0;
@@ -453,36 +454,30 @@ ml_dimension_add_model(ml_dimension_t *dim)
}
}
- rc = sqlite3_bind_blob(res, ++param, &dim->rd->metric_uuid, sizeof(dim->rd->metric_uuid), SQLITE_STATIC);
- if (unlikely(rc != SQLITE_OK))
- goto bind_fail;
-
- char id[1024];
- snprintfz(id, 1024 - 1, "%s.%s", rrdset_id(dim->rd->rrdset), rrddim_id(dim->rd));
- rc = sqlite3_bind_text(res, ++param, id, -1, SQLITE_STATIC);
+ rc = sqlite3_bind_blob(res, ++param, metric_uuid, sizeof(*metric_uuid), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- rc = sqlite3_bind_int(res, ++param, (int) dim->kmeans.after);
+ rc = sqlite3_bind_int(res, ++param, (int) km->after);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- rc = sqlite3_bind_int(res, ++param, (int) dim->kmeans.before);
+ rc = sqlite3_bind_int(res, ++param, (int) km->before);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- rc = sqlite3_bind_double(res, ++param, dim->kmeans.min_dist);
+ rc = sqlite3_bind_double(res, ++param, km->min_dist);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- rc = sqlite3_bind_double(res, ++param, dim->kmeans.max_dist);
+ rc = sqlite3_bind_double(res, ++param, km->max_dist);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- if (dim->kmeans.cluster_centers.size() != 2)
- fatal("Expected 2 cluster centers, got %zu", dim->kmeans.cluster_centers.size());
+ if (km->cluster_centers.size() != 2)
+ fatal("Expected 2 cluster centers, got %zu", km->cluster_centers.size());
- for (const DSample &ds : dim->kmeans.cluster_centers) {
+ for (const DSample &ds : km->cluster_centers) {
if (ds.size() != 6)
fatal("Expected dsample with 6 dimensions, got %ld", ds.size());
@@ -513,7 +508,7 @@ bind_fail:
}
static int
-ml_dimension_delete_models(ml_dimension_t *dim)
+ml_dimension_delete_models(const uuid_t *metric_uuid, time_t before)
{
static __thread sqlite3_stmt *res = NULL;
int rc = 0;
@@ -532,11 +527,11 @@ ml_dimension_delete_models(ml_dimension_t *dim)
}
}
- rc = sqlite3_bind_blob(res, ++param, &dim->rd->metric_uuid, sizeof(dim->rd->metric_uuid), SQLITE_STATIC);
+ rc = sqlite3_bind_blob(res, ++param, metric_uuid, sizeof(*metric_uuid), SQLITE_STATIC);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- rc = sqlite3_bind_int(res, ++param, (int) dim->kmeans.before - (Cfg.num_models_to_use * Cfg.train_every));
+ rc = sqlite3_bind_int(res, ++param, (int) before);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
@@ -558,8 +553,18 @@ bind_fail:
return 1;
}
-static int
-ml_dimension_load_models(ml_dimension_t *dim) {
+int ml_dimension_load_models(RRDDIM *rd) {
+ ml_dimension_t *dim = (ml_dimension_t *) rd->ml_dimension;
+ if (!dim)
+ return 0;
+
+ netdata_mutex_lock(&dim->mutex);
+ bool is_empty = dim->km_contexts.empty();
+ netdata_mutex_unlock(&dim->mutex);
+
+ if (!is_empty)
+ return 0;
+
std::vector<ml_kmeans_t> V;
static __thread sqlite3_stmt *res = NULL;
@@ -587,6 +592,8 @@ ml_dimension_load_models(ml_dimension_t *dim) {
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
+ netdata_mutex_lock(&dim->mutex);
+
dim->km_contexts.reserve(Cfg.num_models_to_use);
while ((rc = sqlite3_step_monitored(res)) == SQLITE_ROW) {
ml_kmeans_t km;
@@ -618,6 +625,12 @@ ml_dimension_load_models(ml_dimension_t *dim) {
dim->km_contexts.push_back(km);
}
+ if (!dim->km_contexts.empty()) {
+ dim->ts = TRAINING_STATUS_TRAINED;
+ }
+
+ netdata_mutex_unlock(&dim->mutex);
+
if (unlikely(rc != SQLITE_DONE))
error_report("Failed to load models, rc = %d", rc);
@@ -635,24 +648,6 @@ bind_fail:
return 1;
}
-static int
-ml_dimension_update_models(ml_dimension_t *dim)
-{
- int rc;
-
- if (dim->km_contexts.empty()) {
- rc = ml_dimension_load_models(dim);
- if (rc)
- return rc;
- }
-
- rc = ml_dimension_add_model(dim);
- if (rc)
- return rc;
-
- return ml_dimension_delete_models(dim);
-}
-
static enum ml_training_result
ml_dimension_train_model(ml_training_thread_t *training_thread, ml_dimension_t *dim, const ml_training_request_t &training_request)
{
@@ -704,22 +699,10 @@ ml_dimension_train_model(ml_training_thread_t *training_thread, ml_dimension_t *
}
// update models
+ worker_is_busy(WORKER_TRAIN_UPDATE_MODELS);
{
netdata_mutex_lock(&dim->mutex);
- // temporarily disable sqlite operations because they interfere with
- // training scheduling on busy parents
- #if 0
- worker_is_busy(WORKER_TRAIN_LOAD_MODELS);
-
- int rc = ml_dimension_update_models(dim);
- if (rc) {
- error("Failed to update models for %s [%u, %u]", rrddim_id(dim->rd), dim->kmeans.after, dim->kmeans.before);
- }
- #endif
-
- worker_is_busy(WORKER_TRAIN_UPDATE_MODELS);
-
if (dim->km_contexts.size() < Cfg.num_models_to_use) {
dim->km_contexts.push_back(std::move(dim->kmeans));
} else {
@@ -747,6 +730,12 @@ ml_dimension_train_model(ml_training_thread_t *training_thread, ml_dimension_t *
dim->tr = training_response;
dim->last_training_time = rrddim_last_entry_s(dim->rd);
+ // Add the newly generated model to the list of pending models to flush
+ ml_model_info_t model_info;
+ uuid_copy(model_info.metric_uuid, dim->rd->metric_uuid);
+ model_info.kmeans = dim->km_contexts.back();
+ training_thread->pending_model_info.push_back(model_info);
+
netdata_mutex_unlock(&dim->mutex);
}
@@ -1133,10 +1122,9 @@ ml_detect_main(void *arg)
training_stats.consumed_ut /= training_stats.num_popped_items;
training_stats.remaining_ut /= training_stats.num_popped_items;
} else {
- training_stats.queue_size = 0;
- training_stats.allotted_ut = 0;
+ training_stats.queue_size = ml_queue_size(training_thread->training_queue);
training_stats.consumed_ut = 0;
- training_stats.remaining_ut = 0;
+ training_stats.remaining_ut = training_stats.allotted_ut;
training_stats.training_result_ok = 0;
training_stats.training_result_invalid_query_time_range = 0;
@@ -1352,6 +1340,8 @@ void ml_dimension_new(RRDDIM *rd)
dim->km_contexts.reserve(Cfg.num_models_to_use);
rd->ml_dimension = (rrd_ml_dimension_t *) dim;
+
+ metaqueue_ml_load_models(rd);
}
void ml_dimension_delete(RRDDIM *rd)
@@ -1380,6 +1370,25 @@ bool ml_dimension_is_anomalous(RRDDIM *rd, time_t curr_time, double value, bool
return is_anomalous;
}
+static int ml_flush_pending_models(ml_training_thread_t *training_thread) {
+ (void) db_execute(db, "BEGIN TRANSACTION;");
+
+ for (const auto &pending_model: training_thread->pending_model_info) {
+ int rc = ml_dimension_add_model(&pending_model.metric_uuid, &pending_model.kmeans);
+ if (rc)
+ return rc;
+
+ rc = ml_dimension_delete_models(&pending_model.metric_uuid, pending_model.kmeans.before - (Cfg.num_models_to_use * Cfg.train_every));
+ if (rc)
+ return rc;
+ }
+
+ (void) db_execute(db, "COMMIT TRANSACTION;");
+
+ training_thread->pending_model_info.clear();
+ return 0;
+}
+
static void *ml_train_main(void *arg) {
ml_training_thread_t *training_thread = (ml_training_thread_t *) arg;
@@ -1392,9 +1401,9 @@ static void *ml_train_main(void *arg) {
worker_register_job_name(WORKER_TRAIN_QUERY, "query");
worker_register_job_name(WORKER_TRAIN_KMEANS, "kmeans");
worker_register_job_name(WORKER_TRAIN_UPDATE_MODELS, "update models");
- worker_register_job_name(WORKER_TRAIN_LOAD_MODELS, "load models");
worker_register_job_name(WORKER_TRAIN_RELEASE_DIMENSION, "release");
worker_register_job_name(WORKER_TRAIN_UPDATE_HOST, "update host");
+ worker_register_job_name(WORKER_TRAIN_FLUSH_MODELS, "flush models");
while (!Cfg.training_stop) {
worker_is_busy(WORKER_TRAIN_QUEUE_POP);
@@ -1470,6 +1479,14 @@ static void *ml_train_main(void *arg) {
netdata_mutex_unlock(&training_thread->nd_mutex);
}
+ if (training_thread->pending_model_info.size() >= Cfg.flush_models_batch_size) {
+ worker_is_busy(WORKER_TRAIN_FLUSH_MODELS);
+ netdata_mutex_lock(&db_mutex);
+ ml_flush_pending_models(training_thread);
+ netdata_mutex_unlock(&db_mutex);
+ continue;
+ }
+
worker_is_idle();
std::this_thread::sleep_for(std::chrono::microseconds{remaining_ut});
}
@@ -1505,6 +1522,7 @@ void ml_init()
training_thread->id = idx;
training_thread->training_queue = ml_queue_init();
+ training_thread->pending_model_info.reserve(Cfg.flush_models_batch_size);
netdata_mutex_init(&training_thread->nd_mutex);
}
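
For reference, a standalone sketch of the SQLite prepare/bind/step pattern that the new ml_dimension_add_model()/ml_dimension_load_models() paths follow. The schema is trimmed and the values are placeholders, so this is an illustrative sketch rather than the actual ml.cc code.

// Build with: g++ -std=c++11 sketch.cc -lsqlite3
#include <cstdio>
#include <sqlite3.h>

int main() {
    sqlite3 *db = nullptr;
    if (sqlite3_open(":memory:", &db) != SQLITE_OK)
        return 1;

    // Trimmed db_models_create_table (the real table also stores the
    // c00..c15 cluster-center coefficients).
    sqlite3_exec(db,
        "CREATE TABLE IF NOT EXISTS models("
        " dim_id BLOB, after INT, before INT, min_dist REAL, max_dist REAL);",
        nullptr, nullptr, nullptr);

    // Store path: bind positional parameters in order, then step once.
    sqlite3_stmt *ins = nullptr;
    sqlite3_prepare_v2(db,
        "INSERT OR REPLACE INTO models(dim_id, after, before, min_dist, max_dist)"
        " VALUES(@dim_id, @after, @before, @min_dist, @max_dist);",
        -1, &ins, nullptr);

    unsigned char fake_uuid[16] = {0};       // stands in for rd->metric_uuid
    int param = 0;
    sqlite3_bind_blob(ins, ++param, fake_uuid, sizeof(fake_uuid), SQLITE_STATIC);
    sqlite3_bind_int(ins, ++param, 1000);    // after
    sqlite3_bind_int(ins, ++param, 2000);    // before
    sqlite3_bind_double(ins, ++param, 0.1);  // min_dist
    sqlite3_bind_double(ins, ++param, 0.9);  // max_dist
    if (sqlite3_step(ins) != SQLITE_DONE)
        std::fprintf(stderr, "insert failed: %s\n", sqlite3_errmsg(db));
    sqlite3_finalize(ins);

    // Load path: iterate rows ordered by `before`, as db_models_load does.
    sqlite3_stmt *sel = nullptr;
    sqlite3_prepare_v2(db,
        "SELECT after, before, min_dist, max_dist FROM models"
        " WHERE dim_id = @dim_id ORDER BY before ASC;",
        -1, &sel, nullptr);
    sqlite3_bind_blob(sel, 1, fake_uuid, sizeof(fake_uuid), SQLITE_STATIC);
    while (sqlite3_step(sel) == SQLITE_ROW)
        std::printf("model [%d, %d] min=%f max=%f\n",
                    sqlite3_column_int(sel, 0), sqlite3_column_int(sel, 1),
                    sqlite3_column_double(sel, 2), sqlite3_column_double(sel, 3));
    sqlite3_finalize(sel);

    sqlite3_close(db);
    return 0;
}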