summaryrefslogtreecommitdiffstats
path: root/ml/Queue.h
diff options
context:
space:
mode:
authorvkalintiris <vasilis@netdata.cloud>2023-01-04 14:51:25 +0200
committerGitHub <noreply@github.com>2023-01-04 14:51:25 +0200
commit78359cd375d0b2c285741e6f934a681d0a0c3c15 (patch)
tree2d5264325510b663d9e87ca62a38fad187e3a713 /ml/Queue.h
parentdf379e45fbaddf825f1f7972a75ae3f3daf80097 (diff)
Refactor ML code and add support for multiple KMeans models (#14198)
* Add profile.plugin Creates the specified number of charts/dimensions, and supports backfilling with pseudo-historical data. * Bump * Remove wrongly merged line. * Use the number of models specified from the config section. * Add option to consult all ML models. * Remove profiling option consuming all models. * Add underscore after chart name prefix. * prediction -> dimensions chart * reorder funcs * Split charts across types with correct priority * Ignore training request when chart is under replication. * Track global number of models consulted. * Cleanup config. * initial readme updates * fix readme * readme * Fix function definition when ML is disabled. * Add dummy ml_chart_update_{begin,end} * Remove profile_plugin * Define chart priorities under collectors/all.h * s/curr_t/current_time/ * Use libnetdata's lock/thread wrappers. * Fix autotools & cmake builds. * Delete ML dimensions & charts. * Let users of buffer preprocessing to handle memory. * Add separate API calls to start/stop ML threads. Co-authored-by: Andrew Maguire <andrewm4894@gmail.com>
Diffstat (limited to 'ml/Queue.h')
-rw-r--r--ml/Queue.h59
1 files changed, 59 insertions, 0 deletions
diff --git a/ml/Queue.h b/ml/Queue.h
new file mode 100644
index 0000000000..b78979565d
--- /dev/null
+++ b/ml/Queue.h
@@ -0,0 +1,59 @@
+#ifndef QUEUE_H
+#define QUEUE_H
+
+#include "ml-private.h"
+#include "Mutex.h"
+#include <queue>
+#include <mutex>
+#include <condition_variable>
+
+template<typename T>
+class Queue {
+public:
+ Queue(void) : Q(), M() {
+ pthread_cond_init(&CV, nullptr);
+ Exit = false;
+ }
+
+ ~Queue() {
+ pthread_cond_destroy(&CV);
+ }
+
+ void push(T t) {
+ std::lock_guard<Mutex> L(M);
+
+ Q.push(t);
+ pthread_cond_signal(&CV);
+ }
+
+ std::pair<T, size_t> pop(void) {
+ std::lock_guard<Mutex> L(M);
+
+ while (Q.empty()) {
+ pthread_cond_wait(&CV, M.inner());
+
+ if (Exit)
+ pthread_exit(nullptr);
+ }
+
+ T V = Q.front();
+ size_t Size = Q.size();
+ Q.pop();
+
+ return { V, Size };
+ }
+
+ void signal() {
+ std::lock_guard<Mutex> L(M);
+ Exit = true;
+ pthread_cond_signal(&CV);
+ }
+
+private:
+ std::queue<T> Q;
+ Mutex M;
+ pthread_cond_t CV;
+ std::atomic<bool> Exit;
+};
+
+#endif /* QUEUE_H */