Source code for codecrate.unpacker

from __future__ import annotations

import hashlib
import re
import warnings
from dataclasses import dataclass
from pathlib import Path
from typing import Literal

from .formats import PACK_FORMAT_VERSION
from .ids import MARKER_NAMESPACE
from .manifest import manifest_sha256
from .mdparse import parse_packed_markdown
from .repositories import split_repository_sections
from .udiff import ensure_parent_dir, normalize_newlines

_MARK_RE = re.compile(
    rf"{MARKER_NAMESPACE}:(?:v\d+:)?(?P<id>[0-9A-Fa-f]{{8}})",
)


[docs] @dataclass(frozen=True) class UnpackIssue: severity: Literal["warning", "error"] path: str | None message: str
def _record_warning(issues: list[UnpackIssue], path: str | None, message: str) -> None: issues.append(UnpackIssue(severity="warning", path=path, message=message)) warnings.warn(message, RuntimeWarning, stacklevel=3) def _raise_if_warning_failure(issues: list[UnpackIssue]) -> None: warnings_found = [issue for issue in issues if issue.severity == "warning"] if not warnings_found: return first = warnings_found[0] suffix = f": {first.message}" if first.message else "" raise ValueError(f"Unpack warnings encountered{suffix}") def _verify_machine_header(machine_header: dict | None, manifest: dict) -> None: if machine_header is None: raise ValueError("Machine header check requested but no machine header found.") got_manifest_sha = str(machine_header.get("manifest_sha256") or "") if not got_manifest_sha: raise ValueError( "Machine header check requested but manifest_sha256 is missing." ) exp_manifest_sha = manifest_sha256(manifest) if got_manifest_sha != exp_manifest_sha: raise ValueError( "Machine header checksum mismatch: " f"expected {exp_manifest_sha}, got {got_manifest_sha}" ) def _ws_len(s: str) -> int: return len(s) - len(s.lstrip(" \t")) def _apply_canonical_into_stub( stub: str, defs: list[dict], canonical: dict[str, str], *, strict: bool = False, issues: list[str] | None = None, ) -> str: """ Reconstruct original by locating FUNC:<id> markers in the stub and replacing the surrounding def region (decorators + def + stubbed body/docstring) with the canonical code. Does not rely on line-number alignment. Marker semantics: - New packs use local_id in stub markers (unique per occurrence). - Canonical code is still fetched by id (deduped across identical bodies). - For backwards compatibility, we also accept markers keyed by id. """ lines = stub.splitlines(keepends=True) # Allow multiple occurrences of the same marker id (older dedupe packs). marker_lines_for: dict[str, list[int]] = {} def _record_issue(message: str) -> None: if issues is not None: issues.append(message) if strict: raise ValueError(message) for i, ln in enumerate(lines): m = _MARK_RE.search(ln) if m: marker_lines_for.setdefault(m.group("id").upper(), []).append(i) # Apply bottom-up so indices remain stable. work: list[tuple[int, dict, str]] = [] for d in defs: cid = d.get("id") or d.get("local_id") if not cid: _record_issue("definition missing both id and local_id") continue # Prefer locating the marker by local_id (unique), but fall back to cid for # older packs. marker_key = d.get("local_id") or cid idxs = marker_lines_for.get(str(marker_key).upper()) if not idxs and str(cid).upper() != str(marker_key).upper(): idxs = marker_lines_for.get(str(cid).upper()) if not idxs: _record_issue( "missing marker for " f"{d.get('qualname') or '<unknown>'} " f"(local_id={d.get('local_id') or '∅'}, id={cid})" ) continue mi = idxs.pop() # consume the bottom-most occurrence work.append((mi, d, str(cid))) work.sort(key=lambda t: t[0], reverse=True) for mi, d, cid in work: # Fetch canonical by cid first, then fall back to local_id. code = canonical.get(cid) if code is None: alt = d.get("local_id") if alt: code = canonical.get(str(alt)) if code is None: _record_issue( "missing canonical source for " f"{d.get('qualname') or '<unknown>'} " f"(id={cid}, local_id={d.get('local_id') or '∅'})" ) continue # Find def line above (supports single-line defs where marker is on def line). def_i = mi while def_i >= 0: s = lines[def_i].lstrip(" \t") if s.startswith("def ") or s.startswith("async def "): break def_i -= 1 if def_i < 0: _record_issue( "unable to locate def line above marker for " f"{d.get('qualname') or '<unknown>'}" ) continue def_indent = _ws_len(lines[def_i]) # Include decorators directly above the def. start_i = def_i j = def_i - 1 while j >= 0: if _ws_len(lines[j]) == def_indent and lines[j].lstrip(" \t").startswith( "@" ): start_i = j j -= 1 continue break # Replace through the marker line (or just the def line for single-line defs). end_i = (def_i + 1) if mi == def_i else (mi + 1) repl = code.splitlines(keepends=True) if repl and not repl[-1].endswith("\n"): repl[-1] = repl[-1] + "\n" lines[start_i:end_i] = repl return "".join(lines) def _unpack_single_markdown( markdown_text: str, out_dir: Path, *, strict: bool, check_machine_header: bool, issues: list[UnpackIssue], ) -> None: packed = parse_packed_markdown(markdown_text) manifest = packed.manifest if manifest.get("format") != PACK_FORMAT_VERSION: raise ValueError(f"Unsupported format: {manifest.get('format')}") if check_machine_header: _verify_machine_header(packed.machine_header, manifest) out_dir = out_dir.resolve() missing: list[str] = [] for f in manifest.get("files", []): rel = f["path"] stub = packed.stubbed_files.get(rel) exp = f.get("line_count") exp_n = int(exp) if exp is not None else None has_defs = bool(f.get("defs")) if stub is None or (has_defs and exp_n and exp_n > 0 and not stub.strip()): missing.append(rel) continue defs = f.get("defs", []) marker_issues: list[str] = [] reconstructed = _apply_canonical_into_stub( stub, defs, packed.canonical_sources, strict=strict, issues=marker_issues, ) reconstructed = normalize_newlines(reconstructed) if marker_issues: msg = ( f"Unresolved marker mapping for {rel}: " + "; ".join(marker_issues[:5]) + ("; ..." if len(marker_issues) > 5 else "") ) _record_warning(issues, rel, msg) exp_sha = f.get("sha256_original") if exp_sha: got_sha = hashlib.sha256(reconstructed.encode("utf-8")).hexdigest() if got_sha != exp_sha: _record_warning( issues, rel, f"SHA256 mismatch for {rel}: expected {exp_sha}, got {got_sha}", ) # Prevent path traversal / writing outside out_dir target = (out_dir / rel).resolve() if out_dir != target and out_dir not in target.parents: raise ValueError(f"Refusing to write outside out_dir: {rel}") ensure_parent_dir(target) target.write_text(reconstructed, encoding="utf-8", newline="\n") if missing: files_str = ", ".join(missing[:10]) if len(missing) > 10: files_str += "..." msg = f"Missing stubbed file blocks for {len(missing)} file(s): {files_str}" _record_warning(issues, None, msg)
[docs] def unpack_to_dir( markdown_text: str, out_dir: Path, *, strict: bool = False, fail_on_warning: bool = False, check_machine_header: bool = False, ) -> list[UnpackIssue]: issues: list[UnpackIssue] = [] sections = split_repository_sections(markdown_text) if not sections: _unpack_single_markdown( markdown_text, out_dir, strict=strict, check_machine_header=check_machine_header, issues=issues, ) if fail_on_warning: _raise_if_warning_failure(issues) return issues out_root = out_dir.resolve() for section in sections: _unpack_single_markdown( section.content, out_root / section.slug, strict=strict, check_machine_header=check_machine_header, issues=issues, ) if fail_on_warning: _raise_if_warning_failure(issues) return issues