from __future__ import annotations
from concurrent.futures import ThreadPoolExecutor
from dataclasses import replace
from functools import partial
from pathlib import Path
from .ids import stable_body_hash
from .model import ClassRef, DefRef, FilePack, ImportRef, PackResult
from .ordering import sort_paths
from .parse import parse_symbols
from .stubber import stub_file_text
from .symbol_backend import extract_non_python_symbols
def _extract_canonical_source(text: str, d: DefRef) -> str:
    """Return the source lines of definition ``d`` sliced out of ``text``.

    The slice runs from ``d.decorator_start`` through ``d.end_line``
    (1-based, inclusive), clamped to the bounds of ``text``. Trailing
    whitespace is stripped and exactly one ``"\\n"`` is appended.

    Args:
        text: Full text of the file that contains the definition.
        d: Definition reference; only ``decorator_start`` and ``end_line``
            are read.

    Returns:
        The selected lines joined together, ending in a single newline.
    """
    # Local import keeps this block self-contained.
    import io

    # ``str.splitlines`` also breaks on form feed, vertical tab, U+2028 and
    # similar characters, so a file containing one of those would shift every
    # later line index. Split only on "\n", "\r" and "\r\n" instead.
    # NOTE(review): assumes the line numbers on ``d`` are counted the way
    # Python's parser counts them - confirm against parse_symbols.
    lines = io.StringIO(text, newline="").readlines()
    first = max(0, d.decorator_start - 1)
    last = min(len(lines), d.end_line)
    return "".join(lines[first:last]).rstrip() + "\n"
def _line_count(text: str) -> int:
    """Return the number of newline characters in ``text`` plus one.

    Empty text yields 0.
    """
    if not text:
        return 0
    newline_total = text.count("\n")
    return newline_total + 1
def _resolve_worker_count(max_workers: int, item_count: int) -> int:
    """Choose how many workers to use for ``item_count`` items.

    Zero or one item always gets a single worker. Otherwise a positive
    ``max_workers`` is returned unchanged, and any other value selects one
    worker per item, capped at 32.
    """
    if item_count <= 1:
        return 1
    automatic = min(32, item_count)
    return max_workers if max_workers > 0 else automatic
def _pack_one_file(
    *,
    path: Path,
    root: Path,
    keep_docstrings: bool,
    symbol_backend: str,
    file_texts: dict[Path, str] | None,
    encoding_errors: str,
) -> tuple[
    Path,
    str,
    str,
    str,
    list[ClassRef],
    list[DefRef],
    dict[str, str],
    str,
    str,
    str,
    str,
    list[ImportRef],
    list[str],
    tuple[int, int] | None,
]:
    """Load one file and collect its symbols, stub text and canonical sources.

    The text is taken from ``file_texts`` when that mapping has an entry for
    ``path``; otherwise the file is read from disk as UTF-8 with
    ``encoding_errors`` as the decode error handler.

    Files whose suffix is ``.py`` (case-insensitive) go through
    ``parse_symbols``; if that raises ``SyntaxError`` the file is returned
    unstubbed with empty symbol lists and status ``"syntax-error"``. All
    other files go through ``extract_non_python_symbols`` using
    ``symbol_backend`` and are never stubbed.

    Returns:
        A 14-tuple, in order: path, original text, module name, stubbed
        text, classes, defs, ``local_id -> canonical source`` map, detected
        language, requested backend, used backend, extraction status,
        imports, exports, module docstring value.

    Raises:
        ValueError: If reading the file raises ``UnicodeDecodeError``.
    """
    preloaded = file_texts is not None and path in file_texts
    if preloaded:
        text = file_texts[path]
    else:
        try:
            text = path.read_text(encoding="utf-8", errors=encoding_errors)
        except UnicodeDecodeError as exc:
            raise ValueError(
                f"Failed to decode UTF-8 for {path.relative_to(root).as_posix()} "
                f"(encoding_errors={encoding_errors})"
            ) from exc

    canon_by_local_id: dict[str, str] = {}

    if path.suffix.lower() != ".py":
        # Non-Python file: symbols come from the pluggable backend, no stubbing.
        classes = []
        sym = extract_non_python_symbols(
            path=path,
            root=root,
            text=text,
            backend=symbol_backend,
        )
        defs = sym.defs
        imports = sym.imports
        exports = sym.exports
        module_docstring = sym.module_docstring
        file_module = defs[0].module if defs else ""
        stubbed = text
        language_detected = sym.language_detected
        backend_requested = sym.backend_requested
        backend_used = sym.backend_used
        extraction_status = sym.extraction_status
    else:
        # Python file: these three values are the same whether parsing works or not.
        language_detected = "python"
        backend_requested = "python-ast"
        backend_used = "python-ast"
        try:
            parsed = parse_symbols(path=path, root=root, text=text)
        except SyntaxError:
            # Unparsable source is passed through untouched with no symbols.
            classes, defs, imports, exports = [], [], [], []
            module_docstring = None
            file_module = ""
            stubbed = text
            extraction_status = "syntax-error"
        else:
            classes = parsed.classes
            defs = parsed.defs
            imports = parsed.imports
            exports = parsed.exports
            module_docstring = parsed.module_docstring
            file_module = parsed.module
            canon_by_local_id.update(
                (d.local_id, _extract_canonical_source(text, d)) for d in defs
            )
            stubbed = stub_file_text(text, defs, keep_docstrings=keep_docstrings)
            extraction_status = "ok"

    return (
        path,
        text,
        file_module,
        stubbed,
        classes,
        defs,
        canon_by_local_id,
        language_detected,
        backend_requested,
        backend_used,
        extraction_status,
        imports,
        exports,
        module_docstring,
    )
# Public entry point of this module.
def pack_repo(
    root: Path,
    files: list[Path],
    keep_docstrings: bool = True,
    dedupe: bool = False,
    symbol_backend: str = "auto",
    *,
    file_texts: dict[Path, str] | None = None,
    max_workers: int = 0,
    encoding_errors: str = "replace",
) -> tuple[PackResult, dict[str, str]]:
    """Pack ``files`` into a ``PackResult`` plus a map of canonical sources.

    Each file is processed by ``_pack_one_file`` (in a thread pool when more
    than one worker is resolved) and the per-file results are merged in the
    order produced by ``sort_paths``.

    Args:
        root: Root directory, forwarded to the per-file worker.
        files: Paths to pack; they are sorted with ``sort_paths`` first.
        keep_docstrings: Forwarded to ``stub_file_text``.
        dedupe: When true, definitions whose canonical source has the same
            ``stable_body_hash`` share the id of the first one seen, and
            Python files are re-stubbed against the remapped definitions.
        symbol_backend: Backend name forwarded for non-Python files.
        file_texts: Optional preloaded text per path; paths missing from it
            are read from disk.
        max_workers: Positive values set the pool size; otherwise the size
            is derived from the number of files.
        encoding_errors: Decode error handler used when reading from disk.

    Returns:
        ``(pack, canonical_sources)``. ``canonical_sources`` maps definition
        ids to source text: every ``local_id`` that has a canonical source
        when ``dedupe`` is false, or only the first id per distinct body
        hash when ``dedupe`` is true.
    """
    files = sort_paths(files)

    worker = partial(
        _pack_one_file,
        root=root,
        keep_docstrings=keep_docstrings,
        symbol_backend=symbol_backend,
        file_texts=file_texts,
        encoding_errors=encoding_errors,
    )
    worker_count = _resolve_worker_count(max_workers, len(files))
    if worker_count == 1:
        packed_data = [worker(path=path) for path in files]
    else:
        with ThreadPoolExecutor(max_workers=worker_count) as pool:
            # Executor.map yields results in input order, so the sorted file
            # order survives the parallel run.
            packed_data = list(pool.map(lambda p: worker(path=p), files))

    filepacks: list[FilePack] = []
    all_defs: list[DefRef] = []
    all_classes: list[ClassRef] = []
    local_canon: dict[str, str] = {}
    for (
        path,
        text,
        file_module,
        stubbed,
        classes,
        defs,
        canon_for_file,
        language_detected,
        symbol_backend_requested,
        symbol_backend_used,
        symbol_extraction_status,
        imports,
        exports,
        module_docstring,
    ) in packed_data:
        local_canon.update(canon_for_file)
        filepacks.append(
            FilePack(
                path=path,
                module=file_module,
                original_text=text,
                stubbed_text=stubbed,
                line_count=_line_count(text),
                classes=classes,
                defs=defs,
                imports=imports,
                exports=exports,
                module_docstring=module_docstring,
                language_detected=language_detected,
                symbol_backend_requested=symbol_backend_requested,
                symbol_backend_used=symbol_backend_used,
                symbol_extraction_status=symbol_extraction_status,
            )
        )
        all_defs.extend(defs)
        all_classes.extend(classes)

    canonical_sources: dict[str, str] = {}
    if not dedupe:
        canonical_sources = {
            d.local_id: local_canon[d.local_id]
            for d in all_defs
            if d.local_id in local_canon
        }
    else:
        # First definition seen for a given body hash becomes the canonical
        # id; later definitions with the same hash are pointed at it.
        seen_by_hash: dict[str, str] = {}
        remapped_defs: list[DefRef] = []
        for d in all_defs:
            code = local_canon.get(d.local_id)
            if code is None:
                # No canonical source recorded: keep the def unchanged.
                remapped_defs.append(d)
                continue
            h = stable_body_hash(code)
            cid = seen_by_hash.get(h)
            if cid is None:
                cid = d.local_id
                seen_by_hash[h] = cid
                canonical_sources[cid] = code
            remapped_defs.append(replace(d, id=cid))
        all_defs = remapped_defs

        # NOTE(review): grouping assumes DefRef.path and FilePack.path use
        # the same path form - confirm against the parser.
        defs_by_file: dict[Path, list[DefRef]] = {}
        for d in all_defs:
            defs_by_file.setdefault(d.path, []).append(d)

        filepacks2: list[FilePack] = []
        for fp in filepacks:
            defs2 = defs_by_file.get(fp.path, [])
            # Only re-stub Python files that parsed. Files marked
            # "syntax-error" were left unstubbed by the first pass and must
            # stay that way here instead of being fed to the stubber.
            parsed_python = (
                fp.path.suffix.lower() == ".py"
                and fp.symbol_extraction_status == "ok"
            )
            if parsed_python:
                stubbed2 = stub_file_text(
                    fp.original_text,
                    defs2,
                    keep_docstrings=keep_docstrings,
                )
            else:
                stubbed2 = fp.original_text
            filepacks2.append(
                FilePack(
                    path=fp.path,
                    module=fp.module,
                    original_text=fp.original_text,
                    stubbed_text=stubbed2,
                    line_count=fp.line_count,
                    classes=fp.classes,
                    defs=defs2,
                    imports=fp.imports,
                    exports=fp.exports,
                    module_docstring=fp.module_docstring,
                    language_detected=fp.language_detected,
                    symbol_backend_requested=fp.symbol_backend_requested,
                    symbol_backend_used=fp.symbol_backend_used,
                    symbol_extraction_status=fp.symbol_extraction_status,
                )
            )
        filepacks = filepacks2

    pack = PackResult(root=root, files=filepacks, classes=all_classes, defs=all_defs)
    return pack, canonical_sources