rpcrt4: Use non-blocking listening on named pipes.

Signed-off-by: Jacek Caban <jacek@codeweavers.com>
Signed-off-by: Alexandre Julliard <julliard@winehq.org>
This commit is contained in:
Jacek Caban 2017-05-22 16:04:31 +02:00 committed by Alexandre Julliard
parent 4f4ac8c40e
commit 6e7a2978e7
1 changed files with 62 additions and 77 deletions

View File

@ -38,6 +38,7 @@
#include "winerror.h"
#include "wininet.h"
#include "winternl.h"
#include "winioctl.h"
#include "wine/unicode.h"
#include "rpc.h"
@ -65,8 +66,8 @@ typedef struct _RpcConnection_np
{
RpcConnection common;
HANDLE pipe;
HANDLE listen_thread;
BOOL listening;
HANDLE listen_event;
IO_STATUS_BLOCK io_status;
} RpcConnection_np;
static RpcConnection *rpcrt4_conn_np_alloc(void)
@ -85,49 +86,6 @@ static void release_np_event(HANDLE event)
CloseHandle(event);
}
static DWORD CALLBACK listen_thread(void *arg)
{
RpcConnection_np *npc = arg;
for (;;)
{
if (ConnectNamedPipe(npc->pipe, NULL))
return RPC_S_OK;
switch(GetLastError())
{
case ERROR_PIPE_CONNECTED:
return RPC_S_OK;
case ERROR_HANDLES_CLOSED:
/* connection closed during listen */
return RPC_S_NO_CONTEXT_AVAILABLE;
case ERROR_NO_DATA_DETECTED:
/* client has disconnected, retry */
DisconnectNamedPipe( npc->pipe );
break;
default:
npc->listening = FALSE;
WARN("Couldn't ConnectNamedPipe (error was %d)\n", GetLastError());
return RPC_S_OUT_OF_RESOURCES;
}
}
}
static RPC_STATUS rpcrt4_conn_listen_pipe(RpcConnection_np *npc)
{
if (npc->listening)
return RPC_S_OK;
npc->listening = TRUE;
npc->listen_thread = CreateThread(NULL, 0, listen_thread, npc, 0, NULL);
if (!npc->listen_thread)
{
npc->listening = FALSE;
ERR("Couldn't create listen thread (error was %d)\n", GetLastError());
return RPC_S_OUT_OF_RESOURCES;
}
return RPC_S_OK;
}
static RPC_STATUS rpcrt4_conn_create_pipe(RpcConnection *Connection, LPCSTR pname)
{
RpcConnection_np *npc = (RpcConnection_np *) Connection;
@ -337,10 +295,8 @@ static void rpcrt4_conn_np_handoff(RpcConnection_np *old_npc, RpcConnection_np *
* to the child, then reopen the server binding to continue listening */
new_npc->pipe = old_npc->pipe;
new_npc->listen_thread = old_npc->listen_thread;
old_npc->pipe = 0;
old_npc->listen_thread = 0;
old_npc->listening = FALSE;
assert(!old_npc->listen_event);
}
static RPC_STATUS rpcrt4_ncacn_np_handoff(RpcConnection *old_conn, RpcConnection *new_conn)
@ -465,17 +421,19 @@ static int rpcrt4_conn_np_write(RpcConnection *conn, const void *buffer, unsigne
return count;
}
static int rpcrt4_conn_np_close(RpcConnection *Connection)
static int rpcrt4_conn_np_close(RpcConnection *conn)
{
RpcConnection_np *npc = (RpcConnection_np *) Connection;
if (npc->pipe) {
FlushFileBuffers(npc->pipe);
CloseHandle(npc->pipe);
npc->pipe = 0;
RpcConnection_np *connection = (RpcConnection_np *) conn;
if (connection->pipe)
{
FlushFileBuffers(connection->pipe);
CloseHandle(connection->pipe);
connection->pipe = 0;
}
if (npc->listen_thread) {
CloseHandle(npc->listen_thread);
npc->listen_thread = 0;
if (connection->listen_event)
{
CloseHandle(connection->listen_event);
connection->listen_event = 0;
}
return 0;
}
@ -677,8 +635,32 @@ static void *rpcrt4_protseq_np_get_wait_array(RpcServerProtseq *protseq, void *p
/* open and count connections */
*count = 1;
LIST_FOR_EACH_ENTRY(conn, &protseq->connections, RpcConnection_np, common.protseq_entry) {
rpcrt4_conn_listen_pipe(conn);
if (conn->listen_thread)
if (!conn->listen_event)
{
NTSTATUS status;
HANDLE event;
event = get_np_event();
if (!event)
continue;
status = NtFsControlFile(conn->pipe, event, NULL, NULL, &conn->io_status, FSCTL_PIPE_LISTEN, NULL, 0, NULL, 0);
switch (status)
{
case STATUS_SUCCESS:
case STATUS_PIPE_CONNECTED:
conn->io_status.Status = status;
SetEvent(event);
break;
case STATUS_PENDING:
break;
default:
ERR("pipe listen error %x\n", status);
continue;
}
conn->listen_event = event;
}
(*count)++;
}
@ -697,8 +679,8 @@ static void *rpcrt4_protseq_np_get_wait_array(RpcServerProtseq *protseq, void *p
objs[0] = npps->mgr_event;
*count = 1;
LIST_FOR_EACH_ENTRY(conn, &protseq->connections, RpcConnection_np, common.protseq_entry) {
if ((objs[*count] = conn->listen_thread))
(*count)++;
if (conn->listen_event)
objs[(*count)++] = conn->listen_event;
}
LeaveCriticalSection(&protseq->cs);
return objs;
@ -741,13 +723,16 @@ static int rpcrt4_protseq_np_wait_for_new_connection(RpcServerProtseq *protseq,
b_handle = objs[res - WAIT_OBJECT_0];
/* find which connection got a RPC */
EnterCriticalSection(&protseq->cs);
LIST_FOR_EACH_ENTRY(conn, &protseq->connections, RpcConnection_np, common.protseq_entry) {
if (b_handle == conn->listen_thread) {
DWORD exit_code;
if (GetExitCodeThread(conn->listen_thread, &exit_code) && exit_code == RPC_S_OK)
LIST_FOR_EACH_ENTRY(conn, &protseq->connections, RpcConnection_np, common.protseq_entry)
{
if (b_handle == conn->listen_event)
{
release_np_event(conn->listen_event);
conn->listen_event = NULL;
if (conn->io_status.Status == STATUS_SUCCESS || conn->io_status.Status == STATUS_PIPE_CONNECTED)
RPCRT4_SpawnConnection(&cconn, &conn->common);
CloseHandle(conn->listen_thread);
conn->listen_thread = 0;
else
ERR("listen failed %x\n", conn->io_status.Status);
break;
}
}