diff --git a/HEADS b/HEADS index f5c1fe4..5e830fe 100644 --- a/HEADS +++ b/HEADS @@ -1 +1 @@ -{"engine": {"master": "84d86d8cbd936f05afb83d2ea442f0b5722bf460"}} \ No newline at end of file +{"engine": {"master": "3931dba9881119766946213b175cbc90e25b2e90"}} \ No newline at end of file diff --git a/SConstruct b/SConstruct index 6f9588f..a8ccf84 100644 --- a/SConstruct +++ b/SConstruct @@ -35,6 +35,11 @@ folders = [ 'app', ] +module_folders = [ + '../modules', + '../custom_modules', +] + main_file = 'main.cpp' repository_index = 0 @@ -339,6 +344,14 @@ if len(sys.argv) > 1: build_string += '" ' + build_string += 'module_folders="' + + for f in module_folders: + build_string += f + build_string += ';' + + build_string += '" ' + build_string += 'main_file="../' + main_file + '" ' for i in range(2, len(sys.argv)): diff --git a/app/ic_application.cpp b/app/ic_application.cpp index 1c4688b..2430eab 100644 --- a/app/ic_application.cpp +++ b/app/ic_application.cpp @@ -5,11 +5,11 @@ #include -#include "core/database_manager.h" +#include "core/database/database_manager.h" #include "core/file_cache.h" #include "core/handler_instance.h" #include "core/html_builder.h" -#include "core/query_result.h" +#include "core/database/query_result.h" #include "core/request.h" #include "core/utils.h" diff --git a/custom_modules/mqtt_server/SCsub b/custom_modules/mqtt_server/SCsub new file mode 100644 index 0000000..59b379f --- /dev/null +++ b/custom_modules/mqtt_server/SCsub @@ -0,0 +1,13 @@ +#!/usr/bin/env python + +Import("env_mod") +Import("env") + +env_mod.core_sources = [] + +env_mod.add_source_files(env_mod.core_sources, "*.cpp") +env_mod.add_source_files(env_mod.core_sources, "./mqtt_broker/src/*.cc") + +# Build it all as a library +lib = env_mod.add_library("mqtt_server", env_mod.core_sources) +env.Prepend(LIBS=[lib]) diff --git a/custom_modules/mqtt_server/__pycache__/detect.cpython-39.pyc b/custom_modules/mqtt_server/__pycache__/detect.cpython-39.pyc new file mode 100644 index 0000000..de03581 Binary files /dev/null and b/custom_modules/mqtt_server/__pycache__/detect.cpython-39.pyc differ diff --git a/custom_modules/mqtt_server/detect.py b/custom_modules/mqtt_server/detect.py new file mode 100644 index 0000000..210c397 --- /dev/null +++ b/custom_modules/mqtt_server/detect.py @@ -0,0 +1,46 @@ +import os +import platform +import sys + + +def is_active(): + return True + + +def get_name(): + return "mqtt_server" + + +def can_build(): + if os.name == "posix" or sys.platform == "darwin": + x11_error = os.system("pkg-config --version > /dev/null") + if x11_error: + return False + + libevent_err = os.system("pkg-config libevent --modversion --silence-errors > /dev/null ") + + if libevent_err: + print("libevent not found! MQTT server will not be available!") + return False + + print("libevent found! MQTT server will be available!") + + return True + + return False + + +def get_opts(): + return [] + +def get_flags(): + + return [] + + +def configure(env): + env.ParseConfig("pkg-config libevent --cflags --libs") + + env.Append(CPPDEFINES=["MQTT_SERVER_PRESENT"]) + + diff --git a/custom_modules/mqtt_server/libmqtt_server.a b/custom_modules/mqtt_server/libmqtt_server.a new file mode 100644 index 0000000..36f84cb Binary files /dev/null and b/custom_modules/mqtt_server/libmqtt_server.a differ diff --git a/custom_modules/mqtt_server/mqtt_broker/HEAD b/custom_modules/mqtt_server/mqtt_broker/HEAD new file mode 100644 index 0000000..3028a1b --- /dev/null +++ b/custom_modules/mqtt_server/mqtt_broker/HEAD @@ -0,0 +1 @@ +bea4d892540d329cf055a61339200b76001e191d \ No newline at end of file diff --git a/custom_modules/mqtt_server/mqtt_broker/LICENSE.txt b/custom_modules/mqtt_server/mqtt_broker/LICENSE.txt new file mode 100644 index 0000000..63b4b68 --- /dev/null +++ b/custom_modules/mqtt_server/mqtt_broker/LICENSE.txt @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) [year] [fullname] + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/custom_modules/mqtt_server/mqtt_broker/README.md b/custom_modules/mqtt_server/mqtt_broker/README.md new file mode 100644 index 0000000..5562f6d --- /dev/null +++ b/custom_modules/mqtt_server/mqtt_broker/README.md @@ -0,0 +1,93 @@ +# MQTT broker and clients + +## About + +The [MQTT (MQ Telemetry Transport) publish/subscribe +protocol](htts://mqtt.org) is a simple lightweight messaging protocol +for distributed network connected devices. It provides low overhead, +reliable connectivity for resource constrained devices. + +This is an open source, asynchronous, C++ implementation of the broker +(server) and connecting clients. The implementation follows the 3.1.1 +OASIS standard available +[here](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html). + +## Installation + +### Requirements + +Asynchronous networking support requires the +[Libevent](http://libevent.org) networking library. Other than that +there are no other external run-time dependencies. + +* A C++11 conformant compiler. +* [Libevent](http://libevent.org) +* [CMake](Cmhttps://cmake.org/) + +Verified platforms. + +* Ubuntu Linux 16.04 (gcc 5.4.0) +* Mac OSX 10.11 (llvm 7.3.0) + +### Building + +1. Clone this repository. +```` + $ git clone https://github.com/inyotech/mqtt_broker.git + $ cd mqtt_broker +```` + +2. Install the google tests framework +``` + $ pushd test/lib + $ git clone https://github.com/google/googletest.git + $ popd +``` + +3. Create a build directory. +```` + $ mkdir build + $ cd build +```` + +4. Generate build files. +```` + $ cmake .. +```` + +5. Build +```` + $ make +```` + +## Example + +* Open a terminal and execute the broker. +```` + $ mqtt_broker +```` + +* In a second terminal execute a subscriber. +```` + $ mqtt_client_sub --topic 'a/b/c' +```` + +* Execute a publisher in a third terminal. +```` + $ mqtt_client_pub --topic 'a/b/c' --message 'published message' +```` + +## Documentation + +[Doxygen](http://www.stack.nl/~dimitri/doxygen/) documentation is +available [here](https://inyotech.github.io/mqtt_broker). + +## License + +This software is licensed under the MIT License. See the LICENSE.TXT file for details. + +## TODO + +* Client Will not implemented. +* Retained message publication not implemented. +* SSL support not implemented. diff --git a/custom_modules/mqtt_server/mqtt_broker/src/base_session.cc b/custom_modules/mqtt_server/mqtt_broker/src/base_session.cc new file mode 100644 index 0000000..7570a63 --- /dev/null +++ b/custom_modules/mqtt_server/mqtt_broker/src/base_session.cc @@ -0,0 +1,115 @@ +/** + * @file base_session.cc + */ + +#include "base_session.h" + +#include + +void BaseSession::packet_received(std::unique_ptr packet) { + + switch (packet->type) { + case PacketType::Connect: + handle_connect(dynamic_cast(*packet)); + break; + case PacketType::Connack: + handle_connack(dynamic_cast(*packet)); + break; + case PacketType::Publish: + handle_publish(dynamic_cast(*packet)); + break; + case PacketType::Puback: + handle_puback(dynamic_cast(*packet)); + break; + case PacketType::Pubrec: + handle_pubrec(dynamic_cast(*packet)); + break; + case PacketType::Pubrel: + handle_pubrel(dynamic_cast(*packet)); + break; + case PacketType::Pubcomp: + handle_pubcomp(dynamic_cast(*packet)); + break; + case PacketType::Subscribe: + handle_subscribe(dynamic_cast(*packet)); + break; + case PacketType::Suback: + handle_suback(dynamic_cast(*packet)); + break; + case PacketType::Unsubscribe: + handle_unsubscribe(dynamic_cast(*packet)); + break; + case PacketType::Unsuback: + handle_unsuback(dynamic_cast(*packet)); + break; + case PacketType::Pingreq: + handle_pingreq(dynamic_cast(*packet)); + break; + case PacketType::Pingresp: + handle_pingresp(dynamic_cast(*packet)); + break; + case PacketType::Disconnect: + handle_disconnect(dynamic_cast(*packet)); + break; + } + +} + +void BaseSession::packet_manager_event(PacketManager::EventType event) { + packet_manager->close_connection(); +} + +void BaseSession::handle_connect(const ConnectPacket &) { + throw std::exception(); +} + +void BaseSession::handle_connack(const ConnackPacket &) { + throw std::exception(); +} + +void BaseSession::handle_publish(const PublishPacket &) { + throw std::exception(); +} + +void BaseSession::handle_puback(const PubackPacket &) { + throw std::exception(); +} + +void BaseSession::handle_pubrec(const PubrecPacket &) { + throw std::exception(); +} + +void BaseSession::handle_pubrel(const PubrelPacket &) { + throw std::exception(); +} + +void BaseSession::handle_pubcomp(const PubcompPacket &) { + throw std::exception(); +} + +void BaseSession::handle_subscribe(const SubscribePacket &) { + throw std::exception(); +} + +void BaseSession::handle_suback(const SubackPacket &) { + throw std::exception(); +} + +void BaseSession::handle_unsubscribe(const UnsubscribePacket &) { + throw std::exception(); +} + +void BaseSession::handle_unsuback(const UnsubackPacket &) { + throw std::exception(); +} + +void BaseSession::handle_pingreq(const PingreqPacket &) { + PingrespPacket pingresp_packet; + packet_manager->send_packet(pingresp_packet); +} + +void BaseSession::handle_pingresp(const PingrespPacket &) {} + +void BaseSession::handle_disconnect(const DisconnectPacket &) { + throw std::exception(); +} diff --git a/custom_modules/mqtt_server/mqtt_broker/src/base_session.h b/custom_modules/mqtt_server/mqtt_broker/src/base_session.h new file mode 100644 index 0000000..fa17d4a --- /dev/null +++ b/custom_modules/mqtt_server/mqtt_broker/src/base_session.h @@ -0,0 +1,213 @@ +/** + * @file base_session.h + * + * Base class for MQTT sessions. This class add facilities for persistence and resumption of session state. The MQTT + * standard requires that the client and server both maintain session state while connected. The server is also + * required to resume the state when a client re-connects with the same client id. + */ + +#pragma once + +#include "packet.h" +#include "packet_manager.h" + +#include + +#include + +/** + * Base session class + * + * Maintains session attributes and provides default handler methods for received control packets. Classes derived + * from BaseSession will override control packet handlers as required. + * + * Each BaseSession composes a PacketManager instance that can be moved between BaseSession instances. + */ +class BaseSession { + +public: + + /** + * Constructor + * + * Accepts a pointer to a libevent bufferevent as the only argument. The bufferevent is forwarded to a + * newly instantiated PacketManager that use it to handle all network related functions. + * + * This BaseSession instance can persist in memory after the network connection is closed. If a connection is + * received and the Connect control packet contains the same client id as an existing session. Any currently + * active connection in the original session is closed and this PacketManager will be moved to the original + * session. This session will then be deleted. + * + * @param bev Pointer to a bufferevent. + */ + BaseSession(struct bufferevent *bev) : packet_manager(new PacketManager(bev)) { + packet_manager->set_packet_received_handler( + std::bind(&BaseSession::packet_received, this, std::placeholders::_1)); + packet_manager->set_event_handler(std::bind(&BaseSession::packet_manager_event, this, std::placeholders::_1)); + } + + /** + * Desctructor + * + * Virtual so the desctructor for derived classes will be called. + */ + virtual ~BaseSession() {} + + /** Client id. */ + std::string client_id; + + /** Clean session flag. */ + bool clean_session; + + /** + * PacketManager callback. + * + * Invoked by the installed PacketManager instance when it receives a complete control packet. The default handler + * methods will be passed a reference to the received packet. Packet memory is heap allocated on creation and + * will be freed according to standard C++ std::unique_ptr rules. It is the responsibility of subclasses to + * manage the std::unique_ptr. + * + * @param packet Pointer to a packet. + */ + virtual void packet_received(std::unique_ptr packet); + + /** + * PacketManager callback. + * + * Invoked by the installed PacketManager instance when it detects a low level protocol or network error. The + * default action is to close the network connection. + * + * @param event The type of event detected. + */ + virtual void packet_manager_event(PacketManager::EventType event); + + /** + * Handle a received ConnectPacket. + * + * The default action is to throw an exception. Subclasses should override this method. + * + * @param connect_packet A reference to the packet. + */ + virtual void handle_connect(const ConnectPacket & connect_packet); + + /** + * Handle a received ConnackPacket. + * + * The default action is to throw an exception. Subclasses should override this method. + * + * @param connack_packet A reference to the packet. + */ + virtual void handle_connack(const ConnackPacket & connack_packet); + + /** + * Handle a received PublishPacket. + * + * The default action is to throw an exception. Subclasses should override this method. + * + * @param publish_packet A reference to the packet. + */ + virtual void handle_publish(const PublishPacket & publish_packet); + + /** + * Handle a received PubackPacket. + * + * The default action is to throw an exception. Subclasses should override this method. + * + * @param puback_packet A reference to the packet. + */ + virtual void handle_puback(const PubackPacket & puback_packet); + + /** + * Handle a received PubrecPacket. + * + * The default action is to throw an exception. Subclasses should override this method. + * + * @param pubrec_packet A reference to the packet. + */ + virtual void handle_pubrec(const PubrecPacket & pubrec_packet); + + /** + * Handle a received PubrelPacket. + * + * The default action is to throw an exception. Subclasses should override this method. + * + * @param pubrel_packet A reference to the packet. + */ + virtual void handle_pubrel(const PubrelPacket & pubrel_packet); + + /** + * Handle a received PubcompPacket. + * + * The default action is to throw an exception. Subclasses should override this method. + * + * @param pubcomp_packet A reference to the packet. + */ + virtual void handle_pubcomp(const PubcompPacket & pubcomp_packet); + + /** + * Handle a received SubscribePacket. + * + * The default action is to throw an exception. Subclasses should override this method. + * + * @param subscribe_packet A reference to the packet. + */ + virtual void handle_subscribe(const SubscribePacket & subscribe_packet); + + /** + * Handle a received SubackPacket. + * + * The default action is to throw an exception. Subclasses should override this method. + * + * @param suback_packet A reference to the packet. + */ + virtual void handle_suback(const SubackPacket & suback_packet); + + /** + * Handle a received UnsubscribePacket. + * + * The default action is to throw an exception. Subclasses should override this method. + * + * @param unsubscribe_packet A reference to the packet. + */ + virtual void handle_unsubscribe(const UnsubscribePacket & unsubscribe_packet); + + /** + * Handle a received UnsubackPacket. + * + * The default action is to throw an exception. Subclasses should override this method. + * + * @param unsuback_packet A reference to the packet. + */ + virtual void handle_unsuback(const UnsubackPacket & unsuback_packet); + + /** + * Handle a received PingreqPacket. + * + * The default action is to send a Pingresp packet. Subclasses can override this method. + * + * @param pingreq_packet A reference to the packet. + */ + virtual void handle_pingreq(const PingreqPacket & pingreq_packet); + + /** + * Handle a received PingrespPacket. + * + * The default action is to do nothing. Subclasses can override this method. + * + * @param pingresp_packet A reference to the packet. + */ + virtual void handle_pingresp(const PingrespPacket & pingresp_packet); + + /** + * Handle a received DisconnectPacket. + * + * The default action is to throw an exception. Subclasses should override this method. + * + * @param disconnect_packet A reference to the packet. + */ + virtual void handle_disconnect(const DisconnectPacket & disconnect_packet); + + /** Pointer to the installed PacketManager instance. */ + std::unique_ptr packet_manager; + +}; \ No newline at end of file diff --git a/custom_modules/mqtt_server/mqtt_broker/src/broker.cc b/custom_modules/mqtt_server/mqtt_broker/src/broker.cc new file mode 100644 index 0000000..7e71e38 --- /dev/null +++ b/custom_modules/mqtt_server/mqtt_broker/src/broker.cc @@ -0,0 +1,181 @@ +/** + * @file broker.cc + * + * MQTT Broker (server) + * + * Listen for connections from clients. Accept subscribe, unsubscribe and publish commands and forward according to + * the [MQTT protocol](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html) + */ + +#include "session_manager.h" +#include "broker_session.h" + +#include + +#include + +#include +#include + +/** + * Manange sessions for each client. + * Sessions will persist between connections and are identified by the client id of the connecting client. + */ +SessionManager session_manager; + +/** + * Callback run when SIGINT or SIGTERM is attached, will cleanly exit. + * + * @param signal Integer value of signal. + * @param event Should be EV_SIGNAL. + * @param arg Pointer originally passed to evsignal_new. + */ +static void signal_cb(evutil_socket_t signal, short event, void * arg); + +/** + * Callback run when connection is received on the listening socket. + * + * @param listener Pointer to this listener's internal control structure. + * @param fd File descriptor of the newly accepted socket. + * @param addr Address structure for the peer. + * @param socklen Length of the address structure. + * @param arg Pointer orignally passed to evconlistener_new_bind. + */ +static void listener_cb(struct evconnlistener * listener, evutil_socket_t fd, + struct sockaddr * addr, int socklen, void * arg); + +/** + * Parse command line. + * + * Recognized command line arguments are parsed and added to the options instance. This options instance will be + * used to configure the broker instance. + * + * @param argc Command line argument count. + * @param argv Command line argument values. + */ +static void parse_arguments(int argc, char *argv[]); + +/** + * Options settable through command line arguments. + */ +struct options_t { + + /** Network interface address to bind to. */ + std::string bind_address = "0"; + + /** Port number to bind to. */ + uint16_t bind_port = 1883; + +} options; + +int main(int argc, char *argv[]) { + + struct event_base *evloop; + struct event *signal_event; + struct evconnlistener *listener; + struct sockaddr_in sin; + + unsigned short listen_port = 1883; + + parse_arguments(argc, argv); + + evloop = event_base_new(); + if (!evloop) { + std::cerr << "Could not initialize libevent\n"; + return 1; + } + + signal_event = evsignal_new(evloop, SIGINT, signal_cb, evloop); + evsignal_add(signal_event, NULL); + signal_event = evsignal_new(evloop, SIGTERM, signal_cb, evloop); + evsignal_add(signal_event, NULL); + + std::memset(&sin, 0, sizeof(sin)); + sin.sin_family = AF_INET; + evutil_inet_pton(sin.sin_family, options.bind_address.c_str(), &sin.sin_addr); + sin.sin_port = htons(listen_port); + + listener = evconnlistener_new_bind(evloop, listener_cb, (void *) evloop, + LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, -1, + (struct sockaddr *) &sin, sizeof(sin)); + if (!listener) { + std::cerr << "Could not create listener!\n"; + return 1; + } + + event_base_dispatch(evloop); + + event_free(signal_event); + evconnlistener_free(listener); + event_base_free(evloop); + + return 0; + +} + +void usage() { + std::cout << + R"END(usage: mqtt_broker [OPTION] + +MQTT broker server. Bind to address and listen for client connections. + +OPTIONS + +--broker-host | -b Broker host name or ip address, default localhost +--broker-port | -p Broker port, default 1883 +--help | -h Display this message and exit +)END"; + +} +void parse_arguments(int argc, char *argv[]) { + static struct option longopts[] = { + {"bind-addr", required_argument, NULL, 'b'}, + {"bind-port", required_argument, NULL, 'p'}, + {"help", no_argument, NULL, 'h'} + }; + + + int ch; + while ((ch = getopt_long(argc, argv, "b:p:h", longopts, NULL)) != -1) { + switch (ch) { + case 'b': + options.bind_address = optarg; + break; + case 'p': + options.bind_port = static_cast(atoi(optarg)); + break; + case 'h': + usage(); + std::exit(0); + default: + usage(); + std::exit(1); + } + } + +} + +static void listener_cb(struct evconnlistener *listener, evutil_socket_t fd, + struct sockaddr *sa, int socklen, void *user_data) { + struct event_base *base = static_cast(user_data); + struct bufferevent *bev; + + bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE); + if (!bev) { + std::cerr << "Error constructing bufferevent!\n"; + event_base_loopbreak(base); + return; + } + + session_manager.accept_connection(bev); +} + +static void signal_cb(evutil_socket_t fd, short event, void *arg) { + + event_base *base = static_cast(arg); + + if (event_base_loopexit(base, NULL)) { + std::cerr << "failed to exit event loop\n"; + } + +} diff --git a/custom_modules/mqtt_server/mqtt_broker/src/broker_session.cc b/custom_modules/mqtt_server/mqtt_broker/src/broker_session.cc new file mode 100644 index 0000000..72c5b5b --- /dev/null +++ b/custom_modules/mqtt_server/mqtt_broker/src/broker_session.cc @@ -0,0 +1,250 @@ +/** + * @file broker_session.cc + */ + +#include "broker_session.h" +#include "session_manager.h" + +#include + +bool BrokerSession::authorize_connection(const ConnectPacket &packet) { + return true; +} + +void BrokerSession::resume_session(std::unique_ptr &session, + std::unique_ptr packet_manager_ptr) { + + packet_manager_ptr->set_event_handler( + std::bind(&BrokerSession::packet_manager_event, session.get(), std::placeholders::_1)); + packet_manager_ptr->set_packet_received_handler( + std::bind(&BrokerSession::packet_received, session.get(), std::placeholders::_1)); + session->packet_manager = std::move(packet_manager_ptr); + + ConnackPacket connack; + + connack.session_present(true); + connack.return_code = ConnackPacket::ReturnCode::Accepted; + + session->packet_manager->send_packet(connack); +} + +void BrokerSession::forward_packet(const PublishPacket &packet) { + + if (packet.qos() == QoSType::QoS0) { + packet_manager->send_packet(packet); + } else if (packet.qos() == QoSType::QoS1) { + PublishPacket packet_to_send(packet); + packet_to_send.dup(false); + packet_to_send.retain(false); + packet_to_send.packet_id = packet_manager->next_packet_id(); + qos1_pending_puback.push_back(packet_to_send); + packet_manager->send_packet(packet_to_send); + } else if (packet.qos() == QoSType::QoS2) { + + PublishPacket packet_to_send(packet); + packet_to_send.dup(false); + packet_to_send.retain(false); + packet_to_send.packet_id = packet_manager->next_packet_id(); + + auto previous_packet = find_if(qos2_pending_pubrec.begin(), qos2_pending_pubrec.end(), + [&packet](const PublishPacket &p) { return p.packet_id == packet.packet_id; }); + if (previous_packet == qos2_pending_pubrec.end()) { + qos2_pending_pubrec.push_back(packet_to_send); + } + + packet_manager->send_packet(packet_to_send); + + } +} + +void BrokerSession::send_pending_message() { + + if (!qos1_pending_puback.empty()) { + packet_manager->send_packet(qos1_pending_puback[0]); + } else if (!qos2_pending_pubrec.empty()) { + packet_manager->send_packet(qos2_pending_pubrec[0]); + } else if (!qos2_pending_pubrel.empty()) { + PubrecPacket pubrec_packet; + pubrec_packet.packet_id = qos2_pending_pubrel[0]; + packet_manager->send_packet(pubrec_packet); + } else if (!qos2_pending_pubcomp.empty()) { + PubrelPacket pubrel_packet; + pubrel_packet.packet_id = qos2_pending_pubcomp[0]; + packet_manager->send_packet(pubrel_packet); + } + + return; +} + +void BrokerSession::packet_received(std::unique_ptr packet) { + BaseSession::packet_received(std::move(packet)); + send_pending_message(); +} + +void BrokerSession::packet_manager_event(PacketManager::EventType event) { + BaseSession::packet_manager_event(event); + if (clean_session) { + session_manager.erase_session(this); + } +} + +void BrokerSession::handle_connect(const ConnectPacket &packet) { + + if (!authorize_connection(packet)) { + session_manager.erase_session(this); + return; + } + + if (packet.clean_session()) { + session_manager.erase_session(packet.client_id); + } else { + auto previous_session_it = session_manager.find_session(packet.client_id); + if (previous_session_it != session_manager.sessions.end()) { + std::unique_ptr &previous_session_ptr = *previous_session_it; + resume_session(previous_session_ptr, std::move(packet_manager)); + session_manager.erase_session(this); + return; + } + } + + client_id = packet.client_id; + clean_session = packet.clean_session(); + + ConnackPacket connack; + + connack.session_present(false); + connack.return_code = ConnackPacket::ReturnCode::Accepted; + + packet_manager->send_packet(connack); + +} + +void BrokerSession::handle_publish(const PublishPacket &packet) { + + if (packet.qos() == QoSType::QoS0) { + + session_manager.handle_publish(packet); + + } else if (packet.qos() == QoSType::QoS1) { + + session_manager.handle_publish(packet); + PubackPacket puback; + puback.packet_id = packet.packet_id; + packet_manager->send_packet(puback); + + } else if (packet.qos() == QoSType::QoS2) { + + auto previous_packet = find_if(qos2_pending_pubrel.begin(), qos2_pending_pubrel.end(), + [& packet](uint16_t packet_id) { return packet_id == packet.packet_id; }); + if (previous_packet == qos2_pending_pubrel.end()) { + qos2_pending_pubrel.push_back(packet.packet_id); + session_manager.handle_publish(packet); + } + + } + + session_manager.handle_local_publish(client_id, packet); +} + +void BrokerSession::handle_puback(const PubackPacket &packet) { + + auto message = find_if(qos1_pending_puback.begin(), qos1_pending_puback.end(), + [&packet](const PublishPacket &p) { return p.packet_id == packet.packet_id; }); + if (message != qos1_pending_puback.end()) { + qos1_pending_puback.erase(message); + } + +} + +void BrokerSession::handle_pubrec(const PubrecPacket &packet) { + + qos2_pending_pubrec.erase( + std::remove_if(qos2_pending_pubrec.begin(), qos2_pending_pubrec.end(), + [&packet](const PublishPacket &p) { return p.packet_id == packet.packet_id; }), + qos2_pending_pubrec.end() + ); + + auto pubcomp_packet = find_if(qos2_pending_pubcomp.begin(), qos2_pending_pubcomp.end(), + [&packet](uint16_t packet_id) { return packet_id == packet.packet_id; }); + + if (pubcomp_packet == qos2_pending_pubcomp.end()) { + qos2_pending_pubcomp.push_back(packet.packet_id); + } + +} + +void BrokerSession::handle_pubrel(const PubrelPacket &packet) { + + qos2_pending_pubrel.erase( + std::remove_if(qos2_pending_pubrel.begin(), qos2_pending_pubrel.end(), + [&packet](uint16_t packet_id) { return packet_id == packet.packet_id; }), + qos2_pending_pubrel.end() + ); + + PubcompPacket pubcomp; + pubcomp.packet_id = packet.packet_id; + packet_manager->send_packet(pubcomp); +} + +void BrokerSession::handle_pubcomp(const PubcompPacket &packet) { + + qos2_pending_pubcomp.erase( + std::remove_if(qos2_pending_pubcomp.begin(), qos2_pending_pubcomp.end(), + [&packet](uint16_t packet_id) { return packet_id == packet.packet_id; }), + qos2_pending_pubcomp.end() + ); + +} + +void BrokerSession::handle_subscribe(const SubscribePacket &packet) { + + SubackPacket suback; + + suback.packet_id = packet.packet_id; + + for (auto subscription : packet.subscriptions) { + + auto previous_subscription = find_if(subscriptions.begin(), subscriptions.end(), + [&subscription](const Subscription &s) { + return topic_match(s.topic_filter, subscription.topic_filter); + }); + if (previous_subscription != subscriptions.end()) { + subscriptions.erase(previous_subscription); + } + + subscriptions.push_back(subscription); + + SubackPacket::ReturnCode return_code = SubackPacket::ReturnCode::Failure; + switch (subscription.qos) { + case QoSType::QoS0: + return_code = SubackPacket::ReturnCode::SuccessQoS0; + break; + case QoSType::QoS1: + return_code = SubackPacket::ReturnCode::SuccessQoS1; + break; + case QoSType::QoS2: + return_code = SubackPacket::ReturnCode::SuccessQoS2; + break; + } + suback.return_codes.push_back(return_code); + } + + packet_manager->send_packet(suback); + +} + +void BrokerSession::handle_unsubscribe(const UnsubscribePacket &packet) { + + UnsubackPacket unsuback; + + unsuback.packet_id = packet.packet_id; + + packet_manager->send_packet(unsuback); + +} + +void BrokerSession::handle_disconnect(const DisconnectPacket &packet) { + if (clean_session) { + session_manager.erase_session(this); + } +} diff --git a/custom_modules/mqtt_server/mqtt_broker/src/broker_session.h b/custom_modules/mqtt_server/mqtt_broker/src/broker_session.h new file mode 100644 index 0000000..5ff79dc --- /dev/null +++ b/custom_modules/mqtt_server/mqtt_broker/src/broker_session.h @@ -0,0 +1,266 @@ +/** + * @file broker_session.h + * + * This class builds on the BaseSession class adding members and methods needed for a MQTT session in the broker. + * + * Drived class for MQTT broker sessions. In addition to maintaining session state throughout the lifetime of the + * connection, the MQTT specification requires that the broker persist session state after a client closes the + * connection. When a subsequent connection is made with the same client id, the persisted session should be resumed. + * Any QoS 1 or QoS 2 messages delivered to client subscribed topics should be forwarded over the new connection. + */ + +#pragma once + +#include "base_session.h" +#include "packet_manager.h" +#include "packet.h" + +#include + +#include +#include + +class SessionManager; + +class Message; + +class Subscription; + +/** + * Broker session class + * + * In addition to maintaining session attributes and handling control packets, this subclass adds facilities for + * persisting and resuming session state according to the MQTT 3.1.1 standard. + */ +class BrokerSession : public BaseSession { + +public: + + /** + * Constructor + * + * In addition to the bufferen event pointer required by the BaseSession constructor, this constructor accepts + * a reference to a SessionManager class. + * + * @param bev Pointer to a bufferevent internal control structure. + * @param session_manager Reference to the SessionManager. + */ + BrokerSession(struct bufferevent *bev, SessionManager &session_manager) : BaseSession(bev), + session_manager(session_manager) { + } + + /** + * Handle authentication and authorization of a connecting client. + * + * This method is called from the connect packet handler. It currently always returns true. + * + * @return Authorization granted. + */ + bool authorize_connection(const ConnectPacket &); + + /** + * Resume a persisted session. + * + * The MQTT 3.1.1 standard requires that the session state be restored for clients connecting with the same client + * id. This method is used to perform that action once a persisted session is recognized. This method accepts + * a reference to the BrokerSession to be restored and PacketManager instance to be installed in the restored + * session. Once installed a the PacketManager will send a Connack packet to the connecting client with the + * Session Present flag set. + * + * @param session Reference to the session to be resumed. + * @param packet_manager PacketManager to be installed in the resumed session. + */ + void resume_session(std::unique_ptr &session, + std::unique_ptr packet_manager); + + /** + * List of topics subscribed to by this client. + */ + std::vector subscriptions; + + /** + * Forward a publshed message to the connected client. + * + * This method is called by the SessionManager when forwarding messages to subscribed clients. It will behave + * according to the QoS in the PublishPacket. QoS 0 packets will be forwarded and forgotten. In the case of QoS 1 + * or 2 messages, these will be retained until they are acknowledged according to the publish control packet + * protocol flow described in the MQTT 3.1.1 standard. + * + * @param packet Reference to the PublishPacket to forward. + */ + void forward_packet(const PublishPacket &packet); + + /** + * Send messages from the pending queues. + * + * Iterate through the pending message queues and if non-empty send a single pending message. This method should + * be called periodically. Currently it is called each time a packet is received from a client. + */ + void send_pending_message(void); + + /** + * PacketManager callback. + * + * This method will delegate to the BaseSession method and then invoke send_pending_message. Ownership of the + * Packet is transfered back to the BaseSession instance. + * + * @param packet Reference counted pointer to a packet. + */ + void packet_received(std::unique_ptr packet) override; + + /** + * PacketManager callback. + * + * This method will delegate to the BaseSession method then potentially remove this session from the SessionManager + * based on the clean_session flag. + * + * @param event The type of event detected. + */ + void packet_manager_event(PacketManager::EventType event) override; + + /** + * Handle a received ConnectPacket. + * + * This method will examine the ConnectPacket and restore or set up a new session. The authorize_connection method + * will be called to authenticate and authorize this connection. The client id will be used to lookup any + * previous session and if found the PacketManager instance installed in this session will be moved to the + * persisted session and this session instance will be destroyed. + * + * In case a persisted session is not found. A Connack packet will be sent with the session_present flag set to + * false. + * + * @param connect_packet A reference to the packet. + */ + void handle_connect(const ConnectPacket & connect_packet) override; + + /** + * Handle a received PublishPacket. + * + * Delegate forwarding of this message to the SessionManager. Additional actions will be performed based on the + * QoS value in the PublishPacket. For QoS 1 a Puback packet will be sent. For QoS 2, queue and expected Pubrel + * packet id which will initiate the sending of a Pubrec packet at the next run of the pending packets queue. + * + * @param publish_packet A reference to the packet. + */ + void handle_publish(const PublishPacket & publish_packet) override; + + /** + * Handle a received PubackPacket. + * + * This packet is expected in response to a Publish control packet with QoS 1. Publish packets with QoS 1 will be + * resent periodically until a Puback is received. QoS 1 Publish packets have an 'at least once' delivery + * guarantee. This handler will remove the Publish packet from the queue of pending messages ending the QoS 1 + * protocol flow. + * + * @param puback_packet A reference to the packet. + */ + void handle_puback(const PubackPacket & puback_packet) override; + + /** + * Handle a received PubrecPacket + * + * This packet is received in response to a Publish control packet with QoS 2. Publish packets with QoS 2 will be + * resent periodically until a PubRec is received. QoS 2 Publish packets have an 'exactly once' delivery + * guarantee. This handler will remove the Publish packet from the queue of pending messages and will continue + * the QoS 2 protocol flow by adding the packet id to the pending Pubcomp queue. This will enable the send of a + * Pubrel control packet at the next pending packet queue run. + * + * @param pubrec_packet A reference to the packet. + */ + void handle_pubrec(const PubrecPacket & pubrec_packet) override; + + /** + * Handle a received PubrelPacket. + * + * This packet is received in response to a Pubrec packet in the QoS 2 protocol flow. This handler will remove + * the Pubrec packet id from the queue of pending Pubrel packets and send a Pubcomp packet ending the QoS 2 + * protocol flow. + * + * @param pubrel_packet A reference to the packet. + */ + void handle_pubrel(const PubrelPacket & pubrel_packet) override; + + /** + * Handle a received PubcompPacket. + * + * This packet is received in response to a Pubrel control packet in the QoS 2 protocol flow. This handler will + * remove the packet id from the pending Pubcomp queue. No further processing is done. + * + * @param pubcomp_packet A reference to the packet. + */ + void handle_pubcomp(const PubcompPacket & pubcomp_packet) override; + + /** + * Handle a received SubscribePacket. + * + * Add the contained topic names to the list of subscriptions maintained in this session. Any previous matching + * subscribed topic will be replaced by the new one overriding the subscribed QoS. Send a Suback packet in + * response. + * + * @param subscribe_packet A reference to the packet. + */ + void handle_subscribe(const SubscribePacket & subscribe_packet) override; + + /** + * Handle a received UnsubscribePacket. + * + * Remove the topic name from the list of subscribed topics. + * + * //TODO This function is currently incomplete. + * + * @param unsubscribe_packet A reference to the packet. + */ + void handle_unsubscribe(const UnsubscribePacket & unsubscribe_packet) override; + + /** + * Handle a DisconnectPacket. + * + * Remove this session from the pool of persistent sessions and destroy it, depending on the clean_session + * attribute of this session. + * + * @param disconnect_packet A reference to the packet. + */ + void handle_disconnect(const DisconnectPacket & disconnect_packet) override; + + /** + * List of QoS 1 messages waiting for Puback. + * + * These messages have been forwarded to subscribed clients. This list will be persisted between connections as + * part of the BrokerSession state. + */ + std::vector qos1_pending_puback; + + /** + * List of QoS 2 messages waiting for Pubrec. + * + * These messages have been forwarded to subscribed clients. This list will be persisted between connections as + * part of the BrokerSession state. + */ + std::vector qos2_pending_pubrec; + + /** + * List of QoS 2 messages waiting for Pubrel. + * + * These messages have been received in Publish control packets from a client connected to this session and + * potentially forwarded on by the SessionManager to other subscribed clients. Packet ids are added to this + * list when a Pubrec control packet has been sent. The list will be persisted between connections as part of the + * BrokerSession state. + */ + std::vector qos2_pending_pubrel; + + /** + * List of QoS 2 messages waiting for Pubcomp. + * + * These messages have been forwarded to subscribed clients. Packet ids are added to this list when a Pubrel + * control packet has been received. This list will be persisted between connections as part of the BrokerSession + * state. + */ + std::vector qos2_pending_pubcomp; + + /** + * A reference to the SessionManager instance. + */ + SessionManager &session_manager; + +}; + diff --git a/custom_modules/mqtt_server/mqtt_broker/src/client_id.cc b/custom_modules/mqtt_server/mqtt_broker/src/client_id.cc new file mode 100644 index 0000000..215f833 --- /dev/null +++ b/custom_modules/mqtt_server/mqtt_broker/src/client_id.cc @@ -0,0 +1,26 @@ +/** + * @file client_id.cc + */ + +#include "client_id.h" + +#include +#include +#include + +static const std::string characters = "abcdefghijklmnopqrstuvwxyz0123456789"; +static std::once_flag init_rnd; + +std::string generate_client_id(size_t len) { + + std::call_once(init_rnd, [](){ std::srand(std::time(nullptr)); }); + + std::string random_string; + + for (size_t i=0; i +#include + +/** + * Generate a unique client id. + * + * The client id is a random character sequence drawn from the character set [a-z0-9]. + * + * @param len Length of the sequence to generate + * @return Random character sequence + */ +std::string generate_client_id(size_t len=32); \ No newline at end of file diff --git a/custom_modules/mqtt_server/mqtt_broker/src/client_pub.cc b/custom_modules/mqtt_server/mqtt_broker/src/client_pub.cc new file mode 100644 index 0000000..bfb141e --- /dev/null +++ b/custom_modules/mqtt_server/mqtt_broker/src/client_pub.cc @@ -0,0 +1,362 @@ +/** + * @file client_pub.cc + * + * MQTT Publisher (client) + * + * Connect to a listening broker and publish a message on a topic. Topic and message are passed as command line + * arguments to this program. + * See [the MQTT specification](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html) + */ + +#include "packet.h" +#include "base_session.h" + +#include +#include +#include +#include +#include + +#include + +#include +#include +#include + +/** + * Display usage message + * + * Displays help on command line arguments. + */ +static void usage(void); + +/** + * Parse command line. + * + * Recognized command line arguments are parsed and added to the options instance. This options instance will be + * passed to the session instance. + * + * @param argc Command line argument count. + * @param argv Command line argument values. + */ +static void parse_arguments(int argc, char *argv[]); + +/** + * On broker connection callback. + * + * @param bev Pointer to bufferevent internal control structure. + * @param event The bufferevent event code, on success this will be EV_EVENT_CONNECTED, it can also be one of + * EV_EVENT_EOF, EV_EVENT_ERROR, or EV_EVENT_TIMEOUT. + * @param arg Pointer to user data passed in when callback is set. Here this should be a pointer to the event_base + * object. + */ +static void connect_event_cb(struct bufferevent *bev, short event, void *arg); + +/** + * Socket close handler. + * + * When the session is closing this function is set to be called when the write buffer is empty. When called it will + * close the connection and exit the event loop. + * + * @param bev Pointer to the bufferevent internal control structure. + * @param arg Pointer to user data passed in when callback is set. Here this should be a pointer to the event_base + * object. + */ +static void close_cb(struct bufferevent *bev, void *arg); + +/** + * Options settable through command line arguments. + */ +static struct options_t { + + /** Broker host to connect to. DNS name is allowed. */ + std::string broker_host = "localhost"; + + /** Broker port. */ + uint16_t broker_port = 1883; + + /** Client id. If empty no client id will be sent. The broker will generate a client id automatically. */ + std::string client_id; + + /** Topic to publish to. */ + std::string topic; + + /** Message text to publish. */ + std::vector message; + + /** Quality of service specifier for message. */ + QoSType qos = QoSType::QoS0; + + /** + * Drop this session on exit if true. If false this session will persist in the broker after disconnection + * and will be continued at the next connection with the same client id. This feature isn't useful when a client + * id is not provided. + */ + bool clean_session = false; + +} options; + +/** + * Session class specialized for this publishing client. + * + * Override base class control packet handlers used in message publishing. Any other control packets received will be + * handled by default methods, in most cases that will result in a thrown exception. + */ +class ClientSession : public BaseSession { + +public: + + /** + * Constructor + * + * @param bev Pointer to the buffer event structure for the broker connection. + * @param options Options structure. + */ + ClientSession(bufferevent *bev, const options_t &options) : BaseSession(bev), options(options) {} + + /** Reference to the options structure with members updated from command line arguments. */ + const options_t &options; + + /** + * Store the packet id of the publish packet we send for comparisison with any puback or pubcomp control packets + * received from the broker. + */ + uint16_t published_packet_id = 0; + + /** + * Handle a connack control packet from the broker. + * + * This packet will be sent in response to the connect packet that this client will send. Check the return code + * and disconnect on any error. Otherwise construct a publish packet from the options and send this to the broker. + * + * @param connack_packet The connack control packet received. + */ + void handle_connack(const ConnackPacket &connack_packet) override { + + if (connack_packet.return_code != ConnackPacket::ReturnCode::Accepted) { + std::cerr << "connection not accepted by broker\n"; + disconnect(); + } + + PublishPacket publish_packet; + publish_packet.qos(options.qos); + publish_packet.topic_name = options.topic; + publish_packet.packet_id = packet_manager->next_packet_id(); + published_packet_id = publish_packet.packet_id; + publish_packet.message_data = std::vector(options.message.begin(), options.message.end()); + packet_manager->send_packet(publish_packet); + + if (options.qos == QoSType::QoS0) { + disconnect(); + } + } + + /** + * Handle a puback control packet from the broker. + * + * This packet will be sent in response to a publish message with QoS 1. Compare the packet id with the packet + * id sent in a previous publish message. Notify in case of a mismatch. + * + * @param puback_packet The received puback control packet. + */ + void handle_puback(const PubackPacket &puback_packet) override { + + if (puback_packet.packet_id != published_packet_id) { + std::cout << "puback packet id mismatch: sent " << published_packet_id << " received " + << puback_packet.packet_id << "\n"; + } + disconnect(); + } + + /** + * Handle a pubrec control packet from the broker. + * + * This packet will be sent in repsponse to a publish message with QoS 2. Compare the packet id with the packet + * id sent in a previous publish message. Notify in case of mismatch. Send a pubrel packet containing the received + * packet id in response. Wait for pubcomp control packet response. + * + * @param pubrec_packet The received pubrec control packet. + */ + void handle_pubrec(const PubrecPacket &pubrec_packet) override { + + if (pubrec_packet.packet_id != published_packet_id) { + std::cout << "pubrec packet id mismatch: sent " << published_packet_id << " received " + << pubrec_packet.packet_id << "\n"; + } + + PubrelPacket pubrel_packet; + pubrel_packet.packet_id = pubrec_packet.packet_id; + packet_manager->send_packet(pubrel_packet); + + } + + /** + * Handle a pubcomp control packet from the broker. + * + * This packet will be sent in response to a pubrel packet confirmation of a QoS 2 publish message. Compare the + * packet id sent in a previous publish message. Notify in case of mismatch. On receipt of this message the QoS 2 + * confirmation exchange is complete. If the packet id matches the the packet id of the original message + * disconnect from the broker, otherwise stay connected waiting for the pubcomp with the required packet id. + * + * @param puback_packet the received pubcomp packet + */ + void handle_pubcomp(const PubcompPacket &pubcomp_packet) override { + + if (pubcomp_packet.packet_id != published_packet_id) { + std::cout << "pubcomp packet id mismatch: sent " << published_packet_id << " received " + << pubcomp_packet.packet_id << "\n"; + + } else { + disconnect(); + } + } + + /** + * Disconnect from the broker. + * + * Send a disconnect control packet. Set up libevent to wait for the disconnect packet to be written then close + * the network connection. + */ + void disconnect() { + DisconnectPacket disconnect_packet; + packet_manager->send_packet(disconnect_packet); + bufferevent_enable(packet_manager->bev, EV_WRITE); + bufferevent_setcb(packet_manager->bev, packet_manager->bev->readcb, close_cb, NULL, + packet_manager->bev->ev_base); + } +}; + +/** + * The session instance. + * + * MQTT requires that both the client and server maintain a session state. + */ +static std::unique_ptr session; + +int main(int argc, char *argv[]) { + + struct event_base *evloop; + struct evdns_base *dns_base; + struct bufferevent *bev; + + parse_arguments(argc, argv); + + evloop = event_base_new(); + if (!evloop) { + std::cerr << "Could not initialize libevent\n"; + std::exit(1); + } + + dns_base = evdns_base_new(evloop, 1); + + bev = bufferevent_socket_new(evloop, -1, BEV_OPT_CLOSE_ON_FREE); + + bufferevent_setcb(bev, NULL, NULL, connect_event_cb, evloop); + + bufferevent_socket_connect_hostname(bev, dns_base, AF_UNSPEC, options.broker_host.c_str(), options.broker_port); + + evdns_base_free(dns_base, 0); + + event_base_dispatch(evloop); + event_base_free(evloop); + +} + +static void connect_event_cb(struct bufferevent *bev, short events, void *arg) { + + if (events & BEV_EVENT_CONNECTED) { + + session = std::unique_ptr(new ClientSession(bev, options)); + + ConnectPacket connect_packet; + connect_packet.client_id = options.client_id; + session->packet_manager->send_packet(connect_packet); + + } else if (events & (BEV_EVENT_ERROR | BEV_EVENT_EOF)) { + std::cerr << "error connecting to broker\n"; + struct event_base *base = static_cast(arg); + if (events & BEV_EVENT_ERROR) { + int err = bufferevent_socket_get_dns_error(bev); + if (err) + std::cerr << "DNS error: " << evutil_gai_strerror(err) << "\n"; + } + + bufferevent_free(bev); + event_base_loopexit(base, NULL); + } + +} + +static void usage() { + std::cout << +R"END(usage: mqtt_client_pub [OPTIONS] + +Connect to an MQTT broker and publish a single message to a single topic. + +OPTIONS + +--broker-host | -b Broker host name or ip address, default localhost +--broker-port | -p Broker port, default 1883 +--client-id | -i Client id, default none +--topic | -t Topic string, default none +--message | -m Message data, default none +--qos | -q QoS (Quality of Service), should be 0, 1, or 2, default 0 +--clean-session | -c Disable session persistence, default false +--help | -h Display this message and exit +)END"; + +} + +static void parse_arguments(int argc, char *argv[]) { + static struct option longopts[] = { + {"broker-host", required_argument, NULL, 'b'}, + {"broker-port", required_argument, NULL, 'p'}, + {"client-id", required_argument, NULL, 'i'}, + {"topic", required_argument, NULL, 't'}, + {"message", required_argument, NULL, 'm'}, + {"qos", required_argument, NULL, 'q'}, + {"clean-session", no_argument, NULL, 'c'}, + {"help", no_argument, NULL, 'h'} + }; + + + int ch; + while ((ch = getopt_long(argc, argv, "b:p:i:t:m:q:ch", longopts, NULL)) != -1) { + switch (ch) { + case 'b': + options.broker_host = optarg; + break; + case 'p': + options.broker_port = static_cast(atoi(optarg)); + break; + case 'i': + options.client_id = optarg; + break; + case 't': + options.topic = optarg; + break; + case 'm': + options.message = std::vector(optarg, optarg + strlen(optarg)); + case 'q': + options.qos = static_cast(atoi(optarg)); + break; + case 'c': + options.clean_session = true; + break; + case 'h': + usage(); + std::exit(0); + + default: + usage(); + std::exit(1); + } + } + +} + +static void close_cb(struct bufferevent *bev, void *arg) { + session->packet_manager->close_connection(); + event_base *base = static_cast(arg); + event_base_loopexit(base, NULL);; +} diff --git a/custom_modules/mqtt_server/mqtt_broker/src/client_sub.cc b/custom_modules/mqtt_server/mqtt_broker/src/client_sub.cc new file mode 100644 index 0000000..1de4241 --- /dev/null +++ b/custom_modules/mqtt_server/mqtt_broker/src/client_sub.cc @@ -0,0 +1,369 @@ +/** + * @file client_sub.cc + * + * MQTT Subscriber (client) + * + * Connect to a listening broker and add a subscription a topic then listen for pubished messages from the broker. + * Topic strings can follow the MQTT 3.1.1 specification including wildcards. + * See [the MQTT specification](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html) + */ + +#include "packet.h" +#include "packet_manager.h" +#include "base_session.h" + +#include +#include +#include +#include +#include + +#include + +#include +#include +#include + +/** + * Display usage message + * + * Displays help on command line arguments. + */ +static void usage(void); + +/** + * Parse command line. + * + * Recognized command line arguments are parsed and added to the options instance. This options instance will be + * passed to the session instance. + * + * @param argc Command line argument count. + * @param argv Command line argument values. + */ +static void parse_arguments(int argc, char *argv[]); + +/** + * On broker connection callback. + * + * @param bev Pointer to bufferevent internal control structure. + * @param event The bufferevent event code, on success this will be EV_EVENT_CONNECTED, it can also be one of + * EV_EVENT_EOF, EV_EVENT_ERROR, or EV_EVENT_TIMEOUT. + * @param arg Pointer to user data passed in when callback is set. Here this should be a pointer to the event_base + * object. + */ +static void connect_event_cb(struct bufferevent *bev, short event, void *arg); + +/** + * Socket close handler. + * + * When the session is closing this function is set to be called when the write buffer is empty. When called it will + * close the connection and exit the event loop. + * + * @param bev Pointer to the bufferevent internal control structure. + * @param arg Pointer to user data passed in when callback is set. Here this should be a pointer to the event_base + * object. + */ +static void close_cb(struct bufferevent *bev, void *arg); + +/** + * Callback run when SIGINT or SIGTERM is attached, will cleanly exit. + * + * @param signal Integer value of signal. + * @param event Should be EV_SIGNAL. + * @param arg Pointer originally passed to evsignal_new. + */ +static void signal_cb(evutil_socket_t, short event, void *); + +/** + * Options settable through command line arguments. + */ +struct options_t { + + /** Broker host to connect to. DNS name is allowed. */ + std::string broker_host = "localhost"; + + /** Broker port. */ + uint16_t broker_port = 1883; + + /** Client id. If empty no client id will be sent. The broker will generate a client id automatically. */ + std::string client_id; + + /** Topics to subscribe to. This option can be specified more than once to subscribe to multiple topics. */ + std::vector topics; + + /** Quality of service specifier for message. */ + QoSType qos = QoSType::QoS0; + + /** + * Drop this session on exit if true. If false this session will persist in the broker after disconnection + * and will be continued at the next connection with the same client id. This feature isn't useful when a client + * id is not provided. + */ + bool clean_session = false; + +} options; + +/** + * Session class specialized for this publishing client. + * + * Override base class control packet handlers used to subscribe to topics and to receive messages forwarded from the + * broker. Any other control packets received will be handled by default methods, in most cases that will result in a + * thrown exception. + */ +class ClientSession : public BaseSession { +public: + + /** + * Constructor + * + * @param bev Pointer to the buffer event structure for the broker connection. + * @param options Options structure. + */ + ClientSession(bufferevent *bev, const options_t &options) : BaseSession(bev), options(options) {} + + /** Reference to the options structure with members updated from command line arguments. */ + const options_t &options; + + /** + * Handle a connack control packet from the broker. + * + * This packet will be sent in response to the connect packet that this client will send. Check the return code + * and disconnect on any error. Otherwise construct a subscribe packet from the options and send it to the broker. + * + * @param connack_packet The connack control packet received. + */ + void handle_connack(const ConnackPacket &connack_packet) override { + + SubscribePacket subscribe_packet; + subscribe_packet.packet_id = packet_manager->next_packet_id(); + + for (auto topic : options.topics) { + subscribe_packet.subscriptions.push_back(Subscription{topic, options.qos}); + } + packet_manager->send_packet(subscribe_packet); + } + + /** + * Handle a suback control packet from the broker. + * + * This packet will be sent in response to a subscribe message. Check the error code in the packet and if there is + * an error output an error message but continue to listen for messages. Also check the qos field in the suback + * packet and compare with the requested qos in the original subscribe. If there is a mismatch output a message. + * + * @param suback_packet The received suback control packet. + */ + void handle_suback(const SubackPacket &suback_packet) override { + + for (size_t i = 0; i < suback_packet.return_codes.size(); i++) { + SubackPacket::ReturnCode code = suback_packet.return_codes[i]; + if (code == SubackPacket::ReturnCode::Failure) { + std::cout << "Subscription to topic " << options.topics[i] << "failed\n"; + } else if (static_cast(code) != options.qos) { + std::cout << "Topic " << options.topics[i] << " requested qos " << static_cast(options.qos) + << " subscribed " + << static_cast(code) << "\n"; + } + } + } + + /** + * Handle a publish control packet from the broker. + * + * This packet will contain forwarded messages from other clients published to the topic. Output the message and + * acknowledge based on the QoS in the packet. In the case of a QoS 0 message there is no acknowledegment. + * A Qos 1 message will require a Puback packet in response. A QoS 2 message requires a Pubrec packet from the + * subscriber followed by a Pubrel packet from the broker and finally a Pubcomp packet is sent from the subscriber. + * + * @param publish_packet the received publish packet + */ + void handle_publish(const PublishPacket &publish_packet) override { + + std::cout << std::string(publish_packet.message_data.begin(), publish_packet.message_data.end()) << "\n"; + + if (publish_packet.qos() == QoSType::QoS1) { + PubackPacket puback_packet; + puback_packet.packet_id = publish_packet.packet_id; + packet_manager->send_packet(puback_packet); + } else if (publish_packet.qos() == QoSType::QoS2) { + PubrecPacket pubrec_packet; + pubrec_packet.packet_id = publish_packet.packet_id; + packet_manager->send_packet(pubrec_packet); + } + } + + + /** + * Handle a pubrel control packet from the broker. + * + * This packet will be sent in repsponse to a Pubrel packet in the QoS 2 protocol flow. Send a Pubcomp packet + * in response. This completes the QoS 2 flow. + * + * @param pubrel_packet The received pubrel control packet. + */ + void handle_pubrel(const PubrelPacket &pubrel_packet) override { + PubcompPacket pubcomp_packet; + pubcomp_packet.packet_id = pubrel_packet.packet_id; + packet_manager->send_packet(pubcomp_packet); + } + + /** + * Handle protocol events in the packet manager. + * + * This callback will be called by the packet manager in case of protocol or network error. In most cases the + * response is to close the connection to the broker. + * + * @param event The specific type of the event reported by the packet manager. + */ + void packet_manager_event(PacketManager::EventType event) override { + event_base_loopexit(packet_manager->bev->ev_base, NULL); + BaseSession::packet_manager_event(event); + } +}; + +/** + * The session instance. + * + * MQTT requires that both the client and server maintain a session state. + */ +std::unique_ptr session; + +int main(int argc, char *argv[]) { + + struct event_base *evloop; + struct event *signal_event; + struct evdns_base *dns_base; + struct bufferevent *bev; + + parse_arguments(argc, argv); + + evloop = event_base_new(); + if (!evloop) { + std::cerr << "Could not initialize libevent\n"; + std::exit(1); + } + + signal_event = evsignal_new(evloop, SIGINT, signal_cb, evloop); + evsignal_add(signal_event, NULL); + signal_event = evsignal_new(evloop, SIGTERM, signal_cb, evloop); + evsignal_add(signal_event, NULL); + + dns_base = evdns_base_new(evloop, 1); + + bev = bufferevent_socket_new(evloop, -1, BEV_OPT_CLOSE_ON_FREE); + + bufferevent_setcb(bev, NULL, NULL, connect_event_cb, evloop); + + bufferevent_socket_connect_hostname(bev, dns_base, AF_UNSPEC, options.broker_host.c_str(), options.broker_port); + evdns_base_free(dns_base, 0); + + event_base_dispatch(evloop); + + event_free(signal_event); + event_base_free(evloop); + +} + +static void connect_event_cb(struct bufferevent *bev, short events, void *arg) { + + if (events & BEV_EVENT_CONNECTED) { + + session = std::unique_ptr(new ClientSession(bev, options)); + + ConnectPacket connect_packet; + connect_packet.client_id = options.client_id; + connect_packet.clean_session(options.clean_session); + session->packet_manager->send_packet(connect_packet); + + } else if (events & (BEV_EVENT_ERROR | BEV_EVENT_EOF)) { + struct event_base *base = static_cast(arg); + if (events & BEV_EVENT_ERROR) { + int err = bufferevent_socket_get_dns_error(bev); + if (err) + std::cerr << "DNS error: " << evutil_gai_strerror(err) << "\n"; + } + + bufferevent_free(bev); + event_base_loopexit(base, NULL); + } + +} + +void usage() { + std::cout << +R"END(usage: mqtt_client_sub [OPTIONS] + +Connect to an MQTT broker and publish a single message to a single topic. + +OPTIONS + +--broker-host | -b Broker host name or ip address, default localhost +--broker-port | -p Broker port, default 1883 +--client-id | -i Client id, default none +--topic | -t Topic string to subscribe to, this option can be provided more that once to subscribe + to multiple topics, default none +--qos | -q QoS (Quality of Service), should be 0, 1, or 2, default 0 +--clean-session | -c Disable session persistence, default false +--help | -h Display this message and exit +)END"; + +} + +void parse_arguments(int argc, char *argv[]) { + static struct option longopts[] = { + {"broker-host", required_argument, NULL, 'b'}, + {"broker-port", required_argument, NULL, 'p'}, + {"client-id", required_argument, NULL, 'i'}, + {"topic", required_argument, NULL, 't'}, + {"qos", required_argument, NULL, 'q'}, + {"clean-session", no_argument, NULL, 'c'}, + {"help", no_argument, NULL, 'h'} + }; + + + int ch; + while ((ch = getopt_long(argc, argv, "b:p:i:t:q:ch", longopts, NULL)) != -1) { + switch (ch) { + case 'b': + options.broker_host = optarg; + break; + case 'p': + options.broker_port = static_cast(atoi(optarg)); + break; + case 'i': + options.client_id = optarg; + break; + case 't': + options.topics.push_back(optarg); + break; + case 'q': + options.qos = static_cast(atoi(optarg)); + break; + case 'c': + options.clean_session = true; + break; + case 'h': + usage(); + std::exit(0); + + default: + usage(); + std::exit(1); + } + } + +} + +static void close_cb(struct bufferevent *bev, void *arg) { + + event_base *base = static_cast(arg); + event_base_loopexit(base, NULL); +} + +static void signal_cb(evutil_socket_t fd, short event, void *arg) { + + DisconnectPacket disconnect_packet; + session->packet_manager->send_packet(disconnect_packet); + + bufferevent_disable(session->packet_manager->bev, EV_READ); + bufferevent_setcb(session->packet_manager->bev, NULL, close_cb, NULL, arg); +} diff --git a/custom_modules/mqtt_server/mqtt_broker/src/packet.cc b/custom_modules/mqtt_server/mqtt_broker/src/packet.cc new file mode 100644 index 0000000..01deed8 --- /dev/null +++ b/custom_modules/mqtt_server/mqtt_broker/src/packet.cc @@ -0,0 +1,509 @@ +/** + * @file packet.cc + */ + +#include "packet.h" +#include "client_id.h" + +#include + +void Packet::read_fixed_header(PacketDataReader &reader) { + + uint8_t command_header = reader.read_byte(); + type = static_cast(command_header >> 4); + header_flags = command_header & 0x0F; + + size_t remaining_length = reader.read_remaining_length(); + if (remaining_length != reader.get_packet_data().size() - reader.get_offset()) { + throw std::exception(); + } +} + +ConnectPacket::ConnectPacket(const packet_data_t &packet_data) { + PacketDataReader reader(packet_data); + + read_fixed_header(reader); + + if (type != PacketType::Connect) { + throw std::exception(); + } + + if (header_flags != 0) { + throw std::exception(); + } + + protocol_name = reader.read_string(); + protocol_level = reader.read_byte(); + connect_flags = reader.read_byte(); + keep_alive = reader.read_uint16(); + client_id = reader.read_string(); + + if (client_id.empty()) { + client_id = generate_client_id(); + } + + if (will_flag()) { + will_topic = reader.read_string(); + will_message = reader.read_bytes(); + } + + if (username_flag()) { + username = reader.read_string(); + } + + if (password_flag()) { + password = reader.read_bytes(); + } + +} + +packet_data_t ConnectPacket::serialize() const { + packet_data_t packet_data; + PacketDataWriter writer(packet_data); + writer.write_byte((static_cast(type) << 4) | (header_flags & 0x0F)); + + size_t remaining_length = 2 + protocol_name.size(); + remaining_length += 1; // protocol_level + remaining_length += 1; // connect_flags + remaining_length += 2; // keep_alive + remaining_length += 2 + client_id.size(); + + if (will_flag()) { + remaining_length += 2 + will_topic.size(); + remaining_length += 2 + will_message.size(); + } + + if (username_flag()) { + remaining_length += 2 + username.size(); + } + + if (password_flag()) { + remaining_length += 2 + password.size(); + } + + writer.write_remaining_length(remaining_length); + writer.write_string(protocol_name); + writer.write_byte(protocol_level); + writer.write_byte(connect_flags); + writer.write_uint16(keep_alive); + writer.write_string(client_id); + + if (will_flag()) { + writer.write_string(will_topic); + writer.write_bytes(will_message); + } + + if (username_flag()) { + writer.write_string(username); + } + + if (password_flag()) { + writer.write_bytes(password); + } + + return packet_data; +} + +ConnackPacket::ConnackPacket(const packet_data_t &packet_data) { + + PacketDataReader reader(packet_data); + + read_fixed_header(reader); + + if (type != PacketType::Connack) { + throw std::exception(); + } + + if (header_flags != 0) { + throw std::exception(); + } + + acknowledge_flags = reader.read_byte(); + return_code = static_cast(reader.read_byte()); +} + +packet_data_t ConnackPacket::serialize() const { + + packet_data_t packet_data; + PacketDataWriter writer(packet_data); + + writer.write_byte((static_cast(type) << 4) | (header_flags & 0x0F)); + writer.write_remaining_length(2); + writer.write_byte(acknowledge_flags); + writer.write_byte(static_cast(return_code)); + return packet_data; +} + +PublishPacket::PublishPacket(const packet_data_t &packet_data) { + + PacketDataReader reader(packet_data); + + read_fixed_header(reader); + + if (type != PacketType::Publish) { + throw std::exception(); + } + + topic_name = reader.read_string(); + + if (qos() != QoSType::QoS0) { + packet_id = reader.read_uint16(); + } + + size_t payload_len = packet_data.size() - reader.get_offset(); + + message_data = reader.read_bytes(payload_len); + +} + +packet_data_t PublishPacket::serialize() const { + packet_data_t packet_data; + PacketDataWriter writer(packet_data); + writer.write_byte((static_cast(type) << 4) | (header_flags & 0x0F)); + uint16_t remaining_length = 2 + topic_name.size() + message_data.size(); + if (qos() != QoSType::QoS0) { + remaining_length += 2; + } + writer.write_remaining_length(remaining_length); + writer.write_string(topic_name); + if (qos() != QoSType::QoS0) { + writer.write_uint16(packet_id); + } + for (size_t i = 0; i < message_data.size(); i++) { + writer.write_byte(message_data[i]); + } + + return packet_data; +} + +PubackPacket::PubackPacket(const packet_data_t &packet_data) { + + PacketDataReader reader(packet_data); + + read_fixed_header(reader); + + if (type != PacketType::Puback) { + throw std::exception(); + } + + if (header_flags != 0) { + throw std::exception(); + } + + packet_id = reader.read_uint16(); + +} + +packet_data_t PubackPacket::serialize() const { + + packet_data_t packet_data; + PacketDataWriter writer(packet_data); + writer.write_byte((static_cast(type) << 4) | (header_flags & 0x0F)); + writer.write_remaining_length(2); + writer.write_uint16(packet_id); + return packet_data; +} + +PubrecPacket::PubrecPacket(const packet_data_t &packet_data) { + + PacketDataReader reader(packet_data); + + read_fixed_header(reader); + + if (type != PacketType::Pubrec) { + throw std::exception(); + } + + if (header_flags != 0) { + throw std::exception(); + } + + packet_id = reader.read_uint16(); + +} + +packet_data_t PubrecPacket::serialize() const { + + packet_data_t packet_data; + PacketDataWriter writer(packet_data); + writer.write_byte((static_cast(type) << 4) | (header_flags & 0x0F)); + writer.write_remaining_length(2); + writer.write_uint16(packet_id); + return packet_data; +} + +PubrelPacket::PubrelPacket(const packet_data_t &packet_data) { + + PacketDataReader reader(packet_data); + + read_fixed_header(reader); + + if (type != PacketType::Pubrel) { + throw std::exception(); + } + + if (header_flags != 0x02) { + throw std::exception(); + } + + packet_id = reader.read_uint16(); + +} + +packet_data_t PubrelPacket::serialize() const { + + packet_data_t packet_data; + PacketDataWriter writer(packet_data); + writer.write_byte((static_cast(type) << 4) | (header_flags & 0x0F)); + writer.write_remaining_length(2); + writer.write_uint16(packet_id); + return packet_data; +} + +PubcompPacket::PubcompPacket(const packet_data_t &packet_data) { + + PacketDataReader reader(packet_data); + + read_fixed_header(reader); + + if (type != PacketType::Pubcomp) { + throw std::exception(); + } + + if (header_flags != 0) { + throw std::exception(); + } + + packet_id = reader.read_uint16(); + +} + +packet_data_t PubcompPacket::serialize() const { + + packet_data_t packet_data; + PacketDataWriter writer(packet_data); + writer.write_byte((static_cast(type) << 4) | (header_flags & 0x0F)); + writer.write_remaining_length(2); + writer.write_uint16(packet_id); + return packet_data; +} + +SubscribePacket::SubscribePacket(const packet_data_t &packet_data) { + + PacketDataReader reader(packet_data); + + read_fixed_header(reader); + + if (type != PacketType::Subscribe) { + throw std::exception(); + } + + if (header_flags != 0x02) { + throw std::exception(); + } + + packet_id = reader.read_uint16(); + + do { + std::string topic = reader.read_string(); + QoSType qos = static_cast(reader.read_byte()); + // TODO use emplace_back + subscriptions.push_back(Subscription{topic, qos}); + } while (!reader.empty()); +} + +packet_data_t SubscribePacket::serialize() const { + + packet_data_t packet_data; + PacketDataWriter writer(packet_data); + + writer.write_byte((static_cast(type) << 4) | (header_flags & 0x0F)); + + size_t remaining_length = 2; + for (auto subscription : subscriptions) { + remaining_length += 1 + 2 + std::string(subscription.topic_filter).size(); + } + + writer.write_remaining_length(remaining_length); + writer.write_uint16(packet_id); + + for (auto subscription : subscriptions) { + writer.write_string(std::string(subscription.topic_filter)); + writer.write_byte(static_cast(subscription.qos)); + } + + return packet_data; + +} + +SubackPacket::SubackPacket(const packet_data_t &packet_data) { + + PacketDataReader reader(packet_data); + + read_fixed_header(reader); + + if (type != PacketType::Suback) { + throw std::exception(); + } + + if (header_flags != 0) { + throw std::exception(); + } + + packet_id = reader.read_uint16(); + + do { + ReturnCode return_code = static_cast(reader.read_byte()); + return_codes.push_back(return_code); + } while (!reader.empty()); +} + +packet_data_t SubackPacket::serialize() const { + + packet_data_t packet_data; + PacketDataWriter writer(packet_data); + writer.write_byte((static_cast(type) << 4) | (header_flags & 0x0F)); + writer.write_remaining_length(2 + return_codes.size()); + writer.write_uint16(packet_id); + for (auto return_code : return_codes) { + writer.write_byte(static_cast(return_code)); + } + return packet_data; +} + +UnsubscribePacket::UnsubscribePacket(const packet_data_t &packet_data) { + + PacketDataReader reader(packet_data); + + read_fixed_header(reader); + + if (type != PacketType::Unsubscribe) { + throw std::exception(); + } + + if (header_flags != 0x02) { + throw std::exception(); + } + + packet_id = reader.read_uint16(); + + do { + topics.push_back(reader.read_string()); + } while (!reader.empty()); +} + +packet_data_t UnsubscribePacket::serialize() const { + + packet_data_t packet_data; + PacketDataWriter writer(packet_data); + writer.write_byte((static_cast(type) << 4) | (header_flags & 0x0F)); + size_t topics_size = 0; + for (auto topic : topics) { + // string data + 2 byte length + topics_size += topic.size() + 2; + } + writer.write_remaining_length(2 + topics_size); + writer.write_uint16(packet_id); + for (auto topic : topics) { + writer.write_string(topic); + } + return packet_data; +} + +UnsubackPacket::UnsubackPacket(const packet_data_t &packet_data) { + + PacketDataReader reader(packet_data); + + read_fixed_header(reader); + + if (type != PacketType::Unsuback) { + throw std::exception(); + } + + if (header_flags != 0) { + throw std::exception(); + } + + packet_id = reader.read_uint16(); +} + +packet_data_t UnsubackPacket::serialize() const { + + packet_data_t packet_data; + PacketDataWriter writer(packet_data); + writer.write_byte((static_cast(type) << 4) | (header_flags & 0x0F)); + writer.write_remaining_length(2); + writer.write_uint16(packet_id); + return packet_data; +} + +PingreqPacket::PingreqPacket(const packet_data_t &packet_data) { + + PacketDataReader reader(packet_data); + + read_fixed_header(reader); + + if (type != PacketType::Pingreq) { + throw std::exception(); + } + + if (header_flags != 0) { + throw std::exception(); + } +} + +packet_data_t PingreqPacket::serialize() const { + + packet_data_t packet_data; + PacketDataWriter writer(packet_data); + writer.write_byte((static_cast(type) << 4) | (header_flags & 0x0F)); + writer.write_remaining_length(0); + return packet_data; +} + +PingrespPacket::PingrespPacket(const packet_data_t &packet_data) { + + PacketDataReader reader(packet_data); + + read_fixed_header(reader); + + if (type != PacketType::Pingresp) { + throw std::exception(); + } + + if (header_flags != 0) { + throw std::exception(); + } +} + +packet_data_t PingrespPacket::serialize() const { + packet_data_t packet_data; + PacketDataWriter writer(packet_data); + writer.write_byte((static_cast(type) << 4) | (header_flags & 0x0F)); + writer.write_remaining_length(0); + return packet_data; +} + +DisconnectPacket::DisconnectPacket(const packet_data_t &packet_data) { + + PacketDataReader reader(packet_data); + + read_fixed_header(reader); + + if (type != PacketType::Disconnect) { + throw std::exception(); + } + + if (header_flags != 0) { + throw std::exception(); + } + +} + +packet_data_t DisconnectPacket::serialize() const { + packet_data_t packet_data; + PacketDataWriter writer(packet_data); + writer.write_byte((static_cast(type) << 4) | (header_flags & 0x0F)); + writer.write_remaining_length(0); + return packet_data; +} diff --git a/custom_modules/mqtt_server/mqtt_broker/src/packet.h b/custom_modules/mqtt_server/mqtt_broker/src/packet.h new file mode 100644 index 0000000..ab9335c --- /dev/null +++ b/custom_modules/mqtt_server/mqtt_broker/src/packet.h @@ -0,0 +1,503 @@ +/** + * @file packet.h + * + * Standard control packet classes. + * + * The MQTT 3.1.1 standard specifies the wire-level structure and operational behavior protocol control packets. This + * structure and some low level behavior is implemented here. + * + * Serialization of a control packet instance to wire format is accomplisted through instance serialization methods. + * + * Deserialization from the wire level is handled by a control packet constructor that accepts a octect sequence. + * + * Control packet instances also provide a default constructor that will create an instance using default values. + */ + +#pragma once + +#include "packet_data.h" +#include "topic.h" + +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +/** + * Enumeration constants for packet types. + * + * The interger values correspond to control packet type values as defined in the MQTT 3.1.1 standard. + */ +enum class PacketType { + Connect = 1, + Connack = 2, + Publish = 3, + Puback = 4, + Pubrec = 5, + Pubrel = 6, + Pubcomp = 7, + Subscribe = 8, + Suback = 9, + Unsubscribe = 10, + Unsuback = 11, + Pingreq = 12, + Pingresp = 13, + Disconnect = 14, +}; + +/** + * Enumeration constants for QoS values. + */ +enum class QoSType : uint8_t { + QoS0 = 0, + QoS1 = 1, + QoS2 = 2, +}; + +/** + * Subscription Class + * + * A subscription is composed of a TopicFilter and a QoS. Matching rules for topic filters differ from topic + * names. + */ +class Subscription { +public: + TopicFilter topic_filter; + QoSType qos; +}; + +/** + * Abstract base control packet class. + * + * Packet classes inherit this and extend as necessary. The serialize method implementation is required. + */ +class Packet { +public: + PacketType type; + uint8_t header_flags; + + virtual ~Packet() {} + + void read_fixed_header(PacketDataReader &); + + // static Packet unserialize(const packet_data_t &); + virtual packet_data_t serialize(void) const = 0; + +}; + +/** + * Connect control packet class + */ +class ConnectPacket : public Packet { +public: + + ConnectPacket() { + type = PacketType::Connect; + header_flags = 0; + protocol_name = "MQIsdp"; + protocol_level = 4; + } + + ConnectPacket(const packet_data_t &packet_data); + + packet_data_t serialize() const; + + std::string protocol_name; + uint8_t protocol_level; + uint8_t connect_flags; + uint16_t keep_alive; + std::string client_id; + std::string will_topic; + std::vector will_message; + std::string username; + std::vector password; + + bool clean_session() const { + return connect_flags & 0x02; + } + + void clean_session(bool set) { + if (set) { + connect_flags |= 0x02; + } else { + connect_flags &= ~0x02; + } + } + + bool will_flag() const { + return connect_flags & 0x04; + } + + void will_flag(bool set) { + if (set) { + connect_flags |= 0x04; + } else { + connect_flags &= ~0x04; + } + } + + QoSType qos() const { + return static_cast((connect_flags >> 3) & 0x03); + } + + void qos(QoSType qos) { + connect_flags |= (static_cast(qos) << 3); + } + + bool will_retain() const { + return connect_flags & 0x20; + } + + void will_retain(bool set) { + if (set) { + connect_flags |= 0x20; + } else { + connect_flags &= ~0x20; + } + } + + bool password_flag() const { + return connect_flags & 0x40; + } + + void password_flag(bool set) { + if (set) { + connect_flags |= 0x40; + } else { + connect_flags &= ~0x40; + } + } + + bool username_flag() const { + return connect_flags & 0x80; + } + + void username_flag(bool set) { + if (set) { + connect_flags |= 0x80; + } else { + connect_flags &= ~0x80; + } + } + +}; + +/** + * Connack control packet class. + */ +class ConnackPacket : public Packet { +public: + + ConnackPacket() { + type = PacketType::Connack; + header_flags = 0; + acknowledge_flags = 0; + } + + ConnackPacket(const packet_data_t &packet_data); + + enum class ReturnCode : uint8_t { + Accepted = 0x00, + UnacceptableProtocolVersion = 0x01, + IdentifierRejected = 0x02, + ServerUnavailable = 0x03, + BadUsernameOrPassword = 0x04, + NotAuthorized = 0x05 + }; + + packet_data_t serialize() const; + + uint8_t acknowledge_flags; + ReturnCode return_code; + + bool session_present() const { + return acknowledge_flags & 0x01; + } + + void session_present(bool set) { + if (set) { + acknowledge_flags |= 0x01; + } else { + acknowledge_flags &= ~0x01; + } + } + +}; + +/** + * Publish control packet class. + */ +class PublishPacket : public Packet { +public: + + PublishPacket() { + type = PacketType::Publish; + header_flags = 0; + } + + PublishPacket(const packet_data_t &packet_data); + + packet_data_t serialize() const; + + std::string topic_name; + std::vector message_data; + uint16_t packet_id; + + bool dup() const { + return header_flags & 0x08; + } + + void dup(bool set) { + if (set) { + header_flags |= 0x08; + } else { + header_flags &= ~0x08; + } + } + + QoSType qos() const { + return static_cast((header_flags >> 1) & 0x03); + } + + void qos(QoSType qos) { + header_flags |= static_cast(qos) << 1; + } + + bool retain() const { + return header_flags & 0x01; + } + + void retain(bool set) { + if (set) { + header_flags |= 0x01; + } else { + header_flags &= ~0x01; + } + } +}; + +/** + * Puback control packet class. + */ +class PubackPacket : public Packet { + +public: + + PubackPacket() { + type = PacketType::Puback; + header_flags = 0; + } + + PubackPacket(const packet_data_t &packet_data); + + packet_data_t serialize() const; + + uint16_t packet_id; +}; + +/** + * Pubrec control packet class. + */ +class PubrecPacket : public Packet { + +public: + + PubrecPacket() { + type = PacketType::Pubrec; + header_flags = 0; + } + + PubrecPacket(const packet_data_t &packet_data); + + packet_data_t serialize() const; + + uint16_t packet_id; +}; + +/** + * Pubrel control packet class. + */ +class PubrelPacket : public Packet { + +public: + + PubrelPacket() { + type = PacketType::Pubrel; + header_flags = 0x02; + } + + PubrelPacket(const packet_data_t &packet_data); + + packet_data_t serialize() const; + + uint16_t packet_id; +}; + +/** + * Pubcomp control packet class. + */ +class PubcompPacket : public Packet { + +public: + + PubcompPacket() { + type = PacketType::Pubcomp; + header_flags = 0; + } + + PubcompPacket(const packet_data_t &packet_data); + + packet_data_t serialize() const; + + uint16_t packet_id; +}; + +/** + * Subscribe control packet class. + */ +class SubscribePacket : public Packet { + +public: + + SubscribePacket() { + type = PacketType::Subscribe; + header_flags = 0x02; + } + + SubscribePacket(const packet_data_t &packet_data); + + packet_data_t serialize() const; + + uint16_t packet_id; + + std::vector subscriptions; + +}; + +/** + * Suback control packet class. + */ +class SubackPacket : public Packet { + +public: + + SubackPacket() { + type = PacketType::Suback; + header_flags = 0; + } + + SubackPacket(const packet_data_t &packet_data); + + packet_data_t serialize() const; + + enum class ReturnCode : uint8_t { + SuccessQoS0 = 0x00, + SuccessQoS1 = 0x01, + SuccessQoS2 = 0x02, + Failure = 0x80 + }; + + uint16_t packet_id; + std::vector return_codes; + +}; + +/** + * Unsubscribe control packet class. + */ +class UnsubscribePacket : public Packet { + +public: + + UnsubscribePacket() { + type = PacketType::Unsubscribe; + header_flags = 0x02; + } + + UnsubscribePacket(const packet_data_t &packet_data); + + packet_data_t serialize() const; + + uint16_t packet_id; + + std::vector topics; + +}; + +/** + * Unsuback control packet class. + */ +class UnsubackPacket : public Packet { + +public: + + UnsubackPacket() { + type = PacketType::Unsuback; + header_flags = 0; + } + + UnsubackPacket(const packet_data_t &packet_data); + + packet_data_t serialize() const; + + uint16_t packet_id; + +}; + +/** + * Pingreq control packet class. + */ +class PingreqPacket : public Packet { + +public: + + PingreqPacket() { + type = PacketType::Pingreq; + header_flags = 0; + } + + PingreqPacket(const packet_data_t &packet_data); + + packet_data_t serialize() const; +}; + +/** + * Pingresp control packet class. + */ +class PingrespPacket : public Packet { + +public: + + PingrespPacket() { + type = PacketType::Pingresp; + header_flags = 0; + } + + PingrespPacket(const packet_data_t &packet_data); + + packet_data_t serialize() const; +}; + +/** + * Disconnect control packet class. + */ +class DisconnectPacket : public Packet { + +public: + + DisconnectPacket() { + type = PacketType::Disconnect; + header_flags = 0; + } + + DisconnectPacket(const packet_data_t &packet_data); + + packet_data_t serialize() const; +}; + diff --git a/custom_modules/mqtt_server/mqtt_broker/src/packet_data.cc b/custom_modules/mqtt_server/mqtt_broker/src/packet_data.cc new file mode 100644 index 0000000..0c9d738 --- /dev/null +++ b/custom_modules/mqtt_server/mqtt_broker/src/packet_data.cc @@ -0,0 +1,130 @@ +/** + * @file packet_data.cc + */ + +#include "packet_data.h" + +#include + +void PacketDataWriter::write_remaining_length(size_t length) { + + if (length > 127 + 128*127 + 128*128*127 + 128*128*128*127) { + throw std::exception(); + } + + do { + uint8_t encoded_byte = length % 0x80; + length >>= 7; + if (length > 0) { + encoded_byte |= 0x80; + } + packet_data.push_back(encoded_byte); + } while (length > 0); +} + +void PacketDataWriter::write_byte(uint8_t byte) { + packet_data.push_back(byte); +} + +void PacketDataWriter::write_uint16(uint16_t word) { + packet_data.push_back((word >> 8) & 0xFF); + packet_data.push_back(word & 0xFF); +} + +void PacketDataWriter::write_string(const std::string &s) { + write_uint16(s.size()); + std::copy(s.begin(), s.end(), std::back_inserter(packet_data)); +} + +void PacketDataWriter::write_bytes(const packet_data_t & b) { + write_uint16(b.size()); + std::copy(b.begin(), b.end(), std::back_inserter(packet_data)); +} + +bool PacketDataReader::has_remaining_length() { + + size_t remaining = std::min(packet_data.size() - offset, 4); + + for (size_t i=offset; i packet_data.size()) { + throw std::exception(); + } + return packet_data[offset++]; +} + + +uint16_t PacketDataReader::read_uint16() { + if (offset + 2 > packet_data.size()) { + throw std::exception(); + } + uint8_t msb = packet_data[offset++]; + uint8_t lsb = packet_data[offset++]; + return (msb << 8) + lsb; +} + +std::string PacketDataReader::read_string() { + uint16_t len = read_uint16(); + if (offset + len > packet_data.size()) { + throw std::exception(); + } + std::string s(&packet_data[offset], &packet_data[offset + len]); + offset += len; + return s; +} + +std::vector PacketDataReader::read_bytes() { + uint16_t len = read_uint16(); + if (offset + len > packet_data.size()) { + throw std::exception(); + } + std::vector v(&packet_data[offset], &packet_data[offset + len]); + offset += len; + return v; +} + +std::vector PacketDataReader::read_bytes(size_t len) { + if (offset + len > packet_data.size()) { + throw std::exception(); + } + std::vector v(&packet_data[offset], &packet_data[offset + len]); + offset += len; + return v; +} + +bool PacketDataReader::empty() { + return offset == packet_data.size(); +} diff --git a/custom_modules/mqtt_server/mqtt_broker/src/packet_data.h b/custom_modules/mqtt_server/mqtt_broker/src/packet_data.h new file mode 100644 index 0000000..c98f218 --- /dev/null +++ b/custom_modules/mqtt_server/mqtt_broker/src/packet_data.h @@ -0,0 +1,171 @@ +/** + * @file packet_data.h + * + * Utility classes supporting serialization/deserialization of MQTT control packets. + */ + +#pragma once + +#include +#include +#include +#include + +/** Typedef for packet data container. */ +typedef std::vector packet_data_t; + +/** + * Serialization class. + * + * Methods are provided to write native types to the MQTT 3.1.1 standard wire format. + */ +class PacketDataWriter +{ +public: + + /** + * Constructor + * + * Accepts a reference to a packet_data_t container. This container will be filled with serialized data and + * can be sent directly through the network connection. + * + * @param packet_data A reference to a packet_data_t container. + */ + PacketDataWriter(packet_data_t & packet_data) : packet_data(packet_data) { + packet_data.resize(0); + } + + /** + * Write the integer length to the container using the MQTT 3.1.1 remaining length encoding scheme. + * + * @param length The value to encode. + */ + void write_remaining_length(size_t length); + + /** Write a byte to the packet_data_t container. */ + void write_byte(uint8_t byte); + + /** Write a 16 bit value to the packet_data_t container. */ + void write_uint16(uint16_t word); + + /** Write a UTF-8 character string to the packet_data_t container. */ + void write_string(const std::string & s); + + /** Copy the contents of a packet_data_t container into this instance's container. */ + void write_bytes(const packet_data_t & b); + +private: + + /** A reference to the packet_data_t container. */ + packet_data_t & packet_data; +}; + +/** + * Deserialization class. + * + * Methods are provided to read native types from the wire encoded control packets received over the network + * connection. MQTT 3.1.1 standard decoding methods are implemented. + */ +class PacketDataReader +{ + +public: + + /** + * Constructor + * + * Accepts a reference to a packet_data_t container that contains data received directly over a network connection. + * The class also contains a current offset pointer that is initialized to point to the beginning of the container. + * Each data read will advance the offset pointer forward to the next item in the container. + * + * @param packet_data A reference to a packet_data_t container. + */ + PacketDataReader(const packet_data_t & packet_data) : offset(0), packet_data(packet_data) {} + + /** + * Is a remaing lenght value present. + * + * Primitive function indicating a valid remaining_length value can be read from the current position in the + * packet_data_t container. The remaining length value is encoded in a variable sequence from 1 to 4 bytes. + * + * @return valid remaining length. + */ + bool has_remaining_length(); + + /** + * Read the remaining length value from the packet_data_t container. + * + * @return integer. + */ + size_t read_remaining_length(); + + /** + * Read a single byte from the packet_data_t container. + * + * @return byte. + */ + uint8_t read_byte(); + + /** + * Read a 16 bit value from the packet_data_t container. + * + * @return 16 bit integer. + */ + uint16_t read_uint16(); + + /** + * Read a UTF-8 encoded string from the packet_data_t container. + * + * @return character string. + */ + std::string read_string(); + + /** + * Read a byte sequence from the packet_data_t container. + * + * The sequence is assumed to be encoded according to the MQTT 3.1.1 standard wire format for a byte sequence. + * The sequence length is encoded along with the data. + * + * @return byte seqence. + */ + std::vector read_bytes(); + + /** + * Read a byte sequence from the packet_data_t contianer. + * + * The number of bytes read is passed as an argument to the method. + * + * @param len Lenght of the sequence to read. + * @return Byte sequence + */ + std::vector read_bytes(size_t len); + + /** + * Is the packet_data_t container empty. + * + * @return empty. + */ + bool empty(); + + /** + * Return the current packet_data_t container offset pointer. + * + * @return integer. + */ + size_t get_offset() { return offset; } + + /** + * Get a reference to the packet_data_t container. + * + * @return Reference to packet_data_t container. + */ + const packet_data_t & get_packet_data() { return packet_data; } + +private: + + /** Current packet_data_t container offset pointer */ + size_t offset; + + /** Packet data container. */ + const packet_data_t & packet_data; +}; diff --git a/custom_modules/mqtt_server/mqtt_broker/src/packet_manager.cc b/custom_modules/mqtt_server/mqtt_broker/src/packet_manager.cc new file mode 100644 index 0000000..5003a6a --- /dev/null +++ b/custom_modules/mqtt_server/mqtt_broker/src/packet_manager.cc @@ -0,0 +1,161 @@ +/** + * @file packet_manager.cc + */ + +#include "packet_manager.h" +#include "packet.h" + +#include +#include + +#include + +void PacketManager::receive_packet_data() { + + struct evbuffer *input = bufferevent_get_input(bev); + + while (evbuffer_get_length(input) != 0) { + + size_t available = evbuffer_get_length(input); + + if (available < 2) { + return; + } + + if (fixed_header_length == 0) { + + size_t peek_size = std::min(static_cast(available), 5); + + std::vector peek_buffer(peek_size); + evbuffer_copyout(input, &peek_buffer[0], peek_size); + + PacketDataReader reader(peek_buffer); + reader.read_byte(); + if (!reader.has_remaining_length()) { + if (peek_size == 5) { + if (event_handler) { + event_handler(EventType::ProtocolError); + } + evbuffer_drain(input, peek_size); + } + return; + } + + remaining_length = reader.read_remaining_length(); + fixed_header_length = reader.get_offset(); + + } + + size_t packet_size = fixed_header_length + remaining_length; + + if (available < packet_size) { + return; + } + + packet_data_t packet_data(packet_size); + evbuffer_remove(input, &packet_data[0], packet_size); + + fixed_header_length = 0; + remaining_length = 0; + + std::unique_ptr packet = parse_packet_data(packet_data); + + if (packet && packet_received_handler) { + packet_received_handler(std::move(packet)); + } + } +} + +std::unique_ptr PacketManager::parse_packet_data(const std::vector &packet_data) { + + PacketType type = static_cast(packet_data[0] >> 4); + + std::unique_ptr packet; + + try { + switch (type) { + case PacketType::Connect: + packet = std::unique_ptr(new ConnectPacket(packet_data)); + break; + case PacketType::Connack: + packet = std::unique_ptr(new ConnackPacket(packet_data)); + break; + case PacketType::Publish: + packet = std::unique_ptr(new PublishPacket(packet_data)); + break; + case PacketType::Puback: + packet = std::unique_ptr(new PubackPacket(packet_data)); + break; + case PacketType::Pubrec: + packet = std::unique_ptr(new PubrecPacket(packet_data)); + break; + case PacketType::Pubrel: + packet = std::unique_ptr(new PubrelPacket(packet_data)); + break; + case PacketType::Pubcomp: + packet = std::unique_ptr(new PubcompPacket(packet_data)); + break; + case PacketType::Subscribe: + packet = std::unique_ptr(new SubscribePacket(packet_data)); + break; + case PacketType::Suback: + packet = std::unique_ptr(new SubackPacket(packet_data)); + break; + case PacketType::Unsubscribe: + packet = std::unique_ptr(new UnsubscribePacket(packet_data)); + break; + case PacketType::Unsuback: + packet = std::unique_ptr(new UnsubackPacket(packet_data)); + break; + case PacketType::Pingreq: + packet = std::unique_ptr(new PingreqPacket(packet_data)); + break; + case PacketType::Pingresp: + packet = std::unique_ptr(new PingrespPacket(packet_data)); + break; + case PacketType::Disconnect: + packet = std::unique_ptr(new DisconnectPacket(packet_data)); + break; + } + } catch (std::exception &e) { + if (event_handler) { + event_handler(EventType::ProtocolError); + } + } + return packet; +} + +void PacketManager::send_packet(const Packet &packet) { + std::vector packet_data = packet.serialize(); + if (bev) { + bufferevent_write(bev, &packet_data[0], packet_data.size()); + } else { + std::cout << "not writing to closed bev\n"; + } +} + +void PacketManager::close_connection() { + if (bev) { + evutil_socket_t fd = bufferevent_getfd(bev); + evutil_closesocket(fd); + bufferevent_free(bev); + bev = nullptr; + } +} + +void PacketManager::handle_events(short events) { + + if (events & BEV_EVENT_EOF) { + if (event_handler) { + event_handler(EventType::ConnectionClosed); + } + } else if (events & BEV_EVENT_ERROR) { + if (event_handler) { + event_handler(EventType::NetworkError); + } + } else if (events & BEV_EVENT_TIMEOUT) { + if (event_handler) { + event_handler(EventType::Timeout); + } + } +} diff --git a/custom_modules/mqtt_server/mqtt_broker/src/packet_manager.h b/custom_modules/mqtt_server/mqtt_broker/src/packet_manager.h new file mode 100644 index 0000000..eb6a482 --- /dev/null +++ b/custom_modules/mqtt_server/mqtt_broker/src/packet_manager.h @@ -0,0 +1,225 @@ +/** + * @file packet_manager.h + * + * Manage low level network communications. + * + * The PacketManager is responsible for sending and receiving MQTT control packets across the network connection. A + * PacketManager instance is installed into every BaseSession and can be moved between sessions to implement + * session persistance. + * + * MQTT control packets received by the PacketManager and network events are forwared to containing sessions through + * callbacks. Session instances control the packet manager by invoking its methods directly. + */ + +#pragma once + +#include "packet.h" + +#include +#include + +#include +#include +#include +#include + +/** + * PacketManager class. + * + * Manage low level network operations and invoke callbacks on MQTT control packet reception or network event. + */ +class PacketManager { +public: + + /** + * Enumeration constants for PacketManager events. + * + * Events are low level network events or unrecoverable protocol errors. + * + */ + enum class EventType { + NetworkError, + ProtocolError, + ConnectionClosed, + Timeout, + }; + + /** + * Constructor + * + * The PacketManager constructor accepts a pointer to a libevent bufferevent internal structure. Callbacks are + * installed for network data received and network events. A pointer to this PacketManager instance is passed + * as the user data argument. It will be used to invoke instance methods from the static callback wrapper. + * + * @param bev Pointer to a bufferevent control structure. + */ + PacketManager(struct bufferevent *bev) : bev(bev) { + bufferevent_setcb(bev, input_ready, NULL, network_event, this); + bufferevent_enable(bev, EV_READ); + } + + /** + * Destructor + * + * Will free the bufferevent pointer. This call should also close any underlying socket connection provided + * the libevent flag LEV_OPT_CLOSE_ON_FREE was used to create the bufferevent. + */ + ~PacketManager() { + if (bev) { + bufferevent_free(bev); + bev = nullptr; + } + } + + /** + * Send a control packet through the network connection. + * + * This method is invoked by containing session instances when they want to send a control packet. The packet + * will be serialized and transmitted provided the underlying socket connection is not closed. + */ + void send_packet(const Packet &); + + /** + * Close the network connection. + * + * Explicitly close the network connection maintained by the bufferevent. This connection should also be closed + * when the destructor for this instance is run provided the bufferevent was created with the LEV_OPT_CLOSE_ON_FREE + * flag. + */ + void close_connection(); + + /** + * Set the packet received callback. + * + * This callback will be invoked when a packet is received from the network and deserialized. The callback will + * assume ownership of the packet memory. A pointer to the base packet type is passed and the actual packet can + * be recovered through a dynamic_cast<>(). + * + * @param handler Callback function. + */ + void set_packet_received_handler(std::function)> handler) { + packet_received_handler = handler; + } + + /** + * Set the network event callback. + * + * This callback will be invoked when a low level network or protocol error is detected. The type of event is + * indicated by the an EventType enumeration constant passed as an argument to the callback. + * + * @param handler Callback function. + */ + void set_event_handler(std::function handler) { + event_handler = handler; + } + + /** + * Return the next available packet id in sequence. + * + * @return Next packet id. + */ + uint16_t next_packet_id() { + + if (++packet_id == 0) { + ++packet_id; + } + return packet_id; + } + + /** + * Pointer to the contained libevent bufferevent internal control structure. + */ + struct bufferevent *bev; + +private: + + /** + * Data receiving method. + * + * This instance method is invoked from the static input_ready callback wrapper. It is run asynchronously + * whenever data is received from the network connection. The data will be buffered inside the bufferevent control + * structure until a complete control packet is received. At that point the packet will be deserialized and + * passed to any installed packet_received_handler callback. + */ + void receive_packet_data(); + + /** + * Libevent callback wrapper. + * + * Static method invoked by libevent C library. The callback method will be passed a pointer to a bufferevent + * control structure. This should be the same pointer that was originally passed to the PacketManager constructor. + * The method will also receive a pointer to the containing PacketManager instance which will be used to invoke the + * receive_packet_data instance method. + * + * @param bev Pointer to a bufferevent + * @param arg Pointer to user data installed with the callback. In this case it is a pointer to the containing + * PacketManager instance. + */ + static void input_ready(struct bufferevent *bev, void *arg) { + PacketManager *_this = static_cast(arg); + _this->receive_packet_data(); + } + + /** + * Network event callback. + * + * This instance method is invoked from the static network_event callback wrapper. It will be passed a set of + * flags indicating the type of network event that caused the invocation. The method will then deletegate to any + * installed event_handler callback passing the event type as the an EventType argument. + * + * @param events + */ + void handle_events(short events); + + /** + * Libevent callback wrapper. + * + * The callback method will be passed a pointer to a bufferevent control structure. This should be the same + * pointer that was originally passed to the PacketManager constructor. The method will also receive a pointer + * to the containing PacketManager instance which will be used to invoke the handle_events instance method. + * + * @param bev Pointer to a bufferevent control structure. + * @param events Flags indicating the type of event that caused the callback to be invoked. + * @param arg Pointer to the containing PacketManager instance, installed along with the callback wrapper. + */ + static void network_event(struct bufferevent *bev, short events, void *arg) { + PacketManager *_this = static_cast(arg); + _this->handle_events(events); + } + + /** + * Packet deserialization method. + * + * This method will receive a packet_data_t container when the receive_packet_data method has determined + * that data for a complete packet has been received. The packet will be deserialized and a reference counted + * pointer to the instantiated packet will be returned. + * + * @param packet_data Reference to a packet_data_t container. + * @return Reference counted pointer to Packet. + */ + std::unique_ptr parse_packet_data(const packet_data_t &packet_data); + + /** + * Packet received callback. + * + * Callback installed to be invoked by this PacketManager when an MQTT control packet is received from the network. + */ + std::function)> packet_received_handler; + + /** + * Network event callback. + * + * Callback installed to be invoked by this PacketManager when a low level network or protocol error is detected. + */ + std::function event_handler; + + /** Packet id counter. */ + uint16_t packet_id = 0; + + /** State variable used to determine when data for a complete control packet is available. */ + size_t fixed_header_length = 0; + + /** State variable used to determine when data for a complete control packet is available. */ + size_t remaining_length = 0; + +}; \ No newline at end of file diff --git a/custom_modules/mqtt_server/mqtt_broker/src/session_manager.cc b/custom_modules/mqtt_server/mqtt_broker/src/session_manager.cc new file mode 100644 index 0000000..7853328 --- /dev/null +++ b/custom_modules/mqtt_server/mqtt_broker/src/session_manager.cc @@ -0,0 +1,57 @@ +/** + * @file session_manager.cc + */ + +#include "session_manager.h" +#include "broker_session.h" +#include "topic.h" + +#include +#include + +void SessionManager::accept_connection(struct bufferevent *bev) { + + auto session = std::unique_ptr(new BrokerSession(bev, *this)); + sessions.push_back(std::move(session)); +} + +std::list >::iterator SessionManager::find_session(const std::string &client_id) { + + return find_if(sessions.begin(), sessions.end(), [&client_id](const std::unique_ptr &s) { + return (!s->client_id.empty() and (s->client_id == client_id)); + }); +} + +void SessionManager::erase_session(const std::string &client_id) { + sessions.erase(std::remove_if(sessions.begin(), sessions.end(), [&client_id](std::unique_ptr &s) { + return (!s->client_id.empty() and (s->client_id == client_id)); + }), + sessions.end()); +} + +void SessionManager::erase_session(const BrokerSession *session) { + sessions.erase(std::remove_if(sessions.begin(), sessions.end(), [session](std::unique_ptr &s) { + return s.get() == session; + }), + sessions.end()); +} + +void SessionManager::handle_publish(const PublishPacket &packet) { + for (auto &session : sessions) { + for (auto &subscription : session->subscriptions) { + if (topic_match(subscription.topic_filter, TopicName(packet.topic_name))) { + session->forward_packet(packet); + } + } + } +} + +void SessionManager::handle_local_publish(const std::string &client_id, const PublishPacket &packet) { + for (size_t i = 0; i < local_sessions.size(); ++i) { + LocalSession &l = local_sessions[i]; + + if (topic_match(l.filter, TopicName(packet.topic_name))) { + l.func(client_id, packet.message_data, l.obj); + } + } +} \ No newline at end of file diff --git a/custom_modules/mqtt_server/mqtt_broker/src/session_manager.h b/custom_modules/mqtt_server/mqtt_broker/src/session_manager.h new file mode 100644 index 0000000..5cc1653 --- /dev/null +++ b/custom_modules/mqtt_server/mqtt_broker/src/session_manager.h @@ -0,0 +1,104 @@ +/** + * @file session_manager.h + * + * Manage BrokerSessions. + * + * The SessionManager maintains a contaier of all sessions in a broker. BrokerConnects are created in the session + * manager when a network session is accepted the new session is added to the sessions container. The session is then + * responsible for managing the MQTT protocol. The MQTT 3.1.1 standard requires that sessions + * can persist after a client disconnects and that any subscribe QoS 1 and Qo2 messages published while disconnected be + * delivered on reconnection. + * + * The SessionManager is responsible for forwarding published messages to all subscribing clients. + */ + +#pragma once + +#include +#include +#include +#include + +struct bufferevent; + +class BrokerSession; +class PublishPacket; + +/** + * SessionManager class + * + * Composes a container of broker sessions and methods to manage them. + */ +class SessionManager { +public: + /** + * Accept a new network connection. + * + * Creates a new BrokerSession instance and adds it to the container of sessions. The session instance will manage + * the MQTT protocol. + * + * @param bev Pointer to a bufferevent + */ + void accept_connection(struct bufferevent *bev); + + /** + * Find a session in the session container. + * + * @param client_id Unique client id to find. + * @return Iterator to BrokerSession. + */ + std::list >::iterator find_session(const std::string &client_id); + + /** + * Delete a session + * + * Given a pointer to a BrokerSession, finds that session in the session container and removes it from the + * container. The session instance will be deleted. + * + * @param session Pointer to a BrokerSession; + */ + void erase_session(const BrokerSession *session); + + /** + * Finds a session in the session container with the given client id. If found the session is removed from the + * container. The session instance will be deleted. + * + * @param client_id A Client id. + */ + void erase_session(const std::string &client_id); + + /** + * Forward a message to subsribed clients. + * + * Searches through each session and their subscriptions and invokes the forward_packet method on each session + * instance with a matching subscribed TopicFilter. The session will be responsible for Managing the MQTT publish + * protocol and correctly delivering the message to its subscribed client + * + * @param publish_packet Reference to a PublishPacket; + */ + void handle_publish(const PublishPacket &publish_packet); + + void handle_local_publish(const std::string &client_id, const PublishPacket &packet); + + /** Container of BrokerSessions. */ + std::list > sessions; + +public: + void add_local_session(const std::string &filter, void (*func)(const std::string &client_id, const std::vector &data, void *obj), void *obj) { + LocalSession l; + + l.filter = filter; + l.func = func; + l.obj = obj; + + local_sessions.push_back(l); + } + + struct LocalSession { + std::string filter; + void (*func)(const std::string &client_id, const std::vector &data, void *obj); + void *obj; + }; + + std::vector local_sessions; +}; \ No newline at end of file diff --git a/custom_modules/mqtt_server/mqtt_broker/src/topic.cc b/custom_modules/mqtt_server/mqtt_broker/src/topic.cc new file mode 100644 index 0000000..ec7c718 --- /dev/null +++ b/custom_modules/mqtt_server/mqtt_broker/src/topic.cc @@ -0,0 +1,131 @@ +/** + * @file topic.cc + */ + +#include "topic.h" + +#include + +TopicName::TopicName(const std::string & s) { + if (!is_valid(s)) { + throw std::exception(); + } + + name = s; +} + +bool TopicName::is_valid(const std::string &s) const { + if (s.size() > MaxNameSize) { + return false; + } + + if ((s.find('+') == std::string::npos) and (s.find('#') == std::string::npos)) { + return true; + } + + return false; +} + +TopicFilter::TopicFilter(const std::string &s) { + if (!is_valid(s)) { + throw std::exception(); + } + + filter = s; +} + +bool TopicFilter::is_valid(const std::string &s) const { + + if (s.size() > MaxFilterSize) { + return false; + } + + size_t pos = 0; + + for (char c : s) { + if (c == '+') { + if ((pos != 0 and s[pos - 1] != '/') or (pos + 1 != s.size() and s[pos + 1] != '/')) { + return false; + } + } else if (c == '#') { + if ((pos != 0 and s[pos - 1] != '/') or (pos + 1 != s.size())) { + return false; + } + } + pos++; + } + + return true; +} + +bool topic_match(const TopicFilter &filter, const TopicName &name) { + + const std::string &f = filter.filter; + const std::string &n = name.name; + + // empty strings don't match + if (f.empty() or n.empty()) { + return false; + } + + size_t fpos = 0; + size_t npos = 0; + + // Cannot match $ with wildcard + if ((f[fpos] == '$' and n[npos] != '$') or (f[fpos] != '$' and n[npos] == '$')) { + return false; + } + + while (fpos < f.size() and npos < n.size()) { + + // filter and topic name match at the current position + if (f[fpos] == n[npos]) { + + // last character in the topic name + if (npos == n.size() - 1) { + + // at the end of the topic name and the filter has a separator followed by a multi-level wildcard, + // causing a parent level match. + if ((fpos == f.size() - 3) and (f[fpos + 1] == '/') and (f[fpos + 2] == '#')) { + return true; + } + } + + fpos++; + npos++; + + // at the end of both the filter and topic name, match + if ((fpos == f.size()) && (npos == n.size())) { + return true; + + // at the end of the topic name and the next character in the filter is wildcard. + } else if ((npos == n.size()) and (fpos == f.size() - 1) and (f[fpos] == '+')) { + return true; + } + + } else { + + if (f[fpos] == '+') { + fpos++; + while ((npos < n.size()) and (n[npos] != '/')) { + npos++; + } + if ((npos == n.size()) and (fpos == f.size())) { + return true; + } + } else if (f[fpos] == '#') { + return true; + } else { + return false; + } + } + } + + return false; +} + +bool topic_match(const TopicFilter &filter1, const TopicFilter &filter2) { + + return filter1.filter == filter2.filter; + +} \ No newline at end of file diff --git a/custom_modules/mqtt_server/mqtt_broker/src/topic.h b/custom_modules/mqtt_server/mqtt_broker/src/topic.h new file mode 100644 index 0000000..601e1ba --- /dev/null +++ b/custom_modules/mqtt_server/mqtt_broker/src/topic.h @@ -0,0 +1,141 @@ +/** + * @file topic.h + * + * Classes for managing topic names and topic filters. + * + * The MQTT 3.1.1 standard allows structured topic names. It also defines rules for matching these names and provides + * wildcard characters to enhance matching rules. + */ + +#pragma once + +#include + +class TopicFilter; + +/** + * Topic Name + * + * Topic names are UTF-8 encoded character strings. The have a structure imposed by the MQTT 3.1.1 standard. This + * class enforces that structure. Topic names differe from topic filters in that topic filters allow wildcard + * characters. + * + * This class friends the topic_match function. + */ +class TopicName { +public: + + /** Maximum lenght of a topic name according the the MQTT 3.1.1 standard. */ + const static size_t MaxNameSize = 65535; + + /** + * Constructor + * + * The string will be validated against the MQTT topic name rules. An exception will be thrown if the name is + * invalid. + * + * @param name A reference to a the topic name string. + */ + TopicName(const std::string & name); + + /** + * Validate the topic name against the MQTT 3.1.1 standard rules. + * + * @param name A name string. + * @return Topic name is valid. + */ + bool is_valid(const std::string & name) const; + + /** + * Cast an instance of this class to a std::string. + * + * @return std::string + */ + operator std::string() const {return name;} + + /** Matching friend function. */ + friend bool topic_match(const TopicFilter &, const TopicName &); + +private: + + /** The name character string. */ + std::string name; +}; + +/** + * Topic Filter + * + * Topic filters are composed of UTF-8 encoded character strings. The have a structure imposed by the MQTT 3.1.1 + * standard including wildcard characters. This class enforces that structure. + * + * This class friends the topic_match function. + */ +class TopicFilter { +public: + + /** Maximum lenght of a topic filter according the the MQTT 3.1.1 standard. */ + const static size_t MaxFilterSize = 65535; + + /** + * Constructor + * + * The string will be validated against the MQTT topic filter rules. An exception will be thrown if the filter is + * invalid. + * + * @param filter A reference to a the topic filter string. + */ + TopicFilter(const std::string & filter); + + /** + * Validate the topic filter against the MQTT 3.1.1 standard rules. + * + * @param filter A filter string. + * @return Topic filter is valid. + */ + bool is_valid(const std::string & filter) const; + + /** + * Cast an instance of this class to a std::string. + * + * @return std::string + */ + operator std::string() const {return filter;} + + /** Matching friend function. */ + friend bool topic_match(const TopicFilter &, const TopicName &); + + /** Matching friend function. */ + friend bool topic_match(const TopicFilter &, const TopicFilter &); + +private: + + /** The filter character string. */ + std::string filter; +}; + +/** + * Match a TopicFilter against a TopicName. + * + * The MQTT 3.1.1 standard topic filter matching rules will be applied including wildcard characters. + * + * @param topic_filter A reference to a TopicFilter. + * @param topic_name A reference to a TopicName + * @return match + */ +bool topic_match(const TopicFilter & topic_filter, const TopicName & topic_name); + +/** + * Match a TopicFilter against another TopicFilter. + * + * The MQTT 3.1.1 standard topic filter matching rules are applied. These are a direct character by character match. + * This function can be used when finding an existing subscription filter, in that case wildcard character matching + * does not apply. + * + * For example, the topic filters 'a/+/c' and 'a/#' both match the topic name 'a/b/c' but the two topic filters do not + * match. + * + * @param topic_filter A reference to a TopicFilter. + * @param topic_name A reference to a TopicName + * @return match + */ +bool topic_match(const TopicFilter & topic_filter, const TopicFilter & topic_name); diff --git a/custom_modules/mqtt_server/mqtt_server.cpp b/custom_modules/mqtt_server/mqtt_server.cpp new file mode 100644 index 0000000..b5c8f46 --- /dev/null +++ b/custom_modules/mqtt_server/mqtt_server.cpp @@ -0,0 +1,85 @@ +#include "mqtt_server.h" + +void MQTTServer::add_local_session(const std::string &filter, void (*func)(const std::string &client_id, const std::vector &data, void *obj), void* obj) { + session_manager->add_local_session(filter, func, obj); +} + +void MQTTServer::initialize() { + evloop = event_base_new(); + + if (!evloop) { + printf("Could not initialize libevent\n"); + return; + } + + std::memset(&sin, 0, sizeof(sin)); + sin.sin_family = AF_INET; + evutil_inet_pton(sin.sin_family, bind_address.c_str(), &sin.sin_addr); + sin.sin_port = htons(port); + + listener = evconnlistener_new_bind(evloop, MQTTServer::listener_cb, + (void *)this, + LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, -1, + (struct sockaddr *)&sin, sizeof(sin)); + + if (!listener) { + std::cerr << "Could not create listener!\n"; + return; + } +} + +void MQTTServer::listener_cb(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sa, int socklen, void *user_data) { + + MQTTServer *server = static_cast(user_data); + + struct bufferevent *bev; + + bev = bufferevent_socket_new(server->evloop, fd, BEV_OPT_CLOSE_ON_FREE); + if (!bev) { + std::cerr << "Error constructing bufferevent!\n"; + event_base_loopbreak(server->evloop); + return; + } + + server->session_manager->accept_connection(bev); +} + +void MQTTServer::run_async() { + if (_thread) { + printf("MQTTServer::run_async Error! A thread is already runnig!\n"); + return; + } + + _thread = new std::thread([this]() { event_base_dispatch(this->evloop); }); +} + +MQTTServer::MQTTServer() { + bind_address = "0"; + port = 1883; + _thread = nullptr; + + session_manager = new SessionManager(); + + evloop = nullptr; + listener = nullptr; +} + +MQTTServer::~MQTTServer() { + //this first, as evloop runs in _thread + if (evloop && event_base_loopexit(evloop, NULL)) { + std::cout << "failed to exit event loop\n"; + } + + if (_thread) { + _thread->join(); + delete _thread; + } + + if (listener) + evconnlistener_free(listener); + + if (evloop) + event_base_free(evloop); + + delete session_manager; +} diff --git a/custom_modules/mqtt_server/mqtt_server.h b/custom_modules/mqtt_server/mqtt_server.h new file mode 100644 index 0000000..e817730 --- /dev/null +++ b/custom_modules/mqtt_server/mqtt_server.h @@ -0,0 +1,43 @@ +#ifndef MQTT_SERVER_H +#define MQTT_SERVER_H + +#include +#include +#include +#include + +#include "./mqtt_broker/src/broker_session.h" +#include "./mqtt_broker/src/session_manager.h" + +#include + +#include + +#include +#include + +class MQTTServer { + +public: + static void listener_cb(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *addr, int socklen, void *arg); + + void add_local_session(const std::string &filter, void (*func)(const std::string &client_id, const std::vector &data, void *obj), void* obj); + + void initialize(); + void run_async(); + + MQTTServer(); + ~MQTTServer(); + + std::thread *_thread; + + SessionManager *session_manager; + std::string bind_address; + uint16_t port; + + struct event_base *evloop; + struct evconnlistener *listener; + struct sockaddr_in sin; +}; + +#endif \ No newline at end of file diff --git a/main.cpp b/main.cpp index 42d7a62..b0906b9 100644 --- a/main.cpp +++ b/main.cpp @@ -4,17 +4,17 @@ #include "core/application.h" #include "core/file_cache.h" -#include "core/http_server.h" +#include "core/http/http_server.h" #include "app/ic_application.h" -#include "core/database_manager.h" +#include "core/database/database_manager.h" #include "database/db_init.h" #include "core/settings.h" -#include "modules/mqtt_server/mqtt_server.h" +#include "custom_modules/mqtt_server/mqtt_server.h" #define MAIN_CLASS ICApplication