forked from sttp/cppapi
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathThreadPool.cpp
More file actions
150 lines (120 loc) · 4.2 KB
/
ThreadPool.cpp
File metadata and controls
150 lines (120 loc) · 4.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
//******************************************************************************************************
// ThreadPool.cpp - Gbtc
//
// Copyright © 2020, Grid Protection Alliance. All Rights Reserved.
//
// Licensed to the Grid Protection Alliance (GPA) under one or more contributor license agreements. See
// the NOTICE file distributed with this work for additional information regarding copyright ownership.
// The GPA licenses this file to you under the MIT License (MIT), the "License"; you may not use this
// file except in compliance with the License. You may obtain a copy of the License at:
//
// http://opensource.org/licenses/MIT
//
// Unless agreed to in writing, the subject software distributed under the License is distributed on an
// "AS-IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. Refer to the
// License for the specific language governing permissions and limitations.
//
// Code Modification History:
// ----------------------------------------------------------------------------------------------------
// 12/27/2020 - J. Ritchie Carroll
// Generated original version of source code.
//
//******************************************************************************************************
#include "ThreadPool.h"
using namespace std;
using namespace sttp;
ThreadPool::ThreadPool() :
m_disposing(false)
{
m_removeCompletedTimersThread = Thread([this]()
{
while (!m_disposing)
{
m_completedTimers.WaitForData();
if (m_disposing)
break;
// Dequeue next completed timer
const TimerPtr timer = m_completedTimers.Dequeue();
while (timer->IsRunning() && !m_disposing)
ThreadSleep(10);
if (m_disposing)
break;
// Remove timer from reference set
ScopeLock lock(m_waitTimersLock);
m_waitTimers.erase(timer);
}
});
}
ThreadPool::~ThreadPool() noexcept
{
try
{
ShutDown();
}
catch (...)
{
// ReSharper disable once CppRedundantControlFlowJump
return;
}
}
void ThreadPool::ShutDown()
{
if (m_disposing)
return;
m_disposing = true;
m_completedTimers.Release();
m_removeCompletedTimersThread.join();
ScopeLock lock(m_waitTimersLock);
for (const auto& currentTimer : m_waitTimers)
currentTimer->Stop(); // This joins thread if running
}
void ThreadPool::Queue(const std::function<void()>& action)
{
Queue(0, nullptr, [action](void*){ action(); });
}
void ThreadPool::Queue(void* state, const std::function<void(void*)>& action)
{
Queue(0, state, action);
}
void ThreadPool::Queue(const uint32_t delay, const std::function<void()>& action)
{
Queue(delay, nullptr, [action](void*){ action(); });
}
void ThreadPool::Queue(const uint32_t delay, void* state, const std::function<void(void*)>& action)
{
if (m_disposing)
return;
const TimerPtr waitTimer = NewSharedPtr<Timer>(delay, [&,this,action,state](const TimerPtr& timer, void*)
{
if (m_disposing)
return;
// Execute action after specified delay (zero for immediate execution)
if (action != nullptr)
action(state);
ScopeLock lock(m_waitTimersLock);
if (m_disposing)
return;
TimerPtr targetTimer = Timer::NullPtr;
// Find this timer in reference set
for (const auto& currentTimer : m_waitTimers)
{
if (currentTimer == timer)
{
targetTimer = currentTimer;
break;
}
}
if (targetTimer == nullptr || m_disposing)
return;
// Queue timer for removal from reference set which will cause its immediate
// destruction - cannot remove here before timer callback completes
m_completedTimers.Enqueue(targetTimer);
});
ScopeLock lock(m_waitTimersLock);
if (m_disposing)
return;
// Keep a reference to timer in a set so it will have scope beyond this function
m_waitTimers.insert(waitTimer);
// Start timer
waitTimer->Start();
}