Skip to content

Commit 77d57ca

Browse files
add amop impl of bcos3
1 parent 79682be commit 77d57ca

13 files changed

Lines changed: 558 additions & 213 deletions

bcos3sdk/bcos3callbackfuture.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import queue
2+
3+
from bcos3sdk.bcos3datadef import BcosReqContext, BCOS_CALLBACK_FUNC, BCOS_AMOP_SUB_CALLBACK_FUNC, \
4+
BCOS_AMOP_PUBLISH_CALLBACK_FUNC, BcosResponse
5+
6+
G_SEQ = 0
7+
8+
9+
# 模拟一个Future对象,用于接收回调,提供一个wait方法,把异步变成同步
10+
# 采用queue来模拟wait,原因是一个future里有可能多次被回调,回调的消息put到queue里,应用端可以用wait方法依次将消息pop出来
11+
# 注意,在此版本里,建议对bcossdk的一次接口调用,就用一个单独的CallbackFuture,不要混用
12+
# 否则sdk并发时,回调的消息会复用callback入口,导致返回的消息不对应刚才发送的req,或者需要用消息唯一序列号来对应是哪个请求包
13+
# 所以,一次调用一个future,基本能保证返回的消息,对应的是发送的消息,代码读起来也比较简单
14+
# todo: 可以扩展一些特性,比如,收到sdk回调后,立刻再递归回调应用层设置的callback,
15+
# 在event监听场景比较有意义,目前先统一用wait,参见tests/testbcos3event.py
16+
17+
class BcosCallbackFuture:
18+
response_queue = queue.Queue(100)
19+
context: BcosReqContext = None
20+
21+
22+
def __init__(self, context_name=None, context_msg=None):
23+
24+
if context_name is not None or context_msg is not None:
25+
self.context = BcosReqContext(self.next_seq(), context_name, context_msg)
26+
27+
self.callback = BCOS_CALLBACK_FUNC(self.bcos_callback)
28+
self.amop_callback = BCOS_AMOP_SUB_CALLBACK_FUNC(self.bcos_amop_callback)
29+
self.amop_publish_callback = BCOS_AMOP_PUBLISH_CALLBACK_FUNC(self.bcos_amop_publish_callback)
30+
31+
def next_seq(self, inc=1):
32+
global G_SEQ
33+
G_SEQ = G_SEQ + inc
34+
return G_SEQ
35+
36+
def is_empty(self):
37+
return self.response_queue.empty()
38+
39+
def bcos_callback(self, c_resp):
40+
if c_resp is None:
41+
return
42+
# print("bcos_callback-->",resp)
43+
resp = BcosResponse(c_resp)
44+
# print(f"context_callback {self.context_callback.detail()}")
45+
self.response_queue.put_nowait(resp)
46+
# print(f"--->QSIZE::{self.queue.qsize()}------<<<<",)
47+
48+
def bcos_amop_callback(self, endpoint, seq, c_resp):
49+
# print("[wrap]bcos_sdk_c_amop_subscribe_cb callback")
50+
51+
if c_resp is None:
52+
return
53+
54+
resp = BcosResponse(c_resp)
55+
resp.endpoint = str(endpoint, "utf-8")
56+
resp.seq = str(seq, "utf-8")
57+
self.response_queue.put_nowait(resp)
58+
59+
def bcos_amop_publish_callback(self, c_resp):
60+
61+
if c_resp is None:
62+
return
63+
# print("amop_publish_callback")
64+
resp = BcosResponse(c_resp)
65+
# print(self.error, self.desc)
66+
self.response_queue.put_nowait(resp)
67+
68+
def wait(self, timeout=5):
69+
is_timeout = False
70+
resp = None
71+
try:
72+
#self.is_timeout = False
73+
resp = self.response_queue.get(True, timeout)
74+
except:
75+
is_timeout = True
76+
pass
77+
return (is_timeout,resp)
78+
79+
def display(self):
80+
print(self.detail())
81+
82+
def detail(self):
83+
s = f"queuesize:{self.response_queue.qsize()},reqcontext:{self.context.detail()}"
84+
return s
85+

bcos3sdk/bcos3client.py

Lines changed: 97 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,15 @@
1919
import json
2020
import sys
2121
import time
22-
from ctypes import byref, c_char_p
22+
from ctypes import byref, c_char_p, POINTER, c_void_p, c_int
2323

24+
from bcos3sdk.bcos3callbackfuture import BcosCallbackFuture
25+
from bcos3sdk.bcos3datadef import *
2426
from bcos3sdk.bcos3sdkconfig import Bcos3SDKConfig
27+
from client.datatype_parser import DatatypeParser
2528
from client_config import client_config
26-
from bcos3sdk.bcos3sdk_wrap import NativeBcos3sdk, BcosCallbackFuture, \
27-
s2b, b2s
29+
from bcos3sdk.bcos3sdk_wrap import NativeBcos3sdk, \
30+
BCOS_AMOP_SUB_CALLBACK_FUNC, BCOS_AMOP_PUBLISH_CALLBACK_FUNC
2831
from client import clientlogger
2932
from client.bcoserror import BcosException
3033
from client.common import common
@@ -126,6 +129,20 @@ def init_clib_sdk(self):
126129
self.chainid = self.bcossdk.bcos_sdk_get_group_chain_id(self.bcossdk.sdk, s2b(self.group))
127130
return 0
128131

132+
def get_last_errormsg(self):
133+
res = self.bcossdk.bcos_sdk_get_last_error_msg()
134+
return b2s(res)
135+
136+
137+
def get_last_error(self):
138+
res= self.bcossdk.bcos_sdk_get_last_error()
139+
return res
140+
141+
def get_last_error_full(self):
142+
ret = self.get_last_error()
143+
msg = self.get_last_errormsg()
144+
return (ret,msg)
145+
129146
# load the account from keyfile
130147
def load_default_account(self):
131148
if self.default_from_account_signer is not None:
@@ -174,20 +191,26 @@ def getinfo(self):
174191
return info
175192

176193
def wait_result(self, future: BcosCallbackFuture):
177-
if future.wait().is_timeout:
194+
(is_timeout, response) = future.wait()
195+
if is_timeout:
178196
raise BcosException(f"bcos sdk timeout {future.context.msg}")
179197
# print(f"response context(callback): {future.context_callback.detail()}")
180-
return self.get_result(future.data)
198+
return self.get_result(response.data)
181199

182200
def get_result(self, response_data):
183201
# print(f"data is {response_data}")
184202
if response_data is None or len(response_data) == 0:
185-
raise BcosException(f"Response error: {response_data}")
186-
response = json.loads(response_data)
203+
raise BcosException(f"Response error: {[response_data]}")
204+
try:
205+
#处理可能不是json的情况
206+
response = json.loads(response_data)
207+
except Exception as e:
208+
return response_data
187209
if "error" in response:
188210
raise BcosException(response_data)
189211
if "result" not in response:
190212
raise BcosException(response_data)
213+
191214
# print(f"response :{response}")
192215
result = response["result"]
193216
if type(result) is str:
@@ -483,3 +506,70 @@ def deployFromFile(
483506

484507
result = self.deploy(contractbin, contract_abi, fn_args)
485508
return result
509+
510+
511+
def event_subscribe(self,address,event_name="",contract_abi="",topics=[],fromBlock=-1,toBlock=-1):
512+
event_param = dict()
513+
event_param["fromBlock"] = fromBlock # -1 表示最新
514+
event_param["toBlock"] = toBlock # -1表示最新
515+
event_param["address"] = [address] # sample helloWorld address
516+
if topics is not None and len(topics)>0:
517+
event_param["topics"] = topics
518+
else:
519+
#根据event_name获取topics,一般采用这种方法
520+
parser = DatatypeParser()
521+
parser.set_abi(contract_abi)
522+
eventtopic = parser.topic_from_event_name(event_name)
523+
event_param["topics"]=[eventtopic]
524+
525+
event_param_json = json.dumps(event_param)
526+
cbfuture = BcosCallbackFuture(sys._getframe().f_code.co_name, "")
527+
subid = self.bcossdk.bcos_event_sub_subscribe_event(self.bcossdk.sdk, s2b(self.group), s2b(event_param_json), cbfuture.callback,
528+
byref(cbfuture.context))
529+
return (subid,cbfuture)
530+
531+
def event_unsubscribe(self,subid):
532+
self.bcossdk.bcos_event_sub_unsubscribe_event(subid)
533+
534+
535+
def amop_subscribe(self,topiclist):
536+
ctopiclist = strarr2ctypes(topiclist)
537+
self.bcossdk.bcos_amop_subscribe_topic(self.bcossdk.sdk, ctopiclist, len(ctopiclist))
538+
539+
def amop_set_subscribe_topic_cb(self,cbfunc,context_):
540+
context = 0
541+
if context_ is not None and context_ != 0:
542+
context =byref(context_)
543+
self.bcossdk.bcos_amop_set_subscribe_topic_cb(self.bcossdk.sdk,(BCOS_AMOP_SUB_CALLBACK_FUNC)(cbfunc),
544+
context)
545+
546+
547+
callback = None
548+
def amop_subscribe_with_cb(self,topic):
549+
550+
cbfuture = BcosCallbackFuture(sys._getframe().f_code.co_name, "")
551+
552+
#self.callback = BCOS_AMOP_SUB_CALLBACK_FUNC(cbfuture.amop_callback)
553+
self.bcossdk.bcos_amop_subscribe_topic_with_cb(self.bcossdk.sdk,s2b(topic),
554+
cbfuture.amop_callback,
555+
byref(cbfuture.context))
556+
return cbfuture
557+
558+
def amop_publish(self,topic,data,future=None,timeout_=10000):
559+
if future ==None:
560+
future = BcosCallbackFuture(sys._getframe().f_code.co_name, "")
561+
cbfunc =future.amop_publish_callback
562+
#print("publish callback: ",cbfunc)
563+
self.bcossdk.bcos_amop_publish(self.bcossdk.sdk, s2b(topic),
564+
(s2b(data)), len(data),
565+
(c_int)(timeout_),
566+
cbfunc,
567+
byref(future.context)
568+
)
569+
return future
570+
571+
def amop_broadcast(self,topic,data=None):
572+
self.bcossdk.bcos_amop_broadcast(self.bcossdk.sdk,s2b(topic),s2b(data),len(data))
573+
574+
def amop_send_response(self,peer,seq,data):
575+
self.bcossdk.bcos_amop_send_response(self.bcossdk.sdk,s2b(peer),s2b(seq),s2b(data),len(data))

bcos3sdk/bcos3datadef.py

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
import ctypes
2+
import struct
3+
from ctypes import Structure, c_int, c_char_p, create_string_buffer, memmove, c_void_p, c_size_t
4+
5+
class BcosReqContext(Structure):
6+
_fields_ = [('seq', c_int),
7+
('name', c_char_p),
8+
('msg', c_char_p),
9+
]
10+
11+
def __init__(self, s, n, m):
12+
self.seq = s
13+
self.name = n.encode("utf-8")
14+
self.msg = m.encode("utf-8")
15+
16+
def detail(self):
17+
s = f"seq:{self.seq},name:{self.name},msg:{self.msg}";
18+
return s
19+
20+
21+
'''
22+
//c语言定义的返回结构体
23+
struct bcos_sdk_c_struct_response
24+
{
25+
int error; // 返回状态, 0成功, 其他失败
26+
char* desc; // 失败时描述错误信息
27+
void* data; // 返回数据, error=0 时有效
28+
size_t size; // 返回数据大小, error=0 时有效
29+
void* context; // 回调上下文,调用接口时传入的`context`参数
30+
};
31+
'''
32+
33+
34+
class BcosResponse:
35+
error = 0
36+
desc = ""
37+
data= ""
38+
size = 0
39+
context: BcosReqContext = None
40+
41+
def __init__(self, c_resp):
42+
self.extract_response(c_resp)
43+
44+
def extract_response(self, c_resp):
45+
if c_resp == None:
46+
return
47+
self.size = c_resp.contents.get_size()
48+
pool = create_string_buffer(c_resp.contents.size)
49+
memmove(pool, c_resp.contents.data, c_resp.contents.size)
50+
self.data = b2s(pool.value)
51+
self.error = c_resp.contents.get_error()
52+
if self.error != 0:
53+
self.desc = b2s(c_resp.contents.desc)
54+
else:
55+
self.desc = ""
56+
self.context_callback = c_resp.contents.get_context()
57+
return self
58+
59+
def detail(self):
60+
str = f"error:{self.error},size:{self.size},data:{self.data},desc:{self.desc}."
61+
c = self.context_callback
62+
if c is not None:
63+
str = str + (" | context:({}),{},[{}]".format(c.seq, b2s(c.name), b2s(c.msg)))
64+
return str
65+
66+
67+
# bcos sdk返回结构体,ctype定义
68+
class BcosResponseCType(Structure):
69+
_fields_ = [('error', c_int),
70+
('desc', c_char_p),
71+
('data', c_void_p),
72+
('size', c_size_t),
73+
('context', c_void_p),
74+
]
75+
76+
def get_data_str(self):
77+
pool = create_string_buffer(self.size)
78+
memmove(pool, self.data, self.size)
79+
return b2s(pool)
80+
81+
def get_desc(self):
82+
if self.desc is None:
83+
return ""
84+
return b2s(self.desc)
85+
86+
def get_size(self):
87+
return self.size
88+
89+
def get_error(self):
90+
return self.error
91+
92+
def get_context(self):
93+
c = ctypes.cast(self.context, ctypes.POINTER(BcosReqContext))
94+
return c.contents
95+
96+
97+
# cyber2023.3 by kent
98+
def strarr2ctypes(data):
99+
C_TOPIC_ARRAY = c_char_p * len(data)
100+
ctypetopic = C_TOPIC_ARRAY()
101+
for i in range(0, len(data)):
102+
ctypetopic[i] = (c_char_p)(s2b(data[i]))
103+
return ctypetopic
104+
105+
106+
# cyber2023.3 by kent
107+
def s2b(data):
108+
"""
109+
将Python数据类型转换为bytes类型
110+
:param data: 需要转换的数据
111+
:return: 转换后的bytes类型数据
112+
"""
113+
"""
114+
将Python数据类型转换为bytes类型
115+
:param data: 需要转换的数据
116+
:return: 转换后的bytes类型数据
117+
"""
118+
if isinstance(data, str):
119+
return data.encode('utf-8')
120+
elif isinstance(data, int):
121+
return data.to_bytes(4, byteorder='big')
122+
elif isinstance(data, float):
123+
return struct.pack('f', data)
124+
elif isinstance(data, bool):
125+
return int(data).to_bytes(1, byteorder='big')
126+
elif data is None:
127+
return b''
128+
elif isinstance(data, bytes):
129+
return data
130+
else:
131+
raise TypeError(f"不支持的数据类型转换为bytes类型,方法名:{s2b.__name__},输入参数类型:{type(data)}")
132+
133+
134+
def b2s(input):
135+
if type(input) is bytes:
136+
try:
137+
return str(input, "UTF-8")
138+
except Exception as e:
139+
return input
140+
return input
141+
142+
143+
144+
# bcos sdk回调函数定义
145+
# typedef void (*bcos_sdk_c_struct_response_cb)(struct bcos_sdk_c_struct_response* resp)
146+
BCOS_CALLBACK_FUNC = ctypes.CFUNCTYPE(None, ctypes.POINTER(BcosResponseCType))
147+
148+
# typedef void (*bcos_sdk_c_amop_subscribe_cb)(
149+
# const char* endpoint, const char* seq, struct bcos_sdk_c_struct_response* resp);
150+
BCOS_AMOP_SUB_CALLBACK_FUNC = ctypes.CFUNCTYPE(None, c_char_p, c_char_p, ctypes.POINTER(BcosResponseCType))
151+
BCOS_AMOP_PUBLISH_CALLBACK_FUNC = ctypes.CFUNCTYPE(None, ctypes.POINTER(BcosResponseCType))
152+
153+

0 commit comments

Comments
 (0)