From 0509ed76bc1a723f1edad58deec989415e9f393e Mon Sep 17 00:00:00 2001 From: Sam & Claude Date: Thu, 25 Jun 2026 20:50:57 +0200 Subject: [PATCH] feat(glasspane): terminal capture, signature triage, and edge-triggered alerts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add the screen-scraping half of Glasspane to complement its event-state model. Ports the *brain* of the clawdie-ai tmux-screenshot skill (not the PNG) into the Rust core and wires it into the daemon. colibri-glasspane: - terminal.rs: strip_ansi, content-hash frame ids (sha256[:12]), CapturedFrame, and TerminalRecorder — a deduped ring buffer that drops frames identical to the previous one (so polling a static pane collapses to a log of real transitions) with edge-triggered alerting (a failure fires once on its rising edge, re-fires only after it clears). Thin capture_tmux_pane seam keeps I/O out of the testable core. - signatures.rs: data-driven Severity/Signature/Detection/SignatureSet matcher with a high-value linux_default() set (systemd/oom/disk/docker/ forwarding). Per-OS set is the hook for capability-routing. colibri-daemon: - DaemonState.terminal map of per-pane recorders; poll tick in run_loop gated on COLIBRI_TERMINAL_CAPTURE, seeded from COLIBRI_TERMINAL_WATCH. - capture_and_record() shares the blocking tmux capture (on spawn_blocking) + brief lock fold between the loop and the socket; env-gated Telegram alert routing that no-ops cleanly when unconfigured. - socket cmds: terminal-watch/unwatch/list/history/poll. - env_bool helper: forgiving truthy parsing (1/true/yes/on) so COLIBRI_TERMINAL_CAPTURE=1 is not silently false like bool::from_str. Tests: 17 new glasspane unit tests + daemon socket/config tests; whole workspace green, clippy clean. Verified live on Linux (domedog): autonomous loop deduped ~5 ticks into 2 frames and fired one edge-triggered alert. Co-Authored-By: Claude Opus 4.8 --- Cargo.lock | 2 + .../colibri-client/tests/live_socket_check.rs | 5 + crates/colibri-daemon/src/config.rs | 61 +++ crates/colibri-daemon/src/daemon.rs | 139 ++++++- crates/colibri-daemon/src/lib.rs | 25 ++ crates/colibri-daemon/src/main.rs | 7 +- crates/colibri-daemon/src/scheduler.rs | 5 + crates/colibri-daemon/src/socket.rs | 165 ++++++++ crates/colibri-glasspane/Cargo.toml | 3 + crates/colibri-glasspane/src/lib.rs | 10 +- crates/colibri-glasspane/src/signatures.rs | 361 +++++++++++++++++ crates/colibri-glasspane/src/terminal.rs | 362 ++++++++++++++++++ 12 files changed, 1141 insertions(+), 4 deletions(-) create mode 100644 crates/colibri-glasspane/src/signatures.rs create mode 100644 crates/colibri-glasspane/src/terminal.rs diff --git a/Cargo.lock b/Cargo.lock index c8072ab..53a5972 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -361,8 +361,10 @@ version = "0.12.0" dependencies = [ "chrono", "portable-pty", + "regex", "serde", "serde_json", + "sha2", ] [[package]] diff --git a/crates/colibri-client/tests/live_socket_check.rs b/crates/colibri-client/tests/live_socket_check.rs index a1aec38..c22041a 100644 --- a/crates/colibri-client/tests/live_socket_check.rs +++ b/crates/colibri-client/tests/live_socket_check.rs @@ -31,6 +31,11 @@ fn check_config() -> DaemonConfig { cache_warming_interval_hours: 0, headroom_enabled: false, headroom_socket_path: std::path::PathBuf::from("/var/run/colibri/headroom.sock"), + terminal_capture_enabled: false, + terminal_capture_interval_secs: 5, + terminal_watch_targets: Vec::new(), + telegram_bot_token: None, + telegram_chat_id: None, } } diff --git a/crates/colibri-daemon/src/config.rs b/crates/colibri-daemon/src/config.rs index c281d52..5e51b36 100644 --- a/crates/colibri-daemon/src/config.rs +++ b/crates/colibri-daemon/src/config.rs @@ -53,6 +53,17 @@ pub struct DaemonConfig { pub headroom_enabled: bool, /// Path to the headroom sidecar Unix socket. pub headroom_socket_path: PathBuf, + /// Enable the terminal-capture poll loop (tmux pane history + signature + /// alerts). Disabled by default — only ticks when panes are watched. + pub terminal_capture_enabled: bool, + /// How often (seconds) the poll loop captures each watched pane. + pub terminal_capture_interval_secs: u64, + /// tmux targets to watch from startup (CSV: `session:window`, `%pane`, …). + pub terminal_watch_targets: Vec, + /// Telegram bot token for routing terminal alerts (optional). + pub telegram_bot_token: Option, + /// Telegram chat id that terminal alerts are sent to (optional). + pub telegram_chat_id: Option, } impl DaemonConfig { @@ -103,6 +114,13 @@ impl DaemonConfig { headroom_socket_path: std::env::var("COLIBRI_HEADROOM_SOCKET") .map(PathBuf::from) .unwrap_or_else(|_| PathBuf::from("/var/run/colibri/headroom.sock")), + terminal_capture_enabled: env_bool("COLIBRI_TERMINAL_CAPTURE"), + terminal_capture_interval_secs: env_parse("COLIBRI_TERMINAL_CAPTURE_INTERVAL_SECS") + .filter(|&n| n > 0) + .unwrap_or(5), + terminal_watch_targets: env_csv("COLIBRI_TERMINAL_WATCH"), + telegram_bot_token: nonempty_env("TELEGRAM_BOT_TOKEN"), + telegram_chat_id: nonempty_env("TELEGRAM_CHAT_ID"), } } } @@ -119,6 +137,29 @@ fn env_parse(name: &str) -> Option { std::env::var(name).ok().and_then(|v| v.parse().ok()) } +/// Parse a boolean env var, accepting the common truthy spellings +/// (`1`/`true`/`yes`/`on`, case-insensitive). Anything else — including unset — +/// is false. More forgiving than `bool::from_str`, which only takes +/// `true`/`false` and would silently treat `=1` as false. +fn env_bool(name: &str) -> bool { + std::env::var(name) + .map(|v| matches!(v.trim().to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on")) + .unwrap_or(false) +} + +/// Parse a comma-separated env var into trimmed, non-empty entries. +fn env_csv(name: &str) -> Vec { + std::env::var(name) + .ok() + .map(|v| { + v.split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect() + }) + .unwrap_or_default() +} + fn dirs_data() -> Option { std::env::var("XDG_DATA_HOME") .ok() @@ -190,6 +231,26 @@ mod tests { assert!(config.anthropic_api_key.is_none()); } + #[test] + fn env_bool_accepts_common_truthy_spellings() { + let _guard = ENV_LOCK.lock().unwrap(); + for (val, expected) in [ + ("1", true), + ("true", true), + ("TRUE", true), + ("yes", true), + ("on", true), + ("0", false), + ("false", false), + ("nope", false), + ] { + std::env::set_var("COLIBRI_TEST_BOOL", val); + assert_eq!(env_bool("COLIBRI_TEST_BOOL"), expected, "value {val:?}"); + } + std::env::remove_var("COLIBRI_TEST_BOOL"); + assert!(!env_bool("COLIBRI_TEST_BOOL"), "unset is false"); + } + #[test] fn test_config_from_env_vars() { let _guard = ENV_LOCK.lock().unwrap(); diff --git a/crates/colibri-daemon/src/daemon.rs b/crates/colibri-daemon/src/daemon.rs index b5affba..2b51622 100644 --- a/crates/colibri-daemon/src/daemon.rs +++ b/crates/colibri-daemon/src/daemon.rs @@ -1,10 +1,11 @@ //! Main daemon loop — heartbeat, task polling, session rotation, memory handoff, cache warming. +use std::collections::BTreeMap; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, SystemTime}; use chrono::Utc; -use colibri_glasspane::PaneSupervisor; +use colibri_glasspane::{Observation, PaneSupervisor, SignatureMatch, TerminalRecorder}; use colibri_store::Store; use dashmap::DashMap; use tokio::sync::{broadcast, Mutex, RwLock}; @@ -20,6 +21,10 @@ pub struct DaemonState { pub sessions: DashMap, pub agents: DashMap, pub glasspane: RwLock, + /// Per-pane terminal-text recorders (tmux target → deduped history + + /// signature triage). Populated from config at startup and via the + /// `terminal-watch` socket command. + pub terminal: RwLock>, pub store: std::sync::Mutex, pub scheduler: Mutex, pub last_warm_at: RwLock>>, @@ -48,6 +53,7 @@ impl DaemonState { sessions: DashMap::new(), agents: DashMap::new(), glasspane: RwLock::new(PaneSupervisor::new()), + terminal: RwLock::new(BTreeMap::new()), store: std::sync::Mutex::new(store), scheduler: Mutex::new(crate::scheduler::Scheduler::new()), last_warm_at: RwLock::new(None), @@ -70,6 +76,7 @@ pub struct DaemonLoopConfig { pub memory_handoff_interval: Duration, pub agent_stall_timeout: Duration, pub scheduler_interval: Duration, + pub terminal_capture_interval: Duration, } impl Default for DaemonLoopConfig { @@ -80,6 +87,7 @@ impl Default for DaemonLoopConfig { memory_handoff_interval: Duration::from_secs(120), agent_stall_timeout: Duration::from_secs(300), scheduler_interval: Duration::from_secs(30), + terminal_capture_interval: Duration::from_secs(5), } } } @@ -96,11 +104,16 @@ pub async fn run_loop( let mut rotation_tick = tokio::time::interval(loop_config.session_rotation_interval); let mut handoff_tick = tokio::time::interval(loop_config.memory_handoff_interval); let mut scheduler_tick = tokio::time::interval(loop_config.scheduler_interval); + let mut terminal_tick = tokio::time::interval(loop_config.terminal_capture_interval); heartbeat_tick.tick().await; rotation_tick.tick().await; handoff_tick.tick().await; scheduler_tick.tick().await; + terminal_tick.tick().await; + + // Seed terminal-capture watch list from config. + seed_terminal_watches(&state).await; info!("daemon background loop started"); @@ -130,6 +143,9 @@ pub async fn run_loop( _ = rotation_tick.tick() => session_rotation(&state).await, _ = handoff_tick.tick() => memory_handoff(&state).await, _ = scheduler_tick.tick() => scheduler_tick_fn(&state).await, + _ = terminal_tick.tick(), if state.config.terminal_capture_enabled => { + terminal_capture_tick(&state).await; + } _ = shutdown_rx.recv() => { info!("daemon loop received shutdown signal"); break; @@ -479,6 +495,125 @@ async fn scheduler_tick_fn(state: &SharedState) { poll_tasks(state).await; } +// --------------------------------------------------------------------------- +// Terminal capture — tmux pane history + signature alerts +// --------------------------------------------------------------------------- + +/// Register every `terminal_watch_targets` entry as a Linux-signature recorder +/// (idempotent — existing recorders are left untouched). +pub(crate) async fn seed_terminal_watches(state: &SharedState) { + if state.config.terminal_watch_targets.is_empty() { + return; + } + let mut map = state.terminal.write().await; + for target in &state.config.terminal_watch_targets { + map.entry(target.clone()) + .or_insert_with(|| TerminalRecorder::linux(target.clone())); + } + info!( + watched = map.len(), + enabled = state.config.terminal_capture_enabled, + "terminal capture watch list seeded" + ); +} + +/// Capture and record every watched pane once. Per-pane capture errors (e.g. a +/// pane that has gone away) are logged at debug and skipped — one bad target +/// must not stall the others. +async fn terminal_capture_tick(state: &SharedState) { + let targets: Vec = state.terminal.read().await.keys().cloned().collect(); + for target in targets { + if let Err(e) = capture_and_record(state, &target).await { + debug!(target = %target, error = %e, "terminal capture skipped"); + } + } +} + +/// Capture one watched pane, fold it into its recorder, and route any +/// edge-triggered alerts. Returns the [`Observation`] so callers (the poll loop +/// and the `terminal-poll` socket command) can report what happened. +/// +/// The blocking `tmux capture-pane` runs on a blocking thread so it never holds +/// the async runtime; the recorder lock is taken only briefly to fold the text. +pub(crate) async fn capture_and_record( + state: &SharedState, + target: &str, +) -> Result { + let owned = target.to_string(); + let raw = tokio::task::spawn_blocking(move || colibri_glasspane::capture_tmux_pane(&owned)) + .await + .map_err(|e| format!("capture task failed: {e}"))? + .map_err(|e| e.to_string())?; + + let observation = { + let mut map = state.terminal.write().await; + let recorder = map + .get_mut(target) + .ok_or_else(|| format!("pane {target:?} is not being watched"))?; + recorder.observe(&raw, SystemTime::now()) + }; + + if let Observation::Recorded { uuid, new_alerts } = &observation { + if new_alerts.is_empty() { + debug!(target = %target, uuid = %uuid, "terminal frame recorded"); + } else { + let message = format_terminal_alerts(state, target, new_alerts); + warn!(target = %target, uuid = %uuid, alerts = new_alerts.len(), "terminal alert fired"); + notify_telegram(state, &message).await; + } + } + + Ok(observation) +} + +/// Build a human-readable, Telegram-friendly alert message from edge-triggered +/// signature matches. +fn format_terminal_alerts(state: &SharedState, target: &str, alerts: &[SignatureMatch]) -> String { + let mut lines = vec![format!( + "⚠ colibri@{}: {} new alert(s) on tmux {}", + state.config.host, + alerts.len(), + target + )]; + for a in alerts { + let severity = format!("{:?}", a.severity).to_uppercase(); + let invoke = a + .invoke + .as_deref() + .map(|s| format!(" [fix: run `{s}`]")) + .unwrap_or_default(); + lines.push(format!("• {severity} {} — {}{invoke}", a.id, a.next_action)); + } + lines.join("\n") +} + +/// Send an alert to Telegram if a bot token + chat id are configured. +/// No-op (debug log only) when unconfigured, so the feature degrades cleanly. +async fn notify_telegram(state: &SharedState, text: &str) { + let (Some(token), Some(chat_id)) = ( + state.config.telegram_bot_token.as_deref(), + state.config.telegram_chat_id.as_deref(), + ) else { + debug!("telegram alert not sent: bot token / chat id not configured"); + return; + }; + + let client = reqwest::Client::new(); + let url = format!("https://api.telegram.org/bot{token}/sendMessage"); + let send = client + .post(&url) + .json(&serde_json::json!({ "chat_id": chat_id, "text": text })) + .timeout(Duration::from_secs(10)) + .send() + .await; + + match send { + Ok(resp) if resp.status().is_success() => debug!("telegram alert sent"), + Ok(resp) => warn!(status = %resp.status(), "telegram alert rejected"), + Err(e) => warn!(error = %e, "telegram alert send failed"), + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/colibri-daemon/src/lib.rs b/crates/colibri-daemon/src/lib.rs index 7d51486..e8bce0e 100644 --- a/crates/colibri-daemon/src/lib.rs +++ b/crates/colibri-daemon/src/lib.rs @@ -105,6 +105,31 @@ pub enum ColibriCommand { }, #[serde(rename = "list-tenants")] ListTenants, + // ── Terminal capture ────────────────────────────────────── + /// Start recording a tmux pane's terminal text (deduped history + + /// signature triage). `target` is any tmux target spec. + #[serde(rename = "terminal-watch")] + TerminalWatch { target: String }, + /// Stop recording a pane and drop its history. + #[serde(rename = "terminal-unwatch")] + TerminalUnwatch { target: String }, + /// List watched panes with frame counts and active alerts. + #[serde(rename = "terminal-list")] + TerminalList, + /// Return recorded frames for a pane (most recent `limit`, default 20). + #[serde(rename = "terminal-history")] + TerminalHistory { + target: String, + #[serde(default)] + limit: Option, + }, + /// Capture watched panes immediately (one `target`, or all when omitted) + /// instead of waiting for the poll tick. + #[serde(rename = "terminal-poll")] + TerminalPoll { + #[serde(default)] + target: Option, + }, } /// Outbound control-plane response. diff --git a/crates/colibri-daemon/src/main.rs b/crates/colibri-daemon/src/main.rs index 194d0f1..3eb8da2 100644 --- a/crates/colibri-daemon/src/main.rs +++ b/crates/colibri-daemon/src/main.rs @@ -82,7 +82,12 @@ async fn main() -> Result<(), Box> { // Start the daemon background loop (heartbeat, session rotation, scheduler) let loop_state = state.clone(); let loop_shutdown = state.shutdown_rx.resubscribe(); - let loop_config = daemon::DaemonLoopConfig::default(); + let loop_config = daemon::DaemonLoopConfig { + terminal_capture_interval: std::time::Duration::from_secs( + config.terminal_capture_interval_secs, + ), + ..daemon::DaemonLoopConfig::default() + }; let loop_handle = tokio::spawn(async move { daemon::run_loop(loop_state, loop_config, loop_shutdown).await; }); diff --git a/crates/colibri-daemon/src/scheduler.rs b/crates/colibri-daemon/src/scheduler.rs index f98cbd8..7b39d9a 100644 --- a/crates/colibri-daemon/src/scheduler.rs +++ b/crates/colibri-daemon/src/scheduler.rs @@ -363,6 +363,11 @@ mod tests { cache_warming_interval_hours: 0, headroom_enabled: false, headroom_socket_path: std::path::PathBuf::from("/var/run/colibri/headroom.sock"), + terminal_capture_enabled: false, + terminal_capture_interval_secs: 5, + terminal_watch_targets: Vec::new(), + telegram_bot_token: None, + telegram_chat_id: None, } } diff --git a/crates/colibri-daemon/src/socket.rs b/crates/colibri-daemon/src/socket.rs index c6526da..9d6b9b9 100644 --- a/crates/colibri-daemon/src/socket.rs +++ b/crates/colibri-daemon/src/socket.rs @@ -285,6 +285,13 @@ async fn dispatch(cmd: ColibriCommand, state: &SharedState) -> ColibriResponse { collection_id, } => cmd_register_tenant(state, tenant_id, jail_root_path, collection_id).await, ColibriCommand::ListTenants => cmd_list_tenants(state).await, + ColibriCommand::TerminalWatch { target } => cmd_terminal_watch(state, target).await, + ColibriCommand::TerminalUnwatch { target } => cmd_terminal_unwatch(state, target).await, + ColibriCommand::TerminalList => cmd_terminal_list(state).await, + ColibriCommand::TerminalHistory { target, limit } => { + cmd_terminal_history(state, target, limit).await + } + ColibriCommand::TerminalPoll { target } => cmd_terminal_poll(state, target).await, } } @@ -383,6 +390,115 @@ async fn cmd_glasspane_snapshot(state: &SharedState) -> ColibriResponse { } } +// --------------------------------------------------------------------------- +// Terminal capture commands +// --------------------------------------------------------------------------- + +async fn cmd_terminal_watch(state: &SharedState, target: String) -> ColibriResponse { + if target.trim().is_empty() { + return ColibriResponse::err("target must not be empty"); + } + let mut map = state.terminal.write().await; + let already = map.contains_key(&target); + map.entry(target.clone()) + .or_insert_with(|| colibri_glasspane::TerminalRecorder::linux(target.clone())); + info!(target = %target, already, "terminal-watch"); + ColibriResponse::ok(serde_json::json!({ + "target": target, + "watching": true, + "already_watched": already, + "capture_enabled": state.config.terminal_capture_enabled, + "watched_count": map.len(), + })) +} + +async fn cmd_terminal_unwatch(state: &SharedState, target: String) -> ColibriResponse { + let removed = state.terminal.write().await.remove(&target).is_some(); + ColibriResponse::ok(serde_json::json!({ + "target": target, + "removed": removed, + })) +} + +async fn cmd_terminal_list(state: &SharedState) -> ColibriResponse { + let map = state.terminal.read().await; + let panes: Vec<_> = map + .values() + .map(|rec| { + let latest = rec.latest(); + serde_json::json!({ + "target": rec.pane_id(), + "frames": rec.len(), + "active_alerts": rec.active_alerts().collect::>(), + "latest_uuid": latest.map(|f| f.uuid.clone()), + "latest_at": latest.map(|f| f.observed_at.clone()), + "latest_failures": latest.map(|f| f.detection.failures.len()).unwrap_or(0), + }) + }) + .collect(); + ColibriResponse::ok(serde_json::json!({ + "capture_enabled": state.config.terminal_capture_enabled, + "interval_secs": state.config.terminal_capture_interval_secs, + "panes": panes, + })) +} + +async fn cmd_terminal_history( + state: &SharedState, + target: String, + limit: Option, +) -> ColibriResponse { + let map = state.terminal.read().await; + let Some(rec) = map.get(&target) else { + return ColibriResponse::err(format!("pane {target:?} is not being watched")); + }; + let frames: Vec<_> = rec.frames().collect(); + let n = limit.unwrap_or(20).min(frames.len()); + let tail = &frames[frames.len() - n..]; + match serde_json::to_value(tail) { + Ok(value) => ColibriResponse::ok(serde_json::json!({ + "target": target, + "total_frames": rec.len(), + "returned": n, + "frames": value, + })), + Err(e) => ColibriResponse::err(format!("history serialization failed: {e}")), + } +} + +async fn cmd_terminal_poll(state: &SharedState, target: Option) -> ColibriResponse { + let targets: Vec = match target { + Some(t) => vec![t], + None => state.terminal.read().await.keys().cloned().collect(), + }; + if targets.is_empty() { + return ColibriResponse::err("no panes are being watched"); + } + + let mut results = Vec::new(); + for target in targets { + let entry = match crate::daemon::capture_and_record(state, &target).await { + Ok(colibri_glasspane::Observation::Unchanged) => serde_json::json!({ + "target": target, + "status": "unchanged", + }), + Ok(colibri_glasspane::Observation::Recorded { uuid, new_alerts }) => serde_json::json!({ + "target": target, + "status": "recorded", + "uuid": uuid, + "new_alerts": new_alerts, + }), + Err(e) => serde_json::json!({ + "target": target, + "status": "error", + "error": e, + }), + }; + results.push(entry); + } + ColibriResponse::ok(serde_json::json!({ "results": results })) +} + async fn cmd_list_sessions(state: &SharedState) -> ColibriResponse { let mut sessions = Vec::new(); for entry in state.sessions.iter() { @@ -1031,6 +1147,11 @@ mod tests { cache_warming_interval_hours: 0, headroom_enabled: false, headroom_socket_path: std::path::PathBuf::from("/var/run/colibri/headroom.sock"), + terminal_capture_enabled: false, + terminal_capture_interval_secs: 5, + terminal_watch_targets: Vec::new(), + telegram_bot_token: None, + telegram_chat_id: None, } } @@ -1142,6 +1263,50 @@ mod tests { assert_eq!(pane.session_id.as_deref(), Some("pi-sample")); } + #[tokio::test] + async fn terminal_watch_list_unwatch_roundtrip() { + let state: SharedState = Arc::new(DaemonState::new(test_config())); + + let watched = cmd_terminal_watch(&state, "smoketest:0".to_string()).await; + assert!(watched.ok); + assert_eq!(watched.data.unwrap()["already_watched"], false); + + // Idempotent: watching again reports already_watched. + let again = cmd_terminal_watch(&state, "smoketest:0".to_string()).await; + assert_eq!(again.data.unwrap()["already_watched"], true); + + let list = cmd_terminal_list(&state).await; + let data = list.data.unwrap(); + assert_eq!(data["panes"].as_array().unwrap().len(), 1); + assert_eq!(data["panes"][0]["target"], "smoketest:0"); + assert_eq!(data["panes"][0]["frames"], 0); + + let removed = cmd_terminal_unwatch(&state, "smoketest:0".to_string()).await; + assert_eq!(removed.data.unwrap()["removed"], true); + let empty = cmd_terminal_list(&state).await; + assert_eq!(empty.data.unwrap()["panes"].as_array().unwrap().len(), 0); + } + + #[tokio::test] + async fn terminal_watch_rejects_empty_target() { + let state: SharedState = Arc::new(DaemonState::new(test_config())); + assert!(!cmd_terminal_watch(&state, " ".to_string()).await.ok); + } + + #[tokio::test] + async fn terminal_history_errors_for_unwatched_pane() { + let state: SharedState = Arc::new(DaemonState::new(test_config())); + let resp = cmd_terminal_history(&state, "ghost".to_string(), None).await; + assert!(!resp.ok); + assert!(resp.error.unwrap().contains("not being watched")); + } + + #[tokio::test] + async fn terminal_poll_errors_when_nothing_watched() { + let state: SharedState = Arc::new(DaemonState::new(test_config())); + assert!(!cmd_terminal_poll(&state, None).await.ok); + } + #[tokio::test] async fn clear_stale_socket_returns_true_when_absent() { let dir = std::env::temp_dir().join(format!( diff --git a/crates/colibri-glasspane/Cargo.toml b/crates/colibri-glasspane/Cargo.toml index 43d1dda..26b26fc 100644 --- a/crates/colibri-glasspane/Cargo.toml +++ b/crates/colibri-glasspane/Cargo.toml @@ -10,3 +10,6 @@ chrono = { version = "0.4", default-features = false, features = ["clock"] } portable-pty = "0.9" serde = { version = "1", features = ["derive"] } serde_json = "1" +# Terminal-capture layer: content-hash frame ids + ANSI stripping. +sha2 = "0.10" +regex = "1" diff --git a/crates/colibri-glasspane/src/lib.rs b/crates/colibri-glasspane/src/lib.rs index ccf2ecc..0777b6e 100644 --- a/crates/colibri-glasspane/src/lib.rs +++ b/crates/colibri-glasspane/src/lib.rs @@ -12,6 +12,14 @@ //! is scaffolded with `portable-pty`; tests use sample JSONL readers until the //! FreeBSD validation lane exercises real terminals. +pub mod signatures; +pub mod terminal; + +pub use signatures::{Detection, Severity, Signature, SignatureMatch, SignatureSet}; +pub use terminal::{ + capture_tmux_pane, frame_uuid, strip_ansi, CapturedFrame, Observation, TerminalRecorder, +}; + use std::{ collections::BTreeMap, io::{self, BufRead, BufReader, Read, Write}, @@ -619,7 +627,7 @@ fn skip_false(value: &bool) -> bool { !*value } -fn system_time_to_rfc3339(time: SystemTime) -> String { +pub(crate) fn system_time_to_rfc3339(time: SystemTime) -> String { let dt: DateTime = time.into(); dt.to_rfc3339_opts(SecondsFormat::Millis, true) } diff --git a/crates/colibri-glasspane/src/signatures.rs b/crates/colibri-glasspane/src/signatures.rs new file mode 100644 index 0000000..8dd9184 --- /dev/null +++ b/crates/colibri-glasspane/src/signatures.rs @@ -0,0 +1,361 @@ +//! Signature-based triage of captured terminal text. +//! +//! A [`SignatureSet`] scans stripped terminal text for known patterns and +//! classifies the screen into `failures` / `warnings` / `info` / `healthy`. +//! Each match carries a human `next_action` and an optional `invoke` (a skill +//! the agent can run to remediate) — so a hit is not just "something happened" +//! but "here is what it means and what to do". +//! +//! The detection engine is data-driven: the FreeBSD host and a Linux host load +//! different [`Signature`] sets but share this matcher. This is the per-OS knob +//! that the rest of Colibri's capability-routing can lean on. [`SignatureSet::linux_default`] +//! ships a small, high-value starter set; callers can build their own with +//! [`SignatureSet::new`]. + +use serde::{Deserialize, Serialize}; + +/// How serious a matched signature is. Mirrors the Python skill's taxonomy +/// (`error` / `warn` / `info` for failure patterns, `ok` for success patterns). +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum Severity { + /// Known-broken state needing action. + Error, + /// Suspicious but not fatal; worth surfacing. + Warn, + /// Benign/expected note. + Info, + /// Known-healthy state — confirms things work. + Ok, +} + +/// One known terminal-text pattern and what it means. +/// +/// `patterns` are matched as **case-insensitive substrings**; the first one +/// that hits records the signature (further patterns are not reported twice). +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct Signature { + pub id: String, + pub severity: Severity, + /// Which skill/domain owns this knowledge (provenance for the operator). + pub source: String, + pub patterns: Vec, + /// Plain-language remediation hint. + pub next_action: String, + /// Optional skill to auto-invoke to fix the condition. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub invoke: Option, +} + +impl Signature { + /// Convenience builder for a failure/info signature without an `invoke`. + pub fn new( + id: impl Into, + severity: Severity, + source: impl Into, + patterns: impl IntoIterator>, + next_action: impl Into, + ) -> Self { + Self { + id: id.into(), + severity, + source: source.into(), + patterns: patterns.into_iter().map(Into::into).collect(), + next_action: next_action.into(), + invoke: None, + } + } + + /// Attach a remediation skill to auto-invoke. + pub fn with_invoke(mut self, invoke: impl Into) -> Self { + self.invoke = Some(invoke.into()); + self + } +} + +/// A single signature hit against captured text. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct SignatureMatch { + pub id: String, + pub severity: Severity, + pub source: String, + /// The exact pattern string that matched. + pub matched_text: String, + pub next_action: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub invoke: Option, +} + +/// The classified result of scanning one screen of text. +/// +/// Buckets are filled in declaration order of the signatures; `failures` are +/// the ones the agent should act on before acknowledging `healthy` state. +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] +pub struct Detection { + pub failures: Vec, + pub warnings: Vec, + pub info: Vec, + pub healthy: Vec, +} + +impl Detection { + /// True when nothing matched at all. + pub fn is_empty(&self) -> bool { + self.failures.is_empty() + && self.warnings.is_empty() + && self.info.is_empty() + && self.healthy.is_empty() + } + + /// True when at least one `error`-severity signature fired. + pub fn has_failures(&self) -> bool { + !self.failures.is_empty() + } + + /// All `error`/`warn` matches — the ones that should drive an alert. + /// (Edge-triggered de-duplication is the recorder's job, not this method's.) + pub fn alertable(&self) -> impl Iterator { + self.failures.iter().chain(self.warnings.iter()) + } + + /// Flat list of every match across all buckets (failures → warnings → + /// info → healthy), for compact JSON metadata. + pub fn all(&self) -> Vec { + let mut out = Vec::new(); + out.extend(self.failures.iter().cloned()); + out.extend(self.warnings.iter().cloned()); + out.extend(self.info.iter().cloned()); + out.extend(self.healthy.iter().cloned()); + out + } +} + +/// An ordered collection of signatures and the matcher over them. +#[derive(Debug, Clone, Default)] +pub struct SignatureSet { + signatures: Vec, +} + +impl SignatureSet { + /// Build a set from an explicit list of signatures. + pub fn new(signatures: impl IntoIterator) -> Self { + Self { + signatures: signatures.into_iter().collect(), + } + } + + /// An empty set — useful for capture-only history with no triage. + pub fn empty() -> Self { + Self::default() + } + + pub fn len(&self) -> usize { + self.signatures.len() + } + + pub fn is_empty(&self) -> bool { + self.signatures.is_empty() + } + + pub fn signatures(&self) -> &[Signature] { + &self.signatures + } + + /// Scan `text` and classify every matching signature. + /// + /// Matching is case-insensitive substring containment; for each signature + /// only the first matching pattern is recorded (mirrors the Python skill). + pub fn detect(&self, text: &str) -> Detection { + let lowered = text.to_lowercase(); + let mut detection = Detection::default(); + + for sig in &self.signatures { + let Some(pattern) = sig + .patterns + .iter() + .find(|p| lowered.contains(&p.to_lowercase())) + else { + continue; + }; + + let hit = SignatureMatch { + id: sig.id.clone(), + severity: sig.severity, + source: sig.source.clone(), + matched_text: pattern.clone(), + next_action: sig.next_action.clone(), + invoke: sig.invoke.clone(), + }; + + match sig.severity { + Severity::Error => detection.failures.push(hit), + Severity::Warn => detection.warnings.push(hit), + Severity::Info => detection.info.push(hit), + Severity::Ok => detection.healthy.push(hit), + } + } + + detection + } + + /// A small, high-value starter set for Linux hosts (systemd / kernel / + /// container / network). Deliberately conservative — extend per deployment + /// rather than matching broad words like "error" that produce false hits. + pub fn linux_default() -> Self { + Self::new([ + // ── failures ────────────────────────────────────────────── + Signature::new( + "systemd_unit_failed", + Severity::Error, + "systemd", + ["active: failed", "code=exited, status=1/failure", "failed to start"], + "A systemd unit failed. Inspect with `systemctl status ` and `journalctl -u -e`.", + ) + .with_invoke("systemd-admin"), + Signature::new( + "oom_killer", + Severity::Error, + "kernel", + ["out of memory: killed process", "invoked oom-killer"], + "The kernel OOM-killer reaped a process. Reduce memory pressure or raise limits/cgroup memory.", + ), + Signature::new( + "disk_full", + Severity::Error, + "filesystem", + ["no space left on device"], + "A filesystem is full. Free space or grow the volume; check `df -h` and large logs.", + ), + Signature::new( + "docker_container_exited_error", + Severity::Error, + "docker", + ["exited (1)", "exited (137)", "exited (139)"], + "A container exited non-zero. Check `docker logs `; 137 = OOM/kill, 139 = segfault.", + ), + Signature::new( + "ip_forwarding_disabled", + Severity::Error, + "network", + ["net.ipv4.ip_forward = 0"], + "IPv4 forwarding is off. Enable with `sysctl -w net.ipv4.ip_forward=1` and persist in sysctl.d.", + ), + Signature::new( + "firewall_drop_policy", + Severity::Warn, + "network", + ["chain input (policy drop", "policy drop 0 packets"], + "Default INPUT policy is DROP. Confirm intended; add explicit pass rules for required services.", + ), + // ── warnings ────────────────────────────────────────────── + Signature::new( + "connection_refused", + Severity::Warn, + "network", + ["connection refused"], + "A connection was refused — the target service may be down or not listening on that port.", + ), + Signature::new( + "permission_denied", + Severity::Warn, + "filesystem", + ["permission denied"], + "Permission denied. Check file ownership/mode or whether the command needs elevated rights.", + ), + // ── healthy ─────────────────────────────────────────────── + Signature::new( + "systemd_active_running", + Severity::Ok, + "systemd", + ["active: active (running)"], + "Service is active and running.", + ), + Signature::new( + "ip_forwarding_enabled", + Severity::Ok, + "network", + ["net.ipv4.ip_forward = 1"], + "IPv4 forwarding is enabled.", + ), + Signature::new( + "zpool_online", + Severity::Ok, + "zfs", + ["state: online"], + "ZFS pool is healthy (ONLINE).", + ), + ]) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn detect_classifies_into_buckets() { + let set = SignatureSet::linux_default(); + let text = "● nginx.service\n Active: failed (Result: exit-code)\nnet.ipv4.ip_forward = 1"; + let d = set.detect(text); + assert_eq!(d.failures.len(), 1); + assert_eq!(d.failures[0].id, "systemd_unit_failed"); + assert_eq!(d.failures[0].invoke.as_deref(), Some("systemd-admin")); + assert_eq!(d.healthy.len(), 1); + assert_eq!(d.healthy[0].id, "ip_forwarding_enabled"); + assert!(d.has_failures()); + assert!(!d.is_empty()); + } + + #[test] + fn detect_is_case_insensitive() { + let set = SignatureSet::linux_default(); + let d = set.detect("NO SPACE LEFT ON DEVICE while writing /var/log"); + assert_eq!(d.failures.len(), 1); + assert_eq!(d.failures[0].id, "disk_full"); + } + + #[test] + fn only_first_matching_pattern_recorded_per_signature() { + let set = SignatureSet::linux_default(); + // Two docker patterns present, but the signature should record once. + let d = set.detect("web exited (1)\ndb exited (137)"); + assert_eq!(d.failures.len(), 1); + assert_eq!(d.failures[0].id, "docker_container_exited_error"); + assert_eq!(d.failures[0].matched_text, "exited (1)"); + } + + #[test] + fn clean_screen_yields_empty_detection() { + let set = SignatureSet::linux_default(); + let d = set.detect("$ ls\nCargo.toml src target\n$ "); + assert!(d.is_empty()); + assert!(!d.has_failures()); + } + + #[test] + fn alertable_covers_failures_and_warnings_only() { + let set = SignatureSet::linux_default(); + let d = set.detect("Active: failed\nconnection refused\nActive: active (running)"); + let ids: Vec<&str> = d.alertable().map(|m| m.id.as_str()).collect(); + assert!(ids.contains(&"systemd_unit_failed")); + assert!(ids.contains(&"connection_refused")); + assert!(!ids.contains(&"systemd_active_running")); // healthy is not alertable + } + + #[test] + fn empty_set_matches_nothing() { + let set = SignatureSet::empty(); + assert!(set.is_empty()); + assert!(set.detect("Active: failed\nout of memory: killed process").is_empty()); + } + + #[test] + fn detection_round_trips_json() { + let set = SignatureSet::linux_default(); + let d = set.detect("Active: failed"); + let json = serde_json::to_string(&d).unwrap(); + let back: Detection = serde_json::from_str(&json).unwrap(); + assert_eq!(d, back); + assert!(json.contains("\"error\"")); // severity serialized lowercase + } +} diff --git a/crates/colibri-glasspane/src/terminal.rs b/crates/colibri-glasspane/src/terminal.rs new file mode 100644 index 0000000..39be5a5 --- /dev/null +++ b/crates/colibri-glasspane/src/terminal.rs @@ -0,0 +1,362 @@ +//! Terminal-text capture, content-hash framing, and deduplicated history. +//! +//! This is the complementary half of Glasspane's event model: where the rest +//! of the crate derives agent state from structured JSONL events, this module +//! records the **actual terminal text** of a pane and triages it with +//! [`SignatureSet`](crate::signatures::SignatureSet). +//! +//! The key bet is the same one the Python `tmux-screenshot` skill made: a +//! frame's identity is the **SHA-256 of its stripped text**. That makes +//! "record pane history" cheap — a [`TerminalRecorder`] drops any frame whose +//! hash equals the previous one, so polling a near-static pane every second +//! collapses into a compact log of *actual* state transitions rather than +//! thousands of duplicate frames. +//! +//! Alerts are **edge-triggered**: a failure/warning signature is reported only +//! on the frame where it first appears, not on every subsequent frame that +//! still shows it. When the condition clears and later recurs, it fires again. +//! +//! I/O is kept at the edges. [`capture_tmux_pane`] shells out to `tmux`, but +//! [`TerminalRecorder::observe`] takes raw text directly so the dedup/triage +//! logic is fully testable without a terminal (mirroring how PTY launch is +//! separated from the event state model). + +use std::{ + collections::{BTreeSet, VecDeque}, + io, + process::Command, + sync::OnceLock, + time::SystemTime, +}; + +use regex::Regex; +use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; + +use crate::signatures::{Detection, SignatureMatch, SignatureSet}; + +/// Default number of frames a recorder keeps before evicting the oldest. +pub const DEFAULT_HISTORY_CAPACITY: usize = 256; + +/// One CSI escape sequence (SGR colors, cursor moves, etc.). +fn csi_re() -> &'static Regex { + static RE: OnceLock = OnceLock::new(); + RE.get_or_init(|| Regex::new(r"\x1b\[[0-9;?]*[A-Za-z]").unwrap()) +} + +/// Control characters to drop, preserving tab (`\x09`) and newline (`\x0a`). +/// Includes a stray ESC (`\x1b`) left over from non-CSI sequences. +fn ctrl_re() -> &'static Regex { + static RE: OnceLock = OnceLock::new(); + RE.get_or_init(|| Regex::new(r"[\x00-\x08\x0b-\x1f\x7f]").unwrap()) +} + +/// Strip ANSI escape sequences and control characters from terminal output, +/// leaving plain text suitable for hashing, diffing, and signature matching. +/// Tabs and newlines are preserved. +pub fn strip_ansi(text: &str) -> String { + let no_csi = csi_re().replace_all(text, ""); + ctrl_re().replace_all(&no_csi, "").into_owned() +} + +/// Content-addressed frame id: the first 12 hex chars of `SHA-256(stripped)`. +/// Identical screens produce identical ids — the basis for history dedup and +/// for cheap verification (the id *is* the checksum of the text). +pub fn frame_uuid(stripped: &str) -> String { + let digest = Sha256::digest(stripped.as_bytes()); + let mut hex = String::with_capacity(12); + for byte in digest.iter().take(6) { + hex.push_str(&format!("{byte:02x}")); + } + hex +} + +/// One recorded screen of terminal text plus its triage result. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct CapturedFrame { + /// `frame_uuid` of `text` — also the dedup key. + pub uuid: String, + /// ANSI-stripped terminal text. + pub text: String, + /// RFC3339 capture time (matches the Glasspane snapshot convention). + pub observed_at: String, + /// Signatures detected in this frame. + pub detection: Detection, +} + +/// Outcome of feeding one capture to a [`TerminalRecorder`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Observation { + /// The screen was identical to the last recorded frame — dropped, no + /// history entry, no alerts. + Unchanged, + /// The screen changed and was recorded. + Recorded { + /// Content hash of the new frame. + uuid: String, + /// Failure/warning signatures that *just appeared* (edge-triggered). + /// Empty when nothing newly fired, even if the frame still shows a + /// previously-reported condition. + new_alerts: Vec, + }, +} + +impl Observation { + /// True when this observation produced a new history frame. + pub fn is_recorded(&self) -> bool { + matches!(self, Observation::Recorded { .. }) + } + + /// Newly-fired alerts, if any (empty for `Unchanged`). + pub fn new_alerts(&self) -> &[SignatureMatch] { + match self { + Observation::Recorded { new_alerts, .. } => new_alerts, + Observation::Unchanged => &[], + } + } +} + +/// A deduplicated, signature-triaged history of one pane's terminal text. +/// +/// Feed it raw captures with [`observe`](Self::observe); it keeps only frames +/// whose content actually changed and reports failure/warning signatures on +/// their rising edge. Bounded to `capacity` frames (ring buffer). +#[derive(Debug, Clone)] +pub struct TerminalRecorder { + pane_id: String, + capacity: usize, + signatures: SignatureSet, + last_hash: Option, + /// Failure/warning signature ids firing as of the last recorded frame. + active: BTreeSet, + frames: VecDeque, +} + +impl TerminalRecorder { + /// Create a recorder for `pane_id` with an explicit signature set and + /// history capacity. + pub fn new(pane_id: impl Into, signatures: SignatureSet, capacity: usize) -> Self { + Self { + pane_id: pane_id.into(), + capacity: capacity.max(1), + signatures, + last_hash: None, + active: BTreeSet::new(), + frames: VecDeque::new(), + } + } + + /// Recorder with the Linux default signature set and default capacity. + pub fn linux(pane_id: impl Into) -> Self { + Self::new(pane_id, SignatureSet::linux_default(), DEFAULT_HISTORY_CAPACITY) + } + + pub fn pane_id(&self) -> &str { + &self.pane_id + } + + pub fn len(&self) -> usize { + self.frames.len() + } + + pub fn is_empty(&self) -> bool { + self.frames.is_empty() + } + + /// All retained frames, oldest first. + pub fn frames(&self) -> impl Iterator { + self.frames.iter() + } + + /// The most recently recorded frame, if any. + pub fn latest(&self) -> Option<&CapturedFrame> { + self.frames.back() + } + + /// Failure/warning signature ids currently considered "firing". + pub fn active_alerts(&self) -> impl Iterator { + self.active.iter().map(String::as_str) + } + + /// Record one raw terminal capture observed at `observed_at`. + /// + /// Returns [`Observation::Unchanged`] if the stripped text hashes to the + /// same id as the previous frame (dropped), otherwise + /// [`Observation::Recorded`] with any edge-triggered alerts. + pub fn observe(&mut self, raw: &str, observed_at: SystemTime) -> Observation { + let text = strip_ansi(raw); + let uuid = frame_uuid(&text); + + if self.last_hash.as_deref() == Some(uuid.as_str()) { + return Observation::Unchanged; + } + + let detection = self.signatures.detect(&text); + + // Edge-trigger: alertable signatures not firing as of the last frame. + let current: BTreeSet = + detection.alertable().map(|m| m.id.clone()).collect(); + let new_alerts: Vec = detection + .alertable() + .filter(|m| !self.active.contains(&m.id)) + .cloned() + .collect(); + self.active = current; + + let frame = CapturedFrame { + uuid: uuid.clone(), + text, + observed_at: crate::system_time_to_rfc3339(observed_at), + detection, + }; + self.frames.push_back(frame); + if self.frames.len() > self.capacity { + self.frames.pop_front(); + } + self.last_hash = Some(uuid.clone()); + + Observation::Recorded { uuid, new_alerts } + } + + /// Capture the named tmux target and record it. Convenience wrapper over + /// [`capture_tmux_pane`] + [`observe`](Self::observe) for the daemon's + /// poll loop. Errors propagate the tmux capture failure. + pub fn observe_tmux(&mut self, target: &str, observed_at: SystemTime) -> io::Result { + let raw = capture_tmux_pane(target)?; + Ok(self.observe(&raw, observed_at)) + } +} + +/// Capture a tmux pane's visible content (with escape sequences) as a string. +/// +/// `target` is any tmux target spec: `session`, `session:window`, +/// `session:window.pane`, or an absolute pane id like `%3`. Runs +/// `tmux capture-pane -e -p -t `. Requires `tmux` on `PATH`. +pub fn capture_tmux_pane(target: &str) -> io::Result { + let output = Command::new("tmux") + .args(["capture-pane", "-e", "-p", "-t", target]) + .output()?; + if !output.status.success() { + return Err(io::Error::other(format!( + "tmux capture-pane failed for {target:?}: {}", + String::from_utf8_lossy(&output.stderr).trim() + ))); + } + Ok(String::from_utf8_lossy(&output.stdout).into_owned()) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + fn t(secs: u64) -> SystemTime { + SystemTime::UNIX_EPOCH + Duration::from_secs(secs) + } + + #[test] + fn strip_ansi_removes_color_and_control_keeps_text() { + // green "ok", reset, then a cursor-home CSI and a stray carriage return. + let raw = "\x1b[32mok\x1b[0m\x1b[Hdone\r\n"; + assert_eq!(strip_ansi(raw), "okdone\n"); + } + + #[test] + fn strip_ansi_preserves_tabs_and_newlines() { + assert_eq!(strip_ansi("a\tb\nc"), "a\tb\nc"); + } + + #[test] + fn frame_uuid_is_stable_and_12_hex() { + let a = frame_uuid("hello world"); + let b = frame_uuid("hello world"); + assert_eq!(a, b); + assert_eq!(a.len(), 12); + assert!(a.chars().all(|c| c.is_ascii_hexdigit())); + assert_ne!(frame_uuid("hello world"), frame_uuid("hello worle")); + } + + #[test] + fn uuid_ignores_color_only_differences() { + // Same text, different coloring → same frame id (dedup works on content). + let plain = "\x1b[31mALERT\x1b[0m"; + let other = "\x1b[33mALERT\x1b[0m"; + assert_eq!(frame_uuid(&strip_ansi(plain)), frame_uuid(&strip_ansi(other))); + } + + #[test] + fn identical_screen_is_dropped() { + let mut rec = TerminalRecorder::linux("pane-1"); + let first = rec.observe("$ idle prompt\n", t(1)); + assert!(first.is_recorded()); + let second = rec.observe("$ idle prompt\n", t(2)); + assert_eq!(second, Observation::Unchanged); + assert_eq!(rec.len(), 1, "duplicate frame must not be stored"); + } + + #[test] + fn changed_screen_is_recorded() { + let mut rec = TerminalRecorder::linux("pane-1"); + rec.observe("frame one\n", t(1)); + rec.observe("frame two\n", t(2)); + assert_eq!(rec.len(), 2); + assert_eq!(rec.latest().unwrap().text, "frame two\n"); + assert_eq!(rec.latest().unwrap().observed_at, "1970-01-01T00:00:02.000Z"); + } + + #[test] + fn failure_alert_is_edge_triggered() { + let mut rec = TerminalRecorder::linux("pane-1"); + + // Rising edge: failure appears → alert fires once. + let o1 = rec.observe("boot log\nActive: failed\n", t(1)); + assert_eq!(o1.new_alerts().len(), 1); + assert_eq!(o1.new_alerts()[0].id, "systemd_unit_failed"); + + // Different screen, failure still present → no repeat alert. + let o2 = rec.observe("boot log\nActive: failed\nstill broken\n", t(2)); + assert!(o2.is_recorded()); + assert_eq!(o2.new_alerts().len(), 0, "persisting failure must not re-alert"); + + // Condition clears. + let o3 = rec.observe("Active: active (running)\n", t(3)); + assert_eq!(o3.new_alerts().len(), 0); + assert!(rec.active_alerts().next().is_none()); + + // Recurs → fires again (falling then rising edge). + let o4 = rec.observe("Active: failed\nagain\n", t(4)); + assert_eq!(o4.new_alerts().len(), 1); + assert_eq!(o4.new_alerts()[0].id, "systemd_unit_failed"); + } + + #[test] + fn ring_buffer_evicts_oldest() { + let mut rec = TerminalRecorder::new("pane-1", SignatureSet::empty(), 3); + for i in 0..5 { + rec.observe(&format!("frame {i}\n"), t(i)); + } + assert_eq!(rec.len(), 3); + let texts: Vec = rec.frames().map(|f| f.text.clone()).collect(); + assert_eq!(texts, vec!["frame 2\n", "frame 3\n", "frame 4\n"]); + } + + #[test] + fn capacity_floor_is_one() { + let mut rec = TerminalRecorder::new("p", SignatureSet::empty(), 0); + rec.observe("a\n", t(1)); + rec.observe("b\n", t(2)); + assert_eq!(rec.len(), 1); + } + + #[test] + fn frame_carries_detection_for_storage() { + let mut rec = TerminalRecorder::linux("pane-1"); + rec.observe("no space left on device\n", t(1)); + let frame = rec.latest().unwrap(); + assert!(frame.detection.has_failures()); + // Frame is serializable for history persistence. + let json = serde_json::to_string(frame).unwrap(); + let back: CapturedFrame = serde_json::from_str(&json).unwrap(); + assert_eq!(*frame, back); + } +}