mfreadwrite: Use single reader lock for all streams.

Signed-off-by: Nikolay Sivov <nsivov@codeweavers.com>
Signed-off-by: Alexandre Julliard <julliard@winehq.org>
This commit is contained in:
Nikolay Sivov 2020-03-31 15:46:34 +03:00 committed by Alexandre Julliard
parent ae5406f243
commit 4e72ea0f4a
1 changed files with 132 additions and 100 deletions

View File

@ -108,9 +108,6 @@ struct media_stream
IMFTransform *decoder;
DWORD id;
unsigned int index;
CRITICAL_SECTION cs;
CONDITION_VARIABLE sample_event;
struct list responses;
enum media_stream_state state;
BOOL selected;
BOOL presented;
@ -150,7 +147,9 @@ struct source_reader
enum media_source_state source_state;
struct media_stream *streams;
DWORD stream_count;
struct list responses;
CRITICAL_SECTION cs;
CONDITION_VARIABLE sample_event;
};
static inline struct source_reader *impl_from_IMFSourceReader(IMFSourceReader *iface)
@ -327,7 +326,7 @@ static void source_reader_queue_response(struct source_reader *reader, struct me
if (response->sample)
IMFSample_AddRef(response->sample);
list_add_tail(&stream->responses, &response->entry);
list_add_tail(&reader->responses, &response->entry);
if (stream->requests)
{
@ -343,7 +342,7 @@ static void source_reader_queue_response(struct source_reader *reader, struct me
}
}
else
WakeAllConditionVariable(&stream->sample_event);
WakeAllConditionVariable(&reader->sample_event);
stream->requests--;
}
@ -386,12 +385,15 @@ static HRESULT source_reader_new_stream_handler(struct source_reader *reader, IM
return hr;
}
EnterCriticalSection(&reader->cs);
for (i = 0; i < reader->stream_count; ++i)
{
if (id == reader->streams[i].id)
{
if (!InterlockedCompareExchangePointer((void **)&reader->streams[i].stream, stream, NULL))
if (!reader->streams[i].stream)
{
reader->streams[i].stream = stream;
IMFMediaStream_AddRef(reader->streams[i].stream);
if (FAILED(hr = IMFMediaStream_BeginGetEvent(stream, &reader->stream_events_callback,
(IUnknown *)stream)))
@ -399,10 +401,8 @@ static HRESULT source_reader_new_stream_handler(struct source_reader *reader, IM
WARN("Failed to subscribe to stream events, hr %#x.\n", hr);
}
EnterCriticalSection(&reader->streams[i].cs);
if (reader->streams[i].requests)
source_reader_request_sample(reader, &reader->streams[i]);
LeaveCriticalSection(&reader->streams[i].cs);
}
break;
}
@ -411,6 +411,8 @@ static HRESULT source_reader_new_stream_handler(struct source_reader *reader, IM
if (i == reader->stream_count)
WARN("Stream with id %#x was not present in presentation descriptor.\n", id);
LeaveCriticalSection(&reader->cs);
IMFMediaStream_Release(stream);
return hr;
@ -630,21 +632,19 @@ static HRESULT source_reader_media_sample_handler(struct source_reader *reader,
return hr;
}
EnterCriticalSection(&reader->cs);
for (i = 0; i < reader->stream_count; ++i)
{
if (id == reader->streams[i].id)
{
/* FIXME: propagate processing errors? */
EnterCriticalSection(&reader->streams[i].cs);
reader->streams[i].flags &= ~STREAM_FLAG_SAMPLE_REQUESTED;
hr = source_reader_process_sample(reader, &reader->streams[i], sample);
if (reader->streams[i].requests)
source_reader_request_sample(reader, &reader->streams[i]);
LeaveCriticalSection(&reader->streams[i].cs);
break;
}
}
@ -652,6 +652,8 @@ static HRESULT source_reader_media_sample_handler(struct source_reader *reader,
if (i == reader->stream_count)
WARN("Stream with id %#x was not present in presentation descriptor.\n", id);
LeaveCriticalSection(&reader->cs);
IMFSample_Release(sample);
return hr;
@ -675,14 +677,14 @@ static HRESULT source_reader_media_stream_state_handler(struct source_reader *re
return hr;
}
EnterCriticalSection(&reader->cs);
for (i = 0; i < reader->stream_count; ++i)
{
struct media_stream *stream = &reader->streams[i];
if (id == stream->id)
{
EnterCriticalSection(&stream->cs);
switch (event_type)
{
case MEEndOfStream:
@ -716,12 +718,12 @@ static HRESULT source_reader_media_stream_state_handler(struct source_reader *re
;
}
LeaveCriticalSection(&stream->cs);
break;
}
}
LeaveCriticalSection(&reader->cs);
return S_OK;
}
@ -790,18 +792,33 @@ static ULONG WINAPI source_reader_async_commands_callback_Release(IMFAsyncCallba
return IMFSourceReader_Release(&reader->IMFSourceReader_iface);
}
static struct stream_response *media_stream_pop_response(struct media_stream *stream)
static struct stream_response *media_stream_pop_response(struct source_reader *reader, struct media_stream *stream)
{
struct stream_response *response = NULL;
struct stream_response *response;
struct list *head;
if ((head = list_head(&stream->responses)))
if (stream)
{
LIST_FOR_EACH_ENTRY(response, &reader->responses, struct stream_response, entry)
{
if (response->stream_index == stream->index)
{
list_remove(&response->entry);
return response;
}
}
}
else
{
if ((head = list_head(&reader->responses)))
{
response = LIST_ENTRY(head, struct stream_response, entry);
list_remove(&response->entry);
return response;
}
}
return response;
return NULL;
}
static void source_reader_release_response(struct stream_response *response)
@ -857,13 +874,38 @@ static HRESULT source_reader_start_source(struct source_reader *reader)
return hr;
}
static BOOL source_reader_got_response_for_stream(struct source_reader *reader, struct media_stream *stream)
{
struct stream_response *response;
LIST_FOR_EACH_ENTRY(response, &reader->responses, struct stream_response, entry)
{
if (response->stream_index == stream->index)
return TRUE;
}
return FALSE;
}
static BOOL source_reader_get_read_result(struct source_reader *reader, struct media_stream *stream, DWORD flags,
HRESULT *status, DWORD *stream_index, DWORD *stream_flags, LONGLONG *timestamp, IMFSample **sample)
{
struct stream_response *response = NULL;
BOOL request_sample = FALSE;
if (list_empty(&stream->responses))
if ((response = media_stream_pop_response(reader, stream)))
{
*status = response->status;
*stream_index = stream->index;
*stream_flags = response->stream_flags;
*timestamp = response->timestamp;
*sample = response->sample;
if (*sample)
IMFSample_AddRef(*sample);
source_reader_release_response(response);
}
else
{
*status = S_OK;
*stream_index = stream->index;
@ -880,20 +922,6 @@ static BOOL source_reader_get_read_result(struct source_reader *reader, struct m
*stream_flags = 0;
}
}
else
{
response = media_stream_pop_response(stream);
*status = response->status;
*stream_index = stream->index;
*stream_flags = response->stream_flags;
*timestamp = response->timestamp;
*sample = response->sample;
if (*sample)
IMFSample_AddRef(*sample);
source_reader_release_response(response);
}
return !request_sample;
}
@ -925,12 +953,19 @@ static HRESULT source_reader_get_stream_read_index(struct source_reader *reader,
return hr;
}
static void source_reader_release_responses(struct media_stream *stream)
static void source_reader_release_responses(struct source_reader *reader, struct media_stream *stream)
{
struct stream_response *ptr, *next;
LIST_FOR_EACH_ENTRY_SAFE(ptr, next, &stream->responses, struct stream_response, entry)
LIST_FOR_EACH_ENTRY_SAFE(ptr, next, &reader->responses, struct stream_response, entry)
{
if (stream && stream->index != ptr->stream_index &&
ptr->stream_index != MF_SOURCE_READER_FIRST_VIDEO_STREAM &&
ptr->stream_index != MF_SOURCE_READER_FIRST_AUDIO_STREAM &&
ptr->stream_index != MF_SOURCE_READER_ANY_STREAM)
{
continue;
}
list_remove(&ptr->entry);
source_reader_release_response(ptr);
}
@ -938,22 +973,26 @@ static void source_reader_release_responses(struct media_stream *stream)
static void source_reader_flush_stream(struct source_reader *reader, DWORD stream_index)
{
struct media_stream *stream = &reader->streams[stream_index];
struct media_stream *stream = stream_index == MF_SOURCE_READER_ALL_STREAMS ? NULL : &reader->streams[stream_index];
EnterCriticalSection(&stream->cs);
source_reader_release_responses(stream);
source_reader_release_responses(reader, stream);
if (stream->decoder)
IMFTransform_ProcessMessage(stream->decoder, MFT_MESSAGE_COMMAND_FLUSH, 0);
stream->requests = 0;
LeaveCriticalSection(&stream->cs);
}
static HRESULT source_reader_flush(struct source_reader *reader, unsigned int index)
{
unsigned int stream_index;
EnterCriticalSection(&reader->cs);
if (index == MF_SOURCE_READER_ALL_STREAMS)
{
source_reader_flush_stream(reader, index);
}
else
{
switch (index)
{
case MF_SOURCE_READER_FIRST_VIDEO_STREAM:
@ -961,19 +1000,15 @@ static HRESULT source_reader_flush(struct source_reader *reader, unsigned int in
break;
case MF_SOURCE_READER_FIRST_AUDIO_STREAM:
stream_index = reader->first_audio_stream_index;
break;
case MF_SOURCE_READER_ALL_STREAMS:
for (stream_index = 0; stream_index < reader->stream_count; ++stream_index)
{
source_reader_flush_stream(reader, stream_index);
}
break;
default:
stream_index = index;
}
source_reader_flush_stream(reader, stream_index);
}
LeaveCriticalSection(&reader->cs);
return S_OK;
}
@ -1002,9 +1037,9 @@ static HRESULT WINAPI source_reader_async_commands_callback_Invoke(IMFAsyncCallb
if (FAILED(hr = source_reader_get_stream_read_index(reader, command->stream_index, &stream_index)))
return hr;
stream = &reader->streams[stream_index];
EnterCriticalSection(&reader->cs);
EnterCriticalSection(&stream->cs);
stream = &reader->streams[stream_index];
if (SUCCEEDED(hr = source_reader_start_source(reader)))
{
@ -1017,7 +1052,7 @@ static HRESULT WINAPI source_reader_async_commands_callback_Invoke(IMFAsyncCallb
}
}
LeaveCriticalSection(&stream->cs);
LeaveCriticalSection(&reader->cs);
if (report_sample)
IMFSourceReaderCallback_OnReadSample(reader->async_callback, status, stream_index, stream_flags,
@ -1029,11 +1064,10 @@ static HRESULT WINAPI source_reader_async_commands_callback_Invoke(IMFAsyncCallb
break;
case SOURCE_READER_ASYNC_SAMPLE_READY:
stream = &reader->streams[command->stream_index];
EnterCriticalSection(&stream->cs);
response = media_stream_pop_response(stream);
LeaveCriticalSection(&stream->cs);
EnterCriticalSection(&reader->cs);
response = media_stream_pop_response(reader, NULL);
LeaveCriticalSection(&reader->cs);
if (response)
{
@ -1126,10 +1160,8 @@ static ULONG WINAPI src_reader_Release(IMFSourceReader *iface)
IMFMediaType_Release(stream->current);
if (stream->decoder)
IMFTransform_Release(stream->decoder);
DeleteCriticalSection(&stream->cs);
source_reader_release_responses(stream);
}
source_reader_release_responses(reader, NULL);
heap_free(reader->streams);
DeleteCriticalSection(&reader->cs);
heap_free(reader);
@ -1544,39 +1576,40 @@ static HRESULT source_reader_read_sample(struct source_reader *reader, DWORD ind
if (!actual_index)
actual_index = &actual_index_tmp;
if (FAILED(hr = source_reader_get_stream_read_index(reader, index, &stream_index)))
{
*actual_index = index;
*stream_flags = MF_SOURCE_READERF_ERROR;
*timestamp = 0;
return hr;
}
EnterCriticalSection(&reader->cs);
if (SUCCEEDED(hr = source_reader_start_source(reader)))
{
if (SUCCEEDED(hr = source_reader_get_stream_read_index(reader, index, &stream_index)))
{
*actual_index = stream_index;
stream = &reader->streams[stream_index];
EnterCriticalSection(&stream->cs);
if (SUCCEEDED(hr = source_reader_start_source(reader)))
{
if (!source_reader_get_read_result(reader, stream, flags, &hr, actual_index, stream_flags,
timestamp, sample))
{
while (list_empty(&stream->responses) && stream->state != STREAM_STATE_EOS)
while (!source_reader_got_response_for_stream(reader, stream) && stream->state != STREAM_STATE_EOS)
{
stream->requests++;
if (FAILED(hr = source_reader_request_sample(reader, stream)))
WARN("Failed to request a sample, hr %#x.\n", hr);
SleepConditionVariableCS(&stream->sample_event, &stream->cs, INFINITE);
SleepConditionVariableCS(&reader->sample_event, &reader->cs, INFINITE);
}
source_reader_get_read_result(reader, stream, flags, &hr, actual_index, stream_flags,
timestamp, sample);
}
}
else
{
*actual_index = index;
*stream_flags = MF_SOURCE_READERF_ERROR;
*timestamp = 0;
}
}
LeaveCriticalSection(&stream->cs);
LeaveCriticalSection(&reader->cs);
TRACE("Stream %u, got sample %p, flags %#x.\n", *actual_index, *sample, *stream_flags);
@ -1796,9 +1829,11 @@ static HRESULT create_source_reader_from_source(IMFMediaSource *source, IMFAttri
object->stream_events_callback.lpVtbl = &stream_events_callback_vtbl;
object->async_commands_callback.lpVtbl = &async_commands_callback_vtbl;
object->refcount = 1;
list_init(&object->responses);
object->source = source;
IMFMediaSource_AddRef(object->source);
InitializeCriticalSection(&object->cs);
InitializeConditionVariable(&object->sample_event);
if (FAILED(hr = IMFMediaSource_CreatePresentationDescriptor(object->source, &object->descriptor)))
goto failed;
@ -1845,9 +1880,6 @@ static HRESULT create_source_reader_from_source(IMFMediaSource *source, IMFAttri
break;
object->streams[i].index = i;
InitializeCriticalSection(&object->streams[i].cs);
InitializeConditionVariable(&object->streams[i].sample_event);
list_init(&object->streams[i].responses);
}
if (FAILED(hr))