layered-soul/scripts/verify_facts_probe.py

499 lines
17 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""Verify-facts probe — gather exact environment facts before acting.
Never assume what hardware, OS, timezone, storage, agents, or config you're
running on. This probe collects verified facts and emits them as structured
JSON. Use it at the start of any session where the environment matters.
Usage:
python3 verify_facts_probe.py # all facts
python3 verify_facts_probe.py --hardware # hardware only
python3 verify_facts_probe.py --agents # agent/harness only
python3 verify_facts_probe.py --json # JSON output (default)
python3 verify_facts_probe.py --text # human-readable output
"""
from __future__ import annotations
import argparse
import json
import os
import platform
import re
import shutil
import socket
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
# ── OS detection (run FIRST — everything else depends on it) ──
def detect_os() -> dict:
    """Identify the host OS and build its table of native command equivalents.

    Meant to run before every other probe so later steps can look commands up
    in the returned ``_commands`` map instead of hardcoding them, which guards
    against Linux/FreeBSD command drift.

    Returns:
        dict with ``system``, ``release``, ``machine``, ``hostname``,
        ``kernel`` (output of ``uname -a``), ``freebsd_version`` (``None``
        unless the system is FreeBSD) and ``_commands`` (empty for any system
        other than FreeBSD, Linux and Darwin).
    """
    system = platform.system()
    facts = {
        "system": system,
        "release": platform.release(),
        "machine": platform.machine(),
        "hostname": socket.gethostname(),
        "kernel": run_raw(["uname", "-a"]),
        "freebsd_version": None,
    }
    if system == "FreeBSD":
        facts["freebsd_version"] = run_raw(["freebsd-version"])

    # OS-specific command equivalents, keyed by platform.system() value.
    command_maps = {
        "FreeBSD": {
            "disk_list": ["camcontrol", "devlist"],
            "disk_info": ["gpart", "show"],
            "memory": ["sysctl", "-n", "hw.physmem"],
            "cpu_model": ["sysctl", "-n", "hw.model"],
            "cpu_cores": ["sysctl", "-n", "hw.ncpu"],
            "mounts": ["mount"],
            "network": ["ifconfig"],
            "shell": "/usr/local/bin/bash",
            "make": "gmake",
            "package_manager": "pkg install -y",
            "service_manager": "rc.d",
            "device_prefix": "/dev/da",
            "checksum_cmd": "sha256",
            "sed_in_place": "sed -i ''",
            "grep_extended": "grep -E",
        },
        "Linux": {
            "disk_list": ["lsblk", "-o", "NAME,PATH,SIZE,MODEL,SERIAL,TRAN,RM,HOTPLUG,MOUNTPOINTS", "-b", "-n"],
            "disk_info": ["lsblk"],
            "memory": ["free", "-h"],
            "cpu_model": ["lscpu"],
            "cpu_cores": ["lscpu"],
            "mounts": ["mount"],
            "network": ["ip", "addr"],
            "shell": "/bin/bash",
            "make": "make",
            "package_manager": "apt install -y",
            "service_manager": "systemd",
            "device_prefix": "/dev/sd",
            "checksum_cmd": "sha256sum",
            "sed_in_place": "sed -i",
            "grep_extended": "grep -E",
        },
        "Darwin": {
            "disk_list": ["diskutil", "list"],
            "disk_info": ["diskutil", "list"],
            "memory": ["sysctl", "-n", "hw.memsize"],
            "cpu_model": ["sysctl", "-n", "machdep.cpu.brand_string"],
            "cpu_cores": ["sysctl", "-n", "hw.ncpu"],
            "mounts": ["mount"],
            "network": ["ifconfig"],
            "shell": "/bin/bash",
            "make": "make",
            "package_manager": "brew install",
            "service_manager": "launchd",
            "device_prefix": "/dev/disk",
            "checksum_cmd": "shasum -a 256",
            "sed_in_place": "sed -i ''",
            "grep_extended": "grep -E",
        },
    }
    # synthesized — use this for all subsequent probes
    facts["_commands"] = command_maps.get(system, {})
    return facts
def run_raw(cmd: list[str], timeout: int = 10) -> str:
    """Execute *cmd* and return its trimmed output as text.

    Stripped stdout is preferred; when that is empty, stripped stderr is
    returned instead. Any exception raised while running the command
    (missing binary, timeout, ...) is reported as an ``"ERROR: ..."`` string
    rather than propagated.

    Args:
        cmd: argv list handed to ``subprocess.run`` (no shell involved).
        timeout: seconds to wait before giving up.
    """
    try:
        completed = subprocess.run(
            cmd, capture_output=True, text=True, timeout=timeout
        )
    except Exception as exc:
        return f"ERROR: {exc}"
    stdout = completed.stdout.strip()
    if stdout:
        return stdout
    return completed.stderr.strip()
def run(cmd: list[str], timeout: int = 10) -> str:
    """Run *cmd* and hand back its output as a stripped string.

    Returns stripped stdout, falling back to stripped stderr when stdout is
    empty. Failures to run the command (any exception, including a timeout)
    come back as a string of the form ``"ERROR: <exception>"``.

    Args:
        cmd: argv list handed to ``subprocess.run`` (no shell involved).
        timeout: seconds to wait before giving up.
    """
    try:
        proc = subprocess.run(
            cmd, capture_output=True, text=True, timeout=timeout
        )
    except Exception as err:
        return f"ERROR: {err}"
    primary, fallback = proc.stdout.strip(), proc.stderr.strip()
    return primary if primary else fallback
def probe_os() -> dict:
    """Return the OS facts (including the command map) produced by ``detect_os``."""
    # Always first — detect_os synthesizes the command map meant for all other probes.
    os_facts = detect_os()
    return os_facts
def probe_timezone_locale() -> dict:
    """Collect timezone and locale facts from files, environment and system tools.

    Returns:
        dict with a ``timezone`` section (system zone, ``TZ`` variable,
        ``timedatectl`` output, current UTC and local time) and a ``locale``
        section (locale environment variables, ``localectl`` output). Entries
        that were not available are ``None``.
    """
    # Prefer the contents of /etc/timezone; otherwise report the resolved
    # target of /etc/localtime.
    if os.path.exists("/etc/timezone"):
        system_tz = Path("/etc/timezone").read_text().strip()
    elif os.path.exists("/etc/localtime"):
        system_tz = os.path.realpath("/etc/localtime")
    else:
        system_tz = None

    env_tz = os.environ.get("TZ", "")

    timedatectl = None
    if shutil.which("timedatectl"):
        timedatectl = run(["timedatectl", "show", "--property=Timezone"])

    # Only locale variables that are set to a non-empty value are reported.
    locale_keys = ("LANG", "LC_ALL", "LC_TIME", "LC_CTYPE", "LC_MESSAGES")
    locale_env = {
        key: os.environ[key] for key in locale_keys if os.environ.get(key, "")
    }

    localectl = None
    if shutil.which("localectl"):
        localectl = run(["localectl", "status"])

    timezone_facts = {
        "system": system_tz,
        "env_TZ": env_tz if env_tz else None,
        "timedatectl": timedatectl,
        "now_utc": datetime.now(timezone.utc).isoformat(),
        "now_local": datetime.now().astimezone().isoformat(),
    }
    locale_facts = {
        "env": locale_env if locale_env else None,
        "localectl": localectl,
    }
    return {"timezone": timezone_facts, "locale": locale_facts}
def probe_hardware() -> dict:
    """Collect disk, memory, CPU and GPU facts using whichever tools exist.

    Returns:
        dict with:
        ``disks``  – list of ``{"name", "path"}`` (from ``lsblk``) or
                     ``{"device"}`` (from ``camcontrol devlist``) entries;
        ``memory`` – ``{"free": <free -h output>}`` or ``{"physmem": ...}``;
        ``cpu``    – ``{"model", "cores"}`` where available;
        ``gpu``    – the ``lspci`` lines mentioning VGA, the error string if
                     ``lspci`` could not be run, or ``None`` when ``lspci``
                     is missing or lists no VGA device.
    """
    disks = []
    if shutil.which("lsblk"):  # Linux
        out = run(["lsblk", "-o", "NAME,PATH,SIZE,MODEL,SERIAL,TRAN,RM,HOTPLUG,MOUNTPOINTS", "-b", "-n"])
        for line in out.splitlines():
            parts = line.split()
            if parts:
                disks.append({"name": parts[0], "path": parts[1] if len(parts) > 1 else ""})
    elif shutil.which("camcontrol"):  # FreeBSD
        out = run(["camcontrol", "devlist"])
        # Skip blank lines so empty output does not yield a phantom device.
        disks.extend(
            {"device": line.strip()} for line in out.splitlines() if line.strip()
        )

    memory = {}
    if shutil.which("free"):
        memory["free"] = run(["free", "-h"])
    elif shutil.which("sysctl"):
        out = run(["sysctl", "-n", "hw.physmem"])
        memory["physmem"] = f"{int(out) // (1024**3)} GiB" if out.isdigit() else out

    cpu = {}
    if shutil.which("lscpu"):
        for line in run(["lscpu"]).splitlines():
            field, sep, value = line.partition(":")
            if not sep:
                continue
            # Match field names exactly: a substring test also hits
            # "BIOS Model name" and "On-line CPU(s) list", which would
            # overwrite the real model and core count.
            field = field.strip()
            if field == "Model name":
                cpu["model"] = value.strip()
            elif field == "CPU(s)":
                cpu["cores"] = value.strip()
    elif shutil.which("sysctl"):
        cpu["model"] = run(["sysctl", "-n", "hw.model"])
        cpu["cores"] = run(["sysctl", "-n", "hw.ncpu"])

    gpu = None
    if shutil.which("lspci"):
        # No shell is involved, so a "|" in argv is not a pipe; run lspci on
        # its own and do the case-insensitive "vga" filtering here.
        listing = run(["lspci"])
        if listing.startswith("ERROR:"):
            gpu = listing
        else:
            vga_lines = [row for row in listing.splitlines() if "vga" in row.lower()]
            gpu = "\n".join(vga_lines) or None

    return {
        "disks": disks,
        "memory": memory,
        "cpu": cpu,
        "gpu": gpu,
    }
def probe_storage() -> dict:
    """Report ZFS pools/datasets and the mounted filesystems.

    Returns:
        dict with ``zfs`` (``pools`` and ``datasets`` listings, or ``None``
        when ``zpool`` is not installed) and ``mounts`` (first 500 characters
        of ``mount`` output, or of ``df -h`` when ``mount`` is not on PATH;
        ``None`` if that output is empty).
    """
    zfs_facts = {}
    if shutil.which("zpool"):
        zfs_facts["pools"] = run(["zpool", "list", "-H", "-o", "name,size,health"])
        zfs_facts["datasets"] = run(["zfs", "list", "-H", "-o", "name,mountpoint,used,available"])

    if shutil.which("mount"):
        mount_text = run(["mount"])
    else:
        mount_text = run(["df", "-h"])

    result = {"zfs": None, "mounts": None}
    if zfs_facts:
        result["zfs"] = zfs_facts
    if mount_text:
        # Cap the listing so the report stays compact.
        result["mounts"] = mount_text[:500]
    return result
def probe_jails_containers() -> dict:
    """Report FreeBSD jails and Docker state.

    Returns:
        dict with ``freebsd_jails`` (``jls -n`` output, or ``None`` when
        ``jls`` is not installed) and ``docker``: either
        ``{"installed": False}`` or the binary path, version string, the
        first existing socket path (if any) and ``containers`` — the
        ``docker ps -a`` table, or ``"daemon not running"`` when the listing
        could not be obtained.
    """
    jails = run_raw(["jls", "-n"]) if shutil.which("jls") else None

    docker = {}
    docker_bin = shutil.which("docker")
    if docker_bin:
        docker["binary"] = docker_bin
        docker["version"] = run_raw(["docker", "--version"])
        # Try both system socket and Docker Desktop socket
        for sock in ["/var/run/docker.sock", os.path.expanduser("~/.docker/desktop/docker.sock")]:
            if os.path.exists(sock):
                docker["socket"] = sock
                break
        # Check the exit status directly: run_raw folds stderr into its
        # return value, so a failure message printed on stderr would be
        # indistinguishable from a real container table (and a container
        # named "...ERROR..." would be mistaken for a failure).
        try:
            ps = subprocess.run(
                ["docker", "ps", "-a", "--format", "table {{.Names}}\t{{.Image}}\t{{.Status}}"],
                capture_output=True, text=True, timeout=10,
            )
            listing = ps.stdout.strip() if ps.returncode == 0 else ""
        except (OSError, subprocess.SubprocessError):
            listing = ""
        docker["containers"] = listing or "daemon not running"
    else:
        docker["installed"] = False

    return {
        "freebsd_jails": jails,
        "docker": docker or None,
    }
def probe_network() -> dict:
    """Report Tailscale status, network interfaces and resolver configuration.

    Returns:
        dict with ``tailscale`` (``{"status": ...}`` or ``None`` when the
        ``tailscale`` binary is absent), ``interfaces`` (first 500 characters
        of ``ip addr``, or of ``ifconfig`` when ``ip`` is not on PATH) and
        ``dns`` (stripped contents of ``/etc/resolv.conf`` or ``None``).
    """
    tailscale_facts = None
    if shutil.which("tailscale"):
        tailscale_facts = {"status": run(["tailscale", "status"])}

    if shutil.which("ip"):
        iface_text = run(["ip", "addr"])
    else:
        iface_text = run(["ifconfig"])

    resolv_conf = "/etc/resolv.conf"
    if os.path.exists(resolv_conf):
        dns_config = Path(resolv_conf).read_text().strip()
    else:
        dns_config = None

    return {
        "tailscale": tailscale_facts,
        "interfaces": iface_text[:500],
        "dns": dns_config,
    }
def probe_agents() -> dict:
    """Detect agent/harness CLIs on PATH and gather basic details for each.

    Returns:
        dict keyed by agent name (``hermes``, ``colibri``, ``zot``, ``codex``,
        ``claude-code``); only agents whose binary was found are present.
        Each value holds at least ``binary`` (the resolved path).
    """
    found = {}

    # Hermes: version plus its home directory (HERMES_HOME or ~/.hermes).
    hermes_path = shutil.which("hermes")
    if hermes_path:
        hermes = {"binary": hermes_path}
        hermes["version"] = run(["hermes", "--version"], timeout=5)
        hermes["home"] = os.environ.get("HERMES_HOME", str(Path.home() / ".hermes"))
        found["hermes"] = hermes

    # Colibri daemon: first 500 characters of its status output.
    colibri_path = shutil.which("colibri")
    if colibri_path:
        status_text = run(["colibri", "status"], timeout=5)
        if status_text:
            status_text = status_text[:500]
        else:
            status_text = "no response"
        found["colibri"] = {"binary": colibri_path, "status": status_text}

    # Zot: presence only.
    zot_path = shutil.which("zot")
    if zot_path:
        found["zot"] = {"binary": zot_path}

    # Codex: path and version.
    codex_path = shutil.which("codex")
    if codex_path:
        codex_version = run(["codex", "--version"], timeout=5)
        found["codex"] = {"binary": codex_path, "version": codex_version}

    # Claude Code: presence only.
    claude_path = shutil.which("claude")
    if claude_path:
        found["claude-code"] = {"binary": claude_path}

    return found
def probe_layered_soul() -> dict:
    """Find a layered-soul checkout and report its identity version.

    Candidates are checked in order: ``~/ai/layered-soul``, then the
    directory named by the ``LAYERED_SOUL_PATH`` environment variable when
    that variable is set to a non-empty value. The first candidate with a
    readable ``manifest.json`` containing a JSON object wins.

    Returns:
        ``{"path", "schema", "commit", "skills", "memories"}`` for the first
        usable candidate, where ``commit`` is the short HEAD hash (``""`` if
        git is unavailable or the directory is not a repository) and
        ``skills`` / ``memories`` are file counts; otherwise
        ``{"found": False}``.
    """
    candidates = [Path.home() / "ai" / "layered-soul"]
    override = os.environ.get("LAYERED_SOUL_PATH", "")
    if override:
        # Path("") means the current directory, so an unset variable must not
        # silently turn the working directory into a candidate.
        candidates.append(Path(override))

    for path in candidates:
        manifest_file = path / "manifest.json"
        if not manifest_file.exists():
            continue
        try:
            manifest = json.loads(manifest_file.read_text())
            if not isinstance(manifest, dict):
                continue
            skills = len(list((path / "skills").rglob("SKILL.md")))
            memories = len(list((path / "memories" / "curated").rglob("*.md")))
        except (OSError, ValueError):
            # Unreadable or malformed candidate: best effort, try the next one.
            continue
        try:
            commit = subprocess.run(
                ["git", "-C", str(path), "rev-parse", "--short", "HEAD"],
                capture_output=True, text=True, timeout=5
            ).stdout.strip()
        except (OSError, subprocess.SubprocessError):
            # A missing or hanging git should not hide an otherwise valid find.
            commit = ""
        return {
            "path": str(path),
            "schema": manifest.get("schema"),
            "commit": commit,
            "skills": skills,
            "memories": memories,
        }
    return {"found": False}
def probe_git() -> dict:
    """Report branch, short SHA and dirty flag for known repos under ``~/ai``.

    Returns:
        dict keyed by repo name; only directories that contain ``.git`` and
        whose git queries ran without raising are included. Each value is
        ``{"branch": str, "sha": str, "dirty": bool}``.
    """

    def git_stdout(repo: Path, *args: str) -> str:
        # One git query against *repo*, returning stripped stdout.
        completed = subprocess.run(
            ["git", "-C", str(repo), *args],
            capture_output=True, text=True, timeout=5
        )
        return completed.stdout.strip()

    repo_names = ("colibri", "layered-soul", "clawdie-iso", "clawdie-ai", "hermes-bsd", "hermes-soul")
    states = {}
    for name in repo_names:
        repo = Path.home() / "ai" / name
        if not (repo / ".git").exists():
            continue
        try:
            branch = git_stdout(repo, "branch", "--show-current")
            sha = git_stdout(repo, "rev-parse", "--short", "HEAD")
            porcelain = git_stdout(repo, "status", "--porcelain")
        except Exception:
            # Best effort: a repo that cannot be queried is left out.
            continue
        states[name] = {"branch": branch, "sha": sha, "dirty": bool(porcelain)}
    return states
def probe_build_tools() -> dict:
    """Report versions of common build toolchains and the Node setup.

    Returns:
        dict with ``rust``, ``go``, ``zig``, ``gmake``, ``uv`` (version
        output, or ``None`` when the tool is not on PATH), ``python`` (version
        of the running interpreter) and ``node``. ``node`` is
        ``{"version", "npm", "manager": "system"}`` when no version manager
        is detected; otherwise it names the manager (``fnm`` or ``nvm``),
        lists fnm-installed versions where found, and carries the PATH
        ``system_node`` / ``system_npm`` versions.
    """
    # Node: check fnm/nvm first (version managers override system node)
    node_info = {}
    system_node = run_raw(["node", "--version"]) if shutil.which("node") else None
    system_npm = run_raw(["npm", "--version"]) if shutil.which("npm") else None

    # fnm (Fast Node Manager)
    fnm_bin = shutil.which("fnm")
    if fnm_bin:
        node_info["manager"] = "fnm"
        node_info["fnm_version"] = run_raw([fnm_bin, "--version"])
        # Honour FNM_DIR when set; otherwise use the previous fixed location.
        fnm_root = Path(os.environ.get("FNM_DIR") or Path.home() / ".local" / "share" / "fnm")
        versions_dir = fnm_root / "node-versions"
        if versions_dir.exists():
            # Sorted so repeated probes produce the same report.
            node_info["installed"] = sorted(
                entry.name for entry in versions_dir.iterdir() if entry.is_dir()
            )
        # Check for default alias
        default_link = fnm_root / "aliases" / "default" / "current"
        if default_link.exists():
            node_info["default"] = default_link.resolve().name

    # nvm fallback. nvm is a shell function loaded from nvm.sh rather than an
    # executable, so a PATH lookup alone never finds it; look for the script
    # under NVM_DIR (default ~/.nvm) as well.
    nvm_root = Path(os.environ.get("NVM_DIR") or Path.home() / ".nvm")
    if shutil.which("nvm") or (nvm_root / "nvm.sh").is_file():
        node_info.setdefault("manager", "nvm")

    if node_info:
        node_info["system_node"] = system_node
        node_info["system_npm"] = system_npm
    else:
        node_info = {"version": system_node, "npm": system_npm, "manager": "system"}

    return {
        "rust": run_raw(["rustc", "--version"]) if shutil.which("rustc") else None,
        "go": run_raw(["go", "version"]) if shutil.which("go") else None,
        "zig": run_raw(["zig", "version"]) if shutil.which("zig") else None,
        "python": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}",
        "node": node_info,
        "gmake": run_raw(["gmake", "--version"]) if shutil.which("gmake") else None,
        "uv": run_raw(["uv", "--version"]) if shutil.which("uv") else None,
    }
def probe_secrets() -> dict:
    """Check whether the secrets tooling (the ``bw`` CLI) is available.

    Returns:
        ``{"vaultwarden": {...}}`` where the inner dict always has
        ``bw_installed``; when ``bw`` is on PATH it also has ``server`` (the
        ``BW_SERVER`` environment variable or ``"not set"``) and ``status``
        (first 200 characters of ``bw status`` output, or ``"unreachable"``
        if running it raised).
    """
    vault = {}
    if shutil.which("bw"):
        vault["bw_installed"] = True
        vault["server"] = os.environ.get("BW_SERVER", "not set")
        try:
            vault["status"] = run(["bw", "status"], timeout=5)[:200]
        except Exception:
            vault["status"] = "unreachable"
    else:
        vault["bw_installed"] = False
    return {"vaultwarden": vault}
# ── Main ───────────────────────────────────────────────
# Registry of probes: category name -> zero-argument callable returning a dict.
# main() derives one "--<name>" command-line flag per key (underscores become
# dashes) and runs the probes in this insertion order, so "os" stays first.
FACTS = {
    "os": probe_os,
    "timezone_locale": probe_timezone_locale,
    "hardware": probe_hardware,
    "storage": probe_storage,
    "jails_containers": probe_jails_containers,
    "network": probe_network,
    "agents": probe_agents,
    "layered_soul": probe_layered_soul,
    "git": probe_git,
    "build_tools": probe_build_tools,
    "secrets": probe_secrets,
}
def main():
    """Parse command-line flags, run the selected probes and print the result.

    Every key in ``FACTS`` gets a ``--<key>`` flag (underscores shown as
    dashes); passing one or more of them restricts the run to those probes,
    otherwise all probes run. ``--hw`` is accepted as a short form of
    ``--hardware``. Output is JSON unless ``--text`` is given. A probe that
    raises is reported as ``{"error": "<message>"}`` instead of aborting the
    run.
    """
    parser = argparse.ArgumentParser(
        description="Verify facts — gather exact environment facts before acting"
    )
    parser.add_argument("--json", action="store_true", default=True,
                        help="emit JSON (the default)")
    parser.add_argument("--text", action="store_true",
                        help="emit human-readable text instead of JSON")

    # Per-category flags. Short aliases keep the invocation advertised in the
    # module usage text ("--hw") working.
    aliases = {"hardware": ["--hw"]}
    for key in FACTS:
        flags = [f"--{key.replace('_', '-')}", *aliases.get(key, [])]
        parser.add_argument(*flags, dest=key, action="store_true",
                            help=f"run the {key} probe only (combinable)")
    args = parser.parse_args()

    # Determine which probes to run
    specific = [k for k in FACTS if getattr(args, k, False)]
    probes = specific if specific else list(FACTS)

    output: dict = {"probed_at": datetime.now(timezone.utc).isoformat()}
    for key in probes:
        try:
            output[key] = FACTS[key]()
        except Exception as e:
            # Top-level boundary: record the failure and keep probing.
            output[key] = {"error": str(e)}

    if args.text:
        _print_text(output)
    else:
        print(json.dumps(output, indent=2, default=str))
def _print_text(data: dict) -> None:
    """Print probe results as human-readable sections on stdout.

    The ``probed_at`` entry becomes a banner line; every other entry becomes
    a ``--- NAME ---`` header followed by one line per key whose value is not
    ``None`` (for dict content), then a blank line.
    """
    for section, content in data.items():
        if section == "probed_at":
            print(f"=== PROBED {content} ===\n")
            continue

        print(f"--- {section.upper()} ---")
        entries = content.items() if isinstance(content, dict) else ()
        for key, value in entries:
            if value is None:
                continue
            print(f" {key}: {_fmt(value)}")
        print()
def _fmt(val) -> str:
    """Render one value compactly for the text report.

    Lists are summarized as ``[N items]``; dicts are JSON-encoded and every
    other value is passed through ``str``, both capped at 200 characters.
    """
    if isinstance(val, list):
        return f"[{len(val)} items]"
    rendered = json.dumps(val, default=str) if isinstance(val, dict) else str(val)
    return rendered[:200]
# Run the probe CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()