layered-soul/scripts/verify_facts_probe.py

363 lines
12 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""Verify-facts probe — gather exact environment facts before acting.
Never assume what hardware, OS, timezone, storage, agents, or config you're
running on. This probe collects verified facts and emits them as structured
JSON. Use it at the start of any session where the environment matters.
Usage:
python3 verify_facts_probe.py # all facts
python3 verify_facts_probe.py --hardware # hardware only
python3 verify_facts_probe.py --agents # agent/harness only
python3 verify_facts_probe.py --json # JSON output (default)
python3 verify_facts_probe.py --text # human-readable output
"""
from __future__ import annotations
import argparse
import json
import os
import platform
import re
import shutil
import socket
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
def run(cmd: list[str], timeout: int = 10) -> str:
    """Execute *cmd* without a shell and hand back its textual output.

    Args:
        cmd: Argument vector passed straight to ``subprocess.run``.
        timeout: Seconds to wait before the call is abandoned.

    Returns:
        Stripped stdout; stripped stderr when stdout is empty; or
        ``"ERROR: <exception>"`` when running the command raised
        (missing binary, timeout, ...).
    """
    try:
        completed = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
            timeout=timeout,
        )
        captured_out = completed.stdout.strip()
        if captured_out:
            return captured_out
        # Nothing on stdout: fall back to whatever the tool said on stderr.
        return completed.stderr.strip()
    except Exception as exc:
        return f"ERROR: {exc}"
def probe_os() -> dict:
    """Report operating-system identity: platform fields, hostname, kernel.

    Returns:
        dict with "system", "release", "version", "machine", "hostname",
        "kernel" (output of ``uname -a``) and "freebsd_version" (None when
        the ``freebsd-version`` tool is not on PATH).
    """
    # platform.system()/release()/version()/machine() are views of uname().
    info = platform.uname()
    kernel_line = run(["uname", "-a"])

    freebsd_version = None
    if shutil.which("freebsd-version"):
        freebsd_version = run(["freebsd-version"])

    return {
        "system": info.system,
        "release": info.release,
        "version": info.version,
        "machine": info.machine,
        "hostname": socket.gethostname(),
        "kernel": kernel_line,
        "freebsd_version": freebsd_version,
    }
def probe_timezone_locale() -> dict:
    """Gather timezone and locale settings from files, env vars and tools.

    Returns:
        dict with a "timezone" section (system setting, ``TZ`` env value,
        ``timedatectl`` output, current UTC and local timestamps) and a
        "locale" section (set locale env vars, ``localectl`` output).
        Entries whose source is unavailable are None.
    """
    # System timezone: the /etc/timezone text if present, otherwise the
    # resolved target of /etc/localtime.
    if os.path.exists("/etc/timezone"):
        system_tz = Path("/etc/timezone").read_text().strip()
    elif os.path.exists("/etc/localtime"):
        system_tz = os.path.realpath("/etc/localtime")
    else:
        system_tz = None

    env_tz = os.environ.get("TZ", "")

    timedatectl_out = None
    if shutil.which("timedatectl"):
        timedatectl_out = run(["timedatectl", "show", "--property=Timezone"])

    # Keep only the locale variables that are set to a non-empty value.
    locale_keys = ("LANG", "LC_ALL", "LC_TIME", "LC_CTYPE", "LC_MESSAGES")
    locale_env = {
        name: os.environ[name] for name in locale_keys if os.environ.get(name, "")
    }

    localectl_out = None
    if shutil.which("localectl"):
        localectl_out = run(["localectl", "status"])

    timezone_section = {
        "system": system_tz,
        "env_TZ": env_tz or None,
        "timedatectl": timedatectl_out,
        "now_utc": datetime.now(timezone.utc).isoformat(),
        "now_local": datetime.now().astimezone().isoformat(),
    }
    locale_section = {
        "env": locale_env or None,
        "localectl": localectl_out,
    }
    return {"timezone": timezone_section, "locale": locale_section}
def probe_hardware() -> dict:
    """Collect disk, memory, CPU and GPU facts from the CLI tools available.

    For each category the first tool found on PATH is used: lsblk before
    camcontrol for disks, free before sysctl for memory, lscpu before
    sysctl for CPU, and lspci for the GPU.

    Returns:
        dict with "disks" (list of dicts), "memory" (dict), "cpu" (dict)
        and "gpu" (str, or None when lspci is not installed).
    """
    return {
        "disks": _probe_disks(),
        "memory": _probe_memory(),
        "cpu": _probe_cpu(),
        "gpu": _probe_gpu(),
    }


def _probe_disks() -> list:
    """List block devices via lsblk, else attached devices via camcontrol."""
    disks = []
    if shutil.which("lsblk"):
        # Only NAME and PATH are consumed, so only those are requested:
        # asking for extra columns (e.g. MOUNTPOINTS) makes older lsblk
        # releases fail outright. -l gives a flat list so partition names
        # carry no tree-drawing glyphs.
        out = run(["lsblk", "-o", "NAME,PATH", "-n", "-l"])
        if not out.startswith("ERROR:"):
            for line in out.splitlines():
                parts = line.split()
                if parts:
                    disks.append(
                        {"name": parts[0], "path": parts[1] if len(parts) > 1 else ""}
                    )
    elif shutil.which("camcontrol"):
        out = run(["camcontrol", "devlist"])
        if not out.startswith("ERROR:"):
            # Skip blank lines so empty output does not yield a fake device.
            disks.extend(
                {"device": line.strip()} for line in out.splitlines() if line.strip()
            )
    return disks


def _probe_memory() -> dict:
    """Report memory via `free -h`, else the hw.physmem sysctl in GiB."""
    memory = {}
    if shutil.which("free"):
        memory["free"] = run(["free", "-h"])
    elif shutil.which("sysctl"):
        out = run(["sysctl", "-n", "hw.physmem"])
        # Non-numeric output (an error message) is passed through verbatim.
        memory["physmem"] = f"{int(out) // (1024**3)} GiB" if out.isdigit() else out
    return memory


def _probe_cpu() -> dict:
    """Report CPU model and logical CPU count via lscpu, else sysctl."""
    cpu = {}
    if shutil.which("lscpu"):
        for line in run(["lscpu"]).splitlines():
            key, sep, value = line.partition(":")
            if not sep:
                continue
            # Compare the whole (stripped) key: a substring test would let
            # "On-line CPU(s) list" overwrite the count with a range such as
            # "0-7", and "BIOS Model name" overwrite the model.
            key = key.strip()
            if key == "Model name":
                cpu["model"] = value.strip()
            elif key == "CPU(s)":
                cpu["cores"] = value.strip()
    elif shutil.which("sysctl"):
        cpu["model"] = run(["sysctl", "-n", "hw.model"])
        cpu["cores"] = run(["sysctl", "-n", "hw.ncpu"])
    return cpu


def _probe_gpu():
    """Return the lspci lines mentioning VGA, or None without lspci."""
    if not shutil.which("lspci"):
        return None
    # No shell is involved, so a "|" in the argument list would be handed to
    # lspci as a literal argument; filter the output here instead of piping
    # through grep.
    out = run(["lspci"])
    if out.startswith("ERROR:"):
        return out
    return "\n".join(line for line in out.splitlines() if "vga" in line.lower())
def probe_storage() -> dict:
    """Report ZFS pools/datasets and the mounted filesystems.

    Returns:
        dict with "zfs" (pool and dataset listings, or None when ``zpool``
        is not on PATH) and "mounts" (first 500 characters of ``mount``
        output, or of ``df -h`` when ``mount`` is missing; None if empty).
    """
    zfs_info = None
    if shutil.which("zpool"):
        zfs_info = {
            "pools": run(["zpool", "list", "-H", "-o", "name,size,health"]),
            "datasets": run(
                ["zfs", "list", "-H", "-o", "name,mountpoint,used,available"]
            ),
        }

    if shutil.which("mount"):
        mount_text = run(["mount"])
    else:
        mount_text = run(["df", "-h"])

    return {
        "zfs": zfs_info,
        # Truncated to keep the report compact; empty output becomes None.
        "mounts": mount_text[:500] or None,
    }
def probe_jails_containers() -> dict:
    """Report FreeBSD jails and Docker containers.

    Returns:
        dict with "freebsd_jails" (``jls -n`` output) and
        "docker_containers" (``docker ps`` name/status table); each is
        None when the corresponding tool is not on PATH.
    """
    jail_listing = None
    if shutil.which("jls"):
        jail_listing = run(["jls", "-n"])

    container_listing = None
    if shutil.which("docker"):
        container_listing = run(
            ["docker", "ps", "--format", "table {{.Names}}\t{{.Status}}"]
        )

    return {
        "freebsd_jails": jail_listing,
        "docker_containers": container_listing,
    }
def probe_network() -> dict:
    """Report Tailscale status, network interfaces and DNS configuration.

    Returns:
        dict with "tailscale" (status dict, or None when the tool is not
        on PATH), "interfaces" (first 500 characters of ``ip addr``, or of
        ``ifconfig`` when ``ip`` is missing) and "dns" (contents of
        /etc/resolv.conf, or None when the file does not exist).
    """
    tailscale_info = None
    if shutil.which("tailscale"):
        tailscale_info = {"status": run(["tailscale", "status"])}

    if shutil.which("ip"):
        iface_dump = run(["ip", "addr"])
    else:
        iface_dump = run(["ifconfig"])

    resolv_conf = "/etc/resolv.conf"
    dns_config = None
    if os.path.exists(resolv_conf):
        dns_config = Path(resolv_conf).read_text().strip()

    return {
        "tailscale": tailscale_info,
        "interfaces": iface_dump[:500],
        "dns": dns_config,
    }
def probe_agents() -> dict:
    """Detect which agent/harness binaries are installed on PATH.

    Returns:
        dict keyed by agent name ("hermes", "colibri", "zot", "codex",
        "claude-code"); only agents whose binary is found are present.
        Each entry holds the binary path plus version/status/home details
        where the probe asks for them.
    """
    found = {}

    # Hermes: binary, version, and home directory (env override or default).
    hermes_path = shutil.which("hermes")
    if hermes_path:
        found["hermes"] = {
            "binary": hermes_path,
            "version": run(["hermes", "--version"], timeout=5),
            "home": os.environ.get("HERMES_HOME", str(Path.home() / ".hermes")),
        }

    # Colibri daemon: binary plus a truncated status report.
    colibri_path = shutil.which("colibri")
    if colibri_path:
        status_text = run(["colibri", "status"], timeout=5)
        found["colibri"] = {
            "binary": colibri_path,
            "status": status_text[:500] or "no response",
        }

    # Zot: presence only.
    zot_path = shutil.which("zot")
    if zot_path:
        found["zot"] = {"binary": zot_path}

    # Codex: binary and version.
    codex_path = shutil.which("codex")
    if codex_path:
        found["codex"] = {
            "binary": codex_path,
            "version": run(["codex", "--version"], timeout=5),
        }

    # Claude Code: presence only.
    claude_path = shutil.which("claude")
    if claude_path:
        found["claude-code"] = {"binary": claude_path}

    return found
def probe_layered_soul() -> dict:
    """Find and report layered-soul identity version.

    Candidate locations, checked in order:
      1. ``~/ai/layered-soul``
      2. the directory named by ``LAYERED_SOUL_PATH`` (only when it is set)

    Returns:
        For the first candidate holding a readable ``manifest.json``: a dict
        with "path", "schema" (from the manifest), "commit" (short git HEAD,
        "" when git is missing, times out or fails), "skills" (count of
        ``SKILL.md`` files under ``skills/``) and "memories" (count of
        ``*.md`` files under ``memories/curated/``).
        ``{"found": False}`` when no candidate qualifies.
    """
    candidates = [Path.home() / "ai" / "layered-soul"]
    override = os.environ.get("LAYERED_SOUL_PATH", "")
    if override:
        # Path("") is the current directory, so an unset variable must not
        # be turned into a candidate: any cwd that happens to contain a
        # manifest.json would otherwise be reported as the soul checkout.
        candidates.append(Path(override))

    for path in candidates:
        manifest_file = path / "manifest.json"
        if not manifest_file.exists():
            continue
        try:
            manifest = json.loads(manifest_file.read_text())
            skills = sum(1 for _ in (path / "skills").rglob("SKILL.md"))
            memories = sum(1 for _ in (path / "memories" / "curated").rglob("*.md"))
        except (OSError, ValueError):
            # Unreadable or malformed checkout: best effort, try the next one.
            continue

        try:
            commit = subprocess.run(
                ["git", "-C", str(path), "rev-parse", "--short", "HEAD"],
                capture_output=True, text=True, timeout=5
            ).stdout.strip()
        except (OSError, subprocess.SubprocessError):
            # A missing or hung git must not hide an otherwise valid checkout.
            commit = ""

        return {
            "path": str(path),
            "schema": manifest.get("schema") if isinstance(manifest, dict) else None,
            "commit": commit,
            "skills": skills,
            "memories": memories,
        }
    return {"found": False}
def probe_git() -> dict:
    """Check common repos for current state.

    Looks under ``~/ai`` for a fixed list of repository names and, for each
    one that has a ``.git`` entry, records the current branch, short HEAD
    sha and whether the working tree has uncommitted changes.

    Returns:
        dict mapping repo name to {"branch", "sha", "dirty"}. Repos that
        are absent, or whose git calls raise, are left out.
    """
    repo_names = (
        "colibri",
        "layered-soul",
        "clawdie-iso",
        "clawdie-ai",
        "hermes-bsd",
        "hermes-soul",
    )

    def git_stdout(repo: Path, *args: str) -> str:
        """Run one git subcommand inside *repo* and return stripped stdout."""
        completed = subprocess.run(
            ["git", "-C", str(repo), *args],
            capture_output=True, text=True, timeout=5
        )
        return completed.stdout.strip()

    state = {}
    for name in repo_names:
        repo = Path.home() / "ai" / name
        if not (repo / ".git").exists():
            continue
        try:
            branch = git_stdout(repo, "branch", "--show-current")
            sha = git_stdout(repo, "rev-parse", "--short", "HEAD")
            porcelain = git_stdout(repo, "status", "--porcelain")
        except Exception:
            # Best effort: a repo whose git calls fail is simply not reported.
            continue
        state[name] = {
            "branch": branch,
            "sha": sha,
            "dirty": bool(porcelain),
        }
    return state
def probe_build_tools() -> dict:
    """Report versions of the build toolchains found on PATH.

    Returns:
        dict keyed by tool name; each value is the tool's version output,
        or None when the binary is not installed. "python" is always the
        version of the interpreter running this script.
    """

    def version_of(binary: str, *args: str):
        """Return the output of ``binary args...``, or None if not on PATH."""
        if shutil.which(binary):
            return run([binary, *args])
        return None

    py = sys.version_info
    return {
        "rust": version_of("rustc", "--version"),
        "go": version_of("go", "version"),
        "zig": version_of("zig", "version"),
        "python": f"{py.major}.{py.minor}.{py.micro}",
        "node": version_of("node", "--version"),
        "npm": version_of("npm", "--version"),
        "gmake": version_of("gmake", "--version"),
        "uv": version_of("uv", "--version"),
    }
def probe_secrets() -> dict:
    """Check secrets infrastructure reachability.

    Returns:
        ``{"vaultwarden": {...}}`` where the inner dict holds:
          - "bw_installed": whether the ``bw`` CLI is on PATH
          - "server": the ``BW_SERVER`` env value, or "not set"
          - "status": first 200 characters of ``bw status`` output, or
            "unreachable" when the command could not be run
          - "error": the failure detail (only present when unreachable)
        The last three keys appear only when ``bw`` is installed.
    """
    vault = {}
    if shutil.which("bw"):
        vault["bw_installed"] = True
        vault["server"] = os.environ.get("BW_SERVER", "not set")
        # run() never raises; it reports failures (missing binary, timeout)
        # as an "ERROR: ..." string, so that prefix is what marks the CLI as
        # unreachable. A try/except around the call can never fire.
        status = run(["bw", "status"], timeout=5)
        if status.startswith("ERROR:"):
            vault["status"] = "unreachable"
            vault["error"] = status[:200]
        else:
            vault["status"] = status[:200]
    else:
        vault["bw_installed"] = False
    return {"vaultwarden": vault}
# ── Main ───────────────────────────────────────────────
# Registry of probes: section name -> zero-argument callable returning a dict.
# Each section name is the function name without its "probe_" prefix; the
# tuple order is the order in which sections are run and reported.
FACTS = {
    fn.__name__[len("probe_"):]: fn
    for fn in (
        probe_os,
        probe_timezone_locale,
        probe_hardware,
        probe_storage,
        probe_jails_containers,
        probe_network,
        probe_agents,
        probe_layered_soul,
        probe_git,
        probe_build_tools,
        probe_secrets,
    )
}
def main():
    """Parse CLI flags, run the selected probes and print the results.

    Every key of ``FACTS`` gets a ``--<key-with-dashes>`` flag. When one or
    more of them is given only those probes run; otherwise all probes run.
    Output is JSON unless ``--text`` is passed. A probe that raises is
    reported as ``{"error": "<message>"}`` in its section instead of
    aborting the whole run.
    """
    parser = argparse.ArgumentParser(
        description="Verify facts — gather exact environment facts before acting"
    )
    # JSON is the default output mode; --json only lets callers be explicit.
    parser.add_argument("--json", action="store_true", default=True)
    parser.add_argument("--text", action="store_true")

    # Extra spellings for per-category flags. --hw is accepted for the
    # hardware probe because the script's usage notes have advertised it;
    # without the alias argparse rejects it as an unknown argument.
    short_aliases = {"hardware": ("--hw",)}

    # Per-category flags; argparse derives the dest from the first long
    # option, i.e. the FACTS key itself.
    for key in FACTS:
        parser.add_argument(
            f"--{key.replace('_', '-')}",
            *short_aliases.get(key, ()),
            action="store_true",
        )
    args = parser.parse_args()

    # Determine which probes to run
    specific = [k for k in FACTS if getattr(args, k, False)]
    probes = specific if specific else list(FACTS)

    output: dict = {"probed_at": datetime.now(timezone.utc).isoformat()}
    for key in probes:
        try:
            output[key] = FACTS[key]()
        except Exception as e:
            # Top-level boundary: record the failure and keep probing.
            output[key] = {"error": str(e)}

    if args.text:
        _print_text(output)
    else:
        print(json.dumps(output, indent=2, default=str))
def _print_text(data: dict) -> None:
    """Print the probe results as plain text, one titled section per probe.

    The "probed_at" entry is printed as a banner. For every other section
    the upper-cased name is printed as a header, followed by one line per
    non-None entry when the section content is a dict, then a blank line.
    """
    for section, content in data.items():
        if section == "probed_at":
            print(f"=== PROBED {content} ===\n")
            continue

        print(f"--- {section.upper()} ---")
        entries = content.items() if isinstance(content, dict) else ()
        for label, value in entries:
            if value is None:
                continue
            print(f" {label}: {_fmt(value)}")
        print()
def _fmt(val) -> str:
    """Render one value compactly for the text report.

    Lists are summarised as ``[N items]``; dicts are JSON-encoded; anything
    else goes through ``str``. Dict and scalar renderings are cut to 200
    characters.
    """
    limit = 200
    if isinstance(val, list):
        return f"[{len(val)} items]"
    if isinstance(val, dict):
        rendered = json.dumps(val, default=str)
    else:
        rendered = str(val)
    return rendered[:limit]
# Run the probe only when executed as a script, not when imported.
if __name__ == "__main__":
    main()