diff --git a/crates/colibri-client/src/lib.rs b/crates/colibri-client/src/lib.rs index 52c0140..eddb99b 100644 --- a/crates/colibri-client/src/lib.rs +++ b/crates/colibri-client/src/lib.rs @@ -170,6 +170,16 @@ impl DaemonClient { self.request(&ColibriCommand::ListTasks { status }).await } + pub async fn get_task_eval( + &self, + task_id: impl Into, + ) -> Result { + self.request(&ColibriCommand::GetTaskEval { + task_id: task_id.into(), + }) + .await + } + pub async fn create_task( &self, title: impl Into, diff --git a/crates/colibri-daemon/src/daemon.rs b/crates/colibri-daemon/src/daemon.rs index 6688526..d9710f6 100644 --- a/crates/colibri-daemon/src/daemon.rs +++ b/crates/colibri-daemon/src/daemon.rs @@ -271,46 +271,21 @@ pub async fn heartbeat(state: &SharedState, _stall_timeout: Duration) { }); (u, proof) }; - if let Some(u) = usage { - let tc = colibri_store::TaskCost { - provider: Some(handle.config.provider.as_str().to_string()), - model: Some(handle.config.model.clone()), - input_tokens: u.input_tokens, - output_tokens: u.output_tokens, - cache_read_tokens: u.cache_read_tokens, - cache_write_tokens: u.cache_write_tokens, - cost: u.cost(), + if let Some(usage) = usage { + // T2.x: central hook point for post-task accounting. + // Called from heartbeat when an agent exits (poll_exit), and from + // future RPC-completion paths for persistent agents. + let completion = TaskCompletion { + task_id: task_id.to_string(), + agent_id: handle.id.clone(), + provider: handle.config.provider.as_str().to_string(), + model: handle.config.model.clone(), success: status.success(), + usage, + proof_text, }; - let store = match state.store.try_lock() { - Ok(s) => s, - Err(std::sync::TryLockError::WouldBlock) => { - warn!(task_id = %task_id, "store lock busy; cost capture deferred to next heartbeat"); - continue; - } - Err(std::sync::TryLockError::Poisoned(e)) => { - warn!(task_id = %task_id, "store lock poisoned (prior panic); recovering inner store"); - e.into_inner() - } - }; - match store.set_task_cost(task_id, &tc) { - Ok(t) => { - info!( - task_id = %task_id, - input_tokens = u.input_tokens, - output_tokens = u.output_tokens, - cache_read = u.cache_read_tokens, - cost = u.cost(), - success = tc.success, - status = %t.status.as_str(), - "task cost captured" - ); - // Best-effort push to mother for dashboard aggregation. - push_cost_to_mother(task_id, &tc, proof_text.as_deref()); - } - Err(e) => { - warn!(task_id = %task_id, error = %e, "failed to write task cost") - } + if let Err(e) = record_task_completion(state, &completion).await { + warn!(task_id = %task_id, error = %e, "failed to record task completion"); } } } @@ -326,6 +301,117 @@ pub async fn heartbeat(state: &SharedState, _stall_timeout: Duration) { } } +/// Data describing a completed task (for eval harness + cost capture). +/// +/// Consolidates arguments to `record_task_completion` so the function +/// signature is stable as new fields are added (e.g. eval_mode, eval_latency_ms). +/// Caller owns a `TaskCompletion`, daemon takes ownership and persists. +#[derive(Debug, Clone)] +pub struct TaskCompletion { + pub task_id: String, + pub agent_id: String, + pub provider: String, + pub model: String, + pub success: bool, + pub usage: colibri_glasspane::PaneUsage, + pub proof_text: Option, +} + +/// Record task completion: write cost + eval records. +/// +/// This is the central hook point for post-task accounting. Called from +/// `heartbeat` when an agent process exits (`poll_exit`), and callable +/// from future RPC-completion paths (e.g., persistent agents finishing +/// via RPC instead of process exit). +/// +/// Writes two records: +/// - `task_costs`: usage tokens, provider, model, success status +/// - `task_evals`: T2.x quality signal (agent self-report for now) +/// +/// Returns Ok(true) if cost was written, Ok(false) if deferred, Err on failure. +async fn record_task_completion(state: &SharedState, tc: &TaskCompletion) -> Result { + let TaskCompletion { + task_id, + agent_id, + provider, + model, + success, + usage, + proof_text, + } = tc; + // Build task cost record + let task_cost = colibri_store::TaskCost { + provider: Some(provider.clone()), + model: Some(model.clone()), + input_tokens: usage.input_tokens, + output_tokens: usage.output_tokens, + cache_read_tokens: usage.cache_read_tokens, + cache_write_tokens: usage.cache_write_tokens, + cost: usage.cost(), + success: *success, + }; + + // Acquire store lock + let store_result = match state.store.try_lock() { + Ok(s) => s, + Err(std::sync::TryLockError::WouldBlock) => { + warn!(task_id = %task_id, "store lock busy; completion capture deferred to next heartbeat"); + return Ok(false); + } + Err(std::sync::TryLockError::Poisoned(e)) => { + warn!(task_id = %task_id, "store lock poisoned (prior panic); recovering inner store"); + e.into_inner() + } + }; + + // Write task cost + match store_result.set_task_cost(task_id, &task_cost) { + Ok(t) => { + info!( + task_id = %task_id, + input_tokens = usage.input_tokens, + output_tokens = usage.output_tokens, + cache_read = usage.cache_read_tokens, + cost = usage.cost(), + success = *success, + status = %t.status.as_str(), + "task cost captured" + ); + + // T2.x: Write eval record (agent self-report mode) + let eval = colibri_store::TaskEval { + task_id: task_id.to_string(), + agent_id: Some(agent_id.to_string()), + eval_mode: "agent".to_string(), + completion_status: if *success { "success" } else { "fail" }.to_string(), + quality_score: Some(if *success { 1.0 } else { 0.0 }), + correctness_check: if *success { "pass" } else { "fail" }.to_string(), + eval_provider: Some(format!("{}:{}", provider, model)), + eval_latency_ms: None, // Future: track task duration + eval_cost_usd: 0.0, // Self-report is free + evaluated_at: chrono::Utc::now().to_rfc3339(), + }; + match store_result.write_task_eval(&eval) { + Ok(_) => { + debug!(task_id = %task_id, "task eval recorded (agent self-report)"); + } + Err(e) => { + warn!(task_id = %task_id, error = %e, "failed to write task eval"); + } + } + + // Best-effort push to mother for dashboard aggregation + push_cost_to_mother(task_id, &task_cost, proof_text.as_deref()); + + Ok(true) + } + Err(e) => { + warn!(task_id = %task_id, error = %e, "failed to write task cost"); + Err(format!("failed to write task cost: {}", e)) + } + } +} + /// Best-effort push of task cost to mother node for dashboard aggregation. /// /// SSH's to mother with `report-task-cost` forced command, pipes TaskCost JSON diff --git a/crates/colibri-daemon/src/lib.rs b/crates/colibri-daemon/src/lib.rs index e569f05..7bd554f 100644 --- a/crates/colibri-daemon/src/lib.rs +++ b/crates/colibri-daemon/src/lib.rs @@ -69,6 +69,8 @@ pub enum ColibriCommand { TransitionTask { task_id: String, status: String }, #[serde(rename = "claim-task")] ClaimTask { task_id: String, agent_id: String }, + #[serde(rename = "get-task-eval")] + GetTaskEval { task_id: String }, #[serde(rename = "list-agents")] ListAgents, #[serde(rename = "register-agent")] diff --git a/crates/colibri-daemon/src/socket.rs b/crates/colibri-daemon/src/socket.rs index 59fd3d5..70abbea 100644 --- a/crates/colibri-daemon/src/socket.rs +++ b/crates/colibri-daemon/src/socket.rs @@ -263,6 +263,7 @@ async fn dispatch(cmd: ColibriCommand, state: &SharedState) -> ColibriResponse { ColibriCommand::ClaimTask { task_id, agent_id } => { cmd_claim_task(state, task_id, agent_id).await } + ColibriCommand::GetTaskEval { task_id } => cmd_get_task_eval(state, task_id).await, ColibriCommand::ListAgents => cmd_list_agents(state).await, ColibriCommand::RegisterAgent { name, @@ -1285,6 +1286,14 @@ async fn cmd_claim_task(state: &SharedState, task_id: String, agent_id: String) } } +async fn cmd_get_task_eval(state: &SharedState, task_id: String) -> ColibriResponse { + match state.store.lock().unwrap().read_task_eval(&task_id) { + Ok(Some(eval)) => ColibriResponse::ok(serde_json::to_value(eval).unwrap_or_default()), + Ok(None) => ColibriResponse::err(format!("no eval found for task: {task_id}")), + Err(e) => ColibriResponse::err(format!("read task eval failed: {e}")), + } +} + async fn cmd_list_agents(state: &SharedState) -> ColibriResponse { match state.store.lock().unwrap().list_agents() { Ok(agents) => ColibriResponse::ok(serde_json::to_value(agents).unwrap_or_default()), diff --git a/crates/colibri-mcp/src/lib.rs b/crates/colibri-mcp/src/lib.rs index 50c1c50..f38b045 100644 --- a/crates/colibri-mcp/src/lib.rs +++ b/crates/colibri-mcp/src/lib.rs @@ -104,6 +104,20 @@ pub fn tool_list() -> Vec { } })), ), + json_tool( + "colibri_get_task_eval", + "Get task evaluation record with quality metrics and eval metadata", + Some(serde_json::json!({ + "type": "object", + "properties": { + "task_id": { + "type": "string", + "description": "Task ID to query evaluation for" + } + }, + "required": ["task_id"] + })), + ), json_tool( "colibri_list_skills", "List registered skills in the Colibri catalog", @@ -329,6 +343,14 @@ pub async fn dispatch_tool( let data = client.list_tasks(status).await.map_err(map_client_error)?; Ok(tool_text(data)) } + "colibri_get_task_eval" => { + let task_id = require_string(arguments, "task_id")?; + let data = client + .get_task_eval(task_id) + .await + .map_err(map_client_error)?; + Ok(tool_text(data)) + } "colibri_list_skills" => { let data = client.list_skills().await.map_err(map_client_error)?; Ok(tool_text(data)) diff --git a/crates/colibri-mcp/tests/tool_dispatch.rs b/crates/colibri-mcp/tests/tool_dispatch.rs index 3bf99ea..99b7dbf 100644 --- a/crates/colibri-mcp/tests/tool_dispatch.rs +++ b/crates/colibri-mcp/tests/tool_dispatch.rs @@ -255,6 +255,7 @@ fn tool_list_has_all_phase1_tools() { assert!(names.contains(&"colibri_list_task_costs")); assert!(names.contains(&"colibri_get_task")); + assert!(names.contains(&"colibri_get_task_eval")); assert!(names.contains(&"colibri_deploy_run")); assert!(names.contains(&"colibri_deploy_targets")); assert!(names.contains(&"colibri_wiki_search")); @@ -263,7 +264,7 @@ fn tool_list_has_all_phase1_tools() { assert!(names.contains(&"colibri_zfs_destroy_snapshot")); assert!(names.contains(&"colibri_pf_list_rules")); assert!(names.contains(&"colibri_pf_list_states")); - assert_eq!(names.len(), 20); + assert_eq!(names.len(), 21); } #[tokio::test] diff --git a/crates/colibri-store/src/lib.rs b/crates/colibri-store/src/lib.rs index 1794999..1fae620 100644 --- a/crates/colibri-store/src/lib.rs +++ b/crates/colibri-store/src/lib.rs @@ -191,6 +191,66 @@ pub struct TaskCost { pub success: bool, } +/// Multidimensional task evaluation record (T2.x eval harness). +/// +/// Records how well a task succeeded beyond just "exit code 0". The eval +/// harness populates this after task completion — either from agent +/// self-report, local LLM evaluation, cloud LLM evaluation, or skipped +/// (exit-code-only signal). +/// +/// Schema constraints (enforced by SQLite CHECK): +/// - `eval_mode` ∈ 'agent' | 'local-llm' | 'cloud-llm' | 'skipped' +/// - `completion_status` ∈ 'success' | 'partial' | 'fail' | 'silent-fail' +/// - `correctness_check` ∈ 'pass' | 'fail' | 'skipped' +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TaskEval { + pub task_id: String, + /// The agent that ran the task. None if the completing agent isn't + /// linked to a registered agent row (e.g. fire-and-forget tasks). + pub agent_id: Option, + /// How this eval was produced: 'agent' (self-report), 'local-llm', + /// 'cloud-llm', or 'skipped' (exit code only, no quality signal). + pub eval_mode: String, + /// Whether the task completed: 'success', 'partial', 'fail', or + /// 'silent-fail' (exit 0 but output was garbage). + pub completion_status: String, + /// Quality score 0.0..=1.0, or None if unverifiable. + pub quality_score: Option, + /// Output correctness: 'pass', 'fail', or 'skipped' if not checked. + pub correctness_check: String, + /// Provider:model that ran the eval (e.g. 'deepseek:deepseek-chat' + /// for agent self-report, 'local-deepseek-r1-7b' for local eval). + /// None for 'skipped' mode. + pub eval_provider: Option, + /// Eval latency in milliseconds. None for instant modes. + pub eval_latency_ms: Option, + /// Cost in USD to produce this eval. 0.0 for 'agent' and 'skipped'. + pub eval_cost_usd: f64, + /// When this eval was recorded (RFC3339 timestamp). + pub evaluated_at: String, +} + +/// Per-provider rollup of eval data, used by model-selection (Phase 3). +/// +/// Computed by `Store::eval_summary`. Counts and scores cover only +/// tasks with a non-null quality_score within the query window. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EvalSummary { + /// The eval provider string (e.g. 'deepseek:deepseek-chat', + /// 'local-deepseek-r1-7b', or None if the eval_provider was unset). + pub eval_provider: Option, + /// Number of tasks evaluated by this provider in the window. + pub count: u64, + /// Mean quality score across those tasks (0.0..=1.0), or None if zero. + pub avg_quality: Option, + /// Number of those tasks whose completion_status was 'success'. + pub n_success: u64, + /// Mean eval cost in USD. + pub avg_cost: f64, + /// Mean eval latency in milliseconds, or None if no timed evals. + pub avg_latency_ms: Option, +} + impl Store { /// Open (or create) the store at `db_path`. Runs migrations automatically. pub fn open(db_path: impl Into) -> Result { @@ -338,6 +398,118 @@ impl Store { .ok_or_else(|| StoreError::NotFound(task_id.to_string())) } + /// Write a task evaluation record (T2.x eval harness). + /// + /// Uses `INSERT OR REPLACE` so a higher-quality follow-up eval + /// (e.g. local-llm upgrade after initial 'skipped') can overwrite + /// the existing record for the same task_id. + pub fn write_task_eval(&self, te: &TaskEval) -> Result<()> { + self.conn.execute( + "INSERT OR REPLACE INTO task_evals + (task_id, agent_id, eval_mode, completion_status, quality_score, + correctness_check, eval_provider, eval_latency_ms, + eval_cost_usd, evaluated_at) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)", + params![ + te.task_id, + te.agent_id, + te.eval_mode, + te.completion_status, + te.quality_score, + te.correctness_check, + te.eval_provider, + te.eval_latency_ms.map(|v| v as i64), + te.eval_cost_usd, + te.evaluated_at, + ], + )?; + Ok(()) + } + + /// Read the evaluation for a specific task, if any. + pub fn read_task_eval(&self, task_id: &str) -> Result> { + let mut stmt = self.conn.prepare( + "SELECT task_id, agent_id, eval_mode, completion_status, + quality_score, correctness_check, eval_provider, + eval_latency_ms, eval_cost_usd, evaluated_at + FROM task_evals WHERE task_id = ?1 LIMIT 1", + )?; + let mut rows = stmt.query_map(params![task_id], row_to_task_eval)?; + match rows.next() { + Some(Ok(te)) => Ok(Some(te)), + Some(Err(e)) => Err(StoreError::Database(e)), + None => Ok(None), + } + } + + /// List all evals for an agent, newest first. + /// + /// Used by the model-selection layer (Phase 3) to compute + /// per-agent success rates for routing decisions. + pub fn list_task_evals_by_agent(&self, agent_id: &str) -> Result> { + let mut stmt = self.conn.prepare( + "SELECT task_id, agent_id, eval_mode, completion_status, + quality_score, correctness_check, eval_provider, + eval_latency_ms, eval_cost_usd, evaluated_at + FROM task_evals WHERE agent_id = ?1 + ORDER BY evaluated_at DESC", + )?; + let mut result = Vec::new(); + for row in stmt.query_map(params![agent_id], row_to_task_eval)? { + result.push(row?); + } + Ok(result) + } + + /// List all task evals, newest first. + pub fn list_all_task_evals(&self) -> Result> { + let mut stmt = self.conn.prepare( + "SELECT task_id, agent_id, eval_mode, completion_status, + quality_score, correctness_check, eval_provider, + eval_latency_ms, eval_cost_usd, evaluated_at + FROM task_evals ORDER BY evaluated_at DESC", + )?; + let mut result = Vec::new(); + for row in stmt.query_map([], row_to_task_eval)? { + result.push(row?); + } + Ok(result) + } + + /// Aggregate success rates for eval-driven model selection (Phase 3). + /// + /// Returns, per `eval_provider`, the count of evaluated tasks and the + /// average quality score over the last `window_hours`. Only tasks with + /// a non-null quality_score contribute. + pub fn eval_summary(&self, window_hours: u64) -> Result> { + let cutoff = (Utc::now() - chrono::Duration::hours(window_hours as i64)).to_rfc3339(); + let mut stmt = self.conn.prepare( + "SELECT eval_provider, + COUNT(*) as n, + AVG(quality_score) as avg_quality, + SUM(CASE WHEN completion_status = 'success' THEN 1 ELSE 0 END) as n_success, + AVG(eval_cost_usd) as avg_cost, + AVG(eval_latency_ms) as avg_latency_ms + FROM task_evals + WHERE evaluated_at >= ?1 AND quality_score IS NOT NULL + GROUP BY eval_provider", + )?; + let mut out = Vec::new(); + for row in stmt.query_map(params![cutoff], |r| { + Ok(EvalSummary { + eval_provider: r.get(0)?, + count: r.get::<_, i64>(1)? as u64, + avg_quality: r.get::<_, f64>(2).ok(), + n_success: r.get::<_, i64>(3)? as u64, + avg_cost: r.get::<_, f64>(4)?, + avg_latency_ms: r.get::<_, i64>(5).ok().map(|v| v as u64), + }) + })? { + out.push(row?); + } + Ok(out) + } + /// Assign a task to an agent (status → Claimed). /// /// The UPDATE is guarded on `status = 'queued'`, so the claim is atomic and @@ -675,6 +847,7 @@ impl Store { "agents": self.list_agents()?, "skills": self.list_skills()?, "tenants": self.list_tenants()?, + "task_evals": self.list_all_task_evals()?, "exported_at": Utc::now().to_rfc3339(), })) } @@ -721,6 +894,21 @@ fn row_to_task(row: &rusqlite::Row<'_>) -> rusqlite::Result { }) } +fn row_to_task_eval(row: &rusqlite::Row<'_>) -> rusqlite::Result { + Ok(TaskEval { + task_id: row.get(0)?, + agent_id: row.get(1)?, + eval_mode: row.get(2)?, + completion_status: row.get(3)?, + quality_score: row.get::<_, f64>(4).ok(), + correctness_check: row.get(5)?, + eval_provider: row.get(6)?, + eval_latency_ms: row.get::<_, i64>(7).ok().map(|v| v as u64), + eval_cost_usd: row.get::<_, f64>(8).unwrap_or(0.0), + evaluated_at: row.get(9)?, + }) +} + // --------------------------------------------------------------------------- // Default database path (platform-aware) // --------------------------------------------------------------------------- @@ -1154,4 +1342,193 @@ mod tests { let path = default_db_path(); assert!(path.to_string_lossy().contains("colibri.sqlite")); } + + // ------------------------------------------------------------------ + // Task evaluations (T2.x eval harness) + // ------------------------------------------------------------------ + + /// Helper: build a minimal TaskEval at the current time. + fn fake_eval(task_id: &str, agent_id: &str, mode: &str, quality: f64) -> TaskEval { + TaskEval { + task_id: task_id.to_string(), + agent_id: Some(agent_id.to_string()), + eval_mode: mode.to_string(), + completion_status: if quality >= 0.7 { + "success".to_string() + } else { + "fail".to_string() + }, + quality_score: Some(quality), + correctness_check: if quality >= 0.7 { + "pass".to_string() + } else { + "fail".to_string() + }, + eval_provider: Some("test-provider".to_string()), + eval_latency_ms: Some(42), + eval_cost_usd: 0.001, + evaluated_at: Utc::now().to_rfc3339(), + } + } + + #[test] + fn test_task_eval_roundtrip() { + let store = Store::open_memory().unwrap(); + + // Task must exist (FK from evals) + let task = store.create_task("Eval test task", None).unwrap(); + let agent = store + .register_agent("zot-1", serde_json::json!(["shell", "freebsd"]), None) + .unwrap(); + + // Write an eval + let eval = fake_eval(&task.id, &agent.id, "agent", 0.9); + store.write_task_eval(&eval).unwrap(); + + // Read it back + let read = store.read_task_eval(&task.id).unwrap().unwrap(); + assert_eq!(read.task_id, task.id); + assert_eq!(read.agent_id, Some(agent.id.clone())); + assert_eq!(read.eval_mode, "agent"); + assert_eq!(read.completion_status, "success"); + assert_eq!(read.quality_score, Some(0.9)); + assert_eq!(read.correctness_check, "pass"); + assert_eq!(read.eval_provider, Some("test-provider".to_string())); + assert_eq!(read.eval_latency_ms, Some(42)); + assert_eq!(read.eval_cost_usd, 0.001); + + // Task not found → None + assert!(store.read_task_eval("nonexistent-task").unwrap().is_none()); + } + + #[test] + fn test_task_eval_insert_or_replace() { + let store = Store::open_memory().unwrap(); + let task = store.create_task("Upgrade eval", None).unwrap(); + let agent = store + .register_agent("agent-x", serde_json::json!(["code"]), None) + .unwrap(); + + // Initial eval: agent self-report + let initial = fake_eval(&task.id, &agent.id, "agent", 0.6); + store.write_task_eval(&initial).unwrap(); + assert_eq!( + store + .read_task_eval(&task.id) + .unwrap() + .unwrap() + .quality_score, + Some(0.6) + ); + + // Follow-up eval: local-llm upgrade (same task_id) + let follow = fake_eval(&task.id, &agent.id, "local-llm", 0.8); + store.write_task_eval(&follow).unwrap(); + + // Should have overwritten the initial record + let read = store.read_task_eval(&task.id).unwrap().unwrap(); + assert_eq!(read.eval_mode, "local-llm"); + assert_eq!(read.quality_score, Some(0.8)); + } + + #[test] + fn test_list_task_evals_by_agent_filter() { + let store = Store::open_memory().unwrap(); + let task_a = store.create_task("Task A", None).unwrap(); + let task_b = store.create_task("Task B", None).unwrap(); + let task_c = store.create_task("Task C", None).unwrap(); + + let agent_1 = store + .register_agent("agent-1", serde_json::json!(["shell"]), None) + .unwrap(); + let agent_2 = store + .register_agent("agent-2", serde_json::json!(["code"]), None) + .unwrap(); + + store + .write_task_eval(&fake_eval(&task_a.id, &agent_1.id, "agent", 0.9)) + .unwrap(); + store + .write_task_eval(&fake_eval(&task_b.id, &agent_2.id, "local-llm", 0.7)) + .unwrap(); + store + .write_task_eval(&fake_eval(&task_c.id, &agent_1.id, "agent", 0.5)) + .unwrap(); + + let agent1_evals = store.list_task_evals_by_agent(&agent_1.id).unwrap(); + assert_eq!(agent1_evals.len(), 2); + // Newest first + assert_eq!(agent1_evals[0].task_id, task_c.id); + assert_eq!(agent1_evals[1].task_id, task_a.id); + + let agent2_evals = store.list_task_evals_by_agent(&agent_2.id).unwrap(); + assert_eq!(agent2_evals.len(), 1); + assert_eq!(agent2_evals[0].eval_mode, "local-llm"); + } + + #[test] + fn test_eval_summary_aggregation() { + let store = Store::open_memory().unwrap(); + let task_a = store.create_task("Summary A", None).unwrap(); + let task_b = store.create_task("Summary B", None).unwrap(); + let task_c = store.create_task("Summary C", None).unwrap(); + + let agent = store + .register_agent("agent", serde_json::json!(["shell"]), None) + .unwrap(); + + // Two high-quality evals, one low-quality + store + .write_task_eval(&fake_eval(&task_a.id, &agent.id, "agent", 0.9)) + .unwrap(); + store + .write_task_eval(&fake_eval(&task_b.id, &agent.id, "agent", 0.8)) + .unwrap(); + store + .write_task_eval(&fake_eval(&task_c.id, &agent.id, "agent", 0.3)) + .unwrap(); + + let summary = store.eval_summary(24).unwrap(); + assert_eq!(summary.len(), 1); + assert_eq!(summary[0].count, 3); + assert_eq!(summary[0].n_success, 2); // quality >= 0.7 → 'success' + let avg_quality = summary[0].avg_quality.unwrap(); + assert!((avg_quality - 2.0 / 3.0).abs() < 0.01); + } + + #[test] + fn test_eval_schema_check_constraints() { + let store = Store::open_memory().unwrap(); + let task = store.create_task("Constraint test", None).unwrap(); + + // Invalid eval_mode should fail at INSERT (SQLite CHECK) + let bad = TaskEval { + task_id: task.id.clone(), + agent_id: None, + eval_mode: "invalid-mode".to_string(), + completion_status: "success".to_string(), + quality_score: Some(1.0), + correctness_check: "pass".to_string(), + eval_provider: None, + eval_latency_ms: None, + eval_cost_usd: 0.0, + evaluated_at: Utc::now().to_rfc3339(), + }; + assert!(store.write_task_eval(&bad).is_err()); + + // Invalid completion_status should fail + let bad2 = TaskEval { + task_id: task.id.clone(), + agent_id: None, + eval_mode: "agent".to_string(), + completion_status: "weird-status".to_string(), + quality_score: Some(1.0), + correctness_check: "pass".to_string(), + eval_provider: None, + eval_latency_ms: None, + eval_cost_usd: 0.0, + evaluated_at: Utc::now().to_rfc3339(), + }; + assert!(store.write_task_eval(&bad2).is_err()); + } } diff --git a/crates/colibri-store/src/schema.rs b/crates/colibri-store/src/schema.rs index 0470516..61d160b 100644 --- a/crates/colibri-store/src/schema.rs +++ b/crates/colibri-store/src/schema.rs @@ -58,6 +58,40 @@ CREATE TABLE IF NOT EXISTS tenants ( ); CREATE INDEX IF NOT EXISTS idx_tenants_status ON tenants(status); + +-- Task evaluations (T2.x): multi-dimensional quality signal. +-- +-- The canonical record of how well a task went beyond exit code. Populated +-- by the eval harness after task completion. Eval modes: +-- * 'agent' — agent self-reported via completion event +-- * 'local-llm' — local model evaluated the output +-- * 'cloud-llm' — cloud model evaluated the output +-- * 'skipped' — task succeeded (exit 0) but no quality signal +-- +-- This table is the single source of truth for eval data. The model- +-- selection layer (Phase 3) reads from here to build (model, task_type) +-- success-rate histories. +-- +-- FK to tasks(id) intentionally omitted — we don't want DELETE CASCADE +-- to lose eval history if a task row is cleared, and we don't want a +-- missing task row to prevent recording evals (fire-and-forget evals). +-- Referential alignment is the daemon's responsibility. +CREATE TABLE IF NOT EXISTS task_evals ( + task_id TEXT PRIMARY KEY NOT NULL, + agent_id TEXT, + eval_mode TEXT NOT NULL CHECK(eval_mode IN ('agent','local-llm','cloud-llm','skipped')), + completion_status TEXT NOT NULL CHECK(completion_status IN ('success','partial','fail','silent-fail')), + quality_score REAL, -- 0.0..=1.0, nullable if unverifiable + correctness_check TEXT NOT NULL CHECK(correctness_check IN ('pass','fail','skipped')), + eval_provider TEXT, -- e.g. 'deepseek:deepseek-chat', 'local-deepseek-r1-7b' + eval_latency_ms INTEGER, + eval_cost_usd REAL NOT NULL DEFAULT 0.0, + evaluated_at TEXT NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_evals_agent ON task_evals(agent_id); +CREATE INDEX IF NOT EXISTS idx_evals_mode ON task_evals(eval_mode); +CREATE INDEX IF NOT EXISTS idx_evals_at ON task_evals(evaluated_at); "; /// Column additions since the initial schema. Each runs inside a try-block