webservices: Add asynchronous support for WsReceiveMessage.

Signed-off-by: Hans Leidekker <hans@codeweavers.com>
Signed-off-by: Alexandre Julliard <julliard@winehq.org>
This commit is contained in:
Hans Leidekker 2017-09-21 11:26:14 +02:00 committed by Alexandre Julliard
parent 049d8cfc7d
commit 97ccfe2078
1 changed files with 230 additions and 29 deletions

View File

@ -86,6 +86,105 @@ static const struct prop_desc channel_props[] =
{ sizeof(ULONG), FALSE } /* WS_CHANNEL_PROPERTY_MAX_HTTP_REQUEST_HEADERS_BUFFER_SIZE */
};
struct task
{
struct list entry;
void (*proc)( struct task * );
};
struct queue
{
CRITICAL_SECTION cs;
HANDLE wait;
HANDLE cancel;
HANDLE ready;
struct list tasks;
};
static struct task *dequeue_task( struct queue *queue )
{
struct task *task;
EnterCriticalSection( &queue->cs );
TRACE( "%u tasks queued\n", list_count( &queue->tasks ) );
task = LIST_ENTRY( list_head( &queue->tasks ), struct task, entry );
if (task) list_remove( &task->entry );
LeaveCriticalSection( &queue->cs );
TRACE( "returning task %p\n", task );
return task;
}
static void CALLBACK queue_runner( TP_CALLBACK_INSTANCE *instance, void *ctx )
{
struct queue *queue = ctx;
HANDLE handles[] = { queue->wait, queue->cancel };
SetEvent( queue->ready );
for (;;)
{
DWORD err = WaitForMultipleObjects( 2, handles, FALSE, INFINITE );
switch (err)
{
case WAIT_OBJECT_0:
{
struct task *task;
while ((task = dequeue_task( queue )))
{
task->proc( task );
heap_free( task );
}
break;
}
case WAIT_OBJECT_0 + 1:
TRACE( "cancelled\n" );
SetEvent( queue->ready );
return;
default:
ERR( "wait failed %u\n", err );
return;
}
}
}
static HRESULT start_queue( struct queue *queue )
{
HRESULT hr = E_OUTOFMEMORY;
if (queue->wait) return S_OK;
list_init( &queue->tasks );
if (!(queue->wait = CreateEventW( NULL, FALSE, FALSE, NULL ))) goto error;
if (!(queue->cancel = CreateEventW( NULL, FALSE, FALSE, NULL ))) goto error;
if (!(queue->ready = CreateEventW( NULL, FALSE, FALSE, NULL ))) goto error;
if (!TrySubmitThreadpoolCallback( queue_runner, queue, NULL )) hr = HRESULT_FROM_WIN32( GetLastError() );
else
{
WaitForSingleObject( queue->ready, INFINITE );
return S_OK;
}
error:
CloseHandle( queue->wait );
CloseHandle( queue->cancel );
CloseHandle( queue->ready );
return hr;
}
static HRESULT queue_task( struct queue *queue, struct task *task )
{
HRESULT hr;
if ((hr = start_queue( queue )) != S_OK) return hr;
EnterCriticalSection( &queue->cs );
TRACE( "queueing task %p\n", task );
list_add_tail( &queue->tasks, &task->entry );
LeaveCriticalSection( &queue->cs );
SetEvent( queue->wait );
return WS_S_ASYNC;
};
enum session_state
{
SESSION_STATE_UNINITIALIZED,
@ -107,6 +206,8 @@ struct channel
enum session_state session_state;
struct dictionary dict_send;
struct dictionary dict_recv;
struct queue send_q;
struct queue recv_q;
union
{
struct
@ -147,22 +248,57 @@ static struct channel *alloc_channel(void)
InitializeCriticalSection( &ret->cs );
ret->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": channel.cs");
InitializeCriticalSection( &ret->send_q.cs );
ret->send_q.cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": channel.send_q.cs");
InitializeCriticalSection( &ret->recv_q.cs );
ret->recv_q.cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": channel.recv_q.cs");
prop_init( channel_props, count, ret->prop, &ret[1] );
ret->prop_count = count;
return ret;
}
static void clear_addr( WS_ENDPOINT_ADDRESS *addr )
{
heap_free( addr->url.chars );
addr->url.chars = NULL;
addr->url.length = 0;
}
static void clear_queue( struct queue *queue )
{
struct list *ptr;
SetEvent( queue->cancel );
WaitForSingleObject( queue->ready, INFINITE );
while ((ptr = list_head( &queue->tasks )))
{
struct task *task = LIST_ENTRY( ptr, struct task, entry );
list_remove( &task->entry );
heap_free( task );
}
CloseHandle( queue->wait );
queue->wait = NULL;
CloseHandle( queue->cancel );
queue->cancel = NULL;
CloseHandle( queue->ready );
queue->ready = NULL;
}
static void reset_channel( struct channel *channel )
{
channel->state = WS_CHANNEL_STATE_CREATED;
heap_free( channel->addr.url.chars );
channel->addr.url.chars = NULL;
channel->addr.url.length = 0;
channel->msg = NULL;
channel->read_size = 0;
channel->session_state = SESSION_STATE_UNINITIALIZED;
clear_queue( &channel->send_q );
clear_queue( &channel->recv_q );
channel->state = WS_CHANNEL_STATE_CREATED;
channel->session_state = SESSION_STATE_UNINITIALIZED;
clear_addr( &channel->addr );
clear_dict( &channel->dict_send );
clear_dict( &channel->dict_recv );
channel->msg = NULL;
channel->read_size = 0;
switch (channel->binding)
{
@ -201,6 +337,12 @@ static void free_channel( struct channel *channel )
heap_free( channel->read_buf );
channel->send_q.cs.DebugInfo->Spare[0] = 0;
DeleteCriticalSection( &channel->send_q.cs );
channel->recv_q.cs.DebugInfo->Spare[0] = 0;
DeleteCriticalSection( &channel->recv_q.cs );
channel->cs.DebugInfo->Spare[0] = 0;
DeleteCriticalSection( &channel->cs );
heap_free( channel );
@ -1571,7 +1713,7 @@ static HRESULT receive_message_sock( struct channel *channel, SOCKET socket )
return init_reader( channel );
}
static HRESULT receive_message( struct channel *channel )
static HRESULT receive_message_bytes( struct channel *channel )
{
HRESULT hr;
if ((hr = connect_channel( channel )) != S_OK) return hr;
@ -1623,7 +1765,7 @@ HRESULT channel_receive_message( WS_CHANNEL *handle )
return E_INVALIDARG;
}
hr = receive_message( channel );
hr = receive_message_bytes( channel );
LeaveCriticalSection( &channel->cs );
return hr;
@ -1656,6 +1798,80 @@ static HRESULT read_message( WS_MESSAGE *handle, WS_XML_READER *reader, const WS
return WsReadEnvelopeEnd( handle, NULL );
}
static HRESULT receive_message( struct channel *channel, WS_MESSAGE *msg, const WS_MESSAGE_DESCRIPTION **desc,
ULONG count, WS_RECEIVE_OPTION option, WS_READ_OPTION read_option, WS_HEAP *heap,
void *value, ULONG size, ULONG *index )
{
HRESULT hr;
ULONG i;
if ((hr = receive_message_bytes( channel )) != S_OK) return hr;
for (i = 0; i < count; i++)
{
const WS_ELEMENT_DESCRIPTION *body = desc[i]->bodyElementDescription;
if ((hr = read_message( msg, channel->reader, body, read_option, heap, value, size )) == S_OK)
{
if (index) *index = i;
break;
}
if ((hr = WsResetMessage( msg, NULL )) != S_OK) return hr;
if ((hr = init_reader( channel )) != S_OK) return hr;
}
return (i == count) ? WS_E_INVALID_FORMAT : S_OK;
}
struct receive_message
{
struct task task;
struct channel *channel;
WS_MESSAGE *msg;
const WS_MESSAGE_DESCRIPTION **desc;
ULONG count;
WS_RECEIVE_OPTION option;
WS_READ_OPTION read_option;
WS_HEAP *heap;
void *value;
ULONG size;
ULONG *index;
const WS_ASYNC_CONTEXT *ctx;
};
static void receive_message_proc( struct task *task )
{
struct receive_message *r = (struct receive_message *)task;
HRESULT hr;
hr = receive_message( r->channel, r->msg, r->desc, r->count, r->option, r->read_option, r->heap, r->value,
r->size, r->index );
TRACE( "calling %p(%08x)\n", r->ctx->callback, hr );
r->ctx->callback( hr, WS_LONG_CALLBACK, r->ctx->callbackState );
TRACE( "%p returned\n", r->ctx->callback );
}
static HRESULT queue_receive_message( struct channel *channel, WS_MESSAGE *msg, const WS_MESSAGE_DESCRIPTION **desc,
ULONG count, WS_RECEIVE_OPTION option, WS_READ_OPTION read_option,
WS_HEAP *heap, void *value, ULONG size, ULONG *index,
const WS_ASYNC_CONTEXT *ctx )
{
struct receive_message *r;
if (!(r = heap_alloc( sizeof(*r) ))) return E_OUTOFMEMORY;
r->task.proc = receive_message_proc;
r->channel = channel;
r->msg = msg;
r->desc = desc;
r->count = count;
r->option = option;
r->read_option = read_option;
r->heap = heap;
r->value = value;
r->size = size;
r->index = index;
r->ctx = ctx;
return queue_task( &channel->recv_q, &r->task );
}
/**************************************************************************
* WsReceiveMessage [webservices.@]
*/
@ -1665,12 +1881,10 @@ HRESULT WINAPI WsReceiveMessage( WS_CHANNEL *handle, WS_MESSAGE *msg, const WS_M
{
struct channel *channel = (struct channel *)handle;
HRESULT hr;
ULONG i;
TRACE( "%p %p %p %u %08x %08x %p %p %u %p %p %p\n", handle, msg, desc, count, option, read_option, heap,
value, size, index, ctx, error );
if (error) FIXME( "ignoring error parameter\n" );
if (ctx) FIXME( "ignoring ctx parameter\n" );
if (!channel || !msg || !desc || !count) return E_INVALIDARG;
@ -1682,24 +1896,11 @@ HRESULT WINAPI WsReceiveMessage( WS_CHANNEL *handle, WS_MESSAGE *msg, const WS_M
return E_INVALIDARG;
}
if (!channel->read_size) hr = receive_message( channel );
else if (option == WS_RECEIVE_OPTIONAL_MESSAGE) hr = WS_S_END;
else hr = WS_E_INVALID_FORMAT;
if (ctx)
hr = queue_receive_message( channel, msg, desc, count, option, read_option, heap, value, size, index, ctx );
else
hr = receive_message( channel, msg, desc, count, option, read_option, heap, value, size, index );
if (hr != S_OK) goto done;
for (i = 0; i < count; i++)
{
if ((hr = read_message( msg, channel->reader, desc[i]->bodyElementDescription, read_option, heap,
value, size )) == S_OK)
{
if (index) *index = i;
break;
}
if ((hr = init_reader( channel )) != S_OK) break;
}
done:
LeaveCriticalSection( &channel->cs );
return hr;
}
@ -1727,7 +1928,7 @@ HRESULT WINAPI WsReadMessageStart( WS_CHANNEL *handle, WS_MESSAGE *msg, const WS
return E_INVALIDARG;
}
if ((hr = receive_message( channel )) == S_OK)
if ((hr = receive_message_bytes( channel )) == S_OK)
{
hr = WsReadEnvelopeStart( msg, channel->reader, NULL, NULL, NULL );
}