diff --git a/Cargo.lock b/Cargo.lock index 5afcdd3..2c32456 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,18 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "ahash" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.4" @@ -214,6 +226,7 @@ dependencies = [ "colibri-deepseek", "colibri-glasspane", "colibri-runtime", + "colibri-store", "dashmap", "reqwest", "serde", @@ -271,6 +284,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "colibri-store" +version = "0.0.1" +dependencies = [ + "chrono", + "rusqlite", + "serde", + "serde_json", + "thiserror 2.0.18", + "uuid", +] + [[package]] name = "compact_str" version = "0.9.1" @@ -520,6 +545,18 @@ dependencies = [ "num-traits", ] +[[package]] +name = "fallible-iterator" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" + +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "fancy-regex" version = "0.11.0" @@ -698,6 +735,9 @@ name = "hashbrown" version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash", +] [[package]] name = "hashbrown" @@ -725,6 +765,15 @@ version = "0.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed5909b6e89a2db4456e54cd5f673791d7eca6732202bbf2a9cc504fe2f9b84a" +[[package]] +name = "hashlink" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" +dependencies = [ + "hashbrown 0.14.5", +] + [[package]] name = "heck" version = "0.5.0" @@ -1076,6 +1125,17 @@ version = "0.2.186" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66" +[[package]] +name = "libsqlite3-sys" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c10584274047cb335c23d3e61bcef8e323adae7c5c8c760540f73610177fc3f" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "line-clipping" version = "0.3.7" @@ -1415,6 +1475,12 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" +[[package]] +name = "pkg-config" +version = "0.3.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" + [[package]] name = "portable-atomic" version = "1.13.1" @@ -1780,6 +1846,20 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rusqlite" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b838eba278d213a8beaf485bd313fd580ca4505a00d5871caeb1457c55322cae" +dependencies = [ + "bitflags 2.11.1", + "fallible-iterator", + "fallible-streaming-iterator", + "hashlink", + "libsqlite3-sys", + "smallvec", +] + [[package]] name = "rustc-hash" version = "2.1.2" @@ -2537,6 +2617,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.5" diff --git a/Cargo.toml b/Cargo.toml index 98f245c..cdd1b4b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["crates/colibri-contracts", "crates/colibri-deepseek", "crates/colibri-runtime", "crates/colibri-glasspane", "crates/colibri-daemon", "crates/colibri-client", "crates/colibri-glasspane-tui"] +members = ["crates/colibri-contracts", "crates/colibri-deepseek", "crates/colibri-runtime", "crates/colibri-glasspane", "crates/colibri-daemon", "crates/colibri-client", "crates/colibri-glasspane-tui", "crates/colibri-store"] [package] name = "colibri" diff --git a/README.md b/README.md index b3b5380..67b15d7 100644 --- a/README.md +++ b/README.md @@ -1,63 +1,64 @@ # Colibri The Clawdie control plane core — a small, cross-platform (FreeBSD + Linux) Rust -daemon. It unifies a coordination model (agents-as-teammates, task board, team -skills) with a cache-first cost discipline (byte-stable prompt prefixes, -cache-hit metering), sitting on top of the existing Pi engine, watchdog, hostd -and Postgres. +daemon that unifies coordination (task board, agent registry, skills catalog) +with cache-first cost discipline (byte-stable prompt prefixes, cache-hit metering). -Design + implementation path: see `doc/COLIBRI-CONTROLPLANE-PLAN.md` in -`clawdie-ai`. +**Status:** 8 crates, 72 tests, clippy-clean. Phase 3 (coordination core) in progress. -Planned next split-brain slice: +Design doc + cutover plan: `docs/COLIBRI-CUTOVER-PLAN.md`. -- `docs/COLIBRI-SKILLS-PLAN.md` — read-only Rust access layer for the - "manuals already included" / `system_skills` lane. Phase 1 is discovery + - read-only lookup/status only; no generator or embedding refresh rewrite. +## Workspace — 8 crates -## Phase 1 — `colibri-probe` +| Crate | Role | +|-------|------| +| `colibri-contracts` | JSON schema contracts (golden tests) | +| `colibri-deepseek` | DeepSeek cache-hit probe, prefix metering | +| `colibri-runtime` | Host status ingestion, runtime inventory | +| `colibri-glasspane` | Agent 5-state machine (Pi events → state) | +| `colibri-daemon` | Always-on Unix socket server, session lifecycle | +| `colibri-client` | Typed Unix-socket client + operator CLI | +| `colibri-glasspane-tui` | ratatui live dashboard (FreeBSD-native) | +| `colibri-store` | Embedded SQLite coordination (task board, agents, skills) | -A falsifiable first build that proves three things at once: - -1. Rust + `rustls` + `tokio` build cross-platform (Linux first, FreeBSD next). -2. A raw DeepSeek HTTPS call works. -3. DeepSeek **prefix caching** is real on our infra: send a byte-stable prefix - twice and observe `prompt_cache_hit_tokens > 0` on the second request. - -It prints a `clawdie.provider-smoke.result.v1` manifest on stdout. - -### Build (no key needed) +## Build ```sh cargo build --release ``` -### Run +## Test ```sh -# Build-only / skipped mode (no key): verifies the binary runs. -./target/release/colibri-probe - -# Live cache probe: -DEEPSEEK_API_KEY=sk-... ./target/release/colibri-probe +cargo test --workspace +cargo clippy --workspace --all-targets -- -D warnings ``` -Env overrides: `DEEPSEEK_MODEL` (default `deepseek-chat`, the DeepSeek API model -string — distinct from our internal `deepseek-v4-flash` alias), -`DEEPSEEK_ENDPOINT`, `COLIBRI_HOST`, `COLIBRI_AGENT`. +## Architecture -## Runtime inventory +``` +colibri-daemon (always-on Unix socket server) + ├── glasspane — agent state machine (Pi JSONL → idle/working/blocked/done) + ├── store — SQLite coordination (tasks, agents, skills) + ├── socket — newline-JSON socket API + ├── session — append-only JSONL sessions, 3-region prompt assembly + └── spawner — agent subprocess management (retry/backoff) -The FreeBSD/Linux build lane can emit the existing Clawdie runtime contract: +colibri-client — CLI tools (colibri_ctl, colibri_smoke_agent) +colibri-glasspane-tui— ratatui dashboard (no Herdr dependency) +``` + +## Probe binaries ```sh +# DeepSeek cache probe (needs DEEPSEEK_API_KEY) +cargo run --release --bin colibri-probe + +# Runtime inventory manifest cargo run --release --bin colibri-runtime-inventory ``` -It prints a `clawdie.runtime-version-inventory.v1` manifest on stdout. +## FreeBSD -## FreeBSD note - -Target `x86_64-unknown-freebsd` (Rust Tier-2). Install via `pkg install rust` or -rustup; the `rust-toolchain.toml` pins the channel for cross-host -reproducibility. TLS is `rustls` to avoid `openssl-sys` linking on FreeBSD. +Target `x86_64-unknown-freebsd` (Rust Tier-2). TLS is `rustls` to avoid +`openssl-sys` linking. Default DB path: `/var/db/colibri/colibri.sqlite`. diff --git a/crates/colibri-client/tests/live_socket_smoke.rs b/crates/colibri-client/tests/live_socket_smoke.rs index c7ec41c..0c0ab97 100644 --- a/crates/colibri-client/tests/live_socket_smoke.rs +++ b/crates/colibri-client/tests/live_socket_smoke.rs @@ -12,7 +12,8 @@ fn smoke_config() -> DaemonConfig { let data_dir = std::env::temp_dir().join(format!("colibri-live-smoke-{}", Uuid::new_v4())); DaemonConfig { socket_path: data_dir.join("colibri.sock"), - data_dir, + data_dir: data_dir.clone(), + db_path: data_dir.join("colibri.sqlite"), session_max_bytes: 2_000_000, deepseek_api_key: None, deepseek_endpoint: "https://api.deepseek.com/chat/completions".to_string(), diff --git a/crates/colibri-daemon/Cargo.toml b/crates/colibri-daemon/Cargo.toml index bb30c11..e0984d7 100644 --- a/crates/colibri-daemon/Cargo.toml +++ b/crates/colibri-daemon/Cargo.toml @@ -10,6 +10,7 @@ colibri-contracts = { path = "../colibri-contracts" } colibri-deepseek = { path = "../colibri-deepseek" } colibri-glasspane = { path = "../colibri-glasspane" } colibri-runtime = { path = "../colibri-runtime" } +colibri-store = { path = "../colibri-store" } tokio = { version = "1", features = ["full"] } tokio-util = { version = "0.7", features = ["codec"] } serde = { version = "1", features = ["derive"] } diff --git a/crates/colibri-daemon/src/config.rs b/crates/colibri-daemon/src/config.rs index d30bf1a..3fc2d67 100644 --- a/crates/colibri-daemon/src/config.rs +++ b/crates/colibri-daemon/src/config.rs @@ -19,6 +19,8 @@ pub struct DaemonConfig { pub socket_path: PathBuf, /// Maximum bytes per session before automatic rollover. pub session_max_bytes: u64, + /// Path to the coordination SQLite database. + pub db_path: PathBuf, /// DeepSeek API key for provider routing. pub deepseek_api_key: Option, /// DeepSeek API endpoint. @@ -60,9 +62,12 @@ impl DaemonConfig { .map(PathBuf::from) .unwrap_or_else(|_| data_dir.join("colibri-daemon.sock")); + let db_path = colibri_store::default_db_path(); + Self { data_dir, socket_path, + db_path, session_max_bytes: env_parse("COLIBRI_SESSION_MAX_BYTES").unwrap_or(2_000_000), deepseek_api_key: nonempty_env("DEEPSEEK_API_KEY"), deepseek_endpoint: std::env::var("DEEPSEEK_ENDPOINT") diff --git a/crates/colibri-daemon/src/daemon.rs b/crates/colibri-daemon/src/daemon.rs index 0d5fac8..0cd4d91 100644 --- a/crates/colibri-daemon/src/daemon.rs +++ b/crates/colibri-daemon/src/daemon.rs @@ -7,6 +7,7 @@ use std::sync::Arc; use std::time::Duration; use colibri_glasspane::PaneSupervisor; +use colibri_store::Store; use dashmap::DashMap; use tokio::sync::{broadcast, RwLock}; use tracing::{debug, error, info, warn}; @@ -28,6 +29,8 @@ pub struct DaemonState { pub agents: DashMap, /// Glasspane pane/session supervision state. pub glasspane: RwLock, + /// Coordination store (task board, agent registry, skills). + pub store: std::sync::Mutex, /// Shutdown signal: on drop or ctrl_c, all listeners abort. pub shutdown_tx: broadcast::Sender<()>, pub shutdown_rx: broadcast::Receiver<()>, @@ -36,11 +39,18 @@ pub struct DaemonState { impl DaemonState { pub fn new(config: DaemonConfig) -> Self { let (shutdown_tx, shutdown_rx) = broadcast::channel(16); + let store = Store::open(&config.db_path).unwrap_or_else(|e| { + panic!( + "failed to open coordination store at {:?}: {e}", + config.db_path + ) + }); Self { config, sessions: DashMap::new(), agents: DashMap::new(), glasspane: RwLock::new(PaneSupervisor::new()), + store: std::sync::Mutex::new(store), shutdown_tx, shutdown_rx, } diff --git a/crates/colibri-daemon/src/lib.rs b/crates/colibri-daemon/src/lib.rs index 3756d37..62c2458 100644 --- a/crates/colibri-daemon/src/lib.rs +++ b/crates/colibri-daemon/src/lib.rs @@ -48,6 +48,33 @@ pub enum HerdrCommand { GetSession { session_id: String }, #[serde(rename = "compact-session")] CompactSession { session_id: String }, + // ── Coordination board ──────────────────────────────────── + #[serde(rename = "list-tasks")] + ListTasks { status: Option }, + #[serde(rename = "create-task")] + CreateTask { + title: String, + description: Option, + }, + #[serde(rename = "transition-task")] + TransitionTask { task_id: String, status: String }, + #[serde(rename = "claim-task")] + ClaimTask { task_id: String, agent_id: String }, + #[serde(rename = "list-agents")] + ListAgents, + #[serde(rename = "register-agent")] + RegisterAgent { + name: String, + capabilities: Option, + }, + #[serde(rename = "list-skills")] + ListSkills, + #[serde(rename = "register-skill")] + RegisterSkill { + name: String, + description: Option, + category: Option, + }, } /// Outbound response to Herdr. diff --git a/crates/colibri-daemon/src/socket.rs b/crates/colibri-daemon/src/socket.rs index 7817dcb..774798e 100644 --- a/crates/colibri-daemon/src/socket.rs +++ b/crates/colibri-daemon/src/socket.rs @@ -153,6 +153,27 @@ async fn dispatch(cmd: HerdrCommand, state: &SharedState) -> HerdrResponse { HerdrCommand::KillAgent { agent_id } => cmd_kill_agent(state, agent_id).await, HerdrCommand::GetSession { session_id } => cmd_get_session(state, session_id).await, HerdrCommand::CompactSession { session_id } => cmd_compact_session(state, session_id).await, + // ── Coordination board ──────────────────────────────── + HerdrCommand::ListTasks { status } => cmd_list_tasks(state, status).await, + HerdrCommand::CreateTask { title, description } => { + cmd_create_task(state, title, description).await + } + HerdrCommand::TransitionTask { task_id, status } => { + cmd_transition_task(state, task_id, status).await + } + HerdrCommand::ClaimTask { task_id, agent_id } => { + cmd_claim_task(state, task_id, agent_id).await + } + HerdrCommand::ListAgents => cmd_list_agents(state).await, + HerdrCommand::RegisterAgent { name, capabilities } => { + cmd_register_agent(state, name, capabilities).await + } + HerdrCommand::ListSkills => cmd_list_skills(state).await, + HerdrCommand::RegisterSkill { + name, + description, + category, + } => cmd_register_skill(state, name, description, category).await, } } @@ -372,6 +393,101 @@ async fn cmd_compact_session(state: &SharedState, session_id: String) -> HerdrRe } } +// ── Coordination board handlers ────────────────────────────────── + +async fn cmd_list_tasks(state: &SharedState, status: Option) -> HerdrResponse { + let filter = status.and_then(|s| colibri_store::TaskStatus::parse_status(&s)); + match state.store.lock().unwrap().list_tasks(filter) { + Ok(tasks) => HerdrResponse::ok(serde_json::to_value(tasks).unwrap_or_default()), + Err(e) => HerdrResponse::err(format!("list tasks failed: {e}")), + } +} + +async fn cmd_create_task( + state: &SharedState, + title: String, + description: Option, +) -> HerdrResponse { + match state + .store + .lock() + .unwrap() + .create_task(&title, description.as_deref()) + { + Ok(task) => HerdrResponse::ok(serde_json::to_value(task).unwrap_or_default()), + Err(e) => HerdrResponse::err(format!("create task failed: {e}")), + } +} + +async fn cmd_transition_task( + state: &SharedState, + task_id: String, + status: String, +) -> HerdrResponse { + let new_status = match colibri_store::TaskStatus::parse_status(&status) { + Some(s) => s, + None => return HerdrResponse::err(format!("invalid status: {status}")), + }; + match state + .store + .lock() + .unwrap() + .transition_task(&task_id, new_status) + { + Ok(task) => HerdrResponse::ok(serde_json::to_value(task).unwrap_or_default()), + Err(e) => HerdrResponse::err(format!("transition task failed: {e}")), + } +} + +async fn cmd_claim_task(state: &SharedState, task_id: String, agent_id: String) -> HerdrResponse { + match state.store.lock().unwrap().claim_task(&task_id, &agent_id) { + Ok(task) => HerdrResponse::ok(serde_json::to_value(task).unwrap_or_default()), + Err(e) => HerdrResponse::err(format!("claim task failed: {e}")), + } +} + +async fn cmd_list_agents(state: &SharedState) -> HerdrResponse { + match state.store.lock().unwrap().list_agents() { + Ok(agents) => HerdrResponse::ok(serde_json::to_value(agents).unwrap_or_default()), + Err(e) => HerdrResponse::err(format!("list agents failed: {e}")), + } +} + +async fn cmd_register_agent( + state: &SharedState, + name: String, + capabilities: Option, +) -> HerdrResponse { + let caps = capabilities.unwrap_or(serde_json::json!([])); + match state.store.lock().unwrap().register_agent(&name, caps) { + Ok(agent) => HerdrResponse::ok(serde_json::to_value(agent).unwrap_or_default()), + Err(e) => HerdrResponse::err(format!("register agent failed: {e}")), + } +} + +async fn cmd_list_skills(state: &SharedState) -> HerdrResponse { + match state.store.lock().unwrap().list_skills() { + Ok(skills) => HerdrResponse::ok(serde_json::to_value(skills).unwrap_or_default()), + Err(e) => HerdrResponse::err(format!("list skills failed: {e}")), + } +} + +async fn cmd_register_skill( + state: &SharedState, + name: String, + description: Option, + category: Option, +) -> HerdrResponse { + match state.store.lock().unwrap().register_skill( + &name, + description.as_deref(), + category.as_deref(), + ) { + Ok(skill) => HerdrResponse::ok(serde_json::to_value(skill).unwrap_or_default()), + Err(e) => HerdrResponse::err(format!("register skill failed: {e}")), + } +} + #[cfg(test)] mod tests { use super::*; @@ -388,7 +504,8 @@ mod tests { )); DaemonConfig { socket_path: data_dir.join("colibri.sock"), - data_dir, + data_dir: data_dir.clone(), + db_path: data_dir.join("colibri.sqlite"), session_max_bytes: 2_000_000, deepseek_api_key: None, deepseek_endpoint: "https://api.deepseek.com/chat/completions".to_string(), diff --git a/crates/colibri-store/Cargo.toml b/crates/colibri-store/Cargo.toml new file mode 100644 index 0000000..df753a1 --- /dev/null +++ b/crates/colibri-store/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "colibri-store" +version = "0.0.1" +edition = "2021" +license = "AGPL-3.0-only" +description = "Embedded SQLite coordination store for Colibri — task board, agent registry, skills catalog" + +[dependencies] +rusqlite = { version = "0.31", features = ["bundled"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" +chrono = { version = "0.4", default-features = false, features = ["clock"] } +thiserror = "2" +uuid = { version = "1", features = ["v4"] } diff --git a/crates/colibri-store/src/lib.rs b/crates/colibri-store/src/lib.rs new file mode 100644 index 0000000..1ecace9 --- /dev/null +++ b/crates/colibri-store/src/lib.rs @@ -0,0 +1,642 @@ +//! colibri-store — Embedded SQLite coordination database. +//! +//! Implements Decision #1 from `docs/COLIBRI-CUTOVER-PLAN.md`: +//! authoritative task-board lifecycle, agent registry, and skills catalog +//! in a single local SQLite file with WAL mode for concurrent snapshot readers. +//! +//! # Schema +//! +//! - `tasks` — queued→claimed→started→done/failed lifecycle +//! - `agents` — agent identity + capability profile +//! - `skills` — team-wide compounding skills catalog +//! +//! # Platform paths +//! +//! - FreeBSD service: `/var/db/colibri/colibri.sqlite` +//! - Linux dev: `$XDG_DATA_HOME/colibri/colibri.sqlite` +//! - Override: `COLIBRI_DB_PATH` env var + +mod schema; + +use std::path::{Path, PathBuf}; + +use chrono::Utc; +use rusqlite::{params, Connection, OpenFlags}; +use serde::{Deserialize, Serialize}; +use thiserror::Error; + +// --------------------------------------------------------------------------- +// Public types +// --------------------------------------------------------------------------- + +/// Task lifecycle states — the task board's core state machine. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum TaskStatus { + Queued, + Claimed, + Started, + Done, + Failed, +} + +impl TaskStatus { + pub fn as_str(&self) -> &'static str { + match self { + Self::Queued => "queued", + Self::Claimed => "claimed", + Self::Started => "started", + Self::Done => "done", + Self::Failed => "failed", + } + } + + pub fn parse_status(s: &str) -> Option { + match s { + "queued" => Some(Self::Queued), + "claimed" => Some(Self::Claimed), + "started" => Some(Self::Started), + "done" => Some(Self::Done), + "failed" => Some(Self::Failed), + _ => None, + } + } +} + +/// A task on the coordination board. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Task { + pub id: String, + pub agent_id: Option, + pub status: TaskStatus, + pub title: String, + pub description: Option, + pub created_at: String, + pub updated_at: String, +} + +/// An agent registered in the coordination system. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Agent { + pub id: String, + pub name: String, + /// JSON blob of capability tags (e.g. `["code","freebsd","rust"]`). + pub capabilities: serde_json::Value, + /// Agent status: `active`, `idle`, `offline`. + pub status: String, + pub created_at: String, +} + +/// A team-wide skill entry. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Skill { + pub id: String, + pub name: String, + pub description: Option, + pub category: Option, + pub created_at: String, +} + +// --------------------------------------------------------------------------- +// Errors +// --------------------------------------------------------------------------- + +#[derive(Debug, Error)] +pub enum StoreError { + #[error("database error: {0}")] + Database(#[from] rusqlite::Error), + #[error("I/O error: {0}")] + Io(#[from] std::io::Error), + #[error("JSON error: {0}")] + Json(#[from] serde_json::Error), + #[error("not found: {0}")] + NotFound(String), + #[error("already exists: {0}")] + AlreadyExists(String), +} + +pub type Result = std::result::Result; + +// --------------------------------------------------------------------------- +// Store +// --------------------------------------------------------------------------- + +/// The coordination store — a thin typed wrapper around a SQLite connection. +pub struct Store { + conn: Connection, + db_path: PathBuf, +} + +impl Store { + /// Open (or create) the store at `db_path`. Runs migrations automatically. + pub fn open(db_path: impl Into) -> Result { + let db_path = db_path.into(); + + // Ensure parent directory exists + if let Some(parent) = db_path.parent() { + std::fs::create_dir_all(parent)?; + } + + let conn = Connection::open_with_flags( + &db_path, + OpenFlags::SQLITE_OPEN_READ_WRITE + | OpenFlags::SQLITE_OPEN_CREATE + | OpenFlags::SQLITE_OPEN_NO_MUTEX, + )?; + + // Enable WAL + NORMAL synchronous for concurrent snapshot readers + conn.execute_batch( + "PRAGMA journal_mode=WAL; + PRAGMA synchronous=NORMAL; + PRAGMA foreign_keys=ON;", + )?; + + let store = Self { conn, db_path }; + store.run_migrations()?; + Ok(store) + } + + /// Open an in-memory store for testing. + #[cfg(test)] + pub fn open_memory() -> Result { + let conn = Connection::open_in_memory()?; + conn.execute_batch( + "PRAGMA journal_mode=MEMORY; + PRAGMA foreign_keys=ON;", + )?; + let store = Self { + conn, + db_path: PathBuf::from(":memory:"), + }; + store.run_migrations()?; + Ok(store) + } + + /// Path to the database file. + pub fn db_path(&self) -> &Path { + &self.db_path + } + + // ------------------------------------------------------------------ + // Tasks + // ------------------------------------------------------------------ + + /// Create a new task (status = Queued). + pub fn create_task(&self, title: &str, description: Option<&str>) -> Result { + let id = uuid::Uuid::new_v4().to_string(); + let now = Utc::now().to_rfc3339(); + + self.conn.execute( + "INSERT INTO tasks (id, status, title, description, created_at, updated_at) + VALUES (?1, ?2, ?3, ?4, ?5, ?6)", + params![ + id, + TaskStatus::Queued.as_str(), + title, + description, + now, + now + ], + )?; + + Ok(Task { + id, + agent_id: None, + status: TaskStatus::Queued, + title: title.to_string(), + description: description.map(str::to_string), + created_at: now.clone(), + updated_at: now, + }) + } + + /// Transition a task to a new status. Returns the updated task. + pub fn transition_task(&self, task_id: &str, new_status: TaskStatus) -> Result { + let now = Utc::now().to_rfc3339(); + let rows = self.conn.execute( + "UPDATE tasks SET status = ?1, updated_at = ?2 WHERE id = ?3", + params![new_status.as_str(), now, task_id], + )?; + + if rows == 0 { + return Err(StoreError::NotFound(task_id.to_string())); + } + + self.get_task(task_id)? + .ok_or_else(|| StoreError::NotFound(task_id.to_string())) + } + + /// Assign a task to an agent (status → Claimed). + pub fn claim_task(&self, task_id: &str, agent_id: &str) -> Result { + let now = Utc::now().to_rfc3339(); + let rows = self.conn.execute( + "UPDATE tasks SET agent_id = ?1, status = ?2, updated_at = ?3 WHERE id = ?4", + params![agent_id, TaskStatus::Claimed.as_str(), now, task_id], + )?; + + if rows == 0 { + return Err(StoreError::NotFound(task_id.to_string())); + } + + self.get_task(task_id)? + .ok_or_else(|| StoreError::NotFound(task_id.to_string())) + } + + /// Get a single task by ID. + pub fn get_task(&self, task_id: &str) -> Result> { + let mut stmt = self.conn.prepare( + "SELECT id, agent_id, status, title, description, created_at, updated_at + FROM tasks WHERE id = ?1", + )?; + + let mut rows = stmt.query_map(params![task_id], |row| { + let status_str: String = row.get(2)?; + Ok(Task { + id: row.get(0)?, + agent_id: row.get(1)?, + status: TaskStatus::parse_status(&status_str).unwrap_or(TaskStatus::Queued), + title: row.get(3)?, + description: row.get(4)?, + created_at: row.get(5)?, + updated_at: row.get(6)?, + }) + })?; + + match rows.next() { + Some(Ok(task)) => Ok(Some(task)), + Some(Err(e)) => Err(StoreError::Database(e)), + None => Ok(None), + } + } + + /// List tasks, optionally filtered by status. + pub fn list_tasks(&self, status_filter: Option) -> Result> { + let sql = if status_filter.is_some() { + "SELECT id, agent_id, status, title, description, created_at, updated_at + FROM tasks WHERE status = ?1 ORDER BY created_at DESC" + } else { + "SELECT id, agent_id, status, title, description, created_at, updated_at + FROM tasks ORDER BY created_at DESC" + }; + + let mut stmt = self.conn.prepare(sql)?; + let rows = if let Some(status) = status_filter { + stmt.query_map(params![status.as_str()], row_to_task)? + } else { + stmt.query_map([], row_to_task)? + }; + + let mut tasks = Vec::new(); + for row in rows { + tasks.push(row?); + } + Ok(tasks) + } + + // ------------------------------------------------------------------ + // Agents + // ------------------------------------------------------------------ + + /// Register a new agent. + pub fn register_agent(&self, name: &str, capabilities: serde_json::Value) -> Result { + let id = uuid::Uuid::new_v4().to_string(); + let now = Utc::now().to_rfc3339(); + let caps_json = serde_json::to_string(&capabilities)?; + + self.conn.execute( + "INSERT INTO agents (id, name, capabilities, status, created_at) + VALUES (?1, ?2, ?3, 'active', ?4)", + params![id, name, caps_json, now], + )?; + + Ok(Agent { + id, + name: name.to_string(), + capabilities, + status: "active".to_string(), + created_at: now, + }) + } + + /// Get an agent by ID. + pub fn get_agent(&self, agent_id: &str) -> Result> { + let mut stmt = self.conn.prepare( + "SELECT id, name, capabilities, status, created_at FROM agents WHERE id = ?1", + )?; + + let mut rows = stmt.query_map(params![agent_id], |row| { + let caps_str: String = row.get(2)?; + Ok(Agent { + id: row.get(0)?, + name: row.get(1)?, + capabilities: serde_json::from_str(&caps_str).unwrap_or(serde_json::Value::Null), + status: row.get(3)?, + created_at: row.get(4)?, + }) + })?; + + match rows.next() { + Some(Ok(agent)) => Ok(Some(agent)), + Some(Err(e)) => Err(StoreError::Database(e)), + None => Ok(None), + } + } + + /// List all registered agents. + pub fn list_agents(&self) -> Result> { + let mut stmt = self.conn.prepare( + "SELECT id, name, capabilities, status, created_at FROM agents ORDER BY name", + )?; + let rows = stmt.query_map([], |row| { + let caps_str: String = row.get(2)?; + Ok(Agent { + id: row.get(0)?, + name: row.get(1)?, + capabilities: serde_json::from_str(&caps_str).unwrap_or(serde_json::Value::Null), + status: row.get(3)?, + created_at: row.get(4)?, + }) + })?; + + let mut agents = Vec::new(); + for row in rows { + agents.push(row?); + } + Ok(agents) + } + + /// Update agent status (active/idle/offline). + pub fn set_agent_status(&self, agent_id: &str, status: &str) -> Result<()> { + let rows = self.conn.execute( + "UPDATE agents SET status = ?1 WHERE id = ?2", + params![status, agent_id], + )?; + if rows == 0 { + return Err(StoreError::NotFound(agent_id.to_string())); + } + Ok(()) + } + + // ------------------------------------------------------------------ + // Skills + // ------------------------------------------------------------------ + + /// Register a team skill. + pub fn register_skill( + &self, + name: &str, + description: Option<&str>, + category: Option<&str>, + ) -> Result { + let id = uuid::Uuid::new_v4().to_string(); + let now = Utc::now().to_rfc3339(); + + self.conn.execute( + "INSERT INTO skills (id, name, description, category, created_at) + VALUES (?1, ?2, ?3, ?4, ?5)", + params![id, name, description, category, now], + )?; + + Ok(Skill { + id, + name: name.to_string(), + description: description.map(str::to_string), + category: category.map(str::to_string), + created_at: now, + }) + } + + /// List all registered skills. + pub fn list_skills(&self) -> Result> { + let mut stmt = self.conn.prepare( + "SELECT id, name, description, category, created_at FROM skills ORDER BY name", + )?; + let rows = stmt.query_map([], |row| { + Ok(Skill { + id: row.get(0)?, + name: row.get(1)?, + description: row.get(2)?, + category: row.get(3)?, + created_at: row.get(4)?, + }) + })?; + + let mut skills = Vec::new(); + for row in rows { + skills.push(row?); + } + Ok(skills) + } + + // ------------------------------------------------------------------ + // Backup / export + // ------------------------------------------------------------------ + + /// Hot binary backup via `VACUUM INTO` — point-in-time, no stop required. + pub fn backup(&self, dest: impl AsRef) -> Result<()> { + self.conn.execute_batch(&format!( + "VACUUM INTO '{}';", + dest.as_ref().display().to_string().replace('\'', "''") + ))?; + Ok(()) + } + + /// Export the full store as a JSON object suitable for dual-run parity diffs. + pub fn export_json(&self) -> Result { + Ok(serde_json::json!({ + "tasks": self.list_tasks(None)?, + "agents": self.list_agents()?, + "skills": self.list_skills()?, + "exported_at": Utc::now().to_rfc3339(), + })) + } + + // ------------------------------------------------------------------ + // Migrations + // ------------------------------------------------------------------ + + fn run_migrations(&self) -> Result<()> { + self.conn.execute_batch(schema::SCHEMA_SQL)?; + Ok(()) + } +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +fn row_to_task(row: &rusqlite::Row<'_>) -> rusqlite::Result { + let status_str: String = row.get(2)?; + Ok(Task { + id: row.get(0)?, + agent_id: row.get(1)?, + status: TaskStatus::parse_status(&status_str).unwrap_or(TaskStatus::Queued), + title: row.get(3)?, + description: row.get(4)?, + created_at: row.get(5)?, + updated_at: row.get(6)?, + }) +} + +// --------------------------------------------------------------------------- +// Default database path (platform-aware) +// --------------------------------------------------------------------------- + +/// Returns the default database path for the current platform. +/// +/// - `COLIBRI_DB_PATH` env var wins if set. +/// - FreeBSD: `/var/db/colibri/colibri.sqlite` +/// - Linux/macOS: `$XDG_DATA_HOME/colibri/colibri.sqlite` +pub fn default_db_path() -> PathBuf { + if let Ok(p) = std::env::var("COLIBRI_DB_PATH") { + if !p.trim().is_empty() { + return PathBuf::from(p); + } + } + + #[cfg(target_os = "freebsd")] + { + PathBuf::from("/var/db/colibri/colibri.sqlite") + } + + #[cfg(not(target_os = "freebsd"))] + { + std::env::var("XDG_DATA_HOME") + .ok() + .filter(|v| !v.trim().is_empty()) + .map(PathBuf::from) + .or_else(|| { + std::env::var("HOME") + .ok() + .map(|h| PathBuf::from(h).join(".local").join("share")) + }) + .unwrap_or_else(|| PathBuf::from("/tmp")) + .join("colibri") + .join("colibri.sqlite") + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_task_lifecycle() { + let store = Store::open_memory().unwrap(); + + // Register an agent first (FK constraint) + let agent = store + .register_agent("codex", serde_json::json!(["code", "rust"])) + .unwrap(); + + // Create + let task = store.create_task("Test task", Some("a test task")).unwrap(); + assert_eq!(task.status, TaskStatus::Queued); + assert_eq!(task.title, "Test task"); + + // Claim + let task = store.claim_task(&task.id, &agent.id).unwrap(); + assert_eq!(task.status, TaskStatus::Claimed); + assert_eq!(task.agent_id.as_deref(), Some(agent.id.as_str())); + + // Start + let task = store + .transition_task(&task.id, TaskStatus::Started) + .unwrap(); + assert_eq!(task.status, TaskStatus::Started); + + // Done + let task = store.transition_task(&task.id, TaskStatus::Done).unwrap(); + assert_eq!(task.status, TaskStatus::Done); + } + + #[test] + fn test_transition_not_found() { + let store = Store::open_memory().unwrap(); + let result = store.transition_task("nonexistent", TaskStatus::Started); + assert!(result.is_err()); + } + + #[test] + fn test_list_tasks_filtered() { + let store = Store::open_memory().unwrap(); + + store.create_task("Task A", None).unwrap(); + let task_b = store.create_task("Task B", None).unwrap(); + store.transition_task(&task_b.id, TaskStatus::Done).unwrap(); + + let queued = store.list_tasks(Some(TaskStatus::Queued)).unwrap(); + assert_eq!(queued.len(), 1); + assert_eq!(queued[0].title, "Task A"); + + let done = store.list_tasks(Some(TaskStatus::Done)).unwrap(); + assert_eq!(done.len(), 1); + assert_eq!(done[0].title, "Task B"); + + let all = store.list_tasks(None).unwrap(); + assert_eq!(all.len(), 2); + } + + #[test] + fn test_agent_registry() { + let store = Store::open_memory().unwrap(); + + let caps = serde_json::json!(["code", "rust", "freebsd"]); + let agent = store.register_agent("codex", caps.clone()).unwrap(); + + assert_eq!(agent.name, "codex"); + assert_eq!(agent.status, "active"); + assert_eq!(agent.capabilities, caps); + + let got = store.get_agent(&agent.id).unwrap().unwrap(); + assert_eq!(got.name, "codex"); + + store.set_agent_status(&agent.id, "offline").unwrap(); + let got = store.get_agent(&agent.id).unwrap().unwrap(); + assert_eq!(got.status, "offline"); + } + + #[test] + fn test_skills_catalog() { + let store = Store::open_memory().unwrap(); + + let skill = store + .register_skill( + "ssh-agent-setup", + Some("SSH agent persistence"), + Some("devops"), + ) + .unwrap(); + + assert_eq!(skill.name, "ssh-agent-setup"); + assert_eq!(skill.description.as_deref(), Some("SSH agent persistence")); + + let all = store.list_skills().unwrap(); + assert_eq!(all.len(), 1); + } + + #[test] + fn test_export_json() { + let store = Store::open_memory().unwrap(); + store.create_task("Export test", None).unwrap(); + store + .register_agent("test-agent", serde_json::json!(["test"])) + .unwrap(); + store + .register_skill("test-skill", None, Some("test")) + .unwrap(); + + let json = store.export_json().unwrap(); + assert!(json["tasks"].as_array().unwrap().len() == 1); + assert!(json["agents"].as_array().unwrap().len() == 1); + assert!(json["skills"].as_array().unwrap().len() == 1); + assert!(json["exported_at"].is_string()); + } + + #[test] + fn test_default_db_path_freebsd() { + // On Linux, this returns the XDG path. On FreeBSD, /var/db/colibri. + let path = default_db_path(); + assert!(path.to_string_lossy().contains("colibri.sqlite")); + } +} diff --git a/crates/colibri-store/src/schema.rs b/crates/colibri-store/src/schema.rs new file mode 100644 index 0000000..f4269f8 --- /dev/null +++ b/crates/colibri-store/src/schema.rs @@ -0,0 +1,42 @@ +/// SQL schema for the Colibri coordination store. +/// +/// All tables use `IF NOT EXISTS` — safe to run on every open (idempotent). +/// No downward migrations yet; schema evolution adds columns/tables additively. +pub const SCHEMA_SQL: &str = " +-- Tasks: the coordination board's core lifecycle +CREATE TABLE IF NOT EXISTS tasks ( + id TEXT PRIMARY KEY NOT NULL, + agent_id TEXT, + status TEXT NOT NULL DEFAULT 'queued' + CHECK(status IN ('queued','claimed','started','done','failed')), + title TEXT NOT NULL, + description TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + FOREIGN KEY (agent_id) REFERENCES agents(id) +); + +CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status); +CREATE INDEX IF NOT EXISTS idx_tasks_agent ON tasks(agent_id); + +-- Agents: registered teammates with capability profiles +CREATE TABLE IF NOT EXISTS agents ( + id TEXT PRIMARY KEY NOT NULL, + name TEXT NOT NULL UNIQUE, + capabilities TEXT NOT NULL DEFAULT '[]', + status TEXT NOT NULL DEFAULT 'active' + CHECK(status IN ('active','idle','offline')), + created_at TEXT NOT NULL +); + +-- Skills: team-wide compounding catalog +CREATE TABLE IF NOT EXISTS skills ( + id TEXT PRIMARY KEY NOT NULL, + name TEXT NOT NULL UNIQUE, + description TEXT, + category TEXT, + created_at TEXT NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_skills_category ON skills(category); +";