colibri/scripts/headroom-sidecar.py
Sam & Claude 34929a6a53
Some checks failed
CI / rust (pull_request) Has been cancelled
CI / markdown (pull_request) Has been cancelled
fix(headroom): harden sidecar protocol and timeout (Sam & Codex)
Keep the Python sidecar connection open for multiple newline-delimited requests, add daemon-side request timeout/fallback tests, and document the opt-in Headroom sidecar contract.\n\nChecks: ./scripts/check-format.sh; cargo fmt --check; python3 -m py_compile scripts/headroom-sidecar.py; git diff --check; cargo test -p colibri-daemon cost -- --nocapture; cargo test -p colibri-daemon session:: -- --nocapture; cargo test -p colibri-daemon --all-targets; cargo check -p colibri-daemon; manual sidecar two-request smoke using a headroom-capable Python env.
2026-06-14 01:30:45 +02:00

163 lines
4.5 KiB
Python

#!/usr/bin/env python3
"""Headroom compression sidecar for Colibri daemon.
Listens on a Unix domain socket. Accepts newline-delimited JSON requests,
compresses the tool output via headroom, and returns the compressed result.
Protocol (one JSON object per line, in both directions):
    request:  {"id": "uuid", "raw": "tool output text...", "role": "tool"}\n
    response: {"id": "uuid", "compressed": "...", "tokens_before": N, "tokens_after": M}\n
    failure:  {"error": "message"}\n
Exit codes:
0 — clean shutdown (SIGTERM)
1 — startup failure (socket bind error, import error)
2 — runtime panic (unhandled exception in event loop)
"""
import argparse
import json
import os
import signal
import socket
import sys
import uuid
# Backlog passed to listen() on the server socket.
SOCKET_BACKLOG = 5
# Upper bound on the size of one newline-delimited request line.
MAX_MESSAGE_BYTES = 1_048_576 # 1 MiB
def main():
    """Run the sidecar: bind the Unix socket and serve until signalled.

    Returns the process exit code promised by the module docstring:
    0 after a clean SIGTERM/SIGINT shutdown, 1 when startup fails (socket
    directory creation, the ``headroom`` import, or bind/chmod/listen), and
    2 when the accept loop dies on an unexpected error.
    """
    parser = argparse.ArgumentParser(description="Headroom compression sidecar")
    parser.add_argument(
        "--socket",
        default="/var/run/colibri/headroom.sock",
        help="Unix socket path",
    )
    args = parser.parse_args()
    sock_path = args.socket

    try:
        _ensure_socket_dir(sock_path)
        # Late import so --help is fast
        from headroom import compress
    except (ImportError, OSError) as exc:
        print(f"headroom-sidecar: startup failed: {exc}", file=sys.stderr, flush=True)
        return 1

    running = True

    def _handle_signal(signum, frame):
        nonlocal running
        running = False

    signal.signal(signal.SIGTERM, _handle_signal)
    signal.signal(signal.SIGINT, _handle_signal)

    _remove_stale_socket(sock_path)
    server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    # Only unlink the socket file on exit if this process created it.
    bound = False
    try:
        try:
            server.bind(sock_path)
            bound = True
            os.chmod(sock_path, 0o660)
            server.listen(SOCKET_BACKLOG)
            server.settimeout(1.0)  # wake up every second to check shutdown flag
        except OSError as exc:
            print(
                f"headroom-sidecar: startup failed: {exc}",
                file=sys.stderr,
                flush=True,
            )
            return 1

        print(f"headroom-sidecar listening on {sock_path}", flush=True)

        # NOTE(review): connections are served inline, so the shutdown flag is
        # only re-checked between connections. A long-lived client connection
        # presumably delays shutdown until it closes -- confirm with the daemon.
        try:
            while running:
                try:
                    conn, _addr = server.accept()
                except socket.timeout:
                    continue
                try:
                    _handle_connection(conn, compress)
                except Exception as exc:
                    # Don't crash the whole server on one bad message, but
                    # leave a trace instead of discarding the error silently.
                    print(
                        f"headroom-sidecar: connection error: {exc!r}",
                        file=sys.stderr,
                        flush=True,
                    )
                finally:
                    conn.close()
        except Exception as exc:
            print(
                f"headroom-sidecar: fatal error: {exc!r}",
                file=sys.stderr,
                flush=True,
            )
            return 2
        return 0
    finally:
        # Runs on every exit path so neither the fd nor the socket file leaks.
        server.close()
        if bound:
            _remove_stale_socket(sock_path)
def _ensure_socket_dir(sock_path: str) -> None:
    """Create the directory that will contain the socket, if it is missing.

    A bare file name such as ``headroom.sock`` has no directory component.
    In that case there is nothing to create, and calling ``os.makedirs("")``
    would raise ``FileNotFoundError``, so the call is skipped.

    Raises:
        OSError: if the directory cannot be created.
    """
    sock_dir = os.path.dirname(sock_path)
    if sock_dir:
        os.makedirs(sock_dir, exist_ok=True)
def _remove_stale_socket(sock_path: str) -> None:
    """Delete whatever is at *sock_path*, ignoring any OS-level failure.

    Best-effort cleanup: a missing path (or any other ``OSError``) is not
    reported to the caller.
    """
    try:
        os.remove(sock_path)
    except OSError:
        return
def _handle_connection(conn: socket.socket, compress_fn) -> None:
    """Serve newline-delimited requests on *conn* until the peer goes away.

    Colibri keeps one sidecar connection open and reuses it across tool
    results, so more than one request may arrive on the same socket. The
    function returns when the peer closes or resets the connection, when a
    request exceeds ``MAX_MESSAGE_BYTES``, or when a reply can no longer be
    delivered.
    """
    too_large = f"message too large (>{MAX_MESSAGE_BYTES} bytes)"
    pending = b""
    while True:
        try:
            data = conn.recv(4096)
        except (ConnectionResetError, BrokenPipeError):
            return
        if not data:
            # Orderly shutdown by the peer.
            return
        pending += data

        if b"\n" not in pending:
            # No complete line yet: give up if the partial line is already too big.
            if len(pending) > MAX_MESSAGE_BYTES:
                _write_error(conn, too_large)
                return
            continue

        # Everything before the final newline is complete; the tail is kept
        # as the start of the next request.
        *complete_lines, pending = pending.split(b"\n")
        for request_line in complete_lines:
            if not request_line:
                continue
            if len(request_line) > MAX_MESSAGE_BYTES:
                _write_error(conn, too_large)
                return
            if not _handle_request_line(conn, request_line, compress_fn):
                return
def _handle_request_line(conn: socket.socket, line: bytes, compress_fn) -> bool:
    """Handle one JSON request line. Returns False when the connection is dead.

    The line must decode to a JSON object. ``raw`` (default ``""``) and
    ``role`` (default ``"tool"``) are wrapped in a single message and passed
    to *compress_fn*. The reply echoes ``id``, or a fresh UUID when the
    request has none. Malformed input and compression or serialisation
    failures are answered with an ``{"error": ...}`` line, so the persistent
    connection survives a bad request.

    Args:
        conn: connected socket the reply is written to.
        line: one request, without its trailing newline.
        compress_fn: callable taking a list of message dicts and returning an
            object with ``messages``, ``tokens_before`` and ``tokens_after``.

    Returns:
        True if a reply (success or error) was written, False if the peer
        has gone away.
    """
    try:
        request = json.loads(line)
    except ValueError:
        # Covers JSONDecodeError and the UnicodeDecodeError raised for bytes
        # that are not valid UTF-8/16/32; both are ValueError subclasses.
        return _write_error(conn, "invalid json")
    if not isinstance(request, dict):
        # Valid JSON that is not an object (list, number, string, ...) has no .get().
        return _write_error(conn, "request must be a json object")

    req_id = request.get("id", str(uuid.uuid4()))
    raw = request.get("raw", "")
    role = request.get("role", "tool")
    try:
        messages = [{"role": role, "content": raw}]
        result = compress_fn(messages)
        response = {
            "id": req_id,
            "compressed": result.messages[0]["content"],
            "tokens_before": result.tokens_before,
            "tokens_after": result.tokens_after,
        }
        # Serialise inside the try so a non-JSON-able result becomes an error
        # reply rather than an exception that tears down the connection.
        payload = json.dumps(response).encode() + b"\n"
    except Exception as exc:
        return _write_error(conn, str(exc))

    try:
        conn.sendall(payload)
    except (BrokenPipeError, ConnectionResetError):
        return False
    return True
def _write_error(conn: socket.socket, message: str) -> bool:
    """Send ``{"error": message}`` as one JSON line on *conn*.

    Returns True once the line has been written, or False when the peer has
    already closed or reset the connection.
    """
    payload = json.dumps({"error": message}).encode() + b"\n"
    try:
        conn.sendall(payload)
    except (BrokenPipeError, ConnectionResetError):
        return False
    return True
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())