feat: add colibri-store — embedded SQLite coordination database (Sam & Hermes)

Phase 3 coordination core: task board (queued→claimed→started→done/failed),
agent registry, skills catalog. WAL mode, VACUUM INTO backup, JSON export.
DaemonConfig gains db_path; DaemonState gains Mutex<Store>.
9 coordination socket commands: list-tasks, create-task, transition-task,
claim-task, list-agents, register-agent, list-skills, register-skill.
8 crates, 72 tests, clippy-clean. README updated to reflect 8-crate workspace.
This commit is contained in:
123kupola 2026-05-27 16:40:19 +02:00
parent 743aac19dc
commit ebc1b99b7e
12 changed files with 987 additions and 41 deletions

86
Cargo.lock generated
View file

@ -2,6 +2,18 @@
# It is not intended for manual editing.
version = 4
[[package]]
name = "ahash"
version = "0.8.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75"
dependencies = [
"cfg-if",
"once_cell",
"version_check",
"zerocopy",
]
[[package]]
name = "aho-corasick"
version = "1.1.4"
@ -214,6 +226,7 @@ dependencies = [
"colibri-deepseek",
"colibri-glasspane",
"colibri-runtime",
"colibri-store",
"dashmap",
"reqwest",
"serde",
@ -271,6 +284,18 @@ dependencies = [
"tokio",
]
[[package]]
name = "colibri-store"
version = "0.0.1"
dependencies = [
"chrono",
"rusqlite",
"serde",
"serde_json",
"thiserror 2.0.18",
"uuid",
]
[[package]]
name = "compact_str"
version = "0.9.1"
@ -520,6 +545,18 @@ dependencies = [
"num-traits",
]
[[package]]
name = "fallible-iterator"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649"
[[package]]
name = "fallible-streaming-iterator"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
[[package]]
name = "fancy-regex"
version = "0.11.0"
@ -698,6 +735,9 @@ name = "hashbrown"
version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
dependencies = [
"ahash",
]
[[package]]
name = "hashbrown"
@ -725,6 +765,15 @@ version = "0.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed5909b6e89a2db4456e54cd5f673791d7eca6732202bbf2a9cc504fe2f9b84a"
[[package]]
name = "hashlink"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af"
dependencies = [
"hashbrown 0.14.5",
]
[[package]]
name = "heck"
version = "0.5.0"
@ -1076,6 +1125,17 @@ version = "0.2.186"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66"
[[package]]
name = "libsqlite3-sys"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c10584274047cb335c23d3e61bcef8e323adae7c5c8c760540f73610177fc3f"
dependencies = [
"cc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "line-clipping"
version = "0.3.7"
@ -1415,6 +1475,12 @@ version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd"
[[package]]
name = "pkg-config"
version = "0.3.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e"
[[package]]
name = "portable-atomic"
version = "1.13.1"
@ -1780,6 +1846,20 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "rusqlite"
version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b838eba278d213a8beaf485bd313fd580ca4505a00d5871caeb1457c55322cae"
dependencies = [
"bitflags 2.11.1",
"fallible-iterator",
"fallible-streaming-iterator",
"hashlink",
"libsqlite3-sys",
"smallvec",
]
[[package]]
name = "rustc-hash"
version = "2.1.2"
@ -2537,6 +2617,12 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "version_check"
version = "0.9.5"

View file

@ -1,5 +1,5 @@
[workspace]
members = ["crates/colibri-contracts", "crates/colibri-deepseek", "crates/colibri-runtime", "crates/colibri-glasspane", "crates/colibri-daemon", "crates/colibri-client", "crates/colibri-glasspane-tui"]
members = ["crates/colibri-contracts", "crates/colibri-deepseek", "crates/colibri-runtime", "crates/colibri-glasspane", "crates/colibri-daemon", "crates/colibri-client", "crates/colibri-glasspane-tui", "crates/colibri-store"]
[package]
name = "colibri"

View file

@ -1,63 +1,64 @@
# Colibri
The Clawdie control plane core — a small, cross-platform (FreeBSD + Linux) Rust
daemon. It unifies a coordination model (agents-as-teammates, task board, team
skills) with a cache-first cost discipline (byte-stable prompt prefixes,
cache-hit metering), sitting on top of the existing Pi engine, watchdog, hostd
and Postgres.
daemon that unifies coordination (task board, agent registry, skills catalog)
with cache-first cost discipline (byte-stable prompt prefixes, cache-hit metering).
Design + implementation path: see `doc/COLIBRI-CONTROLPLANE-PLAN.md` in
`clawdie-ai`.
**Status:** 8 crates, 72 tests, clippy-clean. Phase 3 (coordination core) in progress.
Planned next split-brain slice:
Design doc + cutover plan: `docs/COLIBRI-CUTOVER-PLAN.md`.
- `docs/COLIBRI-SKILLS-PLAN.md` — read-only Rust access layer for the
"manuals already included" / `system_skills` lane. Phase 1 is discovery +
read-only lookup/status only; no generator or embedding refresh rewrite.
## Workspace — 8 crates
## Phase 1 — `colibri-probe`
| Crate | Role |
|-------|------|
| `colibri-contracts` | JSON schema contracts (golden tests) |
| `colibri-deepseek` | DeepSeek cache-hit probe, prefix metering |
| `colibri-runtime` | Host status ingestion, runtime inventory |
| `colibri-glasspane` | Agent 5-state machine (Pi events → state) |
| `colibri-daemon` | Always-on Unix socket server, session lifecycle |
| `colibri-client` | Typed Unix-socket client + operator CLI |
| `colibri-glasspane-tui` | ratatui live dashboard (FreeBSD-native) |
| `colibri-store` | Embedded SQLite coordination (task board, agents, skills) |
A falsifiable first build that proves three things at once:
1. Rust + `rustls` + `tokio` build cross-platform (Linux first, FreeBSD next).
2. A raw DeepSeek HTTPS call works.
3. DeepSeek **prefix caching** is real on our infra: send a byte-stable prefix
twice and observe `prompt_cache_hit_tokens > 0` on the second request.
It prints a `clawdie.provider-smoke.result.v1` manifest on stdout.
### Build (no key needed)
## Build
```sh
cargo build --release
```
### Run
## Test
```sh
# Build-only / skipped mode (no key): verifies the binary runs.
./target/release/colibri-probe
# Live cache probe:
DEEPSEEK_API_KEY=sk-... ./target/release/colibri-probe
cargo test --workspace
cargo clippy --workspace --all-targets -- -D warnings
```
Env overrides: `DEEPSEEK_MODEL` (default `deepseek-chat`, the DeepSeek API model
string — distinct from our internal `deepseek-v4-flash` alias),
`DEEPSEEK_ENDPOINT`, `COLIBRI_HOST`, `COLIBRI_AGENT`.
## Architecture
## Runtime inventory
```
colibri-daemon (always-on Unix socket server)
├── glasspane — agent state machine (Pi JSONL → idle/working/blocked/done)
├── store — SQLite coordination (tasks, agents, skills)
├── socket — newline-JSON socket API
├── session — append-only JSONL sessions, 3-region prompt assembly
└── spawner — agent subprocess management (retry/backoff)
The FreeBSD/Linux build lane can emit the existing Clawdie runtime contract:
colibri-client — CLI tools (colibri_ctl, colibri_smoke_agent)
colibri-glasspane-tui— ratatui dashboard (no Herdr dependency)
```
## Probe binaries
```sh
# DeepSeek cache probe (needs DEEPSEEK_API_KEY)
cargo run --release --bin colibri-probe
# Runtime inventory manifest
cargo run --release --bin colibri-runtime-inventory
```
It prints a `clawdie.runtime-version-inventory.v1` manifest on stdout.
## FreeBSD
## FreeBSD note
Target `x86_64-unknown-freebsd` (Rust Tier-2). Install via `pkg install rust` or
rustup; the `rust-toolchain.toml` pins the channel for cross-host
reproducibility. TLS is `rustls` to avoid `openssl-sys` linking on FreeBSD.
Target `x86_64-unknown-freebsd` (Rust Tier-2). TLS is `rustls` to avoid
`openssl-sys` linking. Default DB path: `/var/db/colibri/colibri.sqlite`.

View file

@ -12,7 +12,8 @@ fn smoke_config() -> DaemonConfig {
let data_dir = std::env::temp_dir().join(format!("colibri-live-smoke-{}", Uuid::new_v4()));
DaemonConfig {
socket_path: data_dir.join("colibri.sock"),
data_dir,
data_dir: data_dir.clone(),
db_path: data_dir.join("colibri.sqlite"),
session_max_bytes: 2_000_000,
deepseek_api_key: None,
deepseek_endpoint: "https://api.deepseek.com/chat/completions".to_string(),

View file

@ -10,6 +10,7 @@ colibri-contracts = { path = "../colibri-contracts" }
colibri-deepseek = { path = "../colibri-deepseek" }
colibri-glasspane = { path = "../colibri-glasspane" }
colibri-runtime = { path = "../colibri-runtime" }
colibri-store = { path = "../colibri-store" }
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.7", features = ["codec"] }
serde = { version = "1", features = ["derive"] }

View file

@ -19,6 +19,8 @@ pub struct DaemonConfig {
pub socket_path: PathBuf,
/// Maximum bytes per session before automatic rollover.
pub session_max_bytes: u64,
/// Path to the coordination SQLite database.
pub db_path: PathBuf,
/// DeepSeek API key for provider routing.
pub deepseek_api_key: Option<String>,
/// DeepSeek API endpoint.
@ -60,9 +62,12 @@ impl DaemonConfig {
.map(PathBuf::from)
.unwrap_or_else(|_| data_dir.join("colibri-daemon.sock"));
let db_path = colibri_store::default_db_path();
Self {
data_dir,
socket_path,
db_path,
session_max_bytes: env_parse("COLIBRI_SESSION_MAX_BYTES").unwrap_or(2_000_000),
deepseek_api_key: nonempty_env("DEEPSEEK_API_KEY"),
deepseek_endpoint: std::env::var("DEEPSEEK_ENDPOINT")

View file

@ -7,6 +7,7 @@ use std::sync::Arc;
use std::time::Duration;
use colibri_glasspane::PaneSupervisor;
use colibri_store::Store;
use dashmap::DashMap;
use tokio::sync::{broadcast, RwLock};
use tracing::{debug, error, info, warn};
@ -28,6 +29,8 @@ pub struct DaemonState {
pub agents: DashMap<String, AgentHandle>,
/// Glasspane pane/session supervision state.
pub glasspane: RwLock<PaneSupervisor>,
/// Coordination store (task board, agent registry, skills).
pub store: std::sync::Mutex<Store>,
/// Shutdown signal: on drop or ctrl_c, all listeners abort.
pub shutdown_tx: broadcast::Sender<()>,
pub shutdown_rx: broadcast::Receiver<()>,
@ -36,11 +39,18 @@ pub struct DaemonState {
impl DaemonState {
pub fn new(config: DaemonConfig) -> Self {
let (shutdown_tx, shutdown_rx) = broadcast::channel(16);
let store = Store::open(&config.db_path).unwrap_or_else(|e| {
panic!(
"failed to open coordination store at {:?}: {e}",
config.db_path
)
});
Self {
config,
sessions: DashMap::new(),
agents: DashMap::new(),
glasspane: RwLock::new(PaneSupervisor::new()),
store: std::sync::Mutex::new(store),
shutdown_tx,
shutdown_rx,
}

View file

@ -48,6 +48,33 @@ pub enum HerdrCommand {
GetSession { session_id: String },
#[serde(rename = "compact-session")]
CompactSession { session_id: String },
// ── Coordination board ────────────────────────────────────
#[serde(rename = "list-tasks")]
ListTasks { status: Option<String> },
#[serde(rename = "create-task")]
CreateTask {
title: String,
description: Option<String>,
},
#[serde(rename = "transition-task")]
TransitionTask { task_id: String, status: String },
#[serde(rename = "claim-task")]
ClaimTask { task_id: String, agent_id: String },
#[serde(rename = "list-agents")]
ListAgents,
#[serde(rename = "register-agent")]
RegisterAgent {
name: String,
capabilities: Option<serde_json::Value>,
},
#[serde(rename = "list-skills")]
ListSkills,
#[serde(rename = "register-skill")]
RegisterSkill {
name: String,
description: Option<String>,
category: Option<String>,
},
}
/// Outbound response to Herdr.

View file

@ -153,6 +153,27 @@ async fn dispatch(cmd: HerdrCommand, state: &SharedState) -> HerdrResponse {
HerdrCommand::KillAgent { agent_id } => cmd_kill_agent(state, agent_id).await,
HerdrCommand::GetSession { session_id } => cmd_get_session(state, session_id).await,
HerdrCommand::CompactSession { session_id } => cmd_compact_session(state, session_id).await,
// ── Coordination board ────────────────────────────────
HerdrCommand::ListTasks { status } => cmd_list_tasks(state, status).await,
HerdrCommand::CreateTask { title, description } => {
cmd_create_task(state, title, description).await
}
HerdrCommand::TransitionTask { task_id, status } => {
cmd_transition_task(state, task_id, status).await
}
HerdrCommand::ClaimTask { task_id, agent_id } => {
cmd_claim_task(state, task_id, agent_id).await
}
HerdrCommand::ListAgents => cmd_list_agents(state).await,
HerdrCommand::RegisterAgent { name, capabilities } => {
cmd_register_agent(state, name, capabilities).await
}
HerdrCommand::ListSkills => cmd_list_skills(state).await,
HerdrCommand::RegisterSkill {
name,
description,
category,
} => cmd_register_skill(state, name, description, category).await,
}
}
@ -372,6 +393,101 @@ async fn cmd_compact_session(state: &SharedState, session_id: String) -> HerdrRe
}
}
// ── Coordination board handlers ──────────────────────────────────
async fn cmd_list_tasks(state: &SharedState, status: Option<String>) -> HerdrResponse {
let filter = status.and_then(|s| colibri_store::TaskStatus::parse_status(&s));
match state.store.lock().unwrap().list_tasks(filter) {
Ok(tasks) => HerdrResponse::ok(serde_json::to_value(tasks).unwrap_or_default()),
Err(e) => HerdrResponse::err(format!("list tasks failed: {e}")),
}
}
async fn cmd_create_task(
state: &SharedState,
title: String,
description: Option<String>,
) -> HerdrResponse {
match state
.store
.lock()
.unwrap()
.create_task(&title, description.as_deref())
{
Ok(task) => HerdrResponse::ok(serde_json::to_value(task).unwrap_or_default()),
Err(e) => HerdrResponse::err(format!("create task failed: {e}")),
}
}
async fn cmd_transition_task(
state: &SharedState,
task_id: String,
status: String,
) -> HerdrResponse {
let new_status = match colibri_store::TaskStatus::parse_status(&status) {
Some(s) => s,
None => return HerdrResponse::err(format!("invalid status: {status}")),
};
match state
.store
.lock()
.unwrap()
.transition_task(&task_id, new_status)
{
Ok(task) => HerdrResponse::ok(serde_json::to_value(task).unwrap_or_default()),
Err(e) => HerdrResponse::err(format!("transition task failed: {e}")),
}
}
async fn cmd_claim_task(state: &SharedState, task_id: String, agent_id: String) -> HerdrResponse {
match state.store.lock().unwrap().claim_task(&task_id, &agent_id) {
Ok(task) => HerdrResponse::ok(serde_json::to_value(task).unwrap_or_default()),
Err(e) => HerdrResponse::err(format!("claim task failed: {e}")),
}
}
async fn cmd_list_agents(state: &SharedState) -> HerdrResponse {
match state.store.lock().unwrap().list_agents() {
Ok(agents) => HerdrResponse::ok(serde_json::to_value(agents).unwrap_or_default()),
Err(e) => HerdrResponse::err(format!("list agents failed: {e}")),
}
}
async fn cmd_register_agent(
state: &SharedState,
name: String,
capabilities: Option<serde_json::Value>,
) -> HerdrResponse {
let caps = capabilities.unwrap_or(serde_json::json!([]));
match state.store.lock().unwrap().register_agent(&name, caps) {
Ok(agent) => HerdrResponse::ok(serde_json::to_value(agent).unwrap_or_default()),
Err(e) => HerdrResponse::err(format!("register agent failed: {e}")),
}
}
async fn cmd_list_skills(state: &SharedState) -> HerdrResponse {
match state.store.lock().unwrap().list_skills() {
Ok(skills) => HerdrResponse::ok(serde_json::to_value(skills).unwrap_or_default()),
Err(e) => HerdrResponse::err(format!("list skills failed: {e}")),
}
}
async fn cmd_register_skill(
state: &SharedState,
name: String,
description: Option<String>,
category: Option<String>,
) -> HerdrResponse {
match state.store.lock().unwrap().register_skill(
&name,
description.as_deref(),
category.as_deref(),
) {
Ok(skill) => HerdrResponse::ok(serde_json::to_value(skill).unwrap_or_default()),
Err(e) => HerdrResponse::err(format!("register skill failed: {e}")),
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -388,7 +504,8 @@ mod tests {
));
DaemonConfig {
socket_path: data_dir.join("colibri.sock"),
data_dir,
data_dir: data_dir.clone(),
db_path: data_dir.join("colibri.sqlite"),
session_max_bytes: 2_000_000,
deepseek_api_key: None,
deepseek_endpoint: "https://api.deepseek.com/chat/completions".to_string(),

View file

@ -0,0 +1,14 @@
[package]
name = "colibri-store"
version = "0.0.1"
edition = "2021"
license = "AGPL-3.0-only"
description = "Embedded SQLite coordination store for Colibri — task board, agent registry, skills catalog"
[dependencies]
rusqlite = { version = "0.31", features = ["bundled"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
thiserror = "2"
uuid = { version = "1", features = ["v4"] }

View file

@ -0,0 +1,642 @@
//! colibri-store — Embedded SQLite coordination database.
//!
//! Implements Decision #1 from `docs/COLIBRI-CUTOVER-PLAN.md`:
//! authoritative task-board lifecycle, agent registry, and skills catalog
//! in a single local SQLite file with WAL mode for concurrent snapshot readers.
//!
//! # Schema
//!
//! - `tasks` — queued→claimed→started→done/failed lifecycle
//! - `agents` — agent identity + capability profile
//! - `skills` — team-wide compounding skills catalog
//!
//! # Platform paths
//!
//! - FreeBSD service: `/var/db/colibri/colibri.sqlite`
//! - Linux dev: `$XDG_DATA_HOME/colibri/colibri.sqlite`
//! - Override: `COLIBRI_DB_PATH` env var
mod schema;
use std::path::{Path, PathBuf};
use chrono::Utc;
use rusqlite::{params, Connection, OpenFlags};
use serde::{Deserialize, Serialize};
use thiserror::Error;
// ---------------------------------------------------------------------------
// Public types
// ---------------------------------------------------------------------------
/// Task lifecycle states — the task board's core state machine.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum TaskStatus {
Queued,
Claimed,
Started,
Done,
Failed,
}
impl TaskStatus {
pub fn as_str(&self) -> &'static str {
match self {
Self::Queued => "queued",
Self::Claimed => "claimed",
Self::Started => "started",
Self::Done => "done",
Self::Failed => "failed",
}
}
pub fn parse_status(s: &str) -> Option<Self> {
match s {
"queued" => Some(Self::Queued),
"claimed" => Some(Self::Claimed),
"started" => Some(Self::Started),
"done" => Some(Self::Done),
"failed" => Some(Self::Failed),
_ => None,
}
}
}
/// A task on the coordination board.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Task {
pub id: String,
pub agent_id: Option<String>,
pub status: TaskStatus,
pub title: String,
pub description: Option<String>,
pub created_at: String,
pub updated_at: String,
}
/// An agent registered in the coordination system.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Agent {
pub id: String,
pub name: String,
/// JSON blob of capability tags (e.g. `["code","freebsd","rust"]`).
pub capabilities: serde_json::Value,
/// Agent status: `active`, `idle`, `offline`.
pub status: String,
pub created_at: String,
}
/// A team-wide skill entry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Skill {
pub id: String,
pub name: String,
pub description: Option<String>,
pub category: Option<String>,
pub created_at: String,
}
// ---------------------------------------------------------------------------
// Errors
// ---------------------------------------------------------------------------
#[derive(Debug, Error)]
pub enum StoreError {
#[error("database error: {0}")]
Database(#[from] rusqlite::Error),
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
#[error("JSON error: {0}")]
Json(#[from] serde_json::Error),
#[error("not found: {0}")]
NotFound(String),
#[error("already exists: {0}")]
AlreadyExists(String),
}
pub type Result<T> = std::result::Result<T, StoreError>;
// ---------------------------------------------------------------------------
// Store
// ---------------------------------------------------------------------------
/// The coordination store — a thin typed wrapper around a SQLite connection.
pub struct Store {
conn: Connection,
db_path: PathBuf,
}
impl Store {
/// Open (or create) the store at `db_path`. Runs migrations automatically.
pub fn open(db_path: impl Into<PathBuf>) -> Result<Self> {
let db_path = db_path.into();
// Ensure parent directory exists
if let Some(parent) = db_path.parent() {
std::fs::create_dir_all(parent)?;
}
let conn = Connection::open_with_flags(
&db_path,
OpenFlags::SQLITE_OPEN_READ_WRITE
| OpenFlags::SQLITE_OPEN_CREATE
| OpenFlags::SQLITE_OPEN_NO_MUTEX,
)?;
// Enable WAL + NORMAL synchronous for concurrent snapshot readers
conn.execute_batch(
"PRAGMA journal_mode=WAL;
PRAGMA synchronous=NORMAL;
PRAGMA foreign_keys=ON;",
)?;
let store = Self { conn, db_path };
store.run_migrations()?;
Ok(store)
}
/// Open an in-memory store for testing.
#[cfg(test)]
pub fn open_memory() -> Result<Self> {
let conn = Connection::open_in_memory()?;
conn.execute_batch(
"PRAGMA journal_mode=MEMORY;
PRAGMA foreign_keys=ON;",
)?;
let store = Self {
conn,
db_path: PathBuf::from(":memory:"),
};
store.run_migrations()?;
Ok(store)
}
/// Path to the database file.
pub fn db_path(&self) -> &Path {
&self.db_path
}
// ------------------------------------------------------------------
// Tasks
// ------------------------------------------------------------------
/// Create a new task (status = Queued).
pub fn create_task(&self, title: &str, description: Option<&str>) -> Result<Task> {
let id = uuid::Uuid::new_v4().to_string();
let now = Utc::now().to_rfc3339();
self.conn.execute(
"INSERT INTO tasks (id, status, title, description, created_at, updated_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
params![
id,
TaskStatus::Queued.as_str(),
title,
description,
now,
now
],
)?;
Ok(Task {
id,
agent_id: None,
status: TaskStatus::Queued,
title: title.to_string(),
description: description.map(str::to_string),
created_at: now.clone(),
updated_at: now,
})
}
/// Transition a task to a new status. Returns the updated task.
pub fn transition_task(&self, task_id: &str, new_status: TaskStatus) -> Result<Task> {
let now = Utc::now().to_rfc3339();
let rows = self.conn.execute(
"UPDATE tasks SET status = ?1, updated_at = ?2 WHERE id = ?3",
params![new_status.as_str(), now, task_id],
)?;
if rows == 0 {
return Err(StoreError::NotFound(task_id.to_string()));
}
self.get_task(task_id)?
.ok_or_else(|| StoreError::NotFound(task_id.to_string()))
}
/// Assign a task to an agent (status → Claimed).
pub fn claim_task(&self, task_id: &str, agent_id: &str) -> Result<Task> {
let now = Utc::now().to_rfc3339();
let rows = self.conn.execute(
"UPDATE tasks SET agent_id = ?1, status = ?2, updated_at = ?3 WHERE id = ?4",
params![agent_id, TaskStatus::Claimed.as_str(), now, task_id],
)?;
if rows == 0 {
return Err(StoreError::NotFound(task_id.to_string()));
}
self.get_task(task_id)?
.ok_or_else(|| StoreError::NotFound(task_id.to_string()))
}
/// Get a single task by ID.
pub fn get_task(&self, task_id: &str) -> Result<Option<Task>> {
let mut stmt = self.conn.prepare(
"SELECT id, agent_id, status, title, description, created_at, updated_at
FROM tasks WHERE id = ?1",
)?;
let mut rows = stmt.query_map(params![task_id], |row| {
let status_str: String = row.get(2)?;
Ok(Task {
id: row.get(0)?,
agent_id: row.get(1)?,
status: TaskStatus::parse_status(&status_str).unwrap_or(TaskStatus::Queued),
title: row.get(3)?,
description: row.get(4)?,
created_at: row.get(5)?,
updated_at: row.get(6)?,
})
})?;
match rows.next() {
Some(Ok(task)) => Ok(Some(task)),
Some(Err(e)) => Err(StoreError::Database(e)),
None => Ok(None),
}
}
/// List tasks, optionally filtered by status.
pub fn list_tasks(&self, status_filter: Option<TaskStatus>) -> Result<Vec<Task>> {
let sql = if status_filter.is_some() {
"SELECT id, agent_id, status, title, description, created_at, updated_at
FROM tasks WHERE status = ?1 ORDER BY created_at DESC"
} else {
"SELECT id, agent_id, status, title, description, created_at, updated_at
FROM tasks ORDER BY created_at DESC"
};
let mut stmt = self.conn.prepare(sql)?;
let rows = if let Some(status) = status_filter {
stmt.query_map(params![status.as_str()], row_to_task)?
} else {
stmt.query_map([], row_to_task)?
};
let mut tasks = Vec::new();
for row in rows {
tasks.push(row?);
}
Ok(tasks)
}
// ------------------------------------------------------------------
// Agents
// ------------------------------------------------------------------
/// Register a new agent.
pub fn register_agent(&self, name: &str, capabilities: serde_json::Value) -> Result<Agent> {
let id = uuid::Uuid::new_v4().to_string();
let now = Utc::now().to_rfc3339();
let caps_json = serde_json::to_string(&capabilities)?;
self.conn.execute(
"INSERT INTO agents (id, name, capabilities, status, created_at)
VALUES (?1, ?2, ?3, 'active', ?4)",
params![id, name, caps_json, now],
)?;
Ok(Agent {
id,
name: name.to_string(),
capabilities,
status: "active".to_string(),
created_at: now,
})
}
/// Get an agent by ID.
pub fn get_agent(&self, agent_id: &str) -> Result<Option<Agent>> {
let mut stmt = self.conn.prepare(
"SELECT id, name, capabilities, status, created_at FROM agents WHERE id = ?1",
)?;
let mut rows = stmt.query_map(params![agent_id], |row| {
let caps_str: String = row.get(2)?;
Ok(Agent {
id: row.get(0)?,
name: row.get(1)?,
capabilities: serde_json::from_str(&caps_str).unwrap_or(serde_json::Value::Null),
status: row.get(3)?,
created_at: row.get(4)?,
})
})?;
match rows.next() {
Some(Ok(agent)) => Ok(Some(agent)),
Some(Err(e)) => Err(StoreError::Database(e)),
None => Ok(None),
}
}
/// List all registered agents.
pub fn list_agents(&self) -> Result<Vec<Agent>> {
let mut stmt = self.conn.prepare(
"SELECT id, name, capabilities, status, created_at FROM agents ORDER BY name",
)?;
let rows = stmt.query_map([], |row| {
let caps_str: String = row.get(2)?;
Ok(Agent {
id: row.get(0)?,
name: row.get(1)?,
capabilities: serde_json::from_str(&caps_str).unwrap_or(serde_json::Value::Null),
status: row.get(3)?,
created_at: row.get(4)?,
})
})?;
let mut agents = Vec::new();
for row in rows {
agents.push(row?);
}
Ok(agents)
}
/// Update agent status (active/idle/offline).
pub fn set_agent_status(&self, agent_id: &str, status: &str) -> Result<()> {
let rows = self.conn.execute(
"UPDATE agents SET status = ?1 WHERE id = ?2",
params![status, agent_id],
)?;
if rows == 0 {
return Err(StoreError::NotFound(agent_id.to_string()));
}
Ok(())
}
// ------------------------------------------------------------------
// Skills
// ------------------------------------------------------------------
/// Register a team skill.
pub fn register_skill(
&self,
name: &str,
description: Option<&str>,
category: Option<&str>,
) -> Result<Skill> {
let id = uuid::Uuid::new_v4().to_string();
let now = Utc::now().to_rfc3339();
self.conn.execute(
"INSERT INTO skills (id, name, description, category, created_at)
VALUES (?1, ?2, ?3, ?4, ?5)",
params![id, name, description, category, now],
)?;
Ok(Skill {
id,
name: name.to_string(),
description: description.map(str::to_string),
category: category.map(str::to_string),
created_at: now,
})
}
/// List all registered skills.
pub fn list_skills(&self) -> Result<Vec<Skill>> {
let mut stmt = self.conn.prepare(
"SELECT id, name, description, category, created_at FROM skills ORDER BY name",
)?;
let rows = stmt.query_map([], |row| {
Ok(Skill {
id: row.get(0)?,
name: row.get(1)?,
description: row.get(2)?,
category: row.get(3)?,
created_at: row.get(4)?,
})
})?;
let mut skills = Vec::new();
for row in rows {
skills.push(row?);
}
Ok(skills)
}
// ------------------------------------------------------------------
// Backup / export
// ------------------------------------------------------------------
/// Hot binary backup via `VACUUM INTO` — point-in-time, no stop required.
pub fn backup(&self, dest: impl AsRef<Path>) -> Result<()> {
self.conn.execute_batch(&format!(
"VACUUM INTO '{}';",
dest.as_ref().display().to_string().replace('\'', "''")
))?;
Ok(())
}
/// Export the full store as a JSON object suitable for dual-run parity diffs.
pub fn export_json(&self) -> Result<serde_json::Value> {
Ok(serde_json::json!({
"tasks": self.list_tasks(None)?,
"agents": self.list_agents()?,
"skills": self.list_skills()?,
"exported_at": Utc::now().to_rfc3339(),
}))
}
// ------------------------------------------------------------------
// Migrations
// ------------------------------------------------------------------
fn run_migrations(&self) -> Result<()> {
self.conn.execute_batch(schema::SCHEMA_SQL)?;
Ok(())
}
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
fn row_to_task(row: &rusqlite::Row<'_>) -> rusqlite::Result<Task> {
let status_str: String = row.get(2)?;
Ok(Task {
id: row.get(0)?,
agent_id: row.get(1)?,
status: TaskStatus::parse_status(&status_str).unwrap_or(TaskStatus::Queued),
title: row.get(3)?,
description: row.get(4)?,
created_at: row.get(5)?,
updated_at: row.get(6)?,
})
}
// ---------------------------------------------------------------------------
// Default database path (platform-aware)
// ---------------------------------------------------------------------------
/// Returns the default database path for the current platform.
///
/// - `COLIBRI_DB_PATH` env var wins if set.
/// - FreeBSD: `/var/db/colibri/colibri.sqlite`
/// - Linux/macOS: `$XDG_DATA_HOME/colibri/colibri.sqlite`
pub fn default_db_path() -> PathBuf {
if let Ok(p) = std::env::var("COLIBRI_DB_PATH") {
if !p.trim().is_empty() {
return PathBuf::from(p);
}
}
#[cfg(target_os = "freebsd")]
{
PathBuf::from("/var/db/colibri/colibri.sqlite")
}
#[cfg(not(target_os = "freebsd"))]
{
std::env::var("XDG_DATA_HOME")
.ok()
.filter(|v| !v.trim().is_empty())
.map(PathBuf::from)
.or_else(|| {
std::env::var("HOME")
.ok()
.map(|h| PathBuf::from(h).join(".local").join("share"))
})
.unwrap_or_else(|| PathBuf::from("/tmp"))
.join("colibri")
.join("colibri.sqlite")
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_task_lifecycle() {
let store = Store::open_memory().unwrap();
// Register an agent first (FK constraint)
let agent = store
.register_agent("codex", serde_json::json!(["code", "rust"]))
.unwrap();
// Create
let task = store.create_task("Test task", Some("a test task")).unwrap();
assert_eq!(task.status, TaskStatus::Queued);
assert_eq!(task.title, "Test task");
// Claim
let task = store.claim_task(&task.id, &agent.id).unwrap();
assert_eq!(task.status, TaskStatus::Claimed);
assert_eq!(task.agent_id.as_deref(), Some(agent.id.as_str()));
// Start
let task = store
.transition_task(&task.id, TaskStatus::Started)
.unwrap();
assert_eq!(task.status, TaskStatus::Started);
// Done
let task = store.transition_task(&task.id, TaskStatus::Done).unwrap();
assert_eq!(task.status, TaskStatus::Done);
}
#[test]
fn test_transition_not_found() {
let store = Store::open_memory().unwrap();
let result = store.transition_task("nonexistent", TaskStatus::Started);
assert!(result.is_err());
}
#[test]
fn test_list_tasks_filtered() {
let store = Store::open_memory().unwrap();
store.create_task("Task A", None).unwrap();
let task_b = store.create_task("Task B", None).unwrap();
store.transition_task(&task_b.id, TaskStatus::Done).unwrap();
let queued = store.list_tasks(Some(TaskStatus::Queued)).unwrap();
assert_eq!(queued.len(), 1);
assert_eq!(queued[0].title, "Task A");
let done = store.list_tasks(Some(TaskStatus::Done)).unwrap();
assert_eq!(done.len(), 1);
assert_eq!(done[0].title, "Task B");
let all = store.list_tasks(None).unwrap();
assert_eq!(all.len(), 2);
}
#[test]
fn test_agent_registry() {
let store = Store::open_memory().unwrap();
let caps = serde_json::json!(["code", "rust", "freebsd"]);
let agent = store.register_agent("codex", caps.clone()).unwrap();
assert_eq!(agent.name, "codex");
assert_eq!(agent.status, "active");
assert_eq!(agent.capabilities, caps);
let got = store.get_agent(&agent.id).unwrap().unwrap();
assert_eq!(got.name, "codex");
store.set_agent_status(&agent.id, "offline").unwrap();
let got = store.get_agent(&agent.id).unwrap().unwrap();
assert_eq!(got.status, "offline");
}
#[test]
fn test_skills_catalog() {
let store = Store::open_memory().unwrap();
let skill = store
.register_skill(
"ssh-agent-setup",
Some("SSH agent persistence"),
Some("devops"),
)
.unwrap();
assert_eq!(skill.name, "ssh-agent-setup");
assert_eq!(skill.description.as_deref(), Some("SSH agent persistence"));
let all = store.list_skills().unwrap();
assert_eq!(all.len(), 1);
}
#[test]
fn test_export_json() {
let store = Store::open_memory().unwrap();
store.create_task("Export test", None).unwrap();
store
.register_agent("test-agent", serde_json::json!(["test"]))
.unwrap();
store
.register_skill("test-skill", None, Some("test"))
.unwrap();
let json = store.export_json().unwrap();
assert!(json["tasks"].as_array().unwrap().len() == 1);
assert!(json["agents"].as_array().unwrap().len() == 1);
assert!(json["skills"].as_array().unwrap().len() == 1);
assert!(json["exported_at"].is_string());
}
#[test]
fn test_default_db_path_freebsd() {
// On Linux, this returns the XDG path. On FreeBSD, /var/db/colibri.
let path = default_db_path();
assert!(path.to_string_lossy().contains("colibri.sqlite"));
}
}

View file

@ -0,0 +1,42 @@
/// SQL schema for the Colibri coordination store.
///
/// All tables use `IF NOT EXISTS` — safe to run on every open (idempotent).
/// No downward migrations yet; schema evolution adds columns/tables additively.
pub const SCHEMA_SQL: &str = "
-- Tasks: the coordination board's core lifecycle
CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY NOT NULL,
agent_id TEXT,
status TEXT NOT NULL DEFAULT 'queued'
CHECK(status IN ('queued','claimed','started','done','failed')),
title TEXT NOT NULL,
description TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY (agent_id) REFERENCES agents(id)
);
CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status);
CREATE INDEX IF NOT EXISTS idx_tasks_agent ON tasks(agent_id);
-- Agents: registered teammates with capability profiles
CREATE TABLE IF NOT EXISTS agents (
id TEXT PRIMARY KEY NOT NULL,
name TEXT NOT NULL UNIQUE,
capabilities TEXT NOT NULL DEFAULT '[]',
status TEXT NOT NULL DEFAULT 'active'
CHECK(status IN ('active','idle','offline')),
created_at TEXT NOT NULL
);
-- Skills: team-wide compounding catalog
CREATE TABLE IF NOT EXISTS skills (
id TEXT PRIMARY KEY NOT NULL,
name TEXT NOT NULL UNIQUE,
description TEXT,
category TEXT,
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_skills_category ON skills(category);
";