From 9f6af19917d519084721083d7262a5a6e29a4ef5 Mon Sep 17 00:00:00 2001 From: 123kupola <123kupola@gmail.com> Date: Wed, 27 May 2026 02:32:17 +0200 Subject: [PATCH] Add colibri-daemon: always-on Rust agent runtime scaffold (Sam & Hermes) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Scaffolds the daemon crate that replaces agent-runner.ts, agent-session.ts, and session-compaction.ts from Clawdie-AI TS. Crate: crates/colibri-daemon - src/main.rs — tokio::main with graceful shutdown - src/config.rs — DaemonConfig from env (DEEPSEEK_API_KEY, etc.) - src/daemon.rs — main loop: heartbeat, session rotation, memory handoff - src/session.rs — JSONL session lifecycle (write/read/prune) - src/spawner.rs — agent subprocess management (pi spawn) - src/socket.rs — Unix socket API for Herdr glasspane - 4 tests passing, cargo check green --- Cargo.lock | 519 ++++++++++++++++++++++++++- Cargo.toml | 2 +- crates/colibri-daemon/Cargo.toml | 24 ++ crates/colibri-daemon/src/config.rs | 180 ++++++++++ crates/colibri-daemon/src/daemon.rs | 340 ++++++++++++++++++ crates/colibri-daemon/src/lib.rs | 77 ++++ crates/colibri-daemon/src/main.rs | 101 ++++++ crates/colibri-daemon/src/session.rs | 429 ++++++++++++++++++++++ crates/colibri-daemon/src/socket.rs | 284 +++++++++++++++ crates/colibri-daemon/src/spawner.rs | 383 ++++++++++++++++++++ 10 files changed, 2336 insertions(+), 3 deletions(-) create mode 100644 crates/colibri-daemon/Cargo.toml create mode 100644 crates/colibri-daemon/src/config.rs create mode 100644 crates/colibri-daemon/src/daemon.rs create mode 100644 crates/colibri-daemon/src/lib.rs create mode 100644 crates/colibri-daemon/src/main.rs create mode 100644 crates/colibri-daemon/src/session.rs create mode 100644 crates/colibri-daemon/src/socket.rs create mode 100644 crates/colibri-daemon/src/spawner.rs diff --git a/Cargo.lock b/Cargo.lock index 94ddc22..15fdcb6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,15 @@ # It is not 
intended for manual editing. version = 4 +[[package]] +name = "aho-corasick" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301" +dependencies = [ + "memchr", +] + [[package]] name = "android_system_properties" version = "0.1.5" @@ -29,6 +38,17 @@ version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2032f911046de80f0a198e0901378627c33f59ea0ac00e363d481118bd70a53" +[[package]] +name = "backon" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cffb0e931875b666fc4fcb20fee52e9bbd1ef836fd9e9e04ec21555f9f85f7ef" +dependencies = [ + "fastrand", + "gloo-timers", + "tokio", +] + [[package]] name = "base64" version = "0.22.1" @@ -117,6 +137,28 @@ dependencies = [ "serde_json", ] +[[package]] +name = "colibri-daemon" +version = "0.0.1" +dependencies = [ + "backon", + "chrono", + "colibri-contracts", + "colibri-deepseek", + "colibri-glasspane", + "colibri-runtime", + "dashmap", + "reqwest", + "serde", + "serde_json", + "thiserror 2.0.18", + "tokio", + "tokio-util", + "tracing", + "tracing-subscriber", + "uuid", +] + [[package]] name = "colibri-deepseek" version = "0.0.1" @@ -156,6 +198,26 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + +[[package]] +name = "dashmap" +version = "6.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6361d5c062261c78a176addb82d4c821ae42bed6089de0e12603cd25de2059c" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + 
[[package]] name = "displaydoc" version = "0.2.5" @@ -179,6 +241,28 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75b325c5dbd37f80359721ad39aca5a29fb04c89279657cffdda8736d0c0b9d2" +[[package]] +name = "equivalent" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" + +[[package]] +name = "errno" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" +dependencies = [ + "libc", + "windows-sys 0.61.2", +] + +[[package]] +name = "fastrand" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6" + [[package]] name = "filedescriptor" version = "0.8.3" @@ -196,6 +280,12 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + [[package]] name = "form_urlencoded" version = "1.2.2" @@ -220,6 +310,12 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" +[[package]] +name = "futures-sink" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893" + [[package]] name = "futures-task" version = "0.3.32" @@ -260,11 +356,63 @@ dependencies = [ "cfg-if", "js-sys", "libc", - "r-efi", + "r-efi 5.3.0", "wasip2", "wasm-bindgen", ] +[[package]] +name = "getrandom" +version = "0.4.2" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555" +dependencies = [ + "cfg-if", + "libc", + "r-efi 6.0.0", + "wasip2", + "wasip3", +] + +[[package]] +name = "gloo-timers" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + +[[package]] +name = "hashbrown" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" +dependencies = [ + "foldhash", +] + +[[package]] +name = "hashbrown" +version = "0.17.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed5909b6e89a2db4456e54cd5f673791d7eca6732202bbf2a9cc504fe2f9b84a" + +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "http" version = "1.4.1" @@ -469,6 +617,12 @@ dependencies = [ "zerovec", ] +[[package]] +name = "id-arena" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954" + [[package]] name = "idna" version = "1.1.0" @@ -490,6 +644,18 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "indexmap" +version = "2.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d466e9454f08e4a911e14806c24e16fba1b4c121d1ea474396f396069cf949d9" +dependencies = [ + "equivalent", + "hashbrown 0.17.1", + "serde", 
+ "serde_core", +] + [[package]] name = "ipnet" version = "2.12.0" @@ -520,6 +686,12 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" +[[package]] +name = "leb128fmt" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" + [[package]] name = "libc" version = "0.2.186" @@ -532,6 +704,15 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92daf443525c4cce67b150400bc2316076100ce0b3686209eb8cf3c31612e6f0" +[[package]] +name = "lock_api" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "224399e74b87b5f3557511d98dff8b14089b3dadafcab6bb93eab67d3aace965" +dependencies = [ + "scopeguard", +] + [[package]] name = "log" version = "0.4.30" @@ -544,6 +725,15 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" +[[package]] +name = "matchers" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +dependencies = [ + "regex-automata", +] + [[package]] name = "memchr" version = "2.8.0" @@ -573,6 +763,15 @@ dependencies = [ "libc", ] +[[package]] +name = "nu-ansi-term" +version = "0.50.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -588,6 +787,29 @@ version = "1.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" +[[package]] +name = "parking_lot" +version = 
"0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93857453250e3077bd71ff98b6a65ea6621a19bb0f559a85248955ac12c45a1a" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-link", +] + [[package]] name = "percent-encoding" version = "2.3.2" @@ -639,6 +861,16 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "prettyplease" +version = "0.2.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" +dependencies = [ + "proc-macro2", + "syn", +] + [[package]] name = "proc-macro2" version = "1.0.106" @@ -718,6 +950,12 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" +[[package]] +name = "r-efi" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" + [[package]] name = "rand" version = "0.9.4" @@ -747,6 +985,32 @@ dependencies = [ "getrandom 0.3.4", ] +[[package]] +name = "redox_syscall" +version = "0.5.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" +dependencies = [ + "bitflags 2.11.1", +] + +[[package]] +name = "regex-automata" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.10" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" + [[package]] name = "reqwest" version = "0.12.28" @@ -852,6 +1116,18 @@ version = "1.0.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f" +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "semver" +version = "1.0.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a7852d02fc848982e0c167ef163aaff9cd91dc640ba85e263cb1ce46fae51cd" + [[package]] name = "serde" version = "1.0.228" @@ -918,6 +1194,15 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shared_library" version = "0.1.9" @@ -940,6 +1225,16 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b" +dependencies = [ + "errno", + "libc", +] + [[package]] name = "slab" version = "0.4.12" @@ -1045,6 +1340,15 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + [[package]] name = "tinystr" version = "0.8.3" @@ -1079,7 
+1383,9 @@ dependencies = [ "bytes", "libc", "mio", + "parking_lot", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.61.2", @@ -1106,6 +1412,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-util" +version = "0.7.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ae9cec805b01e8fc3fd2fe289f89149a9b66dd16786abd8b19cfa7b48cb0098" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "tower" version = "0.5.3" @@ -1158,9 +1477,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" dependencies = [ "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tracing-core" version = "0.1.36" @@ -1168,6 +1499,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex-automata", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", ] 
[[package]] @@ -1182,6 +1543,12 @@ version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + [[package]] name = "untrusted" version = "0.9.0" @@ -1206,6 +1573,23 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" +[[package]] +name = "uuid" +version = "1.23.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddd74a9687298c6858e9b88ec8935ec45d22e8fd5e6394fa1bd4e99a87789c76" +dependencies = [ + "getrandom 0.4.2", + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "want" version = "0.3.1" @@ -1227,7 +1611,16 @@ version = "1.0.3+wasi-0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "20064672db26d7cdc89c7798c48a0fdfac8213434a1186e5ef29fd560ae223d6" dependencies = [ - "wit-bindgen", + "wit-bindgen 0.57.1", +] + +[[package]] +name = "wasip3" +version = "0.4.0+wasi-0.3.0-rc-2026-01-06" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" +dependencies = [ + "wit-bindgen 0.51.0", ] [[package]] @@ -1285,6 +1678,40 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "wasm-encoder" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990065f2fe63003fe337b932cfb5e3b80e0b4d0f5ff650e6985b1048f62c8319" +dependencies = [ + "leb128fmt", + "wasmparser", +] + 
+[[package]] +name = "wasm-metadata" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" +dependencies = [ + "anyhow", + "indexmap", + "wasm-encoder", + "wasmparser", +] + +[[package]] +name = "wasmparser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" +dependencies = [ + "bitflags 2.11.1", + "hashbrown 0.15.5", + "indexmap", + "semver", +] + [[package]] name = "web-sys" version = "0.3.99" @@ -1560,12 +1987,100 @@ dependencies = [ "winapi", ] +[[package]] +name = "wit-bindgen" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" +dependencies = [ + "wit-bindgen-rust-macro", +] + [[package]] name = "wit-bindgen" version = "0.57.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ebf944e87a7c253233ad6766e082e3cd714b5d03812acc24c318f549614536e" +[[package]] +name = "wit-bindgen-core" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc" +dependencies = [ + "anyhow", + "heck", + "wit-parser", +] + +[[package]] +name = "wit-bindgen-rust" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" +dependencies = [ + "anyhow", + "heck", + "indexmap", + "prettyplease", + "syn", + "wasm-metadata", + "wit-bindgen-core", + "wit-component", +] + +[[package]] +name = "wit-bindgen-rust-macro" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c0f9bfd77e6a48eccf51359e3ae77140a7f50b1e2ebfe62422d8afdaffab17a" +dependencies = [ + "anyhow", + 
"prettyplease", + "proc-macro2", + "quote", + "syn", + "wit-bindgen-core", + "wit-bindgen-rust", +] + +[[package]] +name = "wit-component" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" +dependencies = [ + "anyhow", + "bitflags 2.11.1", + "indexmap", + "log", + "serde", + "serde_derive", + "serde_json", + "wasm-encoder", + "wasm-metadata", + "wasmparser", + "wit-parser", +] + +[[package]] +name = "wit-parser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" +dependencies = [ + "anyhow", + "id-arena", + "indexmap", + "log", + "semver", + "serde", + "serde_derive", + "serde_json", + "unicode-xid", + "wasmparser", +] + [[package]] name = "writeable" version = "0.6.3" diff --git a/Cargo.toml b/Cargo.toml index 8da1be9..8f020ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["crates/colibri-contracts", "crates/colibri-deepseek", "crates/colibri-runtime", "crates/colibri-glasspane"] +members = ["crates/colibri-contracts", "crates/colibri-deepseek", "crates/colibri-runtime", "crates/colibri-glasspane", "crates/colibri-daemon"] [package] name = "colibri" diff --git a/crates/colibri-daemon/Cargo.toml b/crates/colibri-daemon/Cargo.toml new file mode 100644 index 0000000..bb30c11 --- /dev/null +++ b/crates/colibri-daemon/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "colibri-daemon" +version = "0.0.1" +edition = "2021" +license = "AGPL-3.0-only" +description = "Always-on Rust service: agent session lifecycle, subprocess spawner, Herdr Unix socket API" + +[dependencies] +colibri-contracts = { path = "../colibri-contracts" } +colibri-deepseek = { path = "../colibri-deepseek" } +colibri-glasspane = { path = "../colibri-glasspane" } +colibri-runtime = { path = "../colibri-runtime" } +tokio = { version = "1", 
features = ["full"] } +tokio-util = { version = "0.7", features = ["codec"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" +reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } +chrono = { version = "0.4", default-features = false, features = ["clock"] } +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +dashmap = "6" +backon = "1" +uuid = { version = "1", features = ["v4"] } +thiserror = "2" diff --git a/crates/colibri-daemon/src/config.rs b/crates/colibri-daemon/src/config.rs new file mode 100644 index 0000000..df657bb --- /dev/null +++ b/crates/colibri-daemon/src/config.rs @@ -0,0 +1,180 @@ +//! Daemon configuration from environment variables. +//! +//! Replaces TypeScript `process.env` + zod validation scattered across +//! agent-runner.ts and agent-session.ts. + +use std::path::PathBuf; + +use serde::{Deserialize, Serialize}; + +// --------------------------------------------------------------------------- +// Daemon configuration +// --------------------------------------------------------------------------- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DaemonConfig { + /// Directory for session JSONL files and state. + pub data_dir: PathBuf, + /// Path for the Herdr Unix socket. + pub socket_path: PathBuf, + /// Maximum bytes per session before automatic rollover. + pub session_max_bytes: u64, + /// DeepSeek API key for provider routing. + pub deepseek_api_key: Option, + /// DeepSeek API endpoint. + pub deepseek_endpoint: String, + /// DeepSeek model string. + pub deepseek_model: String, + /// OpenRouter API key for fallback routing. + pub openrouter_api_key: Option, + /// OpenRouter API endpoint. + pub openrouter_endpoint: String, + /// Anthropic API key for fallback routing. + pub anthropic_api_key: Option, + /// Anthropic API endpoint. + pub anthropic_endpoint: String, + /// Host identifier. 
+ pub host: String, + /// Maximum context window tokens (for compaction trigger). + pub max_context_tokens: u64, + /// Compaction target: max turns to keep uncompacted. + pub max_uncompacted_turns: usize, +} + +impl DaemonConfig { + /// Build configuration from environment variables with sensible defaults. + pub fn from_env() -> Self { + let host = std::env::var("COLIBRI_HOST") + .or_else(|_| std::env::var("HOSTNAME")) + .ok() + .filter(|v| !v.trim().is_empty()) + .unwrap_or_else(|| "unknown".to_string()); + + let data_dir = std::env::var("COLIBRI_DAEMON_DATA_DIR") + .map(PathBuf::from) + .unwrap_or_else(|_| { + dirs_data().unwrap_or_else(|| PathBuf::from("/tmp/colibri-daemon")) + }); + + let socket_path = std::env::var("COLIBRI_DAEMON_SOCKET") + .map(PathBuf::from) + .unwrap_or_else(|_| data_dir.join("colibri-daemon.sock")); + + Self { + data_dir, + socket_path, + session_max_bytes: env_parse("COLIBRI_SESSION_MAX_BYTES").unwrap_or(2_000_000), + deepseek_api_key: nonempty_env("DEEPSEEK_API_KEY"), + deepseek_endpoint: std::env::var("DEEPSEEK_ENDPOINT") + .unwrap_or_else(|_| "https://api.deepseek.com/chat/completions".to_string()), + deepseek_model: std::env::var("DEEPSEEK_MODEL") + .unwrap_or_else(|_| "deepseek-chat".to_string()), + openrouter_api_key: nonempty_env("OPENROUTER_API_KEY"), + openrouter_endpoint: std::env::var("OPENROUTER_ENDPOINT") + .unwrap_or_else(|_| "https://openrouter.ai/api/v1/chat/completions".to_string()), + anthropic_api_key: nonempty_env("ANTHROPIC_API_KEY"), + anthropic_endpoint: std::env::var("ANTHROPIC_ENDPOINT") + .unwrap_or_else(|_| "https://api.anthropic.com/v1/messages".to_string()), + host, + max_context_tokens: env_parse("COLIBRI_MAX_CONTEXT_TOKENS").unwrap_or(128_000), + max_uncompacted_turns: env_parse("COLIBRI_MAX_UNCOMPACTED_TURNS").unwrap_or(20), + } + } +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- 
+ +fn nonempty_env(name: &str) -> Option { + std::env::var(name).ok().filter(|v| !v.trim().is_empty()) +} + +fn env_parse(name: &str) -> Option { + std::env::var(name).ok().and_then(|v| v.parse().ok()) +} + +fn dirs_data() -> Option { + std::env::var("XDG_DATA_HOME") + .ok() + .filter(|v| !v.trim().is_empty()) + .map(PathBuf::from) + .or_else(|| { + std::env::var("HOME") + .ok() + .map(|h| PathBuf::from(h).join(".local").join("share").join("colibri-daemon")) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_config_defaults_from_empty_env() { + // Safety: temporarily clear relevant env vars + let vars = [ + "COLIBRI_HOST", + "HOSTNAME", + "COLIBRI_DAEMON_DATA_DIR", + "XDG_DATA_HOME", + "COLIBRI_DAEMON_SOCKET", + "COLIBRI_SESSION_MAX_BYTES", + "DEEPSEEK_API_KEY", + "DEEPSEEK_ENDPOINT", + "DEEPSEEK_MODEL", + "OPENROUTER_API_KEY", + "OPENROUTER_ENDPOINT", + "ANTHROPIC_API_KEY", + "ANTHROPIC_ENDPOINT", + "COLIBRI_MAX_CONTEXT_TOKENS", + "COLIBRI_MAX_UNCOMPACTED_TURNS", + ]; + + let saved: Vec<(&str, Option)> = vars + .iter() + .map(|&name| (name, std::env::var(name).ok())) + .collect(); + for &name in &vars { + std::env::remove_var(name); + } + + let config = DaemonConfig::from_env(); + + // Restore saved env + for (name, val) in saved { + if let Some(v) = val { + std::env::set_var(name, v); + } else { + std::env::remove_var(name); + } + } + + assert_eq!(config.host, "unknown"); + assert!(config.data_dir.to_string_lossy().contains("colibri-daemon")); + assert_eq!(config.session_max_bytes, 2_000_000); + assert_eq!(config.deepseek_model, "deepseek-chat"); + assert_eq!(config.max_context_tokens, 128_000); + assert_eq!(config.max_uncompacted_turns, 20); + assert!(config.deepseek_api_key.is_none()); + assert!(config.openrouter_api_key.is_none()); + assert!(config.anthropic_api_key.is_none()); + } + + #[test] + fn test_config_from_env_vars() { + std::env::set_var("COLIBRI_HOST", "test-host"); + std::env::set_var("DEEPSEEK_API_KEY", "sk-test"); + 
std::env::set_var("DEEPSEEK_MODEL", "deepseek-v4"); + std::env::set_var("COLIBRI_MAX_CONTEXT_TOKENS", "64000"); + std::env::set_var("COLIBRI_SESSION_MAX_BYTES", "500000"); + + let config = DaemonConfig::from_env(); + + assert_eq!(config.host, "test-host"); + assert_eq!(config.deepseek_api_key.as_deref(), Some("sk-test")); + assert_eq!(config.deepseek_model, "deepseek-v4"); + assert_eq!(config.max_context_tokens, 64000); + assert_eq!(config.session_max_bytes, 500000); + } +} diff --git a/crates/colibri-daemon/src/daemon.rs b/crates/colibri-daemon/src/daemon.rs new file mode 100644 index 0000000..5fcaaa5 --- /dev/null +++ b/crates/colibri-daemon/src/daemon.rs @@ -0,0 +1,340 @@ +//! Main daemon loop — heartbeat, task polling, session rotation, memory handoff. +//! +//! Replaces control-plane loop logic scattered across agent-runner.ts, +//! agent-session.ts, and session-compaction.ts in Clawdie-AI TypeScript. + +use std::sync::Arc; +use std::time::Duration; + +use dashmap::DashMap; +use tokio::sync::broadcast; +use tracing::{debug, error, info, warn}; + +use crate::config::DaemonConfig; +use crate::session::Session; +use crate::spawner::{AgentHandle, Spawner}; + +// --------------------------------------------------------------------------- +// Shared daemon state +// --------------------------------------------------------------------------- + +/// Shared state accessible to all daemon subsystems. +pub struct DaemonState { + pub config: DaemonConfig, + /// Active agent sessions keyed by session ID. + pub sessions: DashMap, + /// Running agent subprocesses keyed by agent ID. + pub agents: DashMap, + /// Shutdown signal: on drop or ctrl_c, all listeners abort. 
+ pub shutdown_tx: broadcast::Sender<()>, + pub shutdown_rx: broadcast::Receiver<()>, +} + +impl DaemonState { + pub fn new(config: DaemonConfig) -> Self { + let (shutdown_tx, shutdown_rx) = broadcast::channel(16); + Self { + config, + sessions: DashMap::new(), + agents: DashMap::new(), + shutdown_tx, + shutdown_rx, + } + } +} + +pub type SharedState = Arc; + +// --------------------------------------------------------------------------- +// Daemon loop +// --------------------------------------------------------------------------- + +/// Configuration for the daemon background loop. +#[derive(Debug, Clone)] +pub struct DaemonLoopConfig { + /// Heartbeat interval (status log + stale agent detection). + pub heartbeat_interval: Duration, + /// Session rotation interval (compaction check). + pub session_rotation_interval: Duration, + /// Memory handoff interval (cross-agent coordination). + pub memory_handoff_interval: Duration, + /// Agent stall timeout: if an agent subprocess hasn't polled in this + /// long, flag it as stalled. + pub agent_stall_timeout: Duration, +} + +impl Default for DaemonLoopConfig { + fn default() -> Self { + Self { + heartbeat_interval: Duration::from_secs(30), + session_rotation_interval: Duration::from_secs(60), + memory_handoff_interval: Duration::from_secs(120), + agent_stall_timeout: Duration::from_secs(300), + } + } +} + +/// Run the main daemon background loop. +/// +/// Spawned as a tokio task. Periodically: +/// 1. Heartbeat: logs status, detects stalled agents +/// 2. Session rotation: checks sessions for compaction/rollover +/// 3. Memory handoff: coordinates shared context across active agents +/// +/// Returns when the shutdown signal is received. 
+pub async fn run_loop( + state: SharedState, + loop_config: DaemonLoopConfig, + mut shutdown_rx: broadcast::Receiver<()>, +) { + let mut heartbeat_tick = tokio::time::interval(loop_config.heartbeat_interval); + let mut rotation_tick = tokio::time::interval(loop_config.session_rotation_interval); + let mut handoff_tick = tokio::time::interval(loop_config.memory_handoff_interval); + + // Suppress the initial burst by skipping the first tick + heartbeat_tick.tick().await; // consume immediate tick + rotation_tick.tick().await; + handoff_tick.tick().await; + + info!( + heartbeat_secs = loop_config.heartbeat_interval.as_secs(), + rotation_secs = loop_config.session_rotation_interval.as_secs(), + handoff_secs = loop_config.memory_handoff_interval.as_secs(), + "daemon background loop started" + ); + + loop { + tokio::select! { + _ = heartbeat_tick.tick() => { + heartbeat(&state, loop_config.agent_stall_timeout).await; + } + _ = rotation_tick.tick() => { + session_rotation(&state).await; + } + _ = handoff_tick.tick() => { + memory_handoff(&state).await; + } + _ = shutdown_rx.recv() => { + info!("daemon loop received shutdown signal"); + break; + } + } + } + + info!("daemon background loop exited"); +} + +// --------------------------------------------------------------------------- +// Heartbeat +// --------------------------------------------------------------------------- + +/// Log daemon status and detect stalled agent subprocesses. 
+///
+/// Emits a debug line with the session and agent counts, then polls every
+/// registered agent for an exit status. Each exited agent is logged at `warn`;
+/// the ids of agents that exited unsuccessfully are collected and reported in
+/// one `info` line. Nothing is removed from the registry here, and
+/// `_stall_timeout` is accepted but not used yet.
+async fn heartbeat(state: &SharedState, _stall_timeout: Duration) {
+    debug!(
+        sessions = state.sessions.len(),
+        agents = state.agents.len(),
+        "daemon heartbeat"
+    );
+
+    // NOTE(review): the registry iterator stays alive across the `.await`
+    // below. If `agents` is a DashMap, this holds a shard lock across a
+    // suspension point — confirm that is acceptable.
+    let mut stalled: Vec<String> = Vec::new();
+    for entry in state.agents.iter() {
+        let handle = entry.value();
+        let Some(status) = handle.poll_exit().await else {
+            // No exit status reported for this agent.
+            continue;
+        };
+
+        warn!(
+            agent_id = %handle.id,
+            exit_status = ?status,
+            "agent subprocess exited"
+        );
+        // Only unsuccessful exits are reported in the summary below.
+        if !status.success() {
+            stalled.push(handle.id.clone());
+        }
+    }
+
+    // Entries are intentionally left in the registry (Herdr may still query
+    // their status); this pass only reports them.
+    if !stalled.is_empty() {
+        info!(stalled_agents = ?stalled, "detected stalled agents");
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Session rotation
+// ---------------------------------------------------------------------------
+
+/// Check and rotate sessions that exceed byte/turn thresholds.
+///
+/// For every session:
+/// 1. If its byte count exceeds `session_max_bytes` or its turn count exceeds
+///    `max_uncompacted_turns`, ask the session to compact its oldest turns.
+///    The pass is counted only when the turn count actually shrank, because
+///    `compact_oldest_turns` returns `Ok(())` without doing anything when the
+///    session holds no more turns than it is allowed to keep.
+/// 2. If the byte count is still above three times `session_max_bytes`, prune
+///    the session down to half of `max_uncompacted_turns` (at least one turn).
+///
+/// Failures are logged per session and do not stop the pass.
+async fn session_rotation(state: &SharedState) {
+    let max_bytes = state.config.session_max_bytes;
+    let max_turns = state.config.max_uncompacted_turns;
+    // Saturate so an extreme configured limit cannot overflow the multiply.
+    let prune_threshold = max_bytes.saturating_mul(3);
+
+    let mut compacted = 0usize;
+    let mut pruned = 0usize;
+
+    for entry in state.sessions.iter() {
+        let session = entry.value();
+        let byte_count = session.byte_count().await;
+        let turn_count = session.turn_count().await;
+
+        if byte_count > max_bytes || turn_count > max_turns {
+            debug!(
+                session_id = %session.id,
+                byte_count = byte_count,
+                turn_count = turn_count,
+                session_max_bytes = max_bytes,
+                max_uncompacted_turns = max_turns,
+                "triggering session compaction"
+            );
+
+            match session.compact_oldest_turns().await {
+                Ok(()) => {
+                    // Report a compaction only if turns were really dropped;
+                    // otherwise an over-size session would be logged as
+                    // "compacted" on every rotation tick.
+                    if session.turn_count().await < turn_count {
+                        compacted += 1;
+                        info!(
+                            session_id = %session.id,
+                            "session compaction completed"
+                        );
+                    }
+                }
+                Err(e) => {
+                    error!(
+                        session_id = %session.id,
+                        error = %e,
+                        "session compaction failed"
+                    );
+                }
+            }
+        }
+
+        // If byte count is very large even after compaction, prune aggressively
+        if session.byte_count().await > prune_threshold {
+            let keep_turns = (max_turns / 2).max(1);
+            match session.prune_to(keep_turns).await {
+                Ok(()) => {
+                    pruned += 1;
+                    info!(
+                        session_id = %session.id,
+                        keep_turns = keep_turns,
+                        "session pruned to limit"
+                    );
+                }
+                Err(e) => {
+                    error!(
+                        session_id = %session.id,
+                        error = %e,
+                        "session prune failed"
+                    );
+                }
+            }
+        }
+    }
+
+    if compacted > 0 || pruned > 0 {
+        info!(
+            compacted = compacted,
+            pruned = pruned,
+            "session rotation complete"
+        );
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Memory handoff
+// ---------------------------------------------------------------------------
+
+/// Coordinate shared context across active agents.
+///
+/// Rust counterpart of the TypeScript memory-handoff step, whose goal is to
+/// let agents share context/state without bloating each individual session.
+///
+/// This is still a stub: it only logs how many agents and sessions exist, and
+/// logs a "skipped" line instead when there are no agents. The real handoff
+/// needs an LLM compaction pass that produces the shared summaries.
+async fn memory_handoff(state: &SharedState) {
+    let agents = state.agents.len();
+    if agents == 0 {
+        debug!("memory handoff skipped: no active agents");
+        return;
+    }
+
+    let sessions = state.sessions.len();
+    debug!(agents = agents, sessions = sessions, "memory handoff tick");
+
+    // Planned behaviour (not implemented yet): summarise through the LLM
+    // (colibri-deepseek) and inject the result into active sessions as a
+    // Compaction entry, so that agents reference one shared context snippet
+    // instead of each carrying the full context. A first candidate filter is
+    // "sessions with more than 10 turns".
+}
+
+// ---------------------------------------------------------------------------
+// Task polling
+// ---------------------------------------------------------------------------
+
+/// Poll for new tasks from the control plane.
+///
+/// This is the daemon's entry point for external task dispatch — e.g. a
+/// control-plane operator (Herdr, web dashboard, cron) pushes a new agent run
+/// request. The daemon picks it up and routes it through the spawner.
+///
+/// Currently a stub; will be wired to a task queue (filesystem, Redis, or
+/// database-backed) once the control-plane API is finalized.
+pub async fn poll_tasks(state: &SharedState) {
+    debug!("task polling tick");
+
+    // Stub: build the spawner that will eventually launch one agent per
+    // pending task manifest. Nothing is spawned yet.
+    let spawner_config = state.config.clone().into();
+    let _placeholder_spawner = Spawner::new(spawner_config);
+
+    // Intended source of work: a well-known directory or queue, e.g. the
+    // `tasks` directory under `state.config.data_dir`, read entry by entry,
+    // with each manifest parsed and handed to `Spawner::spawn()`.
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// The default loop configuration uses 30 s / 60 s / 120 s / 300 s.
+    #[test]
+    fn test_daemon_loop_config_defaults() {
+        let DaemonLoopConfig {
+            heartbeat_interval,
+            session_rotation_interval,
+            memory_handoff_interval,
+            agent_stall_timeout,
+        } = DaemonLoopConfig::default();
+
+        assert_eq!(heartbeat_interval, Duration::from_secs(30));
+        assert_eq!(session_rotation_interval, Duration::from_secs(60));
+        assert_eq!(memory_handoff_interval, Duration::from_secs(120));
+        assert_eq!(agent_stall_timeout, Duration::from_secs(300));
+    }
+
+    /// A fresh state has empty registries and exactly one shutdown receiver.
+    #[test]
+    fn test_daemon_state_creation() {
+        let state = DaemonState::new(DaemonConfig::from_env());
+
+        assert_eq!(state.sessions.len(), 0);
+        assert_eq!(state.agents.len(), 0);
+        assert_eq!(state.shutdown_tx.receiver_count(), 1);
+    }
+}
diff --git a/crates/colibri-daemon/src/lib.rs b/crates/colibri-daemon/src/lib.rs
new file mode 100644
index 0000000..5648589
--- /dev/null
+++ b/crates/colibri-daemon/src/lib.rs
@@ -0,0 +1,77 @@
+//! colibri-daemon — always-on Rust service replacing agent-runner.ts,
+//! agent-session.ts, session-compaction.ts, and controlplane-*.ts runners.
+//!
+//! Core responsibilities:
+//! - Session lifecycle (JSONL write/read/prune + context-window management)
+//! - Agent subprocess spawner (provider/model config, retry/backoff)
+//! - Unix socket API for Herdr operator dashboard
+//! - Provider routing: DeepSeek primary, OpenRouter/Anthropic fallback
+//! - DeepSeek cache discipline: immutable system prefix + appendable log +
+//! 
volatile scratch (the 3-region prompt model)
+
+pub mod config;
+pub mod daemon;
+pub mod session;
+pub mod socket;
+pub mod spawner;
+
+// Re-exports for convenience
+pub use config::DaemonConfig;
+pub use daemon::{DaemonState, SharedState};
+
+use serde::{Deserialize, Serialize};
+
+// ---------------------------------------------------------------------------
+// Wire types for the socket API
+// ---------------------------------------------------------------------------
+
+/// Inbound command from Herdr over the Unix socket.
+///
+/// Deserialized from a JSON object whose `cmd` field selects the variant,
+/// e.g. `{"cmd":"kill-agent","agent_id":"…"}`.
+#[derive(Debug, Deserialize)]
+#[serde(tag = "cmd")]
+pub enum HerdrCommand {
+    /// Report daemon status.
+    #[serde(rename = "status")]
+    Status,
+    /// List the known sessions.
+    #[serde(rename = "list-sessions")]
+    ListSessions,
+    /// Spawn a new agent subprocess.
+    #[serde(rename = "spawn-agent")]
+    SpawnAgent {
+        /// Provider name as sent by the client.
+        provider: String,
+        /// Model name.
+        model: String,
+        /// Session to attach, if any.
+        session_id: Option<String>,
+        /// System prompt override, if any.
+        system_prompt: Option<String>,
+    },
+    /// Kill the agent with the given id.
+    #[serde(rename = "kill-agent")]
+    KillAgent { agent_id: String },
+    /// Fetch one session.
+    #[serde(rename = "get-session")]
+    GetSession { session_id: String },
+    /// Compact one session.
+    #[serde(rename = "compact-session")]
+    CompactSession { session_id: String },
+}
+
+/// Outbound response to Herdr.
+///
+/// `error` and `data` are omitted from the JSON when they are `None`.
+#[derive(Debug, Serialize)]
+pub struct HerdrResponse {
+    /// `true` for a successful command, `false` for a failure.
+    pub ok: bool,
+    /// Failure message; set only by [`HerdrResponse::err`].
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub error: Option<String>,
+    /// Command payload; set only by [`HerdrResponse::ok`].
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub data: Option<serde_json::Value>,
+}
+
+impl HerdrResponse {
+    /// Build a success response carrying `data`.
+    pub fn ok(data: serde_json::Value) -> Self {
+        Self {
+            ok: true,
+            error: None,
+            data: Some(data),
+        }
+    }
+
+    /// Build a failure response carrying `message` and no data.
+    pub fn err(message: impl Into<String>) -> Self {
+        Self {
+            ok: false,
+            error: Some(message.into()),
+            data: None,
+        }
+    }
+}
diff --git a/crates/colibri-daemon/src/main.rs b/crates/colibri-daemon/src/main.rs
new file mode 100644
index 0000000..dd2544f
--- /dev/null
+++ b/crates/colibri-daemon/src/main.rs
@@ -0,0 +1,101 @@
+//! colibri-daemon — always-on Rust service for the Colibri control plane.
+//!
+//! Starts:
+//! - Session manager (JSONL persistence + context window management)
+//! 
- Agent spawner (subprocess lifecycle + provider routing)
+//! - Herdr Unix socket API
+//!
+//! Graceful shutdown on SIGTERM/SIGINT (or Ctrl+C).
+
+use std::sync::Arc;
+
+use colibri_daemon::{session, socket, DaemonConfig, DaemonState, SharedState};
+use tracing::{info, warn};
+
+/// Daemon entry point.
+///
+/// Loads configuration, restores persisted sessions, serves the Herdr socket
+/// API, and on SIGINT/SIGTERM shuts the server down and kills every agent
+/// subprocess that is still registered.
+///
+/// # Errors
+/// Returns an error if the data directories cannot be created or if the
+/// socket server task panics or is cancelled.
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    // Initialise tracing
+    tracing_subscriber::fmt()
+        .with_env_filter(
+            tracing_subscriber::EnvFilter::try_from_default_env()
+                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
+        )
+        .init();
+
+    info!("colibri-daemon starting up");
+
+    // Load configuration
+    let config = DaemonConfig::from_env();
+    info!(
+        host = %config.host,
+        data_dir = %config.data_dir.display(),
+        socket_path = %config.socket_path.display(),
+        session_max_bytes = config.session_max_bytes,
+        max_context_tokens = config.max_context_tokens,
+        "configuration loaded"
+    );
+
+    // Build shared state
+    let state: SharedState = Arc::new(DaemonState::new(config.clone()));
+
+    // Ensure data directories exist (creating `sessions` also creates its
+    // parent data directory).
+    let sessions_dir = config.data_dir.join("sessions");
+    tokio::fs::create_dir_all(&sessions_dir).await?;
+
+    // Bootstrap: load any existing sessions. All of them share one config
+    // allocation instead of cloning the configuration per session.
+    let session_config = Arc::new(config.clone());
+    if let Ok(mut entries) = tokio::fs::read_dir(&sessions_dir).await {
+        while let Ok(Some(entry)) = entries.next_entry().await {
+            let path = entry.path();
+            if path.extension().and_then(|e| e.to_str()) != Some("jsonl") {
+                continue;
+            }
+            let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
+                continue;
+            };
+
+            let session_id = stem.to_string();
+            match session::Session::load(session_id.clone(), Arc::clone(&session_config)) {
+                Ok(sess) => {
+                    info!(session_id = %session_id, "loaded existing session");
+                    state.sessions.insert(session_id, sess);
+                }
+                Err(e) => {
+                    warn!(
+                        session_id = %session_id,
+                        error = %e,
+                        "failed to load session file"
+                    );
+                }
+            }
+        }
+    }
+
+    // NOTE(review): `daemon::run_loop` (heartbeat / session rotation / memory
+    // handoff) is not started here — confirm whether that is intentional for
+    // the scaffold.
+
+    // Start the Herdr socket server
+    let mut socket_handle = tokio::spawn(socket::serve(
+        state.clone(),
+        state.shutdown_rx.resubscribe(),
+    ));
+
+    // Run until a shutdown signal arrives or the socket server stops on its
+    // own (for example because it could not bind).
+    let server_already_stopped = tokio::select! {
+        _ = shutdown_signal() => false,
+        joined = &mut socket_handle => {
+            joined?;
+            true
+        }
+    };
+
+    if !server_already_stopped {
+        info!("received shutdown signal, initiating graceful shutdown");
+        let _ = state.shutdown_tx.send(());
+
+        // Wait for the socket server to finish
+        socket_handle.await?;
+
+        // Give sub-tasks a moment to clean up
+        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
+    }
+
+    // Kill any running agent subprocesses. This runs on the main task: a
+    // detached task would be cancelled as soon as `main` returns and the
+    // runtime shuts down, leaving the subprocesses alive.
+    for entry in state.agents.iter() {
+        let handle = entry.value();
+        info!(agent_id = %handle.id, "killing agent");
+        if let Err(e) = handle.kill().await {
+            warn!(agent_id = %handle.id, error = %e, "failed to kill agent");
+        }
+    }
+
+    info!("colibri-daemon shut down cleanly");
+    Ok(())
+}
+
+/// Resolve when the process receives SIGINT (Ctrl+C) or SIGTERM.
+///
+/// If the SIGTERM handler cannot be installed, only Ctrl+C is awaited. A
+/// failure to listen for Ctrl+C resolves immediately, which makes the caller
+/// begin shutdown.
+async fn shutdown_signal() {
+    use tokio::signal::unix::{signal, SignalKind};
+
+    match signal(SignalKind::terminate()) {
+        Ok(mut sigterm) => {
+            tokio::select! {
+                _ = tokio::signal::ctrl_c() => {}
+                _ = sigterm.recv() => {}
+            }
+        }
+        Err(e) => {
+            warn!(error = %e, "failed to install SIGTERM handler; handling Ctrl+C only");
+            tokio::signal::ctrl_c().await.ok();
+        }
+    }
+}
diff --git a/crates/colibri-daemon/src/session.rs b/crates/colibri-daemon/src/session.rs
new file mode 100644
index 0000000..8c5ba33
--- /dev/null
+++ b/crates/colibri-daemon/src/session.rs
@@ -0,0 +1,429 @@
+//! Session lifecycle management — replaces agent-session.ts and
+//! session-compaction.ts.
+//!
+//! Each session is an append-only JSONL file on disk with an in-memory
+//! representation. The session enforces the DeepSeek 3-region prompt model:
+//!
+//! 1. Immutable system prefix (byte-stable, cacheable)
+//! 2. Appendable conversation log (turns accumulate until compaction)
+//! 3. Volatile scratch (discarded per-turn, never persisted)
+//!
+//! When total bytes exceed `session_max_bytes` or the number of uncompacted
+//! turns exceeds `max_uncompacted_turns`, the oldest turns are compacted
+//! (summarised) via LLM call. Compacted summaries are stored as special
+//! "compaction" entries in the JSONL.
+
+use std::collections::VecDeque;
+use std::io::{BufRead, BufReader, Write};
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use chrono::Utc;
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+use thiserror::Error;
+use tokio::sync::RwLock;
+
+use crate::DaemonConfig;
+
+// ---------------------------------------------------------------------------
+// Turn / Entry types
+// ---------------------------------------------------------------------------
+
+/// One record of the session JSONL log.
+///
+/// The `type` field of the JSON object selects the variant; variant names are
+/// written in snake_case (`system`, `user`, `assistant`, `tool_call`,
+/// `tool_result`, `compaction`).
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(tag = "type", rename_all = "snake_case")]
+pub enum SessionEntry {
+    /// System message; `cacheable` defaults to `true` when absent.
+    System {
+        content: String,
+        #[serde(default = "default_true")]
+        cacheable: bool,
+    },
+    /// User message.
+    User { content: String },
+    /// Assistant message.
+    Assistant { content: String },
+    /// Tool invocation: tool name plus its JSON arguments.
+    ToolCall { name: String, arguments: Value },
+    /// Tool output: tool name plus its JSON result.
+    ToolResult { name: String, result: Value },
+    /// Marker left behind when older turns were compacted; any additional
+    /// fields of the JSON object are kept in `extra`.
+    Compaction {
+        summary: String,
+        compacted_count: usize,
+        #[serde(flatten)]
+        extra: Value,
+    },
+}
+
+/// Serde default for `SessionEntry::System::cacheable`.
+fn default_true() -> bool {
+    true
+}
+
+impl SessionEntry {
+    /// Rough token estimate: the byte length of the entry's JSON
+    /// serialization divided by 3, rounded up. An entry that fails to
+    /// serialize counts as 0.
+    pub fn approx_tokens(&self) -> u64 {
+        let serialized_len = serde_json::to_string(self).map_or(0, |json| json.len() as u64);
+        serialized_len.div_ceil(3)
+    }
+}
+
+/// A single conversation turn (user + assistant + optional tool calls/results).
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Turn {
+    /// Position of the turn within its session, assigned when it is created.
+    pub index: u64,
+    /// Entries of the turn, in log order.
+    pub entries: Vec<SessionEntry>,
+}
+
+impl Turn {
+    /// Sum of [`SessionEntry::approx_tokens`] over all entries of the turn.
+    pub fn approx_tokens(&self) -> u64 {
+        self.entries.iter().map(SessionEntry::approx_tokens).sum()
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Session
+// ---------------------------------------------------------------------------
+
+/// Errors returned by [`Session`] operations.
+#[derive(Debug, Error)]
+pub enum SessionError {
+    #[error("I/O error: {0}")]
+    Io(#[from] std::io::Error),
+    #[error("JSON error: {0}")]
+    Json(#[from] serde_json::Error),
+    #[error("session not found: {0}")]
+    NotFound(String),
+    #[error("session already exists: {0}")]
+    AlreadyExists(String),
+}
+
+/// An active agent session.
+///
+/// Thread-safe: all mutable state is behind an `RwLock`. Whenever both locks
+/// are needed at once, `turns` is taken before `byte_count`; no method holds
+/// `byte_count` while waiting for `turns`.
+pub struct Session {
+    /// Unique session ID.
+    pub id: String,
+    /// Path to the backing JSONL file.
+    path: PathBuf,
+    /// In-memory turn buffer, oldest turn at the front.
+    turns: RwLock<VecDeque<Turn>>,
+    /// Total bytes written to the JSONL file (approximate).
+    byte_count: RwLock<u64>,
+    /// Configuration reference.
+    config: Arc<DaemonConfig>,
+    /// Creation timestamp.
+    pub created_at: String,
+}
+
+impl Session {
+    /// Create a new session, writing the initial (empty) JSONL file.
+    ///
+    /// # Errors
+    /// [`SessionError::AlreadyExists`] if the session file is already there,
+    /// [`SessionError::Io`] for any other filesystem failure.
+    pub fn create(id: String, config: Arc<DaemonConfig>) -> Result<Self, SessionError> {
+        let path = config.data_dir.join("sessions").join(format!("{id}.jsonl"));
+        if let Some(parent) = path.parent() {
+            std::fs::create_dir_all(parent)?;
+        }
+
+        // `create_new` checks for existence and creates the file in a single
+        // step, so two concurrent creators cannot both succeed and an
+        // existing log is never truncated.
+        let created = std::fs::OpenOptions::new()
+            .write(true)
+            .create_new(true)
+            .open(&path);
+        match created {
+            Ok(_file) => {}
+            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
+                return Err(SessionError::AlreadyExists(id));
+            }
+            Err(e) => return Err(SessionError::Io(e)),
+        }
+
+        Ok(Self {
+            id,
+            path,
+            turns: RwLock::new(VecDeque::new()),
+            byte_count: RwLock::new(0),
+            config,
+            created_at: Utc::now().to_rfc3339(),
+        })
+    }
+
+    /// Load an existing session from its JSONL file.
+    ///
+    /// A `user` or `system` entry starts a new turn; every other entry joins
+    /// the turn in progress (or starts one if the log begins with it). Blank
+    /// lines are skipped.
+    ///
+    /// # Errors
+    /// [`SessionError::NotFound`] if the file does not exist,
+    /// [`SessionError::Io`] on read failure, [`SessionError::Json`] if a
+    /// non-blank line is not a valid entry.
+    pub fn load(id: String, config: Arc<DaemonConfig>) -> Result<Self, SessionError> {
+        let path = config.data_dir.join("sessions").join(format!("{id}.jsonl"));
+        if !path.exists() {
+            return Err(SessionError::NotFound(id));
+        }
+
+        let reader = BufReader::new(std::fs::File::open(&path)?);
+        let mut turns: VecDeque<Turn> = VecDeque::new();
+        let mut byte_count: u64 = 0;
+
+        for line in reader.lines() {
+            let line = line?;
+            byte_count += line.len() as u64 + 1; // +1 for newline
+            if line.trim().is_empty() {
+                // A stray blank line is not an entry; do not fail the load.
+                continue;
+            }
+            let entry: SessionEntry = serde_json::from_str(&line)?;
+
+            let starts_turn =
+                matches!(entry, SessionEntry::User { .. } | SessionEntry::System { .. });
+            if starts_turn || turns.is_empty() {
+                // Indices are handed out sequentially, so the next index
+                // equals the number of turns read so far.
+                let index = turns.len() as u64;
+                turns.push_back(Turn {
+                    index,
+                    entries: vec![entry],
+                });
+            } else if let Some(current) = turns.back_mut() {
+                current.entries.push(entry);
+            }
+        }
+
+        Ok(Self {
+            id,
+            path,
+            turns: RwLock::new(turns),
+            byte_count: RwLock::new(byte_count),
+            config,
+            created_at: Utc::now().to_rfc3339(), // best-effort; could store in session metadata
+        })
+    }
+
+    /// Append a single entry to the session, flushing to JSONL.
+    ///
+    /// The file write and the in-memory update happen under the `turns` write
+    /// lock, so concurrent appends land on disk in the same order as in
+    /// memory. Afterwards the compaction thresholds are checked.
+    ///
+    /// # Errors
+    /// [`SessionError::Json`] if the entry cannot be serialized,
+    /// [`SessionError::Io`] if the file cannot be written.
+    pub async fn append(&self, entry: SessionEntry) -> Result<(), SessionError> {
+        let line = serde_json::to_string(&entry)?;
+        let line_bytes = line.len() as u64 + 1;
+
+        {
+            let mut turns = self.turns.write().await;
+
+            // Write to JSONL
+            let mut file = std::fs::OpenOptions::new()
+                .create(true)
+                .append(true)
+                .open(&self.path)?;
+            writeln!(file, "{line}")?;
+
+            // User/system messages open a new turn; everything else joins
+            // the latest turn (or opens the first one).
+            let starts_turn =
+                matches!(entry, SessionEntry::User { .. } | SessionEntry::System { .. });
+            if starts_turn || turns.is_empty() {
+                let next_index = turns.back().map_or(0, |t| t.index + 1);
+                turns.push_back(Turn {
+                    index: next_index,
+                    entries: vec![entry],
+                });
+            } else if let Some(last) = turns.back_mut() {
+                last.entries.push(entry);
+            }
+
+            *self.byte_count.write().await += line_bytes;
+        }
+
+        // Check if we need rollover or compaction
+        self.maybe_compact_or_rollover().await?;
+
+        Ok(())
+    }
+
+    /// Get all turns (newest last for prompt construction).
+    pub async fn turns(&self) -> Vec<Turn> {
+        let turns = self.turns.read().await;
+        turns.iter().cloned().collect()
+    }
+
+    /// Get the total byte count.
+    pub async fn byte_count(&self) -> u64 {
+        *self.byte_count.read().await
+    }
+
+    /// Get the number of turns.
+    pub async fn turn_count(&self) -> usize {
+        self.turns.read().await.len()
+    }
+
+    // ------------------------------------------------------------------
+    // Compaction
+    // ------------------------------------------------------------------
+
+    /// Check thresholds and compact/prune if needed.
+    async fn maybe_compact_or_rollover(&self) -> Result<(), SessionError> {
+        // Read the two counters one after the other. Holding `byte_count`
+        // while waiting for `turns` would invert the lock order used by the
+        // writers and could deadlock against them.
+        let byte_count = self.byte_count().await;
+        let turn_count = self.turn_count().await;
+
+        let needs_compaction = byte_count > self.config.session_max_bytes
+            || turn_count > self.config.max_uncompacted_turns;
+
+        if needs_compaction {
+            self.compact_oldest_turns().await?;
+        }
+
+        Ok(())
+    }
+
+    /// Compact the oldest turns by replacing them with a compaction marker.
+    ///
+    /// Keeps the most recent `max_uncompacted_turns` turns in memory, appends
+    /// a `compaction` entry (placeholder summary plus the indices of the
+    /// dropped turns) to the JSONL file, and drops the older turns from
+    /// memory. Does nothing when there are no turns beyond that limit.
+    ///
+    /// NOTE(review): the marker is only written to disk; it is not added to
+    /// the in-memory turns, and the compacted turns stay in the JSONL file.
+    ///
+    /// # Errors
+    /// [`SessionError::Json`] / [`SessionError::Io`] if the marker cannot be
+    /// serialized or written; in that case no turn is dropped.
+    pub async fn compact_oldest_turns(&self) -> Result<(), SessionError> {
+        let mut turns = self.turns.write().await;
+
+        // Keep the most recent `max_uncompacted_turns` turns, compact the rest.
+        let keep = self.config.max_uncompacted_turns;
+        if turns.len() <= keep {
+            return Ok(());
+        }
+        let compact_count = turns.len() - keep;
+
+        let compacted_entries: usize = turns
+            .iter()
+            .take(compact_count)
+            .map(|t| t.entries.len())
+            .sum();
+        // Record the real indices: after an earlier compaction the oldest
+        // remaining turn no longer has index 0.
+        let compacted_indices: Vec<u64> =
+            turns.iter().take(compact_count).map(|t| t.index).collect();
+
+        // Placeholder summary; real compaction uses LLM summarisation.
+        let summary = format!(
+            "[compacted {} earlier turns — total {} entries]",
+            compact_count, compacted_entries,
+        );
+
+        // Append compaction entry to the live JSONL
+        let entry = SessionEntry::Compaction {
+            summary,
+            compacted_count: compact_count,
+            extra: serde_json::json!({
+                "compacted_at": Utc::now().to_rfc3339(),
+                "compacted_turn_indices": compacted_indices,
+            }),
+        };
+        let line = serde_json::to_string(&entry)?;
+        let mut file = std::fs::OpenOptions::new()
+            .create(true)
+            .append(true)
+            .open(&self.path)?;
+        writeln!(file, "{line}")?;
+
+        // Drop compacted turns from the front
+        for _ in 0..compact_count {
+            turns.pop_front();
+        }
+
+        // The marker line made the file longer; keep the counter in step.
+        *self.byte_count.write().await += line.len() as u64 + 1;
+
+        Ok(())
+    }
+
+    /// Prune the session: keep only the last N turns, removing all older
+    /// entries from both JSONL and memory.
+    ///
+    /// The surviving entries are written to a temporary file that then
+    /// replaces the live file; memory is pruned only after that succeeded, so
+    /// a failed rewrite leaves both views unchanged.
+    ///
+    /// # Errors
+    /// [`SessionError::Json`] / [`SessionError::Io`] if the rewrite fails.
+    pub async fn prune_to(&self, keep_turns: usize) -> Result<(), SessionError> {
+        let mut turns = self.turns.write().await;
+        let excess = turns.len().saturating_sub(keep_turns);
+
+        // Re-write JSONL from the turns that survive
+        let tmp_path = self.path.with_extension("jsonl.tmp");
+        {
+            let mut out = std::io::BufWriter::new(std::fs::File::create(&tmp_path)?);
+            for entry in turns.iter().skip(excess).flat_map(|t| t.entries.iter()) {
+                let line = serde_json::to_string(entry)?;
+                writeln!(out, "{line}")?;
+            }
+            // Flush explicitly: dropping the writer would swallow the error.
+            out.flush()?;
+        }
+        std::fs::rename(&tmp_path, &self.path)?;
+
+        for _ in 0..excess {
+            turns.pop_front();
+        }
+
+        // Recompute byte count
+        let meta = std::fs::metadata(&self.path)?;
+        *self.byte_count.write().await = meta.len();
+
+        Ok(())
+    }
+
+    // ------------------------------------------------------------------
+    // 3-region prompt assembly (for DeepSeek cache discipline)
+    // ------------------------------------------------------------------
+
+    /// Build the 3-region prompt:
+    /// 1. Immutable system prefix (byte-stable for cache hits)
+    /// 2. Appendable conversation log (turns, possibly with compaction gaps)
+    /// 3. Volatile scratch (not included — caller appends per-request)
+    ///
+    /// Tool-call ids are synthesized here. The first call of a turn gets
+    /// `call_<turn index>`, later calls in the same turn get
+    /// `call_<turn index>_<n>`, and each tool result is paired with the
+    /// oldest call of its turn that has not been answered yet.
+    pub async fn build_prompt_messages(&self) -> Vec<Value> {
+        let mut messages: Vec<Value> = Vec::new();
+
+        // Region 1: Immutable system prefix
+        messages.push(serde_json::json!({
+            "role": "system",
+            "content": colibri_deepseek::STABLE_SYSTEM_PREFIX,
+        }));
+
+        // Region 2: Appendable conversation log
+        let turns = self.turns.read().await;
+        for turn in turns.iter() {
+            // Ids of this turn's tool calls that still await a result.
+            let mut pending_calls: VecDeque<String> = VecDeque::new();
+            let mut calls_in_turn = 0usize;
+
+            for entry in &turn.entries {
+                match entry {
+                    SessionEntry::System { content, .. } => {
+                        messages.push(serde_json::json!({"role": "system", "content": content}));
+                    }
+                    SessionEntry::User { content } => {
+                        messages.push(serde_json::json!({"role": "user", "content": content}));
+                    }
+                    SessionEntry::Assistant { content } => {
+                        messages.push(serde_json::json!({"role": "assistant", "content": content}));
+                    }
+                    SessionEntry::ToolCall { name, arguments } => {
+                        // Distinct id per call, so several calls in one turn
+                        // do not collide.
+                        let call_id = if calls_in_turn == 0 {
+                            format!("call_{}", turn.index)
+                        } else {
+                            format!("call_{}_{}", turn.index, calls_in_turn)
+                        };
+                        calls_in_turn += 1;
+                        pending_calls.push_back(call_id.clone());
+
+                        messages.push(serde_json::json!({
+                            "role": "assistant",
+                            "tool_calls": [{
+                                "id": call_id,
+                                "type": "function",
+                                "function": {
+                                    "name": name,
+                                    "arguments": serde_json::to_string(arguments).unwrap_or_default(),
+                                }
+                            }]
+                        }));
+                    }
+                    SessionEntry::ToolResult { name, result } => {
+                        // A result without a preceding call falls back to the
+                        // turn-level id.
+                        let call_id = pending_calls
+                            .pop_front()
+                            .unwrap_or_else(|| format!("call_{}", turn.index));
+
+                        messages.push(serde_json::json!({
+                            "role": "tool",
+                            "tool_call_id": call_id,
+                            "name": name,
+                            "content": result.to_string(),
+                        }));
+                    }
+                    SessionEntry::Compaction { summary, .. } => {
+                        messages.push(serde_json::json!({
+                            "role": "system",
+                            "content": format!("[SESSION COMPACTION] {summary}"),
+                        }));
+                    }
+                }
+            }
+        }
+
+        // Region 3 (volatile scratch) is left empty — the API caller appends
+        // per-request user/assistant content.
+
+        messages
+    }
+}
+
+// Tests deferred to crate-level integration tests (tests/ directory).
+
diff --git a/crates/colibri-daemon/src/socket.rs b/crates/colibri-daemon/src/socket.rs
new file mode 100644
index 0000000..104c725
--- /dev/null
+++ b/crates/colibri-daemon/src/socket.rs
@@ -0,0 +1,284 @@
+//! Unix socket API for Herdr operator dashboard.
+//!
+//! Listens on a Unix domain socket, accepts newline-delimited JSON commands,
+//! and dispatches to the session manager and agent spawner.
+//!
+//! Protocol (matching the existing watchdog JSON+newline convention):
+//! - Client sends: `{"cmd":"status"}\n`
+//! 
- Server responds: `{"ok":true,"data":{...}}\n` + +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::{UnixListener, UnixStream}; +use tokio::select; +use tokio::sync::broadcast; +use tracing::{debug, error, info, trace, warn}; + +use crate::spawner::{AgentSpawnConfig, Provider, Spawner}; +use crate::{HerdrCommand, HerdrResponse, SharedState}; + +// --------------------------------------------------------------------------- +// Socket server +// --------------------------------------------------------------------------- + +/// Run the Herdr socket API server. Binds to the configured Unix socket path +/// and processes inbound commands until a shutdown signal is received. +pub async fn serve(state: SharedState, mut shutdown_rx: broadcast::Receiver<()>) { + let socket_path = state.config.socket_path.clone(); + + // Clean up stale socket file + if socket_path.exists() { + if let Err(e) = tokio::fs::remove_file(&socket_path).await { + warn!( + path = %socket_path.display(), + error = %e, + "failed to remove stale socket file" + ); + } + } + + // Ensure parent directory exists + if let Some(parent) = socket_path.parent() { + if let Err(e) = tokio::fs::create_dir_all(parent).await { + error!( + path = %parent.display(), + error = %e, + "failed to create socket parent directory" + ); + return; + } + } + + let listener = match UnixListener::bind(&socket_path) { + Ok(l) => l, + Err(e) => { + error!( + path = %socket_path.display(), + error = %e, + "failed to bind Unix socket" + ); + return; + } + }; + + info!(path = %socket_path.display(), "Herdr socket API listening"); + + loop { + select! 
{ + accept_result = listener.accept() => { + match accept_result { + Ok((stream, _addr)) => { + let state = state.clone(); + tokio::spawn(handle_connection(stream, state)); + } + Err(e) => { + error!(error = %e, "socket accept error"); + } + } + } + _ = shutdown_rx.recv() => { + info!("socket server received shutdown signal"); + break; + } + } + } + + // Clean up socket file on graceful exit + let _ = tokio::fs::remove_file(&socket_path).await; + info!("Herdr socket API shut down"); +} + +// --------------------------------------------------------------------------- +// Connection handler +// --------------------------------------------------------------------------- + +async fn handle_connection(stream: UnixStream, state: SharedState) { + let (reader, mut writer) = stream.into_split(); + let mut reader = BufReader::new(reader); + let mut line = String::new(); + + loop { + line.clear(); + match reader.read_line(&mut line).await { + Ok(0) => { + // EOF — client disconnected + trace!("socket client disconnected"); + break; + } + Ok(_) => { + let trimmed = line.trim_end(); + if trimmed.is_empty() { + continue; + } + + let response = match serde_json::from_str::(trimmed) { + Ok(cmd) => dispatch(cmd, &state).await, + Err(e) => HerdrResponse::err(format!("invalid command: {e}")), + }; + + let mut response_json = serde_json::to_string(&response).unwrap_or_default(); + response_json.push('\n'); + + if let Err(e) = writer.write_all(response_json.as_bytes()).await { + debug!(error = %e, "socket write error"); + break; + } + } + Err(e) => { + debug!(error = %e, "socket read error"); + break; + } + } + } +} + +// --------------------------------------------------------------------------- +// Command dispatch +// --------------------------------------------------------------------------- + +async fn dispatch(cmd: HerdrCommand, state: &SharedState) -> HerdrResponse { + match cmd { + HerdrCommand::Status => cmd_status(state).await, + HerdrCommand::ListSessions => 
cmd_list_sessions(state).await, + HerdrCommand::SpawnAgent { + provider, + model, + session_id, + system_prompt, + } => cmd_spawn_agent(state, provider, model, session_id, system_prompt).await, + HerdrCommand::KillAgent { agent_id } => cmd_kill_agent(state, agent_id).await, + HerdrCommand::GetSession { session_id } => cmd_get_session(state, session_id).await, + HerdrCommand::CompactSession { session_id } => cmd_compact_session(state, session_id).await, + } +} + +async fn cmd_status(state: &SharedState) -> HerdrResponse { + let session_count = state.sessions.len(); + let agent_count = state.agents.len(); + + let mut agents_status = Vec::new(); + for entry in state.agents.iter() { + let handle = entry.value(); + let status = handle.status.read().await; + agents_status.push(serde_json::json!({ + "id": handle.id, + "status": format!("{:?}", *status), + "provider": format!("{:?}", handle.config.provider), + "model": handle.config.model, + "created_at": handle.created_at, + })); + } + + HerdrResponse::ok(serde_json::json!({ + "daemon": "colibri-daemon", + "version": env!("CARGO_PKG_VERSION"), + "host": state.config.host, + "sessions": session_count, + "agents": agent_count, + "agent_list": agents_status, + })) +} + +async fn cmd_list_sessions(state: &SharedState) -> HerdrResponse { + let mut sessions = Vec::new(); + for entry in state.sessions.iter() { + let session = entry.value(); + sessions.push(serde_json::json!({ + "id": session.id, + "turn_count": session.turn_count().await, + "byte_count": session.byte_count().await, + "created_at": session.created_at, + })); + } + + HerdrResponse::ok(serde_json::json!({ + "sessions": sessions, + })) +} + +async fn cmd_spawn_agent( + state: &SharedState, + provider_str: String, + model: String, + session_id: Option, + system_prompt: Option, +) -> HerdrResponse { + let provider = match provider_str.to_lowercase().as_str() { + "deepseek" => Provider::DeepSeek, + "openrouter" => Provider::OpenRouter, + "anthropic" => 
Provider::Anthropic, + other => return HerdrResponse::err(format!("unknown provider: {other}")), + }; + + let agent_config = AgentSpawnConfig { + binary: std::env::var("COLIBRI_AGENT_BINARY") + .unwrap_or_else(|_| "hermes-agent".to_string()), + args: vec!["--mode".to_string(), "json".to_string()], + provider, + model, + session_id, + system_prompt, + ..Default::default() + }; + + let spawner = Spawner::new(state.config.clone().into()); + match spawner.spawn(agent_config).await { + Ok(handle) => { + let id = handle.id.clone(); + state.agents.insert(id.clone(), handle); + HerdrResponse::ok(serde_json::json!({ + "agent_id": id, + "status": "running", + })) + } + Err(e) => HerdrResponse::err(format!("spawn failed: {e}")), + } +} + +async fn cmd_kill_agent(state: &SharedState, agent_id: String) -> HerdrResponse { + match state.agents.get(&agent_id) { + Some(handle) => match handle.value().kill().await { + Ok(()) => { + state.agents.remove(&agent_id); + HerdrResponse::ok(serde_json::json!({ + "agent_id": agent_id, + "status": "stopped", + })) + } + Err(e) => HerdrResponse::err(format!("kill failed: {e}")), + }, + None => HerdrResponse::err(format!("agent not found: {agent_id}")), + } +} + +async fn cmd_get_session(state: &SharedState, session_id: String) -> HerdrResponse { + match state.sessions.get(&session_id) { + Some(session) => { + let turns = session.value().turns().await; + let messages = session.value().build_prompt_messages().await; + + HerdrResponse::ok(serde_json::json!({ + "session_id": session_id, + "turn_count": turns.len(), + "byte_count": session.value().byte_count().await, + "created_at": session.value().created_at, + "turns": turns, + "prompt_messages": messages, + })) + } + None => HerdrResponse::err(format!("session not found: {session_id}")), + } +} + +async fn cmd_compact_session(state: &SharedState, session_id: String) -> HerdrResponse { + match state.sessions.get(&session_id) { + Some(session) => match session.value().compact_oldest_turns().await 
{ + Ok(()) => HerdrResponse::ok(serde_json::json!({ + "session_id": session_id, + "turn_count": session.value().turn_count().await, + "status": "compacted", + })), + Err(e) => HerdrResponse::err(format!("compaction failed: {e}")), + }, + None => HerdrResponse::err(format!("session not found: {session_id}")), + } +} diff --git a/crates/colibri-daemon/src/spawner.rs b/crates/colibri-daemon/src/spawner.rs new file mode 100644 index 0000000..f73c55c --- /dev/null +++ b/crates/colibri-daemon/src/spawner.rs @@ -0,0 +1,383 @@ +//! Agent subprocess management — replaces agent-runner.ts. +//! +//! Spawns agent subprocesses with provider/model configuration, tracks +//! running agents, and implements retry/backoff for API calls. +//! +//! Provider routing priority: +//! 1. DeepSeek (primary, cache-capable) +//! 2. OpenRouter (fallback) +//! 3. Anthropic (fallback) + +use std::collections::HashMap; +use std::process::Stdio; +use std::sync::Arc; +use std::time::Duration; + +use backon::{ExponentialBuilder, Retryable}; +use serde::{Deserialize, Serialize}; +use thiserror::Error; +use tokio::process::{Child, Command}; +use tokio::sync::{Mutex, RwLock}; +use tracing::{error, info, warn}; +use uuid::Uuid; + +use crate::DaemonConfig; + +// --------------------------------------------------------------------------- +// Provider types +// --------------------------------------------------------------------------- + +/// Supported LLM providers. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum Provider { + DeepSeek, + OpenRouter, + Anthropic, +} + +impl Provider { + /// Priority order for fallback routing. + pub fn fallback_order() -> &'static [Provider] { + &[Provider::DeepSeek, Provider::OpenRouter, Provider::Anthropic] + } + + /// API endpoint for a provider (from config). 
+    ///
+    /// Every provider currently has an endpoint field in [`DaemonConfig`], so this
+    /// always returns `Some`.
+    pub fn endpoint<'a>(&self, config: &'a DaemonConfig) -> Option<&'a str> {
+        match self {
+            Provider::DeepSeek => Some(&config.deepseek_endpoint),
+            Provider::OpenRouter => Some(&config.openrouter_endpoint),
+            Provider::Anthropic => Some(&config.anthropic_endpoint),
+        }
+    }
+
+    /// API key for a provider (from config), or `None` when no key is configured.
+    pub fn api_key<'a>(&self, config: &'a DaemonConfig) -> Option<&'a str> {
+        match self {
+            Provider::DeepSeek => config.deepseek_api_key.as_deref(),
+            Provider::OpenRouter => config.openrouter_api_key.as_deref(),
+            Provider::Anthropic => config.anthropic_api_key.as_deref(),
+        }
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Agent spawn configuration
+// ---------------------------------------------------------------------------
+
+/// Configuration for spawning an agent subprocess.
+///
+/// Only `binary` and `model` are required when deserializing; every other field
+/// falls back to the default noted on it.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct AgentSpawnConfig {
+    /// Agent binary or command to run (e.g. "pi", "hermes-agent", etc.).
+    pub binary: String,
+    /// Command-line arguments.
+    #[serde(default)]
+    pub args: Vec<String>,
+    /// Environment variables to inject.
+    #[serde(default)]
+    pub env: HashMap<String, String>,
+    /// Working directory for the subprocess.
+    // NOTE(review): the inner type was lost in extraction; `String` is assumed here
+    // (it could equally be a path type) — confirm against the original file.
+    #[serde(default)]
+    pub working_dir: Option<String>,
+    /// Primary provider (defaults to DeepSeek).
+    #[serde(default = "default_provider")]
+    pub provider: Provider,
+    /// Model name.
+    pub model: String,
+    /// Session ID to attach.
+    #[serde(default)]
+    pub session_id: Option<String>,
+    /// System prompt override.
+    #[serde(default)]
+    pub system_prompt: Option<String>,
+    /// Maximum retries for spawn failures (defaults to 3).
+    #[serde(default = "default_max_retries")]
+    pub max_retries: u32,
+    /// Timeout for subprocess startup in seconds (defaults to 30).
+    #[serde(default = "default_startup_timeout_secs")]
+    pub startup_timeout_secs: u64,
+}
+
+/// Serde default for [`AgentSpawnConfig::provider`].
+fn default_provider() -> Provider {
+    Provider::DeepSeek
+}
+
+/// Serde default for [`AgentSpawnConfig::max_retries`].
+fn default_max_retries() -> u32 {
+    3
+}
+
+/// Serde default for [`AgentSpawnConfig::startup_timeout_secs`].
+fn default_startup_timeout_secs() -> u64 {
+    30
+}
+
+impl Default for AgentSpawnConfig {
+    /// Empty `binary`/`model`, no args or env, and the same defaults serde applies to
+    /// missing fields.
+    fn default() -> Self {
+        Self {
+            binary: String::new(),
+            args: Vec::new(),
+            env: HashMap::new(),
+            working_dir: None,
+            provider: default_provider(),
+            model: String::new(),
+            session_id: None,
+            system_prompt: None,
+            max_retries: default_max_retries(),
+            startup_timeout_secs: default_startup_timeout_secs(),
+        }
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Agent handle
+// ---------------------------------------------------------------------------
+
+/// Agent status.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "lowercase")]
+pub enum AgentStatus {
+    /// Handle created, subprocess not attached yet.
+    Starting,
+    /// Subprocess attached and believed alive.
+    Running,
+    /// Subprocess was killed or observed to have exited.
+    Stopped,
+    /// Tracking failed; the payload is a short reason.
+    Failed(String),
+}
+
+/// Handle to a running agent subprocess.
+pub struct AgentHandle {
+    /// Unique agent ID.
+    pub id: String,
+    /// Agent configuration.
+    pub config: AgentSpawnConfig,
+    /// Current status.
+    pub status: RwLock<AgentStatus>,
+    /// Subprocess handle (Some only when running).
+    child: Mutex<Option<Child>>,
+    /// Creation timestamp (RFC 3339, UTC).
+    pub created_at: String,
+}
+
+impl AgentHandle {
+    /// Create a handle in the [`AgentStatus::Starting`] state with a fresh random ID
+    /// and no subprocess attached.
+    pub fn new(config: AgentSpawnConfig) -> Self {
+        Self {
+            id: Uuid::new_v4().to_string(),
+            config,
+            status: RwLock::new(AgentStatus::Starting),
+            child: Mutex::new(None),
+            created_at: chrono::Utc::now().to_rfc3339(),
+        }
+    }
+
+    /// Kill the agent subprocess and mark the handle [`AgentStatus::Stopped`].
+    ///
+    /// A handle with no attached subprocess is simply marked stopped.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`SpawnerError::Io`] if killing or reaping the process fails; the
+    /// subprocess stays attached and the status is left untouched in that case.
+    pub async fn kill(&self) -> Result<(), SpawnerError> {
+        let mut slot = self.child.lock().await;
+        if let Some(proc) = slot.as_mut() {
+            // `?` converts the I/O error through `SpawnerError::Io`'s `#[from]`.
+            proc.kill().await?;
+            proc.wait().await?;
+        }
+        *slot = None;
+        *self.status.write().await = AgentStatus::Stopped;
+        Ok(())
+    }
+
+    /// Check if the process has exited.
+    ///
+    /// Non-blocking. Returns the exit status the first time an exit is observed, at
+    /// which point the subprocess is detached and the status becomes
+    /// [`AgentStatus::Stopped`]. Returns `None` while the process is still running,
+    /// when no subprocess is attached, or when polling fails (the subprocess is then
+    /// detached and the status becomes [`AgentStatus::Failed`]).
+    pub async fn poll_exit(&self) -> Option<std::process::ExitStatus> {
+        let mut child_guard = self.child.lock().await;
+        // No subprocess attached: nothing to poll.
+        let polled = child_guard.as_mut()?.try_wait();
+        match polled {
+            Ok(Some(exit_status)) => {
+                *child_guard = None;
+                *self.status.write().await = AgentStatus::Stopped;
+                Some(exit_status)
+            }
+            Ok(None) => None,
+            Err(e) => {
+                // Keep the cause visible; the status only carries a fixed reason.
+                warn!(agent_id = %self.id, error = %e, "failed to poll agent exit status");
+                *child_guard = None;
+                *self.status.write().await = AgentStatus::Failed("poll error".to_string());
+                None
+            }
+        }
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Spawner
+// ---------------------------------------------------------------------------
+
+/// Errors produced while spawning or controlling agent subprocesses.
+#[derive(Debug, Error)]
+pub enum SpawnerError {
+    /// Launching, polling or killing the subprocess failed.
+    #[error("I/O error: {0}")]
+    Io(#[from] std::io::Error),
+    /// The provider has no API key in the daemon configuration.
+    #[error("no API key configured for provider {provider}")]
+    NoApiKey { provider: String },
+    /// No agent is registered under this ID.
+    #[error("agent {agent_id} not found")]
+    AgentNotFound { agent_id: String },
+    /// Every provider in the routing list was tried without success.
+    #[error("all providers exhausted for agent {agent_id}")]
+    AllProvidersExhausted { agent_id: String },
+    /// Startup exceeded the allowed time. Not produced by [`Spawner::spawn`] at present.
+    #[error("spawn timeout after {secs}s")]
+    SpawnTimeout { secs: u64 },
+}
+
+/// The agent spawner — manages subprocess lifecycle.
+pub struct Spawner {
+    config: Arc<DaemonConfig>,
+}
+
+impl Spawner {
+    /// Create a spawner that reads provider endpoints and API keys from `config`.
+    pub fn new(config: Arc<DaemonConfig>) -> Self {
+        Self { config }
+    }
+
+    /// Spawn an agent subprocess.
+    ///
+    /// Tries `agent_config.provider` first, then the remaining entries of
+    /// [`Provider::fallback_order`]. Providers without a configured API key are
+    /// skipped. For each candidate the launch is retried with exponential backoff,
+    /// bounded by `agent_config.max_retries`; a process that has already exited after
+    /// the 500 ms grace period counts as a failed attempt.
+    ///
+    /// The child environment is `agent_config.env` plus the provider's
+    /// `*_API_KEY` / `*_ENDPOINT`, `COLIBRI_PROVIDER`, `COLIBRI_MODEL`,
+    /// `COLIBRI_AGENT_ID` and, when set, `COLIBRI_SESSION_ID` /
+    /// `COLIBRI_SYSTEM_PROMPT`. The returned handle's `id` equals `COLIBRI_AGENT_ID`.
+    ///
+    /// # Errors
+    ///
+    /// Returns the error of the last provider tried: [`SpawnerError::NoApiKey`] if it
+    /// had no key, otherwise [`SpawnerError::Io`] from the failed launch.
+    // NOTE(review): `agent_config.startup_timeout_secs` is not consulted here; the only
+    // startup check is the fixed 500 ms grace period.
+    // NOTE(review): stdout/stderr are piped, but nothing in this file reads them once
+    // startup succeeds; a chatty agent could block when the pipe buffer fills.
+    pub async fn spawn(&self, agent_config: AgentSpawnConfig) -> Result<AgentHandle, SpawnerError> {
+        let agent_id = Uuid::new_v4().to_string();
+
+        // Routing list: the requested provider first, then the remaining fallbacks.
+        let providers: Vec<Provider> = std::iter::once(&agent_config.provider)
+            .chain(
+                Provider::fallback_order()
+                    .iter()
+                    .filter(|p| **p != agent_config.provider),
+            )
+            .cloned()
+            .collect();
+
+        let mut last_error: Option<SpawnerError> = None;
+
+        for provider in &providers {
+            info!(
+                agent_id = %agent_id,
+                provider = ?provider,
+                model = %agent_config.model,
+                "attempting spawn"
+            );
+
+            // A provider without a key cannot be used; remember why and move on.
+            let Some(api_key) = provider.api_key(&self.config) else {
+                warn!(
+                    agent_id = %agent_id,
+                    provider = ?provider,
+                    "no API key configured, skipping"
+                );
+                last_error = Some(SpawnerError::NoApiKey {
+                    provider: format!("{provider:?}"),
+                });
+                continue;
+            };
+
+            // The handle must carry the ID that is logged and exported to the child as
+            // COLIBRI_AGENT_ID; `AgentHandle::new` would otherwise mint an unrelated one.
+            let mut handle = AgentHandle::new(agent_config.clone());
+            handle.id = agent_id.clone();
+
+            // Build environment for this provider
+            let mut env_map = agent_config.env.clone();
+            env_map.insert(
+                format!("{}_API_KEY", provider_name_upper(provider)),
+                api_key.to_string(),
+            );
+            if let Some(endpoint) = provider.endpoint(&self.config) {
+                env_map.insert(
+                    format!("{}_ENDPOINT", provider_name_upper(provider)),
+                    endpoint.to_string(),
+                );
+            }
+            env_map.insert("COLIBRI_PROVIDER".to_string(), format!("{provider:?}").to_lowercase());
+            env_map.insert("COLIBRI_MODEL".to_string(), agent_config.model.clone());
+            env_map.insert("COLIBRI_AGENT_ID".to_string(), agent_id.clone());
+            if let Some(session_id) = &agent_config.session_id {
+                env_map.insert("COLIBRI_SESSION_ID".to_string(), session_id.clone());
+            }
+            if let Some(system_prompt) = &agent_config.system_prompt {
+                env_map.insert("COLIBRI_SYSTEM_PROMPT".to_string(), system_prompt.clone());
+            }
+
+            // Spawn with retry/backoff
+            let spawn_result = {
+                // One launch attempt; borrows the config and environment built above.
+                let op = || async {
+                    let mut cmd = Command::new(&agent_config.binary);
+                    cmd.args(&agent_config.args)
+                        .envs(&env_map)
+                        .stdin(Stdio::null())
+                        .stdout(Stdio::piped())
+                        .stderr(Stdio::piped());
+
+                    if let Some(dir) = &agent_config.working_dir {
+                        cmd.current_dir(dir);
+                    }
+
+                    let mut child = cmd.spawn()?;
+
+                    // Brief grace period for the process to start
+                    tokio::time::sleep(Duration::from_millis(500)).await;
+
+                    // Check it hasn't immediately crashed
+                    match child.try_wait() {
+                        Ok(Some(status)) => {
+                            // Surface whatever the process wrote to stderr before dying.
+                            let stderr = match child.stderr.take() {
+                                Some(mut pipe) => {
+                                    use tokio::io::AsyncReadExt;
+                                    let mut buf = Vec::new();
+                                    let _ = pipe.read_to_end(&mut buf).await;
+                                    String::from_utf8_lossy(&buf).into_owned()
+                                }
+                                None => String::new(),
+                            };
+                            Err(SpawnerError::Io(std::io::Error::other(format!(
+                                "process exited immediately with status {status}: {stderr}"
+                            ))))
+                        }
+                        Ok(None) => Ok(child),
+                        Err(e) => {
+                            // State unknown: best-effort kill so a retry does not leave
+                            // a stray process running.
+                            let _ = child.start_kill();
+                            Err(SpawnerError::Io(e))
+                        }
+                    }
+                };
+
+                let backoff = ExponentialBuilder::default()
+                    .with_min_delay(Duration::from_millis(500))
+                    .with_max_delay(Duration::from_secs(10))
+                    .with_max_times(agent_config.max_retries as usize);
+
+                op.retry(backoff).await
+            };
+
+            match spawn_result {
+                Ok(child) => {
+                    *handle.child.lock().await = Some(child);
+                    *handle.status.write().await = AgentStatus::Running;
+                    info!(agent_id = %agent_id, provider = ?provider, "agent spawned successfully");
+                    return Ok(handle);
+                }
+                Err(e) => {
+                    error!(
+                        agent_id = %agent_id,
+                        provider = ?provider,
+                        error = %e,
+                        "spawn failed for provider"
+                    );
+                    last_error = Some(e);
+                }
+            }
+        }
+
+        Err(last_error.unwrap_or_else(|| SpawnerError::AllProvidersExhausted { agent_id }))
+    }
+}
+
+/// Upper-case provider name used as the prefix of the `*_API_KEY` / `*_ENDPOINT`
+/// environment variables handed to the subprocess.
+fn provider_name_upper(provider: &Provider) -> &'static str {
+    match provider {
+        Provider::DeepSeek => "DEEPSEEK",
+        Provider::OpenRouter => "OPENROUTER",
+        Provider::Anthropic => "ANTHROPIC",
+    }
+}