From 6264d5f4328504904743153e6b470a47a2fb9bb4 Mon Sep 17 00:00:00 2001 From: Sam & Claude Date: Thu, 25 Jun 2026 15:55:33 +0200 Subject: [PATCH 1/4] =?UTF-8?q?test(daemon):=20multi-agent=20board=20?= =?UTF-8?q?=E2=80=94=20register,=20claim,=20lifecycle?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Proves the coordination board handles two agents concurrently: - Register agents with distinct capability sets - Create tasks visible to both agents - Each agent claims only their assigned task - Tasks flow through the full lifecycle: queued→claimed→started→done - Final state: both done, zero queued Uses isolated daemon with temp paths — zero production impact. --- .../colibri-daemon/tests/multi_agent_board.rs | 159 ++++++++++++++++++ 1 file changed, 159 insertions(+) create mode 100644 crates/colibri-daemon/tests/multi_agent_board.rs diff --git a/crates/colibri-daemon/tests/multi_agent_board.rs b/crates/colibri-daemon/tests/multi_agent_board.rs new file mode 100644 index 0000000..fe0df9d --- /dev/null +++ b/crates/colibri-daemon/tests/multi_agent_board.rs @@ -0,0 +1,159 @@ +use std::path::Path; +use std::sync::Arc; +use std::time::Duration; + +use colibri_daemon::daemon::DaemonLoopConfig; +use colibri_daemon::{daemon, socket, DaemonConfig, DaemonState, SharedState}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::UnixStream; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn two_agents_claim_distinct_tasks_on_the_board() { + let data_dir = + std::env::temp_dir().join(format!("colibri-multi-agent-{}", uuid::Uuid::new_v4())); + std::fs::create_dir_all(&data_dir).expect("create temp data dir"); + + let mut config = DaemonConfig::from_env(); + config.data_dir = data_dir.clone(); + config.socket_path = data_dir.join("colibri.sock"); + config.db_path = data_dir.join("colibri.sqlite"); + let socket_path = config.socket_path.clone(); + + let state: SharedState = Arc::new(DaemonState::new(config)); + + let server_state = state.clone(); + let server_shutdown = state.shutdown_rx.resubscribe(); + let server = tokio::spawn(async move { + let _ = socket::serve(server_state, server_shutdown).await; + }); + + let loop_state = state.clone(); + let loop_shutdown = state.shutdown_rx.resubscribe(); + let loop_config = DaemonLoopConfig { + scheduler_interval: Duration::from_millis(50), + ..DaemonLoopConfig::default() + }; + let bg_loop = tokio::spawn(async move { + daemon::run_loop(loop_state, loop_config, loop_shutdown).await; + }); + + wait_for_socket(&socket_path).await; + + // Register two agents. + let sysadmin = send_command( + &socket_path, + r#"{"cmd":"register-agent","name":"sysadmin","capabilities":{"freebsd":true}}"#, + ) + .await; + assert!(sysadmin["ok"].as_bool().unwrap_or(false)); + let sa_id = sysadmin["data"]["id"].as_str().unwrap().to_string(); + + let dba = send_command( + &socket_path, + r#"{"cmd":"register-agent","name":"db-admin","capabilities":{"postgres":true}}"#, + ) + .await; + assert!(dba["ok"].as_bool().unwrap_or(false)); + let dba_id = dba["data"]["id"].as_str().unwrap().to_string(); + + // Both agents listed. + let agents = send_command(&socket_path, r#"{"cmd":"list-agents"}"#).await; + assert_eq!(agents["data"].as_array().unwrap().len(), 2); + + // Create two tasks. + let zfs = send_command( + &socket_path, + r#"{"cmd":"create-task","title":"scrub zroot","description":"zpool scrub"}"#, + ) + .await; + let zfs_id = zfs["data"]["id"].as_str().unwrap(); + + let vacuum = send_command( + &socket_path, + r#"{"cmd":"create-task","title":"vacuum db","description":"VACUUM ANALYZE"}"#, + ) + .await; + let vacuum_id = vacuum["data"]["id"].as_str().unwrap(); + + // Both queued. + let q = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"queued"}"#).await; + assert_eq!(q["data"].as_array().unwrap().len(), 2); + + // Each agent claims one task. + let c1 = send_command( + &socket_path, + &format!(r#"{{"cmd":"claim-task","task_id":"{zfs_id}","agent_id":"{sa_id}"}}"#), + ) + .await; + assert!(c1["ok"].as_bool().unwrap_or(false), "claim zfs: {c1}"); + + let c2 = send_command( + &socket_path, + &format!(r#"{{"cmd":"claim-task","task_id":"{vacuum_id}","agent_id":"{dba_id}"}}"#), + ) + .await; + assert!(c2["ok"].as_bool().unwrap_or(false), "claim vacuum: {c2}"); + + // Transition both to started, then done. + send_command( + &socket_path, + &format!(r#"{{"cmd":"transition-task","task_id":"{zfs_id}","status":"started"}}"#), + ) + .await; + send_command( + &socket_path, + &format!(r#"{{"cmd":"transition-task","task_id":"{vacuum_id}","status":"started"}}"#), + ) + .await; + + let active = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"started"}"#).await; + assert_eq!(active["data"].as_array().unwrap().len(), 2); + + send_command( + &socket_path, + &format!(r#"{{"cmd":"transition-task","task_id":"{zfs_id}","status":"done"}}"#), + ) + .await; + send_command( + &socket_path, + &format!(r#"{{"cmd":"transition-task","task_id":"{vacuum_id}","status":"done"}}"#), + ) + .await; + + let done = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"done"}"#).await; + assert_eq!(done["data"].as_array().unwrap().len(), 2); + + let empty = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"queued"}"#).await; + assert!(empty["data"].as_array().unwrap().is_empty()); + + state.shutdown_tx.send(()).ok(); + let _ = tokio::time::timeout(Duration::from_secs(2), server).await; + let _ = tokio::time::timeout(Duration::from_secs(2), bg_loop).await; + let _ = std::fs::remove_dir_all(&data_dir); +} + +async fn wait_for_socket(socket_path: &Path) { + for _ in 0..100 { + if socket_path.exists() { + return; + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + panic!("socket never appeared"); +} + +async fn send_command(socket_path: &Path, line: &str) -> serde_json::Value { + let stream = UnixStream::connect(socket_path).await.expect("connect"); + let (reader, mut writer) = stream.into_split(); + writer + .write_all(format!("{line}\n").as_bytes()) + .await + .expect("write"); + let mut reader = BufReader::new(reader); + let mut resp = String::new(); + tokio::time::timeout(Duration::from_secs(2), reader.read_line(&mut resp)) + .await + .expect("timeout") + .expect("read"); + serde_json::from_str(resp.trim_end()).expect("parse JSON") +} -- 2.45.3 From 2288c12c8bc73794d28aeb4defa54dc4d1b1ebba Mon Sep 17 00:00:00 2001 From: 123kupola Date: Thu, 25 Jun 2026 16:26:35 +0200 Subject: [PATCH 2/4] =?UTF-8?q?test(daemon):=20multi-agent=20board=20?= =?UTF-8?q?=E2=80=94=202-agent=20distinct=20claims=20+=201-agent=20multi-t?= =?UTF-8?q?ask?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 1b: two agents claim distinct tasks on the board. Registers sysadmin (freebsd) and db-admin (postgres), creates two tasks, each agent claims one, both transition queued→claimed →started→done. Proves different agents get different tasks. Phase 1c: one agent handles two tasks with isolated sessions. Registers one worker (freebsd), submits two freebsd tasks, same agent claims both, both transition to done independently. Documents current behavior (no guard against multi-claim) and proves session isolation. All workspace tests green (0 failures). --- .../colibri-daemon/tests/multi_agent_board.rs | 98 +++++++++++++++++++ 1 file changed, 98 insertions(+) diff --git a/crates/colibri-daemon/tests/multi_agent_board.rs b/crates/colibri-daemon/tests/multi_agent_board.rs index fe0df9d..048cb61 100644 --- a/crates/colibri-daemon/tests/multi_agent_board.rs +++ b/crates/colibri-daemon/tests/multi_agent_board.rs @@ -157,3 +157,101 @@ async fn send_command(socket_path: &Path, line: &str) -> serde_json::Value { .expect("read"); serde_json::from_str(resp.trim_end()).expect("parse JSON") } + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn one_agent_handles_two_tasks_isolated_sessions() { + let data_dir = + std::env::temp_dir().join(format!("colibri-multi-task-{}", uuid::Uuid::new_v4())); + std::fs::create_dir_all(&data_dir).expect("create temp data dir"); + + let mut config = DaemonConfig::from_env(); + config.data_dir = data_dir.clone(); + config.socket_path = data_dir.join("colibri.sock"); + config.db_path = data_dir.join("colibri.sqlite"); + let socket_path = config.socket_path.clone(); + + let state: SharedState = Arc::new(DaemonState::new(config)); + + let server_state = state.clone(); + let server_shutdown = state.shutdown_rx.resubscribe(); + let server = tokio::spawn(async move { + let _ = socket::serve(server_state, server_shutdown).await; + }); + + let loop_state = state.clone(); + let loop_shutdown = state.shutdown_rx.resubscribe(); + let loop_config = DaemonLoopConfig { + scheduler_interval: Duration::from_millis(50), + ..DaemonLoopConfig::default() + }; + let bg_loop = tokio::spawn(async move { + daemon::run_loop(loop_state, loop_config, loop_shutdown).await; + }); + + wait_for_socket(&socket_path).await; + + // Register one agent with freebsd capability. + let worker = send_command( + &socket_path, + r#"{"cmd":"register-agent","name":"worker","capabilities":{"freebsd":true}}"#, + ) + .await; + assert!(worker["ok"].as_bool().unwrap_or(false)); + let worker_id = worker["data"]["id"].as_str().unwrap().to_string(); + + // Submit two intake tasks, both requiring freebsd. + let t1 = send_command( + &socket_path, + r#"{"cmd":"create-task","title":"scrub zroot","description":"zpool scrub"}"#, + ) + .await; + let t1_id = t1["data"]["id"].as_str().unwrap(); + + let t2 = send_command( + &socket_path, + r#"{"cmd":"create-task","title":"check smart","description":"smartctl -a"}"#, + ) + .await; + let t2_id = t2["data"]["id"].as_str().unwrap(); + + // Both queued. + let q = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"queued"}"#).await; + assert_eq!(q["data"].as_array().unwrap().len(), 2); + + // Same agent claims both tasks (current behavior: no guard against + // one agent taking multiple tasks). + for tid in [t1_id, t2_id] { + let claim = send_command( + &socket_path, + &format!(r#"{{"cmd":"claim-task","task_id":"{tid}","agent_id":"{worker_id}"}}"#), + ) + .await; + assert!(claim["ok"].as_bool().unwrap_or(false), "claim {tid}: {claim}"); + } + + // Transition both to started, then done. + for tid in [t1_id, t2_id] { + send_command( + &socket_path, + &format!(r#"{{"cmd":"transition-task","task_id":"{tid}","status":"started"}}"#), + ) + .await; + } + let active = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"started"}"#).await; + assert_eq!(active["data"].as_array().unwrap().len(), 2); + + for tid in [t1_id, t2_id] { + send_command( + &socket_path, + &format!(r#"{{"cmd":"transition-task","task_id":"{tid}","status":"done"}}"#), + ) + .await; + } + let done = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"done"}"#).await; + assert_eq!(done["data"].as_array().unwrap().len(), 2); + + state.shutdown_tx.send(()).ok(); + let _ = tokio::time::timeout(Duration::from_secs(2), server).await; + let _ = tokio::time::timeout(Duration::from_secs(2), bg_loop).await; + let _ = std::fs::remove_dir_all(&data_dir); +} -- 2.45.3 From e1dfe1db52c16dd7f00547a5e15bdcc1c2a4c4b6 Mon Sep 17 00:00:00 2001 From: Sam & Claude Date: Thu, 25 Jun 2026 16:36:12 +0200 Subject: [PATCH 3/4] test(daemon): add real capability-routing test; fix misleading comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The two existing tests assign tasks by manual claim-task — they prove board lifecycle + contention + session isolation, not capability routing. One comment claimed 'intake tasks requiring freebsd' while the code used plain create-task with no required capabilities; corrected, and added doc comments scoping each test honestly. Add scheduler_routes_intake_tasks_by_capability: the actual Phase 1b proof. Two agents with disjoint capabilities, two intake-task submissions with matching required caps, and the scheduler's pick_agent assigns each task to the capable agent — no manual claim. Note capabilities must be registered in array form (['freebsd']); pick_agent deserializes Vec, so the object form scores zero. All 3 board tests pass; full colibri-daemon suite green (91 unit + integration), clippy --tests clean. Co-Authored-By: Claude Opus 4.8 --- .../colibri-daemon/tests/multi_agent_board.rs | 116 +++++++++++++++++- 1 file changed, 115 insertions(+), 1 deletion(-) diff --git a/crates/colibri-daemon/tests/multi_agent_board.rs b/crates/colibri-daemon/tests/multi_agent_board.rs index 048cb61..6e7b59b 100644 --- a/crates/colibri-daemon/tests/multi_agent_board.rs +++ b/crates/colibri-daemon/tests/multi_agent_board.rs @@ -7,6 +7,11 @@ use colibri_daemon::{daemon, socket, DaemonConfig, DaemonState, SharedState}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::UnixStream; +/// Board lifecycle with two agents: register, create, claim, transition. +/// Assignment here is explicit (manual `claim-task`) — this proves the board +/// mechanics and concurrent lifecycle, not capability-based routing. The +/// scheduler's `pick_agent` routing is proven by +/// `scheduler_routes_intake_tasks_by_capability`. #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn two_agents_claim_distinct_tasks_on_the_board() { let data_dir = @@ -132,6 +137,110 @@ async fn two_agents_claim_distinct_tasks_on_the_board() { let _ = std::fs::remove_dir_all(&data_dir); } +/// The real Phase 1b proof: tasks are routed to agents *by capability* by the +/// scheduler, with no manual claim. Two agents with disjoint capabilities, two +/// intake tasks with matching `required` capabilities — the scheduler's +/// `pick_agent` must assign each task to the agent that can do it. +/// +/// Capabilities are registered as a JSON array (`["freebsd"]`); `pick_agent` +/// deserializes them as `Vec`, so the object form `{"freebsd":true}` +/// would score zero. Array form is required for routing to work. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn scheduler_routes_intake_tasks_by_capability() { + let data_dir = + std::env::temp_dir().join(format!("colibri-route-{}", uuid::Uuid::new_v4())); + std::fs::create_dir_all(&data_dir).expect("create temp data dir"); + + let mut config = DaemonConfig::from_env(); + config.data_dir = data_dir.clone(); + config.socket_path = data_dir.join("colibri.sock"); + config.db_path = data_dir.join("colibri.sqlite"); + let socket_path = config.socket_path.clone(); + + let state: SharedState = Arc::new(DaemonState::new(config)); + + let server_state = state.clone(); + let server_shutdown = state.shutdown_rx.resubscribe(); + let server = tokio::spawn(async move { + let _ = socket::serve(server_state, server_shutdown).await; + }); + + let loop_state = state.clone(); + let loop_shutdown = state.shutdown_rx.resubscribe(); + let loop_config = DaemonLoopConfig { + scheduler_interval: Duration::from_millis(50), + ..DaemonLoopConfig::default() + }; + let bg_loop = tokio::spawn(async move { + daemon::run_loop(loop_state, loop_config, loop_shutdown).await; + }); + + wait_for_socket(&socket_path).await; + + // Two agents with disjoint capabilities (array form — required by pick_agent). + let sysadmin = send_command( + &socket_path, + r#"{"cmd":"register-agent","name":"sysadmin","capabilities":["freebsd"]}"#, + ) + .await; + let sa_id = sysadmin["data"]["id"].as_str().unwrap().to_string(); + + let dba = send_command( + &socket_path, + r#"{"cmd":"register-agent","name":"db-admin","capabilities":["postgres"]}"#, + ) + .await; + let dba_id = dba["data"]["id"].as_str().unwrap().to_string(); + + // Submit two intake tasks with matching required capabilities. The scheduler + // creates and claims them — we never call claim-task. + let fs_intake = send_command( + &socket_path, + r#"{"cmd":"intake-task","title":"scrub zroot","capabilities":["freebsd"]}"#, + ) + .await; + assert_eq!(fs_intake["data"]["status"].as_str(), Some("queued")); + + send_command( + &socket_path, + r#"{"cmd":"intake-task","title":"vacuum db","capabilities":["postgres"]}"#, + ) + .await; + + // Poll until the scheduler has claimed both (created + assigned by pick_agent). + let mut claimed = serde_json::Value::Null; + for _ in 0..100 { + let resp = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"claimed"}"#).await; + if resp["data"].as_array().map(|a| a.len()).unwrap_or(0) == 2 { + claimed = resp; + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + let tasks = claimed["data"] + .as_array() + .expect("scheduler never claimed both intake tasks by capability"); + assert_eq!(tasks.len(), 2, "expected 2 claimed tasks, got: {claimed}"); + + // Each task landed on the capability-matching agent — chosen by the + // scheduler, not by the test. + let agent_for = |title: &str| -> String { + tasks + .iter() + .find(|t| t["title"].as_str() == Some(title)) + .and_then(|t| t["agent_id"].as_str()) + .unwrap_or("") + .to_string() + }; + assert_eq!(agent_for("scrub zroot"), sa_id, "freebsd task -> sysadmin"); + assert_eq!(agent_for("vacuum db"), dba_id, "postgres task -> db-admin"); + + state.shutdown_tx.send(()).ok(); + let _ = tokio::time::timeout(Duration::from_secs(2), server).await; + let _ = tokio::time::timeout(Duration::from_secs(2), bg_loop).await; + let _ = std::fs::remove_dir_all(&data_dir); +} + async fn wait_for_socket(socket_path: &Path) { for _ in 0..100 { if socket_path.exists() { @@ -158,6 +267,9 @@ async fn send_command(socket_path: &Path, line: &str) -> serde_json::Value { serde_json::from_str(resp.trim_end()).expect("parse JSON") } +/// One agent claiming two tasks: documents current contention behavior (no +/// guard against one agent taking multiple tasks) and session isolation. +/// Manual claim — not a routing test. #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn one_agent_handles_two_tasks_isolated_sessions() { let data_dir = @@ -199,7 +311,9 @@ async fn one_agent_handles_two_tasks_isolated_sessions() { assert!(worker["ok"].as_bool().unwrap_or(false)); let worker_id = worker["data"]["id"].as_str().unwrap().to_string(); - // Submit two intake tasks, both requiring freebsd. + // Two plain board tasks. This test documents single-agent contention and + // session isolation via manual claim — capability routing is covered by + // `scheduler_routes_intake_tasks_by_capability`, not here. let t1 = send_command( &socket_path, r#"{"cmd":"create-task","title":"scrub zroot","description":"zpool scrub"}"#, -- 2.45.3 From 9443a0336959402af9136ddaeeeec2d2632085b6 Mon Sep 17 00:00:00 2001 From: Sam & Claude Date: Thu, 25 Jun 2026 16:43:24 +0200 Subject: [PATCH 4/4] style: cargo fmt fix for multi_agent_board.rs Fix two formatting nits (wrapped let, assert! macro) introduced in the capability-routing test. No logic changes. (Sam & Claude) --- crates/colibri-daemon/tests/multi_agent_board.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/crates/colibri-daemon/tests/multi_agent_board.rs b/crates/colibri-daemon/tests/multi_agent_board.rs index 6e7b59b..dee2217 100644 --- a/crates/colibri-daemon/tests/multi_agent_board.rs +++ b/crates/colibri-daemon/tests/multi_agent_board.rs @@ -147,8 +147,7 @@ async fn two_agents_claim_distinct_tasks_on_the_board() { /// would score zero. Array form is required for routing to work. #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn scheduler_routes_intake_tasks_by_capability() { - let data_dir = - std::env::temp_dir().join(format!("colibri-route-{}", uuid::Uuid::new_v4())); + let data_dir = std::env::temp_dir().join(format!("colibri-route-{}", uuid::Uuid::new_v4())); std::fs::create_dir_all(&data_dir).expect("create temp data dir"); let mut config = DaemonConfig::from_env(); @@ -340,7 +339,10 @@ async fn one_agent_handles_two_tasks_isolated_sessions() { &format!(r#"{{"cmd":"claim-task","task_id":"{tid}","agent_id":"{worker_id}"}}"#), ) .await; - assert!(claim["ok"].as_bool().unwrap_or(false), "claim {tid}: {claim}"); + assert!( + claim["ok"].as_bool().unwrap_or(false), + "claim {tid}: {claim}" + ); } // Transition both to started, then done. -- 2.45.3