diff --git a/Cargo.lock b/Cargo.lock index d499ea7..75c186f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -303,6 +303,7 @@ version = "0.0.1" dependencies = [ "colibri-daemon", "colibri-glasspane", + "colibri-store", "serde", "serde_json", "thiserror 2.0.18", diff --git a/crates/colibri-client/Cargo.toml b/crates/colibri-client/Cargo.toml index ce86f99..3649e1f 100644 --- a/crates/colibri-client/Cargo.toml +++ b/crates/colibri-client/Cargo.toml @@ -24,3 +24,4 @@ tokio = { version = "1", features = ["io-util", "macros", "net", "rt-multi-threa [dev-dependencies] tokio = { version = "1", features = ["fs", "io-util", "macros", "net", "rt-multi-thread", "time"] } uuid = { version = "1", features = ["v4"] } +colibri-store = { path = "../colibri-store" } diff --git a/crates/colibri-client/tests/live_socket_smoke.rs b/crates/colibri-client/tests/live_socket_smoke.rs index 6ff7875..96ea848 100644 --- a/crates/colibri-client/tests/live_socket_smoke.rs +++ b/crates/colibri-client/tests/live_socket_smoke.rs @@ -5,6 +5,7 @@ use std::{ }; use colibri_client::DaemonClient; +use colibri_daemon::daemon::poll_tasks; use colibri_daemon::{socket, DaemonConfig, DaemonState, SharedState}; use colibri_glasspane::AgentState; use uuid::Uuid; @@ -15,7 +16,6 @@ fn smoke_config() -> DaemonConfig { socket_path: data_dir.join("colibri.sock"), data_dir: data_dir.clone(), db_path: data_dir.join("colibri.sqlite"), - session_max_bytes: 2_000_000, deepseek_api_key: None, deepseek_endpoint: "https://api.deepseek.com/chat/completions".to_string(), deepseek_model: "deepseek-chat".to_string(), @@ -25,7 +25,6 @@ fn smoke_config() -> DaemonConfig { anthropic_endpoint: "https://api.anthropic.com/v1/messages".to_string(), host: "live-smoke-host".to_string(), max_context_tokens: 128_000, - max_uncompacted_turns: 20, cost_mode: "smart".to_string(), scheduler_prompt_injection: false, cache_warming_enabled: false, @@ -200,6 +199,110 @@ async fn colibri_cli_task_commands_use_socket_api() { let _ = tokio::fs::remove_dir_all(config.data_dir).await; } +#[tokio::test] +async fn poll_tasks_spawns_agent_for_claimed_task() { + let mut config = smoke_config(); + let fake_agent = env!("CARGO_BIN_EXE_colibri-smoke-agent"); + // Set COLIBRI_AGENT_BINARY so poll_tasks uses the smoke agent binary + config.data_dir = + std::env::temp_dir().join(format!("colibri-poll-tasks-test-{}", Uuid::new_v4())); + tokio::fs::create_dir_all(&config.data_dir).await.unwrap(); + std::env::set_var("COLIBRI_AGENT_BINARY", fake_agent); + + let state: SharedState = Arc::new(DaemonState::new(config.clone())); + let shutdown = state.shutdown_rx.resubscribe(); + let server_state = state.clone(); + let server = tokio::spawn(async move { + socket::serve(server_state, shutdown).await; + }); + + let client = DaemonClient::new(config.socket_path.clone()); + wait_for_socket(&client).await; + + // Register an agent + { + let store = state.store.lock().unwrap(); + store + .register_agent("test-agent", serde_json::json!([])) + .unwrap(); + } + + // Create and claim a task directly in the store + let task = { + let store = state.store.lock().unwrap(); + let task = store.create_task("poll-tasks-smoke", None).unwrap(); + let agent = store.list_agents().unwrap(); + let agent_id = &agent[0].id; + store.claim_task(&task.id, agent_id).unwrap(); + store + .get_task(&task.id) + .unwrap() + .expect("task should exist") + }; + assert_eq!( + task.status, + colibri_store::TaskStatus::Claimed, + "task should be claimed before poll_tasks" + ); + + // Call poll_tasks — should spawn the smoke agent for this claimed task + poll_tasks(&state).await; + + // Verify an agent handle was created + let deadline = Instant::now() + Duration::from_secs(10); + loop { + if !state.agents.is_empty() { + break; + } + assert!(Instant::now() < deadline, "no agent spawned by poll_tasks"); + tokio::time::sleep(Duration::from_millis(50)).await; + } + + // Task should have transitioned to Started + let updated_task = { + let store = state.store.lock().unwrap(); + store + .get_task(&task.id) + .unwrap() + .expect("task should exist") + }; + assert_eq!( + updated_task.status, + colibri_store::TaskStatus::Started, + "task should be Started after poll_tasks spawns" + ); + + // A session should exist for this task + assert!( + state.sessions.contains_key(&format!("task-{}", task.id)), + "session should be created for task" + ); + + // Glasspane should observe the full lifecycle: the smoke agent runs + // through Idle → Working → Blocked → Working → Done quickly (10ms steps). + // Wait for Done — the terminal state — which proves the whole path worked: + // poll_tasks spawned the agent, stdout streamed to glasspane, and the + // state machine processed the JSONL events. + let deadline = Instant::now() + Duration::from_secs(20); + loop { + let snap = client.glasspane_snapshot().await.unwrap(); + if snap.panes.iter().any(|p| p.state == AgentState::Done) { + break; + } + assert!( + Instant::now() < deadline, + "agent did not reach Done — snapshot: {:?}", + snap.panes.iter().map(|p| (p.id.clone(), p.state)).collect::>() + ); + tokio::time::sleep(Duration::from_millis(100)).await; + } + + let _ = state.shutdown_tx.send(()); + server.await.unwrap(); + std::env::remove_var("COLIBRI_AGENT_BINARY"); + let _ = tokio::fs::remove_dir_all(config.data_dir).await; +} + #[tokio::test] async fn harness_double_spawn_session_isolation() { let config = smoke_config(); diff --git a/crates/colibri-daemon/src/config.rs b/crates/colibri-daemon/src/config.rs index 2a2dda3..c281d52 100644 --- a/crates/colibri-daemon/src/config.rs +++ b/crates/colibri-daemon/src/config.rs @@ -17,8 +17,6 @@ pub struct DaemonConfig { pub data_dir: PathBuf, /// Path for the Colibri control-plane socket. pub socket_path: PathBuf, - /// Maximum bytes per session before automatic rollover. - pub session_max_bytes: u64, /// Path to the coordination SQLite database. pub db_path: PathBuf, /// DeepSeek API key for provider routing. @@ -39,8 +37,6 @@ pub struct DaemonConfig { pub host: String, /// Maximum context window tokens (for compaction trigger). pub max_context_tokens: u64, - /// Compaction target: max turns to keep uncompacted. - pub max_uncompacted_turns: usize, /// Current cost mode (fast/smart/max). pub cost_mode: String, /// Inject session prompt context into spawned agent env (T1.4 PR3a). @@ -84,7 +80,6 @@ impl DaemonConfig { data_dir, socket_path, db_path, - session_max_bytes: env_parse("COLIBRI_SESSION_MAX_BYTES").unwrap_or(2_000_000), deepseek_api_key: nonempty_env("DEEPSEEK_API_KEY"), deepseek_endpoint: std::env::var("DEEPSEEK_ENDPOINT") .unwrap_or_else(|_| "https://api.deepseek.com/chat/completions".to_string()), @@ -98,7 +93,6 @@ impl DaemonConfig { .unwrap_or_else(|_| "https://api.anthropic.com/v1/messages".to_string()), host, max_context_tokens: env_parse("COLIBRI_MAX_CONTEXT_TOKENS").unwrap_or(128_000), - max_uncompacted_turns: env_parse("COLIBRI_MAX_UNCOMPACTED_TURNS").unwrap_or(20), cost_mode: std::env::var("COLIBRI_COST_MODE").unwrap_or_else(|_| "smart".to_string()), scheduler_prompt_injection: env_parse("COLIBRI_SCHEDULER_PROMPT_INJECTION") .unwrap_or(false), @@ -166,7 +160,6 @@ mod tests { "ANTHROPIC_API_KEY", "ANTHROPIC_ENDPOINT", "COLIBRI_MAX_CONTEXT_TOKENS", - "COLIBRI_MAX_UNCOMPACTED_TURNS", ]; let saved: Vec<(&str, Option)> = vars @@ -190,10 +183,8 @@ mod tests { assert_eq!(config.host, "unknown"); assert!(config.data_dir.to_string_lossy().contains("colibri-daemon")); - assert_eq!(config.session_max_bytes, 2_000_000); assert_eq!(config.deepseek_model, "deepseek-chat"); assert_eq!(config.max_context_tokens, 128_000); - assert_eq!(config.max_uncompacted_turns, 20); assert!(config.deepseek_api_key.is_none()); assert!(config.openrouter_api_key.is_none()); assert!(config.anthropic_api_key.is_none()); @@ -207,7 +198,6 @@ mod tests { "DEEPSEEK_API_KEY", "DEEPSEEK_MODEL", "COLIBRI_MAX_CONTEXT_TOKENS", - "COLIBRI_SESSION_MAX_BYTES", ]; let saved: Vec<(&str, Option)> = vars .iter() @@ -218,7 +208,6 @@ mod tests { std::env::set_var("DEEPSEEK_API_KEY", "sk-test"); std::env::set_var("DEEPSEEK_MODEL", "deepseek-v4"); std::env::set_var("COLIBRI_MAX_CONTEXT_TOKENS", "64000"); - std::env::set_var("COLIBRI_SESSION_MAX_BYTES", "500000"); let config = DaemonConfig::from_env(); @@ -234,6 +223,5 @@ mod tests { assert_eq!(config.deepseek_api_key.as_deref(), Some("sk-test")); assert_eq!(config.deepseek_model, "deepseek-v4"); assert_eq!(config.max_context_tokens, 64000); - assert_eq!(config.session_max_bytes, 500000); } } diff --git a/crates/colibri-daemon/src/daemon.rs b/crates/colibri-daemon/src/daemon.rs index 10f64eb..9aca608 100644 --- a/crates/colibri-daemon/src/daemon.rs +++ b/crates/colibri-daemon/src/daemon.rs @@ -29,6 +29,9 @@ pub struct DaemonState { pub shutdown_rx: broadcast::Receiver<()>, /// Headroom compression sidecar (optional, connected lazily on startup). pub headroom_sidecar: RwLock>, + /// Runtime cost mode — updated by `set-cost-mode` socket command. + /// All enforcement code reads from this, not `config.cost_mode`. + pub cost_mode: RwLock, } impl DaemonState { @@ -41,7 +44,7 @@ impl DaemonState { ) }); Self { - config, + config: config.clone(), sessions: DashMap::new(), agents: DashMap::new(), glasspane: RwLock::new(PaneSupervisor::new()), @@ -53,6 +56,7 @@ impl DaemonState { shutdown_tx, shutdown_rx, headroom_sidecar: RwLock::new(None), + cost_mode: RwLock::new(config.cost_mode.clone()), } } } @@ -240,22 +244,45 @@ async fn heartbeat(state: &SharedState, _stall_timeout: Duration) { } async fn session_rotation(state: &SharedState) { - let cost_mode = crate::cost::CostMode::parse(&state.config.cost_mode).unwrap_or_default(); - let max_bytes = cost_mode.session_max_bytes(); - let max_turns = cost_mode.max_uncompacted_turns(); + let cm_str = state.cost_mode.read().await.clone(); + let mut cost_mode = crate::cost::CostMode::parse(&cm_str).unwrap_or_default(); let mut compacted = 0usize; + let mut escalated = false; for entry in state.sessions.iter() { let s = entry.value(); + let max_bytes = cost_mode.session_max_bytes(); + let max_turns = cost_mode.max_uncompacted_turns(); let (bc, tc) = { (s.byte_count().await, s.turn_count().await) }; - if (bc > max_bytes || tc > max_turns) && s.compact_oldest_turns().await.is_ok() { + if (bc > max_bytes || tc > max_turns) && s.compact_oldest_turns(max_turns).await.is_ok() { compacted += 1; } if s.byte_count().await > max_bytes * 3 { let _ = s.prune_to((max_turns / 2).max(1)).await; } + + // Auto-escalate if still over budget after compaction + let post_bc = s.byte_count().await; + if post_bc > cost_mode.session_max_bytes() { + let trigger = crate::cost::EscalationTrigger::CompactionInsufficient { + freed_bytes: bc.saturating_sub(post_bc), + needed_bytes: post_bc.saturating_sub(cost_mode.session_max_bytes()), + }; + if let Some(next) = crate::cost::auto_escalate(cost_mode, &trigger) { + cost_mode = next; + escalated = true; + } + } } - if compacted > 0 { + + if escalated { + let mut cm = state.cost_mode.write().await; + *cm = cost_mode.as_str().to_string(); + info!( + escalated_to = cost_mode.as_str(), + compacted, "session rotation with cost mode escalation" + ); + } else if compacted > 0 { info!(compacted = compacted, "session rotation"); } } @@ -273,11 +300,112 @@ async fn memory_handoff(state: &SharedState) { pub async fn poll_tasks(state: &SharedState) { debug!("task polling tick"); - let _spawner = Spawner::new(state.config.clone().into()); + + let claimed = { + let store = state.store.lock().unwrap(); + store + .list_tasks(Some(colibri_store::TaskStatus::Claimed)) + .unwrap_or_default() + }; + + if claimed.is_empty() { + return; + } + + let spawner = Spawner::new(std::sync::Arc::new(state.config.clone())); + + for task in claimed { + let task_id = task.id.clone(); + + if state + .agents + .iter() + .any(|e| e.value().config.session_id.as_deref() == Some(&task_id)) + { + continue; + } + + let session_id = format!("task-{task_id}"); + match Session::create( + session_id.clone(), + std::sync::Arc::new(state.config.clone()), + ) { + Ok(sess) => { + state.sessions.insert(session_id.clone(), sess); + } + Err(_) => continue, + } + + let binary = std::env::var("COLIBRI_AGENT_BINARY") + .unwrap_or_else(|_| "colibri-smoke-agent".to_string()); + + let agent_config = crate::spawner::AgentSpawnConfig { + binary: binary.clone(), + args: vec![ + "--session-id".to_string(), + task_id.clone(), + "--step-ms".to_string(), + "10".to_string(), + "--hold-secs".to_string(), + "1".to_string(), + ], + provider: crate::spawner::Provider::Local, + model: binary.clone(), + session_id: Some(session_id.clone()), + ..Default::default() + }; + + match spawner.spawn(agent_config).await { + Ok(handle) => { + let agent_id = handle.id.clone(); + let agent_label = handle.config.binary.clone(); + let stdout = handle.take_stdout().await; + + let runtime = match std::path::Path::new(&agent_label) + .file_name() + .and_then(|s| s.to_str()) + { + Some("zot") => colibri_glasspane::AgentRuntime::Zot, + _ => colibri_glasspane::AgentRuntime::Pi, + }; + + state.glasspane.write().await.attach_pane_with_runtime( + agent_id.clone(), + &agent_label, + std::time::SystemTime::now(), + runtime, + ); + state.agents.insert(agent_id.clone(), handle); + + { + let store = state.store.lock().unwrap(); + let _ = store.transition_task(&task_id, colibri_store::TaskStatus::Started); + } + info!(task_id = %task_id, agent_id = %agent_id, "spawned agent for claimed task"); + + if let Some(stdout) = stdout { + let reader_state = state.clone(); + let reader_agent_id = agent_id.clone(); + tokio::spawn(async move { + crate::socket::stream_agent_stdout_to_glasspane( + reader_state, + reader_agent_id, + stdout, + ) + .await; + }); + } + } + Err(e) => { + warn!(task_id = %task_id, error = %e, "failed to spawn agent for claimed task"); + } + } + } } async fn scheduler_tick_fn(state: &SharedState) { state.scheduler.lock().await.tick(state).await; + poll_tasks(state).await; } #[cfg(test)] diff --git a/crates/colibri-daemon/src/main.rs b/crates/colibri-daemon/src/main.rs index cf04c66..ee465fa 100644 --- a/crates/colibri-daemon/src/main.rs +++ b/crates/colibri-daemon/src/main.rs @@ -30,7 +30,7 @@ async fn main() -> Result<(), Box> { host = %config.host, data_dir = %config.data_dir.display(), socket_path = %config.socket_path.display(), - session_max_bytes = config.session_max_bytes, + cost_mode = %config.cost_mode, max_context_tokens = config.max_context_tokens, "configuration loaded" ); diff --git a/crates/colibri-daemon/src/scheduler.rs b/crates/colibri-daemon/src/scheduler.rs index 3ad1e85..142c2e8 100644 --- a/crates/colibri-daemon/src/scheduler.rs +++ b/crates/colibri-daemon/src/scheduler.rs @@ -348,7 +348,6 @@ mod tests { socket_path: data_dir.join("colibri.sock"), data_dir: data_dir.clone(), db_path: data_dir.join("colibri.sqlite"), - session_max_bytes: 2_000_000, deepseek_api_key: None, deepseek_endpoint: "https://api.deepseek.com/chat/completions".to_string(), deepseek_model: "deepseek-chat".to_string(), @@ -358,7 +357,6 @@ mod tests { anthropic_endpoint: "https://api.anthropic.com/v1/messages".to_string(), host: "scheduler-test-host".to_string(), max_context_tokens: 128_000, - max_uncompacted_turns: 20, cost_mode: "smart".to_string(), scheduler_prompt_injection: true, cache_warming_enabled: false, diff --git a/crates/colibri-daemon/src/session.rs b/crates/colibri-daemon/src/session.rs index 28fdbfa..b266736 100644 --- a/crates/colibri-daemon/src/session.rs +++ b/crates/colibri-daemon/src/session.rs @@ -8,10 +8,10 @@ //! 2. Appendable conversation log (turns accumulate until compaction) //! 3. Volatile scratch (discarded per-turn, never persisted) //! -//! When total bytes exceed `session_max_bytes` or the number of uncompacted -//! turns exceeds `max_uncompacted_turns`, the oldest turns are compacted -//! (summarised) via LLM call. Compacted summaries are stored as special -//! "compaction" entries in the JSONL. +//! When total bytes exceed the cost mode's `session_max_bytes` or the number +//! of uncompacted turns exceeds `max_uncompacted_turns`, the oldest turns are +//! compacted (summarised) via LLM call. Compacted summaries are stored as +//! special "compaction" entries in the JSONL. use std::collections::VecDeque; use std::io::{BufRead, BufReader, Write}; @@ -387,18 +387,21 @@ impl Session { // ------------------------------------------------------------------ /// Check thresholds and compact/prune if needed. + /// + /// Thresholds are derived from the cost mode, not static config fields. async fn maybe_compact_or_rollover(&self) -> Result<(), SessionError> { + let cost_mode = crate::cost::CostMode::parse(&self.config.cost_mode).unwrap_or_default(); + let max_bytes = cost_mode.session_max_bytes(); + let max_turns = cost_mode.max_uncompacted_turns(); + let (byte_count, turn_count) = { let bc = self.byte_count.read().await; let turns = self.turns.read().await; (*bc, turns.len()) }; - let needs_compaction = byte_count > self.config.session_max_bytes - || turn_count > self.config.max_uncompacted_turns; - - if needs_compaction { - self.compact_oldest_turns().await?; + if byte_count > max_bytes || turn_count > max_turns { + self.compact_oldest_turns(max_turns).await?; } Ok(()) @@ -406,14 +409,12 @@ impl Session { /// Compact the oldest turns by merging them into a summary entry. /// - /// The caller (the LLM summarizer) produces a compact summary string. - /// This method handles the bookkeeping: removing old turn entries and - /// appending a compaction marker. - pub async fn compact_oldest_turns(&self) -> Result<(), SessionError> { + /// `keep` is the number of most-recent turns to retain. All older turns + /// are replaced by a compaction marker. The caller derives `keep` from + /// the current cost mode (`CostMode::max_uncompacted_turns()`). + pub async fn compact_oldest_turns(&self, keep: usize) -> Result<(), SessionError> { let mut turns = self.turns.write().await; - // Keep the most recent `max_uncompacted_turns` turns, compact the rest. - let keep = self.config.max_uncompacted_turns; if turns.len() <= keep { return Ok(()); } @@ -488,9 +489,9 @@ impl Session { // ------------------------------------------------------------------ /// Build a PromptAssembly from the current session state. - /// Wraps the existing `build_prompt_messages()` — no behavior change. + /// Applies cost-mode-aware trimming to stay within budget. pub async fn build_prompt_assembly(&self) -> PromptAssembly { - let messages = self.build_prompt_messages(None).await; + let mut messages = self.build_prompt_messages(None).await; let total_bytes: u64 = messages .iter() .map(|m| serde_json::to_string(m).unwrap_or_default().len() as u64) @@ -506,13 +507,18 @@ impl Session { let appendable_log = messages[1..].to_vec(); - PromptAssembly { + let mut assembly = PromptAssembly { immutable_prefix, appendable_log, volatile_scratch: Vec::new(), total_bytes, estimated_tokens, - } + }; + + let cost_mode = crate::cost::CostMode::parse(&self.config.cost_mode).unwrap_or_default(); + assembly.trim_to_budget(cost_mode); + let _ = &mut messages; // suppress unused mut warning + assembly } /// Build the 3-region prompt: @@ -523,6 +529,7 @@ impl Session { &self, mut headroom_sidecar: Option<&mut HeadroomSidecar>, ) -> Vec { + let cost_mode = crate::cost::CostMode::parse(&self.config.cost_mode).unwrap_or_default(); let mut messages: Vec = Vec::new(); // Region 1: Immutable system prefix @@ -560,11 +567,22 @@ impl Session { } SessionEntry::ToolResult { name, result } => { let raw = result.to_string(); - let content = match headroom_sidecar { - Some(ref mut sidecar) => { - sidecar.compress(&raw, name).await.unwrap_or(raw) + let content = if cost_mode.compact_tool_results() { + let from_sidecar = match headroom_sidecar { + Some(ref mut sidecar) => sidecar.compress(&raw, name).await, + None => None, + }; + match from_sidecar { + Some(compressed) => compressed, + None => crate::cost::compact_tool_result( + &raw, + cost_mode.tool_result_max_bytes(), + name, + ) + .unwrap_or(raw), } - None => raw, + } else { + raw }; messages.push(serde_json::json!({ "role": "tool", diff --git a/crates/colibri-daemon/src/socket.rs b/crates/colibri-daemon/src/socket.rs index 5e341df..150251b 100644 --- a/crates/colibri-daemon/src/socket.rs +++ b/crates/colibri-daemon/src/socket.rs @@ -260,7 +260,8 @@ async fn cmd_status(state: &SharedState) -> ColibriResponse { Err(_) => serde_json::json!({"error": "store unavailable"}), }; - let cost_mode = crate::cost::CostMode::parse(&state.config.cost_mode).unwrap_or_default(); + let cost_mode_str = state.cost_mode.read().await.clone(); + let cost_mode = crate::cost::CostMode::parse(&cost_mode_str).unwrap_or_default(); let last_warm_at = state.last_warm_at.read().await.map(|t| t.to_rfc3339()); let last_warm_hit = *state.last_warm_cache_hit.read().await; let last_warm_tokens = *state.last_warm_hit_tokens.read().await; @@ -378,8 +379,10 @@ async fn cmd_spawn_agent( if let Some(ref sid) = session_id { if let Some(session) = state.sessions.get(sid) { let assembly = session.value().build_prompt_assembly().await; - let cost_mode = - crate::cost::CostMode::parse(&state.config.cost_mode).unwrap_or_default(); + let cost_mode = { + let cm_str = state.cost_mode.read().await; + crate::cost::CostMode::parse(&cm_str).unwrap_or_default() + }; let _ = assembly; // trimmed context available for injection let context_json = serde_json::to_string(&assembly.to_messages()).unwrap_or_default(); @@ -489,7 +492,7 @@ async fn cmd_get_session(state: &SharedState, session_id: String) -> ColibriResp } } -async fn stream_agent_stdout_to_glasspane( +pub async fn stream_agent_stdout_to_glasspane( state: SharedState, agent_id: String, stdout: ChildStdout, @@ -531,14 +534,19 @@ async fn stream_agent_stdout_to_glasspane( async fn cmd_compact_session(state: &SharedState, session_id: String) -> ColibriResponse { match state.sessions.get(&session_id) { - Some(session) => match session.value().compact_oldest_turns().await { - Ok(()) => ColibriResponse::ok(serde_json::json!({ - "session_id": session_id, - "turn_count": session.value().turn_count().await, - "status": "compacted", - })), - Err(e) => ColibriResponse::err(format!("compaction failed: {e}")), - }, + Some(session) => { + let cm_str = state.cost_mode.read().await.clone(); + let cost_mode = crate::cost::CostMode::parse(&cm_str).unwrap_or_default(); + let keep = cost_mode.max_uncompacted_turns(); + match session.value().compact_oldest_turns(keep).await { + Ok(()) => ColibriResponse::ok(serde_json::json!({ + "session_id": session_id, + "turn_count": session.value().turn_count().await, + "status": "compacted", + })), + Err(e) => ColibriResponse::err(format!("compaction failed: {e}")), + } + } None => ColibriResponse::err(format!("session not found: {session_id}")), } } @@ -657,20 +665,18 @@ async fn cmd_intake_task( async fn cmd_set_cost_mode(state: &SharedState, mode: String) -> ColibriResponse { match crate::cost::CostMode::parse(&mode) { Some(cost_mode) => { + let prev = state.cost_mode.read().await.clone(); + *state.cost_mode.write().await = mode.clone(); info!( - from = state.config.cost_mode, - to = cost_mode.as_str(), + from = prev, + to = %mode, "cost mode changed via socket" ); - // Note: DaemonConfig is not mutable via &SharedState. - // In a future iteration, this would update the config in-place. - // For now, we acknowledge and log the request. - // T1.4 scope: runtime-only. Persistence (COLIBRI_COST_MODE in - // config/socket-update) is post-Phase-5. ColibriResponse::ok(serde_json::json!({ - "previous": state.config.cost_mode, - "requested": cost_mode.as_str(), - "status": "acknowledged", + "previous": prev, + "current": cost_mode.as_str(), + "session_max_bytes": cost_mode.session_max_bytes(), + "max_uncompacted_turns": cost_mode.max_uncompacted_turns(), })) } None => ColibriResponse::err(format!( @@ -697,7 +703,6 @@ mod tests { socket_path: data_dir.join("colibri.sock"), data_dir: data_dir.clone(), db_path: data_dir.join("colibri.sqlite"), - session_max_bytes: 2_000_000, deepseek_api_key: None, deepseek_endpoint: "https://api.deepseek.com/chat/completions".to_string(), deepseek_model: "deepseek-chat".to_string(), @@ -707,7 +712,6 @@ mod tests { anthropic_endpoint: "https://api.anthropic.com/v1/messages".to_string(), host: "test-host".to_string(), max_context_tokens: 128_000, - max_uncompacted_turns: 20, cost_mode: "smart".to_string(), scheduler_prompt_injection: true, cache_warming_enabled: false,