forked from FISCO-BCOS/python-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrpc.py
More file actions
111 lines (89 loc) · 3.09 KB
/
rpc.py
File metadata and controls
111 lines (89 loc) · 3.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import logging
import os
import itertools
from eth_utils import (
to_dict,
to_text,
to_bytes,
)
from client.stattool import StatTool
from utils.http import (
construct_user_agent,
)
from utils.request import (
make_post_request,
)
from utils.encoding import (
FriendlyJsonSerde)
from client import clientlogger
def get_default_endpoint():
return os.environ.get('WEB3_HTTP_PROVIDER_URI', 'http://localhost:8545')
class JSONBaseProvider():
def __init__(self):
self.request_counter = itertools.count()
def decode_rpc_response(self, response):
text_response = to_text(response)
return FriendlyJsonSerde().json_decode(text_response)
def encode_rpc_request(self, method, params):
rpc_dict = {
"jsonrpc": "2.0",
"method": method,
"params": params or [],
"id": next(self.request_counter),
}
encoded = FriendlyJsonSerde().json_encode(rpc_dict)
return to_bytes(text=encoded)
def isConnected(self):
try:
response = self.make_request('getClientVersion', [])
# print(response["result"])
except IOError:
return False
assert response['jsonrpc'] == '2.0'
assert 'error' not in response
return True
class HTTPProvider(JSONBaseProvider):
logger = logging.getLogger("client.providers.HTTPProvider")
endpoint_uri = None
_request_args = None
_request_kwargs = None
def __init__(self, endpoint_uri=None, request_kwargs=None):
if endpoint_uri is None:
self.endpoint_uri = get_default_endpoint()
else:
self.endpoint_uri = endpoint_uri
self._request_kwargs = request_kwargs or {}
super().__init__()
def __str__(self):
return "RPC connection {0}".format(self.endpoint_uri)
@to_dict
def get_request_kwargs(self):
if 'headers' not in self._request_kwargs:
yield 'headers', self.get_request_headers()
for key, value in self._request_kwargs.items():
yield key, value
def get_request_headers(self):
return {
'Content-Type': 'application/json',
'User-Agent': construct_user_agent(str(type(self))),
}
def make_request(self, method, params):
request_data = self.encode_rpc_request(method, params)
#print("request", request_data)
stat = StatTool.begin()
self.logger.debug("request: %s, %s,data: %s",
self.endpoint_uri, method, request_data)
raw_response = make_post_request(
self.endpoint_uri,
method,
params,
request_data,
**self.get_request_kwargs()
)
# self.logger.debug("raw response {}, method: {}".format(raw_response, method))
response = self.decode_rpc_response(raw_response)
stat.done()
stat.debug("make_request:{},sendbyts:{}".format(method, len(request_data)))
self.logger.debug("GetResponse. %s, Response: %s",
method, response)
return response