// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)

// Taken from Muduo and modified
// Copyright 2016, Tao An.  All rights reserved.
// https://github.com/an-tao/trantor
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Tao An

#include <trantor/net/EventLoop.h>
#include <trantor/utils/Logger.h>

#include "Channel.h"
#include "Poller.h"
#include "TimerQueue.h"

#include <assert.h>
#include <thread>
#ifdef _WIN32
#include <io.h>
using ssize_t = long long;
#else
#include <poll.h>
#endif
#include <iostream>
#ifdef __linux__
#include <sys/eventfd.h>
#endif
#include <functional>
#ifndef _WIN32
#include <unistd.h>
#endif
#include <fcntl.h>
#include <signal.h>
#include <algorithm>

namespace trantor {
#ifdef __linux__
int createEventfd() {
	int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (evtfd < 0) {
		std::cout << "Failed in eventfd" << std::endl;
		abort();
	}

	return evtfd;
}
const int kPollTimeMs = 10000;
#endif
thread_local EventLoop *t_loopInThisThread = nullptr;

EventLoop::EventLoop() :
		looping_(false),
		threadId_(std::this_thread::get_id()),
		quit_(false),
		poller_(Poller::newPoller(this)),
		currentActiveChannel_(nullptr),
		eventHandling_(false),
		timerQueue_(new TimerQueue(this)),
#ifdef __linux__
		wakeupFd_(createEventfd()),
		wakeupChannelPtr_(new Channel(this, wakeupFd_)),
#endif
		threadLocalLoopPtr_(&t_loopInThisThread) {
	if (t_loopInThisThread) {
		LOG_FATAL << "There is already an EventLoop in this thread";
		exit(-1);
	}
	t_loopInThisThread = this;
#ifdef __linux__
	wakeupChannelPtr_->setReadCallback(std::bind(&EventLoop::wakeupRead, this));
	wakeupChannelPtr_->enableReading();
#elif !defined _WIN32
	auto r = pipe(wakeupFd_);
	(void)r;
	assert(!r);
	fcntl(wakeupFd_[0], F_SETFL, O_NONBLOCK | O_CLOEXEC);
	fcntl(wakeupFd_[1], F_SETFL, O_NONBLOCK | O_CLOEXEC);
	wakeupChannelPtr_ =
			std::unique_ptr<Channel>(new Channel(this, wakeupFd_[0]));
	wakeupChannelPtr_->setReadCallback(std::bind(&EventLoop::wakeupRead, this));
	wakeupChannelPtr_->enableReading();
#else
	poller_->setEventCallback([](uint64_t event) { assert(event == 1); });
#endif
}
#ifdef __linux__
void EventLoop::resetTimerQueue() {
	assertInLoopThread();
	assert(!looping_);
	timerQueue_->reset();
}
#endif
void EventLoop::resetAfterFork() {
	poller_->resetAfterFork();
}
EventLoop::~EventLoop() {
	quit();
	assert(!looping_);
	t_loopInThisThread = nullptr;
#ifdef __linux__
	close(wakeupFd_);
#elif defined _WIN32
#else
	close(wakeupFd_[0]);
	close(wakeupFd_[1]);
#endif
}
EventLoop *EventLoop::getEventLoopOfCurrentThread() {
	return t_loopInThisThread;
}
void EventLoop::updateChannel(Channel *channel) {
	assert(channel->ownerLoop() == this);
	assertInLoopThread();
	poller_->updateChannel(channel);
}
void EventLoop::removeChannel(Channel *channel) {
	assert(channel->ownerLoop() == this);
	assertInLoopThread();
	if (eventHandling_) {
		assert(currentActiveChannel_ == channel ||
				std::find(activeChannels_.begin(),
						activeChannels_.end(),
						channel) == activeChannels_.end());
	}
	poller_->removeChannel(channel);
}
void EventLoop::quit() {
	quit_ = true;

	Func f;
	while (funcsOnQuit_.dequeue(f)) {
		f();
	}

	// There is a chance that loop() just executes while(!quit_) and exits,
	// then EventLoop destructs, then we are accessing an invalid object.
	// Can be fixed using mutex_ in both places.
	if (!isInLoopThread()) {
		wakeup();
	}
}
void EventLoop::loop() {
	assert(!looping_);
	assertInLoopThread();
	looping_ = true;
	quit_ = false;

	while (!quit_) {
		activeChannels_.clear();
#ifdef __linux__
		poller_->poll(kPollTimeMs, &activeChannels_);
#else
		poller_->poll(static_cast<int>(timerQueue_->getTimeout()),
				&activeChannels_);
		timerQueue_->processTimers();
#endif
		// TODO sort channel by priority
		// std::cout<<"after ->poll()"<<std::endl;
		eventHandling_ = true;
		for (auto it = activeChannels_.begin(); it != activeChannels_.end();
				++it) {
			currentActiveChannel_ = *it;
			currentActiveChannel_->handleEvent();
		}
		currentActiveChannel_ = NULL;
		eventHandling_ = false;
		// std::cout << "looping" << endl;
		doRunInLoopFuncs();
	}
	looping_ = false;
}
void EventLoop::abortNotInLoopThread() {
	LOG_FATAL << "It is forbidden to run loop on threads other than event-loop "
				 "thread";
	exit(1);
}
void EventLoop::runInLoop(const Func &cb) {
	if (isInLoopThread()) {
		cb();
	} else {
		queueInLoop(cb);
	}
}
void EventLoop::runInLoop(Func &&cb) {
	if (isInLoopThread()) {
		cb();
	} else {
		queueInLoop(std::move(cb));
	}
}
void EventLoop::queueInLoop(const Func &cb) {
	funcs_.enqueue(cb);
	if (!isInLoopThread() || !looping_) {
		wakeup();
	}
}
void EventLoop::queueInLoop(Func &&cb) {
	funcs_.enqueue(std::move(cb));
	if (!isInLoopThread() || !looping_) {
		wakeup();
	}
}

TimerId EventLoop::runAt(const Date &time, const Func &cb) {
	auto microSeconds =
			time.microSecondsSinceEpoch() - Date::now().microSecondsSinceEpoch();
	std::chrono::steady_clock::time_point tp =
			std::chrono::steady_clock::now() +
			std::chrono::microseconds(microSeconds);
	return timerQueue_->addTimer(cb, tp, std::chrono::microseconds(0));
}
TimerId EventLoop::runAt(const Date &time, Func &&cb) {
	auto microSeconds =
			time.microSecondsSinceEpoch() - Date::now().microSecondsSinceEpoch();
	std::chrono::steady_clock::time_point tp =
			std::chrono::steady_clock::now() +
			std::chrono::microseconds(microSeconds);
	return timerQueue_->addTimer(std::move(cb),
			tp,
			std::chrono::microseconds(0));
}
TimerId EventLoop::runAfter(double delay, const Func &cb) {
	return runAt(Date::date().after(delay), cb);
}
TimerId EventLoop::runAfter(double delay, Func &&cb) {
	return runAt(Date::date().after(delay), std::move(cb));
}
TimerId EventLoop::runEvery(double interval, const Func &cb) {
	std::chrono::microseconds dur(
			static_cast<std::chrono::microseconds::rep>(interval * 1000000));
	auto tp = std::chrono::steady_clock::now() + dur;
	return timerQueue_->addTimer(cb, tp, dur);
}
TimerId EventLoop::runEvery(double interval, Func &&cb) {
	std::chrono::microseconds dur(
			static_cast<std::chrono::microseconds::rep>(interval * 1000000));
	auto tp = std::chrono::steady_clock::now() + dur;
	return timerQueue_->addTimer(std::move(cb), tp, dur);
}
void EventLoop::invalidateTimer(TimerId id) {
	if (isRunning() && timerQueue_)
		timerQueue_->invalidateTimer(id);
}
void EventLoop::doRunInLoopFuncs() {
	callingFuncs_ = true;
	{
		// the destructor for the Func may itself insert a new entry into the
		// queue
		while (!funcs_.empty()) {
			Func func;
			while (funcs_.dequeue(func)) {
				func();
			}
		}
	}
	callingFuncs_ = false;
}
void EventLoop::wakeup() {
	// if (!looping_)
	//     return;
	uint64_t tmp = 1;
#ifdef __linux__
	int ret = write(wakeupFd_, &tmp, sizeof(tmp));
	(void)ret;
#elif defined _WIN32
	poller_->postEvent(1);
#else
	int ret = write(wakeupFd_[1], &tmp, sizeof(tmp));
	(void)ret;
#endif
}
void EventLoop::wakeupRead() {
	ssize_t ret = 0;
#ifdef __linux__
	uint64_t tmp;
	ret = read(wakeupFd_, &tmp, sizeof(tmp));
#elif defined _WIN32
#else
	uint64_t tmp;
	ret = read(wakeupFd_[0], &tmp, sizeof(tmp));
#endif
	if (ret < 0)
		LOG_SYSERR << "wakeup read error";
}

void EventLoop::moveToCurrentThread() {
	if (isRunning()) {
		LOG_FATAL << "EventLoop cannot be moved when running";
		exit(-1);
	}
	if (isInLoopThread()) {
		LOG_WARN << "This EventLoop is already in the current thread";
		return;
	}
	if (t_loopInThisThread) {
		LOG_FATAL << "There is already an EventLoop in this thread, you cannot "
					 "move another in";
		exit(-1);
	}
	*threadLocalLoopPtr_ = nullptr;
	t_loopInThisThread = this;
	threadLocalLoopPtr_ = &t_loopInThisThread;
	threadId_ = std::this_thread::get_id();
}

void EventLoop::runOnQuit(Func &&cb) {
	funcsOnQuit_.enqueue(std::move(cb));
}

void EventLoop::runOnQuit(const Func &cb) {
	funcsOnQuit_.enqueue(cb);
}

} // namespace trantor