Keep the Python sidecar connection open for multiple newline-delimited requests, add daemon-side request timeout/fallback tests, and document the opt-in Headroom sidecar contract.

Checks: ./scripts/check-format.sh; cargo fmt --check; python3 -m py_compile scripts/headroom-sidecar.py; git diff --check; cargo test -p colibri-daemon cost -- --nocapture; cargo test -p colibri-daemon session:: -- --nocapture; cargo test -p colibri-daemon --all-targets; cargo check -p colibri-daemon; manual sidecar two-request smoke test using a headroom-capable Python env.
163 lines
4.5 KiB
Python
163 lines
4.5 KiB
Python
#!/usr/bin/env python3
|
|
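"""Headroom compression sidecar for the Colibri daemon.

Listens on a Unix domain socket and serves newline-delimited JSON requests:
each request's tool output is compressed via headroom and the result is
written back as a single JSON line. A client may keep one connection open
and send any number of requests over it; each request gets exactly one
response line, in request order. Clients are served one at a time, so a
second client waits until the first one disconnects.

Protocol (one JSON object per line):
    → {"id": "uuid", "raw": "tool output text...", "role": "tool"}
    ← {"id": "uuid", "compressed": "...", "tokens_before": N, "tokens_after": M}
    ← {"error": "message"}    (invalid JSON or compression failure)

When omitted, "id" defaults to a freshly generated UUID, "raw" to "" and
"role" to "tool". Blank lines are ignored. A request line longer than
MAX_MESSAGE_BYTES (1 MiB) gets an error response and the connection is
closed.

Exit codes:
    0 — clean shutdown (SIGTERM or SIGINT)
    1 — startup failure (socket bind error, import error)
    2 — runtime panic (unhandled exception in event loop)
"""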
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import signal
|
|
import socket
|
|
import sys
|
|
import uuid
|
|
|
|
# Length of the pending-connection queue handed to listen().
SOCKET_BACKLOG = 5
# Largest request line accepted, in bytes (1 MiB); longer lines get an error.
MAX_MESSAGE_BYTES = 1024 * 1024
|
|
|
|
|
|
def main():
    """Run the sidecar until SIGTERM/SIGINT and return the process exit code.

    Binds a Unix stream socket at ``--socket`` and serves one client
    connection at a time; each connection may carry many newline-delimited
    requests (handled by ``_handle_connection``).

    Returns:
        0 after a clean shutdown, 2 if the serve loop dies on an unexpected
        exception. Startup failures (socket bind error, missing ``headroom``)
        propagate as exceptions, which the interpreter turns into exit code 1.
    """
    parser = argparse.ArgumentParser(description="Headroom compression sidecar")
    parser.add_argument(
        "--socket",
        default="/var/run/colibri/headroom.sock",
        help="Unix socket path",
    )
    args = parser.parse_args()

    sock_path = args.socket
    _ensure_socket_dir(sock_path)

    # Late import so --help is fast
    from headroom import compress

    running = True
    # Connection currently being served, so the signal handler can unblock it.
    active_conn = None

    def _handle_signal(signum, frame):
        nonlocal running
        running = False
        # Sockets returned by accept() are blocking (the listener's timeout is
        # not inherited) and clients keep their connection open, so without
        # this the loop below would never re-check `running` while a client is
        # attached. Shutting down the read side makes recv() report EOF, so
        # _handle_connection returns instead of waiting for another request.
        current = active_conn
        if current is not None:
            try:
                current.shutdown(socket.SHUT_RD)
            except OSError:
                pass  # already closed or disconnected

    signal.signal(signal.SIGTERM, _handle_signal)
    signal.signal(signal.SIGINT, _handle_signal)

    _remove_stale_socket(sock_path)

    server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    server.bind(sock_path)
    os.chmod(sock_path, 0o660)
    server.listen(SOCKET_BACKLOG)
    server.settimeout(1.0)  # wake up every second to check shutdown flag

    print(f"headroom-sidecar listening on {sock_path}", flush=True)

    try:
        while running:
            try:
                conn, _addr = server.accept()
            except socket.timeout:
                continue

            active_conn = conn
            try:
                # A signal may have arrived after accept() returned but before
                # active_conn was set, when the handler could not see conn.
                if running:
                    _handle_connection(conn, compress)
            except Exception as exc:
                # Don't crash the whole server on one bad connection, but
                # leave a trace rather than dropping it silently.
                print(
                    f"headroom-sidecar: connection dropped: {exc!r}",
                    file=sys.stderr,
                    flush=True,
                )
            finally:
                active_conn = None
                conn.close()
    except Exception:
        # Documented "runtime panic" exit code.
        import traceback

        traceback.print_exc()
        return 2
    finally:
        server.close()
        _remove_stale_socket(sock_path)

    return 0
|
|
|
|
|
|
def _ensure_socket_dir(sock_path: str) -> None:
    """Create the directory that will contain the socket, if it is missing.

    A bare filename (e.g. ``--socket headroom.sock``) refers to the current
    working directory, which already exists. ``os.path.dirname`` returns ""
    for it, and ``os.makedirs("")`` would raise FileNotFoundError, so that
    case is skipped.
    """
    sock_dir = os.path.dirname(sock_path)
    if sock_dir:
        os.makedirs(sock_dir, exist_ok=True)
|
|
|
|
|
|
def _remove_stale_socket(sock_path: str) -> None:
    """Unlink *sock_path* on a best-effort basis.

    Any OSError (typically: nothing exists at that path) is ignored.
    """
    try:
        os.remove(sock_path)
    except OSError:
        return
|
|
|
|
|
|
def _handle_connection(conn: socket.socket, compress_fn) -> None:
    """Read newline-delimited requests until the client disconnects.

    Colibri keeps one sidecar connection open and reuses it across tool results,
    so this handler must process more than one request per accepted socket.

    Requests are answered strictly in arrival order. The handler returns when
    the peer closes or resets the connection, when a reply cannot be written,
    or after reporting a request longer than MAX_MESSAGE_BYTES (the caller is
    responsible for closing the socket). Bytes after the last newline at EOF
    are discarded.
    """
    buf = bytearray()
    while True:
        try:
            chunk = conn.recv(4096)
        except (ConnectionResetError, BrokenPipeError):
            return
        if not chunk:
            return

        # Everything already in buf is an unterminated tail scanned on an
        # earlier pass, so only the new bytes need searching. With a mutable
        # buffer this keeps a large request arriving in many small chunks
        # linear instead of re-copying and re-scanning the whole buffer.
        scan_from = len(buf)
        buf += chunk
        newline_at = buf.find(b"\n", scan_from)

        if len(buf) > MAX_MESSAGE_BYTES and newline_at == -1:
            _write_error(conn, f"message too large (>{MAX_MESSAGE_BYTES} bytes)")
            return

        start = 0
        while newline_at != -1:
            line = bytes(buf[start:newline_at])
            start = newline_at + 1
            newline_at = buf.find(b"\n", start)
            if not line:
                continue
            if len(line) > MAX_MESSAGE_BYTES:
                _write_error(conn, f"message too large (>{MAX_MESSAGE_BYTES} bytes)")
                return
            if not _handle_request_line(conn, line, compress_fn):
                return
        # Drop the consumed lines, keeping only the unterminated tail.
        del buf[:start]
|
|
|
|
|
|
def _handle_request_line(conn: socket.socket, line: bytes, compress_fn) -> bool:
    """Handle one JSON request line. Returns False when the connection is dead.

    Exactly one line is written back: a result object on success, or an
    ``{"error": ...}`` object when the request is malformed or compression
    fails. For error replies the return value is whatever ``_write_error``
    reports, i.e. True unless the write itself failed, so the caller keeps
    serving later requests on the same connection.

    Args:
        conn: Client socket to reply on.
        line: One request, without its trailing newline.
        compress_fn: Called with ``[{"role": role, "content": raw}]``; its
            result must expose ``messages[0]["content"]``, ``tokens_before``
            and ``tokens_after`` (main passes ``headroom.compress``).
    """
    try:
        request = json.loads(line)
    except ValueError:
        # JSONDecodeError is a ValueError, and so is the UnicodeDecodeError
        # raised for bytes that are not valid UTF-8; catching only the former
        # would let bad bytes escape and tear down the whole connection.
        return _write_error(conn, "invalid json")
    if not isinstance(request, dict):
        # e.g. `[1, 2]` or `"text"`: valid JSON, but .get() below would raise
        # AttributeError and drop the connection instead of replying.
        return _write_error(conn, "request must be a JSON object")

    # Only mint a UUID when the client sent no id (an explicit null is
    # echoed back unchanged).
    req_id = request["id"] if "id" in request else str(uuid.uuid4())
    raw = request.get("raw", "")
    role = request.get("role", "tool")

    try:
        result = compress_fn([{"role": role, "content": raw}])
        response = {
            "id": req_id,
            "compressed": result.messages[0]["content"],
            "tokens_before": result.tokens_before,
            "tokens_after": result.tokens_after,
        }
        # Encode inside the try: a value json cannot serialize becomes an
        # error reply instead of an exception that kills the connection.
        payload = json.dumps(response).encode() + b"\n"
    except Exception as exc:
        # Some exceptions stringify to "" (e.g. a bare KeyError()); fall back
        # to the type name so the client always gets a non-empty message.
        return _write_error(conn, str(exc) or type(exc).__name__)

    try:
        conn.sendall(payload)
    except (BrokenPipeError, ConnectionResetError):
        return False
    return True
|
|
|
|
|
|
def _write_error(conn: socket.socket, message: str) -> bool:
    """Send an ``{"error": message}`` JSON line to the client.

    Returns:
        True once the line has been written; False if the peer closed or
        reset the connection, so the caller can stop serving it.
    """
    error_line = json.dumps({"error": message}).encode() + b"\n"
    try:
        conn.sendall(error_line)
    except (ConnectionResetError, BrokenPipeError):
        return False
    return True
|
|
|
|
|
|
if __name__ == "__main__":
    # main() returns the process exit status documented in the module docstring.
    exit_code = main()
    sys.exit(exit_code)
|