ntdll: Reimplement RtlQueueWorkItem on top of new threadpool API.
This commit is contained in:
parent
c774a8c3ae
commit
9562e81810
|
@ -42,44 +42,28 @@ WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
|
|||
* Old thread pooling API
|
||||
*/
|
||||
|
||||
#define OLD_WORKER_TIMEOUT 30000 /* 30 seconds */
|
||||
struct rtl_work_item
|
||||
{
|
||||
PRTL_WORK_ITEM_ROUTINE function;
|
||||
PVOID context;
|
||||
};
|
||||
|
||||
#define EXPIRE_NEVER (~(ULONGLONG)0)
|
||||
#define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */
|
||||
|
||||
static RTL_CRITICAL_SECTION_DEBUG critsect_debug;
|
||||
static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug;
|
||||
|
||||
static struct
|
||||
{
|
||||
/* threadpool_cs must be held while modifying the following four elements */
|
||||
struct list work_item_list;
|
||||
LONG num_workers;
|
||||
LONG num_busy_workers;
|
||||
LONG num_items_processed;
|
||||
RTL_CONDITION_VARIABLE threadpool_cond;
|
||||
RTL_CRITICAL_SECTION threadpool_cs;
|
||||
HANDLE compl_port;
|
||||
RTL_CRITICAL_SECTION threadpool_compl_cs;
|
||||
}
|
||||
old_threadpool =
|
||||
{
|
||||
LIST_INIT(old_threadpool.work_item_list), /* work_item_list */
|
||||
0, /* num_workers */
|
||||
0, /* num_busy_workers */
|
||||
0, /* num_items_processed */
|
||||
RTL_CONDITION_VARIABLE_INIT, /* threadpool_cond */
|
||||
{ &critsect_debug, -1, 0, 0, 0, 0 }, /* threadpool_cs */
|
||||
NULL, /* compl_port */
|
||||
{ &critsect_compl_debug, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
|
||||
};
|
||||
|
||||
static RTL_CRITICAL_SECTION_DEBUG critsect_debug =
|
||||
{
|
||||
0, 0, &old_threadpool.threadpool_cs,
|
||||
{ &critsect_debug.ProcessLocksList, &critsect_debug.ProcessLocksList },
|
||||
0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_cs") }
|
||||
};
|
||||
|
||||
static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug =
|
||||
{
|
||||
0, 0, &old_threadpool.threadpool_compl_cs,
|
||||
|
@ -87,13 +71,6 @@ static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug =
|
|||
0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
|
||||
};
|
||||
|
||||
struct work_item
|
||||
{
|
||||
struct list entry;
|
||||
PRTL_WORK_ITEM_ROUTINE function;
|
||||
PVOID context;
|
||||
};
|
||||
|
||||
struct wait_work_item
|
||||
{
|
||||
HANDLE Object;
|
||||
|
@ -364,47 +341,14 @@ static inline LONG interlocked_dec( PLONG dest )
|
|||
return interlocked_xchg_add( dest, -1 ) - 1;
|
||||
}
|
||||
|
||||
static void WINAPI worker_thread_proc(void * param)
|
||||
static void CALLBACK process_rtl_work_item( TP_CALLBACK_INSTANCE *instance, void *userdata )
|
||||
{
|
||||
struct list *item;
|
||||
struct work_item *work_item_ptr, work_item;
|
||||
LARGE_INTEGER timeout;
|
||||
timeout.QuadPart = -(OLD_WORKER_TIMEOUT * (ULONGLONG)10000);
|
||||
struct rtl_work_item *item = userdata;
|
||||
|
||||
RtlEnterCriticalSection( &old_threadpool.threadpool_cs );
|
||||
old_threadpool.num_workers++;
|
||||
TRACE("executing %p(%p)\n", item->function, item->context);
|
||||
item->function( item->context );
|
||||
|
||||
for (;;)
|
||||
{
|
||||
if ((item = list_head( &old_threadpool.work_item_list )))
|
||||
{
|
||||
work_item_ptr = LIST_ENTRY( item, struct work_item, entry );
|
||||
list_remove( &work_item_ptr->entry );
|
||||
old_threadpool.num_busy_workers++;
|
||||
old_threadpool.num_items_processed++;
|
||||
RtlLeaveCriticalSection( &old_threadpool.threadpool_cs );
|
||||
|
||||
/* copy item to stack and do the work */
|
||||
work_item = *work_item_ptr;
|
||||
RtlFreeHeap( GetProcessHeap(), 0, work_item_ptr );
|
||||
TRACE("executing %p(%p)\n", work_item.function, work_item.context);
|
||||
work_item.function( work_item.context );
|
||||
|
||||
RtlEnterCriticalSection( &old_threadpool.threadpool_cs );
|
||||
old_threadpool.num_busy_workers--;
|
||||
}
|
||||
else if (RtlSleepConditionVariableCS( &old_threadpool.threadpool_cond,
|
||||
&old_threadpool.threadpool_cs, &timeout ) != STATUS_SUCCESS)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
old_threadpool.num_workers--;
|
||||
RtlLeaveCriticalSection( &old_threadpool.threadpool_cs );
|
||||
RtlExitUserThread( 0 );
|
||||
|
||||
/* never reached */
|
||||
RtlFreeHeap( GetProcessHeap(), 0, item );
|
||||
}
|
||||
|
||||
/***********************************************************************
|
||||
|
@ -413,9 +357,9 @@ static void WINAPI worker_thread_proc(void * param)
|
|||
* Queues a work item into a thread in the thread pool.
|
||||
*
|
||||
* PARAMS
|
||||
* Function [I] Work function to execute.
|
||||
* Context [I] Context to pass to the work function when it is executed.
|
||||
* Flags [I] Flags. See notes.
|
||||
* function [I] Work function to execute.
|
||||
* context [I] Context to pass to the work function when it is executed.
|
||||
* flags [I] Flags. See notes.
|
||||
*
|
||||
* RETURNS
|
||||
* Success: STATUS_SUCCESS.
|
||||
|
@ -429,59 +373,26 @@ static void WINAPI worker_thread_proc(void * param)
|
|||
*|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
|
||||
*|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
|
||||
*/
|
||||
NTSTATUS WINAPI RtlQueueWorkItem(PRTL_WORK_ITEM_ROUTINE Function, PVOID Context, ULONG Flags)
|
||||
NTSTATUS WINAPI RtlQueueWorkItem( PRTL_WORK_ITEM_ROUTINE function, PVOID context, ULONG flags )
|
||||
{
|
||||
HANDLE thread;
|
||||
TP_CALLBACK_ENVIRON environment;
|
||||
struct rtl_work_item *item;
|
||||
NTSTATUS status;
|
||||
LONG items_processed;
|
||||
struct work_item *work_item = RtlAllocateHeap(GetProcessHeap(), 0, sizeof(struct work_item));
|
||||
|
||||
if (!work_item)
|
||||
item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item) );
|
||||
if (!item)
|
||||
return STATUS_NO_MEMORY;
|
||||
|
||||
work_item->function = Function;
|
||||
work_item->context = Context;
|
||||
memset( &environment, 0, sizeof(environment) );
|
||||
environment.Version = 1;
|
||||
environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
|
||||
environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
|
||||
|
||||
if (Flags & ~WT_EXECUTELONGFUNCTION)
|
||||
FIXME("Flags 0x%x not supported\n", Flags);
|
||||
|
||||
RtlEnterCriticalSection( &old_threadpool.threadpool_cs );
|
||||
list_add_tail( &old_threadpool.work_item_list, &work_item->entry );
|
||||
status = (old_threadpool.num_workers > old_threadpool.num_busy_workers) ?
|
||||
STATUS_SUCCESS : STATUS_UNSUCCESSFUL;
|
||||
items_processed = old_threadpool.num_items_processed;
|
||||
RtlLeaveCriticalSection( &old_threadpool.threadpool_cs );
|
||||
|
||||
/* FIXME: tune this algorithm to not be as aggressive with creating threads
|
||||
* if WT_EXECUTELONGFUNCTION isn't specified */
|
||||
if (status == STATUS_SUCCESS)
|
||||
RtlWakeConditionVariable( &old_threadpool.threadpool_cond );
|
||||
else
|
||||
{
|
||||
status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
|
||||
worker_thread_proc, NULL, &thread, NULL );
|
||||
|
||||
/* NOTE: we don't care if we couldn't create the thread if there is at
|
||||
* least one other available to process the request */
|
||||
if (status == STATUS_SUCCESS)
|
||||
NtClose( thread );
|
||||
else
|
||||
{
|
||||
RtlEnterCriticalSection( &old_threadpool.threadpool_cs );
|
||||
if (old_threadpool.num_workers > 0 ||
|
||||
old_threadpool.num_items_processed != items_processed)
|
||||
{
|
||||
status = STATUS_SUCCESS;
|
||||
}
|
||||
else
|
||||
list_remove( &work_item->entry );
|
||||
RtlLeaveCriticalSection( &old_threadpool.threadpool_cs );
|
||||
|
||||
if (status != STATUS_SUCCESS)
|
||||
RtlFreeHeap( GetProcessHeap(), 0, work_item );
|
||||
}
|
||||
}
|
||||
item->function = function;
|
||||
item->context = context;
|
||||
|
||||
status = TpSimpleTryPost( process_rtl_work_item, item, &environment );
|
||||
if (status) RtlFreeHeap( GetProcessHeap(), 0, item );
|
||||
return status;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue