forked from python-telegram-bot/python-telegram-bot
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathchatmemberbot.py
More file actions
166 lines (136 loc) · 5.96 KB
/
chatmemberbot.py
File metadata and controls
166 lines (136 loc) · 5.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
#!/usr/bin/env python
# pylint: disable=C0116,W0613
# This program is dedicated to the public domain under the CC0 license.
"""
Simple Bot to handle '(my_)chat_member' updates.
Greets new users & keeps track of which chats the bot is in.
Usage:
Press Ctrl-C on the command line or send a signal to the process to stop the
bot.
"""
import logging
from typing import Tuple, Optional
from telegram import Update, Chat, ChatMember, ParseMode, ChatMemberUpdated
from telegram.ext import (
Updater,
CommandHandler,
CallbackContext,
ChatMemberHandler,
)
# Send log records of level INFO and above to the default handler, prefixed
# with timestamp, logger name and level.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

# Module-level logger used by the handlers in this file.
logger = logging.getLogger(__name__)
def _counts_as_member(status: str, is_member: Optional[bool]) -> bool:
    """Return whether a user with the given status is part of the chat.

    Members, the creator and administrators always count. A restricted user
    only counts when the accompanying ``is_member`` flag is exactly ``True``.
    """
    if status in (ChatMember.MEMBER, ChatMember.CREATOR, ChatMember.ADMINISTRATOR):
        return True
    return status == ChatMember.RESTRICTED and is_member is True


def extract_status_change(
    chat_member_update: ChatMemberUpdated,
) -> Optional[Tuple[bool, bool]]:
    """Extract the membership transition described by a ChatMemberUpdated.

    Args:
        chat_member_update: The update whose old and new chat member are
            compared.

    Returns:
        ``None`` if the 'status' did not change. Otherwise a tuple
        ``(was_member, is_member)`` telling whether the 'old_chat_member' was
        a member of the chat and whether the 'new_chat_member' is one.
    """
    # Compute the diff once; both lookups below read from the same mapping.
    changes = chat_member_update.difference()

    status_change = changes.get("status")
    if status_change is None:
        return None
    old_status, new_status = status_change

    # 'is_member' is only needed to classify restricted users; when it did not
    # change it is missing from the diff, hence the (None, None) fallback.
    old_is_member, new_is_member = changes.get("is_member", (None, None))

    return (
        _counts_as_member(old_status, old_is_member),
        _counts_as_member(new_status, new_is_member),
    )
def track_chats(update: Update, context: CallbackContext) -> None:
    """Keep ``context.bot_data`` up to date with the chats the bot is part of.

    Chat IDs are stored in sets under the keys ``"user_ids"`` (private chats),
    ``"group_ids"`` (groups and supergroups) and ``"channel_ids"`` (every other
    chat type). Each addition or removal is also written to the log.
    """
    transition = extract_status_change(update.my_chat_member)
    if transition is None:
        # The bot's status did not change, so there is nothing to record.
        return
    previously_in, currently_in = transition

    # The user whose action caused this update.
    actor = update.effective_user.full_name
    chat = update.effective_chat

    # Classify the chat once; anything that is neither private nor a
    # (super)group is handled as a channel.
    private = chat.type == Chat.PRIVATE
    group = not private and chat.type in [Chat.GROUP, Chat.SUPERGROUP]
    if private:
        storage_key = "user_ids"
    elif group:
        storage_key = "group_ids"
    else:
        storage_key = "channel_ids"

    if currently_in and not previously_in:
        # The bot entered the chat.
        if private:
            logger.info("%s started the bot", actor)
        elif group:
            logger.info("%s added the bot to the group %s", actor, chat.title)
        else:
            logger.info("%s added the bot to the channel %s", actor, chat.title)
        context.bot_data.setdefault(storage_key, set()).add(chat.id)
    elif previously_in and not currently_in:
        # The bot left the chat (or was blocked in a private chat).
        if private:
            logger.info("%s blocked the bot", actor)
        elif group:
            logger.info("%s removed the bot from the group %s", actor, chat.title)
        else:
            logger.info("%s removed the bot from the channel %s", actor, chat.title)
        context.bot_data.setdefault(storage_key, set()).discard(chat.id)
def show_chats(update: Update, context: CallbackContext) -> None:
    """Reply with the IDs of the users, groups and channels stored in bot_data."""
    storage = context.bot_data

    def id_list(key: str) -> str:
        # setdefault stores an empty set under the key if it was missing.
        return ", ".join(map(str, storage.setdefault(key, set())))

    user_ids = id_list("user_ids")
    group_ids = id_list("group_ids")
    channel_ids = id_list("channel_ids")

    text = (
        f"@{context.bot.username} is currently in a conversation with the user IDs {user_ids}."
        f" Moreover it is a member of the groups with IDs {group_ids} "
        f"and administrator in the channels with IDs {channel_ids}."
    )
    update.effective_message.reply_text(text)
def greet_chat_members(update: Update, context: CallbackContext) -> None:
    """Welcome users who joined the chat and announce users who left it.

    Nothing is sent when the update does not change whether the user is a
    member of the chat.
    """
    transition = extract_status_change(update.chat_member)
    if transition is None:
        return
    was_member, is_member = transition

    # HTML mentions of the user who performed the change and of the user
    # whose membership changed.
    member_update = update.chat_member
    cause_name = member_update.from_user.mention_html()
    member_name = member_update.new_chat_member.user.mention_html()

    # Pick the announcement first, then send it through a single call.
    if is_member and not was_member:
        announcement = f"{member_name} was added by {cause_name}. Welcome!"
    elif was_member and not is_member:
        announcement = f"{member_name} is no longer with us. Thanks a lot, {cause_name} ..."
    else:
        # Membership itself is unchanged (e.g. only the role differs).
        return

    update.effective_chat.send_message(announcement, parse_mode=ParseMode.HTML)
def main() -> None:
    """Register the handlers, start polling and block until the bot is stopped."""
    # The Updater is created with the bot's token.
    updater = Updater("TOKEN")

    # Handlers are registered on the updater's dispatcher.
    register = updater.dispatcher.add_handler

    # Bookkeeping of the chats the bot is in, plus a command to display them.
    register(ChatMemberHandler(track_chats, ChatMemberHandler.MY_CHAT_MEMBER))
    register(CommandHandler("show_chats", show_chats))

    # Greetings and farewells for members joining/leaving chats.
    register(ChatMemberHandler(greet_chat_members, ChatMemberHandler.CHAT_MEMBER))

    # Start the bot.
    # Passing 'allowed_updates' makes the bot handle *all* updates, including
    # `chat_member` updates. To reset this, simply pass `allowed_updates=[]`.
    updater.start_polling(allowed_updates=Update.ALL_TYPES)

    # Run the bot until Ctrl-C is pressed or the process receives SIGINT,
    # SIGTERM or SIGABRT. start_polling() is non-blocking, so idle() keeps the
    # process alive and stops the bot gracefully.
    updater.idle()
# Start the bot only when this file is run as a script, not when it is imported.
if __name__ == "__main__":
    main()