feat: add zot runtime event normalization scaffold #3
1 changed files with 243 additions and 2 deletions
|
|
@ -5,8 +5,8 @@
|
||||||
//! we re-implement the API/state model, not the code.
|
//! we re-implement the API/state model, not the code.
|
||||||
//!
|
//!
|
||||||
//! Key bet (see `docs/COLIBRI-GLASSPANE-DESIGN.md`): agent state is **derived
|
//! Key bet (see `docs/COLIBRI-GLASSPANE-DESIGN.md`): agent state is **derived
|
||||||
//! deterministically from Pi `--mode json` events** (the colibri-pi-events
|
//! deterministically from agent events** (Pi `--mode json` events and zot RPC
|
||||||
//! taxonomy), not by screen-scraping terminal output.
|
//! events), not by screen-scraping terminal output.
|
||||||
//!
|
//!
|
||||||
//! Phase 3 starts the supervision layer: Colibri-owned pane ids are distinct
|
//! Phase 3 starts the supervision layer: Colibri-owned pane ids are distinct
|
||||||
//! from Pi session ids, ingestion is streaming, `last_event_at` is kept as real
|
//! from Pi session ids, ingestion is streaming, `last_event_at` is kept as real
|
||||||
|
|
@ -85,6 +85,87 @@ where
|
||||||
events.into_iter().fold(AgentState::Idle, apply_pi_event)
|
events.into_iter().fold(AgentState::Idle, apply_pi_event)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// zot runtime — normalized event taxonomy (Phase 1 scaffold)
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
/// Supported agent runtimes. Pi is the current default; zot RPC is the second
|
||||||
|
/// harness being integrated via a normalized event layer.
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum AgentRuntime {
|
||||||
|
#[default]
|
||||||
|
Pi,
|
||||||
|
Zot,
|
||||||
|
Local,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Parse one line of zot RPC NDJSON output and return a normalized event type.
|
||||||
|
/// Only the `type` field is used — unknown/missing types return `None`.
|
||||||
|
/// Permissive: zot emits `tool_use_start`, `tool_use_args`, `tool_use_end` in
|
||||||
|
/// addition to `tool_call`, `tool_progress`, `tool_result` during tool loops.
|
||||||
|
pub fn zot_event_type(line: &str) -> Option<String> {
|
||||||
|
let value: Value = serde_json::from_str(line.trim()).ok()?;
|
||||||
|
let ty = value.get("type")?.as_str()?;
|
||||||
|
|
||||||
|
// Map zot RPC event types to normalized Colibri event types.
|
||||||
|
// This is intentionally permissive — unknown types are passed through
|
||||||
|
// unchanged so forward-compatible zot events don't break the parser.
|
||||||
|
let normalized = match ty {
|
||||||
|
// Session lifecycle
|
||||||
|
"turn_start" => "turn_start",
|
||||||
|
"turn_end" => "turn_end",
|
||||||
|
"done" => "agent_end",
|
||||||
|
"error" => "error",
|
||||||
|
|
||||||
|
// Message flow
|
||||||
|
"user_message" => "message_update",
|
||||||
|
"assistant_start" => "message_start",
|
||||||
|
"text_delta" => "message_update",
|
||||||
|
"assistant_message" => "message_end",
|
||||||
|
|
||||||
|
// Tool calls — zot emits both `tool_call` and `tool_use_*` events
|
||||||
|
"tool_call" => "tool_execution_start",
|
||||||
|
"tool_use_start" => "tool_execution_start",
|
||||||
|
"tool_use_args" => "tool_execution_update",
|
||||||
|
"tool_progress" => "tool_execution_update",
|
||||||
|
"tool_use_end" => "tool_execution_update",
|
||||||
|
"tool_result" => "tool_execution_end",
|
||||||
|
|
||||||
|
// Command acknowledgment — no state change unless success:false
|
||||||
|
"response" => {
|
||||||
|
if let Some(false) = value.get("success").and_then(|v| v.as_bool()) {
|
||||||
|
return Some("error".to_string());
|
||||||
|
}
|
||||||
|
return None; // success:true → no state change
|
||||||
|
}
|
||||||
|
|
||||||
|
// Usage / stats — no state change
|
||||||
|
"usage" => return None,
|
||||||
|
|
||||||
|
// Unknown — pass through unchanged
|
||||||
|
_ => ty,
|
||||||
|
};
|
||||||
|
Some(normalized.to_string())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Fold a zot event into the agent state. Uses the same `apply_pi_event`
|
||||||
|
/// logic because the event taxonomy is normalized to the same type names.
|
||||||
|
pub fn apply_zot_event(state: AgentState, zot_line: &str) -> AgentState {
|
||||||
|
match zot_event_type(zot_line) {
|
||||||
|
Some(ref ty) => apply_pi_event(state, ty),
|
||||||
|
None => state,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Fold a sequence of zot RPC JSONL lines into a final state.
|
||||||
|
pub fn fold_zot_events<'a, I>(lines: I) -> AgentState
|
||||||
|
where
|
||||||
|
I: IntoIterator<Item = &'a str>,
|
||||||
|
{
|
||||||
|
lines.into_iter().fold(AgentState::Idle, apply_zot_event)
|
||||||
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
// Phase 2 — real Pi `--mode json` JSONL ingestion
|
// Phase 2 — real Pi `--mode json` JSONL ingestion
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
@ -814,3 +895,163 @@ mod tests {
|
||||||
assert!(!snapshot.panes[0].stalled);
|
assert!(!snapshot.panes[0].stalled);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// zot runtime tests — uses real captured zot RPC event lines
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod zot_runtime_tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
// Real zot RPC output captured from: echo '{"id":"1","type":"prompt","message":"say hi"}' | zot rpc --provider deepseek --model deepseek-v4-pro
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn zot_hello_session_folds_to_done() {
|
||||||
|
let lines = vec![
|
||||||
|
r#"{"command":"prompt","data":{"started":true},"id":"1","success":true,"type":"response"}"#,
|
||||||
|
r#"{"content":[{"text":"say hi","type":"text"}],"time":"...","type":"user_message"}"#,
|
||||||
|
r#"{"step":1,"type":"turn_start"}"#,
|
||||||
|
r#"{"type":"assistant_start"}"#,
|
||||||
|
r#"{"delta":"Hi","type":"text_delta"}"#,
|
||||||
|
r#"{"delta":" there!","type":"text_delta"}"#,
|
||||||
|
r#"{"type":"usage"}"#,
|
||||||
|
r#"{"content":[{"text":"Hi there!","type":"text"}],"time":"...","type":"assistant_message"}"#,
|
||||||
|
r#"{"stop":"end","type":"turn_end"}"#,
|
||||||
|
r#"{"type":"done"}"#,
|
||||||
|
];
|
||||||
|
let state = fold_zot_events(lines.iter().copied());
|
||||||
|
assert_eq!(state, AgentState::Done);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn zot_response_success_no_state_change() {
|
||||||
|
let line = r#"{"command":"prompt","data":{"started":true},"id":"1","success":true,"type":"response"}"#;
|
||||||
|
assert_eq!(zot_event_type(line), None);
|
||||||
|
assert_eq!(apply_zot_event(AgentState::Idle, line), AgentState::Idle);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn zot_response_failure_maps_to_error() {
|
||||||
|
let line = r#"{"command":"prompt","data":{},"id":"1","success":false,"type":"response","error":"something"}"#;
|
||||||
|
assert_eq!(zot_event_type(line).as_deref(), Some("error"));
|
||||||
|
assert_eq!(apply_zot_event(AgentState::Idle, line), AgentState::Error);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn zot_turn_start_maps_to_working() {
|
||||||
|
let line = r#"{"step":1,"type":"turn_start"}"#;
|
||||||
|
assert_eq!(zot_event_type(line).as_deref(), Some("turn_start"));
|
||||||
|
assert_eq!(apply_zot_event(AgentState::Idle, line), AgentState::Working);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn zot_text_delta_maps_to_message_update() {
|
||||||
|
let line = r#"{"delta":"Hello","type":"text_delta"}"#;
|
||||||
|
assert_eq!(zot_event_type(line).as_deref(), Some("message_update"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn zot_assistant_message_is_message_end_not_done() {
|
||||||
|
// Critical: assistant_message does NOT end the pane — zot may emit
|
||||||
|
// them during tool loops. Only turn_end/done signal completion.
|
||||||
|
let line =
|
||||||
|
r#"{"content":[{"text":"Hi!","type":"text"}],"time":"...","type":"assistant_message"}"#;
|
||||||
|
assert_eq!(zot_event_type(line).as_deref(), Some("message_end"));
|
||||||
|
assert_eq!(
|
||||||
|
apply_zot_event(AgentState::Working, line),
|
||||||
|
AgentState::Working
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn zot_turn_end_maps_to_done() {
|
||||||
|
let line = r#"{"stop":"end","type":"turn_end"}"#;
|
||||||
|
assert_eq!(zot_event_type(line).as_deref(), Some("turn_end"));
|
||||||
|
assert_eq!(apply_zot_event(AgentState::Working, line), AgentState::Done);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn zot_done_maps_to_agent_end() {
|
||||||
|
let line = r#"{"type":"done"}"#;
|
||||||
|
assert_eq!(zot_event_type(line).as_deref(), Some("agent_end"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn zot_tool_call_maps_to_execution_start() {
|
||||||
|
let line = r#"{"args":{"command":"ls"},"id":"call_00","name":"bash","type":"tool_call"}"#;
|
||||||
|
assert_eq!(
|
||||||
|
zot_event_type(line).as_deref(),
|
||||||
|
Some("tool_execution_start")
|
||||||
|
);
|
||||||
|
assert_eq!(apply_zot_event(AgentState::Idle, line), AgentState::Working);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn zot_tool_use_start_maps_to_execution_start() {
|
||||||
|
let line = r#"{"id":"call_00","name":"bash","type":"tool_use_start"}"#;
|
||||||
|
assert_eq!(
|
||||||
|
zot_event_type(line).as_deref(),
|
||||||
|
Some("tool_execution_start")
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn zot_tool_use_args_maps_to_execution_update() {
|
||||||
|
let line = r#"{"delta":"\"ls\"","id":"call_00","type":"tool_use_args"}"#;
|
||||||
|
assert_eq!(
|
||||||
|
zot_event_type(line).as_deref(),
|
||||||
|
Some("tool_execution_update")
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn zot_tool_result_maps_to_execution_end() {
|
||||||
|
let line = r#"{"content":[{"text":"...","type":"text"}],"id":"call_00","is_error":false,"type":"tool_result"}"#;
|
||||||
|
assert_eq!(zot_event_type(line).as_deref(), Some("tool_execution_end"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn zot_usage_no_state_change() {
|
||||||
|
let line = r#"{"cache_read":896,"input":14,"output":33,"type":"usage"}"#;
|
||||||
|
assert_eq!(zot_event_type(line), None);
|
||||||
|
assert_eq!(
|
||||||
|
apply_zot_event(AgentState::Working, line),
|
||||||
|
AgentState::Working
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn zot_unknown_event_passthrough() {
|
||||||
|
let line = r#"{"type":"some_future_zot_event","data":{}}"#;
|
||||||
|
assert_eq!(
|
||||||
|
zot_event_type(line).as_deref(),
|
||||||
|
Some("some_future_zot_event")
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn zot_bad_json_returns_none() {
|
||||||
|
assert_eq!(zot_event_type("not json"), None);
|
||||||
|
assert_eq!(zot_event_type(""), None);
|
||||||
|
assert_eq!(zot_event_type(r#"{"no_type_field": true}"#), None);
|
||||||
|
assert_eq!(
|
||||||
|
apply_zot_event(AgentState::Idle, "not json"),
|
||||||
|
AgentState::Idle
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn agent_runtime_default_is_pi() {
|
||||||
|
assert_eq!(AgentRuntime::default(), AgentRuntime::Pi);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn agent_runtime_serialization() {
|
||||||
|
assert_eq!(serde_json::to_string(&AgentRuntime::Pi).unwrap(), r#""pi""#);
|
||||||
|
assert_eq!(
|
||||||
|
serde_json::to_string(&AgentRuntime::Zot).unwrap(),
|
||||||
|
r#""zot""#
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue