diff --git a/server/async.c b/server/async.c index 4e30c2e6f85..e113681bb09 100644 --- a/server/async.c +++ b/server/async.c @@ -474,6 +474,15 @@ struct iosb *async_get_iosb( struct async *async ) return async->iosb ? (struct iosb *)grab_object( async->iosb ) : NULL; } +/* find the first pending async in queue */ +struct async *find_pending_async( struct async_queue *queue ) +{ + struct async *async; + if (queue) LIST_FOR_EACH_ENTRY( async, &queue->queue, struct async, queue_entry ) + if (async->status == STATUS_PENDING) return (struct async *)grab_object( async ); + return NULL; +} + /* cancels all async I/O */ DECL_HANDLER(cancel_async) { diff --git a/server/file.h b/server/file.h index 8e906d3ebf8..398733c460c 100644 --- a/server/file.h +++ b/server/file.h @@ -188,6 +188,7 @@ extern struct completion *fd_get_completion( struct fd *fd, apc_param_t *p_key ) extern void fd_copy_completion( struct fd *src, struct fd *dst ); extern struct iosb *create_iosb( const void *in_data, data_size_t in_size, data_size_t out_size ); extern struct iosb *async_get_iosb( struct async *async ); +extern struct async *find_pending_async( struct async_queue *queue ); extern void cancel_process_asyncs( struct process *process ); /* access rights that require Unix read permission */ diff --git a/server/named_pipe.c b/server/named_pipe.c index 85521e0953b..e83f7736691 100644 --- a/server/named_pipe.c +++ b/server/named_pipe.c @@ -81,6 +81,7 @@ struct pipe_end struct pipe_end *connection; /* the other end of the pipe */ data_size_t buffer_size;/* size of buffered data that doesn't block caller */ struct list message_queue; + struct async_queue *read_q; /* read queue */ struct async_queue *write_q; /* write queue */ }; @@ -425,6 +426,7 @@ static void pipe_end_disconnect( struct pipe_end *pipe_end, unsigned int status struct pipe_message *message, *next; struct async *async; if (pipe_end->fd) fd_async_wake_up( pipe_end->fd, ASYNC_TYPE_WAIT, status ); + async_wake_up( pipe_end->read_q, status ); LIST_FOR_EACH_ENTRY_SAFE( message, next, &pipe_end->message_queue, struct pipe_message, entry ) { async = message->async; @@ -470,6 +472,7 @@ static void pipe_end_destroy( struct pipe_end *pipe_end ) free_message( message ); } + free_async_queue( pipe_end->read_q ); free_async_queue( pipe_end->write_q ); } @@ -681,10 +684,89 @@ static obj_handle_t pipe_client_flush( struct fd *fd, struct async *async, int b return 0; } +static void message_queue_read( struct pipe_end *pipe_end, struct iosb *iosb ) +{ + struct pipe_message *message; + data_size_t avail = 0; + + LIST_FOR_EACH_ENTRY( message, &pipe_end->message_queue, struct pipe_message, entry ) + { + avail += message->iosb->in_size - message->read_pos; + if (avail >= iosb->out_size) break; + } + iosb->out_size = min( iosb->out_size, avail ); + iosb->status = STATUS_SUCCESS; + + message = LIST_ENTRY( list_head(&pipe_end->message_queue), struct pipe_message, entry ); + if (!message->read_pos && message->iosb->in_size == iosb->out_size) /* fast path */ + { + iosb->out_data = message->iosb->in_data; + message->iosb->in_data = NULL; + wake_message( message ); + free_message( message ); + } + else + { + data_size_t write_pos = 0, writing; + char *buf = NULL; + + if (iosb->out_size && !(buf = iosb->out_data = malloc( iosb->out_size ))) + { + iosb->out_size = 0; + iosb->status = STATUS_NO_MEMORY; + return; + } + + do + { + message = LIST_ENTRY( list_head(&pipe_end->message_queue), struct pipe_message, entry ); + writing = min( iosb->out_size - write_pos, message->iosb->in_size - message->read_pos ); + if (writing) memcpy( buf + write_pos, (const char *)message->iosb->in_data + message->read_pos, writing ); + write_pos += writing; + message->read_pos += writing; + if (message->read_pos == message->iosb->in_size) + { + wake_message(message); + free_message(message); + } + } while (write_pos < iosb->out_size); + } + iosb->result = iosb->out_size; +} + /* We call async_terminate in our reselect implementation, which causes recursive reselect. * We're not interested in such reselect calls, so we ignore them. */ static int ignore_reselect; +static void reselect_write_queue( struct pipe_end *pipe_end ); + +static void reselect_read_queue( struct pipe_end *pipe_end ) +{ + struct async *async; + struct iosb *iosb; + int read_done = 0; + + ignore_reselect = 1; + while (!list_empty( &pipe_end->message_queue) && (async = find_pending_async( pipe_end->read_q ))) + { + iosb = async_get_iosb( async ); + message_queue_read( pipe_end, iosb ); + async_terminate( async, iosb->result ? STATUS_ALERTED : iosb->status ); + release_object( async ); + release_object( iosb ); + read_done = 1; + } + ignore_reselect = 0; + + if (pipe_end->connection) + { + if (list_empty( &pipe_end->message_queue )) + fd_async_wake_up( pipe_end->connection->fd, ASYNC_TYPE_WAIT, STATUS_SUCCESS ); + else if (read_done) + reselect_write_queue( pipe_end->connection ); + } +} + static void reselect_write_queue( struct pipe_end *pipe_end ) { struct pipe_message *message, *next; @@ -777,6 +859,8 @@ static void pipe_end_reselect_async( struct fd *fd, struct async_queue *queue ) default_fd_reselect_async( fd, queue ); else if (pipe_end->write_q && pipe_end->write_q == queue) reselect_write_queue( pipe_end ); + else if (pipe_end->read_q && pipe_end->read_q == queue) + reselect_read_queue( pipe_end ); } static inline int is_overlapped( unsigned int options ) @@ -937,6 +1021,7 @@ static void init_pipe_end( struct pipe_end *pipe_end, unsigned int pipe_flags, d pipe_end->flags = pipe_flags; pipe_end->connection = NULL; pipe_end->buffer_size = buffer_size; + pipe_end->read_q = NULL; pipe_end->write_q = NULL; list_init( &pipe_end->message_queue ); }