Rename loop_once int the mqtt module to run_async. Also make the event loop run in a thread.

This commit is contained in:
Relintai 2021-05-01 17:50:58 +02:00
parent 6517a70fd7
commit 09e13d255c
2 changed files with 31 additions and 13 deletions

View File

@ -14,9 +14,9 @@ void MQTTServer::initialize() {
sin.sin_port = htons(port); sin.sin_port = htons(port);
listener = evconnlistener_new_bind(evloop, MQTTServer::listener_cb, listener = evconnlistener_new_bind(evloop, MQTTServer::listener_cb,
(void *)this, (void *)this,
LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, -1, LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, -1,
(struct sockaddr *)&sin, sizeof(sin)); (struct sockaddr *)&sin, sizeof(sin));
if (!listener) { if (!listener) {
std::cerr << "Could not create listener!\n"; std::cerr << "Could not create listener!\n";
@ -40,27 +40,42 @@ void MQTTServer::listener_cb(struct evconnlistener *listener, evutil_socket_t fd
server->session_manager->accept_connection(bev); server->session_manager->accept_connection(bev);
} }
void MQTTServer::loop_once() { void MQTTServer::run_async() {
event_base_dispatch(evloop); if (_thread) {
printf("MQTTServer::run_async Error! A thread is already runnig!\n");
return;
}
_thread = new std::thread([this]() { event_base_dispatch(this->evloop); });
} }
MQTTServer::MQTTServer() { MQTTServer::MQTTServer() {
bind_address = "0"; bind_address = "0";
port = 1883; port = 1883;
_thread = nullptr;
session_manager = new SessionManager(); session_manager = new SessionManager();
evloop = nullptr; evloop = nullptr;
listener = nullptr; listener = nullptr;
} }
MQTTServer::~MQTTServer() { MQTTServer::~MQTTServer() {
if (event_base_loopexit(evloop, NULL)) { //this first, as evloop runs in _thread
std::cerr << "failed to exit event loop\n"; if (evloop && event_base_loopexit(evloop, NULL)) {
} std::cout << "failed to exit event loop\n";
}
evconnlistener_free(listener); if (_thread) {
event_base_free(evloop); _thread->join();
delete _thread;
}
delete session_manager; if (listener)
evconnlistener_free(listener);
if (evloop)
event_base_free(evloop);
delete session_manager;
} }

View File

@ -4,6 +4,7 @@
#include <cstdio> #include <cstdio>
#include <string> #include <string>
#include <vector> #include <vector>
#include <thread>
#include "./mqtt_broker/src/broker_session.h" #include "./mqtt_broker/src/broker_session.h"
#include "./mqtt_broker/src/session_manager.h" #include "./mqtt_broker/src/session_manager.h"
@ -31,11 +32,13 @@ public:
void on_error() { printf("on_error\n"); } void on_error() { printf("on_error\n"); }
void initialize(); void initialize();
void loop_once(); void run_async();
MQTTServer(); MQTTServer();
~MQTTServer(); ~MQTTServer();
std::thread *_thread;
SessionManager *session_manager; SessionManager *session_manager;
std::string bind_address; std::string bind_address;
uint16_t port; uint16_t port;