chore/store-phase3-followups #228

Merged
clawdie merged 2 commits from chore/store-phase3-followups into main 2026-06-26 22:12:12 +02:00
4 changed files with 140 additions and 16 deletions

View file

@ -509,8 +509,8 @@ mod tests {
capabilities: serde_json::json!(["rust"]),
status: "offline".into(),
created_at: "2026-01-01T00:00:00Z".into(),
host: None,
last_seen: None,
host: None,
last_seen: None,
}];
assert!(pick_agent(&required, &agents).is_none());
}
@ -524,8 +524,8 @@ mod tests {
capabilities: serde_json::json!(["linux"]),
status: "idle".into(),
created_at: "2026-01-01T00:00:00Z".into(),
host: None,
last_seen: None,
host: None,
last_seen: None,
}];
assert!(pick_agent(&required, &agents).is_none());
}
@ -538,8 +538,8 @@ mod tests {
capabilities: serde_json::json!([]),
status: "idle".into(),
created_at: "2026-01-01T00:00:00Z".into(),
host: None,
last_seen: None,
host: None,
last_seen: None,
}];
assert_eq!(pick_agent(&[], &agents).unwrap().name, "generalist");
}
@ -613,8 +613,8 @@ mod tests {
capabilities: serde_json::json!(["rust"]),
status: "active".into(),
created_at: "2026-01-01T00:00:00Z".into(),
host: None,
last_seen: None,
host: None,
last_seen: None,
}];
let picked = pick_agent(&required, &agents);
assert!(

View file

@ -264,9 +264,11 @@ async fn dispatch(cmd: ColibriCommand, state: &SharedState) -> ColibriResponse {
cmd_claim_task(state, task_id, agent_id).await
}
ColibriCommand::ListAgents => cmd_list_agents(state).await,
ColibriCommand::RegisterAgent { name, capabilities, host } => {
cmd_register_agent(state, name, capabilities, host.as_deref()).await
}
ColibriCommand::RegisterAgent {
name,
capabilities,
host,
} => cmd_register_agent(state, name, capabilities, host.as_deref()).await,
ColibriCommand::Heartbeat { agent_id, host } => {
cmd_heartbeat(state, agent_id, host.as_deref()).await
}
@ -1083,7 +1085,12 @@ async fn cmd_intake_task(
) -> ColibriResponse {
let caps = capabilities.unwrap_or_default();
// Create the task immediately so the caller gets the ID back.
let task = match state.store.lock().unwrap().create_task(&title, description.as_deref()) {
let task = match state
.store
.lock()
.unwrap()
.create_task(&title, description.as_deref())
{
Ok(t) => t,
Err(e) => return ColibriResponse::err(format!("create task failed: {e}")),
};

View file

@ -199,7 +199,10 @@ async fn scheduler_routes_intake_tasks_by_capability() {
)
.await;
// Intake now returns the full task (id + status), not just {"status":"queued"}.
assert!(fs_intake["data"]["id"].is_string(), "intake must return task id");
assert!(
fs_intake["data"]["id"].is_string(),
"intake must return task id"
);
send_command(
&socket_path,

View file

@ -375,9 +375,7 @@ impl Store {
params![now, host, agent_id],
)?;
if rows == 0 {
return Err(StoreError::NotFound(format!(
"agent not found: {agent_id}"
)));
return Err(StoreError::NotFound(format!("agent not found: {agent_id}")));
}
Ok(())
}
@ -797,6 +795,122 @@ mod tests {
assert_eq!(got.status, "offline");
}
/// Phase 3: register_agent must persist `host` and stamp `last_seen`.
#[test]
fn test_register_agent_stores_host_and_last_seen() {
let store = Store::open_memory().unwrap();
let agent = store
.register_agent("zot", serde_json::json!(["code"]), Some("osa"))
.unwrap();
assert_eq!(agent.host.as_deref(), Some("osa"));
assert!(
agent.last_seen.is_some(),
"last_seen must be set on register"
);
// Round-trips through get_agent (i.e. the 7-column read works).
let got = store.get_agent(&agent.id).unwrap().unwrap();
assert_eq!(got.host.as_deref(), Some("osa"));
assert_eq!(got.last_seen, agent.last_seen);
}
/// Phase 3: heartbeat bumps `last_seen`. Passing `None` for host must not
/// clobber an existing host (the COALESCE contract).
#[test]
fn test_heartbeat_updates_last_seen_and_preserves_host() {
let store = Store::open_memory().unwrap();
let agent = store
.register_agent("zot", serde_json::json!([]), Some("osa"))
.unwrap();
let registered_seen = agent.last_seen.clone().unwrap();
// Ensure the heartbeat timestamp ticks past the register timestamp.
std::thread::sleep(std::time::Duration::from_millis(10));
store.heartbeat(&agent.id, None).unwrap();
let got = store.get_agent(&agent.id).unwrap().unwrap();
assert_eq!(
got.host.as_deref(),
Some("osa"),
"host preserved when heartbeat omits it"
);
assert!(
got.last_seen.as_deref().unwrap() > registered_seen.as_str(),
"last_seen must advance past the register timestamp"
);
}
/// Phase 3: heartbeat with `Some(host)` updates the host in place.
#[test]
fn test_heartbeat_updates_host() {
let store = Store::open_memory().unwrap();
let agent = store
.register_agent("zot", serde_json::json!([]), Some("osa"))
.unwrap();
store.heartbeat(&agent.id, Some("domedog")).unwrap();
let got = store.get_agent(&agent.id).unwrap().unwrap();
assert_eq!(
got.host.as_deref(),
Some("domedog"),
"host must update to the new value"
);
}
/// Phase 3: heartbeat on an unknown agent id fails closed (NotFound), not
/// silently.
#[test]
fn test_heartbeat_unknown_agent_not_found() {
let store = Store::open_memory().unwrap();
let err = store.heartbeat("nonexistent-id", None).unwrap_err();
assert!(
matches!(err, StoreError::NotFound(ref m) if m.contains("agent not found")),
"expected NotFound for unknown agent, got: {err:?}"
);
}
/// Phase 3: run_migrations must be idempotent across store reopens. The
/// `ALTER TABLE ... ADD COLUMN` statements would otherwise raise
/// "duplicate column name" on the second open. Uses a real file because
/// in-memory DBs don't survive a reopen.
#[test]
fn test_migrations_idempotent_on_reopen() {
let mut path = std::env::temp_dir();
path.push(format!("colibri-phase3-reopen-{}.db", std::process::id()));
// Clean up any leftovers from a previous run before *and* after.
let _ = std::fs::remove_file(&path);
let _ = std::fs::remove_file(format!("{}-wal", path.to_string_lossy()));
let _ = std::fs::remove_file(format!("{}-shm", path.to_string_lossy()));
{
let store = Store::open(&path).unwrap();
// Exercises the migrated columns end-to-end.
store
.register_agent("zot", serde_json::json!([]), Some("osa"))
.unwrap();
}
// Reopen the same file: run_migrations runs again and must NOT error
// on the already-applied ALTER statements.
{
let store = Store::open(&path).unwrap();
let agents = store.list_agents().unwrap();
assert_eq!(agents.len(), 1, "data must survive reopen");
assert_eq!(agents[0].host.as_deref(), Some("osa"));
}
let _ = std::fs::remove_file(&path);
let _ = std::fs::remove_file(format!("{}-wal", path.to_string_lossy()));
let _ = std::fs::remove_file(format!("{}-shm", path.to_string_lossy()));
}
#[test]
fn test_skills_catalog() {
let store = Store::open_memory().unwrap();