/*
 * Copyright 2019-2020 Nikolay Sivov for CodeWeavers
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
 */

#define COBJMACROS
#define NONAMELESSUNION

#include "initguid.h"
#include "rtworkq.h"

#include "wine/debug.h"
#include "wine/heap.h"
#include "wine/list.h"

WINE_DEFAULT_DEBUG_CHANNEL(mfplat);

#define FIRST_USER_QUEUE_HANDLE 5
#define MAX_USER_QUEUE_HANDLES 124

#define WAIT_ITEM_KEY_MASK      (0x82000000)
#define SCHEDULED_ITEM_KEY_MASK (0x80000000)

static LONG next_item_key;

static RTWQWORKITEM_KEY get_item_key(DWORD mask, DWORD key)
{
    return ((RTWQWORKITEM_KEY)mask << 32) | key;
}

static RTWQWORKITEM_KEY generate_item_key(DWORD mask)
{
    return get_item_key(mask, InterlockedIncrement(&next_item_key));
}

struct queue_handle
{
    void *obj;
    LONG refcount;
    WORD generation;
};

static struct queue_handle user_queues[MAX_USER_QUEUE_HANDLES];
static struct queue_handle *next_free_user_queue;
static struct queue_handle *next_unused_user_queue = user_queues;
static WORD queue_generation;

static CRITICAL_SECTION queues_section;
static CRITICAL_SECTION_DEBUG queues_critsect_debug =
{
    0, 0, &queues_section,
    { &queues_critsect_debug.ProcessLocksList, &queues_critsect_debug.ProcessLocksList },
      0, 0, { (DWORD_PTR)(__FILE__ ": queues_section") }
};
static CRITICAL_SECTION queues_section = { &queues_critsect_debug, -1, 0, 0, 0, 0 };

static LONG platform_lock;

static struct queue_handle *get_queue_obj(DWORD handle)
{
    unsigned int idx = HIWORD(handle) - FIRST_USER_QUEUE_HANDLE;

    if (idx < MAX_USER_QUEUE_HANDLES && user_queues[idx].refcount)
    {
        if (LOWORD(handle) == user_queues[idx].generation)
            return &user_queues[idx];
    }

    return NULL;
}
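
/* User queue handles pack a slot index and a generation counter: the high word
 * is the slot index biased by FIRST_USER_QUEUE_HANDLE, the low word is the
 * generation recorded when the slot was allocated, so a stale handle for a
 * reused slot is rejected by get_queue_obj() above.  For illustration, the
 * first queue allocated by alloc_user_queue() gets handle 0x00050001: slot 0
 * biased to 5 in the high word, generation 1 in the low word. */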
/* Should be kept in sync with corresponding MFASYNC_CALLBACK_ constants. */
enum rtwq_callback_queue_id
{
    RTWQ_CALLBACK_QUEUE_UNDEFINED     = 0x00000000,
    RTWQ_CALLBACK_QUEUE_STANDARD      = 0x00000001,
    RTWQ_CALLBACK_QUEUE_RT            = 0x00000002,
    RTWQ_CALLBACK_QUEUE_IO            = 0x00000003,
    RTWQ_CALLBACK_QUEUE_TIMER         = 0x00000004,
    RTWQ_CALLBACK_QUEUE_MULTITHREADED = 0x00000005,
    RTWQ_CALLBACK_QUEUE_LONG_FUNCTION = 0x00000007,
    RTWQ_CALLBACK_QUEUE_PRIVATE_MASK  = 0xffff0000,
    RTWQ_CALLBACK_QUEUE_ALL           = 0xffffffff,
};

enum system_queue_index
{
    SYS_QUEUE_STANDARD = 0,
    SYS_QUEUE_RT,
    SYS_QUEUE_IO,
    SYS_QUEUE_TIMER,
    SYS_QUEUE_MULTITHREADED,
    SYS_QUEUE_DO_NOT_USE,
    SYS_QUEUE_LONG_FUNCTION,
    SYS_QUEUE_COUNT,
};

struct work_item
{
    IUnknown IUnknown_iface;
    LONG refcount;
    struct list entry;
    IRtwqAsyncResult *result;
    struct queue *queue;
    RTWQWORKITEM_KEY key;
    union
    {
        TP_WAIT *wait_object;
        TP_TIMER *timer_object;
    } u;
};

static struct work_item *work_item_impl_from_IUnknown(IUnknown *iface)
{
    return CONTAINING_RECORD(iface, struct work_item, IUnknown_iface);
}

static const TP_CALLBACK_PRIORITY priorities[] =
{
    TP_CALLBACK_PRIORITY_HIGH,
    TP_CALLBACK_PRIORITY_NORMAL,
    TP_CALLBACK_PRIORITY_LOW,
};

struct queue
{
    TP_POOL *pool;
    TP_CALLBACK_ENVIRON_V3 envs[ARRAY_SIZE(priorities)];
    CRITICAL_SECTION cs;
    struct list pending_items;
};

static struct queue system_queues[SYS_QUEUE_COUNT];

static struct queue *get_system_queue(DWORD queue_id)
{
    switch (queue_id)
    {
        case RTWQ_CALLBACK_QUEUE_STANDARD:
        case RTWQ_CALLBACK_QUEUE_RT:
        case RTWQ_CALLBACK_QUEUE_IO:
        case RTWQ_CALLBACK_QUEUE_TIMER:
        case RTWQ_CALLBACK_QUEUE_MULTITHREADED:
        case RTWQ_CALLBACK_QUEUE_LONG_FUNCTION:
            return &system_queues[queue_id - 1];
        default:
            return NULL;
    }
}

static void CALLBACK standard_queue_cleanup_callback(void *object_data, void *group_data)
{
}

static HRESULT WINAPI work_item_QueryInterface(IUnknown *iface, REFIID riid, void **obj)
{
    if (IsEqualIID(riid, &IID_IUnknown))
    {
        *obj = iface;
        IUnknown_AddRef(iface);
        return S_OK;
    }

    *obj = NULL;
    return E_NOINTERFACE;
}

static ULONG WINAPI work_item_AddRef(IUnknown *iface)
{
    struct work_item *item = work_item_impl_from_IUnknown(iface);
    return InterlockedIncrement(&item->refcount);
}

static ULONG WINAPI work_item_Release(IUnknown *iface)
{
    struct work_item *item = work_item_impl_from_IUnknown(iface);
    ULONG refcount = InterlockedDecrement(&item->refcount);

    if (!refcount)
    {
        IRtwqAsyncResult_Release(item->result);
        heap_free(item);
    }

    return refcount;
}

static const IUnknownVtbl work_item_vtbl =
{
    work_item_QueryInterface,
    work_item_AddRef,
    work_item_Release,
};

static struct work_item * alloc_work_item(struct queue *queue, IRtwqAsyncResult *result)
{
    struct work_item *item;

    /* Callers treat a NULL return as E_OUTOFMEMORY. */
    if (!(item = heap_alloc_zero(sizeof(*item))))
        return NULL;

    item->IUnknown_iface.lpVtbl = &work_item_vtbl;
    item->result = result;
    IRtwqAsyncResult_AddRef(item->result);
    item->refcount = 1;
    item->queue = queue;
    list_init(&item->entry);

    return item;
}

static void init_work_queue(RTWQ_WORKQUEUE_TYPE queue_type, struct queue *queue)
{
    TP_CALLBACK_ENVIRON_V3 env;
    unsigned int max_thread, i;

    queue->pool = CreateThreadpool(NULL);

    memset(&env, 0, sizeof(env));
    env.Version = 3;
    env.Size = sizeof(env);
    env.Pool = queue->pool;
    env.CleanupGroup = CreateThreadpoolCleanupGroup();
    env.CleanupGroupCancelCallback = standard_queue_cleanup_callback;
    env.CallbackPriority = TP_CALLBACK_PRIORITY_NORMAL;
    for (i = 0; i < ARRAY_SIZE(queue->envs); ++i)
    {
        queue->envs[i] = env;
        queue->envs[i].CallbackPriority = priorities[i];
    }
    list_init(&queue->pending_items);
    InitializeCriticalSection(&queue->cs);

    max_thread = (queue_type == RTWQ_STANDARD_WORKQUEUE || queue_type == RTWQ_WINDOW_WORKQUEUE) ? 1 : 4;

    SetThreadpoolThreadMinimum(queue->pool, 1);
    SetThreadpoolThreadMaximum(queue->pool, max_thread);

    if (queue_type == RTWQ_WINDOW_WORKQUEUE)
        FIXME("RTWQ_WINDOW_WORKQUEUE is not supported.\n");
}
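
/* Each queue keeps one callback environment per entry of priorities[], in the
 * same order, so TP_CALLBACK_PRIORITY_* values can be used directly as indices
 * into envs[] when work is submitted.  All environments share the queue's pool
 * and a single cleanup group; shutdown_queue() tears everything down through
 * envs[0].CleanupGroup. */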
static HRESULT grab_queue(DWORD queue_id, struct queue **ret)
{
    struct queue *queue = get_system_queue(queue_id);
    RTWQ_WORKQUEUE_TYPE queue_type;
    struct queue_handle *entry;

    *ret = NULL;

    if (!system_queues[SYS_QUEUE_STANDARD].pool)
        return RTWQ_E_SHUTDOWN;

    if (queue && queue->pool)
    {
        *ret = queue;
        return S_OK;
    }
    else if (queue)
    {
        EnterCriticalSection(&queues_section);
        switch (queue_id)
        {
            case RTWQ_CALLBACK_QUEUE_IO:
            case RTWQ_CALLBACK_QUEUE_MULTITHREADED:
            case RTWQ_CALLBACK_QUEUE_LONG_FUNCTION:
                queue_type = RTWQ_MULTITHREADED_WORKQUEUE;
                break;
            default:
                queue_type = RTWQ_STANDARD_WORKQUEUE;
        }
        init_work_queue(queue_type, queue);
        LeaveCriticalSection(&queues_section);
        *ret = queue;
        return S_OK;
    }

    /* Handles user queues. */
    if ((entry = get_queue_obj(queue_id)))
        *ret = entry->obj;

    return *ret ? S_OK : RTWQ_E_INVALID_WORKQUEUE;
}

static HRESULT lock_user_queue(DWORD queue)
{
    HRESULT hr = RTWQ_E_INVALID_WORKQUEUE;
    struct queue_handle *entry;

    if (!(queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK))
        return S_OK;

    EnterCriticalSection(&queues_section);
    entry = get_queue_obj(queue);
    if (entry && entry->refcount)
    {
        entry->refcount++;
        hr = S_OK;
    }
    LeaveCriticalSection(&queues_section);
    return hr;
}

static void shutdown_queue(struct queue *queue)
{
    struct work_item *item, *item2;

    if (!queue->pool)
        return;

    CloseThreadpoolCleanupGroupMembers(queue->envs[0].CleanupGroup, TRUE, NULL);
    CloseThreadpool(queue->pool);
    queue->pool = NULL;

    EnterCriticalSection(&queue->cs);
    LIST_FOR_EACH_ENTRY_SAFE(item, item2, &queue->pending_items, struct work_item, entry)
    {
        list_remove(&item->entry);
        IUnknown_Release(&item->IUnknown_iface);
    }
    LeaveCriticalSection(&queue->cs);

    DeleteCriticalSection(&queue->cs);
}

static HRESULT unlock_user_queue(DWORD queue)
{
    HRESULT hr = RTWQ_E_INVALID_WORKQUEUE;
    struct queue_handle *entry;

    if (!(queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK))
        return S_OK;

    EnterCriticalSection(&queues_section);
    entry = get_queue_obj(queue);
    if (entry && entry->refcount)
    {
        if (--entry->refcount == 0)
        {
            shutdown_queue((struct queue *)entry->obj);
            heap_free(entry->obj);
            entry->obj = next_free_user_queue;
            next_free_user_queue = entry;
        }
        hr = S_OK;
    }
    LeaveCriticalSection(&queues_section);
    return hr;
}

static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void *context, TP_WORK *work)
{
    struct work_item *item = context;
    RTWQASYNCRESULT *result = (RTWQASYNCRESULT *)item->result;

    TRACE("result object %p.\n", result);

    IRtwqAsyncCallback_Invoke(result->pCallback, item->result);

    IUnknown_Release(&item->IUnknown_iface);
}

static HRESULT queue_submit_item(struct queue *queue, LONG priority, IRtwqAsyncResult *result)
{
    TP_CALLBACK_PRIORITY callback_priority;
    struct work_item *item;
    TP_WORK *work_object;

    if (!(item = alloc_work_item(queue, result)))
        return E_OUTOFMEMORY;

    if (priority == 0)
        callback_priority = TP_CALLBACK_PRIORITY_NORMAL;
    else if (priority < 0)
        callback_priority = TP_CALLBACK_PRIORITY_LOW;
    else
        callback_priority = TP_CALLBACK_PRIORITY_HIGH;
    work_object = CreateThreadpoolWork(standard_queue_worker, item,
            (TP_CALLBACK_ENVIRON *)&queue->envs[callback_priority]);
    SubmitThreadpoolWork(work_object);

    TRACE("dispatched %p.\n", result);

    return S_OK;
}

static HRESULT queue_put_work_item(DWORD queue_id, LONG priority, IRtwqAsyncResult *result)
{
    struct queue *queue;
    HRESULT hr;

    if (FAILED(hr = grab_queue(queue_id, &queue)))
        return hr;

    return queue_submit_item(queue, priority, result);
}
static HRESULT invoke_async_callback(IRtwqAsyncResult *result)
{
    RTWQASYNCRESULT *result_data = (RTWQASYNCRESULT *)result;
    DWORD queue = RTWQ_CALLBACK_QUEUE_STANDARD, flags;
    HRESULT hr;

    if (FAILED(IRtwqAsyncCallback_GetParameters(result_data->pCallback, &flags, &queue)))
        queue = RTWQ_CALLBACK_QUEUE_STANDARD;

    if (FAILED(lock_user_queue(queue)))
        queue = RTWQ_CALLBACK_QUEUE_STANDARD;

    hr = queue_put_work_item(queue, 0, result);

    unlock_user_queue(queue);

    return hr;
}

static void queue_release_pending_item(struct work_item *item)
{
    EnterCriticalSection(&item->queue->cs);
    if (item->key)
    {
        list_remove(&item->entry);
        item->key = 0;
        IUnknown_Release(&item->IUnknown_iface);
    }
    LeaveCriticalSection(&item->queue->cs);
}

static void CALLBACK waiting_item_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_WAIT *wait,
        TP_WAIT_RESULT wait_result)
{
    struct work_item *item = context;

    TRACE("result object %p.\n", item->result);

    invoke_async_callback(item->result);

    IUnknown_Release(&item->IUnknown_iface);
}

static void CALLBACK waiting_item_cancelable_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_WAIT *wait,
        TP_WAIT_RESULT wait_result)
{
    struct work_item *item = context;

    TRACE("result object %p.\n", item->result);

    queue_release_pending_item(item);

    invoke_async_callback(item->result);

    IUnknown_Release(&item->IUnknown_iface);
}

static void CALLBACK scheduled_item_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_TIMER *timer)
{
    struct work_item *item = context;

    TRACE("result object %p.\n", item->result);

    invoke_async_callback(item->result);

    IUnknown_Release(&item->IUnknown_iface);
}

static void CALLBACK scheduled_item_cancelable_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_TIMER *timer)
{
    struct work_item *item = context;

    TRACE("result object %p.\n", item->result);

    queue_release_pending_item(item);

    invoke_async_callback(item->result);

    IUnknown_Release(&item->IUnknown_iface);
}

static void CALLBACK periodic_item_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_TIMER *timer)
{
    struct work_item *item = context;

    IUnknown_AddRef(&item->IUnknown_iface);

    invoke_async_callback(item->result);

    IUnknown_Release(&item->IUnknown_iface);
}

static void queue_mark_item_pending(DWORD mask, struct work_item *item, RTWQWORKITEM_KEY *key)
{
    *key = generate_item_key(mask);
    item->key = *key;

    EnterCriticalSection(&item->queue->cs);
    list_add_tail(&item->queue->pending_items, &item->entry);
    IUnknown_AddRef(&item->IUnknown_iface);
    LeaveCriticalSection(&item->queue->cs);
}

static HRESULT queue_submit_wait(struct queue *queue, HANDLE event, LONG priority, IRtwqAsyncResult *result,
        RTWQWORKITEM_KEY *key)
{
    PTP_WAIT_CALLBACK callback;
    struct work_item *item;

    if (!(item = alloc_work_item(queue, result)))
        return E_OUTOFMEMORY;

    if (key)
    {
        queue_mark_item_pending(WAIT_ITEM_KEY_MASK, item, key);
        callback = waiting_item_cancelable_callback;
    }
    else
        callback = waiting_item_callback;

    item->u.wait_object = CreateThreadpoolWait(callback, item,
            (TP_CALLBACK_ENVIRON *)&queue->envs[TP_CALLBACK_PRIORITY_NORMAL]);
    SetThreadpoolWait(item->u.wait_object, event, NULL);

    TRACE("dispatched %p.\n", result);

    return S_OK;
}
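
/* Items submitted with a key pointer become cancellable: queue_mark_item_pending()
 * links them into pending_items and returns a 64-bit key whose high dword carries
 * WAIT_ITEM_KEY_MASK or SCHEDULED_ITEM_KEY_MASK, which queue_cancel_item() later
 * uses to choose CloseThreadpoolWait() vs CloseThreadpoolTimer().  A caller might
 * use it roughly as follows (illustrative sketch only, error handling omitted):
 *
 *     RTWQWORKITEM_KEY key;
 *     RtwqPutWaitingWorkItem(event, 0, result, &key);
 *     ...
 *     RtwqCancelWorkItem(key);
 */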
static HRESULT queue_submit_timer(struct queue *queue, IRtwqAsyncResult *result, INT64 timeout, DWORD period,
        RTWQWORKITEM_KEY *key)
{
    PTP_TIMER_CALLBACK callback;
    struct work_item *item;
    FILETIME filetime;
    LARGE_INTEGER t;

    if (!(item = alloc_work_item(queue, result)))
        return E_OUTOFMEMORY;

    if (key)
    {
        queue_mark_item_pending(SCHEDULED_ITEM_KEY_MASK, item, key);
    }

    if (period)
        callback = periodic_item_callback;
    else
        callback = key ? scheduled_item_cancelable_callback : scheduled_item_callback;

    t.QuadPart = timeout * 1000 * 10;
    filetime.dwLowDateTime = t.u.LowPart;
    filetime.dwHighDateTime = t.u.HighPart;

    item->u.timer_object = CreateThreadpoolTimer(callback, item,
            (TP_CALLBACK_ENVIRON *)&queue->envs[TP_CALLBACK_PRIORITY_NORMAL]);
    SetThreadpoolTimer(item->u.timer_object, &filetime, period, 0);

    TRACE("dispatched %p.\n", result);

    return S_OK;
}

static HRESULT queue_cancel_item(struct queue *queue, RTWQWORKITEM_KEY key)
{
    HRESULT hr = RTWQ_E_NOT_FOUND;
    struct work_item *item;

    EnterCriticalSection(&queue->cs);
    LIST_FOR_EACH_ENTRY(item, &queue->pending_items, struct work_item, entry)
    {
        if (item->key == key)
        {
            key >>= 32;
            if ((key & WAIT_ITEM_KEY_MASK) == WAIT_ITEM_KEY_MASK)
                CloseThreadpoolWait(item->u.wait_object);
            else if ((key & SCHEDULED_ITEM_KEY_MASK) == SCHEDULED_ITEM_KEY_MASK)
                CloseThreadpoolTimer(item->u.timer_object);
            else
                WARN("Unknown item key mask %#x.\n", (DWORD)key);
            queue_release_pending_item(item);
            hr = S_OK;
            break;
        }
    }
    LeaveCriticalSection(&queue->cs);

    return hr;
}

static HRESULT alloc_user_queue(RTWQ_WORKQUEUE_TYPE queue_type, DWORD *queue_id)
{
    struct queue_handle *entry;
    struct queue *queue;
    unsigned int idx;

    *queue_id = RTWQ_CALLBACK_QUEUE_UNDEFINED;

    if (platform_lock <= 0)
        return RTWQ_E_SHUTDOWN;

    queue = heap_alloc_zero(sizeof(*queue));
    if (!queue)
        return E_OUTOFMEMORY;
    init_work_queue(queue_type, queue);

    EnterCriticalSection(&queues_section);

    entry = next_free_user_queue;
    if (entry)
        next_free_user_queue = entry->obj;
    else if (next_unused_user_queue < user_queues + MAX_USER_QUEUE_HANDLES)
        entry = next_unused_user_queue++;
    else
    {
        LeaveCriticalSection(&queues_section);
        heap_free(queue);
        WARN("Out of user queue handles.\n");
        return E_OUTOFMEMORY;
    }

    entry->refcount = 1;
    entry->obj = queue;
    if (++queue_generation == 0xffff) queue_generation = 1;
    entry->generation = queue_generation;
    idx = entry - user_queues + FIRST_USER_QUEUE_HANDLE;
    *queue_id = (idx << 16) | entry->generation;

    LeaveCriticalSection(&queues_section);

    return S_OK;
}

struct async_result
{
    RTWQASYNCRESULT result;
    LONG refcount;
    IUnknown *object;
    IUnknown *state;
};

static struct async_result *impl_from_IRtwqAsyncResult(IRtwqAsyncResult *iface)
{
    return CONTAINING_RECORD(iface, struct async_result, result.AsyncResult);
}

static HRESULT WINAPI async_result_QueryInterface(IRtwqAsyncResult *iface, REFIID riid, void **obj)
{
    TRACE("%p, %s, %p.\n", iface, debugstr_guid(riid), obj);

    if (IsEqualIID(riid, &IID_IRtwqAsyncResult) ||
            IsEqualIID(riid, &IID_IUnknown))
    {
        *obj = iface;
        IRtwqAsyncResult_AddRef(iface);
        return S_OK;
    }

    *obj = NULL;
    WARN("Unsupported interface %s.\n", debugstr_guid(riid));
    return E_NOINTERFACE;
}

static ULONG WINAPI async_result_AddRef(IRtwqAsyncResult *iface)
{
    struct async_result *result = impl_from_IRtwqAsyncResult(iface);
    ULONG refcount = InterlockedIncrement(&result->refcount);

    TRACE("%p, %u.\n", iface, refcount);

    return refcount;
}
static ULONG WINAPI async_result_Release(IRtwqAsyncResult *iface)
{
    struct async_result *result = impl_from_IRtwqAsyncResult(iface);
    ULONG refcount = InterlockedDecrement(&result->refcount);

    TRACE("%p, %u.\n", iface, refcount);

    if (!refcount)
    {
        if (result->result.pCallback)
            IRtwqAsyncCallback_Release(result->result.pCallback);
        if (result->object)
            IUnknown_Release(result->object);
        if (result->state)
            IUnknown_Release(result->state);
        if (result->result.hEvent)
            CloseHandle(result->result.hEvent);
        heap_free(result);

        RtwqUnlockPlatform();
    }

    return refcount;
}

static HRESULT WINAPI async_result_GetState(IRtwqAsyncResult *iface, IUnknown **state)
{
    struct async_result *result = impl_from_IRtwqAsyncResult(iface);

    TRACE("%p, %p.\n", iface, state);

    if (!result->state)
        return E_POINTER;

    *state = result->state;
    IUnknown_AddRef(*state);

    return S_OK;
}

static HRESULT WINAPI async_result_GetStatus(IRtwqAsyncResult *iface)
{
    struct async_result *result = impl_from_IRtwqAsyncResult(iface);

    TRACE("%p.\n", iface);

    return result->result.hrStatusResult;
}

static HRESULT WINAPI async_result_SetStatus(IRtwqAsyncResult *iface, HRESULT status)
{
    struct async_result *result = impl_from_IRtwqAsyncResult(iface);

    TRACE("%p, %#x.\n", iface, status);

    result->result.hrStatusResult = status;

    return S_OK;
}

static HRESULT WINAPI async_result_GetObject(IRtwqAsyncResult *iface, IUnknown **object)
{
    struct async_result *result = impl_from_IRtwqAsyncResult(iface);

    TRACE("%p, %p.\n", iface, object);

    if (!result->object)
        return E_POINTER;

    *object = result->object;
    IUnknown_AddRef(*object);

    return S_OK;
}

static IUnknown * WINAPI async_result_GetStateNoAddRef(IRtwqAsyncResult *iface)
{
    struct async_result *result = impl_from_IRtwqAsyncResult(iface);

    TRACE("%p.\n", iface);

    return result->state;
}

static const IRtwqAsyncResultVtbl async_result_vtbl =
{
    async_result_QueryInterface,
    async_result_AddRef,
    async_result_Release,
    async_result_GetState,
    async_result_GetStatus,
    async_result_SetStatus,
    async_result_GetObject,
    async_result_GetStateNoAddRef,
};

static HRESULT create_async_result(IUnknown *object, IRtwqAsyncCallback *callback, IUnknown *state, IRtwqAsyncResult **out)
{
    struct async_result *result;

    if (!out)
        return E_INVALIDARG;

    result = heap_alloc_zero(sizeof(*result));
    if (!result)
        return E_OUTOFMEMORY;

    RtwqLockPlatform();

    result->result.AsyncResult.lpVtbl = &async_result_vtbl;
    result->refcount = 1;
    result->object = object;
    if (result->object)
        IUnknown_AddRef(result->object);
    result->result.pCallback = callback;
    if (result->result.pCallback)
        IRtwqAsyncCallback_AddRef(result->result.pCallback);
    result->state = state;
    if (result->state)
        IUnknown_AddRef(result->state);

    *out = &result->result.AsyncResult;

    TRACE("Created async result object %p.\n", *out);

    return S_OK;
}

HRESULT WINAPI RtwqCreateAsyncResult(IUnknown *object, IRtwqAsyncCallback *callback, IUnknown *state,
        IRtwqAsyncResult **out)
{
    TRACE("%p, %p, %p, %p.\n", object, callback, state, out);

    return create_async_result(object, callback, state, out);
}

HRESULT WINAPI RtwqLockPlatform(void)
{
    InterlockedIncrement(&platform_lock);

    return S_OK;
}

HRESULT WINAPI RtwqUnlockPlatform(void)
{
    InterlockedDecrement(&platform_lock);

    return S_OK;
}
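
/* RtwqLockPlatform()/RtwqUnlockPlatform() count active users of the platform;
 * RtwqStartup() and RtwqShutdown() below bracket the lifetime of the system
 * queues.  A typical client sequence might look roughly like this (illustrative
 * sketch only; "callback" stands for a caller-provided IRtwqAsyncCallback
 * implementation):
 *
 *     IRtwqAsyncResult *result;
 *     RtwqStartup();
 *     RtwqCreateAsyncResult(NULL, callback, NULL, &result);
 *     RtwqPutWorkItem(RTWQ_CALLBACK_QUEUE_STANDARD, 0, result);
 *     ...
 *     IRtwqAsyncResult_Release(result);
 *     RtwqShutdown();
 */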
static void init_system_queues(void)
{
    /* Always initialize standard queue, keep the rest lazy. */

    EnterCriticalSection(&queues_section);

    if (system_queues[SYS_QUEUE_STANDARD].pool)
    {
        LeaveCriticalSection(&queues_section);
        return;
    }

    init_work_queue(RTWQ_STANDARD_WORKQUEUE, &system_queues[SYS_QUEUE_STANDARD]);

    LeaveCriticalSection(&queues_section);
}

HRESULT WINAPI RtwqStartup(void)
{
    if (InterlockedIncrement(&platform_lock) == 1)
    {
        init_system_queues();
    }

    return S_OK;
}

static void shutdown_system_queues(void)
{
    unsigned int i;

    EnterCriticalSection(&queues_section);

    for (i = 0; i < ARRAY_SIZE(system_queues); ++i)
    {
        shutdown_queue(&system_queues[i]);
    }

    LeaveCriticalSection(&queues_section);
}

HRESULT WINAPI RtwqShutdown(void)
{
    if (platform_lock <= 0)
        return S_OK;

    if (InterlockedExchangeAdd(&platform_lock, -1) == 1)
    {
        shutdown_system_queues();
    }

    return S_OK;
}

HRESULT WINAPI RtwqPutWaitingWorkItem(HANDLE event, LONG priority, IRtwqAsyncResult *result, RTWQWORKITEM_KEY *key)
{
    struct queue *queue;
    HRESULT hr;

    TRACE("%p, %d, %p, %p.\n", event, priority, result, key);

    if (FAILED(hr = grab_queue(RTWQ_CALLBACK_QUEUE_TIMER, &queue)))
        return hr;

    hr = queue_submit_wait(queue, event, priority, result, key);

    return hr;
}

static HRESULT schedule_work_item(IRtwqAsyncResult *result, INT64 timeout, RTWQWORKITEM_KEY *key)
{
    struct queue *queue;
    HRESULT hr;

    if (FAILED(hr = grab_queue(RTWQ_CALLBACK_QUEUE_TIMER, &queue)))
        return hr;

    TRACE("%p, %s, %p.\n", result, wine_dbgstr_longlong(timeout), key);

    return queue_submit_timer(queue, result, timeout, 0, key);
}

HRESULT WINAPI RtwqScheduleWorkItem(IRtwqAsyncResult *result, INT64 timeout, RTWQWORKITEM_KEY *key)
{
    TRACE("%p, %s, %p.\n", result, wine_dbgstr_longlong(timeout), key);

    return schedule_work_item(result, timeout, key);
}

struct periodic_callback
{
    IRtwqAsyncCallback IRtwqAsyncCallback_iface;
    LONG refcount;
    RTWQPERIODICCALLBACK callback;
};

static struct periodic_callback *impl_from_IRtwqAsyncCallback(IRtwqAsyncCallback *iface)
{
    return CONTAINING_RECORD(iface, struct periodic_callback, IRtwqAsyncCallback_iface);
}

static HRESULT WINAPI periodic_callback_QueryInterface(IRtwqAsyncCallback *iface, REFIID riid, void **obj)
{
    if (IsEqualIID(riid, &IID_IRtwqAsyncCallback) ||
            IsEqualIID(riid, &IID_IUnknown))
    {
        *obj = iface;
        IRtwqAsyncCallback_AddRef(iface);
        return S_OK;
    }

    *obj = NULL;
    return E_NOINTERFACE;
}

static ULONG WINAPI periodic_callback_AddRef(IRtwqAsyncCallback *iface)
{
    struct periodic_callback *callback = impl_from_IRtwqAsyncCallback(iface);
    ULONG refcount = InterlockedIncrement(&callback->refcount);

    TRACE("%p, %u.\n", iface, refcount);

    return refcount;
}

static ULONG WINAPI periodic_callback_Release(IRtwqAsyncCallback *iface)
{
    struct periodic_callback *callback = impl_from_IRtwqAsyncCallback(iface);
    ULONG refcount = InterlockedDecrement(&callback->refcount);

    TRACE("%p, %u.\n", iface, refcount);

    if (!refcount)
        heap_free(callback);

    return refcount;
}

static HRESULT WINAPI periodic_callback_GetParameters(IRtwqAsyncCallback *iface, DWORD *flags, DWORD *queue)
{
    return E_NOTIMPL;
}

static HRESULT WINAPI periodic_callback_Invoke(IRtwqAsyncCallback *iface, IRtwqAsyncResult *result)
{
    struct periodic_callback *callback = impl_from_IRtwqAsyncCallback(iface);
    IUnknown *context = NULL;

    if (FAILED(IRtwqAsyncResult_GetObject(result, &context)))
        WARN("Expected object to be set for result object.\n");

    callback->callback(context);

    if (context)
        IUnknown_Release(context);

    return S_OK;
}

static const IRtwqAsyncCallbackVtbl periodic_callback_vtbl =
{
    periodic_callback_QueryInterface,
    periodic_callback_AddRef,
    periodic_callback_Release,
    periodic_callback_GetParameters,
    periodic_callback_Invoke,
};

static HRESULT create_periodic_callback_obj(RTWQPERIODICCALLBACK callback, IRtwqAsyncCallback **out)
{
    struct periodic_callback *object;

    object = heap_alloc(sizeof(*object));
    if (!object)
        return E_OUTOFMEMORY;

    object->IRtwqAsyncCallback_iface.lpVtbl = &periodic_callback_vtbl;
    object->refcount = 1;
    object->callback = callback;

    *out = &object->IRtwqAsyncCallback_iface;

    return S_OK;
}

HRESULT WINAPI RtwqAddPeriodicCallback(RTWQPERIODICCALLBACK callback, IUnknown *context, DWORD *key)
{
    IRtwqAsyncCallback *periodic_callback;
    RTWQWORKITEM_KEY workitem_key;
    IRtwqAsyncResult *result;
    struct queue *queue;
    HRESULT hr;

    TRACE("%p, %p, %p.\n", callback, context, key);

    if (FAILED(hr = grab_queue(RTWQ_CALLBACK_QUEUE_TIMER, &queue)))
        return hr;

    if (FAILED(hr = create_periodic_callback_obj(callback, &periodic_callback)))
        return hr;

    hr = create_async_result(context, periodic_callback, NULL, &result);
    IRtwqAsyncCallback_Release(periodic_callback);
    if (FAILED(hr))
        return hr;

    /* Same period MFGetTimerPeriodicity() returns. */
    hr = queue_submit_timer(queue, result, 0, 10, key ? &workitem_key : NULL);

    IRtwqAsyncResult_Release(result);

    if (key)
        *key = workitem_key;

    return S_OK;
}

HRESULT WINAPI RtwqRemovePeriodicCallback(DWORD key)
{
    struct queue *queue;
    HRESULT hr;

    TRACE("%#x.\n", key);

    if (FAILED(hr = grab_queue(RTWQ_CALLBACK_QUEUE_TIMER, &queue)))
        return hr;

    return queue_cancel_item(queue, get_item_key(SCHEDULED_ITEM_KEY_MASK, key));
}

HRESULT WINAPI RtwqCancelWorkItem(RTWQWORKITEM_KEY key)
{
    struct queue *queue;
    HRESULT hr;

    TRACE("%s.\n", wine_dbgstr_longlong(key));

    if (FAILED(hr = grab_queue(RTWQ_CALLBACK_QUEUE_TIMER, &queue)))
        return hr;

    return queue_cancel_item(queue, key);
}

HRESULT WINAPI RtwqInvokeCallback(IRtwqAsyncResult *result)
{
    TRACE("%p.\n", result);

    return invoke_async_callback(result);
}

HRESULT WINAPI RtwqPutWorkItem(DWORD queue, LONG priority, IRtwqAsyncResult *result)
{
    TRACE("%d, %d, %p.\n", queue, priority, result);

    return queue_put_work_item(queue, priority, result);
}

HRESULT WINAPI RtwqAllocateWorkQueue(RTWQ_WORKQUEUE_TYPE queue_type, DWORD *queue)
{
    TRACE("%d, %p.\n", queue_type, queue);

    return alloc_user_queue(queue_type, queue);
}

HRESULT WINAPI RtwqLockWorkQueue(DWORD queue)
{
    TRACE("%#x.\n", queue);

    return lock_user_queue(queue);
}

HRESULT WINAPI RtwqUnlockWorkQueue(DWORD queue)
{
    TRACE("%#x.\n", queue);

    return unlock_user_queue(queue);
}

HRESULT WINAPI RtwqSetLongRunning(DWORD queue_id, BOOL enable)
{
    struct queue *queue;
    HRESULT hr;
    int i;

    TRACE("%#x, %d.\n", queue_id, enable);

    lock_user_queue(queue_id);

    if (SUCCEEDED(hr = grab_queue(queue_id, &queue)))
    {
        for (i = 0; i < ARRAY_SIZE(queue->envs); ++i)
            queue->envs[i].u.s.LongFunction = !!enable;
    }

    unlock_user_queue(queue_id);

    return hr;
}

HRESULT WINAPI RtwqLockSharedWorkQueue(const WCHAR *usageclass, LONG priority, DWORD *taskid, DWORD *queue)
{
    FIXME("%s, %d, %p, %p.\n", debugstr_w(usageclass), priority, taskid, queue);

    return RtwqAllocateWorkQueue(RTWQ_STANDARD_WORKQUEUE, queue);
}

HRESULT WINAPI RtwqSetDeadline(DWORD queue_id, LONGLONG deadline, HANDLE *request)
{
    FIXME("%#x, %s, %p.\n", queue_id, wine_dbgstr_longlong(deadline), request);

    return E_NOTIMPL;
}

HRESULT WINAPI RtwqSetDeadline2(DWORD queue_id, LONGLONG deadline, LONGLONG predeadline, HANDLE *request)
{
    FIXME("%#x, %s, %s, %p.\n", queue_id, wine_dbgstr_longlong(deadline), wine_dbgstr_longlong(predeadline), request);

    return E_NOTIMPL;
}
HRESULT WINAPI RtwqCancelDeadline(HANDLE request)
{
    FIXME("%p.\n", request);

    return E_NOTIMPL;
}