forked from FISCO-BCOS/python-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrpc_abi.py
More file actions
92 lines (83 loc) · 2.79 KB
/
rpc_abi.py
File metadata and controls
92 lines (83 loc) · 2.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
from eth_utils import (
to_dict,
)
from eth_utils.toolz import (
curry,
)
from utils.abi import (
map_abi_data,
)
from utils.formatters import (
apply_formatter_at_index,
)
# ABI type of each recognised field of a transaction-parameter dict.
TRANSACTION_PARAMS_ABIS = {
    'data': 'bytes',
    'from': 'address',
    'gas': 'uint',
    'gasPrice': 'uint',
    'nonce': 'uint',
    'to': 'address',
    'value': 'uint',
}

# ABI type of each recognised field of a log-filter parameter dict.
FILTER_PARAMS_ABIS = {
    'to': 'address',
    'address': 'address[]',
}

# ABI type of each recognised field of a trace-call parameter dict.
TRACE_PARAMS_ABIS = {
    'to': 'address',
    'from': 'address',
}

# RPC method name -> ABI description of that method's request parameters.
#
# A list value gives one ABI type per positional parameter. A ``None`` slot
# presumably marks a parameter that is passed through without ABI
# formatting -- confirm against ``map_abi_data``.
#
# A dict value describes the fields of a dict-shaped parameter;
# ``abi_request_formatters`` applies it to the parameter at index 0.
RPC_ABIS = {
    # eth
    'eth_call': TRANSACTION_PARAMS_ABIS,
    'eth_estimateGas': TRANSACTION_PARAMS_ABIS,
    'eth_getBalance': ['address', None],
    'eth_getBlockByHash': ['bytes32', 'bool'],
    'eth_getBlockTransactionCountByHash': ['bytes32'],
    'eth_getCode': ['address', None],
    'eth_getLogs': FILTER_PARAMS_ABIS,
    'eth_getStorageAt': ['address', 'uint', None],
    'eth_getProof': ['address', 'uint[]', None],
    'eth_getTransactionByBlockHashAndIndex': ['bytes32', 'uint'],
    'eth_getTransactionByHash': ['bytes32'],
    'eth_getTransactionCount': ['address', None],
    'eth_getTransactionReceipt': ['bytes32'],
    'eth_getUncleCountByBlockHash': ['bytes32'],
    'eth_newFilter': FILTER_PARAMS_ABIS,
    'eth_sendRawTransaction': ['bytes'],
    'eth_sendTransaction': TRANSACTION_PARAMS_ABIS,
    'eth_signTransaction': TRANSACTION_PARAMS_ABIS,
    'eth_sign': ['address', 'bytes'],
    'eth_signTypedData': ['address', None],
    'eth_submitHashrate': ['uint', 'bytes32'],
    'eth_submitWork': ['bytes8', 'bytes32', 'bytes32'],
    # personal
    'personal_sendTransaction': TRANSACTION_PARAMS_ABIS,
    'personal_lockAccount': ['address'],
    'personal_unlockAccount': ['address', None, None],
    'personal_sign': [None, 'address', None],
    'personal_signTypedData': [None, 'address', None],
    # trace
    'trace_call': TRACE_PARAMS_ABIS,
    # parity
    'parity_listStorageKeys': ['address', None, None, None],
}
@curry
def apply_abi_formatters_to_dict(normalizers, abi_dict, data):
    """Build a copy of ``data`` whose ABI-typed entries went through ``map_abi_data``.

    Only keys present in both ``abi_dict`` and ``data`` are processed; every
    other entry of ``data`` is carried over unchanged. The function is
    decorated with ``curry``, so it may be called with fewer arguments to
    obtain a partially-applied formatter.

    :param normalizers: forwarded as-is to ``map_abi_data``.
    :param abi_dict: mapping of field name -> ABI type.
    :param data: mapping of field name -> value to format.
    :return: a new ``dict`` made from ``data`` with the processed values
        replacing the originals.
    """
    shared_keys = list(set(abi_dict.keys()).intersection(set(data.keys())))

    # Parallel lists: the i-th ABI type describes the i-th value.
    abi_types = [abi_dict[key] for key in shared_keys]
    raw_values = [data[key] for key in shared_keys]

    converted = map_abi_data(normalizers, abi_types, raw_values)
    overrides = dict(zip(shared_keys, converted))

    return dict(data, **overrides)
@to_dict
def abi_request_formatters(normalizers, abis):
    """Yield ``(method, formatter)`` pairs for every entry of ``abis``.

    The ``to_dict`` decorator gathers the yielded pairs, so callers receive
    a mapping of method name -> request formatter.

    :param normalizers: forwarded to ``map_abi_data`` /
        ``apply_abi_formatters_to_dict`` for each formatter built.
    :param abis: mapping of method name -> ABI description. A ``list``
        describes positional parameters; a ``dict`` describes the fields of
        the parameter at index 0.
    :raises TypeError: if an ABI description is neither a ``list`` nor a
        ``dict``.
    """
    for method, abi_types in abis.items():
        if isinstance(abi_types, list):
            # Called without the data argument -- presumably ``map_abi_data``
            # is curried and returns a formatter awaiting the params.
            yield method, map_abi_data(normalizers, abi_types)
        elif isinstance(abi_types, dict):
            single_dict_formatter = apply_abi_formatters_to_dict(normalizers, abi_types)
            yield method, apply_formatter_at_index(single_dict_formatter, 0)
        else:
            # Wrap the operand in a 1-tuple: with a bare ``% abi_types`` a
            # tuple value would be unpacked as the format arguments, either
            # raising an unrelated "not all arguments converted" error or
            # printing the repr of its single element instead of the tuple.
            raise TypeError(
                "ABI definitions must be a list or dictionary, got %r" % (abi_types,)
            )