server: Connect pipe servers in order that they enter listening state.

Signed-off-by: Jacek Caban <jacek@codeweavers.com>
Signed-off-by: Alexandre Julliard <julliard@winehq.org>
This commit is contained in:
Jacek Caban 2019-03-11 17:03:23 +01:00 committed by Alexandre Julliard
parent 66c321a4b8
commit be3ae0131e
2 changed files with 123 additions and 45 deletions

View File

@ -3673,15 +3673,21 @@ static void test_namedpipe_session_id(void)
static void test_multiple_instances(void) static void test_multiple_instances(void)
{ {
HANDLE server[2], client; HANDLE server[4], client;
int i; int i;
BOOL ret; BOOL ret;
OVERLAPPED ov; OVERLAPPED ov;
if(!pCancelIoEx)
{
win_skip("Skiping multiple instance tests on too old Windows\n");
return;
}
for (i = 0; i < ARRAY_SIZE(server); i++) for (i = 0; i < ARRAY_SIZE(server); i++)
{ {
server[i] = CreateNamedPipeA(PIPENAME, PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, server[i] = CreateNamedPipeA(PIPENAME, PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
PIPE_READMODE_BYTE | PIPE_WAIT, 2, 1024, 1024, PIPE_READMODE_BYTE | PIPE_WAIT, ARRAY_SIZE(server), 1024, 1024,
NMPWAIT_USE_DEFAULT_WAIT, NULL); NMPWAIT_USE_DEFAULT_WAIT, NULL);
ok(server[i] != INVALID_HANDLE_VALUE, "got invalid handle\n"); ok(server[i] != INVALID_HANDLE_VALUE, "got invalid handle\n");
} }
@ -3689,10 +3695,10 @@ static void test_multiple_instances(void)
client = CreateFileA(PIPENAME, GENERIC_READ | GENERIC_WRITE, 0, NULL, OPEN_EXISTING, 0, 0); client = CreateFileA(PIPENAME, GENERIC_READ | GENERIC_WRITE, 0, NULL, OPEN_EXISTING, 0, 0);
ok(client != INVALID_HANDLE_VALUE, "got invalid handle\n"); ok(client != INVALID_HANDLE_VALUE, "got invalid handle\n");
/* Show that this has connected to server[0] not server[1] */ /* Show that this has connected to server[0] not any other one */
memset(&ov, 0, sizeof(ov)); memset(&ov, 0, sizeof(ov));
ret = ConnectNamedPipe(server[1], &ov); ret = ConnectNamedPipe(server[2], &ov);
ok(ret == FALSE, "got %d\n", ret); ok(ret == FALSE, "got %d\n", ret);
ok(GetLastError() == ERROR_IO_PENDING, "got %d\n", GetLastError()); ok(GetLastError() == ERROR_IO_PENDING, "got %d\n", GetLastError());
@ -3701,11 +3707,105 @@ static void test_multiple_instances(void)
ok(ret == FALSE, "got %d\n", ret); ok(ret == FALSE, "got %d\n", ret);
ok(GetLastError() == ERROR_PIPE_CONNECTED, "got %d\n", GetLastError()); ok(GetLastError() == ERROR_PIPE_CONNECTED, "got %d\n", GetLastError());
DisconnectNamedPipe(server[1]);
DisconnectNamedPipe(server[0]);
CloseHandle(client); CloseHandle(client);
CloseHandle(server[1]);
CloseHandle(server[0]); /* The next connected server is server[1], doesn't matter that server[2] has pending listeners */
client = CreateFileA(PIPENAME, GENERIC_READ | GENERIC_WRITE, 0, NULL, OPEN_EXISTING, 0, 0);
ok(client != INVALID_HANDLE_VALUE, "got invalid handle\n");
memset(&ov, 0, sizeof(ov));
ret = ConnectNamedPipe(server[2], &ov);
ok(ret == FALSE, "got %d\n", ret);
ok(GetLastError() == ERROR_IO_PENDING, "got %d\n", GetLastError());
memset(&ov, 0, sizeof(ov));
ret = ConnectNamedPipe(server[1], &ov);
ok(ret == FALSE, "got %d\n", ret);
ok(GetLastError() == ERROR_PIPE_CONNECTED, "got %d\n", GetLastError());
CloseHandle(client);
/* server[2] is connected next */
client = CreateFileA(PIPENAME, GENERIC_READ | GENERIC_WRITE, 0, NULL, OPEN_EXISTING, 0, 0);
ok(client != INVALID_HANDLE_VALUE, "got invalid handle\n");
memset(&ov, 0, sizeof(ov));
ret = ConnectNamedPipe(server[2], &ov);
ok(ret == FALSE, "got %d\n", ret);
ok(GetLastError() == ERROR_PIPE_CONNECTED, "got %d\n", GetLastError());
CloseHandle(client);
/* Disconnect in order server[0] and server[2] */
DisconnectNamedPipe(server[0]);
DisconnectNamedPipe(server[2]);
/* Put into listening state server[2] and server[0] */
memset(&ov, 0, sizeof(ov));
ret = ConnectNamedPipe(server[2], &ov);
ok(ret == FALSE, "got %d\n", ret);
ok(GetLastError() == ERROR_IO_PENDING, "got %d\n", GetLastError());
memset(&ov, 0, sizeof(ov));
ret = ConnectNamedPipe(server[0], &ov);
ok(ret == FALSE, "got %d\n", ret);
ok(GetLastError() == ERROR_IO_PENDING, "got %d\n", GetLastError());
/* server[3] is connected next */
client = CreateFileA(PIPENAME, GENERIC_READ | GENERIC_WRITE, 0, NULL, OPEN_EXISTING, 0, 0);
ok(client != INVALID_HANDLE_VALUE, "got invalid handle\n");
memset(&ov, 0, sizeof(ov));
ret = ConnectNamedPipe(server[3], &ov);
ok(ret == FALSE, "got %d\n", ret);
ok(GetLastError() == ERROR_PIPE_CONNECTED, "got %d\n", GetLastError());
CloseHandle(client);
/* server[2], which stasted listening first, will be connected next */
client = CreateFileA(PIPENAME, GENERIC_READ | GENERIC_WRITE, 0, NULL, OPEN_EXISTING, 0, 0);
ok(client != INVALID_HANDLE_VALUE, "got invalid handle\n");
memset(&ov, 0, sizeof(ov));
ret = ConnectNamedPipe(server[2], &ov);
ok(ret == FALSE, "got %d\n", ret);
ok(GetLastError() == ERROR_PIPE_CONNECTED, "got %d\n", GetLastError());
memset(&ov, 0, sizeof(ov));
ret = ConnectNamedPipe(server[0], &ov);
ok(ret == FALSE, "got %d\n", ret);
ok(GetLastError() == ERROR_IO_PENDING, "got %d\n", GetLastError());
CloseHandle(client);
/* Finally server[0] is connected */
client = CreateFileA(PIPENAME, GENERIC_READ | GENERIC_WRITE, 0, NULL, OPEN_EXISTING, 0, 0);
ok(client != INVALID_HANDLE_VALUE, "got invalid handle\n");
memset(&ov, 0, sizeof(ov));
ret = ConnectNamedPipe(server[0], &ov);
ok(ret == FALSE, "got %d\n", ret);
ok(GetLastError() == ERROR_PIPE_CONNECTED, "got %d\n", GetLastError());
CloseHandle(client);
/* No more listening pipes available */
DisconnectNamedPipe(server[0]);
client = CreateFileA(PIPENAME, GENERIC_READ | GENERIC_WRITE, 0, NULL, OPEN_EXISTING, 0, 0);
ok(client == INVALID_HANDLE_VALUE && GetLastError() == ERROR_PIPE_BUSY, "got %p(%u)\n", client, GetLastError());
for (i = 0; i < ARRAY_SIZE(server); i++)
{
DisconnectNamedPipe(server[i]);
CloseHandle(server[i]);
}
} }
START_TEST(pipe) START_TEST(pipe)

View File

@ -71,7 +71,7 @@ struct pipe_end
struct pipe_server struct pipe_server
{ {
struct pipe_end pipe_end; /* common header for both pipe ends */ struct pipe_end pipe_end; /* common header for both pipe ends */
struct list entry; /* entry in named pipe servers list */ struct list entry; /* entry in named pipe listeners list */
unsigned int options; /* pipe options */ unsigned int options; /* pipe options */
struct async_queue listen_q; /* listen queue */ struct async_queue listen_q; /* listen queue */
}; };
@ -86,7 +86,7 @@ struct named_pipe
unsigned int insize; unsigned int insize;
unsigned int instances; unsigned int instances;
timeout_t timeout; timeout_t timeout;
struct list servers; /* list of servers using this pipe */ struct list listeners; /* list of servers listening on this pipe */
struct async_queue waiters; /* list of clients waiting to connect */ struct async_queue waiters; /* list of clients waiting to connect */
}; };
@ -335,7 +335,7 @@ static void named_pipe_destroy( struct object *obj)
{ {
struct named_pipe *pipe = (struct named_pipe *) obj; struct named_pipe *pipe = (struct named_pipe *) obj;
assert( list_empty( &pipe->servers ) ); assert( list_empty( &pipe->listeners ) );
assert( !pipe->instances ); assert( !pipe->instances );
free_async_queue( &pipe->waiters ); free_async_queue( &pipe->waiters );
} }
@ -436,7 +436,8 @@ static void pipe_server_destroy( struct object *obj )
assert( pipe->instances ); assert( pipe->instances );
if (!--pipe->instances) unlink_named_object( &pipe->obj ); if (!--pipe->instances) unlink_named_object( &pipe->obj );
list_remove( &server->entry ); if (server->pipe_end.state == FILE_PIPE_LISTENING_STATE)
list_remove( &server->entry );
free_async_queue( &server->listen_q ); free_async_queue( &server->listen_q );
pipe_end_destroy( obj ); pipe_end_destroy( obj );
@ -1076,7 +1077,10 @@ static int pipe_server_ioctl( struct fd *fd, ioctl_code_t code, struct async *as
switch(server->pipe_end.state) switch(server->pipe_end.state)
{ {
case FILE_PIPE_LISTENING_STATE: case FILE_PIPE_LISTENING_STATE:
break;
case FILE_PIPE_DISCONNECTED_STATE: case FILE_PIPE_DISCONNECTED_STATE:
server->pipe_end.state = FILE_PIPE_LISTENING_STATE;
list_add_tail( &server->pipe_end.pipe->listeners, &server->entry );
break; break;
case FILE_PIPE_CONNECTED_STATE: case FILE_PIPE_CONNECTED_STATE:
set_error( STATUS_PIPE_CONNECTED ); set_error( STATUS_PIPE_CONNECTED );
@ -1086,7 +1090,6 @@ static int pipe_server_ioctl( struct fd *fd, ioctl_code_t code, struct async *as
return 0; return 0;
} }
server->pipe_end.state = FILE_PIPE_LISTENING_STATE;
queue_async( &server->listen_q, async ); queue_async( &server->listen_q, async );
async_wake_up( &server->pipe_end.pipe->waiters, STATUS_SUCCESS ); async_wake_up( &server->pipe_end.pipe->waiters, STATUS_SUCCESS );
set_error( STATUS_PENDING ); set_error( STATUS_PENDING );
@ -1162,7 +1165,7 @@ static struct pipe_server *create_pipe_server( struct named_pipe *pipe, unsigned
server->pipe_end.server_pid = get_process_id( current->process ); server->pipe_end.server_pid = get_process_id( current->process );
init_async_queue( &server->listen_q ); init_async_queue( &server->listen_q );
list_add_tail( &pipe->servers, &server->entry ); list_add_tail( &pipe->listeners, &server->entry );
if (!(server->pipe_end.fd = alloc_pseudo_fd( &pipe_server_fd_ops, &server->pipe_end.obj, options ))) if (!(server->pipe_end.fd = alloc_pseudo_fd( &pipe_server_fd_ops, &server->pipe_end.obj, options )))
{ {
release_object( server ); release_object( server );
@ -1197,27 +1200,6 @@ static struct pipe_end *create_pipe_client( struct named_pipe *pipe, data_size_t
return client; return client;
} }
static struct pipe_server *find_available_server( struct named_pipe *pipe )
{
struct pipe_server *server;
/* look for pipe servers that are listening */
LIST_FOR_EACH_ENTRY( server, &pipe->servers, struct pipe_server, entry )
{
if (server->pipe_end.state == FILE_PIPE_LISTENING_STATE && async_queued( &server->listen_q ))
return (struct pipe_server *)grab_object( server );
}
/* fall back to pipe servers that are idle */
LIST_FOR_EACH_ENTRY( server, &pipe->servers, struct pipe_server, entry )
{
if (server->pipe_end.state == FILE_PIPE_LISTENING_STATE )
return (struct pipe_server *)grab_object( server );
}
return NULL;
}
static int named_pipe_link_name( struct object *obj, struct object_name *name, struct object *parent ) static int named_pipe_link_name( struct object *obj, struct object_name *name, struct object *parent )
{ {
struct named_pipe_device *dev = (struct named_pipe_device *)parent; struct named_pipe_device *dev = (struct named_pipe_device *)parent;
@ -1240,18 +1222,18 @@ static struct object *named_pipe_open_file( struct object *obj, unsigned int acc
struct pipe_end *client; struct pipe_end *client;
unsigned int pipe_sharing; unsigned int pipe_sharing;
if (!(server = find_available_server( pipe ))) if (list_empty( &pipe->listeners ))
{ {
set_error( STATUS_PIPE_NOT_AVAILABLE ); set_error( STATUS_PIPE_NOT_AVAILABLE );
return NULL; return NULL;
} }
server = LIST_ENTRY( list_head( &pipe->listeners ), struct pipe_server, entry );
pipe_sharing = pipe->sharing; pipe_sharing = pipe->sharing;
if (((access & GENERIC_READ) && !(pipe_sharing & FILE_SHARE_READ)) || if (((access & GENERIC_READ) && !(pipe_sharing & FILE_SHARE_READ)) ||
((access & GENERIC_WRITE) && !(pipe_sharing & FILE_SHARE_WRITE))) ((access & GENERIC_WRITE) && !(pipe_sharing & FILE_SHARE_WRITE)))
{ {
set_error( STATUS_ACCESS_DENIED ); set_error( STATUS_ACCESS_DENIED );
release_object( server );
return NULL; return NULL;
} }
@ -1263,8 +1245,8 @@ static struct object *named_pipe_open_file( struct object *obj, unsigned int acc
client->connection = &server->pipe_end; client->connection = &server->pipe_end;
server->pipe_end.client_pid = client->client_pid; server->pipe_end.client_pid = client->client_pid;
client->server_pid = server->pipe_end.server_pid; client->server_pid = server->pipe_end.server_pid;
list_remove( &server->entry );
} }
release_object( server );
return &client->obj; return &client->obj;
} }
@ -1279,7 +1261,6 @@ static int named_pipe_device_ioctl( struct fd *fd, ioctl_code_t code, struct asy
const FILE_PIPE_WAIT_FOR_BUFFER *buffer = get_req_data(); const FILE_PIPE_WAIT_FOR_BUFFER *buffer = get_req_data();
data_size_t size = get_req_data_size(); data_size_t size = get_req_data_size();
struct named_pipe *pipe; struct named_pipe *pipe;
struct pipe_server *server;
struct unicode_str name; struct unicode_str name;
timeout_t when; timeout_t when;
@ -1293,19 +1274,16 @@ static int named_pipe_device_ioctl( struct fd *fd, ioctl_code_t code, struct asy
name.len = (buffer->NameLength / sizeof(WCHAR)) * sizeof(WCHAR); name.len = (buffer->NameLength / sizeof(WCHAR)) * sizeof(WCHAR);
if (!(pipe = open_named_object( &device->obj, &named_pipe_ops, &name, 0 ))) return 0; if (!(pipe = open_named_object( &device->obj, &named_pipe_ops, &name, 0 ))) return 0;
if (!(server = find_available_server( pipe ))) if (list_empty( &pipe->listeners ))
{ {
queue_async( &pipe->waiters, async ); queue_async( &pipe->waiters, async );
when = buffer->TimeoutSpecified ? buffer->Timeout.QuadPart : pipe->timeout; when = buffer->TimeoutSpecified ? buffer->Timeout.QuadPart : pipe->timeout;
async_set_timeout( async, when, STATUS_IO_TIMEOUT ); async_set_timeout( async, when, STATUS_IO_TIMEOUT );
release_object( pipe );
set_error( STATUS_PENDING ); set_error( STATUS_PENDING );
return 1;
} }
release_object( server );
release_object( pipe ); release_object( pipe );
return 0; return 1;
} }
default: default:
@ -1353,7 +1331,7 @@ DECL_HANDLER(create_named_pipe)
/* initialize it if it didn't already exist */ /* initialize it if it didn't already exist */
pipe->instances = 0; pipe->instances = 0;
init_async_queue( &pipe->waiters ); init_async_queue( &pipe->waiters );
list_init( &pipe->servers ); list_init( &pipe->listeners );
pipe->insize = req->insize; pipe->insize = req->insize;
pipe->outsize = req->outsize; pipe->outsize = req->outsize;
pipe->maxinstances = req->maxinstances; pipe->maxinstances = req->maxinstances;