feat(glasspane): terminal capture, signature triage, and edge-triggered alerts
Add the screen-scraping half of Glasspane to complement its event-state model. Ports the *brain* of the clawdie-ai tmux-screenshot skill (not the PNG) into the Rust core and wires it into the daemon. colibri-glasspane: - terminal.rs: strip_ansi, content-hash frame ids (sha256[:12]), CapturedFrame, and TerminalRecorder — a deduped ring buffer that drops frames identical to the previous one (so polling a static pane collapses to a log of real transitions) with edge-triggered alerting (a failure fires once on its rising edge, re-fires only after it clears). Thin capture_tmux_pane seam keeps I/O out of the testable core. - signatures.rs: data-driven Severity/Signature/Detection/SignatureSet matcher with a high-value linux_default() set (systemd/oom/disk/docker/ forwarding). Per-OS set is the hook for capability-routing. colibri-daemon: - DaemonState.terminal map of per-pane recorders; poll tick in run_loop gated on COLIBRI_TERMINAL_CAPTURE, seeded from COLIBRI_TERMINAL_WATCH. - capture_and_record() shares the blocking tmux capture (on spawn_blocking) + brief lock fold between the loop and the socket; env-gated Telegram alert routing that no-ops cleanly when unconfigured. - socket cmds: terminal-watch/unwatch/list/history/poll. - env_bool helper: forgiving truthy parsing (1/true/yes/on) so COLIBRI_TERMINAL_CAPTURE=1 is not silently false like bool::from_str. Tests: 17 new glasspane unit tests + daemon socket/config tests; whole workspace green, clippy clean. Verified live on Linux (domedog): autonomous loop deduped ~5 ticks into 2 frames and fired one edge-triggered alert. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
parent
e953b1c050
commit
0509ed76bc
12 changed files with 1141 additions and 4 deletions
2
Cargo.lock
generated
2
Cargo.lock
generated
|
|
@ -361,8 +361,10 @@ version = "0.12.0"
|
|||
dependencies = [
|
||||
"chrono",
|
||||
"portable-pty",
|
||||
"regex",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sha2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
|
|||
|
|
@ -31,6 +31,11 @@ fn check_config() -> DaemonConfig {
|
|||
cache_warming_interval_hours: 0,
|
||||
headroom_enabled: false,
|
||||
headroom_socket_path: std::path::PathBuf::from("/var/run/colibri/headroom.sock"),
|
||||
terminal_capture_enabled: false,
|
||||
terminal_capture_interval_secs: 5,
|
||||
terminal_watch_targets: Vec::new(),
|
||||
telegram_bot_token: None,
|
||||
telegram_chat_id: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -53,6 +53,17 @@ pub struct DaemonConfig {
|
|||
pub headroom_enabled: bool,
|
||||
/// Path to the headroom sidecar Unix socket.
|
||||
pub headroom_socket_path: PathBuf,
|
||||
/// Enable the terminal-capture poll loop (tmux pane history + signature
|
||||
/// alerts). Disabled by default — only ticks when panes are watched.
|
||||
pub terminal_capture_enabled: bool,
|
||||
/// How often (seconds) the poll loop captures each watched pane.
|
||||
pub terminal_capture_interval_secs: u64,
|
||||
/// tmux targets to watch from startup (CSV: `session:window`, `%pane`, …).
|
||||
pub terminal_watch_targets: Vec<String>,
|
||||
/// Telegram bot token for routing terminal alerts (optional).
|
||||
pub telegram_bot_token: Option<String>,
|
||||
/// Telegram chat id that terminal alerts are sent to (optional).
|
||||
pub telegram_chat_id: Option<String>,
|
||||
}
|
||||
|
||||
impl DaemonConfig {
|
||||
|
|
@ -103,6 +114,13 @@ impl DaemonConfig {
|
|||
headroom_socket_path: std::env::var("COLIBRI_HEADROOM_SOCKET")
|
||||
.map(PathBuf::from)
|
||||
.unwrap_or_else(|_| PathBuf::from("/var/run/colibri/headroom.sock")),
|
||||
terminal_capture_enabled: env_bool("COLIBRI_TERMINAL_CAPTURE"),
|
||||
terminal_capture_interval_secs: env_parse("COLIBRI_TERMINAL_CAPTURE_INTERVAL_SECS")
|
||||
.filter(|&n| n > 0)
|
||||
.unwrap_or(5),
|
||||
terminal_watch_targets: env_csv("COLIBRI_TERMINAL_WATCH"),
|
||||
telegram_bot_token: nonempty_env("TELEGRAM_BOT_TOKEN"),
|
||||
telegram_chat_id: nonempty_env("TELEGRAM_CHAT_ID"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -119,6 +137,29 @@ fn env_parse<T: std::str::FromStr>(name: &str) -> Option<T> {
|
|||
std::env::var(name).ok().and_then(|v| v.parse().ok())
|
||||
}
|
||||
|
||||
/// Read the environment variable `name` as an on/off flag.
///
/// The value is trimmed and compared ASCII-case-insensitively against the
/// truthy spellings `1`, `true`, `yes` and `on`. Any other value — or a
/// variable that cannot be read at all — yields `false`.
///
/// This is deliberately more lenient than `bool::from_str`, which only
/// understands `true`/`false` and would make `=1` silently read as false.
fn env_bool(name: &str) -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];

    match std::env::var(name) {
        Ok(raw) => {
            let value = raw.trim();
            TRUTHY.iter().any(|spelling| value.eq_ignore_ascii_case(spelling))
        }
        Err(_) => false,
    }
}
|
||||
|
||||
/// Split the environment variable `name` on commas.
///
/// Each entry is trimmed and blank entries are discarded. A variable that
/// cannot be read produces an empty list.
fn env_csv(name: &str) -> Vec<String> {
    match std::env::var(name) {
        Ok(raw) => raw
            .split(',')
            .map(str::trim)
            .filter(|entry| !entry.is_empty())
            .map(str::to_owned)
            .collect(),
        Err(_) => Vec::new(),
    }
}
|
||||
|
||||
fn dirs_data() -> Option<PathBuf> {
|
||||
std::env::var("XDG_DATA_HOME")
|
||||
.ok()
|
||||
|
|
@ -190,6 +231,26 @@ mod tests {
|
|||
assert!(config.anthropic_api_key.is_none());
|
||||
}
|
||||
|
||||
#[test]
fn env_bool_accepts_common_truthy_spellings() {
    const VAR: &str = "COLIBRI_TEST_BOOL";
    // Environment mutation is process-global, so serialize with the other env tests.
    let _guard = ENV_LOCK.lock().unwrap();

    let truthy = ["1", "true", "TRUE", "yes", "on"];
    let falsy = ["0", "false", "nope"];
    let cases = truthy
        .iter()
        .map(|v| (*v, true))
        .chain(falsy.iter().map(|v| (*v, false)));

    for (val, expected) in cases {
        std::env::set_var(VAR, val);
        assert_eq!(env_bool(VAR), expected, "value {val:?}");
    }

    // An unset variable must read as false as well.
    std::env::remove_var(VAR);
    assert!(!env_bool(VAR), "unset is false");
}
|
||||
|
||||
#[test]
|
||||
fn test_config_from_env_vars() {
|
||||
let _guard = ENV_LOCK.lock().unwrap();
|
||||
|
|
|
|||
|
|
@ -1,10 +1,11 @@
|
|||
//! Main daemon loop — heartbeat, task polling, session rotation, memory handoff, cache warming.
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::{Duration, SystemTime};
|
||||
|
||||
use chrono::Utc;
|
||||
use colibri_glasspane::PaneSupervisor;
|
||||
use colibri_glasspane::{Observation, PaneSupervisor, SignatureMatch, TerminalRecorder};
|
||||
use colibri_store::Store;
|
||||
use dashmap::DashMap;
|
||||
use tokio::sync::{broadcast, Mutex, RwLock};
|
||||
|
|
@ -20,6 +21,10 @@ pub struct DaemonState {
|
|||
pub sessions: DashMap<String, Session>,
|
||||
pub agents: DashMap<String, AgentHandle>,
|
||||
pub glasspane: RwLock<PaneSupervisor>,
|
||||
/// Per-pane terminal-text recorders (tmux target → deduped history +
|
||||
/// signature triage). Populated from config at startup and via the
|
||||
/// `terminal-watch` socket command.
|
||||
pub terminal: RwLock<BTreeMap<String, TerminalRecorder>>,
|
||||
pub store: std::sync::Mutex<Store>,
|
||||
pub scheduler: Mutex<crate::scheduler::Scheduler>,
|
||||
pub last_warm_at: RwLock<Option<chrono::DateTime<Utc>>>,
|
||||
|
|
@ -48,6 +53,7 @@ impl DaemonState {
|
|||
sessions: DashMap::new(),
|
||||
agents: DashMap::new(),
|
||||
glasspane: RwLock::new(PaneSupervisor::new()),
|
||||
terminal: RwLock::new(BTreeMap::new()),
|
||||
store: std::sync::Mutex::new(store),
|
||||
scheduler: Mutex::new(crate::scheduler::Scheduler::new()),
|
||||
last_warm_at: RwLock::new(None),
|
||||
|
|
@ -70,6 +76,7 @@ pub struct DaemonLoopConfig {
|
|||
pub memory_handoff_interval: Duration,
|
||||
pub agent_stall_timeout: Duration,
|
||||
pub scheduler_interval: Duration,
|
||||
pub terminal_capture_interval: Duration,
|
||||
}
|
||||
|
||||
impl Default for DaemonLoopConfig {
|
||||
|
|
@ -80,6 +87,7 @@ impl Default for DaemonLoopConfig {
|
|||
memory_handoff_interval: Duration::from_secs(120),
|
||||
agent_stall_timeout: Duration::from_secs(300),
|
||||
scheduler_interval: Duration::from_secs(30),
|
||||
terminal_capture_interval: Duration::from_secs(5),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -96,11 +104,16 @@ pub async fn run_loop(
|
|||
let mut rotation_tick = tokio::time::interval(loop_config.session_rotation_interval);
|
||||
let mut handoff_tick = tokio::time::interval(loop_config.memory_handoff_interval);
|
||||
let mut scheduler_tick = tokio::time::interval(loop_config.scheduler_interval);
|
||||
let mut terminal_tick = tokio::time::interval(loop_config.terminal_capture_interval);
|
||||
|
||||
heartbeat_tick.tick().await;
|
||||
rotation_tick.tick().await;
|
||||
handoff_tick.tick().await;
|
||||
scheduler_tick.tick().await;
|
||||
terminal_tick.tick().await;
|
||||
|
||||
// Seed terminal-capture watch list from config.
|
||||
seed_terminal_watches(&state).await;
|
||||
|
||||
info!("daemon background loop started");
|
||||
|
||||
|
|
@ -130,6 +143,9 @@ pub async fn run_loop(
|
|||
_ = rotation_tick.tick() => session_rotation(&state).await,
|
||||
_ = handoff_tick.tick() => memory_handoff(&state).await,
|
||||
_ = scheduler_tick.tick() => scheduler_tick_fn(&state).await,
|
||||
_ = terminal_tick.tick(), if state.config.terminal_capture_enabled => {
|
||||
terminal_capture_tick(&state).await;
|
||||
}
|
||||
_ = shutdown_rx.recv() => {
|
||||
info!("daemon loop received shutdown signal");
|
||||
break;
|
||||
|
|
@ -479,6 +495,125 @@ async fn scheduler_tick_fn(state: &SharedState) {
|
|||
poll_tasks(state).await;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Terminal capture — tmux pane history + signature alerts
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Register every `terminal_watch_targets` entry as a Linux-signature recorder
|
||||
/// (idempotent — existing recorders are left untouched).
|
||||
pub(crate) async fn seed_terminal_watches(state: &SharedState) {
|
||||
if state.config.terminal_watch_targets.is_empty() {
|
||||
return;
|
||||
}
|
||||
let mut map = state.terminal.write().await;
|
||||
for target in &state.config.terminal_watch_targets {
|
||||
map.entry(target.clone())
|
||||
.or_insert_with(|| TerminalRecorder::linux(target.clone()));
|
||||
}
|
||||
info!(
|
||||
watched = map.len(),
|
||||
enabled = state.config.terminal_capture_enabled,
|
||||
"terminal capture watch list seeded"
|
||||
);
|
||||
}
|
||||
|
||||
/// Capture and record every watched pane once. Per-pane capture errors (e.g. a
|
||||
/// pane that has gone away) are logged at debug and skipped — one bad target
|
||||
/// must not stall the others.
|
||||
async fn terminal_capture_tick(state: &SharedState) {
|
||||
let targets: Vec<String> = state.terminal.read().await.keys().cloned().collect();
|
||||
for target in targets {
|
||||
if let Err(e) = capture_and_record(state, &target).await {
|
||||
debug!(target = %target, error = %e, "terminal capture skipped");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Capture one watched pane, fold it into its recorder, and route any
|
||||
/// edge-triggered alerts. Returns the [`Observation`] so callers (the poll loop
|
||||
/// and the `terminal-poll` socket command) can report what happened.
|
||||
///
|
||||
/// The blocking `tmux capture-pane` runs on a blocking thread so it never holds
|
||||
/// the async runtime; the recorder lock is taken only briefly to fold the text.
|
||||
pub(crate) async fn capture_and_record(
|
||||
state: &SharedState,
|
||||
target: &str,
|
||||
) -> Result<Observation, String> {
|
||||
let owned = target.to_string();
|
||||
let raw = tokio::task::spawn_blocking(move || colibri_glasspane::capture_tmux_pane(&owned))
|
||||
.await
|
||||
.map_err(|e| format!("capture task failed: {e}"))?
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
let observation = {
|
||||
let mut map = state.terminal.write().await;
|
||||
let recorder = map
|
||||
.get_mut(target)
|
||||
.ok_or_else(|| format!("pane {target:?} is not being watched"))?;
|
||||
recorder.observe(&raw, SystemTime::now())
|
||||
};
|
||||
|
||||
if let Observation::Recorded { uuid, new_alerts } = &observation {
|
||||
if new_alerts.is_empty() {
|
||||
debug!(target = %target, uuid = %uuid, "terminal frame recorded");
|
||||
} else {
|
||||
let message = format_terminal_alerts(state, target, new_alerts);
|
||||
warn!(target = %target, uuid = %uuid, alerts = new_alerts.len(), "terminal alert fired");
|
||||
notify_telegram(state, &message).await;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(observation)
|
||||
}
|
||||
|
||||
/// Build a human-readable, Telegram-friendly alert message from edge-triggered
|
||||
/// signature matches.
|
||||
fn format_terminal_alerts(state: &SharedState, target: &str, alerts: &[SignatureMatch]) -> String {
|
||||
let mut lines = vec![format!(
|
||||
"⚠ colibri@{}: {} new alert(s) on tmux {}",
|
||||
state.config.host,
|
||||
alerts.len(),
|
||||
target
|
||||
)];
|
||||
for a in alerts {
|
||||
let severity = format!("{:?}", a.severity).to_uppercase();
|
||||
let invoke = a
|
||||
.invoke
|
||||
.as_deref()
|
||||
.map(|s| format!(" [fix: run `{s}`]"))
|
||||
.unwrap_or_default();
|
||||
lines.push(format!("• {severity} {} — {}{invoke}", a.id, a.next_action));
|
||||
}
|
||||
lines.join("\n")
|
||||
}
|
||||
|
||||
/// Send an alert to Telegram if a bot token + chat id are configured.
|
||||
/// No-op (debug log only) when unconfigured, so the feature degrades cleanly.
|
||||
async fn notify_telegram(state: &SharedState, text: &str) {
|
||||
let (Some(token), Some(chat_id)) = (
|
||||
state.config.telegram_bot_token.as_deref(),
|
||||
state.config.telegram_chat_id.as_deref(),
|
||||
) else {
|
||||
debug!("telegram alert not sent: bot token / chat id not configured");
|
||||
return;
|
||||
};
|
||||
|
||||
let client = reqwest::Client::new();
|
||||
let url = format!("https://api.telegram.org/bot{token}/sendMessage");
|
||||
let send = client
|
||||
.post(&url)
|
||||
.json(&serde_json::json!({ "chat_id": chat_id, "text": text }))
|
||||
.timeout(Duration::from_secs(10))
|
||||
.send()
|
||||
.await;
|
||||
|
||||
match send {
|
||||
Ok(resp) if resp.status().is_success() => debug!("telegram alert sent"),
|
||||
Ok(resp) => warn!(status = %resp.status(), "telegram alert rejected"),
|
||||
Err(e) => warn!(error = %e, "telegram alert send failed"),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
|
|
|||
|
|
@ -105,6 +105,31 @@ pub enum ColibriCommand {
|
|||
},
|
||||
#[serde(rename = "list-tenants")]
|
||||
ListTenants,
|
||||
// ── Terminal capture ──────────────────────────────────────
|
||||
/// Start recording a tmux pane's terminal text (deduped history +
|
||||
/// signature triage). `target` is any tmux target spec.
|
||||
#[serde(rename = "terminal-watch")]
|
||||
TerminalWatch { target: String },
|
||||
/// Stop recording a pane and drop its history.
|
||||
#[serde(rename = "terminal-unwatch")]
|
||||
TerminalUnwatch { target: String },
|
||||
/// List watched panes with frame counts and active alerts.
|
||||
#[serde(rename = "terminal-list")]
|
||||
TerminalList,
|
||||
/// Return recorded frames for a pane (most recent `limit`, default 20).
|
||||
#[serde(rename = "terminal-history")]
|
||||
TerminalHistory {
|
||||
target: String,
|
||||
#[serde(default)]
|
||||
limit: Option<usize>,
|
||||
},
|
||||
/// Capture watched panes immediately (one `target`, or all when omitted)
|
||||
/// instead of waiting for the poll tick.
|
||||
#[serde(rename = "terminal-poll")]
|
||||
TerminalPoll {
|
||||
#[serde(default)]
|
||||
target: Option<String>,
|
||||
},
|
||||
}
|
||||
|
||||
/// Outbound control-plane response.
|
||||
|
|
|
|||
|
|
@ -82,7 +82,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||
// Start the daemon background loop (heartbeat, session rotation, scheduler)
|
||||
let loop_state = state.clone();
|
||||
let loop_shutdown = state.shutdown_rx.resubscribe();
|
||||
let loop_config = daemon::DaemonLoopConfig::default();
|
||||
let loop_config = daemon::DaemonLoopConfig {
|
||||
terminal_capture_interval: std::time::Duration::from_secs(
|
||||
config.terminal_capture_interval_secs,
|
||||
),
|
||||
..daemon::DaemonLoopConfig::default()
|
||||
};
|
||||
let loop_handle = tokio::spawn(async move {
|
||||
daemon::run_loop(loop_state, loop_config, loop_shutdown).await;
|
||||
});
|
||||
|
|
|
|||
|
|
@ -363,6 +363,11 @@ mod tests {
|
|||
cache_warming_interval_hours: 0,
|
||||
headroom_enabled: false,
|
||||
headroom_socket_path: std::path::PathBuf::from("/var/run/colibri/headroom.sock"),
|
||||
terminal_capture_enabled: false,
|
||||
terminal_capture_interval_secs: 5,
|
||||
terminal_watch_targets: Vec::new(),
|
||||
telegram_bot_token: None,
|
||||
telegram_chat_id: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -285,6 +285,13 @@ async fn dispatch(cmd: ColibriCommand, state: &SharedState) -> ColibriResponse {
|
|||
collection_id,
|
||||
} => cmd_register_tenant(state, tenant_id, jail_root_path, collection_id).await,
|
||||
ColibriCommand::ListTenants => cmd_list_tenants(state).await,
|
||||
ColibriCommand::TerminalWatch { target } => cmd_terminal_watch(state, target).await,
|
||||
ColibriCommand::TerminalUnwatch { target } => cmd_terminal_unwatch(state, target).await,
|
||||
ColibriCommand::TerminalList => cmd_terminal_list(state).await,
|
||||
ColibriCommand::TerminalHistory { target, limit } => {
|
||||
cmd_terminal_history(state, target, limit).await
|
||||
}
|
||||
ColibriCommand::TerminalPoll { target } => cmd_terminal_poll(state, target).await,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -383,6 +390,115 @@ async fn cmd_glasspane_snapshot(state: &SharedState) -> ColibriResponse {
|
|||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Terminal capture commands
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
async fn cmd_terminal_watch(state: &SharedState, target: String) -> ColibriResponse {
|
||||
if target.trim().is_empty() {
|
||||
return ColibriResponse::err("target must not be empty");
|
||||
}
|
||||
let mut map = state.terminal.write().await;
|
||||
let already = map.contains_key(&target);
|
||||
map.entry(target.clone())
|
||||
.or_insert_with(|| colibri_glasspane::TerminalRecorder::linux(target.clone()));
|
||||
info!(target = %target, already, "terminal-watch");
|
||||
ColibriResponse::ok(serde_json::json!({
|
||||
"target": target,
|
||||
"watching": true,
|
||||
"already_watched": already,
|
||||
"capture_enabled": state.config.terminal_capture_enabled,
|
||||
"watched_count": map.len(),
|
||||
}))
|
||||
}
|
||||
|
||||
async fn cmd_terminal_unwatch(state: &SharedState, target: String) -> ColibriResponse {
|
||||
let removed = state.terminal.write().await.remove(&target).is_some();
|
||||
ColibriResponse::ok(serde_json::json!({
|
||||
"target": target,
|
||||
"removed": removed,
|
||||
}))
|
||||
}
|
||||
|
||||
async fn cmd_terminal_list(state: &SharedState) -> ColibriResponse {
|
||||
let map = state.terminal.read().await;
|
||||
let panes: Vec<_> = map
|
||||
.values()
|
||||
.map(|rec| {
|
||||
let latest = rec.latest();
|
||||
serde_json::json!({
|
||||
"target": rec.pane_id(),
|
||||
"frames": rec.len(),
|
||||
"active_alerts": rec.active_alerts().collect::<Vec<_>>(),
|
||||
"latest_uuid": latest.map(|f| f.uuid.clone()),
|
||||
"latest_at": latest.map(|f| f.observed_at.clone()),
|
||||
"latest_failures": latest.map(|f| f.detection.failures.len()).unwrap_or(0),
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
ColibriResponse::ok(serde_json::json!({
|
||||
"capture_enabled": state.config.terminal_capture_enabled,
|
||||
"interval_secs": state.config.terminal_capture_interval_secs,
|
||||
"panes": panes,
|
||||
}))
|
||||
}
|
||||
|
||||
async fn cmd_terminal_history(
|
||||
state: &SharedState,
|
||||
target: String,
|
||||
limit: Option<usize>,
|
||||
) -> ColibriResponse {
|
||||
let map = state.terminal.read().await;
|
||||
let Some(rec) = map.get(&target) else {
|
||||
return ColibriResponse::err(format!("pane {target:?} is not being watched"));
|
||||
};
|
||||
let frames: Vec<_> = rec.frames().collect();
|
||||
let n = limit.unwrap_or(20).min(frames.len());
|
||||
let tail = &frames[frames.len() - n..];
|
||||
match serde_json::to_value(tail) {
|
||||
Ok(value) => ColibriResponse::ok(serde_json::json!({
|
||||
"target": target,
|
||||
"total_frames": rec.len(),
|
||||
"returned": n,
|
||||
"frames": value,
|
||||
})),
|
||||
Err(e) => ColibriResponse::err(format!("history serialization failed: {e}")),
|
||||
}
|
||||
}
|
||||
|
||||
async fn cmd_terminal_poll(state: &SharedState, target: Option<String>) -> ColibriResponse {
|
||||
let targets: Vec<String> = match target {
|
||||
Some(t) => vec![t],
|
||||
None => state.terminal.read().await.keys().cloned().collect(),
|
||||
};
|
||||
if targets.is_empty() {
|
||||
return ColibriResponse::err("no panes are being watched");
|
||||
}
|
||||
|
||||
let mut results = Vec::new();
|
||||
for target in targets {
|
||||
let entry = match crate::daemon::capture_and_record(state, &target).await {
|
||||
Ok(colibri_glasspane::Observation::Unchanged) => serde_json::json!({
|
||||
"target": target,
|
||||
"status": "unchanged",
|
||||
}),
|
||||
Ok(colibri_glasspane::Observation::Recorded { uuid, new_alerts }) => serde_json::json!({
|
||||
"target": target,
|
||||
"status": "recorded",
|
||||
"uuid": uuid,
|
||||
"new_alerts": new_alerts,
|
||||
}),
|
||||
Err(e) => serde_json::json!({
|
||||
"target": target,
|
||||
"status": "error",
|
||||
"error": e,
|
||||
}),
|
||||
};
|
||||
results.push(entry);
|
||||
}
|
||||
ColibriResponse::ok(serde_json::json!({ "results": results }))
|
||||
}
|
||||
|
||||
async fn cmd_list_sessions(state: &SharedState) -> ColibriResponse {
|
||||
let mut sessions = Vec::new();
|
||||
for entry in state.sessions.iter() {
|
||||
|
|
@ -1031,6 +1147,11 @@ mod tests {
|
|||
cache_warming_interval_hours: 0,
|
||||
headroom_enabled: false,
|
||||
headroom_socket_path: std::path::PathBuf::from("/var/run/colibri/headroom.sock"),
|
||||
terminal_capture_enabled: false,
|
||||
terminal_capture_interval_secs: 5,
|
||||
terminal_watch_targets: Vec::new(),
|
||||
telegram_bot_token: None,
|
||||
telegram_chat_id: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1142,6 +1263,50 @@ mod tests {
|
|||
assert_eq!(pane.session_id.as_deref(), Some("pi-sample"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn terminal_watch_list_unwatch_roundtrip() {
|
||||
let state: SharedState = Arc::new(DaemonState::new(test_config()));
|
||||
|
||||
let watched = cmd_terminal_watch(&state, "smoketest:0".to_string()).await;
|
||||
assert!(watched.ok);
|
||||
assert_eq!(watched.data.unwrap()["already_watched"], false);
|
||||
|
||||
// Idempotent: watching again reports already_watched.
|
||||
let again = cmd_terminal_watch(&state, "smoketest:0".to_string()).await;
|
||||
assert_eq!(again.data.unwrap()["already_watched"], true);
|
||||
|
||||
let list = cmd_terminal_list(&state).await;
|
||||
let data = list.data.unwrap();
|
||||
assert_eq!(data["panes"].as_array().unwrap().len(), 1);
|
||||
assert_eq!(data["panes"][0]["target"], "smoketest:0");
|
||||
assert_eq!(data["panes"][0]["frames"], 0);
|
||||
|
||||
let removed = cmd_terminal_unwatch(&state, "smoketest:0".to_string()).await;
|
||||
assert_eq!(removed.data.unwrap()["removed"], true);
|
||||
let empty = cmd_terminal_list(&state).await;
|
||||
assert_eq!(empty.data.unwrap()["panes"].as_array().unwrap().len(), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn terminal_watch_rejects_empty_target() {
|
||||
let state: SharedState = Arc::new(DaemonState::new(test_config()));
|
||||
assert!(!cmd_terminal_watch(&state, " ".to_string()).await.ok);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn terminal_history_errors_for_unwatched_pane() {
|
||||
let state: SharedState = Arc::new(DaemonState::new(test_config()));
|
||||
let resp = cmd_terminal_history(&state, "ghost".to_string(), None).await;
|
||||
assert!(!resp.ok);
|
||||
assert!(resp.error.unwrap().contains("not being watched"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn terminal_poll_errors_when_nothing_watched() {
|
||||
let state: SharedState = Arc::new(DaemonState::new(test_config()));
|
||||
assert!(!cmd_terminal_poll(&state, None).await.ok);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn clear_stale_socket_returns_true_when_absent() {
|
||||
let dir = std::env::temp_dir().join(format!(
|
||||
|
|
|
|||
|
|
@ -10,3 +10,6 @@ chrono = { version = "0.4", default-features = false, features = ["clock"] }
|
|||
portable-pty = "0.9"
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
# Terminal-capture layer: content-hash frame ids + ANSI stripping.
|
||||
sha2 = "0.10"
|
||||
regex = "1"
|
||||
|
|
|
|||
|
|
@ -12,6 +12,14 @@
|
|||
//! is scaffolded with `portable-pty`; tests use sample JSONL readers until the
|
||||
//! FreeBSD validation lane exercises real terminals.
|
||||
|
||||
pub mod signatures;
|
||||
pub mod terminal;
|
||||
|
||||
pub use signatures::{Detection, Severity, Signature, SignatureMatch, SignatureSet};
|
||||
pub use terminal::{
|
||||
capture_tmux_pane, frame_uuid, strip_ansi, CapturedFrame, Observation, TerminalRecorder,
|
||||
};
|
||||
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
io::{self, BufRead, BufReader, Read, Write},
|
||||
|
|
@ -619,7 +627,7 @@ fn skip_false(value: &bool) -> bool {
|
|||
!*value
|
||||
}
|
||||
|
||||
fn system_time_to_rfc3339(time: SystemTime) -> String {
|
||||
pub(crate) fn system_time_to_rfc3339(time: SystemTime) -> String {
|
||||
let dt: DateTime<Utc> = time.into();
|
||||
dt.to_rfc3339_opts(SecondsFormat::Millis, true)
|
||||
}
|
||||
|
|
|
|||
361
crates/colibri-glasspane/src/signatures.rs
Normal file
361
crates/colibri-glasspane/src/signatures.rs
Normal file
|
|
@ -0,0 +1,361 @@
|
|||
//! Signature-based triage of captured terminal text.
|
||||
//!
|
||||
//! A [`SignatureSet`] scans stripped terminal text for known patterns and
|
||||
//! classifies the screen into `failures` / `warnings` / `info` / `healthy`.
|
||||
//! Each match carries a human `next_action` and an optional `invoke` (a skill
|
||||
//! the agent can run to remediate) — so a hit is not just "something happened"
|
||||
//! but "here is what it means and what to do".
|
||||
//!
|
||||
//! The detection engine is data-driven: the FreeBSD host and a Linux host load
|
||||
//! different [`Signature`] sets but share this matcher. This is the per-OS knob
|
||||
//! that the rest of Colibri's capability-routing can lean on. [`SignatureSet::linux_default`]
|
||||
//! ships a small, high-value starter set; callers can build their own with
|
||||
//! [`SignatureSet::new`].
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// How serious a matched signature is. Mirrors the Python skill's taxonomy
|
||||
/// (`error` / `warn` / `info` for failure patterns, `ok` for success patterns).
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum Severity {
|
||||
/// Known-broken state needing action.
|
||||
Error,
|
||||
/// Suspicious but not fatal; worth surfacing.
|
||||
Warn,
|
||||
/// Benign/expected note.
|
||||
Info,
|
||||
/// Known-healthy state — confirms things work.
|
||||
Ok,
|
||||
}
|
||||
|
||||
/// One known terminal-text pattern and what it means.
|
||||
///
|
||||
/// `patterns` are matched as **case-insensitive substrings**; the first one
|
||||
/// that hits records the signature (further patterns are not reported twice).
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct Signature {
|
||||
pub id: String,
|
||||
pub severity: Severity,
|
||||
/// Which skill/domain owns this knowledge (provenance for the operator).
|
||||
pub source: String,
|
||||
pub patterns: Vec<String>,
|
||||
/// Plain-language remediation hint.
|
||||
pub next_action: String,
|
||||
/// Optional skill to auto-invoke to fix the condition.
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub invoke: Option<String>,
|
||||
}
|
||||
|
||||
impl Signature {
|
||||
/// Convenience builder for a failure/info signature without an `invoke`.
|
||||
pub fn new(
|
||||
id: impl Into<String>,
|
||||
severity: Severity,
|
||||
source: impl Into<String>,
|
||||
patterns: impl IntoIterator<Item = impl Into<String>>,
|
||||
next_action: impl Into<String>,
|
||||
) -> Self {
|
||||
Self {
|
||||
id: id.into(),
|
||||
severity,
|
||||
source: source.into(),
|
||||
patterns: patterns.into_iter().map(Into::into).collect(),
|
||||
next_action: next_action.into(),
|
||||
invoke: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Attach a remediation skill to auto-invoke.
|
||||
pub fn with_invoke(mut self, invoke: impl Into<String>) -> Self {
|
||||
self.invoke = Some(invoke.into());
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
/// A single signature hit against captured text.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct SignatureMatch {
|
||||
pub id: String,
|
||||
pub severity: Severity,
|
||||
pub source: String,
|
||||
/// The exact pattern string that matched.
|
||||
pub matched_text: String,
|
||||
pub next_action: String,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub invoke: Option<String>,
|
||||
}
|
||||
|
||||
/// The classified result of scanning one screen of text.
|
||||
///
|
||||
/// Buckets are filled in declaration order of the signatures; `failures` are
|
||||
/// the ones the agent should act on before acknowledging `healthy` state.
|
||||
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct Detection {
|
||||
pub failures: Vec<SignatureMatch>,
|
||||
pub warnings: Vec<SignatureMatch>,
|
||||
pub info: Vec<SignatureMatch>,
|
||||
pub healthy: Vec<SignatureMatch>,
|
||||
}
|
||||
|
||||
impl Detection {
|
||||
/// True when nothing matched at all.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.failures.is_empty()
|
||||
&& self.warnings.is_empty()
|
||||
&& self.info.is_empty()
|
||||
&& self.healthy.is_empty()
|
||||
}
|
||||
|
||||
/// True when at least one `error`-severity signature fired.
|
||||
pub fn has_failures(&self) -> bool {
|
||||
!self.failures.is_empty()
|
||||
}
|
||||
|
||||
/// All `error`/`warn` matches — the ones that should drive an alert.
|
||||
/// (Edge-triggered de-duplication is the recorder's job, not this method's.)
|
||||
pub fn alertable(&self) -> impl Iterator<Item = &SignatureMatch> {
|
||||
self.failures.iter().chain(self.warnings.iter())
|
||||
}
|
||||
|
||||
/// Flat list of every match across all buckets (failures → warnings →
|
||||
/// info → healthy), for compact JSON metadata.
|
||||
pub fn all(&self) -> Vec<SignatureMatch> {
|
||||
let mut out = Vec::new();
|
||||
out.extend(self.failures.iter().cloned());
|
||||
out.extend(self.warnings.iter().cloned());
|
||||
out.extend(self.info.iter().cloned());
|
||||
out.extend(self.healthy.iter().cloned());
|
||||
out
|
||||
}
|
||||
}
|
||||
|
||||
/// An ordered collection of signatures and the matcher over them.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct SignatureSet {
|
||||
signatures: Vec<Signature>,
|
||||
}
|
||||
|
||||
impl SignatureSet {
|
||||
/// Build a set from an explicit list of signatures.
|
||||
pub fn new(signatures: impl IntoIterator<Item = Signature>) -> Self {
|
||||
Self {
|
||||
signatures: signatures.into_iter().collect(),
|
||||
}
|
||||
}
|
||||
|
||||
/// An empty set — useful for capture-only history with no triage.
|
||||
pub fn empty() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.signatures.len()
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.signatures.is_empty()
|
||||
}
|
||||
|
||||
pub fn signatures(&self) -> &[Signature] {
|
||||
&self.signatures
|
||||
}
|
||||
|
||||
/// Scan `text` and classify every matching signature.
|
||||
///
|
||||
/// Matching is case-insensitive substring containment; for each signature
|
||||
/// only the first matching pattern is recorded (mirrors the Python skill).
|
||||
pub fn detect(&self, text: &str) -> Detection {
|
||||
let lowered = text.to_lowercase();
|
||||
let mut detection = Detection::default();
|
||||
|
||||
for sig in &self.signatures {
|
||||
let Some(pattern) = sig
|
||||
.patterns
|
||||
.iter()
|
||||
.find(|p| lowered.contains(&p.to_lowercase()))
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let hit = SignatureMatch {
|
||||
id: sig.id.clone(),
|
||||
severity: sig.severity,
|
||||
source: sig.source.clone(),
|
||||
matched_text: pattern.clone(),
|
||||
next_action: sig.next_action.clone(),
|
||||
invoke: sig.invoke.clone(),
|
||||
};
|
||||
|
||||
match sig.severity {
|
||||
Severity::Error => detection.failures.push(hit),
|
||||
Severity::Warn => detection.warnings.push(hit),
|
||||
Severity::Info => detection.info.push(hit),
|
||||
Severity::Ok => detection.healthy.push(hit),
|
||||
}
|
||||
}
|
||||
|
||||
detection
|
||||
}
|
||||
|
||||
/// A small, high-value starter set for Linux hosts (systemd / kernel /
|
||||
/// container / network). Deliberately conservative — extend per deployment
|
||||
/// rather than matching broad words like "error" that produce false hits.
|
||||
pub fn linux_default() -> Self {
|
||||
Self::new([
|
||||
// ── failures ──────────────────────────────────────────────
|
||||
Signature::new(
|
||||
"systemd_unit_failed",
|
||||
Severity::Error,
|
||||
"systemd",
|
||||
["active: failed", "code=exited, status=1/failure", "failed to start"],
|
||||
"A systemd unit failed. Inspect with `systemctl status <unit>` and `journalctl -u <unit> -e`.",
|
||||
)
|
||||
.with_invoke("systemd-admin"),
|
||||
Signature::new(
|
||||
"oom_killer",
|
||||
Severity::Error,
|
||||
"kernel",
|
||||
["out of memory: killed process", "invoked oom-killer"],
|
||||
"The kernel OOM-killer reaped a process. Reduce memory pressure or raise limits/cgroup memory.",
|
||||
),
|
||||
Signature::new(
|
||||
"disk_full",
|
||||
Severity::Error,
|
||||
"filesystem",
|
||||
["no space left on device"],
|
||||
"A filesystem is full. Free space or grow the volume; check `df -h` and large logs.",
|
||||
),
|
||||
Signature::new(
|
||||
"docker_container_exited_error",
|
||||
Severity::Error,
|
||||
"docker",
|
||||
["exited (1)", "exited (137)", "exited (139)"],
|
||||
"A container exited non-zero. Check `docker logs <id>`; 137 = OOM/kill, 139 = segfault.",
|
||||
),
|
||||
Signature::new(
|
||||
"ip_forwarding_disabled",
|
||||
Severity::Error,
|
||||
"network",
|
||||
["net.ipv4.ip_forward = 0"],
|
||||
"IPv4 forwarding is off. Enable with `sysctl -w net.ipv4.ip_forward=1` and persist in sysctl.d.",
|
||||
),
|
||||
Signature::new(
|
||||
"firewall_drop_policy",
|
||||
Severity::Warn,
|
||||
"network",
|
||||
["chain input (policy drop", "policy drop 0 packets"],
|
||||
"Default INPUT policy is DROP. Confirm intended; add explicit pass rules for required services.",
|
||||
),
|
||||
// ── warnings ──────────────────────────────────────────────
|
||||
Signature::new(
|
||||
"connection_refused",
|
||||
Severity::Warn,
|
||||
"network",
|
||||
["connection refused"],
|
||||
"A connection was refused — the target service may be down or not listening on that port.",
|
||||
),
|
||||
Signature::new(
|
||||
"permission_denied",
|
||||
Severity::Warn,
|
||||
"filesystem",
|
||||
["permission denied"],
|
||||
"Permission denied. Check file ownership/mode or whether the command needs elevated rights.",
|
||||
),
|
||||
// ── healthy ───────────────────────────────────────────────
|
||||
Signature::new(
|
||||
"systemd_active_running",
|
||||
Severity::Ok,
|
||||
"systemd",
|
||||
["active: active (running)"],
|
||||
"Service is active and running.",
|
||||
),
|
||||
Signature::new(
|
||||
"ip_forwarding_enabled",
|
||||
Severity::Ok,
|
||||
"network",
|
||||
["net.ipv4.ip_forward = 1"],
|
||||
"IPv4 forwarding is enabled.",
|
||||
),
|
||||
Signature::new(
|
||||
"zpool_online",
|
||||
Severity::Ok,
|
||||
"zfs",
|
||||
["state: online"],
|
||||
"ZFS pool is healthy (ONLINE).",
|
||||
),
|
||||
])
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// Run the Linux default signature set over `text`.
    fn scan(text: &str) -> Detection {
        SignatureSet::linux_default().detect(text)
    }

    #[test]
    fn detect_classifies_into_buckets() {
        let found = scan("● nginx.service\n Active: failed (Result: exit-code)\nnet.ipv4.ip_forward = 1");

        // Exactly one failure, carrying the systemd signature and its invoke hint.
        assert_eq!(found.failures.len(), 1);
        let failure = &found.failures[0];
        assert_eq!(failure.id, "systemd_unit_failed");
        assert_eq!(failure.invoke.as_deref(), Some("systemd-admin"));

        // Exactly one healthy match, from the forwarding sysctl line.
        assert_eq!(found.healthy.len(), 1);
        assert_eq!(found.healthy[0].id, "ip_forwarding_enabled");

        assert!(found.has_failures());
        assert!(!found.is_empty());
    }

    #[test]
    fn detect_is_case_insensitive() {
        let found = scan("NO SPACE LEFT ON DEVICE while writing /var/log");
        assert_eq!(found.failures.len(), 1);
        assert_eq!(found.failures[0].id, "disk_full");
    }

    #[test]
    fn only_first_matching_pattern_recorded_per_signature() {
        // Two docker patterns present, but the signature should record once.
        let found = scan("web exited (1)\ndb exited (137)");
        assert_eq!(found.failures.len(), 1);
        let failure = &found.failures[0];
        assert_eq!(failure.id, "docker_container_exited_error");
        assert_eq!(failure.matched_text, "exited (1)");
    }

    #[test]
    fn clean_screen_yields_empty_detection() {
        let found = scan("$ ls\nCargo.toml src target\n$ ");
        assert!(found.is_empty());
        assert!(!found.has_failures());
    }

    #[test]
    fn alertable_covers_failures_and_warnings_only() {
        let found = scan("Active: failed\nconnection refused\nActive: active (running)");
        let ids: Vec<&str> = found.alertable().map(|hit| hit.id.as_str()).collect();
        assert!(ids.contains(&"systemd_unit_failed"));
        assert!(ids.contains(&"connection_refused"));
        // Healthy matches must never be alertable.
        assert!(!ids.contains(&"systemd_active_running"));
    }

    #[test]
    fn empty_set_matches_nothing() {
        let none = SignatureSet::empty();
        assert!(none.is_empty());
        assert!(none.detect("Active: failed\nout of memory: killed process").is_empty());
    }

    #[test]
    fn detection_round_trips_json() {
        let original = scan("Active: failed");
        let json = serde_json::to_string(&original).unwrap();
        let decoded: Detection = serde_json::from_str(&json).unwrap();
        assert_eq!(original, decoded);
        // Severity is serialized in lowercase.
        assert!(json.contains("\"error\""));
    }
}
|
||||
362
crates/colibri-glasspane/src/terminal.rs
Normal file
362
crates/colibri-glasspane/src/terminal.rs
Normal file
|
|
@ -0,0 +1,362 @@
|
|||
//! Terminal-text capture, content-hash framing, and deduplicated history.
|
||||
//!
|
||||
//! This is the complementary half of Glasspane's event model: where the rest
|
||||
//! of the crate derives agent state from structured JSONL events, this module
|
||||
//! records the **actual terminal text** of a pane and triages it with
|
||||
//! [`SignatureSet`](crate::signatures::SignatureSet).
|
||||
//!
|
||||
//! The key bet is the same one the Python `tmux-screenshot` skill made: a
|
||||
//! frame's identity is the **SHA-256 of its stripped text**. That makes
|
||||
//! "record pane history" cheap — a [`TerminalRecorder`] drops any frame whose
|
||||
//! hash equals the previous one, so polling a near-static pane every second
|
||||
//! collapses into a compact log of *actual* state transitions rather than
|
||||
//! thousands of duplicate frames.
|
||||
//!
|
||||
//! Alerts are **edge-triggered**: a failure/warning signature is reported only
|
||||
//! on the frame where it first appears, not on every subsequent frame that
|
||||
//! still shows it. When the condition clears and later recurs, it fires again.
|
||||
//!
|
||||
//! I/O is kept at the edges. [`capture_tmux_pane`] shells out to `tmux`, but
|
||||
//! [`TerminalRecorder::observe`] takes raw text directly so the dedup/triage
|
||||
//! logic is fully testable without a terminal (mirroring how PTY launch is
|
||||
//! separated from the event state model).
|
||||
|
||||
use std::{
|
||||
collections::{BTreeSet, VecDeque},
|
||||
io,
|
||||
process::Command,
|
||||
sync::OnceLock,
|
||||
time::SystemTime,
|
||||
};
|
||||
|
||||
use regex::Regex;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sha2::{Digest, Sha256};
|
||||
|
||||
use crate::signatures::{Detection, SignatureMatch, SignatureSet};
|
||||
|
||||
/// How many frames a recorder retains by default; once that many are held,
/// recording a new frame evicts the oldest one.
pub const DEFAULT_HISTORY_CAPACITY: usize = 256;
|
||||
|
||||
/// One CSI escape sequence (SGR colors, cursor moves, etc.).
|
||||
fn csi_re() -> &'static Regex {
|
||||
static RE: OnceLock<Regex> = OnceLock::new();
|
||||
RE.get_or_init(|| Regex::new(r"\x1b\[[0-9;?]*[A-Za-z]").unwrap())
|
||||
}
|
||||
|
||||
/// Control characters to drop, preserving tab (`\x09`) and newline (`\x0a`).
|
||||
/// Includes a stray ESC (`\x1b`) left over from non-CSI sequences.
|
||||
fn ctrl_re() -> &'static Regex {
|
||||
static RE: OnceLock<Regex> = OnceLock::new();
|
||||
RE.get_or_init(|| Regex::new(r"[\x00-\x08\x0b-\x1f\x7f]").unwrap())
|
||||
}
|
||||
|
||||
/// Strip ANSI escape sequences and control characters from terminal output,
|
||||
/// leaving plain text suitable for hashing, diffing, and signature matching.
|
||||
/// Tabs and newlines are preserved.
|
||||
pub fn strip_ansi(text: &str) -> String {
|
||||
let no_csi = csi_re().replace_all(text, "");
|
||||
ctrl_re().replace_all(&no_csi, "").into_owned()
|
||||
}
|
||||
|
||||
/// Content-addressed frame id: the first 12 hex chars of `SHA-256(stripped)`.
|
||||
/// Identical screens produce identical ids — the basis for history dedup and
|
||||
/// for cheap verification (the id *is* the checksum of the text).
|
||||
pub fn frame_uuid(stripped: &str) -> String {
|
||||
let digest = Sha256::digest(stripped.as_bytes());
|
||||
let mut hex = String::with_capacity(12);
|
||||
for byte in digest.iter().take(6) {
|
||||
hex.push_str(&format!("{byte:02x}"));
|
||||
}
|
||||
hex
|
||||
}
|
||||
|
||||
/// One recorded screen of terminal text plus its triage result.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct CapturedFrame {
|
||||
/// `frame_uuid` of `text` — also the dedup key.
|
||||
pub uuid: String,
|
||||
/// ANSI-stripped terminal text.
|
||||
pub text: String,
|
||||
/// RFC3339 capture time (matches the Glasspane snapshot convention).
|
||||
pub observed_at: String,
|
||||
/// Signatures detected in this frame.
|
||||
pub detection: Detection,
|
||||
}
|
||||
|
||||
/// Outcome of feeding one capture to a [`TerminalRecorder`].
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum Observation {
|
||||
/// The screen was identical to the last recorded frame — dropped, no
|
||||
/// history entry, no alerts.
|
||||
Unchanged,
|
||||
/// The screen changed and was recorded.
|
||||
Recorded {
|
||||
/// Content hash of the new frame.
|
||||
uuid: String,
|
||||
/// Failure/warning signatures that *just appeared* (edge-triggered).
|
||||
/// Empty when nothing newly fired, even if the frame still shows a
|
||||
/// previously-reported condition.
|
||||
new_alerts: Vec<SignatureMatch>,
|
||||
},
|
||||
}
|
||||
|
||||
impl Observation {
|
||||
/// True when this observation produced a new history frame.
|
||||
pub fn is_recorded(&self) -> bool {
|
||||
matches!(self, Observation::Recorded { .. })
|
||||
}
|
||||
|
||||
/// Newly-fired alerts, if any (empty for `Unchanged`).
|
||||
pub fn new_alerts(&self) -> &[SignatureMatch] {
|
||||
match self {
|
||||
Observation::Recorded { new_alerts, .. } => new_alerts,
|
||||
Observation::Unchanged => &[],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A deduplicated, signature-triaged history of one pane's terminal text.
|
||||
///
|
||||
/// Feed it raw captures with [`observe`](Self::observe); it keeps only frames
|
||||
/// whose content actually changed and reports failure/warning signatures on
|
||||
/// their rising edge. Bounded to `capacity` frames (ring buffer).
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TerminalRecorder {
|
||||
pane_id: String,
|
||||
capacity: usize,
|
||||
signatures: SignatureSet,
|
||||
last_hash: Option<String>,
|
||||
/// Failure/warning signature ids firing as of the last recorded frame.
|
||||
active: BTreeSet<String>,
|
||||
frames: VecDeque<CapturedFrame>,
|
||||
}
|
||||
|
||||
impl TerminalRecorder {
|
||||
/// Create a recorder for `pane_id` with an explicit signature set and
|
||||
/// history capacity.
|
||||
pub fn new(pane_id: impl Into<String>, signatures: SignatureSet, capacity: usize) -> Self {
|
||||
Self {
|
||||
pane_id: pane_id.into(),
|
||||
capacity: capacity.max(1),
|
||||
signatures,
|
||||
last_hash: None,
|
||||
active: BTreeSet::new(),
|
||||
frames: VecDeque::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Recorder with the Linux default signature set and default capacity.
|
||||
pub fn linux(pane_id: impl Into<String>) -> Self {
|
||||
Self::new(pane_id, SignatureSet::linux_default(), DEFAULT_HISTORY_CAPACITY)
|
||||
}
|
||||
|
||||
pub fn pane_id(&self) -> &str {
|
||||
&self.pane_id
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.frames.len()
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.frames.is_empty()
|
||||
}
|
||||
|
||||
/// All retained frames, oldest first.
|
||||
pub fn frames(&self) -> impl Iterator<Item = &CapturedFrame> {
|
||||
self.frames.iter()
|
||||
}
|
||||
|
||||
/// The most recently recorded frame, if any.
|
||||
pub fn latest(&self) -> Option<&CapturedFrame> {
|
||||
self.frames.back()
|
||||
}
|
||||
|
||||
/// Failure/warning signature ids currently considered "firing".
|
||||
pub fn active_alerts(&self) -> impl Iterator<Item = &str> {
|
||||
self.active.iter().map(String::as_str)
|
||||
}
|
||||
|
||||
/// Record one raw terminal capture observed at `observed_at`.
|
||||
///
|
||||
/// Returns [`Observation::Unchanged`] if the stripped text hashes to the
|
||||
/// same id as the previous frame (dropped), otherwise
|
||||
/// [`Observation::Recorded`] with any edge-triggered alerts.
|
||||
pub fn observe(&mut self, raw: &str, observed_at: SystemTime) -> Observation {
|
||||
let text = strip_ansi(raw);
|
||||
let uuid = frame_uuid(&text);
|
||||
|
||||
if self.last_hash.as_deref() == Some(uuid.as_str()) {
|
||||
return Observation::Unchanged;
|
||||
}
|
||||
|
||||
let detection = self.signatures.detect(&text);
|
||||
|
||||
// Edge-trigger: alertable signatures not firing as of the last frame.
|
||||
let current: BTreeSet<String> =
|
||||
detection.alertable().map(|m| m.id.clone()).collect();
|
||||
let new_alerts: Vec<SignatureMatch> = detection
|
||||
.alertable()
|
||||
.filter(|m| !self.active.contains(&m.id))
|
||||
.cloned()
|
||||
.collect();
|
||||
self.active = current;
|
||||
|
||||
let frame = CapturedFrame {
|
||||
uuid: uuid.clone(),
|
||||
text,
|
||||
observed_at: crate::system_time_to_rfc3339(observed_at),
|
||||
detection,
|
||||
};
|
||||
self.frames.push_back(frame);
|
||||
if self.frames.len() > self.capacity {
|
||||
self.frames.pop_front();
|
||||
}
|
||||
self.last_hash = Some(uuid.clone());
|
||||
|
||||
Observation::Recorded { uuid, new_alerts }
|
||||
}
|
||||
|
||||
/// Capture the named tmux target and record it. Convenience wrapper over
|
||||
/// [`capture_tmux_pane`] + [`observe`](Self::observe) for the daemon's
|
||||
/// poll loop. Errors propagate the tmux capture failure.
|
||||
pub fn observe_tmux(&mut self, target: &str, observed_at: SystemTime) -> io::Result<Observation> {
|
||||
let raw = capture_tmux_pane(target)?;
|
||||
Ok(self.observe(&raw, observed_at))
|
||||
}
|
||||
}
|
||||
|
||||
/// Run `tmux capture-pane -e -p -t <target>` and return what it printed.
///
/// `target` is any tmux target spec: `session`, `session:window`,
/// `session:window.pane`, or an absolute pane id like `%3`. The output still
/// contains escape sequences (`-e`); bytes that are not valid UTF-8 are
/// replaced lossily. Requires `tmux` on `PATH`.
///
/// # Errors
///
/// Returns the spawn error if `tmux` cannot be started, or an
/// [`io::Error`] carrying tmux's trimmed stderr if it exits unsuccessfully.
pub fn capture_tmux_pane(target: &str) -> io::Result<String> {
    let mut tmux = Command::new("tmux");
    tmux.arg("capture-pane")
        .arg("-e")
        .arg("-p")
        .arg("-t")
        .arg(target);
    let output = tmux.output()?;

    if output.status.success() {
        return Ok(String::from_utf8_lossy(&output.stdout).into_owned());
    }

    let stderr = String::from_utf8_lossy(&output.stderr);
    Err(io::Error::other(format!(
        "tmux capture-pane failed for {target:?}: {}",
        stderr.trim()
    )))
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// A fixed timestamp `secs` seconds after the Unix epoch.
    fn t(secs: u64) -> SystemTime {
        SystemTime::UNIX_EPOCH + Duration::from_secs(secs)
    }

    /// Hash of `raw` after ANSI stripping, as the recorder would compute it.
    fn stripped_id(raw: &str) -> String {
        frame_uuid(&strip_ansi(raw))
    }

    #[test]
    fn strip_ansi_removes_color_and_control_keeps_text() {
        // green "ok", reset, then a cursor-home CSI and a stray carriage return.
        let stripped = strip_ansi("\x1b[32mok\x1b[0m\x1b[Hdone\r\n");
        assert_eq!(stripped, "okdone\n");
    }

    #[test]
    fn strip_ansi_preserves_tabs_and_newlines() {
        let plain = "a\tb\nc";
        assert_eq!(strip_ansi(plain), plain);
    }

    #[test]
    fn frame_uuid_is_stable_and_12_hex() {
        let first = frame_uuid("hello world");
        let second = frame_uuid("hello world");
        assert_eq!(first, second);
        assert_eq!(first.len(), 12);
        assert!(first.chars().all(|c| c.is_ascii_hexdigit()));
        assert_ne!(frame_uuid("hello world"), frame_uuid("hello worle"));
    }

    #[test]
    fn uuid_ignores_color_only_differences() {
        // Same text, different coloring → same frame id (dedup works on content).
        assert_eq!(
            stripped_id("\x1b[31mALERT\x1b[0m"),
            stripped_id("\x1b[33mALERT\x1b[0m")
        );
    }

    #[test]
    fn identical_screen_is_dropped() {
        let mut recorder = TerminalRecorder::linux("pane-1");
        assert!(recorder.observe("$ idle prompt\n", t(1)).is_recorded());
        assert_eq!(recorder.observe("$ idle prompt\n", t(2)), Observation::Unchanged);
        assert_eq!(recorder.len(), 1, "duplicate frame must not be stored");
    }

    #[test]
    fn changed_screen_is_recorded() {
        let mut recorder = TerminalRecorder::linux("pane-1");
        recorder.observe("frame one\n", t(1));
        recorder.observe("frame two\n", t(2));
        assert_eq!(recorder.len(), 2);
        let newest = recorder.latest().unwrap();
        assert_eq!(newest.text, "frame two\n");
        assert_eq!(newest.observed_at, "1970-01-01T00:00:02.000Z");
    }

    #[test]
    fn failure_alert_is_edge_triggered() {
        let mut recorder = TerminalRecorder::linux("pane-1");

        // Rising edge: failure appears → alert fires once.
        let rising = recorder.observe("boot log\nActive: failed\n", t(1));
        assert_eq!(rising.new_alerts().len(), 1);
        assert_eq!(rising.new_alerts()[0].id, "systemd_unit_failed");

        // Different screen, failure still present → no repeat alert.
        let persisting = recorder.observe("boot log\nActive: failed\nstill broken\n", t(2));
        assert!(persisting.is_recorded());
        assert_eq!(persisting.new_alerts().len(), 0, "persisting failure must not re-alert");

        // Condition clears.
        let cleared = recorder.observe("Active: active (running)\n", t(3));
        assert_eq!(cleared.new_alerts().len(), 0);
        assert!(recorder.active_alerts().next().is_none());

        // Recurs → fires again (falling then rising edge).
        let recurring = recorder.observe("Active: failed\nagain\n", t(4));
        assert_eq!(recurring.new_alerts().len(), 1);
        assert_eq!(recurring.new_alerts()[0].id, "systemd_unit_failed");
    }

    #[test]
    fn ring_buffer_evicts_oldest() {
        let mut recorder = TerminalRecorder::new("pane-1", SignatureSet::empty(), 3);
        for i in 0..5 {
            recorder.observe(&format!("frame {i}\n"), t(i));
        }
        assert_eq!(recorder.len(), 3);
        let texts: Vec<&str> = recorder.frames().map(|frame| frame.text.as_str()).collect();
        assert_eq!(texts, ["frame 2\n", "frame 3\n", "frame 4\n"]);
    }

    #[test]
    fn capacity_floor_is_one() {
        let mut recorder = TerminalRecorder::new("p", SignatureSet::empty(), 0);
        recorder.observe("a\n", t(1));
        recorder.observe("b\n", t(2));
        assert_eq!(recorder.len(), 1);
    }

    #[test]
    fn frame_carries_detection_for_storage() {
        let mut recorder = TerminalRecorder::linux("pane-1");
        recorder.observe("no space left on device\n", t(1));
        let stored = recorder.latest().unwrap();
        assert!(stored.detection.has_failures());

        // Frame is serializable for history persistence.
        let json = serde_json::to_string(stored).unwrap();
        let decoded: CapturedFrame = serde_json::from_str(&json).unwrap();
        assert_eq!(*stored, decoded);
    }
}
|
||||
Loading…
Add table
Reference in a new issue