2323# SOFTWARE.
2424
2525
26- import inspect
2726import sys
2827import traceback
2928import gevent .pool
3837from .socket import SocketBase
3938from .heartbeat import HeartBeatOnChannel
4039from .context import Context
41-
40+ from .decorators import DecoratorBase , rep
41+ import patterns
4242
4343class ServerBase (object ):
4444
@@ -171,12 +171,11 @@ def stop(self):
171171
172172class ClientBase (object ):
173173
174- def __init__ (self , channel , patterns , context = None , timeout = 30 , heartbeat = 5 ,
174+ def __init__ (self , channel , context = None , timeout = 30 , heartbeat = 5 ,
175175 passive_heartbeat = False ):
176176 self ._multiplexer = ChannelMultiplexer (channel ,
177177 ignore_broadcast = True )
178178 self ._context = context or Context .get_instance ()
179- self ._patterns = patterns
180179 self ._timeout = timeout
181180 self ._heartbeat_freq = heartbeat
182181 self ._passive_heartbeat = passive_heartbeat
@@ -195,7 +194,7 @@ def _raise_remote_error(self, event):
195194 raise RemoteError ('RemoteError' , msg , None )
196195
197196 def _select_pattern (self , event ):
198- for pattern in self . _patterns :
197+ for pattern in patterns . patterns_list :
199198 if pattern .accept_answer (event ):
200199 return pattern
201200 msg = 'Unable to find a pattern for: {0}' .format (event )
@@ -211,7 +210,7 @@ def _process_response(self, method, bufchan, timeout):
211210
212211 pattern = self ._select_pattern (event )
213212 return pattern .process_answer (self ._context , bufchan , event , method ,
214- timeout , self ._raise_remote_error )
213+ self ._raise_remote_error )
215214 except :
216215 bufchan .close ()
217216 bufchan .channel .close ()
@@ -246,89 +245,6 @@ def __getattr__(self, method):
246245 return lambda * args , ** kargs : self (method , * args , ** kargs )
247246
248247
249- class DecoratorBase (object ):
250- pattern = None
251-
252- def __init__ (self , functor ):
253- self ._functor = functor
254- self .__doc__ = functor .__doc__
255- self .__name__ = functor .__name__
256-
257- def __get__ (self , instance , type_instance = None ):
258- if instance is None :
259- return self
260- return self .__class__ (self ._functor .__get__ (instance , type_instance ))
261-
262- def __call__ (self , * args , ** kargs ):
263- return self ._functor (* args , ** kargs )
264-
265- def _zerorpc_args (self ):
266- try :
267- args_spec = self ._functor ._zerorpc_args ()
268- except AttributeError :
269- try :
270- args_spec = inspect .getargspec (self ._functor )
271- except TypeError :
272- try :
273- args_spec = inspect .getargspec (self ._functor .__call__ )
274- except (AttributeError , TypeError ):
275- args_spec = None
276- return args_spec
277-
278-
279- class PatternReqRep ():
280-
281- def process_call (self , context , bufchan , event , functor ):
282- result = context .middleware_call_procedure (functor , * event .args )
283- bufchan .emit ('OK' , (result ,), context .middleware_get_task_context ())
284-
285- def accept_answer (self , event ):
286- return True
287-
288- def process_answer (self , context , bufchan , event , method , timeout ,
289- raise_remote_error ):
290- result = event .args [0 ]
291- if event .name == 'ERR' :
292- raise_remote_error (event )
293- bufchan .close ()
294- bufchan .channel .close ()
295- bufchan .channel .channel .close ()
296- return result
297-
298-
299- class rep (DecoratorBase ):
300- pattern = PatternReqRep ()
301-
302-
303- class PatternReqStream ():
304-
305- def process_call (self , context , bufchan , event , functor ):
306- xheader = context .middleware_get_task_context ()
307- for result in iter (context .middleware_call_procedure (functor ,
308- * event .args )):
309- bufchan .emit ('STREAM' , result , xheader )
310- bufchan .emit ('STREAM_DONE' , None , xheader )
311-
312- def accept_answer (self , event ):
313- return event .name in ('STREAM' , 'STREAM_DONE' )
314-
315- def process_answer (self , context , bufchan , event , method , timeout ,
316- raise_remote_error ):
317- def iterator (event ):
318- while event .name == 'STREAM' :
319- yield event .args
320- event = bufchan .recv ()
321- if event .name == 'ERR' :
322- raise_remote_error (event )
323- bufchan .close ()
324- bufchan .channel .channel .close ()
325- return iterator (event )
326-
327-
328- class stream (DecoratorBase ):
329- pattern = PatternReqStream ()
330-
331-
332248class Server (SocketBase , ServerBase ):
333249
334250 def __init__ (self , methods = None , name = None , context = None , pool_size = None ,
@@ -346,13 +262,12 @@ def close(self):
346262
347263
348264class Client (SocketBase , ClientBase ):
349- patterns = [PatternReqStream (), PatternReqRep ()]
350265
351266 def __init__ (self , connect_to = None , context = None , timeout = 30 , heartbeat = 5 ,
352267 passive_heartbeat = False ):
353268 SocketBase .__init__ (self , zmq .XREQ , context = context )
354- ClientBase .__init__ (self , self ._events , Client . patterns , context ,
355- timeout , heartbeat , passive_heartbeat )
269+ ClientBase .__init__ (self , self ._events , context , timeout , heartbeat ,
270+ passive_heartbeat )
356271 if connect_to :
357272 self .connect (connect_to )
358273
0 commit comments