rtworkq: Initialize queue with specific set of functionality.

Signed-off-by: Nikolay Sivov <nsivov@codeweavers.com>
Signed-off-by: Alexandre Julliard <julliard@winehq.org>
This commit is contained in:
Nikolay Sivov 2020-02-11 12:19:07 +03:00 committed by Alexandre Julliard
parent b8805c1e7d
commit 9b1fac2dfd
1 changed files with 108 additions and 54 deletions

View File

@ -16,6 +16,8 @@
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
*/
#include <assert.h>
#define COBJMACROS
#define NONAMELESSUNION
@ -136,13 +138,25 @@ static const TP_CALLBACK_PRIORITY priorities[] =
TP_CALLBACK_PRIORITY_LOW,
};
struct queue;
struct queue_desc;
struct queue_ops
{
HRESULT (*init)(const struct queue_desc *desc, struct queue *queue);
BOOL (*shutdown)(struct queue *queue);
void (*submit)(struct queue *queue, struct work_item *item);
};
struct queue_desc
{
RTWQ_WORKQUEUE_TYPE queue_type;
const struct queue_ops *ops;
};
struct queue
{
const struct queue_ops *ops;
TP_POOL *pool;
TP_CALLBACK_ENVIRON_V3 envs[ARRAY_SIZE(priorities)];
CRITICAL_SECTION cs;
@ -171,6 +185,88 @@ static void CALLBACK standard_queue_cleanup_callback(void *object_data, void *gr
{
}
static HRESULT pool_queue_init(const struct queue_desc *desc, struct queue *queue)
{
TP_CALLBACK_ENVIRON_V3 env;
unsigned int max_thread, i;
queue->pool = CreateThreadpool(NULL);
memset(&env, 0, sizeof(env));
env.Version = 3;
env.Size = sizeof(env);
env.Pool = queue->pool;
env.CleanupGroup = CreateThreadpoolCleanupGroup();
env.CleanupGroupCancelCallback = standard_queue_cleanup_callback;
env.CallbackPriority = TP_CALLBACK_PRIORITY_NORMAL;
for (i = 0; i < ARRAY_SIZE(queue->envs); ++i)
{
queue->envs[i] = env;
queue->envs[i].CallbackPriority = priorities[i];
}
list_init(&queue->pending_items);
InitializeCriticalSection(&queue->cs);
max_thread = (desc->queue_type == RTWQ_STANDARD_WORKQUEUE || desc->queue_type == RTWQ_WINDOW_WORKQUEUE) ? 1 : 4;
SetThreadpoolThreadMinimum(queue->pool, 1);
SetThreadpoolThreadMaximum(queue->pool, max_thread);
if (desc->queue_type == RTWQ_WINDOW_WORKQUEUE)
FIXME("RTWQ_WINDOW_WORKQUEUE is not supported.\n");
return S_OK;
}
static BOOL pool_queue_shutdown(struct queue *queue)
{
if (!queue->pool)
return FALSE;
CloseThreadpoolCleanupGroupMembers(queue->envs[0].CleanupGroup, TRUE, NULL);
CloseThreadpool(queue->pool);
queue->pool = NULL;
return TRUE;
}
static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void *context, TP_WORK *work)
{
struct work_item *item = context;
RTWQASYNCRESULT *result = (RTWQASYNCRESULT *)item->result;
TRACE("result object %p.\n", result);
IRtwqAsyncCallback_Invoke(result->pCallback, item->result);
IUnknown_Release(&item->IUnknown_iface);
}
static void pool_queue_submit(struct queue *queue, struct work_item *item)
{
TP_CALLBACK_PRIORITY callback_priority;
TP_WORK *work_object;
if (item->priority == 0)
callback_priority = TP_CALLBACK_PRIORITY_NORMAL;
else if (item->priority < 0)
callback_priority = TP_CALLBACK_PRIORITY_LOW;
else
callback_priority = TP_CALLBACK_PRIORITY_HIGH;
work_object = CreateThreadpoolWork(standard_queue_worker, item,
(TP_CALLBACK_ENVIRON *)&queue->envs[callback_priority]);
SubmitThreadpoolWork(work_object);
TRACE("dispatched %p.\n", item->result);
}
static const struct queue_ops pool_queue_ops =
{
pool_queue_init,
pool_queue_shutdown,
pool_queue_submit,
};
static HRESULT WINAPI work_item_QueryInterface(IUnknown *iface, REFIID riid, void **obj)
{
if (IsEqualIID(riid, &IID_IUnknown))
@ -235,33 +331,14 @@ static struct work_item * alloc_work_item(struct queue *queue, LONG priority, IR
static void init_work_queue(const struct queue_desc *desc, struct queue *queue)
{
TP_CALLBACK_ENVIRON_V3 env;
unsigned int max_thread, i;
assert(desc->ops != NULL);
queue->pool = CreateThreadpool(NULL);
memset(&env, 0, sizeof(env));
env.Version = 3;
env.Size = sizeof(env);
env.Pool = queue->pool;
env.CleanupGroup = CreateThreadpoolCleanupGroup();
env.CleanupGroupCancelCallback = standard_queue_cleanup_callback;
env.CallbackPriority = TP_CALLBACK_PRIORITY_NORMAL;
for (i = 0; i < ARRAY_SIZE(queue->envs); ++i)
queue->ops = desc->ops;
if (SUCCEEDED(queue->ops->init(desc, queue)))
{
queue->envs[i] = env;
queue->envs[i].CallbackPriority = priorities[i];
list_init(&queue->pending_items);
InitializeCriticalSection(&queue->cs);
}
list_init(&queue->pending_items);
InitializeCriticalSection(&queue->cs);
max_thread = (desc->queue_type == RTWQ_STANDARD_WORKQUEUE || desc->queue_type == RTWQ_WINDOW_WORKQUEUE) ? 1 : 4;
SetThreadpoolThreadMinimum(queue->pool, 1);
SetThreadpoolThreadMaximum(queue->pool, max_thread);
if (desc->queue_type == RTWQ_WINDOW_WORKQUEUE)
FIXME("RTWQ_WINDOW_WORKQUEUE is not supported.\n");
}
static HRESULT grab_queue(DWORD queue_id, struct queue **ret)
@ -297,6 +374,7 @@ static HRESULT grab_queue(DWORD queue_id, struct queue **ret)
}
desc.queue_type = queue_type;
desc.ops = &pool_queue_ops;
init_work_queue(&desc, queue);
LeaveCriticalSection(&queues_section);
*ret = queue;
@ -333,13 +411,9 @@ static void shutdown_queue(struct queue *queue)
{
struct work_item *item, *item2;
if (!queue->pool)
if (!queue->ops || !queue->ops->shutdown(queue))
return;
CloseThreadpoolCleanupGroupMembers(queue->envs[0].CleanupGroup, TRUE, NULL);
CloseThreadpool(queue->pool);
queue->pool = NULL;
EnterCriticalSection(&queue->cs);
LIST_FOR_EACH_ENTRY_SAFE(item, item2, &queue->pending_items, struct work_item, entry)
{
@ -349,6 +423,8 @@ static void shutdown_queue(struct queue *queue)
LeaveCriticalSection(&queue->cs);
DeleteCriticalSection(&queue->cs);
memset(queue, 0, sizeof(*queue));
}
static HRESULT unlock_user_queue(DWORD queue)
@ -376,38 +452,14 @@ static HRESULT unlock_user_queue(DWORD queue)
return hr;
}
static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void *context, TP_WORK *work)
{
struct work_item *item = context;
RTWQASYNCRESULT *result = (RTWQASYNCRESULT *)item->result;
TRACE("result object %p.\n", result);
IRtwqAsyncCallback_Invoke(result->pCallback, item->result);
IUnknown_Release(&item->IUnknown_iface);
}
static HRESULT queue_submit_item(struct queue *queue, LONG priority, IRtwqAsyncResult *result)
{
TP_CALLBACK_PRIORITY callback_priority;
struct work_item *item;
TP_WORK *work_object;
if (!(item = alloc_work_item(queue, priority, result)))
return E_OUTOFMEMORY;
if (priority == 0)
callback_priority = TP_CALLBACK_PRIORITY_NORMAL;
else if (priority < 0)
callback_priority = TP_CALLBACK_PRIORITY_LOW;
else
callback_priority = TP_CALLBACK_PRIORITY_HIGH;
work_object = CreateThreadpoolWork(standard_queue_worker, item,
(TP_CALLBACK_ENVIRON *)&queue->envs[callback_priority]);
SubmitThreadpoolWork(work_object);
TRACE("dispatched %p.\n", result);
queue->ops->submit(queue, item);
return S_OK;
}
@ -862,6 +914,7 @@ static void init_system_queues(void)
}
desc.queue_type = RTWQ_STANDARD_WORKQUEUE;
desc.ops = &pool_queue_ops;
init_work_queue(&desc, &system_queues[SYS_QUEUE_STANDARD]);
LeaveCriticalSection(&queues_section);
@ -1114,6 +1167,7 @@ HRESULT WINAPI RtwqAllocateWorkQueue(RTWQ_WORKQUEUE_TYPE queue_type, DWORD *queu
TRACE("%d, %p.\n", queue_type, queue);
desc.queue_type = queue_type;
desc.ops = &pool_queue_ops;
return alloc_user_queue(&desc, queue);
}