6060
6161
6262class Dispatcher :
63- NEW_MESSAGE_UPDATES = (
64- UpdateNewMessage ,
65- UpdateNewChannelMessage ,
66- UpdateNewScheduledMessage ,
67- )
63+ NEW_MESSAGE_UPDATES = (UpdateNewMessage , UpdateNewChannelMessage , UpdateNewScheduledMessage )
6864 EDIT_MESSAGE_UPDATES = (UpdateEditMessage , UpdateEditChannelMessage )
6965 DELETE_MESSAGES_UPDATES = (UpdateDeleteMessages , UpdateDeleteChannelMessages )
7066 CALLBACK_QUERY_UPDATES = (UpdateBotCallbackQuery , UpdateInlineBotCallbackQuery )
@@ -78,208 +74,157 @@ class Dispatcher:
7874 def __init__ (self , client : "hydrogram.Client" ):
7975 self .client = client
8076 self .loop = asyncio .get_event_loop ()
81-
8277 self .handler_worker_tasks = []
8378 self .locks_list = []
84-
8579 self .updates_queue = asyncio .Queue ()
8680 self .groups = OrderedDict ()
81+ self ._init_update_parsers ()
8782
88- async def message_parser (update , users , chats ):
89- return (
90- await hydrogram .types .Message ._parse (
91- client = self .client ,
92- message = update .message ,
93- users = users ,
94- chats = chats ,
95- is_scheduled = isinstance (update , UpdateNewScheduledMessage ),
96- ),
97- MessageHandler ,
98- )
99-
100- async def edited_message_parser (update , users , chats ):
101- # Edited messages are parsed the same way as new messages, but the handler is different
102- parsed , _ = await message_parser (update , users , chats )
103-
104- return (parsed , EditedMessageHandler )
105-
106- def deleted_messages_parser (update , users , chats ):
107- return (
108- utils .parse_deleted_messages (self .client , update ),
109- DeletedMessagesHandler ,
110- )
111-
112- async def callback_query_parser (update , users , chats ):
113- return (
114- await hydrogram .types .CallbackQuery ._parse (self .client , update , users ),
115- CallbackQueryHandler ,
116- )
117-
118- def user_status_parser (update , users , chats ):
119- return (
120- hydrogram .types .User ._parse_user_status (self .client , update ),
121- UserStatusHandler ,
122- )
123-
124- def inline_query_parser (update , users , chats ):
125- return (
126- hydrogram .types .InlineQuery ._parse (self .client , update , users ),
127- InlineQueryHandler ,
128- )
129-
130- def poll_parser (update , users , chats ):
131- return (
132- hydrogram .types .Poll ._parse_update (self .client , update ),
133- PollHandler ,
134- )
135-
136- def chosen_inline_result_parser (update , users , chats ):
137- return (
138- hydrogram .types .ChosenInlineResult ._parse (self .client , update , users ),
139- ChosenInlineResultHandler ,
140- )
141-
142- def chat_member_updated_parser (update , users , chats ):
143- return (
144- hydrogram .types .ChatMemberUpdated ._parse (self .client , update , users , chats ),
145- ChatMemberUpdatedHandler ,
146- )
147-
148- def chat_join_request_parser (update , users , chats ):
149- return (
150- hydrogram .types .ChatJoinRequest ._parse (self .client , update , users , chats ),
151- ChatJoinRequestHandler ,
152- )
153-
83+ def _init_update_parsers (self ):
15484 self .update_parsers = {
155- Dispatcher .NEW_MESSAGE_UPDATES : message_parser ,
156- Dispatcher .EDIT_MESSAGE_UPDATES : edited_message_parser ,
157- Dispatcher .DELETE_MESSAGES_UPDATES : deleted_messages_parser ,
158- Dispatcher .CALLBACK_QUERY_UPDATES : callback_query_parser ,
159- Dispatcher .USER_STATUS_UPDATES : user_status_parser ,
160- Dispatcher .BOT_INLINE_QUERY_UPDATES : inline_query_parser ,
161- Dispatcher .POLL_UPDATES : poll_parser ,
162- Dispatcher .CHOSEN_INLINE_RESULT_UPDATES : chosen_inline_result_parser ,
163- Dispatcher .CHAT_MEMBER_UPDATES : chat_member_updated_parser ,
164- Dispatcher .CHAT_JOIN_REQUEST_UPDATES : chat_join_request_parser ,
85+ Dispatcher .NEW_MESSAGE_UPDATES : self . _message_parser ,
86+ Dispatcher .EDIT_MESSAGE_UPDATES : self . _edited_message_parser ,
87+ Dispatcher .DELETE_MESSAGES_UPDATES : self . _deleted_messages_parser ,
88+ Dispatcher .CALLBACK_QUERY_UPDATES : self . _callback_query_parser ,
89+ Dispatcher .USER_STATUS_UPDATES : self . _user_status_parser ,
90+ Dispatcher .BOT_INLINE_QUERY_UPDATES : self . _inline_query_parser ,
91+ Dispatcher .POLL_UPDATES : self . _poll_parser ,
92+ Dispatcher .CHOSEN_INLINE_RESULT_UPDATES : self . _chosen_inline_result_parser ,
93+ Dispatcher .CHAT_MEMBER_UPDATES : self . _chat_member_updated_parser ,
94+ Dispatcher .CHAT_JOIN_REQUEST_UPDATES : self . _chat_join_request_parser ,
16595 }
166-
16796 self .update_parsers = {
16897 key : value for key_tuple , value in self .update_parsers .items () for key in key_tuple
16998 }
17099
100+ async def _message_parser (self , update , users , chats ):
101+ return (
102+ await hydrogram .types .Message ._parse (
103+ client = self .client ,
104+ message = update .message ,
105+ users = users ,
106+ chats = chats ,
107+ is_scheduled = isinstance (update , UpdateNewScheduledMessage ),
108+ ),
109+ MessageHandler ,
110+ )
111+
112+ async def _edited_message_parser (self , update , users , chats ):
113+ parsed , _ = await self ._message_parser (update , users , chats )
114+ return parsed , EditedMessageHandler
115+
116+ def _deleted_messages_parser (self , update , users , chats ):
117+ return utils .parse_deleted_messages (self .client , update ), DeletedMessagesHandler
118+
119+ async def _callback_query_parser (self , update , users , chats ):
120+ return await hydrogram .types .CallbackQuery ._parse (
121+ self .client , update , users
122+ ), CallbackQueryHandler
123+
124+ def _user_status_parser (self , update , users , chats ):
125+ return hydrogram .types .User ._parse_user_status (self .client , update ), UserStatusHandler
126+
127+ def _inline_query_parser (self , update , users , chats ):
128+ return hydrogram .types .InlineQuery ._parse (self .client , update , users ), InlineQueryHandler
129+
130+ def _poll_parser (self , update , users , chats ):
131+ return hydrogram .types .Poll ._parse_update (self .client , update ), PollHandler
132+
133+ def _chosen_inline_result_parser (self , update , users , chats ):
134+ return hydrogram .types .ChosenInlineResult ._parse (
135+ self .client , update , users
136+ ), ChosenInlineResultHandler
137+
138+ def _chat_member_updated_parser (self , update , users , chats ):
139+ return hydrogram .types .ChatMemberUpdated ._parse (
140+ self .client , update , users , chats
141+ ), ChatMemberUpdatedHandler
142+
143+ def _chat_join_request_parser (self , update , users , chats ):
144+ return hydrogram .types .ChatJoinRequest ._parse (
145+ self .client , update , users , chats
146+ ), ChatJoinRequestHandler
147+
171148 async def start (self ):
172149 if not self .client .no_updates :
173- for _ in range (self .client .workers ):
174- self .locks_list .append (asyncio .Lock ())
175-
176- self .handler_worker_tasks .append (
177- self .loop .create_task (self .handler_worker (self .locks_list [- 1 ]))
178- )
179-
150+ self .locks_list = [asyncio .Lock () for _ in range (self .client .workers )]
151+ self .handler_worker_tasks = [
152+ self .loop .create_task (self .handler_worker (lock )) for lock in self .locks_list
153+ ]
180154 log .info ("Started %s HandlerTasks" , self .client .workers )
181155
182156 async def stop (self ):
183157 if not self .client .no_updates :
184158 for _ in range (self .client .workers ):
185159 self .updates_queue .put_nowait (None )
186-
187- for i in self .handler_worker_tasks :
188- await i
189-
160+ await asyncio .gather (* self .handler_worker_tasks )
190161 self .handler_worker_tasks .clear ()
191162 self .groups .clear ()
192-
193163 log .info ("Stopped %s HandlerTasks" , self .client .workers )
194164
195165 def add_handler (self , handler , group : int ):
196166 async def fn ():
197- for lock in self .locks_list :
198- await lock .acquire ()
199-
200- try :
167+ async with asyncio .Lock ():
201168 if group not in self .groups :
202169 self .groups [group ] = []
203170 self .groups = OrderedDict (sorted (self .groups .items ()))
204-
205171 self .groups [group ].append (handler )
206- finally :
207- for lock in self .locks_list :
208- lock .release ()
209172
210173 self .loop .create_task (fn ())
211174
212175 def remove_handler (self , handler , group : int ):
213176 async def fn ():
214- for lock in self .locks_list :
215- await lock .acquire ()
216-
217- try :
177+ async with asyncio .Lock ():
218178 if group not in self .groups :
219179 raise ValueError (f"Group { group } does not exist. Handler was not removed." )
220-
221180 self .groups [group ].remove (handler )
222- finally :
223- for lock in self .locks_list :
224- lock .release ()
225181
226182 self .loop .create_task (fn ())
227183
228184 async def handler_worker (self , lock ):
229185 while True :
230186 packet = await self .updates_queue .get ()
231-
232187 if packet is None :
233188 break
234-
235- try :
236- update , users , chats = packet
237- parser = self .update_parsers .get (type (update ), None )
238-
239- parsed_update , handler_type = (
240- await parser (update , users , chats )
241- if parser is not None
242- else (None , type (None ))
243- )
244-
245- async with lock :
246- for group in self .groups .values ():
247- for handler in group :
248- args = None
249-
250- if isinstance (handler , handler_type ):
251- try :
252- if await handler .check (self .client , parsed_update ):
253- args = (parsed_update ,)
254- except Exception as e :
255- log .exception (e )
256- continue
257-
258- elif isinstance (handler , RawUpdateHandler ):
259- args = (update , users , chats )
260-
261- if args is None :
262- continue
263-
264- try :
265- if inspect .iscoroutinefunction (handler .callback ):
266- await handler .callback (self .client , * args )
267- else :
268- await self .loop .run_in_executor (
269- self .client .executor ,
270- handler .callback ,
271- self .client ,
272- * args ,
273- )
274- except hydrogram .StopPropagation :
275- raise
276- except hydrogram .ContinuePropagation :
277- continue
278- except Exception as e :
279- log .exception (e )
280-
281- break
282- except hydrogram .StopPropagation :
283- pass
284- except Exception as e :
285- log .exception (e )
189+ await self ._process_packet (packet , lock )
190+
191+ async def _process_packet (self , packet , lock ):
192+ try :
193+ update , users , chats = packet
194+ parser = self .update_parsers .get (type (update ))
195+ if not parser :
196+ return
197+
198+ parsed_update , handler_type = await parser (update , users , chats )
199+ async with lock :
200+ for group in self .groups .values ():
201+ for handler in group :
202+ await self ._handle_update (
203+ handler , handler_type , parsed_update , update , users , chats
204+ )
205+ except hydrogram .StopPropagation :
206+ pass
207+ except Exception as e :
208+ log .exception (e )
209+
210+ async def _handle_update (self , handler , handler_type , parsed_update , update , users , chats ):
211+ try :
212+ if isinstance (handler , handler_type ):
213+ if await handler .check (self .client , parsed_update ):
214+ await self ._execute_callback (handler , parsed_update )
215+ elif isinstance (handler , RawUpdateHandler ):
216+ await self ._execute_callback (handler , update , users , chats )
217+ except hydrogram .StopPropagation :
218+ raise
219+ except hydrogram .ContinuePropagation :
220+ pass
221+ except Exception as e :
222+ log .exception (e )
223+
224+ async def _execute_callback (self , handler , * args ):
225+ if inspect .iscoroutinefunction (handler .callback ):
226+ await handler .callback (self .client , * args )
227+ else :
228+ await self .loop .run_in_executor (
229+ self .client .executor , handler .callback , self .client , * args
230+ )
0 commit comments