diff --git a/dlls/webservices/channel.c b/dlls/webservices/channel.c index a8ad01235f7..08e04935d34 100644 --- a/dlls/webservices/channel.c +++ b/dlls/webservices/channel.c @@ -86,6 +86,105 @@ static const struct prop_desc channel_props[] = { sizeof(ULONG), FALSE } /* WS_CHANNEL_PROPERTY_MAX_HTTP_REQUEST_HEADERS_BUFFER_SIZE */ }; +struct task +{ + struct list entry; + void (*proc)( struct task * ); +}; + +struct queue +{ + CRITICAL_SECTION cs; + HANDLE wait; + HANDLE cancel; + HANDLE ready; + struct list tasks; +}; + +static struct task *dequeue_task( struct queue *queue ) +{ + struct task *task; + + EnterCriticalSection( &queue->cs ); + TRACE( "%u tasks queued\n", list_count( &queue->tasks ) ); + task = LIST_ENTRY( list_head( &queue->tasks ), struct task, entry ); + if (task) list_remove( &task->entry ); + LeaveCriticalSection( &queue->cs ); + + TRACE( "returning task %p\n", task ); + return task; +} + +static void CALLBACK queue_runner( TP_CALLBACK_INSTANCE *instance, void *ctx ) +{ + struct queue *queue = ctx; + HANDLE handles[] = { queue->wait, queue->cancel }; + + SetEvent( queue->ready ); + for (;;) + { + DWORD err = WaitForMultipleObjects( 2, handles, FALSE, INFINITE ); + switch (err) + { + case WAIT_OBJECT_0: + { + struct task *task; + while ((task = dequeue_task( queue ))) + { + task->proc( task ); + heap_free( task ); + } + break; + } + case WAIT_OBJECT_0 + 1: + TRACE( "cancelled\n" ); + SetEvent( queue->ready ); + return; + + default: + ERR( "wait failed %u\n", err ); + return; + } + } +} + +static HRESULT start_queue( struct queue *queue ) +{ + HRESULT hr = E_OUTOFMEMORY; + + if (queue->wait) return S_OK; + list_init( &queue->tasks ); + if (!(queue->wait = CreateEventW( NULL, FALSE, FALSE, NULL ))) goto error; + if (!(queue->cancel = CreateEventW( NULL, FALSE, FALSE, NULL ))) goto error; + if (!(queue->ready = CreateEventW( NULL, FALSE, FALSE, NULL ))) goto error; + if (!TrySubmitThreadpoolCallback( queue_runner, queue, NULL )) hr = HRESULT_FROM_WIN32( GetLastError() ); + else + { + WaitForSingleObject( queue->ready, INFINITE ); + return S_OK; + } + +error: + CloseHandle( queue->wait ); + CloseHandle( queue->cancel ); + CloseHandle( queue->ready ); + return hr; +} + +static HRESULT queue_task( struct queue *queue, struct task *task ) +{ + HRESULT hr; + if ((hr = start_queue( queue )) != S_OK) return hr; + + EnterCriticalSection( &queue->cs ); + TRACE( "queueing task %p\n", task ); + list_add_tail( &queue->tasks, &task->entry ); + LeaveCriticalSection( &queue->cs ); + + SetEvent( queue->wait ); + return WS_S_ASYNC; +}; + enum session_state { SESSION_STATE_UNINITIALIZED, @@ -107,6 +206,8 @@ struct channel enum session_state session_state; struct dictionary dict_send; struct dictionary dict_recv; + struct queue send_q; + struct queue recv_q; union { struct @@ -147,22 +248,57 @@ static struct channel *alloc_channel(void) InitializeCriticalSection( &ret->cs ); ret->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": channel.cs"); + InitializeCriticalSection( &ret->send_q.cs ); + ret->send_q.cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": channel.send_q.cs"); + + InitializeCriticalSection( &ret->recv_q.cs ); + ret->recv_q.cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": channel.recv_q.cs"); + prop_init( channel_props, count, ret->prop, &ret[1] ); ret->prop_count = count; return ret; } +static void clear_addr( WS_ENDPOINT_ADDRESS *addr ) +{ + heap_free( addr->url.chars ); + addr->url.chars = NULL; + addr->url.length = 0; +} + +static void clear_queue( struct queue *queue ) +{ + struct list *ptr; + + SetEvent( queue->cancel ); + WaitForSingleObject( queue->ready, INFINITE ); + + while ((ptr = list_head( &queue->tasks ))) + { + struct task *task = LIST_ENTRY( ptr, struct task, entry ); + list_remove( &task->entry ); + heap_free( task ); + } + + CloseHandle( queue->wait ); + queue->wait = NULL; + CloseHandle( queue->cancel ); + queue->cancel = NULL; + CloseHandle( queue->ready ); + queue->ready = NULL; +} + static void reset_channel( struct channel *channel ) { - channel->state = WS_CHANNEL_STATE_CREATED; - heap_free( channel->addr.url.chars ); - channel->addr.url.chars = NULL; - channel->addr.url.length = 0; - channel->msg = NULL; - channel->read_size = 0; - channel->session_state = SESSION_STATE_UNINITIALIZED; + clear_queue( &channel->send_q ); + clear_queue( &channel->recv_q ); + channel->state = WS_CHANNEL_STATE_CREATED; + channel->session_state = SESSION_STATE_UNINITIALIZED; + clear_addr( &channel->addr ); clear_dict( &channel->dict_send ); clear_dict( &channel->dict_recv ); + channel->msg = NULL; + channel->read_size = 0; switch (channel->binding) { @@ -201,6 +337,12 @@ static void free_channel( struct channel *channel ) heap_free( channel->read_buf ); + channel->send_q.cs.DebugInfo->Spare[0] = 0; + DeleteCriticalSection( &channel->send_q.cs ); + + channel->recv_q.cs.DebugInfo->Spare[0] = 0; + DeleteCriticalSection( &channel->recv_q.cs ); + channel->cs.DebugInfo->Spare[0] = 0; DeleteCriticalSection( &channel->cs ); heap_free( channel ); @@ -1571,7 +1713,7 @@ static HRESULT receive_message_sock( struct channel *channel, SOCKET socket ) return init_reader( channel ); } -static HRESULT receive_message( struct channel *channel ) +static HRESULT receive_message_bytes( struct channel *channel ) { HRESULT hr; if ((hr = connect_channel( channel )) != S_OK) return hr; @@ -1623,7 +1765,7 @@ HRESULT channel_receive_message( WS_CHANNEL *handle ) return E_INVALIDARG; } - hr = receive_message( channel ); + hr = receive_message_bytes( channel ); LeaveCriticalSection( &channel->cs ); return hr; @@ -1656,6 +1798,80 @@ static HRESULT read_message( WS_MESSAGE *handle, WS_XML_READER *reader, const WS return WsReadEnvelopeEnd( handle, NULL ); } +static HRESULT receive_message( struct channel *channel, WS_MESSAGE *msg, const WS_MESSAGE_DESCRIPTION **desc, + ULONG count, WS_RECEIVE_OPTION option, WS_READ_OPTION read_option, WS_HEAP *heap, + void *value, ULONG size, ULONG *index ) +{ + HRESULT hr; + ULONG i; + + if ((hr = receive_message_bytes( channel )) != S_OK) return hr; + for (i = 0; i < count; i++) + { + const WS_ELEMENT_DESCRIPTION *body = desc[i]->bodyElementDescription; + if ((hr = read_message( msg, channel->reader, body, read_option, heap, value, size )) == S_OK) + { + if (index) *index = i; + break; + } + if ((hr = WsResetMessage( msg, NULL )) != S_OK) return hr; + if ((hr = init_reader( channel )) != S_OK) return hr; + } + return (i == count) ? WS_E_INVALID_FORMAT : S_OK; +} + +struct receive_message +{ + struct task task; + struct channel *channel; + WS_MESSAGE *msg; + const WS_MESSAGE_DESCRIPTION **desc; + ULONG count; + WS_RECEIVE_OPTION option; + WS_READ_OPTION read_option; + WS_HEAP *heap; + void *value; + ULONG size; + ULONG *index; + const WS_ASYNC_CONTEXT *ctx; +}; + +static void receive_message_proc( struct task *task ) +{ + struct receive_message *r = (struct receive_message *)task; + HRESULT hr; + + hr = receive_message( r->channel, r->msg, r->desc, r->count, r->option, r->read_option, r->heap, r->value, + r->size, r->index ); + + TRACE( "calling %p(%08x)\n", r->ctx->callback, hr ); + r->ctx->callback( hr, WS_LONG_CALLBACK, r->ctx->callbackState ); + TRACE( "%p returned\n", r->ctx->callback ); +} + +static HRESULT queue_receive_message( struct channel *channel, WS_MESSAGE *msg, const WS_MESSAGE_DESCRIPTION **desc, + ULONG count, WS_RECEIVE_OPTION option, WS_READ_OPTION read_option, + WS_HEAP *heap, void *value, ULONG size, ULONG *index, + const WS_ASYNC_CONTEXT *ctx ) +{ + struct receive_message *r; + + if (!(r = heap_alloc( sizeof(*r) ))) return E_OUTOFMEMORY; + r->task.proc = receive_message_proc; + r->channel = channel; + r->msg = msg; + r->desc = desc; + r->count = count; + r->option = option; + r->read_option = read_option; + r->heap = heap; + r->value = value; + r->size = size; + r->index = index; + r->ctx = ctx; + return queue_task( &channel->recv_q, &r->task ); +} + /************************************************************************** * WsReceiveMessage [webservices.@] */ @@ -1665,12 +1881,10 @@ HRESULT WINAPI WsReceiveMessage( WS_CHANNEL *handle, WS_MESSAGE *msg, const WS_M { struct channel *channel = (struct channel *)handle; HRESULT hr; - ULONG i; TRACE( "%p %p %p %u %08x %08x %p %p %u %p %p %p\n", handle, msg, desc, count, option, read_option, heap, value, size, index, ctx, error ); if (error) FIXME( "ignoring error parameter\n" ); - if (ctx) FIXME( "ignoring ctx parameter\n" ); if (!channel || !msg || !desc || !count) return E_INVALIDARG; @@ -1682,24 +1896,11 @@ HRESULT WINAPI WsReceiveMessage( WS_CHANNEL *handle, WS_MESSAGE *msg, const WS_M return E_INVALIDARG; } - if (!channel->read_size) hr = receive_message( channel ); - else if (option == WS_RECEIVE_OPTIONAL_MESSAGE) hr = WS_S_END; - else hr = WS_E_INVALID_FORMAT; + if (ctx) + hr = queue_receive_message( channel, msg, desc, count, option, read_option, heap, value, size, index, ctx ); + else + hr = receive_message( channel, msg, desc, count, option, read_option, heap, value, size, index ); - if (hr != S_OK) goto done; - - for (i = 0; i < count; i++) - { - if ((hr = read_message( msg, channel->reader, desc[i]->bodyElementDescription, read_option, heap, - value, size )) == S_OK) - { - if (index) *index = i; - break; - } - if ((hr = init_reader( channel )) != S_OK) break; - } - -done: LeaveCriticalSection( &channel->cs ); return hr; } @@ -1727,7 +1928,7 @@ HRESULT WINAPI WsReadMessageStart( WS_CHANNEL *handle, WS_MESSAGE *msg, const WS return E_INVALIDARG; } - if ((hr = receive_message( channel )) == S_OK) + if ((hr = receive_message_bytes( channel )) == S_OK) { hr = WsReadEnvelopeStart( msg, channel->reader, NULL, NULL, NULL ); }