server Introduce write queue for server-side named pipe I/O.

Signed-off-by: Jacek Caban <jacek@codeweavers.com>
Signed-off-by: Alexandre Julliard <julliard@winehq.org>
This commit is contained in:
Jacek Caban 2017-03-21 13:03:26 +01:00 committed by Alexandre Julliard
parent 2fad531cb5
commit 4b2d5171e0
1 changed files with 66 additions and 2 deletions

View File

@ -81,6 +81,7 @@ struct pipe_end
struct pipe_end *connection; /* the other end of the pipe */
data_size_t buffer_size;/* size of buffered data that doesn't block caller */
struct list message_queue;
struct async_queue *write_q; /* write queue */
};
struct pipe_server
@ -154,6 +155,7 @@ static const struct object_ops named_pipe_ops =
/* common server and client pipe end functions */
static void pipe_end_queue_async( struct fd *fd, struct async *async, int type, int count );
static void pipe_end_reselect_async( struct fd *fd, struct async_queue *queue );
/* server end functions */
static void pipe_server_dump( struct object *obj, int verbose );
@ -196,7 +198,7 @@ static const struct fd_ops pipe_server_fd_ops =
pipe_server_flush, /* flush */
pipe_server_ioctl, /* ioctl */
pipe_end_queue_async, /* queue_async */
default_fd_reselect_async /* reselect_async */
pipe_end_reselect_async /* reselect_async */
};
/* client end functions */
@ -239,7 +241,7 @@ static const struct fd_ops pipe_client_fd_ops =
pipe_client_flush, /* flush */
default_fd_ioctl, /* ioctl */
pipe_end_queue_async, /* queue_async */
default_fd_reselect_async /* reselect_async */
pipe_end_reselect_async /* reselect_async */
};
static void named_pipe_device_dump( struct object *obj, int verbose );
@ -388,6 +390,20 @@ static void notify_empty( struct pipe_server *server )
fd_async_wake_up( server->pipe_end.fd, ASYNC_TYPE_WAIT, STATUS_SUCCESS );
}
static void wake_message( struct pipe_message *message )
{
struct async *async = message->async;
message->async = NULL;
message->iosb->status = STATUS_SUCCESS;
message->iosb->result = message->iosb->in_size;
if (async)
{
async_terminate( async, message->iosb->result ? STATUS_ALERTED : STATUS_SUCCESS );
release_object( async );
}
}
static void free_message( struct pipe_message *message )
{
list_remove( &message->entry );
@ -450,6 +466,8 @@ static void pipe_end_destroy( struct pipe_end *pipe_end )
assert( !message->async );
free_message( message );
}
free_async_queue( pipe_end->write_q );
}
static void pipe_server_destroy( struct object *obj)
@ -660,6 +678,39 @@ static obj_handle_t pipe_client_flush( struct fd *fd, struct async *async, int b
return 0;
}
/* We call async_terminate in our reselect implementation, which causes recursive reselect.
* We're not interested in such reselect calls, so we ignore them. */
static int ignore_reselect;
static void reselect_write_queue( struct pipe_end *pipe_end )
{
struct pipe_message *message, *next;
struct pipe_end *reader = pipe_end->connection;
data_size_t avail = 0;
if (!reader) return;
ignore_reselect = 1;
LIST_FOR_EACH_ENTRY_SAFE( message, next, &reader->message_queue, struct pipe_message, entry )
{
if (message->async && message->iosb->status != STATUS_PENDING)
{
release_object( message->async );
message->async = NULL;
free_message( message );
}
else
{
avail += message->iosb->in_size - message->read_pos;
if (message->iosb->status == STATUS_PENDING && (avail <= reader->buffer_size || !message->iosb->in_size))
wake_message( message );
}
}
ignore_reselect = 0;
}
static void pipe_end_queue_async( struct fd *fd, struct async *async, int type, int count )
{
struct pipe_end *pipe_end = get_fd_user( fd );
@ -667,6 +718,18 @@ static void pipe_end_queue_async( struct fd *fd, struct async *async, int type,
else default_fd_queue_async( fd, async, type, count );
}
static void pipe_end_reselect_async( struct fd *fd, struct async_queue *queue )
{
struct pipe_end *pipe_end = get_fd_user( fd );
if (ignore_reselect) return;
if (!use_server_io( pipe_end ))
default_fd_reselect_async( fd, queue );
else if (pipe_end->write_q && pipe_end->write_q == queue)
reselect_write_queue( pipe_end );
}
static inline int is_overlapped( unsigned int options )
{
return !(options & (FILE_SYNCHRONOUS_IO_ALERT | FILE_SYNCHRONOUS_IO_NONALERT));
@ -767,6 +830,7 @@ static void init_pipe_end( struct pipe_end *pipe_end, unsigned int pipe_flags, d
pipe_end->flags = pipe_flags;
pipe_end->connection = NULL;
pipe_end->buffer_size = buffer_size;
pipe_end->write_q = NULL;
list_init( &pipe_end->message_queue );
}