From bfc6fc689d3346fec06d63623f7f801c8c32a67b Mon Sep 17 00:00:00 2001 From: Allen Webster Date: Tue, 18 Jul 2017 13:19:59 -0400 Subject: [PATCH] zipped all of the threading code together --- platform_all/4ed_work_queues.cpp | 94 ++++++++++++++++++------- platform_linux/linux_4ed.cpp | 108 +++++++---------------------- platform_win32/win32_4ed.cpp | 115 +++++++------------------------ 3 files changed, 117 insertions(+), 200 deletions(-) diff --git a/platform_all/4ed_work_queues.cpp b/platform_all/4ed_work_queues.cpp index e890ad5c..0d76f75f 100644 --- a/platform_all/4ed_work_queues.cpp +++ b/platform_all/4ed_work_queues.cpp @@ -9,11 +9,37 @@ // TOP +struct Threading_Vars{ + Thread_Memory *thread_memory; + Work_Queue queues[THREAD_GROUP_COUNT]; + Thread_Group groups[THREAD_GROUP_COUNT]; + Mutex locks[LOCK_COUNT]; + Condition_Variable conds[CV_COUNT]; +}; +global Threading_Vars threadvars; + +internal +Sys_Acquire_Lock_Sig(system_acquire_lock){ + system_acquire_lock(&threadvars.locks[id]); +} + +internal +Sys_Release_Lock_Sig(system_release_lock){ + system_release_lock(&threadvars.locks[id]); +} + internal void -job_proc(System_Functions *system, Thread_Context *thread, Work_Queue *queue, Thread_Group *group, Thread_Memory *thread_memory){ +job_thread_proc(System_Functions *system, Thread_Context *thread){ + Work_Queue *queue = threadvars.queues + thread->group_id; + Thread_Group *group = threadvars.groups + thread->group_id; + i32 thread_index = thread->id - 1; - i32 cancel_lock = group->cancel_lock0 + thread_index; - i32 cancel_cv = group->cancel_cv0 + thread_index; + Thread_Memory *thread_memory = threadvars.thread_memory + thread_index; + + i32 cancel_lock_id = group->cancel_lock0 + thread_index; + i32 cancel_cv_id = group->cancel_cv0 + thread_index; + Mutex *cancel_lock = &threadvars.locks[cancel_lock_id]; + Condition_Variable *cancel_cv = &threadvars.conds[cancel_cv_id]; if (thread_memory->size == 0){ i32 new_size = KB(64); @@ -47,7 +73,7 @@ job_proc(System_Functions *system, Thread_Context *thread, Work_Queue *queue, Th system_acquire_lock(cancel_lock); if (thread->cancel){ thread->cancel = 0; - system_signal_cv(cancel_lock, cancel_cv); + system_signal_cv(cancel_cv, cancel_lock); } system_release_lock(cancel_lock); } @@ -92,7 +118,12 @@ get_work_queue_available_space(i32 write, i32 read){ #define UNBOUNDED_SKIP_MAX 128 internal i32 -flush_to_direct_queue(Unbounded_Work_Queue *source_queue, Work_Queue *queue, i32 thread_count){ +flush_thread_group(Thread_Group_ID group_id){ + Thread_Group *group = threadvars.groups + group_id; + Work_Queue *queue = threadvars.queues + group_id; + Unbounded_Work_Queue *source_queue = &group->queue; + i32 thread_count = group->count; + // NOTE(allen): It is understood that read_position may be changed by other // threads but it will only make more space in the queue if it is changed. // Meanwhile write_position should not ever be changed by anything but the @@ -155,8 +186,9 @@ flush_to_direct_queue(Unbounded_Work_Queue *source_queue, Work_Queue *queue, i32 // The unbounded queue is entirely managed by the main thread. // The thread safe queue is bounded in size so the unbounded // queue is periodically flushed into the direct work queue. -internal u32 -post_job(Thread_Group *group, Work_Queue *direct_queue, Job_Data job){ +internal +Sys_Post_Job_Sig(system_post_job){ + Thread_Group *group = threadvars.groups + group_id; Unbounded_Work_Queue *queue = &group->queue; u32 result = queue->next_job_id++; @@ -176,13 +208,16 @@ post_job(Thread_Group *group, Work_Queue *direct_queue, Job_Data job){ full_job.id = result; queue->jobs[queue->count++] = full_job; - flush_to_direct_queue(queue, direct_queue, group->count); + flush_thread_group(group_id); return(result); } -internal void -cancel_job(Thread_Group *group, Work_Queue *queue, u32 job_id){ +internal +Sys_Cancel_Job_Sig(system_cancel_job){ + Thread_Group *group = threadvars.groups + group_id; + Work_Queue *queue = threadvars.queues + group_id; + Unbounded_Work_Queue *source_queue = &group->queue; b32 handled_in_unbounded = false; @@ -205,29 +240,36 @@ cancel_job(Thread_Group *group, Work_Queue *queue, u32 job_id){ if (thread_id != THREAD_NOT_ASSIGNED && thread_id != 0){ i32 thread_index = thread_id - 1; - i32 cancel_lock = group->cancel_lock0 + thread_index; - i32 cancel_cv = group->cancel_cv0 + thread_index; + i32 cancel_lock_id = group->cancel_lock0 + thread_index; + i32 cancel_cv_id = group->cancel_cv0 + thread_index; + Mutex *cancel_lock = &threadvars.locks[cancel_lock_id]; + Condition_Variable *cancel_cv = &threadvars.conds[cancel_cv_id]; + Thread_Context *thread = group->threads + thread_index; system_acquire_lock(cancel_lock); thread->cancel = true; - system_release_lock(FRAME_LOCK); + system_release_lock(&threadvars.locks[FRAME_LOCK]); do{ - system_wait_cv(cancel_lock, cancel_cv); + system_wait_cv(cancel_cv, cancel_lock); }while (thread->cancel); - system_acquire_lock(FRAME_LOCK); + system_acquire_lock(&threadvars.locks[FRAME_LOCK]); system_release_lock(cancel_lock); } } } -internal b32 -check_cancel_status(Thread_Group *group, Thread_Context *thread){ +internal +Sys_Check_Cancel_Sig(system_check_cancel){ + Thread_Group *group = threadvars.groups + thread->group_id; + b32 result = false; i32 thread_index = thread->id - 1; - i32 cancel_lock = group->cancel_lock0 + thread_index; + i32 cancel_lock_id = group->cancel_lock0 + thread_index; + Mutex *cancel_lock = &threadvars.locks[cancel_lock_id]; + system_acquire_lock(cancel_lock); if (thread->cancel){ result = true; @@ -236,9 +278,11 @@ check_cancel_status(Thread_Group *group, Thread_Context *thread){ return(result); } -internal void -grow_thread_memory(Thread_Memory *memory){ - system_acquire_lock(CANCEL_LOCK0 + memory->id - 1); +internal +Sys_Grow_Thread_Memory_Sig(system_grow_thread_memory){ + Mutex *cancel_lock = &threadvars.locks[CANCEL_LOCK0 + memory->id - 1]; + + system_acquire_lock(cancel_lock); void *old_data = memory->data; i32 old_size = memory->size; i32 new_size = l_round_up_i32(memory->size*2, KB(4)); @@ -248,11 +292,13 @@ grow_thread_memory(Thread_Memory *memory){ memcpy(memory->data, old_data, old_size); system_memory_free(old_data, old_size); } - system_release_lock(CANCEL_LOCK0 + memory->id - 1); + system_release_lock(cancel_lock); } -internal void -dbg_get_thread_states(Thread_Group *group, Work_Queue *queue, b8 *running, i32 *pending){ +internal +INTERNAL_Sys_Get_Thread_States_Sig(system_internal_get_thread_states){ + Thread_Group *group = threadvars.groups + id; + Work_Queue *queue = threadvars.queues + id; Unbounded_Work_Queue *source_queue = &group->queue; u32 write = queue->write_position; u32 read = queue->read_position; diff --git a/platform_linux/linux_4ed.cpp b/platform_linux/linux_4ed.cpp index f2cd2e7f..09dc5989 100644 --- a/platform_linux/linux_4ed.cpp +++ b/platform_linux/linux_4ed.cpp @@ -190,11 +190,6 @@ struct Linux_Vars{ void *app_code; void *custom; - Thread_Memory *thread_memory; - Thread_Group groups[THREAD_GROUP_COUNT]; - Work_Queue queues[THREAD_GROUP_COUNT]; - Mutex locks[LOCK_COUNT]; - Condition_Variable conds[8]; sem_t thread_semaphore; i32 dpi_x, dpi_y; @@ -488,37 +483,27 @@ Sys_CLI_End_Update_Sig(system_cli_end_update){ } // -// Threads +// Multithreading // internal void -system_internal_acquire_lock(Mutex *m){ +system_acquire_lock(Mutex *m){ pthread_mutex_lock(m->crit); } internal void -system_internal_release_lock(Mutex *m){ +system_release_lock(Mutex *m){ pthread_mutex_unlock(m->crit); } -internal -Sys_Acquire_Lock_Sig(system_acquire_lock){ - system_internal_acquire_lock(&linuxvars.locks[id]); -} - -internal -Sys_Release_Lock_Sig(system_release_lock){ - system_internal_release_lock(&linuxvars.locks[id]); +internal void +system_wait_cv(Condition_Variable *cv, Mutex *m){ + pthread_cond_wait(cv->cv, m->crit); } internal void -system_wait_cv(i32 lock_id, i32 cv_id){ - pthread_cond_wait(linuxvars.conds + cv_id, linuxvars.locks + lock_id); -} - -internal void -system_signal_cv(i32 lock_id, i32 cv_id){ - pthread_cond_signal(linuxvars.conds + cv_id); +system_signal_cv(Condition_Variable *cv, Mutex *m){ + pthread_cond_signal(cv->cv); } // HACK(allen): Reduce this down to just one layer of call. @@ -542,53 +527,11 @@ system_release_semaphore(Plat_Handle handle){ internal void* JobThreadProc(void* lpParameter){ Thread_Context *thread = (Thread_Context*)lpParameter; - Work_Queue *queue = linuxvars.queues + thread->group_id; - Thread_Group *group = linuxvars.groups + thread->group_id; - i32 thread_index = thread->id - 1; - Thread_Memory *memory = linuxvars.thread_memory + thread_index; - job_proc(&linuxvars.system, thread, queue, group, memory); + job_thread_proc(&linuxvars.system, thread); InvalidCodePath; return(0); } -// Note(allen): post_job puts the job on the unbounded queue. -// The unbounded queue is entirely managed by the main thread. -// The thread safe queue is bounded in size so the unbounded -// queue is periodically flushed into the direct work queue. -internal -Sys_Post_Job_Sig(system_post_job){ - Thread_Group *group = linuxvars.groups + group_id; - Work_Queue *direct_queue = linuxvars.queues + group_id; - u32 result = post_job(group, direct_queue, job); - return(result); -} - -internal -Sys_Cancel_Job_Sig(system_cancel_job){ - Thread_Group *group = linuxvars.groups + group_id; - Work_Queue *queue = linuxvars.queues + group_id; - cancel_job(group, queue, job_id); -} - -internal -Sys_Check_Cancel_Sig(system_check_cancel){ - Thread_Group *group = linuxvars.groups + thread->group_id; - b32 result = check_cancel_status(group, thread); - return(result); -} - -internal -Sys_Grow_Thread_Memory_Sig(system_grow_thread_memory){ - grow_thread_memory(memory); -} - -internal -INTERNAL_Sys_Get_Thread_States_Sig(system_internal_get_thread_states){ - Thread_Group *group = linuxvars.groups + id; - Work_Queue *queue = linuxvars.queues + id; - dbg_get_thread_states(group, queue, running, pending); -} - // // Linux rendering/font system functions // @@ -629,13 +572,6 @@ LinuxLoadAppCode(String* base_dir){ return(result); } -#include "4ed_link_system_functions.cpp" - -internal void -LinuxLoadSystemCode(){ - link_system_code(&linuxvars.system); -} - internal void LinuxLoadRenderCode(){ linuxvars.target.push_clip = draw_push_clip; @@ -1994,6 +1930,8 @@ LinuxHandleX11Events(void) // Entry point // +#include "4ed_link_system_functions.cpp" + int main(int argc, char **argv){ // @@ -2008,7 +1946,7 @@ main(int argc, char **argv){ return 99; } - LinuxLoadSystemCode(); + link_system_code(&linuxvars.system); LinuxLoadRenderCode(); memory_vars.vars_memory_size = MB(2); @@ -2140,31 +2078,31 @@ main(int argc, char **argv){ } Thread_Context background[4] = {}; - linuxvars.groups[BACKGROUND_THREADS].threads = background; - linuxvars.groups[BACKGROUND_THREADS].count = ArrayCount(background); - linuxvars.groups[BACKGROUND_THREADS].cancel_lock0 = CANCEL_LOCK0; - linuxvars.groups[BACKGROUND_THREADS].cancel_cv0 = 0; + threadvars.groups[BACKGROUND_THREADS].threads = background; + threadvars.groups[BACKGROUND_THREADS].count = ArrayCount(background); + threadvars.groups[BACKGROUND_THREADS].cancel_lock0 = CANCEL_LOCK0; + threadvars.groups[BACKGROUND_THREADS].cancel_cv0 = 0; Thread_Memory thread_memory[ArrayCount(background)]; - linuxvars.thread_memory = thread_memory; + threadvars.thread_memory = thread_memory; sem_init(&linuxvars.thread_semaphore, 0, 0); - linuxvars.queues[BACKGROUND_THREADS].semaphore = LinuxSemToHandle(&linuxvars.thread_semaphore); + threadvars.queues[BACKGROUND_THREADS].semaphore = LinuxSemToHandle(&linuxvars.thread_semaphore); - for(i32 i = 0; i < linuxvars.groups[BACKGROUND_THREADS].count; ++i){ - Thread_Context *thread = linuxvars.groups[BACKGROUND_THREADS].threads + i; + for(i32 i = 0; i < threadvars.groups[BACKGROUND_THREADS].count; ++i){ + Thread_Context *thread = threadvars.groups[BACKGROUND_THREADS].threads + i; thread->id = i + 1; thread->group_id = BACKGROUND_THREADS; - Thread_Memory *memory = linuxvars.thread_memory + i; + Thread_Memory *memory = threadvars.thread_memory + i; *memory = null_thread_memory; memory->id = thread->id; - thread->queue = &linuxvars.queues[BACKGROUND_THREADS]; + thread->queue = &threadvars.queues[BACKGROUND_THREADS]; pthread_create(&thread->handle, NULL, &JobThreadProc, thread); } - initialize_unbounded_queue(&linuxvars.groups[BACKGROUND_THREADS].queue); + initialize_unbounded_queue(&threadvars.groups[BACKGROUND_THREADS].queue); for(i32 i = 0; i < LOCK_COUNT; ++i){ pthread_mutex_init(linuxvars.locks + i, NULL); diff --git a/platform_win32/win32_4ed.cpp b/platform_win32/win32_4ed.cpp index b83e58d1..4bc349c7 100644 --- a/platform_win32/win32_4ed.cpp +++ b/platform_win32/win32_4ed.cpp @@ -173,12 +173,6 @@ struct Win32_Vars{ HMODULE custom; Plat_Settings settings; - Thread_Memory *thread_memory; - Work_Queue queues[THREAD_GROUP_COUNT]; - Thread_Group groups[THREAD_GROUP_COUNT]; - Mutex locks[LOCK_COUNT]; - Condition_Variable condition_vars[CV_COUNT]; - Win32_Coroutine coroutine_data[18]; Win32_Coroutine *coroutine_free; @@ -344,33 +338,23 @@ Sys_Memory_Free_Sig(system_memory_free){ // internal void -system_internal_acquire_lock(Mutex *m){ +system_acquire_lock(Mutex *m){ EnterCriticalSection(&m->crit); } internal void -system_internal_release_lock(Mutex *m){ +system_release_lock(Mutex *m){ LeaveCriticalSection(&m->crit); } -internal -Sys_Acquire_Lock_Sig(system_acquire_lock){ - system_internal_acquire_lock(&win32vars.locks[id]); -} - -internal -Sys_Release_Lock_Sig(system_release_lock){ - system_internal_release_lock(&win32vars.locks[id]); +internal void +system_wait_cv(Condition_Variable *cv, Mutex *lock){ + SleepConditionVariableCS(&cv->cv, &lock->crit, INFINITE); } internal void -system_wait_cv(i32 crit_id, i32 cv_id){ - SleepConditionVariableCS(&win32vars.condition_vars[cv_id].cv, &win32vars.locks[crit_id].crit, INFINITE); -} - -internal void -system_signal_cv(i32 crit_id, i32 cv_id){ - WakeConditionVariable(&win32vars.condition_vars[cv_id].cv); +system_signal_cv(Condition_Variable *cv, Mutex *lock){ + WakeConditionVariable(&cv->cv); } internal void @@ -393,57 +377,11 @@ system_release_semaphore(Plat_Handle handle){ internal DWORD CALL_CONVENTION JobThreadProc(LPVOID lpParameter){ Thread_Context *thread = (Thread_Context*)lpParameter; - Work_Queue *queue = win32vars.queues + thread->group_id; - Thread_Group *group = win32vars.groups + thread->group_id; - i32 thread_index = thread->id - 1; - Thread_Memory *memory = win32vars.thread_memory + thread_index; - job_proc(&win32vars.system, thread, queue, group, memory); + job_thread_proc(&win32vars.system, thread); InvalidCodePath; return(0); } -internal void -flush_thread_group(i32 group_id){ - Thread_Group *group = win32vars.groups + group_id; - Work_Queue *queue = win32vars.queues + group_id; - Unbounded_Work_Queue *source_queue = &group->queue; - flush_to_direct_queue(source_queue, queue, group->count); -} - -internal -Sys_Post_Job_Sig(system_post_job){ - Thread_Group *group = win32vars.groups + group_id; - Work_Queue *direct_queue = win32vars.queues + group_id; - u32 result = post_job(group, direct_queue, job); - return(result); -} - -internal -Sys_Cancel_Job_Sig(system_cancel_job){ - Thread_Group *group = win32vars.groups + group_id; - Work_Queue *queue = win32vars.queues + group_id; - cancel_job(group, queue, job_id); -} - -internal -Sys_Check_Cancel_Sig(system_check_cancel){ - Thread_Group *group = win32vars.groups + thread->group_id; - b32 result = check_cancel_status(group, thread); - return(result); -} - -internal -Sys_Grow_Thread_Memory_Sig(system_grow_thread_memory){ - grow_thread_memory(memory); -} - -internal -INTERNAL_Sys_Get_Thread_States_Sig(system_internal_get_thread_states){ - Thread_Group *group = win32vars.groups + id; - Work_Queue *queue = win32vars.queues + id; - dbg_get_thread_states(group, queue, running, pending); -} - // // Coroutines // @@ -1220,13 +1158,6 @@ Win32LoadAppCode(){ #include "4ed_font_data.h" #include "4ed_system_shared.cpp" -#include "4ed_link_system_functions.cpp" - -internal void -Win32LoadSystemCode(){ - link_system_code(&win32vars.system); -} - internal void Win32LoadRenderCode(){ win32vars.target.push_clip = draw_push_clip; @@ -1713,6 +1644,8 @@ Win32Callback(HWND hwnd, UINT uMsg, WPARAM wParam, LPARAM lParam){ return(result); } +#include "4ed_link_system_functions.cpp" + int CALL_CONVENTION WinMain(HINSTANCE hInstance, HINSTANCE hPrevInstance, LPSTR lpCmdLine, int nCmdShow){ i32 argc = __argc; @@ -1727,41 +1660,41 @@ WinMain(HINSTANCE hInstance, HINSTANCE hPrevInstance, LPSTR lpCmdLine, int nCmdS // for (i32 i = 0; i < LOCK_COUNT; ++i){ - InitializeCriticalSection(&win32vars.locks[i].crit); + InitializeCriticalSection(&threadvars.locks[i].crit); } for (i32 i = 0; i < CV_COUNT; ++i){ - InitializeConditionVariable(&win32vars.condition_vars[i].cv); + InitializeConditionVariable(&threadvars.conds[i].cv); } Thread_Context background[4]; memset(background, 0, sizeof(background)); - win32vars.groups[BACKGROUND_THREADS].threads = background; - win32vars.groups[BACKGROUND_THREADS].count = ArrayCount(background); - win32vars.groups[BACKGROUND_THREADS].cancel_lock0 = CANCEL_LOCK0; - win32vars.groups[BACKGROUND_THREADS].cancel_cv0 = CANCEL_CV0; + threadvars.groups[BACKGROUND_THREADS].threads = background; + threadvars.groups[BACKGROUND_THREADS].count = ArrayCount(background); + threadvars.groups[BACKGROUND_THREADS].cancel_lock0 = CANCEL_LOCK0; + threadvars.groups[BACKGROUND_THREADS].cancel_cv0 = CANCEL_CV0; Thread_Memory thread_memory[ArrayCount(background)]; - win32vars.thread_memory = thread_memory; + threadvars.thread_memory = thread_memory; - win32vars.queues[BACKGROUND_THREADS].semaphore =Win32Handle(CreateSemaphore(0, 0, win32vars.groups[BACKGROUND_THREADS].count, 0)); + threadvars.queues[BACKGROUND_THREADS].semaphore =Win32Handle(CreateSemaphore(0, 0, threadvars.groups[BACKGROUND_THREADS].count, 0)); u32 creation_flag = 0; - for (i32 i = 0; i < win32vars.groups[BACKGROUND_THREADS].count; ++i){ - Thread_Context *thread = win32vars.groups[BACKGROUND_THREADS].threads + i; + for (i32 i = 0; i < threadvars.groups[BACKGROUND_THREADS].count; ++i){ + Thread_Context *thread = threadvars.groups[BACKGROUND_THREADS].threads + i; thread->id = i + 1; thread->group_id = BACKGROUND_THREADS; - Thread_Memory *memory = win32vars.thread_memory + i; + Thread_Memory *memory = threadvars.thread_memory + i; *memory = null_thread_memory; memory->id = thread->id; - thread->queue = &win32vars.queues[BACKGROUND_THREADS]; + thread->queue = &threadvars.queues[BACKGROUND_THREADS]; thread->handle = CreateThread(0, 0, JobThreadProc, thread, creation_flag, (LPDWORD)&thread->windows_id); } - initialize_unbounded_queue(&win32vars.groups[BACKGROUND_THREADS].queue); + initialize_unbounded_queue(&threadvars.groups[BACKGROUND_THREADS].queue); ConvertThreadToFiber(0); win32vars.coroutine_free = win32vars.coroutine_data; @@ -1821,7 +1754,7 @@ WinMain(HINSTANCE hInstance, HINSTANCE hPrevInstance, LPSTR lpCmdLine, int nCmdS } - Win32LoadSystemCode(); + link_system_code(&win32vars.system); Win32LoadRenderCode(); System_Functions *system = &win32vars.system;