forked from python-telegram-bot/python-telegram-bot
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathchatmemberbot.py
More file actions
158 lines (127 loc) · 6.18 KB
/
chatmemberbot.py
File metadata and controls
158 lines (127 loc) · 6.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
#!/usr/bin/env python
# pylint: disable=unused-argument, wrong-import-position
# This program is dedicated to the public domain under the CC0 license.
"""
Simple Bot to handle '(my_)chat_member' updates.
Greets new users & keeps track of which chats the bot is in.
Usage:
Press Ctrl-C on the command line or send a signal to the process to stop the
bot.
"""
import logging
from typing import Optional, Tuple
from telegram import __version__ as TG_VER
# Look up the structured version tuple of the installed library. If the
# import fails, fall back to an all-zero tuple so the check below rejects it.
try:
    from telegram import __version_info__
except ImportError:
    __version_info__ = (0, 0, 0, 0, 0)  # type: ignore[assignment]

# Abort early on anything older than 20.0.0 alpha 1 and point the user to the
# examples page that matches the installed version.
if __version_info__ < (20, 0, 0, "alpha", 1):
    incompatible_msg = (
        f"This example is not compatible with your current PTB version {TG_VER}. To view the "
        f"{TG_VER} version of this example, "
        f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html"
    )
    raise RuntimeError(incompatible_msg)
from telegram import Chat, ChatMember, ChatMemberUpdated, Update
from telegram.constants import ParseMode
from telegram.ext import Application, ChatMemberHandler, CommandHandler, ContextTypes
# Configure root logging at INFO level with a timestamped format, then create
# the module-level logger used by the handlers in this file.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

logger = logging.getLogger(__name__)
def extract_status_change(chat_member_update: ChatMemberUpdated) -> Optional[Tuple[bool, bool]]:
    """Extract the membership transition described by a ``ChatMemberUpdated``.

    A user counts as a member when the status is ``MEMBER``, ``OWNER`` or
    ``ADMINISTRATOR``, or when the status is ``RESTRICTED`` and the matching
    ``is_member`` value is ``True``.

    Args:
        chat_member_update: The update whose ``difference()`` is inspected.

    Returns:
        ``None`` if ``status`` is not among the changed attributes; otherwise
        the tuple ``(was_member, is_member)`` for the old and new state.
    """
    # Compute the diff once and reuse it for both lookups.
    changes = chat_member_update.difference()

    status_change = changes.get("status")
    if status_change is None:
        # NOTE(review): a change of `is_member` alone (status unchanged) also
        # ends up here and is reported as "no change" - confirm this is intended.
        return None

    old_status, new_status = status_change
    # `is_member` is only consulted for RESTRICTED; default to "unknown".
    old_is_member, new_is_member = changes.get("is_member", (None, None))

    member_statuses = (ChatMember.MEMBER, ChatMember.OWNER, ChatMember.ADMINISTRATOR)
    was_member = old_status in member_statuses or (
        old_status == ChatMember.RESTRICTED and old_is_member is True
    )
    is_member = new_status in member_statuses or (
        new_status == ChatMember.RESTRICTED and new_is_member is True
    )
    return was_member, is_member
async def track_chats(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Keep ``context.bot_data`` in sync with the chats the bot is part of.

    Chat IDs are stored in sets under ``"user_ids"`` (private chats),
    ``"group_ids"`` (groups and supergroups) and ``"channel_ids"`` (any other
    chat type). Each join or leave is also logged at INFO level.
    """
    transition = extract_status_change(update.my_chat_member)
    if transition is None:
        return
    was_member, is_member = transition

    # The user whose action caused the membership change.
    actor = update.effective_user.full_name

    # Select the storage key and log templates that match the chat type.
    chat = update.effective_chat
    if chat.type == Chat.PRIVATE:
        store_key = "user_ids"
        joined_msg = "%s started the bot"
        left_msg = "%s blocked the bot"
        names_chat = False
    elif chat.type in [Chat.GROUP, Chat.SUPERGROUP]:
        store_key = "group_ids"
        joined_msg = "%s added the bot to the group %s"
        left_msg = "%s removed the bot from the group %s"
        names_chat = True
    else:
        store_key = "channel_ids"
        joined_msg = "%s added the bot to the channel %s"
        left_msg = "%s removed the bot from the channel %s"
        names_chat = True

    joined = is_member and not was_member
    left = was_member and not is_member
    if not (joined or left):
        # Membership did not flip, so there is nothing to record.
        return

    # Private-chat messages only name the user; the others also name the chat.
    log_args = (actor, chat.title) if names_chat else (actor,)
    if joined:
        logger.info(joined_msg, *log_args)
        context.bot_data.setdefault(store_key, set()).add(chat.id)
    else:
        logger.info(left_msg, *log_args)
        context.bot_data.setdefault(store_key, set()).discard(chat.id)
async def show_chats(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with the user, group and channel IDs stored in ``context.bot_data``."""

    def render_ids(key: str) -> str:
        # setdefault creates the empty set on first use.
        return ", ".join(map(str, context.bot_data.setdefault(key, set())))

    user_ids = render_ids("user_ids")
    group_ids = render_ids("group_ids")
    channel_ids = render_ids("channel_ids")

    text = (
        f"@{context.bot.username} is currently in a conversation with the user IDs {user_ids}."
        f" Moreover it is a member of the groups with IDs {group_ids} "
        f"and administrator in the channels with IDs {channel_ids}."
    )
    await update.effective_message.reply_text(text)
async def greet_chat_members(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Announce in the chat when a user joins or leaves.

    Nothing is sent when the status did not change or when the user's
    membership is the same before and after the update.
    """
    transition = extract_status_change(update.chat_member)
    if transition is None:
        return
    was_member, is_member = transition

    # HTML mentions of the user who caused the change and the affected user.
    cause_name = update.chat_member.from_user.mention_html()
    member_name = update.chat_member.new_chat_member.user.mention_html()

    if is_member and not was_member:
        announcement = f"{member_name} was added by {cause_name}. Welcome!"
    elif was_member and not is_member:
        announcement = f"{member_name} is no longer with us. Thanks a lot, {cause_name} ..."
    else:
        return

    await update.effective_chat.send_message(announcement, parse_mode=ParseMode.HTML)
def main() -> None:
    """Build the application, register the handlers and start polling."""
    # Create the Application and pass it your bot's token.
    application = Application.builder().token("TOKEN").build()

    handlers = (
        # Keep track of which chats the bot is in.
        ChatMemberHandler(track_chats, ChatMemberHandler.MY_CHAT_MEMBER),
        CommandHandler("show_chats", show_chats),
        # Handle members joining/leaving chats.
        ChatMemberHandler(greet_chat_members, ChatMemberHandler.CHAT_MEMBER),
    )
    for handler in handlers:
        application.add_handler(handler)

    # Run the bot until the user presses Ctrl-C.
    # 'allowed_updates' is passed to handle *all* updates, including
    # `chat_member` updates. To reset this, simply pass `allowed_updates=[]`.
    application.run_polling(allowed_updates=Update.ALL_TYPES)


# Start the bot only when this file is executed as a script.
if __name__ == "__main__":
    main()