feat: wire cost mode enforcement + poll_tasks spawn path (Sam & Hermes)
Priority 3 — Cost mode enforcement: - Removed session_max_bytes/max_uncompacted_turns from DaemonConfig; cost mode string is now the single source of truth for all thresholds - maybe_compact_or_rollover() derives thresholds from CostMode, not static config fields - compact_oldest_turns() takes a keep parameter (derived from cost mode) - compact_tool_result() wired into build_prompt_messages() — tool results are truncated when cost mode says to compact - trim_to_budget() called in build_prompt_assembly() - auto_escalate() wired into session_rotation() — escalates cost mode when compaction is insufficient - set-cost-mode socket command now updates runtime cost_mode (RwLock on DaemonState) instead of just acknowledging Priority 2 — Pi spawn path end-to-end: - poll_tasks() now queries claimed tasks, spawns the configured agent binary (COLIBRI_AGENT_BINARY), creates a session, wires stdout to glasspane, and transitions the task to Started - stream_agent_stdout_to_glasspane made pub for cross-module access - poll_tasks called from scheduler_tick_fn after the scheduler runs - New integration test: poll_tasks_spawns_agent_for_claimed_task validates the full path: create task → claim → poll_tasks spawns → glasspane observes Idle → Working → Blocked → Done lifecycle Gates: fmt/clippy/test all green (207 tests, 0 failures).
This commit is contained in:
parent
7e7915c829
commit
9d443a498c
9 changed files with 312 additions and 71 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
|
@ -303,6 +303,7 @@ version = "0.0.1"
|
|||
dependencies = [
|
||||
"colibri-daemon",
|
||||
"colibri-glasspane",
|
||||
"colibri-store",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"thiserror 2.0.18",
|
||||
|
|
|
|||
|
|
@ -24,3 +24,4 @@ tokio = { version = "1", features = ["io-util", "macros", "net", "rt-multi-threa
|
|||
[dev-dependencies]
|
||||
tokio = { version = "1", features = ["fs", "io-util", "macros", "net", "rt-multi-thread", "time"] }
|
||||
uuid = { version = "1", features = ["v4"] }
|
||||
colibri-store = { path = "../colibri-store" }
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ use std::{
|
|||
};
|
||||
|
||||
use colibri_client::DaemonClient;
|
||||
use colibri_daemon::daemon::poll_tasks;
|
||||
use colibri_daemon::{socket, DaemonConfig, DaemonState, SharedState};
|
||||
use colibri_glasspane::AgentState;
|
||||
use uuid::Uuid;
|
||||
|
|
@ -15,7 +16,6 @@ fn smoke_config() -> DaemonConfig {
|
|||
socket_path: data_dir.join("colibri.sock"),
|
||||
data_dir: data_dir.clone(),
|
||||
db_path: data_dir.join("colibri.sqlite"),
|
||||
session_max_bytes: 2_000_000,
|
||||
deepseek_api_key: None,
|
||||
deepseek_endpoint: "https://api.deepseek.com/chat/completions".to_string(),
|
||||
deepseek_model: "deepseek-chat".to_string(),
|
||||
|
|
@ -25,7 +25,6 @@ fn smoke_config() -> DaemonConfig {
|
|||
anthropic_endpoint: "https://api.anthropic.com/v1/messages".to_string(),
|
||||
host: "live-smoke-host".to_string(),
|
||||
max_context_tokens: 128_000,
|
||||
max_uncompacted_turns: 20,
|
||||
cost_mode: "smart".to_string(),
|
||||
scheduler_prompt_injection: false,
|
||||
cache_warming_enabled: false,
|
||||
|
|
@ -200,6 +199,110 @@ async fn colibri_cli_task_commands_use_socket_api() {
|
|||
let _ = tokio::fs::remove_dir_all(config.data_dir).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn poll_tasks_spawns_agent_for_claimed_task() {
|
||||
let mut config = smoke_config();
|
||||
let fake_agent = env!("CARGO_BIN_EXE_colibri-smoke-agent");
|
||||
// Set COLIBRI_AGENT_BINARY so poll_tasks uses the smoke agent binary
|
||||
config.data_dir =
|
||||
std::env::temp_dir().join(format!("colibri-poll-tasks-test-{}", Uuid::new_v4()));
|
||||
tokio::fs::create_dir_all(&config.data_dir).await.unwrap();
|
||||
std::env::set_var("COLIBRI_AGENT_BINARY", fake_agent);
|
||||
|
||||
let state: SharedState = Arc::new(DaemonState::new(config.clone()));
|
||||
let shutdown = state.shutdown_rx.resubscribe();
|
||||
let server_state = state.clone();
|
||||
let server = tokio::spawn(async move {
|
||||
socket::serve(server_state, shutdown).await;
|
||||
});
|
||||
|
||||
let client = DaemonClient::new(config.socket_path.clone());
|
||||
wait_for_socket(&client).await;
|
||||
|
||||
// Register an agent
|
||||
{
|
||||
let store = state.store.lock().unwrap();
|
||||
store
|
||||
.register_agent("test-agent", serde_json::json!([]))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
// Create and claim a task directly in the store
|
||||
let task = {
|
||||
let store = state.store.lock().unwrap();
|
||||
let task = store.create_task("poll-tasks-smoke", None).unwrap();
|
||||
let agent = store.list_agents().unwrap();
|
||||
let agent_id = &agent[0].id;
|
||||
store.claim_task(&task.id, agent_id).unwrap();
|
||||
store
|
||||
.get_task(&task.id)
|
||||
.unwrap()
|
||||
.expect("task should exist")
|
||||
};
|
||||
assert_eq!(
|
||||
task.status,
|
||||
colibri_store::TaskStatus::Claimed,
|
||||
"task should be claimed before poll_tasks"
|
||||
);
|
||||
|
||||
// Call poll_tasks — should spawn the smoke agent for this claimed task
|
||||
poll_tasks(&state).await;
|
||||
|
||||
// Verify an agent handle was created
|
||||
let deadline = Instant::now() + Duration::from_secs(10);
|
||||
loop {
|
||||
if !state.agents.is_empty() {
|
||||
break;
|
||||
}
|
||||
assert!(Instant::now() < deadline, "no agent spawned by poll_tasks");
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
}
|
||||
|
||||
// Task should have transitioned to Started
|
||||
let updated_task = {
|
||||
let store = state.store.lock().unwrap();
|
||||
store
|
||||
.get_task(&task.id)
|
||||
.unwrap()
|
||||
.expect("task should exist")
|
||||
};
|
||||
assert_eq!(
|
||||
updated_task.status,
|
||||
colibri_store::TaskStatus::Started,
|
||||
"task should be Started after poll_tasks spawns"
|
||||
);
|
||||
|
||||
// A session should exist for this task
|
||||
assert!(
|
||||
state.sessions.contains_key(&format!("task-{}", task.id)),
|
||||
"session should be created for task"
|
||||
);
|
||||
|
||||
// Glasspane should observe the full lifecycle: the smoke agent runs
|
||||
// through Idle → Working → Blocked → Working → Done quickly (10ms steps).
|
||||
// Wait for Done — the terminal state — which proves the whole path worked:
|
||||
// poll_tasks spawned the agent, stdout streamed to glasspane, and the
|
||||
// state machine processed the JSONL events.
|
||||
let deadline = Instant::now() + Duration::from_secs(20);
|
||||
loop {
|
||||
let snap = client.glasspane_snapshot().await.unwrap();
|
||||
if snap.panes.iter().any(|p| p.state == AgentState::Done) {
|
||||
break;
|
||||
}
|
||||
assert!(
|
||||
Instant::now() < deadline,
|
||||
"agent did not reach Done — snapshot: {:?}",
|
||||
snap.panes.iter().map(|p| (p.id.clone(), p.state)).collect::<Vec<_>>()
|
||||
);
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
|
||||
let _ = state.shutdown_tx.send(());
|
||||
server.await.unwrap();
|
||||
std::env::remove_var("COLIBRI_AGENT_BINARY");
|
||||
let _ = tokio::fs::remove_dir_all(config.data_dir).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn harness_double_spawn_session_isolation() {
|
||||
let config = smoke_config();
|
||||
|
|
|
|||
|
|
@ -17,8 +17,6 @@ pub struct DaemonConfig {
|
|||
pub data_dir: PathBuf,
|
||||
/// Path for the Colibri control-plane socket.
|
||||
pub socket_path: PathBuf,
|
||||
/// Maximum bytes per session before automatic rollover.
|
||||
pub session_max_bytes: u64,
|
||||
/// Path to the coordination SQLite database.
|
||||
pub db_path: PathBuf,
|
||||
/// DeepSeek API key for provider routing.
|
||||
|
|
@ -39,8 +37,6 @@ pub struct DaemonConfig {
|
|||
pub host: String,
|
||||
/// Maximum context window tokens (for compaction trigger).
|
||||
pub max_context_tokens: u64,
|
||||
/// Compaction target: max turns to keep uncompacted.
|
||||
pub max_uncompacted_turns: usize,
|
||||
/// Current cost mode (fast/smart/max).
|
||||
pub cost_mode: String,
|
||||
/// Inject session prompt context into spawned agent env (T1.4 PR3a).
|
||||
|
|
@ -84,7 +80,6 @@ impl DaemonConfig {
|
|||
data_dir,
|
||||
socket_path,
|
||||
db_path,
|
||||
session_max_bytes: env_parse("COLIBRI_SESSION_MAX_BYTES").unwrap_or(2_000_000),
|
||||
deepseek_api_key: nonempty_env("DEEPSEEK_API_KEY"),
|
||||
deepseek_endpoint: std::env::var("DEEPSEEK_ENDPOINT")
|
||||
.unwrap_or_else(|_| "https://api.deepseek.com/chat/completions".to_string()),
|
||||
|
|
@ -98,7 +93,6 @@ impl DaemonConfig {
|
|||
.unwrap_or_else(|_| "https://api.anthropic.com/v1/messages".to_string()),
|
||||
host,
|
||||
max_context_tokens: env_parse("COLIBRI_MAX_CONTEXT_TOKENS").unwrap_or(128_000),
|
||||
max_uncompacted_turns: env_parse("COLIBRI_MAX_UNCOMPACTED_TURNS").unwrap_or(20),
|
||||
cost_mode: std::env::var("COLIBRI_COST_MODE").unwrap_or_else(|_| "smart".to_string()),
|
||||
scheduler_prompt_injection: env_parse("COLIBRI_SCHEDULER_PROMPT_INJECTION")
|
||||
.unwrap_or(false),
|
||||
|
|
@ -166,7 +160,6 @@ mod tests {
|
|||
"ANTHROPIC_API_KEY",
|
||||
"ANTHROPIC_ENDPOINT",
|
||||
"COLIBRI_MAX_CONTEXT_TOKENS",
|
||||
"COLIBRI_MAX_UNCOMPACTED_TURNS",
|
||||
];
|
||||
|
||||
let saved: Vec<(&str, Option<String>)> = vars
|
||||
|
|
@ -190,10 +183,8 @@ mod tests {
|
|||
|
||||
assert_eq!(config.host, "unknown");
|
||||
assert!(config.data_dir.to_string_lossy().contains("colibri-daemon"));
|
||||
assert_eq!(config.session_max_bytes, 2_000_000);
|
||||
assert_eq!(config.deepseek_model, "deepseek-chat");
|
||||
assert_eq!(config.max_context_tokens, 128_000);
|
||||
assert_eq!(config.max_uncompacted_turns, 20);
|
||||
assert!(config.deepseek_api_key.is_none());
|
||||
assert!(config.openrouter_api_key.is_none());
|
||||
assert!(config.anthropic_api_key.is_none());
|
||||
|
|
@ -207,7 +198,6 @@ mod tests {
|
|||
"DEEPSEEK_API_KEY",
|
||||
"DEEPSEEK_MODEL",
|
||||
"COLIBRI_MAX_CONTEXT_TOKENS",
|
||||
"COLIBRI_SESSION_MAX_BYTES",
|
||||
];
|
||||
let saved: Vec<(&str, Option<String>)> = vars
|
||||
.iter()
|
||||
|
|
@ -218,7 +208,6 @@ mod tests {
|
|||
std::env::set_var("DEEPSEEK_API_KEY", "sk-test");
|
||||
std::env::set_var("DEEPSEEK_MODEL", "deepseek-v4");
|
||||
std::env::set_var("COLIBRI_MAX_CONTEXT_TOKENS", "64000");
|
||||
std::env::set_var("COLIBRI_SESSION_MAX_BYTES", "500000");
|
||||
|
||||
let config = DaemonConfig::from_env();
|
||||
|
||||
|
|
@ -234,6 +223,5 @@ mod tests {
|
|||
assert_eq!(config.deepseek_api_key.as_deref(), Some("sk-test"));
|
||||
assert_eq!(config.deepseek_model, "deepseek-v4");
|
||||
assert_eq!(config.max_context_tokens, 64000);
|
||||
assert_eq!(config.session_max_bytes, 500000);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,6 +29,9 @@ pub struct DaemonState {
|
|||
pub shutdown_rx: broadcast::Receiver<()>,
|
||||
/// Headroom compression sidecar (optional, connected lazily on startup).
|
||||
pub headroom_sidecar: RwLock<Option<HeadroomSidecar>>,
|
||||
/// Runtime cost mode — updated by `set-cost-mode` socket command.
|
||||
/// All enforcement code reads from this, not `config.cost_mode`.
|
||||
pub cost_mode: RwLock<String>,
|
||||
}
|
||||
|
||||
impl DaemonState {
|
||||
|
|
@ -41,7 +44,7 @@ impl DaemonState {
|
|||
)
|
||||
});
|
||||
Self {
|
||||
config,
|
||||
config: config.clone(),
|
||||
sessions: DashMap::new(),
|
||||
agents: DashMap::new(),
|
||||
glasspane: RwLock::new(PaneSupervisor::new()),
|
||||
|
|
@ -53,6 +56,7 @@ impl DaemonState {
|
|||
shutdown_tx,
|
||||
shutdown_rx,
|
||||
headroom_sidecar: RwLock::new(None),
|
||||
cost_mode: RwLock::new(config.cost_mode.clone()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -240,22 +244,45 @@ async fn heartbeat(state: &SharedState, _stall_timeout: Duration) {
|
|||
}
|
||||
|
||||
async fn session_rotation(state: &SharedState) {
|
||||
let cost_mode = crate::cost::CostMode::parse(&state.config.cost_mode).unwrap_or_default();
|
||||
let max_bytes = cost_mode.session_max_bytes();
|
||||
let max_turns = cost_mode.max_uncompacted_turns();
|
||||
let cm_str = state.cost_mode.read().await.clone();
|
||||
let mut cost_mode = crate::cost::CostMode::parse(&cm_str).unwrap_or_default();
|
||||
let mut compacted = 0usize;
|
||||
let mut escalated = false;
|
||||
|
||||
for entry in state.sessions.iter() {
|
||||
let s = entry.value();
|
||||
let max_bytes = cost_mode.session_max_bytes();
|
||||
let max_turns = cost_mode.max_uncompacted_turns();
|
||||
let (bc, tc) = { (s.byte_count().await, s.turn_count().await) };
|
||||
if (bc > max_bytes || tc > max_turns) && s.compact_oldest_turns().await.is_ok() {
|
||||
if (bc > max_bytes || tc > max_turns) && s.compact_oldest_turns(max_turns).await.is_ok() {
|
||||
compacted += 1;
|
||||
}
|
||||
if s.byte_count().await > max_bytes * 3 {
|
||||
let _ = s.prune_to((max_turns / 2).max(1)).await;
|
||||
}
|
||||
|
||||
// Auto-escalate if still over budget after compaction
|
||||
let post_bc = s.byte_count().await;
|
||||
if post_bc > cost_mode.session_max_bytes() {
|
||||
let trigger = crate::cost::EscalationTrigger::CompactionInsufficient {
|
||||
freed_bytes: bc.saturating_sub(post_bc),
|
||||
needed_bytes: post_bc.saturating_sub(cost_mode.session_max_bytes()),
|
||||
};
|
||||
if let Some(next) = crate::cost::auto_escalate(cost_mode, &trigger) {
|
||||
cost_mode = next;
|
||||
escalated = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if compacted > 0 {
|
||||
|
||||
if escalated {
|
||||
let mut cm = state.cost_mode.write().await;
|
||||
*cm = cost_mode.as_str().to_string();
|
||||
info!(
|
||||
escalated_to = cost_mode.as_str(),
|
||||
compacted, "session rotation with cost mode escalation"
|
||||
);
|
||||
} else if compacted > 0 {
|
||||
info!(compacted = compacted, "session rotation");
|
||||
}
|
||||
}
|
||||
|
|
@ -273,11 +300,112 @@ async fn memory_handoff(state: &SharedState) {
|
|||
|
||||
pub async fn poll_tasks(state: &SharedState) {
|
||||
debug!("task polling tick");
|
||||
let _spawner = Spawner::new(state.config.clone().into());
|
||||
|
||||
let claimed = {
|
||||
let store = state.store.lock().unwrap();
|
||||
store
|
||||
.list_tasks(Some(colibri_store::TaskStatus::Claimed))
|
||||
.unwrap_or_default()
|
||||
};
|
||||
|
||||
if claimed.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
let spawner = Spawner::new(std::sync::Arc::new(state.config.clone()));
|
||||
|
||||
for task in claimed {
|
||||
let task_id = task.id.clone();
|
||||
|
||||
if state
|
||||
.agents
|
||||
.iter()
|
||||
.any(|e| e.value().config.session_id.as_deref() == Some(&task_id))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
let session_id = format!("task-{task_id}");
|
||||
match Session::create(
|
||||
session_id.clone(),
|
||||
std::sync::Arc::new(state.config.clone()),
|
||||
) {
|
||||
Ok(sess) => {
|
||||
state.sessions.insert(session_id.clone(), sess);
|
||||
}
|
||||
Err(_) => continue,
|
||||
}
|
||||
|
||||
let binary = std::env::var("COLIBRI_AGENT_BINARY")
|
||||
.unwrap_or_else(|_| "colibri-smoke-agent".to_string());
|
||||
|
||||
let agent_config = crate::spawner::AgentSpawnConfig {
|
||||
binary: binary.clone(),
|
||||
args: vec![
|
||||
"--session-id".to_string(),
|
||||
task_id.clone(),
|
||||
"--step-ms".to_string(),
|
||||
"10".to_string(),
|
||||
"--hold-secs".to_string(),
|
||||
"1".to_string(),
|
||||
],
|
||||
provider: crate::spawner::Provider::Local,
|
||||
model: binary.clone(),
|
||||
session_id: Some(session_id.clone()),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
match spawner.spawn(agent_config).await {
|
||||
Ok(handle) => {
|
||||
let agent_id = handle.id.clone();
|
||||
let agent_label = handle.config.binary.clone();
|
||||
let stdout = handle.take_stdout().await;
|
||||
|
||||
let runtime = match std::path::Path::new(&agent_label)
|
||||
.file_name()
|
||||
.and_then(|s| s.to_str())
|
||||
{
|
||||
Some("zot") => colibri_glasspane::AgentRuntime::Zot,
|
||||
_ => colibri_glasspane::AgentRuntime::Pi,
|
||||
};
|
||||
|
||||
state.glasspane.write().await.attach_pane_with_runtime(
|
||||
agent_id.clone(),
|
||||
&agent_label,
|
||||
std::time::SystemTime::now(),
|
||||
runtime,
|
||||
);
|
||||
state.agents.insert(agent_id.clone(), handle);
|
||||
|
||||
{
|
||||
let store = state.store.lock().unwrap();
|
||||
let _ = store.transition_task(&task_id, colibri_store::TaskStatus::Started);
|
||||
}
|
||||
info!(task_id = %task_id, agent_id = %agent_id, "spawned agent for claimed task");
|
||||
|
||||
if let Some(stdout) = stdout {
|
||||
let reader_state = state.clone();
|
||||
let reader_agent_id = agent_id.clone();
|
||||
tokio::spawn(async move {
|
||||
crate::socket::stream_agent_stdout_to_glasspane(
|
||||
reader_state,
|
||||
reader_agent_id,
|
||||
stdout,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(task_id = %task_id, error = %e, "failed to spawn agent for claimed task");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn scheduler_tick_fn(state: &SharedState) {
|
||||
state.scheduler.lock().await.tick(state).await;
|
||||
poll_tasks(state).await;
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||
host = %config.host,
|
||||
data_dir = %config.data_dir.display(),
|
||||
socket_path = %config.socket_path.display(),
|
||||
session_max_bytes = config.session_max_bytes,
|
||||
cost_mode = %config.cost_mode,
|
||||
max_context_tokens = config.max_context_tokens,
|
||||
"configuration loaded"
|
||||
);
|
||||
|
|
|
|||
|
|
@ -348,7 +348,6 @@ mod tests {
|
|||
socket_path: data_dir.join("colibri.sock"),
|
||||
data_dir: data_dir.clone(),
|
||||
db_path: data_dir.join("colibri.sqlite"),
|
||||
session_max_bytes: 2_000_000,
|
||||
deepseek_api_key: None,
|
||||
deepseek_endpoint: "https://api.deepseek.com/chat/completions".to_string(),
|
||||
deepseek_model: "deepseek-chat".to_string(),
|
||||
|
|
@ -358,7 +357,6 @@ mod tests {
|
|||
anthropic_endpoint: "https://api.anthropic.com/v1/messages".to_string(),
|
||||
host: "scheduler-test-host".to_string(),
|
||||
max_context_tokens: 128_000,
|
||||
max_uncompacted_turns: 20,
|
||||
cost_mode: "smart".to_string(),
|
||||
scheduler_prompt_injection: true,
|
||||
cache_warming_enabled: false,
|
||||
|
|
|
|||
|
|
@ -8,10 +8,10 @@
|
|||
//! 2. Appendable conversation log (turns accumulate until compaction)
|
||||
//! 3. Volatile scratch (discarded per-turn, never persisted)
|
||||
//!
|
||||
//! When total bytes exceed `session_max_bytes` or the number of uncompacted
|
||||
//! turns exceeds `max_uncompacted_turns`, the oldest turns are compacted
|
||||
//! (summarised) via LLM call. Compacted summaries are stored as special
|
||||
//! "compaction" entries in the JSONL.
|
||||
//! When total bytes exceed the cost mode's `session_max_bytes` or the number
|
||||
//! of uncompacted turns exceeds `max_uncompacted_turns`, the oldest turns are
|
||||
//! compacted (summarised) via LLM call. Compacted summaries are stored as
|
||||
//! special "compaction" entries in the JSONL.
|
||||
|
||||
use std::collections::VecDeque;
|
||||
use std::io::{BufRead, BufReader, Write};
|
||||
|
|
@ -387,18 +387,21 @@ impl Session {
|
|||
// ------------------------------------------------------------------
|
||||
|
||||
/// Check thresholds and compact/prune if needed.
|
||||
///
|
||||
/// Thresholds are derived from the cost mode, not static config fields.
|
||||
async fn maybe_compact_or_rollover(&self) -> Result<(), SessionError> {
|
||||
let cost_mode = crate::cost::CostMode::parse(&self.config.cost_mode).unwrap_or_default();
|
||||
let max_bytes = cost_mode.session_max_bytes();
|
||||
let max_turns = cost_mode.max_uncompacted_turns();
|
||||
|
||||
let (byte_count, turn_count) = {
|
||||
let bc = self.byte_count.read().await;
|
||||
let turns = self.turns.read().await;
|
||||
(*bc, turns.len())
|
||||
};
|
||||
|
||||
let needs_compaction = byte_count > self.config.session_max_bytes
|
||||
|| turn_count > self.config.max_uncompacted_turns;
|
||||
|
||||
if needs_compaction {
|
||||
self.compact_oldest_turns().await?;
|
||||
if byte_count > max_bytes || turn_count > max_turns {
|
||||
self.compact_oldest_turns(max_turns).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
|
@ -406,14 +409,12 @@ impl Session {
|
|||
|
||||
/// Compact the oldest turns by merging them into a summary entry.
|
||||
///
|
||||
/// The caller (the LLM summarizer) produces a compact summary string.
|
||||
/// This method handles the bookkeeping: removing old turn entries and
|
||||
/// appending a compaction marker.
|
||||
pub async fn compact_oldest_turns(&self) -> Result<(), SessionError> {
|
||||
/// `keep` is the number of most-recent turns to retain. All older turns
|
||||
/// are replaced by a compaction marker. The caller derives `keep` from
|
||||
/// the current cost mode (`CostMode::max_uncompacted_turns()`).
|
||||
pub async fn compact_oldest_turns(&self, keep: usize) -> Result<(), SessionError> {
|
||||
let mut turns = self.turns.write().await;
|
||||
|
||||
// Keep the most recent `max_uncompacted_turns` turns, compact the rest.
|
||||
let keep = self.config.max_uncompacted_turns;
|
||||
if turns.len() <= keep {
|
||||
return Ok(());
|
||||
}
|
||||
|
|
@ -488,9 +489,9 @@ impl Session {
|
|||
// ------------------------------------------------------------------
|
||||
|
||||
/// Build a PromptAssembly from the current session state.
|
||||
/// Wraps the existing `build_prompt_messages()` — no behavior change.
|
||||
/// Applies cost-mode-aware trimming to stay within budget.
|
||||
pub async fn build_prompt_assembly(&self) -> PromptAssembly {
|
||||
let messages = self.build_prompt_messages(None).await;
|
||||
let mut messages = self.build_prompt_messages(None).await;
|
||||
let total_bytes: u64 = messages
|
||||
.iter()
|
||||
.map(|m| serde_json::to_string(m).unwrap_or_default().len() as u64)
|
||||
|
|
@ -506,13 +507,18 @@ impl Session {
|
|||
|
||||
let appendable_log = messages[1..].to_vec();
|
||||
|
||||
PromptAssembly {
|
||||
let mut assembly = PromptAssembly {
|
||||
immutable_prefix,
|
||||
appendable_log,
|
||||
volatile_scratch: Vec::new(),
|
||||
total_bytes,
|
||||
estimated_tokens,
|
||||
}
|
||||
};
|
||||
|
||||
let cost_mode = crate::cost::CostMode::parse(&self.config.cost_mode).unwrap_or_default();
|
||||
assembly.trim_to_budget(cost_mode);
|
||||
let _ = &mut messages; // suppress unused mut warning
|
||||
assembly
|
||||
}
|
||||
|
||||
/// Build the 3-region prompt:
|
||||
|
|
@ -523,6 +529,7 @@ impl Session {
|
|||
&self,
|
||||
mut headroom_sidecar: Option<&mut HeadroomSidecar>,
|
||||
) -> Vec<serde_json::Value> {
|
||||
let cost_mode = crate::cost::CostMode::parse(&self.config.cost_mode).unwrap_or_default();
|
||||
let mut messages: Vec<serde_json::Value> = Vec::new();
|
||||
|
||||
// Region 1: Immutable system prefix
|
||||
|
|
@ -560,11 +567,22 @@ impl Session {
|
|||
}
|
||||
SessionEntry::ToolResult { name, result } => {
|
||||
let raw = result.to_string();
|
||||
let content = match headroom_sidecar {
|
||||
Some(ref mut sidecar) => {
|
||||
sidecar.compress(&raw, name).await.unwrap_or(raw)
|
||||
let content = if cost_mode.compact_tool_results() {
|
||||
let from_sidecar = match headroom_sidecar {
|
||||
Some(ref mut sidecar) => sidecar.compress(&raw, name).await,
|
||||
None => None,
|
||||
};
|
||||
match from_sidecar {
|
||||
Some(compressed) => compressed,
|
||||
None => crate::cost::compact_tool_result(
|
||||
&raw,
|
||||
cost_mode.tool_result_max_bytes(),
|
||||
name,
|
||||
)
|
||||
.unwrap_or(raw),
|
||||
}
|
||||
None => raw,
|
||||
} else {
|
||||
raw
|
||||
};
|
||||
messages.push(serde_json::json!({
|
||||
"role": "tool",
|
||||
|
|
|
|||
|
|
@ -260,7 +260,8 @@ async fn cmd_status(state: &SharedState) -> ColibriResponse {
|
|||
Err(_) => serde_json::json!({"error": "store unavailable"}),
|
||||
};
|
||||
|
||||
let cost_mode = crate::cost::CostMode::parse(&state.config.cost_mode).unwrap_or_default();
|
||||
let cost_mode_str = state.cost_mode.read().await.clone();
|
||||
let cost_mode = crate::cost::CostMode::parse(&cost_mode_str).unwrap_or_default();
|
||||
let last_warm_at = state.last_warm_at.read().await.map(|t| t.to_rfc3339());
|
||||
let last_warm_hit = *state.last_warm_cache_hit.read().await;
|
||||
let last_warm_tokens = *state.last_warm_hit_tokens.read().await;
|
||||
|
|
@ -378,8 +379,10 @@ async fn cmd_spawn_agent(
|
|||
if let Some(ref sid) = session_id {
|
||||
if let Some(session) = state.sessions.get(sid) {
|
||||
let assembly = session.value().build_prompt_assembly().await;
|
||||
let cost_mode =
|
||||
crate::cost::CostMode::parse(&state.config.cost_mode).unwrap_or_default();
|
||||
let cost_mode = {
|
||||
let cm_str = state.cost_mode.read().await;
|
||||
crate::cost::CostMode::parse(&cm_str).unwrap_or_default()
|
||||
};
|
||||
let _ = assembly; // trimmed context available for injection
|
||||
let context_json =
|
||||
serde_json::to_string(&assembly.to_messages()).unwrap_or_default();
|
||||
|
|
@ -489,7 +492,7 @@ async fn cmd_get_session(state: &SharedState, session_id: String) -> ColibriResp
|
|||
}
|
||||
}
|
||||
|
||||
async fn stream_agent_stdout_to_glasspane(
|
||||
pub async fn stream_agent_stdout_to_glasspane(
|
||||
state: SharedState,
|
||||
agent_id: String,
|
||||
stdout: ChildStdout,
|
||||
|
|
@ -531,14 +534,19 @@ async fn stream_agent_stdout_to_glasspane(
|
|||
|
||||
async fn cmd_compact_session(state: &SharedState, session_id: String) -> ColibriResponse {
|
||||
match state.sessions.get(&session_id) {
|
||||
Some(session) => match session.value().compact_oldest_turns().await {
|
||||
Ok(()) => ColibriResponse::ok(serde_json::json!({
|
||||
"session_id": session_id,
|
||||
"turn_count": session.value().turn_count().await,
|
||||
"status": "compacted",
|
||||
})),
|
||||
Err(e) => ColibriResponse::err(format!("compaction failed: {e}")),
|
||||
},
|
||||
Some(session) => {
|
||||
let cm_str = state.cost_mode.read().await.clone();
|
||||
let cost_mode = crate::cost::CostMode::parse(&cm_str).unwrap_or_default();
|
||||
let keep = cost_mode.max_uncompacted_turns();
|
||||
match session.value().compact_oldest_turns(keep).await {
|
||||
Ok(()) => ColibriResponse::ok(serde_json::json!({
|
||||
"session_id": session_id,
|
||||
"turn_count": session.value().turn_count().await,
|
||||
"status": "compacted",
|
||||
})),
|
||||
Err(e) => ColibriResponse::err(format!("compaction failed: {e}")),
|
||||
}
|
||||
}
|
||||
None => ColibriResponse::err(format!("session not found: {session_id}")),
|
||||
}
|
||||
}
|
||||
|
|
@ -657,20 +665,18 @@ async fn cmd_intake_task(
|
|||
async fn cmd_set_cost_mode(state: &SharedState, mode: String) -> ColibriResponse {
|
||||
match crate::cost::CostMode::parse(&mode) {
|
||||
Some(cost_mode) => {
|
||||
let prev = state.cost_mode.read().await.clone();
|
||||
*state.cost_mode.write().await = mode.clone();
|
||||
info!(
|
||||
from = state.config.cost_mode,
|
||||
to = cost_mode.as_str(),
|
||||
from = prev,
|
||||
to = %mode,
|
||||
"cost mode changed via socket"
|
||||
);
|
||||
// Note: DaemonConfig is not mutable via &SharedState.
|
||||
// In a future iteration, this would update the config in-place.
|
||||
// For now, we acknowledge and log the request.
|
||||
// T1.4 scope: runtime-only. Persistence (COLIBRI_COST_MODE in
|
||||
// config/socket-update) is post-Phase-5.
|
||||
ColibriResponse::ok(serde_json::json!({
|
||||
"previous": state.config.cost_mode,
|
||||
"requested": cost_mode.as_str(),
|
||||
"status": "acknowledged",
|
||||
"previous": prev,
|
||||
"current": cost_mode.as_str(),
|
||||
"session_max_bytes": cost_mode.session_max_bytes(),
|
||||
"max_uncompacted_turns": cost_mode.max_uncompacted_turns(),
|
||||
}))
|
||||
}
|
||||
None => ColibriResponse::err(format!(
|
||||
|
|
@ -697,7 +703,6 @@ mod tests {
|
|||
socket_path: data_dir.join("colibri.sock"),
|
||||
data_dir: data_dir.clone(),
|
||||
db_path: data_dir.join("colibri.sqlite"),
|
||||
session_max_bytes: 2_000_000,
|
||||
deepseek_api_key: None,
|
||||
deepseek_endpoint: "https://api.deepseek.com/chat/completions".to_string(),
|
||||
deepseek_model: "deepseek-chat".to_string(),
|
||||
|
|
@ -707,7 +712,6 @@ mod tests {
|
|||
anthropic_endpoint: "https://api.anthropic.com/v1/messages".to_string(),
|
||||
host: "test-host".to_string(),
|
||||
max_context_tokens: 128_000,
|
||||
max_uncompacted_turns: 20,
|
||||
cost_mode: "smart".to_string(),
|
||||
scheduler_prompt_injection: true,
|
||||
cache_warming_enabled: false,
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue