From aa8c6b35d1373cced1b3bdd93e118babee885283 Mon Sep 17 00:00:00 2001 From: Relintai Date: Sun, 24 Jul 2022 13:19:21 +0200 Subject: [PATCH] IMplemented threading for the simple web server. --- .../http_server_simple/http_server_simple.cpp | 166 +++++++++++++----- .../http_server_simple/http_server_simple.h | 26 +++ .../http_server_simple/web_server_simple.cpp | 5 +- 3 files changed, 151 insertions(+), 46 deletions(-) diff --git a/modules/web/http_server_simple/http_server_simple.cpp b/modules/web/http_server_simple/http_server_simple.cpp index 8272cb307..262705008 100644 --- a/modules/web/http_server_simple/http_server_simple.cpp +++ b/modules/web/http_server_simple/http_server_simple.cpp @@ -257,16 +257,19 @@ HTTPServerConnection::~HTTPServerConnection() { void HTTPServerSimple::stop() { server->stop(); + _clear_clients(); } Error HTTPServerSimple::listen(int p_port, IP_Address p_address, bool p_use_ssl, String p_ssl_key, String p_ssl_cert) { use_ssl = p_use_ssl; + if (use_ssl) { Ref crypto = Crypto::create(); if (crypto.is_null()) { return ERR_UNAVAILABLE; } + if (!p_ssl_key.empty() && !p_ssl_cert.empty()) { key = Ref(CryptoKey::create()); Error err = key->load(p_ssl_key); @@ -278,7 +281,28 @@ Error HTTPServerSimple::listen(int p_port, IP_Address p_address, bool p_use_ssl, _set_internal_certs(crypto); } } - return server->listen(p_port, p_address); + + Error err = server->listen(p_port, p_address); + + if (err != OK) { + return err; + } + + if (_use_worker_threads) { + for (int i = 0; i < _thread_count; ++i) { + ServerWorkerThread *t = memnew(ServerWorkerThread); + t->running = true; + t->server.reference_ptr(this); + t->semaphore = memnew(Semaphore); + + t->thread = memnew(Thread()); + t->thread->start(HTTPServerSimple::_worker_thread_func, t); + + _threads.push_back(t); + } + } + + return OK; } bool HTTPServerSimple::is_listening() const { @@ -310,58 +334,39 @@ void HTTPServerSimple::poll() { _connections_lock.write_unlock(); } - /* - //THis will only work well in a worker thread - while (!_connections.empty()) { - _connections_lock.write_lock(); - Ref c = _connections.front()->get(); - _connections.pop_front(); - _connections_lock.write_unlock(); + if (!_use_worker_threads) { + _connections_lock.write_lock(); - if (c->closed()) { - continue; - } + List>::Element *e = _connections.front(); - c->update(); + while (e) { + Ref c = e->get(); - if (c->closed()) { - continue; - } + if (c->closed()) { + List>::Element *etmp = e->next(); + _connections.erase(e); + e = etmp; + continue; + } - _connections_lock.write_lock(); - _connections.push_back(c); - _connections_lock.write_unlock(); - } - */ + c->update(); - //Single threaded ver - _connections_lock.write_lock(); + if (c->closed()) { + List>::Element *etmp = e->next(); + _connections.erase(e); + e = etmp; + continue; + } - List>::Element *e = _connections.front(); - - while (e) { - Ref c = e->get(); - - if (c->closed()) { - List>::Element *etmp = e->next(); - _connections.erase(e); - e = etmp; - continue; + e = e->next(); } - c->update(); - - if (c->closed()) { - List>::Element *etmp = e->next(); - _connections.erase(e); - e = etmp; - continue; + _connections_lock.write_unlock(); + } else { + if (_connections.size() > 0) { + _wake_workers(); } - - e = e->next(); } - - _connections_lock.write_unlock(); } HTTPServerSimple::HTTPServerSimple() { @@ -384,18 +389,32 @@ HTTPServerSimple::~HTTPServerSimple() { void HTTPServerSimple::_clear_clients() { //stop worker threads first! + _stop_workers(); _connections_lock.write_lock(); - for (List>::Element *e = _connections.front(); e; e = e->next()) { e->get()->close(); } _connections.clear(); - _connections_lock.write_unlock(); } +void HTTPServerSimple::_stop_workers() { + for (int i = 0; i < _threads.size(); ++i) { + _threads.write[i]->running = false; + _threads.write[i]->semaphore->post(); + } + + for (int i = 0; i < _threads.size(); ++i) { + _threads.write[i]->thread->wait_to_finish(); + memdelete(_threads.write[i]->thread); + memdelete(_threads.write[i]->semaphore); + } + + _threads.clear(); +} + void HTTPServerSimple::_set_internal_certs(Ref p_crypto) { //const String cache_path = EditorSettings::get_singleton()->get_cache_dir(); //TODO @@ -419,3 +438,60 @@ void HTTPServerSimple::_set_internal_certs(Ref p_crypto) { cert->save(crt_path); } } + +void HTTPServerSimple::_wake_workers() { + for (int i = 0; i < _threads.size(); ++i) { + if (_connections.size() == 0) { + return; + } + + ServerWorkerThread *t = _threads[i]; + + if (!t->working) { + t->semaphore->post(); + } + } +} + +void HTTPServerSimple::_worker_thread_func(void *data) { + ServerWorkerThread *context = reinterpret_cast(data); + + Ref server = context->server; + + context->working = true; + + while (context->running) { + //THis will only work well in a worker thread + while (!server->_connections.empty()) { + server->_connections_lock.write_lock(); + List>::Element *e = server->_connections.front(); + + if (!e) { + server->_connections_lock.write_unlock(); + break; + } + + Ref c = e->get(); + server->_connections.pop_front(); + server->_connections_lock.write_unlock(); + + if (c->closed()) { + continue; + } + + c->update(); + + if (c->closed()) { + continue; + } + + server->_connections_lock.write_lock(); + server->_connections.push_back(c); + server->_connections_lock.write_unlock(); + } + + context->working = false; + context->semaphore->wait(); + context->working = true; + } +} diff --git a/modules/web/http_server_simple/http_server_simple.h b/modules/web/http_server_simple/http_server_simple.h index 08e09862d..1cc61c1cf 100644 --- a/modules/web/http_server_simple/http_server_simple.h +++ b/modules/web/http_server_simple/http_server_simple.h @@ -36,6 +36,7 @@ #include "core/io/zip_io.h" #include "core/list.h" #include "core/os/rw_lock.h" +#include "core/os/semaphore.h" #include "core/vector.h" #include "core/project_settings.h" @@ -99,6 +100,9 @@ public: Ref cert; + bool _use_worker_threads; + int _thread_count; + private: Ref server; @@ -109,7 +113,29 @@ private: RWLock _connections_lock; void _clear_clients(); + void _stop_workers(); void _set_internal_certs(Ref p_crypto); + + void _wake_workers(); + + struct ServerWorkerThread { + Thread *thread; + Semaphore *semaphore; + Ref server; + bool running; + bool working; + + ServerWorkerThread() { + thread = nullptr; + semaphore = nullptr; + running = false; + working = false; + } + }; + + Vector _threads; + + static void _worker_thread_func(void *data); }; #endif diff --git a/modules/web/http_server_simple/web_server_simple.cpp b/modules/web/http_server_simple/web_server_simple.cpp index 57a0464f3..4bc0d677d 100644 --- a/modules/web/http_server_simple/web_server_simple.cpp +++ b/modules/web/http_server_simple/web_server_simple.cpp @@ -33,6 +33,9 @@ #include "http_server_simple.h" void WebServerSimple::_start() { + server->_use_worker_threads = _use_worker_threads; + server->_thread_count = _thread_count; + WebServer::_start(); const uint16_t bind_port = 8080; @@ -75,7 +78,7 @@ void WebServerSimple::_stop() { WebServerSimple::WebServerSimple() { _use_worker_threads = true; _use_poll_thread = true; - _thread_count = 1; + _thread_count = 4; server.instance(); server->_web_server = this;