winegstreamer: Return S_FALSE from wg_parser_stream_get_event() if the stream is EOS.

Instead of using WG_PARSER_EVENT_EOS.

Signed-off-by: Zebediah Figura <zfigura@codeweavers.com>
Signed-off-by: Alexandre Julliard <julliard@winehq.org>
This commit is contained in:
Zebediah Figura 2022-02-23 15:46:48 -06:00 committed by Alexandre Julliard
parent 5144b27661
commit 28c9c138d2
7 changed files with 122 additions and 165 deletions

View File

@ -80,7 +80,7 @@ void wg_parser_stream_get_preferred_format(struct wg_parser_stream *stream, stru
void wg_parser_stream_enable(struct wg_parser_stream *stream, const struct wg_format *format); void wg_parser_stream_enable(struct wg_parser_stream *stream, const struct wg_format *format);
void wg_parser_stream_disable(struct wg_parser_stream *stream); void wg_parser_stream_disable(struct wg_parser_stream *stream);
void wg_parser_stream_get_event(struct wg_parser_stream *stream, struct wg_parser_event *event); bool wg_parser_stream_get_event(struct wg_parser_stream *stream, struct wg_parser_event *event);
bool wg_parser_stream_copy_buffer(struct wg_parser_stream *stream, bool wg_parser_stream_copy_buffer(struct wg_parser_stream *stream,
void *data, uint32_t offset, uint32_t size); void *data, uint32_t offset, uint32_t size);
void wg_parser_stream_release_buffer(struct wg_parser_stream *stream); void wg_parser_stream_release_buffer(struct wg_parser_stream *stream);

View File

@ -172,7 +172,7 @@ void wg_parser_stream_disable(struct wg_parser_stream *stream)
__wine_unix_call(unix_handle, unix_wg_parser_stream_disable, stream); __wine_unix_call(unix_handle, unix_wg_parser_stream_disable, stream);
} }
void wg_parser_stream_get_event(struct wg_parser_stream *stream, struct wg_parser_event *event) bool wg_parser_stream_get_event(struct wg_parser_stream *stream, struct wg_parser_event *event)
{ {
struct wg_parser_stream_get_event_params params = struct wg_parser_stream_get_event_params params =
{ {
@ -180,7 +180,7 @@ void wg_parser_stream_get_event(struct wg_parser_stream *stream, struct wg_parse
.event = event, .event = event,
}; };
__wine_unix_call(unix_handle, unix_wg_parser_stream_get_event, &params); return !__wine_unix_call(unix_handle, unix_wg_parser_stream_get_event, &params);
} }
bool wg_parser_stream_copy_buffer(struct wg_parser_stream *stream, bool wg_parser_stream_copy_buffer(struct wg_parser_stream *stream,

View File

@ -532,33 +532,15 @@ static void wait_on_sample(struct media_stream *stream, IUnknown *token)
TRACE("%p, %p\n", stream, token); TRACE("%p, %p\n", stream, token);
if (stream->eos) if (wg_parser_stream_get_event(stream->wg_stream, &event))
{ {
IMFMediaEventQueue_QueueEventParamVar(stream->event_queue, MEError, &GUID_NULL, MF_E_END_OF_STREAM, &empty_var); send_buffer(stream, &event, token);
return;
} }
else
for (;;)
{ {
wg_parser_stream_get_event(stream->wg_stream, &event); stream->eos = TRUE;
IMFMediaEventQueue_QueueEventParamVar(stream->event_queue, MEEndOfStream, &GUID_NULL, S_OK, &empty_var);
TRACE("Got event of type %#x.\n", event.type); dispatch_end_of_presentation(stream->parent_source);
switch (event.type)
{
case WG_PARSER_EVENT_BUFFER:
send_buffer(stream, &event, token);
return;
case WG_PARSER_EVENT_EOS:
stream->eos = TRUE;
IMFMediaEventQueue_QueueEventParamVar(stream->event_queue, MEEndOfStream, &GUID_NULL, S_OK, &empty_var);
dispatch_end_of_presentation(stream->parent_source);
return;
case WG_PARSER_EVENT_NONE:
assert(0);
}
} }
} }

View File

@ -833,25 +833,19 @@ static DWORD CALLBACK stream_thread(void *arg)
continue; continue;
} }
wg_parser_stream_get_event(pin->wg_stream, &event); if (wg_parser_stream_get_event(pin->wg_stream, &event))
{
send_buffer(pin, &event);
}
else
{
TRACE("Got EOS.\n");
IPin_EndOfStream(pin->pin.pin.peer);
pin->eos = true;
}
TRACE("Got event of type %#x.\n", event.type); TRACE("Got event of type %#x.\n", event.type);
switch (event.type)
{
case WG_PARSER_EVENT_BUFFER:
send_buffer(pin, &event);
break;
case WG_PARSER_EVENT_EOS:
IPin_EndOfStream(pin->pin.pin.peer);
pin->eos = true;
break;
case WG_PARSER_EVENT_NONE:
assert(0);
}
LeaveCriticalSection(&pin->flushing_cs); LeaveCriticalSection(&pin->flushing_cs);
} }

View File

@ -107,7 +107,6 @@ enum wg_parser_event_type
{ {
WG_PARSER_EVENT_NONE = 0, WG_PARSER_EVENT_NONE = 0,
WG_PARSER_EVENT_BUFFER, WG_PARSER_EVENT_BUFFER,
WG_PARSER_EVENT_EOS,
}; };
struct wg_parser_event struct wg_parser_event

View File

@ -259,19 +259,20 @@ static NTSTATUS wg_parser_stream_get_event(void *args)
pthread_mutex_lock(&parser->mutex); pthread_mutex_lock(&parser->mutex);
while (stream->event.type == WG_PARSER_EVENT_NONE) while (!stream->eos && stream->event.type == WG_PARSER_EVENT_NONE)
pthread_cond_wait(&stream->event_cond, &parser->mutex); pthread_cond_wait(&stream->event_cond, &parser->mutex);
*params->event = stream->event; /* Note that we can both have a buffer and stream->eos, in which case we
* must return the buffer. */
if (stream->event.type != WG_PARSER_EVENT_BUFFER) if (stream->event.type != WG_PARSER_EVENT_NONE)
{ {
stream->event.type = WG_PARSER_EVENT_NONE; *params->event = stream->event;
pthread_cond_signal(&stream->event_empty_cond); pthread_mutex_unlock(&parser->mutex);
return S_OK;
} }
pthread_mutex_unlock(&parser->mutex);
return S_OK; pthread_mutex_unlock(&parser->mutex);
return S_FALSE;
} }
static NTSTATUS wg_parser_stream_copy_buffer(void *args) static NTSTATUS wg_parser_stream_copy_buffer(void *args)
@ -434,16 +435,15 @@ static GstFlowReturn queue_stream_event(struct wg_parser_stream *stream,
GST_DEBUG("Filter is flushing; discarding event."); GST_DEBUG("Filter is flushing; discarding event.");
return GST_FLOW_FLUSHING; return GST_FLOW_FLUSHING;
} }
if (buffer)
assert(GST_IS_BUFFER(buffer));
if (!gst_buffer_map(buffer, &stream->map_info, GST_MAP_READ))
{ {
assert(GST_IS_BUFFER(buffer)); pthread_mutex_unlock(&parser->mutex);
if (!gst_buffer_map(buffer, &stream->map_info, GST_MAP_READ)) GST_ERROR("Failed to map buffer.\n");
{ return GST_FLOW_ERROR;
pthread_mutex_unlock(&parser->mutex);
GST_ERROR("Failed to map buffer.\n");
return GST_FLOW_ERROR;
}
} }
stream->event = *event; stream->event = *event;
stream->buffer = buffer; stream->buffer = buffer;
pthread_mutex_unlock(&parser->mutex); pthread_mutex_unlock(&parser->mutex);
@ -479,20 +479,13 @@ static gboolean sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *event)
break; break;
case GST_EVENT_EOS: case GST_EVENT_EOS:
pthread_mutex_lock(&parser->mutex);
stream->eos = true;
pthread_mutex_unlock(&parser->mutex);
if (stream->enabled) if (stream->enabled)
{ pthread_cond_signal(&stream->event_cond);
struct wg_parser_event stream_event;
stream_event.type = WG_PARSER_EVENT_EOS;
queue_stream_event(stream, &stream_event, NULL);
}
else else
{
pthread_mutex_lock(&parser->mutex);
stream->eos = true;
pthread_mutex_unlock(&parser->mutex);
pthread_cond_signal(&parser->init_cond); pthread_cond_signal(&parser->init_cond);
}
break; break;
case GST_EVENT_FLUSH_START: case GST_EVENT_FLUSH_START:
@ -524,12 +517,13 @@ static gboolean sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *event)
if (reset_time) if (reset_time)
gst_segment_init(&stream->segment, GST_FORMAT_UNDEFINED); gst_segment_init(&stream->segment, GST_FORMAT_UNDEFINED);
pthread_mutex_lock(&parser->mutex);
stream->eos = false;
if (stream->enabled) if (stream->enabled)
{
pthread_mutex_lock(&parser->mutex);
stream->flushing = false; stream->flushing = false;
pthread_mutex_unlock(&parser->mutex);
} pthread_mutex_unlock(&parser->mutex);
break; break;
} }

View File

@ -1826,6 +1826,10 @@ HRESULT wm_reader_get_stream_sample(struct wm_stream *stream,
IWMReaderCallbackAdvanced *callback_advanced = stream->reader->callback_advanced; IWMReaderCallbackAdvanced *callback_advanced = stream->reader->callback_advanced;
struct wg_parser_stream *wg_stream = stream->wg_stream; struct wg_parser_stream *wg_stream = stream->wg_stream;
struct wg_parser_event event; struct wg_parser_event event;
DWORD size, capacity;
INSSBuffer *sample;
HRESULT hr;
BYTE *data;
if (stream->selection == WMT_OFF) if (stream->selection == WMT_OFF)
return NS_E_INVALID_REQUEST; return NS_E_INVALID_REQUEST;
@ -1835,104 +1839,88 @@ HRESULT wm_reader_get_stream_sample(struct wm_stream *stream,
for (;;) for (;;)
{ {
wg_parser_stream_get_event(wg_stream, &event); if (!wg_parser_stream_get_event(wg_stream, &event))
TRACE("Got event of type %#x for %s stream %p.\n", event.type,
get_major_type_string(stream->format.major_type), stream);
switch (event.type)
{ {
case WG_PARSER_EVENT_BUFFER: stream->eos = true;
TRACE("End of stream.\n");
return NS_E_NO_MORE_SAMPLES;
}
TRACE("Got buffer for '%s' stream %p.\n", get_major_type_string(stream->format.major_type), stream);
if (callback_advanced && stream->read_compressed && stream->allocate_stream)
{
if (FAILED(hr = IWMReaderCallbackAdvanced_AllocateForStream(callback_advanced,
stream->index + 1, event.u.buffer.size, &sample, NULL)))
{ {
DWORD size, capacity; ERR("Failed to allocate stream sample of %u bytes, hr %#lx.\n", event.u.buffer.size, hr);
INSSBuffer *sample;
HRESULT hr;
BYTE *data;
if (callback_advanced && stream->read_compressed && stream->allocate_stream)
{
if (FAILED(hr = IWMReaderCallbackAdvanced_AllocateForStream(callback_advanced,
stream->index + 1, event.u.buffer.size, &sample, NULL)))
{
ERR("Failed to allocate stream sample of %u bytes, hr %#lx.\n", event.u.buffer.size, hr);
wg_parser_stream_release_buffer(wg_stream);
return hr;
}
}
else if (callback_advanced && !stream->read_compressed && stream->allocate_output)
{
if (FAILED(hr = IWMReaderCallbackAdvanced_AllocateForOutput(callback_advanced,
stream->index, event.u.buffer.size, &sample, NULL)))
{
ERR("Failed to allocate output sample of %u bytes, hr %#lx.\n", event.u.buffer.size, hr);
wg_parser_stream_release_buffer(wg_stream);
return hr;
}
}
else
{
struct buffer *object;
/* FIXME: Should these be pooled? */
if (!(object = calloc(1, offsetof(struct buffer, data[event.u.buffer.size]))))
{
wg_parser_stream_release_buffer(wg_stream);
return E_OUTOFMEMORY;
}
object->INSSBuffer_iface.lpVtbl = &buffer_vtbl;
object->refcount = 1;
object->capacity = event.u.buffer.size;
TRACE("Created buffer %p.\n", object);
sample = &object->INSSBuffer_iface;
}
if (FAILED(hr = INSSBuffer_GetBufferAndLength(sample, &data, &size)))
ERR("Failed to get data pointer, hr %#lx.\n", hr);
if (FAILED(hr = INSSBuffer_GetMaxLength(sample, &capacity)))
ERR("Failed to get capacity, hr %#lx.\n", hr);
if (event.u.buffer.size > capacity)
ERR("Returned capacity %lu is less than requested capacity %u.\n",
capacity, event.u.buffer.size);
if (!wg_parser_stream_copy_buffer(wg_stream, data, 0, event.u.buffer.size))
{
/* The GStreamer pin has been flushed. */
INSSBuffer_Release(sample);
break;
}
if (FAILED(hr = INSSBuffer_SetLength(sample, event.u.buffer.size)))
ERR("Failed to set size %u, hr %#lx.\n", event.u.buffer.size, hr);
wg_parser_stream_release_buffer(wg_stream); wg_parser_stream_release_buffer(wg_stream);
return hr;
}
}
else if (callback_advanced && !stream->read_compressed && stream->allocate_output)
{
if (FAILED(hr = IWMReaderCallbackAdvanced_AllocateForOutput(callback_advanced,
stream->index, event.u.buffer.size, &sample, NULL)))
{
ERR("Failed to allocate output sample of %u bytes, hr %#lx.\n", event.u.buffer.size, hr);
wg_parser_stream_release_buffer(wg_stream);
return hr;
}
}
else
{
struct buffer *object;
if (!event.u.buffer.has_pts) /* FIXME: Should these be pooled? */
FIXME("Missing PTS.\n"); if (!(object = calloc(1, offsetof(struct buffer, data[event.u.buffer.size]))))
if (!event.u.buffer.has_duration) {
FIXME("Missing duration.\n"); wg_parser_stream_release_buffer(wg_stream);
return E_OUTOFMEMORY;
*pts = event.u.buffer.pts;
*duration = event.u.buffer.duration;
*flags = 0;
if (event.u.buffer.discontinuity)
*flags |= WM_SF_DISCONTINUITY;
if (!event.u.buffer.delta)
*flags |= WM_SF_CLEANPOINT;
*ret_sample = sample;
return S_OK;
} }
case WG_PARSER_EVENT_EOS: object->INSSBuffer_iface.lpVtbl = &buffer_vtbl;
stream->eos = true; object->refcount = 1;
TRACE("End of stream.\n"); object->capacity = event.u.buffer.size;
return NS_E_NO_MORE_SAMPLES;
case WG_PARSER_EVENT_NONE: TRACE("Created buffer %p.\n", object);
assert(0); sample = &object->INSSBuffer_iface;
} }
if (FAILED(hr = INSSBuffer_GetBufferAndLength(sample, &data, &size)))
ERR("Failed to get data pointer, hr %#lx.\n", hr);
if (FAILED(hr = INSSBuffer_GetMaxLength(sample, &capacity)))
ERR("Failed to get capacity, hr %#lx.\n", hr);
if (event.u.buffer.size > capacity)
ERR("Returned capacity %lu is less than requested capacity %u.\n", capacity, event.u.buffer.size);
if (!wg_parser_stream_copy_buffer(wg_stream, data, 0, event.u.buffer.size))
{
/* The GStreamer pin has been flushed. */
INSSBuffer_Release(sample);
continue;
}
if (FAILED(hr = INSSBuffer_SetLength(sample, event.u.buffer.size)))
ERR("Failed to set size %u, hr %#lx.\n", event.u.buffer.size, hr);
wg_parser_stream_release_buffer(wg_stream);
if (!event.u.buffer.has_pts)
FIXME("Missing PTS.\n");
if (!event.u.buffer.has_duration)
FIXME("Missing duration.\n");
*pts = event.u.buffer.pts;
*duration = event.u.buffer.duration;
*flags = 0;
if (event.u.buffer.discontinuity)
*flags |= WM_SF_DISCONTINUITY;
if (!event.u.buffer.delta)
*flags |= WM_SF_CLEANPOINT;
*ret_sample = sample;
return S_OK;
} }
} }