Implemented local mqtt dispatching.

This commit is contained in:
Relintai 2021-05-01 19:56:29 +02:00
parent 09e13d255c
commit d4142f8923
5 changed files with 73 additions and 49 deletions

View File

@ -143,6 +143,7 @@ void BrokerSession::handle_publish(const PublishPacket &packet) {
}
session_manager.handle_local_publish(client_id, packet);
}
void BrokerSession::handle_puback(const PubackPacket &packet) {

View File

@ -6,42 +6,52 @@
#include "broker_session.h"
#include "topic.h"
#include <memory>
#include <algorithm>
#include <memory>
void SessionManager::accept_connection(struct bufferevent *bev) {
auto session = std::unique_ptr<BrokerSession>(new BrokerSession(bev, *this));
sessions.push_back(std::move(session));
auto session = std::unique_ptr<BrokerSession>(new BrokerSession(bev, *this));
sessions.push_back(std::move(session));
}
std::list<std::unique_ptr<BrokerSession>>::iterator SessionManager::find_session(const std::string &client_id) {
std::list<std::unique_ptr<BrokerSession> >::iterator SessionManager::find_session(const std::string &client_id) {
return find_if(sessions.begin(), sessions.end(), [&client_id](const std::unique_ptr<BrokerSession> & s) {
return (!s->client_id.empty() and (s->client_id == client_id));
});
return find_if(sessions.begin(), sessions.end(), [&client_id](const std::unique_ptr<BrokerSession> &s) {
return (!s->client_id.empty() and (s->client_id == client_id));
});
}
void SessionManager::erase_session(const std::string &client_id) {
sessions.erase(std::remove_if(sessions.begin(), sessions.end(), [&client_id](std::unique_ptr<BrokerSession> &s) {
return (!s->client_id.empty() and (s->client_id == client_id));
}), sessions.end());
sessions.erase(std::remove_if(sessions.begin(), sessions.end(), [&client_id](std::unique_ptr<BrokerSession> &s) {
return (!s->client_id.empty() and (s->client_id == client_id));
}),
sessions.end());
}
void SessionManager::erase_session(const BrokerSession *session)
{
sessions.erase(std::remove_if(sessions.begin(), sessions.end(), [session](std::unique_ptr<BrokerSession> & s) {
return s.get() == session;
}), sessions.end());
void SessionManager::erase_session(const BrokerSession *session) {
sessions.erase(std::remove_if(sessions.begin(), sessions.end(), [session](std::unique_ptr<BrokerSession> &s) {
return s.get() == session;
}),
sessions.end());
}
void SessionManager::handle_publish(const PublishPacket & packet) {
for (auto &session : sessions) {
for (auto &subscription : session->subscriptions) {
if (topic_match(subscription.topic_filter, TopicName(packet.topic_name))) {
session->forward_packet(packet);
}
}
}
void SessionManager::handle_publish(const PublishPacket &packet) {
for (auto &session : sessions) {
for (auto &subscription : session->subscriptions) {
if (topic_match(subscription.topic_filter, TopicName(packet.topic_name))) {
session->forward_packet(packet);
}
}
}
}
void SessionManager::handle_local_publish(const std::string &client_id, const PublishPacket &packet) {
for (size_t i = 0; i < local_sessions.size(); ++i) {
LocalSession &l = local_sessions[i];
if (topic_match(l.filter, TopicName(packet.topic_name))) {
l.func(client_id, packet.message_data);
}
}
}

View File

@ -15,8 +15,9 @@
#pragma once
#include <list>
#include <string>
#include <memory>
#include <string>
#include <vector>
struct bufferevent;
@ -28,11 +29,9 @@ class PublishPacket;
*
* Composes a container of broker sessions and methods to manage them.
*/
class SessionManager
{
class SessionManager {
public:
/**
/**
* Accept a new network connection.
*
* Creates a new BrokerSession instance and adds it to the container of sessions. The session instance will manage
@ -40,17 +39,17 @@ public:
*
* @param bev Pointer to a bufferevent
*/
void accept_connection(struct bufferevent * bev);
void accept_connection(struct bufferevent *bev);
/**
/**
* Find a session in the session container.
*
* @param client_id Unique client id to find.
* @return Iterator to BrokerSession.
*/
std::list<std::unique_ptr<BrokerSession>>::iterator find_session(const std::string & client_id);
std::list<std::unique_ptr<BrokerSession> >::iterator find_session(const std::string &client_id);
/**
/**
* Delete a session
*
* Given a pointer to a BrokerSession, finds that session in the session container and removes it from the
@ -58,17 +57,17 @@ public:
*
* @param session Pointer to a BrokerSession;
*/
void erase_session(const BrokerSession *session);
void erase_session(const BrokerSession *session);
/**
/**
* Finds a session in the session container with the given client id. If found the session is removed from the
* container. The session instance will be deleted.
*
* @param client_id A Client id.
*/
void erase_session(const std::string &client_id);
void erase_session(const std::string &client_id);
/**
/**
* Forward a message to subsribed clients.
*
* Searches through each session and their subscriptions and invokes the forward_packet method on each session
@ -77,9 +76,27 @@ public:
*
* @param publish_packet Reference to a PublishPacket;
*/
void handle_publish(const PublishPacket & publish_packet);
void handle_publish(const PublishPacket &publish_packet);
/** Container of BrokerSessions. */
std::list<std::unique_ptr<BrokerSession>> sessions;
void handle_local_publish(const std::string &client_id, const PublishPacket &packet);
/** Container of BrokerSessions. */
std::list<std::unique_ptr<BrokerSession> > sessions;
public:
void add_local_session(const std::string &filter, void (*func)(const std::string &client_id, const std::vector<uint8_t> &data)) {
LocalSession l;
l.filter = filter;
l.func = func;
local_sessions.push_back(l);
}
struct LocalSession {
std::string filter;
void (*func)(const std::string &client_id, const std::vector<uint8_t> &data);
};
std::vector<LocalSession> local_sessions;
};

View File

@ -1,5 +1,9 @@
#include "mqtt_server.h"
void MQTTServer::add_local_session(const std::string &filter, void (*func)(const std::string &client_id, const std::vector<uint8_t> &data)) {
session_manager->add_local_session(filter, func);
}
void MQTTServer::initialize() {
evloop = event_base_new();

View File

@ -21,15 +21,7 @@ class MQTTServer {
public:
static void listener_cb(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *addr, int socklen, void *arg);
void on_connect(int rc) { printf("on_connect\n"); }
void on_connect_with_flags(int rc, int flags) { printf("on_connect_with_flags\n"); }
void on_disconnect(int rc) { printf("on_disconnect\n"); }
void on_publish(int mid) { printf("on_publish\n"); }
void on_message(const struct mosquitto_message *message) { printf("on_message\n"); }
void on_subscribe(int mid, int qos_count, const int *granted_qos) { printf("on_subscribe\n"); }
void on_unsubscribe(int mid) { printf("on_unsubscribe\n"); }
void on_log(int level, const char *str) { printf("on_log\n"); }
void on_error() { printf("on_error\n"); }
void add_local_session(const std::string &filter, void (*func)(const std::string &client_id, const std::vector<uint8_t> &data));
void initialize();
void run_async();