1from __future__ import annotations
2
3import json
4import subprocess
5import sys
6import tomllib
7from pathlib import Path
8from typing import Any
9
10import click
11
12from plain.cli import register_cli
13from plain.cli.print import print_event
14from plain.cli.runtime import common_command, without_runtime_setup
15
16from .annotations import AnnotationResult, check_annotations
17from .oxc import OxcTool, install_oxc
18
# Path to the "ruff_defaults.toml" file that sits next to this module.
# The `check` and `fix` commands pass it to every ruff invocation via `--config`.
DEFAULT_RUFF_CONFIG = Path(__file__).parent / "ruff_defaults.toml"
20
21
@without_runtime_setup
@register_cli("code")
@click.group()
def cli() -> None:
    """Code formatting and linting"""
    # Intentionally empty: this group only acts as the parent that the
    # subcommands in this module attach to via `@cli.command()`.
28
29
@without_runtime_setup
@cli.command()
@click.option("--force", is_flag=True, help="Reinstall even if up to date")
@click.pass_context
def install(ctx: click.Context, force: bool) -> None:
    """Install or update oxlint and oxfmt binaries"""
    # Everything this command needs lives under the "oxc" table of the
    # project's code configuration.
    oxc_settings = get_code_config().get("oxc", {})

    if not oxc_settings.get("enabled", True):
        click.secho("Oxc is disabled in configuration", fg="yellow")
        return

    oxlint = OxcTool("oxlint")

    # Nothing to do when the tool is present and current, unless --force.
    # The checks short-circuit left to right, so --force skips both probes.
    if not force and oxlint.is_installed() and not oxlint.needs_update():
        click.secho("oxlint and oxfmt already installed", fg="green")
        return

    pinned_version = oxc_settings.get("version", "")
    if not pinned_version:
        # No version configured: hand over to the `update` command.
        ctx.invoke(update)
        return

    click.secho(
        f"Installing oxlint and oxfmt {pinned_version}...",
        bold=True,
        nl=False,
    )
    installed = install_oxc(pinned_version)
    click.secho(f"oxlint and oxfmt {installed} installed", fg="green")
58
59
@without_runtime_setup
@cli.command()
def update() -> None:
    """Update oxlint and oxfmt to latest version"""
    oxc_enabled = get_code_config().get("oxc", {}).get("enabled", True)

    if oxc_enabled:
        click.secho("Updating oxlint and oxfmt...", bold=True)
        # Called without a version argument; the returned value is echoed
        # back as the installed version.
        installed_version = install_oxc()
        click.secho(f"oxlint and oxfmt {installed_version} installed", fg="green")
    else:
        click.secho("Oxc is disabled in configuration", fg="yellow")
73
74
75def _partition_paths(paths: tuple[str, ...]) -> tuple[tuple[str, ...], tuple[str, ...]]:
76 """Split paths into (python_paths, other_paths).
77
78 Directories go into both groups. Files are routed by extension.
79 """
80 python_paths: list[str] = []
81 other_paths: list[str] = []
82 for p in paths:
83 if Path(p).is_dir():
84 python_paths.append(p)
85 other_paths.append(p)
86 elif Path(p).suffix == ".py":
87 python_paths.append(p)
88 else:
89 other_paths.append(p)
90 return tuple(python_paths), tuple(other_paths)
91
92
@without_runtime_setup
@cli.command()
@click.pass_context
@click.argument("paths", nargs=-1)
@click.option("--skip-ruff", is_flag=True, help="Skip Ruff checks")
@click.option("--skip-ty", is_flag=True, help="Skip ty type checks")
@click.option("--skip-oxc", is_flag=True, help="Skip oxlint and oxfmt checks")
@click.option("--skip-annotations", is_flag=True, help="Skip type annotation checks")
def check(
    ctx: click.Context,
    paths: tuple[str, ...],
    skip_ruff: bool,
    skip_ty: bool,
    skip_oxc: bool,
    skip_annotations: bool,
) -> None:
    """Check for formatting and linting issues"""
    # Runs, in order: ruff check, ruff format --check, ty, oxlint,
    # oxfmt --check and the annotation check. The first failing step prints
    # a hint and exits the process with that step's return code.
    python_paths, other_paths = _partition_paths(paths or (".",))
    config = get_code_config()

    # The top-level "exclude" list becomes repeated `--exclude <pattern>`
    # flags for both ruff and ty.
    excludes = config.get("exclude", [])
    exclude_flags = [arg for pattern in excludes for arg in ("--exclude", pattern)]
    ruff_args = ["--config", str(DEFAULT_RUFF_CONFIG), *exclude_flags]

    def section_enabled(name: str) -> Any:
        # Looked up lazily so a section is only read when its step can run.
        return config.get(name, {}).get("enabled", True)

    def exit_on_failure(return_code: int) -> None:
        if return_code == 0:
            return
        click.secho(
            "\nCode check failed. Run `plain fix` and/or fix issues manually.",
            fg="red",
            err=True,
        )
        sys.exit(return_code)

    if python_paths and not skip_ruff:
        ruff_steps = (
            ("ruff check...", ["ruff", "check", *python_paths, *ruff_args]),
            (
                "ruff format --check...",
                ["ruff", "format", *python_paths, "--check", *ruff_args],
            ),
        )
        for label, command in ruff_steps:
            print_event(label, newline=False)
            exit_on_failure(subprocess.run(command).returncode)

    if python_paths and not skip_ty and section_enabled("ty"):
        print_event("ty check...", newline=False)
        ty_command = ["ty", "check", *python_paths, "--no-progress", *exclude_flags]
        exit_on_failure(subprocess.run(ty_command).returncode)

    if other_paths and not skip_oxc and section_enabled("oxc"):
        oxlint = OxcTool("oxlint")
        oxfmt = OxcTool("oxfmt")

        # Refresh the binaries first when oxlint reports it needs an update.
        if oxlint.needs_update():
            ctx.invoke(install)

        print_event("oxlint...", newline=False)
        exit_on_failure(oxlint.invoke(*other_paths).returncode)

        print_event("oxfmt --check...", newline=False)
        exit_on_failure(oxfmt.invoke("--check", *other_paths).returncode)

    if python_paths and not skip_annotations and section_enabled("annotations"):
        print_event("annotations...", newline=False)
        # Top-level excludes plus the annotation-specific ones.
        exclude_patterns = [
            *excludes,
            *config.get("annotations", {}).get("exclude", []),
        ]
        ann_result = check_annotations(
            *python_paths, exclude_patterns=exclude_patterns or None
        )
        if not ann_result.missing_count > 0:
            click.secho("All functions typed!", fg="green")
        else:
            click.secho(
                f"{ann_result.missing_count} functions are untyped",
                fg="red",
            )
            click.secho("Run 'plain code annotations --details' for details")
            exit_on_failure(1)
184
185
@without_runtime_setup
@cli.command()
@click.argument("paths", nargs=-1)
@click.option("--details", is_flag=True, help="List untyped functions")
@click.option("--json", "as_json", is_flag=True, help="Output as JSON")
def annotations(paths: tuple[str, ...], details: bool, as_json: bool) -> None:
    """Check type annotation status"""
    # Defaults to the current directory when no paths are given.
    targets = paths or (".",)

    config = get_code_config()
    # Merge the top-level excludes with the annotation-specific ones.
    exclude_patterns = [
        *config.get("exclude", []),
        *config.get("annotations", {}).get("exclude", []),
    ]
    result = check_annotations(*targets, exclude_patterns=exclude_patterns or None)

    if not as_json:
        _print_annotations_report(result, show_details=details)
        return
    _print_annotations_json(result)
204
205
def _print_annotations_report(
    result: AnnotationResult,
    show_details: bool = False,
) -> None:
    """Write a colored, human-readable annotation summary to the terminal.

    Prints "No functions found" and stops when ``result`` has no functions.
    Otherwise prints, in order:

    * with ``show_details`` and at least one missing annotation, one line per
      function that is not fully typed (sorted by path, then line), followed
      by a blank line;
    * a coverage line, green when nothing is missing and red otherwise;
    * a yellow line of ignore/cast/assert counts, for those above zero.
    """
    if result.total_functions == 0:
        click.echo("No functions found")
        return

    if show_details and result.missing_count > 0:
        # Rows are (path, function name, line, what is missing).
        rows: list[tuple[str, str, int, list[str]]] = []

        for stats in result.file_stats:
            for func in stats.functions:
                if func.is_fully_typed:
                    continue

                problems = []
                if not func.has_return_type:
                    problems.append("return type")
                untyped_params = func.total_params - func.typed_params
                if untyped_params > 0:
                    noun = "param" if untyped_params == 1 else "params"
                    problems.append(f"{untyped_params} {noun}")

                rows.append((stats.path, func.name, func.line, problems))

        # Order by file path first, then by line number within the file.
        for file_path, func_name, line, problems in sorted(
            rows, key=lambda row: (row[0], row[2])
        ):
            location = click.style(f"{file_path}:{line}", fg="cyan")
            issue_str = click.style(f"({', '.join(problems)})", dim=True)
            click.echo(f"{location} {func_name} {issue_str}")

        click.echo()

    # Coverage summary.
    summary_color = "red" if result.missing_count != 0 else "green"
    click.secho(
        f"{result.coverage_percentage:.1f}% typed "
        f"({result.fully_typed_functions}/{result.total_functions} functions)",
        fg=summary_color,
    )

    # Escape-hatch counters, listed only when non-zero.
    counters = (
        ("ignore", result.total_ignores),
        ("cast", result.total_casts),
        ("assert", result.total_asserts),
    )
    smells = [f"{count} {label}" for label, count in counters if count > 0]
    if smells:
        click.secho(", ".join(smells), fg="yellow")
261
262
def _print_annotations_json(result: AnnotationResult) -> None:
    """Echo the headline annotation numbers as a single JSON object.

    The coverage percentage is emitted under "overall_coverage"; the other
    keys reuse the attribute names they are read from.
    """
    payload = {"overall_coverage": result.coverage_percentage}
    for field in (
        "total_functions",
        "fully_typed_functions",
        "total_ignores",
        "total_casts",
        "total_asserts",
    ):
        payload[field] = getattr(result, field)

    click.echo(json.dumps(payload))
274
275
@common_command
@without_runtime_setup
@register_cli("fix", shortcut_for="code fix")
@cli.command()
@click.pass_context
@click.argument("paths", nargs=-1)
@click.option("--unsafe-fixes", is_flag=True, help="Apply ruff unsafe fixes")
@click.option("--add-noqa", is_flag=True, help="Add noqa comments to suppress errors")
def fix(
    ctx: click.Context, paths: tuple[str, ...], unsafe_fixes: bool, add_noqa: bool
) -> None:
    """Fix formatting and linting issues"""
    # Runs, in order: ruff check (fix mode chosen by the flags), ruff format,
    # oxlint in fix mode, then oxfmt. The first step that returns non-zero
    # exits the process with that return code.
    python_paths, other_paths = _partition_paths(paths or (".",))

    ruff_args = ["--config", str(DEFAULT_RUFF_CONFIG)]
    config = get_code_config()
    for pattern in config.get("exclude", []):
        ruff_args += ["--exclude", pattern]

    if unsafe_fixes and add_noqa:
        raise click.UsageError("Cannot use both --unsafe-fixes and --add-noqa")

    def exit_on_failure(return_code: int) -> None:
        if return_code != 0:
            sys.exit(return_code)

    if python_paths:
        # Pick the ruff flags once; the progress label is built from them.
        if unsafe_fixes:
            mode_flags = ["--fix", "--unsafe-fixes"]
        elif add_noqa:
            mode_flags = ["--add-noqa"]
        else:
            mode_flags = ["--fix"]

        print_event(f"ruff check {' '.join(mode_flags)}...", newline=False)
        lint = subprocess.run(["ruff", "check", *python_paths, *mode_flags, *ruff_args])
        exit_on_failure(lint.returncode)

        print_event("ruff format...", newline=False)
        formatted = subprocess.run(["ruff", "format", *python_paths, *ruff_args])
        exit_on_failure(formatted.returncode)

    if other_paths and config.get("oxc", {}).get("enabled", True):
        oxlint = OxcTool("oxlint")
        oxfmt = OxcTool("oxfmt")

        # Refresh the binaries first when oxlint reports it needs an update.
        if oxlint.needs_update():
            ctx.invoke(install)

        oxlint_flag = "--fix-dangerously" if unsafe_fixes else "--fix"
        print_event(f"oxlint {oxlint_flag}...", newline=False)
        exit_on_failure(oxlint.invoke(*other_paths, oxlint_flag).returncode)

        print_event("oxfmt...", newline=False)
        exit_on_failure(oxfmt.invoke(*other_paths).returncode)
348
349
350def get_code_config() -> dict[str, Any]:
351 pyproject = Path("pyproject.toml")
352 if not pyproject.exists():
353 return {}
354 with pyproject.open("rb") as f:
355 return tomllib.load(f).get("tool", {}).get("plain", {}).get("code", {})