// Copyright 2010, Shuo Chen. All rights reserved. // http://code.google.com/p/muduo/ // // Use of this source code is governed by a BSD-style license // that can be found in the License file. // Author: Shuo Chen (chenshuo at chenshuo dot com) // Taken from Muduo and modified // Copyright 2016, Tao An. All rights reserved. // https://github.com/an-tao/trantor // // Use of this source code is governed by a BSD-style license // that can be found in the License file. // Author: Tao An #include #include #include "Channel.h" #include "Poller.h" #include "TimerQueue.h" #include #include #ifdef _WIN32 #include using ssize_t = long long; #else #include #endif #include #ifdef __linux__ #include #endif #include #ifndef _WIN32 #include #endif #include #include #include namespace trantor { #ifdef __linux__ int createEventfd() { int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); if (evtfd < 0) { std::cout << "Failed in eventfd" << std::endl; abort(); } return evtfd; } const int kPollTimeMs = 10000; #endif thread_local EventLoop *t_loopInThisThread = nullptr; EventLoop::EventLoop() : looping_(false), threadId_(std::this_thread::get_id()), quit_(false), poller_(Poller::newPoller(this)), currentActiveChannel_(nullptr), eventHandling_(false), timerQueue_(new TimerQueue(this)), #ifdef __linux__ wakeupFd_(createEventfd()), wakeupChannelPtr_(new Channel(this, wakeupFd_)), #endif threadLocalLoopPtr_(&t_loopInThisThread) { if (t_loopInThisThread) { LOG_FATAL << "There is already an EventLoop in this thread"; exit(-1); } t_loopInThisThread = this; #ifdef __linux__ wakeupChannelPtr_->setReadCallback(std::bind(&EventLoop::wakeupRead, this)); wakeupChannelPtr_->enableReading(); #elif !defined _WIN32 auto r = pipe(wakeupFd_); (void)r; assert(!r); fcntl(wakeupFd_[0], F_SETFL, O_NONBLOCK | O_CLOEXEC); fcntl(wakeupFd_[1], F_SETFL, O_NONBLOCK | O_CLOEXEC); wakeupChannelPtr_ = std::unique_ptr(new Channel(this, wakeupFd_[0])); wakeupChannelPtr_->setReadCallback(std::bind(&EventLoop::wakeupRead, this)); wakeupChannelPtr_->enableReading(); #else poller_->setEventCallback([](uint64_t event) { assert(event == 1); }); #endif } #ifdef __linux__ void EventLoop::resetTimerQueue() { assertInLoopThread(); assert(!looping_); timerQueue_->reset(); } #endif void EventLoop::resetAfterFork() { poller_->resetAfterFork(); } EventLoop::~EventLoop() { quit(); assert(!looping_); t_loopInThisThread = nullptr; #ifdef __linux__ close(wakeupFd_); #elif defined _WIN32 #else close(wakeupFd_[0]); close(wakeupFd_[1]); #endif } EventLoop *EventLoop::getEventLoopOfCurrentThread() { return t_loopInThisThread; } void EventLoop::updateChannel(Channel *channel) { assert(channel->ownerLoop() == this); assertInLoopThread(); poller_->updateChannel(channel); } void EventLoop::removeChannel(Channel *channel) { assert(channel->ownerLoop() == this); assertInLoopThread(); if (eventHandling_) { assert(currentActiveChannel_ == channel || std::find(activeChannels_.begin(), activeChannels_.end(), channel) == activeChannels_.end()); } poller_->removeChannel(channel); } void EventLoop::quit() { quit_ = true; Func f; while (funcsOnQuit_.dequeue(f)) { f(); } // There is a chance that loop() just executes while(!quit_) and exits, // then EventLoop destructs, then we are accessing an invalid object. // Can be fixed using mutex_ in both places. if (!isInLoopThread()) { wakeup(); } } void EventLoop::loop() { assert(!looping_); assertInLoopThread(); looping_ = true; quit_ = false; while (!quit_) { activeChannels_.clear(); #ifdef __linux__ poller_->poll(kPollTimeMs, &activeChannels_); #else poller_->poll(static_cast(timerQueue_->getTimeout()), &activeChannels_); timerQueue_->processTimers(); #endif // TODO sort channel by priority // std::cout<<"after ->poll()"<handleEvent(); } currentActiveChannel_ = NULL; eventHandling_ = false; // std::cout << "looping" << endl; doRunInLoopFuncs(); } looping_ = false; } void EventLoop::abortNotInLoopThread() { LOG_FATAL << "It is forbidden to run loop on threads other than event-loop " "thread"; exit(1); } void EventLoop::runInLoop(const Func &cb) { if (isInLoopThread()) { cb(); } else { queueInLoop(cb); } } void EventLoop::runInLoop(Func &&cb) { if (isInLoopThread()) { cb(); } else { queueInLoop(std::move(cb)); } } void EventLoop::queueInLoop(const Func &cb) { funcs_.enqueue(cb); if (!isInLoopThread() || !looping_) { wakeup(); } } void EventLoop::queueInLoop(Func &&cb) { funcs_.enqueue(std::move(cb)); if (!isInLoopThread() || !looping_) { wakeup(); } } TimerId EventLoop::runAt(const Date &time, const Func &cb) { auto microSeconds = time.microSecondsSinceEpoch() - Date::now().microSecondsSinceEpoch(); std::chrono::steady_clock::time_point tp = std::chrono::steady_clock::now() + std::chrono::microseconds(microSeconds); return timerQueue_->addTimer(cb, tp, std::chrono::microseconds(0)); } TimerId EventLoop::runAt(const Date &time, Func &&cb) { auto microSeconds = time.microSecondsSinceEpoch() - Date::now().microSecondsSinceEpoch(); std::chrono::steady_clock::time_point tp = std::chrono::steady_clock::now() + std::chrono::microseconds(microSeconds); return timerQueue_->addTimer(std::move(cb), tp, std::chrono::microseconds(0)); } TimerId EventLoop::runAfter(double delay, const Func &cb) { return runAt(Date::date().after(delay), cb); } TimerId EventLoop::runAfter(double delay, Func &&cb) { return runAt(Date::date().after(delay), std::move(cb)); } TimerId EventLoop::runEvery(double interval, const Func &cb) { std::chrono::microseconds dur( static_cast(interval * 1000000)); auto tp = std::chrono::steady_clock::now() + dur; return timerQueue_->addTimer(cb, tp, dur); } TimerId EventLoop::runEvery(double interval, Func &&cb) { std::chrono::microseconds dur( static_cast(interval * 1000000)); auto tp = std::chrono::steady_clock::now() + dur; return timerQueue_->addTimer(std::move(cb), tp, dur); } void EventLoop::invalidateTimer(TimerId id) { if (isRunning() && timerQueue_) timerQueue_->invalidateTimer(id); } void EventLoop::doRunInLoopFuncs() { callingFuncs_ = true; { // the destructor for the Func may itself insert a new entry into the // queue while (!funcs_.empty()) { Func func; while (funcs_.dequeue(func)) { func(); } } } callingFuncs_ = false; } void EventLoop::wakeup() { // if (!looping_) // return; uint64_t tmp = 1; #ifdef __linux__ int ret = write(wakeupFd_, &tmp, sizeof(tmp)); (void)ret; #elif defined _WIN32 poller_->postEvent(1); #else int ret = write(wakeupFd_[1], &tmp, sizeof(tmp)); (void)ret; #endif } void EventLoop::wakeupRead() { ssize_t ret = 0; #ifdef __linux__ uint64_t tmp; ret = read(wakeupFd_, &tmp, sizeof(tmp)); #elif defined _WIN32 #else uint64_t tmp; ret = read(wakeupFd_[0], &tmp, sizeof(tmp)); #endif if (ret < 0) LOG_SYSERR << "wakeup read error"; } void EventLoop::moveToCurrentThread() { if (isRunning()) { LOG_FATAL << "EventLoop cannot be moved when running"; exit(-1); } if (isInLoopThread()) { LOG_WARN << "This EventLoop is already in the current thread"; return; } if (t_loopInThisThread) { LOG_FATAL << "There is already an EventLoop in this thread, you cannot " "move another in"; exit(-1); } *threadLocalLoopPtr_ = nullptr; t_loopInThisThread = this; threadLocalLoopPtr_ = &t_loopInThisThread; threadId_ = std::this_thread::get_id(); } void EventLoop::runOnQuit(Func &&cb) { funcsOnQuit_.enqueue(std::move(cb)); } void EventLoop::runOnQuit(const Func &cb) { funcsOnQuit_.enqueue(cb); } } // namespace trantor