colibri/crates/colibri-glasspane/src/lib.rs
Sam & Claude 7bc9483156
Some checks failed
CI / rust (pull_request) Has been cancelled
CI / markdown (pull_request) Has been cancelled
CI / port (pull_request) Has been cancelled
CI / agent-jail-pkgs (pull_request) Has been cancelled
refactor(glasspane): pi_type → event_type; close out wiki residue
Residue item #1: rename the pi-era `pi_type` field/param to `event_type` in
colibri-glasspane. It names the normalized event-type string (zot events map
onto the same taxonomy), so the harness-neutral name is correct. Internal only
— PiStreamUpdate is not serialized — so no wire impact.

Wiki ledger updated:
- pi_type → event_type added to Shipped (now enforced by wiki-lint).
- Residue items resolved and recorded under Structural decisions:
  - FEATURE_COLIBRI is an internal build-time escape hatch, not a user-facing
    flag — README clarified (clawdie-iso #130).
  - clawdie-gui is the stable operator command; clawdie-startx retained as a
    back-compat alias (both installed) — verified intentional, not drift.
- Known residue now down to the dangling ADR reference only.

Verified: ci-checks.sh green (fmt/clippy/test/markdown); wiki-lint --strict clean.
2026-06-24 10:44:15 +02:00

1135 lines
38 KiB
Rust

//! colibri-glasspane — FreeBSD-native agent supervision ("the radar").
//!
//! Sessions, panes, and a semantic agent-state per pane, exposed behind
//! Colibri's unified API.
//!
//! Key bet: agent state is **derived deterministically from agent events**
//! (zot/pi `--mode json` JSONL events), not by screen-scraping terminal output.
//!
//! Phase 3 starts the supervision layer: Colibri-owned pane ids are distinct
//! from Pi session ids, ingestion is streaming, `last_event_at` is kept as real
//! time internally, and `stalled` is derived from event silence. Live PTY launch
//! is scaffolded with `portable-pty`; tests use sample JSONL readers until the
//! FreeBSD validation lane exercises real terminals.
use std::{
collections::BTreeMap,
io::{self, BufRead, BufReader, Read, Write},
path::PathBuf,
time::{Duration, SystemTime},
};
use chrono::{DateTime, SecondsFormat, Utc};
use portable_pty::{native_pty_system, Child, CommandBuilder, PtySize};
use serde::{Deserialize, Serialize};
use serde_json::Value;
pub const GLASSPANE_SNAPSHOT_SCHEMA: &str = "clawdie.glasspane.snapshot.v1";
pub const DEFAULT_STALL_AFTER: Duration = Duration::from_secs(4 * 60 * 60);
pub type PaneId = String;
/// The semantic state of a supervised agent — the 5-state model.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum AgentState {
/// Ready, between turns.
#[default]
Idle,
/// Actively producing (turn/message/tool in progress).
Working,
/// Waiting on steering / approval / input.
Blocked,
/// Turn or agent finished.
Done,
/// Failed or retries exhausted.
Error,
}
/// Fold one Pi `--mode json` event (its raw `type` field) into the agent state.
/// Mirrors the colibri-pi-events taxonomy; unknown types preserve the state
/// (forward-compatible with new Pi event kinds).
pub fn apply_pi_event(state: AgentState, event_type: &str) -> AgentState {
match event_type {
"session" | "session_started" => AgentState::Idle,
"agent_start"
| "turn_start"
| "message_start"
| "message_update"
| "message_end"
| "tool_execution_start"
| "tool_execution_update"
| "tool_execution_end"
| "auto_compaction_start"
| "auto_compaction_end"
| "compaction_start"
| "compaction_end"
| "auto_retry_start"
| "auto_retry_end" => AgentState::Working,
// Pending steering / follow-up / approval — the operator's attention is needed.
"queue_update" => AgentState::Blocked,
"turn_end" | "agent_end" => AgentState::Done,
"error" => AgentState::Error,
// Unknown / unmodeled event → state unchanged.
_ => state,
}
}
/// Fold a sequence of Pi event types into a final state (starting from `Idle`).
pub fn fold_pi_events<'a, I>(events: I) -> AgentState
where
I: IntoIterator<Item = &'a str>,
{
events.into_iter().fold(AgentState::Idle, apply_pi_event)
}
// ---------------------------------------------------------------------------
// zot runtime — normalized event taxonomy (Phase 1 scaffold)
// ---------------------------------------------------------------------------
/// Supported agent runtimes. Pi is the current default; zot RPC is the second
/// harness being integrated via a normalized event layer.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AgentRuntime {
#[default]
Pi,
Zot,
Local,
}
/// Parse one line of zot RPC NDJSON output and return a normalized event type.
/// Only the `type` field is used — unknown/missing types return `None`.
/// Permissive: zot emits `tool_use_start`, `tool_use_args`, `tool_use_end` in
/// addition to `tool_call`, `tool_progress`, `tool_result` during tool loops.
pub fn zot_event_type(line: &str) -> Option<String> {
let value: Value = serde_json::from_str(line.trim()).ok()?;
let ty = value.get("type")?.as_str()?;
// Map zot RPC event types to normalized Colibri event types.
// This is intentionally permissive — unknown types are passed through
// unchanged so forward-compatible zot events don't break the parser.
let normalized = match ty {
// Session lifecycle
"turn_start" => "turn_start",
"turn_end" => "turn_end",
"done" => "agent_end",
"error" => "error",
// Message flow
"user_message" => "message_update",
"assistant_start" => "message_start",
"text_delta" => "message_update",
"assistant_message" => "message_end",
// Tool calls — zot emits both `tool_call` and `tool_use_*` events.
// tool_use_start already maps to tool_execution_start; the standalone
// tool_call is a duplicate that would double-fire. Skip it.
"tool_call" => return None,
"tool_use_start" => "tool_execution_start",
"tool_use_args" => "tool_execution_update",
"tool_progress" => "tool_execution_update",
"tool_use_end" => "tool_execution_update",
"tool_result" => "tool_execution_end",
// Command acknowledgment — no state change unless success:false
"response" => {
if let Some(false) = value.get("success").and_then(|v| v.as_bool()) {
return Some("error".to_string());
}
return None; // success:true → no state change
}
// Usage / stats — no state change
"usage" => return None,
// Unknown — pass through unchanged
_ => ty,
};
Some(normalized.to_string())
}
/// Fold a zot event into the agent state. Uses the same `apply_pi_event`
/// logic because the event taxonomy is normalized to the same type names.
pub fn apply_zot_event(state: AgentState, zot_line: &str) -> AgentState {
match zot_event_type(zot_line) {
Some(ref ty) => apply_pi_event(state, ty),
None => state,
}
}
/// Fold a sequence of zot RPC JSONL lines into a final state.
pub fn fold_zot_events<'a, I>(lines: I) -> AgentState
where
I: IntoIterator<Item = &'a str>,
{
lines.into_iter().fold(AgentState::Idle, apply_zot_event)
}
// ---------------------------------------------------------------------------
// Phase 2 — real Pi `--mode json` JSONL ingestion
// ---------------------------------------------------------------------------
/// Parse one line of Pi `--mode json` output and return its `type` field.
/// Non-objects / blank / malformed lines yield `None` (lenient — never panics).
pub fn pi_event_type(line: &str) -> Option<String> {
let value: Value = serde_json::from_str(line.trim()).ok()?;
value.get("type")?.as_str().map(str::to_string)
}
/// Fold a whole Pi `--mode json` JSONL stream into the final agent state.
/// Blank/unparseable lines are skipped, so partial or noisy streams are safe.
pub fn agent_state_from_jsonl(jsonl: &str) -> AgentState {
jsonl
.lines()
.filter_map(pi_event_type)
.fold(AgentState::Idle, |state, ty| apply_pi_event(state, &ty))
}
/// Derive a [`Pane`] from a Pi `--mode json` stream with an explicit
/// Colibri-owned pane id. The Pi session id is captured separately.
pub fn pane_from_jsonl_with_id(
pane_id: impl Into<String>,
agent: impl Into<String>,
jsonl: &str,
) -> Pane {
let observed_at = SystemTime::UNIX_EPOCH;
let mut ingestor = PiJsonlIngestor::default();
ingestor.ingest_jsonl_at(jsonl, observed_at);
Pane {
id: pane_id.into(),
agent: agent.into(),
state: ingestor.state,
session_id: ingestor.session_id,
last_event_at: None,
cwd: ingestor.cwd,
stalled: false,
}
}
/// Derive a [`Pane`] from a Pi `--mode json` stream. Prefer
/// [`pane_from_jsonl_with_id`] once a real supervisor owns the pane id.
pub fn pane_from_jsonl(agent: impl Into<String>, jsonl: &str) -> Pane {
pane_from_jsonl_with_id("jsonl", agent, jsonl)
}
// ---------------------------------------------------------------------------
// Phase 3 — streaming ingestion + pane supervision
// ---------------------------------------------------------------------------
/// One accepted structured event from a streaming Pi JSONL reader.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PiStreamUpdate {
pub event_type: String,
pub state: AgentState,
pub session_id: Option<String>,
pub cwd: Option<String>,
pub observed_at: SystemTime,
}
/// Summary returned after draining a pane's JSONL reader.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PaneReaderStats {
pub total_lines: usize,
pub accepted_lines: usize,
pub skipped_lines: usize,
pub last_update: Option<PiStreamUpdate>,
}
/// Stateful streaming ingestor for Pi `--mode json` JSONL.
///
/// This is intentionally independent from PTY/process ownership: tests can feed
/// sample readers, while live panes can wire PTY stdout to the same API later.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PiJsonlIngestor {
state: AgentState,
session_id: Option<String>,
cwd: Option<String>,
last_event_at: Option<SystemTime>,
/// Which harness produced the stream. Pi events are read directly; zot
/// events are normalized through `zot_event_type` first.
runtime: AgentRuntime,
}
impl Default for PiJsonlIngestor {
fn default() -> Self {
Self {
state: AgentState::Idle,
session_id: None,
cwd: None,
last_event_at: None,
runtime: AgentRuntime::Pi,
}
}
}
impl PiJsonlIngestor {
/// Ingestor for a specific harness (Pi reads raw, Zot is normalized).
pub fn with_runtime(runtime: AgentRuntime) -> Self {
Self {
runtime,
..Self::default()
}
}
pub fn state(&self) -> AgentState {
self.state
}
pub fn session_id(&self) -> Option<&str> {
self.session_id.as_deref()
}
pub fn cwd(&self) -> Option<&str> {
self.cwd.as_deref()
}
pub fn last_event_at(&self) -> Option<SystemTime> {
self.last_event_at
}
/// Ingest one JSONL line at a caller-supplied observation time.
/// Malformed/type-less lines are skipped and return `None`.
pub fn ingest_line_at(
&mut self,
line: &str,
observed_at: SystemTime,
) -> Option<PiStreamUpdate> {
let value: Value = serde_json::from_str(line.trim()).ok()?;
// Pi events use Colibri's taxonomy directly; zot events are normalized
// (e.g. tool_use_start -> tool_execution_start, response success:false
// -> error, response/usage -> skipped).
let ty: String = match self.runtime {
// Pi and Local both emit the Colibri/Pi taxonomy directly.
AgentRuntime::Pi | AgentRuntime::Local => value.get("type")?.as_str()?.to_string(),
AgentRuntime::Zot => zot_event_type(line)?,
};
let ty = ty.as_str();
self.state = apply_pi_event(self.state, ty);
self.last_event_at = Some(observed_at);
if ty == "session" || ty == "session_started" {
if let Some(sid) = value.get("id").and_then(Value::as_str) {
self.session_id = Some(sid.to_string());
}
if let Some(cwd) = value.get("cwd").and_then(Value::as_str) {
self.cwd = Some(cwd.to_string());
}
}
Some(PiStreamUpdate {
event_type: ty.to_string(),
state: self.state,
session_id: self.session_id.clone(),
cwd: self.cwd.clone(),
observed_at,
})
}
/// Ingest a JSONL blob as if all accepted events were seen at `observed_at`.
/// This preserves Phase-2 lenience while sharing the streaming state machine.
pub fn ingest_jsonl_at(&mut self, jsonl: &str, observed_at: SystemTime) -> usize {
jsonl
.lines()
.filter_map(|line| self.ingest_line_at(line, observed_at))
.count()
}
}
/// A supervised pane owned by Colibri. Its `id` is **not** the Pi session id;
/// the Pi id is discovered from the JSONL header and stored separately.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SupervisedPane {
pub id: PaneId,
pub agent: String,
pub started_at: SystemTime,
ingestor: PiJsonlIngestor,
}
impl SupervisedPane {
pub fn new(id: impl Into<PaneId>, agent: impl Into<String>, started_at: SystemTime) -> Self {
Self::new_with_runtime(id, agent, started_at, AgentRuntime::Pi)
}
/// Like [`new`](Self::new) but binds the pane to a specific harness so its
/// stream is parsed correctly (Pi raw vs zot normalized).
pub fn new_with_runtime(
id: impl Into<PaneId>,
agent: impl Into<String>,
started_at: SystemTime,
runtime: AgentRuntime,
) -> Self {
Self {
id: id.into(),
agent: agent.into(),
started_at,
ingestor: PiJsonlIngestor::with_runtime(runtime),
}
}
pub fn state(&self) -> AgentState {
self.ingestor.state()
}
pub fn session_id(&self) -> Option<&str> {
self.ingestor.session_id()
}
pub fn cwd(&self) -> Option<&str> {
self.ingestor.cwd()
}
pub fn last_event_at(&self) -> Option<SystemTime> {
self.ingestor.last_event_at()
}
pub fn ingest_line_at(
&mut self,
line: &str,
observed_at: SystemTime,
) -> Option<PiStreamUpdate> {
self.ingestor.ingest_line_at(line, observed_at)
}
pub fn is_stalled_at(&self, now: SystemTime, stall_after: Duration) -> bool {
if !matches!(self.state(), AgentState::Working | AgentState::Blocked) {
return false;
}
let silence_since = self.last_event_at().unwrap_or(self.started_at);
now.duration_since(silence_since)
.is_ok_and(|silent_for| silent_for >= stall_after)
}
pub fn to_pane_at(&self, now: SystemTime, stall_after: Duration) -> Pane {
Pane {
id: self.id.clone(),
agent: self.agent.clone(),
state: self.state(),
session_id: self.session_id().map(str::to_string),
last_event_at: self.last_event_at().map(system_time_to_rfc3339),
cwd: self.cwd().map(str::to_string),
stalled: self.is_stalled_at(now, stall_after),
}
}
}
/// In-memory pane supervisor used by tests and, later, the daemon/socket layer.
#[derive(Debug, Default, Clone)]
pub struct PaneSupervisor {
panes: BTreeMap<PaneId, SupervisedPane>,
}
impl PaneSupervisor {
pub fn new() -> Self {
Self::default()
}
pub fn attach_pane_at(
&mut self,
pane_id: impl Into<PaneId>,
agent: impl Into<String>,
started_at: SystemTime,
) -> &mut SupervisedPane {
self.attach_pane_with_runtime(pane_id, agent, started_at, AgentRuntime::Pi)
}
/// Attach a pane bound to a specific harness so its stdout JSONL is parsed
/// with the right taxonomy (Pi raw vs zot normalized).
pub fn attach_pane_with_runtime(
&mut self,
pane_id: impl Into<PaneId>,
agent: impl Into<String>,
started_at: SystemTime,
runtime: AgentRuntime,
) -> &mut SupervisedPane {
let pane = SupervisedPane::new_with_runtime(pane_id, agent, started_at, runtime);
self.panes.entry(pane.id.clone()).or_insert(pane)
}
pub fn get(&self, pane_id: &str) -> Option<&SupervisedPane> {
self.panes.get(pane_id)
}
pub fn get_mut(&mut self, pane_id: &str) -> Option<&mut SupervisedPane> {
self.panes.get_mut(pane_id)
}
pub fn ingest_line_at(
&mut self,
pane_id: &str,
line: &str,
observed_at: SystemTime,
) -> Option<PiStreamUpdate> {
self.get_mut(pane_id)?.ingest_line_at(line, observed_at)
}
/// Test-first Phase-3 path: feed a sample/recorded JSONL reader through the
/// same streaming API that a PTY stdout reader will use.
pub fn ingest_jsonl_reader_at<R: BufRead>(
&mut self,
pane_id: impl Into<PaneId>,
agent: impl Into<String>,
reader: R,
started_at: SystemTime,
) -> io::Result<usize> {
let pane_id = pane_id.into();
self.attach_pane_at(pane_id.clone(), agent, started_at);
let mut observed_at = started_at;
run_pane_reader_with_clock(self, &pane_id, reader, || {
let current = observed_at;
observed_at += Duration::from_millis(1);
current
})
.map(|stats| stats.accepted_lines)
}
pub fn snapshot_at(
&self,
host: impl Into<String>,
observed_at: SystemTime,
stall_after: Duration,
) -> GlasspaneSnapshot {
GlasspaneSnapshot::new(
host,
system_time_to_rfc3339(observed_at),
self.panes
.values()
.map(|pane| pane.to_pane_at(observed_at, stall_after))
.collect(),
)
}
}
/// Drain a JSONL reader into an already-attached pane using wall-clock time.
///
/// This is the Phase-3.1 bridge between sample JSONL readers and real PTY stdout:
/// both paths feed the same streaming ingestor and update the same supervisor.
pub fn run_pane_reader<R: BufRead>(
supervisor: &mut PaneSupervisor,
pane_id: &str,
reader: R,
) -> io::Result<PaneReaderStats> {
run_pane_reader_with_clock(supervisor, pane_id, reader, SystemTime::now)
}
/// Deterministic variant of [`run_pane_reader`] for tests.
pub fn run_pane_reader_with_clock<R, F>(
supervisor: &mut PaneSupervisor,
pane_id: &str,
reader: R,
mut now: F,
) -> io::Result<PaneReaderStats>
where
R: BufRead,
F: FnMut() -> SystemTime,
{
if supervisor.get(pane_id).is_none() {
return Err(io::Error::new(
io::ErrorKind::NotFound,
format!("pane {pane_id:?} is not attached"),
));
}
let mut stats = PaneReaderStats::default();
for line in reader.lines() {
let line = line?;
stats.total_lines += 1;
let observed_at = now();
match supervisor.ingest_line_at(pane_id, &line, observed_at) {
Some(update) => {
stats.accepted_lines += 1;
stats.last_update = Some(update);
}
None => stats.skipped_lines += 1,
}
}
Ok(stats)
}
/// Minimal portable-pty launch spec. Live FreeBSD validation is intentionally
/// later; this keeps the process/PTY seam explicit while tests use sample readers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PtyLaunchSpec {
pub program: String,
pub args: Vec<String>,
pub cwd: Option<PathBuf>,
pub size: PtySize,
}
impl PtyLaunchSpec {
pub fn new(program: impl Into<String>) -> Self {
Self {
program: program.into(),
args: Vec::new(),
cwd: None,
size: PtySize::default(),
}
}
}
/// Handles for a live PTY-backed pane process.
pub struct PtyPane {
pub child: Box<dyn Child + Send + Sync>,
pub reader: BufReader<Box<dyn Read + Send>>,
pub writer: Box<dyn Write + Send>,
}
/// Spawn a command under a native PTY. The caller owns wiring `reader` into
/// [`PiJsonlIngestor`] / [`PaneSupervisor`]. Not exercised by CI yet; sample JSONL
/// readers cover the supervision state model first.
pub fn spawn_pty_command(
spec: &PtyLaunchSpec,
) -> Result<PtyPane, Box<dyn std::error::Error + Send + Sync>> {
let pty_system = native_pty_system();
let pair = pty_system.openpty(spec.size)?;
let mut cmd = CommandBuilder::new(&spec.program);
cmd.args(&spec.args);
if let Some(cwd) = &spec.cwd {
cmd.cwd(cwd.as_os_str());
}
let child = pair.slave.spawn_command(cmd)?;
let reader = BufReader::new(pair.master.try_clone_reader()?);
let writer = pair.master.take_writer()?;
Ok(PtyPane {
child,
reader,
writer,
})
}
/// Drain the reader side of a live PTY-backed pane into the supervisor.
///
/// The caller owns spawning/waiting/killing the child; this function only folds
/// stdout JSONL into Glasspane state. CI still covers this through sample readers.
pub fn run_pty_pane_reader(
supervisor: &mut PaneSupervisor,
pane_id: &str,
pty: &mut PtyPane,
) -> io::Result<PaneReaderStats> {
run_pane_reader(supervisor, pane_id, &mut pty.reader)
}
fn skip_false(value: &bool) -> bool {
!*value
}
fn system_time_to_rfc3339(time: SystemTime) -> String {
let dt: DateTime<Utc> = time.into();
dt.to_rfc3339_opts(SecondsFormat::Millis, true)
}
/// A supervised pane — one agent occupying one PTY/session slot.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Pane {
/// Colibri-owned pane id. Distinct from the agent's own session id.
pub id: String,
pub agent: String,
pub state: AgentState,
// `alias` keeps deserializing the legacy `pi_session_id` key (pre-zot wire
// format / persisted snapshots) onto the harness-neutral field.
#[serde(
default,
alias = "pi_session_id",
skip_serializing_if = "Option::is_none"
)]
pub session_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub last_event_at: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub cwd: Option<String>,
#[serde(default, skip_serializing_if = "skip_false")]
pub stalled: bool,
}
/// A point-in-time view of every supervised pane — what a display client
/// (Zed, web board, terminal) renders. Wire contract:
/// `clawdie.glasspane.snapshot.v1`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct GlasspaneSnapshot {
pub schema: String,
pub host: String,
pub observed_at: String,
pub panes: Vec<Pane>,
}
impl GlasspaneSnapshot {
pub fn new(host: impl Into<String>, observed_at: impl Into<String>, panes: Vec<Pane>) -> Self {
Self {
schema: GLASSPANE_SNAPSHOT_SCHEMA.to_string(),
host: host.into(),
observed_at: observed_at.into(),
panes,
}
}
/// Count of panes in each non-idle state — the "radar" summary
/// (e.g. "2 blocked, 5 working").
pub fn count(&self, state: AgentState) -> usize {
self.panes.iter().filter(|p| p.state == state).count()
}
pub fn stalled_count(&self) -> usize {
self.panes.iter().filter(|p| p.stalled).count()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
fn t(seconds: u64) -> SystemTime {
SystemTime::UNIX_EPOCH + Duration::from_secs(seconds)
}
#[test]
fn lifecycle_idle_working_done() {
assert_eq!(fold_pi_events(["session"]), AgentState::Idle);
assert_eq!(
fold_pi_events(["session", "turn_start", "tool_execution_start"]),
AgentState::Working
);
assert_eq!(
fold_pi_events(["session", "turn_start", "message_update", "turn_end"]),
AgentState::Done
);
}
#[test]
fn queue_update_blocks() {
assert_eq!(
apply_pi_event(AgentState::Working, "queue_update"),
AgentState::Blocked
);
}
#[test]
fn error_sets_error() {
assert_eq!(
apply_pi_event(AgentState::Working, "error"),
AgentState::Error
);
}
#[test]
fn current_pi_compaction_events_are_working() {
assert_eq!(
fold_pi_events(["session", "auto_compaction_start", "auto_compaction_end"]),
AgentState::Working
);
}
#[test]
fn unknown_event_preserves_state() {
assert_eq!(
apply_pi_event(AgentState::Working, "some_future_event"),
AgentState::Working
);
}
#[test]
fn snapshot_round_trips_and_counts() {
let snap = GlasspaneSnapshot::new(
"domedog",
"2026-05-27T00:00:00Z",
vec![
Pane {
id: "p1".into(),
agent: "pi".into(),
state: AgentState::Working,
session_id: Some("pi-session-1".into()),
last_event_at: None,
cwd: Some("/repo".into()),
stalled: false,
},
Pane {
id: "p2".into(),
agent: "pi".into(),
state: AgentState::Blocked,
session_id: None,
last_event_at: None,
cwd: None,
stalled: true,
},
],
);
assert_eq!(snap.count(AgentState::Blocked), 1);
assert_eq!(snap.stalled_count(), 1);
let json = serde_json::to_string(&snap).unwrap();
assert!(json.contains("\"working\""));
assert!(json.contains(GLASSPANE_SNAPSHOT_SCHEMA));
let back: GlasspaneSnapshot = serde_json::from_str(&json).unwrap();
assert_eq!(snap, back);
}
// ---- Phase 2: real Pi --mode json ingestion ----
#[test]
fn jsonl_stream_folds_and_captures_session() {
// First line is a verbatim pi 0.75.5 session header.
let jsonl = [
r#"{"type":"session","version":3,"id":"019e5e59-6645-7e21-aca2-b57ccf0f8578","cwd":"/home/clawdija/clawdie-ai"}"#,
r#"{"type":"turn_start"}"#,
r#"{"type":"tool_execution_start","toolName":"bash"}"#,
r#"{"type":"turn_end"}"#,
]
.join("\n");
assert_eq!(agent_state_from_jsonl(&jsonl), AgentState::Done);
let pane = pane_from_jsonl_with_id("pane-a", "pi", &jsonl);
assert_eq!(pane.state, AgentState::Done);
assert_eq!(pane.id, "pane-a");
assert_eq!(
pane.session_id.as_deref(),
Some("019e5e59-6645-7e21-aca2-b57ccf0f8578")
);
assert_eq!(pane.cwd.as_deref(), Some("/home/clawdija/clawdie-ai"));
}
#[test]
fn jsonl_blocked_on_queue_update() {
let jsonl = [
r#"{"type":"session","id":"s","cwd":"/x"}"#,
r#"{"type":"turn_start"}"#,
r#"{"type":"queue_update","steering":["approve?"]}"#,
]
.join("\n");
assert_eq!(agent_state_from_jsonl(&jsonl), AgentState::Blocked);
}
#[test]
fn jsonl_lenient_skips_malformed() {
// garbage, non-JSON, blank, and type-less lines are all skipped.
let jsonl = "not json\n{bad}\n{\"type\":\"turn_start\"}\n\n{\"no_type\":1}";
assert_eq!(agent_state_from_jsonl(jsonl), AgentState::Working);
}
// ---- Phase 3: streaming ingestion + supervision ----
#[test]
fn streaming_ingestor_tracks_real_event_time_and_session() {
let mut ingestor = PiJsonlIngestor::default();
assert!(ingestor.ingest_line_at("not json", t(1)).is_none());
let update = ingestor
.ingest_line_at(r#"{"type":"session","id":"pi-s","cwd":"/repo"}"#, t(10))
.unwrap();
assert_eq!(update.event_type, "session");
assert_eq!(update.observed_at, t(10));
assert_eq!(ingestor.session_id(), Some("pi-s"));
assert_eq!(ingestor.cwd(), Some("/repo"));
assert_eq!(ingestor.last_event_at(), Some(t(10)));
ingestor.ingest_line_at(r#"{"type":"turn_start"}"#, t(12));
assert_eq!(ingestor.state(), AgentState::Working);
assert_eq!(ingestor.last_event_at(), Some(t(12)));
}
#[test]
fn supervisor_keeps_colibri_pane_id_separate_from_session_id() {
let mut supervisor = PaneSupervisor::new();
supervisor.attach_pane_at("pane-1", "pi", t(0));
supervisor.ingest_line_at(
"pane-1",
r#"{"type":"session","id":"pi-session-1","cwd":"/repo"}"#,
t(1),
);
let pane = supervisor
.snapshot_at("host", t(2), DEFAULT_STALL_AFTER)
.panes
.pop()
.unwrap();
assert_eq!(pane.id, "pane-1");
assert_eq!(pane.session_id.as_deref(), Some("pi-session-1"));
assert_ne!(pane.id, pane.session_id.unwrap());
}
#[test]
fn supervisor_sample_jsonl_reader_uses_streaming_api() {
let jsonl = [
r#"{"type":"session","id":"pi-s","cwd":"/repo"}"#,
"not json",
r#"{"type":"turn_start"}"#,
r#"{"type":"tool_execution_start","toolName":"bash"}"#,
]
.join("\n");
let mut supervisor = PaneSupervisor::new();
let accepted = supervisor
.ingest_jsonl_reader_at("pane-a", "pi", Cursor::new(jsonl), t(100))
.unwrap();
assert_eq!(accepted, 3);
let pane = supervisor.get("pane-a").unwrap();
assert_eq!(pane.state(), AgentState::Working);
assert_eq!(pane.session_id(), Some("pi-s"));
assert_eq!(
pane.last_event_at(),
Some(t(100) + Duration::from_millis(3))
);
}
#[test]
fn pane_reader_loop_reports_stats_and_last_update() {
let jsonl = [
r#"{"type":"session","id":"pi-s","cwd":"/repo"}"#,
"not json",
r#"{"type":"turn_start"}"#,
r#"{"type":"turn_end"}"#,
]
.join("\n");
let mut supervisor = PaneSupervisor::new();
supervisor.attach_pane_at("pane-reader", "pi", t(5));
let mut tick = 10;
let stats =
run_pane_reader_with_clock(&mut supervisor, "pane-reader", Cursor::new(jsonl), || {
let current = t(tick);
tick += 1;
current
})
.unwrap();
assert_eq!(stats.total_lines, 4);
assert_eq!(stats.accepted_lines, 3);
assert_eq!(stats.skipped_lines, 1);
assert_eq!(stats.last_update.unwrap().event_type, "turn_end");
let pane = supervisor.get("pane-reader").unwrap();
assert_eq!(pane.state(), AgentState::Done);
assert_eq!(pane.session_id(), Some("pi-s"));
assert_eq!(pane.last_event_at(), Some(t(13)));
}
#[test]
fn pane_reader_loop_errors_for_unattached_pane() {
let mut supervisor = PaneSupervisor::new();
let err = run_pane_reader(
&mut supervisor,
"missing",
Cursor::new(r#"{"type":"turn_start"}"#),
)
.unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::NotFound);
}
#[test]
fn stalled_is_derived_from_event_silence_for_active_panes() {
let mut supervisor = PaneSupervisor::new();
supervisor.attach_pane_at("pane-active", "pi", t(0));
supervisor.ingest_line_at("pane-active", r#"{"type":"turn_start"}"#, t(10));
let fresh = supervisor.snapshot_at("host", t(20), Duration::from_secs(60));
assert_eq!(fresh.stalled_count(), 0);
let stalled = supervisor.snapshot_at("host", t(71), Duration::from_secs(60));
assert_eq!(stalled.stalled_count(), 1);
assert!(stalled.panes[0].stalled);
}
#[test]
fn done_panes_do_not_become_stalled() {
let mut supervisor = PaneSupervisor::new();
supervisor.attach_pane_at("pane-done", "pi", t(0));
supervisor.ingest_line_at("pane-done", r#"{"type":"turn_start"}"#, t(10));
supervisor.ingest_line_at("pane-done", r#"{"type":"turn_end"}"#, t(11));
let snapshot = supervisor.snapshot_at("host", t(10_000), Duration::from_secs(60));
assert_eq!(snapshot.panes[0].state, AgentState::Done);
assert!(!snapshot.panes[0].stalled);
}
}
// ---------------------------------------------------------------------------
// zot runtime tests — uses real captured zot RPC event lines
// ---------------------------------------------------------------------------
#[cfg(test)]
mod zot_runtime_tests {
use super::*;
// Real zot RPC output captured from: echo '{"id":"1","type":"prompt","message":"say hi"}' | zot rpc --provider deepseek --model deepseek-v4-pro
#[test]
fn zot_hello_session_folds_to_done() {
let lines = vec![
r#"{"command":"prompt","data":{"started":true},"id":"1","success":true,"type":"response"}"#,
r#"{"content":[{"text":"say hi","type":"text"}],"time":"...","type":"user_message"}"#,
r#"{"step":1,"type":"turn_start"}"#,
r#"{"type":"assistant_start"}"#,
r#"{"delta":"Hi","type":"text_delta"}"#,
r#"{"delta":" there!","type":"text_delta"}"#,
r#"{"type":"usage"}"#,
r#"{"content":[{"text":"Hi there!","type":"text"}],"time":"...","type":"assistant_message"}"#,
r#"{"stop":"end","type":"turn_end"}"#,
r#"{"type":"done"}"#,
];
let state = fold_zot_events(lines.iter().copied());
assert_eq!(state, AgentState::Done);
}
#[test]
fn zot_response_success_no_state_change() {
let line = r#"{"command":"prompt","data":{"started":true},"id":"1","success":true,"type":"response"}"#;
assert_eq!(zot_event_type(line), None);
assert_eq!(apply_zot_event(AgentState::Idle, line), AgentState::Idle);
}
#[test]
fn zot_response_failure_maps_to_error() {
let line = r#"{"command":"prompt","data":{},"id":"1","success":false,"type":"response","error":"something"}"#;
assert_eq!(zot_event_type(line).as_deref(), Some("error"));
assert_eq!(apply_zot_event(AgentState::Idle, line), AgentState::Error);
}
#[test]
fn zot_turn_start_maps_to_working() {
let line = r#"{"step":1,"type":"turn_start"}"#;
assert_eq!(zot_event_type(line).as_deref(), Some("turn_start"));
assert_eq!(apply_zot_event(AgentState::Idle, line), AgentState::Working);
}
// Runtime-aware streaming path: a Zot-bound pane normalizes raw zot events
// (incl. tool_use_*) through the same PaneSupervisor.ingest_line_at the
// daemon's stdout reader uses — not just incidentally via Pi names.
#[test]
fn zot_pane_streams_tool_loop_to_done() {
let t0 = std::time::UNIX_EPOCH;
let mut sup = PaneSupervisor::new();
sup.attach_pane_with_runtime("pane-1", "zot", t0, AgentRuntime::Zot);
let raw_zot = [
r#"{"step":1,"type":"turn_start"}"#,
r#"{"type":"tool_use_start","name":"bash"}"#, // -> tool_execution_start
r#"{"type":"tool_result"}"#, // -> tool_execution_end
r#"{"type":"usage"}"#, // -> skipped
];
for (i, line) in raw_zot.iter().enumerate() {
sup.ingest_line_at("pane-1", line, t0 + Duration::from_secs(i as u64));
}
// mid tool-loop: Working (precise, via normalized tool_execution_* events)
assert_eq!(sup.get("pane-1").unwrap().state(), AgentState::Working);
sup.ingest_line_at(
"pane-1",
r#"{"type":"turn_end"}"#,
t0 + Duration::from_secs(9),
);
assert_eq!(sup.get("pane-1").unwrap().state(), AgentState::Done);
}
#[test]
fn zot_text_delta_maps_to_message_update() {
let line = r#"{"delta":"Hello","type":"text_delta"}"#;
assert_eq!(zot_event_type(line).as_deref(), Some("message_update"));
}
#[test]
fn zot_assistant_message_is_message_end_not_done() {
// Critical: assistant_message does NOT end the pane — zot may emit
// them during tool loops. Only turn_end/done signal completion.
let line =
r#"{"content":[{"text":"Hi!","type":"text"}],"time":"...","type":"assistant_message"}"#;
assert_eq!(zot_event_type(line).as_deref(), Some("message_end"));
assert_eq!(
apply_zot_event(AgentState::Working, line),
AgentState::Working
);
}
#[test]
fn zot_turn_end_maps_to_done() {
let line = r#"{"stop":"end","type":"turn_end"}"#;
assert_eq!(zot_event_type(line).as_deref(), Some("turn_end"));
assert_eq!(apply_zot_event(AgentState::Working, line), AgentState::Done);
}
#[test]
fn zot_done_maps_to_agent_end() {
let line = r#"{"type":"done"}"#;
assert_eq!(zot_event_type(line).as_deref(), Some("agent_end"));
}
#[test]
fn zot_tool_call_is_skipped() {
// tool_call is intentionally skipped — tool_use_start already covers
// tool_execution_start; mapping both would double-fire (colibri#143).
let line = r#"{"args":{"command":"ls"},"id":"call_00","name":"bash","type":"tool_call"}"#;
assert_eq!(zot_event_type(line).as_deref(), None);
// State is unchanged (no event consumed).
assert_eq!(apply_zot_event(AgentState::Idle, line), AgentState::Idle);
}
#[test]
fn zot_tool_use_start_maps_to_execution_start() {
let line = r#"{"id":"call_00","name":"bash","type":"tool_use_start"}"#;
assert_eq!(
zot_event_type(line).as_deref(),
Some("tool_execution_start")
);
}
#[test]
fn zot_tool_use_args_maps_to_execution_update() {
let line = r#"{"delta":"\"ls\"","id":"call_00","type":"tool_use_args"}"#;
assert_eq!(
zot_event_type(line).as_deref(),
Some("tool_execution_update")
);
}
#[test]
fn zot_tool_result_maps_to_execution_end() {
let line = r#"{"content":[{"text":"...","type":"text"}],"id":"call_00","is_error":false,"type":"tool_result"}"#;
assert_eq!(zot_event_type(line).as_deref(), Some("tool_execution_end"));
}
#[test]
fn zot_usage_no_state_change() {
let line = r#"{"cache_read":896,"input":14,"output":33,"type":"usage"}"#;
assert_eq!(zot_event_type(line), None);
assert_eq!(
apply_zot_event(AgentState::Working, line),
AgentState::Working
);
}
#[test]
fn zot_unknown_event_passthrough() {
let line = r#"{"type":"some_future_zot_event","data":{}}"#;
assert_eq!(
zot_event_type(line).as_deref(),
Some("some_future_zot_event")
);
}
#[test]
fn zot_bad_json_returns_none() {
assert_eq!(zot_event_type("not json"), None);
assert_eq!(zot_event_type(""), None);
assert_eq!(zot_event_type(r#"{"no_type_field": true}"#), None);
assert_eq!(
apply_zot_event(AgentState::Idle, "not json"),
AgentState::Idle
);
}
#[test]
fn agent_runtime_default_is_pi() {
assert_eq!(AgentRuntime::default(), AgentRuntime::Pi);
}
#[test]
fn agent_runtime_serialization() {
assert_eq!(serde_json::to_string(&AgentRuntime::Pi).unwrap(), r#""pi""#);
assert_eq!(
serde_json::to_string(&AgentRuntime::Zot).unwrap(),
r#""zot""#
);
}
}