diff --git a/core/containers/lock_free_queue.h b/core/containers/lock_free_queue.h new file mode 100644 index 0000000..d88a0ad --- /dev/null +++ b/core/containers/lock_free_queue.h @@ -0,0 +1,110 @@ +/** + * + * @file LockFreeQueue.h + * @author An Tao + * + * Public header file in trantor lib. + * + * Copyright 2018, An Tao. All rights reserved. + * Use of this source code is governed by a BSD-style license + * that can be found in the License file. + * + * + */ + +#pragma once +#include +#include +#include +#include +#include +namespace trantor { +/** + * @brief This class template represents a lock-free multiple producers single + * consumer queue + * + * @tparam T The type of the items in the queue. + */ +template +class MpscQueue { +protected: + MpscQueue(const MpscQueue &) = delete; + MpscQueue &operator=(const MpscQueue &) = delete; + // some uncopyable classes maybe support move constructor.... + MpscQueue(MpscQueue &&) noexcept(true) = default; + MpscQueue &operator=(MpscQueue &&) noexcept(true) = default; + +public: + MpscQueue() : + head_(new BufferNode), tail_(head_.load(std::memory_order_relaxed)) { + } + ~MpscQueue() { + T output; + while (this->dequeue(output)) { + } + BufferNode *front = head_.load(std::memory_order_relaxed); + delete front; + } + + /** + * @brief Put a item into the queue. + * + * @param input + * @note This method can be called in multiple threads. + */ + void enqueue(T &&input) { + BufferNode *node{ new BufferNode(std::move(input)) }; + BufferNode *prevhead{ head_.exchange(node, std::memory_order_acq_rel) }; + prevhead->next_.store(node, std::memory_order_release); + } + void enqueue(const T &input) { + BufferNode *node{ new BufferNode(input) }; + BufferNode *prevhead{ head_.exchange(node, std::memory_order_acq_rel) }; + prevhead->next_.store(node, std::memory_order_release); + } + + /** + * @brief Get a item from the queue. + * + * @param output + * @return false if the queue is empty. + * @note This method must be called in a single thread. + */ + bool dequeue(T &output) { + BufferNode *tail = tail_.load(std::memory_order_relaxed); + BufferNode *next = tail->next_.load(std::memory_order_acquire); + + if (next == nullptr) { + return false; + } + output = std::move(*(next->dataPtr_)); + delete next->dataPtr_; + tail_.store(next, std::memory_order_release); + delete tail; + return true; + } + + bool empty() { + BufferNode *tail = tail_.load(std::memory_order_relaxed); + BufferNode *next = tail->next_.load(std::memory_order_acquire); + return next == nullptr; + } + +private: + struct BufferNode { + BufferNode() = default; + BufferNode(const T &data) : + dataPtr_(new T(data)) { + } + BufferNode(T &&data) : + dataPtr_(new T(std::move(data))) { + } + T *dataPtr_; + std::atomic next_{ nullptr }; + }; + + std::atomic head_; + std::atomic tail_; +}; + +} // namespace trantor diff --git a/web_backends/drogon/trantor/net/EventLoop.h b/web_backends/drogon/trantor/net/EventLoop.h index 1a78d01..2ab8aa7 100644 --- a/web_backends/drogon/trantor/net/EventLoop.h +++ b/web_backends/drogon/trantor/net/EventLoop.h @@ -18,7 +18,7 @@ #pragma once #include #include "core/math/date.h" -#include +#include "core/containers/lock_free_queue.h" #include #include #include diff --git a/web_backends/drogon/trantor/utils/LockFreeQueue.h b/web_backends/drogon/trantor/utils/LockFreeQueue.h deleted file mode 100644 index f608a90..0000000 --- a/web_backends/drogon/trantor/utils/LockFreeQueue.h +++ /dev/null @@ -1,114 +0,0 @@ -/** - * - * @file LockFreeQueue.h - * @author An Tao - * - * Public header file in trantor lib. - * - * Copyright 2018, An Tao. All rights reserved. - * Use of this source code is governed by a BSD-style license - * that can be found in the License file. - * - * - */ - -#pragma once -#include -#include -#include -#include -#include -namespace trantor -{ -/** - * @brief This class template represents a lock-free multiple producers single - * consumer queue - * - * @tparam T The type of the items in the queue. - */ -template -class MpscQueue : public NonCopyable -{ - public: - MpscQueue() - : head_(new BufferNode), tail_(head_.load(std::memory_order_relaxed)) - { - } - ~MpscQueue() - { - T output; - while (this->dequeue(output)) - { - } - BufferNode *front = head_.load(std::memory_order_relaxed); - delete front; - } - - /** - * @brief Put a item into the queue. - * - * @param input - * @note This method can be called in multiple threads. - */ - void enqueue(T &&input) - { - BufferNode *node{new BufferNode(std::move(input))}; - BufferNode *prevhead{head_.exchange(node, std::memory_order_acq_rel)}; - prevhead->next_.store(node, std::memory_order_release); - } - void enqueue(const T &input) - { - BufferNode *node{new BufferNode(input)}; - BufferNode *prevhead{head_.exchange(node, std::memory_order_acq_rel)}; - prevhead->next_.store(node, std::memory_order_release); - } - - /** - * @brief Get a item from the queue. - * - * @param output - * @return false if the queue is empty. - * @note This method must be called in a single thread. - */ - bool dequeue(T &output) - { - BufferNode *tail = tail_.load(std::memory_order_relaxed); - BufferNode *next = tail->next_.load(std::memory_order_acquire); - - if (next == nullptr) - { - return false; - } - output = std::move(*(next->dataPtr_)); - delete next->dataPtr_; - tail_.store(next, std::memory_order_release); - delete tail; - return true; - } - - bool empty() - { - BufferNode *tail = tail_.load(std::memory_order_relaxed); - BufferNode *next = tail->next_.load(std::memory_order_acquire); - return next == nullptr; - } - - private: - struct BufferNode - { - BufferNode() = default; - BufferNode(const T &data) : dataPtr_(new T(data)) - { - } - BufferNode(T &&data) : dataPtr_(new T(std::move(data))) - { - } - T *dataPtr_; - std::atomic next_{nullptr}; - }; - - std::atomic head_; - std::atomic tail_; -}; - -} // namespace trantor