fix(scheduler): eliminate intake double-create (+ fmt normalize) #207
4 changed files with 55 additions and 38 deletions
|
|
@ -98,6 +98,8 @@ pub struct ScheduledJob {
|
|||
/// A task submitted from an external intake (Telegram, socket, cron).
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct TaskRequest {
|
||||
/// Pre-created task ID from the caller (task is already in the store).
|
||||
pub task_id: String,
|
||||
pub title: String,
|
||||
pub description: Option<String>,
|
||||
pub required_capabilities: Vec<String>,
|
||||
|
|
@ -204,32 +206,29 @@ impl Scheduler {
|
|||
}
|
||||
|
||||
// ── Process intake queue ──────────────────────────────────
|
||||
// Tasks are already in the store (created by
|
||||
// cmd_intake_task); tick only routes them to capable agents.
|
||||
let intake: Vec<TaskRequest> = self.intake_queue.drain(..).collect();
|
||||
for req in intake {
|
||||
let create_result = {
|
||||
let agents = {
|
||||
let store = state.store.lock().unwrap();
|
||||
store.create_task(&req.title, req.description.as_deref())
|
||||
store.list_agents().unwrap_or_default()
|
||||
};
|
||||
match create_result {
|
||||
Ok(task) => {
|
||||
let agents = {
|
||||
let store = state.store.lock().unwrap();
|
||||
store.list_agents().unwrap_or_default()
|
||||
};
|
||||
if let Some(agent) = pick_agent(&req.required_capabilities, &agents) {
|
||||
let _ = {
|
||||
let store = state.store.lock().unwrap();
|
||||
store.claim_task(&task.id, &agent.id)
|
||||
};
|
||||
info!(
|
||||
task_id = %task.id,
|
||||
agent_id = %agent.id,
|
||||
"scheduler: assigned intake task to agent"
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(error = %e, "scheduler: failed to create task from intake");
|
||||
if let Some(agent) = pick_agent(&req.required_capabilities, &agents) {
|
||||
match {
|
||||
let store = state.store.lock().unwrap();
|
||||
store.claim_task(&req.task_id, &agent.id)
|
||||
} {
|
||||
Ok(_) => info!(
|
||||
task_id = %req.task_id,
|
||||
agent_id = %agent.id,
|
||||
"scheduler: assigned intake task to agent"
|
||||
),
|
||||
Err(e) => warn!(
|
||||
task_id = %req.task_id,
|
||||
error = %e,
|
||||
"scheduler: failed to claim intake task"
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -509,8 +508,8 @@ mod tests {
|
|||
capabilities: serde_json::json!(["rust"]),
|
||||
status: "offline".into(),
|
||||
created_at: "2026-01-01T00:00:00Z".into(),
|
||||
host: None,
|
||||
last_seen: None,
|
||||
host: None,
|
||||
last_seen: None,
|
||||
}];
|
||||
assert!(pick_agent(&required, &agents).is_none());
|
||||
}
|
||||
|
|
@ -524,8 +523,8 @@ mod tests {
|
|||
capabilities: serde_json::json!(["linux"]),
|
||||
status: "idle".into(),
|
||||
created_at: "2026-01-01T00:00:00Z".into(),
|
||||
host: None,
|
||||
last_seen: None,
|
||||
host: None,
|
||||
last_seen: None,
|
||||
}];
|
||||
assert!(pick_agent(&required, &agents).is_none());
|
||||
}
|
||||
|
|
@ -538,8 +537,8 @@ mod tests {
|
|||
capabilities: serde_json::json!([]),
|
||||
status: "idle".into(),
|
||||
created_at: "2026-01-01T00:00:00Z".into(),
|
||||
host: None,
|
||||
last_seen: None,
|
||||
host: None,
|
||||
last_seen: None,
|
||||
}];
|
||||
assert_eq!(pick_agent(&[], &agents).unwrap().name, "generalist");
|
||||
}
|
||||
|
|
@ -613,8 +612,8 @@ mod tests {
|
|||
capabilities: serde_json::json!(["rust"]),
|
||||
status: "active".into(),
|
||||
created_at: "2026-01-01T00:00:00Z".into(),
|
||||
host: None,
|
||||
last_seen: None,
|
||||
host: None,
|
||||
last_seen: None,
|
||||
}];
|
||||
let picked = pick_agent(&required, &agents);
|
||||
assert!(
|
||||
|
|
@ -630,7 +629,16 @@ mod tests {
|
|||
let data_dir = config.data_dir.clone();
|
||||
let state: SharedState = Arc::new(crate::daemon::DaemonState::new(config));
|
||||
let mut scheduler = Scheduler::new();
|
||||
// Pre-create the task so the tick can claim it (the tick no longer
|
||||
// creates tasks — that happens in cmd_intake_task now).
|
||||
let task = state
|
||||
.store
|
||||
.lock()
|
||||
.unwrap()
|
||||
.create_task("intake check", Some("must become a task"))
|
||||
.unwrap();
|
||||
scheduler.submit(TaskRequest {
|
||||
task_id: task.id,
|
||||
title: "intake check".to_string(),
|
||||
description: Some("must become a task".to_string()),
|
||||
required_capabilities: Vec::new(),
|
||||
|
|
|
|||
|
|
@ -264,9 +264,11 @@ async fn dispatch(cmd: ColibriCommand, state: &SharedState) -> ColibriResponse {
|
|||
cmd_claim_task(state, task_id, agent_id).await
|
||||
}
|
||||
ColibriCommand::ListAgents => cmd_list_agents(state).await,
|
||||
ColibriCommand::RegisterAgent { name, capabilities, host } => {
|
||||
cmd_register_agent(state, name, capabilities, host.as_deref()).await
|
||||
}
|
||||
ColibriCommand::RegisterAgent {
|
||||
name,
|
||||
capabilities,
|
||||
host,
|
||||
} => cmd_register_agent(state, name, capabilities, host.as_deref()).await,
|
||||
ColibriCommand::Heartbeat { agent_id, host } => {
|
||||
cmd_heartbeat(state, agent_id, host.as_deref()).await
|
||||
}
|
||||
|
|
@ -1083,13 +1085,19 @@ async fn cmd_intake_task(
|
|||
) -> ColibriResponse {
|
||||
let caps = capabilities.unwrap_or_default();
|
||||
// Create the task immediately so the caller gets the ID back.
|
||||
let task = match state.store.lock().unwrap().create_task(&title, description.as_deref()) {
|
||||
let task = match state
|
||||
.store
|
||||
.lock()
|
||||
.unwrap()
|
||||
.create_task(&title, description.as_deref())
|
||||
{
|
||||
Ok(t) => t,
|
||||
Err(e) => return ColibriResponse::err(format!("create task failed: {e}")),
|
||||
};
|
||||
// Also queue for the scheduler tick to route to a capable agent.
|
||||
let mut scheduler = state.scheduler.lock().await;
|
||||
scheduler.submit(crate::scheduler::TaskRequest {
|
||||
task_id: task.id.clone(),
|
||||
title,
|
||||
description,
|
||||
required_capabilities: caps,
|
||||
|
|
|
|||
|
|
@ -199,7 +199,10 @@ async fn scheduler_routes_intake_tasks_by_capability() {
|
|||
)
|
||||
.await;
|
||||
// Intake now returns the full task (id + status), not just {"status":"queued"}.
|
||||
assert!(fs_intake["data"]["id"].is_string(), "intake must return task id");
|
||||
assert!(
|
||||
fs_intake["data"]["id"].is_string(),
|
||||
"intake must return task id"
|
||||
);
|
||||
|
||||
send_command(
|
||||
&socket_path,
|
||||
|
|
|
|||
|
|
@ -375,9 +375,7 @@ impl Store {
|
|||
params![now, host, agent_id],
|
||||
)?;
|
||||
if rows == 0 {
|
||||
return Err(StoreError::NotFound(format!(
|
||||
"agent not found: {agent_id}"
|
||||
)));
|
||||
return Err(StoreError::NotFound(format!("agent not found: {agent_id}")));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue