From ef8b94622a842b6c745cc093ff925b49b94a8373 Mon Sep 17 00:00:00 2001 From: Mike McCormack Date: Thu, 15 May 2003 04:22:45 +0000 Subject: [PATCH] - rewrite of the named pipe code - allow NtFileFlushBuffers to wait - allow DisconnectNamedPipe to invalidate client cached fd - fix the pipe test now that one extra test passes --- dlls/kernel/sync.c | 1 + dlls/kernel/tests/pipe.c | 5 +- dlls/ntdll/file.c | 8 + include/wine/server_protocol.h | 4 +- server/fd.c | 9 +- server/file.c | 4 +- server/file.h | 5 +- server/handle.c | 14 + server/named_pipe.c | 738 +++++++++++++++++++++++---------- server/protocol.def | 4 + server/serial.c | 4 +- server/trace.c | 14 +- 12 files changed, 587 insertions(+), 223 deletions(-) diff --git a/dlls/kernel/sync.c b/dlls/kernel/sync.c index b30f60bd4de..90fd5b9e0eb 100644 --- a/dlls/kernel/sync.c +++ b/dlls/kernel/sync.c @@ -754,6 +754,7 @@ BOOL WINAPI DisconnectNamedPipe(HANDLE hPipe) { req->handle = hPipe; ret = !wine_server_call_err( req ); + if (ret && reply->fd != -1) close( reply->fd ); } SERVER_END_REQ; diff --git a/dlls/kernel/tests/pipe.c b/dlls/kernel/tests/pipe.c index 8f7b0e57380..40b7b748ec0 100644 --- a/dlls/kernel/tests/pipe.c +++ b/dlls/kernel/tests/pipe.c @@ -90,10 +90,7 @@ void test_CreateNamedPipeA(void) hFile = CreateFileA(PIPENAME, GENERIC_READ|GENERIC_WRITE, 0, NULL, OPEN_EXISTING, 0, 0); - todo_wine - { - ok(hFile != INVALID_HANDLE_VALUE, "CreateFile failed"); - } + ok(hFile != INVALID_HANDLE_VALUE, "CreateFile failed"); /* don't try to do i/o if one side couldn't be opened, as it hangs */ if (hFile != INVALID_HANDLE_VALUE) { diff --git a/dlls/ntdll/file.c b/dlls/ntdll/file.c index a111cf70642..4f390dfc95c 100644 --- a/dlls/ntdll/file.c +++ b/dlls/ntdll/file.c @@ -498,11 +498,19 @@ NTSTATUS WINAPI NtQueryVolumeInformationFile ( NTSTATUS WINAPI NtFlushBuffersFile( HANDLE hFile, IO_STATUS_BLOCK* IoStatusBlock ) { NTSTATUS ret; + HANDLE hEvent = NULL; + SERVER_START_REQ( flush_file ) { req->handle = hFile; ret = wine_server_call( req ); + hEvent = reply->event; } SERVER_END_REQ; + if( !ret && hEvent ) + { + ret = NtWaitForSingleObject( hEvent, FALSE, NULL ); + NtClose( hEvent ); + } return ret; } diff --git a/include/wine/server_protocol.h b/include/wine/server_protocol.h index 36accfb8d6d..e8365bcda56 100644 --- a/include/wine/server_protocol.h +++ b/include/wine/server_protocol.h @@ -876,6 +876,7 @@ struct flush_file_request struct flush_file_reply { struct reply_header __header; + obj_handle_t event; }; @@ -2464,6 +2465,7 @@ struct disconnect_named_pipe_request struct disconnect_named_pipe_reply { struct reply_header __header; + int fd; }; @@ -3581,6 +3583,6 @@ union generic_reply struct get_next_hook_reply get_next_hook_reply; }; -#define SERVER_PROTOCOL_VERSION 105 +#define SERVER_PROTOCOL_VERSION 106 #endif /* __WINE_WINE_SERVER_PROTOCOL_H */ diff --git a/server/fd.c b/server/fd.c index 3ca3b74dcf7..072d3496637 100644 --- a/server/fd.c +++ b/server/fd.c @@ -963,7 +963,7 @@ void default_poll_event( struct fd *fd, int event ) } /* default flush() routine */ -int no_flush( struct fd *fd ) +int no_flush( struct fd *fd, struct event **event ) { set_error( STATUS_OBJECT_TYPE_MISMATCH ); return 0; @@ -1002,10 +1002,15 @@ static struct fd *get_handle_fd_obj( struct process *process, obj_handle_t handl DECL_HANDLER(flush_file) { struct fd *fd = get_handle_fd_obj( current->process, req->handle, 0 ); + struct event * event = NULL; if (fd) { - fd->fd_ops->flush( fd ); + fd->fd_ops->flush( fd, &event ); + if( event ) + { + reply->event = alloc_handle( current->process, event, SYNCHRONIZE, 0 ); + } release_object( fd ); } } diff --git a/server/file.c b/server/file.c index 456d94dc4c7..0ed7ae50698 100644 --- a/server/file.c +++ b/server/file.c @@ -72,7 +72,7 @@ static void file_destroy( struct object *obj ); static int file_get_poll_events( struct fd *fd ); static void file_poll_event( struct fd *fd, int event ); -static int file_flush( struct fd *fd ); +static int file_flush( struct fd *fd, struct event **event ); static int file_get_info( struct fd *fd, struct get_file_info_reply *reply, int *flags ); static void file_queue_async( struct fd *fd, void *ptr, unsigned int status, int type, int count ); @@ -301,7 +301,7 @@ static void file_poll_event( struct fd *fd, int event ) } -static int file_flush( struct fd *fd ) +static int file_flush( struct fd *fd, struct event **event ) { int ret = (fsync( get_unix_fd(fd) ) != -1); if (!ret) file_set_error(); diff --git a/server/file.h b/server/file.h index 10e61ac057b..e558a354bf3 100644 --- a/server/file.h +++ b/server/file.h @@ -35,7 +35,7 @@ struct fd_ops /* a poll() event occured */ void (*poll_event)(struct fd *,int event); /* flush the object buffers */ - int (*flush)(struct fd *); + int (*flush)(struct fd *, struct event **); /* get file information */ int (*get_file_info)(struct fd *,struct get_file_info_reply *, int *flags); /* queue an async operation - see register_async handler in async.c*/ @@ -55,12 +55,13 @@ extern int check_fd_events( struct fd *fd, int events ); extern void set_fd_events( struct fd *fd, int events ); extern obj_handle_t lock_fd( struct fd *fd, file_pos_t offset, file_pos_t count, int shared, int wait ); extern void unlock_fd( struct fd *fd, file_pos_t offset, file_pos_t count ); +extern int flush_cached_fd( struct process *process, obj_handle_t handle ); extern int default_fd_add_queue( struct object *obj, struct wait_queue_entry *entry ); extern void default_fd_remove_queue( struct object *obj, struct wait_queue_entry *entry ); extern int default_fd_signaled( struct object *obj, struct thread *thread ); extern void default_poll_event( struct fd *fd, int event ); -extern int no_flush( struct fd *fd ); +extern int no_flush( struct fd *fd, struct event **event ); extern int no_get_file_info( struct fd *fd, struct get_file_info_reply *info, int *flags ); extern void no_queue_async( struct fd *fd, void* ptr, unsigned int status, int type, int count ); extern void main_loop(void); diff --git a/server/handle.c b/server/handle.c index 4a2b4ef57eb..ff9e54605c8 100644 --- a/server/handle.c +++ b/server/handle.c @@ -397,6 +397,20 @@ int get_handle_unix_fd( struct process *process, obj_handle_t handle, unsigned i return entry->fd; } +/* remove the cached fd and return it */ +int flush_cached_fd( struct process *process, obj_handle_t handle ) +{ + struct handle_entry *entry = get_handle( process, handle ); + int fd = -1; + + if (entry) + { + fd = entry->fd; + entry->fd = -1; + } + return fd; +} + /* find the first inherited handle of the given type */ /* this is needed for window stations and desktops (don't ask...) */ obj_handle_t find_inherited_handle( struct process *process, const struct object_ops *ops ) diff --git a/server/named_pipe.c b/server/named_pipe.c index 780fcc69a1b..7f517748a93 100644 --- a/server/named_pipe.c +++ b/server/named_pipe.c @@ -19,7 +19,7 @@ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * * TODO: - * improve error handling + * message mode */ #include "config.h" @@ -50,26 +50,47 @@ enum pipe_state ps_none, ps_idle_server, ps_wait_open, - ps_wait_connect, ps_connected_server, - ps_connected_client, - ps_disconnected + ps_wait_disconnect, + ps_disconnected_server, + ps_wait_connect +}; + +struct wait_info +{ + struct thread *thread; + void *func; + void *overlapped; }; struct named_pipe; -struct pipe_user +struct pipe_server { - struct object obj; - struct fd *fd; - enum pipe_state state; - struct pipe_user *other; - struct named_pipe *pipe; - struct pipe_user *next; - struct pipe_user *prev; - struct thread *thread; - void *func; - void *overlapped; + struct object obj; + struct fd *fd; + enum pipe_state state; + struct pipe_client *client; + struct named_pipe *pipe; + struct pipe_server *next; + struct pipe_server *prev; + struct timeout_user *flush_poll; + struct event *event; + struct wait_info wait; +}; + +struct pipe_client +{ + struct object obj; + struct fd *fd; + struct pipe_server *server; + struct wait_info wait; +}; + +struct connect_wait +{ + struct wait_info wait; + struct connect_wait *next; }; struct named_pipe @@ -80,11 +101,13 @@ struct named_pipe unsigned int outsize; unsigned int insize; unsigned int timeout; - struct pipe_user *users; + unsigned int instances; + struct pipe_server *servers; + struct connect_wait *connect_waiters; }; static void named_pipe_dump( struct object *obj, int verbose ); -static void named_pipe_destroy( struct object *obj); +static void named_pipe_destroy( struct object *obj ); static const struct object_ops named_pipe_ops = { @@ -98,120 +121,356 @@ static const struct object_ops named_pipe_ops = named_pipe_destroy /* destroy */ }; -static void pipe_user_dump( struct object *obj, int verbose ); -static struct fd *pipe_user_get_fd( struct object *obj ); -static void pipe_user_destroy( struct object *obj); +/* common to clients and servers */ +static int pipe_end_get_poll_events( struct fd *fd ); +static int pipe_end_get_info( struct fd *fd, + struct get_file_info_reply *reply, int *flags ); -static int pipe_user_get_poll_events( struct fd *fd ); -static int pipe_user_get_info( struct fd *fd, struct get_file_info_reply *reply, int *flags ); +/* server end functions */ +static void pipe_server_dump( struct object *obj, int verbose ); +static struct fd *pipe_server_get_fd( struct object *obj ); +static void pipe_server_destroy( struct object *obj); +static int pipe_server_flush( struct fd *fd, struct event **event ); -static const struct object_ops pipe_user_ops = +static const struct object_ops pipe_server_ops = { - sizeof(struct pipe_user), /* size */ - pipe_user_dump, /* dump */ + sizeof(struct pipe_server), /* size */ + pipe_server_dump, /* dump */ default_fd_add_queue, /* add_queue */ default_fd_remove_queue, /* remove_queue */ default_fd_signaled, /* signaled */ no_satisfied, /* satisfied */ - pipe_user_get_fd, /* get_fd */ - pipe_user_destroy /* destroy */ + pipe_server_get_fd, /* get_fd */ + pipe_server_destroy /* destroy */ }; -static const struct fd_ops pipe_user_fd_ops = +static const struct fd_ops pipe_server_fd_ops = { - pipe_user_get_poll_events, /* get_poll_events */ + pipe_end_get_poll_events, /* get_poll_events */ default_poll_event, /* poll_event */ - no_flush, /* flush */ - pipe_user_get_info, /* get_file_info */ + pipe_server_flush, /* flush */ + pipe_end_get_info, /* get_file_info */ + no_queue_async /* queue_async */ +}; + +/* client end functions */ +static void pipe_client_dump( struct object *obj, int verbose ); +static struct fd *pipe_client_get_fd( struct object *obj ); +static void pipe_client_destroy( struct object *obj ); +static int pipe_client_flush( struct fd *fd, struct event **event ); + +static const struct object_ops pipe_client_ops = +{ + sizeof(struct pipe_client), /* size */ + pipe_client_dump, /* dump */ + default_fd_add_queue, /* add_queue */ + default_fd_remove_queue, /* remove_queue */ + default_fd_signaled, /* signaled */ + no_satisfied, /* satisfied */ + pipe_client_get_fd, /* get_fd */ + pipe_client_destroy /* destroy */ +}; + +static const struct fd_ops pipe_client_fd_ops = +{ + pipe_end_get_poll_events, /* get_poll_events */ + default_poll_event, /* poll_event */ + pipe_client_flush, /* flush */ + pipe_end_get_info, /* get_file_info */ no_queue_async /* queue_async */ }; static void named_pipe_dump( struct object *obj, int verbose ) { - struct named_pipe *pipe = (struct named_pipe *)obj; + struct named_pipe *pipe = (struct named_pipe *) obj; assert( obj->ops == &named_pipe_ops ); fprintf( stderr, "named pipe %p\n" ,pipe); } -static void pipe_user_dump( struct object *obj, int verbose ) +static void pipe_server_dump( struct object *obj, int verbose ) { - struct pipe_user *user = (struct pipe_user *)obj; - assert( obj->ops == &pipe_user_ops ); - fprintf( stderr, "named pipe user %p (state %d)\n", user, user->state ); + struct pipe_server *server = (struct pipe_server *) obj; + assert( obj->ops == &pipe_server_ops ); + fprintf( stderr, "named pipe server %p (state %d)\n", + server, server->state ); +} + +static void pipe_client_dump( struct object *obj, int verbose ) +{ + struct pipe_client *client = (struct pipe_client *) obj; + assert( obj->ops == &pipe_server_ops ); + fprintf( stderr, "named pipe client %p (server state %d)\n", + client, client->server->state ); } static void named_pipe_destroy( struct object *obj) { - struct named_pipe *pipe = (struct named_pipe *)obj; - assert( !pipe->users ); + struct named_pipe *pipe = (struct named_pipe *) obj; + assert( !pipe->servers ); + assert( !pipe->instances ); } -static void notify_waiter( struct pipe_user *user, unsigned int status) +static void notify_waiter( struct wait_info *wait, unsigned int status ) { - if(user->thread && user->func && user->overlapped) + if( wait->thread && wait->func && wait->overlapped ) { /* queue a system APC, to notify a waiting thread */ - thread_queue_apc(user->thread, NULL, user->func, APC_ASYNC, 1, - user->overlapped, (void *)status, NULL); + thread_queue_apc( wait->thread, NULL, wait->func, APC_ASYNC, + 1, wait->overlapped, (void *)status, NULL ); } - if (user->thread) release_object(user->thread); - user->thread = NULL; - user->func = NULL; - user->overlapped=NULL; + if( wait->thread ) release_object( wait->thread ); + wait->thread = NULL; } -static struct fd *pipe_user_get_fd( struct object *obj ) +static void set_waiter( struct wait_info *wait, void *func, void *ov ) { - struct pipe_user *user = (struct pipe_user *)obj; - if (user->fd) return (struct fd *)grab_object( user->fd ); + wait->thread = (struct thread *) grab_object( current ); + wait->func = func; + wait->overlapped = ov; +} + +static void notify_connect_waiters( struct named_pipe *pipe ) +{ + struct connect_wait *cw, **x = &pipe->connect_waiters; + + while( *x ) + { + cw = *x; + notify_waiter( &cw->wait, STATUS_SUCCESS ); + release_object( pipe ); + *x = cw->next; + free( cw ); + } +} + +static void queue_connect_waiter( struct named_pipe *pipe, + void *func, void *overlapped ) +{ + struct connect_wait *waiter; + + waiter = mem_alloc( sizeof *waiter ); + if( waiter ) + { + set_waiter( &waiter->wait, func, overlapped ); + waiter->next = pipe->connect_waiters; + pipe->connect_waiters = waiter; + grab_object( pipe ); + } +} + +static struct fd *pipe_client_get_fd( struct object *obj ) +{ + struct pipe_client *client = (struct pipe_client *) obj; + if( client->fd ) + return (struct fd *) grab_object( client->fd ); set_error( STATUS_PIPE_DISCONNECTED ); return NULL; } -static void pipe_user_destroy( struct object *obj) +static struct fd *pipe_server_get_fd( struct object *obj ) { - struct pipe_user *user = (struct pipe_user *)obj; + struct pipe_server *server = (struct pipe_server *) obj; - assert( obj->ops == &pipe_user_ops ); - - if(user->overlapped) - notify_waiter(user,STATUS_HANDLES_CLOSED); - - if(user->other) + switch(server->state) { - release_object( user->other->fd ); - user->other->fd = NULL; - switch(user->other->state) - { - case ps_connected_server: - user->other->state = ps_idle_server; - break; - case ps_connected_client: - user->other->state = ps_disconnected; - break; - default: - fprintf(stderr,"connected pipe has strange state %d!\n", - user->other->state); - } - user->other->other=NULL; - user->other = NULL; - } + case ps_connected_server: + case ps_wait_disconnect: + assert( server->fd ); + return (struct fd *) grab_object( server->fd ); - /* remove user from pipe's user list */ - if (user->next) user->next->prev = user->prev; - if (user->prev) user->prev->next = user->next; - else user->pipe->users = user->next; - if (user->thread) release_object(user->thread); - release_object(user->pipe); - if (user->fd) release_object( user->fd ); + case ps_wait_open: + case ps_idle_server: + set_error( STATUS_PIPE_LISTENING ); + break; + + case ps_disconnected_server: + case ps_wait_connect: + set_error( STATUS_PIPE_DISCONNECTED ); + break; + + default: + assert( 0 ); + } + return NULL; } -static int pipe_user_get_poll_events( struct fd *fd ) + +static void notify_empty( struct pipe_server *server ) +{ + if( !server->flush_poll ) + return; + assert( server->state == ps_connected_server ); + assert( server->event ); + remove_timeout_user( server->flush_poll ); + server->flush_poll = NULL; + set_event( server->event ); + release_object( server->event ); + server->event = NULL; +} + +static void do_disconnect( struct pipe_server *server ) +{ + /* we may only have a server fd, if the client disconnected */ + if( server->client ) + { + assert( server->client->server == server ); + assert( server->client->fd ); + release_object( server->client->fd ); + server->client->fd = NULL; + } + assert( server->fd ); + release_object( server->fd ); + server->fd = NULL; +} + +static void pipe_server_destroy( struct object *obj) +{ + struct pipe_server *server = (struct pipe_server *)obj; + + assert( obj->ops == &pipe_server_ops ); + + if( server->fd ) + { + notify_empty( server ); + do_disconnect( server ); + } + + if( server->client ) + { + server->client->server = NULL; + server->client = NULL; + } + + notify_waiter( &server->wait, STATUS_HANDLES_CLOSED ); + + assert( server->pipe->instances ); + server->pipe->instances--; + + /* remove server from pipe's server list */ + if( server->next ) server->next->prev = server->prev; + if( server->prev ) server->prev->next = server->next; + else server->pipe->servers = server->next; + release_object( server->pipe ); +} + +static void pipe_client_destroy( struct object *obj) +{ + struct pipe_client *client = (struct pipe_client *)obj; + struct pipe_server *server = client->server; + + assert( obj->ops == &pipe_client_ops ); + + notify_waiter( &client->wait, STATUS_HANDLES_CLOSED ); + + if( server ) + { + notify_empty( server ); + + switch( server->state ) + { + case ps_connected_server: + /* Don't destroy the server's fd here as we can't + do a successful flush without it. */ + server->state = ps_wait_disconnect; + release_object( client->fd ); + client->fd = NULL; + break; + case ps_disconnected_server: + server->state = ps_wait_connect; + break; + default: + assert( 0 ); + } + assert( server->client ); + server->client = NULL; + client->server = NULL; + } + assert( !client->fd ); +} + +static int pipe_end_get_poll_events( struct fd *fd ) { return POLLIN | POLLOUT; /* FIXME */ } -static int pipe_user_get_info( struct fd *fd, struct get_file_info_reply *reply, int *flags ) +static int pipe_data_remaining( struct pipe_server *server ) +{ + struct pollfd pfd; + int fd; + + assert( server->client ); + + fd = get_unix_fd( server->client->fd ); + if( fd < 0 ) + return 0; + pfd.fd = fd; + pfd.events = POLLIN; + pfd.revents = 0; + + if( 0 > poll( &pfd, 1, 0 ) ) + return 0; + + return pfd.revents&POLLIN; +} + +static void check_flushed( void *arg ) +{ + struct pipe_server *server = (struct pipe_server*) arg; + + assert( server->event ); + if( pipe_data_remaining( server ) ) + { + struct timeval tv; + + gettimeofday( &tv, 0 ); + add_timeout( &tv, 100 ); + server->flush_poll = add_timeout_user( &tv, check_flushed, server ); + } + else + notify_empty( server ); +} + +static int pipe_server_flush( struct fd *fd, struct event **event ) +{ + struct pipe_server *server = get_fd_user( fd ); + + if( !server ) + return 0; + + if( server->state != ps_connected_server ) + return 0; + + /* FIXME: if multiple threads flush the same pipe, + maybe should create a list of processes to notify */ + if( server->flush_poll ) + return 0; + + if( pipe_data_remaining( server ) ) + { + struct timeval tv; + + /* this kind of sux - + there's no unix way to be alerted when a pipe becomes empty */ + server->event = create_event( NULL, 0, 0, 0 ); + if( !server->event ) + return 0; + gettimeofday( &tv, 0 ); + add_timeout( &tv, 100 ); + server->flush_poll = add_timeout_user( &tv, check_flushed, server ); + *event = server->event; + } + + return 0; +} + +static int pipe_client_flush( struct fd *fd, struct event **event ) +{ + /* FIXME: what do we have to do for this? */ + return 0; +} + +static int pipe_end_get_info( struct fd *fd, + struct get_file_info_reply *reply, int *flags ) { if (reply) { @@ -234,12 +493,15 @@ static struct named_pipe *create_named_pipe( const WCHAR *name, size_t len ) { struct named_pipe *pipe; - if ((pipe = create_named_object( sync_namespace, &named_pipe_ops, name, len ))) + pipe = create_named_object( sync_namespace, &named_pipe_ops, name, len ); + if( pipe ) { - if (get_error() != STATUS_OBJECT_NAME_COLLISION) + if( get_error() != STATUS_OBJECT_NAME_COLLISION ) { /* initialize it if it didn't already exist */ - pipe->users = 0; + pipe->servers = 0; + pipe->instances = 0; + pipe->connect_waiters = NULL; } } return pipe; @@ -260,65 +522,80 @@ static struct named_pipe *open_named_pipe( const WCHAR *name, size_t len ) return NULL; } -static struct pipe_user *get_pipe_user_obj( struct process *process, obj_handle_t handle, - unsigned int access ) +static struct pipe_server *get_pipe_server_obj( struct process *process, + obj_handle_t handle, unsigned int access ) { - return (struct pipe_user *)get_handle_obj( process, handle, access, &pipe_user_ops ); + struct object *obj; + obj = get_handle_obj( process, handle, access, &pipe_server_ops ); + return (struct pipe_server *) obj; } -static struct pipe_user *create_pipe_user( struct named_pipe *pipe ) +static struct pipe_server *create_pipe_server( struct named_pipe *pipe ) { - struct pipe_user *user; + struct pipe_server *server; - user = alloc_object( &pipe_user_ops ); - if(!user) + server = alloc_object( &pipe_server_ops ); + if( !server ) return NULL; - user->fd = NULL; - user->pipe = pipe; - user->state = ps_none; - user->other = NULL; - user->thread = NULL; - user->func = NULL; - user->overlapped = NULL; + server->fd = NULL; + server->pipe = pipe; + server->state = ps_none; + server->client = NULL; + server->flush_poll = NULL; + server->wait.thread = NULL; - /* add to list of pipe users */ - if ((user->next = pipe->users)) user->next->prev = user; - user->prev = NULL; - pipe->users = user; + /* add to list of pipe servers */ + if ((server->next = pipe->servers)) server->next->prev = server; + server->prev = NULL; + pipe->servers = server; - grab_object(pipe); + grab_object( pipe ); - return user; + return server; } -static struct pipe_user *find_partner(struct named_pipe *pipe, enum pipe_state state) +static struct pipe_client *create_pipe_client( struct pipe_server *server ) { - struct pipe_user *x; + struct pipe_client *client; - for(x = pipe->users; x; x=x->next) - { - if(x->state==state) - break; - } - - if(!x) + client = alloc_object( &pipe_client_ops ); + if( !client ) return NULL; - return (struct pipe_user *)grab_object( x ); + client->fd = NULL; + client->server = server; + client->wait.thread = NULL; + + return client; +} + +static struct pipe_server *find_server( struct named_pipe *pipe, + enum pipe_state state ) +{ + struct pipe_server *x; + + for( x = pipe->servers; x; x = x->next ) + if( x->state == state ) + break; + + if( !x ) + return NULL; + + return (struct pipe_server *) grab_object( x ); } DECL_HANDLER(create_named_pipe) { struct named_pipe *pipe; - struct pipe_user *user; + struct pipe_server *server; reply->handle = 0; pipe = create_named_pipe( get_req_data(), get_req_data_size() ); - if(!pipe) + if( !pipe ) return; - if (get_error() != STATUS_OBJECT_NAME_COLLISION) + if( get_error() != STATUS_OBJECT_NAME_COLLISION ) { pipe->insize = req->insize; pipe->outsize = req->outsize; @@ -326,14 +603,33 @@ DECL_HANDLER(create_named_pipe) pipe->timeout = req->timeout; pipe->pipemode = req->pipemode; } - - user = create_pipe_user( pipe ); - - if(user) + else { - user->state = ps_idle_server; - reply->handle = alloc_handle( current->process, user, GENERIC_READ|GENERIC_WRITE, 0 ); - release_object( user ); + set_error( 0 ); /* clear the name collision */ + if( pipe->maxinstances <= pipe->instances ) + { + set_error( STATUS_PIPE_BUSY ); + release_object( pipe ); + return; + } + if( ( pipe->maxinstances != req->maxinstances ) || + ( pipe->timeout != req->timeout ) || + ( pipe->pipemode != req->pipemode ) ) + { + set_error( STATUS_ACCESS_DENIED ); + release_object( pipe ); + return; + } + } + + server = create_pipe_server( pipe ); + if(server) + { + server->state = ps_idle_server; + reply->handle = alloc_handle( current->process, server, + GENERIC_READ|GENERIC_WRITE, 0 ); + server->pipe->instances++; + release_object( server ); } release_object( pipe ); @@ -341,147 +637,173 @@ DECL_HANDLER(create_named_pipe) DECL_HANDLER(open_named_pipe) { - struct pipe_user *user, *partner; + struct pipe_server *server; + struct pipe_client *client; struct named_pipe *pipe; + int fds[2]; reply->handle = 0; - if (!(pipe = open_named_pipe( get_req_data(), get_req_data_size() ))) + pipe = open_named_pipe( get_req_data(), get_req_data_size() ); + if ( !pipe ) { set_error( STATUS_NO_SUCH_FILE ); return; } - if (!(partner = find_partner(pipe, ps_wait_open))) + + for( server = pipe->servers; server; server = server->next ) + if( ( server->state==ps_idle_server ) || + ( server->state==ps_wait_open ) ) + break; + release_object( pipe ); + + if ( !server ) { - release_object(pipe); set_error( STATUS_PIPE_NOT_AVAILABLE ); return; } - if ((user = create_pipe_user( pipe ))) - { - int fds[2]; - if(!socketpair(PF_UNIX, SOCK_STREAM, 0, fds)) + client = create_pipe_client( server ); + if( client ) + { + if( !socketpair( PF_UNIX, SOCK_STREAM, 0, fds ) ) { - user->fd = create_anonymous_fd( &pipe_user_fd_ops, fds[1], &user->obj ); - partner->fd = create_anonymous_fd( &pipe_user_fd_ops, fds[0], &partner->obj ); - if (user->fd && partner->fd) + assert( !client->fd ); + assert( !server->fd ); + client->fd = create_anonymous_fd( &pipe_server_fd_ops, + fds[1], &client->obj ); + server->fd = create_anonymous_fd( &pipe_server_fd_ops, + fds[0], &server->obj ); + if (client->fd && server->fd) { - notify_waiter(partner,STATUS_SUCCESS); - partner->state = ps_connected_server; - partner->other = user; - user->state = ps_connected_client; - user->other = partner; - reply->handle = alloc_handle( current->process, user, req->access, 0 ); + if( server->state == ps_wait_open ) + notify_waiter( &server->wait, STATUS_SUCCESS ); + assert( !server->wait.thread ); + server->state = ps_connected_server; + server->client = client; + client->server = server; + reply->handle = alloc_handle( current->process, client, + req->access, 0 ); } } - else file_set_error(); + else + file_set_error(); - release_object( user ); + release_object( client ); } - release_object( partner ); - release_object( pipe ); } DECL_HANDLER(connect_named_pipe) { - struct pipe_user *user, *partner; + struct pipe_server *server; - user = get_pipe_user_obj(current->process, req->handle, 0); - if(!user) + server = get_pipe_server_obj(current->process, req->handle, 0); + if(!server) return; - if( user->state != ps_idle_server ) + switch( server->state ) { - set_error(STATUS_PORT_ALREADY_SET); - } - else - { - user->state = ps_wait_open; - user->thread = (struct thread *)grab_object(current); - user->func = req->func; - user->overlapped = req->overlapped; - - /* notify all waiters that a pipe just became available */ - while( (partner = find_partner(user->pipe,ps_wait_connect)) ) - { - notify_waiter(partner,STATUS_SUCCESS); - release_object(partner); - } + case ps_idle_server: + case ps_wait_connect: + assert( !server->fd ); + server->state = ps_wait_open; + set_waiter( &server->wait, req->func, req->overlapped ); + notify_connect_waiters( server->pipe ); + break; + case ps_connected_server: + assert( server->fd ); + set_error( STATUS_PIPE_CONNECTED ); + break; + case ps_disconnected_server: + set_error( STATUS_PIPE_BUSY ); + break; + case ps_wait_disconnect: + set_error( STATUS_NO_DATA_DETECTED ); + break; + default: + set_error( STATUS_INVALID_HANDLE ); + break; } - release_object(user); + release_object(server); } DECL_HANDLER(wait_named_pipe) { struct named_pipe *pipe; - struct pipe_user *partner; + struct pipe_server *server; if (!(pipe = open_named_pipe( get_req_data(), get_req_data_size() ))) { set_error( STATUS_PIPE_NOT_AVAILABLE ); return; } - if( (partner = find_partner(pipe,ps_wait_open)) ) + server = find_server( pipe, ps_wait_open ); + if( server ) { - /* this should use notify_waiter, - but no pipe_user object exists now... */ - thread_queue_apc(current,NULL,req->func, - APC_ASYNC, 1, req->overlapped, STATUS_SUCCESS, NULL); - release_object(partner); + /* there's already a server waiting for a client to connect */ + struct wait_info wait; + set_waiter( &wait, req->func, req->overlapped ); + notify_waiter( &wait, STATUS_SUCCESS ); + release_object( server ); } else - { - struct pipe_user *user; + queue_connect_waiter( pipe, req->func, req->overlapped ); - if( (user = create_pipe_user( pipe )) ) - { - user->state = ps_wait_connect; - user->thread = (struct thread *)grab_object(current); - user->func = req->func; - user->overlapped = req->overlapped; - /* don't release it */ - } - } - release_object(pipe); + release_object( pipe ); } DECL_HANDLER(disconnect_named_pipe) { - struct pipe_user *user; + struct pipe_server *server; - user = get_pipe_user_obj(current->process, req->handle, 0); - if(!user) + reply->fd = -1; + server = get_pipe_server_obj( current->process, req->handle, 0 ); + if( !server ) return; - if( (user->state == ps_connected_server) && - (user->other->state == ps_connected_client) ) + switch( server->state ) { - release_object( user->other->fd ); - user->other->fd = NULL; - user->other->state = ps_disconnected; - user->other->other = NULL; + case ps_connected_server: + assert( server->fd ); + assert( server->client ); + assert( server->client->fd ); - release_object( user->fd ); - user->fd = NULL; - user->state = ps_idle_server; - user->other = NULL; + notify_empty( server ); + notify_waiter( &server->client->wait, STATUS_PIPE_DISCONNECTED ); + + /* Dump the client and server fds, but keep the pointers + around - client loses all waiting data */ + server->state = ps_disconnected_server; + do_disconnect( server ); + reply->fd = flush_cached_fd( current->process, req->handle ); + break; + + case ps_wait_disconnect: + assert( !server->client ); + assert( server->fd ); + do_disconnect( server ); + server->state = ps_wait_connect; + reply->fd = flush_cached_fd( current->process, req->handle ); + break; + + default: + set_error( STATUS_PIPE_DISCONNECTED ); } - release_object(user); + release_object( server ); } DECL_HANDLER(get_named_pipe_info) { - struct pipe_user *user; + struct pipe_server *server; - user = get_pipe_user_obj(current->process, req->handle, 0); - if(!user) + server = get_pipe_server_obj( current->process, req->handle, 0 ); + if(!server) return; - reply->flags = user->pipe->pipemode; - reply->maxinstances = user->pipe->maxinstances; - reply->insize = user->pipe->insize; - reply->outsize = user->pipe->outsize; + reply->flags = server->pipe->pipemode; + reply->maxinstances = server->pipe->maxinstances; + reply->insize = server->pipe->insize; + reply->outsize = server->pipe->outsize; - release_object(user); + release_object(server); } diff --git a/server/protocol.def b/server/protocol.def index 90565f8714b..75438fd8a8a 100644 --- a/server/protocol.def +++ b/server/protocol.def @@ -665,6 +665,8 @@ enum fd_type /* Flush a file buffers */ @REQ(flush_file) obj_handle_t handle; /* handle to the file */ +@REPLY + obj_handle_t event; /* event set when finished */ @END @@ -1739,6 +1741,8 @@ enum message_type /* Disconnect a named pipe */ @REQ(disconnect_named_pipe) obj_handle_t handle; +@REPLY + int fd; /* associated fd to close */ @END diff --git a/server/serial.c b/server/serial.c index 3831b7e2240..b2dca50905c 100644 --- a/server/serial.c +++ b/server/serial.c @@ -58,7 +58,7 @@ static void serial_destroy(struct object *obj); static int serial_get_poll_events( struct fd *fd ); static void serial_poll_event( struct fd *fd, int event ); static int serial_get_info( struct fd *fd, struct get_file_info_reply *reply, int *flags ); -static int serial_flush( struct fd *fd ); +static int serial_flush( struct fd *fd, struct event **event ); static void serial_queue_async(struct fd *fd, void *ptr, unsigned int status, int type, int count); struct serial @@ -329,7 +329,7 @@ static void serial_queue_async(struct fd *fd, void *ptr, unsigned int status, in set_fd_events ( fd, serial_get_poll_events( fd )); } -static int serial_flush( struct fd *fd ) +static int serial_flush( struct fd *fd, struct event **event ) { /* MSDN says: If hFile is a handle to a communications device, * the function only flushes the transmit buffer. diff --git a/server/trace.c b/server/trace.c index 93b3de829f8..d45dc53f63a 100644 --- a/server/trace.c +++ b/server/trace.c @@ -877,6 +877,11 @@ static void dump_flush_file_request( const struct flush_file_request *req ) fprintf( stderr, " handle=%p", req->handle ); } +static void dump_flush_file_reply( const struct flush_file_reply *req ) +{ + fprintf( stderr, " event=%p", req->event ); +} + static void dump_get_file_info_request( const struct get_file_info_request *req ) { fprintf( stderr, " handle=%p", req->handle ); @@ -1993,6 +1998,11 @@ static void dump_disconnect_named_pipe_request( const struct disconnect_named_pi fprintf( stderr, " handle=%p", req->handle ); } +static void dump_disconnect_named_pipe_reply( const struct disconnect_named_pipe_reply *req ) +{ + fprintf( stderr, " fd=%d", req->fd ); +} + static void dump_get_named_pipe_info_request( const struct get_named_pipe_info_request *req ) { fprintf( stderr, " handle=%p", req->handle ); @@ -2660,7 +2670,7 @@ static const dump_func reply_dumpers[REQ_NB_REQUESTS] = { (dump_func)dump_set_file_pointer_reply, (dump_func)0, (dump_func)0, - (dump_func)0, + (dump_func)dump_flush_file_reply, (dump_func)dump_get_file_info_reply, (dump_func)dump_lock_file_reply, (dump_func)0, @@ -2755,7 +2765,7 @@ static const dump_func reply_dumpers[REQ_NB_REQUESTS] = { (dump_func)dump_open_named_pipe_reply, (dump_func)0, (dump_func)0, - (dump_func)0, + (dump_func)dump_disconnect_named_pipe_reply, (dump_func)dump_get_named_pipe_info_reply, (dump_func)dump_create_smb_reply, (dump_func)dump_get_smb_info_reply,