test: intake-task drains to SQLite via run_loop (Sam & Claude)
Regression guard for the scheduler store deadlock fixed upstream ind760536: scheduler.tick held a non-reentrant std::sync::Mutex (state.store) across the match scrutinee, so relocking inside the arm deadlocked the first time any intake/scheduled task fired. This test (independently authored on domedog) submits an intake-task over the real Unix socket, runs run_loop with a fast scheduler tick, and asserts the task is drained into SQLite. It hangs (>8min) against the pre-fix code and passes in 0.09s againstd760536— so it cross-validates that fix and guards the regression. Closes osa-smoke rec #2. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
8199e23890
commit
4f81e49983
1 changed files with 127 additions and 0 deletions
127
crates/colibri-daemon/tests/intake_scheduler_loop.rs
Normal file
127
crates/colibri-daemon/tests/intake_scheduler_loop.rs
Normal file
|
|
@ -0,0 +1,127 @@
|
|||
//! Regression test for the bug the 2026-05-27 osa FreeBSD smoke caught:
|
||||
//! `main.rs` started the socket server but never spawned `daemon::run_loop`, so
|
||||
//! an `intake-task` reported `queued` over the socket yet was never drained into
|
||||
//! the SQLite coordination store. See
|
||||
//! `docs/internal/sessions/2026-05-27-osa-freebsd-daemon-scheduler-smoke.md`.
|
||||
//!
|
||||
//! This proves the full path: socket `intake-task` → `run_loop` scheduler tick →
|
||||
//! persisted SQLite task. If `run_loop` is ever dropped from the wiring again,
|
||||
//! this test fails.
|
||||
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use colibri_daemon::daemon::DaemonLoopConfig;
|
||||
use colibri_daemon::{daemon, socket, DaemonConfig, DaemonState, SharedState};
|
||||
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
||||
use tokio::net::UnixStream;
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn intake_task_over_socket_drains_to_sqlite_via_run_loop() {
|
||||
// Isolated temp paths — never touch production /var/db or /var/run.
|
||||
let data_dir =
|
||||
std::env::temp_dir().join(format!("colibri-intake-smoke-{}", uuid::Uuid::new_v4()));
|
||||
std::fs::create_dir_all(&data_dir).expect("create temp data dir");
|
||||
|
||||
let mut config = DaemonConfig::from_env();
|
||||
config.data_dir = data_dir.clone();
|
||||
config.socket_path = data_dir.join("colibri.sock");
|
||||
config.db_path = data_dir.join("colibri.sqlite");
|
||||
let socket_path = config.socket_path.clone();
|
||||
|
||||
let state: SharedState = Arc::new(DaemonState::new(config));
|
||||
|
||||
// Socket server.
|
||||
let server_state = state.clone();
|
||||
let server_shutdown = state.shutdown_rx.resubscribe();
|
||||
let server = tokio::spawn(async move {
|
||||
socket::serve(server_state, server_shutdown).await;
|
||||
});
|
||||
|
||||
// Background loop with a fast scheduler tick so the test stays quick.
|
||||
let loop_state = state.clone();
|
||||
let loop_shutdown = state.shutdown_rx.resubscribe();
|
||||
let loop_config = DaemonLoopConfig {
|
||||
scheduler_interval: Duration::from_millis(50),
|
||||
..DaemonLoopConfig::default()
|
||||
};
|
||||
let bg_loop = tokio::spawn(async move {
|
||||
daemon::run_loop(loop_state, loop_config, loop_shutdown).await;
|
||||
});
|
||||
|
||||
wait_for_socket(&socket_path).await;
|
||||
|
||||
// Submit an intake task over the socket; it should be accepted + queued.
|
||||
const TITLE: &str = "intake-smoke-task";
|
||||
let response = send_command(
|
||||
&socket_path,
|
||||
&format!(
|
||||
r#"{{"cmd":"intake-task","title":"{TITLE}","description":"via socket","capabilities":[]}}"#
|
||||
),
|
||||
)
|
||||
.await;
|
||||
assert!(
|
||||
response["ok"].as_bool().unwrap_or(false),
|
||||
"intake-task rejected: {response}"
|
||||
);
|
||||
assert_eq!(response["data"]["status"], "queued");
|
||||
|
||||
// The scheduler tick (≤50ms) must drain it into a persisted SQLite task.
|
||||
let task = poll_store_for_task(&state, TITLE).await;
|
||||
assert_eq!(
|
||||
task["status"], "queued",
|
||||
"intake task should land queued (no agents to claim it): {task}"
|
||||
);
|
||||
assert_eq!(task["title"], TITLE);
|
||||
|
||||
// Clean shutdown.
|
||||
state.shutdown_tx.send(()).ok();
|
||||
let _ = tokio::time::timeout(Duration::from_secs(2), server).await;
|
||||
let _ = tokio::time::timeout(Duration::from_secs(2), bg_loop).await;
|
||||
let _ = std::fs::remove_dir_all(&data_dir);
|
||||
}
|
||||
|
||||
async fn wait_for_socket(socket_path: &Path) {
|
||||
for _ in 0..100 {
|
||||
if socket_path.exists() {
|
||||
return;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(20)).await;
|
||||
}
|
||||
panic!("daemon socket never appeared at {}", socket_path.display());
|
||||
}
|
||||
|
||||
async fn send_command(socket_path: &Path, line: &str) -> serde_json::Value {
|
||||
let stream = UnixStream::connect(socket_path)
|
||||
.await
|
||||
.expect("connect to daemon socket");
|
||||
let (reader, mut writer) = stream.into_split();
|
||||
writer
|
||||
.write_all(format!("{line}\n").as_bytes())
|
||||
.await
|
||||
.expect("write command");
|
||||
let mut reader = BufReader::new(reader);
|
||||
let mut resp = String::new();
|
||||
tokio::time::timeout(Duration::from_secs(2), reader.read_line(&mut resp))
|
||||
.await
|
||||
.expect("response timed out")
|
||||
.expect("read response");
|
||||
serde_json::from_str(resp.trim_end()).expect("parse response JSON")
|
||||
}
|
||||
|
||||
async fn poll_store_for_task(state: &SharedState, title: &str) -> serde_json::Value {
|
||||
for _ in 0..100 {
|
||||
let tasks = state
|
||||
.store
|
||||
.lock()
|
||||
.unwrap()
|
||||
.list_tasks(None)
|
||||
.expect("list_tasks");
|
||||
if let Some(task) = tasks.iter().find(|t| t.title == title) {
|
||||
return serde_json::to_value(task).expect("serialize task");
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(20)).await;
|
||||
}
|
||||
panic!("intake task '{title}' never drained to the SQLite store");
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue