This commit is contained in:
Branimir Karadžić
2017-02-16 20:41:10 -08:00
parent c0e310c1f2
commit e70a583103
2 changed files with 244 additions and 82 deletions

View File

@@ -8,14 +8,11 @@
#include "bx.h"
#include "cpu.h"
#include "mutex.h"
#include "semaphore.h"
#include "uint32_t.h"
namespace bx
{
// http://drdobbs.com/article/print?articleId=210604448&siteSectionName=
template <typename Ty>
///
class SpScUnboundedQueue
{
BX_CLASS(SpScUnboundedQueue
@@ -24,66 +21,26 @@ namespace bx
);
public:
SpScUnboundedQueue()
: m_first(new Node(NULL) )
, m_divider(m_first)
, m_last(m_first)
{
}
///
SpScUnboundedQueue();
~SpScUnboundedQueue()
{
while (NULL != m_first)
{
Node* node = m_first;
m_first = node->m_next;
delete node;
}
}
///
~SpScUnboundedQueue();
void push(Ty* _ptr) // producer only
{
m_last->m_next = new Node( (void*)_ptr);
atomicExchangePtr( (void**)&m_last, m_last->m_next);
while (m_first != m_divider)
{
Node* node = m_first;
m_first = m_first->m_next;
delete node;
}
}
///
void push(void* _ptr);
Ty* peek() // consumer only
{
if (m_divider != m_last)
{
Ty* ptr = (Ty*)m_divider->m_next->m_ptr;
return ptr;
}
///
void* peek();
return NULL;
}
Ty* pop() // consumer only
{
if (m_divider != m_last)
{
Ty* ptr = (Ty*)m_divider->m_next->m_ptr;
atomicExchangePtr( (void**)&m_divider, m_divider->m_next);
return ptr;
}
return NULL;
}
///
void* pop();
private:
struct Node
{
Node(void* _ptr)
: m_ptr(_ptr)
, m_next(NULL)
{
}
///
Node(void* _ptr);
void* m_ptr;
Node* m_next;
@@ -94,8 +51,38 @@ namespace bx
Node* m_last;
};
///
template<typename Ty>
class SpScUnboundedQueueT
{
BX_CLASS(SpScUnboundedQueueT
, NO_COPY
, NO_ASSIGNMENT
);
public:
///
SpScUnboundedQueueT();
///
~SpScUnboundedQueueT();
///
void push(Ty* _ptr);
///
Ty* peek();
///
Ty* pop();
private:
SpScUnboundedQueue m_queue;
};
#if BX_CONFIG_SUPPORTS_THREADING
template <typename Ty>
///
class SpScBlockingUnboundedQueue
{
BX_CLASS(SpScBlockingUnboundedQueue
@@ -104,41 +91,58 @@ namespace bx
);
public:
SpScBlockingUnboundedQueue()
{
}
///
SpScBlockingUnboundedQueue();
~SpScBlockingUnboundedQueue()
{
}
///
~SpScBlockingUnboundedQueue();
void push(Ty* _ptr) // producer only
{
m_queue.push( (void*)_ptr);
m_count.post();
}
///
void push(void* _ptr); // producer only
Ty* peek() // consumer only
{
return (Ty*)m_queue.peek();
}
///
void* peek(); // consumer only
Ty* pop(int32_t _msecs = -1) // consumer only
{
if (m_count.wait(_msecs) )
{
return (Ty*)m_queue.pop();
}
return NULL;
}
///
void* pop(int32_t _msecs = -1); // consumer only
private:
Semaphore m_count;
SpScUnboundedQueue<void> m_queue;
SpScUnboundedQueue m_queue;
};
///
template<typename Ty>
class SpScBlockingUnboundedQueueT
{
BX_CLASS(SpScBlockingUnboundedQueueT
, NO_COPY
, NO_ASSIGNMENT
);
public:
///
SpScBlockingUnboundedQueueT();
///
~SpScBlockingUnboundedQueueT();
///
void push(Ty* _ptr); // producer only
///
Ty* peek(); // consumer only
///
Ty* pop(int32_t _msecs = -1); // consumer only
private:
SpScBlockingUnboundedQueue m_queue;
};
#endif // BX_CONFIG_SUPPORTS_THREADING
} // namespace bx
#include "spscqueue.inl"
#endif // BX_SPSCQUEUE_H_HEADER_GUARD

158
include/bx/spscqueue.inl Normal file
View File

@@ -0,0 +1,158 @@
/*
* Copyright 2010-2017 Branimir Karadzic. All rights reserved.
* License: https://github.com/bkaradzic/bx#license-bsd-2-clause
*/
#ifndef BX_SPSCQUEUE_H_HEADER_GUARD
# error "Must be included from bx/spscqueue.h!"
#endif // BX_SPSCQUEUE_H_HEADER_GUARD
namespace bx
{
// http://drdobbs.com/article/print?articleId=210604448&siteSectionName=
inline SpScUnboundedQueue::SpScUnboundedQueue()
: m_first(new Node(NULL) )
, m_divider(m_first)
, m_last(m_first)
{
}
inline SpScUnboundedQueue::~SpScUnboundedQueue()
{
while (NULL != m_first)
{
Node* node = m_first;
m_first = node->m_next;
delete node;
}
}
inline void SpScUnboundedQueue::push(void* _ptr)
{
m_last->m_next = new Node( (void*)_ptr);
atomicExchangePtr( (void**)&m_last, m_last->m_next);
while (m_first != m_divider)
{
Node* node = m_first;
m_first = m_first->m_next;
delete node;
}
}
inline void* SpScUnboundedQueue::peek()
{
if (m_divider != m_last)
{
return m_divider->m_next->m_ptr;
}
return NULL;
}
inline void* SpScUnboundedQueue::pop()
{
if (m_divider != m_last)
{
void* ptr = m_divider->m_next->m_ptr;
atomicExchangePtr( (void**)&m_divider, m_divider->m_next);
return ptr;
}
return NULL;
}
inline SpScUnboundedQueue::Node::Node(void* _ptr)
: m_ptr(_ptr)
, m_next(NULL)
{
}
template<typename Ty>
inline SpScUnboundedQueueT<Ty>::SpScUnboundedQueueT()
{
}
template<typename Ty>
inline SpScUnboundedQueueT<Ty>::~SpScUnboundedQueueT()
{
}
template<typename Ty>
inline void SpScUnboundedQueueT<Ty>::push(Ty* _ptr)
{
m_queue.push(_ptr);
}
template<typename Ty>
inline Ty* SpScUnboundedQueueT<Ty>::peek()
{
return (Ty*)m_queue.peek();
}
template<typename Ty>
inline Ty* SpScUnboundedQueueT<Ty>::pop()
{
return (Ty*)m_queue.pop();
}
#if BX_CONFIG_SUPPORTS_THREADING
inline SpScBlockingUnboundedQueue::SpScBlockingUnboundedQueue()
{
}
inline SpScBlockingUnboundedQueue::~SpScBlockingUnboundedQueue()
{
}
inline void SpScBlockingUnboundedQueue::push(void* _ptr)
{
m_queue.push( (void*)_ptr);
m_count.post();
}
inline void* SpScBlockingUnboundedQueue::peek()
{
return m_queue.peek();
}
inline void* SpScBlockingUnboundedQueue::pop(int32_t _msecs)
{
if (m_count.wait(_msecs) )
{
return m_queue.pop();
}
return NULL;
}
template<typename Ty>
inline SpScBlockingUnboundedQueueT<Ty>::SpScBlockingUnboundedQueueT()
{
}
template<typename Ty>
inline SpScBlockingUnboundedQueueT<Ty>::~SpScBlockingUnboundedQueueT()
{
}
template<typename Ty>
inline void SpScBlockingUnboundedQueueT<Ty>::push(Ty* _ptr)
{
m_queue.push(_ptr);
}
template<typename Ty>
inline Ty* SpScBlockingUnboundedQueueT<Ty>::peek()
{
return (Ty*)m_queue.peek();
}
template<typename Ty>
inline Ty* SpScBlockingUnboundedQueueT<Ty>::pop(int32_t _msecs)
{
return (Ty*)m_queue.pop();
}
#endif // BX_CONFIG_SUPPORTS_THREADING
} // namespace bx