ML Worker for object detection at scale. Pulls tasks from Pub/Sub, reads images from GCS, runs RFDETR, stores results to GCS, and notifies an internal API.
- Consume tasks from Pub/Sub subscription
- Download image from GCS (`image_path`)
- Run detection with RFDETR
- Store JSON results to `results/<task_id>/detection_results.json` in GCS
- POST result summary to internal API
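
The steps above can be sketched as a single handler. This is only an illustration, not the worker's actual code: `handle_message`, `result_path`, the four injected callables, and the fields of the summary dict are all hypothetical names chosen so the flow can be read (and run) without the Pub/Sub, GCS, or RFDETR clients.

```python
import json
import time
from datetime import datetime, timezone
from typing import Callable


def result_path(task_id: str) -> str:
    """Object path for a task's results, following the layout listed above."""
    return f"results/{task_id}/detection_results.json"


def handle_message(
    data: bytes,
    download: Callable[[str], bytes],    # hypothetical: reads an object from GCS
    detect: Callable[[bytes], list],     # hypothetical: wraps RFDETR inference
    upload: Callable[[str, str], None],  # hypothetical: writes an object to GCS
    notify: Callable[[dict], None],      # hypothetical: POSTs to the internal API
) -> dict:
    """Run one task through the five steps and return the summary that was sent."""
    task = json.loads(data)                   # 1. task payload from Pub/Sub
    started = time.monotonic()
    image = download(task["image_path"])      # 2. download the image
    detections = detect(image)                # 3. run detection
    result = {
        "task_id": task["task_id"],
        "detections": detections,
        "processed_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "processing_time_ms": int((time.monotonic() - started) * 1000),
    }
    path = result_path(task["task_id"])
    upload(path, json.dumps(result))          # 4. store JSON results
    # The summary fields below are assumptions; the real callback body may differ.
    summary = {
        "task_id": task["task_id"],
        "result_path": path,
        "detection_count": len(detections),
    }
    notify(summary)                           # 5. notify the internal API
    return summary
```

Passing the I/O steps in as callables keeps the sketch testable with plain lambdas; in the worker they would be backed by the real GCS, model, and HTTP clients.
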
- Pub/Sub driven worker
- GCS I/O (images, results)
- Internal API callback
- Docker + Kubernetes ready
- Python 3.11+
- Google Cloud: Pub/Sub + Storage
- GCS bucket
- Clone and setup:
  git clone <repository-url>
  cd object-detection-worker
- Install dependencies:
  pip install -r requirements.txt
- Configure environment:
  cp .env.example .env
  # Edit .env with your configuration
- Run tests (optional):
  pytest -q
- Run the worker:
  python -m src.main
# Build image
docker build -t object-detection-worker .
# Run container
docker run --env-file .env object-detection-worker

# Apply all manifests
kubectl apply -f k8s/

| Variable | Description | Default |
|---|---|---|
| `GCP_PROJECT_ID` | Google Cloud Project ID | your-gcp-project |
| `GCS_BUCKET` | GCS bucket name | object-detection-images |
| `PUBSUB_SUBSCRIPTION` | Pub/Sub subscription | detection-workers |
| `API_SERVICE_URL` | Internal API base URL | http://object-detection-api |
| `CONFIDENCE_THRESHOLD` | Detection threshold | 0.5 |
| `CALLBACK_TIMEOUT` | Callback timeout (s) | 30 |
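
The variables above map naturally onto a small settings object. A minimal sketch, assuming values are read from the process environment with the listed defaults; `WorkerConfig` and `load_config` are illustrative names and may not match what `src/infrastructure/config.py` actually contains.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class WorkerConfig:
    """Hypothetical settings container mirroring the environment variables."""
    gcp_project_id: str
    gcs_bucket: str
    pubsub_subscription: str
    api_service_url: str
    confidence_threshold: float
    callback_timeout: float  # seconds


def load_config(env: Mapping[str, str] = os.environ) -> WorkerConfig:
    """Build the config from a mapping, falling back to the documented defaults."""
    return WorkerConfig(
        gcp_project_id=env.get("GCP_PROJECT_ID", "your-gcp-project"),
        gcs_bucket=env.get("GCS_BUCKET", "object-detection-images"),
        pubsub_subscription=env.get("PUBSUB_SUBSCRIPTION", "detection-workers"),
        api_service_url=env.get("API_SERVICE_URL", "http://object-detection-api"),
        # Numeric settings arrive as strings and are converted here.
        confidence_threshold=float(env.get("CONFIDENCE_THRESHOLD", "0.5")),
        callback_timeout=float(env.get("CALLBACK_TIMEOUT", "30")),
    )
```

Accepting any mapping (rather than reading `os.environ` directly) lets tests pass a plain dict, for example `load_config({"CONFIDENCE_THRESHOLD": "0.7"})`.
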
Publish a message to Pub/Sub with:
{
"task_id": "550e8400-e29b-41d4-a716-446655440000",
"image_path": "images/photo.jpg"
}

Result JSON stored in GCS:
{
"task_id": "550e8400-e29b-41d4-a716-446655440000",
"detections": [
{
"class_id": 0,
"class_name": "person",
"confidence": 0.85,
"bbox": {
"x1": 100.0,
"y1": 150.0,
"x2": 200.0,
"y2": 300.0
}
}
],
"processed_at": "2024-01-01T12:00:00Z",
"processing_time_ms": 1500
}

src/
├── main.py
├── domain/
│   ├── entities/
│   └── repositories/
└── infrastructure/
    ├── config.py
    ├── models/
    ├── repositories/
    └── services/
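
The contents of `domain/entities/` are not shown here. As a rough sketch, assuming the entities simply mirror the result JSON documented earlier, they might look like the dataclasses below; the class names `BoundingBox`, `Detection`, and `DetectionResult` are hypothetical.

```python
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class BoundingBox:
    """Corner coordinates, matching the x1/y1/x2/y2 keys of the result JSON."""
    x1: float
    y1: float
    x2: float
    y2: float


@dataclass(frozen=True)
class Detection:
    class_id: int
    class_name: str
    confidence: float
    bbox: BoundingBox


@dataclass(frozen=True)
class DetectionResult:
    task_id: str
    detections: list  # list of Detection
    processed_at: str  # ISO 8601 UTC timestamp
    processing_time_ms: int

    def to_dict(self) -> dict:
        """Convert to plain dicts and lists, ready for json.dumps."""
        # asdict recurses into nested dataclasses and lists of dataclasses.
        return asdict(self)
```

Calling `json.dumps(result.to_dict())` on such an object would yield a document with the same keys as the stored result shown earlier.
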
pytest -q

GitHub Actions workflow .github/workflows/deploy.yml:
- Job 1: run tests → pytest tests/ --verbose
- Job 2: build & push Docker to GAR
- Apply K8s manifests with updated image
K8s HPA manifests are included under k8s/. Tune them based on Pub/Sub queue depth and resource usage.
The worker emits INFO-level logs for task start/end, errors, and processing time.
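
One way to produce this kind of logging is a small context manager around each task. This is a sketch under assumptions, not the worker's actual logging code: the logger name `worker`, the helper `log_task`, and the message wording are all made up for illustration. Failures go through `logger.exception`, which logs at ERROR with a traceback and therefore still appears when the threshold is INFO.

```python
import logging
import time
from contextlib import contextmanager

logger = logging.getLogger("worker")  # logger name is an assumption


@contextmanager
def log_task(task_id: str):
    """Log the start, any error, and the end (with elapsed time) of one task."""
    started = time.monotonic()
    logger.info("task %s started", task_id)
    try:
        yield
    except Exception:
        # ERROR-level record including the traceback; the exception is re-raised.
        logger.exception("task %s failed", task_id)
        raise
    finally:
        elapsed_ms = int((time.monotonic() - started) * 1000)
        logger.info("task %s ended in %d ms", task_id, elapsed_ms)
```

Usage would be `with log_task(task_id): ...` around the per-message processing, after calling `logging.basicConfig(level=logging.INFO)` once at startup.
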