diff --git a/thread_pool.cpp b/thread_pool.cpp index 04885fc..97a5fad 100644 --- a/thread_pool.cpp +++ b/thread_pool.cpp @@ -59,52 +59,55 @@ bool ThreadPool::get_use_threads() const { return _use_threads; } void ThreadPool::set_use_threads(const bool value) { - _use_threads = value; + // Will be applied later in update, so current jobs can be finished first + _use_threads_new = value; + _dirty = true; } int ThreadPool::get_thread_count() const { return _thread_count; } -void ThreadPool::set_thread_count(const bool value) { +void ThreadPool::set_thread_count(const int value) { _thread_count = value; + _dirty = true; } int ThreadPool::get_thread_fallback_count() const { return _thread_fallback_count; } -void ThreadPool::set_thread_fallback_count(const bool value) { +void ThreadPool::set_thread_fallback_count(const int value) { _thread_fallback_count = value; + _dirty = true; } float ThreadPool::get_max_work_per_frame_percent() const { return _max_work_per_frame_percent; } -void ThreadPool::set_max_work_per_frame_percent(const bool value) { +void ThreadPool::set_max_work_per_frame_percent(const float value) { _max_work_per_frame_percent = value; + _dirty = true; } float ThreadPool::get_max_time_per_frame() const { return _max_time_per_frame; } -void ThreadPool::set_max_time_per_frame(const bool value) { +void ThreadPool::set_max_time_per_frame(const float value) { _max_time_per_frame = value; + _dirty = true; } -bool ThreadPool::has_job(const Ref &job) { - for (int i = 0; i < _threads.size(); ++i) { - ThreadPoolContext *context = _threads.get(i); - - if (context->job == job) { - return true; - } - } - +bool ThreadPool::is_working() const { _THREAD_SAFE_LOCK_ - for (int i = _current_queue_head; i < _current_queue_tail; ++i) { - if (_queue[i] == job) { - _THREAD_SAFE_UNLOCK_ + if (_queue.size() > 0) { + _THREAD_SAFE_UNLOCK_ + return true; + } + + for (int i = 0; i < _threads.size(); ++i) { + if (_threads[i]->job.is_valid()) { + _THREAD_SAFE_UNLOCK_ return true; } } @@ -114,6 +117,39 @@ bool ThreadPool::has_job(const Ref &job) { return false; } +bool ThreadPool::is_working_no_lock() const { + if (_queue.size() > 0) { + return true; + } + + for (int i = 0; i < _threads.size(); ++i) { + if (_threads[i]->job.is_valid()) { + return true; + } + } + + return false; +} + +bool ThreadPool::has_job(const Ref &job) { + _THREAD_SAFE_LOCK_ + + for (int i = 0; i < _threads.size(); ++i) { + ThreadPoolContext *context = _threads.get(i); + + if (context->job == job) { + _THREAD_SAFE_UNLOCK_ + return true; + } + } + + List>::Element *E = _queue.find(job); + + _THREAD_SAFE_UNLOCK_ + + return E; +} + void ThreadPool::add_job(const Ref &job) { _THREAD_SAFE_LOCK_ @@ -130,22 +166,7 @@ void ThreadPool::add_job(const Ref &job) { } } - if (_current_queue_tail == _queue.size()) { - if (_current_queue_head == 0) { - _queue.resize(_queue.size() + _queue_grow_size); - } else { - int j = 0; - - for (int i = _current_queue_head; i < _current_queue_tail; ++i) { - _queue.write[j++] = _queue[i]; - } - - _current_queue_head = 0; - _current_queue_tail = j; - } - } - - _queue.write[_current_queue_tail++] = job; + _queue.push_back(job); _THREAD_SAFE_UNLOCK_ } @@ -157,23 +178,7 @@ void ThreadPool::cancel_job(Ref job) { _THREAD_SAFE_LOCK_ - //it it's in the queue remove it - for (int i = _current_queue_head; i < _current_queue_tail; ++i) { - Ref cjob = _queue[i]; - - if (cjob == job) { - _queue.write[i].unref(); - - for (int j = i; j + 1 < _current_queue_tail; ++j) { - _queue.write[j] = _queue[j + 1]; - } - - --_current_queue_tail; - - _THREAD_SAFE_UNLOCK_ - return; - } - } + _queue.erase(job); _THREAD_SAFE_UNLOCK_ } @@ -185,14 +190,9 @@ void ThreadPool::cancel_job_wait(Ref job) { _THREAD_SAFE_LOCK_ - for (int i = _current_queue_head; i < _current_queue_tail; ++i) { - Ref j = _queue[i]; - - if (j == job) { - _queue.write[i].unref(); - _THREAD_SAFE_UNLOCK_ - return; - } + if (_queue.erase(job)) { + _THREAD_SAFE_UNLOCK_ + return; } _THREAD_SAFE_UNLOCK_ @@ -216,19 +216,13 @@ void ThreadPool::_thread_finished(ThreadPoolContext *context) { context->job.unref(); - while (_current_queue_head != _current_queue_tail) { - context->job = _queue.get(_current_queue_head); - - if (!context->job.is_valid()) { - ++_current_queue_head; - continue; - } + while (_queue.size() > 0 && !context->job.is_valid()) { + context->job = _queue.front()->get(); + _queue.pop_front(); + } + if (context->job.is_valid()) { context->semaphore->post(); - _queue.write[_current_queue_head].unref(); - - ++_current_queue_head; - break; } _THREAD_SAFE_UNLOCK_ @@ -261,16 +255,25 @@ void ThreadPool::register_update() { } void ThreadPool::update() { - if (_current_queue_head == _current_queue_tail) + if (_dirty) { + apply_settings(); + } + + if (_use_threads) { return; + } + + if (_queue.size() == 0) { + return; + } float remaining_time = _max_time_per_frame; - while (remaining_time > 0 && _current_queue_head != _current_queue_tail) { - Ref job = _queue.get(_current_queue_head); + while (remaining_time > 0 && _queue.size() > 0) { + Ref job = _queue.front()->get(); if (!job.is_valid()) { - ++_current_queue_head; + _queue.pop_front(); continue; } @@ -280,17 +283,66 @@ void ThreadPool::update() { remaining_time -= job->get_current_execution_time(); if (job->get_complete() || job->get_cancelled()) { - _queue.write[_current_queue_head++].unref(); + _queue.pop_front(); } } } +void ThreadPool::apply_settings() { + if (!_dirty) { + return; + } + + _THREAD_SAFE_LOCK_ + + if (is_working_no_lock()) { + _THREAD_SAFE_UNLOCK_ + return; + } + + _dirty = false; + + for (int i = 0; i < _threads.size(); ++i) { + ThreadPoolContext *context = _threads[i]; + + CRASH_COND(context->job.is_valid()); + + context->running = false; + context->semaphore->post(); + context->thread->wait_to_finish(); + memdelete(context->thread); + memdelete(context->semaphore); + memdelete(context); + } + + _threads.resize(0); + + _use_threads = _use_threads_new; + + if (_use_threads) { + _threads.resize(_thread_count); + + for (int i = 0; i < _threads.size(); ++i) { + ThreadPoolContext *context = memnew(ThreadPoolContext); + + context->running = true; + context->semaphore = memnew(Semaphore); + + context->thread = memnew(Thread()); + context->thread->start(ThreadPool::_worker_thread_func, context); + + _threads.write[i] = context; + } + } else { + call_deferred("register_update"); + } + + _THREAD_SAFE_UNLOCK_ +} + ThreadPool::ThreadPool() { _instance = this; - _current_queue_head = 0; - _current_queue_tail = 0; - _use_threads = GLOBAL_DEF("thread_pool/use_threads", true); _thread_count = GLOBAL_DEF("thread_pool/thread_count", -1); _thread_fallback_count = GLOBAL_DEF("thread_pool/thread_fallback_count", 4); @@ -304,10 +356,6 @@ ThreadPool::ThreadPool() { //Todo Add help text, as this will only come into play if threading is disabled, or not available _max_work_per_frame_percent = GLOBAL_DEF("thread_pool/max_work_per_frame_percent", 25); - _queue_start_size = GLOBAL_DEF("thread_pool/queue_start_size", 20); - _queue_grow_size = GLOBAL_DEF("thread_pool/queue_grow_size", 10); - _queue.resize(_queue_start_size); - //Todo this should be recalculated constantly to smooth out performance better _max_time_per_frame = (1 / 60.0) * (_max_work_per_frame_percent / 100.0); @@ -324,23 +372,13 @@ ThreadPool::ThreadPool() { if (_thread_count <= 0) { _thread_count = _thread_fallback_count; } - - _threads.resize(_thread_count); - - for (int i = 0; i < _threads.size(); ++i) { - ThreadPoolContext *context = memnew(ThreadPoolContext); - - context->running = true; - context->semaphore = memnew(Semaphore); - - context->thread = memnew(Thread()); - context->thread->start(ThreadPool::_worker_thread_func, context); - - _threads.write[i] = context; - } - } else { - call_deferred("register_update"); } + + _use_threads_new = _use_threads; + + _dirty = true; + + apply_settings(); } ThreadPool::~ThreadPool() { @@ -365,7 +403,6 @@ ThreadPool::~ThreadPool() { _threads.clear(); _queue.clear(); - //_job_pool.clear(); } void ThreadPool::_bind_methods() { @@ -389,6 +426,9 @@ void ThreadPool::_bind_methods() { ClassDB::bind_method(D_METHOD("set_max_time_per_frame", "value"), &ThreadPool::set_max_time_per_frame); ADD_PROPERTY(PropertyInfo(Variant::REAL, "max_time_per_frame"), "set_max_time_per_frame", "get_max_time_per_frame"); + ClassDB::bind_method(D_METHOD("is_working"), &ThreadPool::is_working); + ClassDB::bind_method(D_METHOD("is_working_no_lock"), &ThreadPool::is_working_no_lock); + ClassDB::bind_method(D_METHOD("has_job", "job"), &ThreadPool::has_job); ClassDB::bind_method(D_METHOD("add_job", "job"), &ThreadPool::add_job); diff --git a/thread_pool.h b/thread_pool.h index 4263b71..1b2b707 100644 --- a/thread_pool.h +++ b/thread_pool.h @@ -30,9 +30,11 @@ SOFTWARE. #if VERSION_MAJOR > 3 #include "core/object/object.h" #include "core/templates/vector.h" +#include "core/templates/list.h" #else #include "core/object.h" #include "core/vector.h" +#include "core/list.h" #endif #include "core/os/semaphore.h" @@ -68,16 +70,19 @@ public: void set_use_threads(const bool value); int get_thread_count() const; - void set_thread_count(const bool value); + void set_thread_count(const int value); int get_thread_fallback_count() const; - void set_thread_fallback_count(const bool value); + void set_thread_fallback_count(const int value); float get_max_work_per_frame_percent() const; - void set_max_work_per_frame_percent(const bool value); + void set_max_work_per_frame_percent(const float value); float get_max_time_per_frame() const; - void set_max_time_per_frame(const bool value); + void set_max_time_per_frame(const float value); + + bool is_working() const; + bool is_working_no_lock() const; bool has_job(const Ref &job); void add_job(const Ref &job); @@ -90,6 +95,7 @@ public: void register_update(); void update(); + void apply_settings(); ThreadPool(); ~ThreadPool(); @@ -100,7 +106,9 @@ protected: private: static ThreadPool *_instance; + bool _dirty; bool _use_threads; + bool _use_threads_new; int _thread_count; int _thread_fallback_count; float _max_work_per_frame_percent; @@ -108,15 +116,7 @@ private: Vector _threads; - Vector > _queue; - int _current_queue_head; - int _current_queue_tail; - - int _queue_start_size; - int _queue_grow_size; - - //todo - //Vector > _job_pool; + List> _queue; }; #endif