-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
135 lines (112 loc) · 4.91 KB
/
main.py
File metadata and controls
135 lines (112 loc) · 4.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# model/main.py
import os
import json
import time
from pathlib import Path
from datetime import datetime
from dotenv import load_dotenv
# ---- Resolve project root and load envs (works when run from ROADEYE/) ----
ROOT = Path(__file__).resolve().parents[1]
load_dotenv(ROOT / ".env")
load_dotenv(ROOT / ".env.local", override=True) # optional local overrides
# Make Ultralytics write settings to a writable place on macOS/local
os.environ.setdefault("ULTRALYTICS_SETTINGS_DIR", str(Path.home() / ".yolo_config"))
os.environ.setdefault("YOLO_CONFIG_DIR", os.environ["ULTRALYTICS_SETTINGS_DIR"])
# ---- Imports that depend on env being loaded ----
from model_loader import load_model
from image_utils import decode_base64_image
from db import increment_user_event
from detection import detect_events
from show_detections import to_bgr_ndarray, normalize_bbox, render_detections_data_url
from redis_utils import pop_image, push_event, is_on_cooldown, set_cooldown, publish_event, publish_viz_frame
VIZ_FPS = 8.0 # ~8 fps is smooth in browsers
_last_viz_ts = 0.0
EVENT_KEYS = {
"police_car": "Police Car",
"accident": "Accident",
"Arrow Board": "Road Construction",
"cones": "Road Construction",
}
IDLE_SLEEP_SEC = 0.05 # prevent busy spinning if queue is empty
def _env_summary():
# Helpful one-time log so you know which envs are actually set
redis_url = os.getenv("REDIS_URL") or f"redis://{os.getenv('REDIS_HOST','localhost')}:{os.getenv('REDIS_PORT','6379')}/{os.getenv('REDIS_DB','0')}"
print("🔧 Env loaded from:", ROOT / ".env", "and", ROOT / ".env.local")
print("🔗 REDIS_URL:", redis_url)
print("📂 ULTRALYTICS_SETTINGS_DIR:", os.environ["ULTRALYTICS_SETTINGS_DIR"])
def process_images(model):
global _last_viz_ts
print("📡 Listening for incoming images...")
while True:
try:
raw = pop_image()
if not raw:
time.sleep(IDLE_SLEEP_SEC)
continue
if isinstance(raw, (bytes, bytearray)):
raw = raw.decode("utf-8", errors="ignore")
message = json.loads(raw)
user_id = message.get("userId")
image_b64 = message.get("image")
if not user_id or not image_b64:
continue
location = message.get("location", {})
timestamp = message.get("timestamp") or datetime.utcnow().isoformat()
image = decode_base64_image(image_b64)
if image is None:
continue
img_bgr = to_bgr_ndarray(image)
ih, iw = img_bgr.shape[:2]
detections = detect_events(
model, image, conf_thresh=0.7, device=DEVICE, half=USE_HALF, imgsz=640
)
# --- NEW: publish annotated frame at VIZ_FPS ---
now = time.time()
if now - _last_viz_ts >= (1.0 / VIZ_FPS):
data_url = render_detections_data_url(image, detections, max_width=1024, quality=80)
if data_url:
publish_viz_frame(json.dumps({
"type": "viz_frame",
"ts": int(now * 1000),
"image": data_url
}))
_last_viz_ts = now
# ---------------------------------------------
for det in detections:
event_type = det.get("class")
if not event_type:
continue
if is_on_cooldown(user_id, event_type):
print(f"⏱️ Cooldown active for {user_id} - {event_type}, skipping...")
continue
updated_name_event_type = EVENT_KEYS.get(event_type, event_type)
x, y, w, h = normalize_bbox(det["bbox"], iw, ih)
event = {
"userId": user_id,
"eventType": updated_name_event_type,
"location": location,
"timestamp": timestamp,
"confidence": round(float(det.get("confidence", 0.0)), 2),
"displayName": det.get("displayName", event_type),
"bbox": [x, y, w, h],
}
push_event(json.dumps(event))
publish_event(json.dumps({
"userId": user_id,
"event": updated_name_event_type,
"timestamp": timestamp,
"cooldown": 3 * 60,
}))
set_cooldown(user_id, event_type)
increment_user_event(user_id, event["displayName"])
print(f"✅ Event pushed: {updated_name_event_type} by {user_id}")
except KeyboardInterrupt:
print("\n🛑 Stopped by user.")
break
except Exception as e:
print(f"❌ Error: {e}")
time.sleep(0.25)
if __name__ == "__main__":
_env_summary()
model, DEVICE, USE_HALF = load_model()
process_images(model)