rpcrt4: Abstract out the way incoming connections are waited for so that we no longer need to wait on Win32 handles.
This commit is contained in:
parent
92c3979433
commit
1ceeb058e3
|
@ -347,72 +347,94 @@ static void RPCRT4_new_client(RpcConnection* conn)
|
|||
CloseHandle( thread );
|
||||
}
|
||||
|
||||
static DWORD CALLBACK RPCRT4_server_thread(LPVOID the_arg)
|
||||
typedef struct _RpcServerProtseq_np
|
||||
{
|
||||
HANDLE m_event, b_handle;
|
||||
HANDLE *objs = NULL;
|
||||
DWORD count, res;
|
||||
RpcServerProtseq* cps = the_arg;
|
||||
RpcServerProtseq common;
|
||||
HANDLE mgr_event;
|
||||
} RpcServerProtseq_np;
|
||||
|
||||
static RpcServerProtseq *rpcrt4_protseq_np_alloc(void)
|
||||
{
|
||||
RpcServerProtseq_np *ps = HeapAlloc(GetProcessHeap(), 0, sizeof(*ps));
|
||||
if (ps)
|
||||
ps->mgr_event = CreateEventW(NULL, FALSE, FALSE, NULL);
|
||||
return &ps->common;
|
||||
}
|
||||
|
||||
static void rpcrt4_protseq_np_signal_state_changed(RpcServerProtseq *protseq)
|
||||
{
|
||||
RpcServerProtseq_np *npps = CONTAINING_RECORD(protseq, RpcServerProtseq_np, common);
|
||||
SetEvent(npps->mgr_event);
|
||||
}
|
||||
|
||||
static void *rpcrt4_protseq_np_get_wait_array(RpcServerProtseq *protseq, void *prev_array, unsigned int *count)
|
||||
{
|
||||
HANDLE *objs = prev_array;
|
||||
RpcConnection* conn;
|
||||
RpcConnection* cconn;
|
||||
BOOL set_ready_event = FALSE;
|
||||
RpcServerProtseq_np *npps = CONTAINING_RECORD(protseq, RpcServerProtseq_np, common);
|
||||
|
||||
TRACE("(the_arg == ^%p)\n", the_arg);
|
||||
|
||||
m_event = cps->mgr_event;
|
||||
|
||||
for (;;) {
|
||||
EnterCriticalSection(&server_cs);
|
||||
/* open and count connections */
|
||||
count = 1;
|
||||
conn = cps->conn;
|
||||
*count = 1;
|
||||
conn = protseq->conn;
|
||||
while (conn) {
|
||||
RPCRT4_OpenConnection(conn);
|
||||
if (rpcrt4_conn_get_wait_object(conn))
|
||||
count++;
|
||||
(*count)++;
|
||||
conn = conn->Next;
|
||||
}
|
||||
|
||||
/* make array of connections */
|
||||
if (objs)
|
||||
objs = HeapReAlloc(GetProcessHeap(), 0, objs, count*sizeof(HANDLE));
|
||||
objs = HeapReAlloc(GetProcessHeap(), 0, objs, *count*sizeof(HANDLE));
|
||||
else
|
||||
objs = HeapAlloc(GetProcessHeap(), 0, count*sizeof(HANDLE));
|
||||
objs = HeapAlloc(GetProcessHeap(), 0, *count*sizeof(HANDLE));
|
||||
if (!objs)
|
||||
{
|
||||
ERR("couldn't allocate objs\n");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
objs[0] = m_event;
|
||||
count = 1;
|
||||
conn = cps->conn;
|
||||
objs[0] = npps->mgr_event;
|
||||
*count = 1;
|
||||
conn = protseq->conn;
|
||||
while (conn) {
|
||||
if ((objs[count] = rpcrt4_conn_get_wait_object(conn)))
|
||||
count++;
|
||||
if ((objs[*count] = rpcrt4_conn_get_wait_object(conn)))
|
||||
(*count)++;
|
||||
conn = conn->Next;
|
||||
}
|
||||
LeaveCriticalSection(&server_cs);
|
||||
|
||||
if (set_ready_event)
|
||||
{
|
||||
/* signal to function that changed state that we are now sync'ed */
|
||||
SetEvent(cps->server_ready_event);
|
||||
set_ready_event = FALSE;
|
||||
return objs;
|
||||
}
|
||||
|
||||
/* start waiting */
|
||||
static void rpcrt4_protseq_np_free_wait_array(RpcServerProtseq *protseq, void *array)
|
||||
{
|
||||
HeapFree(GetProcessHeap(), 0, array);
|
||||
}
|
||||
|
||||
static int rpcrt4_protseq_np_wait_for_new_connection(RpcServerProtseq *protseq, unsigned int count, void *wait_array)
|
||||
{
|
||||
HANDLE b_handle;
|
||||
HANDLE *objs = wait_array;
|
||||
DWORD res;
|
||||
RpcConnection* cconn;
|
||||
RpcConnection* conn;
|
||||
|
||||
if (!objs)
|
||||
return -1;
|
||||
|
||||
res = WaitForMultipleObjects(count, objs, FALSE, INFINITE);
|
||||
if (res == WAIT_OBJECT_0) {
|
||||
if (!std_listen)
|
||||
if (res == WAIT_OBJECT_0)
|
||||
return 0;
|
||||
else if (res == WAIT_FAILED)
|
||||
{
|
||||
SetEvent(cps->server_ready_event);
|
||||
break;
|
||||
ERR("wait failed with error %ld\n", GetLastError());
|
||||
return -1;
|
||||
}
|
||||
set_ready_event = TRUE;
|
||||
}
|
||||
else if (res == WAIT_FAILED) {
|
||||
ERR("wait failed\n");
|
||||
}
|
||||
else {
|
||||
else
|
||||
{
|
||||
b_handle = objs[res - WAIT_OBJECT_0];
|
||||
/* find which connection got a RPC */
|
||||
EnterCriticalSection(&server_cs);
|
||||
conn = cps->conn;
|
||||
conn = protseq->conn;
|
||||
while (conn) {
|
||||
if (b_handle == rpcrt4_conn_get_wait_object(conn)) break;
|
||||
conn = conn->Next;
|
||||
|
@ -423,10 +445,90 @@ static DWORD CALLBACK RPCRT4_server_thread(LPVOID the_arg)
|
|||
else
|
||||
ERR("failed to locate connection for handle %p\n", b_handle);
|
||||
LeaveCriticalSection(&server_cs);
|
||||
if (cconn) RPCRT4_new_client(cconn);
|
||||
if (cconn)
|
||||
{
|
||||
RPCRT4_new_client(cconn);
|
||||
return 1;
|
||||
}
|
||||
else return -1;
|
||||
}
|
||||
}
|
||||
HeapFree(GetProcessHeap(), 0, objs);
|
||||
|
||||
static const struct protseq_ops protseq_list[] =
|
||||
{
|
||||
{
|
||||
"ncacn_np",
|
||||
rpcrt4_protseq_np_alloc,
|
||||
rpcrt4_protseq_np_signal_state_changed,
|
||||
rpcrt4_protseq_np_get_wait_array,
|
||||
rpcrt4_protseq_np_free_wait_array,
|
||||
rpcrt4_protseq_np_wait_for_new_connection,
|
||||
},
|
||||
{
|
||||
"ncalrpc",
|
||||
rpcrt4_protseq_np_alloc,
|
||||
rpcrt4_protseq_np_signal_state_changed,
|
||||
rpcrt4_protseq_np_get_wait_array,
|
||||
rpcrt4_protseq_np_free_wait_array,
|
||||
rpcrt4_protseq_np_wait_for_new_connection,
|
||||
},
|
||||
{
|
||||
"ncacn_ip_tcp",
|
||||
rpcrt4_protseq_np_alloc,
|
||||
rpcrt4_protseq_np_signal_state_changed,
|
||||
rpcrt4_protseq_np_get_wait_array,
|
||||
rpcrt4_protseq_np_free_wait_array,
|
||||
rpcrt4_protseq_np_wait_for_new_connection,
|
||||
},
|
||||
};
|
||||
|
||||
static const struct protseq_ops *rpcrt4_get_protseq_ops(const char *protseq)
|
||||
{
|
||||
int i;
|
||||
for(i=0; i < sizeof(protseq_list)/sizeof(protseq_list[0]); i++)
|
||||
if (!strcmp(protseq_list[i].name, protseq))
|
||||
return &protseq_list[i];
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static DWORD CALLBACK RPCRT4_server_thread(LPVOID the_arg)
|
||||
{
|
||||
int res;
|
||||
unsigned int count;
|
||||
void *objs = NULL;
|
||||
RpcServerProtseq* cps = the_arg;
|
||||
RpcConnection* conn;
|
||||
BOOL set_ready_event = FALSE;
|
||||
|
||||
TRACE("(the_arg == ^%p)\n", the_arg);
|
||||
|
||||
for (;;) {
|
||||
EnterCriticalSection(&server_cs);
|
||||
objs = cps->ops->get_wait_array(cps, objs, &count);
|
||||
LeaveCriticalSection(&server_cs);
|
||||
|
||||
if (set_ready_event)
|
||||
{
|
||||
/* signal to function that changed state that we are now sync'ed */
|
||||
SetEvent(cps->server_ready_event);
|
||||
set_ready_event = FALSE;
|
||||
}
|
||||
|
||||
/* start waiting */
|
||||
res = cps->ops->wait_for_new_connection(cps, count, objs);
|
||||
if (res == -1)
|
||||
break;
|
||||
else if (res == 0)
|
||||
{
|
||||
if (!std_listen)
|
||||
{
|
||||
SetEvent(cps->server_ready_event);
|
||||
break;
|
||||
}
|
||||
set_ready_event = TRUE;
|
||||
}
|
||||
}
|
||||
cps->ops->free_wait_array(cps, objs);
|
||||
EnterCriticalSection(&server_cs);
|
||||
/* close connections */
|
||||
conn = cps->conn;
|
||||
|
@ -447,7 +549,8 @@ static void RPCRT4_sync_with_server_thread(RpcServerProtseq *ps)
|
|||
* the server_ready_event when the new state hasn't yet been applied */
|
||||
WaitForSingleObject(ps->mgr_mutex, INFINITE);
|
||||
|
||||
SetEvent(ps->mgr_event);
|
||||
ps->ops->signal_state_changed(ps);
|
||||
|
||||
/* wait for server thread to make the requested changes before returning */
|
||||
WaitForSingleObject(ps->server_ready_event, INFINITE);
|
||||
|
||||
|
@ -463,7 +566,6 @@ static RPC_STATUS RPCRT4_start_listen_protseq(RpcServerProtseq *ps, BOOL auto_li
|
|||
if (ps->is_listening) goto done;
|
||||
|
||||
if (!ps->mgr_mutex) ps->mgr_mutex = CreateMutexW(NULL, FALSE, NULL);
|
||||
if (!ps->mgr_event) ps->mgr_event = CreateEventW(NULL, FALSE, FALSE, NULL);
|
||||
if (!ps->server_ready_event) ps->server_ready_event = CreateEventW(NULL, FALSE, FALSE, NULL);
|
||||
server_thread = CreateThread(NULL, 0, RPCRT4_server_thread, ps, 0, NULL);
|
||||
if (!server_thread)
|
||||
|
@ -654,11 +756,23 @@ RPC_STATUS WINAPI RpcServerUseProtseqEpW( RPC_WSTR Protseq, UINT MaxCalls, RPC_W
|
|||
static RpcServerProtseq *alloc_serverprotoseq(UINT MaxCalls, char *Protseq, char *Endpoint)
|
||||
{
|
||||
RpcServerProtseq* ps;
|
||||
const struct protseq_ops *ops = rpcrt4_get_protseq_ops(Protseq);
|
||||
|
||||
ps = HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(RpcServerProtseq));
|
||||
if (!ops)
|
||||
return NULL;
|
||||
|
||||
ps = ops->alloc();
|
||||
if (!ps)
|
||||
return NULL;
|
||||
ps->MaxCalls = MaxCalls;
|
||||
ps->Protseq = Protseq;
|
||||
ps->Endpoint = Endpoint;
|
||||
ps->ops = ops;
|
||||
ps->MaxCalls = 0;
|
||||
ps->conn = NULL;
|
||||
ps->is_listening = FALSE;
|
||||
ps->mgr_mutex = NULL;
|
||||
ps->server_ready_event = NULL;
|
||||
|
||||
return ps;
|
||||
}
|
||||
|
|
|
@ -23,8 +23,11 @@
|
|||
|
||||
#include "rpc_binding.h"
|
||||
|
||||
struct protseq_ops;
|
||||
|
||||
typedef struct _RpcServerProtseq
|
||||
{
|
||||
const struct protseq_ops *ops;
|
||||
struct _RpcServerProtseq* Next;
|
||||
LPSTR Protseq;
|
||||
LPSTR Endpoint;
|
||||
|
@ -33,14 +36,25 @@ typedef struct _RpcServerProtseq
|
|||
|
||||
/* is the server currently listening? */
|
||||
BOOL is_listening;
|
||||
/* set on change of configuration (e.g. listening on new protseq) */
|
||||
HANDLE mgr_event;
|
||||
/* mutex for ensuring only one thread can change state at a time */
|
||||
HANDLE mgr_mutex;
|
||||
/* set when server thread has finished opening connections */
|
||||
HANDLE server_ready_event;
|
||||
} RpcServerProtseq;
|
||||
|
||||
struct protseq_ops
|
||||
{
|
||||
const char *name;
|
||||
RpcServerProtseq *(*alloc)(void);
|
||||
void (*signal_state_changed)(RpcServerProtseq *protseq);
|
||||
/* previous array is passed in to allow reuse of memory */
|
||||
void *(*get_wait_array)(RpcServerProtseq *protseq, void *prev_array, unsigned int *count);
|
||||
void (*free_wait_array)(RpcServerProtseq *protseq, void *array);
|
||||
/* returns -1 for failure, 0 for server state changed and 1 to indicate a
|
||||
* new connection was established */
|
||||
int (*wait_for_new_connection)(RpcServerProtseq *protseq, unsigned int count, void *wait_array);
|
||||
};
|
||||
|
||||
typedef struct _RpcServerInterface
|
||||
{
|
||||
struct _RpcServerInterface* Next;
|
||||
|
|
Loading…
Reference in New Issue