#!/usr/bin/env python3 """Verify-facts probe — gather exact environment facts before acting. Never assume what hardware, OS, timezone, storage, agents, or config you're running on. This probe collects verified facts and emits them as structured JSON. Use it at the start of any session where the environment matters. Usage: python3 verify_facts_probe.py # all facts python3 verify_facts_probe.py --hw # hardware only python3 verify_facts_probe.py --agents # agent/harness only python3 verify_facts_probe.py --json # JSON output (default) python3 verify_facts_probe.py --text # human-readable output """ from __future__ import annotations import argparse import json import os import platform import re import shutil import socket import subprocess import sys from datetime import datetime, timezone from pathlib import Path # ── OS detection (run FIRST — everything else depends on it) ── def detect_os() -> dict: """Detect OS and return synthesized command map for cross-platform safety. This is the first probe. All other probes use the command map instead of hardcoded commands. Prevents Linux/FreeBSD command drift bugs. """ system = platform.system() release = platform.release() machine = platform.machine() hostname = socket.gethostname() kernel = run_raw(["uname", "-a"]) # OS-specific command equivalents if system == "FreeBSD": freebsd_version = run_raw(["freebsd-version"]) cmds = { "disk_list": ["camcontrol", "devlist"], "disk_info": ["gpart", "show"], "memory": ["sysctl", "-n", "hw.physmem"], "cpu_model": ["sysctl", "-n", "hw.model"], "cpu_cores": ["sysctl", "-n", "hw.ncpu"], "mounts": ["mount"], "network": ["ifconfig"], "shell": "/usr/local/bin/bash", "make": "gmake", "package_manager": "pkg install -y", "service_manager": "rc.d", "device_prefix": "/dev/da", "checksum_cmd": "sha256", "sed_in_place": "sed -i ''", "grep_extended": "grep -E", } elif system == "Linux": freebsd_version = None cmds = { "disk_list": ["lsblk", "-o", "NAME,PATH,SIZE,MODEL,SERIAL,TRAN,RM,HOTPLUG,MOUNTPOINTS", "-b", "-n"], "disk_info": ["lsblk"], "memory": ["free", "-h"], "cpu_model": ["lscpu"], "cpu_cores": ["lscpu"], "mounts": ["mount"], "network": ["ip", "addr"], "shell": "/bin/bash", "make": "make", "package_manager": "apt install -y", "service_manager": "systemd", "device_prefix": "/dev/sd", "checksum_cmd": "sha256sum", "sed_in_place": "sed -i", "grep_extended": "grep -E", } elif system == "Darwin": freebsd_version = None cmds = { "disk_list": ["diskutil", "list"], "disk_info": ["diskutil", "list"], "memory": ["sysctl", "-n", "hw.memsize"], "cpu_model": ["sysctl", "-n", "machdep.cpu.brand_string"], "cpu_cores": ["sysctl", "-n", "hw.ncpu"], "mounts": ["mount"], "network": ["ifconfig"], "shell": "/bin/bash", "make": "make", "package_manager": "brew install", "service_manager": "launchd", "device_prefix": "/dev/disk", "checksum_cmd": "shasum -a 256", "sed_in_place": "sed -i ''", "grep_extended": "grep -E", } else: freebsd_version = None cmds = {} return { "system": system, "release": release, "machine": machine, "hostname": hostname, "kernel": kernel, "freebsd_version": freebsd_version, "_commands": cmds, # synthesized — use this for all subsequent probes } def run_raw(cmd: list[str], timeout: int = 10) -> str: """Run a command and return stripped stdout, or error string.""" try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout ) return result.stdout.strip() or result.stderr.strip() except Exception as e: return f"ERROR: {e}" def run(cmd: list[str], timeout: int = 10) -> str: """Run a command and return stripped stdout, or error string.""" try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout ) return result.stdout.strip() or result.stderr.strip() except Exception as e: return f"ERROR: {e}" def probe_os() -> dict: return detect_os() # always first — synthesizes command map for all other probes def probe_timezone_locale() -> dict: tz = None # Linux if os.path.exists("/etc/timezone"): tz = Path("/etc/timezone").read_text().strip() elif os.path.exists("/etc/localtime"): tz = os.path.realpath("/etc/localtime") tz_env = os.environ.get("TZ", "") timedatectl = run(["timedatectl", "show", "--property=Timezone"]) if shutil.which("timedatectl") else None locale = {} for key in ("LANG", "LC_ALL", "LC_TIME", "LC_CTYPE", "LC_MESSAGES"): val = os.environ.get(key, "") if val: locale[key] = val localectl = run(["localectl", "status"]) if shutil.which("localectl") else None return { "timezone": { "system": tz, "env_TZ": tz_env or None, "timedatectl": timedatectl, "now_utc": datetime.now(timezone.utc).isoformat(), "now_local": datetime.now().astimezone().isoformat(), }, "locale": { "env": locale or None, "localectl": localectl, }, } def probe_hardware() -> dict: disks = [] # Linux if shutil.which("lsblk"): out = run(["lsblk", "-o", "NAME,PATH,SIZE,MODEL,SERIAL,TRAN,RM,HOTPLUG,MOUNTPOINTS", "-b", "-n"]) for line in out.split("\n"): parts = line.split() if parts: disks.append({"name": parts[0], "path": parts[1] if len(parts) > 1 else ""}) # FreeBSD elif shutil.which("camcontrol"): out = run(["camcontrol", "devlist"]) for line in out.split("\n"): disks.append({"device": line.strip()}) memory = {} if shutil.which("free"): out = run(["free", "-h"]) memory["free"] = out elif shutil.which("sysctl"): out = run(["sysctl", "-n", "hw.physmem"]) memory["physmem"] = f"{int(out) // (1024**3)} GiB" if out.isdigit() else out cpu = {} if shutil.which("lscpu"): out = run(["lscpu"]) for line in out.split("\n"): if "Model name" in line: cpu["model"] = line.split(":", 1)[1].strip() if "CPU(s)" in line and "NUMA" not in line: cpu["cores"] = line.split(":", 1)[1].strip() elif shutil.which("sysctl"): cpu["model"] = run(["sysctl", "-n", "hw.model"]) cpu["cores"] = run(["sysctl", "-n", "hw.ncpu"]) gpu = run(["lspci", "|", "grep", "-i", "vga"]) if shutil.which("lspci") else None return { "disks": disks, "memory": memory, "cpu": cpu, "gpu": gpu, } def probe_storage() -> dict: zfs = {} if shutil.which("zpool"): zfs["pools"] = run(["zpool", "list", "-H", "-o", "name,size,health"]) zfs["datasets"] = run(["zfs", "list", "-H", "-o", "name,mountpoint,used,available"]) mounts = run(["mount"]) if shutil.which("mount") else run(["df", "-h"]) return { "zfs": zfs or None, "mounts": mounts[:500] if mounts else None, } def probe_jails_containers() -> dict: jails = run_raw(["jls", "-n"]) if shutil.which("jls") else None docker = {} docker_bin = shutil.which("docker") if docker_bin: docker["binary"] = docker_bin docker["version"] = run_raw(["docker", "--version"]) # Try both system socket and Docker Desktop socket for sock in ["/var/run/docker.sock", os.path.expanduser("~/.docker/desktop/docker.sock")]: if os.path.exists(sock): docker["socket"] = sock break containers = run_raw(["docker", "ps", "-a", "--format", "table {{.Names}}\t{{.Image}}\t{{.Status}}"]) if containers and "ERROR" not in containers: docker["containers"] = containers else: docker["containers"] = "daemon not running" else: docker["installed"] = False return { "freebsd_jails": jails, "docker": docker or None, } def probe_network() -> dict: tailscale = {} if shutil.which("tailscale"): tailscale["status"] = run(["tailscale", "status"]) interfaces = run(["ip", "addr"]) if shutil.which("ip") else run(["ifconfig"]) dns = Path("/etc/resolv.conf").read_text().strip() if os.path.exists("/etc/resolv.conf") else None return { "tailscale": tailscale or None, "interfaces": interfaces[:500], "dns": dns, } def probe_agents() -> dict: agents = {} # Hermes hermes_bin = shutil.which("hermes") if hermes_bin: agents["hermes"] = { "binary": hermes_bin, "version": run(["hermes", "--version"], timeout=5), "home": os.environ.get("HERMES_HOME", str(Path.home() / ".hermes")), } # Colibri daemon colibri_bin = shutil.which("colibri") if colibri_bin: out = run(["colibri", "status"], timeout=5) agents["colibri"] = { "binary": colibri_bin, "status": out[:500] if out else "no response", } # Zot zot_bin = shutil.which("zot") if zot_bin: agents["zot"] = {"binary": zot_bin} # Codex codex_bin = shutil.which("codex") if codex_bin: agents["codex"] = {"binary": codex_bin, "version": run(["codex", "--version"], timeout=5)} # Claude Code claude_bin = shutil.which("claude") if claude_bin: agents["claude-code"] = {"binary": claude_bin} return agents def probe_layered_soul() -> dict: """Find and report layered-soul identity version.""" candidates = [ Path.home() / "ai" / "layered-soul", Path(os.environ.get("LAYERED_SOUL_PATH", "")), ] for path in candidates: if (path / "manifest.json").exists(): try: manifest = json.loads((path / "manifest.json").read_text()) import subprocess as sp commit = sp.run( ["git", "-C", str(path), "rev-parse", "--short", "HEAD"], capture_output=True, text=True, timeout=5 ).stdout.strip() return { "path": str(path), "schema": manifest.get("schema"), "commit": commit, "skills": len(list((path / "skills").rglob("SKILL.md"))), "memories": len(list((path / "memories" / "curated").rglob("*.md"))), } except Exception: pass return {"found": False} def probe_git() -> dict: """Check common repos for current state.""" repos = {} for name in ["colibri", "layered-soul", "clawdie-iso", "clawdie-ai", "hermes-bsd", "hermes-soul"]: path = Path.home() / "ai" / name if (path / ".git").exists(): try: branch = subprocess.run( ["git", "-C", str(path), "branch", "--show-current"], capture_output=True, text=True, timeout=5 ).stdout.strip() sha = subprocess.run( ["git", "-C", str(path), "rev-parse", "--short", "HEAD"], capture_output=True, text=True, timeout=5 ).stdout.strip() dirty = subprocess.run( ["git", "-C", str(path), "status", "--porcelain"], capture_output=True, text=True, timeout=5 ).stdout.strip() repos[name] = { "branch": branch, "sha": sha, "dirty": bool(dirty), } except Exception: pass return repos def probe_build_tools() -> dict: # Node: check fnm/nvm first (version managers override system node) node_info = {} system_node = run_raw(["node", "--version"]) if shutil.which("node") else None system_npm = run_raw(["npm", "--version"]) if shutil.which("npm") else None # fnm (Fast Node Manager) — Rust, cross-platform, like uv for Node fnm_bin = shutil.which("fnm") if fnm_bin: node_info["manager"] = "fnm" node_info["fnm_version"] = run_raw([fnm_bin, "--version"]) # List installed node versions via fnm versions_dir = Path.home() / ".local" / "share" / "fnm" / "node-versions" if versions_dir.exists(): installed = [d.name for d in versions_dir.iterdir() if d.is_dir()] node_info["installed"] = installed # Check for default alias default_link = Path.home() / ".local" / "share" / "fnm" / "aliases" / "default" / "current" if default_link.exists(): node_info["default"] = default_link.resolve().name # nvm fallback nvm_bin = shutil.which("nvm") if nvm_bin: node_info["manager"] = node_info.get("manager", "nvm") if node_info: node_info["system_node"] = system_node node_info["system_npm"] = system_npm else: node_info = {"version": system_node, "npm": system_npm, "manager": "system"} return { "rust": run_raw(["rustc", "--version"]) if shutil.which("rustc") else None, "go": run_raw(["go", "version"]) if shutil.which("go") else None, "zig": run_raw(["zig", "version"]) if shutil.which("zig") else None, "python": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}", "node": node_info, "gmake": run_raw(["gmake", "--version"]) if shutil.which("gmake") else None, "uv": run_raw(["uv", "--version"]) if shutil.which("uv") else None, } def probe_secrets() -> dict: """Check secrets infrastructure reachability.""" result = {"vaultwarden": {}} bw_bin = shutil.which("bw") if bw_bin: result["vaultwarden"]["bw_installed"] = True result["vaultwarden"]["server"] = os.environ.get("BW_SERVER", "not set") try: status = run(["bw", "status"], timeout=5) result["vaultwarden"]["status"] = status[:200] except Exception: result["vaultwarden"]["status"] = "unreachable" else: result["vaultwarden"]["bw_installed"] = False return result # ── Main ─────────────────────────────────────────────── FACTS = { "os": probe_os, "timezone_locale": probe_timezone_locale, "hardware": probe_hardware, "storage": probe_storage, "jails_containers": probe_jails_containers, "network": probe_network, "agents": probe_agents, "layered_soul": probe_layered_soul, "git": probe_git, "build_tools": probe_build_tools, "secrets": probe_secrets, } def main(): parser = argparse.ArgumentParser( description="Verify facts — gather exact environment facts before acting" ) parser.add_argument("--json", action="store_true", default=True) parser.add_argument("--text", action="store_true") # Per-category flags for key in FACTS: parser.add_argument(f"--{key.replace('_', '-')}", action="store_true") args = parser.parse_args() # Determine which probes to run specific = [k for k in FACTS if getattr(args, k.replace("-", "_"), False)] probes = specific if specific else list(FACTS) output: dict = {"probed_at": datetime.now(timezone.utc).isoformat()} for key in probes: try: output[key] = FACTS[key]() except Exception as e: output[key] = {"error": str(e)} if args.text: _print_text(output) else: print(json.dumps(output, indent=2, default=str)) def _print_text(data: dict) -> None: """Human-readable output.""" for section, content in data.items(): if section == "probed_at": print(f"=== PROBED {content} ===\n") continue print(f"--- {section.upper()} ---") if isinstance(content, dict): for k, v in content.items(): if v is not None: print(f" {k}: {_fmt(v)}") print() def _fmt(val) -> str: if isinstance(val, dict): return json.dumps(val, default=str)[:200] if isinstance(val, list): return f"[{len(val)} items]" return str(val)[:200] if __name__ == "__main__": main()