Add colibri-daemon: always-on Rust agent runtime scaffold (Sam & Hermes)

Scaffolds the daemon crate that replaces agent-runner.ts,
agent-session.ts, and session-compaction.ts from Clawdie-AI TS.

Crate: crates/colibri-daemon
- src/main.rs — tokio::main with graceful shutdown
- src/config.rs — DaemonConfig from env (DEEPSEEK_API_KEY, etc.)
- src/daemon.rs — main loop: heartbeat, session rotation, memory handoff
- src/session.rs — JSONL session lifecycle (write/read/prune)
- src/spawner.rs — agent subprocess management (pi spawn)
- src/socket.rs — Unix socket API for Herdr glasspane
- 4 tests passing, cargo check green
This commit is contained in:
123kupola 2026-05-27 02:32:17 +02:00
parent 9ffdde0579
commit 9f6af19917
10 changed files with 2336 additions and 3 deletions

519
Cargo.lock generated
View file

@ -2,6 +2,15 @@
# It is not intended for manual editing.
version = 4
[[package]]
name = "aho-corasick"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301"
dependencies = [
"memchr",
]
[[package]]
name = "android_system_properties"
version = "0.1.5"
@ -29,6 +38,17 @@ version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2032f911046de80f0a198e0901378627c33f59ea0ac00e363d481118bd70a53"
[[package]]
name = "backon"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cffb0e931875b666fc4fcb20fee52e9bbd1ef836fd9e9e04ec21555f9f85f7ef"
dependencies = [
"fastrand",
"gloo-timers",
"tokio",
]
[[package]]
name = "base64"
version = "0.22.1"
@ -117,6 +137,28 @@ dependencies = [
"serde_json",
]
[[package]]
name = "colibri-daemon"
version = "0.0.1"
dependencies = [
"backon",
"chrono",
"colibri-contracts",
"colibri-deepseek",
"colibri-glasspane",
"colibri-runtime",
"dashmap",
"reqwest",
"serde",
"serde_json",
"thiserror 2.0.18",
"tokio",
"tokio-util",
"tracing",
"tracing-subscriber",
"uuid",
]
[[package]]
name = "colibri-deepseek"
version = "0.0.1"
@ -156,6 +198,26 @@ version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
[[package]]
name = "crossbeam-utils"
version = "0.8.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
[[package]]
name = "dashmap"
version = "6.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6361d5c062261c78a176addb82d4c821ae42bed6089de0e12603cd25de2059c"
dependencies = [
"cfg-if",
"crossbeam-utils",
"hashbrown 0.14.5",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]]
name = "displaydoc"
version = "0.2.5"
@ -179,6 +241,28 @@ version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75b325c5dbd37f80359721ad39aca5a29fb04c89279657cffdda8736d0c0b9d2"
[[package]]
name = "equivalent"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f"
[[package]]
name = "errno"
version = "0.3.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb"
dependencies = [
"libc",
"windows-sys 0.61.2",
]
[[package]]
name = "fastrand"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6"
[[package]]
name = "filedescriptor"
version = "0.8.3"
@ -196,6 +280,12 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582"
[[package]]
name = "foldhash"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
[[package]]
name = "form_urlencoded"
version = "1.2.2"
@ -220,6 +310,12 @@ version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d"
[[package]]
name = "futures-sink"
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893"
[[package]]
name = "futures-task"
version = "0.3.32"
@ -260,11 +356,63 @@ dependencies = [
"cfg-if",
"js-sys",
"libc",
"r-efi",
"r-efi 5.3.0",
"wasip2",
"wasm-bindgen",
]
[[package]]
name = "getrandom"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555"
dependencies = [
"cfg-if",
"libc",
"r-efi 6.0.0",
"wasip2",
"wasip3",
]
[[package]]
name = "gloo-timers"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994"
dependencies = [
"futures-channel",
"futures-core",
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "hashbrown"
version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
[[package]]
name = "hashbrown"
version = "0.15.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1"
dependencies = [
"foldhash",
]
[[package]]
name = "hashbrown"
version = "0.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed5909b6e89a2db4456e54cd5f673791d7eca6732202bbf2a9cc504fe2f9b84a"
[[package]]
name = "heck"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]]
name = "http"
version = "1.4.1"
@ -469,6 +617,12 @@ dependencies = [
"zerovec",
]
[[package]]
name = "id-arena"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954"
[[package]]
name = "idna"
version = "1.1.0"
@ -490,6 +644,18 @@ dependencies = [
"icu_properties",
]
[[package]]
name = "indexmap"
version = "2.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d466e9454f08e4a911e14806c24e16fba1b4c121d1ea474396f396069cf949d9"
dependencies = [
"equivalent",
"hashbrown 0.17.1",
"serde",
"serde_core",
]
[[package]]
name = "ipnet"
version = "2.12.0"
@ -520,6 +686,12 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
[[package]]
name = "leb128fmt"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]]
name = "libc"
version = "0.2.186"
@ -532,6 +704,15 @@ version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92daf443525c4cce67b150400bc2316076100ce0b3686209eb8cf3c31612e6f0"
[[package]]
name = "lock_api"
version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "224399e74b87b5f3557511d98dff8b14089b3dadafcab6bb93eab67d3aace965"
dependencies = [
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.30"
@ -544,6 +725,15 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154"
[[package]]
name = "matchers"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9"
dependencies = [
"regex-automata",
]
[[package]]
name = "memchr"
version = "2.8.0"
@ -573,6 +763,15 @@ dependencies = [
"libc",
]
[[package]]
name = "nu-ansi-term"
version = "0.50.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5"
dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "num-traits"
version = "0.2.19"
@ -588,6 +787,29 @@ version = "1.21.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50"
[[package]]
name = "parking_lot"
version = "0.12.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93857453250e3077bd71ff98b6a65ea6621a19bb0f559a85248955ac12c45a1a"
dependencies = [
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.9.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-link",
]
[[package]]
name = "percent-encoding"
version = "2.3.2"
@ -639,6 +861,16 @@ dependencies = [
"zerocopy",
]
[[package]]
name = "prettyplease"
version = "0.2.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b"
dependencies = [
"proc-macro2",
"syn",
]
[[package]]
name = "proc-macro2"
version = "1.0.106"
@ -718,6 +950,12 @@ version = "5.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"
[[package]]
name = "r-efi"
version = "6.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf"
[[package]]
name = "rand"
version = "0.9.4"
@ -747,6 +985,32 @@ dependencies = [
"getrandom 0.3.4",
]
[[package]]
name = "redox_syscall"
version = "0.5.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d"
dependencies = [
"bitflags 2.11.1",
]
[[package]]
name = "regex-automata"
version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
]
[[package]]
name = "regex-syntax"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a"
[[package]]
name = "reqwest"
version = "0.12.28"
@ -852,6 +1116,18 @@ version = "1.0.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f"
[[package]]
name = "scopeguard"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "semver"
version = "1.0.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a7852d02fc848982e0c167ef163aaff9cd91dc640ba85e263cb1ce46fae51cd"
[[package]]
name = "serde"
version = "1.0.228"
@ -918,6 +1194,15 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "sharded-slab"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
dependencies = [
"lazy_static",
]
[[package]]
name = "shared_library"
version = "0.1.9"
@ -940,6 +1225,16 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "signal-hook-registry"
version = "1.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b"
dependencies = [
"errno",
"libc",
]
[[package]]
name = "slab"
version = "0.4.12"
@ -1045,6 +1340,15 @@ dependencies = [
"syn",
]
[[package]]
name = "thread_local"
version = "1.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185"
dependencies = [
"cfg-if",
]
[[package]]
name = "tinystr"
version = "0.8.3"
@ -1079,7 +1383,9 @@ dependencies = [
"bytes",
"libc",
"mio",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.61.2",
@ -1106,6 +1412,19 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-util"
version = "0.7.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ae9cec805b01e8fc3fd2fe289f89149a9b66dd16786abd8b19cfa7b48cb0098"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"pin-project-lite",
"tokio",
]
[[package]]
name = "tower"
version = "0.5.3"
@ -1158,9 +1477,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100"
dependencies = [
"pin-project-lite",
"tracing-attributes",
"tracing-core",
]
[[package]]
name = "tracing-attributes"
version = "0.1.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tracing-core"
version = "0.1.36"
@ -1168,6 +1499,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a"
dependencies = [
"once_cell",
"valuable",
]
[[package]]
name = "tracing-log"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
dependencies = [
"log",
"once_cell",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex-automata",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
]
[[package]]
@ -1182,6 +1543,12 @@ version = "1.0.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75"
[[package]]
name = "unicode-xid"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853"
[[package]]
name = "untrusted"
version = "0.9.0"
@ -1206,6 +1573,23 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
[[package]]
name = "uuid"
version = "1.23.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddd74a9687298c6858e9b88ec8935ec45d22e8fd5e6394fa1bd4e99a87789c76"
dependencies = [
"getrandom 0.4.2",
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "valuable"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
[[package]]
name = "want"
version = "0.3.1"
@ -1227,7 +1611,16 @@ version = "1.0.3+wasi-0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20064672db26d7cdc89c7798c48a0fdfac8213434a1186e5ef29fd560ae223d6"
dependencies = [
"wit-bindgen",
"wit-bindgen 0.57.1",
]
[[package]]
name = "wasip3"
version = "0.4.0+wasi-0.3.0-rc-2026-01-06"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5"
dependencies = [
"wit-bindgen 0.51.0",
]
[[package]]
@ -1285,6 +1678,40 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "wasm-encoder"
version = "0.244.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "990065f2fe63003fe337b932cfb5e3b80e0b4d0f5ff650e6985b1048f62c8319"
dependencies = [
"leb128fmt",
"wasmparser",
]
[[package]]
name = "wasm-metadata"
version = "0.244.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909"
dependencies = [
"anyhow",
"indexmap",
"wasm-encoder",
"wasmparser",
]
[[package]]
name = "wasmparser"
version = "0.244.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe"
dependencies = [
"bitflags 2.11.1",
"hashbrown 0.15.5",
"indexmap",
"semver",
]
[[package]]
name = "web-sys"
version = "0.3.99"
@ -1560,12 +1987,100 @@ dependencies = [
"winapi",
]
[[package]]
name = "wit-bindgen"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5"
dependencies = [
"wit-bindgen-rust-macro",
]
[[package]]
name = "wit-bindgen"
version = "0.57.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ebf944e87a7c253233ad6766e082e3cd714b5d03812acc24c318f549614536e"
[[package]]
name = "wit-bindgen-core"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc"
dependencies = [
"anyhow",
"heck",
"wit-parser",
]
[[package]]
name = "wit-bindgen-rust"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21"
dependencies = [
"anyhow",
"heck",
"indexmap",
"prettyplease",
"syn",
"wasm-metadata",
"wit-bindgen-core",
"wit-component",
]
[[package]]
name = "wit-bindgen-rust-macro"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c0f9bfd77e6a48eccf51359e3ae77140a7f50b1e2ebfe62422d8afdaffab17a"
dependencies = [
"anyhow",
"prettyplease",
"proc-macro2",
"quote",
"syn",
"wit-bindgen-core",
"wit-bindgen-rust",
]
[[package]]
name = "wit-component"
version = "0.244.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2"
dependencies = [
"anyhow",
"bitflags 2.11.1",
"indexmap",
"log",
"serde",
"serde_derive",
"serde_json",
"wasm-encoder",
"wasm-metadata",
"wasmparser",
"wit-parser",
]
[[package]]
name = "wit-parser"
version = "0.244.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736"
dependencies = [
"anyhow",
"id-arena",
"indexmap",
"log",
"semver",
"serde",
"serde_derive",
"serde_json",
"unicode-xid",
"wasmparser",
]
[[package]]
name = "writeable"
version = "0.6.3"

View file

@ -1,5 +1,5 @@
[workspace]
members = ["crates/colibri-contracts", "crates/colibri-deepseek", "crates/colibri-runtime", "crates/colibri-glasspane"]
members = ["crates/colibri-contracts", "crates/colibri-deepseek", "crates/colibri-runtime", "crates/colibri-glasspane", "crates/colibri-daemon"]
[package]
name = "colibri"

View file

@ -0,0 +1,24 @@
[package]
name = "colibri-daemon"
version = "0.0.1"
edition = "2021"
license = "AGPL-3.0-only"
description = "Always-on Rust service: agent session lifecycle, subprocess spawner, Herdr Unix socket API"
[dependencies]
colibri-contracts = { path = "../colibri-contracts" }
colibri-deepseek = { path = "../colibri-deepseek" }
colibri-glasspane = { path = "../colibri-glasspane" }
colibri-runtime = { path = "../colibri-runtime" }
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.7", features = ["codec"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] }
chrono = { version = "0.4", default-features = false, features = ["clock"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
dashmap = "6"
backon = "1"
uuid = { version = "1", features = ["v4"] }
thiserror = "2"

View file

@ -0,0 +1,180 @@
//! Daemon configuration from environment variables.
//!
//! Replaces TypeScript `process.env` + zod validation scattered across
//! agent-runner.ts and agent-session.ts.
use std::path::PathBuf;
use serde::{Deserialize, Serialize};
// ---------------------------------------------------------------------------
// Daemon configuration
// ---------------------------------------------------------------------------
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DaemonConfig {
/// Directory for session JSONL files and state.
pub data_dir: PathBuf,
/// Path for the Herdr Unix socket.
pub socket_path: PathBuf,
/// Maximum bytes per session before automatic rollover.
pub session_max_bytes: u64,
/// DeepSeek API key for provider routing.
pub deepseek_api_key: Option<String>,
/// DeepSeek API endpoint.
pub deepseek_endpoint: String,
/// DeepSeek model string.
pub deepseek_model: String,
/// OpenRouter API key for fallback routing.
pub openrouter_api_key: Option<String>,
/// OpenRouter API endpoint.
pub openrouter_endpoint: String,
/// Anthropic API key for fallback routing.
pub anthropic_api_key: Option<String>,
/// Anthropic API endpoint.
pub anthropic_endpoint: String,
/// Host identifier.
pub host: String,
/// Maximum context window tokens (for compaction trigger).
pub max_context_tokens: u64,
/// Compaction target: max turns to keep uncompacted.
pub max_uncompacted_turns: usize,
}
impl DaemonConfig {
/// Build configuration from environment variables with sensible defaults.
pub fn from_env() -> Self {
let host = std::env::var("COLIBRI_HOST")
.or_else(|_| std::env::var("HOSTNAME"))
.ok()
.filter(|v| !v.trim().is_empty())
.unwrap_or_else(|| "unknown".to_string());
let data_dir = std::env::var("COLIBRI_DAEMON_DATA_DIR")
.map(PathBuf::from)
.unwrap_or_else(|_| {
dirs_data().unwrap_or_else(|| PathBuf::from("/tmp/colibri-daemon"))
});
let socket_path = std::env::var("COLIBRI_DAEMON_SOCKET")
.map(PathBuf::from)
.unwrap_or_else(|_| data_dir.join("colibri-daemon.sock"));
Self {
data_dir,
socket_path,
session_max_bytes: env_parse("COLIBRI_SESSION_MAX_BYTES").unwrap_or(2_000_000),
deepseek_api_key: nonempty_env("DEEPSEEK_API_KEY"),
deepseek_endpoint: std::env::var("DEEPSEEK_ENDPOINT")
.unwrap_or_else(|_| "https://api.deepseek.com/chat/completions".to_string()),
deepseek_model: std::env::var("DEEPSEEK_MODEL")
.unwrap_or_else(|_| "deepseek-chat".to_string()),
openrouter_api_key: nonempty_env("OPENROUTER_API_KEY"),
openrouter_endpoint: std::env::var("OPENROUTER_ENDPOINT")
.unwrap_or_else(|_| "https://openrouter.ai/api/v1/chat/completions".to_string()),
anthropic_api_key: nonempty_env("ANTHROPIC_API_KEY"),
anthropic_endpoint: std::env::var("ANTHROPIC_ENDPOINT")
.unwrap_or_else(|_| "https://api.anthropic.com/v1/messages".to_string()),
host,
max_context_tokens: env_parse("COLIBRI_MAX_CONTEXT_TOKENS").unwrap_or(128_000),
max_uncompacted_turns: env_parse("COLIBRI_MAX_UNCOMPACTED_TURNS").unwrap_or(20),
}
}
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
fn nonempty_env(name: &str) -> Option<String> {
std::env::var(name).ok().filter(|v| !v.trim().is_empty())
}
fn env_parse<T: std::str::FromStr>(name: &str) -> Option<T> {
std::env::var(name).ok().and_then(|v| v.parse().ok())
}
fn dirs_data() -> Option<PathBuf> {
std::env::var("XDG_DATA_HOME")
.ok()
.filter(|v| !v.trim().is_empty())
.map(PathBuf::from)
.or_else(|| {
std::env::var("HOME")
.ok()
.map(|h| PathBuf::from(h).join(".local").join("share").join("colibri-daemon"))
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_config_defaults_from_empty_env() {
// Safety: temporarily clear relevant env vars
let vars = [
"COLIBRI_HOST",
"HOSTNAME",
"COLIBRI_DAEMON_DATA_DIR",
"XDG_DATA_HOME",
"COLIBRI_DAEMON_SOCKET",
"COLIBRI_SESSION_MAX_BYTES",
"DEEPSEEK_API_KEY",
"DEEPSEEK_ENDPOINT",
"DEEPSEEK_MODEL",
"OPENROUTER_API_KEY",
"OPENROUTER_ENDPOINT",
"ANTHROPIC_API_KEY",
"ANTHROPIC_ENDPOINT",
"COLIBRI_MAX_CONTEXT_TOKENS",
"COLIBRI_MAX_UNCOMPACTED_TURNS",
];
let saved: Vec<(&str, Option<String>)> = vars
.iter()
.map(|&name| (name, std::env::var(name).ok()))
.collect();
for &name in &vars {
std::env::remove_var(name);
}
let config = DaemonConfig::from_env();
// Restore saved env
for (name, val) in saved {
if let Some(v) = val {
std::env::set_var(name, v);
} else {
std::env::remove_var(name);
}
}
assert_eq!(config.host, "unknown");
assert!(config.data_dir.to_string_lossy().contains("colibri-daemon"));
assert_eq!(config.session_max_bytes, 2_000_000);
assert_eq!(config.deepseek_model, "deepseek-chat");
assert_eq!(config.max_context_tokens, 128_000);
assert_eq!(config.max_uncompacted_turns, 20);
assert!(config.deepseek_api_key.is_none());
assert!(config.openrouter_api_key.is_none());
assert!(config.anthropic_api_key.is_none());
}
#[test]
fn test_config_from_env_vars() {
std::env::set_var("COLIBRI_HOST", "test-host");
std::env::set_var("DEEPSEEK_API_KEY", "sk-test");
std::env::set_var("DEEPSEEK_MODEL", "deepseek-v4");
std::env::set_var("COLIBRI_MAX_CONTEXT_TOKENS", "64000");
std::env::set_var("COLIBRI_SESSION_MAX_BYTES", "500000");
let config = DaemonConfig::from_env();
assert_eq!(config.host, "test-host");
assert_eq!(config.deepseek_api_key.as_deref(), Some("sk-test"));
assert_eq!(config.deepseek_model, "deepseek-v4");
assert_eq!(config.max_context_tokens, 64000);
assert_eq!(config.session_max_bytes, 500000);
}
}

View file

@ -0,0 +1,340 @@
//! Main daemon loop — heartbeat, task polling, session rotation, memory handoff.
//!
//! Replaces control-plane loop logic scattered across agent-runner.ts,
//! agent-session.ts, and session-compaction.ts in Clawdie-AI TypeScript.
use std::sync::Arc;
use std::time::Duration;
use dashmap::DashMap;
use tokio::sync::broadcast;
use tracing::{debug, error, info, warn};
use crate::config::DaemonConfig;
use crate::session::Session;
use crate::spawner::{AgentHandle, Spawner};
// ---------------------------------------------------------------------------
// Shared daemon state
// ---------------------------------------------------------------------------
/// Shared state accessible to all daemon subsystems.
pub struct DaemonState {
pub config: DaemonConfig,
/// Active agent sessions keyed by session ID.
pub sessions: DashMap<String, Session>,
/// Running agent subprocesses keyed by agent ID.
pub agents: DashMap<String, AgentHandle>,
/// Shutdown signal: on drop or ctrl_c, all listeners abort.
pub shutdown_tx: broadcast::Sender<()>,
pub shutdown_rx: broadcast::Receiver<()>,
}
impl DaemonState {
pub fn new(config: DaemonConfig) -> Self {
let (shutdown_tx, shutdown_rx) = broadcast::channel(16);
Self {
config,
sessions: DashMap::new(),
agents: DashMap::new(),
shutdown_tx,
shutdown_rx,
}
}
}
pub type SharedState = Arc<DaemonState>;
// ---------------------------------------------------------------------------
// Daemon loop
// ---------------------------------------------------------------------------
/// Configuration for the daemon background loop.
#[derive(Debug, Clone)]
pub struct DaemonLoopConfig {
/// Heartbeat interval (status log + stale agent detection).
pub heartbeat_interval: Duration,
/// Session rotation interval (compaction check).
pub session_rotation_interval: Duration,
/// Memory handoff interval (cross-agent coordination).
pub memory_handoff_interval: Duration,
/// Agent stall timeout: if an agent subprocess hasn't polled in this
/// long, flag it as stalled.
pub agent_stall_timeout: Duration,
}
impl Default for DaemonLoopConfig {
fn default() -> Self {
Self {
heartbeat_interval: Duration::from_secs(30),
session_rotation_interval: Duration::from_secs(60),
memory_handoff_interval: Duration::from_secs(120),
agent_stall_timeout: Duration::from_secs(300),
}
}
}
/// Run the main daemon background loop.
///
/// Spawned as a tokio task. Periodically:
/// 1. Heartbeat: logs status, detects stalled agents
/// 2. Session rotation: checks sessions for compaction/rollover
/// 3. Memory handoff: coordinates shared context across active agents
///
/// Returns when the shutdown signal is received.
pub async fn run_loop(
state: SharedState,
loop_config: DaemonLoopConfig,
mut shutdown_rx: broadcast::Receiver<()>,
) {
let mut heartbeat_tick = tokio::time::interval(loop_config.heartbeat_interval);
let mut rotation_tick = tokio::time::interval(loop_config.session_rotation_interval);
let mut handoff_tick = tokio::time::interval(loop_config.memory_handoff_interval);
// Suppress the initial burst by skipping the first tick
heartbeat_tick.tick().await; // consume immediate tick
rotation_tick.tick().await;
handoff_tick.tick().await;
info!(
heartbeat_secs = loop_config.heartbeat_interval.as_secs(),
rotation_secs = loop_config.session_rotation_interval.as_secs(),
handoff_secs = loop_config.memory_handoff_interval.as_secs(),
"daemon background loop started"
);
loop {
tokio::select! {
_ = heartbeat_tick.tick() => {
heartbeat(&state, loop_config.agent_stall_timeout).await;
}
_ = rotation_tick.tick() => {
session_rotation(&state).await;
}
_ = handoff_tick.tick() => {
memory_handoff(&state).await;
}
_ = shutdown_rx.recv() => {
info!("daemon loop received shutdown signal");
break;
}
}
}
info!("daemon background loop exited");
}
// ---------------------------------------------------------------------------
// Heartbeat
// ---------------------------------------------------------------------------
/// Log daemon status and detect stalled agent subprocesses.
async fn heartbeat(state: &SharedState, _stall_timeout: Duration) {
let session_count = state.sessions.len();
let agent_count = state.agents.len();
debug!(
sessions = session_count,
agents = agent_count,
"daemon heartbeat"
);
// Check for stalled agent subprocesses
let mut stalled: Vec<String> = Vec::new();
for entry in state.agents.iter() {
let handle = entry.value();
if let Some(status) = handle.poll_exit().await {
warn!(
agent_id = %handle.id,
exit_status = ?status,
"agent subprocess exited"
);
// If the process exited with an error, mark it
if !status.success() {
stalled.push(handle.id.clone());
}
}
}
// Clean up stalled/stopped agents from the registry
// (keep them for a grace period so Herdr can query status)
if !stalled.is_empty() {
info!(stalled_agents = ?stalled, "detected stalled agents");
}
}
// ---------------------------------------------------------------------------
// Session rotation
// ---------------------------------------------------------------------------
/// Check and rotate sessions that exceed byte/turn thresholds.
async fn session_rotation(state: &SharedState) {
let mut compacted = 0usize;
let mut pruned = 0usize;
for entry in state.sessions.iter() {
let session = entry.value();
let (byte_count, turn_count) = {
let bc = session.byte_count().await;
let tc = session.turn_count().await;
(bc, tc)
};
let needs_compaction = byte_count > state.config.session_max_bytes
|| turn_count > state.config.max_uncompacted_turns;
if needs_compaction {
debug!(
session_id = %session.id,
byte_count = byte_count,
turn_count = turn_count,
session_max_bytes = state.config.session_max_bytes,
max_uncompacted_turns = state.config.max_uncompacted_turns,
"triggering session compaction"
);
match session.compact_oldest_turns().await {
Ok(()) => {
compacted += 1;
info!(
session_id = %session.id,
"session compaction completed"
);
}
Err(e) => {
error!(
session_id = %session.id,
error = %e,
"session compaction failed"
);
}
}
}
// If byte count is very large even after compaction, prune aggressively
let pruned_byte_count = session.byte_count().await;
if pruned_byte_count > state.config.session_max_bytes * 3 {
let keep_turns = (state.config.max_uncompacted_turns / 2).max(1);
match session.prune_to(keep_turns).await {
Ok(()) => {
pruned += 1;
info!(
session_id = %session.id,
keep_turns = keep_turns,
"session pruned to limit"
);
}
Err(e) => {
error!(
session_id = %session.id,
error = %e,
"session prune failed"
);
}
}
}
}
if compacted > 0 || pruned > 0 {
info!(
compacted = compacted,
pruned = pruned,
"session rotation complete"
);
}
}
// ---------------------------------------------------------------------------
// Memory handoff
// ---------------------------------------------------------------------------
/// Coordinate shared context across active agents.
///
/// This is the Rust equivalent of the TypeScript memory-handoff logic that
/// enables agents to share context/state without bloating individual sessions.
/// Currently a stub that logs state; full implementation requires an LLM
/// compaction pass to produce shared summaries.
async fn memory_handoff(state: &SharedState) {
let agent_count = state.agents.len();
let session_count = state.sessions.len();
if agent_count == 0 {
debug!("memory handoff skipped: no active agents");
return;
}
debug!(
agents = agent_count,
sessions = session_count,
"memory handoff tick"
);
// Future: produce a shared context snippet that all agents can reference,
// replacing the need for each agent to carry full context independently.
// This will call into the LLM (via colibri-deepseek) for summarization,
// then inject the summary into active sessions as a Compaction entry.
// Example placeholder for the real handoff:
// for entry in state.sessions.iter() {
// let session = entry.value();
// let turns = session.turns().await;
// if turns.len() > 10 {
// // Summarize and inject shared context
// debug!(session_id = %session.id, turns = turns.len(), "handoff candidate");
// }
// }
}
// ---------------------------------------------------------------------------
// Task polling
// ---------------------------------------------------------------------------
/// Poll for new tasks from the control plane.
///
/// This is the daemon's entry point for external task dispatch — e.g. a
/// control-plane operator (Herdr, web dashboard, cron) pushes a new agent run
/// request. The daemon picks it up and routes it through the spawner.
///
/// Currently a stub; will be wired to a task queue (filesystem, Redis, or
/// database-backed) once the control-plane API is finalized.
pub async fn poll_tasks(state: &SharedState) {
debug!("task polling tick");
// Placeholder: check a well-known directory or queue for pending task
// manifests, then call Spawner::spawn() for each.
let _spawner = Spawner::new(state.config.clone().into());
// Example:
// let tasks_dir = state.config.data_dir.join("tasks");
// if tasks_dir.exists() {
// let mut entries = tokio::fs::read_dir(&tasks_dir).await?;
// while let Some(entry) = entries.next_entry().await? {
// // parse task, spawn agent
// }
// }
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_daemon_loop_config_defaults() {
let config = DaemonLoopConfig::default();
assert_eq!(config.heartbeat_interval, Duration::from_secs(30));
assert_eq!(config.session_rotation_interval, Duration::from_secs(60));
assert_eq!(config.memory_handoff_interval, Duration::from_secs(120));
assert_eq!(config.agent_stall_timeout, Duration::from_secs(300));
}
#[test]
fn test_daemon_state_creation() {
let config = DaemonConfig::from_env();
let state = DaemonState::new(config);
assert_eq!(state.sessions.len(), 0);
assert_eq!(state.agents.len(), 0);
assert_eq!(state.shutdown_tx.receiver_count(), 1);
}
}

View file

@ -0,0 +1,77 @@
//! colibri-daemon — always-on Rust service replacing agent-runner.ts,
//! agent-session.ts, session-compaction.ts, and controlplane-*.ts runners.
//!
//! Core responsibilities:
//! - Session lifecycle (JSONL write/read/prune + context-window management)
//! - Agent subprocess spawner (provider/model config, retry/backoff)
//! - Unix socket API for Herdr operator dashboard
//! - Provider routing: DeepSeek primary, OpenRouter/Anthropic fallback
//! - DeepSeek cache discipline: immutable system prefix + appendable log +
//! volatile scratch (the 3-region prompt model)
pub mod config;
pub mod daemon;
pub mod session;
pub mod socket;
pub mod spawner;
// Re-exports for convenience
pub use config::DaemonConfig;
pub use daemon::{DaemonState, SharedState};
use serde::{Deserialize, Serialize};
// ---------------------------------------------------------------------------
// Wire types for the socket API
// ---------------------------------------------------------------------------
/// Inbound command from Herdr over the Unix socket.
#[derive(Debug, Deserialize)]
#[serde(tag = "cmd")]
pub enum HerdrCommand {
#[serde(rename = "status")]
Status,
#[serde(rename = "list-sessions")]
ListSessions,
#[serde(rename = "spawn-agent")]
SpawnAgent {
provider: String,
model: String,
session_id: Option<String>,
system_prompt: Option<String>,
},
#[serde(rename = "kill-agent")]
KillAgent { agent_id: String },
#[serde(rename = "get-session")]
GetSession { session_id: String },
#[serde(rename = "compact-session")]
CompactSession { session_id: String },
}
/// Outbound response to Herdr.
#[derive(Debug, Serialize)]
pub struct HerdrResponse {
pub ok: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub data: Option<serde_json::Value>,
}
impl HerdrResponse {
pub fn ok(data: serde_json::Value) -> Self {
Self {
ok: true,
error: None,
data: Some(data),
}
}
pub fn err(message: impl Into<String>) -> Self {
Self {
ok: false,
error: Some(message.into()),
data: None,
}
}
}

View file

@ -0,0 +1,101 @@
//! colibri-daemon — always-on Rust service for the Colibri control plane.
//!
//! Starts:
//! - Session manager (JSONL persistence + context window management)
//! - Agent spawner (subprocess lifecycle + provider routing)
//! - Herdr Unix socket API
//!
//! Graceful shutdown on SIGTERM/SIGINT (or Ctrl+C).
use std::sync::Arc;
use colibri_daemon::{session, socket, DaemonConfig, DaemonState, SharedState};
use tracing::info;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialise tracing
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.init();
info!("colibri-daemon starting up");
// Load configuration
let config = DaemonConfig::from_env();
info!(
host = %config.host,
data_dir = %config.data_dir.display(),
socket_path = %config.socket_path.display(),
session_max_bytes = config.session_max_bytes,
max_context_tokens = config.max_context_tokens,
"configuration loaded"
);
// Build shared state
let state: SharedState = Arc::new(DaemonState::new(config.clone()));
// Ensure data directories exist
tokio::fs::create_dir_all(&config.data_dir).await?;
tokio::fs::create_dir_all(config.data_dir.join("sessions")).await?;
// Bootstrap: load any existing sessions
if let Ok(mut entries) = tokio::fs::read_dir(config.data_dir.join("sessions")).await {
while let Ok(Some(entry)) = entries.next_entry().await {
let path = entry.path();
if path.extension().and_then(|e| e.to_str()) == Some("jsonl") {
if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
let session_id = stem.to_string();
let cfg = Arc::new(config.clone());
match session::Session::load(session_id.clone(), cfg) {
Ok(sess) => {
info!(session_id = %session_id, "loaded existing session");
state.sessions.insert(session_id, sess);
}
Err(e) => {
tracing::warn!(
session_id = %session_id,
error = %e,
"failed to load session file"
);
}
}
}
}
}
}
// Start the Herdr socket server
let socket_state = state.clone();
let socket_shutdown = state.shutdown_rx.resubscribe();
let socket_handle = tokio::spawn(async move {
socket::serve(socket_state, socket_shutdown).await;
});
// Listen for shutdown signals
let shutdown_state = state.clone();
tokio::spawn(async move {
tokio::signal::ctrl_c().await.ok();
info!("received interrupt signal, initiating graceful shutdown");
let _ = shutdown_state.shutdown_tx.send(());
// Give sub-tasks a moment to clean up
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
// Kill any running agent subprocesses
for entry in shutdown_state.agents.iter() {
let handle = entry.value();
info!(agent_id = %handle.id, "killing agent");
let _ = handle.kill().await;
}
});
// Wait for the socket server to finish
socket_handle.await?;
info!("colibri-daemon shut down cleanly");
Ok(())
}

View file

@ -0,0 +1,429 @@
//! Session lifecycle management — replaces agent-session.ts and
//! session-compaction.ts.
//!
//! Each session is an append-only JSONL file on disk with an in-memory
//! representation. The session enforces the DeepSeek 3-region prompt model:
//!
//! 1. Immutable system prefix (byte-stable, cacheable)
//! 2. Appendable conversation log (turns accumulate until compaction)
//! 3. Volatile scratch (discarded per-turn, never persisted)
//!
//! When total bytes exceed `session_max_bytes` or the number of uncompacted
//! turns exceeds `max_uncompacted_turns`, the oldest turns are compacted
//! (summarised) via LLM call. Compacted summaries are stored as special
//! "compaction" entries in the JSONL.
use std::collections::VecDeque;
use std::io::{BufRead, BufReader, Write};
use std::path::PathBuf;
use std::sync::Arc;
use chrono::Utc;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use thiserror::Error;
use tokio::sync::RwLock;
use crate::DaemonConfig;
// ---------------------------------------------------------------------------
// Turn / Entry types
// ---------------------------------------------------------------------------
/// A single entry in the session JSONL log.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum SessionEntry {
#[serde(rename = "system")]
System {
content: String,
#[serde(default = "default_true")]
cacheable: bool,
},
#[serde(rename = "user")]
User { content: String },
#[serde(rename = "assistant")]
Assistant { content: String },
#[serde(rename = "tool_call")]
ToolCall {
name: String,
arguments: Value,
},
#[serde(rename = "tool_result")]
ToolResult {
name: String,
result: Value,
},
#[serde(rename = "compaction")]
Compaction {
summary: String,
compacted_count: usize,
#[serde(flatten)]
extra: Value,
},
}
fn default_true() -> bool {
true
}
impl SessionEntry {
/// Approximate token count for this entry (character count / 3).
pub fn approx_tokens(&self) -> u64 {
let s = serde_json::to_string(self).unwrap_or_default();
(s.len() as u64).div_ceil(3)
}
}
/// A single conversation turn (user + assistant + optional tool calls/results).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Turn {
pub index: u64,
pub entries: Vec<SessionEntry>,
}
impl Turn {
pub fn approx_tokens(&self) -> u64 {
self.entries.iter().map(SessionEntry::approx_tokens).sum()
}
}
// ---------------------------------------------------------------------------
// Session
// ---------------------------------------------------------------------------
#[derive(Debug, Error)]
pub enum SessionError {
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
#[error("JSON error: {0}")]
Json(#[from] serde_json::Error),
#[error("session not found: {0}")]
NotFound(String),
#[error("session already exists: {0}")]
AlreadyExists(String),
}
/// An active agent session.
///
/// Thread-safe: all mutable state is behind an `RwLock`.
pub struct Session {
/// Unique session ID.
pub id: String,
/// Path to the backing JSONL file.
path: PathBuf,
/// In-memory turn buffer (most recent first for efficient pruning).
turns: RwLock<VecDeque<Turn>>,
/// Total bytes written to the JSONL file (approximate).
byte_count: RwLock<u64>,
/// Configuration reference.
config: Arc<DaemonConfig>,
/// Creation timestamp.
pub created_at: String,
}
impl Session {
/// Create a new session, writing the initial JSONL file.
pub fn create(id: String, config: Arc<DaemonConfig>) -> Result<Self, SessionError> {
let path = config.data_dir.join("sessions").join(format!("{id}.jsonl"));
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)?;
}
if path.exists() {
return Err(SessionError::AlreadyExists(id));
}
// Touch the file
std::fs::File::create(&path)?;
Ok(Self {
id,
path,
turns: RwLock::new(VecDeque::new()),
byte_count: RwLock::new(0),
config,
created_at: Utc::now().to_rfc3339(),
})
}
/// Load an existing session from its JSONL file.
pub fn load(id: String, config: Arc<DaemonConfig>) -> Result<Self, SessionError> {
let path = config.data_dir.join("sessions").join(format!("{id}.jsonl"));
if !path.exists() {
return Err(SessionError::NotFound(id));
}
let file = std::fs::File::open(&path)?;
let reader = BufReader::new(file);
let mut turns: VecDeque<Turn> = VecDeque::new();
let mut current_turn: Option<Turn> = None;
let mut turn_index: u64 = 0;
let mut byte_count: u64 = 0;
for line in reader.lines() {
let line = line?;
byte_count += line.len() as u64 + 1; // +1 for newline
let entry: SessionEntry = serde_json::from_str(&line)?;
let is_user = matches!(entry, SessionEntry::User { .. } | SessionEntry::System { .. });
if is_user {
// Start a new turn
if let Some(turn) = current_turn.take() {
turns.push_back(turn);
}
current_turn = Some(Turn {
index: turn_index,
entries: vec![entry],
});
turn_index += 1;
} else if let Some(ref mut turn) = current_turn {
turn.entries.push(entry);
} else {
// Orphan entry not preceded by a user/system message: treat as
// its own turn.
current_turn = Some(Turn {
index: turn_index,
entries: vec![entry],
});
turn_index += 1;
}
}
if let Some(turn) = current_turn {
turns.push_back(turn);
}
Ok(Self {
id,
path,
turns: RwLock::new(turns),
byte_count: RwLock::new(byte_count),
config,
created_at: Utc::now().to_rfc3339(), // best-effort; could store in session metadata
})
}
/// Append a single entry to the session, flushing to JSONL.
pub async fn append(&self, entry: SessionEntry) -> Result<(), SessionError> {
let line = serde_json::to_string(&entry)?;
let line_bytes = line.len() as u64 + 1;
// Write to JSONL
let mut file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&self.path)?;
writeln!(file, "{line}")?;
// Update in-memory
{
let mut turns = self.turns.write().await;
// Add to the current turn if exists (and entry is assistant/tool),
// otherwise start a new turn for user/system messages.
let is_user =
matches!(entry, SessionEntry::User { .. } | SessionEntry::System { .. });
if is_user || turns.is_empty() {
let next_index = turns.back().map(|t| t.index + 1).unwrap_or(0);
turns.push_back(Turn {
index: next_index,
entries: vec![entry],
});
} else if let Some(last) = turns.back_mut() {
last.entries.push(entry);
}
}
{
let mut bc = self.byte_count.write().await;
*bc += line_bytes;
}
// Check if we need rollover or compaction
self.maybe_compact_or_rollover().await?;
Ok(())
}
/// Get all turns (newest last for prompt construction).
pub async fn turns(&self) -> Vec<Turn> {
let turns = self.turns.read().await;
turns.iter().cloned().collect()
}
/// Get the total byte count.
pub async fn byte_count(&self) -> u64 {
*self.byte_count.read().await
}
/// Get the number of turns.
pub async fn turn_count(&self) -> usize {
self.turns.read().await.len()
}
// ------------------------------------------------------------------
// Compaction
// ------------------------------------------------------------------
/// Check thresholds and compact/prune if needed.
async fn maybe_compact_or_rollover(&self) -> Result<(), SessionError> {
let (byte_count, turn_count) = {
let bc = self.byte_count.read().await;
let turns = self.turns.read().await;
(*bc, turns.len())
};
let needs_compaction =
byte_count > self.config.session_max_bytes
|| turn_count > self.config.max_uncompacted_turns;
if needs_compaction {
self.compact_oldest_turns().await?;
}
Ok(())
}
/// Compact the oldest turns by merging them into a summary entry.
///
/// The caller (the LLM summarizer) produces a compact summary string.
/// This method handles the bookkeeping: removing old turn entries and
/// appending a compaction marker.
pub async fn compact_oldest_turns(&self) -> Result<(), SessionError> {
let mut turns = self.turns.write().await;
// Keep the most recent `max_uncompacted_turns` turns, compact the rest.
let keep = self.config.max_uncompacted_turns;
if turns.len() <= keep {
return Ok(());
}
let compact_count = turns.len() - keep;
// Build a simple summary of the compacted turns (placeholder; real
// compaction uses LLM summarisation).
let summary = format!(
"[compacted {} earlier turns — total {} entries]",
compact_count,
turns.iter().take(compact_count).map(|t| t.entries.len()).sum::<usize>(),
);
// Append compaction entry to the live JSONL
let entry = SessionEntry::Compaction {
summary,
compacted_count: compact_count,
extra: serde_json::json!({
"compacted_at": Utc::now().to_rfc3339(),
"compacted_turn_indices": (0..compact_count as u64).collect::<Vec<_>>(),
}),
};
let line = serde_json::to_string(&entry)?;
let mut file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&self.path)?;
writeln!(file, "{line}")?;
// Drop compacted turns from the front
for _ in 0..compact_count {
turns.pop_front();
}
Ok(())
}
/// Prune the session: keep only the last N turns, removing all older
/// entries from both JSONL and memory.
pub async fn prune_to(&self, keep_turns: usize) -> Result<(), SessionError> {
let mut turns = self.turns.write().await;
while turns.len() > keep_turns {
turns.pop_front();
}
// Re-write JSONL from current turns
let tmp_path = self.path.with_extension("jsonl.tmp");
{
let mut file = std::fs::File::create(&tmp_path)?;
for turn in turns.iter() {
for entry in &turn.entries {
let line = serde_json::to_string(entry)?;
writeln!(file, "{line}")?;
}
}
}
std::fs::rename(&tmp_path, &self.path)?;
// Recompute byte count
let meta = std::fs::metadata(&self.path)?;
*self.byte_count.write().await = meta.len();
Ok(())
}
// ------------------------------------------------------------------
// 3-region prompt assembly (for DeepSeek cache discipline)
// ------------------------------------------------------------------
/// Build the 3-region prompt:
/// 1. Immutable system prefix (byte-stable for cache hits)
/// 2. Appendable conversation log (turns, possibly with compaction gaps)
/// 3. Volatile scratch (not included — caller appends per-request)
pub async fn build_prompt_messages(&self) -> Vec<serde_json::Value> {
let mut messages: Vec<serde_json::Value> = Vec::new();
// Region 1: Immutable system prefix
messages.push(serde_json::json!({
"role": "system",
"content": colibri_deepseek::STABLE_SYSTEM_PREFIX,
}));
// Region 2: Appendable conversation log
let turns = self.turns.read().await;
for turn in turns.iter() {
for entry in &turn.entries {
match entry {
SessionEntry::System { content, .. } => {
messages.push(serde_json::json!({"role": "system", "content": content}));
}
SessionEntry::User { content } => {
messages.push(serde_json::json!({"role": "user", "content": content}));
}
SessionEntry::Assistant { content } => {
messages.push(serde_json::json!({"role": "assistant", "content": content}));
}
SessionEntry::ToolCall { name, arguments } => {
messages.push(serde_json::json!({
"role": "assistant",
"tool_calls": [{
"id": format!("call_{}", turn.index),
"type": "function",
"function": {
"name": name,
"arguments": serde_json::to_string(arguments).unwrap_or_default(),
}
}]
}));
}
SessionEntry::ToolResult { name, result } => {
messages.push(serde_json::json!({
"role": "tool",
"tool_call_id": format!("call_{}", turn.index),
"name": name,
"content": result.to_string(),
}));
}
SessionEntry::Compaction { summary, .. } => {
messages.push(serde_json::json!({
"role": "system",
"content": format!("[SESSION COMPACTION] {summary}"),
}));
}
}
}
}
// Region 3 (volatile scratch) is left empty — the API caller appends
// per-request user/assistant content.
messages
}
}
// Tests deferred to crate-level integration tests (tests/ directory).

View file

@ -0,0 +1,284 @@
//! Unix socket API for Herdr operator dashboard.
//!
//! Listens on a Unix domain socket, accepts newline-delimited JSON commands,
//! and dispatches to the session manager and agent spawner.
//!
//! Protocol (matching the existing watchdog JSON+newline convention):
//! - Client sends: `{"cmd":"status"}\n`
//! - Server responds: `{"ok":true,"data":{...}}\n`
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::{UnixListener, UnixStream};
use tokio::select;
use tokio::sync::broadcast;
use tracing::{debug, error, info, trace, warn};
use crate::spawner::{AgentSpawnConfig, Provider, Spawner};
use crate::{HerdrCommand, HerdrResponse, SharedState};
// ---------------------------------------------------------------------------
// Socket server
// ---------------------------------------------------------------------------
/// Run the Herdr socket API server. Binds to the configured Unix socket path
/// and processes inbound commands until a shutdown signal is received.
pub async fn serve(state: SharedState, mut shutdown_rx: broadcast::Receiver<()>) {
let socket_path = state.config.socket_path.clone();
// Clean up stale socket file
if socket_path.exists() {
if let Err(e) = tokio::fs::remove_file(&socket_path).await {
warn!(
path = %socket_path.display(),
error = %e,
"failed to remove stale socket file"
);
}
}
// Ensure parent directory exists
if let Some(parent) = socket_path.parent() {
if let Err(e) = tokio::fs::create_dir_all(parent).await {
error!(
path = %parent.display(),
error = %e,
"failed to create socket parent directory"
);
return;
}
}
let listener = match UnixListener::bind(&socket_path) {
Ok(l) => l,
Err(e) => {
error!(
path = %socket_path.display(),
error = %e,
"failed to bind Unix socket"
);
return;
}
};
info!(path = %socket_path.display(), "Herdr socket API listening");
loop {
select! {
accept_result = listener.accept() => {
match accept_result {
Ok((stream, _addr)) => {
let state = state.clone();
tokio::spawn(handle_connection(stream, state));
}
Err(e) => {
error!(error = %e, "socket accept error");
}
}
}
_ = shutdown_rx.recv() => {
info!("socket server received shutdown signal");
break;
}
}
}
// Clean up socket file on graceful exit
let _ = tokio::fs::remove_file(&socket_path).await;
info!("Herdr socket API shut down");
}
// ---------------------------------------------------------------------------
// Connection handler
// ---------------------------------------------------------------------------
async fn handle_connection(stream: UnixStream, state: SharedState) {
let (reader, mut writer) = stream.into_split();
let mut reader = BufReader::new(reader);
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) => {
// EOF — client disconnected
trace!("socket client disconnected");
break;
}
Ok(_) => {
let trimmed = line.trim_end();
if trimmed.is_empty() {
continue;
}
let response = match serde_json::from_str::<HerdrCommand>(trimmed) {
Ok(cmd) => dispatch(cmd, &state).await,
Err(e) => HerdrResponse::err(format!("invalid command: {e}")),
};
let mut response_json = serde_json::to_string(&response).unwrap_or_default();
response_json.push('\n');
if let Err(e) = writer.write_all(response_json.as_bytes()).await {
debug!(error = %e, "socket write error");
break;
}
}
Err(e) => {
debug!(error = %e, "socket read error");
break;
}
}
}
}
// ---------------------------------------------------------------------------
// Command dispatch
// ---------------------------------------------------------------------------
async fn dispatch(cmd: HerdrCommand, state: &SharedState) -> HerdrResponse {
match cmd {
HerdrCommand::Status => cmd_status(state).await,
HerdrCommand::ListSessions => cmd_list_sessions(state).await,
HerdrCommand::SpawnAgent {
provider,
model,
session_id,
system_prompt,
} => cmd_spawn_agent(state, provider, model, session_id, system_prompt).await,
HerdrCommand::KillAgent { agent_id } => cmd_kill_agent(state, agent_id).await,
HerdrCommand::GetSession { session_id } => cmd_get_session(state, session_id).await,
HerdrCommand::CompactSession { session_id } => cmd_compact_session(state, session_id).await,
}
}
async fn cmd_status(state: &SharedState) -> HerdrResponse {
let session_count = state.sessions.len();
let agent_count = state.agents.len();
let mut agents_status = Vec::new();
for entry in state.agents.iter() {
let handle = entry.value();
let status = handle.status.read().await;
agents_status.push(serde_json::json!({
"id": handle.id,
"status": format!("{:?}", *status),
"provider": format!("{:?}", handle.config.provider),
"model": handle.config.model,
"created_at": handle.created_at,
}));
}
HerdrResponse::ok(serde_json::json!({
"daemon": "colibri-daemon",
"version": env!("CARGO_PKG_VERSION"),
"host": state.config.host,
"sessions": session_count,
"agents": agent_count,
"agent_list": agents_status,
}))
}
async fn cmd_list_sessions(state: &SharedState) -> HerdrResponse {
let mut sessions = Vec::new();
for entry in state.sessions.iter() {
let session = entry.value();
sessions.push(serde_json::json!({
"id": session.id,
"turn_count": session.turn_count().await,
"byte_count": session.byte_count().await,
"created_at": session.created_at,
}));
}
HerdrResponse::ok(serde_json::json!({
"sessions": sessions,
}))
}
async fn cmd_spawn_agent(
state: &SharedState,
provider_str: String,
model: String,
session_id: Option<String>,
system_prompt: Option<String>,
) -> HerdrResponse {
let provider = match provider_str.to_lowercase().as_str() {
"deepseek" => Provider::DeepSeek,
"openrouter" => Provider::OpenRouter,
"anthropic" => Provider::Anthropic,
other => return HerdrResponse::err(format!("unknown provider: {other}")),
};
let agent_config = AgentSpawnConfig {
binary: std::env::var("COLIBRI_AGENT_BINARY")
.unwrap_or_else(|_| "hermes-agent".to_string()),
args: vec!["--mode".to_string(), "json".to_string()],
provider,
model,
session_id,
system_prompt,
..Default::default()
};
let spawner = Spawner::new(state.config.clone().into());
match spawner.spawn(agent_config).await {
Ok(handle) => {
let id = handle.id.clone();
state.agents.insert(id.clone(), handle);
HerdrResponse::ok(serde_json::json!({
"agent_id": id,
"status": "running",
}))
}
Err(e) => HerdrResponse::err(format!("spawn failed: {e}")),
}
}
async fn cmd_kill_agent(state: &SharedState, agent_id: String) -> HerdrResponse {
match state.agents.get(&agent_id) {
Some(handle) => match handle.value().kill().await {
Ok(()) => {
state.agents.remove(&agent_id);
HerdrResponse::ok(serde_json::json!({
"agent_id": agent_id,
"status": "stopped",
}))
}
Err(e) => HerdrResponse::err(format!("kill failed: {e}")),
},
None => HerdrResponse::err(format!("agent not found: {agent_id}")),
}
}
async fn cmd_get_session(state: &SharedState, session_id: String) -> HerdrResponse {
match state.sessions.get(&session_id) {
Some(session) => {
let turns = session.value().turns().await;
let messages = session.value().build_prompt_messages().await;
HerdrResponse::ok(serde_json::json!({
"session_id": session_id,
"turn_count": turns.len(),
"byte_count": session.value().byte_count().await,
"created_at": session.value().created_at,
"turns": turns,
"prompt_messages": messages,
}))
}
None => HerdrResponse::err(format!("session not found: {session_id}")),
}
}
async fn cmd_compact_session(state: &SharedState, session_id: String) -> HerdrResponse {
match state.sessions.get(&session_id) {
Some(session) => match session.value().compact_oldest_turns().await {
Ok(()) => HerdrResponse::ok(serde_json::json!({
"session_id": session_id,
"turn_count": session.value().turn_count().await,
"status": "compacted",
})),
Err(e) => HerdrResponse::err(format!("compaction failed: {e}")),
},
None => HerdrResponse::err(format!("session not found: {session_id}")),
}
}

View file

@ -0,0 +1,383 @@
//! Agent subprocess management — replaces agent-runner.ts.
//!
//! Spawns agent subprocesses with provider/model configuration, tracks
//! running agents, and implements retry/backoff for API calls.
//!
//! Provider routing priority:
//! 1. DeepSeek (primary, cache-capable)
//! 2. OpenRouter (fallback)
//! 3. Anthropic (fallback)
use std::collections::HashMap;
use std::process::Stdio;
use std::sync::Arc;
use std::time::Duration;
use backon::{ExponentialBuilder, Retryable};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::process::{Child, Command};
use tokio::sync::{Mutex, RwLock};
use tracing::{error, info, warn};
use uuid::Uuid;
use crate::DaemonConfig;
// ---------------------------------------------------------------------------
// Provider types
// ---------------------------------------------------------------------------
/// Supported LLM providers.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Provider {
DeepSeek,
OpenRouter,
Anthropic,
}
impl Provider {
/// Priority order for fallback routing.
pub fn fallback_order() -> &'static [Provider] {
&[Provider::DeepSeek, Provider::OpenRouter, Provider::Anthropic]
}
/// API endpoint for a provider (from config).
pub fn endpoint<'a>(&self, config: &'a DaemonConfig) -> Option<&'a str> {
match self {
Provider::DeepSeek => Some(&config.deepseek_endpoint),
Provider::OpenRouter => Some(&config.openrouter_endpoint),
Provider::Anthropic => Some(&config.anthropic_endpoint),
}
}
/// API key for a provider (from config).
pub fn api_key<'a>(&self, config: &'a DaemonConfig) -> Option<&'a str> {
match self {
Provider::DeepSeek => config.deepseek_api_key.as_deref(),
Provider::OpenRouter => config.openrouter_api_key.as_deref(),
Provider::Anthropic => config.anthropic_api_key.as_deref(),
}
}
}
// ---------------------------------------------------------------------------
// Agent spawn configuration
// ---------------------------------------------------------------------------
/// Configuration for spawning an agent subprocess.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentSpawnConfig {
/// Agent binary or command to run (e.g. "pi", "hermes-agent", etc.).
pub binary: String,
/// Command-line arguments.
#[serde(default)]
pub args: Vec<String>,
/// Environment variables to inject.
#[serde(default)]
pub env: HashMap<String, String>,
/// Working directory for the subprocess.
#[serde(default)]
pub working_dir: Option<String>,
/// Primary provider.
#[serde(default = "default_provider")]
pub provider: Provider,
/// Model name.
pub model: String,
/// Session ID to attach.
#[serde(default)]
pub session_id: Option<String>,
/// System prompt override.
#[serde(default)]
pub system_prompt: Option<String>,
/// Maximum retries for spawn failures.
#[serde(default = "default_max_retries")]
pub max_retries: u32,
/// Timeout for subprocess startup (seconds).
#[serde(default = "default_startup_timeout_secs")]
pub startup_timeout_secs: u64,
}
fn default_provider() -> Provider {
Provider::DeepSeek
}
fn default_max_retries() -> u32 {
3
}
fn default_startup_timeout_secs() -> u64 {
30
}
impl Default for AgentSpawnConfig {
fn default() -> Self {
Self {
binary: String::new(),
args: Vec::new(),
env: HashMap::new(),
working_dir: None,
provider: default_provider(),
model: String::new(),
session_id: None,
system_prompt: None,
max_retries: default_max_retries(),
startup_timeout_secs: default_startup_timeout_secs(),
}
}
}
// ---------------------------------------------------------------------------
// Agent handle
// ---------------------------------------------------------------------------
/// Agent status.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum AgentStatus {
Starting,
Running,
Stopped,
Failed(String),
}
/// Handle to a running agent subprocess.
pub struct AgentHandle {
/// Unique agent ID.
pub id: String,
/// Agent configuration.
pub config: AgentSpawnConfig,
/// Current status.
pub status: RwLock<AgentStatus>,
/// Subprocess handle (Some only when running).
child: Mutex<Option<Child>>,
/// Creation timestamp.
pub created_at: String,
}
impl AgentHandle {
pub fn new(config: AgentSpawnConfig) -> Self {
Self {
id: Uuid::new_v4().to_string(),
config,
status: RwLock::new(AgentStatus::Starting),
child: Mutex::new(None),
created_at: chrono::Utc::now().to_rfc3339(),
}
}
/// Kill the agent subprocess.
pub async fn kill(&self) -> Result<(), SpawnerError> {
let mut child = self.child.lock().await;
if let Some(ref mut child) = *child {
child.kill().await.map_err(SpawnerError::Io)?;
child.wait().await.map_err(SpawnerError::Io)?;
}
*child = None;
*self.status.write().await = AgentStatus::Stopped;
Ok(())
}
/// Check if the process has exited.
pub async fn poll_exit(&self) -> Option<std::process::ExitStatus> {
let mut child_guard = self.child.lock().await;
if let Some(ref mut proc) = *child_guard {
match proc.try_wait() {
Ok(Some(status)) => {
*child_guard = None;
*self.status.write().await = AgentStatus::Stopped;
Some(status)
}
Ok(None) => None,
Err(_) => {
*child_guard = None;
*self.status.write().await = AgentStatus::Failed("poll error".to_string());
None
}
}
} else {
None
}
}
}
// ---------------------------------------------------------------------------
// Spawner
// ---------------------------------------------------------------------------
#[derive(Debug, Error)]
pub enum SpawnerError {
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
#[error("no API key configured for provider {provider}")]
NoApiKey { provider: String },
#[error("agent {agent_id} not found")]
AgentNotFound { agent_id: String },
#[error("all providers exhausted for agent {agent_id}")]
AllProvidersExhausted { agent_id: String },
#[error("spawn timeout after {secs}s")]
SpawnTimeout { secs: u64 },
}
/// The agent spawner — manages subprocess lifecycle.
pub struct Spawner {
config: Arc<DaemonConfig>,
}
impl Spawner {
pub fn new(config: Arc<DaemonConfig>) -> Self {
Self { config }
}
/// Spawn an agent subprocess. Tries the primary provider first, then
/// falls back through the provider list. Retries spawn failures with
/// exponential backoff.
pub async fn spawn(&self, agent_config: AgentSpawnConfig) -> Result<AgentHandle, SpawnerError> {
let agent_id = Uuid::new_v4().to_string();
// Build the provider routing list: primary first, then fallbacks.
let providers = {
let mut providers = vec![agent_config.provider.clone()];
for p in Provider::fallback_order() {
if *p != agent_config.provider {
providers.push(p.clone());
}
}
providers
};
let mut last_error: Option<SpawnerError> = None;
for provider in &providers {
info!(
agent_id = %agent_id,
provider = ?provider,
model = %agent_config.model,
"attempting spawn"
);
// Check API key availability
if provider.api_key(&self.config).is_none() {
warn!(
agent_id = %agent_id,
provider = ?provider,
"no API key configured, skipping"
);
last_error = Some(SpawnerError::NoApiKey {
provider: format!("{provider:?}"),
});
continue;
}
let handle = AgentHandle::new(agent_config.clone());
// Build environment for this provider
let mut env_map = agent_config.env.clone();
env_map.insert(
format!("{}_API_KEY", provider_name_upper(provider)),
provider.api_key(&self.config).unwrap_or("").to_string(),
);
if let Some(endpoint) = provider.endpoint(&self.config) {
env_map.insert(
format!("{}_ENDPOINT", provider_name_upper(provider)),
endpoint.to_string(),
);
}
env_map.insert("COLIBRI_PROVIDER".to_string(), format!("{provider:?}").to_lowercase());
env_map.insert("COLIBRI_MODEL".to_string(), agent_config.model.clone());
env_map.insert("COLIBRI_AGENT_ID".to_string(), agent_id.clone());
if let Some(ref session_id) = agent_config.session_id {
env_map.insert("COLIBRI_SESSION_ID".to_string(), session_id.clone());
}
if let Some(ref system_prompt) = agent_config.system_prompt {
env_map.insert("COLIBRI_SYSTEM_PROMPT".to_string(), system_prompt.clone());
}
// Spawn with retry/backoff
let spawn_result = {
let binary = agent_config.binary.clone();
let args = agent_config.args.clone();
let env = env_map.clone();
let working_dir = agent_config.working_dir.clone();
let _startup_timeout = agent_config.startup_timeout_secs;
let op = || async {
let mut cmd = Command::new(&binary);
cmd.args(&args)
.envs(&env)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
if let Some(ref dir) = working_dir {
cmd.current_dir(dir);
}
let mut child = cmd.spawn().map_err(SpawnerError::Io)?;
// Brief grace period for the process to start
tokio::time::sleep(Duration::from_millis(500)).await;
// Check it hasn't immediately crashed
match child.try_wait() {
Ok(Some(status)) => {
let stderr = if let Some(ref mut stderr) = child.stderr.take() {
use tokio::io::AsyncReadExt;
let mut buf = Vec::new();
let _ = stderr.read_to_end(&mut buf).await;
String::from_utf8_lossy(&buf).to_string()
} else {
String::new()
};
Err(SpawnerError::Io(std::io::Error::new(
std::io::ErrorKind::Other,
format!(
"process exited immediately with status {status}: {stderr}"
),
)))
}
Ok(None) => Ok(child),
Err(e) => Err(SpawnerError::Io(e)),
}
};
let backoff = ExponentialBuilder::default()
.with_min_delay(Duration::from_millis(500))
.with_max_delay(Duration::from_secs(10))
.with_max_times(agent_config.max_retries as usize);
op.retry(backoff).await
};
match spawn_result {
Ok(child) => {
*handle.child.lock().await = Some(child);
*handle.status.write().await = AgentStatus::Running;
info!(agent_id = %agent_id, provider = ?provider, "agent spawned successfully");
return Ok(handle);
}
Err(e) => {
error!(
agent_id = %agent_id,
provider = ?provider,
error = %e,
"spawn failed for provider"
);
last_error = Some(e);
}
}
}
Err(last_error.unwrap_or(SpawnerError::AllProvidersExhausted {
agent_id,
}))
}
}
fn provider_name_upper(provider: &Provider) -> &'static str {
match provider {
Provider::DeepSeek => "DEEPSEEK",
Provider::OpenRouter => "OPENROUTER",
Provider::Anthropic => "ANTHROPIC",
}
}