diff --git a/crates/colibri-client/src/lib.rs b/crates/colibri-client/src/lib.rs index 9ae05b8..5bbea2e 100644 --- a/crates/colibri-client/src/lib.rs +++ b/crates/colibri-client/src/lib.rs @@ -240,6 +240,7 @@ impl DaemonClient { self.request(&ColibriCommand::RegisterAgent { name: name.into(), capabilities: Some(serde_json::to_value(capabilities).unwrap_or_default()), + host: None, }) .await } diff --git a/crates/colibri-daemon/src/lib.rs b/crates/colibri-daemon/src/lib.rs index e8bce0e..b6fc8ac 100644 --- a/crates/colibri-daemon/src/lib.rs +++ b/crates/colibri-daemon/src/lib.rs @@ -75,6 +75,16 @@ pub enum ColibriCommand { RegisterAgent { name: String, capabilities: Option, + /// Host the agent is registering from (e.g. "osa", "domedog"). Phase 3. + #[serde(default)] + host: Option, + }, + #[serde(rename = "heartbeat")] + Heartbeat { + agent_id: String, + /// Optional host update if the agent moved machines. Phase 3. + #[serde(default)] + host: Option, }, #[serde(rename = "list-skills")] ListSkills, diff --git a/crates/colibri-daemon/src/scheduler.rs b/crates/colibri-daemon/src/scheduler.rs index 7b39d9a..7364a2b 100644 --- a/crates/colibri-daemon/src/scheduler.rs +++ b/crates/colibri-daemon/src/scheduler.rs @@ -482,6 +482,8 @@ mod tests { capabilities: serde_json::json!(["rust"]), status: "idle".into(), created_at: "2026-01-01T00:00:00Z".into(), + host: None, + last_seen: None, }, colibri_store::Agent { id: "a2".into(), @@ -489,6 +491,8 @@ mod tests { capabilities: serde_json::json!(["python"]), status: "idle".into(), created_at: "2026-01-01T00:00:00Z".into(), + host: None, + last_seen: None, }, ]; let picked = pick_agent(&required, &agents); @@ -505,6 +509,8 @@ mod tests { capabilities: serde_json::json!(["rust"]), status: "offline".into(), created_at: "2026-01-01T00:00:00Z".into(), + host: None, + last_seen: None, }]; assert!(pick_agent(&required, &agents).is_none()); } @@ -518,6 +524,8 @@ mod tests { capabilities: serde_json::json!(["linux"]), status: "idle".into(), created_at: "2026-01-01T00:00:00Z".into(), + host: None, + last_seen: None, }]; assert!(pick_agent(&required, &agents).is_none()); } @@ -530,6 +538,8 @@ mod tests { capabilities: serde_json::json!([]), status: "idle".into(), created_at: "2026-01-01T00:00:00Z".into(), + host: None, + last_seen: None, }]; assert_eq!(pick_agent(&[], &agents).unwrap().name, "generalist"); } @@ -544,6 +554,8 @@ mod tests { capabilities: serde_json::json!(["rust"]), status: "idle".into(), created_at: "2026-01-01T00:00:00Z".into(), + host: None, + last_seen: None, }, colibri_store::Agent { id: "a2".into(), @@ -551,6 +563,8 @@ mod tests { capabilities: serde_json::json!(["rust"]), status: "idle".into(), created_at: "2026-01-02T00:00:00Z".into(), + host: None, + last_seen: None, }, ]; let picked = pick_agent(&required, &agents).unwrap(); @@ -570,6 +584,8 @@ mod tests { capabilities: serde_json::json!(["rust"]), status: "idle".into(), created_at: "2026-01-01T00:00:00Z".into(), + host: None, + last_seen: None, }, colibri_store::Agent { id: "a2".into(), @@ -577,6 +593,8 @@ mod tests { capabilities: serde_json::json!(["rust", "freebsd"]), status: "idle".into(), created_at: "2026-01-01T00:00:00Z".into(), + host: None, + last_seen: None, }, ]; let picked = pick_agent(&required, &agents).unwrap(); @@ -595,6 +613,8 @@ mod tests { capabilities: serde_json::json!(["rust"]), status: "active".into(), created_at: "2026-01-01T00:00:00Z".into(), + host: None, + last_seen: None, }]; let picked = pick_agent(&required, &agents); assert!( diff --git a/crates/colibri-daemon/src/socket.rs b/crates/colibri-daemon/src/socket.rs index 7ad3d33..2284f4b 100644 --- a/crates/colibri-daemon/src/socket.rs +++ b/crates/colibri-daemon/src/socket.rs @@ -264,8 +264,11 @@ async fn dispatch(cmd: ColibriCommand, state: &SharedState) -> ColibriResponse { cmd_claim_task(state, task_id, agent_id).await } ColibriCommand::ListAgents => cmd_list_agents(state).await, - ColibriCommand::RegisterAgent { name, capabilities } => { - cmd_register_agent(state, name, capabilities, None).await + ColibriCommand::RegisterAgent { name, capabilities, host } => { + cmd_register_agent(state, name, capabilities, host.as_deref()).await + } + ColibriCommand::Heartbeat { agent_id, host } => { + cmd_heartbeat(state, agent_id, host.as_deref()).await } ColibriCommand::ListSkills => cmd_list_skills(state).await, ColibriCommand::RegisterSkill { @@ -1000,20 +1003,31 @@ async fn cmd_register_agent( state: &SharedState, name: String, capabilities: Option, - _host: Option<&str>, + host: Option<&str>, ) -> ColibriResponse { let caps = capabilities.unwrap_or(serde_json::json!([])); match state .store .lock() .unwrap() - .register_agent(&name, caps, None) + .register_agent(&name, caps, host) { Ok(agent) => ColibriResponse::ok(serde_json::to_value(agent).unwrap_or_default()), Err(e) => ColibriResponse::err(format!("register agent failed: {e}")), } } +async fn cmd_heartbeat( + state: &SharedState, + agent_id: String, + host: Option<&str>, +) -> ColibriResponse { + match state.store.lock().unwrap().heartbeat(&agent_id, host) { + Ok(()) => ColibriResponse::ok(serde_json::json!({"ok": true})), + Err(e) => ColibriResponse::err(format!("heartbeat failed: {e}")), + } +} + async fn cmd_register_tenant( state: &SharedState, tenant_id: String, diff --git a/crates/colibri-store/src/lib.rs b/crates/colibri-store/src/lib.rs index a0e074c..ea0fd87 100644 --- a/crates/colibri-store/src/lib.rs +++ b/crates/colibri-store/src/lib.rs @@ -85,6 +85,10 @@ pub struct Agent { /// Agent status: `active`, `idle`, `offline`. pub status: String, pub created_at: String, + /// Host the agent registered from (e.g. "osa", "domedog"). Added Phase 3. + pub host: Option, + /// Last time this agent checked in (heartbeat or re-register). Phase 3. + pub last_seen: Option, } /// A team-wide skill entry. @@ -339,16 +343,16 @@ impl Store { &self, name: &str, capabilities: serde_json::Value, - _host: Option<&str>, // Phase 3: agent presence — host registry (WIP) + host: Option<&str>, ) -> Result { let id = uuid::Uuid::new_v4().to_string(); let now = Utc::now().to_rfc3339(); let caps_json = serde_json::to_string(&capabilities)?; self.conn.execute( - "INSERT INTO agents (id, name, capabilities, status, created_at) - VALUES (?1, ?2, ?3, 'active', ?4)", - params![id, name, caps_json, now], + "INSERT INTO agents (id, name, capabilities, status, created_at, host, last_seen) + VALUES (?1, ?2, ?3, 'active', ?4, ?5, ?6)", + params![id, name, caps_json, now, host, now], )?; Ok(Agent { @@ -356,14 +360,32 @@ impl Store { name: name.to_string(), capabilities, status: "active".to_string(), - created_at: now, + created_at: now.clone(), + host: host.map(String::from), + last_seen: Some(now), }) } + /// Update an agent's last_seen timestamp (heartbeat). Optionally update + /// the host if the agent is re-registering from a different machine. + pub fn heartbeat(&self, agent_id: &str, host: Option<&str>) -> Result<()> { + let now = Utc::now().to_rfc3339(); + let rows = self.conn.execute( + "UPDATE agents SET last_seen = ?1, host = COALESCE(?2, host) WHERE id = ?3", + params![now, host, agent_id], + )?; + if rows == 0 { + return Err(StoreError::NotFound(format!( + "agent not found: {agent_id}" + ))); + } + Ok(()) + } + /// Get an agent by ID. pub fn get_agent(&self, agent_id: &str) -> Result> { let mut stmt = self.conn.prepare( - "SELECT id, name, capabilities, status, created_at FROM agents WHERE id = ?1", + "SELECT id, name, capabilities, status, created_at, host, last_seen FROM agents WHERE id = ?1", )?; let mut rows = stmt.query_map(params![agent_id], |row| { @@ -374,6 +396,8 @@ impl Store { capabilities: serde_json::from_str(&caps_str).unwrap_or(serde_json::Value::Null), status: row.get(3)?, created_at: row.get(4)?, + host: row.get(5)?, + last_seen: row.get(6)?, }) })?; @@ -387,7 +411,7 @@ impl Store { /// List all registered agents. pub fn list_agents(&self) -> Result> { let mut stmt = self.conn.prepare( - "SELECT id, name, capabilities, status, created_at FROM agents ORDER BY name", + "SELECT id, name, capabilities, status, created_at, host, last_seen FROM agents ORDER BY name", )?; let rows = stmt.query_map([], |row| { let caps_str: String = row.get(2)?; @@ -397,6 +421,8 @@ impl Store { capabilities: serde_json::from_str(&caps_str).unwrap_or(serde_json::Value::Null), status: row.get(3)?, created_at: row.get(4)?, + host: row.get(5)?, + last_seen: row.get(6)?, }) })?; @@ -587,6 +613,12 @@ impl Store { fn run_migrations(&self) -> Result<()> { self.conn.execute_batch(schema::SCHEMA_SQL)?; + for migration in schema::MIGRATIONS { + // Each migration runs in a try-block: if the column already + // exists (re-open), SQLite returns "duplicate column" which we + // swallow — the store stays idempotent across restarts. + let _ = self.conn.execute_batch(migration); + } Ok(()) } } diff --git a/crates/colibri-store/src/schema.rs b/crates/colibri-store/src/schema.rs index 438b300..4f96cc0 100644 --- a/crates/colibri-store/src/schema.rs +++ b/crates/colibri-store/src/schema.rs @@ -62,7 +62,6 @@ CREATE INDEX IF NOT EXISTS idx_tenants_status ON tenants(status); /// Column additions since the initial schema. Each runs inside a try-block /// so the store stays idempotent: adding a column that already exists is a -#[allow(dead_code)] // Phase 3 WIP — schema not yet wired pub const MIGRATIONS: &[&str] = &[ "ALTER TABLE agents ADD COLUMN host TEXT", "ALTER TABLE agents ADD COLUMN last_seen TEXT",