from __future__ import annotations
import hashlib
import re
from collections import Counter
from dataclasses import dataclass
from pathlib import Path
from .fences import is_fence_close, parse_fence_open
from .formats import FENCE_MACHINE_HEADER, FENCE_MANIFEST, PACK_FORMAT_VERSION
from .ids import ID_FORMAT_VERSION, MARKER_FORMAT_VERSION, MARKER_NAMESPACE
from .manifest import manifest_sha256
from .mdparse import parse_packed_markdown
from .repositories import split_repository_sections
from .udiff import normalize_newlines
from .unpacker import _apply_canonical_into_stub
_MARK_RE = re.compile(rf"{MARKER_NAMESPACE}:(?:v\d+:)?(?P<id>[0-9A-Fa-f]{{8}})")
_ANCHOR_RE = re.compile(r'^\s*<a id="([^"]+)"></a>\s*$')
def _sha256_text(text: str) -> str:
return hashlib.sha256(text.encode("utf-8")).hexdigest()
def _validate_machine_header(
*,
machine_header: dict | None,
manifest: dict,
) -> list[str]:
if machine_header is None:
return []
errors: list[str] = []
got_format = str(machine_header.get("format") or "")
exp_format = str(manifest.get("format") or "")
if got_format and exp_format and got_format != exp_format:
errors.append(
f"Machine header format mismatch: expected {exp_format}, got {got_format}"
)
got_manifest_sha = str(machine_header.get("manifest_sha256") or "")
exp_manifest_sha = manifest_sha256(manifest)
if not got_manifest_sha:
errors.append("Machine header missing manifest_sha256")
elif got_manifest_sha != exp_manifest_sha:
errors.append(
"Machine header checksum mismatch: "
f"expected {exp_manifest_sha}, got {got_manifest_sha}"
)
return errors
[docs]
@dataclass(frozen=True)
class ValidationReport:
errors: list[str]
warnings: list[str]
root_drift_paths: list[str]
redacted_count: int
safety_skip_count: int
@dataclass(frozen=True)
class _FileValidationResult:
errors: list[str]
warnings: list[str]
marker_ids: list[str]
active_marker_ids: list[str]
root_drift_paths: list[str]
def _validate_manifest_structure(markdown_text: str, manifest: dict) -> list[str]:
errors: list[str] = []
file_block_paths = _scan_file_block_paths(markdown_text)
manifest_paths = [
str(f.get("path") or "") for f in manifest.get("files") or [] if f.get("path")
]
file_block_counts = Counter(file_block_paths)
for rel in sorted(path for path, count in file_block_counts.items() if count > 1):
errors.append(f"Duplicate file block for {rel}")
file_block_set = set(file_block_paths)
manifest_path_set = set(manifest_paths)
for rel in sorted(manifest_path_set - file_block_set):
errors.append(f"Manifest file missing from file blocks: {rel}")
for rel in sorted(file_block_set - manifest_path_set):
errors.append(f"File block not present in manifest: {rel}")
referenced_ids = {
str(d.get("id") or "").upper()
for f in manifest.get("files") or []
for d in f.get("defs") or []
if d.get("id")
}
function_library_ids = {
i.upper() for i in _scan_function_library_ids(markdown_text)
}
for orphan in sorted(function_library_ids - referenced_ids):
errors.append(f"Orphan function-library entry: id={orphan}")
return errors
def _is_sha256_hex(value: object) -> bool:
if not isinstance(value, str) or len(value) != 64:
return False
return all(ch in "0123456789abcdef" for ch in value.lower())
def _validate_manifest_schema(manifest: dict) -> list[str]:
errors: list[str] = []
fmt = manifest.get("format")
if fmt != PACK_FORMAT_VERSION:
errors.append(f"Unsupported manifest format: {fmt!r}")
id_fmt = manifest.get("id_format_version")
if id_fmt not in {None, ID_FORMAT_VERSION}:
errors.append(
f"Unsupported id_format_version: {id_fmt!r} (expected {ID_FORMAT_VERSION})"
)
marker_fmt = manifest.get("marker_format_version")
if marker_fmt not in {None, MARKER_FORMAT_VERSION}:
errors.append(
"Unsupported marker_format_version: "
f"{marker_fmt!r} (expected {MARKER_FORMAT_VERSION})"
)
files = manifest.get("files")
if not isinstance(files, list):
errors.append("Manifest 'files' must be a list")
return errors
for i, f in enumerate(files):
if not isinstance(f, dict):
errors.append(f"Manifest file[{i}] must be an object")
continue
rel = f.get("path")
if not isinstance(rel, str) or not rel.strip():
errors.append(f"Manifest file[{i}] has invalid 'path'")
if "line_count" in f and not isinstance(f.get("line_count"), int):
errors.append(f"Manifest file[{i}] has invalid 'line_count'")
if not _is_sha256_hex(f.get("sha256_original")):
errors.append(f"Manifest file[{i}] has invalid 'sha256_original'")
if "sha256_stubbed" in f and not _is_sha256_hex(f.get("sha256_stubbed")):
errors.append(f"Manifest file[{i}] has invalid 'sha256_stubbed'")
defs = f.get("defs")
if defs is None:
continue
if "sha256_stubbed" not in f:
errors.append(
f"Manifest file[{i}] missing 'sha256_stubbed' for stub layout"
)
if not isinstance(defs, list):
errors.append(f"Manifest file[{i}] has invalid 'defs' (must be list)")
continue
for j, d in enumerate(defs):
if not isinstance(d, dict):
errors.append(f"Manifest file[{i}] def[{j}] must be an object")
continue
if not isinstance(d.get("id"), str) or not d.get("id"):
errors.append(f"Manifest file[{i}] def[{j}] has invalid 'id'")
if not isinstance(d.get("local_id"), str) or not d.get("local_id"):
errors.append(f"Manifest file[{i}] def[{j}] has invalid 'local_id'")
if not isinstance(d.get("qualname"), str) or not d.get("qualname"):
errors.append(f"Manifest file[{i}] def[{j}] has invalid 'qualname'")
return errors
def _validate_file_entry(
*,
file_entry: dict,
packed: object,
strict: bool,
root_resolved: Path | None,
encoding_errors: str,
) -> _FileValidationResult:
errors: list[str] = []
warnings: list[str] = []
root_drift_paths: list[str] = []
rel = file_entry.get("path")
if not rel:
return _FileValidationResult(
errors=["Manifest entry missing 'path'"],
warnings=[],
marker_ids=[],
active_marker_ids=[],
root_drift_paths=[],
)
stub = getattr(packed, "stubbed_files", {}).get(rel)
if stub is None:
return _FileValidationResult(
errors=[f"Missing stubbed file block for {rel}"],
warnings=[],
marker_ids=[],
active_marker_ids=[],
root_drift_paths=[],
)
stub_norm = normalize_newlines(stub)
exp_stub = file_entry.get("sha256_stubbed")
got_stub = _sha256_text(stub_norm)
if exp_stub and got_stub != exp_stub:
errors.append(
f"Stub sha mismatch for {rel}: expected {exp_stub}, got {got_stub}"
)
marker_ids = [m.group("id").upper() for m in _MARK_RE.finditer(stub_norm)]
active_marker_ids: list[str] = []
if marker_ids:
c = Counter(marker_ids)
dup = [k for k, v in c.items() if v > 1]
if dup:
warnings.append(f"Marker collision in {rel}: {', '.join(sorted(dup))}")
defs = file_entry.get("defs") or []
canonical_sources = getattr(packed, "canonical_sources", {})
for d in defs:
cid = str(d.get("id") or "").upper()
lid = str(d.get("local_id") or "").upper()
if cid and cid not in canonical_sources:
errors.append(
f"Missing canonical source for {rel}:{d.get('qualname')} id={cid}"
)
if d.get("has_marker") is False:
continue
marker_key = lid or cid
if marker_key:
active_marker_ids.append(marker_key)
if (lid and lid not in marker_ids) and (cid and cid not in marker_ids):
msg = (
f"Missing FUNC marker in stub for {rel}:{d.get('qualname')} "
f"(local_id={lid or '∅'}, id={cid or '∅'})"
)
if strict:
errors.append(msg)
else:
warnings.append(msg)
try:
marker_issues: list[str] = []
reconstructed = _apply_canonical_into_stub(
stub_norm,
defs,
canonical_sources,
strict=False,
issues=marker_issues,
)
reconstructed = normalize_newlines(reconstructed)
except Exception as e: # pragma: no cover
errors.append(f"Failed to reconstruct {rel}: {e}")
return _FileValidationResult(
errors=errors,
warnings=warnings,
marker_ids=marker_ids,
active_marker_ids=active_marker_ids,
root_drift_paths=root_drift_paths,
)
for issue in marker_issues:
msg = f"Unresolved marker mapping for {rel}: {issue}"
if strict:
errors.append(msg)
else:
warnings.append(msg)
exp_orig = file_entry.get("sha256_original")
got_orig = _sha256_text(reconstructed)
if exp_orig and got_orig != exp_orig:
errors.append(
f"Original sha mismatch for {rel}: expected {exp_orig}, got {got_orig}"
)
if root_resolved is not None:
disk_path = root_resolved / str(rel)
if not disk_path.exists():
warnings.append(f"On-disk file missing under root: {rel}")
root_drift_paths.append(str(rel))
else:
try:
disk_text = normalize_newlines(
disk_path.read_text(encoding="utf-8", errors=encoding_errors)
)
except UnicodeDecodeError as e:
errors.append(
f"Failed to decode on-disk file {rel} "
f"(encoding_errors={encoding_errors}): {e}"
)
return _FileValidationResult(
errors=errors,
warnings=warnings,
marker_ids=marker_ids,
active_marker_ids=active_marker_ids,
root_drift_paths=root_drift_paths,
)
if _sha256_text(disk_text) != got_orig:
warnings.append(f"On-disk file differs from pack for {rel}")
root_drift_paths.append(str(rel))
return _FileValidationResult(
errors=errors,
warnings=warnings,
marker_ids=marker_ids,
active_marker_ids=active_marker_ids,
root_drift_paths=root_drift_paths,
)
[docs]
def validate_pack_markdown(
markdown_text: str,
*,
root: Path | None = None,
strict: bool = False,
encoding_errors: str = "replace",
) -> ValidationReport:
sections = split_repository_sections(markdown_text)
if not sections:
return _validate_single_pack_markdown(markdown_text, root=root, strict=strict)
errors: list[str] = []
warnings: list[str] = []
root_drift_paths: list[str] = []
anchor_owner: dict[str, str] = {}
redacted_count = 0
safety_skip_count = 0
root_resolved = root.resolve() if root is not None else None
for section in sections:
scope = f"repo '{section.label}' ({section.slug})"
if not section.content.strip():
errors.append(f"{scope}: repository section is empty")
continue
manifest_count = _count_manifest_blocks(section.content)
if manifest_count != 1:
errors.append(
f"{scope}: expected exactly one {FENCE_MANIFEST} block, "
f"found {manifest_count}"
)
for anchor in _iter_anchor_ids(section.content):
owner = anchor_owner.get(anchor)
if owner is None:
anchor_owner[anchor] = scope
continue
if owner != scope:
errors.append(
f"Cross-repo anchor collision for '{anchor}': {owner} vs {scope}"
)
section_root = (
root_resolved / section.slug if root_resolved is not None else None
)
try:
report = _validate_single_pack_markdown(
section.content,
root=section_root,
strict=strict,
encoding_errors=encoding_errors,
)
except Exception as e:
errors.append(f"{scope}: failed to parse repository pack: {e}")
continue
errors.extend(f"{scope}: {err}" for err in report.errors)
warnings.extend(f"{scope}: {w}" for w in report.warnings)
root_drift_paths.extend(f"{scope}: {path}" for path in report.root_drift_paths)
redacted_count += report.redacted_count
safety_skip_count += report.safety_skip_count
return ValidationReport(
errors=errors,
warnings=warnings,
root_drift_paths=root_drift_paths,
redacted_count=redacted_count,
safety_skip_count=safety_skip_count,
)
def _validate_single_pack_markdown(
markdown_text: str,
*,
root: Path | None = None,
strict: bool = False,
encoding_errors: str = "replace",
) -> ValidationReport:
"""Validate a packed Codecrate Markdown for internal consistency.
Checks (pack-only):
- Every manifest file has a corresponding stubbed code block.
- sha256_stubbed matches the stubbed code block (normalized newlines).
- Every def in manifest has a canonical body in the function library.
- Reconstructing each file from stub+canonical reproduces sha256_original.
- Marker collisions / missing markers are reported as warnings.
Optional root:
- If provided, compares reconstructed 'original' text against files on disk.
"""
errors: list[str] = []
warnings: list[str] = []
root_drift_paths: list[str] = []
packed = parse_packed_markdown(markdown_text)
manifest = packed.manifest
root_resolved = root.resolve() if root is not None else None
redacted_count, safety_skip_count = _scan_safety_header_counts(markdown_text)
manifest_count = _count_manifest_blocks(markdown_text)
if manifest_count != 1:
errors.append(
f"expected exactly one {FENCE_MANIFEST} block, found {manifest_count}"
)
machine_header_count = _count_machine_header_blocks(markdown_text)
if machine_header_count != 1:
errors.append(
f"expected exactly one {FENCE_MACHINE_HEADER} block, "
f"found {machine_header_count}"
)
errors.extend(_validate_manifest_schema(manifest))
errors.extend(
_validate_machine_header(
machine_header=packed.machine_header,
manifest=manifest,
)
)
errors.extend(_validate_manifest_structure(markdown_text, manifest))
files = manifest.get("files") or []
marker_owners: dict[str, set[str]] = {}
for f in files:
result = _validate_file_entry(
file_entry=f,
packed=packed,
strict=strict,
root_resolved=root_resolved,
encoding_errors=encoding_errors,
)
errors.extend(result.errors)
warnings.extend(result.warnings)
root_drift_paths.extend(result.root_drift_paths)
rel = str(f.get("path") or "")
for marker_id in result.active_marker_ids:
marker_owners.setdefault(marker_id, set()).add(rel)
for marker_id in sorted(marker_owners):
owners = sorted(marker_owners[marker_id])
if len(owners) <= 1:
continue
warnings.append(
f"Repo-scope marker collision for {marker_id}: {', '.join(owners)}"
)
return ValidationReport(
errors=errors,
warnings=warnings,
root_drift_paths=sorted(set(root_drift_paths)),
redacted_count=redacted_count,
safety_skip_count=safety_skip_count,
)
def _count_manifest_blocks(markdown_text: str) -> int:
count = 0
fence: str | None = None
for line in markdown_text.splitlines():
if fence is None:
opened = parse_fence_open(line)
if opened is None:
continue
fence = opened[0]
if opened[1] == FENCE_MANIFEST:
count += 1
continue
if is_fence_close(line, fence):
fence = None
return count
def _count_machine_header_blocks(markdown_text: str) -> int:
count = 0
fence: str | None = None
for line in markdown_text.splitlines():
if fence is None:
opened = parse_fence_open(line)
if opened is None:
continue
fence = opened[0]
if opened[1] == FENCE_MACHINE_HEADER:
count += 1
continue
if is_fence_close(line, fence):
fence = None
return count
def _scan_safety_header_counts(markdown_text: str) -> tuple[int, int]:
redacted = 0
skipped = 0
redacted_match = re.search(
r"^Redacted for safety:\s+(\d+) file\(s\)$",
markdown_text,
flags=re.MULTILINE,
)
if redacted_match is not None:
redacted = int(redacted_match.group(1))
skipped_match = re.search(
r"^Skipped for safety:\s+(\d+) file\(s\)$",
markdown_text,
flags=re.MULTILINE,
)
if skipped_match is not None:
skipped = int(skipped_match.group(1))
return redacted, skipped
def _iter_anchor_ids(markdown_text: str) -> list[str]:
anchors: list[str] = []
fence: str | None = None
for line in markdown_text.splitlines():
if fence is None:
opened = parse_fence_open(line)
if opened is not None:
fence = opened[0]
continue
match = _ANCHOR_RE.match(line)
if match:
anchors.append(match.group(1))
continue
if is_fence_close(line, fence):
fence = None
return anchors
def _scan_section_lines(markdown_text: str, section_title: str) -> list[str]:
lines = markdown_text.splitlines()
fence: str | None = None
start: int | None = None
for i, line in enumerate(lines):
if fence is None:
opened = parse_fence_open(line)
if opened is not None:
fence = opened[0]
continue
if line.strip() == section_title:
start = i + 1
break
else:
if is_fence_close(line, fence):
fence = None
if start is None:
return []
fence = None
end = len(lines)
for j in range(start, len(lines)):
line = lines[j]
if fence is None:
opened = parse_fence_open(line)
if opened is not None:
fence = opened[0]
continue
if line.startswith("## ") and line.strip() != section_title:
end = j
break
else:
if is_fence_close(line, fence):
fence = None
return lines[start:end]
def _scan_file_block_paths(markdown_text: str) -> list[str]:
paths: list[str] = []
fence: str | None = None
for line in _scan_section_lines(markdown_text, "## Files"):
if fence is None:
opened = parse_fence_open(line)
if opened is not None:
fence = opened[0]
continue
else:
if is_fence_close(line, fence):
fence = None
continue
if not line.startswith("### `"):
continue
first_tick = line.find("`")
if first_tick < 0:
continue
second_tick = line.find("`", first_tick + 1)
if second_tick <= first_tick:
continue
rel = line[first_tick + 1 : second_tick].strip()
if rel:
paths.append(rel)
return paths
def _scan_function_library_ids(markdown_text: str) -> list[str]:
ids: list[str] = []
fence: str | None = None
for line in _scan_section_lines(markdown_text, "## Function Library"):
if fence is None:
opened = parse_fence_open(line)
if opened is not None:
fence = opened[0]
continue
else:
if is_fence_close(line, fence):
fence = None
continue
if not line.startswith("### "):
continue
title = line.replace("###", "", 1).strip()
maybe_id = title.split(" — ", 1)[0].strip()
if maybe_id:
ids.append(maybe_id)
return ids