winegstreamer: Allocate source media buffers in the PE components.

This necessitates an extra blit for the input data, but this is necessary for
both WoW64 support and an internal rework of the source path in wg_parser to use
GstAppSrc. Since source data is usually compressed and not a bottleneck, we
don't expect this to affect performance.

Signed-off-by: Derek Lesho <dlesho@codeweavers.com>
Signed-off-by: Zebediah Figura <zfigura@codeweavers.com>
Signed-off-by: Alexandre Julliard <julliard@winehq.org>
This commit is contained in:
Derek Lesho 2021-09-15 18:00:09 -05:00 committed by Alexandre Julliard
parent 67734bfce3
commit 8b7390f80d
4 changed files with 66 additions and 23 deletions

View File

@ -170,9 +170,10 @@ struct unix_funcs
void (CDECL *wg_parser_begin_flush)(struct wg_parser *parser); void (CDECL *wg_parser_begin_flush)(struct wg_parser *parser);
void (CDECL *wg_parser_end_flush)(struct wg_parser *parser); void (CDECL *wg_parser_end_flush)(struct wg_parser *parser);
bool (CDECL *wg_parser_get_read_request)(struct wg_parser *parser, bool (CDECL *wg_parser_get_next_read_offset)(struct wg_parser *parser,
void **data, uint64_t *offset, uint32_t *size); uint64_t *offset, uint32_t *size);
void (CDECL *wg_parser_complete_read_request)(struct wg_parser *parser, bool ret); void (CDECL *wg_parser_push_data)(struct wg_parser *parser,
const void *data, uint32_t size);
void (CDECL *wg_parser_set_unlimited_buffering)(struct wg_parser *parser); void (CDECL *wg_parser_set_unlimited_buffering)(struct wg_parser *parser);

View File

@ -530,6 +530,11 @@ static DWORD CALLBACK read_thread(void *arg)
{ {
struct media_source *source = arg; struct media_source *source = arg;
IMFByteStream *byte_stream = source->byte_stream; IMFByteStream *byte_stream = source->byte_stream;
uint32_t buffer_size = 0;
uint64_t file_size;
void *data = NULL;
IMFByteStream_GetLength(byte_stream, &file_size);
TRACE("Starting read thread for media source %p.\n", source); TRACE("Starting read thread for media source %p.\n", source);
@ -539,18 +544,33 @@ static DWORD CALLBACK read_thread(void *arg)
ULONG ret_size; ULONG ret_size;
uint32_t size; uint32_t size;
HRESULT hr; HRESULT hr;
void *data;
if (!unix_funcs->wg_parser_get_read_request(source->wg_parser, &data, &offset, &size)) if (!unix_funcs->wg_parser_get_next_read_offset(source->wg_parser, &offset, &size))
continue; continue;
if (offset >= file_size)
size = 0;
else if (offset + size >= file_size)
size = file_size - offset;
if (size > buffer_size)
{
buffer_size = size;
data = realloc(data, size);
}
ret_size = 0;
if (SUCCEEDED(hr = IMFByteStream_SetCurrentPosition(byte_stream, offset))) if (SUCCEEDED(hr = IMFByteStream_SetCurrentPosition(byte_stream, offset)))
hr = IMFByteStream_Read(byte_stream, data, size, &ret_size); hr = IMFByteStream_Read(byte_stream, data, size, &ret_size);
if (SUCCEEDED(hr) && ret_size != size) if (FAILED(hr))
ERR("Failed to read %u bytes at offset %I64u, hr %#x.\n", size, offset, hr);
else if (ret_size != size)
ERR("Unexpected short read: requested %u bytes, got %u.\n", size, ret_size); ERR("Unexpected short read: requested %u bytes, got %u.\n", size, ret_size);
unix_funcs->wg_parser_complete_read_request(source->wg_parser, SUCCEEDED(hr)); unix_funcs->wg_parser_push_data(source->wg_parser, SUCCEEDED(hr) ? data : NULL, ret_size);
} }
free(data);
TRACE("Media source is shutting down; exiting.\n"); TRACE("Media source is shutting down; exiting.\n");
return 0; return 0;
} }

View File

@ -785,6 +785,11 @@ static DWORD CALLBACK stream_thread(void *arg)
static DWORD CALLBACK read_thread(void *arg) static DWORD CALLBACK read_thread(void *arg)
{ {
struct parser *filter = arg; struct parser *filter = arg;
LONGLONG file_size, unused;
uint32_t buffer_size = 0;
void *data = NULL;
IAsyncReader_Length(filter->reader, &file_size, &unused);
TRACE("Starting read thread for filter %p.\n", filter); TRACE("Starting read thread for filter %p.\n", filter);
@ -793,14 +798,29 @@ static DWORD CALLBACK read_thread(void *arg)
uint64_t offset; uint64_t offset;
uint32_t size; uint32_t size;
HRESULT hr; HRESULT hr;
void *data;
if (!unix_funcs->wg_parser_get_read_request(filter->wg_parser, &data, &offset, &size)) if (!unix_funcs->wg_parser_get_next_read_offset(filter->wg_parser, &offset, &size))
continue; continue;
if (offset >= file_size)
size = 0;
else if (offset + size >= file_size)
size = file_size - offset;
if (size > buffer_size)
{
buffer_size = size;
data = realloc(data, size);
}
hr = IAsyncReader_SyncRead(filter->reader, offset, size, data); hr = IAsyncReader_SyncRead(filter->reader, offset, size, data);
unix_funcs->wg_parser_complete_read_request(filter->wg_parser, SUCCEEDED(hr)); if (FAILED(hr))
ERR("Failed to read %u bytes at offset %I64u, hr %#x.\n", size, offset, hr);
unix_funcs->wg_parser_push_data(filter->wg_parser, SUCCEEDED(hr) ? data : NULL, size);
} }
free(data);
TRACE("Streaming stopped; exiting.\n"); TRACE("Streaming stopped; exiting.\n");
return 0; return 0;
} }

View File

@ -517,8 +517,8 @@ static void CDECL wg_parser_end_flush(struct wg_parser *parser)
pthread_mutex_unlock(&parser->mutex); pthread_mutex_unlock(&parser->mutex);
} }
static bool CDECL wg_parser_get_read_request(struct wg_parser *parser, static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser,
void **data, uint64_t *offset, uint32_t *size) uint64_t *offset, uint32_t *size)
{ {
pthread_mutex_lock(&parser->mutex); pthread_mutex_lock(&parser->mutex);
@ -531,7 +531,6 @@ static bool CDECL wg_parser_get_read_request(struct wg_parser *parser,
return false; return false;
} }
*data = parser->read_request.data;
*offset = parser->read_request.offset; *offset = parser->read_request.offset;
*size = parser->read_request.size; *size = parser->read_request.size;
@ -539,11 +538,15 @@ static bool CDECL wg_parser_get_read_request(struct wg_parser *parser,
return true; return true;
} }
static void CDECL wg_parser_complete_read_request(struct wg_parser *parser, bool ret) static void CDECL wg_parser_push_data(struct wg_parser *parser,
const void *data, uint32_t size)
{ {
pthread_mutex_lock(&parser->mutex); pthread_mutex_lock(&parser->mutex);
parser->read_request.size = size;
parser->read_request.done = true; parser->read_request.done = true;
parser->read_request.ret = ret; parser->read_request.ret = !!data;
if (data)
memcpy(parser->read_request.data, data, size);
parser->read_request.data = NULL; parser->read_request.data = NULL;
pthread_mutex_unlock(&parser->mutex); pthread_mutex_unlock(&parser->mutex);
pthread_cond_signal(&parser->read_done_cond); pthread_cond_signal(&parser->read_done_cond);
@ -1214,10 +1217,6 @@ static GstFlowReturn src_getrange_cb(GstPad *pad, GstObject *parent,
if (offset == GST_BUFFER_OFFSET_NONE) if (offset == GST_BUFFER_OFFSET_NONE)
offset = parser->next_pull_offset; offset = parser->next_pull_offset;
parser->next_pull_offset = offset + size; parser->next_pull_offset = offset + size;
if (offset >= parser->file_size)
return GST_FLOW_EOS;
if (offset + size >= parser->file_size)
size = parser->file_size - offset;
if (!*buffer) if (!*buffer)
*buffer = new_buffer = gst_buffer_new_and_alloc(size); *buffer = new_buffer = gst_buffer_new_and_alloc(size);
@ -1241,6 +1240,7 @@ static GstFlowReturn src_getrange_cb(GstPad *pad, GstObject *parent,
pthread_cond_wait(&parser->read_done_cond, &parser->mutex); pthread_cond_wait(&parser->read_done_cond, &parser->mutex);
ret = parser->read_request.ret; ret = parser->read_request.ret;
gst_buffer_set_size(*buffer, parser->read_request.size);
pthread_mutex_unlock(&parser->mutex); pthread_mutex_unlock(&parser->mutex);
@ -1248,10 +1248,12 @@ static GstFlowReturn src_getrange_cb(GstPad *pad, GstObject *parent,
GST_LOG("Request returned %d.", ret); GST_LOG("Request returned %d.", ret);
if (!ret && new_buffer) if ((!ret || !size) && new_buffer)
gst_buffer_unref(new_buffer); gst_buffer_unref(new_buffer);
return ret ? GST_FLOW_OK : GST_FLOW_ERROR; if (ret)
return size ? GST_FLOW_OK : GST_FLOW_EOS;
return GST_FLOW_ERROR;
} }
static gboolean src_query_cb(GstPad *pad, GstObject *parent, GstQuery *query) static gboolean src_query_cb(GstPad *pad, GstObject *parent, GstQuery *query)
@ -1918,8 +1920,8 @@ static const struct unix_funcs funcs =
wg_parser_begin_flush, wg_parser_begin_flush,
wg_parser_end_flush, wg_parser_end_flush,
wg_parser_get_read_request, wg_parser_get_next_read_offset,
wg_parser_complete_read_request, wg_parser_push_data,
wg_parser_set_unlimited_buffering, wg_parser_set_unlimited_buffering,