winhttp: Make the task queue implementation more generic.

Signed-off-by: Hans Leidekker <hans@codeweavers.com>
Signed-off-by: Alexandre Julliard <julliard@winehq.org>
This commit is contained in:
Hans Leidekker 2020-06-22 21:39:22 +02:00 committed by Alexandre Julliard
parent 5c4f1e4c82
commit 6a98d5f524
3 changed files with 109 additions and 95 deletions

View File

@ -121,27 +121,25 @@ static const WCHAR *attribute_table[] =
NULL /* WINHTTP_QUERY_PASSPORT_CONFIG = 78 */
};
static struct task_header *dequeue_task( struct request *request )
static struct task_header *dequeue_task( struct queue *queue )
{
struct task_header *task;
EnterCriticalSection( &request->task_cs );
TRACE("%u tasks queued\n", list_count( &request->task_queue ));
task = LIST_ENTRY( list_head( &request->task_queue ), struct task_header, entry );
EnterCriticalSection( &queue->cs );
TRACE("%u tasks queued in %p\n", list_count(&queue->tasks), queue);
task = LIST_ENTRY( list_head( &queue->tasks ), struct task_header, entry );
if (task) list_remove( &task->entry );
LeaveCriticalSection( &request->task_cs );
LeaveCriticalSection( &queue->cs );
TRACE("returning task %p\n", task);
return task;
}
static void CALLBACK task_proc( TP_CALLBACK_INSTANCE *instance, void *ctx )
static void CALLBACK run_queue( TP_CALLBACK_INSTANCE *instance, void *ctx )
{
struct request *request = ctx;
HANDLE handles[2];
struct queue *queue = ctx;
HANDLE handles[] = { queue->wait, queue->cancel };
handles[0] = request->task_wait;
handles[1] = request->task_cancel;
for (;;)
{
DWORD err = WaitForMultipleObjects( 2, handles, FALSE, INFINITE );
@ -150,21 +148,19 @@ static void CALLBACK task_proc( TP_CALLBACK_INSTANCE *instance, void *ctx )
case WAIT_OBJECT_0:
{
struct task_header *task;
while ((task = dequeue_task( request )))
while ((task = dequeue_task( queue )))
{
task->proc( task );
release_object( &task->request->hdr );
release_object( task->object );
heap_free( task );
}
break;
}
case WAIT_OBJECT_0 + 1:
TRACE("exiting\n");
CloseHandle( request->task_cancel );
CloseHandle( request->task_wait );
request->task_cs.DebugInfo->Spare[0] = 0;
DeleteCriticalSection( &request->task_cs );
request->hdr.vtbl->destroy( &request->hdr );
CloseHandle( queue->wait );
CloseHandle( queue->cancel );
queue->object->vtbl->destroy( queue->object );
return;
default:
@ -174,38 +170,43 @@ static void CALLBACK task_proc( TP_CALLBACK_INSTANCE *instance, void *ctx )
}
}
static DWORD queue_task( struct task_header *task )
static DWORD start_queue( struct object_header *object, struct queue *queue )
{
struct request *request = task->request;
DWORD ret = ERROR_OUTOFMEMORY;
if (!request->task_wait)
if (queue->wait) return ERROR_SUCCESS;
queue->object = object;
list_init( &queue->tasks );
if (!(queue->wait = CreateEventW( NULL, FALSE, FALSE, NULL ))) goto error;
if (!(queue->cancel = CreateEventW( NULL, FALSE, FALSE, NULL ))) goto error;
if (!TrySubmitThreadpoolCallback( run_queue, queue, NULL )) ret = GetLastError();
else
{
if (!(request->task_wait = CreateEventW( NULL, FALSE, FALSE, NULL ))) return ERROR_OUTOFMEMORY;
if (!(request->task_cancel = CreateEventW( NULL, FALSE, FALSE, NULL )))
{
CloseHandle( request->task_wait );
request->task_wait = NULL;
return ERROR_OUTOFMEMORY;
}
if (!TrySubmitThreadpoolCallback( task_proc, request, NULL ))
{
CloseHandle( request->task_wait );
request->task_wait = NULL;
CloseHandle( request->task_cancel );
request->task_cancel = NULL;
return GetLastError();
}
request->task_proc_running = TRUE;
InitializeCriticalSection( &request->task_cs );
request->task_cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": request.task_cs");
queue->proc_running = TRUE;
TRACE("started %p\n", queue);
return ERROR_SUCCESS;
}
EnterCriticalSection( &request->task_cs );
TRACE("queueing task %p\n", task );
list_add_tail( &request->task_queue, &task->entry );
LeaveCriticalSection( &request->task_cs );
error:
CloseHandle( queue->wait );
queue->wait = NULL;
CloseHandle( queue->cancel );
queue->cancel = NULL;
return ret;
}
SetEvent( request->task_wait );
static DWORD queue_task( struct object_header *object, struct queue *queue, struct task_header *task )
{
DWORD ret;
if ((ret = start_queue( object, queue ))) return ret;
EnterCriticalSection( &queue->cs );
TRACE("queueing task %p in %p\n", task, queue);
list_add_tail( &queue->tasks, &task->entry );
LeaveCriticalSection( &queue->cs );
SetEvent( queue->wait );
return ERROR_SUCCESS;
}
@ -2215,8 +2216,10 @@ end:
static void task_send_request( struct task_header *task )
{
struct request *request = (struct request *)task->object;
struct send_request *s = (struct send_request *)task;
send_request( s->hdr.request, s->headers, s->headers_len, s->optional, s->optional_len, s->total_len, s->context, TRUE );
send_request( request, s->headers, s->headers_len, s->optional, s->optional_len, s->total_len, s->context, TRUE );
heap_free( s->headers );
}
@ -2250,8 +2253,8 @@ BOOL WINAPI WinHttpSendRequest( HINTERNET hrequest, LPCWSTR headers, DWORD heade
{
struct send_request *s;
if (!(s = heap_alloc( sizeof(struct send_request) ))) return FALSE;
s->hdr.request = request;
if (!(s = heap_alloc( sizeof(*s) ))) return FALSE;
s->hdr.object = &request->hdr;
s->hdr.proc = task_send_request;
s->headers = strdupW( headers );
s->headers_len = headers_len;
@ -2261,10 +2264,9 @@ BOOL WINAPI WinHttpSendRequest( HINTERNET hrequest, LPCWSTR headers, DWORD heade
s->context = context;
addref_object( &request->hdr );
ret = queue_task( (struct task_header *)s );
ret = queue_task( &request->hdr, &request->queue, (struct task_header *)s );
}
else
ret = send_request( request, headers, headers_len, optional, optional_len, total_len, context, FALSE );
else ret = send_request( request, headers, headers_len, optional, optional_len, total_len, context, FALSE );
release_object( &request->hdr );
SetLastError( ret );
@ -2795,8 +2797,8 @@ static DWORD receive_response( struct request *request, BOOL async )
static void task_receive_response( struct task_header *task )
{
struct receive_response *r = (struct receive_response *)task;
receive_response( r->hdr.request, TRUE );
struct request *request = (struct request *)task->object;
receive_response( request, TRUE );
}
/***********************************************************************
@ -2825,15 +2827,14 @@ BOOL WINAPI WinHttpReceiveResponse( HINTERNET hrequest, LPVOID reserved )
{
struct receive_response *r;
if (!(r = heap_alloc( sizeof(struct receive_response) ))) return FALSE;
r->hdr.request = request;
r->hdr.proc = task_receive_response;
if (!(r = heap_alloc( sizeof(*r) ))) return FALSE;
r->hdr.object = &request->hdr;
r->hdr.proc = task_receive_response;
addref_object( &request->hdr );
ret = queue_task( (struct task_header *)r );
ret = queue_task( &request->hdr, &request->queue, (struct task_header *)r );
}
else
ret = receive_response( request, FALSE );
else ret = receive_response( request, FALSE );
release_object( &request->hdr );
SetLastError( ret );
@ -2875,8 +2876,10 @@ done:
static void task_query_data_available( struct task_header *task )
{
struct request *request = (struct request *)task->object;
struct query_data *q = (struct query_data *)task;
query_data_available( q->hdr.request, q->available, TRUE );
query_data_available( request, q->available, TRUE );
}
/***********************************************************************
@ -2905,16 +2908,15 @@ BOOL WINAPI WinHttpQueryDataAvailable( HINTERNET hrequest, LPDWORD available )
{
struct query_data *q;
if (!(q = heap_alloc( sizeof(struct query_data) ))) return FALSE;
q->hdr.request = request;
q->hdr.proc = task_query_data_available;
q->available = available;
if (!(q = heap_alloc( sizeof(*q) ))) return FALSE;
q->hdr.object = &request->hdr;
q->hdr.proc = task_query_data_available;
q->available = available;
addref_object( &request->hdr );
ret = queue_task( (struct task_header *)q );
ret = queue_task( &request->hdr, &request->queue, (struct task_header *)q );
}
else
ret = query_data_available( request, available, FALSE );
else ret = query_data_available( request, available, FALSE );
release_object( &request->hdr );
SetLastError( ret );
@ -2923,8 +2925,10 @@ BOOL WINAPI WinHttpQueryDataAvailable( HINTERNET hrequest, LPDWORD available )
static void task_read_data( struct task_header *task )
{
struct request *request = (struct request *)task->object;
struct read_data *r = (struct read_data *)task;
read_data( r->hdr.request, r->buffer, r->to_read, r->read, TRUE );
read_data( request, r->buffer, r->to_read, r->read, TRUE );
}
/***********************************************************************
@ -2953,18 +2957,17 @@ BOOL WINAPI WinHttpReadData( HINTERNET hrequest, LPVOID buffer, DWORD to_read, L
{
struct read_data *r;
if (!(r = heap_alloc( sizeof(struct read_data) ))) return FALSE;
r->hdr.request = request;
r->hdr.proc = task_read_data;
r->buffer = buffer;
r->to_read = to_read;
r->read = read;
if (!(r = heap_alloc( sizeof(*r) ))) return FALSE;
r->hdr.object = &request->hdr;
r->hdr.proc = task_read_data;
r->buffer = buffer;
r->to_read = to_read;
r->read = read;
addref_object( &request->hdr );
ret = queue_task( (struct task_header *)r );
ret = queue_task( &request->hdr, &request->queue, (struct task_header *)r );
}
else
ret = read_data( request, buffer, to_read, read, FALSE );
else ret = read_data( request, buffer, to_read, read, FALSE );
release_object( &request->hdr );
SetLastError( ret );
@ -2995,8 +2998,10 @@ static DWORD write_data( struct request *request, const void *buffer, DWORD to_w
static void task_write_data( struct task_header *task )
{
struct request *request = (struct request *)task->object;
struct write_data *w = (struct write_data *)task;
write_data( w->hdr.request, w->buffer, w->to_write, w->written, TRUE );
write_data( request, w->buffer, w->to_write, w->written, TRUE );
}
/***********************************************************************
@ -3025,18 +3030,17 @@ BOOL WINAPI WinHttpWriteData( HINTERNET hrequest, LPCVOID buffer, DWORD to_write
{
struct write_data *w;
if (!(w = heap_alloc( sizeof(struct write_data) ))) return FALSE;
w->hdr.request = request;
w->hdr.proc = task_write_data;
w->buffer = buffer;
w->to_write = to_write;
w->written = written;
if (!(w = heap_alloc( sizeof(*w) ))) return FALSE;
w->hdr.object = &request->hdr;
w->hdr.proc = task_write_data;
w->buffer = buffer;
w->to_write = to_write;
w->written = written;
addref_object( &request->hdr );
ret = queue_task( (struct task_header *)w );
ret = queue_task( &request->hdr, &request->queue, (struct task_header *)w );
}
else
ret = write_data( request, buffer, to_write, written, FALSE );
else ret = write_data( request, buffer, to_write, written, FALSE );
release_object( &request->hdr );
SetLastError( ret );

View File

@ -578,11 +578,11 @@ static void request_destroy( struct object_header *hdr )
TRACE("%p\n", request);
if (request->task_proc_running)
if (request->queue.proc_running)
{
/* Signal to the task proc to quit. It will call this again when it does. */
request->task_proc_running = FALSE;
SetEvent( request->task_cancel );
request->queue.proc_running = FALSE;
SetEvent( request->queue.cancel );
return;
}
release_object( &request->connect->hdr );
@ -613,6 +613,9 @@ static void request_destroy( struct object_header *hdr )
heap_free( request->creds[i][j].password );
}
}
request->queue.cs.DebugInfo->Spare[0] = 0;
DeleteCriticalSection( &request->queue.cs );
heap_free( request );
}
@ -1117,7 +1120,8 @@ HINTERNET WINAPI WinHttpOpenRequest( HINTERNET hconnect, LPCWSTR verb, LPCWSTR o
request->hdr.notify_mask = connect->hdr.notify_mask;
request->hdr.context = connect->hdr.context;
request->hdr.redirect_policy = connect->hdr.redirect_policy;
list_init( &request->task_queue );
InitializeCriticalSection( &request->queue.cs );
request->queue.cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": request.queue.cs");
addref_object( &connect->hdr );
request->connect = connect;

View File

@ -155,6 +155,16 @@ struct authinfo
BOOL finished; /* finished authenticating */
};
struct queue
{
struct object_header *object;
CRITICAL_SECTION cs;
BOOL proc_running;
HANDLE wait;
HANDLE cancel;
struct list tasks;
};
enum request_flags
{
REQUEST_FLAG_WEBSOCKET_UPGRADE = 0x01,
@ -196,11 +206,7 @@ struct request
DWORD num_headers;
struct authinfo *authinfo;
struct authinfo *proxy_authinfo;
HANDLE task_wait;
HANDLE task_cancel;
BOOL task_proc_running;
struct list task_queue;
CRITICAL_SECTION task_cs;
struct queue queue;
struct
{
WCHAR *username;
@ -217,7 +223,7 @@ struct socket
struct task_header
{
struct list entry;
struct request *request;
struct object_header *object;
void (*proc)( struct task_header * );
};