1from __future__ import annotations
  2
  3import json
  4import subprocess
  5import sys
  6import tomllib
  7from pathlib import Path
  8from typing import Any
  9
 10import click
 11
 12from plain.cli import register_cli
 13from plain.cli.print import print_event
 14from plain.cli.runtime import common_command, without_runtime_setup
 15
 16from .annotations import AnnotationResult, check_annotations
 17from .oxc import OxcTool, install_oxc
 18
 19DEFAULT_RUFF_CONFIG = Path(__file__).parent / "ruff_defaults.toml"
 20
 21
 22@without_runtime_setup
 23@register_cli("code")
 24@click.group()
 25def cli() -> None:
 26    """Code formatting and linting"""
 27    pass
 28
 29
 30@without_runtime_setup
 31@cli.command()
 32@click.option("--force", is_flag=True, help="Reinstall even if up to date")
 33@click.pass_context
 34def install(ctx: click.Context, force: bool) -> None:
 35    """Install or update oxlint and oxfmt binaries"""
 36    config = get_code_config()
 37
 38    if not config.get("oxc", {}).get("enabled", True):
 39        click.secho("Oxc is disabled in configuration", fg="yellow")
 40        return
 41
 42    oxlint = OxcTool("oxlint")
 43
 44    if force or not oxlint.is_installed() or oxlint.needs_update():
 45        version_to_install = config.get("oxc", {}).get("version", "")
 46        if version_to_install:
 47            click.secho(
 48                f"Installing oxlint and oxfmt {version_to_install}...",
 49                bold=True,
 50                nl=False,
 51            )
 52            installed = install_oxc(version_to_install)
 53            click.secho(f"oxlint and oxfmt {installed} installed", fg="green")
 54        else:
 55            ctx.invoke(update)
 56    else:
 57        click.secho("oxlint and oxfmt already installed", fg="green")
 58
 59
 60@without_runtime_setup
 61@cli.command()
 62def update() -> None:
 63    """Update oxlint and oxfmt to latest version"""
 64    config = get_code_config()
 65
 66    if not config.get("oxc", {}).get("enabled", True):
 67        click.secho("Oxc is disabled in configuration", fg="yellow")
 68        return
 69
 70    click.secho("Updating oxlint and oxfmt...", bold=True)
 71    version = install_oxc()
 72    click.secho(f"oxlint and oxfmt {version} installed", fg="green")
 73
 74
 75def _partition_paths(paths: tuple[str, ...]) -> tuple[tuple[str, ...], tuple[str, ...]]:
 76    """Split paths into (python_paths, other_paths).
 77
 78    Directories go into both groups. Files are routed by extension.
 79    """
 80    python_paths: list[str] = []
 81    other_paths: list[str] = []
 82    for p in paths:
 83        if Path(p).is_dir():
 84            python_paths.append(p)
 85            other_paths.append(p)
 86        elif Path(p).suffix == ".py":
 87            python_paths.append(p)
 88        else:
 89            other_paths.append(p)
 90    return tuple(python_paths), tuple(other_paths)
 91
 92
 93@without_runtime_setup
 94@cli.command()
 95@click.pass_context
 96@click.argument("paths", nargs=-1)
 97@click.option("--skip-ruff", is_flag=True, help="Skip Ruff checks")
 98@click.option("--skip-ty", is_flag=True, help="Skip ty type checks")
 99@click.option("--skip-oxc", is_flag=True, help="Skip oxlint and oxfmt checks")
100@click.option("--skip-annotations", is_flag=True, help="Skip type annotation checks")
101def check(
102    ctx: click.Context,
103    paths: tuple[str, ...],
104    skip_ruff: bool,
105    skip_ty: bool,
106    skip_oxc: bool,
107    skip_annotations: bool,
108) -> None:
109    """Check for formatting and linting issues"""
110    if not paths:
111        paths = (".",)
112
113    python_paths, other_paths = _partition_paths(paths)
114    ruff_args = ["--config", str(DEFAULT_RUFF_CONFIG)]
115    config = get_code_config()
116
117    for e in config.get("exclude", []):
118        ruff_args.extend(["--exclude", e])
119
120    def maybe_exit(return_code: int) -> None:
121        if return_code != 0:
122            click.secho(
123                "\nCode check failed. Run `plain fix` and/or fix issues manually.",
124                fg="red",
125                err=True,
126            )
127            sys.exit(return_code)
128
129    if not skip_ruff and python_paths:
130        print_event("ruff check...", newline=False)
131        result = subprocess.run(["ruff", "check", *python_paths, *ruff_args])
132        maybe_exit(result.returncode)
133
134        print_event("ruff format --check...", newline=False)
135        result = subprocess.run(
136            ["ruff", "format", *python_paths, "--check", *ruff_args]
137        )
138        maybe_exit(result.returncode)
139
140    if not skip_ty and python_paths and config.get("ty", {}).get("enabled", True):
141        print_event("ty check...", newline=False)
142        ty_args = ["ty", "check", *python_paths, "--no-progress"]
143        for e in config.get("exclude", []):
144            ty_args.extend(["--exclude", e])
145        result = subprocess.run(ty_args)
146        maybe_exit(result.returncode)
147
148    if not skip_oxc and other_paths and config.get("oxc", {}).get("enabled", True):
149        oxlint = OxcTool("oxlint")
150        oxfmt = OxcTool("oxfmt")
151
152        if oxlint.needs_update():
153            ctx.invoke(install)
154
155        print_event("oxlint...", newline=False)
156        result = oxlint.invoke(*other_paths)
157        maybe_exit(result.returncode)
158
159        print_event("oxfmt --check...", newline=False)
160        result = oxfmt.invoke("--check", *other_paths)
161        maybe_exit(result.returncode)
162
163    if (
164        not skip_annotations
165        and python_paths
166        and config.get("annotations", {}).get("enabled", True)
167    ):
168        print_event("annotations...", newline=False)
169        # Combine top-level exclude with annotation-specific exclude
170        exclude_patterns = list(config.get("exclude", []))
171        exclude_patterns.extend(config.get("annotations", {}).get("exclude", []))
172        ann_result = check_annotations(
173            *python_paths, exclude_patterns=exclude_patterns or None
174        )
175        if ann_result.missing_count > 0:
176            click.secho(
177                f"{ann_result.missing_count} functions are untyped",
178                fg="red",
179            )
180            click.secho("Run 'plain code annotations --details' for details")
181            maybe_exit(1)
182        else:
183            click.secho("All functions typed!", fg="green")
184
185
186@without_runtime_setup
187@cli.command()
188@click.argument("paths", nargs=-1)
189@click.option("--details", is_flag=True, help="List untyped functions")
190@click.option("--json", "as_json", is_flag=True, help="Output as JSON")
191def annotations(paths: tuple[str, ...], details: bool, as_json: bool) -> None:
192    """Check type annotation status"""
193    if not paths:
194        paths = (".",)
195    config = get_code_config()
196    # Combine top-level exclude with annotation-specific exclude
197    exclude_patterns = list(config.get("exclude", []))
198    exclude_patterns.extend(config.get("annotations", {}).get("exclude", []))
199    result = check_annotations(*paths, exclude_patterns=exclude_patterns or None)
200    if as_json:
201        _print_annotations_json(result)
202    else:
203        _print_annotations_report(result, show_details=details)
204
205
206def _print_annotations_report(
207    result: AnnotationResult,
208    show_details: bool = False,
209) -> None:
210    """Print the annotation report with colors."""
211    if result.total_functions == 0:
212        click.echo("No functions found")
213        return
214
215    # Detailed output first (if enabled and there are untyped functions)
216    if show_details and result.missing_count > 0:
217        # Collect all untyped functions with full paths
218        untyped_items: list[tuple[str, str, int, list[str]]] = []
219
220        for stats in result.file_stats:
221            for func in stats.functions:
222                if not func.is_fully_typed:
223                    issues = []
224                    if not func.has_return_type:
225                        issues.append("return type")
226                    missing_params = func.total_params - func.typed_params
227                    if missing_params > 0:
228                        param_word = "param" if missing_params == 1 else "params"
229                        issues.append(f"{missing_params} {param_word}")
230                    untyped_items.append((stats.path, func.name, func.line, issues))
231
232        # Sort by file path, then line number
233        untyped_items.sort(key=lambda x: (x[0], x[2]))
234
235        # Print each untyped function
236        for file_path, func_name, line, issues in untyped_items:
237            location = click.style(f"{file_path}:{line}", fg="cyan")
238            issue_str = click.style(f"({', '.join(issues)})", dim=True)
239            click.echo(f"{location}  {func_name}  {issue_str}")
240
241        click.echo()
242
243    # Summary line
244    pct = result.coverage_percentage
245    color = "green" if result.missing_count == 0 else "red"
246    click.secho(
247        f"{pct:.1f}% typed ({result.fully_typed_functions}/{result.total_functions} functions)",
248        fg=color,
249    )
250
251    # Code smell indicators (only if present)
252    smells = []
253    if result.total_ignores > 0:
254        smells.append(f"{result.total_ignores} ignore")
255    if result.total_casts > 0:
256        smells.append(f"{result.total_casts} cast")
257    if result.total_asserts > 0:
258        smells.append(f"{result.total_asserts} assert")
259    if smells:
260        click.secho(f"{', '.join(smells)}", fg="yellow")
261
262
263def _print_annotations_json(result: AnnotationResult) -> None:
264    """Print the annotation report as JSON."""
265    output = {
266        "overall_coverage": result.coverage_percentage,
267        "total_functions": result.total_functions,
268        "fully_typed_functions": result.fully_typed_functions,
269        "total_ignores": result.total_ignores,
270        "total_casts": result.total_casts,
271        "total_asserts": result.total_asserts,
272    }
273    click.echo(json.dumps(output))
274
275
276@common_command
277@without_runtime_setup
278@register_cli("fix", shortcut_for="code fix")
279@cli.command()
280@click.pass_context
281@click.argument("paths", nargs=-1)
282@click.option("--unsafe-fixes", is_flag=True, help="Apply ruff unsafe fixes")
283@click.option("--add-noqa", is_flag=True, help="Add noqa comments to suppress errors")
284def fix(
285    ctx: click.Context, paths: tuple[str, ...], unsafe_fixes: bool, add_noqa: bool
286) -> None:
287    """Fix formatting and linting issues"""
288    if not paths:
289        paths = (".",)
290
291    python_paths, other_paths = _partition_paths(paths)
292    ruff_args = ["--config", str(DEFAULT_RUFF_CONFIG)]
293    config = get_code_config()
294
295    for e in config.get("exclude", []):
296        ruff_args.extend(["--exclude", e])
297
298    if unsafe_fixes and add_noqa:
299        raise click.UsageError("Cannot use both --unsafe-fixes and --add-noqa")
300
301    if python_paths:
302        if unsafe_fixes:
303            print_event("ruff check --fix --unsafe-fixes...", newline=False)
304            result = subprocess.run(
305                ["ruff", "check", *python_paths, "--fix", "--unsafe-fixes", *ruff_args]
306            )
307        elif add_noqa:
308            print_event("ruff check --add-noqa...", newline=False)
309            result = subprocess.run(
310                ["ruff", "check", *python_paths, "--add-noqa", *ruff_args]
311            )
312        else:
313            print_event("ruff check --fix...", newline=False)
314            result = subprocess.run(
315                ["ruff", "check", *python_paths, "--fix", *ruff_args]
316            )
317
318        if result.returncode != 0:
319            sys.exit(result.returncode)
320
321        print_event("ruff format...", newline=False)
322        result = subprocess.run(["ruff", "format", *python_paths, *ruff_args])
323        if result.returncode != 0:
324            sys.exit(result.returncode)
325
326    if other_paths and config.get("oxc", {}).get("enabled", True):
327        oxlint = OxcTool("oxlint")
328        oxfmt = OxcTool("oxfmt")
329
330        if oxlint.needs_update():
331            ctx.invoke(install)
332
333        if unsafe_fixes:
334            print_event("oxlint --fix-dangerously...", newline=False)
335            result = oxlint.invoke(*other_paths, "--fix-dangerously")
336        else:
337            print_event("oxlint --fix...", newline=False)
338            result = oxlint.invoke(*other_paths, "--fix")
339
340        if result.returncode != 0:
341            sys.exit(result.returncode)
342
343        print_event("oxfmt...", newline=False)
344        result = oxfmt.invoke(*other_paths)
345
346        if result.returncode != 0:
347            sys.exit(result.returncode)
348
349
350def get_code_config() -> dict[str, Any]:
351    pyproject = Path("pyproject.toml")
352    if not pyproject.exists():
353        return {}
354    with pyproject.open("rb") as f:
355        return tomllib.load(f).get("tool", {}).get("plain", {}).get("code", {})