Updated brynet to 1.0.9.

This commit is contained in:
Relintai 2021-01-29 17:26:17 +01:00
parent 4b64bf5864
commit bf688577fb
7 changed files with 268 additions and 96 deletions

View File

@ -29,11 +29,11 @@ void HTTPServer::wsEnterCallbackDefault(const HttpSession::Ptr &httpSession, Web
std::cout << "frame enter of type:" << int(opcode) << std::endl;
std::cout << "payload is:" << payload << std::endl;
// echo frame
auto frame = std::make_shared<std::string>();
//auto frame = std::make_shared<std::string>();
WebSocketFormat::wsFrameBuild(payload.c_str(), payload.size(), *frame, WebSocketFormat::WebSocketFrameType::TEXT_FRAME, true, false);
//WebSocketFormat::wsFrameBuild(payload.c_str(), payload.size(), *frame, WebSocketFormat::WebSocketFrameType::TEXT_FRAME, true, false);
httpSession->send(frame);
//httpSession->send(frame);
}
void HTTPServer::configure() {

View File

@ -1,3 +1,3 @@
#pragma once
#define BRYNET_VERSION 1008000
#define BRYNET_VERSION 1009000

View File

@ -11,6 +11,6 @@
#endif
#if (__cplusplus >= 201703L || \
(defined(_MSC_VER) && _MSC_VER >= 1910))
(defined(_MSVC_LANG) && _MSVC_LANG >= 201703L))
#define BRYNET_HAVE_LANG_CXX17 1
#endif

View File

@ -16,7 +16,7 @@ namespace brynet { namespace base {
public:
BasePacketWriter(char* buffer,
size_t len,
bool useBigEndian = true,
bool useBigEndian = false,
bool isAutoMalloc = false)
:
mIsAutoMalloc(isAutoMalloc),
@ -248,49 +248,85 @@ namespace brynet { namespace base {
public:
BasePacketReader(const char* buffer,
size_t len,
bool useBigEndian = true) :
mBigEndian(useBigEndian),
mMaxLen(len)
bool useBigEndian = false) :
mBigEndian(useBigEndian),
mSize(len)
{
mPos = 0;
mSavedPos = 0;
mBuffer = buffer;
}
virtual ~BasePacketReader() = default;
void useBigEndian()
{
mBigEndian = true;
}
void useLittleEndian()
{
mBigEndian = false;
}
void savePos()
{
mSavedPos = mPos;
}
size_t savedPos() const
{
return mSavedPos;
}
size_t getLeft() const
{
if (mPos > mMaxLen)
if (mPos > mSize)
{
throw std::out_of_range("current pos is greater than max len");
}
return mMaxLen - mPos;
return mSize - mPos;
}
const char* getBuffer() const
bool enough(size_t len) const
{
if (mPos > mSize)
{
return false;
}
return (mSize - mPos) >= len;
}
const char* begin() const
{
return mBuffer;
}
void skipAll()
const char* currentBuffer() const
{
mPos = mMaxLen;
return mBuffer+mPos;
}
size_t getPos() const
void consumeAll()
{
mPos = mSize;
savePos();
}
size_t currentPos() const
{
return mPos;
}
size_t getMaxPos() const
size_t size() const
{
return mMaxLen;
return mSize;
}
void addPos(size_t diff)
{
const auto tmpPos = mPos + diff;
if (tmpPos > mMaxLen)
if (tmpPos > mSize)
{
throw std::out_of_range("diff is to big");
}
@ -360,11 +396,11 @@ namespace brynet { namespace base {
void read(T& value)
{
static_assert(std::is_same<T, typename std::remove_pointer<T>::type>::value,
"T must a nomal type");
"T must a normal type");
static_assert(std::is_pod<T>::value,
"T must a pod type");
if ((mPos + sizeof(value)) > mMaxLen)
if ((mPos + sizeof(value)) > mSize)
{
throw std::out_of_range("T size is to big");
}
@ -374,17 +410,18 @@ namespace brynet { namespace base {
}
protected:
const bool mBigEndian;
const size_t mMaxLen;
bool mBigEndian;
const size_t mSize;
const char* mBuffer;
size_t mPos;
size_t mSavedPos;
};
template<size_t SIZE>
class AutoMallocPacket : public BasePacketWriter
{
public:
explicit AutoMallocPacket(bool useBigEndian = true,
explicit AutoMallocPacket(bool useBigEndian = false,
bool isAutoMalloc = false)
:
BasePacketWriter(mData, SIZE, useBigEndian, isAutoMalloc)

View File

@ -144,9 +144,10 @@ namespace brynet { namespace net {
std::shared_ptr<PromiseReceive> setupPromiseReceive(const TcpConnection::Ptr& session)
{
auto promiseReceive = std::make_shared<PromiseReceive>();
session->setDataCallback([promiseReceive](const char* buffer,
size_t len) {
return promiseReceive->process(buffer, len);
session->setDataCallback([promiseReceive](brynet::base::BasePacketReader& reader) {
auto procLen = promiseReceive->process(reader.begin(), reader.size());
reader.addPos(procLen);
reader.savePos();
});
return promiseReceive;

View File

@ -7,12 +7,14 @@
#include <cassert>
#include <cstring>
#include <cmath>
#include <type_traits>
#include <brynet/base/Buffer.hpp>
#include <brynet/base/Timer.hpp>
#include <brynet/base/NonCopyable.hpp>
#include <brynet/base/Any.hpp>
#include <brynet/base/Noexcept.hpp>
#include <brynet/base/Packet.hpp>
#include <brynet/net/SocketLibFunction.hpp>
#include <brynet/net/Channel.hpp>
#include <brynet/net/EventLoop.hpp>
@ -34,6 +36,53 @@ extern "C" {
namespace brynet { namespace net {
class SendableMsg
{
public:
using Ptr = std::shared_ptr<SendableMsg>;
virtual ~SendableMsg() = default;
virtual const void * data() = 0;
virtual size_t size() = 0;
};
class StringSendMsg : public SendableMsg
{
public:
explicit StringSendMsg(const char* buffer, size_t len)
:
mMsg(buffer, len)
{}
explicit StringSendMsg(std::string buffer)
:
mMsg(std::move(buffer))
{}
const void* data() override
{
return static_cast<const void*>(mMsg.data());
}
size_t size() override
{
return mMsg.size();
}
private:
std::string mMsg;
};
static SendableMsg::Ptr MakeStringMsg(const char* buffer, size_t len)
{
return std::make_shared<StringSendMsg>(buffer, len);
}
static SendableMsg::Ptr MakeStringMsg(std::string buffer)
{
return std::make_shared<StringSendMsg>(std::move(buffer));
}
class TcpConnection : public Channel,
public brynet::base::NonCopyable,
public std::enable_shared_from_this<TcpConnection>
@ -42,11 +91,10 @@ namespace brynet { namespace net {
using Ptr = std::shared_ptr<TcpConnection>;
using EnterCallback = std::function<void(Ptr)>;
using DataCallback = std::function<size_t(const char* buffer, size_t len)>;
using DataCallback = std::function<void(brynet::base::BasePacketReader&)>;
using DisconnectedCallback = std::function<void(Ptr)>;
using PacketSendedCallback = std::function<void(void)>;
using PacketPtr = std::shared_ptr<std::string>;
using HighWaterCallback = std::function<void(void)>;
public:
Ptr static Create(TcpSocket::Ptr socket,
@ -104,9 +152,9 @@ namespace brynet { namespace net {
#endif
eventLoop->runAsyncFunctor([session]()
{
session->onEnterEventLoop();
});
{
session->onEnterEventLoop();
});
return session;
}
@ -116,56 +164,79 @@ namespace brynet { namespace net {
}
//TODO::如果所属EventLoop已经没有工作则可能导致内存无限大因为所投递的请求都没有得到处理
void send(
const SendableMsg::Ptr& msg,
PacketSendedCallback&& callback = nullptr
)
{
auto sharedThis = shared_from_this();
mEventLoop->runAsyncFunctor([sharedThis, msg, callback]() mutable
{
if(sharedThis->mAlreadyClose)
{
return;
}
const auto len = msg->size();
sharedThis->mSendingMsgSize += len;
sharedThis->mSendList.emplace_back(PendingPacket{
std::move(msg),
len,
std::move(callback) });
sharedThis->runAfterFlush();
if(sharedThis->mSendingMsgSize > sharedThis->mHighWaterSize &&
sharedThis->mHighWaterCallback != nullptr)
{
sharedThis->mHighWaterCallback();
}
});
}
void send(
const char* buffer,
size_t len,
PacketSendedCallback&& callback = nullptr)
{
send(makePacket(buffer, len), std::move(callback));
send(MakeStringMsg(buffer, len), std::move(callback));
}
template<typename PacketType>
void send(
PacketType&& packet,
PacketSendedCallback&&
callback = nullptr)
std::string buffer,
PacketSendedCallback&& callback = nullptr)
{
auto packetCapture = std::forward<PacketType>(packet);
auto callbackCapture = std::move(callback);
auto sharedThis = shared_from_this();
mEventLoop->runAsyncFunctor(
[sharedThis, packetCapture, callbackCapture]() mutable {
const auto len = packetCapture->size();
sharedThis->mSendList.emplace_back(PendingPacket{
std::move(packetCapture),
len,
std::move(callbackCapture) });
sharedThis->runAfterFlush();
});
send(MakeStringMsg(std::move(buffer)), std::move(callback));
}
void setDataCallback(DataCallback&& cb)
// setDataCallback(std::function<void(brynet::base::BasePacketReader&)>
template<typename Callback>
void setDataCallback(Callback&& cb)
{
verifyArgType(cb, &Callback::operator());
auto sharedThis = shared_from_this();
mEventLoop->runAsyncFunctor([sharedThis, cb]() mutable {
sharedThis->mDataCallback = std::move(cb);
mEventLoop->runAsyncFunctor([sharedThis, cb]() mutable
{
sharedThis->mDataCallback = cb;
sharedThis->processRecvMessage();
});
});
}
void setDisConnectCallback(DisconnectedCallback&& cb)
{
auto sharedThis = shared_from_this();
mEventLoop->runAsyncFunctor([sharedThis, cb]() mutable {
mEventLoop->runAsyncFunctor([sharedThis, cb]() mutable
{
sharedThis->mDisConnectCallback = std::move(cb);
});
});
}
/* if checkTime is zero, will cancel check heartbeat */
void setHeartBeat(std::chrono::nanoseconds checkTime)
{
auto sharedThis = shared_from_this();
mEventLoop->runAsyncFunctor([sharedThis, checkTime]() {
mEventLoop->runAsyncFunctor([sharedThis, checkTime]()
{
if (sharedThis->mTimer.lock() != nullptr)
{
sharedThis->mTimer.lock()->cancel();
@ -174,25 +245,50 @@ namespace brynet { namespace net {
sharedThis->mCheckTime = checkTime;
sharedThis->startPingCheckTimer();
});
}
void setHighWaterCallback(HighWaterCallback cb, size_t size)
{
auto sharedThis = shared_from_this();
mEventLoop->runAsyncFunctor([=]() mutable
{
sharedThis->mHighWaterCallback = std::move(cb);
sharedThis->mHighWaterSize = size;
});
}
void postShrinkReceiveBuffer()
{
auto sharedThis = shared_from_this();
mEventLoop->runAsyncFunctor([=]()
{
mEventLoop->runFunctorAfterLoop([=]()
{
sharedThis->shrinkReceiveBuffer();
});
});
}
void postDisConnect()
{
auto sharedThis = shared_from_this();
mEventLoop->runAsyncFunctor([sharedThis]() {
mEventLoop->runAsyncFunctor([sharedThis]()
{
sharedThis->procCloseInLoop();
});
});
}
void postShutdown()
{
auto sharedThis = shared_from_this();
mEventLoop->runAsyncFunctor([sharedThis]() {
sharedThis->mEventLoop->runFunctorAfterLoop([sharedThis]() {
mEventLoop->runAsyncFunctor([sharedThis]()
{
sharedThis->mEventLoop->runFunctorAfterLoop([sharedThis]()
{
sharedThis->procShutdownInLoop();
});
});
});
}
const std::string& getIP() const
@ -200,11 +296,6 @@ namespace brynet { namespace net {
return mIP;
}
static TcpConnection::PacketPtr makePacket(const char* buffer, size_t len)
{
return std::make_shared<std::string>(buffer, len);
}
protected:
TcpConnection(TcpSocket::Ptr socket,
size_t maxRecvBufferSize,
@ -221,7 +312,9 @@ namespace brynet { namespace net {
mEventLoop(std::move(eventLoop)),
mAlreadyClose(false),
mMaxRecvBufferSize(maxRecvBufferSize),
mEnterCallback(std::move(enterCallback))
mSendingMsgSize(0),
mEnterCallback(std::move(enterCallback)),
mHighWaterSize(0)
{
mRecvData = false;
mCheckTime = std::chrono::steady_clock::duration::zero();
@ -292,6 +385,29 @@ namespace brynet { namespace net {
}
}
void shrinkReceiveBuffer()
{
auto newSize = buffer_getreadvalidcount(mRecvBuffer.get());
if (newSize == 0)
{
newSize = std::min<size_t>(16 * 1024, mMaxRecvBufferSize);
}
if(newSize == buffer_getsize(mRecvBuffer.get()))
{
return;
}
std::unique_ptr<struct brynet::base::buffer_s, BufferDeleter>
newBuffer(brynet::base::buffer_new(newSize));
buffer_write(newBuffer.get(),
buffer_getreadptr(mRecvBuffer.get()),
buffer_getreadvalidcount(mRecvBuffer.get()));
mRecvBuffer = std::move(newBuffer);
mCurrentTanhXDiff = 0;
mRecvBuffOriginSize = newSize;
}
/* must called in network thread */
bool onEnterEventLoop()
{
@ -548,6 +664,12 @@ namespace brynet { namespace net {
buffer_adjustto_head(mRecvBuffer.get());
}
if (buffer_getreadvalidcount(mRecvBuffer.get())
== buffer_getsize(mRecvBuffer.get()))
{
growRecvBuffer();
}
const auto tryRecvLen = buffer_getwritevalidcount(mRecvBuffer.get());
if (tryRecvLen == 0)
{
@ -612,11 +734,6 @@ namespace brynet { namespace net {
mRecvData = true;
buffer_addwritepos(mRecvBuffer.get(), static_cast<size_t>(retlen));
if (buffer_getreadvalidcount(mRecvBuffer.get())
== buffer_getsize(mRecvBuffer.get()))
{
growRecvBuffer();
}
if (notInSSL && retlen < static_cast<int>(tryRecvLen))
{
@ -662,7 +779,7 @@ namespace brynet { namespace net {
static const int SENDBUF_SIZE = 1024 * 32;
if (threadLocalSendBuf == nullptr)
{
threadLocalSendBuf = (char*)malloc(SENDBUF_SIZE);
threadLocalSendBuf = static_cast<char*>(malloc(SENDBUF_SIZE));
}
#ifdef BRYNET_USE_OPENSSL
@ -675,13 +792,13 @@ namespace brynet { namespace net {
while (!mSendList.empty() && mCanWrite)
{
char* sendptr = threadLocalSendBuf;
auto sendptr = threadLocalSendBuf;
size_t wait_send_size = 0;
for (auto it = mSendList.begin(); it != mSendList.end(); ++it)
{
auto& packet = *it;
auto packetLeftBuf = (char*)(packet.data->c_str() + (packet.data->size() - packet.left));
auto packetLeftBuf = (char*)(packet.data->data()) + packet.data->size() - packet.left;
const auto packetLeftLen = packet.left;
if ((wait_send_size + packetLeftLen) > SENDBUF_SIZE)
@ -694,7 +811,7 @@ namespace brynet { namespace net {
break;
}
memcpy(sendptr + wait_send_size, packetLeftBuf, packetLeftLen);
memcpy(static_cast<void*>(sendptr + wait_send_size), static_cast<void*>(packetLeftBuf), packetLeftLen);
wait_send_size += packetLeftLen;
}
@ -759,6 +876,7 @@ namespace brynet { namespace net {
{
(packet.mCompleteCallback)();
}
mSendingMsgSize -= packet.data->size();
it = mSendList.erase(it);
}
@ -791,7 +909,7 @@ namespace brynet { namespace net {
size_t ready_send_len = 0;
for (const auto& p : mSendList)
{
iov[num].iov_base = (void*)(p.data->c_str() + (p.data->size() - p.left));
iov[num].iov_base = (void*)(static_cast<const char*>(p.data->data()) + p.data->size() - p.left);
iov[num].iov_len = p.left;
ready_send_len += p.left;
@ -837,6 +955,7 @@ namespace brynet { namespace net {
{
b.mCompleteCallback();
}
mSendingMsgSize -= b.data->size();
it = mSendList.erase(it);
}
@ -889,9 +1008,11 @@ namespace brynet { namespace net {
});
mCanWrite = false;
mDisConnectCallback = nullptr;
mDataCallback = nullptr;
mDisConnectCallback = nullptr;
mHighWaterCallback = nullptr;
mRecvBuffer = nullptr;
mSendList.clear();
}
void procCloseInLoop()
@ -1064,16 +1185,25 @@ namespace brynet { namespace net {
{
if (mDataCallback != nullptr && buffer_getreadvalidcount(mRecvBuffer.get()) > 0)
{
const auto proclen = mDataCallback(buffer_getreadptr(mRecvBuffer.get()),
buffer_getreadvalidcount(mRecvBuffer.get()));
assert(proclen <= buffer_getreadvalidcount(mRecvBuffer.get()));
if (proclen <= buffer_getreadvalidcount(mRecvBuffer.get()))
auto reader = brynet::base::BasePacketReader(buffer_getreadptr(mRecvBuffer.get()),
buffer_getreadvalidcount(mRecvBuffer.get()), false);
mDataCallback(reader);
const auto consumedLen = reader.savedPos();
assert(consumedLen <= reader.size());
if (consumedLen <= reader.size())
{
buffer_addreadpos(mRecvBuffer.get(), proclen);
buffer_addreadpos(mRecvBuffer.get(), consumedLen);
}
}
}
template<typename CallbackType, typename Arg>
static void verifyArgType(const CallbackType&, void(CallbackType::*)(Arg&) const)
{
static_assert(std::is_reference<Arg&>::value, "arg must be reference type");
static_assert(!std::is_const<typename std::remove_const<Arg>::type>::value, "arg can't be const type");
}
private:
#ifdef BRYNET_PLATFORM_WINDOWS
@ -1105,17 +1235,20 @@ namespace brynet { namespace net {
struct PendingPacket
{
PacketPtr data;
SendableMsg::Ptr data;
size_t left;
PacketSendedCallback mCompleteCallback;
};
using PacketListType = std::deque<PendingPacket>;
PacketListType mSendList;
size_t mSendingMsgSize;
EnterCallback mEnterCallback;
DataCallback mDataCallback;
DisconnectedCallback mDisConnectCallback;
HighWaterCallback mHighWaterCallback;
size_t mHighWaterSize;
bool mIsPostFlush;

View File

@ -32,7 +32,7 @@ namespace brynet { namespace net { namespace http {
void send(PacketType&& packet,
TcpConnection::PacketSendedCallback&& callback = nullptr)
{
mSession->send(std::forward<TcpConnection::PacketPtr>(packet),
mSession->send(std::forward<PacketType&&>(packet),
std::move(callback));
}
void send(const char* packet,
@ -196,25 +196,26 @@ namespace brynet { namespace net { namespace http {
auto httpParser = std::make_shared<HTTPParser>(HTTP_BOTH);
session->setDataCallback([httpSession, httpParser](
const char* buffer, size_t len) {
size_t retlen = 0;
brynet::base::BasePacketReader& reader) {
size_t retLen = 0;
if (httpParser->isWebSocket())
{
retlen = HttpService::ProcessWebSocket(buffer,
len,
httpParser,
httpSession);
retLen = HttpService::ProcessWebSocket( reader.begin(),
reader.size(),
httpParser,
httpSession);
}
else
{
retlen = HttpService::ProcessHttp(buffer,
len,
httpParser,
httpSession);
retLen = HttpService::ProcessHttp( reader.begin(),
reader.size(),
httpParser,
httpSession);
}
return retlen;
reader.addPos(retLen);
reader.savePos();
});
}