-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrun.py
More file actions
85 lines (65 loc) · 2.49 KB
/
run.py
File metadata and controls
85 lines (65 loc) · 2.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import asyncio
import logging
import os
import pathlib
from datetime import datetime
from aiogram import Dispatcher, Bot
from aiogram.fsm.storage.redis import RedisStorage
from aiogram.types import BotCommand
from sqlalchemy.engine import URL
from bot.db.engine import create_async_engine, get_session_maker
from bot.handlers import apsched
from bot.handlers.bot_commands import bot_commands
from bot.handlers.main import router as router_main
from bot.handlers.day7 import router as router_day7
from apscheduler.schedulers.asyncio import AsyncIOScheduler
async def bot_start(logger: logging.Logger) -> None:
    """Build the bot, schedule the daily cron job and run long polling.

    Configuration is read from environment variables: ``TG_TOKEN`` (bot
    token), ``DB_USER`` / ``DB_PASS`` / ``DB_HOST`` / ``DB_PORT`` /
    ``DB_NAME`` (PostgreSQL connection) and ``AUTO_MESSAGE_HOUR`` (hour of
    day, in the ``Europe/Moscow`` timezone, at which
    ``apsched.send_message_cron`` is triggered).

    Args:
        logger: Logger forwarded to ``Dispatcher.start_polling`` as the
            ``logger`` keyword argument.

    Raises:
        TypeError: If ``AUTO_MESSAGE_HOUR`` is not set (``int(None)``).
        ValueError: If ``AUTO_MESSAGE_HOUR`` is not an integer string.
    """
    logging.basicConfig(level=logging.INFO)

    # Each entry of bot_commands is indexed as (command, description).
    commands_for_bot = [
        BotCommand(command=cmd[0], description=cmd[1]) for cmd in bot_commands
    ]

    bot = Bot(token=os.getenv('TG_TOKEN'))
    dp = Dispatcher()
    dp.include_router(router_main)
    dp.include_router(router_day7)

    await bot.delete_webhook()
    # Publish the command list to Telegram. Previously the list was built
    # and then discarded, so this function never registered the commands.
    await bot.set_my_commands(commands=commands_for_bot)

    postgres_url = URL.create(
        "postgresql+asyncpg",
        username=os.getenv("DB_USER"),
        host=os.getenv("DB_HOST"),
        database=os.getenv("DB_NAME"),
        password=os.getenv("DB_PASS"),
        port=os.getenv("DB_PORT"),
    )
    async_engine = create_async_engine(postgres_url)
    session_maker = get_session_maker(async_engine)

    scheduler = AsyncIOScheduler(timezone="Europe/Moscow")
    scheduler.add_job(
        apsched.send_message_cron,
        trigger='cron',
        hour=int(os.getenv("AUTO_MESSAGE_HOUR")),
        # Use an aware timestamp in the scheduler's own timezone. A naive
        # datetime.now() is server-local wall time, which the scheduler
        # would reinterpret as Europe/Moscow time and thereby shift the
        # earliest allowed run on hosts in a different timezone.
        start_date=datetime.now(tz=scheduler.timezone),
        kwargs={'bot': bot, 'session_maker': session_maker},
    )
    scheduler.start()

    # session_maker and logger are passed as extra keyword arguments to
    # start_polling alongside the bot instance.
    await dp.start_polling(
        bot,
        session_maker=session_maker,
        logger=logger,
        allowed_updates=dp.resolve_used_update_types(),
    )
def setup_env():
    """Set up environment variables.

    Loads the ``.env`` file located in the same directory as this script
    via ``dotenv.load_dotenv``; does nothing when that file is absent.
    """
    from dotenv import load_dotenv

    env_file = pathlib.Path(__file__).parent / '.env'
    if not env_file.exists():
        return
    load_dotenv(env_file)
def main():
    """Entry point for launching the bot through poetry.

    Loads the environment, runs ``bot_start`` inside a fresh asyncio event
    loop and logs the start and stop of the bot. ``KeyboardInterrupt`` and
    ``SystemExit`` are treated as a normal shutdown and are not re-raised.
    """
    # Configure logging before the first message is emitted; otherwise the
    # INFO-level start-up line would be dropped. A later basicConfig call
    # is a no-op once the root logger already has a handler.
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    try:
        setup_env()
        # Log *before* asyncio.run: it blocks until polling has finished,
        # so logging afterwards would report the start only after the stop.
        logger.info('Bot started')
        asyncio.run(bot_start(logger))
    except (KeyboardInterrupt, SystemExit):
        # Ctrl+C / exit request is the expected way to stop the bot.
        pass
    logger.info('Bot stopped')
# Run the bot only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()