test(cost): end-to-end integration test + runtime-agnostic usage accumulation

New integration test: spawn_agent_with_usage_captures_task_cost
- Spawns colibri-test-agent with --emit-usage flag
- Agent emits zot-compatible usage event (input=150, output=80, cost=0.0042)
- Calls heartbeat() manually to capture cost (was private, now pub)
- Verifies all 8 cost fields are persisted on the task

Test agent changes:
- New --emit-usage flag emits usage JSONL event with deterministic values
- New parses_emit_usage_flag unit test

Glasspane change: usage accumulation was Zot-only — now all runtimes
accumulate (Pi, Local included). This enables cost tracking for any agent
harness that emits usage events. Updated zot_usage_accumulates test.

Sam & Hermes
This commit is contained in:
123kupola 2026-06-27 12:43:18 +02:00
parent 7d4198eb19
commit 5bf2ecb003
4 changed files with 136 additions and 10 deletions

View file

@ -8,9 +8,10 @@ use std::{
fn usage() -> &'static str {
r#"Usage:
colibri-test-agent [--session-id ID] [--cwd PATH] [--step-ms MS] [--hold-secs SECONDS]
colibri-test-agent [--session-id ID] [--cwd PATH] [--step-ms MS] [--hold-secs SECONDS] [--emit-usage]
Emits deterministic Pi-compatible JSONL for local colibri-daemon startup checks.
With --emit-usage, appends a zot-compatible usage event for cost-tracking tests.
"#
}
@ -20,6 +21,7 @@ struct Options {
cwd: String,
step: Duration,
hold: Duration,
emit_usage: bool,
}
impl Default for Options {
@ -32,6 +34,7 @@ impl Default for Options {
.unwrap_or_else(|| "/tmp".to_string()),
step: Duration::from_secs(1),
hold: Duration::from_secs(30),
emit_usage: false,
}
}
}
@ -84,6 +87,10 @@ where
options.hold = Duration::from_secs(seconds);
i += 2;
}
"--emit-usage" => {
options.emit_usage = true;
i += 1;
}
other => return Err(format!("unknown option: {other}\n\n{}", usage())),
}
}
@ -114,8 +121,24 @@ fn emit_jsonl(options: &Options) -> io::Result<()> {
thread::sleep(options.step);
write_event(&mut stdout, serde_json::json!({"type":"turn_end"}))?;
thread::sleep(options.hold);
// Emit a zot-compatible usage event for cost-tracking integration tests.
if options.emit_usage {
thread::sleep(options.step);
write_event(
&mut stdout,
serde_json::json!({
"type": "usage",
"input": 150,
"output": 80,
"cache_read": 200,
"cache_write": 50,
"cost_usd": 0.0042
}),
)?;
}
thread::sleep(options.hold);
Ok(())
}
@ -170,6 +193,12 @@ mod tests {
assert_eq!(options.hold, Duration::from_secs(2));
}
#[test]
fn parses_emit_usage_flag() {
let options = parse_args(["--emit-usage"]).unwrap();
assert!(options.emit_usage);
}
#[test]
fn write_event_serializes_jsonl() {
let mut bytes = Vec::new();

View file

@ -432,3 +432,99 @@ async fn register_tenant_and_list_over_socket() {
server.await.unwrap();
let _ = tokio::fs::remove_dir_all(config.data_dir).await;
}
#[tokio::test]
async fn spawn_agent_with_usage_captures_task_cost() {
let mut config = check_config();
let sample_agent = env!("CARGO_BIN_EXE_colibri-test-agent");
std::env::set_var("COLIBRI_AGENT_BINARY", &sample_agent);
config.data_dir =
std::env::temp_dir().join(format!("colibri-cost-test-{}", Uuid::new_v4()));
tokio::fs::create_dir_all(&config.data_dir).await.unwrap();
let state: SharedState = Arc::new(DaemonState::new(config.clone()));
let shutdown = state.shutdown_rx.resubscribe();
let server_state = state.clone();
let server = tokio::spawn(async move {
let _ = socket::serve(server_state, shutdown).await;
});
let client = DaemonClient::new(config.socket_path.clone());
wait_for_socket(&client).await;
// Register agent + create/claim task (same flow as poll_tasks test)
let task_id = {
let store = state.store.lock().unwrap();
store
.register_agent("cost-agent", serde_json::json!(["cost-track"]), None)
.unwrap();
let task = store.create_task("cost-tracking-check", None).unwrap();
let tid = task.id.clone();
let agents = store.list_agents().unwrap();
store.claim_task(&tid, &agents[0].id).unwrap();
tid
};
// Spawn agent with --emit-usage via raw request (spawn_agent_with doesn't expose args)
let spawn_resp: serde_json::Value = client
.request(&colibri_daemon::ColibriCommand::SpawnAgent {
provider: "local".to_string(),
model: sample_agent.to_string(),
session_id: Some(format!("task-{task_id}")),
system_prompt: None,
local_args: Some(vec![
"--session-id".to_string(),
task_id.clone(),
"--step-ms".to_string(),
"10".to_string(),
"--hold-secs".to_string(),
"1".to_string(),
"--emit-usage".to_string(),
]),
jail: None,
})
.await
.expect("spawn should succeed");
let agent_id = spawn_resp["agent_id"].as_str().unwrap().to_string();
// Wait for agent to reach Done
let deadline = Instant::now() + Duration::from_secs(20);
loop {
let snap = client.glasspane_snapshot().await.unwrap();
if snap.panes.iter().any(|p| p.state == AgentState::Done) {
break;
}
assert!(
Instant::now() < deadline,
"agent did not reach Done"
);
tokio::time::sleep(Duration::from_millis(200)).await;
}
// Agent reached Done — now agent holds 1s then exits per --hold-secs 1.
// Call heartbeat manually to detect exit and capture cost.
tokio::time::sleep(Duration::from_secs(2)).await;
colibri_daemon::daemon::heartbeat(&state, Duration::from_secs(30)).await;
// Verify cost was stored on the task
let task = {
let store = state.store.lock().unwrap();
store.get_task(&task_id).unwrap().expect("task should exist")
};
// The test agent emits: input=150, output=80, cache_read=200, cache_write=50, cost_usd=0.0042
assert_eq!(task.input_tokens, 150, "input tokens");
assert_eq!(task.output_tokens, 80, "output tokens");
assert_eq!(task.cache_read_tokens, 200, "cache read tokens");
assert_eq!(task.cache_write_tokens, 50, "cache write tokens");
assert!(
(task.cost - 0.0042).abs() < 0.0001,
"cost should be 0.0042, got {}",
task.cost
);
let _ = state.shutdown_tx.send(());
server.await.unwrap();
std::env::remove_var("COLIBRI_AGENT_BINARY");
let _ = tokio::fs::remove_dir_all(config.data_dir).await;
}

View file

@ -225,7 +225,7 @@ pub async fn maybe_rewarm_cache(state: &SharedState) {
// Heartbeat, rotation, handoff, polling
// ---------------------------------------------------------------------------
async fn heartbeat(state: &SharedState, _stall_timeout: Duration) {
pub async fn heartbeat(state: &SharedState, _stall_timeout: Duration) {
let session_count = state.sessions.len();
let agent_count = state.agents.len();
debug!(

View file

@ -338,12 +338,12 @@ impl PiJsonlIngestor {
) -> Option<PiStreamUpdate> {
let value: Value = serde_json::from_str(line.trim()).ok()?;
// Accumulate token usage from zot usage events (type: "usage").
// Accumulate token usage from agent usage events (type: "usage").
// The state machine skips usage events for state-change purposes,
// but we capture the cost data here before that skip.
if matches!(self.runtime, AgentRuntime::Zot)
&& value.get("type").and_then(Value::as_str) == Some("usage")
{
// Originally Zot-only — now any runtime can emit usage events
// for cost tracking across all agent harnesses.
if value.get("type").and_then(Value::as_str) == Some("usage") {
if let Some(input) = value.get("input").and_then(Value::as_u64) {
self.usage.input_tokens += input;
}
@ -1217,14 +1217,15 @@ mod zot_runtime_tests {
assert_eq!(ingestor.usage.cache_write_tokens, 15);
assert!((ingestor.usage.cost() - 0.0035).abs() < 0.0001);
// Pi usage events should be skipped (only Zot accumulates)
// Pi usage events should also accumulate now (runtime-agnostic cost tracking)
let mut pi_ingestor = PiJsonlIngestor::default(); // defaults to Pi
pi_ingestor.ingest_line_at(
r#"{"type":"usage","input":999,"output":999,"cost_usd":99.0}"#,
now,
);
assert_eq!(pi_ingestor.usage.input_tokens, 0);
assert_eq!(pi_ingestor.usage.cost(), 0.0);
// All runtimes now accumulate usage for cost tracking
assert_eq!(pi_ingestor.usage.input_tokens, 999);
assert!((pi_ingestor.usage.cost() - 99.0).abs() < 0.01);
}
#[test]