diff --git a/crates/colibri-daemon/tests/multi_agent_board.rs b/crates/colibri-daemon/tests/multi_agent_board.rs new file mode 100644 index 0000000..fe0df9d --- /dev/null +++ b/crates/colibri-daemon/tests/multi_agent_board.rs @@ -0,0 +1,159 @@ +use std::path::Path; +use std::sync::Arc; +use std::time::Duration; + +use colibri_daemon::daemon::DaemonLoopConfig; +use colibri_daemon::{daemon, socket, DaemonConfig, DaemonState, SharedState}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::UnixStream; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn two_agents_claim_distinct_tasks_on_the_board() { + let data_dir = + std::env::temp_dir().join(format!("colibri-multi-agent-{}", uuid::Uuid::new_v4())); + std::fs::create_dir_all(&data_dir).expect("create temp data dir"); + + let mut config = DaemonConfig::from_env(); + config.data_dir = data_dir.clone(); + config.socket_path = data_dir.join("colibri.sock"); + config.db_path = data_dir.join("colibri.sqlite"); + let socket_path = config.socket_path.clone(); + + let state: SharedState = Arc::new(DaemonState::new(config)); + + let server_state = state.clone(); + let server_shutdown = state.shutdown_rx.resubscribe(); + let server = tokio::spawn(async move { + let _ = socket::serve(server_state, server_shutdown).await; + }); + + let loop_state = state.clone(); + let loop_shutdown = state.shutdown_rx.resubscribe(); + let loop_config = DaemonLoopConfig { + scheduler_interval: Duration::from_millis(50), + ..DaemonLoopConfig::default() + }; + let bg_loop = tokio::spawn(async move { + daemon::run_loop(loop_state, loop_config, loop_shutdown).await; + }); + + wait_for_socket(&socket_path).await; + + // Register two agents. + let sysadmin = send_command( + &socket_path, + r#"{"cmd":"register-agent","name":"sysadmin","capabilities":{"freebsd":true}}"#, + ) + .await; + assert!(sysadmin["ok"].as_bool().unwrap_or(false)); + let sa_id = sysadmin["data"]["id"].as_str().unwrap().to_string(); + + let dba = send_command( + &socket_path, + r#"{"cmd":"register-agent","name":"db-admin","capabilities":{"postgres":true}}"#, + ) + .await; + assert!(dba["ok"].as_bool().unwrap_or(false)); + let dba_id = dba["data"]["id"].as_str().unwrap().to_string(); + + // Both agents listed. + let agents = send_command(&socket_path, r#"{"cmd":"list-agents"}"#).await; + assert_eq!(agents["data"].as_array().unwrap().len(), 2); + + // Create two tasks. + let zfs = send_command( + &socket_path, + r#"{"cmd":"create-task","title":"scrub zroot","description":"zpool scrub"}"#, + ) + .await; + let zfs_id = zfs["data"]["id"].as_str().unwrap(); + + let vacuum = send_command( + &socket_path, + r#"{"cmd":"create-task","title":"vacuum db","description":"VACUUM ANALYZE"}"#, + ) + .await; + let vacuum_id = vacuum["data"]["id"].as_str().unwrap(); + + // Both queued. + let q = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"queued"}"#).await; + assert_eq!(q["data"].as_array().unwrap().len(), 2); + + // Each agent claims one task. + let c1 = send_command( + &socket_path, + &format!(r#"{{"cmd":"claim-task","task_id":"{zfs_id}","agent_id":"{sa_id}"}}"#), + ) + .await; + assert!(c1["ok"].as_bool().unwrap_or(false), "claim zfs: {c1}"); + + let c2 = send_command( + &socket_path, + &format!(r#"{{"cmd":"claim-task","task_id":"{vacuum_id}","agent_id":"{dba_id}"}}"#), + ) + .await; + assert!(c2["ok"].as_bool().unwrap_or(false), "claim vacuum: {c2}"); + + // Transition both to started, then done. + send_command( + &socket_path, + &format!(r#"{{"cmd":"transition-task","task_id":"{zfs_id}","status":"started"}}"#), + ) + .await; + send_command( + &socket_path, + &format!(r#"{{"cmd":"transition-task","task_id":"{vacuum_id}","status":"started"}}"#), + ) + .await; + + let active = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"started"}"#).await; + assert_eq!(active["data"].as_array().unwrap().len(), 2); + + send_command( + &socket_path, + &format!(r#"{{"cmd":"transition-task","task_id":"{zfs_id}","status":"done"}}"#), + ) + .await; + send_command( + &socket_path, + &format!(r#"{{"cmd":"transition-task","task_id":"{vacuum_id}","status":"done"}}"#), + ) + .await; + + let done = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"done"}"#).await; + assert_eq!(done["data"].as_array().unwrap().len(), 2); + + let empty = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"queued"}"#).await; + assert!(empty["data"].as_array().unwrap().is_empty()); + + state.shutdown_tx.send(()).ok(); + let _ = tokio::time::timeout(Duration::from_secs(2), server).await; + let _ = tokio::time::timeout(Duration::from_secs(2), bg_loop).await; + let _ = std::fs::remove_dir_all(&data_dir); +} + +async fn wait_for_socket(socket_path: &Path) { + for _ in 0..100 { + if socket_path.exists() { + return; + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + panic!("socket never appeared"); +} + +async fn send_command(socket_path: &Path, line: &str) -> serde_json::Value { + let stream = UnixStream::connect(socket_path).await.expect("connect"); + let (reader, mut writer) = stream.into_split(); + writer + .write_all(format!("{line}\n").as_bytes()) + .await + .expect("write"); + let mut reader = BufReader::new(reader); + let mut resp = String::new(); + tokio::time::timeout(Duration::from_secs(2), reader.read_line(&mut resp)) + .await + .expect("timeout") + .expect("read"); + serde_json::from_str(resp.trim_end()).expect("parse JSON") +}