feat(glasspane): terminal capture, signature triage, and edge-triggered alerts
Some checks failed
CI / rust (pull_request) Has been cancelled
CI / markdown (pull_request) Has been cancelled
CI / port (pull_request) Has been cancelled
CI / agent-jail-pkgs (pull_request) Has been cancelled

Add the screen-scraping half of Glasspane to complement its event-state
model. Ports the *brain* of the clawdie-ai tmux-screenshot skill (not the
PNG) into the Rust core and wires it into the daemon.

colibri-glasspane:
- terminal.rs: strip_ansi, content-hash frame ids (sha256[:12]),
  CapturedFrame, and TerminalRecorder — a deduped ring buffer that drops
  frames identical to the previous one (so polling a static pane collapses
  to a log of real transitions) with edge-triggered alerting (a failure
  fires once on its rising edge, re-fires only after it clears). Thin
  capture_tmux_pane seam keeps I/O out of the testable core.
- signatures.rs: data-driven Severity/Signature/Detection/SignatureSet
  matcher with a high-value linux_default() set (systemd/oom/disk/docker/
  forwarding). Per-OS set is the hook for capability-routing.

colibri-daemon:
- DaemonState.terminal map of per-pane recorders; poll tick in run_loop
  gated on COLIBRI_TERMINAL_CAPTURE, seeded from COLIBRI_TERMINAL_WATCH.
- capture_and_record() shares the blocking tmux capture (on spawn_blocking)
  + brief lock fold between the loop and the socket; env-gated Telegram
  alert routing that no-ops cleanly when unconfigured.
- socket cmds: terminal-watch/unwatch/list/history/poll.
- env_bool helper: forgiving truthy parsing (1/true/yes/on) so
  COLIBRI_TERMINAL_CAPTURE=1 is not silently false like bool::from_str.

Tests: 17 new glasspane unit tests + daemon socket/config tests; whole
workspace green, clippy clean. Verified live on Linux (domedog): autonomous
loop deduped ~5 ticks into 2 frames and fired one edge-triggered alert.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Sam & Claude 2026-06-25 20:50:57 +02:00
parent e953b1c050
commit 0509ed76bc
12 changed files with 1141 additions and 4 deletions

2
Cargo.lock generated
View file

@ -361,8 +361,10 @@ version = "0.12.0"
dependencies = [
"chrono",
"portable-pty",
"regex",
"serde",
"serde_json",
"sha2",
]
[[package]]

View file

@ -31,6 +31,11 @@ fn check_config() -> DaemonConfig {
cache_warming_interval_hours: 0,
headroom_enabled: false,
headroom_socket_path: std::path::PathBuf::from("/var/run/colibri/headroom.sock"),
terminal_capture_enabled: false,
terminal_capture_interval_secs: 5,
terminal_watch_targets: Vec::new(),
telegram_bot_token: None,
telegram_chat_id: None,
}
}

View file

@ -53,6 +53,17 @@ pub struct DaemonConfig {
pub headroom_enabled: bool,
/// Path to the headroom sidecar Unix socket.
pub headroom_socket_path: PathBuf,
/// Enable the terminal-capture poll loop (tmux pane history + signature
/// alerts). Disabled by default — only ticks when panes are watched.
pub terminal_capture_enabled: bool,
/// How often (seconds) the poll loop captures each watched pane.
pub terminal_capture_interval_secs: u64,
/// tmux targets to watch from startup (CSV: `session:window`, `%pane`, …).
pub terminal_watch_targets: Vec<String>,
/// Telegram bot token for routing terminal alerts (optional).
pub telegram_bot_token: Option<String>,
/// Telegram chat id that terminal alerts are sent to (optional).
pub telegram_chat_id: Option<String>,
}
impl DaemonConfig {
@ -103,6 +114,13 @@ impl DaemonConfig {
headroom_socket_path: std::env::var("COLIBRI_HEADROOM_SOCKET")
.map(PathBuf::from)
.unwrap_or_else(|_| PathBuf::from("/var/run/colibri/headroom.sock")),
terminal_capture_enabled: env_bool("COLIBRI_TERMINAL_CAPTURE"),
terminal_capture_interval_secs: env_parse("COLIBRI_TERMINAL_CAPTURE_INTERVAL_SECS")
.filter(|&n| n > 0)
.unwrap_or(5),
terminal_watch_targets: env_csv("COLIBRI_TERMINAL_WATCH"),
telegram_bot_token: nonempty_env("TELEGRAM_BOT_TOKEN"),
telegram_chat_id: nonempty_env("TELEGRAM_CHAT_ID"),
}
}
}
@ -119,6 +137,29 @@ fn env_parse<T: std::str::FromStr>(name: &str) -> Option<T> {
std::env::var(name).ok().and_then(|v| v.parse().ok())
}
/// Parse a boolean env var, accepting the common truthy spellings
/// (`1`/`true`/`yes`/`on`, case-insensitive). Anything else — including unset —
/// is false. More forgiving than `bool::from_str`, which only takes
/// `true`/`false` and would silently treat `=1` as false.
fn env_bool(name: &str) -> bool {
std::env::var(name)
.map(|v| matches!(v.trim().to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
.unwrap_or(false)
}
/// Parse a comma-separated env var into trimmed, non-empty entries.
fn env_csv(name: &str) -> Vec<String> {
std::env::var(name)
.ok()
.map(|v| {
v.split(',')
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.collect()
})
.unwrap_or_default()
}
fn dirs_data() -> Option<PathBuf> {
std::env::var("XDG_DATA_HOME")
.ok()
@ -190,6 +231,26 @@ mod tests {
assert!(config.anthropic_api_key.is_none());
}
#[test]
fn env_bool_accepts_common_truthy_spellings() {
let _guard = ENV_LOCK.lock().unwrap();
for (val, expected) in [
("1", true),
("true", true),
("TRUE", true),
("yes", true),
("on", true),
("0", false),
("false", false),
("nope", false),
] {
std::env::set_var("COLIBRI_TEST_BOOL", val);
assert_eq!(env_bool("COLIBRI_TEST_BOOL"), expected, "value {val:?}");
}
std::env::remove_var("COLIBRI_TEST_BOOL");
assert!(!env_bool("COLIBRI_TEST_BOOL"), "unset is false");
}
#[test]
fn test_config_from_env_vars() {
let _guard = ENV_LOCK.lock().unwrap();

View file

@ -1,10 +1,11 @@
//! Main daemon loop — heartbeat, task polling, session rotation, memory handoff, cache warming.
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, SystemTime};
use chrono::Utc;
use colibri_glasspane::PaneSupervisor;
use colibri_glasspane::{Observation, PaneSupervisor, SignatureMatch, TerminalRecorder};
use colibri_store::Store;
use dashmap::DashMap;
use tokio::sync::{broadcast, Mutex, RwLock};
@ -20,6 +21,10 @@ pub struct DaemonState {
pub sessions: DashMap<String, Session>,
pub agents: DashMap<String, AgentHandle>,
pub glasspane: RwLock<PaneSupervisor>,
/// Per-pane terminal-text recorders (tmux target → deduped history +
/// signature triage). Populated from config at startup and via the
/// `terminal-watch` socket command.
pub terminal: RwLock<BTreeMap<String, TerminalRecorder>>,
pub store: std::sync::Mutex<Store>,
pub scheduler: Mutex<crate::scheduler::Scheduler>,
pub last_warm_at: RwLock<Option<chrono::DateTime<Utc>>>,
@ -48,6 +53,7 @@ impl DaemonState {
sessions: DashMap::new(),
agents: DashMap::new(),
glasspane: RwLock::new(PaneSupervisor::new()),
terminal: RwLock::new(BTreeMap::new()),
store: std::sync::Mutex::new(store),
scheduler: Mutex::new(crate::scheduler::Scheduler::new()),
last_warm_at: RwLock::new(None),
@ -70,6 +76,7 @@ pub struct DaemonLoopConfig {
pub memory_handoff_interval: Duration,
pub agent_stall_timeout: Duration,
pub scheduler_interval: Duration,
pub terminal_capture_interval: Duration,
}
impl Default for DaemonLoopConfig {
@ -80,6 +87,7 @@ impl Default for DaemonLoopConfig {
memory_handoff_interval: Duration::from_secs(120),
agent_stall_timeout: Duration::from_secs(300),
scheduler_interval: Duration::from_secs(30),
terminal_capture_interval: Duration::from_secs(5),
}
}
}
@ -96,11 +104,16 @@ pub async fn run_loop(
let mut rotation_tick = tokio::time::interval(loop_config.session_rotation_interval);
let mut handoff_tick = tokio::time::interval(loop_config.memory_handoff_interval);
let mut scheduler_tick = tokio::time::interval(loop_config.scheduler_interval);
let mut terminal_tick = tokio::time::interval(loop_config.terminal_capture_interval);
heartbeat_tick.tick().await;
rotation_tick.tick().await;
handoff_tick.tick().await;
scheduler_tick.tick().await;
terminal_tick.tick().await;
// Seed terminal-capture watch list from config.
seed_terminal_watches(&state).await;
info!("daemon background loop started");
@ -130,6 +143,9 @@ pub async fn run_loop(
_ = rotation_tick.tick() => session_rotation(&state).await,
_ = handoff_tick.tick() => memory_handoff(&state).await,
_ = scheduler_tick.tick() => scheduler_tick_fn(&state).await,
_ = terminal_tick.tick(), if state.config.terminal_capture_enabled => {
terminal_capture_tick(&state).await;
}
_ = shutdown_rx.recv() => {
info!("daemon loop received shutdown signal");
break;
@ -479,6 +495,125 @@ async fn scheduler_tick_fn(state: &SharedState) {
poll_tasks(state).await;
}
// ---------------------------------------------------------------------------
// Terminal capture — tmux pane history + signature alerts
// ---------------------------------------------------------------------------
/// Register every `terminal_watch_targets` entry as a Linux-signature recorder
/// (idempotent — existing recorders are left untouched).
pub(crate) async fn seed_terminal_watches(state: &SharedState) {
if state.config.terminal_watch_targets.is_empty() {
return;
}
let mut map = state.terminal.write().await;
for target in &state.config.terminal_watch_targets {
map.entry(target.clone())
.or_insert_with(|| TerminalRecorder::linux(target.clone()));
}
info!(
watched = map.len(),
enabled = state.config.terminal_capture_enabled,
"terminal capture watch list seeded"
);
}
/// Capture and record every watched pane once. Per-pane capture errors (e.g. a
/// pane that has gone away) are logged at debug and skipped — one bad target
/// must not stall the others.
async fn terminal_capture_tick(state: &SharedState) {
let targets: Vec<String> = state.terminal.read().await.keys().cloned().collect();
for target in targets {
if let Err(e) = capture_and_record(state, &target).await {
debug!(target = %target, error = %e, "terminal capture skipped");
}
}
}
/// Capture one watched pane, fold it into its recorder, and route any
/// edge-triggered alerts. Returns the [`Observation`] so callers (the poll loop
/// and the `terminal-poll` socket command) can report what happened.
///
/// The blocking `tmux capture-pane` runs on a blocking thread so it never holds
/// the async runtime; the recorder lock is taken only briefly to fold the text.
pub(crate) async fn capture_and_record(
state: &SharedState,
target: &str,
) -> Result<Observation, String> {
let owned = target.to_string();
let raw = tokio::task::spawn_blocking(move || colibri_glasspane::capture_tmux_pane(&owned))
.await
.map_err(|e| format!("capture task failed: {e}"))?
.map_err(|e| e.to_string())?;
let observation = {
let mut map = state.terminal.write().await;
let recorder = map
.get_mut(target)
.ok_or_else(|| format!("pane {target:?} is not being watched"))?;
recorder.observe(&raw, SystemTime::now())
};
if let Observation::Recorded { uuid, new_alerts } = &observation {
if new_alerts.is_empty() {
debug!(target = %target, uuid = %uuid, "terminal frame recorded");
} else {
let message = format_terminal_alerts(state, target, new_alerts);
warn!(target = %target, uuid = %uuid, alerts = new_alerts.len(), "terminal alert fired");
notify_telegram(state, &message).await;
}
}
Ok(observation)
}
/// Build a human-readable, Telegram-friendly alert message from edge-triggered
/// signature matches.
fn format_terminal_alerts(state: &SharedState, target: &str, alerts: &[SignatureMatch]) -> String {
let mut lines = vec![format!(
"⚠ colibri@{}: {} new alert(s) on tmux {}",
state.config.host,
alerts.len(),
target
)];
for a in alerts {
let severity = format!("{:?}", a.severity).to_uppercase();
let invoke = a
.invoke
.as_deref()
.map(|s| format!(" [fix: run `{s}`]"))
.unwrap_or_default();
lines.push(format!("{severity} {}{}{invoke}", a.id, a.next_action));
}
lines.join("\n")
}
/// Send an alert to Telegram if a bot token + chat id are configured.
/// No-op (debug log only) when unconfigured, so the feature degrades cleanly.
async fn notify_telegram(state: &SharedState, text: &str) {
let (Some(token), Some(chat_id)) = (
state.config.telegram_bot_token.as_deref(),
state.config.telegram_chat_id.as_deref(),
) else {
debug!("telegram alert not sent: bot token / chat id not configured");
return;
};
let client = reqwest::Client::new();
let url = format!("https://api.telegram.org/bot{token}/sendMessage");
let send = client
.post(&url)
.json(&serde_json::json!({ "chat_id": chat_id, "text": text }))
.timeout(Duration::from_secs(10))
.send()
.await;
match send {
Ok(resp) if resp.status().is_success() => debug!("telegram alert sent"),
Ok(resp) => warn!(status = %resp.status(), "telegram alert rejected"),
Err(e) => warn!(error = %e, "telegram alert send failed"),
}
}
#[cfg(test)]
mod tests {
use super::*;

View file

@ -105,6 +105,31 @@ pub enum ColibriCommand {
},
#[serde(rename = "list-tenants")]
ListTenants,
// ── Terminal capture ──────────────────────────────────────
/// Start recording a tmux pane's terminal text (deduped history +
/// signature triage). `target` is any tmux target spec.
#[serde(rename = "terminal-watch")]
TerminalWatch { target: String },
/// Stop recording a pane and drop its history.
#[serde(rename = "terminal-unwatch")]
TerminalUnwatch { target: String },
/// List watched panes with frame counts and active alerts.
#[serde(rename = "terminal-list")]
TerminalList,
/// Return recorded frames for a pane (most recent `limit`, default 20).
#[serde(rename = "terminal-history")]
TerminalHistory {
target: String,
#[serde(default)]
limit: Option<usize>,
},
/// Capture watched panes immediately (one `target`, or all when omitted)
/// instead of waiting for the poll tick.
#[serde(rename = "terminal-poll")]
TerminalPoll {
#[serde(default)]
target: Option<String>,
},
}
/// Outbound control-plane response.

View file

@ -82,7 +82,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Start the daemon background loop (heartbeat, session rotation, scheduler)
let loop_state = state.clone();
let loop_shutdown = state.shutdown_rx.resubscribe();
let loop_config = daemon::DaemonLoopConfig::default();
let loop_config = daemon::DaemonLoopConfig {
terminal_capture_interval: std::time::Duration::from_secs(
config.terminal_capture_interval_secs,
),
..daemon::DaemonLoopConfig::default()
};
let loop_handle = tokio::spawn(async move {
daemon::run_loop(loop_state, loop_config, loop_shutdown).await;
});

View file

@ -363,6 +363,11 @@ mod tests {
cache_warming_interval_hours: 0,
headroom_enabled: false,
headroom_socket_path: std::path::PathBuf::from("/var/run/colibri/headroom.sock"),
terminal_capture_enabled: false,
terminal_capture_interval_secs: 5,
terminal_watch_targets: Vec::new(),
telegram_bot_token: None,
telegram_chat_id: None,
}
}

View file

@ -285,6 +285,13 @@ async fn dispatch(cmd: ColibriCommand, state: &SharedState) -> ColibriResponse {
collection_id,
} => cmd_register_tenant(state, tenant_id, jail_root_path, collection_id).await,
ColibriCommand::ListTenants => cmd_list_tenants(state).await,
ColibriCommand::TerminalWatch { target } => cmd_terminal_watch(state, target).await,
ColibriCommand::TerminalUnwatch { target } => cmd_terminal_unwatch(state, target).await,
ColibriCommand::TerminalList => cmd_terminal_list(state).await,
ColibriCommand::TerminalHistory { target, limit } => {
cmd_terminal_history(state, target, limit).await
}
ColibriCommand::TerminalPoll { target } => cmd_terminal_poll(state, target).await,
}
}
@ -383,6 +390,115 @@ async fn cmd_glasspane_snapshot(state: &SharedState) -> ColibriResponse {
}
}
// ---------------------------------------------------------------------------
// Terminal capture commands
// ---------------------------------------------------------------------------
async fn cmd_terminal_watch(state: &SharedState, target: String) -> ColibriResponse {
if target.trim().is_empty() {
return ColibriResponse::err("target must not be empty");
}
let mut map = state.terminal.write().await;
let already = map.contains_key(&target);
map.entry(target.clone())
.or_insert_with(|| colibri_glasspane::TerminalRecorder::linux(target.clone()));
info!(target = %target, already, "terminal-watch");
ColibriResponse::ok(serde_json::json!({
"target": target,
"watching": true,
"already_watched": already,
"capture_enabled": state.config.terminal_capture_enabled,
"watched_count": map.len(),
}))
}
async fn cmd_terminal_unwatch(state: &SharedState, target: String) -> ColibriResponse {
let removed = state.terminal.write().await.remove(&target).is_some();
ColibriResponse::ok(serde_json::json!({
"target": target,
"removed": removed,
}))
}
async fn cmd_terminal_list(state: &SharedState) -> ColibriResponse {
let map = state.terminal.read().await;
let panes: Vec<_> = map
.values()
.map(|rec| {
let latest = rec.latest();
serde_json::json!({
"target": rec.pane_id(),
"frames": rec.len(),
"active_alerts": rec.active_alerts().collect::<Vec<_>>(),
"latest_uuid": latest.map(|f| f.uuid.clone()),
"latest_at": latest.map(|f| f.observed_at.clone()),
"latest_failures": latest.map(|f| f.detection.failures.len()).unwrap_or(0),
})
})
.collect();
ColibriResponse::ok(serde_json::json!({
"capture_enabled": state.config.terminal_capture_enabled,
"interval_secs": state.config.terminal_capture_interval_secs,
"panes": panes,
}))
}
async fn cmd_terminal_history(
state: &SharedState,
target: String,
limit: Option<usize>,
) -> ColibriResponse {
let map = state.terminal.read().await;
let Some(rec) = map.get(&target) else {
return ColibriResponse::err(format!("pane {target:?} is not being watched"));
};
let frames: Vec<_> = rec.frames().collect();
let n = limit.unwrap_or(20).min(frames.len());
let tail = &frames[frames.len() - n..];
match serde_json::to_value(tail) {
Ok(value) => ColibriResponse::ok(serde_json::json!({
"target": target,
"total_frames": rec.len(),
"returned": n,
"frames": value,
})),
Err(e) => ColibriResponse::err(format!("history serialization failed: {e}")),
}
}
async fn cmd_terminal_poll(state: &SharedState, target: Option<String>) -> ColibriResponse {
let targets: Vec<String> = match target {
Some(t) => vec![t],
None => state.terminal.read().await.keys().cloned().collect(),
};
if targets.is_empty() {
return ColibriResponse::err("no panes are being watched");
}
let mut results = Vec::new();
for target in targets {
let entry = match crate::daemon::capture_and_record(state, &target).await {
Ok(colibri_glasspane::Observation::Unchanged) => serde_json::json!({
"target": target,
"status": "unchanged",
}),
Ok(colibri_glasspane::Observation::Recorded { uuid, new_alerts }) => serde_json::json!({
"target": target,
"status": "recorded",
"uuid": uuid,
"new_alerts": new_alerts,
}),
Err(e) => serde_json::json!({
"target": target,
"status": "error",
"error": e,
}),
};
results.push(entry);
}
ColibriResponse::ok(serde_json::json!({ "results": results }))
}
async fn cmd_list_sessions(state: &SharedState) -> ColibriResponse {
let mut sessions = Vec::new();
for entry in state.sessions.iter() {
@ -1031,6 +1147,11 @@ mod tests {
cache_warming_interval_hours: 0,
headroom_enabled: false,
headroom_socket_path: std::path::PathBuf::from("/var/run/colibri/headroom.sock"),
terminal_capture_enabled: false,
terminal_capture_interval_secs: 5,
terminal_watch_targets: Vec::new(),
telegram_bot_token: None,
telegram_chat_id: None,
}
}
@ -1142,6 +1263,50 @@ mod tests {
assert_eq!(pane.session_id.as_deref(), Some("pi-sample"));
}
#[tokio::test]
async fn terminal_watch_list_unwatch_roundtrip() {
let state: SharedState = Arc::new(DaemonState::new(test_config()));
let watched = cmd_terminal_watch(&state, "smoketest:0".to_string()).await;
assert!(watched.ok);
assert_eq!(watched.data.unwrap()["already_watched"], false);
// Idempotent: watching again reports already_watched.
let again = cmd_terminal_watch(&state, "smoketest:0".to_string()).await;
assert_eq!(again.data.unwrap()["already_watched"], true);
let list = cmd_terminal_list(&state).await;
let data = list.data.unwrap();
assert_eq!(data["panes"].as_array().unwrap().len(), 1);
assert_eq!(data["panes"][0]["target"], "smoketest:0");
assert_eq!(data["panes"][0]["frames"], 0);
let removed = cmd_terminal_unwatch(&state, "smoketest:0".to_string()).await;
assert_eq!(removed.data.unwrap()["removed"], true);
let empty = cmd_terminal_list(&state).await;
assert_eq!(empty.data.unwrap()["panes"].as_array().unwrap().len(), 0);
}
#[tokio::test]
async fn terminal_watch_rejects_empty_target() {
let state: SharedState = Arc::new(DaemonState::new(test_config()));
assert!(!cmd_terminal_watch(&state, " ".to_string()).await.ok);
}
#[tokio::test]
async fn terminal_history_errors_for_unwatched_pane() {
let state: SharedState = Arc::new(DaemonState::new(test_config()));
let resp = cmd_terminal_history(&state, "ghost".to_string(), None).await;
assert!(!resp.ok);
assert!(resp.error.unwrap().contains("not being watched"));
}
#[tokio::test]
async fn terminal_poll_errors_when_nothing_watched() {
let state: SharedState = Arc::new(DaemonState::new(test_config()));
assert!(!cmd_terminal_poll(&state, None).await.ok);
}
#[tokio::test]
async fn clear_stale_socket_returns_true_when_absent() {
let dir = std::env::temp_dir().join(format!(

View file

@ -10,3 +10,6 @@ chrono = { version = "0.4", default-features = false, features = ["clock"] }
portable-pty = "0.9"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
# Terminal-capture layer: content-hash frame ids + ANSI stripping.
sha2 = "0.10"
regex = "1"

View file

@ -12,6 +12,14 @@
//! is scaffolded with `portable-pty`; tests use sample JSONL readers until the
//! FreeBSD validation lane exercises real terminals.
pub mod signatures;
pub mod terminal;
pub use signatures::{Detection, Severity, Signature, SignatureMatch, SignatureSet};
pub use terminal::{
capture_tmux_pane, frame_uuid, strip_ansi, CapturedFrame, Observation, TerminalRecorder,
};
use std::{
collections::BTreeMap,
io::{self, BufRead, BufReader, Read, Write},
@ -619,7 +627,7 @@ fn skip_false(value: &bool) -> bool {
!*value
}
fn system_time_to_rfc3339(time: SystemTime) -> String {
pub(crate) fn system_time_to_rfc3339(time: SystemTime) -> String {
let dt: DateTime<Utc> = time.into();
dt.to_rfc3339_opts(SecondsFormat::Millis, true)
}

View file

@ -0,0 +1,361 @@
//! Signature-based triage of captured terminal text.
//!
//! A [`SignatureSet`] scans stripped terminal text for known patterns and
//! classifies the screen into `failures` / `warnings` / `info` / `healthy`.
//! Each match carries a human `next_action` and an optional `invoke` (a skill
//! the agent can run to remediate) — so a hit is not just "something happened"
//! but "here is what it means and what to do".
//!
//! The detection engine is data-driven: the FreeBSD host and a Linux host load
//! different [`Signature`] sets but share this matcher. This is the per-OS knob
//! that the rest of Colibri's capability-routing can lean on. [`SignatureSet::linux_default`]
//! ships a small, high-value starter set; callers can build their own with
//! [`SignatureSet::new`].
use serde::{Deserialize, Serialize};
/// How serious a matched signature is. Mirrors the Python skill's taxonomy
/// (`error` / `warn` / `info` for failure patterns, `ok` for success patterns).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Severity {
/// Known-broken state needing action.
Error,
/// Suspicious but not fatal; worth surfacing.
Warn,
/// Benign/expected note.
Info,
/// Known-healthy state — confirms things work.
Ok,
}
/// One known terminal-text pattern and what it means.
///
/// `patterns` are matched as **case-insensitive substrings**; the first one
/// that hits records the signature (further patterns are not reported twice).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Signature {
pub id: String,
pub severity: Severity,
/// Which skill/domain owns this knowledge (provenance for the operator).
pub source: String,
pub patterns: Vec<String>,
/// Plain-language remediation hint.
pub next_action: String,
/// Optional skill to auto-invoke to fix the condition.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub invoke: Option<String>,
}
impl Signature {
/// Convenience builder for a failure/info signature without an `invoke`.
pub fn new(
id: impl Into<String>,
severity: Severity,
source: impl Into<String>,
patterns: impl IntoIterator<Item = impl Into<String>>,
next_action: impl Into<String>,
) -> Self {
Self {
id: id.into(),
severity,
source: source.into(),
patterns: patterns.into_iter().map(Into::into).collect(),
next_action: next_action.into(),
invoke: None,
}
}
/// Attach a remediation skill to auto-invoke.
pub fn with_invoke(mut self, invoke: impl Into<String>) -> Self {
self.invoke = Some(invoke.into());
self
}
}
/// A single signature hit against captured text.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SignatureMatch {
pub id: String,
pub severity: Severity,
pub source: String,
/// The exact pattern string that matched.
pub matched_text: String,
pub next_action: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub invoke: Option<String>,
}
/// The classified result of scanning one screen of text.
///
/// Buckets are filled in declaration order of the signatures; `failures` are
/// the ones the agent should act on before acknowledging `healthy` state.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct Detection {
pub failures: Vec<SignatureMatch>,
pub warnings: Vec<SignatureMatch>,
pub info: Vec<SignatureMatch>,
pub healthy: Vec<SignatureMatch>,
}
impl Detection {
/// True when nothing matched at all.
pub fn is_empty(&self) -> bool {
self.failures.is_empty()
&& self.warnings.is_empty()
&& self.info.is_empty()
&& self.healthy.is_empty()
}
/// True when at least one `error`-severity signature fired.
pub fn has_failures(&self) -> bool {
!self.failures.is_empty()
}
/// All `error`/`warn` matches — the ones that should drive an alert.
/// (Edge-triggered de-duplication is the recorder's job, not this method's.)
pub fn alertable(&self) -> impl Iterator<Item = &SignatureMatch> {
self.failures.iter().chain(self.warnings.iter())
}
/// Flat list of every match across all buckets (failures → warnings →
/// info → healthy), for compact JSON metadata.
pub fn all(&self) -> Vec<SignatureMatch> {
let mut out = Vec::new();
out.extend(self.failures.iter().cloned());
out.extend(self.warnings.iter().cloned());
out.extend(self.info.iter().cloned());
out.extend(self.healthy.iter().cloned());
out
}
}
/// An ordered collection of signatures and the matcher over them.
#[derive(Debug, Clone, Default)]
pub struct SignatureSet {
signatures: Vec<Signature>,
}
impl SignatureSet {
/// Build a set from an explicit list of signatures.
pub fn new(signatures: impl IntoIterator<Item = Signature>) -> Self {
Self {
signatures: signatures.into_iter().collect(),
}
}
/// An empty set — useful for capture-only history with no triage.
pub fn empty() -> Self {
Self::default()
}
pub fn len(&self) -> usize {
self.signatures.len()
}
pub fn is_empty(&self) -> bool {
self.signatures.is_empty()
}
pub fn signatures(&self) -> &[Signature] {
&self.signatures
}
/// Scan `text` and classify every matching signature.
///
/// Matching is case-insensitive substring containment; for each signature
/// only the first matching pattern is recorded (mirrors the Python skill).
pub fn detect(&self, text: &str) -> Detection {
let lowered = text.to_lowercase();
let mut detection = Detection::default();
for sig in &self.signatures {
let Some(pattern) = sig
.patterns
.iter()
.find(|p| lowered.contains(&p.to_lowercase()))
else {
continue;
};
let hit = SignatureMatch {
id: sig.id.clone(),
severity: sig.severity,
source: sig.source.clone(),
matched_text: pattern.clone(),
next_action: sig.next_action.clone(),
invoke: sig.invoke.clone(),
};
match sig.severity {
Severity::Error => detection.failures.push(hit),
Severity::Warn => detection.warnings.push(hit),
Severity::Info => detection.info.push(hit),
Severity::Ok => detection.healthy.push(hit),
}
}
detection
}
/// A small, high-value starter set for Linux hosts (systemd / kernel /
/// container / network). Deliberately conservative — extend per deployment
/// rather than matching broad words like "error" that produce false hits.
pub fn linux_default() -> Self {
Self::new([
// ── failures ──────────────────────────────────────────────
Signature::new(
"systemd_unit_failed",
Severity::Error,
"systemd",
["active: failed", "code=exited, status=1/failure", "failed to start"],
"A systemd unit failed. Inspect with `systemctl status <unit>` and `journalctl -u <unit> -e`.",
)
.with_invoke("systemd-admin"),
Signature::new(
"oom_killer",
Severity::Error,
"kernel",
["out of memory: killed process", "invoked oom-killer"],
"The kernel OOM-killer reaped a process. Reduce memory pressure or raise limits/cgroup memory.",
),
Signature::new(
"disk_full",
Severity::Error,
"filesystem",
["no space left on device"],
"A filesystem is full. Free space or grow the volume; check `df -h` and large logs.",
),
Signature::new(
"docker_container_exited_error",
Severity::Error,
"docker",
["exited (1)", "exited (137)", "exited (139)"],
"A container exited non-zero. Check `docker logs <id>`; 137 = OOM/kill, 139 = segfault.",
),
Signature::new(
"ip_forwarding_disabled",
Severity::Error,
"network",
["net.ipv4.ip_forward = 0"],
"IPv4 forwarding is off. Enable with `sysctl -w net.ipv4.ip_forward=1` and persist in sysctl.d.",
),
Signature::new(
"firewall_drop_policy",
Severity::Warn,
"network",
["chain input (policy drop", "policy drop 0 packets"],
"Default INPUT policy is DROP. Confirm intended; add explicit pass rules for required services.",
),
// ── warnings ──────────────────────────────────────────────
Signature::new(
"connection_refused",
Severity::Warn,
"network",
["connection refused"],
"A connection was refused — the target service may be down or not listening on that port.",
),
Signature::new(
"permission_denied",
Severity::Warn,
"filesystem",
["permission denied"],
"Permission denied. Check file ownership/mode or whether the command needs elevated rights.",
),
// ── healthy ───────────────────────────────────────────────
Signature::new(
"systemd_active_running",
Severity::Ok,
"systemd",
["active: active (running)"],
"Service is active and running.",
),
Signature::new(
"ip_forwarding_enabled",
Severity::Ok,
"network",
["net.ipv4.ip_forward = 1"],
"IPv4 forwarding is enabled.",
),
Signature::new(
"zpool_online",
Severity::Ok,
"zfs",
["state: online"],
"ZFS pool is healthy (ONLINE).",
),
])
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn detect_classifies_into_buckets() {
let set = SignatureSet::linux_default();
let text = "● nginx.service\n Active: failed (Result: exit-code)\nnet.ipv4.ip_forward = 1";
let d = set.detect(text);
assert_eq!(d.failures.len(), 1);
assert_eq!(d.failures[0].id, "systemd_unit_failed");
assert_eq!(d.failures[0].invoke.as_deref(), Some("systemd-admin"));
assert_eq!(d.healthy.len(), 1);
assert_eq!(d.healthy[0].id, "ip_forwarding_enabled");
assert!(d.has_failures());
assert!(!d.is_empty());
}
#[test]
fn detect_is_case_insensitive() {
let set = SignatureSet::linux_default();
let d = set.detect("NO SPACE LEFT ON DEVICE while writing /var/log");
assert_eq!(d.failures.len(), 1);
assert_eq!(d.failures[0].id, "disk_full");
}
#[test]
fn only_first_matching_pattern_recorded_per_signature() {
let set = SignatureSet::linux_default();
// Two docker patterns present, but the signature should record once.
let d = set.detect("web exited (1)\ndb exited (137)");
assert_eq!(d.failures.len(), 1);
assert_eq!(d.failures[0].id, "docker_container_exited_error");
assert_eq!(d.failures[0].matched_text, "exited (1)");
}
#[test]
fn clean_screen_yields_empty_detection() {
let set = SignatureSet::linux_default();
let d = set.detect("$ ls\nCargo.toml src target\n$ ");
assert!(d.is_empty());
assert!(!d.has_failures());
}
#[test]
fn alertable_covers_failures_and_warnings_only() {
let set = SignatureSet::linux_default();
let d = set.detect("Active: failed\nconnection refused\nActive: active (running)");
let ids: Vec<&str> = d.alertable().map(|m| m.id.as_str()).collect();
assert!(ids.contains(&"systemd_unit_failed"));
assert!(ids.contains(&"connection_refused"));
assert!(!ids.contains(&"systemd_active_running")); // healthy is not alertable
}
#[test]
fn empty_set_matches_nothing() {
let set = SignatureSet::empty();
assert!(set.is_empty());
assert!(set.detect("Active: failed\nout of memory: killed process").is_empty());
}
#[test]
fn detection_round_trips_json() {
let set = SignatureSet::linux_default();
let d = set.detect("Active: failed");
let json = serde_json::to_string(&d).unwrap();
let back: Detection = serde_json::from_str(&json).unwrap();
assert_eq!(d, back);
assert!(json.contains("\"error\"")); // severity serialized lowercase
}
}

View file

@ -0,0 +1,362 @@
//! Terminal-text capture, content-hash framing, and deduplicated history.
//!
//! This is the complementary half of Glasspane's event model: where the rest
//! of the crate derives agent state from structured JSONL events, this module
//! records the **actual terminal text** of a pane and triages it with
//! [`SignatureSet`](crate::signatures::SignatureSet).
//!
//! The key bet is the same one the Python `tmux-screenshot` skill made: a
//! frame's identity is the **SHA-256 of its stripped text**. That makes
//! "record pane history" cheap — a [`TerminalRecorder`] drops any frame whose
//! hash equals the previous one, so polling a near-static pane every second
//! collapses into a compact log of *actual* state transitions rather than
//! thousands of duplicate frames.
//!
//! Alerts are **edge-triggered**: a failure/warning signature is reported only
//! on the frame where it first appears, not on every subsequent frame that
//! still shows it. When the condition clears and later recurs, it fires again.
//!
//! I/O is kept at the edges. [`capture_tmux_pane`] shells out to `tmux`, but
//! [`TerminalRecorder::observe`] takes raw text directly so the dedup/triage
//! logic is fully testable without a terminal (mirroring how PTY launch is
//! separated from the event state model).
use std::{
collections::{BTreeSet, VecDeque},
io,
process::Command,
sync::OnceLock,
time::SystemTime,
};
use regex::Regex;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use crate::signatures::{Detection, SignatureMatch, SignatureSet};
/// Default number of frames a recorder keeps before evicting the oldest.
pub const DEFAULT_HISTORY_CAPACITY: usize = 256;
/// One CSI escape sequence (SGR colors, cursor moves, etc.).
fn csi_re() -> &'static Regex {
static RE: OnceLock<Regex> = OnceLock::new();
RE.get_or_init(|| Regex::new(r"\x1b\[[0-9;?]*[A-Za-z]").unwrap())
}
/// Control characters to drop, preserving tab (`\x09`) and newline (`\x0a`).
/// Includes a stray ESC (`\x1b`) left over from non-CSI sequences.
fn ctrl_re() -> &'static Regex {
static RE: OnceLock<Regex> = OnceLock::new();
RE.get_or_init(|| Regex::new(r"[\x00-\x08\x0b-\x1f\x7f]").unwrap())
}
/// Strip ANSI escape sequences and control characters from terminal output,
/// leaving plain text suitable for hashing, diffing, and signature matching.
/// Tabs and newlines are preserved.
pub fn strip_ansi(text: &str) -> String {
let no_csi = csi_re().replace_all(text, "");
ctrl_re().replace_all(&no_csi, "").into_owned()
}
/// Content-addressed frame id: the first 12 hex chars of `SHA-256(stripped)`.
/// Identical screens produce identical ids — the basis for history dedup and
/// for cheap verification (the id *is* the checksum of the text).
pub fn frame_uuid(stripped: &str) -> String {
let digest = Sha256::digest(stripped.as_bytes());
let mut hex = String::with_capacity(12);
for byte in digest.iter().take(6) {
hex.push_str(&format!("{byte:02x}"));
}
hex
}
/// One recorded screen of terminal text plus its triage result.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CapturedFrame {
/// `frame_uuid` of `text` — also the dedup key.
pub uuid: String,
/// ANSI-stripped terminal text.
pub text: String,
/// RFC3339 capture time (matches the Glasspane snapshot convention).
pub observed_at: String,
/// Signatures detected in this frame.
pub detection: Detection,
}
/// Outcome of feeding one capture to a [`TerminalRecorder`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Observation {
/// The screen was identical to the last recorded frame — dropped, no
/// history entry, no alerts.
Unchanged,
/// The screen changed and was recorded.
Recorded {
/// Content hash of the new frame.
uuid: String,
/// Failure/warning signatures that *just appeared* (edge-triggered).
/// Empty when nothing newly fired, even if the frame still shows a
/// previously-reported condition.
new_alerts: Vec<SignatureMatch>,
},
}
impl Observation {
/// True when this observation produced a new history frame.
pub fn is_recorded(&self) -> bool {
matches!(self, Observation::Recorded { .. })
}
/// Newly-fired alerts, if any (empty for `Unchanged`).
pub fn new_alerts(&self) -> &[SignatureMatch] {
match self {
Observation::Recorded { new_alerts, .. } => new_alerts,
Observation::Unchanged => &[],
}
}
}
/// A deduplicated, signature-triaged history of one pane's terminal text.
///
/// Feed it raw captures with [`observe`](Self::observe); it keeps only frames
/// whose content actually changed and reports failure/warning signatures on
/// their rising edge. Bounded to `capacity` frames (ring buffer).
#[derive(Debug, Clone)]
pub struct TerminalRecorder {
pane_id: String,
capacity: usize,
signatures: SignatureSet,
last_hash: Option<String>,
/// Failure/warning signature ids firing as of the last recorded frame.
active: BTreeSet<String>,
frames: VecDeque<CapturedFrame>,
}
impl TerminalRecorder {
/// Create a recorder for `pane_id` with an explicit signature set and
/// history capacity.
pub fn new(pane_id: impl Into<String>, signatures: SignatureSet, capacity: usize) -> Self {
Self {
pane_id: pane_id.into(),
capacity: capacity.max(1),
signatures,
last_hash: None,
active: BTreeSet::new(),
frames: VecDeque::new(),
}
}
/// Recorder with the Linux default signature set and default capacity.
pub fn linux(pane_id: impl Into<String>) -> Self {
Self::new(pane_id, SignatureSet::linux_default(), DEFAULT_HISTORY_CAPACITY)
}
pub fn pane_id(&self) -> &str {
&self.pane_id
}
pub fn len(&self) -> usize {
self.frames.len()
}
pub fn is_empty(&self) -> bool {
self.frames.is_empty()
}
/// All retained frames, oldest first.
pub fn frames(&self) -> impl Iterator<Item = &CapturedFrame> {
self.frames.iter()
}
/// The most recently recorded frame, if any.
pub fn latest(&self) -> Option<&CapturedFrame> {
self.frames.back()
}
/// Failure/warning signature ids currently considered "firing".
pub fn active_alerts(&self) -> impl Iterator<Item = &str> {
self.active.iter().map(String::as_str)
}
/// Record one raw terminal capture observed at `observed_at`.
///
/// Returns [`Observation::Unchanged`] if the stripped text hashes to the
/// same id as the previous frame (dropped), otherwise
/// [`Observation::Recorded`] with any edge-triggered alerts.
pub fn observe(&mut self, raw: &str, observed_at: SystemTime) -> Observation {
let text = strip_ansi(raw);
let uuid = frame_uuid(&text);
if self.last_hash.as_deref() == Some(uuid.as_str()) {
return Observation::Unchanged;
}
let detection = self.signatures.detect(&text);
// Edge-trigger: alertable signatures not firing as of the last frame.
let current: BTreeSet<String> =
detection.alertable().map(|m| m.id.clone()).collect();
let new_alerts: Vec<SignatureMatch> = detection
.alertable()
.filter(|m| !self.active.contains(&m.id))
.cloned()
.collect();
self.active = current;
let frame = CapturedFrame {
uuid: uuid.clone(),
text,
observed_at: crate::system_time_to_rfc3339(observed_at),
detection,
};
self.frames.push_back(frame);
if self.frames.len() > self.capacity {
self.frames.pop_front();
}
self.last_hash = Some(uuid.clone());
Observation::Recorded { uuid, new_alerts }
}
/// Capture the named tmux target and record it. Convenience wrapper over
/// [`capture_tmux_pane`] + [`observe`](Self::observe) for the daemon's
/// poll loop. Errors propagate the tmux capture failure.
pub fn observe_tmux(&mut self, target: &str, observed_at: SystemTime) -> io::Result<Observation> {
let raw = capture_tmux_pane(target)?;
Ok(self.observe(&raw, observed_at))
}
}
/// Capture a tmux pane's visible content (with escape sequences) as a string.
///
/// `target` is any tmux target spec: `session`, `session:window`,
/// `session:window.pane`, or an absolute pane id like `%3`. Runs
/// `tmux capture-pane -e -p -t <target>`. Requires `tmux` on `PATH`.
pub fn capture_tmux_pane(target: &str) -> io::Result<String> {
let output = Command::new("tmux")
.args(["capture-pane", "-e", "-p", "-t", target])
.output()?;
if !output.status.success() {
return Err(io::Error::other(format!(
"tmux capture-pane failed for {target:?}: {}",
String::from_utf8_lossy(&output.stderr).trim()
)));
}
Ok(String::from_utf8_lossy(&output.stdout).into_owned())
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
fn t(secs: u64) -> SystemTime {
SystemTime::UNIX_EPOCH + Duration::from_secs(secs)
}
#[test]
fn strip_ansi_removes_color_and_control_keeps_text() {
// green "ok", reset, then a cursor-home CSI and a stray carriage return.
let raw = "\x1b[32mok\x1b[0m\x1b[Hdone\r\n";
assert_eq!(strip_ansi(raw), "okdone\n");
}
#[test]
fn strip_ansi_preserves_tabs_and_newlines() {
assert_eq!(strip_ansi("a\tb\nc"), "a\tb\nc");
}
#[test]
fn frame_uuid_is_stable_and_12_hex() {
let a = frame_uuid("hello world");
let b = frame_uuid("hello world");
assert_eq!(a, b);
assert_eq!(a.len(), 12);
assert!(a.chars().all(|c| c.is_ascii_hexdigit()));
assert_ne!(frame_uuid("hello world"), frame_uuid("hello worle"));
}
#[test]
fn uuid_ignores_color_only_differences() {
// Same text, different coloring → same frame id (dedup works on content).
let plain = "\x1b[31mALERT\x1b[0m";
let other = "\x1b[33mALERT\x1b[0m";
assert_eq!(frame_uuid(&strip_ansi(plain)), frame_uuid(&strip_ansi(other)));
}
#[test]
fn identical_screen_is_dropped() {
let mut rec = TerminalRecorder::linux("pane-1");
let first = rec.observe("$ idle prompt\n", t(1));
assert!(first.is_recorded());
let second = rec.observe("$ idle prompt\n", t(2));
assert_eq!(second, Observation::Unchanged);
assert_eq!(rec.len(), 1, "duplicate frame must not be stored");
}
#[test]
fn changed_screen_is_recorded() {
let mut rec = TerminalRecorder::linux("pane-1");
rec.observe("frame one\n", t(1));
rec.observe("frame two\n", t(2));
assert_eq!(rec.len(), 2);
assert_eq!(rec.latest().unwrap().text, "frame two\n");
assert_eq!(rec.latest().unwrap().observed_at, "1970-01-01T00:00:02.000Z");
}
#[test]
fn failure_alert_is_edge_triggered() {
let mut rec = TerminalRecorder::linux("pane-1");
// Rising edge: failure appears → alert fires once.
let o1 = rec.observe("boot log\nActive: failed\n", t(1));
assert_eq!(o1.new_alerts().len(), 1);
assert_eq!(o1.new_alerts()[0].id, "systemd_unit_failed");
// Different screen, failure still present → no repeat alert.
let o2 = rec.observe("boot log\nActive: failed\nstill broken\n", t(2));
assert!(o2.is_recorded());
assert_eq!(o2.new_alerts().len(), 0, "persisting failure must not re-alert");
// Condition clears.
let o3 = rec.observe("Active: active (running)\n", t(3));
assert_eq!(o3.new_alerts().len(), 0);
assert!(rec.active_alerts().next().is_none());
// Recurs → fires again (falling then rising edge).
let o4 = rec.observe("Active: failed\nagain\n", t(4));
assert_eq!(o4.new_alerts().len(), 1);
assert_eq!(o4.new_alerts()[0].id, "systemd_unit_failed");
}
#[test]
fn ring_buffer_evicts_oldest() {
let mut rec = TerminalRecorder::new("pane-1", SignatureSet::empty(), 3);
for i in 0..5 {
rec.observe(&format!("frame {i}\n"), t(i));
}
assert_eq!(rec.len(), 3);
let texts: Vec<String> = rec.frames().map(|f| f.text.clone()).collect();
assert_eq!(texts, vec!["frame 2\n", "frame 3\n", "frame 4\n"]);
}
#[test]
fn capacity_floor_is_one() {
let mut rec = TerminalRecorder::new("p", SignatureSet::empty(), 0);
rec.observe("a\n", t(1));
rec.observe("b\n", t(2));
assert_eq!(rec.len(), 1);
}
#[test]
fn frame_carries_detection_for_storage() {
let mut rec = TerminalRecorder::linux("pane-1");
rec.observe("no space left on device\n", t(1));
let frame = rec.latest().unwrap();
assert!(frame.detection.has_failures());
// Frame is serializable for history persistence.
let json = serde_json::to_string(frame).unwrap();
let back: CapturedFrame = serde_json::from_str(&json).unwrap();
assert_eq!(*frame, back);
}
}