From 6dd5b3b87b573159c5b6f60451399e312fa75cbd Mon Sep 17 00:00:00 2001
From: Allen Webster
Date: Tue, 19 Jul 2016 19:47:27 -0400
Subject: [PATCH] unbounded job queue

---
 4ed_file_view.cpp |  17 ++-
 4ed_meta.h        |   1 +
 4ed_system.h      |  17 ++-
 win32_4ed.cpp     | 347 +++++++++++++++++++++++++++++++++++++++++++---
 4 files changed, 353 insertions(+), 29 deletions(-)

diff --git a/4ed_file_view.cpp b/4ed_file_view.cpp
index d20cc1df..3da215e8 100644
--- a/4ed_file_view.cpp
+++ b/4ed_file_view.cpp
@@ -2702,12 +2702,19 @@ compute_this_indent(Buffer *buffer, Indent_Parse_State indent,
         prev_token.start + prev_token.size > this_line_start){
         if (prev_token.type == CPP_TOKEN_COMMENT){
             Hard_Start_Result hard_start = buffer_find_hard_start(buffer, this_line_start, tab_width);
-            i32 line_pos = hard_start.char_pos - this_line_start;
-            this_indent = line_pos + indent.comment_shift;
-            if (this_indent < 0){
-                this_indent = 0;
+
+            if (hard_start.all_whitespace){
+                this_indent = previous_indent;
+                did_special_behavior = true;
+            }
+            else{
+                i32 line_pos = hard_start.char_pos - this_line_start;
+                this_indent = line_pos + indent.comment_shift;
+                if (this_indent < 0){
+                    this_indent = 0;
+                }
+                did_special_behavior = true;
             }
-            did_special_behavior = true;
         }
         else if (prev_token.type == CPP_TOKEN_STRING_CONSTANT){
             this_indent = previous_indent;
diff --git a/4ed_meta.h b/4ed_meta.h
index 6e3fcb9d..4642f2c6 100644
--- a/4ed_meta.h
+++ b/4ed_meta.h
@@ -26,6 +26,7 @@ typedef int16_t i16;
 typedef i32 bool32;
 typedef i8 bool8;
 
 typedef i32 b32;
+typedef i16 b16;
 typedef i8 b8;
 typedef uint8_t byte;
diff --git a/4ed_system.h b/4ed_system.h
index 0ca63e63..7b3d515a 100644
--- a/4ed_system.h
+++ b/4ed_system.h
@@ -167,20 +167,26 @@ typedef Job_Callback_Sig(Job_Callback);
 struct Job_Data{
     Job_Callback *callback;
     void *data[2];
-    //i32 memory_request;
 };
 
 struct Full_Job_Data{
     Job_Data job;
-    u32 job_memory_index;
     u32 running_thread;
-    b32 finished;
     u32 id;
 };
 
+struct Unbounded_Work_Queue{
+    Full_Job_Data *jobs;
+    i32 count, max, skip;
+
+    u32 next_job_id;
+};
+
+#define QUEUE_WRAP 256
+
 struct Work_Queue{
-    Full_Job_Data jobs[256];
+    Full_Job_Data jobs[QUEUE_WRAP];
     Plat_Handle semaphore;
     volatile u32 write_position;
     volatile u32 read_position;
 };
@@ -188,9 +194,6 @@ struct Work_Queue{
 
 #define THREAD_NOT_ASSIGNED 0xFFFFFFFF
 
-#define JOB_ID_WRAP (ArrayCount(queue->jobs) * 4)
-#define QUEUE_WRAP (ArrayCount(queue->jobs))
-
 #define Sys_Post_Job_Sig(name) u32 name(Thread_Group_ID group_id, Job_Data job)
 typedef Sys_Post_Job_Sig(System_Post_Job);
 
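The Unbounded_Work_Queue added above sits in front of the fixed-size Work_Queue: the bounded ring treats read_position == write_position as empty, which is why one slot must always stay unused. A minimal standalone sketch of that convention, in plain C (the ring_* names are illustrative, not part of this patch):

    #include <assert.h>

    #define QUEUE_WRAP 256  /* mirrors the define introduced in 4ed_system.h */

    /* Empty exactly when read == write, so at most QUEUE_WRAP-1 slots
       may ever be occupied at once. */
    static int ring_is_empty(unsigned read, unsigned write){
        return (read == write);
    }

    static int ring_is_full(unsigned read, unsigned write){
        return (((write + 1) % QUEUE_WRAP) == read);
    }

    int main(void){
        assert(ring_is_empty(0, 0));
        assert(!ring_is_full(0, 0));
        /* after QUEUE_WRAP-1 writes with no reads, the ring is full */
        assert(ring_is_full(0, QUEUE_WRAP - 1));
        return 0;
    }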
diff --git a/win32_4ed.cpp b/win32_4ed.cpp
index 43744386..18d0fd34 100644
--- a/win32_4ed.cpp
+++ b/win32_4ed.cpp
@@ -64,6 +64,8 @@ struct Thread_Group{
     Thread_Context *threads;
     i32 count;
 
+    Unbounded_Work_Queue queue;
+
     i32 cancel_lock0;
     i32 cancel_cv0;
 };
@@ -320,6 +322,7 @@ system_signal_cv(i32 crit_id, i32 cv_id){
     WakeConditionVariable(win32vars.condition_vars + cv_id);
 }
 
+#if 0
 internal DWORD
 JobThreadProc(LPVOID lpParameter){
     Thread_Context *thread = (Thread_Context*)lpParameter;
@@ -344,13 +347,16 @@ JobThreadProc(LPVOID lpParameter){
         u32 write_index = queue->write_position;
 
         if (read_index != write_index){
-            u32 next_read_index = (read_index + 1) % JOB_ID_WRAP;
+            // NOTE(allen): Previously this wrapped by the job id wrap and then
+            // again by the queue wrap, which was redundant. Now it just wraps
+            // by the queue wrap.
+            u32 next_read_index = (read_index + 1) % QUEUE_WRAP;
             u32 safe_read_index =
                 InterlockedCompareExchange(&queue->read_position,
                                            next_read_index, read_index);
 
             if (safe_read_index == read_index){
-                Full_Job_Data *full_job = queue->jobs + (safe_read_index % QUEUE_WRAP);
+                Full_Job_Data *full_job = queue->jobs + safe_read_index;
                 // NOTE(allen): This is interlocked so that it plays nice
                 // with the cancel job routine, which may try to cancel this job
                 // at the same time that we try to run it
@@ -394,13 +400,12 @@ Sys_Post_Job_Sig(system_post_job){
     u32 result = 0;
     while (!success){
         u32 write_index = queue->write_position;
-        u32 next_write_index = (write_index + 1) % JOB_ID_WRAP;
+        u32 next_write_index = (write_index + 1) % QUEUE_WRAP;
         u32 safe_write_index =
             InterlockedCompareExchange(&queue->write_position,
                                        next_write_index, write_index);
         if (safe_write_index == write_index){
             result = write_index;
-            write_index = write_index % QUEUE_WRAP;
             queue->jobs[write_index].job = job;
             queue->jobs[write_index].running_thread = THREAD_NOT_ASSIGNED;
             queue->jobs[write_index].id = result;
@@ -430,15 +435,11 @@ Sys_Cancel_Job_Sig(system_cancel_job){
     Work_Queue *queue = win32vars.queues + group_id;
     Thread_Group *group = win32vars.groups + group_id;
 
-    u32 job_index;
-    u32 thread_id;
-    Full_Job_Data *full_job;
-
-    job_index = job_id % QUEUE_WRAP;
-    full_job = queue->jobs + job_index;
+    u32 job_index = job_id % QUEUE_WRAP;
+    Full_Job_Data *full_job = queue->jobs + job_index;
 
     Assert(full_job->id == job_id);
 
-    thread_id =
+    u32 thread_id =
         InterlockedCompareExchange(&full_job->running_thread,
                                    0, THREAD_NOT_ASSIGNED);
@@ -505,7 +506,7 @@ INTERNAL_get_thread_states(Thread_Group_ID id, bool8 *running, i32 *pending){
     Work_Queue *queue = win32vars.queues + id;
     u32 write = queue->write_position;
     u32 read = queue->read_position;
-    if (write < read) write += JOB_ID_WRAP;
+    if (write < read) write += QUEUE_WRAP;
     *pending = (i32)(write - read);
 
     Thread_Group *group = win32vars.groups + id;
@@ -515,6 +516,315 @@ INTERNAL_get_thread_states(Thread_Group_ID id, bool8 *running, i32 *pending){
     }
 }
 #endif
 
+#else
+
+internal DWORD
+JobThreadProc(LPVOID lpParameter){
+    Thread_Context *thread = (Thread_Context*)lpParameter;
+    Work_Queue *queue = win32vars.queues + thread->group_id;
+    Thread_Group *group = win32vars.groups + thread->group_id;
+
+    i32 thread_index = thread->id - 1;
+
+    i32 cancel_lock = group->cancel_lock0 + thread_index;
+    i32 cancel_cv = group->cancel_cv0 + thread_index;
+
+    Thread_Memory *thread_memory = win32vars.thread_memory + thread_index;
+
+    if (thread_memory->size == 0){
+        i32 new_size = Kbytes(64);
+        thread_memory->data = Win32GetMemory(new_size);
+        thread_memory->size = new_size;
+    }
+
+    for (;;){
+        u32 read_index = queue->read_position;
+        u32 write_index = queue->write_position;
+
+        if (read_index != write_index){
+            // NOTE(allen): Previously this wrapped by the job id wrap and then
+            // again by the queue wrap, which was redundant. Now it just wraps
+            // by the queue wrap.
+            u32 next_read_index = (read_index + 1) % QUEUE_WRAP;
+            u32 safe_read_index =
+                InterlockedCompareExchange(&queue->read_position,
+                                           next_read_index, read_index);
+
+            if (safe_read_index == read_index){
+                Full_Job_Data *full_job = queue->jobs + safe_read_index;
+                // NOTE(allen): This is interlocked so that it plays nice
+                // with the cancel job routine, which may try to cancel this job
+                // at the same time that we try to run it
+
+                i32 safe_running_thread =
+                    InterlockedCompareExchange(&full_job->running_thread,
+                                               thread->id, THREAD_NOT_ASSIGNED);
+
+                if (safe_running_thread == THREAD_NOT_ASSIGNED){
+                    thread->job_id = full_job->id;
+                    thread->running = 1;
+
+                    full_job->job.callback(&win32vars.system,
+                                           thread, thread_memory, full_job->job.data);
+                    PostMessage(win32vars.window_handle, WM_4coder_ANIMATE, 0, 0);
+                    full_job->running_thread = 0;
+                    thread->running = 0;
+
+                    system_acquire_lock(cancel_lock);
+                    if (thread->cancel){
+                        thread->cancel = 0;
+                        system_signal_cv(cancel_lock, cancel_cv);
+                    }
+                    system_release_lock(cancel_lock);
+                }
+            }
+        }
+        else{
+            WaitForSingleObject(Win32Handle(queue->semaphore), INFINITE);
+        }
+    }
+}
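Both InterlockedCompareExchange calls above are claim operations: the first claims a slot index against the other worker threads, the second claims the job itself against system_cancel_job, which may race to swap running_thread to 0. A reduced sketch of the second claim, assuming only windows.h and a hypothetical trimmed-down job record:

    #include <windows.h>

    #define NOT_ASSIGNED ((LONG)0xFFFFFFFF)  /* mirrors THREAD_NOT_ASSIGNED */

    /* Hypothetical reduced job record; the real one is Full_Job_Data. */
    typedef struct{
        volatile LONG running_thread;
    } Mini_Job;

    /* Returns 1 if the caller won the race to own the job.  A worker
       passes its thread id, the cancel path passes 0; whoever swaps the
       sentinel out first wins, and the loser sees the sentinel gone. */
    static int claim_job(Mini_Job *job, LONG claimant){
        LONG prev = InterlockedCompareExchange(&job->running_thread,
                                               claimant, NOT_ASSIGNED);
        return (prev == NOT_ASSIGNED);
    }

    int main(void){
        Mini_Job job = {NOT_ASSIGNED};
        int worker_won = claim_job(&job, 1);  /* worker with thread id 1 */
        int cancel_won = claim_job(&job, 0);  /* cancel arrives second */
        return !(worker_won && !cancel_won);
    }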
+
+internal void
+initialize_unbounded_queue(Unbounded_Work_Queue *source_queue){
+    i32 max = 512;
+    source_queue->jobs = (Full_Job_Data*)system_get_memory(max*sizeof(Full_Job_Data));
+    source_queue->count = 0;
+    source_queue->max = max;
+    source_queue->skip = 0;
+}
+
+inline i32
+get_work_queue_available_space(i32 write, i32 read){
+    // NOTE(allen): The only time queue->write_position == queue->read_position
+    // is allowed is when the queue is empty, so when
+    // queue->write_position+1 == queue->read_position the available space is zero.
+    // Both computations below therefore leave one slot unused. The easy way to
+    // eliminate that would be to let read and write wrap at twice the size of the
+    // underlying array and modulo them into the array on access, so full and empty
+    // stay distinguishable... but that costs a lot of modulos... ehh.
+
+    i32 available_space = 0;
+    if (write >= read){
+        available_space = QUEUE_WRAP - (write - read) - 1;
+    }
+    else{
+        available_space = (read - write) - 1;
+    }
+
+    return(available_space);
+}
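To make the NOTE's arithmetic concrete, here are a few sample write/read pairs checked against the same formula (QUEUE_WRAP is 256, so an empty queue reports 255 free slots). A compilable sanity check, duplicating the computation so it stands alone:

    #include <assert.h>

    #define QUEUE_WRAP 256

    /* same computation as get_work_queue_available_space above */
    static int available_space(int write, int read){
        return (write >= read) ? (QUEUE_WRAP - (write - read) - 1)
                               : ((read - write) - 1);
    }

    int main(void){
        assert(available_space(0, 0)  == 255);  /* empty: one slot reserved */
        assert(available_space(10, 0) == 245);  /* 10 slots in use */
        assert(available_space(0, 10) == 9);    /* writer wrapped; 246 in use */
        return 0;
    }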
+
+#define UNBOUNDED_SKIP_MAX 128
+
+internal void
+flush_to_direct_queue(Unbounded_Work_Queue *source_queue, Work_Queue *queue, i32 thread_count){
+    // NOTE(allen): It is understood that read_position may be changed by other
+    // threads, but any such change only makes more space in the queue.
+    // Meanwhile write_position is only ever changed by the main thread in this
+    // system, so it is not read with an interlocked operation.
+    u32 read_position = queue->read_position;
+    u32 write_position = queue->write_position;
+    u32 available_space = get_work_queue_available_space(write_position, read_position);
+    u32 available_jobs = source_queue->count - source_queue->skip;
+
+    u32 writable_count = Min(available_space, available_jobs);
+
+    if (writable_count > 0){
+        u32 count1 = writable_count;
+
+        if (count1+write_position > QUEUE_WRAP){
+            count1 = QUEUE_WRAP - write_position;
+        }
+
+        u32 count2 = writable_count - count1;
+
+        Full_Job_Data *job_src1 = source_queue->jobs + source_queue->skip;
+        Full_Job_Data *job_src2 = job_src1 + count1;
+
+        Full_Job_Data *job_dst1 = queue->jobs + write_position;
+        Full_Job_Data *job_dst2 = queue->jobs;
+
+        Assert((job_src1->id % QUEUE_WRAP) == write_position);
+
+        memcpy(job_dst1, job_src1, sizeof(Full_Job_Data)*count1);
+        memcpy(job_dst2, job_src2, sizeof(Full_Job_Data)*count2);
+        queue->write_position = (write_position + writable_count) % QUEUE_WRAP;
+
+        source_queue->skip += writable_count;
+
+        if (source_queue->skip == source_queue->count){
+            source_queue->skip = source_queue->count = 0;
+        }
+        else if (source_queue->skip > UNBOUNDED_SKIP_MAX){
+            u32 left_over = source_queue->count - source_queue->skip;
+            memmove(source_queue->jobs, source_queue->jobs + source_queue->skip,
+                    sizeof(Full_Job_Data)*left_over);
+            source_queue->count = left_over;
+            source_queue->skip = 0;
+        }
+    }
+
+    i32 semaphore_release_count = writable_count;
+    if (semaphore_release_count > thread_count){
+        semaphore_release_count = thread_count;
+    }
+
+    // NOTE(allen): platform dependent portion...
+    // TODO(allen): pull out the duplicated part once I see
+    // that this is pretty much the same on linux.
+    for (i32 i = 0; i < semaphore_release_count; ++i){
+        ReleaseSemaphore(Win32Handle(queue->semaphore), 1, 0);
+    }
+}
+
+internal void
+flush_thread_group(i32 group_id){
+    Thread_Group *group = win32vars.groups + group_id;
+    Work_Queue *queue = win32vars.queues + group_id;
+    Unbounded_Work_Queue *source_queue = &group->queue;
+    flush_to_direct_queue(source_queue, queue, group->count);
+}
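The count1/count2 split in flush_to_direct_queue is the standard ring-buffer wrap copy: one memcpy fills the slots up to the end of the array, a second continues from slot zero. The same split on a small int ring, as a self-contained sketch (names are illustrative):

    #include <assert.h>
    #include <string.h>

    /* Copy n items from src into ring[cap] starting at index write,
       wrapping past the end; this is the job_dst1/job_dst2 split above. */
    static void ring_write(int *ring, int cap, int write, const int *src, int n){
        int count1 = n;
        if (count1 + write > cap){
            count1 = cap - write;      /* portion that fits before the end */
        }
        int count2 = n - count1;       /* portion that wraps to slot zero */
        memcpy(ring + write, src, sizeof(int)*count1);
        memcpy(ring, src + count1, sizeof(int)*count2);
    }

    int main(void){
        int ring[4] = {0};
        int src[3] = {1, 2, 3};
        ring_write(ring, 4, 2, src, 3);  /* fills slots 2 and 3, wraps to 0 */
        assert(ring[2] == 1 && ring[3] == 2 && ring[0] == 3);
        return 0;
    }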
+
+// NOTE(allen): post_job puts the job on the unbounded queue.
+// The unbounded queue is entirely managed by the main thread.
+// The thread-safe direct queue is bounded in size, so the unbounded
+// queue is periodically flushed into it.
+internal
+Sys_Post_Job_Sig(system_post_job){
+    Thread_Group *group = win32vars.groups + group_id;
+    Unbounded_Work_Queue *queue = &group->queue;
+
+    u32 result = queue->next_job_id++;
+
+    while (queue->count >= queue->max){
+        i32 new_max = queue->max*2;
+        Full_Job_Data *new_jobs = (Full_Job_Data*)
+            system_get_memory(new_max*sizeof(Full_Job_Data));
+
+        memcpy(new_jobs, queue->jobs, queue->count*sizeof(Full_Job_Data));
+
+        system_free_memory(queue->jobs);
+
+        queue->jobs = new_jobs;
+        queue->max = new_max;
+    }
+
+    Full_Job_Data full_job;
+
+    full_job.job = job;
+    full_job.running_thread = THREAD_NOT_ASSIGNED;
+    full_job.id = result;
+
+    queue->jobs[queue->count++] = full_job;
+
+    Work_Queue *direct_queue = win32vars.queues + group_id;
+    flush_to_direct_queue(queue, direct_queue, group->count);
+
+    return(result);
+}
+
+internal
+Sys_Cancel_Job_Sig(system_cancel_job){
+    Thread_Group *group = win32vars.groups + group_id;
+    Unbounded_Work_Queue *source_queue = &group->queue;
+
+    b32 handled_in_unbounded = false;
+    if (source_queue->skip < source_queue->count){
+        Full_Job_Data *first_job = source_queue->jobs + source_queue->skip;
+        if (first_job->id <= job_id){
+            u32 index = source_queue->skip + (job_id - first_job->id);
+            Full_Job_Data *job = source_queue->jobs + index;
+            job->running_thread = 0;
+            handled_in_unbounded = true;
+        }
+    }
+
+    if (!handled_in_unbounded){
+        Work_Queue *queue = win32vars.queues + group_id;
+        Full_Job_Data *job = queue->jobs + (job_id % QUEUE_WRAP);
+        Assert(job->id == job_id);
+
+        u32 thread_id =
+            InterlockedCompareExchange(&job->running_thread,
+                                       0, THREAD_NOT_ASSIGNED);
+
+        if (thread_id != THREAD_NOT_ASSIGNED && thread_id != 0){
+            i32 thread_index = thread_id - 1;
+
+            i32 cancel_lock = group->cancel_lock0 + thread_index;
+            i32 cancel_cv = group->cancel_cv0 + thread_index;
+            Thread_Context *thread = group->threads + thread_index;
+
+            system_acquire_lock(cancel_lock);
+
+            thread->cancel = 1;
+
+            system_release_lock(FRAME_LOCK);
+            do{
+                system_wait_cv(cancel_lock, cancel_cv);
+            }while (thread->cancel == 1);
+            system_acquire_lock(FRAME_LOCK);
+
+            system_release_lock(cancel_lock);
+        }
+    }
+}
+
+internal
+Sys_Check_Cancel_Sig(system_check_cancel){
+    b32 result = 0;
+
+    Thread_Group *group = win32vars.groups + thread->group_id;
+    i32 thread_index = thread->id - 1;
+    i32 cancel_lock = group->cancel_lock0 + thread_index;
+
+    system_acquire_lock(cancel_lock);
+    if (thread->cancel){
+        result = 1;
+    }
+    system_release_lock(cancel_lock);
+
+    return(result);
+}
+
+internal
+Sys_Grow_Thread_Memory_Sig(system_grow_thread_memory){
+    void *old_data;
+    i32 old_size, new_size;
+
+    system_acquire_lock(CANCEL_LOCK0 + memory->id - 1);
+    old_data = memory->data;
+    old_size = memory->size;
+    new_size = LargeRoundUp(memory->size*2, Kbytes(4));
+    memory->data = system_get_memory(new_size);
+    memory->size = new_size;
+    if (old_data){
+        memcpy(memory->data, old_data, old_size);
+        system_free_memory(old_data);
+    }
+    system_release_lock(CANCEL_LOCK0 + memory->id - 1);
+}
+
+#if FRED_INTERNAL
+internal void
+INTERNAL_get_thread_states(Thread_Group_ID id, bool8 *running, i32 *pending){
+    Thread_Group *group = win32vars.groups + id;
+    Unbounded_Work_Queue *source_queue = &group->queue;
+    Work_Queue *queue = win32vars.queues + id;
+    u32 write = queue->write_position;
+    u32 read = queue->read_position;
+    if (write < read) write += QUEUE_WRAP;
+    *pending = (i32)(write - read) + source_queue->count - source_queue->skip;
+
+    for (i32 i = 0; i < group->count; ++i){
+        running[i] = (group->threads[i].running != 0);
+    }
+}
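From the caller's side none of this machinery is visible; work goes through the Sys_Post_Job_Sig and Sys_Cancel_Job_Sig entry points. A hypothetical call site following the macro conventions visible in 4ed_system.h; the job body, the payload names, and the cancellation-polling idiom are illustrative sketches, not code from this patch:

    /* hypothetical long-running job */
    internal
    Job_Callback_Sig(example_background_work){
        // ... heavy work; a cooperative job would periodically poll the
        // check-cancel entry point (system_check_cancel here) and bail out ...
    }

    // On the main thread (BACKGROUND_THREADS is the group WinMain initializes):
    Job_Data job;
    job.callback = example_background_work;
    job.data[0] = input_buffer;   // hypothetical payload
    job.data[1] = 0;
    u32 job_id = system_post_job(BACKGROUND_THREADS, job);
    // ... later, if the result is no longer needed:
    system_cancel_job(BACKGROUND_THREADS, job_id);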
+#endif
+
+#endif
+
 //
 // Coroutines
@@ -1675,9 +1985,9 @@ OpenGLDebugCallback(GLenum source, GLenum type, GLuint id, GLenum severity, GLsi
 
 int
 WinMain(HINSTANCE hInstance,
-    HINSTANCE hPrevInstance,
-    LPSTR lpCmdLine,
-    int nCmdShow){
+        HINSTANCE hPrevInstance,
+        LPSTR lpCmdLine,
+        int nCmdShow){
     int argc = __argc;
     char **argv = __argv;
@@ -1739,6 +2049,8 @@ WinMain(HINSTANCE hInstance,
         thread->handle = CreateThread(0, 0, JobThreadProc, thread, creation_flag,
                                       (LPDWORD)&thread->windows_id);
     }
 
+    initialize_unbounded_queue(&win32vars.groups[BACKGROUND_THREADS].queue);
+
     ConvertThreadToFiber(0);
     win32vars.coroutine_free = win32vars.coroutine_data;
     for (i32 i = 0; i+1 < ArrayCount(win32vars.coroutine_data); ++i){
@@ -2189,13 +2501,15 @@ WinMain(HINSTANCE hInstance,
             HDC hdc = GetDC(win32vars.window_handle);
             Win32RedrawScreen(hdc);
             ReleaseDC(win32vars.window_handle, hdc);
-            
+
            win32vars.first = 0;
 
             if (result.animating){
                 PostMessage(win32vars.window_handle, WM_4coder_ANIMATE, 0, 0);
             }
 
+            flush_thread_group(BACKGROUND_THREADS);
+
             u64 timer_end = Win32HighResolutionTime();
             u64 end_target = timer_start + frame_useconds;
@@ -2217,7 +2531,6 @@
 // application at some point.
 int main(int argc, char **argv){
     HINSTANCE hInstance = GetModuleHandle(0);
-}
 
 #endif
 
 // BOTTOM