Skip to content

Commit d2bb6a3

Browse files
committed
[perf] optimize message, with only one copy
1 parent 1b5fff3 commit d2bb6a3

7 files changed

Lines changed: 84 additions & 11 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ int main() {
325325
* 提供`coordinator`(协调)功能
326326

327327
[2023.09.16 - v2.5.2 - Chunel]
328-
* 优化`message`(消息)功能,可以设定写入阻塞时的处理方式
328+
* 优化`message`(消息)功能,可以设定写入阻塞时的处理方式,减少内存copy次数
329329
* 添加`example`相关内容,针对不同行业,提供一些简单实现
330330

331331
</details>

src/GraphCtrl/GraphMessage/GMessage.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,19 @@ class GMessage : public GMessageObject {
4444
queue_.push(value, strategy);
4545
}
4646

47+
/**
48+
* 写入智能指针类型的参数
49+
* @tparam TImpl
50+
* @param value
51+
* @param strategy
52+
* @return
53+
*/
54+
template<class TImpl,
55+
c_enable_if_t<std::is_base_of<T, TImpl>::value, int> = 0>
56+
CVoid send(std::unique_ptr<TImpl>& value, GMessagePushStrategy strategy) {
57+
queue_.push(value, strategy);
58+
}
59+
4760
/**
4861
* 获取参数
4962
* @param value

src/GraphCtrl/GraphMessage/GMessageManager.h

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,33 @@ class GMessageManager : public GMessageObject,
155155
CGRAPH_FUNCTION_END
156156
}
157157

158+
/**
159+
* 根据传入的topic,输入智能指针类型的信息
160+
* @tparam TImpl
161+
* @param topic
162+
* @param value
163+
* @param strategy
164+
* @return
165+
*/
166+
template<typename TImpl,
167+
c_enable_if_t<std::is_base_of<T, TImpl>::value, int> = 0>
168+
CStatus sendTopicValue(const std::string& topic,
169+
std::unique_ptr<TImpl>& value,
170+
GMessagePushStrategy strategy) {
171+
CGRAPH_FUNCTION_BEGIN
172+
auto innerTopic = SEND_RECV_PREFIX + topic;
173+
auto result = send_recv_message_map_.find(innerTopic);
174+
if (result == send_recv_message_map_.end()) {
175+
CGRAPH_RETURN_ERROR_STATUS("no find [" + topic + "] topic");
176+
}
177+
178+
auto message = static_cast<GMessagePtr<T> >(result->second);
179+
CGRAPH_ASSERT_NOT_NULL(message);
180+
181+
message->send(value, strategy);
182+
CGRAPH_FUNCTION_END
183+
}
184+
158185
/**
159186
* 绑定对应的topic信息,并且获取 conn_id 信息
160187
* @tparam TImpl
@@ -165,10 +192,10 @@ class GMessageManager : public GMessageObject,
165192
template<typename TImpl,
166193
c_enable_if_t<std::is_base_of<T, TImpl>::value, int> = 0>
167194
CIndex bindTopic(const std::string& topic, CUint size) {
168-
CGRAPH_LOCK_GUARD lock(bind_mutex_);
169195
auto innerTopic = PUB_SUB_PREFIX + topic;
170196
auto message = UAllocator::safeMallocTemplateCObject<GMessage<TImpl>, CUint>(size);
171197

198+
CGRAPH_LOCK_GUARD lock(bind_mutex_);
172199
CIndex connId = (++cur_conn_id_);
173200
auto result = pub_sub_message_map_.find(innerTopic);
174201
if (result != pub_sub_message_map_.end()) {

src/UtilsCtrl/ThreadPool/Queue/UAtomicRingBufferQueue.h

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,36 @@ class UAtomicRingBufferQueue : public UQueueObject {
8181
pop_cv_.notify_one();
8282
}
8383

84+
/**
85+
* 写入智能指针类型的信息
86+
* @tparam TImpl
87+
* @param value
88+
* @param strategy
89+
* @return
90+
*/
91+
template<class TImpl = T>
92+
CVoid push(std::unique_ptr<TImpl>& value, URingBufferPushStrategy strategy) {
93+
{
94+
CGRAPH_UNIQUE_LOCK lk(mutex_);
95+
if (isFull()) {
96+
switch (strategy) {
97+
case URingBufferPushStrategy::WAIT:
98+
push_cv_.wait(lk, [this] { return !isFull(); });
99+
break;
100+
case URingBufferPushStrategy::REPLACE:
101+
head_ = (head_ + 1) % capacity_;
102+
break;
103+
case URingBufferPushStrategy::DROP:
104+
return; // 直接返回,不写入即可
105+
}
106+
}
107+
108+
ring_buffer_queue_[tail_] = std::move(value);
109+
tail_ = (tail_ + 1) % capacity_;
110+
}
111+
pop_cv_.notify_one();
112+
}
113+
84114
/**
85115
* 等待弹出信息
86116
* @param value

tutorial/MyGNode/MyRecvMessageNode.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@
1515
class MyRecvMessageNode : public CGraph::GNode {
1616
public:
1717
CStatus run() override {
18-
MyMessageParam mp; // 接收一个消息
18+
std::unique_ptr<MyMessageParam> mp = nullptr; // 接收一个消息
1919
CStatus status = CGRAPH_RECV_MPARAM(MyMessageParam, "send-recv", mp);
2020
if (!status.isOK()) {
2121
CGraph::CGRAPH_ECHO("MySubMessageNode sub message error");
2222
return status;
2323
}
2424

25-
CGraph::CGRAPH_ECHO("num = [%d], info = [%s]", mp.num, mp.info.c_str());
25+
CGraph::CGRAPH_ECHO("num = [%d], info = [%s]", mp->num, mp->info.c_str());
2626
return status;
2727
}
2828
};

tutorial/MyGNode/MySendMessageNode.h

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,15 @@
1515
class MySendMessageNode : public CGraph::GNode {
1616
public:
1717
CStatus run() override {
18-
MyMessageParam mp; // 创建一个消息,并且发送出去
19-
mp.num = (num_++) * 10;
20-
mp.info = "this is a test send info, num = " + std::to_string(mp.num);
18+
/**
19+
* 可以使用 MyMessageParam mp; 构建值的方式传递
20+
* 推荐使用 unique_ptr 的方式,进行 send 和 recv
21+
* 可以减少内存copy次数
22+
*/
23+
std::unique_ptr<MyMessageParam> mp(new MyMessageParam());
24+
25+
mp->num = (num_++) * 10;
26+
mp->info = "this is a test send info, num = " + std::to_string(mp->num);
2127
/**
2228
* 在v2.5.1版本,增加了 GMessagePushStrategy 策略,不兼容之前版本
2329
* 如果需要跟之前逻辑保持一致,直接设定 CGraph::GMessagePushStrategy::WAIT 即可

tutorial/MyParams/MyMessageParam.h

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,7 @@ struct MyMessageParam : public CGraph::GMessageParam {
1515
int num = 0;
1616
std::string info;
1717

18-
explicit MyMessageParam() {
19-
num = 0;
20-
info = "";
21-
}
18+
explicit MyMessageParam() = default;
2219

2320
/**
2421
* 注意:

0 commit comments

Comments
 (0)