Skip to content

Commit 4b87834

Browse files
committed
更新消息队列
1 parent 6589a37 commit 4b87834

8 files changed

Lines changed: 197 additions & 7 deletions

File tree

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
#ifndef _ARRAYLOCKFREEQUEUE_H___
2+
#define _ARRAYLOCKFREEQUEUE_H___
3+
4+
#include <stdint.h>
5+
6+
#ifdef _WIN64
7+
#define QUEUE_INT LONG64
8+
#else
9+
#define QUEUE_INT unsigned long
10+
#endif
11+
12+
#define ARRAY_LOCK_FREE_Q_DEFAULT_SIZE 65535 // 2^16
13+
14+
template <typename ELEM_T, QUEUE_INT Q_SIZE = ARRAY_LOCK_FREE_Q_DEFAULT_SIZE>
15+
class ArrayLockFreeQueue
16+
{
17+
public:
18+
19+
ArrayLockFreeQueue();
20+
virtual ~ArrayLockFreeQueue();
21+
22+
QUEUE_INT size();
23+
24+
bool enqueue(const ELEM_T &a_data);
25+
26+
bool dequeue(ELEM_T &a_data);
27+
28+
bool try_dequeue(ELEM_T &a_data);
29+
30+
private:
31+
32+
ELEM_T m_thequeue[Q_SIZE];
33+
34+
volatile QUEUE_INT m_count;
35+
volatile QUEUE_INT m_writeIndex;
36+
37+
volatile QUEUE_INT m_readIndex;
38+
39+
volatile QUEUE_INT m_maximumReadIndex;
40+
41+
inline QUEUE_INT countToIndex(QUEUE_INT a_count);
42+
};
43+
44+
#include "ArrayLockFreeQueueImp.h"
45+
46+
#endif
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
#ifndef _ARRAYLOCKFREEQUEUEIMP_H___
2+
#define _ARRAYLOCKFREEQUEUEIMP_H___
3+
#include "ArrayLockFreeQueue.h"
4+
5+
#include <assert.h>
6+
#include "atom_opt.h"
7+
8+
template <typename ELEM_T, QUEUE_INT Q_SIZE>
9+
ArrayLockFreeQueue<ELEM_T, Q_SIZE>::ArrayLockFreeQueue() :
10+
m_writeIndex(0),
11+
m_readIndex(0),
12+
m_maximumReadIndex(0)
13+
{
14+
15+
m_count = 0;
16+
17+
}
18+
19+
template <typename ELEM_T, QUEUE_INT Q_SIZE>
20+
ArrayLockFreeQueue<ELEM_T, Q_SIZE>::~ArrayLockFreeQueue()
21+
{
22+
23+
}
24+
25+
template <typename ELEM_T, QUEUE_INT Q_SIZE>
26+
inline QUEUE_INT ArrayLockFreeQueue<ELEM_T, Q_SIZE>::countToIndex(QUEUE_INT a_count)
27+
{
28+
return (a_count % Q_SIZE);
29+
}
30+
31+
template <typename ELEM_T, QUEUE_INT Q_SIZE>
32+
QUEUE_INT ArrayLockFreeQueue<ELEM_T, Q_SIZE>::size()
33+
{
34+
QUEUE_INT currentWriteIndex = m_writeIndex;
35+
QUEUE_INT currentReadIndex = m_readIndex;
36+
37+
if(currentWriteIndex>=currentReadIndex)
38+
return currentWriteIndex - currentReadIndex;
39+
else
40+
return Q_SIZE + currentWriteIndex - currentReadIndex;
41+
42+
}
43+
44+
template <typename ELEM_T, QUEUE_INT Q_SIZE>
45+
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::enqueue(const ELEM_T &a_data)
46+
{
47+
QUEUE_INT currentWriteIndex;
48+
QUEUE_INT currentReadIndex;
49+
do
50+
{
51+
currentWriteIndex = m_writeIndex;
52+
currentReadIndex = m_readIndex;
53+
if(countToIndex(currentWriteIndex + 1) ==
54+
countToIndex(currentReadIndex))
55+
{
56+
return false;
57+
}
58+
} while(!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex+1)));
59+
60+
m_thequeue[countToIndex(currentWriteIndex)] = a_data;
61+
62+
while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
63+
{
64+
sched_yield();
65+
}
66+
67+
AtomicAdd(&m_count, 1);
68+
69+
return true;
70+
71+
}
72+
73+
template <typename ELEM_T, QUEUE_INT Q_SIZE>
74+
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::try_dequeue(ELEM_T &a_data)
75+
{
76+
return dequeue(a_data);
77+
}
78+
79+
template <typename ELEM_T, QUEUE_INT Q_SIZE>
80+
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::dequeue(ELEM_T &a_data)
81+
{
82+
QUEUE_INT currentMaximumReadIndex;
83+
QUEUE_INT currentReadIndex;
84+
85+
do
86+
{
87+
currentReadIndex = m_readIndex;
88+
currentMaximumReadIndex = m_maximumReadIndex;
89+
90+
if(countToIndex(currentReadIndex) ==
91+
countToIndex(currentMaximumReadIndex))
92+
{
93+
return false;
94+
}
95+
96+
a_data = m_thequeue[countToIndex(currentReadIndex)];
97+
98+
if(CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
99+
{
100+
AtomicSub(&m_count, 1);
101+
return true;
102+
}
103+
} while(true);
104+
105+
assert(0);
106+
107+
return false;
108+
109+
}
110+
111+
#endif

QuantBox_Queue/MsgQueue.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#include "stdafx.h"
22
#include "MsgQueue.h"
33

4-
CMsgQueue::CMsgQueue():m_queue(1024)
4+
CMsgQueue::CMsgQueue()//:m_queue(1024)
55
{
66
m_hThread = nullptr;
77
m_bRunning = false;
@@ -71,10 +71,13 @@ void CMsgQueue::StartThread()
7171
void CMsgQueue::StopThread()
7272
{
7373
m_bRunning = false;
74+
this_thread::sleep_for(chrono::milliseconds(1));
7475
m_cv.notify_all();
76+
this_thread::sleep_for(chrono::milliseconds(1));
7577
lock_guard<mutex> cl(m_mtx_del);
7678
if(m_hThread)
7779
{
80+
//m_cv.notify_all();
7881
m_hThread->join();
7982
delete m_hThread;
8083
m_hThread = nullptr;
@@ -92,7 +95,7 @@ void CMsgQueue::RunInThread()
9295
else
9396
{
9497
// 空闲时等1ms,如果立即有事件过来就晚了1ms
95-
//this_thread::sleep_for(chrono::milliseconds(1));
98+
//this_thread::sleep_for(chrono::milliseconds(1));
9699

97100
// 空闲时过来等1ms,没等到就回去再试
98101
// 如过正好等到了,就立即去试,应当会快一点吧?

QuantBox_Queue/MsgQueue.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@
1313
#include "../include/ApiStruct.h"
1414

1515
//#include "readerwriterqueue.h"
16-
#include "concurrentqueue.h"
16+
//#include "concurrentqueue.h"
17+
#include "ArrayLockFreeQueue.h"
18+
1719

1820
using namespace std;
19-
using namespace moodycamel;
21+
//using namespace moodycamel;
2022

2123
#pragma warning(push)
2224
#pragma warning(disable:4251)
@@ -222,7 +224,9 @@ class DLL_PUBLIC CMsgQueue
222224
thread* m_hThread;
223225

224226
private:
225-
ConcurrentQueue<ResponeItem*> m_queue;
227+
// ConcurrentQueue<ResponeItem*> m_queue;
228+
ArrayLockFreeQueue<ResponeItem*> m_queue;
229+
226230
fnOnRespone m_fnOnRespone;
227231
void* m_pClass;
228232
};

QuantBox_Queue/QuantBox_Queue.vcxproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@
9595
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
9696
<LinkIncremental>true</LinkIncremental>
9797
<TargetName>QuantBox_Queue_x86</TargetName>
98-
<OutDir>$(SolutionDir)$(Configuration)\</OutDir>
98+
<OutDir>C:\Program Files\SmartQuant Ltd\OpenQuant 2014\XAPI\LTS\x86</OutDir>
9999
</PropertyGroup>
100100
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
101101
<LinkIncremental>true</LinkIncremental>

QuantBox_Queue/atom_opt.h

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#ifndef _ATOM_OPT_H___
2+
#define _ATOM_OPT_H___
3+
4+
#ifdef __GNUC__
5+
#define CAS(a_ptr, a_oldVal, a_newVal) __sync_bool_compare_and_swap(a_ptr, a_oldVal, a_newVal)
6+
#define AtomicAdd(a_ptr,a_count) __sync_fetch_and_add (a_ptr, a_count)
7+
#define AtomicSub(a_ptr,a_count) __sync_fetch_and_sub (a_ptr, a_count)
8+
#include <sched.h> // sched_yield()
9+
#else
10+
11+
#include <Windows.h>
12+
#ifdef _WIN64
13+
#define CAS(a_ptr, a_oldVal, a_newVal) (a_oldVal == InterlockedCompareExchange64(a_ptr, a_newVal, a_oldVal))
14+
#define sched_yield() SwitchToThread()
15+
#define AtomicAdd(a_ptr, num) InterlockedIncrement64(a_ptr)
16+
#define AtomicSub(a_ptr, num) InterlockedDecrement64(a_ptr)
17+
#else
18+
#define CAS(a_ptr, a_oldVal, a_newVal) (a_oldVal == InterlockedCompareExchange(a_ptr, a_newVal, a_oldVal))
19+
#define sched_yield() SwitchToThread()
20+
#define AtomicAdd(a_ptr, num) InterlockedIncrement(a_ptr)
21+
#define AtomicSub(a_ptr, num) InterlockedDecrement(a_ptr)
22+
#endif
23+
24+
#endif
25+
26+
#endif

QuantBox_Queue/main.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ void* __stdcall XRequest(char type, void* pApi1, void* pApi2, double double1, do
2020
case GetApiType:
2121
return nullptr;
2222
case GetApiVersion:
23-
return (void*)"0.3.0.20150402";
23+
return (void*)"0.4.0.20150526";
2424
case GetApiName:
2525
return (void*)"Queue";
2626
case Create:

lib/QuantBox_Queue_x86.lib

1.66 KB
Binary file not shown.

0 commit comments

Comments
 (0)