winegstreamer: Use a pthread condition variable to wait for filter initialization.

Signed-off-by: Zebediah Figura <z.figura12@gmail.com>
Signed-off-by: Alexandre Julliard <julliard@winehq.org>
This commit is contained in:
Zebediah Figura 2021-01-21 17:22:53 -06:00 committed by Alexandre Julliard
parent 2546ad8115
commit 1f7ab818d8
1 changed files with 79 additions and 41 deletions

View File

@ -71,7 +71,10 @@ struct parser
GstPad *my_src, *their_sink;
GstBus *bus;
guint64 start, nextofs, nextpullofs, stop;
HANDLE no_more_pads_event, duration_event, error_event;
pthread_mutex_t mutex;
pthread_cond_t init_cond;
bool no_more_pads, has_duration, error;
HANDLE push_thread;
@ -109,14 +112,13 @@ struct parser_source
GstPad *their_src, *post_sink, *post_src, *my_sink;
GstElement *flip;
HANDLE caps_event, eos_event;
GstSegment *segment;
GstCaps *caps;
SourceSeeking seek;
CRITICAL_SECTION flushing_cs;
CONDITION_VARIABLE event_cv, event_empty_cv;
bool flushing;
bool flushing, eos;
struct parser_event event;
HANDLE thread;
};
@ -742,7 +744,12 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event)
queue_stream_event(pin, &stream_event);
}
else
SetEvent(pin->eos_event);
{
pthread_mutex_lock(&filter->mutex);
pin->eos = true;
pthread_mutex_unlock(&filter->mutex);
pthread_cond_signal(&filter->init_cond);
}
break;
case GST_EVENT_FLUSH_START:
@ -785,8 +792,10 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event)
GstCaps *caps;
gst_event_parse_caps(event, &caps);
pthread_mutex_lock(&filter->mutex);
gst_caps_replace(&pin->caps, caps);
SetEvent(pin->caps_event);
pthread_mutex_unlock(&filter->mutex);
pthread_cond_signal(&filter->init_cond);
break;
}
@ -1390,7 +1399,11 @@ static void no_more_pads(GstElement *decodebin, gpointer user)
{
struct parser *filter = user;
TRACE("filter %p.\n", filter);
SetEvent(filter->no_more_pads_event);
pthread_mutex_lock(&filter->mutex);
filter->no_more_pads = true;
pthread_mutex_unlock(&filter->mutex);
pthread_cond_signal(&filter->init_cond);
}
static GstAutoplugSelectResult autoplug_blacklist(GstElement *bin, GstPad *pad, GstCaps *caps, GstElementFactory *fact, gpointer user)
@ -1428,8 +1441,12 @@ static GstBusSyncReply watch_bus(GstBus *bus, GstMessage *msg, gpointer data)
ERR("%s\n", dbg_info);
g_error_free(err);
g_free(dbg_info);
SetEvent(filter->error_event);
pthread_mutex_lock(&filter->mutex);
filter->error = true;
pthread_mutex_unlock(&filter->mutex);
pthread_cond_signal(&filter->init_cond);
break;
case GST_MESSAGE_WARNING:
gst_message_parse_warning(msg, &err, &dbg_info);
WARN("%s: %s\n", GST_OBJECT_NAME(msg->src), err->message);
@ -1437,9 +1454,14 @@ static GstBusSyncReply watch_bus(GstBus *bus, GstMessage *msg, gpointer data)
g_error_free(err);
g_free(dbg_info);
break;
case GST_MESSAGE_DURATION_CHANGED:
SetEvent(filter->duration_event);
pthread_mutex_lock(&filter->mutex);
filter->has_duration = true;
pthread_mutex_unlock(&filter->mutex);
pthread_cond_signal(&filter->init_cond);
break;
default:
break;
}
@ -1499,17 +1521,25 @@ static HRESULT GST_Connect(struct parser *This, IPin *pConnectPin)
if (!This->init_gst(This))
return E_FAIL;
pthread_mutex_lock(&This->mutex);
for (i = 0; i < This->source_count; ++i)
{
struct parser_source *pin = This->sources[i];
const HANDLE events[2] = {pin->caps_event, This->error_event};
pin->seek.llDuration = pin->seek.llStop = query_duration(pin->their_src);
pin->seek.llCurrent = 0;
if (WaitForMultipleObjects(2, events, FALSE, INFINITE))
while (!pin->caps && !This->error)
pthread_cond_wait(&This->init_cond, &This->mutex);
if (This->error)
{
pthread_mutex_unlock(&This->mutex);
return E_FAIL;
}
}
pthread_mutex_unlock(&This->mutex);
This->nextofs = This->nextpullofs = 0;
return S_OK;
}
@ -1545,10 +1575,6 @@ static void parser_destroy(struct strmbase_filter *iface)
struct parser *filter = impl_from_strmbase_filter(iface);
HRESULT hr;
CloseHandle(filter->no_more_pads_event);
CloseHandle(filter->duration_event);
CloseHandle(filter->error_event);
/* Don't need to clean up output pins, disconnecting input pin will do that */
if (filter->sink.pin.peer)
{
@ -1568,6 +1594,9 @@ static void parser_destroy(struct strmbase_filter *iface)
gst_object_unref(filter->bus);
}
pthread_cond_destroy(&filter->init_cond);
pthread_mutex_destroy(&filter->mutex);
filter->cs.DebugInfo->Spare[0] = 0;
DeleteCriticalSection(&filter->cs);
strmbase_sink_cleanup(&filter->sink);
@ -1721,7 +1750,6 @@ static const struct strmbase_sink_ops sink_ops =
static BOOL decodebin_parser_init_gst(struct parser *filter)
{
GstElement *element = gst_element_factory_make("decodebin", NULL);
HANDLE events[2];
int ret;
if (!element)
@ -1739,7 +1767,10 @@ static BOOL decodebin_parser_init_gst(struct parser *filter)
g_signal_connect(element, "no-more-pads", G_CALLBACK(no_more_pads_wrapper), filter);
filter->their_sink = gst_element_get_static_pad(element, "sink");
ResetEvent(filter->no_more_pads_event);
pthread_mutex_lock(&filter->mutex);
filter->no_more_pads = filter->error = false;
pthread_mutex_unlock(&filter->mutex);
if ((ret = gst_pad_link(filter->my_src, filter->their_sink)) < 0)
{
@ -1755,11 +1786,15 @@ static BOOL decodebin_parser_init_gst(struct parser *filter)
return FALSE;
}
events[0] = filter->no_more_pads_event;
events[1] = filter->error_event;
if (WaitForMultipleObjects(2, events, FALSE, INFINITE))
pthread_mutex_lock(&filter->mutex);
while (!filter->no_more_pads && !filter->error)
pthread_cond_wait(&filter->init_cond, &filter->mutex);
if (filter->error)
{
pthread_mutex_unlock(&filter->mutex);
return FALSE;
}
pthread_mutex_unlock(&filter->mutex);
return TRUE;
}
@ -1855,7 +1890,8 @@ static BOOL parser_init_gstreamer(void)
static void parser_init_common(struct parser *object)
{
object->error_event = CreateEventW(NULL, TRUE, FALSE, NULL);
pthread_mutex_init(&object->mutex, NULL);
pthread_cond_init(&object->init_cond, NULL);
InitializeCriticalSection(&object->cs);
object->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": parser.cs");
object->flushing = true;
@ -1878,7 +1914,6 @@ HRESULT decodebin_parser_create(IUnknown *outer, IUnknown **out)
strmbase_filter_init(&object->filter, outer, &CLSID_decodebin_parser, &filter_ops);
strmbase_sink_init(&object->sink, &object->filter, wcsInputPinName, &sink_ops, NULL);
object->no_more_pads_event = CreateEventW(NULL, FALSE, FALSE, NULL);
object->init_gst = decodebin_parser_init_gst;
object->source_query_accept = decodebin_parser_source_query_accept;
object->source_get_media_type = decodebin_parser_source_get_media_type;
@ -2291,8 +2326,6 @@ static void free_source_pin(struct parser_source *pin)
gst_object_unref(pin->their_src);
}
gst_object_unref(pin->my_sink);
CloseHandle(pin->caps_event);
CloseHandle(pin->eos_event);
gst_segment_free(pin->segment);
pin->flushing_cs.DebugInfo->Spare[0] = 0;
@ -2326,8 +2359,6 @@ static struct parser_source *create_pin(struct parser *filter, const WCHAR *name
return NULL;
strmbase_source_init(&pin->pin, &filter->filter, name, &source_ops);
pin->caps_event = CreateEventW(NULL, FALSE, FALSE, NULL);
pin->eos_event = CreateEventW(NULL, FALSE, FALSE, NULL);
pin->segment = gst_segment_new();
gst_segment_init(pin->segment, GST_FORMAT_TIME);
pin->IQualityControl_iface.lpVtbl = &GSTOutPin_QualityControl_Vtbl;
@ -2606,7 +2637,6 @@ static const struct strmbase_sink_ops avi_splitter_sink_ops =
static BOOL avi_splitter_init_gst(struct parser *filter)
{
GstElement *element = gst_element_factory_make("avidemux", NULL);
HANDLE events[2];
int ret;
if (!element)
@ -2623,7 +2653,10 @@ static BOOL avi_splitter_init_gst(struct parser *filter)
g_signal_connect(element, "no-more-pads", G_CALLBACK(no_more_pads_wrapper), filter);
filter->their_sink = gst_element_get_static_pad(element, "sink");
ResetEvent(filter->no_more_pads_event);
pthread_mutex_lock(&filter->mutex);
filter->no_more_pads = filter->error = false;
pthread_mutex_unlock(&filter->mutex);
if ((ret = gst_pad_link(filter->my_src, filter->their_sink)) < 0)
{
@ -2639,11 +2672,15 @@ static BOOL avi_splitter_init_gst(struct parser *filter)
return FALSE;
}
events[0] = filter->no_more_pads_event;
events[1] = filter->error_event;
if (WaitForMultipleObjects(2, events, FALSE, INFINITE))
pthread_mutex_lock(&filter->mutex);
while (!filter->no_more_pads && !filter->error)
pthread_cond_wait(&filter->init_cond, &filter->mutex);
if (filter->error)
{
pthread_mutex_unlock(&filter->mutex);
return FALSE;
}
pthread_mutex_unlock(&filter->mutex);
return TRUE;
}
@ -2686,7 +2723,6 @@ HRESULT avi_splitter_create(IUnknown *outer, IUnknown **out)
strmbase_filter_init(&object->filter, outer, &CLSID_AviSplitter, &filter_ops);
strmbase_sink_init(&object->sink, &object->filter, sink_name, &avi_splitter_sink_ops, NULL);
object->no_more_pads_event = CreateEventW(NULL, FALSE, FALSE, NULL);
object->init_gst = avi_splitter_init_gst;
object->source_query_accept = avi_splitter_source_query_accept;
object->source_get_media_type = avi_splitter_source_get_media_type;
@ -2721,8 +2757,6 @@ static BOOL mpeg_splitter_init_gst(struct parser *filter)
static const WCHAR source_name[] = {'A','u','d','i','o',0};
struct parser_source *pin;
GstElement *element;
HANDLE events[3];
DWORD res;
int ret;
if (!(element = gst_element_factory_make("mpegaudioparse", NULL)))
@ -2759,11 +2793,16 @@ static BOOL mpeg_splitter_init_gst(struct parser *filter)
return FALSE;
}
events[0] = filter->duration_event;
events[1] = filter->error_event;
events[2] = pin->eos_event;
res = WaitForMultipleObjects(3, events, FALSE, INFINITE);
return (res != 1);
pthread_mutex_lock(&filter->mutex);
while (!filter->has_duration && !filter->error && !pin->eos)
pthread_cond_wait(&filter->init_cond, &filter->mutex);
if (filter->error)
{
pthread_mutex_unlock(&filter->mutex);
return FALSE;
}
pthread_mutex_unlock(&filter->mutex);
return TRUE;
}
static HRESULT mpeg_splitter_source_query_accept(struct parser_source *pin, const AM_MEDIA_TYPE *mt)
@ -2830,7 +2869,6 @@ HRESULT mpeg_splitter_create(IUnknown *outer, IUnknown **out)
strmbase_sink_init(&object->sink, &object->filter, sink_name, &mpeg_splitter_sink_ops, NULL);
object->IAMStreamSelect_iface.lpVtbl = &stream_select_vtbl;
object->duration_event = CreateEventW(NULL, FALSE, FALSE, NULL);
object->init_gst = mpeg_splitter_init_gst;
object->source_query_accept = mpeg_splitter_source_query_accept;
object->source_get_media_type = mpeg_splitter_source_get_media_type;