feat: add PromptAssembly + CacheMetrics structs (T1.4 PR 1) #1
3 changed files with 382 additions and 0 deletions
|
|
@ -82,6 +82,59 @@ impl Turn {
|
|||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// T1.4 — PromptAssembly + CacheMetrics (PR 1: structural, no behavior change)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// The 3-region prompt assembly produced by a session.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PromptAssembly {
|
||||
pub immutable_prefix: String,
|
||||
pub appendable_log: Vec<serde_json::Value>,
|
||||
pub volatile_scratch: Vec<serde_json::Value>,
|
||||
pub total_bytes: u64,
|
||||
pub estimated_tokens: u64,
|
||||
}
|
||||
|
||||
impl PromptAssembly {
|
||||
pub fn to_messages(&self) -> Vec<serde_json::Value> {
|
||||
let mut messages =
|
||||
Vec::with_capacity(1 + self.appendable_log.len() + self.volatile_scratch.len());
|
||||
messages.push(serde_json::json!({
|
||||
"role": "system",
|
||||
"content": &self.immutable_prefix,
|
||||
}));
|
||||
messages.extend(self.appendable_log.clone());
|
||||
messages.extend(self.volatile_scratch.clone());
|
||||
messages
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct CacheMetrics {
|
||||
pub warm_call_tokens: u64,
|
||||
pub probe_call_tokens: u64,
|
||||
pub cache_hit_tokens: u64,
|
||||
pub cache_miss_tokens: u64,
|
||||
pub last_probe_at: Option<String>,
|
||||
}
|
||||
|
||||
impl CacheMetrics {
|
||||
pub fn hit_rate(&self) -> f64 {
|
||||
let total = self.cache_hit_tokens + self.cache_miss_tokens;
|
||||
if total == 0 {
|
||||
0.0
|
||||
} else {
|
||||
self.cache_hit_tokens as f64 / total as f64
|
||||
}
|
||||
}
|
||||
|
||||
pub fn record(&mut self, hit: u64, miss: u64) {
|
||||
self.cache_hit_tokens += hit;
|
||||
self.cache_miss_tokens += miss;
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Session
|
||||
// ---------------------------------------------------------------------------
|
||||
|
|
@ -362,6 +415,34 @@ impl Session {
|
|||
// 3-region prompt assembly (for DeepSeek cache discipline)
|
||||
// ------------------------------------------------------------------
|
||||
|
||||
/// Build a PromptAssembly from the current session state.
|
||||
/// Wraps the existing `build_prompt_messages()` — no behavior change.
|
||||
pub async fn build_prompt_assembly(&self) -> PromptAssembly {
|
||||
let messages = self.build_prompt_messages().await;
|
||||
let total_bytes: u64 = messages
|
||||
.iter()
|
||||
.map(|m| serde_json::to_string(m).unwrap_or_default().len() as u64)
|
||||
.sum();
|
||||
let estimated_tokens = total_bytes.div_ceil(3);
|
||||
|
||||
let immutable_prefix = messages
|
||||
.first()
|
||||
.and_then(|m| m.get("content"))
|
||||
.and_then(|c| c.as_str())
|
||||
.unwrap_or("")
|
||||
.to_string();
|
||||
|
||||
let appendable_log = messages[1..].to_vec();
|
||||
|
||||
PromptAssembly {
|
||||
immutable_prefix,
|
||||
appendable_log,
|
||||
volatile_scratch: Vec::new(),
|
||||
total_bytes,
|
||||
estimated_tokens,
|
||||
}
|
||||
}
|
||||
|
||||
/// Build the 3-region prompt:
|
||||
/// 1. Immutable system prefix (byte-stable for cache hits)
|
||||
/// 2. Appendable conversation log (turns, possibly with compaction gaps)
|
||||
|
|
@ -427,4 +508,63 @@ impl Session {
|
|||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// T1.4 golden tests — PromptAssembly + CacheMetrics
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
#[cfg(test)]
|
||||
mod t14_tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn prompt_assembly_to_messages_includes_all_regions() {
|
||||
let assembly = PromptAssembly {
|
||||
immutable_prefix: "SYSTEM".into(),
|
||||
appendable_log: vec![
|
||||
serde_json::json!({"role": "user", "content": "hello"}),
|
||||
serde_json::json!({"role": "assistant", "content": "hi"}),
|
||||
],
|
||||
volatile_scratch: vec![serde_json::json!({"role": "user", "content": "v"})],
|
||||
total_bytes: 0,
|
||||
estimated_tokens: 0,
|
||||
};
|
||||
let messages = assembly.to_messages();
|
||||
assert_eq!(messages.len(), 4);
|
||||
assert_eq!(messages[0]["role"], "system");
|
||||
assert_eq!(messages[3]["content"], "v");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn prompt_assembly_empty_volatile_is_fine() {
|
||||
let assembly = PromptAssembly {
|
||||
immutable_prefix: "PREFIX".into(),
|
||||
appendable_log: vec![],
|
||||
volatile_scratch: vec![],
|
||||
total_bytes: 0,
|
||||
estimated_tokens: 0,
|
||||
};
|
||||
assert_eq!(assembly.to_messages().len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cache_metrics_hit_rate_zero_when_empty() {
|
||||
assert_eq!(CacheMetrics::default().hit_rate(), 0.0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cache_metrics_hit_rate_calculation() {
|
||||
let mut m = CacheMetrics::default();
|
||||
m.record(300, 700);
|
||||
assert!((m.hit_rate() - 0.3).abs() < 0.001);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cache_metrics_record_accumulates() {
|
||||
let mut m = CacheMetrics::default();
|
||||
m.record(100, 400);
|
||||
m.record(200, 300);
|
||||
assert!((m.hit_rate() - 0.3).abs() < 0.001);
|
||||
}
|
||||
}
|
||||
|
||||
// Tests deferred to crate-level integration tests (tests/ directory).
|
||||
|
|
|
|||
|
|
@ -115,6 +115,7 @@ takeover.
|
|||
Pi engine integration via the events parser; Telegram-intake compatibility.
|
||||
- **T1.4 Phase 5 — cache-first prompt discipline.** Deterministic 3-region
|
||||
prompt assembler; cost modes (fast/smart/max); visible escalation.
|
||||
[Implementation plan →](T1.4-PROMPT-DISCIPLINE-PLAN.md)
|
||||
- **T1.5 Phase 6 — gated cutover.** Replace TS control-plane paths only after
|
||||
proof gates pass; **separate deprecation PR per path**.
|
||||
- **FreeBSD validation (each step):** push → Codex on osa runs `cargo test` on
|
||||
|
|
|
|||
241
docs/T1.4-PROMPT-DISCIPLINE-PLAN.md
Normal file
241
docs/T1.4-PROMPT-DISCIPLINE-PLAN.md
Normal file
|
|
@ -0,0 +1,241 @@
|
|||
# T1.4 — Cache-First Prompt Discipline Plan
|
||||
|
||||
## Status: code present, integration incomplete
|
||||
|
||||
The building blocks exist across 4 crates/files. T1.4 connects them
|
||||
into a deterministic, measurable, cost-aware pipeline.
|
||||
|
||||
## Files touched
|
||||
|
||||
| File | Current state | T1.4 change |
|
||||
|------|--------------|-------------|
|
||||
| `colibri-deepseek/src/lib.rs` | Cache probe + `STABLE_SYSTEM_PREFIX` | Add `CacheMetrics` struct, per-session prefix assembly |
|
||||
| `colibri-daemon/src/cost.rs` | `CostMode`, thresholds, escalation | Add `prompt_budget()` per mode, auto-escalation trigger |
|
||||
| `colibri-daemon/src/session.rs` | 3-region model, `build_prompt_messages()` | Deterministic assembly, cache-hit tracking, cost-aware trim |
|
||||
| `colibri-daemon/src/config.rs` | `DaemonConfig` with cost fields | Add `cache_warm_on_start`, `cost_escalation_auto` |
|
||||
| `colibri-daemon/src/socket.rs` | `set-cost-mode`, status reporting | Add cache-metrics to status, per-request budget header |
|
||||
| `colibri-daemon/src/spawner.rs` | Agent spawn with session_id | Inject cost-aware prompt assembly before spawn |
|
||||
|
||||
## Current architecture (what's already built)
|
||||
|
||||
```
|
||||
cost.rs: CostMode { Fast, Smart, Max }
|
||||
├─ session_max_bytes() 500K / 2M / 8M
|
||||
├─ max_uncompacted_turns() 5 / 20 / 100
|
||||
├─ compact_tool_results() true / true / false
|
||||
├─ tool_result_max_bytes() 4K / 16K / unlimited
|
||||
└─ escalate() Fast→Smart→Max
|
||||
|
||||
session.rs: Session
|
||||
├─ turns: VecDeque<Turn> (append-only JSONL)
|
||||
├─ build_prompt_messages() → Vec<Value>
|
||||
│ ├─ Region 1: STABLE_SYSTEM_PREFIX (byte-stable)
|
||||
│ ├─ Region 2: conversation log (turns)
|
||||
│ └─ Region 3: volatile scratch (left empty)
|
||||
├─ compact_oldest_turns() — summarise + compaction marker
|
||||
└─ maybe_compact_or_rollover() — auto-triggered on append
|
||||
|
||||
colibri-deepseek: cache probe
|
||||
├─ STABLE_SYSTEM_PREFIX — deliberately fixed byte-for-byte
|
||||
├─ run_cache_probe() — warm + probe + cache_hit_observed
|
||||
└─ WireUsage — prompt_cache_hit_tokens, prompt_cache_miss_tokens
|
||||
```
|
||||
|
||||
## What T1.4 adds
|
||||
|
||||
### 1. Deterministic 3-region assembler
|
||||
|
||||
`build_prompt_messages()` currently returns `Vec<Value>`. T1.4 wraps this
|
||||
in a `PromptAssembly` struct with explicit byte-stability guarantees:
|
||||
|
||||
```rust
|
||||
pub struct PromptAssembly {
|
||||
pub immutable_prefix: String, // Region 1 — byte-identical every request
|
||||
pub appendable_log: Vec<Turn>, // Region 2 — grows monotonically
|
||||
pub volatile_scratch: Vec<Value>, // Region 3 — discarded per-turn
|
||||
pub total_bytes: u64,
|
||||
pub estimated_tokens: u64,
|
||||
pub cost_mode: CostMode,
|
||||
}
|
||||
|
||||
impl PromptAssembly {
|
||||
/// Build the full message list for an API call.
|
||||
/// The caller appends volatile scratch before sending.
|
||||
pub fn to_messages(&self) -> Vec<Value>;
|
||||
|
||||
/// Check whether the prefix can still cache-hit given current byte count.
|
||||
/// If the prefix has grown (e.g. new session metadata), cache may miss.
|
||||
pub fn prefix_is_cacheable(&self) -> bool;
|
||||
|
||||
/// Trim the appendable log to fit within cost_mode budget.
|
||||
/// Oldest turns are compacted/summarised first.
|
||||
pub fn trim_to_budget(&mut self, mode: CostMode);
|
||||
}
|
||||
```
|
||||
|
||||
### 2. Cache-hit metering
|
||||
|
||||
Add `CacheMetrics` to track per-session cache behavior:
|
||||
|
||||
```rust
|
||||
pub struct CacheMetrics {
|
||||
pub warm_call_tokens: u64,
|
||||
pub probe_call_tokens: u64,
|
||||
pub cache_hit_tokens: u64,
|
||||
pub cache_miss_tokens: u64,
|
||||
pub cache_hit_rate: f64, // hit / (hit + miss)
|
||||
pub last_probe_at: Option<String>,
|
||||
}
|
||||
```
|
||||
|
||||
Integrate into:
|
||||
- `Session.build_prompt_messages()` — record prefix byte count before send
|
||||
- Daemon status response — include `cache_metrics` per session
|
||||
- Scheduler — skip cache warming if last probe was recent
|
||||
|
||||
### 3. Cost-aware prompt trimming
|
||||
|
||||
Instead of only compacting by raw byte count, trim by cost mode budget:
|
||||
|
||||
| Mode | Max prompt bytes | Max turns | Tool result policy |
|
||||
|------|-----------------|-----------|-------------------|
|
||||
| Fast | 500K (~12K tokens) | 5 | Truncate >4K |
|
||||
| Smart | 2M (~50K tokens) | 20 | Truncate >16K |
|
||||
| Max | 8M (~200K tokens) | 100 | Preserve all |
|
||||
|
||||
```rust
|
||||
pub fn trim_prompt_for_mode(
|
||||
assembly: &mut PromptAssembly,
|
||||
mode: CostMode,
|
||||
) -> TrimResult {
|
||||
// 1. Calculate current size
|
||||
// 2. If under budget: return Ok
|
||||
// 3. If over: compact oldest turns first
|
||||
// 4. If still over: escalate cost mode (visible log)
|
||||
// 5. If already Max and still over: truncate volatile, warn
|
||||
}
|
||||
```
|
||||
|
||||
### 4. Auto-escalation with visible logging
|
||||
|
||||
Current `escalate()` is manual. T1.4 triggers it automatically when:
|
||||
|
||||
```rust
|
||||
pub enum EscalationTrigger {
|
||||
/// Session byte count exceeded current mode budget
|
||||
BudgetExceeded { current_bytes: u64, budget_bytes: u64 },
|
||||
/// Cache miss rate above threshold (prefix changed?)
|
||||
CacheMissRate { rate: f64, threshold: f64 },
|
||||
/// Compaction didn't free enough space
|
||||
CompactionInsufficient { freed_bytes: u64, needed_bytes: u64 },
|
||||
}
|
||||
```
|
||||
|
||||
When triggered:
|
||||
1. Log visible escalation event
|
||||
2. Update session's active cost mode
|
||||
3. Re-trim prompt to new budget
|
||||
4. Record in session JSONL as `{"type":"escalation","from":"smart","to":"max","reason":"..."}`
|
||||
|
||||
### 5. Scheduler handoff prompts
|
||||
|
||||
When the scheduler spawns an agent via `cmd_spawn_agent`, inject cost-aware
|
||||
prompt assembly:
|
||||
|
||||
```rust
|
||||
// Before spawn:
|
||||
let session = state.sessions.get(&session_id)?;
|
||||
let assembly = session.build_prompt_assembly().await;
|
||||
let trimmed = assembly.trim_to_budget(cost_mode);
|
||||
|
||||
// Inject into spawn command:
|
||||
spawn_agent(
|
||||
provider,
|
||||
model,
|
||||
session_id,
|
||||
system_prompt, // from Region 1
|
||||
conversation_context, // from Region 2 (trimmed)
|
||||
cost_mode, // passed to agent
|
||||
);
|
||||
```
|
||||
|
||||
### 6. Daemon startup cache warming
|
||||
|
||||
On daemon start, if `cache_warm_on_start` is true and a DeepSeek key is set:
|
||||
|
||||
```rust
|
||||
pub async fn warm_cache_on_startup(config: &DaemonConfig) {
|
||||
if let Some(key) = &config.deepseek_api_key {
|
||||
let probe_cfg = ProbeConfig::from_env();
|
||||
let result = run_cache_probe(&probe_cfg).await;
|
||||
info!(
|
||||
cache_hit = result.cache_hit_observed,
|
||||
hit_tokens = result.cache_hit_tokens,
|
||||
"startup cache probe complete"
|
||||
);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## Implementation order
|
||||
|
||||
### PR 1: PromptAssembly + CacheMetrics (no behavior change)
|
||||
|
||||
Files: `session.rs`, `colibri-deepseek/src/lib.rs`
|
||||
- Add `PromptAssembly` struct
|
||||
- Add `CacheMetrics` struct
|
||||
- Add `PromptAssembly::to_messages()` (wraps existing `build_prompt_messages`)
|
||||
- Add `PromptAssembly::trim_to_budget()` (stub — no actual trimming yet)
|
||||
- Tests: verify byte-stable prefix is identical across two assemblies
|
||||
- Tests: verify trim_to_budget preserves ordering
|
||||
- No change to daemon behavior — `build_prompt_messages()` still works
|
||||
|
||||
### PR 2: Cost-aware trimming + auto-escalation
|
||||
|
||||
Files: `cost.rs`, `session.rs`, `socket.rs`
|
||||
- Implement `trim_to_budget()` with real compaction
|
||||
- Add `EscalationTrigger` enum
|
||||
- Auto-escalate on budget exceeded
|
||||
- Log escalation events
|
||||
- Add `set-cost-mode auto` to socket API
|
||||
- Tests: trim leaves session under budget
|
||||
- Tests: escalation chain Fast→Smart→Max
|
||||
|
||||
### PR 3: Scheduler injection + cache warming
|
||||
|
||||
Files: `socket.rs`, `spawner.rs`, `daemon.rs`, `config.rs`
|
||||
- Inject cost-aware assembly into spawn-agent
|
||||
- Cache warming on daemon startup
|
||||
- Cache metrics in status response
|
||||
- `cache_warm_on_start` config flag
|
||||
- Tests: scheduler handoff includes trimmed prompt
|
||||
- Tests: startup probe runs when key is set
|
||||
|
||||
## Verification
|
||||
|
||||
After all 3 PRs:
|
||||
|
||||
```sh
|
||||
# 1. Start daemon in fast mode
|
||||
COLIBRI_COST_MODE=fast cargo run --bin colibri-daemon
|
||||
|
||||
# 2. Verify cache warming
|
||||
# Log should show: "startup cache probe complete" with hit/miss
|
||||
|
||||
# 3. Spawn agent and verify prompt assembly
|
||||
echo '{"cmd":"spawn-agent","provider":"deepseek","model":"deepseek-chat","system_prompt":"..."}' | nc -U /tmp/colibri-daemon.sock
|
||||
|
||||
# 4. Verify cost mode escalation
|
||||
echo '{"cmd":"set-cost-mode","mode":"smart"}' | nc -U /tmp/colibri-daemon.sock
|
||||
|
||||
# 5. Check status includes cache metrics
|
||||
echo '{"cmd":"status"}' | nc -U /tmp/colibri-daemon.sock
|
||||
```
|
||||
|
||||
## What NOT to change
|
||||
|
||||
- `PiJsonlIngestor` in glasspane — unrelated
|
||||
- `colibri-skills` crate — unrelated (parked on feature branch)
|
||||
- `zot-runtime-event-adapter` — unrelated (parked on feature branch)
|
||||
- `colibri-contracts` — ProviderSmokeResult/RunManifest types are stable
|
||||
- `colibri-store` — SQLite schema unchanged
|
||||
Loading…
Add table
Reference in a new issue