From d10db3fddd76e79b85c065b2b77e7bb3742114c0 Mon Sep 17 00:00:00 2001 From: Relintai Date: Sat, 10 Sep 2022 03:07:49 +0200 Subject: [PATCH] Now ThreadPool uses a List internally as a queue inetad of a Vector. --- modules/thread_pool/thread_pool.cpp | 91 ++++++----------------------- modules/thread_pool/thread_pool.h | 11 +--- 2 files changed, 20 insertions(+), 82 deletions(-) diff --git a/modules/thread_pool/thread_pool.cpp b/modules/thread_pool/thread_pool.cpp index e372cdeb6..c68621ece 100644 --- a/modules/thread_pool/thread_pool.cpp +++ b/modules/thread_pool/thread_pool.cpp @@ -82,17 +82,11 @@ bool ThreadPool::has_job(const Ref &job) { _THREAD_SAFE_LOCK_ - for (int i = _current_queue_head; i < _current_queue_tail; ++i) { - if (_queue[i] == job) { - _THREAD_SAFE_UNLOCK_ - - return true; - } - } + List>::Element *E = _queue.find(job); _THREAD_SAFE_UNLOCK_ - return false; + return E; } void ThreadPool::add_job(const Ref &job) { @@ -111,22 +105,7 @@ void ThreadPool::add_job(const Ref &job) { } } - if (_current_queue_tail == _queue.size()) { - if (_current_queue_head == 0) { - _queue.resize(_queue.size() + _queue_grow_size); - } else { - int j = 0; - - for (int i = _current_queue_head; i < _current_queue_tail; ++i) { - _queue.write[j++] = _queue[i]; - } - - _current_queue_head = 0; - _current_queue_tail = j; - } - } - - _queue.write[_current_queue_tail++] = job; + _queue.push_back(job); _THREAD_SAFE_UNLOCK_ } @@ -138,23 +117,7 @@ void ThreadPool::cancel_job(Ref job) { _THREAD_SAFE_LOCK_ - //it it's in the queue remove it - for (int i = _current_queue_head; i < _current_queue_tail; ++i) { - Ref cjob = _queue[i]; - - if (cjob == job) { - _queue.write[i].unref(); - - for (int j = i; j + 1 < _current_queue_tail; ++j) { - _queue.write[j] = _queue[j + 1]; - } - - --_current_queue_tail; - - _THREAD_SAFE_UNLOCK_ - return; - } - } + _queue.erase(job); _THREAD_SAFE_UNLOCK_ } @@ -166,14 +129,9 @@ void ThreadPool::cancel_job_wait(Ref job) { _THREAD_SAFE_LOCK_ - for (int i = _current_queue_head; i < _current_queue_tail; ++i) { - Ref j = _queue[i]; - - if (j == job) { - _queue.write[i].unref(); - _THREAD_SAFE_UNLOCK_ - return; - } + if (_queue.erase(job)) { + _THREAD_SAFE_UNLOCK_ + return; } _THREAD_SAFE_UNLOCK_ @@ -197,19 +155,13 @@ void ThreadPool::_thread_finished(ThreadPoolContext *context) { context->job.unref(); - while (_current_queue_head != _current_queue_tail) { - context->job = _queue.get(_current_queue_head); - - if (!context->job.is_valid()) { - ++_current_queue_head; - continue; - } + while (_queue.size() > 0 && !context->job.is_valid()) { + context->job = _queue.front()->get(); + _queue.pop_front(); + } + if (context->job.is_valid()) { context->semaphore->post(); - _queue.write[_current_queue_head].unref(); - - ++_current_queue_head; - break; } _THREAD_SAFE_UNLOCK_ @@ -242,16 +194,17 @@ void ThreadPool::register_update() { } void ThreadPool::update() { - if (_current_queue_head == _current_queue_tail) + if (_queue.size() == 0) { return; + } float remaining_time = _max_time_per_frame; - while (remaining_time > 0 && _current_queue_head != _current_queue_tail) { - Ref job = _queue.get(_current_queue_head); + while (remaining_time > 0 && _queue.size() > 0) { + Ref job = _queue.front()->get(); if (!job.is_valid()) { - ++_current_queue_head; + _queue.pop_front(); continue; } @@ -261,7 +214,7 @@ void ThreadPool::update() { remaining_time -= job->get_current_execution_time(); if (job->get_complete() || job->get_cancelled()) { - _queue.write[_current_queue_head++].unref(); + _queue.pop_front(); } } } @@ -269,9 +222,6 @@ void ThreadPool::update() { ThreadPool::ThreadPool() { _instance = this; - _current_queue_head = 0; - _current_queue_tail = 0; - _use_threads = GLOBAL_DEF("thread_pool/use_threads", true); _thread_count = GLOBAL_DEF("thread_pool/thread_count", -1); _thread_fallback_count = GLOBAL_DEF("thread_pool/thread_fallback_count", 4); @@ -285,10 +235,6 @@ ThreadPool::ThreadPool() { //Todo Add help text, as this will only come into play if threading is disabled, or not available _max_work_per_frame_percent = GLOBAL_DEF("thread_pool/max_work_per_frame_percent", 25); - _queue_start_size = GLOBAL_DEF("thread_pool/queue_start_size", 20); - _queue_grow_size = GLOBAL_DEF("thread_pool/queue_grow_size", 10); - _queue.resize(_queue_start_size); - //Todo this should be recalculated constantly to smooth out performance better _max_time_per_frame = (1 / 60.0) * (_max_work_per_frame_percent / 100.0); @@ -346,7 +292,6 @@ ThreadPool::~ThreadPool() { _threads.clear(); _queue.clear(); - //_job_pool.clear(); } void ThreadPool::_bind_methods() { diff --git a/modules/thread_pool/thread_pool.h b/modules/thread_pool/thread_pool.h index 9c649e6e6..d957a622d 100644 --- a/modules/thread_pool/thread_pool.h +++ b/modules/thread_pool/thread_pool.h @@ -27,6 +27,7 @@ SOFTWARE. #include "core/object/object.h" #include "core/containers/vector.h" +#include "core/containers/list.h" #include "core/os/semaphore.h" #include "core/os/thread.h" @@ -100,15 +101,7 @@ private: Vector _threads; - Vector> _queue; - int _current_queue_head; - int _current_queue_tail; - - int _queue_start_size; - int _queue_grow_size; - - //todo - //Vector > _job_pool; + List> _queue; }; #endif