rcpp_framework/web_backends/drogon/trantor/utils/ConcurrentTaskQueue.cc

83 lines
2.0 KiB
C++

/**
*
* ConcurrentTaskQueue.cc
* An Tao
*
* Public header file in trantor lib.
*
* Copyright 2018, An Tao. All rights reserved.
* Use of this source code is governed by a BSD-style license
* that can be found in the License file.
*
*
*/
#include <assert.h>
#include <trantor/utils/ConcurrentTaskQueue.h>
#include <trantor/utils/Logger.h>
#ifdef __linux__
#include <sys/prctl.h>
#endif
using namespace trantor;
ConcurrentTaskQueue::ConcurrentTaskQueue(size_t threadNum,
const std::string &name) :
queueCount_(threadNum), queueName_(name), stop_(false) {
assert(threadNum > 0);
for (unsigned int i = 0; i < queueCount_; ++i) {
threads_.push_back(
std::thread(std::bind(&ConcurrentTaskQueue::queueFunc, this, i)));
}
}
void ConcurrentTaskQueue::runTaskInQueue(const std::function<void()> &task) {
LOG_TRACE << "copy task into queue";
std::lock_guard<std::mutex> lock(taskMutex_);
taskQueue_.push(task);
taskCond_.notify_one();
}
void ConcurrentTaskQueue::runTaskInQueue(std::function<void()> &&task) {
LOG_TRACE << "move task into queue";
std::lock_guard<std::mutex> lock(taskMutex_);
taskQueue_.push(std::move(task));
taskCond_.notify_one();
}
void ConcurrentTaskQueue::queueFunc(int queueNum) {
char tmpName[32];
snprintf(tmpName, sizeof(tmpName), "%s%d", queueName_.c_str(), queueNum);
#ifdef __linux__
::prctl(PR_SET_NAME, tmpName);
#endif
while (!stop_) {
std::function<void()> r;
{
std::unique_lock<std::mutex> lock(taskMutex_);
while (!stop_ && taskQueue_.size() == 0) {
taskCond_.wait(lock);
}
if (taskQueue_.size() > 0) {
LOG_TRACE << "got a new task!";
r = std::move(taskQueue_.front());
taskQueue_.pop();
} else
continue;
}
r();
}
}
size_t ConcurrentTaskQueue::getTaskCount() {
std::lock_guard<std::mutex> guard(taskMutex_);
return taskQueue_.size();
}
void ConcurrentTaskQueue::stop() {
if (!stop_) {
stop_ = true;
taskCond_.notify_all();
for (auto &t : threads_)
t.join();
}
}
ConcurrentTaskQueue::~ConcurrentTaskQueue() {
stop();
}