feat: T2.x eval harness + RPC task dispatch #264
8 changed files with 659 additions and 40 deletions
|
|
@ -170,6 +170,16 @@ impl DaemonClient {
|
|||
self.request(&ColibriCommand::ListTasks { status }).await
|
||||
}
|
||||
|
||||
pub async fn get_task_eval(
|
||||
&self,
|
||||
task_id: impl Into<String>,
|
||||
) -> Result<serde_json::Value, ClientError> {
|
||||
self.request(&ColibriCommand::GetTaskEval {
|
||||
task_id: task_id.into(),
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn create_task(
|
||||
&self,
|
||||
title: impl Into<String>,
|
||||
|
|
|
|||
|
|
@ -271,46 +271,21 @@ pub async fn heartbeat(state: &SharedState, _stall_timeout: Duration) {
|
|||
});
|
||||
(u, proof)
|
||||
};
|
||||
if let Some(u) = usage {
|
||||
let tc = colibri_store::TaskCost {
|
||||
provider: Some(handle.config.provider.as_str().to_string()),
|
||||
model: Some(handle.config.model.clone()),
|
||||
input_tokens: u.input_tokens,
|
||||
output_tokens: u.output_tokens,
|
||||
cache_read_tokens: u.cache_read_tokens,
|
||||
cache_write_tokens: u.cache_write_tokens,
|
||||
cost: u.cost(),
|
||||
if let Some(usage) = usage {
|
||||
// T2.x: central hook point for post-task accounting.
|
||||
// Called from heartbeat when an agent exits (poll_exit), and from
|
||||
// future RPC-completion paths for persistent agents.
|
||||
let completion = TaskCompletion {
|
||||
task_id: task_id.to_string(),
|
||||
agent_id: handle.id.clone(),
|
||||
provider: handle.config.provider.as_str().to_string(),
|
||||
model: handle.config.model.clone(),
|
||||
success: status.success(),
|
||||
usage,
|
||||
proof_text,
|
||||
};
|
||||
let store = match state.store.try_lock() {
|
||||
Ok(s) => s,
|
||||
Err(std::sync::TryLockError::WouldBlock) => {
|
||||
warn!(task_id = %task_id, "store lock busy; cost capture deferred to next heartbeat");
|
||||
continue;
|
||||
}
|
||||
Err(std::sync::TryLockError::Poisoned(e)) => {
|
||||
warn!(task_id = %task_id, "store lock poisoned (prior panic); recovering inner store");
|
||||
e.into_inner()
|
||||
}
|
||||
};
|
||||
match store.set_task_cost(task_id, &tc) {
|
||||
Ok(t) => {
|
||||
info!(
|
||||
task_id = %task_id,
|
||||
input_tokens = u.input_tokens,
|
||||
output_tokens = u.output_tokens,
|
||||
cache_read = u.cache_read_tokens,
|
||||
cost = u.cost(),
|
||||
success = tc.success,
|
||||
status = %t.status.as_str(),
|
||||
"task cost captured"
|
||||
);
|
||||
// Best-effort push to mother for dashboard aggregation.
|
||||
push_cost_to_mother(task_id, &tc, proof_text.as_deref());
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(task_id = %task_id, error = %e, "failed to write task cost")
|
||||
}
|
||||
if let Err(e) = record_task_completion(state, &completion).await {
|
||||
warn!(task_id = %task_id, error = %e, "failed to record task completion");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -326,6 +301,117 @@ pub async fn heartbeat(state: &SharedState, _stall_timeout: Duration) {
|
|||
}
|
||||
}
|
||||
|
||||
/// Data describing a completed task (for eval harness + cost capture).
|
||||
///
|
||||
/// Consolidates arguments to `record_task_completion` so the function
|
||||
/// signature is stable as new fields are added (e.g. eval_mode, eval_latency_ms).
|
||||
/// Caller owns a `TaskCompletion`, daemon takes ownership and persists.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TaskCompletion {
|
||||
pub task_id: String,
|
||||
pub agent_id: String,
|
||||
pub provider: String,
|
||||
pub model: String,
|
||||
pub success: bool,
|
||||
pub usage: colibri_glasspane::PaneUsage,
|
||||
pub proof_text: Option<String>,
|
||||
}
|
||||
|
||||
/// Record task completion: write cost + eval records.
|
||||
///
|
||||
/// This is the central hook point for post-task accounting. Called from
|
||||
/// `heartbeat` when an agent process exits (`poll_exit`), and callable
|
||||
/// from future RPC-completion paths (e.g., persistent agents finishing
|
||||
/// via RPC instead of process exit).
|
||||
///
|
||||
/// Writes two records:
|
||||
/// - `task_costs`: usage tokens, provider, model, success status
|
||||
/// - `task_evals`: T2.x quality signal (agent self-report for now)
|
||||
///
|
||||
/// Returns Ok(true) if cost was written, Ok(false) if deferred, Err on failure.
|
||||
async fn record_task_completion(state: &SharedState, tc: &TaskCompletion) -> Result<bool, String> {
|
||||
let TaskCompletion {
|
||||
task_id,
|
||||
agent_id,
|
||||
provider,
|
||||
model,
|
||||
success,
|
||||
usage,
|
||||
proof_text,
|
||||
} = tc;
|
||||
// Build task cost record
|
||||
let task_cost = colibri_store::TaskCost {
|
||||
provider: Some(provider.clone()),
|
||||
model: Some(model.clone()),
|
||||
input_tokens: usage.input_tokens,
|
||||
output_tokens: usage.output_tokens,
|
||||
cache_read_tokens: usage.cache_read_tokens,
|
||||
cache_write_tokens: usage.cache_write_tokens,
|
||||
cost: usage.cost(),
|
||||
success: *success,
|
||||
};
|
||||
|
||||
// Acquire store lock
|
||||
let store_result = match state.store.try_lock() {
|
||||
Ok(s) => s,
|
||||
Err(std::sync::TryLockError::WouldBlock) => {
|
||||
warn!(task_id = %task_id, "store lock busy; completion capture deferred to next heartbeat");
|
||||
return Ok(false);
|
||||
}
|
||||
Err(std::sync::TryLockError::Poisoned(e)) => {
|
||||
warn!(task_id = %task_id, "store lock poisoned (prior panic); recovering inner store");
|
||||
e.into_inner()
|
||||
}
|
||||
};
|
||||
|
||||
// Write task cost
|
||||
match store_result.set_task_cost(task_id, &task_cost) {
|
||||
Ok(t) => {
|
||||
info!(
|
||||
task_id = %task_id,
|
||||
input_tokens = usage.input_tokens,
|
||||
output_tokens = usage.output_tokens,
|
||||
cache_read = usage.cache_read_tokens,
|
||||
cost = usage.cost(),
|
||||
success = *success,
|
||||
status = %t.status.as_str(),
|
||||
"task cost captured"
|
||||
);
|
||||
|
||||
// T2.x: Write eval record (agent self-report mode)
|
||||
let eval = colibri_store::TaskEval {
|
||||
task_id: task_id.to_string(),
|
||||
agent_id: Some(agent_id.to_string()),
|
||||
eval_mode: "agent".to_string(),
|
||||
completion_status: if *success { "success" } else { "fail" }.to_string(),
|
||||
quality_score: Some(if *success { 1.0 } else { 0.0 }),
|
||||
correctness_check: if *success { "pass" } else { "fail" }.to_string(),
|
||||
eval_provider: Some(format!("{}:{}", provider, model)),
|
||||
eval_latency_ms: None, // Future: track task duration
|
||||
eval_cost_usd: 0.0, // Self-report is free
|
||||
evaluated_at: chrono::Utc::now().to_rfc3339(),
|
||||
};
|
||||
match store_result.write_task_eval(&eval) {
|
||||
Ok(_) => {
|
||||
debug!(task_id = %task_id, "task eval recorded (agent self-report)");
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(task_id = %task_id, error = %e, "failed to write task eval");
|
||||
}
|
||||
}
|
||||
|
||||
// Best-effort push to mother for dashboard aggregation
|
||||
push_cost_to_mother(task_id, &task_cost, proof_text.as_deref());
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(task_id = %task_id, error = %e, "failed to write task cost");
|
||||
Err(format!("failed to write task cost: {}", e))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Best-effort push of task cost to mother node for dashboard aggregation.
|
||||
///
|
||||
/// SSH's to mother with `report-task-cost` forced command, pipes TaskCost JSON
|
||||
|
|
@ -548,6 +634,84 @@ pub async fn poll_tasks(state: &SharedState) {
|
|||
return;
|
||||
}
|
||||
|
||||
// ── Route claimed tasks to running RPC agents (autospawn path) ─────
|
||||
// Tasks claimed by an autospawned agent (zot rpc) don't need a new
|
||||
// process — send the task description via the existing RPC channel,
|
||||
// transition to started, and let the heartbeat capture results.
|
||||
let mut remaining = Vec::new();
|
||||
for task in claimed {
|
||||
let agent_id = match &task.agent_id {
|
||||
Some(aid) if !aid.is_empty() => aid.clone(),
|
||||
_ => {
|
||||
remaining.push(task);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// Check if this agent has an RPC sender (autospawned agents do).
|
||||
// The task's agent_id is the store row ID; the store agent's 'name'
|
||||
// column holds the spawn handle ID used in state.agents.
|
||||
let spawn_id = match state.store.try_lock() {
|
||||
Ok(store) => store.get_agent(&agent_id).ok().flatten().map(|a| a.name),
|
||||
Err(std::sync::TryLockError::WouldBlock) => {
|
||||
warn!("poll_tasks: store locked; deferring agent lookup");
|
||||
remaining.push(task);
|
||||
continue;
|
||||
}
|
||||
Err(std::sync::TryLockError::Poisoned(e)) => e
|
||||
.into_inner()
|
||||
.get_agent(&agent_id)
|
||||
.ok()
|
||||
.flatten()
|
||||
.map(|a| a.name),
|
||||
};
|
||||
if let Some(ref sid) = spawn_id {
|
||||
if let Some(entry) = state.agents.get(sid) {
|
||||
if let Some(sender) = entry.value().rpc_sender() {
|
||||
let description = task
|
||||
.description
|
||||
.clone()
|
||||
.unwrap_or_else(|| task.title.clone());
|
||||
match sender.send_prompt(&description).await {
|
||||
Ok(rpc_id) => {
|
||||
// Transition task to started — agent is working on it.
|
||||
if let Ok(store) = state.store.try_lock() {
|
||||
if store
|
||||
.transition_task(&task.id, colibri_store::TaskStatus::Started)
|
||||
.is_ok()
|
||||
{
|
||||
info!(
|
||||
task_id = %task.id,
|
||||
agent_id = %agent_id,
|
||||
rpc_id = %rpc_id,
|
||||
"poll_tasks: dispatched task to running RPC agent"
|
||||
);
|
||||
}
|
||||
}
|
||||
continue; // Task dispatched — skip spawn path
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(
|
||||
task_id = %task.id,
|
||||
agent_id = %agent_id,
|
||||
error = %e,
|
||||
"poll_tasks: RPC send failed; falling back to spawn"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
remaining.push(task);
|
||||
}
|
||||
|
||||
if remaining.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
// ── Spawn-per-task path (fallback for tasks without an RPC agent) ───
|
||||
let claimed = remaining;
|
||||
|
||||
let spawner = Spawner::new(std::sync::Arc::new(state.config.clone()));
|
||||
|
||||
for task in claimed {
|
||||
|
|
|
|||
|
|
@ -69,6 +69,8 @@ pub enum ColibriCommand {
|
|||
TransitionTask { task_id: String, status: String },
|
||||
#[serde(rename = "claim-task")]
|
||||
ClaimTask { task_id: String, agent_id: String },
|
||||
#[serde(rename = "get-task-eval")]
|
||||
GetTaskEval { task_id: String },
|
||||
#[serde(rename = "list-agents")]
|
||||
ListAgents,
|
||||
#[serde(rename = "register-agent")]
|
||||
|
|
|
|||
|
|
@ -263,6 +263,7 @@ async fn dispatch(cmd: ColibriCommand, state: &SharedState) -> ColibriResponse {
|
|||
ColibriCommand::ClaimTask { task_id, agent_id } => {
|
||||
cmd_claim_task(state, task_id, agent_id).await
|
||||
}
|
||||
ColibriCommand::GetTaskEval { task_id } => cmd_get_task_eval(state, task_id).await,
|
||||
ColibriCommand::ListAgents => cmd_list_agents(state).await,
|
||||
ColibriCommand::RegisterAgent {
|
||||
name,
|
||||
|
|
@ -927,7 +928,7 @@ fn probe_capabilities() -> Vec<String> {
|
|||
if let Ok(entries) = std::fs::read_dir("/var/db/models") {
|
||||
if entries
|
||||
.filter_map(|e| e.ok())
|
||||
.any(|e| e.path().extension().map_or(false, |ext| ext == "gguf"))
|
||||
.any(|e| e.path().extension().is_some_and(|ext| ext == "gguf"))
|
||||
{
|
||||
if !caps.contains(&"local-llm".to_string()) {
|
||||
caps.push("local-llm".to_string());
|
||||
|
|
@ -1285,6 +1286,14 @@ async fn cmd_claim_task(state: &SharedState, task_id: String, agent_id: String)
|
|||
}
|
||||
}
|
||||
|
||||
async fn cmd_get_task_eval(state: &SharedState, task_id: String) -> ColibriResponse {
|
||||
match state.store.lock().unwrap().read_task_eval(&task_id) {
|
||||
Ok(Some(eval)) => ColibriResponse::ok(serde_json::to_value(eval).unwrap_or_default()),
|
||||
Ok(None) => ColibriResponse::err(format!("no eval found for task: {task_id}")),
|
||||
Err(e) => ColibriResponse::err(format!("read task eval failed: {e}")),
|
||||
}
|
||||
}
|
||||
|
||||
async fn cmd_list_agents(state: &SharedState) -> ColibriResponse {
|
||||
match state.store.lock().unwrap().list_agents() {
|
||||
Ok(agents) => ColibriResponse::ok(serde_json::to_value(agents).unwrap_or_default()),
|
||||
|
|
|
|||
|
|
@ -104,6 +104,20 @@ pub fn tool_list() -> Vec<Value> {
|
|||
}
|
||||
})),
|
||||
),
|
||||
json_tool(
|
||||
"colibri_get_task_eval",
|
||||
"Get task evaluation record with quality metrics and eval metadata",
|
||||
Some(serde_json::json!({
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"task_id": {
|
||||
"type": "string",
|
||||
"description": "Task ID to query evaluation for"
|
||||
}
|
||||
},
|
||||
"required": ["task_id"]
|
||||
})),
|
||||
),
|
||||
json_tool(
|
||||
"colibri_list_skills",
|
||||
"List registered skills in the Colibri catalog",
|
||||
|
|
@ -329,6 +343,14 @@ pub async fn dispatch_tool(
|
|||
let data = client.list_tasks(status).await.map_err(map_client_error)?;
|
||||
Ok(tool_text(data))
|
||||
}
|
||||
"colibri_get_task_eval" => {
|
||||
let task_id = require_string(arguments, "task_id")?;
|
||||
let data = client
|
||||
.get_task_eval(task_id)
|
||||
.await
|
||||
.map_err(map_client_error)?;
|
||||
Ok(tool_text(data))
|
||||
}
|
||||
"colibri_list_skills" => {
|
||||
let data = client.list_skills().await.map_err(map_client_error)?;
|
||||
Ok(tool_text(data))
|
||||
|
|
|
|||
|
|
@ -255,6 +255,7 @@ fn tool_list_has_all_phase1_tools() {
|
|||
assert!(names.contains(&"colibri_list_task_costs"));
|
||||
|
||||
assert!(names.contains(&"colibri_get_task"));
|
||||
assert!(names.contains(&"colibri_get_task_eval"));
|
||||
assert!(names.contains(&"colibri_deploy_run"));
|
||||
assert!(names.contains(&"colibri_deploy_targets"));
|
||||
assert!(names.contains(&"colibri_wiki_search"));
|
||||
|
|
@ -263,7 +264,7 @@ fn tool_list_has_all_phase1_tools() {
|
|||
assert!(names.contains(&"colibri_zfs_destroy_snapshot"));
|
||||
assert!(names.contains(&"colibri_pf_list_rules"));
|
||||
assert!(names.contains(&"colibri_pf_list_states"));
|
||||
assert_eq!(names.len(), 20);
|
||||
assert_eq!(names.len(), 21);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
|
|||
|
|
@ -191,6 +191,66 @@ pub struct TaskCost {
|
|||
pub success: bool,
|
||||
}
|
||||
|
||||
/// Multidimensional task evaluation record (T2.x eval harness).
|
||||
///
|
||||
/// Records how well a task succeeded beyond just "exit code 0". The eval
|
||||
/// harness populates this after task completion — either from agent
|
||||
/// self-report, local LLM evaluation, cloud LLM evaluation, or skipped
|
||||
/// (exit-code-only signal).
|
||||
///
|
||||
/// Schema constraints (enforced by SQLite CHECK):
|
||||
/// - `eval_mode` ∈ 'agent' | 'local-llm' | 'cloud-llm' | 'skipped'
|
||||
/// - `completion_status` ∈ 'success' | 'partial' | 'fail' | 'silent-fail'
|
||||
/// - `correctness_check` ∈ 'pass' | 'fail' | 'skipped'
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct TaskEval {
|
||||
pub task_id: String,
|
||||
/// The agent that ran the task. None if the completing agent isn't
|
||||
/// linked to a registered agent row (e.g. fire-and-forget tasks).
|
||||
pub agent_id: Option<String>,
|
||||
/// How this eval was produced: 'agent' (self-report), 'local-llm',
|
||||
/// 'cloud-llm', or 'skipped' (exit code only, no quality signal).
|
||||
pub eval_mode: String,
|
||||
/// Whether the task completed: 'success', 'partial', 'fail', or
|
||||
/// 'silent-fail' (exit 0 but output was garbage).
|
||||
pub completion_status: String,
|
||||
/// Quality score 0.0..=1.0, or None if unverifiable.
|
||||
pub quality_score: Option<f64>,
|
||||
/// Output correctness: 'pass', 'fail', or 'skipped' if not checked.
|
||||
pub correctness_check: String,
|
||||
/// Provider:model that ran the eval (e.g. 'deepseek:deepseek-chat'
|
||||
/// for agent self-report, 'local-deepseek-r1-7b' for local eval).
|
||||
/// None for 'skipped' mode.
|
||||
pub eval_provider: Option<String>,
|
||||
/// Eval latency in milliseconds. None for instant modes.
|
||||
pub eval_latency_ms: Option<u64>,
|
||||
/// Cost in USD to produce this eval. 0.0 for 'agent' and 'skipped'.
|
||||
pub eval_cost_usd: f64,
|
||||
/// When this eval was recorded (RFC3339 timestamp).
|
||||
pub evaluated_at: String,
|
||||
}
|
||||
|
||||
/// Per-provider rollup of eval data, used by model-selection (Phase 3).
|
||||
///
|
||||
/// Computed by `Store::eval_summary`. Counts and scores cover only
|
||||
/// tasks with a non-null quality_score within the query window.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct EvalSummary {
|
||||
/// The eval provider string (e.g. 'deepseek:deepseek-chat',
|
||||
/// 'local-deepseek-r1-7b', or None if the eval_provider was unset).
|
||||
pub eval_provider: Option<String>,
|
||||
/// Number of tasks evaluated by this provider in the window.
|
||||
pub count: u64,
|
||||
/// Mean quality score across those tasks (0.0..=1.0), or None if zero.
|
||||
pub avg_quality: Option<f64>,
|
||||
/// Number of those tasks whose completion_status was 'success'.
|
||||
pub n_success: u64,
|
||||
/// Mean eval cost in USD.
|
||||
pub avg_cost: f64,
|
||||
/// Mean eval latency in milliseconds, or None if no timed evals.
|
||||
pub avg_latency_ms: Option<u64>,
|
||||
}
|
||||
|
||||
impl Store {
|
||||
/// Open (or create) the store at `db_path`. Runs migrations automatically.
|
||||
pub fn open(db_path: impl Into<PathBuf>) -> Result<Self> {
|
||||
|
|
@ -338,6 +398,118 @@ impl Store {
|
|||
.ok_or_else(|| StoreError::NotFound(task_id.to_string()))
|
||||
}
|
||||
|
||||
/// Write a task evaluation record (T2.x eval harness).
|
||||
///
|
||||
/// Uses `INSERT OR REPLACE` so a higher-quality follow-up eval
|
||||
/// (e.g. local-llm upgrade after initial 'skipped') can overwrite
|
||||
/// the existing record for the same task_id.
|
||||
pub fn write_task_eval(&self, te: &TaskEval) -> Result<()> {
|
||||
self.conn.execute(
|
||||
"INSERT OR REPLACE INTO task_evals
|
||||
(task_id, agent_id, eval_mode, completion_status, quality_score,
|
||||
correctness_check, eval_provider, eval_latency_ms,
|
||||
eval_cost_usd, evaluated_at)
|
||||
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
|
||||
params![
|
||||
te.task_id,
|
||||
te.agent_id,
|
||||
te.eval_mode,
|
||||
te.completion_status,
|
||||
te.quality_score,
|
||||
te.correctness_check,
|
||||
te.eval_provider,
|
||||
te.eval_latency_ms.map(|v| v as i64),
|
||||
te.eval_cost_usd,
|
||||
te.evaluated_at,
|
||||
],
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Read the evaluation for a specific task, if any.
|
||||
pub fn read_task_eval(&self, task_id: &str) -> Result<Option<TaskEval>> {
|
||||
let mut stmt = self.conn.prepare(
|
||||
"SELECT task_id, agent_id, eval_mode, completion_status,
|
||||
quality_score, correctness_check, eval_provider,
|
||||
eval_latency_ms, eval_cost_usd, evaluated_at
|
||||
FROM task_evals WHERE task_id = ?1 LIMIT 1",
|
||||
)?;
|
||||
let mut rows = stmt.query_map(params![task_id], row_to_task_eval)?;
|
||||
match rows.next() {
|
||||
Some(Ok(te)) => Ok(Some(te)),
|
||||
Some(Err(e)) => Err(StoreError::Database(e)),
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
/// List all evals for an agent, newest first.
|
||||
///
|
||||
/// Used by the model-selection layer (Phase 3) to compute
|
||||
/// per-agent success rates for routing decisions.
|
||||
pub fn list_task_evals_by_agent(&self, agent_id: &str) -> Result<Vec<TaskEval>> {
|
||||
let mut stmt = self.conn.prepare(
|
||||
"SELECT task_id, agent_id, eval_mode, completion_status,
|
||||
quality_score, correctness_check, eval_provider,
|
||||
eval_latency_ms, eval_cost_usd, evaluated_at
|
||||
FROM task_evals WHERE agent_id = ?1
|
||||
ORDER BY evaluated_at DESC",
|
||||
)?;
|
||||
let mut result = Vec::new();
|
||||
for row in stmt.query_map(params![agent_id], row_to_task_eval)? {
|
||||
result.push(row?);
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// List all task evals, newest first.
|
||||
pub fn list_all_task_evals(&self) -> Result<Vec<TaskEval>> {
|
||||
let mut stmt = self.conn.prepare(
|
||||
"SELECT task_id, agent_id, eval_mode, completion_status,
|
||||
quality_score, correctness_check, eval_provider,
|
||||
eval_latency_ms, eval_cost_usd, evaluated_at
|
||||
FROM task_evals ORDER BY evaluated_at DESC",
|
||||
)?;
|
||||
let mut result = Vec::new();
|
||||
for row in stmt.query_map([], row_to_task_eval)? {
|
||||
result.push(row?);
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Aggregate success rates for eval-driven model selection (Phase 3).
|
||||
///
|
||||
/// Returns, per `eval_provider`, the count of evaluated tasks and the
|
||||
/// average quality score over the last `window_hours`. Only tasks with
|
||||
/// a non-null quality_score contribute.
|
||||
pub fn eval_summary(&self, window_hours: u64) -> Result<Vec<EvalSummary>> {
|
||||
let cutoff = (Utc::now() - chrono::Duration::hours(window_hours as i64)).to_rfc3339();
|
||||
let mut stmt = self.conn.prepare(
|
||||
"SELECT eval_provider,
|
||||
COUNT(*) as n,
|
||||
AVG(quality_score) as avg_quality,
|
||||
SUM(CASE WHEN completion_status = 'success' THEN 1 ELSE 0 END) as n_success,
|
||||
AVG(eval_cost_usd) as avg_cost,
|
||||
AVG(eval_latency_ms) as avg_latency_ms
|
||||
FROM task_evals
|
||||
WHERE evaluated_at >= ?1 AND quality_score IS NOT NULL
|
||||
GROUP BY eval_provider",
|
||||
)?;
|
||||
let mut out = Vec::new();
|
||||
for row in stmt.query_map(params![cutoff], |r| {
|
||||
Ok(EvalSummary {
|
||||
eval_provider: r.get(0)?,
|
||||
count: r.get::<_, i64>(1)? as u64,
|
||||
avg_quality: r.get::<_, f64>(2).ok(),
|
||||
n_success: r.get::<_, i64>(3)? as u64,
|
||||
avg_cost: r.get::<_, f64>(4)?,
|
||||
avg_latency_ms: r.get::<_, i64>(5).ok().map(|v| v as u64),
|
||||
})
|
||||
})? {
|
||||
out.push(row?);
|
||||
}
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
/// Assign a task to an agent (status → Claimed).
|
||||
///
|
||||
/// The UPDATE is guarded on `status = 'queued'`, so the claim is atomic and
|
||||
|
|
@ -675,6 +847,7 @@ impl Store {
|
|||
"agents": self.list_agents()?,
|
||||
"skills": self.list_skills()?,
|
||||
"tenants": self.list_tenants()?,
|
||||
"task_evals": self.list_all_task_evals()?,
|
||||
"exported_at": Utc::now().to_rfc3339(),
|
||||
}))
|
||||
}
|
||||
|
|
@ -721,6 +894,21 @@ fn row_to_task(row: &rusqlite::Row<'_>) -> rusqlite::Result<Task> {
|
|||
})
|
||||
}
|
||||
|
||||
fn row_to_task_eval(row: &rusqlite::Row<'_>) -> rusqlite::Result<TaskEval> {
|
||||
Ok(TaskEval {
|
||||
task_id: row.get(0)?,
|
||||
agent_id: row.get(1)?,
|
||||
eval_mode: row.get(2)?,
|
||||
completion_status: row.get(3)?,
|
||||
quality_score: row.get::<_, f64>(4).ok(),
|
||||
correctness_check: row.get(5)?,
|
||||
eval_provider: row.get(6)?,
|
||||
eval_latency_ms: row.get::<_, i64>(7).ok().map(|v| v as u64),
|
||||
eval_cost_usd: row.get::<_, f64>(8).unwrap_or(0.0),
|
||||
evaluated_at: row.get(9)?,
|
||||
})
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Default database path (platform-aware)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
|
@ -1154,4 +1342,193 @@ mod tests {
|
|||
let path = default_db_path();
|
||||
assert!(path.to_string_lossy().contains("colibri.sqlite"));
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------
|
||||
// Task evaluations (T2.x eval harness)
|
||||
// ------------------------------------------------------------------
|
||||
|
||||
/// Helper: build a minimal TaskEval at the current time.
|
||||
fn fake_eval(task_id: &str, agent_id: &str, mode: &str, quality: f64) -> TaskEval {
|
||||
TaskEval {
|
||||
task_id: task_id.to_string(),
|
||||
agent_id: Some(agent_id.to_string()),
|
||||
eval_mode: mode.to_string(),
|
||||
completion_status: if quality >= 0.7 {
|
||||
"success".to_string()
|
||||
} else {
|
||||
"fail".to_string()
|
||||
},
|
||||
quality_score: Some(quality),
|
||||
correctness_check: if quality >= 0.7 {
|
||||
"pass".to_string()
|
||||
} else {
|
||||
"fail".to_string()
|
||||
},
|
||||
eval_provider: Some("test-provider".to_string()),
|
||||
eval_latency_ms: Some(42),
|
||||
eval_cost_usd: 0.001,
|
||||
evaluated_at: Utc::now().to_rfc3339(),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
fn test_task_eval_roundtrip() {
    let store = Store::open_memory().unwrap();

    // Create a task and an agent so the eval carries real IDs.
    let task = store.create_task("Eval test task", None).unwrap();
    let agent = store
        .register_agent("zot-1", serde_json::json!(["shell", "freebsd"]), None)
        .unwrap();

    // Write one eval, then load it again by task ID.
    store
        .write_task_eval(&fake_eval(&task.id, &agent.id, "agent", 0.9))
        .unwrap();
    let stored = store.read_task_eval(&task.id).unwrap().unwrap();

    // Every field must survive the round trip.
    assert_eq!(stored.task_id, task.id);
    assert_eq!(stored.agent_id, Some(agent.id.clone()));
    assert_eq!(stored.eval_mode, "agent");
    assert_eq!(stored.completion_status, "success");
    assert_eq!(stored.quality_score, Some(0.9));
    assert_eq!(stored.correctness_check, "pass");
    assert_eq!(stored.eval_provider, Some("test-provider".to_string()));
    assert_eq!(stored.eval_latency_ms, Some(42));
    assert_eq!(stored.eval_cost_usd, 0.001);

    // An unknown task ID yields None rather than an error.
    let missing = store.read_task_eval("nonexistent-task").unwrap();
    assert!(missing.is_none());
}
|
||||
|
||||
#[test]
fn test_task_eval_insert_or_replace() {
    let store = Store::open_memory().unwrap();
    let task = store.create_task("Upgrade eval", None).unwrap();
    let agent = store
        .register_agent("agent-x", serde_json::json!(["code"]), None)
        .unwrap();

    // First write: agent self-report.
    store
        .write_task_eval(&fake_eval(&task.id, &agent.id, "agent", 0.6))
        .unwrap();
    let first = store.read_task_eval(&task.id).unwrap().unwrap();
    assert_eq!(first.quality_score, Some(0.6));

    // Second write for the same task_id: local-llm upgrade.
    store
        .write_task_eval(&fake_eval(&task.id, &agent.id, "local-llm", 0.8))
        .unwrap();

    // The second record must have replaced the first one.
    let second = store.read_task_eval(&task.id).unwrap().unwrap();
    assert_eq!(second.eval_mode, "local-llm");
    assert_eq!(second.quality_score, Some(0.8));
}
|
||||
|
||||
#[test]
fn test_list_task_evals_by_agent_filter() {
    let store = Store::open_memory().unwrap();
    let task_a = store.create_task("Task A", None).unwrap();
    let task_b = store.create_task("Task B", None).unwrap();
    let task_c = store.create_task("Task C", None).unwrap();

    let agent_1 = store
        .register_agent("agent-1", serde_json::json!(["shell"]), None)
        .unwrap();
    let agent_2 = store
        .register_agent("agent-2", serde_json::json!(["code"]), None)
        .unwrap();

    // Written in order A, B, C so that C is the newest eval of agent 1.
    for (task, agent, mode, quality) in [
        (&task_a, &agent_1, "agent", 0.9),
        (&task_b, &agent_2, "local-llm", 0.7),
        (&task_c, &agent_1, "agent", 0.5),
    ] {
        store
            .write_task_eval(&fake_eval(&task.id, &agent.id, mode, quality))
            .unwrap();
    }

    // Agent 1 owns two evals, returned newest first.
    let for_agent_1 = store.list_task_evals_by_agent(&agent_1.id).unwrap();
    assert_eq!(for_agent_1.len(), 2);
    assert_eq!(for_agent_1[0].task_id, task_c.id);
    assert_eq!(for_agent_1[1].task_id, task_a.id);

    // Agent 2 owns exactly the local-llm eval.
    let for_agent_2 = store.list_task_evals_by_agent(&agent_2.id).unwrap();
    assert_eq!(for_agent_2.len(), 1);
    assert_eq!(for_agent_2[0].eval_mode, "local-llm");
}
|
||||
|
||||
#[test]
fn test_eval_summary_aggregation() {
    let store = Store::open_memory().unwrap();
    let task_a = store.create_task("Summary A", None).unwrap();
    let task_b = store.create_task("Summary B", None).unwrap();
    let task_c = store.create_task("Summary C", None).unwrap();

    let agent = store
        .register_agent("agent", serde_json::json!(["shell"]), None)
        .unwrap();

    // Two evals above the 0.7 success threshold of fake_eval, one below it.
    for (task, quality) in [(&task_a, 0.9), (&task_b, 0.8), (&task_c, 0.3)] {
        store
            .write_task_eval(&fake_eval(&task.id, &agent.id, "agent", quality))
            .unwrap();
    }

    // All three share one eval_provider, so a single group is expected.
    let summary = store.eval_summary(24).unwrap();
    assert_eq!(summary.len(), 1);

    let group = &summary[0];
    assert_eq!(group.count, 3);
    assert_eq!(group.n_success, 2); // quality >= 0.7 → 'success'
    let mean_quality = group.avg_quality.unwrap();
    assert!((mean_quality - 2.0 / 3.0).abs() < 0.01);
}
|
||||
|
||||
#[test]
fn test_eval_schema_check_constraints() {
    let store = Store::open_memory().unwrap();
    let task = store.create_task("Constraint test", None).unwrap();

    // Build an eval that differs only in the three CHECK-constrained columns.
    let eval_with = |mode: &str, status: &str, check: &str| TaskEval {
        task_id: task.id.clone(),
        agent_id: None,
        eval_mode: mode.to_string(),
        completion_status: status.to_string(),
        quality_score: Some(1.0),
        correctness_check: check.to_string(),
        eval_provider: None,
        eval_latency_ms: None,
        eval_cost_usd: 0.0,
        evaluated_at: Utc::now().to_rfc3339(),
    };

    // Invalid eval_mode should fail at INSERT (SQLite CHECK)
    assert!(store
        .write_task_eval(&eval_with("invalid-mode", "success", "pass"))
        .is_err());

    // Invalid completion_status should fail
    assert!(store
        .write_task_eval(&eval_with("agent", "weird-status", "pass"))
        .is_err());

    // Invalid correctness_check should fail as well
    assert!(store
        .write_task_eval(&eval_with("agent", "success", "maybe"))
        .is_err());

    // Positive control: the same record with only valid values is accepted,
    // so the failures above are caused by the CHECK constraints.
    assert!(store
        .write_task_eval(&eval_with("agent", "success", "pass"))
        .is_ok());
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -58,6 +58,40 @@ CREATE TABLE IF NOT EXISTS tenants (
|
|||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_tenants_status ON tenants(status);
|
||||
|
||||
-- Task evaluations (T2.x): multi-dimensional quality signal.
|
||||
--
|
||||
-- The canonical record of how well a task went beyond exit code. Populated
|
||||
-- by the eval harness after task completion. Eval modes:
|
||||
-- * 'agent' — agent self-reported via completion event
|
||||
-- * 'local-llm' — local model evaluated the output
|
||||
-- * 'cloud-llm' — cloud model evaluated the output
|
||||
-- * 'skipped' — task succeeded (exit 0) but no quality signal
|
||||
--
|
||||
-- This table is the single source of truth for eval data. The model-
|
||||
-- selection layer (Phase 3) reads from here to build (model, task_type)
|
||||
-- success-rate histories.
|
||||
--
|
||||
-- FK to tasks(id) intentionally omitted — we don't want DELETE CASCADE
|
||||
-- to lose eval history if a task row is cleared, and we don't want a
|
||||
-- missing task row to prevent recording evals (fire-and-forget evals).
|
||||
-- Referential alignment is the daemon's responsibility.
|
||||
CREATE TABLE IF NOT EXISTS task_evals (
|
||||
task_id TEXT PRIMARY KEY NOT NULL,
|
||||
agent_id TEXT,
|
||||
eval_mode TEXT NOT NULL CHECK(eval_mode IN ('agent','local-llm','cloud-llm','skipped')),
|
||||
completion_status TEXT NOT NULL CHECK(completion_status IN ('success','partial','fail','silent-fail')),
|
||||
quality_score REAL, -- 0.0..=1.0, nullable if unverifiable
|
||||
correctness_check TEXT NOT NULL CHECK(correctness_check IN ('pass','fail','skipped')),
|
||||
eval_provider TEXT, -- e.g. 'deepseek:deepseek-chat', 'local-deepseek-r1-7b'
|
||||
eval_latency_ms INTEGER,
|
||||
eval_cost_usd REAL NOT NULL DEFAULT 0.0,
|
||||
evaluated_at TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_evals_agent ON task_evals(agent_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_evals_mode ON task_evals(eval_mode);
|
||||
CREATE INDEX IF NOT EXISTS idx_evals_at ON task_evals(evaluated_at);
|
||||
";
|
||||
|
||||
/// Column additions since the initial schema. Each runs inside a try-block
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue