Simple mqtt broker module for a small project that I have to do. Will probably get removed later. It uses https://github.com/inyotech/mqtt_broker .

This commit is contained in:
Relintai 2021-05-01 17:42:25 +02:00
parent c6033ab136
commit 6517a70fd7
27 changed files with 4196 additions and 0 deletions

View File

@ -137,6 +137,7 @@ env_base.Prepend(LINKFLAGS=["-lpthread"])
env_base.Append(CXX=["-o3"])
#env_base.Append(CXX=["-g"])
#env_base.Append(CXX=["-g2"])
#env_base.Append(CXX=["-fno-rtti"])
# Compilation DB requires SCons 3.1.1+.
from SCons import __version__ as scons_raw_version

13
modules/mqtt_server/SCsub Normal file
View File

@ -0,0 +1,13 @@
#!/usr/bin/env python
Import("env_mod")
Import("env")
env_mod.core_sources = []
env_mod.add_source_files(env_mod.core_sources, "*.cpp")
env_mod.add_source_files(env_mod.core_sources, "./mqtt_broker/src/*.cc")
# Build it all as a library
lib = env_mod.add_library("mqtt_server", env_mod.core_sources)
env.Prepend(LIBS=[lib])

View File

@ -0,0 +1,46 @@
import os
import platform
import sys
def is_active():
return True
def get_name():
return "mqtt_server"
def can_build():
if os.name == "posix" or sys.platform == "darwin":
x11_error = os.system("pkg-config --version > /dev/null")
if x11_error:
return False
libevent_err = os.system("pkg-config libevent --modversion --silence-errors > /dev/null ")
if libevent_err:
print("libevent not found! MQTT server will not be available!")
return False
print("libevent found! MQTT server will be available!")
return True
return False
def get_opts():
return []
def get_flags():
return []
def configure(env):
env.ParseConfig("pkg-config libevent --cflags --libs")
env.Append(CPPDEFINES=["MQTT_SERVER_PRESENT"])

View File

@ -0,0 +1 @@
bea4d892540d329cf055a61339200b76001e191d

View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) [year] [fullname]
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -0,0 +1,93 @@
# MQTT broker and clients
## About
The [MQTT (MQ Telemetry Transport) publish/subscribe
protocol](htts://mqtt.org) is a simple lightweight messaging protocol
for distributed network connected devices. It provides low overhead,
reliable connectivity for resource constrained devices.
This is an open source, asynchronous, C++ implementation of the broker
(server) and connecting clients. The implementation follows the 3.1.1
OASIS standard available
[here](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html).
## Installation
### Requirements
Asynchronous networking support requires the
[Libevent](http://libevent.org) networking library. Other than that
there are no other external run-time dependencies.
* A C++11 conformant compiler.
* [Libevent](http://libevent.org)
* [CMake](Cmhttps://cmake.org/)
Verified platforms.
* Ubuntu Linux 16.04 (gcc 5.4.0)
* Mac OSX 10.11 (llvm 7.3.0)
### Building
1. Clone this repository.
````
$ git clone https://github.com/inyotech/mqtt_broker.git
$ cd mqtt_broker
````
2. Install the google tests framework
```
$ pushd test/lib
$ git clone https://github.com/google/googletest.git
$ popd
```
3. Create a build directory.
````
$ mkdir build
$ cd build
````
4. Generate build files.
````
$ cmake ..
````
5. Build
````
$ make
````
## Example
* Open a terminal and execute the broker.
````
$ mqtt_broker
````
* In a second terminal execute a subscriber.
````
$ mqtt_client_sub --topic 'a/b/c'
````
* Execute a publisher in a third terminal.
````
$ mqtt_client_pub --topic 'a/b/c' --message 'published message'
````
## Documentation
[Doxygen](http://www.stack.nl/~dimitri/doxygen/) documentation is
available [here](https://inyotech.github.io/mqtt_broker).
## License
This software is licensed under the MIT License. See the LICENSE.TXT file for details.
## TODO
* Client Will not implemented.
* Retained message publication not implemented.
* SSL support not implemented.

View File

@ -0,0 +1,115 @@
/**
* @file base_session.cc
*/
#include "base_session.h"
#include <string>
void BaseSession::packet_received(std::unique_ptr<Packet> packet) {
switch (packet->type) {
case PacketType::Connect:
handle_connect(dynamic_cast<const ConnectPacket &>(*packet));
break;
case PacketType::Connack:
handle_connack(dynamic_cast<const ConnackPacket &>(*packet));
break;
case PacketType::Publish:
handle_publish(dynamic_cast<const PublishPacket &>(*packet));
break;
case PacketType::Puback:
handle_puback(dynamic_cast<const PubackPacket &>(*packet));
break;
case PacketType::Pubrec:
handle_pubrec(dynamic_cast<const PubrecPacket &>(*packet));
break;
case PacketType::Pubrel:
handle_pubrel(dynamic_cast<const PubrelPacket &>(*packet));
break;
case PacketType::Pubcomp:
handle_pubcomp(dynamic_cast<const PubcompPacket &>(*packet));
break;
case PacketType::Subscribe:
handle_subscribe(dynamic_cast<const SubscribePacket &>(*packet));
break;
case PacketType::Suback:
handle_suback(dynamic_cast<const SubackPacket &>(*packet));
break;
case PacketType::Unsubscribe:
handle_unsubscribe(dynamic_cast<const UnsubscribePacket &>(*packet));
break;
case PacketType::Unsuback:
handle_unsuback(dynamic_cast<const UnsubackPacket &>(*packet));
break;
case PacketType::Pingreq:
handle_pingreq(dynamic_cast<const PingreqPacket &>(*packet));
break;
case PacketType::Pingresp:
handle_pingresp(dynamic_cast<const PingrespPacket &>(*packet));
break;
case PacketType::Disconnect:
handle_disconnect(dynamic_cast<const DisconnectPacket &>(*packet));
break;
}
}
void BaseSession::packet_manager_event(PacketManager::EventType event) {
packet_manager->close_connection();
}
void BaseSession::handle_connect(const ConnectPacket &) {
throw std::exception();
}
void BaseSession::handle_connack(const ConnackPacket &) {
throw std::exception();
}
void BaseSession::handle_publish(const PublishPacket &) {
throw std::exception();
}
void BaseSession::handle_puback(const PubackPacket &) {
throw std::exception();
}
void BaseSession::handle_pubrec(const PubrecPacket &) {
throw std::exception();
}
void BaseSession::handle_pubrel(const PubrelPacket &) {
throw std::exception();
}
void BaseSession::handle_pubcomp(const PubcompPacket &) {
throw std::exception();
}
void BaseSession::handle_subscribe(const SubscribePacket &) {
throw std::exception();
}
void BaseSession::handle_suback(const SubackPacket &) {
throw std::exception();
}
void BaseSession::handle_unsubscribe(const UnsubscribePacket &) {
throw std::exception();
}
void BaseSession::handle_unsuback(const UnsubackPacket &) {
throw std::exception();
}
void BaseSession::handle_pingreq(const PingreqPacket &) {
PingrespPacket pingresp_packet;
packet_manager->send_packet(pingresp_packet);
}
void BaseSession::handle_pingresp(const PingrespPacket &) {}
void BaseSession::handle_disconnect(const DisconnectPacket &) {
throw std::exception();
}

View File

@ -0,0 +1,213 @@
/**
* @file base_session.h
*
* Base class for MQTT sessions. This class add facilities for persistence and resumption of session state. The MQTT
* standard requires that the client and server both maintain session state while connected. The server is also
* required to resume the state when a client re-connects with the same client id.
*/
#pragma once
#include "packet.h"
#include "packet_manager.h"
#include <event2/bufferevent.h>
#include <string>
/**
* Base session class
*
* Maintains session attributes and provides default handler methods for received control packets. Classes derived
* from BaseSession will override control packet handlers as required.
*
* Each BaseSession composes a PacketManager instance that can be moved between BaseSession instances.
*/
class BaseSession {
public:
/**
* Constructor
*
* Accepts a pointer to a libevent bufferevent as the only argument. The bufferevent is forwarded to a
* newly instantiated PacketManager that use it to handle all network related functions.
*
* This BaseSession instance can persist in memory after the network connection is closed. If a connection is
* received and the Connect control packet contains the same client id as an existing session. Any currently
* active connection in the original session is closed and this PacketManager will be moved to the original
* session. This session will then be deleted.
*
* @param bev Pointer to a bufferevent.
*/
BaseSession(struct bufferevent *bev) : packet_manager(new PacketManager(bev)) {
packet_manager->set_packet_received_handler(
std::bind(&BaseSession::packet_received, this, std::placeholders::_1));
packet_manager->set_event_handler(std::bind(&BaseSession::packet_manager_event, this, std::placeholders::_1));
}
/**
* Desctructor
*
* Virtual so the desctructor for derived classes will be called.
*/
virtual ~BaseSession() {}
/** Client id. */
std::string client_id;
/** Clean session flag. */
bool clean_session;
/**
* PacketManager callback.
*
* Invoked by the installed PacketManager instance when it receives a complete control packet. The default handler
* methods will be passed a reference to the received packet. Packet memory is heap allocated on creation and
* will be freed according to standard C++ std::unique_ptr rules. It is the responsibility of subclasses to
* manage the std::unique_ptr<Packet>.
*
* @param packet Pointer to a packet.
*/
virtual void packet_received(std::unique_ptr<Packet> packet);
/**
* PacketManager callback.
*
* Invoked by the installed PacketManager instance when it detects a low level protocol or network error. The
* default action is to close the network connection.
*
* @param event The type of event detected.
*/
virtual void packet_manager_event(PacketManager::EventType event);
/**
* Handle a received ConnectPacket.
*
* The default action is to throw an exception. Subclasses should override this method.
*
* @param connect_packet A reference to the packet.
*/
virtual void handle_connect(const ConnectPacket & connect_packet);
/**
* Handle a received ConnackPacket.
*
* The default action is to throw an exception. Subclasses should override this method.
*
* @param connack_packet A reference to the packet.
*/
virtual void handle_connack(const ConnackPacket & connack_packet);
/**
* Handle a received PublishPacket.
*
* The default action is to throw an exception. Subclasses should override this method.
*
* @param publish_packet A reference to the packet.
*/
virtual void handle_publish(const PublishPacket & publish_packet);
/**
* Handle a received PubackPacket.
*
* The default action is to throw an exception. Subclasses should override this method.
*
* @param puback_packet A reference to the packet.
*/
virtual void handle_puback(const PubackPacket & puback_packet);
/**
* Handle a received PubrecPacket.
*
* The default action is to throw an exception. Subclasses should override this method.
*
* @param pubrec_packet A reference to the packet.
*/
virtual void handle_pubrec(const PubrecPacket & pubrec_packet);
/**
* Handle a received PubrelPacket.
*
* The default action is to throw an exception. Subclasses should override this method.
*
* @param pubrel_packet A reference to the packet.
*/
virtual void handle_pubrel(const PubrelPacket & pubrel_packet);
/**
* Handle a received PubcompPacket.
*
* The default action is to throw an exception. Subclasses should override this method.
*
* @param pubcomp_packet A reference to the packet.
*/
virtual void handle_pubcomp(const PubcompPacket & pubcomp_packet);
/**
* Handle a received SubscribePacket.
*
* The default action is to throw an exception. Subclasses should override this method.
*
* @param subscribe_packet A reference to the packet.
*/
virtual void handle_subscribe(const SubscribePacket & subscribe_packet);
/**
* Handle a received SubackPacket.
*
* The default action is to throw an exception. Subclasses should override this method.
*
* @param suback_packet A reference to the packet.
*/
virtual void handle_suback(const SubackPacket & suback_packet);
/**
* Handle a received UnsubscribePacket.
*
* The default action is to throw an exception. Subclasses should override this method.
*
* @param unsubscribe_packet A reference to the packet.
*/
virtual void handle_unsubscribe(const UnsubscribePacket & unsubscribe_packet);
/**
* Handle a received UnsubackPacket.
*
* The default action is to throw an exception. Subclasses should override this method.
*
* @param unsuback_packet A reference to the packet.
*/
virtual void handle_unsuback(const UnsubackPacket & unsuback_packet);
/**
* Handle a received PingreqPacket.
*
* The default action is to send a Pingresp packet. Subclasses can override this method.
*
* @param pingreq_packet A reference to the packet.
*/
virtual void handle_pingreq(const PingreqPacket & pingreq_packet);
/**
* Handle a received PingrespPacket.
*
* The default action is to do nothing. Subclasses can override this method.
*
* @param pingresp_packet A reference to the packet.
*/
virtual void handle_pingresp(const PingrespPacket & pingresp_packet);
/**
* Handle a received DisconnectPacket.
*
* The default action is to throw an exception. Subclasses should override this method.
*
* @param disconnect_packet A reference to the packet.
*/
virtual void handle_disconnect(const DisconnectPacket & disconnect_packet);
/** Pointer to the installed PacketManager instance. */
std::unique_ptr<PacketManager> packet_manager;
};

View File

@ -0,0 +1,181 @@
/**
* @file broker.cc
*
* MQTT Broker (server)
*
* Listen for connections from clients. Accept subscribe, unsubscribe and publish commands and forward according to
* the [MQTT protocol](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html)
*/
#include "session_manager.h"
#include "broker_session.h"
#include <event2/listener.h>
#include <getopt.h>
#include <csignal>
#include <cstring>
/**
* Manange sessions for each client.
* Sessions will persist between connections and are identified by the client id of the connecting client.
*/
SessionManager session_manager;
/**
* Callback run when SIGINT or SIGTERM is attached, will cleanly exit.
*
* @param signal Integer value of signal.
* @param event Should be EV_SIGNAL.
* @param arg Pointer originally passed to evsignal_new.
*/
static void signal_cb(evutil_socket_t signal, short event, void * arg);
/**
* Callback run when connection is received on the listening socket.
*
* @param listener Pointer to this listener's internal control structure.
* @param fd File descriptor of the newly accepted socket.
* @param addr Address structure for the peer.
* @param socklen Length of the address structure.
* @param arg Pointer orignally passed to evconlistener_new_bind.
*/
static void listener_cb(struct evconnlistener * listener, evutil_socket_t fd,
struct sockaddr * addr, int socklen, void * arg);
/**
* Parse command line.
*
* Recognized command line arguments are parsed and added to the options instance. This options instance will be
* used to configure the broker instance.
*
* @param argc Command line argument count.
* @param argv Command line argument values.
*/
static void parse_arguments(int argc, char *argv[]);
/**
* Options settable through command line arguments.
*/
struct options_t {
/** Network interface address to bind to. */
std::string bind_address = "0";
/** Port number to bind to. */
uint16_t bind_port = 1883;
} options;
int main(int argc, char *argv[]) {
struct event_base *evloop;
struct event *signal_event;
struct evconnlistener *listener;
struct sockaddr_in sin;
unsigned short listen_port = 1883;
parse_arguments(argc, argv);
evloop = event_base_new();
if (!evloop) {
std::cerr << "Could not initialize libevent\n";
return 1;
}
signal_event = evsignal_new(evloop, SIGINT, signal_cb, evloop);
evsignal_add(signal_event, NULL);
signal_event = evsignal_new(evloop, SIGTERM, signal_cb, evloop);
evsignal_add(signal_event, NULL);
std::memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
evutil_inet_pton(sin.sin_family, options.bind_address.c_str(), &sin.sin_addr);
sin.sin_port = htons(listen_port);
listener = evconnlistener_new_bind(evloop, listener_cb, (void *) evloop,
LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, -1,
(struct sockaddr *) &sin, sizeof(sin));
if (!listener) {
std::cerr << "Could not create listener!\n";
return 1;
}
event_base_dispatch(evloop);
event_free(signal_event);
evconnlistener_free(listener);
event_base_free(evloop);
return 0;
}
void usage() {
std::cout <<
R"END(usage: mqtt_broker [OPTION]
MQTT broker server. Bind to address and listen for client connections.
OPTIONS
--broker-host | -b Broker host name or ip address, default localhost
--broker-port | -p Broker port, default 1883
--help | -h Display this message and exit
)END";
}
void parse_arguments(int argc, char *argv[]) {
static struct option longopts[] = {
{"bind-addr", required_argument, NULL, 'b'},
{"bind-port", required_argument, NULL, 'p'},
{"help", no_argument, NULL, 'h'}
};
int ch;
while ((ch = getopt_long(argc, argv, "b:p:h", longopts, NULL)) != -1) {
switch (ch) {
case 'b':
options.bind_address = optarg;
break;
case 'p':
options.bind_port = static_cast<uint16_t>(atoi(optarg));
break;
case 'h':
usage();
std::exit(0);
default:
usage();
std::exit(1);
}
}
}
static void listener_cb(struct evconnlistener *listener, evutil_socket_t fd,
struct sockaddr *sa, int socklen, void *user_data) {
struct event_base *base = static_cast<event_base *>(user_data);
struct bufferevent *bev;
bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
if (!bev) {
std::cerr << "Error constructing bufferevent!\n";
event_base_loopbreak(base);
return;
}
session_manager.accept_connection(bev);
}
static void signal_cb(evutil_socket_t fd, short event, void *arg) {
event_base *base = static_cast<event_base *>(arg);
if (event_base_loopexit(base, NULL)) {
std::cerr << "failed to exit event loop\n";
}
}

View File

@ -0,0 +1,249 @@
/**
* @file broker_session.cc
*/
#include "broker_session.h"
#include "session_manager.h"
#include <algorithm>
bool BrokerSession::authorize_connection(const ConnectPacket &packet) {
return true;
}
void BrokerSession::resume_session(std::unique_ptr<BrokerSession> &session,
std::unique_ptr<PacketManager> packet_manager_ptr) {
packet_manager_ptr->set_event_handler(
std::bind(&BrokerSession::packet_manager_event, session.get(), std::placeholders::_1));
packet_manager_ptr->set_packet_received_handler(
std::bind(&BrokerSession::packet_received, session.get(), std::placeholders::_1));
session->packet_manager = std::move(packet_manager_ptr);
ConnackPacket connack;
connack.session_present(true);
connack.return_code = ConnackPacket::ReturnCode::Accepted;
session->packet_manager->send_packet(connack);
}
void BrokerSession::forward_packet(const PublishPacket &packet) {
if (packet.qos() == QoSType::QoS0) {
packet_manager->send_packet(packet);
} else if (packet.qos() == QoSType::QoS1) {
PublishPacket packet_to_send(packet);
packet_to_send.dup(false);
packet_to_send.retain(false);
packet_to_send.packet_id = packet_manager->next_packet_id();
qos1_pending_puback.push_back(packet_to_send);
packet_manager->send_packet(packet_to_send);
} else if (packet.qos() == QoSType::QoS2) {
PublishPacket packet_to_send(packet);
packet_to_send.dup(false);
packet_to_send.retain(false);
packet_to_send.packet_id = packet_manager->next_packet_id();
auto previous_packet = find_if(qos2_pending_pubrec.begin(), qos2_pending_pubrec.end(),
[&packet](const PublishPacket &p) { return p.packet_id == packet.packet_id; });
if (previous_packet == qos2_pending_pubrec.end()) {
qos2_pending_pubrec.push_back(packet_to_send);
}
packet_manager->send_packet(packet_to_send);
}
}
void BrokerSession::send_pending_message() {
if (!qos1_pending_puback.empty()) {
packet_manager->send_packet(qos1_pending_puback[0]);
} else if (!qos2_pending_pubrec.empty()) {
packet_manager->send_packet(qos2_pending_pubrec[0]);
} else if (!qos2_pending_pubrel.empty()) {
PubrecPacket pubrec_packet;
pubrec_packet.packet_id = qos2_pending_pubrel[0];
packet_manager->send_packet(pubrec_packet);
} else if (!qos2_pending_pubcomp.empty()) {
PubrelPacket pubrel_packet;
pubrel_packet.packet_id = qos2_pending_pubcomp[0];
packet_manager->send_packet(pubrel_packet);
}
return;
}
void BrokerSession::packet_received(std::unique_ptr<Packet> packet) {
BaseSession::packet_received(std::move(packet));
send_pending_message();
}
void BrokerSession::packet_manager_event(PacketManager::EventType event) {
BaseSession::packet_manager_event(event);
if (clean_session) {
session_manager.erase_session(this);
}
}
void BrokerSession::handle_connect(const ConnectPacket &packet) {
if (!authorize_connection(packet)) {
session_manager.erase_session(this);
return;
}
if (packet.clean_session()) {
session_manager.erase_session(packet.client_id);
} else {
auto previous_session_it = session_manager.find_session(packet.client_id);
if (previous_session_it != session_manager.sessions.end()) {
std::unique_ptr<BrokerSession> &previous_session_ptr = *previous_session_it;
resume_session(previous_session_ptr, std::move(packet_manager));
session_manager.erase_session(this);
return;
}
}
client_id = packet.client_id;
clean_session = packet.clean_session();
ConnackPacket connack;
connack.session_present(false);
connack.return_code = ConnackPacket::ReturnCode::Accepted;
packet_manager->send_packet(connack);
}
void BrokerSession::handle_publish(const PublishPacket &packet) {
if (packet.qos() == QoSType::QoS0) {
session_manager.handle_publish(packet);
} else if (packet.qos() == QoSType::QoS1) {
session_manager.handle_publish(packet);
PubackPacket puback;
puback.packet_id = packet.packet_id;
packet_manager->send_packet(puback);
} else if (packet.qos() == QoSType::QoS2) {
auto previous_packet = find_if(qos2_pending_pubrel.begin(), qos2_pending_pubrel.end(),
[& packet](uint16_t packet_id) { return packet_id == packet.packet_id; });
if (previous_packet == qos2_pending_pubrel.end()) {
qos2_pending_pubrel.push_back(packet.packet_id);
session_manager.handle_publish(packet);
}
}
}
void BrokerSession::handle_puback(const PubackPacket &packet) {
auto message = find_if(qos1_pending_puback.begin(), qos1_pending_puback.end(),
[&packet](const PublishPacket &p) { return p.packet_id == packet.packet_id; });
if (message != qos1_pending_puback.end()) {
qos1_pending_puback.erase(message);
}
}
void BrokerSession::handle_pubrec(const PubrecPacket &packet) {
qos2_pending_pubrec.erase(
std::remove_if(qos2_pending_pubrec.begin(), qos2_pending_pubrec.end(),
[&packet](const PublishPacket &p) { return p.packet_id == packet.packet_id; }),
qos2_pending_pubrec.end()
);
auto pubcomp_packet = find_if(qos2_pending_pubcomp.begin(), qos2_pending_pubcomp.end(),
[&packet](uint16_t packet_id) { return packet_id == packet.packet_id; });
if (pubcomp_packet == qos2_pending_pubcomp.end()) {
qos2_pending_pubcomp.push_back(packet.packet_id);
}
}
void BrokerSession::handle_pubrel(const PubrelPacket &packet) {
qos2_pending_pubrel.erase(
std::remove_if(qos2_pending_pubrel.begin(), qos2_pending_pubrel.end(),
[&packet](uint16_t packet_id) { return packet_id == packet.packet_id; }),
qos2_pending_pubrel.end()
);
PubcompPacket pubcomp;
pubcomp.packet_id = packet.packet_id;
packet_manager->send_packet(pubcomp);
}
void BrokerSession::handle_pubcomp(const PubcompPacket &packet) {
qos2_pending_pubcomp.erase(
std::remove_if(qos2_pending_pubcomp.begin(), qos2_pending_pubcomp.end(),
[&packet](uint16_t packet_id) { return packet_id == packet.packet_id; }),
qos2_pending_pubcomp.end()
);
}
void BrokerSession::handle_subscribe(const SubscribePacket &packet) {
SubackPacket suback;
suback.packet_id = packet.packet_id;
for (auto subscription : packet.subscriptions) {
auto previous_subscription = find_if(subscriptions.begin(), subscriptions.end(),
[&subscription](const Subscription &s) {
return topic_match(s.topic_filter, subscription.topic_filter);
});
if (previous_subscription != subscriptions.end()) {
subscriptions.erase(previous_subscription);
}
subscriptions.push_back(subscription);
SubackPacket::ReturnCode return_code = SubackPacket::ReturnCode::Failure;
switch (subscription.qos) {
case QoSType::QoS0:
return_code = SubackPacket::ReturnCode::SuccessQoS0;
break;
case QoSType::QoS1:
return_code = SubackPacket::ReturnCode::SuccessQoS1;
break;
case QoSType::QoS2:
return_code = SubackPacket::ReturnCode::SuccessQoS2;
break;
}
suback.return_codes.push_back(return_code);
}
packet_manager->send_packet(suback);
}
void BrokerSession::handle_unsubscribe(const UnsubscribePacket &packet) {
UnsubackPacket unsuback;
unsuback.packet_id = packet.packet_id;
packet_manager->send_packet(unsuback);
}
void BrokerSession::handle_disconnect(const DisconnectPacket &packet) {
if (clean_session) {
session_manager.erase_session(this);
}
}

View File

@ -0,0 +1,266 @@
/**
* @file broker_session.h
*
* This class builds on the BaseSession class adding members and methods needed for a MQTT session in the broker.
*
* Drived class for MQTT broker sessions. In addition to maintaining session state throughout the lifetime of the
* connection, the MQTT specification requires that the broker persist session state after a client closes the
* connection. When a subsequent connection is made with the same client id, the persisted session should be resumed.
* Any QoS 1 or QoS 2 messages delivered to client subscribed topics should be forwarded over the new connection.
*/
#pragma once
#include "base_session.h"
#include "packet_manager.h"
#include "packet.h"
#include <event2/bufferevent.h>
#include <list>
#include <memory>
class SessionManager;
class Message;
class Subscription;
/**
* Broker session class
*
* In addition to maintaining session attributes and handling control packets, this subclass adds facilities for
* persisting and resuming session state according to the MQTT 3.1.1 standard.
*/
class BrokerSession : public BaseSession {
public:
/**
* Constructor
*
* In addition to the bufferen event pointer required by the BaseSession constructor, this constructor accepts
* a reference to a SessionManager class.
*
* @param bev Pointer to a bufferevent internal control structure.
* @param session_manager Reference to the SessionManager.
*/
BrokerSession(struct bufferevent *bev, SessionManager &session_manager) : BaseSession(bev),
session_manager(session_manager) {
}
/**
* Handle authentication and authorization of a connecting client.
*
* This method is called from the connect packet handler. It currently always returns true.
*
* @return Authorization granted.
*/
bool authorize_connection(const ConnectPacket &);
/**
* Resume a persisted session.
*
* The MQTT 3.1.1 standard requires that the session state be restored for clients connecting with the same client
* id. This method is used to perform that action once a persisted session is recognized. This method accepts
* a reference to the BrokerSession to be restored and PacketManager instance to be installed in the restored
* session. Once installed a the PacketManager will send a Connack packet to the connecting client with the
* Session Present flag set.
*
* @param session Reference to the session to be resumed.
* @param packet_manager PacketManager to be installed in the resumed session.
*/
void resume_session(std::unique_ptr<BrokerSession> &session,
std::unique_ptr<PacketManager> packet_manager);
/**
* List of topics subscribed to by this client.
*/
std::vector<Subscription> subscriptions;
/**
* Forward a publshed message to the connected client.
*
* This method is called by the SessionManager when forwarding messages to subscribed clients. It will behave
* according to the QoS in the PublishPacket. QoS 0 packets will be forwarded and forgotten. In the case of QoS 1
* or 2 messages, these will be retained until they are acknowledged according to the publish control packet
* protocol flow described in the MQTT 3.1.1 standard.
*
* @param packet Reference to the PublishPacket to forward.
*/
void forward_packet(const PublishPacket &packet);
/**
* Send messages from the pending queues.
*
* Iterate through the pending message queues and if non-empty send a single pending message. This method should
* be called periodically. Currently it is called each time a packet is received from a client.
*/
void send_pending_message(void);
/**
* PacketManager callback.
*
* This method will delegate to the BaseSession method and then invoke send_pending_message. Ownership of the
* Packet is transfered back to the BaseSession instance.
*
* @param packet Reference counted pointer to a packet.
*/
void packet_received(std::unique_ptr<Packet> packet) override;
/**
* PacketManager callback.
*
* This method will delegate to the BaseSession method then potentially remove this session from the SessionManager
* based on the clean_session flag.
*
* @param event The type of event detected.
*/
void packet_manager_event(PacketManager::EventType event) override;
/**
* Handle a received ConnectPacket.
*
* This method will examine the ConnectPacket and restore or set up a new session. The authorize_connection method
* will be called to authenticate and authorize this connection. The client id will be used to lookup any
* previous session and if found the PacketManager instance installed in this session will be moved to the
* persisted session and this session instance will be destroyed.
*
* In case a persisted session is not found. A Connack packet will be sent with the session_present flag set to
* false.
*
* @param connect_packet A reference to the packet.
*/
void handle_connect(const ConnectPacket & connect_packet) override;
/**
* Handle a received PublishPacket.
*
* Delegate forwarding of this message to the SessionManager. Additional actions will be performed based on the
* QoS value in the PublishPacket. For QoS 1 a Puback packet will be sent. For QoS 2, queue and expected Pubrel
* packet id which will initiate the sending of a Pubrec packet at the next run of the pending packets queue.
*
* @param publish_packet A reference to the packet.
*/
void handle_publish(const PublishPacket & publish_packet) override;
/**
* Handle a received PubackPacket.
*
* This packet is expected in response to a Publish control packet with QoS 1. Publish packets with QoS 1 will be
* resent periodically until a Puback is received. QoS 1 Publish packets have an 'at least once' delivery
* guarantee. This handler will remove the Publish packet from the queue of pending messages ending the QoS 1
* protocol flow.
*
* @param puback_packet A reference to the packet.
*/
void handle_puback(const PubackPacket & puback_packet) override;
/**
* Handle a received PubrecPacket
*
* This packet is received in response to a Publish control packet with QoS 2. Publish packets with QoS 2 will be
* resent periodically until a PubRec is received. QoS 2 Publish packets have an 'exactly once' delivery
* guarantee. This handler will remove the Publish packet from the queue of pending messages and will continue
* the QoS 2 protocol flow by adding the packet id to the pending Pubcomp queue. This will enable the send of a
* Pubrel control packet at the next pending packet queue run.
*
* @param pubrec_packet A reference to the packet.
*/
void handle_pubrec(const PubrecPacket & pubrec_packet) override;
/**
* Handle a received PubrelPacket.
*
* This packet is received in response to a Pubrec packet in the QoS 2 protocol flow. This handler will remove
* the Pubrec packet id from the queue of pending Pubrel packets and send a Pubcomp packet ending the QoS 2
* protocol flow.
*
* @param pubrel_packet A reference to the packet.
*/
void handle_pubrel(const PubrelPacket & pubrel_packet) override;
/**
* Handle a received PubcompPacket.
*
* This packet is received in response to a Pubrel control packet in the QoS 2 protocol flow. This handler will
* remove the packet id from the pending Pubcomp queue. No further processing is done.
*
* @param pubcomp_packet A reference to the packet.
*/
void handle_pubcomp(const PubcompPacket & pubcomp_packet) override;
/**
* Handle a received SubscribePacket.
*
* Add the contained topic names to the list of subscriptions maintained in this session. Any previous matching
* subscribed topic will be replaced by the new one overriding the subscribed QoS. Send a Suback packet in
* response.
*
* @param subscribe_packet A reference to the packet.
*/
void handle_subscribe(const SubscribePacket & subscribe_packet) override;
/**
* Handle a received UnsubscribePacket.
*
* Remove the topic name from the list of subscribed topics.
*
* //TODO This function is currently incomplete.
*
* @param unsubscribe_packet A reference to the packet.
*/
void handle_unsubscribe(const UnsubscribePacket & unsubscribe_packet) override;
/**
* Handle a DisconnectPacket.
*
* Remove this session from the pool of persistent sessions and destroy it, depending on the clean_session
* attribute of this session.
*
* @param disconnect_packet A reference to the packet.
*/
void handle_disconnect(const DisconnectPacket & disconnect_packet) override;
/**
* List of QoS 1 messages waiting for Puback.
*
* These messages have been forwarded to subscribed clients. This list will be persisted between connections as
* part of the BrokerSession state.
*/
std::vector<PublishPacket> qos1_pending_puback;
/**
* List of QoS 2 messages waiting for Pubrec.
*
* These messages have been forwarded to subscribed clients. This list will be persisted between connections as
* part of the BrokerSession state.
*/
std::vector<PublishPacket> qos2_pending_pubrec;
/**
* List of QoS 2 messages waiting for Pubrel.
*
* These messages have been received in Publish control packets from a client connected to this session and
* potentially forwarded on by the SessionManager to other subscribed clients. Packet ids are added to this
* list when a Pubrec control packet has been sent. The list will be persisted between connections as part of the
* BrokerSession state.
*/
std::vector<uint16_t> qos2_pending_pubrel;
/**
* List of QoS 2 messages waiting for Pubcomp.
*
* These messages have been forwarded to subscribed clients. Packet ids are added to this list when a Pubrel
* control packet has been received. This list will be persisted between connections as part of the BrokerSession
* state.
*/
std::vector<uint16_t> qos2_pending_pubcomp;
/**
* A reference to the SessionManager instance.
*/
SessionManager &session_manager;
};

View File

@ -0,0 +1,26 @@
/**
* @file client_id.cc
*/
#include "client_id.h"
#include <cstdlib>
#include <ctime>
#include <mutex>
static const std::string characters = "abcdefghijklmnopqrstuvwxyz0123456789";
static std::once_flag init_rnd;
std::string generate_client_id(size_t len) {
std::call_once(init_rnd, [](){ std::srand(std::time(nullptr)); });
std::string random_string;
for (size_t i=0; i<len; i++) {
random_string += characters[std::rand() % characters.size()];
}
return random_string;
}

View File

@ -0,0 +1,23 @@
/**
* @file client_id.h
*
* Generate a unique client id.
*
* This function is used to generate a random client id for clients that don't provide one in their Connect control
* packet.
*/
#pragma once
#include <string>
#include <cstdint>
/**
* Generate a unique client id.
*
* The client id is a random character sequence drawn from the character set [a-z0-9].
*
* @param len Length of the sequence to generate
* @return Random character sequence
*/
std::string generate_client_id(size_t len=32);

View File

@ -0,0 +1,362 @@
/**
* @file client_pub.cc
*
* MQTT Publisher (client)
*
* Connect to a listening broker and publish a message on a topic. Topic and message are passed as command line
* arguments to this program.
* See [the MQTT specification](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html)
*/
#include "packet.h"
#include "base_session.h"
#include <event2/event.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/dns.h>
#include <evdns.h>
#include <getopt.h>
#include <iostream>
#include <vector>
#include <cstring>
/**
* Display usage message
*
* Displays help on command line arguments.
*/
static void usage(void);
/**
* Parse command line.
*
* Recognized command line arguments are parsed and added to the options instance. This options instance will be
* passed to the session instance.
*
* @param argc Command line argument count.
* @param argv Command line argument values.
*/
static void parse_arguments(int argc, char *argv[]);
/**
* On broker connection callback.
*
* @param bev Pointer to bufferevent internal control structure.
* @param event The bufferevent event code, on success this will be EV_EVENT_CONNECTED, it can also be one of
* EV_EVENT_EOF, EV_EVENT_ERROR, or EV_EVENT_TIMEOUT.
* @param arg Pointer to user data passed in when callback is set. Here this should be a pointer to the event_base
* object.
*/
static void connect_event_cb(struct bufferevent *bev, short event, void *arg);
/**
* Socket close handler.
*
* When the session is closing this function is set to be called when the write buffer is empty. When called it will
* close the connection and exit the event loop.
*
* @param bev Pointer to the bufferevent internal control structure.
* @param arg Pointer to user data passed in when callback is set. Here this should be a pointer to the event_base
* object.
*/
static void close_cb(struct bufferevent *bev, void *arg);
/**
* Options settable through command line arguments.
*/
static struct options_t {
/** Broker host to connect to. DNS name is allowed. */
std::string broker_host = "localhost";
/** Broker port. */
uint16_t broker_port = 1883;
/** Client id. If empty no client id will be sent. The broker will generate a client id automatically. */
std::string client_id;
/** Topic to publish to. */
std::string topic;
/** Message text to publish. */
std::vector<uint8_t> message;
/** Quality of service specifier for message. */
QoSType qos = QoSType::QoS0;
/**
* Drop this session on exit if true. If false this session will persist in the broker after disconnection
* and will be continued at the next connection with the same client id. This feature isn't useful when a client
* id is not provided.
*/
bool clean_session = false;
} options;
/**
* Session class specialized for this publishing client.
*
* Override base class control packet handlers used in message publishing. Any other control packets received will be
* handled by default methods, in most cases that will result in a thrown exception.
*/
class ClientSession : public BaseSession {
public:
/**
* Constructor
*
* @param bev Pointer to the buffer event structure for the broker connection.
* @param options Options structure.
*/
ClientSession(bufferevent *bev, const options_t &options) : BaseSession(bev), options(options) {}
/** Reference to the options structure with members updated from command line arguments. */
const options_t &options;
/**
* Store the packet id of the publish packet we send for comparisison with any puback or pubcomp control packets
* received from the broker.
*/
uint16_t published_packet_id = 0;
/**
* Handle a connack control packet from the broker.
*
* This packet will be sent in response to the connect packet that this client will send. Check the return code
* and disconnect on any error. Otherwise construct a publish packet from the options and send this to the broker.
*
* @param connack_packet The connack control packet received.
*/
void handle_connack(const ConnackPacket &connack_packet) override {
if (connack_packet.return_code != ConnackPacket::ReturnCode::Accepted) {
std::cerr << "connection not accepted by broker\n";
disconnect();
}
PublishPacket publish_packet;
publish_packet.qos(options.qos);
publish_packet.topic_name = options.topic;
publish_packet.packet_id = packet_manager->next_packet_id();
published_packet_id = publish_packet.packet_id;
publish_packet.message_data = std::vector<uint8_t>(options.message.begin(), options.message.end());
packet_manager->send_packet(publish_packet);
if (options.qos == QoSType::QoS0) {
disconnect();
}
}
/**
* Handle a puback control packet from the broker.
*
* This packet will be sent in response to a publish message with QoS 1. Compare the packet id with the packet
* id sent in a previous publish message. Notify in case of a mismatch.
*
* @param puback_packet The received puback control packet.
*/
void handle_puback(const PubackPacket &puback_packet) override {
if (puback_packet.packet_id != published_packet_id) {
std::cout << "puback packet id mismatch: sent " << published_packet_id << " received "
<< puback_packet.packet_id << "\n";
}
disconnect();
}
/**
* Handle a pubrec control packet from the broker.
*
* This packet will be sent in repsponse to a publish message with QoS 2. Compare the packet id with the packet
* id sent in a previous publish message. Notify in case of mismatch. Send a pubrel packet containing the received
* packet id in response. Wait for pubcomp control packet response.
*
* @param pubrec_packet The received pubrec control packet.
*/
void handle_pubrec(const PubrecPacket &pubrec_packet) override {
if (pubrec_packet.packet_id != published_packet_id) {
std::cout << "pubrec packet id mismatch: sent " << published_packet_id << " received "
<< pubrec_packet.packet_id << "\n";
}
PubrelPacket pubrel_packet;
pubrel_packet.packet_id = pubrec_packet.packet_id;
packet_manager->send_packet(pubrel_packet);
}
/**
* Handle a pubcomp control packet from the broker.
*
* This packet will be sent in response to a pubrel packet confirmation of a QoS 2 publish message. Compare the
* packet id sent in a previous publish message. Notify in case of mismatch. On receipt of this message the QoS 2
* confirmation exchange is complete. If the packet id matches the the packet id of the original message
* disconnect from the broker, otherwise stay connected waiting for the pubcomp with the required packet id.
*
* @param puback_packet the received pubcomp packet
*/
void handle_pubcomp(const PubcompPacket &pubcomp_packet) override {
if (pubcomp_packet.packet_id != published_packet_id) {
std::cout << "pubcomp packet id mismatch: sent " << published_packet_id << " received "
<< pubcomp_packet.packet_id << "\n";
} else {
disconnect();
}
}
/**
* Disconnect from the broker.
*
* Send a disconnect control packet. Set up libevent to wait for the disconnect packet to be written then close
* the network connection.
*/
void disconnect() {
DisconnectPacket disconnect_packet;
packet_manager->send_packet(disconnect_packet);
bufferevent_enable(packet_manager->bev, EV_WRITE);
bufferevent_setcb(packet_manager->bev, packet_manager->bev->readcb, close_cb, NULL,
packet_manager->bev->ev_base);
}
};
/**
* The session instance.
*
* MQTT requires that both the client and server maintain a session state.
*/
static std::unique_ptr<ClientSession> session;
int main(int argc, char *argv[]) {
struct event_base *evloop;
struct evdns_base *dns_base;
struct bufferevent *bev;
parse_arguments(argc, argv);
evloop = event_base_new();
if (!evloop) {
std::cerr << "Could not initialize libevent\n";
std::exit(1);
}
dns_base = evdns_base_new(evloop, 1);
bev = bufferevent_socket_new(evloop, -1, BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(bev, NULL, NULL, connect_event_cb, evloop);
bufferevent_socket_connect_hostname(bev, dns_base, AF_UNSPEC, options.broker_host.c_str(), options.broker_port);
evdns_base_free(dns_base, 0);
event_base_dispatch(evloop);
event_base_free(evloop);
}
static void connect_event_cb(struct bufferevent *bev, short events, void *arg) {
if (events & BEV_EVENT_CONNECTED) {
session = std::unique_ptr<ClientSession>(new ClientSession(bev, options));
ConnectPacket connect_packet;
connect_packet.client_id = options.client_id;
session->packet_manager->send_packet(connect_packet);
} else if (events & (BEV_EVENT_ERROR | BEV_EVENT_EOF)) {
std::cerr << "error connecting to broker\n";
struct event_base *base = static_cast<struct event_base *>(arg);
if (events & BEV_EVENT_ERROR) {
int err = bufferevent_socket_get_dns_error(bev);
if (err)
std::cerr << "DNS error: " << evutil_gai_strerror(err) << "\n";
}
bufferevent_free(bev);
event_base_loopexit(base, NULL);
}
}
static void usage() {
std::cout <<
R"END(usage: mqtt_client_pub [OPTIONS]
Connect to an MQTT broker and publish a single message to a single topic.
OPTIONS
--broker-host | -b Broker host name or ip address, default localhost
--broker-port | -p Broker port, default 1883
--client-id | -i Client id, default none
--topic | -t Topic string, default none
--message | -m Message data, default none
--qos | -q QoS (Quality of Service), should be 0, 1, or 2, default 0
--clean-session | -c Disable session persistence, default false
--help | -h Display this message and exit
)END";
}
static void parse_arguments(int argc, char *argv[]) {
static struct option longopts[] = {
{"broker-host", required_argument, NULL, 'b'},
{"broker-port", required_argument, NULL, 'p'},
{"client-id", required_argument, NULL, 'i'},
{"topic", required_argument, NULL, 't'},
{"message", required_argument, NULL, 'm'},
{"qos", required_argument, NULL, 'q'},
{"clean-session", no_argument, NULL, 'c'},
{"help", no_argument, NULL, 'h'}
};
int ch;
while ((ch = getopt_long(argc, argv, "b:p:i:t:m:q:ch", longopts, NULL)) != -1) {
switch (ch) {
case 'b':
options.broker_host = optarg;
break;
case 'p':
options.broker_port = static_cast<uint16_t>(atoi(optarg));
break;
case 'i':
options.client_id = optarg;
break;
case 't':
options.topic = optarg;
break;
case 'm':
options.message = std::vector<uint8_t>(optarg, optarg + strlen(optarg));
case 'q':
options.qos = static_cast<QoSType>(atoi(optarg));
break;
case 'c':
options.clean_session = true;
break;
case 'h':
usage();
std::exit(0);
default:
usage();
std::exit(1);
}
}
}
static void close_cb(struct bufferevent *bev, void *arg) {
session->packet_manager->close_connection();
event_base *base = static_cast<event_base *>(arg);
event_base_loopexit(base, NULL);;
}

View File

@ -0,0 +1,369 @@
/**
* @file client_sub.cc
*
* MQTT Subscriber (client)
*
* Connect to a listening broker and add a subscription a topic then listen for pubished messages from the broker.
* Topic strings can follow the MQTT 3.1.1 specification including wildcards.
* See [the MQTT specification](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html)
*/
#include "packet.h"
#include "packet_manager.h"
#include "base_session.h"
#include <event2/event.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/dns.h>
#include <evdns.h>
#include <getopt.h>
#include <iostream>
#include <vector>
#include <csignal>
/**
* Display usage message
*
* Displays help on command line arguments.
*/
static void usage(void);
/**
* Parse command line.
*
* Recognized command line arguments are parsed and added to the options instance. This options instance will be
* passed to the session instance.
*
* @param argc Command line argument count.
* @param argv Command line argument values.
*/
static void parse_arguments(int argc, char *argv[]);
/**
* On broker connection callback.
*
* @param bev Pointer to bufferevent internal control structure.
* @param event The bufferevent event code, on success this will be EV_EVENT_CONNECTED, it can also be one of
* EV_EVENT_EOF, EV_EVENT_ERROR, or EV_EVENT_TIMEOUT.
* @param arg Pointer to user data passed in when callback is set. Here this should be a pointer to the event_base
* object.
*/
static void connect_event_cb(struct bufferevent *bev, short event, void *arg);
/**
* Socket close handler.
*
* When the session is closing this function is set to be called when the write buffer is empty. When called it will
* close the connection and exit the event loop.
*
* @param bev Pointer to the bufferevent internal control structure.
* @param arg Pointer to user data passed in when callback is set. Here this should be a pointer to the event_base
* object.
*/
static void close_cb(struct bufferevent *bev, void *arg);
/**
* Callback run when SIGINT or SIGTERM is attached, will cleanly exit.
*
* @param signal Integer value of signal.
* @param event Should be EV_SIGNAL.
* @param arg Pointer originally passed to evsignal_new.
*/
static void signal_cb(evutil_socket_t, short event, void *);
/**
* Options settable through command line arguments.
*/
struct options_t {
/** Broker host to connect to. DNS name is allowed. */
std::string broker_host = "localhost";
/** Broker port. */
uint16_t broker_port = 1883;
/** Client id. If empty no client id will be sent. The broker will generate a client id automatically. */
std::string client_id;
/** Topics to subscribe to. This option can be specified more than once to subscribe to multiple topics. */
std::vector<std::string> topics;
/** Quality of service specifier for message. */
QoSType qos = QoSType::QoS0;
/**
* Drop this session on exit if true. If false this session will persist in the broker after disconnection
* and will be continued at the next connection with the same client id. This feature isn't useful when a client
* id is not provided.
*/
bool clean_session = false;
} options;
/**
* Session class specialized for this publishing client.
*
* Override base class control packet handlers used to subscribe to topics and to receive messages forwarded from the
* broker. Any other control packets received will be handled by default methods, in most cases that will result in a
* thrown exception.
*/
class ClientSession : public BaseSession {
public:
/**
* Constructor
*
* @param bev Pointer to the buffer event structure for the broker connection.
* @param options Options structure.
*/
ClientSession(bufferevent *bev, const options_t &options) : BaseSession(bev), options(options) {}
/** Reference to the options structure with members updated from command line arguments. */
const options_t &options;
/**
* Handle a connack control packet from the broker.
*
* This packet will be sent in response to the connect packet that this client will send. Check the return code
* and disconnect on any error. Otherwise construct a subscribe packet from the options and send it to the broker.
*
* @param connack_packet The connack control packet received.
*/
void handle_connack(const ConnackPacket &connack_packet) override {
SubscribePacket subscribe_packet;
subscribe_packet.packet_id = packet_manager->next_packet_id();
for (auto topic : options.topics) {
subscribe_packet.subscriptions.push_back(Subscription{topic, options.qos});
}
packet_manager->send_packet(subscribe_packet);
}
/**
* Handle a suback control packet from the broker.
*
* This packet will be sent in response to a subscribe message. Check the error code in the packet and if there is
* an error output an error message but continue to listen for messages. Also check the qos field in the suback
* packet and compare with the requested qos in the original subscribe. If there is a mismatch output a message.
*
* @param suback_packet The received suback control packet.
*/
void handle_suback(const SubackPacket &suback_packet) override {
for (size_t i = 0; i < suback_packet.return_codes.size(); i++) {
SubackPacket::ReturnCode code = suback_packet.return_codes[i];
if (code == SubackPacket::ReturnCode::Failure) {
std::cout << "Subscription to topic " << options.topics[i] << "failed\n";
} else if (static_cast<QoSType>(code) != options.qos) {
std::cout << "Topic " << options.topics[i] << " requested qos " << static_cast<int>(options.qos)
<< " subscribed "
<< static_cast<int>(code) << "\n";
}
}
}
/**
* Handle a publish control packet from the broker.
*
* This packet will contain forwarded messages from other clients published to the topic. Output the message and
* acknowledge based on the QoS in the packet. In the case of a QoS 0 message there is no acknowledegment.
* A Qos 1 message will require a Puback packet in response. A QoS 2 message requires a Pubrec packet from the
* subscriber followed by a Pubrel packet from the broker and finally a Pubcomp packet is sent from the subscriber.
*
* @param publish_packet the received publish packet
*/
void handle_publish(const PublishPacket &publish_packet) override {
std::cout << std::string(publish_packet.message_data.begin(), publish_packet.message_data.end()) << "\n";
if (publish_packet.qos() == QoSType::QoS1) {
PubackPacket puback_packet;
puback_packet.packet_id = publish_packet.packet_id;
packet_manager->send_packet(puback_packet);
} else if (publish_packet.qos() == QoSType::QoS2) {
PubrecPacket pubrec_packet;
pubrec_packet.packet_id = publish_packet.packet_id;
packet_manager->send_packet(pubrec_packet);
}
}
/**
* Handle a pubrel control packet from the broker.
*
* This packet will be sent in repsponse to a Pubrel packet in the QoS 2 protocol flow. Send a Pubcomp packet
* in response. This completes the QoS 2 flow.
*
* @param pubrel_packet The received pubrel control packet.
*/
void handle_pubrel(const PubrelPacket &pubrel_packet) override {
PubcompPacket pubcomp_packet;
pubcomp_packet.packet_id = pubrel_packet.packet_id;
packet_manager->send_packet(pubcomp_packet);
}
/**
* Handle protocol events in the packet manager.
*
* This callback will be called by the packet manager in case of protocol or network error. In most cases the
* response is to close the connection to the broker.
*
* @param event The specific type of the event reported by the packet manager.
*/
void packet_manager_event(PacketManager::EventType event) override {
event_base_loopexit(packet_manager->bev->ev_base, NULL);
BaseSession::packet_manager_event(event);
}
};
/**
* The session instance.
*
* MQTT requires that both the client and server maintain a session state.
*/
std::unique_ptr<ClientSession> session;
int main(int argc, char *argv[]) {
struct event_base *evloop;
struct event *signal_event;
struct evdns_base *dns_base;
struct bufferevent *bev;
parse_arguments(argc, argv);
evloop = event_base_new();
if (!evloop) {
std::cerr << "Could not initialize libevent\n";
std::exit(1);
}
signal_event = evsignal_new(evloop, SIGINT, signal_cb, evloop);
evsignal_add(signal_event, NULL);
signal_event = evsignal_new(evloop, SIGTERM, signal_cb, evloop);
evsignal_add(signal_event, NULL);
dns_base = evdns_base_new(evloop, 1);
bev = bufferevent_socket_new(evloop, -1, BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(bev, NULL, NULL, connect_event_cb, evloop);
bufferevent_socket_connect_hostname(bev, dns_base, AF_UNSPEC, options.broker_host.c_str(), options.broker_port);
evdns_base_free(dns_base, 0);
event_base_dispatch(evloop);
event_free(signal_event);
event_base_free(evloop);
}
static void connect_event_cb(struct bufferevent *bev, short events, void *arg) {
if (events & BEV_EVENT_CONNECTED) {
session = std::unique_ptr<ClientSession>(new ClientSession(bev, options));
ConnectPacket connect_packet;
connect_packet.client_id = options.client_id;
connect_packet.clean_session(options.clean_session);
session->packet_manager->send_packet(connect_packet);
} else if (events & (BEV_EVENT_ERROR | BEV_EVENT_EOF)) {
struct event_base *base = static_cast<struct event_base *>(arg);
if (events & BEV_EVENT_ERROR) {
int err = bufferevent_socket_get_dns_error(bev);
if (err)
std::cerr << "DNS error: " << evutil_gai_strerror(err) << "\n";
}
bufferevent_free(bev);
event_base_loopexit(base, NULL);
}
}
void usage() {
std::cout <<
R"END(usage: mqtt_client_sub [OPTIONS]
Connect to an MQTT broker and publish a single message to a single topic.
OPTIONS
--broker-host | -b Broker host name or ip address, default localhost
--broker-port | -p Broker port, default 1883
--client-id | -i Client id, default none
--topic | -t Topic string to subscribe to, this option can be provided more that once to subscribe
to multiple topics, default none
--qos | -q QoS (Quality of Service), should be 0, 1, or 2, default 0
--clean-session | -c Disable session persistence, default false
--help | -h Display this message and exit
)END";
}
void parse_arguments(int argc, char *argv[]) {
static struct option longopts[] = {
{"broker-host", required_argument, NULL, 'b'},
{"broker-port", required_argument, NULL, 'p'},
{"client-id", required_argument, NULL, 'i'},
{"topic", required_argument, NULL, 't'},
{"qos", required_argument, NULL, 'q'},
{"clean-session", no_argument, NULL, 'c'},
{"help", no_argument, NULL, 'h'}
};
int ch;
while ((ch = getopt_long(argc, argv, "b:p:i:t:q:ch", longopts, NULL)) != -1) {
switch (ch) {
case 'b':
options.broker_host = optarg;
break;
case 'p':
options.broker_port = static_cast<uint16_t>(atoi(optarg));
break;
case 'i':
options.client_id = optarg;
break;
case 't':
options.topics.push_back(optarg);
break;
case 'q':
options.qos = static_cast<QoSType>(atoi(optarg));
break;
case 'c':
options.clean_session = true;
break;
case 'h':
usage();
std::exit(0);
default:
usage();
std::exit(1);
}
}
}
static void close_cb(struct bufferevent *bev, void *arg) {
event_base *base = static_cast<event_base *>(arg);
event_base_loopexit(base, NULL);
}
static void signal_cb(evutil_socket_t fd, short event, void *arg) {
DisconnectPacket disconnect_packet;
session->packet_manager->send_packet(disconnect_packet);
bufferevent_disable(session->packet_manager->bev, EV_READ);
bufferevent_setcb(session->packet_manager->bev, NULL, close_cb, NULL, arg);
}

View File

@ -0,0 +1,509 @@
/**
* @file packet.cc
*/
#include "packet.h"
#include "client_id.h"
#include <cassert>
void Packet::read_fixed_header(PacketDataReader &reader) {
uint8_t command_header = reader.read_byte();
type = static_cast<PacketType>(command_header >> 4);
header_flags = command_header & 0x0F;
size_t remaining_length = reader.read_remaining_length();
if (remaining_length != reader.get_packet_data().size() - reader.get_offset()) {
throw std::exception();
}
}
ConnectPacket::ConnectPacket(const packet_data_t &packet_data) {
PacketDataReader reader(packet_data);
read_fixed_header(reader);
if (type != PacketType::Connect) {
throw std::exception();
}
if (header_flags != 0) {
throw std::exception();
}
protocol_name = reader.read_string();
protocol_level = reader.read_byte();
connect_flags = reader.read_byte();
keep_alive = reader.read_uint16();
client_id = reader.read_string();
if (client_id.empty()) {
client_id = generate_client_id();
}
if (will_flag()) {
will_topic = reader.read_string();
will_message = reader.read_bytes();
}
if (username_flag()) {
username = reader.read_string();
}
if (password_flag()) {
password = reader.read_bytes();
}
}
packet_data_t ConnectPacket::serialize() const {
packet_data_t packet_data;
PacketDataWriter writer(packet_data);
writer.write_byte((static_cast<uint8_t>(type) << 4) | (header_flags & 0x0F));
size_t remaining_length = 2 + protocol_name.size();
remaining_length += 1; // protocol_level
remaining_length += 1; // connect_flags
remaining_length += 2; // keep_alive
remaining_length += 2 + client_id.size();
if (will_flag()) {
remaining_length += 2 + will_topic.size();
remaining_length += 2 + will_message.size();
}
if (username_flag()) {
remaining_length += 2 + username.size();
}
if (password_flag()) {
remaining_length += 2 + password.size();
}
writer.write_remaining_length(remaining_length);
writer.write_string(protocol_name);
writer.write_byte(protocol_level);
writer.write_byte(connect_flags);
writer.write_uint16(keep_alive);
writer.write_string(client_id);
if (will_flag()) {
writer.write_string(will_topic);
writer.write_bytes(will_message);
}
if (username_flag()) {
writer.write_string(username);
}
if (password_flag()) {
writer.write_bytes(password);
}
return packet_data;
}
ConnackPacket::ConnackPacket(const packet_data_t &packet_data) {
PacketDataReader reader(packet_data);
read_fixed_header(reader);
if (type != PacketType::Connack) {
throw std::exception();
}
if (header_flags != 0) {
throw std::exception();
}
acknowledge_flags = reader.read_byte();
return_code = static_cast<ReturnCode>(reader.read_byte());
}
packet_data_t ConnackPacket::serialize() const {
packet_data_t packet_data;
PacketDataWriter writer(packet_data);
writer.write_byte((static_cast<uint8_t>(type) << 4) | (header_flags & 0x0F));
writer.write_remaining_length(2);
writer.write_byte(acknowledge_flags);
writer.write_byte(static_cast<uint8_t>(return_code));
return packet_data;
}
PublishPacket::PublishPacket(const packet_data_t &packet_data) {
PacketDataReader reader(packet_data);
read_fixed_header(reader);
if (type != PacketType::Publish) {
throw std::exception();
}
topic_name = reader.read_string();
if (qos() != QoSType::QoS0) {
packet_id = reader.read_uint16();
}
size_t payload_len = packet_data.size() - reader.get_offset();
message_data = reader.read_bytes(payload_len);
}
packet_data_t PublishPacket::serialize() const {
packet_data_t packet_data;
PacketDataWriter writer(packet_data);
writer.write_byte((static_cast<uint8_t>(type) << 4) | (header_flags & 0x0F));
uint16_t remaining_length = 2 + topic_name.size() + message_data.size();
if (qos() != QoSType::QoS0) {
remaining_length += 2;
}
writer.write_remaining_length(remaining_length);
writer.write_string(topic_name);
if (qos() != QoSType::QoS0) {
writer.write_uint16(packet_id);
}
for (size_t i = 0; i < message_data.size(); i++) {
writer.write_byte(message_data[i]);
}
return packet_data;
}
PubackPacket::PubackPacket(const packet_data_t &packet_data) {
PacketDataReader reader(packet_data);
read_fixed_header(reader);
if (type != PacketType::Puback) {
throw std::exception();
}
if (header_flags != 0) {
throw std::exception();
}
packet_id = reader.read_uint16();
}
packet_data_t PubackPacket::serialize() const {
packet_data_t packet_data;
PacketDataWriter writer(packet_data);
writer.write_byte((static_cast<uint8_t>(type) << 4) | (header_flags & 0x0F));
writer.write_remaining_length(2);
writer.write_uint16(packet_id);
return packet_data;
}
PubrecPacket::PubrecPacket(const packet_data_t &packet_data) {
PacketDataReader reader(packet_data);
read_fixed_header(reader);
if (type != PacketType::Pubrec) {
throw std::exception();
}
if (header_flags != 0) {
throw std::exception();
}
packet_id = reader.read_uint16();
}
packet_data_t PubrecPacket::serialize() const {
packet_data_t packet_data;
PacketDataWriter writer(packet_data);
writer.write_byte((static_cast<uint8_t>(type) << 4) | (header_flags & 0x0F));
writer.write_remaining_length(2);
writer.write_uint16(packet_id);
return packet_data;
}
PubrelPacket::PubrelPacket(const packet_data_t &packet_data) {
PacketDataReader reader(packet_data);
read_fixed_header(reader);
if (type != PacketType::Pubrel) {
throw std::exception();
}
if (header_flags != 0x02) {
throw std::exception();
}
packet_id = reader.read_uint16();
}
packet_data_t PubrelPacket::serialize() const {
packet_data_t packet_data;
PacketDataWriter writer(packet_data);
writer.write_byte((static_cast<uint8_t>(type) << 4) | (header_flags & 0x0F));
writer.write_remaining_length(2);
writer.write_uint16(packet_id);
return packet_data;
}
PubcompPacket::PubcompPacket(const packet_data_t &packet_data) {
PacketDataReader reader(packet_data);
read_fixed_header(reader);
if (type != PacketType::Pubcomp) {
throw std::exception();
}
if (header_flags != 0) {
throw std::exception();
}
packet_id = reader.read_uint16();
}
packet_data_t PubcompPacket::serialize() const {
packet_data_t packet_data;
PacketDataWriter writer(packet_data);
writer.write_byte((static_cast<uint8_t>(type) << 4) | (header_flags & 0x0F));
writer.write_remaining_length(2);
writer.write_uint16(packet_id);
return packet_data;
}
SubscribePacket::SubscribePacket(const packet_data_t &packet_data) {
PacketDataReader reader(packet_data);
read_fixed_header(reader);
if (type != PacketType::Subscribe) {
throw std::exception();
}
if (header_flags != 0x02) {
throw std::exception();
}
packet_id = reader.read_uint16();
do {
std::string topic = reader.read_string();
QoSType qos = static_cast<QoSType>(reader.read_byte());
// TODO use emplace_back
subscriptions.push_back(Subscription{topic, qos});
} while (!reader.empty());
}
packet_data_t SubscribePacket::serialize() const {
packet_data_t packet_data;
PacketDataWriter writer(packet_data);
writer.write_byte((static_cast<uint8_t>(type) << 4) | (header_flags & 0x0F));
size_t remaining_length = 2;
for (auto subscription : subscriptions) {
remaining_length += 1 + 2 + std::string(subscription.topic_filter).size();
}
writer.write_remaining_length(remaining_length);
writer.write_uint16(packet_id);
for (auto subscription : subscriptions) {
writer.write_string(std::string(subscription.topic_filter));
writer.write_byte(static_cast<uint8_t>(subscription.qos));
}
return packet_data;
}
SubackPacket::SubackPacket(const packet_data_t &packet_data) {
PacketDataReader reader(packet_data);
read_fixed_header(reader);
if (type != PacketType::Suback) {
throw std::exception();
}
if (header_flags != 0) {
throw std::exception();
}
packet_id = reader.read_uint16();
do {
ReturnCode return_code = static_cast<ReturnCode>(reader.read_byte());
return_codes.push_back(return_code);
} while (!reader.empty());
}
packet_data_t SubackPacket::serialize() const {
packet_data_t packet_data;
PacketDataWriter writer(packet_data);
writer.write_byte((static_cast<uint8_t>(type) << 4) | (header_flags & 0x0F));
writer.write_remaining_length(2 + return_codes.size());
writer.write_uint16(packet_id);
for (auto return_code : return_codes) {
writer.write_byte(static_cast<uint8_t>(return_code));
}
return packet_data;
}
UnsubscribePacket::UnsubscribePacket(const packet_data_t &packet_data) {
PacketDataReader reader(packet_data);
read_fixed_header(reader);
if (type != PacketType::Unsubscribe) {
throw std::exception();
}
if (header_flags != 0x02) {
throw std::exception();
}
packet_id = reader.read_uint16();
do {
topics.push_back(reader.read_string());
} while (!reader.empty());
}
packet_data_t UnsubscribePacket::serialize() const {
packet_data_t packet_data;
PacketDataWriter writer(packet_data);
writer.write_byte((static_cast<uint8_t>(type) << 4) | (header_flags & 0x0F));
size_t topics_size = 0;
for (auto topic : topics) {
// string data + 2 byte length
topics_size += topic.size() + 2;
}
writer.write_remaining_length(2 + topics_size);
writer.write_uint16(packet_id);
for (auto topic : topics) {
writer.write_string(topic);
}
return packet_data;
}
UnsubackPacket::UnsubackPacket(const packet_data_t &packet_data) {
PacketDataReader reader(packet_data);
read_fixed_header(reader);
if (type != PacketType::Unsuback) {
throw std::exception();
}
if (header_flags != 0) {
throw std::exception();
}
packet_id = reader.read_uint16();
}
packet_data_t UnsubackPacket::serialize() const {
packet_data_t packet_data;
PacketDataWriter writer(packet_data);
writer.write_byte((static_cast<uint8_t>(type) << 4) | (header_flags & 0x0F));
writer.write_remaining_length(2);
writer.write_uint16(packet_id);
return packet_data;
}
PingreqPacket::PingreqPacket(const packet_data_t &packet_data) {
PacketDataReader reader(packet_data);
read_fixed_header(reader);
if (type != PacketType::Pingreq) {
throw std::exception();
}
if (header_flags != 0) {
throw std::exception();
}
}
packet_data_t PingreqPacket::serialize() const {
packet_data_t packet_data;
PacketDataWriter writer(packet_data);
writer.write_byte((static_cast<uint8_t>(type) << 4) | (header_flags & 0x0F));
writer.write_remaining_length(0);
return packet_data;
}
PingrespPacket::PingrespPacket(const packet_data_t &packet_data) {
PacketDataReader reader(packet_data);
read_fixed_header(reader);
if (type != PacketType::Pingresp) {
throw std::exception();
}
if (header_flags != 0) {
throw std::exception();
}
}
packet_data_t PingrespPacket::serialize() const {
packet_data_t packet_data;
PacketDataWriter writer(packet_data);
writer.write_byte((static_cast<uint8_t>(type) << 4) | (header_flags & 0x0F));
writer.write_remaining_length(0);
return packet_data;
}
DisconnectPacket::DisconnectPacket(const packet_data_t &packet_data) {
PacketDataReader reader(packet_data);
read_fixed_header(reader);
if (type != PacketType::Disconnect) {
throw std::exception();
}
if (header_flags != 0) {
throw std::exception();
}
}
packet_data_t DisconnectPacket::serialize() const {
packet_data_t packet_data;
PacketDataWriter writer(packet_data);
writer.write_byte((static_cast<uint8_t>(type) << 4) | (header_flags & 0x0F));
writer.write_remaining_length(0);
return packet_data;
}

View File

@ -0,0 +1,503 @@
/**
* @file packet.h
*
* Standard control packet classes.
*
* The MQTT 3.1.1 standard specifies the wire-level structure and operational behavior protocol control packets. This
* structure and some low level behavior is implemented here.
*
* Serialization of a control packet instance to wire format is accomplisted through instance serialization methods.
*
* Deserialization from the wire level is handled by a control packet constructor that accepts a octect sequence.
*
* Control packet instances also provide a default constructor that will create an instance using default values.
*/
#pragma once
#include "packet_data.h"
#include "topic.h"
#include <event2/event.h>
#include <event2/bufferevent.h>
#include <iostream>
#include <string>
#include <cstdint>
#include <vector>
#include <functional>
#include <memory>
#include <cassert>
/**
* Enumeration constants for packet types.
*
* The interger values correspond to control packet type values as defined in the MQTT 3.1.1 standard.
*/
enum class PacketType {
Connect = 1,
Connack = 2,
Publish = 3,
Puback = 4,
Pubrec = 5,
Pubrel = 6,
Pubcomp = 7,
Subscribe = 8,
Suback = 9,
Unsubscribe = 10,
Unsuback = 11,
Pingreq = 12,
Pingresp = 13,
Disconnect = 14,
};
/**
* Enumeration constants for QoS values.
*/
enum class QoSType : uint8_t {
QoS0 = 0,
QoS1 = 1,
QoS2 = 2,
};
/**
* Subscription Class
*
* A subscription is composed of a TopicFilter and a QoS. Matching rules for topic filters differ from topic
* names.
*/
class Subscription {
public:
TopicFilter topic_filter;
QoSType qos;
};
/**
* Abstract base control packet class.
*
* Packet classes inherit this and extend as necessary. The serialize method implementation is required.
*/
class Packet {
public:
PacketType type;
uint8_t header_flags;
virtual ~Packet() {}
void read_fixed_header(PacketDataReader &);
// static Packet unserialize(const packet_data_t &);
virtual packet_data_t serialize(void) const = 0;
};
/**
* Connect control packet class
*/
class ConnectPacket : public Packet {
public:
ConnectPacket() {
type = PacketType::Connect;
header_flags = 0;
protocol_name = "MQIsdp";
protocol_level = 4;
}
ConnectPacket(const packet_data_t &packet_data);
packet_data_t serialize() const;
std::string protocol_name;
uint8_t protocol_level;
uint8_t connect_flags;
uint16_t keep_alive;
std::string client_id;
std::string will_topic;
std::vector<uint8_t> will_message;
std::string username;
std::vector<uint8_t> password;
bool clean_session() const {
return connect_flags & 0x02;
}
void clean_session(bool set) {
if (set) {
connect_flags |= 0x02;
} else {
connect_flags &= ~0x02;
}
}
bool will_flag() const {
return connect_flags & 0x04;
}
void will_flag(bool set) {
if (set) {
connect_flags |= 0x04;
} else {
connect_flags &= ~0x04;
}
}
QoSType qos() const {
return static_cast<QoSType >((connect_flags >> 3) & 0x03);
}
void qos(QoSType qos) {
connect_flags |= (static_cast<uint8_t>(qos) << 3);
}
bool will_retain() const {
return connect_flags & 0x20;
}
void will_retain(bool set) {
if (set) {
connect_flags |= 0x20;
} else {
connect_flags &= ~0x20;
}
}
bool password_flag() const {
return connect_flags & 0x40;
}
void password_flag(bool set) {
if (set) {
connect_flags |= 0x40;
} else {
connect_flags &= ~0x40;
}
}
bool username_flag() const {
return connect_flags & 0x80;
}
void username_flag(bool set) {
if (set) {
connect_flags |= 0x80;
} else {
connect_flags &= ~0x80;
}
}
};
/**
* Connack control packet class.
*/
class ConnackPacket : public Packet {
public:
ConnackPacket() {
type = PacketType::Connack;
header_flags = 0;
acknowledge_flags = 0;
}
ConnackPacket(const packet_data_t &packet_data);
enum class ReturnCode : uint8_t {
Accepted = 0x00,
UnacceptableProtocolVersion = 0x01,
IdentifierRejected = 0x02,
ServerUnavailable = 0x03,
BadUsernameOrPassword = 0x04,
NotAuthorized = 0x05
};
packet_data_t serialize() const;
uint8_t acknowledge_flags;
ReturnCode return_code;
bool session_present() const {
return acknowledge_flags & 0x01;
}
void session_present(bool set) {
if (set) {
acknowledge_flags |= 0x01;
} else {
acknowledge_flags &= ~0x01;
}
}
};
/**
* Publish control packet class.
*/
class PublishPacket : public Packet {
public:
PublishPacket() {
type = PacketType::Publish;
header_flags = 0;
}
PublishPacket(const packet_data_t &packet_data);
packet_data_t serialize() const;
std::string topic_name;
std::vector<uint8_t> message_data;
uint16_t packet_id;
bool dup() const {
return header_flags & 0x08;
}
void dup(bool set) {
if (set) {
header_flags |= 0x08;
} else {
header_flags &= ~0x08;
}
}
QoSType qos() const {
return static_cast<QoSType >((header_flags >> 1) & 0x03);
}
void qos(QoSType qos) {
header_flags |= static_cast<uint8_t>(qos) << 1;
}
bool retain() const {
return header_flags & 0x01;
}
void retain(bool set) {
if (set) {
header_flags |= 0x01;
} else {
header_flags &= ~0x01;
}
}
};
/**
* Puback control packet class.
*/
class PubackPacket : public Packet {
public:
PubackPacket() {
type = PacketType::Puback;
header_flags = 0;
}
PubackPacket(const packet_data_t &packet_data);
packet_data_t serialize() const;
uint16_t packet_id;
};
/**
* Pubrec control packet class.
*/
class PubrecPacket : public Packet {
public:
PubrecPacket() {
type = PacketType::Pubrec;
header_flags = 0;
}
PubrecPacket(const packet_data_t &packet_data);
packet_data_t serialize() const;
uint16_t packet_id;
};
/**
* Pubrel control packet class.
*/
class PubrelPacket : public Packet {
public:
PubrelPacket() {
type = PacketType::Pubrel;
header_flags = 0x02;
}
PubrelPacket(const packet_data_t &packet_data);
packet_data_t serialize() const;
uint16_t packet_id;
};
/**
* Pubcomp control packet class.
*/
class PubcompPacket : public Packet {
public:
PubcompPacket() {
type = PacketType::Pubcomp;
header_flags = 0;
}
PubcompPacket(const packet_data_t &packet_data);
packet_data_t serialize() const;
uint16_t packet_id;
};
/**
* Subscribe control packet class.
*/
class SubscribePacket : public Packet {
public:
SubscribePacket() {
type = PacketType::Subscribe;
header_flags = 0x02;
}
SubscribePacket(const packet_data_t &packet_data);
packet_data_t serialize() const;
uint16_t packet_id;
std::vector<Subscription> subscriptions;
};
/**
* Suback control packet class.
*/
class SubackPacket : public Packet {
public:
SubackPacket() {
type = PacketType::Suback;
header_flags = 0;
}
SubackPacket(const packet_data_t &packet_data);
packet_data_t serialize() const;
enum class ReturnCode : uint8_t {
SuccessQoS0 = 0x00,
SuccessQoS1 = 0x01,
SuccessQoS2 = 0x02,
Failure = 0x80
};
uint16_t packet_id;
std::vector<ReturnCode> return_codes;
};
/**
* Unsubscribe control packet class.
*/
class UnsubscribePacket : public Packet {
public:
UnsubscribePacket() {
type = PacketType::Unsubscribe;
header_flags = 0x02;
}
UnsubscribePacket(const packet_data_t &packet_data);
packet_data_t serialize() const;
uint16_t packet_id;
std::vector<std::string> topics;
};
/**
* Unsuback control packet class.
*/
class UnsubackPacket : public Packet {
public:
UnsubackPacket() {
type = PacketType::Unsuback;
header_flags = 0;
}
UnsubackPacket(const packet_data_t &packet_data);
packet_data_t serialize() const;
uint16_t packet_id;
};
/**
* Pingreq control packet class.
*/
class PingreqPacket : public Packet {
public:
PingreqPacket() {
type = PacketType::Pingreq;
header_flags = 0;
}
PingreqPacket(const packet_data_t &packet_data);
packet_data_t serialize() const;
};
/**
* Pingresp control packet class.
*/
class PingrespPacket : public Packet {
public:
PingrespPacket() {
type = PacketType::Pingresp;
header_flags = 0;
}
PingrespPacket(const packet_data_t &packet_data);
packet_data_t serialize() const;
};
/**
* Disconnect control packet class.
*/
class DisconnectPacket : public Packet {
public:
DisconnectPacket() {
type = PacketType::Disconnect;
header_flags = 0;
}
DisconnectPacket(const packet_data_t &packet_data);
packet_data_t serialize() const;
};

View File

@ -0,0 +1,130 @@
/**
* @file packet_data.cc
*/
#include "packet_data.h"
#include <iostream>
void PacketDataWriter::write_remaining_length(size_t length) {
if (length > 127 + 128*127 + 128*128*127 + 128*128*128*127) {
throw std::exception();
}
do {
uint8_t encoded_byte = length % 0x80;
length >>= 7;
if (length > 0) {
encoded_byte |= 0x80;
}
packet_data.push_back(encoded_byte);
} while (length > 0);
}
void PacketDataWriter::write_byte(uint8_t byte) {
packet_data.push_back(byte);
}
void PacketDataWriter::write_uint16(uint16_t word) {
packet_data.push_back((word >> 8) & 0xFF);
packet_data.push_back(word & 0xFF);
}
void PacketDataWriter::write_string(const std::string &s) {
write_uint16(s.size());
std::copy(s.begin(), s.end(), std::back_inserter(packet_data));
}
void PacketDataWriter::write_bytes(const packet_data_t & b) {
write_uint16(b.size());
std::copy(b.begin(), b.end(), std::back_inserter(packet_data));
}
bool PacketDataReader::has_remaining_length() {
size_t remaining = std::min<size_t>(packet_data.size() - offset, 4);
for (size_t i=offset; i<offset+remaining; i++) {
if ((packet_data[i] & 0x80) == 0) {
return true;
}
}
return false;
}
size_t PacketDataReader::read_remaining_length() {
size_t starting_offset = offset;
size_t value = 0;
int multiplier = 1;
do {
uint8_t encoded_byte = packet_data[offset++];
value += (encoded_byte & 0x7F) * multiplier;
if ((encoded_byte & 0x80) == 0) {
break;
}
multiplier <<= 7;
if (offset - starting_offset == 4) {
throw std::exception();
}
} while (1);
return value;
}
uint8_t PacketDataReader::read_byte() {
if (offset > packet_data.size()) {
throw std::exception();
}
return packet_data[offset++];
}
uint16_t PacketDataReader::read_uint16() {
if (offset + 2 > packet_data.size()) {
throw std::exception();
}
uint8_t msb = packet_data[offset++];
uint8_t lsb = packet_data[offset++];
return (msb << 8) + lsb;
}
std::string PacketDataReader::read_string() {
uint16_t len = read_uint16();
if (offset + len > packet_data.size()) {
throw std::exception();
}
std::string s(&packet_data[offset], &packet_data[offset + len]);
offset += len;
return s;
}
std::vector<uint8_t> PacketDataReader::read_bytes() {
uint16_t len = read_uint16();
if (offset + len > packet_data.size()) {
throw std::exception();
}
std::vector<uint8_t> v(&packet_data[offset], &packet_data[offset + len]);
offset += len;
return v;
}
std::vector<uint8_t> PacketDataReader::read_bytes(size_t len) {
if (offset + len > packet_data.size()) {
throw std::exception();
}
std::vector<uint8_t> v(&packet_data[offset], &packet_data[offset + len]);
offset += len;
return v;
}
bool PacketDataReader::empty() {
return offset == packet_data.size();
}

View File

@ -0,0 +1,171 @@
/**
* @file packet_data.h
*
* Utility classes supporting serialization/deserialization of MQTT control packets.
*/
#pragma once
#include <string>
#include <cstdint>
#include <cstddef>
#include <vector>
/** Typedef for packet data container. */
typedef std::vector<uint8_t> packet_data_t;
/**
* Serialization class.
*
* Methods are provided to write native types to the MQTT 3.1.1 standard wire format.
*/
class PacketDataWriter
{
public:
/**
* Constructor
*
* Accepts a reference to a packet_data_t container. This container will be filled with serialized data and
* can be sent directly through the network connection.
*
* @param packet_data A reference to a packet_data_t container.
*/
PacketDataWriter(packet_data_t & packet_data) : packet_data(packet_data) {
packet_data.resize(0);
}
/**
* Write the integer length to the container using the MQTT 3.1.1 remaining length encoding scheme.
*
* @param length The value to encode.
*/
void write_remaining_length(size_t length);
/** Write a byte to the packet_data_t container. */
void write_byte(uint8_t byte);
/** Write a 16 bit value to the packet_data_t container. */
void write_uint16(uint16_t word);
/** Write a UTF-8 character string to the packet_data_t container. */
void write_string(const std::string & s);
/** Copy the contents of a packet_data_t container into this instance's container. */
void write_bytes(const packet_data_t & b);
private:
/** A reference to the packet_data_t container. */
packet_data_t & packet_data;
};
/**
* Deserialization class.
*
* Methods are provided to read native types from the wire encoded control packets received over the network
* connection. MQTT 3.1.1 standard decoding methods are implemented.
*/
class PacketDataReader
{
public:
/**
* Constructor
*
* Accepts a reference to a packet_data_t container that contains data received directly over a network connection.
* The class also contains a current offset pointer that is initialized to point to the beginning of the container.
* Each data read will advance the offset pointer forward to the next item in the container.
*
* @param packet_data A reference to a packet_data_t container.
*/
PacketDataReader(const packet_data_t & packet_data) : offset(0), packet_data(packet_data) {}
/**
* Is a remaing lenght value present.
*
* Primitive function indicating a valid remaining_length value can be read from the current position in the
* packet_data_t container. The remaining length value is encoded in a variable sequence from 1 to 4 bytes.
*
* @return valid remaining length.
*/
bool has_remaining_length();
/**
* Read the remaining length value from the packet_data_t container.
*
* @return integer.
*/
size_t read_remaining_length();
/**
* Read a single byte from the packet_data_t container.
*
* @return byte.
*/
uint8_t read_byte();
/**
* Read a 16 bit value from the packet_data_t container.
*
* @return 16 bit integer.
*/
uint16_t read_uint16();
/**
* Read a UTF-8 encoded string from the packet_data_t container.
*
* @return character string.
*/
std::string read_string();
/**
* Read a byte sequence from the packet_data_t container.
*
* The sequence is assumed to be encoded according to the MQTT 3.1.1 standard wire format for a byte sequence.
* The sequence length is encoded along with the data.
*
* @return byte seqence.
*/
std::vector<uint8_t> read_bytes();
/**
* Read a byte sequence from the packet_data_t contianer.
*
* The number of bytes read is passed as an argument to the method.
*
* @param len Lenght of the sequence to read.
* @return Byte sequence
*/
std::vector<uint8_t> read_bytes(size_t len);
/**
* Is the packet_data_t container empty.
*
* @return empty.
*/
bool empty();
/**
* Return the current packet_data_t container offset pointer.
*
* @return integer.
*/
size_t get_offset() { return offset; }
/**
* Get a reference to the packet_data_t container.
*
* @return Reference to packet_data_t container.
*/
const packet_data_t & get_packet_data() { return packet_data; }
private:
/** Current packet_data_t container offset pointer */
size_t offset;
/** Packet data container. */
const packet_data_t & packet_data;
};

View File

@ -0,0 +1,161 @@
/**
* @file packet_manager.cc
*/
#include "packet_manager.h"
#include "packet.h"
#include <event2/buffer.h>
#include <evdns.h>
#include <cstring>
void PacketManager::receive_packet_data() {
struct evbuffer *input = bufferevent_get_input(bev);
while (evbuffer_get_length(input) != 0) {
size_t available = evbuffer_get_length(input);
if (available < 2) {
return;
}
if (fixed_header_length == 0) {
size_t peek_size = std::min<uint8_t>(static_cast<uint8_t>(available), 5);
std::vector<uint8_t> peek_buffer(peek_size);
evbuffer_copyout(input, &peek_buffer[0], peek_size);
PacketDataReader reader(peek_buffer);
reader.read_byte();
if (!reader.has_remaining_length()) {
if (peek_size == 5) {
if (event_handler) {
event_handler(EventType::ProtocolError);
}
evbuffer_drain(input, peek_size);
}
return;
}
remaining_length = reader.read_remaining_length();
fixed_header_length = reader.get_offset();
}
size_t packet_size = fixed_header_length + remaining_length;
if (available < packet_size) {
return;
}
packet_data_t packet_data(packet_size);
evbuffer_remove(input, &packet_data[0], packet_size);
fixed_header_length = 0;
remaining_length = 0;
std::unique_ptr<Packet> packet = parse_packet_data(packet_data);
if (packet && packet_received_handler) {
packet_received_handler(std::move(packet));
}
}
}
std::unique_ptr<Packet> PacketManager::parse_packet_data(const std::vector<uint8_t> &packet_data) {
PacketType type = static_cast<PacketType>(packet_data[0] >> 4);
std::unique_ptr<Packet> packet;
try {
switch (type) {
case PacketType::Connect:
packet = std::unique_ptr<ConnectPacket>(new ConnectPacket(packet_data));
break;
case PacketType::Connack:
packet = std::unique_ptr<ConnackPacket>(new ConnackPacket(packet_data));
break;
case PacketType::Publish:
packet = std::unique_ptr<PublishPacket>(new PublishPacket(packet_data));
break;
case PacketType::Puback:
packet = std::unique_ptr<PubackPacket>(new PubackPacket(packet_data));
break;
case PacketType::Pubrec:
packet = std::unique_ptr<PubrecPacket>(new PubrecPacket(packet_data));
break;
case PacketType::Pubrel:
packet = std::unique_ptr<PubrelPacket>(new PubrelPacket(packet_data));
break;
case PacketType::Pubcomp:
packet = std::unique_ptr<PubcompPacket>(new PubcompPacket(packet_data));
break;
case PacketType::Subscribe:
packet = std::unique_ptr<SubscribePacket>(new SubscribePacket(packet_data));
break;
case PacketType::Suback:
packet = std::unique_ptr<SubackPacket>(new SubackPacket(packet_data));
break;
case PacketType::Unsubscribe:
packet = std::unique_ptr<UnsubscribePacket>(new UnsubscribePacket(packet_data));
break;
case PacketType::Unsuback:
packet = std::unique_ptr<UnsubackPacket>(new UnsubackPacket(packet_data));
break;
case PacketType::Pingreq:
packet = std::unique_ptr<PingreqPacket>(new PingreqPacket(packet_data));
break;
case PacketType::Pingresp:
packet = std::unique_ptr<PingrespPacket>(new PingrespPacket(packet_data));
break;
case PacketType::Disconnect:
packet = std::unique_ptr<DisconnectPacket>(new DisconnectPacket(packet_data));
break;
}
} catch (std::exception &e) {
if (event_handler) {
event_handler(EventType::ProtocolError);
}
}
return packet;
}
void PacketManager::send_packet(const Packet &packet) {
std::vector<uint8_t> packet_data = packet.serialize();
if (bev) {
bufferevent_write(bev, &packet_data[0], packet_data.size());
} else {
std::cout << "not writing to closed bev\n";
}
}
void PacketManager::close_connection() {
if (bev) {
evutil_socket_t fd = bufferevent_getfd(bev);
evutil_closesocket(fd);
bufferevent_free(bev);
bev = nullptr;
}
}
void PacketManager::handle_events(short events) {
if (events & BEV_EVENT_EOF) {
if (event_handler) {
event_handler(EventType::ConnectionClosed);
}
} else if (events & BEV_EVENT_ERROR) {
if (event_handler) {
event_handler(EventType::NetworkError);
}
} else if (events & BEV_EVENT_TIMEOUT) {
if (event_handler) {
event_handler(EventType::Timeout);
}
}
}

View File

@ -0,0 +1,225 @@
/**
* @file packet_manager.h
*
* Manage low level network communications.
*
* The PacketManager is responsible for sending and receiving MQTT control packets across the network connection. A
* PacketManager instance is installed into every BaseSession and can be moved between sessions to implement
* session persistance.
*
* MQTT control packets received by the PacketManager and network events are forwared to containing sessions through
* callbacks. Session instances control the packet manager by invoking its methods directly.
*/
#pragma once
#include "packet.h"
#include <event2/event.h>
#include <event2/bufferevent.h>
#include <iostream>
#include <vector>
#include <cstddef>
#include <memory>
/**
* PacketManager class.
*
* Manage low level network operations and invoke callbacks on MQTT control packet reception or network event.
*/
class PacketManager {
public:
/**
* Enumeration constants for PacketManager events.
*
* Events are low level network events or unrecoverable protocol errors.
*
*/
enum class EventType {
NetworkError,
ProtocolError,
ConnectionClosed,
Timeout,
};
/**
* Constructor
*
* The PacketManager constructor accepts a pointer to a libevent bufferevent internal structure. Callbacks are
* installed for network data received and network events. A pointer to this PacketManager instance is passed
* as the user data argument. It will be used to invoke instance methods from the static callback wrapper.
*
* @param bev Pointer to a bufferevent control structure.
*/
PacketManager(struct bufferevent *bev) : bev(bev) {
bufferevent_setcb(bev, input_ready, NULL, network_event, this);
bufferevent_enable(bev, EV_READ);
}
/**
* Destructor
*
* Will free the bufferevent pointer. This call should also close any underlying socket connection provided
* the libevent flag LEV_OPT_CLOSE_ON_FREE was used to create the bufferevent.
*/
~PacketManager() {
if (bev) {
bufferevent_free(bev);
bev = nullptr;
}
}
/**
* Send a control packet through the network connection.
*
* This method is invoked by containing session instances when they want to send a control packet. The packet
* will be serialized and transmitted provided the underlying socket connection is not closed.
*/
void send_packet(const Packet &);
/**
* Close the network connection.
*
* Explicitly close the network connection maintained by the bufferevent. This connection should also be closed
* when the destructor for this instance is run provided the bufferevent was created with the LEV_OPT_CLOSE_ON_FREE
* flag.
*/
void close_connection();
/**
* Set the packet received callback.
*
* This callback will be invoked when a packet is received from the network and deserialized. The callback will
* assume ownership of the packet memory. A pointer to the base packet type is passed and the actual packet can
* be recovered through a dynamic_cast<>().
*
* @param handler Callback function.
*/
void set_packet_received_handler(std::function<void(std::unique_ptr<Packet>)> handler) {
packet_received_handler = handler;
}
/**
* Set the network event callback.
*
* This callback will be invoked when a low level network or protocol error is detected. The type of event is
* indicated by the an EventType enumeration constant passed as an argument to the callback.
*
* @param handler Callback function.
*/
void set_event_handler(std::function<void(EventType)> handler) {
event_handler = handler;
}
/**
* Return the next available packet id in sequence.
*
* @return Next packet id.
*/
uint16_t next_packet_id() {
if (++packet_id == 0) {
++packet_id;
}
return packet_id;
}
/**
* Pointer to the contained libevent bufferevent internal control structure.
*/
struct bufferevent *bev;
private:
/**
* Data receiving method.
*
* This instance method is invoked from the static input_ready callback wrapper. It is run asynchronously
* whenever data is received from the network connection. The data will be buffered inside the bufferevent control
* structure until a complete control packet is received. At that point the packet will be deserialized and
* passed to any installed packet_received_handler callback.
*/
void receive_packet_data();
/**
* Libevent callback wrapper.
*
* Static method invoked by libevent C library. The callback method will be passed a pointer to a bufferevent
* control structure. This should be the same pointer that was originally passed to the PacketManager constructor.
* The method will also receive a pointer to the containing PacketManager instance which will be used to invoke the
* receive_packet_data instance method.
*
* @param bev Pointer to a bufferevent
* @param arg Pointer to user data installed with the callback. In this case it is a pointer to the containing
* PacketManager instance.
*/
static void input_ready(struct bufferevent *bev, void *arg) {
PacketManager *_this = static_cast<PacketManager *>(arg);
_this->receive_packet_data();
}
/**
* Network event callback.
*
* This instance method is invoked from the static network_event callback wrapper. It will be passed a set of
* flags indicating the type of network event that caused the invocation. The method will then deletegate to any
* installed event_handler callback passing the event type as the an EventType argument.
*
* @param events
*/
void handle_events(short events);
/**
* Libevent callback wrapper.
*
* The callback method will be passed a pointer to a bufferevent control structure. This should be the same
* pointer that was originally passed to the PacketManager constructor. The method will also receive a pointer
* to the containing PacketManager instance which will be used to invoke the handle_events instance method.
*
* @param bev Pointer to a bufferevent control structure.
* @param events Flags indicating the type of event that caused the callback to be invoked.
* @param arg Pointer to the containing PacketManager instance, installed along with the callback wrapper.
*/
static void network_event(struct bufferevent *bev, short events, void *arg) {
PacketManager *_this = static_cast<PacketManager *>(arg);
_this->handle_events(events);
}
/**
* Packet deserialization method.
*
* This method will receive a packet_data_t container when the receive_packet_data method has determined
* that data for a complete packet has been received. The packet will be deserialized and a reference counted
* pointer to the instantiated packet will be returned.
*
* @param packet_data Reference to a packet_data_t container.
* @return Reference counted pointer to Packet.
*/
std::unique_ptr<Packet> parse_packet_data(const packet_data_t &packet_data);
/**
* Packet received callback.
*
* Callback installed to be invoked by this PacketManager when an MQTT control packet is received from the network.
*/
std::function<void(std::unique_ptr<Packet>)> packet_received_handler;
/**
* Network event callback.
*
* Callback installed to be invoked by this PacketManager when a low level network or protocol error is detected.
*/
std::function<void(EventType)> event_handler;
/** Packet id counter. */
uint16_t packet_id = 0;
/** State variable used to determine when data for a complete control packet is available. */
size_t fixed_header_length = 0;
/** State variable used to determine when data for a complete control packet is available. */
size_t remaining_length = 0;
};

View File

@ -0,0 +1,47 @@
/**
* @file session_manager.cc
*/
#include "session_manager.h"
#include "broker_session.h"
#include "topic.h"
#include <memory>
#include <algorithm>
void SessionManager::accept_connection(struct bufferevent *bev) {
auto session = std::unique_ptr<BrokerSession>(new BrokerSession(bev, *this));
sessions.push_back(std::move(session));
}
std::list<std::unique_ptr<BrokerSession>>::iterator SessionManager::find_session(const std::string &client_id) {
return find_if(sessions.begin(), sessions.end(), [&client_id](const std::unique_ptr<BrokerSession> & s) {
return (!s->client_id.empty() and (s->client_id == client_id));
});
}
void SessionManager::erase_session(const std::string &client_id) {
sessions.erase(std::remove_if(sessions.begin(), sessions.end(), [&client_id](std::unique_ptr<BrokerSession> &s) {
return (!s->client_id.empty() and (s->client_id == client_id));
}), sessions.end());
}
void SessionManager::erase_session(const BrokerSession *session)
{
sessions.erase(std::remove_if(sessions.begin(), sessions.end(), [session](std::unique_ptr<BrokerSession> & s) {
return s.get() == session;
}), sessions.end());
}
void SessionManager::handle_publish(const PublishPacket & packet) {
for (auto &session : sessions) {
for (auto &subscription : session->subscriptions) {
if (topic_match(subscription.topic_filter, TopicName(packet.topic_name))) {
session->forward_packet(packet);
}
}
}
}

View File

@ -0,0 +1,85 @@
/**
* @file session_manager.h
*
* Manage BrokerSessions.
*
* The SessionManager maintains a contaier of all sessions in a broker. BrokerConnects are created in the session
* manager when a network session is accepted the new session is added to the sessions container. The session is then
* responsible for managing the MQTT protocol. The MQTT 3.1.1 standard requires that sessions
* can persist after a client disconnects and that any subscribe QoS 1 and Qo2 messages published while disconnected be
* delivered on reconnection.
*
* The SessionManager is responsible for forwarding published messages to all subscribing clients.
*/
#pragma once
#include <list>
#include <string>
#include <memory>
struct bufferevent;
class BrokerSession;
class PublishPacket;
/**
* SessionManager class
*
* Composes a container of broker sessions and methods to manage them.
*/
class SessionManager
{
public:
/**
* Accept a new network connection.
*
* Creates a new BrokerSession instance and adds it to the container of sessions. The session instance will manage
* the MQTT protocol.
*
* @param bev Pointer to a bufferevent
*/
void accept_connection(struct bufferevent * bev);
/**
* Find a session in the session container.
*
* @param client_id Unique client id to find.
* @return Iterator to BrokerSession.
*/
std::list<std::unique_ptr<BrokerSession>>::iterator find_session(const std::string & client_id);
/**
* Delete a session
*
* Given a pointer to a BrokerSession, finds that session in the session container and removes it from the
* container. The session instance will be deleted.
*
* @param session Pointer to a BrokerSession;
*/
void erase_session(const BrokerSession *session);
/**
* Finds a session in the session container with the given client id. If found the session is removed from the
* container. The session instance will be deleted.
*
* @param client_id A Client id.
*/
void erase_session(const std::string &client_id);
/**
* Forward a message to subsribed clients.
*
* Searches through each session and their subscriptions and invokes the forward_packet method on each session
* instance with a matching subscribed TopicFilter. The session will be responsible for Managing the MQTT publish
* protocol and correctly delivering the message to its subscribed client
*
* @param publish_packet Reference to a PublishPacket;
*/
void handle_publish(const PublishPacket & publish_packet);
/** Container of BrokerSessions. */
std::list<std::unique_ptr<BrokerSession>> sessions;
};

View File

@ -0,0 +1,131 @@
/**
* @file topic.cc
*/
#include "topic.h"
#include <regex>
TopicName::TopicName(const std::string & s) {
if (!is_valid(s)) {
throw std::exception();
}
name = s;
}
bool TopicName::is_valid(const std::string &s) const {
if (s.size() > MaxNameSize) {
return false;
}
if ((s.find('+') == std::string::npos) and (s.find('#') == std::string::npos)) {
return true;
}
return false;
}
TopicFilter::TopicFilter(const std::string &s) {
if (!is_valid(s)) {
throw std::exception();
}
filter = s;
}
bool TopicFilter::is_valid(const std::string &s) const {
if (s.size() > MaxFilterSize) {
return false;
}
size_t pos = 0;
for (char c : s) {
if (c == '+') {
if ((pos != 0 and s[pos - 1] != '/') or (pos + 1 != s.size() and s[pos + 1] != '/')) {
return false;
}
} else if (c == '#') {
if ((pos != 0 and s[pos - 1] != '/') or (pos + 1 != s.size())) {
return false;
}
}
pos++;
}
return true;
}
bool topic_match(const TopicFilter &filter, const TopicName &name) {
const std::string &f = filter.filter;
const std::string &n = name.name;
// empty strings don't match
if (f.empty() or n.empty()) {
return false;
}
size_t fpos = 0;
size_t npos = 0;
// Cannot match $ with wildcard
if ((f[fpos] == '$' and n[npos] != '$') or (f[fpos] != '$' and n[npos] == '$')) {
return false;
}
while (fpos < f.size() and npos < n.size()) {
// filter and topic name match at the current position
if (f[fpos] == n[npos]) {
// last character in the topic name
if (npos == n.size() - 1) {
// at the end of the topic name and the filter has a separator followed by a multi-level wildcard,
// causing a parent level match.
if ((fpos == f.size() - 3) and (f[fpos + 1] == '/') and (f[fpos + 2] == '#')) {
return true;
}
}
fpos++;
npos++;
// at the end of both the filter and topic name, match
if ((fpos == f.size()) && (npos == n.size())) {
return true;
// at the end of the topic name and the next character in the filter is wildcard.
} else if ((npos == n.size()) and (fpos == f.size() - 1) and (f[fpos] == '+')) {
return true;
}
} else {
if (f[fpos] == '+') {
fpos++;
while ((npos < n.size()) and (n[npos] != '/')) {
npos++;
}
if ((npos == n.size()) and (fpos == f.size())) {
return true;
}
} else if (f[fpos] == '#') {
return true;
} else {
return false;
}
}
}
return false;
}
bool topic_match(const TopicFilter &filter1, const TopicFilter &filter2) {
return filter1.filter == filter2.filter;
}

View File

@ -0,0 +1,141 @@
/**
* @file topic.h
*
* Classes for managing topic names and topic filters.
*
* The MQTT 3.1.1 standard allows structured topic names. It also defines rules for matching these names and provides
* wildcard characters to enhance matching rules.
*/
#pragma once
#include <string>
class TopicFilter;
/**
* Topic Name
*
* Topic names are UTF-8 encoded character strings. The have a structure imposed by the MQTT 3.1.1 standard. This
* class enforces that structure. Topic names differe from topic filters in that topic filters allow wildcard
* characters.
*
* This class friends the topic_match function.
*/
class TopicName {
public:
/** Maximum lenght of a topic name according the the MQTT 3.1.1 standard. */
const static size_t MaxNameSize = 65535;
/**
* Constructor
*
* The string will be validated against the MQTT topic name rules. An exception will be thrown if the name is
* invalid.
*
* @param name A reference to a the topic name string.
*/
TopicName(const std::string & name);
/**
* Validate the topic name against the MQTT 3.1.1 standard rules.
*
* @param name A name string.
* @return Topic name is valid.
*/
bool is_valid(const std::string & name) const;
/**
* Cast an instance of this class to a std::string.
*
* @return std::string
*/
operator std::string() const {return name;}
/** Matching friend function. */
friend bool topic_match(const TopicFilter &, const TopicName &);
private:
/** The name character string. */
std::string name;
};
/**
* Topic Filter
*
* Topic filters are composed of UTF-8 encoded character strings. The have a structure imposed by the MQTT 3.1.1
* standard including wildcard characters. This class enforces that structure.
*
* This class friends the topic_match function.
*/
class TopicFilter {
public:
/** Maximum lenght of a topic filter according the the MQTT 3.1.1 standard. */
const static size_t MaxFilterSize = 65535;
/**
* Constructor
*
* The string will be validated against the MQTT topic filter rules. An exception will be thrown if the filter is
* invalid.
*
* @param filter A reference to a the topic filter string.
*/
TopicFilter(const std::string & filter);
/**
* Validate the topic filter against the MQTT 3.1.1 standard rules.
*
* @param filter A filter string.
* @return Topic filter is valid.
*/
bool is_valid(const std::string & filter) const;
/**
* Cast an instance of this class to a std::string.
*
* @return std::string
*/
operator std::string() const {return filter;}
/** Matching friend function. */
friend bool topic_match(const TopicFilter &, const TopicName &);
/** Matching friend function. */
friend bool topic_match(const TopicFilter &, const TopicFilter &);
private:
/** The filter character string. */
std::string filter;
};
/**
* Match a TopicFilter against a TopicName.
*
* The MQTT 3.1.1 standard topic filter matching rules will be applied including wildcard characters.
*
* @param topic_filter A reference to a TopicFilter.
* @param topic_name A reference to a TopicName
* @return match
*/
bool topic_match(const TopicFilter & topic_filter, const TopicName & topic_name);
/**
* Match a TopicFilter against another TopicFilter.
*
* The MQTT 3.1.1 standard topic filter matching rules are applied. These are a direct character by character match.
* This function can be used when finding an existing subscription filter, in that case wildcard character matching
* does not apply.
*
* For example, the topic filters 'a/+/c' and 'a/#' both match the topic name 'a/b/c' but the two topic filters do not
* match.
*
* @param topic_filter A reference to a TopicFilter.
* @param topic_name A reference to a TopicName
* @return match
*/
bool topic_match(const TopicFilter & topic_filter, const TopicFilter & topic_name);

View File

@ -0,0 +1,66 @@
#include "mqtt_server.h"
void MQTTServer::initialize() {
evloop = event_base_new();
if (!evloop) {
printf("Could not initialize libevent\n");
return;
}
std::memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
evutil_inet_pton(sin.sin_family, bind_address.c_str(), &sin.sin_addr);
sin.sin_port = htons(port);
listener = evconnlistener_new_bind(evloop, MQTTServer::listener_cb,
(void *)this,
LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, -1,
(struct sockaddr *)&sin, sizeof(sin));
if (!listener) {
std::cerr << "Could not create listener!\n";
return;
}
}
void MQTTServer::listener_cb(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sa, int socklen, void *user_data) {
MQTTServer *server = static_cast<MQTTServer *>(user_data);
struct bufferevent *bev;
bev = bufferevent_socket_new(server->evloop, fd, BEV_OPT_CLOSE_ON_FREE);
if (!bev) {
std::cerr << "Error constructing bufferevent!\n";
event_base_loopbreak(server->evloop);
return;
}
server->session_manager->accept_connection(bev);
}
void MQTTServer::loop_once() {
event_base_dispatch(evloop);
}
MQTTServer::MQTTServer() {
bind_address = "0";
port = 1883;
session_manager = new SessionManager();
evloop = nullptr;
listener = nullptr;
}
MQTTServer::~MQTTServer() {
if (event_base_loopexit(evloop, NULL)) {
std::cerr << "failed to exit event loop\n";
}
evconnlistener_free(listener);
event_base_free(evloop);
delete session_manager;
}

View File

@ -0,0 +1,48 @@
#ifndef MQTT_SERVER_H
#define MQTT_SERVER_H
#include <cstdio>
#include <string>
#include <vector>
#include "./mqtt_broker/src/broker_session.h"
#include "./mqtt_broker/src/session_manager.h"
#include <event2/listener.h>
#include <getopt.h>
#include <csignal>
#include <cstring>
class MQTTServer {
public:
static void listener_cb(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *addr, int socklen, void *arg);
void on_connect(int rc) { printf("on_connect\n"); }
void on_connect_with_flags(int rc, int flags) { printf("on_connect_with_flags\n"); }
void on_disconnect(int rc) { printf("on_disconnect\n"); }
void on_publish(int mid) { printf("on_publish\n"); }
void on_message(const struct mosquitto_message *message) { printf("on_message\n"); }
void on_subscribe(int mid, int qos_count, const int *granted_qos) { printf("on_subscribe\n"); }
void on_unsubscribe(int mid) { printf("on_unsubscribe\n"); }
void on_log(int level, const char *str) { printf("on_log\n"); }
void on_error() { printf("on_error\n"); }
void initialize();
void loop_once();
MQTTServer();
~MQTTServer();
SessionManager *session_manager;
std::string bind_address;
uint16_t port;
struct event_base *evloop;
struct evconnlistener *listener;
struct sockaddr_in sin;
};
#endif