summaryrefslogtreecommitdiffstats
path: root/collectors/systemd-journal.plugin
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-09-18 19:21:12 +0300
committerGitHub <noreply@github.com>2023-09-18 19:21:12 +0300
commited3ba445145a8403e77b1d4d00bbe944460a4530 (patch)
treedcc400da44a8c5801b80128e410b2614f63a3d27 /collectors/systemd-journal.plugin
parent717ba3e9b201e43e0b4064bae5bdc762b31ebf93 (diff)
functions cancelling (#15977)
Diffstat (limited to 'collectors/systemd-journal.plugin')
-rw-r--r--collectors/systemd-journal.plugin/systemd-journal.c378
1 files changed, 192 insertions, 186 deletions
diff --git a/collectors/systemd-journal.plugin/systemd-journal.c b/collectors/systemd-journal.plugin/systemd-journal.c
index 5a247a0a72..6efbb57550 100644
--- a/collectors/systemd-journal.plugin/systemd-journal.c
+++ b/collectors/systemd-journal.plugin/systemd-journal.c
@@ -5,8 +5,6 @@
* GPL v3+
*/
-// TODO - 1) MARKDOC 2) HELP TEXT
-
#include "collectors/all.h"
#include "libnetdata/libnetdata.h"
#include "libnetdata/required_dummies.h"
@@ -23,6 +21,7 @@
#define SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION (3 * 3600)
#define SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY 200
#define SYSTEMD_JOURNAL_EXCESS_ROWS_ALLOWED 50
+#define SYSTEMD_JOURNAL_WORKER_THREADS 2
#define JOURNAL_PARAMETER_HELP "help"
#define JOURNAL_PARAMETER_AFTER "after"
@@ -58,9 +57,6 @@
static netdata_mutex_t stdout_mutex = NETDATA_MUTEX_INITIALIZER;
static bool plugin_should_exit = false;
-DICTIONARY *uids = NULL;
-DICTIONARY *gids = NULL;
-
// ----------------------------------------------------------------------------
static inline sd_journal *netdata_open_systemd_journal(void) {
@@ -95,6 +91,7 @@ typedef enum {
ND_SD_JOURNAL_TIMED_OUT,
ND_SD_JOURNAL_OK,
ND_SD_JOURNAL_NOT_MODIFIED,
+ ND_SD_JOURNAL_CANCELLED,
} ND_SD_JOURNAL_STATUS;
static inline bool netdata_systemd_journal_seek_to(sd_journal *j, usec_t timestamp) {
@@ -130,21 +127,42 @@ static inline void netdata_systemd_journal_process_row(sd_journal *j, FACETS *fa
}
}
-ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_full(sd_journal *j, BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t before_ut, usec_t if_modified_since, usec_t stop_monotonic_ut, usec_t *last_modified) {
+static inline ND_SD_JOURNAL_STATUS check_stop(size_t row_counter, const bool *cancelled, usec_t stop_monotonic_ut) {
+ if((row_counter % 1000) == 0) {
+ if(cancelled && __atomic_load_n(cancelled, __ATOMIC_RELAXED)) {
+ internal_error(true, "Function has been cancelled");
+ return ND_SD_JOURNAL_CANCELLED;
+ }
+
+ if(now_monotonic_usec() > stop_monotonic_ut) {
+ internal_error(true, "Function timed out");
+ return ND_SD_JOURNAL_TIMED_OUT;
+ }
+ }
+
+ return ND_SD_JOURNAL_OK;
+}
+
+ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_full(
+ sd_journal *j, BUFFER *wb __maybe_unused, FACETS *facets,
+ usec_t after_ut, usec_t before_ut,
+ usec_t if_modified_since, usec_t stop_monotonic_ut, usec_t *last_modified,
+ bool *cancelled) {
if(!netdata_systemd_journal_seek_to(j, before_ut))
return ND_SD_JOURNAL_FAILED_TO_SEEK;
size_t errors_no_timestamp = 0;
usec_t first_msg_ut = 0;
- bool timed_out = false;
size_t row_counter = 0;
// the entries are not guaranteed to be sorted, so we process up to 100 entries beyond
// the end of the query to find possibly useful logs for our time-frame
size_t excess_rows_allowed = SYSTEMD_JOURNAL_EXCESS_ROWS_ALLOWED;
+ ND_SD_JOURNAL_STATUS status = ND_SD_JOURNAL_OK;
+
facets_rows_begin(facets);
- while (sd_journal_previous(j) > 0) {
+ while (status == ND_SD_JOURNAL_OK && sd_journal_previous(j) > 0) {
row_counter++;
usec_t msg_ut;
@@ -174,10 +192,7 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_full(sd_journal *j, BUFFER *w
netdata_systemd_journal_process_row(j, facets);
facets_row_finished(facets, msg_ut);
- if((row_counter % 100) == 0 && now_monotonic_usec() > stop_monotonic_ut) {
- timed_out = true;
- break;
- }
+ status = check_stop(row_counter, cancelled, stop_monotonic_ut);
}
if(errors_no_timestamp)
@@ -185,18 +200,19 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_full(sd_journal *j, BUFFER *w
*last_modified = first_msg_ut;
- if(timed_out)
- return ND_SD_JOURNAL_TIMED_OUT;
-
- return ND_SD_JOURNAL_OK;
+ return status;
}
-ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_forward(sd_journal *j, BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t before_ut, usec_t anchor, size_t entries, usec_t stop_monotonic_ut) {
+ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_forward(
+ sd_journal *j, BUFFER *wb __maybe_unused, FACETS *facets,
+ usec_t after_ut, usec_t before_ut,
+ usec_t anchor, size_t entries, usec_t stop_monotonic_ut,
+ bool *cancelled) {
+
if(!netdata_systemd_journal_seek_to(j, anchor))
return ND_SD_JOURNAL_FAILED_TO_SEEK;
size_t errors_no_timestamp = 0;
- bool timed_out = false;
size_t row_counter = 0;
size_t rows_added = 0;
@@ -204,8 +220,10 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_forward(sd_journal *j, B
// the end of the query to find possibly useful logs for our time-frame
size_t excess_rows_allowed = SYSTEMD_JOURNAL_EXCESS_ROWS_ALLOWED;
+ ND_SD_JOURNAL_STATUS status = ND_SD_JOURNAL_OK;
+
facets_rows_begin(facets);
- while (sd_journal_next(j) > 0) {
+ while (status == ND_SD_JOURNAL_OK && sd_journal_next(j) > 0) {
row_counter++;
usec_t msg_ut;
@@ -231,27 +249,25 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_forward(sd_journal *j, B
facets_row_finished(facets, msg_ut);
rows_added++;
- if((row_counter % 100) == 0 && now_monotonic_usec() > stop_monotonic_ut) {
- timed_out = true;
- break;
- }
+ status = check_stop(row_counter, cancelled, stop_monotonic_ut);
}
if(errors_no_timestamp)
netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp);
- if(timed_out)
- return ND_SD_JOURNAL_TIMED_OUT;
-
- return ND_SD_JOURNAL_OK;
+ return status;
}
-ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_backward(sd_journal *j, BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t before_ut, usec_t anchor, size_t entries, usec_t stop_monotonic_ut) {
+ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_backward(
+ sd_journal *j, BUFFER *wb __maybe_unused, FACETS *facets,
+ usec_t after_ut, usec_t before_ut,
+ usec_t anchor, size_t entries, usec_t stop_monotonic_ut,
+ bool *cancelled) {
+
if(!netdata_systemd_journal_seek_to(j, anchor))
return ND_SD_JOURNAL_FAILED_TO_SEEK;
size_t errors_no_timestamp = 0;
- bool timed_out = false;
size_t row_counter = 0;
size_t rows_added = 0;
@@ -259,8 +275,10 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_backward(sd_journal *j,
// the end of the query to find possibly useful logs for our time-frame
size_t excess_rows_allowed = SYSTEMD_JOURNAL_EXCESS_ROWS_ALLOWED;
+ ND_SD_JOURNAL_STATUS status = ND_SD_JOURNAL_OK;
+
facets_rows_begin(facets);
- while (sd_journal_previous(j) > 0) {
+ while (status == ND_SD_JOURNAL_OK && sd_journal_previous(j) > 0) {
row_counter++;
usec_t msg_ut;
@@ -286,19 +304,13 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_backward(sd_journal *j,
facets_row_finished(facets, msg_ut);
rows_added++;
- if((row_counter % 100) == 0 && now_monotonic_usec() > stop_monotonic_ut) {
- timed_out = true;
- break;
- }
+ status = check_stop(row_counter, cancelled, stop_monotonic_ut);
}
if(errors_no_timestamp)
netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp);
- if(timed_out)
- return ND_SD_JOURNAL_TIMED_OUT;
-
- return ND_SD_JOURNAL_OK;
+ return status;
}
bool netdata_systemd_journal_check_if_modified_since(sd_journal *j, usec_t seek_to, usec_t last_modified) {
@@ -327,7 +339,8 @@ static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets,
usec_t after_ut, usec_t before_ut,
usec_t anchor, FACETS_ANCHOR_DIRECTION direction, size_t entries,
usec_t if_modified_since, bool data_only,
- usec_t stop_monotonic_ut) {
+ usec_t stop_monotonic_ut,
+ bool *cancelled) {
sd_journal *j = netdata_open_systemd_journal();
if(!j)
return HTTP_RESP_INTERNAL_SERVER_ERROR;
@@ -341,23 +354,36 @@ static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets,
// we can do a data-only query
if(direction == FACETS_ANCHOR_DIRECTION_FORWARD)
- status = netdata_systemd_journal_query_data_forward(j, wb, facets, after_ut, before_ut, anchor, entries, stop_monotonic_ut);
+ status = netdata_systemd_journal_query_data_forward(j, wb, facets, after_ut, before_ut, anchor, entries, stop_monotonic_ut, cancelled);
else
- status = netdata_systemd_journal_query_data_backward(j, wb, facets, after_ut, before_ut, anchor, entries, stop_monotonic_ut);
+ status = netdata_systemd_journal_query_data_backward(j, wb, facets, after_ut, before_ut, anchor, entries, stop_monotonic_ut, cancelled);
}
else {
// we have to do a full query
status = netdata_systemd_journal_query_full(j, wb, facets,
after_ut, before_ut, if_modified_since,
- stop_monotonic_ut, &last_modified);
+ stop_monotonic_ut, &last_modified, cancelled);
}
sd_journal_close(j);
- if(status == ND_SD_JOURNAL_NOT_MODIFIED)
- return HTTP_RESP_NOT_MODIFIED;
+ if(status != ND_SD_JOURNAL_OK && status != ND_SD_JOURNAL_TIMED_OUT) {
+ buffer_flush(wb);
+
+ switch (status) {
+ case ND_SD_JOURNAL_CANCELLED:
+ return HTTP_RESP_CLIENT_CLOSED_REQUEST;
+
+ case ND_SD_JOURNAL_NOT_MODIFIED:
+ return HTTP_RESP_NOT_MODIFIED;
- buffer_json_member_add_uint64(wb, "status", status == ND_SD_JOURNAL_FAILED_TO_SEEK ? HTTP_RESP_INTERNAL_SERVER_ERROR : HTTP_RESP_OK);
+ default:
+ case ND_SD_JOURNAL_FAILED_TO_SEEK:
+ return HTTP_RESP_INTERNAL_SERVER_ERROR;
+ }
+ }
+
+ buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK);
buffer_json_member_add_boolean(wb, "partial", status != ND_SD_JOURNAL_OK);
buffer_json_member_add_string(wb, "type", "table");
@@ -372,7 +398,7 @@ static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets,
buffer_json_member_add_time_t(wb, "expires", now_realtime_sec() + (data_only ? 3600 : 0));
buffer_json_finalize(wb);
- return status == ND_SD_JOURNAL_FAILED_TO_SEEK ? HTTP_RESP_INTERNAL_SERVER_ERROR : HTTP_RESP_OK;
+ return HTTP_RESP_OK;
}
static void netdata_systemd_journal_function_help(const char *transaction) {
@@ -484,8 +510,8 @@ static FACET_ROW_SEVERITY syslog_priority_to_facet_severity(int priority) {
}
static char *uid_to_username(uid_t uid, char *buffer, size_t buffer_size) {
+ static __thread char tmp[1024 + 1];
struct passwd pw, *result;
- char tmp[1024 + 1];
if (getpwuid_r(uid, &pw, tmp, 1024, &result) != 0 || result == NULL)
return NULL;
@@ -495,8 +521,8 @@ static char *uid_to_username(uid_t uid, char *buffer, size_t buffer_size) {
}
static char *gid_to_groupname(gid_t gid, char* buffer, size_t buffer_size) {
+ static __thread char tmp[1024 + 1];
struct group grp, *result;
- char tmp[1024 + 1];
if (getgrgid_r(gid, &grp, tmp, 1024, &result) != 0 || result == NULL)
return NULL;
@@ -531,46 +557,106 @@ static void netdata_systemd_journal_transform_priority(FACETS *facets __maybe_un
}
}
-static void netdata_systemd_journal_transform_uid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) {
- DICTIONARY *cache = data;
- const char *v = buffer_tostring(wb);
- if(*v && isdigit(*v)) {
- const char *sv = dictionary_get(cache, v);
- if(!sv) {
- char buf[1024 + 1];
- int uid = str2i(buffer_tostring(wb));
- const char *name = uid_to_username(uid, buf, 1024);
- if (!name)
- name = v;
-
- sv = dictionary_set(cache, v, (void *)name, strlen(name) + 1);
- }
+// ----------------------------------------------------------------------------
+// UID and GID transformation
- buffer_flush(wb);
- buffer_strcat(wb, sv);
+#define UID_GID_HASHTABLE_SIZE 1000
+
+struct word_t2str_hashtable_entry {
+ struct word_t2str_hashtable_entry *next;
+ Word_t hash;
+ size_t len;
+ char str[];
+};
+
+struct word_t2str_hashtable {
+ SPINLOCK spinlock;
+ size_t size;
+ struct word_t2str_hashtable_entry *hashtable[UID_GID_HASHTABLE_SIZE];
+};
+
+struct word_t2str_hashtable uid_hashtable = {
+ .size = UID_GID_HASHTABLE_SIZE,
+};
+
+struct word_t2str_hashtable gid_hashtable = {
+ .size = UID_GID_HASHTABLE_SIZE,
+};
+
+struct word_t2str_hashtable_entry **word_t2str_hashtable_slot(struct word_t2str_hashtable *ht, Word_t hash) {
+ size_t slot = hash % ht->size;
+ struct word_t2str_hashtable_entry **e = &ht->hashtable[slot];
+
+ while(*e && (*e)->hash != hash)
+ e = &((*e)->next);
+
+ return e;
+}
+
+const char *uid_to_username_cached(uid_t uid, size_t *length) {
+ spinlock_lock(&uid_hashtable.spinlock);
+
+ struct word_t2str_hashtable_entry **e = word_t2str_hashtable_slot(&uid_hashtable, uid);
+ if(!(*e)) {
+ static __thread char buf[1024 + 1];
+ const char *name = uid_to_username(uid, buf, 1024);
+ size_t size = strlen(name) + 1;
+
+ *e = callocz(1, sizeof(struct word_t2str_hashtable_entry) + size);
+ (*e)->len = size - 1;
+ (*e)->hash = uid;
+ memcpy((*e)->str, name, size);
}
+
+ spinlock_unlock(&uid_hashtable.spinlock);
+
+ *length = (*e)->len;
+ return (*e)->str;
}
-static void netdata_systemd_journal_transform_gid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) {
- DICTIONARY *cache = data;
+const char *gid_to_groupname_cached(gid_t gid, size_t *length) {
+ spinlock_lock(&gid_hashtable.spinlock);
+
+ struct word_t2str_hashtable_entry **e = word_t2str_hashtable_slot(&gid_hashtable, gid);
+ if(!(*e)) {
+ static __thread char buf[1024 + 1];
+ const char *name = gid_to_groupname(gid, buf, 1024);
+ size_t size = strlen(name) + 1;
+
+ *e = callocz(1, sizeof(struct word_t2str_hashtable_entry) + size);
+ (*e)->len = size - 1;
+ (*e)->hash = gid;
+ memcpy((*e)->str, name, size);
+ }
+
+ spinlock_unlock(&gid_hashtable.spinlock);
+
+ *length = (*e)->len;
+ return (*e)->str;
+}
+
+static void netdata_systemd_journal_transform_uid(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) {
const char *v = buffer_tostring(wb);
if(*v && isdigit(*v)) {
- const char *sv = dictionary_get(cache, v);
- if(!sv) {
- char buf[1024 + 1];
- int gid = str2i(buffer_tostring(wb));
- const char *name = gid_to_groupname(gid, buf, 1024);
- if (!name)
- name = v;
-
- sv = dictionary_set(cache, v, (void *)name, strlen(name) + 1);
- }
+ uid_t uid = str2i(buffer_tostring(wb));
+ size_t len;
+ const char *name = uid_to_username_cached(uid, &len);
+ buffer_contents_replace(wb, name, len);
+ }
+}
- buffer_flush(wb);
- buffer_strcat(wb, sv);
+static void netdata_systemd_journal_transform_gid(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) {
+ const char *v = buffer_tostring(wb);
+ if(*v && isdigit(*v)) {
+ gid_t gid = str2i(buffer_tostring(wb));
+ size_t len;
+ const char *name = gid_to_groupname_cached(gid, &len);
+ buffer_contents_replace(wb, name, len);
}
}
+// ----------------------------------------------------------------------------
+
static void netdata_systemd_journal_dynamic_row_id(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row, void *data __maybe_unused) {
FACET_ROW_KEY_VALUE *pid_rkv = dictionary_get(row->dict, "_PID");
const char *pid = pid_rkv ? buffer_tostring(pid_rkv->wb) : FACET_VALUE_UNSET;
@@ -593,47 +679,13 @@ static void netdata_systemd_journal_dynamic_row_id(FACETS *facets __maybe_unused
buffer_json_add_array_item_string(json_array, buffer_tostring(rkv->wb));
}
-static void netdata_systemd_journal_rich_message(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row, void *data __maybe_unused) {
+static void netdata_systemd_journal_rich_message(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row __maybe_unused, void *data __maybe_unused) {
buffer_json_add_array_item_object(json_array);
buffer_json_member_add_string(json_array, "value", buffer_tostring(rkv->wb));
buffer_json_object_close(json_array);
}
-static void function_systemd_journal(const char *transaction, char *function, char *line_buffer __maybe_unused, int line_max __maybe_unused, int timeout __maybe_unused) {
- static struct {
- BUFFER *tmp;
- BUFFER *wb;
- BUFFER *function;
- int response;
- time_t expires;
- } cache = {
- .tmp = NULL,
- .wb = NULL,
- .function = NULL,
- .response = 0,
- .expires = 0,
- };
-
- if(unlikely(!cache.wb)) {
- cache.tmp = buffer_create(0, NULL);
- cache.wb = buffer_create(0, NULL);
- cache.function = buffer_create(0, NULL);
- }
-
- if(buffer_strlen(cache.function) && buffer_strlen(cache.wb) && strcmp(buffer_tostring(cache.function), function) == 0) {
- // repeated the same request
- netdata_mutex_lock(&stdout_mutex);
- if(cache.response == HTTP_RESP_OK)
- pluginsd_function_result_to_stdout(transaction, cache.response, "application/json", cache.expires, cache.wb);
- else
- pluginsd_function_json_error_to_stdout(transaction, cache.response, "failed");
- netdata_mutex_unlock(&stdout_mutex);
- return;
- }
-
- buffer_flush(cache.tmp);
- buffer_strcat(cache.tmp, function);
-
+static void function_systemd_journal(const char *transaction, char *function, int timeout, bool *cancelled) {
BUFFER *wb = buffer_create(0, NULL);
buffer_flush(wb);
buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_NEWLINE_ON_ARRAY_ITEMS);
@@ -682,10 +734,10 @@ static void function_systemd_journal(const char *transaction, char *function, ch
facets_register_key_name(facets, "USER_UNIT", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS);
facets_register_key_name_transformation(facets, "_UID", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS,
- netdata_systemd_journal_transform_uid, uids);
+ netdata_systemd_journal_transform_uid, NULL);
facets_register_key_name_transformation(facets, "_GID", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS,
- netdata_systemd_journal_transform_gid, gids);
+ netdata_systemd_journal_transform_gid, NULL);
bool info = false;
bool data_only = false;
@@ -858,12 +910,19 @@ static void function_systemd_journal(const char *transaction, char *function, ch
facets_set_items(facets, last);
facets_set_anchor(facets, anchor, direction);
facets_set_query(facets, query);
- facets_set_histogram(facets, chart ? chart : "PRIORITY", after_s * USEC_PER_SEC, before_s * USEC_PER_SEC);
+
+ if(chart && *chart)
+ facets_set_histogram_by_id(facets, chart,
+ after_s * USEC_PER_SEC, before_s * USEC_PER_SEC);
+ else
+ facets_set_histogram_by_name(facets, "PRIORITY",
+ after_s * USEC_PER_SEC, before_s * USEC_PER_SEC);
response = netdata_systemd_journal_query(wb, facets, after_s * USEC_PER_SEC, before_s * USEC_PER_SEC,
anchor, direction, last,
if_modified_since, data_only,
- now_monotonic_usec() + (timeout - 1) * USEC_PER_SEC);
+ now_monotonic_usec() + (timeout - 1) * USEC_PER_SEC,
+ cancelled);
if(response != HTTP_RESP_OK) {
netdata_mutex_lock(&stdout_mutex);
@@ -872,14 +931,6 @@ static void function_systemd_journal(const char *transaction, char *function, ch
goto cleanup;
}
- // keep this response in the cache
- cache.response = response;
- cache.expires = expires;
- buffer_flush(cache.wb);
- buffer_fast_strcat(cache.wb, buffer_tostring(wb), buffer_strlen(wb));
- buffer_flush(cache.function);
- buffer_fast_strcat(cache.function, buffer_tostring(cache.tmp), buffer_strlen(cache.tmp));
-
output:
netdata_mutex_lock(&stdout_mutex);
pluginsd_function_result_to_stdout(transaction, response, "application/json", expires, wb);
@@ -890,54 +941,7 @@ cleanup:
buffer_free(wb);
}
-static void *reader_main(void *arg __maybe_unused) {
- char buffer[PLUGINSD_LINE_MAX + 1];
-
- char *s = NULL;
- while(!plugin_should_exit && (s = fgets(buffer, PLUGINSD_LINE_MAX, stdin))) {
-
- char *words[PLUGINSD_MAX_WORDS] = { NULL };
- size_t num_words = quoted_strings_splitter_pluginsd(buffer, words, PLUGINSD_MAX_WORDS);
-
- const char *keyword = get_word(words, num_words, 0);
-
- if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) {
- char *transaction = get_word(words, num_words, 1);
- char *timeout_s = get_word(words, num_words, 2);
- char *function = get_word(words, num_words, 3);
-
- if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
- netdata_log_error("Received incomplete %s (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
- keyword,
- transaction?transaction:"(unset)",
- timeout_s?timeout_s:"(unset)",
- function?function:"(unset)");
- }
- else {
- int timeout = str2i(timeout_s);
- if(timeout <= 0) timeout = SYSTEMD_JOURNAL_DEFAULT_TIMEOUT;
-
- if(strncmp(function, SYSTEMD_JOURNAL_FUNCTION_NAME, strlen(SYSTEMD_JOURNAL_FUNCTION_NAME)) == 0)
- function_systemd_journal(transaction, function, buffer, PLUGINSD_LINE_MAX + 1, timeout);
- else {
- netdata_mutex_lock(&stdout_mutex);
- pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND,
- "No function with this name found in systemd-journal.plugin.");
- netdata_mutex_unlock(&stdout_mutex);
- }
- }
- }
- else
- netdata_log_error("Received unknown command: %s", keyword?keyword:"(unset)");
- }
-
- if(!s || feof(stdin) || ferror(stdin)) {
- plugin_should_exit = true;
- netdata_log_error("Received error on stdin.");
- }
-
- exit(1);
-}
+// ----------------------------------------------------------------------------
int main(int argc __maybe_unused, char **argv __maybe_unused) {
stderror = stderr;
@@ -952,9 +956,6 @@ int main(int argc __maybe_unused, char **argv __maybe_unused) {
error_log_errors_per_period = 100;
error_log_throttle_period = 3600;
- uids = dictionary_create(0);
- gids = dictionary_create(0);
-
netdata_configured_host_prefix = getenv("NETDATA_HOST_PREFIX");
if(verify_netdata_host_prefix() == -1) exit(1);
@@ -962,22 +963,28 @@ int main(int argc __maybe_unused, char **argv __maybe_unused) {
// debug
if(argc == 2 && strcmp(argv[1], "debug") == 0) {
- char buf[] = "systemd-journal after:-864000 before:0 last:500";
+ bool cancelled = false;
+ char buf[] = "systemd-journal after:-2592000 before:0 last:500";
// char buf[] = "systemd-journal after:1694511062 before:1694514662 anchor:1694514122024403";
- function_systemd_journal("123", buf, "", 0, 30);
+ function_systemd_journal("123", buf, 30, &cancelled);
exit(1);
}
// ------------------------------------------------------------------------
+ // the event loop for functions
+
+ struct functions_evloop_globals *wg =
+ functions_evloop_init(SYSTEMD_JOURNAL_WORKER_THREADS, "SDJ", &stdout_mutex, &plugin_should_exit);
+
+ functions_evloop_add_function(wg, SYSTEMD_JOURNAL_FUNCTION_NAME, function_systemd_journal,
+ SYSTEMD_JOURNAL_DEFAULT_TIMEOUT);
- netdata_thread_t reader_thread;
- netdata_thread_create(&reader_thread, "SDJ_READER", NETDATA_THREAD_OPTION_DONT_LOG, reader_main, NULL);
// ------------------------------------------------------------------------
time_t started_t = now_monotonic_sec();
- size_t iteration;
+ size_t iteration = 0;
usec_t step = 1000 * USEC_PER_MS;
bool tty = isatty(fileno(stderr)) == 1;
@@ -987,7 +994,9 @@ int main(int argc __maybe_unused, char **argv __maybe_unused) {
heartbeat_t hb;
heartbeat_init(&hb);
- for(iteration = 0; 1 ; iteration++) {
+ while(!plugin_should_exit) {
+ iteration++;
+
netdata_mutex_unlock(&stdout_mutex);
heartbeat_next(&hb, step);
netdata_mutex_lock(&stdout_mutex);
@@ -1002,8 +1011,5 @@ int main(int argc __maybe_unused, char **argv __maybe_unused) {
break;
}
- dictionary_destroy(uids);
- dictionary_destroy(gids);
-
exit(0);
}