diff --git a/dlls/ntdll/ntdll.spec b/dlls/ntdll/ntdll.spec index 270d7ff8d15..cd4e8539fe0 100644 --- a/dlls/ntdll/ntdll.spec +++ b/dlls/ntdll/ntdll.spec @@ -970,6 +970,9 @@ @ stdcall RtlxOemStringToUnicodeSize(ptr) RtlOemStringToUnicodeSize @ stdcall RtlxUnicodeStringToAnsiSize(ptr) RtlUnicodeStringToAnsiSize @ stdcall RtlxUnicodeStringToOemSize(ptr) RtlUnicodeStringToOemSize +@ stdcall TpAllocPool(ptr ptr) +@ stdcall TpReleasePool(ptr) +@ stdcall TpSimpleTryPost(ptr ptr ptr) @ stdcall -ret64 VerSetConditionMask(int64 long long) @ stdcall WinSqmIsOptedIn() @ stdcall ZwAcceptConnectPort(ptr long ptr long long ptr) NtAcceptConnectPort diff --git a/dlls/ntdll/tests/threadpool.c b/dlls/ntdll/tests/threadpool.c index 2e31b344a43..6f164e95c19 100644 --- a/dlls/ntdll/tests/threadpool.c +++ b/dlls/ntdll/tests/threadpool.c @@ -48,7 +48,7 @@ static BOOL init_threadpool(void) if (!pTpAllocPool) { - skip("Threadpool functions not supported, skipping tests\n"); + win_skip("Threadpool functions not supported, skipping tests\n"); return FALSE; } @@ -105,6 +105,7 @@ static void test_tp_simple(void) environment.Version = 9999; environment.Pool = pool; status = pTpSimpleTryPost(simple_cb, semaphore, &environment); + todo_wine ok(status == STATUS_INVALID_PARAMETER || broken(!status) /* Vista/2008 */, "TpSimpleTryPost unexpectedly returned status %x\n", status); if (!status) diff --git a/dlls/ntdll/threadpool.c b/dlls/ntdll/threadpool.c index 513c13d6ab7..d8dc929a5c5 100644 --- a/dlls/ntdll/threadpool.c +++ b/dlls/ntdll/threadpool.c @@ -2,6 +2,7 @@ * Thread pooling * * Copyright (c) 2006 Robert Shearman + * Copyright (c) 2014-2015 Sebastian Lackner * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public @@ -37,6 +38,10 @@ WINE_DEFAULT_DEBUG_CHANNEL(threadpool); +/* + * Old thread pooling API + */ + #define OLD_WORKER_TIMEOUT 30000 /* 30 seconds */ #define EXPIRE_NEVER (~(ULONGLONG)0) #define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */ @@ -127,6 +132,65 @@ struct timer_queue HANDLE thread; }; +/* + * Object-oriented thread pooling API + */ + +#define THREADPOOL_WORKER_TIMEOUT 5000 + +/* internal threadpool representation */ +struct threadpool +{ + LONG refcount; + LONG objcount; + BOOL shutdown; + CRITICAL_SECTION cs; + /* pool of work items, locked via .cs */ + struct list pool; + RTL_CONDITION_VARIABLE update_event; + /* information about worker threads, locked via .cs */ + int max_workers; + int min_workers; + int num_workers; + int num_busy_workers; +}; + +enum threadpool_objtype +{ + TP_OBJECT_TYPE_SIMPLE +}; + +/* internal threadpool object representation */ +struct threadpool_object +{ + LONG refcount; + BOOL shutdown; + /* read-only information */ + enum threadpool_objtype type; + struct threadpool *pool; + PVOID userdata; + /* information about the pool, locked via .pool->cs */ + struct list pool_entry; + LONG num_pending_callbacks; + LONG num_running_callbacks; + /* arguments for callback */ + union + { + struct + { + PTP_SIMPLE_CALLBACK callback; + } simple; + } u; +}; + +static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool ) +{ + return (struct threadpool *)pool; +} + +static void CALLBACK threadpool_worker_proc( void *param ); +static struct threadpool *default_threadpool = NULL; + static inline LONG interlocked_inc( PLONG dest ) { return interlocked_xchg_add( dest, 1 ) + 1; @@ -1044,3 +1108,393 @@ NTSTATUS WINAPI RtlDeleteTimer(HANDLE TimerQueue, HANDLE Timer, return status; } + +/*********************************************************************** + * tp_threadpool_alloc (internal) + * + * Allocates a new threadpool object. + */ +static NTSTATUS tp_threadpool_alloc( struct threadpool **out ) +{ + struct threadpool *pool; + + pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) ); + if (!pool) + return STATUS_NO_MEMORY; + + pool->refcount = 1; + pool->objcount = 0; + pool->shutdown = FALSE; + + RtlInitializeCriticalSection( &pool->cs ); + pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool.cs"); + + list_init( &pool->pool ); + RtlInitializeConditionVariable( &pool->update_event ); + + pool->max_workers = 500; + pool->min_workers = 0; + pool->num_workers = 0; + pool->num_busy_workers = 0; + + TRACE( "allocated threadpool %p\n", pool ); + + *out = pool; + return STATUS_SUCCESS; +} + +/*********************************************************************** + * tp_threadpool_shutdown (internal) + * + * Prepares the shutdown of a threadpool object and notifies all worker + * threads to terminate (after all remaining work items have been + * processed). + */ +static void tp_threadpool_shutdown( struct threadpool *pool ) +{ + assert( pool != default_threadpool ); + + pool->shutdown = TRUE; + RtlWakeAllConditionVariable( &pool->update_event ); +} + +/*********************************************************************** + * tp_threadpool_release (internal) + * + * Releases a reference to a threadpool object. + */ +static BOOL tp_threadpool_release( struct threadpool *pool ) +{ + if (interlocked_dec( &pool->refcount )) + return FALSE; + + TRACE( "destroying threadpool %p\n", pool ); + + assert( pool->shutdown ); + assert( !pool->objcount ); + assert( list_empty( &pool->pool ) ); + + pool->cs.DebugInfo->Spare[0] = 0; + RtlDeleteCriticalSection( &pool->cs ); + + RtlFreeHeap( GetProcessHeap(), 0, pool ); + return TRUE; +} + +/*********************************************************************** + * tp_threadpool_lock (internal) + * + * Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON + * block. When the lock is acquired successfully, it is guaranteed that + * there is at least one worker thread to process tasks. + */ +static NTSTATUS tp_threadpool_lock( struct threadpool **out, TP_CALLBACK_ENVIRON *environment ) +{ + struct threadpool *pool = NULL; + NTSTATUS status = STATUS_SUCCESS; + + if (environment) + pool = (struct threadpool *)environment->Pool; + + if (!pool) + { + if (!default_threadpool) + { + status = tp_threadpool_alloc( &pool ); + if (status != STATUS_SUCCESS) + return status; + + if (interlocked_cmpxchg_ptr( (void *)&default_threadpool, pool, NULL ) != NULL) + { + tp_threadpool_shutdown( pool ); + tp_threadpool_release( pool ); + } + } + + pool = default_threadpool; + } + + RtlEnterCriticalSection( &pool->cs ); + + /* Make sure that the threadpool has at least one thread. */ + if (!pool->num_workers) + { + HANDLE thread; + status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0, + threadpool_worker_proc, pool, &thread, NULL ); + if (status == STATUS_SUCCESS) + { + interlocked_inc( &pool->refcount ); + pool->num_workers++; + NtClose( thread ); + } + } + + /* Keep a reference, and increment objcount to ensure that the + * last thread doesn't terminate. */ + if (status == STATUS_SUCCESS) + { + interlocked_inc( &pool->refcount ); + pool->objcount++; + } + + RtlLeaveCriticalSection( &pool->cs ); + + if (status != STATUS_SUCCESS) + return status; + + *out = pool; + return STATUS_SUCCESS; +} + +/*********************************************************************** + * tp_threadpool_unlock (internal) + * + * Releases a lock on a threadpool. + */ +static void tp_threadpool_unlock( struct threadpool *pool ) +{ + RtlEnterCriticalSection( &pool->cs ); + pool->objcount--; + RtlLeaveCriticalSection( &pool->cs ); + tp_threadpool_release( pool ); +} + +/*********************************************************************** + * tp_object_initialize (internal) + * + * Initializes members of a threadpool object. + */ +static void tp_object_initialize( struct threadpool_object *object, struct threadpool *pool, + PVOID userdata, TP_CALLBACK_ENVIRON *environment ) +{ + object->refcount = 1; + object->shutdown = FALSE; + + object->pool = pool; + object->userdata = userdata; + + memset( &object->pool_entry, 0, sizeof(object->pool_entry) ); + object->num_pending_callbacks = 0; + object->num_running_callbacks = 0; + + if (environment) + FIXME( "environment not implemented yet\n" ); + + TRACE( "allocated object %p of type %u\n", object, object->type ); +} + +/*********************************************************************** + * tp_object_submit (internal) + * + * Submits a threadpool object to the associcated threadpool. This + * function has to be VOID because TpPostWork can never fail on Windows. + */ +static void tp_object_submit( struct threadpool_object *object ) +{ + struct threadpool *pool = object->pool; + NTSTATUS status = STATUS_UNSUCCESSFUL; + + assert( !object->shutdown ); + assert( !pool->shutdown ); + + RtlEnterCriticalSection( &pool->cs ); + + /* Start new worker threads if required. */ + if (pool->num_busy_workers >= pool->num_workers && + pool->num_workers < pool->max_workers) + { + HANDLE thread; + status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0, + threadpool_worker_proc, pool, &thread, NULL ); + if (status == STATUS_SUCCESS) + { + interlocked_inc( &pool->refcount ); + pool->num_workers++; + NtClose( thread ); + } + } + + /* Queue work item and increment refcount. */ + interlocked_inc( &object->refcount ); + if (!object->num_pending_callbacks++) + list_add_tail( &pool->pool, &object->pool_entry ); + + /* No new thread started - wake up one existing thread. */ + if (status != STATUS_SUCCESS) + { + assert( pool->num_workers > 0 ); + RtlWakeConditionVariable( &pool->update_event ); + } + + RtlLeaveCriticalSection( &pool->cs ); +} + +/*********************************************************************** + * tp_object_shutdown (internal) + * + * Marks a threadpool object for shutdown (which means that no further + * tasks can be submitted). + */ +static void tp_object_shutdown( struct threadpool_object *object ) +{ + object->shutdown = TRUE; +} + +/*********************************************************************** + * tp_object_release (internal) + * + * Releases a reference to a threadpool object. + */ +static BOOL tp_object_release( struct threadpool_object *object ) +{ + if (interlocked_dec( &object->refcount )) + return FALSE; + + TRACE( "destroying object %p of type %u\n", object, object->type ); + + assert( object->shutdown ); + assert( !object->num_pending_callbacks ); + assert( !object->num_running_callbacks ); + + tp_threadpool_unlock( object->pool ); + + RtlFreeHeap( GetProcessHeap(), 0, object ); + return TRUE; +} + +/*********************************************************************** + * threadpool_worker_proc (internal) + */ +static void CALLBACK threadpool_worker_proc( void *param ) +{ + struct threadpool *pool = param; + LARGE_INTEGER timeout; + struct list *ptr; + + TRACE( "starting worker thread for pool %p\n", pool ); + + RtlEnterCriticalSection( &pool->cs ); + for (;;) + { + while ((ptr = list_head( &pool->pool ))) + { + struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry ); + assert( object->num_pending_callbacks > 0 ); + + /* If further pending callbacks are queued, move the work item to + * the end of the pool list. Otherwise remove it from the pool. */ + list_remove( &object->pool_entry ); + if (--object->num_pending_callbacks) + list_add_tail( &pool->pool, &object->pool_entry ); + + /* Leave critical section and do the actual callback. */ + object->num_running_callbacks++; + pool->num_busy_workers++; + RtlLeaveCriticalSection( &pool->cs ); + + switch (object->type) + { + case TP_OBJECT_TYPE_SIMPLE: + { + TRACE( "executing simple callback %p(NULL, %p)\n", + object->u.simple.callback, object->userdata ); + object->u.simple.callback( NULL, object->userdata ); + TRACE( "callback %p returned\n", object->u.simple.callback ); + break; + } + + default: + assert(0); + break; + } + + RtlEnterCriticalSection( &pool->cs ); + pool->num_busy_workers--; + object->num_running_callbacks--; + tp_object_release( object ); + } + + /* Shutdown worker thread if requested. */ + if (pool->shutdown) + break; + + /* Wait for new tasks or until the timeout expires. A thread only terminates + * when no new tasks are available, and the number of threads can be + * decreased without violating the min_workers limit. An exception is when + * min_workers == 0, then objcount is used to detect if the last thread + * can be terminated. */ + timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000; + if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT && + !list_head( &pool->pool ) && (pool->num_workers > max( pool->min_workers, 1 ) || + (!pool->min_workers && !pool->objcount))) + { + break; + } + } + pool->num_workers--; + RtlLeaveCriticalSection( &pool->cs ); + + TRACE( "terminating worker thread for pool %p\n", pool ); + tp_threadpool_release( pool ); +} + +/*********************************************************************** + * TpAllocPool (NTDLL.@) + */ +NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved ) +{ + TRACE( "%p %p\n", out, reserved ); + + if (reserved) + FIXME( "reserved argument is nonzero (%p)", reserved ); + + return tp_threadpool_alloc( (struct threadpool **)out ); +} + +/*********************************************************************** + * TpReleasePool (NTDLL.@) + */ +VOID WINAPI TpReleasePool( TP_POOL *pool ) +{ + struct threadpool *this = impl_from_TP_POOL( pool ); + + TRACE( "%p\n", pool ); + + tp_threadpool_shutdown( this ); + tp_threadpool_release( this ); +} + +/*********************************************************************** + * TpSimpleTryPost (NTDLL.@) + */ +NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata, + TP_CALLBACK_ENVIRON *environment ) +{ + struct threadpool_object *object; + struct threadpool *pool; + NTSTATUS status; + + TRACE( "%p %p %p\n", callback, userdata, environment ); + + object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) ); + if (!object) + return STATUS_NO_MEMORY; + + status = tp_threadpool_lock( &pool, environment ); + if (status) + { + RtlFreeHeap( GetProcessHeap(), 0, object ); + return status; + } + + object->type = TP_OBJECT_TYPE_SIMPLE; + object->u.simple.callback = callback; + tp_object_initialize( object, pool, userdata, environment ); + + tp_object_submit( object ); + + tp_object_shutdown( object ); + tp_object_release( object ); + return STATUS_SUCCESS; +}