diff --git a/crates/colibri-daemon/src/socket.rs b/crates/colibri-daemon/src/socket.rs index 5711e89..e760461 100644 --- a/crates/colibri-daemon/src/socket.rs +++ b/crates/colibri-daemon/src/socket.rs @@ -398,10 +398,21 @@ async fn cmd_spawn_agent( let agent_label = handle.config.binary.clone(); let stdout = handle.take_stdout().await; - state.glasspane.write().await.attach_pane_at( + // Bind the pane to the harness so its stdout JSONL is parsed with the + // right taxonomy: a `zot` binary emits zot events (normalized via + // zot_event_type); everything else is treated as Pi. + let runtime = match std::path::Path::new(&agent_label) + .file_name() + .and_then(|s| s.to_str()) + { + Some("zot") => colibri_glasspane::AgentRuntime::Zot, + _ => colibri_glasspane::AgentRuntime::Pi, + }; + state.glasspane.write().await.attach_pane_with_runtime( id.clone(), agent_label, SystemTime::now(), + runtime, ); state.agents.insert(id.clone(), handle); diff --git a/crates/colibri-glasspane/src/lib.rs b/crates/colibri-glasspane/src/lib.rs index e0ce3cf..e9c392b 100644 --- a/crates/colibri-glasspane/src/lib.rs +++ b/crates/colibri-glasspane/src/lib.rs @@ -247,6 +247,9 @@ pub struct PiJsonlIngestor { pi_session_id: Option, cwd: Option, last_event_at: Option, + /// Which harness produced the stream. Pi events are read directly; zot + /// events are normalized through `zot_event_type` first. + runtime: AgentRuntime, } impl Default for PiJsonlIngestor { @@ -256,11 +259,20 @@ impl Default for PiJsonlIngestor { pi_session_id: None, cwd: None, last_event_at: None, + runtime: AgentRuntime::Pi, } } } impl PiJsonlIngestor { + /// Ingestor for a specific harness (Pi reads raw, Zot is normalized). + pub fn with_runtime(runtime: AgentRuntime) -> Self { + Self { + runtime, + ..Self::default() + } + } + pub fn state(&self) -> AgentState { self.state } @@ -285,7 +297,15 @@ impl PiJsonlIngestor { observed_at: SystemTime, ) -> Option { let value: Value = serde_json::from_str(line.trim()).ok()?; - let ty = value.get("type")?.as_str()?; + // Pi events use Colibri's taxonomy directly; zot events are normalized + // (e.g. tool_use_start -> tool_execution_start, response success:false + // -> error, response/usage -> skipped). + let ty: String = match self.runtime { + // Pi and Local both emit the Colibri/Pi taxonomy directly. + AgentRuntime::Pi | AgentRuntime::Local => value.get("type")?.as_str()?.to_string(), + AgentRuntime::Zot => zot_event_type(line)?, + }; + let ty = ty.as_str(); self.state = apply_pi_event(self.state, ty); self.last_event_at = Some(observed_at); @@ -330,11 +350,22 @@ pub struct SupervisedPane { impl SupervisedPane { pub fn new(id: impl Into, agent: impl Into, started_at: SystemTime) -> Self { + Self::new_with_runtime(id, agent, started_at, AgentRuntime::Pi) + } + + /// Like [`new`](Self::new) but binds the pane to a specific harness so its + /// stream is parsed correctly (Pi raw vs zot normalized). + pub fn new_with_runtime( + id: impl Into, + agent: impl Into, + started_at: SystemTime, + runtime: AgentRuntime, + ) -> Self { Self { id: id.into(), agent: agent.into(), started_at, - ingestor: PiJsonlIngestor::default(), + ingestor: PiJsonlIngestor::with_runtime(runtime), } } @@ -401,7 +432,19 @@ impl PaneSupervisor { agent: impl Into, started_at: SystemTime, ) -> &mut SupervisedPane { - let pane = SupervisedPane::new(pane_id, agent, started_at); + self.attach_pane_with_runtime(pane_id, agent, started_at, AgentRuntime::Pi) + } + + /// Attach a pane bound to a specific harness so its stdout JSONL is parsed + /// with the right taxonomy (Pi raw vs zot normalized). + pub fn attach_pane_with_runtime( + &mut self, + pane_id: impl Into, + agent: impl Into, + started_at: SystemTime, + runtime: AgentRuntime, + ) -> &mut SupervisedPane { + let pane = SupervisedPane::new_with_runtime(pane_id, agent, started_at, runtime); self.panes.entry(pane.id.clone()).or_insert(pane) } @@ -945,6 +988,35 @@ mod zot_runtime_tests { assert_eq!(apply_zot_event(AgentState::Idle, line), AgentState::Working); } + // Runtime-aware streaming path: a Zot-bound pane normalizes raw zot events + // (incl. tool_use_*) through the same PaneSupervisor.ingest_line_at the + // daemon's stdout reader uses — not just incidentally via Pi names. + #[test] + fn zot_pane_streams_tool_loop_to_done() { + let t0 = std::time::UNIX_EPOCH; + let mut sup = PaneSupervisor::new(); + sup.attach_pane_with_runtime("pane-1", "zot", t0, AgentRuntime::Zot); + + let raw_zot = [ + r#"{"step":1,"type":"turn_start"}"#, + r#"{"type":"tool_use_start","name":"bash"}"#, // -> tool_execution_start + r#"{"type":"tool_result"}"#, // -> tool_execution_end + r#"{"type":"usage"}"#, // -> skipped + ]; + for (i, line) in raw_zot.iter().enumerate() { + sup.ingest_line_at("pane-1", line, t0 + Duration::from_secs(i as u64)); + } + // mid tool-loop: Working (precise, via normalized tool_execution_* events) + assert_eq!(sup.get("pane-1").unwrap().state(), AgentState::Working); + + sup.ingest_line_at( + "pane-1", + r#"{"type":"turn_end"}"#, + t0 + Duration::from_secs(9), + ); + assert_eq!(sup.get("pane-1").unwrap().state(), AgentState::Done); + } + #[test] fn zot_text_delta_maps_to_message_update() { let line = r#"{"delta":"Hello","type":"text_delta"}"#;