winegstreamer: Flush connected pins directly in SetPositions().

Instead of propagating GStreamer flush events to corresponding DirectShow pins.

This is mainly to avoid more callbacks from GStreamer and further separate the
Win32 and Unix code.

Signed-off-by: Zebediah Figura <z.figura12@gmail.com>
Signed-off-by: Alexandre Julliard <julliard@winehq.org>
This commit is contained in:
Zebediah Figura 2021-01-20 18:58:49 -06:00 committed by Alexandre Julliard
parent a952453888
commit ef79e902a6
1 changed files with 86 additions and 44 deletions

View File

@ -65,7 +65,7 @@ struct parser
/* FIXME: It would be nice to avoid duplicating these with strmbase. /* FIXME: It would be nice to avoid duplicating these with strmbase.
* However, synchronization is tricky; we need access to be protected by a * However, synchronization is tricky; we need access to be protected by a
* separate lock. */ * separate lock. */
bool streaming; bool streaming, flushing;
BOOL initial, ignore_flush; BOOL initial, ignore_flush;
GstElement *container; GstElement *container;
@ -111,8 +111,9 @@ struct parser_source
GstCaps *caps; GstCaps *caps;
SourceSeeking seek; SourceSeeking seek;
CONDITION_VARIABLE event_cv, event_empty_cv, flushing_cv, flush_stop_cv; CRITICAL_SECTION flushing_cs;
bool flushing, thread_blocked; CONDITION_VARIABLE event_cv, event_empty_cv;
bool flushing;
struct parser_event event; struct parser_event event;
HANDLE thread; HANDLE thread;
}; };
@ -630,8 +631,6 @@ static gboolean gst_base_src_perform_seek(struct parser *This, GstEvent *event)
tevent = gst_event_new_flush_start(); tevent = gst_event_new_flush_start();
gst_event_set_seqnum(tevent, seqnum); gst_event_set_seqnum(tevent, seqnum);
gst_pad_push_event(This->my_src, tevent); gst_pad_push_event(This->my_src, tevent);
if (This->reader)
IAsyncReader_BeginFlush(This->reader);
if (thread) if (thread)
gst_pad_set_active(This->my_src, 1); gst_pad_set_active(This->my_src, 1);
} }
@ -643,8 +642,6 @@ static gboolean gst_base_src_perform_seek(struct parser *This, GstEvent *event)
tevent = gst_event_new_flush_stop(TRUE); tevent = gst_event_new_flush_stop(TRUE);
gst_event_set_seqnum(tevent, seqnum); gst_event_set_seqnum(tevent, seqnum);
gst_pad_push_event(This->my_src, tevent); gst_pad_push_event(This->my_src, tevent);
if (This->reader)
IAsyncReader_EndFlush(This->reader);
if (thread) if (thread)
gst_pad_set_active(This->my_src, 1); gst_pad_set_active(This->my_src, 1);
} }
@ -659,25 +656,18 @@ static gboolean event_src(GstPad *pad, GstObject *parent, GstEvent *event)
TRACE("filter %p, type \"%s\".\n", This, GST_EVENT_TYPE_NAME(event)); TRACE("filter %p, type \"%s\".\n", This, GST_EVENT_TYPE_NAME(event));
switch (event->type) { switch (event->type)
{
case GST_EVENT_SEEK: case GST_EVENT_SEEK:
ret = gst_base_src_perform_seek(This, event); ret = gst_base_src_perform_seek(This, event);
break; break;
case GST_EVENT_FLUSH_START: case GST_EVENT_FLUSH_START:
EnterCriticalSection(&This->filter.filter_cs);
if (This->reader)
IAsyncReader_BeginFlush(This->reader);
LeaveCriticalSection(&This->filter.filter_cs);
break;
case GST_EVENT_FLUSH_STOP: case GST_EVENT_FLUSH_STOP:
EnterCriticalSection(&This->filter.filter_cs);
if (This->reader)
IAsyncReader_EndFlush(This->reader);
LeaveCriticalSection(&This->filter.filter_cs);
break;
case GST_EVENT_QOS: case GST_EVENT_QOS:
case GST_EVENT_RECONFIGURE: case GST_EVENT_RECONFIGURE:
break; break;
default: default:
WARN("Ignoring \"%s\" event.\n", GST_EVENT_TYPE_NAME(event)); WARN("Ignoring \"%s\" event.\n", GST_EVENT_TYPE_NAME(event));
ret = FALSE; ret = FALSE;
@ -757,19 +747,11 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event)
} }
if (pin->pin.pin.peer) if (pin->pin.pin.peer)
{ {
IPin_BeginFlush(pin->pin.pin.peer);
EnterCriticalSection(&filter->cs); EnterCriticalSection(&filter->cs);
pin->flushing = true; pin->flushing = true;
WakeConditionVariable(&pin->event_cv);
WakeConditionVariable(&pin->event_empty_cv); WakeConditionVariable(&pin->event_empty_cv);
/* Wait for the thread to pause itself, to ensure that no stale
* samples are sent. */
while (!pin->thread_blocked)
SleepConditionVariableCS(&pin->flushing_cv, &filter->cs, INFINITE);
/* And flush out any buffered event. */
switch (pin->event.type) switch (pin->event.type)
{ {
case PARSER_EVENT_NONE: case PARSER_EVENT_NONE:
@ -797,9 +779,6 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event)
EnterCriticalSection(&filter->cs); EnterCriticalSection(&filter->cs);
pin->flushing = false; pin->flushing = false;
LeaveCriticalSection(&filter->cs); LeaveCriticalSection(&filter->cs);
WakeConditionVariable(&pin->flush_stop_cv);
IPin_EndFlush(pin->pin.pin.peer);
} }
break; break;
@ -1057,32 +1036,31 @@ static DWORD CALLBACK stream_thread(void *arg)
{ {
struct parser_event event; struct parser_event event;
EnterCriticalSection(&pin->flushing_cs);
EnterCriticalSection(&filter->cs); EnterCriticalSection(&filter->cs);
while (filter->streaming && !pin->flushing && pin->event.type == PARSER_EVENT_NONE) while (filter->streaming && !filter->flushing && pin->event.type == PARSER_EVENT_NONE)
SleepConditionVariableCS(&pin->event_cv, &filter->cs, INFINITE); SleepConditionVariableCS(&pin->event_cv, &filter->cs, INFINITE);
if (pin->flushing) if (filter->flushing)
{ {
TRACE("Filter is flushing; pausing thread.\n"); LeaveCriticalSection(&filter->cs);
pin->thread_blocked = true; LeaveCriticalSection(&pin->flushing_cs);
WakeConditionVariable(&pin->flushing_cv); TRACE("Filter is flushing.\n");
do continue;
SleepConditionVariableCS(&pin->flush_stop_cv, &filter->cs, INFINITE);
while (pin->flushing);
pin->thread_blocked = false;
TRACE("Filter is no longer flushing; resuming thread.\n");
} }
if (!filter->streaming) if (!filter->streaming)
{ {
LeaveCriticalSection(&filter->cs); LeaveCriticalSection(&filter->cs);
LeaveCriticalSection(&pin->flushing_cs);
break; break;
} }
if (!pin->event.type) if (!pin->event.type)
{ {
LeaveCriticalSection(&filter->cs); LeaveCriticalSection(&filter->cs);
LeaveCriticalSection(&pin->flushing_cs);
continue; continue;
} }
@ -1127,6 +1105,8 @@ static DWORD CALLBACK stream_thread(void *arg)
case PARSER_EVENT_NONE: case PARSER_EVENT_NONE:
assert(0); assert(0);
} }
LeaveCriticalSection(&pin->flushing_cs);
} }
TRACE("Streaming stopped; exiting.\n"); TRACE("Streaming stopped; exiting.\n");
@ -2153,7 +2133,10 @@ static HRESULT WINAPI GST_Seeking_SetPositions(IMediaSeeking *iface,
{ {
GstSeekType current_type = GST_SEEK_TYPE_SET, stop_type = GST_SEEK_TYPE_SET; GstSeekType current_type = GST_SEEK_TYPE_SET, stop_type = GST_SEEK_TYPE_SET;
struct parser_source *pin = impl_from_IMediaSeeking(iface); struct parser_source *pin = impl_from_IMediaSeeking(iface);
struct parser *filter = impl_from_strmbase_filter(pin->pin.pin.filter);
GstSeekFlags flags = 0; GstSeekFlags flags = 0;
HRESULT hr = S_OK;
int i;
TRACE("pin %p, current %s, current_flags %#x, stop %s, stop_flags %#x.\n", TRACE("pin %p, current %s, current_flags %#x, stop %s, stop_flags %#x.\n",
pin, current ? debugstr_time(*current) : "<null>", current_flags, pin, current ? debugstr_time(*current) : "<null>", current_flags,
@ -2161,9 +2144,40 @@ static HRESULT WINAPI GST_Seeking_SetPositions(IMediaSeeking *iface,
mark_wine_thread(); mark_wine_thread();
SourceSeekingImpl_SetPositions(iface, current, current_flags, stop, stop_flags);
if (pin->pin.pin.filter->state == State_Stopped) if (pin->pin.pin.filter->state == State_Stopped)
{
SourceSeekingImpl_SetPositions(iface, current, current_flags, stop, stop_flags);
return S_OK; return S_OK;
}
if (!(current_flags & AM_SEEKING_NoFlush))
{
EnterCriticalSection(&filter->cs);
filter->flushing = true;
LeaveCriticalSection(&filter->cs);
for (i = 0; i < filter->source_count; ++i)
{
if (filter->sources[i]->pin.pin.peer)
{
WakeConditionVariable(&pin->event_cv);
IPin_BeginFlush(filter->sources[i]->pin.pin.peer);
}
}
if (filter->reader)
IAsyncReader_BeginFlush(filter->reader);
}
/* Acquire the flushing locks. This blocks the streaming threads, and
* ensures the seek is serialized between flushes. */
for (i = 0; i < filter->source_count; ++i)
{
if (filter->sources[i]->pin.pin.peer)
EnterCriticalSection(&pin->flushing_cs);
}
SourceSeekingImpl_SetPositions(iface, current, current_flags, stop, stop_flags);
if (current_flags & AM_SEEKING_SeekToKeyFrame) if (current_flags & AM_SEEKING_SeekToKeyFrame)
flags |= GST_SEEK_FLAG_KEY_UNIT; flags |= GST_SEEK_FLAG_KEY_UNIT;
@ -2182,9 +2196,33 @@ static HRESULT WINAPI GST_Seeking_SetPositions(IMediaSeeking *iface,
{ {
ERR("Failed to seek (current %s, stop %s).\n", ERR("Failed to seek (current %s, stop %s).\n",
debugstr_time(pin->seek.llCurrent), debugstr_time(pin->seek.llStop)); debugstr_time(pin->seek.llCurrent), debugstr_time(pin->seek.llStop));
return E_FAIL; hr = E_FAIL;
} }
return S_OK;
if (!(current_flags & AM_SEEKING_NoFlush))
{
EnterCriticalSection(&filter->cs);
filter->flushing = false;
LeaveCriticalSection(&filter->cs);
for (i = 0; i < filter->source_count; ++i)
{
if (filter->sources[i]->pin.pin.peer)
IPin_EndFlush(filter->sources[i]->pin.pin.peer);
}
if (filter->reader)
IAsyncReader_EndFlush(filter->reader);
}
/* Release the flushing locks. */
for (i = filter->source_count - 1; i >= 0; --i)
{
if (filter->sources[i]->pin.pin.peer)
LeaveCriticalSection(&pin->flushing_cs);
}
return hr;
} }
static const IMediaSeekingVtbl GST_Seeking_Vtbl = static const IMediaSeekingVtbl GST_Seeking_Vtbl =
@ -2397,6 +2435,9 @@ static void free_source_pin(struct parser_source *pin)
CloseHandle(pin->eos_event); CloseHandle(pin->eos_event);
gst_segment_free(pin->segment); gst_segment_free(pin->segment);
pin->flushing_cs.DebugInfo->Spare[0] = 0;
DeleteCriticalSection(&pin->flushing_cs);
strmbase_seeking_cleanup(&pin->seek); strmbase_seeking_cleanup(&pin->seek);
strmbase_source_cleanup(&pin->pin); strmbase_source_cleanup(&pin->pin);
heap_free(pin); heap_free(pin);
@ -2434,10 +2475,11 @@ static struct parser_source *create_pin(struct parser *filter, const WCHAR *name
GST_ChangeCurrent, GST_ChangeRate); GST_ChangeCurrent, GST_ChangeRate);
InitializeConditionVariable(&pin->event_cv); InitializeConditionVariable(&pin->event_cv);
InitializeConditionVariable(&pin->event_empty_cv); InitializeConditionVariable(&pin->event_empty_cv);
InitializeConditionVariable(&pin->flushing_cv);
InitializeConditionVariable(&pin->flush_stop_cv);
BaseFilterImpl_IncrementPinVersion(&filter->filter); BaseFilterImpl_IncrementPinVersion(&filter->filter);
InitializeCriticalSection(&pin->flushing_cs);
pin->flushing_cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": pin.flushing_cs");
sprintf(pad_name, "qz_sink_%u", filter->source_count); sprintf(pad_name, "qz_sink_%u", filter->source_count);
pin->my_sink = gst_pad_new(pad_name, GST_PAD_SINK); pin->my_sink = gst_pad_new(pad_name, GST_PAD_SINK);
gst_pad_set_element_private(pin->my_sink, pin); gst_pad_set_element_private(pin->my_sink, pin);