test(daemon): multi-agent board — lifecycle, capability routing, contention #186
1 changed files with 373 additions and 0 deletions
373
crates/colibri-daemon/tests/multi_agent_board.rs
Normal file
373
crates/colibri-daemon/tests/multi_agent_board.rs
Normal file
|
|
@ -0,0 +1,373 @@
|
|||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use colibri_daemon::daemon::DaemonLoopConfig;
|
||||
use colibri_daemon::{daemon, socket, DaemonConfig, DaemonState, SharedState};
|
||||
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
||||
use tokio::net::UnixStream;
|
||||
|
||||
/// Board lifecycle with two agents: register, create, claim, transition.
|
||||
/// Assignment here is explicit (manual `claim-task`) — this proves the board
|
||||
/// mechanics and concurrent lifecycle, not capability-based routing. The
|
||||
/// scheduler's `pick_agent` routing is proven by
|
||||
/// `scheduler_routes_intake_tasks_by_capability`.
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn two_agents_claim_distinct_tasks_on_the_board() {
|
||||
let data_dir =
|
||||
std::env::temp_dir().join(format!("colibri-multi-agent-{}", uuid::Uuid::new_v4()));
|
||||
std::fs::create_dir_all(&data_dir).expect("create temp data dir");
|
||||
|
||||
let mut config = DaemonConfig::from_env();
|
||||
config.data_dir = data_dir.clone();
|
||||
config.socket_path = data_dir.join("colibri.sock");
|
||||
config.db_path = data_dir.join("colibri.sqlite");
|
||||
let socket_path = config.socket_path.clone();
|
||||
|
||||
let state: SharedState = Arc::new(DaemonState::new(config));
|
||||
|
||||
let server_state = state.clone();
|
||||
let server_shutdown = state.shutdown_rx.resubscribe();
|
||||
let server = tokio::spawn(async move {
|
||||
let _ = socket::serve(server_state, server_shutdown).await;
|
||||
});
|
||||
|
||||
let loop_state = state.clone();
|
||||
let loop_shutdown = state.shutdown_rx.resubscribe();
|
||||
let loop_config = DaemonLoopConfig {
|
||||
scheduler_interval: Duration::from_millis(50),
|
||||
..DaemonLoopConfig::default()
|
||||
};
|
||||
let bg_loop = tokio::spawn(async move {
|
||||
daemon::run_loop(loop_state, loop_config, loop_shutdown).await;
|
||||
});
|
||||
|
||||
wait_for_socket(&socket_path).await;
|
||||
|
||||
// Register two agents.
|
||||
let sysadmin = send_command(
|
||||
&socket_path,
|
||||
r#"{"cmd":"register-agent","name":"sysadmin","capabilities":{"freebsd":true}}"#,
|
||||
)
|
||||
.await;
|
||||
assert!(sysadmin["ok"].as_bool().unwrap_or(false));
|
||||
let sa_id = sysadmin["data"]["id"].as_str().unwrap().to_string();
|
||||
|
||||
let dba = send_command(
|
||||
&socket_path,
|
||||
r#"{"cmd":"register-agent","name":"db-admin","capabilities":{"postgres":true}}"#,
|
||||
)
|
||||
.await;
|
||||
assert!(dba["ok"].as_bool().unwrap_or(false));
|
||||
let dba_id = dba["data"]["id"].as_str().unwrap().to_string();
|
||||
|
||||
// Both agents listed.
|
||||
let agents = send_command(&socket_path, r#"{"cmd":"list-agents"}"#).await;
|
||||
assert_eq!(agents["data"].as_array().unwrap().len(), 2);
|
||||
|
||||
// Create two tasks.
|
||||
let zfs = send_command(
|
||||
&socket_path,
|
||||
r#"{"cmd":"create-task","title":"scrub zroot","description":"zpool scrub"}"#,
|
||||
)
|
||||
.await;
|
||||
let zfs_id = zfs["data"]["id"].as_str().unwrap();
|
||||
|
||||
let vacuum = send_command(
|
||||
&socket_path,
|
||||
r#"{"cmd":"create-task","title":"vacuum db","description":"VACUUM ANALYZE"}"#,
|
||||
)
|
||||
.await;
|
||||
let vacuum_id = vacuum["data"]["id"].as_str().unwrap();
|
||||
|
||||
// Both queued.
|
||||
let q = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"queued"}"#).await;
|
||||
assert_eq!(q["data"].as_array().unwrap().len(), 2);
|
||||
|
||||
// Each agent claims one task.
|
||||
let c1 = send_command(
|
||||
&socket_path,
|
||||
&format!(r#"{{"cmd":"claim-task","task_id":"{zfs_id}","agent_id":"{sa_id}"}}"#),
|
||||
)
|
||||
.await;
|
||||
assert!(c1["ok"].as_bool().unwrap_or(false), "claim zfs: {c1}");
|
||||
|
||||
let c2 = send_command(
|
||||
&socket_path,
|
||||
&format!(r#"{{"cmd":"claim-task","task_id":"{vacuum_id}","agent_id":"{dba_id}"}}"#),
|
||||
)
|
||||
.await;
|
||||
assert!(c2["ok"].as_bool().unwrap_or(false), "claim vacuum: {c2}");
|
||||
|
||||
// Transition both to started, then done.
|
||||
send_command(
|
||||
&socket_path,
|
||||
&format!(r#"{{"cmd":"transition-task","task_id":"{zfs_id}","status":"started"}}"#),
|
||||
)
|
||||
.await;
|
||||
send_command(
|
||||
&socket_path,
|
||||
&format!(r#"{{"cmd":"transition-task","task_id":"{vacuum_id}","status":"started"}}"#),
|
||||
)
|
||||
.await;
|
||||
|
||||
let active = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"started"}"#).await;
|
||||
assert_eq!(active["data"].as_array().unwrap().len(), 2);
|
||||
|
||||
send_command(
|
||||
&socket_path,
|
||||
&format!(r#"{{"cmd":"transition-task","task_id":"{zfs_id}","status":"done"}}"#),
|
||||
)
|
||||
.await;
|
||||
send_command(
|
||||
&socket_path,
|
||||
&format!(r#"{{"cmd":"transition-task","task_id":"{vacuum_id}","status":"done"}}"#),
|
||||
)
|
||||
.await;
|
||||
|
||||
let done = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"done"}"#).await;
|
||||
assert_eq!(done["data"].as_array().unwrap().len(), 2);
|
||||
|
||||
let empty = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"queued"}"#).await;
|
||||
assert!(empty["data"].as_array().unwrap().is_empty());
|
||||
|
||||
state.shutdown_tx.send(()).ok();
|
||||
let _ = tokio::time::timeout(Duration::from_secs(2), server).await;
|
||||
let _ = tokio::time::timeout(Duration::from_secs(2), bg_loop).await;
|
||||
let _ = std::fs::remove_dir_all(&data_dir);
|
||||
}
|
||||
|
||||
/// The real Phase 1b proof: tasks are routed to agents *by capability* by the
|
||||
/// scheduler, with no manual claim. Two agents with disjoint capabilities, two
|
||||
/// intake tasks with matching `required` capabilities — the scheduler's
|
||||
/// `pick_agent` must assign each task to the agent that can do it.
|
||||
///
|
||||
/// Capabilities are registered as a JSON array (`["freebsd"]`); `pick_agent`
|
||||
/// deserializes them as `Vec<String>`, so the object form `{"freebsd":true}`
|
||||
/// would score zero. Array form is required for routing to work.
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn scheduler_routes_intake_tasks_by_capability() {
|
||||
let data_dir = std::env::temp_dir().join(format!("colibri-route-{}", uuid::Uuid::new_v4()));
|
||||
std::fs::create_dir_all(&data_dir).expect("create temp data dir");
|
||||
|
||||
let mut config = DaemonConfig::from_env();
|
||||
config.data_dir = data_dir.clone();
|
||||
config.socket_path = data_dir.join("colibri.sock");
|
||||
config.db_path = data_dir.join("colibri.sqlite");
|
||||
let socket_path = config.socket_path.clone();
|
||||
|
||||
let state: SharedState = Arc::new(DaemonState::new(config));
|
||||
|
||||
let server_state = state.clone();
|
||||
let server_shutdown = state.shutdown_rx.resubscribe();
|
||||
let server = tokio::spawn(async move {
|
||||
let _ = socket::serve(server_state, server_shutdown).await;
|
||||
});
|
||||
|
||||
let loop_state = state.clone();
|
||||
let loop_shutdown = state.shutdown_rx.resubscribe();
|
||||
let loop_config = DaemonLoopConfig {
|
||||
scheduler_interval: Duration::from_millis(50),
|
||||
..DaemonLoopConfig::default()
|
||||
};
|
||||
let bg_loop = tokio::spawn(async move {
|
||||
daemon::run_loop(loop_state, loop_config, loop_shutdown).await;
|
||||
});
|
||||
|
||||
wait_for_socket(&socket_path).await;
|
||||
|
||||
// Two agents with disjoint capabilities (array form — required by pick_agent).
|
||||
let sysadmin = send_command(
|
||||
&socket_path,
|
||||
r#"{"cmd":"register-agent","name":"sysadmin","capabilities":["freebsd"]}"#,
|
||||
)
|
||||
.await;
|
||||
let sa_id = sysadmin["data"]["id"].as_str().unwrap().to_string();
|
||||
|
||||
let dba = send_command(
|
||||
&socket_path,
|
||||
r#"{"cmd":"register-agent","name":"db-admin","capabilities":["postgres"]}"#,
|
||||
)
|
||||
.await;
|
||||
let dba_id = dba["data"]["id"].as_str().unwrap().to_string();
|
||||
|
||||
// Submit two intake tasks with matching required capabilities. The scheduler
|
||||
// creates and claims them — we never call claim-task.
|
||||
let fs_intake = send_command(
|
||||
&socket_path,
|
||||
r#"{"cmd":"intake-task","title":"scrub zroot","capabilities":["freebsd"]}"#,
|
||||
)
|
||||
.await;
|
||||
assert_eq!(fs_intake["data"]["status"].as_str(), Some("queued"));
|
||||
|
||||
send_command(
|
||||
&socket_path,
|
||||
r#"{"cmd":"intake-task","title":"vacuum db","capabilities":["postgres"]}"#,
|
||||
)
|
||||
.await;
|
||||
|
||||
// Poll until the scheduler has claimed both (created + assigned by pick_agent).
|
||||
let mut claimed = serde_json::Value::Null;
|
||||
for _ in 0..100 {
|
||||
let resp = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"claimed"}"#).await;
|
||||
if resp["data"].as_array().map(|a| a.len()).unwrap_or(0) == 2 {
|
||||
claimed = resp;
|
||||
break;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
}
|
||||
let tasks = claimed["data"]
|
||||
.as_array()
|
||||
.expect("scheduler never claimed both intake tasks by capability");
|
||||
assert_eq!(tasks.len(), 2, "expected 2 claimed tasks, got: {claimed}");
|
||||
|
||||
// Each task landed on the capability-matching agent — chosen by the
|
||||
// scheduler, not by the test.
|
||||
let agent_for = |title: &str| -> String {
|
||||
tasks
|
||||
.iter()
|
||||
.find(|t| t["title"].as_str() == Some(title))
|
||||
.and_then(|t| t["agent_id"].as_str())
|
||||
.unwrap_or("")
|
||||
.to_string()
|
||||
};
|
||||
assert_eq!(agent_for("scrub zroot"), sa_id, "freebsd task -> sysadmin");
|
||||
assert_eq!(agent_for("vacuum db"), dba_id, "postgres task -> db-admin");
|
||||
|
||||
state.shutdown_tx.send(()).ok();
|
||||
let _ = tokio::time::timeout(Duration::from_secs(2), server).await;
|
||||
let _ = tokio::time::timeout(Duration::from_secs(2), bg_loop).await;
|
||||
let _ = std::fs::remove_dir_all(&data_dir);
|
||||
}
|
||||
|
||||
async fn wait_for_socket(socket_path: &Path) {
|
||||
for _ in 0..100 {
|
||||
if socket_path.exists() {
|
||||
return;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(20)).await;
|
||||
}
|
||||
panic!("socket never appeared");
|
||||
}
|
||||
|
||||
async fn send_command(socket_path: &Path, line: &str) -> serde_json::Value {
|
||||
let stream = UnixStream::connect(socket_path).await.expect("connect");
|
||||
let (reader, mut writer) = stream.into_split();
|
||||
writer
|
||||
.write_all(format!("{line}\n").as_bytes())
|
||||
.await
|
||||
.expect("write");
|
||||
let mut reader = BufReader::new(reader);
|
||||
let mut resp = String::new();
|
||||
tokio::time::timeout(Duration::from_secs(2), reader.read_line(&mut resp))
|
||||
.await
|
||||
.expect("timeout")
|
||||
.expect("read");
|
||||
serde_json::from_str(resp.trim_end()).expect("parse JSON")
|
||||
}
|
||||
|
||||
/// One agent claiming two tasks: documents current contention behavior (no
|
||||
/// guard against one agent taking multiple tasks) and session isolation.
|
||||
/// Manual claim — not a routing test.
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn one_agent_handles_two_tasks_isolated_sessions() {
|
||||
let data_dir =
|
||||
std::env::temp_dir().join(format!("colibri-multi-task-{}", uuid::Uuid::new_v4()));
|
||||
std::fs::create_dir_all(&data_dir).expect("create temp data dir");
|
||||
|
||||
let mut config = DaemonConfig::from_env();
|
||||
config.data_dir = data_dir.clone();
|
||||
config.socket_path = data_dir.join("colibri.sock");
|
||||
config.db_path = data_dir.join("colibri.sqlite");
|
||||
let socket_path = config.socket_path.clone();
|
||||
|
||||
let state: SharedState = Arc::new(DaemonState::new(config));
|
||||
|
||||
let server_state = state.clone();
|
||||
let server_shutdown = state.shutdown_rx.resubscribe();
|
||||
let server = tokio::spawn(async move {
|
||||
let _ = socket::serve(server_state, server_shutdown).await;
|
||||
});
|
||||
|
||||
let loop_state = state.clone();
|
||||
let loop_shutdown = state.shutdown_rx.resubscribe();
|
||||
let loop_config = DaemonLoopConfig {
|
||||
scheduler_interval: Duration::from_millis(50),
|
||||
..DaemonLoopConfig::default()
|
||||
};
|
||||
let bg_loop = tokio::spawn(async move {
|
||||
daemon::run_loop(loop_state, loop_config, loop_shutdown).await;
|
||||
});
|
||||
|
||||
wait_for_socket(&socket_path).await;
|
||||
|
||||
// Register one agent with freebsd capability.
|
||||
let worker = send_command(
|
||||
&socket_path,
|
||||
r#"{"cmd":"register-agent","name":"worker","capabilities":{"freebsd":true}}"#,
|
||||
)
|
||||
.await;
|
||||
assert!(worker["ok"].as_bool().unwrap_or(false));
|
||||
let worker_id = worker["data"]["id"].as_str().unwrap().to_string();
|
||||
|
||||
// Two plain board tasks. This test documents single-agent contention and
|
||||
// session isolation via manual claim — capability routing is covered by
|
||||
// `scheduler_routes_intake_tasks_by_capability`, not here.
|
||||
let t1 = send_command(
|
||||
&socket_path,
|
||||
r#"{"cmd":"create-task","title":"scrub zroot","description":"zpool scrub"}"#,
|
||||
)
|
||||
.await;
|
||||
let t1_id = t1["data"]["id"].as_str().unwrap();
|
||||
|
||||
let t2 = send_command(
|
||||
&socket_path,
|
||||
r#"{"cmd":"create-task","title":"check smart","description":"smartctl -a"}"#,
|
||||
)
|
||||
.await;
|
||||
let t2_id = t2["data"]["id"].as_str().unwrap();
|
||||
|
||||
// Both queued.
|
||||
let q = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"queued"}"#).await;
|
||||
assert_eq!(q["data"].as_array().unwrap().len(), 2);
|
||||
|
||||
// Same agent claims both tasks (current behavior: no guard against
|
||||
// one agent taking multiple tasks).
|
||||
for tid in [t1_id, t2_id] {
|
||||
let claim = send_command(
|
||||
&socket_path,
|
||||
&format!(r#"{{"cmd":"claim-task","task_id":"{tid}","agent_id":"{worker_id}"}}"#),
|
||||
)
|
||||
.await;
|
||||
assert!(
|
||||
claim["ok"].as_bool().unwrap_or(false),
|
||||
"claim {tid}: {claim}"
|
||||
);
|
||||
}
|
||||
|
||||
// Transition both to started, then done.
|
||||
for tid in [t1_id, t2_id] {
|
||||
send_command(
|
||||
&socket_path,
|
||||
&format!(r#"{{"cmd":"transition-task","task_id":"{tid}","status":"started"}}"#),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
let active = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"started"}"#).await;
|
||||
assert_eq!(active["data"].as_array().unwrap().len(), 2);
|
||||
|
||||
for tid in [t1_id, t2_id] {
|
||||
send_command(
|
||||
&socket_path,
|
||||
&format!(r#"{{"cmd":"transition-task","task_id":"{tid}","status":"done"}}"#),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
let done = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"done"}"#).await;
|
||||
assert_eq!(done["data"].as_array().unwrap().len(), 2);
|
||||
|
||||
state.shutdown_tx.send(()).ok();
|
||||
let _ = tokio::time::timeout(Duration::from_secs(2), server).await;
|
||||
let _ = tokio::time::timeout(Duration::from_secs(2), bg_loop).await;
|
||||
let _ = std::fs::remove_dir_all(&data_dir);
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue