ntdll: Re-implement RtlRegisterWait using TpSetWait.

This adds several internal flags to TP_WAIT object to support the
implementation:

* WT_EXECUTEONLYONCE: waits are re-queued unless it is set.

* WT_EXECUTEINWAITTHREAD: call the callback in the wait thread when set.

* WT_EXECUTEINIOTHREAD: call alertable NtWaitForMultipleObjects in wait
  thread when set, as well the callback in the wait thread, as for
  WT_EXECUTEINWAITTHREAD. The worker threads use non-alertable waits
  otherwise.

Wine-Bug: https://bugs.winehq.org/show_bug.cgi?id=47843
Signed-off-by: Rémi Bernon <rbernon@codeweavers.com>
Signed-off-by: Alexandre Julliard <julliard@winehq.org>
This commit is contained in:
Rémi Bernon 2021-02-11 10:53:49 +01:00 committed by Alexandre Julliard
parent 304d811924
commit 26ee9134d5
3 changed files with 100 additions and 171 deletions

View File

@ -1367,7 +1367,6 @@ static void CALLBACK waitthread_test_function(PVOID p, BOOLEAN TimerOrWaitFired)
SetEvent(param->trigger_event);
ret = WaitForSingleObject(param->wait_event, 100);
todo_wine
ok(ret == WAIT_TIMEOUT, "wait should have timed out\n");
SetEvent(param->complete_event);
}

View File

@ -319,7 +319,7 @@ static void test_RtlRegisterWait(void)
result = WaitForSingleObject(semaphores[0], 200);
ok(result == WAIT_OBJECT_0, "WaitForSingleObject returned %u\n", result);
ok(info.userdata == 1, "expected info.userdata = 1, got %u\n", info.userdata);
todo_wine ok(info.threadid == threadid, "unexpected different wait thread id %x\n", info.threadid);
ok(info.threadid == threadid, "unexpected different wait thread id %x\n", info.threadid);
result = WaitForSingleObject(semaphores[1], 0);
ok(result == WAIT_TIMEOUT, "WaitForSingleObject returned %u\n", result);
Sleep(50);
@ -350,7 +350,7 @@ static void test_RtlRegisterWait(void)
result = WaitForSingleObject(semaphores[0], 200);
ok(result == WAIT_OBJECT_0, "WaitForSingleObject returned %u\n", result);
ok(info.userdata == 1, "expected info.userdata = 1, got %u\n", info.userdata);
todo_wine ok(info.threadid == threadid, "unexpected different wait thread id %x\n", info.threadid);
ok(info.threadid == threadid, "unexpected different wait thread id %x\n", info.threadid);
result = WaitForSingleObject(semaphores[1], 0);
ok(result == WAIT_TIMEOUT, "WaitForSingleObject returned %u\n", result);
Sleep(50);
@ -433,7 +433,6 @@ static void test_RtlRegisterWait(void)
ok(!status, "RtlDeregisterWaitEx failed with status %x\n", status);
ok(info.userdata == 0, "expected info.userdata = 0, got %u\n", info.userdata);
result = WaitForSingleObject(event, 200);
todo_wine
ok(result == WAIT_OBJECT_0, "WaitForSingleObject returned %u\n", result);
/* test RtlDeregisterWaitEx after wait expired */
@ -470,7 +469,6 @@ static void test_RtlRegisterWait(void)
ok(!status, "RtlDeregisterWaitEx failed with status %x\n", status);
ok(info.userdata == 0x10000, "expected info.userdata = 0x10000, got %u\n", info.userdata);
result = WaitForSingleObject(event, 200);
todo_wine
ok(result == WAIT_OBJECT_0, "WaitForSingleObject returned %u\n", result);
/* test RtlDeregisterWaitEx while callback is running */

View File

@ -68,19 +68,6 @@ static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug =
0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
};
struct wait_work_item
{
HANDLE Object;
HANDLE CancelEvent;
WAITORTIMERCALLBACK Callback;
PVOID Context;
ULONG Milliseconds;
ULONG Flags;
HANDLE CompletionEvent;
LONG DeleteCount;
int CallbackInProgress;
};
struct timer_queue;
struct queue_timer
{
@ -170,6 +157,7 @@ struct threadpool_object
struct list pool_entry;
RTL_CONDITION_VARIABLE finished_event;
RTL_CONDITION_VARIABLE group_finished_event;
HANDLE completed_event;
LONG num_pending_callbacks;
LONG num_running_callbacks;
LONG num_associated_callbacks;
@ -206,6 +194,8 @@ struct threadpool_object
struct list wait_entry;
ULONGLONG timeout;
HANDLE handle;
DWORD flags;
RTL_WAITORTIMERCALLBACKFUNC rtl_callback;
} wait;
struct
{
@ -302,6 +292,7 @@ struct waitqueue_bucket
struct list reserved;
struct list waiting;
HANDLE update_event;
BOOL alertable;
};
/* global I/O completion queue object */
@ -372,7 +363,7 @@ static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CAL
static void CALLBACK threadpool_worker_proc( void *param );
static void tp_object_submit( struct threadpool_object *object, BOOL signaled );
static void tp_object_execute( struct threadpool_object *object );
static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread );
static void tp_object_prepare_shutdown( struct threadpool_object *object );
static BOOL tp_object_release( struct threadpool_object *object );
static struct threadpool *default_threadpool = NULL;
@ -1266,9 +1257,21 @@ static void CALLBACK waitqueue_thread_proc( void *param )
if (wait->u.wait.timeout <= now.QuadPart)
{
/* Wait object timed out. */
list_remove( &wait->u.wait.wait_entry );
list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
tp_object_submit( wait, FALSE );
if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
{
list_remove( &wait->u.wait.wait_entry );
list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
}
if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
{
InterlockedIncrement( &wait->refcount );
wait->num_pending_callbacks++;
RtlEnterCriticalSection( &wait->pool->cs );
tp_object_execute( wait, TRUE );
RtlLeaveCriticalSection( &wait->pool->cs );
tp_object_release( wait );
}
else tp_object_submit( wait, FALSE );
}
else
{
@ -1290,7 +1293,7 @@ static void CALLBACK waitqueue_thread_proc( void *param )
assert( num_handles == 0 );
RtlLeaveCriticalSection( &waitqueue.cs );
timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, FALSE, &timeout );
status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, bucket->alertable, &timeout );
RtlEnterCriticalSection( &waitqueue.cs );
if (status == STATUS_TIMEOUT && !bucket->objcount)
@ -1300,7 +1303,7 @@ static void CALLBACK waitqueue_thread_proc( void *param )
{
handles[num_handles] = bucket->update_event;
RtlLeaveCriticalSection( &waitqueue.cs );
status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, FALSE, &timeout );
status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, bucket->alertable, &timeout );
RtlEnterCriticalSection( &waitqueue.cs );
if (status >= STATUS_WAIT_0 && status < STATUS_WAIT_0 + num_handles)
@ -1311,9 +1314,20 @@ static void CALLBACK waitqueue_thread_proc( void *param )
{
/* Wait object signaled. */
assert( wait->u.wait.bucket == bucket );
list_remove( &wait->u.wait.wait_entry );
list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
tp_object_submit( wait, TRUE );
if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
{
list_remove( &wait->u.wait.wait_entry );
list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
}
if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD)))
{
wait->u.wait.signaled++;
wait->num_pending_callbacks++;
RtlEnterCriticalSection( &wait->pool->cs );
tp_object_execute( wait, TRUE );
RtlLeaveCriticalSection( &wait->pool->cs );
}
else tp_object_submit( wait, TRUE );
}
else
WARN("wait object %p triggered while object was destroyed\n", wait);
@ -1335,7 +1349,7 @@ static void CALLBACK waitqueue_thread_proc( void *param )
struct waitqueue_bucket *other_bucket;
LIST_FOR_EACH_ENTRY( other_bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
{
if (other_bucket != bucket && other_bucket->objcount &&
if (other_bucket != bucket && other_bucket->objcount && other_bucket->alertable == bucket->alertable &&
other_bucket->objcount + bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 2 / 3)
{
other_bucket->objcount += bucket->objcount;
@ -1395,6 +1409,7 @@ static NTSTATUS tp_waitqueue_lock( struct threadpool_object *wait )
struct waitqueue_bucket *bucket;
NTSTATUS status;
HANDLE thread;
BOOL alertable = (wait->u.wait.flags & WT_EXECUTEINIOTHREAD) != 0;
assert( wait->type == TP_OBJECT_TYPE_WAIT );
wait->u.wait.signaled = 0;
@ -1408,7 +1423,7 @@ static NTSTATUS tp_waitqueue_lock( struct threadpool_object *wait )
/* Try to assign to existing bucket if possible. */
LIST_FOR_EACH_ENTRY( bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
{
if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS)
if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS && bucket->alertable == alertable)
{
list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
wait->u.wait.bucket = bucket;
@ -1428,6 +1443,7 @@ static NTSTATUS tp_waitqueue_lock( struct threadpool_object *wait )
}
bucket->objcount = 0;
bucket->alertable = alertable;
list_init( &bucket->reserved );
list_init( &bucket->waiting );
@ -1860,6 +1876,7 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa
memset( &object->pool_entry, 0, sizeof(object->pool_entry) );
RtlInitializeConditionVariable( &object->finished_event );
RtlInitializeConditionVariable( &object->group_finished_event );
object->completed_event = NULL;
object->num_pending_callbacks = 0;
object->num_running_callbacks = 0;
object->num_associated_callbacks = 0;
@ -2077,6 +2094,9 @@ static BOOL tp_object_release( struct threadpool_object *object )
if (object->race_dll)
LdrUnloadDll( object->race_dll );
if (object->completed_event && object->completed_event != INVALID_HANDLE_VALUE)
NtSetEvent( object->completed_event, NULL );
RtlFreeHeap( GetProcessHeap(), 0, object );
return TRUE;
}
@ -2101,7 +2121,7 @@ static struct list *threadpool_get_next_item( const struct threadpool *pool )
* Executes a threadpool object callback, object->pool->cs has to be
* held.
*/
static void tp_object_execute( struct threadpool_object *object )
static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread )
{
TP_CALLBACK_INSTANCE *callback_instance;
struct threadpool_instance instance;
@ -2129,6 +2149,7 @@ static void tp_object_execute( struct threadpool_object *object )
object->num_associated_callbacks++;
object->num_running_callbacks++;
RtlLeaveCriticalSection( &pool->cs );
if (wait_thread) RtlLeaveCriticalSection( &waitqueue.cs );
/* Initialize threadpool instance struct. */
callback_instance = (TP_CALLBACK_INSTANCE *)&instance;
@ -2232,6 +2253,7 @@ static void tp_object_execute( struct threadpool_object *object )
}
skip_cleanup:
if (wait_thread) RtlEnterCriticalSection( &waitqueue.cs );
RtlEnterCriticalSection( &pool->cs );
/* Simple callbacks are automatically shutdown after execution. */
@ -2278,7 +2300,7 @@ static void CALLBACK threadpool_worker_proc( void *param )
if (object->num_pending_callbacks > 1)
tp_object_prio_queue( object );
tp_object_execute( object );
tp_object_execute( object, FALSE );
assert(pool->num_busy_workers);
pool->num_busy_workers--;
@ -2418,18 +2440,13 @@ NTSTATUS WINAPI TpAllocTimer( TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID
return STATUS_SUCCESS;
}
/***********************************************************************
* TpAllocWait (NTDLL.@)
*/
NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata,
TP_CALLBACK_ENVIRON *environment )
static NTSTATUS tp_alloc_wait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata,
TP_CALLBACK_ENVIRON *environment, DWORD flags )
{
struct threadpool_object *object;
struct threadpool *pool;
NTSTATUS status;
TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
if (!object)
return STATUS_NO_MEMORY;
@ -2443,6 +2460,7 @@ NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID us
object->type = TP_OBJECT_TYPE_WAIT;
object->u.wait.callback = callback;
object->u.wait.flags = flags;
status = tp_waitqueue_lock( object );
if (status)
@ -2458,6 +2476,16 @@ NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID us
return STATUS_SUCCESS;
}
/***********************************************************************
* TpAllocWait (NTDLL.@)
*/
NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata,
TP_CALLBACK_ENVIRON *environment )
{
TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
return tp_alloc_wait( out, callback, userdata, environment, WT_EXECUTEONLYONCE );
}
/***********************************************************************
* TpAllocWork (NTDLL.@)
*/
@ -3143,71 +3171,10 @@ NTSTATUS WINAPI TpQueryPoolStackInformation( TP_POOL *pool, TP_POOL_STACK_INFORM
return STATUS_SUCCESS;
}
static void delete_wait_work_item(struct wait_work_item *wait_work_item)
static void CALLBACK rtl_wait_callback( TP_CALLBACK_INSTANCE *instance, void *userdata, TP_WAIT *wait, TP_WAIT_RESULT result )
{
NtClose( wait_work_item->CancelEvent );
RtlFreeHeap( GetProcessHeap(), 0, wait_work_item );
}
static DWORD CALLBACK wait_thread_proc(LPVOID Arg)
{
struct wait_work_item *wait_work_item = Arg;
NTSTATUS status;
BOOLEAN alertable = (wait_work_item->Flags & WT_EXECUTEINIOTHREAD) != 0;
HANDLE handles[2] = { wait_work_item->Object, wait_work_item->CancelEvent };
LARGE_INTEGER timeout;
HANDLE completion_event;
TRACE("\n");
while (TRUE)
{
status = NtWaitForMultipleObjects( 2, handles, TRUE, alertable,
get_nt_timeout( &timeout, wait_work_item->Milliseconds ) );
if (status == STATUS_WAIT_0 || status == STATUS_TIMEOUT)
{
BOOLEAN TimerOrWaitFired;
if (status == STATUS_WAIT_0)
{
TRACE( "object %p signaled, calling callback %p with context %p\n",
wait_work_item->Object, wait_work_item->Callback,
wait_work_item->Context );
TimerOrWaitFired = FALSE;
}
else
{
TRACE( "wait for object %p timed out, calling callback %p with context %p\n",
wait_work_item->Object, wait_work_item->Callback,
wait_work_item->Context );
TimerOrWaitFired = TRUE;
}
InterlockedExchange( &wait_work_item->CallbackInProgress, TRUE );
if (wait_work_item->CompletionEvent)
{
TRACE( "Work has been canceled.\n" );
break;
}
wait_work_item->Callback( wait_work_item->Context, TimerOrWaitFired );
InterlockedExchange( &wait_work_item->CallbackInProgress, FALSE );
if (wait_work_item->Flags & WT_EXECUTEONLYONCE)
break;
}
else if (status != STATUS_USER_APC)
break;
}
if (InterlockedIncrement( &wait_work_item->DeleteCount ) == 2 )
{
completion_event = wait_work_item->CompletionEvent;
delete_wait_work_item( wait_work_item );
if (completion_event && completion_event != INVALID_HANDLE_VALUE)
NtSetEvent( completion_event, NULL );
}
return 0;
struct threadpool_object *object = impl_from_TP_WAIT(wait);
object->u.wait.rtl_callback( userdata, result != STATUS_WAIT_0 );
}
/***********************************************************************
@ -3235,46 +3202,34 @@ static DWORD CALLBACK wait_thread_proc(LPVOID Arg)
*|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
*|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
*/
NTSTATUS WINAPI RtlRegisterWait(PHANDLE NewWaitObject, HANDLE Object,
RTL_WAITORTIMERCALLBACKFUNC Callback,
PVOID Context, ULONG Milliseconds, ULONG Flags)
NTSTATUS WINAPI RtlRegisterWait( HANDLE *out, HANDLE handle, RTL_WAITORTIMERCALLBACKFUNC callback,
void *context, ULONG milliseconds, ULONG flags )
{
struct wait_work_item *wait_work_item;
struct threadpool_object *object;
TP_CALLBACK_ENVIRON environment;
LARGE_INTEGER timeout;
NTSTATUS status;
TP_WAIT *wait;
TRACE( "(%p, %p, %p, %p, %d, 0x%x)\n", NewWaitObject, Object, Callback, Context, Milliseconds, Flags );
TRACE( "out %p, handle %p, callback %p, context %p, milliseconds %u, flags %x\n",
out, handle, callback, context, milliseconds, flags );
wait_work_item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*wait_work_item) );
if (!wait_work_item)
return STATUS_NO_MEMORY;
memset( &environment, 0, sizeof(environment) );
environment.Version = 1;
environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
wait_work_item->Object = Object;
wait_work_item->Callback = Callback;
wait_work_item->Context = Context;
wait_work_item->Milliseconds = Milliseconds;
wait_work_item->Flags = Flags;
wait_work_item->CallbackInProgress = FALSE;
wait_work_item->DeleteCount = 0;
wait_work_item->CompletionEvent = NULL;
status = NtCreateEvent( &wait_work_item->CancelEvent, EVENT_ALL_ACCESS, NULL, NotificationEvent, FALSE );
if (status != STATUS_SUCCESS)
{
RtlFreeHeap( GetProcessHeap(), 0, wait_work_item );
flags &= (WT_EXECUTEONLYONCE | WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD);
if ((status = tp_alloc_wait( &wait, rtl_wait_callback, context, &environment, flags )))
return status;
}
Flags = Flags & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD |
WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION);
status = RtlQueueWorkItem( wait_thread_proc, wait_work_item, Flags );
if (status != STATUS_SUCCESS)
{
delete_wait_work_item( wait_work_item );
return status;
}
object = impl_from_TP_WAIT(wait);
object->u.wait.rtl_callback = callback;
*NewWaitObject = wait_work_item;
return status;
TpSetWait( (TP_WAIT *)object, handle, get_nt_timeout( &timeout, milliseconds ) );
*out = object;
return STATUS_SUCCESS;
}
/***********************************************************************
@ -3290,54 +3245,31 @@ NTSTATUS WINAPI RtlRegisterWait(PHANDLE NewWaitObject, HANDLE Object,
* Success: STATUS_SUCCESS.
* Failure: Any NTSTATUS code.
*/
NTSTATUS WINAPI RtlDeregisterWaitEx(HANDLE WaitHandle, HANDLE CompletionEvent)
NTSTATUS WINAPI RtlDeregisterWaitEx( HANDLE handle, HANDLE event )
{
struct wait_work_item *wait_work_item = WaitHandle;
struct threadpool_object *object = handle;
NTSTATUS status;
HANDLE LocalEvent = NULL;
int CallbackInProgress;
TRACE( "(%p %p)\n", WaitHandle, CompletionEvent );
TRACE( "handle %p, event %p\n", handle, event );
if (WaitHandle == NULL)
return STATUS_INVALID_HANDLE;
if (!object) return STATUS_INVALID_HANDLE;
InterlockedExchangePointer( &wait_work_item->CompletionEvent, INVALID_HANDLE_VALUE );
CallbackInProgress = wait_work_item->CallbackInProgress;
TRACE( "callback in progress %u\n", CallbackInProgress );
if (CompletionEvent == INVALID_HANDLE_VALUE || !CallbackInProgress)
{
status = NtCreateEvent( &LocalEvent, EVENT_ALL_ACCESS, NULL, NotificationEvent, FALSE );
if (status != STATUS_SUCCESS)
return status;
InterlockedExchangePointer( &wait_work_item->CompletionEvent, LocalEvent );
}
else if (CompletionEvent != NULL)
{
InterlockedExchangePointer( &wait_work_item->CompletionEvent, CompletionEvent );
}
TpSetWait( (TP_WAIT *)object, NULL, NULL );
NtSetEvent( wait_work_item->CancelEvent, NULL );
if (InterlockedIncrement( &wait_work_item->DeleteCount ) == 2 )
{
status = STATUS_SUCCESS;
delete_wait_work_item( wait_work_item );
}
else if (LocalEvent)
{
TRACE( "Waiting for completion event\n" );
NtWaitForSingleObject( LocalEvent, FALSE, NULL );
status = STATUS_SUCCESS;
}
if (event == INVALID_HANDLE_VALUE) TpWaitForWait( (TP_WAIT *)object, TRUE );
else
{
status = STATUS_PENDING;
assert( object->completed_event == NULL );
object->completed_event = event;
}
if (LocalEvent)
NtClose( LocalEvent );
RtlEnterCriticalSection( &object->pool->cs );
if (object->num_pending_callbacks + object->num_running_callbacks
+ object->num_associated_callbacks) status = STATUS_PENDING;
else status = STATUS_SUCCESS;
RtlLeaveCriticalSection( &object->pool->cs );
TpReleaseWait( (TP_WAIT *)object );
return status;
}