rcpp_framework/libs/brynet/net/detail/TCPServiceDetail.hpp

178 lines
4.9 KiB
C++
Raw Normal View History

2020-11-24 15:41:18 +01:00
#pragma once
#include <brynet/base/Noexcept.hpp>
#include <brynet/base/NonCopyable.hpp>
#include <brynet/net/SSLHelper.hpp>
#include <brynet/net/Socket.hpp>
#include <brynet/net/TcpConnection.hpp>
#include <brynet/net/detail/ConnectionOption.hpp>
#include <brynet/net/detail/IOLoopData.hpp>

#include <atomic>
#include <chrono>
#include <functional>
#include <memory>
#include <mutex>
#include <random>
#include <system_error>
#include <thread>
#include <vector>
2020-11-24 15:41:18 +01:00
namespace brynet { namespace net { namespace detail {
2021-04-30 16:10:14 +02:00
class TcpServiceDetail : public brynet::base::NonCopyable
{
protected:
using FrameCallback = std::function<void(const EventLoop::Ptr&)>;
const static unsigned int sDefaultLoopTimeOutMS = 100;
void startWorkerThread(size_t threadNum,
FrameCallback callback = nullptr)
2020-11-24 15:41:18 +01:00
{
2021-04-30 16:10:14 +02:00
std::lock_guard<std::mutex> lck(mServiceGuard);
std::lock_guard<std::mutex> lock(mIOLoopGuard);
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
if (!mIOLoopDatas.empty())
2020-11-24 15:41:18 +01:00
{
2021-04-30 16:10:14 +02:00
return;
}
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
mRunIOLoop = std::make_shared<bool>(true);
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
mIOLoopDatas.resize(threadNum);
for (auto& v : mIOLoopDatas)
{
auto eventLoop = std::make_shared<EventLoop>();
auto runIoLoop = mRunIOLoop;
v = IOLoopData::Create(eventLoop,
std::make_shared<std::thread>(
[callback, runIoLoop, eventLoop]() {
while (*runIoLoop)
{
eventLoop->loopCompareNearTimer(sDefaultLoopTimeOutMS);
if (callback != nullptr)
{
callback(eventLoop);
}
}
}));
2020-11-24 15:41:18 +01:00
}
2021-04-30 16:10:14 +02:00
}
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
void stopWorkerThread()
{
std::lock_guard<std::mutex> lck(mServiceGuard);
std::lock_guard<std::mutex> lock(mIOLoopGuard);
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
*mRunIOLoop = false;
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
for (const auto& v : mIOLoopDatas)
{
v->getEventLoop()->wakeup();
try
2020-11-24 15:41:18 +01:00
{
2021-04-30 16:10:14 +02:00
if (v->getIOThread()->joinable())
2020-11-24 15:41:18 +01:00
{
2021-04-30 16:10:14 +02:00
v->getIOThread()->join();
2020-11-24 15:41:18 +01:00
}
}
2021-04-30 16:10:14 +02:00
catch (std::system_error& e)
{
(void) e;
}
2020-11-24 15:41:18 +01:00
}
2021-04-30 16:10:14 +02:00
mIOLoopDatas.clear();
}
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
bool addTcpConnection(TcpSocket::Ptr socket, ConnectionOption option)
{
if (option.maxRecvBufferSize <= 0)
2020-11-24 15:41:18 +01:00
{
2021-04-30 16:10:14 +02:00
throw BrynetCommonException("buffer size is zero");
2020-11-24 15:41:18 +01:00
}
2021-04-30 16:10:14 +02:00
EventLoop::Ptr eventLoop;
if (option.forceSameThreadLoop)
2020-11-24 15:41:18 +01:00
{
2021-04-30 16:10:14 +02:00
eventLoop = getSameThreadEventLoop();
2020-11-24 15:41:18 +01:00
}
2021-04-30 16:10:14 +02:00
else
2020-11-24 15:41:18 +01:00
{
2021-04-30 16:10:14 +02:00
eventLoop = getRandomEventLoop();
2020-11-24 15:41:18 +01:00
}
2021-04-30 16:10:14 +02:00
if (eventLoop == nullptr)
2020-11-24 15:41:18 +01:00
{
2021-04-30 16:10:14 +02:00
return false;
2020-11-24 15:41:18 +01:00
}
2021-04-30 16:10:14 +02:00
auto wrapperEnterCallback = [option](const TcpConnection::Ptr& tcpConnection) {
for (const auto& callback : option.enterCallback)
2020-11-24 15:41:18 +01:00
{
2021-04-30 16:10:14 +02:00
callback(tcpConnection);
2020-11-24 15:41:18 +01:00
}
2021-04-30 16:10:14 +02:00
};
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
if (option.useSSL && option.sslHelper == nullptr)
{
option.sslHelper = SSLHelper::Create();
}
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
TcpConnection::Create(std::move(socket),
option.maxRecvBufferSize,
wrapperEnterCallback,
eventLoop,
option.sslHelper);
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
return true;
}
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
EventLoop::Ptr getRandomEventLoop()
{
std::lock_guard<std::mutex> lock(mIOLoopGuard);
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
const auto ioLoopSize = mIOLoopDatas.size();
if (ioLoopSize == 0)
{
return nullptr;
}
else if (ioLoopSize == 1)
{
return mIOLoopDatas.front()->getEventLoop();
}
else
{
return mIOLoopDatas[mRandom() % ioLoopSize]->getEventLoop();
2020-11-24 15:41:18 +01:00
}
2021-04-30 16:10:14 +02:00
}
TcpServiceDetail() BRYNET_NOEXCEPT
: mRandom(static_cast<unsigned int>(
std::chrono::system_clock::now().time_since_epoch().count()))
{
mRunIOLoop = std::make_shared<bool>(false);
}
virtual ~TcpServiceDetail() BRYNET_NOEXCEPT
{
stopWorkerThread();
}
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
EventLoop::Ptr getSameThreadEventLoop()
{
std::lock_guard<std::mutex> lock(mIOLoopGuard);
for (const auto& v : mIOLoopDatas)
2020-11-24 15:41:18 +01:00
{
2021-04-30 16:10:14 +02:00
if (v->getEventLoop()->isInLoopThread())
2020-11-24 15:41:18 +01:00
{
2021-04-30 16:10:14 +02:00
return v->getEventLoop();
2020-11-24 15:41:18 +01:00
}
}
2021-04-30 16:10:14 +02:00
return nullptr;
}
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
private:
std::vector<IOLoopDataPtr> mIOLoopDatas;
mutable std::mutex mIOLoopGuard;
std::shared_ptr<bool> mRunIOLoop;
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
std::mutex mServiceGuard;
std::mt19937 mRandom;
};
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
}}}// namespace brynet::net::detail