diff options
-rw-r--r-- | libnetdata/functions_evloop/functions_evloop.c | 19 |
1 files changed, 11 insertions, 8 deletions
diff --git a/libnetdata/functions_evloop/functions_evloop.c b/libnetdata/functions_evloop/functions_evloop.c index f3fdf3a757..3fcd70aa1f 100644 --- a/libnetdata/functions_evloop/functions_evloop.c +++ b/libnetdata/functions_evloop/functions_evloop.c @@ -42,34 +42,37 @@ struct functions_evloop_globals { static void *rrd_functions_worker_globals_worker_main(void *arg) { struct functions_evloop_globals *wg = arg; + bool last_acquired = true; while (true) { pthread_mutex_lock(&wg->worker_mutex); - while (dictionary_entries(wg->worker_queue) == 0) { + if(dictionary_entries(wg->worker_queue) == 0 || !last_acquired) pthread_cond_wait(&wg->worker_cond_var, &wg->worker_mutex); - } const DICTIONARY_ITEM *acquired = NULL; struct functions_evloop_worker_job *j; dfe_start_write(wg->worker_queue, j) { - if(j->running || j->cancelled) - continue; + if(j->running || j->cancelled) + continue; - acquired = dictionary_acquired_item_dup(wg->worker_queue, j_dfe.item); - j->running = true; - break; - } + acquired = dictionary_acquired_item_dup(wg->worker_queue, j_dfe.item); + j->running = true; + break; + } dfe_done(j); pthread_mutex_unlock(&wg->worker_mutex); if(acquired) { + last_acquired = true; j = dictionary_acquired_item_value(acquired); j->cb(j->transaction, j->cmd, j->timeout, &j->cancelled); dictionary_del(wg->worker_queue, j->transaction); dictionary_acquired_item_release(wg->worker_queue, acquired); dictionary_garbage_collect(wg->worker_queue); } + else + last_acquired = false; } return NULL; } |