rcpp_framework/libs/brynet/net/wrapper/ConnectionBuilder.hpp

178 lines
4.9 KiB
C++
Raw Normal View History

2020-11-24 15:41:18 +01:00
#pragma once
#include <brynet/net/AsyncConnector.hpp>
#include <brynet/net/Exception.hpp>
2021-04-30 16:10:14 +02:00
#include <brynet/net/TcpService.hpp>
#include <future>
#include <utility>
2020-11-24 15:41:18 +01:00
2021-05-14 17:49:39 +02:00
using CompletedCallback = AsyncConnectAddr::CompletedCallback;
using ProcessTcpSocketCallback = AsyncConnectAddr::ProcessTcpSocketCallback;
using FailedCallback = AsyncConnectAddr::FailedCallback;
2021-04-30 16:10:14 +02:00
2021-05-14 17:16:45 +02:00
template <typename Derived>
class BaseSocketConnectBuilder {
2021-04-30 16:10:14 +02:00
public:
2021-05-14 17:16:45 +02:00
virtual ~BaseSocketConnectBuilder() = default;
Derived &WithConnector(AsyncConnector::Ptr connector) {
mConnector = std::move(connector);
return static_cast<Derived &>(*this);
}
Derived &WithAddr(std::string ip, size_t port) {
mConnectOption.ip = std::move(ip);
mConnectOption.port = port;
return static_cast<Derived &>(*this);
}
Derived &WithTimeout(std::chrono::nanoseconds timeout) {
mConnectOption.timeout = timeout;
return static_cast<Derived &>(*this);
}
Derived &AddSocketProcessCallback(const ProcessTcpSocketCallback &callback) {
mConnectOption.processCallbacks.push_back(callback);
return static_cast<Derived &>(*this);
}
Derived &WithCompletedCallback(CompletedCallback callback) {
mConnectOption.completedCallback = std::move(callback);
return static_cast<Derived &>(*this);
}
Derived &WithFailedCallback(FailedCallback callback) {
mConnectOption.failedCallback = std::move(callback);
return static_cast<Derived &>(*this);
}
void asyncConnect() const {
if (mConnector == nullptr) {
throw BrynetCommonException("connector is nullptr");
}
if (mConnectOption.ip.empty()) {
throw BrynetCommonException("address is empty");
}
mConnector->asyncConnect(mConnectOption);
}
TcpSocket::Ptr syncConnect() {
if (mConnectOption.completedCallback != nullptr || mConnectOption.failedCallback != nullptr) {
throw std::runtime_error("already setting completed callback or failed callback");
}
auto socketPromise = std::make_shared<std::promise<TcpSocket::Ptr> >();
mConnectOption.completedCallback = [socketPromise](TcpSocket::Ptr socket) {
socketPromise->set_value(std::move(socket));
};
mConnectOption.failedCallback = [socketPromise]() {
socketPromise->set_value(nullptr);
};
asyncConnect();
auto future = socketPromise->get_future();
if (future.wait_for(mConnectOption.timeout) != std::future_status::ready) {
return nullptr;
}
return future.get();
}
2021-04-30 16:10:14 +02:00
private:
2021-05-14 17:16:45 +02:00
AsyncConnector::Ptr mConnector;
ConnectOption mConnectOption;
2021-04-30 16:10:14 +02:00
};
2020-11-24 15:41:18 +01:00
2021-05-14 17:16:45 +02:00
class SocketConnectBuilder : public BaseSocketConnectBuilder<SocketConnectBuilder> {
2021-04-30 16:10:14 +02:00
};
2021-05-14 17:16:45 +02:00
template <typename Derived>
class BaseConnectionBuilder {
2021-04-30 16:10:14 +02:00
public:
2021-05-14 17:16:45 +02:00
Derived &WithService(TcpService::Ptr service) {
mTcpService = std::move(service);
return static_cast<Derived &>(*this);
}
Derived &WithConnector(AsyncConnector::Ptr connector) {
mConnectBuilder.WithConnector(std::move(connector));
return static_cast<Derived &>(*this);
}
Derived &WithAddr(std::string ip, size_t port) {
mConnectBuilder.WithAddr(std::move(ip), port);
return static_cast<Derived &>(*this);
}
Derived &WithTimeout(std::chrono::nanoseconds timeout) {
mConnectBuilder.WithTimeout(timeout);
return static_cast<Derived &>(*this);
}
Derived &AddSocketProcessCallback(const ProcessTcpSocketCallback &callback) {
mConnectBuilder.AddSocketProcessCallback(callback);
return static_cast<Derived &>(*this);
}
Derived &WithFailedCallback(FailedCallback callback) {
mConnectBuilder.WithFailedCallback(std::move(callback));
return static_cast<Derived &>(*this);
}
Derived &WithMaxRecvBufferSize(size_t size) {
mOption.maxRecvBufferSize = size;
return static_cast<Derived &>(*this);
}
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
#ifdef BRYNET_USE_OPENSSL
2021-05-14 17:16:45 +02:00
Derived &WithSSL() {
mOption.useSSL = true;
return static_cast<Derived &>(*this);
}
2021-04-30 16:10:14 +02:00
#endif
2021-05-14 17:16:45 +02:00
Derived &WithForceSameThreadLoop() {
mOption.forceSameThreadLoop = true;
return static_cast<Derived &>(*this);
}
Derived &AddEnterCallback(const TcpConnection::EnterCallback &callback) {
mOption.enterCallback.push_back(callback);
return static_cast<Derived &>(*this);
}
void asyncConnect() {
auto service = mTcpService;
auto option = mOption;
mConnectBuilder.WithCompletedCallback([service, option](TcpSocket::Ptr socket) mutable {
service->addTcpConnection(std::move(socket), option);
});
mConnectBuilder.asyncConnect();
}
TcpConnection::Ptr syncConnect() {
auto sessionPromise = std::make_shared<std::promise<TcpConnection::Ptr> >();
auto option = mOption;
option.enterCallback.push_back([sessionPromise](const TcpConnection::Ptr &session) {
sessionPromise->set_value(session);
});
auto socket = mConnectBuilder.syncConnect();
if (socket == nullptr || !mTcpService->addTcpConnection(std::move(socket), option)) {
return nullptr;
}
return sessionPromise->get_future().get();
}
2020-11-24 15:41:18 +01:00
2021-04-30 16:10:14 +02:00
private:
2021-05-14 17:16:45 +02:00
TcpService::Ptr mTcpService;
ConnectionOption mOption;
SocketConnectBuilder mConnectBuilder;
2021-04-30 16:10:14 +02:00
};
2020-11-24 15:41:18 +01:00
2021-05-14 17:16:45 +02:00
class ConnectionBuilder : public BaseConnectionBuilder<ConnectionBuilder> {
2021-04-30 16:10:14 +02:00
};