zipped all of the threading code together

master
Allen Webster 2017-07-18 13:19:59 -04:00
parent 122cecb6ff
commit bfc6fc689d
3 changed files with 117 additions and 200 deletions

View File

@ -9,11 +9,37 @@
// TOP
struct Threading_Vars{
Thread_Memory *thread_memory;
Work_Queue queues[THREAD_GROUP_COUNT];
Thread_Group groups[THREAD_GROUP_COUNT];
Mutex locks[LOCK_COUNT];
Condition_Variable conds[CV_COUNT];
};
global Threading_Vars threadvars;
internal
Sys_Acquire_Lock_Sig(system_acquire_lock){
system_acquire_lock(&threadvars.locks[id]);
}
internal
Sys_Release_Lock_Sig(system_release_lock){
system_release_lock(&threadvars.locks[id]);
}
internal void
job_proc(System_Functions *system, Thread_Context *thread, Work_Queue *queue, Thread_Group *group, Thread_Memory *thread_memory){
job_thread_proc(System_Functions *system, Thread_Context *thread){
Work_Queue *queue = threadvars.queues + thread->group_id;
Thread_Group *group = threadvars.groups + thread->group_id;
i32 thread_index = thread->id - 1;
i32 cancel_lock = group->cancel_lock0 + thread_index;
i32 cancel_cv = group->cancel_cv0 + thread_index;
Thread_Memory *thread_memory = threadvars.thread_memory + thread_index;
i32 cancel_lock_id = group->cancel_lock0 + thread_index;
i32 cancel_cv_id = group->cancel_cv0 + thread_index;
Mutex *cancel_lock = &threadvars.locks[cancel_lock_id];
Condition_Variable *cancel_cv = &threadvars.conds[cancel_cv_id];
if (thread_memory->size == 0){
i32 new_size = KB(64);
@ -47,7 +73,7 @@ job_proc(System_Functions *system, Thread_Context *thread, Work_Queue *queue, Th
system_acquire_lock(cancel_lock);
if (thread->cancel){
thread->cancel = 0;
system_signal_cv(cancel_lock, cancel_cv);
system_signal_cv(cancel_cv, cancel_lock);
}
system_release_lock(cancel_lock);
}
@ -92,7 +118,12 @@ get_work_queue_available_space(i32 write, i32 read){
#define UNBOUNDED_SKIP_MAX 128
internal i32
flush_to_direct_queue(Unbounded_Work_Queue *source_queue, Work_Queue *queue, i32 thread_count){
flush_thread_group(Thread_Group_ID group_id){
Thread_Group *group = threadvars.groups + group_id;
Work_Queue *queue = threadvars.queues + group_id;
Unbounded_Work_Queue *source_queue = &group->queue;
i32 thread_count = group->count;
// NOTE(allen): It is understood that read_position may be changed by other
// threads but it will only make more space in the queue if it is changed.
// Meanwhile write_position should not ever be changed by anything but the
@ -155,8 +186,9 @@ flush_to_direct_queue(Unbounded_Work_Queue *source_queue, Work_Queue *queue, i32
// The unbounded queue is entirely managed by the main thread.
// The thread safe queue is bounded in size so the unbounded
// queue is periodically flushed into the direct work queue.
internal u32
post_job(Thread_Group *group, Work_Queue *direct_queue, Job_Data job){
internal
Sys_Post_Job_Sig(system_post_job){
Thread_Group *group = threadvars.groups + group_id;
Unbounded_Work_Queue *queue = &group->queue;
u32 result = queue->next_job_id++;
@ -176,13 +208,16 @@ post_job(Thread_Group *group, Work_Queue *direct_queue, Job_Data job){
full_job.id = result;
queue->jobs[queue->count++] = full_job;
flush_to_direct_queue(queue, direct_queue, group->count);
flush_thread_group(group_id);
return(result);
}
internal void
cancel_job(Thread_Group *group, Work_Queue *queue, u32 job_id){
internal
Sys_Cancel_Job_Sig(system_cancel_job){
Thread_Group *group = threadvars.groups + group_id;
Work_Queue *queue = threadvars.queues + group_id;
Unbounded_Work_Queue *source_queue = &group->queue;
b32 handled_in_unbounded = false;
@ -205,29 +240,36 @@ cancel_job(Thread_Group *group, Work_Queue *queue, u32 job_id){
if (thread_id != THREAD_NOT_ASSIGNED && thread_id != 0){
i32 thread_index = thread_id - 1;
i32 cancel_lock = group->cancel_lock0 + thread_index;
i32 cancel_cv = group->cancel_cv0 + thread_index;
i32 cancel_lock_id = group->cancel_lock0 + thread_index;
i32 cancel_cv_id = group->cancel_cv0 + thread_index;
Mutex *cancel_lock = &threadvars.locks[cancel_lock_id];
Condition_Variable *cancel_cv = &threadvars.conds[cancel_cv_id];
Thread_Context *thread = group->threads + thread_index;
system_acquire_lock(cancel_lock);
thread->cancel = true;
system_release_lock(FRAME_LOCK);
system_release_lock(&threadvars.locks[FRAME_LOCK]);
do{
system_wait_cv(cancel_lock, cancel_cv);
system_wait_cv(cancel_cv, cancel_lock);
}while (thread->cancel);
system_acquire_lock(FRAME_LOCK);
system_acquire_lock(&threadvars.locks[FRAME_LOCK]);
system_release_lock(cancel_lock);
}
}
}
internal b32
check_cancel_status(Thread_Group *group, Thread_Context *thread){
internal
Sys_Check_Cancel_Sig(system_check_cancel){
Thread_Group *group = threadvars.groups + thread->group_id;
b32 result = false;
i32 thread_index = thread->id - 1;
i32 cancel_lock = group->cancel_lock0 + thread_index;
i32 cancel_lock_id = group->cancel_lock0 + thread_index;
Mutex *cancel_lock = &threadvars.locks[cancel_lock_id];
system_acquire_lock(cancel_lock);
if (thread->cancel){
result = true;
@ -236,9 +278,11 @@ check_cancel_status(Thread_Group *group, Thread_Context *thread){
return(result);
}
internal void
grow_thread_memory(Thread_Memory *memory){
system_acquire_lock(CANCEL_LOCK0 + memory->id - 1);
internal
Sys_Grow_Thread_Memory_Sig(system_grow_thread_memory){
Mutex *cancel_lock = &threadvars.locks[CANCEL_LOCK0 + memory->id - 1];
system_acquire_lock(cancel_lock);
void *old_data = memory->data;
i32 old_size = memory->size;
i32 new_size = l_round_up_i32(memory->size*2, KB(4));
@ -248,11 +292,13 @@ grow_thread_memory(Thread_Memory *memory){
memcpy(memory->data, old_data, old_size);
system_memory_free(old_data, old_size);
}
system_release_lock(CANCEL_LOCK0 + memory->id - 1);
system_release_lock(cancel_lock);
}
internal void
dbg_get_thread_states(Thread_Group *group, Work_Queue *queue, b8 *running, i32 *pending){
internal
INTERNAL_Sys_Get_Thread_States_Sig(system_internal_get_thread_states){
Thread_Group *group = threadvars.groups + id;
Work_Queue *queue = threadvars.queues + id;
Unbounded_Work_Queue *source_queue = &group->queue;
u32 write = queue->write_position;
u32 read = queue->read_position;

View File

@ -190,11 +190,6 @@ struct Linux_Vars{
void *app_code;
void *custom;
Thread_Memory *thread_memory;
Thread_Group groups[THREAD_GROUP_COUNT];
Work_Queue queues[THREAD_GROUP_COUNT];
Mutex locks[LOCK_COUNT];
Condition_Variable conds[8];
sem_t thread_semaphore;
i32 dpi_x, dpi_y;
@ -488,37 +483,27 @@ Sys_CLI_End_Update_Sig(system_cli_end_update){
}
//
// Threads
// Multithreading
//
internal void
system_internal_acquire_lock(Mutex *m){
system_acquire_lock(Mutex *m){
pthread_mutex_lock(m->crit);
}
internal void
system_internal_release_lock(Mutex *m){
system_release_lock(Mutex *m){
pthread_mutex_unlock(m->crit);
}
internal
Sys_Acquire_Lock_Sig(system_acquire_lock){
system_internal_acquire_lock(&linuxvars.locks[id]);
}
internal
Sys_Release_Lock_Sig(system_release_lock){
system_internal_release_lock(&linuxvars.locks[id]);
internal void
system_wait_cv(Condition_Variable *cv, Mutex *m){
pthread_cond_wait(cv->cv, m->crit);
}
internal void
system_wait_cv(i32 lock_id, i32 cv_id){
pthread_cond_wait(linuxvars.conds + cv_id, linuxvars.locks + lock_id);
}
internal void
system_signal_cv(i32 lock_id, i32 cv_id){
pthread_cond_signal(linuxvars.conds + cv_id);
system_signal_cv(Condition_Variable *cv, Mutex *m){
pthread_cond_signal(cv->cv);
}
// HACK(allen): Reduce this down to just one layer of call.
@ -542,53 +527,11 @@ system_release_semaphore(Plat_Handle handle){
internal void*
JobThreadProc(void* lpParameter){
Thread_Context *thread = (Thread_Context*)lpParameter;
Work_Queue *queue = linuxvars.queues + thread->group_id;
Thread_Group *group = linuxvars.groups + thread->group_id;
i32 thread_index = thread->id - 1;
Thread_Memory *memory = linuxvars.thread_memory + thread_index;
job_proc(&linuxvars.system, thread, queue, group, memory);
job_thread_proc(&linuxvars.system, thread);
InvalidCodePath;
return(0);
}
// Note(allen): post_job puts the job on the unbounded queue.
// The unbounded queue is entirely managed by the main thread.
// The thread safe queue is bounded in size so the unbounded
// queue is periodically flushed into the direct work queue.
internal
Sys_Post_Job_Sig(system_post_job){
Thread_Group *group = linuxvars.groups + group_id;
Work_Queue *direct_queue = linuxvars.queues + group_id;
u32 result = post_job(group, direct_queue, job);
return(result);
}
internal
Sys_Cancel_Job_Sig(system_cancel_job){
Thread_Group *group = linuxvars.groups + group_id;
Work_Queue *queue = linuxvars.queues + group_id;
cancel_job(group, queue, job_id);
}
internal
Sys_Check_Cancel_Sig(system_check_cancel){
Thread_Group *group = linuxvars.groups + thread->group_id;
b32 result = check_cancel_status(group, thread);
return(result);
}
internal
Sys_Grow_Thread_Memory_Sig(system_grow_thread_memory){
grow_thread_memory(memory);
}
internal
INTERNAL_Sys_Get_Thread_States_Sig(system_internal_get_thread_states){
Thread_Group *group = linuxvars.groups + id;
Work_Queue *queue = linuxvars.queues + id;
dbg_get_thread_states(group, queue, running, pending);
}
//
// Linux rendering/font system functions
//
@ -629,13 +572,6 @@ LinuxLoadAppCode(String* base_dir){
return(result);
}
#include "4ed_link_system_functions.cpp"
internal void
LinuxLoadSystemCode(){
link_system_code(&linuxvars.system);
}
internal void
LinuxLoadRenderCode(){
linuxvars.target.push_clip = draw_push_clip;
@ -1994,6 +1930,8 @@ LinuxHandleX11Events(void)
// Entry point
//
#include "4ed_link_system_functions.cpp"
int
main(int argc, char **argv){
//
@ -2008,7 +1946,7 @@ main(int argc, char **argv){
return 99;
}
LinuxLoadSystemCode();
link_system_code(&linuxvars.system);
LinuxLoadRenderCode();
memory_vars.vars_memory_size = MB(2);
@ -2140,31 +2078,31 @@ main(int argc, char **argv){
}
Thread_Context background[4] = {};
linuxvars.groups[BACKGROUND_THREADS].threads = background;
linuxvars.groups[BACKGROUND_THREADS].count = ArrayCount(background);
linuxvars.groups[BACKGROUND_THREADS].cancel_lock0 = CANCEL_LOCK0;
linuxvars.groups[BACKGROUND_THREADS].cancel_cv0 = 0;
threadvars.groups[BACKGROUND_THREADS].threads = background;
threadvars.groups[BACKGROUND_THREADS].count = ArrayCount(background);
threadvars.groups[BACKGROUND_THREADS].cancel_lock0 = CANCEL_LOCK0;
threadvars.groups[BACKGROUND_THREADS].cancel_cv0 = 0;
Thread_Memory thread_memory[ArrayCount(background)];
linuxvars.thread_memory = thread_memory;
threadvars.thread_memory = thread_memory;
sem_init(&linuxvars.thread_semaphore, 0, 0);
linuxvars.queues[BACKGROUND_THREADS].semaphore = LinuxSemToHandle(&linuxvars.thread_semaphore);
threadvars.queues[BACKGROUND_THREADS].semaphore = LinuxSemToHandle(&linuxvars.thread_semaphore);
for(i32 i = 0; i < linuxvars.groups[BACKGROUND_THREADS].count; ++i){
Thread_Context *thread = linuxvars.groups[BACKGROUND_THREADS].threads + i;
for(i32 i = 0; i < threadvars.groups[BACKGROUND_THREADS].count; ++i){
Thread_Context *thread = threadvars.groups[BACKGROUND_THREADS].threads + i;
thread->id = i + 1;
thread->group_id = BACKGROUND_THREADS;
Thread_Memory *memory = linuxvars.thread_memory + i;
Thread_Memory *memory = threadvars.thread_memory + i;
*memory = null_thread_memory;
memory->id = thread->id;
thread->queue = &linuxvars.queues[BACKGROUND_THREADS];
thread->queue = &threadvars.queues[BACKGROUND_THREADS];
pthread_create(&thread->handle, NULL, &JobThreadProc, thread);
}
initialize_unbounded_queue(&linuxvars.groups[BACKGROUND_THREADS].queue);
initialize_unbounded_queue(&threadvars.groups[BACKGROUND_THREADS].queue);
for(i32 i = 0; i < LOCK_COUNT; ++i){
pthread_mutex_init(linuxvars.locks + i, NULL);

View File

@ -173,12 +173,6 @@ struct Win32_Vars{
HMODULE custom;
Plat_Settings settings;
Thread_Memory *thread_memory;
Work_Queue queues[THREAD_GROUP_COUNT];
Thread_Group groups[THREAD_GROUP_COUNT];
Mutex locks[LOCK_COUNT];
Condition_Variable condition_vars[CV_COUNT];
Win32_Coroutine coroutine_data[18];
Win32_Coroutine *coroutine_free;
@ -344,33 +338,23 @@ Sys_Memory_Free_Sig(system_memory_free){
//
internal void
system_internal_acquire_lock(Mutex *m){
system_acquire_lock(Mutex *m){
EnterCriticalSection(&m->crit);
}
internal void
system_internal_release_lock(Mutex *m){
system_release_lock(Mutex *m){
LeaveCriticalSection(&m->crit);
}
internal
Sys_Acquire_Lock_Sig(system_acquire_lock){
system_internal_acquire_lock(&win32vars.locks[id]);
}
internal
Sys_Release_Lock_Sig(system_release_lock){
system_internal_release_lock(&win32vars.locks[id]);
internal void
system_wait_cv(Condition_Variable *cv, Mutex *lock){
SleepConditionVariableCS(&cv->cv, &lock->crit, INFINITE);
}
internal void
system_wait_cv(i32 crit_id, i32 cv_id){
SleepConditionVariableCS(&win32vars.condition_vars[cv_id].cv, &win32vars.locks[crit_id].crit, INFINITE);
}
internal void
system_signal_cv(i32 crit_id, i32 cv_id){
WakeConditionVariable(&win32vars.condition_vars[cv_id].cv);
system_signal_cv(Condition_Variable *cv, Mutex *lock){
WakeConditionVariable(&cv->cv);
}
internal void
@ -393,57 +377,11 @@ system_release_semaphore(Plat_Handle handle){
internal DWORD CALL_CONVENTION
JobThreadProc(LPVOID lpParameter){
Thread_Context *thread = (Thread_Context*)lpParameter;
Work_Queue *queue = win32vars.queues + thread->group_id;
Thread_Group *group = win32vars.groups + thread->group_id;
i32 thread_index = thread->id - 1;
Thread_Memory *memory = win32vars.thread_memory + thread_index;
job_proc(&win32vars.system, thread, queue, group, memory);
job_thread_proc(&win32vars.system, thread);
InvalidCodePath;
return(0);
}
internal void
flush_thread_group(i32 group_id){
Thread_Group *group = win32vars.groups + group_id;
Work_Queue *queue = win32vars.queues + group_id;
Unbounded_Work_Queue *source_queue = &group->queue;
flush_to_direct_queue(source_queue, queue, group->count);
}
internal
Sys_Post_Job_Sig(system_post_job){
Thread_Group *group = win32vars.groups + group_id;
Work_Queue *direct_queue = win32vars.queues + group_id;
u32 result = post_job(group, direct_queue, job);
return(result);
}
internal
Sys_Cancel_Job_Sig(system_cancel_job){
Thread_Group *group = win32vars.groups + group_id;
Work_Queue *queue = win32vars.queues + group_id;
cancel_job(group, queue, job_id);
}
internal
Sys_Check_Cancel_Sig(system_check_cancel){
Thread_Group *group = win32vars.groups + thread->group_id;
b32 result = check_cancel_status(group, thread);
return(result);
}
internal
Sys_Grow_Thread_Memory_Sig(system_grow_thread_memory){
grow_thread_memory(memory);
}
internal
INTERNAL_Sys_Get_Thread_States_Sig(system_internal_get_thread_states){
Thread_Group *group = win32vars.groups + id;
Work_Queue *queue = win32vars.queues + id;
dbg_get_thread_states(group, queue, running, pending);
}
//
// Coroutines
//
@ -1220,13 +1158,6 @@ Win32LoadAppCode(){
#include "4ed_font_data.h"
#include "4ed_system_shared.cpp"
#include "4ed_link_system_functions.cpp"
internal void
Win32LoadSystemCode(){
link_system_code(&win32vars.system);
}
internal void
Win32LoadRenderCode(){
win32vars.target.push_clip = draw_push_clip;
@ -1713,6 +1644,8 @@ Win32Callback(HWND hwnd, UINT uMsg, WPARAM wParam, LPARAM lParam){
return(result);
}
#include "4ed_link_system_functions.cpp"
int CALL_CONVENTION
WinMain(HINSTANCE hInstance, HINSTANCE hPrevInstance, LPSTR lpCmdLine, int nCmdShow){
i32 argc = __argc;
@ -1727,41 +1660,41 @@ WinMain(HINSTANCE hInstance, HINSTANCE hPrevInstance, LPSTR lpCmdLine, int nCmdS
//
for (i32 i = 0; i < LOCK_COUNT; ++i){
InitializeCriticalSection(&win32vars.locks[i].crit);
InitializeCriticalSection(&threadvars.locks[i].crit);
}
for (i32 i = 0; i < CV_COUNT; ++i){
InitializeConditionVariable(&win32vars.condition_vars[i].cv);
InitializeConditionVariable(&threadvars.conds[i].cv);
}
Thread_Context background[4];
memset(background, 0, sizeof(background));
win32vars.groups[BACKGROUND_THREADS].threads = background;
win32vars.groups[BACKGROUND_THREADS].count = ArrayCount(background);
win32vars.groups[BACKGROUND_THREADS].cancel_lock0 = CANCEL_LOCK0;
win32vars.groups[BACKGROUND_THREADS].cancel_cv0 = CANCEL_CV0;
threadvars.groups[BACKGROUND_THREADS].threads = background;
threadvars.groups[BACKGROUND_THREADS].count = ArrayCount(background);
threadvars.groups[BACKGROUND_THREADS].cancel_lock0 = CANCEL_LOCK0;
threadvars.groups[BACKGROUND_THREADS].cancel_cv0 = CANCEL_CV0;
Thread_Memory thread_memory[ArrayCount(background)];
win32vars.thread_memory = thread_memory;
threadvars.thread_memory = thread_memory;
win32vars.queues[BACKGROUND_THREADS].semaphore =Win32Handle(CreateSemaphore(0, 0, win32vars.groups[BACKGROUND_THREADS].count, 0));
threadvars.queues[BACKGROUND_THREADS].semaphore =Win32Handle(CreateSemaphore(0, 0, threadvars.groups[BACKGROUND_THREADS].count, 0));
u32 creation_flag = 0;
for (i32 i = 0; i < win32vars.groups[BACKGROUND_THREADS].count; ++i){
Thread_Context *thread = win32vars.groups[BACKGROUND_THREADS].threads + i;
for (i32 i = 0; i < threadvars.groups[BACKGROUND_THREADS].count; ++i){
Thread_Context *thread = threadvars.groups[BACKGROUND_THREADS].threads + i;
thread->id = i + 1;
thread->group_id = BACKGROUND_THREADS;
Thread_Memory *memory = win32vars.thread_memory + i;
Thread_Memory *memory = threadvars.thread_memory + i;
*memory = null_thread_memory;
memory->id = thread->id;
thread->queue = &win32vars.queues[BACKGROUND_THREADS];
thread->queue = &threadvars.queues[BACKGROUND_THREADS];
thread->handle = CreateThread(0, 0, JobThreadProc, thread, creation_flag, (LPDWORD)&thread->windows_id);
}
initialize_unbounded_queue(&win32vars.groups[BACKGROUND_THREADS].queue);
initialize_unbounded_queue(&threadvars.groups[BACKGROUND_THREADS].queue);
ConvertThreadToFiber(0);
win32vars.coroutine_free = win32vars.coroutine_data;
@ -1821,7 +1754,7 @@ WinMain(HINSTANCE hInstance, HINSTANCE hPrevInstance, LPSTR lpCmdLine, int nCmdS
}
Win32LoadSystemCode();
link_system_code(&win32vars.system);
Win32LoadRenderCode();
System_Functions *system = &win32vars.system;