summaryrefslogtreecommitdiffstats
path: root/src/collectors/ebpf.plugin/ebpf_swap.c
diff options
context:
space:
mode:
authorthiagoftsm <thiagoftsm@gmail.com>2024-03-01 16:36:40 +0000
committerGitHub <noreply@github.com>2024-03-01 16:36:40 +0000
commit0dbf4bb90cdb5be417afbebfbbfb74d345b7ef10 (patch)
tree9556e64622b0784ebf90d6c04c19cd080db85b9c /src/collectors/ebpf.plugin/ebpf_swap.c
parentccb10a5fdeddeab785ffa697000bd8ae6a2d9b7a (diff)
Prepare to functions (eBPF) (#16788)
Diffstat (limited to 'src/collectors/ebpf.plugin/ebpf_swap.c')
-rw-r--r--src/collectors/ebpf.plugin/ebpf_swap.c236
1 files changed, 146 insertions, 90 deletions
diff --git a/src/collectors/ebpf.plugin/ebpf_swap.c b/src/collectors/ebpf.plugin/ebpf_swap.c
index 3c484539b9..1d405420a1 100644
--- a/src/collectors/ebpf.plugin/ebpf_swap.c
+++ b/src/collectors/ebpf.plugin/ebpf_swap.c
@@ -52,6 +52,17 @@ netdata_ebpf_targets_t swap_targets[] = { {.name = "swap_readpage", .mode = EBPF
{.name = "swap_writepage", .mode = EBPF_LOAD_TRAMPOLINE},
{.name = NULL, .mode = EBPF_LOAD_TRAMPOLINE}};
+struct netdata_static_thread ebpf_read_swap = {
+ .name = "EBPF_READ_SWAP",
+ .config_section = NULL,
+ .config_name = NULL,
+ .env_name = NULL,
+ .enabled = 1,
+ .thread = NULL,
+ .init_routine = NULL,
+ .start_routine = NULL
+};
+
#ifdef LIBBPF_MAJOR_VERSION
/**
* Disable probe
@@ -269,6 +280,7 @@ void ebpf_obsolete_swap_apps_charts(struct ebpf_module *em)
{
struct ebpf_target *w;
int update_every = em->update_every;
+ pthread_mutex_lock(&collect_data_mutex);
for (w = apps_groups_root_target; w; w = w->next) {
if (unlikely(!(w->charts_created & (1<<EBPF_MODULE_SWAP_IDX))))
continue;
@@ -296,6 +308,7 @@ void ebpf_obsolete_swap_apps_charts(struct ebpf_module *em)
update_every);
w->charts_created &= ~(1<<EBPF_MODULE_SWAP_IDX);
}
+ pthread_mutex_unlock(&collect_data_mutex);
}
/**
@@ -329,6 +342,9 @@ static void ebpf_swap_exit(void *ptr)
{
ebpf_module_t *em = (ebpf_module_t *)ptr;
+ if (ebpf_read_swap.thread)
+ netdata_thread_cancel(*ebpf_read_swap.thread);
+
if (em->enabled == NETDATA_THREAD_EBPF_FUNCTION_RUNNING) {
pthread_mutex_lock(&lock);
if (em->cgroup_charts) {
@@ -392,59 +408,24 @@ static void swap_apps_accumulator(netdata_publish_swap_t *out, int maps_per_core
}
/**
- * Fill PID
- *
- * Fill PID structures
- *
- * @param current_pid pid that we are collecting data
- * @param out values read from hash tables;
- */
-static void swap_fill_pid(uint32_t current_pid, netdata_publish_swap_t *publish)
-{
- netdata_publish_swap_t *curr = swap_pid[current_pid];
- if (!curr) {
- curr = callocz(1, sizeof(netdata_publish_swap_t));
- swap_pid[current_pid] = curr;
- }
-
- memcpy(curr, publish, sizeof(netdata_publish_swap_t));
-}
-
-/**
* Update cgroup
*
* Update cgroup data based in
- *
- * @param maps_per_core do I need to read all cores?
*/
-static void ebpf_update_swap_cgroup(int maps_per_core)
+static void ebpf_update_swap_cgroup()
{
ebpf_cgroup_target_t *ect ;
- netdata_publish_swap_t *cv = swap_vector;
- int fd = swap_maps[NETDATA_PID_SWAP_TABLE].map_fd;
- size_t length = sizeof(netdata_publish_swap_t);
- if (maps_per_core)
- length *= ebpf_nprocs;
pthread_mutex_lock(&mutex_cgroup_shm);
for (ect = ebpf_cgroup_pids; ect; ect = ect->next) {
struct pid_on_target2 *pids;
for (pids = ect->pids; pids; pids = pids->next) {
int pid = pids->pid;
netdata_publish_swap_t *out = &pids->swap;
- if (likely(swap_pid) && swap_pid[pid]) {
- netdata_publish_swap_t *in = swap_pid[pid];
+ ebpf_pid_stat_t *local_pid = ebpf_get_pid_entry(pid, 0);
+ if (local_pid) {
+ netdata_publish_swap_t *in = &local_pid->swap;
memcpy(out, in, sizeof(netdata_publish_swap_t));
- } else {
- memset(cv, 0, length);
- if (!bpf_map_lookup_elem(fd, &pid, cv)) {
- swap_apps_accumulator(cv, maps_per_core);
-
- memcpy(out, cv, sizeof(netdata_publish_swap_t));
-
- // We are cleaning to avoid passing data read from one process to other.
- memset(cv, 0, length);
- }
}
}
}
@@ -452,38 +433,143 @@ static void ebpf_update_swap_cgroup(int maps_per_core)
}
/**
+ * Sum PIDs
+ *
+ * Sum values for all targets.
+ *
+ * @param swap
+ * @param root
+ */
+static void ebpf_swap_sum_pids(netdata_publish_swap_t *swap, struct ebpf_pid_on_target *root)
+{
+ uint64_t local_read = 0;
+ uint64_t local_write = 0;
+
+ while (root) {
+ int32_t pid = root->pid;
+ ebpf_pid_stat_t *local_pid = ebpf_get_pid_entry(pid, 0);
+ if (local_pid) {
+ netdata_publish_swap_t *w = &local_pid->swap;
+ local_write += w->write;
+ local_read += w->read;
+ }
+ root = root->next;
+ }
+
+ // These conditions were added, because we are using incremental algorithm
+ swap->write = (local_write >= swap->write) ? local_write : swap->write;
+ swap->read = (local_read >= swap->read) ? local_read : swap->read;
+ }
+
+
+/**
+ * Resume apps data
+ */
+void ebpf_swap_resume_apps_data() {
+ struct ebpf_target *w;
+ for (w = apps_groups_root_target; w; w = w->next) {
+ if (unlikely(!(w->charts_created & (1 << EBPF_MODULE_SWAP_IDX))))
+ continue;
+
+ ebpf_swap_sum_pids(&w->swap, w->root_pid);
+ }
+}
+
+/**
* Read APPS table
*
* Read the apps table and store data inside the structure.
*
* @param maps_per_core do I need to read all cores?
*/
-static void read_swap_apps_table(int maps_per_core)
+static void ebpf_read_swap_apps_table(int maps_per_core, int max_period)
{
netdata_publish_swap_t *cv = swap_vector;
- uint32_t key;
- struct ebpf_pid_stat *pids = ebpf_root_of_pids;
int fd = swap_maps[NETDATA_PID_SWAP_TABLE].map_fd;
size_t length = sizeof(netdata_publish_swap_t);
if (maps_per_core)
length *= ebpf_nprocs;
- while (pids) {
- key = pids->pid;
+ uint32_t key = 0, next_key = 0;
+ while (bpf_map_get_next_key(fd, &key, &next_key) == 0) {
if (bpf_map_lookup_elem(fd, &key, cv)) {
- pids = pids->next;
- continue;
+ goto end_swap_loop;
}
swap_apps_accumulator(cv, maps_per_core);
- swap_fill_pid(key, cv);
+ ebpf_pid_stat_t *local_pid = ebpf_get_pid_entry(key, cv->tgid);
+ if (!local_pid)
+ goto end_swap_loop;
+
+ netdata_publish_swap_t *publish = &local_pid->swap;
+ if (!publish->ct || publish->ct != cv->ct) {
+ memcpy(publish, cv, sizeof(netdata_publish_swap_t));
+ local_pid->not_updated = 0;
+ } else if (++local_pid->not_updated >= max_period) {
+ bpf_map_delete_elem(fd, &key);
+ local_pid->not_updated = 0;
+ }
// We are cleaning to avoid passing data read from one process to other.
+end_swap_loop:
memset(cv, 0, length);
+ key = next_key;
+ }
+}
- pids = pids->next;
+/**
+ * SWAP thread
+ *
+ * Thread used to generate swap charts.
+ *
+ * @param ptr a pointer to `struct ebpf_module`
+ *
+ * @return It always return NULL
+ */
+void *ebpf_read_swap_thread(void *ptr)
+{
+ heartbeat_t hb;
+ heartbeat_init(&hb);
+
+ ebpf_module_t *em = (ebpf_module_t *)ptr;
+
+ int maps_per_core = em->maps_per_core;
+ int update_every = em->update_every;
+
+ int counter = update_every - 1;
+
+ uint32_t lifetime = em->lifetime;
+ uint32_t running_time = 0;
+ usec_t period = update_every * USEC_PER_SEC;
+ int max_period = update_every * EBPF_CLEANUP_FACTOR;
+
+ while (!ebpf_plugin_exit && running_time < lifetime) {
+ (void)heartbeat_next(&hb, period);
+ if (ebpf_plugin_exit || ++counter != update_every)
+ continue;
+
+ netdata_thread_disable_cancelability();
+
+ pthread_mutex_lock(&collect_data_mutex);
+ ebpf_read_swap_apps_table(maps_per_core, max_period);
+ ebpf_swap_resume_apps_data();
+ pthread_mutex_unlock(&collect_data_mutex);
+
+ counter = 0;
+
+ pthread_mutex_lock(&ebpf_exit_cleanup);
+ if (running_time && !em->running_time)
+ running_time = update_every;
+ else
+ running_time += update_every;
+
+ em->running_time = running_time;
+ pthread_mutex_unlock(&ebpf_exit_cleanup);
+ netdata_thread_enable_cancelability();
}
+
+ return NULL;
}
/**
@@ -526,34 +612,6 @@ static void ebpf_swap_read_global_table(netdata_idx_t *stats, int maps_per_core)
}
/**
- * Sum PIDs
- *
- * Sum values for all targets.
- *
- * @param swap
- * @param root
- */
-static void ebpf_swap_sum_pids(netdata_publish_swap_t *swap, struct ebpf_pid_on_target *root)
-{
- uint64_t local_read = 0;
- uint64_t local_write = 0;
-
- while (root) {
- int32_t pid = root->pid;
- netdata_publish_swap_t *w = swap_pid[pid];
- if (w) {
- local_write += w->write;
- local_read += w->read;
- }
- root = root->next;
- }
-
- // These conditions were added, because we are using incremental algorithm
- swap->write = (local_write >= swap->write) ? local_write : swap->write;
- swap->read = (local_read >= swap->read) ? local_read : swap->read;
-}
-
-/**
* Send data to Netdata calling auxiliary functions.
*
* @param root the target list.
@@ -561,12 +619,11 @@ static void ebpf_swap_sum_pids(netdata_publish_swap_t *swap, struct ebpf_pid_on_
void ebpf_swap_send_apps_data(struct ebpf_target *root)
{
struct ebpf_target *w;
+ pthread_mutex_lock(&collect_data_mutex);
for (w = root; w; w = w->next) {
if (unlikely(!(w->charts_created & (1<<EBPF_MODULE_SWAP_IDX))))
continue;
- ebpf_swap_sum_pids(&w->swap, w->root_pid);
-
ebpf_write_begin_chart(NETDATA_APP_FAMILY, w->clean_name, "_ebpf_call_swap_readpage");
write_chart_dimension("calls", (long long) w->swap.read);
ebpf_write_end_chart();
@@ -575,6 +632,7 @@ void ebpf_swap_send_apps_data(struct ebpf_target *root)
write_chart_dimension("calls", (long long) w->swap.write);
ebpf_write_end_chart();
}
+ pthread_mutex_unlock(&collect_data_mutex);
}
/**
@@ -791,12 +849,9 @@ static void swap_collector(ebpf_module_t *em)
counter = 0;
netdata_apps_integration_flags_t apps = em->apps_charts;
ebpf_swap_read_global_table(stats, maps_per_core);
- pthread_mutex_lock(&collect_data_mutex);
- if (apps)
- read_swap_apps_table(maps_per_core);
if (cgroup)
- ebpf_update_swap_cgroup(maps_per_core);
+ ebpf_update_swap_cgroup();
pthread_mutex_lock(&lock);
@@ -809,7 +864,6 @@ static void swap_collector(ebpf_module_t *em)
ebpf_swap_send_cgroup_data(update_every);
pthread_mutex_unlock(&lock);
- pthread_mutex_unlock(&collect_data_mutex);
pthread_mutex_lock(&ebpf_exit_cleanup);
if (running_time && !em->running_time)
@@ -884,14 +938,9 @@ void ebpf_swap_create_apps_charts(struct ebpf_module *em, void *ptr)
*
* We are not testing the return, because callocz does this and shutdown the software
* case it was not possible to allocate.
- *
- * @param apps is apps enabled?
*/
-static void ebpf_swap_allocate_global_vectors(int apps)
+static void ebpf_swap_allocate_global_vectors()
{
- if (apps)
- swap_pid = callocz((size_t)pid_max, sizeof(netdata_publish_swap_t *));
-
swap_vector = callocz((size_t)ebpf_nprocs, sizeof(netdata_publish_swap_t));
swap_values = callocz((size_t)ebpf_nprocs, sizeof(netdata_idx_t));
@@ -989,7 +1038,7 @@ void *ebpf_swap_thread(void *ptr)
goto endswap;
}
- ebpf_swap_allocate_global_vectors(em->apps_charts);
+ ebpf_swap_allocate_global_vectors();
int algorithms[NETDATA_SWAP_END] = { NETDATA_EBPF_INCREMENTAL_IDX, NETDATA_EBPF_INCREMENTAL_IDX };
ebpf_global_labels(swap_aggregated_data, swap_publish_aggregated, swap_dimension_name, swap_dimension_name,
@@ -1001,6 +1050,13 @@ void *ebpf_swap_thread(void *ptr)
ebpf_update_kernel_memory_with_vector(&plugin_statistics, em->maps, EBPF_ACTION_STAT_ADD);
pthread_mutex_unlock(&lock);
+ ebpf_read_swap.thread = mallocz(sizeof(netdata_thread_t));
+ netdata_thread_create(ebpf_read_swap.thread,
+ ebpf_read_swap.name,
+ NETDATA_THREAD_OPTION_DEFAULT,
+ ebpf_read_swap_thread,
+ em);
+
swap_collector(em);
endswap: