This repository was archived by the owner on Apr 10, 2026. It is now read-only.
mission.py · 5249 lines (4569 loc) · 237 KB
"""
ERIC — Mission Logic
Camera strategy:
Navigation (moving): Layer 1 (LiDAR/OAK-D) handles safety automatically.
Layer 2 (YOLO on OAK-D Myriad X) detects people/animals.
No Cosmos called while moving — Eric moves continuously.
Scanning (stopped): dual camera (pan-tilt + webcam), single stable frame each
360° scan (stopped): pan-tilt sweeps ±90° in 30° steps + ONE 180° chassis turn
(finer coverage, far less chassis movement than old 8×45° rotation)
Face/robot centering: pan-tilt only, settle before capture
Stabilization rule:
Every pantilt_move_wait() includes a settle delay.
Captures only happen when robot is stopped or pan-tilt has settled.
LED:
Adaptive — on only when captured frame is dark.
Sensor integration:
_sensor_context() builds a text summary of LiDAR + OAK-D readings that is
prepended to every Cosmos nav-check and scan prompt. This gives Cosmos real
metric ground-truth distances so it reasons accurately rather than guessing
from visual cues alone.
Nav2 integration:
_move_forward() uses Nav2 send_goal() when available, falling back to direct
motor control. Cosmos still decides WHERE to go — Nav2 handles HOW.
Async Cosmos:
_cosmos_frames_async() submits Cosmos calls to a ThreadPoolExecutor so the
mission loop can keep doing sensor checks while Cosmos is thinking.
Multi-step missions:
Briefing is parsed by Cosmos into MissionStep objects at start.
Each step has a target + action type (find_and_approach, deliver_message,
speak_to, wait_for_response, photograph). Steps advance sequentially.
Mission only ends after ALL steps are complete.
Eye-contact greeting:
Persons are only greeted when Cosmos confirms they are close AND facing Eric.
Terrain speed control:
TERRAIN_SPEED_MAP maps terrain strings to motor speeds. Impassable terrain
(stairs, gaps, walls) triggers the full avoidance pipeline.
Logging:
All AI calls, motor actions, and mission events are logged via logger.
"""
import time
import threading
import logging
import json
import math
import pathlib
import datetime
import dataclasses
import concurrent.futures
import requests
from typing import Optional
from config import MOTOR_SPEED_SLOW, MOTOR_SPEED_NORMAL, MOTOR_SPEED_FAST, MISSIONS_DIR, VLLM_URL, COSMOS_MODEL
from motors import motors
from cosmos import (
ask_cosmos, ask_cosmos_plain, set_mission_briefing, get_mission_briefing,
capture_frame, capture_frames_video,
start_frame_buffer, get_buffered_frames,
CAMERA_WEBCAM, CAMERA_PANTILT
)
from tts import speak
from logger import (
log_ai, log_action, log_mission_event,
start_mission_log, end_mission_log, log_exception
)
from alarm import sound_alarm, stop_alarm, AlarmType
log = logging.getLogger("eric.mission")
def _safe_to_fwd() -> bool:
"""Guard before every motors.forward() — checks LiDAR obstacle state."""
try:
from lidar import safe_to_forward
return safe_to_forward()
except Exception:
return True # LiDAR not loaded — allow forward
# ─── Async Cosmos executor ────────────────────────────────────────────────────
# Max 2 workers: one for nav checks, one for scan analysis.
# This lets the mission loop keep running sensor checks while Cosmos is thinking.
_cosmos_executor = concurrent.futures.ThreadPoolExecutor(
max_workers=2, thread_name_prefix="cosmos"
)
class State:
IDLE = "idle"
SEARCHING = "searching"
SCANNING_360 = "scanning_360"
INTERACTING = "interacting"
AVOIDING = "avoiding"
COMPLETE = "complete"
# ─── Mission State Container ──────────────────────────────────────────────────
# Consolidates every mutable module-level global into one typed dataclass.
#
# Benefits vs 20+ scattered globals:
# • Thread-safety — attribute access is atomic; no partial-update windows
# • Testability — reset() gives a clean slate without a module reload
# • Debuggability — repr() dumps all state in one log line
# • Readability — _ms.mission_active is explicit, not a mystery global
#
# External callers (GUI, etc.) import _ms directly:
# from mission import _ms
# if _ms.mission_active: ...
# ─────────────────────────────────────────────────────────────────────────────
@dataclasses.dataclass
class MissionState:
"""Single source of truth for all mutable mission state."""
# ── Core control ──────────────────────────────────────────────────────────
mission_active: bool = False
mission_state: str = State.IDLE
conversation_history: list = dataclasses.field(default_factory=list)
# ── Search / avoidance counters ───────────────────────────────────────────
empty_scans: int = 0
avoid_attempts: int = 0
scans_since_360: int = 0
target_spotted_count: int = 0
nav_clips_since_scan: int = 0
# ── Mission step engine ───────────────────────────────────────────────────
mission_steps: list = dataclasses.field(default_factory=list)
current_step_idx: int = 0
# ── YAML mission metadata ─────────────────────────────────────────────────
mission_alarm_type: str = AlarmType.HAZARD
mission_target_objects: list = dataclasses.field(default_factory=list)
mission_flags: dict = dataclasses.field(default_factory=dict)
mission_find_count: int = 0
mission_hazard_log: list = dataclasses.field(default_factory=list)
# ── Async nav check ───────────────────────────────────────────────────────
pending_nav: object = None # concurrent.futures.Future | None
last_nav_result: dict = dataclasses.field(default_factory=dict)
# ── YOLO Layer 2 detection ────────────────────────────────────────────────
yolo_person_detected: bool = False
yolo_detect_label: object = None
yolo_detect_distance: object = None
yolo_detect_bearing: object = None
yolo_detect_bearing_deg: object = None
yolo_detect_time: float = 0.0
# ── TTS head movement ─────────────────────────────────────────────────────
head_talking: bool = False
def reset_counters(self):
"""Reset search/avoidance counters — call when starting a new search phase."""
self.empty_scans = 0
self.avoid_attempts = 0
self.scans_since_360 = 0
self.target_spotted_count = 0
self.nav_clips_since_scan = 0
def reset_for_new_mission(self):
"""Full reset — call at mission start."""
self.conversation_history = []
self.mission_find_count = 0
self.mission_hazard_log = []
self.pending_nav = None
self.last_nav_result = {}
self.yolo_person_detected = False
self.yolo_detect_label = None
self.yolo_detect_distance = None
self.yolo_detect_bearing = None
self.yolo_detect_bearing_deg = None
self.yolo_detect_time = 0.0
self.reset_counters()
def __repr__(self) -> str:
return (
f"MissionState(active={self.mission_active}, state={self.mission_state}, "
f"step={self.current_step_idx}/{len(self.mission_steps)}, "
f"empty={self.empty_scans}, avoid={self.avoid_attempts}, "
f"spotted={self.target_spotted_count})"
)
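The consolidation rationale in the comment block above can be sanity-checked with a stripped-down version of the same pattern. The class and field names below are illustrative, not part of this module:

```python
import dataclasses

@dataclasses.dataclass
class MiniState:
    # Two representative fields from the real MissionState
    mission_active: bool = False
    empty_scans: int = 0

    def reset_counters(self):
        self.empty_scans = 0

ms = MiniState(mission_active=True, empty_scans=4)
ms.reset_counters()        # clean slate without a module reload
state_dump = repr(ms)      # dataclass repr() dumps all fields in one line
```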
# ── Module-level singleton — the only mutable state in this module ────────────
_ms = MissionState()
# ── YOLO callback lock (replaces old _yolo_lock module global) ────────────────
_yolo_lock = threading.Lock()
# ── UI callback registry (infrastructure, not mission state) ──────────────────
_ui_callbacks: dict = {"eric_says": None, "status": None, "log": None}
# ── Backward-compat module-level accessors ────────────────────────────────────
# gui.py imports: mission_active, mission_state, conversation_history
# These are thin functions — gui.py must call them to get live state.
# The bare-name imports in gui.py line 31 are replaced by _ms references below.
def get_mission_active() -> bool:
return _ms.mission_active
def get_mission_state() -> str:
return _ms.mission_state
def get_conversation_history() -> list:
return _ms.conversation_history
# ── Tuning constants (never mutated at runtime) ──────────────────────────────
EMPTY_SCAN_LIMIT = 5 # trigger 360 after 5 consecutive empty scans
SCANS_BEFORE_360 = 10 # periodic 360 every 10 quick scans
MAX_AVOID_ATTEMPTS = 3 # force 360 after this many avoid failures
TARGET_CONFIRM_NEEDED = 1 # only needs 1 positive scan to approach
DETECTION_CONFIDENCE_MIN = 0.0 # below this, sweep detections are treated as hallucinations and skipped
# (Cosmos does not emit confidence scores, so 0.0 effectively disables this filter)
# ─── Terrain Speed Map ────────────────────────────────────────────────────────
# None = impassable → triggers full avoidance pipeline + spoken warning
TERRAIN_SPEED_MAP: dict[str, float | None] = {
# Fast — smooth flat surfaces
"road": MOTOR_SPEED_FAST,
"floor": MOTOR_SPEED_FAST,
"tile": MOTOR_SPEED_FAST,
"tiles": MOTOR_SPEED_FAST,
"pavement": MOTOR_SPEED_FAST,
"concrete": MOTOR_SPEED_FAST,
"asphalt": MOTOR_SPEED_FAST,
"hardwood": MOTOR_SPEED_FAST,
"linoleum": MOTOR_SPEED_FAST,
"wood": MOTOR_SPEED_FAST,
"smooth": MOTOR_SPEED_FAST,
# Medium — outdoor traversable ground
"grass": MOTOR_SPEED_NORMAL,
"lawn": MOTOR_SPEED_NORMAL,
"gravel": MOTOR_SPEED_NORMAL,
"dirt": MOTOR_SPEED_NORMAL,
"soil": MOTOR_SPEED_NORMAL,
"sand": MOTOR_SPEED_NORMAL,
"path": MOTOR_SPEED_NORMAL,
"clear": MOTOR_SPEED_NORMAL,
"flat": MOTOR_SPEED_NORMAL,
"ground": MOTOR_SPEED_NORMAL,
# Slow — rough, soft, or mildly risky
"carpet": MOTOR_SPEED_SLOW,
"rug": MOTOR_SPEED_SLOW,
"mat": MOTOR_SPEED_SLOW,
"mud": MOTOR_SPEED_SLOW,
"wet": MOTOR_SPEED_SLOW,
"rocks": MOTOR_SPEED_SLOW,
"rocky": MOTOR_SPEED_SLOW,
"pebbles": MOTOR_SPEED_SLOW,
"slope": MOTOR_SPEED_SLOW, # shallow slope / ramp
"ramp": MOTOR_SPEED_SLOW,
"step": MOTOR_SPEED_SLOW, # single small step / curb
"curb": MOTOR_SPEED_SLOW,
"leaves": MOTOR_SPEED_SLOW,
"threshold": MOTOR_SPEED_SLOW,
"uneven": MOTOR_SPEED_SLOW,
"rough": MOTOR_SPEED_SLOW,
"bumpy": MOTOR_SPEED_SLOW,
# Impassable — stop and navigate around
"stairs": None,
"staircase": None,
"steps": None,
"wall": None,
"fence": None,
"water": None,
"gap": None,
"cliff": None,
"ledge": None,
"deep_slope": None,
"steep": None,
"blockade": None,
"barrier": None,
"curbs": None, # plural = raised road barrier
}
def _speed_for_terrain(terrain: str) -> float | None:
"""
Return target speed for a terrain string, or None if impassable.
Fuzzy-matches Cosmos inventions like 'rough_grass' or 'wet tiles'.
Falls back to MOTOR_SPEED_NORMAL for genuinely unknown terrain.
"""
t = str(terrain).lower().strip() if terrain else "clear"
if t in TERRAIN_SPEED_MAP:
return TERRAIN_SPEED_MAP[t]
# Partial keyword scan — longer keys first to avoid spurious short matches
for key in sorted(TERRAIN_SPEED_MAP, key=len, reverse=True):
if key in t:
log.debug(f"Terrain '{t}' → fuzzy match '{key}'")
return TERRAIN_SPEED_MAP[key]
log.debug(f"Unknown terrain '{t}' — defaulting to NORMAL speed")
return MOTOR_SPEED_NORMAL
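A standalone sketch of the longest-key-first fuzzy match above, using placeholder speed values rather than the real MOTOR_SPEED_* constants. One consequence of sorting by key length: a surface keyword like "tiles" outranks a shorter condition keyword like "wet" inside the same string.

```python
SPEEDS = {"tiles": 0.8, "wet": 0.3, "grass": 0.5, "stairs": None}  # placeholder values

def speed_for(terrain):
    t = (terrain or "clear").lower().strip()
    if t in SPEEDS:
        return SPEEDS[t]
    # Longer keys first to avoid spurious short matches
    for key in sorted(SPEEDS, key=len, reverse=True):
        if key in t:
            return SPEEDS[key]
    return 0.5  # genuinely unknown terrain falls back to NORMAL

speed_for("rough_grass")   # substring match on "grass"
speed_for("wet tiles")     # "tiles" (5 chars) wins over "wet" (3 chars)
```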
# ─── Mission Step Engine ──────────────────────────────────────────────────────
@dataclasses.dataclass
class MissionStep:
step_num: int
target: str # e.g. "person", "robot", "cat"
action: str # see ACTION_TYPES below
message: str = "" # text for deliver_message / speak_to
photo_count: int = 1 # number of sharp photos to capture
wait_sec: int = 20 # seconds to wait for a response
completed: bool = False
# Valid action types:
# find_and_approach — get close, mark done (default)
# deliver_message — speak step.message to target, then advance
# speak_to — initiate conversation, wait wait_sec for reply
# wait_for_response — just wait wait_sec for target to say something
# photograph — save photo_count sharp close-range photos to disk
def register_ui_callbacks(**cbs):
_ui_callbacks.update(cbs)
def _ui(key, text):
"""Deliver a UI event. Never raises — a broken callback must not crash the mission."""
cb = _ui_callbacks.get(key)
if cb:
try:
cb(text)
except Exception as _exc:
log.warning(f"UI callback '{key}' raised: {_exc}")
def _head_talk_thread(tilt: int):
"""
Background thread — occasional natural head micro-movements while Eric speaks.
Pattern: hold at centre (random duration) -> move to random small angle -> return to centre.
Pan +-5 degrees, tilt offset +-3 degrees. Feels organic, not mechanical.
Stops when _head_talking flag is cleared.
"""
import random
try:
while _ms.head_talking:
# Hold at centre — random pause, sometimes long sometimes short
centre_hold = random.uniform(2.0, 6.0)
t0 = time.time()
while _ms.head_talking and (time.time() - t0) < centre_hold:
time.sleep(0.1)
if not _ms.head_talking:
break
# Small random position — pan +-5, slight tilt offset +-3
rand_pan = random.choice([-5, -4, -3, -2, 2, 3, 4, 5])
rand_tilt = tilt + random.choice([-3, -2, 0, 0, 2, 3])
motors.pantilt(rand_pan, rand_tilt, 30)
# Hold briefly at that angle
move_hold = random.uniform(0.8, 2.5)
t0 = time.time()
while _ms.head_talking and (time.time() - t0) < move_hold:
time.sleep(0.1)
if not _ms.head_talking:
break
# Return to centre
motors.pantilt(0, tilt, 30)
except Exception:
pass
finally:
try:
motors.pantilt(0, tilt, 30) # return to centre
except Exception:
pass
def eric_say(text):
if not text:
return
# Don't speak or display raw JSON — Cosmos sometimes leaks it into the speak field
text_stripped = str(text).strip()
if text_stripped.startswith("{") or text_stripped.startswith("["):
log.warning(f"eric_say received JSON instead of plain text — suppressed: {text_stripped[:80]}")
return
_ui("eric_says", text_stripped)
log_mission_event("eric_say", text_stripped[:120])
# Start head movement thread while speaking — only if mission flag is set
_head_move = _ms.mission_flags.get("head_talk", False)
if _head_move:
try:
_current_tilt = getattr(_ms, "last_confirm_tilt", 10)
_ms.head_talking = True
_ht = threading.Thread(target=_head_talk_thread, args=(_current_tilt,), daemon=True)
_ht.start()
except Exception:
pass
speak(text_stripped) # speak full text — TTS handles all sentences
# Stop head movement — only if it was started
if _head_move:
try:
from tts import wait_speak_stop
wait_speak_stop()
except Exception:
pass
try:
_ms.head_talking = False
except Exception:
pass
# ─── Async Cosmos Wrapper ─────────────────────────────────────────────────────
def _cosmos_frames(frames, prompt, max_tokens=250, temp=0.3):
"""Synchronous Cosmos call with logging. Used directly or via async wrapper."""
from cosmos import _system_prompt as sys_prompt
# ── Token budget guard — model max_model_len=2048 ─────────────────────────
# Each image costs ~256 tokens. System prompt + mission briefing can be large.
# Estimate: 4 chars ~ 1 token. Reserve max_tokens for output.
# Budget: 2048 - max_tokens - (num_frames * 256) - 50 (safety margin)
_IMAGE_TOKENS = 256 # vLLM vision token cost per image
_CHAR_PER_TOKEN = 4
_token_budget = 2048 - max_tokens - (len(frames) * _IMAGE_TOKENS) - 50
_char_budget = max(_token_budget, 200) * _CHAR_PER_TOKEN
# Truncate system prompt (keep tail — mission briefing is appended at end)
_sys = sys_prompt or ""
_sys_char_limit = int(_char_budget * 0.4)
if len(_sys) > _sys_char_limit:
_sys = _sys[-_sys_char_limit:]
log.debug(f"_cosmos_frames: system prompt truncated to {_sys_char_limit} chars")
# Remaining budget for user prompt
_prompt_char_limit = max(_char_budget - len(_sys), 200)
_prompt = prompt if len(prompt) <= _prompt_char_limit else prompt[-_prompt_char_limit:]
if _prompt != prompt:
log.debug(f"_cosmos_frames: user prompt truncated to {_prompt_char_limit} chars")
img_content = [
{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{f}"}}
for f in frames
]
img_content.append({"type": "text", "text": _prompt})
payload = {
"model": COSMOS_MODEL,
"messages": [
{"role": "system", "content": _sys},
{"role": "user", "content": img_content}
],
"max_tokens": max_tokens,
"temperature": temp,
"repetition_penalty": 1.15,
}
r = requests.post(VLLM_URL, json=payload, timeout=120)
r.raise_for_status()
response = r.json()["choices"][0]["message"]["content"].strip()
log_ai(prompt[-400:], response, label="COSMOS_FRAMES")
return response
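The token-budget guard at the top of _cosmos_frames is easier to audit in isolation. The same constants as above, worked through for a typical two-frame nav check:

```python
MAX_MODEL_LEN = 2048   # model context window (max_model_len)
IMAGE_TOKENS = 256     # vLLM vision token cost per image
CHAR_PER_TOKEN = 4     # rough chars-per-token estimate
SAFETY_MARGIN = 50

def char_budget(num_frames, max_tokens):
    token_budget = MAX_MODEL_LEN - max_tokens - num_frames * IMAGE_TOKENS - SAFETY_MARGIN
    return max(token_budget, 200) * CHAR_PER_TOKEN

# 2 frames, 250 output tokens: 2048 - 250 - 512 - 50 = 1236 tokens ≈ 4944 chars of prompt text
budget = char_budget(2, 250)
```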
def _cosmos_frames_async(frames, prompt, max_tokens=250, temp=0.3) -> concurrent.futures.Future:
"""
Submit Cosmos vision call to thread pool. Returns a Future immediately.
Call future.result(timeout=60) when you actually need the answer.
This lets the mission loop keep doing sensor checks while Cosmos is thinking.
"""
return _cosmos_executor.submit(_cosmos_frames, frames, prompt, max_tokens, temp)
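The intended calling pattern — submit, keep the loop alive, collect later — can be demonstrated with a self-contained stand-in for the Cosmos call (the worker function and pool here are illustrative, not this module's executor):

```python
import concurrent.futures
import time

_pool = concurrent.futures.ThreadPoolExecutor(max_workers=2)

def _slow_vision_call(prompt):
    time.sleep(0.05)        # stand-in for a Cosmos inference round-trip
    return f"result for {prompt!r}"

fut = _pool.submit(_slow_vision_call, "nav check")   # returns immediately
while not fut.done():       # the mission loop stays responsive meanwhile
    time.sleep(0.01)        # ...sensor checks would run here
answer = fut.result(timeout=5)
_pool.shutdown()
```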
# ─── Mission Step Helpers ─────────────────────────────────────────────────────
def _parse_mission_steps(briefing: str) -> list[MissionStep]:
"""
Ask Cosmos to parse the mission briefing into an ordered list of MissionStep objects.
Falls back to a single find_and_approach step if parsing fails.
"""
# ── Simple mission: skip Cosmos entirely — no KV cache bleed risk ────────
# If the briefing has no explicit step markers, it's a simple find mission.
# Build a single step directly from target_objects — no Cosmos call needed.
_step_markers = ["step 1:", "step 2:", "step1.", "step2.", "deliver_message",
"find_and_approach", "speak_to", "step_num"]
_is_multistep = any(m in briefing.lower() for m in _step_markers)
if not _is_multistep:
_tgt = (_ms.mission_target_objects[0]
if _ms.mission_target_objects else "target")
log.info(f"Simple mission — building single step: find_and_approach {_tgt!r}")
return [MissionStep(step_num=1, target=_tgt, action="find_and_approach")]
prompt = f"""You are parsing a robot mission briefing into structured, ordered steps.
BRIEFING:
\"\"\"{briefing}\"\"\"
Extract each discrete task as a step. Return ONLY a JSON array.
Valid action types:
"find_and_approach" — find the target and get within close range
"deliver_message" — speak a specific message to the target when close
"speak_to" — start a conversation with the target, wait for reply
"wait_for_response" — wait for the target to say something (use wait_sec)
"photograph" — take sharp close-range photos of the target (use photo_count)
JSON schema per step:
{{
"step_num": 1,
"target": "person",
"action": "deliver_message",
"message": "Package delivered.",
"photo_count": 1,
"wait_sec": 20
}}
Example for multi-step mission:
[
{{"step_num": 1, "target": "person", "action": "deliver_message",
"message": "Package delivered.", "photo_count": 1, "wait_sec": 20}},
{{"step_num": 2, "target": "robot", "action": "speak_to",
"message": "", "photo_count": 1, "wait_sec": 30}},
{{"step_num": 3, "target": "cat", "action": "photograph",
"message": "", "photo_count": 3, "wait_sec": 10}}
]
Return ONLY the JSON array. No markdown. No explanation. No extra text.
"""
try:
raw = ask_cosmos(prompt, max_tokens=500)
log_ai(prompt[-300:], raw, label="STEP_PARSE")
clean = raw.replace("```json", "").replace("```", "").strip()
s = clean.find("["); e = clean.rfind("]") + 1
items = json.loads(clean[s:e])
steps = []
for i, it in enumerate(items):
steps.append(MissionStep(
step_num = int(it.get("step_num", i + 1)),
target = str(it.get("target", "target")),
action = str(it.get("action", "find_and_approach")),
message = str(it.get("message", "")),
photo_count = int(it.get("photo_count", 1)),
wait_sec = int(it.get("wait_sec", 20)),
))
log.info(f"Parsed {len(steps)} mission steps: {[s.target for s in steps]}")
return steps
except Exception as e:
log_exception("_parse_mission_steps", e)
return [MissionStep(step_num=1, target="target", action="find_and_approach")]
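The fence-stripping and bracket-slicing recipe used above survives the usual LLM reply decorations (markdown fences, leading chatter, trailing prose). A minimal sketch on a representative messy reply:

```python
import json

def extract_json_array(raw):
    # Same recipe as _parse_mission_steps: strip fences, slice first '[' to last ']'
    clean = raw.replace("```json", "").replace("```", "").strip()
    s, e = clean.find("["), clean.rfind("]") + 1
    return json.loads(clean[s:e])

messy = 'Sure! Here are the steps:\n```json\n[{"step_num": 1, "target": "person"}]\n```\nLet me know!'
steps = extract_json_array(messy)
```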
def _current_step() -> Optional[MissionStep]:
if _ms.mission_steps and _ms.current_step_idx < len(_ms.mission_steps):
return _ms.mission_steps[_ms.current_step_idx]
return None
def _advance_step():
"""Mark the current step complete and move to the next, or end the mission."""
step = _current_step()
if step:
step.completed = True
log_mission_event(f"step_{step.step_num}_complete", f"{step.target} — {step.action}")
_ms.current_step_idx += 1
if _ms.current_step_idx >= len(_ms.mission_steps):
# All steps done
last_target = step.target if step else "all targets"
_handle_mission_complete(last_target)
else:
nxt = _current_step()
msg = f"Step {step.step_num} complete. Now finding {nxt.target}."
eric_say(msg)
_ui("status", f"STEP {nxt.step_num}: {nxt.target.upper()}")
_ui("log", msg)
# Update Cosmos system prompt so it searches for the next target
set_mission_briefing(
f"CURRENT STEP {nxt.step_num} of {len(_ms.mission_steps)}: "
f"Find {nxt.target} and {nxt.action.replace('_', ' ')}.\n"
f"Original mission: {get_mission_briefing()}"
)
# Resume searching
_ms.reset_counters()
try:
from avoidance import reset_avoid_counter
reset_avoid_counter()
except ImportError as _exc:
log.debug(f"avoidance module not loaded: {_exc}")
_ms.mission_state = State.SEARCHING
if _safe_to_fwd():
motors.forward(MOTOR_SPEED_SLOW)
def _execute_step_action(obj_name: str):
"""
Called when Eric arrives at the current step's target.
Executes the required action (speak, photograph, wait, etc.) then advances.
"""
step = _current_step()
if not step:
_handle_mission_complete(obj_name)
return
_ms.mission_state = State.INTERACTING
motors.stop()
log_mission_event("step_arrived", f"step={step.step_num} target={step.target} action={step.action}")
log.info(f"Executing step {step.step_num}: {step.action} for {step.target}")
if step.action == "find_and_approach":
# For alarm missions (SAR, siren) — trigger full confirm+photo+alarm pipeline
# For narrative missions (AlarmType.NONE) — just advance
_is_alarm_mission = (
_ms.mission_alarm_type not in (AlarmType.NONE,)
and str(_ms.mission_alarm_type).lower() not in ("none", "null", "")
)
if _is_alarm_mission:
_confirm_and_photograph_target()
else:
_advance_step()
elif step.action == "deliver_message":
msg = step.message or f"Message delivered to {step.target}."
eric_say(msg)
log_mission_event("message_delivered", f"to={step.target}: {msg}")
motors.oled(0, "Delivering msg")
motors.oled(1, step.target[:16])
time.sleep(min(step.wait_sec, 10))
_advance_step()
elif step.action == "speak_to":
greeting = ask_cosmos_plain(
f"You have found {step.target}. "
+ (f"Your mission: {step.message}. " if step.message else "")
+ "Greet them warmly and start the conversation. 2 sentences.",
max_tokens=120
)
eric_say(greeting)
log_mission_event("spoke_to", f"{step.target}: {greeting[:80]}")
motors.oled(0, "Talking to")
motors.oled(1, step.target[:16])
_ui("log", f"Waiting {step.wait_sec}s for {step.target} to respond...")
time.sleep(step.wait_sec)
_advance_step()
elif step.action == "wait_for_response":
eric_say(f"Waiting for {step.target} to respond.")
motors.oled(0, "Waiting...")
motors.oled(1, step.target[:16])
_ui("log", f"Waiting up to {step.wait_sec}s for {step.target} to speak...")
time.sleep(step.wait_sec)
_advance_step()
elif step.action == "photograph":
eric_say(f"I will take {step.photo_count} photo{'s' if step.photo_count > 1 else ''} of {step.target}.")
motors.oled(0, "Taking photos")
motors.oled(1, step.target[:16])
photos_taken = 0
max_attempts = step.photo_count * 4
for attempt in range(max_attempts):
if photos_taken >= step.photo_count:
break
frame = capture_frame(CAMERA_PANTILT, 1280, 720)
if frame and not _is_blurry(frame):
import base64 as _b64
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:19]
fname = f"photo_{step.target.replace(' ', '_')}_{photos_taken + 1}_{ts}.jpg"
out = pathlib.Path("missions/photos") / fname
out.parent.mkdir(parents=True, exist_ok=True)
out.write_bytes(_b64.b64decode(frame))
photos_taken += 1
_ui("log", f"📸 Photo {photos_taken}/{step.photo_count} saved: {fname}")
log_mission_event("photo_saved", fname)
motors.oled(1, f"Photo {photos_taken}/{step.photo_count}")
time.sleep(0.8)
else:
time.sleep(0.4)
completion_msg = f"Captured {photos_taken} of {step.photo_count} photo(s) of {step.target}."
eric_say(completion_msg)
log_mission_event("photograph_done", completion_msg)
_advance_step()
else:
log.warning(f"Unknown step action '{step.action}' — advancing")
_advance_step()
def _parse_json(response, fallback, label="COSMOS"):
try:
clean = response.replace("```json", "").replace("```python", "").replace("```", "").strip()
# ── Handle JSON array — Cosmos sometimes returns [{...}, {...}] ───────
# Merge all items: pick the highest-priority object across all entries,
# collect all object_names, and OR all boolean flags together.
arr_start = clean.find("[")
obj_start = clean.find("{")
if arr_start >= 0 and (obj_start < 0 or arr_start < obj_start):
arr_end = clean.rfind("]") + 1
if arr_end > arr_start:
items = json.loads(clean[arr_start:arr_end])
if isinstance(items, list) and items:
result = _merge_array_items(items, fallback)
# skip to normalization below
return _finalize_result(result, fallback, label)
# ── Normal single-object JSON ─────────────────────────────────────────
s = clean.find("{")
e = clean.rfind("}") + 1
if s >= 0 and e > s:
result = json.loads(clean[s:e])
return _finalize_result(result, fallback, label)
except Exception as _exc:
log.debug(f"JSON parse failed (label={label}): {_exc} — raw: {response[:80]}")
return fallback
# Object-name → category mapping for when Cosmos sets object="unknown"
# but object_name reveals what it actually is.
_NAME_TO_CATEGORY = {
# obstacles / furniture
"book": "obstacle", "box": "obstacle", "bag": "obstacle",
"chair": "obstacle", "table": "obstacle", "desk": "obstacle",
"bottle": "obstacle", "cup": "obstacle", "shoe": "shoe",
"slipper": "slipper", "sandal": "slipper",
# people
"man": "person", "woman": "person", "person": "person",
"human": "person", "child": "person", "kid": "person",
# robots — broad coverage for Cosmos inventions
"droid": "robot", "robot": "robot", "r2": "robot", "bb8": "robot",
"toy_droid": "robot", "toy_robot": "robot", "toy droid": "robot",
"mech": "robot", "android": "robot", "bot": "robot",
# walls / structural
"wall": "wall", "door": "wall", "fence": "wall",
}
# Non-standard object strings Cosmos invents that map to canonical categories.
# Applied in _finalize_result regardless of whether object is "unknown".
_OBJ_REMAP = {
"toy_droid": "robot", "toy_robot": "robot", "toy droid": "robot",
"toy robot": "robot", "droid": "robot", "android": "robot",
"mech": "robot", "bot": "robot",
"sandal": "slipper", "flip_flop": "slipper", "flip flop": "slipper",
"sneaker": "shoe", "boot": "shoe",
"human": "person", "man": "person", "woman": "person",
"kid": "person", "child": "person",
}
_OBJ_PRIORITY = ["person", "robot", "slipper", "shoe", "obstacle", "wall", "clear", "unknown"]
def _infer_category(obj: str, name: str | None) -> str:
"""If obj is 'unknown' but name hints at a real category, return that category."""
if obj not in ("unknown", "", None):
return obj
if not name:
return obj or "unknown"
name_lower = str(name).lower()
for keyword, category in _NAME_TO_CATEGORY.items():
if keyword in name_lower:
return category
return obj or "unknown"
def _merge_array_items(items: list, fallback: dict) -> dict:
"""Merge a list of per-frame result dicts into one combined result."""
merged = dict(fallback)
names = []
for item in items:
if not isinstance(item, dict):
continue
# Pick highest-priority object seen across frames
item_obj = _infer_category(
item.get("object", "unknown"),
item.get("object_name")
)
merged_obj = merged.get("object", "unknown")
# Guard .index() — an out-of-vocabulary category would raise ValueError
_item_rank = _OBJ_PRIORITY.index(item_obj) if item_obj in _OBJ_PRIORITY else len(_OBJ_PRIORITY)
_merged_rank = _OBJ_PRIORITY.index(merged_obj) if merged_obj in _OBJ_PRIORITY else len(_OBJ_PRIORITY)
if _item_rank < _merged_rank:
merged["object"] = item_obj
# Collect names
n = item.get("object_name")
if n and str(n) not in names:
names.append(str(n))
# OR all boolean flags
for flag in ("wall_ahead", "obstacle_close", "small_obstacle",
"target_visible", "in_my_path", "mission_complete"):
if item.get(flag):
merged[flag] = True
# Take first non-empty string fields
for field in ("terrain", "distance", "target_direction",
"clearest_direction", "action", "speak", "physical_reasoning"):
if not merged.get(field) or merged[field] in (None, "", fallback.get(field)):
val = item.get(field)
if val and val not in (None, ""):
merged[field] = val
merged["object_name"] = ", ".join(names) if names else None
return merged
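The merge rules above (highest-priority object wins, boolean flags OR together, names accumulate) in a self-contained miniature, with an abridged priority list for illustration:

```python
PRIORITY = ["person", "robot", "obstacle", "wall", "clear", "unknown"]  # abridged

def merge(items):
    out = {"object": "unknown", "wall_ahead": False, "names": []}
    for it in items:
        obj = it.get("object", "unknown")
        if obj in PRIORITY and PRIORITY.index(obj) < PRIORITY.index(out["object"]):
            out["object"] = obj                      # highest-priority object wins
        out["wall_ahead"] = out["wall_ahead"] or bool(it.get("wall_ahead"))
        n = it.get("object_name")
        if n and n not in out["names"]:
            out["names"].append(n)                   # collect distinct names
    return out

merged = merge([
    {"object": "obstacle", "object_name": "box"},
    {"object": "person", "object_name": "man", "wall_ahead": True},
])
```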

def _finalize_result(result: dict, fallback: dict, label: str) -> dict:
    """Normalize types, infer category from name, fill fallback, print."""
    # ── Step 0: remap aliased field names Cosmos (2B) frequently hallucinates ─
    # The model invents slight variations of canonical names. Catch them all here
    # before any downstream logic sees them. "canonical" wins if both exist.
    _FIELD_ALIASES: dict[str, str] = {
        # speak
        "speaker": "speak",
        "speech": "speak",
        "say": "speak",
        "spoken": "speak",
        "tts": "speak",
        "announcement": "speak",
        "narration": "speak",
        "response": "speak",
        # target_visible
        "target_visibility": "target_visible",
        "targetvisible": "target_visible",
        "target_found": "target_visible",
        "found": "target_visible",
        "detected": "target_visible",
        # physical_reasoning
        "reasoning": "physical_reasoning",
        "reason": "physical_reasoning",
        "explanation": "physical_reasoning",
        "analysis": "physical_reasoning",
        "observation": "physical_reasoning",
        "notes": "physical_reasoning",
        "summary": "physical_reasoning",
        # object_name
        "name": "object_name",
        "label": "object_name",
        "object_label": "object_name",
        # action
        "movement": "action",
        "next_action": "action",
        "recommended_action": "action",
        # clearest_direction
        "clear_direction": "clearest_direction",
        "best_direction": "clearest_direction",
        "open_direction": "clearest_direction",
        # target_direction
        "direction": "target_direction",
        "target_location": "target_direction",
        "target_side": "target_direction",
    }
    for alias, canonical in _FIELD_ALIASES.items():
        if alias in result:
            if canonical not in result:
                log.info(f"Field alias: '{alias}' → '{canonical}'")
                result[canonical] = result.pop(alias)
            else:
                result.pop(alias)  # canonical already present — drop the duplicate
    # ── Step 0b: strip unknown fields so they don't pollute the debug print ───
    _VALID_FIELDS = {
        "object", "object_name", "terrain", "distance", "in_my_path",
        "wall_ahead", "obstacle_close", "small_obstacle", "void_ahead",
        "target_visible", "target_direction", "clearest_direction",
        "action", "speak", "physical_reasoning", "mission_complete",
        # nav-check only
        "person_visible",
        # optional / extended
        "severity", "social_intent", "risk_assessment",
    }
    stray = [k for k in list(result) if k not in _VALID_FIELDS]
    if stray:
        log.info(f"Dropping unknown fields from Cosmos output: {stray}")
        for k in stray:
            result.pop(k)
    # Flatten dict-type "object" field
    obj = result.get("object")
    if isinstance(obj, dict):
        priority = ["person", "robot", "slipper", "shoe", "obstacle", "wall", "clear"]
        flat = "unknown"
        for key in priority:
            if obj.get(key):
                flat = key
                items = obj[key]
                if isinstance(items, list) and items and not result.get("object_name"):
                    result["object_name"] = str(items[0])
                break
            elif key in obj:
                flat = key
        result["object"] = flat
    # Flatten list-type "object_name"
    name = result.get("object_name")
    if isinstance(name, list):
        result["object_name"] = ", ".join(str(x) for x in name if x) or None
    # Infer category from name when object is "unknown"
    result["object"] = _infer_category(result.get("object", "unknown"),
                                       result.get("object_name"))
    # ── Remap non-standard object strings Cosmos invents ─────────────────────
    raw_obj = str(result.get("object", "unknown")).lower().strip()
    if raw_obj in _OBJ_REMAP:
        log.info(f"Remapping object '{raw_obj}' → '{_OBJ_REMAP[raw_obj]}'")
        result["object"] = _OBJ_REMAP[raw_obj]
    elif "_" in raw_obj or " " in raw_obj:
        for key, val in _OBJ_REMAP.items():
            if key in raw_obj:
                log.info(f"Remapping object '{raw_obj}' → '{val}' (partial match '{key}')")
                result["object"] = val
                break
    # ── Normalize action to canonical set ────────────────────────────────────
    _VALID_ACTIONS = {"forward", "backward", "left", "right", "slow",
                      "stop", "navigate_around", "turn_left", "turn_right", "turn_back"}
    raw_action = str(result.get("action", "forward")).lower().strip()
    if raw_action not in _VALID_ACTIONS:
        _ACTION_MAP = {
            "move_forward": "forward", "go_forward": "forward", "continue": "forward",
            "move": "forward", "proceed": "forward", "advance": "forward",
            "go": "forward", "drive": "forward", "go_ahead": "forward",
            "turn": "turn_right", "avoid": "navigate_around", "reverse": "backward",
            "back_up": "backward", "back": "backward", "halt": "stop", "pause": "stop",
        }
        normalized = _ACTION_MAP.get(raw_action)
        if not normalized:
            normalized = "forward" if "forward" in raw_action else "stop"
        log.info(f"Normalized action '{raw_action}' → '{normalized}'")
        result["action"] = normalized
    # ── Consistency fix: if object matches mission target, target_visible must be True ──
    # Cosmos sometimes sees the target but second-guesses target_visible=False.
    # If the object field matches any keyword in mission_target_objects, force True.
    _obj_val = str(result.get("object", "")).lower()
    _name_val = str(result.get("object_name", "") or "").lower()
    _targets = [t.lower() for t in (_ms.mission_target_objects or [])]
    if _obj_val not in ("", "unknown", "clear") and not result.get("target_visible"):
        # Match only if object/name shares a whole word with a target keyword:
        # "person" matches "injured person", but "trap" must NOT match
        # "trapped person" (partial word, not a real match).
        def _word_match(obj, target):
            # Match iff the two strings share at least one whole word
            obj_words = set(obj.split())
            tgt_words = set(target.split())
            return bool(obj_words & tgt_words)
        _matched = any(
            (_word_match(_obj_val, kw) or _word_match(_name_val, kw))
            for kw in _targets
        ) if _targets else False
        if _matched:
            log.info(f"Auto-correcting target_visible=True (object={_obj_val} matched targets={_targets})")
            result["target_visible"] = True
    # ── Note: stop→forward auto-correction removed.
    #    Cosmos saying stop with no explicit obstacle flag is valid —
    #    it may have seen something the sensor fields don't capture.
    #    Hardware sensor overrides in _quick_scan/_nav_check handle false stops.
    # Stringify any remaining list/dict in string fields
    for field in ("terrain", "distance", "target_direction",
                  "clearest_direction", "action", "physical_reasoning"):
        val = result.get(field)
        if isinstance(val, (list, dict)):
            result[field] = str(val)
    # Fill missing keys from fallback
    for k, v in fallback.items():
        result.setdefault(k, v)
    # ── Print ──────────────────────────────────────────────────────────────
    print(f"\n{'─'*60}")
    print(f"🧠 {label}:")
    for k, v in result.items():
        if k == "detection_confidence":  # hidden from display
            continue
        icon = ""
        if k == "object" and v not in ("clear", "unknown"): icon = " ⚠️ "
        if k == "wall_ahead" and v: icon = " 🚧 "
        if k == "obstacle_close" and v: icon = " 🚧 "
        if k == "small_obstacle" and v: icon = " ⚠️ "
        if k == "target_visible" and v: icon = " 🎯 "
        if k == "mission_complete" and v: icon = " 🏆 "
        if k == "speak" and v: icon = " 🔊 "
        print(f" {k:25s}: {v}{icon}")
    print(f"{'─'*60}\n")
    return result
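

# Hedged illustration (assumption: nothing in the pipeline calls this).
# A self-contained sketch of two cleanup passes from _finalize_result: alias
# to canonical field renaming, then action normalization. The tables below are
# deliberately truncated copies of the ones above; the input dict is invented.
def _demo_normalize(raw: dict) -> dict:
    aliases = {"reasoning": "physical_reasoning", "say": "speak"}
    action_map = {"move_forward": "forward", "halt": "stop"}
    valid_actions = {"forward", "backward", "left", "right", "stop"}
    result = dict(raw)
    # Pass 1: rename aliased fields; an existing canonical key wins
    for alias, canonical in aliases.items():
        if alias in result and canonical not in result:
            result[canonical] = result.pop(alias)
    # Pass 2: normalize the action string to the canonical set
    action = str(result.get("action", "forward")).lower().strip()
    if action not in valid_actions:
        action = action_map.get(action) or ("forward" if "forward" in action else "stop")
    result["action"] = action
    return result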