@@ -102,6 +102,10 @@ def channel(self, from_event=None):
102102 def active_channels (self ):
103103 return self ._active_channels
104104
105+ @property
106+ def context (self ):
107+ return self ._events .context
108+
105109
106110class Channel (object ):
107111
@@ -128,28 +132,33 @@ def close(self):
128132 del self ._multiplexer ._active_channels [self ._channel_id ]
129133 self ._channel_id = None
130134
131- def emit (self , name , args , xheader = {}):
135+ def create_event (self , name , args , xheader = {}):
132136 event = self ._multiplexer .create_event (name , args , xheader )
133-
134137 if self ._channel_id is None :
135138 self ._channel_id = event .header ['message_id' ]
136139 self ._multiplexer ._active_channels [self ._channel_id ] = self
137140 else :
138141 event .header ['response_to' ] = self ._channel_id
142+ return event
143+
144+ def emit (self , name , args , xheader = {}):
145+ event = self .create_event (name , args , xheader )
146+ self ._multiplexer .emit_event (event , self ._zmqid )
139147
140- # TODO debug middleware
141- # print time.time(), 'channel emit', event
148+ def emit_event (self , event ):
142149 self ._multiplexer .emit_event (event , self ._zmqid )
143150
144151 def recv (self , timeout = None ):
145152 try :
146153 event = self ._queue .get (timeout = timeout )
147154 except gevent .queue .Empty :
148155 raise TimeoutExpired (timeout )
149- # TODO debug middleware
150- # print time.time(), 'channel recv', event
151156 return event
152157
158+ @property
159+ def context (self ):
160+ return self ._multiplexer .context
161+
153162
154163class BufferedChannel (object ):
155164
@@ -193,20 +202,27 @@ def _recver(self):
193202 else :
194203 self ._input_queue .put (event )
195204
196- def emit (self , name , args , xheader = {}, block = True , timeout = None ):
205+ def create_event (self , name , args , xheader = {}):
206+ return self ._channel .create_event (name , args , xheader )
207+
208+ def emit_event (self , event , block = True , timeout = None ):
197209 if self ._remote_queue_open_slots == 0 :
198210 if not block :
199211 return False
200212 self ._remote_can_recv .clear ()
201213 self ._remote_can_recv .wait (timeout = timeout )
202214 self ._remote_queue_open_slots -= 1
203215 try :
204- self ._channel .emit ( name , args , xheader )
216+ self ._channel .emit_event ( event )
205217 except :
206218 self ._remote_queue_open_slots += 1
207219 raise
208220 return True
209221
222+ def emit (self , name , args , xheader = {}, block = True , timeout = None ):
223+ event = self .create_event (name , args , xheader )
224+ return self .emit_event (event , block , timeout )
225+
210226 def _request_data (self ):
211227 open_slots = self ._input_queue_size - self ._input_queue_reserved
212228 self ._input_queue_reserved += open_slots
@@ -230,3 +246,7 @@ def recv(self, timeout=None):
230246 @property
231247 def channel (self ):
232248 return self ._channel
249+
250+ @property
251+ def context (self ):
252+ return self ._channel .context
0 commit comments