summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--libnetdata/functions_evloop/functions_evloop.c19
1 files changed, 11 insertions, 8 deletions
diff --git a/libnetdata/functions_evloop/functions_evloop.c b/libnetdata/functions_evloop/functions_evloop.c
index f3fdf3a757..3fcd70aa1f 100644
--- a/libnetdata/functions_evloop/functions_evloop.c
+++ b/libnetdata/functions_evloop/functions_evloop.c
@@ -42,34 +42,37 @@ struct functions_evloop_globals {
static void *rrd_functions_worker_globals_worker_main(void *arg) {
struct functions_evloop_globals *wg = arg;
+ bool last_acquired = true;
while (true) {
pthread_mutex_lock(&wg->worker_mutex);
- while (dictionary_entries(wg->worker_queue) == 0) {
+ if(dictionary_entries(wg->worker_queue) == 0 || !last_acquired)
pthread_cond_wait(&wg->worker_cond_var, &wg->worker_mutex);
- }
const DICTIONARY_ITEM *acquired = NULL;
struct functions_evloop_worker_job *j;
dfe_start_write(wg->worker_queue, j) {
- if(j->running || j->cancelled)
- continue;
+ if(j->running || j->cancelled)
+ continue;
- acquired = dictionary_acquired_item_dup(wg->worker_queue, j_dfe.item);
- j->running = true;
- break;
- }
+ acquired = dictionary_acquired_item_dup(wg->worker_queue, j_dfe.item);
+ j->running = true;
+ break;
+ }
dfe_done(j);
pthread_mutex_unlock(&wg->worker_mutex);
if(acquired) {
+ last_acquired = true;
j = dictionary_acquired_item_value(acquired);
j->cb(j->transaction, j->cmd, j->timeout, &j->cancelled);
dictionary_del(wg->worker_queue, j->transaction);
dictionary_acquired_item_release(wg->worker_queue, acquired);
dictionary_garbage_collect(wg->worker_queue);
}
+ else
+ last_acquired = false;
}
return NULL;
}