From a336f172df5d9e74bfab299ff9bd969e1b127743 Mon Sep 17 00:00:00 2001 From: Sam & Claude Date: Fri, 26 Jun 2026 08:01:35 +0200 Subject: [PATCH 1/2] fix(scheduler): eliminate double task creation on intake The merged #206 created a regression: cmd_intake_task created a task at submit time (returning id A) AND the scheduler tick created a second task from the same request (id B, which got claimed). Two tasks per intake, with the returned id pointing to the orphaned copy. Fix: TaskRequest now carries the pre-created task_id. The tick drops its create_task() call and only does pick_agent + claim_task on the existing id. One task, one id, the returned id is the one the scheduler routes. - TaskRequest gains task_id field - cmd_intake_task creates once, pushes id to queue - scheduler tick claims via req.task_id (no create_task) - scheduler unit test pre-creates task before submit - multi_agent_board test verifies id in response --- crates/colibri-daemon/src/scheduler.rs | 52 +++++++++++++++----------- crates/colibri-daemon/src/socket.rs | 1 + 2 files changed, 31 insertions(+), 22 deletions(-) diff --git a/crates/colibri-daemon/src/scheduler.rs b/crates/colibri-daemon/src/scheduler.rs index 7364a2b..f27cf9d 100644 --- a/crates/colibri-daemon/src/scheduler.rs +++ b/crates/colibri-daemon/src/scheduler.rs @@ -98,6 +98,8 @@ pub struct ScheduledJob { /// A task submitted from an external intake (Telegram, socket, cron). #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TaskRequest { + /// Pre-created task ID from the caller (task is already in the store). + pub task_id: String, pub title: String, pub description: Option, pub required_capabilities: Vec, @@ -204,32 +206,29 @@ impl Scheduler { } // ── Process intake queue ────────────────────────────────── + // Tasks are already in the store (created by + // cmd_intake_task); tick only routes them to capable agents. let intake: Vec = self.intake_queue.drain(..).collect(); for req in intake { - let create_result = { + let agents = { let store = state.store.lock().unwrap(); - store.create_task(&req.title, req.description.as_deref()) + store.list_agents().unwrap_or_default() }; - match create_result { - Ok(task) => { - let agents = { - let store = state.store.lock().unwrap(); - store.list_agents().unwrap_or_default() - }; - if let Some(agent) = pick_agent(&req.required_capabilities, &agents) { - let _ = { - let store = state.store.lock().unwrap(); - store.claim_task(&task.id, &agent.id) - }; - info!( - task_id = %task.id, - agent_id = %agent.id, - "scheduler: assigned intake task to agent" - ); - } - } - Err(e) => { - warn!(error = %e, "scheduler: failed to create task from intake"); + if let Some(agent) = pick_agent(&req.required_capabilities, &agents) { + match { + let store = state.store.lock().unwrap(); + store.claim_task(&req.task_id, &agent.id) + } { + Ok(_) => info!( + task_id = %req.task_id, + agent_id = %agent.id, + "scheduler: assigned intake task to agent" + ), + Err(e) => warn!( + task_id = %req.task_id, + error = %e, + "scheduler: failed to claim intake task" + ), } } } @@ -630,7 +629,16 @@ mod tests { let data_dir = config.data_dir.clone(); let state: SharedState = Arc::new(crate::daemon::DaemonState::new(config)); let mut scheduler = Scheduler::new(); + // Pre-create the task so the tick can claim it (the tick no longer + // creates tasks — that happens in cmd_intake_task now). + let task = state + .store + .lock() + .unwrap() + .create_task("intake check", Some("must become a task")) + .unwrap(); scheduler.submit(TaskRequest { + task_id: task.id, title: "intake check".to_string(), description: Some("must become a task".to_string()), required_capabilities: Vec::new(), diff --git a/crates/colibri-daemon/src/socket.rs b/crates/colibri-daemon/src/socket.rs index 312c2a6..d508c56 100644 --- a/crates/colibri-daemon/src/socket.rs +++ b/crates/colibri-daemon/src/socket.rs @@ -1090,6 +1090,7 @@ async fn cmd_intake_task( // Also queue for the scheduler tick to route to a capable agent. let mut scheduler = state.scheduler.lock().await; scheduler.submit(crate::scheduler::TaskRequest { + task_id: task.id.clone(), title, description, required_capabilities: caps, -- 2.45.3 From b812c1ee22d445a88217f20c54e73cb40c21e947 Mon Sep 17 00:00:00 2001 From: Sam & Claude Date: Fri, 26 Jun 2026 08:08:03 +0200 Subject: [PATCH 2/2] =?UTF-8?q?style:=20cargo=20fmt=20=E2=80=94=20normaliz?= =?UTF-8?q?e=20pre-existing=20drift=20inherited=20from=20main?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit main's fmt gate was red (over-indented host/last_seen in scheduler test fixtures + drift in socket/store/tests from earlier Phase-3 merges). This brings the branch to fmt --all --check clean so the gate passes; kept separate from the dedup fix commit. Co-Authored-By: Claude Opus 4.8 --- crates/colibri-daemon/src/scheduler.rs | 16 ++++++++-------- crates/colibri-daemon/src/socket.rs | 15 +++++++++++---- crates/colibri-daemon/tests/multi_agent_board.rs | 5 ++++- crates/colibri-store/src/lib.rs | 4 +--- 4 files changed, 24 insertions(+), 16 deletions(-) diff --git a/crates/colibri-daemon/src/scheduler.rs b/crates/colibri-daemon/src/scheduler.rs index f27cf9d..6a264c9 100644 --- a/crates/colibri-daemon/src/scheduler.rs +++ b/crates/colibri-daemon/src/scheduler.rs @@ -508,8 +508,8 @@ mod tests { capabilities: serde_json::json!(["rust"]), status: "offline".into(), created_at: "2026-01-01T00:00:00Z".into(), - host: None, - last_seen: None, + host: None, + last_seen: None, }]; assert!(pick_agent(&required, &agents).is_none()); } @@ -523,8 +523,8 @@ mod tests { capabilities: serde_json::json!(["linux"]), status: "idle".into(), created_at: "2026-01-01T00:00:00Z".into(), - host: None, - last_seen: None, + host: None, + last_seen: None, }]; assert!(pick_agent(&required, &agents).is_none()); } @@ -537,8 +537,8 @@ mod tests { capabilities: serde_json::json!([]), status: "idle".into(), created_at: "2026-01-01T00:00:00Z".into(), - host: None, - last_seen: None, + host: None, + last_seen: None, }]; assert_eq!(pick_agent(&[], &agents).unwrap().name, "generalist"); } @@ -612,8 +612,8 @@ mod tests { capabilities: serde_json::json!(["rust"]), status: "active".into(), created_at: "2026-01-01T00:00:00Z".into(), - host: None, - last_seen: None, + host: None, + last_seen: None, }]; let picked = pick_agent(&required, &agents); assert!( diff --git a/crates/colibri-daemon/src/socket.rs b/crates/colibri-daemon/src/socket.rs index d508c56..5e39ed3 100644 --- a/crates/colibri-daemon/src/socket.rs +++ b/crates/colibri-daemon/src/socket.rs @@ -264,9 +264,11 @@ async fn dispatch(cmd: ColibriCommand, state: &SharedState) -> ColibriResponse { cmd_claim_task(state, task_id, agent_id).await } ColibriCommand::ListAgents => cmd_list_agents(state).await, - ColibriCommand::RegisterAgent { name, capabilities, host } => { - cmd_register_agent(state, name, capabilities, host.as_deref()).await - } + ColibriCommand::RegisterAgent { + name, + capabilities, + host, + } => cmd_register_agent(state, name, capabilities, host.as_deref()).await, ColibriCommand::Heartbeat { agent_id, host } => { cmd_heartbeat(state, agent_id, host.as_deref()).await } @@ -1083,7 +1085,12 @@ async fn cmd_intake_task( ) -> ColibriResponse { let caps = capabilities.unwrap_or_default(); // Create the task immediately so the caller gets the ID back. - let task = match state.store.lock().unwrap().create_task(&title, description.as_deref()) { + let task = match state + .store + .lock() + .unwrap() + .create_task(&title, description.as_deref()) + { Ok(t) => t, Err(e) => return ColibriResponse::err(format!("create task failed: {e}")), }; diff --git a/crates/colibri-daemon/tests/multi_agent_board.rs b/crates/colibri-daemon/tests/multi_agent_board.rs index ef0ea31..98069f5 100644 --- a/crates/colibri-daemon/tests/multi_agent_board.rs +++ b/crates/colibri-daemon/tests/multi_agent_board.rs @@ -199,7 +199,10 @@ async fn scheduler_routes_intake_tasks_by_capability() { ) .await; // Intake now returns the full task (id + status), not just {"status":"queued"}. - assert!(fs_intake["data"]["id"].is_string(), "intake must return task id"); + assert!( + fs_intake["data"]["id"].is_string(), + "intake must return task id" + ); send_command( &socket_path, diff --git a/crates/colibri-store/src/lib.rs b/crates/colibri-store/src/lib.rs index ea0fd87..12afb5d 100644 --- a/crates/colibri-store/src/lib.rs +++ b/crates/colibri-store/src/lib.rs @@ -375,9 +375,7 @@ impl Store { params![now, host, agent_id], )?; if rows == 0 { - return Err(StoreError::NotFound(format!( - "agent not found: {agent_id}" - ))); + return Err(StoreError::NotFound(format!("agent not found: {agent_id}"))); } Ok(()) } -- 2.45.3