vcomp: Add multithreaded implementation for _vcomp_fork.

This commit is contained in:
Sebastian Lackner 2015-07-16 15:00:20 +02:00 committed by Alexandre Julliard
parent ad1077f94b
commit ee34265f8f
1 changed files with 172 additions and 8 deletions

View File

@ -4,6 +4,7 @@
*
* Copyright 2011 Austin English
* Copyright 2012 Dan Kegel
* Copyright 2015 Sebastian Lackner
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
@ -23,22 +24,52 @@
#include "config.h"
#include <stdarg.h>
#include <assert.h>
#include "windef.h"
#include "winbase.h"
#include "wine/debug.h"
#include "wine/list.h"
WINE_DEFAULT_DEBUG_CHANNEL(vcomp);
static struct list vcomp_idle_threads = LIST_INIT(vcomp_idle_threads);
static DWORD vcomp_context_tls = TLS_OUT_OF_INDEXES;
static HMODULE vcomp_module;
static int vcomp_max_threads;
static int vcomp_num_threads;
static BOOL vcomp_nested_fork = FALSE;
static RTL_CRITICAL_SECTION vcomp_section;
static RTL_CRITICAL_SECTION_DEBUG critsect_debug =
{
0, 0, &vcomp_section,
{ &critsect_debug.ProcessLocksList, &critsect_debug.ProcessLocksList },
0, 0, { (DWORD_PTR)(__FILE__ ": vcomp_section") }
};
static RTL_CRITICAL_SECTION vcomp_section = { &critsect_debug, -1, 0, 0, 0, 0 };
struct vcomp_thread_data
{
struct vcomp_team_data *team;
int thread_num;
int fork_threads;
/* only used for concurrent tasks */
struct list entry;
CONDITION_VARIABLE cond;
};
struct vcomp_team_data
{
CONDITION_VARIABLE cond;
int num_threads;
int finished_threads;
/* callback arguments */
int nargs;
void *wrapper;
__ms_va_list valist;
};
#if defined(__i386__)
@ -145,6 +176,7 @@ static struct vcomp_thread_data *vcomp_init_thread_data(void)
ExitProcess(1);
}
thread_data->team = NULL;
thread_data->thread_num = 0;
thread_data->fork_threads = 0;
@ -187,8 +219,9 @@ int CDECL omp_get_num_procs(void)
int CDECL omp_get_num_threads(void)
{
TRACE("stub\n");
return 1;
struct vcomp_team_data *team_data = vcomp_init_thread_data()->team;
TRACE("()\n");
return team_data ? team_data->num_threads : 1;
}
int CDECL omp_get_thread_num(void)
@ -244,15 +277,145 @@ void CDECL _vcomp_single_end(void)
TRACE("stub\n");
}
static DWORD WINAPI _vcomp_fork_worker(void *param)
{
struct vcomp_thread_data *thread_data = param;
vcomp_set_thread_data(thread_data);
TRACE("starting worker thread for %p\n", thread_data);
EnterCriticalSection(&vcomp_section);
for (;;)
{
struct vcomp_team_data *team = thread_data->team;
if (team != NULL)
{
LeaveCriticalSection(&vcomp_section);
_vcomp_fork_call_wrapper(team->wrapper, team->nargs, team->valist);
EnterCriticalSection(&vcomp_section);
thread_data->team = NULL;
list_remove(&thread_data->entry);
list_add_tail(&vcomp_idle_threads, &thread_data->entry);
if (++team->finished_threads >= team->num_threads)
WakeAllConditionVariable(&team->cond);
}
if (!SleepConditionVariableCS(&thread_data->cond, &vcomp_section, 5000) &&
GetLastError() == ERROR_TIMEOUT && !thread_data->team)
{
break;
}
}
list_remove(&thread_data->entry);
LeaveCriticalSection(&vcomp_section);
TRACE("terminating worker thread for %p\n", thread_data);
HeapFree(GetProcessHeap(), 0, thread_data);
vcomp_set_thread_data(NULL);
FreeLibraryAndExitThread(vcomp_module, 0);
return 0;
}
void WINAPIV _vcomp_fork(BOOL ifval, int nargs, void *wrapper, ...)
{
__ms_va_list valist;
struct vcomp_thread_data *prev_thread_data = vcomp_init_thread_data();
struct vcomp_thread_data thread_data;
struct vcomp_team_data team_data;
int num_threads;
TRACE("(%d, %d, %p, ...)\n", ifval, nargs, wrapper);
__ms_va_start(valist, wrapper);
_vcomp_fork_call_wrapper(wrapper, nargs, valist);
__ms_va_end(valist);
if (!ifval)
num_threads = 1;
else if (prev_thread_data->team && !vcomp_nested_fork)
num_threads = 1;
else if (prev_thread_data->fork_threads)
num_threads = prev_thread_data->fork_threads;
else
num_threads = vcomp_num_threads;
InitializeConditionVariable(&team_data.cond);
team_data.num_threads = 1;
team_data.finished_threads = 0;
team_data.nargs = nargs;
team_data.wrapper = wrapper;
__ms_va_start(team_data.valist, wrapper);
thread_data.team = &team_data;
thread_data.thread_num = 0;
thread_data.fork_threads = 0;
list_init(&thread_data.entry);
InitializeConditionVariable(&thread_data.cond);
if (num_threads > 1)
{
struct list *ptr;
EnterCriticalSection(&vcomp_section);
/* reuse existing threads (if any) */
while (team_data.num_threads < num_threads && (ptr = list_head(&vcomp_idle_threads)))
{
struct vcomp_thread_data *data = LIST_ENTRY(ptr, struct vcomp_thread_data, entry);
data->team = &team_data;
data->thread_num = team_data.num_threads++;
data->fork_threads = 0;
list_remove(&data->entry);
list_add_tail(&thread_data.entry, &data->entry);
WakeAllConditionVariable(&data->cond);
}
/* spawn additional threads */
while (team_data.num_threads < num_threads)
{
struct vcomp_thread_data *data;
HMODULE module;
HANDLE thread;
data = HeapAlloc(GetProcessHeap(), 0, sizeof(*data));
if (!data) break;
data->team = &team_data;
data->thread_num = team_data.num_threads;
data->fork_threads = 0;
InitializeConditionVariable(&data->cond);
thread = CreateThread(NULL, 0, _vcomp_fork_worker, data, 0, NULL);
if (!thread)
{
HeapFree(GetProcessHeap(), 0, data);
break;
}
GetModuleHandleExW(GET_MODULE_HANDLE_EX_FLAG_FROM_ADDRESS,
(const WCHAR *)vcomp_module, &module);
team_data.num_threads++;
list_add_tail(&thread_data.entry, &data->entry);
CloseHandle(thread);
}
LeaveCriticalSection(&vcomp_section);
}
vcomp_set_thread_data(&thread_data);
_vcomp_fork_call_wrapper(team_data.wrapper, team_data.nargs, team_data.valist);
vcomp_set_thread_data(prev_thread_data);
prev_thread_data->fork_threads = 0;
if (team_data.num_threads > 1)
{
EnterCriticalSection(&vcomp_section);
team_data.finished_threads++;
while (team_data.finished_threads < team_data.num_threads)
SleepConditionVariableCS(&team_data.cond, &vcomp_section, INFINITE);
LeaveCriticalSection(&vcomp_section);
assert(list_empty(&thread_data.entry));
}
__ms_va_end(team_data.valist);
}
BOOL WINAPI DllMain(HINSTANCE instance, DWORD reason, LPVOID reserved)
@ -275,6 +438,7 @@ BOOL WINAPI DllMain(HINSTANCE instance, DWORD reason, LPVOID reserved)
}
GetSystemInfo(&sysinfo);
vcomp_module = instance;
vcomp_max_threads = sysinfo.dwNumberOfProcessors;
vcomp_num_threads = sysinfo.dwNumberOfProcessors;
break;