diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c index 3cdeeeb5374..e114a19a26b 100644 --- a/dlls/winegstreamer/wg_parser.c +++ b/dlls/winegstreamer/wg_parser.c @@ -415,43 +415,6 @@ static void no_more_pads_cb(GstElement *element, gpointer user) pthread_cond_signal(&parser->init_cond); } -static GstFlowReturn queue_stream_event(struct wg_parser_stream *stream, - const struct wg_parser_event *event, GstBuffer *buffer) -{ - struct wg_parser *parser = stream->parser; - - /* Unlike request_buffer_src() [q.v.], we need to watch for GStreamer - * flushes here. The difference is that we can be blocked by the streaming - * thread not running (or itself flushing on the DirectShow side). - * request_buffer_src() can only be blocked by the upstream source, and that - * is solved by flushing the upstream source. */ - - pthread_mutex_lock(&parser->mutex); - while (!stream->flushing && stream->event.type != WG_PARSER_EVENT_NONE) - pthread_cond_wait(&stream->event_empty_cond, &parser->mutex); - if (stream->flushing) - { - pthread_mutex_unlock(&parser->mutex); - GST_DEBUG("Filter is flushing; discarding event."); - return GST_FLOW_FLUSHING; - } - - assert(GST_IS_BUFFER(buffer)); - if (!gst_buffer_map(buffer, &stream->map_info, GST_MAP_READ)) - { - pthread_mutex_unlock(&parser->mutex); - GST_ERROR("Failed to map buffer.\n"); - return GST_FLOW_ERROR; - } - - stream->event = *event; - stream->buffer = buffer; - pthread_mutex_unlock(&parser->mutex); - pthread_cond_signal(&stream->event_cond); - GST_LOG("Event queued."); - return GST_FLOW_OK; -} - static gboolean sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *event) { struct wg_parser_stream *stream = gst_pad_get_element_private(pad); @@ -550,8 +513,8 @@ static gboolean sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *event) static GstFlowReturn sink_chain_cb(GstPad *pad, GstObject *parent, GstBuffer *buffer) { struct wg_parser_stream *stream = gst_pad_get_element_private(pad); + struct wg_parser *parser = stream->parser; struct wg_parser_event stream_event; - GstFlowReturn ret; GST_LOG("stream %p, buffer %p.", stream, buffer); @@ -576,10 +539,41 @@ static GstFlowReturn sink_chain_cb(GstPad *pad, GstObject *parent, GstBuffer *bu stream_event.u.buffer.delta = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT); stream_event.u.buffer.size = gst_buffer_get_size(buffer); - /* Transfer our reference to the buffer to the stream object. */ - if ((ret = queue_stream_event(stream, &stream_event, buffer)) != GST_FLOW_OK) + /* Allow this buffer to be flushed by GStreamer. We are effectively + * implementing a queue object here. */ + + pthread_mutex_lock(&parser->mutex); + + while (!stream->flushing && stream->event.type != WG_PARSER_EVENT_NONE) + pthread_cond_wait(&stream->event_empty_cond, &parser->mutex); + if (stream->flushing) + { + pthread_mutex_unlock(&parser->mutex); + GST_DEBUG("Stream is flushing; discarding buffer."); gst_buffer_unref(buffer); - return ret; + return GST_FLOW_FLUSHING; + } + + if (!gst_buffer_map(buffer, &stream->map_info, GST_MAP_READ)) + { + pthread_mutex_unlock(&parser->mutex); + GST_ERROR("Failed to map buffer.\n"); + gst_buffer_unref(buffer); + return GST_FLOW_ERROR; + } + + stream->event = stream_event; + stream->buffer = buffer; + + pthread_mutex_unlock(&parser->mutex); + pthread_cond_signal(&stream->event_cond); + + /* The chain callback is given a reference to the buffer. Transfer that + * reference to the stream object, which will release it in + * wg_parser_stream_release_buffer(). */ + + GST_LOG("Buffer queued."); + return GST_FLOW_OK; } static gboolean sink_query_cb(GstPad *pad, GstObject *parent, GstQuery *query)