diff --git a/dlls/winegstreamer/gstdemux.c b/dlls/winegstreamer/gstdemux.c index 457e17250d5..45a3c4e8f2a 100644 --- a/dlls/winegstreamer/gstdemux.c +++ b/dlls/winegstreamer/gstdemux.c @@ -68,6 +68,8 @@ typedef struct GSTImpl { guint64 start, nextofs, nextpullofs, stop; ALLOCATOR_PROPERTIES props; HANDLE event, changed_ofs; + + HANDLE push_thread; } GSTImpl; struct GSTOutPin { @@ -297,6 +299,7 @@ static gboolean gst_base_src_perform_seek(GSTImpl *This, GstEvent *event) gboolean flush; guint32 seqnum; GstEvent *tevent; + BOOL thread = !!This->push_thread; gst_event_parse_seek(event, &rate, &seek_format, &flags, &cur_type, &cur, &stop_type, &stop); @@ -316,6 +319,8 @@ static gboolean gst_base_src_perform_seek(GSTImpl *This, GstEvent *event) gst_pad_push_event(This->my_src, tevent); if (This->pInputPin.pReader) IAsyncReader_BeginFlush(This->pInputPin.pReader); + if (thread) + gst_pad_activate_push(This->my_src, 0); } TRACE("++++++++++++++++ perform byte seek ------------------\n"); @@ -327,6 +332,8 @@ static gboolean gst_base_src_perform_seek(GSTImpl *This, GstEvent *event) gst_pad_push_event(This->my_src, tevent); if (This->pInputPin.pReader) IAsyncReader_EndFlush(This->pInputPin.pReader); + if (thread) + gst_pad_activate_push(This->my_src, 1); } return res; @@ -400,6 +407,76 @@ static void release_sample(void *data) { TRACE("Releasing %p returns %u\n", data, ret); } +static DWORD CALLBACK push_data(LPVOID iface) { + GSTImpl *This = iface; + IMediaSample *buf; + DWORD_PTR user; + HRESULT hr; + + TRACE("Starting..\n"); + for (;;) { + REFERENCE_TIME tStart, tStop; + ULONG len; + GstBuffer *gstbuf; + BYTE *data; + int ret; + + hr = IMemAllocator_GetBuffer(This->pInputPin.pAlloc, &buf, NULL, NULL, 0); + if (FAILED(hr)) + break; + + len = IMediaSample_GetSize(buf); + + tStart = MEDIATIME_FROM_BYTES(This->nextofs); + tStop = tStart + MEDIATIME_FROM_BYTES(len); + IMediaSample_SetTime(buf, &tStart, &tStop); + + hr = IAsyncReader_Request(This->pInputPin.pReader, buf, 0); + if (FAILED(hr)) { + IMediaSample_Release(buf); + break; + } + This->nextofs += len; + hr = IAsyncReader_WaitForNext(This->pInputPin.pReader, -1, &buf, &user); + if (FAILED(hr) || !buf) { + if (buf) + IMediaSample_Release(buf); + break; + } + + IMediaSample_GetPointer(buf, &data); + gstbuf = gst_app_buffer_new(data, IMediaSample_GetActualDataLength(buf), release_sample, buf); + if (!gstbuf) { + IMediaSample_Release(buf); + return S_OK; + } + gstbuf->duration = gstbuf->timestamp = -1; + ret = gst_pad_push(This->my_src, gstbuf); + if (ret >= 0) + hr = S_OK; + else + ERR("Sending returned: %i\n", ret); + if (ret == GST_FLOW_ERROR) + hr = E_FAIL; + else if (ret == GST_FLOW_WRONG_STATE) + hr = VFW_E_WRONG_STATE; + else if (ret == GST_FLOW_RESEND) + hr = S_FALSE; + if (hr != S_OK) + break; + } + + TRACE("Almost stopping.. %08x\n", hr); + do { + IAsyncReader_WaitForNext(This->pInputPin.pReader, 0, &buf, &user); + if (buf) + IMediaSample_Release(buf); + } while (buf); + + TRACE("Stopping.. %08x\n", hr); + return 0; +} + static HRESULT WINAPI GST_OutPin_QueryAccept(IPin *iface, const AM_MEDIA_TYPE *pmt) { GSTOutPin *pin = (GSTOutPin*)iface; FIXME("stub %p\n", pin); @@ -542,6 +619,29 @@ static GstFlowReturn request_buffer_src(GstPad *pad, guint64 ofs, guint len, Gst return ret; } +static DWORD CALLBACK push_data_init(LPVOID iface) { + GSTImpl *This = iface; + DWORD64 ofs = 0; + + TRACE("Starting..\n"); + for (;;) { + GstBuffer *buf; + GstFlowReturn ret = request_buffer_src(This->my_src, ofs, 4096, &buf); + if (ret < 0) { + ERR("Obtaining buffer returned: %i\n", ret); + break; + } + ret = gst_pad_push(This->my_src, buf); + ofs += 4096; + if (ret) + TRACE("Sending returned: %i\n", ret); + if (ret < 0) + break; + } + TRACE("Stopping..\n"); + return 0; +} + static void removed_decoded_pad(GstElement *bin, GstPad *pad, GSTImpl *This) { int x; GSTOutPin *pin; @@ -697,6 +797,27 @@ static gboolean query_function(GstPad *pad, GstQuery *query) { } } +static gboolean activate_push(GstPad *pad, gboolean activate) { + GSTImpl *This = gst_pad_get_element_private(pad); + EnterCriticalSection(&This->filter.csFilter); + if (!activate) { + TRACE("Deactivating\n"); + if (This->push_thread) { + WaitForSingleObject(This->push_thread, -1); + CloseHandle(This->push_thread); + This->push_thread = NULL; + } + } else if (!This->push_thread) { + TRACE("Activating\n"); + if (This->initial) + This->push_thread = CreateThread(NULL, 0, push_data_init, This, 0, NULL); + else + This->push_thread = CreateThread(NULL, 0, push_data, This, 0, NULL); + } + LeaveCriticalSection(&This->filter.csFilter); + return 1; +} + static void no_more_pads(GstElement *decodebin, GSTImpl *This) { FIXME("Done\n"); SetEvent(This->event); @@ -752,6 +873,7 @@ static HRESULT GST_Connect(GSTInPin *pPin, IPin *pConnectPin, ALLOCATOR_PROPERTI gst_pad_set_getrange_function(This->my_src, request_buffer_src); gst_pad_set_checkgetrange_function(This->my_src, check_get_range); gst_pad_set_query_function(This->my_src, query_function); + gst_pad_set_activatepush_function(This->my_src, activate_push); gst_pad_set_event_function(This->my_src, event_src); gst_pad_set_element_private (This->my_src, This); This->their_sink = gst_element_get_static_pad(This->gstfilter, "sink"); @@ -857,6 +979,7 @@ IUnknown * CALLBACK Gstreamer_Splitter_create(IUnknown *punkout, HRESULT *phr) { This->cStreams = 0; This->ppPins = NULL; + This->push_thread = NULL; This->event = CreateEventW(NULL, 0, 0, NULL); SourceSeeking_Init(&This->sourceSeeking, &GST_Seeking_Vtbl, GST_ChangeStop, GST_ChangeCurrent, GST_ChangeRate, &This->filter.csFilter);