fix(store): atomic+exclusive claim_task — close Gap 4 concurrency guard #190

Merged
clawdie merged 1 commit from fix/claim-task-concurrency-guard into main 2026-06-25 17:33:17 +02:00
3 changed files with 140 additions and 4 deletions

View file

@ -240,6 +240,79 @@ async fn scheduler_routes_intake_tasks_by_capability() {
let _ = std::fs::remove_dir_all(&data_dir);
}
/// Gap 4 — the claim guard, exercised end-to-end over the daemon socket.
///
/// Two agents contend for one task: agent `a` sends `claim-task` first and must
/// win; agent `b` then sends `claim-task` for the same task and must get
/// `ok:false`. Afterwards `list-tasks` filtered on `claimed` must report exactly
/// one task, still assigned to agent `a`. The two claims are sent back to back
/// from this test, so the scenario covered is the late contender being turned
/// away instead of silently taking the task over.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn socket_rejects_double_claim_of_same_task() {
    // Isolated scratch directory holding both the socket and the database.
    let data_dir =
        std::env::temp_dir().join(format!("colibri-double-claim-{}", uuid::Uuid::new_v4()));
    std::fs::create_dir_all(&data_dir).expect("create temp data dir");

    let mut config = DaemonConfig::from_env();
    let socket_path = data_dir.join("colibri.sock");
    config.data_dir = data_dir.clone();
    config.socket_path = socket_path.clone();
    config.db_path = data_dir.join("colibri.sqlite");

    // Run the socket server in the background for the duration of the test.
    let state: SharedState = Arc::new(DaemonState::new(config));
    let server = {
        let daemon = Arc::clone(&state);
        let shutdown = state.shutdown_rx.resubscribe();
        tokio::spawn(async move {
            let _ = socket::serve(daemon, shutdown).await;
        })
    };
    wait_for_socket(&socket_path).await;

    // Two agents and a single task for them to contend over.
    let a_id = send_command(
        &socket_path,
        r#"{"cmd":"register-agent","name":"a","capabilities":["x"]}"#,
    )
    .await["data"]["id"]
        .as_str()
        .unwrap()
        .to_string();
    let b_id = send_command(
        &socket_path,
        r#"{"cmd":"register-agent","name":"b","capabilities":["x"]}"#,
    )
    .await["data"]["id"]
        .as_str()
        .unwrap()
        .to_string();
    let task_id = send_command(
        &socket_path,
        r#"{"cmd":"create-task","title":"contended"}"#,
    )
    .await["data"]["id"]
        .as_str()
        .unwrap()
        .to_string();

    // Builds the `claim-task` request for the contended task on behalf of an agent.
    let claim_as = |agent_id: &str| {
        format!(r#"{{"cmd":"claim-task","task_id":"{task_id}","agent_id":"{agent_id}"}}"#)
    };

    // Agent `a` claims first and must succeed.
    let first = send_command(&socket_path, &claim_as(&a_id)).await;
    assert!(
        matches!(first["ok"].as_bool(), Some(true)),
        "first claim should win: {}",
        first
    );

    // Agent `b` claims the same task and must be refused — not a silent steal.
    let second = send_command(&socket_path, &claim_as(&b_id)).await;
    assert_eq!(
        second["ok"].as_bool(),
        Some(false),
        "second claim must be rejected: {}",
        second
    );

    // Ownership is unchanged: one claimed task, held by agent `a`.
    let listing = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"claimed"}"#).await;
    let claimed_tasks = listing["data"].as_array().unwrap();
    assert_eq!(claimed_tasks.len(), 1, "exactly one claimed task: {}", listing);
    assert_eq!(claimed_tasks[0]["agent_id"].as_str(), Some(a_id.as_str()));

    // Best-effort teardown: stop the server, wait briefly, remove the scratch dir.
    let _ = state.shutdown_tx.send(());
    let _ = tokio::time::timeout(Duration::from_secs(2), server).await;
    std::fs::remove_dir_all(&data_dir).ok();
}
async fn wait_for_socket(socket_path: &Path) {
for _ in 0..100 {
if socket_path.exists() {

View file

@ -128,6 +128,8 @@ pub enum StoreError {
NotFound(String),
#[error("already exists: {0}")]
AlreadyExists(String),
#[error("conflict: {0}")]
Conflict(String),
}
/// Shorthand for a `std::result::Result` whose error type is [`StoreError`].
pub type Result<T> = std::result::Result<T, StoreError>;
@ -242,15 +244,35 @@ impl Store {
}
/// Assign a task to an agent (status → Claimed).
/// Claim a queued task for an agent.
///
/// The UPDATE is guarded on `status = 'queued'`, so the claim is atomic and
/// exclusive: when two agents race for the same task, only the first wins
/// (its UPDATE matches a still-queued row); the rest match zero rows and get
/// a `Conflict`. This is the concurrency guard that makes claiming safe for
/// remote agents polling the board, not a blind last-writer-wins UPDATE.
pub fn claim_task(&self, task_id: &str, agent_id: &str) -> Result<Task> {
let now = Utc::now().to_rfc3339();
let rows = self.conn.execute(
"UPDATE tasks SET agent_id = ?1, status = ?2, updated_at = ?3 WHERE id = ?4",
params![agent_id, TaskStatus::Claimed.as_str(), now, task_id],
"UPDATE tasks SET agent_id = ?1, status = ?2, updated_at = ?3
WHERE id = ?4 AND status = ?5",
params![
agent_id,
TaskStatus::Claimed.as_str(),
now,
task_id,
TaskStatus::Queued.as_str(),
],
)?;
if rows == 0 {
return Err(StoreError::NotFound(task_id.to_string()));
// Distinguish "no such task" from "already claimed / not queued".
return match self.get_task(task_id)? {
None => Err(StoreError::NotFound(task_id.to_string())),
Some(_) => Err(StoreError::Conflict(format!(
"task {task_id} is not claimable (already claimed or not queued)"
))),
};
}
self.get_task(task_id)?
@ -660,6 +682,42 @@ mod tests {
assert!(result.is_err());
}
/// A task that has already been claimed cannot be claimed again by another
/// agent: the second attempt yields `StoreError::Conflict` and the stored task
/// keeps its original owner and its `Claimed` status.
#[test]
fn test_claim_task_is_exclusive() {
    let store = Store::open_memory().unwrap();
    let capabilities = || serde_json::json!(["freebsd"]);
    let winner = store.register_agent("first", capabilities()).unwrap();
    let loser = store.register_agent("second", capabilities()).unwrap();
    let task = store.create_task("contended", None).unwrap();

    // The first claim goes through and records the winner as owner.
    let claimed = store.claim_task(&task.id, &winner.id).unwrap();
    assert_eq!(claimed.agent_id.as_deref(), Some(winner.id.as_str()));

    // The second claim must come back as Conflict; anything else (including a
    // successful overwrite of the owner) fails the test.
    match store.claim_task(&task.id, &loser.id) {
        Err(StoreError::Conflict(_)) => {}
        other => panic!("got {other:?}"),
    }

    // Re-read the row: same owner, still Claimed.
    let stored = store.get_task(&task.id).unwrap().unwrap();
    assert_eq!(stored.agent_id.as_deref(), Some(winner.id.as_str()));
    assert_eq!(stored.status, TaskStatus::Claimed);
}
/// Claiming a task id that does not exist reports `StoreError::NotFound`.
#[test]
fn test_claim_task_not_found() {
    let store = Store::open_memory().unwrap();
    let agent = store.register_agent("a", serde_json::json!(["x"])).unwrap();

    match store.claim_task("nonexistent", &agent.id) {
        Err(StoreError::NotFound(_)) => {}
        other => panic!("got {other:?}"),
    }
}
#[test]
fn test_list_tasks_filtered() {
let store = Store::open_memory().unwrap();

View file

@ -85,13 +85,18 @@ and `set-cost-mode` were added in Phase 2b (PR #138).
| # | Gap | Severity | Linux-doable? |
| --- | ------------------------------------------------------------------------------------------------------------------------------------------------- | -------- | -------------------------------- |
| 3 | **Agent presence model** — needs `host`, `last_seen`, and heartbeat/lease columns to detect stale remote agents (Phase 3) | High | Yes (schema change) |
| 4 | **Remote-safe task claim** — `claim_task` is a blind UPDATE; needs a concurrency guard or lease/TTL | Medium | Yes |
| 5 | **Python polling scripts** — `colibri_poll.py` and `colibri_task_done.py` have zero test coverage | Medium | Yes |
| 6 | **TCP bridge round-trip** — socat bridge untested end-to-end | Medium | Partial (needs socat or FreeBSD) |
| 7 | **Cross-host coordination** — needs a test simulating a remote agent claiming/transitioning a task over the bridge | High | FreeBSD only |
### Closed gaps (since the original 19.jun.2026 analysis)
- **Remote-safe task claim (Gap 4)** — `claim_task` was a blind UPDATE (last
writer wins). Now guarded on `status = 'queued'`, so the claim is atomic and
exclusive: racing agents — exactly the contention the Tailscale bridge exposes
— get a `Conflict` instead of silently stealing a claimed task. Covered by
`test_claim_task_is_exclusive` (store) and `socket_rejects_double_claim_of_same_task`
(daemon, end-to-end over the socket).
- **Multi-agent task-board contention (Gap 1)** — tie-breaking, multi-required-
capability, and active-status eligibility tests added (Phase 1a, PR #138).
Full board lifecycle, capability routing, and contention tests added