Fix compile. The new drogon/trantor based server still needs lots of work/cleanups to replace brynet though.

This commit is contained in:
Relintai 2021-02-03 20:33:36 +01:00
parent 7d000bb5b5
commit 46565958a1
6 changed files with 464 additions and 32 deletions

View File

@ -101,7 +101,6 @@ void HTTPServer::initialize() {
//drogon -> void HttpAppFrameworkImpl::run() //drogon -> void HttpAppFrameworkImpl::run()
void HTTPServer::main_loop() { void HTTPServer::main_loop() {
if (!getLoop()->isInLoopThread()) { if (!getLoop()->isInLoopThread()) {
getLoop()->moveToCurrentThread(); getLoop()->moveToCurrentThread();
} }
@ -251,6 +250,30 @@ void HTTPServer::main_loop() {
get_loop()->loop(); get_loop()->loop();
} }
static void defaultHttpAsyncCallback(
const HttpRequestPtr &,
std::function<void(const HttpResponsePtr &resp)> &&callback)
{
// auto resp = HttpResponse::newNotFoundResponse();
//resp->setCloseConnection(true);
// callback(resp);
}
static void defaultWebSockAsyncCallback(
const HttpRequestPtr &,
std::function<void(const HttpResponsePtr &resp)> &&callback,
const WebSocketConnectionPtr &)
{
//auto resp = HttpResponse::newNotFoundResponse();
//resp->setCloseConnection(true);
//callback(resp);
}
static void defaultConnectionCallback(const trantor::TcpConnectionPtr &)
{
return;
}
HTTPServer::HTTPServer() { HTTPServer::HTTPServer() {
port = 80; port = 80;
threads = 4; threads = 4;
@ -258,28 +281,435 @@ HTTPServer::HTTPServer() {
_running = false; _running = false;
} }
HttpServer::HTTPServer( HTTPServer::HTTPServer(trantor::EventLoop *loop,
EventLoop *loop, const trantor::InetAddress &listenAddr,
const InetAddress &listenAddr,
const std::string &name, const std::string &name,
const std::vector<std::function<HttpResponsePtr(const HttpRequestPtr &)> > const std::vector<std::function<HttpResponsePtr(const HttpRequestPtr &)> >
&syncAdvices) &syncAdvices)
#ifdef __linux__ #ifdef __linux__
: server_(loop, listenAddr, name.c_str()), :
server_(loop, listenAddr, name.c_str()),
#else #else
: server_(loop, listenAddr, name.c_str(), true, false), :
server_(loop, listenAddr, name.c_str(), true, false),
#endif #endif
httpAsyncCallback_(defaultHttpAsyncCallback), httpAsyncCallback_(defaultHttpAsyncCallback),
newWebsocketCallback_(defaultWebSockAsyncCallback), newWebsocketCallback_(defaultWebSockAsyncCallback),
connectionCallback_(defaultConnectionCallback), connectionCallback_(defaultConnectionCallback),
syncAdvices_(syncAdvices) syncAdvices_(syncAdvices) {
{ port = 80;
server_.setConnectionCallback( threads = 4;
std::bind(&HttpServer::onConnection, this, _1)); listenBuilder = nullptr;
server_.setRecvMessageCallback( _running = false;
std::bind(&HttpServer::onMessage, this, _1, _2));
server_.setConnectionCallback(std::bind(&HTTPServer::onConnection, this, _1));
server_.setRecvMessageCallback(std::bind(&HTTPServer::onMessage, this, _1, _2));
} }
HTTPServer::~HTTPServer() { HTTPServer::~HTTPServer() {
delete listenBuilder; delete listenBuilder;
} }
void HTTPServer::onConnection(const trantor::TcpConnectionPtr &conn)
{
/*
if (conn->connected())
{
auto parser = std::make_shared<HttpRequestParser>(conn);
parser->reset();
conn->setContext(parser);
connectionCallback_(conn);
}
else if (conn->disconnected())
{
LOG_TRACE << "conn disconnected!";
connectionCallback_(conn);
auto requestParser = conn->getContext<HttpRequestParser>();
if (requestParser)
{
if (requestParser->webSocketConn())
{
requestParser->webSocketConn()->onClose();
}
conn->clearContext();
}
}
*/
}
void HTTPServer::onMessage(const trantor::TcpConnectionPtr &conn, trantor::MsgBuffer *buf)
{
/*
if (!conn->hasContext())
return;
auto requestParser = conn->getContext<HttpRequestParser>();
if (!requestParser)
return;
// With the pipelining feature or web socket, it is possible to receice
// multiple messages at once, so
// the while loop is necessary
if (requestParser->webSocketConn())
{
// Websocket payload
requestParser->webSocketConn()->onNewMessage(conn, buf);
}
else
{
auto &requests = requestParser->getRequestBuffer();
while (buf->readableBytes() > 0)
{
if (requestParser->isStop())
{
// The number of requests has reached the limit.
buf->retrieveAll();
return;
}
if (!requestParser->parseRequest(buf))
{
requestParser->reset();
conn->forceClose();
return;
}
if (requestParser->gotAll())
{
requestParser->requestImpl()->setPeerAddr(conn->peerAddr());
requestParser->requestImpl()->setLocalAddr(conn->localAddr());
requestParser->requestImpl()->setCreationDate(
trantor::Date::date());
requestParser->requestImpl()->setSecure(
conn->isSSLConnection());
if (requestParser->firstReq() &&
isWebSocket(requestParser->requestImpl()))
{
auto wsConn =
std::make_shared<WebSocketConnectionImpl>(conn);
newWebsocketCallback_(
requestParser->requestImpl(),
[conn, wsConn, requestParser](
const HttpResponsePtr &resp) mutable {
if (conn->connected())
{
if (resp->statusCode() ==
k101SwitchingProtocols)
{
requestParser->setWebsockConnection(wsConn);
}
auto httpString =
((HttpResponseImpl *)resp.get())
->renderToBuffer();
conn->send(httpString);
COZ_PROGRESS
}
},
wsConn);
}
else
requests.push_back(requestParser->requestImpl());
requestParser->reset();
}
else
{
break;
}
}
onRequests(conn, requests, requestParser);
requests.clear();
}
*/
}
void HTTPServer::onRequests(
const trantor::TcpConnectionPtr &conn,
const std::vector<HttpRequestPtr> &requests,
const std::shared_ptr<HttpRequestParser> &requestParser)
{
/*
if (requests.empty())
return;
if (HttpAppFrameworkImpl::instance().keepaliveRequestsNumber() > 0 &&
requestParser->numberOfRequestsParsed() >=
HttpAppFrameworkImpl::instance().keepaliveRequestsNumber())
{
requestParser->stop();
conn->shutdown();
return;
}
else if (HttpAppFrameworkImpl::instance().pipeliningRequestsNumber() > 0 &&
requestParser->numberOfRequestsInPipelining() + requests.size() >=
HttpAppFrameworkImpl::instance().pipeliningRequestsNumber())
{
requestParser->stop();
conn->shutdown();
return;
}
if (!conn->connected())
{
return;
}
auto loopFlagPtr = std::make_shared<bool>(true);
for (auto &req : requests)
{
bool close_ = (!req->keepAlive());
bool isHeadMethod = (req->method() == Head);
if (isHeadMethod)
{
req->setMethod(Get);
}
bool syncFlag = false;
if (!requestParser->emptyPipelining())
{
requestParser->pushRequestToPipelining(req);
syncFlag = true;
}
if (!syncAdvices_.empty())
{
bool adviceFlag = false;
for (auto &advice : syncAdvices_)
{
auto resp = advice(req);
if (resp)
{
resp->setVersion(req->getVersion());
resp->setCloseConnection(close_);
if (!syncFlag)
{
requestParser->getResponseBuffer().emplace_back(
getCompressedResponse(req, resp, isHeadMethod),
isHeadMethod);
}
else
{
requestParser->pushResponseToPipelining(
req,
getCompressedResponse(req, resp, isHeadMethod),
isHeadMethod);
}
adviceFlag = true;
break;
}
}
if (adviceFlag)
continue;
}
httpAsyncCallback_(
req,
[conn,
close_,
req,
loopFlagPtr,
&syncFlag,
isHeadMethod,
this,
requestParser](const HttpResponsePtr &response) {
if (!response)
return;
if (!conn->connected())
return;
response->setVersion(req->getVersion());
response->setCloseConnection(close_);
auto newResp =
getCompressedResponse(req, response, isHeadMethod);
if (conn->getLoop()->isInLoopThread())
{
//*
// * A client that supports persistent connections MAY
// * "pipeline" its requests (i.e., send multiple requests
// * without waiting for each response). A server MUST send
// * its responses to those requests in the same order that
// * the requests were received. rfc2616-8.1.1.2
// *
if (!conn->connected())
return;
if (*loopFlagPtr)
{
syncFlag = true;
if (requestParser->emptyPipelining())
{
requestParser->getResponseBuffer().emplace_back(
newResp, isHeadMethod);
}
else
{
// some earlier requests are waiting for responses;
requestParser->pushResponseToPipelining(
req, newResp, isHeadMethod);
}
}
else if (requestParser->getFirstRequest() == req)
{
requestParser->popFirstRequest();
std::vector<std::pair<HttpResponsePtr, bool>> resps;
resps.emplace_back(newResp, isHeadMethod);
while (!requestParser->emptyPipelining())
{
auto resp = requestParser->getFirstResponse();
if (resp.first)
{
requestParser->popFirstRequest();
resps.push_back(std::move(resp));
}
else
break;
}
sendResponses(conn, resps, requestParser->getBuffer());
}
else
{
// some earlier requests are waiting for responses;
requestParser->pushResponseToPipelining(req,
newResp,
isHeadMethod);
}
}
else
{
conn->getLoop()->queueInLoop([conn,
req,
newResp,
this,
isHeadMethod,
requestParser]() {
if (conn->connected())
{
if (requestParser->getFirstRequest() == req)
{
requestParser->popFirstRequest();
std::vector<std::pair<HttpResponsePtr, bool>>
resps;
resps.emplace_back(newResp, isHeadMethod);
while (!requestParser->emptyPipelining())
{
auto resp =
requestParser->getFirstResponse();
if (resp.first)
{
requestParser->popFirstRequest();
resps.push_back(std::move(resp));
}
else
break;
}
sendResponses(conn,
resps,
requestParser->getBuffer());
}
else
{
// some earlier requests are waiting for
// responses;
requestParser->pushResponseToPipelining(
req, newResp, isHeadMethod);
}
}
});
}
});
if (syncFlag == false)
{
requestParser->pushRequestToPipelining(req);
}
}
*loopFlagPtr = false;
if (conn->connected() && !requestParser->getResponseBuffer().empty())
{
sendResponses(conn,
requestParser->getResponseBuffer(),
requestParser->getBuffer());
requestParser->getResponseBuffer().clear();
}
*/
}
void HTTPServer::sendResponse(const trantor::TcpConnectionPtr &conn,
const HttpResponsePtr &response,
bool isHeadMethod)
{
/*
conn->getLoop()->assertInLoopThread();
auto respImplPtr = static_cast<HttpResponseImpl *>(response.get());
if (!isHeadMethod)
{
auto httpString = respImplPtr->renderToBuffer();
conn->send(httpString);
auto &sendfileName = respImplPtr->sendfileName();
if (!sendfileName.empty())
{
conn->sendFile(sendfileName.c_str());
}
COZ_PROGRESS
}
else
{
auto httpString = respImplPtr->renderHeaderForHeadMethod();
conn->send(std::move(*httpString));
COZ_PROGRESS
}
if (response->ifCloseConnection())
{
conn->shutdown();
COZ_PROGRESS
}
*/
}
void HTTPServer::sendResponses(
const trantor::TcpConnectionPtr &conn,
const std::vector<std::pair<HttpResponsePtr, bool>> &responses,
trantor::MsgBuffer &buffer)
{
/*
conn->getLoop()->assertInLoopThread();
if (responses.empty())
return;
if (responses.size() == 1)
{
sendResponse(conn, responses[0].first, responses[0].second);
return;
}
for (auto const &resp : responses)
{
auto respImplPtr = static_cast<HttpResponseImpl *>(resp.first.get());
if (!resp.second)
{
// Not HEAD method
respImplPtr->renderToBuffer(buffer);
auto &sendfileName = respImplPtr->sendfileName();
if (!sendfileName.empty())
{
conn->send(buffer);
buffer.retrieveAll();
conn->sendFile(sendfileName.c_str());
COZ_PROGRESS
}
}
else
{
auto httpString = respImplPtr->renderHeaderForHeadMethod();
buffer.append(httpString->peek(), httpString->readableBytes());
}
if (respImplPtr->ifCloseConnection())
{
if (buffer.readableBytes() > 0)
{
conn->send(buffer);
buffer.retrieveAll();
COZ_PROGRESS
}
conn->shutdown();
return;
}
}
if (conn->connected() && buffer.readableBytes() > 0)
{
conn->send(buffer);
COZ_PROGRESS
}
buffer.retrieveAll();
*/
}

View File

@ -103,7 +103,7 @@ protected:
HttpAsyncCallback httpAsyncCallback_; HttpAsyncCallback httpAsyncCallback_;
WebSocketNewAsyncCallback newWebsocketCallback_; WebSocketNewAsyncCallback newWebsocketCallback_;
trantor::ConnectionCallback connectionCallback_; trantor::ConnectionCallback connectionCallback_;
const std::vector<std::function<HttpResponsePtr(const HttpRequestPtr &)> > &syncAdvices_; std::vector<std::function<HttpResponsePtr(const HttpRequestPtr &)> > syncAdvices_;
bool _running{ false }; bool _running{ false };
}; };

View File

@ -81,7 +81,7 @@ std::vector<trantor::EventLoop *> ListenerManager::createListeners(
for (auto const &listener : listeners_) { for (auto const &listener : listeners_) {
auto const &ip = listener.ip_; auto const &ip = listener.ip_;
bool isIpv6 = ip.find(':') == std::string::npos ? false : true; bool isIpv6 = ip.find(':') == std::string::npos ? false : true;
std::shared_ptr<HttpServer> serverPtr; std::shared_ptr<HTTPServer> serverPtr;
if (i == 0) { if (i == 0) {
DrogonFileLocker lock; DrogonFileLocker lock;
// Check whether the port is in use. // Check whether the port is in use.
@ -90,13 +90,13 @@ std::vector<trantor::EventLoop *> ListenerManager::createListeners(
"drogonPortTest", "drogonPortTest",
true, true,
false); false);
serverPtr = std::make_shared<HttpServer>( serverPtr = std::make_shared<HTTPServer>(
loopThreadPtr->getLoop(), loopThreadPtr->getLoop(),
InetAddress(ip, listener.port_, isIpv6), InetAddress(ip, listener.port_, isIpv6),
"drogon", "drogon",
syncAdvices); syncAdvices);
} else { } else {
serverPtr = std::make_shared<HttpServer>( serverPtr = std::make_shared<HTTPServer>(
loopThreadPtr->getLoop(), loopThreadPtr->getLoop(),
InetAddress(ip, listener.port_, isIpv6), InetAddress(ip, listener.port_, isIpv6),
"drogon", "drogon",
@ -140,7 +140,7 @@ std::vector<trantor::EventLoop *> ListenerManager::createListeners(
LOG_TRACE << "thread num=" << threadNum; LOG_TRACE << "thread num=" << threadNum;
auto ip = listener.ip_; auto ip = listener.ip_;
bool isIpv6 = ip.find(':') == std::string::npos ? false : true; bool isIpv6 = ip.find(':') == std::string::npos ? false : true;
auto serverPtr = std::make_shared<HttpServer>( auto serverPtr = std::make_shared<HTTPServer>(
loopThreadPtr->getLoop(), loopThreadPtr->getLoop(),
InetAddress(ip, listener.port_, isIpv6), InetAddress(ip, listener.port_, isIpv6),
"drogon", "drogon",

View File

@ -23,7 +23,7 @@
#include "core/http_server_callbacks.h" #include "core/http_server_callbacks.h"
class HttpServer; class HTTPServer;
namespace drogon { namespace drogon {
class ListenerManager : public trantor::NonCopyable { class ListenerManager : public trantor::NonCopyable {
@ -74,7 +74,7 @@ private:
}; };
std::vector<ListenerInfo> listeners_; std::vector<ListenerInfo> listeners_;
std::vector<std::shared_ptr<HttpServer> > servers_; std::vector<std::shared_ptr<HTTPServer> > servers_;
std::vector<std::shared_ptr<trantor::EventLoopThread> > listeningloopThreads_; std::vector<std::shared_ptr<trantor::EventLoopThread> > listeningloopThreads_;
std::shared_ptr<trantor::EventLoopThreadPool> ioLoopThreadPoolPtr_; std::shared_ptr<trantor::EventLoopThreadPool> ioLoopThreadPoolPtr_;
}; };

View File

@ -39,6 +39,8 @@ TcpServer::TcpServer(EventLoop *loop,
std::bind(&TcpServer::newConnection, this, _1, _2)); std::bind(&TcpServer::newConnection, this, _1, _2));
} }
TcpServer::TcpServer() {}
TcpServer::~TcpServer() TcpServer::~TcpServer()
{ {
// loop_->assertInLoopThread(); // loop_->assertInLoopThread();

View File

@ -50,7 +50,7 @@ class TcpServer : NonCopyable
const std::string &name, const std::string &name,
bool reUseAddr = true, bool reUseAddr = true,
bool reUsePort = true); bool reUsePort = true);
TcpServer() {} TcpServer();
~TcpServer(); ~TcpServer();
/** /**