diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2023-09-14 15:59:22 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-09-14 15:59:22 +0300 |
commit | 934f620265d2e7a2be160cb084ec01875ed3491e (patch) | |
tree | 6bbf40643ff66c63867742ae5ecafb305d7119c5 /collectors/systemd-journal.plugin/systemd-journal.c | |
parent | e9d3dc658d50b74430b3b8a470bfa9ab338fd44f (diff) |
facets: data-only queries (#15961)
Diffstat (limited to 'collectors/systemd-journal.plugin/systemd-journal.c')
-rw-r--r-- | collectors/systemd-journal.plugin/systemd-journal.c | 388 |
1 files changed, 318 insertions, 70 deletions
diff --git a/collectors/systemd-journal.plugin/systemd-journal.c b/collectors/systemd-journal.plugin/systemd-journal.c index 1fe8804a77..32ae597026 100644 --- a/collectors/systemd-journal.plugin/systemd-journal.c +++ b/collectors/systemd-journal.plugin/systemd-journal.c @@ -22,6 +22,7 @@ #define SYSTEMD_JOURNAL_MAX_PARAMS 100 #define SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION (3 * 3600) #define SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY 200 +#define SYSTEMD_JOURNAL_EXCESS_ROWS_ALLOWED 50 #define JOURNAL_PARAMETER_HELP "help" #define JOURNAL_PARAMETER_AFTER "after" @@ -33,6 +34,7 @@ #define JOURNAL_PARAMETER_HISTOGRAM "histogram" #define JOURNAL_PARAMETER_DIRECTION "direction" #define JOURNAL_PARAMETER_IF_MODIFIED_SINCE "if_modified_since" +#define JOURNAL_PARAMETER_DATA_ONLY "data_only" #define JOURNAL_PARAMETER_SOURCE "source" #define JOURNAL_PARAMETER_INFO "info" @@ -53,7 +55,7 @@ "|IMAGE_NAME" \ "" -static netdata_mutex_t mutex = NETDATA_MUTEX_INITIALIZER; +static netdata_mutex_t stdout_mutex = NETDATA_MUTEX_INITIALIZER; static bool plugin_should_exit = false; DICTIONARY *uids = NULL; @@ -61,7 +63,7 @@ DICTIONARY *gids = NULL; // ---------------------------------------------------------------------------- -int systemd_journal_query(BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t before_ut, usec_t if_modified_since, usec_t stop_monotonic_ut) { +static inline sd_journal *netdata_open_systemd_journal(void) { sd_journal *j = NULL; int r; @@ -82,36 +84,78 @@ int systemd_journal_query(BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t be if (r < 0) { netdata_log_error("SYSTEMD-JOURNAL: Failed to open SystemD Journal, with error %d", r); - return HTTP_RESP_INTERNAL_SERVER_ERROR; + return NULL; } - facets_rows_begin(facets); + return j; +} + +typedef enum { + ND_SD_JOURNAL_FAILED_TO_SEEK, + ND_SD_JOURNAL_TIMED_OUT, + ND_SD_JOURNAL_OK, + ND_SD_JOURNAL_NOT_MODIFIED, +} ND_SD_JOURNAL_STATUS; + +static inline bool netdata_systemd_journal_seek_to(sd_journal *j, usec_t timestamp) { + if(sd_journal_seek_realtime_usec(j, timestamp) < 0) { + netdata_log_error("SYSTEMD-JOURNAL: Failed to seek to %" PRIu64, timestamp); + if(sd_journal_seek_tail(j) < 0) { + netdata_log_error("SYSTEMD-JOURNAL: Failed to seek to journal's tail"); + return false; + } + } + + return true; +} + +static inline void netdata_systemd_journal_process_row(sd_journal *j, FACETS *facets) { + const void *data; + size_t length; + SD_JOURNAL_FOREACH_DATA(j, data, length) { + const char *key = data; + const char *equal = strchr(key, '='); + if(unlikely(!equal)) + continue; + + const char *value = ++equal; + size_t key_length = value - key; // including '\0' + + char key_copy[key_length]; + memcpy(key_copy, key, key_length - 1); + key_copy[key_length - 1] = '\0'; + + size_t value_length = length - key_length; // without '\0' + facets_add_key_value_length(facets, key_copy, key_length - 1, value, value_length <= FACET_MAX_VALUE_LENGTH ? value_length : FACET_MAX_VALUE_LENGTH); + } +} + +ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_full(sd_journal *j, BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t before_ut, usec_t if_modified_since, usec_t stop_monotonic_ut, usec_t *last_modified) { + if(!netdata_systemd_journal_seek_to(j, before_ut)) + return ND_SD_JOURNAL_FAILED_TO_SEEK; - uint64_t first_msg_ut = 0; + size_t errors_no_timestamp = 0; + usec_t first_msg_ut = 0; bool timed_out = false; size_t row_counter = 0; // the entries are not guaranteed to be sorted, so we process up to 100 entries beyond // the end of the query to find possibly useful logs for our time-frame - size_t excess_rows_allowed = 100; + size_t excess_rows_allowed = SYSTEMD_JOURNAL_EXCESS_ROWS_ALLOWED; - if(sd_journal_seek_realtime_usec(j, before_ut) < 0) { - netdata_log_error("SYSTEMD-JOURNAL: Failed to seek to %" PRIu64, before_ut); - if(sd_journal_seek_tail(j) < 0) { - netdata_log_error("SYSTEMD-JOURNAL: Failed to seek to journal's tail"); - goto finalize; - } - } + facets_rows_begin(facets); while (sd_journal_previous(j) > 0) { row_counter++; - uint64_t msg_ut; - sd_journal_get_realtime_usec(j, &msg_ut); + usec_t msg_ut; + if(sd_journal_get_realtime_usec(j, &msg_ut) < 0) { + errors_no_timestamp++; + continue; + } if(unlikely(!first_msg_ut)) { if(msg_ut == if_modified_since) { - sd_journal_close(j); - return HTTP_RESP_NOT_MODIFIED; + return ND_SD_JOURNAL_NOT_MODIFIED; } first_msg_ut = msg_ut; @@ -127,26 +171,65 @@ int systemd_journal_query(BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t be continue; } - const void *data; - size_t length; - SD_JOURNAL_FOREACH_DATA(j, data, length) { - const char *key = data; - const char *equal = strchr(key, '='); - if(unlikely(!equal)) - continue; + netdata_systemd_journal_process_row(j, facets); + facets_row_finished(facets, msg_ut); + + if((row_counter % 100) == 0 && now_monotonic_usec() > stop_monotonic_ut) { + timed_out = true; + break; + } + } + + if(errors_no_timestamp) + netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp); + + *last_modified = first_msg_ut; - const char *value = ++equal; - size_t key_length = value - key; // including '\0' + if(timed_out) + return ND_SD_JOURNAL_TIMED_OUT; - char key_copy[key_length]; - memcpy(key_copy, key, key_length - 1); - key_copy[key_length - 1] = '\0'; + return ND_SD_JOURNAL_OK; +} + +ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_forward(sd_journal *j, BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t before_ut, usec_t anchor, size_t entries, usec_t stop_monotonic_ut) { + if(!netdata_systemd_journal_seek_to(j, anchor)) + return ND_SD_JOURNAL_FAILED_TO_SEEK; + + size_t errors_no_timestamp = 0; + bool timed_out = false; + size_t row_counter = 0; + size_t rows_added = 0; + + // the entries are not guaranteed to be sorted, so we process up to 100 entries beyond + // the end of the query to find possibly useful logs for our time-frame + size_t excess_rows_allowed = SYSTEMD_JOURNAL_EXCESS_ROWS_ALLOWED; + + facets_rows_begin(facets); + while (sd_journal_next(j) > 0) { + row_counter++; + + usec_t msg_ut; + if(sd_journal_get_realtime_usec(j, &msg_ut) < 0) { + errors_no_timestamp++; + continue; + } + + if (msg_ut > before_ut || msg_ut <= anchor) + continue; + + if (msg_ut < after_ut) { + if(--excess_rows_allowed == 0) + break; - size_t value_length = length - key_length; // without '\0' - facets_add_key_value_length(facets, key_copy, key_length - 1, value, value_length <= FACET_MAX_VALUE_LENGTH ? value_length : FACET_MAX_VALUE_LENGTH); + continue; } + if(rows_added > entries && --excess_rows_allowed == 0) + break; + + netdata_systemd_journal_process_row(j, facets); facets_row_finished(facets, msg_ut); + rows_added++; if((row_counter % 100) == 0 && now_monotonic_usec() > stop_monotonic_ut) { timed_out = true; @@ -154,27 +237,147 @@ int systemd_journal_query(BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t be } } -finalize: + if(errors_no_timestamp) + netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp); + + if(timed_out) + return ND_SD_JOURNAL_TIMED_OUT; + + return ND_SD_JOURNAL_OK; +} + +ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_backward(sd_journal *j, BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t before_ut, usec_t anchor, size_t entries, usec_t stop_monotonic_ut) { + if(!netdata_systemd_journal_seek_to(j, anchor)) + return ND_SD_JOURNAL_FAILED_TO_SEEK; + + size_t errors_no_timestamp = 0; + bool timed_out = false; + size_t row_counter = 0; + size_t rows_added = 0; + + // the entries are not guaranteed to be sorted, so we process up to 100 entries beyond + // the end of the query to find possibly useful logs for our time-frame + size_t excess_rows_allowed = SYSTEMD_JOURNAL_EXCESS_ROWS_ALLOWED; + + facets_rows_begin(facets); + while (sd_journal_previous(j) > 0) { + row_counter++; + + usec_t msg_ut; + if(sd_journal_get_realtime_usec(j, &msg_ut) < 0) { + errors_no_timestamp++; + continue; + } + + if (msg_ut > before_ut || msg_ut >= anchor) + continue; + + if (msg_ut < after_ut) { + if(--excess_rows_allowed == 0) + break; + + continue; + } + + if(rows_added > entries && --excess_rows_allowed == 0) + break; + + netdata_systemd_journal_process_row(j, facets); + facets_row_finished(facets, msg_ut); + rows_added++; + + if((row_counter % 100) == 0 && now_monotonic_usec() > stop_monotonic_ut) { + timed_out = true; + break; + } + } + + if(errors_no_timestamp) + netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp); + + if(timed_out) + return ND_SD_JOURNAL_TIMED_OUT; + + return ND_SD_JOURNAL_OK; +} + +bool netdata_systemd_journal_check_if_modified_since(sd_journal *j, usec_t seek_to, usec_t last_modified) { + // return true, if data have been modified since the timestamp + + if(!last_modified || !seek_to) + return false; + + if(!netdata_systemd_journal_seek_to(j, seek_to)) + return false; + + usec_t first_msg_ut = 0; + while (sd_journal_previous(j) > 0) { + usec_t msg_ut; + if(sd_journal_get_realtime_usec(j, &msg_ut) < 0) + continue; + + first_msg_ut = msg_ut; + break; + } + + return first_msg_ut != last_modified; +} + +static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, + usec_t after_ut, usec_t before_ut, + usec_t anchor, FACETS_ANCHOR_DIRECTION direction, size_t entries, + usec_t if_modified_since, bool data_only, + usec_t stop_monotonic_ut) { + sd_journal *j = netdata_open_systemd_journal(); + if(!j) + return HTTP_RESP_INTERNAL_SERVER_ERROR; + + usec_t last_modified = 0; + + ND_SD_JOURNAL_STATUS status; + + if(data_only && anchor /* && !netdata_systemd_journal_check_if_modified_since(j, before_ut, if_modified_since) */) { + facets_data_only_mode(facets); + + // we can do a data-only query + if(direction == FACETS_ANCHOR_DIRECTION_FORWARD) + status = netdata_systemd_journal_query_data_forward(j, wb, facets, after_ut, before_ut, anchor, entries, stop_monotonic_ut); + else + status = netdata_systemd_journal_query_data_backward(j, wb, facets, after_ut, before_ut, anchor, entries, stop_monotonic_ut); + } + else { + // we have to do a full query + status = netdata_systemd_journal_query_full(j, wb, facets, + after_ut, before_ut, if_modified_since, + stop_monotonic_ut, &last_modified); + } + sd_journal_close(j); - buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK); - buffer_json_member_add_boolean(wb, "partial", timed_out); + if(status == ND_SD_JOURNAL_NOT_MODIFIED) + return HTTP_RESP_NOT_MODIFIED; + + buffer_json_member_add_uint64(wb, "status", status == ND_SD_JOURNAL_FAILED_TO_SEEK ? HTTP_RESP_INTERNAL_SERVER_ERROR : HTTP_RESP_OK); + buffer_json_member_add_boolean(wb, "partial", status != ND_SD_JOURNAL_OK); buffer_json_member_add_string(wb, "type", "table"); - buffer_json_member_add_time_t(wb, "update_every", 1); - buffer_json_member_add_string(wb, "help", SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION); - buffer_json_member_add_uint64(wb, "last_modified", first_msg_ut); + + if(!data_only) { + buffer_json_member_add_time_t(wb, "update_every", 1); + buffer_json_member_add_string(wb, "help", SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION); + buffer_json_member_add_uint64(wb, "last_modified", last_modified); + } facets_report(facets, wb); - buffer_json_member_add_time_t(wb, "expires", now_realtime_sec()); + buffer_json_member_add_time_t(wb, "expires", now_realtime_sec() + data_only ? 3600 : 0); buffer_json_finalize(wb); - return HTTP_RESP_OK; + return status == ND_SD_JOURNAL_FAILED_TO_SEEK ? HTTP_RESP_INTERNAL_SERVER_ERROR : HTTP_RESP_OK; } -static void systemd_journal_function_help(const char *transaction) { - pluginsd_function_result_begin_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600); - fprintf(stdout, +static void netdata_systemd_journal_function_help(const char *transaction) { + BUFFER *wb = buffer_create(0, NULL); + buffer_sprintf(wb, "%s / %s\n" "\n" "%s\n" @@ -212,7 +415,12 @@ static void systemd_journal_function_help(const char *transaction) { , -SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION , SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY ); - pluginsd_function_result_end_to_stdout(); + + netdata_mutex_lock(&stdout_mutex); + pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600, wb); + netdata_mutex_unlock(&stdout_mutex); + + buffer_free(wb); } static const char *syslog_facility_to_name(int facility) { @@ -255,6 +463,26 @@ static const char *syslog_priority_to_name(int priority) { } } +static FACET_ROW_SEVERITY syslog_priority_to_facet_severity(int priority) { + // same to + // https://github.com/systemd/systemd/blob/aab9e4b2b86905a15944a1ac81e471b5b7075932/src/basic/terminal-util.c#L1501 + // function get_log_colors() + + if(priority <= LOG_ERR) + return FACET_ROW_SEVERITY_CRITICAL; + + else if (priority <= LOG_WARNING) + return FACET_ROW_SEVERITY_WARNING; + + else if(priority <= LOG_NOTICE) + return FACET_ROW_SEVERITY_NOTICE; + + else if(priority >= LOG_DEBUG) + return FACET_ROW_SEVERITY_DEBUG; + + return FACET_ROW_SEVERITY_NORMAL; +} + static char *uid_to_username(uid_t uid, char *buffer, size_t buffer_size) { struct passwd pw, *result; char tmp[1024 + 1]; @@ -277,7 +505,7 @@ static char *gid_to_groupname(gid_t gid, char* buffer, size_t buffer_size) { return buffer; } -static void systemd_journal_transform_syslog_facility(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) { +static void netdata_systemd_journal_transform_syslog_facility(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) { const char *v = buffer_tostring(wb); if(*v && isdigit(*v)) { int facility = str2i(buffer_tostring(wb)); @@ -289,7 +517,7 @@ static void systemd_journal_transform_syslog_facility(FACETS *facets __maybe_unu } } -static void systemd_journal_transform_priority(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) { +static void netdata_systemd_journal_transform_priority(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) { const char *v = buffer_tostring(wb); if(*v && isdigit(*v)) { int priority = str2i(buffer_tostring(wb)); @@ -298,10 +526,12 @@ static void systemd_journal_transform_priority(FACETS *facets __maybe_unused, BU buffer_flush(wb); buffer_strcat(wb, name); } + + facets_set_current_row_severity(facets, syslog_priority_to_facet_severity(priority)); } } -static void systemd_journal_transform_uid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) { +static void netdata_systemd_journal_transform_uid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) { DICTIONARY *cache = data; const char *v = buffer_tostring(wb); if(*v && isdigit(*v)) { @@ -321,7 +551,7 @@ static void systemd_journal_transform_uid(FACETS *facets __maybe_unused, BUFFER } } -static void systemd_journal_transform_gid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) { +static void netdata_systemd_journal_transform_gid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) { DICTIONARY *cache = data; const char *v = buffer_tostring(wb); if(*v && isdigit(*v)) { @@ -341,7 +571,7 @@ static void systemd_journal_transform_gid(FACETS *facets __maybe_unused, BUFFER } } -static void systemd_journal_dynamic_row_id(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row, void *data __maybe_unused) { +static void netdata_systemd_journal_dynamic_row_id(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row, void *data __maybe_unused) { FACET_ROW_KEY_VALUE *pid_rkv = dictionary_get(row->dict, "_PID"); const char *pid = pid_rkv ? buffer_tostring(pid_rkv->wb) : FACET_VALUE_UNSET; @@ -363,6 +593,12 @@ static void systemd_journal_dynamic_row_id(FACETS *facets __maybe_unused, BUFFER buffer_json_add_array_item_string(json_array, buffer_tostring(rkv->wb)); } +static void netdata_systemd_journal_rich_message(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row, void *data __maybe_unused) { + buffer_json_add_array_item_object(json_array); + buffer_json_member_add_string(json_array, "value", buffer_tostring(rkv->wb)); + buffer_json_object_close(json_array); +} + static void function_systemd_journal(const char *transaction, char *function, char *line_buffer __maybe_unused, int line_max __maybe_unused, int timeout __maybe_unused) { BUFFER *wb = buffer_create(0, NULL); buffer_flush(wb); @@ -384,34 +620,41 @@ static void function_systemd_journal(const char *transaction, char *function, ch facets_accepted_param(facets, JOURNAL_PARAMETER_FACETS); facets_accepted_param(facets, JOURNAL_PARAMETER_HISTOGRAM); facets_accepted_param(facets, JOURNAL_PARAMETER_IF_MODIFIED_SINCE); + facets_accepted_param(facets, JOURNAL_PARAMETER_DATA_ONLY); // register the fields in the order you want them on the dashboard facets_register_dynamic_key_name(facets, "ND_JOURNAL_PROCESS", FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS, - systemd_journal_dynamic_row_id, NULL); + netdata_systemd_journal_dynamic_row_id, NULL); facets_register_key_name(facets, "MESSAGE", - FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_MAIN_TEXT | FACET_KEY_OPTION_VISIBLE | - FACET_KEY_OPTION_FTS); + FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_MAIN_TEXT | + FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS); + +// facets_register_dynamic_key_name(facets, "MESSAGE", +// FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_MAIN_TEXT | FACET_KEY_OPTION_RICH_TEXT | +// FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS, +// netdata_systemd_journal_rich_message, NULL); facets_register_key_name_transformation(facets, "PRIORITY", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS, - systemd_journal_transform_priority, NULL); + netdata_systemd_journal_transform_priority, NULL); facets_register_key_name_transformation(facets, "SYSLOG_FACILITY", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS, - systemd_journal_transform_syslog_facility, NULL); + netdata_systemd_journal_transform_syslog_facility, NULL); facets_register_key_name(facets, "SYSLOG_IDENTIFIER", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS); facets_register_key_name(facets, "UNIT", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS); facets_register_key_name(facets, "USER_UNIT", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS); facets_register_key_name_transformation(facets, "_UID", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS, - systemd_journal_transform_uid, uids); + netdata_systemd_journal_transform_uid, uids); facets_register_key_name_transformation(facets, "_GID", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS, - systemd_journal_transform_gid, gids); + netdata_systemd_journal_transform_gid, gids); bool info = false; + bool data_only = false; time_t after_s = 0, before_s = 0; usec_t anchor = 0; usec_t if_modified_since = 0; @@ -430,12 +673,15 @@ static void function_systemd_journal(const char *transaction, char *function, ch if(!keyword) break; if(strcmp(keyword, JOURNAL_PARAMETER_HELP) == 0) { - systemd_journal_function_help(transaction); + netdata_systemd_journal_function_help(transaction); goto cleanup; } else if(strcmp(keyword, JOURNAL_PARAMETER_INFO) == 0) { info = true; } + else if(strcmp(keyword, JOURNAL_PARAMETER_DATA_ONLY) == 0) { + data_only = true; + } else if(strncmp(keyword, JOURNAL_PARAMETER_SOURCE ":", sizeof(JOURNAL_PARAMETER_SOURCE ":") - 1) == 0) { source = &keyword[sizeof(JOURNAL_PARAMETER_SOURCE ":") - 1]; } @@ -580,18 +826,22 @@ static void function_systemd_journal(const char *transaction, char *function, ch facets_set_query(facets, query); facets_set_histogram(facets, chart ? chart : "PRIORITY", after_s * USEC_PER_SEC, before_s * USEC_PER_SEC); - response = systemd_journal_query(wb, facets, after_s * USEC_PER_SEC, before_s * USEC_PER_SEC, - if_modified_since, now_monotonic_usec() + (timeout - 1) * USEC_PER_SEC); + response = netdata_systemd_journal_query(wb, facets, after_s * USEC_PER_SEC, before_s * USEC_PER_SEC, + anchor, direction, last, + if_modified_since, data_only, + now_monotonic_usec() + (timeout - 1) * USEC_PER_SEC); if(response != HTTP_RESP_OK) { + netdata_mutex_lock(&stdout_mutex); pluginsd_function_json_error_to_stdout(transaction, response, "failed"); + netdata_mutex_unlock(&stdout_mutex); goto cleanup; } output: - pluginsd_function_result_begin_to_stdout(transaction, response, "application/json", expires); - fwrite(buffer_tostring(wb), buffer_strlen(wb), 1, stdout); - pluginsd_function_result_end_to_stdout(); + netdata_mutex_lock(&stdout_mutex); + pluginsd_function_result_to_stdout(transaction, response, "application/json", expires, wb); + netdata_mutex_unlock(&stdout_mutex); cleanup: facets_destroy(facets); @@ -625,16 +875,14 @@ static void *reader_main(void *arg __maybe_unused) { int timeout = str2i(timeout_s); if(timeout <= 0) timeout = SYSTEMD_JOURNAL_DEFAULT_TIMEOUT; - netdata_mutex_lock(&mutex); - if(strncmp(function, SYSTEMD_JOURNAL_FUNCTION_NAME, strlen(SYSTEMD_JOURNAL_FUNCTION_NAME)) == 0) function_systemd_journal(transaction, function, buffer, PLUGINSD_LINE_MAX + 1, timeout); - else + else { + netdata_mutex_lock(&stdout_mutex); pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND, "No function with this name found in systemd-journal.plugin."); - - fflush(stdout); - netdata_mutex_unlock(&mutex); + netdata_mutex_unlock(&stdout_mutex); + } } } else @@ -691,16 +939,16 @@ int main(int argc __maybe_unused, char **argv __maybe_unused) { usec_t step = 1000 * USEC_PER_MS; bool tty = isatty(fileno(stderr)) == 1; - netdata_mutex_lock(&mutex); + netdata_mutex_lock(&stdout_mutex); fprintf(stdout, PLUGINSD_KEYWORD_FUNCTION " GLOBAL \"%s\" %d \"%s\"\n", SYSTEMD_JOURNAL_FUNCTION_NAME, SYSTEMD_JOURNAL_DEFAULT_TIMEOUT, SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION); heartbeat_t hb; heartbeat_init(&hb); for(iteration = 0; 1 ; iteration++) { - netdata_mutex_unlock(&mutex); + netdata_mutex_unlock(&stdout_mutex); heartbeat_next(&hb, step); - netdata_mutex_lock(&mutex); + netdata_mutex_lock(&stdout_mutex); if(!tty) fprintf(stdout, "\n"); |