From 9d3721db27005d65ec57057713022e61e9bad887 Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Thu, 18 Jun 2020 00:10:47 +0200 Subject: Update rigtorp/SPSCQueue v1.0 --- lib/rigtorp/SPSCQueue/README.md | 53 ++++++++++++++- lib/rigtorp/SPSCQueue/include/rigtorp/SPSCQueue.h | 78 ++++++++++++++++++----- 2 files changed, 113 insertions(+), 18 deletions(-) (limited to 'lib/rigtorp') diff --git a/lib/rigtorp/SPSCQueue/README.md b/lib/rigtorp/SPSCQueue/README.md index ba0ef3b9e1..85326e656a 100644 --- a/lib/rigtorp/SPSCQueue/README.md +++ b/lib/rigtorp/SPSCQueue/README.md @@ -1,6 +1,7 @@ # SPSCQueue.h [![Build Status](https://travis-ci.org/rigtorp/SPSCQueue.svg?branch=master)](https://travis-ci.org/rigtorp/SPSCQueue) +[![C/C++ CI](https://github.com/rigtorp/SPSCQueue/workflows/C/C++%20CI/badge.svg)](https://github.com/rigtorp/SPSCQueue/actions) [![License](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/rigtorp/SPSCQueue/master/LICENSE) A single producer single consumer wait-free and lock-free fixed size @@ -19,6 +20,8 @@ q.push(1); t.join(); ``` +See `src/SPSCQueueExample.cpp` for the full example. + ## Usage - `SPSCQueue(size_t capacity);` @@ -70,6 +73,54 @@ Only a single writer thread can perform enqueue operations and only a single reader thread can perform dequeue operations. Any other usage is invalid. +## Huge page support + +In addition to supporting custom allocation through the [standard custom +allocator interface](https://en.cppreference.com/w/cpp/named_req/Allocator) this +library also supports standard proposal [P0401R3 Providing size feedback in the +Allocator +interface](http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2020/p0401r3.html). +This allows convenient use of [huge +pages](https://www.kernel.org/doc/html/latest/admin-guide/mm/hugetlbpage.html) +without wasting any allocated space. Using size feedback is only supported when +C++17 is enabled. + +The library currently doesn't include a huge page allocator since the APIs for +allocating huge pages are platform dependent and handling of huge page size and +NUMA awareness is application specific. + +Below is an example huge page allocator for Linux: +```cpp +#include + +template struct Allocator { + using value_type = T; + + struct AllocationResult { + T *ptr; + size_t count; + }; + + size_t roundup(size_t n) { return (((n - 1) >> 21) + 1) << 21; } + + AllocationResult allocate_at_least(size_t n) { + size_t count = roundup(sizeof(T) * n); + auto p = static_cast(mmap(nullptr, count, PROT_READ | PROT_WRITE, + MAP_PRIVATE | MAP_ANONYMOUS | MAP_HUGETLB, + -1, 0)); + if (p == MAP_FAILED) { + throw std::bad_alloc(); + } + return {p, count / sizeof(T)}; + } + + void deallocate(T *p, size_t n) { munmap(p, roundup(sizeof(T) * n)); } +}; +``` + +See `src/SPSCQueueExampleHugepages.cpp` for the full example on how to use huge +pages on Linux. + ## Implementation ![Memory layout](https://github.com/rigtorp/SPSCQueue/blob/master/spsc.png) @@ -80,7 +131,7 @@ The underlying implementation is a Care has been taken to make sure to avoid any issues with [false sharing](https://en.wikipedia.org/wiki/False_sharing). The head and tail pointers are aligned and padded to the false sharing range -(currently hard coded to 128 bytes). The slots buffer is padded with +(cache line size). The slots buffer is padded with the false sharing range at the beginning and end. References: diff --git a/lib/rigtorp/SPSCQueue/include/rigtorp/SPSCQueue.h b/lib/rigtorp/SPSCQueue/include/rigtorp/SPSCQueue.h index 881847d01a..b2bb56f9cf 100644 --- a/lib/rigtorp/SPSCQueue/include/rigtorp/SPSCQueue.h +++ b/lib/rigtorp/SPSCQueue/include/rigtorp/SPSCQueue.h @@ -1,5 +1,5 @@ /* -Copyright (c) 2018 Erik Rigtorp +Copyright (c) 2020 Erik Rigtorp Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal @@ -25,23 +25,56 @@ SOFTWARE. #include #include #include +#include // std::allocator +#include // std::hardware_destructive_interference_size #include -#include +#include // std::enable_if, std::is_*_constructible namespace rigtorp { -template class SPSCQueue { +template > class SPSCQueue { + +#if defined(__cpp_if_constexpr) && defined(__cpp_lib_void_t) + template + struct has_allocate_at_least : std::false_type {}; + + template + struct has_allocate_at_least< + Alloc2, std::void_t().allocate_at_least( + size_t{}))>> : std::true_type {}; +#endif + public: - explicit SPSCQueue(const size_t capacity) - : capacity_(capacity), - slots_(capacity_ < 2 ? nullptr - : static_cast(operator new[]( - sizeof(T) * (capacity_ + 2 * kPadding)))), - head_(0), tail_(0) { - if (capacity_ < 2) { - throw std::invalid_argument("size < 2"); + explicit SPSCQueue(const size_t capacity, + const Allocator &allocator = Allocator()) + : capacity_(capacity), allocator_(allocator), head_(0), tail_(0) { + // The queue needs at least one element + if (capacity_ < 1) { + capacity_ = 1; + } + capacity_++; // Needs one slack element + // Prevent overflowing size_t + if (capacity_ > SIZE_MAX - 2 * kPadding) { + capacity_ = SIZE_MAX - 2 * kPadding; } - assert(alignof(SPSCQueue) >= kCacheLineSize); + +#if defined(__cpp_if_constexpr) && defined(__cpp_lib_void_t) + if constexpr (has_allocate_at_least::value) { + auto res = allocator_.allocate_at_least(capacity_ + 2 * kPadding); + slots_ = res.ptr; + capacity_ = res.count - 2 * kPadding; + } else { + slots_ = std::allocator_traits::allocate( + allocator_, capacity_ + 2 * kPadding); + } +#else + slots_ = std::allocator_traits::allocate( + allocator_, capacity_ + 2 * kPadding); +#endif + + static_assert(alignof(SPSCQueue) == kCacheLineSize, ""); + static_assert(sizeof(SPSCQueue) >= 3 * kCacheLineSize, ""); assert(reinterpret_cast(&tail_) - reinterpret_cast(&head_) >= static_cast(kCacheLineSize)); @@ -51,7 +84,8 @@ public: while (front()) { pop(); } - operator delete[](slots_); + std::allocator_traits::deallocate(allocator_, slots_, + capacity_ + 2 * kPadding); } // non-copyable and non-movable @@ -149,17 +183,27 @@ public: bool empty() const noexcept { return size() == 0; } - size_t capacity() const noexcept { return capacity_; } + size_t capacity() const noexcept { return capacity_ - 1; } private: - static constexpr size_t kCacheLineSize = 128; +#ifdef __cpp_lib_hardware_interference_size + static constexpr size_t kCacheLineSize = + std::hardware_destructive_interference_size; +#else + static constexpr size_t kCacheLineSize = 64; +#endif // Padding to avoid false sharing between slots_ and adjacent allocations static constexpr size_t kPadding = (kCacheLineSize - 1) / sizeof(T) + 1; private: - const size_t capacity_; - T *const slots_; + size_t capacity_; + T *slots_; +#if defined(__has_cpp_attribute) && __has_cpp_attribute(no_unique_address) + Allocator allocator_ [[no_unique_address]]; +#else + Allocator allocator_; +#endif // Align to avoid false sharing between head_ and tail_ alignas(kCacheLineSize) std::atomic head_; -- cgit v1.2.3