diff --git a/thread_pool.cpp b/thread_pool.cpp index 2671c4e..a8abd73 100644 --- a/thread_pool.cpp +++ b/thread_pool.cpp @@ -90,83 +90,19 @@ void ThreadPool::set_max_time_per_frame(const bool value) { _max_time_per_frame = value; } -void ThreadPool::cancel_job_wait(Ref job) { - ERR_FAIL_COND(!job.is_valid()); - - _THREAD_SAFE_LOCK_ - - for (int i = 0; i < _queue.size(); ++i) { - Ref j = _queue[i]; - - if (j == job) { - _queue.write[i].unref(); - _THREAD_SAFE_UNLOCK_ - return; - } - } - - _THREAD_SAFE_UNLOCK_ - - for (int i = 0; i < _threads.size(); ++i) { - Ref j = _threads[i]->job; - - if (j == job) { - job->set_cancelled(true); - - while (_threads[i]->job == job) { - OS::get_singleton()->delay_usec(100); - } - - return; - } - } -} -void ThreadPool::cancel_job(Ref job) { - ERR_FAIL_COND(!job.is_valid()); - - _THREAD_SAFE_LOCK_ - - for (int i = 0; i < _queue.size(); ++i) { - Ref j = _queue[i]; - - if (j == job) { - _queue.write[i].unref(); - _THREAD_SAFE_UNLOCK_ - return; - } - } - - _THREAD_SAFE_UNLOCK_ - - for (int i = 0; i < _threads.size(); ++i) { - Ref j = _threads[i]->job; - - if (j == job) { - job->set_cancelled(true); - return; - } - } -} - bool ThreadPool::has_job(const Ref &job) { - _THREAD_SAFE_LOCK_ + for (int i = 0; i < _threads.size(); ++i) { + ThreadPoolContext *context = _threads.get(i); - if (_use_threads) { - for (int i = 0; i < _threads.size(); ++i) { - ThreadPoolContext *context = _threads.get(i); - - if (context->job == job) { - - _THREAD_SAFE_UNLOCK_ - - return true; - } + if (context->job == job) { + return true; } } + _THREAD_SAFE_LOCK_ + for (int i = _current_queue_head; i < _current_queue_tail; ++i) { if (_queue[i] == job) { - _THREAD_SAFE_UNLOCK_ return true; @@ -214,6 +150,67 @@ void ThreadPool::add_job(const Ref &job) { _THREAD_SAFE_UNLOCK_ } +void ThreadPool::cancel_job(Ref job) { + ERR_FAIL_COND(!job.is_valid()); + + job->set_cancelled(true); + + _THREAD_SAFE_LOCK_ + + //it it's in the queue remove it + for (int i = _current_queue_head; i < _current_queue_tail; ++i) { + Ref cjob = _queue[i]; + + if (cjob == job) { + _queue.write[i].unref(); + + for (int j = i; j + 1 < _current_queue_tail; ++j) { + _queue.write[j] = _queue[j + 1]; + } + + --_current_queue_tail; + + _THREAD_SAFE_UNLOCK_ + return; + } + } + + _THREAD_SAFE_UNLOCK_ +} + +void ThreadPool::cancel_job_wait(Ref job) { + ERR_FAIL_COND(!job.is_valid()); + + job->set_cancelled(true); + + _THREAD_SAFE_LOCK_ + + for (int i = _current_queue_head; i < _current_queue_tail; ++i) { + Ref j = _queue[i]; + + if (j == job) { + _queue.write[i].unref(); + _THREAD_SAFE_UNLOCK_ + return; + } + } + + _THREAD_SAFE_UNLOCK_ + + for (int i = 0; i < _threads.size(); ++i) { + Ref j = _threads[i]->job; + + if (j == job) { + //wait until it's done + while (_threads[i]->job == job) { + OS::get_singleton()->delay_usec(100); + } + + return; + } + } +} + void ThreadPool::_thread_finished(ThreadPoolContext *context) { _THREAD_SAFE_LOCK_ @@ -381,9 +378,9 @@ void ThreadPool::_bind_methods() { ClassDB::bind_method(D_METHOD("has_job", "job"), &ThreadPool::has_job); ClassDB::bind_method(D_METHOD("add_job", "job"), &ThreadPool::add_job); + ClassDB::bind_method(D_METHOD("cancel_job", "job"), &ThreadPool::cancel_job); + ClassDB::bind_method(D_METHOD("cancel_job_wait", "job"), &ThreadPool::cancel_job_wait); + ClassDB::bind_method(D_METHOD("register_update"), &ThreadPool::register_update); ClassDB::bind_method(D_METHOD("update"), &ThreadPool::update); - - ClassDB::bind_method(D_METHOD("cancel_job_wait", "job"), &ThreadPool::cancel_job_wait); - ClassDB::bind_method(D_METHOD("cancel_job", "job"), &ThreadPool::cancel_job); } diff --git a/thread_pool.h b/thread_pool.h index 87ca5d1..dcb8317 100644 --- a/thread_pool.h +++ b/thread_pool.h @@ -79,12 +79,12 @@ public: float get_max_time_per_frame() const; void set_max_time_per_frame(const bool value); - void cancel_job_wait(Ref job); - void cancel_job(Ref job); - bool has_job(const Ref &job); void add_job(const Ref &job); + void cancel_job(Ref job); + void cancel_job_wait(Ref job); + void _thread_finished(ThreadPoolContext *context); static void _worker_thread_func(void *user_data);