rcpp_framework/core/loops/event_loop.cpp

336 lines
9.1 KiB
C++
Raw Normal View History

2021-06-17 14:43:29 +02:00
// This file is originally from Muduo -> Trantor - EventLoop.cc
2021-06-17 14:43:29 +02:00
// Copyright (c) 2016-2021, Tao An. All rights reserved.
// Copyright (c) 2010, Shuo Chen. All rights reserved.
2021-06-17 14:43:29 +02:00
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// * Neither the name of Tao An nor the names of other contributors
// may be used to endorse or promote products derived from this software
// without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2021-06-17 14:43:29 +02:00
2022-02-10 01:24:36 +01:00
#include "event_loop.h"
#include "core/log/logger.h"
#include "core/net/channel.h"
#include "core/polling/poller.h"
#include "timer_queue.h"

#include <assert.h>
#include <thread>
#ifdef _WIN32
#include <io.h>
using ssize_t = long long;
#else
#include <poll.h>
#endif
#include <iostream>
#ifdef __linux__
#include <sys/eventfd.h>
#endif
#include <functional>
#ifndef _WIN32
#include <unistd.h>
#endif
#include <fcntl.h>
#include <signal.h>
#include <algorithm>
#include <cerrno>
#include <cstdlib>
#include <cstring>
2021-06-17 14:43:29 +02:00
#ifdef __linux__
// Creates the eventfd that EventLoop uses as its wake-up descriptor.
//
// The descriptor starts with a counter of 0 and is opened non-blocking and
// close-on-exec. Returns the new descriptor. On failure the reason is
// written to stderr and the process is aborted, so a negative value is never
// returned to the caller.
int createEventfd() {
	const int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (evtfd < 0) {
		// Save errno first: stream operations are allowed to overwrite it.
		const int err = errno;
		std::cerr << "Failed in eventfd: " << std::strerror(err) << std::endl;
		std::abort();
	}
	return evtfd;
}

// Timeout, in milliseconds, that EventLoop::loop() passes to the poller.
const int kPollTimeMs = 10000;
#endif
// Per-thread pointer to the EventLoop registered on that thread, or nullptr
// when there is none. Set by the EventLoop constructor and by
// moveToCurrentThread(), cleared by the destructor, and exposed through
// EventLoop::getEventLoopOfCurrentThread().
thread_local EventLoop *t_loopInThisThread = nullptr;
// Constructs an event loop bound to the calling thread.
//
// Only one EventLoop may be registered per thread: if the thread-local slot
// is already occupied the process logs a fatal message and exits.
//
// The wake-up mechanism is platform specific:
//  - Linux: an eventfd watched through wakeupChannelPtr_.
//  - Windows: the poller's own event callback (no descriptor is created).
//  - other POSIX systems: a non-blocking, close-on-exec pipe whose read end
//    is watched through wakeupChannelPtr_.
EventLoop::EventLoop() :
		looping_(false),
		threadId_(std::this_thread::get_id()),
		quit_(false),
		poller_(Poller::newPoller(this)),
		currentActiveChannel_(nullptr),
		eventHandling_(false),
		timerQueue_(new TimerQueue(this)),
#ifdef __linux__
		wakeupFd_(createEventfd()),
		wakeupChannelPtr_(new Channel(this, wakeupFd_)),
#endif
		threadLocalLoopPtr_(&t_loopInThisThread) {
	if (t_loopInThisThread) {
		LOG_FATAL << "There is already an EventLoop in this thread";
		exit(-1);
	}
	t_loopInThisThread = this;

#ifdef __linux__
	wakeupChannelPtr_->setReadCallback(std::bind(&EventLoop::wakeupRead, this));
	wakeupChannelPtr_->enableReading();
#elif !defined _WIN32
	// A failed pipe() would leave wakeupFd_ uninitialised; an assert alone
	// disappears in release builds, so fail explicitly as well.
	const int r = pipe(wakeupFd_);
	assert(!r);
	if (r != 0) {
		LOG_FATAL << "Failed to create the wakeup pipe";
		exit(-1);
	}
	// O_NONBLOCK is a file status flag and is set with F_SETFL.
	// Close-on-exec is a descriptor flag: it must be set with
	// F_SETFD/FD_CLOEXEC, because F_SETFL ignores O_CLOEXEC.
	fcntl(wakeupFd_[0], F_SETFL, O_NONBLOCK);
	fcntl(wakeupFd_[0], F_SETFD, FD_CLOEXEC);
	fcntl(wakeupFd_[1], F_SETFL, O_NONBLOCK);
	fcntl(wakeupFd_[1], F_SETFD, FD_CLOEXEC);
	wakeupChannelPtr_ =
			std::unique_ptr<Channel>(new Channel(this, wakeupFd_[0]));
	wakeupChannelPtr_->setReadCallback(std::bind(&EventLoop::wakeupRead, this));
	wakeupChannelPtr_->enableReading();
#else
	poller_->setEventCallback([](uint64_t event) { assert(event == 1); });
#endif
}
#ifdef __linux__
// Resets the timer queue of a loop that is not currently running.
//
// Must be called on the loop's own thread while loop() is not executing;
// both preconditions are asserted.
void EventLoop::resetTimerQueue() {
	assertInLoopThread();
	assert(!looping_);

	// The arrow call forwards to the TimerQueue object's own reset().
	timerQueue_->reset();
}
#endif
// Forwards the post-fork reset request to the poller owned by this loop.
void EventLoop::resetAfterFork() {
	poller_->resetAfterFork();
}
// Requests the loop to stop, unregisters it from the current thread's
// thread-local slot and closes the wake-up descriptor(s).
//
// The loop must already have left loop() by the time the assertion runs.
EventLoop::~EventLoop() {
	quit();
	assert(!looping_);
	t_loopInThisThread = nullptr;

#if defined(__linux__)
	// Single eventfd.
	close(wakeupFd_);
#elif !defined(_WIN32)
	// Both ends of the wake-up pipe.
	close(wakeupFd_[0]);
	close(wakeupFd_[1]);
#endif
	// Windows: no wake-up descriptor is closed here.
}
// Returns the EventLoop registered on the calling thread, or nullptr when
// the thread has none.
EventLoop *EventLoop::getEventLoopOfCurrentThread() {
	return t_loopInThisThread;
}
// Pushes the channel's current registration to the poller.
//
// The channel must belong to this loop and the call must come from the loop
// thread.
void EventLoop::updateChannel(Channel *channel) {
	assert(this == channel->ownerLoop());
	assertInLoopThread();

	poller_->updateChannel(channel);
}
// Removes the channel from the poller.
//
// The channel must belong to this loop and the call must come from the loop
// thread. While events are being dispatched, only the channel currently
// being handled, or one that is not in the active list, may be removed.
void EventLoop::removeChannel(Channel *channel) {
	assert(this == channel->ownerLoop());
	assertInLoopThread();

	if (eventHandling_) {
		// Equivalent to "is the current channel OR is not pending"; kept
		// entirely inside assert so release builds skip the search.
		assert(!(currentActiveChannel_ != channel &&
				std::find(activeChannels_.begin(), activeChannels_.end(), channel) !=
						activeChannels_.end()));
	}

	poller_->removeChannel(channel);
}
void EventLoop::quit() {
quit_ = true;
2021-06-17 14:43:29 +02:00
Func f;
while (funcsOnQuit_.dequeue(f)) {
f();
}
2021-06-17 14:43:29 +02:00
// There is a chance that loop() just executes while(!quit_) and exits,
// then EventLoop destructs, then we are accessing an invalid object.
// Can be fixed using mutex_ in both places.
if (!isInLoopThread()) {
wakeup();
}
2021-06-17 14:43:29 +02:00
}
void EventLoop::loop() {
assert(!looping_);
assertInLoopThread();
looping_ = true;
quit_ = false;
2021-06-17 14:43:29 +02:00
while (!quit_) {
activeChannels_.clear();
2021-06-17 14:43:29 +02:00
#ifdef __linux__
poller_->poll(kPollTimeMs, &activeChannels_);
2021-06-17 14:43:29 +02:00
#else
poller_->poll(static_cast<int>(timerQueue_->getTimeout()),
&activeChannels_);
timerQueue_->processTimers();
2021-06-17 14:43:29 +02:00
#endif
// TODO sort channel by priority
// std::cout<<"after ->poll()"<<std::endl;
eventHandling_ = true;
for (auto it = activeChannels_.begin(); it != activeChannels_.end();
++it) {
currentActiveChannel_ = *it;
currentActiveChannel_->handleEvent();
}
currentActiveChannel_ = NULL;
eventHandling_ = false;
// std::cout << "looping" << endl;
doRunInLoopFuncs();
}
looping_ = false;
2021-06-17 14:43:29 +02:00
}
// Logs a fatal message and terminates the process with exit status 1.
void EventLoop::abortNotInLoopThread() {
	LOG_FATAL << "It is forbidden to run loop on threads other than event-loop thread";
	exit(1);
}
// Runs cb immediately when called on the loop thread; otherwise hands it to
// queueInLoop().
void EventLoop::runInLoop(const Func &cb) {
	if (!isInLoopThread()) {
		queueInLoop(cb);
		return;
	}
	cb();
}
// Rvalue overload: runs cb immediately when called on the loop thread;
// otherwise moves it into queueInLoop().
void EventLoop::runInLoop(Func &&cb) {
	if (!isInLoopThread()) {
		queueInLoop(std::move(cb));
		return;
	}
	cb();
}
// Appends cb to the functor queue drained by doRunInLoopFuncs().
//
// The loop is woken up unless the caller is the loop thread and the loop is
// currently running.
void EventLoop::queueInLoop(const Func &cb) {
	funcs_.enqueue(cb);

	const bool calledFromRunningLoop = isInLoopThread() && looping_;
	if (!calledFromRunningLoop) {
		wakeup();
	}
}
// Rvalue overload: moves cb into the functor queue drained by
// doRunInLoopFuncs().
//
// The loop is woken up unless the caller is the loop thread and the loop is
// currently running.
void EventLoop::queueInLoop(Func &&cb) {
	funcs_.enqueue(std::move(cb));

	const bool calledFromRunningLoop = isInLoopThread() && looping_;
	if (!calledFromRunningLoop) {
		wakeup();
	}
}
// Schedules cb to run once at the given wall-clock time.
//
// The distance between `time` and Date::now() is applied to the steady
// clock, and the timer is registered with a zero repeat interval. Returns
// the id produced by the timer queue.
TimerId EventLoop::runAt(const Date &time, const Func &cb) {
	using SteadyClock = std::chrono::steady_clock;

	const auto deltaUs =
			time.microSecondsSinceEpoch() - Date::now().microSecondsSinceEpoch();
	const SteadyClock::time_point deadline =
			SteadyClock::now() + std::chrono::microseconds(deltaUs);

	return timerQueue_->addTimer(cb, deadline, std::chrono::microseconds(0));
}
// Rvalue overload: schedules cb to run once at the given wall-clock time.
//
// The distance between `time` and Date::now() is applied to the steady
// clock, and the timer is registered with a zero repeat interval. Returns
// the id produced by the timer queue.
TimerId EventLoop::runAt(const Date &time, Func &&cb) {
	using SteadyClock = std::chrono::steady_clock;

	const auto deltaUs =
			time.microSecondsSinceEpoch() - Date::now().microSecondsSinceEpoch();
	const SteadyClock::time_point deadline =
			SteadyClock::now() + std::chrono::microseconds(deltaUs);

	return timerQueue_->addTimer(std::move(cb), deadline, std::chrono::microseconds(0));
}
// Schedules cb to run once, `delay` after the current date; delegates to
// runAt(). The unit of `delay` is whatever Date::after() expects.
TimerId EventLoop::runAfter(double delay, const Func &cb) {
	const Date when = Date::date().after(delay);
	return runAt(when, cb);
}
// Rvalue overload: schedules cb to run once, `delay` after the current date;
// delegates to runAt(). The unit of `delay` is whatever Date::after()
// expects.
TimerId EventLoop::runAfter(double delay, Func &&cb) {
	const Date when = Date::date().after(delay);
	return runAt(when, std::move(cb));
}
// Schedules cb to run repeatedly.
//
// `interval` is given in seconds and converted to whole microseconds. The
// first expiry is one interval from now, and the same duration is passed to
// the timer queue as the repeat interval.
TimerId EventLoop::runEvery(double interval, const Func &cb) {
	using Micros = std::chrono::microseconds;

	const Micros period(static_cast<Micros::rep>(interval * 1000000));
	const auto firstExpiry = std::chrono::steady_clock::now() + period;

	return timerQueue_->addTimer(cb, firstExpiry, period);
}
// Rvalue overload: schedules cb to run repeatedly.
//
// `interval` is given in seconds and converted to whole microseconds. The
// first expiry is one interval from now, and the same duration is passed to
// the timer queue as the repeat interval.
TimerId EventLoop::runEvery(double interval, Func &&cb) {
	using Micros = std::chrono::microseconds;

	const Micros period(static_cast<Micros::rep>(interval * 1000000));
	const auto firstExpiry = std::chrono::steady_clock::now() + period;

	return timerQueue_->addTimer(std::move(cb), firstExpiry, period);
}
// Invalidates the timer identified by `id`.
//
// Does nothing unless isRunning() reports true and a timer queue exists.
void EventLoop::invalidateTimer(TimerId id) {
	if (!isRunning() || !timerQueue_) {
		return;
	}
	timerQueue_->invalidateTimer(id);
}
void EventLoop::doRunInLoopFuncs() {
callingFuncs_ = true;
{
// the destructor for the Func may itself insert a new entry into the
// queue
while (!funcs_.empty()) {
Func func;
while (funcs_.dequeue(func)) {
func();
}
}
}
callingFuncs_ = false;
2021-06-17 14:43:29 +02:00
}
void EventLoop::wakeup() {
// if (!looping_)
// return;
uint64_t tmp = 1;
2021-06-17 14:43:29 +02:00
#ifdef __linux__
int ret = write(wakeupFd_, &tmp, sizeof(tmp));
(void)ret;
2021-06-17 14:43:29 +02:00
#elif defined _WIN32
poller_->postEvent(1);
2021-06-17 14:43:29 +02:00
#else
int ret = write(wakeupFd_[1], &tmp, sizeof(tmp));
(void)ret;
2021-06-17 14:43:29 +02:00
#endif
}
void EventLoop::wakeupRead() {
ssize_t ret = 0;
2021-06-17 14:43:29 +02:00
#ifdef __linux__
uint64_t tmp;
ret = read(wakeupFd_, &tmp, sizeof(tmp));
2021-06-17 14:43:29 +02:00
#elif defined _WIN32
#else
uint64_t tmp;
ret = read(wakeupFd_[0], &tmp, sizeof(tmp));
2021-06-17 14:43:29 +02:00
#endif
if (ret < 0)
LOG_SYSERR << "wakeup read error";
2021-06-17 14:43:29 +02:00
}
// Rebinds this loop to the calling thread.
//
// Exits the process with a fatal log when the loop is running or when the
// calling thread already has a different EventLoop. Logs a warning and
// returns when the loop already belongs to the calling thread. Otherwise the
// previous thread-local slot is cleared, the calling thread's slot is
// pointed at this loop, and the stored thread id is updated.
void EventLoop::moveToCurrentThread() {
	if (isRunning()) {
		LOG_FATAL << "EventLoop cannot be moved when running";
		exit(-1);
	}

	if (isInLoopThread()) {
		LOG_WARN << "This EventLoop is already in the current thread";
		return;
	}

	if (t_loopInThisThread != nullptr) {
		LOG_FATAL << "There is already an EventLoop in this thread, you cannot move another in";
		exit(-1);
	}

	// Release the old slot before claiming the new one.
	*threadLocalLoopPtr_ = nullptr;
	t_loopInThisThread = this;
	threadLocalLoopPtr_ = &t_loopInThisThread;
	threadId_ = std::this_thread::get_id();
}
// Rvalue overload: registers cb to be invoked by quit(); the functor is
// moved into the on-quit queue.
void EventLoop::runOnQuit(Func &&cb) {
	funcsOnQuit_.enqueue(std::move(cb));
}
// Registers cb to be invoked by quit(); the functor is copied into the
// on-quit queue.
void EventLoop::runOnQuit(const Func &cb) {
	funcsOnQuit_.enqueue(cb);
}