From ef79e902a61e25845b3fac16bc51294b370030d4 Mon Sep 17 00:00:00 2001 From: Zebediah Figura Date: Wed, 20 Jan 2021 18:58:49 -0600 Subject: [PATCH] winegstreamer: Flush connected pins directly in SetPositions(). Instead of propagating GStreamer flush events to corresponding DirectShow pins. This is mainly to avoid more callbacks from GStreamer and further separate the Win32 and Unix code. Signed-off-by: Zebediah Figura Signed-off-by: Alexandre Julliard --- dlls/winegstreamer/gstdemux.c | 130 ++++++++++++++++++++++------------ 1 file changed, 86 insertions(+), 44 deletions(-) diff --git a/dlls/winegstreamer/gstdemux.c b/dlls/winegstreamer/gstdemux.c index 949acd012aa..9cf2379cb2c 100644 --- a/dlls/winegstreamer/gstdemux.c +++ b/dlls/winegstreamer/gstdemux.c @@ -65,7 +65,7 @@ struct parser /* FIXME: It would be nice to avoid duplicating these with strmbase. * However, synchronization is tricky; we need access to be protected by a * separate lock. */ - bool streaming; + bool streaming, flushing; BOOL initial, ignore_flush; GstElement *container; @@ -111,8 +111,9 @@ struct parser_source GstCaps *caps; SourceSeeking seek; - CONDITION_VARIABLE event_cv, event_empty_cv, flushing_cv, flush_stop_cv; - bool flushing, thread_blocked; + CRITICAL_SECTION flushing_cs; + CONDITION_VARIABLE event_cv, event_empty_cv; + bool flushing; struct parser_event event; HANDLE thread; }; @@ -630,8 +631,6 @@ static gboolean gst_base_src_perform_seek(struct parser *This, GstEvent *event) tevent = gst_event_new_flush_start(); gst_event_set_seqnum(tevent, seqnum); gst_pad_push_event(This->my_src, tevent); - if (This->reader) - IAsyncReader_BeginFlush(This->reader); if (thread) gst_pad_set_active(This->my_src, 1); } @@ -643,8 +642,6 @@ static gboolean gst_base_src_perform_seek(struct parser *This, GstEvent *event) tevent = gst_event_new_flush_stop(TRUE); gst_event_set_seqnum(tevent, seqnum); gst_pad_push_event(This->my_src, tevent); - if (This->reader) - IAsyncReader_EndFlush(This->reader); if (thread) gst_pad_set_active(This->my_src, 1); } @@ -659,25 +656,18 @@ static gboolean event_src(GstPad *pad, GstObject *parent, GstEvent *event) TRACE("filter %p, type \"%s\".\n", This, GST_EVENT_TYPE_NAME(event)); - switch (event->type) { + switch (event->type) + { case GST_EVENT_SEEK: ret = gst_base_src_perform_seek(This, event); break; + case GST_EVENT_FLUSH_START: - EnterCriticalSection(&This->filter.filter_cs); - if (This->reader) - IAsyncReader_BeginFlush(This->reader); - LeaveCriticalSection(&This->filter.filter_cs); - break; case GST_EVENT_FLUSH_STOP: - EnterCriticalSection(&This->filter.filter_cs); - if (This->reader) - IAsyncReader_EndFlush(This->reader); - LeaveCriticalSection(&This->filter.filter_cs); - break; case GST_EVENT_QOS: case GST_EVENT_RECONFIGURE: break; + default: WARN("Ignoring \"%s\" event.\n", GST_EVENT_TYPE_NAME(event)); ret = FALSE; @@ -757,19 +747,11 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event) } if (pin->pin.pin.peer) { - IPin_BeginFlush(pin->pin.pin.peer); - EnterCriticalSection(&filter->cs); pin->flushing = true; - WakeConditionVariable(&pin->event_cv); WakeConditionVariable(&pin->event_empty_cv); - /* Wait for the thread to pause itself, to ensure that no stale - * samples are sent. */ - while (!pin->thread_blocked) - SleepConditionVariableCS(&pin->flushing_cv, &filter->cs, INFINITE); - /* And flush out any buffered event. */ switch (pin->event.type) { case PARSER_EVENT_NONE: @@ -797,9 +779,6 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event) EnterCriticalSection(&filter->cs); pin->flushing = false; LeaveCriticalSection(&filter->cs); - WakeConditionVariable(&pin->flush_stop_cv); - - IPin_EndFlush(pin->pin.pin.peer); } break; @@ -1057,32 +1036,31 @@ static DWORD CALLBACK stream_thread(void *arg) { struct parser_event event; + EnterCriticalSection(&pin->flushing_cs); EnterCriticalSection(&filter->cs); - while (filter->streaming && !pin->flushing && pin->event.type == PARSER_EVENT_NONE) + while (filter->streaming && !filter->flushing && pin->event.type == PARSER_EVENT_NONE) SleepConditionVariableCS(&pin->event_cv, &filter->cs, INFINITE); - if (pin->flushing) + if (filter->flushing) { - TRACE("Filter is flushing; pausing thread.\n"); - pin->thread_blocked = true; - WakeConditionVariable(&pin->flushing_cv); - do - SleepConditionVariableCS(&pin->flush_stop_cv, &filter->cs, INFINITE); - while (pin->flushing); - pin->thread_blocked = false; - TRACE("Filter is no longer flushing; resuming thread.\n"); + LeaveCriticalSection(&filter->cs); + LeaveCriticalSection(&pin->flushing_cs); + TRACE("Filter is flushing.\n"); + continue; } if (!filter->streaming) { LeaveCriticalSection(&filter->cs); + LeaveCriticalSection(&pin->flushing_cs); break; } if (!pin->event.type) { LeaveCriticalSection(&filter->cs); + LeaveCriticalSection(&pin->flushing_cs); continue; } @@ -1127,6 +1105,8 @@ static DWORD CALLBACK stream_thread(void *arg) case PARSER_EVENT_NONE: assert(0); } + + LeaveCriticalSection(&pin->flushing_cs); } TRACE("Streaming stopped; exiting.\n"); @@ -2153,7 +2133,10 @@ static HRESULT WINAPI GST_Seeking_SetPositions(IMediaSeeking *iface, { GstSeekType current_type = GST_SEEK_TYPE_SET, stop_type = GST_SEEK_TYPE_SET; struct parser_source *pin = impl_from_IMediaSeeking(iface); + struct parser *filter = impl_from_strmbase_filter(pin->pin.pin.filter); GstSeekFlags flags = 0; + HRESULT hr = S_OK; + int i; TRACE("pin %p, current %s, current_flags %#x, stop %s, stop_flags %#x.\n", pin, current ? debugstr_time(*current) : "", current_flags, @@ -2161,9 +2144,40 @@ static HRESULT WINAPI GST_Seeking_SetPositions(IMediaSeeking *iface, mark_wine_thread(); - SourceSeekingImpl_SetPositions(iface, current, current_flags, stop, stop_flags); if (pin->pin.pin.filter->state == State_Stopped) + { + SourceSeekingImpl_SetPositions(iface, current, current_flags, stop, stop_flags); return S_OK; + } + + if (!(current_flags & AM_SEEKING_NoFlush)) + { + EnterCriticalSection(&filter->cs); + filter->flushing = true; + LeaveCriticalSection(&filter->cs); + + for (i = 0; i < filter->source_count; ++i) + { + if (filter->sources[i]->pin.pin.peer) + { + WakeConditionVariable(&pin->event_cv); + IPin_BeginFlush(filter->sources[i]->pin.pin.peer); + } + } + + if (filter->reader) + IAsyncReader_BeginFlush(filter->reader); + } + + /* Acquire the flushing locks. This blocks the streaming threads, and + * ensures the seek is serialized between flushes. */ + for (i = 0; i < filter->source_count; ++i) + { + if (filter->sources[i]->pin.pin.peer) + EnterCriticalSection(&pin->flushing_cs); + } + + SourceSeekingImpl_SetPositions(iface, current, current_flags, stop, stop_flags); if (current_flags & AM_SEEKING_SeekToKeyFrame) flags |= GST_SEEK_FLAG_KEY_UNIT; @@ -2182,9 +2196,33 @@ static HRESULT WINAPI GST_Seeking_SetPositions(IMediaSeeking *iface, { ERR("Failed to seek (current %s, stop %s).\n", debugstr_time(pin->seek.llCurrent), debugstr_time(pin->seek.llStop)); - return E_FAIL; + hr = E_FAIL; } - return S_OK; + + if (!(current_flags & AM_SEEKING_NoFlush)) + { + EnterCriticalSection(&filter->cs); + filter->flushing = false; + LeaveCriticalSection(&filter->cs); + + for (i = 0; i < filter->source_count; ++i) + { + if (filter->sources[i]->pin.pin.peer) + IPin_EndFlush(filter->sources[i]->pin.pin.peer); + } + + if (filter->reader) + IAsyncReader_EndFlush(filter->reader); + } + + /* Release the flushing locks. */ + for (i = filter->source_count - 1; i >= 0; --i) + { + if (filter->sources[i]->pin.pin.peer) + LeaveCriticalSection(&pin->flushing_cs); + } + + return hr; } static const IMediaSeekingVtbl GST_Seeking_Vtbl = @@ -2397,6 +2435,9 @@ static void free_source_pin(struct parser_source *pin) CloseHandle(pin->eos_event); gst_segment_free(pin->segment); + pin->flushing_cs.DebugInfo->Spare[0] = 0; + DeleteCriticalSection(&pin->flushing_cs); + strmbase_seeking_cleanup(&pin->seek); strmbase_source_cleanup(&pin->pin); heap_free(pin); @@ -2434,10 +2475,11 @@ static struct parser_source *create_pin(struct parser *filter, const WCHAR *name GST_ChangeCurrent, GST_ChangeRate); InitializeConditionVariable(&pin->event_cv); InitializeConditionVariable(&pin->event_empty_cv); - InitializeConditionVariable(&pin->flushing_cv); - InitializeConditionVariable(&pin->flush_stop_cv); BaseFilterImpl_IncrementPinVersion(&filter->filter); + InitializeCriticalSection(&pin->flushing_cs); + pin->flushing_cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": pin.flushing_cs"); + sprintf(pad_name, "qz_sink_%u", filter->source_count); pin->my_sink = gst_pad_new(pad_name, GST_PAD_SINK); gst_pad_set_element_private(pin->my_sink, pin);