diff --git a/crates/colibri-client/src/bin/colibri_test_agent.rs b/crates/colibri-client/src/bin/colibri_test_agent.rs index 7db2045..6986da3 100644 --- a/crates/colibri-client/src/bin/colibri_test_agent.rs +++ b/crates/colibri-client/src/bin/colibri_test_agent.rs @@ -8,9 +8,10 @@ use std::{ fn usage() -> &'static str { r#"Usage: - colibri-test-agent [--session-id ID] [--cwd PATH] [--step-ms MS] [--hold-secs SECONDS] + colibri-test-agent [--session-id ID] [--cwd PATH] [--step-ms MS] [--hold-secs SECONDS] [--emit-usage] Emits deterministic Pi-compatible JSONL for local colibri-daemon startup checks. +With --emit-usage, appends a zot-compatible usage event for cost-tracking tests. "# } @@ -20,6 +21,7 @@ struct Options { cwd: String, step: Duration, hold: Duration, + emit_usage: bool, } impl Default for Options { @@ -32,6 +34,7 @@ impl Default for Options { .unwrap_or_else(|| "/tmp".to_string()), step: Duration::from_secs(1), hold: Duration::from_secs(30), + emit_usage: false, } } } @@ -84,6 +87,10 @@ where options.hold = Duration::from_secs(seconds); i += 2; } + "--emit-usage" => { + options.emit_usage = true; + i += 1; + } other => return Err(format!("unknown option: {other}\n\n{}", usage())), } } @@ -114,8 +121,24 @@ fn emit_jsonl(options: &Options) -> io::Result<()> { thread::sleep(options.step); write_event(&mut stdout, serde_json::json!({"type":"turn_end"}))?; - thread::sleep(options.hold); + // Emit a zot-compatible usage event for cost-tracking integration tests. + if options.emit_usage { + thread::sleep(options.step); + write_event( + &mut stdout, + serde_json::json!({ + "type": "usage", + "input": 150, + "output": 80, + "cache_read": 200, + "cache_write": 50, + "cost_usd": 0.0042 + }), + )?; + } + + thread::sleep(options.hold); Ok(()) } @@ -170,6 +193,12 @@ mod tests { assert_eq!(options.hold, Duration::from_secs(2)); } + #[test] + fn parses_emit_usage_flag() { + let options = parse_args(["--emit-usage"]).unwrap(); + assert!(options.emit_usage); + } + #[test] fn write_event_serializes_jsonl() { let mut bytes = Vec::new(); diff --git a/crates/colibri-client/tests/live_socket_check.rs b/crates/colibri-client/tests/live_socket_check.rs index 9a6059b..e1305bb 100644 --- a/crates/colibri-client/tests/live_socket_check.rs +++ b/crates/colibri-client/tests/live_socket_check.rs @@ -432,3 +432,99 @@ async fn register_tenant_and_list_over_socket() { server.await.unwrap(); let _ = tokio::fs::remove_dir_all(config.data_dir).await; } + +#[tokio::test] +async fn spawn_agent_with_usage_captures_task_cost() { + let mut config = check_config(); + let sample_agent = env!("CARGO_BIN_EXE_colibri-test-agent"); + std::env::set_var("COLIBRI_AGENT_BINARY", &sample_agent); + config.data_dir = + std::env::temp_dir().join(format!("colibri-cost-test-{}", Uuid::new_v4())); + tokio::fs::create_dir_all(&config.data_dir).await.unwrap(); + + let state: SharedState = Arc::new(DaemonState::new(config.clone())); + let shutdown = state.shutdown_rx.resubscribe(); + let server_state = state.clone(); + let server = tokio::spawn(async move { + let _ = socket::serve(server_state, shutdown).await; + }); + + let client = DaemonClient::new(config.socket_path.clone()); + wait_for_socket(&client).await; + + // Register agent + create/claim task (same flow as poll_tasks test) + let task_id = { + let store = state.store.lock().unwrap(); + store + .register_agent("cost-agent", serde_json::json!(["cost-track"]), None) + .unwrap(); + let task = store.create_task("cost-tracking-check", None).unwrap(); + let tid = task.id.clone(); + let agents = store.list_agents().unwrap(); + store.claim_task(&tid, &agents[0].id).unwrap(); + tid + }; + + // Spawn agent with --emit-usage via raw request (spawn_agent_with doesn't expose args) + let spawn_resp: serde_json::Value = client + .request(&colibri_daemon::ColibriCommand::SpawnAgent { + provider: "local".to_string(), + model: sample_agent.to_string(), + session_id: Some(format!("task-{task_id}")), + system_prompt: None, + local_args: Some(vec![ + "--session-id".to_string(), + task_id.clone(), + "--step-ms".to_string(), + "10".to_string(), + "--hold-secs".to_string(), + "1".to_string(), + "--emit-usage".to_string(), + ]), + jail: None, + }) + .await + .expect("spawn should succeed"); + let agent_id = spawn_resp["agent_id"].as_str().unwrap().to_string(); + + // Wait for agent to reach Done + let deadline = Instant::now() + Duration::from_secs(20); + loop { + let snap = client.glasspane_snapshot().await.unwrap(); + if snap.panes.iter().any(|p| p.state == AgentState::Done) { + break; + } + assert!( + Instant::now() < deadline, + "agent did not reach Done" + ); + tokio::time::sleep(Duration::from_millis(200)).await; + } + + // Agent reached Done — now agent holds 1s then exits per --hold-secs 1. + // Call heartbeat manually to detect exit and capture cost. + tokio::time::sleep(Duration::from_secs(2)).await; + colibri_daemon::daemon::heartbeat(&state, Duration::from_secs(30)).await; + + // Verify cost was stored on the task + let task = { + let store = state.store.lock().unwrap(); + store.get_task(&task_id).unwrap().expect("task should exist") + }; + + // The test agent emits: input=150, output=80, cache_read=200, cache_write=50, cost_usd=0.0042 + assert_eq!(task.input_tokens, 150, "input tokens"); + assert_eq!(task.output_tokens, 80, "output tokens"); + assert_eq!(task.cache_read_tokens, 200, "cache read tokens"); + assert_eq!(task.cache_write_tokens, 50, "cache write tokens"); + assert!( + (task.cost - 0.0042).abs() < 0.0001, + "cost should be 0.0042, got {}", + task.cost + ); + + let _ = state.shutdown_tx.send(()); + server.await.unwrap(); + std::env::remove_var("COLIBRI_AGENT_BINARY"); + let _ = tokio::fs::remove_dir_all(config.data_dir).await; +} diff --git a/crates/colibri-daemon/src/daemon.rs b/crates/colibri-daemon/src/daemon.rs index 4fcb346..1ecb1f4 100644 --- a/crates/colibri-daemon/src/daemon.rs +++ b/crates/colibri-daemon/src/daemon.rs @@ -225,7 +225,7 @@ pub async fn maybe_rewarm_cache(state: &SharedState) { // Heartbeat, rotation, handoff, polling // --------------------------------------------------------------------------- -async fn heartbeat(state: &SharedState, _stall_timeout: Duration) { +pub async fn heartbeat(state: &SharedState, _stall_timeout: Duration) { let session_count = state.sessions.len(); let agent_count = state.agents.len(); debug!( diff --git a/crates/colibri-glasspane/src/lib.rs b/crates/colibri-glasspane/src/lib.rs index fb78520..e71fdd5 100644 --- a/crates/colibri-glasspane/src/lib.rs +++ b/crates/colibri-glasspane/src/lib.rs @@ -338,12 +338,12 @@ impl PiJsonlIngestor { ) -> Option { let value: Value = serde_json::from_str(line.trim()).ok()?; - // Accumulate token usage from zot usage events (type: "usage"). + // Accumulate token usage from agent usage events (type: "usage"). // The state machine skips usage events for state-change purposes, // but we capture the cost data here before that skip. - if matches!(self.runtime, AgentRuntime::Zot) - && value.get("type").and_then(Value::as_str) == Some("usage") - { + // Originally Zot-only — now any runtime can emit usage events + // for cost tracking across all agent harnesses. + if value.get("type").and_then(Value::as_str) == Some("usage") { if let Some(input) = value.get("input").and_then(Value::as_u64) { self.usage.input_tokens += input; } @@ -1217,14 +1217,15 @@ mod zot_runtime_tests { assert_eq!(ingestor.usage.cache_write_tokens, 15); assert!((ingestor.usage.cost() - 0.0035).abs() < 0.0001); - // Pi usage events should be skipped (only Zot accumulates) + // Pi usage events should also accumulate now (runtime-agnostic cost tracking) let mut pi_ingestor = PiJsonlIngestor::default(); // defaults to Pi pi_ingestor.ingest_line_at( r#"{"type":"usage","input":999,"output":999,"cost_usd":99.0}"#, now, ); - assert_eq!(pi_ingestor.usage.input_tokens, 0); - assert_eq!(pi_ingestor.usage.cost(), 0.0); + // All runtimes now accumulate usage for cost tracking + assert_eq!(pi_ingestor.usage.input_tokens, 999); + assert!((pi_ingestor.usage.cost() - 99.0).abs() < 0.01); } #[test]