// -*- C++ -*- #include #include #include #include #include #include #include struct ChanEOF : std::exception { const char *what() const noexcept override { return "EOF from Chan"; } }; template class Chan { std::queue> data_; std::mutex m_; std::condition_variable cv_; bool eof_{false}; public: Chan() = default; Chan(const Chan&) = delete; template void write(Args&&... args) { std::lock_guard _lk (m_); assert (!eof_); data_.emplace(std::forward(args)...); cv_.notify_one(); } void writeeof() { std::lock_guard _lk (m_); eof_ = true; cv_.notify_all(); } bool eof() { std::lock_guard _lk (m_); return eof_ && data_.empty(); } T read() { std::unique_lock lk (m_); while (data_.empty()) { if (eof_) throw ChanEOF(); cv_.wait(lk); } T ret (std::move(data_.front())); data_.pop(); return ret; } }; template class WorkChan : protected Chan { std::forward_list workers_; void work(std::function f) { try { for (;;) f(this->read()); } catch (ChanEOF) {} } public: using Chan::write; WorkChan(std::function()> mkWorker) { for(unsigned i = std::max(std::thread::hardware_concurrency(), 2U); i; --i) workers_.emplace_front(mem_fun(&WorkChan::work), this, mkWorker()); } ~WorkChan() { this->writeeof(); while (!workers_.empty()) { workers_.front().join(); workers_.pop_front(); } } };