summaryrefslogtreecommitdiffstats
path: root/collectors
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-08-03 15:42:11 +0300
committerGitHub <noreply@github.com>2023-08-03 15:42:11 +0300
commitce75313de06ee1e8aa5a59d57625ca25d467f0f8 (patch)
treed9ea59af0ace84b34f627b2f72d67a39d5bcff4d /collectors
parenta833f6674f09011b2e0134f045c4c8a39be8de75 (diff)
systemd-journal plugin (#15363)
Diffstat (limited to 'collectors')
-rw-r--r--collectors/Makefile.am1
-rw-r--r--collectors/apps.plugin/apps_plugin.c37
-rw-r--r--collectors/ebpf.plugin/ebpf_functions.c2
-rw-r--r--collectors/plugins.d/plugins_d.h11
-rw-r--r--collectors/systemd-journal.plugin/Makefile.am0
-rw-r--r--collectors/systemd-journal.plugin/README.md0
-rw-r--r--collectors/systemd-journal.plugin/systemd-journal.c578
7 files changed, 604 insertions, 25 deletions
diff --git a/collectors/Makefile.am b/collectors/Makefile.am
index 2aec3dd3e8..d477e5b80e 100644
--- a/collectors/Makefile.am
+++ b/collectors/Makefile.am
@@ -25,6 +25,7 @@ SUBDIRS = \
statsd.plugin \
ebpf.plugin \
tc.plugin \
+ systemd-journal.plugin \
$(NULL)
usercustompluginsconfigdir=$(configdir)/custom-plugins.d
diff --git a/collectors/apps.plugin/apps_plugin.c b/collectors/apps.plugin/apps_plugin.c
index 631980dffd..f258a30ad0 100644
--- a/collectors/apps.plugin/apps_plugin.c
+++ b/collectors/apps.plugin/apps_plugin.c
@@ -13,7 +13,7 @@
#define APPS_PLUGIN_PROCESSES_FUNCTION_DESCRIPTION "Detailed information on the currently running processes."
#define APPS_PLUGIN_FUNCTIONS() do { \
- fprintf(stdout, PLUGINSD_KEYWORD_FUNCTION " \"processes\" 10 \"%s\"\n", APPS_PLUGIN_PROCESSES_FUNCTION_DESCRIPTION); \
+ fprintf(stdout, PLUGINSD_KEYWORD_FUNCTION " \"processes\" %d \"%s\"\n", PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT, APPS_PLUGIN_PROCESSES_FUNCTION_DESCRIPTION); \
} while(0)
@@ -4572,7 +4572,7 @@ static int check_capabilities() {
}
#endif
-netdata_mutex_t mutex = NETDATA_MUTEX_INITIALIZER;
+static netdata_mutex_t mutex = NETDATA_MUTEX_INITIALIZER;
#define PROCESS_FILTER_CATEGORY "category:"
#define PROCESS_FILTER_USER "user:"
@@ -4625,15 +4625,6 @@ static void get_MemTotal(void) {
#endif
}
-static void apps_plugin_function_error(const char *transaction, int code, const char *msg) {
- char buffer[PLUGINSD_LINE_MAX + 1];
- json_escape_string(buffer, msg, PLUGINSD_LINE_MAX);
-
- pluginsd_function_result_begin_to_stdout(transaction, code, "application/json", now_realtime_sec());
- fprintf(stdout, "{\"status\":%d,\"error_message\":\"%s\"}", code, buffer);
- pluginsd_function_result_end_to_stdout();
-}
-
static void apps_plugin_function_processes_help(const char *transaction) {
pluginsd_function_result_begin_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600);
fprintf(stdout, "%s",
@@ -4681,7 +4672,7 @@ static void apps_plugin_function_processes_help(const char *transaction) {
buffer_json_add_array_item_double(wb, _tmp); \
} while(0)
-static void apps_plugin_function_processes(const char *transaction, char *function __maybe_unused, char *line_buffer __maybe_unused, int line_max __maybe_unused, int timeout __maybe_unused) {
+static void function_processes(const char *transaction, char *function __maybe_unused, char *line_buffer __maybe_unused, int line_max __maybe_unused, int timeout __maybe_unused) {
struct pid_stat *p;
char *words[PLUGINSD_MAX_WORDS] = { NULL };
@@ -4702,21 +4693,21 @@ static void apps_plugin_function_processes(const char *transaction, char *functi
if(!category && strncmp(keyword, PROCESS_FILTER_CATEGORY, strlen(PROCESS_FILTER_CATEGORY)) == 0) {
category = find_target_by_name(apps_groups_root_target, &keyword[strlen(PROCESS_FILTER_CATEGORY)]);
if(!category) {
- apps_plugin_function_error(transaction, HTTP_RESP_BAD_REQUEST, "No category with that name found.");
+ pluginsd_function_json_error(transaction, HTTP_RESP_BAD_REQUEST, "No category with that name found.");
return;
}
}
else if(!user && strncmp(keyword, PROCESS_FILTER_USER, strlen(PROCESS_FILTER_USER)) == 0) {
user = find_target_by_name(users_root_target, &keyword[strlen(PROCESS_FILTER_USER)]);
if(!user) {
- apps_plugin_function_error(transaction, HTTP_RESP_BAD_REQUEST, "No user with that name found.");
+ pluginsd_function_json_error(transaction, HTTP_RESP_BAD_REQUEST, "No user with that name found.");
return;
}
}
else if(strncmp(keyword, PROCESS_FILTER_GROUP, strlen(PROCESS_FILTER_GROUP)) == 0) {
group = find_target_by_name(groups_root_target, &keyword[strlen(PROCESS_FILTER_GROUP)]);
if(!group) {
- apps_plugin_function_error(transaction, HTTP_RESP_BAD_REQUEST, "No group with that name found.");
+ pluginsd_function_json_error(transaction, HTTP_RESP_BAD_REQUEST, "No group with that name found.");
return;
}
}
@@ -4742,7 +4733,7 @@ static void apps_plugin_function_processes(const char *transaction, char *functi
else {
char msg[PLUGINSD_LINE_MAX];
snprintfz(msg, PLUGINSD_LINE_MAX, "Invalid parameter '%s'", keyword);
- apps_plugin_function_error(transaction, HTTP_RESP_BAD_REQUEST, msg);
+ pluginsd_function_json_error(transaction, HTTP_RESP_BAD_REQUEST, msg);
return;
}
}
@@ -4755,7 +4746,7 @@ static void apps_plugin_function_processes(const char *transaction, char *functi
unsigned int io_divisor = 1024 * RATES_DETAIL;
BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX, NULL);
- buffer_json_initialize(wb, "\"", "\"", 0, true, false);
+ buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_NEWLINE_ON_ARRAYS);
buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK);
buffer_json_member_add_string(wb, "type", "table");
buffer_json_member_add_time_t(wb, "update_every", update_every);
@@ -5232,7 +5223,7 @@ static void apps_plugin_function_processes(const char *transaction, char *functi
RRDF_FIELD_FILTER_RANGE,
RRDF_FIELD_OPTS_VISIBLE, NULL);
buffer_rrdf_table_add_field(wb, field_id++, "Uptime", "Uptime in seconds", RRDF_FIELD_TYPE_DURATION,
- RRDF_FIELD_VISUAL_BAR, RRDF_FIELD_TRANSFORM_DURATION, 2,
+ RRDF_FIELD_VISUAL_BAR, RRDF_FIELD_TRANSFORM_DURATION_S, 2,
"seconds", Uptime_max, RRDF_FIELD_SORT_DESCENDING, NULL, RRDF_FIELD_SUMMARY_MAX,
RRDF_FIELD_FILTER_RANGE,
RRDF_FIELD_OPTS_VISIBLE, NULL);
@@ -5532,9 +5523,9 @@ static void apps_plugin_function_processes(const char *transaction, char *functi
pluginsd_function_result_end_to_stdout();
}
-bool apps_plugin_exit = false;
+static bool apps_plugin_exit = false;
-void *reader_main(void *arg __maybe_unused) {
+static void *reader_main(void *arg __maybe_unused) {
char buffer[PLUGINSD_LINE_MAX + 1];
char *s = NULL;
@@ -5566,9 +5557,9 @@ void *reader_main(void *arg __maybe_unused) {
netdata_mutex_lock(&mutex);
if(strncmp(function, "processes", strlen("processes")) == 0)
- apps_plugin_function_processes(transaction, function, buffer, PLUGINSD_LINE_MAX + 1, timeout);
+ function_processes(transaction, function, buffer, PLUGINSD_LINE_MAX + 1, timeout);
else
- apps_plugin_function_error(transaction, HTTP_RESP_NOT_FOUND, "No function with this name found in apps.plugin.");
+ pluginsd_function_json_error(transaction, HTTP_RESP_NOT_FOUND, "No function with this name found in apps.plugin.");
fflush(stdout);
netdata_mutex_unlock(&mutex);
@@ -5696,6 +5687,8 @@ int main(int argc, char **argv) {
netdata_thread_create(&reader_thread, "APPS_READER", NETDATA_THREAD_OPTION_DONT_LOG, reader_main, NULL);
netdata_mutex_lock(&mutex);
+ APPS_PLUGIN_FUNCTIONS();
+
usec_t step = update_every * USEC_PER_SEC;
global_iterations_counter = 1;
heartbeat_t hb;
diff --git a/collectors/ebpf.plugin/ebpf_functions.c b/collectors/ebpf.plugin/ebpf_functions.c
index cc26044c46..8f0244cde4 100644
--- a/collectors/ebpf.plugin/ebpf_functions.c
+++ b/collectors/ebpf.plugin/ebpf_functions.c
@@ -206,7 +206,7 @@ static void ebpf_function_thread_manipulation(const char *transaction,
time_t expires = now_realtime_sec() + em->update_every;
BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX, NULL);
- buffer_json_initialize(wb, "\"", "\"", 0, true, false);
+ buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_NEWLINE_ON_ARRAYS);
buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK);
buffer_json_member_add_string(wb, "type", "table");
buffer_json_member_add_time_t(wb, "update_every", em->update_every);
diff --git a/collectors/plugins.d/plugins_d.h b/collectors/plugins.d/plugins_d.h
index 1e183c2dc8..4988b50719 100644
--- a/collectors/plugins.d/plugins_d.h
+++ b/collectors/plugins.d/plugins_d.h
@@ -99,8 +99,6 @@ void pluginsd_process_thread_cleanup(void *ptr);
size_t pluginsd_initialize_plugin_directories();
-
-
#define pluginsd_function_result_begin_to_buffer(wb, transaction, code, content_type, expires) \
buffer_sprintf(wb \
, PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " \"%s\" %d \"%s\" %ld\n" \
@@ -125,4 +123,13 @@ size_t pluginsd_initialize_plugin_directories();
#define pluginsd_function_result_end_to_stdout() \
fprintf(stdout, "\n" PLUGINSD_KEYWORD_FUNCTION_RESULT_END "\n")
+static inline void pluginsd_function_json_error(const char *transaction, int code, const char *msg) {
+ char buffer[PLUGINSD_LINE_MAX + 1];
+ json_escape_string(buffer, msg, PLUGINSD_LINE_MAX);
+
+ pluginsd_function_result_begin_to_stdout(transaction, code, "application/json", now_realtime_sec());
+ fprintf(stdout, "{\"status\":%d,\"error_message\":\"%s\"}", code, buffer);
+ pluginsd_function_result_end_to_stdout();
+}
+
#endif /* NETDATA_PLUGINS_D_H */
diff --git a/collectors/systemd-journal.plugin/Makefile.am b/collectors/systemd-journal.plugin/Makefile.am
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/collectors/systemd-journal.plugin/Makefile.am
diff --git a/collectors/systemd-journal.plugin/README.md b/collectors/systemd-journal.plugin/README.md
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/collectors/systemd-journal.plugin/README.md
diff --git a/collectors/systemd-journal.plugin/systemd-journal.c b/collectors/systemd-journal.plugin/systemd-journal.c
new file mode 100644
index 0000000000..95954355df
--- /dev/null
+++ b/collectors/systemd-journal.plugin/systemd-journal.c
@@ -0,0 +1,578 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+/*
+ * netdata systemd-journal.plugin
+ * Copyright (C) 2023 Netdata Inc.
+ * GPL v3+
+ */
+
+// TODO - 1) MARKDOC
+
+#include "collectors/all.h"
+#include "libnetdata/libnetdata.h"
+#include "libnetdata/required_dummies.h"
+
+#include <systemd/sd-journal.h>
+#include <syslog.h>
+
+#define FACET_MAX_VALUE_LENGTH 8192
+
+#define SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION "View, search and analyze systemd journal entries."
+#define SYSTEMD_JOURNAL_FUNCTION_NAME "systemd-journal"
+#define SYSTEMD_JOURNAL_DEFAULT_TIMEOUT 30
+#define SYSTEMD_JOURNAL_MAX_PARAMS 100
+#define SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION (3 * 3600)
+#define SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY 200
+
+#define JOURNAL_PARAMETER_HELP "help"
+#define JOURNAL_PARAMETER_AFTER "after"
+#define JOURNAL_PARAMETER_BEFORE "before"
+#define JOURNAL_PARAMETER_ANCHOR "anchor"
+#define JOURNAL_PARAMETER_LAST "last"
+#define JOURNAL_PARAMETER_QUERY "query"
+
+#define SYSTEMD_ALWAYS_VISIBLE_KEYS NULL
+#define SYSTEMD_KEYS_EXCLUDED_FROM_FACETS NULL
+#define SYSTEMD_KEYS_INCLUDED_IN_FACETS \
+ "_TRANSPORT" \
+ "|SYSLOG_IDENTIFIER" \
+ "|SYSLOG_FACILITY" \
+ "|PRIORITY" \
+ "|_HOSTNAME" \
+ "|_RUNTIME_SCOPE" \
+ "|_PID" \
+ "|_UID" \
+ "|_GID" \
+ "|_SYSTEMD_UNIT" \
+ "|_SYSTEMD_SLICE" \
+ "|_SYSTEMD_USER_SLICE" \
+ "|_COMM" \
+ "|_EXE" \
+ "|_SYSTEMD_CGROUP" \
+ "|_SYSTEMD_USER_UNIT" \
+ "|USER_UNIT" \
+ "|UNIT" \
+ ""
+
+static netdata_mutex_t mutex = NETDATA_MUTEX_INITIALIZER;
+static bool plugin_should_exit = false;
+
+DICTIONARY *uids = NULL;
+DICTIONARY *gids = NULL;
+
+
+// ----------------------------------------------------------------------------
+
+int systemd_journal_query(BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t before_ut, usec_t stop_monotonic_ut) {
+ sd_journal *j;
+ int r;
+
+ // Open the system journal for reading
+ r = sd_journal_open(&j, SD_JOURNAL_ALL_NAMESPACES);
+ if (r < 0)
+ return HTTP_RESP_INTERNAL_SERVER_ERROR;
+
+ facets_rows_begin(facets);
+
+ bool timed_out = false;
+ size_t row_counter = 0;
+ sd_journal_seek_realtime_usec(j, before_ut);
+ SD_JOURNAL_FOREACH_BACKWARDS(j) {
+ row_counter++;
+
+ uint64_t msg_ut;
+ sd_journal_get_realtime_usec(j, &msg_ut);
+ if (msg_ut < after_ut)
+ break;
+
+ const void *data;
+ size_t length;
+ SD_JOURNAL_FOREACH_DATA(j, data, length) {
+ const char *key = data;
+ const char *equal = strchr(key, '=');
+ if(unlikely(!equal))
+ continue;
+
+ const char *value = ++equal;
+ size_t key_length = value - key; // including '\0'
+
+ char key_copy[key_length];
+ memcpy(key_copy, key, key_length - 1);
+ key_copy[key_length - 1] = '\0';
+
+ size_t value_length = length - key_length; // without '\0'
+ facets_add_key_value_length(facets, key_copy, value, value_length <= FACET_MAX_VALUE_LENGTH ? value_length : FACET_MAX_VALUE_LENGTH);
+ }
+
+ facets_row_finished(facets, msg_ut);
+
+ if((row_counter % 100) == 0 && now_monotonic_usec() > stop_monotonic_ut) {
+ timed_out = true;
+ break;
+ }
+ }
+
+ sd_journal_close(j);
+
+ buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK);
+ buffer_json_member_add_boolean(wb, "partial", timed_out);
+ buffer_json_member_add_string(wb, "type", "table");
+ buffer_json_member_add_time_t(wb, "update_every", 1);
+ buffer_json_member_add_string(wb, "help", SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION);
+
+ facets_report(facets, wb);
+
+ buffer_json_member_add_time_t(wb, "expires", now_realtime_sec());
+ buffer_json_finalize(wb);
+
+ return HTTP_RESP_OK;
+}
+
+static void systemd_journal_function_help(const char *transaction) {
+ pluginsd_function_result_begin_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600);
+ fprintf(stdout,
+ "%s / %s\n"
+ "\n"
+ "%s\n"
+ "\n"
+ "The following filters are supported:\n"
+ "\n"
+ " help\n"
+ " Shows this help message.\n"
+ "\n"
+ " before:TIMESTAMP\n"
+ " Absolute or relative (to now) timestamp in seconds, to start the query.\n"
+ " The query is always executed from the most recent to the oldest log entry.\n"
+ " If not given the default is: now.\n"
+ "\n"
+ " after:TIMESTAMP\n"
+ " Absolute or relative (to `before`) timestamp in seconds, to end the query.\n"
+ " If not given, the default is %d.\n"
+ "\n"
+ " last:ITEMS\n"
+ " The number of items to return.\n"
+ " The default is %d.\n"
+ "\n"
+ " anchor:NUMBER\n"
+ " The `timestamp` of the item last received, to return log entries after that.\n"
+ " If not given, the query will return the top `ITEMS` from the most recent.\n"
+ "\n"
+ " facet_id:value_id1,value_id2,value_id3,...\n"
+ " Apply filters to the query, based on the facet IDs returned.\n"
+ " Each `facet_id` can be given once, but multiple `facet_ids` can be given.\n"
+ "\n"
+ "Filters can be combined. Each filter can be given only one time.\n"
+ , program_name
+ , SYSTEMD_JOURNAL_FUNCTION_NAME
+ , SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION
+ , -SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION
+ , SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY
+ );
+ pluginsd_function_result_end_to_stdout();
+}
+
+static const char *syslog_facility_to_name(int facility) {
+ switch (facility) {
+ case LOG_FAC(LOG_KERN): return "kern";
+ case LOG_FAC(LOG_USER): return "user";
+ case LOG_FAC(LOG_MAIL): return "mail";
+ case LOG_FAC(LOG_DAEMON): return "daemon";
+ case LOG_FAC(LOG_AUTH): return "auth";
+ case LOG_FAC(LOG_SYSLOG): return "syslog";
+ case LOG_FAC(LOG_LPR): return "lpr";
+ case LOG_FAC(LOG_NEWS): return "news";
+ case LOG_FAC(LOG_UUCP): return "uucp";
+ case LOG_FAC(LOG_CRON): return "cron";
+ case LOG_FAC(LOG_AUTHPRIV): return "authpriv";
+ case LOG_FAC(LOG_FTP): return "ftp";
+ case LOG_FAC(LOG_LOCAL0): return "local0";
+ case LOG_FAC(LOG_LOCAL1): return "local1";
+ case LOG_FAC(LOG_LOCAL2): return "local2";
+ case LOG_FAC(LOG_LOCAL3): return "local3";
+ case LOG_FAC(LOG_LOCAL4): return "local4";
+ case LOG_FAC(LOG_LOCAL5): return "local5";
+ case LOG_FAC(LOG_LOCAL6): return "local6";
+ case LOG_FAC(LOG_LOCAL7): return "local7";
+ default: return NULL;
+ }
+}
+
+static const char *syslog_priority_to_name(int priority) {
+ switch (priority) {
+ case LOG_ALERT: return "alert";
+ case LOG_CRIT: return "critical";
+ case LOG_DEBUG: return "debug";
+ case LOG_EMERG: return "panic";
+ case LOG_ERR: return "error";
+ case LOG_INFO: return "info";
+ case LOG_NOTICE: return "notice";
+ case LOG_WARNING: return "warning";
+ default: return NULL;
+ }
+}
+
+static char *uid_to_username(uid_t uid, char *buffer, size_t buffer_size) {
+ struct passwd pw, *result;
+ char tmp[1024 + 1];
+
+ if (getpwuid_r(uid, &pw, tmp, 1024, &result) != 0 || result == NULL)
+ return NULL;
+
+ strncpy(buffer, pw.pw_name, buffer_size - 1);
+ buffer[buffer_size - 1] = '\0'; // Null-terminate just in case
+ return buffer;
+}
+
+static char *gid_to_groupname(gid_t gid, char* buffer, size_t buffer_size) {
+ struct group grp, *result;
+ char tmp[1024 + 1];
+
+ if (getgrgid_r(gid, &grp, tmp, 1024, &result) != 0 || result == NULL)
+ return NULL;
+
+ strncpy(buffer, grp.gr_name, buffer_size - 1);
+ buffer[buffer_size - 1] = '\0'; // Null-terminate just in case
+ return buffer;
+}
+
+static void systemd_journal_transform_syslog_facility(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) {
+ const char *v = buffer_tostring(wb);
+ if(*v && isdigit(*v)) {
+ int facility = str2i(buffer_tostring(wb));
+ const char *name = syslog_facility_to_name(facility);
+ if (name) {
+ buffer_flush(wb);
+ buffer_json_add_array_item_string(wb, name);
+ }
+ }
+}
+
+static void systemd_journal_transform_priority(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) {
+ const char *v = buffer_tostring(wb);
+ if(*v && isdigit(*v)) {
+ int priority = str2i(buffer_tostring(wb));
+ const char *name = syslog_priority_to_name(priority);
+ if (name) {
+ buffer_flush(wb);
+ buffer_json_add_array_item_string(wb, name);
+ }
+ }
+}
+
+static void systemd_journal_transform_uid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) {
+ DICTIONARY *cache = data;
+ const char *v = buffer_tostring(wb);
+ if(*v && isdigit(*v)) {
+ const char *sv = dictionary_get(cache, v);
+ if(!sv) {
+ char buf[1024 + 1];
+ int uid = str2i(buffer_tostring(wb));
+ const char *name = uid_to_username(uid, buf, 1024);
+ if (!name)
+ name = v;
+
+ sv = dictionary_set(cache, v, (void *)name, strlen(name) + 1);
+ }
+
+ buffer_flush(wb);
+ buffer_strcat(wb, sv);
+ }
+}
+
+static void systemd_journal_transform_gid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) {
+ DICTIONARY *cache = data;
+ const char *v = buffer_tostring(wb);
+ if(*v && isdigit(*v)) {
+ const char *sv = dictionary_get(cache, v);
+ if(!sv) {
+ char buf[1024 + 1];
+ int gid = str2i(buffer_tostring(wb));
+ const char *name = gid_to_groupname(gid, buf, 1024);
+ if (!name)
+ name = v;
+
+ sv = dictionary_set(cache, v, (void *)name, strlen(name) + 1);
+ }
+
+ buffer_flush(wb);
+ buffer_strcat(wb, sv);
+ }
+}
+
+static void systemd_journal_dynamic_row_id(FACETS *facets __maybe_unused, BUFFER *wb, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row, void *data __maybe_unused) {
+ FACET_ROW_KEY_VALUE *syslog_identifier_rkv = dictionary_get(row->dict, "SYSLOG_IDENTIFIER");
+ FACET_ROW_KEY_VALUE *pid_rkv = dictionary_get(row->dict, "_PID");
+
+ const char *identifier = syslog_identifier_rkv ? buffer_tostring(syslog_identifier_rkv->wb) : "UNKNOWN";
+ const char *pid = pid_rkv ? buffer_tostring(pid_rkv->wb) : "UNKNOWN";
+
+ buffer_flush(rkv->wb);
+ buffer_sprintf(rkv->wb, "%s[%s]", identifier, pid);
+
+ buffer_json_add_array_item_string(wb, buffer_tostring(rkv->wb));
+}
+
+static void function_systemd_journal(const char *transaction, char *function, char *line_buffer __maybe_unused, int line_max __maybe_unused, int timeout __maybe_unused) {
+ char *words[SYSTEMD_JOURNAL_MAX_PARAMS] = { NULL };
+ size_t num_words = quoted_strings_splitter_pluginsd(function, words, SYSTEMD_JOURNAL_MAX_PARAMS);
+
+ BUFFER *wb = buffer_create(0, NULL);
+ buffer_flush(wb);
+ buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_NEWLINE_ON_ARRAYS);
+
+ FACETS *facets = facets_create(50, 0, FACETS_OPTION_ALL_KEYS_FTS,
+ SYSTEMD_ALWAYS_VISIBLE_KEYS,
+ SYSTEMD_KEYS_INCLUDED_IN_FACETS,
+ SYSTEMD_KEYS_EXCLUDED_FROM_FACETS);
+
+ facets_accepted_param(facets, JOURNAL_PARAMETER_AFTER);
+ facets_accepted_param(facets, JOURNAL_PARAMETER_BEFORE);
+ facets_accepted_param(facets, JOURNAL_PARAMETER_ANCHOR);
+ facets_accepted_param(facets, JOURNAL_PARAMETER_LAST);
+ facets_accepted_param(facets, JOURNAL_PARAMETER_QUERY);
+
+ // register the fields in the order you want them on the dashboard
+
+ facets_register_dynamic_key(facets, "ND_JOURNAL_PROCESS", FACET_KEY_OPTION_NO_FACET|FACET_KEY_OPTION_VISIBLE|FACET_KEY_OPTION_FTS,
+ systemd_journal_dynamic_row_id, NULL);
+
+ facets_register_key(facets, "MESSAGE",
+ FACET_KEY_OPTION_NO_FACET|FACET_KEY_OPTION_MAIN_TEXT|FACET_KEY_OPTION_VISIBLE|FACET_KEY_OPTION_FTS);
+
+ facets_register_key_transformation(facets, "PRIORITY", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS,
+ systemd_journal_transform_priority, NULL);
+
+ facets_register_key_transformation(facets, "SYSLOG_FACILITY", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS,
+ systemd_journal_transform_syslog_facility, NULL);
+
+ facets_register_key(facets, "SYSLOG_IDENTIFIER", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS);
+ facets_register_key(facets, "UNIT", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS);
+ facets_register_key(facets, "USER_UNIT", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS);
+
+ facets_register_key_transformation(facets, "_UID", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS,
+ systemd_journal_transform_uid, uids);
+
+ facets_register_key_transformation(facets, "_GID", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS,
+ systemd_journal_transform_gid, gids);
+
+ time_t after_s = 0, before_s = 0;
+ usec_t anchor = 0;
+ size_t last = 0;
+ const char *query = NULL;
+
+ buffer_json_member_add_object(wb, "request");
+ buffer_json_member_add_object(wb, "filters");
+
+ for(int i = 1; i < SYSTEMD_JOURNAL_MAX_PARAMS ;i++) {
+ const char *keyword = get_word(words, num_words, i);
+ if(!keyword) break;
+
+ if(strcmp(keyword, JOURNAL_PARAMETER_HELP) == 0) {
+ systemd_journal_function_help(transaction);
+ goto cleanup;
+ }
+ else if(strncmp(keyword, JOURNAL_PARAMETER_AFTER ":", strlen(JOURNAL_PARAMETER_AFTER ":")) == 0) {
+ after_s = str2l(&keyword[strlen(JOURNAL_PARAMETER_AFTER ":")]);
+ }
+ else if(strncmp(keyword, JOURNAL_PARAMETER_BEFORE ":", strlen(JOURNAL_PARAMETER_BEFORE ":")) == 0) {
+ before_s = str2l(&keyword[strlen(JOURNAL_PARAMETER_BEFORE ":")]);
+ }
+ else if(strncmp(keyword, JOURNAL_PARAMETER_ANCHOR ":", strlen(JOURNAL_PARAMETER_ANCHOR ":")) == 0) {
+ anchor = str2ull(&keyword[strlen(JOURNAL_PARAMETER_ANCHOR ":")], NULL);
+ }
+ else if(strncmp(keyword, JOURNAL_PARAMETER_LAST ":", strlen(JOURNAL_PARAMETER_LAST ":")) == 0) {
+ last = str2ul(&keyword[strlen(JOURNAL_PARAMETER_LAST ":")]);
+ }
+ else if(strncmp(keyword, JOURNAL_PARAMETER_QUERY ":", strlen(JOURNAL_PARAMETER_QUERY ":")) == 0) {
+ query= &keyword[strlen(JOURNAL_PARAMETER_QUERY ":")];
+ }
+ else {
+ char *value = strchr(keyword, ':');
+ if(value) {
+ *value++ = '\0';
+
+ buffer_json_member_add_array(wb, keyword);
+
+ while(value) {
+ char *sep = strchr(value, ',');
+ if(sep)
+ *sep++ = '\0';
+
+ facets_register_facet_filter(facets, keyword, value, FACET_KEY_OPTION_REORDER);
+ buffer_json_add_array_item_string(wb, value);
+
+ value = sep;
+ }
+
+ buffer_json_array_close(wb); // keyword
+ }
+ }
+ }
+
+ buffer_json_object_close(wb); // filters
+
+ time_t expires = now_realtime_sec() + 1;
+ time_t now_s;
+
+ if(!after_s && !before_s) {
+ now_s = now_realtime_sec();
+ before_s = now_s;
+ after_s = before_s - SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION;
+ }
+ else
+ rrdr_relative_window_to_absolute(&after_s, &before_s, &now_s, false);
+
+ if(after_s > before_s) {
+ time_t tmp = after_s;
+ after_s = before_s;
+ before_s = tmp;
+ }
+
+ if(after_s == before_s)
+ after_s = before_s - SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION;
+
+ if(!last)
+ last = SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY;
+
+ buffer_json_member_add_time_t(wb, "after", after_s);
+ buffer_json_member_add_time_t(wb, "before", before_s);
+ buffer_json_member_add_uint64(wb, "anchor", anchor);
+ buffer_json_member_add_uint64(wb, "last", last);
+ buffer_json_member_add_string(wb, "query", query);
+ buffer_json_member_add_time_t(wb, "timeout", timeout);
+ buffer_json_object_close(wb); // request
+
+ facets_set_items(facets, last);
+ facets_set_anchor(facets, anchor);
+ facets_set_query(facets, query);
+ int response = systemd_journal_query(wb, facets, after_s * USEC_PER_SEC, before_s * USEC_PER_SEC,
+ now_monotonic_usec() + (timeout - 1) * USEC_PER_SEC);
+
+ if(response != HTTP_RESP_OK) {
+ pluginsd_function_json_error(transaction, response, "failed");
+ goto cleanup;
+ }
+
+ pluginsd_function_result_begin_to_stdout(transaction, HTTP_RESP_OK, "application/json", expires);
+ fwrite(buffer_tostring(wb), buffer_strlen(wb), 1, stdout);
+
+ pluginsd_function_result_end_to_stdout();
+
+cleanup:
+ facets_destroy(facets);
+ buffer_free(wb);
+}
+
+static void *reader_main(void *arg __maybe_unused) {
+ char buffer[PLUGINSD_LINE_MAX + 1];
+
+ char *s = NULL;
+ while(!plugin_should_exit && (s = fgets(buffer, PLUGINSD_LINE_MAX, stdin))) {
+
+ char *words[PLUGINSD_MAX_WORDS] = { NULL };
+ size_t num_words = quoted_strings_splitter_pluginsd(buffer, words, PLUGINSD_MAX_WORDS);
+
+ const char *keyword = get_word(words, num_words, 0);
+
+ if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) {
+ char *transaction = get_word(words, num_words, 1);
+ char *timeout_s = get_word(words, num_words, 2);
+ char *function = get_word(words, num_words, 3);
+
+ if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
+ netdata_log_error("Received incomplete %s (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
+ keyword,
+ transaction?transaction:"(unset)",
+ timeout_s?timeout_s:"(unset)",
+ function?function:"(unset)");
+ }
+ else {
+ int timeout = str2i(timeout_s);
+ if(timeout <= 0) timeout = SYSTEMD_JOURNAL_DEFAULT_TIMEOUT;
+
+ netdata_mutex_lock(&mutex);
+
+ if(strncmp(function, SYSTEMD_JOURNAL_FUNCTION_NAME, strlen(SYSTEMD_JOURNAL_FUNCTION_NAME)) == 0)
+ function_systemd_journal(transaction, function, buffer, PLUGINSD_LINE_MAX + 1, timeout);
+ else
+ pluginsd_function_json_error(transaction, HTTP_RESP_NOT_FOUND, "No function with this name found in systemd-journal.plugin.");
+
+ fflush(stdout);
+ netdata_mutex_unlock(&mutex);
+ }
+ }
+ else
+ netdata_log_error("Received unknown command: %s", keyword?keyword:"(unset)");
+ }
+
+ if(!s || feof(stdin) || ferror(stdin)) {
+ plugin_should_exit = true;
+ netdata_log_error("Received error on stdin.");
+ }
+
+ exit(1);
+}
+
+int main(int argc __maybe_unused, char **argv __maybe_unused) {
+ stderror = stderr;
+ clocks_init();
+
+ program_name = "systemd-journal.plugin";
+
+ // disable syslog
+ error_log_syslog = 0;
+
+ // set errors flood protection to 100 logs per hour
+ error_log_errors_per_period = 100;
+ error_log_throttle_period = 3600;
+
+ uids = dictionary_create(0);
+ gids = dictionary_create(0);
+
+ // ------------------------------------------------------------------------
+ // debug
+
+ if(argc == 2 && strcmp(argv[1], "debug") == 0) {
+ char buf[] = "systemd-journal after:-86400 before:0 last:500";
+ function_systemd_journal("123", buf, "", 0, 30);
+ exit(1);
+ }
+
+ // ------------------------------------------------------------------------
+
+ netdata_thread_t reader_thread;
+ netdata_thread_create(&reader_thread, "SDJ_READER", NETDATA_THREAD_OPTION_DONT_LOG, reader_main, NULL);
+
+ // ------------------------------------------------------------------------
+
+ time_t started_t = now_monotonic_sec();
+
+ size_t iteration;
+ usec_t step = 1000 * USEC_PER_MS;
+ bool tty = isatty(fileno(stderr)) == 1;
+
+ netdata_mutex_lock(&mutex);
+ fprintf(stdout, PLUGINSD_KEYWORD_FUNCTION " \"%s\" %d \"%s\"\n",
+ SYSTEMD_JOURNAL_FUNCTION_NAME, SYSTEMD_JOURNAL_DEFAULT_TIMEOUT, SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION);
+
+ heartbeat_t hb;
+ heartbeat_init(&hb);
+ for(iteration = 0; 1 ; iteration++) {
+ netdata_mutex_unlock(&mutex);
+ heartbeat_next(&hb, step);
+ netdata_mutex_lock(&mutex);
+
+ if(!tty)
+ fprintf(stdout, "\n");
+
+ fflush(stdout);
+
+ time_t now = now_monotonic_sec();
+ if(now - started_t > 86400)
+ break;
+ }
+
+ dictionary_destroy(uids);
+ dictionary_destroy(gids);
+
+ exit(0);
+}