test(daemon): multi-agent board — lifecycle, capability routing, contention #186

Merged
clawdie merged 4 commits from feat/multi-agent-board-tests into main 2026-06-25 16:58:26 +02:00

View file

@ -0,0 +1,373 @@
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use colibri_daemon::daemon::DaemonLoopConfig;
use colibri_daemon::{daemon, socket, DaemonConfig, DaemonState, SharedState};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixStream;
/// Board lifecycle with two agents: register, create, claim, transition.
///
/// Assignment here is explicit (manual `claim-task`) — this proves the board
/// mechanics and concurrent lifecycle, not capability-based routing. The
/// scheduler's `pick_agent` routing is proven by
/// `scheduler_routes_intake_tasks_by_capability`.
///
/// Every mutating command's response is checked, so a rejected claim or
/// transition fails at the offending command with the daemon's response in
/// the panic message, instead of surfacing later as a bare length mismatch.
///
/// Note: teardown runs only when all assertions pass; a failing assertion
/// leaves the temp data dir behind.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn two_agents_claim_distinct_tasks_on_the_board() {
    // Unique data dir per run: socket and database both live underneath it.
    let data_dir =
        std::env::temp_dir().join(format!("colibri-multi-agent-{}", uuid::Uuid::new_v4()));
    std::fs::create_dir_all(&data_dir).expect("create temp data dir");

    let mut config = DaemonConfig::from_env();
    config.data_dir = data_dir.clone();
    config.socket_path = data_dir.join("colibri.sock");
    config.db_path = data_dir.join("colibri.sqlite");
    let socket_path = config.socket_path.clone();

    let state: SharedState = Arc::new(DaemonState::new(config));

    // Socket server task.
    let server_state = state.clone();
    let server_shutdown = state.shutdown_rx.resubscribe();
    let server = tokio::spawn(async move {
        let _ = socket::serve(server_state, server_shutdown).await;
    });

    // Background daemon loop with a short scheduler interval.
    let loop_state = state.clone();
    let loop_shutdown = state.shutdown_rx.resubscribe();
    let loop_config = DaemonLoopConfig {
        scheduler_interval: Duration::from_millis(50),
        ..DaemonLoopConfig::default()
    };
    let bg_loop = tokio::spawn(async move {
        daemon::run_loop(loop_state, loop_config, loop_shutdown).await;
    });

    wait_for_socket(&socket_path).await;

    // Register two agents.
    let sysadmin = send_command(
        &socket_path,
        r#"{"cmd":"register-agent","name":"sysadmin","capabilities":{"freebsd":true}}"#,
    )
    .await;
    assert!(
        sysadmin["ok"].as_bool().unwrap_or(false),
        "register sysadmin: {sysadmin}"
    );
    let sa_id = sysadmin["data"]["id"]
        .as_str()
        .unwrap_or_else(|| panic!("register-agent returned no id: {sysadmin}"))
        .to_string();

    let dba = send_command(
        &socket_path,
        r#"{"cmd":"register-agent","name":"db-admin","capabilities":{"postgres":true}}"#,
    )
    .await;
    assert!(
        dba["ok"].as_bool().unwrap_or(false),
        "register db-admin: {dba}"
    );
    let dba_id = dba["data"]["id"]
        .as_str()
        .unwrap_or_else(|| panic!("register-agent returned no id: {dba}"))
        .to_string();

    // Both agents listed.
    let agents = send_command(&socket_path, r#"{"cmd":"list-agents"}"#).await;
    assert_eq!(
        agents["data"].as_array().map(Vec::len),
        Some(2),
        "list-agents: {agents}"
    );

    // Create two tasks.
    let zfs = send_command(
        &socket_path,
        r#"{"cmd":"create-task","title":"scrub zroot","description":"zpool scrub"}"#,
    )
    .await;
    let zfs_id = zfs["data"]["id"]
        .as_str()
        .unwrap_or_else(|| panic!("create-task returned no id: {zfs}"));

    let vacuum = send_command(
        &socket_path,
        r#"{"cmd":"create-task","title":"vacuum db","description":"VACUUM ANALYZE"}"#,
    )
    .await;
    let vacuum_id = vacuum["data"]["id"]
        .as_str()
        .unwrap_or_else(|| panic!("create-task returned no id: {vacuum}"));

    // Both queued.
    let queued = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"queued"}"#).await;
    assert_eq!(
        queued["data"].as_array().map(Vec::len),
        Some(2),
        "queued after create: {queued}"
    );

    // Each agent claims one task.
    let c1 = send_command(
        &socket_path,
        &format!(r#"{{"cmd":"claim-task","task_id":"{zfs_id}","agent_id":"{sa_id}"}}"#),
    )
    .await;
    assert!(c1["ok"].as_bool().unwrap_or(false), "claim zfs: {c1}");

    let c2 = send_command(
        &socket_path,
        &format!(r#"{{"cmd":"claim-task","task_id":"{vacuum_id}","agent_id":"{dba_id}"}}"#),
    )
    .await;
    assert!(c2["ok"].as_bool().unwrap_or(false), "claim vacuum: {c2}");

    // Transition both to started, then done. After each phase both tasks must
    // be listed under the new status.
    for status in ["started", "done"] {
        for task_id in [zfs_id, vacuum_id] {
            let moved = send_command(
                &socket_path,
                &format!(
                    r#"{{"cmd":"transition-task","task_id":"{task_id}","status":"{status}"}}"#
                ),
            )
            .await;
            assert!(
                moved["ok"].as_bool().unwrap_or(false),
                "transition {task_id} -> {status}: {moved}"
            );
        }

        let listed = send_command(
            &socket_path,
            &format!(r#"{{"cmd":"list-tasks","status":"{status}"}}"#),
        )
        .await;
        assert_eq!(
            listed["data"].as_array().map(Vec::len),
            Some(2),
            "tasks in `{status}`: {listed}"
        );
    }

    // Nothing is left in the queue.
    let empty = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"queued"}"#).await;
    assert_eq!(
        empty["data"].as_array().map(Vec::len),
        Some(0),
        "queued after completion: {empty}"
    );

    // Teardown: signal shutdown, give each task up to 2 s to exit, then remove
    // the temp dir on a best-effort basis.
    state.shutdown_tx.send(()).ok();
    let _ = tokio::time::timeout(Duration::from_secs(2), server).await;
    let _ = tokio::time::timeout(Duration::from_secs(2), bg_loop).await;
    let _ = std::fs::remove_dir_all(&data_dir);
}
/// The real Phase 1b proof: tasks are routed to agents *by capability* by the
/// scheduler, with no manual claim. Two agents with disjoint capabilities, two
/// intake tasks with matching `required` capabilities — the scheduler's
/// `pick_agent` must assign each task to the agent that can do it.
///
/// Capabilities are registered as a JSON array (`["freebsd"]`); `pick_agent`
/// deserializes them as `Vec<String>`, so the object form `{"freebsd":true}`
/// would score zero. Array form is required for routing to work.
///
/// The test polls `list-tasks` for up to 100 × 50 ms. On timeout it fails with
/// the last response it saw, so the panic shows what the board looked like.
///
/// Note: teardown runs only when all assertions pass; a failing assertion
/// leaves the temp data dir behind.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn scheduler_routes_intake_tasks_by_capability() {
    // Unique data dir per run: socket and database both live underneath it.
    let data_dir = std::env::temp_dir().join(format!("colibri-route-{}", uuid::Uuid::new_v4()));
    std::fs::create_dir_all(&data_dir).expect("create temp data dir");

    let mut config = DaemonConfig::from_env();
    config.data_dir = data_dir.clone();
    config.socket_path = data_dir.join("colibri.sock");
    config.db_path = data_dir.join("colibri.sqlite");
    let socket_path = config.socket_path.clone();

    let state: SharedState = Arc::new(DaemonState::new(config));

    // Socket server task.
    let server_state = state.clone();
    let server_shutdown = state.shutdown_rx.resubscribe();
    let server = tokio::spawn(async move {
        let _ = socket::serve(server_state, server_shutdown).await;
    });

    // Background daemon loop; the 50 ms scheduler interval keeps polling short.
    let loop_state = state.clone();
    let loop_shutdown = state.shutdown_rx.resubscribe();
    let loop_config = DaemonLoopConfig {
        scheduler_interval: Duration::from_millis(50),
        ..DaemonLoopConfig::default()
    };
    let bg_loop = tokio::spawn(async move {
        daemon::run_loop(loop_state, loop_config, loop_shutdown).await;
    });

    wait_for_socket(&socket_path).await;

    // Two agents with disjoint capabilities (array form — required by pick_agent).
    let sysadmin = send_command(
        &socket_path,
        r#"{"cmd":"register-agent","name":"sysadmin","capabilities":["freebsd"]}"#,
    )
    .await;
    assert!(
        sysadmin["ok"].as_bool().unwrap_or(false),
        "register sysadmin: {sysadmin}"
    );
    let sa_id = sysadmin["data"]["id"]
        .as_str()
        .unwrap_or_else(|| panic!("register-agent returned no id: {sysadmin}"))
        .to_string();

    let dba = send_command(
        &socket_path,
        r#"{"cmd":"register-agent","name":"db-admin","capabilities":["postgres"]}"#,
    )
    .await;
    assert!(
        dba["ok"].as_bool().unwrap_or(false),
        "register db-admin: {dba}"
    );
    let dba_id = dba["data"]["id"]
        .as_str()
        .unwrap_or_else(|| panic!("register-agent returned no id: {dba}"))
        .to_string();

    // Submit two intake tasks with matching required capabilities. The scheduler
    // creates and claims them — we never call claim-task.
    for (title, capability) in [("scrub zroot", "freebsd"), ("vacuum db", "postgres")] {
        let intake = send_command(
            &socket_path,
            &format!(
                r#"{{"cmd":"intake-task","title":"{title}","capabilities":["{capability}"]}}"#
            ),
        )
        .await;
        assert_eq!(
            intake["data"]["status"].as_str(),
            Some("queued"),
            "intake `{title}`: {intake}"
        );
    }

    // Poll until the scheduler has claimed both (created + assigned by pick_agent).
    // The most recent response is kept so a timeout can report it.
    let mut last = serde_json::Value::Null;
    for _ in 0..100 {
        last = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"claimed"}"#).await;
        if last["data"].as_array().map(Vec::len) == Some(2) {
            break;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
    let tasks = last["data"]
        .as_array()
        .unwrap_or_else(|| panic!("list-tasks returned no task array: {last}"));
    assert_eq!(
        tasks.len(),
        2,
        "scheduler never claimed both intake tasks by capability; last response: {last}"
    );

    // Each task landed on the capability-matching agent — chosen by the
    // scheduler, not by the test. `None` means the title was not found or the
    // task carries no string `agent_id`.
    let agent_for = |title: &str| -> Option<String> {
        tasks
            .iter()
            .find(|task| task["title"].as_str() == Some(title))
            .and_then(|task| task["agent_id"].as_str())
            .map(str::to_owned)
    };
    assert_eq!(
        agent_for("scrub zroot"),
        Some(sa_id),
        "freebsd task -> sysadmin"
    );
    assert_eq!(
        agent_for("vacuum db"),
        Some(dba_id),
        "postgres task -> db-admin"
    );

    // Teardown: signal shutdown, give each task up to 2 s to exit, then remove
    // the temp dir on a best-effort basis.
    state.shutdown_tx.send(()).ok();
    let _ = tokio::time::timeout(Duration::from_secs(2), server).await;
    let _ = tokio::time::timeout(Duration::from_secs(2), bg_loop).await;
    let _ = std::fs::remove_dir_all(&data_dir);
}
async fn wait_for_socket(socket_path: &Path) {
for _ in 0..100 {
if socket_path.exists() {
return;
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
panic!("socket never appeared");
}
/// Sends one newline-terminated command over a fresh connection to the daemon
/// socket and returns the first response line parsed as JSON.
///
/// `line` is written verbatim followed by `\n`; the caller supplies the JSON.
/// Only the read is bounded by a timeout (2 s), matching the original
/// behavior; connect and write are awaited without one.
///
/// # Panics
///
/// Panics, naming the command that was being sent, when:
/// - connecting or writing fails,
/// - no response line arrives within 2 seconds,
/// - the daemon closes the connection without sending anything, or
/// - the response is not valid JSON (the raw response is included).
async fn send_command(socket_path: &Path, line: &str) -> serde_json::Value {
    let stream = UnixStream::connect(socket_path)
        .await
        .unwrap_or_else(|err| {
            panic!(
                "connect to {} for `{line}`: {err}",
                socket_path.display()
            )
        });
    let (reader, mut writer) = stream.into_split();

    writer
        .write_all(format!("{line}\n").as_bytes())
        .await
        .unwrap_or_else(|err| panic!("write `{line}`: {err}"));

    let mut reader = BufReader::new(reader);
    let mut resp = String::new();
    let bytes_read = tokio::time::timeout(Duration::from_secs(2), reader.read_line(&mut resp))
        .await
        .unwrap_or_else(|_| panic!("timed out waiting for response to `{line}`"))
        .unwrap_or_else(|err| panic!("read response to `{line}`: {err}"));

    // Zero bytes means EOF: report that directly instead of letting it show up
    // as a JSON parse error on an empty string.
    assert!(
        bytes_read > 0,
        "daemon closed the connection without answering `{line}`"
    );

    serde_json::from_str(resp.trim_end())
        .unwrap_or_else(|err| panic!("parse response to `{line}` as JSON ({err}): {resp:?}"))
}
/// One agent claiming two tasks: documents current contention behavior (no
/// guard against one agent taking multiple tasks) and session isolation.
/// Manual claim — not a routing test.
///
/// NOTE(review): nothing session-related is inspected below — the test only
/// checks claim/transition responses and per-status task counts. Confirm
/// whether "session isolation" needs an explicit assertion.
///
/// Every claim and transition response is checked, so a rejected command fails
/// at that command with the daemon's response in the panic message.
///
/// Note: teardown runs only when all assertions pass; a failing assertion
/// leaves the temp data dir behind.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn one_agent_handles_two_tasks_isolated_sessions() {
    // Unique data dir per run: socket and database both live underneath it.
    let data_dir =
        std::env::temp_dir().join(format!("colibri-multi-task-{}", uuid::Uuid::new_v4()));
    std::fs::create_dir_all(&data_dir).expect("create temp data dir");

    let mut config = DaemonConfig::from_env();
    config.data_dir = data_dir.clone();
    config.socket_path = data_dir.join("colibri.sock");
    config.db_path = data_dir.join("colibri.sqlite");
    let socket_path = config.socket_path.clone();

    let state: SharedState = Arc::new(DaemonState::new(config));

    // Socket server task.
    let server_state = state.clone();
    let server_shutdown = state.shutdown_rx.resubscribe();
    let server = tokio::spawn(async move {
        let _ = socket::serve(server_state, server_shutdown).await;
    });

    // Background daemon loop with a short scheduler interval.
    let loop_state = state.clone();
    let loop_shutdown = state.shutdown_rx.resubscribe();
    let loop_config = DaemonLoopConfig {
        scheduler_interval: Duration::from_millis(50),
        ..DaemonLoopConfig::default()
    };
    let bg_loop = tokio::spawn(async move {
        daemon::run_loop(loop_state, loop_config, loop_shutdown).await;
    });

    wait_for_socket(&socket_path).await;

    // Register one agent with freebsd capability.
    let worker = send_command(
        &socket_path,
        r#"{"cmd":"register-agent","name":"worker","capabilities":{"freebsd":true}}"#,
    )
    .await;
    assert!(
        worker["ok"].as_bool().unwrap_or(false),
        "register worker: {worker}"
    );
    let worker_id = worker["data"]["id"]
        .as_str()
        .unwrap_or_else(|| panic!("register-agent returned no id: {worker}"))
        .to_string();

    // Two plain board tasks. This test documents single-agent contention and
    // session isolation via manual claim — capability routing is covered by
    // `scheduler_routes_intake_tasks_by_capability`, not here.
    let t1 = send_command(
        &socket_path,
        r#"{"cmd":"create-task","title":"scrub zroot","description":"zpool scrub"}"#,
    )
    .await;
    let t1_id = t1["data"]["id"]
        .as_str()
        .unwrap_or_else(|| panic!("create-task returned no id: {t1}"));

    let t2 = send_command(
        &socket_path,
        r#"{"cmd":"create-task","title":"check smart","description":"smartctl -a"}"#,
    )
    .await;
    let t2_id = t2["data"]["id"]
        .as_str()
        .unwrap_or_else(|| panic!("create-task returned no id: {t2}"));

    // Both queued.
    let queued = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"queued"}"#).await;
    assert_eq!(
        queued["data"].as_array().map(Vec::len),
        Some(2),
        "queued after create: {queued}"
    );

    // Same agent claims both tasks (current behavior: no guard against
    // one agent taking multiple tasks).
    for tid in [t1_id, t2_id] {
        let claim = send_command(
            &socket_path,
            &format!(r#"{{"cmd":"claim-task","task_id":"{tid}","agent_id":"{worker_id}"}}"#),
        )
        .await;
        assert!(
            claim["ok"].as_bool().unwrap_or(false),
            "claim {tid}: {claim}"
        );
    }

    // Transition both to started, then done. After each phase both tasks must
    // be listed under the new status.
    for status in ["started", "done"] {
        for tid in [t1_id, t2_id] {
            let moved = send_command(
                &socket_path,
                &format!(r#"{{"cmd":"transition-task","task_id":"{tid}","status":"{status}"}}"#),
            )
            .await;
            assert!(
                moved["ok"].as_bool().unwrap_or(false),
                "transition {tid} -> {status}: {moved}"
            );
        }

        let listed = send_command(
            &socket_path,
            &format!(r#"{{"cmd":"list-tasks","status":"{status}"}}"#),
        )
        .await;
        assert_eq!(
            listed["data"].as_array().map(Vec::len),
            Some(2),
            "tasks in `{status}`: {listed}"
        );
    }

    // Teardown: signal shutdown, give each task up to 2 s to exit, then remove
    // the temp dir on a best-effort basis.
    state.shutdown_tx.send(()).ok();
    let _ = tokio::time::timeout(Duration::from_secs(2), server).await;
    let _ = tokio::time::timeout(Duration::from_secs(2), bg_loop).await;
    let _ = std::fs::remove_dir_all(&data_dir);
}