diff --git a/crates/colibri-daemon/src/scheduler.rs b/crates/colibri-daemon/src/scheduler.rs index 5f1d1e5..f42c08b 100644 --- a/crates/colibri-daemon/src/scheduler.rs +++ b/crates/colibri-daemon/src/scheduler.rs @@ -165,26 +165,27 @@ impl Scheduler { } } - // Create the task + // Create the task. Keep each store lock in a short explicit scope: + // a `match state.store.lock().create_task(...)` temporary can live + // for the whole match arm, and relocking inside the arm deadlocks + // the scheduler on FreeBSD. debug!(job_id = %job.id, title = %job.task_title, "scheduler: firing job"); - match state - .store - .lock() - .unwrap() - .create_task(&job.task_title, job.task_description.as_deref()) - { + let create_result = { + let store = state.store.lock().unwrap(); + store.create_task(&job.task_title, job.task_description.as_deref()) + }; + match create_result { Ok(task) => { // Try to auto-assign - if let Some(agent) = pick_agent( - &job.required_capabilities, - &state - .store - .lock() - .unwrap() - .list_agents() - .unwrap_or_default(), - ) { - let _ = state.store.lock().unwrap().claim_task(&task.id, &agent.id); + let agents = { + let store = state.store.lock().unwrap(); + store.list_agents().unwrap_or_default() + }; + if let Some(agent) = pick_agent(&job.required_capabilities, &agents) { + let _ = { + let store = state.store.lock().unwrap(); + store.claim_task(&task.id, &agent.id) + }; info!( job_id = %job.id, task_id = %task.id, @@ -205,23 +206,21 @@ impl Scheduler { // ── Process intake queue ────────────────────────────────── let intake: Vec = self.intake_queue.drain(..).collect(); for req in intake { - match state - .store - .lock() - .unwrap() - .create_task(&req.title, req.description.as_deref()) - { + let create_result = { + let store = state.store.lock().unwrap(); + store.create_task(&req.title, req.description.as_deref()) + }; + match create_result { Ok(task) => { - if let Some(agent) = pick_agent( - &req.required_capabilities, - &state - .store - .lock() - .unwrap() - .list_agents() - .unwrap_or_default(), - ) { - let _ = state.store.lock().unwrap().claim_task(&task.id, &agent.id); + let agents = { + let store = state.store.lock().unwrap(); + store.list_agents().unwrap_or_default() + }; + if let Some(agent) = pick_agent(&req.required_capabilities, &agents) { + let _ = { + let store = state.store.lock().unwrap(); + store.claim_task(&task.id, &agent.id) + }; info!( task_id = %task.id, agent_id = %agent.id, @@ -334,6 +333,7 @@ fn same_utc_minute(a: DateTime, b: DateTime) -> bool { #[cfg(test)] mod tests { use super::*; + use std::{sync::Arc, time::Duration}; fn test_now() -> DateTime { DateTime::parse_from_rfc3339("2026-06-01T12:00:00Z") @@ -341,6 +341,27 @@ mod tests { .with_timezone(&Utc) } + fn test_config() -> crate::config::DaemonConfig { + let data_dir = + std::env::temp_dir().join(format!("colibri-scheduler-test-{}", uuid::Uuid::new_v4())); + crate::config::DaemonConfig { + socket_path: data_dir.join("colibri.sock"), + data_dir: data_dir.clone(), + db_path: data_dir.join("colibri.sqlite"), + session_max_bytes: 2_000_000, + deepseek_api_key: None, + deepseek_endpoint: "https://api.deepseek.com/chat/completions".to_string(), + deepseek_model: "deepseek-chat".to_string(), + openrouter_api_key: None, + openrouter_endpoint: "https://openrouter.ai/api/v1/chat/completions".to_string(), + anthropic_api_key: None, + anthropic_endpoint: "https://api.anthropic.com/v1/messages".to_string(), + host: "scheduler-test-host".to_string(), + max_context_tokens: 128_000, + max_uncompacted_turns: 20, + } + } + #[test] fn test_interval_fires_first_time() { let sched = Schedule::Interval { every_secs: 300 }; @@ -503,4 +524,27 @@ mod tests { }]; assert_eq!(pick_agent(&[], &agents).unwrap().name, "generalist"); } + + #[tokio::test] + async fn test_scheduler_tick_drains_intake_without_deadlock() { + let config = test_config(); + let data_dir = config.data_dir.clone(); + let state: SharedState = Arc::new(crate::daemon::DaemonState::new(config)); + let mut scheduler = Scheduler::new(); + scheduler.submit(TaskRequest { + title: "intake smoke".to_string(), + description: Some("must become a task".to_string()), + required_capabilities: Vec::new(), + }); + + tokio::time::timeout(Duration::from_secs(2), scheduler.tick(&state)) + .await + .expect("scheduler tick deadlocked"); + + let tasks = state.store.lock().unwrap().list_tasks(None).unwrap(); + assert_eq!(tasks.len(), 1); + assert_eq!(tasks[0].title, "intake smoke"); + + let _ = std::fs::remove_dir_all(data_dir); + } }