winegstreamer: Manage our own thread for read requests.
Synchronization around flushing (on both sides) is tricky here, and the solution used by this patch means that a "kernel" thread can be blocked on a "user" thread. However, since it's not a real kernel thread and can't bring down the rest of the "kernel", I don't think this is a concern. Signed-off-by: Zebediah Figura <z.figura12@gmail.com> Signed-off-by: Alexandre Julliard <julliard@winehq.org>
This commit is contained in:
parent
2e41e3be57
commit
3b885fb5f3
|
@ -63,7 +63,7 @@ struct parser
|
|||
/* FIXME: It would be nice to avoid duplicating these with strmbase.
|
||||
* However, synchronization is tricky; we need access to be protected by a
|
||||
* separate lock. */
|
||||
bool streaming, flushing;
|
||||
bool streaming, flushing, sink_connected;
|
||||
|
||||
GstElement *container;
|
||||
GstPad *my_src, *their_sink;
|
||||
|
@ -76,6 +76,17 @@ struct parser
|
|||
|
||||
HANDLE push_thread;
|
||||
|
||||
HANDLE read_thread;
|
||||
pthread_cond_t read_cond, read_done_cond;
|
||||
struct
|
||||
{
|
||||
GstBuffer *buffer;
|
||||
uint64_t offset;
|
||||
uint32_t size;
|
||||
bool done;
|
||||
GstFlowReturn ret;
|
||||
} read_request;
|
||||
|
||||
BOOL (*init_gst)(struct parser *filter);
|
||||
HRESULT (*source_query_accept)(struct parser_source *pin, const AM_MEDIA_TYPE *mt);
|
||||
HRESULT (*source_get_media_type)(struct parser_source *pin, unsigned int index, AM_MEDIA_TYPE *mt);
|
||||
|
@ -684,6 +695,12 @@ static GstFlowReturn queue_stream_event(struct parser_source *pin, const struct
|
|||
{
|
||||
struct parser *filter = impl_from_strmbase_filter(pin->pin.pin.filter);
|
||||
|
||||
/* Unlike request_buffer_src() [q.v.], we need to watch for GStreamer
|
||||
* flushes here. The difference is that we can be blocked by the streaming
|
||||
* thread not running (or itself flushing on the DirectShow side).
|
||||
* request_buffer_src() can only be blocked by the upstream source, and that
|
||||
* is solved by flushing the upstream source. */
|
||||
|
||||
pthread_mutex_lock(&filter->mutex);
|
||||
while (!pin->flushing && pin->event.type != PARSER_EVENT_NONE)
|
||||
pthread_cond_wait(&pin->event_empty_cond, &filter->mutex);
|
||||
|
@ -1095,14 +1112,51 @@ static DWORD CALLBACK stream_thread(void *arg)
|
|||
return 0;
|
||||
}
|
||||
|
||||
static GstFlowReturn request_buffer_src(GstPad *pad, GstObject *parent, guint64 ofs, guint len, GstBuffer **buffer)
|
||||
static GstFlowReturn request_buffer_src(GstPad *pad, GstObject *parent, guint64 offset, guint size, GstBuffer **buffer)
|
||||
{
|
||||
struct parser *This = gst_pad_get_element_private(pad);
|
||||
struct parser *filter = gst_pad_get_element_private(pad);
|
||||
GstBuffer *new_buffer = NULL;
|
||||
GstFlowReturn ret;
|
||||
|
||||
GST_LOG("pad %p, offset %" G_GINT64_MODIFIER "u, length %u, buffer %p.", pad, offset, size, *buffer);
|
||||
|
||||
if (!*buffer)
|
||||
*buffer = new_buffer = gst_buffer_new_and_alloc(size);
|
||||
|
||||
pthread_mutex_lock(&filter->mutex);
|
||||
|
||||
assert(!filter->read_request.buffer);
|
||||
filter->read_request.buffer = *buffer;
|
||||
filter->read_request.offset = offset;
|
||||
filter->read_request.size = size;
|
||||
filter->read_request.done = false;
|
||||
pthread_cond_signal(&filter->read_cond);
|
||||
|
||||
/* Note that we don't unblock this wait on GST_EVENT_FLUSH_START. We expect
|
||||
* the upstream pin to flush if necessary. We should never be blocked on
|
||||
* read_thread() not running. */
|
||||
|
||||
while (!filter->read_request.done)
|
||||
pthread_cond_wait(&filter->read_done_cond, &filter->mutex);
|
||||
|
||||
ret = filter->read_request.ret;
|
||||
|
||||
pthread_mutex_unlock(&filter->mutex);
|
||||
|
||||
GST_LOG("Request returned %s.", gst_flow_get_name(ret));
|
||||
|
||||
if (ret != GST_FLOW_OK && new_buffer)
|
||||
gst_buffer_unref(new_buffer);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static GstFlowReturn read_buffer(struct parser *This, guint64 ofs, guint len, GstBuffer *buffer)
|
||||
{
|
||||
HRESULT hr;
|
||||
GstMapInfo info;
|
||||
|
||||
TRACE("pad %p, offset %s, length %u, buffer %p.\n", pad, wine_dbgstr_longlong(ofs), len, *buffer);
|
||||
TRACE("filter %p, offset %s, length %u, buffer %p.\n", This, wine_dbgstr_longlong(ofs), len, buffer);
|
||||
|
||||
if (ofs == GST_BUFFER_OFFSET_NONE)
|
||||
ofs = This->nextpullofs;
|
||||
|
@ -1114,22 +1168,47 @@ static GstFlowReturn request_buffer_src(GstPad *pad, GstObject *parent, guint64
|
|||
len = This->filesize - ofs;
|
||||
This->nextpullofs = ofs + len;
|
||||
|
||||
if (!*buffer)
|
||||
*buffer = new_buffer = gst_buffer_new_and_alloc(len);
|
||||
gst_buffer_map(*buffer, &info, GST_MAP_WRITE);
|
||||
gst_buffer_map(buffer, &info, GST_MAP_WRITE);
|
||||
hr = IAsyncReader_SyncRead(This->reader, ofs, len, info.data);
|
||||
gst_buffer_unmap(*buffer, &info);
|
||||
gst_buffer_unmap(buffer, &info);
|
||||
if (FAILED(hr))
|
||||
{
|
||||
ERR("Failed to read data, hr %#x.\n", hr);
|
||||
if (new_buffer)
|
||||
gst_buffer_unref(new_buffer);
|
||||
return GST_FLOW_ERROR;
|
||||
}
|
||||
|
||||
return GST_FLOW_OK;
|
||||
}
|
||||
|
||||
static DWORD CALLBACK read_thread(void *arg)
|
||||
{
|
||||
struct parser *filter = arg;
|
||||
|
||||
TRACE("Starting read thread for filter %p.\n", filter);
|
||||
|
||||
pthread_mutex_lock(&filter->mutex);
|
||||
|
||||
for (;;)
|
||||
{
|
||||
while (filter->sink_connected && !filter->read_request.buffer)
|
||||
pthread_cond_wait(&filter->read_cond, &filter->mutex);
|
||||
|
||||
if (!filter->sink_connected)
|
||||
break;
|
||||
|
||||
filter->read_request.done = true;
|
||||
filter->read_request.ret = read_buffer(filter, filter->read_request.offset,
|
||||
filter->read_request.size, filter->read_request.buffer);
|
||||
filter->read_request.buffer = NULL;
|
||||
pthread_cond_signal(&filter->read_done_cond);
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&filter->mutex);
|
||||
|
||||
TRACE("Streaming stopped; exiting.\n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void removed_decoded_pad(GstElement *bin, GstPad *pad, gpointer user)
|
||||
{
|
||||
struct parser *filter = user;
|
||||
|
@ -1506,6 +1585,10 @@ static HRESULT GST_Connect(struct parser *This, IPin *pConnectPin)
|
|||
|
||||
IAsyncReader_Length(This->reader, &This->filesize, &avail);
|
||||
|
||||
This->sink_connected = true;
|
||||
|
||||
This->read_thread = CreateThread(NULL, 0, read_thread, This, 0, NULL);
|
||||
|
||||
if (!This->bus) {
|
||||
This->bus = gst_bus_new();
|
||||
gst_bus_set_sync_handler(This->bus, watch_bus, This, NULL);
|
||||
|
@ -1599,6 +1682,8 @@ static void parser_destroy(struct strmbase_filter *iface)
|
|||
gst_object_unref(filter->bus);
|
||||
}
|
||||
|
||||
pthread_cond_destroy(&filter->read_cond);
|
||||
pthread_cond_destroy(&filter->read_done_cond);
|
||||
pthread_cond_destroy(&filter->init_cond);
|
||||
pthread_mutex_destroy(&filter->mutex);
|
||||
|
||||
|
@ -1895,6 +1980,8 @@ static void parser_init_common(struct parser *object)
|
|||
{
|
||||
pthread_mutex_init(&object->mutex, NULL);
|
||||
pthread_cond_init(&object->init_cond, NULL);
|
||||
pthread_cond_init(&object->read_cond, NULL);
|
||||
pthread_cond_init(&object->read_done_cond, NULL);
|
||||
object->flushing = true;
|
||||
}
|
||||
|
||||
|
@ -2411,6 +2498,15 @@ static HRESULT GST_RemoveOutputPins(struct parser *This)
|
|||
gst_object_unref(This->their_sink);
|
||||
This->my_src = This->their_sink = NULL;
|
||||
|
||||
/* read_thread() needs to stay alive to service any read requests GStreamer
|
||||
* sends, so we can only shut it down after GStreamer stops. */
|
||||
pthread_mutex_lock(&This->mutex);
|
||||
This->sink_connected = false;
|
||||
pthread_mutex_unlock(&This->mutex);
|
||||
pthread_cond_signal(&This->read_cond);
|
||||
WaitForSingleObject(This->read_thread, INFINITE);
|
||||
CloseHandle(This->read_thread);
|
||||
|
||||
for (i = 0; i < This->source_count; ++i)
|
||||
free_source_pin(This->sources[i]);
|
||||
|
||||
|
|
Loading…
Reference in New Issue