summaryrefslogtreecommitdiffstats
path: root/collectors
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-09-14 15:59:22 +0300
committerGitHub <noreply@github.com>2023-09-14 15:59:22 +0300
commit934f620265d2e7a2be160cb084ec01875ed3491e (patch)
tree6bbf40643ff66c63867742ae5ecafb305d7119c5 /collectors
parente9d3dc658d50b74430b3b8a470bfa9ab338fd44f (diff)
facets: data-only queries (#15961)
Diffstat (limited to 'collectors')
-rw-r--r--collectors/apps.plugin/apps_plugin.c54
-rw-r--r--collectors/ebpf.plugin/ebpf_functions.c28
-rw-r--r--collectors/plugins.d/plugins_d.h8
-rw-r--r--collectors/systemd-journal.plugin/systemd-journal.c388
4 files changed, 370 insertions, 108 deletions
diff --git a/collectors/apps.plugin/apps_plugin.c b/collectors/apps.plugin/apps_plugin.c
index 7e1e0313a1..c3559433d6 100644
--- a/collectors/apps.plugin/apps_plugin.c
+++ b/collectors/apps.plugin/apps_plugin.c
@@ -4575,7 +4575,7 @@ static int check_capabilities() {
}
#endif
-static netdata_mutex_t mutex = NETDATA_MUTEX_INITIALIZER;
+static netdata_mutex_t stdout_mutex = NETDATA_MUTEX_INITIALIZER;
#define PROCESS_FILTER_CATEGORY "category:"
#define PROCESS_FILTER_USER "user:"
@@ -4629,8 +4629,8 @@ static void get_MemTotal(void) {
}
static void apps_plugin_function_processes_help(const char *transaction) {
- pluginsd_function_result_begin_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600);
- fprintf(stdout, "%s",
+ BUFFER *wb = buffer_create(0, NULL);
+ buffer_sprintf(wb, "%s",
"apps.plugin / processes\n"
"\n"
"Function `processes` presents all the currently running processes of the system.\n"
@@ -4660,7 +4660,12 @@ static void apps_plugin_function_processes_help(const char *transaction) {
"\n"
"Filters can be combined. Each filter can be given only one time.\n"
);
- pluginsd_function_result_end_to_stdout();
+
+ netdata_mutex_lock(&stdout_mutex);
+ pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600, wb);
+ netdata_mutex_unlock(&stdout_mutex);
+
+ buffer_free(wb);
}
#define add_value_field_llu_with_max(wb, key, value) do { \
@@ -4696,24 +4701,31 @@ static void function_processes(const char *transaction, char *function __maybe_u
if(!category && strncmp(keyword, PROCESS_FILTER_CATEGORY, strlen(PROCESS_FILTER_CATEGORY)) == 0) {
category = find_target_by_name(apps_groups_root_target, &keyword[strlen(PROCESS_FILTER_CATEGORY)]);
if(!category) {
+ netdata_mutex_lock(&stdout_mutex);
pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_BAD_REQUEST,
"No category with that name found.");
+ netdata_mutex_unlock(&stdout_mutex);
+
return;
}
}
else if(!user && strncmp(keyword, PROCESS_FILTER_USER, strlen(PROCESS_FILTER_USER)) == 0) {
user = find_target_by_name(users_root_target, &keyword[strlen(PROCESS_FILTER_USER)]);
if(!user) {
+ netdata_mutex_lock(&stdout_mutex);
pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_BAD_REQUEST,
"No user with that name found.");
+ netdata_mutex_unlock(&stdout_mutex);
return;
}
}
else if(strncmp(keyword, PROCESS_FILTER_GROUP, strlen(PROCESS_FILTER_GROUP)) == 0) {
group = find_target_by_name(groups_root_target, &keyword[strlen(PROCESS_FILTER_GROUP)]);
if(!group) {
+ netdata_mutex_lock(&stdout_mutex);
pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_BAD_REQUEST,
"No group with that name found.");
+ netdata_mutex_unlock(&stdout_mutex);
return;
}
}
@@ -4739,13 +4751,14 @@ static void function_processes(const char *transaction, char *function __maybe_u
else {
char msg[PLUGINSD_LINE_MAX];
snprintfz(msg, PLUGINSD_LINE_MAX, "Invalid parameter '%s'", keyword);
+ netdata_mutex_lock(&stdout_mutex);
pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_BAD_REQUEST, msg);
+ netdata_mutex_unlock(&stdout_mutex);
return;
}
}
time_t expires = now_realtime_sec() + update_every;
- pluginsd_function_result_begin_to_stdout(transaction, HTTP_RESP_OK, "application/json", expires);
unsigned int cpu_divisor = time_factor * RATES_DETAIL / 100;
unsigned int memory_divisor = 1024;
@@ -5523,10 +5536,11 @@ static void function_processes(const char *transaction, char *function __maybe_u
buffer_json_member_add_time_t(wb, "expires", expires);
buffer_json_finalize(wb);
- fwrite(buffer_tostring(wb), buffer_strlen(wb), 1, stdout);
- buffer_free(wb);
+ netdata_mutex_lock(&stdout_mutex);
+ pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "application/json", expires, wb);
+ netdata_mutex_unlock(&stdout_mutex);
- pluginsd_function_result_end_to_stdout();
+ buffer_free(wb);
}
static bool apps_plugin_exit = false;
@@ -5560,16 +5574,14 @@ static void *reader_main(void *arg __maybe_unused) {
// internal_error(true, "Received function '%s', transaction '%s', timeout %d", function, transaction, timeout);
- netdata_mutex_lock(&mutex);
-
if(strncmp(function, "processes", strlen("processes")) == 0)
function_processes(transaction, function, buffer, PLUGINSD_LINE_MAX + 1, timeout);
- else
+ else {
+ netdata_mutex_lock(&stdout_mutex);
pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND,
"No function with this name found in apps.plugin.");
-
- fflush(stdout);
- netdata_mutex_unlock(&mutex);
+ netdata_mutex_unlock(&stdout_mutex);
+ }
// internal_error(true, "Done with function '%s', transaction '%s', timeout %d", function, transaction, timeout);
}
@@ -5692,7 +5704,7 @@ int main(int argc, char **argv) {
netdata_thread_t reader_thread;
netdata_thread_create(&reader_thread, "APPS_READER", NETDATA_THREAD_OPTION_DONT_LOG, reader_main, NULL);
- netdata_mutex_lock(&mutex);
+ netdata_mutex_lock(&stdout_mutex);
APPS_PLUGIN_GLOBAL_FUNCTIONS();
@@ -5701,7 +5713,7 @@ int main(int argc, char **argv) {
heartbeat_t hb;
heartbeat_init(&hb);
for(; !apps_plugin_exit ; global_iterations_counter++) {
- netdata_mutex_unlock(&mutex);
+ netdata_mutex_unlock(&stdout_mutex);
#ifdef NETDATA_PROFILING
#warning "compiling for profiling"
@@ -5712,16 +5724,16 @@ int main(int argc, char **argv) {
#else
usec_t dt = heartbeat_next(&hb, step);
#endif
- netdata_mutex_lock(&mutex);
+ netdata_mutex_lock(&stdout_mutex);
struct pollfd pollfd = { .fd = fileno(stdout), .events = POLLERR };
if (unlikely(poll(&pollfd, 1, 0) < 0)) {
- netdata_mutex_unlock(&mutex);
+ netdata_mutex_unlock(&stdout_mutex);
netdata_thread_cancel(reader_thread);
fatal("Cannot check if a pipe is available");
}
if (unlikely(pollfd.revents & POLLERR)) {
- netdata_mutex_unlock(&mutex);
+ netdata_mutex_unlock(&stdout_mutex);
netdata_thread_cancel(reader_thread);
fatal("Received error on read pipe.");
}
@@ -5732,7 +5744,7 @@ int main(int argc, char **argv) {
if(!collect_data_for_all_processes()) {
netdata_log_error("Cannot collect /proc data for running processes. Disabling apps.plugin...");
printf("DISABLE\n");
- netdata_mutex_unlock(&mutex);
+ netdata_mutex_unlock(&stdout_mutex);
netdata_thread_cancel(reader_thread);
exit(1);
}
@@ -5769,5 +5781,5 @@ int main(int argc, char **argv) {
debug_log("done Loop No %zu", global_iterations_counter);
}
- netdata_mutex_unlock(&mutex);
+ netdata_mutex_unlock(&stdout_mutex);
}
diff --git a/collectors/ebpf.plugin/ebpf_functions.c b/collectors/ebpf.plugin/ebpf_functions.c
index 7a43692bc0..d4f4687a7d 100644
--- a/collectors/ebpf.plugin/ebpf_functions.c
+++ b/collectors/ebpf.plugin/ebpf_functions.c
@@ -37,9 +37,8 @@ ebpf_module_t *ebpf_functions_select_module(const char *thread_name) {
* @param transaction the transaction id that Netdata sent for this function execution
*/
static void ebpf_function_thread_manipulation_help(const char *transaction) {
- pthread_mutex_lock(&lock);
- pluginsd_function_result_begin_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600);
- fprintf(stdout, "%s",
+ BUFFER *wb = buffer_create(0, NULL);
+ buffer_sprintf(wb, "%s",
"ebpf.plugin / thread\n"
"\n"
"Function `thread` allows user to control eBPF threads.\n"
@@ -59,9 +58,12 @@ static void ebpf_function_thread_manipulation_help(const char *transaction) {
"Filters can be combined. Each filter can be given only one time.\n"
"Process thread is not controlled by functions until we finish the creation of functions per thread..\n"
);
- pluginsd_function_result_end_to_stdout();
- fflush(stdout);
+
+ pthread_mutex_lock(&lock);
+ pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600, wb);
pthread_mutex_unlock(&lock);
+
+ buffer_free(wb);
}
@@ -79,12 +81,9 @@ static void ebpf_function_thread_manipulation_help(const char *transaction) {
* @param msg the error message
*/
static void ebpf_function_error(const char *transaction, int code, const char *msg) {
- char buffer[PLUGINSD_LINE_MAX + 1];
- json_escape_string(buffer, msg, PLUGINSD_LINE_MAX);
-
- pluginsd_function_result_begin_to_stdout(transaction, code, "application/json", now_realtime_sec());
- fprintf(stdout, "{\"status\":%d,\"error_message\":\"%s\"}", code, buffer);
- pluginsd_function_result_end_to_stdout();
+ pthread_mutex_lock(&lock);
+ pluginsd_function_json_error_to_stdout(transaction, code, msg);
+ pthread_mutex_unlock(&lock);
}
/*****************************************************************
@@ -350,12 +349,7 @@ static void ebpf_function_thread_manipulation(const char *transaction,
// Lock necessary to avoid race condition
pthread_mutex_lock(&lock);
- pluginsd_function_result_begin_to_stdout(transaction, HTTP_RESP_OK, "application/json", expires);
-
- fwrite(buffer_tostring(wb), buffer_strlen(wb), 1, stdout);
-
- pluginsd_function_result_end_to_stdout();
- fflush(stdout);
+ pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "application/json", expires, wb);
pthread_mutex_unlock(&lock);
buffer_free(wb);
diff --git a/collectors/plugins.d/plugins_d.h b/collectors/plugins.d/plugins_d.h
index 135f30eca4..594fa70170 100644
--- a/collectors/plugins.d/plugins_d.h
+++ b/collectors/plugins.d/plugins_d.h
@@ -130,6 +130,14 @@ static inline void pluginsd_function_json_error_to_stdout(const char *transactio
pluginsd_function_result_begin_to_stdout(transaction, code, "application/json", now_realtime_sec());
fprintf(stdout, "{\"status\":%d,\"error_message\":\"%s\"}", code, buffer);
pluginsd_function_result_end_to_stdout();
+ fflush(stdout);
+}
+
+static inline void pluginsd_function_result_to_stdout(const char *transaction, int code, const char *content_type, time_t expires, BUFFER *result) {
+ pluginsd_function_result_begin_to_stdout(transaction, code, content_type, expires);
+ fwrite(buffer_tostring(result), buffer_strlen(result), 1, stdout);
+ pluginsd_function_result_end_to_stdout();
+ fflush(stdout);
}
#endif /* NETDATA_PLUGINS_D_H */
diff --git a/collectors/systemd-journal.plugin/systemd-journal.c b/collectors/systemd-journal.plugin/systemd-journal.c
index 1fe8804a77..32ae597026 100644
--- a/collectors/systemd-journal.plugin/systemd-journal.c
+++ b/collectors/systemd-journal.plugin/systemd-journal.c
@@ -22,6 +22,7 @@
#define SYSTEMD_JOURNAL_MAX_PARAMS 100
#define SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION (3 * 3600)
#define SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY 200
+#define SYSTEMD_JOURNAL_EXCESS_ROWS_ALLOWED 50
#define JOURNAL_PARAMETER_HELP "help"
#define JOURNAL_PARAMETER_AFTER "after"
@@ -33,6 +34,7 @@
#define JOURNAL_PARAMETER_HISTOGRAM "histogram"
#define JOURNAL_PARAMETER_DIRECTION "direction"
#define JOURNAL_PARAMETER_IF_MODIFIED_SINCE "if_modified_since"
+#define JOURNAL_PARAMETER_DATA_ONLY "data_only"
#define JOURNAL_PARAMETER_SOURCE "source"
#define JOURNAL_PARAMETER_INFO "info"
@@ -53,7 +55,7 @@
"|IMAGE_NAME" \
""
-static netdata_mutex_t mutex = NETDATA_MUTEX_INITIALIZER;
+static netdata_mutex_t stdout_mutex = NETDATA_MUTEX_INITIALIZER;
static bool plugin_should_exit = false;
DICTIONARY *uids = NULL;
@@ -61,7 +63,7 @@ DICTIONARY *gids = NULL;
// ----------------------------------------------------------------------------
-int systemd_journal_query(BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t before_ut, usec_t if_modified_since, usec_t stop_monotonic_ut) {
+static inline sd_journal *netdata_open_systemd_journal(void) {
sd_journal *j = NULL;
int r;
@@ -82,36 +84,78 @@ int systemd_journal_query(BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t be
if (r < 0) {
netdata_log_error("SYSTEMD-JOURNAL: Failed to open SystemD Journal, with error %d", r);
- return HTTP_RESP_INTERNAL_SERVER_ERROR;
+ return NULL;
}
- facets_rows_begin(facets);
+ return j;
+}
+
+typedef enum {
+ ND_SD_JOURNAL_FAILED_TO_SEEK,
+ ND_SD_JOURNAL_TIMED_OUT,
+ ND_SD_JOURNAL_OK,
+ ND_SD_JOURNAL_NOT_MODIFIED,
+} ND_SD_JOURNAL_STATUS;
+
+static inline bool netdata_systemd_journal_seek_to(sd_journal *j, usec_t timestamp) {
+ if(sd_journal_seek_realtime_usec(j, timestamp) < 0) {
+ netdata_log_error("SYSTEMD-JOURNAL: Failed to seek to %" PRIu64, timestamp);
+ if(sd_journal_seek_tail(j) < 0) {
+ netdata_log_error("SYSTEMD-JOURNAL: Failed to seek to journal's tail");
+ return false;
+ }
+ }
+
+ return true;
+}
+
+static inline void netdata_systemd_journal_process_row(sd_journal *j, FACETS *facets) {
+ const void *data;
+ size_t length;
+ SD_JOURNAL_FOREACH_DATA(j, data, length) {
+ const char *key = data;
+ const char *equal = strchr(key, '=');
+ if(unlikely(!equal))
+ continue;
+
+ const char *value = ++equal;
+ size_t key_length = value - key; // including '\0'
+
+ char key_copy[key_length];
+ memcpy(key_copy, key, key_length - 1);
+ key_copy[key_length - 1] = '\0';
+
+ size_t value_length = length - key_length; // without '\0'
+ facets_add_key_value_length(facets, key_copy, key_length - 1, value, value_length <= FACET_MAX_VALUE_LENGTH ? value_length : FACET_MAX_VALUE_LENGTH);
+ }
+}
+
+ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_full(sd_journal *j, BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t before_ut, usec_t if_modified_since, usec_t stop_monotonic_ut, usec_t *last_modified) {
+ if(!netdata_systemd_journal_seek_to(j, before_ut))
+ return ND_SD_JOURNAL_FAILED_TO_SEEK;
- uint64_t first_msg_ut = 0;
+ size_t errors_no_timestamp = 0;
+ usec_t first_msg_ut = 0;
bool timed_out = false;
size_t row_counter = 0;
// the entries are not guaranteed to be sorted, so we process up to 100 entries beyond
// the end of the query to find possibly useful logs for our time-frame
- size_t excess_rows_allowed = 100;
+ size_t excess_rows_allowed = SYSTEMD_JOURNAL_EXCESS_ROWS_ALLOWED;
- if(sd_journal_seek_realtime_usec(j, before_ut) < 0) {
- netdata_log_error("SYSTEMD-JOURNAL: Failed to seek to %" PRIu64, before_ut);
- if(sd_journal_seek_tail(j) < 0) {
- netdata_log_error("SYSTEMD-JOURNAL: Failed to seek to journal's tail");
- goto finalize;
- }
- }
+ facets_rows_begin(facets);
while (sd_journal_previous(j) > 0) {
row_counter++;
- uint64_t msg_ut;
- sd_journal_get_realtime_usec(j, &msg_ut);
+ usec_t msg_ut;
+ if(sd_journal_get_realtime_usec(j, &msg_ut) < 0) {
+ errors_no_timestamp++;
+ continue;
+ }
if(unlikely(!first_msg_ut)) {
if(msg_ut == if_modified_since) {
- sd_journal_close(j);
- return HTTP_RESP_NOT_MODIFIED;
+ return ND_SD_JOURNAL_NOT_MODIFIED;
}
first_msg_ut = msg_ut;
@@ -127,26 +171,65 @@ int systemd_journal_query(BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t be
continue;
}
- const void *data;
- size_t length;
- SD_JOURNAL_FOREACH_DATA(j, data, length) {
- const char *key = data;
- const char *equal = strchr(key, '=');
- if(unlikely(!equal))
- continue;
+ netdata_systemd_journal_process_row(j, facets);
+ facets_row_finished(facets, msg_ut);
+
+ if((row_counter % 100) == 0 && now_monotonic_usec() > stop_monotonic_ut) {
+ timed_out = true;
+ break;
+ }
+ }
+
+ if(errors_no_timestamp)
+ netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp);
+
+ *last_modified = first_msg_ut;
- const char *value = ++equal;
- size_t key_length = value - key; // including '\0'
+ if(timed_out)
+ return ND_SD_JOURNAL_TIMED_OUT;
- char key_copy[key_length];
- memcpy(key_copy, key, key_length - 1);
- key_copy[key_length - 1] = '\0';
+ return ND_SD_JOURNAL_OK;
+}
+
+ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_forward(sd_journal *j, BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t before_ut, usec_t anchor, size_t entries, usec_t stop_monotonic_ut) {
+ if(!netdata_systemd_journal_seek_to(j, anchor))
+ return ND_SD_JOURNAL_FAILED_TO_SEEK;
+
+ size_t errors_no_timestamp = 0;
+ bool timed_out = false;
+ size_t row_counter = 0;
+ size_t rows_added = 0;
+
+ // the entries are not guaranteed to be sorted, so we process up to 100 entries beyond
+ // the end of the query to find possibly useful logs for our time-frame
+ size_t excess_rows_allowed = SYSTEMD_JOURNAL_EXCESS_ROWS_ALLOWED;
+
+ facets_rows_begin(facets);
+ while (sd_journal_next(j) > 0) {
+ row_counter++;
+
+ usec_t msg_ut;
+ if(sd_journal_get_realtime_usec(j, &msg_ut) < 0) {
+ errors_no_timestamp++;
+ continue;
+ }
+
+ if (msg_ut > before_ut || msg_ut <= anchor)
+ continue;
+
+ if (msg_ut < after_ut) {
+ if(--excess_rows_allowed == 0)
+ break;
- size_t value_length = length - key_length; // without '\0'
- facets_add_key_value_length(facets, key_copy, key_length - 1, value, value_length <= FACET_MAX_VALUE_LENGTH ? value_length : FACET_MAX_VALUE_LENGTH);
+ continue;
}
+ if(rows_added > entries && --excess_rows_allowed == 0)
+ break;
+
+ netdata_systemd_journal_process_row(j, facets);
facets_row_finished(facets, msg_ut);
+ rows_added++;
if((row_counter % 100) == 0 && now_monotonic_usec() > stop_monotonic_ut) {
timed_out = true;
@@ -154,27 +237,147 @@ int systemd_journal_query(BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t be
}
}
-finalize:
+ if(errors_no_timestamp)
+ netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp);
+
+ if(timed_out)
+ return ND_SD_JOURNAL_TIMED_OUT;
+
+ return ND_SD_JOURNAL_OK;
+}
+
+ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_data_backward(sd_journal *j, BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t before_ut, usec_t anchor, size_t entries, usec_t stop_monotonic_ut) {
+ if(!netdata_systemd_journal_seek_to(j, anchor))
+ return ND_SD_JOURNAL_FAILED_TO_SEEK;
+
+ size_t errors_no_timestamp = 0;
+ bool timed_out = false;
+ size_t row_counter = 0;
+ size_t rows_added = 0;
+
+ // the entries are not guaranteed to be sorted, so we process up to 100 entries beyond
+ // the end of the query to find possibly useful logs for our time-frame
+ size_t excess_rows_allowed = SYSTEMD_JOURNAL_EXCESS_ROWS_ALLOWED;
+
+ facets_rows_begin(facets);
+ while (sd_journal_previous(j) > 0) {
+ row_counter++;
+
+ usec_t msg_ut;
+ if(sd_journal_get_realtime_usec(j, &msg_ut) < 0) {
+ errors_no_timestamp++;
+ continue;
+ }
+
+ if (msg_ut > before_ut || msg_ut >= anchor)
+ continue;
+
+ if (msg_ut < after_ut) {
+ if(--excess_rows_allowed == 0)
+ break;
+
+ continue;
+ }
+
+ if(rows_added > entries && --excess_rows_allowed == 0)
+ break;
+
+ netdata_systemd_journal_process_row(j, facets);
+ facets_row_finished(facets, msg_ut);
+ rows_added++;
+
+ if((row_counter % 100) == 0 && now_monotonic_usec() > stop_monotonic_ut) {
+ timed_out = true;
+ break;
+ }
+ }
+
+ if(errors_no_timestamp)
+ netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp);
+
+ if(timed_out)
+ return ND_SD_JOURNAL_TIMED_OUT;
+
+ return ND_SD_JOURNAL_OK;
+}
+
+bool netdata_systemd_journal_check_if_modified_since(sd_journal *j, usec_t seek_to, usec_t last_modified) {
+ // return true, if data have been modified since the timestamp
+
+ if(!last_modified || !seek_to)
+ return false;
+
+ if(!netdata_systemd_journal_seek_to(j, seek_to))
+ return false;
+
+ usec_t first_msg_ut = 0;
+ while (sd_journal_previous(j) > 0) {
+ usec_t msg_ut;
+ if(sd_journal_get_realtime_usec(j, &msg_ut) < 0)
+ continue;
+
+ first_msg_ut = msg_ut;
+ break;
+ }
+
+ return first_msg_ut != last_modified;
+}
+
+static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets,
+ usec_t after_ut, usec_t before_ut,
+ usec_t anchor, FACETS_ANCHOR_DIRECTION direction, size_t entries,
+ usec_t if_modified_since, bool data_only,
+ usec_t stop_monotonic_ut) {
+ sd_journal *j = netdata_open_systemd_journal();
+ if(!j)
+ return HTTP_RESP_INTERNAL_SERVER_ERROR;
+
+ usec_t last_modified = 0;
+
+ ND_SD_JOURNAL_STATUS status;
+
+ if(data_only && anchor /* && !netdata_systemd_journal_check_if_modified_since(j, before_ut, if_modified_since) */) {
+ facets_data_only_mode(facets);
+
+ // we can do a data-only query
+ if(direction == FACETS_ANCHOR_DIRECTION_FORWARD)
+ status = netdata_systemd_journal_query_data_forward(j, wb, facets, after_ut, before_ut, anchor, entries, stop_monotonic_ut);
+ else
+ status = netdata_systemd_journal_query_data_backward(j, wb, facets, after_ut, before_ut, anchor, entries, stop_monotonic_ut);
+ }
+ else {
+ // we have to do a full query
+ status = netdata_systemd_journal_query_full(j, wb, facets,
+ after_ut, before_ut, if_modified_since,
+ stop_monotonic_ut, &last_modified);
+ }
+
sd_journal_close(j);
- buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK);
- buffer_json_member_add_boolean(wb, "partial", timed_out);
+ if(status == ND_SD_JOURNAL_NOT_MODIFIED)
+ return HTTP_RESP_NOT_MODIFIED;
+
+ buffer_json_member_add_uint64(wb, "status", status == ND_SD_JOURNAL_FAILED_TO_SEEK ? HTTP_RESP_INTERNAL_SERVER_ERROR : HTTP_RESP_OK);
+ buffer_json_member_add_boolean(wb, "partial", status != ND_SD_JOURNAL_OK);
buffer_json_member_add_string(wb, "type", "table");
- buffer_json_member_add_time_t(wb, "update_every", 1);
- buffer_json_member_add_string(wb, "help", SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION);
- buffer_json_member_add_uint64(wb, "last_modified", first_msg_ut);
+
+ if(!data_only) {
+ buffer_json_member_add_time_t(wb, "update_every", 1);
+ buffer_json_member_add_string(wb, "help", SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION);
+ buffer_json_member_add_uint64(wb, "last_modified", last_modified);
+ }
facets_report(facets, wb);
- buffer_json_member_add_time_t(wb, "expires", now_realtime_sec());
+ buffer_json_member_add_time_t(wb, "expires", now_realtime_sec() + data_only ? 3600 : 0);
buffer_json_finalize(wb);
- return HTTP_RESP_OK;
+ return status == ND_SD_JOURNAL_FAILED_TO_SEEK ? HTTP_RESP_INTERNAL_SERVER_ERROR : HTTP_RESP_OK;
}
-static void systemd_journal_function_help(const char *transaction) {
- pluginsd_function_result_begin_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600);
- fprintf(stdout,
+static void netdata_systemd_journal_function_help(const char *transaction) {
+ BUFFER *wb = buffer_create(0, NULL);
+ buffer_sprintf(wb,
"%s / %s\n"
"\n"
"%s\n"
@@ -212,7 +415,12 @@ static void systemd_journal_function_help(const char *transaction) {
, -SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION
, SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY
);
- pluginsd_function_result_end_to_stdout();
+
+ netdata_mutex_lock(&stdout_mutex);
+ pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600, wb);
+ netdata_mutex_unlock(&stdout_mutex);
+
+ buffer_free(wb);
}
static const char *syslog_facility_to_name(int facility) {
@@ -255,6 +463,26 @@ static const char *syslog_priority_to_name(int priority) {
}
}
+static FACET_ROW_SEVERITY syslog_priority_to_facet_severity(int priority) {
+ // same to
+ // https://github.com/systemd/systemd/blob/aab9e4b2b86905a15944a1ac81e471b5b7075932/src/basic/terminal-util.c#L1501
+ // function get_log_colors()
+
+ if(priority <= LOG_ERR)
+ return FACET_ROW_SEVERITY_CRITICAL;
+
+ else if (priority <= LOG_WARNING)
+ return FACET_ROW_SEVERITY_WARNING;
+
+ else if(priority <= LOG_NOTICE)
+ return FACET_ROW_SEVERITY_NOTICE;
+
+ else if(priority >= LOG_DEBUG)
+ return FACET_ROW_SEVERITY_DEBUG;
+
+ return FACET_ROW_SEVERITY_NORMAL;
+}
+
static char *uid_to_username(uid_t uid, char *buffer, size_t buffer_size) {
struct passwd pw, *result;
char tmp[1024 + 1];
@@ -277,7 +505,7 @@ static char *gid_to_groupname(gid_t gid, char* buffer, size_t buffer_size) {
return buffer;
}
-static void systemd_journal_transform_syslog_facility(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) {
+static void netdata_systemd_journal_transform_syslog_facility(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) {
const char *v = buffer_tostring(wb);
if(*v && isdigit(*v)) {
int facility = str2i(buffer_tostring(wb));
@@ -289,7 +517,7 @@ static void systemd_journal_transform_syslog_facility(FACETS *facets __maybe_unu
}
}
-static void systemd_journal_transform_priority(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) {
+static void netdata_systemd_journal_transform_priority(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) {
const char *v = buffer_tostring(wb);
if(*v && isdigit(*v)) {
int priority = str2i(buffer_tostring(wb));
@@ -298,10 +526,12 @@ static void systemd_journal_transform_priority(FACETS *facets __maybe_unused, BU
buffer_flush(wb);
buffer_strcat(wb, name);
}
+
+ facets_set_current_row_severity(facets, syslog_priority_to_facet_severity(priority));
}
}
-static void systemd_journal_transform_uid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) {
+static void netdata_systemd_journal_transform_uid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) {
DICTIONARY *cache = data;
const char *v = buffer_tostring(wb);
if(*v && isdigit(*v)) {
@@ -321,7 +551,7 @@ static void systemd_journal_transform_uid(FACETS *facets __maybe_unused, BUFFER
}
}
-static void systemd_journal_transform_gid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) {
+static void netdata_systemd_journal_transform_gid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) {
DICTIONARY *cache = data;
const char *v = buffer_tostring(wb);
if(*v && isdigit(*v)) {
@@ -341,7 +571,7 @@ static void systemd_journal_transform_gid(FACETS *facets __maybe_unused, BUFFER
}
}
-static void systemd_journal_dynamic_row_id(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row, void *data __maybe_unused) {
+static void netdata_systemd_journal_dynamic_row_id(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row, void *data __maybe_unused) {
FACET_ROW_KEY_VALUE *pid_rkv = dictionary_get(row->dict, "_PID");
const char *pid = pid_rkv ? buffer_tostring(pid_rkv->wb) : FACET_VALUE_UNSET;
@@ -363,6 +593,12 @@ static void systemd_journal_dynamic_row_id(FACETS *facets __maybe_unused, BUFFER
buffer_json_add_array_item_string(json_array, buffer_tostring(rkv->wb));
}
+static void netdata_systemd_journal_rich_message(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row, void *data __maybe_unused) {
+ buffer_json_add_array_item_object(json_array);
+ buffer_json_member_add_string(json_array, "value", buffer_tostring(rkv->wb));
+ buffer_json_object_close(json_array);
+}
+
static void function_systemd_journal(const char *transaction, char *function, char *line_buffer __maybe_unused, int line_max __maybe_unused, int timeout __maybe_unused) {
BUFFER *wb = buffer_create(0, NULL);
buffer_flush(wb);
@@ -384,34 +620,41 @@ static void function_systemd_journal(const char *transaction, char *function, ch
facets_accepted_param(facets, JOURNAL_PARAMETER_FACETS);
facets_accepted_param(facets, JOURNAL_PARAMETER_HISTOGRAM);
facets_accepted_param(facets, JOURNAL_PARAMETER_IF_MODIFIED_SINCE);
+ facets_accepted_param(facets, JOURNAL_PARAMETER_DATA_ONLY);
// register the fields in the order you want them on the dashboard
facets_register_dynamic_key_name(facets, "ND_JOURNAL_PROCESS",
FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS,
- systemd_journal_dynamic_row_id, NULL);
+ netdata_systemd_journal_dynamic_row_id, NULL);
facets_register_key_name(facets, "MESSAGE",
- FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_MAIN_TEXT | FACET_KEY_OPTION_VISIBLE |
- FACET_KEY_OPTION_FTS);
+ FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_MAIN_TEXT |
+ FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS);
+
+// facets_register_dynamic_key_name(facets, "MESSAGE",
+// FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_MAIN_TEXT | FACET_KEY_OPTION_RICH_TEXT |
+// FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS,
+// netdata_systemd_journal_rich_message, NULL);
facets_register_key_name_transformation(facets, "PRIORITY", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS,
- systemd_journal_transform_priority, NULL);
+ netdata_systemd_journal_transform_priority, NULL);
facets_register_key_name_transformation(facets, "SYSLOG_FACILITY", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS,
- systemd_journal_transform_syslog_facility, NULL);
+ netdata_systemd_journal_transform_syslog_facility, NULL);
facets_register_key_name(facets, "SYSLOG_IDENTIFIER", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS);
facets_register_key_name(facets, "UNIT", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS);
facets_register_key_name(facets, "USER_UNIT", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS);
facets_register_key_name_transformation(facets, "_UID", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS,
- systemd_journal_transform_uid, uids);
+ netdata_systemd_journal_transform_uid, uids);
facets_register_key_name_transformation(facets, "_GID", FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS,
- systemd_journal_transform_gid, gids);
+ netdata_systemd_journal_transform_gid, gids);
bool info = false;
+ bool data_only = false;
time_t after_s = 0, before_s = 0;
usec_t anchor = 0;
usec_t if_modified_since = 0;
@@ -430,12 +673,15 @@ static void function_systemd_journal(const char *transaction, char *function, ch
if(!keyword) break;
if(strcmp(keyword, JOURNAL_PARAMETER_HELP) == 0) {
- systemd_journal_function_help(transaction);
+ netdata_systemd_journal_function_help(transaction);
goto cleanup;
}
else if(strcmp(keyword, JOURNAL_PARAMETER_INFO) == 0) {
info = true;
}
+ else if(strcmp(keyword, JOURNAL_PARAMETER_DATA_ONLY) == 0) {
+ data_only = true;
+ }
else if(strncmp(keyword, JOURNAL_PARAMETER_SOURCE ":", sizeof(JOURNAL_PARAMETER_SOURCE ":") - 1) == 0) {
source = &keyword[sizeof(JOURNAL_PARAMETER_SOURCE ":") - 1];
}
@@ -580,18 +826,22 @@ static void function_systemd_journal(const char *transaction, char *function, ch
facets_set_query(facets, query);
facets_set_histogram(facets, chart ? chart : "PRIORITY", after_s * USEC_PER_SEC, before_s * USEC_PER_SEC);
- response = systemd_journal_query(wb, facets, after_s * USEC_PER_SEC, before_s * USEC_PER_SEC,
- if_modified_since, now_monotonic_usec() + (timeout - 1) * USEC_PER_SEC);
+ response = netdata_systemd_journal_query(wb, facets, after_s * USEC_PER_SEC, before_s * USEC_PER_SEC,
+ anchor, direction, last,
+ if_modified_since, data_only,
+ now_monotonic_usec() + (timeout - 1) * USEC_PER_SEC);
if(response != HTTP_RESP_OK) {
+ netdata_mutex_lock(&stdout_mutex);
pluginsd_function_json_error_to_stdout(transaction, response, "failed");
+