quartz/filesource: Use a completion port to handle asynchronous requests.

Thus simplifying the code and allowing us to handle more than 63 at once.

Signed-off-by: Zebediah Figura <z.figura12@gmail.com>
Signed-off-by: Alexandre Julliard <julliard@winehq.org>
This commit is contained in:
Zebediah Figura 2020-01-29 19:56:59 -06:00 committed by Alexandre Julliard
parent 1bb953c676
commit abac070387
2 changed files with 176 additions and 232 deletions

View File

@ -46,12 +46,12 @@ static const AM_MEDIA_TYPE default_mt =
NULL
};
typedef struct DATAREQUEST
struct request
{
IMediaSample *pSample;
DWORD_PTR dwUserData;
IMediaSample *sample;
DWORD_PTR cookie;
OVERLAPPED ovl;
} DATAREQUEST;
};
typedef struct AsyncReader
{
@ -64,15 +64,12 @@ typedef struct AsyncReader
LPOLESTR pszFileName;
AM_MEDIA_TYPE *pmt;
ALLOCATOR_PROPERTIES allocProps;
HANDLE file;
BOOL flushing;
unsigned int queued_number;
unsigned int samples;
unsigned int oldest_sample;
HANDLE file, port, io_thread;
CRITICAL_SECTION sample_cs;
DATAREQUEST *sample_list;
/* Have a handle for every sample, and then one more as flushing handle */
HANDLE *handle_list;
BOOL flushing;
struct request *requests;
unsigned int max_requests;
CONDITION_VARIABLE sample_cv;
} AsyncReader;
static const struct strmbase_source_ops source_ops;
@ -352,18 +349,23 @@ static void async_reader_destroy(struct strmbase_filter *iface)
IPin_Disconnect(&filter->source.pin.IPin_iface);
CoTaskMemFree(filter->sample_list);
if (filter->handle_list)
if (filter->requests)
{
for (i = 0; i <= filter->samples; ++i)
CloseHandle(filter->handle_list[i]);
CoTaskMemFree(filter->handle_list);
for (i = 0; i < filter->max_requests; ++i)
CloseHandle(filter->requests[i].ovl.hEvent);
CoTaskMemFree(filter->requests);
}
CloseHandle(filter->file);
filter->sample_cs.DebugInfo->Spare[0] = 0;
DeleteCriticalSection(&filter->sample_cs);
strmbase_source_cleanup(&filter->source);
}
PostQueuedCompletionStatus(filter->port, 0, 1, NULL);
WaitForSingleObject(filter->io_thread, INFINITE);
CloseHandle(filter->io_thread);
CloseHandle(filter->port);
CoTaskMemFree(filter->pszFileName);
if (filter->pmt)
DeleteMediaType(filter->pmt);
@ -392,6 +394,42 @@ static const struct strmbase_filter_ops filter_ops =
.filter_query_interface = async_reader_query_interface,
};
static DWORD CALLBACK io_thread(void *arg)
{
AsyncReader *filter = arg;
struct request *req;
OVERLAPPED *ovl;
ULONG_PTR key;
DWORD size;
BOOL ret;
for (;;)
{
ret = GetQueuedCompletionStatus(filter->port, &size, &key, &ovl, INFINITE);
if (ret && key)
break;
EnterCriticalSection(&filter->sample_cs);
req = CONTAINING_RECORD(ovl, struct request, ovl);
TRACE("Got sample %u.\n", req - filter->requests);
assert(req >= filter->requests && req < filter->requests + filter->max_requests);
if (ret)
WakeConditionVariable(&filter->sample_cv);
else
{
ERR("GetQueuedCompletionStatus() returned failure, error %u.\n", GetLastError());
req->sample = NULL;
}
LeaveCriticalSection(&filter->sample_cs);
}
return 0;
}
HRESULT AsyncReader_create(IUnknown *outer, void **out)
{
AsyncReader *pAsyncRead;
@ -410,6 +448,12 @@ HRESULT AsyncReader_create(IUnknown *outer, void **out)
pAsyncRead->pszFileName = NULL;
pAsyncRead->pmt = NULL;
InitializeCriticalSection(&pAsyncRead->sample_cs);
pAsyncRead->sample_cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": FileAsyncReader.sample_cs");
InitializeConditionVariable(&pAsyncRead->sample_cv);
pAsyncRead->port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
pAsyncRead->io_thread = CreateThread(NULL, 0, io_thread, pAsyncRead, 0, NULL);
*out = &pAsyncRead->filter.IUnknown_inner;
TRACE("-- created at %p\n", pAsyncRead);
@ -470,11 +514,7 @@ static HRESULT WINAPI FileSource_Load(IFileSourceFilter * iface, LPCOLESTR pszFi
This->file = hFile;
This->flushing = FALSE;
This->sample_list = NULL;
This->handle_list = NULL;
This->queued_number = 0;
InitializeCriticalSection(&This->sample_cs);
This->sample_cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": FileAsyncReader.sample_cs");
This->requests = NULL;
if (This->pmt)
DeleteMediaType(This->pmt);
@ -705,42 +745,33 @@ static HRESULT WINAPI FileAsyncReader_RequestAllocator(IAsyncReader * iface, IMe
done:
if (SUCCEEDED(hr))
{
CoTaskMemFree(This->sample_list);
if (This->handle_list)
if (This->requests)
{
int x;
for (x = 0; x <= This->samples; ++x)
CloseHandle(This->handle_list[x]);
CoTaskMemFree(This->handle_list);
unsigned int i;
for (i = 0; i < This->max_requests; ++i)
CloseHandle(This->requests[i].ovl.hEvent);
CoTaskMemFree(This->requests);
}
This->samples = pProps->cBuffers;
This->oldest_sample = 0;
TRACE("Samples: %u\n", This->samples);
This->sample_list = CoTaskMemAlloc(sizeof(This->sample_list[0]) * pProps->cBuffers);
This->handle_list = CoTaskMemAlloc(sizeof(HANDLE) * pProps->cBuffers * 2);
This->max_requests = pProps->cBuffers;
TRACE("Maximum request count: %u.\n", This->max_requests);
This->requests = CoTaskMemAlloc(sizeof(This->requests[0]) * pProps->cBuffers);
if (This->sample_list && This->handle_list)
if (This->requests)
{
int x;
ZeroMemory(This->sample_list, sizeof(This->sample_list[0]) * pProps->cBuffers);
for (x = 0; x < This->samples; ++x)
{
This->sample_list[x].ovl.hEvent = This->handle_list[x] = CreateEventW(NULL, 0, 0, NULL);
if (x + 1 < This->samples)
This->handle_list[This->samples + 1 + x] = This->handle_list[x];
}
This->handle_list[This->samples] = CreateEventW(NULL, 1, 0, NULL);
ZeroMemory(This->requests, sizeof(This->requests[0]) * pProps->cBuffers);
for (x = 0; x < This->max_requests; ++x)
This->requests[x].ovl.hEvent = CreateEventW(NULL, 0, 0, NULL);
This->allocProps = *pProps;
}
else
{
hr = E_OUTOFMEMORY;
CoTaskMemFree(This->sample_list);
CoTaskMemFree(This->handle_list);
This->samples = 0;
This->sample_list = NULL;
This->handle_list = NULL;
CoTaskMemFree(This->requests);
This->max_requests = 0;
This->requests = NULL;
}
}
@ -755,215 +786,107 @@ done:
return hr;
}
/* we could improve the Request/WaitForNext mechanism by allowing out of order samples.
* however, this would be quite complicated to do and may be a bit error prone */
static HRESULT WINAPI FileAsyncReader_Request(IAsyncReader * iface, IMediaSample * pSample, DWORD_PTR dwUser)
static HRESULT WINAPI FileAsyncReader_Request(IAsyncReader *iface, IMediaSample *sample, DWORD_PTR cookie)
{
AsyncReader *This = impl_from_IAsyncReader(iface);
HRESULT hr = S_OK;
REFERENCE_TIME Start;
REFERENCE_TIME Stop;
LPBYTE pBuffer = NULL;
AsyncReader *filter = impl_from_IAsyncReader(iface);
REFERENCE_TIME start, end;
struct request *req;
unsigned int i;
HRESULT hr;
BYTE *data;
TRACE("%p->(%p, %lx)\n", This, pSample, dwUser);
TRACE("filter %p, sample %p, cookie %#lx.\n", filter, sample, cookie);
if (!pSample)
if (!sample)
return E_POINTER;
/* get start and stop positions in bytes */
if (SUCCEEDED(hr))
hr = IMediaSample_GetTime(pSample, &Start, &Stop);
if (FAILED(hr = IMediaSample_GetTime(sample, &start, &end)))
return hr;
if (SUCCEEDED(hr))
hr = IMediaSample_GetPointer(pSample, &pBuffer);
if (FAILED(hr = IMediaSample_GetPointer(sample, &data)))
return hr;
EnterCriticalSection(&This->sample_cs);
if (This->flushing)
EnterCriticalSection(&filter->sample_cs);
if (filter->flushing)
{
LeaveCriticalSection(&This->sample_cs);
LeaveCriticalSection(&filter->sample_cs);
return VFW_E_WRONG_STATE;
}
if (SUCCEEDED(hr))
for (i = 0; i < filter->max_requests; ++i)
{
DWORD dwLength = (DWORD) BYTES_FROM_MEDIATIME(Stop - Start);
DATAREQUEST *pDataRq;
int x;
/* Try to insert above the waiting sample if possible */
for (x = This->oldest_sample; x < This->samples; ++x)
{
if (!This->sample_list[x].pSample)
break;
}
if (x >= This->samples)
for (x = 0; x < This->oldest_sample; ++x)
{
if (!This->sample_list[x].pSample)
break;
}
/* There must be a sample we have found */
assert(x < This->samples);
++This->queued_number;
pDataRq = This->sample_list + x;
pDataRq->ovl.u.s.Offset = (DWORD) BYTES_FROM_MEDIATIME(Start);
pDataRq->ovl.u.s.OffsetHigh = (DWORD)(BYTES_FROM_MEDIATIME(Start) >> (sizeof(DWORD) * 8));
pDataRq->dwUserData = dwUser;
/* we violate traditional COM rules here by maintaining
* a reference to the sample, but not calling AddRef, but
* that's what MSDN says to do */
pDataRq->pSample = pSample;
/* this is definitely not how it is implemented on Win9x
* as they do not support async reads on files, but it is
* sooo much easier to use this than messing around with threads!
*/
if (!ReadFile(This->file, pBuffer, dwLength, NULL, &pDataRq->ovl))
hr = HRESULT_FROM_WIN32(GetLastError());
/* ERROR_IO_PENDING is not actually an error since this is what we want! */
if (hr == HRESULT_FROM_WIN32(ERROR_IO_PENDING))
hr = S_OK;
if (!filter->requests[i].sample)
break;
}
assert(i < filter->max_requests);
req = &filter->requests[i];
LeaveCriticalSection(&This->sample_cs);
req->ovl.u.s.Offset = BYTES_FROM_MEDIATIME(start);
req->ovl.u.s.OffsetHigh = BYTES_FROM_MEDIATIME(start) >> 32;
/* No reference is taken. */
TRACE("-- %x\n", hr);
if (ReadFile(filter->file, data, BYTES_FROM_MEDIATIME(end - start), NULL, &req->ovl)
|| GetLastError() == ERROR_IO_PENDING)
{
hr = S_OK;
req->sample = sample;
req->cookie = cookie;
}
else
hr = HRESULT_FROM_WIN32(GetLastError());
LeaveCriticalSection(&filter->sample_cs);
return hr;
}
static HRESULT WINAPI FileAsyncReader_WaitForNext(IAsyncReader * iface, DWORD dwTimeout, IMediaSample ** ppSample, DWORD_PTR * pdwUser)
static HRESULT WINAPI FileAsyncReader_WaitForNext(IAsyncReader *iface,
DWORD timeout, IMediaSample **sample, DWORD_PTR *cookie)
{
AsyncReader *This = impl_from_IAsyncReader(iface);
HRESULT hr = S_OK;
DWORD buffer = ~0;
AsyncReader *filter = impl_from_IAsyncReader(iface);
unsigned int i;
TRACE("%p->(%u, %p, %p)\n", This, dwTimeout, ppSample, pdwUser);
TRACE("filter %p, timeout %u, sample %p, cookie %p.\n", filter, timeout, sample, cookie);
*ppSample = NULL;
*pdwUser = 0;
*sample = NULL;
*cookie = 0;
EnterCriticalSection(&This->sample_cs);
if (!This->flushing)
EnterCriticalSection(&filter->sample_cs);
do
{
LONG oldest = This->oldest_sample;
if (!This->queued_number)
if (filter->flushing)
{
/* It could be that nothing is queued right now, but that can be fixed */
WARN("Called without samples in queue and not flushing!!\n");
}
LeaveCriticalSection(&This->sample_cs);
/* wait for an object to read, or time out */
buffer = WaitForMultipleObjectsEx(This->samples+1, This->handle_list + oldest, FALSE, dwTimeout, TRUE);
EnterCriticalSection(&This->sample_cs);
if (buffer <= This->samples)
{
/* Re-scale the buffer back to normal */
buffer += oldest;
/* Uh oh, we overshot the flusher handle, renormalize it back to 0..Samples-1 */
if (buffer > This->samples)
buffer -= This->samples + 1;
assert(buffer <= This->samples);
LeaveCriticalSection(&filter->sample_cs);
return VFW_E_WRONG_STATE;
}
if (buffer >= This->samples)
for (i = 0; i < filter->max_requests; ++i)
{
if (buffer != This->samples)
struct request *req = &filter->requests[i];
DWORD size;
if (req->sample && GetOverlappedResult(filter->file, &req->ovl, &size, FALSE))
{
FIXME("Returned: %u (%08x)\n", buffer, GetLastError());
hr = VFW_E_TIMEOUT;
}
else
hr = VFW_E_WRONG_STATE;
buffer = ~0;
}
else
--This->queued_number;
}
REFERENCE_TIME start, end;
if (This->flushing && buffer == ~0)
{
for (buffer = 0; buffer < This->samples; ++buffer)
{
if (This->sample_list[buffer].pSample)
{
ResetEvent(This->handle_list[buffer]);
break;
IMediaSample_SetActualDataLength(req->sample, size);
start = MEDIATIME_FROM_BYTES(((ULONGLONG)req->ovl.u.s.OffsetHigh << 32) + req->ovl.u.s.Offset);
end = start + MEDIATIME_FROM_BYTES(size);
IMediaSample_SetTime(req->sample, &start, &end);
*sample = req->sample;
*cookie = req->cookie;
req->sample = NULL;
LeaveCriticalSection(&filter->sample_cs);
TRACE("Returning sample %u.\n", i);
return S_OK;
}
}
if (buffer == This->samples)
{
assert(!This->queued_number);
hr = VFW_E_TIMEOUT;
}
else
{
--This->queued_number;
hr = S_OK;
}
}
} while (SleepConditionVariableCS(&filter->sample_cv, &filter->sample_cs, timeout));
if (SUCCEEDED(hr))
{
REFERENCE_TIME rtStart, rtStop;
DATAREQUEST *pDataRq = This->sample_list + buffer;
DWORD dwBytes = 0;
/* get any errors */
if (!This->flushing && !GetOverlappedResult(This->file, &pDataRq->ovl, &dwBytes, FALSE))
hr = HRESULT_FROM_WIN32(GetLastError());
/* Return the sample no matter what so it can be destroyed */
*ppSample = pDataRq->pSample;
*pdwUser = pDataRq->dwUserData;
if (This->flushing)
hr = VFW_E_WRONG_STATE;
if (FAILED(hr))
dwBytes = 0;
/* Set the time on the sample */
IMediaSample_SetActualDataLength(pDataRq->pSample, dwBytes);
rtStart = (DWORD64)pDataRq->ovl.u.s.Offset + ((DWORD64)pDataRq->ovl.u.s.OffsetHigh << 32);
rtStart = MEDIATIME_FROM_BYTES(rtStart);
rtStop = rtStart + MEDIATIME_FROM_BYTES(dwBytes);
IMediaSample_SetTime(pDataRq->pSample, &rtStart, &rtStop);
This->sample_list[buffer].pSample = NULL;
assert(This->oldest_sample < This->samples);
if (buffer == This->oldest_sample)
{
LONG x;
for (x = This->oldest_sample + 1; x < This->samples; ++x)
if (This->sample_list[x].pSample)
break;
if (x >= This->samples)
for (x = 0; x < This->oldest_sample; ++x)
if (This->sample_list[x].pSample)
break;
if (This->oldest_sample == x)
/* No samples found, reset to 0 */
x = 0;
This->oldest_sample = x;
}
}
LeaveCriticalSection(&This->sample_cs);
TRACE("-- %x\n", hr);
return hr;
LeaveCriticalSection(&filter->sample_cs);
return VFW_E_TIMEOUT;
}
static BOOL sync_read(HANDLE file, LONGLONG offset, LONG length, BYTE *buffer, DWORD *read_len)
@ -971,7 +894,7 @@ static BOOL sync_read(HANDLE file, LONGLONG offset, LONG length, BYTE *buffer, D
OVERLAPPED ovl = {0};
BOOL ret;
ovl.hEvent = CreateEventW(NULL, TRUE, FALSE, NULL);
ovl.hEvent = (HANDLE)((ULONG_PTR)CreateEventW(NULL, TRUE, FALSE, NULL) | 1);
ovl.u.s.Offset = (DWORD)offset;
ovl.u.s.OffsetHigh = offset >> 32;
@ -1062,14 +985,17 @@ static HRESULT WINAPI FileAsyncReader_Length(IAsyncReader *iface, LONGLONG *tota
static HRESULT WINAPI FileAsyncReader_BeginFlush(IAsyncReader * iface)
{
AsyncReader *filter = impl_from_IAsyncReader(iface);
unsigned int i;
TRACE("iface %p.\n", iface);
EnterCriticalSection(&filter->sample_cs);
filter->flushing = TRUE;
for (i = 0; i < filter->max_requests; ++i)
filter->requests[i].sample = NULL;
CancelIoEx(filter->file, NULL);
SetEvent(filter->handle_list[filter->samples]);
WakeAllConditionVariable(&filter->sample_cv);
LeaveCriticalSection(&filter->sample_cs);
@ -1079,16 +1005,12 @@ static HRESULT WINAPI FileAsyncReader_BeginFlush(IAsyncReader * iface)
static HRESULT WINAPI FileAsyncReader_EndFlush(IAsyncReader * iface)
{
AsyncReader *filter = impl_from_IAsyncReader(iface);
int x;
TRACE("iface %p.\n", iface);
EnterCriticalSection(&filter->sample_cs);
ResetEvent(filter->handle_list[filter->samples]);
filter->flushing = FALSE;
for (x = 0; x < filter->samples; ++x)
assert(!filter->sample_list[x].pSample);
LeaveCriticalSection(&filter->sample_cs);

View File

@ -916,12 +916,23 @@ static void test_request(IAsyncReader *reader, IMemAllocator *allocator)
IMediaSample_Release(sample2);
}
static DWORD CALLBACK wait_thread(void *arg)
{
IAsyncReader *reader = arg;
IMediaSample *sample;
DWORD_PTR cookie;
HRESULT hr = IAsyncReader_WaitForNext(reader, 2000, &sample, &cookie);
ok(hr == VFW_E_WRONG_STATE, "Got hr %#x.\n", hr);
return 0;
}
static void test_flush(IAsyncReader *reader, IMemAllocator *allocator)
{
REFERENCE_TIME start_time, end_time;
IMediaSample *sample, *ret_sample;
BYTE buffer[20], *data;
DWORD_PTR cookie;
HANDLE thread;
HRESULT hr;
int i;
@ -973,12 +984,23 @@ static void test_flush(IAsyncReader *reader, IMemAllocator *allocator)
for (i = 0; i < 512; i++)
ok(data[i] == i % 111, "Got wrong byte %02x at %u.\n", data[i], i);
thread = CreateThread(NULL, 0, wait_thread, reader, 0, NULL);
ok(WaitForSingleObject(thread, 100) == WAIT_TIMEOUT, "Expected timeout.\n");
hr = IAsyncReader_BeginFlush(reader);
ok(hr == S_OK, "Got hr %#x.\n", hr);
ok(!WaitForSingleObject(thread, 1000), "Wait timed out.\n");
CloseHandle(thread);
hr = IAsyncReader_EndFlush(reader);
ok(hr == S_OK, "Got hr %#x.\n", hr);
IMediaSample_Release(sample);
}
static void test_async_reader(void)
{
ALLOCATOR_PROPERTIES req_props = {2, 1024, 512, 0}, ret_props;
ALLOCATOR_PROPERTIES req_props = {100, 1024, 512, 0}, ret_props;
IBaseFilter *filter = create_file_source();
IFileSourceFilter *filesource;
LONGLONG length, available;
@ -1048,7 +1070,7 @@ static void test_async_reader(void)
ret_props = req_props;
hr = IAsyncReader_RequestAllocator(reader, NULL, &ret_props, &allocator);
ok(hr == S_OK, "Got hr %#x.\n", hr);
ok(ret_props.cBuffers == 2, "Got %d buffers.\n", ret_props.cBuffers);
ok(ret_props.cBuffers == 100, "Got %d buffers.\n", ret_props.cBuffers);
ok(ret_props.cbBuffer == 1024, "Got size %d.\n", ret_props.cbBuffer);
ok(ret_props.cbAlign == 512, "Got alignment %d.\n", ret_props.cbAlign);
ok(ret_props.cbPrefix == 0, "Got prefix %d.\n", ret_props.cbPrefix);