diff --git a/dlls/msvcp90/misc.c b/dlls/msvcp90/misc.c index ca8507916ce..b3e546ae100 100644 --- a/dlls/msvcp90/misc.c +++ b/dlls/msvcp90/misc.c @@ -1236,9 +1236,24 @@ typedef struct extern const vtable_ptr MSVCP__Concurrent_queue_base_v4_vtable; #if _MSVCP_VER == 100 +#define call__Concurrent_queue_base_v4__Move_item call__Concurrent_queue_base_v4__Copy_item +#define call__Concurrent_queue_base_v4__Copy_item(this,dst,idx,src) CALL_VTBL_FUNC(this, \ + 0, void, (_Concurrent_queue_base_v4*,_Page*,MSVCP_size_t,const void*), (this,dst,idx,src)) +#define call__Concurrent_queue_base_v4__Assign_and_destroy_item(this,dst,src,idx) CALL_VTBL_FUNC(this, \ + 4, void, (_Concurrent_queue_base_v4*,void*,_Page*,MSVCP_size_t), (this,dst,src,idx)) +#define call__Concurrent_queue_base_v4__Allocate_page(this) CALL_VTBL_FUNC(this, \ + 12, _Page*, (_Concurrent_queue_base_v4*), (this)) #define call__Concurrent_queue_base_v4__Deallocate_page(this, page) CALL_VTBL_FUNC(this, \ 16, void, (_Concurrent_queue_base_v4*,_Page*), (this,page)) #else +#define call__Concurrent_queue_base_v4__Move_item(this,dst,idx,src) CALL_VTBL_FUNC(this, \ + 0, void, (_Concurrent_queue_base_v4*,_Page*,MSVCP_size_t,void*), (this,dst,idx,src)) +#define call__Concurrent_queue_base_v4__Copy_item(this,dst,idx,src) CALL_VTBL_FUNC(this, \ + 4, void, (_Concurrent_queue_base_v4*,_Page*,MSVCP_size_t,const void*), (this,dst,idx,src)) +#define call__Concurrent_queue_base_v4__Assign_and_destroy_item(this,dst,src,idx) CALL_VTBL_FUNC(this, \ + 8, void, (_Concurrent_queue_base_v4*,void*,_Page*,MSVCP_size_t), (this,dst,src,idx)) +#define call__Concurrent_queue_base_v4__Allocate_page(this) CALL_VTBL_FUNC(this, \ + 16, _Page*, (_Concurrent_queue_base_v4*), (this)) #define call__Concurrent_queue_base_v4__Deallocate_page(this, page) CALL_VTBL_FUNC(this, \ 20, void, (_Concurrent_queue_base_v4*,_Page*), (this,page)) #endif @@ -1344,13 +1359,134 @@ MSVCP_size_t __thiscall _Concurrent_queue_base_v4__Internal_size( return this->data->tail_pos - this->data->head_pos; } +static void spin_wait(int *counter) +{ + static int spin_limit = -1; + + if(spin_limit == -1) + { + SYSTEM_INFO si; + GetSystemInfo(&si); + spin_limit = si.dwNumberOfProcessors>1 ? 4000 : 0; + } + + if(*counter >= spin_limit) + { + *counter = 0; + Sleep(0); + } + else + { + (*counter)++; + } +} + +#ifdef _WIN64 +static MSVCP_size_t InterlockedIncrementSizeT(MSVCP_size_t volatile *dest) +{ + MSVCP_size_t v; + + do + { + v = *dest; + } while(InterlockedCompareExchange64((LONGLONG*)dest, v+1, v) != v); + + return v+1; +} +#else +#define InterlockedIncrementSizeT(dest) InterlockedIncrement((LONG*)dest) +#endif + +static void threadsafe_queue_push(threadsafe_queue *queue, MSVCP_size_t id, + void *e, _Concurrent_queue_base_v4 *parent, BOOL copy) +{ + MSVCP_size_t page_id = id & ~(parent->alloc_count-1); + int spin; + _Page *p; + + spin = 0; + while(queue->tail_pos != id) + spin_wait(&spin); + + if(page_id == id) + { + /* TODO: Add exception handling */ + p = call__Concurrent_queue_base_v4__Allocate_page(parent); + p->_Next = NULL; + p->_Mask = 0; + + spin = 0; + while(InterlockedCompareExchange(&queue->lock, 1, 0)) + spin_wait(&spin); + if(queue->tail) + queue->tail->_Next = p; + queue->tail = p; + if(!queue->head) + queue->head = p; + queue->lock = 0; + } + else + { + p = queue->tail; + } + + /* TODO: Add exception handling */ + if(copy) + call__Concurrent_queue_base_v4__Copy_item(parent, p, id-page_id, e); + else + call__Concurrent_queue_base_v4__Move_item(parent, p, id-page_id, e); + p->_Mask |= 1 << (id - page_id); + InterlockedIncrementSizeT(&queue->tail_pos); +} + +static void threadsafe_queue_pop(threadsafe_queue *queue, MSVCP_size_t id, + void *e, _Concurrent_queue_base_v4 *parent) +{ + MSVCP_size_t page_id = id & ~(parent->alloc_count-1); + int spin; + _Page *p; + + spin = 0; + while(queue->tail_pos <= id) + spin_wait(&spin); + + spin = 0; + while(queue->head_pos != id) + spin_wait(&spin); + + p = queue->head; + /* TODO: Add exception handling */ + call__Concurrent_queue_base_v4__Assign_and_destroy_item(parent, e, p, id-page_id); + + if(id == page_id+parent->alloc_count-1) + { + spin = 0; + while(InterlockedCompareExchange(&queue->lock, 1, 0)) + spin_wait(&spin); + queue->head = p->_Next; + if(!queue->head) + queue->tail = NULL; + queue->lock = 0; + + /* TODO: Add exception handling */ + call__Concurrent_queue_base_v4__Deallocate_page(parent, p); + } + InterlockedIncrementSizeT(&queue->head_pos); +} + /* ?_Internal_push@_Concurrent_queue_base_v4@details@Concurrency@@IAEXPBX@Z */ /* ?_Internal_push@_Concurrent_queue_base_v4@details@Concurrency@@IEAAXPEBX@Z */ DEFINE_THISCALL_WRAPPER(_Concurrent_queue_base_v4__Internal_push, 8) void __thiscall _Concurrent_queue_base_v4__Internal_push( - _Concurrent_queue_base_v4 *this, const void *e) + _Concurrent_queue_base_v4 *this, void *e) { - FIXME("(%p %p) stub\n", this, e); + MSVCP_size_t id; + + TRACE("(%p %p)\n", this, e); + + id = InterlockedIncrementSizeT(&this->data->tail_pos)-1; + threadsafe_queue_push(this->data->queues + id % QUEUES_NO, + id / QUEUES_NO, e, this, TRUE); } /* ?_Internal_move_push@_Concurrent_queue_base_v4@details@Concurrency@@IAEXPAX@Z */ @@ -1359,7 +1495,13 @@ DEFINE_THISCALL_WRAPPER(_Concurrent_queue_base_v4__Internal_move_push, 8) void __thiscall _Concurrent_queue_base_v4__Internal_move_push( _Concurrent_queue_base_v4 *this, void *e) { - FIXME("(%p %p) stub\n", this, e); + MSVCP_size_t id; + + TRACE("(%p %p)\n", this, e); + + id = InterlockedIncrementSizeT(&this->data->tail_pos)-1; + threadsafe_queue_push(this->data->queues + id % QUEUES_NO, + id / QUEUES_NO, e, this, FALSE); } /* ?_Internal_pop_if_present@_Concurrent_queue_base_v4@details@Concurrency@@IAE_NPAX@Z */ @@ -1368,8 +1510,18 @@ DEFINE_THISCALL_WRAPPER(_Concurrent_queue_base_v4__Internal_pop_if_present, 8) MSVCP_bool __thiscall _Concurrent_queue_base_v4__Internal_pop_if_present( _Concurrent_queue_base_v4 *this, void *e) { - FIXME("(%p %p) stub\n", this, e); - return 0; + MSVCP_size_t id; + + TRACE("(%p %p)\n", this, e); + + do + { + id = this->data->head_pos; + if(id == this->data->tail_pos) return FALSE; + } while(InterlockedCompareExchangePointer((void**)&this->data->head_pos, + (void*)(id+1), (void*)id) != (void*)id); + threadsafe_queue_pop(this->data->queues + id % QUEUES_NO, id / QUEUES_NO, e, this); + return TRUE; } /* ?_Internal_swap@_Concurrent_queue_base_v4@details@Concurrency@@IAEXAAV123@@Z */