test(daemon): multi-agent board — register, claim, lifecycle #185
1 changed files with 159 additions and 0 deletions
159
crates/colibri-daemon/tests/multi_agent_board.rs
Normal file
159
crates/colibri-daemon/tests/multi_agent_board.rs
Normal file
|
|
@ -0,0 +1,159 @@
|
|||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use colibri_daemon::daemon::DaemonLoopConfig;
|
||||
use colibri_daemon::{daemon, socket, DaemonConfig, DaemonState, SharedState};
|
||||
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
||||
use tokio::net::UnixStream;
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn two_agents_claim_distinct_tasks_on_the_board() {
|
||||
let data_dir =
|
||||
std::env::temp_dir().join(format!("colibri-multi-agent-{}", uuid::Uuid::new_v4()));
|
||||
std::fs::create_dir_all(&data_dir).expect("create temp data dir");
|
||||
|
||||
let mut config = DaemonConfig::from_env();
|
||||
config.data_dir = data_dir.clone();
|
||||
config.socket_path = data_dir.join("colibri.sock");
|
||||
config.db_path = data_dir.join("colibri.sqlite");
|
||||
let socket_path = config.socket_path.clone();
|
||||
|
||||
let state: SharedState = Arc::new(DaemonState::new(config));
|
||||
|
||||
let server_state = state.clone();
|
||||
let server_shutdown = state.shutdown_rx.resubscribe();
|
||||
let server = tokio::spawn(async move {
|
||||
let _ = socket::serve(server_state, server_shutdown).await;
|
||||
});
|
||||
|
||||
let loop_state = state.clone();
|
||||
let loop_shutdown = state.shutdown_rx.resubscribe();
|
||||
let loop_config = DaemonLoopConfig {
|
||||
scheduler_interval: Duration::from_millis(50),
|
||||
..DaemonLoopConfig::default()
|
||||
};
|
||||
let bg_loop = tokio::spawn(async move {
|
||||
daemon::run_loop(loop_state, loop_config, loop_shutdown).await;
|
||||
});
|
||||
|
||||
wait_for_socket(&socket_path).await;
|
||||
|
||||
// Register two agents.
|
||||
let sysadmin = send_command(
|
||||
&socket_path,
|
||||
r#"{"cmd":"register-agent","name":"sysadmin","capabilities":{"freebsd":true}}"#,
|
||||
)
|
||||
.await;
|
||||
assert!(sysadmin["ok"].as_bool().unwrap_or(false));
|
||||
let sa_id = sysadmin["data"]["id"].as_str().unwrap().to_string();
|
||||
|
||||
let dba = send_command(
|
||||
&socket_path,
|
||||
r#"{"cmd":"register-agent","name":"db-admin","capabilities":{"postgres":true}}"#,
|
||||
)
|
||||
.await;
|
||||
assert!(dba["ok"].as_bool().unwrap_or(false));
|
||||
let dba_id = dba["data"]["id"].as_str().unwrap().to_string();
|
||||
|
||||
// Both agents listed.
|
||||
let agents = send_command(&socket_path, r#"{"cmd":"list-agents"}"#).await;
|
||||
assert_eq!(agents["data"].as_array().unwrap().len(), 2);
|
||||
|
||||
// Create two tasks.
|
||||
let zfs = send_command(
|
||||
&socket_path,
|
||||
r#"{"cmd":"create-task","title":"scrub zroot","description":"zpool scrub"}"#,
|
||||
)
|
||||
.await;
|
||||
let zfs_id = zfs["data"]["id"].as_str().unwrap();
|
||||
|
||||
let vacuum = send_command(
|
||||
&socket_path,
|
||||
r#"{"cmd":"create-task","title":"vacuum db","description":"VACUUM ANALYZE"}"#,
|
||||
)
|
||||
.await;
|
||||
let vacuum_id = vacuum["data"]["id"].as_str().unwrap();
|
||||
|
||||
// Both queued.
|
||||
let q = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"queued"}"#).await;
|
||||
assert_eq!(q["data"].as_array().unwrap().len(), 2);
|
||||
|
||||
// Each agent claims one task.
|
||||
let c1 = send_command(
|
||||
&socket_path,
|
||||
&format!(r#"{{"cmd":"claim-task","task_id":"{zfs_id}","agent_id":"{sa_id}"}}"#),
|
||||
)
|
||||
.await;
|
||||
assert!(c1["ok"].as_bool().unwrap_or(false), "claim zfs: {c1}");
|
||||
|
||||
let c2 = send_command(
|
||||
&socket_path,
|
||||
&format!(r#"{{"cmd":"claim-task","task_id":"{vacuum_id}","agent_id":"{dba_id}"}}"#),
|
||||
)
|
||||
.await;
|
||||
assert!(c2["ok"].as_bool().unwrap_or(false), "claim vacuum: {c2}");
|
||||
|
||||
// Transition both to started, then done.
|
||||
send_command(
|
||||
&socket_path,
|
||||
&format!(r#"{{"cmd":"transition-task","task_id":"{zfs_id}","status":"started"}}"#),
|
||||
)
|
||||
.await;
|
||||
send_command(
|
||||
&socket_path,
|
||||
&format!(r#"{{"cmd":"transition-task","task_id":"{vacuum_id}","status":"started"}}"#),
|
||||
)
|
||||
.await;
|
||||
|
||||
let active = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"started"}"#).await;
|
||||
assert_eq!(active["data"].as_array().unwrap().len(), 2);
|
||||
|
||||
send_command(
|
||||
&socket_path,
|
||||
&format!(r#"{{"cmd":"transition-task","task_id":"{zfs_id}","status":"done"}}"#),
|
||||
)
|
||||
.await;
|
||||
send_command(
|
||||
&socket_path,
|
||||
&format!(r#"{{"cmd":"transition-task","task_id":"{vacuum_id}","status":"done"}}"#),
|
||||
)
|
||||
.await;
|
||||
|
||||
let done = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"done"}"#).await;
|
||||
assert_eq!(done["data"].as_array().unwrap().len(), 2);
|
||||
|
||||
let empty = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"queued"}"#).await;
|
||||
assert!(empty["data"].as_array().unwrap().is_empty());
|
||||
|
||||
state.shutdown_tx.send(()).ok();
|
||||
let _ = tokio::time::timeout(Duration::from_secs(2), server).await;
|
||||
let _ = tokio::time::timeout(Duration::from_secs(2), bg_loop).await;
|
||||
let _ = std::fs::remove_dir_all(&data_dir);
|
||||
}
|
||||
|
||||
async fn wait_for_socket(socket_path: &Path) {
|
||||
for _ in 0..100 {
|
||||
if socket_path.exists() {
|
||||
return;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(20)).await;
|
||||
}
|
||||
panic!("socket never appeared");
|
||||
}
|
||||
|
||||
async fn send_command(socket_path: &Path, line: &str) -> serde_json::Value {
|
||||
let stream = UnixStream::connect(socket_path).await.expect("connect");
|
||||
let (reader, mut writer) = stream.into_split();
|
||||
writer
|
||||
.write_all(format!("{line}\n").as_bytes())
|
||||
.await
|
||||
.expect("write");
|
||||
let mut reader = BufReader::new(reader);
|
||||
let mut resp = String::new();
|
||||
tokio::time::timeout(Duration::from_secs(2), reader.read_line(&mut resp))
|
||||
.await
|
||||
.expect("timeout")
|
||||
.expect("read");
|
||||
serde_json::from_str(resp.trim_end()).expect("parse JSON")
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue