Skip to content

Commit 583cf40

Browse files
committed
[perf] optimize all parallel dag speed. origin 21.3s, now 12.4s
1 parent 5399db3 commit 583cf40

4 files changed

Lines changed: 43 additions & 6 deletions

File tree

src/GraphCtrl/GraphElement/_GEngine/GDynamicEngine/GDynamicEngine.cpp

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,13 @@ CStatus GDynamicEngine::setup(const GSortedGElementPtrSet& elements) {
3737

3838
CStatus GDynamicEngine::run() {
3939
CGRAPH_FUNCTION_BEGIN
40-
beforeRun();
41-
asyncRunAndWait();
4240

41+
if (likely(total_end_size_ != total_element_arr_.size())) {    // common case: end-node count differs from element count, so use the regular scheduled path
42+
beforeRun();
43+
asyncRunAndWait();
44+
} else {
45+
parallelRunAll();    // end-node count equals element count: take the all-parallel fast path. NOTE(review): beforeRun() is skipped here — confirm no per-run state (e.g. cur_status_) needs resetting
46+
}
4347
status = cur_status_;    // report the status left in cur_status_ by whichever path ran
4448
CGRAPH_FUNCTION_END
4549
}
@@ -154,4 +158,25 @@ CVoid GDynamicEngine::wait() {
154158
});
155159
}
156160

161+
162+
CVoid GDynamicEngine::parallelRunAll() {
163+
/**
164+
* Mainly intended for the case where the DAG is purely parallel logic.
165+
* Submits every flow for concurrent execution directly, which reduces scheduling overhead.
166+
* Measured result: with 32 purely parallel branches, total time dropped from 21.5s to 12.5s.
167+
* Graphs that are not purely parallel do not go through this function (run() only calls it when total_end_size_ == total_element_arr_.size()).
168+
*/
169+
std::vector<std::future<CStatus>> futures;
170+
futures.reserve(front_element_arr_.size());
171+
for (auto* element : front_element_arr_) {
172+
futures.emplace_back(thread_pool_->commit([element] {    // one pool task per front element; the value commit() returns is stored in futures
173+
return element->fatProcessor(CFunctionType::RUN);
174+
}, calcIndex(element)));    // NOTE(review): calcIndex(element) presumably selects the target worker/queue — TODO confirm
175+
}
176+
177+
for (auto& fut : futures) {
178+
cur_status_ += fut.get();    // get() blocks until that task finishes; each result is accumulated into cur_status_
179+
}
180+
}
181+
157182
CGRAPH_NAMESPACE_END

src/GraphCtrl/GraphElement/_GEngine/GDynamicEngine/GDynamicEngine.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,18 @@ class GDynamicEngine : public GEngine {
6262
*/
6363
CVoid wait();
6464

65+
/**
66+
* Execute all nodes concurrently
67+
* @return
68+
*/
69+
CVoid parallelRunAll();
70+
6571
private:
6672
GElementPtrArr total_element_arr_; // all elements in the pipeline
6773
GElementPtrArr front_element_arr_; // elements that have no dependencies
6874
CSize total_end_size_ = 0; // number of end nodes in the graph
6975
CSize finished_end_size_ = 0; // number of end nodes that have finished executing
70-
std::atomic<CSize> run_element_size_ { 0 }; // number of executed elements, used for later validation. Unlike the static engine, this one needs to be atomic
76+
std::atomic<CSize> run_element_size_ { 0 }; // number of executed elements, used for later validation. Unlike the static engine, this one needs to be atomic
7177
CStatus cur_status_; // current global status information
7278

7379
std::mutex lock_;

src/GraphCtrl/GraphParam/GParamManager.inl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ CStatus GParamManager::create(const std::string& key, CBool backtrace) {
3737
template<typename T,
3838
c_enable_if_t<std::is_base_of<GParam, T>::value, int>>
3939
T* GParamManager::get(const std::string& key) {
40-
CGRAPH_LOCK_GUARD lock(this->mutex_);
4140
auto result = params_map_.find(key);
4241
if (result == params_map_.end()) {
4342
return nullptr;

tutorial/MyGNode/MyWriteParamNode.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
class MyWriteParamNode : public CGraph::GNode {
1616
public:
17-
CStatus init () override {
17+
CStatus init() override {
1818
CStatus status;
1919
/**
2020
* 推荐在init()中,将可能用到的参数创建好。也支持在run的时候创建
@@ -24,7 +24,14 @@ class MyWriteParamNode : public CGraph::GNode {
2424
return status;
2525
}
2626

27-
CStatus run () override {
27+
CStatus run() override {
28+
/**
29+
* 为了提高执行效率,
30+
* 在【创建参数】的时候,【提供】锁保护机制
31+
* 在【获取参数】的时候,【不提供】锁保护的机制
32+
* 故无法通过在run()过程中,并发的通过 createGParam 和 getGParam 创建和获取参数
33+
* 如果需要做此操作,请自行外部加锁
34+
*/
2835
auto* myParam = CGRAPH_GET_GPARAM_WITH_NO_EMPTY(MyParam, "param1")
2936
int val = 0;
3037
int cnt = 0;

0 commit comments

Comments
 (0)