ntdll: Implement threadpool I/O queues.
Signed-off-by: Zebediah Figura <z.figura12@gmail.com> Signed-off-by: Alexandre Julliard <julliard@winehq.org>
This commit is contained in:
parent
74d830755c
commit
75e8c4493a
|
@ -1060,6 +1060,7 @@
|
|||
@ stdcall RtlxUnicodeStringToAnsiSize(ptr) RtlUnicodeStringToAnsiSize
|
||||
@ stdcall RtlxUnicodeStringToOemSize(ptr) RtlUnicodeStringToOemSize
|
||||
@ stdcall TpAllocCleanupGroup(ptr)
|
||||
@ stdcall TpAllocIoCompletion(ptr ptr ptr ptr ptr)
|
||||
@ stdcall TpAllocPool(ptr ptr)
|
||||
@ stdcall TpAllocTimer(ptr ptr ptr ptr)
|
||||
@ stdcall TpAllocWait(ptr ptr ptr ptr)
|
||||
|
@ -1070,12 +1071,14 @@
|
|||
@ stdcall TpCallbackReleaseSemaphoreOnCompletion(ptr long long)
|
||||
@ stdcall TpCallbackSetEventOnCompletion(ptr long)
|
||||
@ stdcall TpCallbackUnloadDllOnCompletion(ptr ptr)
|
||||
@ stdcall TpCancelAsyncIoOperation(ptr)
|
||||
@ stdcall TpDisassociateCallback(ptr)
|
||||
@ stdcall TpIsTimerSet(ptr)
|
||||
@ stdcall TpPostWork(ptr)
|
||||
@ stdcall TpQueryPoolStackInformation(ptr ptr)
|
||||
@ stdcall TpReleaseCleanupGroup(ptr)
|
||||
@ stdcall TpReleaseCleanupGroupMembers(ptr long ptr)
|
||||
@ stdcall TpReleaseIoCompletion(ptr)
|
||||
@ stdcall TpReleasePool(ptr)
|
||||
@ stdcall TpReleaseTimer(ptr)
|
||||
@ stdcall TpReleaseWait(ptr)
|
||||
|
@ -1087,6 +1090,7 @@
|
|||
@ stdcall TpSetWait(ptr long ptr)
|
||||
@ stdcall TpSimpleTryPost(ptr ptr ptr)
|
||||
@ stdcall TpStartAsyncIoOperation(ptr)
|
||||
@ stdcall TpWaitForIoCompletion(ptr long)
|
||||
@ stdcall TpWaitForTimer(ptr long)
|
||||
@ stdcall TpWaitForWait(ptr long)
|
||||
@ stdcall TpWaitForWork(ptr long)
|
||||
|
|
|
@ -30,6 +30,13 @@
|
|||
#include "wine/server.h"
|
||||
#include "wine/asm.h"
|
||||
|
||||
#define DECLARE_CRITICAL_SECTION(cs) \
|
||||
static RTL_CRITICAL_SECTION cs; \
|
||||
static RTL_CRITICAL_SECTION_DEBUG cs##_debug = \
|
||||
{ 0, 0, &cs, { &cs##_debug.ProcessLocksList, &cs##_debug.ProcessLocksList }, \
|
||||
0, 0, { (DWORD_PTR)(__FILE__ ": " # cs) }}; \
|
||||
static RTL_CRITICAL_SECTION cs = { &cs##_debug, -1, 0, 0, 0, 0 };
|
||||
|
||||
#define MAX_NT_PATH_LENGTH 277
|
||||
|
||||
#define MAX_DOS_DRIVES 26
|
||||
|
|
|
@ -18,22 +18,27 @@
|
|||
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
|
||||
*/
|
||||
|
||||
#define NONAMELESSSTRUCT
|
||||
#define NONAMELESSUNION
|
||||
#include "ntdll_test.h"
|
||||
|
||||
static HMODULE hntdll = 0;
|
||||
static NTSTATUS (WINAPI *pTpAllocCleanupGroup)(TP_CLEANUP_GROUP **);
|
||||
static NTSTATUS (WINAPI *pTpAllocIoCompletion)(TP_IO **,HANDLE,PTP_IO_CALLBACK,void *,TP_CALLBACK_ENVIRON *);
|
||||
static NTSTATUS (WINAPI *pTpAllocPool)(TP_POOL **,PVOID);
|
||||
static NTSTATUS (WINAPI *pTpAllocTimer)(TP_TIMER **,PTP_TIMER_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
|
||||
static NTSTATUS (WINAPI *pTpAllocWait)(TP_WAIT **,PTP_WAIT_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
|
||||
static NTSTATUS (WINAPI *pTpAllocWork)(TP_WORK **,PTP_WORK_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
|
||||
static NTSTATUS (WINAPI *pTpCallbackMayRunLong)(TP_CALLBACK_INSTANCE *);
|
||||
static VOID (WINAPI *pTpCallbackReleaseSemaphoreOnCompletion)(TP_CALLBACK_INSTANCE *,HANDLE,DWORD);
|
||||
static void (WINAPI *pTpCancelAsyncIoOperation)(TP_IO *);
|
||||
static VOID (WINAPI *pTpDisassociateCallback)(TP_CALLBACK_INSTANCE *);
|
||||
static BOOL (WINAPI *pTpIsTimerSet)(TP_TIMER *);
|
||||
static VOID (WINAPI *pTpReleaseWait)(TP_WAIT *);
|
||||
static VOID (WINAPI *pTpPostWork)(TP_WORK *);
|
||||
static VOID (WINAPI *pTpReleaseCleanupGroup)(TP_CLEANUP_GROUP *);
|
||||
static VOID (WINAPI *pTpReleaseCleanupGroupMembers)(TP_CLEANUP_GROUP *,BOOL,PVOID);
|
||||
static void (WINAPI *pTpReleaseIoCompletion)(TP_IO *);
|
||||
static VOID (WINAPI *pTpReleasePool)(TP_POOL *);
|
||||
static VOID (WINAPI *pTpReleaseTimer)(TP_TIMER *);
|
||||
static VOID (WINAPI *pTpReleaseWork)(TP_WORK *);
|
||||
|
@ -41,6 +46,8 @@ static VOID (WINAPI *pTpSetPoolMaxThreads)(TP_POOL *,DWORD);
|
|||
static VOID (WINAPI *pTpSetTimer)(TP_TIMER *,LARGE_INTEGER *,LONG,LONG);
|
||||
static VOID (WINAPI *pTpSetWait)(TP_WAIT *,HANDLE,LARGE_INTEGER *);
|
||||
static NTSTATUS (WINAPI *pTpSimpleTryPost)(PTP_SIMPLE_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
|
||||
static void (WINAPI *pTpStartAsyncIoOperation)(TP_IO *);
|
||||
static void (WINAPI *pTpWaitForIoCompletion)(TP_IO *,BOOL);
|
||||
static VOID (WINAPI *pTpWaitForTimer)(TP_TIMER *,BOOL);
|
||||
static VOID (WINAPI *pTpWaitForWait)(TP_WAIT *,BOOL);
|
||||
static VOID (WINAPI *pTpWaitForWork)(TP_WORK *,BOOL);
|
||||
|
@ -63,10 +70,12 @@ static BOOL init_threadpool(void)
|
|||
}
|
||||
|
||||
NTDLL_GET_PROC(TpAllocCleanupGroup);
|
||||
NTDLL_GET_PROC(TpAllocIoCompletion);
|
||||
NTDLL_GET_PROC(TpAllocPool);
|
||||
NTDLL_GET_PROC(TpAllocTimer);
|
||||
NTDLL_GET_PROC(TpAllocWait);
|
||||
NTDLL_GET_PROC(TpAllocWork);
|
||||
NTDLL_GET_PROC(TpCancelAsyncIoOperation);
|
||||
NTDLL_GET_PROC(TpCallbackMayRunLong);
|
||||
NTDLL_GET_PROC(TpCallbackReleaseSemaphoreOnCompletion);
|
||||
NTDLL_GET_PROC(TpDisassociateCallback);
|
||||
|
@ -74,6 +83,7 @@ static BOOL init_threadpool(void)
|
|||
NTDLL_GET_PROC(TpPostWork);
|
||||
NTDLL_GET_PROC(TpReleaseCleanupGroup);
|
||||
NTDLL_GET_PROC(TpReleaseCleanupGroupMembers);
|
||||
NTDLL_GET_PROC(TpReleaseIoCompletion);
|
||||
NTDLL_GET_PROC(TpReleasePool);
|
||||
NTDLL_GET_PROC(TpReleaseTimer);
|
||||
NTDLL_GET_PROC(TpReleaseWait);
|
||||
|
@ -82,6 +92,8 @@ static BOOL init_threadpool(void)
|
|||
NTDLL_GET_PROC(TpSetTimer);
|
||||
NTDLL_GET_PROC(TpSetWait);
|
||||
NTDLL_GET_PROC(TpSimpleTryPost);
|
||||
NTDLL_GET_PROC(TpStartAsyncIoOperation);
|
||||
NTDLL_GET_PROC(TpWaitForIoCompletion);
|
||||
NTDLL_GET_PROC(TpWaitForTimer);
|
||||
NTDLL_GET_PROC(TpWaitForWait);
|
||||
NTDLL_GET_PROC(TpWaitForWork);
|
||||
|
@ -1906,6 +1918,172 @@ static void test_tp_multi_wait(void)
|
|||
CloseHandle(semaphore);
|
||||
}
|
||||
|
||||
struct io_cb_ctx
|
||||
{
|
||||
unsigned int count;
|
||||
void *ovl;
|
||||
NTSTATUS ret;
|
||||
ULONG_PTR length;
|
||||
TP_IO *io;
|
||||
};
|
||||
|
||||
static void CALLBACK io_cb(TP_CALLBACK_INSTANCE *instance, void *userdata,
|
||||
void *cvalue, IO_STATUS_BLOCK *iosb, TP_IO *io)
|
||||
{
|
||||
struct io_cb_ctx *ctx = userdata;
|
||||
++ctx->count;
|
||||
ctx->ovl = cvalue;
|
||||
ctx->ret = iosb->u.Status;
|
||||
ctx->length = iosb->Information;
|
||||
ctx->io = io;
|
||||
}
|
||||
|
||||
static DWORD WINAPI io_wait_thread(void *arg)
|
||||
{
|
||||
TP_IO *io = arg;
|
||||
pTpWaitForIoCompletion(io, FALSE);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void test_tp_io(void)
|
||||
{
|
||||
TP_CALLBACK_ENVIRON environment = {.Version = 1};
|
||||
OVERLAPPED ovl = {}, ovl2 = {};
|
||||
HANDLE client, server, thread;
|
||||
struct io_cb_ctx userdata;
|
||||
char in[1], in2[1];
|
||||
const char out[1];
|
||||
NTSTATUS status;
|
||||
DWORD ret_size;
|
||||
TP_POOL *pool;
|
||||
TP_IO *io;
|
||||
BOOL ret;
|
||||
|
||||
ovl.hEvent = CreateEventW(NULL, TRUE, FALSE, NULL);
|
||||
|
||||
status = pTpAllocPool(&pool, NULL);
|
||||
ok(!status, "failed to allocate pool, status %#x\n", status);
|
||||
|
||||
server = CreateNamedPipeA("\\\\.\\pipe\\wine_tp_test",
|
||||
PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, 0, 1, 1024, 1024, 0, NULL);
|
||||
ok(server != INVALID_HANDLE_VALUE, "Failed to create server pipe, error %u.\n", GetLastError());
|
||||
client = CreateFileA("\\\\.\\pipe\\wine_tp_test", GENERIC_READ | GENERIC_WRITE,
|
||||
0, NULL, OPEN_EXISTING, 0, 0);
|
||||
ok(client != INVALID_HANDLE_VALUE, "Failed to create client pipe, error %u.\n", GetLastError());
|
||||
|
||||
environment.Pool = pool;
|
||||
io = NULL;
|
||||
status = pTpAllocIoCompletion(&io, server, io_cb, &userdata, &environment);
|
||||
ok(!status, "got %#x\n", status);
|
||||
ok(!!io, "expected non-NULL TP_IO\n");
|
||||
|
||||
pTpWaitForIoCompletion(io, FALSE);
|
||||
|
||||
userdata.count = 0;
|
||||
pTpStartAsyncIoOperation(io);
|
||||
|
||||
thread = CreateThread(NULL, 0, io_wait_thread, io, 0, NULL);
|
||||
ok(WaitForSingleObject(thread, 100) == WAIT_TIMEOUT, "TpWaitForIoCompletion() should not return\n");
|
||||
|
||||
ret = ReadFile(server, in, sizeof(in), NULL, &ovl);
|
||||
ok(!ret, "wrong ret %d\n", ret);
|
||||
ok(GetLastError() == ERROR_IO_PENDING, "wrong error %u\n", GetLastError());
|
||||
|
||||
ret = WriteFile(client, out, sizeof(out), &ret_size, NULL);
|
||||
ok(ret, "WriteFile() failed, error %u\n", GetLastError());
|
||||
|
||||
pTpWaitForIoCompletion(io, FALSE);
|
||||
ok(userdata.count == 1, "callback ran %u times\n", userdata.count);
|
||||
ok(userdata.ovl == &ovl, "expected %p, got %p\n", &ovl, userdata.ovl);
|
||||
ok(userdata.ret == STATUS_SUCCESS, "got status %#x\n", userdata.ret);
|
||||
ok(userdata.length == 1, "got length %lu\n", userdata.length);
|
||||
ok(userdata.io == io, "expected %p, got %p\n", io, userdata.io);
|
||||
|
||||
ok(!WaitForSingleObject(thread, 1000), "wait timed out\n");
|
||||
CloseHandle(thread);
|
||||
|
||||
userdata.count = 0;
|
||||
pTpStartAsyncIoOperation(io);
|
||||
pTpStartAsyncIoOperation(io);
|
||||
|
||||
ret = ReadFile(server, in, sizeof(in), NULL, &ovl);
|
||||
ok(!ret, "wrong ret %d\n", ret);
|
||||
ok(GetLastError() == ERROR_IO_PENDING, "wrong error %u\n", GetLastError());
|
||||
ret = ReadFile(server, in2, sizeof(in2), NULL, &ovl2);
|
||||
ok(!ret, "wrong ret %d\n", ret);
|
||||
ok(GetLastError() == ERROR_IO_PENDING, "wrong error %u\n", GetLastError());
|
||||
|
||||
ret = WriteFile(client, out, sizeof(out), &ret_size, NULL);
|
||||
ok(ret, "WriteFile() failed, error %u\n", GetLastError());
|
||||
ret = WriteFile(client, out, sizeof(out), &ret_size, NULL);
|
||||
ok(ret, "WriteFile() failed, error %u\n", GetLastError());
|
||||
|
||||
pTpWaitForIoCompletion(io, FALSE);
|
||||
ok(userdata.count == 2, "callback ran %u times\n", userdata.count);
|
||||
ok(userdata.ret == STATUS_SUCCESS, "got status %#x\n", userdata.ret);
|
||||
ok(userdata.length == 1, "got length %lu\n", userdata.length);
|
||||
ok(userdata.io == io, "expected %p, got %p\n", io, userdata.io);
|
||||
|
||||
/* The documentation is a bit unclear about passing TRUE to
|
||||
* WaitForThreadpoolIoCallbacks()—"pending I/O requests are not canceled"
|
||||
* [as with CancelIoEx()], but pending threadpool callbacks are, even those
|
||||
* which have not yet reached the completion port [as with
|
||||
* TpCancelAsyncIoOperation()]. */
|
||||
userdata.count = 0;
|
||||
pTpStartAsyncIoOperation(io);
|
||||
|
||||
pTpWaitForIoCompletion(io, TRUE);
|
||||
ok(!userdata.count, "callback ran %u times\n", userdata.count);
|
||||
|
||||
pTpStartAsyncIoOperation(io);
|
||||
|
||||
ret = WriteFile(client, out, sizeof(out), &ret_size, NULL);
|
||||
ok(ret, "WriteFile() failed, error %u\n", GetLastError());
|
||||
|
||||
ret = ReadFile(server, in, sizeof(in), NULL, &ovl);
|
||||
ok(ret, "wrong ret %d\n", ret);
|
||||
|
||||
pTpWaitForIoCompletion(io, FALSE);
|
||||
ok(userdata.count == 1, "callback ran %u times\n", userdata.count);
|
||||
ok(userdata.ovl == &ovl, "expected %p, got %p\n", &ovl, userdata.ovl);
|
||||
ok(userdata.ret == STATUS_SUCCESS, "got status %#x\n", userdata.ret);
|
||||
ok(userdata.length == 1, "got length %lu\n", userdata.length);
|
||||
ok(userdata.io == io, "expected %p, got %p\n", io, userdata.io);
|
||||
|
||||
userdata.count = 0;
|
||||
pTpStartAsyncIoOperation(io);
|
||||
|
||||
ret = ReadFile(server, NULL, 1, NULL, &ovl);
|
||||
ok(!ret, "wrong ret %d\n", ret);
|
||||
ok(GetLastError() == ERROR_NOACCESS, "wrong error %u\n", GetLastError());
|
||||
|
||||
pTpCancelAsyncIoOperation(io);
|
||||
pTpWaitForIoCompletion(io, FALSE);
|
||||
ok(!userdata.count, "callback ran %u times\n", userdata.count);
|
||||
|
||||
userdata.count = 0;
|
||||
pTpStartAsyncIoOperation(io);
|
||||
|
||||
ret = ReadFile(server, in, sizeof(in), NULL, &ovl);
|
||||
ok(!ret, "wrong ret %d\n", ret);
|
||||
ok(GetLastError() == ERROR_IO_PENDING, "wrong error %u\n", GetLastError());
|
||||
ret = CancelIo(server);
|
||||
ok(ret, "CancelIo() failed, error %u\n", GetLastError());
|
||||
|
||||
pTpWaitForIoCompletion(io, FALSE);
|
||||
ok(userdata.count == 1, "callback ran %u times\n", userdata.count);
|
||||
ok(userdata.ovl == &ovl, "expected %p, got %p\n", &ovl, userdata.ovl);
|
||||
ok(userdata.ret == STATUS_CANCELLED, "got status %#x\n", userdata.ret);
|
||||
ok(!userdata.length, "got length %lu\n", userdata.length);
|
||||
ok(userdata.io == io, "expected %p, got %p\n", io, userdata.io);
|
||||
|
||||
CloseHandle(ovl.hEvent);
|
||||
CloseHandle(client);
|
||||
CloseHandle(server);
|
||||
pTpReleaseIoCompletion(io);
|
||||
pTpReleasePool(pool);
|
||||
}
|
||||
|
||||
START_TEST(threadpool)
|
||||
{
|
||||
test_RtlQueueWorkItem();
|
||||
|
@ -1925,4 +2103,5 @@ START_TEST(threadpool)
|
|||
test_tp_window_length();
|
||||
test_tp_wait();
|
||||
test_tp_multi_wait();
|
||||
test_tp_io();
|
||||
}
|
||||
|
|
|
@ -131,6 +131,7 @@ struct threadpool
|
|||
int min_workers;
|
||||
int num_workers;
|
||||
int num_busy_workers;
|
||||
HANDLE compl_port;
|
||||
TP_POOL_STACK_INFORMATION stack_info;
|
||||
};
|
||||
|
||||
|
@ -139,7 +140,14 @@ enum threadpool_objtype
|
|||
TP_OBJECT_TYPE_SIMPLE,
|
||||
TP_OBJECT_TYPE_WORK,
|
||||
TP_OBJECT_TYPE_TIMER,
|
||||
TP_OBJECT_TYPE_WAIT
|
||||
TP_OBJECT_TYPE_WAIT,
|
||||
TP_OBJECT_TYPE_IO,
|
||||
};
|
||||
|
||||
struct io_completion
|
||||
{
|
||||
IO_STATUS_BLOCK iosb;
|
||||
ULONG_PTR cvalue;
|
||||
};
|
||||
|
||||
/* internal threadpool object representation */
|
||||
|
@ -201,6 +209,13 @@ struct threadpool_object
|
|||
ULONGLONG timeout;
|
||||
HANDLE handle;
|
||||
} wait;
|
||||
struct
|
||||
{
|
||||
PTP_IO_CALLBACK callback;
|
||||
/* locked via .pool->cs */
|
||||
unsigned int pending_count, completion_count, completion_max;
|
||||
struct io_completion *completions;
|
||||
} io;
|
||||
} u;
|
||||
};
|
||||
|
||||
|
@ -291,6 +306,29 @@ struct waitqueue_bucket
|
|||
HANDLE update_event;
|
||||
};
|
||||
|
||||
/* global I/O completion queue object */
|
||||
static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug;
|
||||
|
||||
static struct
|
||||
{
|
||||
CRITICAL_SECTION cs;
|
||||
LONG objcount;
|
||||
BOOL thread_running;
|
||||
HANDLE port;
|
||||
RTL_CONDITION_VARIABLE update_event;
|
||||
}
|
||||
ioqueue =
|
||||
{
|
||||
.cs = { &ioqueue_debug, -1, 0, 0, 0, 0 },
|
||||
};
|
||||
|
||||
static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug =
|
||||
{
|
||||
0, 0, &ioqueue.cs,
|
||||
{ &ioqueue_debug.ProcessLocksList, &ioqueue_debug.ProcessLocksList },
|
||||
0, 0, { (DWORD_PTR)(__FILE__ ": ioqueue.cs") }
|
||||
};
|
||||
|
||||
static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
|
||||
{
|
||||
return (struct threadpool *)pool;
|
||||
|
@ -317,6 +355,13 @@ static inline struct threadpool_object *impl_from_TP_WAIT( TP_WAIT *wait )
|
|||
return object;
|
||||
}
|
||||
|
||||
static inline struct threadpool_object *impl_from_TP_IO( TP_IO *io )
|
||||
{
|
||||
struct threadpool_object *object = (struct threadpool_object *)io;
|
||||
assert( object->type == TP_OBJECT_TYPE_IO );
|
||||
return object;
|
||||
}
|
||||
|
||||
static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP *group )
|
||||
{
|
||||
return (struct threadpool_group *)group;
|
||||
|
@ -343,6 +388,33 @@ static inline LONG interlocked_dec( PLONG dest )
|
|||
return interlocked_xchg_add( dest, -1 ) - 1;
|
||||
}
|
||||
|
||||
static BOOL array_reserve(void **elements, unsigned int *capacity, unsigned int count, unsigned int size)
|
||||
{
|
||||
unsigned int new_capacity, max_capacity;
|
||||
void *new_elements;
|
||||
|
||||
if (count <= *capacity)
|
||||
return TRUE;
|
||||
|
||||
max_capacity = ~(SIZE_T)0 / size;
|
||||
if (count > max_capacity)
|
||||
return FALSE;
|
||||
|
||||
new_capacity = max(4, *capacity);
|
||||
while (new_capacity < count && new_capacity <= max_capacity / 2)
|
||||
new_capacity *= 2;
|
||||
if (new_capacity < count)
|
||||
new_capacity = max_capacity;
|
||||
|
||||
if (!(new_elements = RtlReAllocateHeap( GetProcessHeap(), 0, *elements, new_capacity * size )))
|
||||
return FALSE;
|
||||
|
||||
*elements = new_elements;
|
||||
*capacity = new_capacity;
|
||||
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
static void CALLBACK process_rtl_work_item( TP_CALLBACK_INSTANCE *instance, void *userdata )
|
||||
{
|
||||
struct rtl_work_item *item = userdata;
|
||||
|
@ -1642,6 +1714,127 @@ static void tp_waitqueue_unlock( struct threadpool_object *wait )
|
|||
RtlLeaveCriticalSection( &waitqueue.cs );
|
||||
}
|
||||
|
||||
static void CALLBACK ioqueue_thread_proc( void *param )
|
||||
{
|
||||
struct io_completion *completion;
|
||||
struct threadpool_object *io;
|
||||
IO_STATUS_BLOCK iosb;
|
||||
ULONG_PTR key, value;
|
||||
NTSTATUS status;
|
||||
|
||||
TRACE( "starting I/O completion thread\n" );
|
||||
|
||||
RtlEnterCriticalSection( &ioqueue.cs );
|
||||
|
||||
for (;;)
|
||||
{
|
||||
RtlLeaveCriticalSection( &ioqueue.cs );
|
||||
if ((status = NtRemoveIoCompletion( ioqueue.port, &key, &value, &iosb, NULL )))
|
||||
ERR("NtRemoveIoCompletion failed, status %#x.\n", status);
|
||||
RtlEnterCriticalSection( &ioqueue.cs );
|
||||
|
||||
if (key)
|
||||
{
|
||||
io = (struct threadpool_object *)key;
|
||||
|
||||
RtlEnterCriticalSection( &io->pool->cs );
|
||||
|
||||
if (!array_reserve((void **)&io->u.io.completions, &io->u.io.completion_max,
|
||||
io->u.io.completion_count + 1, sizeof(*io->u.io.completions)))
|
||||
{
|
||||
ERR("Failed to allocate memory.\n");
|
||||
RtlLeaveCriticalSection( &io->pool->cs );
|
||||
continue;
|
||||
}
|
||||
|
||||
completion = &io->u.io.completions[io->u.io.completion_count++];
|
||||
completion->iosb = iosb;
|
||||
completion->cvalue = value;
|
||||
|
||||
tp_object_submit( io, FALSE );
|
||||
|
||||
RtlLeaveCriticalSection( &io->pool->cs );
|
||||
}
|
||||
|
||||
if (!ioqueue.objcount)
|
||||
{
|
||||
/* All I/O objects have been destroyed; if no new objects are
|
||||
* created within some amount of time, then we can shutdown this
|
||||
* thread. */
|
||||
LARGE_INTEGER timeout = {.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000};
|
||||
if (RtlSleepConditionVariableCS( &ioqueue.update_event, &ioqueue.cs,
|
||||
&timeout) == STATUS_TIMEOUT && !ioqueue.objcount)
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
RtlLeaveCriticalSection( &ioqueue.cs );
|
||||
|
||||
TRACE( "terminating I/O completion thread\n" );
|
||||
|
||||
RtlExitUserThread( 0 );
|
||||
}
|
||||
|
||||
static NTSTATUS tp_ioqueue_lock( struct threadpool_object *io, HANDLE file )
|
||||
{
|
||||
NTSTATUS status = STATUS_SUCCESS;
|
||||
|
||||
assert( io->type == TP_OBJECT_TYPE_IO );
|
||||
|
||||
RtlEnterCriticalSection( &ioqueue.cs );
|
||||
|
||||
if (!ioqueue.port && (status = NtCreateIoCompletion( &ioqueue.port,
|
||||
IO_COMPLETION_ALL_ACCESS, NULL, 0 )))
|
||||
{
|
||||
RtlLeaveCriticalSection( &ioqueue.cs );
|
||||
return status;
|
||||
}
|
||||
|
||||
if (!ioqueue.thread_running)
|
||||
{
|
||||
HANDLE thread;
|
||||
|
||||
if (!(status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE,
|
||||
NULL, 0, 0, ioqueue_thread_proc, NULL, &thread, NULL )))
|
||||
{
|
||||
ioqueue.thread_running = TRUE;
|
||||
NtClose( thread );
|
||||
}
|
||||
}
|
||||
|
||||
if (status == STATUS_SUCCESS)
|
||||
{
|
||||
FILE_COMPLETION_INFORMATION info;
|
||||
IO_STATUS_BLOCK iosb;
|
||||
|
||||
info.CompletionPort = ioqueue.port;
|
||||
info.CompletionKey = (ULONG_PTR)io;
|
||||
|
||||
status = NtSetInformationFile( file, &iosb, &info, sizeof(info), FileCompletionInformation );
|
||||
}
|
||||
|
||||
if (status == STATUS_SUCCESS)
|
||||
{
|
||||
if (!ioqueue.objcount++)
|
||||
RtlWakeConditionVariable( &ioqueue.update_event );
|
||||
}
|
||||
|
||||
RtlLeaveCriticalSection( &ioqueue.cs );
|
||||
return status;
|
||||
}
|
||||
|
||||
static void tp_ioqueue_unlock( struct threadpool_object *io )
|
||||
{
|
||||
assert( io->type == TP_OBJECT_TYPE_IO );
|
||||
|
||||
RtlEnterCriticalSection( &ioqueue.cs );
|
||||
|
||||
if (!--ioqueue.objcount)
|
||||
NtSetIoCompletion( ioqueue.port, 0, 0, STATUS_SUCCESS, 0 );
|
||||
|
||||
RtlLeaveCriticalSection( &ioqueue.cs );
|
||||
}
|
||||
|
||||
/***********************************************************************
|
||||
* tp_threadpool_alloc (internal)
|
||||
*
|
||||
|
@ -2017,6 +2210,8 @@ static void tp_object_cancel( struct threadpool_object *object )
|
|||
if (object->type == TP_OBJECT_TYPE_WAIT)
|
||||
object->u.wait.signaled = 0;
|
||||
}
|
||||
if (object->type == TP_OBJECT_TYPE_IO)
|
||||
object->u.io.pending_count = 0;
|
||||
RtlLeaveCriticalSection( &pool->cs );
|
||||
|
||||
while (pending_callbacks--)
|
||||
|
@ -2027,6 +2222,8 @@ static BOOL object_is_finished( struct threadpool_object *object, BOOL group )
|
|||
{
|
||||
if (object->num_pending_callbacks)
|
||||
return FALSE;
|
||||
if (object->type == TP_OBJECT_TYPE_IO && object->u.io.pending_count)
|
||||
return FALSE;
|
||||
|
||||
if (group)
|
||||
return !object->num_running_callbacks;
|
||||
|
@ -2066,6 +2263,8 @@ static void tp_object_prepare_shutdown( struct threadpool_object *object )
|
|||
tp_timerqueue_unlock( object );
|
||||
else if (object->type == TP_OBJECT_TYPE_WAIT)
|
||||
tp_waitqueue_unlock( object );
|
||||
else if (object->type == TP_OBJECT_TYPE_IO)
|
||||
tp_ioqueue_unlock( object );
|
||||
}
|
||||
|
||||
/***********************************************************************
|
||||
|
@ -2131,6 +2330,7 @@ static void CALLBACK threadpool_worker_proc( void *param )
|
|||
{
|
||||
TP_CALLBACK_INSTANCE *callback_instance;
|
||||
struct threadpool_instance instance;
|
||||
struct io_completion completion;
|
||||
struct threadpool *pool = param;
|
||||
TP_WAIT_RESULT wait_result = 0;
|
||||
LARGE_INTEGER timeout;
|
||||
|
@ -2160,6 +2360,12 @@ static void CALLBACK threadpool_worker_proc( void *param )
|
|||
wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
|
||||
if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
|
||||
}
|
||||
else if (object->type == TP_OBJECT_TYPE_IO)
|
||||
{
|
||||
assert( object->u.io.completion_count );
|
||||
completion = object->u.io.completions[--object->u.io.completion_count];
|
||||
object->u.io.pending_count--;
|
||||
}
|
||||
|
||||
/* Leave critical section and do the actual callback. */
|
||||
object->num_associated_callbacks++;
|
||||
|
@ -2218,6 +2424,17 @@ static void CALLBACK threadpool_worker_proc( void *param )
|
|||
break;
|
||||
}
|
||||
|
||||
case TP_OBJECT_TYPE_IO:
|
||||
{
|
||||
TRACE( "executing I/O callback %p(%p, %p, %#lx, %p, %p)\n",
|
||||
object->u.io.callback, callback_instance, object->userdata,
|
||||
completion.cvalue, &completion.iosb, (TP_IO *)object );
|
||||
object->u.io.callback( callback_instance, object->userdata,
|
||||
(void *)completion.cvalue, &completion.iosb, (TP_IO *)object );
|
||||
TRACE( "callback %p returned\n", object->u.io.callback );
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
assert(0);
|
||||
break;
|
||||
|
@ -2317,6 +2534,50 @@ NTSTATUS WINAPI TpAllocCleanupGroup( TP_CLEANUP_GROUP **out )
|
|||
return tp_group_alloc( (struct threadpool_group **)out );
|
||||
}
|
||||
|
||||
/***********************************************************************
|
||||
* TpAllocIoCompletion (NTDLL.@)
|
||||
*/
|
||||
NTSTATUS WINAPI TpAllocIoCompletion( TP_IO **out, HANDLE file, PTP_IO_CALLBACK callback,
|
||||
void *userdata, TP_CALLBACK_ENVIRON *environment )
|
||||
{
|
||||
struct threadpool_object *object;
|
||||
struct threadpool *pool;
|
||||
NTSTATUS status;
|
||||
|
||||
TRACE( "%p %p %p %p %p\n", out, file, callback, userdata, environment );
|
||||
|
||||
if (!(object = RtlAllocateHeap( GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(*object) )))
|
||||
return STATUS_NO_MEMORY;
|
||||
|
||||
if ((status = tp_threadpool_lock( &pool, environment )))
|
||||
{
|
||||
RtlFreeHeap( GetProcessHeap(), 0, object );
|
||||
return status;
|
||||
}
|
||||
|
||||
object->type = TP_OBJECT_TYPE_IO;
|
||||
object->u.io.callback = callback;
|
||||
if (!(object->u.io.completions = RtlAllocateHeap( GetProcessHeap(), 0, 8 * sizeof(*object->u.io.completions) )))
|
||||
{
|
||||
tp_threadpool_unlock( pool );
|
||||
RtlFreeHeap( GetProcessHeap(), 0, object );
|
||||
return status;
|
||||
}
|
||||
|
||||
if ((status = tp_ioqueue_lock( object, file )))
|
||||
{
|
||||
tp_threadpool_unlock( pool );
|
||||
RtlFreeHeap( GetProcessHeap(), 0, object->u.io.completions );
|
||||
RtlFreeHeap( GetProcessHeap(), 0, object );
|
||||
return status;
|
||||
}
|
||||
|
||||
tp_object_initialize( object, pool, userdata, environment );
|
||||
|
||||
*out = (TP_IO *)object;
|
||||
return STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
/***********************************************************************
|
||||
* TpAllocPool (NTDLL.@)
|
||||
*/
|
||||
|
@ -2441,6 +2702,26 @@ NTSTATUS WINAPI TpAllocWork( TP_WORK **out, PTP_WORK_CALLBACK callback, PVOID us
|
|||
return STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
/***********************************************************************
|
||||
* TpCancelAsyncIoOperation (NTDLL.@)
|
||||
*/
|
||||
void WINAPI TpCancelAsyncIoOperation( TP_IO *io )
|
||||
{
|
||||
struct threadpool_object *this = impl_from_TP_IO( io );
|
||||
|
||||
TRACE( "%p\n", io );
|
||||
|
||||
RtlEnterCriticalSection( &this->pool->cs );
|
||||
|
||||
this->u.io.pending_count--;
|
||||
if (object_is_finished( this, TRUE ))
|
||||
RtlWakeAllConditionVariable( &this->group_finished_event );
|
||||
if (object_is_finished( this, FALSE ))
|
||||
RtlWakeAllConditionVariable( &this->finished_event );
|
||||
|
||||
RtlLeaveCriticalSection( &this->pool->cs );
|
||||
}
|
||||
|
||||
/***********************************************************************
|
||||
* TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
|
||||
*/
|
||||
|
@ -2692,6 +2973,20 @@ VOID WINAPI TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP *group, BOOL cancel_p
|
|||
}
|
||||
}
|
||||
|
||||
/***********************************************************************
|
||||
* TpReleaseIoCompletion (NTDLL.@)
|
||||
*/
|
||||
void WINAPI TpReleaseIoCompletion( TP_IO *io )
|
||||
{
|
||||
struct threadpool_object *this = impl_from_TP_IO( io );
|
||||
|
||||
TRACE( "%p\n", io );
|
||||
|
||||
tp_object_prepare_shutdown( this );
|
||||
this->shutdown = TRUE;
|
||||
tp_object_release( this );
|
||||
}
|
||||
|
||||
/***********************************************************************
|
||||
* TpReleasePool (NTDLL.@)
|
||||
*/
|
||||
|
@ -2960,6 +3255,36 @@ NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
|
|||
return STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
/***********************************************************************
|
||||
* TpStartAsyncIoOperation (NTDLL.@)
|
||||
*/
|
||||
void WINAPI TpStartAsyncIoOperation( TP_IO *io )
|
||||
{
|
||||
struct threadpool_object *this = impl_from_TP_IO( io );
|
||||
|
||||
TRACE( "%p\n", io );
|
||||
|
||||
RtlEnterCriticalSection( &this->pool->cs );
|
||||
|
||||
this->u.io.pending_count++;
|
||||
|
||||
RtlLeaveCriticalSection( &this->pool->cs );
|
||||
}
|
||||
|
||||
/***********************************************************************
|
||||
* TpWaitForIoCompletion (NTDLL.@)
|
||||
*/
|
||||
void WINAPI TpWaitForIoCompletion( TP_IO *io, BOOL cancel_pending )
|
||||
{
|
||||
struct threadpool_object *this = impl_from_TP_IO( io );
|
||||
|
||||
TRACE( "%p %d\n", io, cancel_pending );
|
||||
|
||||
if (cancel_pending)
|
||||
tp_object_cancel( this );
|
||||
tp_object_wait( this, FALSE );
|
||||
}
|
||||
|
||||
/***********************************************************************
|
||||
* TpWaitForTimer (NTDLL.@)
|
||||
*/
|
||||
|
@ -3039,11 +3364,3 @@ NTSTATUS WINAPI TpQueryPoolStackInformation( TP_POOL *pool, TP_POOL_STACK_INFORM
|
|||
|
||||
return STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
/***********************************************************************
|
||||
* TpStartAsyncIoOperation (NTDLL.@)
|
||||
*/
|
||||
void WINAPI TpStartAsyncIoOperation( TP_IO *io )
|
||||
{
|
||||
FIXME( "%p\n", io );
|
||||
}
|
||||
|
|
|
@ -2351,6 +2351,8 @@ typedef struct _SYSTEM_MODULE_INFORMATION
|
|||
|
||||
typedef LONG (CALLBACK *PRTL_EXCEPTION_FILTER)(PEXCEPTION_POINTERS);
|
||||
|
||||
typedef void (CALLBACK *PTP_IO_CALLBACK)(PTP_CALLBACK_INSTANCE,void*,void*,IO_STATUS_BLOCK*,PTP_IO);
|
||||
|
||||
/***********************************************************************
|
||||
* Function declarations
|
||||
*/
|
||||
|
@ -3013,6 +3015,7 @@ NTSYSAPI NTSTATUS WINAPI RtlLargeIntegerToChar(const ULONGLONG *,ULONG,ULONG,PC
|
|||
/* Threadpool functions */
|
||||
|
||||
NTSYSAPI NTSTATUS WINAPI TpAllocCleanupGroup(TP_CLEANUP_GROUP **);
|
||||
NTSYSAPI NTSTATUS WINAPI TpAllocIoCompletion(TP_IO **,HANDLE,PTP_IO_CALLBACK,void *,TP_CALLBACK_ENVIRON *);
|
||||
NTSYSAPI NTSTATUS WINAPI TpAllocPool(TP_POOL **,PVOID);
|
||||
NTSYSAPI NTSTATUS WINAPI TpAllocTimer(TP_TIMER **,PTP_TIMER_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
|
||||
NTSYSAPI NTSTATUS WINAPI TpAllocWait(TP_WAIT **,PTP_WAIT_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
|
||||
|
@ -3023,23 +3026,26 @@ NTSYSAPI void WINAPI TpCallbackReleaseMutexOnCompletion(TP_CALLBACK_INSTANC
|
|||
NTSYSAPI void WINAPI TpCallbackReleaseSemaphoreOnCompletion(TP_CALLBACK_INSTANCE *,HANDLE,DWORD);
|
||||
NTSYSAPI void WINAPI TpCallbackSetEventOnCompletion(TP_CALLBACK_INSTANCE *,HANDLE);
|
||||
NTSYSAPI void WINAPI TpCallbackUnloadDllOnCompletion(TP_CALLBACK_INSTANCE *,HMODULE);
|
||||
NTSYSAPI void WINAPI TpCancelAsyncIoOperation(TP_IO *);
|
||||
NTSYSAPI void WINAPI TpDisassociateCallback(TP_CALLBACK_INSTANCE *);
|
||||
NTSYSAPI BOOL WINAPI TpIsTimerSet(TP_TIMER *);
|
||||
NTSYSAPI void WINAPI TpPostWork(TP_WORK *);
|
||||
NTSYSAPI NTSTATUS WINAPI TpQueryPoolStackInformation(TP_POOL *, TP_POOL_STACK_INFORMATION *stack_info);
|
||||
NTSYSAPI void WINAPI TpReleaseCleanupGroup(TP_CLEANUP_GROUP *);
|
||||
NTSYSAPI void WINAPI TpReleaseCleanupGroupMembers(TP_CLEANUP_GROUP *,BOOL,PVOID);
|
||||
NTSYSAPI void WINAPI TpReleaseIoCompletion(TP_IO *);
|
||||
NTSYSAPI void WINAPI TpReleasePool(TP_POOL *);
|
||||
NTSYSAPI void WINAPI TpReleaseTimer(TP_TIMER *);
|
||||
NTSYSAPI void WINAPI TpReleaseWait(TP_WAIT *);
|
||||
NTSYSAPI void WINAPI TpReleaseWork(TP_WORK *);
|
||||
NTSYSAPI void WINAPI TpStartAsyncIoOperation(TP_IO *);
|
||||
NTSYSAPI void WINAPI TpSetPoolMaxThreads(TP_POOL *,DWORD);
|
||||
NTSYSAPI BOOL WINAPI TpSetPoolMinThreads(TP_POOL *,DWORD);
|
||||
NTSYSAPI NTSTATUS WINAPI TpSetPoolStackInformation(TP_POOL *, TP_POOL_STACK_INFORMATION *stack_info);
|
||||
NTSYSAPI void WINAPI TpSetTimer(TP_TIMER *, LARGE_INTEGER *,LONG,LONG);
|
||||
NTSYSAPI void WINAPI TpSetWait(TP_WAIT *,HANDLE,LARGE_INTEGER *);
|
||||
NTSYSAPI NTSTATUS WINAPI TpSimpleTryPost(PTP_SIMPLE_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
|
||||
NTSYSAPI void WINAPI TpStartAsyncIoOperation(TP_IO *);
|
||||
NTSYSAPI void WINAPI TpWaitForIoCompletion(TP_IO *,BOOL);
|
||||
NTSYSAPI void WINAPI TpWaitForTimer(TP_TIMER *,BOOL);
|
||||
NTSYSAPI void WINAPI TpWaitForWait(TP_WAIT *,BOOL);
|
||||
NTSYSAPI void WINAPI TpWaitForWork(TP_WORK *,BOOL);
|
||||
|
|
Loading…
Reference in New Issue