diff --git a/core/http_server.cpp b/core/http_server.cpp index 7213635..f1a24a7 100644 --- a/core/http_server.cpp +++ b/core/http_server.cpp @@ -101,14 +101,13 @@ void HTTPServer::initialize() { //drogon -> void HttpAppFrameworkImpl::run() void HTTPServer::main_loop() { - if (!getLoop()->isInLoopThread()) { getLoop()->moveToCurrentThread(); } LOG_TRACE << "Start to run..."; -/* + /* trantor::AsyncFileLogger asyncFileLogger; // Create dirs for cache files @@ -131,7 +130,7 @@ void HTTPServer::main_loop() { } */ -/* + /* // set relaunching if (relaunchOnError_) { #ifndef _WIN32 @@ -155,7 +154,7 @@ void HTTPServer::main_loop() { //signal(SIGTERM, TERMFunction); -/* + /* // set logger if (!logPath_.empty()) { #ifdef _WIN32 @@ -183,7 +182,7 @@ void HTTPServer::main_loop() { } */ -/* + /* if (relaunchOnError_) { LOG_INFO << "Start child process"; } @@ -193,7 +192,7 @@ void HTTPServer::main_loop() { _running = true; -/* + /* #ifndef _WIN32 if (!libFilePaths_.empty()) { sharedLibManagerPtr_ = std::unique_ptr( @@ -236,7 +235,7 @@ void HTTPServer::main_loop() { websockCtrlsRouterPtr_->init(); */ -/* + /* get_loop()->queueInLoop([this]() { // Let listener event loops run when everything is ready. listenerManagerPtr_->startListening(); @@ -251,6 +250,30 @@ void HTTPServer::main_loop() { get_loop()->loop(); } +static void defaultHttpAsyncCallback( + const HttpRequestPtr &, + std::function &&callback) +{ + // auto resp = HttpResponse::newNotFoundResponse(); + //resp->setCloseConnection(true); + // callback(resp); +} + +static void defaultWebSockAsyncCallback( + const HttpRequestPtr &, + std::function &&callback, + const WebSocketConnectionPtr &) +{ + //auto resp = HttpResponse::newNotFoundResponse(); + //resp->setCloseConnection(true); + //callback(resp); +} + +static void defaultConnectionCallback(const trantor::TcpConnectionPtr &) +{ + return; +} + HTTPServer::HTTPServer() { port = 80; threads = 4; @@ -258,28 +281,435 @@ HTTPServer::HTTPServer() { _running = false; } -HttpServer::HTTPServer( - EventLoop *loop, - const InetAddress &listenAddr, - const std::string &name, - const std::vector> - &syncAdvices) +HTTPServer::HTTPServer(trantor::EventLoop *loop, + const trantor::InetAddress &listenAddr, + const std::string &name, + const std::vector > + &syncAdvices) #ifdef __linux__ - : server_(loop, listenAddr, name.c_str()), + : + server_(loop, listenAddr, name.c_str()), #else - : server_(loop, listenAddr, name.c_str(), true, false), + : + server_(loop, listenAddr, name.c_str(), true, false), #endif - httpAsyncCallback_(defaultHttpAsyncCallback), - newWebsocketCallback_(defaultWebSockAsyncCallback), - connectionCallback_(defaultConnectionCallback), - syncAdvices_(syncAdvices) -{ - server_.setConnectionCallback( - std::bind(&HttpServer::onConnection, this, _1)); - server_.setRecvMessageCallback( - std::bind(&HttpServer::onMessage, this, _1, _2)); + httpAsyncCallback_(defaultHttpAsyncCallback), + newWebsocketCallback_(defaultWebSockAsyncCallback), + connectionCallback_(defaultConnectionCallback), + syncAdvices_(syncAdvices) { + port = 80; + threads = 4; + listenBuilder = nullptr; + _running = false; + + server_.setConnectionCallback(std::bind(&HTTPServer::onConnection, this, _1)); + server_.setRecvMessageCallback(std::bind(&HTTPServer::onMessage, this, _1, _2)); } HTTPServer::~HTTPServer() { delete listenBuilder; +} + + +void HTTPServer::onConnection(const trantor::TcpConnectionPtr &conn) +{ + /* + if (conn->connected()) + { + auto parser = std::make_shared(conn); + parser->reset(); + conn->setContext(parser); + connectionCallback_(conn); + } + else if (conn->disconnected()) + { + LOG_TRACE << "conn disconnected!"; + connectionCallback_(conn); + auto requestParser = conn->getContext(); + if (requestParser) + { + if (requestParser->webSocketConn()) + { + requestParser->webSocketConn()->onClose(); + } + conn->clearContext(); + } + } + */ +} + +void HTTPServer::onMessage(const trantor::TcpConnectionPtr &conn, trantor::MsgBuffer *buf) +{ + /* + if (!conn->hasContext()) + return; + auto requestParser = conn->getContext(); + if (!requestParser) + return; + // With the pipelining feature or web socket, it is possible to receice + // multiple messages at once, so + // the while loop is necessary + if (requestParser->webSocketConn()) + { + // Websocket payload + requestParser->webSocketConn()->onNewMessage(conn, buf); + } + else + { + auto &requests = requestParser->getRequestBuffer(); + while (buf->readableBytes() > 0) + { + if (requestParser->isStop()) + { + // The number of requests has reached the limit. + buf->retrieveAll(); + return; + } + if (!requestParser->parseRequest(buf)) + { + requestParser->reset(); + conn->forceClose(); + return; + } + if (requestParser->gotAll()) + { + requestParser->requestImpl()->setPeerAddr(conn->peerAddr()); + requestParser->requestImpl()->setLocalAddr(conn->localAddr()); + requestParser->requestImpl()->setCreationDate( + trantor::Date::date()); + requestParser->requestImpl()->setSecure( + conn->isSSLConnection()); + if (requestParser->firstReq() && + isWebSocket(requestParser->requestImpl())) + { + auto wsConn = + std::make_shared(conn); + newWebsocketCallback_( + requestParser->requestImpl(), + [conn, wsConn, requestParser]( + const HttpResponsePtr &resp) mutable { + if (conn->connected()) + { + if (resp->statusCode() == + k101SwitchingProtocols) + { + requestParser->setWebsockConnection(wsConn); + } + auto httpString = + ((HttpResponseImpl *)resp.get()) + ->renderToBuffer(); + conn->send(httpString); + COZ_PROGRESS + } + }, + wsConn); + } + else + requests.push_back(requestParser->requestImpl()); + requestParser->reset(); + } + else + { + break; + } + } + onRequests(conn, requests, requestParser); + requests.clear(); + } + */ +} + +void HTTPServer::onRequests( + const trantor::TcpConnectionPtr &conn, + const std::vector &requests, + const std::shared_ptr &requestParser) +{ + /* + if (requests.empty()) + return; + if (HttpAppFrameworkImpl::instance().keepaliveRequestsNumber() > 0 && + requestParser->numberOfRequestsParsed() >= + HttpAppFrameworkImpl::instance().keepaliveRequestsNumber()) + { + requestParser->stop(); + conn->shutdown(); + return; + } + else if (HttpAppFrameworkImpl::instance().pipeliningRequestsNumber() > 0 && + requestParser->numberOfRequestsInPipelining() + requests.size() >= + HttpAppFrameworkImpl::instance().pipeliningRequestsNumber()) + { + requestParser->stop(); + conn->shutdown(); + return; + } + if (!conn->connected()) + { + return; + } + auto loopFlagPtr = std::make_shared(true); + + for (auto &req : requests) + { + bool close_ = (!req->keepAlive()); + bool isHeadMethod = (req->method() == Head); + if (isHeadMethod) + { + req->setMethod(Get); + } + bool syncFlag = false; + if (!requestParser->emptyPipelining()) + { + requestParser->pushRequestToPipelining(req); + syncFlag = true; + } + if (!syncAdvices_.empty()) + { + bool adviceFlag = false; + for (auto &advice : syncAdvices_) + { + auto resp = advice(req); + if (resp) + { + resp->setVersion(req->getVersion()); + resp->setCloseConnection(close_); + if (!syncFlag) + { + requestParser->getResponseBuffer().emplace_back( + getCompressedResponse(req, resp, isHeadMethod), + isHeadMethod); + } + else + { + requestParser->pushResponseToPipelining( + req, + getCompressedResponse(req, resp, isHeadMethod), + isHeadMethod); + } + + adviceFlag = true; + break; + } + } + if (adviceFlag) + continue; + } + httpAsyncCallback_( + req, + [conn, + close_, + req, + loopFlagPtr, + &syncFlag, + isHeadMethod, + this, + requestParser](const HttpResponsePtr &response) { + if (!response) + return; + if (!conn->connected()) + return; + + response->setVersion(req->getVersion()); + response->setCloseConnection(close_); + auto newResp = + getCompressedResponse(req, response, isHeadMethod); + if (conn->getLoop()->isInLoopThread()) + { + //* + // * A client that supports persistent connections MAY + // * "pipeline" its requests (i.e., send multiple requests + // * without waiting for each response). A server MUST send + // * its responses to those requests in the same order that + // * the requests were received. rfc2616-8.1.1.2 + // * + if (!conn->connected()) + return; + if (*loopFlagPtr) + { + syncFlag = true; + if (requestParser->emptyPipelining()) + { + requestParser->getResponseBuffer().emplace_back( + newResp, isHeadMethod); + } + else + { + // some earlier requests are waiting for responses; + requestParser->pushResponseToPipelining( + req, newResp, isHeadMethod); + } + } + else if (requestParser->getFirstRequest() == req) + { + requestParser->popFirstRequest(); + + std::vector> resps; + resps.emplace_back(newResp, isHeadMethod); + while (!requestParser->emptyPipelining()) + { + auto resp = requestParser->getFirstResponse(); + if (resp.first) + { + requestParser->popFirstRequest(); + resps.push_back(std::move(resp)); + } + else + break; + } + sendResponses(conn, resps, requestParser->getBuffer()); + } + else + { + // some earlier requests are waiting for responses; + requestParser->pushResponseToPipelining(req, + newResp, + isHeadMethod); + } + } + else + { + conn->getLoop()->queueInLoop([conn, + req, + newResp, + this, + isHeadMethod, + requestParser]() { + if (conn->connected()) + { + if (requestParser->getFirstRequest() == req) + { + requestParser->popFirstRequest(); + std::vector> + resps; + resps.emplace_back(newResp, isHeadMethod); + while (!requestParser->emptyPipelining()) + { + auto resp = + requestParser->getFirstResponse(); + if (resp.first) + { + requestParser->popFirstRequest(); + resps.push_back(std::move(resp)); + } + else + break; + } + sendResponses(conn, + resps, + requestParser->getBuffer()); + } + else + { + // some earlier requests are waiting for + // responses; + requestParser->pushResponseToPipelining( + req, newResp, isHeadMethod); + } + } + }); + } + }); + if (syncFlag == false) + { + requestParser->pushRequestToPipelining(req); + } + } + *loopFlagPtr = false; + if (conn->connected() && !requestParser->getResponseBuffer().empty()) + { + sendResponses(conn, + requestParser->getResponseBuffer(), + requestParser->getBuffer()); + requestParser->getResponseBuffer().clear(); + } + */ +} + +void HTTPServer::sendResponse(const trantor::TcpConnectionPtr &conn, + const HttpResponsePtr &response, + bool isHeadMethod) +{ + /* + conn->getLoop()->assertInLoopThread(); + auto respImplPtr = static_cast(response.get()); + if (!isHeadMethod) + { + auto httpString = respImplPtr->renderToBuffer(); + conn->send(httpString); + auto &sendfileName = respImplPtr->sendfileName(); + if (!sendfileName.empty()) + { + conn->sendFile(sendfileName.c_str()); + } + COZ_PROGRESS + } + else + { + auto httpString = respImplPtr->renderHeaderForHeadMethod(); + conn->send(std::move(*httpString)); + COZ_PROGRESS + } + + if (response->ifCloseConnection()) + { + conn->shutdown(); + COZ_PROGRESS + } + */ +} + +void HTTPServer::sendResponses( + const trantor::TcpConnectionPtr &conn, + const std::vector> &responses, + trantor::MsgBuffer &buffer) +{ + /* + conn->getLoop()->assertInLoopThread(); + if (responses.empty()) + return; + if (responses.size() == 1) + { + sendResponse(conn, responses[0].first, responses[0].second); + return; + } + for (auto const &resp : responses) + { + auto respImplPtr = static_cast(resp.first.get()); + if (!resp.second) + { + // Not HEAD method + respImplPtr->renderToBuffer(buffer); + auto &sendfileName = respImplPtr->sendfileName(); + if (!sendfileName.empty()) + { + conn->send(buffer); + buffer.retrieveAll(); + conn->sendFile(sendfileName.c_str()); + COZ_PROGRESS + } + } + else + { + auto httpString = respImplPtr->renderHeaderForHeadMethod(); + buffer.append(httpString->peek(), httpString->readableBytes()); + } + if (respImplPtr->ifCloseConnection()) + { + if (buffer.readableBytes() > 0) + { + conn->send(buffer); + buffer.retrieveAll(); + COZ_PROGRESS + } + conn->shutdown(); + return; + } + } + if (conn->connected() && buffer.readableBytes() > 0) + { + conn->send(buffer); + COZ_PROGRESS + } + buffer.retrieveAll(); + */ } \ No newline at end of file diff --git a/core/http_server.h b/core/http_server.h index ac99452..13f32b9 100644 --- a/core/http_server.h +++ b/core/http_server.h @@ -103,7 +103,7 @@ protected: HttpAsyncCallback httpAsyncCallback_; WebSocketNewAsyncCallback newWebsocketCallback_; trantor::ConnectionCallback connectionCallback_; - const std::vector > &syncAdvices_; + std::vector > syncAdvices_; bool _running{ false }; }; diff --git a/core/listener_manager.cpp b/core/listener_manager.cpp index 8b4d53b..4895840 100644 --- a/core/listener_manager.cpp +++ b/core/listener_manager.cpp @@ -81,7 +81,7 @@ std::vector ListenerManager::createListeners( for (auto const &listener : listeners_) { auto const &ip = listener.ip_; bool isIpv6 = ip.find(':') == std::string::npos ? false : true; - std::shared_ptr serverPtr; + std::shared_ptr serverPtr; if (i == 0) { DrogonFileLocker lock; // Check whether the port is in use. @@ -90,13 +90,13 @@ std::vector ListenerManager::createListeners( "drogonPortTest", true, false); - serverPtr = std::make_shared( + serverPtr = std::make_shared( loopThreadPtr->getLoop(), InetAddress(ip, listener.port_, isIpv6), "drogon", syncAdvices); } else { - serverPtr = std::make_shared( + serverPtr = std::make_shared( loopThreadPtr->getLoop(), InetAddress(ip, listener.port_, isIpv6), "drogon", @@ -140,7 +140,7 @@ std::vector ListenerManager::createListeners( LOG_TRACE << "thread num=" << threadNum; auto ip = listener.ip_; bool isIpv6 = ip.find(':') == std::string::npos ? false : true; - auto serverPtr = std::make_shared( + auto serverPtr = std::make_shared( loopThreadPtr->getLoop(), InetAddress(ip, listener.port_, isIpv6), "drogon", diff --git a/core/listener_manager.h b/core/listener_manager.h index 8943b3a..da9029a 100644 --- a/core/listener_manager.h +++ b/core/listener_manager.h @@ -23,7 +23,7 @@ #include "core/http_server_callbacks.h" -class HttpServer; +class HTTPServer; namespace drogon { class ListenerManager : public trantor::NonCopyable { @@ -74,7 +74,7 @@ private: }; std::vector listeners_; - std::vector > servers_; + std::vector > servers_; std::vector > listeningloopThreads_; std::shared_ptr ioLoopThreadPoolPtr_; }; diff --git a/libs/trantor/trantor/net/TcpServer.cc b/libs/trantor/trantor/net/TcpServer.cc index 523301c..d866e14 100644 --- a/libs/trantor/trantor/net/TcpServer.cc +++ b/libs/trantor/trantor/net/TcpServer.cc @@ -39,6 +39,8 @@ TcpServer::TcpServer(EventLoop *loop, std::bind(&TcpServer::newConnection, this, _1, _2)); } +TcpServer::TcpServer() {} + TcpServer::~TcpServer() { // loop_->assertInLoopThread(); diff --git a/libs/trantor/trantor/net/TcpServer.h b/libs/trantor/trantor/net/TcpServer.h index c82808e..bd38d1c 100644 --- a/libs/trantor/trantor/net/TcpServer.h +++ b/libs/trantor/trantor/net/TcpServer.h @@ -50,7 +50,7 @@ class TcpServer : NonCopyable const std::string &name, bool reUseAddr = true, bool reUsePort = true); - TcpServer() {} + TcpServer(); ~TcpServer(); /**