From 6ed6b82949ad4d6bc112f0da72cc230171d958b2 Mon Sep 17 00:00:00 2001 From: Sam & Claude Date: Thu, 25 Jun 2026 17:32:56 +0200 Subject: [PATCH] fix(store): make claim_task atomic+exclusive (Gap 4 concurrency guard) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit claim_task was a blind UPDATE (`WHERE id = ?`), so two agents racing to claim the same task both 'succeeded' — last writer wins. This is the exact contention the Tailscale bridge exposes once remote agents poll the board. Guard the UPDATE on `status = 'queued'` so the claim is atomic: only the first claimant matches a still-queued row; the rest match zero rows and get a new StoreError::Conflict (surfaced as ok:false over the socket). A zero-row result is disambiguated into NotFound vs Conflict. Tests: - test_claim_task_is_exclusive / test_claim_task_not_found (store) - socket_rejects_double_claim_of_same_task (daemon, end-to-end over socket) Plan: Gap 4 moved to closed. Store 11 green, board 4 green, clippy clean. Co-Authored-By: Claude Opus 4.8 --- .../colibri-daemon/tests/multi_agent_board.rs | 73 +++++++++++++++++++ crates/colibri-store/src/lib.rs | 64 +++++++++++++++- docs/MULTI-AGENT-HOST-PLAN.md | 7 +- 3 files changed, 140 insertions(+), 4 deletions(-) diff --git a/crates/colibri-daemon/tests/multi_agent_board.rs b/crates/colibri-daemon/tests/multi_agent_board.rs index dee2217..da162d4 100644 --- a/crates/colibri-daemon/tests/multi_agent_board.rs +++ b/crates/colibri-daemon/tests/multi_agent_board.rs @@ -240,6 +240,79 @@ async fn scheduler_routes_intake_tasks_by_capability() { let _ = std::fs::remove_dir_all(&data_dir); } +/// Gap 4 — the claim guard over the socket. Two agents race to claim the same +/// task: the first wins, the second is rejected (`ok:false`), and the task stays +/// with the winner. This is the remote-agent contention case the Tailscale +/// bridge exposes — without the `status = 'queued'` guard the second claim would +/// silently steal the task. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn socket_rejects_double_claim_of_same_task() { + let data_dir = + std::env::temp_dir().join(format!("colibri-double-claim-{}", uuid::Uuid::new_v4())); + std::fs::create_dir_all(&data_dir).expect("create temp data dir"); + + let mut config = DaemonConfig::from_env(); + config.data_dir = data_dir.clone(); + config.socket_path = data_dir.join("colibri.sock"); + config.db_path = data_dir.join("colibri.sqlite"); + let socket_path = config.socket_path.clone(); + + let state: SharedState = Arc::new(DaemonState::new(config)); + let server_state = state.clone(); + let server_shutdown = state.shutdown_rx.resubscribe(); + let server = tokio::spawn(async move { + let _ = socket::serve(server_state, server_shutdown).await; + }); + + wait_for_socket(&socket_path).await; + + let a = send_command( + &socket_path, + r#"{"cmd":"register-agent","name":"a","capabilities":["x"]}"#, + ) + .await; + let a_id = a["data"]["id"].as_str().unwrap().to_string(); + let b = send_command( + &socket_path, + r#"{"cmd":"register-agent","name":"b","capabilities":["x"]}"#, + ) + .await; + let b_id = b["data"]["id"].as_str().unwrap().to_string(); + + let task = send_command( + &socket_path, + r#"{"cmd":"create-task","title":"contended"}"#, + ) + .await; + let task_id = task["data"]["id"].as_str().unwrap().to_string(); + + // First claim wins. + let c1 = send_command( + &socket_path, + &format!(r#"{{"cmd":"claim-task","task_id":"{task_id}","agent_id":"{a_id}"}}"#), + ) + .await; + assert!(c1["ok"].as_bool().unwrap_or(false), "first claim should win: {c1}"); + + // Second claim of the same task is rejected — not a silent steal. + let c2 = send_command( + &socket_path, + &format!(r#"{{"cmd":"claim-task","task_id":"{task_id}","agent_id":"{b_id}"}}"#), + ) + .await; + assert_eq!(c2["ok"].as_bool(), Some(false), "second claim must be rejected: {c2}"); + + // The task still belongs to the first agent. + let claimed = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"claimed"}"#).await; + let tasks = claimed["data"].as_array().unwrap(); + assert_eq!(tasks.len(), 1, "exactly one claimed task: {claimed}"); + assert_eq!(tasks[0]["agent_id"].as_str(), Some(a_id.as_str())); + + state.shutdown_tx.send(()).ok(); + let _ = tokio::time::timeout(Duration::from_secs(2), server).await; + let _ = std::fs::remove_dir_all(&data_dir); +} + async fn wait_for_socket(socket_path: &Path) { for _ in 0..100 { if socket_path.exists() { diff --git a/crates/colibri-store/src/lib.rs b/crates/colibri-store/src/lib.rs index d6180c3..84c1f54 100644 --- a/crates/colibri-store/src/lib.rs +++ b/crates/colibri-store/src/lib.rs @@ -128,6 +128,8 @@ pub enum StoreError { NotFound(String), #[error("already exists: {0}")] AlreadyExists(String), + #[error("conflict: {0}")] + Conflict(String), } pub type Result = std::result::Result; @@ -242,15 +244,35 @@ impl Store { } /// Assign a task to an agent (status → Claimed). + /// Claim a queued task for an agent. + /// + /// The UPDATE is guarded on `status = 'queued'`, so the claim is atomic and + /// exclusive: when two agents race for the same task, only the first wins + /// (its UPDATE matches a still-queued row); the rest match zero rows and get + /// a `Conflict`. This is the concurrency guard that makes claiming safe for + /// remote agents polling the board, not a blind last-writer-wins UPDATE. pub fn claim_task(&self, task_id: &str, agent_id: &str) -> Result { let now = Utc::now().to_rfc3339(); let rows = self.conn.execute( - "UPDATE tasks SET agent_id = ?1, status = ?2, updated_at = ?3 WHERE id = ?4", - params![agent_id, TaskStatus::Claimed.as_str(), now, task_id], + "UPDATE tasks SET agent_id = ?1, status = ?2, updated_at = ?3 + WHERE id = ?4 AND status = ?5", + params![ + agent_id, + TaskStatus::Claimed.as_str(), + now, + task_id, + TaskStatus::Queued.as_str(), + ], )?; if rows == 0 { - return Err(StoreError::NotFound(task_id.to_string())); + // Distinguish "no such task" from "already claimed / not queued". + return match self.get_task(task_id)? { + None => Err(StoreError::NotFound(task_id.to_string())), + Some(_) => Err(StoreError::Conflict(format!( + "task {task_id} is not claimable (already claimed or not queued)" + ))), + }; } self.get_task(task_id)? @@ -660,6 +682,42 @@ mod tests { assert!(result.is_err()); } + #[test] + fn test_claim_task_is_exclusive() { + let store = Store::open_memory().unwrap(); + let first = store + .register_agent("first", serde_json::json!(["freebsd"])) + .unwrap(); + let second = store + .register_agent("second", serde_json::json!(["freebsd"])) + .unwrap(); + let task = store.create_task("contended", None).unwrap(); + + // First agent wins the claim. + let claimed = store.claim_task(&task.id, &first.id).unwrap(); + assert_eq!(claimed.agent_id.as_deref(), Some(first.id.as_str())); + + // Second agent's claim must be rejected with Conflict — not silently + // overwrite the winner (the old blind UPDATE would have stolen it). + let race = store.claim_task(&task.id, &second.id); + assert!(matches!(race, Err(StoreError::Conflict(_))), "got {race:?}"); + + // Task still belongs to the first agent, still Claimed. + let after = store.get_task(&task.id).unwrap().unwrap(); + assert_eq!(after.agent_id.as_deref(), Some(first.id.as_str())); + assert_eq!(after.status, TaskStatus::Claimed); + } + + #[test] + fn test_claim_task_not_found() { + let store = Store::open_memory().unwrap(); + let agent = store + .register_agent("a", serde_json::json!(["x"])) + .unwrap(); + let result = store.claim_task("nonexistent", &agent.id); + assert!(matches!(result, Err(StoreError::NotFound(_))), "got {result:?}"); + } + #[test] fn test_list_tasks_filtered() { let store = Store::open_memory().unwrap(); diff --git a/docs/MULTI-AGENT-HOST-PLAN.md b/docs/MULTI-AGENT-HOST-PLAN.md index c381d10..00a97a6 100644 --- a/docs/MULTI-AGENT-HOST-PLAN.md +++ b/docs/MULTI-AGENT-HOST-PLAN.md @@ -85,13 +85,18 @@ and `set-cost-mode` were added in Phase 2b (PR #138). | # | Gap | Severity | Linux-doable? | | --- | ------------------------------------------------------------------------------------------------------------------------------------------------- | -------- | -------------------------------- | | 3 | **Agent presence model** — await `host`, `last_seen`, and heartbeat/lease columns to detect stale remote agents (Phase 3) | High | Yes (schema change) | -| 4 | **Remote-safe task claim** — `claim_task` is a blind UPDATE; await a concurrency guard or lease/TTL | Medium | Yes | | 5 | **Python polling scripts** — `colibri_poll.py` and `colibri_task_done.py` have zero test coverage | Medium | Yes | | 6 | **TCP bridge round-trip** — socat bridge untested end-to-end | Medium | Partial (needs socat or FreeBSD) | | 7 | **Cross-host coordination** — await a test simulating a remote agent claiming/transitioning a task over the bridge | High | FreeBSD only | ### Closed gaps (since the original 19.jun.2026 analysis) +- **Remote-safe task claim (Gap 4)** — `claim_task` was a blind UPDATE (last + writer wins). Now guarded on `status = 'queued'`, so the claim is atomic and + exclusive: racing agents — exactly the contention the Tailscale bridge exposes + — get a `Conflict` instead of silently stealing a claimed task. Covered by + `test_claim_task_is_exclusive` (store) and `socket_rejects_double_claim_of_same_task` + (daemon, end-to-end over the socket). - **Multi-agent task-board contention (Gap 1)** — tie-breaking, multi-required- capability, and active-status eligibility tests added (Phase 1a, PR #138). Full board lifecycle, capability routing, and contention tests added -- 2.45.3