ntdll: Introduce new tp_object_execute helper.

To execute a threadpool_object callbacks.

Wine-Bug: https://bugs.winehq.org/show_bug.cgi?id=47843
Signed-off-by: Rémi Bernon <rbernon@codeweavers.com>
Signed-off-by: Alexandre Julliard <julliard@winehq.org>
This commit is contained in:
Rémi Bernon 2021-02-11 10:53:48 +01:00 committed by Alexandre Julliard
parent 3ed3e031ff
commit 304d811924
1 changed files with 160 additions and 143 deletions

View File

@ -372,6 +372,7 @@ static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CAL
static void CALLBACK threadpool_worker_proc( void *param );
static void tp_object_submit( struct threadpool_object *object, BOOL signaled );
static void tp_object_execute( struct threadpool_object *object );
static void tp_object_prepare_shutdown( struct threadpool_object *object );
static BOOL tp_object_release( struct threadpool_object *object );
static struct threadpool *default_threadpool = NULL;
@ -2095,18 +2096,171 @@ static struct list *threadpool_get_next_item( const struct threadpool *pool )
}
/***********************************************************************
* threadpool_worker_proc (internal)
* tp_object_execute (internal)
*
* Executes a threadpool object callback, object->pool->cs has to be
* held.
*/
static void CALLBACK threadpool_worker_proc( void *param )
static void tp_object_execute( struct threadpool_object *object )
{
TP_CALLBACK_INSTANCE *callback_instance;
struct threadpool_instance instance;
struct io_completion completion;
struct threadpool *pool = param;
struct threadpool *pool = object->pool;
TP_WAIT_RESULT wait_result = 0;
NTSTATUS status;
object->num_pending_callbacks--;
/* For wait objects check if they were signaled or have timed out. */
if (object->type == TP_OBJECT_TYPE_WAIT)
{
wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
}
else if (object->type == TP_OBJECT_TYPE_IO)
{
assert( object->u.io.completion_count );
completion = object->u.io.completions[--object->u.io.completion_count];
object->u.io.pending_count--;
}
/* Leave critical section and do the actual callback. */
object->num_associated_callbacks++;
object->num_running_callbacks++;
RtlLeaveCriticalSection( &pool->cs );
/* Initialize threadpool instance struct. */
callback_instance = (TP_CALLBACK_INSTANCE *)&instance;
instance.object = object;
instance.threadid = GetCurrentThreadId();
instance.associated = TRUE;
instance.may_run_long = object->may_run_long;
instance.cleanup.critical_section = NULL;
instance.cleanup.mutex = NULL;
instance.cleanup.semaphore = NULL;
instance.cleanup.semaphore_count = 0;
instance.cleanup.event = NULL;
instance.cleanup.library = NULL;
switch (object->type)
{
case TP_OBJECT_TYPE_SIMPLE:
{
TRACE( "executing simple callback %p(%p, %p)\n",
object->u.simple.callback, callback_instance, object->userdata );
object->u.simple.callback( callback_instance, object->userdata );
TRACE( "callback %p returned\n", object->u.simple.callback );
break;
}
case TP_OBJECT_TYPE_WORK:
{
TRACE( "executing work callback %p(%p, %p, %p)\n",
object->u.work.callback, callback_instance, object->userdata, object );
object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object );
TRACE( "callback %p returned\n", object->u.work.callback );
break;
}
case TP_OBJECT_TYPE_TIMER:
{
TRACE( "executing timer callback %p(%p, %p, %p)\n",
object->u.timer.callback, callback_instance, object->userdata, object );
object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object );
TRACE( "callback %p returned\n", object->u.timer.callback );
break;
}
case TP_OBJECT_TYPE_WAIT:
{
TRACE( "executing wait callback %p(%p, %p, %p, %u)\n",
object->u.wait.callback, callback_instance, object->userdata, object, wait_result );
object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result );
TRACE( "callback %p returned\n", object->u.wait.callback );
break;
}
case TP_OBJECT_TYPE_IO:
{
TRACE( "executing I/O callback %p(%p, %p, %#lx, %p, %p)\n",
object->u.io.callback, callback_instance, object->userdata,
completion.cvalue, &completion.iosb, (TP_IO *)object );
object->u.io.callback( callback_instance, object->userdata,
(void *)completion.cvalue, &completion.iosb, (TP_IO *)object );
TRACE( "callback %p returned\n", object->u.io.callback );
break;
}
default:
assert(0);
break;
}
/* Execute finalization callback. */
if (object->finalization_callback)
{
TRACE( "executing finalization callback %p(%p, %p)\n",
object->finalization_callback, callback_instance, object->userdata );
object->finalization_callback( callback_instance, object->userdata );
TRACE( "callback %p returned\n", object->finalization_callback );
}
/* Execute cleanup tasks. */
if (instance.cleanup.critical_section)
{
RtlLeaveCriticalSection( instance.cleanup.critical_section );
}
if (instance.cleanup.mutex)
{
status = NtReleaseMutant( instance.cleanup.mutex, NULL );
if (status != STATUS_SUCCESS) goto skip_cleanup;
}
if (instance.cleanup.semaphore)
{
status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL );
if (status != STATUS_SUCCESS) goto skip_cleanup;
}
if (instance.cleanup.event)
{
status = NtSetEvent( instance.cleanup.event, NULL );
if (status != STATUS_SUCCESS) goto skip_cleanup;
}
if (instance.cleanup.library)
{
LdrUnloadDll( instance.cleanup.library );
}
skip_cleanup:
RtlEnterCriticalSection( &pool->cs );
/* Simple callbacks are automatically shutdown after execution. */
if (object->type == TP_OBJECT_TYPE_SIMPLE)
{
tp_object_prepare_shutdown( object );
object->shutdown = TRUE;
}
object->num_running_callbacks--;
if (object_is_finished( object, TRUE ))
RtlWakeAllConditionVariable( &object->group_finished_event );
if (instance.associated)
{
object->num_associated_callbacks--;
if (object_is_finished( object, FALSE ))
RtlWakeAllConditionVariable( &object->finished_event );
}
}
/***********************************************************************
* threadpool_worker_proc (internal)
*/
static void CALLBACK threadpool_worker_proc( void *param )
{
struct threadpool *pool = param;
LARGE_INTEGER timeout;
struct list *ptr;
NTSTATUS status;
TRACE( "starting worker thread for pool %p\n", pool );
@ -2121,151 +2275,14 @@ static void CALLBACK threadpool_worker_proc( void *param )
/* If further pending callbacks are queued, move the work item to
* the end of the pool list. Otherwise remove it from the pool. */
list_remove( &object->pool_entry );
if (--object->num_pending_callbacks)
if (object->num_pending_callbacks > 1)
tp_object_prio_queue( object );
/* For wait objects check if they were signaled or have timed out. */
if (object->type == TP_OBJECT_TYPE_WAIT)
{
wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
}
else if (object->type == TP_OBJECT_TYPE_IO)
{
assert( object->u.io.completion_count );
completion = object->u.io.completions[--object->u.io.completion_count];
object->u.io.pending_count--;
}
tp_object_execute( object );
/* Leave critical section and do the actual callback. */
object->num_associated_callbacks++;
object->num_running_callbacks++;
RtlLeaveCriticalSection( &pool->cs );
/* Initialize threadpool instance struct. */
callback_instance = (TP_CALLBACK_INSTANCE *)&instance;
instance.object = object;
instance.threadid = GetCurrentThreadId();
instance.associated = TRUE;
instance.may_run_long = object->may_run_long;
instance.cleanup.critical_section = NULL;
instance.cleanup.mutex = NULL;
instance.cleanup.semaphore = NULL;
instance.cleanup.semaphore_count = 0;
instance.cleanup.event = NULL;
instance.cleanup.library = NULL;
switch (object->type)
{
case TP_OBJECT_TYPE_SIMPLE:
{
TRACE( "executing simple callback %p(%p, %p)\n",
object->u.simple.callback, callback_instance, object->userdata );
object->u.simple.callback( callback_instance, object->userdata );
TRACE( "callback %p returned\n", object->u.simple.callback );
break;
}
case TP_OBJECT_TYPE_WORK:
{
TRACE( "executing work callback %p(%p, %p, %p)\n",
object->u.work.callback, callback_instance, object->userdata, object );
object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object );
TRACE( "callback %p returned\n", object->u.work.callback );
break;
}
case TP_OBJECT_TYPE_TIMER:
{
TRACE( "executing timer callback %p(%p, %p, %p)\n",
object->u.timer.callback, callback_instance, object->userdata, object );
object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object );
TRACE( "callback %p returned\n", object->u.timer.callback );
break;
}
case TP_OBJECT_TYPE_WAIT:
{
TRACE( "executing wait callback %p(%p, %p, %p, %u)\n",
object->u.wait.callback, callback_instance, object->userdata, object, wait_result );
object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result );
TRACE( "callback %p returned\n", object->u.wait.callback );
break;
}
case TP_OBJECT_TYPE_IO:
{
TRACE( "executing I/O callback %p(%p, %p, %#lx, %p, %p)\n",
object->u.io.callback, callback_instance, object->userdata,
completion.cvalue, &completion.iosb, (TP_IO *)object );
object->u.io.callback( callback_instance, object->userdata,
(void *)completion.cvalue, &completion.iosb, (TP_IO *)object );
TRACE( "callback %p returned\n", object->u.io.callback );
break;
}
default:
assert(0);
break;
}
/* Execute finalization callback. */
if (object->finalization_callback)
{
TRACE( "executing finalization callback %p(%p, %p)\n",
object->finalization_callback, callback_instance, object->userdata );
object->finalization_callback( callback_instance, object->userdata );
TRACE( "callback %p returned\n", object->finalization_callback );
}
/* Execute cleanup tasks. */
if (instance.cleanup.critical_section)
{
RtlLeaveCriticalSection( instance.cleanup.critical_section );
}
if (instance.cleanup.mutex)
{
status = NtReleaseMutant( instance.cleanup.mutex, NULL );
if (status != STATUS_SUCCESS) goto skip_cleanup;
}
if (instance.cleanup.semaphore)
{
status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL );
if (status != STATUS_SUCCESS) goto skip_cleanup;
}
if (instance.cleanup.event)
{
status = NtSetEvent( instance.cleanup.event, NULL );
if (status != STATUS_SUCCESS) goto skip_cleanup;
}
if (instance.cleanup.library)
{
LdrUnloadDll( instance.cleanup.library );
}
skip_cleanup:
RtlEnterCriticalSection( &pool->cs );
assert(pool->num_busy_workers);
pool->num_busy_workers--;
/* Simple callbacks are automatically shutdown after execution. */
if (object->type == TP_OBJECT_TYPE_SIMPLE)
{
tp_object_prepare_shutdown( object );
object->shutdown = TRUE;
}
object->num_running_callbacks--;
if (object_is_finished( object, TRUE ))
RtlWakeAllConditionVariable( &object->group_finished_event );
if (instance.associated)
{
object->num_associated_callbacks--;
if (object_is_finished( object, FALSE ))
RtlWakeAllConditionVariable( &object->finished_event );
}
tp_object_release( object );
}