diff options
author | thiagoftsm <thiagoftsm@gmail.com> | 2024-03-01 16:36:40 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-03-01 16:36:40 +0000 |
commit | 0dbf4bb90cdb5be417afbebfbbfb74d345b7ef10 (patch) | |
tree | 9556e64622b0784ebf90d6c04c19cd080db85b9c /src/collectors/ebpf.plugin/ebpf_swap.c | |
parent | ccb10a5fdeddeab785ffa697000bd8ae6a2d9b7a (diff) |
Prepare to functions (eBPF) (#16788)
Diffstat (limited to 'src/collectors/ebpf.plugin/ebpf_swap.c')
-rw-r--r-- | src/collectors/ebpf.plugin/ebpf_swap.c | 236 |
1 file changed, 146 insertions, 90 deletions
diff --git a/src/collectors/ebpf.plugin/ebpf_swap.c b/src/collectors/ebpf.plugin/ebpf_swap.c index 3c484539b9..1d405420a1 100644 --- a/src/collectors/ebpf.plugin/ebpf_swap.c +++ b/src/collectors/ebpf.plugin/ebpf_swap.c @@ -52,6 +52,17 @@ netdata_ebpf_targets_t swap_targets[] = { {.name = "swap_readpage", .mode = EBPF {.name = "swap_writepage", .mode = EBPF_LOAD_TRAMPOLINE}, {.name = NULL, .mode = EBPF_LOAD_TRAMPOLINE}}; +struct netdata_static_thread ebpf_read_swap = { + .name = "EBPF_READ_SWAP", + .config_section = NULL, + .config_name = NULL, + .env_name = NULL, + .enabled = 1, + .thread = NULL, + .init_routine = NULL, + .start_routine = NULL +}; + #ifdef LIBBPF_MAJOR_VERSION /** * Disable probe @@ -269,6 +280,7 @@ void ebpf_obsolete_swap_apps_charts(struct ebpf_module *em) { struct ebpf_target *w; int update_every = em->update_every; + pthread_mutex_lock(&collect_data_mutex); for (w = apps_groups_root_target; w; w = w->next) { if (unlikely(!(w->charts_created & (1<<EBPF_MODULE_SWAP_IDX)))) continue; @@ -296,6 +308,7 @@ void ebpf_obsolete_swap_apps_charts(struct ebpf_module *em) update_every); w->charts_created &= ~(1<<EBPF_MODULE_SWAP_IDX); } + pthread_mutex_unlock(&collect_data_mutex); } /** @@ -329,6 +342,9 @@ static void ebpf_swap_exit(void *ptr) { ebpf_module_t *em = (ebpf_module_t *)ptr; + if (ebpf_read_swap.thread) + netdata_thread_cancel(*ebpf_read_swap.thread); + if (em->enabled == NETDATA_THREAD_EBPF_FUNCTION_RUNNING) { pthread_mutex_lock(&lock); if (em->cgroup_charts) { @@ -392,59 +408,24 @@ static void swap_apps_accumulator(netdata_publish_swap_t *out, int maps_per_core } /** - * Fill PID - * - * Fill PID structures - * - * @param current_pid pid that we are collecting data - * @param out values read from hash tables; - */ -static void swap_fill_pid(uint32_t current_pid, netdata_publish_swap_t *publish) -{ - netdata_publish_swap_t *curr = swap_pid[current_pid]; - if (!curr) { - curr = callocz(1, sizeof(netdata_publish_swap_t)); - swap_pid[current_pid] 
= curr; - } - - memcpy(curr, publish, sizeof(netdata_publish_swap_t)); -} - -/** * Update cgroup * * Update cgroup data based in - * - * @param maps_per_core do I need to read all cores? */ -static void ebpf_update_swap_cgroup(int maps_per_core) +static void ebpf_update_swap_cgroup() { ebpf_cgroup_target_t *ect ; - netdata_publish_swap_t *cv = swap_vector; - int fd = swap_maps[NETDATA_PID_SWAP_TABLE].map_fd; - size_t length = sizeof(netdata_publish_swap_t); - if (maps_per_core) - length *= ebpf_nprocs; pthread_mutex_lock(&mutex_cgroup_shm); for (ect = ebpf_cgroup_pids; ect; ect = ect->next) { struct pid_on_target2 *pids; for (pids = ect->pids; pids; pids = pids->next) { int pid = pids->pid; netdata_publish_swap_t *out = &pids->swap; - if (likely(swap_pid) && swap_pid[pid]) { - netdata_publish_swap_t *in = swap_pid[pid]; + ebpf_pid_stat_t *local_pid = ebpf_get_pid_entry(pid, 0); + if (local_pid) { + netdata_publish_swap_t *in = &local_pid->swap; memcpy(out, in, sizeof(netdata_publish_swap_t)); - } else { - memset(cv, 0, length); - if (!bpf_map_lookup_elem(fd, &pid, cv)) { - swap_apps_accumulator(cv, maps_per_core); - - memcpy(out, cv, sizeof(netdata_publish_swap_t)); - - // We are cleaning to avoid passing data read from one process to other. - memset(cv, 0, length); - } } } } @@ -452,38 +433,143 @@ static void ebpf_update_swap_cgroup(int maps_per_core) } /** + * Sum PIDs + * + * Sum values for all targets. 
+ * + * @param swap + * @param root + */ +static void ebpf_swap_sum_pids(netdata_publish_swap_t *swap, struct ebpf_pid_on_target *root) +{ + uint64_t local_read = 0; + uint64_t local_write = 0; + + while (root) { + int32_t pid = root->pid; + ebpf_pid_stat_t *local_pid = ebpf_get_pid_entry(pid, 0); + if (local_pid) { + netdata_publish_swap_t *w = &local_pid->swap; + local_write += w->write; + local_read += w->read; + } + root = root->next; + } + + // These conditions were added, because we are using incremental algorithm + swap->write = (local_write >= swap->write) ? local_write : swap->write; + swap->read = (local_read >= swap->read) ? local_read : swap->read; + } + + +/** + * Resume apps data + */ +void ebpf_swap_resume_apps_data() { + struct ebpf_target *w; + for (w = apps_groups_root_target; w; w = w->next) { + if (unlikely(!(w->charts_created & (1 << EBPF_MODULE_SWAP_IDX)))) + continue; + + ebpf_swap_sum_pids(&w->swap, w->root_pid); + } +} + +/** * Read APPS table * * Read the apps table and store data inside the structure. * * @param maps_per_core do I need to read all cores? 
*/ -static void read_swap_apps_table(int maps_per_core) +static void ebpf_read_swap_apps_table(int maps_per_core, int max_period) { netdata_publish_swap_t *cv = swap_vector; - uint32_t key; - struct ebpf_pid_stat *pids = ebpf_root_of_pids; int fd = swap_maps[NETDATA_PID_SWAP_TABLE].map_fd; size_t length = sizeof(netdata_publish_swap_t); if (maps_per_core) length *= ebpf_nprocs; - while (pids) { - key = pids->pid; + uint32_t key = 0, next_key = 0; + while (bpf_map_get_next_key(fd, &key, &next_key) == 0) { if (bpf_map_lookup_elem(fd, &key, cv)) { - pids = pids->next; - continue; + goto end_swap_loop; } swap_apps_accumulator(cv, maps_per_core); - swap_fill_pid(key, cv); + ebpf_pid_stat_t *local_pid = ebpf_get_pid_entry(key, cv->tgid); + if (!local_pid) + goto end_swap_loop; + + netdata_publish_swap_t *publish = &local_pid->swap; + if (!publish->ct || publish->ct != cv->ct) { + memcpy(publish, cv, sizeof(netdata_publish_swap_t)); + local_pid->not_updated = 0; + } else if (++local_pid->not_updated >= max_period) { + bpf_map_delete_elem(fd, &key); + local_pid->not_updated = 0; + } // We are cleaning to avoid passing data read from one process to other. +end_swap_loop: memset(cv, 0, length); + key = next_key; + } +} - pids = pids->next; +/** + * SWAP thread + * + * Thread used to generate swap charts. 
+ * + * @param ptr a pointer to `struct ebpf_module` + * + * @return It always return NULL + */ +void *ebpf_read_swap_thread(void *ptr) +{ + heartbeat_t hb; + heartbeat_init(&hb); + + ebpf_module_t *em = (ebpf_module_t *)ptr; + + int maps_per_core = em->maps_per_core; + int update_every = em->update_every; + + int counter = update_every - 1; + + uint32_t lifetime = em->lifetime; + uint32_t running_time = 0; + usec_t period = update_every * USEC_PER_SEC; + int max_period = update_every * EBPF_CLEANUP_FACTOR; + + while (!ebpf_plugin_exit && running_time < lifetime) { + (void)heartbeat_next(&hb, period); + if (ebpf_plugin_exit || ++counter != update_every) + continue; + + netdata_thread_disable_cancelability(); + + pthread_mutex_lock(&collect_data_mutex); + ebpf_read_swap_apps_table(maps_per_core, max_period); + ebpf_swap_resume_apps_data(); + pthread_mutex_unlock(&collect_data_mutex); + + counter = 0; + + pthread_mutex_lock(&ebpf_exit_cleanup); + if (running_time && !em->running_time) + running_time = update_every; + else + running_time += update_every; + + em->running_time = running_time; + pthread_mutex_unlock(&ebpf_exit_cleanup); + netdata_thread_enable_cancelability(); } + + return NULL; } /** @@ -526,34 +612,6 @@ static void ebpf_swap_read_global_table(netdata_idx_t *stats, int maps_per_core) } /** - * Sum PIDs - * - * Sum values for all targets. - * - * @param swap - * @param root - */ -static void ebpf_swap_sum_pids(netdata_publish_swap_t *swap, struct ebpf_pid_on_target *root) -{ - uint64_t local_read = 0; - uint64_t local_write = 0; - - while (root) { - int32_t pid = root->pid; - netdata_publish_swap_t *w = swap_pid[pid]; - if (w) { - local_write += w->write; - local_read += w->read; - } - root = root->next; - } - - // These conditions were added, because we are using incremental algorithm - swap->write = (local_write >= swap->write) ? local_write : swap->write; - swap->read = (local_read >= swap->read) ? 
local_read : swap->read; -} - -/** * Send data to Netdata calling auxiliary functions. * * @param root the target list. @@ -561,12 +619,11 @@ static void ebpf_swap_sum_pids(netdata_publish_swap_t *swap, struct ebpf_pid_on_ void ebpf_swap_send_apps_data(struct ebpf_target *root) { struct ebpf_target *w; + pthread_mutex_lock(&collect_data_mutex); for (w = root; w; w = w->next) { if (unlikely(!(w->charts_created & (1<<EBPF_MODULE_SWAP_IDX)))) continue; - ebpf_swap_sum_pids(&w->swap, w->root_pid); - ebpf_write_begin_chart(NETDATA_APP_FAMILY, w->clean_name, "_ebpf_call_swap_readpage"); write_chart_dimension("calls", (long long) w->swap.read); ebpf_write_end_chart(); @@ -575,6 +632,7 @@ void ebpf_swap_send_apps_data(struct ebpf_target *root) write_chart_dimension("calls", (long long) w->swap.write); ebpf_write_end_chart(); } + pthread_mutex_unlock(&collect_data_mutex); } /** @@ -791,12 +849,9 @@ static void swap_collector(ebpf_module_t *em) counter = 0; netdata_apps_integration_flags_t apps = em->apps_charts; ebpf_swap_read_global_table(stats, maps_per_core); - pthread_mutex_lock(&collect_data_mutex); - if (apps) - read_swap_apps_table(maps_per_core); if (cgroup) - ebpf_update_swap_cgroup(maps_per_core); + ebpf_update_swap_cgroup(); pthread_mutex_lock(&lock); @@ -809,7 +864,6 @@ static void swap_collector(ebpf_module_t *em) ebpf_swap_send_cgroup_data(update_every); pthread_mutex_unlock(&lock); - pthread_mutex_unlock(&collect_data_mutex); pthread_mutex_lock(&ebpf_exit_cleanup); if (running_time && !em->running_time) @@ -884,14 +938,9 @@ void ebpf_swap_create_apps_charts(struct ebpf_module *em, void *ptr) * * We are not testing the return, because callocz does this and shutdown the software * case it was not possible to allocate. - * - * @param apps is apps enabled? 
*/ -static void ebpf_swap_allocate_global_vectors(int apps) +static void ebpf_swap_allocate_global_vectors() { - if (apps) - swap_pid = callocz((size_t)pid_max, sizeof(netdata_publish_swap_t *)); - swap_vector = callocz((size_t)ebpf_nprocs, sizeof(netdata_publish_swap_t)); swap_values = callocz((size_t)ebpf_nprocs, sizeof(netdata_idx_t)); @@ -989,7 +1038,7 @@ void *ebpf_swap_thread(void *ptr) goto endswap; } - ebpf_swap_allocate_global_vectors(em->apps_charts); + ebpf_swap_allocate_global_vectors(); int algorithms[NETDATA_SWAP_END] = { NETDATA_EBPF_INCREMENTAL_IDX, NETDATA_EBPF_INCREMENTAL_IDX }; ebpf_global_labels(swap_aggregated_data, swap_publish_aggregated, swap_dimension_name, swap_dimension_name, @@ -1001,6 +1050,13 @@ void *ebpf_swap_thread(void *ptr) ebpf_update_kernel_memory_with_vector(&plugin_statistics, em->maps, EBPF_ACTION_STAT_ADD); pthread_mutex_unlock(&lock); + ebpf_read_swap.thread = mallocz(sizeof(netdata_thread_t)); + netdata_thread_create(ebpf_read_swap.thread, + ebpf_read_swap.name, + NETDATA_THREAD_OPTION_DEFAULT, + ebpf_read_swap_thread, + em); + swap_collector(em); endswap: |