fix(store): atomic+exclusive claim_task — close Gap 4 concurrency guard #190
3 changed files with 140 additions and 4 deletions
|
|
@ -240,6 +240,79 @@ async fn scheduler_routes_intake_tasks_by_capability() {
|
|||
let _ = std::fs::remove_dir_all(&data_dir);
|
||||
}
|
||||
|
||||
/// Gap 4 — the claim guard over the socket. Two agents race to claim the same
|
||||
/// task: the first wins, the second is rejected (`ok:false`), and the task stays
|
||||
/// with the winner. This is the remote-agent contention case the Tailscale
|
||||
/// bridge exposes — without the `status = 'queued'` guard the second claim would
|
||||
/// silently steal the task.
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn socket_rejects_double_claim_of_same_task() {
|
||||
let data_dir =
|
||||
std::env::temp_dir().join(format!("colibri-double-claim-{}", uuid::Uuid::new_v4()));
|
||||
std::fs::create_dir_all(&data_dir).expect("create temp data dir");
|
||||
|
||||
let mut config = DaemonConfig::from_env();
|
||||
config.data_dir = data_dir.clone();
|
||||
config.socket_path = data_dir.join("colibri.sock");
|
||||
config.db_path = data_dir.join("colibri.sqlite");
|
||||
let socket_path = config.socket_path.clone();
|
||||
|
||||
let state: SharedState = Arc::new(DaemonState::new(config));
|
||||
let server_state = state.clone();
|
||||
let server_shutdown = state.shutdown_rx.resubscribe();
|
||||
let server = tokio::spawn(async move {
|
||||
let _ = socket::serve(server_state, server_shutdown).await;
|
||||
});
|
||||
|
||||
wait_for_socket(&socket_path).await;
|
||||
|
||||
let a = send_command(
|
||||
&socket_path,
|
||||
r#"{"cmd":"register-agent","name":"a","capabilities":["x"]}"#,
|
||||
)
|
||||
.await;
|
||||
let a_id = a["data"]["id"].as_str().unwrap().to_string();
|
||||
let b = send_command(
|
||||
&socket_path,
|
||||
r#"{"cmd":"register-agent","name":"b","capabilities":["x"]}"#,
|
||||
)
|
||||
.await;
|
||||
let b_id = b["data"]["id"].as_str().unwrap().to_string();
|
||||
|
||||
let task = send_command(
|
||||
&socket_path,
|
||||
r#"{"cmd":"create-task","title":"contended"}"#,
|
||||
)
|
||||
.await;
|
||||
let task_id = task["data"]["id"].as_str().unwrap().to_string();
|
||||
|
||||
// First claim wins.
|
||||
let c1 = send_command(
|
||||
&socket_path,
|
||||
&format!(r#"{{"cmd":"claim-task","task_id":"{task_id}","agent_id":"{a_id}"}}"#),
|
||||
)
|
||||
.await;
|
||||
assert!(c1["ok"].as_bool().unwrap_or(false), "first claim should win: {c1}");
|
||||
|
||||
// Second claim of the same task is rejected — not a silent steal.
|
||||
let c2 = send_command(
|
||||
&socket_path,
|
||||
&format!(r#"{{"cmd":"claim-task","task_id":"{task_id}","agent_id":"{b_id}"}}"#),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(c2["ok"].as_bool(), Some(false), "second claim must be rejected: {c2}");
|
||||
|
||||
// The task still belongs to the first agent.
|
||||
let claimed = send_command(&socket_path, r#"{"cmd":"list-tasks","status":"claimed"}"#).await;
|
||||
let tasks = claimed["data"].as_array().unwrap();
|
||||
assert_eq!(tasks.len(), 1, "exactly one claimed task: {claimed}");
|
||||
assert_eq!(tasks[0]["agent_id"].as_str(), Some(a_id.as_str()));
|
||||
|
||||
state.shutdown_tx.send(()).ok();
|
||||
let _ = tokio::time::timeout(Duration::from_secs(2), server).await;
|
||||
let _ = std::fs::remove_dir_all(&data_dir);
|
||||
}
|
||||
|
||||
async fn wait_for_socket(socket_path: &Path) {
|
||||
for _ in 0..100 {
|
||||
if socket_path.exists() {
|
||||
|
|
|
|||
|
|
@ -128,6 +128,8 @@ pub enum StoreError {
|
|||
NotFound(String),
|
||||
#[error("already exists: {0}")]
|
||||
AlreadyExists(String),
|
||||
#[error("conflict: {0}")]
|
||||
Conflict(String),
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, StoreError>;
|
||||
|
|
@ -242,15 +244,35 @@ impl Store {
|
|||
}
|
||||
|
||||
/// Assign a task to an agent (status → Claimed).
|
||||
/// Claim a queued task for an agent.
|
||||
///
|
||||
/// The UPDATE is guarded on `status = 'queued'`, so the claim is atomic and
|
||||
/// exclusive: when two agents race for the same task, only the first wins
|
||||
/// (its UPDATE matches a still-queued row); the rest match zero rows and get
|
||||
/// a `Conflict`. This is the concurrency guard that makes claiming safe for
|
||||
/// remote agents polling the board, not a blind last-writer-wins UPDATE.
|
||||
pub fn claim_task(&self, task_id: &str, agent_id: &str) -> Result<Task> {
|
||||
let now = Utc::now().to_rfc3339();
|
||||
let rows = self.conn.execute(
|
||||
"UPDATE tasks SET agent_id = ?1, status = ?2, updated_at = ?3 WHERE id = ?4",
|
||||
params![agent_id, TaskStatus::Claimed.as_str(), now, task_id],
|
||||
"UPDATE tasks SET agent_id = ?1, status = ?2, updated_at = ?3
|
||||
WHERE id = ?4 AND status = ?5",
|
||||
params![
|
||||
agent_id,
|
||||
TaskStatus::Claimed.as_str(),
|
||||
now,
|
||||
task_id,
|
||||
TaskStatus::Queued.as_str(),
|
||||
],
|
||||
)?;
|
||||
|
||||
if rows == 0 {
|
||||
return Err(StoreError::NotFound(task_id.to_string()));
|
||||
// Distinguish "no such task" from "already claimed / not queued".
|
||||
return match self.get_task(task_id)? {
|
||||
None => Err(StoreError::NotFound(task_id.to_string())),
|
||||
Some(_) => Err(StoreError::Conflict(format!(
|
||||
"task {task_id} is not claimable (already claimed or not queued)"
|
||||
))),
|
||||
};
|
||||
}
|
||||
|
||||
self.get_task(task_id)?
|
||||
|
|
@ -660,6 +682,42 @@ mod tests {
|
|||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_claim_task_is_exclusive() {
|
||||
let store = Store::open_memory().unwrap();
|
||||
let first = store
|
||||
.register_agent("first", serde_json::json!(["freebsd"]))
|
||||
.unwrap();
|
||||
let second = store
|
||||
.register_agent("second", serde_json::json!(["freebsd"]))
|
||||
.unwrap();
|
||||
let task = store.create_task("contended", None).unwrap();
|
||||
|
||||
// First agent wins the claim.
|
||||
let claimed = store.claim_task(&task.id, &first.id).unwrap();
|
||||
assert_eq!(claimed.agent_id.as_deref(), Some(first.id.as_str()));
|
||||
|
||||
// Second agent's claim must be rejected with Conflict — not silently
|
||||
// overwrite the winner (the old blind UPDATE would have stolen it).
|
||||
let race = store.claim_task(&task.id, &second.id);
|
||||
assert!(matches!(race, Err(StoreError::Conflict(_))), "got {race:?}");
|
||||
|
||||
// Task still belongs to the first agent, still Claimed.
|
||||
let after = store.get_task(&task.id).unwrap().unwrap();
|
||||
assert_eq!(after.agent_id.as_deref(), Some(first.id.as_str()));
|
||||
assert_eq!(after.status, TaskStatus::Claimed);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_claim_task_not_found() {
|
||||
let store = Store::open_memory().unwrap();
|
||||
let agent = store
|
||||
.register_agent("a", serde_json::json!(["x"]))
|
||||
.unwrap();
|
||||
let result = store.claim_task("nonexistent", &agent.id);
|
||||
assert!(matches!(result, Err(StoreError::NotFound(_))), "got {result:?}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_list_tasks_filtered() {
|
||||
let store = Store::open_memory().unwrap();
|
||||
|
|
|
|||
|
|
@ -85,13 +85,18 @@ and `set-cost-mode` were added in Phase 2b (PR #138).
|
|||
| # | Gap | Severity | Linux-doable? |
|
||||
| --- | ------------------------------------------------------------------------------------------------------------------------------------------------- | -------- | -------------------------------- |
|
||||
| 3 | **Agent presence model** — await `host`, `last_seen`, and heartbeat/lease columns to detect stale remote agents (Phase 3) | High | Yes (schema change) |
|
||||
| 4 | **Remote-safe task claim** — `claim_task` is a blind UPDATE; await a concurrency guard or lease/TTL | Medium | Yes |
|
||||
| 5 | **Python polling scripts** — `colibri_poll.py` and `colibri_task_done.py` have zero test coverage | Medium | Yes |
|
||||
| 6 | **TCP bridge round-trip** — socat bridge untested end-to-end | Medium | Partial (needs socat or FreeBSD) |
|
||||
| 7 | **Cross-host coordination** — await a test simulating a remote agent claiming/transitioning a task over the bridge | High | FreeBSD only |
|
||||
|
||||
### Closed gaps (since the original 19.jun.2026 analysis)
|
||||
|
||||
- **Remote-safe task claim (Gap 4)** — `claim_task` was a blind UPDATE (last
|
||||
writer wins). Now guarded on `status = 'queued'`, so the claim is atomic and
|
||||
exclusive: racing agents — exactly the contention the Tailscale bridge exposes
|
||||
— get a `Conflict` instead of silently stealing a claimed task. Covered by
|
||||
`test_claim_task_is_exclusive` (store) and `socket_rejects_double_claim_of_same_task`
|
||||
(daemon, end-to-end over the socket).
|
||||
- **Multi-agent task-board contention (Gap 1)** — tie-breaking, multi-required-
|
||||
capability, and active-status eligibility tests added (Phase 1a, PR #138).
|
||||
Full board lifecycle, capability routing, and contention tests added
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue