winegstreamer: Use pthread condition variables to queue stream events.

Signed-off-by: Zebediah Figura <z.figura12@gmail.com>
Signed-off-by: Alexandre Julliard <julliard@winehq.org>
This commit is contained in:
Zebediah Figura 2021-01-23 12:43:44 -06:00 committed by Alexandre Julliard
parent e785880a96
commit 54e012b5b7
1 changed files with 35 additions and 38 deletions

View File

@ -60,8 +60,6 @@ struct parser
LONGLONG filesize;
CRITICAL_SECTION cs;
/* FIXME: It would be nice to avoid duplicating these with strmbase.
* However, synchronization is tricky; we need access to be protected by a
* separate lock. */
@ -117,7 +115,7 @@ struct parser_source
SourceSeeking seek;
CRITICAL_SECTION flushing_cs;
CONDITION_VARIABLE event_cv, event_empty_cv;
pthread_cond_t event_cond, event_empty_cond;
bool flushing, eos;
struct parser_event event;
HANDLE thread;
@ -686,18 +684,18 @@ static GstFlowReturn queue_stream_event(struct parser_source *pin, const struct
{
struct parser *filter = impl_from_strmbase_filter(pin->pin.pin.filter);
EnterCriticalSection(&filter->cs);
pthread_mutex_lock(&filter->mutex);
while (!pin->flushing && pin->event.type != PARSER_EVENT_NONE)
SleepConditionVariableCS(&pin->event_empty_cv, &filter->cs, INFINITE);
pthread_cond_wait(&pin->event_empty_cond, &filter->mutex);
if (pin->flushing)
{
LeaveCriticalSection(&filter->cs);
pthread_mutex_unlock(&filter->mutex);
TRACE("Filter is flushing; discarding event.\n");
return GST_FLOW_FLUSHING;
}
pin->event = *event;
LeaveCriticalSection(&filter->cs);
WakeConditionVariable(&pin->event_cv);
pthread_mutex_unlock(&filter->mutex);
pthread_cond_signal(&pin->event_cond);
TRACE("Event queued.\n");
return GST_FLOW_OK;
}
@ -755,10 +753,10 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event)
case GST_EVENT_FLUSH_START:
if (pin->pin.pin.peer)
{
EnterCriticalSection(&filter->cs);
pthread_mutex_lock(&filter->mutex);
pin->flushing = true;
WakeConditionVariable(&pin->event_empty_cv);
pthread_cond_signal(&pin->event_empty_cond);
switch (pin->event.type)
{
@ -773,7 +771,7 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event)
}
pin->event.type = PARSER_EVENT_NONE;
LeaveCriticalSection(&filter->cs);
pthread_mutex_unlock(&filter->mutex);
}
break;
@ -781,9 +779,9 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event)
gst_segment_init(pin->segment, GST_FORMAT_TIME);
if (pin->pin.pin.peer)
{
EnterCriticalSection(&filter->cs);
pthread_mutex_lock(&filter->mutex);
pin->flushing = false;
LeaveCriticalSection(&filter->cs);
pthread_mutex_unlock(&filter->mutex);
}
break;
@ -1037,14 +1035,14 @@ static DWORD CALLBACK stream_thread(void *arg)
struct parser_event event;
EnterCriticalSection(&pin->flushing_cs);
EnterCriticalSection(&filter->cs);
pthread_mutex_lock(&filter->mutex);
while (!filter->flushing && pin->event.type == PARSER_EVENT_NONE)
SleepConditionVariableCS(&pin->event_cv, &filter->cs, INFINITE);
pthread_cond_wait(&pin->event_cond, &filter->mutex);
if (filter->flushing)
{
LeaveCriticalSection(&filter->cs);
pthread_mutex_unlock(&filter->mutex);
LeaveCriticalSection(&pin->flushing_cs);
TRACE("Filter is flushing.\n");
continue;
@ -1052,16 +1050,16 @@ static DWORD CALLBACK stream_thread(void *arg)
if (!pin->event.type)
{
LeaveCriticalSection(&filter->cs);
pthread_mutex_unlock(&filter->mutex);
LeaveCriticalSection(&pin->flushing_cs);
continue;
}
event = pin->event;
pin->event.type = PARSER_EVENT_NONE;
WakeConditionVariable(&pin->event_empty_cv);
pthread_cond_signal(&pin->event_empty_cond);
LeaveCriticalSection(&filter->cs);
pthread_mutex_unlock(&filter->mutex);
TRACE("Got event of type %#x.\n", event.type);
@ -1598,8 +1596,6 @@ static void parser_destroy(struct strmbase_filter *iface)
pthread_cond_destroy(&filter->init_cond);
pthread_mutex_destroy(&filter->mutex);
filter->cs.DebugInfo->Spare[0] = 0;
DeleteCriticalSection(&filter->cs);
strmbase_sink_cleanup(&filter->sink);
strmbase_filter_cleanup(&filter->filter);
heap_free(filter);
@ -1616,9 +1612,9 @@ static HRESULT parser_init_stream(struct strmbase_filter *iface)
return S_OK;
filter->streaming = true;
EnterCriticalSection(&filter->cs);
pthread_mutex_lock(&filter->mutex);
filter->flushing = false;
LeaveCriticalSection(&filter->cs);
pthread_mutex_unlock(&filter->mutex);
/* DirectShow retains the old seek positions, but resets to them every time
* it transitions from stopped -> paused. */
@ -1656,9 +1652,9 @@ static HRESULT parser_cleanup_stream(struct strmbase_filter *iface)
return S_OK;
filter->streaming = false;
EnterCriticalSection(&filter->cs);
pthread_mutex_lock(&filter->mutex);
filter->flushing = true;
LeaveCriticalSection(&filter->cs);
pthread_mutex_unlock(&filter->mutex);
for (i = 0; i < filter->source_count; ++i)
{
@ -1667,7 +1663,7 @@ static HRESULT parser_cleanup_stream(struct strmbase_filter *iface)
if (!pin->pin.pin.peer)
continue;
WakeConditionVariable(&pin->event_cv);
pthread_cond_signal(&pin->event_cond);
}
for (i = 0; i < filter->source_count; ++i)
@ -1893,8 +1889,6 @@ static void parser_init_common(struct parser *object)
{
pthread_mutex_init(&object->mutex, NULL);
pthread_cond_init(&object->init_cond, NULL);
InitializeCriticalSection(&object->cs);
object->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": parser.cs");
object->flushing = true;
}
@ -2044,15 +2038,15 @@ static HRESULT WINAPI GST_Seeking_SetPositions(IMediaSeeking *iface,
if (!(current_flags & AM_SEEKING_NoFlush))
{
EnterCriticalSection(&filter->cs);
pthread_mutex_lock(&filter->mutex);
filter->flushing = true;
LeaveCriticalSection(&filter->cs);
pthread_mutex_unlock(&filter->mutex);
for (i = 0; i < filter->source_count; ++i)
{
if (filter->sources[i]->pin.pin.peer)
{
WakeConditionVariable(&pin->event_cv);
pthread_cond_signal(&pin->event_cond);
IPin_BeginFlush(filter->sources[i]->pin.pin.peer);
}
}
@ -2093,9 +2087,9 @@ static HRESULT WINAPI GST_Seeking_SetPositions(IMediaSeeking *iface,
if (!(current_flags & AM_SEEKING_NoFlush))
{
EnterCriticalSection(&filter->cs);
pthread_mutex_lock(&filter->mutex);
filter->flushing = false;
LeaveCriticalSection(&filter->cs);
pthread_mutex_unlock(&filter->mutex);
for (i = 0; i < filter->source_count; ++i)
{
@ -2329,6 +2323,9 @@ static void free_source_pin(struct parser_source *pin)
gst_object_unref(pin->my_sink);
gst_segment_free(pin->segment);
pthread_cond_destroy(&pin->event_cond);
pthread_cond_destroy(&pin->event_empty_cond);
pin->flushing_cs.DebugInfo->Spare[0] = 0;
DeleteCriticalSection(&pin->flushing_cs);
@ -2365,8 +2362,8 @@ static struct parser_source *create_pin(struct parser *filter, const WCHAR *name
pin->IQualityControl_iface.lpVtbl = &GSTOutPin_QualityControl_Vtbl;
strmbase_seeking_init(&pin->seek, &GST_Seeking_Vtbl, GST_ChangeStop,
GST_ChangeCurrent, GST_ChangeRate);
InitializeConditionVariable(&pin->event_cv);
InitializeConditionVariable(&pin->event_empty_cv);
pthread_cond_init(&pin->event_cond, NULL);
pthread_cond_init(&pin->event_empty_cond, NULL);
BaseFilterImpl_IncrementPinVersion(&filter->filter);
InitializeCriticalSection(&pin->flushing_cs);
@ -2394,13 +2391,13 @@ static HRESULT GST_RemoveOutputPins(struct parser *This)
return S_OK;
/* Unblock all of our streams. */
EnterCriticalSection(&This->cs);
pthread_mutex_lock(&This->mutex);
for (i = 0; i < This->source_count; ++i)
{
This->sources[i]->flushing = true;
WakeConditionVariable(&This->sources[i]->event_empty_cv);
pthread_cond_signal(&This->sources[i]->event_empty_cond);
}
LeaveCriticalSection(&This->cs);
pthread_mutex_unlock(&This->mutex);
gst_element_set_state(This->container, GST_STATE_NULL);
gst_pad_unlink(This->my_src, This->their_sink);