ntdll: Start of pooling support for RtlQueueWorkItem.
Start of pooling support for RtlQueueWorkItem. The algorithm implmented is very simple - if there are no free threads, create a new one.
This commit is contained in:
parent
6f2b0fbf94
commit
f40f81b6d5
|
@ -18,7 +18,11 @@
|
|||
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
*/
|
||||
|
||||
#include "config.h"
|
||||
#include "wine/port.h"
|
||||
|
||||
#include <stdarg.h>
|
||||
#include <limits.h>
|
||||
|
||||
#define NONAMELESSUNION
|
||||
#include "ntstatus.h"
|
||||
|
@ -26,35 +30,123 @@
|
|||
#include "winternl.h"
|
||||
|
||||
#include "wine/debug.h"
|
||||
#include "wine/list.h"
|
||||
|
||||
#include "ntdll_misc.h"
|
||||
|
||||
WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
|
||||
|
||||
#define WORKER_TIMEOUT 30000 /* 30 seconds */
|
||||
|
||||
static LONG num_workers;
|
||||
static LONG num_work_items;
|
||||
static LONG num_busy_workers;
|
||||
|
||||
static struct list work_item_list = LIST_INIT(work_item_list);
|
||||
static HANDLE work_item_event;
|
||||
|
||||
static CRITICAL_SECTION threadpool_cs;
|
||||
static CRITICAL_SECTION_DEBUG critsect_debug =
|
||||
{
|
||||
0, 0, &threadpool_cs,
|
||||
{ &critsect_debug.ProcessLocksList, &critsect_debug.ProcessLocksList },
|
||||
0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_cs") }
|
||||
};
|
||||
static CRITICAL_SECTION threadpool_cs = { &critsect_debug, -1, 0, 0, 0, 0 };
|
||||
|
||||
struct work_item
|
||||
{
|
||||
struct list entry;
|
||||
PRTL_WORK_ITEM_ROUTINE function;
|
||||
PVOID context;
|
||||
};
|
||||
|
||||
inline static LONG interlocked_inc( PLONG dest )
|
||||
{
|
||||
return interlocked_xchg_add( (int *)dest, 1 ) + 1;
|
||||
}
|
||||
|
||||
inline static LONG interlocked_dec( PLONG dest )
|
||||
{
|
||||
return interlocked_xchg_add( (int *)dest, -1 ) - 1;
|
||||
}
|
||||
|
||||
static void WINAPI worker_thread_proc(void * param)
|
||||
{
|
||||
struct work_item *work_item_ptr = (struct work_item *)param;
|
||||
struct work_item work_item;
|
||||
interlocked_inc(&num_workers);
|
||||
|
||||
/* free the work item memory sooner to reduce memory usage */
|
||||
work_item = *work_item_ptr;
|
||||
RtlFreeHeap(GetProcessHeap(), 0, work_item_ptr);
|
||||
while (TRUE)
|
||||
{
|
||||
if (num_work_items > 0)
|
||||
{
|
||||
struct list *item;
|
||||
RtlEnterCriticalSection(&threadpool_cs);
|
||||
item = list_head(&work_item_list);
|
||||
if (item)
|
||||
{
|
||||
struct work_item *work_item_ptr = LIST_ENTRY(item, struct work_item, entry);
|
||||
struct work_item work_item;
|
||||
list_remove(&work_item_ptr->entry);
|
||||
interlocked_dec(&num_work_items);
|
||||
|
||||
TRACE("executing %p(%p)\n", work_item.function, work_item.context);
|
||||
RtlLeaveCriticalSection(&threadpool_cs);
|
||||
|
||||
/* do the work */
|
||||
work_item.function(work_item.context);
|
||||
work_item = *work_item_ptr;
|
||||
RtlFreeHeap(GetProcessHeap(), 0, work_item_ptr);
|
||||
|
||||
TRACE("executing %p(%p)\n", work_item.function, work_item.context);
|
||||
|
||||
interlocked_inc(&num_busy_workers);
|
||||
|
||||
/* do the work */
|
||||
work_item.function(work_item.context);
|
||||
|
||||
interlocked_dec(&num_busy_workers);
|
||||
}
|
||||
else
|
||||
RtlLeaveCriticalSection(&threadpool_cs);
|
||||
}
|
||||
else
|
||||
{
|
||||
NTSTATUS status;
|
||||
LARGE_INTEGER timeout;
|
||||
timeout.QuadPart = -(WORKER_TIMEOUT * (ULONGLONG)10000);
|
||||
status = NtWaitForSingleObject(work_item_event, FALSE, &timeout);
|
||||
if (status != STATUS_WAIT_0)
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
interlocked_dec(&num_workers);
|
||||
|
||||
RtlExitUserThread(0);
|
||||
|
||||
/* never reached */
|
||||
}
|
||||
|
||||
static NTSTATUS add_work_item_to_queue(struct work_item *work_item)
|
||||
{
|
||||
NTSTATUS status;
|
||||
|
||||
RtlEnterCriticalSection(&threadpool_cs);
|
||||
list_add_tail(&work_item_list, &work_item->entry);
|
||||
num_work_items++;
|
||||
RtlLeaveCriticalSection(&threadpool_cs);
|
||||
|
||||
if (!work_item_event)
|
||||
{
|
||||
HANDLE sem;
|
||||
status = NtCreateSemaphore(&sem, SEMAPHORE_ALL_ACCESS, NULL, 1, LONG_MAX);
|
||||
if (interlocked_cmpxchg_ptr( (PVOID *)&work_item_event, (PVOID)sem, 0 ))
|
||||
NtClose(sem); /* somebody beat us to it */
|
||||
}
|
||||
else
|
||||
status = NtReleaseSemaphore(work_item_event, 1, NULL);
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
/***********************************************************************
|
||||
* RtlQueueWorkItem (NTDLL.@)
|
||||
*
|
||||
|
@ -92,16 +184,35 @@ NTSTATUS WINAPI RtlQueueWorkItem(PRTL_WORK_ITEM_ROUTINE Function, PVOID Context,
|
|||
if (Flags != WT_EXECUTEDEFAULT)
|
||||
FIXME("Flags 0x%lx not supported\n", Flags);
|
||||
|
||||
/* FIXME: very crude implementation that doesn't support pooling at all */
|
||||
status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE,
|
||||
NULL, 0, 0,
|
||||
worker_thread_proc, work_item, &thread, NULL );
|
||||
status = add_work_item_to_queue(work_item);
|
||||
|
||||
if ((status == STATUS_SUCCESS) &&
|
||||
((num_workers == 0) || (num_workers == num_busy_workers)))
|
||||
{
|
||||
status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE,
|
||||
NULL, 0, 0,
|
||||
worker_thread_proc, NULL, &thread, NULL );
|
||||
if (status == STATUS_SUCCESS)
|
||||
NtClose( thread );
|
||||
|
||||
/* NOTE: we don't care if we couldn't create the thread if there is at
|
||||
* least one other available to process the request */
|
||||
if ((num_workers > 0) && (status != STATUS_SUCCESS))
|
||||
status = STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
if (status != STATUS_SUCCESS)
|
||||
{
|
||||
RtlEnterCriticalSection(&threadpool_cs);
|
||||
|
||||
interlocked_dec(&num_work_items);
|
||||
list_remove(&work_item->entry);
|
||||
RtlFreeHeap(GetProcessHeap(), 0, work_item);
|
||||
|
||||
RtlLeaveCriticalSection(&threadpool_cs);
|
||||
|
||||
return status;
|
||||
}
|
||||
NtClose( thread );
|
||||
|
||||
return STATUS_SUCCESS;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue