Skip to content

Commit f85b486

Browse files
committed
[perf] simplify some code.
1 parent a64abf1 commit f85b486

5 files changed

Lines changed: 33 additions & 46 deletions

File tree

src/GraphCtrl/GraphElement/GElement.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ CStatus GElement::fatProcessor(const CFunctionType& type) {
221221
switch (type) {
222222
case CFunctionType::RUN: {
223223
if (0 == trigger_times_) {
224-
/** 第一次执行的时候, */
224+
/** 第一次执行的时候,预先执行一下 prepareRun方法 */
225225
status = prepareRun();
226226
CGRAPH_FUNCTION_CHECK_STATUS
227227
}
@@ -232,7 +232,7 @@ CStatus GElement::fatProcessor(const CFunctionType& type) {
232232
status = doAspect(GAspectType::BEGIN_RUN);
233233
CGRAPH_FUNCTION_CHECK_STATUS
234234
do {
235-
status = (!isAsync()) ? run() : asyncRun();
235+
status = isAsync() ? asyncRun() : run();
236236
/**
237237
* 在实际run结束之后,首先需要判断一下是否进入yield状态了。
238238
* 接下来,如果状态是ok的,并且被条件hold住,则循环执行

src/UtilsCtrl/ThreadPool/Queue/UWorkStealingQueue.h

Lines changed: 15 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ class UWorkStealingQueue : public UQueueObject {
2525
*/
2626
CVoid push(T&& value) {
2727
while (true) {
28-
if (lock_.try_lock()) {
28+
if (mutex_.try_lock()) {
2929
deque_.emplace_back(std::forward<T>(value));
30-
lock_.unlock();
30+
mutex_.unlock();
3131
break;
3232
} else {
3333
std::this_thread::yield();
@@ -43,9 +43,9 @@ class UWorkStealingQueue : public UQueueObject {
4343
*/
4444
CBool tryPush(T&& value) {
4545
CBool result = false;
46-
if (lock_.try_lock()) {
46+
if (mutex_.try_lock()) {
4747
deque_.emplace_back(std::forward<T>(value));
48-
lock_.unlock();
48+
mutex_.unlock();
4949
result = true;
5050
}
5151
return result;
@@ -56,13 +56,13 @@ class UWorkStealingQueue : public UQueueObject {
5656
* 向队列中写入信息
5757
* @param values
5858
*/
59-
CVoid push(std::vector<T>& values) {
59+
CVoid push(const std::vector<T>& values) {
6060
while (true) {
61-
if (lock_.try_lock()) {
61+
if (mutex_.try_lock()) {
6262
for (auto& value : values) {
6363
deque_.emplace_back(value);
6464
}
65-
lock_.unlock();
65+
mutex_.unlock();
6666
break;
6767
} else {
6868
std::this_thread::yield();
@@ -71,24 +71,6 @@ class UWorkStealingQueue : public UQueueObject {
7171
}
7272

7373

74-
/**
75-
* 尝试批量写入内容
76-
* @param values
77-
* @return
78-
*/
79-
CBool tryPush(std::vector<T>& values) {
80-
CBool result = false;
81-
if (lock_.try_lock()) {
82-
for (const auto& value : values) {
83-
deque_.emplace_back(std::forward<T>(value));
84-
}
85-
lock_.unlock();
86-
result = true;
87-
}
88-
return result;
89-
}
90-
91-
9274
/**
9375
* 弹出节点,从头部进行
9476
* @param value
@@ -97,13 +79,13 @@ class UWorkStealingQueue : public UQueueObject {
9779
CBool tryPop(T& value) {
9880
// 这里不使用raii锁,主要是考虑到多线程的情况下,可能会重复进入
9981
bool result = false;
100-
if (!deque_.empty() && lock_.try_lock()) {
82+
if (!deque_.empty() && mutex_.try_lock()) {
10183
if (!deque_.empty()) {
10284
value = std::forward<T>(deque_.front()); // 从前方弹出
10385
deque_.pop_front();
10486
result = true;
10587
}
106-
lock_.unlock();
88+
mutex_.unlock();
10789
}
10890

10991
return result;
@@ -118,13 +100,13 @@ class UWorkStealingQueue : public UQueueObject {
118100
*/
119101
CBool tryPop(std::vector<T>& values, int maxLocalBatchSize) {
120102
bool result = false;
121-
if (!deque_.empty() && lock_.try_lock()) {
103+
if (!deque_.empty() && mutex_.try_lock()) {
122104
while (!deque_.empty() && maxLocalBatchSize--) {
123105
values.emplace_back(std::forward<T>(deque_.front()));
124106
deque_.pop_front();
125107
result = true;
126108
}
127-
lock_.unlock();
109+
mutex_.unlock();
128110
}
129111

130112
return result;
@@ -138,13 +120,13 @@ class UWorkStealingQueue : public UQueueObject {
138120
*/
139121
CBool trySteal(T& value) {
140122
bool result = false;
141-
if (!deque_.empty() && lock_.try_lock()) {
123+
if (!deque_.empty() && mutex_.try_lock()) {
142124
if (!deque_.empty()) {
143125
value = std::forward<T>(deque_.back()); // 从后方窃取
144126
deque_.pop_back();
145127
result = true;
146128
}
147-
lock_.unlock();
129+
mutex_.unlock();
148130
}
149131

150132
return result;
@@ -158,13 +140,13 @@ class UWorkStealingQueue : public UQueueObject {
158140
*/
159141
CBool trySteal(std::vector<T>& values, int maxStealBatchSize) {
160142
bool result = false;
161-
if (!deque_.empty() && lock_.try_lock()) {
143+
if (!deque_.empty() && mutex_.try_lock()) {
162144
while (!deque_.empty() && maxStealBatchSize--) {
163145
values.emplace_back(std::forward<T>(deque_.back()));
164146
deque_.pop_back();
165147
result = true;
166148
}
167-
lock_.unlock();
149+
mutex_.unlock();
168150
}
169151

170152
return result; // 如果非空,表示盗取成功
@@ -176,7 +158,6 @@ class UWorkStealingQueue : public UQueueObject {
176158

177159
private:
178160
std::deque<T> deque_; // 存放任务的双向队列
179-
std::mutex lock_; // 用于处理deque_的锁
180161
};
181162

182163
CGRAPH_NAMESPACE_END

src/UtilsCtrl/ThreadPool/Task/UTask.h

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,28 +18,30 @@
1818
CGRAPH_NAMESPACE_BEGIN
1919

2020
class UTask : public UThreadObject {
21-
struct taskBased {
22-
explicit taskBased() = default;
21+
struct TaskBased {
22+
explicit TaskBased() = default;
2323
virtual CVoid call() = 0;
24-
virtual ~taskBased() = default;
24+
virtual ~TaskBased() = default;
2525
};
2626

2727
// 退化以获得实际类型,修改思路参考:https://github.com/ChunelFeng/CThreadPool/pull/3
2828
template<typename F, typename T = typename std::decay<F>::type>
29-
struct taskDerided : taskBased {
29+
struct TaskDerided : TaskBased {
3030
T func_;
31-
explicit taskDerided(F&& func) : func_(std::forward<F>(func)) {}
31+
explicit TaskDerided(F&& func) : func_(std::forward<F>(func)) {}
3232
CVoid call() override { func_(); }
3333
};
3434

3535
public:
3636
template<typename F>
37-
UTask(F&& f, int priority = 0)
38-
: impl_(new taskDerided<F>(std::forward<F>(f)))
37+
UTask(F&& func, int priority = 0)
38+
: impl_(new TaskDerided<F>(std::forward<F>(func)))
3939
, priority_(priority) {}
4040

4141
CVoid operator()() {
42-
impl_->call();
42+
if (likely(impl_)) {
43+
impl_->call();
44+
}
4345
}
4446

4547
UTask() = default;
@@ -65,7 +67,7 @@ class UTask : public UThreadObject {
6567
CGRAPH_NO_ALLOWED_COPY(UTask)
6668

6769
private:
68-
std::unique_ptr<taskBased> impl_ = nullptr;
70+
std::unique_ptr<TaskBased> impl_ = nullptr;
6971
int priority_ = 0; // 任务的优先级信息
7072
};
7173

src/UtilsCtrl/ThreadPool/UThreadPool.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ class UThreadPool : public UThreadObject {
7878
template<typename FunctionType>
7979
auto commitWithPriority(const FunctionType& task,
8080
int priority)
81-
-> std::future<decltype(std::declval<FunctionType>()())>;;
81+
-> std::future<decltype(std::declval<FunctionType>()())>;
8282

8383
/**
8484
* 执行任务组信息

tutorial/T22-Timeout.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ void tutorial_timeout() {
4141
std::cout << "**** T22-timeout pipeline run error info : " << status.getInfo() << std::endl;
4242
}
4343

44+
c->setTimeout(0); // 设置回没有timeout的情况,重新执行,确认结果是正常的。
45+
status = pipeline->process();
46+
std::cout << "**** T22-timeout return to no timeout, error code is : " << status.getCode() << std::endl;
47+
4448
GPipelineFactory::remove(pipeline);
4549
}
4650

0 commit comments

Comments
 (0)