fix(scheduler): eliminate double task creation on intake

The merged #206 created a regression: cmd_intake_task created a task
at submit time (returning id A) AND the scheduler tick created a second
task from the same request (id B, which got claimed). Two tasks per
intake, with the returned id pointing to the orphaned copy.

Fix: TaskRequest now carries the pre-created task_id. The tick drops
its create_task() call and only does pick_agent + claim_task on the
existing id. One task, one id, the returned id is the one the
scheduler routes.

- TaskRequest gains task_id field
- cmd_intake_task creates once, pushes id to queue
- scheduler tick claims via req.task_id (no create_task)
- scheduler unit test pre-creates task before submit
- multi_agent_board test verifies id in response
This commit is contained in:
Sam & Claude 2026-06-26 08:01:35 +02:00 committed by 123kupola
parent d76504c5bb
commit 81c677cc3d
2 changed files with 31 additions and 22 deletions

View file

@ -98,6 +98,8 @@ pub struct ScheduledJob {
/// A task submitted from an external intake (Telegram, socket, cron).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskRequest {
/// Pre-created task ID from the caller (task is already in the store).
pub task_id: String,
pub title: String,
pub description: Option<String>,
pub required_capabilities: Vec<String>,
@ -204,32 +206,29 @@ impl Scheduler {
}
// ── Process intake queue ──────────────────────────────────
// Tasks are already in the store (created by
// cmd_intake_task); tick only routes them to capable agents.
let intake: Vec<TaskRequest> = self.intake_queue.drain(..).collect();
for req in intake {
let create_result = {
let agents = {
let store = state.store.lock().unwrap();
store.create_task(&req.title, req.description.as_deref())
store.list_agents().unwrap_or_default()
};
match create_result {
Ok(task) => {
let agents = {
let store = state.store.lock().unwrap();
store.list_agents().unwrap_or_default()
};
if let Some(agent) = pick_agent(&req.required_capabilities, &agents) {
let _ = {
let store = state.store.lock().unwrap();
store.claim_task(&task.id, &agent.id)
};
info!(
task_id = %task.id,
agent_id = %agent.id,
"scheduler: assigned intake task to agent"
);
}
}
Err(e) => {
warn!(error = %e, "scheduler: failed to create task from intake");
if let Some(agent) = pick_agent(&req.required_capabilities, &agents) {
match {
let store = state.store.lock().unwrap();
store.claim_task(&req.task_id, &agent.id)
} {
Ok(_) => info!(
task_id = %req.task_id,
agent_id = %agent.id,
"scheduler: assigned intake task to agent"
),
Err(e) => warn!(
task_id = %req.task_id,
error = %e,
"scheduler: failed to claim intake task"
),
}
}
}
@ -630,7 +629,16 @@ mod tests {
let data_dir = config.data_dir.clone();
let state: SharedState = Arc::new(crate::daemon::DaemonState::new(config));
let mut scheduler = Scheduler::new();
// Pre-create the task so the tick can claim it (the tick no longer
// creates tasks — that happens in cmd_intake_task now).
let task = state
.store
.lock()
.unwrap()
.create_task("intake check", Some("must become a task"))
.unwrap();
scheduler.submit(TaskRequest {
task_id: task.id,
title: "intake check".to_string(),
description: Some("must become a task".to_string()),
required_capabilities: Vec::new(),

View file

@ -1090,6 +1090,7 @@ async fn cmd_intake_task(
// Also queue for the scheduler tick to route to a capable agent.
let mut scheduler = state.scheduler.lock().await;
scheduler.submit(crate::scheduler::TaskRequest {
task_id: task.id.clone(),
title,
description,
required_capabilities: caps,