From d4142f8923b5c174bd6ba4d5f464eebedfacc476 Mon Sep 17 00:00:00 2001 From: Relintai Date: Sat, 1 May 2021 19:56:29 +0200 Subject: [PATCH] Implemented local mqtt dispatching. --- .../mqtt_broker/src/broker_session.cc | 1 + .../mqtt_broker/src/session_manager.cc | 58 +++++++++++-------- .../mqtt_broker/src/session_manager.h | 49 +++++++++++----- modules/mqtt_server/mqtt_server.cpp | 4 ++ modules/mqtt_server/mqtt_server.h | 10 +--- 5 files changed, 73 insertions(+), 49 deletions(-) diff --git a/modules/mqtt_server/mqtt_broker/src/broker_session.cc b/modules/mqtt_server/mqtt_broker/src/broker_session.cc index 436c191..72c5b5b 100644 --- a/modules/mqtt_server/mqtt_broker/src/broker_session.cc +++ b/modules/mqtt_server/mqtt_broker/src/broker_session.cc @@ -143,6 +143,7 @@ void BrokerSession::handle_publish(const PublishPacket &packet) { } + session_manager.handle_local_publish(client_id, packet); } void BrokerSession::handle_puback(const PubackPacket &packet) { diff --git a/modules/mqtt_server/mqtt_broker/src/session_manager.cc b/modules/mqtt_server/mqtt_broker/src/session_manager.cc index 372fca0..7344179 100644 --- a/modules/mqtt_server/mqtt_broker/src/session_manager.cc +++ b/modules/mqtt_server/mqtt_broker/src/session_manager.cc @@ -6,42 +6,52 @@ #include "broker_session.h" #include "topic.h" -#include #include +#include void SessionManager::accept_connection(struct bufferevent *bev) { - auto session = std::unique_ptr(new BrokerSession(bev, *this)); - sessions.push_back(std::move(session)); + auto session = std::unique_ptr(new BrokerSession(bev, *this)); + sessions.push_back(std::move(session)); } -std::list>::iterator SessionManager::find_session(const std::string &client_id) { +std::list >::iterator SessionManager::find_session(const std::string &client_id) { - return find_if(sessions.begin(), sessions.end(), [&client_id](const std::unique_ptr & s) { - return (!s->client_id.empty() and (s->client_id == client_id)); - }); + return find_if(sessions.begin(), sessions.end(), [&client_id](const std::unique_ptr &s) { + return (!s->client_id.empty() and (s->client_id == client_id)); + }); } void SessionManager::erase_session(const std::string &client_id) { - sessions.erase(std::remove_if(sessions.begin(), sessions.end(), [&client_id](std::unique_ptr &s) { - return (!s->client_id.empty() and (s->client_id == client_id)); - }), sessions.end()); + sessions.erase(std::remove_if(sessions.begin(), sessions.end(), [&client_id](std::unique_ptr &s) { + return (!s->client_id.empty() and (s->client_id == client_id)); + }), + sessions.end()); } -void SessionManager::erase_session(const BrokerSession *session) -{ - sessions.erase(std::remove_if(sessions.begin(), sessions.end(), [session](std::unique_ptr & s) { - return s.get() == session; - }), sessions.end()); - +void SessionManager::erase_session(const BrokerSession *session) { + sessions.erase(std::remove_if(sessions.begin(), sessions.end(), [session](std::unique_ptr &s) { + return s.get() == session; + }), + sessions.end()); } -void SessionManager::handle_publish(const PublishPacket & packet) { - for (auto &session : sessions) { - for (auto &subscription : session->subscriptions) { - if (topic_match(subscription.topic_filter, TopicName(packet.topic_name))) { - session->forward_packet(packet); - } - } - } +void SessionManager::handle_publish(const PublishPacket &packet) { + for (auto &session : sessions) { + for (auto &subscription : session->subscriptions) { + if (topic_match(subscription.topic_filter, TopicName(packet.topic_name))) { + session->forward_packet(packet); + } + } + } } + +void SessionManager::handle_local_publish(const std::string &client_id, const PublishPacket &packet) { + for (size_t i = 0; i < local_sessions.size(); ++i) { + LocalSession &l = local_sessions[i]; + + if (topic_match(l.filter, TopicName(packet.topic_name))) { + l.func(client_id, packet.message_data); + } + } +} \ No newline at end of file diff --git a/modules/mqtt_server/mqtt_broker/src/session_manager.h b/modules/mqtt_server/mqtt_broker/src/session_manager.h index c87853b..8ce2114 100644 --- a/modules/mqtt_server/mqtt_broker/src/session_manager.h +++ b/modules/mqtt_server/mqtt_broker/src/session_manager.h @@ -15,8 +15,9 @@ #pragma once #include -#include #include +#include +#include struct bufferevent; @@ -28,11 +29,9 @@ class PublishPacket; * * Composes a container of broker sessions and methods to manage them. */ -class SessionManager -{ +class SessionManager { public: - - /** + /** * Accept a new network connection. * * Creates a new BrokerSession instance and adds it to the container of sessions. The session instance will manage @@ -40,17 +39,17 @@ public: * * @param bev Pointer to a bufferevent */ - void accept_connection(struct bufferevent * bev); + void accept_connection(struct bufferevent *bev); - /** + /** * Find a session in the session container. * * @param client_id Unique client id to find. * @return Iterator to BrokerSession. */ - std::list>::iterator find_session(const std::string & client_id); + std::list >::iterator find_session(const std::string &client_id); - /** + /** * Delete a session * * Given a pointer to a BrokerSession, finds that session in the session container and removes it from the @@ -58,17 +57,17 @@ public: * * @param session Pointer to a BrokerSession; */ - void erase_session(const BrokerSession *session); + void erase_session(const BrokerSession *session); - /** + /** * Finds a session in the session container with the given client id. If found the session is removed from the * container. The session instance will be deleted. * * @param client_id A Client id. */ - void erase_session(const std::string &client_id); + void erase_session(const std::string &client_id); - /** + /** * Forward a message to subsribed clients. * * Searches through each session and their subscriptions and invokes the forward_packet method on each session @@ -77,9 +76,27 @@ public: * * @param publish_packet Reference to a PublishPacket; */ - void handle_publish(const PublishPacket & publish_packet); + void handle_publish(const PublishPacket &publish_packet); - /** Container of BrokerSessions. */ - std::list> sessions; + void handle_local_publish(const std::string &client_id, const PublishPacket &packet); + /** Container of BrokerSessions. */ + std::list > sessions; + +public: + void add_local_session(const std::string &filter, void (*func)(const std::string &client_id, const std::vector &data)) { + LocalSession l; + + l.filter = filter; + l.func = func; + + local_sessions.push_back(l); + } + + struct LocalSession { + std::string filter; + void (*func)(const std::string &client_id, const std::vector &data); + }; + + std::vector local_sessions; }; \ No newline at end of file diff --git a/modules/mqtt_server/mqtt_server.cpp b/modules/mqtt_server/mqtt_server.cpp index 7ddeea4..0072b9a 100644 --- a/modules/mqtt_server/mqtt_server.cpp +++ b/modules/mqtt_server/mqtt_server.cpp @@ -1,5 +1,9 @@ #include "mqtt_server.h" +void MQTTServer::add_local_session(const std::string &filter, void (*func)(const std::string &client_id, const std::vector &data)) { + session_manager->add_local_session(filter, func); +} + void MQTTServer::initialize() { evloop = event_base_new(); diff --git a/modules/mqtt_server/mqtt_server.h b/modules/mqtt_server/mqtt_server.h index 6e6715e..57cacfd 100644 --- a/modules/mqtt_server/mqtt_server.h +++ b/modules/mqtt_server/mqtt_server.h @@ -21,15 +21,7 @@ class MQTTServer { public: static void listener_cb(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *addr, int socklen, void *arg); - void on_connect(int rc) { printf("on_connect\n"); } - void on_connect_with_flags(int rc, int flags) { printf("on_connect_with_flags\n"); } - void on_disconnect(int rc) { printf("on_disconnect\n"); } - void on_publish(int mid) { printf("on_publish\n"); } - void on_message(const struct mosquitto_message *message) { printf("on_message\n"); } - void on_subscribe(int mid, int qos_count, const int *granted_qos) { printf("on_subscribe\n"); } - void on_unsubscribe(int mid) { printf("on_unsubscribe\n"); } - void on_log(int level, const char *str) { printf("on_log\n"); } - void on_error() { printf("on_error\n"); } + void add_local_session(const std::string &filter, void (*func)(const std::string &client_id, const std::vector &data)); void initialize(); void run_async();