diff --git a/crates/colibri-daemon/src/scheduler.rs b/crates/colibri-daemon/src/scheduler.rs index 7364a2b..f27cf9d 100644 --- a/crates/colibri-daemon/src/scheduler.rs +++ b/crates/colibri-daemon/src/scheduler.rs @@ -98,6 +98,8 @@ pub struct ScheduledJob { /// A task submitted from an external intake (Telegram, socket, cron). #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TaskRequest { + /// Pre-created task ID from the caller (task is already in the store). + pub task_id: String, pub title: String, pub description: Option, pub required_capabilities: Vec, @@ -204,32 +206,29 @@ impl Scheduler { } // ── Process intake queue ────────────────────────────────── + // Tasks are already in the store (created by + // cmd_intake_task); tick only routes them to capable agents. let intake: Vec = self.intake_queue.drain(..).collect(); for req in intake { - let create_result = { + let agents = { let store = state.store.lock().unwrap(); - store.create_task(&req.title, req.description.as_deref()) + store.list_agents().unwrap_or_default() }; - match create_result { - Ok(task) => { - let agents = { - let store = state.store.lock().unwrap(); - store.list_agents().unwrap_or_default() - }; - if let Some(agent) = pick_agent(&req.required_capabilities, &agents) { - let _ = { - let store = state.store.lock().unwrap(); - store.claim_task(&task.id, &agent.id) - }; - info!( - task_id = %task.id, - agent_id = %agent.id, - "scheduler: assigned intake task to agent" - ); - } - } - Err(e) => { - warn!(error = %e, "scheduler: failed to create task from intake"); + if let Some(agent) = pick_agent(&req.required_capabilities, &agents) { + match { + let store = state.store.lock().unwrap(); + store.claim_task(&req.task_id, &agent.id) + } { + Ok(_) => info!( + task_id = %req.task_id, + agent_id = %agent.id, + "scheduler: assigned intake task to agent" + ), + Err(e) => warn!( + task_id = %req.task_id, + error = %e, + "scheduler: failed to claim intake task" + ), } } } @@ -630,7 +629,16 @@ mod tests { let data_dir = config.data_dir.clone(); let state: SharedState = Arc::new(crate::daemon::DaemonState::new(config)); let mut scheduler = Scheduler::new(); + // Pre-create the task so the tick can claim it (the tick no longer + // creates tasks — that happens in cmd_intake_task now). + let task = state + .store + .lock() + .unwrap() + .create_task("intake check", Some("must become a task")) + .unwrap(); scheduler.submit(TaskRequest { + task_id: task.id, title: "intake check".to_string(), description: Some("must become a task".to_string()), required_capabilities: Vec::new(), diff --git a/crates/colibri-daemon/src/socket.rs b/crates/colibri-daemon/src/socket.rs index 312c2a6..d508c56 100644 --- a/crates/colibri-daemon/src/socket.rs +++ b/crates/colibri-daemon/src/socket.rs @@ -1090,6 +1090,7 @@ async fn cmd_intake_task( // Also queue for the scheduler tick to route to a capable agent. let mut scheduler = state.scheduler.lock().await; scheduler.submit(crate::scheduler::TaskRequest { + task_id: task.id.clone(), title, description, required_capabilities: caps,