From dd81ac50a974d513163f93a35e4071e746a6a260 Mon Sep 17 00:00:00 2001 From: Alexandre Julliard Date: Thu, 24 Feb 2005 17:06:31 +0000 Subject: [PATCH] Convert async I/O queues to standard lists. --- server/fd.c | 52 ++++++++++++++++++++----------------------------- server/file.c | 26 ++++++++++++------------- server/file.h | 8 ++++---- server/serial.c | 38 +++++++++++++++++++----------------- server/sock.c | 38 ++++++++++++++++-------------------- 5 files changed, 74 insertions(+), 88 deletions(-) diff --git a/server/fd.c b/server/fd.c index 2c381a55dc0..8e9237cf3e7 100644 --- a/server/fd.c +++ b/server/fd.c @@ -991,10 +991,23 @@ struct async void *sb; struct timeval when; struct timeout_user *timeout; - struct async *next; - struct async **head; + struct list entry; }; +/* notifies client thread of new status of its async request */ +/* destroys the server side of it */ +static void async_terminate( struct async *async, int status ) +{ + thread_queue_apc( async->thread, NULL, async->apc, APC_ASYNC_IO, + 1, async->user, async->sb, (void *)status ); + + if (async->timeout) remove_timeout_user( async->timeout ); + async->timeout = NULL; + list_remove( &async->entry ); + release_object( async->thread ); + free( async ); +} + /* cb for timeout on an async request */ static void async_callback(void *private) { @@ -1006,12 +1019,10 @@ static void async_callback(void *private) } /* create an async on a given queue of a fd */ -struct async *create_async(struct fd *fd, struct thread *thread, - int timeout, struct async **head, +struct async *create_async(struct fd *fd, struct thread *thread, int timeout, struct list *queue, void *io_apc, void *io_user, void* io_sb) { struct async *async = mem_alloc( sizeof(struct async) ); - struct async **p; if (!async) return NULL; @@ -1020,11 +1031,8 @@ struct async *create_async(struct fd *fd, struct thread *thread, async->apc = io_apc; async->user = io_user; async->sb = io_sb; - async->head = head; - async->next = NULL; - for (p = head; *p; p = &(*p)->next); - *p = async; + list_add_tail( queue, &async->entry ); if (timeout) { @@ -1037,29 +1045,11 @@ struct async *create_async(struct fd *fd, struct thread *thread, return async; } -/* notifies client thread of new status of its async request */ -/* destroys the server side of it */ -void async_terminate( struct async *async, int status ) +/* terminate the async operation at the head of the queue */ +void async_terminate_head( struct list *queue, int status ) { - struct async** p; - - thread_queue_apc( async->thread, NULL, async->apc, APC_ASYNC_IO, - 1, async->user, async->sb, (void *)status ); - - if (async->timeout) remove_timeout_user( async->timeout ); - async->timeout = NULL; - - for (p = async->head; *p; p = &(*p)->next) - { - if (*p == async) - { - *p = async->next; - break; - } - } - - release_object( async->thread ); - free( async ); + struct list *ptr = list_head( queue ); + if (ptr) async_terminate( LIST_ENTRY( ptr, struct async, entry ), status ); } /****************************************************************/ diff --git a/server/file.c b/server/file.c index 2b7fb2e654d..9ca2c9186fe 100644 --- a/server/file.c +++ b/server/file.c @@ -57,8 +57,8 @@ struct file struct fd *fd; /* file descriptor for this file */ unsigned int access; /* file access (GENERIC_READ/WRITE) */ unsigned int options; /* file options (FILE_DELETE_ON_CLOSE, FILE_SYNCHRONOUS...) */ - struct async *read_q; - struct async *write_q; + struct list read_q; + struct list write_q; }; static void file_dump( struct object *obj, int verbose ); @@ -160,10 +160,8 @@ static struct object *create_file( const char *nameptr, size_t len, unsigned int file->access = access; file->options = options; - if (is_overlapped( file )) - { - file->read_q = file->write_q = NULL; - } + list_init( &file->read_q ); + list_init( &file->write_q ); /* FIXME: should set error to STATUS_OBJECT_NAME_COLLISION if file existed before */ if (!(file->fd = alloc_fd( &file_fd_ops, &file->obj )) || @@ -237,14 +235,14 @@ static void file_poll_event( struct fd *fd, int event ) assert( file->obj.ops == &file_ops ); if (is_overlapped( file )) { - if ( file->read_q && (POLLIN & event) ) + if (!list_empty( &file->read_q ) && (POLLIN & event) ) { - async_terminate( file->read_q, STATUS_ALERTED ); + async_terminate_head( &file->read_q, STATUS_ALERTED ); return; } - if ( file->write_q && (POLLOUT & event) ) + if (!list_empty( &file->write_q ) && (POLLOUT & event) ) { - async_terminate( file->write_q, STATUS_ALERTED ); + async_terminate_head( &file->write_q, STATUS_ALERTED ); return; } } @@ -271,7 +269,7 @@ static void file_queue_async( struct fd *fd, void *apc, void *user, void *iosb, int type, int count ) { struct file *file = get_fd_user( fd ); - struct async **head; + struct list *queue; int events; assert( file->obj.ops == &file_ops ); @@ -285,17 +283,17 @@ static void file_queue_async( struct fd *fd, void *apc, void *user, void *iosb, switch (type) { case ASYNC_TYPE_READ: - head = &file->read_q; + queue = &file->read_q; break; case ASYNC_TYPE_WRITE: - head = &file->write_q; + queue = &file->write_q; break; default: set_error( STATUS_INVALID_PARAMETER ); return; } - if (!create_async( fd, current, 0, head, apc, user, iosb )) + if (!create_async( fd, current, 0, queue, apc, user, iosb )) return; /* Check if the new pending request can be served immediately */ diff --git a/server/file.h b/server/file.h index 7f535945e5f..1e4907343d0 100644 --- a/server/file.h +++ b/server/file.h @@ -112,12 +112,12 @@ extern struct object *create_serial( struct fd *fd, unsigned int options ); /* async I/O functions */ extern struct async *create_async( struct fd *fd, struct thread *thread, int timeout, - struct async **head, void *, void *, void *); -extern void async_terminate( struct async *async, int status ); + struct list *queue, void *, void *, void *); +extern void async_terminate_head( struct list *queue, int status ); -static inline void async_terminate_queue( struct async **head, int status ) +static inline void async_terminate_queue( struct list *queue, int status ) { - while (*head) async_terminate( *head, status ); + while (!list_empty( queue )) async_terminate_head( queue, status ); } #endif /* __WINE_SERVER_FILE_H */ diff --git a/server/serial.c b/server/serial.c index f24cfed351c..a0d59a9a987 100644 --- a/server/serial.c +++ b/server/serial.c @@ -83,9 +83,9 @@ struct serial struct termios original; - struct async *read_q; - struct async *write_q; - struct async *wait_q; + struct list read_q; + struct list write_q; + struct list wait_q; /* FIXME: add dcb, comm status, handler module, sharing */ }; @@ -145,7 +145,9 @@ struct object *create_serial( struct fd *fd, unsigned int options ) serial->writeconst = 0; serial->eventmask = 0; serial->commerror = 0; - serial->read_q = serial->write_q = serial->wait_q = NULL; + list_init( &serial->read_q ); + list_init( &serial->write_q ); + list_init( &serial->wait_q ); if (!(serial->fd = create_anonymous_fd( &serial_fd_ops, unix_fd, &serial->obj ))) { release_object( serial ); @@ -188,9 +190,9 @@ static int serial_get_poll_events( struct fd *fd ) int events = 0; assert( serial->obj.ops == &serial_ops ); - if (serial->read_q) events |= POLLIN; - if (serial->write_q) events |= POLLOUT; - if (serial->wait_q) events |= POLLIN; + if (!list_empty( &serial->read_q )) events |= POLLIN; + if (!list_empty( &serial->write_q )) events |= POLLOUT; + if (!list_empty( &serial->wait_q )) events |= POLLIN; /* fprintf(stderr,"poll events are %04x\n",events); */ @@ -221,14 +223,14 @@ static void serial_poll_event(struct fd *fd, int event) /* fprintf(stderr,"Poll event %02x\n",event); */ - if (serial->read_q && (POLLIN & event) ) - async_terminate( serial->read_q, STATUS_ALERTED ); + if (!list_empty( &serial->read_q ) && (POLLIN & event) ) + async_terminate_head( &serial->read_q, STATUS_ALERTED ); - if (serial->write_q && (POLLOUT & event) ) - async_terminate( serial->write_q, STATUS_ALERTED ); + if (!list_empty( &serial->write_q ) && (POLLOUT & event) ) + async_terminate_head( &serial->write_q, STATUS_ALERTED ); - if (serial->wait_q && (POLLIN & event) ) - async_terminate( serial->wait_q, STATUS_ALERTED ); + if (!list_empty( &serial->wait_q ) && (POLLIN & event) ) + async_terminate_head( &serial->wait_q, STATUS_ALERTED ); set_fd_events( fd, serial_get_poll_events(fd) ); } @@ -237,7 +239,7 @@ static void serial_queue_async( struct fd *fd, void *apc, void *user, void *iosb int type, int count ) { struct serial *serial = get_fd_user( fd ); - struct async **head; + struct list *queue; int timeout; int events; @@ -246,15 +248,15 @@ static void serial_queue_async( struct fd *fd, void *apc, void *user, void *iosb switch (type) { case ASYNC_TYPE_READ: - head = &serial->read_q; + queue = &serial->read_q; timeout = serial->readconst + serial->readmult*count; break; case ASYNC_TYPE_WAIT: - head = &serial->wait_q; + queue = &serial->wait_q; timeout = 0; break; case ASYNC_TYPE_WRITE: - head = &serial->write_q; + queue = &serial->write_q; timeout = serial->writeconst + serial->writemult*count; break; default: @@ -262,7 +264,7 @@ static void serial_queue_async( struct fd *fd, void *apc, void *user, void *iosb return; } - if (!create_async( fd, current, timeout, head, apc, user, iosb )) + if (!create_async( fd, current, timeout, queue, apc, user, iosb )) return; /* Check if the new pending request can be served immediately */ diff --git a/server/sock.c b/server/sock.c index 9a4ae794224..d29233c2795 100644 --- a/server/sock.c +++ b/server/sock.c @@ -81,8 +81,8 @@ struct sock obj_handle_t wparam; /* message wparam (socket handle) */ int errors[FD_MAX_EVENTS]; /* event errors */ struct sock *deferred; /* socket that waits for a deferred accept */ - struct async *read_q; /* Queue for asynchronous reads */ - struct async *write_q; /* Queue for asynchronous writes */ + struct list read_q; /* queue for asynchronous reads */ + struct list write_q; /* queue for asynchronous writes */ }; static void sock_dump( struct object *obj, int verbose ); @@ -237,16 +237,16 @@ static void sock_wake_up( struct sock *sock, int pollev ) if ( sock->flags & WSA_FLAG_OVERLAPPED ) { - if ( pollev & (POLLIN|POLLPRI) && sock->read_q ) + if ( pollev & (POLLIN|POLLPRI) && !list_empty( &sock->read_q )) { if (debug_level) fprintf( stderr, "activating read queue for socket %p\n", sock ); - async_terminate( sock->read_q, STATUS_ALERTED ); + async_terminate_head( &sock->read_q, STATUS_ALERTED ); async_active = 1; } - if ( pollev & POLLOUT && sock->write_q ) + if ( pollev & POLLOUT && !list_empty( &sock->write_q )) { if (debug_level) fprintf( stderr, "activating write queue for socket %p\n", sock ); - async_terminate( sock->write_q, STATUS_ALERTED ); + async_terminate_head( &sock->write_q, STATUS_ALERTED ); async_active = 1; } } @@ -466,9 +466,9 @@ static int sock_get_poll_events( struct fd *fd ) /* listening, wait for readable */ return (sock->hmask & FD_ACCEPT) ? 0 : POLLIN; - if (mask & (FD_READ) || (sock->flags & WSA_FLAG_OVERLAPPED && sock->read_q)) + if (mask & (FD_READ) || (sock->flags & WSA_FLAG_OVERLAPPED && !list_empty( &sock->read_q ))) ev |= POLLIN | POLLPRI; - if (mask & FD_WRITE || (sock->flags & WSA_FLAG_OVERLAPPED && sock->write_q)) + if (mask & FD_WRITE || (sock->flags & WSA_FLAG_OVERLAPPED && !list_empty( &sock->write_q ))) ev |= POLLOUT; /* We use POLLIN with 0 bytes recv() as FD_CLOSE indication for stream sockets. */ if ( sock->type == SOCK_STREAM && ( sock->mask & ~sock->hmask & FD_CLOSE) ) @@ -496,9 +496,9 @@ static void sock_queue_async( struct fd *fd, void *apc, void *user, void *iosb, int type, int count ) { struct sock *sock = get_fd_user( fd ); - struct async **head; + struct list *queue; int pollev; - + assert( sock->obj.ops == &sock_ops ); if ( !(sock->flags & WSA_FLAG_OVERLAPPED) ) @@ -510,11 +510,11 @@ static void sock_queue_async( struct fd *fd, void *apc, void *user, void *iosb, switch (type) { case ASYNC_TYPE_READ: - head = &sock->read_q; + queue = &sock->read_q; sock->hmask &= ~FD_CLOSE; break; case ASYNC_TYPE_WRITE: - head = &sock->write_q; + queue = &sock->write_q; break; default: set_error( STATUS_INVALID_PARAMETER ); @@ -528,7 +528,7 @@ static void sock_queue_async( struct fd *fd, void *apc, void *user, void *iosb, } else { - if (!create_async( fd, current, 0, head, apc, user, iosb )) + if (!create_async( fd, current, 0, queue, apc, user, iosb )) return; } @@ -608,10 +608,8 @@ static struct object *create_socket( int family, int type, int protocol, unsigne release_object( sock ); return NULL; } - if (sock->flags & WSA_FLAG_OVERLAPPED) - { - sock->read_q = sock->write_q = NULL; - } + list_init( &sock->read_q ); + list_init( &sock->write_q ); sock_reselect( sock ); clear_error(); return &sock->obj; @@ -682,10 +680,8 @@ static struct sock *accept_socket( obj_handle_t handle ) release_object( sock ); return NULL; } - if ( acceptsock->flags & WSA_FLAG_OVERLAPPED ) - { - acceptsock->read_q = acceptsock->write_q = NULL; - } + list_init( &acceptsock->read_q ); + list_init( &acceptsock->write_q ); } clear_error(); sock->pmask &= ~FD_ACCEPT;