Now ThreadPool uses a List internally as a queue inetad of a Vector.

This commit is contained in:
Relintai 2022-09-10 03:07:49 +02:00
parent 4025ab2824
commit d10db3fddd
2 changed files with 20 additions and 82 deletions

View File

@ -82,17 +82,11 @@ bool ThreadPool::has_job(const Ref<ThreadPoolJob> &job) {
_THREAD_SAFE_LOCK_ _THREAD_SAFE_LOCK_
for (int i = _current_queue_head; i < _current_queue_tail; ++i) { List<Ref<ThreadPoolJob>>::Element *E = _queue.find(job);
if (_queue[i] == job) {
_THREAD_SAFE_UNLOCK_
return true;
}
}
_THREAD_SAFE_UNLOCK_ _THREAD_SAFE_UNLOCK_
return false; return E;
} }
void ThreadPool::add_job(const Ref<ThreadPoolJob> &job) { void ThreadPool::add_job(const Ref<ThreadPoolJob> &job) {
@ -111,22 +105,7 @@ void ThreadPool::add_job(const Ref<ThreadPoolJob> &job) {
} }
} }
if (_current_queue_tail == _queue.size()) { _queue.push_back(job);
if (_current_queue_head == 0) {
_queue.resize(_queue.size() + _queue_grow_size);
} else {
int j = 0;
for (int i = _current_queue_head; i < _current_queue_tail; ++i) {
_queue.write[j++] = _queue[i];
}
_current_queue_head = 0;
_current_queue_tail = j;
}
}
_queue.write[_current_queue_tail++] = job;
_THREAD_SAFE_UNLOCK_ _THREAD_SAFE_UNLOCK_
} }
@ -138,23 +117,7 @@ void ThreadPool::cancel_job(Ref<ThreadPoolJob> job) {
_THREAD_SAFE_LOCK_ _THREAD_SAFE_LOCK_
//it it's in the queue remove it _queue.erase(job);
for (int i = _current_queue_head; i < _current_queue_tail; ++i) {
Ref<ThreadPoolJob> cjob = _queue[i];
if (cjob == job) {
_queue.write[i].unref();
for (int j = i; j + 1 < _current_queue_tail; ++j) {
_queue.write[j] = _queue[j + 1];
}
--_current_queue_tail;
_THREAD_SAFE_UNLOCK_
return;
}
}
_THREAD_SAFE_UNLOCK_ _THREAD_SAFE_UNLOCK_
} }
@ -166,15 +129,10 @@ void ThreadPool::cancel_job_wait(Ref<ThreadPoolJob> job) {
_THREAD_SAFE_LOCK_ _THREAD_SAFE_LOCK_
for (int i = _current_queue_head; i < _current_queue_tail; ++i) { if (_queue.erase(job)) {
Ref<ThreadPoolJob> j = _queue[i];
if (j == job) {
_queue.write[i].unref();
_THREAD_SAFE_UNLOCK_ _THREAD_SAFE_UNLOCK_
return; return;
} }
}
_THREAD_SAFE_UNLOCK_ _THREAD_SAFE_UNLOCK_
@ -197,19 +155,13 @@ void ThreadPool::_thread_finished(ThreadPoolContext *context) {
context->job.unref(); context->job.unref();
while (_current_queue_head != _current_queue_tail) { while (_queue.size() > 0 && !context->job.is_valid()) {
context->job = _queue.get(_current_queue_head); context->job = _queue.front()->get();
_queue.pop_front();
if (!context->job.is_valid()) {
++_current_queue_head;
continue;
} }
if (context->job.is_valid()) {
context->semaphore->post(); context->semaphore->post();
_queue.write[_current_queue_head].unref();
++_current_queue_head;
break;
} }
_THREAD_SAFE_UNLOCK_ _THREAD_SAFE_UNLOCK_
@ -242,16 +194,17 @@ void ThreadPool::register_update() {
} }
void ThreadPool::update() { void ThreadPool::update() {
if (_current_queue_head == _current_queue_tail) if (_queue.size() == 0) {
return; return;
}
float remaining_time = _max_time_per_frame; float remaining_time = _max_time_per_frame;
while (remaining_time > 0 && _current_queue_head != _current_queue_tail) { while (remaining_time > 0 && _queue.size() > 0) {
Ref<ThreadPoolJob> job = _queue.get(_current_queue_head); Ref<ThreadPoolJob> job = _queue.front()->get();
if (!job.is_valid()) { if (!job.is_valid()) {
++_current_queue_head; _queue.pop_front();
continue; continue;
} }
@ -261,7 +214,7 @@ void ThreadPool::update() {
remaining_time -= job->get_current_execution_time(); remaining_time -= job->get_current_execution_time();
if (job->get_complete() || job->get_cancelled()) { if (job->get_complete() || job->get_cancelled()) {
_queue.write[_current_queue_head++].unref(); _queue.pop_front();
} }
} }
} }
@ -269,9 +222,6 @@ void ThreadPool::update() {
ThreadPool::ThreadPool() { ThreadPool::ThreadPool() {
_instance = this; _instance = this;
_current_queue_head = 0;
_current_queue_tail = 0;
_use_threads = GLOBAL_DEF("thread_pool/use_threads", true); _use_threads = GLOBAL_DEF("thread_pool/use_threads", true);
_thread_count = GLOBAL_DEF("thread_pool/thread_count", -1); _thread_count = GLOBAL_DEF("thread_pool/thread_count", -1);
_thread_fallback_count = GLOBAL_DEF("thread_pool/thread_fallback_count", 4); _thread_fallback_count = GLOBAL_DEF("thread_pool/thread_fallback_count", 4);
@ -285,10 +235,6 @@ ThreadPool::ThreadPool() {
//Todo Add help text, as this will only come into play if threading is disabled, or not available //Todo Add help text, as this will only come into play if threading is disabled, or not available
_max_work_per_frame_percent = GLOBAL_DEF("thread_pool/max_work_per_frame_percent", 25); _max_work_per_frame_percent = GLOBAL_DEF("thread_pool/max_work_per_frame_percent", 25);
_queue_start_size = GLOBAL_DEF("thread_pool/queue_start_size", 20);
_queue_grow_size = GLOBAL_DEF("thread_pool/queue_grow_size", 10);
_queue.resize(_queue_start_size);
//Todo this should be recalculated constantly to smooth out performance better //Todo this should be recalculated constantly to smooth out performance better
_max_time_per_frame = (1 / 60.0) * (_max_work_per_frame_percent / 100.0); _max_time_per_frame = (1 / 60.0) * (_max_work_per_frame_percent / 100.0);
@ -346,7 +292,6 @@ ThreadPool::~ThreadPool() {
_threads.clear(); _threads.clear();
_queue.clear(); _queue.clear();
//_job_pool.clear();
} }
void ThreadPool::_bind_methods() { void ThreadPool::_bind_methods() {

View File

@ -27,6 +27,7 @@ SOFTWARE.
#include "core/object/object.h" #include "core/object/object.h"
#include "core/containers/vector.h" #include "core/containers/vector.h"
#include "core/containers/list.h"
#include "core/os/semaphore.h" #include "core/os/semaphore.h"
#include "core/os/thread.h" #include "core/os/thread.h"
@ -100,15 +101,7 @@ private:
Vector<ThreadPoolContext *> _threads; Vector<ThreadPoolContext *> _threads;
Vector<Ref<ThreadPoolJob>> _queue; List<Ref<ThreadPoolJob>> _queue;
int _current_queue_head;
int _current_queue_tail;
int _queue_start_size;
int _queue_grow_size;
//todo
//Vector<Ref<ThreadPoolJob> > _job_pool;
}; };
#endif #endif