fix: avoid scheduler store deadlock on intake drain

This commit is contained in:
Sam & Claude 2026-05-27 21:02:01 +02:00
parent af8c0110d7
commit d760536fe1

View file

@ -165,26 +165,27 @@ impl Scheduler {
}
}
// Create the task
// Create the task. Keep each store lock in a short explicit scope:
// the guard temporary in `match state.store.lock().unwrap().create_task(...)`
// lives until the end of the whole `match` expression, so relocking the
// store inside any arm deadlocks the scheduler (observed on FreeBSD).
debug!(job_id = %job.id, title = %job.task_title, "scheduler: firing job");
match state
.store
.lock()
.unwrap()
.create_task(&job.task_title, job.task_description.as_deref())
{
let create_result = {
let store = state.store.lock().unwrap();
store.create_task(&job.task_title, job.task_description.as_deref())
};
match create_result {
Ok(task) => {
// Try to auto-assign
if let Some(agent) = pick_agent(
&job.required_capabilities,
&state
.store
.lock()
.unwrap()
.list_agents()
.unwrap_or_default(),
) {
let _ = state.store.lock().unwrap().claim_task(&task.id, &agent.id);
let agents = {
let store = state.store.lock().unwrap();
store.list_agents().unwrap_or_default()
};
if let Some(agent) = pick_agent(&job.required_capabilities, &agents) {
let _ = {
let store = state.store.lock().unwrap();
store.claim_task(&task.id, &agent.id)
};
info!(
job_id = %job.id,
task_id = %task.id,
@ -205,23 +206,21 @@ impl Scheduler {
// ── Process intake queue ──────────────────────────────────
let intake: Vec<TaskRequest> = self.intake_queue.drain(..).collect();
for req in intake {
match state
.store
.lock()
.unwrap()
.create_task(&req.title, req.description.as_deref())
{
let create_result = {
let store = state.store.lock().unwrap();
store.create_task(&req.title, req.description.as_deref())
};
match create_result {
Ok(task) => {
if let Some(agent) = pick_agent(
&req.required_capabilities,
&state
.store
.lock()
.unwrap()
.list_agents()
.unwrap_or_default(),
) {
let _ = state.store.lock().unwrap().claim_task(&task.id, &agent.id);
let agents = {
let store = state.store.lock().unwrap();
store.list_agents().unwrap_or_default()
};
if let Some(agent) = pick_agent(&req.required_capabilities, &agents) {
let _ = {
let store = state.store.lock().unwrap();
store.claim_task(&task.id, &agent.id)
};
info!(
task_id = %task.id,
agent_id = %agent.id,
@ -334,6 +333,7 @@ fn same_utc_minute(a: DateTime<Utc>, b: DateTime<Utc>) -> bool {
#[cfg(test)]
mod tests {
use super::*;
use std::{sync::Arc, time::Duration};
fn test_now() -> DateTime<Utc> {
DateTime::parse_from_rfc3339("2026-06-01T12:00:00Z")
@ -341,6 +341,27 @@ mod tests {
.with_timezone(&Utc)
}
/// Builds a `DaemonConfig` for scheduler tests.
///
/// Every path lives under a freshly named directory inside the system temp
/// dir (suffixed with a random UUID), all API keys are unset, and the
/// endpoints/models are fixed literal values.
fn test_config() -> crate::config::DaemonConfig {
    let unique_name = format!("colibri-scheduler-test-{}", uuid::Uuid::new_v4());
    let root = std::env::temp_dir().join(unique_name);

    // Derive the child paths first so `root` can be moved into the struct.
    let socket_path = root.join("colibri.sock");
    let db_path = root.join("colibri.sqlite");

    crate::config::DaemonConfig {
        socket_path,
        data_dir: root,
        db_path,
        session_max_bytes: 2_000_000,
        deepseek_api_key: None,
        deepseek_endpoint: String::from("https://api.deepseek.com/chat/completions"),
        deepseek_model: String::from("deepseek-chat"),
        openrouter_api_key: None,
        openrouter_endpoint: String::from("https://openrouter.ai/api/v1/chat/completions"),
        anthropic_api_key: None,
        anthropic_endpoint: String::from("https://api.anthropic.com/v1/messages"),
        host: String::from("scheduler-test-host"),
        max_context_tokens: 128_000,
        max_uncompacted_turns: 20,
    }
}
#[test]
fn test_interval_fires_first_time() {
let sched = Schedule::Interval { every_secs: 300 };
@ -503,4 +524,27 @@ mod tests {
}];
assert_eq!(pick_agent(&[], &agents).unwrap().name, "generalist");
}
/// Regression test: a single `tick` must turn a queued intake request into a
/// stored task and return, rather than hanging on the store lock.
#[tokio::test]
async fn test_scheduler_tick_drains_intake_without_deadlock() {
    /// Removes the per-test data directory on drop, so it is cleaned up even
    /// when one of the assertions below panics.
    struct DataDirCleanup(std::path::PathBuf);

    impl Drop for DataDirCleanup {
        fn drop(&mut self) {
            // Best effort: ignore the error if the directory does not exist.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    let config = test_config();
    // Declared before `state` so it is dropped after it (locals drop in
    // reverse declaration order).
    let _cleanup = DataDirCleanup(config.data_dir.clone());
    let state: SharedState = Arc::new(crate::daemon::DaemonState::new(config));
    let mut scheduler = Scheduler::new();

    scheduler.submit(TaskRequest {
        title: "intake smoke".to_string(),
        description: Some("must become a task".to_string()),
        required_capabilities: Vec::new(),
    });

    // NOTE(review): `state.store` is locked with `.lock().unwrap()`, which
    // looks like a blocking `std::sync::Mutex`. If `tick` blocks on it without
    // yielding, this timeout presumably cannot fire on the single-threaded
    // runtime that `#[tokio::test]` uses by default — the test would hang
    // instead of failing. Confirm, and consider driving `tick` from a separate
    // thread if a hard failure is wanted.
    tokio::time::timeout(Duration::from_secs(2), scheduler.tick(&state))
        .await
        .expect("scheduler tick deadlocked");

    let tasks = state.store.lock().unwrap().list_tasks(None).unwrap();
    assert_eq!(tasks.len(), 1);
    assert_eq!(tasks[0].title, "intake smoke");
}
}