summaryrefslogtreecommitdiffstats
path: root/collectors/systemd-journal.plugin/systemd-journal.c
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-09-14 15:59:22 +0300
committerGitHub <noreply@github.com>2023-09-14 15:59:22 +0300
commit934f620265d2e7a2be160cb084ec01875ed3491e (patch)
tree6bbf40643ff66c63867742ae5ecafb305d7119c5 /collectors/systemd-journal.plugin/systemd-journal.c
parente9d3dc658d50b74430b3b8a470bfa9ab338fd44f (diff)
facets: data-only queries (#15961)
Diffstat (limited to 'collectors/systemd-journal.plugin/systemd-journal.c')
-rw-r--r--collectors/systemd-journal.plugin/systemd-journal.c388
1 files changed, 318 insertions, 70 deletions
diff --git a/collectors/systemd-journal.plugin/systemd-journal.c b/collectors/systemd-journal.plugin/systemd-journal.c
index 1fe8804a77..32ae597026 100644
--- a/collectors/systemd-journal.plugin/systemd-journal.c
+++ b/collectors/systemd-journal.plugin/systemd-journal.c
@@ -22,6 +22,7 @@
#define SYSTEMD_JOURNAL_MAX_PARAMS 100
#define SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION (3 * 3600)
#define SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY 200
+#define SYSTEMD_JOURNAL_EXCESS_ROWS_ALLOWED 50
#define JOURNAL_PARAMETER_HELP "help"
#define JOURNAL_PARAMETER_AFTER "after"
@@ -33,6 +34,7 @@
#define JOURNAL_PARAMETER_HISTOGRAM "histogram"
#define JOURNAL_PARAMETER_DIRECTION "direction"
#define JOURNAL_PARAMETER_IF_MODIFIED_SINCE "if_modified_since"
+#define JOURNAL_PARAMETER_DATA_ONLY "data_only"
#define JOURNAL_PARAMETER_SOURCE "source"
#define JOURNAL_PARAMETER_INFO "info"
@@ -53,7 +55,7 @@
"|IMAGE_NAME" \
""
-static netdata_mutex_t mutex = NETDATA_MUTEX_INITIALIZER;
+static netdata_mutex_t stdout_mutex = NETDATA_MUTEX_INITIALIZER;
static bool plugin_should_exit = false;
DICTIONARY *uids = NULL;
@@ -61,7 +63,7 @@ DICTIONARY *gids = NULL;
// ----------------------------------------------------------------------------
-int systemd_journal_query(BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t before_ut, usec_t if_modified_since, usec_t stop_monotonic_ut) {
+static inline sd_journal *netdata_open_systemd_journal(void) {
sd_journal *j = NULL;
int r;
@@ -82,36 +84,78 @@ int systemd_journal_query(BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t be
if (r < 0) {
netdata_log_error("SYSTEMD-JOURNAL: Failed to open SystemD Journal, with error %d", r);
- return HTTP_RESP_INTERNAL_SERVER_ERROR;
+ return NULL;
}
- facets_rows_begin(facets);
+ return j;
+}
+
+typedef enum {
+ ND_SD_JOURNAL_FAILED_TO_SEEK,
+ ND_SD_JOURNAL_TIMED_OUT,
+ ND_SD_JOURNAL_OK,
+ ND_SD_JOURNAL_NOT_MODIFIED,
+} ND_SD_JOURNAL_STATUS;
+
+static inline bool netdata_systemd_journal_seek_to(sd_journal *j, usec_t timestamp) {
+ if(sd_journal_seek_realtime_usec(j, timestamp) < 0) {
+ netdata_log_error("SYSTEMD-JOURNAL: Failed to seek to %" PRIu64, timestamp);
+ if(sd_journal_seek_tail(j) < 0) {
+ netdata_log_error("SYSTEMD-JOURNAL: Failed to seek to journal's tail");
+ return false;
+ }
+ }
+
+ return true;
+}
+
+static inline void netdata_systemd_journal_process_row(sd_journal *j, FACETS *facets) {
+ const void *data;
+ size_t length;
+ SD_JOURNAL_FOREACH_DATA(j, data, length) {
+ const char *key = data;
+ const char *equal = strchr(key, '=');
+ if(unlikely(!equal))
+ continue;
+
+ const char *value = ++equal;
+ size_t key_length = value - key; // including '\0'
+
+ char key_copy[key_length];
+ memcpy(key_copy, key, key_length - 1);
+ key_copy[key_length - 1] = '\0';
+
+ size_t value_length = length - key_length; // without '\0'
+ facets_add_key_value_length(facets, key_copy, key_length - 1, value, value_length <= FACET_MAX_VALUE_LENGTH ? value_length : FACET_MAX_VALUE_LENGTH);
+ }
+}
+
+ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_full(sd_journal *j, BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t before_ut, usec_t if_modified_since, usec_t stop_monotonic_ut, usec_t *last_modified) {
+ if(!netdata_systemd_journal_seek_to(j, before_ut))
+ return ND_SD_JOURNAL_FAILED_TO_SEEK;
- uint64_t first_msg_ut = 0;
+ size_t errors_no_timestamp = 0;
+ usec_t first_msg_ut = 0;
bool timed_out = false;
size_t row_counter = 0;
// the entries are not guaranteed to be sorted, so we process up to 100 entries beyond
// the end of the query to find possibly useful logs for our time-frame
- size_t excess_rows_allowed = 100;
+ size_t excess_rows_allowed = SYSTEMD_JOURNAL_EXCESS_ROWS_ALLOWED;
- if(sd_journal_seek_realtime_usec(j, before_ut) < 0) {
- netdata_log_error("SYSTEMD-JOURNAL: Failed to seek to %" PRIu64, before_ut);
- if(sd_journal_seek_tail(j) < 0) {
- netdata_log_error("SYSTEMD-JOURNAL: Failed to seek to journal's tail");
- goto finalize;
- }
- }
+ facets_rows_begin(facets);
while (sd_journal_previous(j) > 0) {
row_counter++;
- uint64_t msg_ut;
- sd_journal_get_realtime_usec(j, &msg_ut);
+ usec_t msg_ut;
+ if(sd_journal_get_realtime_usec(j, &msg_ut) < 0) {
+ errors_no_timestamp++;
+ continue;
+ }
if(unlikely(!first_msg_ut)) {
if(msg_ut == if_modified_since) {
- sd_journal_close(j);
- return HTTP_RESP_NOT_MODIFIED;
+ return ND_SD_JOURNAL_NOT_MODIFIED;
}
first_msg_ut = msg_ut;
@@ -127,26 +171,65 @@ int systemd_journal_query(BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t be
continue;
}
- const void *data;
- size_t length;
- SD_JOURNAL_FOREACH_DATA(j, data, length) {
- const char *key = data;
- const char *equal = strchr(key, '=');
- if(unlikely(!equal))
- continue;
+ netdata_systemd_journal_process_row(j, facets);
+ facets_row_finished(facets, msg_ut);
+
+ if((row_counter % 100) == 0 && now_monotonic_usec() > stop_monotonic_ut) {
+ timed_out = true;
+ break;
+ }
+ }
+
+ if(errors_no_timestamp)
+ netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp);
+
+ *last_modified = first_msg_ut;
- const char *value = ++equal;
- size_t key_length = value - key; // including '\0'
+ if(timed_out)
+ return ND_SD_JOURNAL_TIMED_OUT;
- char key_copy[key_length];
- memcpy(key_copy, key, key_length - 1);
- key_copy[key_length - 1] = '\0';
+ return ND_SD_JOURNAL_OK;
+}
+
+ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_forward(sd_journal *j, BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t before_ut, usec_t anchor, size_t entries, usec_t stop_monotonic_ut) {
+ if(!netdata_systemd_journal_seek_to(j, anchor))
+ return ND_SD_JOURNAL_FAILED_TO_SEEK;
+
+ size_t errors_no_timestamp = 0;
+ bool timed_out = false;
+ size_t row_counter = 0;
+ size_t rows_added = 0;
+
+ // the entries are not guaranteed to be sorted, so we process up to 100 entries beyond
+ // the end of the query to find possibly useful logs for our time-frame
+ size_t excess_rows_allowed = SYSTEMD_JOURNAL_EXCESS_ROWS_ALLOWED;
+
+ facets_rows_begin(facets);
+ while (sd_journal_next(j) > 0) {
+ row_counter++;
+
+ usec_t msg_ut;
+ if(sd_journal_get_realtime_usec(j, &msg_ut) < 0) {
+ errors_no_timestamp++;
+ continue;
+ }
+
+ if (msg_ut > before_ut || msg_ut <= anchor)
+ continue;
+
+ if (msg_ut < after_ut) {
+ if(--excess_rows_allowed == 0)
+ break;
- size_t value_length = length - key_length; // without '\0'
- facets_add_key_value_length(facets, key_copy, key_length - 1, value, value_length <= FACET_MAX_VALUE_LENGTH ? value_length : FACET_MAX_VALUE_LENGTH);
+ continue;
}
+ if(rows_added > entries && --excess_rows_allowed == 0)
+ break;
+
+ netdata_systemd_journal_process_row(j, facets);
facets_row_finished(facets, msg_ut);
+ rows_added++;
if((row_counter % 100) == 0 && now_monotonic_usec() > stop_monotonic_ut) {
timed_out = true;
@@ -154,27 +237,147 @@ int systemd_journal_query(BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t be
}
}
-finalize:
+ if(errors_no_timestamp)
+ netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp);
+
+ if(timed_out)
+ return ND_SD_JOURNAL_TIMED_OUT;
+
+ return ND_SD_JOURNAL_OK;
+}
+
+ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_backward(sd_journal *j, BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t before_ut, usec_t anchor, size_t entries, usec_t stop_monotonic_ut) {
+ if(!netdata_systemd_journal_seek_to(j, anchor))
+ return ND_SD_JOURNAL_FAILED_TO_SEEK;
+
+ size_t errors_no_timestamp = 0;
+ bool timed_out = false;
+ size_t row_counter = 0;
+ size_t rows_added = 0;
+
+ // the entries are not guaranteed to be sorted, so we process up to 100 entries beyond
+ // the end of the query to find possibly useful logs for our time-frame
+ size_t excess_rows_allowed = SYSTEMD_JOURNAL_EXCESS_ROWS_ALLOWED;
+
+ facets_rows_begin(facets);
+ while (sd_journal_previous(j) > 0) {
+ row_counter++;
+
+ usec_t msg_ut;
+ if(sd_journal_get_realtime_usec(j, &msg_ut) < 0) {
+ errors_no_timestamp++;
+ continue;
+ }
+
+ if (msg_ut > before_ut || msg_ut >= anchor)
+ continue;
+
+ if (msg_ut < after_ut) {
+ if(--excess_rows_allowed == 0)
+ break;
+
+ continue;
+ }
+
+ if(rows_added > entries && --excess_rows_allowed == 0)
+ break;
+
+ netdata_systemd_journal_process_row(j, facets);
+ facets_row_finished(facets, msg_ut);
+ rows_added++;
+
+ if((row_counter % 100) == 0 && now_monotonic_usec() > stop_monotonic_ut) {
+ timed_out = true;
+ break;
+ }
+ }
+
+ if(errors_no_timestamp)
+ netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp);
+
+ if(timed_out)
+ return ND_SD_JOURNAL_TIMED_OUT;
+
+ return ND_SD_JOURNAL_OK;
+}
+
+bool netdata_systemd_journal_check_if_modified_since(sd_journal *j, usec_t seek_to, usec_t last_modified) {
+ // return true, if data have been modified since the timestamp
+
+ if(!last_modified || !seek_to)
+ return false;
+
+ if(!netdata_systemd_journal_seek_to(j, seek_to))
+ return false;
+
+ usec_t first_msg_ut = 0;
+ while (sd_journal_previous(j) > 0) {
+ usec_t msg_ut;
+ if(sd_journal_get_realtime_usec(j, &msg_ut) < 0)
+ continue;
+
+ first_msg_ut = msg_ut;
+ break;
+ }
+
+ return first_msg_ut != last_modified;
+}
+
+static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets,
+ usec_t after_ut, usec_t before_ut,
+ usec_t anchor, FACETS_ANCHOR_DIRECTION direction, size_t entries,
+ usec_t if_modified_since, bool data_only,
+ usec_t stop_monotonic_ut) {
+ sd_journal *j = netdata_open_systemd_journal();
+ if(!j)
+ return HTTP_RESP_INTERNAL_SERVER_ERROR;
+
+ usec_t last_modified = 0;
+
+ ND_SD_JOURNAL_STATUS status;
+
+ if(data_only && anchor /* && !netdata_systemd_journal_check_if_modified_since(j, before_ut, if_modified_since) */) {
+ facets_data_only_mode(facets);
+
+ // we can do a data-only query
+ if(direction == FACETS_ANCHOR_DIRECTION_FORWARD)
+ status = netdata_systemd_journal_query_data_forward(j, wb, facets, after_ut, before_ut, anchor, entries, stop_monotonic_ut);
+ else
+ status = netdata_systemd_journal_query_data_backward(j, wb, facets, after_ut, before_ut, anchor, entries, stop_monotonic_ut);
+ }
+ else {
+ // we have to do a full query
+ status = netdata_systemd_journal_query_full(j, wb, facets,
+ after_ut, before_ut, if_modified_since,
+ stop_monotonic_ut, &last_modified);
+ }
+
sd_journal_close(j);
- buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK);
- buffer_json_member_add_boolean(wb, "partial", timed_out);
+ if(status == ND_SD_JOURNAL_NOT_MODIFIED)
+ return HTTP_RESP_NOT_MODIFIED;
+
+ buffer_json_member_add_uint64(wb, "status", status == ND_SD_JOURNAL_FAILED_TO_SEEK ? HTTP_RESP_INTERNAL_SERVER_ERROR : HTTP_RESP_OK);
+ buffer_json_member_add_boolean(wb, "partial", status != ND_SD_JOURNAL_OK);
buffer_json_member_add_string(wb, "type", "table");
- buffer_json_member_add_time_t(wb, "update_every", 1);
- buffer_json_member_add_string(wb, "help", SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION);
- buffer_json_member_add_uint64(wb, "last_modified", first_msg_ut);
+
+ if(!data_only) {
+ buffer_json_member_add_time_t(wb, "update_every", 1);
+ buffer_json_member_add_string(wb, "help", SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION);
+ buffer_json_member_add_uint64(wb, "last_modified", last_modified);
+ }
facets_report(facets, wb);
- buffer_json_member_add_time_t(wb, "expires", now_realtime_sec());
+ buffer_json_member_add_time_t(wb, "expires", now_realtime_sec() + data_only ? 3600 : 0);
buffer_json_finalize(wb);
- return HTTP_RESP_OK;
+ return status == ND_SD_JOURNAL_FAILED_TO_SEEK ? HTTP_RESP_INTERNAL_SERVER_ERROR : HTTP_RESP_OK;
}
-static void systemd_journal_function_help(const char *transaction) {
- pluginsd_function_result_begin_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600);
- fprintf(stdout,
+static void netdata_systemd_journal_function_help(const char *transaction) {
+ BUFFER *wb = buffer_create(0, NULL);
+ buffer_sprintf(wb,
"%s / %s\n"
"\n"
"%s\n"
@@ -212,7 +415,12 @@ static void systemd_journal_function_help(const char *transaction) {
, -SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION
, SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY
);
- pluginsd_function_result_end_to_stdout();
+
+ netdata_mutex_lock(&stdout_mutex);
+ pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600, wb);
+ netdata_mutex_unlock(&stdout_mutex);
+
+ buffer_free(wb);
}
static const char *syslog_facility_to_name(int facility) {
@@ -255,6 +463,26 @@ static const char *syslog_priority_to_name(int priority) {
}
}
+static FACET_ROW_SEVERITY syslog_priority_to_facet_severity(int priority) {
+ // same to
+ // https://github.com/systemd/systemd/blob/aab9e4b2b86905a15944a1ac81e471b5b7075932/src/basic/terminal-util.c#L1501
+ // function get_log_colors()
+
+ if(priority <= LOG_ERR)
+ return FACET_ROW_SEVERITY_CRITICAL;
+
+ else if (priority <= LOG_WARNING)
+ return FACET_ROW_SEVERITY_WARNING;
+
+ else if(priority <= LOG_NOTICE)
+ return FACET_ROW_SEVERITY_NOTICE;
+
+ else if(priority >= LOG_DEBUG)
+ return FACET_ROW_SEVERITY_DEBUG;
+
+ return FACET_ROW_SEVERITY_NORMAL;
+}
+
static char *uid_to_username(uid_t uid, char *buffer, size_t buffer_size) {
struct passwd pw, *result;
char tmp[1024 + 1];
@@ -277,7 +505,7 @@ static char *gid_to_groupname(gid_t gid, char* buffer, size_t buffer_size) {
return buffer;
}
-static void systemd_journal_transform_syslog_facility(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) {
+static void netdata_systemd_journal_transform_syslog_facility(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) {
const char *v = buffer_tostring(wb);
if(*v && isdigit(*v)) {
int facility = str2i(buffer_tostring(wb));
@@ -289,7 +517,7 @@ static void systemd_journal_transform_syslog_facility(FACETS *facets __maybe_unu
}
}
-static void systemd_journal_transform_priority(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) {
+static void netdata_systemd_journal_transform_priority(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) {
const char *v = buffer_tostring(wb);
if(*v && isdigit(*v)) {
int priority = str2i(buffer_tostring(wb));
@@ -298,10 +526,12 @@ static void systemd_journal_transform_priority(FACETS *facets __maybe_unused, BU
buffer_flush(wb);
buffer_strcat(wb, name);
}
+
+ facets_set_current_row_severity(facets, syslog_priority_to_facet_severity(priority));
}
}
-static void systemd_journal_transform_uid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) {
+static void netdata_systemd_journal_transform_uid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) {
DICTIONARY *cache = data;
const char *v = buffer_tostring(wb);
if(*v && isdigit(*v)) {
@@ -321,7 +551,7 @@ static void systemd_journal_transform_uid(FACETS *facets __maybe_unused, BUFFER
}
}
-static void systemd_journal_transform_gid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) {
+static void netdata_systemd_journal_transform_gid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) {
DICTIONARY *cache = data;
const char *v = buffer_tostring(wb);
if(*v && isdigit(*v)) {
@@ -341,7 +571,7 @@ static void systemd_journal_transform_gid(FACETS *facets __maybe_unused, BUFFER
}
}
-static void systemd_journal_dynamic_row_id(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row, void *data __maybe_unused) {
+static void netdata_systemd_journal_dynamic_row_id(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row, void *data __maybe_unused) {
FACET_ROW_KEY_VALUE *pid_rkv = dictionary_get(row->dict, "_PID");
const char *pid = pid_rkv ? buffer_tostring(pid_rkv->wb) : FACET_VALUE_UNSET;
@@ -363,6 +593,12 @@ static void systemd_journal_dynamic_row_id(FACETS *facets __maybe_unused, BUFFER
buffer_json_add_array_item_string(json_array, buffer_tostring(rkv->wb));
}
+static void netdata_systemd_journal_rich_message(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row, void *data __maybe_unused) {
+ buffer_json_add_array_item_object(json_array);
+ buffer_json_member_add_string(json_array, "value", buffer_tostring(rkv->wb));
+ buffer_json_object_close(json_array);
+}
+
static void function_systemd_journal(const char *transaction, char *function, char *line_buffer __maybe_unused, int line_max __maybe_unused, int timeout __maybe_unused) {
BUFFER *wb = buffer_create(0, NULL);
buffer_flush(wb);
@@ -384,34 +620,41 @@ static void function_systemd_journal(const char *transaction, char *function, ch
facets_accepted_param(facets, JOURNAL_PARAMETER_FACETS);
facets_accepted_param(facets, JOURNAL_PARAMETER_HISTOGRAM);
facets_accepted_param(facets, JOURNAL_PARAMETER_IF_MODIFIED_SINCE);
+ facets_accepted_param(facets, JOURNAL_PARAMETER_DATA_ONLY);
// register the fields in the order you want them on the dashboard
facets_register_dynamic_key_name(facets, "ND_JOURNAL_PROCESS",
FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS,
- systemd_journal_dynamic_row_id, NULL);
+ netdata_systemd_journal_dynamic_row_id, NULL);
facets_register_key_name(facets, "MESSAGE",
- FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_MAIN_TEXT | FACET_KEY_OPTION_VISIBLE |
- FACET_KEY_OPTION_FTS);
+ FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_MAIN_TEXT |
+ FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS);
+
+// facets_register_dynamic_key_name(facets, "MESSAGE",
+// FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_MAIN_TEXT | FACET_KEY_OPTION_RICH_TEXT |
+// FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS,
+// netdata_systemd_journal_rich_message, NULL);
facets_register_key_name_transformation(facets, "PRIORITY", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS,
- systemd_journal_transform_priority, NULL);
+ netdata_systemd_journal_transform_priority, NULL);
facets_register_key_name_transformation(facets, "SYSLOG_FACILITY", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS,
- systemd_journal_transform_syslog_facility, NULL);
+ netdata_systemd_journal_transform_syslog_facility, NULL);
facets_register_key_name(facets, "SYSLOG_IDENTIFIER", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS);
facets_register_key_name(facets, "UNIT", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS);
facets_register_key_name(facets, "USER_UNIT", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS);
facets_register_key_name_transformation(facets, "_UID", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS,
- systemd_journal_transform_uid, uids);
+ netdata_systemd_journal_transform_uid, uids);
facets_register_key_name_transformation(facets, "_GID", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS,
- systemd_journal_transform_gid, gids);
+ netdata_systemd_journal_transform_gid, gids);
bool info = false;
+ bool data_only = false;
time_t after_s = 0, before_s = 0;
usec_t anchor = 0;
usec_t if_modified_since = 0;
@@ -430,12 +673,15 @@ static void function_systemd_journal(const char *transaction, char *function, ch
if(!keyword) break;
if(strcmp(keyword, JOURNAL_PARAMETER_HELP) == 0) {
- systemd_journal_function_help(transaction);
+ netdata_systemd_journal_function_help(transaction);
goto cleanup;
}
else if(strcmp(keyword, JOURNAL_PARAMETER_INFO) == 0) {
info = true;
}
+ else if(strcmp(keyword, JOURNAL_PARAMETER_DATA_ONLY) == 0) {
+ data_only = true;
+ }
else if(strncmp(keyword, JOURNAL_PARAMETER_SOURCE ":", sizeof(JOURNAL_PARAMETER_SOURCE ":") - 1) == 0) {
source = &keyword[sizeof(JOURNAL_PARAMETER_SOURCE ":") - 1];
}
@@ -580,18 +826,22 @@ static void function_systemd_journal(const char *transaction, char *function, ch
facets_set_query(facets, query);
facets_set_histogram(facets, chart ? chart : "PRIORITY", after_s * USEC_PER_SEC, before_s * USEC_PER_SEC);
- response = systemd_journal_query(wb, facets, after_s * USEC_PER_SEC, before_s * USEC_PER_SEC,
- if_modified_since, now_monotonic_usec() + (timeout - 1) * USEC_PER_SEC);
+ response = netdata_systemd_journal_query(wb, facets, after_s * USEC_PER_SEC, before_s * USEC_PER_SEC,
+ anchor, direction, last,
+ if_modified_since, data_only,
+ now_monotonic_usec() + (timeout - 1) * USEC_PER_SEC);
if(response != HTTP_RESP_OK) {
+ netdata_mutex_lock(&stdout_mutex);
pluginsd_function_json_error_to_stdout(transaction, response, "failed");
+ netdata_mutex_unlock(&stdout_mutex);
goto cleanup;
}
output:
- pluginsd_function_result_begin_to_stdout(transaction, response, "application/json", expires);
- fwrite(buffer_tostring(wb), buffer_strlen(wb), 1, stdout);
- pluginsd_function_result_end_to_stdout();
+ netdata_mutex_lock(&stdout_mutex);
+ pluginsd_function_result_to_stdout(transaction, response, "application/json", expires, wb);
+ netdata_mutex_unlock(&stdout_mutex);
cleanup:
facets_destroy(facets);
@@ -625,16 +875,14 @@ static void *reader_main(void *arg __maybe_unused) {
int timeout = str2i(timeout_s);
if(timeout <= 0) timeout = SYSTEMD_JOURNAL_DEFAULT_TIMEOUT;
- netdata_mutex_lock(&mutex);
-
if(strncmp(function, SYSTEMD_JOURNAL_FUNCTION_NAME, strlen(SYSTEMD_JOURNAL_FUNCTION_NAME)) == 0)
function_systemd_journal(transaction, function, buffer, PLUGINSD_LINE_MAX + 1, timeout);
- else
+ else {
+ netdata_mutex_lock(&stdout_mutex);
pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND,
"No function with this name found in systemd-journal.plugin.");
-
- fflush(stdout);
- netdata_mutex_unlock(&mutex);
+ netdata_mutex_unlock(&stdout_mutex);
+ }
}
}
else
@@ -691,16 +939,16 @@ int main(int argc __maybe_unused, char **argv __maybe_unused) {
usec_t step = 1000 * USEC_PER_MS;
bool tty = isatty(fileno(stderr)) == 1;
- netdata_mutex_lock(&mutex);
+ netdata_mutex_lock(&stdout_mutex);
fprintf(stdout, PLUGINSD_KEYWORD_FUNCTION " GLOBAL \"%s\" %d \"%s\"\n",
SYSTEMD_JOURNAL_FUNCTION_NAME, SYSTEMD_JOURNAL_DEFAULT_TIMEOUT, SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION);
heartbeat_t hb;
heartbeat_init(&hb);
for(iteration = 0; 1 ; iteration++) {
- netdata_mutex_unlock(&mutex);
+ netdata_mutex_unlock(&stdout_mutex);
heartbeat_next(&hb, step);
- netdata_mutex_lock(&mutex);
+ netdata_mutex_lock(&stdout_mutex);
if(!tty)
fprintf(stdout, "\n");