Multi-source event extraction pipeline for Slack and Telegram.
The system ingests channel messages, scores candidates, extracts structured events with LLMs, deduplicates results, and optionally publishes a digest to Slack.
- Slack (MessageSource.SLACK)
- Telegram (MessageSource.TELEGRAM)
- Direct orchestrator (recommended for local/dev): scripts/run_multi_source_pipeline.py
  - Runs ingest -> candidates -> LLM extraction -> dedup in one process
- Queue-based workers (recommended for production):
  - Scheduler + workers (scripts/run_pipeline_scheduler.py, scripts/run_ingest_worker.py, scripts/run_extraction_worker.py, scripts/run_llm_worker.py, scripts/run_dedup_worker.py)
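
The single-process flow (ingest -> candidates -> LLM extraction -> dedup) can be pictured as a chain of functions, each consuming the previous stage's output. The sketch below is illustrative only: every function name, the keyword-based scoring rule, and the title-based dedup key are assumptions, not the project's actual implementation.

```python
from typing import Callable

# Hypothetical stage signature: a list of dict records in, a list of dict records out.
Stage = Callable[[list[dict]], list[dict]]


def ingest(_: list[dict]) -> list[dict]:
    # Stand-in for fetching channel messages from Slack and Telegram.
    return [
        {"source_id": "slack", "text": "Release v2 on Friday"},
        {"source_id": "slack", "text": "lunch?"},
        {"source_id": "telegram", "text": "Release v2 on Friday"},
    ]


def select_candidates(messages: list[dict]) -> list[dict]:
    # Toy scoring rule (assumption): keep messages that mention "release".
    return [m for m in messages if "release" in m["text"].lower()]


def extract_events(candidates: list[dict]) -> list[dict]:
    # Stand-in for the LLM extraction step; no model is called here.
    return [{"source_id": c["source_id"], "title": c["text"]} for c in candidates]


def dedup(events: list[dict]) -> list[dict]:
    # Toy dedup key (assumption): case-insensitive title, first occurrence wins.
    seen: set[str] = set()
    unique: list[dict] = []
    for event in events:
        key = event["title"].lower()
        if key not in seen:
            seen.add(key)
            unique.append(event)
    return unique


def run_once(stages: list[Stage]) -> list[dict]:
    # Feed each stage's output into the next one, all in one process.
    items: list[dict] = []
    for stage in stages:
        items = stage(items)
    return items


events = run_once([ingest, select_candidates, extract_events, dedup])
print(events)
```

In this toy run, three ingested messages yield two candidates and a single deduplicated event.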
- Source-aware domain model (source_id on messages/candidates/events)
- Source-aware repository methods for ingestion, candidate selection, extraction, and dedup
- Per-source prompt configuration (config/prompts/slack.yaml, config/prompts/telegram.yaml)
- SQLite and PostgreSQL support through repository factory
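
As a rough illustration of what "source-aware" means, the sketch below tags each record with a source_id and derives the per-source prompt file from it. MessageSource.SLACK and MessageSource.TELEGRAM are named in this README; the enum values, the Message fields, and the helpers prompt_path and by_source are assumptions made for the example.

```python
from dataclasses import dataclass
from enum import Enum


class MessageSource(str, Enum):
    # Member names come from the README; the string values are assumed.
    SLACK = "slack"
    TELEGRAM = "telegram"


@dataclass(frozen=True)
class Message:
    # Hypothetical minimal record; the real model likely has more fields.
    message_id: str
    source_id: MessageSource
    text: str


def prompt_path(source: MessageSource) -> str:
    # Maps a source to its prompt file, following config/prompts/<source>.yaml.
    return f"config/prompts/{source.value}.yaml"


def by_source(messages: list[Message], source: MessageSource) -> list[Message]:
    # Source-aware selection: only records that belong to the given source.
    return [m for m in messages if m.source_id is source]


messages = [
    Message("1", MessageSource.SLACK, "Release v2 on Friday"),
    Message("2", MessageSource.TELEGRAM, "Maintenance window tonight"),
]
slack_only = by_source(messages, MessageSource.SLACK)
print(prompt_path(MessageSource.TELEGRAM), len(slack_only))
```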
src/
adapters/ # Slack/Telegram clients, repositories, factories
clients/ # Wrapped client interfaces
config/ # Settings + logging
domain/ # Models, protocols, business constants
observability/ # Metrics/tracing
ports/ # Task queue and job runner ports
presentation/ # Streamlit orchestration helpers
services/ # Scoring, dedup, normalization, object registry, etc.
use_cases/ # Ingest/extract/dedup/publish orchestration
workers/ # Task-queue workers
scripts/ # CLI entry points and operational scripts
config/defaults/ # Example YAML templates copied by setup script
- Python 3.11+
- uv
- Slack bot token (SLACK_BOT_TOKEN)
- OpenAI API key (OPENAI_API_KEY)
- Optional for Telegram: TELEGRAM_API_ID, TELEGRAM_API_HASH
git clone https://github.com/VaitaR/slack-event-manager.git
cd slack-event-manager
pip install uv
make sync-dev
./scripts/setup_config.sh

This creates local editable files from config/defaults/*.example.yaml:
- config/main.yaml
- config/channels.yaml
- config/object_registry.yaml
- config/telegram_channels.yaml
- .env
Edit .env:
SLACK_BOT_TOKEN=xoxb-...
OPENAI_API_KEY=sk-...
# Optional Telegram
TELEGRAM_API_ID=123456
TELEGRAM_API_HASH=...
# Optional PostgreSQL
POSTGRES_PASSWORD=...

Runtime config is loaded from config/main.yaml and the other config/*.yaml files, then merged.
Important files:
- config/main.yaml - global pipeline/db/llm/digest settings
- config/channels.yaml - Slack scoring/channel config
- config/telegram_channels.yaml - Telegram channel config
- config/object_registry.yaml - canonical object mappings
Detailed reference: docs/CONFIG.md
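
The README does not spell out the merge semantics, so the following is only a sketch of one plausible approach: a recursive merge in which nested mappings are combined and later files override earlier scalar values. The function name deep_merge and all configuration keys are hypothetical; see docs/CONFIG.md for the actual behavior.

```python
from typing import Any


def deep_merge(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
    # Returns a new dict; nested dicts are merged, other values are replaced.
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged


# Hypothetical contents standing in for config/main.yaml and config/channels.yaml.
main_cfg = {
    "llm": {"model": "example-model", "temperature": 0.0},
    "digest": {"enabled": False},
}
channels_cfg = {
    "channels": [{"name": "releases"}],
    "digest": {"enabled": True},
}

settings = deep_merge(main_cfg, channels_cfg)
print(settings)
```

Under this assumption, a key that appears in only one file is kept, and a key that appears in both takes the value from the file merged last.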
# Run all enabled sources once
python scripts/run_multi_source_pipeline.py
# Run only Slack
python scripts/run_multi_source_pipeline.py --source slack
# Run only Telegram
python scripts/run_multi_source_pipeline.py --source telegram
# Continuous mode
python scripts/run_multi_source_pipeline.py --interval-seconds 3600
# Publish digest
python scripts/run_multi_source_pipeline.py --publish
# Dry-run publish
python scripts/run_multi_source_pipeline.py --publish --dry-run

python scripts/run_pipeline.py

Use this only if you need the older Slack-only flow. For two-source processing, use run_multi_source_pipeline.py.
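
For readers wiring a similar entry point, the flags used in the run_multi_source_pipeline.py commands above (--source, --interval-seconds, --publish, --dry-run) could be declared with argparse roughly as follows. This is a sketch, not the script's actual parser: the defaults, help texts, and the build_parser name are assumptions.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    # Flag names mirror the commands above; defaults and help text are assumed.
    parser = argparse.ArgumentParser(description="Multi-source pipeline (sketch)")
    parser.add_argument(
        "--source",
        choices=["slack", "telegram"],
        default=None,
        help="Run a single source; omit to run all enabled sources",
    )
    parser.add_argument(
        "--interval-seconds",
        type=int,
        default=None,
        help="Repeat the run at this interval; omit to run once",
    )
    parser.add_argument("--publish", action="store_true", help="Publish a digest")
    parser.add_argument(
        "--dry-run", action="store_true", help="Build the digest without posting it"
    )
    return parser


# Parse an explicit argument list so the example does not depend on sys.argv.
args = build_parser().parse_args(["--source", "slack", "--publish", "--dry-run"])
print(args.source, args.interval_seconds, args.publish, args.dry_run)
```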
# Enqueue periodic iterations
python scripts/run_pipeline_scheduler.py --interval-seconds 300
# Workers
python scripts/run_ingest_worker.py
python scripts/run_extraction_worker.py
python scripts/run_llm_worker.py
python scripts/run_dedup_worker.py

Notes:
- Current ingest worker composition is Slack-oriented (create_slack_ingestion_handlers).
- Multi-source end-to-end processing is fully supported via run_multi_source_pipeline.py.
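
Conceptually, the queue-based mode splits the same flow across workers that hand tasks to each other. The sketch below models that hand-off with in-memory deques in a single thread. The stage names follow the worker script names, but the stage order, the task payload, and the run_worker helper are assumptions; the project's real task queue is described in docs/pipeline_workers.md.

```python
from collections import deque

# Hypothetical hand-off order between workers; None marks the final stage.
NEXT_STAGE = {
    "ingest": "extraction",
    "extraction": "llm",
    "llm": "dedup",
    "dedup": None,
}


def run_worker(stage: str, queues: dict[str, deque], log: list[tuple[str, int]]) -> None:
    # Drain this stage's queue and enqueue a follow-up task for the next stage.
    while queues[stage]:
        task = queues[stage].popleft()
        log.append((stage, task["iteration"]))
        next_stage = NEXT_STAGE[stage]
        if next_stage is not None:
            queues[next_stage].append(task)


queues: dict[str, deque] = {stage: deque() for stage in NEXT_STAGE}
log: list[tuple[str, int]] = []

# The scheduler's role in this sketch: enqueue one pipeline iteration.
queues["ingest"].append({"iteration": 1})

# Run each worker once, in stage order, instead of as separate processes.
for stage in NEXT_STAGE:
    run_worker(stage, queues, log)

print(log)
```

In production each worker would run as its own process and poll a shared queue; the single-threaded loop here only shows the order in which a task moves through the stages.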
docker compose build
docker compose up -d

Services include PostgreSQL, pipeline scheduler/workers, Telegram worker, metrics exporter, and Streamlit UI.
# Fast tests
make test-quick
# Coverage
make test-cov
# Lint + format + typecheck + tests
make ci
# Fast local checks
make pre-commit

Tooling:
- Formatter/Linter: Ruff
- Type checking: mypy
- Tests: pytest
streamlit run app.py

Default URL: http://127.0.0.1:8501
- Docs index: docs/README.md
- Configuration: docs/CONFIG.md
- Metrics/health/observability: docs/OPERATIONS_OBSERVABILITY.md
- Pipeline workers: docs/pipeline_workers.md
- Keep secrets only in .env
- Keep non-sensitive app config in config/*.yaml
- Prefer make targets over manual tool invocations
- Validate changes with make ci before pushing