diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/rigtorp/SPSCQueue/LICENSE | 22 | ||||
-rw-r--r-- | lib/rigtorp/SPSCQueue/README.md | 130 | ||||
-rw-r--r-- | lib/rigtorp/SPSCQueue/include/rigtorp/SPSCQueue.h | 171 |
3 files changed, 323 insertions, 0 deletions
diff --git a/lib/rigtorp/SPSCQueue/LICENSE b/lib/rigtorp/SPSCQueue/LICENSE new file mode 100644 index 0000000000..d49012fb3b --- /dev/null +++ b/lib/rigtorp/SPSCQueue/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2018 Erik Rigtorp <erik@rigtorp.se> + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/lib/rigtorp/SPSCQueue/README.md b/lib/rigtorp/SPSCQueue/README.md new file mode 100644 index 0000000000..ba0ef3b9e1 --- /dev/null +++ b/lib/rigtorp/SPSCQueue/README.md @@ -0,0 +1,130 @@ +# SPSCQueue.h + +[![Build Status](https://travis-ci.org/rigtorp/SPSCQueue.svg?branch=master)](https://travis-ci.org/rigtorp/SPSCQueue) +[![License](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/rigtorp/SPSCQueue/master/LICENSE) + +A single producer single consumer wait-free and lock-free fixed size +queue written in C++11. 
+ +## Example + +```cpp +SPSCQueue<int> q(2); +auto t = std::thread([&] { + while (!q.front()); + std::cout << *q.front() << std::endl; + q.pop(); +}); +q.push(1); +t.join(); +``` + +## Usage + +- `SPSCQueue<T>(size_t capacity);` + + Create a `SPSCQueue` holding items of type `T` with capacity + `capacity`. Capacity needs to be at least 2. + +- `void emplace(Args &&... args);` + + Enqueue an item using in-place construction. Blocks if queue is full. + +- `bool try_emplace(Args &&... args);` + + Try to enqueue an item using in-place construction. Returns `true` on + success and `false` if queue is full. + +- `void push(const T &v);` + + Enqueue an item using copy construction. Blocks if queue is full. + +- `template <typename P> void push(P &&v);` + + Enqueue an item using move construction. Participates in overload + resolution only if `std::is_constructible<T, P&&>::value == true`. + Blocks if queue is full. + +- `bool try_push(const T &v);` + + Try to enqueue an item using copy construction. Returns `true` on + success and `false` if queue is full. + +- `template <typename P> bool try_push(P &&v);` + + Try to enqueue an item using move construction. Returns `true` on + success and `false` if queue is full. Participates in overload + resolution only if `std::is_constructible<T, P&&>::value == true`. + +- `T *front();` + + Return pointer to front of queue. Returns `nullptr` if queue is + empty. + +- `void pop();` + + Dequeue first element of queue. Invalid to call if queue is + empty. Requires `std::is_nothrow_destructible<T>::value == true`. + +Only a single writer thread can perform enqueue operations and only a +single reader thread can perform dequeue operations. Any other usage +is invalid. + +## Implementation + +![Memory layout](https://github.com/rigtorp/SPSCQueue/blob/master/spsc.png) + +The underlying implementation is a +[ring buffer](https://en.wikipedia.org/wiki/Circular_buffer). 
+ +Care has been taken to avoid any issues with +[false sharing](https://en.wikipedia.org/wiki/False_sharing). The head +and tail pointers are aligned and padded to the false sharing range +(currently hard coded to 128 bytes). The slots buffer is padded with +the false sharing range at the beginning and end. + +References: + +- *Intel*. [Avoiding and Identifying False Sharing Among Threads](https://software.intel.com/en-us/articles/avoiding-and-identifying-false-sharing-among-threads). +- *Wikipedia*. [Ring buffer](https://en.wikipedia.org/wiki/Circular_buffer). +- *Wikipedia*. [False sharing](https://en.wikipedia.org/wiki/False_sharing). + +## Testing + +Testing lock-free algorithms is hard. I'm using two approaches to test +the implementation: + +- A single threaded test that the functionality works as intended, + including that the element constructor and destructor are invoked + correctly. +- A multithreaded fuzz test that all elements are enqueued and + dequeued correctly under heavy contention. + +## Benchmarks + +Throughput benchmark measures throughput between 2 threads for a +`SPSCQueue<int>` of size 256. + +Latency benchmark measures round trip time between 2 threads +communicating using 2 queues of type `SPSCQueue<int>`. + +The following numbers are for a 2 socket machine with 2 x Intel(R) +Xeon(R) CPU E5-2620 0 @ 2.00GHz. + +| NUMA Node / Core / Hyper-Thread | Throughput (ops/ms) | Latency RTT (ns) | +| ------------------------------- | -------------------:| ----------------:| +| #0,#0,#0 & #0,#0,#1 | 63942 | 60 | +| #0,#0,#0 & #0,#1,#0 | 37739 | 238 | +| #0,#0,#0 & #1,#0,#0 | 25744 | 768 | + +## Cited by + +SPSCQueue has been cited by the following papers: +- Peizhao Ou and Brian Demsky. 2018. Towards understanding the costs of avoiding + out-of-thin-air results. Proc. ACM Program. Lang. 2, OOPSLA, Article 136 + (October 2018), 29 pages. 
// README.md (continued from the preceding line):
//
// DOI: https://doi.org/10.1145/3276506
//
// ## About
//
// This project was created by [Erik Rigtorp](http://rigtorp.se)
// <[erik@rigtorp.se](mailto:erik@rigtorp.se)>.
//
// File: lib/rigtorp/SPSCQueue/include/rigtorp/SPSCQueue.h

/*
Copyright (c) 2018 Erik Rigtorp <erik@rigtorp.se>

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
 */

#pragma once

#include <atomic>
#include <cassert>
#include <cstddef>
#include <limits>
#include <new>
#include <stdexcept>
#include <type_traits>
#include <utility>

namespace rigtorp {

/// Fixed-size single-producer / single-consumer ring buffer.
///
/// Exactly one thread may call the enqueue operations (`emplace`,
/// `try_emplace`, `push`, `try_push`) and exactly one thread may call the
/// dequeue operations (`front`, `pop`).
///
/// One slot of the ring is always left unused so that "full" and "empty" can
/// be told apart: a queue constructed with `capacity` N holds at most N - 1
/// elements at a time.
///
/// NOTE: slot storage comes from plain `operator new[]`, which is only
/// guaranteed to be aligned for types with fundamental alignment; over-aligned
/// `T` is not catered for.
template <typename T> class SPSCQueue {
public:
  /// Creates an empty queue with `capacity` ring slots.
  ///
  /// @param capacity number of ring slots; must be at least 2.
  /// @throws std::invalid_argument if `capacity < 2`, or if the byte size of
  ///         the slot buffer (including padding) would overflow `size_t`.
  /// @throws std::bad_alloc if the slot buffer cannot be allocated.
  explicit SPSCQueue(const std::size_t capacity)
      : capacity_(capacity), slots_(allocateSlots(capacity)), head_(0),
        tail_(0) {
    static_assert(alignof(SPSCQueue<T>) >= kCacheLineSize,
                  "SPSCQueue must be aligned to the false sharing range");
    // head_ and tail_ must not share a cache line.
    assert(reinterpret_cast<char *>(&tail_) -
               reinterpret_cast<char *>(&head_) >=
           static_cast<std::ptrdiff_t>(kCacheLineSize));
  }

  /// Destroys every element still in the queue, then frees the slot buffer.
  ~SPSCQueue() {
    while (front()) {
      pop();
    }
    operator delete[](slots_);
  }

  // non-copyable and non-movable
  SPSCQueue(const SPSCQueue &) = delete;
  SPSCQueue &operator=(const SPSCQueue &) = delete;

  /// Constructs an element in place at the back of the queue.
  /// Busy-waits (spins) while the queue is full.
  template <typename... Args>
  void emplace(Args &&... args) noexcept(
      std::is_nothrow_constructible<T, Args &&...>::value) {
    static_assert(std::is_constructible<T, Args &&...>::value,
                  "T must be constructible with Args&&...");
    auto const head = head_.load(std::memory_order_relaxed);
    auto const nextHead = next(head);
    // Spin until the consumer has released the slot after `head`.
    while (nextHead == tail_.load(std::memory_order_acquire))
      ;
    new (&slots_[head + kPadding]) T(std::forward<Args>(args)...);
    // Publish the new element to the consumer.
    head_.store(nextHead, std::memory_order_release);
  }

  /// Constructs an element in place at the back of the queue if there is room.
  /// @return `true` if the element was enqueued, `false` if the queue is full.
  template <typename... Args>
  bool try_emplace(Args &&... args) noexcept(
      std::is_nothrow_constructible<T, Args &&...>::value) {
    static_assert(std::is_constructible<T, Args &&...>::value,
                  "T must be constructible with Args&&...");
    auto const head = head_.load(std::memory_order_relaxed);
    auto const nextHead = next(head);
    if (nextHead == tail_.load(std::memory_order_acquire)) {
      return false;
    }
    new (&slots_[head + kPadding]) T(std::forward<Args>(args)...);
    head_.store(nextHead, std::memory_order_release);
    return true;
  }

  /// Enqueues a copy of `v`. Busy-waits while the queue is full.
  void push(const T &v) noexcept(std::is_nothrow_copy_constructible<T>::value) {
    static_assert(std::is_copy_constructible<T>::value,
                  "T must be copy constructible");
    emplace(v);
  }

  /// Enqueues an element constructed from `std::forward<P>(v)`. Busy-waits
  /// while the queue is full. Participates in overload resolution only if
  /// `T` is constructible from `P &&`.
  template <typename P, typename = typename std::enable_if<
                            std::is_constructible<T, P &&>::value>::type>
  void push(P &&v) noexcept(std::is_nothrow_constructible<T, P &&>::value) {
    emplace(std::forward<P>(v));
  }

  /// Tries to enqueue a copy of `v`.
  /// @return `true` on success, `false` if the queue is full.
  bool
  try_push(const T &v) noexcept(std::is_nothrow_copy_constructible<T>::value) {
    static_assert(std::is_copy_constructible<T>::value,
                  "T must be copy constructible");
    return try_emplace(v);
  }

  /// Tries to enqueue an element constructed from `std::forward<P>(v)`.
  /// Participates in overload resolution only if `T` is constructible from
  /// `P &&`.
  /// @return `true` on success, `false` if the queue is full.
  template <typename P, typename = typename std::enable_if<
                            std::is_constructible<T, P &&>::value>::type>
  bool try_push(P &&v) noexcept(std::is_nothrow_constructible<T, P &&>::value) {
    return try_emplace(std::forward<P>(v));
  }

  /// @return pointer to the oldest element, or `nullptr` if the queue is
  ///         empty.
  T *front() noexcept {
    auto const tail = tail_.load(std::memory_order_relaxed);
    if (head_.load(std::memory_order_acquire) == tail) {
      return nullptr;
    }
    return &slots_[tail + kPadding];
  }

  /// Destroys the oldest element and releases its slot to the producer.
  /// Must not be called on an empty queue (checked by `assert` only).
  void pop() noexcept {
    static_assert(std::is_nothrow_destructible<T>::value,
                  "T must be nothrow destructible");
    auto const tail = tail_.load(std::memory_order_relaxed);
    assert(head_.load(std::memory_order_acquire) != tail);
    slots_[tail + kPadding].~T();
    tail_.store(next(tail), std::memory_order_release);
  }

  /// @return number of elements between the tail and head indices. The two
  ///         indices are loaded separately, so the result is only a snapshot
  ///         while the other thread is active.
  std::size_t size() const noexcept {
    auto const head = head_.load(std::memory_order_acquire);
    auto const tail = tail_.load(std::memory_order_acquire);
    // Stay in unsigned arithmetic; when head has wrapped around behind tail,
    // add one full lap of the ring.
    return head >= tail ? head - tail : capacity_ - (tail - head);
  }

  /// @return `true` if `size() == 0`.
  bool empty() const noexcept { return size() == 0; }

  /// @return the `capacity` value passed to the constructor (the number of
  ///         ring slots, which is one more than the maximum element count).
  std::size_t capacity() const noexcept { return capacity_; }

private:
  static constexpr std::size_t kCacheLineSize = 128;

  // Padding to avoid false sharing between slots_ and adjacent allocations
  static constexpr std::size_t kPadding = (kCacheLineSize - 1) / sizeof(T) + 1;

  // Validates `capacity` and allocates raw, uninitialised storage for
  // `capacity` slots plus kPadding unused slots on each side.
  static T *allocateSlots(const std::size_t capacity) {
    if (capacity < 2) {
      throw std::invalid_argument("size < 2");
    }
    // Reject capacities whose byte size would overflow size_t; otherwise an
    // undersized buffer would be allocated and indexed out of bounds.
    const std::size_t maxSlots =
        std::numeric_limits<std::size_t>::max() / sizeof(T);
    if (maxSlots < 2 * kPadding || capacity > maxSlots - 2 * kPadding) {
      throw std::invalid_argument("size too large");
    }
    return static_cast<T *>(
        operator new[](sizeof(T) * (capacity + 2 * kPadding)));
  }

  // Ring index following `index`, wrapping back to 0 at capacity_.
  std::size_t next(const std::size_t index) const noexcept {
    const std::size_t following = index + 1;
    return following == capacity_ ? 0 : following;
  }

private:
  const std::size_t capacity_;
  T *const slots_;

  // Align to avoid false sharing between head_ and tail_
  alignas(kCacheLineSize) std::atomic<std::size_t> head_;
  alignas(kCacheLineSize) std::atomic<std::size_t> tail_;

  // Padding to keep adjacent allocations from sharing a cache line with tail_
  char padding_[kCacheLineSize - sizeof(tail_)];
};
} // namespace rigtorp