IMplemented threading for the simple web server.

This commit is contained in:
Relintai 2022-07-24 13:19:21 +02:00
parent 7d3486e5ee
commit aa8c6b35d1
3 changed files with 151 additions and 46 deletions

View File

@ -257,16 +257,19 @@ HTTPServerConnection::~HTTPServerConnection() {
void HTTPServerSimple::stop() {
server->stop();
_clear_clients();
}
Error HTTPServerSimple::listen(int p_port, IP_Address p_address, bool p_use_ssl, String p_ssl_key, String p_ssl_cert) {
use_ssl = p_use_ssl;
if (use_ssl) {
Ref<Crypto> crypto = Crypto::create();
if (crypto.is_null()) {
return ERR_UNAVAILABLE;
}
if (!p_ssl_key.empty() && !p_ssl_cert.empty()) {
key = Ref<CryptoKey>(CryptoKey::create());
Error err = key->load(p_ssl_key);
@ -278,7 +281,28 @@ Error HTTPServerSimple::listen(int p_port, IP_Address p_address, bool p_use_ssl,
_set_internal_certs(crypto);
}
}
return server->listen(p_port, p_address);
Error err = server->listen(p_port, p_address);
if (err != OK) {
return err;
}
if (_use_worker_threads) {
for (int i = 0; i < _thread_count; ++i) {
ServerWorkerThread *t = memnew(ServerWorkerThread);
t->running = true;
t->server.reference_ptr(this);
t->semaphore = memnew(Semaphore);
t->thread = memnew(Thread());
t->thread->start(HTTPServerSimple::_worker_thread_func, t);
_threads.push_back(t);
}
}
return OK;
}
bool HTTPServerSimple::is_listening() const {
@ -310,58 +334,39 @@ void HTTPServerSimple::poll() {
_connections_lock.write_unlock();
}
/*
//THis will only work well in a worker thread
while (!_connections.empty()) {
_connections_lock.write_lock();
Ref<HTTPServerConnection> c = _connections.front()->get();
_connections.pop_front();
_connections_lock.write_unlock();
if (!_use_worker_threads) {
_connections_lock.write_lock();
if (c->closed()) {
continue;
}
List<Ref<HTTPServerConnection>>::Element *e = _connections.front();
c->update();
while (e) {
Ref<HTTPServerConnection> c = e->get();
if (c->closed()) {
continue;
}
if (c->closed()) {
List<Ref<HTTPServerConnection>>::Element *etmp = e->next();
_connections.erase(e);
e = etmp;
continue;
}
_connections_lock.write_lock();
_connections.push_back(c);
_connections_lock.write_unlock();
}
*/
c->update();
//Single threaded ver
_connections_lock.write_lock();
if (c->closed()) {
List<Ref<HTTPServerConnection>>::Element *etmp = e->next();
_connections.erase(e);
e = etmp;
continue;
}
List<Ref<HTTPServerConnection>>::Element *e = _connections.front();
while (e) {
Ref<HTTPServerConnection> c = e->get();
if (c->closed()) {
List<Ref<HTTPServerConnection>>::Element *etmp = e->next();
_connections.erase(e);
e = etmp;
continue;
e = e->next();
}
c->update();
if (c->closed()) {
List<Ref<HTTPServerConnection>>::Element *etmp = e->next();
_connections.erase(e);
e = etmp;
continue;
_connections_lock.write_unlock();
} else {
if (_connections.size() > 0) {
_wake_workers();
}
e = e->next();
}
_connections_lock.write_unlock();
}
HTTPServerSimple::HTTPServerSimple() {
@ -384,18 +389,32 @@ HTTPServerSimple::~HTTPServerSimple() {
void HTTPServerSimple::_clear_clients() {
//stop worker threads first!
_stop_workers();
_connections_lock.write_lock();
for (List<Ref<HTTPServerConnection>>::Element *e = _connections.front(); e; e = e->next()) {
e->get()->close();
}
_connections.clear();
_connections_lock.write_unlock();
}
void HTTPServerSimple::_stop_workers() {
for (int i = 0; i < _threads.size(); ++i) {
_threads.write[i]->running = false;
_threads.write[i]->semaphore->post();
}
for (int i = 0; i < _threads.size(); ++i) {
_threads.write[i]->thread->wait_to_finish();
memdelete(_threads.write[i]->thread);
memdelete(_threads.write[i]->semaphore);
}
_threads.clear();
}
void HTTPServerSimple::_set_internal_certs(Ref<Crypto> p_crypto) {
//const String cache_path = EditorSettings::get_singleton()->get_cache_dir();
//TODO
@ -419,3 +438,60 @@ void HTTPServerSimple::_set_internal_certs(Ref<Crypto> p_crypto) {
cert->save(crt_path);
}
}
void HTTPServerSimple::_wake_workers() {
for (int i = 0; i < _threads.size(); ++i) {
if (_connections.size() == 0) {
return;
}
ServerWorkerThread *t = _threads[i];
if (!t->working) {
t->semaphore->post();
}
}
}
void HTTPServerSimple::_worker_thread_func(void *data) {
ServerWorkerThread *context = reinterpret_cast<ServerWorkerThread *>(data);
Ref<HTTPServerSimple> server = context->server;
context->working = true;
while (context->running) {
//THis will only work well in a worker thread
while (!server->_connections.empty()) {
server->_connections_lock.write_lock();
List<Ref<HTTPServerConnection>>::Element *e = server->_connections.front();
if (!e) {
server->_connections_lock.write_unlock();
break;
}
Ref<HTTPServerConnection> c = e->get();
server->_connections.pop_front();
server->_connections_lock.write_unlock();
if (c->closed()) {
continue;
}
c->update();
if (c->closed()) {
continue;
}
server->_connections_lock.write_lock();
server->_connections.push_back(c);
server->_connections_lock.write_unlock();
}
context->working = false;
context->semaphore->wait();
context->working = true;
}
}

View File

@ -36,6 +36,7 @@
#include "core/io/zip_io.h"
#include "core/list.h"
#include "core/os/rw_lock.h"
#include "core/os/semaphore.h"
#include "core/vector.h"
#include "core/project_settings.h"
@ -99,6 +100,9 @@ public:
Ref<X509Certificate> cert;
bool _use_worker_threads;
int _thread_count;
private:
Ref<TCP_Server> server;
@ -109,7 +113,29 @@ private:
RWLock _connections_lock;
void _clear_clients();
void _stop_workers();
void _set_internal_certs(Ref<Crypto> p_crypto);
void _wake_workers();
struct ServerWorkerThread {
Thread *thread;
Semaphore *semaphore;
Ref<HTTPServerSimple> server;
bool running;
bool working;
ServerWorkerThread() {
thread = nullptr;
semaphore = nullptr;
running = false;
working = false;
}
};
Vector<ServerWorkerThread *> _threads;
static void _worker_thread_func(void *data);
};
#endif

View File

@ -33,6 +33,9 @@
#include "http_server_simple.h"
void WebServerSimple::_start() {
server->_use_worker_threads = _use_worker_threads;
server->_thread_count = _thread_count;
WebServer::_start();
const uint16_t bind_port = 8080;
@ -75,7 +78,7 @@ void WebServerSimple::_stop() {
WebServerSimple::WebServerSimple() {
_use_worker_threads = true;
_use_poll_thread = true;
_thread_count = 1;
_thread_count = 4;
server.instance();
server->_web_server = this;