rtworkq: Add RtwqPutWaitingWorkItem().
Signed-off-by: Nikolay Sivov <nsivov@codeweavers.com> Signed-off-by: Alexandre Julliard <julliard@winehq.org>
This commit is contained in:
parent
9ca02920f1
commit
e9e087fdbb
|
@ -27,6 +27,34 @@
|
|||
|
||||
WINE_DEFAULT_DEBUG_CHANNEL(mfplat);
|
||||
|
||||
#define FIRST_USER_QUEUE_HANDLE 5
|
||||
#define MAX_USER_QUEUE_HANDLES 124
|
||||
|
||||
#define WAIT_ITEM_KEY_MASK (0x82000000)
|
||||
#define SCHEDULED_ITEM_KEY_MASK (0x80000000)
|
||||
|
||||
static LONG next_item_key;
|
||||
|
||||
static RTWQWORKITEM_KEY get_item_key(DWORD mask, DWORD key)
|
||||
{
|
||||
return ((RTWQWORKITEM_KEY)mask << 32) | key;
|
||||
}
|
||||
|
||||
static RTWQWORKITEM_KEY generate_item_key(DWORD mask)
|
||||
{
|
||||
return get_item_key(mask, InterlockedIncrement(&next_item_key));
|
||||
}
|
||||
|
||||
struct queue_handle
|
||||
{
|
||||
void *obj;
|
||||
LONG refcount;
|
||||
WORD generation;
|
||||
};
|
||||
|
||||
static struct queue_handle user_queues[MAX_USER_QUEUE_HANDLES];
|
||||
static struct queue_handle *next_free_user_queue;
|
||||
|
||||
static CRITICAL_SECTION queues_section;
|
||||
static CRITICAL_SECTION_DEBUG queues_critsect_debug =
|
||||
{
|
||||
|
@ -38,6 +66,33 @@ static CRITICAL_SECTION queues_section = { &queues_critsect_debug, -1, 0, 0, 0,
|
|||
|
||||
static LONG platform_lock;
|
||||
|
||||
static struct queue_handle *get_queue_obj(DWORD handle)
|
||||
{
|
||||
unsigned int idx = HIWORD(handle) - FIRST_USER_QUEUE_HANDLE;
|
||||
|
||||
if (idx < MAX_USER_QUEUE_HANDLES && user_queues[idx].refcount)
|
||||
{
|
||||
if (LOWORD(handle) == user_queues[idx].generation)
|
||||
return &user_queues[idx];
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Should be kept in sync with corresponding MFASYNC_CALLBACK_ constants. */
|
||||
enum rtwq_callback_queue_id
|
||||
{
|
||||
RTWQ_CALLBACK_QUEUE_UNDEFINED = 0x00000000,
|
||||
RTWQ_CALLBACK_QUEUE_STANDARD = 0x00000001,
|
||||
RTWQ_CALLBACK_QUEUE_RT = 0x00000002,
|
||||
RTWQ_CALLBACK_QUEUE_IO = 0x00000003,
|
||||
RTWQ_CALLBACK_QUEUE_TIMER = 0x00000004,
|
||||
RTWQ_CALLBACK_QUEUE_MULTITHREADED = 0x00000005,
|
||||
RTWQ_CALLBACK_QUEUE_LONG_FUNCTION = 0x00000007,
|
||||
RTWQ_CALLBACK_QUEUE_PRIVATE_MASK = 0xffff0000,
|
||||
RTWQ_CALLBACK_QUEUE_ALL = 0xffffffff,
|
||||
};
|
||||
|
||||
enum system_queue_index
|
||||
{
|
||||
SYS_QUEUE_STANDARD = 0,
|
||||
|
@ -81,10 +136,40 @@ struct queue
|
|||
|
||||
static struct queue system_queues[SYS_QUEUE_COUNT];
|
||||
|
||||
static struct queue *get_system_queue(DWORD queue_id)
|
||||
{
|
||||
switch (queue_id)
|
||||
{
|
||||
case RTWQ_CALLBACK_QUEUE_STANDARD:
|
||||
case RTWQ_CALLBACK_QUEUE_RT:
|
||||
case RTWQ_CALLBACK_QUEUE_IO:
|
||||
case RTWQ_CALLBACK_QUEUE_TIMER:
|
||||
case RTWQ_CALLBACK_QUEUE_MULTITHREADED:
|
||||
case RTWQ_CALLBACK_QUEUE_LONG_FUNCTION:
|
||||
return &system_queues[queue_id - 1];
|
||||
default:
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static void CALLBACK standard_queue_cleanup_callback(void *object_data, void *group_data)
|
||||
{
|
||||
}
|
||||
|
||||
static struct work_item * alloc_work_item(struct queue *queue, IRtwqAsyncResult *result)
|
||||
{
|
||||
struct work_item *item;
|
||||
|
||||
item = heap_alloc_zero(sizeof(*item));
|
||||
item->result = result;
|
||||
IRtwqAsyncResult_AddRef(item->result);
|
||||
item->refcount = 1;
|
||||
item->queue = queue;
|
||||
list_init(&item->entry);
|
||||
|
||||
return item;
|
||||
}
|
||||
|
||||
static void release_work_item(struct work_item *item)
|
||||
{
|
||||
if (InterlockedDecrement(&item->refcount) == 0)
|
||||
|
@ -94,6 +179,12 @@ static void release_work_item(struct work_item *item)
|
|||
}
|
||||
}
|
||||
|
||||
static struct work_item *grab_work_item(struct work_item *item)
|
||||
{
|
||||
InterlockedIncrement(&item->refcount);
|
||||
return item;
|
||||
}
|
||||
|
||||
static void init_work_queue(RTWQ_WORKQUEUE_TYPE queue_type, struct queue *queue)
|
||||
{
|
||||
TP_CALLBACK_ENVIRON_V3 env;
|
||||
|
@ -125,6 +216,255 @@ static void init_work_queue(RTWQ_WORKQUEUE_TYPE queue_type, struct queue *queue)
|
|||
FIXME("RTWQ_WINDOW_WORKQUEUE is not supported.\n");
|
||||
}
|
||||
|
||||
static HRESULT grab_queue(DWORD queue_id, struct queue **ret)
|
||||
{
|
||||
struct queue *queue = get_system_queue(queue_id);
|
||||
RTWQ_WORKQUEUE_TYPE queue_type;
|
||||
struct queue_handle *entry;
|
||||
|
||||
*ret = NULL;
|
||||
|
||||
if (!system_queues[SYS_QUEUE_STANDARD].pool)
|
||||
return RTWQ_E_SHUTDOWN;
|
||||
|
||||
if (queue && queue->pool)
|
||||
{
|
||||
*ret = queue;
|
||||
return S_OK;
|
||||
}
|
||||
else if (queue)
|
||||
{
|
||||
EnterCriticalSection(&queues_section);
|
||||
switch (queue_id)
|
||||
{
|
||||
case RTWQ_CALLBACK_QUEUE_IO:
|
||||
case RTWQ_CALLBACK_QUEUE_MULTITHREADED:
|
||||
case RTWQ_CALLBACK_QUEUE_LONG_FUNCTION:
|
||||
queue_type = RTWQ_MULTITHREADED_WORKQUEUE;
|
||||
break;
|
||||
default:
|
||||
queue_type = RTWQ_STANDARD_WORKQUEUE;
|
||||
}
|
||||
init_work_queue(queue_type, queue);
|
||||
LeaveCriticalSection(&queues_section);
|
||||
*ret = queue;
|
||||
return S_OK;
|
||||
}
|
||||
|
||||
/* Handles user queues. */
|
||||
if ((entry = get_queue_obj(queue_id)))
|
||||
*ret = entry->obj;
|
||||
|
||||
return *ret ? S_OK : RTWQ_E_INVALID_WORKQUEUE;
|
||||
}
|
||||
|
||||
static HRESULT lock_user_queue(DWORD queue)
|
||||
{
|
||||
HRESULT hr = RTWQ_E_INVALID_WORKQUEUE;
|
||||
struct queue_handle *entry;
|
||||
|
||||
if (!(queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK))
|
||||
return S_OK;
|
||||
|
||||
EnterCriticalSection(&queues_section);
|
||||
entry = get_queue_obj(queue);
|
||||
if (entry && entry->refcount)
|
||||
{
|
||||
entry->refcount++;
|
||||
hr = S_OK;
|
||||
}
|
||||
LeaveCriticalSection(&queues_section);
|
||||
return hr;
|
||||
}
|
||||
|
||||
static void shutdown_queue(struct queue *queue)
|
||||
{
|
||||
struct work_item *item, *item2;
|
||||
|
||||
if (!queue->pool)
|
||||
return;
|
||||
|
||||
CloseThreadpoolCleanupGroupMembers(queue->envs[0].CleanupGroup, TRUE, NULL);
|
||||
CloseThreadpool(queue->pool);
|
||||
queue->pool = NULL;
|
||||
|
||||
EnterCriticalSection(&queue->cs);
|
||||
LIST_FOR_EACH_ENTRY_SAFE(item, item2, &queue->pending_items, struct work_item, entry)
|
||||
{
|
||||
list_remove(&item->entry);
|
||||
release_work_item(item);
|
||||
}
|
||||
LeaveCriticalSection(&queue->cs);
|
||||
|
||||
DeleteCriticalSection(&queue->cs);
|
||||
}
|
||||
|
||||
static HRESULT unlock_user_queue(DWORD queue)
|
||||
{
|
||||
HRESULT hr = RTWQ_E_INVALID_WORKQUEUE;
|
||||
struct queue_handle *entry;
|
||||
|
||||
if (!(queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK))
|
||||
return S_OK;
|
||||
|
||||
EnterCriticalSection(&queues_section);
|
||||
entry = get_queue_obj(queue);
|
||||
if (entry && entry->refcount)
|
||||
{
|
||||
if (--entry->refcount == 0)
|
||||
{
|
||||
shutdown_queue((struct queue *)entry->obj);
|
||||
heap_free(entry->obj);
|
||||
entry->obj = next_free_user_queue;
|
||||
next_free_user_queue = entry;
|
||||
}
|
||||
hr = S_OK;
|
||||
}
|
||||
LeaveCriticalSection(&queues_section);
|
||||
return hr;
|
||||
}
|
||||
|
||||
static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void *context, TP_WORK *work)
|
||||
{
|
||||
struct work_item *item = context;
|
||||
RTWQASYNCRESULT *result = (RTWQASYNCRESULT *)item->result;
|
||||
|
||||
TRACE("result object %p.\n", result);
|
||||
|
||||
IRtwqAsyncCallback_Invoke(result->pCallback, item->result);
|
||||
|
||||
release_work_item(item);
|
||||
}
|
||||
|
||||
static HRESULT queue_submit_item(struct queue *queue, LONG priority, IRtwqAsyncResult *result)
|
||||
{
|
||||
TP_CALLBACK_PRIORITY callback_priority;
|
||||
struct work_item *item;
|
||||
TP_WORK *work_object;
|
||||
|
||||
if (!(item = alloc_work_item(queue, result)))
|
||||
return E_OUTOFMEMORY;
|
||||
|
||||
if (priority == 0)
|
||||
callback_priority = TP_CALLBACK_PRIORITY_NORMAL;
|
||||
else if (priority < 0)
|
||||
callback_priority = TP_CALLBACK_PRIORITY_LOW;
|
||||
else
|
||||
callback_priority = TP_CALLBACK_PRIORITY_HIGH;
|
||||
work_object = CreateThreadpoolWork(standard_queue_worker, item,
|
||||
(TP_CALLBACK_ENVIRON *)&queue->envs[callback_priority]);
|
||||
SubmitThreadpoolWork(work_object);
|
||||
|
||||
TRACE("dispatched %p.\n", result);
|
||||
|
||||
return S_OK;
|
||||
}
|
||||
|
||||
static HRESULT queue_put_work_item(DWORD queue_id, LONG priority, IRtwqAsyncResult *result)
|
||||
{
|
||||
struct queue *queue;
|
||||
HRESULT hr;
|
||||
|
||||
if (FAILED(hr = grab_queue(queue_id, &queue)))
|
||||
return hr;
|
||||
|
||||
return queue_submit_item(queue, priority, result);
|
||||
}
|
||||
|
||||
static HRESULT invoke_async_callback(IRtwqAsyncResult *result)
|
||||
{
|
||||
RTWQASYNCRESULT *result_data = (RTWQASYNCRESULT *)result;
|
||||
DWORD queue = RTWQ_CALLBACK_QUEUE_STANDARD, flags;
|
||||
HRESULT hr;
|
||||
|
||||
if (FAILED(IRtwqAsyncCallback_GetParameters(result_data->pCallback, &flags, &queue)))
|
||||
queue = RTWQ_CALLBACK_QUEUE_STANDARD;
|
||||
|
||||
if (FAILED(lock_user_queue(queue)))
|
||||
queue = RTWQ_CALLBACK_QUEUE_STANDARD;
|
||||
|
||||
hr = queue_put_work_item(queue, 0, result);
|
||||
|
||||
unlock_user_queue(queue);
|
||||
|
||||
return hr;
|
||||
}
|
||||
|
||||
static void queue_release_pending_item(struct work_item *item)
|
||||
{
|
||||
EnterCriticalSection(&item->queue->cs);
|
||||
if (item->key)
|
||||
{
|
||||
list_remove(&item->entry);
|
||||
item->key = 0;
|
||||
release_work_item(item);
|
||||
}
|
||||
LeaveCriticalSection(&item->queue->cs);
|
||||
}
|
||||
|
||||
static void CALLBACK waiting_item_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_WAIT *wait,
|
||||
TP_WAIT_RESULT wait_result)
|
||||
{
|
||||
struct work_item *item = context;
|
||||
|
||||
TRACE("result object %p.\n", item->result);
|
||||
|
||||
invoke_async_callback(item->result);
|
||||
|
||||
release_work_item(item);
|
||||
}
|
||||
|
||||
static void CALLBACK waiting_item_cancelable_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_WAIT *wait,
|
||||
TP_WAIT_RESULT wait_result)
|
||||
{
|
||||
struct work_item *item = context;
|
||||
|
||||
TRACE("result object %p.\n", item->result);
|
||||
|
||||
queue_release_pending_item(item);
|
||||
|
||||
invoke_async_callback(item->result);
|
||||
|
||||
release_work_item(item);
|
||||
}
|
||||
|
||||
static void queue_mark_item_pending(DWORD mask, struct work_item *item, RTWQWORKITEM_KEY *key)
|
||||
{
|
||||
*key = generate_item_key(mask);
|
||||
item->key = *key;
|
||||
|
||||
EnterCriticalSection(&item->queue->cs);
|
||||
list_add_tail(&item->queue->pending_items, &item->entry);
|
||||
grab_work_item(item);
|
||||
LeaveCriticalSection(&item->queue->cs);
|
||||
}
|
||||
|
||||
static HRESULT queue_submit_wait(struct queue *queue, HANDLE event, LONG priority, IRtwqAsyncResult *result,
|
||||
RTWQWORKITEM_KEY *key)
|
||||
{
|
||||
PTP_WAIT_CALLBACK callback;
|
||||
struct work_item *item;
|
||||
|
||||
if (!(item = alloc_work_item(queue, result)))
|
||||
return E_OUTOFMEMORY;
|
||||
|
||||
if (key)
|
||||
{
|
||||
queue_mark_item_pending(WAIT_ITEM_KEY_MASK, item, key);
|
||||
callback = waiting_item_cancelable_callback;
|
||||
}
|
||||
else
|
||||
callback = waiting_item_callback;
|
||||
|
||||
item->u.wait_object = CreateThreadpoolWait(callback, item,
|
||||
(TP_CALLBACK_ENVIRON *)&queue->envs[TP_CALLBACK_PRIORITY_NORMAL]);
|
||||
SetThreadpoolWait(item->u.wait_object, event, NULL);
|
||||
|
||||
TRACE("dispatched %p.\n", result);
|
||||
|
||||
return S_OK;
|
||||
}
|
||||
|
||||
struct async_result
|
||||
{
|
||||
RTWQASYNCRESULT result;
|
||||
|
@ -342,28 +682,6 @@ HRESULT WINAPI RtwqStartup(void)
|
|||
return S_OK;
|
||||
}
|
||||
|
||||
static void shutdown_queue(struct queue *queue)
|
||||
{
|
||||
struct work_item *item, *item2;
|
||||
|
||||
if (!queue->pool)
|
||||
return;
|
||||
|
||||
CloseThreadpoolCleanupGroupMembers(queue->envs[0].CleanupGroup, TRUE, NULL);
|
||||
CloseThreadpool(queue->pool);
|
||||
queue->pool = NULL;
|
||||
|
||||
EnterCriticalSection(&queue->cs);
|
||||
LIST_FOR_EACH_ENTRY_SAFE(item, item2, &queue->pending_items, struct work_item, entry)
|
||||
{
|
||||
list_remove(&item->entry);
|
||||
release_work_item(item);
|
||||
}
|
||||
LeaveCriticalSection(&queue->cs);
|
||||
|
||||
DeleteCriticalSection(&queue->cs);
|
||||
}
|
||||
|
||||
static void shutdown_system_queues(void)
|
||||
{
|
||||
unsigned int i;
|
||||
|
@ -390,3 +708,32 @@ HRESULT WINAPI RtwqShutdown(void)
|
|||
|
||||
return S_OK;
|
||||
}
|
||||
|
||||
HRESULT WINAPI RtwqPutWaitingWorkItem(HANDLE event, LONG priority, IRtwqAsyncResult *result, RTWQWORKITEM_KEY *key)
|
||||
{
|
||||
struct queue *queue;
|
||||
HRESULT hr;
|
||||
|
||||
TRACE("%p, %d, %p, %p.\n", event, priority, result, key);
|
||||
|
||||
if (FAILED(hr = grab_queue(RTWQ_CALLBACK_QUEUE_TIMER, &queue)))
|
||||
return hr;
|
||||
|
||||
hr = queue_submit_wait(queue, event, priority, result, key);
|
||||
|
||||
return hr;
|
||||
}
|
||||
|
||||
HRESULT WINAPI RtwqLockWorkQueue(DWORD queue)
|
||||
{
|
||||
TRACE("%#x.\n", queue);
|
||||
|
||||
return lock_user_queue(queue);
|
||||
}
|
||||
|
||||
HRESULT WINAPI RtwqUnlockWorkQueue(DWORD queue)
|
||||
{
|
||||
TRACE("%#x.\n", queue);
|
||||
|
||||
return unlock_user_queue(queue);
|
||||
}
|
||||
|
|
|
@ -17,9 +17,9 @@
|
|||
@ stub RtwqJoinWorkQueue
|
||||
@ stdcall RtwqLockPlatform()
|
||||
@ stub RtwqLockSharedWorkQueue
|
||||
@ stub RtwqLockWorkQueue
|
||||
@ stdcall RtwqLockWorkQueue(long)
|
||||
@ stub RtwqPutMultipleWaitingWorkItem
|
||||
@ stub RtwqPutWaitingWorkItem
|
||||
@ stdcall RtwqPutWaitingWorkItem(long long ptr ptr)
|
||||
@ stub RtwqPutWorkItem
|
||||
@ stub RtwqRegisterPlatformEvents
|
||||
@ stub RtwqRegisterPlatformWithMMCSS
|
||||
|
@ -32,6 +32,6 @@
|
|||
@ stdcall RtwqStartup()
|
||||
@ stub RtwqUnjoinWorkQueue
|
||||
@ stdcall RtwqUnlockPlatform()
|
||||
@ stub RtwqUnlockWorkQueue
|
||||
@ stdcall RtwqUnlockWorkQueue(long)
|
||||
@ stub RtwqUnregisterPlatformEvents
|
||||
@ stub RtwqUnregisterPlatformFromMMCSS
|
||||
|
|
|
@ -52,6 +52,15 @@ interface IRtwqAsyncCallback : IUnknown
|
|||
HRESULT Invoke([in] IRtwqAsyncResult *result);
|
||||
}
|
||||
|
||||
cpp_quote("#define RTWQ_E_ERROR(x) ((HRESULT)(0xc00d0000L+x))")
|
||||
cpp_quote("#define RTWQ_E_BUFFERTOOSMALL RTWQ_E_ERROR(14001)")
|
||||
cpp_quote("#define RTWQ_E_NOT_INITIALIZED RTWQ_E_ERROR(14006)")
|
||||
cpp_quote("#define RTWQ_E_UNEXPECTED RTWQ_E_ERROR(14011)")
|
||||
cpp_quote("#define RTWQ_E_NOT_FOUND RTWQ_E_ERROR(14037)")
|
||||
cpp_quote("#define RTWQ_E_OPERATION_CANCELLED RTWQ_E_ERROR(14061)")
|
||||
cpp_quote("#define RTWQ_E_INVALID_WORKQUEUE RTWQ_E_ERROR(14079)")
|
||||
cpp_quote("#define RTWQ_E_SHUTDOWN RTWQ_E_ERROR(16005)")
|
||||
|
||||
cpp_quote("#ifdef __WINESRC__")
|
||||
cpp_quote("typedef struct tagRTWQASYNCRESULT")
|
||||
cpp_quote("{")
|
||||
|
@ -68,6 +77,9 @@ cpp_quote("} RTWQASYNCRESULT;")
|
|||
|
||||
cpp_quote("HRESULT WINAPI RtwqCreateAsyncResult(IUnknown *object, IRtwqAsyncCallback *callback, IUnknown *state, IRtwqAsyncResult **result);")
|
||||
cpp_quote("HRESULT WINAPI RtwqLockPlatform(void);")
|
||||
cpp_quote("HRESULT WINAPI RtwqLockWorkQueue(DWORD queue);")
|
||||
cpp_quote("HRESULT WINAPI RtwqPutWaitingWorkItem(HANDLE event, LONG priority, IRtwqAsyncResult *result, RTWQWORKITEM_KEY *key);")
|
||||
cpp_quote("HRESULT WINAPI RtwqShutdown(void);")
|
||||
cpp_quote("HRESULT WINAPI RtwqStartup(void);")
|
||||
cpp_quote("HRESULT WINAPI RtwqUnlockPlatform(void);")
|
||||
cpp_quote("HRESULT WINAPI RtwqUnlockWorkQueue(DWORD queue);")
|
||||
|
|
Loading…
Reference in New Issue