From 4e509c3e37d3ef49b4d032f4fe1c9532df8d8d15 Mon Sep 17 00:00:00 2001 From: Sam & Claude Date: Fri, 26 Jun 2026 01:49:46 +0200 Subject: [PATCH 1/2] docs(plan): refresh MULTI-AGENT-HOST-PLAN for current state MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 3 schema landed (PR #204) — columns exist, wiring pending. Bridge IP scrubbed, health/status unscrambled. Linux packaging added (PR #203). Firewall rules live (pf OSA + ufw domedog). Gap 4 (claim_task atomicity) marked closed. Test count: 256. Accurate status: Phase 3 is schema-in/logic-pending (not 'deferred' and not 'done'). Heartbeat/lease/TTL remain open. --- docs/MULTI-AGENT-HOST-PLAN.md | 41 ++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/docs/MULTI-AGENT-HOST-PLAN.md b/docs/MULTI-AGENT-HOST-PLAN.md index 00a97a6..e4c0f6c 100644 --- a/docs/MULTI-AGENT-HOST-PLAN.md +++ b/docs/MULTI-AGENT-HOST-PLAN.md @@ -1,12 +1,12 @@ # Multi-Agent Multi-Host — Gap Analysis & Implementation Plan **Created:** 19.jun.2026 (Sam & Hermes) -**Updated:** 25.jun.2026 (Sam & Claude) — reflects 0.12.0 release; Phases 1 + 2 complete -**Status:** Phases 1 + 2 complete; Phase 3 (agent presence schema) deferred +**Updated:** 26.jun.2026 (Sam & Claude) — Phase 3 schema landed; bridge packaging + firewall live +**Status:** Phases 1 + 2 complete; Phase 3 schema in, agent-presence logic + heartbeat/lease pending; Phase 5 bridge ready ## Context -Colibri 0.12.0 is released (MIT license, 258 tests, FreeBSD port + CI running). +Colibri 0.12.0 is released (MIT license, 256 tests, FreeBSD port + CI running). The tenant/vault provision chain has landed (`register-tenant` → jail spawn → `provision_tenant_env()` → `colibri-vault::provision`). The next milestone is proving the multi-agent, multi-host coordination model: multiple agents on @@ -35,11 +35,12 @@ The multi-host stack lives **outside the Rust daemon**: - **Transport:** `tokio::net::UnixListener` only — zero TCP in Rust. The socat bridge is a shell-level relay. - **Agent model:** `register-agent` stores name + capabilities + status - (`active`/`idle`/`offline`). Awaiting `host` field, `last_seen`, heartbeat, - and lease/TTL (Phase 3). + (`active`/`idle`/`offline`). `host` and `last_seen` columns landed + (Phase 3 schema, PR #204); the `_host` arg is still ignored in the handler + — wiring + heartbeat/lease/TTL pending. - **Task assignment:** `pick_agent()` matches by capability score (partial - match counts, highest score wins, tie → later-in-slice). `claim_task()` is a - blind UPDATE; await a concurrency guard (Gap 4). + match counts, highest score wins, tie → later-in-slice). `claim_task()` is + atomic (gated on `status = 'queued'`); Gap 4 closed (PR #190). - **Polling:** `colibri_poll.py` queries `list-tasks status=started` filtered by `agent_id`. `colibri_task_done.py` calls `transition-task`. - **Spawning:** `poll_tasks()` in daemon.rs spawns agents for `Claimed` tasks, @@ -84,7 +85,7 @@ and `set-cost-mode` were added in Phase 2b (PR #138). | # | Gap | Severity | Linux-doable? | | --- | ------------------------------------------------------------------------------------------------------------------------------------------------- | -------- | -------------------------------- | -| 3 | **Agent presence model** — await `host`, `last_seen`, and heartbeat/lease columns to detect stale remote agents (Phase 3) | High | Yes (schema change) | +| 3 | **Agent presence wiring** — `host` and `last_seen` columns landed (Phase 3 schema, PR #204); `_host` arg still ignored, heartbeat/lease/TTL pending | High | Yes (follow-up PR) | | 5 | **Python polling scripts** — `colibri_poll.py` and `colibri_task_done.py` have zero test coverage | Medium | Yes | | 6 | **TCP bridge round-trip** — socat bridge untested end-to-end | Medium | Partial (needs socat or FreeBSD) | | 7 | **Cross-host coordination** — await a test simulating a remote agent claiming/transitioning a task over the bridge | High | FreeBSD only | @@ -198,14 +199,16 @@ Parse tests added: `parses_claim_task`, `parses_transition_task`, `parses_set_cost_mode`, `rejects_claim_task_missing_flags`, `rejects_transition_task_missing_flags`, `rejects_set_cost_mode_without_arg`. -### Phase 3: Agent presence schema (deferred) +### Phase 3: Agent presence schema (schema landed, logic pending) Add `host` and `last_seen` columns to the agents table. Update `register-agent` to accept an optional `host` parameter and update `last_seen` on each call. Add a `heartbeat` socket command for liveness. Enables detecting stale remote agents. -**Deferred** — requires schema migration and broader design discussion about -lease semantics. Not blocking the multi-agent test coverage goal. +**Schema landed (PR #204).** `MIGRATIONS` adds `host TEXT` and `last_seen TEXT` +columns idempotently. The `_host` arg is accepted but ignored in the handler — +agent presence is not functional yet. Heartbeat dispatch, host wiring, and +lease/TTL semantics remain open. ### Phase 4: Polling workflow integration test (deferred) @@ -266,9 +269,12 @@ on a *different* host, entirely over the Tailscale bridge — the same routing t **Security:** bind to the tailnet interface only and scope the `pf` rule to `tailscale0`. Use placeholder tailnet addresses in any committed notes — never -paste real `100.x` IPs into git. (The shipped `colibri_bridge.in` currently -hardcodes a real default `listen_addr`; that should be scrubbed to a placeholder -or required-via-rc.conf separately.) +paste real `100.x` IPs into git. **Done (PR #204):** `colibri_bridge.in` +default listen_addr is now `TAILSCALE_IP_REQUIRED` with a prestart guard +that fails loud if unconfigured. Linux bridge packaging landed (PR #203 — +systemd unit, nft rules, env example). Firewall rules live: pf rule on OSA +(port 9190, tailscale0 only), ufw rule on domedog (same). Health/status +functions unscrambled (PR #204). --- @@ -282,9 +288,10 @@ or required-via-rc.conf separately.) | 2a | Merge `feat/cli-register-agent` | `colibri.rs` + `lib.rs` | Yes | **Complete** (PR #107) | | 2b | Add `claim-task` + `transition-task` + `set-cost-mode` CLI | `colibri.rs` + `lib.rs` | Yes | **Complete** (PR #138) | | 2c | CLI parse tests | `colibri.rs` tests | Yes | **Complete** (PR #138) | -| 3 | Agent presence schema | `schema.rs` + `lib.rs` + `socket.rs` | Yes | Deferred | +| 3 | Agent presence schema (WIP) | `schema.rs` + `lib.rs` + `socket.rs` | Yes | Schema in (PR #204); wiring + heartbeat/lease pending | | 4 | Polling workflow test | `tests/` | Yes | Deferred | | 5 | TCP bridge validation | FreeBSD host | No | FreeBSD lane | +| — | Bridge packaging (FreeBSD + Linux) | `packaging/freebsd/` + `linux/` | Yes | **Complete** (PR #203, #204) | +| — | Firewall rules (pf + ufw) | OSA + domedog | Both | **Live** | -**Phases 1 + 2 complete.** Next scope: Phase 3 (agent presence schema) or -Phase 5 (FreeBSD bridge validation). +**Phases 1 + 2 complete. Phase 3 schema in (wiring pending). Phase 5 bridge packaging + firewall live — operational validation next.** -- 2.45.3 From f7d29b6257cab1a2d67632d695fdef8eefdfd3f9 Mon Sep 17 00:00:00 2001 From: Sam & Claude Date: Fri, 26 Jun 2026 02:00:26 +0200 Subject: [PATCH 2/2] =?UTF-8?q?feat(store):=20wire=20Phase=203=20agent=20p?= =?UTF-8?q?resence=20=E2=80=94=20host,=20heartbeat,=20last=5Fseen?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Agent struct now carries host (where the agent registered from) and last_seen (last heartbeat/re-register timestamp). The SQL columns existed since the Phase 3 schema commit but were never read or written — Rust ignored them. Changes: - Agent struct: +host: Option, +last_seen: Option - register_agent: un-underscore host arg, INSERT/read it - get_agent / list_agents: SELECT host + last_seen columns - Store::heartbeat(): UPDATE last_seen + COALESCE host - ColibriCommand: +Heartbeat variant, +host on RegisterAgent - cmd_heartbeat: socket dispatch → store heartbeat - run_migrations: actually execute MIGRATIONS (was defined but never run) - All test Agent constructions updated What's still open per the plan: lease/TTL semantics, daemon heartbeat tick calling the store, offline detection for stale agents. Those are follow-up slices — the schema + wiring foundation is in. --- crates/colibri-client/src/lib.rs | 1 + crates/colibri-daemon/src/lib.rs | 10 ++++++ crates/colibri-daemon/src/scheduler.rs | 20 +++++++++++ crates/colibri-daemon/src/socket.rs | 22 +++++++++--- crates/colibri-store/src/lib.rs | 46 ++++++++++++++++++++++---- crates/colibri-store/src/schema.rs | 1 - 6 files changed, 88 insertions(+), 12 deletions(-) diff --git a/crates/colibri-client/src/lib.rs b/crates/colibri-client/src/lib.rs index 9ae05b8..5bbea2e 100644 --- a/crates/colibri-client/src/lib.rs +++ b/crates/colibri-client/src/lib.rs @@ -240,6 +240,7 @@ impl DaemonClient { self.request(&ColibriCommand::RegisterAgent { name: name.into(), capabilities: Some(serde_json::to_value(capabilities).unwrap_or_default()), + host: None, }) .await } diff --git a/crates/colibri-daemon/src/lib.rs b/crates/colibri-daemon/src/lib.rs index e8bce0e..b6fc8ac 100644 --- a/crates/colibri-daemon/src/lib.rs +++ b/crates/colibri-daemon/src/lib.rs @@ -75,6 +75,16 @@ pub enum ColibriCommand { RegisterAgent { name: String, capabilities: Option, + /// Host the agent is registering from (e.g. "osa", "domedog"). Phase 3. + #[serde(default)] + host: Option, + }, + #[serde(rename = "heartbeat")] + Heartbeat { + agent_id: String, + /// Optional host update if the agent moved machines. Phase 3. + #[serde(default)] + host: Option, }, #[serde(rename = "list-skills")] ListSkills, diff --git a/crates/colibri-daemon/src/scheduler.rs b/crates/colibri-daemon/src/scheduler.rs index 7b39d9a..7364a2b 100644 --- a/crates/colibri-daemon/src/scheduler.rs +++ b/crates/colibri-daemon/src/scheduler.rs @@ -482,6 +482,8 @@ mod tests { capabilities: serde_json::json!(["rust"]), status: "idle".into(), created_at: "2026-01-01T00:00:00Z".into(), + host: None, + last_seen: None, }, colibri_store::Agent { id: "a2".into(), @@ -489,6 +491,8 @@ mod tests { capabilities: serde_json::json!(["python"]), status: "idle".into(), created_at: "2026-01-01T00:00:00Z".into(), + host: None, + last_seen: None, }, ]; let picked = pick_agent(&required, &agents); @@ -505,6 +509,8 @@ mod tests { capabilities: serde_json::json!(["rust"]), status: "offline".into(), created_at: "2026-01-01T00:00:00Z".into(), + host: None, + last_seen: None, }]; assert!(pick_agent(&required, &agents).is_none()); } @@ -518,6 +524,8 @@ mod tests { capabilities: serde_json::json!(["linux"]), status: "idle".into(), created_at: "2026-01-01T00:00:00Z".into(), + host: None, + last_seen: None, }]; assert!(pick_agent(&required, &agents).is_none()); } @@ -530,6 +538,8 @@ mod tests { capabilities: serde_json::json!([]), status: "idle".into(), created_at: "2026-01-01T00:00:00Z".into(), + host: None, + last_seen: None, }]; assert_eq!(pick_agent(&[], &agents).unwrap().name, "generalist"); } @@ -544,6 +554,8 @@ mod tests { capabilities: serde_json::json!(["rust"]), status: "idle".into(), created_at: "2026-01-01T00:00:00Z".into(), + host: None, + last_seen: None, }, colibri_store::Agent { id: "a2".into(), @@ -551,6 +563,8 @@ mod tests { capabilities: serde_json::json!(["rust"]), status: "idle".into(), created_at: "2026-01-02T00:00:00Z".into(), + host: None, + last_seen: None, }, ]; let picked = pick_agent(&required, &agents).unwrap(); @@ -570,6 +584,8 @@ mod tests { capabilities: serde_json::json!(["rust"]), status: "idle".into(), created_at: "2026-01-01T00:00:00Z".into(), + host: None, + last_seen: None, }, colibri_store::Agent { id: "a2".into(), @@ -577,6 +593,8 @@ mod tests { capabilities: serde_json::json!(["rust", "freebsd"]), status: "idle".into(), created_at: "2026-01-01T00:00:00Z".into(), + host: None, + last_seen: None, }, ]; let picked = pick_agent(&required, &agents).unwrap(); @@ -595,6 +613,8 @@ mod tests { capabilities: serde_json::json!(["rust"]), status: "active".into(), created_at: "2026-01-01T00:00:00Z".into(), + host: None, + last_seen: None, }]; let picked = pick_agent(&required, &agents); assert!( diff --git a/crates/colibri-daemon/src/socket.rs b/crates/colibri-daemon/src/socket.rs index 7ad3d33..2284f4b 100644 --- a/crates/colibri-daemon/src/socket.rs +++ b/crates/colibri-daemon/src/socket.rs @@ -264,8 +264,11 @@ async fn dispatch(cmd: ColibriCommand, state: &SharedState) -> ColibriResponse { cmd_claim_task(state, task_id, agent_id).await } ColibriCommand::ListAgents => cmd_list_agents(state).await, - ColibriCommand::RegisterAgent { name, capabilities } => { - cmd_register_agent(state, name, capabilities, None).await + ColibriCommand::RegisterAgent { name, capabilities, host } => { + cmd_register_agent(state, name, capabilities, host.as_deref()).await + } + ColibriCommand::Heartbeat { agent_id, host } => { + cmd_heartbeat(state, agent_id, host.as_deref()).await } ColibriCommand::ListSkills => cmd_list_skills(state).await, ColibriCommand::RegisterSkill { @@ -1000,20 +1003,31 @@ async fn cmd_register_agent( state: &SharedState, name: String, capabilities: Option, - _host: Option<&str>, + host: Option<&str>, ) -> ColibriResponse { let caps = capabilities.unwrap_or(serde_json::json!([])); match state .store .lock() .unwrap() - .register_agent(&name, caps, None) + .register_agent(&name, caps, host) { Ok(agent) => ColibriResponse::ok(serde_json::to_value(agent).unwrap_or_default()), Err(e) => ColibriResponse::err(format!("register agent failed: {e}")), } } +async fn cmd_heartbeat( + state: &SharedState, + agent_id: String, + host: Option<&str>, +) -> ColibriResponse { + match state.store.lock().unwrap().heartbeat(&agent_id, host) { + Ok(()) => ColibriResponse::ok(serde_json::json!({"ok": true})), + Err(e) => ColibriResponse::err(format!("heartbeat failed: {e}")), + } +} + async fn cmd_register_tenant( state: &SharedState, tenant_id: String, diff --git a/crates/colibri-store/src/lib.rs b/crates/colibri-store/src/lib.rs index a0e074c..ea0fd87 100644 --- a/crates/colibri-store/src/lib.rs +++ b/crates/colibri-store/src/lib.rs @@ -85,6 +85,10 @@ pub struct Agent { /// Agent status: `active`, `idle`, `offline`. pub status: String, pub created_at: String, + /// Host the agent registered from (e.g. "osa", "domedog"). Added Phase 3. + pub host: Option, + /// Last time this agent checked in (heartbeat or re-register). Phase 3. + pub last_seen: Option, } /// A team-wide skill entry. @@ -339,16 +343,16 @@ impl Store { &self, name: &str, capabilities: serde_json::Value, - _host: Option<&str>, // Phase 3: agent presence — host registry (WIP) + host: Option<&str>, ) -> Result { let id = uuid::Uuid::new_v4().to_string(); let now = Utc::now().to_rfc3339(); let caps_json = serde_json::to_string(&capabilities)?; self.conn.execute( - "INSERT INTO agents (id, name, capabilities, status, created_at) - VALUES (?1, ?2, ?3, 'active', ?4)", - params![id, name, caps_json, now], + "INSERT INTO agents (id, name, capabilities, status, created_at, host, last_seen) + VALUES (?1, ?2, ?3, 'active', ?4, ?5, ?6)", + params![id, name, caps_json, now, host, now], )?; Ok(Agent { @@ -356,14 +360,32 @@ impl Store { name: name.to_string(), capabilities, status: "active".to_string(), - created_at: now, + created_at: now.clone(), + host: host.map(String::from), + last_seen: Some(now), }) } + /// Update an agent's last_seen timestamp (heartbeat). Optionally update + /// the host if the agent is re-registering from a different machine. + pub fn heartbeat(&self, agent_id: &str, host: Option<&str>) -> Result<()> { + let now = Utc::now().to_rfc3339(); + let rows = self.conn.execute( + "UPDATE agents SET last_seen = ?1, host = COALESCE(?2, host) WHERE id = ?3", + params![now, host, agent_id], + )?; + if rows == 0 { + return Err(StoreError::NotFound(format!( + "agent not found: {agent_id}" + ))); + } + Ok(()) + } + /// Get an agent by ID. pub fn get_agent(&self, agent_id: &str) -> Result> { let mut stmt = self.conn.prepare( - "SELECT id, name, capabilities, status, created_at FROM agents WHERE id = ?1", + "SELECT id, name, capabilities, status, created_at, host, last_seen FROM agents WHERE id = ?1", )?; let mut rows = stmt.query_map(params![agent_id], |row| { @@ -374,6 +396,8 @@ impl Store { capabilities: serde_json::from_str(&caps_str).unwrap_or(serde_json::Value::Null), status: row.get(3)?, created_at: row.get(4)?, + host: row.get(5)?, + last_seen: row.get(6)?, }) })?; @@ -387,7 +411,7 @@ impl Store { /// List all registered agents. pub fn list_agents(&self) -> Result> { let mut stmt = self.conn.prepare( - "SELECT id, name, capabilities, status, created_at FROM agents ORDER BY name", + "SELECT id, name, capabilities, status, created_at, host, last_seen FROM agents ORDER BY name", )?; let rows = stmt.query_map([], |row| { let caps_str: String = row.get(2)?; @@ -397,6 +421,8 @@ impl Store { capabilities: serde_json::from_str(&caps_str).unwrap_or(serde_json::Value::Null), status: row.get(3)?, created_at: row.get(4)?, + host: row.get(5)?, + last_seen: row.get(6)?, }) })?; @@ -587,6 +613,12 @@ impl Store { fn run_migrations(&self) -> Result<()> { self.conn.execute_batch(schema::SCHEMA_SQL)?; + for migration in schema::MIGRATIONS { + // Each migration runs in a try-block: if the column already + // exists (re-open), SQLite returns "duplicate column" which we + // swallow — the store stays idempotent across restarts. + let _ = self.conn.execute_batch(migration); + } Ok(()) } } diff --git a/crates/colibri-store/src/schema.rs b/crates/colibri-store/src/schema.rs index 438b300..4f96cc0 100644 --- a/crates/colibri-store/src/schema.rs +++ b/crates/colibri-store/src/schema.rs @@ -62,7 +62,6 @@ CREATE INDEX IF NOT EXISTS idx_tenants_status ON tenants(status); /// Column additions since the initial schema. Each runs inside a try-block /// so the store stays idempotent: adding a column that already exists is a -#[allow(dead_code)] // Phase 3 WIP — schema not yet wired pub const MIGRATIONS: &[&str] = &[ "ALTER TABLE agents ADD COLUMN host TEXT", "ALTER TABLE agents ADD COLUMN last_seen TEXT", -- 2.45.3