-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpre_commit_code_review.py
More file actions
executable file
·345 lines (291 loc) · 12.9 KB
/
pre_commit_code_review.py
File metadata and controls
executable file
·345 lines (291 loc) · 12.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
"""Run specfact code review as a staged-file pre-commit gate (modules repo).
Writes a machine-readable JSON report to ``.specfact/code-review.json`` (gitignored)
so IDEs and Copilot can read findings; exit code still reflects the governed CI verdict.
If ``specfact_cli`` is not installed, attempts ``hatch run dev-deps`` / ``ensure_core_dependency``
(sibling ``specfact-cli`` checkout) before failing.
"""
# CrossHair: ignore
# This helper shells out to the CLI and is intentionally side-effecting.
from __future__ import annotations
import importlib
import importlib.util
import json
import os
import subprocess
import sys
from collections.abc import Callable, Sequence
from pathlib import Path
from subprocess import TimeoutExpired
from typing import Any, cast
from icontract import ensure, require
REPO_ROOT = Path(__file__).resolve().parents[1]
def _load_dev_bootstrap() -> Any:
"""Load ``specfact_cli_modules.dev_bootstrap`` without package install assumptions."""
module_path = REPO_ROOT / "src" / "specfact_cli_modules" / "dev_bootstrap.py"
spec = importlib.util.spec_from_file_location("specfact_cli_modules.dev_bootstrap", module_path)
if spec is None or spec.loader is None:
raise RuntimeError(f"Could not load dev bootstrap module from {module_path}")
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
_dev_bootstrap = _load_dev_bootstrap()
ensure_core_dependency = cast(Callable[[Path], int], _dev_bootstrap.ensure_core_dependency)
apply_specfact_workspace_env = _dev_bootstrap.apply_specfact_workspace_env
# Default matches dogfood / OpenSpec: machine-readable report under ignored ``.specfact/``.
REVIEW_JSON_OUT = ".specfact/code-review.json"
def _is_review_gate_path(path: str) -> bool:
"""Return whether a repo-relative path should participate in the pre-commit review gate."""
normalized = path.replace("\\", "/").strip()
if not normalized:
return False
if normalized.endswith("module-package.yaml"):
return False
if normalized.startswith("openspec/changes/") and Path(normalized).name.casefold() == "tdd_evidence.md":
return False
prefixes = (
"packages/",
"registry/",
"scripts/",
"tools/",
"tests/",
"openspec/changes/",
)
return any(normalized.startswith(prefix) for prefix in prefixes)
@require(lambda paths: paths is not None)
@ensure(lambda result: len(result) == len(set(result)))
def filter_review_gate_paths(paths: Sequence[str]) -> list[str]:
"""Return staged paths under contract- and tooling-heavy trees for the review gate."""
seen: set[str] = set()
filtered: list[str] = []
for path in paths:
if not _is_review_gate_path(path):
continue
if path in seen:
continue
seen.add(path)
filtered.append(path)
return filtered
def _specfact_review_paths(paths: Sequence[str]) -> list[str]:
"""Paths to pass to SpecFact ``code review run`` (Python sources only; skip Markdown and non-.py/.pyi)."""
result: list[str] = []
for raw in paths:
normalized = raw.replace("\\", "/").strip()
if normalized.startswith("openspec/changes/") and normalized.lower().endswith(".md"):
continue
if not normalized.endswith((".py", ".pyi")):
continue
result.append(raw)
return result
@require(lambda files: files is not None)
@ensure(lambda result: result[:5] == [sys.executable, "-m", "specfact_cli.cli", "code", "review"])
@ensure(lambda result: "--json" in result and "--out" in result)
@ensure(lambda result: REVIEW_JSON_OUT in result)
def build_review_command(files: Sequence[str]) -> list[str]:
"""Build ``code review run --json --out …`` so findings are written for tooling."""
return [
sys.executable,
"-m",
"specfact_cli.cli",
"code",
"review",
"run",
"--json",
"--out",
REVIEW_JSON_OUT,
*files,
]
def _repo_root() -> Path:
"""Repository root (parent of ``scripts/``)."""
return REPO_ROOT
def _report_path(repo_root: Path) -> Path:
"""Absolute path to the machine-readable review report."""
return repo_root / REVIEW_JSON_OUT
def _prepare_report_path(repo_root: Path) -> Path:
"""Create the review-report directory and clear any stale report file."""
report_path = _report_path(repo_root)
report_path.parent.mkdir(parents=True, exist_ok=True)
if report_path.is_file():
report_path.unlink()
return report_path
def _run_review_subprocess(
cmd: list[str],
repo_root: Path,
files: Sequence[str],
) -> subprocess.CompletedProcess[str] | None:
"""Run the nested SpecFact review command and handle timeout reporting."""
env = os.environ.copy()
# Ensure nested `python -m specfact_cli.cli` bootstraps this checkout's bundle sources first
# (see `specfact_cli/__init__.py::_bootstrap_bundle_paths`) so ~/.specfact/modules tarballs do not
# shadow in-repo `specfact_code_review` during the pre-commit gate.
env["SPECFACT_MODULES_REPO"] = str(repo_root.resolve())
env["SPECFACT_CLI_MODULES_REPO"] = str(repo_root.resolve())
code_review_src = repo_root / "packages" / "specfact-code-review" / "src"
if code_review_src.is_dir():
prefix = str(code_review_src)
previous = env.get("PYTHONPATH", "").strip()
env["PYTHONPATH"] = f"{prefix}{os.pathsep}{previous}" if previous else prefix
try:
return subprocess.run(
cmd,
check=False,
text=True,
capture_output=True,
cwd=str(repo_root),
env=env,
timeout=300,
)
except TimeoutExpired:
joined_cmd = " ".join(cmd)
sys.stderr.write(f"Code review gate timed out after 300s (command: {joined_cmd!r}, files: {list(files)!r}).\n")
return None
def _emit_completed_output(result: subprocess.CompletedProcess[str]) -> None:
"""Forward captured subprocess output to stderr when the JSON report is missing."""
if result.stdout:
sys.stderr.write(result.stdout if result.stdout.endswith("\n") else result.stdout + "\n")
if result.stderr:
sys.stderr.write(result.stderr if result.stderr.endswith("\n") else result.stderr + "\n")
def _missing_report_exit_code(
report_path: Path,
result: subprocess.CompletedProcess[str],
) -> int:
"""Return the gate exit code when the nested review run failed to create its JSON report."""
_emit_completed_output(result)
sys.stderr.write(
f"Code review: expected review report at {report_path.relative_to(_repo_root())} but it was not created.\n",
)
return result.returncode if result.returncode != 0 else 1
def _classify_severity(item: object) -> str:
"""Map one review finding to a bucket name."""
if not isinstance(item, dict):
return "other"
row = cast(dict[str, Any], item)
raw = row.get("severity")
if not isinstance(raw, str):
return "other"
key = raw.lower().strip()
if key in ("error", "err"):
return "error"
if key in ("warning", "warn"):
return "warning"
if key in ("advisory", "advise"):
return "advisory"
if key == "info":
return "info"
return "other"
@require(lambda findings: findings is not None)
@ensure(lambda result: set(result) == {"error", "warning", "advisory", "info", "other"})
def count_findings_by_severity(findings: list[object]) -> dict[str, int]:
"""Bucket review findings by severity (unknown severities go to ``other``)."""
buckets = {"error": 0, "warning": 0, "advisory": 0, "info": 0, "other": 0}
for item in findings:
buckets[_classify_severity(item)] += 1
return buckets
def _print_review_findings_summary(repo_root: Path) -> tuple[bool, int | None, int | None]:
"""Parse ``REVIEW_JSON_OUT``, print counts, return ``(ok, error_count, ci_exit_code)``.
Callers should use ``ci_exit_code`` as the hook exit code; ``error_count`` is informational only
because fixable error-severity findings may still yield a passing ``ci_exit_code``.
"""
report_path = _report_path(repo_root)
if not report_path.is_file():
sys.stderr.write(f"Code review: no report file at {REVIEW_JSON_OUT} (could not print findings summary).\n")
return False, None, None
try:
data = json.loads(report_path.read_text(encoding="utf-8"))
except (OSError, UnicodeDecodeError) as exc:
sys.stderr.write(f"Code review: could not read {REVIEW_JSON_OUT}: {exc}\n")
return False, None, None
except json.JSONDecodeError as exc:
sys.stderr.write(f"Code review: invalid JSON in {REVIEW_JSON_OUT}: {exc}\n")
return False, None, None
if not isinstance(data, dict):
sys.stderr.write(f"Code review: expected top-level JSON object in {REVIEW_JSON_OUT}.\n")
return False, None, None
findings_raw = data.get("findings")
if not isinstance(findings_raw, list):
sys.stderr.write(f"Code review: report has no findings list in {REVIEW_JSON_OUT}.\n")
return False, None, None
counts = count_findings_by_severity(findings_raw)
total = len(findings_raw)
verdict = data.get("overall_verdict", "?")
ci_exit_code = data.get("ci_exit_code")
if ci_exit_code not in {0, 1}:
ci_exit_code = 1 if verdict == "FAIL" else 0
parts = [
f"errors={counts['error']}",
f"warnings={counts['warning']}",
f"advisory={counts['advisory']}",
]
if counts["info"]:
parts.append(f"info={counts['info']}")
if counts["other"]:
parts.append(f"other={counts['other']}")
summary = ", ".join(parts)
sys.stderr.write(f"Code review summary: {total} finding(s) ({summary}); overall_verdict={verdict!r}.\n")
abs_report = report_path.resolve()
sys.stderr.write(f"Code review report file: {REVIEW_JSON_OUT}\n")
sys.stderr.write(f" absolute path: {abs_report}\n")
sys.stderr.write("Copy-paste for Copilot or Cursor:\n")
sys.stderr.write(
f" Read `{REVIEW_JSON_OUT}` and fix every finding (errors first), using file and line from each entry.\n"
)
sys.stderr.write(f" @workspace Open `{REVIEW_JSON_OUT}` and remediate each item in `findings`.\n")
return True, counts["error"], ci_exit_code
@ensure(lambda result: isinstance(result, tuple) and len(result) == 2)
@ensure(lambda result: isinstance(result[0], bool) and (result[1] is None or isinstance(result[1], str)))
def ensure_runtime_available() -> tuple[bool, str | None]:
"""Verify the current Python environment can import SpecFact CLI; try local sibling install."""
try:
importlib.import_module("specfact_cli.cli")
except ModuleNotFoundError:
root = _repo_root()
if ensure_core_dependency(root) != 0:
return (
False,
"Could not install local specfact-cli. Run `hatch run dev-deps` or set SPECFACT_CLI_REPO.",
)
try:
importlib.import_module("specfact_cli.cli")
except ModuleNotFoundError:
return (
False,
"specfact_cli still not importable after ensure_core_dependency; check sibling checkout.",
)
return True, None
@ensure(lambda result: isinstance(result, int))
def main(argv: Sequence[str] | None = None) -> int:
"""Run the code review gate; write JSON under ``.specfact/`` and return CLI exit code."""
apply_specfact_workspace_env(REPO_ROOT)
files = filter_review_gate_paths(list(argv or []))
if len(files) == 0:
sys.stdout.write(
"No staged review-relevant files under packages/, registry/, scripts/, tools/, tests/, "
"or openspec/changes/; skipping code review gate.\n"
)
return 0
specfact_files = _specfact_review_paths(files)
if len(specfact_files) == 0:
sys.stdout.write(
"Staged review paths are only OpenSpec Markdown under openspec/changes/; "
"skipping SpecFact code review (no staged .py/.pyi targets; Markdown is not passed to SpecFact).\n"
)
return 0
available, guidance = ensure_runtime_available()
if available is False:
sys.stdout.write(f"Unable to run the code review gate. {guidance}\n")
return 1
repo_root = _repo_root()
cmd = build_review_command(specfact_files)
report_path = _prepare_report_path(repo_root)
result = _run_review_subprocess(cmd, repo_root, specfact_files)
if result is None:
return 1
if not report_path.is_file():
return _missing_report_exit_code(report_path, result)
# Do not echo nested `specfact code review run` stdout/stderr (verbose tool banners); full report
# is in REVIEW_JSON_OUT; we print a short summary on stderr below.
summary_ok, _error_count, ci_exit_code = _print_review_findings_summary(repo_root)
if not summary_ok or ci_exit_code is None:
return 1
return int(ci_exit_code)
if __name__ == "__main__":
raise SystemExit(main(sys.argv[1:]))