This commit is contained in:
Branimir Karadžić
2017-02-16 21:32:40 -08:00
parent 0a48f50929
commit 1935b6a9fa
3 changed files with 128 additions and 39 deletions

View File

@@ -6,48 +6,42 @@
#ifndef BX_MPSCQUEUE_H_HEADER_GUARD
#define BX_MPSCQUEUE_H_HEADER_GUARD
#include "mutex.h"
#include "spscqueue.h"
namespace bx
{
///
template <typename Ty>
class MpScUnboundedQueue
class MpScUnboundedQueueT
{
BX_CLASS(MpScUnboundedQueue
BX_CLASS(MpScUnboundedQueueT
, NO_COPY
, NO_ASSIGNMENT
);
public:
MpScUnboundedQueue()
{
}
///
MpScUnboundedQueueT();
~MpScUnboundedQueue()
{
}
///
~MpScUnboundedQueueT();
void push(Ty* _ptr) // producer only
{
MutexScope lock(m_write);
m_queue.push(_ptr);
}
///
void push(Ty* _ptr); // producer only
Ty* peek() // consumer only
{
return m_queue.peek();
}
///
Ty* peek(); // consumer only
Ty* pop() // consumer only
{
return m_queue.pop();
}
///
Ty* pop(); // consumer only
private:
Mutex m_write;
SpScUnboundedQueue<Ty> m_queue;
SpScUnboundedQueueT<Ty> m_queue;
};
///
template <typename Ty>
class MpScUnboundedBlockingQueue
{
@@ -57,31 +51,25 @@ namespace bx
);
public:
MpScUnboundedBlockingQueue()
{
}
///
MpScUnboundedBlockingQueue();
~MpScUnboundedBlockingQueue()
{
}
///
~MpScUnboundedBlockingQueue();
void push(Ty* _ptr) // producer only
{
m_queue.push(_ptr);
m_sem.post();
}
///
void push(Ty* _ptr); // producer only
Ty* pop() // consumer only
{
m_sem.wait();
return m_queue.pop();
}
///
Ty* pop(); // consumer only
private:
MpScUnboundedQueue<Ty> m_queue;
MpScUnboundedQueueT<Ty> m_queue;
Semaphore m_sem;
};
} // namespace bx
#include "mpscqueue.inl"
#endif // BX_MPSCQUEUE_H_HEADER_GUARD

67
include/bx/mpscqueue.inl Normal file
View File

@@ -0,0 +1,67 @@
/*
* Copyright 2010-2017 Branimir Karadzic. All rights reserved.
* License: https://github.com/bkaradzic/bx#license-bsd-2-clause
*/
#ifndef BX_MPSCQUEUE_H_HEADER_GUARD
# error "Must be included from bx/mpscqueue.h!"
#endif // BX_MPSCQUEUE_H_HEADER_GUARD
#include "spscqueue.h"
namespace bx
{
template <typename Ty>
inline MpScUnboundedQueueT<Ty>::MpScUnboundedQueueT()
{
}
template <typename Ty>
inline MpScUnboundedQueueT<Ty>::~MpScUnboundedQueueT()
{
}
template <typename Ty>
inline void MpScUnboundedQueueT<Ty>::push(Ty* _ptr)
{
MutexScope lock(m_write);
m_queue.push(_ptr);
}
template <typename Ty>
inline Ty* MpScUnboundedQueueT<Ty>::peek()
{
return m_queue.peek();
}
template <typename Ty>
inline Ty* MpScUnboundedQueueT<Ty>::pop()
{
return m_queue.pop();
}
template <typename Ty>
inline MpScUnboundedBlockingQueue<Ty>::MpScUnboundedBlockingQueue()
{
}
template <typename Ty>
inline MpScUnboundedBlockingQueue<Ty>::~MpScUnboundedBlockingQueue()
{
}
template <typename Ty>
inline void MpScUnboundedBlockingQueue<Ty>::push(Ty* _ptr)
{
m_queue.push(_ptr);
m_sem.post();
}
template <typename Ty>
inline Ty* MpScUnboundedBlockingQueue<Ty>::pop()
{
m_sem.wait();
return m_queue.pop();
}
} // namespace bx

34
tests/queue_test.cpp Normal file
View File

@@ -0,0 +1,34 @@
/*
* Copyright 2010-2017 Branimir Karadzic. All rights reserved.
* License: https://github.com/bkaradzic/bx#license-bsd-2-clause
*/
#include "test.h"
#include <bx/spscqueue.h>
#include <bx/mpscqueue.h>
void* bitsToPtr(uintptr_t _ui)
{
union { uintptr_t ui; void* ptr; } cast = { _ui };
return cast.ptr;
}
uintptr_t ptrToBits(void* _ptr)
{
union { void* ptr; uintptr_t ui; } cast = { _ptr };
return cast.ui;
}
TEST_CASE("SpSc", "")
{
bx::SpScUnboundedQueue queue;
queue.push(bitsToPtr(0xdeadbeef) );
REQUIRE(0xdeadbeef == ptrToBits(queue.pop() ) );
}
TEST_CASE("MpSc", "")
{
bx::MpScUnboundedQueueT<void> queue;
queue.push(bitsToPtr(0xdeadbeef) );
REQUIRE(0xdeadbeef == ptrToBits(queue.pop() ) );
}