From 56ece67eed6a8d9110288c13ebd496a1471587c5 Mon Sep 17 00:00:00 2001 From: insofaras Date: Thu, 28 Jul 2016 19:09:08 +0100 Subject: [PATCH] linux new job queue stuff --- linux_4ed.cpp | 398 +++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 364 insertions(+), 34 deletions(-) diff --git a/linux_4ed.cpp b/linux_4ed.cpp index 3fd1fb94..013d3211 100644 --- a/linux_4ed.cpp +++ b/linux_4ed.cpp @@ -103,6 +103,8 @@ #define SUPPORT_DPI 1 #define LINUX_FONTS 1 +#define InterlockedCompareExchange(dest, ex, comp) __sync_val_compare_and_swap((dest), (comp), (ex)) + // // Linux structs / enums // @@ -148,6 +150,8 @@ struct Thread_Group{ Thread_Context *threads; i32 count; + Unbounded_Work_Queue queue; + i32 cancel_lock0; i32 cancel_cv0; }; @@ -252,6 +256,9 @@ internal void LinuxStringDup(String*, void*, size_t); internal Sys_Acquire_Lock_Sig(system_acquire_lock); internal Sys_Release_Lock_Sig(system_release_lock); +internal void system_wait_cv(i32, i32); +internal void system_signal_cv(i32, i32); + // // Linux static assertions // @@ -983,8 +990,11 @@ Sys_CLI_End_Update_Sig(system_cli_end_update){ // Threads // +//#define OLD_JOB_QUEUE + +#ifdef OLD_JOB_QUEUE internal void* -ThreadProc(void* arg){ +JobThreadProc(void* arg){ Thread_Context *thread = (Thread_Context*)arg; Work_Queue *queue = linuxvars.queues + thread->group_id; Thread_Group *group = linuxvars.groups + thread->group_id; @@ -1005,27 +1015,27 @@ ThreadProc(void* arg){ for (;;){ u32 read_index = queue->read_position; u32 write_index = queue->write_position; - + if (read_index != write_index){ - u32 next_read_index = (read_index + 1) % JOB_ID_WRAP; + u32 next_read_index = (read_index + 1) % QUEUE_WRAP; u32 safe_read_index = - __sync_val_compare_and_swap(&queue->read_position, - read_index, next_read_index); - + __sync_val_compare_and_swap(&queue->read_position, + read_index, next_read_index); + if (safe_read_index == read_index){ - Full_Job_Data *full_job = queue->jobs + (safe_read_index % QUEUE_WRAP); + Full_Job_Data *full_job = queue->jobs + safe_read_index; // NOTE(allen): This is interlocked so that it plays nice // with the cancel job routine, which may try to cancel this job // at the same time that we try to run it - + i32 safe_running_thread = - __sync_val_compare_and_swap(&full_job->running_thread, - THREAD_NOT_ASSIGNED, thread->id); - + __sync_val_compare_and_swap(&full_job->running_thread, + THREAD_NOT_ASSIGNED, thread->id); + if (safe_running_thread == THREAD_NOT_ASSIGNED){ thread->job_id = full_job->id; thread->running = 1; - + full_job->job.callback(&linuxvars.system, thread, thread_memory, full_job->job.data); full_job->running_thread = 0; thread->running = 0; @@ -1050,17 +1060,17 @@ ThreadProc(void* arg){ internal Sys_Post_Job_Sig(system_post_job){ Work_Queue *queue = linuxvars.queues + group_id; - + Assert((queue->write_position + 1) % QUEUE_WRAP != queue->read_position % QUEUE_WRAP); - + b32 success = 0; u32 result = 0; while (!success){ u32 write_index = queue->write_position; - u32 next_write_index = (write_index + 1) % JOB_ID_WRAP; + u32 next_write_index = (write_index + 1) % QUEUE_WRAP; u32 safe_write_index = - __sync_val_compare_and_swap(&queue->write_position, - write_index, next_write_index); + __sync_val_compare_and_swap(&queue->write_position, + write_index, next_write_index); if (safe_write_index == write_index){ result = write_index; write_index = write_index % QUEUE_WRAP; @@ -1070,7 +1080,7 @@ Sys_Post_Job_Sig(system_post_job){ success = 1; } } - + sem_post(LinuxHandleToSem(queue->semaphore)); return result; @@ -1080,19 +1090,15 @@ internal Sys_Cancel_Job_Sig(system_cancel_job){ Work_Queue *queue = linuxvars.queues + group_id; Thread_Group *group = linuxvars.groups + group_id; - - u32 job_index; - u32 thread_id; - Full_Job_Data *full_job; - - job_index = job_id % QUEUE_WRAP; - full_job = queue->jobs + job_index; - + + u32 job_index = job_id % QUEUE_WRAP; + Full_Job_Data *full_job = queue->jobs + job_index; + Assert(full_job->id == job_id); - thread_id = - __sync_val_compare_and_swap(&full_job->running_thread, - THREAD_NOT_ASSIGNED, 0); - + u32 thread_id = + __sync_val_compare_and_swap(&full_job->running_thread, + THREAD_NOT_ASSIGNED, 0); + if (thread_id != THREAD_NOT_ASSIGNED && thread_id != 0){ i32 thread_index = thread_id - 1; @@ -1137,7 +1143,7 @@ internal Sys_Grow_Thread_Memory_Sig(system_grow_thread_memory){ void *old_data; i32 old_size, new_size; - + system_acquire_lock(CANCEL_LOCK0 + memory->id - 1); old_data = memory->data; old_size = memory->size; @@ -1151,6 +1157,299 @@ Sys_Grow_Thread_Memory_Sig(system_grow_thread_memory){ system_release_lock(CANCEL_LOCK0 + memory->id - 1); } +#else // new job queue + +internal void* +JobThreadProc(void* lpParameter){ + Thread_Context *thread = (Thread_Context*)lpParameter; + Work_Queue *queue = linuxvars.queues + thread->group_id; + Thread_Group *group = linuxvars.groups + thread->group_id; + + i32 thread_index = thread->id - 1; + + i32 cancel_lock = group->cancel_lock0 + thread_index; + i32 cancel_cv = group->cancel_cv0 + thread_index; + + Thread_Memory *thread_memory = linuxvars.thread_memory + thread_index; + + if (thread_memory->size == 0){ + i32 new_size = Kbytes(64); + thread_memory->data = LinuxGetMemory(new_size); + thread_memory->size = new_size; + } + + for (;;){ + u32 read_index = queue->read_position; + u32 write_index = queue->write_position; + + if (read_index != write_index){ + // NOTE(allen): Previously I was wrapping by the job wrap then + // wrapping by the queue wrap. That was super stupid what was that? + // Now it just wraps by the queue wrap. + u32 next_read_index = (read_index + 1) % QUEUE_WRAP; + u32 safe_read_index = + InterlockedCompareExchange(&queue->read_position, + next_read_index, read_index); + + if (safe_read_index == read_index){ + Full_Job_Data *full_job = queue->jobs + safe_read_index; + // NOTE(allen): This is interlocked so that it plays nice + // with the cancel job routine, which may try to cancel this job + // at the same time that we try to run it + + i32 safe_running_thread = + InterlockedCompareExchange(&full_job->running_thread, + thread->id, THREAD_NOT_ASSIGNED); + + if (safe_running_thread == THREAD_NOT_ASSIGNED){ + thread->job_id = full_job->id; + thread->running = 1; + + full_job->job.callback(&linuxvars.system, + thread, thread_memory, full_job->job.data); + LinuxScheduleStep(); + //full_job->running_thread = 0; + thread->running = 0; + + system_acquire_lock(cancel_lock); + if (thread->cancel){ + thread->cancel = 0; + system_signal_cv(cancel_lock, cancel_cv); + } + system_release_lock(cancel_lock); + } + } + } + else{ + sem_wait(LinuxHandleToSem(queue->semaphore)); + } + } +} + +internal void +initialize_unbounded_queue(Unbounded_Work_Queue *source_queue){ + i32 max = 512; + source_queue->jobs = (Full_Job_Data*)system_get_memory(max*sizeof(Full_Job_Data)); + source_queue->count = 0; + source_queue->max = max; + source_queue->skip = 0; +} + +inline i32 +get_work_queue_available_space(i32 write, i32 read){ + // NOTE(allen): The only time that queue->write_position == queue->read_position + // is allowed is when the queue is empty. Thus if + // queue->write_position+1 == queue->read_position the available space is zero. + // So these computations both end up leaving one slot unused. The only way I can + // think to easily eliminate this is to have read and write wrap at twice the size + // of the underlying array but modulo their values into the array then if write + // has caught up with read it still will not be equal... but lots of modulos... ehh. + + i32 available_space = 0; + if (write >= read){ + available_space = QUEUE_WRAP - (write - read) - 1; + } + else{ + available_space = (read - write) - 1; + } + + return(available_space); +} + +#define UNBOUNDED_SKIP_MAX 128 + +internal void +flush_to_direct_queue(Unbounded_Work_Queue *source_queue, Work_Queue *queue, i32 thread_count){ + // NOTE(allen): It is understood that read_position may be changed by other + // threads but it will only make more space in the queue if it is changed. + // Meanwhile write_position should not ever be changed by anything but the + // main thread in this system, so it will not be interlocked. + u32 read_position = queue->read_position; + u32 write_position = queue->write_position; + u32 available_space = get_work_queue_available_space(write_position, read_position); + u32 available_jobs = source_queue->count - source_queue->skip; + + u32 writable_count = Min(available_space, available_jobs); + + if (writable_count > 0){ + u32 count1 = writable_count; + + if (count1+write_position > QUEUE_WRAP){ + count1 = QUEUE_WRAP - write_position; + } + + u32 count2 = writable_count - count1; + + Full_Job_Data *job_src1 = source_queue->jobs + source_queue->skip; + Full_Job_Data *job_src2 = job_src1 + count1; + + Full_Job_Data *job_dst1 = queue->jobs + write_position; + Full_Job_Data *job_dst2 = queue->jobs; + + Assert((job_src1->id % QUEUE_WRAP) == write_position); + + memcpy(job_dst1, job_src1, sizeof(Full_Job_Data)*count1); + memcpy(job_dst2, job_src2, sizeof(Full_Job_Data)*count2); + queue->write_position = (write_position + writable_count) % QUEUE_WRAP; + + source_queue->skip += writable_count; + + if (source_queue->skip == source_queue->count){ + source_queue->skip = source_queue->count = 0; + } + else if (source_queue->skip > UNBOUNDED_SKIP_MAX){ + u32 left_over = source_queue->count - source_queue->skip; + memmove(source_queue->jobs, source_queue->jobs + source_queue->skip, + sizeof(Full_Job_Data)*left_over); + source_queue->count = left_over; + source_queue->skip = 0; + } + } + + i32 semaphore_release_count = writable_count; + if (semaphore_release_count > thread_count){ + semaphore_release_count = thread_count; + } + + // NOTE(allen): platform dependent portion... + // TODO(allen): pull out the duplicated part once I see + // that this is pretty much the same on linux. + for (i32 i = 0; i < semaphore_release_count; ++i){ + sem_post(LinuxHandleToSem(queue->semaphore)); + } +} + +internal void +flush_thread_group(i32 group_id){ + Thread_Group *group = linuxvars.groups + group_id; + Work_Queue *queue = linuxvars.queues + group_id; + Unbounded_Work_Queue *source_queue = &group->queue; + flush_to_direct_queue(source_queue, queue, group->count); +} + +// Note(allen): post_job puts the job on the unbounded queue. +// The unbounded queue is entirely managed by the main thread. +// The thread safe queue is bounded in size so the unbounded +// queue is periodically flushed into the direct work queue. +internal +Sys_Post_Job_Sig(system_post_job){ + Thread_Group *group = linuxvars.groups + group_id; + Unbounded_Work_Queue *queue = &group->queue; + + u32 result = queue->next_job_id++; + + while (queue->count >= queue->max){ + i32 new_max = queue->max*2; + Full_Job_Data *new_jobs = (Full_Job_Data*) + system_get_memory(new_max*sizeof(Full_Job_Data)); + + memcpy(new_jobs, queue->jobs, queue->count); + + system_free_memory(queue->jobs); + + queue->jobs = new_jobs; + queue->max = new_max; + } + + Full_Job_Data full_job; + + full_job.job = job; + full_job.running_thread = THREAD_NOT_ASSIGNED; + full_job.id = result; + + queue->jobs[queue->count++] = full_job; + + Work_Queue *direct_queue = linuxvars.queues + group_id; + flush_to_direct_queue(queue, direct_queue, group->count); + + return(result); +} + +internal +Sys_Cancel_Job_Sig(system_cancel_job){ + Thread_Group *group = linuxvars.groups + group_id; + Unbounded_Work_Queue *source_queue = &group->queue; + + b32 handled_in_unbounded = false; + if (source_queue->skip < source_queue->count){ + Full_Job_Data *first_job = source_queue->jobs + source_queue->skip; + if (first_job->id <= job_id){ + u32 index = source_queue->skip + (job_id - first_job->id); + Full_Job_Data *job = source_queue->jobs + index; + job->running_thread = 0; + handled_in_unbounded = true; + } + } + + if (!handled_in_unbounded){ + Work_Queue *queue = linuxvars.queues + group_id; + Full_Job_Data *job = queue->jobs + (job_id % QUEUE_WRAP); + Assert(job->id == job_id); + + u32 thread_id = + InterlockedCompareExchange(&job->running_thread, + 0, THREAD_NOT_ASSIGNED); + + if (thread_id != THREAD_NOT_ASSIGNED && thread_id != 0){ + i32 thread_index = thread_id - 1; + + i32 cancel_lock = group->cancel_lock0 + thread_index; + i32 cancel_cv = group->cancel_cv0 + thread_index; + Thread_Context *thread = group->threads + thread_index; + + + system_acquire_lock(cancel_lock); + + thread->cancel = 1; + + system_release_lock(FRAME_LOCK); + do{ + system_wait_cv(cancel_lock, cancel_cv); + }while (thread->cancel == 1); + system_acquire_lock(FRAME_LOCK); + + system_release_lock(cancel_lock); + } + } +} + +internal +Sys_Check_Cancel_Sig(system_check_cancel){ + b32 result = 0; + + Thread_Group *group = linuxvars.groups + thread->group_id; + i32 thread_index = thread->id - 1; + i32 cancel_lock = group->cancel_lock0 + thread_index; + + system_acquire_lock(cancel_lock); + if (thread->cancel){ + result = 1; + } + system_release_lock(cancel_lock); + + return(result); +} + +internal +Sys_Grow_Thread_Memory_Sig(system_grow_thread_memory){ + void *old_data; + i32 old_size, new_size; + + system_acquire_lock(CANCEL_LOCK0 + memory->id - 1); + old_data = memory->data; + old_size = memory->size; + new_size = LargeRoundUp(memory->size*2, Kbytes(4)); + memory->data = system_get_memory(new_size); + memory->size = new_size; + if (old_data){ + memcpy(memory->data, old_data, old_size); + system_free_memory(old_data); + } + system_release_lock(CANCEL_LOCK0 + memory->id - 1); +} + +#endif // OLD_JOB_QUEUE + internal Sys_Acquire_Lock_Sig(system_acquire_lock){ pthread_mutex_lock(linuxvars.locks + id); @@ -1161,6 +1460,16 @@ Sys_Release_Lock_Sig(system_release_lock){ pthread_mutex_unlock(linuxvars.locks + id); } +internal void +system_wait_cv(i32 lock_id, i32 cv_id){ + pthread_cond_wait(linuxvars.conds + cv_id, linuxvars.locks + lock_id); +} + +internal void +system_signal_cv(i32 lock_id, i32 cv_id){ + pthread_cond_signal(linuxvars.conds + cv_id); +} + // // Debug // @@ -1172,19 +1481,36 @@ INTERNAL_Sys_Sentinel_Sig(internal_sentinel){ return (&linuxvars.internal_bubble); } +#ifdef OLD_JOB_QUEUE internal INTERNAL_Sys_Get_Thread_States_Sig(internal_get_thread_states){ Work_Queue *queue = linuxvars.queues + id; u32 write = queue->write_position; u32 read = queue->read_position; - if (write < read) write += JOB_ID_WRAP; + if (write < read) write += QUEUE_WRAP; *pending = (i32)(write - read); - + Thread_Group *group = linuxvars.groups + id; for (i32 i = 0; i < group->count; ++i){ running[i] = (group->threads[i].running != 0); } } +#else +internal +INTERNAL_Sys_Get_Thread_States_Sig(internal_get_thread_states){ + Thread_Group *group = linuxvars.groups + id; + Unbounded_Work_Queue *source_queue = &group->queue; + Work_Queue *queue = linuxvars.queues + id; + u32 write = queue->write_position; + u32 read = queue->read_position; + if (write < read) write += QUEUE_WRAP; + *pending = (i32)(write - read) + source_queue->count - source_queue->skip; + + for (i32 i = 0; i < group->count; ++i){ + running[i] = (group->threads[i].running != 0); + } +} +#endif internal INTERNAL_Sys_Debug_Message_Sig(internal_debug_message){ @@ -2798,9 +3124,11 @@ main(int argc, char **argv) memory->id = thread->id; thread->queue = &linuxvars.queues[BACKGROUND_THREADS]; - pthread_create(&thread->handle, NULL, &ThreadProc, thread); + pthread_create(&thread->handle, NULL, &JobThreadProc, thread); } + initialize_unbounded_queue(&linuxvars.groups[BACKGROUND_THREADS].queue); + for(i32 i = 0; i < LOCK_COUNT; ++i){ pthread_mutex_init(linuxvars.locks + i, NULL); } @@ -3082,6 +3410,8 @@ main(int argc, char **argv) linuxvars.cursor = result.mouse_cursor_type; } + flush_thread_group(BACKGROUND_THREADS); + linuxvars.input.first_step = 0; linuxvars.input.keys = key_input_data_zero(); linuxvars.input.mouse.press_l = 0;