test(daemon): multi-agent board — register, claim, lifecycle
Some checks failed
CI / rust (pull_request) Has been cancelled
CI / markdown (pull_request) Has been cancelled
CI / agent-jail-pkgs (pull_request) Has been cancelled
CI / port (pull_request) Has been cancelled

Proves the coordination board handles two agents concurrently:
- Register agents with distinct capability sets
- Create tasks visible to both agents
- Each agent claims only their assigned task
- Tasks flow through the full lifecycle: queued→claimed→started→done
- Final state: both done, zero queued

Uses isolated daemon with temp paths — zero production impact.
This commit is contained in:
Sam & Claude 2026-06-25 15:55:33 +02:00
parent 263b8f0b4d
commit 343eae87ce

View file

@ -0,0 +1,159 @@
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use colibri_daemon::daemon::DaemonLoopConfig;
use colibri_daemon::{daemon, socket, DaemonConfig, DaemonState, SharedState};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixStream;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn two_agents_claim_distinct_tasks_on_the_board() {
let data_dir =
std::env::temp_dir().join(format!("colibri-multi-agent-{}", uuid::Uuid::new_v4()));
std::fs::create_dir_all(&data_dir).expect("create temp data dir");
let mut config = DaemonConfig::from_env();
config.data_dir = data_dir.clone();
config.socket_path = data_dir.join("colibri.sock");
config.db_path = data_dir.join("colibri.sqlite");
let socket_path = config.socket_path.clone();
let state: SharedState = Arc::new(DaemonState::new(config));
let server_state = state.clone();
let server_shutdown = state.shutdown_rx.resubscribe();
let server = tokio::spawn(async move {
let _ = socket::serve(server_state, server_shutdown).await;
});
let loop_state = state.clone();
let loop_shutdown = state.shutdown_rx.resubscribe();
let loop_config = DaemonLoopConfig {
scheduler_interval: Duration::from_millis(50),
..DaemonLoopConfig::default()
};
let bg_loop = tokio::spawn(async move {
daemon::run_loop(loop_state, loop_config, loop_shutdown).await;
});
wait_for_socket(&socket_path).await;
// Register two agents.
let sysadmin = send_command(
&socket_path,
r#"{"cmd":"register-agent","name":"sysadmin","capabilities":{"freebsd":true}}"#,
)
.await;
assert!(sysadmin["ok"].as_bool().unwrap_or(false));
let sa_id = sysadmin["data"]["id"].as_str().unwrap().to_string();
let dba = send_command(
&socket_path,
r#"{"cmd":"register-agent","name":"db-admin","capabilities":{"postgres":true}}"#,
)
.await;
assert!(dba["ok"].as_bool().unwrap_or(false));
let dba_id = dba["data"]["id"].as_str().unwrap().to_string();
// Both agents listed.
let agents = send_command(&socket_path, r#"{"cmd":"list-agents"}"#).await;
assert_eq!(agents["data"].as_array().unwrap().len(), 2);
// Create two tasks.
let zfs = send_command(
&socket_path,
r#"{"cmd":"create-task","title":"scrub zroot","description":"zpool scrub"}"#,
)
.await;
let zfs_id = zfs["data"]["id"].as_str().unwrap();
let vacuum = send_command(
&socket_path,
r#"{"cmd":"create-task","title":"vacuum db","description":"VACUUM ANALYZE"}"#,
)
.await;
let vacuum_id = vacuum["data"]["id"].as_str().unwrap();
// Both queued.
let q = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"queued"}"#).await;
assert_eq!(q["data"].as_array().unwrap().len(), 2);
// Each agent claims one task.
let c1 = send_command(
&socket_path,
&format!(r#"{{"cmd":"claim-task","task_id":"{zfs_id}","agent_id":"{sa_id}"}}"#),
)
.await;
assert!(c1["ok"].as_bool().unwrap_or(false), "claim zfs: {c1}");
let c2 = send_command(
&socket_path,
&format!(r#"{{"cmd":"claim-task","task_id":"{vacuum_id}","agent_id":"{dba_id}"}}"#),
)
.await;
assert!(c2["ok"].as_bool().unwrap_or(false), "claim vacuum: {c2}");
// Transition both to started, then done.
send_command(
&socket_path,
&format!(r#"{{"cmd":"transition-task","task_id":"{zfs_id}","status":"started"}}"#),
)
.await;
send_command(
&socket_path,
&format!(r#"{{"cmd":"transition-task","task_id":"{vacuum_id}","status":"started"}}"#),
)
.await;
let active = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"started"}"#).await;
assert_eq!(active["data"].as_array().unwrap().len(), 2);
send_command(
&socket_path,
&format!(r#"{{"cmd":"transition-task","task_id":"{zfs_id}","status":"done"}}"#),
)
.await;
send_command(
&socket_path,
&format!(r#"{{"cmd":"transition-task","task_id":"{vacuum_id}","status":"done"}}"#),
)
.await;
let done = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"done"}"#).await;
assert_eq!(done["data"].as_array().unwrap().len(), 2);
let empty = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"queued"}"#).await;
assert!(empty["data"].as_array().unwrap().is_empty());
state.shutdown_tx.send(()).ok();
let _ = tokio::time::timeout(Duration::from_secs(2), server).await;
let _ = tokio::time::timeout(Duration::from_secs(2), bg_loop).await;
let _ = std::fs::remove_dir_all(&data_dir);
}
async fn wait_for_socket(socket_path: &Path) {
for _ in 0..100 {
if socket_path.exists() {
return;
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
panic!("socket never appeared");
}
async fn send_command(socket_path: &Path, line: &str) -> serde_json::Value {
let stream = UnixStream::connect(socket_path).await.expect("connect");
let (reader, mut writer) = stream.into_split();
writer
.write_all(format!("{line}\n").as_bytes())
.await
.expect("write");
let mut reader = BufReader::new(reader);
let mut resp = String::new();
tokio::time::timeout(Duration::from_secs(2), reader.read_line(&mut resp))
.await
.expect("timeout")
.expect("read");
serde_json::from_str(resp.trim_end()).expect("parse JSON")
}