chore/store-phase3-followups #228
4 changed files with 140 additions and 16 deletions
|
|
@ -509,8 +509,8 @@ mod tests {
|
|||
capabilities: serde_json::json!(["rust"]),
|
||||
status: "offline".into(),
|
||||
created_at: "2026-01-01T00:00:00Z".into(),
|
||||
host: None,
|
||||
last_seen: None,
|
||||
host: None,
|
||||
last_seen: None,
|
||||
}];
|
||||
assert!(pick_agent(&required, &agents).is_none());
|
||||
}
|
||||
|
|
@ -524,8 +524,8 @@ mod tests {
|
|||
capabilities: serde_json::json!(["linux"]),
|
||||
status: "idle".into(),
|
||||
created_at: "2026-01-01T00:00:00Z".into(),
|
||||
host: None,
|
||||
last_seen: None,
|
||||
host: None,
|
||||
last_seen: None,
|
||||
}];
|
||||
assert!(pick_agent(&required, &agents).is_none());
|
||||
}
|
||||
|
|
@ -538,8 +538,8 @@ mod tests {
|
|||
capabilities: serde_json::json!([]),
|
||||
status: "idle".into(),
|
||||
created_at: "2026-01-01T00:00:00Z".into(),
|
||||
host: None,
|
||||
last_seen: None,
|
||||
host: None,
|
||||
last_seen: None,
|
||||
}];
|
||||
assert_eq!(pick_agent(&[], &agents).unwrap().name, "generalist");
|
||||
}
|
||||
|
|
@ -613,8 +613,8 @@ mod tests {
|
|||
capabilities: serde_json::json!(["rust"]),
|
||||
status: "active".into(),
|
||||
created_at: "2026-01-01T00:00:00Z".into(),
|
||||
host: None,
|
||||
last_seen: None,
|
||||
host: None,
|
||||
last_seen: None,
|
||||
}];
|
||||
let picked = pick_agent(&required, &agents);
|
||||
assert!(
|
||||
|
|
|
|||
|
|
@ -264,9 +264,11 @@ async fn dispatch(cmd: ColibriCommand, state: &SharedState) -> ColibriResponse {
|
|||
cmd_claim_task(state, task_id, agent_id).await
|
||||
}
|
||||
ColibriCommand::ListAgents => cmd_list_agents(state).await,
|
||||
ColibriCommand::RegisterAgent { name, capabilities, host } => {
|
||||
cmd_register_agent(state, name, capabilities, host.as_deref()).await
|
||||
}
|
||||
ColibriCommand::RegisterAgent {
|
||||
name,
|
||||
capabilities,
|
||||
host,
|
||||
} => cmd_register_agent(state, name, capabilities, host.as_deref()).await,
|
||||
ColibriCommand::Heartbeat { agent_id, host } => {
|
||||
cmd_heartbeat(state, agent_id, host.as_deref()).await
|
||||
}
|
||||
|
|
@ -1083,7 +1085,12 @@ async fn cmd_intake_task(
|
|||
) -> ColibriResponse {
|
||||
let caps = capabilities.unwrap_or_default();
|
||||
// Create the task immediately so the caller gets the ID back.
|
||||
let task = match state.store.lock().unwrap().create_task(&title, description.as_deref()) {
|
||||
let task = match state
|
||||
.store
|
||||
.lock()
|
||||
.unwrap()
|
||||
.create_task(&title, description.as_deref())
|
||||
{
|
||||
Ok(t) => t,
|
||||
Err(e) => return ColibriResponse::err(format!("create task failed: {e}")),
|
||||
};
|
||||
|
|
|
|||
|
|
@ -199,7 +199,10 @@ async fn scheduler_routes_intake_tasks_by_capability() {
|
|||
)
|
||||
.await;
|
||||
// Intake now returns the full task (id + status), not just {"status":"queued"}.
|
||||
assert!(fs_intake["data"]["id"].is_string(), "intake must return task id");
|
||||
assert!(
|
||||
fs_intake["data"]["id"].is_string(),
|
||||
"intake must return task id"
|
||||
);
|
||||
|
||||
send_command(
|
||||
&socket_path,
|
||||
|
|
|
|||
|
|
@ -375,9 +375,7 @@ impl Store {
|
|||
params![now, host, agent_id],
|
||||
)?;
|
||||
if rows == 0 {
|
||||
return Err(StoreError::NotFound(format!(
|
||||
"agent not found: {agent_id}"
|
||||
)));
|
||||
return Err(StoreError::NotFound(format!("agent not found: {agent_id}")));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -797,6 +795,122 @@ mod tests {
|
|||
assert_eq!(got.status, "offline");
|
||||
}
|
||||
|
||||
/// Phase 3: register_agent must persist `host` and stamp `last_seen`.
|
||||
#[test]
|
||||
fn test_register_agent_stores_host_and_last_seen() {
|
||||
let store = Store::open_memory().unwrap();
|
||||
|
||||
let agent = store
|
||||
.register_agent("zot", serde_json::json!(["code"]), Some("osa"))
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(agent.host.as_deref(), Some("osa"));
|
||||
assert!(
|
||||
agent.last_seen.is_some(),
|
||||
"last_seen must be set on register"
|
||||
);
|
||||
|
||||
// Round-trips through get_agent (i.e. the 7-column read works).
|
||||
let got = store.get_agent(&agent.id).unwrap().unwrap();
|
||||
assert_eq!(got.host.as_deref(), Some("osa"));
|
||||
assert_eq!(got.last_seen, agent.last_seen);
|
||||
}
|
||||
|
||||
/// Phase 3: heartbeat bumps `last_seen`. Passing `None` for host must not
|
||||
/// clobber an existing host (the COALESCE contract).
|
||||
#[test]
|
||||
fn test_heartbeat_updates_last_seen_and_preserves_host() {
|
||||
let store = Store::open_memory().unwrap();
|
||||
|
||||
let agent = store
|
||||
.register_agent("zot", serde_json::json!([]), Some("osa"))
|
||||
.unwrap();
|
||||
let registered_seen = agent.last_seen.clone().unwrap();
|
||||
|
||||
// Ensure the heartbeat timestamp ticks past the register timestamp.
|
||||
std::thread::sleep(std::time::Duration::from_millis(10));
|
||||
|
||||
store.heartbeat(&agent.id, None).unwrap();
|
||||
|
||||
let got = store.get_agent(&agent.id).unwrap().unwrap();
|
||||
assert_eq!(
|
||||
got.host.as_deref(),
|
||||
Some("osa"),
|
||||
"host preserved when heartbeat omits it"
|
||||
);
|
||||
assert!(
|
||||
got.last_seen.as_deref().unwrap() > registered_seen.as_str(),
|
||||
"last_seen must advance past the register timestamp"
|
||||
);
|
||||
}
|
||||
|
||||
/// Phase 3: heartbeat with `Some(host)` updates the host in place.
|
||||
#[test]
|
||||
fn test_heartbeat_updates_host() {
|
||||
let store = Store::open_memory().unwrap();
|
||||
|
||||
let agent = store
|
||||
.register_agent("zot", serde_json::json!([]), Some("osa"))
|
||||
.unwrap();
|
||||
|
||||
store.heartbeat(&agent.id, Some("domedog")).unwrap();
|
||||
|
||||
let got = store.get_agent(&agent.id).unwrap().unwrap();
|
||||
assert_eq!(
|
||||
got.host.as_deref(),
|
||||
Some("domedog"),
|
||||
"host must update to the new value"
|
||||
);
|
||||
}
|
||||
|
||||
/// Phase 3: heartbeat on an unknown agent id fails closed (NotFound), not
|
||||
/// silently.
|
||||
#[test]
|
||||
fn test_heartbeat_unknown_agent_not_found() {
|
||||
let store = Store::open_memory().unwrap();
|
||||
|
||||
let err = store.heartbeat("nonexistent-id", None).unwrap_err();
|
||||
assert!(
|
||||
matches!(err, StoreError::NotFound(ref m) if m.contains("agent not found")),
|
||||
"expected NotFound for unknown agent, got: {err:?}"
|
||||
);
|
||||
}
|
||||
|
||||
/// Phase 3: run_migrations must be idempotent across store reopens. The
|
||||
/// `ALTER TABLE ... ADD COLUMN` statements would otherwise raise
|
||||
/// "duplicate column name" on the second open. Uses a real file because
|
||||
/// in-memory DBs don't survive a reopen.
|
||||
#[test]
|
||||
fn test_migrations_idempotent_on_reopen() {
|
||||
let mut path = std::env::temp_dir();
|
||||
path.push(format!("colibri-phase3-reopen-{}.db", std::process::id()));
|
||||
// Clean up any leftovers from a previous run before *and* after.
|
||||
let _ = std::fs::remove_file(&path);
|
||||
let _ = std::fs::remove_file(format!("{}-wal", path.to_string_lossy()));
|
||||
let _ = std::fs::remove_file(format!("{}-shm", path.to_string_lossy()));
|
||||
|
||||
{
|
||||
let store = Store::open(&path).unwrap();
|
||||
// Exercises the migrated columns end-to-end.
|
||||
store
|
||||
.register_agent("zot", serde_json::json!([]), Some("osa"))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
// Reopen the same file: run_migrations runs again and must NOT error
|
||||
// on the already-applied ALTER statements.
|
||||
{
|
||||
let store = Store::open(&path).unwrap();
|
||||
let agents = store.list_agents().unwrap();
|
||||
assert_eq!(agents.len(), 1, "data must survive reopen");
|
||||
assert_eq!(agents[0].host.as_deref(), Some("osa"));
|
||||
}
|
||||
|
||||
let _ = std::fs::remove_file(&path);
|
||||
let _ = std::fs::remove_file(format!("{}-wal", path.to_string_lossy()));
|
||||
let _ = std::fs::remove_file(format!("{}-shm", path.to_string_lossy()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_skills_catalog() {
|
||||
let store = Store::open_memory().unwrap();
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue