Revert "throw away threading stuff"

This reverts commit 768c736a72.
master
Allen Webster 2017-06-30 22:19:51 -04:00
parent 768c736a72
commit 3d0693c6b6
2 changed files with 358 additions and 366 deletions

View File

@ -53,6 +53,7 @@
#include <locale.h> #include <locale.h>
#include <dlfcn.h> #include <dlfcn.h>
#include <xmmintrin.h>
#include <ucontext.h> #include <ucontext.h>
#include <sys/types.h> #include <sys/types.h>
@ -93,6 +94,9 @@
#define LINUX_FN_DEBUG(fmt, ...) #define LINUX_FN_DEBUG(fmt, ...)
#endif #endif
#define InterlockedCompareExchange(dest, ex, comp) \
__sync_val_compare_and_swap((dest), (comp), (ex))
// //
// Linux structs / enums // Linux structs / enums
// //
@ -113,6 +117,27 @@ struct Linux_Coroutine {
b32 done; b32 done;
}; };
struct Thread_Context{
u32 job_id;
b32 running;
b32 cancel;
Work_Queue *queue;
u32 id;
u32 group_id;
pthread_t handle;
};
struct Thread_Group{
Thread_Context *threads;
i32 count;
Unbounded_Work_Queue queue;
i32 cancel_lock0;
i32 cancel_cv0;
};
struct Linux_Vars{ struct Linux_Vars{
Display *XDisplay; Display *XDisplay;
Window XWindow; Window XWindow;
@ -162,6 +187,13 @@ struct Linux_Vars{
void *app_code; void *app_code;
void *custom; void *custom;
Thread_Memory *thread_memory;
Thread_Group groups[THREAD_GROUP_COUNT];
Work_Queue queues[THREAD_GROUP_COUNT];
pthread_mutex_t locks[LOCK_COUNT];
pthread_cond_t conds[8];
sem_t thread_semaphore;
i32 dpi_x, dpi_y; i32 dpi_x, dpi_y;
Plat_Settings settings; Plat_Settings settings;
@ -454,6 +486,317 @@ Sys_CLI_End_Update_Sig(system_cli_end_update){
return close_me; return close_me;
} }
//
// Threads
//
internal
Sys_Acquire_Lock_Sig(system_acquire_lock){
pthread_mutex_lock(linuxvars.locks + id);
}
internal
Sys_Release_Lock_Sig(system_release_lock){
pthread_mutex_unlock(linuxvars.locks + id);
}
internal void
system_wait_cv(i32 lock_id, i32 cv_id){
pthread_cond_wait(linuxvars.conds + cv_id, linuxvars.locks + lock_id);
}
internal void
system_signal_cv(i32 lock_id, i32 cv_id){
pthread_cond_signal(linuxvars.conds + cv_id);
}
internal void*
JobThreadProc(void* lpParameter){
Thread_Context *thread = (Thread_Context*)lpParameter;
Work_Queue *queue = linuxvars.queues + thread->group_id;
Thread_Group *group = linuxvars.groups + thread->group_id;
i32 thread_index = thread->id - 1;
i32 cancel_lock = group->cancel_lock0 + thread_index;
i32 cancel_cv = group->cancel_cv0 + thread_index;
Thread_Memory *thread_memory = linuxvars.thread_memory + thread_index;
if (thread_memory->size == 0){
i32 new_size = KB(64);
thread_memory->data = system_memory_allocate(new_size);
thread_memory->size = new_size;
}
for (;;){
u32 read_index = queue->read_position;
u32 write_index = queue->write_position;
if (read_index != write_index){
// NOTE(allen): Previously I was wrapping by the job wrap then
// wrapping by the queue wrap. That was super stupid what was that?
// Now it just wraps by the queue wrap.
u32 next_read_index = (read_index + 1) % QUEUE_WRAP;
u32 safe_read_index =
InterlockedCompareExchange(&queue->read_position,
next_read_index, read_index);
if (safe_read_index == read_index){
Full_Job_Data *full_job = queue->jobs + safe_read_index;
// NOTE(allen): This is interlocked so that it plays nice
// with the cancel job routine, which may try to cancel this job
// at the same time that we try to run it
i32 safe_running_thread =
InterlockedCompareExchange(&full_job->running_thread,
thread->id, THREAD_NOT_ASSIGNED);
if (safe_running_thread == THREAD_NOT_ASSIGNED){
thread->job_id = full_job->id;
thread->running = 1;
full_job->job.callback(&linuxvars.system,
thread, thread_memory, full_job->job.data);
LinuxScheduleStep();
//full_job->running_thread = 0;
thread->running = 0;
system_acquire_lock(cancel_lock);
if (thread->cancel){
thread->cancel = 0;
system_signal_cv(cancel_lock, cancel_cv);
}
system_release_lock(cancel_lock);
}
}
}
else{
sem_wait(LinuxHandleToSem(queue->semaphore));
}
}
}
internal void
initialize_unbounded_queue(Unbounded_Work_Queue *source_queue){
i32 max = 512;
source_queue->jobs = (Full_Job_Data*)system_memory_allocate(max*sizeof(Full_Job_Data));
source_queue->count = 0;
source_queue->max = max;
source_queue->skip = 0;
}
inline i32
get_work_queue_available_space(i32 write, i32 read){
// NOTE(allen): The only time that queue->write_position == queue->read_position
// is allowed is when the queue is empty. Thus if
// queue->write_position+1 == queue->read_position the available space is zero.
// So these computations both end up leaving one slot unused. The only way I can
// think to easily eliminate this is to have read and write wrap at twice the size
// of the underlying array but modulo their values into the array then if write
// has caught up with read it still will not be equal... but lots of modulos... ehh.
i32 available_space = 0;
if (write >= read){
available_space = QUEUE_WRAP - (write - read) - 1;
}
else{
available_space = (read - write) - 1;
}
return(available_space);
}
#define UNBOUNDED_SKIP_MAX 128
internal void
flush_to_direct_queue(Unbounded_Work_Queue *source_queue, Work_Queue *queue, i32 thread_count){
// NOTE(allen): It is understood that read_position may be changed by other
// threads but it will only make more space in the queue if it is changed.
// Meanwhile write_position should not ever be changed by anything but the
// main thread in this system, so it will not be interlocked.
u32 read_position = queue->read_position;
u32 write_position = queue->write_position;
u32 available_space = get_work_queue_available_space(write_position, read_position);
u32 available_jobs = source_queue->count - source_queue->skip;
u32 writable_count = Min(available_space, available_jobs);
if (writable_count > 0){
u32 count1 = writable_count;
if (count1+write_position > QUEUE_WRAP){
count1 = QUEUE_WRAP - write_position;
}
u32 count2 = writable_count - count1;
Full_Job_Data *job_src1 = source_queue->jobs + source_queue->skip;
Full_Job_Data *job_src2 = job_src1 + count1;
Full_Job_Data *job_dst1 = queue->jobs + write_position;
Full_Job_Data *job_dst2 = queue->jobs;
Assert((job_src1->id % QUEUE_WRAP) == write_position);
memcpy(job_dst1, job_src1, sizeof(Full_Job_Data)*count1);
memcpy(job_dst2, job_src2, sizeof(Full_Job_Data)*count2);
queue->write_position = (write_position + writable_count) % QUEUE_WRAP;
source_queue->skip += writable_count;
if (source_queue->skip == source_queue->count){
source_queue->skip = source_queue->count = 0;
}
else if (source_queue->skip > UNBOUNDED_SKIP_MAX){
u32 left_over = source_queue->count - source_queue->skip;
memmove(source_queue->jobs, source_queue->jobs + source_queue->skip,
sizeof(Full_Job_Data)*left_over);
source_queue->count = left_over;
source_queue->skip = 0;
}
}
i32 semaphore_release_count = writable_count;
if (semaphore_release_count > thread_count){
semaphore_release_count = thread_count;
}
// NOTE(allen): platform dependent portion...
for (i32 i = 0; i < semaphore_release_count; ++i){
sem_post(LinuxHandleToSem(queue->semaphore));
}
}
internal void
flush_thread_group(i32 group_id){
Thread_Group *group = linuxvars.groups + group_id;
Work_Queue *queue = linuxvars.queues + group_id;
Unbounded_Work_Queue *source_queue = &group->queue;
flush_to_direct_queue(source_queue, queue, group->count);
}
// Note(allen): post_job puts the job on the unbounded queue.
// The unbounded queue is entirely managed by the main thread.
// The thread safe queue is bounded in size so the unbounded
// queue is periodically flushed into the direct work queue.
internal
Sys_Post_Job_Sig(system_post_job){
Thread_Group *group = linuxvars.groups + group_id;
Unbounded_Work_Queue *queue = &group->queue;
u32 result = queue->next_job_id++;
while (queue->count >= queue->max){
i32 new_max = queue->max*2;
u32 job_size = sizeof(Full_Job_Data);
Full_Job_Data *new_jobs = (Full_Job_Data*)system_memory_allocate(new_max*job_size);
memcpy(new_jobs, queue->jobs, queue->count);
system_memory_free(queue->jobs, queue->max*job_size);
queue->jobs = new_jobs;
queue->max = new_max;
}
Full_Job_Data full_job;
full_job.job = job;
full_job.running_thread = THREAD_NOT_ASSIGNED;
full_job.id = result;
queue->jobs[queue->count++] = full_job;
Work_Queue *direct_queue = linuxvars.queues + group_id;
flush_to_direct_queue(queue, direct_queue, group->count);
return(result);
}
internal
Sys_Cancel_Job_Sig(system_cancel_job){
Thread_Group *group = linuxvars.groups + group_id;
Unbounded_Work_Queue *source_queue = &group->queue;
b32 handled_in_unbounded = false;
if (source_queue->skip < source_queue->count){
Full_Job_Data *first_job = source_queue->jobs + source_queue->skip;
if (first_job->id <= job_id){
u32 index = source_queue->skip + (job_id - first_job->id);
Full_Job_Data *job = source_queue->jobs + index;
job->running_thread = 0;
handled_in_unbounded = true;
}
}
if (!handled_in_unbounded){
Work_Queue *queue = linuxvars.queues + group_id;
Full_Job_Data *job = queue->jobs + (job_id % QUEUE_WRAP);
Assert(job->id == job_id);
u32 thread_id =
InterlockedCompareExchange(&job->running_thread,
0, THREAD_NOT_ASSIGNED);
if (thread_id != THREAD_NOT_ASSIGNED && thread_id != 0){
i32 thread_index = thread_id - 1;
i32 cancel_lock = group->cancel_lock0 + thread_index;
i32 cancel_cv = group->cancel_cv0 + thread_index;
Thread_Context *thread = group->threads + thread_index;
system_acquire_lock(cancel_lock);
thread->cancel = 1;
system_release_lock(FRAME_LOCK);
do{
system_wait_cv(cancel_lock, cancel_cv);
}while (thread->cancel == 1);
system_acquire_lock(FRAME_LOCK);
system_release_lock(cancel_lock);
}
}
}
internal
Sys_Check_Cancel_Sig(system_check_cancel){
b32 result = 0;
Thread_Group *group = linuxvars.groups + thread->group_id;
i32 thread_index = thread->id - 1;
i32 cancel_lock = group->cancel_lock0 + thread_index;
system_acquire_lock(cancel_lock);
if (thread->cancel){
result = 1;
}
system_release_lock(cancel_lock);
return(result);
}
internal
Sys_Grow_Thread_Memory_Sig(system_grow_thread_memory){
void *old_data;
i32 old_size, new_size;
system_acquire_lock(CANCEL_LOCK0 + memory->id - 1);
old_data = memory->data;
old_size = memory->size;
new_size = l_round_up_i32(memory->size*2, KB(4));
memory->data = system_memory_allocate(new_size);
memory->size = new_size;
if (old_data){
memcpy(memory->data, old_data, old_size);
system_memory_free(old_data, old_size);
}
system_release_lock(CANCEL_LOCK0 + memory->id - 1);
}
// //
// Debug // Debug
// //

View File

@ -22,53 +22,17 @@
#include <pthread.h> #include <pthread.h>
#include <semaphore.h> #include <semaphore.h>
#include <signal.h> #include <signal.h>
#include <xmmintrin.h>
#if defined(USE_LOG) #if defined(USE_LOG)
# include <stdio.h> # include <stdio.h>
#endif #endif
struct Thread_Context{
u32 job_id;
b32 running;
b32 cancel;
Work_Queue *queue;
u32 id;
u32 group_id;
pthread_t handle;
};
struct Thread_Group{
Thread_Context *threads;
i32 count;
Unbounded_Work_Queue queue;
i32 cancel_lock0;
i32 cancel_cv0;
};
struct Unix_Vars{ struct Unix_Vars{
b32 do_logging; b32 do_logging;
Thread_Memory *thread_memory;
Thread_Group groups[THREAD_GROUP_COUNT];
Work_Queue queues[THREAD_GROUP_COUNT];
pthread_mutex_t locks[LOCK_COUNT];
pthread_cond_t conds[8];
sem_t thread_semaphore;
}; };
static Unix_Vars unixvars; static Unix_Vars unixvars;
//
// Intrinsics
//
#define InterlockedCompareExchange(dest, ex, comp) \
__sync_val_compare_and_swap((dest), (comp), (ex))
// //
// 4ed Path // 4ed Path
// //
@ -298,24 +262,19 @@ Sys_Get_Canonical_Sig(system_get_canonical){
if(read_p == filename || read_p[0] == '/'){ if(read_p == filename || read_p[0] == '/'){
if(read_p[1] == '/'){ if(read_p[1] == '/'){
++read_p; ++read_p;
} } else if(read_p[1] == '.'){
else if (read_p[1] == '.'){
if(read_p[2] == '/' || !read_p[2]){ if(read_p[2] == '/' || !read_p[2]){
read_p += 2; read_p += 2;
} } else if(read_p[2] == '.' && (read_p[3] == '/' || !read_p[3])){
else if (read_p[2] == '.' && (read_p[3] == '/' || !read_p[3])){
while(write_p > path && *--write_p != '/'); while(write_p > path && *--write_p != '/');
read_p += 3; read_p += 3;
} } else {
else{
*write_p++ = *read_p++; *write_p++ = *read_p++;
} }
} } else {
else{
*write_p++ = *read_p++; *write_p++ = *read_p++;
} }
} } else {
else{
*write_p++ = *read_p++; *write_p++ = *read_p++;
} }
} }
@ -323,8 +282,7 @@ Sys_Get_Canonical_Sig(system_get_canonical){
if(max >= (write_p - path)){ if(max >= (write_p - path)){
memcpy(buffer, path, write_p - path); memcpy(buffer, path, write_p - path);
} } else {
else{
write_p = path; write_p = path;
} }
@ -504,314 +462,5 @@ Sys_Now_Time_Sig(system_now_time){
return(result); return(result);
} }
//
// Threads
//
internal
Sys_Acquire_Lock_Sig(system_acquire_lock){
pthread_mutex_lock(unixvars.locks + id);
}
internal
Sys_Release_Lock_Sig(system_release_lock){
pthread_mutex_unlock(unixvars.locks + id);
}
internal void
system_wait_cv(i32 lock_id, i32 cv_id){
pthread_cond_wait(unixvars.conds + cv_id, unixvars.locks + lock_id);
}
internal void
system_signal_cv(i32 lock_id, i32 cv_id){
pthread_cond_signal(unixvars.conds + cv_id);
}
internal void*
JobThreadProc(void* lpParameter){
Thread_Context *thread = (Thread_Context*)lpParameter;
Work_Queue *queue = unixvars.queues + thread->group_id;
Thread_Group *group = unixvars.groups + thread->group_id;
i32 thread_index = thread->id - 1;
i32 cancel_lock = group->cancel_lock0 + thread_index;
i32 cancel_cv = group->cancel_cv0 + thread_index;
Thread_Memory *thread_memory = unixvars.thread_memory + thread_index;
if (thread_memory->size == 0){
i32 new_size = KB(64);
thread_memory->data = system_memory_allocate(new_size);
thread_memory->size = new_size;
}
for (;;){
u32 read_index = queue->read_position;
u32 write_index = queue->write_position;
if (read_index != write_index){
// NOTE(allen): Previously I was wrapping by the job wrap then
// wrapping by the queue wrap. That was super stupid what was that?
// Now it just wraps by the queue wrap.
u32 next_read_index = (read_index + 1) % QUEUE_WRAP;
u32 safe_read_index = InterlockedCompareExchange(&queue->read_position, next_read_index, read_index);
if (safe_read_index == read_index){
Full_Job_Data *full_job = queue->jobs + safe_read_index;
// NOTE(allen): This is interlocked so that it plays nice
// with the cancel job routine, which may try to cancel this job
// at the same time that we try to run it
i32 safe_running_thread =
InterlockedCompareExchange(&full_job->running_thread,
thread->id, THREAD_NOT_ASSIGNED);
if (safe_running_thread == THREAD_NOT_ASSIGNED){
thread->job_id = full_job->id;
thread->running = 1;
full_job->job.callback(&linuxvars.system,
thread, thread_memory, full_job->job.data);
LinuxScheduleStep();
//full_job->running_thread = 0;
thread->running = 0;
system_acquire_lock(cancel_lock);
if (thread->cancel){
thread->cancel = 0;
system_signal_cv(cancel_lock, cancel_cv);
}
system_release_lock(cancel_lock);
}
}
}
else{
sem_wait(LinuxHandleToSem(queue->semaphore));
}
}
}
internal void
initialize_unbounded_queue(Unbounded_Work_Queue *source_queue){
i32 max = 512;
source_queue->jobs = (Full_Job_Data*)system_memory_allocate(max*sizeof(Full_Job_Data));
source_queue->count = 0;
source_queue->max = max;
source_queue->skip = 0;
}
inline i32
get_work_queue_available_space(i32 write, i32 read){
// NOTE(allen): The only time that queue->write_position == queue->read_position
// is allowed is when the queue is empty. Thus if
// queue->write_position+1 == queue->read_position the available space is zero.
// So these computations both end up leaving one slot unused. The only way I can
// think to easily eliminate this is to have read and write wrap at twice the size
// of the underlying array but modulo their values into the array then if write
// has caught up with read it still will not be equal... but lots of modulos... ehh.
i32 available_space = 0;
if (write >= read){
available_space = QUEUE_WRAP - (write - read) - 1;
}
else{
available_space = (read - write) - 1;
}
return(available_space);
}
#define UNBOUNDED_SKIP_MAX 128
internal void
flush_to_direct_queue(Unbounded_Work_Queue *source_queue, Work_Queue *queue, i32 thread_count){
// NOTE(allen): It is understood that read_position may be changed by other
// threads but it will only make more space in the queue if it is changed.
// Meanwhile write_position should not ever be changed by anything but the
// main thread in this system, so it will not be interlocked.
u32 read_position = queue->read_position;
u32 write_position = queue->write_position;
u32 available_space = get_work_queue_available_space(write_position, read_position);
u32 available_jobs = source_queue->count - source_queue->skip;
u32 writable_count = Min(available_space, available_jobs);
if (writable_count > 0){
u32 count1 = writable_count;
if (count1+write_position > QUEUE_WRAP){
count1 = QUEUE_WRAP - write_position;
}
u32 count2 = writable_count - count1;
Full_Job_Data *job_src1 = source_queue->jobs + source_queue->skip;
Full_Job_Data *job_src2 = job_src1 + count1;
Full_Job_Data *job_dst1 = queue->jobs + write_position;
Full_Job_Data *job_dst2 = queue->jobs;
Assert((job_src1->id % QUEUE_WRAP) == write_position);
memcpy(job_dst1, job_src1, sizeof(Full_Job_Data)*count1);
memcpy(job_dst2, job_src2, sizeof(Full_Job_Data)*count2);
queue->write_position = (write_position + writable_count) % QUEUE_WRAP;
source_queue->skip += writable_count;
if (source_queue->skip == source_queue->count){
source_queue->skip = source_queue->count = 0;
}
else if (source_queue->skip > UNBOUNDED_SKIP_MAX){
u32 left_over = source_queue->count - source_queue->skip;
memmove(source_queue->jobs, source_queue->jobs + source_queue->skip,
sizeof(Full_Job_Data)*left_over);
source_queue->count = left_over;
source_queue->skip = 0;
}
}
i32 semaphore_release_count = writable_count;
if (semaphore_release_count > thread_count){
semaphore_release_count = thread_count;
}
// NOTE(allen): platform dependent portion...
for (i32 i = 0; i < semaphore_release_count; ++i){
sem_post(LinuxHandleToSem(queue->semaphore));
}
}
internal void
flush_thread_group(i32 group_id){
Thread_Group *group = linuxvars.groups + group_id;
Work_Queue *queue = linuxvars.queues + group_id;
Unbounded_Work_Queue *source_queue = &group->queue;
flush_to_direct_queue(source_queue, queue, group->count);
}
// Note(allen): post_job puts the job on the unbounded queue.
// The unbounded queue is entirely managed by the main thread.
// The thread safe queue is bounded in size so the unbounded
// queue is periodically flushed into the direct work queue.
internal
Sys_Post_Job_Sig(system_post_job){
Thread_Group *group = linuxvars.groups + group_id;
Unbounded_Work_Queue *queue = &group->queue;
u32 result = queue->next_job_id++;
while (queue->count >= queue->max){
i32 new_max = queue->max*2;
u32 job_size = sizeof(Full_Job_Data);
Full_Job_Data *new_jobs = (Full_Job_Data*)system_memory_allocate(new_max*job_size);
memcpy(new_jobs, queue->jobs, queue->count);
system_memory_free(queue->jobs, queue->max*job_size);
queue->jobs = new_jobs;
queue->max = new_max;
}
Full_Job_Data full_job;
full_job.job = job;
full_job.running_thread = THREAD_NOT_ASSIGNED;
full_job.id = result;
queue->jobs[queue->count++] = full_job;
Work_Queue *direct_queue = linuxvars.queues + group_id;
flush_to_direct_queue(queue, direct_queue, group->count);
return(result);
}
internal
Sys_Cancel_Job_Sig(system_cancel_job){
Thread_Group *group = linuxvars.groups + group_id;
Unbounded_Work_Queue *source_queue = &group->queue;
b32 handled_in_unbounded = false;
if (source_queue->skip < source_queue->count){
Full_Job_Data *first_job = source_queue->jobs + source_queue->skip;
if (first_job->id <= job_id){
u32 index = source_queue->skip + (job_id - first_job->id);
Full_Job_Data *job = source_queue->jobs + index;
job->running_thread = 0;
handled_in_unbounded = true;
}
}
if (!handled_in_unbounded){
Work_Queue *queue = linuxvars.queues + group_id;
Full_Job_Data *job = queue->jobs + (job_id % QUEUE_WRAP);
Assert(job->id == job_id);
u32 thread_id =
InterlockedCompareExchange(&job->running_thread,
0, THREAD_NOT_ASSIGNED);
if (thread_id != THREAD_NOT_ASSIGNED && thread_id != 0){
i32 thread_index = thread_id - 1;
i32 cancel_lock = group->cancel_lock0 + thread_index;
i32 cancel_cv = group->cancel_cv0 + thread_index;
Thread_Context *thread = group->threads + thread_index;
system_acquire_lock(cancel_lock);
thread->cancel = 1;
system_release_lock(FRAME_LOCK);
do{
system_wait_cv(cancel_lock, cancel_cv);
}while (thread->cancel == 1);
system_acquire_lock(FRAME_LOCK);
system_release_lock(cancel_lock);
}
}
}
internal
Sys_Check_Cancel_Sig(system_check_cancel){
b32 result = 0;
Thread_Group *group = linuxvars.groups + thread->group_id;
i32 thread_index = thread->id - 1;
i32 cancel_lock = group->cancel_lock0 + thread_index;
system_acquire_lock(cancel_lock);
if (thread->cancel){
result = 1;
}
system_release_lock(cancel_lock);
return(result);
}
internal
Sys_Grow_Thread_Memory_Sig(system_grow_thread_memory){
void *old_data;
i32 old_size, new_size;
system_acquire_lock(CANCEL_LOCK0 + memory->id - 1);
old_data = memory->data;
old_size = memory->size;
new_size = l_round_up_i32(memory->size*2, KB(4));
memory->data = system_memory_allocate(new_size);
memory->size = new_size;
if (old_data){
memcpy(memory->data, old_data, old_size);
system_memory_free(old_data, old_size);
}
system_release_lock(CANCEL_LOCK0 + memory->id - 1);
}
// BOTTOM // BOTTOM