Implemented server framework for asynchronous IO on sockets.
This commit is contained in:
parent
046dbf94f8
commit
bff7e69981
|
@ -795,7 +795,8 @@ enum fd_type
|
|||
};
|
||||
#define FD_FLAG_OVERLAPPED 0x01
|
||||
#define FD_FLAG_TIMEOUT 0x02
|
||||
|
||||
#define FD_FLAG_RECV_SHUTDOWN 0x04
|
||||
#define FD_FLAG_SEND_SHUTDOWN 0x08
|
||||
|
||||
|
||||
struct set_file_pointer_request
|
||||
|
|
|
@ -612,7 +612,8 @@ enum fd_type
|
|||
};
|
||||
#define FD_FLAG_OVERLAPPED 0x01
|
||||
#define FD_FLAG_TIMEOUT 0x02
|
||||
|
||||
#define FD_FLAG_RECV_SHUTDOWN 0x04
|
||||
#define FD_FLAG_SEND_SHUTDOWN 0x08
|
||||
|
||||
/* Set a file current position */
|
||||
@REQ(set_file_pointer)
|
||||
|
|
178
server/sock.c
178
server/sock.c
|
@ -86,6 +86,7 @@ static int sock_get_info( struct object *obj, struct get_file_info_reply *reply,
|
|||
static void sock_destroy( struct object *obj );
|
||||
static int sock_get_error( int err );
|
||||
static void sock_set_error(void);
|
||||
static void sock_queue_async(struct object *obj, void *ptr, unsigned int status, int type, int count);
|
||||
|
||||
static const struct object_ops sock_ops =
|
||||
{
|
||||
|
@ -100,7 +101,7 @@ static const struct object_ops sock_ops =
|
|||
sock_get_fd, /* get_fd */
|
||||
no_flush, /* flush */
|
||||
sock_get_info, /* get_file_info */
|
||||
NULL, /* queue_async */
|
||||
sock_queue_async, /* queue_async */
|
||||
sock_destroy /* destroy */
|
||||
};
|
||||
|
||||
|
@ -124,7 +125,7 @@ static const int event_bitorder[FD_MAX_EVENTS] =
|
|||
};
|
||||
|
||||
|
||||
static void sock_reselect( struct sock *sock )
|
||||
static int sock_reselect( struct sock *sock )
|
||||
{
|
||||
int ev = sock_get_poll_events( &sock->obj );
|
||||
|
||||
|
@ -133,21 +134,61 @@ static void sock_reselect( struct sock *sock )
|
|||
|
||||
if (sock->obj.select == -1) {
|
||||
/* previously unconnected socket, is this reselect supposed to connect it? */
|
||||
if (!(sock->state & ~FD_WINE_NONBLOCKING)) return;
|
||||
if (!(sock->state & ~FD_WINE_NONBLOCKING)) return 0;
|
||||
/* ok, it is, attach it to the wineserver's main poll loop */
|
||||
add_select_user( &sock->obj );
|
||||
}
|
||||
/* update condition mask */
|
||||
set_select_events( &sock->obj, ev );
|
||||
return ev;
|
||||
}
|
||||
|
||||
/* After POLLHUP is received, the socket will no longer be in the main select loop.
|
||||
This function is used to signal pending events nevertheless */
|
||||
static void sock_try_event ( struct sock *sock, int event )
|
||||
{
|
||||
struct pollfd pfd;
|
||||
|
||||
pfd.fd = sock->obj.fd;
|
||||
pfd.events = event;
|
||||
pfd.revents = 0;
|
||||
poll (&pfd, 1, 0);
|
||||
|
||||
if ( pfd.revents )
|
||||
{
|
||||
if ( debug_level ) fprintf ( stderr, "sock_try_event: %x\n", pfd.revents );
|
||||
sock_poll_event ( &sock->obj, pfd.revents );
|
||||
}
|
||||
}
|
||||
|
||||
/* wake anybody waiting on the socket event or send the associated message */
|
||||
static void sock_wake_up( struct sock *sock )
|
||||
static void sock_wake_up( struct sock *sock, int pollev )
|
||||
{
|
||||
unsigned int events = sock->pmask & sock->mask;
|
||||
int i;
|
||||
int async_active = 0;
|
||||
|
||||
if (!events) return;
|
||||
if ( sock->flags & FD_FLAG_OVERLAPPED )
|
||||
{
|
||||
if( pollev & (POLLIN|POLLPRI) && IS_READY( sock->read_q ) )
|
||||
{
|
||||
if (debug_level) fprintf ( stderr, "activating read queue for socket %p\n", sock );
|
||||
async_notify( sock->read_q.head, STATUS_ALERTED );
|
||||
async_active = 1;
|
||||
}
|
||||
if( pollev & POLLOUT && IS_READY( sock->write_q ) )
|
||||
{
|
||||
if (debug_level) fprintf ( stderr, "activating write queue for socket %p\n", sock );
|
||||
async_notify( sock->write_q.head, STATUS_ALERTED );
|
||||
async_active = 1;
|
||||
}
|
||||
}
|
||||
|
||||
/* Do not signal events if there are still pending asynchronous IO requests */
|
||||
/* We need this to delay FD_CLOSE events until all pending overlapped requests are processed */
|
||||
if ( !events || async_active ) return;
|
||||
|
||||
if (events & FD_CLOSE) sock->hmask |= FD_CLOSE;
|
||||
|
||||
if (sock->event)
|
||||
{
|
||||
|
@ -183,6 +224,7 @@ inline static int sock_error(int s)
|
|||
static void sock_poll_event( struct object *obj, int event )
|
||||
{
|
||||
struct sock *sock = (struct sock *)obj;
|
||||
int empty_recv = 0;
|
||||
|
||||
assert( sock->obj.ops == &sock_ops );
|
||||
if (debug_level)
|
||||
|
@ -208,6 +250,7 @@ static void sock_poll_event( struct object *obj, int event )
|
|||
sock->errors[FD_CONNECT_BIT] = sock_error( sock->obj.fd );
|
||||
if (debug_level)
|
||||
fprintf(stderr, "socket %d connection failure\n", sock->obj.fd);
|
||||
set_select_events( &sock->obj, -1 );
|
||||
}
|
||||
} else
|
||||
if (sock->state & FD_WINE_LISTENING)
|
||||
|
@ -226,6 +269,7 @@ static void sock_poll_event( struct object *obj, int event )
|
|||
sock->pmask |= FD_ACCEPT;
|
||||
sock->errors[FD_ACCEPT_BIT] = sock_error( sock->obj.fd );
|
||||
sock->hmask |= FD_ACCEPT;
|
||||
set_select_events( &sock->obj, -1 );
|
||||
}
|
||||
} else
|
||||
{
|
||||
|
@ -233,11 +277,12 @@ static void sock_poll_event( struct object *obj, int event )
|
|||
if (event & POLLIN)
|
||||
{
|
||||
char dummy;
|
||||
int nr;
|
||||
|
||||
/* Linux 2.4 doesn't report POLLHUP if only one side of the socket
|
||||
* has been closed, so we need to check for it explicitly here */
|
||||
if (!recv( sock->obj.fd, &dummy, 1, MSG_PEEK )) event = POLLHUP;
|
||||
else
|
||||
nr = recv( sock->obj.fd, &dummy, 1, MSG_PEEK );
|
||||
if ( nr > 0 )
|
||||
{
|
||||
/* incoming data */
|
||||
sock->pmask |= FD_READ;
|
||||
|
@ -246,6 +291,22 @@ static void sock_poll_event( struct object *obj, int event )
|
|||
if (debug_level)
|
||||
fprintf(stderr, "socket %d is readable\n", sock->obj.fd );
|
||||
}
|
||||
else if ( nr == 0 )
|
||||
empty_recv = 1;
|
||||
else
|
||||
{
|
||||
/* EAGAIN can happen if an async recv() falls between the server's poll()
|
||||
call and the invocation of this routine */
|
||||
if ( errno == EAGAIN )
|
||||
event &= ~POLLIN;
|
||||
else
|
||||
{
|
||||
if ( debug_level )
|
||||
fprintf ( stderr, "recv error on socket %d: %d\n", sock->obj.fd, errno );
|
||||
event = POLLERR;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
if (event & POLLOUT)
|
||||
{
|
||||
|
@ -263,25 +324,25 @@ static void sock_poll_event( struct object *obj, int event )
|
|||
if (debug_level)
|
||||
fprintf(stderr, "socket %d got OOB data\n", sock->obj.fd);
|
||||
}
|
||||
if (((event & POLLERR) || ((event & (POLLIN|POLLHUP)) == POLLHUP))
|
||||
&& (sock->state & (FD_READ|FD_WRITE))) {
|
||||
/* socket closing */
|
||||
/* According to WS2 specs, FD_CLOSE is only delivered when there is
|
||||
no more data to be read (i.e. empty_recv = 1) */
|
||||
else if ( empty_recv && (sock->state & (FD_READ|FD_WRITE) ))
|
||||
{
|
||||
sock->errors[FD_CLOSE_BIT] = sock_error( sock->obj.fd );
|
||||
sock->state &= ~(FD_WINE_CONNECTED|FD_READ|FD_WRITE);
|
||||
if ( event & ( POLLERR|POLLHUP ) )
|
||||
sock->state &= ~(FD_WINE_CONNECTED|FD_WRITE);
|
||||
sock->pmask |= FD_CLOSE;
|
||||
if (debug_level)
|
||||
fprintf(stderr, "socket %d aborted by error %d\n",
|
||||
sock->obj.fd, sock->errors[FD_CLOSE_BIT]);
|
||||
fprintf(stderr, "socket %d aborted by error %d, event: %x - removing from select loop\n",
|
||||
sock->obj.fd, sock->errors[FD_CLOSE_BIT], event);
|
||||
set_select_events( &sock->obj, -1 );
|
||||
}
|
||||
}
|
||||
|
||||
if (event & (POLLERR|POLLHUP))
|
||||
set_select_events( &sock->obj, -1 );
|
||||
else
|
||||
sock_reselect( sock );
|
||||
|
||||
/* wake up anyone waiting for whatever just happened */
|
||||
if (sock->pmask & sock->mask) sock_wake_up( sock );
|
||||
if ( sock->pmask & sock->mask || sock->flags & FD_FLAG_OVERLAPPED ) sock_wake_up( sock, event );
|
||||
|
||||
/* if anyone is stupid enough to wait on the socket object itself,
|
||||
* maybe we should wake them up too, just in case? */
|
||||
|
@ -320,8 +381,14 @@ static int sock_get_poll_events( struct object *obj )
|
|||
/* listening, wait for readable */
|
||||
return (sock->hmask & FD_ACCEPT) ? 0 : POLLIN;
|
||||
|
||||
if (mask & FD_READ) ev |= POLLIN | POLLPRI;
|
||||
if (mask & FD_WRITE) ev |= POLLOUT;
|
||||
if (mask & (FD_READ) || (sock->flags & WSA_FLAG_OVERLAPPED && IS_READY (sock->read_q)))
|
||||
ev |= POLLIN | POLLPRI;
|
||||
if (mask & FD_WRITE || (sock->flags & WSA_FLAG_OVERLAPPED && IS_READY (sock->write_q)))
|
||||
ev |= POLLOUT;
|
||||
/* We use POLLIN with 0 bytes recv() as FD_CLOSE indication. */
|
||||
if (sock->mask & ~sock->hmask & FD_CLOSE && !(sock->hmask & FD_READ) )
|
||||
ev |= POLLIN;
|
||||
|
||||
return ev;
|
||||
}
|
||||
|
||||
|
@ -352,9 +419,68 @@ static int sock_get_info( struct object *obj, struct get_file_info_reply *reply,
|
|||
}
|
||||
*flags = 0;
|
||||
if (sock->flags & WSA_FLAG_OVERLAPPED) *flags |= FD_FLAG_OVERLAPPED;
|
||||
if ( !(sock->state & FD_READ ) ) *flags |= FD_FLAG_RECV_SHUTDOWN;
|
||||
if ( !(sock->state & FD_WRITE ) ) *flags |= FD_FLAG_SEND_SHUTDOWN;
|
||||
return FD_TYPE_DEFAULT;
|
||||
}
|
||||
|
||||
static void sock_queue_async(struct object *obj, void *ptr, unsigned int status, int type, int count)
|
||||
{
|
||||
struct sock *sock = (struct sock *)obj;
|
||||
struct async_queue *q;
|
||||
struct async *async;
|
||||
int pollev;
|
||||
|
||||
assert( obj->ops == &sock_ops );
|
||||
|
||||
if ( !(sock->flags & WSA_FLAG_OVERLAPPED) )
|
||||
{
|
||||
set_error ( STATUS_INVALID_HANDLE );
|
||||
return;
|
||||
}
|
||||
|
||||
switch( type )
|
||||
{
|
||||
case ASYNC_TYPE_READ:
|
||||
q = &sock->read_q;
|
||||
break;
|
||||
case ASYNC_TYPE_WRITE:
|
||||
q = &sock->write_q;
|
||||
break;
|
||||
default:
|
||||
set_error( STATUS_INVALID_PARAMETER );
|
||||
return;
|
||||
}
|
||||
|
||||
async = find_async ( q, current, ptr );
|
||||
|
||||
if ( status == STATUS_PENDING )
|
||||
{
|
||||
if ( ( !( sock->state & FD_READ ) && type == ASYNC_TYPE_READ ) ||
|
||||
( !( sock->state & FD_WRITE ) && type == ASYNC_TYPE_WRITE ) )
|
||||
{
|
||||
set_error ( STATUS_PIPE_DISCONNECTED );
|
||||
if ( async ) destroy_async ( async );
|
||||
}
|
||||
else
|
||||
{
|
||||
if ( !async )
|
||||
async = create_async ( obj, current, ptr );
|
||||
if ( !async )
|
||||
return;
|
||||
|
||||
async->status = STATUS_PENDING;
|
||||
if ( !async->q )
|
||||
async_insert ( q, async );
|
||||
}
|
||||
}
|
||||
else if ( async ) destroy_async ( async );
|
||||
else set_error ( STATUS_INVALID_PARAMETER );
|
||||
|
||||
pollev = sock_reselect ( sock );
|
||||
if ( pollev ) sock_try_event ( sock, pollev );
|
||||
}
|
||||
|
||||
static void sock_destroy( struct object *obj )
|
||||
{
|
||||
struct sock *sock = (struct sock *)obj;
|
||||
|
@ -578,6 +704,7 @@ DECL_HANDLER(set_socket_event)
|
|||
{
|
||||
struct sock *sock;
|
||||
struct event *old_event;
|
||||
int pollev;
|
||||
|
||||
if (!(sock = (struct sock*)get_handle_obj( current->process, req->handle,
|
||||
GENERIC_READ|GENERIC_WRITE|SYNCHRONIZE, &sock_ops)))
|
||||
|
@ -591,7 +718,10 @@ DECL_HANDLER(set_socket_event)
|
|||
if (req->event) sock->event = get_event_obj( current->process, req->event, EVENT_MODIFY_STATE );
|
||||
|
||||
if (debug_level && sock->event) fprintf(stderr, "event ptr: %p\n", sock->event);
|
||||
sock_reselect( sock );
|
||||
|
||||
pollev = sock_reselect( sock );
|
||||
if ( pollev ) sock_try_event ( sock, pollev );
|
||||
|
||||
if (sock->mask)
|
||||
sock->state |= FD_WINE_NONBLOCKING;
|
||||
|
||||
|
@ -599,7 +729,7 @@ DECL_HANDLER(set_socket_event)
|
|||
it is possible that FD_CONNECT or FD_ACCEPT network events has happened
|
||||
before a WSAEventSelect() was done on it.
|
||||
(when dealing with Asynchronous socket) */
|
||||
if (sock->pmask & sock->mask) sock_wake_up( sock );
|
||||
if (sock->pmask & sock->mask) sock_wake_up( sock, pollev );
|
||||
|
||||
if (old_event) release_object( old_event ); /* we're through with it */
|
||||
release_object( &sock->obj );
|
||||
|
@ -646,6 +776,7 @@ DECL_HANDLER(get_socket_event)
|
|||
DECL_HANDLER(enable_socket_event)
|
||||
{
|
||||
struct sock *sock;
|
||||
int pollev;
|
||||
|
||||
if (!(sock = (struct sock*)get_handle_obj( current->process, req->handle,
|
||||
GENERIC_READ|GENERIC_WRITE|SYNCHRONIZE, &sock_ops)))
|
||||
|
@ -655,7 +786,10 @@ DECL_HANDLER(enable_socket_event)
|
|||
sock->hmask &= ~req->mask;
|
||||
sock->state |= req->sstate;
|
||||
sock->state &= ~req->cstate;
|
||||
sock_reselect( sock );
|
||||
|
||||
pollev = sock_reselect( sock );
|
||||
if ( pollev ) sock_try_event ( sock, pollev );
|
||||
|
||||
release_object( &sock->obj );
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue