From bf688577fb932e7e04764694ca8a6b08e431b275 Mon Sep 17 00:00:00 2001 From: Relintai Date: Fri, 29 Jan 2021 17:26:17 +0100 Subject: [PATCH] Updated brynet to 1.0.9. --- core/http_server.cpp | 6 +- libs/brynet/Version.hpp | 2 +- libs/brynet/base/CPP_VERSION.hpp | 2 +- libs/brynet/base/Packet.hpp | 73 ++++++-- libs/brynet/net/PromiseReceive.hpp | 7 +- libs/brynet/net/TcpConnection.hpp | 249 ++++++++++++++++++++------- libs/brynet/net/http/HttpService.hpp | 25 +-- 7 files changed, 268 insertions(+), 96 deletions(-) diff --git a/core/http_server.cpp b/core/http_server.cpp index 14d568f..3b09d20 100644 --- a/core/http_server.cpp +++ b/core/http_server.cpp @@ -29,11 +29,11 @@ void HTTPServer::wsEnterCallbackDefault(const HttpSession::Ptr &httpSession, Web std::cout << "frame enter of type:" << int(opcode) << std::endl; std::cout << "payload is:" << payload << std::endl; // echo frame - auto frame = std::make_shared(); + //auto frame = std::make_shared(); - WebSocketFormat::wsFrameBuild(payload.c_str(), payload.size(), *frame, WebSocketFormat::WebSocketFrameType::TEXT_FRAME, true, false); + //WebSocketFormat::wsFrameBuild(payload.c_str(), payload.size(), *frame, WebSocketFormat::WebSocketFrameType::TEXT_FRAME, true, false); - httpSession->send(frame); + //httpSession->send(frame); } void HTTPServer::configure() { diff --git a/libs/brynet/Version.hpp b/libs/brynet/Version.hpp index 97f985a..80b0939 100644 --- a/libs/brynet/Version.hpp +++ b/libs/brynet/Version.hpp @@ -1,3 +1,3 @@ #pragma once -#define BRYNET_VERSION 1008000 +#define BRYNET_VERSION 1009000 diff --git a/libs/brynet/base/CPP_VERSION.hpp b/libs/brynet/base/CPP_VERSION.hpp index f32b49d..fb65c91 100644 --- a/libs/brynet/base/CPP_VERSION.hpp +++ b/libs/brynet/base/CPP_VERSION.hpp @@ -11,6 +11,6 @@ #endif #if (__cplusplus >= 201703L || \ - (defined(_MSC_VER) && _MSC_VER >= 1910)) + (defined(_MSVC_LANG) && _MSVC_LANG >= 201703L)) #define BRYNET_HAVE_LANG_CXX17 1 #endif \ No newline at end of file diff --git a/libs/brynet/base/Packet.hpp b/libs/brynet/base/Packet.hpp index 7f18d0c..8cd451d 100644 --- a/libs/brynet/base/Packet.hpp +++ b/libs/brynet/base/Packet.hpp @@ -16,7 +16,7 @@ namespace brynet { namespace base { public: BasePacketWriter(char* buffer, size_t len, - bool useBigEndian = true, + bool useBigEndian = false, bool isAutoMalloc = false) : mIsAutoMalloc(isAutoMalloc), @@ -248,49 +248,85 @@ namespace brynet { namespace base { public: BasePacketReader(const char* buffer, size_t len, - bool useBigEndian = true) : - mBigEndian(useBigEndian), - mMaxLen(len) + bool useBigEndian = false) : + mBigEndian(useBigEndian), + mSize(len) { mPos = 0; + mSavedPos = 0; mBuffer = buffer; } virtual ~BasePacketReader() = default; + void useBigEndian() + { + mBigEndian = true; + } + + void useLittleEndian() + { + mBigEndian = false; + } + + void savePos() + { + mSavedPos = mPos; + } + + size_t savedPos() const + { + return mSavedPos; + } + size_t getLeft() const { - if (mPos > mMaxLen) + if (mPos > mSize) { throw std::out_of_range("current pos is greater than max len"); } - return mMaxLen - mPos; + return mSize - mPos; } - const char* getBuffer() const + bool enough(size_t len) const + { + if (mPos > mSize) + { + return false; + } + return (mSize - mPos) >= len; + } + + const char* begin() const { return mBuffer; } - void skipAll() + const char* currentBuffer() const { - mPos = mMaxLen; + return mBuffer+mPos; } - size_t getPos() const + void consumeAll() + { + mPos = mSize; + savePos(); + } + + size_t currentPos() const { return mPos; } - size_t getMaxPos() const + size_t size() const { - return mMaxLen; + return mSize; } void addPos(size_t diff) { const auto tmpPos = mPos + diff; - if (tmpPos > mMaxLen) + if (tmpPos > mSize) { throw std::out_of_range("diff is to big"); } @@ -360,11 +396,11 @@ namespace brynet { namespace base { void read(T& value) { static_assert(std::is_same::type>::value, - "T must a nomal type"); + "T must a normal type"); static_assert(std::is_pod::value, "T must a pod type"); - if ((mPos + sizeof(value)) > mMaxLen) + if ((mPos + sizeof(value)) > mSize) { throw std::out_of_range("T size is to big"); } @@ -374,17 +410,18 @@ namespace brynet { namespace base { } protected: - const bool mBigEndian; - const size_t mMaxLen; + bool mBigEndian; + const size_t mSize; const char* mBuffer; size_t mPos; + size_t mSavedPos; }; template class AutoMallocPacket : public BasePacketWriter { public: - explicit AutoMallocPacket(bool useBigEndian = true, + explicit AutoMallocPacket(bool useBigEndian = false, bool isAutoMalloc = false) : BasePacketWriter(mData, SIZE, useBigEndian, isAutoMalloc) diff --git a/libs/brynet/net/PromiseReceive.hpp b/libs/brynet/net/PromiseReceive.hpp index b1c0211..2f372f3 100644 --- a/libs/brynet/net/PromiseReceive.hpp +++ b/libs/brynet/net/PromiseReceive.hpp @@ -144,9 +144,10 @@ namespace brynet { namespace net { std::shared_ptr setupPromiseReceive(const TcpConnection::Ptr& session) { auto promiseReceive = std::make_shared(); - session->setDataCallback([promiseReceive](const char* buffer, - size_t len) { - return promiseReceive->process(buffer, len); + session->setDataCallback([promiseReceive](brynet::base::BasePacketReader& reader) { + auto procLen = promiseReceive->process(reader.begin(), reader.size()); + reader.addPos(procLen); + reader.savePos(); }); return promiseReceive; diff --git a/libs/brynet/net/TcpConnection.hpp b/libs/brynet/net/TcpConnection.hpp index d8bd605..af9654a 100644 --- a/libs/brynet/net/TcpConnection.hpp +++ b/libs/brynet/net/TcpConnection.hpp @@ -7,12 +7,14 @@ #include #include #include +#include #include #include #include #include #include +#include #include #include #include @@ -34,6 +36,53 @@ extern "C" { namespace brynet { namespace net { + class SendableMsg + { + public: + using Ptr = std::shared_ptr; + + virtual ~SendableMsg() = default; + + virtual const void * data() = 0; + virtual size_t size() = 0; + }; + + class StringSendMsg : public SendableMsg + { + public: + explicit StringSendMsg(const char* buffer, size_t len) + : + mMsg(buffer, len) + {} + + explicit StringSendMsg(std::string buffer) + : + mMsg(std::move(buffer)) + {} + + const void* data() override + { + return static_cast(mMsg.data()); + } + size_t size() override + { + return mMsg.size(); + } + + private: + std::string mMsg; + }; + + static SendableMsg::Ptr MakeStringMsg(const char* buffer, size_t len) + { + return std::make_shared(buffer, len); + } + + static SendableMsg::Ptr MakeStringMsg(std::string buffer) + { + return std::make_shared(std::move(buffer)); + } + class TcpConnection : public Channel, public brynet::base::NonCopyable, public std::enable_shared_from_this @@ -42,11 +91,10 @@ namespace brynet { namespace net { using Ptr = std::shared_ptr; using EnterCallback = std::function; - using DataCallback = std::function; + using DataCallback = std::function; using DisconnectedCallback = std::function; using PacketSendedCallback = std::function; - - using PacketPtr = std::shared_ptr; + using HighWaterCallback = std::function; public: Ptr static Create(TcpSocket::Ptr socket, @@ -104,9 +152,9 @@ namespace brynet { namespace net { #endif eventLoop->runAsyncFunctor([session]() - { - session->onEnterEventLoop(); - }); + { + session->onEnterEventLoop(); + }); return session; } @@ -116,56 +164,79 @@ namespace brynet { namespace net { } //TODO::如果所属EventLoop已经没有工作,则可能导致内存无限大,因为所投递的请求都没有得到处理 + void send( + const SendableMsg::Ptr& msg, + PacketSendedCallback&& callback = nullptr + ) + { + auto sharedThis = shared_from_this(); + mEventLoop->runAsyncFunctor([sharedThis, msg, callback]() mutable + { + if(sharedThis->mAlreadyClose) + { + return; + } + + const auto len = msg->size(); + sharedThis->mSendingMsgSize += len; + sharedThis->mSendList.emplace_back(PendingPacket{ + std::move(msg), + len, + std::move(callback) }); + sharedThis->runAfterFlush(); + + if(sharedThis->mSendingMsgSize > sharedThis->mHighWaterSize && + sharedThis->mHighWaterCallback != nullptr) + { + sharedThis->mHighWaterCallback(); + } + }); + } void send( const char* buffer, size_t len, PacketSendedCallback&& callback = nullptr) { - send(makePacket(buffer, len), std::move(callback)); + send(MakeStringMsg(buffer, len), std::move(callback)); } - template void send( - PacketType&& packet, - PacketSendedCallback&& - callback = nullptr) + std::string buffer, + PacketSendedCallback&& callback = nullptr) { - auto packetCapture = std::forward(packet); - auto callbackCapture = std::move(callback); - auto sharedThis = shared_from_this(); - mEventLoop->runAsyncFunctor( - [sharedThis, packetCapture, callbackCapture]() mutable { - const auto len = packetCapture->size(); - sharedThis->mSendList.emplace_back(PendingPacket{ - std::move(packetCapture), - len, - std::move(callbackCapture) }); - sharedThis->runAfterFlush(); - }); + send(MakeStringMsg(std::move(buffer)), std::move(callback)); } - void setDataCallback(DataCallback&& cb) + // setDataCallback(std::function) + template + void setDataCallback(Callback&& cb) { + verifyArgType(cb, &Callback::operator()); + auto sharedThis = shared_from_this(); - mEventLoop->runAsyncFunctor([sharedThis, cb]() mutable { - sharedThis->mDataCallback = std::move(cb); + mEventLoop->runAsyncFunctor([sharedThis, cb]() mutable + { + sharedThis->mDataCallback = cb; sharedThis->processRecvMessage(); - }); + }); } + void setDisConnectCallback(DisconnectedCallback&& cb) { auto sharedThis = shared_from_this(); - mEventLoop->runAsyncFunctor([sharedThis, cb]() mutable { + mEventLoop->runAsyncFunctor([sharedThis, cb]() mutable + { sharedThis->mDisConnectCallback = std::move(cb); - }); + }); } /* if checkTime is zero, will cancel check heartbeat */ void setHeartBeat(std::chrono::nanoseconds checkTime) { auto sharedThis = shared_from_this(); - mEventLoop->runAsyncFunctor([sharedThis, checkTime]() { + mEventLoop->runAsyncFunctor([sharedThis, checkTime]() + { if (sharedThis->mTimer.lock() != nullptr) { sharedThis->mTimer.lock()->cancel(); @@ -174,25 +245,50 @@ namespace brynet { namespace net { sharedThis->mCheckTime = checkTime; sharedThis->startPingCheckTimer(); + }); + } + + void setHighWaterCallback(HighWaterCallback cb, size_t size) + { + auto sharedThis = shared_from_this(); + mEventLoop->runAsyncFunctor([=]() mutable + { + sharedThis->mHighWaterCallback = std::move(cb); + sharedThis->mHighWaterSize = size; + }); + } + + void postShrinkReceiveBuffer() + { + auto sharedThis = shared_from_this(); + mEventLoop->runAsyncFunctor([=]() + { + mEventLoop->runFunctorAfterLoop([=]() + { + sharedThis->shrinkReceiveBuffer(); }); + }); } void postDisConnect() { auto sharedThis = shared_from_this(); - mEventLoop->runAsyncFunctor([sharedThis]() { + mEventLoop->runAsyncFunctor([sharedThis]() + { sharedThis->procCloseInLoop(); - }); + }); } void postShutdown() { auto sharedThis = shared_from_this(); - mEventLoop->runAsyncFunctor([sharedThis]() { - sharedThis->mEventLoop->runFunctorAfterLoop([sharedThis]() { + mEventLoop->runAsyncFunctor([sharedThis]() + { + sharedThis->mEventLoop->runFunctorAfterLoop([sharedThis]() + { sharedThis->procShutdownInLoop(); - }); }); + }); } const std::string& getIP() const @@ -200,11 +296,6 @@ namespace brynet { namespace net { return mIP; } - static TcpConnection::PacketPtr makePacket(const char* buffer, size_t len) - { - return std::make_shared(buffer, len); - } - protected: TcpConnection(TcpSocket::Ptr socket, size_t maxRecvBufferSize, @@ -221,7 +312,9 @@ namespace brynet { namespace net { mEventLoop(std::move(eventLoop)), mAlreadyClose(false), mMaxRecvBufferSize(maxRecvBufferSize), - mEnterCallback(std::move(enterCallback)) + mSendingMsgSize(0), + mEnterCallback(std::move(enterCallback)), + mHighWaterSize(0) { mRecvData = false; mCheckTime = std::chrono::steady_clock::duration::zero(); @@ -292,6 +385,29 @@ namespace brynet { namespace net { } } + void shrinkReceiveBuffer() + { + auto newSize = buffer_getreadvalidcount(mRecvBuffer.get()); + if (newSize == 0) + { + newSize = std::min(16 * 1024, mMaxRecvBufferSize); + } + if(newSize == buffer_getsize(mRecvBuffer.get())) + { + return; + } + + std::unique_ptr + newBuffer(brynet::base::buffer_new(newSize)); + buffer_write(newBuffer.get(), + buffer_getreadptr(mRecvBuffer.get()), + buffer_getreadvalidcount(mRecvBuffer.get())); + + mRecvBuffer = std::move(newBuffer); + mCurrentTanhXDiff = 0; + mRecvBuffOriginSize = newSize; + } + /* must called in network thread */ bool onEnterEventLoop() { @@ -548,6 +664,12 @@ namespace brynet { namespace net { buffer_adjustto_head(mRecvBuffer.get()); } + if (buffer_getreadvalidcount(mRecvBuffer.get()) + == buffer_getsize(mRecvBuffer.get())) + { + growRecvBuffer(); + } + const auto tryRecvLen = buffer_getwritevalidcount(mRecvBuffer.get()); if (tryRecvLen == 0) { @@ -612,11 +734,6 @@ namespace brynet { namespace net { mRecvData = true; buffer_addwritepos(mRecvBuffer.get(), static_cast(retlen)); - if (buffer_getreadvalidcount(mRecvBuffer.get()) - == buffer_getsize(mRecvBuffer.get())) - { - growRecvBuffer(); - } if (notInSSL && retlen < static_cast(tryRecvLen)) { @@ -662,7 +779,7 @@ namespace brynet { namespace net { static const int SENDBUF_SIZE = 1024 * 32; if (threadLocalSendBuf == nullptr) { - threadLocalSendBuf = (char*)malloc(SENDBUF_SIZE); + threadLocalSendBuf = static_cast(malloc(SENDBUF_SIZE)); } #ifdef BRYNET_USE_OPENSSL @@ -675,13 +792,13 @@ namespace brynet { namespace net { while (!mSendList.empty() && mCanWrite) { - char* sendptr = threadLocalSendBuf; + auto sendptr = threadLocalSendBuf; size_t wait_send_size = 0; for (auto it = mSendList.begin(); it != mSendList.end(); ++it) { auto& packet = *it; - auto packetLeftBuf = (char*)(packet.data->c_str() + (packet.data->size() - packet.left)); + auto packetLeftBuf = (char*)(packet.data->data()) + packet.data->size() - packet.left; const auto packetLeftLen = packet.left; if ((wait_send_size + packetLeftLen) > SENDBUF_SIZE) @@ -694,7 +811,7 @@ namespace brynet { namespace net { break; } - memcpy(sendptr + wait_send_size, packetLeftBuf, packetLeftLen); + memcpy(static_cast(sendptr + wait_send_size), static_cast(packetLeftBuf), packetLeftLen); wait_send_size += packetLeftLen; } @@ -759,6 +876,7 @@ namespace brynet { namespace net { { (packet.mCompleteCallback)(); } + mSendingMsgSize -= packet.data->size(); it = mSendList.erase(it); } @@ -791,7 +909,7 @@ namespace brynet { namespace net { size_t ready_send_len = 0; for (const auto& p : mSendList) { - iov[num].iov_base = (void*)(p.data->c_str() + (p.data->size() - p.left)); + iov[num].iov_base = (void*)(static_cast(p.data->data()) + p.data->size() - p.left); iov[num].iov_len = p.left; ready_send_len += p.left; @@ -837,6 +955,7 @@ namespace brynet { namespace net { { b.mCompleteCallback(); } + mSendingMsgSize -= b.data->size(); it = mSendList.erase(it); } @@ -889,9 +1008,11 @@ namespace brynet { namespace net { }); mCanWrite = false; - mDisConnectCallback = nullptr; mDataCallback = nullptr; + mDisConnectCallback = nullptr; + mHighWaterCallback = nullptr; mRecvBuffer = nullptr; + mSendList.clear(); } void procCloseInLoop() @@ -1064,16 +1185,25 @@ namespace brynet { namespace net { { if (mDataCallback != nullptr && buffer_getreadvalidcount(mRecvBuffer.get()) > 0) { - const auto proclen = mDataCallback(buffer_getreadptr(mRecvBuffer.get()), - buffer_getreadvalidcount(mRecvBuffer.get())); - assert(proclen <= buffer_getreadvalidcount(mRecvBuffer.get())); - if (proclen <= buffer_getreadvalidcount(mRecvBuffer.get())) + auto reader = brynet::base::BasePacketReader(buffer_getreadptr(mRecvBuffer.get()), + buffer_getreadvalidcount(mRecvBuffer.get()), false); + mDataCallback(reader); + const auto consumedLen = reader.savedPos(); + assert(consumedLen <= reader.size()); + if (consumedLen <= reader.size()) { - buffer_addreadpos(mRecvBuffer.get(), proclen); + buffer_addreadpos(mRecvBuffer.get(), consumedLen); } } } + template + static void verifyArgType(const CallbackType&, void(CallbackType::*)(Arg&) const) + { + static_assert(std::is_reference::value, "arg must be reference type"); + static_assert(!std::is_const::type>::value, "arg can't be const type"); + } + private: #ifdef BRYNET_PLATFORM_WINDOWS @@ -1105,17 +1235,20 @@ namespace brynet { namespace net { struct PendingPacket { - PacketPtr data; + SendableMsg::Ptr data; size_t left; PacketSendedCallback mCompleteCallback; }; using PacketListType = std::deque; PacketListType mSendList; + size_t mSendingMsgSize; EnterCallback mEnterCallback; DataCallback mDataCallback; DisconnectedCallback mDisConnectCallback; + HighWaterCallback mHighWaterCallback; + size_t mHighWaterSize; bool mIsPostFlush; diff --git a/libs/brynet/net/http/HttpService.hpp b/libs/brynet/net/http/HttpService.hpp index a7b46ab..965f003 100644 --- a/libs/brynet/net/http/HttpService.hpp +++ b/libs/brynet/net/http/HttpService.hpp @@ -32,7 +32,7 @@ namespace brynet { namespace net { namespace http { void send(PacketType&& packet, TcpConnection::PacketSendedCallback&& callback = nullptr) { - mSession->send(std::forward(packet), + mSession->send(std::forward(packet), std::move(callback)); } void send(const char* packet, @@ -196,25 +196,26 @@ namespace brynet { namespace net { namespace http { auto httpParser = std::make_shared(HTTP_BOTH); session->setDataCallback([httpSession, httpParser]( - const char* buffer, size_t len) { - size_t retlen = 0; + brynet::base::BasePacketReader& reader) { + size_t retLen = 0; if (httpParser->isWebSocket()) { - retlen = HttpService::ProcessWebSocket(buffer, - len, - httpParser, - httpSession); + retLen = HttpService::ProcessWebSocket( reader.begin(), + reader.size(), + httpParser, + httpSession); } else { - retlen = HttpService::ProcessHttp(buffer, - len, - httpParser, - httpSession); + retLen = HttpService::ProcessHttp( reader.begin(), + reader.size(), + httpParser, + httpSession); } - return retlen; + reader.addPos(retLen); + reader.savePos(); }); }