diff --git a/crates/colibri-daemon/tests/intake_scheduler_loop.rs b/crates/colibri-daemon/tests/intake_scheduler_loop.rs new file mode 100644 index 0000000..983d3fa --- /dev/null +++ b/crates/colibri-daemon/tests/intake_scheduler_loop.rs @@ -0,0 +1,127 @@ +//! Regression test for the bug the 2026-05-27 osa FreeBSD smoke caught: +//! `main.rs` started the socket server but never spawned `daemon::run_loop`, so +//! an `intake-task` reported `queued` over the socket yet was never drained into +//! the SQLite coordination store. See +//! `docs/internal/sessions/2026-05-27-osa-freebsd-daemon-scheduler-smoke.md`. +//! +//! This proves the full path: socket `intake-task` → `run_loop` scheduler tick → +//! persisted SQLite task. If `run_loop` is ever dropped from the wiring again, +//! this test fails. + +use std::path::Path; +use std::sync::Arc; +use std::time::Duration; + +use colibri_daemon::daemon::DaemonLoopConfig; +use colibri_daemon::{daemon, socket, DaemonConfig, DaemonState, SharedState}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::UnixStream; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn intake_task_over_socket_drains_to_sqlite_via_run_loop() { + // Isolated temp paths — never touch production /var/db or /var/run. + let data_dir = + std::env::temp_dir().join(format!("colibri-intake-smoke-{}", uuid::Uuid::new_v4())); + std::fs::create_dir_all(&data_dir).expect("create temp data dir"); + + let mut config = DaemonConfig::from_env(); + config.data_dir = data_dir.clone(); + config.socket_path = data_dir.join("colibri.sock"); + config.db_path = data_dir.join("colibri.sqlite"); + let socket_path = config.socket_path.clone(); + + let state: SharedState = Arc::new(DaemonState::new(config)); + + // Socket server. + let server_state = state.clone(); + let server_shutdown = state.shutdown_rx.resubscribe(); + let server = tokio::spawn(async move { + socket::serve(server_state, server_shutdown).await; + }); + + // Background loop with a fast scheduler tick so the test stays quick. + let loop_state = state.clone(); + let loop_shutdown = state.shutdown_rx.resubscribe(); + let loop_config = DaemonLoopConfig { + scheduler_interval: Duration::from_millis(50), + ..DaemonLoopConfig::default() + }; + let bg_loop = tokio::spawn(async move { + daemon::run_loop(loop_state, loop_config, loop_shutdown).await; + }); + + wait_for_socket(&socket_path).await; + + // Submit an intake task over the socket; it should be accepted + queued. + const TITLE: &str = "intake-smoke-task"; + let response = send_command( + &socket_path, + &format!( + r#"{{"cmd":"intake-task","title":"{TITLE}","description":"via socket","capabilities":[]}}"# + ), + ) + .await; + assert!( + response["ok"].as_bool().unwrap_or(false), + "intake-task rejected: {response}" + ); + assert_eq!(response["data"]["status"], "queued"); + + // The scheduler tick (≤50ms) must drain it into a persisted SQLite task. + let task = poll_store_for_task(&state, TITLE).await; + assert_eq!( + task["status"], "queued", + "intake task should land queued (no agents to claim it): {task}" + ); + assert_eq!(task["title"], TITLE); + + // Clean shutdown. + state.shutdown_tx.send(()).ok(); + let _ = tokio::time::timeout(Duration::from_secs(2), server).await; + let _ = tokio::time::timeout(Duration::from_secs(2), bg_loop).await; + let _ = std::fs::remove_dir_all(&data_dir); +} + +async fn wait_for_socket(socket_path: &Path) { + for _ in 0..100 { + if socket_path.exists() { + return; + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + panic!("daemon socket never appeared at {}", socket_path.display()); +} + +async fn send_command(socket_path: &Path, line: &str) -> serde_json::Value { + let stream = UnixStream::connect(socket_path) + .await + .expect("connect to daemon socket"); + let (reader, mut writer) = stream.into_split(); + writer + .write_all(format!("{line}\n").as_bytes()) + .await + .expect("write command"); + let mut reader = BufReader::new(reader); + let mut resp = String::new(); + tokio::time::timeout(Duration::from_secs(2), reader.read_line(&mut resp)) + .await + .expect("response timed out") + .expect("read response"); + serde_json::from_str(resp.trim_end()).expect("parse response JSON") +} + +async fn poll_store_for_task(state: &SharedState, title: &str) -> serde_json::Value { + for _ in 0..100 { + let tasks = state + .store + .lock() + .unwrap() + .list_tasks(None) + .expect("list_tasks"); + if let Some(task) = tasks.iter().find(|t| t.title == title) { + return serde_json::to_value(task).expect("serialize task"); + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + panic!("intake task '{title}' never drained to the SQLite store"); +}