winegstreamer: Always run gstreamer callbacks on a Wine thread.

Signed-off-by: Andrew Eikum <aeikum@codeweavers.com>
Signed-off-by: Alexandre Julliard <julliard@winehq.org>
This commit is contained in:
Andrew Eikum 2016-01-12 13:35:04 -06:00 committed by Alexandre Julliard
parent d24239efdf
commit 56b6523842
8 changed files with 806 additions and 435 deletions

View File

@ -4,7 +4,7 @@ EXTRAINCL = $(GSTREAMER_CFLAGS)
EXTRALIBS = $(GSTREAMER_LIBS) $(PTHREAD_LIBS)
C_SRCS = \
glibthread.c \
gst_cbs.c \
gstdemux.c \
gsttffilter.c \
main.c

View File

@ -1,390 +0,0 @@
/* GLIB - Library of useful routines for C programming
* Copyright (C) 1995-1997 Peter Mattis, Spencer Kimball and Josh MacDonald
*
* Wine GStreamer integration
* Copyright 2010 Aric Stewart, CodeWeavers
*
* gthread.c: solaris thread system implementation
* Copyright 1998-2001 Sebastian Wilhelmi; University of Karlsruhe
* Copyright 2001 Hans Breuer
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
*/
/*
* Modified by the GLib Team and others 1997-2000. See the AUTHORS
* file for a list of people on the GLib Team. See the ChangeLog
* files for a list of changes. These files are distributed with
* GLib at ftp://ftp.gtk.org/pub/gtk/.
*/
#include "config.h"
#ifdef HAVE_PTHREAD_H
#include <pthread.h>
#endif
#include <glib.h>
#include <errno.h>
#include <stdarg.h>
#include <stdlib.h>
#include <stdio.h>
#include "windef.h"
#include "winbase.h"
#include "winnls.h"
#include "wine/debug.h"
WINE_DEFAULT_DEBUG_CHANNEL(gstreamer);
static gchar *
g_win32_error_message (gint error)
{
gchar *retval;
WCHAR *msg = NULL;
int nchars;
FormatMessageW (FORMAT_MESSAGE_ALLOCATE_BUFFER
|FORMAT_MESSAGE_IGNORE_INSERTS
|FORMAT_MESSAGE_FROM_SYSTEM,
NULL, error, 0,
(LPWSTR) &msg, 0, NULL);
if (msg != NULL)
{
nchars = WideCharToMultiByte(CP_UTF8, 0, msg, -1, NULL, 0, NULL, NULL);
if (nchars > 2 && msg[nchars-1] == '\n' && msg[nchars-2] == '\r')
msg[nchars-2] = '\0';
retval = g_utf16_to_utf8 (msg, -1, NULL, NULL, NULL);
LocalFree (msg);
}
else
retval = g_strdup ("");
return retval;
}
static gint g_thread_priority_map [G_THREAD_PRIORITY_URGENT + 1] = {
THREAD_PRIORITY_BELOW_NORMAL,
THREAD_PRIORITY_NORMAL,
THREAD_PRIORITY_ABOVE_NORMAL,
THREAD_PRIORITY_HIGHEST
};
static DWORD g_thread_self_tls;
/* A "forward" declaration of this structure */
static GThreadFunctions g_thread_functions_for_glib_use_default;
typedef struct _GThreadData GThreadData;
struct _GThreadData
{
GThreadFunc func;
gpointer data;
HANDLE thread;
gboolean joinable;
};
static GMutex *
g_mutex_new_posix_impl (void)
{
GMutex *result = (GMutex *) g_new (pthread_mutex_t, 1);
pthread_mutex_init ((pthread_mutex_t *) result, NULL);
return result;
}
static void
g_mutex_free_posix_impl (GMutex * mutex)
{
pthread_mutex_destroy ((pthread_mutex_t *) mutex);
g_free (mutex);
}
/* NOTE: the functions g_mutex_lock and g_mutex_unlock may not use
functions from gmem.c and gmessages.c; */
/* pthread_mutex_lock, pthread_mutex_unlock can be taken directly, as
signature and semantic are right, but without error check then!!!!,
we might want to change this therefore. */
static gboolean
g_mutex_trylock_posix_impl (GMutex * mutex)
{
int result;
result = pthread_mutex_trylock ((pthread_mutex_t *) mutex);
if (result == EBUSY)
return FALSE;
if (result) ERR("pthread_mutex_trylock %x\n",result);
return TRUE;
}
static GCond *
g_cond_new_posix_impl (void)
{
GCond *result = (GCond *) g_new (pthread_cond_t, 1);
pthread_cond_init ((pthread_cond_t *) result, NULL);
return result;
}
/* pthread_cond_signal, pthread_cond_broadcast and pthread_cond_wait
can be taken directly, as signature and semantic are right, but
without error check then!!!!, we might want to change this
therefore. */
#define G_NSEC_PER_SEC 1000000000
static gboolean
g_cond_timed_wait_posix_impl (GCond * cond,
GMutex * entered_mutex,
GTimeVal * abs_time)
{
int result;
struct timespec end_time;
gboolean timed_out;
g_return_val_if_fail (cond != NULL, FALSE);
g_return_val_if_fail (entered_mutex != NULL, FALSE);
if (!abs_time)
{
result = pthread_cond_wait ((pthread_cond_t *)cond,
(pthread_mutex_t *) entered_mutex);
timed_out = FALSE;
}
else
{
end_time.tv_sec = abs_time->tv_sec;
end_time.tv_nsec = abs_time->tv_usec * (G_NSEC_PER_SEC / G_USEC_PER_SEC);
g_return_val_if_fail (end_time.tv_nsec < G_NSEC_PER_SEC, TRUE);
result = pthread_cond_timedwait ((pthread_cond_t *) cond,
(pthread_mutex_t *) entered_mutex,
&end_time);
timed_out = (result == ETIMEDOUT);
}
if (!timed_out)
if (result) ERR("pthread_cond_timedwait %x\n",result);
return !timed_out;
}
static void
g_cond_free_posix_impl (GCond * cond)
{
pthread_cond_destroy ((pthread_cond_t *) cond);
g_free (cond);
}
static GPrivate *
g_private_new_posix_impl (GDestroyNotify destructor)
{
GPrivate *result = (GPrivate *) g_new (pthread_key_t, 1);
pthread_key_create ((pthread_key_t *) result, destructor);
return result;
}
/* NOTE: the functions g_private_get and g_private_set may not use
functions from gmem.c and gmessages.c */
static void
g_private_set_posix_impl (GPrivate * private_key, gpointer value)
{
if (!private_key)
return;
pthread_setspecific (*(pthread_key_t *) private_key, value);
}
static gpointer
g_private_get_posix_impl (GPrivate * private_key)
{
if (!private_key)
return NULL;
return pthread_getspecific (*(pthread_key_t *) private_key);
}
static void
g_thread_set_priority_win32_impl (gpointer thread, GThreadPriority priority)
{
GThreadData *target = *(GThreadData **)thread;
g_return_if_fail ((int)priority >= G_THREAD_PRIORITY_LOW);
g_return_if_fail ((int)priority <= G_THREAD_PRIORITY_URGENT);
SetThreadPriority (target->thread, g_thread_priority_map [priority]);
}
static void
g_thread_self_win32_impl (gpointer thread)
{
GThreadData *self = TlsGetValue (g_thread_self_tls);
if (!self)
{
/* This should only happen for the main thread! */
HANDLE handle = GetCurrentThread ();
HANDLE process = GetCurrentProcess ();
self = g_new (GThreadData, 1);
DuplicateHandle (process, handle, process, &self->thread, 0, FALSE,
DUPLICATE_SAME_ACCESS);
TlsSetValue (g_thread_self_tls, self);
self->func = NULL;
self->data = NULL;
self->joinable = FALSE;
}
*(GThreadData **)thread = self;
}
static void
g_thread_exit_win32_impl (void)
{
GThreadData *self = TlsGetValue (g_thread_self_tls);
if (self)
{
if (!self->joinable)
{
CloseHandle (self->thread);
g_free (self);
}
TlsSetValue (g_thread_self_tls, NULL);
}
ExitThread (0);
}
static guint __stdcall
g_thread_proxy (gpointer data)
{
GThreadData *self = (GThreadData*) data;
TlsSetValue (g_thread_self_tls, self);
self->func (self->data);
g_thread_exit_win32_impl ();
g_assert_not_reached ();
return 0;
}
static void
g_thread_create_win32_impl (GThreadFunc func,
gpointer data,
gulong stack_size,
gboolean joinable,
gboolean bound,
GThreadPriority priority,
gpointer thread,
GError **error)
{
guint ignore;
GThreadData *retval;
g_return_if_fail (func);
g_return_if_fail ((int)priority >= G_THREAD_PRIORITY_LOW);
g_return_if_fail ((int)priority <= G_THREAD_PRIORITY_URGENT);
retval = g_new(GThreadData, 1);
retval->func = func;
retval->data = data;
retval->joinable = joinable;
retval->thread = (HANDLE) CreateThread (NULL, stack_size, g_thread_proxy,
retval, 0, &ignore);
if (retval->thread == NULL)
{
gchar *win_error = g_win32_error_message (GetLastError ());
g_set_error (error, G_THREAD_ERROR, G_THREAD_ERROR_AGAIN,
"Error creating thread: %s", win_error);
g_free (retval);
g_free (win_error);
return;
}
*(GThreadData **)thread = retval;
g_thread_set_priority_win32_impl (thread, priority);
}
static void
g_thread_yield_win32_impl (void)
{
Sleep(0);
}
static void
g_thread_join_win32_impl (gpointer thread)
{
GThreadData *target = *(GThreadData **)thread;
g_return_if_fail (target->joinable);
WaitForSingleObject (target->thread, INFINITE);
CloseHandle (target->thread);
g_free (target);
}
static GThreadFunctions g_thread_functions_for_glib_use_default =
{
/* Posix functions here for speed */
g_mutex_new_posix_impl,
(void (*)(GMutex *)) pthread_mutex_lock,
g_mutex_trylock_posix_impl,
(void (*)(GMutex *)) pthread_mutex_unlock,
g_mutex_free_posix_impl,
g_cond_new_posix_impl,
(void (*)(GCond *)) pthread_cond_signal,
(void (*)(GCond *)) pthread_cond_broadcast,
(void (*)(GCond *, GMutex *)) pthread_cond_wait,
g_cond_timed_wait_posix_impl,
g_cond_free_posix_impl,
g_private_new_posix_impl,
g_private_get_posix_impl,
g_private_set_posix_impl,
/* win32 function required here */
g_thread_create_win32_impl, /* thread */
g_thread_yield_win32_impl,
g_thread_join_win32_impl,
g_thread_exit_win32_impl,
g_thread_set_priority_win32_impl,
g_thread_self_win32_impl,
NULL /* no equal function necessary */
};
void g_thread_impl_init (void)
{
static gboolean beenhere = FALSE;
if (beenhere)
return;
beenhere = TRUE;
g_thread_self_tls = TlsAlloc ();
g_thread_init(&g_thread_functions_for_glib_use_default);
}

View File

@ -0,0 +1,325 @@
/*
* Copyright 2015 Andrew Eikum for CodeWeavers
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
*/
#include "config.h"
#include <gst/app/gstappsink.h>
#include <gst/app/gstappsrc.h>
#include <gst/app/gstappbuffer.h>
#include <gst/gstutils.h>
#include "wine/list.h"
#include "gst_cbs.h"
/* gstreamer calls our callbacks from threads that Wine did not create. Some
* callbacks execute code which requires Wine to have created the thread
* (critical sections, debug logging, dshow client code). Since gstreamer can't
* provide an API to override its thread creation, we have to intercept all
* callbacks in code which avoids the Wine thread requirement, and then
* dispatch those callbacks on a thread that is known to be created by Wine.
*
* This file must not contain any code that depends on the Wine TEB!
*/
static void call_cb(struct cb_data *cbdata)
{
pthread_mutex_init(&cbdata->lock, NULL);
pthread_cond_init(&cbdata->cond, NULL);
cbdata->finished = 0;
if(is_wine_thread()){
/* The thread which triggered gstreamer to call this callback may
* already hold a critical section. If so, executing the callback on a
* worker thread can cause a deadlock. If we are already on a Wine
* thread, then there is no need to run this callback on a worker
* thread anyway, which avoids the deadlock issue. */
perform_cb(NULL, cbdata);
pthread_cond_destroy(&cbdata->cond);
pthread_mutex_destroy(&cbdata->lock);
return;
}
pthread_mutex_lock(&cb_list_lock);
list_add_tail(&cb_list, &cbdata->entry);
pthread_cond_broadcast(&cb_list_cond);
pthread_mutex_lock(&cbdata->lock);
pthread_mutex_unlock(&cb_list_lock);
while(!cbdata->finished)
pthread_cond_wait(&cbdata->cond, &cbdata->lock);
pthread_mutex_unlock(&cbdata->lock);
pthread_cond_destroy(&cbdata->cond);
pthread_mutex_destroy(&cbdata->lock);
}
GstBusSyncReply watch_bus_wrapper(GstBus *bus, GstMessage *msg, gpointer user)
{
struct cb_data cbdata = { WATCH_BUS };
cbdata.u.watch_bus_data.bus = bus;
cbdata.u.watch_bus_data.msg = msg;
cbdata.u.watch_bus_data.user = user;
call_cb(&cbdata);
return cbdata.u.watch_bus_data.ret;
}
void existing_new_pad_wrapper(GstElement *bin, GstPad *pad, gboolean last,
gpointer user)
{
struct cb_data cbdata = { EXISTING_NEW_PAD };
cbdata.u.existing_new_pad_data.bin = bin;
cbdata.u.existing_new_pad_data.pad = pad;
cbdata.u.existing_new_pad_data.last = last;
cbdata.u.existing_new_pad_data.user = user;
call_cb(&cbdata);
}
gboolean check_get_range_wrapper(GstPad *pad)
{
struct cb_data cbdata = { CHECK_GET_RANGE };
cbdata.u.check_get_range_data.pad = pad;
call_cb(&cbdata);
return cbdata.u.check_get_range_data.ret;
}
gboolean query_function_wrapper(GstPad *pad, GstQuery *query)
{
struct cb_data cbdata = { QUERY_FUNCTION };
cbdata.u.query_function_data.pad = pad;
cbdata.u.query_function_data.query = query;
call_cb(&cbdata);
return cbdata.u.query_function_data.ret;
}
gboolean activate_push_wrapper(GstPad *pad, gboolean activate)
{
struct cb_data cbdata = { ACTIVATE_PUSH };
cbdata.u.activate_push_data.pad = pad;
cbdata.u.activate_push_data.activate = activate;
call_cb(&cbdata);
return cbdata.u.activate_push_data.ret;
}
void no_more_pads_wrapper(GstElement *decodebin, gpointer user)
{
struct cb_data cbdata = { NO_MORE_PADS };
cbdata.u.no_more_pads_data.decodebin = decodebin;
cbdata.u.no_more_pads_data.user = user;
call_cb(&cbdata);
}
GstFlowReturn request_buffer_src_wrapper(GstPad *pad, guint64 ofs, guint len,
GstBuffer **buf)
{
struct cb_data cbdata = { REQUEST_BUFFER_SRC };
cbdata.u.request_buffer_src_data.pad = pad;
cbdata.u.request_buffer_src_data.ofs = ofs;
cbdata.u.request_buffer_src_data.len = len;
cbdata.u.request_buffer_src_data.buf = buf;
call_cb(&cbdata);
return cbdata.u.request_buffer_src_data.ret;
}
gboolean event_src_wrapper(GstPad *pad, GstEvent *event)
{
struct cb_data cbdata = { EVENT_SRC };
cbdata.u.event_src_data.pad = pad;
cbdata.u.event_src_data.event = event;
call_cb(&cbdata);
return cbdata.u.event_src_data.ret;
}
gboolean event_sink_wrapper(GstPad *pad, GstEvent *event)
{
struct cb_data cbdata = { EVENT_SINK };
cbdata.u.event_sink_data.pad = pad;
cbdata.u.event_sink_data.event = event;
call_cb(&cbdata);
return cbdata.u.event_sink_data.ret;
}
GstFlowReturn request_buffer_sink_wrapper(GstPad *pad, guint64 ofs, guint size,
GstCaps *caps, GstBuffer **buf)
{
struct cb_data cbdata = { REQUEST_BUFFER_SINK };
cbdata.u.request_buffer_sink_data.pad = pad;
cbdata.u.request_buffer_sink_data.ofs = ofs;
cbdata.u.request_buffer_sink_data.size = size;
cbdata.u.request_buffer_sink_data.caps = caps;
cbdata.u.request_buffer_sink_data.buf = buf;
call_cb(&cbdata);
return cbdata.u.request_buffer_sink_data.ret;
}
gboolean accept_caps_sink_wrapper(GstPad *pad, GstCaps *caps)
{
struct cb_data cbdata = { ACCEPT_CAPS_SINK };
cbdata.u.accept_caps_sink_data.pad = pad;
cbdata.u.accept_caps_sink_data.caps = caps;
call_cb(&cbdata);
return cbdata.u.accept_caps_sink_data.ret;
}
gboolean setcaps_sink_wrapper(GstPad *pad, GstCaps *caps)
{
struct cb_data cbdata = { SETCAPS_SINK };
cbdata.u.setcaps_sink_data.pad = pad;
cbdata.u.setcaps_sink_data.caps = caps;
call_cb(&cbdata);
return cbdata.u.setcaps_sink_data.ret;
}
GstFlowReturn got_data_sink_wrapper(GstPad *pad, GstBuffer *buf)
{
struct cb_data cbdata = { GOT_DATA_SINK };
cbdata.u.got_data_sink_data.pad = pad;
cbdata.u.got_data_sink_data.buf = buf;
call_cb(&cbdata);
return cbdata.u.got_data_sink_data.ret;
}
GstFlowReturn got_data_wrapper(GstPad *pad, GstBuffer *buf)
{
struct cb_data cbdata = { GOT_DATA };
cbdata.u.got_data_data.pad = pad;
cbdata.u.got_data_data.buf = buf;
call_cb(&cbdata);
return cbdata.u.got_data_data.ret;
}
GstFlowReturn request_buffer_wrapper(GstPad *pad, guint64 ofs, guint size,
GstCaps *caps, GstBuffer **buf)
{
struct cb_data cbdata = { REQUEST_BUFFER };
cbdata.u.request_buffer_data.pad = pad;
cbdata.u.request_buffer_data.ofs = ofs;
cbdata.u.request_buffer_data.size = size;
cbdata.u.request_buffer_data.caps = caps;
cbdata.u.request_buffer_data.buf = buf;
call_cb(&cbdata);
return cbdata.u.request_buffer_data.ret;
}
void removed_decoded_pad_wrapper(GstElement *bin, GstPad *pad, gpointer user)
{
struct cb_data cbdata = { REMOVED_DECODED_PAD };
cbdata.u.removed_decoded_pad_data.bin = bin;
cbdata.u.removed_decoded_pad_data.pad = pad;
cbdata.u.removed_decoded_pad_data.user = user;
call_cb(&cbdata);
}
GstAutoplugSelectResult autoplug_blacklist_wrapper(GstElement *bin, GstPad *pad,
GstCaps *caps, GstElementFactory *fact, gpointer user)
{
struct cb_data cbdata = { AUTOPLUG_BLACKLIST };
cbdata.u.autoplug_blacklist_data.bin = bin;
cbdata.u.autoplug_blacklist_data.pad = pad;
cbdata.u.autoplug_blacklist_data.caps = caps;
cbdata.u.autoplug_blacklist_data.fact = fact;
cbdata.u.autoplug_blacklist_data.user = user;
call_cb(&cbdata);
return cbdata.u.autoplug_blacklist_data.ret;
}
void unknown_type_wrapper(GstElement *bin, GstPad *pad, GstCaps *caps, gpointer user)
{
struct cb_data cbdata = { UNKNOWN_TYPE };
cbdata.u.unknown_type_data.bin = bin;
cbdata.u.unknown_type_data.pad = pad;
cbdata.u.unknown_type_data.caps = caps;
cbdata.u.unknown_type_data.user = user;
call_cb(&cbdata);
}
void release_sample_wrapper(gpointer data)
{
struct cb_data cbdata = { RELEASE_SAMPLE };
cbdata.u.release_sample_data.data = data;
call_cb(&cbdata);
}
void Gstreamer_transform_pad_added_wrapper(GstElement *filter, GstPad *pad, gpointer user)
{
struct cb_data cbdata = { TRANSFORM_PAD_ADDED };
cbdata.u.transform_pad_added_data.filter = filter;
cbdata.u.transform_pad_added_data.pad = pad;
cbdata.u.transform_pad_added_data.user = user;
call_cb(&cbdata);
}

View File

@ -0,0 +1,204 @@
/*
* Copyright 2015 Andrew Eikum for CodeWeavers
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
*/
#ifndef GST_CBS_H
#define GST_CBS_H
#include "wine/list.h"
#include "windef.h"
#include <pthread.h>
typedef enum {
GST_AUTOPLUG_SELECT_TRY,
GST_AUTOPLUG_SELECT_EXPOSE,
GST_AUTOPLUG_SELECT_SKIP
} GstAutoplugSelectResult;
enum CB_TYPE {
WATCH_BUS,
EXISTING_NEW_PAD,
CHECK_GET_RANGE,
QUERY_FUNCTION,
ACTIVATE_PUSH,
NO_MORE_PADS,
REQUEST_BUFFER_SRC,
EVENT_SRC,
EVENT_SINK,
REQUEST_BUFFER_SINK,
ACCEPT_CAPS_SINK,
SETCAPS_SINK,
GOT_DATA_SINK,
GOT_DATA,
REQUEST_BUFFER,
REMOVED_DECODED_PAD,
AUTOPLUG_BLACKLIST,
UNKNOWN_TYPE,
RELEASE_SAMPLE,
TRANSFORM_PAD_ADDED
};
struct cb_data {
enum CB_TYPE type;
union {
struct watch_bus_data {
GstBus *bus;
GstMessage *msg;
gpointer user;
GstBusSyncReply ret;
} watch_bus_data;
struct existing_new_pad_data {
GstElement *bin;
GstPad *pad;
gboolean last;
gpointer user;
} existing_new_pad_data;
struct check_get_range_data {
GstPad *pad;
gboolean ret;
} check_get_range_data;
struct query_function_data {
GstPad *pad;
GstQuery *query;
gboolean ret;
} query_function_data;
struct activate_push_data {
GstPad *pad;
gboolean activate;
gboolean ret;
} activate_push_data;
struct no_more_pads_data {
GstElement *decodebin;
gpointer user;
} no_more_pads_data;
struct request_buffer_src_data {
GstPad *pad;
guint64 ofs;
guint len;
GstBuffer **buf;
GstFlowReturn ret;
} request_buffer_src_data;
struct event_src_data {
GstPad *pad;
GstEvent *event;
gboolean ret;
} event_src_data;
struct event_sink_data {
GstPad *pad;
GstEvent *event;
gboolean ret;
} event_sink_data;
struct request_buffer_sink_data {
GstPad *pad;
guint64 ofs;
guint size;
GstCaps *caps;
GstBuffer **buf;
GstFlowReturn ret;
} request_buffer_sink_data;
struct accept_caps_sink_data {
GstPad *pad;
GstCaps *caps;
gboolean ret;
} accept_caps_sink_data;
struct setcaps_sink_data {
GstPad *pad;
GstCaps *caps;
gboolean ret;
} setcaps_sink_data;
struct got_data_sink_data {
GstPad *pad;
GstBuffer *buf;
GstFlowReturn ret;
} got_data_sink_data;
struct got_data_data {
GstPad *pad;
GstBuffer *buf;
GstFlowReturn ret;
} got_data_data;
struct request_buffer_data {
GstPad *pad;
guint64 ofs;
guint size;
GstCaps *caps;
GstBuffer **buf;
GstFlowReturn ret;
} request_buffer_data;
struct removed_decoded_pad_data {
GstElement *bin;
GstPad *pad;
gpointer user;
} removed_decoded_pad_data;
struct autoplug_blacklist_data {
GstElement *bin;
GstPad *pad;
GstCaps *caps;
GstElementFactory *fact;
gpointer user;
GstAutoplugSelectResult ret;
} autoplug_blacklist_data;
struct unknown_type_data {
GstElement *bin;
GstPad *pad;
GstCaps *caps;
gpointer user;
} unknown_type_data;
struct release_sample_data {
gpointer data;
} release_sample_data;
struct transform_pad_added_data {
GstElement *filter;
GstPad *pad;
gpointer user;
} transform_pad_added_data;
} u;
int finished;
pthread_mutex_t lock;
pthread_cond_t cond;
struct list entry;
};
extern pthread_mutex_t cb_list_lock DECLSPEC_HIDDEN;
extern pthread_cond_t cb_list_cond DECLSPEC_HIDDEN;
extern struct list cb_list DECLSPEC_HIDDEN;
void CALLBACK perform_cb(TP_CALLBACK_INSTANCE *instance, void *user) DECLSPEC_HIDDEN;
BOOL is_wine_thread(void) DECLSPEC_HIDDEN;
void mark_wine_thread(void) DECLSPEC_HIDDEN;
GstBusSyncReply watch_bus_wrapper(GstBus *bus, GstMessage *msg, gpointer user) DECLSPEC_HIDDEN;
void existing_new_pad_wrapper(GstElement *bin, GstPad *pad, gboolean last, gpointer user) DECLSPEC_HIDDEN;
gboolean check_get_range_wrapper(GstPad *pad) DECLSPEC_HIDDEN;
gboolean query_function_wrapper(GstPad *pad, GstQuery *query) DECLSPEC_HIDDEN;
gboolean activate_push_wrapper(GstPad *pad, gboolean activate) DECLSPEC_HIDDEN;
void no_more_pads_wrapper(GstElement *decodebin, gpointer user) DECLSPEC_HIDDEN;
GstFlowReturn request_buffer_src_wrapper(GstPad *pad, guint64 ofs, guint len, GstBuffer **buf) DECLSPEC_HIDDEN;
gboolean event_src_wrapper(GstPad *pad, GstEvent *event) DECLSPEC_HIDDEN;
gboolean event_sink_wrapper(GstPad *pad, GstEvent *event) DECLSPEC_HIDDEN;
GstFlowReturn request_buffer_sink_wrapper(GstPad *pad, guint64 ofs, guint size, GstCaps *caps, GstBuffer **buf) DECLSPEC_HIDDEN;
gboolean accept_caps_sink_wrapper(GstPad *pad, GstCaps *caps) DECLSPEC_HIDDEN;
gboolean setcaps_sink_wrapper(GstPad *pad, GstCaps *caps) DECLSPEC_HIDDEN;
GstFlowReturn got_data_sink_wrapper(GstPad *pad, GstBuffer *buf) DECLSPEC_HIDDEN;
GstFlowReturn got_data_wrapper(GstPad *pad, GstBuffer *buf) DECLSPEC_HIDDEN;
GstFlowReturn request_buffer_wrapper(GstPad *pad, guint64 ofs, guint size, GstCaps *caps, GstBuffer **buf) DECLSPEC_HIDDEN;
void removed_decoded_pad_wrapper(GstElement *bin, GstPad *pad, gpointer user) DECLSPEC_HIDDEN;
GstAutoplugSelectResult autoplug_blacklist_wrapper(GstElement *bin, GstPad *pad, GstCaps *caps, GstElementFactory *fact, gpointer user) DECLSPEC_HIDDEN;
void unknown_type_wrapper(GstElement *bin, GstPad *pad, GstCaps *caps, gpointer user) DECLSPEC_HIDDEN;
void release_sample_wrapper(gpointer data) DECLSPEC_HIDDEN;
void Gstreamer_transform_pad_added_wrapper(GstElement *filter, GstPad *pad, gpointer user) DECLSPEC_HIDDEN;
#endif

View File

@ -42,6 +42,12 @@ IUnknown * CALLBACK Gstreamer_Mp3_create(IUnknown *pUnkOuter, HRESULT *phr);
IUnknown * CALLBACK Gstreamer_YUV_create(IUnknown *pUnkOuter, HRESULT *phr);
IUnknown * CALLBACK Gstreamer_Splitter_create(IUnknown *pUnkOuter, HRESULT *phr);
void g_thread_impl_init(void);
DWORD Gstreamer_init(void);
GstFlowReturn got_data(GstPad *pad, GstBuffer *buf) DECLSPEC_HIDDEN;
GstFlowReturn request_buffer(GstPad *pad, guint64 ofs, guint size, GstCaps *caps, GstBuffer **buf) DECLSPEC_HIDDEN;
void Gstreamer_transform_pad_added(GstElement *filter, GstPad *pad, gpointer user) DECLSPEC_HIDDEN;
void start_dispatch_thread(void) DECLSPEC_HIDDEN;
#endif /* __GST_PRIVATE_INCLUDED__ */

View File

@ -27,6 +27,7 @@
#include "gst_private.h"
#include "gst_guids.h"
#include "gst_cbs.h"
#include "vfwmsgs.h"
#include "amvideo.h"
@ -44,6 +45,8 @@
WINE_DEFAULT_DEBUG_CHANNEL(gstreamer);
static pthread_key_t wine_gst_key;
typedef struct GSTOutPin GSTOutPin;
typedef struct GSTInPin {
BasePin pin;
@ -97,6 +100,17 @@ static HRESULT WINAPI GST_ChangeCurrent(IMediaSeeking *iface);
static HRESULT WINAPI GST_ChangeStop(IMediaSeeking *iface);
static HRESULT WINAPI GST_ChangeRate(IMediaSeeking *iface);
void mark_wine_thread(void)
{
/* set it to non-NULL to indicate that this is a Wine thread */
pthread_setspecific(wine_gst_key, &wine_gst_key);
}
BOOL is_wine_thread(void)
{
return pthread_getspecific(wine_gst_key) != NULL;
}
static gboolean amt_from_gst_caps_audio(GstCaps *caps, AM_MEDIA_TYPE *amt) {
WAVEFORMATEXTENSIBLE *wfe;
WAVEFORMATEX *wfx;
@ -466,7 +480,7 @@ static DWORD CALLBACK push_data(LPVOID iface) {
}
IMediaSample_GetPointer(buf, &data);
gstbuf = gst_app_buffer_new(data, IMediaSample_GetActualDataLength(buf), release_sample, buf);
gstbuf = gst_app_buffer_new(data, IMediaSample_GetActualDataLength(buf), release_sample_wrapper, buf);
if (!gstbuf) {
IMediaSample_Release(buf);
break;
@ -621,7 +635,7 @@ static GstFlowReturn request_buffer_sink(GstPad *pad, guint64 ofs, guint size, G
}
IMediaSample_SetActualDataLength(sample, size);
IMediaSample_GetPointer(sample, &ptr);
*buf = gst_app_buffer_new(ptr, size, release_sample, sample);
*buf = gst_app_buffer_new(ptr, size, release_sample_wrapper, sample);
if (!*buf) {
IMediaSample_Release(sample);
ERR("Out of memory\n");
@ -682,7 +696,8 @@ static DWORD CALLBACK push_data_init(LPVOID iface) {
return 0;
}
static void removed_decoded_pad(GstElement *bin, GstPad *pad, GSTImpl *This) {
static void removed_decoded_pad(GstElement *bin, GstPad *pad, gpointer user) {
GSTImpl *This = (GSTImpl*)user;
int x;
GSTOutPin *pin;
@ -728,11 +743,11 @@ static void init_new_decoded_pad(GstElement *bin, GstPad *pad, gboolean last, GS
typename = gst_structure_get_name(arg);
mypad = gst_pad_new(NULL, GST_PAD_SINK);
gst_pad_set_chain_function(mypad, got_data_sink);
gst_pad_set_event_function(mypad, event_sink);
gst_pad_set_bufferalloc_function(mypad, request_buffer_sink);
gst_pad_set_acceptcaps_function(mypad, accept_caps_sink);
gst_pad_set_setcaps_function(mypad, setcaps_sink);
gst_pad_set_chain_function(mypad, got_data_sink_wrapper);
gst_pad_set_event_function(mypad, event_sink_wrapper);
gst_pad_set_bufferalloc_function(mypad, request_buffer_sink_wrapper);
gst_pad_set_acceptcaps_function(mypad, accept_caps_sink_wrapper);
gst_pad_set_setcaps_function(mypad, setcaps_sink_wrapper);
if (!strcmp(typename, "audio/x-raw-int") ||
!strcmp(typename, "audio/x-raw-float")) {
@ -766,7 +781,8 @@ static void init_new_decoded_pad(GstElement *bin, GstPad *pad, gboolean last, GS
}
}
static void existing_new_pad(GstElement *bin, GstPad *pad, gboolean last, GSTImpl *This) {
static void existing_new_pad(GstElement *bin, GstPad *pad, gboolean last, gpointer user) {
GSTImpl *This = (GSTImpl*)user;
int x;
if (gst_pad_is_linked(pad))
@ -858,18 +874,13 @@ static gboolean activate_push(GstPad *pad, gboolean activate) {
return TRUE;
}
static void no_more_pads(GstElement *decodebin, GSTImpl *This) {
static void no_more_pads(GstElement *decodebin, gpointer user) {
GSTImpl *This = (GSTImpl*)user;
TRACE("Done\n");
SetEvent(This->event);
}
typedef enum {
GST_AUTOPLUG_SELECT_TRY,
GST_AUTOPLUG_SELECT_EXPOSE,
GST_AUTOPLUG_SELECT_SKIP
} GstAutoplugSelectResult;
static GstAutoplugSelectResult autoplug_blacklist(GstElement *bin, GstPad *pad, GstCaps *caps, GstElementFactory *fact, GSTImpl *This) {
static GstAutoplugSelectResult autoplug_blacklist(GstElement *bin, GstPad *pad, GstCaps *caps, GstElementFactory *fact, gpointer user) {
const char *name = gst_element_factory_get_longname(fact);
if (strstr(name, "Player protection")) {
@ -904,7 +915,7 @@ static GstBusSyncReply watch_bus(GstBus *bus, GstMessage *msg, gpointer data) {
return GST_BUS_DROP;
}
static void unknown_type(GstElement *bin, GstPad *pad, GstCaps *caps, GSTImpl *This) {
static void unknown_type(GstElement *bin, GstPad *pad, GstCaps *caps, gpointer user) {
gchar *strcaps = gst_caps_to_string(caps);
FIXME("Could not find a filter for caps: %s\n", strcaps);
g_free(strcaps);
@ -928,7 +939,7 @@ static HRESULT GST_Connect(GSTInPin *pPin, IPin *pConnectPin, ALLOCATOR_PROPERTI
if (!This->bus) {
This->bus = gst_bus_new();
gst_bus_set_sync_handler(This->bus, watch_bus, This);
gst_bus_set_sync_handler(This->bus, watch_bus_wrapper, This);
}
This->gstfilter = gst_element_factory_make("decodebin2", NULL);
@ -938,21 +949,21 @@ static HRESULT GST_Connect(GSTInPin *pPin, IPin *pConnectPin, ALLOCATOR_PROPERTI
return E_FAIL;
}
gst_element_set_bus(This->gstfilter, This->bus);
g_signal_connect(This->gstfilter, "new-decoded-pad", G_CALLBACK(existing_new_pad), This);
g_signal_connect(This->gstfilter, "pad-removed", G_CALLBACK(removed_decoded_pad), This);
g_signal_connect(This->gstfilter, "autoplug-select", G_CALLBACK(autoplug_blacklist), This);
g_signal_connect(This->gstfilter, "unknown-type", G_CALLBACK(unknown_type), This);
g_signal_connect(This->gstfilter, "new-decoded-pad", G_CALLBACK(existing_new_pad_wrapper), This);
g_signal_connect(This->gstfilter, "pad-removed", G_CALLBACK(removed_decoded_pad_wrapper), This);
g_signal_connect(This->gstfilter, "autoplug-select", G_CALLBACK(autoplug_blacklist_wrapper), This);
g_signal_connect(This->gstfilter, "unknown-type", G_CALLBACK(unknown_type_wrapper), This);
This->my_src = gst_pad_new_from_static_template(&src_template, "quartz-src");
gst_pad_set_getrange_function(This->my_src, request_buffer_src);
gst_pad_set_checkgetrange_function(This->my_src, check_get_range);
gst_pad_set_query_function(This->my_src, query_function);
gst_pad_set_activatepush_function(This->my_src, activate_push);
gst_pad_set_event_function(This->my_src, event_src);
gst_pad_set_getrange_function(This->my_src, request_buffer_src_wrapper);
gst_pad_set_checkgetrange_function(This->my_src, check_get_range_wrapper);
gst_pad_set_query_function(This->my_src, query_function_wrapper);
gst_pad_set_activatepush_function(This->my_src, activate_push_wrapper);
gst_pad_set_event_function(This->my_src, event_src_wrapper);
gst_pad_set_element_private (This->my_src, This);
This->their_sink = gst_element_get_static_pad(This->gstfilter, "sink");
g_signal_connect(This->gstfilter, "no-more-pads", G_CALLBACK(no_more_pads), This);
g_signal_connect(This->gstfilter, "no-more-pads", G_CALLBACK(no_more_pads_wrapper), This);
ret = gst_pad_link(This->my_src, This->their_sink);
gst_object_unref(This->their_sink);
if (ret < 0) {
@ -1042,6 +1053,8 @@ IUnknown * CALLBACK Gstreamer_Splitter_create(IUnknown *punkout, HRESULT *phr) {
return NULL;
}
mark_wine_thread();
This = CoTaskMemAlloc(sizeof(*This));
obj = (IUnknown*)This;
if (!This)
@ -1150,6 +1163,8 @@ static HRESULT WINAPI GST_Stop(IBaseFilter *iface) {
TRACE("()\n");
mark_wine_thread();
if (This->gstfilter)
gst_element_set_state(This->gstfilter, GST_STATE_READY);
return S_OK;
@ -1165,6 +1180,8 @@ static HRESULT WINAPI GST_Pause(IBaseFilter *iface) {
if (!This->gstfilter)
return VFW_E_NOT_CONNECTED;
mark_wine_thread();
gst_element_get_state(This->gstfilter, &now, NULL, -1);
if (now == GST_STATE_PAUSED)
return S_OK;
@ -1187,6 +1204,8 @@ static HRESULT WINAPI GST_Run(IBaseFilter *iface, REFERENCE_TIME tStart) {
TRACE("(%s)\n", wine_dbgstr_longlong(tStart));
mark_wine_thread();
if (!This->gstfilter)
return VFW_E_NOT_CONNECTED;
@ -1236,6 +1255,8 @@ static HRESULT WINAPI GST_GetState(IBaseFilter *iface, DWORD dwMilliSecsTimeout,
TRACE("(%d, %p)\n", dwMilliSecsTimeout, pState);
mark_wine_thread();
if (!This->gstfilter) {
*pState = State_Stopped;
return S_OK;
@ -1290,6 +1311,7 @@ static HRESULT WINAPI GST_ChangeRate(IMediaSeeking *iface) {
GSTOutPin *This = impl_from_IMediaSeeking(iface);
GstEvent *ev = gst_event_new_seek(This->seek.dRate, GST_FORMAT_TIME, 0, GST_SEEK_TYPE_NONE, -1, GST_SEEK_TYPE_NONE, -1);
TRACE("(%p) New rate %g\n", iface, This->seek.dRate);
mark_wine_thread();
gst_pad_push_event(This->my_sink, ev);
return S_OK;
}
@ -1316,6 +1338,8 @@ static HRESULT WINAPI GST_Seeking_GetCurrentPosition(IMediaSeeking *iface, REFER
if (!pos)
return E_POINTER;
mark_wine_thread();
if (!This->their_src) {
*pos = This->seek.llCurrent;
TRACE("Cached value\n");
@ -1352,6 +1376,8 @@ static HRESULT WINAPI GST_Seeking_SetPositions(IMediaSeeking *iface, REFERENCE_T
GstSeekType curtype, stoptype;
GstEvent *e;
mark_wine_thread();
if (!This->seek.llDuration)
return E_NOTIMPL;
@ -1425,6 +1451,7 @@ static ULONG WINAPI GST_QualityControl_Release(IQualityControl *iface)
static HRESULT WINAPI GST_QualityControl_Notify(IQualityControl *iface, IBaseFilter *sender, Quality qm) {
GSTOutPin *pin = impl_from_IQualityControl(iface);
REFERENCE_TIME late = qm.Late;
mark_wine_thread();
if (qm.Late < 0 && -qm.Late > qm.TimeStamp)
late = -qm.TimeStamp;
gst_pad_push_event(pin->my_sink, gst_event_new_qos(1000./qm.Proportion, late*100, qm.TimeStamp*100));
@ -1474,6 +1501,8 @@ static ULONG WINAPI GSTOutPin_Release(IPin *iface) {
ULONG refCount = InterlockedDecrement(&This->pin.pin.refCount);
TRACE("(%p)->() Release from %d\n", iface, refCount + 1);
mark_wine_thread();
if (!refCount) {
if (This->their_src)
gst_pad_unlink(This->their_src, This->my_sink);
@ -1608,6 +1637,7 @@ static HRESULT GST_RemoveOutputPins(GSTImpl *This) {
ULONG i;
GSTOutPin **ppOldPins = This->ppPins;
TRACE("(%p)\n", This);
mark_wine_thread();
if (!This->gstfilter)
return S_OK;
@ -1657,6 +1687,8 @@ static HRESULT WINAPI GSTInPin_ReceiveConnection(IPin *iface, IPin *pReceivePin,
TRACE("(%p/%p)->(%p, %p)\n", This, iface, pReceivePin, pmt);
dump_AM_MEDIA_TYPE(pmt);
mark_wine_thread();
EnterCriticalSection(This->pin.pCritSec);
if (!This->pin.pConnectedTo) {
ALLOCATOR_PROPERTIES props;
@ -1711,6 +1743,8 @@ static HRESULT WINAPI GSTInPin_Disconnect(IPin *iface) {
FILTER_STATE state;
TRACE("()\n");
mark_wine_thread();
hr = IBaseFilter_GetState(This->pin.pinInfo.pFilter, INFINITE, &state);
EnterCriticalSection(This->pin.pCritSec);
if (This->pin.pConnectedTo) {
@ -1830,3 +1864,175 @@ static const IPinVtbl GST_InputPin_Vtbl = {
GSTInPin_EndFlush,
GSTInPin_NewSegment
};
pthread_mutex_t cb_list_lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cb_list_cond = PTHREAD_COND_INITIALIZER;
struct list cb_list = LIST_INIT(cb_list);
void CALLBACK perform_cb(TP_CALLBACK_INSTANCE *instance, void *user)
{
struct cb_data *cbdata = user;
TRACE("got cb type: 0x%x\n", cbdata->type);
switch(cbdata->type)
{
case WATCH_BUS:
{
struct watch_bus_data *data = &cbdata->u.watch_bus_data;
cbdata->u.watch_bus_data.ret = watch_bus(data->bus, data->msg, data->user);
break;
}
case EXISTING_NEW_PAD:
{
struct existing_new_pad_data *data = &cbdata->u.existing_new_pad_data;
existing_new_pad(data->bin, data->pad, data->last, data->user);
break;
}
case CHECK_GET_RANGE:
{
struct check_get_range_data *data = &cbdata->u.check_get_range_data;
cbdata->u.check_get_range_data.ret = check_get_range(data->pad);
break;
}
case QUERY_FUNCTION:
{
struct query_function_data *data = &cbdata->u.query_function_data;
cbdata->u.query_function_data.ret = query_function(data->pad, data->query);
break;
}
case ACTIVATE_PUSH:
{
struct activate_push_data *data = &cbdata->u.activate_push_data;
cbdata->u.activate_push_data.ret = activate_push(data->pad, data->activate);
break;
}
case NO_MORE_PADS:
{
struct no_more_pads_data *data = &cbdata->u.no_more_pads_data;
no_more_pads(data->decodebin, data->user);
break;
}
case REQUEST_BUFFER_SRC:
{
struct request_buffer_src_data *data = &cbdata->u.request_buffer_src_data;
cbdata->u.request_buffer_src_data.ret = request_buffer_src(data->pad,
data->ofs, data->len, data->buf);
break;
}
case EVENT_SRC:
{
struct event_src_data *data = &cbdata->u.event_src_data;
cbdata->u.event_src_data.ret = event_src(data->pad, data->event);
break;
}
case EVENT_SINK:
{
struct event_sink_data *data = &cbdata->u.event_sink_data;
cbdata->u.event_sink_data.ret = event_sink(data->pad, data->event);
break;
}
case REQUEST_BUFFER_SINK:
{
struct request_buffer_sink_data *data = &cbdata->u.request_buffer_sink_data;
cbdata->u.request_buffer_sink_data.ret = request_buffer_sink(data->pad,
data->ofs, data->size, data->caps, data->buf);
break;
}
case ACCEPT_CAPS_SINK:
{
struct accept_caps_sink_data *data = &cbdata->u.accept_caps_sink_data;
cbdata->u.accept_caps_sink_data.ret = accept_caps_sink(data->pad, data->caps);
break;
}
case SETCAPS_SINK:
{
struct setcaps_sink_data *data = &cbdata->u.setcaps_sink_data;
cbdata->u.setcaps_sink_data.ret = setcaps_sink(data->pad, data->caps);
break;
}
case GOT_DATA_SINK:
{
struct got_data_sink_data *data = &cbdata->u.got_data_sink_data;
cbdata->u.got_data_sink_data.ret = got_data_sink(data->pad, data->buf);
break;
}
case GOT_DATA:
{
struct got_data_data *data = &cbdata->u.got_data_data;
cbdata->u.got_data_data.ret = got_data(data->pad, data->buf);
break;
}
case REQUEST_BUFFER:
{
struct request_buffer_data *data = &cbdata->u.request_buffer_data;
cbdata->u.request_buffer_data.ret = request_buffer(data->pad,
data->ofs, data->size, data->caps, data->buf);
break;
}
case REMOVED_DECODED_PAD:
{
struct removed_decoded_pad_data *data = &cbdata->u.removed_decoded_pad_data;
removed_decoded_pad(data->bin, data->pad, data->user);
break;
}
case AUTOPLUG_BLACKLIST:
{
struct autoplug_blacklist_data *data = &cbdata->u.autoplug_blacklist_data;
cbdata->u.autoplug_blacklist_data.ret = autoplug_blacklist(data->bin,
data->pad, data->caps, data->fact, data->user);
break;
}
case UNKNOWN_TYPE:
{
struct unknown_type_data *data = &cbdata->u.unknown_type_data;
unknown_type(data->bin, data->pad, data->caps, data->user);
break;
}
case RELEASE_SAMPLE:
{
struct release_sample_data *data = &cbdata->u.release_sample_data;
release_sample(data->data);
break;
}
case TRANSFORM_PAD_ADDED:
{
struct transform_pad_added_data *data = &cbdata->u.transform_pad_added_data;
Gstreamer_transform_pad_added(data->filter, data->pad, data->user);
break;
}
}
pthread_mutex_lock(&cbdata->lock);
cbdata->finished = 1;
pthread_cond_broadcast(&cbdata->cond);
pthread_mutex_unlock(&cbdata->lock);
}
static DWORD WINAPI dispatch_thread(void *user)
{
struct cb_data *cbdata;
pthread_mutex_lock(&cb_list_lock);
while(1){
pthread_cond_wait(&cb_list_cond, &cb_list_lock);
while(!list_empty(&cb_list)){
cbdata = LIST_ENTRY(list_head(&cb_list), struct cb_data, entry);
list_remove(&cbdata->entry);
TrySubmitThreadpoolCallback(&perform_cb, cbdata, NULL);
}
}
pthread_mutex_unlock(&cb_list_lock);
return 0;
}
void start_dispatch_thread(void)
{
pthread_key_create(&wine_gst_key, NULL);
CloseHandle(CreateThread(NULL, 0, &dispatch_thread, NULL, 0, NULL));
}

View File

@ -28,6 +28,7 @@
#include "gst_private.h"
#include "gst_guids.h"
#include "gst_cbs.h"
#include "uuids.h"
#include "mmreg.h"
@ -125,6 +126,8 @@ static HRESULT WINAPI Gstreamer_transform_ProcessBegin(TransformFilter *iface) {
GstTfImpl *This = (GstTfImpl*)iface;
int ret;
mark_wine_thread();
ret = gst_element_set_state(This->filter, GST_STATE_PLAYING);
TRACE("Returned: %i\n", ret);
return S_OK;
@ -146,12 +149,7 @@ static HRESULT WINAPI Gstreamer_transform_DecideBufferSize(TransformFilter *tf,
return IMemAllocator_SetProperties(pAlloc, ppropInputRequest, &actual);
}
static void release_sample(void *data) {
TRACE("Releasing %p\n", data);
IMediaSample_Release((IMediaSample *)data);
}
static GstFlowReturn got_data(GstPad *pad, GstBuffer *buf) {
GstFlowReturn got_data(GstPad *pad, GstBuffer *buf) {
GstTfImpl *This = gst_pad_get_element_private(pad);
IMediaSample *sample = GST_APP_BUFFER(buf)->priv;
REFERENCE_TIME tStart, tStop;
@ -188,7 +186,7 @@ static GstFlowReturn got_data(GstPad *pad, GstBuffer *buf) {
return GST_FLOW_OK;
}
static GstFlowReturn request_buffer(GstPad *pad, guint64 ofs, guint size, GstCaps *caps, GstBuffer **buf) {
GstFlowReturn request_buffer(GstPad *pad, guint64 ofs, guint size, GstCaps *caps, GstBuffer **buf) {
GstTfImpl *This = gst_pad_get_element_private(pad);
IMediaSample *sample;
BYTE *ptr;
@ -202,7 +200,7 @@ static GstFlowReturn request_buffer(GstPad *pad, guint64 ofs, guint size, GstCap
}
IMediaSample_SetActualDataLength(sample, size);
IMediaSample_GetPointer(sample, &ptr);
*buf = gst_app_buffer_new(ptr, size, release_sample, sample);
*buf = gst_app_buffer_new(ptr, size, release_sample_wrapper, sample);
if (!*buf) {
IMediaSample_Release(sample);
@ -224,9 +222,11 @@ static HRESULT WINAPI Gstreamer_transform_ProcessData(TransformFilter *iface, IM
int ret;
TRACE("Reading %p\n", sample);
mark_wine_thread();
EnterCriticalSection(&This->tf.csReceive);
IMediaSample_GetPointer(sample, &data);
buf = gst_app_buffer_new(data, IMediaSample_GetActualDataLength(sample), release_sample, sample);
buf = gst_app_buffer_new(data, IMediaSample_GetActualDataLength(sample), release_sample_wrapper, sample);
if (!buf) {
LeaveCriticalSection(&This->tf.csReceive);
return S_OK;
@ -267,6 +267,8 @@ static HRESULT WINAPI Gstreamer_transform_ProcessEnd(TransformFilter *iface) {
GstTfImpl *This = (GstTfImpl*)iface;
int ret;
mark_wine_thread();
LeaveCriticalSection(&This->tf.csReceive);
ret = gst_element_set_state(This->filter, GST_STATE_READY);
EnterCriticalSection(&This->tf.csReceive);
@ -274,8 +276,9 @@ static HRESULT WINAPI Gstreamer_transform_ProcessEnd(TransformFilter *iface) {
return S_OK;
}
static void Gstreamer_transform_pad_added(GstElement *filter, GstPad *pad, GstTfImpl *This)
void Gstreamer_transform_pad_added(GstElement *filter, GstPad *pad, gpointer user)
{
GstTfImpl *This = (GstTfImpl*)user;
int ret;
if (!GST_PAD_IS_SRC(pad))
return;
@ -294,6 +297,8 @@ static HRESULT Gstreamer_transform_ConnectInput(GstTfImpl *This, const AM_MEDIA_
BOOL done = FALSE, found = FALSE;
int ret;
mark_wine_thread();
This->filter = gst_element_factory_make(This->gstreamer_name, NULL);
if (!This->filter) {
FIXME("Could not make %s filter\n", This->gstreamer_name);
@ -303,8 +308,8 @@ static HRESULT Gstreamer_transform_ConnectInput(GstTfImpl *This, const AM_MEDIA_
gst_pad_set_element_private (This->my_src, This);
This->my_sink = gst_pad_new(NULL, GST_PAD_SINK);
gst_pad_set_chain_function(This->my_sink, got_data);
gst_pad_set_bufferalloc_function(This->my_sink, request_buffer);
gst_pad_set_chain_function(This->my_sink, got_data_wrapper);
gst_pad_set_bufferalloc_function(This->my_sink, request_buffer_wrapper);
gst_pad_set_element_private (This->my_sink, This);
ret = gst_pad_set_caps(This->my_src, capsin);
@ -362,7 +367,7 @@ static HRESULT Gstreamer_transform_ConnectInput(GstTfImpl *This, const AM_MEDIA_
gst_iterator_free(it);
found = !!This->their_src;
if (!found)
g_signal_connect(This->filter, "pad-added", G_CALLBACK(Gstreamer_transform_pad_added), This);
g_signal_connect(This->filter, "pad-added", G_CALLBACK(Gstreamer_transform_pad_added_wrapper), This);
ret = gst_pad_link(This->my_src, This->their_sink);
if (ret < 0) {
WARN("Failed to link with %i\n", ret);
@ -382,6 +387,8 @@ static HRESULT Gstreamer_transform_ConnectInput(GstTfImpl *This, const AM_MEDIA_
static HRESULT WINAPI Gstreamer_transform_Cleanup(TransformFilter *tf, PIN_DIRECTION dir) {
GstTfImpl *This = (GstTfImpl*)tf;
mark_wine_thread();
if (dir == PINDIR_INPUT)
{
if (This->filter) {
@ -405,6 +412,7 @@ static HRESULT WINAPI Gstreamer_transform_Cleanup(TransformFilter *tf, PIN_DIREC
static HRESULT WINAPI Gstreamer_transform_EndOfStream(TransformFilter *iface) {
GstTfImpl *This = (GstTfImpl*)iface;
TRACE("%p\n", This);
mark_wine_thread();
gst_pad_push_event(This->my_src, gst_event_new_eos());
return S_OK;
@ -413,6 +421,7 @@ static HRESULT WINAPI Gstreamer_transform_EndOfStream(TransformFilter *iface) {
static HRESULT WINAPI Gstreamer_transform_BeginFlush(TransformFilter *iface) {
GstTfImpl *This = (GstTfImpl*)iface;
TRACE("%p\n", This);
mark_wine_thread();
gst_pad_push_event(This->my_src, gst_event_new_flush_start());
return S_OK;
@ -421,6 +430,7 @@ static HRESULT WINAPI Gstreamer_transform_BeginFlush(TransformFilter *iface) {
static HRESULT WINAPI Gstreamer_transform_EndFlush(TransformFilter *iface) {
GstTfImpl *This = (GstTfImpl*)iface;
TRACE("%p\n", This);
mark_wine_thread();
gst_pad_push_event(This->my_src, gst_event_new_flush_stop());
return S_OK;
@ -429,6 +439,7 @@ static HRESULT WINAPI Gstreamer_transform_EndFlush(TransformFilter *iface) {
static HRESULT WINAPI Gstreamer_transform_NewSegment(TransformFilter *iface, REFERENCE_TIME tStart, REFERENCE_TIME tStop, double dRate) {
GstTfImpl *This = (GstTfImpl*)iface;
TRACE("%p\n", This);
mark_wine_thread();
gst_pad_push_event(This->my_src, gst_event_new_new_segment_full(1,
1.0, dRate, GST_FORMAT_TIME, 0, tStop <= tStart ? -1 : tStop * 100, tStart*100));
@ -438,6 +449,7 @@ static HRESULT WINAPI Gstreamer_transform_NewSegment(TransformFilter *iface, REF
static HRESULT WINAPI Gstreamer_transform_QOS(TransformFilter *iface, IBaseFilter *sender, Quality qm) {
GstTfImpl *This = (GstTfImpl*)iface;
REFERENCE_TIME late = qm.Late;
mark_wine_thread();
if (qm.Late < 0 && -qm.Late > qm.TimeStamp)
late = -qm.TimeStamp;
gst_pad_push_event(This->my_sink, gst_event_new_qos(1000. / qm.Proportion, late * 100, qm.TimeStamp * 100));
@ -480,6 +492,8 @@ static HRESULT WINAPI Gstreamer_Mp3_SetMediaType(TransformFilter *tf, PIN_DIRECT
HRESULT hr;
int layer;
mark_wine_thread();
if (dir != PINDIR_INPUT)
return S_OK;
@ -572,6 +586,7 @@ IUnknown * CALLBACK Gstreamer_Mp3_create(IUnknown *punkout, HRESULT *phr)
*phr = E_FAIL;
return NULL;
}
mark_wine_thread();
plugin = Gstreamer_FindMatch("audio/mpeg, mpegversion=(int) 1");
if (!plugin)
{
@ -620,6 +635,8 @@ static HRESULT WINAPI Gstreamer_YUV_SetMediaType(TransformFilter *tf, PIN_DIRECT
int avgtime;
LONG width, height;
mark_wine_thread();
if (dir != PINDIR_INPUT)
return S_OK;
@ -737,6 +754,8 @@ static HRESULT WINAPI Gstreamer_AudioConvert_SetMediaType(TransformFilter *tf, P
BOOL inisfloat = FALSE;
int indepth;
mark_wine_thread();
if (dir != PINDIR_INPUT)
return S_OK;

View File

@ -254,7 +254,6 @@ DWORD Gstreamer_init(void) {
argv[0] = argv0;
argv[1] = argv1;
argv[2] = NULL;
g_thread_impl_init();
inited = gst_init_check(&argc, &argv, &err);
HeapFree(GetProcessHeap(), 0, argv);
if (err) {
@ -269,6 +268,8 @@ DWORD Gstreamer_init(void) {
(LPCWSTR)hInst, &newhandle);
if (!newhandle)
ERR("Could not pin module %p\n", hInst);
start_dispatch_thread();
}
}
return inited;