rpcrt4: Implement asynchronous RPC support.

This commit is contained in:
Rob Shearman 2008-01-21 10:31:40 +00:00 committed by Alexandre Julliard
parent d757432c7d
commit 5f077bab07
4 changed files with 100 additions and 3 deletions

View File

@ -6,7 +6,7 @@ VPATH = @srcdir@
MODULE = rpcrt4.dll MODULE = rpcrt4.dll
IMPORTLIB = librpcrt4.$(IMPLIBEXT) IMPORTLIB = librpcrt4.$(IMPLIBEXT)
IMPORTS = iphlpapi advapi32 kernel32 ntdll IMPORTS = iphlpapi advapi32 kernel32 ntdll
DELAYIMPORTS = secur32 DELAYIMPORTS = secur32 user32
EXTRALIBS = -luuid EXTRALIBS = -luuid
C_SRCS = \ C_SRCS = \

View File

@ -73,6 +73,7 @@ typedef struct _RpcConnection
/* client-only */ /* client-only */
struct list conn_pool_entry; struct list conn_pool_entry;
ULONG assoc_group_id; /* association group returned during binding */ ULONG assoc_group_id; /* association group returned during binding */
RPC_ASYNC_STATE *async_state;
/* server-only */ /* server-only */
/* The active interface bound to server. */ /* The active interface bound to server. */
@ -92,6 +93,7 @@ struct connection_ops {
int (*write)(RpcConnection *conn, const void *buffer, unsigned int len); int (*write)(RpcConnection *conn, const void *buffer, unsigned int len);
int (*close)(RpcConnection *conn); int (*close)(RpcConnection *conn);
void (*cancel_call)(RpcConnection *conn); void (*cancel_call)(RpcConnection *conn);
int (*wait_for_incoming_data)(RpcConnection *conn);
size_t (*get_top_of_tower)(unsigned char *tower_data, const char *networkaddr, const char *endpoint); size_t (*get_top_of_tower)(unsigned char *tower_data, const char *networkaddr, const char *endpoint);
RPC_STATUS (*parse_top_of_tower)(const unsigned char *tower_data, size_t tower_size, char **networkaddr, char **endpoint); RPC_STATUS (*parse_top_of_tower)(const unsigned char *tower_data, size_t tower_size, char **networkaddr, char **endpoint);
}; };

View File

@ -27,6 +27,7 @@
#include "windef.h" #include "windef.h"
#include "winbase.h" #include "winbase.h"
#include "winerror.h" #include "winerror.h"
#include "winuser.h"
#include "rpc.h" #include "rpc.h"
#include "rpcndr.h" #include "rpcndr.h"
@ -1039,6 +1040,49 @@ RPC_STATUS WINAPI I_RpcFreeBuffer(PRPC_MESSAGE pMsg)
return RPC_S_OK; return RPC_S_OK;
} }
static void CALLBACK async_apc_notifier_proc(ULONG_PTR ulParam)
{
RPC_ASYNC_STATE *state = (RPC_ASYNC_STATE *)ulParam;
state->u.APC.NotificationRoutine(state, NULL, state->Event);
}
static DWORD WINAPI async_notifier_proc(LPVOID p)
{
RpcConnection *conn = p;
RPC_ASYNC_STATE *state = conn->async_state;
if (state && !conn->ops->wait_for_incoming_data(conn))
{
state->Event = RpcCallComplete;
switch (state->NotificationType)
{
case RpcNotificationTypeEvent:
SetEvent(state->u.hEvent);
break;
case RpcNotificationTypeApc:
QueueUserAPC(async_apc_notifier_proc, state->u.APC.hThread, (ULONG_PTR)state);
break;
case RpcNotificationTypeIoc:
PostQueuedCompletionStatus(state->u.IOC.hIOPort,
state->u.IOC.dwNumberOfBytesTransferred,
state->u.IOC.dwCompletionKey,
state->u.IOC.lpOverlapped);
break;
case RpcNotificationTypeHwnd:
PostMessageW(state->u.HWND.hWnd, state->u.HWND.Msg, 0, 0);
break;
case RpcNotificationTypeCallback:
state->u.NotificationRoutine(state, NULL, state->Event);
break;
case RpcNotificationTypeNone:
default:
break;
}
}
return 0;
}
/*********************************************************************** /***********************************************************************
* I_RpcSend [RPCRT4.@] * I_RpcSend [RPCRT4.@]
* *
@ -1080,6 +1124,12 @@ RPC_STATUS WINAPI I_RpcSend(PRPC_MESSAGE pMsg)
RPCRT4_FreeHeader(hdr); RPCRT4_FreeHeader(hdr);
if (status == RPC_S_OK && pMsg->RpcFlags & RPC_BUFFER_ASYNC)
{
if (!QueueUserWorkItem(async_notifier_proc, conn, WT_EXECUTEDEFAULT | WT_EXECUTELONGFUNCTION))
status = RPC_S_OUT_OF_RESOURCES;
}
return status; return status;
} }
@ -1198,8 +1248,17 @@ RPC_STATUS WINAPI I_RpcSendReceive(PRPC_MESSAGE pMsg)
*/ */
RPC_STATUS WINAPI I_RpcAsyncSetHandle(PRPC_MESSAGE pMsg, PRPC_ASYNC_STATE pAsync) RPC_STATUS WINAPI I_RpcAsyncSetHandle(PRPC_MESSAGE pMsg, PRPC_ASYNC_STATE pAsync)
{ {
FIXME("(%p, %p): stub\n", pMsg, pAsync); RpcBinding* bind = (RpcBinding*)pMsg->Handle;
return RPC_S_INVALID_BINDING; RpcConnection *conn;
TRACE("(%p, %p)\n", pMsg, pAsync);
if (!bind || bind->server || !pMsg->ReservedForRuntime) return RPC_S_INVALID_BINDING;
conn = pMsg->ReservedForRuntime;
conn->async_state = pAsync;
return RPC_S_OK;
} }
/*********************************************************************** /***********************************************************************

View File

@ -410,6 +410,12 @@ static void rpcrt4_conn_np_cancel_call(RpcConnection *Connection)
/* FIXME: implement when named pipe writes use overlapped I/O */ /* FIXME: implement when named pipe writes use overlapped I/O */
} }
static int rpcrt4_conn_np_wait_for_incoming_data(RpcConnection *Connection)
{
/* FIXME: implement when named pipe writes use overlapped I/O */
return -1;
}
static size_t rpcrt4_ncacn_np_get_top_of_tower(unsigned char *tower_data, static size_t rpcrt4_ncacn_np_get_top_of_tower(unsigned char *tower_data,
const char *networkaddr, const char *networkaddr,
const char *endpoint) const char *endpoint)
@ -1047,6 +1053,32 @@ static void rpcrt4_conn_tcp_cancel_call(RpcConnection *Connection)
write(tcpc->cancel_fds[1], &dummy, 1); write(tcpc->cancel_fds[1], &dummy, 1);
} }
static int rpcrt4_conn_tcp_wait_for_incoming_data(RpcConnection *Connection)
{
RpcConnection_tcp *tcpc = (RpcConnection_tcp *) Connection;
struct pollfd pfds[2];
TRACE("%p\n", Connection);
pfds[0].fd = tcpc->sock;
pfds[0].events = POLLIN;
pfds[1].fd = tcpc->cancel_fds[0];
pfds[1].events = POLLIN;
if (poll(pfds, 2, -1 /* infinite */) == -1 && errno != EINTR)
{
ERR("poll() failed: %s\n", strerror(errno));
return -1;
}
if (pfds[1].revents & POLLIN) /* canceled */
{
char dummy;
read(pfds[1].fd, &dummy, sizeof(dummy));
return -1;
}
return 0;
}
static size_t rpcrt4_ncacn_ip_tcp_get_top_of_tower(unsigned char *tower_data, static size_t rpcrt4_ncacn_ip_tcp_get_top_of_tower(unsigned char *tower_data,
const char *networkaddr, const char *networkaddr,
const char *endpoint) const char *endpoint)
@ -1330,6 +1362,7 @@ static const struct connection_ops conn_protseq_list[] = {
rpcrt4_conn_np_write, rpcrt4_conn_np_write,
rpcrt4_conn_np_close, rpcrt4_conn_np_close,
rpcrt4_conn_np_cancel_call, rpcrt4_conn_np_cancel_call,
rpcrt4_conn_np_wait_for_incoming_data,
rpcrt4_ncacn_np_get_top_of_tower, rpcrt4_ncacn_np_get_top_of_tower,
rpcrt4_ncacn_np_parse_top_of_tower, rpcrt4_ncacn_np_parse_top_of_tower,
}, },
@ -1342,6 +1375,7 @@ static const struct connection_ops conn_protseq_list[] = {
rpcrt4_conn_np_write, rpcrt4_conn_np_write,
rpcrt4_conn_np_close, rpcrt4_conn_np_close,
rpcrt4_conn_np_cancel_call, rpcrt4_conn_np_cancel_call,
rpcrt4_conn_np_wait_for_incoming_data,
rpcrt4_ncalrpc_get_top_of_tower, rpcrt4_ncalrpc_get_top_of_tower,
rpcrt4_ncalrpc_parse_top_of_tower, rpcrt4_ncalrpc_parse_top_of_tower,
}, },
@ -1354,6 +1388,7 @@ static const struct connection_ops conn_protseq_list[] = {
rpcrt4_conn_tcp_write, rpcrt4_conn_tcp_write,
rpcrt4_conn_tcp_close, rpcrt4_conn_tcp_close,
rpcrt4_conn_tcp_cancel_call, rpcrt4_conn_tcp_cancel_call,
rpcrt4_conn_tcp_wait_for_incoming_data,
rpcrt4_ncacn_ip_tcp_get_top_of_tower, rpcrt4_ncacn_ip_tcp_get_top_of_tower,
rpcrt4_ncacn_ip_tcp_parse_top_of_tower, rpcrt4_ncacn_ip_tcp_parse_top_of_tower,
} }
@ -1470,6 +1505,7 @@ RPC_STATUS RPCRT4_CreateConnection(RpcConnection** Connection, BOOL server,
NewConnection->QOS = QOS; NewConnection->QOS = QOS;
list_init(&NewConnection->conn_pool_entry); list_init(&NewConnection->conn_pool_entry);
NewConnection->async_state = NULL;
TRACE("connection: %p\n", NewConnection); TRACE("connection: %p\n", NewConnection);
*Connection = NewConnection; *Connection = NewConnection;