Added thread.

This commit is contained in:
bkaradzic
2012-12-02 18:18:47 -08:00
parent 59a6f1a684
commit c3c871539c
4 changed files with 446 additions and 257 deletions

View File

@@ -65,6 +65,10 @@
#define BX_UNUSED(_unused) do { (void)sizeof(_unused); } while(0)
#define BX_CLASS_NO_COPY_NO_ASSIGNMENT(_class) \
_class(const _class&); \
_class& operator=(const _class&)
#ifndef BX_CHECK
# define BX_CHECK(...) do {} while(0)
#endif // BX_CHECK

View File

@@ -1,105 +1,106 @@
/*
* Copyright 2010-2012 Branimir Karadzic. All rights reserved.
* License: http://www.opensource.org/licenses/BSD-2-Clause
*/
#ifndef __BX_SEM_H__
#define __BX_SEM_H__
#include "bx.h"
#if BX_PLATFORM_POSIX
# include <semaphore.h>
# include <time.h>
#elif BX_PLATFORM_WINDOWS || BX_PLATFORM_XBOX360
# include <limits.h>
#endif // BX_PLATFORM_
namespace bx
{
#if BX_PLATFORM_POSIX
class Semaphore
{
public:
Semaphore()
{
sem_init(&m_handle, 0, 0);
}
~Semaphore()
{
sem_destroy(&m_handle);
}
void post(uint32_t _count = 1)
{
for (uint32_t ii = 0; ii < _count; ++ii)
{
sem_post(&m_handle);
}
}
bool wait(int32_t _msecs = -1)
{
#if BX_PLATFORM_NACL
BX_CHECK(-1 == _msecs, "NaCl doesn't support sem_timedwait at this moment.");
return 0 == sem_wait(&m_handle);
#else
if (0 > _msecs)
{
return 0 == sem_wait(&m_handle);
}
timespec ts;
ts.tv_sec = _msecs/1000;
ts.tv_nsec = (_msecs%1000)*1000;
return 0 == sem_timedwait(&m_handle, &ts);
#endif // BX_PLATFORM_
}
private:
Semaphore(const Semaphore& _rhs); // no copy constructor
Semaphore& operator=(const Semaphore& _rhs); // no assignment operator
sem_t m_handle;
};
#elif BX_PLATFORM_WINDOWS || BX_PLATFORM_XBOX360
class Semaphore
{
public:
Semaphore()
{
m_handle = CreateSemaphore(NULL, 0, LONG_MAX, NULL);
BX_CHECK(NULL != m_handle, "Failed to create Semaphore!");
}
~Semaphore()
{
CloseHandle(m_handle);
}
void post(uint32_t _count = 1) const
{
ReleaseSemaphore(m_handle, _count, NULL);
}
bool wait(int32_t _msecs = -1) const
{
DWORD milliseconds = (0 > _msecs) ? INFINITE : _msecs;
return WAIT_OBJECT_0 == WaitForSingleObject(m_handle, milliseconds);
}
private:
Semaphore(const Semaphore& _rhs); // no copy constructor
Semaphore& operator=(const Semaphore& _rhs); // no assignment operator
HANDLE m_handle;
};
#endif // BX_PLATFORM_
} // namespace bx
#endif // __BX_SEM_H__
/*
* Copyright 2010-2012 Branimir Karadzic. All rights reserved.
* License: http://www.opensource.org/licenses/BSD-2-Clause
*/
#ifndef __BX_SEM_H__
#define __BX_SEM_H__
#include "bx.h"
#if BX_PLATFORM_POSIX
# include <semaphore.h>
# include <time.h>
#elif BX_PLATFORM_WINDOWS || BX_PLATFORM_XBOX360
# include <windows.h>
# include <limits.h>
#endif // BX_PLATFORM_
namespace bx
{
#if BX_PLATFORM_POSIX
class Semaphore
{
public:
Semaphore()
{
sem_init(&m_handle, 0, 0);
}
~Semaphore()
{
sem_destroy(&m_handle);
}
void post(uint32_t _count = 1)
{
for (uint32_t ii = 0; ii < _count; ++ii)
{
sem_post(&m_handle);
}
}
bool wait(int32_t _msecs = -1)
{
#if BX_PLATFORM_NACL
BX_CHECK(-1 == _msecs, "NaCl doesn't support sem_timedwait at this moment.");
return 0 == sem_wait(&m_handle);
#else
if (0 > _msecs)
{
return 0 == sem_wait(&m_handle);
}
timespec ts;
ts.tv_sec = _msecs/1000;
ts.tv_nsec = (_msecs%1000)*1000;
return 0 == sem_timedwait(&m_handle, &ts);
#endif // BX_PLATFORM_
}
private:
Semaphore(const Semaphore& _rhs); // no copy constructor
Semaphore& operator=(const Semaphore& _rhs); // no assignment operator
sem_t m_handle;
};
#elif BX_PLATFORM_WINDOWS || BX_PLATFORM_XBOX360
class Semaphore
{
public:
Semaphore()
{
m_handle = CreateSemaphore(NULL, 0, LONG_MAX, NULL);
BX_CHECK(NULL != m_handle, "Failed to create Semaphore!");
}
~Semaphore()
{
CloseHandle(m_handle);
}
void post(uint32_t _count = 1) const
{
ReleaseSemaphore(m_handle, _count, NULL);
}
bool wait(int32_t _msecs = -1) const
{
DWORD milliseconds = (0 > _msecs) ? INFINITE : _msecs;
return WAIT_OBJECT_0 == WaitForSingleObject(m_handle, milliseconds);
}
private:
Semaphore(const Semaphore& _rhs); // no copy constructor
Semaphore& operator=(const Semaphore& _rhs); // no assignment operator
HANDLE m_handle;
};
#endif // BX_PLATFORM_
} // namespace bx
#endif // __BX_SEM_H__

View File

@@ -1,152 +1,193 @@
/*
* Copyright 2010-2012 Branimir Karadzic. All rights reserved.
* License: http://www.opensource.org/licenses/BSD-2-Clause
*/
#ifndef __BX_SPSCQUEUE_H__
#define __BX_SPSCQUEUE_H__
#include <list>
#include "bx.h"
#include "cpu.h"
#include "mutex.h"
#include "uint32_t.h"
namespace bx
{
// http://drdobbs.com/article/print?articleId=210604448&siteSectionName=
template <typename Ty>
class SpScUnboundedQueueLf
{
public:
SpScUnboundedQueueLf()
: m_first(new Node(NULL) )
, m_divider(m_first)
, m_last(m_first)
{
}
~SpScUnboundedQueueLf()
{
while (NULL != m_first)
{
Node* node = m_first;
m_first = node->m_next;
delete node;
}
}
void push(Ty* _ptr) // producer only
{
m_last->m_next = new Node(_ptr);
atomicExchangePtr((void**)&m_last, m_last->m_next);
while (m_first != m_divider)
{
Node* node = m_first;
m_first = m_first->m_next;
delete node;
}
}
Ty* peek() // consumer only
{
if (m_divider != m_last)
{
Ty* ptr = m_divider->m_next->m_ptr;
return ptr;
}
return NULL;
}
Ty* pop() // consumer only
{
if (m_divider != m_last)
{
Ty* ptr = m_divider->m_next->m_ptr;
atomicExchangePtr((void**)&m_divider, m_divider->m_next);
return ptr;
}
return NULL;
}
private:
SpScUnboundedQueueLf(const SpScUnboundedQueueLf& _rhs); // no copy constructor
SpScUnboundedQueueLf& operator=(const SpScUnboundedQueueLf& _rhs); // no assignment operator
struct Node
{
Node(Ty* _ptr)
: m_ptr(_ptr)
, m_next(NULL)
{
}
Ty* m_ptr;
Node* m_next;
};
Node* m_first;
Node* m_divider;
Node* m_last;
};
template<typename Ty>
class SpScUnboundedQueueMutex
{
public:
SpScUnboundedQueueMutex()
{
}
~SpScUnboundedQueueMutex()
{
BX_CHECK(m_queue.empty(), "Queue is not empty!");
}
void push(Ty* _item)
{
bx::LwMutexScope lock(m_mutex);
m_queue.push_back(_item);
}
Ty* peek()
{
bx::LwMutexScope lock(m_mutex);
if (!m_queue.empty() )
{
return m_queue.front();
}
return NULL;
}
Ty* pop()
{
bx::LwMutexScope lock(m_mutex);
if (!m_queue.empty() )
{
Ty* item = m_queue.front();
m_queue.pop_front();
return item;
}
return NULL;
}
private:
bx::LwMutex m_mutex;
std::list<Ty*> m_queue;
};
#if BX_CONFIG_SPSCQUEUE_USE_MUTEX
# define SpScUnboundedQueue SpScUnboundedQueueMutex
#else
# define SpScUnboundedQueue SpScUnboundedQueueLf
#endif // BX_CONFIG_SPSCQUEUE_USE_MUTEX
} // namespace bx
#endif // __BX_RINGBUFFER_H__
/*
* Copyright 2010-2012 Branimir Karadzic. All rights reserved.
* License: http://www.opensource.org/licenses/BSD-2-Clause
*/
#ifndef __BX_SPSCQUEUE_H__
#define __BX_SPSCQUEUE_H__
#include <list>
#include "bx.h"
#include "cpu.h"
#include "mutex.h"
#include "uint32_t.h"
namespace bx
{
// http://drdobbs.com/article/print?articleId=210604448&siteSectionName=
template <typename Ty>
class SpScUnboundedQueueLf
{
BX_CLASS_NO_COPY_NO_ASSIGNMENT(SpScUnboundedQueueLf);
public:
SpScUnboundedQueueLf()
: m_first(new Node(NULL) )
, m_divider(m_first)
, m_last(m_first)
{
}
~SpScUnboundedQueueLf()
{
while (NULL != m_first)
{
Node* node = m_first;
m_first = node->m_next;
delete node;
}
}
void push(Ty* _ptr) // producer only
{
m_last->m_next = new Node( (void*)_ptr);
atomicExchangePtr( (void**)&m_last, m_last->m_next);
while (m_first != m_divider)
{
Node* node = m_first;
m_first = m_first->m_next;
delete node;
}
}
Ty* peek() // consumer only
{
if (m_divider != m_last)
{
Ty* ptr = (Ty*)m_divider->m_next->m_ptr;
return ptr;
}
return NULL;
}
Ty* pop() // consumer only
{
if (m_divider != m_last)
{
Ty* ptr = (Ty*)m_divider->m_next->m_ptr;
atomicExchangePtr( (void**)&m_divider, m_divider->m_next);
return ptr;
}
return NULL;
}
private:
struct Node
{
Node(void* _ptr)
: m_ptr(_ptr)
, m_next(NULL)
{
}
void* m_ptr;
Node* m_next;
};
Node* m_first;
Node* m_divider;
Node* m_last;
};
template<typename Ty>
class SpScUnboundedQueueMutex
{
BX_CLASS_NO_COPY_NO_ASSIGNMENT(SpScUnboundedQueueMutex);
public:
SpScUnboundedQueueMutex()
{
}
~SpScUnboundedQueueMutex()
{
BX_CHECK(m_queue.empty(), "Queue is not empty!");
}
void push(Ty* _item)
{
bx::LwMutexScope lock(m_mutex);
m_queue.push_back(_item);
}
Ty* peek()
{
bx::LwMutexScope lock(m_mutex);
if (!m_queue.empty() )
{
return m_queue.front();
}
return NULL;
}
Ty* pop()
{
bx::LwMutexScope lock(m_mutex);
if (!m_queue.empty() )
{
Ty* item = m_queue.front();
m_queue.pop_front();
return item;
}
return NULL;
}
private:
bx::LwMutex m_mutex;
std::list<Ty*> m_queue;
};
#if BX_CONFIG_SPSCQUEUE_USE_MUTEX
# define SpScUnboundedQueue SpScUnboundedQueueMutex
#else
# define SpScUnboundedQueue SpScUnboundedQueueLf
#endif // BX_CONFIG_SPSCQUEUE_USE_MUTEX
template <typename Ty>
class SpScBlockingUnboundedQueue
{
BX_CLASS_NO_COPY_NO_ASSIGNMENT(SpScBlockingUnboundedQueue);
public:
SpScBlockingUnboundedQueue()
{
}
~SpScBlockingUnboundedQueue()
{
}
void push(Ty* _ptr) // producer only
{
m_queue.push( (void*)_ptr);
m_count.post();
}
Ty* peek() // consumer only
{
return (Ty*)m_queue.peek();
}
Ty* pop(int32_t _msecs = -1) // consumer only
{
if (m_count.wait(_msecs) )
{
return (Ty*)m_queue.pop();
}
return NULL;
}
private:
Semaphore m_count;
SpScUnboundedQueue<void> m_queue;
};
} // namespace bx
#endif // __BX_RINGBUFFER_H__

143
include/bx/thread.h Normal file
View File

@@ -0,0 +1,143 @@
/*
* Copyright 2010-2012 Branimir Karadzic. All rights reserved.
* License: http://www.opensource.org/licenses/BSD-2-Clause
*/
#ifndef __BX_THREAD_H__
#define __BX_THREAD_H__
#if BX_PLATFORM_POSIX
# include <pthread.h>
#endif // BX_PLATFORM_POSIX
#include "sem.h"
namespace bx
{
typedef int32_t (*ThreadFn)(void* _userData);
class Thread
{
BX_CLASS_NO_COPY_NO_ASSIGNMENT(Thread);
public:
Thread(ThreadFn _fn, void* _userData, uint32_t _size = 16<<10)
#if BX_PLATFORM_WINDOWS|BX_PLATFORM_XBOX360
: m_handle(INVALID_HANDLE_VALUE)
#elif BX_PLATFORM_POSIX
: m_handle(NULL)
#endif // BX_PLATFORM_
, m_fn(_fn)
, m_userData(_userData)
, m_stackSize(_size)
, m_exitCode(EXIT_SUCCESS)
, m_running(false)
{
}
virtual ~Thread()
{
if (m_running)
{
shutdown();
}
}
void init()
{
BX_CHECK(!m_running, "Already running!");
m_running = true;
#if BX_PLATFORM_WINDOWS|BX_PLATFORM_XBOX360
m_handle = CreateThread(NULL
, m_stackSize
, threadFunc
, this
, 0
, NULL
);
#elif BX_PLATFORM_POSIX
int result;
pthread_attr_t attr;
result = pthread_attr_init(&attr);
BX_CHECK(0 == result, "pthread_attr_init failed! %d", result);
result = pthread_attr_setstacksize(&attr, m_stackSize);
BX_CHECK(0 == result, "pthread_attr_setstacksize failed! %d", result);
// sched_param sched;
// sched.sched_priority = 0;
// result = pthread_attr_setschedparam(&attr, &sched);
// BX_CHECK(0 == result, "pthread_attr_setschedparam failed! %d", result);
result = pthread_create(&m_handle, &attr, &threadFunc, this);
BX_CHECK(0 == result, "pthread_attr_setschedparam failed! %d", result);
#endif // BX_PLATFORM_
m_sem.wait();
}
void shutdown()
{
BX_CHECK(m_running, "Not running!");
#if BX_PLATFORM_WINDOWS|BX_PLATFORM_XBOX360
WaitForSingleObject(m_handle, INFINITE);
GetExitCodeThread(m_handle, (DWORD*)&m_exitCode);
CloseHandle(m_handle);
m_handle = INVALID_HANDLE_VALUE;
#elif BX_PLATFORM_POSIX
void* result;
pthread_join(m_handle, &result);
m_exitCode = reinterpret_cast<int32_t>(result);
m_handle = NULL;
#endif // BX_PLATFORM_
m_running = false;
}
bool isRunning() const
{
return m_running;
}
private:
int32_t entry()
{
m_sem.post();
return m_fn(m_userData);
}
#if BX_PLATFORM_WINDOWS|BX_PLATFORM_XBOX360
static DWORD WINAPI threadFunc(LPVOID _arg)
{
Thread* thread = (Thread*)_arg;
int32_t result = thread->entry();
return result;
}
#else
static void* threadFunc(void* _arg)
{
Thread* thread = (Thread*)_arg;
int32_t result = thread->entry();
return reinterpret_cast<void*>(result);
}
#endif // BX_PLATFORM_
#if BX_PLATFORM_WINDOWS|BX_PLATFORM_XBOX360
HANDLE m_handle;
#elif BX_PLATFORM_POSIX
pthread_t m_handle;
#endif // BX_PLATFORM_
ThreadFn m_fn;
void* m_userData;
Semaphore m_sem;
uint32_t m_stackSize;
int32_t m_exitCode;
bool m_running;
};
} // namespace bx
#endif // __BX_THREAD_H__