diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2023-09-14 15:59:22 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-09-14 15:59:22 +0300 |
commit | 934f620265d2e7a2be160cb084ec01875ed3491e (patch) | |
tree | 6bbf40643ff66c63867742ae5ecafb305d7119c5 /collectors | |
parent | e9d3dc658d50b74430b3b8a470bfa9ab338fd44f (diff) |
facets: data-only queries (#15961)
Diffstat (limited to 'collectors')
-rw-r--r-- | collectors/apps.plugin/apps_plugin.c | 54 | ||||
-rw-r--r-- | collectors/ebpf.plugin/ebpf_functions.c | 28 | ||||
-rw-r--r-- | collectors/plugins.d/plugins_d.h | 8 | ||||
-rw-r--r-- | collectors/systemd-journal.plugin/systemd-journal.c | 388 |
4 files changed, 370 insertions, 108 deletions
diff --git a/collectors/apps.plugin/apps_plugin.c b/collectors/apps.plugin/apps_plugin.c index 7e1e0313a1..c3559433d6 100644 --- a/collectors/apps.plugin/apps_plugin.c +++ b/collectors/apps.plugin/apps_plugin.c @@ -4575,7 +4575,7 @@ static int check_capabilities() { } #endif -static netdata_mutex_t mutex = NETDATA_MUTEX_INITIALIZER; +static netdata_mutex_t stdout_mutex = NETDATA_MUTEX_INITIALIZER; #define PROCESS_FILTER_CATEGORY "category:" #define PROCESS_FILTER_USER "user:" @@ -4629,8 +4629,8 @@ static void get_MemTotal(void) { } static void apps_plugin_function_processes_help(const char *transaction) { - pluginsd_function_result_begin_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600); - fprintf(stdout, "%s", + BUFFER *wb = buffer_create(0, NULL); + buffer_sprintf(wb, "%s", "apps.plugin / processes\n" "\n" "Function `processes` presents all the currently running processes of the system.\n" @@ -4660,7 +4660,12 @@ static void apps_plugin_function_processes_help(const char *transaction) { "\n" "Filters can be combined. Each filter can be given only one time.\n" ); - pluginsd_function_result_end_to_stdout(); + + netdata_mutex_lock(&stdout_mutex); + pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600, wb); + netdata_mutex_unlock(&stdout_mutex); + + buffer_free(wb); } #define add_value_field_llu_with_max(wb, key, value) do { \ @@ -4696,24 +4701,31 @@ static void function_processes(const char *transaction, char *function __maybe_u if(!category && strncmp(keyword, PROCESS_FILTER_CATEGORY, strlen(PROCESS_FILTER_CATEGORY)) == 0) { category = find_target_by_name(apps_groups_root_target, &keyword[strlen(PROCESS_FILTER_CATEGORY)]); if(!category) { + netdata_mutex_lock(&stdout_mutex); pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_BAD_REQUEST, "No category with that name found."); + netdata_mutex_unlock(&stdout_mutex); + return; } } else if(!user && strncmp(keyword, PROCESS_FILTER_USER, strlen(PROCESS_FILTER_USER)) == 0) { user = find_target_by_name(users_root_target, &keyword[strlen(PROCESS_FILTER_USER)]); if(!user) { + netdata_mutex_lock(&stdout_mutex); pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_BAD_REQUEST, "No user with that name found."); + netdata_mutex_unlock(&stdout_mutex); return; } } else if(strncmp(keyword, PROCESS_FILTER_GROUP, strlen(PROCESS_FILTER_GROUP)) == 0) { group = find_target_by_name(groups_root_target, &keyword[strlen(PROCESS_FILTER_GROUP)]); if(!group) { + netdata_mutex_lock(&stdout_mutex); pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_BAD_REQUEST, "No group with that name found."); + netdata_mutex_unlock(&stdout_mutex); return; } } @@ -4739,13 +4751,14 @@ static void function_processes(const char *transaction, char *function __maybe_u else { char msg[PLUGINSD_LINE_MAX]; snprintfz(msg, PLUGINSD_LINE_MAX, "Invalid parameter '%s'", keyword); + netdata_mutex_lock(&stdout_mutex); pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_BAD_REQUEST, msg); + netdata_mutex_unlock(&stdout_mutex); return; } } time_t expires = now_realtime_sec() + update_every; - pluginsd_function_result_begin_to_stdout(transaction, HTTP_RESP_OK, "application/json", expires); unsigned int cpu_divisor = time_factor * RATES_DETAIL / 100; unsigned int memory_divisor = 1024; @@ -5523,10 +5536,11 @@ static void function_processes(const char *transaction, char *function __maybe_u buffer_json_member_add_time_t(wb, "expires", expires); buffer_json_finalize(wb); - fwrite(buffer_tostring(wb), buffer_strlen(wb), 1, stdout); - buffer_free(wb); + netdata_mutex_lock(&stdout_mutex); + pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "application/json", expires, wb); + netdata_mutex_unlock(&stdout_mutex); - pluginsd_function_result_end_to_stdout(); + buffer_free(wb); } static bool apps_plugin_exit = false; @@ -5560,16 +5574,14 @@ static void *reader_main(void *arg __maybe_unused) { // internal_error(true, "Received function '%s', transaction '%s', timeout %d", function, transaction, timeout); - netdata_mutex_lock(&mutex); - if(strncmp(function, "processes", strlen("processes")) == 0) function_processes(transaction, function, buffer, PLUGINSD_LINE_MAX + 1, timeout); - else + else { + netdata_mutex_lock(&stdout_mutex); pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND, "No function with this name found in apps.plugin."); - - fflush(stdout); - netdata_mutex_unlock(&mutex); + netdata_mutex_unlock(&stdout_mutex); + } // internal_error(true, "Done with function '%s', transaction '%s', timeout %d", function, transaction, timeout); } @@ -5692,7 +5704,7 @@ int main(int argc, char **argv) { netdata_thread_t reader_thread; netdata_thread_create(&reader_thread, "APPS_READER", NETDATA_THREAD_OPTION_DONT_LOG, reader_main, NULL); - netdata_mutex_lock(&mutex); + netdata_mutex_lock(&stdout_mutex); APPS_PLUGIN_GLOBAL_FUNCTIONS(); @@ -5701,7 +5713,7 @@ int main(int argc, char **argv) { heartbeat_t hb; heartbeat_init(&hb); for(; !apps_plugin_exit ; global_iterations_counter++) { - netdata_mutex_unlock(&mutex); + netdata_mutex_unlock(&stdout_mutex); #ifdef NETDATA_PROFILING #warning "compiling for profiling" @@ -5712,16 +5724,16 @@ int main(int argc, char **argv) { #else usec_t dt = heartbeat_next(&hb, step); #endif - netdata_mutex_lock(&mutex); + netdata_mutex_lock(&stdout_mutex); struct pollfd pollfd = { .fd = fileno(stdout), .events = POLLERR }; if (unlikely(poll(&pollfd, 1, 0) < 0)) { - netdata_mutex_unlock(&mutex); + netdata_mutex_unlock(&stdout_mutex); netdata_thread_cancel(reader_thread); fatal("Cannot check if a pipe is available"); } if (unlikely(pollfd.revents & POLLERR)) { - netdata_mutex_unlock(&mutex); + netdata_mutex_unlock(&stdout_mutex); netdata_thread_cancel(reader_thread); fatal("Received error on read pipe."); } @@ -5732,7 +5744,7 @@ int main(int argc, char **argv) { if(!collect_data_for_all_processes()) { netdata_log_error("Cannot collect /proc data for running processes. Disabling apps.plugin..."); printf("DISABLE\n"); - netdata_mutex_unlock(&mutex); + netdata_mutex_unlock(&stdout_mutex); netdata_thread_cancel(reader_thread); exit(1); } @@ -5769,5 +5781,5 @@ int main(int argc, char **argv) { debug_log("done Loop No %zu", global_iterations_counter); } - netdata_mutex_unlock(&mutex); + netdata_mutex_unlock(&stdout_mutex); } diff --git a/collectors/ebpf.plugin/ebpf_functions.c b/collectors/ebpf.plugin/ebpf_functions.c index 7a43692bc0..d4f4687a7d 100644 --- a/collectors/ebpf.plugin/ebpf_functions.c +++ b/collectors/ebpf.plugin/ebpf_functions.c @@ -37,9 +37,8 @@ ebpf_module_t *ebpf_functions_select_module(const char *thread_name) { * @param transaction the transaction id that Netdata sent for this function execution */ static void ebpf_function_thread_manipulation_help(const char *transaction) { - pthread_mutex_lock(&lock); - pluginsd_function_result_begin_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600); - fprintf(stdout, "%s", + BUFFER *wb = buffer_create(0, NULL); + buffer_sprintf(wb, "%s", "ebpf.plugin / thread\n" "\n" "Function `thread` allows user to control eBPF threads.\n" @@ -59,9 +58,12 @@ static void ebpf_function_thread_manipulation_help(const char *transaction) { "Filters can be combined. Each filter can be given only one time.\n" "Process thread is not controlled by functions until we finish the creation of functions per thread..\n" ); - pluginsd_function_result_end_to_stdout(); - fflush(stdout); + + pthread_mutex_lock(&lock); + pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600, wb); pthread_mutex_unlock(&lock); + + buffer_free(wb); } @@ -79,12 +81,9 @@ static void ebpf_function_thread_manipulation_help(const char *transaction) { * @param msg the error message */ static void ebpf_function_error(const char *transaction, int code, const char *msg) { - char buffer[PLUGINSD_LINE_MAX + 1]; - json_escape_string(buffer, msg, PLUGINSD_LINE_MAX); - - pluginsd_function_result_begin_to_stdout(transaction, code, "application/json", now_realtime_sec()); - fprintf(stdout, "{\"status\":%d,\"error_message\":\"%s\"}", code, buffer); - pluginsd_function_result_end_to_stdout(); + pthread_mutex_lock(&lock); + pluginsd_function_json_error_to_stdout(transaction, code, msg); + pthread_mutex_unlock(&lock); } /***************************************************************** @@ -350,12 +349,7 @@ static void ebpf_function_thread_manipulation(const char *transaction, // Lock necessary to avoid race condition pthread_mutex_lock(&lock); - pluginsd_function_result_begin_to_stdout(transaction, HTTP_RESP_OK, "application/json", expires); - - fwrite(buffer_tostring(wb), buffer_strlen(wb), 1, stdout); - - pluginsd_function_result_end_to_stdout(); - fflush(stdout); + pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "application/json", expires, wb); pthread_mutex_unlock(&lock); buffer_free(wb); diff --git a/collectors/plugins.d/plugins_d.h b/collectors/plugins.d/plugins_d.h index 135f30eca4..594fa70170 100644 --- a/collectors/plugins.d/plugins_d.h +++ b/collectors/plugins.d/plugins_d.h @@ -130,6 +130,14 @@ static inline void pluginsd_function_json_error_to_stdout(const char *transactio pluginsd_function_result_begin_to_stdout(transaction, code, "application/json", now_realtime_sec()); fprintf(stdout, "{\"status\":%d,\"error_message\":\"%s\"}", code, buffer); pluginsd_function_result_end_to_stdout(); + fflush(stdout); +} + +static inline void pluginsd_function_result_to_stdout(const char *transaction, int code, const char *content_type, time_t expires, BUFFER *result) { + pluginsd_function_result_begin_to_stdout(transaction, code, content_type, expires); + fwrite(buffer_tostring(result), buffer_strlen(result), 1, stdout); + pluginsd_function_result_end_to_stdout(); + fflush(stdout); } #endif /* NETDATA_PLUGINS_D_H */ diff --git a/collectors/systemd-journal.plugin/systemd-journal.c b/collectors/systemd-journal.plugin/systemd-journal.c index 1fe8804a77..32ae597026 100644 --- a/collectors/systemd-journal.plugin/systemd-journal.c +++ b/collectors/systemd-journal.plugin/systemd-journal.c @@ -22,6 +22,7 @@ #define SYSTEMD_JOURNAL_MAX_PARAMS 100 #define SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION (3 * 3600) #define SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY 200 +#define SYSTEMD_JOURNAL_EXCESS_ROWS_ALLOWED 50 #define JOURNAL_PARAMETER_HELP "help" #define JOURNAL_PARAMETER_AFTER "after" @@ -33,6 +34,7 @@ #define JOURNAL_PARAMETER_HISTOGRAM "histogram" #define JOURNAL_PARAMETER_DIRECTION "direction" #define JOURNAL_PARAMETER_IF_MODIFIED_SINCE "if_modified_since" +#define JOURNAL_PARAMETER_DATA_ONLY "data_only" #define JOURNAL_PARAMETER_SOURCE "source" #define JOURNAL_PARAMETER_INFO "info" @@ -53,7 +55,7 @@ "|IMAGE_NAME" \ "" -static netdata_mutex_t mutex = NETDATA_MUTEX_INITIALIZER; +static netdata_mutex_t stdout_mutex = NETDATA_MUTEX_INITIALIZER; static bool plugin_should_exit = false; DICTIONARY *uids = NULL; @@ -61,7 +63,7 @@ DICTIONARY *gids = NULL; // ---------------------------------------------------------------------------- -int systemd_journal_query(BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t before_ut, usec_t if_modified_since, usec_t stop_monotonic_ut) { +static inline sd_journal *netdata_open_systemd_journal(void) { sd_journal *j = NULL; int r; @@ -82,36 +84,78 @@ int systemd_journal_query(BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t be if (r < 0) { netdata_log_error("SYSTEMD-JOURNAL: Failed to open SystemD Journal, with error %d", r); - return HTTP_RESP_INTERNAL_SERVER_ERROR; + return NULL; } - facets_rows_begin(facets); + return j; +} + +typedef enum { + ND_SD_JOURNAL_FAILED_TO_SEEK, + ND_SD_JOURNAL_TIMED_OUT, + ND_SD_JOURNAL_OK, + ND_SD_JOURNAL_NOT_MODIFIED, +} ND_SD_JOURNAL_STATUS; + +static inline bool netdata_systemd_journal_seek_to(sd_journal *j, usec_t timestamp) { + if(sd_journal_seek_realtime_usec(j, timestamp) < 0) { + netdata_log_error("SYSTEMD-JOURNAL: Failed to seek to %" PRIu64, timestamp); + if(sd_journal_seek_tail(j) < 0) { + netdata_log_error("SYSTEMD-JOURNAL: Failed to seek to journal's tail"); + return false; + } + } + + return true; +} + +static inline void netdata_systemd_journal_process_row(sd_journal *j, FACETS *facets) { + const void *data; + size_t length; + SD_JOURNAL_FOREACH_DATA(j, data, length) { + const char *key = data; + const char *equal = strchr(key, '='); + if(unlikely(!equal)) + continue; + + const char *value = ++equal; + size_t key_length = value - key; // including '\0' + + char key_copy[key_length]; + memcpy(key_copy, key, key_length - 1); + key_copy[key_length - 1] = '\0'; + + size_t value_length = length - key_length; // without '\0' + facets_add_key_value_length(facets, key_copy, key_length - 1, value, value_length <= FACET_MAX_VALUE_LENGTH ? value_length : FACET_MAX_VALUE_LENGTH); + } +} + +ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_full(sd_journal *j, BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t before_ut, usec_t if_modified_since, usec_t stop_monotonic_ut, usec_t *last_modified) { + if(!netdata_systemd_journal_seek_to(j, before_ut)) + return ND_SD_JOURNAL_FAILED_TO_SEEK; - uint64_t first_msg_ut = 0; + size_t errors_no_timestamp = 0; + usec_t first_msg_ut = 0; bool timed_out = false; size_t row_counter = 0; // the entries are not guaranteed to be sorted, so we process up to 100 entries beyond // the end of the query to find possibly useful logs for our time-frame - size_t excess_rows_allowed = 100; + size_t excess_rows_allowed = SYSTEMD_JOURNAL_EXCESS_ROWS_ALLOWED; - if(sd_journal_seek_realtime_usec(j, before_ut) < 0) { - netdata_log_error("SYSTEMD-JOURNAL: Failed to seek to %" PRIu64, before_ut); - if(sd_journal_seek_tail(j) < 0) { - netdata_log_error("SYSTEMD-JOURNAL: Failed to seek to journal's tail"); - goto finalize; - } - } + facets_rows_begin(facets); while (sd_journal_previous(j) > 0) { row_counter++; - uint64_t msg_ut; - sd_journal_get_realtime_usec(j, &msg_ut); + usec_t msg_ut; + if(sd_journal_get_realtime_usec(j, &msg_ut) < 0) { + errors_no_timestamp++; + continue; + } if(unlikely(!first_msg_ut)) { if(msg_ut == if_modified_since) { - sd_journal_close(j); - return HTTP_RESP_NOT_MODIFIED; + return ND_SD_JOURNAL_NOT_MODIFIED; } first_msg_ut = msg_ut; @@ -127,26 +171,65 @@ int systemd_journal_query(BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t be continue; } - const void *data; - size_t length; - SD_JOURNAL_FOREACH_DATA(j, data, length) { - const char *key = data; - const char *equal = strchr(key, '='); - if(unlikely(!equal)) - continue; + netdata_systemd_journal_process_row(j, facets); + facets_row_finished(facets, msg_ut); + + if((row_counter % 100) == 0 && now_monotonic_usec() > stop_monotonic_ut) { + timed_out = true; + break; + } + } + + if(errors_no_timestamp) + netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp); + + *last_modified = first_msg_ut; - const char *value = ++equal; - size_t key_length = value - key; // including '\0' + if(timed_out) + return ND_SD_JOURNAL_TIMED_OUT; - char key_copy[key_length]; - memcpy(key_copy, key, key_length - 1); - key_copy[key_length - 1] = '\0'; + return ND_SD_JOURNAL_OK; +} + +ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_forward(sd_journal *j, BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t before_ut, usec_t anchor, size_t entries, usec_t stop_monotonic_ut) { + if(!netdata_systemd_journal_seek_to(j, anchor)) + return ND_SD_JOURNAL_FAILED_TO_SEEK; + + size_t errors_no_timestamp = 0; + bool timed_out = false; + size_t row_counter = 0; + size_t rows_added = 0; + + // the entries are not guaranteed to be sorted, so we process up to 100 entries beyond + // the end of the query to find possibly useful logs for our time-frame + size_t excess_rows_allowed = SYSTEMD_JOURNAL_EXCESS_ROWS_ALLOWED; + + facets_rows_begin(facets); + while (sd_journal_next(j) > 0) { + row_counter++; + + usec_t msg_ut; + if(sd_journal_get_realtime_usec(j, &msg_ut) < 0) { + errors_no_timestamp++; + continue; + } + + if (msg_ut > before_ut || msg_ut <= anchor) + continue; + + if (msg_ut < after_ut) { + if(--excess_rows_allowed == 0) + break; - size_t value_length = length - key_length; // without '\0' - facets_add_key_value_length(facets, key_copy, key_length - 1, value, value_length <= FACET_MAX_VALUE_LENGTH ? value_length : FACET_MAX_VALUE_LENGTH); + continue; } + if(rows_added > entries && --excess_rows_allowed == 0) + break; + + netdata_systemd_journal_process_row(j, facets); facets_row_finished(facets, msg_ut); + rows_added++; if((row_counter % 100) == 0 && now_monotonic_usec() > stop_monotonic_ut) { timed_out = true; @@ -154,27 +237,147 @@ int systemd_journal_query(BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t be } } -finalize: + if(errors_no_timestamp) + netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp); + + if(timed_out) + return ND_SD_JOURNAL_TIMED_OUT; + + return ND_SD_JOURNAL_OK; +} + +ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_backward(sd_journal *j, BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t before_ut, usec_t anchor, size_t entries, usec_t stop_monotonic_ut) { + if(!netdata_systemd_journal_seek_to(j, anchor)) + return ND_SD_JOURNAL_FAILED_TO_SEEK; + + size_t errors_no_timestamp = 0; + bool timed_out = false; + size_t row_counter = 0; + size_t rows_added = 0; + + // the entries are not guaranteed to be sorted, so we process up to 100 entries beyond + // the end of the query to find possibly useful logs for our time-frame + size_t excess_rows_allowed = SYSTEMD_JOURNAL_EXCESS_ROWS_ALLOWED; + + facets_rows_begin(facets); + while (sd_journal_previous(j) > 0) { + row_counter++; + + usec_t msg_ut; + if(sd_journal_get_realtime_usec(j, &msg_ut) < 0) { + errors_no_timestamp++; + continue; + } + + if (msg_ut > before_ut || msg_ut >= anchor) + continue; + + if (msg_ut < after_ut) { + if(--excess_rows_allowed == 0) + break; + + continue; + } + + if(rows_added > entries && --excess_rows_allowed == 0) + break; + + netdata_systemd_journal_process_row(j, facets); + facets_row_finished(facets, msg_ut); + rows_added++; + + if((row_counter % 100) == 0 && now_monotonic_usec() > stop_monotonic_ut) { + timed_out = true; + break; + } + } + + if(errors_no_timestamp) + netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp); + + if(timed_out) + return ND_SD_JOURNAL_TIMED_OUT; + + return ND_SD_JOURNAL_OK; +} + +bool netdata_systemd_journal_check_if_modified_since(sd_journal *j, usec_t seek_to, usec_t last_modified) { + // return true, if data have been modified since the timestamp + + if(!last_modified || !seek_to) + return false; + + if(!netdata_systemd_journal_seek_to(j, seek_to)) + return false; + + usec_t first_msg_ut = 0; + while (sd_journal_previous(j) > 0) { + usec_t msg_ut; + if(sd_journal_get_realtime_usec(j, &msg_ut) < 0) + continue; + + first_msg_ut = msg_ut; + break; + } + + return first_msg_ut != last_modified; +} + +static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, + usec_t after_ut, usec_t before_ut, + usec_t anchor, FACETS_ANCHOR_DIRECTION direction, size_t entries, + usec_t if_modified_since, bool data_only, + usec_t stop_monotonic_ut) { + sd_journal *j = netdata_open_systemd_journal(); + if(!j) + return HTTP_RESP_INTERNAL_SERVER_ERROR; + + usec_t last_modified = 0; + + ND_SD_JOURNAL_STATUS status; + + if(data_only && anchor /* && !netdata_systemd_journal_check_if_modified_since(j, before_ut, if_modified_since) */) { + facets_data_only_mode(facets); + + // we can do a data-only query + if(direction == FACETS_ANCHOR_DIRECTION_FORWARD) + status = netdata_systemd_journal_query_data_forward(j, wb, facets, after_ut, before_ut, anchor, entries, stop_monotonic_ut); + else + status = netdata_systemd_journal_query_data_backward(j, wb, facets, after_ut, before_ut, anchor, entries, stop_monotonic_ut); + } + else { + // we have to do a full query + status = netdata_systemd_journal_query_full(j, wb, facets, + after_ut, before_ut, if_modified_since, + stop_monotonic_ut, &last_modified); + } + sd_journal_close(j); - buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK); - buffer_json_member_add_boolean(wb, "partial", timed_out); + if(status == ND_SD_JOURNAL_NOT_MODIFIED) + return HTTP_RESP_NOT_MODIFIED; + + buffer_json_member_add_uint64(wb, "status", status == ND_SD_JOURNAL_FAILED_TO_SEEK ? HTTP_RESP_INTERNAL_SERVER_ERROR : HTTP_RESP_OK); + buffer_json_member_add_boolean(wb, "partial", status != ND_SD_JOURNAL_OK); buffer_json_member_add_string(wb, "type", "table"); - buffer_json_member_add_time_t(wb, "update_every", 1); - buffer_json_member_add_string(wb, "help", SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION); - buffer_json_member_add_uint64(wb, "last_modified", first_msg_ut); + + if(!data_only) { + buffer_json_member_add_time_t(wb, "update_every", 1); + buffer_json_member_add_string(wb, "help", SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION); + buffer_json_member_add_uint64(wb, "last_modified", last_modified); + } facets_report(facets, wb); - buffer_json_member_add_time_t(wb, "expires", now_realtime_sec()); + buffer_json_member_add_time_t(wb, "expires", now_realtime_sec() + data_only ? 3600 : 0); buffer_json_finalize(wb); - return HTTP_RESP_OK; + return status == ND_SD_JOURNAL_FAILED_TO_SEEK ? HTTP_RESP_INTERNAL_SERVER_ERROR : HTTP_RESP_OK; } -static void systemd_journal_function_help(const char *transaction) { - pluginsd_function_result_begin_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600); - fprintf(stdout, +static void netdata_systemd_journal_function_help(const char *transaction) { + BUFFER *wb = buffer_create(0, NULL); + buffer_sprintf(wb, "%s / %s\n" "\n" "%s\n" @@ -212,7 +415,12 @@ static void systemd_journal_function_help(const char *transaction) { , -SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION , SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY ); - pluginsd_function_result_end_to_stdout(); + + netdata_mutex_lock(&stdout_mutex); + pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600, wb); + netdata_mutex_unlock(&stdout_mutex); + + buffer_free(wb); } static const char *syslog_facility_to_name(int facility) { @@ -255,6 +463,26 @@ static const char *syslog_priority_to_name(int priority) { } } +static FACET_ROW_SEVERITY syslog_priority_to_facet_severity(int priority) { + // same to + // https://github.com/systemd/systemd/blob/aab9e4b2b86905a15944a1ac81e471b5b7075932/src/basic/terminal-util.c#L1501 + // function get_log_colors() + + if(priority <= LOG_ERR) + return FACET_ROW_SEVERITY_CRITICAL; + + else if (priority <= LOG_WARNING) + return FACET_ROW_SEVERITY_WARNING; + + else if(priority <= LOG_NOTICE) + return FACET_ROW_SEVERITY_NOTICE; + + else if(priority >= LOG_DEBUG) + return FACET_ROW_SEVERITY_DEBUG; + + return FACET_ROW_SEVERITY_NORMAL; +} + static char *uid_to_username(uid_t uid, char *buffer, size_t buffer_size) { struct passwd pw, *result; char tmp[1024 + 1]; @@ -277,7 +505,7 @@ static char *gid_to_groupname(gid_t gid, char* buffer, size_t buffer_size) { return buffer; } -static void systemd_journal_transform_syslog_facility(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) { +static void netdata_systemd_journal_transform_syslog_facility(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) { const char *v = buffer_tostring(wb); if(*v && isdigit(*v)) { int facility = str2i(buffer_tostring(wb)); @@ -289,7 +517,7 @@ static void systemd_journal_transform_syslog_facility(FACETS *facets __maybe_unu } } -static void systemd_journal_transform_priority(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) { +static void netdata_systemd_journal_transform_priority(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) { const char *v = buffer_tostring(wb); if(*v && isdigit(*v)) { int priority = str2i(buffer_tostring(wb)); @@ -298,10 +526,12 @@ static void systemd_journal_transform_priority(FACETS *facets __maybe_unused, BU buffer_flush(wb); buffer_strcat(wb, name); } + + facets_set_current_row_severity(facets, syslog_priority_to_facet_severity(priority)); } } -static void systemd_journal_transform_uid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) { +static void netdata_systemd_journal_transform_uid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) { DICTIONARY *cache = data; const char *v = buffer_tostring(wb); if(*v && isdigit(*v)) { @@ -321,7 +551,7 @@ static void systemd_journal_transform_uid(FACETS *facets __maybe_unused, BUFFER } } -static void systemd_journal_transform_gid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) { +static void netdata_systemd_journal_transform_gid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) { DICTIONARY *cache = data; const char *v = buffer_tostring(wb); if(*v && isdigit(*v)) { @@ -341,7 +571,7 @@ static void systemd_journal_transform_gid(FACETS *facets __maybe_unused, BUFFER } } -static void systemd_journal_dynamic_row_id(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row, void *data __maybe_unused) { +static void netdata_systemd_journal_dynamic_row_id(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row, void *data __maybe_unused) { FACET_ROW_KEY_VALUE *pid_rkv = dictionary_get(row->dict, "_PID"); const char *pid = pid_rkv ? buffer_tostring(pid_rkv->wb) : FACET_VALUE_UNSET; @@ -363,6 +593,12 @@ static void systemd_journal_dynamic_row_id(FACETS *facets __maybe_unused, BUFFER buffer_json_add_array_item_string(json_array, buffer_tostring(rkv->wb)); } +static void netdata_systemd_journal_rich_message(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row, void *data __maybe_unused) { + buffer_json_add_array_item_object(json_array); + buffer_json_member_add_string(json_array, "value", buffer_tostring(rkv->wb)); + buffer_json_object_close(json_array); +} + static void function_systemd_journal(const char *transaction, char *function, char *line_buffer __maybe_unused, int line_max __maybe_unused, int timeout __maybe_unused) { BUFFER *wb = buffer_create(0, NULL); buffer_flush(wb); @@ -384,34 +620,41 @@ static void function_systemd_journal(const char *transaction, char *function, ch facets_accepted_param(facets, JOURNAL_PARAMETER_FACETS); facets_accepted_param(facets, JOURNAL_PARAMETER_HISTOGRAM); facets_accepted_param(facets, JOURNAL_PARAMETER_IF_MODIFIED_SINCE); + facets_accepted_param(facets, JOURNAL_PARAMETER_DATA_ONLY); // register the fields in the order you want them on the dashboard facets_register_dynamic_key_name(facets, "ND_JOURNAL_PROCESS", FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS, - systemd_journal_dynamic_row_id, NULL); + netdata_systemd_journal_dynamic_row_id, NULL); facets_register_key_name(facets, "MESSAGE", - FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_MAIN_TEXT | FACET_KEY_OPTION_VISIBLE | - FACET_KEY_OPTION_FTS); + FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_MAIN_TEXT | + FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS); + +// facets_register_dynamic_key_name(facets, "MESSAGE", +// FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_MAIN_TEXT | FACET_KEY_OPTION_RICH_TEXT | +// FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS, +// netdata_systemd_journal_rich_message, NULL); facets_register_key_name_transformation(facets, "PRIORITY", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS, - systemd_journal_transform_priority, NULL); + netdata_systemd_journal_transform_priority, NULL); facets_register_key_name_transformation(facets, "SYSLOG_FACILITY", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS, - systemd_journal_transform_syslog_facility, NULL); + netdata_systemd_journal_transform_syslog_facility, NULL); facets_register_key_name(facets, "SYSLOG_IDENTIFIER", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS); facets_register_key_name(facets, "UNIT", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS); facets_register_key_name(facets, "USER_UNIT", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS); facets_register_key_name_transformation(facets, "_UID", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS, - systemd_journal_transform_uid, uids); + netdata_systemd_journal_transform_uid, uids); facets_register_key_name_transformation(facets, "_GID", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS, - systemd_journal_transform_gid, gids); + netdata_systemd_journal_transform_gid, gids); bool info = false; + bool data_only = false; time_t after_s = 0, before_s = 0; usec_t anchor = 0; usec_t if_modified_since = 0; @@ -430,12 +673,15 @@ static void function_systemd_journal(const char *transaction, char *function, ch if(!keyword) break; if(strcmp(keyword, JOURNAL_PARAMETER_HELP) == 0) { - systemd_journal_function_help(transaction); + netdata_systemd_journal_function_help(transaction); goto cleanup; } else if(strcmp(keyword, JOURNAL_PARAMETER_INFO) == 0) { info = true; } + else if(strcmp(keyword, JOURNAL_PARAMETER_DATA_ONLY) == 0) { + data_only = true; + } else if(strncmp(keyword, JOURNAL_PARAMETER_SOURCE ":", sizeof(JOURNAL_PARAMETER_SOURCE ":") - 1) == 0) { source = &keyword[sizeof(JOURNAL_PARAMETER_SOURCE ":") - 1]; } @@ -580,18 +826,22 @@ static void function_systemd_journal(const char *transaction, char *function, ch facets_set_query(facets, query); facets_set_histogram(facets, chart ? chart : "PRIORITY", after_s * USEC_PER_SEC, before_s * USEC_PER_SEC); - response = systemd_journal_query(wb, facets, after_s * USEC_PER_SEC, before_s * USEC_PER_SEC, - if_modified_since, now_monotonic_usec() + (timeout - 1) * USEC_PER_SEC); + response = netdata_systemd_journal_query(wb, facets, after_s * USEC_PER_SEC, before_s * USEC_PER_SEC, + anchor, direction, last, + if_modified_since, data_only, + now_monotonic_usec() + (timeout - 1) * USEC_PER_SEC); if(response != HTTP_RESP_OK) { + netdata_mutex_lock(&stdout_mutex); pluginsd_function_json_error_to_stdout(transaction, response, "failed"); + |