-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstart-node-command.txt
More file actions
228 lines (228 loc) · 8.31 KB
/
start-node-command.txt
File metadata and controls
228 lines (228 loc) · 8.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
@startuml
' Activity diagram tracing the start-up sequence of the node command: from main,
' through NodeCommand::execute(), down to the final exit-status decision.
' Lines starting with a single quote are PlantUML comments and are not rendered.
' The note texts are reading notes and open questions about the traced code.

' Entry point and CLI argument parsing (the note mentions clap).
:main;
:reth::cli::run()|
note right
Cli<()>::parse().run,
----
//借助clap做参数解析//;
end note
' Logging setup, dispatch to the Node command, and the runner that drives it until exit.
:tracing[init log];
:match_command(Node);
:runner[run_command_until_exit];
note right#red: 获取(context、tokio_runtime、task_manager)
' Body of the node command starts here.
#pink:NodeCommand::execute();
:raise_fd_limit();
note right:限制进程的fd,fdlimit crate
' Data directory handling: resolve the directory, load reth.toml, open the database.
partition "dir path handle(**data_dir**)"{
:handle_dir;
note right:处理数据目录
:load_config(reth.toml);
:db_path(init_db);
note right
init_db_by(db_path, log_level)
----
//Arc<DatabaseEnv>类型的db,type DatabaseEnv = Env<WriteMap>//
位于crate/storage/db
end note
}
' Metrics endpoint and genesis initialisation; both receive a clone of the db handle.
:start_metrics_endpoint**.await**(clone_db);
note right: prometheus_exporter::initialize 和NodeCommand.metrics配置指定
:init_genesis_by(clone_db, node_command.chain);
note right: chain的类型Arc<ChainSpec>
:display_chain_fork,显示hardfork以及active的情况;
' Consensus selection: AutoSealConsensus when self.dev.dev is set, BeaconConsensus otherwise.
partition #pink "get_consensus"{
if (self.dev.dev?) then (yes)
:AutoSealConsensus::new;
else (no)
:BeaconConsensus::new;
endif
}
' Trusted-peer configuration, then the metrics channel (tokio unbounded_channel per the note).
:init_trusted_nodes(&mut config);
note right: 根据self.network.trusted_only设置config以及trusted_peers非空时候添加到config的trusted_nodes中
:metrics_channel;
note right: 使用tokio的unbounded_channel
' Two branches: the first spawns the metrics listener task, the second carries the
' remainder of the start-up flow.
' NOTE(review): no matching "end split" appears before @enduml - confirm that the
' rendered output is what is intended.
split
partition "监听指标任务" {
:MetricsListener::new(metrics_rx of metrics_channel的receiver);
:task_executor spawn(metrics_listener task);
}
:finish;
split again
' Prune configuration and blockchain tree construction.
:get prune_config;
partition #yellow "configure blockchain tree" {
:TreeExternals::new(**db_clone**, consensus_clone, **Factory::new(chain_spec_clone)**, chain);
note right: factory crates/revm的factory
:BlockchainTree::new(tree_externals, default_config, prune_config);
:blockchain_tree.with_sync_metrics_tx(metrics_tx);
:ShareableBlockchainTree::new(blockchain_tree);
note right
blockchain_tree中的重点结构
----
1. metrics_channel的sender
2. tokio::sync::broadcast::channel中的canon_state_notification_sender
canon_state_notification_sender 被后续使用
3. last_finalized_block_number和last_canonical_hashes的TreeState
4. prune_mode
5. ShareableBlockchainTree封装了Arc<parking_lot::RwLock<BlockchainTree>>
end note
}
' Read the current chain head; the attached note records an open question about why a
' provider is built inside fetch_head and again right afterwards.
:fetch_head();
note right
从数据库结合ChainSpec的chain_clone中fetch当前链头Head的状态
为什么这里fetch_head要在内部构建Provider,下面**紧接着**又是构建blockchain_provider?又在读取了chain_header之后drop了?
end note
' Blockchain provider (blockchain_db), blob store, and the transaction validator builder chain.
partition "构建blockchain provider" {
:ProviderFactory::new(db_clone, chain_spec_clone);
:BlockchainProvider::new(provider_factory, shareable_blockchain_tree_clone);
note right#red: 获取**blockchain_db**
:get blob_store;
note right: InMemoryBlobStore::default()
partition "get transaction_validation_task_executor得到**validator**" {
: get EthTransactionValidatorBuilder;
: builder.with_head_timestamp();
: builder.kzg_settings();
: builder.with_additional_tasks;
#pink:builder.build_with_tasks(blockchain_db_clone, ctx的task_executor_clone, blob_store_clone);
note right
一些异步的task任务
----
1. ValidationTask,采用mpsc::channel,receiver接收处理,采用的task_clone
2. tasks.spawn_critical_blocking,transaction-validation-service,同样的task handle?
3. 显式将ValidationJobSender赋值给transaction_validation_task_executor对应的字段,channel的sender
end note
}
}
' Transaction pool creation, subscription to canonical state events, and the pool
' maintenance task spawned on the task executor.
partition #pink "transaction_pool(crates/transaction-pool) tasks" {
:Pool::eth_pool(validator, blob_store, pool_config);
:blockchain_db.canonical_state_stream的chain_events;
note right
消息传递最终还是tokio::sync::broadcast::channel的sender的subscribe和tokio-stream中的BroadcastStream
----
= BlockchainProvider.tree.subscribe_to_canonical_state
= shared_blockchain_tree.tree.subscribe_canon_state
= blockchain_tree.canon_state_notification_sender.subscribe
end note
:ctx.task_executor.spawn_critical(transaction_pool::maintain::maintain_transaction_pool_future(blockchain_db_clone, transaction_pool_clone, chain_events, ctx.task_executor_clone, default_config));
}
' P2P networking: secret key, network config, network start-up, and the fetch client.
partition "P2P网络连接处理 **得到network**" {
:p2p_secret_path;
:get secret key;
note right: 添加enode的加密key信息,标识一个网络节点的key
:获取network_config通过在指定目录;
#pink:start_network;
note right
P2P网络连接的处理,包含一些异步的任务,crates/net/network
----
1. NetworkManager.new.await? = bind+discover的await
2. 任务执行spawn_critical "p2p txpool"
3. 任务执行spawn_critical "p2p eth request handler"
4. 任务执行spawn_critical_with_signal "p2p network task"
end note
: fetch_client;
note right
发送FetchClient消息
----
1. 借助tokio的oneshot::channel send FetchClient消息
2. channel的rx .await
end note
}
' Node components assembled from the clones created earlier in the flow; the note
' records open questions about the executor count and the shared blockchain_db clone.
partition #pink "RethNodeComponentsImpl初始化" {
:blockchain_provider = blockchain_db_clone;
:transaction_pool = transaction_pool_clone;
:network = network_clone;
:task_executor = ctx.task_executor_clone;
:events = blockchain_db_clone;
note right#red
借助NodeCommand.ext.on_components_initialized
----
1. task_executor 当前有几个executor?
2. events和blockchain_provider clone的同一个blockchain_db?
end note
}
' Consensus-engine channel, max_block setting, pipeline construction, and the
' pipeline event listener.
partition #yellow "与共识层协作,构建payload" {
:unbounded_channel获取(consensus_engine_tx, consensus_engine_rx);
:根据配置获取最大block的个数 max_block;
:build_networked_pipeline;
note right
构建与网络连接的pipeline(管道)与EitherDownloader::Right(network_client)
----
1. ReverseHeadersDownloaderBuilder
2. BodiesDownloaderBuilder
- 1和2通过TaskDownloader::spawn_with构建分别得到header_downloader和body_downloader
3. dev下是EitherDownloader::Left(client)
end note
:build_pipeline;
note right
构建pipeline
----
1. tokio::sync::watch::channel(B256::ZERO)的使用(tip_tx, tip_rx)
2. revm的factory配置采取InspectorStackConfig
3. PruneModes和HeaderSyncMode
4. crates/stages 中的各个stage需要搞清楚
end note
:listen pipeline_events;
note right
----
tokio::sync::mpsc::unbounded_channel()
listeners.push(sender)
返回UnboundedReceiverStream<PipelineEvent>
end note
}
' Initial target, the highest-snapshots watch channel, engine hooks, pruner events,
' and the snapshotter.
:根据配置设置初始化的目标initial_target;
:watch::channel(None);
note right:highest_snapshots_tx和highest_snapshots_rx
:EngineHooks::new();
: 根据prune_conf设置prune相关的events;
: Snapshotter::new(clone_db, chain_clone, chain.snapshot_block_interval,highest_snapshots_tx,);
note right:crates/snapshot中
' Consensus engine construction; the note enumerates the arguments passed to with_channel().
' NOTE(review): "ether download" in items 1 and 2 presumably refers to the
' EitherDownloader mentioned in the build_networked_pipeline note - confirm.
partition "BeaconConsensusEngine init" {
#pink: BeaconConsensusEngine::with_channel();
note right
获取beacon_consensus_engine和beacon_engine_handle
----
1. client, pipeline得到的ether download
2. pipeline, pipeline得到的ether download
3. blockchain_db_clone,
4. Box::new(ctx.task_executor.clone()),
5. Box::new(network.clone()),
6. max_block,
7. self.debug.continuous,
8. payload_builder.clone(), PayloadBuilderHandle
9. initial_target,
10. MIN_BLOCKS_FOR_PIPELINE_RUN,
11. consensus_engine_tx,
12. consensus_engine_rx,
13. hooks,
end note
}
' Event streams are merged with stream_select! and handled by a spawned events task.
partition #yellow "events listen" {
:stream_select!;
note right
stream_select宏转换为events
----
1. network.event_listener
2. beacon_engine_handle.event_listener
3. pipeline_events
4. Either::Right,dev为Either::Left
5. pruner_events
end note
:ctx.task_executor.spawn_critical(events task);
}
' Engine API, JWT secret, and RPC servers; then the consensus engine task is spawned
' and the flow waits on the oneshot receiver (rx.await).
partition "EngineApi init"{
:EngineApi::new;
:load jwt secret;
:rpc.start_servers;
note right
启动rpc server的参数如下
----
&components, engine_api, jwt_secret, &mut self.ext
http的rpc在哪里启动的?
end note
:oneshot::channel 使用rx和tx;
:ctx.task_executor.spawn_critical_blocking("consensus engine");
:self.ext.on_node_started(&components)? ;
:rx.await;
}
' Exit status is decided from configuration, then the flow ends.
:根据配置决定退出状态;
:finish;
@enduml