diff options
author | vkalintiris <vasilis@netdata.cloud> | 2023-04-26 11:37:29 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-04-26 11:37:29 +0300 |
commit | a58101434cabf862e661a2e4e72471c05b6bd09c (patch) | |
tree | a9491787e170cf73d5277f1f10e5b0f47e09749f /ml | |
parent | 18d0513cf6fa49317202612d4ae81f11eee54cf4 (diff) |
Add support for acquire/release operations on RRDSETs (#14945)
* Add acquire/release support for RRDSET
* Release/acquire RRDSET when training.
* Fix function name in log message.
* Use proper function name to get the hostname.
* Add acquire/release for hosts and skip training orphan/obsolete hosts/charts.
* Fix variable name
Diffstat (limited to 'ml')
-rw-r--r-- | ml/ml-private.h | 2 | ||||
-rw-r--r-- | ml/ml.cc | 69 |
2 files changed, 44 insertions, 27 deletions
diff --git a/ml/ml-private.h b/ml/ml-private.h index d014c71d26..ce47ac6042 100644 --- a/ml/ml-private.h +++ b/ml/ml-private.h @@ -124,7 +124,7 @@ enum ml_training_result { typedef struct { // Chart/dimension we want to train - STRING *host_id; + char machine_guid[GUID_LEN + 1]; STRING *chart_id; STRING *dimension_id; @@ -268,7 +268,7 @@ ml_queue_pop(ml_queue_t *q) netdata_mutex_lock(&q->mutex); ml_training_request_t req = { - NULL, // host_id + {'\0'}, // machine_guid NULL, // chart id NULL, // dimension id 0, // current time @@ -783,14 +783,14 @@ ml_dimension_schedule_for_training(ml_dimension_t *dim, time_t curr_time) } if (schedule_for_training) { - ml_training_request_t req = { - string_dup(dim->rd->rrdset->rrdhost->hostname), - string_dup(dim->rd->rrdset->id), - string_dup(dim->rd->id), - curr_time, - rrddim_first_entry_s(dim->rd), - rrddim_last_entry_s(dim->rd), - }; + ml_training_request_t req; + + memcpy(req.machine_guid, dim->rd->rrdset->rrdhost->machine_guid, GUID_LEN + 1); + req.chart_id = string_dup(dim->rd->rrdset->id); + req.dimension_id = string_dup(dim->rd->id); + req.request_time = curr_time; + req.first_entry_on_request = rrddim_first_entry_s(dim->rd); + req.last_entry_on_request = rrddim_last_entry_s(dim->rd); ml_host_t *host = (ml_host_t *) dim->rd->rrdset->rrdhost->ml_host; ml_queue_push(host->training_queue, req); @@ -1022,31 +1022,45 @@ ml_host_detect_once(ml_host_t *host) } typedef struct { + RRDHOST_ACQUIRED *acq_rh; + RRDSET_ACQUIRED *acq_rs; RRDDIM_ACQUIRED *acq_rd; ml_dimension_t *dim; } ml_acquired_dimension_t; static ml_acquired_dimension_t -ml_acquired_dimension_get(STRING *host_id, STRING *chart_id, STRING *dimension_id) +ml_acquired_dimension_get(char *machine_guid, STRING *chart_id, STRING *dimension_id) { + RRDHOST_ACQUIRED *acq_rh = NULL; + RRDSET_ACQUIRED *acq_rs = NULL; RRDDIM_ACQUIRED *acq_rd = NULL; ml_dimension_t *dim = NULL; - RRDHOST *rh = rrdhost_find_by_hostname(string2str(host_id)); - if (rh) { - RRDSET *rs = rrdset_find(rh, string2str(chart_id)); - if (rs) { - acq_rd = rrddim_find_and_acquire(rs, string2str(dimension_id)); - if (acq_rd) { - RRDDIM *rd = rrddim_acquired_to_rrddim(acq_rd); - if (rd) - dim = (ml_dimension_t *) rd->ml_dimension; + rrd_rdlock(); + + acq_rh = rrdhost_find_and_acquire(machine_guid); + if (acq_rh) { + RRDHOST *rh = rrdhost_acquired_to_rrdhost(acq_rh); + if (rh && !rrdhost_flag_check(rh, RRDHOST_FLAG_ORPHAN | RRDHOST_FLAG_ARCHIVED)) { + acq_rs = rrdset_find_and_acquire(rh, string2str(chart_id)); + if (acq_rs) { + RRDSET *rs = rrdset_acquired_to_rrdset(acq_rs); + if (rs && !rrdset_flag_check(rs, RRDSET_FLAG_ARCHIVED | RRDSET_FLAG_OBSOLETE)) { + acq_rd = rrddim_find_and_acquire(rs, string2str(dimension_id)); + if (acq_rd) { + RRDDIM *rd = rrddim_acquired_to_rrddim(acq_rd); + if (rd) + dim = (ml_dimension_t *) rd->ml_dimension; + } + } } } } + rrd_unlock(); + ml_acquired_dimension_t acq_dim = { - acq_rd, dim + acq_rh, acq_rs, acq_rd, dim }; return acq_dim; @@ -1055,10 +1069,14 @@ ml_acquired_dimension_get(STRING *host_id, STRING *chart_id, STRING *dimension_i static void ml_acquired_dimension_release(ml_acquired_dimension_t acq_dim) { - if (!acq_dim.acq_rd) - return; + if (acq_dim.acq_rd) + rrddim_acquired_release(acq_dim.acq_rd); + + if (acq_dim.acq_rs) + rrdset_acquired_release(acq_dim.acq_rs); - rrddim_acquired_release(acq_dim.acq_rd); + if (acq_dim.acq_rh) + rrdhost_acquired_release(acq_dim.acq_rh); } static enum ml_training_result @@ -1385,7 +1403,7 @@ static void *ml_train_main(void *arg) { // we know this thread has been cancelled, when the queue starts // returning "null" requests without blocking on queue's pop(). - if (training_req.host_id == NULL) + if (training_req.chart_id == NULL) break; size_t queue_size = ml_queue_size(training_thread->training_queue) + 1; @@ -1400,13 +1418,12 @@ static void *ml_train_main(void *arg) { { worker_is_busy(WORKER_TRAIN_ACQUIRE_DIMENSION); ml_acquired_dimension_t acq_dim = ml_acquired_dimension_get( - training_req.host_id, + training_req.machine_guid, training_req.chart_id, training_req.dimension_id); training_res = ml_acquired_dimension_train(training_thread, acq_dim, training_req); - string_freez(training_req.host_id); string_freez(training_req.chart_id); string_freez(training_req.dimension_id); |