Moved lock free queue to core.

This commit is contained in:
Relintai 2022-02-10 00:56:28 +01:00
parent aeaecd4b76
commit 6a29af9963
3 changed files with 111 additions and 115 deletions

View File

@ -0,0 +1,110 @@
/**
*
* @file LockFreeQueue.h
* @author An Tao
*
* Public header file in trantor lib.
*
* Copyright 2018, An Tao. All rights reserved.
* Use of this source code is governed by a BSD-style license
* that can be found in the License file.
*
*
*/
#pragma once
#include <assert.h>
#include <trantor/utils/NonCopyable.h>
#include <atomic>
#include <memory>
#include <type_traits>
namespace trantor {
/**
* @brief This class template represents a lock-free multiple producers single
* consumer queue
*
* @tparam T The type of the items in the queue.
*/
template <typename T>
class MpscQueue {
protected:
MpscQueue(const MpscQueue &) = delete;
MpscQueue &operator=(const MpscQueue &) = delete;
// some uncopyable classes maybe support move constructor....
MpscQueue(MpscQueue &&) noexcept(true) = default;
MpscQueue &operator=(MpscQueue &&) noexcept(true) = default;
public:
MpscQueue() :
head_(new BufferNode), tail_(head_.load(std::memory_order_relaxed)) {
}
~MpscQueue() {
T output;
while (this->dequeue(output)) {
}
BufferNode *front = head_.load(std::memory_order_relaxed);
delete front;
}
/**
* @brief Put a item into the queue.
*
* @param input
* @note This method can be called in multiple threads.
*/
void enqueue(T &&input) {
BufferNode *node{ new BufferNode(std::move(input)) };
BufferNode *prevhead{ head_.exchange(node, std::memory_order_acq_rel) };
prevhead->next_.store(node, std::memory_order_release);
}
void enqueue(const T &input) {
BufferNode *node{ new BufferNode(input) };
BufferNode *prevhead{ head_.exchange(node, std::memory_order_acq_rel) };
prevhead->next_.store(node, std::memory_order_release);
}
/**
* @brief Get a item from the queue.
*
* @param output
* @return false if the queue is empty.
* @note This method must be called in a single thread.
*/
bool dequeue(T &output) {
BufferNode *tail = tail_.load(std::memory_order_relaxed);
BufferNode *next = tail->next_.load(std::memory_order_acquire);
if (next == nullptr) {
return false;
}
output = std::move(*(next->dataPtr_));
delete next->dataPtr_;
tail_.store(next, std::memory_order_release);
delete tail;
return true;
}
bool empty() {
BufferNode *tail = tail_.load(std::memory_order_relaxed);
BufferNode *next = tail->next_.load(std::memory_order_acquire);
return next == nullptr;
}
private:
struct BufferNode {
BufferNode() = default;
BufferNode(const T &data) :
dataPtr_(new T(data)) {
}
BufferNode(T &&data) :
dataPtr_(new T(std::move(data))) {
}
T *dataPtr_;
std::atomic<BufferNode *> next_{ nullptr };
};
std::atomic<BufferNode *> head_;
std::atomic<BufferNode *> tail_;
};
} // namespace trantor

View File

@ -18,7 +18,7 @@
#pragma once
#include <trantor/utils/NonCopyable.h>
#include "core/math/date.h"
#include <trantor/utils/LockFreeQueue.h>
#include "core/containers/lock_free_queue.h"
#include <trantor/exports.h>
#include <thread>
#include <memory>

View File

@ -1,114 +0,0 @@
/**
*
* @file LockFreeQueue.h
* @author An Tao
*
* Public header file in trantor lib.
*
* Copyright 2018, An Tao. All rights reserved.
* Use of this source code is governed by a BSD-style license
* that can be found in the License file.
*
*
*/
#pragma once
#include <trantor/utils/NonCopyable.h>
#include <atomic>
#include <type_traits>
#include <memory>
#include <assert.h>
namespace trantor
{
/**
* @brief This class template represents a lock-free multiple producers single
* consumer queue
*
* @tparam T The type of the items in the queue.
*/
template <typename T>
class MpscQueue : public NonCopyable
{
public:
MpscQueue()
: head_(new BufferNode), tail_(head_.load(std::memory_order_relaxed))
{
}
~MpscQueue()
{
T output;
while (this->dequeue(output))
{
}
BufferNode *front = head_.load(std::memory_order_relaxed);
delete front;
}
/**
* @brief Put a item into the queue.
*
* @param input
* @note This method can be called in multiple threads.
*/
void enqueue(T &&input)
{
BufferNode *node{new BufferNode(std::move(input))};
BufferNode *prevhead{head_.exchange(node, std::memory_order_acq_rel)};
prevhead->next_.store(node, std::memory_order_release);
}
void enqueue(const T &input)
{
BufferNode *node{new BufferNode(input)};
BufferNode *prevhead{head_.exchange(node, std::memory_order_acq_rel)};
prevhead->next_.store(node, std::memory_order_release);
}
/**
* @brief Get a item from the queue.
*
* @param output
* @return false if the queue is empty.
* @note This method must be called in a single thread.
*/
bool dequeue(T &output)
{
BufferNode *tail = tail_.load(std::memory_order_relaxed);
BufferNode *next = tail->next_.load(std::memory_order_acquire);
if (next == nullptr)
{
return false;
}
output = std::move(*(next->dataPtr_));
delete next->dataPtr_;
tail_.store(next, std::memory_order_release);
delete tail;
return true;
}
bool empty()
{
BufferNode *tail = tail_.load(std::memory_order_relaxed);
BufferNode *next = tail->next_.load(std::memory_order_acquire);
return next == nullptr;
}
private:
struct BufferNode
{
BufferNode() = default;
BufferNode(const T &data) : dataPtr_(new T(data))
{
}
BufferNode(T &&data) : dataPtr_(new T(std::move(data)))
{
}
T *dataPtr_;
std::atomic<BufferNode *> next_{nullptr};
};
std::atomic<BufferNode *> head_;
std::atomic<BufferNode *> tail_;
};
} // namespace trantor