docs/refresh-multi-agent-plan #205
7 changed files with 112 additions and 29 deletions
|
|
@ -240,6 +240,7 @@ impl DaemonClient {
|
|||
self.request(&ColibriCommand::RegisterAgent {
|
||||
name: name.into(),
|
||||
capabilities: Some(serde_json::to_value(capabilities).unwrap_or_default()),
|
||||
host: None,
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
|
|
|||
|
|
@ -75,6 +75,16 @@ pub enum ColibriCommand {
|
|||
RegisterAgent {
|
||||
name: String,
|
||||
capabilities: Option<serde_json::Value>,
|
||||
/// Host the agent is registering from (e.g. "osa", "domedog"). Phase 3.
|
||||
#[serde(default)]
|
||||
host: Option<String>,
|
||||
},
|
||||
#[serde(rename = "heartbeat")]
|
||||
Heartbeat {
|
||||
agent_id: String,
|
||||
/// Optional host update if the agent moved machines. Phase 3.
|
||||
#[serde(default)]
|
||||
host: Option<String>,
|
||||
},
|
||||
#[serde(rename = "list-skills")]
|
||||
ListSkills,
|
||||
|
|
|
|||
|
|
@ -482,6 +482,8 @@ mod tests {
|
|||
capabilities: serde_json::json!(["rust"]),
|
||||
status: "idle".into(),
|
||||
created_at: "2026-01-01T00:00:00Z".into(),
|
||||
host: None,
|
||||
last_seen: None,
|
||||
},
|
||||
colibri_store::Agent {
|
||||
id: "a2".into(),
|
||||
|
|
@ -489,6 +491,8 @@ mod tests {
|
|||
capabilities: serde_json::json!(["python"]),
|
||||
status: "idle".into(),
|
||||
created_at: "2026-01-01T00:00:00Z".into(),
|
||||
host: None,
|
||||
last_seen: None,
|
||||
},
|
||||
];
|
||||
let picked = pick_agent(&required, &agents);
|
||||
|
|
@ -505,6 +509,8 @@ mod tests {
|
|||
capabilities: serde_json::json!(["rust"]),
|
||||
status: "offline".into(),
|
||||
created_at: "2026-01-01T00:00:00Z".into(),
|
||||
host: None,
|
||||
last_seen: None,
|
||||
}];
|
||||
assert!(pick_agent(&required, &agents).is_none());
|
||||
}
|
||||
|
|
@ -518,6 +524,8 @@ mod tests {
|
|||
capabilities: serde_json::json!(["linux"]),
|
||||
status: "idle".into(),
|
||||
created_at: "2026-01-01T00:00:00Z".into(),
|
||||
host: None,
|
||||
last_seen: None,
|
||||
}];
|
||||
assert!(pick_agent(&required, &agents).is_none());
|
||||
}
|
||||
|
|
@ -530,6 +538,8 @@ mod tests {
|
|||
capabilities: serde_json::json!([]),
|
||||
status: "idle".into(),
|
||||
created_at: "2026-01-01T00:00:00Z".into(),
|
||||
host: None,
|
||||
last_seen: None,
|
||||
}];
|
||||
assert_eq!(pick_agent(&[], &agents).unwrap().name, "generalist");
|
||||
}
|
||||
|
|
@ -544,6 +554,8 @@ mod tests {
|
|||
capabilities: serde_json::json!(["rust"]),
|
||||
status: "idle".into(),
|
||||
created_at: "2026-01-01T00:00:00Z".into(),
|
||||
host: None,
|
||||
last_seen: None,
|
||||
},
|
||||
colibri_store::Agent {
|
||||
id: "a2".into(),
|
||||
|
|
@ -551,6 +563,8 @@ mod tests {
|
|||
capabilities: serde_json::json!(["rust"]),
|
||||
status: "idle".into(),
|
||||
created_at: "2026-01-02T00:00:00Z".into(),
|
||||
host: None,
|
||||
last_seen: None,
|
||||
},
|
||||
];
|
||||
let picked = pick_agent(&required, &agents).unwrap();
|
||||
|
|
@ -570,6 +584,8 @@ mod tests {
|
|||
capabilities: serde_json::json!(["rust"]),
|
||||
status: "idle".into(),
|
||||
created_at: "2026-01-01T00:00:00Z".into(),
|
||||
host: None,
|
||||
last_seen: None,
|
||||
},
|
||||
colibri_store::Agent {
|
||||
id: "a2".into(),
|
||||
|
|
@ -577,6 +593,8 @@ mod tests {
|
|||
capabilities: serde_json::json!(["rust", "freebsd"]),
|
||||
status: "idle".into(),
|
||||
created_at: "2026-01-01T00:00:00Z".into(),
|
||||
host: None,
|
||||
last_seen: None,
|
||||
},
|
||||
];
|
||||
let picked = pick_agent(&required, &agents).unwrap();
|
||||
|
|
@ -595,6 +613,8 @@ mod tests {
|
|||
capabilities: serde_json::json!(["rust"]),
|
||||
status: "active".into(),
|
||||
created_at: "2026-01-01T00:00:00Z".into(),
|
||||
host: None,
|
||||
last_seen: None,
|
||||
}];
|
||||
let picked = pick_agent(&required, &agents);
|
||||
assert!(
|
||||
|
|
|
|||
|
|
@ -264,8 +264,11 @@ async fn dispatch(cmd: ColibriCommand, state: &SharedState) -> ColibriResponse {
|
|||
cmd_claim_task(state, task_id, agent_id).await
|
||||
}
|
||||
ColibriCommand::ListAgents => cmd_list_agents(state).await,
|
||||
ColibriCommand::RegisterAgent { name, capabilities } => {
|
||||
cmd_register_agent(state, name, capabilities, None).await
|
||||
ColibriCommand::RegisterAgent { name, capabilities, host } => {
|
||||
cmd_register_agent(state, name, capabilities, host.as_deref()).await
|
||||
}
|
||||
ColibriCommand::Heartbeat { agent_id, host } => {
|
||||
cmd_heartbeat(state, agent_id, host.as_deref()).await
|
||||
}
|
||||
ColibriCommand::ListSkills => cmd_list_skills(state).await,
|
||||
ColibriCommand::RegisterSkill {
|
||||
|
|
@ -1000,20 +1003,31 @@ async fn cmd_register_agent(
|
|||
state: &SharedState,
|
||||
name: String,
|
||||
capabilities: Option<serde_json::Value>,
|
||||
_host: Option<&str>,
|
||||
host: Option<&str>,
|
||||
) -> ColibriResponse {
|
||||
let caps = capabilities.unwrap_or(serde_json::json!([]));
|
||||
match state
|
||||
.store
|
||||
.lock()
|
||||
.unwrap()
|
||||
.register_agent(&name, caps, None)
|
||||
.register_agent(&name, caps, host)
|
||||
{
|
||||
Ok(agent) => ColibriResponse::ok(serde_json::to_value(agent).unwrap_or_default()),
|
||||
Err(e) => ColibriResponse::err(format!("register agent failed: {e}")),
|
||||
}
|
||||
}
|
||||
|
||||
async fn cmd_heartbeat(
|
||||
state: &SharedState,
|
||||
agent_id: String,
|
||||
host: Option<&str>,
|
||||
) -> ColibriResponse {
|
||||
match state.store.lock().unwrap().heartbeat(&agent_id, host) {
|
||||
Ok(()) => ColibriResponse::ok(serde_json::json!({"ok": true})),
|
||||
Err(e) => ColibriResponse::err(format!("heartbeat failed: {e}")),
|
||||
}
|
||||
}
|
||||
|
||||
async fn cmd_register_tenant(
|
||||
state: &SharedState,
|
||||
tenant_id: String,
|
||||
|
|
|
|||
|
|
@ -85,6 +85,10 @@ pub struct Agent {
|
|||
/// Agent status: `active`, `idle`, `offline`.
|
||||
pub status: String,
|
||||
pub created_at: String,
|
||||
/// Host the agent registered from (e.g. "osa", "domedog"). Added Phase 3.
|
||||
pub host: Option<String>,
|
||||
/// Last time this agent checked in (heartbeat or re-register). Phase 3.
|
||||
pub last_seen: Option<String>,
|
||||
}
|
||||
|
||||
/// A team-wide skill entry.
|
||||
|
|
@ -339,16 +343,16 @@ impl Store {
|
|||
&self,
|
||||
name: &str,
|
||||
capabilities: serde_json::Value,
|
||||
_host: Option<&str>, // Phase 3: agent presence — host registry (WIP)
|
||||
host: Option<&str>,
|
||||
) -> Result<Agent> {
|
||||
let id = uuid::Uuid::new_v4().to_string();
|
||||
let now = Utc::now().to_rfc3339();
|
||||
let caps_json = serde_json::to_string(&capabilities)?;
|
||||
|
||||
self.conn.execute(
|
||||
"INSERT INTO agents (id, name, capabilities, status, created_at)
|
||||
VALUES (?1, ?2, ?3, 'active', ?4)",
|
||||
params![id, name, caps_json, now],
|
||||
"INSERT INTO agents (id, name, capabilities, status, created_at, host, last_seen)
|
||||
VALUES (?1, ?2, ?3, 'active', ?4, ?5, ?6)",
|
||||
params![id, name, caps_json, now, host, now],
|
||||
)?;
|
||||
|
||||
Ok(Agent {
|
||||
|
|
@ -356,14 +360,32 @@ impl Store {
|
|||
name: name.to_string(),
|
||||
capabilities,
|
||||
status: "active".to_string(),
|
||||
created_at: now,
|
||||
created_at: now.clone(),
|
||||
host: host.map(String::from),
|
||||
last_seen: Some(now),
|
||||
})
|
||||
}
|
||||
|
||||
/// Update an agent's last_seen timestamp (heartbeat). Optionally update
|
||||
/// the host if the agent is re-registering from a different machine.
|
||||
pub fn heartbeat(&self, agent_id: &str, host: Option<&str>) -> Result<()> {
|
||||
let now = Utc::now().to_rfc3339();
|
||||
let rows = self.conn.execute(
|
||||
"UPDATE agents SET last_seen = ?1, host = COALESCE(?2, host) WHERE id = ?3",
|
||||
params![now, host, agent_id],
|
||||
)?;
|
||||
if rows == 0 {
|
||||
return Err(StoreError::NotFound(format!(
|
||||
"agent not found: {agent_id}"
|
||||
)));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get an agent by ID.
|
||||
pub fn get_agent(&self, agent_id: &str) -> Result<Option<Agent>> {
|
||||
let mut stmt = self.conn.prepare(
|
||||
"SELECT id, name, capabilities, status, created_at FROM agents WHERE id = ?1",
|
||||
"SELECT id, name, capabilities, status, created_at, host, last_seen FROM agents WHERE id = ?1",
|
||||
)?;
|
||||
|
||||
let mut rows = stmt.query_map(params![agent_id], |row| {
|
||||
|
|
@ -374,6 +396,8 @@ impl Store {
|
|||
capabilities: serde_json::from_str(&caps_str).unwrap_or(serde_json::Value::Null),
|
||||
status: row.get(3)?,
|
||||
created_at: row.get(4)?,
|
||||
host: row.get(5)?,
|
||||
last_seen: row.get(6)?,
|
||||
})
|
||||
})?;
|
||||
|
||||
|
|
@ -387,7 +411,7 @@ impl Store {
|
|||
/// List all registered agents.
|
||||
pub fn list_agents(&self) -> Result<Vec<Agent>> {
|
||||
let mut stmt = self.conn.prepare(
|
||||
"SELECT id, name, capabilities, status, created_at FROM agents ORDER BY name",
|
||||
"SELECT id, name, capabilities, status, created_at, host, last_seen FROM agents ORDER BY name",
|
||||
)?;
|
||||
let rows = stmt.query_map([], |row| {
|
||||
let caps_str: String = row.get(2)?;
|
||||
|
|
@ -397,6 +421,8 @@ impl Store {
|
|||
capabilities: serde_json::from_str(&caps_str).unwrap_or(serde_json::Value::Null),
|
||||
status: row.get(3)?,
|
||||
created_at: row.get(4)?,
|
||||
host: row.get(5)?,
|
||||
last_seen: row.get(6)?,
|
||||
})
|
||||
})?;
|
||||
|
||||
|
|
@ -587,6 +613,12 @@ impl Store {
|
|||
|
||||
fn run_migrations(&self) -> Result<()> {
|
||||
self.conn.execute_batch(schema::SCHEMA_SQL)?;
|
||||
for migration in schema::MIGRATIONS {
|
||||
// Each migration runs in a try-block: if the column already
|
||||
// exists (re-open), SQLite returns "duplicate column" which we
|
||||
// swallow — the store stays idempotent across restarts.
|
||||
let _ = self.conn.execute_batch(migration);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -62,7 +62,6 @@ CREATE INDEX IF NOT EXISTS idx_tenants_status ON tenants(status);
|
|||
|
||||
/// Column additions since the initial schema. Each runs inside a try-block
|
||||
/// so the store stays idempotent: adding a column that already exists is a swallowed error.
|
||||
#[allow(dead_code)] // Phase 3 WIP — schema not yet wired
|
||||
pub const MIGRATIONS: &[&str] = &[
|
||||
"ALTER TABLE agents ADD COLUMN host TEXT",
|
||||
"ALTER TABLE agents ADD COLUMN last_seen TEXT",
|
||||
|
|
|
|||
|
|
@ -1,12 +1,12 @@
|
|||
# Multi-Agent Multi-Host — Gap Analysis & Implementation Plan
|
||||
|
||||
**Created:** 19.jun.2026 (Sam & Hermes)
|
||||
**Updated:** 25.jun.2026 (Sam & Claude) — reflects 0.12.0 release; Phases 1 + 2 complete
|
||||
**Status:** Phases 1 + 2 complete; Phase 3 (agent presence schema) deferred
|
||||
**Updated:** 26.jun.2026 (Sam & Claude) — Phase 3 schema landed; bridge packaging + firewall live
|
||||
**Status:** Phases 1 + 2 complete; Phase 3 schema in, agent-presence logic + heartbeat/lease pending; Phase 5 bridge ready
|
||||
|
||||
## Context
|
||||
|
||||
Colibri 0.12.0 is released (MIT license, 258 tests, FreeBSD port + CI running).
|
||||
Colibri 0.12.0 is released (MIT license, 256 tests, FreeBSD port + CI running).
|
||||
The tenant/vault provision chain has landed (`register-tenant` → jail spawn →
|
||||
`provision_tenant_env()` → `colibri-vault::provision`). The next milestone is
|
||||
proving the multi-agent, multi-host coordination model: multiple agents on
|
||||
|
|
@ -35,11 +35,12 @@ The multi-host stack lives **outside the Rust daemon**:
|
|||
- **Transport:** `tokio::net::UnixListener` only — zero TCP in Rust. The socat
|
||||
bridge is a shell-level relay.
|
||||
- **Agent model:** `register-agent` stores name + capabilities + status
|
||||
(`active`/`idle`/`offline`). Awaiting `host` field, `last_seen`, heartbeat,
|
||||
and lease/TTL (Phase 3).
|
||||
(`active`/`idle`/`offline`). `host` and `last_seen` columns landed
|
||||
(Phase 3 schema, PR #204); the `_host` arg is still ignored in the handler
|
||||
— wiring + heartbeat/lease/TTL pending.
|
||||
- **Task assignment:** `pick_agent()` matches by capability score (partial
|
||||
match counts, highest score wins, tie → later-in-slice). `claim_task()` is a
|
||||
blind UPDATE; await a concurrency guard (Gap 4).
|
||||
match counts, highest score wins, tie → later-in-slice). `claim_task()` is
|
||||
atomic (gated on `status = 'queued'`); Gap 4 closed (PR #190).
|
||||
- **Polling:** `colibri_poll.py` queries `list-tasks status=started` filtered
|
||||
by `agent_id`. `colibri_task_done.py` calls `transition-task`.
|
||||
- **Spawning:** `poll_tasks()` in daemon.rs spawns agents for `Claimed` tasks,
|
||||
|
|
@ -84,7 +85,7 @@ and `set-cost-mode` were added in Phase 2b (PR #138).
|
|||
|
||||
| # | Gap | Severity | Linux-doable? |
|
||||
| --- | ------------------------------------------------------------------------------------------------------------------------------------------------- | -------- | -------------------------------- |
|
||||
| 3 | **Agent presence model** — await `host`, `last_seen`, and heartbeat/lease columns to detect stale remote agents (Phase 3) | High | Yes (schema change) |
|
||||
| 3 | **Agent presence wiring** — `host` and `last_seen` columns landed (Phase 3 schema, PR #204); `_host` arg still ignored, heartbeat/lease/TTL pending | High | Yes (follow-up PR) |
|
||||
| 5 | **Python polling scripts** — `colibri_poll.py` and `colibri_task_done.py` have zero test coverage | Medium | Yes |
|
||||
| 6 | **TCP bridge round-trip** — socat bridge untested end-to-end | Medium | Partial (needs socat or FreeBSD) |
|
||||
| 7 | **Cross-host coordination** — await a test simulating a remote agent claiming/transitioning a task over the bridge | High | FreeBSD only |
|
||||
|
|
@ -198,14 +199,16 @@ Parse tests added: `parses_claim_task`, `parses_transition_task`,
|
|||
`parses_set_cost_mode`, `rejects_claim_task_missing_flags`,
|
||||
`rejects_transition_task_missing_flags`, `rejects_set_cost_mode_without_arg`.
|
||||
|
||||
### Phase 3: Agent presence schema (deferred)
|
||||
### Phase 3: Agent presence schema (schema landed, logic pending)
|
||||
|
||||
Add `host` and `last_seen` columns to the agents table. Update `register-agent`
|
||||
to accept an optional `host` parameter and update `last_seen` on each call. Add
|
||||
a `heartbeat` socket command for liveness. Enables detecting stale remote agents.
|
||||
|
||||
**Deferred** — requires schema migration and broader design discussion about
|
||||
lease semantics. Not blocking the multi-agent test coverage goal.
|
||||
**Schema landed (PR #204).** `MIGRATIONS` adds `host TEXT` and `last_seen TEXT`
|
||||
columns idempotently. The `_host` arg is accepted but ignored in the handler —
|
||||
agent presence is not functional yet. Heartbeat dispatch, host wiring, and
|
||||
lease/TTL semantics remain open.
|
||||
|
||||
### Phase 4: Polling workflow integration test (deferred)
|
||||
|
||||
|
|
@ -266,9 +269,12 @@ on a *different* host, entirely over the Tailscale bridge — the same routing t
|
|||
|
||||
**Security:** bind to the tailnet interface only and scope the `pf` rule to
|
||||
`tailscale0`. Use placeholder tailnet addresses in any committed notes — never
|
||||
paste real `100.x` IPs into git. (The shipped `colibri_bridge.in` currently
|
||||
hardcodes a real default `listen_addr`; that should be scrubbed to a placeholder
|
||||
or required-via-rc.conf separately.)
|
||||
paste real `100.x` IPs into git. **Done (PR #204):** `colibri_bridge.in`
|
||||
default listen_addr is now `TAILSCALE_IP_REQUIRED` with a prestart guard
|
||||
that fails loud if unconfigured. Linux bridge packaging landed (PR #203 —
|
||||
systemd unit, nft rules, env example). Firewall rules live: pf rule on OSA
|
||||
(port 9190, tailscale0 only), ufw rule on domedog (same). Health/status
|
||||
functions unscrambled (PR #204).
|
||||
|
||||
---
|
||||
|
||||
|
|
@ -282,9 +288,10 @@ or required-via-rc.conf separately.)
|
|||
| 2a | Merge `feat/cli-register-agent` | `colibri.rs` + `lib.rs` | Yes | **Complete** (PR #107) |
|
||||
| 2b | Add `claim-task` + `transition-task` + `set-cost-mode` CLI | `colibri.rs` + `lib.rs` | Yes | **Complete** (PR #138) |
|
||||
| 2c | CLI parse tests | `colibri.rs` tests | Yes | **Complete** (PR #138) |
|
||||
| 3 | Agent presence schema | `schema.rs` + `lib.rs` + `socket.rs` | Yes | Deferred |
|
||||
| 3 | Agent presence schema (WIP) | `schema.rs` + `lib.rs` + `socket.rs` | Yes | Schema in (PR #204); wiring + heartbeat/lease pending |
|
||||
| 4 | Polling workflow test | `tests/` | Yes | Deferred |
|
||||
| 5 | TCP bridge validation | FreeBSD host | No | FreeBSD lane |
|
||||
| — | Bridge packaging (FreeBSD + Linux) | `packaging/freebsd/` + `linux/` | Yes | **Complete** (PR #203, #204) |
|
||||
| — | Firewall rules (pf + ufw) | OSA + domedog | Both | **Live** |
|
||||
|
||||
**Phases 1 + 2 complete.** Next scope: Phase 3 (agent presence schema) or
|
||||
Phase 5 (FreeBSD bridge validation).
|
||||
**Phases 1 + 2 complete. Phase 3 schema in (wiring pending). Phase 5 bridge packaging + firewall live — operational validation next.**
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue