diff --git a/src/utils/worker.cpp b/src/utils/worker.cpp index e0790d054..277729a20 100644 --- a/src/utils/worker.cpp +++ b/src/utils/worker.cpp @@ -67,10 +67,13 @@ struct worker_state_observer { }; struct task_data { - task_data() {} - task_data(task_t task_, void *data_) : task(task_), data(data_) {} - task_t task; - void *data; + task_data(task_t task, void *data, worker *w) : m_task(task), m_data(data), + m_result(0), m_w(w), m_returned(false) {} + task_t m_task; + void *m_data; + void *m_result; + bool m_returned; + struct worker *m_w; }; struct worker { @@ -84,8 +87,7 @@ struct worker { pthread_create(&m_thread_id, NULL, worker::enter_loop, this); } ~worker() { - task_data poisoned; - poisoned.task = NULL; + task_data *poisoned = new task_data(NULL, NULL, this); this->push(poisoned); pthread_join(m_thread_id, NULL); @@ -96,14 +98,13 @@ struct worker { static void *enter_loop(void *args); void run(); - void push(task_data); - void *pop(); + void push(task_data *); + void *pop(task_data *); - queue m_data; + queue m_data; pthread_mutex_t m_lock; pthread_cond_t m_task_ready_cv; pthread_cond_t m_task_completed_cv; - queue m_result; pthread_t m_thread_id; worker_state_observer &m_state_observer; @@ -118,7 +119,7 @@ void *worker::enter_loop(void *args) { void worker::run() { while(1) { - struct task_data data; + struct task_data *data; pthread_mutex_lock(&m_lock); while(m_data.empty()) { pthread_cond_wait(&m_task_ready_cv, &m_lock); @@ -128,36 +129,39 @@ void worker::run() { pthread_mutex_unlock(&m_lock); // poisoned pill - if(data.task == NULL) { + if(data->m_task == NULL) { + delete data; return; } - void *res = data.task(data.data); + void *res = data->m_task(data->m_data); pthread_mutex_lock(&m_lock); - m_result.push(res); + data->m_result = res; + data->m_returned = true; pthread_cond_signal(&m_task_completed_cv); m_state_observer.notify(this); pthread_mutex_unlock(&m_lock); } } -void worker::push(task_data data) { +void worker::push(task_data *data) { pthread_mutex_lock(&m_lock); + assert(m_data.size() == 0); m_data.push(data); pthread_cond_signal(&m_task_ready_cv); pthread_mutex_unlock(&m_lock); } -void *worker::pop() { +void *worker::pop(task_data *d) { void *res = NULL; pthread_mutex_lock(&m_lock); - while(m_result.empty()) { + while(!d->m_returned) { pthread_cond_wait(&m_task_completed_cv, &m_lock); } - res = m_result.front(); - m_result.pop(); + res = d->m_result; + delete d; pthread_mutex_unlock(&m_lock); return res; @@ -211,17 +215,19 @@ task_result_handle_t worker_pool::run_async(task_t task, void *data) /// @todo: really weird - it seems like that 'it' instead of 'w' caused some problems m_empty_workers.erase(w); m_occupied_workers.insert(w); + + task_data *d = new task_data(task, data, w); + w->push(d); pthread_mutex_unlock(&m_lock); - w->push(task_data(task, data)); - - return w; + return d; } void *worker_pool::wait_task(task_result_handle_t handle) { - worker *w = (worker *) handle; - return w->pop(); + task_data *d = (task_data *) handle; + worker *w = d->m_w; + return w->pop(d); } static class worker_pool instance;