3939from .heartbeat import HeartBeatOnChannel
4040
4141
42- class DecoratorBase (object ):
43- pattern = None
44-
45- def __init__ (self , functor ):
46- self ._functor = functor
47- self .__doc__ = functor .__doc__
48- self .__name__ = functor .__name__
49-
50- def __get__ (self , instance , type_instance = None ):
51- if instance is None :
52- return self
53- return self .__class__ (self ._functor .__get__ (instance , type_instance ))
54-
55- def __call__ (self , * args , ** kargs ):
56- return self ._functor (* args , ** kargs )
57-
58- def _zerorpc_args (self ):
59- try :
60- args_spec = self ._functor ._zerorpc_args ()
61- except AttributeError :
62- try :
63- args_spec = inspect .getargspec (self ._functor )
64- except TypeError :
65- try :
66- args_spec = inspect .getargspec (self ._functor .__call__ )
67- except (AttributeError , TypeError ):
68- args_spec = None
69- return args_spec
70-
71-
72- class PatternReqRep ():
73-
74- def process_call (self , context , bufchan , event , functor ):
75- result = context .middleware_call_procedure (functor , * event .args )
76- bufchan .emit ('OK' , (result ,))
42+ class ServerBase (object ):
7743
78- def accept_answer (self , event ):
79- return True
80-
81- def process_answer (self , context , bufchan , event , method , timeout ,
82- raise_remote_error ):
83- result = event .args [0 ]
84- if event .name == 'ERR' :
85- raise_remote_error (event )
86- bufchan .close ()
87- bufchan .channel .close ()
88- bufchan .channel .channel .close ()
89- return result
90-
91-
92- class rep (DecoratorBase ):
93- pattern = PatternReqRep ()
94-
95-
96- class PatternReqStream ():
97-
98- def process_call (self , context , bufchan , event , functor ):
99- for result in iter (context .middleware_call_procedure (functor ,
100- * event .args )):
101- bufchan .emit ('STREAM' , result )
102- bufchan .emit ('STREAM_DONE' , None )
103-
104- def accept_answer (self , event ):
105- return event .name in ('STREAM' , 'STREAM_DONE' )
106-
107- def process_answer (self , context , bufchan , event , method , timeout ,
108- raise_remote_error ):
109- def iterator (event ):
110- while event .name == 'STREAM' :
111- yield event .args
112- event = bufchan .recv ()
113- if event .name == 'ERR' :
114- raise_remote_error (event )
115- bufchan .close ()
116- bufchan .channel .channel .close ()
117- return iterator (event )
118-
119-
120- class stream (DecoratorBase ):
121- pattern = PatternReqStream ()
122-
123-
124- class Server (SocketBase ):
125- def __init__ (self , methods = None , name = None , context = None , pool_size = None ,
44+ def __init__ (self , channel , methods = None , name = None , pool_size = None ,
12645 heartbeat = 5 ):
127- super (Server , self ).__init__ (zmq .XREP , context )
128- self ._multiplexer = ChannelMultiplexer (self ._events )
46+ self ._multiplexer = ChannelMultiplexer (channel )
12947
13048 if methods is None :
13149 methods = self
@@ -155,12 +73,10 @@ def __init__(self, methods=None, name=None, context=None, pool_size=None,
15573
15674 def __del__ (self ):
15775 self .close ()
158- super (Server , self ).__del__ ()
15976
16077 def close (self ):
16178 self .stop ()
16279 self ._multiplexer .close ()
163- super (Server , self ).close ()
16480
16581 def _zerorpc_inspect (self , method = None , long_doc = True ):
16682 if method :
@@ -241,14 +157,13 @@ def stop(self):
241157 self ._acceptor_task .kill (block = False )
242158
243159
244- class Client (SocketBase ):
245- patterns = [PatternReqStream (), PatternReqRep ()]
160+ class ClientBase (object ):
246161
247- def __init__ (self , context = None , timeout = 30 , heartbeat = 5 ,
162+ def __init__ (self , channel , patterns , timeout = 30 , heartbeat = 5 ,
248163 passive_heartbeat = False ):
249- super (Client , self ).__init__ (zmq .XREQ , context = context )
250- self ._multiplexer = ChannelMultiplexer (self ._events ,
164+ self ._multiplexer = ChannelMultiplexer (channel ,
251165 ignore_broadcast = True )
166+ self ._patterns = patterns
252167 self ._timeout = timeout
253168 self ._heartbeat_freq = heartbeat
254169 self ._passive_heartbeat = passive_heartbeat
@@ -272,7 +187,7 @@ def _raise_remote_error(self, event):
272187 raise RemoteError ('RemoteError' , msg , None )
273188
274189 def _select_pattern (self , event ):
275- for pattern in Client . patterns :
190+ for pattern in self . _patterns :
276191 if pattern .accept_answer (event ):
277192 return pattern
278193 msg = 'Unable to find a pattern for: {0}' .format (event )
@@ -322,6 +237,114 @@ def __getattr__(self, method):
322237 return lambda * args , ** kargs : self (method , * args , ** kargs )
323238
324239
240+ class DecoratorBase (object ):
241+ pattern = None
242+
243+ def __init__ (self , functor ):
244+ self ._functor = functor
245+ self .__doc__ = functor .__doc__
246+ self .__name__ = functor .__name__
247+
248+ def __get__ (self , instance , type_instance = None ):
249+ if instance is None :
250+ return self
251+ return self .__class__ (self ._functor .__get__ (instance , type_instance ))
252+
253+ def __call__ (self , * args , ** kargs ):
254+ return self ._functor (* args , ** kargs )
255+
256+ def _zerorpc_args (self ):
257+ try :
258+ args_spec = self ._functor ._zerorpc_args ()
259+ except AttributeError :
260+ try :
261+ args_spec = inspect .getargspec (self ._functor )
262+ except TypeError :
263+ try :
264+ args_spec = inspect .getargspec (self ._functor .__call__ )
265+ except (AttributeError , TypeError ):
266+ args_spec = None
267+ return args_spec
268+
269+
270+ class PatternReqRep ():
271+
272+ def process_call (self , context , bufchan , event , functor ):
273+ result = context .middleware_call_procedure (functor , * event .args )
274+ bufchan .emit ('OK' , (result ,))
275+
276+ def accept_answer (self , event ):
277+ return True
278+
279+ def process_answer (self , context , bufchan , event , method , timeout ,
280+ raise_remote_error ):
281+ result = event .args [0 ]
282+ if event .name == 'ERR' :
283+ raise_remote_error (event )
284+ bufchan .close ()
285+ bufchan .channel .close ()
286+ bufchan .channel .channel .close ()
287+ return result
288+
289+
290+ class rep (DecoratorBase ):
291+ pattern = PatternReqRep ()
292+
293+
294+ class PatternReqStream ():
295+
296+ def process_call (self , context , bufchan , event , functor ):
297+ for result in iter (context .middleware_call_procedure (functor ,
298+ * event .args )):
299+ bufchan .emit ('STREAM' , result )
300+ bufchan .emit ('STREAM_DONE' , None )
301+
302+ def accept_answer (self , event ):
303+ return event .name in ('STREAM' , 'STREAM_DONE' )
304+
305+ def process_answer (self , context , bufchan , event , method , timeout ,
306+ raise_remote_error ):
307+ def iterator (event ):
308+ while event .name == 'STREAM' :
309+ yield event .args
310+ event = bufchan .recv ()
311+ if event .name == 'ERR' :
312+ raise_remote_error (event )
313+ bufchan .close ()
314+ bufchan .channel .channel .close ()
315+ return iterator (event )
316+
317+
318+ class stream (DecoratorBase ):
319+ pattern = PatternReqStream ()
320+
321+
322+ class Server (SocketBase , ServerBase ):
323+
324+ def __init__ (self , methods = None , name = None , context = None , pool_size = None ,
325+ heartbeat = 5 ):
326+ SocketBase .__init__ (self , zmq .XREP , context )
327+ ServerBase .__init__ (self , self ._events , methods , name , pool_size , heartbeat )
328+
329+ def close (self ):
330+ ServerBase .close (self )
331+ SocketBase .close (self )
332+
333+
334+ class Client (SocketBase , ClientBase ):
335+ patterns = [PatternReqStream (), PatternReqRep ()]
336+
337+ def __init__ (self , context = None , timeout = 30 , heartbeat = 5 ,
338+ passive_heartbeat = False ):
339+ SocketBase .__init__ (self , zmq .XREQ , context = context )
340+ ClientBase .__init__ (self , self ._events , Client .patterns , timeout ,
341+ heartbeat , passive_heartbeat )
342+
343+ def close (self ):
344+ ClientBase .close (self )
345+ SocketBase .close (self )
346+
347+
325348class Pusher (SocketBase ):
326349
327350 def __init__ (self , context = None , zmq_socket = zmq .PUSH ):
0 commit comments