feat: add zot runtime event normalization scaffold #3

Merged
clawdie merged 1 commit from feat/zot-runtime-event-adapter into main 2026-05-31 16:03:53 +02:00

View file

@ -5,8 +5,8 @@
//! we re-implement the API/state model, not the code. //! we re-implement the API/state model, not the code.
//! //!
//! Key bet (see `docs/COLIBRI-GLASSPANE-DESIGN.md`): agent state is **derived //! Key bet (see `docs/COLIBRI-GLASSPANE-DESIGN.md`): agent state is **derived
//! deterministically from Pi `--mode json` events** (the colibri-pi-events //! deterministically from agent events** (Pi `--mode json` events and zot RPC
//! taxonomy), not by screen-scraping terminal output. //! events), not by screen-scraping terminal output.
//! //!
//! Phase 3 starts the supervision layer: Colibri-owned pane ids are distinct //! Phase 3 starts the supervision layer: Colibri-owned pane ids are distinct
//! from Pi session ids, ingestion is streaming, `last_event_at` is kept as real //! from Pi session ids, ingestion is streaming, `last_event_at` is kept as real
@ -85,6 +85,87 @@ where
events.into_iter().fold(AgentState::Idle, apply_pi_event) events.into_iter().fold(AgentState::Idle, apply_pi_event)
} }
// ---------------------------------------------------------------------------
// zot runtime — normalized event taxonomy (Phase 1 scaffold)
// ---------------------------------------------------------------------------
/// Supported agent runtimes. Pi is the current default; zot RPC is the second
/// harness being integrated via a normalized event layer.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AgentRuntime {
#[default]
Pi,
Zot,
Local,
}
/// Parse one line of zot RPC NDJSON output and return a normalized event type.
/// Only the `type` field is used — unknown/missing types return `None`.
/// Permissive: zot emits `tool_use_start`, `tool_use_args`, `tool_use_end` in
/// addition to `tool_call`, `tool_progress`, `tool_result` during tool loops.
pub fn zot_event_type(line: &str) -> Option<String> {
let value: Value = serde_json::from_str(line.trim()).ok()?;
let ty = value.get("type")?.as_str()?;
// Map zot RPC event types to normalized Colibri event types.
// This is intentionally permissive — unknown types are passed through
// unchanged so forward-compatible zot events don't break the parser.
let normalized = match ty {
// Session lifecycle
"turn_start" => "turn_start",
"turn_end" => "turn_end",
"done" => "agent_end",
"error" => "error",
// Message flow
"user_message" => "message_update",
"assistant_start" => "message_start",
"text_delta" => "message_update",
"assistant_message" => "message_end",
// Tool calls — zot emits both `tool_call` and `tool_use_*` events
"tool_call" => "tool_execution_start",
"tool_use_start" => "tool_execution_start",
"tool_use_args" => "tool_execution_update",
"tool_progress" => "tool_execution_update",
"tool_use_end" => "tool_execution_update",
"tool_result" => "tool_execution_end",
// Command acknowledgment — no state change unless success:false
"response" => {
if let Some(false) = value.get("success").and_then(|v| v.as_bool()) {
return Some("error".to_string());
}
return None; // success:true → no state change
}
// Usage / stats — no state change
"usage" => return None,
// Unknown — pass through unchanged
_ => ty,
};
Some(normalized.to_string())
}
/// Fold a zot event into the agent state. Uses the same `apply_pi_event`
/// logic because the event taxonomy is normalized to the same type names.
pub fn apply_zot_event(state: AgentState, zot_line: &str) -> AgentState {
match zot_event_type(zot_line) {
Some(ref ty) => apply_pi_event(state, ty),
None => state,
}
}
/// Fold a sequence of zot RPC JSONL lines into a final state.
pub fn fold_zot_events<'a, I>(lines: I) -> AgentState
where
I: IntoIterator<Item = &'a str>,
{
lines.into_iter().fold(AgentState::Idle, apply_zot_event)
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Phase 2 — real Pi `--mode json` JSONL ingestion // Phase 2 — real Pi `--mode json` JSONL ingestion
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -814,3 +895,163 @@ mod tests {
assert!(!snapshot.panes[0].stalled); assert!(!snapshot.panes[0].stalled);
} }
} }
// ---------------------------------------------------------------------------
// zot runtime tests — uses real captured zot RPC event lines
// ---------------------------------------------------------------------------
#[cfg(test)]
mod zot_runtime_tests {
use super::*;
// Real zot RPC output captured from: echo '{"id":"1","type":"prompt","message":"say hi"}' | zot rpc --provider deepseek --model deepseek-v4-pro
#[test]
fn zot_hello_session_folds_to_done() {
let lines = vec![
r#"{"command":"prompt","data":{"started":true},"id":"1","success":true,"type":"response"}"#,
r#"{"content":[{"text":"say hi","type":"text"}],"time":"...","type":"user_message"}"#,
r#"{"step":1,"type":"turn_start"}"#,
r#"{"type":"assistant_start"}"#,
r#"{"delta":"Hi","type":"text_delta"}"#,
r#"{"delta":" there!","type":"text_delta"}"#,
r#"{"type":"usage"}"#,
r#"{"content":[{"text":"Hi there!","type":"text"}],"time":"...","type":"assistant_message"}"#,
r#"{"stop":"end","type":"turn_end"}"#,
r#"{"type":"done"}"#,
];
let state = fold_zot_events(lines.iter().copied());
assert_eq!(state, AgentState::Done);
}
#[test]
fn zot_response_success_no_state_change() {
let line = r#"{"command":"prompt","data":{"started":true},"id":"1","success":true,"type":"response"}"#;
assert_eq!(zot_event_type(line), None);
assert_eq!(apply_zot_event(AgentState::Idle, line), AgentState::Idle);
}
#[test]
fn zot_response_failure_maps_to_error() {
let line = r#"{"command":"prompt","data":{},"id":"1","success":false,"type":"response","error":"something"}"#;
assert_eq!(zot_event_type(line).as_deref(), Some("error"));
assert_eq!(apply_zot_event(AgentState::Idle, line), AgentState::Error);
}
#[test]
fn zot_turn_start_maps_to_working() {
let line = r#"{"step":1,"type":"turn_start"}"#;
assert_eq!(zot_event_type(line).as_deref(), Some("turn_start"));
assert_eq!(apply_zot_event(AgentState::Idle, line), AgentState::Working);
}
#[test]
fn zot_text_delta_maps_to_message_update() {
let line = r#"{"delta":"Hello","type":"text_delta"}"#;
assert_eq!(zot_event_type(line).as_deref(), Some("message_update"));
}
#[test]
fn zot_assistant_message_is_message_end_not_done() {
// Critical: assistant_message does NOT end the pane — zot may emit
// them during tool loops. Only turn_end/done signal completion.
let line =
r#"{"content":[{"text":"Hi!","type":"text"}],"time":"...","type":"assistant_message"}"#;
assert_eq!(zot_event_type(line).as_deref(), Some("message_end"));
assert_eq!(
apply_zot_event(AgentState::Working, line),
AgentState::Working
);
}
#[test]
fn zot_turn_end_maps_to_done() {
let line = r#"{"stop":"end","type":"turn_end"}"#;
assert_eq!(zot_event_type(line).as_deref(), Some("turn_end"));
assert_eq!(apply_zot_event(AgentState::Working, line), AgentState::Done);
}
#[test]
fn zot_done_maps_to_agent_end() {
let line = r#"{"type":"done"}"#;
assert_eq!(zot_event_type(line).as_deref(), Some("agent_end"));
}
#[test]
fn zot_tool_call_maps_to_execution_start() {
let line = r#"{"args":{"command":"ls"},"id":"call_00","name":"bash","type":"tool_call"}"#;
assert_eq!(
zot_event_type(line).as_deref(),
Some("tool_execution_start")
);
assert_eq!(apply_zot_event(AgentState::Idle, line), AgentState::Working);
}
#[test]
fn zot_tool_use_start_maps_to_execution_start() {
let line = r#"{"id":"call_00","name":"bash","type":"tool_use_start"}"#;
assert_eq!(
zot_event_type(line).as_deref(),
Some("tool_execution_start")
);
}
#[test]
fn zot_tool_use_args_maps_to_execution_update() {
let line = r#"{"delta":"\"ls\"","id":"call_00","type":"tool_use_args"}"#;
assert_eq!(
zot_event_type(line).as_deref(),
Some("tool_execution_update")
);
}
#[test]
fn zot_tool_result_maps_to_execution_end() {
let line = r#"{"content":[{"text":"...","type":"text"}],"id":"call_00","is_error":false,"type":"tool_result"}"#;
assert_eq!(zot_event_type(line).as_deref(), Some("tool_execution_end"));
}
#[test]
fn zot_usage_no_state_change() {
let line = r#"{"cache_read":896,"input":14,"output":33,"type":"usage"}"#;
assert_eq!(zot_event_type(line), None);
assert_eq!(
apply_zot_event(AgentState::Working, line),
AgentState::Working
);
}
#[test]
fn zot_unknown_event_passthrough() {
let line = r#"{"type":"some_future_zot_event","data":{}}"#;
assert_eq!(
zot_event_type(line).as_deref(),
Some("some_future_zot_event")
);
}
#[test]
fn zot_bad_json_returns_none() {
assert_eq!(zot_event_type("not json"), None);
assert_eq!(zot_event_type(""), None);
assert_eq!(zot_event_type(r#"{"no_type_field": true}"#), None);
assert_eq!(
apply_zot_event(AgentState::Idle, "not json"),
AgentState::Idle
);
}
#[test]
fn agent_runtime_default_is_pi() {
assert_eq!(AgentRuntime::default(), AgentRuntime::Pi);
}
#[test]
fn agent_runtime_serialization() {
assert_eq!(serde_json::to_string(&AgentRuntime::Pi).unwrap(), r#""pi""#);
assert_eq!(
serde_json::to_string(&AgentRuntime::Zot).unwrap(),
r#""zot""#
);
}
}