diff --git a/dlls/ntdll/threadpool.c b/dlls/ntdll/threadpool.c index e2fc6a5989a..f66f6abe683 100644 --- a/dlls/ntdll/threadpool.c +++ b/dlls/ntdll/threadpool.c @@ -39,12 +39,13 @@ WINE_DEFAULT_DEBUG_CHANNEL(threadpool); #define WORKER_TIMEOUT 30000 /* 30 seconds */ -static LONG num_workers; -static LONG num_work_items; -static LONG num_busy_workers; - +/* threadpool_cs must be held while modifying the following elements */ static struct list work_item_list = LIST_INIT(work_item_list); -static HANDLE work_item_event; +static LONG num_workers; +static LONG num_busy_workers; +static LONG num_items_processed; + +static RTL_CONDITION_VARIABLE threadpool_cond = RTL_CONDITION_VARIABLE_INIT; static RTL_CRITICAL_SECTION threadpool_cs; static RTL_CRITICAL_SECTION_DEBUG critsect_debug = @@ -84,80 +85,44 @@ static inline LONG interlocked_dec( PLONG dest ) static void WINAPI worker_thread_proc(void * param) { - interlocked_inc(&num_workers); + struct list *item; + struct work_item *work_item_ptr, work_item; + LARGE_INTEGER timeout; + timeout.QuadPart = -(WORKER_TIMEOUT * (ULONGLONG)10000); - /* free the work item memory sooner to reduce memory usage */ - while (TRUE) + RtlEnterCriticalSection( &threadpool_cs ); + num_workers++; + + for (;;) { - if (num_work_items > 0) + if ((item = list_head( &work_item_list ))) { - struct list *item; - RtlEnterCriticalSection(&threadpool_cs); - item = list_head(&work_item_list); - if (item) - { - struct work_item *work_item_ptr = LIST_ENTRY(item, struct work_item, entry); - struct work_item work_item; - list_remove(&work_item_ptr->entry); - interlocked_dec(&num_work_items); + work_item_ptr = LIST_ENTRY( item, struct work_item, entry ); + list_remove( &work_item_ptr->entry ); + num_busy_workers++; + num_items_processed++; + RtlLeaveCriticalSection( &threadpool_cs ); - RtlLeaveCriticalSection(&threadpool_cs); + /* copy item to stack and do the work */ + work_item = *work_item_ptr; + RtlFreeHeap( GetProcessHeap(), 0, work_item_ptr ); + TRACE("executing %p(%p)\n", work_item.function, work_item.context); + work_item.function( work_item.context ); - work_item = *work_item_ptr; - RtlFreeHeap(GetProcessHeap(), 0, work_item_ptr); - - TRACE("executing %p(%p)\n", work_item.function, work_item.context); - - interlocked_inc(&num_busy_workers); - - /* do the work */ - work_item.function(work_item.context); - - interlocked_dec(&num_busy_workers); - } - else - RtlLeaveCriticalSection(&threadpool_cs); - } - else - { - NTSTATUS status; - LARGE_INTEGER timeout; - timeout.QuadPart = -(WORKER_TIMEOUT * (ULONGLONG)10000); - status = NtWaitForSingleObject(work_item_event, FALSE, &timeout); - if (status != STATUS_WAIT_0) - break; + RtlEnterCriticalSection( &threadpool_cs ); + num_busy_workers--; } + else if (RtlSleepConditionVariableCS( &threadpool_cond, &threadpool_cs, &timeout ) != STATUS_SUCCESS) + break; } - interlocked_dec(&num_workers); - - RtlExitUserThread(0); + num_workers--; + RtlLeaveCriticalSection( &threadpool_cs ); + RtlExitUserThread( 0 ); /* never reached */ } -static NTSTATUS add_work_item_to_queue(struct work_item *work_item) -{ - NTSTATUS status; - - RtlEnterCriticalSection(&threadpool_cs); - list_add_tail(&work_item_list, &work_item->entry); - num_work_items++; - RtlLeaveCriticalSection(&threadpool_cs); - - if (!work_item_event) - { - HANDLE sem; - status = NtCreateSemaphore(&sem, SEMAPHORE_ALL_ACCESS, NULL, 1, INT_MAX); - if (interlocked_cmpxchg_ptr( &work_item_event, sem, 0 )) - NtClose(sem); /* somebody beat us to it */ - } - else - status = NtReleaseSemaphore(work_item_event, 1, NULL); - - return status; -} - /*********************************************************************** * RtlQueueWorkItem (NTDLL.@) * @@ -184,6 +149,7 @@ NTSTATUS WINAPI RtlQueueWorkItem(PRTL_WORK_ITEM_ROUTINE Function, PVOID Context, { HANDLE thread; NTSTATUS status; + LONG items_processed; struct work_item *work_item = RtlAllocateHeap(GetProcessHeap(), 0, sizeof(struct work_item)); if (!work_item) @@ -195,39 +161,40 @@ NTSTATUS WINAPI RtlQueueWorkItem(PRTL_WORK_ITEM_ROUTINE Function, PVOID Context, if (Flags & ~WT_EXECUTELONGFUNCTION) FIXME("Flags 0x%x not supported\n", Flags); - status = add_work_item_to_queue(work_item); + RtlEnterCriticalSection( &threadpool_cs ); + list_add_tail( &work_item_list, &work_item->entry ); + status = (num_workers > num_busy_workers) ? STATUS_SUCCESS : STATUS_UNSUCCESSFUL; + items_processed = num_items_processed; + RtlLeaveCriticalSection( &threadpool_cs ); /* FIXME: tune this algorithm to not be as aggressive with creating threads * if WT_EXECUTELONGFUNCTION isn't specified */ - if ((status == STATUS_SUCCESS) && - ((num_workers == 0) || (num_workers == num_busy_workers))) + if (status == STATUS_SUCCESS) + RtlWakeConditionVariable( &threadpool_cond ); + else { - status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, - NULL, 0, 0, - worker_thread_proc, NULL, &thread, NULL ); - if (status == STATUS_SUCCESS) - NtClose( thread ); + status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0, + worker_thread_proc, NULL, &thread, NULL ); /* NOTE: we don't care if we couldn't create the thread if there is at * least one other available to process the request */ - if ((num_workers > 0) && (status != STATUS_SUCCESS)) - status = STATUS_SUCCESS; + if (status == STATUS_SUCCESS) + NtClose( thread ); + else + { + RtlEnterCriticalSection( &threadpool_cs ); + if (num_workers > 0 || num_items_processed != items_processed) + status = STATUS_SUCCESS; + else + list_remove( &work_item->entry ); + RtlLeaveCriticalSection( &threadpool_cs ); + + if (status != STATUS_SUCCESS) + RtlFreeHeap( GetProcessHeap(), 0, work_item ); + } } - if (status != STATUS_SUCCESS) - { - RtlEnterCriticalSection(&threadpool_cs); - - interlocked_dec(&num_work_items); - list_remove(&work_item->entry); - RtlFreeHeap(GetProcessHeap(), 0, work_item); - - RtlLeaveCriticalSection(&threadpool_cs); - - return status; - } - - return STATUS_SUCCESS; + return status; } /***********************************************************************