#pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include class Channel; class TcpConnection; using TcpConnectionPtr = std::shared_ptr; class EventLoop : public NonCopyable { public: using Ptr = std::shared_ptr; using UserFunctor = std::function; public: EventLoop() BRYNET_NOEXCEPT : #ifdef BRYNET_PLATFORM_WINDOWS mIOCP(CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)), mWakeupChannel(std::make_unique(mIOCP)) #elif defined BRYNET_PLATFORM_LINUX mEpollFd(epoll_create(1)) #elif defined BRYNET_PLATFORM_DARWIN mKqueueFd(kqueue()) #endif { #ifdef BRYNET_PLATFORM_WINDOWS mPGetQueuedCompletionStatusEx = NULL; auto kernel32_module = GetModuleHandleA("kernel32.dll"); if (kernel32_module != NULL) { mPGetQueuedCompletionStatusEx = reinterpret_cast(GetProcAddress( kernel32_module, "GetQueuedCompletionStatusEx")); FreeLibrary(kernel32_module); } #elif defined BRYNET_PLATFORM_LINUX auto eventfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); mWakeupChannel.reset(new WakeupChannel(eventfd)); linkChannel(eventfd, mWakeupChannel.get()); #elif defined BRYNET_PLATFORM_DARWIN const int NOTIFY_IDENT = 42; // Magic number we use for our filter ID. mWakeupChannel.reset(new WakeupChannel(mKqueueFd, NOTIFY_IDENT)); //Add user event struct kevent ev; EV_SET(&ev, NOTIFY_IDENT, EVFILT_USER, EV_ADD | EV_CLEAR, 0, 0, NULL); struct timespec timeout = { 0, 0 }; kevent(mKqueueFd, &ev, 1, NULL, 0, &timeout); #endif mIsAlreadyPostWakeup = false; mIsInBlock = true; reAllocEventSize(1024); mSelfThreadID = -1; mTimer = std::make_shared(); } virtual ~EventLoop() BRYNET_NOEXCEPT { #ifdef BRYNET_PLATFORM_WINDOWS CloseHandle(mIOCP); mIOCP = INVALID_HANDLE_VALUE; #elif defined BRYNET_PLATFORM_LINUX close(mEpollFd); mEpollFd = -1; #elif defined BRYNET_PLATFORM_DARWIN close(mKqueueFd); mKqueueFd = -1; #endif } void loop(int64_t milliseconds) { tryInitThreadID(); #ifndef NDEBUG assert(isInLoopThread()); #endif if (!isInLoopThread()) { throw BrynetCommonException("only loop in io thread"); } if (!mAfterLoopFunctors.empty()) { milliseconds = 0; } #ifdef BRYNET_PLATFORM_WINDOWS ULONG numComplete = 0; if (mPGetQueuedCompletionStatusEx != nullptr) { if (!mPGetQueuedCompletionStatusEx(mIOCP, mEventEntries.data(), static_cast(mEventEntries.size()), &numComplete, static_cast(milliseconds), false)) { numComplete = 0; } } else { for (auto &e : mEventEntries) { const auto timeout = (numComplete == 0) ? static_cast(milliseconds) : 0; /* don't check the return value of GQCS */ GetQueuedCompletionStatus(mIOCP, &e.dwNumberOfBytesTransferred, &e.lpCompletionKey, &e.lpOverlapped, timeout); if (e.lpOverlapped == nullptr) { break; } ++numComplete; } } mIsInBlock = false; for (ULONG i = 0; i < numComplete; ++i) { auto channel = (Channel *)mEventEntries[i].lpCompletionKey; assert(channel != nullptr); const auto ovl = reinterpret_cast(mEventEntries[i].lpOverlapped); if (ovl->OP == port::Win::OverlappedType::OverlappedRecv) { channel->canRecv(false); } else if (ovl->OP == port::Win::OverlappedType::OverlappedSend) { channel->canSend(); } else { assert(false); } } #elif defined BRYNET_PLATFORM_LINUX int numComplete = epoll_wait(mEpollFd, mEventEntries.data(), mEventEntries.size(), milliseconds); mIsInBlock = false; for (int i = 0; i < numComplete; ++i) { auto channel = (Channel *)(mEventEntries[i].data.ptr); auto event_data = mEventEntries[i].events; if (event_data & EPOLLRDHUP) { channel->canRecv(true); channel->onClose(); continue; } if (event_data & EPOLLIN) { channel->canRecv(false); } if (event_data & EPOLLOUT) { channel->canSend(); } } #elif defined BRYNET_PLATFORM_DARWIN struct timespec timeout = { milliseconds / 1000, (milliseconds % 1000) * 1000 * 1000 }; int numComplete = kevent(mKqueueFd, NULL, 0, mEventEntries.data(), mEventEntries.size(), &timeout); mIsInBlock = false; for (int i = 0; i < numComplete; ++i) { auto channel = (Channel *)(mEventEntries[i].udata); const struct kevent &event = mEventEntries[i]; if (event.filter == EVFILT_USER) { continue; } if (event.filter == EVFILT_READ) { channel->canRecv(false); } if (event.filter == EVFILT_WRITE) { channel->canSend(); } } #endif mIsAlreadyPostWakeup = false; mIsInBlock = true; processAsyncFunctors(); processAfterLoopFunctors(); if (static_cast(numComplete) == mEventEntries.size()) { reAllocEventSize(mEventEntries.size() + 128); } mTimer->schedule(); } // loop指定毫秒数,但如果定时器不为空,则loop时间为当前最近定时器的剩余时间和milliseconds的较小值 void loopCompareNearTimer(int64_t milliseconds) { tryInitThreadID(); #ifndef NDEBUG assert(isInLoopThread()); #endif if (!isInLoopThread()) { throw BrynetCommonException("only loop in IO thread"); } if (!mTimer->isEmpty()) { auto nearTimeout = std::chrono::duration_cast(mTimer->nearLeftTime()); milliseconds = std::min(milliseconds, nearTimeout.count()); } loop(milliseconds); } // 返回true表示实际发生了wakeup所需的操作(此返回值不代表接口本身操作成功与否,因为此函数永远成功) bool wakeup() { if (!isInLoopThread() && mIsInBlock && !mIsAlreadyPostWakeup.exchange(true)) { return mWakeupChannel->wakeup(); } return false; } void runAsyncFunctor(UserFunctor &&f) { if (isInLoopThread()) { f(); } else { pushAsyncFunctor(std::move(f)); wakeup(); } } void runFunctorAfterLoop(UserFunctor &&f) { assert(isInLoopThread()); if (!isInLoopThread()) { throw BrynetCommonException("only push after functor in io thread"); } mAfterLoopFunctors.emplace_back(std::move(f)); } Timer::WeakPtr runAfter(std::chrono::nanoseconds timeout, UserFunctor &&callback) { auto timer = std::make_shared( std::chrono::steady_clock::now(), std::chrono::nanoseconds(timeout), std::move(callback)); if (isInLoopThread()) { mTimer->addTimer(timer); } else { auto timerMgr = mTimer; runAsyncFunctor([timerMgr, timer]() { timerMgr->addTimer(timer); }); } return timer; } inline bool isInLoopThread() const { return mSelfThreadID == tid(); } private: void reAllocEventSize(size_t size) { mEventEntries.resize(size); } void processAfterLoopFunctors() { mCopyAfterLoopFunctors.swap(mAfterLoopFunctors); for (const auto &x : mCopyAfterLoopFunctors) { x(); } mCopyAfterLoopFunctors.clear(); } void processAsyncFunctors() { swapAsyncFunctors(); for (const auto &x : mCopyAsyncFunctors) { x(); } mCopyAsyncFunctors.clear(); } void swapAsyncFunctors() { std::lock_guard lck(mAsyncFunctorsMutex); assert(mCopyAsyncFunctors.empty()); mCopyAsyncFunctors.swap(mAsyncFunctors); } void pushAsyncFunctor(UserFunctor &&f) { std::lock_guard lck(mAsyncFunctorsMutex); mAsyncFunctors.emplace_back(std::move(f)); } #ifdef BRYNET_PLATFORM_LINUX int getEpollHandle() const { return mEpollFd; } #elif defined BRYNET_PLATFORM_DARWIN int getKqueueHandle() const { return mKqueueFd; } #endif bool linkChannel(BrynetSocketFD fd, const Channel *ptr) BRYNET_NOEXCEPT { #ifdef BRYNET_PLATFORM_WINDOWS return CreateIoCompletionPort((HANDLE)fd, mIOCP, (ULONG_PTR)ptr, 0) != nullptr; #elif defined BRYNET_PLATFORM_LINUX struct epoll_event ev = { 0, { nullptr } }; ev.events = EPOLLET | EPOLLIN | EPOLLOUT | EPOLLRDHUP; ev.data.ptr = (void *)ptr; return epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, &ev) == 0; #elif defined BRYNET_PLATFORM_DARWIN struct kevent ev[2]; memset(&ev, 0, sizeof(ev)); int n = 0; EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD | EV_CLEAR, NOTE_TRIGGER, 0, (void *)ptr); EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD | EV_CLEAR, NOTE_TRIGGER, 0, (void *)ptr); struct timespec now = { 0, 0 }; return kevent(mKqueueFd, ev, n, NULL, 0, &now) == 0; #endif } TcpConnectionPtr getTcpConnection(BrynetSocketFD fd) { auto it = mTcpConnections.find(fd); if (it != mTcpConnections.end()) { return (*it).second; } return nullptr; } void addTcpConnection(BrynetSocketFD fd, TcpConnectionPtr tcpConnection) { mTcpConnections[fd] = std::move(tcpConnection); } void removeTcpConnection(BrynetSocketFD fd) { mTcpConnections.erase(fd); } void tryInitThreadID() { std::call_once(mOnceInitThreadID, [this]() { mSelfThreadID = tid(); }); } private: #ifdef BRYNET_PLATFORM_WINDOWS std::vector mEventEntries; typedef BOOL(WINAPI *sGetQueuedCompletionStatusEx)(HANDLE, LPOVERLAPPED_ENTRY, ULONG, PULONG, DWORD, BOOL); sGetQueuedCompletionStatusEx mPGetQueuedCompletionStatusEx; HANDLE mIOCP; #elif defined BRYNET_PLATFORM_LINUX std::vector mEventEntries; int mEpollFd; #elif defined BRYNET_PLATFORM_DARWIN std::vector mEventEntries; int mKqueueFd; #endif std::unique_ptr mWakeupChannel; std::atomic_bool mIsInBlock; std::atomic_bool mIsAlreadyPostWakeup; std::mutex mAsyncFunctorsMutex; std::vector mAsyncFunctors; std::vector mCopyAsyncFunctors; std::vector mAfterLoopFunctors; std::vector mCopyAfterLoopFunctors; std::once_flag mOnceInitThreadID; THREAD_ID_TYPE mSelfThreadID; TimerMgr::Ptr mTimer; std::unordered_map mTcpConnections; friend class TcpConnection; };