Sweden-Number/dlls/rtworkq/queue.c

1465 lines
38 KiB
C
Raw Normal View History

/*
* Copyright 2019-2020 Nikolay Sivov for CodeWeavers
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
*/
#include <assert.h>
#define COBJMACROS
#define NONAMELESSUNION
#include "initguid.h"
#include "rtworkq.h"
#include "wine/debug.h"
#include "wine/heap.h"
#include "wine/list.h"
WINE_DEFAULT_DEBUG_CHANNEL(mfplat);
#define FIRST_USER_QUEUE_HANDLE 5
#define MAX_USER_QUEUE_HANDLES 124
#define WAIT_ITEM_KEY_MASK (0x82000000)
#define SCHEDULED_ITEM_KEY_MASK (0x80000000)
static LONG next_item_key;
static RTWQWORKITEM_KEY get_item_key(DWORD mask, DWORD key)
{
return ((RTWQWORKITEM_KEY)mask << 32) | key;
}
static RTWQWORKITEM_KEY generate_item_key(DWORD mask)
{
return get_item_key(mask, InterlockedIncrement(&next_item_key));
}
struct queue_handle
{
void *obj;
LONG refcount;
WORD generation;
};
static struct queue_handle user_queues[MAX_USER_QUEUE_HANDLES];
static struct queue_handle *next_free_user_queue;
static struct queue_handle *next_unused_user_queue = user_queues;
static WORD queue_generation;
static CRITICAL_SECTION queues_section;
static CRITICAL_SECTION_DEBUG queues_critsect_debug =
{
0, 0, &queues_section,
{ &queues_critsect_debug.ProcessLocksList, &queues_critsect_debug.ProcessLocksList },
0, 0, { (DWORD_PTR)(__FILE__ ": queues_section") }
};
static CRITICAL_SECTION queues_section = { &queues_critsect_debug, -1, 0, 0, 0, 0 };
static LONG platform_lock;
static struct queue_handle *get_queue_obj(DWORD handle)
{
unsigned int idx = HIWORD(handle) - FIRST_USER_QUEUE_HANDLE;
if (idx < MAX_USER_QUEUE_HANDLES && user_queues[idx].refcount)
{
if (LOWORD(handle) == user_queues[idx].generation)
return &user_queues[idx];
}
return NULL;
}
/* Should be kept in sync with corresponding MFASYNC_CALLBACK_ constants. */
enum rtwq_callback_queue_id
{
RTWQ_CALLBACK_QUEUE_UNDEFINED = 0x00000000,
RTWQ_CALLBACK_QUEUE_STANDARD = 0x00000001,
RTWQ_CALLBACK_QUEUE_RT = 0x00000002,
RTWQ_CALLBACK_QUEUE_IO = 0x00000003,
RTWQ_CALLBACK_QUEUE_TIMER = 0x00000004,
RTWQ_CALLBACK_QUEUE_MULTITHREADED = 0x00000005,
RTWQ_CALLBACK_QUEUE_LONG_FUNCTION = 0x00000007,
RTWQ_CALLBACK_QUEUE_PRIVATE_MASK = 0xffff0000,
RTWQ_CALLBACK_QUEUE_ALL = 0xffffffff,
};
/* Should be kept in sync with corresponding MFASYNC_ constants. */
enum rtwq_callback_flags
{
RTWQ_FAST_IO_PROCESSING_CALLBACK = 0x00000001,
RTWQ_SIGNAL_CALLBACK = 0x00000002,
RTWQ_BLOCKING_CALLBACK = 0x00000004,
RTWQ_REPLY_CALLBACK = 0x00000008,
RTWQ_LOCALIZE_REMOTE_CALLBACK = 0x00000010,
};
enum system_queue_index
{
SYS_QUEUE_STANDARD = 0,
SYS_QUEUE_RT,
SYS_QUEUE_IO,
SYS_QUEUE_TIMER,
SYS_QUEUE_MULTITHREADED,
SYS_QUEUE_DO_NOT_USE,
SYS_QUEUE_LONG_FUNCTION,
SYS_QUEUE_COUNT,
};
struct work_item
{
IUnknown IUnknown_iface;
LONG refcount;
struct list entry;
IRtwqAsyncResult *result;
IRtwqAsyncResult *reply_result;
struct queue *queue;
RTWQWORKITEM_KEY key;
LONG priority;
DWORD flags;
PTP_SIMPLE_CALLBACK finalization_callback;
union
{
TP_WAIT *wait_object;
TP_TIMER *timer_object;
} u;
};
static struct work_item *work_item_impl_from_IUnknown(IUnknown *iface)
{
return CONTAINING_RECORD(iface, struct work_item, IUnknown_iface);
}
static const TP_CALLBACK_PRIORITY priorities[] =
{
TP_CALLBACK_PRIORITY_HIGH,
TP_CALLBACK_PRIORITY_NORMAL,
TP_CALLBACK_PRIORITY_LOW,
};
struct queue;
struct queue_desc;
struct queue_ops
{
HRESULT (*init)(const struct queue_desc *desc, struct queue *queue);
BOOL (*shutdown)(struct queue *queue);
void (*submit)(struct queue *queue, struct work_item *item);
};
struct queue_desc
{
RTWQ_WORKQUEUE_TYPE queue_type;
const struct queue_ops *ops;
DWORD target_queue;
};
struct queue
{
IRtwqAsyncCallback IRtwqAsyncCallback_iface;
const struct queue_ops *ops;
TP_POOL *pool;
TP_CALLBACK_ENVIRON_V3 envs[ARRAY_SIZE(priorities)];
CRITICAL_SECTION cs;
struct list pending_items;
DWORD id;
/* Data used for serial queues only. */
PTP_SIMPLE_CALLBACK finalization_callback;
DWORD target_queue;
};
static void shutdown_queue(struct queue *queue);
static HRESULT lock_user_queue(DWORD queue)
{
HRESULT hr = RTWQ_E_INVALID_WORKQUEUE;
struct queue_handle *entry;
if (!(queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK))
return S_OK;
EnterCriticalSection(&queues_section);
entry = get_queue_obj(queue);
if (entry && entry->refcount)
{
entry->refcount++;
hr = S_OK;
}
LeaveCriticalSection(&queues_section);
return hr;
}
static HRESULT unlock_user_queue(DWORD queue)
{
HRESULT hr = RTWQ_E_INVALID_WORKQUEUE;
struct queue_handle *entry;
if (!(queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK))
return S_OK;
EnterCriticalSection(&queues_section);
entry = get_queue_obj(queue);
if (entry && entry->refcount)
{
if (--entry->refcount == 0)
{
shutdown_queue((struct queue *)entry->obj);
heap_free(entry->obj);
entry->obj = next_free_user_queue;
next_free_user_queue = entry;
}
hr = S_OK;
}
LeaveCriticalSection(&queues_section);
return hr;
}
static struct queue *queue_impl_from_IRtwqAsyncCallback(IRtwqAsyncCallback *iface)
{
return CONTAINING_RECORD(iface, struct queue, IRtwqAsyncCallback_iface);
}
static HRESULT WINAPI queue_serial_callback_QueryInterface(IRtwqAsyncCallback *iface, REFIID riid, void **obj)
{
if (IsEqualIID(riid, &IID_IRtwqAsyncCallback) ||
IsEqualIID(riid, &IID_IUnknown))
{
*obj = iface;
IRtwqAsyncCallback_AddRef(iface);
return S_OK;
}
*obj = NULL;
return E_NOINTERFACE;
}
static ULONG WINAPI queue_serial_callback_AddRef(IRtwqAsyncCallback *iface)
{
return 2;
}
static ULONG WINAPI queue_serial_callback_Release(IRtwqAsyncCallback *iface)
{
return 1;
}
static HRESULT WINAPI queue_serial_callback_GetParameters(IRtwqAsyncCallback *iface, DWORD *flags, DWORD *queue_id)
{
struct queue *queue = queue_impl_from_IRtwqAsyncCallback(iface);
*flags = 0;
*queue_id = queue->id;
return S_OK;
}
static HRESULT WINAPI queue_serial_callback_Invoke(IRtwqAsyncCallback *iface, IRtwqAsyncResult *result)
{
/* Reply callback won't be called in a regular way, pending items and chained queues will make it
unnecessary complicated to reach actual work queue that's able to execute this item. Instead
serial queues are cleaned up right away on submit(). */
return S_OK;
}
static const IRtwqAsyncCallbackVtbl queue_serial_callback_vtbl =
{
queue_serial_callback_QueryInterface,
queue_serial_callback_AddRef,
queue_serial_callback_Release,
queue_serial_callback_GetParameters,
queue_serial_callback_Invoke,
};
static struct queue system_queues[SYS_QUEUE_COUNT];
static struct queue *get_system_queue(DWORD queue_id)
{
switch (queue_id)
{
case RTWQ_CALLBACK_QUEUE_STANDARD:
case RTWQ_CALLBACK_QUEUE_RT:
case RTWQ_CALLBACK_QUEUE_IO:
case RTWQ_CALLBACK_QUEUE_TIMER:
case RTWQ_CALLBACK_QUEUE_MULTITHREADED:
case RTWQ_CALLBACK_QUEUE_LONG_FUNCTION:
return &system_queues[queue_id - 1];
default:
return NULL;
}
}
static HRESULT grab_queue(DWORD queue_id, struct queue **ret);
static void CALLBACK standard_queue_cleanup_callback(void *object_data, void *group_data)
{
}
static HRESULT pool_queue_init(const struct queue_desc *desc, struct queue *queue)
{
TP_CALLBACK_ENVIRON_V3 env;
unsigned int max_thread, i;
queue->pool = CreateThreadpool(NULL);
memset(&env, 0, sizeof(env));
env.Version = 3;
env.Size = sizeof(env);
env.Pool = queue->pool;
env.CleanupGroup = CreateThreadpoolCleanupGroup();
env.CleanupGroupCancelCallback = standard_queue_cleanup_callback;
env.CallbackPriority = TP_CALLBACK_PRIORITY_NORMAL;
for (i = 0; i < ARRAY_SIZE(queue->envs); ++i)
{
queue->envs[i] = env;
queue->envs[i].CallbackPriority = priorities[i];
}
list_init(&queue->pending_items);
InitializeCriticalSection(&queue->cs);
max_thread = (desc->queue_type == RTWQ_STANDARD_WORKQUEUE || desc->queue_type == RTWQ_WINDOW_WORKQUEUE) ? 1 : 4;
SetThreadpoolThreadMinimum(queue->pool, 1);
SetThreadpoolThreadMaximum(queue->pool, max_thread);
if (desc->queue_type == RTWQ_WINDOW_WORKQUEUE)
FIXME("RTWQ_WINDOW_WORKQUEUE is not supported.\n");
return S_OK;
}
static BOOL pool_queue_shutdown(struct queue *queue)
{
if (!queue->pool)
return FALSE;
CloseThreadpoolCleanupGroupMembers(queue->envs[0].CleanupGroup, TRUE, NULL);
CloseThreadpool(queue->pool);
queue->pool = NULL;
return TRUE;
}
static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void *context, TP_WORK *work)
{
struct work_item *item = context;
RTWQASYNCRESULT *result = (RTWQASYNCRESULT *)item->result;
TRACE("result object %p.\n", result);
/* Submitting from serial queue in reply mode, use different result object acting as receipt token.
It's submitted to user callback still, but when invoked, special serial queue callback will be used
to ensure correct destination queue. */
IRtwqAsyncCallback_Invoke(result->pCallback, item->reply_result ? item->reply_result : item->result);
IUnknown_Release(&item->IUnknown_iface);
}
static void pool_queue_submit(struct queue *queue, struct work_item *item)
{
TP_CALLBACK_PRIORITY callback_priority;
TP_CALLBACK_ENVIRON_V3 env;
TP_WORK *work_object;
if (item->priority == 0)
callback_priority = TP_CALLBACK_PRIORITY_NORMAL;
else if (item->priority < 0)
callback_priority = TP_CALLBACK_PRIORITY_LOW;
else
callback_priority = TP_CALLBACK_PRIORITY_HIGH;
env = queue->envs[callback_priority];
env.FinalizationCallback = item->finalization_callback;
/* Worker pool callback will release one reference. Grab one more to keep object alive when
we need finalization callback. */
if (item->finalization_callback)
IUnknown_AddRef(&item->IUnknown_iface);
work_object = CreateThreadpoolWork(standard_queue_worker, item, (TP_CALLBACK_ENVIRON *)&env);
SubmitThreadpoolWork(work_object);
TRACE("dispatched %p.\n", item->result);
}
static const struct queue_ops pool_queue_ops =
{
pool_queue_init,
pool_queue_shutdown,
pool_queue_submit,
};
static struct work_item * serial_queue_get_next(struct queue *queue, struct work_item *item)
{
struct work_item *next_item = NULL;
list_remove(&item->entry);
if (!list_empty(&item->queue->pending_items))
next_item = LIST_ENTRY(list_head(&item->queue->pending_items), struct work_item, entry);
return next_item;
}
static void CALLBACK serial_queue_finalization_callback(PTP_CALLBACK_INSTANCE instance, void *user_data)
{
struct work_item *item = (struct work_item *)user_data, *next_item;
struct queue *target_queue, *queue = item->queue;
HRESULT hr;
EnterCriticalSection(&queue->cs);
if ((next_item = serial_queue_get_next(queue, item)))
{
if (SUCCEEDED(hr = grab_queue(queue->target_queue, &target_queue)))
target_queue->ops->submit(target_queue, next_item);
else
WARN("Failed to grab queue for id %#x, hr %#x.\n", queue->target_queue, hr);
}
LeaveCriticalSection(&queue->cs);
IUnknown_Release(&item->IUnknown_iface);
}
static HRESULT serial_queue_init(const struct queue_desc *desc, struct queue *queue)
{
queue->IRtwqAsyncCallback_iface.lpVtbl = &queue_serial_callback_vtbl;
queue->target_queue = desc->target_queue;
lock_user_queue(queue->target_queue);
queue->finalization_callback = serial_queue_finalization_callback;
return S_OK;
}
static BOOL serial_queue_shutdown(struct queue *queue)
{
unlock_user_queue(queue->target_queue);
return TRUE;
}
static struct work_item * serial_queue_is_ack_token(struct queue *queue, struct work_item *item)
{
RTWQASYNCRESULT *async_result = (RTWQASYNCRESULT *)item->result;
struct work_item *head;
if (list_empty(&queue->pending_items))
return NULL;
head = LIST_ENTRY(list_head(&queue->pending_items), struct work_item, entry);
if (head->reply_result == item->result && async_result->pCallback == &queue->IRtwqAsyncCallback_iface)
return head;
return NULL;
}
static void serial_queue_submit(struct queue *queue, struct work_item *item)
{
struct work_item *head, *next_item = NULL;
struct queue *target_queue;
HRESULT hr;
/* In reply mode queue will advance when 'reply_result' is invoked, in regular mode it will advance automatically,
via finalization callback. */
if (item->flags & RTWQ_REPLY_CALLBACK)
{
if (FAILED(hr = RtwqCreateAsyncResult(NULL, &queue->IRtwqAsyncCallback_iface, NULL, &item->reply_result)))
WARN("Failed to create reply object, hr %#x.\n", hr);
}
else
item->finalization_callback = queue->finalization_callback;
/* Serial queues could be chained together, detach from current queue before transitioning item to this one.
Items are not detached when submitted to pool queues, because pool queues won't forward them further. */
EnterCriticalSection(&item->queue->cs);
list_remove(&item->entry);
LeaveCriticalSection(&item->queue->cs);
EnterCriticalSection(&queue->cs);
item->queue = queue;
if ((head = serial_queue_is_ack_token(queue, item)))
{
/* Ack receipt token - pop waiting item, advance. */
next_item = serial_queue_get_next(queue, head);
IUnknown_Release(&head->IUnknown_iface);
}
else
{
if (list_empty(&queue->pending_items))
next_item = item;
list_add_tail(&queue->pending_items, &item->entry);
IUnknown_AddRef(&item->IUnknown_iface);
}
if (next_item)
{
if (SUCCEEDED(hr = grab_queue(queue->target_queue, &target_queue)))
target_queue->ops->submit(target_queue, next_item);
else
WARN("Failed to grab queue for id %#x, hr %#x.\n", queue->target_queue, hr);
}
LeaveCriticalSection(&queue->cs);
}
static const struct queue_ops serial_queue_ops =
{
serial_queue_init,
serial_queue_shutdown,
serial_queue_submit,
};
static HRESULT WINAPI work_item_QueryInterface(IUnknown *iface, REFIID riid, void **obj)
{
if (IsEqualIID(riid, &IID_IUnknown))
{
*obj = iface;
IUnknown_AddRef(iface);
return S_OK;
}
*obj = NULL;
return E_NOINTERFACE;
}
static ULONG WINAPI work_item_AddRef(IUnknown *iface)
{
struct work_item *item = work_item_impl_from_IUnknown(iface);
return InterlockedIncrement(&item->refcount);
}
static ULONG WINAPI work_item_Release(IUnknown *iface)
{
struct work_item *item = work_item_impl_from_IUnknown(iface);
ULONG refcount = InterlockedDecrement(&item->refcount);
if (!refcount)
{
if (item->reply_result)
IRtwqAsyncResult_Release(item->reply_result);
IRtwqAsyncResult_Release(item->result);
heap_free(item);
}
return refcount;
}
static const IUnknownVtbl work_item_vtbl =
{
work_item_QueryInterface,
work_item_AddRef,
work_item_Release,
};
static struct work_item * alloc_work_item(struct queue *queue, LONG priority, IRtwqAsyncResult *result)
{
RTWQASYNCRESULT *async_result = (RTWQASYNCRESULT *)result;
DWORD flags = 0, queue_id = 0;
struct work_item *item;
item = heap_alloc_zero(sizeof(*item));
item->IUnknown_iface.lpVtbl = &work_item_vtbl;
item->result = result;
IRtwqAsyncResult_AddRef(item->result);
item->refcount = 1;
item->queue = queue;
list_init(&item->entry);
item->priority = priority;
if (SUCCEEDED(IRtwqAsyncCallback_GetParameters(async_result->pCallback, &flags, &queue_id)))
item->flags = flags;
return item;
}
static void init_work_queue(const struct queue_desc *desc, struct queue *queue)
{
assert(desc->ops != NULL);
queue->ops = desc->ops;
if (SUCCEEDED(queue->ops->init(desc, queue)))
{
list_init(&queue->pending_items);
InitializeCriticalSection(&queue->cs);
}
}
static HRESULT grab_queue(DWORD queue_id, struct queue **ret)
{
struct queue *queue = get_system_queue(queue_id);
RTWQ_WORKQUEUE_TYPE queue_type;
struct queue_handle *entry;
*ret = NULL;
if (!system_queues[SYS_QUEUE_STANDARD].pool)
return RTWQ_E_SHUTDOWN;
if (queue && queue->pool)
{
*ret = queue;
return S_OK;
}
else if (queue)
{
struct queue_desc desc;
EnterCriticalSection(&queues_section);
switch (queue_id)
{
case RTWQ_CALLBACK_QUEUE_IO:
case RTWQ_CALLBACK_QUEUE_MULTITHREADED:
case RTWQ_CALLBACK_QUEUE_LONG_FUNCTION:
queue_type = RTWQ_MULTITHREADED_WORKQUEUE;
break;
default:
queue_type = RTWQ_STANDARD_WORKQUEUE;
}
desc.queue_type = queue_type;
desc.ops = &pool_queue_ops;
desc.target_queue = 0;
init_work_queue(&desc, queue);
LeaveCriticalSection(&queues_section);
*ret = queue;
return S_OK;
}
/* Handles user queues. */
if ((entry = get_queue_obj(queue_id)))
*ret = entry->obj;
return *ret ? S_OK : RTWQ_E_INVALID_WORKQUEUE;
}
static void shutdown_queue(struct queue *queue)
{
struct work_item *item, *item2;
if (!queue->ops || !queue->ops->shutdown(queue))
return;
EnterCriticalSection(&queue->cs);
LIST_FOR_EACH_ENTRY_SAFE(item, item2, &queue->pending_items, struct work_item, entry)
{
list_remove(&item->entry);
IUnknown_Release(&item->IUnknown_iface);
}
LeaveCriticalSection(&queue->cs);
DeleteCriticalSection(&queue->cs);
memset(queue, 0, sizeof(*queue));
}
static HRESULT queue_submit_item(struct queue *queue, LONG priority, IRtwqAsyncResult *result)
{
struct work_item *item;
if (!(item = alloc_work_item(queue, priority, result)))
return E_OUTOFMEMORY;
queue->ops->submit(queue, item);
return S_OK;
}
static HRESULT queue_put_work_item(DWORD queue_id, LONG priority, IRtwqAsyncResult *result)
{
struct queue *queue;
HRESULT hr;
if (FAILED(hr = grab_queue(queue_id, &queue)))
return hr;
return queue_submit_item(queue, priority, result);
}
static HRESULT invoke_async_callback(IRtwqAsyncResult *result)
{
RTWQASYNCRESULT *result_data = (RTWQASYNCRESULT *)result;
DWORD queue = RTWQ_CALLBACK_QUEUE_STANDARD, flags;
HRESULT hr;
if (FAILED(IRtwqAsyncCallback_GetParameters(result_data->pCallback, &flags, &queue)))
queue = RTWQ_CALLBACK_QUEUE_STANDARD;
if (FAILED(lock_user_queue(queue)))
queue = RTWQ_CALLBACK_QUEUE_STANDARD;
hr = queue_put_work_item(queue, 0, result);
unlock_user_queue(queue);
return hr;
}
static void queue_release_pending_item(struct work_item *item)
{
EnterCriticalSection(&item->queue->cs);
if (item->key)
{
list_remove(&item->entry);
item->key = 0;
IUnknown_Release(&item->IUnknown_iface);
}
LeaveCriticalSection(&item->queue->cs);
}
static void CALLBACK waiting_item_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_WAIT *wait,
TP_WAIT_RESULT wait_result)
{
struct work_item *item = context;
TRACE("result object %p.\n", item->result);
invoke_async_callback(item->result);
IUnknown_Release(&item->IUnknown_iface);
}
static void CALLBACK waiting_item_cancelable_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_WAIT *wait,
TP_WAIT_RESULT wait_result)
{
struct work_item *item = context;
TRACE("result object %p.\n", item->result);
queue_release_pending_item(item);
invoke_async_callback(item->result);
IUnknown_Release(&item->IUnknown_iface);
}
static void CALLBACK scheduled_item_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_TIMER *timer)
{
struct work_item *item = context;
TRACE("result object %p.\n", item->result);
invoke_async_callback(item->result);
IUnknown_Release(&item->IUnknown_iface);
}
static void CALLBACK scheduled_item_cancelable_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_TIMER *timer)
{
struct work_item *item = context;
TRACE("result object %p.\n", item->result);
queue_release_pending_item(item);
invoke_async_callback(item->result);
IUnknown_Release(&item->IUnknown_iface);
}
static void CALLBACK periodic_item_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_TIMER *timer)
{
struct work_item *item = context;
IUnknown_AddRef(&item->IUnknown_iface);
invoke_async_callback(item->result);
IUnknown_Release(&item->IUnknown_iface);
}
static void queue_mark_item_pending(DWORD mask, struct work_item *item, RTWQWORKITEM_KEY *key)
{
*key = generate_item_key(mask);
item->key = *key;
EnterCriticalSection(&item->queue->cs);
list_add_tail(&item->queue->pending_items, &item->entry);
IUnknown_AddRef(&item->IUnknown_iface);
LeaveCriticalSection(&item->queue->cs);
}
static HRESULT queue_submit_wait(struct queue *queue, HANDLE event, LONG priority, IRtwqAsyncResult *result,
RTWQWORKITEM_KEY *key)
{
PTP_WAIT_CALLBACK callback;
struct work_item *item;
if (!(item = alloc_work_item(queue, priority, result)))
return E_OUTOFMEMORY;
if (key)
{
queue_mark_item_pending(WAIT_ITEM_KEY_MASK, item, key);
callback = waiting_item_cancelable_callback;
}
else
callback = waiting_item_callback;
item->u.wait_object = CreateThreadpoolWait(callback, item,
(TP_CALLBACK_ENVIRON *)&queue->envs[TP_CALLBACK_PRIORITY_NORMAL]);
SetThreadpoolWait(item->u.wait_object, event, NULL);
TRACE("dispatched %p.\n", result);
return S_OK;
}
static HRESULT queue_submit_timer(struct queue *queue, IRtwqAsyncResult *result, INT64 timeout, DWORD period,
RTWQWORKITEM_KEY *key)
{
PTP_TIMER_CALLBACK callback;
struct work_item *item;
FILETIME filetime;
LARGE_INTEGER t;
if (!(item = alloc_work_item(queue, 0, result)))
return E_OUTOFMEMORY;
if (key)
{
queue_mark_item_pending(SCHEDULED_ITEM_KEY_MASK, item, key);
}
if (period)
callback = periodic_item_callback;
else
callback = key ? scheduled_item_cancelable_callback : scheduled_item_callback;
t.QuadPart = timeout * 1000 * 10;
filetime.dwLowDateTime = t.u.LowPart;
filetime.dwHighDateTime = t.u.HighPart;
item->u.timer_object = CreateThreadpoolTimer(callback, item,
(TP_CALLBACK_ENVIRON *)&queue->envs[TP_CALLBACK_PRIORITY_NORMAL]);
SetThreadpoolTimer(item->u.timer_object, &filetime, period, 0);
TRACE("dispatched %p.\n", result);
return S_OK;
}
static HRESULT queue_cancel_item(struct queue *queue, RTWQWORKITEM_KEY key)
{
HRESULT hr = RTWQ_E_NOT_FOUND;
struct work_item *item;
EnterCriticalSection(&queue->cs);
LIST_FOR_EACH_ENTRY(item, &queue->pending_items, struct work_item, entry)
{
if (item->key == key)
{
key >>= 32;
if ((key & WAIT_ITEM_KEY_MASK) == WAIT_ITEM_KEY_MASK)
CloseThreadpoolWait(item->u.wait_object);
else if ((key & SCHEDULED_ITEM_KEY_MASK) == SCHEDULED_ITEM_KEY_MASK)
CloseThreadpoolTimer(item->u.timer_object);
else
WARN("Unknown item key mask %#x.\n", (DWORD)key);
queue_release_pending_item(item);
hr = S_OK;
break;
}
}
LeaveCriticalSection(&queue->cs);
return hr;
}
static HRESULT alloc_user_queue(const struct queue_desc *desc, DWORD *queue_id)
{
struct queue_handle *entry;
struct queue *queue;
unsigned int idx;
*queue_id = RTWQ_CALLBACK_QUEUE_UNDEFINED;
if (platform_lock <= 0)
return RTWQ_E_SHUTDOWN;
queue = heap_alloc_zero(sizeof(*queue));
if (!queue)
return E_OUTOFMEMORY;
init_work_queue(desc, queue);
EnterCriticalSection(&queues_section);
entry = next_free_user_queue;
if (entry)
next_free_user_queue = entry->obj;
else if (next_unused_user_queue < user_queues + MAX_USER_QUEUE_HANDLES)
entry = next_unused_user_queue++;
else
{
LeaveCriticalSection(&queues_section);
heap_free(queue);
WARN("Out of user queue handles.\n");
return E_OUTOFMEMORY;
}
entry->refcount = 1;
entry->obj = queue;
if (++queue_generation == 0xffff) queue_generation = 1;
entry->generation = queue_generation;
idx = entry - user_queues + FIRST_USER_QUEUE_HANDLE;
*queue_id = (idx << 16) | entry->generation;
LeaveCriticalSection(&queues_section);
return S_OK;
}
struct async_result
{
RTWQASYNCRESULT result;
LONG refcount;
IUnknown *object;
IUnknown *state;
};
static struct async_result *impl_from_IRtwqAsyncResult(IRtwqAsyncResult *iface)
{
return CONTAINING_RECORD(iface, struct async_result, result.AsyncResult);
}
static HRESULT WINAPI async_result_QueryInterface(IRtwqAsyncResult *iface, REFIID riid, void **obj)
{
TRACE("%p, %s, %p.\n", iface, debugstr_guid(riid), obj);
if (IsEqualIID(riid, &IID_IRtwqAsyncResult) ||
IsEqualIID(riid, &IID_IUnknown))
{
*obj = iface;
IRtwqAsyncResult_AddRef(iface);
return S_OK;
}
*obj = NULL;
WARN("Unsupported interface %s.\n", debugstr_guid(riid));
return E_NOINTERFACE;
}
static ULONG WINAPI async_result_AddRef(IRtwqAsyncResult *iface)
{
struct async_result *result = impl_from_IRtwqAsyncResult(iface);
ULONG refcount = InterlockedIncrement(&result->refcount);
TRACE("%p, %u.\n", iface, refcount);
return refcount;
}
static ULONG WINAPI async_result_Release(IRtwqAsyncResult *iface)
{
struct async_result *result = impl_from_IRtwqAsyncResult(iface);
ULONG refcount = InterlockedDecrement(&result->refcount);
TRACE("%p, %u.\n", iface, refcount);
if (!refcount)
{
if (result->result.pCallback)
IRtwqAsyncCallback_Release(result->result.pCallback);
if (result->object)
IUnknown_Release(result->object);
if (result->state)
IUnknown_Release(result->state);
if (result->result.hEvent)
CloseHandle(result->result.hEvent);
heap_free(result);
RtwqUnlockPlatform();
}
return refcount;
}
static HRESULT WINAPI async_result_GetState(IRtwqAsyncResult *iface, IUnknown **state)
{
struct async_result *result = impl_from_IRtwqAsyncResult(iface);
TRACE("%p, %p.\n", iface, state);
if (!result->state)
return E_POINTER;
*state = result->state;
IUnknown_AddRef(*state);
return S_OK;
}
static HRESULT WINAPI async_result_GetStatus(IRtwqAsyncResult *iface)
{
struct async_result *result = impl_from_IRtwqAsyncResult(iface);
TRACE("%p.\n", iface);
return result->result.hrStatusResult;
}
static HRESULT WINAPI async_result_SetStatus(IRtwqAsyncResult *iface, HRESULT status)
{
struct async_result *result = impl_from_IRtwqAsyncResult(iface);
TRACE("%p, %#x.\n", iface, status);
result->result.hrStatusResult = status;
return S_OK;
}
static HRESULT WINAPI async_result_GetObject(IRtwqAsyncResult *iface, IUnknown **object)
{
struct async_result *result = impl_from_IRtwqAsyncResult(iface);
TRACE("%p, %p.\n", iface, object);
if (!result->object)
return E_POINTER;
*object = result->object;
IUnknown_AddRef(*object);
return S_OK;
}
static IUnknown * WINAPI async_result_GetStateNoAddRef(IRtwqAsyncResult *iface)
{
struct async_result *result = impl_from_IRtwqAsyncResult(iface);
TRACE("%p.\n", iface);
return result->state;
}
static const IRtwqAsyncResultVtbl async_result_vtbl =
{
async_result_QueryInterface,
async_result_AddRef,
async_result_Release,
async_result_GetState,
async_result_GetStatus,
async_result_SetStatus,
async_result_GetObject,
async_result_GetStateNoAddRef,
};
static HRESULT create_async_result(IUnknown *object, IRtwqAsyncCallback *callback, IUnknown *state, IRtwqAsyncResult **out)
{
struct async_result *result;
if (!out)
return E_INVALIDARG;
result = heap_alloc_zero(sizeof(*result));
if (!result)
return E_OUTOFMEMORY;
RtwqLockPlatform();
result->result.AsyncResult.lpVtbl = &async_result_vtbl;
result->refcount = 1;
result->object = object;
if (result->object)
IUnknown_AddRef(result->object);
result->result.pCallback = callback;
if (result->result.pCallback)
IRtwqAsyncCallback_AddRef(result->result.pCallback);
result->state = state;
if (result->state)
IUnknown_AddRef(result->state);
*out = &result->result.AsyncResult;
TRACE("Created async result object %p.\n", *out);
return S_OK;
}
HRESULT WINAPI RtwqCreateAsyncResult(IUnknown *object, IRtwqAsyncCallback *callback, IUnknown *state,
IRtwqAsyncResult **out)
{
TRACE("%p, %p, %p, %p.\n", object, callback, state, out);
return create_async_result(object, callback, state, out);
}
HRESULT WINAPI RtwqLockPlatform(void)
{
InterlockedIncrement(&platform_lock);
return S_OK;
}
HRESULT WINAPI RtwqUnlockPlatform(void)
{
InterlockedDecrement(&platform_lock);
return S_OK;
}
static void init_system_queues(void)
{
struct queue_desc desc;
/* Always initialize standard queue, keep the rest lazy. */
EnterCriticalSection(&queues_section);
if (system_queues[SYS_QUEUE_STANDARD].pool)
{
LeaveCriticalSection(&queues_section);
return;
}
desc.queue_type = RTWQ_STANDARD_WORKQUEUE;
desc.ops = &pool_queue_ops;
desc.target_queue = 0;
init_work_queue(&desc, &system_queues[SYS_QUEUE_STANDARD]);
LeaveCriticalSection(&queues_section);
}
HRESULT WINAPI RtwqStartup(void)
{
if (InterlockedIncrement(&platform_lock) == 1)
{
init_system_queues();
}
return S_OK;
}
static void shutdown_system_queues(void)
{
unsigned int i;
EnterCriticalSection(&queues_section);
for (i = 0; i < ARRAY_SIZE(system_queues); ++i)
{
shutdown_queue(&system_queues[i]);
}
LeaveCriticalSection(&queues_section);
}
HRESULT WINAPI RtwqShutdown(void)
{
if (platform_lock <= 0)
return S_OK;
if (InterlockedExchangeAdd(&platform_lock, -1) == 1)
{
shutdown_system_queues();
}
return S_OK;
}
HRESULT WINAPI RtwqPutWaitingWorkItem(HANDLE event, LONG priority, IRtwqAsyncResult *result, RTWQWORKITEM_KEY *key)
{
struct queue *queue;
HRESULT hr;
TRACE("%p, %d, %p, %p.\n", event, priority, result, key);
if (FAILED(hr = grab_queue(RTWQ_CALLBACK_QUEUE_TIMER, &queue)))
return hr;
hr = queue_submit_wait(queue, event, priority, result, key);
return hr;
}
static HRESULT schedule_work_item(IRtwqAsyncResult *result, INT64 timeout, RTWQWORKITEM_KEY *key)
{
struct queue *queue;
HRESULT hr;
if (FAILED(hr = grab_queue(RTWQ_CALLBACK_QUEUE_TIMER, &queue)))
return hr;
TRACE("%p, %s, %p.\n", result, wine_dbgstr_longlong(timeout), key);
return queue_submit_timer(queue, result, timeout, 0, key);
}
HRESULT WINAPI RtwqScheduleWorkItem(IRtwqAsyncResult *result, INT64 timeout, RTWQWORKITEM_KEY *key)
{
TRACE("%p, %s, %p.\n", result, wine_dbgstr_longlong(timeout), key);
return schedule_work_item(result, timeout, key);
}
struct periodic_callback
{
IRtwqAsyncCallback IRtwqAsyncCallback_iface;
LONG refcount;
RTWQPERIODICCALLBACK callback;
};
static struct periodic_callback *impl_from_IRtwqAsyncCallback(IRtwqAsyncCallback *iface)
{
return CONTAINING_RECORD(iface, struct periodic_callback, IRtwqAsyncCallback_iface);
}
static HRESULT WINAPI periodic_callback_QueryInterface(IRtwqAsyncCallback *iface, REFIID riid, void **obj)
{
if (IsEqualIID(riid, &IID_IRtwqAsyncCallback) ||
IsEqualIID(riid, &IID_IUnknown))
{
*obj = iface;
IRtwqAsyncCallback_AddRef(iface);
return S_OK;
}
*obj = NULL;
return E_NOINTERFACE;
}
static ULONG WINAPI periodic_callback_AddRef(IRtwqAsyncCallback *iface)
{
struct periodic_callback *callback = impl_from_IRtwqAsyncCallback(iface);
ULONG refcount = InterlockedIncrement(&callback->refcount);
TRACE("%p, %u.\n", iface, refcount);
return refcount;
}
static ULONG WINAPI periodic_callback_Release(IRtwqAsyncCallback *iface)
{
struct periodic_callback *callback = impl_from_IRtwqAsyncCallback(iface);
ULONG refcount = InterlockedDecrement(&callback->refcount);
TRACE("%p, %u.\n", iface, refcount);
if (!refcount)
heap_free(callback);
return refcount;
}
static HRESULT WINAPI periodic_callback_GetParameters(IRtwqAsyncCallback *iface, DWORD *flags, DWORD *queue)
{
return E_NOTIMPL;
}
static HRESULT WINAPI periodic_callback_Invoke(IRtwqAsyncCallback *iface, IRtwqAsyncResult *result)
{
struct periodic_callback *callback = impl_from_IRtwqAsyncCallback(iface);
IUnknown *context = NULL;
if (FAILED(IRtwqAsyncResult_GetObject(result, &context)))
WARN("Expected object to be set for result object.\n");
callback->callback(context);
if (context)
IUnknown_Release(context);
return S_OK;
}
static const IRtwqAsyncCallbackVtbl periodic_callback_vtbl =
{
periodic_callback_QueryInterface,
periodic_callback_AddRef,
periodic_callback_Release,
periodic_callback_GetParameters,
periodic_callback_Invoke,
};
static HRESULT create_periodic_callback_obj(RTWQPERIODICCALLBACK callback, IRtwqAsyncCallback **out)
{
struct periodic_callback *object;
object = heap_alloc(sizeof(*object));
if (!object)
return E_OUTOFMEMORY;
object->IRtwqAsyncCallback_iface.lpVtbl = &periodic_callback_vtbl;
object->refcount = 1;
object->callback = callback;
*out = &object->IRtwqAsyncCallback_iface;
return S_OK;
}
HRESULT WINAPI RtwqAddPeriodicCallback(RTWQPERIODICCALLBACK callback, IUnknown *context, DWORD *key)
{
IRtwqAsyncCallback *periodic_callback;
RTWQWORKITEM_KEY workitem_key;
IRtwqAsyncResult *result;
struct queue *queue;
HRESULT hr;
TRACE("%p, %p, %p.\n", callback, context, key);
if (FAILED(hr = grab_queue(RTWQ_CALLBACK_QUEUE_TIMER, &queue)))
return hr;
if (FAILED(hr = create_periodic_callback_obj(callback, &periodic_callback)))
return hr;
hr = create_async_result(context, periodic_callback, NULL, &result);
IRtwqAsyncCallback_Release(periodic_callback);
if (FAILED(hr))
return hr;
/* Same period MFGetTimerPeriodicity() returns. */
hr = queue_submit_timer(queue, result, 0, 10, key ? &workitem_key : NULL);
IRtwqAsyncResult_Release(result);
if (key)
*key = workitem_key;
return S_OK;
}
HRESULT WINAPI RtwqRemovePeriodicCallback(DWORD key)
{
struct queue *queue;
HRESULT hr;
TRACE("%#x.\n", key);
if (FAILED(hr = grab_queue(RTWQ_CALLBACK_QUEUE_TIMER, &queue)))
return hr;
return queue_cancel_item(queue, get_item_key(SCHEDULED_ITEM_KEY_MASK, key));
}
HRESULT WINAPI RtwqCancelWorkItem(RTWQWORKITEM_KEY key)
{
struct queue *queue;
HRESULT hr;
TRACE("%s.\n", wine_dbgstr_longlong(key));
if (FAILED(hr = grab_queue(RTWQ_CALLBACK_QUEUE_TIMER, &queue)))
return hr;
return queue_cancel_item(queue, key);
}
HRESULT WINAPI RtwqInvokeCallback(IRtwqAsyncResult *result)
{
TRACE("%p.\n", result);
return invoke_async_callback(result);
}
HRESULT WINAPI RtwqPutWorkItem(DWORD queue, LONG priority, IRtwqAsyncResult *result)
{
TRACE("%d, %d, %p.\n", queue, priority, result);
return queue_put_work_item(queue, priority, result);
}
HRESULT WINAPI RtwqAllocateWorkQueue(RTWQ_WORKQUEUE_TYPE queue_type, DWORD *queue)
{
struct queue_desc desc;
TRACE("%d, %p.\n", queue_type, queue);
desc.queue_type = queue_type;
desc.ops = &pool_queue_ops;
desc.target_queue = 0;
return alloc_user_queue(&desc, queue);
}
HRESULT WINAPI RtwqLockWorkQueue(DWORD queue)
{
TRACE("%#x.\n", queue);
return lock_user_queue(queue);
}
HRESULT WINAPI RtwqUnlockWorkQueue(DWORD queue)
{
TRACE("%#x.\n", queue);
return unlock_user_queue(queue);
}
HRESULT WINAPI RtwqSetLongRunning(DWORD queue_id, BOOL enable)
{
struct queue *queue;
HRESULT hr;
int i;
TRACE("%#x, %d.\n", queue_id, enable);
lock_user_queue(queue_id);
if (SUCCEEDED(hr = grab_queue(queue_id, &queue)))
{
for (i = 0; i < ARRAY_SIZE(queue->envs); ++i)
queue->envs[i].u.s.LongFunction = !!enable;
}
unlock_user_queue(queue_id);
return hr;
}
HRESULT WINAPI RtwqLockSharedWorkQueue(const WCHAR *usageclass, LONG priority, DWORD *taskid, DWORD *queue)
{
FIXME("%s, %d, %p, %p.\n", debugstr_w(usageclass), priority, taskid, queue);
return RtwqAllocateWorkQueue(RTWQ_STANDARD_WORKQUEUE, queue);
}
HRESULT WINAPI RtwqSetDeadline(DWORD queue_id, LONGLONG deadline, HANDLE *request)
{
FIXME("%#x, %s, %p.\n", queue_id, wine_dbgstr_longlong(deadline), request);
return E_NOTIMPL;
}
HRESULT WINAPI RtwqSetDeadline2(DWORD queue_id, LONGLONG deadline, LONGLONG predeadline, HANDLE *request)
{
FIXME("%#x, %s, %s, %p.\n", queue_id, wine_dbgstr_longlong(deadline), wine_dbgstr_longlong(predeadline), request);
return E_NOTIMPL;
}
HRESULT WINAPI RtwqCancelDeadline(HANDLE request)
{
FIXME("%p.\n", request);
return E_NOTIMPL;
}
HRESULT WINAPI RtwqAllocateSerialWorkQueue(DWORD target_queue, DWORD *queue)
{
struct queue_desc desc;
TRACE("%#x, %p.\n", target_queue, queue);
desc.queue_type = RTWQ_STANDARD_WORKQUEUE;
desc.ops = &serial_queue_ops;
desc.target_queue = target_queue;
return alloc_user_queue(&desc, queue);
}