Changed select interface, separated timeouts from file descriptors.
This commit is contained in:
parent
3da5f84df1
commit
88de35cd70
|
@ -188,7 +188,7 @@ int no_get_file_info( struct object *obj, struct get_file_info_reply *info )
|
|||
return 0;
|
||||
}
|
||||
|
||||
void default_select_event( int fd, int event, void *private )
|
||||
void default_select_event( int event, void *private )
|
||||
{
|
||||
struct object *obj = (struct object *)private;
|
||||
assert( obj );
|
||||
|
|
|
@ -74,7 +74,7 @@ extern int no_read_fd( struct object *obj );
|
|||
extern int no_write_fd( struct object *obj );
|
||||
extern int no_flush( struct object *obj );
|
||||
extern int no_get_file_info( struct object *obj, struct get_file_info_reply *info );
|
||||
extern void default_select_event( int fd, int event, void *private );
|
||||
extern void default_select_event( int event, void *private );
|
||||
|
||||
/* request handlers */
|
||||
|
||||
|
@ -84,7 +84,7 @@ struct thread;
|
|||
extern void fatal_protocol_error( const char *err, ... );
|
||||
extern void call_req_handler( struct thread *thread, enum request req,
|
||||
void *data, int len, int fd );
|
||||
extern void call_timeout_handler( struct thread *thread );
|
||||
extern void call_timeout_handler( void *thread );
|
||||
extern void call_kill_handler( struct thread *thread, int exit_code );
|
||||
|
||||
extern void trace_request( enum request req, void *data, int len, int fd );
|
||||
|
@ -103,25 +103,37 @@ extern void trace_reply( struct thread *thread, int type, int pass_fd,
|
|||
#define READ_EVENT 1
|
||||
#define WRITE_EVENT 2
|
||||
|
||||
struct select_ops
|
||||
struct select_user
|
||||
{
|
||||
void (*event)( int fd, int event, void *private );
|
||||
void (*timeout)( int fd, void *private );
|
||||
int fd; /* user fd */
|
||||
void (*func)(int event, void *private); /* callback function */
|
||||
void *private; /* callback private data */
|
||||
};
|
||||
|
||||
extern int add_select_user( int fd, int events, const struct select_ops *ops, void *private );
|
||||
extern void remove_select_user( int fd );
|
||||
extern void set_select_timeout( int fd, struct timeval *when );
|
||||
extern void set_select_events( int fd, int events );
|
||||
extern void *get_select_private_data( const struct select_ops *ops, int fd );
|
||||
extern void register_select_user( struct select_user *user );
|
||||
extern void unregister_select_user( struct select_user *user );
|
||||
extern void set_select_events( struct select_user *user, int events );
|
||||
extern int check_select_events( struct select_user *user, int events );
|
||||
extern void select_loop(void);
|
||||
|
||||
/* timeout functions */
|
||||
|
||||
struct timeout_user;
|
||||
|
||||
typedef void (*timeout_callback)( void *private );
|
||||
|
||||
extern struct timeout_user *add_timeout_user( struct timeval *when,
|
||||
timeout_callback func, void *private );
|
||||
extern void remove_timeout_user( struct timeout_user *user );
|
||||
extern void make_timeout( struct timeval *when, int timeout );
|
||||
|
||||
/* socket functions */
|
||||
|
||||
extern int add_client( int client_fd, struct thread *self );
|
||||
extern void remove_client( int client_fd, int exit_code );
|
||||
extern void set_timeout( int client_fd, struct timeval *when );
|
||||
extern int send_reply_v( int client_fd, int type, int pass_fd,
|
||||
struct client;
|
||||
|
||||
extern struct client *add_client( int client_fd, struct thread *self );
|
||||
extern void remove_client( struct client *client, int exit_code );
|
||||
extern int send_reply_v( struct client *client, int type, int pass_fd,
|
||||
struct iovec *vec, int veclen );
|
||||
|
||||
/* mutex functions */
|
||||
|
|
|
@ -33,7 +33,7 @@ void fatal_protocol_error( const char *err, ... )
|
|||
fprintf( stderr, "Protocol error:%p: ", current );
|
||||
vfprintf( stderr, err, args );
|
||||
va_end( args );
|
||||
remove_client( current->client_fd, -2 );
|
||||
remove_client( current->client, -2 );
|
||||
}
|
||||
|
||||
/* call a request handler */
|
||||
|
@ -68,10 +68,10 @@ void call_req_handler( struct thread *thread, enum request req,
|
|||
current = NULL;
|
||||
}
|
||||
|
||||
/* handle a client timeout (unused for now) */
|
||||
void call_timeout_handler( struct thread *thread )
|
||||
/* handle a client timeout */
|
||||
void call_timeout_handler( void *thread )
|
||||
{
|
||||
current = thread;
|
||||
current = (struct thread *)thread;
|
||||
if (debug_level) trace_timeout();
|
||||
CLEAR_ERROR();
|
||||
thread_timeout();
|
||||
|
|
168
server/select.c
168
server/select.c
|
@ -17,81 +17,82 @@
|
|||
|
||||
#include "object.h"
|
||||
|
||||
/* select user fd */
|
||||
struct user
|
||||
struct timeout_user
|
||||
{
|
||||
struct timeval when; /* timeout expiry (absolute time) */
|
||||
struct user *next; /* next in sorted timeout list */
|
||||
struct user *prev; /* prev in sorted timeout list */
|
||||
const struct select_ops *ops; /* user operations list */
|
||||
int fd; /* user fd */
|
||||
void *private; /* user private data */
|
||||
struct timeout_user *next; /* next in sorted timeout list */
|
||||
struct timeout_user *prev; /* prev in sorted timeout list */
|
||||
struct timeval when; /* timeout expiry (absolute time) */
|
||||
timeout_callback callback; /* callback function */
|
||||
void *private; /* callback private data */
|
||||
};
|
||||
|
||||
static struct user *users[FD_SETSIZE]; /* users array */
|
||||
static struct select_user *users[FD_SETSIZE]; /* users array */
|
||||
static fd_set read_set, write_set; /* current select sets */
|
||||
static int nb_users; /* current number of users */
|
||||
static int max_fd; /* max fd in use */
|
||||
static struct user *timeout_head; /* sorted timeouts list head */
|
||||
static struct user *timeout_tail; /* sorted timeouts list tail */
|
||||
static struct timeout_user *timeout_head; /* sorted timeouts list head */
|
||||
static struct timeout_user *timeout_tail; /* sorted timeouts list tail */
|
||||
|
||||
|
||||
/* add a user */
|
||||
int add_select_user( int fd, int events, const struct select_ops *ops, void *private )
|
||||
/* register a user */
|
||||
void register_select_user( struct select_user *user )
|
||||
{
|
||||
int flags;
|
||||
struct user *user = malloc( sizeof(*user) );
|
||||
if (!user) return -1;
|
||||
assert( !users[fd] );
|
||||
assert( !users[user->fd] );
|
||||
|
||||
user->ops = ops;
|
||||
user->when.tv_sec = 0;
|
||||
user->when.tv_usec = 0;
|
||||
user->fd = fd;
|
||||
user->private = private;
|
||||
flags = fcntl( user->fd, F_GETFL, 0 );
|
||||
fcntl( user->fd, F_SETFL, flags | O_NONBLOCK );
|
||||
|
||||
flags = fcntl( fd, F_GETFL, 0 );
|
||||
fcntl( fd, F_SETFL, flags | O_NONBLOCK );
|
||||
|
||||
users[fd] = user;
|
||||
set_select_events( fd, events );
|
||||
if (fd > max_fd) max_fd = fd;
|
||||
users[user->fd] = user;
|
||||
if (user->fd > max_fd) max_fd = user->fd;
|
||||
nb_users++;
|
||||
return fd;
|
||||
}
|
||||
|
||||
/* remove a user */
|
||||
void remove_select_user( int fd )
|
||||
void unregister_select_user( struct select_user *user )
|
||||
{
|
||||
struct user *user = users[fd];
|
||||
assert( user );
|
||||
assert( users[user->fd] == user );
|
||||
|
||||
set_select_timeout( fd, 0 );
|
||||
set_select_events( fd, 0 );
|
||||
users[fd] = NULL;
|
||||
if (max_fd == fd) while (max_fd && !users[max_fd]) max_fd--;
|
||||
FD_CLR( user->fd, &read_set );
|
||||
FD_CLR( user->fd, &write_set );
|
||||
users[user->fd] = NULL;
|
||||
if (max_fd == user->fd) while (max_fd && !users[max_fd]) max_fd--;
|
||||
nb_users--;
|
||||
free( user );
|
||||
}
|
||||
|
||||
/* set a user timeout */
|
||||
void set_select_timeout( int fd, struct timeval *when )
|
||||
/* set the events that select waits for on this fd */
|
||||
void set_select_events( struct select_user *user, int events )
|
||||
{
|
||||
struct user *user = users[fd];
|
||||
struct user *pos;
|
||||
assert( user );
|
||||
assert( users[user->fd] == user );
|
||||
if (events & READ_EVENT) FD_SET( user->fd, &read_set );
|
||||
else FD_CLR( user->fd, &read_set );
|
||||
if (events & WRITE_EVENT) FD_SET( user->fd, &write_set );
|
||||
else FD_CLR( user->fd, &write_set );
|
||||
}
|
||||
|
||||
if (user->when.tv_sec || user->when.tv_usec)
|
||||
{
|
||||
/* there is already a timeout */
|
||||
if (user->next) user->next->prev = user->prev;
|
||||
else timeout_tail = user->prev;
|
||||
if (user->prev) user->prev->next = user->next;
|
||||
else timeout_head = user->next;
|
||||
user->when.tv_sec = user->when.tv_usec = 0;
|
||||
}
|
||||
if (!when) return; /* no timeout */
|
||||
user->when = *when;
|
||||
/* check if events are pending */
|
||||
int check_select_events( struct select_user *user, int events )
|
||||
{
|
||||
fd_set read_fds, write_fds;
|
||||
struct timeval tv = { 0, 0 };
|
||||
|
||||
FD_ZERO( &read_fds );
|
||||
FD_ZERO( &write_fds );
|
||||
if (events & READ_EVENT) FD_SET( user->fd, &read_fds );
|
||||
if (events & WRITE_EVENT) FD_SET( user->fd, &write_fds );
|
||||
return select( user->fd + 1, &read_fds, &write_fds, NULL, &tv ) > 0;
|
||||
}
|
||||
|
||||
/* add a timeout user */
|
||||
struct timeout_user *add_timeout_user( struct timeval *when, timeout_callback func, void *private )
|
||||
{
|
||||
struct timeout_user *user;
|
||||
struct timeout_user *pos;
|
||||
|
||||
if (!(user = mem_alloc( sizeof(*user) ))) return NULL;
|
||||
user->when = *when;
|
||||
user->callback = func;
|
||||
user->private = private;
|
||||
|
||||
/* Now insert it in the linked list */
|
||||
|
||||
|
@ -117,24 +118,41 @@ void set_select_timeout( int fd, struct timeval *when )
|
|||
user->prev = timeout_tail;
|
||||
timeout_tail = user;
|
||||
}
|
||||
return user;
|
||||
}
|
||||
|
||||
/* set the events that select waits for on this fd */
|
||||
void set_select_events( int fd, int events )
|
||||
/* remove a timeout user */
|
||||
void remove_timeout_user( struct timeout_user *user )
|
||||
{
|
||||
if (events & READ_EVENT) FD_SET( fd, &read_set );
|
||||
else FD_CLR( fd, &read_set );
|
||||
if (events & WRITE_EVENT) FD_SET( fd, &write_set );
|
||||
else FD_CLR( fd, &write_set );
|
||||
if (user->next) user->next->prev = user->prev;
|
||||
else timeout_tail = user->prev;
|
||||
if (user->prev) user->prev->next = user->next;
|
||||
else timeout_head = user->next;
|
||||
free( user );
|
||||
}
|
||||
|
||||
/* get a user private data, checking the type */
|
||||
void *get_select_private_data( const struct select_ops *ops, int fd )
|
||||
/* make an absolute timeout value from a relative timeout in milliseconds */
|
||||
void make_timeout( struct timeval *when, int timeout )
|
||||
{
|
||||
struct user *user = users[fd];
|
||||
assert( user );
|
||||
assert( user->ops == ops );
|
||||
return user->private;
|
||||
gettimeofday( when, 0 );
|
||||
if (!timeout) return;
|
||||
if ((when->tv_usec += (timeout % 1000) * 1000) >= 1000000)
|
||||
{
|
||||
when->tv_usec -= 1000000;
|
||||
when->tv_sec++;
|
||||
}
|
||||
when->tv_sec += timeout / 1000;
|
||||
}
|
||||
|
||||
/* handle an expired timeout */
|
||||
static void handle_timeout( struct timeout_user *user )
|
||||
{
|
||||
if (user->next) user->next->prev = user->prev;
|
||||
else timeout_tail = user->prev;
|
||||
if (user->prev) user->prev->next = user->next;
|
||||
else timeout_head = user->next;
|
||||
user->callback( user->private );
|
||||
free( user );
|
||||
}
|
||||
|
||||
/* server main loop */
|
||||
|
@ -148,12 +166,6 @@ void select_loop(void)
|
|||
while (nb_users)
|
||||
{
|
||||
fd_set read = read_set, write = write_set;
|
||||
#if 0
|
||||
printf( "select: " );
|
||||
for (i = 0; i <= max_fd; i++) printf( "%c", FD_ISSET( i, &read_set ) ? 'r' :
|
||||
(FD_ISSET( i, &write_set ) ? 'w' : '-') );
|
||||
printf( "\n" );
|
||||
#endif
|
||||
if (timeout_head)
|
||||
{
|
||||
struct timeval tv, now;
|
||||
|
@ -162,7 +174,7 @@ void select_loop(void)
|
|||
((timeout_head->when.tv_sec == now.tv_sec) &&
|
||||
(timeout_head->when.tv_usec < now.tv_usec)))
|
||||
{
|
||||
timeout_head->ops->timeout( timeout_head->fd, timeout_head->private );
|
||||
handle_timeout( timeout_head );
|
||||
continue;
|
||||
}
|
||||
tv.tv_sec = timeout_head->when.tv_sec - now.tv_sec;
|
||||
|
@ -171,10 +183,22 @@ void select_loop(void)
|
|||
tv.tv_usec += 1000000;
|
||||
tv.tv_sec--;
|
||||
}
|
||||
#if 0
|
||||
printf( "select: " );
|
||||
for (i = 0; i <= max_fd; i++) printf( "%c", FD_ISSET( i, &read_set ) ? 'r' :
|
||||
(FD_ISSET( i, &write_set ) ? 'w' : '-') );
|
||||
printf( " timeout %d.%06d\n", tv.tv_sec, tv.tv_usec );
|
||||
#endif
|
||||
ret = select( max_fd + 1, &read, &write, NULL, &tv );
|
||||
}
|
||||
else /* no timeout */
|
||||
{
|
||||
#if 0
|
||||
printf( "select: " );
|
||||
for (i = 0; i <= max_fd; i++) printf( "%c", FD_ISSET( i, &read_set ) ? 'r' :
|
||||
(FD_ISSET( i, &write_set ) ? 'w' : '-') );
|
||||
printf( " no timeout\n" );
|
||||
#endif
|
||||
ret = select( max_fd + 1, &read, &write, NULL, NULL );
|
||||
}
|
||||
|
||||
|
@ -194,7 +218,7 @@ void select_loop(void)
|
|||
called in an earlier pass of this loop might have removed
|
||||
the current user ... */
|
||||
if (event && users[i])
|
||||
users[i]->ops->event( i, event, users[i]->private );
|
||||
users[i]->func( event, users[i]->private );
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,13 +37,15 @@ enum state
|
|||
/* client structure */
|
||||
struct client
|
||||
{
|
||||
enum state state; /* client state */
|
||||
unsigned int seq; /* current sequence number */
|
||||
struct header head; /* current msg header */
|
||||
char *data; /* current msg data */
|
||||
int count; /* bytes sent/received so far */
|
||||
int pass_fd; /* fd to pass to and from the client */
|
||||
struct thread *self; /* client thread (opaque pointer) */
|
||||
enum state state; /* client state */
|
||||
struct select_user select; /* select user */
|
||||
unsigned int seq; /* current sequence number */
|
||||
struct header head; /* current msg header */
|
||||
char *data; /* current msg data */
|
||||
int count; /* bytes sent/received so far */
|
||||
int pass_fd; /* fd to pass to and from the client */
|
||||
struct thread *self; /* client thread (opaque pointer) */
|
||||
struct timeout_user *timeout; /* current timeout (opaque pointer) */
|
||||
};
|
||||
|
||||
|
||||
|
@ -54,12 +56,12 @@ struct client
|
|||
|
||||
|
||||
/* signal a client protocol error */
|
||||
static void protocol_error( int client_fd, const char *err, ... )
|
||||
static void protocol_error( struct client *client, const char *err, ... )
|
||||
{
|
||||
va_list args;
|
||||
|
||||
va_start( args, err );
|
||||
fprintf( stderr, "Protocol error:%d: ", client_fd );
|
||||
fprintf( stderr, "Protocol error:%d: ", client->select.fd );
|
||||
vfprintf( stderr, err, args );
|
||||
va_end( args );
|
||||
}
|
||||
|
@ -131,7 +133,7 @@ static void do_write( struct client *client, int client_fd )
|
|||
if (ret == -1)
|
||||
{
|
||||
if (errno != EPIPE) perror("sendmsg");
|
||||
remove_client( client_fd, BROKEN_PIPE );
|
||||
remove_client( client, BROKEN_PIPE );
|
||||
return;
|
||||
}
|
||||
if (client->pass_fd != -1) /* We sent the fd, now we can close it */
|
||||
|
@ -147,7 +149,7 @@ static void do_write( struct client *client, int client_fd )
|
|||
client->count = 0;
|
||||
client->state = RUNNING;
|
||||
client->seq++;
|
||||
set_select_events( client_fd, READ_EVENT );
|
||||
set_select_events( &client->select, READ_EVENT );
|
||||
}
|
||||
|
||||
|
||||
|
@ -191,7 +193,7 @@ static void do_read( struct client *client, int client_fd )
|
|||
if (!client->data &&
|
||||
!(client->data = malloc(client->head.len-sizeof(client->head))))
|
||||
{
|
||||
remove_client( client_fd, OUT_OF_MEMORY );
|
||||
remove_client( client, OUT_OF_MEMORY );
|
||||
return;
|
||||
}
|
||||
vec.iov_base = client->data + client->count - sizeof(client->head);
|
||||
|
@ -202,7 +204,7 @@ static void do_read( struct client *client, int client_fd )
|
|||
if (ret == -1)
|
||||
{
|
||||
perror("recvmsg");
|
||||
remove_client( client_fd, BROKEN_PIPE );
|
||||
remove_client( client, BROKEN_PIPE );
|
||||
return;
|
||||
}
|
||||
#ifndef HAVE_MSGHDR_ACCRIGHTS
|
||||
|
@ -216,7 +218,7 @@ static void do_read( struct client *client, int client_fd )
|
|||
}
|
||||
else if (!ret) /* closed pipe */
|
||||
{
|
||||
remove_client( client_fd, BROKEN_PIPE );
|
||||
remove_client( client, BROKEN_PIPE );
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -231,17 +233,17 @@ static void do_read( struct client *client, int client_fd )
|
|||
/* sanity checks */
|
||||
if (client->head.seq != client->seq)
|
||||
{
|
||||
protocol_error( client_fd, "bad sequence %08x instead of %08x\n",
|
||||
protocol_error( client, "bad sequence %08x instead of %08x\n",
|
||||
client->head.seq, client->seq );
|
||||
remove_client( client_fd, PROTOCOL_ERROR );
|
||||
remove_client( client, PROTOCOL_ERROR );
|
||||
return;
|
||||
}
|
||||
if ((client->head.len < sizeof(client->head)) ||
|
||||
(client->head.len > MAX_MSG_LENGTH + sizeof(client->head)))
|
||||
{
|
||||
protocol_error( client_fd, "bad header length %08x\n",
|
||||
protocol_error( client, "bad header length %08x\n",
|
||||
client->head.len );
|
||||
remove_client( client_fd, PROTOCOL_ERROR );
|
||||
remove_client( client, PROTOCOL_ERROR );
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -270,66 +272,51 @@ static void do_read( struct client *client, int client_fd )
|
|||
}
|
||||
}
|
||||
|
||||
/* handle a client timeout */
|
||||
static void client_timeout( int client_fd, void *private )
|
||||
{
|
||||
struct client *client = (struct client *)private;
|
||||
set_select_timeout( client_fd, 0 ); /* Remove the timeout */
|
||||
call_timeout_handler( client->self );
|
||||
}
|
||||
|
||||
/* handle a client event */
|
||||
static void client_event( int client_fd, int event, void *private )
|
||||
static void client_event( int event, void *private )
|
||||
{
|
||||
struct client *client = (struct client *)private;
|
||||
if (event & WRITE_EVENT)
|
||||
do_write( client, client_fd );
|
||||
if (event & READ_EVENT)
|
||||
do_read( client, client_fd );
|
||||
if (event & WRITE_EVENT) do_write( client, client->select.fd );
|
||||
if (event & READ_EVENT) do_read( client, client->select.fd );
|
||||
}
|
||||
|
||||
static const struct select_ops client_ops =
|
||||
{
|
||||
client_event,
|
||||
client_timeout
|
||||
};
|
||||
|
||||
/*******************************************************************/
|
||||
/* server-side exported functions */
|
||||
|
||||
/* add a client */
|
||||
int add_client( int client_fd, struct thread *self )
|
||||
struct client *add_client( int fd, struct thread *self )
|
||||
{
|
||||
struct client *client = malloc( sizeof(*client) );
|
||||
if (!client) return -1;
|
||||
struct client *client = mem_alloc( sizeof(*client) );
|
||||
if (!client) return NULL;
|
||||
|
||||
client->state = RUNNING;
|
||||
client->select.fd = fd;
|
||||
client->select.func = client_event;
|
||||
client->select.private = client;
|
||||
client->seq = 0;
|
||||
client->head.len = 0;
|
||||
client->head.type = 0;
|
||||
client->count = 0;
|
||||
client->data = NULL;
|
||||
client->self = self;
|
||||
client->timeout = NULL;
|
||||
client->pass_fd = -1;
|
||||
|
||||
if (add_select_user( client_fd, READ_EVENT, &client_ops, client ) == -1)
|
||||
{
|
||||
free( client );
|
||||
return -1;
|
||||
}
|
||||
return client_fd;
|
||||
register_select_user( &client->select );
|
||||
set_select_events( &client->select, READ_EVENT );
|
||||
return client;
|
||||
}
|
||||
|
||||
/* remove a client */
|
||||
void remove_client( int client_fd, int exit_code )
|
||||
void remove_client( struct client *client, int exit_code )
|
||||
{
|
||||
struct client *client = (struct client *)get_select_private_data( &client_ops, client_fd );
|
||||
assert( client );
|
||||
|
||||
call_kill_handler( client->self, exit_code );
|
||||
|
||||
remove_select_user( client_fd );
|
||||
close( client_fd );
|
||||
if (client->timeout) remove_timeout_user( client->timeout );
|
||||
unregister_select_user( &client->select );
|
||||
close( client->select.fd );
|
||||
|
||||
/* Purge messages */
|
||||
if (client->data) free( client->data );
|
||||
|
@ -338,13 +325,12 @@ void remove_client( int client_fd, int exit_code )
|
|||
}
|
||||
|
||||
/* send a reply to a client */
|
||||
int send_reply_v( int client_fd, int type, int pass_fd,
|
||||
int send_reply_v( struct client *client, int type, int pass_fd,
|
||||
struct iovec *vec, int veclen )
|
||||
{
|
||||
int i;
|
||||
unsigned int len;
|
||||
char *p;
|
||||
struct client *client = (struct client *)get_select_private_data( &client_ops, client_fd );
|
||||
|
||||
assert( client );
|
||||
assert( client->state == WAITING );
|
||||
|
@ -369,6 +355,6 @@ int send_reply_v( int client_fd, int type, int pass_fd,
|
|||
}
|
||||
|
||||
client->state = READING;
|
||||
set_select_events( client_fd, WRITE_EVENT );
|
||||
set_select_events( &client->select, WRITE_EVENT );
|
||||
return 0;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue