From 09e13d255cee057e788b2acceec896a2e86df615 Mon Sep 17 00:00:00 2001 From: Relintai Date: Sat, 1 May 2021 17:50:58 +0200 Subject: [PATCH] Rename loop_once int the mqtt module to run_async. Also make the event loop run in a thread. --- modules/mqtt_server/mqtt_server.cpp | 39 ++++++++++++++++++++--------- modules/mqtt_server/mqtt_server.h | 5 +++- 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/modules/mqtt_server/mqtt_server.cpp b/modules/mqtt_server/mqtt_server.cpp index 32faf9e..7ddeea4 100644 --- a/modules/mqtt_server/mqtt_server.cpp +++ b/modules/mqtt_server/mqtt_server.cpp @@ -14,9 +14,9 @@ void MQTTServer::initialize() { sin.sin_port = htons(port); listener = evconnlistener_new_bind(evloop, MQTTServer::listener_cb, - (void *)this, - LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, -1, - (struct sockaddr *)&sin, sizeof(sin)); + (void *)this, + LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, -1, + (struct sockaddr *)&sin, sizeof(sin)); if (!listener) { std::cerr << "Could not create listener!\n"; @@ -40,27 +40,42 @@ void MQTTServer::listener_cb(struct evconnlistener *listener, evutil_socket_t fd server->session_manager->accept_connection(bev); } -void MQTTServer::loop_once() { - event_base_dispatch(evloop); +void MQTTServer::run_async() { + if (_thread) { + printf("MQTTServer::run_async Error! A thread is already runnig!\n"); + return; + } + + _thread = new std::thread([this]() { event_base_dispatch(this->evloop); }); } MQTTServer::MQTTServer() { bind_address = "0"; port = 1883; + _thread = nullptr; - session_manager = new SessionManager(); + session_manager = new SessionManager(); evloop = nullptr; listener = nullptr; } MQTTServer::~MQTTServer() { - if (event_base_loopexit(evloop, NULL)) { - std::cerr << "failed to exit event loop\n"; - } + //this first, as evloop runs in _thread + if (evloop && event_base_loopexit(evloop, NULL)) { + std::cout << "failed to exit event loop\n"; + } - evconnlistener_free(listener); - event_base_free(evloop); + if (_thread) { + _thread->join(); + delete _thread; + } - delete session_manager; + if (listener) + evconnlistener_free(listener); + + if (evloop) + event_base_free(evloop); + + delete session_manager; } diff --git a/modules/mqtt_server/mqtt_server.h b/modules/mqtt_server/mqtt_server.h index 5db7c6a..6e6715e 100644 --- a/modules/mqtt_server/mqtt_server.h +++ b/modules/mqtt_server/mqtt_server.h @@ -4,6 +4,7 @@ #include #include #include +#include #include "./mqtt_broker/src/broker_session.h" #include "./mqtt_broker/src/session_manager.h" @@ -31,11 +32,13 @@ public: void on_error() { printf("on_error\n"); } void initialize(); - void loop_once(); + void run_async(); MQTTServer(); ~MQTTServer(); + std::thread *_thread; + SessionManager *session_manager; std::string bind_address; uint16_t port;