mfplat: Implement waiting items with ability to cancel.
Signed-off-by: Nikolay Sivov <nsivov@codeweavers.com> Signed-off-by: Alexandre Julliard <julliard@winehq.org>
This commit is contained in:
parent
859f9bacc2
commit
1a53a655d1
|
@ -120,6 +120,7 @@
|
|||
@ stub MFJoinIoPort
|
||||
@ stdcall MFLockPlatform()
|
||||
@ stdcall MFLockWorkQueue(long)
|
||||
@ stdcall MFPutWaitingWorkItem(long long ptr ptr)
|
||||
@ stdcall MFPutWorkItem(long ptr ptr)
|
||||
@ stdcall MFPutWorkItemEx(long ptr)
|
||||
@ stub MFRecordError
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
|
||||
#include "wine/debug.h"
|
||||
#include "wine/heap.h"
|
||||
#include "wine/list.h"
|
||||
|
||||
#include "mfplat_private.h"
|
||||
|
||||
|
@ -33,16 +34,34 @@ WINE_DEFAULT_DEBUG_CHANNEL(mfplat);
|
|||
#define FIRST_USER_QUEUE_HANDLE 5
|
||||
#define MAX_USER_QUEUE_HANDLES 124
|
||||
|
||||
#define WAIT_ITEM_KEY_MASK (0x82000000)
|
||||
|
||||
static LONG next_item_key;
|
||||
|
||||
static MFWORKITEM_KEY generate_item_key(DWORD mask)
|
||||
{
|
||||
return ((MFWORKITEM_KEY)mask << 32) | InterlockedIncrement(&next_item_key);
|
||||
}
|
||||
|
||||
struct work_item
|
||||
{
|
||||
struct list entry;
|
||||
LONG refcount;
|
||||
IMFAsyncResult *result;
|
||||
struct queue *queue;
|
||||
MFWORKITEM_KEY key;
|
||||
union
|
||||
{
|
||||
TP_WAIT *wait_object;
|
||||
} u;
|
||||
};
|
||||
|
||||
struct queue
|
||||
{
|
||||
TP_POOL *pool;
|
||||
TP_CALLBACK_ENVIRON env;
|
||||
CRITICAL_SECTION cs;
|
||||
struct list pending_items;
|
||||
};
|
||||
|
||||
struct queue_handle
|
||||
|
@ -121,6 +140,8 @@ static struct work_item * alloc_work_item(struct queue *queue, IMFAsyncResult *r
|
|||
item->result = result;
|
||||
IMFAsyncResult_AddRef(item->result);
|
||||
item->refcount = 1;
|
||||
item->queue = queue;
|
||||
list_init(&item->entry);
|
||||
|
||||
return item;
|
||||
}
|
||||
|
@ -143,6 +164,8 @@ static void init_work_queue(MFASYNC_WORKQUEUE_TYPE queue_type, struct queue *que
|
|||
queue->env.Pool = queue->pool;
|
||||
queue->env.CleanupGroup = CreateThreadpoolCleanupGroup();
|
||||
queue->env.CleanupGroupCancelCallback = standard_queue_cleanup_callback;
|
||||
list_init(&queue->pending_items);
|
||||
InitializeCriticalSection(&queue->cs);
|
||||
|
||||
max_thread = (queue_type == MF_STANDARD_WORKQUEUE || queue_type == MF_WINDOW_WORKQUEUE) ? 1 : 4;
|
||||
|
||||
|
@ -210,14 +233,68 @@ void init_system_queues(void)
|
|||
LeaveCriticalSection(&queues_section);
|
||||
}
|
||||
|
||||
static HRESULT lock_user_queue(DWORD queue)
|
||||
{
|
||||
HRESULT hr = MF_E_INVALID_WORKQUEUE;
|
||||
struct queue_handle *entry;
|
||||
|
||||
if (!(queue & MFASYNC_CALLBACK_QUEUE_PRIVATE_MASK))
|
||||
return S_OK;
|
||||
|
||||
EnterCriticalSection(&queues_section);
|
||||
entry = get_queue_obj(queue);
|
||||
if (entry && entry->refcount)
|
||||
{
|
||||
entry->refcount++;
|
||||
hr = S_OK;
|
||||
}
|
||||
LeaveCriticalSection(&queues_section);
|
||||
return hr;
|
||||
}
|
||||
|
||||
static HRESULT unlock_user_queue(DWORD queue)
|
||||
{
|
||||
HRESULT hr = MF_E_INVALID_WORKQUEUE;
|
||||
struct queue_handle *entry;
|
||||
|
||||
if (!(queue & MFASYNC_CALLBACK_QUEUE_PRIVATE_MASK))
|
||||
return S_OK;
|
||||
|
||||
EnterCriticalSection(&queues_section);
|
||||
entry = get_queue_obj(queue);
|
||||
if (entry && entry->refcount)
|
||||
{
|
||||
if (--entry->refcount == 0)
|
||||
{
|
||||
entry->obj = next_free_user_queue;
|
||||
next_free_user_queue = entry;
|
||||
}
|
||||
hr = S_OK;
|
||||
}
|
||||
LeaveCriticalSection(&queues_section);
|
||||
return hr;
|
||||
}
|
||||
|
||||
static void shutdown_queue(struct queue *queue)
|
||||
{
|
||||
struct work_item *item, *item2;
|
||||
|
||||
if (!queue->pool)
|
||||
return;
|
||||
|
||||
CloseThreadpoolCleanupGroupMembers(queue->env.CleanupGroup, TRUE, NULL);
|
||||
CloseThreadpool(queue->pool);
|
||||
queue->pool = NULL;
|
||||
|
||||
EnterCriticalSection(&queue->cs);
|
||||
LIST_FOR_EACH_ENTRY_SAFE(item, item2, &queue->pending_items, struct work_item, entry)
|
||||
{
|
||||
list_remove(&item->entry);
|
||||
release_work_item(item);
|
||||
}
|
||||
LeaveCriticalSection(&queue->cs);
|
||||
|
||||
DeleteCriticalSection(&queue->cs);
|
||||
}
|
||||
|
||||
void shutdown_system_queues(void)
|
||||
|
@ -234,6 +311,11 @@ void shutdown_system_queues(void)
|
|||
LeaveCriticalSection(&queues_section);
|
||||
}
|
||||
|
||||
static void grab_work_item(struct work_item *item)
|
||||
{
|
||||
InterlockedIncrement(&item->refcount);
|
||||
}
|
||||
|
||||
static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void *context, TP_WORK *work)
|
||||
{
|
||||
struct work_item *item = context;
|
||||
|
@ -275,6 +357,124 @@ static HRESULT queue_put_work_item(DWORD queue_id, IMFAsyncResult *result)
|
|||
return hr;
|
||||
}
|
||||
|
||||
static HRESULT invoke_async_callback(IMFAsyncResult *result)
|
||||
{
|
||||
MFASYNCRESULT *result_data = (MFASYNCRESULT *)result;
|
||||
DWORD queue = MFASYNC_CALLBACK_QUEUE_STANDARD, flags;
|
||||
HRESULT hr;
|
||||
|
||||
if (FAILED(IMFAsyncCallback_GetParameters(result_data->pCallback, &flags, &queue)))
|
||||
queue = MFASYNC_CALLBACK_QUEUE_STANDARD;
|
||||
|
||||
if (FAILED(lock_user_queue(queue)))
|
||||
queue = MFASYNC_CALLBACK_QUEUE_STANDARD;
|
||||
|
||||
hr = queue_put_work_item(queue, result);
|
||||
|
||||
unlock_user_queue(queue);
|
||||
|
||||
return hr;
|
||||
}
|
||||
|
||||
static void queue_release_pending_item(struct work_item *item)
|
||||
{
|
||||
EnterCriticalSection(&item->queue->cs);
|
||||
if (item->key)
|
||||
{
|
||||
list_remove(&item->entry);
|
||||
item->key = 0;
|
||||
release_work_item(item);
|
||||
}
|
||||
LeaveCriticalSection(&item->queue->cs);
|
||||
}
|
||||
|
||||
static void CALLBACK waiting_item_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_WAIT *wait,
|
||||
TP_WAIT_RESULT wait_result)
|
||||
{
|
||||
struct work_item *item = context;
|
||||
|
||||
TRACE("result object %p.\n", item->result);
|
||||
|
||||
invoke_async_callback(item->result);
|
||||
|
||||
release_work_item(item);
|
||||
}
|
||||
|
||||
static void CALLBACK waiting_item_cancelable_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_WAIT *wait,
|
||||
TP_WAIT_RESULT wait_result)
|
||||
{
|
||||
struct work_item *item = context;
|
||||
|
||||
TRACE("result object %p.\n", item->result);
|
||||
|
||||
queue_release_pending_item(item);
|
||||
|
||||
invoke_async_callback(item->result);
|
||||
|
||||
release_work_item(item);
|
||||
}
|
||||
|
||||
static void queue_mark_item_pending(DWORD mask, struct work_item *item, MFWORKITEM_KEY *key)
|
||||
{
|
||||
*key = generate_item_key(mask);
|
||||
item->key = *key;
|
||||
|
||||
EnterCriticalSection(&item->queue->cs);
|
||||
list_add_tail(&item->queue->pending_items, &item->entry);
|
||||
grab_work_item(item);
|
||||
LeaveCriticalSection(&item->queue->cs);
|
||||
}
|
||||
|
||||
static HRESULT queue_submit_wait(struct queue *queue, HANDLE event, LONG priority, IMFAsyncResult *result,
|
||||
MFWORKITEM_KEY *key)
|
||||
{
|
||||
PTP_WAIT_CALLBACK callback;
|
||||
struct work_item *item;
|
||||
|
||||
if (!(item = alloc_work_item(queue, result)))
|
||||
return E_OUTOFMEMORY;
|
||||
|
||||
if (key)
|
||||
{
|
||||
queue_mark_item_pending(WAIT_ITEM_KEY_MASK, item, key);
|
||||
callback = waiting_item_cancelable_callback;
|
||||
}
|
||||
else
|
||||
callback = waiting_item_callback;
|
||||
|
||||
item->u.wait_object = CreateThreadpoolWait(callback, item, &queue->env);
|
||||
SetThreadpoolWait(item->u.wait_object, event, NULL);
|
||||
|
||||
TRACE("dispatched %p.\n", result);
|
||||
|
||||
return S_OK;
|
||||
}
|
||||
|
||||
static HRESULT queue_cancel_item(struct queue *queue, MFWORKITEM_KEY key)
|
||||
{
|
||||
HRESULT hr = MF_E_NOT_FOUND;
|
||||
struct work_item *item;
|
||||
|
||||
EnterCriticalSection(&queue->cs);
|
||||
LIST_FOR_EACH_ENTRY(item, &queue->pending_items, struct work_item, entry)
|
||||
{
|
||||
if (item->key == key)
|
||||
{
|
||||
key >>= 32;
|
||||
if ((key & WAIT_ITEM_KEY_MASK) == WAIT_ITEM_KEY_MASK)
|
||||
CloseThreadpoolWait(item->u.wait_object);
|
||||
else
|
||||
WARN("Unknown item key mask %#x.\n", (DWORD)key);
|
||||
queue_release_pending_item(item);
|
||||
hr = S_OK;
|
||||
break;
|
||||
}
|
||||
}
|
||||
LeaveCriticalSection(&queue->cs);
|
||||
|
||||
return hr;
|
||||
}
|
||||
|
||||
static HRESULT alloc_user_queue(MFASYNC_WORKQUEUE_TYPE queue_type, DWORD *queue_id)
|
||||
{
|
||||
struct queue_handle *entry;
|
||||
|
@ -313,48 +513,6 @@ static HRESULT alloc_user_queue(MFASYNC_WORKQUEUE_TYPE queue_type, DWORD *queue_
|
|||
return S_OK;
|
||||
}
|
||||
|
||||
static HRESULT lock_user_queue(DWORD queue)
|
||||
{
|
||||
HRESULT hr = MF_E_INVALID_WORKQUEUE;
|
||||
struct queue_handle *entry;
|
||||
|
||||
if (!(queue & MFASYNC_CALLBACK_QUEUE_PRIVATE_MASK))
|
||||
return S_OK;
|
||||
|
||||
EnterCriticalSection(&queues_section);
|
||||
entry = get_queue_obj(queue);
|
||||
if (entry && entry->refcount)
|
||||
{
|
||||
entry->refcount++;
|
||||
hr = S_OK;
|
||||
}
|
||||
LeaveCriticalSection(&queues_section);
|
||||
return hr;
|
||||
}
|
||||
|
||||
static HRESULT unlock_user_queue(DWORD queue)
|
||||
{
|
||||
HRESULT hr = MF_E_INVALID_WORKQUEUE;
|
||||
struct queue_handle *entry;
|
||||
|
||||
if (!(queue & MFASYNC_CALLBACK_QUEUE_PRIVATE_MASK))
|
||||
return S_OK;
|
||||
|
||||
EnterCriticalSection(&queues_section);
|
||||
entry = get_queue_obj(queue);
|
||||
if (entry && entry->refcount)
|
||||
{
|
||||
if (--entry->refcount == 0)
|
||||
{
|
||||
entry->obj = next_free_user_queue;
|
||||
next_free_user_queue = entry;
|
||||
}
|
||||
hr = S_OK;
|
||||
}
|
||||
LeaveCriticalSection(&queues_section);
|
||||
return hr;
|
||||
}
|
||||
|
||||
struct async_result
|
||||
{
|
||||
MFASYNCRESULT result;
|
||||
|
@ -606,23 +764,9 @@ HRESULT WINAPI MFPutWorkItemEx(DWORD queue, IMFAsyncResult *result)
|
|||
*/
|
||||
HRESULT WINAPI MFInvokeCallback(IMFAsyncResult *result)
|
||||
{
|
||||
MFASYNCRESULT *result_data = (MFASYNCRESULT *)result;
|
||||
DWORD queue = MFASYNC_CALLBACK_QUEUE_STANDARD, flags;
|
||||
HRESULT hr;
|
||||
|
||||
TRACE("%p.\n", result);
|
||||
|
||||
if (FAILED(IMFAsyncCallback_GetParameters(result_data->pCallback, &flags, &queue)))
|
||||
queue = MFASYNC_CALLBACK_QUEUE_STANDARD;
|
||||
|
||||
if (FAILED(MFLockWorkQueue(queue)))
|
||||
queue = MFASYNC_CALLBACK_QUEUE_STANDARD;
|
||||
|
||||
hr = MFPutWorkItemEx(queue, result);
|
||||
|
||||
MFUnlockWorkQueue(queue);
|
||||
|
||||
return hr;
|
||||
return invoke_async_callback(result);
|
||||
}
|
||||
|
||||
static HRESULT schedule_work_item(IMFAsyncResult *result, INT64 timeout, MFWORKITEM_KEY *key)
|
||||
|
@ -662,12 +806,38 @@ HRESULT WINAPI MFScheduleWorkItem(IMFAsyncCallback *callback, IUnknown *state, I
|
|||
return hr;
|
||||
}
|
||||
|
||||
/***********************************************************************
|
||||
* MFPutWaitingWorkItem (mfplat.@)
|
||||
*/
|
||||
HRESULT WINAPI MFPutWaitingWorkItem(HANDLE event, LONG priority, IMFAsyncResult *result, MFWORKITEM_KEY *key)
|
||||
{
|
||||
struct queue *queue;
|
||||
HRESULT hr;
|
||||
|
||||
TRACE("%p, %d, %p, %p.\n", event, priority, result, key);
|
||||
|
||||
if (FAILED(hr = grab_queue(MFASYNC_CALLBACK_QUEUE_TIMER, &queue)))
|
||||
return hr;
|
||||
|
||||
hr = queue_submit_wait(queue, event, priority, result, key);
|
||||
|
||||
return hr;
|
||||
}
|
||||
|
||||
/***********************************************************************
|
||||
* MFCancelWorkItem (mfplat.@)
|
||||
*/
|
||||
HRESULT WINAPI MFCancelWorkItem(MFWORKITEM_KEY key)
|
||||
{
|
||||
FIXME("%s.\n", wine_dbgstr_longlong(key));
|
||||
struct queue *queue;
|
||||
HRESULT hr;
|
||||
|
||||
return E_NOTIMPL;
|
||||
TRACE("%s.\n", wine_dbgstr_longlong(key));
|
||||
|
||||
if (FAILED(hr = grab_queue(MFASYNC_CALLBACK_QUEUE_TIMER, &queue)))
|
||||
return hr;
|
||||
|
||||
hr = queue_cancel_item(queue, key);
|
||||
|
||||
return hr;
|
||||
}
|
||||
|
|
|
@ -1058,12 +1058,11 @@ todo_wine
|
|||
ok(hr == S_OK, "Failed to cancel item, hr %#x.\n", hr);
|
||||
|
||||
hr = MFCancelWorkItem(key);
|
||||
todo_wine
|
||||
ok(hr == MF_E_NOT_FOUND || broken(hr == S_OK) /* < win10 */, "Unexpected hr %#x.\n", hr);
|
||||
|
||||
if (!pMFPutWaitingWorkItem)
|
||||
{
|
||||
skip("Waiting items are not supported.\n");
|
||||
win_skip("Waiting items are not supported.\n");
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1085,9 +1084,11 @@ todo_wine
|
|||
IMFAsyncResult_Release(result);
|
||||
|
||||
hr = MFScheduleWorkItem(&callback, NULL, -5000, &key);
|
||||
todo_wine
|
||||
ok(hr == S_OK, "Failed to schedule item, hr %#x.\n", hr);
|
||||
|
||||
hr = MFCancelWorkItem(key);
|
||||
todo_wine
|
||||
ok(hr == S_OK, "Failed to cancel item, hr %#x.\n", hr);
|
||||
|
||||
hr = MFShutdown();
|
||||
|
|
Loading…
Reference in New Issue