feat(glasspane): runtime-aware ingestion so zot panes parse precisely (Sam & Claude) #29
2 changed files with 87 additions and 4 deletions
|
|
@ -398,10 +398,21 @@ async fn cmd_spawn_agent(
|
|||
let agent_label = handle.config.binary.clone();
|
||||
let stdout = handle.take_stdout().await;
|
||||
|
||||
state.glasspane.write().await.attach_pane_at(
|
||||
// Bind the pane to the harness so its stdout JSONL is parsed with the
|
||||
// right taxonomy: a `zot` binary emits zot events (normalized via
|
||||
// zot_event_type); everything else is treated as Pi.
|
||||
let runtime = match std::path::Path::new(&agent_label)
|
||||
.file_name()
|
||||
.and_then(|s| s.to_str())
|
||||
{
|
||||
Some("zot") => colibri_glasspane::AgentRuntime::Zot,
|
||||
_ => colibri_glasspane::AgentRuntime::Pi,
|
||||
};
|
||||
state.glasspane.write().await.attach_pane_with_runtime(
|
||||
id.clone(),
|
||||
agent_label,
|
||||
SystemTime::now(),
|
||||
runtime,
|
||||
);
|
||||
state.agents.insert(id.clone(), handle);
|
||||
|
||||
|
|
|
|||
|
|
@ -247,6 +247,9 @@ pub struct PiJsonlIngestor {
|
|||
pi_session_id: Option<String>,
|
||||
cwd: Option<String>,
|
||||
last_event_at: Option<SystemTime>,
|
||||
/// Which harness produced the stream. Pi events are read directly; zot
|
||||
/// events are normalized through `zot_event_type` first.
|
||||
runtime: AgentRuntime,
|
||||
}
|
||||
|
||||
impl Default for PiJsonlIngestor {
|
||||
|
|
@ -256,11 +259,20 @@ impl Default for PiJsonlIngestor {
|
|||
pi_session_id: None,
|
||||
cwd: None,
|
||||
last_event_at: None,
|
||||
runtime: AgentRuntime::Pi,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PiJsonlIngestor {
|
||||
/// Ingestor for a specific harness (Pi reads raw, Zot is normalized).
|
||||
pub fn with_runtime(runtime: AgentRuntime) -> Self {
|
||||
Self {
|
||||
runtime,
|
||||
..Self::default()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn state(&self) -> AgentState {
|
||||
self.state
|
||||
}
|
||||
|
|
@ -285,7 +297,15 @@ impl PiJsonlIngestor {
|
|||
observed_at: SystemTime,
|
||||
) -> Option<PiStreamUpdate> {
|
||||
let value: Value = serde_json::from_str(line.trim()).ok()?;
|
||||
let ty = value.get("type")?.as_str()?;
|
||||
// Pi events use Colibri's taxonomy directly; zot events are normalized
|
||||
// (e.g. tool_use_start -> tool_execution_start, response success:false
|
||||
// -> error, response/usage -> skipped).
|
||||
let ty: String = match self.runtime {
|
||||
// Pi and Local both emit the Colibri/Pi taxonomy directly.
|
||||
AgentRuntime::Pi | AgentRuntime::Local => value.get("type")?.as_str()?.to_string(),
|
||||
AgentRuntime::Zot => zot_event_type(line)?,
|
||||
};
|
||||
let ty = ty.as_str();
|
||||
|
||||
self.state = apply_pi_event(self.state, ty);
|
||||
self.last_event_at = Some(observed_at);
|
||||
|
|
@ -330,11 +350,22 @@ pub struct SupervisedPane {
|
|||
|
||||
impl SupervisedPane {
|
||||
pub fn new(id: impl Into<PaneId>, agent: impl Into<String>, started_at: SystemTime) -> Self {
|
||||
Self::new_with_runtime(id, agent, started_at, AgentRuntime::Pi)
|
||||
}
|
||||
|
||||
/// Like [`new`](Self::new) but binds the pane to a specific harness so its
|
||||
/// stream is parsed correctly (Pi raw vs zot normalized).
|
||||
pub fn new_with_runtime(
|
||||
id: impl Into<PaneId>,
|
||||
agent: impl Into<String>,
|
||||
started_at: SystemTime,
|
||||
runtime: AgentRuntime,
|
||||
) -> Self {
|
||||
Self {
|
||||
id: id.into(),
|
||||
agent: agent.into(),
|
||||
started_at,
|
||||
ingestor: PiJsonlIngestor::default(),
|
||||
ingestor: PiJsonlIngestor::with_runtime(runtime),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -401,7 +432,19 @@ impl PaneSupervisor {
|
|||
agent: impl Into<String>,
|
||||
started_at: SystemTime,
|
||||
) -> &mut SupervisedPane {
|
||||
let pane = SupervisedPane::new(pane_id, agent, started_at);
|
||||
self.attach_pane_with_runtime(pane_id, agent, started_at, AgentRuntime::Pi)
|
||||
}
|
||||
|
||||
/// Attach a pane bound to a specific harness so its stdout JSONL is parsed
|
||||
/// with the right taxonomy (Pi raw vs zot normalized).
|
||||
pub fn attach_pane_with_runtime(
|
||||
&mut self,
|
||||
pane_id: impl Into<PaneId>,
|
||||
agent: impl Into<String>,
|
||||
started_at: SystemTime,
|
||||
runtime: AgentRuntime,
|
||||
) -> &mut SupervisedPane {
|
||||
let pane = SupervisedPane::new_with_runtime(pane_id, agent, started_at, runtime);
|
||||
self.panes.entry(pane.id.clone()).or_insert(pane)
|
||||
}
|
||||
|
||||
|
|
@ -945,6 +988,35 @@ mod zot_runtime_tests {
|
|||
assert_eq!(apply_zot_event(AgentState::Idle, line), AgentState::Working);
|
||||
}
|
||||
|
||||
// Runtime-aware streaming path: a Zot-bound pane normalizes raw zot events
|
||||
// (incl. tool_use_*) through the same PaneSupervisor.ingest_line_at the
|
||||
// daemon's stdout reader uses — not just incidentally via Pi names.
|
||||
#[test]
|
||||
fn zot_pane_streams_tool_loop_to_done() {
|
||||
let t0 = std::time::UNIX_EPOCH;
|
||||
let mut sup = PaneSupervisor::new();
|
||||
sup.attach_pane_with_runtime("pane-1", "zot", t0, AgentRuntime::Zot);
|
||||
|
||||
let raw_zot = [
|
||||
r#"{"step":1,"type":"turn_start"}"#,
|
||||
r#"{"type":"tool_use_start","name":"bash"}"#, // -> tool_execution_start
|
||||
r#"{"type":"tool_result"}"#, // -> tool_execution_end
|
||||
r#"{"type":"usage"}"#, // -> skipped
|
||||
];
|
||||
for (i, line) in raw_zot.iter().enumerate() {
|
||||
sup.ingest_line_at("pane-1", line, t0 + Duration::from_secs(i as u64));
|
||||
}
|
||||
// mid tool-loop: Working (precise, via normalized tool_execution_* events)
|
||||
assert_eq!(sup.get("pane-1").unwrap().state(), AgentState::Working);
|
||||
|
||||
sup.ingest_line_at(
|
||||
"pane-1",
|
||||
r#"{"type":"turn_end"}"#,
|
||||
t0 + Duration::from_secs(9),
|
||||
);
|
||||
assert_eq!(sup.get("pane-1").unwrap().state(), AgentState::Done);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn zot_text_delta_maps_to_message_update() {
|
||||
let line = r#"{"delta":"Hello","type":"text_delta"}"#;
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue