diff --git a/dlls/winegstreamer/gstdemux.c b/dlls/winegstreamer/gstdemux.c index 949acd012aa..9cf2379cb2c 100644 --- a/dlls/winegstreamer/gstdemux.c +++ b/dlls/winegstreamer/gstdemux.c @@ -65,7 +65,7 @@ struct parser /* FIXME: It would be nice to avoid duplicating these with strmbase. * However, synchronization is tricky; we need access to be protected by a * separate lock. */ - bool streaming; + bool streaming, flushing; BOOL initial, ignore_flush; GstElement *container; @@ -111,8 +111,9 @@ struct parser_source GstCaps *caps; SourceSeeking seek; - CONDITION_VARIABLE event_cv, event_empty_cv, flushing_cv, flush_stop_cv; - bool flushing, thread_blocked; + CRITICAL_SECTION flushing_cs; + CONDITION_VARIABLE event_cv, event_empty_cv; + bool flushing; struct parser_event event; HANDLE thread; }; @@ -630,8 +631,6 @@ static gboolean gst_base_src_perform_seek(struct parser *This, GstEvent *event) tevent = gst_event_new_flush_start(); gst_event_set_seqnum(tevent, seqnum); gst_pad_push_event(This->my_src, tevent); - if (This->reader) - IAsyncReader_BeginFlush(This->reader); if (thread) gst_pad_set_active(This->my_src, 1); } @@ -643,8 +642,6 @@ static gboolean gst_base_src_perform_seek(struct parser *This, GstEvent *event) tevent = gst_event_new_flush_stop(TRUE); gst_event_set_seqnum(tevent, seqnum); gst_pad_push_event(This->my_src, tevent); - if (This->reader) - IAsyncReader_EndFlush(This->reader); if (thread) gst_pad_set_active(This->my_src, 1); } @@ -659,25 +656,18 @@ static gboolean event_src(GstPad *pad, GstObject *parent, GstEvent *event) TRACE("filter %p, type \"%s\".\n", This, GST_EVENT_TYPE_NAME(event)); - switch (event->type) { + switch (event->type) + { case GST_EVENT_SEEK: ret = gst_base_src_perform_seek(This, event); break; + case GST_EVENT_FLUSH_START: - EnterCriticalSection(&This->filter.filter_cs); - if (This->reader) - IAsyncReader_BeginFlush(This->reader); - LeaveCriticalSection(&This->filter.filter_cs); - break; case GST_EVENT_FLUSH_STOP: - EnterCriticalSection(&This->filter.filter_cs); - if (This->reader) - IAsyncReader_EndFlush(This->reader); - LeaveCriticalSection(&This->filter.filter_cs); - break; case GST_EVENT_QOS: case GST_EVENT_RECONFIGURE: break; + default: WARN("Ignoring \"%s\" event.\n", GST_EVENT_TYPE_NAME(event)); ret = FALSE; @@ -757,19 +747,11 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event) } if (pin->pin.pin.peer) { - IPin_BeginFlush(pin->pin.pin.peer); - EnterCriticalSection(&filter->cs); pin->flushing = true; - WakeConditionVariable(&pin->event_cv); WakeConditionVariable(&pin->event_empty_cv); - /* Wait for the thread to pause itself, to ensure that no stale - * samples are sent. */ - while (!pin->thread_blocked) - SleepConditionVariableCS(&pin->flushing_cv, &filter->cs, INFINITE); - /* And flush out any buffered event. */ switch (pin->event.type) { case PARSER_EVENT_NONE: @@ -797,9 +779,6 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event) EnterCriticalSection(&filter->cs); pin->flushing = false; LeaveCriticalSection(&filter->cs); - WakeConditionVariable(&pin->flush_stop_cv); - - IPin_EndFlush(pin->pin.pin.peer); } break; @@ -1057,32 +1036,31 @@ static DWORD CALLBACK stream_thread(void *arg) { struct parser_event event; + EnterCriticalSection(&pin->flushing_cs); EnterCriticalSection(&filter->cs); - while (filter->streaming && !pin->flushing && pin->event.type == PARSER_EVENT_NONE) + while (filter->streaming && !filter->flushing && pin->event.type == PARSER_EVENT_NONE) SleepConditionVariableCS(&pin->event_cv, &filter->cs, INFINITE); - if (pin->flushing) + if (filter->flushing) { - TRACE("Filter is flushing; pausing thread.\n"); - pin->thread_blocked = true; - WakeConditionVariable(&pin->flushing_cv); - do - SleepConditionVariableCS(&pin->flush_stop_cv, &filter->cs, INFINITE); - while (pin->flushing); - pin->thread_blocked = false; - TRACE("Filter is no longer flushing; resuming thread.\n"); + LeaveCriticalSection(&filter->cs); + LeaveCriticalSection(&pin->flushing_cs); + TRACE("Filter is flushing.\n"); + continue; } if (!filter->streaming) { LeaveCriticalSection(&filter->cs); + LeaveCriticalSection(&pin->flushing_cs); break; } if (!pin->event.type) { LeaveCriticalSection(&filter->cs); + LeaveCriticalSection(&pin->flushing_cs); continue; } @@ -1127,6 +1105,8 @@ static DWORD CALLBACK stream_thread(void *arg) case PARSER_EVENT_NONE: assert(0); } + + LeaveCriticalSection(&pin->flushing_cs); } TRACE("Streaming stopped; exiting.\n"); @@ -2153,7 +2133,10 @@ static HRESULT WINAPI GST_Seeking_SetPositions(IMediaSeeking *iface, { GstSeekType current_type = GST_SEEK_TYPE_SET, stop_type = GST_SEEK_TYPE_SET; struct parser_source *pin = impl_from_IMediaSeeking(iface); + struct parser *filter = impl_from_strmbase_filter(pin->pin.pin.filter); GstSeekFlags flags = 0; + HRESULT hr = S_OK; + int i; TRACE("pin %p, current %s, current_flags %#x, stop %s, stop_flags %#x.\n", pin, current ? debugstr_time(*current) : "", current_flags, @@ -2161,9 +2144,40 @@ static HRESULT WINAPI GST_Seeking_SetPositions(IMediaSeeking *iface, mark_wine_thread(); - SourceSeekingImpl_SetPositions(iface, current, current_flags, stop, stop_flags); if (pin->pin.pin.filter->state == State_Stopped) + { + SourceSeekingImpl_SetPositions(iface, current, current_flags, stop, stop_flags); return S_OK; + } + + if (!(current_flags & AM_SEEKING_NoFlush)) + { + EnterCriticalSection(&filter->cs); + filter->flushing = true; + LeaveCriticalSection(&filter->cs); + + for (i = 0; i < filter->source_count; ++i) + { + if (filter->sources[i]->pin.pin.peer) + { + WakeConditionVariable(&pin->event_cv); + IPin_BeginFlush(filter->sources[i]->pin.pin.peer); + } + } + + if (filter->reader) + IAsyncReader_BeginFlush(filter->reader); + } + + /* Acquire the flushing locks. This blocks the streaming threads, and + * ensures the seek is serialized between flushes. */ + for (i = 0; i < filter->source_count; ++i) + { + if (filter->sources[i]->pin.pin.peer) + EnterCriticalSection(&pin->flushing_cs); + } + + SourceSeekingImpl_SetPositions(iface, current, current_flags, stop, stop_flags); if (current_flags & AM_SEEKING_SeekToKeyFrame) flags |= GST_SEEK_FLAG_KEY_UNIT; @@ -2182,9 +2196,33 @@ static HRESULT WINAPI GST_Seeking_SetPositions(IMediaSeeking *iface, { ERR("Failed to seek (current %s, stop %s).\n", debugstr_time(pin->seek.llCurrent), debugstr_time(pin->seek.llStop)); - return E_FAIL; + hr = E_FAIL; } - return S_OK; + + if (!(current_flags & AM_SEEKING_NoFlush)) + { + EnterCriticalSection(&filter->cs); + filter->flushing = false; + LeaveCriticalSection(&filter->cs); + + for (i = 0; i < filter->source_count; ++i) + { + if (filter->sources[i]->pin.pin.peer) + IPin_EndFlush(filter->sources[i]->pin.pin.peer); + } + + if (filter->reader) + IAsyncReader_EndFlush(filter->reader); + } + + /* Release the flushing locks. */ + for (i = filter->source_count - 1; i >= 0; --i) + { + if (filter->sources[i]->pin.pin.peer) + LeaveCriticalSection(&pin->flushing_cs); + } + + return hr; } static const IMediaSeekingVtbl GST_Seeking_Vtbl = @@ -2397,6 +2435,9 @@ static void free_source_pin(struct parser_source *pin) CloseHandle(pin->eos_event); gst_segment_free(pin->segment); + pin->flushing_cs.DebugInfo->Spare[0] = 0; + DeleteCriticalSection(&pin->flushing_cs); + strmbase_seeking_cleanup(&pin->seek); strmbase_source_cleanup(&pin->pin); heap_free(pin); @@ -2434,10 +2475,11 @@ static struct parser_source *create_pin(struct parser *filter, const WCHAR *name GST_ChangeCurrent, GST_ChangeRate); InitializeConditionVariable(&pin->event_cv); InitializeConditionVariable(&pin->event_empty_cv); - InitializeConditionVariable(&pin->flushing_cv); - InitializeConditionVariable(&pin->flush_stop_cv); BaseFilterImpl_IncrementPinVersion(&filter->filter); + InitializeCriticalSection(&pin->flushing_cs); + pin->flushing_cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": pin.flushing_cs"); + sprintf(pad_name, "qz_sink_%u", filter->source_count); pin->my_sink = gst_pad_new(pad_name, GST_PAD_SINK); gst_pad_set_element_private(pin->my_sink, pin);