feat: cache warming (T1.4 PR3b) #11
7 changed files with 266 additions and 219 deletions
|
|
@ -28,6 +28,8 @@ fn smoke_config() -> DaemonConfig {
|
|||
max_uncompacted_turns: 20,
|
||||
cost_mode: "smart".to_string(),
|
||||
scheduler_prompt_injection: false,
|
||||
cache_warming_enabled: false,
|
||||
cache_warming_interval_hours: 0,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -46,6 +46,11 @@ pub struct DaemonConfig {
|
|||
/// Inject session prompt context into spawned agent env (T1.4 PR3a).
|
||||
/// Disabled by default — enable when prompt discipline is verified.
|
||||
pub scheduler_prompt_injection: bool,
|
||||
/// Enable DeepSeek prefix cache warming on daemon startup (T1.4 PR3b).
|
||||
/// Disabled by default — warming consumes ~3,500 tokens per cycle.
|
||||
pub cache_warming_enabled: bool,
|
||||
/// Re-warm cache every N hours. 0 = only warm once on startup.
|
||||
pub cache_warming_interval_hours: u64,
|
||||
}
|
||||
|
||||
impl DaemonConfig {
|
||||
|
|
@ -91,6 +96,9 @@ impl DaemonConfig {
|
|||
cost_mode: std::env::var("COLIBRI_COST_MODE").unwrap_or_else(|_| "smart".to_string()),
|
||||
scheduler_prompt_injection: env_parse("COLIBRI_SCHEDULER_PROMPT_INJECTION")
|
||||
.unwrap_or(false),
|
||||
cache_warming_enabled: env_parse("COLIBRI_CACHE_WARMING").unwrap_or(false),
|
||||
cache_warming_interval_hours: env_parse("COLIBRI_CACHE_WARMING_INTERVAL_HOURS")
|
||||
.unwrap_or(0),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,11 +1,9 @@
|
|||
//! Main daemon loop — heartbeat, task polling, session rotation, memory handoff.
|
||||
//!
|
||||
//! Replaces control-plane loop logic scattered across agent-runner.ts,
|
||||
//! agent-session.ts, and session-compaction.ts in Clawdie-AI TypeScript.
|
||||
//! Main daemon loop — heartbeat, task polling, session rotation, memory handoff, cache warming.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use chrono::Utc;
|
||||
use colibri_glasspane::PaneSupervisor;
|
||||
use colibri_store::Store;
|
||||
use dashmap::DashMap;
|
||||
|
|
@ -16,24 +14,16 @@ use crate::config::DaemonConfig;
|
|||
use crate::session::Session;
|
||||
use crate::spawner::{AgentHandle, Spawner};
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Shared daemon state
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Shared state accessible to all daemon subsystems.
|
||||
pub struct DaemonState {
|
||||
pub config: DaemonConfig,
|
||||
/// Active agent sessions keyed by session ID.
|
||||
pub sessions: DashMap<String, Session>,
|
||||
/// Running agent subprocesses keyed by agent ID.
|
||||
pub agents: DashMap<String, AgentHandle>,
|
||||
/// Glasspane pane/session supervision state.
|
||||
pub glasspane: RwLock<PaneSupervisor>,
|
||||
/// Coordination store (task board, agent registry, skills).
|
||||
pub store: std::sync::Mutex<Store>,
|
||||
/// Scheduler (cron/interval/one-shot jobs).
|
||||
pub scheduler: Mutex<crate::scheduler::Scheduler>,
|
||||
/// Shutdown signal: on drop or ctrl_c, all listeners abort.
|
||||
pub last_warm_at: RwLock<Option<chrono::DateTime<Utc>>>,
|
||||
pub last_warm_cache_hit: RwLock<bool>,
|
||||
pub last_warm_hit_tokens: RwLock<u64>,
|
||||
pub shutdown_tx: broadcast::Sender<()>,
|
||||
pub shutdown_rx: broadcast::Receiver<()>,
|
||||
}
|
||||
|
|
@ -54,6 +44,9 @@ impl DaemonState {
|
|||
glasspane: RwLock::new(PaneSupervisor::new()),
|
||||
store: std::sync::Mutex::new(store),
|
||||
scheduler: Mutex::new(crate::scheduler::Scheduler::new()),
|
||||
last_warm_at: RwLock::new(None),
|
||||
last_warm_cache_hit: RwLock::new(false),
|
||||
last_warm_hit_tokens: RwLock::new(0),
|
||||
shutdown_tx,
|
||||
shutdown_rx,
|
||||
}
|
||||
|
|
@ -62,23 +55,12 @@ impl DaemonState {
|
|||
|
||||
pub type SharedState = Arc<DaemonState>;
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Daemon loop
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Configuration for the daemon background loop.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DaemonLoopConfig {
|
||||
/// Heartbeat interval (status log + stale agent detection).
|
||||
pub heartbeat_interval: Duration,
|
||||
/// Session rotation interval (compaction check).
|
||||
pub session_rotation_interval: Duration,
|
||||
/// Memory handoff interval (cross-agent coordination).
|
||||
pub memory_handoff_interval: Duration,
|
||||
/// Agent stall timeout: if an agent subprocess hasn't polled in this
|
||||
/// long, flag it as stalled.
|
||||
pub agent_stall_timeout: Duration,
|
||||
/// Scheduler tick interval.
|
||||
pub scheduler_interval: Duration,
|
||||
}
|
||||
|
||||
|
|
@ -94,52 +76,35 @@ impl Default for DaemonLoopConfig {
|
|||
}
|
||||
}
|
||||
|
||||
/// Run the main daemon background loop.
|
||||
///
|
||||
/// Spawned as a tokio task. Periodically:
|
||||
/// 1. Heartbeat: logs status, detects stalled agents
|
||||
/// 2. Session rotation: checks sessions for compaction/rollover
|
||||
/// 3. Memory handoff: coordinates shared context across active agents
|
||||
///
|
||||
/// Returns when the shutdown signal is received.
|
||||
pub async fn run_loop(
|
||||
state: SharedState,
|
||||
loop_config: DaemonLoopConfig,
|
||||
mut shutdown_rx: broadcast::Receiver<()>,
|
||||
) {
|
||||
// T1.4 PR3b: cache warming is fire-and-forget — runs on first heartbeat tick
|
||||
// to avoid Box<dyn Error> Send issues in the main loop spawn.
|
||||
|
||||
let mut heartbeat_tick = tokio::time::interval(loop_config.heartbeat_interval);
|
||||
let mut rotation_tick = tokio::time::interval(loop_config.session_rotation_interval);
|
||||
let mut handoff_tick = tokio::time::interval(loop_config.memory_handoff_interval);
|
||||
let mut scheduler_tick = tokio::time::interval(loop_config.scheduler_interval);
|
||||
|
||||
// Suppress the initial burst by skipping the first tick
|
||||
heartbeat_tick.tick().await; // consume immediate tick
|
||||
heartbeat_tick.tick().await;
|
||||
rotation_tick.tick().await;
|
||||
handoff_tick.tick().await;
|
||||
scheduler_tick.tick().await;
|
||||
|
||||
info!(
|
||||
heartbeat_secs = loop_config.heartbeat_interval.as_secs(),
|
||||
rotation_secs = loop_config.session_rotation_interval.as_secs(),
|
||||
handoff_secs = loop_config.memory_handoff_interval.as_secs(),
|
||||
scheduler_secs = loop_config.scheduler_interval.as_secs(),
|
||||
"daemon background loop started"
|
||||
);
|
||||
info!("daemon background loop started");
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = heartbeat_tick.tick() => {
|
||||
heartbeat(&state, loop_config.agent_stall_timeout).await;
|
||||
maybe_rewarm_cache(&state);
|
||||
}
|
||||
_ = rotation_tick.tick() => {
|
||||
session_rotation(&state).await;
|
||||
}
|
||||
_ = handoff_tick.tick() => {
|
||||
memory_handoff(&state).await;
|
||||
}
|
||||
_ = scheduler_tick.tick() => {
|
||||
scheduler_tick_fn(&state).await;
|
||||
}
|
||||
_ = rotation_tick.tick() => session_rotation(&state).await,
|
||||
_ = handoff_tick.tick() => memory_handoff(&state).await,
|
||||
_ = scheduler_tick.tick() => scheduler_tick_fn(&state).await,
|
||||
_ = shutdown_rx.recv() => {
|
||||
info!("daemon loop received shutdown signal");
|
||||
break;
|
||||
|
|
@ -151,216 +116,149 @@ pub async fn run_loop(
|
|||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Heartbeat
|
||||
// Cache warming (T1.4 PR3b)
|
||||
// ---------------------------------------------------------------------------
|
||||
/// Trigger a cache warming probe in a background task.
|
||||
/// Fire-and-forget — does not block the caller.
|
||||
pub fn warm_cache(state: &SharedState) {
|
||||
if !state.config.cache_warming_enabled {
|
||||
return;
|
||||
}
|
||||
|
||||
let api_key = match &state.config.deepseek_api_key {
|
||||
Some(k) if !k.is_empty() => k.clone(),
|
||||
_ => {
|
||||
debug!("cache warming skipped: no API key configured");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let probe_cfg = colibri_deepseek::ProbeConfig {
|
||||
endpoint: state.config.deepseek_endpoint.clone(),
|
||||
model: state.config.deepseek_model.clone(),
|
||||
host: state.config.host.clone(),
|
||||
agent: "colibri-cache-warmer".to_string(),
|
||||
api_key: Some(api_key),
|
||||
};
|
||||
|
||||
let state = state.clone();
|
||||
info!("cache warming: running probe...");
|
||||
tokio::spawn(async move {
|
||||
let result = colibri_deepseek::run_cache_probe(&probe_cfg).await;
|
||||
*state.last_warm_at.write().await = Some(Utc::now());
|
||||
*state.last_warm_cache_hit.write().await = result.cache_hit_observed;
|
||||
*state.last_warm_hit_tokens.write().await = result.cache_hit_tokens;
|
||||
if result.cache_hit_observed {
|
||||
info!(
|
||||
hit_tokens = result.cache_hit_tokens,
|
||||
"cache warming: cache HIT"
|
||||
);
|
||||
} else {
|
||||
warn!(status = %result.status, "cache warming: no cache hit observed");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// Re-warm if interval has elapsed since last warm.
|
||||
pub fn maybe_rewarm_cache(state: &SharedState) {
|
||||
let interval_hours = state.config.cache_warming_interval_hours;
|
||||
if interval_hours == 0 {
|
||||
return;
|
||||
}
|
||||
let last = *state.last_warm_at.blocking_read();
|
||||
match last {
|
||||
None => warm_cache(state),
|
||||
Some(last_at) => {
|
||||
let elapsed = Utc::now()
|
||||
.signed_duration_since(last_at)
|
||||
.num_hours()
|
||||
.unsigned_abs();
|
||||
if elapsed >= interval_hours {
|
||||
warm_cache(state);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Heartbeat, rotation, handoff, polling
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Log daemon status and detect stalled agent subprocesses.
|
||||
async fn heartbeat(state: &SharedState, _stall_timeout: Duration) {
|
||||
let session_count = state.sessions.len();
|
||||
let agent_count = state.agents.len();
|
||||
|
||||
debug!(
|
||||
sessions = session_count,
|
||||
agents = agent_count,
|
||||
"daemon heartbeat"
|
||||
);
|
||||
|
||||
// Check for stalled agent subprocesses
|
||||
let mut stalled: Vec<String> = Vec::new();
|
||||
for entry in state.agents.iter() {
|
||||
let handle = entry.value();
|
||||
if let Some(status) = handle.poll_exit().await {
|
||||
warn!(
|
||||
agent_id = %handle.id,
|
||||
exit_status = ?status,
|
||||
"agent subprocess exited"
|
||||
);
|
||||
let lifecycle_event = if status.success() {
|
||||
warn!(agent_id = %handle.id, exit_status = ?status, "agent subprocess exited");
|
||||
let event = if status.success() {
|
||||
r#"{"type":"agent_end"}"#
|
||||
} else {
|
||||
r#"{"type":"error"}"#
|
||||
};
|
||||
state.glasspane.write().await.ingest_line_at(
|
||||
&handle.id,
|
||||
lifecycle_event,
|
||||
event,
|
||||
std::time::SystemTime::now(),
|
||||
);
|
||||
|
||||
// If the process exited with an error, mark it
|
||||
if !status.success() {
|
||||
stalled.push(handle.id.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up stalled/stopped agents from the registry
|
||||
// (keep them for a grace period so Herdr can query status)
|
||||
if !stalled.is_empty() {
|
||||
info!(stalled_agents = ?stalled, "detected stalled agents");
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Session rotation
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Check and rotate sessions that exceed byte/turn thresholds.
|
||||
/// Uses the current CostMode for thresholds.
|
||||
async fn session_rotation(state: &SharedState) {
|
||||
let mut compacted = 0usize;
|
||||
let mut pruned = 0usize;
|
||||
|
||||
let cost_mode = crate::cost::CostMode::parse(&state.config.cost_mode).unwrap_or_default();
|
||||
let session_max_bytes = cost_mode.session_max_bytes();
|
||||
let max_uncompacted_turns = cost_mode.max_uncompacted_turns();
|
||||
let max_bytes = cost_mode.session_max_bytes();
|
||||
let max_turns = cost_mode.max_uncompacted_turns();
|
||||
let mut compacted = 0usize;
|
||||
|
||||
for entry in state.sessions.iter() {
|
||||
let session = entry.value();
|
||||
|
||||
let (byte_count, turn_count) = {
|
||||
let bc = session.byte_count().await;
|
||||
let tc = session.turn_count().await;
|
||||
(bc, tc)
|
||||
};
|
||||
|
||||
let needs_compaction = byte_count > session_max_bytes || turn_count > max_uncompacted_turns;
|
||||
|
||||
if needs_compaction {
|
||||
debug!(
|
||||
session_id = %session.id,
|
||||
byte_count = byte_count,
|
||||
turn_count = turn_count,
|
||||
session_max_bytes = session_max_bytes,
|
||||
max_uncompacted_turns = max_uncompacted_turns,
|
||||
cost_mode = cost_mode.as_str(),
|
||||
"triggering session compaction"
|
||||
);
|
||||
|
||||
match session.compact_oldest_turns().await {
|
||||
Ok(()) => {
|
||||
compacted += 1;
|
||||
info!(
|
||||
session_id = %session.id,
|
||||
"session compaction completed"
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
session_id = %session.id,
|
||||
error = %e,
|
||||
"session compaction failed"
|
||||
);
|
||||
}
|
||||
let s = entry.value();
|
||||
let (bc, tc) = { (s.byte_count().await, s.turn_count().await) };
|
||||
if bc > max_bytes || tc > max_turns {
|
||||
if s.compact_oldest_turns().await.is_ok() {
|
||||
compacted += 1;
|
||||
}
|
||||
}
|
||||
|
||||
// If byte count is very large even after compaction, prune aggressively
|
||||
let pruned_byte_count = session.byte_count().await;
|
||||
if pruned_byte_count > session_max_bytes * 3 {
|
||||
let keep_turns = (max_uncompacted_turns / 2).max(1);
|
||||
match session.prune_to(keep_turns).await {
|
||||
Ok(()) => {
|
||||
pruned += 1;
|
||||
info!(
|
||||
session_id = %session.id,
|
||||
keep_turns = keep_turns,
|
||||
"session pruned to limit"
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
session_id = %session.id,
|
||||
error = %e,
|
||||
"session prune failed"
|
||||
);
|
||||
}
|
||||
}
|
||||
if s.byte_count().await > max_bytes * 3 {
|
||||
let _ = s.prune_to((max_turns / 2).max(1)).await;
|
||||
}
|
||||
}
|
||||
|
||||
if compacted > 0 || pruned > 0 {
|
||||
info!(
|
||||
compacted = compacted,
|
||||
pruned = pruned,
|
||||
"session rotation complete"
|
||||
);
|
||||
if compacted > 0 {
|
||||
info!(compacted = compacted, "session rotation");
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Memory handoff
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Coordinate shared context across active agents.
|
||||
///
|
||||
/// This is the Rust equivalent of the TypeScript memory-handoff logic that
|
||||
/// enables agents to share context/state without bloating individual sessions.
|
||||
/// Currently a stub that logs state; full implementation requires an LLM
|
||||
/// compaction pass to produce shared summaries.
|
||||
async fn memory_handoff(state: &SharedState) {
|
||||
let agent_count = state.agents.len();
|
||||
let session_count = state.sessions.len();
|
||||
|
||||
if agent_count == 0 {
|
||||
debug!("memory handoff skipped: no active agents");
|
||||
if state.agents.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
debug!(
|
||||
agents = agent_count,
|
||||
sessions = session_count,
|
||||
"memory handoff tick"
|
||||
agents = state.agents.len(),
|
||||
sessions = state.sessions.len(),
|
||||
"memory handoff"
|
||||
);
|
||||
|
||||
// Future: produce a shared context snippet that all agents can reference,
|
||||
// replacing the need for each agent to carry full context independently.
|
||||
// This will call into the LLM (via colibri-deepseek) for summarization,
|
||||
// then inject the summary into active sessions as a Compaction entry.
|
||||
|
||||
// Example placeholder for the real handoff:
|
||||
// for entry in state.sessions.iter() {
|
||||
// let session = entry.value();
|
||||
// let turns = session.turns().await;
|
||||
// if turns.len() > 10 {
|
||||
// // Summarize and inject shared context
|
||||
// debug!(session_id = %session.id, turns = turns.len(), "handoff candidate");
|
||||
// }
|
||||
// }
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Task polling
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Poll for new tasks from the control plane.
|
||||
///
|
||||
/// This is the daemon's entry point for external task dispatch — e.g. a
|
||||
/// control-plane operator (Herdr, web dashboard, cron) pushes a new agent run
|
||||
/// request. The daemon picks it up and routes it through the spawner.
|
||||
///
|
||||
/// Currently a stub; will be wired to a task queue (filesystem, Redis, or
|
||||
/// database-backed) once the control-plane API is finalized.
|
||||
pub async fn poll_tasks(state: &SharedState) {
|
||||
debug!("task polling tick");
|
||||
|
||||
// Placeholder: check a well-known directory or queue for pending task
|
||||
// manifests, then call Spawner::spawn() for each.
|
||||
let _spawner = Spawner::new(state.config.clone().into());
|
||||
|
||||
// Example:
|
||||
// let tasks_dir = state.config.data_dir.join("tasks");
|
||||
// if tasks_dir.exists() {
|
||||
// let mut entries = tokio::fs::read_dir(&tasks_dir).await?;
|
||||
// while let Some(entry) = entries.next_entry().await? {
|
||||
// // parse task, spawn agent
|
||||
// }
|
||||
// }
|
||||
}
|
||||
|
||||
/// Run one scheduler tick — fire due jobs and process intake queue.
|
||||
async fn scheduler_tick_fn(state: &SharedState) {
|
||||
let mut scheduler = state.scheduler.lock().await;
|
||||
scheduler.tick(state).await;
|
||||
state.scheduler.lock().await.tick(state).await;
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
@ -369,30 +267,22 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_daemon_loop_config_defaults() {
|
||||
let config = DaemonLoopConfig::default();
|
||||
assert_eq!(config.heartbeat_interval, Duration::from_secs(30));
|
||||
assert_eq!(config.session_rotation_interval, Duration::from_secs(60));
|
||||
assert_eq!(config.memory_handoff_interval, Duration::from_secs(120));
|
||||
assert_eq!(config.agent_stall_timeout, Duration::from_secs(300));
|
||||
let c = DaemonLoopConfig::default();
|
||||
assert_eq!(c.heartbeat_interval, Duration::from_secs(30));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_daemon_state_creation() {
|
||||
let data_dir = std::env::temp_dir().join(format!(
|
||||
"colibri-daemon-state-test-{}",
|
||||
uuid::Uuid::new_v4()
|
||||
));
|
||||
let data_dir =
|
||||
std::env::temp_dir().join(format!("colibri-daemon-state-{}", uuid::Uuid::new_v4()));
|
||||
let mut config = DaemonConfig::from_env();
|
||||
config.data_dir = data_dir.clone();
|
||||
config.socket_path = data_dir.join("colibri.sock");
|
||||
config.db_path = data_dir.join("colibri.sqlite");
|
||||
|
||||
let state = DaemonState::new(config);
|
||||
|
||||
assert_eq!(state.sessions.len(), 0);
|
||||
assert_eq!(state.agents.len(), 0);
|
||||
assert_eq!(state.shutdown_tx.receiver_count(), 1);
|
||||
|
||||
assert!(state.last_warm_at.blocking_read().is_none());
|
||||
let _ = std::fs::remove_dir_all(data_dir);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -361,6 +361,8 @@ mod tests {
|
|||
max_uncompacted_turns: 20,
|
||||
cost_mode: "smart".to_string(),
|
||||
scheduler_prompt_injection: true,
|
||||
cache_warming_enabled: false,
|
||||
cache_warming_interval_hours: 0,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -237,6 +237,9 @@ async fn cmd_status(state: &SharedState) -> HerdrResponse {
|
|||
};
|
||||
|
||||
let cost_mode = crate::cost::CostMode::parse(&state.config.cost_mode).unwrap_or_default();
|
||||
let last_warm_at = state.last_warm_at.read().await.map(|t| t.to_rfc3339());
|
||||
let last_warm_hit = *state.last_warm_cache_hit.read().await;
|
||||
let last_warm_tokens = *state.last_warm_hit_tokens.read().await;
|
||||
|
||||
HerdrResponse::ok(serde_json::json!({
|
||||
"daemon": "colibri-daemon",
|
||||
|
|
@ -260,6 +263,13 @@ async fn cmd_status(state: &SharedState) -> HerdrResponse {
|
|||
"scheduler": {
|
||||
"interval_secs": 30,
|
||||
},
|
||||
"cache_warming": {
|
||||
"enabled": state.config.cache_warming_enabled,
|
||||
"interval_hours": state.config.cache_warming_interval_hours,
|
||||
"last_warm_at": last_warm_at,
|
||||
"last_warm_cache_hit": last_warm_hit,
|
||||
"last_warm_hit_tokens": last_warm_tokens,
|
||||
},
|
||||
}))
|
||||
}
|
||||
|
||||
|
|
@ -652,6 +662,8 @@ mod tests {
|
|||
max_uncompacted_turns: 20,
|
||||
cost_mode: "smart".to_string(),
|
||||
scheduler_prompt_injection: true,
|
||||
cache_warming_enabled: false,
|
||||
cache_warming_interval_hours: 0,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -111,7 +111,7 @@ async fn one_call(
|
|||
client: &reqwest::Client,
|
||||
cfg: &ProbeConfig,
|
||||
key: &str,
|
||||
) -> Result<(Option<String>, ProviderUsage), Box<dyn std::error::Error>> {
|
||||
) -> Result<(Option<String>, ProviderUsage), Box<dyn std::error::Error + Send + Sync>> {
|
||||
// Inflate the immutable prefix past DeepSeek's cacheable threshold (short
|
||||
// prompts don't engage prefix caching). Byte-stable across both calls.
|
||||
let repeat: usize = std::env::var("COLIBRI_PROBE_PREFIX_REPEAT")
|
||||
|
|
|
|||
133
docs/T1.4-CACHE-WARMING-DESIGN.md
Normal file
133
docs/T1.4-CACHE-WARMING-DESIGN.md
Normal file
|
|
@ -0,0 +1,133 @@
|
|||
# T1.4 PR3b — Cache Warming Design
|
||||
|
||||
## Goal
|
||||
|
||||
Pre-warm the DeepSeek prefix cache on daemon startup so the first real
|
||||
agent task benefits from cached tokens. Reduces latency and cost on
|
||||
cold starts.
|
||||
|
||||
## Design decisions
|
||||
|
||||
### 1. Config-gated, disabled by default
|
||||
|
||||
```env
|
||||
COLIBRI_CACHE_WARMING=0|1 # enable startup warming (default: 0)
|
||||
COLIBRI_CACHE_WARMING_INTERVAL_HOURS=N # re-warm every N hours (default: 0 = disabled)
|
||||
```
|
||||
|
||||
Rationale: cache warming consumes tokens (~3,500 per warm cycle). Until
|
||||
proven beneficial on the ISO target, it stays opt-in.
|
||||
|
||||
### 2. Reuses existing cache probe
|
||||
|
||||
The `colibri-deepseek` crate already has `run_cache_probe()` which:
|
||||
1. Sends a warm request with byte-stable STABLE_SYSTEM_PREFIX
|
||||
2. Waits 2s for cache commit
|
||||
3. Sends an identical probe request
|
||||
4. Returns cache_hit_tokens and cache_hit_observed
|
||||
|
||||
We reuse this directly — no new API calls, no new types.
|
||||
|
||||
### 3. Lifecycle
|
||||
|
||||
```
|
||||
Daemon start
|
||||
→ if COLIBRI_CACHE_WARMING=1 AND DEEPSEEK_API_KEY is set
|
||||
→ run_cache_probe()
|
||||
→ log result (hit/miss, tokens)
|
||||
→ record last_warm_at in DaemonState
|
||||
|
||||
Daemon loop (every heartbeat tick, 30s)
|
||||
→ if COLIBRI_CACHE_WARMING_INTERVAL_HOURS > 0
|
||||
→ if now - last_warm_at > interval
|
||||
→ run_cache_probe()
|
||||
→ update last_warm_at
|
||||
|
||||
Daemon shutdown
|
||||
→ no cleanup needed (cache lives on provider side)
|
||||
```
|
||||
|
||||
### 4. Observability
|
||||
|
||||
Cache warming results are:
|
||||
- Logged at INFO level on each warm cycle
|
||||
- Exposed in daemon status response: `cache_warming_enabled`, `last_warm_at`,
|
||||
`last_warm_cache_hit`, `last_warm_hit_tokens`
|
||||
- NOT mixed into per-session CacheMetrics (that's for agent API calls)
|
||||
|
||||
### 5. Safety
|
||||
|
||||
- If DeepSeek key is missing or empty, warming is silently skipped
|
||||
- If the probe fails (network, rate limit, balance), it logs a warning
|
||||
and does not retry — the daemon continues normally
|
||||
- Warming failures do not affect daemon startup or agent spawning
|
||||
- On FreeBSD, the key may be in a separate env file — the daemon
|
||||
reads it from the process environment at startup
|
||||
|
||||
### 6. FreeBSD considerations
|
||||
|
||||
- Same code path as Linux — no platform-specific logic
|
||||
- The rc.d service already exports DEEPSEEK_API_KEY via provider.env
|
||||
- OSA validation: check that warming runs on daemon start, produces
|
||||
cache_hit_observed=true, and metrics appear in status
|
||||
|
||||
## Implementation plan
|
||||
|
||||
### Files touched
|
||||
|
||||
| File | Change |
|
||||
|------|--------|
|
||||
| `colibri-daemon/src/config.rs` | Add `cache_warming_enabled`, `cache_warming_interval_hours` |
|
||||
| `colibri-daemon/src/daemon.rs` | Add `last_warm_at`, `last_warm_result` to DaemonState; call warm on startup; periodic re-warm in loop |
|
||||
| `colibri-daemon/src/socket.rs` | Expose cache warming fields in cmd_status |
|
||||
| `colibri-deepseek/src/lib.rs` | No changes — reuse existing run_cache_probe() |
|
||||
|
||||
### No changes to
|
||||
|
||||
- `colibri-contracts` — ProviderSmokeResult already has all needed fields
|
||||
- `colibri-store` — no SQLite schema changes (warming is runtime-only)
|
||||
- `colibri-skills`, `colibri-glasspane` — unrelated
|
||||
|
||||
## Test plan
|
||||
|
||||
### Unit tests (Linux, on debby)
|
||||
|
||||
1. `cache_warming_skipped_when_disabled` — config off, no probe runs
|
||||
2. `cache_warming_skipped_when_no_key` — config on but no API key
|
||||
3. `cache_warming_records_last_warm_at` — mock probe result stored
|
||||
4. `cache_warming_periodic_rewarm` — interval triggers re-warm
|
||||
5. `cache_warming_status_includes_metrics` — status response has fields
|
||||
|
||||
### FreeBSD validation (OSA, after merge)
|
||||
|
||||
```sh
|
||||
# Enable warming
|
||||
export COLIBRI_CACHE_WARMING=1
|
||||
export DEEPSEEK_API_KEY=<key>
|
||||
|
||||
# Start daemon, check log
|
||||
cargo run --bin colibri-daemon 2>&1 | grep -i "cache.*warm"
|
||||
|
||||
# Check status via socket
|
||||
echo '{"cmd":"status"}' | nc -U /var/run/colibri/colibri.sock
|
||||
# Verify: cache_warming_enabled=true, last_warm_at is set,
|
||||
# last_warm_cache_hit=true, last_warm_hit_tokens > 0
|
||||
```
|
||||
|
||||
### Acceptance criteria
|
||||
|
||||
- [ ] Warming runs exactly once on startup when enabled + key present
|
||||
- [ ] Warming is skipped when disabled or key missing
|
||||
- [ ] Periodic re-warming fires after interval (if configured)
|
||||
- [ ] Cache hit observed on OSA FreeBSD (real DeepSeek API call)
|
||||
- [ ] Status response includes warming metrics
|
||||
- [ ] Daemon starts normally even if warming fails
|
||||
- [ ] No workspace test regressions
|
||||
|
||||
## Rollout
|
||||
|
||||
1. Hermes implements + tests on Linux (debby), opens PR
|
||||
2. Hermes merges via API after Linux validation
|
||||
3. Codex validates on OSA FreeBSD with real DeepSeek key
|
||||
4. ISO image includes `COLIBRI_CACHE_WARMING=0` default in rc.conf.sample
|
||||
5. Operator enables on FreeBSD after verifying cache hit on their account
|
||||
Loading…
Add table
Reference in a new issue