Source code for codecrate.packer

from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor
from dataclasses import replace
from functools import partial
from pathlib import Path

from .ids import stable_body_hash
from .model import ClassRef, DefRef, FilePack, ImportRef, PackResult
from .ordering import sort_paths
from .parse import parse_symbols
from .stubber import stub_file_text
from .symbol_backend import extract_non_python_symbols


def _extract_canonical_source(text: str, d: DefRef) -> str:
    """Return the source slice of *text* that belongs to definition *d*.

    The slice runs from ``d.decorator_start`` through ``d.end_line``; both are
    treated as 1-based line numbers with the end inclusive. Out-of-range
    bounds are clamped to the text, trailing whitespace is stripped, and
    exactly one terminating newline is appended.
    """
    source_lines = text.splitlines(keepends=True)
    # Convert the 1-based start to a 0-based index, never going below zero.
    first = max(d.decorator_start - 1, 0)
    # A 1-based inclusive end equals a 0-based exclusive slice bound.
    last = min(d.end_line, len(source_lines))
    snippet = "".join(source_lines[first:last])
    return f"{snippet.rstrip()}\n"


def _line_count(text: str) -> int:
    """Return 0 for empty *text*, otherwise its newline count plus one."""
    if not text:
        return 0
    return 1 + text.count("\n")


def _resolve_worker_count(max_workers: int, item_count: int) -> int:
    """Choose how many workers to use for *item_count* items.

    Zero or one item always yields a single worker. Otherwise a positive
    *max_workers* is returned as given, and any non-positive value falls back
    to ``item_count`` capped at 32.
    """
    if item_count > 1:
        return max_workers if max_workers > 0 else min(item_count, 32)
    return 1


def _pack_one_file(
    *,
    path: Path,
    root: Path,
    keep_docstrings: bool,
    symbol_backend: str,
    file_texts: dict[Path, str] | None,
    encoding_errors: str,
) -> tuple[
    Path,
    str,
    str,
    str,
    list[ClassRef],
    list[DefRef],
    dict[str, str],
    str,
    str,
    str,
    str,
    list[ImportRef],
    list[str],
    tuple[int, int] | None,
]:
    """Load one file and collect everything needed to build its pack entry.

    The text comes from *file_texts* when it holds an entry for *path*;
    otherwise the file is read from disk as UTF-8 using *encoding_errors* as
    the decoder error policy.

    Files with a ``.py`` suffix (case-insensitive) go through
    ``parse_symbols`` and ``stub_file_text``. Every other file goes through
    ``extract_non_python_symbols`` with *symbol_backend* and keeps its text
    unstubbed.

    Returns:
        A 14-tuple ``(path, text, module, stubbed_text, classes, defs,
        canonical_sources_by_local_id, language_detected,
        symbol_backend_requested, symbol_backend_used,
        symbol_extraction_status, imports, exports, module_docstring)``.

    Raises:
        ValueError: If reading the file raises ``UnicodeDecodeError``.
    """
    if file_texts is not None and path in file_texts:
        text = file_texts[path]
    else:
        try:
            text = path.read_text(encoding="utf-8", errors=encoding_errors)
        except UnicodeDecodeError as e:
            raise ValueError(
                f"Failed to decode UTF-8 for {path.relative_to(root).as_posix()} "
                f"(encoding_errors={encoding_errors})"
            ) from e

    if path.suffix.lower() != ".py":
        # Non-Python file: symbols come from the configurable backend and the
        # text is passed through without stubbing or canonical sources.
        sym = extract_non_python_symbols(
            path=path,
            root=root,
            text=text,
            backend=symbol_backend,
        )
        other_defs = sym.defs
        return (
            path,
            text,
            other_defs[0].module if other_defs else "",
            text,
            [],
            other_defs,
            {},
            sym.language_detected,
            sym.backend_requested,
            sym.backend_used,
            sym.extraction_status,
            sym.imports,
            sym.exports,
            sym.module_docstring,
        )

    try:
        parsed = parse_symbols(path=path, root=root, text=text)
    except SyntaxError:
        # Unparseable Python: keep the raw text and report no symbols.
        return (
            path,
            text,
            "",
            text,
            [],
            [],
            {},
            "python",
            "python-ast",
            "python-ast",
            "syntax-error",
            [],
            [],
            None,
        )

    py_defs = parsed.defs
    # Canonical source per definition, keyed by its file-local id.
    canon_by_id: dict[str, str] = {
        ref.local_id: _extract_canonical_source(text, ref) for ref in py_defs
    }
    stubbed = stub_file_text(text, py_defs, keep_docstrings=keep_docstrings)
    return (
        path,
        text,
        parsed.module,
        stubbed,
        parsed.classes,
        py_defs,
        canon_by_id,
        "python",
        "python-ast",
        "python-ast",
        "ok",
        parsed.imports,
        parsed.exports,
        parsed.module_docstring,
    )


def pack_repo(
    root: Path,
    files: list[Path],
    keep_docstrings: bool = True,
    dedupe: bool = False,
    symbol_backend: str = "auto",
    *,
    file_texts: dict[Path, str] | None = None,
    max_workers: int = 0,
    encoding_errors: str = "replace",
) -> tuple[PackResult, dict[str, str]]:
    """Pack *files* under *root* into a ``PackResult`` plus canonical sources.

    Files are ordered with ``sort_paths`` and each one is processed by
    ``_pack_one_file``, serially when a single worker is resolved and through
    a ``ThreadPoolExecutor`` otherwise.

    Args:
        root: Repository root, forwarded to the per-file worker.
        files: Paths to pack.
        keep_docstrings: Forwarded to ``stub_file_text``.
        dedupe: When true, definitions whose canonical source has the same
            ``stable_body_hash`` share one id, namely the ``local_id`` of the
            first such definition encountered.
        symbol_backend: Backend name forwarded for non-Python files.
        file_texts: Optional pre-loaded texts keyed by path; paths found here
            are not read from disk.
        max_workers: Requested worker count; non-positive values let
            ``_resolve_worker_count`` pick one.
        encoding_errors: Decoder error policy used when reading files.

    Returns:
        ``(pack, canonical_sources)`` where ``canonical_sources`` maps a
        definition id to its canonical source text. Definitions for which no
        canonical source was extracted are absent from the mapping.
    """
    files = sort_paths(files)

    filepacks: list[FilePack] = []
    all_defs: list[DefRef] = []
    all_classes: list[ClassRef] = []
    local_canon: dict[str, str] = {}

    worker_count = _resolve_worker_count(max_workers, len(files))
    worker = partial(
        _pack_one_file,
        root=root,
        keep_docstrings=keep_docstrings,
        symbol_backend=symbol_backend,
        file_texts=file_texts,
        encoding_errors=encoding_errors,
    )

    if worker_count == 1:
        packed_data = [worker(path=path) for path in files]
    else:
        # Executor.map yields results in input order, so the sorted file
        # order is preserved regardless of which thread finishes first.
        with ThreadPoolExecutor(max_workers=worker_count) as pool:
            packed_data = list(pool.map(lambda p: worker(path=p), files))

    for (
        path,
        text,
        file_module,
        stubbed,
        classes,
        defs,
        canon_for_file,
        language_detected,
        symbol_backend_requested,
        symbol_backend_used,
        symbol_extraction_status,
        imports,
        exports,
        module_docstring,
    ) in packed_data:
        local_canon.update(canon_for_file)
        fp = FilePack(
            path=path,
            module=file_module,
            original_text=text,
            stubbed_text=stubbed,
            line_count=_line_count(text),
            classes=classes,
            defs=defs,
            imports=imports,
            exports=exports,
            module_docstring=module_docstring,
            language_detected=language_detected,
            symbol_backend_requested=symbol_backend_requested,
            symbol_backend_used=symbol_backend_used,
            symbol_extraction_status=symbol_extraction_status,
        )
        filepacks.append(fp)
        all_defs.extend(defs)
        all_classes.extend(classes)

    canonical_sources: dict[str, str] = {}
    if not dedupe:
        canonical_sources = {
            d.local_id: local_canon[d.local_id]
            for d in all_defs
            if d.local_id in local_canon
        }
    else:
        # Maps body hash -> id of the first definition seen with that body.
        seen_by_hash: dict[str, str] = {}
        remapped_defs: list[DefRef] = []

        for d in all_defs:
            code = local_canon.get(d.local_id)
            if code is None:
                # No canonical source was extracted; keep the def untouched.
                remapped_defs.append(d)
                continue
            h = stable_body_hash(code)
            cid = seen_by_hash.get(h)
            if cid is None:
                cid = d.local_id
                seen_by_hash[h] = cid
                canonical_sources[cid] = code
            remapped_defs.append(replace(d, id=cid))

        all_defs = remapped_defs

        defs_by_file: dict[Path, list[DefRef]] = {}
        for d in all_defs:
            defs_by_file.setdefault(d.path, []).append(d)

        # Rebuild every FilePack with the remapped defs and regenerate the
        # Python stubs from them.
        # NOTE(review): presumably needed because stub text depends on def
        # ids -- confirm against stub_file_text.
        filepacks2: list[FilePack] = []
        for fp in filepacks:
            defs2 = defs_by_file.get(fp.path, [])
            if fp.path.suffix.lower() == ".py":
                stubbed2 = stub_file_text(
                    fp.original_text,
                    defs2,
                    keep_docstrings=keep_docstrings,
                )
            else:
                stubbed2 = fp.original_text
            filepacks2.append(
                FilePack(
                    path=fp.path,
                    module=fp.module,
                    original_text=fp.original_text,
                    stubbed_text=stubbed2,
                    line_count=fp.line_count,
                    classes=fp.classes,
                    defs=defs2,
                    imports=fp.imports,
                    exports=fp.exports,
                    module_docstring=fp.module_docstring,
                    language_detected=fp.language_detected,
                    symbol_backend_requested=fp.symbol_backend_requested,
                    symbol_backend_used=fp.symbol_backend_used,
                    symbol_extraction_status=fp.symbol_extraction_status,
                )
            )
        filepacks = filepacks2

    pack = PackResult(root=root, files=filepacks, classes=all_classes, defs=all_defs)
    return pack, canonical_sources