Merge pull request 'feat(daemon): headroom compression sidecar (hardened)' (#57) from fix/headroom-sidecar-quality into main
Some checks are pending
CI / rust (push) Waiting to run
CI / markdown (push) Waiting to run

This commit is contained in:
clawdie 2026-06-14 01:35:53 +02:00
commit 83abd586c3
10 changed files with 513 additions and 5 deletions

View file

@ -10,6 +10,7 @@ Next ISO integration plan: `docs/ISO-INTEGRATION-PLAN.md`.
ISO acceptance runbook: `docs/ISO-ACCEPTANCE-RUNBOOK.md`.
Clawdie Studio/Zed proposal: `docs/CLAWDIE-STUDIO-PROPOSAL.md`.
External MCP host prototype: `docs/COLIBRI-EXTERNAL-MCP-PROTOTYPE.md`.
Optional Headroom compression sidecar: `docs/HEADROOM-SIDECAR.md`.
## Workspace — 11 crates

View file

@ -30,6 +30,8 @@ fn smoke_config() -> DaemonConfig {
scheduler_prompt_injection: false,
cache_warming_enabled: false,
cache_warming_interval_hours: 0,
headroom_enabled: false,
headroom_socket_path: std::path::PathBuf::from("/var/run/colibri/headroom.sock"),
}
}

View file

@ -51,6 +51,12 @@ pub struct DaemonConfig {
pub cache_warming_enabled: bool,
/// Re-warm cache every N hours. 0 = only warm once on startup.
pub cache_warming_interval_hours: u64,
/// Enable headroom compression sidecar for tool results (disabled by default).
/// When enabled, tool results are compressed before entering agent context,
/// saving 40%+ tokens with no accuracy loss.
pub headroom_enabled: bool,
/// Path to the headroom sidecar Unix socket.
pub headroom_socket_path: PathBuf,
}
impl DaemonConfig {
@ -99,6 +105,10 @@ impl DaemonConfig {
cache_warming_enabled: env_parse("COLIBRI_CACHE_WARMING").unwrap_or(false),
cache_warming_interval_hours: env_parse("COLIBRI_CACHE_WARMING_INTERVAL_HOURS")
.unwrap_or(0),
headroom_enabled: env_parse("COLIBRI_HEADROOM_ENABLED").unwrap_or(false),
headroom_socket_path: std::env::var("COLIBRI_HEADROOM_SOCKET")
.map(PathBuf::from)
.unwrap_or_else(|_| PathBuf::from("/var/run/colibri/headroom.sock")),
}
}
}

View file

@ -10,7 +10,12 @@
//! 3. Volatile scratch — discarded per-turn, never persisted
use serde::{Deserialize, Serialize};
use tracing::info;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixStream;
use tokio::time::timeout;
use tracing::{debug, info, warn};
// ---------------------------------------------------------------------------
// Cost mode
@ -172,6 +177,145 @@ pub fn compact_tool_result(raw: &str, max_bytes: u64, tool_name: &str) -> Option
))
}
// ---------------------------------------------------------------------------
// Headroom compression sidecar
// ---------------------------------------------------------------------------
const HEADROOM_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
/// A long-lived connection to the headroom compression sidecar process.
///
/// The sidecar is a Python script (`scripts/headroom-sidecar.py`) that
/// listens on a Unix domain socket. Each request is a single JSON line;
/// each response is a single JSON line. The connection is kept open and
/// reused across tool results.
pub struct HeadroomSidecar {
stream: UnixStream,
socket_path: std::path::PathBuf,
}
impl HeadroomSidecar {
/// Connect to the headroom sidecar at the given socket path.
pub async fn connect(socket_path: &std::path::Path) -> std::io::Result<Self> {
let stream = UnixStream::connect(socket_path).await?;
info!(
socket = %socket_path.display(),
"connected to headroom sidecar"
);
Ok(Self {
stream,
socket_path: socket_path.to_path_buf(),
})
}
/// Compress a tool result through the sidecar.
///
/// Returns the compressed content on success, or `None` if compression
/// failed, timed out, or produced no savings (graceful degradation).
pub async fn compress(&mut self, raw: &str, tool_name: &str) -> Option<String> {
self.compress_with_timeout(raw, tool_name, HEADROOM_REQUEST_TIMEOUT)
.await
}
async fn compress_with_timeout(
&mut self,
raw: &str,
tool_name: &str,
timeout_duration: Duration,
) -> Option<String> {
match timeout(timeout_duration, self.compress_once(raw, tool_name)).await {
Ok(result) => result,
Err(_) => {
warn!(
tool = %tool_name,
timeout_ms = timeout_duration.as_millis(),
"headroom sidecar request timed out"
);
None
}
}
}
async fn compress_once(&mut self, raw: &str, tool_name: &str) -> Option<String> {
let request = serde_json::json!({
"id": tool_name,
"raw": raw,
"role": "tool",
});
let payload = serde_json::to_string(&request).unwrap_or_default() + "\n";
match self.stream.write_all(payload.as_bytes()).await {
Ok(()) => {}
Err(e) => {
warn!(error = %e, "headroom sidecar write failed");
return None;
}
}
let mut reader = BufReader::new(&mut self.stream);
let mut line = String::new();
match reader.read_line(&mut line).await {
Ok(0) => {
warn!("headroom sidecar closed connection");
return None;
}
Ok(_) => {}
Err(e) => {
warn!(error = %e, "headroom sidecar read failed");
return None;
}
}
let response: serde_json::Value = match serde_json::from_str(&line) {
Ok(v) => v,
Err(e) => {
warn!(error = %e, raw_line = %line, "headroom sidecar returned invalid JSON");
return None;
}
};
if response.get("error").is_some() {
warn!(error = %response["error"], "headroom sidecar error");
return None;
}
let tokens_before = response["tokens_before"].as_u64().unwrap_or(0);
let tokens_after = response["tokens_after"].as_u64().unwrap_or(0);
let compressed = response["compressed"].as_str().unwrap_or(raw);
if tokens_after >= tokens_before {
debug!(
tool = %tool_name,
tokens_before,
tokens_after,
"headroom: no savings, returning original"
);
return None;
}
info!(
tool = %tool_name,
tokens_before,
tokens_after,
saved = tokens_before.saturating_sub(tokens_after),
pct = 100 - (tokens_after * 100 / tokens_before.max(1)),
"headroom compression applied"
);
Some(compressed.to_string())
}
}
impl Drop for HeadroomSidecar {
fn drop(&mut self) {
info!(
socket = %self.socket_path.display(),
"headroom sidecar disconnected"
);
}
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
@ -259,4 +403,75 @@ mod tests {
assert!(compacted.contains("truncated"));
assert!(compacted.is_char_boundary(compacted.len()));
}
fn test_socket_path(name: &str) -> std::path::PathBuf {
let dir =
std::env::temp_dir().join(format!("colibri-headroom-{name}-{}", uuid::Uuid::new_v4()));
std::fs::create_dir_all(&dir).unwrap();
dir.join("headroom.sock")
}
#[tokio::test]
async fn headroom_sidecar_reuses_one_connection_for_multiple_requests() {
let socket = test_socket_path("reuse");
let listener = tokio::net::UnixListener::bind(&socket).unwrap();
let server = tokio::spawn(async move {
let (stream, _) = listener.accept().await.unwrap();
let mut reader = BufReader::new(stream);
for i in 0..2 {
let mut line = String::new();
assert!(reader.read_line(&mut line).await.unwrap() > 0);
let request: serde_json::Value = serde_json::from_str(&line).unwrap();
assert_eq!(request["role"], "tool");
let response = serde_json::json!({
"id": request["id"],
"compressed": format!("compressed-{i}"),
"tokens_before": 100,
"tokens_after": 10,
});
reader
.get_mut()
.write_all((response.to_string() + "\n").as_bytes())
.await
.unwrap();
}
});
let mut sidecar = HeadroomSidecar::connect(&socket).await.unwrap();
assert_eq!(
sidecar.compress("raw one", "tool-a").await.as_deref(),
Some("compressed-0")
);
assert_eq!(
sidecar.compress("raw two", "tool-b").await.as_deref(),
Some("compressed-1")
);
server.await.unwrap();
let _ = std::fs::remove_file(&socket);
let _ = std::fs::remove_dir(socket.parent().unwrap());
}
#[tokio::test]
async fn headroom_sidecar_timeout_degrades_to_none() {
let socket = test_socket_path("timeout");
let listener = tokio::net::UnixListener::bind(&socket).unwrap();
let server = tokio::spawn(async move {
let (stream, _) = listener.accept().await.unwrap();
let mut reader = BufReader::new(stream);
let mut line = String::new();
assert!(reader.read_line(&mut line).await.unwrap() > 0);
tokio::time::sleep(Duration::from_millis(200)).await;
});
let mut sidecar = HeadroomSidecar::connect(&socket).await.unwrap();
assert_eq!(
sidecar
.compress_with_timeout("raw", "slow-tool", Duration::from_millis(25))
.await,
None
);
server.await.unwrap();
let _ = std::fs::remove_file(&socket);
let _ = std::fs::remove_dir(socket.parent().unwrap());
}
}

View file

@ -11,6 +11,7 @@ use tokio::sync::{broadcast, Mutex, RwLock};
use tracing::{debug, info, warn};
use crate::config::DaemonConfig;
use crate::cost::HeadroomSidecar;
use crate::session::Session;
use crate::spawner::{AgentHandle, Spawner};
@ -26,6 +27,8 @@ pub struct DaemonState {
pub last_warm_hit_tokens: RwLock<u64>,
pub shutdown_tx: broadcast::Sender<()>,
pub shutdown_rx: broadcast::Receiver<()>,
/// Headroom compression sidecar (optional, connected lazily on startup).
pub headroom_sidecar: RwLock<Option<HeadroomSidecar>>,
}
impl DaemonState {
@ -49,6 +52,7 @@ impl DaemonState {
last_warm_hit_tokens: RwLock::new(0),
shutdown_tx,
shutdown_rx,
headroom_sidecar: RwLock::new(None),
}
}
}
@ -96,6 +100,23 @@ pub async fn run_loop(
info!("daemon background loop started");
// Connect headroom compression sidecar if enabled.
if state.config.headroom_enabled {
match HeadroomSidecar::connect(&state.config.headroom_socket_path).await {
Ok(sidecar) => {
info!("headroom compression sidecar active");
*state.headroom_sidecar.write().await = Some(sidecar);
}
Err(e) => {
warn!(
error = %e,
socket = %state.config.headroom_socket_path.display(),
"headroom sidecar unavailable — continuing without compression"
);
}
}
}
loop {
tokio::select! {
_ = heartbeat_tick.tick() => {

View file

@ -363,6 +363,8 @@ mod tests {
scheduler_prompt_injection: true,
cache_warming_enabled: false,
cache_warming_interval_hours: 0,
headroom_enabled: false,
headroom_socket_path: std::path::PathBuf::from("/var/run/colibri/headroom.sock"),
}
}

View file

@ -24,6 +24,7 @@ use serde_json::Value;
use thiserror::Error;
use tokio::sync::RwLock;
use crate::cost::HeadroomSidecar;
use crate::DaemonConfig;
// ---------------------------------------------------------------------------
@ -489,7 +490,7 @@ impl Session {
/// Build a PromptAssembly from the current session state.
/// Wraps the existing `build_prompt_messages()` — no behavior change.
pub async fn build_prompt_assembly(&self) -> PromptAssembly {
let messages = self.build_prompt_messages().await;
let messages = self.build_prompt_messages(None).await;
let total_bytes: u64 = messages
.iter()
.map(|m| serde_json::to_string(m).unwrap_or_default().len() as u64)
@ -518,7 +519,10 @@ impl Session {
/// 1. Immutable system prefix (byte-stable for cache hits)
/// 2. Appendable conversation log (turns, possibly with compaction gaps)
/// 3. Volatile scratch (not included — caller appends per-request)
pub async fn build_prompt_messages(&self) -> Vec<serde_json::Value> {
pub async fn build_prompt_messages(
&self,
mut headroom_sidecar: Option<&mut HeadroomSidecar>,
) -> Vec<serde_json::Value> {
let mut messages: Vec<serde_json::Value> = Vec::new();
// Region 1: Immutable system prefix
@ -555,11 +559,18 @@ impl Session {
}));
}
SessionEntry::ToolResult { name, result } => {
let raw = result.to_string();
let content = match headroom_sidecar {
Some(ref mut sidecar) => {
sidecar.compress(&raw, name).await.unwrap_or(raw)
}
None => raw,
};
messages.push(serde_json::json!({
"role": "tool",
"tool_call_id": format!("call_{}", turn.index),
"name": name,
"content": result.to_string(),
"content": content,
}));
}
SessionEntry::Compaction { summary, .. } => {

View file

@ -294,6 +294,11 @@ async fn cmd_status(state: &SharedState) -> ColibriResponse {
"last_warm_cache_hit": last_warm_hit,
"last_warm_hit_tokens": last_warm_tokens,
},
"headroom": {
"enabled": state.config.headroom_enabled,
"socket": state.config.headroom_socket_path,
"connected": state.headroom_sidecar.read().await.is_some(),
},
}))
}
@ -463,7 +468,13 @@ async fn cmd_get_session(state: &SharedState, session_id: String) -> ColibriResp
match state.sessions.get(&session_id) {
Some(session) => {
let turns = session.value().turns().await;
let messages = session.value().build_prompt_messages().await;
let messages = {
let mut sidecar_guard = state.headroom_sidecar.write().await;
session
.value()
.build_prompt_messages((*sidecar_guard).as_mut())
.await
};
ColibriResponse::ok(serde_json::json!({
"session_id": session_id,
@ -701,6 +712,8 @@ mod tests {
scheduler_prompt_injection: true,
cache_warming_enabled: false,
cache_warming_interval_hours: 0,
headroom_enabled: false,
headroom_socket_path: std::path::PathBuf::from("/var/run/colibri/headroom.sock"),
}
}

70
docs/HEADROOM-SIDECAR.md Normal file
View file

@ -0,0 +1,70 @@
# Headroom sidecar for Colibri
Colibri can optionally ask a local `headroom-ai` sidecar to compress tool results
before they are rendered through the session read path.
This is **off by default**. The daemon does not start or supervise Headroom; it
only connects to an already-running Unix socket when explicitly enabled.
## Configuration
```sh
COLIBRI_HEADROOM_ENABLED=1
COLIBRI_HEADROOM_SOCKET=/var/run/colibri/headroom.sock # optional default
```
Start the sidecar with a Python environment that can import `headroom`:
```sh
python3 scripts/headroom-sidecar.py --socket /var/run/colibri/headroom.sock
```
Then start `colibri-daemon` with the environment above. Check connection state
with `colibri status`; the status JSON includes:
```json
{
"headroom": {
"enabled": true,
"socket": "/var/run/colibri/headroom.sock",
"connected": true
}
}
```
## Protocol
The daemon keeps one Unix-socket connection open and sends newline-delimited JSON
requests. The sidecar must support multiple requests on that same accepted
connection.
Request:
```json
{"id":"tool-name","raw":"tool output text","role":"tool"}
```
Response:
```json
{"id":"tool-name","compressed":"shorter text","tokens_before":100,"tokens_after":25}
```
If `tokens_after >= tokens_before`, Colibri keeps the original tool result. If
the sidecar errors, disconnects, or times out, Colibri degrades to the original
uncompressed content.
Current daemon timeout: 5 seconds per sidecar request.
## Scope and caveats
- Compression is opt-in and best-effort.
- The sidecar currently affects session prompt/message rendering paths that pass
a `HeadroomSidecar`; it is not a global daemon-wide replacement for existing
cost-mode compaction.
- Keep `COLIBRI_HEADROOM_ENABLED=0` for ISO/live-USB defaults unless Headroom is
staged and smoke-tested on the target image.
- FreeBSD: upstream `headroom-ai` may require local packaging work because some
ONNX/ORT-backed extras do not have FreeBSD prebuilt binaries. Use a known-good
Python environment and validate `scripts/headroom-sidecar.py` directly before
enabling the daemon flag.

163
scripts/headroom-sidecar.py Normal file
View file

@ -0,0 +1,163 @@
#!/usr/bin/env python3
"""Headroom compression sidecar for Colibri daemon.
Listens on a Unix domain socket. Accepts newline-delimited JSON requests,
compresses the tool output via headroom, and returns the compressed result.
Protocol:
{"id": "uuid", "raw": "tool output text...", "role": "tool"}\n
{"id": "uuid", "compressed": "...", "tokens_before": N, "tokens_after": M}\n
Exit codes:
0 clean shutdown (SIGTERM)
1 startup failure (socket bind error, import error)
2 runtime panic (unhandled exception in event loop)
"""
import argparse
import json
import os
import signal
import socket
import sys
import uuid
SOCKET_BACKLOG = 5
MAX_MESSAGE_BYTES = 1_048_576 # 1 MiB
def main():
parser = argparse.ArgumentParser(description="Headroom compression sidecar")
parser.add_argument(
"--socket",
default="/var/run/colibri/headroom.sock",
help="Unix socket path",
)
args = parser.parse_args()
sock_path = args.socket
_ensure_socket_dir(sock_path)
# Late import so --help is fast
from headroom import compress
running = True
def _handle_signal(signum, frame):
nonlocal running
running = False
signal.signal(signal.SIGTERM, _handle_signal)
signal.signal(signal.SIGINT, _handle_signal)
_remove_stale_socket(sock_path)
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server.bind(sock_path)
os.chmod(sock_path, 0o660)
server.listen(SOCKET_BACKLOG)
server.settimeout(1.0) # wake up every second to check shutdown flag
print(f"headroom-sidecar listening on {sock_path}", flush=True)
while running:
try:
conn, _addr = server.accept()
except socket.timeout:
continue
try:
_handle_connection(conn, compress)
except Exception:
# Don't crash the whole server on one bad message
pass
finally:
conn.close()
_remove_stale_socket(sock_path)
return 0
def _ensure_socket_dir(sock_path: str) -> None:
sock_dir = os.path.dirname(sock_path)
os.makedirs(sock_dir, exist_ok=True)
def _remove_stale_socket(sock_path: str) -> None:
try:
os.unlink(sock_path)
except OSError:
pass
def _handle_connection(conn: socket.socket, compress_fn) -> None:
"""Read newline-delimited requests until the client disconnects.
Colibri keeps one sidecar connection open and reuses it across tool results,
so this handler must process more than one request per accepted socket.
"""
buf = b""
while True:
try:
chunk = conn.recv(4096)
except (ConnectionResetError, BrokenPipeError):
return
if not chunk:
return
buf += chunk
if len(buf) > MAX_MESSAGE_BYTES and b"\n" not in buf:
_write_error(conn, f"message too large (>{MAX_MESSAGE_BYTES} bytes)")
return
while b"\n" in buf:
line, buf = buf.split(b"\n", 1)
if not line:
continue
if len(line) > MAX_MESSAGE_BYTES:
_write_error(conn, f"message too large (>{MAX_MESSAGE_BYTES} bytes)")
return
if not _handle_request_line(conn, line, compress_fn):
return
def _handle_request_line(conn: socket.socket, line: bytes, compress_fn) -> bool:
"""Handle one JSON request line. Returns False when the connection is dead."""
try:
request = json.loads(line)
except json.JSONDecodeError:
return _write_error(conn, "invalid json")
req_id = request.get("id", str(uuid.uuid4()))
raw = request.get("raw", "")
role = request.get("role", "tool")
try:
messages = [{"role": role, "content": raw}]
result = compress_fn(messages)
response = {
"id": req_id,
"compressed": result.messages[0]["content"],
"tokens_before": result.tokens_before,
"tokens_after": result.tokens_after,
}
except Exception as exc:
return _write_error(conn, str(exc))
try:
conn.sendall(json.dumps(response).encode() + b"\n")
return True
except (BrokenPipeError, ConnectionResetError):
return False
def _write_error(conn: socket.socket, message: str) -> bool:
try:
conn.sendall(json.dumps({"error": message}).encode() + b"\n")
return True
except (BrokenPipeError, ConnectionResetError):
return False
if __name__ == "__main__":
sys.exit(main())