diff --git a/crates/colibri-daemon/tests/multi_agent_board.rs b/crates/colibri-daemon/tests/multi_agent_board.rs new file mode 100644 index 0000000..dee2217 --- /dev/null +++ b/crates/colibri-daemon/tests/multi_agent_board.rs @@ -0,0 +1,373 @@ +use std::path::Path; +use std::sync::Arc; +use std::time::Duration; + +use colibri_daemon::daemon::DaemonLoopConfig; +use colibri_daemon::{daemon, socket, DaemonConfig, DaemonState, SharedState}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::UnixStream; + +/// Board lifecycle with two agents: register, create, claim, transition. +/// Assignment here is explicit (manual `claim-task`) — this proves the board +/// mechanics and concurrent lifecycle, not capability-based routing. The +/// scheduler's `pick_agent` routing is proven by +/// `scheduler_routes_intake_tasks_by_capability`. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn two_agents_claim_distinct_tasks_on_the_board() { + let data_dir = + std::env::temp_dir().join(format!("colibri-multi-agent-{}", uuid::Uuid::new_v4())); + std::fs::create_dir_all(&data_dir).expect("create temp data dir"); + + let mut config = DaemonConfig::from_env(); + config.data_dir = data_dir.clone(); + config.socket_path = data_dir.join("colibri.sock"); + config.db_path = data_dir.join("colibri.sqlite"); + let socket_path = config.socket_path.clone(); + + let state: SharedState = Arc::new(DaemonState::new(config)); + + let server_state = state.clone(); + let server_shutdown = state.shutdown_rx.resubscribe(); + let server = tokio::spawn(async move { + let _ = socket::serve(server_state, server_shutdown).await; + }); + + let loop_state = state.clone(); + let loop_shutdown = state.shutdown_rx.resubscribe(); + let loop_config = DaemonLoopConfig { + scheduler_interval: Duration::from_millis(50), + ..DaemonLoopConfig::default() + }; + let bg_loop = tokio::spawn(async move { + daemon::run_loop(loop_state, loop_config, loop_shutdown).await; + }); + + wait_for_socket(&socket_path).await; + + // Register two agents. + let sysadmin = send_command( + &socket_path, + r#"{"cmd":"register-agent","name":"sysadmin","capabilities":{"freebsd":true}}"#, + ) + .await; + assert!(sysadmin["ok"].as_bool().unwrap_or(false)); + let sa_id = sysadmin["data"]["id"].as_str().unwrap().to_string(); + + let dba = send_command( + &socket_path, + r#"{"cmd":"register-agent","name":"db-admin","capabilities":{"postgres":true}}"#, + ) + .await; + assert!(dba["ok"].as_bool().unwrap_or(false)); + let dba_id = dba["data"]["id"].as_str().unwrap().to_string(); + + // Both agents listed. + let agents = send_command(&socket_path, r#"{"cmd":"list-agents"}"#).await; + assert_eq!(agents["data"].as_array().unwrap().len(), 2); + + // Create two tasks. + let zfs = send_command( + &socket_path, + r#"{"cmd":"create-task","title":"scrub zroot","description":"zpool scrub"}"#, + ) + .await; + let zfs_id = zfs["data"]["id"].as_str().unwrap(); + + let vacuum = send_command( + &socket_path, + r#"{"cmd":"create-task","title":"vacuum db","description":"VACUUM ANALYZE"}"#, + ) + .await; + let vacuum_id = vacuum["data"]["id"].as_str().unwrap(); + + // Both queued. + let q = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"queued"}"#).await; + assert_eq!(q["data"].as_array().unwrap().len(), 2); + + // Each agent claims one task. + let c1 = send_command( + &socket_path, + &format!(r#"{{"cmd":"claim-task","task_id":"{zfs_id}","agent_id":"{sa_id}"}}"#), + ) + .await; + assert!(c1["ok"].as_bool().unwrap_or(false), "claim zfs: {c1}"); + + let c2 = send_command( + &socket_path, + &format!(r#"{{"cmd":"claim-task","task_id":"{vacuum_id}","agent_id":"{dba_id}"}}"#), + ) + .await; + assert!(c2["ok"].as_bool().unwrap_or(false), "claim vacuum: {c2}"); + + // Transition both to started, then done. + send_command( + &socket_path, + &format!(r#"{{"cmd":"transition-task","task_id":"{zfs_id}","status":"started"}}"#), + ) + .await; + send_command( + &socket_path, + &format!(r#"{{"cmd":"transition-task","task_id":"{vacuum_id}","status":"started"}}"#), + ) + .await; + + let active = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"started"}"#).await; + assert_eq!(active["data"].as_array().unwrap().len(), 2); + + send_command( + &socket_path, + &format!(r#"{{"cmd":"transition-task","task_id":"{zfs_id}","status":"done"}}"#), + ) + .await; + send_command( + &socket_path, + &format!(r#"{{"cmd":"transition-task","task_id":"{vacuum_id}","status":"done"}}"#), + ) + .await; + + let done = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"done"}"#).await; + assert_eq!(done["data"].as_array().unwrap().len(), 2); + + let empty = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"queued"}"#).await; + assert!(empty["data"].as_array().unwrap().is_empty()); + + state.shutdown_tx.send(()).ok(); + let _ = tokio::time::timeout(Duration::from_secs(2), server).await; + let _ = tokio::time::timeout(Duration::from_secs(2), bg_loop).await; + let _ = std::fs::remove_dir_all(&data_dir); +} + +/// The real Phase 1b proof: tasks are routed to agents *by capability* by the +/// scheduler, with no manual claim. Two agents with disjoint capabilities, two +/// intake tasks with matching `required` capabilities — the scheduler's +/// `pick_agent` must assign each task to the agent that can do it. +/// +/// Capabilities are registered as a JSON array (`["freebsd"]`); `pick_agent` +/// deserializes them as `Vec`, so the object form `{"freebsd":true}` +/// would score zero. Array form is required for routing to work. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn scheduler_routes_intake_tasks_by_capability() { + let data_dir = std::env::temp_dir().join(format!("colibri-route-{}", uuid::Uuid::new_v4())); + std::fs::create_dir_all(&data_dir).expect("create temp data dir"); + + let mut config = DaemonConfig::from_env(); + config.data_dir = data_dir.clone(); + config.socket_path = data_dir.join("colibri.sock"); + config.db_path = data_dir.join("colibri.sqlite"); + let socket_path = config.socket_path.clone(); + + let state: SharedState = Arc::new(DaemonState::new(config)); + + let server_state = state.clone(); + let server_shutdown = state.shutdown_rx.resubscribe(); + let server = tokio::spawn(async move { + let _ = socket::serve(server_state, server_shutdown).await; + }); + + let loop_state = state.clone(); + let loop_shutdown = state.shutdown_rx.resubscribe(); + let loop_config = DaemonLoopConfig { + scheduler_interval: Duration::from_millis(50), + ..DaemonLoopConfig::default() + }; + let bg_loop = tokio::spawn(async move { + daemon::run_loop(loop_state, loop_config, loop_shutdown).await; + }); + + wait_for_socket(&socket_path).await; + + // Two agents with disjoint capabilities (array form — required by pick_agent). + let sysadmin = send_command( + &socket_path, + r#"{"cmd":"register-agent","name":"sysadmin","capabilities":["freebsd"]}"#, + ) + .await; + let sa_id = sysadmin["data"]["id"].as_str().unwrap().to_string(); + + let dba = send_command( + &socket_path, + r#"{"cmd":"register-agent","name":"db-admin","capabilities":["postgres"]}"#, + ) + .await; + let dba_id = dba["data"]["id"].as_str().unwrap().to_string(); + + // Submit two intake tasks with matching required capabilities. The scheduler + // creates and claims them — we never call claim-task. + let fs_intake = send_command( + &socket_path, + r#"{"cmd":"intake-task","title":"scrub zroot","capabilities":["freebsd"]}"#, + ) + .await; + assert_eq!(fs_intake["data"]["status"].as_str(), Some("queued")); + + send_command( + &socket_path, + r#"{"cmd":"intake-task","title":"vacuum db","capabilities":["postgres"]}"#, + ) + .await; + + // Poll until the scheduler has claimed both (created + assigned by pick_agent). + let mut claimed = serde_json::Value::Null; + for _ in 0..100 { + let resp = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"claimed"}"#).await; + if resp["data"].as_array().map(|a| a.len()).unwrap_or(0) == 2 { + claimed = resp; + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + let tasks = claimed["data"] + .as_array() + .expect("scheduler never claimed both intake tasks by capability"); + assert_eq!(tasks.len(), 2, "expected 2 claimed tasks, got: {claimed}"); + + // Each task landed on the capability-matching agent — chosen by the + // scheduler, not by the test. + let agent_for = |title: &str| -> String { + tasks + .iter() + .find(|t| t["title"].as_str() == Some(title)) + .and_then(|t| t["agent_id"].as_str()) + .unwrap_or("") + .to_string() + }; + assert_eq!(agent_for("scrub zroot"), sa_id, "freebsd task -> sysadmin"); + assert_eq!(agent_for("vacuum db"), dba_id, "postgres task -> db-admin"); + + state.shutdown_tx.send(()).ok(); + let _ = tokio::time::timeout(Duration::from_secs(2), server).await; + let _ = tokio::time::timeout(Duration::from_secs(2), bg_loop).await; + let _ = std::fs::remove_dir_all(&data_dir); +} + +async fn wait_for_socket(socket_path: &Path) { + for _ in 0..100 { + if socket_path.exists() { + return; + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + panic!("socket never appeared"); +} + +async fn send_command(socket_path: &Path, line: &str) -> serde_json::Value { + let stream = UnixStream::connect(socket_path).await.expect("connect"); + let (reader, mut writer) = stream.into_split(); + writer + .write_all(format!("{line}\n").as_bytes()) + .await + .expect("write"); + let mut reader = BufReader::new(reader); + let mut resp = String::new(); + tokio::time::timeout(Duration::from_secs(2), reader.read_line(&mut resp)) + .await + .expect("timeout") + .expect("read"); + serde_json::from_str(resp.trim_end()).expect("parse JSON") +} + +/// One agent claiming two tasks: documents current contention behavior (no +/// guard against one agent taking multiple tasks) and session isolation. +/// Manual claim — not a routing test. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn one_agent_handles_two_tasks_isolated_sessions() { + let data_dir = + std::env::temp_dir().join(format!("colibri-multi-task-{}", uuid::Uuid::new_v4())); + std::fs::create_dir_all(&data_dir).expect("create temp data dir"); + + let mut config = DaemonConfig::from_env(); + config.data_dir = data_dir.clone(); + config.socket_path = data_dir.join("colibri.sock"); + config.db_path = data_dir.join("colibri.sqlite"); + let socket_path = config.socket_path.clone(); + + let state: SharedState = Arc::new(DaemonState::new(config)); + + let server_state = state.clone(); + let server_shutdown = state.shutdown_rx.resubscribe(); + let server = tokio::spawn(async move { + let _ = socket::serve(server_state, server_shutdown).await; + }); + + let loop_state = state.clone(); + let loop_shutdown = state.shutdown_rx.resubscribe(); + let loop_config = DaemonLoopConfig { + scheduler_interval: Duration::from_millis(50), + ..DaemonLoopConfig::default() + }; + let bg_loop = tokio::spawn(async move { + daemon::run_loop(loop_state, loop_config, loop_shutdown).await; + }); + + wait_for_socket(&socket_path).await; + + // Register one agent with freebsd capability. + let worker = send_command( + &socket_path, + r#"{"cmd":"register-agent","name":"worker","capabilities":{"freebsd":true}}"#, + ) + .await; + assert!(worker["ok"].as_bool().unwrap_or(false)); + let worker_id = worker["data"]["id"].as_str().unwrap().to_string(); + + // Two plain board tasks. This test documents single-agent contention and + // session isolation via manual claim — capability routing is covered by + // `scheduler_routes_intake_tasks_by_capability`, not here. + let t1 = send_command( + &socket_path, + r#"{"cmd":"create-task","title":"scrub zroot","description":"zpool scrub"}"#, + ) + .await; + let t1_id = t1["data"]["id"].as_str().unwrap(); + + let t2 = send_command( + &socket_path, + r#"{"cmd":"create-task","title":"check smart","description":"smartctl -a"}"#, + ) + .await; + let t2_id = t2["data"]["id"].as_str().unwrap(); + + // Both queued. + let q = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"queued"}"#).await; + assert_eq!(q["data"].as_array().unwrap().len(), 2); + + // Same agent claims both tasks (current behavior: no guard against + // one agent taking multiple tasks). + for tid in [t1_id, t2_id] { + let claim = send_command( + &socket_path, + &format!(r#"{{"cmd":"claim-task","task_id":"{tid}","agent_id":"{worker_id}"}}"#), + ) + .await; + assert!( + claim["ok"].as_bool().unwrap_or(false), + "claim {tid}: {claim}" + ); + } + + // Transition both to started, then done. + for tid in [t1_id, t2_id] { + send_command( + &socket_path, + &format!(r#"{{"cmd":"transition-task","task_id":"{tid}","status":"started"}}"#), + ) + .await; + } + let active = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"started"}"#).await; + assert_eq!(active["data"].as_array().unwrap().len(), 2); + + for tid in [t1_id, t2_id] { + send_command( + &socket_path, + &format!(r#"{{"cmd":"transition-task","task_id":"{tid}","status":"done"}}"#), + ) + .await; + } + let done = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"done"}"#).await; + assert_eq!(done["data"].as_array().unwrap().len(), 2); + + state.shutdown_tx.send(()).ok(); + let _ = tokio::time::timeout(Duration::from_secs(2), server).await; + let _ = tokio::time::timeout(Duration::from_secs(2), bg_loop).await; + let _ = std::fs::remove_dir_all(&data_dir); +}