author     vkalintiris <vasilis@netdata.cloud>      2023-04-26 11:37:29 +0300
committer  GitHub <noreply@github.com>              2023-04-26 11:37:29 +0300
commit     a58101434cabf862e661a2e4e72471c05b6bd09c (patch)
tree       a9491787e170cf73d5277f1f10e5b0f47e09749f /ml
parent     18d0513cf6fa49317202612d4ae81f11eee54cf4 (diff)
Add support for acquire/release operations on RRDSETs (#14945)
* Add acquire/release support for RRDSET
* Release/acquire RRDSET when training.
* Fix function name in log message.
* Use proper function name to get the hostname.
* Add acquire/release for hosts and skip training orphan/obsolete hosts/charts.
* Fix variable name
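In short, the commit replaces the hostname-based lookup with a nested acquire of host, chart and dimension, skipping orphan/obsolete objects and releasing in reverse order. The following is a minimal sketch of that flow, using only the acquire/release calls visible in the diff below; it illustrates the pattern rather than reproducing ml_acquired_dimension_get()/ml_acquired_dimension_release() verbatim:

    // Sketch: nested acquire from host down to dimension, release in reverse order.
    rrd_rdlock();
    RRDHOST_ACQUIRED *acq_rh = rrdhost_find_and_acquire(machine_guid);
    if (acq_rh) {
        RRDHOST *rh = rrdhost_acquired_to_rrdhost(acq_rh);
        // Skip hosts that are orphaned or archived.
        if (rh && !rrdhost_flag_check(rh, RRDHOST_FLAG_ORPHAN | RRDHOST_FLAG_ARCHIVED)) {
            RRDSET_ACQUIRED *acq_rs = rrdset_find_and_acquire(rh, chart_id);
            if (acq_rs) {
                RRDSET *rs = rrdset_acquired_to_rrdset(acq_rs);
                // Skip charts that are archived or obsolete.
                if (rs && !rrdset_flag_check(rs, RRDSET_FLAG_ARCHIVED | RRDSET_FLAG_OBSOLETE)) {
                    RRDDIM_ACQUIRED *acq_rd = rrddim_find_and_acquire(rs, dimension_id);
                    if (acq_rd) {
                        /* ... train the dimension ... */
                        rrddim_acquired_release(acq_rd);
                    }
                }
                rrdset_acquired_release(acq_rs);
            }
        }
        rrdhost_acquired_release(acq_rh);
    }
    rrd_unlock();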
Diffstat (limited to 'ml')
-rw-r--r--  ml/ml-private.h |  2
-rw-r--r--  ml/ml.cc        | 69
2 files changed, 44 insertions(+), 27 deletions(-)
diff --git a/ml/ml-private.h b/ml/ml-private.h
index d014c71d26..ce47ac6042 100644
--- a/ml/ml-private.h
+++ b/ml/ml-private.h
@@ -124,7 +124,7 @@ enum ml_training_result {
typedef struct {
// Chart/dimension we want to train
- STRING *host_id;
+ char machine_guid[GUID_LEN + 1];
STRING *chart_id;
STRING *dimension_id;
diff --git a/ml/ml.cc b/ml/ml.cc
index 05dc6514f9..0a4bdf563c 100644
--- a/ml/ml.cc
+++ b/ml/ml.cc
@@ -268,7 +268,7 @@ ml_queue_pop(ml_queue_t *q)
netdata_mutex_lock(&q->mutex);
ml_training_request_t req = {
- NULL, // host_id
+ {'\0'}, // machine_guid
NULL, // chart id
NULL, // dimension id
0, // current time
@@ -783,14 +783,14 @@ ml_dimension_schedule_for_training(ml_dimension_t *dim, time_t curr_time)
}
if (schedule_for_training) {
- ml_training_request_t req = {
- string_dup(dim->rd->rrdset->rrdhost->hostname),
- string_dup(dim->rd->rrdset->id),
- string_dup(dim->rd->id),
- curr_time,
- rrddim_first_entry_s(dim->rd),
- rrddim_last_entry_s(dim->rd),
- };
+ ml_training_request_t req;
+
+ memcpy(req.machine_guid, dim->rd->rrdset->rrdhost->machine_guid, GUID_LEN + 1);
+ req.chart_id = string_dup(dim->rd->rrdset->id);
+ req.dimension_id = string_dup(dim->rd->id);
+ req.request_time = curr_time;
+ req.first_entry_on_request = rrddim_first_entry_s(dim->rd);
+ req.last_entry_on_request = rrddim_last_entry_s(dim->rd);
ml_host_t *host = (ml_host_t *) dim->rd->rrdset->rrdhost->ml_host;
ml_queue_push(host->training_queue, req);
@@ -1022,31 +1022,45 @@ ml_host_detect_once(ml_host_t *host)
}
typedef struct {
+ RRDHOST_ACQUIRED *acq_rh;
+ RRDSET_ACQUIRED *acq_rs;
RRDDIM_ACQUIRED *acq_rd;
ml_dimension_t *dim;
} ml_acquired_dimension_t;
static ml_acquired_dimension_t
-ml_acquired_dimension_get(STRING *host_id, STRING *chart_id, STRING *dimension_id)
+ml_acquired_dimension_get(char *machine_guid, STRING *chart_id, STRING *dimension_id)
{
+ RRDHOST_ACQUIRED *acq_rh = NULL;
+ RRDSET_ACQUIRED *acq_rs = NULL;
RRDDIM_ACQUIRED *acq_rd = NULL;
ml_dimension_t *dim = NULL;
- RRDHOST *rh = rrdhost_find_by_hostname(string2str(host_id));
- if (rh) {
- RRDSET *rs = rrdset_find(rh, string2str(chart_id));
- if (rs) {
- acq_rd = rrddim_find_and_acquire(rs, string2str(dimension_id));
- if (acq_rd) {
- RRDDIM *rd = rrddim_acquired_to_rrddim(acq_rd);
- if (rd)
- dim = (ml_dimension_t *) rd->ml_dimension;
+ rrd_rdlock();
+
+ acq_rh = rrdhost_find_and_acquire(machine_guid);
+ if (acq_rh) {
+ RRDHOST *rh = rrdhost_acquired_to_rrdhost(acq_rh);
+ if (rh && !rrdhost_flag_check(rh, RRDHOST_FLAG_ORPHAN | RRDHOST_FLAG_ARCHIVED)) {
+ acq_rs = rrdset_find_and_acquire(rh, string2str(chart_id));
+ if (acq_rs) {
+ RRDSET *rs = rrdset_acquired_to_rrdset(acq_rs);
+ if (rs && !rrdset_flag_check(rs, RRDSET_FLAG_ARCHIVED | RRDSET_FLAG_OBSOLETE)) {
+ acq_rd = rrddim_find_and_acquire(rs, string2str(dimension_id));
+ if (acq_rd) {
+ RRDDIM *rd = rrddim_acquired_to_rrddim(acq_rd);
+ if (rd)
+ dim = (ml_dimension_t *) rd->ml_dimension;
+ }
+ }
}
}
}
+ rrd_unlock();
+
ml_acquired_dimension_t acq_dim = {
- acq_rd, dim
+ acq_rh, acq_rs, acq_rd, dim
};
return acq_dim;
@@ -1055,10 +1069,14 @@ ml_acquired_dimension_get(STRING *host_id, STRING *chart_id, STRING *dimension_i
static void
ml_acquired_dimension_release(ml_acquired_dimension_t acq_dim)
{
- if (!acq_dim.acq_rd)
- return;
+ if (acq_dim.acq_rd)
+ rrddim_acquired_release(acq_dim.acq_rd);
+
+ if (acq_dim.acq_rs)
+ rrdset_acquired_release(acq_dim.acq_rs);
- rrddim_acquired_release(acq_dim.acq_rd);
+ if (acq_dim.acq_rh)
+ rrdhost_acquired_release(acq_dim.acq_rh);
}
static enum ml_training_result
@@ -1385,7 +1403,7 @@ static void *ml_train_main(void *arg) {
// we know this thread has been cancelled, when the queue starts
// returning "null" requests without blocking on queue's pop().
- if (training_req.host_id == NULL)
+ if (training_req.chart_id == NULL)
break;
size_t queue_size = ml_queue_size(training_thread->training_queue) + 1;
@@ -1400,13 +1418,12 @@ static void *ml_train_main(void *arg) {
{
worker_is_busy(WORKER_TRAIN_ACQUIRE_DIMENSION);
ml_acquired_dimension_t acq_dim = ml_acquired_dimension_get(
- training_req.host_id,
+ training_req.machine_guid,
training_req.chart_id,
training_req.dimension_id);
training_res = ml_acquired_dimension_train(training_thread, acq_dim, training_req);
- string_freez(training_req.host_id);
string_freez(training_req.chart_id);
string_freez(training_req.dimension_id);