#include "ThreadID.h" DWORD ThreadID::thread_func_stub(LPVOID param) { ThreadID *t = static_cast<ThreadID*>(param); if (t != NULL) { return t->ThreadFunction(); } else return 0; } void ThreadID::Kill() { if (threadHandle && threadHandle != INVALID_HANDLE_VALUE) { //cut: WaitForSingleObject(threadHandle, INFINITE); while (WaitForMultipleObjectsEx(1, &threadHandle, FALSE, INFINITE, TRUE) != WAIT_OBJECT_0) { } } } ThreadID::ThreadID(ThreadFunctions *t_f, HANDLE killswitch, HANDLE global_functions_semaphore, ThreadPoolTypes::HandleList &inherited_handles, volatile LONG *thread_count, HANDLE _max_load_event, int _reserved, int _com_type) : ThreadFunctions(_reserved) { /* initialize values */ released = false; InitializeCriticalSection(&handle_lock); /* grab values passed to us */ reserved = _reserved; com_type = _com_type; max_load_event = _max_load_event; global_functions = t_f; num_threads_available = thread_count; /* wait_handles[0] is kill switch */ wait_handles.push_back(killswitch); /* wait_handles[1] is wake switch */ wakeHandle = CreateSemaphore(0, 0, ThreadPoolTypes::MAX_SEMAPHORE_VALUE, 0); wait_handles.push_back(wakeHandle); if (reserved) { /* if thread is reserved, wait_handles[2] is a Funcion Call wake semaphore for this thread only. */ wait_handles.push_back(functions_semaphore); // WAIT_OBJECT_0+1 == per-thread queued functions } else { /* if thread is not reserved, wait_handles[2] is a Function Call wake semaphore global to all threads */ wait_handles.push_back(global_functions_semaphore); // WAIT_OBJECT_0+2 == any-thread queued functions } /* add inherited handles (handles added to thread pool before this thread was created) */ for ( ThreadPoolTypes::HandleList::iterator itr = inherited_handles.begin(); itr != inherited_handles.end(); itr++ ) { wait_handles.push_back( *itr ); } /* start thread */ threadHandle = CreateThread(0, 0, thread_func_stub, this, 0, 0); } ThreadID::~ThreadID() { CloseHandle(threadHandle); CloseHandle(wakeHandle); DeleteCriticalSection(&handle_lock); } bool ThreadID::TryAddHandle(HANDLE new_handle) { // let's see if we get lucky and can access the handle list directly if (TryEnterCriticalSection(&handle_lock)) { // made it wait_handles.push_back(new_handle); LeaveCriticalSection(&handle_lock); return true; } else { ReleaseSemaphore(wakeHandle, 1, 0); // kick the thread out of WaitForMultiple... return false; } } void ThreadID::WaitAddHandle(HANDLE handle) { // wakeHandle already got released once by nature of this function being called EnterCriticalSection(&handle_lock); wait_handles.push_back(handle); LeaveCriticalSection(&handle_lock); ReleaseSemaphore(wakeHandle, 1, 0); // kick out the second wait } void ThreadID::AddHandle(HANDLE new_handle) { if (!TryAddHandle(new_handle)) WaitAddHandle(new_handle); } bool ThreadID::TryRemoveHandle(HANDLE handle) { // let's see if we get lucky and can access the handle list directly if (TryEnterCriticalSection(&handle_lock)) { RemoveHandle_Internal(handle); LeaveCriticalSection(&handle_lock); return true; } else { ReleaseSemaphore(wakeHandle, 1, 0); // kick the thread out of WaitForMultiple... return false; } return false; } void ThreadID::WaitRemoveHandle(HANDLE handle) { // wakeHandle already got released once by nature of this function being called EnterCriticalSection(&handle_lock); RemoveHandle_Internal(handle); LeaveCriticalSection(&handle_lock); ReleaseSemaphore(wakeHandle, 1, 0); // kick out the second wait } void ThreadID::RemoveHandle(HANDLE handle) { if (!TryRemoveHandle(handle)) WaitRemoveHandle(handle); } void ThreadID::RemoveHandle_Internal(HANDLE handle) { // first three handles are reserved, so start after that for (size_t i=3;i<wait_handles.size();i++) { if (wait_handles[i] == handle) { wait_handles.erase(wait_handles.begin() + i); i--; } } } bool ThreadID::IsReserved() const { return !!reserved; } DWORD CALLBACK ThreadID::ThreadFunction() { switch(com_type) { case api_threadpool::FLAG_REQUIRE_COM_MT: CoInitializeEx(0, COINIT_MULTITHREADED); break; case api_threadpool::FLAG_REQUIRE_COM_STA: CoInitialize(0); break; } while (1) { InterlockedIncrement(num_threads_available); EnterCriticalSection(&handle_lock); DWORD ret = WaitForMultipleObjectsEx((DWORD)wait_handles.size(), wait_handles.data(), FALSE, INFINITE, TRUE); // cut: LeaveCriticalSection(&handle_lock); if (InterlockedDecrement(num_threads_available) == 0 && !reserved) SetEvent(max_load_event); // notify the watch dog if all the threads are used up if (ret == WAIT_OBJECT_0) { // killswitch LeaveCriticalSection(&handle_lock); break; } else if (ret == WAIT_OBJECT_0 + 1) { // we got woken up to release the handles lock // wait for the second signal LeaveCriticalSection(&handle_lock); InterlockedIncrement(num_threads_available); WaitForSingleObject(wakeHandle, INFINITE); InterlockedDecrement(num_threads_available); } else if (ret == WAIT_OBJECT_0 + 2) { LeaveCriticalSection(&handle_lock); api_threadpool::ThreadPoolFunc func; void *user_data; intptr_t id; if (reserved) { // per-thread queued functions if (PopFunction(&func, &user_data, &id)) { func(0, user_data, id); } } else { // global queued functions if (global_functions->PopFunction(&func, &user_data, &id)) { func(0, user_data, id); } } } else if (ret > WAIT_OBJECT_0 && ret < (WAIT_OBJECT_0 + wait_handles.size())) { DWORD index = ret - WAIT_OBJECT_0; HANDLE handle = wait_handles[index]; LeaveCriticalSection(&handle_lock); /* !!! race condition here if someone calls ThreadPool::RemoveHandle and then CloseHandle() !!! before calling RemoveHandle, caller needs to either ensure that Event is unsignalled (And won't be signalled) or call RemoveHandle from within the function callback */ api_threadpool::ThreadPoolFunc func; void *user_data; intptr_t id; if (global_functions->Get(handle, &func, &user_data, &id)) { func(handle, user_data, id); } } else { LeaveCriticalSection(&handle_lock); } } if (com_type & api_threadpool::MASK_COM_FLAGS) CoUninitialize(); return 0; } bool ThreadID::CanRunCOM(int flags) const { switch(com_type) { case api_threadpool::FLAG_REQUIRE_COM_MT: // if we're a CONIT_MULTITHREADEX thread (default) return !(flags & api_threadpool::FLAG_REQUIRE_COM_STA); // don't let STA stuff run case api_threadpool::FLAG_REQUIRE_COM_STA: // if we're a CoInitialize(0) thread return !(flags & api_threadpool::FLAG_REQUIRE_COM_MT); // don't let MT stuff run } return false; // shouldn't get here } bool ThreadID::IsReleased() const { return released; } void ThreadID::Reserve() { released=false; } void ThreadID::Release() { released=true; }