diff --git a/crates/colibri-daemon/src/scheduler.rs b/crates/colibri-daemon/src/scheduler.rs index 7364a2b..4ac4dac 100644 --- a/crates/colibri-daemon/src/scheduler.rs +++ b/crates/colibri-daemon/src/scheduler.rs @@ -509,8 +509,8 @@ mod tests { capabilities: serde_json::json!(["rust"]), status: "offline".into(), created_at: "2026-01-01T00:00:00Z".into(), - host: None, - last_seen: None, + host: None, + last_seen: None, }]; assert!(pick_agent(&required, &agents).is_none()); } @@ -524,8 +524,8 @@ mod tests { capabilities: serde_json::json!(["linux"]), status: "idle".into(), created_at: "2026-01-01T00:00:00Z".into(), - host: None, - last_seen: None, + host: None, + last_seen: None, }]; assert!(pick_agent(&required, &agents).is_none()); } @@ -538,8 +538,8 @@ mod tests { capabilities: serde_json::json!([]), status: "idle".into(), created_at: "2026-01-01T00:00:00Z".into(), - host: None, - last_seen: None, + host: None, + last_seen: None, }]; assert_eq!(pick_agent(&[], &agents).unwrap().name, "generalist"); } @@ -613,8 +613,8 @@ mod tests { capabilities: serde_json::json!(["rust"]), status: "active".into(), created_at: "2026-01-01T00:00:00Z".into(), - host: None, - last_seen: None, + host: None, + last_seen: None, }]; let picked = pick_agent(&required, &agents); assert!( diff --git a/crates/colibri-daemon/src/socket.rs b/crates/colibri-daemon/src/socket.rs index 312c2a6..409180f 100644 --- a/crates/colibri-daemon/src/socket.rs +++ b/crates/colibri-daemon/src/socket.rs @@ -264,9 +264,11 @@ async fn dispatch(cmd: ColibriCommand, state: &SharedState) -> ColibriResponse { cmd_claim_task(state, task_id, agent_id).await } ColibriCommand::ListAgents => cmd_list_agents(state).await, - ColibriCommand::RegisterAgent { name, capabilities, host } => { - cmd_register_agent(state, name, capabilities, host.as_deref()).await - } + ColibriCommand::RegisterAgent { + name, + capabilities, + host, + } => cmd_register_agent(state, name, capabilities, host.as_deref()).await, ColibriCommand::Heartbeat { agent_id, host } => { cmd_heartbeat(state, agent_id, host.as_deref()).await } @@ -1083,7 +1085,12 @@ async fn cmd_intake_task( ) -> ColibriResponse { let caps = capabilities.unwrap_or_default(); // Create the task immediately so the caller gets the ID back. - let task = match state.store.lock().unwrap().create_task(&title, description.as_deref()) { + let task = match state + .store + .lock() + .unwrap() + .create_task(&title, description.as_deref()) + { Ok(t) => t, Err(e) => return ColibriResponse::err(format!("create task failed: {e}")), }; diff --git a/crates/colibri-daemon/tests/multi_agent_board.rs b/crates/colibri-daemon/tests/multi_agent_board.rs index ef0ea31..98069f5 100644 --- a/crates/colibri-daemon/tests/multi_agent_board.rs +++ b/crates/colibri-daemon/tests/multi_agent_board.rs @@ -199,7 +199,10 @@ async fn scheduler_routes_intake_tasks_by_capability() { ) .await; // Intake now returns the full task (id + status), not just {"status":"queued"}. - assert!(fs_intake["data"]["id"].is_string(), "intake must return task id"); + assert!( + fs_intake["data"]["id"].is_string(), + "intake must return task id" + ); send_command( &socket_path, diff --git a/crates/colibri-store/src/lib.rs b/crates/colibri-store/src/lib.rs index ea0fd87..4495186 100644 --- a/crates/colibri-store/src/lib.rs +++ b/crates/colibri-store/src/lib.rs @@ -375,9 +375,7 @@ impl Store { params![now, host, agent_id], )?; if rows == 0 { - return Err(StoreError::NotFound(format!( - "agent not found: {agent_id}" - ))); + return Err(StoreError::NotFound(format!("agent not found: {agent_id}"))); } Ok(()) } @@ -797,6 +795,122 @@ mod tests { assert_eq!(got.status, "offline"); } + /// Phase 3: register_agent must persist `host` and stamp `last_seen`. + #[test] + fn test_register_agent_stores_host_and_last_seen() { + let store = Store::open_memory().unwrap(); + + let agent = store + .register_agent("zot", serde_json::json!(["code"]), Some("osa")) + .unwrap(); + + assert_eq!(agent.host.as_deref(), Some("osa")); + assert!( + agent.last_seen.is_some(), + "last_seen must be set on register" + ); + + // Round-trips through get_agent (i.e. the 7-column read works). + let got = store.get_agent(&agent.id).unwrap().unwrap(); + assert_eq!(got.host.as_deref(), Some("osa")); + assert_eq!(got.last_seen, agent.last_seen); + } + + /// Phase 3: heartbeat bumps `last_seen`. Passing `None` for host must not + /// clobber an existing host (the COALESCE contract). + #[test] + fn test_heartbeat_updates_last_seen_and_preserves_host() { + let store = Store::open_memory().unwrap(); + + let agent = store + .register_agent("zot", serde_json::json!([]), Some("osa")) + .unwrap(); + let registered_seen = agent.last_seen.clone().unwrap(); + + // Ensure the heartbeat timestamp ticks past the register timestamp. + std::thread::sleep(std::time::Duration::from_millis(10)); + + store.heartbeat(&agent.id, None).unwrap(); + + let got = store.get_agent(&agent.id).unwrap().unwrap(); + assert_eq!( + got.host.as_deref(), + Some("osa"), + "host preserved when heartbeat omits it" + ); + assert!( + got.last_seen.as_deref().unwrap() > registered_seen.as_str(), + "last_seen must advance past the register timestamp" + ); + } + + /// Phase 3: heartbeat with `Some(host)` updates the host in place. + #[test] + fn test_heartbeat_updates_host() { + let store = Store::open_memory().unwrap(); + + let agent = store + .register_agent("zot", serde_json::json!([]), Some("osa")) + .unwrap(); + + store.heartbeat(&agent.id, Some("domedog")).unwrap(); + + let got = store.get_agent(&agent.id).unwrap().unwrap(); + assert_eq!( + got.host.as_deref(), + Some("domedog"), + "host must update to the new value" + ); + } + + /// Phase 3: heartbeat on an unknown agent id fails closed (NotFound), not + /// silently. + #[test] + fn test_heartbeat_unknown_agent_not_found() { + let store = Store::open_memory().unwrap(); + + let err = store.heartbeat("nonexistent-id", None).unwrap_err(); + assert!( + matches!(err, StoreError::NotFound(ref m) if m.contains("agent not found")), + "expected NotFound for unknown agent, got: {err:?}" + ); + } + + /// Phase 3: run_migrations must be idempotent across store reopens. The + /// `ALTER TABLE ... ADD COLUMN` statements would otherwise raise + /// "duplicate column name" on the second open. Uses a real file because + /// in-memory DBs don't survive a reopen. + #[test] + fn test_migrations_idempotent_on_reopen() { + let mut path = std::env::temp_dir(); + path.push(format!("colibri-phase3-reopen-{}.db", std::process::id())); + // Clean up any leftovers from a previous run before *and* after. + let _ = std::fs::remove_file(&path); + let _ = std::fs::remove_file(format!("{}-wal", path.to_string_lossy())); + let _ = std::fs::remove_file(format!("{}-shm", path.to_string_lossy())); + + { + let store = Store::open(&path).unwrap(); + // Exercises the migrated columns end-to-end. + store + .register_agent("zot", serde_json::json!([]), Some("osa")) + .unwrap(); + } + + // Reopen the same file: run_migrations runs again and must NOT error + // on the already-applied ALTER statements. + { + let store = Store::open(&path).unwrap(); + let agents = store.list_agents().unwrap(); + assert_eq!(agents.len(), 1, "data must survive reopen"); + assert_eq!(agents[0].host.as_deref(), Some("osa")); + } + + let _ = std::fs::remove_file(&path); + let _ = std::fs::remove_file(format!("{}-wal", path.to_string_lossy())); + let _ = std::fs::remove_file(format!("{}-shm", path.to_string_lossy())); + } + #[test] fn test_skills_catalog() { let store = Store::open_memory().unwrap();