diff options
35 files changed, 2630 insertions, 72 deletions
diff --git a/Documentation/bpf/ringbuf.rst b/Documentation/bpf/ringbuf.rst new file mode 100644 index 000000000000..75f943f0009d --- /dev/null +++ b/Documentation/bpf/ringbuf.rst @@ -0,0 +1,209 @@ +=============== +BPF ring buffer +=============== + +This document describes BPF ring buffer design, API, and implementation details. + +.. contents:: + :local: + :depth: 2 + +Motivation +---------- + +There are two distinctive motivators for this work, which are not satisfied by +existing perf buffer, which prompted creation of a new ring buffer +implementation. + +- more efficient memory utilization by sharing ring buffer across CPUs; +- preserving ordering of events that happen sequentially in time, even across + multiple CPUs (e.g., fork/exec/exit events for a task). + +These two problems are independent, but perf buffer fails to satisfy both. +Both are a result of a choice to have per-CPU perf ring buffer. Both can be +also solved by having an MPSC implementation of ring buffer. The ordering +problem could technically be solved for perf buffer with some in-kernel +counting, but given the first one requires an MPSC buffer, the same solution +would solve the second problem automatically. + +Semantics and APIs +------------------ + +Single ring buffer is presented to BPF programs as an instance of BPF map of +type ``BPF_MAP_TYPE_RINGBUF``. Two other alternatives considered, but +ultimately rejected. + +One way would be to, similar to ``BPF_MAP_TYPE_PERF_EVENT_ARRAY``, make +``BPF_MAP_TYPE_RINGBUF`` could represent an array of ring buffers, but not +enforce "same CPU only" rule. This would be more familiar interface compatible +with existing perf buffer use in BPF, but would fail if application needed more +advanced logic to lookup ring buffer by arbitrary key. +``BPF_MAP_TYPE_HASH_OF_MAPS`` addresses this with current approach. +Additionally, given the performance of BPF ringbuf, many use cases would just +opt into a simple single ring buffer shared among all CPUs, for which current +approach would be an overkill. + +Another approach could introduce a new concept, alongside BPF map, to represent +generic "container" object, which doesn't necessarily have key/value interface +with lookup/update/delete operations. This approach would add a lot of extra +infrastructure that has to be built for observability and verifier support. It +would also add another concept that BPF developers would have to familiarize +themselves with, new syntax in libbpf, etc. But then would really provide no +additional benefits over the approach of using a map. ``BPF_MAP_TYPE_RINGBUF`` +doesn't support lookup/update/delete operations, but so doesn't few other map +types (e.g., queue and stack; array doesn't support delete, etc). + +The approach chosen has an advantage of re-using existing BPF map +infrastructure (introspection APIs in kernel, libbpf support, etc), being +familiar concept (no need to teach users a new type of object in BPF program), +and utilizing existing tooling (bpftool). For common scenario of using a single +ring buffer for all CPUs, it's as simple and straightforward, as would be with +a dedicated "container" object. On the other hand, by being a map, it can be +combined with ``ARRAY_OF_MAPS`` and ``HASH_OF_MAPS`` map-in-maps to implement +a wide variety of topologies, from one ring buffer for each CPU (e.g., as +a replacement for perf buffer use cases), to a complicated application +hashing/sharding of ring buffers (e.g., having a small pool of ring buffers +with hashed task's tgid being a look up key to preserve order, but reduce +contention). + +Key and value sizes are enforced to be zero. ``max_entries`` is used to specify +the size of ring buffer and has to be a power of 2 value. + +There are a bunch of similarities between perf buffer +(``BPF_MAP_TYPE_PERF_EVENT_ARRAY``) and new BPF ring buffer semantics: + +- variable-length records; +- if there is no more space left in ring buffer, reservation fails, no + blocking; +- memory-mappable data area for user-space applications for ease of + consumption and high performance; +- epoll notifications for new incoming data; +- but still the ability to do busy polling for new data to achieve the + lowest latency, if necessary. + +BPF ringbuf provides two sets of APIs to BPF programs: + +- ``bpf_ringbuf_output()`` allows to *copy* data from one place to a ring + buffer, similarly to ``bpf_perf_event_output()``; +- ``bpf_ringbuf_reserve()``/``bpf_ringbuf_commit()``/``bpf_ringbuf_discard()`` + APIs split the whole process into two steps. First, a fixed amount of space + is reserved. If successful, a pointer to a data inside ring buffer data + area is returned, which BPF programs can use similarly to a data inside + array/hash maps. Once ready, this piece of memory is either committed or + discarded. Discard is similar to commit, but makes consumer ignore the + record. + +``bpf_ringbuf_output()`` has disadvantage of incurring extra memory copy, +because record has to be prepared in some other place first. But it allows to +submit records of the length that's not known to verifier beforehand. It also +closely matches ``bpf_perf_event_output()``, so will simplify migration +significantly. + +``bpf_ringbuf_reserve()`` avoids the extra copy of memory by providing a memory +pointer directly to ring buffer memory. In a lot of cases records are larger +than BPF stack space allows, so many programs have use extra per-CPU array as +a temporary heap for preparing sample. bpf_ringbuf_reserve() avoid this needs +completely. But in exchange, it only allows a known constant size of memory to +be reserved, such that verifier can verify that BPF program can't access memory +outside its reserved record space. bpf_ringbuf_output(), while slightly slower +due to extra memory copy, covers some use cases that are not suitable for +``bpf_ringbuf_reserve()``. + +The difference between commit and discard is very small. Discard just marks +a record as discarded, and such records are supposed to be ignored by consumer +code. Discard is useful for some advanced use-cases, such as ensuring +all-or-nothing multi-record submission, or emulating temporary +``malloc()``/``free()`` within single BPF program invocation. + +Each reserved record is tracked by verifier through existing +reference-tracking logic, similar to socket ref-tracking. It is thus +impossible to reserve a record, but forget to submit (or discard) it. + +``bpf_ringbuf_query()`` helper allows to query various properties of ring +buffer. Currently 4 are supported: + +- ``BPF_RB_AVAIL_DATA`` returns amount of unconsumed data in ring buffer; +- ``BPF_RB_RING_SIZE`` returns the size of ring buffer; +- ``BPF_RB_CONS_POS``/``BPF_RB_PROD_POS`` returns current logical possition + of consumer/producer, respectively. + +Returned values are momentarily snapshots of ring buffer state and could be +off by the time helper returns, so this should be used only for +debugging/reporting reasons or for implementing various heuristics, that take +into account highly-changeable nature of some of those characteristics. + +One such heuristic might involve more fine-grained control over poll/epoll +notifications about new data availability in ring buffer. Together with +``BPF_RB_NO_WAKEUP``/``BPF_RB_FORCE_WAKEUP`` flags for output/commit/discard +helpers, it allows BPF program a high degree of control and, e.g., more +efficient batched notifications. Default self-balancing strategy, though, +should be adequate for most applications and will work reliable and efficiently +already. + +Design and Implementation +------------------------- + +This reserve/commit schema allows a natural way for multiple producers, either +on different CPUs or even on the same CPU/in the same BPF program, to reserve +independent records and work with them without blocking other producers. This +means that if BPF program was interruped by another BPF program sharing the +same ring buffer, they will both get a record reserved (provided there is +enough space left) and can work with it and submit it independently. This +applies to NMI context as well, except that due to using a spinlock during +reservation, in NMI context, ``bpf_ringbuf_reserve()`` might fail to get +a lock, in which case reservation will fail even if ring buffer is not full. + +The ring buffer itself internally is implemented as a power-of-2 sized +circular buffer, with two logical and ever-increasing counters (which might +wrap around on 32-bit architectures, that's not a problem): + +- consumer counter shows up to which logical position consumer consumed the + data; +- producer counter denotes amount of data reserved by all producers. + +Each time a record is reserved, producer that "owns" the record will +successfully advance producer counter. At that point, data is still not yet +ready to be consumed, though. Each record has 8 byte header, which contains the +length of reserved record, as well as two extra bits: busy bit to denote that +record is still being worked on, and discard bit, which might be set at commit +time if record is discarded. In the latter case, consumer is supposed to skip +the record and move on to the next one. Record header also encodes record's +relative offset from the beginning of ring buffer data area (in pages). This +allows ``bpf_ringbuf_commit()``/``bpf_ringbuf_discard()`` to accept only the +pointer to the record itself, without requiring also the pointer to ring buffer +itself. Ring buffer memory location will be restored from record metadata +header. This significantly simplifies verifier, as well as improving API +usability. + +Producer counter increments are serialized under spinlock, so there is +a strict ordering between reservations. Commits, on the other hand, are +completely lockless and independent. All records become available to consumer +in the order of reservations, but only after all previous records where +already committed. It is thus possible for slow producers to temporarily hold +off submitted records, that were reserved later. + +Reservation/commit/consumer protocol is verified by litmus tests in +Documentation/litmus_tests/bpf-rb/_. + +One interesting implementation bit, that significantly simplifies (and thus +speeds up as well) implementation of both producers and consumers is how data +area is mapped twice contiguously back-to-back in the virtual memory. This +allows to not take any special measures for samples that have to wrap around +at the end of the circular buffer data area, because the next page after the +last data page would be first data page again, and thus the sample will still +appear completely contiguous in virtual memory. See comment and a simple ASCII +diagram showing this visually in ``bpf_ringbuf_area_alloc()``. + +Another feature that distinguishes BPF ringbuf from perf ring buffer is +a self-pacing notifications of new data being availability. +``bpf_ringbuf_commit()`` implementation will send a notification of new record +being available after commit only if consumer has already caught up right up to +the record being committed. If not, consumer still has to catch up and thus +will see new data anyways without needing an extra poll notification. +Benchmarks (see tools/testing/selftests/bpf/benchs/bench_ringbuf.c_) show that +this allows to achieve a very high throughput without having to resort to +tricks like "notify only every Nth sample", which are necessary with perf +buffer. For extreme cases, when BPF program wants more manual control of +notifications, commit/discard/output helpers accept ``BPF_RB_NO_WAKEUP`` and +``BPF_RB_FORCE_WAKEUP`` flags, which give full control over notifications of +data availability, but require extra caution and diligence in using this API. diff --git a/include/linux/bpf.h b/include/linux/bpf.h index efe8836b5c48..e5884f7f801c 100644 --- a/include/linux/bpf.h +++ b/include/linux/bpf.h @@ -90,6 +90,8 @@ struct bpf_map_ops { int (*map_direct_value_meta)(const struct bpf_map *map, u64 imm, u32 *off); int (*map_mmap)(struct bpf_map *map, struct vm_area_struct *vma); + __poll_t (*map_poll)(struct bpf_map *map, struct file *filp, + struct poll_table_struct *pts); }; struct bpf_map_memory { @@ -244,6 +246,9 @@ enum bpf_arg_type { ARG_PTR_TO_LONG, /* pointer to long */ ARG_PTR_TO_SOCKET, /* pointer to bpf_sock (fullsock) */ ARG_PTR_TO_BTF_ID, /* pointer to in-kernel struct */ + ARG_PTR_TO_ALLOC_MEM, /* pointer to dynamically allocated memory */ + ARG_PTR_TO_ALLOC_MEM_OR_NULL, /* pointer to dynamically allocated memory or NULL */ + ARG_CONST_ALLOC_SIZE_OR_ZERO, /* number of allocated bytes requested */ }; /* type of values returned from helper functions */ @@ -255,6 +260,7 @@ enum bpf_return_type { RET_PTR_TO_SOCKET_OR_NULL, /* returns a pointer to a socket or NULL */ RET_PTR_TO_TCP_SOCK_OR_NULL, /* returns a pointer to a tcp_sock or NULL */ RET_PTR_TO_SOCK_COMMON_OR_NULL, /* returns a pointer to a sock_common or NULL */ + RET_PTR_TO_ALLOC_MEM_OR_NULL, /* returns a pointer to dynamically allocated memory or NULL */ }; /* eBPF function prototype used by verifier to allow BPF_CALLs from eBPF programs @@ -322,6 +328,8 @@ enum bpf_reg_type { PTR_TO_XDP_SOCK, /* reg points to struct xdp_sock */ PTR_TO_BTF_ID, /* reg points to kernel struct */ PTR_TO_BTF_ID_OR_NULL, /* reg points to kernel struct or NULL */ + PTR_TO_MEM, /* reg points to valid memory region */ + PTR_TO_MEM_OR_NULL, /* reg points to valid memory region or NULL */ }; /* The information passed from prog-specific *_is_valid_access @@ -1611,6 +1619,11 @@ extern const struct bpf_func_proto bpf_tcp_sock_proto; extern const struct bpf_func_proto bpf_jiffies64_proto; extern const struct bpf_func_proto bpf_get_ns_current_pid_tgid_proto; extern const struct bpf_func_proto bpf_event_output_data_proto; +extern const struct bpf_func_proto bpf_ringbuf_output_proto; +extern const struct bpf_func_proto bpf_ringbuf_reserve_proto; +extern const struct bpf_func_proto bpf_ringbuf_submit_proto; +extern const struct bpf_func_proto bpf_ringbuf_discard_proto; +extern const struct bpf_func_proto bpf_ringbuf_query_proto; const struct bpf_func_proto *bpf_tracing_func_proto( enum bpf_func_id func_id, const struct bpf_prog *prog); diff --git a/include/linux/bpf_types.h b/include/linux/bpf_types.h index 29d22752fc87..fa8e1b552acd 100644 --- a/include/linux/bpf_types.h +++ b/include/linux/bpf_types.h @@ -118,6 +118,7 @@ BPF_MAP_TYPE(BPF_MAP_TYPE_STACK, stack_map_ops) #if defined(CONFIG_BPF_JIT) BPF_MAP_TYPE(BPF_MAP_TYPE_STRUCT_OPS, bpf_struct_ops_map_ops) #endif +BPF_MAP_TYPE(BPF_MAP_TYPE_RINGBUF, ringbuf_map_ops) BPF_LINK_TYPE(BPF_LINK_TYPE_RAW_TRACEPOINT, raw_tracepoint) BPF_LINK_TYPE(BPF_LINK_TYPE_TRACING, tracing) diff --git a/include/linux/bpf_verifier.h b/include/linux/bpf_verifier.h index ea833087e853..ca08db4ffb5f 100644 --- a/include/linux/bpf_verifier.h +++ b/include/linux/bpf_verifier.h @@ -54,6 +54,8 @@ struct bpf_reg_state { u32 btf_id; /* for PTR_TO_BTF_ID */ + u32 mem_size; /* for PTR_TO_MEM | PTR_TO_MEM_OR_NULL */ + /* Max size from any of the above. */ unsigned long raw; }; @@ -63,6 +65,8 @@ struct bpf_reg_state { * offset, so they can share range knowledge. * For PTR_TO_MAP_VALUE_OR_NULL this is used to share which map value we * came from, when one is tested for != NULL. + * For PTR_TO_MEM_OR_NULL this is used to identify memory allocation + * for the purpose of tracking that it's freed. * For PTR_TO_SOCKET this is used to share which pointers retain the * same reference to the socket, to determine proper reference freeing. */ diff --git a/include/uapi/linux/bpf.h b/include/uapi/linux/bpf.h index 54b93f8b49b8..974ca6e948e3 100644 --- a/include/uapi/linux/bpf.h +++ b/include/uapi/linux/bpf.h @@ -147,6 +147,7 @@ enum bpf_map_type { BPF_MAP_TYPE_SK_STORAGE, BPF_MAP_TYPE_DEVMAP_HASH, BPF_MAP_TYPE_STRUCT_OPS, + BPF_MAP_TYPE_RINGBUF, }; /* Note that tracing related programs such as @@ -3157,6 +3158,59 @@ union bpf_attr { * **bpf_sk_cgroup_id**\ (). * Return * The id is returned or 0 in case the id could not be retrieved. + * + * void *bpf_ringbuf_output(void *ringbuf, void *data, u64 size, u64 flags) + * Description + * Copy *size* bytes from *data* into a ring buffer *ringbuf*. + * If BPF_RB_NO_WAKEUP is specified in *flags*, no notification of + * new data availability is sent. + * IF BPF_RB_FORCE_WAKEUP is specified in *flags*, notification of + * new data availability is sent unconditionally. + * Return + * 0, on success; + * < 0, on error. + * + * void *bpf_ringbuf_reserve(void *ringbuf, u64 size, u64 flags) + * Description + * Reserve *size* bytes of payload in a ring buffer *ringbuf*. + * Return + * Valid pointer with *size* bytes of memory available; NULL, + * otherwise. + * + * void bpf_ringbuf_submit(void *data, u64 flags) + * Description + * Submit reserved ring buffer sample, pointed to by *data*. + * If BPF_RB_NO_WAKEUP is specified in *flags*, no notification of + * new data availability is sent. + * IF BPF_RB_FORCE_WAKEUP is specified in *flags*, notification of + * new data availability is sent unconditionally. + * Return + * Nothing. Always succeeds. + * + * void bpf_ringbuf_discard(void *data, u64 flags) + * Description + * Discard reserved ring buffer sample, pointed to by *data*. + * If BPF_RB_NO_WAKEUP is specified in *flags*, no notification of + * new data availability is sent. + * IF BPF_RB_FORCE_WAKEUP is specified in *flags*, notification of + * new data availability is sent unconditionally. + * Return + * Nothing. Always succeeds. + * + * u64 bpf_ringbuf_query(void *ringbuf, u64 flags) + * Description + * Query various characteristics of provided ring buffer. What + * exactly is queries is determined by *flags*: + * - BPF_RB_AVAIL_DATA - amount of data not yet consumed; + * - BPF_RB_RING_SIZE - the size of ring buffer; + * - BPF_RB_CONS_POS - consumer position (can wrap around); + * - BPF_RB_PROD_POS - producer(s) position (can wrap around); + * Data returned is just a momentary snapshots of actual values + * and could be inaccurate, so this facility should be used to + * power heuristics and for reporting, not to make 100% correct + * calculation. + * Return + * Requested value, or 0, if flags are not recognized. */ #define __BPF_FUNC_MAPPER(FN) \ FN(unspec), \ @@ -3288,7 +3342,12 @@ union bpf_attr { FN(seq_printf), \ FN(seq_write), \ FN(sk_cgroup_id), \ - FN(sk_ancestor_cgroup_id), + FN(sk_ancestor_cgroup_id), \ + FN(ringbuf_output), \ + FN(ringbuf_reserve), \ + FN(ringbuf_submit), \ + FN(ringbuf_discard), \ + FN(ringbuf_query), /* integer value in 'imm' field of BPF_CALL instruction selects which helper * function eBPF program intends to call @@ -3398,6 +3457,29 @@ enum { BPF_F_GET_BRANCH_RECORDS_SIZE = (1ULL << 0), }; +/* BPF_FUNC_bpf_ringbuf_commit, BPF_FUNC_bpf_ringbuf_discard, and + * BPF_FUNC_bpf_ringbuf_output flags. + */ +enum { + BPF_RB_NO_WAKEUP = (1ULL << 0), + BPF_RB_FORCE_WAKEUP = (1ULL << 1), +}; + +/* BPF_FUNC_bpf_ringbuf_query flags */ +enum { + BPF_RB_AVAIL_DATA = 0, + BPF_RB_RING_SIZE = 1, + BPF_RB_CONS_POS = 2, + BPF_RB_PROD_POS = 3, +}; + +/* BPF ring buffer constants */ +enum { + BPF_RINGBUF_BUSY_BIT = (1U << 31), + BPF_RINGBUF_DISCARD_BIT = (1U << 30), + BPF_RINGBUF_HDR_SZ = 8, +}; + /* Mode for BPF_FUNC_skb_adjust_room helper. */ enum bpf_adj_room_mode { BPF_ADJ_ROOM_NET, diff --git a/kernel/bpf/Makefile b/kernel/bpf/Makefile index 375b933010dd..8fca02f64811 100644 --- a/kernel/bpf/Makefile +++ b/kernel/bpf/Makefile @@ -4,7 +4,7 @@ CFLAGS_core.o += $(call cc-disable-warning, override-init) obj-$(CONFIG_BPF_SYSCALL) += syscall.o verifier.o inode.o helpers.o tnum.o bpf_iter.o map_iter.o task_iter.o obj-$(CONFIG_BPF_SYSCALL) += hashtab.o arraymap.o percpu_freelist.o bpf_lru_list.o lpm_trie.o map_in_map.o -obj-$(CONFIG_BPF_SYSCALL) += local_storage.o queue_stack_maps.o +obj-$(CONFIG_BPF_SYSCALL) += local_storage.o queue_stack_maps.o ringbuf.o obj-$(CONFIG_BPF_SYSCALL) += disasm.o obj-$(CONFIG_BPF_JIT) += trampoline.o obj-$(CONFIG_BPF_SYSCALL) += btf.o diff --git a/kernel/bpf/helpers.c b/kernel/bpf/helpers.c index bb4fb634275e..be43ab3e619f 100644 --- a/kernel/bpf/helpers.c +++ b/kernel/bpf/helpers.c @@ -635,6 +635,16 @@ bpf_base_func_proto(enum bpf_func_id func_id) return &bpf_ktime_get_ns_proto; case BPF_FUNC_ktime_get_boot_ns: return &bpf_ktime_get_boot_ns_proto; + case BPF_FUNC_ringbuf_output: + return &bpf_ringbuf_output_proto; + case BPF_FUNC_ringbuf_reserve: + return &bpf_ringbuf_reserve_proto; + case BPF_FUNC_ringbuf_submit: + return &bpf_ringbuf_submit_proto; + case BPF_FUNC_ringbuf_discard: + return &bpf_ringbuf_discard_proto; + case BPF_FUNC_ringbuf_query: + return &bpf_ringbuf_query_proto; default: break; } diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c new file mode 100644 index 000000000000..180414bb0d3e --- /dev/null +++ b/kernel/bpf/ringbuf.c @@ -0,0 +1,501 @@ +#include <linux/bpf.h> +#include <linux/btf.h> +#include <linux/err.h> +#include <linux/irq_work.h> +#include <linux/slab.h> +#include <linux/filter.h> +#include <linux/mm.h> +#include <linux/vmalloc.h> +#include <linux/wait.h> +#include <linux/poll.h> +#include <uapi/linux/btf.h> + +#define RINGBUF_CREATE_FLAG_MASK (BPF_F_NUMA_NODE) + +/* non-mmap()'able part of bpf_ringbuf (everything up to consumer page) */ +#define RINGBUF_PGOFF \ + (offsetof(struct bpf_ringbuf, consumer_pos) >> PAGE_SHIFT) +/* consumer page and producer page */ +#define RINGBUF_POS_PAGES 2 + +#define RINGBUF_MAX_RECORD_SZ (UINT_MAX/4) + +/* Maximum size of ring buffer area is limited by 32-bit page offset within + * record header, counted in pages. Reserve 8 bits for extensibility, and take + * into account few extra pages for consumer/producer pages and + * non-mmap()'able parts. This gives 64GB limit, which seems plenty for single + * ring buffer. + */ +#define RINGBUF_MAX_DATA_SZ \ + (((1ULL << 24) - RINGBUF_POS_PAGES - RINGBUF_PGOFF) * PAGE_SIZE) + +struct bpf_ringbuf { + wait_queue_head_t waitq; + struct irq_work work; + u64 mask; + struct page **pages; + int nr_pages; + spinlock_t spinlock ____cacheline_aligned_in_smp; + /* Consumer and producer counters are put into separate pages to allow + * mapping consumer page as r/w, but restrict producer page to r/o. + * This protects producer position from being modified by user-space + * application and ruining in-kernel position tracking. + */ + unsigned long consumer_pos __aligned(PAGE_SIZE); + unsigned long producer_pos __aligned(PAGE_SIZE); + char data[] __aligned(PAGE_SIZE); +}; + +struct bpf_ringbuf_map { + struct bpf_map map; + struct bpf_map_memory memory; + struct bpf_ringbuf *rb; +}; + +/* 8-byte ring buffer record header structure */ +struct bpf_ringbuf_hdr { + u32 len; + u32 pg_off; +}; + +static struct bpf_ringbuf *bpf_ringbuf_area_alloc(size_t data_sz, int numa_node) +{ + const gfp_t flags = GFP_KERNEL | __GFP_RETRY_MAYFAIL | __GFP_NOWARN | + __GFP_ZERO; + int nr_meta_pages = RINGBUF_PGOFF + RINGBUF_POS_PAGES; + int nr_data_pages = data_sz >> PAGE_SHIFT; + int nr_pages = nr_meta_pages + nr_data_pages; + struct page **pages, *page; + struct bpf_ringbuf *rb; + size_t array_size; + int i; + + /* Each data page is mapped twice to allow "virtual" + * continuous read of samples wrapping around the end of ring + * buffer area: + * ------------------------------------------------------ + * | meta pages | real data pages | same data pages | + * ------------------------------------------------------ + * | | 1 2 3 4 5 6 7 8 9 | 1 2 3 4 5 6 7 8 9 | + * ------------------------------------------------------ + * | | TA DA | TA DA | + * ------------------------------------------------------ + * ^^^^^^^ + * | + * Here, no need to worry about special handling of wrapped-around + * data due to double-mapped data pages. This works both in kernel and + * when mmap()'ed in user-space, simplifying both kernel and + * user-space implementations significantly. + */ + array_size = (nr_meta_pages + 2 * nr_data_pages) * sizeof(*pages); + if (array_size > PAGE_SIZE) + pages = vmalloc_node(array_size, numa_node); + else + pages = kmalloc_node(array_size, flags, numa_node); + if (!pages) + return NULL; + + for (i = 0; i < nr_pages; i++) { + page = alloc_pages_node(numa_node, flags, 0); + if (!page) { + nr_pages = i; + goto err_free_pages; + } + pages[i] = page; + if (i >= nr_meta_pages) + pages[nr_data_pages + i] = page; + } + + rb = vmap(pages, nr_meta_pages + 2 * nr_data_pages, + VM_ALLOC | VM_USERMAP, PAGE_KERNEL); + if (rb) { + rb->pages = pages; + rb->nr_pages = nr_pages; + return rb; + } + +err_free_pages: + for (i = 0; i < nr_pages; i++) + __free_page(pages[i]); + kvfree(pages); + return NULL; +} + +static void bpf_ringbuf_notify(struct irq_work *work) +{ + struct bpf_ringbuf *rb = container_of(work, struct bpf_ringbuf, work); + + wake_up_all(&rb->waitq); +} + +static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node) +{ + struct bpf_ringbuf *rb; + + if (!data_sz || !PAGE_ALIGNED(data_sz)) + return ERR_PTR(-EINVAL); + +#ifdef CONFIG_64BIT + /* on 32-bit arch, it's impossible to overflow record's hdr->pgoff */ + if (data_sz > RINGBUF_MAX_DATA_SZ) + return ERR_PTR(-E2BIG); +#endif + + rb = bpf_ringbuf_area_alloc(data_sz, numa_node); + if (!rb) + return ERR_PTR(-ENOMEM); + + spin_lock_init(&rb->spinlock); + init_waitqueue_head(&rb->waitq); + init_irq_work(&rb->work, bpf_ringbuf_notify); + + rb->mask = data_sz - 1; + rb->consumer_pos = 0; + rb->producer_pos = 0; + + return rb; +} + +static struct bpf_map *ringbuf_map_alloc(union bpf_attr *attr) +{ + struct bpf_ringbuf_map *rb_map; + u64 cost; + int err; + + if (attr->map_flags & ~RINGBUF_CREATE_FLAG_MASK) + return ERR_PTR(-EINVAL); + + if (attr->key_size || attr->value_size || + attr->max_entries == 0 || !PAGE_ALIGNED(attr->max_entries)) + return ERR_PTR(-EINVAL); + + rb_map = kzalloc(sizeof(*rb_map), GFP_USER); + if (!rb_map) + return ERR_PTR(-ENOMEM); + + bpf_map_init_from_attr(&rb_map->map, attr); + + cost = sizeof(struct bpf_ringbuf_map) + + sizeof(struct bpf_ringbuf) + + attr->max_entries; + err = bpf_map_charge_init(&rb_map->map.memory, cost); + if (err) + goto err_free_map; + + rb_map->rb = bpf_ringbuf_alloc(attr->max_entries, rb_map->map.numa_node); + if (IS_ERR(rb_map->rb)) { + err = PTR_ERR(rb_map->rb); + goto err_uncharge; + } + + return &rb_map->map; + +err_uncharge: + bpf_map_charge_finish(&rb_map->map.memory); +err_free_map: + kfree(rb_map); + return ERR_PTR(err); +} + +static void bpf_ringbuf_free(struct bpf_ringbuf *rb) +{ + /* copy pages pointer and nr_pages to local variable, as we are going + * to unmap rb itself with vunmap() below + */ + struct page **pages = rb->pages; + int i, nr_pages = rb->nr_pages; + + vunmap(rb); + for (i = 0; i < nr_pages; i++) + __free_page(pages[i]); + kvfree(pages); +} + +static void ringbuf_map_free(struct bpf_map *map) +{ + struct bpf_ringbuf_map *rb_map; + + /* at this point bpf_prog->aux->refcnt == 0 and this map->refcnt == 0, + * so the programs (can be more than one that used this map) were + * disconnected from events. Wait for outstanding critical sections in + * these programs to complete + */ + synchronize_rcu(); + + rb_map = container_of(map, struct bpf_ringbuf_map, map); + bpf_ringbuf_free(rb_map->rb); + kfree(rb_map); +} + +static void *ringbuf_map_lookup_elem(struct bpf_map *map, void *key) +{ + return ERR_PTR(-ENOTSUPP); +} + +static int ringbuf_map_update_elem(struct bpf_map *map, void *key, void *value, + u64 flags) +{ + return -ENOTSUPP; +} + +static int ringbuf_map_delete_elem(struct bpf_map *map, void *key) +{ + return -ENOTSUPP; +} + +static int ringbuf_map_get_next_key(struct bpf_map *map, void *key, + void *next_key) +{ + return -ENOTSUPP; +} + +static size_t bpf_ringbuf_mmap_page_cnt(const struct bpf_ringbuf *rb) +{ + size_t data_pages = (rb->mask + 1) >> PAGE_SHIFT; + + /* consumer page + producer page + 2 x data pages */ + return RINGBUF_POS_PAGES + 2 * data_pages; +} + +static int ringbuf_map_mmap(struct bpf_map *map, struct vm_area_struct *vma) +{ + struct bpf_ringbuf_map *rb_map; + size_t mmap_sz; + + rb_map = container_of(map, struct bpf_ringbuf_map, map); + mmap_sz = bpf_ringbuf_mmap_page_cnt(rb_map->rb) << PAGE_SHIFT; + + if (vma->vm_pgoff * PAGE_SIZE + (vma->vm_end - vma->vm_start) > mmap_sz) + return -EINVAL; + + return remap_vmalloc_range(vma, rb_map->rb, + vma->vm_pgoff + RINGBUF_PGOFF); +} + +static unsigned long ringbu |