#!/usr/bin/env python3
"""Headroom compression sidecar for Colibri daemon.

Listens on a Unix domain socket. Accepts newline-delimited JSON requests,
compresses the tool output via headroom, and returns the compressed result.

Protocol:
  → {"id": "uuid", "raw": "tool output text...", "role": "tool"}\n
  ← {"id": "uuid", "compressed": "...", "tokens_before": N, "tokens_after": M}\n
  ← {"error": "...", "id": "uuid"}\n   (on failure; "id" is present only when
                                        the request was parsed far enough to
                                        know it)

Exit codes:
  0 — clean shutdown (SIGTERM or SIGINT)
  1 — startup failure (socket bind error, import error)
  2 — runtime panic (unhandled exception in event loop)
"""

import argparse
import json
import os
import select
import signal
import socket
import sys
import uuid
from typing import Callable, Optional

SOCKET_BACKLOG = 5
MAX_MESSAGE_BYTES = 1_048_576  # 1 MiB
RECV_CHUNK_BYTES = 4096
# How often blocking waits (accept / idle connection) re-check the shutdown flag.
POLL_INTERVAL_SECONDS = 1.0


def main() -> int:
    """Parse arguments, bind the Unix socket and serve until signalled.

    Returns:
        0 after a clean shutdown triggered by SIGTERM/SIGINT, or 2 when the
        event loop dies from an unhandled exception. Startup failures (import
        error, bind error) propagate as exceptions, which makes the
        interpreter exit with status 1.
    """
    parser = argparse.ArgumentParser(description="Headroom compression sidecar")
    parser.add_argument(
        "--socket",
        default="/var/run/colibri/headroom.sock",
        help="Unix socket path",
    )
    args = parser.parse_args()

    sock_path = args.socket
    _ensure_socket_dir(sock_path)

    # Late import so --help is fast
    from headroom import compress

    running = True

    def _handle_signal(signum, frame):
        nonlocal running
        running = False

    signal.signal(signal.SIGTERM, _handle_signal)
    signal.signal(signal.SIGINT, _handle_signal)

    _remove_stale_socket(sock_path)

    server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        server.bind(sock_path)
        os.chmod(sock_path, 0o660)
        server.listen(SOCKET_BACKLOG)
        # Wake up periodically to check the shutdown flag.
        server.settimeout(POLL_INTERVAL_SECONDS)

        print(f"headroom-sidecar listening on {sock_path}", flush=True)

        try:
            _serve(server, compress, lambda: not running)
        except Exception as exc:
            _log(f"fatal error in event loop: {exc!r}")
            return 2
    finally:
        # Always release the listener and remove the socket file, including
        # when bind/listen or the event loop failed.
        server.close()
        _remove_stale_socket(sock_path)

    return 0


def _serve(server: socket.socket, compress_fn, should_stop: Callable[[], bool]) -> None:
    """Accept connections one at a time until *should_stop* returns true.

    A failure while handling one connection is logged and that connection is
    closed; the server keeps accepting. Errors raised by ``accept`` itself
    (other than the poll timeout) propagate to the caller.
    """
    while not should_stop():
        try:
            conn, _addr = server.accept()
        except socket.timeout:
            continue
        with conn:
            try:
                _handle_connection(conn, compress_fn, should_stop)
            except Exception as exc:
                # Don't crash the whole server on one bad message, but leave
                # a trace so the failure can be diagnosed.
                _log(f"connection handler failed: {exc!r}")


def _log(message: str) -> None:
    """Write a diagnostic line to stderr (stdout carries the startup banner)."""
    print(f"headroom-sidecar: {message}", file=sys.stderr, flush=True)


def _ensure_socket_dir(sock_path: str) -> None:
    """Create the directory that will contain *sock_path* if it is missing.

    A bare filename has an empty dirname, meaning the current directory;
    ``os.makedirs("")`` would raise, so that case is skipped.
    """
    sock_dir = os.path.dirname(sock_path)
    if sock_dir:
        os.makedirs(sock_dir, exist_ok=True)


def _remove_stale_socket(sock_path: str) -> None:
    """Best-effort removal of *sock_path*; any ``OSError`` is ignored.

    Used before bind (to clear a leftover socket file) and at shutdown, so a
    missing file or a failed unlink must not abort either path.
    """
    try:
        os.unlink(sock_path)
    except OSError:
        pass


def _handle_connection(
    conn: socket.socket,
    compress_fn,
    should_stop: Optional[Callable[[], bool]] = None,
) -> None:
    """Read newline-delimited requests until the client disconnects.

    Colibri keeps one sidecar connection open and reuses it across tool
    results, so this handler must process more than one request per accepted
    socket.

    Args:
        conn: Connected stream socket to read requests from and reply on.
        compress_fn: Callable taking a list of message dicts and returning an
            object with ``messages``, ``tokens_before`` and ``tokens_after``.
        should_stop: Optional zero-argument callable. When given, the handler
            polls it before each read and at least every
            ``POLL_INTERVAL_SECONDS`` while idle, and returns once it is
            true. Without it the handler blocks in ``recv`` until the peer
            disconnects, so a shutdown flag set by a signal handler would not
            be noticed while a connection is open.
    """
    buf = b""
    while True:
        try:
            if should_stop is not None:
                if should_stop():
                    return
                # Wait for data with a timeout instead of blocking in recv,
                # so the shutdown flag is re-checked while the peer is idle.
                readable, _, _ = select.select([conn], [], [], POLL_INTERVAL_SECONDS)
                if not readable:
                    continue
            chunk = conn.recv(RECV_CHUNK_BYTES)
        except OSError:
            # Reset/broken pipe or any other socket failure: the peer is gone.
            return
        if not chunk:
            return

        buf += chunk
        if len(buf) > MAX_MESSAGE_BYTES and b"\n" not in buf:
            _write_error(conn, f"message too large (>{MAX_MESSAGE_BYTES} bytes)")
            return

        while b"\n" in buf:
            line, buf = buf.split(b"\n", 1)
            if not line:
                continue
            if len(line) > MAX_MESSAGE_BYTES:
                _write_error(conn, f"message too large (>{MAX_MESSAGE_BYTES} bytes)")
                return
            if not _handle_request_line(conn, line, compress_fn):
                return


def _handle_request_line(conn: socket.socket, line: bytes, compress_fn) -> bool:
    """Handle one JSON request line. Returns False when the connection is dead.

    Malformed input (undecodable bytes, invalid JSON, or JSON that is not an
    object) and failures inside *compress_fn* are reported to the client as
    an error reply; the connection stays usable in those cases.
    """
    try:
        request = json.loads(line)
    except (ValueError, RecursionError):
        # ValueError covers both json.JSONDecodeError and the
        # UnicodeDecodeError raised for bytes that are not valid UTF-8/16/32.
        return _write_error(conn, "invalid json")
    if not isinstance(request, dict):
        return _write_error(conn, "request must be a json object")

    req_id = request.get("id", str(uuid.uuid4()))
    raw = request.get("raw", "")
    role = request.get("role", "tool")

    try:
        result = compress_fn([{"role": role, "content": raw}])
        response = {
            "id": req_id,
            "compressed": result.messages[0]["content"],
            "tokens_before": result.tokens_before,
            "tokens_after": result.tokens_after,
        }
        # Serialise inside the try so an unserialisable result becomes an
        # error reply instead of tearing down the connection.
        payload = json.dumps(response).encode() + b"\n"
    except Exception as exc:
        return _write_error(conn, str(exc), req_id)

    return _send_line(conn, payload)


def _write_error(conn: socket.socket, message: str, req_id=None) -> bool:
    """Send an ``{"error": message}`` reply; returns False if the send failed.

    When *req_id* is not None it is included as ``"id"`` so the client can
    match the error to its request.
    """
    body = {"error": message}
    if req_id is not None:
        body["id"] = req_id
    return _send_line(conn, json.dumps(body).encode() + b"\n")


def _send_line(conn: socket.socket, payload: bytes) -> bool:
    """Send *payload* in full; returns False when the socket reports an error."""
    try:
        conn.sendall(payload)
    except OSError:
        return False
    return True


if __name__ == "__main__":
    sys.exit(main())