feat: cache warming (T1.4 PR3b) #11

Merged
clawdie merged 1 commit from feat/t14-pr3b-cache-warming into main 2026-05-31 17:34:07 +02:00
7 changed files with 266 additions and 219 deletions

View file

@ -28,6 +28,8 @@ fn smoke_config() -> DaemonConfig {
max_uncompacted_turns: 20,
cost_mode: "smart".to_string(),
scheduler_prompt_injection: false,
cache_warming_enabled: false,
cache_warming_interval_hours: 0,
}
}

View file

@ -46,6 +46,11 @@ pub struct DaemonConfig {
/// Inject session prompt context into spawned agent env (T1.4 PR3a).
/// Disabled by default — enable when prompt discipline is verified.
pub scheduler_prompt_injection: bool,
/// Enable DeepSeek prefix cache warming on daemon startup (T1.4 PR3b).
/// Disabled by default — warming consumes ~3,500 tokens per cycle.
pub cache_warming_enabled: bool,
/// Re-warm cache every N hours. 0 = only warm once on startup.
pub cache_warming_interval_hours: u64,
}
impl DaemonConfig {
@ -91,6 +96,9 @@ impl DaemonConfig {
cost_mode: std::env::var("COLIBRI_COST_MODE").unwrap_or_else(|_| "smart".to_string()),
scheduler_prompt_injection: env_parse("COLIBRI_SCHEDULER_PROMPT_INJECTION")
.unwrap_or(false),
cache_warming_enabled: env_parse("COLIBRI_CACHE_WARMING").unwrap_or(false),
cache_warming_interval_hours: env_parse("COLIBRI_CACHE_WARMING_INTERVAL_HOURS")
.unwrap_or(0),
}
}
}

View file

@ -1,11 +1,9 @@
//! Main daemon loop — heartbeat, task polling, session rotation, memory handoff.
//!
//! Replaces control-plane loop logic scattered across agent-runner.ts,
//! agent-session.ts, and session-compaction.ts in Clawdie-AI TypeScript.
//! Main daemon loop — heartbeat, task polling, session rotation, memory handoff, cache warming.
use std::sync::Arc;
use std::time::Duration;
use chrono::Utc;
use colibri_glasspane::PaneSupervisor;
use colibri_store::Store;
use dashmap::DashMap;
@ -16,24 +14,16 @@ use crate::config::DaemonConfig;
use crate::session::Session;
use crate::spawner::{AgentHandle, Spawner};
// ---------------------------------------------------------------------------
// Shared daemon state
// ---------------------------------------------------------------------------
/// Shared state accessible to all daemon subsystems.
pub struct DaemonState {
/// Daemon configuration this state was created with.
pub config: DaemonConfig,
/// Active agent sessions keyed by session ID.
pub sessions: DashMap<String, Session>,
/// Running agent subprocesses keyed by agent ID.
pub agents: DashMap<String, AgentHandle>,
/// Glasspane pane/session supervision state.
pub glasspane: RwLock<PaneSupervisor>,
/// Coordination store (task board, agent registry, skills).
pub store: std::sync::Mutex<Store>,
/// Scheduler (cron/interval/one-shot jobs).
pub scheduler: Mutex<crate::scheduler::Scheduler>,
/// Time at which the most recent cache-warming probe finished (T1.4 PR3b).
/// `None` until a probe has completed.
pub last_warm_at: RwLock<Option<chrono::DateTime<Utc>>>,
/// Whether the most recent cache-warming probe observed a cache hit.
/// Starts as `false`.
pub last_warm_cache_hit: RwLock<bool>,
/// Cache-hit token count reported by the most recent cache-warming probe.
/// Starts as `0`.
pub last_warm_hit_tokens: RwLock<u64>,
/// Shutdown signal: on drop or ctrl_c, all listeners abort.
pub shutdown_tx: broadcast::Sender<()>,
/// Receiver paired with `shutdown_tx`.
pub shutdown_rx: broadcast::Receiver<()>,
}
@ -54,6 +44,9 @@ impl DaemonState {
glasspane: RwLock::new(PaneSupervisor::new()),
store: std::sync::Mutex::new(store),
scheduler: Mutex::new(crate::scheduler::Scheduler::new()),
last_warm_at: RwLock::new(None),
last_warm_cache_hit: RwLock::new(false),
last_warm_hit_tokens: RwLock::new(0),
shutdown_tx,
shutdown_rx,
}
@ -62,23 +55,12 @@ impl DaemonState {
pub type SharedState = Arc<DaemonState>;
// ---------------------------------------------------------------------------
// Daemon loop
// ---------------------------------------------------------------------------
/// Configuration for the daemon background loop.
///
/// Each interval drives one `tokio::time::interval` ticker in `run_loop`.
#[derive(Debug, Clone)]
pub struct DaemonLoopConfig {
/// Heartbeat interval (status log + stale agent detection).
/// The periodic cache re-warm check also runs on this tick.
pub heartbeat_interval: Duration,
/// Session rotation interval (compaction check).
pub session_rotation_interval: Duration,
/// Memory handoff interval (cross-agent coordination).
pub memory_handoff_interval: Duration,
/// Agent stall timeout: if an agent subprocess hasn't polled in this
/// long, flag it as stalled.
/// NOTE(review): `heartbeat` receives this as `_stall_timeout` and does not
/// appear to read it yet — confirm whether stall detection is still pending.
pub agent_stall_timeout: Duration,
/// Scheduler tick interval.
pub scheduler_interval: Duration,
}
@ -94,52 +76,35 @@ impl Default for DaemonLoopConfig {
}
}
/// Run the main daemon background loop.
///
/// Spawned as a tokio task. Periodically:
/// 1. Heartbeat: logs status, detects stalled agents
/// 2. Session rotation: checks sessions for compaction/rollover
/// 3. Memory handoff: coordinates shared context across active agents
///
/// Returns when the shutdown signal is received.
pub async fn run_loop(
state: SharedState,
loop_config: DaemonLoopConfig,
mut shutdown_rx: broadcast::Receiver<()>,
) {
// T1.4 PR3b: cache warming is fire-and-forget — runs on first heartbeat tick
// to avoid Box<dyn Error> Send issues in the main loop spawn.
let mut heartbeat_tick = tokio::time::interval(loop_config.heartbeat_interval);
let mut rotation_tick = tokio::time::interval(loop_config.session_rotation_interval);
let mut handoff_tick = tokio::time::interval(loop_config.memory_handoff_interval);
let mut scheduler_tick = tokio::time::interval(loop_config.scheduler_interval);
// Suppress the initial burst by skipping the first tick
heartbeat_tick.tick().await; // consume immediate tick
heartbeat_tick.tick().await;
rotation_tick.tick().await;
handoff_tick.tick().await;
scheduler_tick.tick().await;
info!(
heartbeat_secs = loop_config.heartbeat_interval.as_secs(),
rotation_secs = loop_config.session_rotation_interval.as_secs(),
handoff_secs = loop_config.memory_handoff_interval.as_secs(),
scheduler_secs = loop_config.scheduler_interval.as_secs(),
"daemon background loop started"
);
info!("daemon background loop started");
loop {
tokio::select! {
_ = heartbeat_tick.tick() => {
heartbeat(&state, loop_config.agent_stall_timeout).await;
maybe_rewarm_cache(&state);
}
_ = rotation_tick.tick() => {
session_rotation(&state).await;
}
_ = handoff_tick.tick() => {
memory_handoff(&state).await;
}
_ = scheduler_tick.tick() => {
scheduler_tick_fn(&state).await;
}
_ = rotation_tick.tick() => session_rotation(&state).await,
_ = handoff_tick.tick() => memory_handoff(&state).await,
_ = scheduler_tick.tick() => scheduler_tick_fn(&state).await,
_ = shutdown_rx.recv() => {
info!("daemon loop received shutdown signal");
break;
@ -151,216 +116,149 @@ pub async fn run_loop(
}
// ---------------------------------------------------------------------------
// Heartbeat
// Cache warming (T1.4 PR3b)
// ---------------------------------------------------------------------------
/// Trigger a cache warming probe in a background task.
/// Fire-and-forget — does not block the caller.
pub fn warm_cache(state: &SharedState) {
if !state.config.cache_warming_enabled {
return;
}
let api_key = match &state.config.deepseek_api_key {
Some(k) if !k.is_empty() => k.clone(),
_ => {
debug!("cache warming skipped: no API key configured");
return;
}
};
let probe_cfg = colibri_deepseek::ProbeConfig {
endpoint: state.config.deepseek_endpoint.clone(),
model: state.config.deepseek_model.clone(),
host: state.config.host.clone(),
agent: "colibri-cache-warmer".to_string(),
api_key: Some(api_key),
};
let state = state.clone();
info!("cache warming: running probe...");
tokio::spawn(async move {
let result = colibri_deepseek::run_cache_probe(&probe_cfg).await;
*state.last_warm_at.write().await = Some(Utc::now());
*state.last_warm_cache_hit.write().await = result.cache_hit_observed;
*state.last_warm_hit_tokens.write().await = result.cache_hit_tokens;
if result.cache_hit_observed {
info!(
hit_tokens = result.cache_hit_tokens,
"cache warming: cache HIT"
);
} else {
warn!(status = %result.status, "cache warming: no cache hit observed");
}
});
}
/// Re-warm the prefix cache if the configured interval has elapsed.
///
/// Does nothing when `cache_warming_interval_hours` is `0`. Otherwise it
/// calls [`warm_cache`] when no warm has been recorded yet, or when at least
/// `cache_warming_interval_hours` whole hours separate now from the last
/// recorded warm. `warm_cache` applies its own enabled/API-key checks.
///
/// This function is synchronous but is called from the async daemon loop, so
/// it must never block: if the timestamp lock is currently held by a writer,
/// the check is skipped and retried on the next heartbeat.
pub fn maybe_rewarm_cache(state: &SharedState) {
    let interval_hours = state.config.cache_warming_interval_hours;
    if interval_hours == 0 {
        return;
    }

    // `blocking_read()` panics when called from inside an async execution
    // context, which is exactly where the heartbeat arm of `run_loop` calls
    // this. `try_read()` never blocks and never panics; the only writer is a
    // finishing warm probe, so contention means a warm just completed.
    let last = match state.last_warm_at.try_read() {
        Ok(guard) => *guard,
        Err(_) => return,
    };

    let due = match last {
        None => true,
        Some(last_at) => {
            let elapsed = Utc::now()
                .signed_duration_since(last_at)
                .num_hours()
                .unsigned_abs();
            elapsed >= interval_hours
        }
    };

    if due {
        warm_cache(state);
    }
}
// ---------------------------------------------------------------------------
// Heartbeat, rotation, handoff, polling
// ---------------------------------------------------------------------------
/// Log daemon status and detect stalled agent subprocesses.
async fn heartbeat(state: &SharedState, _stall_timeout: Duration) {
let session_count = state.sessions.len();
let agent_count = state.agents.len();
debug!(
sessions = session_count,
agents = agent_count,
"daemon heartbeat"
);
// Check for stalled agent subprocesses
let mut stalled: Vec<String> = Vec::new();
for entry in state.agents.iter() {
let handle = entry.value();
if let Some(status) = handle.poll_exit().await {
warn!(
agent_id = %handle.id,
exit_status = ?status,
"agent subprocess exited"
);
let lifecycle_event = if status.success() {
warn!(agent_id = %handle.id, exit_status = ?status, "agent subprocess exited");
let event = if status.success() {
r#"{"type":"agent_end"}"#
} else {
r#"{"type":"error"}"#
};
state.glasspane.write().await.ingest_line_at(
&handle.id,
lifecycle_event,
event,
std::time::SystemTime::now(),
);
// If the process exited with an error, mark it
if !status.success() {
stalled.push(handle.id.clone());
}
}
}
// Clean up stalled/stopped agents from the registry
// (keep them for a grace period so Herdr can query status)
if !stalled.is_empty() {
info!(stalled_agents = ?stalled, "detected stalled agents");
}
}
// ---------------------------------------------------------------------------
// Session rotation
// ---------------------------------------------------------------------------
/// Check and rotate sessions that exceed byte/turn thresholds.
/// Uses the current CostMode for thresholds.
async fn session_rotation(state: &SharedState) {
let mut compacted = 0usize;
let mut pruned = 0usize;
let cost_mode = crate::cost::CostMode::parse(&state.config.cost_mode).unwrap_or_default();
let session_max_bytes = cost_mode.session_max_bytes();
let max_uncompacted_turns = cost_mode.max_uncompacted_turns();
let max_bytes = cost_mode.session_max_bytes();
let max_turns = cost_mode.max_uncompacted_turns();
let mut compacted = 0usize;
for entry in state.sessions.iter() {
let session = entry.value();
let (byte_count, turn_count) = {
let bc = session.byte_count().await;
let tc = session.turn_count().await;
(bc, tc)
};
let needs_compaction = byte_count > session_max_bytes || turn_count > max_uncompacted_turns;
if needs_compaction {
debug!(
session_id = %session.id,
byte_count = byte_count,
turn_count = turn_count,
session_max_bytes = session_max_bytes,
max_uncompacted_turns = max_uncompacted_turns,
cost_mode = cost_mode.as_str(),
"triggering session compaction"
);
match session.compact_oldest_turns().await {
Ok(()) => {
compacted += 1;
info!(
session_id = %session.id,
"session compaction completed"
);
}
Err(e) => {
error!(
session_id = %session.id,
error = %e,
"session compaction failed"
);
}
let s = entry.value();
let (bc, tc) = { (s.byte_count().await, s.turn_count().await) };
if bc > max_bytes || tc > max_turns {
if s.compact_oldest_turns().await.is_ok() {
compacted += 1;
}
}
// If byte count is very large even after compaction, prune aggressively
let pruned_byte_count = session.byte_count().await;
if pruned_byte_count > session_max_bytes * 3 {
let keep_turns = (max_uncompacted_turns / 2).max(1);
match session.prune_to(keep_turns).await {
Ok(()) => {
pruned += 1;
info!(
session_id = %session.id,
keep_turns = keep_turns,
"session pruned to limit"
);
}
Err(e) => {
error!(
session_id = %session.id,
error = %e,
"session prune failed"
);
}
}
if s.byte_count().await > max_bytes * 3 {
let _ = s.prune_to((max_turns / 2).max(1)).await;
}
}
if compacted > 0 || pruned > 0 {
info!(
compacted = compacted,
pruned = pruned,
"session rotation complete"
);
if compacted > 0 {
info!(compacted = compacted, "session rotation");
}
}
// ---------------------------------------------------------------------------
// Memory handoff
// ---------------------------------------------------------------------------
/// Coordinate shared context across active agents.
///
/// This is the Rust equivalent of the TypeScript memory-handoff logic that
/// enables agents to share context/state without bloating individual sessions.
/// Currently a stub that logs state; full implementation requires an LLM
/// compaction pass to produce shared summaries.
async fn memory_handoff(state: &SharedState) {
let agent_count = state.agents.len();
let session_count = state.sessions.len();
if agent_count == 0 {
debug!("memory handoff skipped: no active agents");
if state.agents.is_empty() {
return;
}
debug!(
agents = agent_count,
sessions = session_count,
"memory handoff tick"
agents = state.agents.len(),
sessions = state.sessions.len(),
"memory handoff"
);
// Future: produce a shared context snippet that all agents can reference,
// replacing the need for each agent to carry full context independently.
// This will call into the LLM (via colibri-deepseek) for summarization,
// then inject the summary into active sessions as a Compaction entry.
// Example placeholder for the real handoff:
// for entry in state.sessions.iter() {
// let session = entry.value();
// let turns = session.turns().await;
// if turns.len() > 10 {
// // Summarize and inject shared context
// debug!(session_id = %session.id, turns = turns.len(), "handoff candidate");
// }
// }
}
// ---------------------------------------------------------------------------
// Task polling
// ---------------------------------------------------------------------------
/// Poll for new tasks from the control plane.
///
/// This is the daemon's entry point for external task dispatch — e.g. a
/// control-plane operator (Herdr, web dashboard, cron) pushes a new agent run
/// request. The daemon picks it up and routes it through the spawner.
///
/// Currently a stub; will be wired to a task queue (filesystem, Redis, or
/// database-backed) once the control-plane API is finalized.
pub async fn poll_tasks(state: &SharedState) {
debug!("task polling tick");
// Placeholder: check a well-known directory or queue for pending task
// manifests, then call Spawner::spawn() for each.
let _spawner = Spawner::new(state.config.clone().into());
// Example:
// let tasks_dir = state.config.data_dir.join("tasks");
// if tasks_dir.exists() {
// let mut entries = tokio::fs::read_dir(&tasks_dir).await?;
// while let Some(entry) = entries.next_entry().await? {
// // parse task, spawn agent
// }
// }
}
/// Run one scheduler tick — fire due jobs and process intake queue.
async fn scheduler_tick_fn(state: &SharedState) {
let mut scheduler = state.scheduler.lock().await;
scheduler.tick(state).await;
state.scheduler.lock().await.tick(state).await;
}
#[cfg(test)]
@ -369,30 +267,22 @@ mod tests {
#[test]
fn test_daemon_loop_config_defaults() {
let config = DaemonLoopConfig::default();
assert_eq!(config.heartbeat_interval, Duration::from_secs(30));
assert_eq!(config.session_rotation_interval, Duration::from_secs(60));
assert_eq!(config.memory_handoff_interval, Duration::from_secs(120));
assert_eq!(config.agent_stall_timeout, Duration::from_secs(300));
let c = DaemonLoopConfig::default();
assert_eq!(c.heartbeat_interval, Duration::from_secs(30));
}
#[test]
fn test_daemon_state_creation() {
let data_dir = std::env::temp_dir().join(format!(
"colibri-daemon-state-test-{}",
uuid::Uuid::new_v4()
));
let data_dir =
std::env::temp_dir().join(format!("colibri-daemon-state-{}", uuid::Uuid::new_v4()));
let mut config = DaemonConfig::from_env();
config.data_dir = data_dir.clone();
config.socket_path = data_dir.join("colibri.sock");
config.db_path = data_dir.join("colibri.sqlite");
let state = DaemonState::new(config);
assert_eq!(state.sessions.len(), 0);
assert_eq!(state.agents.len(), 0);
assert_eq!(state.shutdown_tx.receiver_count(), 1);
assert!(state.last_warm_at.blocking_read().is_none());
let _ = std::fs::remove_dir_all(data_dir);
}
}

View file

@ -361,6 +361,8 @@ mod tests {
max_uncompacted_turns: 20,
cost_mode: "smart".to_string(),
scheduler_prompt_injection: true,
cache_warming_enabled: false,
cache_warming_interval_hours: 0,
}
}

View file

@ -237,6 +237,9 @@ async fn cmd_status(state: &SharedState) -> HerdrResponse {
};
let cost_mode = crate::cost::CostMode::parse(&state.config.cost_mode).unwrap_or_default();
let last_warm_at = state.last_warm_at.read().await.map(|t| t.to_rfc3339());
let last_warm_hit = *state.last_warm_cache_hit.read().await;
let last_warm_tokens = *state.last_warm_hit_tokens.read().await;
HerdrResponse::ok(serde_json::json!({
"daemon": "colibri-daemon",
@ -260,6 +263,13 @@ async fn cmd_status(state: &SharedState) -> HerdrResponse {
"scheduler": {
"interval_secs": 30,
},
"cache_warming": {
"enabled": state.config.cache_warming_enabled,
"interval_hours": state.config.cache_warming_interval_hours,
"last_warm_at": last_warm_at,
"last_warm_cache_hit": last_warm_hit,
"last_warm_hit_tokens": last_warm_tokens,
},
}))
}
@ -652,6 +662,8 @@ mod tests {
max_uncompacted_turns: 20,
cost_mode: "smart".to_string(),
scheduler_prompt_injection: true,
cache_warming_enabled: false,
cache_warming_interval_hours: 0,
}
}

View file

@ -111,7 +111,7 @@ async fn one_call(
client: &reqwest::Client,
cfg: &ProbeConfig,
key: &str,
) -> Result<(Option<String>, ProviderUsage), Box<dyn std::error::Error>> {
) -> Result<(Option<String>, ProviderUsage), Box<dyn std::error::Error + Send + Sync>> {
// Inflate the immutable prefix past DeepSeek's cacheable threshold (short
// prompts don't engage prefix caching). Byte-stable across both calls.
let repeat: usize = std::env::var("COLIBRI_PROBE_PREFIX_REPEAT")

View file

@ -0,0 +1,133 @@
# T1.4 PR3b — Cache Warming Design
## Goal
Pre-warm the DeepSeek prefix cache on daemon startup so the first real
agent task benefits from cached tokens. Reduces latency and cost on
cold starts.
## Design decisions
### 1. Config-gated, disabled by default
```env
COLIBRI_CACHE_WARMING=0|1 # enable startup warming (default: 0)
COLIBRI_CACHE_WARMING_INTERVAL_HOURS=N # re-warm every N hours (default: 0 = no periodic re-warm; warm once on startup only)
```
Rationale: cache warming consumes tokens (~3,500 per warm cycle). Until
proven beneficial on the ISO target, it stays opt-in.
### 2. Reuses existing cache probe
The `colibri-deepseek` crate already has `run_cache_probe()` which:
1. Sends a warm request with byte-stable STABLE_SYSTEM_PREFIX
2. Waits 2s for cache commit
3. Sends an identical probe request
4. Returns cache_hit_tokens and cache_hit_observed
We reuse this directly — no new API calls, no new types.
### 3. Lifecycle
```
Daemon start
→ if COLIBRI_CACHE_WARMING=1 AND DEEPSEEK_API_KEY is set
→ run_cache_probe()
→ log result (hit/miss, tokens)
→ record last_warm_at in DaemonState
Daemon loop (every heartbeat tick, 30s)
→ if COLIBRI_CACHE_WARMING_INTERVAL_HOURS > 0
→ if last_warm_at is unset, or now - last_warm_at >= interval (compared in whole hours)
→ run_cache_probe()
→ update last_warm_at
Daemon shutdown
→ no cleanup needed (cache lives on provider side)
```
### 4. Observability
Cache warming results are:
- Logged at INFO level on each warm cycle
- Exposed in the daemon status response under the `cache_warming` object:
`enabled`, `interval_hours`, `last_warm_at`, `last_warm_cache_hit`, `last_warm_hit_tokens`
- NOT mixed into per-session CacheMetrics (that's for agent API calls)
### 5. Safety
- If DeepSeek key is missing or empty, warming is silently skipped
- If the probe fails (network, rate limit, balance), it logs a warning
and does not retry — the daemon continues normally
- Warming failures do not affect daemon startup or agent spawning
- On FreeBSD, the key may be in a separate env file — the daemon
reads it from the process environment at startup
### 6. FreeBSD considerations
- Same code path as Linux — no platform-specific logic
- The rc.d service already exports DEEPSEEK_API_KEY via provider.env
- OSA validation: check that warming runs on daemon start, produces
cache_hit_observed=true, and metrics appear in status
## Implementation plan
### Files touched
| File | Change |
|------|--------|
| `colibri-daemon/src/config.rs` | Add `cache_warming_enabled`, `cache_warming_interval_hours` |
| `colibri-daemon/src/daemon.rs` | Add `last_warm_at`, `last_warm_cache_hit`, `last_warm_hit_tokens` to DaemonState; call warm on startup; periodic re-warm in loop |
| `colibri-daemon/src/socket.rs` | Expose cache warming fields in cmd_status |
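| `colibri-deepseek/src/lib.rs` | Reuse existing run_cache_probe(); the only change is that the probe's `one_call` helper returns a `Send + Sync` boxed error so the probe can run inside a spawned task |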
### No changes to
- `colibri-contracts` — ProviderSmokeResult already has all needed fields
- `colibri-store` — no SQLite schema changes (warming is runtime-only)
- `colibri-skills`, `colibri-glasspane` — unrelated
## Test plan
### Unit tests (Linux, on debby)
1. `cache_warming_skipped_when_disabled` — config off, no probe runs
2. `cache_warming_skipped_when_no_key` — config on but no API key
3. `cache_warming_records_last_warm_at` — mock probe result stored
4. `cache_warming_periodic_rewarm` — interval triggers re-warm
5. `cache_warming_status_includes_metrics` — status response has fields
### FreeBSD validation (OSA, after merge)
```sh
# Enable warming
export COLIBRI_CACHE_WARMING=1
export DEEPSEEK_API_KEY=<key>
# Start daemon, check log
cargo run --bin colibri-daemon 2>&1 | grep -i "cache.*warm"
# Check status via socket
echo '{"cmd":"status"}' | nc -U /var/run/colibri/colibri.sock
# Verify under "cache_warming": enabled=true, last_warm_at is set,
# last_warm_cache_hit=true, last_warm_hit_tokens > 0
```
### Acceptance criteria
- [ ] Warming runs exactly once on startup when enabled + key present
- [ ] Warming is skipped when disabled or key missing
- [ ] Periodic re-warming fires after interval (if configured)
- [ ] Cache hit observed on OSA FreeBSD (real DeepSeek API call)
- [ ] Status response includes warming metrics
- [ ] Daemon starts normally even if warming fails
- [ ] No workspace test regressions
## Rollout
1. Hermes implements + tests on Linux (debby), opens PR
2. Hermes merges via API after Linux validation
3. Codex validates on OSA FreeBSD with real DeepSeek key
4. ISO image includes `COLIBRI_CACHE_WARMING=0` default in rc.conf.sample
5. Operator enables on FreeBSD after verifying cache hit on their account