fix: avoid scheduler store deadlock on intake drain
This commit is contained in:
parent
af8c0110d7
commit
d760536fe1
1 changed files with 77 additions and 33 deletions
|
|
@ -165,26 +165,27 @@ impl Scheduler {
|
|||
}
|
||||
}
|
||||
|
||||
// Create the task
|
||||
// Create the task. Keep each store lock in a short explicit scope:
|
||||
// a `match state.store.lock().create_task(...)` temporary can live
|
||||
// for the whole match arm, and relocking inside the arm deadlocks
|
||||
// the scheduler on FreeBSD.
|
||||
debug!(job_id = %job.id, title = %job.task_title, "scheduler: firing job");
|
||||
match state
|
||||
.store
|
||||
.lock()
|
||||
.unwrap()
|
||||
.create_task(&job.task_title, job.task_description.as_deref())
|
||||
{
|
||||
let create_result = {
|
||||
let store = state.store.lock().unwrap();
|
||||
store.create_task(&job.task_title, job.task_description.as_deref())
|
||||
};
|
||||
match create_result {
|
||||
Ok(task) => {
|
||||
// Try to auto-assign
|
||||
if let Some(agent) = pick_agent(
|
||||
&job.required_capabilities,
|
||||
&state
|
||||
.store
|
||||
.lock()
|
||||
.unwrap()
|
||||
.list_agents()
|
||||
.unwrap_or_default(),
|
||||
) {
|
||||
let _ = state.store.lock().unwrap().claim_task(&task.id, &agent.id);
|
||||
let agents = {
|
||||
let store = state.store.lock().unwrap();
|
||||
store.list_agents().unwrap_or_default()
|
||||
};
|
||||
if let Some(agent) = pick_agent(&job.required_capabilities, &agents) {
|
||||
let _ = {
|
||||
let store = state.store.lock().unwrap();
|
||||
store.claim_task(&task.id, &agent.id)
|
||||
};
|
||||
info!(
|
||||
job_id = %job.id,
|
||||
task_id = %task.id,
|
||||
|
|
@ -205,23 +206,21 @@ impl Scheduler {
|
|||
// ── Process intake queue ──────────────────────────────────
|
||||
let intake: Vec<TaskRequest> = self.intake_queue.drain(..).collect();
|
||||
for req in intake {
|
||||
match state
|
||||
.store
|
||||
.lock()
|
||||
.unwrap()
|
||||
.create_task(&req.title, req.description.as_deref())
|
||||
{
|
||||
let create_result = {
|
||||
let store = state.store.lock().unwrap();
|
||||
store.create_task(&req.title, req.description.as_deref())
|
||||
};
|
||||
match create_result {
|
||||
Ok(task) => {
|
||||
if let Some(agent) = pick_agent(
|
||||
&req.required_capabilities,
|
||||
&state
|
||||
.store
|
||||
.lock()
|
||||
.unwrap()
|
||||
.list_agents()
|
||||
.unwrap_or_default(),
|
||||
) {
|
||||
let _ = state.store.lock().unwrap().claim_task(&task.id, &agent.id);
|
||||
let agents = {
|
||||
let store = state.store.lock().unwrap();
|
||||
store.list_agents().unwrap_or_default()
|
||||
};
|
||||
if let Some(agent) = pick_agent(&req.required_capabilities, &agents) {
|
||||
let _ = {
|
||||
let store = state.store.lock().unwrap();
|
||||
store.claim_task(&task.id, &agent.id)
|
||||
};
|
||||
info!(
|
||||
task_id = %task.id,
|
||||
agent_id = %agent.id,
|
||||
|
|
@ -334,6 +333,7 @@ fn same_utc_minute(a: DateTime<Utc>, b: DateTime<Utc>) -> bool {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
fn test_now() -> DateTime<Utc> {
|
||||
DateTime::parse_from_rfc3339("2026-06-01T12:00:00Z")
|
||||
|
|
@ -341,6 +341,27 @@ mod tests {
|
|||
.with_timezone(&Utc)
|
||||
}
|
||||
|
||||
fn test_config() -> crate::config::DaemonConfig {
|
||||
let data_dir =
|
||||
std::env::temp_dir().join(format!("colibri-scheduler-test-{}", uuid::Uuid::new_v4()));
|
||||
crate::config::DaemonConfig {
|
||||
socket_path: data_dir.join("colibri.sock"),
|
||||
data_dir: data_dir.clone(),
|
||||
db_path: data_dir.join("colibri.sqlite"),
|
||||
session_max_bytes: 2_000_000,
|
||||
deepseek_api_key: None,
|
||||
deepseek_endpoint: "https://api.deepseek.com/chat/completions".to_string(),
|
||||
deepseek_model: "deepseek-chat".to_string(),
|
||||
openrouter_api_key: None,
|
||||
openrouter_endpoint: "https://openrouter.ai/api/v1/chat/completions".to_string(),
|
||||
anthropic_api_key: None,
|
||||
anthropic_endpoint: "https://api.anthropic.com/v1/messages".to_string(),
|
||||
host: "scheduler-test-host".to_string(),
|
||||
max_context_tokens: 128_000,
|
||||
max_uncompacted_turns: 20,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_interval_fires_first_time() {
|
||||
let sched = Schedule::Interval { every_secs: 300 };
|
||||
|
|
@ -503,4 +524,27 @@ mod tests {
|
|||
}];
|
||||
assert_eq!(pick_agent(&[], &agents).unwrap().name, "generalist");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_scheduler_tick_drains_intake_without_deadlock() {
|
||||
let config = test_config();
|
||||
let data_dir = config.data_dir.clone();
|
||||
let state: SharedState = Arc::new(crate::daemon::DaemonState::new(config));
|
||||
let mut scheduler = Scheduler::new();
|
||||
scheduler.submit(TaskRequest {
|
||||
title: "intake smoke".to_string(),
|
||||
description: Some("must become a task".to_string()),
|
||||
required_capabilities: Vec::new(),
|
||||
});
|
||||
|
||||
tokio::time::timeout(Duration::from_secs(2), scheduler.tick(&state))
|
||||
.await
|
||||
.expect("scheduler tick deadlocked");
|
||||
|
||||
let tasks = state.store.lock().unwrap().list_tasks(None).unwrap();
|
||||
assert_eq!(tasks.len(), 1);
|
||||
assert_eq!(tasks[0].title, "intake smoke");
|
||||
|
||||
let _ = std::fs::remove_dir_all(data_dir);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue