colibri/crates/colibri-daemon/tests/intake_scheduler_loop.rs
Sam & Claude 4517e13935
Some checks failed
CI / rust (pull_request) Has been cancelled
CI / markdown (pull_request) Has been cancelled
fix(daemon): fail closed when socket ownership is unsafe (Sam & Codex)
Return an error from the socket server when another daemon owns the Unix socket or bind setup fails, and broadcast shutdown so the daemon does not stay alive without a control socket. Also format the PR docs.\n\nChecks: cargo fmt --check; ./scripts/check-format.sh; git diff --check; cargo test -p colibri-daemon clear_stale_socket -- --nocapture; cargo test -p colibri-daemon --test sigterm_shutdown -- --nocapture.
2026-06-15 09:08:56 +02:00

126 lines
4.6 KiB
Rust

//! Regression test for the bug a 2026-05-27 osa FreeBSD check caught:
//! `main.rs` started the socket server but never spawned `daemon::run_loop`, so
//! an `intake-task` reported `queued` over the socket yet was never drained into
//! the SQLite coordination store.
//!
//! This proves the full path: socket `intake-task` → `run_loop` scheduler tick →
//! persisted SQLite task. If `run_loop` is ever dropped from the wiring again,
//! this test fails.
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use colibri_daemon::daemon::DaemonLoopConfig;
use colibri_daemon::{daemon, socket, DaemonConfig, DaemonState, SharedState};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixStream;
/// End-to-end check that an `intake-task` command sent over the daemon socket
/// is drained by `daemon::run_loop` into the store.
///
/// The test starts the socket server and the background loop on one shared
/// `DaemonState`, submits a single task over the socket, and then polls the
/// store until that task shows up. All paths live in a per-run temp directory
/// that is removed when the test ends, whether it passes or panics.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn intake_task_over_socket_drains_to_sqlite_via_run_loop() {
    // Removes the wrapped directory on drop, so the temp data dir is cleaned up
    // even when an assertion below panics and the function unwinds early.
    struct TempDirGuard(std::path::PathBuf);

    impl Drop for TempDirGuard {
        fn drop(&mut self) {
            // Best-effort: a failed removal must not mask the real test outcome.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    // Isolated temp paths — never touch production /var/db or /var/run.
    let data_dir =
        std::env::temp_dir().join(format!("colibri-intake-check-{}", uuid::Uuid::new_v4()));
    std::fs::create_dir_all(&data_dir).expect("create temp data dir");
    let _cleanup = TempDirGuard(data_dir.clone());

    let mut config = DaemonConfig::from_env();
    config.data_dir = data_dir.clone();
    config.socket_path = data_dir.join("colibri.sock");
    config.db_path = data_dir.join("colibri.sqlite");
    let socket_path = config.socket_path.clone();
    let state: SharedState = Arc::new(DaemonState::new(config));

    // Socket server.
    let server_state = state.clone();
    let server_shutdown = state.shutdown_rx.resubscribe();
    let server = tokio::spawn(async move {
        // Report a server error in the test output instead of discarding it;
        // otherwise the only symptom is the socket-wait timeout further down.
        if let Err(err) = socket::serve(server_state, server_shutdown).await {
            eprintln!("socket server exited with error: {err:?}");
        }
    });

    // Background loop with a fast scheduler tick so the test stays quick.
    let loop_state = state.clone();
    let loop_shutdown = state.shutdown_rx.resubscribe();
    let loop_config = DaemonLoopConfig {
        scheduler_interval: Duration::from_millis(50),
        ..DaemonLoopConfig::default()
    };
    let bg_loop = tokio::spawn(async move {
        daemon::run_loop(loop_state, loop_config, loop_shutdown).await;
    });

    wait_for_socket(&socket_path).await;

    // Submit an intake task over the socket; it should be accepted + queued.
    const TITLE: &str = "intake-check-task";
    let response = send_command(
        &socket_path,
        &format!(
            r#"{{"cmd":"intake-task","title":"{TITLE}","description":"via socket","capabilities":[]}}"#
        ),
    )
    .await;
    assert!(
        response["ok"].as_bool().unwrap_or(false),
        "intake-task rejected: {response}"
    );
    assert_eq!(response["data"]["status"], "queued");

    // The scheduler tick (≤50ms) must drain it into a persisted SQLite task.
    let task = poll_store_for_task(&state, TITLE).await;
    assert_eq!(
        task["status"], "queued",
        "intake task should land queued (no agents to claim it): {task}"
    );
    assert_eq!(task["title"], TITLE);

    // Clean shutdown. Each join is bounded and its result ignored, so a task
    // that fails to stop cannot hang or fail the test at this point.
    state.shutdown_tx.send(()).ok();
    let _ = tokio::time::timeout(Duration::from_secs(2), server).await;
    let _ = tokio::time::timeout(Duration::from_secs(2), bg_loop).await;
    // `_cleanup` goes out of scope here and removes the temp data dir.
}
/// Waits for a filesystem entry to appear at `socket_path`.
///
/// The path is checked up to 100 times with a 20 ms sleep after every miss.
/// Only existence of the path is tested; no connection is attempted.
///
/// # Panics
///
/// Panics if the path still does not exist after the last attempt.
async fn wait_for_socket(socket_path: &Path) {
    const MAX_ATTEMPTS: u32 = 100;
    const POLL_INTERVAL: Duration = Duration::from_millis(20);

    let mut attempts_left = MAX_ATTEMPTS;
    while attempts_left > 0 {
        if socket_path.exists() {
            return;
        }
        tokio::time::sleep(POLL_INTERVAL).await;
        attempts_left -= 1;
    }

    panic!("daemon socket never appeared at {}", socket_path.display());
}
/// Sends `line` plus a trailing newline to the Unix socket at `socket_path`
/// and returns the first response line parsed as JSON.
///
/// A fresh connection is opened for every call.
///
/// # Panics
///
/// Panics if connecting or writing fails, if no response line arrives within
/// two seconds, if the peer closes the connection without sending any bytes,
/// or if the response is not valid JSON.
async fn send_command(socket_path: &Path, line: &str) -> serde_json::Value {
    let stream = UnixStream::connect(socket_path)
        .await
        .expect("connect to daemon socket");
    let (reader, mut writer) = stream.into_split();
    writer
        .write_all(format!("{line}\n").as_bytes())
        .await
        .expect("write command");

    let mut reader = BufReader::new(reader);
    let mut resp = String::new();
    let bytes_read = tokio::time::timeout(Duration::from_secs(2), reader.read_line(&mut resp))
        .await
        .expect("response timed out")
        .expect("read response");
    // `read_line` reports EOF as zero bytes read. Call that out explicitly
    // rather than letting it surface as a JSON parse failure on an empty string.
    assert!(
        bytes_read > 0,
        "daemon closed the connection without sending a response"
    );

    let raw = resp.trim_end();
    // Include the raw payload so a malformed response can be diagnosed from
    // the test output alone.
    serde_json::from_str::<serde_json::Value>(raw)
        .unwrap_or_else(|err| panic!("parse response JSON: {err} (raw response: {raw:?})"))
}
/// Polls `state.store` until a task whose title equals `title` is listed, then
/// returns that task serialized to a JSON value.
///
/// The store is queried up to 100 times with a 20 ms sleep after every miss.
///
/// # Panics
///
/// Panics if locking the store, listing tasks, or serializing the match fails,
/// or if no matching task is listed after the last attempt.
async fn poll_store_for_task(state: &SharedState, title: &str) -> serde_json::Value {
    const MAX_POLLS: u32 = 100;
    const POLL_INTERVAL: Duration = Duration::from_millis(20);

    let mut polls_done = 0;
    while polls_done < MAX_POLLS {
        // The value returned by `lock()` is a temporary, so it is dropped at
        // the end of this statement, before the sleep below.
        let snapshot = state
            .store
            .lock()
            .unwrap()
            .list_tasks(None)
            .expect("list_tasks");

        let hit = snapshot
            .iter()
            .find(|candidate| candidate.title == title)
            .map(|task| serde_json::to_value(task).expect("serialize task"));
        if let Some(value) = hit {
            return value;
        }

        tokio::time::sleep(POLL_INTERVAL).await;
        polls_done += 1;
    }

    panic!("intake task '{title}' never drained to the SQLite store");
}