rcpp_framework/libs/brynet/net/EventLoop.hpp

445 lines
13 KiB
C++
Raw Normal View History

2020-11-24 15:41:18 +01:00
#pragma once
#include <algorithm>
2021-04-30 16:10:14 +02:00
#include <atomic>
2020-11-24 15:41:18 +01:00
#include <brynet/base/Noexcept.hpp>
2021-04-30 16:10:14 +02:00
#include <brynet/base/NonCopyable.hpp>
#include <brynet/base/Timer.hpp>
2020-11-24 15:41:18 +01:00
#include <brynet/net/Channel.hpp>
2021-04-30 16:10:14 +02:00
#include <brynet/net/CurrentThread.hpp>
2020-11-24 15:41:18 +01:00
#include <brynet/net/Exception.hpp>
2021-04-30 16:10:14 +02:00
#include <brynet/net/Socket.hpp>
2020-11-24 15:41:18 +01:00
#include <brynet/net/detail/WakeupChannel.hpp>
2021-04-30 16:10:14 +02:00
#include <cassert>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <vector>
2020-11-24 15:41:18 +01:00
namespace brynet { namespace net {
2021-04-30 16:10:14 +02:00
class Channel;
class TcpConnection;
using TcpConnectionPtr = std::shared_ptr<TcpConnection>;
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
class EventLoop : public brynet::base::NonCopyable
{
public:
using Ptr = std::shared_ptr<EventLoop>;
using UserFunctor = std::function<void(void)>;
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
public:
EventLoop()
2020-11-24 15:41:18 +01:00
BRYNET_NOEXCEPT
2021-04-30 16:10:14 +02:00
:
2020-11-24 15:41:18 +01:00
#ifdef BRYNET_PLATFORM_WINDOWS
2021-04-30 16:10:14 +02:00
mIOCP(CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)),
mWakeupChannel(std::make_unique<detail::WakeupChannel>(mIOCP))
2020-11-24 15:41:18 +01:00
#elif defined BRYNET_PLATFORM_LINUX
2021-04-30 16:10:14 +02:00
mEpollFd(epoll_create(1))
2020-11-24 15:41:18 +01:00
#elif defined BRYNET_PLATFORM_DARWIN
2021-04-30 16:10:14 +02:00
mKqueueFd(kqueue())
2020-11-24 15:41:18 +01:00
#endif
2021-04-30 16:10:14 +02:00
{
2020-11-24 15:41:18 +01:00
#ifdef BRYNET_PLATFORM_WINDOWS
2021-04-30 16:10:14 +02:00
mPGetQueuedCompletionStatusEx = NULL;
auto kernel32_module = GetModuleHandleA("kernel32.dll");
if (kernel32_module != NULL)
{
mPGetQueuedCompletionStatusEx = reinterpret_cast<sGetQueuedCompletionStatusEx>(GetProcAddress(
2020-11-24 15:41:18 +01:00
kernel32_module,
"GetQueuedCompletionStatusEx"));
2021-04-30 16:10:14 +02:00
FreeLibrary(kernel32_module);
}
2020-11-24 15:41:18 +01:00
#elif defined BRYNET_PLATFORM_LINUX
2021-04-30 16:10:14 +02:00
auto eventfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
mWakeupChannel.reset(new detail::WakeupChannel(eventfd));
linkChannel(eventfd, mWakeupChannel.get());
2020-11-24 15:41:18 +01:00
#elif defined BRYNET_PLATFORM_DARWIN
2021-04-30 16:10:14 +02:00
const int NOTIFY_IDENT = 42;// Magic number we use for our filter ID.
mWakeupChannel.reset(new detail::WakeupChannel(mKqueueFd, NOTIFY_IDENT));
//Add user event
struct kevent ev;
EV_SET(&ev, NOTIFY_IDENT, EVFILT_USER, EV_ADD | EV_CLEAR, 0, 0, NULL);
struct timespec timeout = {0, 0};
kevent(mKqueueFd, &ev, 1, NULL, 0, &timeout);
2020-11-24 15:41:18 +01:00
#endif
2021-04-30 16:10:14 +02:00
mIsAlreadyPostWakeup = false;
mIsInBlock = true;
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
reAllocEventSize(1024);
mSelfThreadID = -1;
mTimer = std::make_shared<brynet::base::TimerMgr>();
}
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
/// Closes the platform polling handle created by the constructor and resets
/// the stored handle to its invalid value.
virtual ~EventLoop() BRYNET_NOEXCEPT
{
#if defined BRYNET_PLATFORM_WINDOWS
    CloseHandle(mIOCP);
    mIOCP = INVALID_HANDLE_VALUE;
#elif defined BRYNET_PLATFORM_LINUX
    ::close(mEpollFd);
    mEpollFd = -1;
#elif defined BRYNET_PLATFORM_DARWIN
    ::close(mKqueueFd);
    mKqueueFd = -1;
#endif
}
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
void loop(int64_t milliseconds)
{
tryInitThreadID();
2020-11-24 15:41:18 +01:00
#ifndef NDEBUG
2021-04-30 16:10:14 +02:00
assert(isInLoopThread());
2020-11-24 15:41:18 +01:00
#endif
2021-04-30 16:10:14 +02:00
if (!isInLoopThread())
{
throw BrynetCommonException("only loop in io thread");
}
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
if (!mAfterLoopFunctors.empty())
{
milliseconds = 0;
}
2020-11-24 15:41:18 +01:00
#ifdef BRYNET_PLATFORM_WINDOWS
2021-04-30 16:10:14 +02:00
ULONG numComplete = 0;
if (mPGetQueuedCompletionStatusEx != nullptr)
{
if (!mPGetQueuedCompletionStatusEx(mIOCP,
mEventEntries.data(),
static_cast<ULONG>(mEventEntries.size()),
&numComplete,
static_cast<DWORD>(milliseconds),
false))
2020-11-24 15:41:18 +01:00
{
2021-04-30 16:10:14 +02:00
numComplete = 0;
2020-11-24 15:41:18 +01:00
}
2021-04-30 16:10:14 +02:00
}
else
{
for (auto& e : mEventEntries)
2020-11-24 15:41:18 +01:00
{
2021-04-30 16:10:14 +02:00
const auto timeout = (numComplete == 0) ? static_cast<DWORD>(milliseconds) : 0;
/* don't check the return value of GQCS */
GetQueuedCompletionStatus(mIOCP,
&e.dwNumberOfBytesTransferred,
&e.lpCompletionKey,
&e.lpOverlapped,
timeout);
if (e.lpOverlapped == nullptr)
2020-11-24 15:41:18 +01:00
{
2021-04-30 16:10:14 +02:00
break;
2020-11-24 15:41:18 +01:00
}
2021-04-30 16:10:14 +02:00
++numComplete;
2020-11-24 15:41:18 +01:00
}
2021-04-30 16:10:14 +02:00
}
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
mIsInBlock = false;
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
for (ULONG i = 0; i < numComplete; ++i)
{
auto channel = (Channel*) mEventEntries[i].lpCompletionKey;
assert(channel != nullptr);
const auto ovl = reinterpret_cast<const port::Win::OverlappedExt*>(mEventEntries[i].lpOverlapped);
if (ovl->OP == port::Win::OverlappedType::OverlappedRecv)
2020-11-24 15:41:18 +01:00
{
2021-04-30 16:10:14 +02:00
channel->canRecv(false);
2020-11-24 15:41:18 +01:00
}
2021-04-30 16:10:14 +02:00
else if (ovl->OP == port::Win::OverlappedType::OverlappedSend)
2020-11-24 15:41:18 +01:00
{
2021-04-30 16:10:14 +02:00
channel->canSend();
2020-11-24 15:41:18 +01:00
}
2021-04-30 16:10:14 +02:00
else
2020-11-24 15:41:18 +01:00
{
2021-04-30 16:10:14 +02:00
assert(false);
2020-11-24 15:41:18 +01:00
}
2021-04-30 16:10:14 +02:00
}
#elif defined BRYNET_PLATFORM_LINUX
int numComplete = epoll_wait(mEpollFd, mEventEntries.data(), mEventEntries.size(), milliseconds);
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
mIsInBlock = false;
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
for (int i = 0; i < numComplete; ++i)
{
auto channel = (Channel*) (mEventEntries[i].data.ptr);
auto event_data = mEventEntries[i].events;
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
if (event_data & EPOLLRDHUP)
2020-11-24 15:41:18 +01:00
{
2021-04-30 16:10:14 +02:00
channel->canRecv(true);
channel->onClose();
continue;
2020-11-24 15:41:18 +01:00
}
2021-04-30 16:10:14 +02:00
if (event_data & EPOLLIN)
2020-11-24 15:41:18 +01:00
{
2021-04-30 16:10:14 +02:00
channel->canRecv(false);
2020-11-24 15:41:18 +01:00
}
2021-04-30 16:10:14 +02:00
if (event_data & EPOLLOUT)
2020-11-24 15:41:18 +01:00
{
2021-04-30 16:10:14 +02:00
channel->canSend();
2020-11-24 15:41:18 +01:00
}
}
2021-04-30 16:10:14 +02:00
#elif defined BRYNET_PLATFORM_DARWIN
struct timespec timeout = {milliseconds / 1000, (milliseconds % 1000) * 1000 * 1000};
int numComplete = kevent(mKqueueFd, NULL, 0, mEventEntries.data(), mEventEntries.size(), &timeout);
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
mIsInBlock = false;
for (int i = 0; i < numComplete; ++i)
2020-11-24 15:41:18 +01:00
{
2021-04-30 16:10:14 +02:00
auto channel = (Channel*) (mEventEntries[i].udata);
const struct kevent& event = mEventEntries[i];
if (event.filter == EVFILT_USER)
2020-11-24 15:41:18 +01:00
{
2021-04-30 16:10:14 +02:00
continue;
2020-11-24 15:41:18 +01:00
}
2021-04-30 16:10:14 +02:00
if (event.filter == EVFILT_READ)
2020-11-24 15:41:18 +01:00
{
2021-04-30 16:10:14 +02:00
channel->canRecv(false);
2020-11-24 15:41:18 +01:00
}
2021-04-30 16:10:14 +02:00
if (event.filter == EVFILT_WRITE)
2020-11-24 15:41:18 +01:00
{
2021-04-30 16:10:14 +02:00
channel->canSend();
2020-11-24 15:41:18 +01:00
}
}
2021-04-30 16:10:14 +02:00
#endif
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
mIsAlreadyPostWakeup = false;
mIsInBlock = true;
processAsyncFunctors();
processAfterLoopFunctors();
if (static_cast<size_t>(numComplete) == mEventEntries.size())
2020-11-24 15:41:18 +01:00
{
2021-04-30 16:10:14 +02:00
reAllocEventSize(mEventEntries.size() + 128);
}
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
mTimer->schedule();
}
// loop指定毫秒数,但如果定时器不为空,则loop时间为当前最近定时器的剩余时间和milliseconds的较小值
void loopCompareNearTimer(int64_t milliseconds)
{
tryInitThreadID();
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
#ifndef NDEBUG
assert(isInLoopThread());
#endif
if (!isInLoopThread())
{
throw BrynetCommonException("only loop in IO thread");
2020-11-24 15:41:18 +01:00
}
2021-04-30 16:10:14 +02:00
if (!mTimer->isEmpty())
2020-11-24 15:41:18 +01:00
{
2021-04-30 16:10:14 +02:00
auto nearTimeout = std::chrono::duration_cast<std::chrono::milliseconds>(mTimer->nearLeftTime());
milliseconds = std::min<int64_t>(milliseconds, nearTimeout.count());
2020-11-24 15:41:18 +01:00
}
2021-04-30 16:10:14 +02:00
loop(milliseconds);
}
// 返回true表示实际发生了wakeup所需的操作(此返回值不代表接口本身操作成功与否,因为此函数永远成功)
bool wakeup()
{
if (!isInLoopThread() && mIsInBlock && !mIsAlreadyPostWakeup.exchange(true))
2020-11-24 15:41:18 +01:00
{
2021-04-30 16:10:14 +02:00
return mWakeupChannel->wakeup();
2020-11-24 15:41:18 +01:00
}
2021-04-30 16:10:14 +02:00
return false;
}
/// Executes `f` on the loop thread: immediately when called from the loop
/// thread, otherwise by queueing it and waking the loop.
void runAsyncFunctor(UserFunctor&& f)
{
    if (!isInLoopThread())
    {
        pushAsyncFunctor(std::move(f));
        wakeup();
        return;
    }
    f();
}
/// Queues `f` to run in the after-loop phase of loop().
/// Must be called on the loop thread; throws BrynetCommonException otherwise.
void runFunctorAfterLoop(UserFunctor&& f)
{
    assert(isInLoopThread());
    const bool onLoopThread = isInLoopThread();
    if (!onLoopThread)
    {
        throw BrynetCommonException("only push after functor in io thread");
    }
    mAfterLoopFunctors.push_back(std::move(f));
}
/// Creates a timer that invokes `callback` after `timeout` and registers it
/// with the loop's timer manager. Off the loop thread the registration is
/// posted through runAsyncFunctor(). Returns a weak handle to the timer.
brynet::base::Timer::WeakPtr runAfter(std::chrono::nanoseconds timeout, UserFunctor&& callback)
{
    auto timer = std::make_shared<brynet::base::Timer>(
            std::chrono::steady_clock::now(),
            std::chrono::nanoseconds(timeout),
            std::move(callback));

    if (!isInLoopThread())
    {
        // Capture the manager by shared_ptr so the queued task keeps it alive.
        auto timerMgr = mTimer;
        runAsyncFunctor([timerMgr, timer]() {
            timerMgr->addTimer(timer);
        });
        return timer;
    }

    mTimer->addTimer(timer);
    return timer;
}
/// Returns true when the calling thread's id equals the id recorded for this loop.
inline bool isInLoopThread() const
{
    const auto callerTid = current_thread::tid();
    return callerTid == mSelfThreadID;
}
private:
// Resizes the event buffer (mEventEntries) that loop() hands to the platform
// poll call so that it holds `size` entries.
void reAllocEventSize(size_t size)
{
mEventEntries.resize(size);
}
void processAfterLoopFunctors()
{
mCopyAfterLoopFunctors.swap(mAfterLoopFunctors);
for (const auto& x : mCopyAfterLoopFunctors)
2020-11-24 15:41:18 +01:00
{
2021-04-30 16:10:14 +02:00
x();
2020-11-24 15:41:18 +01:00
}
2021-04-30 16:10:14 +02:00
mCopyAfterLoopFunctors.clear();
}
void processAsyncFunctors()
{
swapAsyncFunctors();
for (const auto& x : mCopyAsyncFunctors)
2020-11-24 15:41:18 +01:00
{
2021-04-30 16:10:14 +02:00
x();
2020-11-24 15:41:18 +01:00
}
2021-04-30 16:10:14 +02:00
mCopyAsyncFunctors.clear();
}
// Moves the pending async functors into the scratch vector while holding the
// queue mutex. The scratch vector is expected to be empty on entry.
void swapAsyncFunctors()
{
    std::lock_guard<std::mutex> guard(mAsyncFunctorsMutex);
    assert(mCopyAsyncFunctors.empty());
    mAsyncFunctors.swap(mCopyAsyncFunctors);
}
// Appends `f` to the async functor queue while holding the queue mutex.
void pushAsyncFunctor(UserFunctor&& f)
{
    std::lock_guard<std::mutex> guard(mAsyncFunctorsMutex);
    mAsyncFunctors.push_back(std::move(f));
}
#if defined BRYNET_PLATFORM_LINUX
// Returns the epoll file descriptor owned by this loop.
int getEpollHandle() const
{
    const int handle = mEpollFd;
    return handle;
}
#elif defined BRYNET_PLATFORM_DARWIN
// Returns the kqueue file descriptor owned by this loop.
int getKqueueHandle() const
{
    const int handle = mKqueueFd;
    return handle;
}
#endif
2021-04-30 16:10:14 +02:00
// Registers `fd` with the platform poller and attaches `ptr` as the per-event
// user data that loop() later casts back to Channel*.
// Returns true when the registration call succeeds.
bool linkChannel(BrynetSocketFD fd, const Channel* ptr) BRYNET_NOEXCEPT
{
#ifdef BRYNET_PLATFORM_WINDOWS
    const auto port = CreateIoCompletionPort(reinterpret_cast<HANDLE>(fd),
                                             mIOCP,
                                             reinterpret_cast<ULONG_PTR>(ptr),
                                             0);
    return port != nullptr;
#elif defined BRYNET_PLATFORM_LINUX
    struct epoll_event ev = {0,
            {
                    nullptr
            }};
    // Edge-triggered read/write interest plus peer hang-up notification.
    ev.events = EPOLLET | EPOLLIN | EPOLLOUT | EPOLLRDHUP;
    ev.data.ptr = const_cast<Channel*>(ptr);
    const int rc = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, &ev);
    return rc == 0;
#elif defined BRYNET_PLATFORM_DARWIN
    struct kevent ev[2];
    memset(&ev, 0, sizeof(ev));
    void* const userData = const_cast<Channel*>(ptr);
    // One registration per direction: slot 0 is read, slot 1 is write.
    EV_SET(&ev[0], fd, EVFILT_READ, EV_ADD | EV_CLEAR, NOTE_TRIGGER, 0, userData);
    EV_SET(&ev[1], fd, EVFILT_WRITE, EV_ADD | EV_CLEAR, NOTE_TRIGGER, 0, userData);
    struct timespec now = {0, 0};
    const int rc = kevent(mKqueueFd, ev, 2, nullptr, 0, &now);
    return rc == 0;
#endif
}
// Looks up the connection registered for `fd`; returns nullptr when none is stored.
TcpConnectionPtr getTcpConnection(BrynetSocketFD fd)
{
    const auto found = mTcpConnections.find(fd);
    if (found == mTcpConnections.end())
    {
        return nullptr;
    }
    return found->second;
}
// Stores `tcpConnection` under `fd`, replacing any connection already stored for that fd.
void addTcpConnection(BrynetSocketFD fd, TcpConnectionPtr tcpConnection)
{
    auto& slot = mTcpConnections[fd];
    slot = std::move(tcpConnection);
}
// Drops the connection stored for `fd`, if there is one.
void removeTcpConnection(BrynetSocketFD fd)
{
    const auto found = mTcpConnections.find(fd);
    if (found != mTcpConnections.end())
    {
        mTcpConnections.erase(found);
    }
}
// Records the calling thread's id as the loop thread. std::call_once makes
// only the first invocation take effect.
void tryInitThreadID()
{
    const auto recordCallerThread = [this]() {
        mSelfThreadID = current_thread::tid();
    };
    std::call_once(mOnceInitThreadID, recordCallerThread);
}
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
private:
2020-11-24 15:41:18 +01:00
#ifdef BRYNET_PLATFORM_WINDOWS
2021-04-30 16:10:14 +02:00
std::vector<OVERLAPPED_ENTRY> mEventEntries;
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
typedef BOOL(WINAPI* sGetQueuedCompletionStatusEx)(HANDLE, LPOVERLAPPED_ENTRY, ULONG, PULONG, DWORD, BOOL);
sGetQueuedCompletionStatusEx mPGetQueuedCompletionStatusEx;
HANDLE mIOCP;
2020-11-24 15:41:18 +01:00
#elif defined BRYNET_PLATFORM_LINUX
2021-04-30 16:10:14 +02:00
std::vector<epoll_event> mEventEntries;
int mEpollFd;
2020-11-24 15:41:18 +01:00
#elif defined BRYNET_PLATFORM_DARWIN
2021-04-30 16:10:14 +02:00
std::vector<struct kevent> mEventEntries;
int mKqueueFd;
2020-11-24 15:41:18 +01:00
#endif
2021-04-30 16:10:14 +02:00
std::unique_ptr<detail::WakeupChannel> mWakeupChannel;
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
std::atomic_bool mIsInBlock;
std::atomic_bool mIsAlreadyPostWakeup;
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
std::mutex mAsyncFunctorsMutex;
std::vector<UserFunctor> mAsyncFunctors;
std::vector<UserFunctor> mCopyAsyncFunctors;
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
std::vector<UserFunctor> mAfterLoopFunctors;
std::vector<UserFunctor> mCopyAfterLoopFunctors;
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
std::once_flag mOnceInitThreadID;
current_thread::THREAD_ID_TYPE mSelfThreadID;
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
brynet::base::TimerMgr::Ptr mTimer;
std::unordered_map<BrynetSocketFD, TcpConnectionPtr> mTcpConnections;
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
friend class TcpConnection;
};
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
}}// namespace brynet::net