Skip to content

Commit e459ae0

Browse files
committed
refactor(dispatcher): optimize and reorganize
1 parent d1a6bae commit e459ae0

1 file changed

Lines changed: 110 additions & 165 deletions

File tree

hydrogram/dispatcher.py

Lines changed: 110 additions & 165 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,7 @@
6060

6161

6262
class Dispatcher:
63-
NEW_MESSAGE_UPDATES = (
64-
UpdateNewMessage,
65-
UpdateNewChannelMessage,
66-
UpdateNewScheduledMessage,
67-
)
63+
NEW_MESSAGE_UPDATES = (UpdateNewMessage, UpdateNewChannelMessage, UpdateNewScheduledMessage)
6864
EDIT_MESSAGE_UPDATES = (UpdateEditMessage, UpdateEditChannelMessage)
6965
DELETE_MESSAGES_UPDATES = (UpdateDeleteMessages, UpdateDeleteChannelMessages)
7066
CALLBACK_QUERY_UPDATES = (UpdateBotCallbackQuery, UpdateInlineBotCallbackQuery)
@@ -78,208 +74,157 @@ class Dispatcher:
7874
def __init__(self, client: "hydrogram.Client"):
7975
self.client = client
8076
self.loop = asyncio.get_event_loop()
81-
8277
self.handler_worker_tasks = []
8378
self.locks_list = []
84-
8579
self.updates_queue = asyncio.Queue()
8680
self.groups = OrderedDict()
81+
self._init_update_parsers()
8782

88-
async def message_parser(update, users, chats):
89-
return (
90-
await hydrogram.types.Message._parse(
91-
client=self.client,
92-
message=update.message,
93-
users=users,
94-
chats=chats,
95-
is_scheduled=isinstance(update, UpdateNewScheduledMessage),
96-
),
97-
MessageHandler,
98-
)
99-
100-
async def edited_message_parser(update, users, chats):
101-
# Edited messages are parsed the same way as new messages, but the handler is different
102-
parsed, _ = await message_parser(update, users, chats)
103-
104-
return (parsed, EditedMessageHandler)
105-
106-
def deleted_messages_parser(update, users, chats):
107-
return (
108-
utils.parse_deleted_messages(self.client, update),
109-
DeletedMessagesHandler,
110-
)
111-
112-
async def callback_query_parser(update, users, chats):
113-
return (
114-
await hydrogram.types.CallbackQuery._parse(self.client, update, users),
115-
CallbackQueryHandler,
116-
)
117-
118-
def user_status_parser(update, users, chats):
119-
return (
120-
hydrogram.types.User._parse_user_status(self.client, update),
121-
UserStatusHandler,
122-
)
123-
124-
def inline_query_parser(update, users, chats):
125-
return (
126-
hydrogram.types.InlineQuery._parse(self.client, update, users),
127-
InlineQueryHandler,
128-
)
129-
130-
def poll_parser(update, users, chats):
131-
return (
132-
hydrogram.types.Poll._parse_update(self.client, update),
133-
PollHandler,
134-
)
135-
136-
def chosen_inline_result_parser(update, users, chats):
137-
return (
138-
hydrogram.types.ChosenInlineResult._parse(self.client, update, users),
139-
ChosenInlineResultHandler,
140-
)
141-
142-
def chat_member_updated_parser(update, users, chats):
143-
return (
144-
hydrogram.types.ChatMemberUpdated._parse(self.client, update, users, chats),
145-
ChatMemberUpdatedHandler,
146-
)
147-
148-
def chat_join_request_parser(update, users, chats):
149-
return (
150-
hydrogram.types.ChatJoinRequest._parse(self.client, update, users, chats),
151-
ChatJoinRequestHandler,
152-
)
153-
83+
def _init_update_parsers(self):
15484
self.update_parsers = {
155-
Dispatcher.NEW_MESSAGE_UPDATES: message_parser,
156-
Dispatcher.EDIT_MESSAGE_UPDATES: edited_message_parser,
157-
Dispatcher.DELETE_MESSAGES_UPDATES: deleted_messages_parser,
158-
Dispatcher.CALLBACK_QUERY_UPDATES: callback_query_parser,
159-
Dispatcher.USER_STATUS_UPDATES: user_status_parser,
160-
Dispatcher.BOT_INLINE_QUERY_UPDATES: inline_query_parser,
161-
Dispatcher.POLL_UPDATES: poll_parser,
162-
Dispatcher.CHOSEN_INLINE_RESULT_UPDATES: chosen_inline_result_parser,
163-
Dispatcher.CHAT_MEMBER_UPDATES: chat_member_updated_parser,
164-
Dispatcher.CHAT_JOIN_REQUEST_UPDATES: chat_join_request_parser,
85+
Dispatcher.NEW_MESSAGE_UPDATES: self._message_parser,
86+
Dispatcher.EDIT_MESSAGE_UPDATES: self._edited_message_parser,
87+
Dispatcher.DELETE_MESSAGES_UPDATES: self._deleted_messages_parser,
88+
Dispatcher.CALLBACK_QUERY_UPDATES: self._callback_query_parser,
89+
Dispatcher.USER_STATUS_UPDATES: self._user_status_parser,
90+
Dispatcher.BOT_INLINE_QUERY_UPDATES: self._inline_query_parser,
91+
Dispatcher.POLL_UPDATES: self._poll_parser,
92+
Dispatcher.CHOSEN_INLINE_RESULT_UPDATES: self._chosen_inline_result_parser,
93+
Dispatcher.CHAT_MEMBER_UPDATES: self._chat_member_updated_parser,
94+
Dispatcher.CHAT_JOIN_REQUEST_UPDATES: self._chat_join_request_parser,
16595
}
166-
16796
self.update_parsers = {
16897
key: value for key_tuple, value in self.update_parsers.items() for key in key_tuple
16998
}
17099

100+
async def _message_parser(self, update, users, chats):
101+
return (
102+
await hydrogram.types.Message._parse(
103+
client=self.client,
104+
message=update.message,
105+
users=users,
106+
chats=chats,
107+
is_scheduled=isinstance(update, UpdateNewScheduledMessage),
108+
),
109+
MessageHandler,
110+
)
111+
112+
async def _edited_message_parser(self, update, users, chats):
113+
parsed, _ = await self._message_parser(update, users, chats)
114+
return parsed, EditedMessageHandler
115+
116+
def _deleted_messages_parser(self, update, users, chats):
117+
return utils.parse_deleted_messages(self.client, update), DeletedMessagesHandler
118+
119+
async def _callback_query_parser(self, update, users, chats):
120+
return await hydrogram.types.CallbackQuery._parse(
121+
self.client, update, users
122+
), CallbackQueryHandler
123+
124+
def _user_status_parser(self, update, users, chats):
125+
return hydrogram.types.User._parse_user_status(self.client, update), UserStatusHandler
126+
127+
def _inline_query_parser(self, update, users, chats):
128+
return hydrogram.types.InlineQuery._parse(self.client, update, users), InlineQueryHandler
129+
130+
def _poll_parser(self, update, users, chats):
131+
return hydrogram.types.Poll._parse_update(self.client, update), PollHandler
132+
133+
def _chosen_inline_result_parser(self, update, users, chats):
134+
return hydrogram.types.ChosenInlineResult._parse(
135+
self.client, update, users
136+
), ChosenInlineResultHandler
137+
138+
def _chat_member_updated_parser(self, update, users, chats):
139+
return hydrogram.types.ChatMemberUpdated._parse(
140+
self.client, update, users, chats
141+
), ChatMemberUpdatedHandler
142+
143+
def _chat_join_request_parser(self, update, users, chats):
144+
return hydrogram.types.ChatJoinRequest._parse(
145+
self.client, update, users, chats
146+
), ChatJoinRequestHandler
147+
171148
async def start(self):
172149
if not self.client.no_updates:
173-
for _ in range(self.client.workers):
174-
self.locks_list.append(asyncio.Lock())
175-
176-
self.handler_worker_tasks.append(
177-
self.loop.create_task(self.handler_worker(self.locks_list[-1]))
178-
)
179-
150+
self.locks_list = [asyncio.Lock() for _ in range(self.client.workers)]
151+
self.handler_worker_tasks = [
152+
self.loop.create_task(self.handler_worker(lock)) for lock in self.locks_list
153+
]
180154
log.info("Started %s HandlerTasks", self.client.workers)
181155

182156
async def stop(self):
183157
if not self.client.no_updates:
184158
for _ in range(self.client.workers):
185159
self.updates_queue.put_nowait(None)
186-
187-
for i in self.handler_worker_tasks:
188-
await i
189-
160+
await asyncio.gather(*self.handler_worker_tasks)
190161
self.handler_worker_tasks.clear()
191162
self.groups.clear()
192-
193163
log.info("Stopped %s HandlerTasks", self.client.workers)
194164

195165
def add_handler(self, handler, group: int):
196166
async def fn():
197-
for lock in self.locks_list:
198-
await lock.acquire()
199-
200-
try:
167+
async with asyncio.Lock():
201168
if group not in self.groups:
202169
self.groups[group] = []
203170
self.groups = OrderedDict(sorted(self.groups.items()))
204-
205171
self.groups[group].append(handler)
206-
finally:
207-
for lock in self.locks_list:
208-
lock.release()
209172

210173
self.loop.create_task(fn())
211174

212175
def remove_handler(self, handler, group: int):
213176
async def fn():
214-
for lock in self.locks_list:
215-
await lock.acquire()
216-
217-
try:
177+
async with asyncio.Lock():
218178
if group not in self.groups:
219179
raise ValueError(f"Group {group} does not exist. Handler was not removed.")
220-
221180
self.groups[group].remove(handler)
222-
finally:
223-
for lock in self.locks_list:
224-
lock.release()
225181

226182
self.loop.create_task(fn())
227183

228184
async def handler_worker(self, lock):
229185
while True:
230186
packet = await self.updates_queue.get()
231-
232187
if packet is None:
233188
break
234-
235-
try:
236-
update, users, chats = packet
237-
parser = self.update_parsers.get(type(update), None)
238-
239-
parsed_update, handler_type = (
240-
await parser(update, users, chats)
241-
if parser is not None
242-
else (None, type(None))
243-
)
244-
245-
async with lock:
246-
for group in self.groups.values():
247-
for handler in group:
248-
args = None
249-
250-
if isinstance(handler, handler_type):
251-
try:
252-
if await handler.check(self.client, parsed_update):
253-
args = (parsed_update,)
254-
except Exception as e:
255-
log.exception(e)
256-
continue
257-
258-
elif isinstance(handler, RawUpdateHandler):
259-
args = (update, users, chats)
260-
261-
if args is None:
262-
continue
263-
264-
try:
265-
if inspect.iscoroutinefunction(handler.callback):
266-
await handler.callback(self.client, *args)
267-
else:
268-
await self.loop.run_in_executor(
269-
self.client.executor,
270-
handler.callback,
271-
self.client,
272-
*args,
273-
)
274-
except hydrogram.StopPropagation:
275-
raise
276-
except hydrogram.ContinuePropagation:
277-
continue
278-
except Exception as e:
279-
log.exception(e)
280-
281-
break
282-
except hydrogram.StopPropagation:
283-
pass
284-
except Exception as e:
285-
log.exception(e)
189+
await self._process_packet(packet, lock)
190+
191+
async def _process_packet(self, packet, lock):
192+
try:
193+
update, users, chats = packet
194+
parser = self.update_parsers.get(type(update))
195+
if not parser:
196+
return
197+
198+
parsed_update, handler_type = await parser(update, users, chats)
199+
async with lock:
200+
for group in self.groups.values():
201+
for handler in group:
202+
await self._handle_update(
203+
handler, handler_type, parsed_update, update, users, chats
204+
)
205+
except hydrogram.StopPropagation:
206+
pass
207+
except Exception as e:
208+
log.exception(e)
209+
210+
async def _handle_update(self, handler, handler_type, parsed_update, update, users, chats):
211+
try:
212+
if isinstance(handler, handler_type):
213+
if await handler.check(self.client, parsed_update):
214+
await self._execute_callback(handler, parsed_update)
215+
elif isinstance(handler, RawUpdateHandler):
216+
await self._execute_callback(handler, update, users, chats)
217+
except hydrogram.StopPropagation:
218+
raise
219+
except hydrogram.ContinuePropagation:
220+
pass
221+
except Exception as e:
222+
log.exception(e)
223+
224+
async def _execute_callback(self, handler, *args):
225+
if inspect.iscoroutinefunction(handler.callback):
226+
await handler.callback(self.client, *args)
227+
else:
228+
await self.loop.run_in_executor(
229+
self.client.executor, handler.callback, self.client, *args
230+
)

0 commit comments

Comments
 (0)