layered-soul/scripts/layered_soul.py

243 lines
7.7 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""Layered Soul helper.
Stdlib-only utilities for validating a Layered Soul repository, rendering a
prompt bundle for task harnesses, and planning safe exports from private runtime
backups such as hermes-soul.
"""
from __future__ import annotations
import argparse
import fnmatch
import json
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any
FORBIDDEN_PATH_PATTERNS = [
".env",
"*.key",
"*.pem",
"*.token",
"config.yaml",
"channel_directory.json",
"sessions/*",
"cron/*",
"**/__pycache__/*",
"**/*.pyc",
"**/*.lock",
]
CORE_FILE_KEYS = ["soul", "user", "identity", "harness_rules"]
@dataclass
class Finding:
level: str
message: str
def load_json(path: Path) -> Any:
try:
return json.loads(path.read_text(encoding="utf-8"))
except FileNotFoundError:
raise SystemExit(f"missing JSON file: {path}")
except json.JSONDecodeError as exc:
raise SystemExit(f"invalid JSON in {path}: {exc}")
def rel_paths(root: Path) -> list[str]:
paths: list[str] = []
for path in root.rglob("*"):
if ".git" in path.parts:
continue
if path.is_file():
paths.append(path.relative_to(root).as_posix())
return sorted(paths)
def matches_any(path: str, patterns: list[str]) -> bool:
return any(fnmatch.fnmatch(path, pattern) for pattern in patterns)
def validate(root: Path) -> list[Finding]:
findings: list[Finding] = []
manifest_path = root / "manifest.json"
if not manifest_path.exists():
return [Finding("error", "manifest.json is required")]
manifest = load_json(manifest_path)
if manifest.get("schema") != "clawdie.layered-soul.v1":
findings.append(
Finding(
"error",
"manifest.schema must be clawdie.layered-soul.v1",
)
)
core_files = manifest.get("core_files", {})
for key in CORE_FILE_KEYS:
value = core_files.get(key)
if not value:
findings.append(Finding("error", f"manifest.core_files.{key} is required"))
continue
path = root / value
if not path.exists():
findings.append(Finding("error", f"core file missing: {value}"))
elif path.stat().st_size == 0:
findings.append(Finding("warn", f"core file is empty: {value}"))
privacy = manifest.get("privacy", {})
if privacy.get("secrets") != "excluded":
findings.append(Finding("warn", "privacy.secrets should be 'excluded'"))
if privacy.get("operator_review_required_before_cross_harness_import") is not True:
findings.append(
Finding(
"warn",
"operator_review_required_before_cross_harness_import should be true",
)
)
for path in rel_paths(root):
if matches_any(path, FORBIDDEN_PATH_PATTERNS):
findings.append(Finding("error", f"forbidden runtime/private path: {path}"))
return findings
def render_prompt(root: Path, include_curated: bool) -> str:
manifest = load_json(root / "manifest.json")
core_files = manifest.get("core_files", {})
sections: list[str] = []
for key in CORE_FILE_KEYS:
rel = core_files.get(key)
if not rel:
continue
path = root / rel
if not path.exists():
continue
content = path.read_text(encoding="utf-8").strip()
if content:
sections.append(f"<!-- {rel} -->\n\n{content}")
if include_curated:
for path in sorted((root / "memories" / "curated").glob("*.md")):
content = path.read_text(encoding="utf-8").strip()
if content:
rel = path.relative_to(root).as_posix()
sections.append(f"<!-- {rel} -->\n\n{content}")
return "\n\n---\n\n".join(sections).strip() + "\n"
def plan_private_source(config_path: Path, source_root: Path | None) -> dict[str, Any]:
config = load_json(config_path)
source = source_root or Path(config.get("local_path", ""))
if not str(source):
raise SystemExit("provide --source-root or local_path in the source config")
source = source.expanduser().resolve()
def count(pattern: str) -> int:
return len([p for p in source.glob(pattern) if p.is_file()])
exists = source.exists()
report: dict[str, Any] = {
"config": str(config_path),
"name": config.get("name"),
"kind": config.get("kind"),
"repo": config.get("repo"),
"visibility": config.get("visibility"),
"source_root": str(source),
"source_exists": exists,
"policy": config.get("export_policy", {}),
"recommended_flow": [
"inspect candidates locally",
"summarize or redact private memory",
"copy only reviewed output into layered-soul",
"run: python3 scripts/layered_soul.py validate .",
],
}
if exists:
report["candidate_counts"] = {
"memory_markdown": count("memories/*.md"),
"skill_markdown": count("skills/**/*.md"),
"session_archives_excluded": count("sessions/*"),
"cron_archives_excluded": count("cron/*"),
}
report["candidate_paths"] = {
"user_memory": "memories/USER.md" if (source / "memories" / "USER.md").exists() else None,
"general_memory": "memories/MEMORY.md" if (source / "memories" / "MEMORY.md").exists() else None,
"skills_root": "skills/" if (source / "skills").exists() else None,
}
return report
def print_findings(findings: list[Finding]) -> int:
errors = 0
for finding in findings:
print(f"{finding.level.upper()}: {finding.message}")
if finding.level == "error":
errors += 1
if not findings:
print("OK: layered soul repository looks valid")
elif errors == 0:
print("OK: no validation errors")
return 1 if errors else 0
def cmd_validate(args: argparse.Namespace) -> int:
return print_findings(validate(Path(args.root).resolve()))
def cmd_render_prompt(args: argparse.Namespace) -> int:
root = Path(args.root).resolve()
rendered = render_prompt(root, include_curated=args.include_curated)
if args.output:
Path(args.output).write_text(rendered, encoding="utf-8")
else:
print(rendered, end="")
return 0
def cmd_plan_private_source(args: argparse.Namespace) -> int:
report = plan_private_source(
Path(args.config).resolve(),
Path(args.source_root).expanduser() if args.source_root else None,
)
print(json.dumps(report, indent=2, sort_keys=True))
return 0
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Layered Soul helper")
sub = parser.add_subparsers(dest="command", required=True)
validate_cmd = sub.add_parser("validate", help="validate a Layered Soul repository")
validate_cmd.add_argument("root", nargs="?", default=".")
validate_cmd.set_defaults(func=cmd_validate)
render_cmd = sub.add_parser("render-prompt", help="render core files as a prompt bundle")
render_cmd.add_argument("root", nargs="?", default=".")
render_cmd.add_argument("--include-curated", action="store_true")
render_cmd.add_argument("--output")
render_cmd.set_defaults(func=cmd_render_prompt)
plan_cmd = sub.add_parser("plan-private-source", help="inspect a private source export plan")
plan_cmd.add_argument("config")
plan_cmd.add_argument("--source-root")
plan_cmd.set_defaults(func=cmd_plan_private_source)
return parser
def main(argv: list[str]) -> int:
parser = build_parser()
args = parser.parse_args(argv)
return args.func(args)
if __name__ == "__main__":
raise SystemExit(main(sys.argv[1:]))