Add Colibri event parsing foundation

Introduce pure Pi JSON event parsing and cross-host run manifest validation as the first code on the Colibri control branch.

---
Build: pass | Tests: pass — 2467 passed (184 files)
This commit is contained in:
Operator & Codex 2026-05-24 19:33:25 +02:00
parent c5820dec84
commit e5f05abd51
5 changed files with 492 additions and 0 deletions

View file

@ -129,6 +129,13 @@ Suggested phases:
5. **Herdr Linux display** — optional rendering/control surface only.
6. **Deprecation PRs** — remove legacy paths only after gates pass.
Initial implementation modules:
- `src/colibri-pi-events.ts` — pure parser/normalizer for Pi `--mode json`
JSONL events. No FreeBSD dependency, so Linux agents can test it.
- `src/colibri-run-manifest.ts` — validator and compact renderer for
cross-host interagent run manifests.
## Non-Goals For This Branch
- No Herdr FreeBSD port.

View file

@ -0,0 +1,107 @@
import { describe, expect, it } from 'vitest';
import {
normalizePiJsonEvent,
parsePiJsonLine,
parsePiJsonLines,
} from './colibri-pi-events.js';
const OBSERVED_AT = '2026-05-14T12:00:00.000Z';
describe('colibri pi event parsing', () => {
it('normalizes the pi session header', () => {
const event = normalizePiJsonEvent(
{
type: 'session',
version: 3,
id: 'session-1',
cwd: '/repo',
},
OBSERVED_AT,
);
expect(event).toMatchObject({
source: 'pi-json',
kind: 'pi.session_started',
observedAt: OBSERVED_AT,
piType: 'session',
sessionId: 'session-1',
cwd: '/repo',
});
});
it('extracts text deltas from message updates', () => {
const result = parsePiJsonLine(
JSON.stringify({
type: 'message_update',
assistantMessageEvent: { type: 'text_delta', delta: 'hello' },
}),
OBSERVED_AT,
);
expect(result.ok).toBe(true);
if (!result.ok) return;
expect(result.event.kind).toBe('pi.message_text_delta');
expect(result.event.textDelta).toBe('hello');
});
it('normalizes tool lifecycle events', () => {
const result = parsePiJsonLine(
JSON.stringify({
type: 'tool_execution_end',
toolCallId: 'call-1',
toolName: 'bash',
isError: true,
}),
OBSERVED_AT,
);
expect(result.ok).toBe(true);
if (!result.ok) return;
expect(result.event).toMatchObject({
kind: 'pi.tool_finished',
toolCallId: 'call-1',
toolName: 'bash',
toolIsError: true,
});
});
it('keeps unknown pi event types without failing', () => {
const result = parsePiJsonLine(
JSON.stringify({ type: 'future_event', value: 1 }),
OBSERVED_AT,
);
expect(result.ok).toBe(true);
if (!result.ok) return;
expect(result.event.kind).toBe('pi.unknown');
expect(result.event.piType).toBe('future_event');
expect(result.event.raw).toMatchObject({ value: 1 });
});
it('reports malformed JSON lines', () => {
const result = parsePiJsonLine('{not-json', OBSERVED_AT);
expect(result.ok).toBe(false);
if (result.ok) return;
expect(result.error).toContain('invalid JSON');
});
it('parses newline-delimited pi events', () => {
const results = parsePiJsonLines(
[
JSON.stringify({ type: 'agent_start' }),
JSON.stringify({ type: 'turn_start' }),
JSON.stringify({ type: 'turn_end' }),
].join('\n'),
OBSERVED_AT,
);
expect(results).toHaveLength(3);
expect(results.map((result) => (result.ok ? result.event.kind : 'error'))).toEqual([
'pi.agent_started',
'pi.turn_started',
'pi.turn_finished',
]);
});
});

192
src/colibri-pi-events.ts Normal file
View file

@ -0,0 +1,192 @@
export type ColibriPiEventKind =
| 'pi.session_started'
| 'pi.agent_started'
| 'pi.agent_finished'
| 'pi.turn_started'
| 'pi.turn_finished'
| 'pi.message_started'
| 'pi.message_text_delta'
| 'pi.message_finished'
| 'pi.tool_started'
| 'pi.tool_updated'
| 'pi.tool_finished'
| 'pi.queue_updated'
| 'pi.compaction_started'
| 'pi.compaction_finished'
| 'pi.retry_started'
| 'pi.retry_finished'
| 'pi.unknown';
export interface ColibriPiEvent {
source: 'pi-json';
kind: ColibriPiEventKind;
observedAt: string;
piType: string;
sessionId?: string;
cwd?: string;
turnStatus?: 'started' | 'finished';
toolCallId?: string;
toolName?: string;
toolIsError?: boolean;
textDelta?: string;
queue?: {
steering: string[];
followUp: string[];
};
raw: Record<string, unknown>;
}
export type ColibriPiParseResult =
| { ok: true; event: ColibriPiEvent }
| { ok: false; error: string; line: string };
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === 'object' && value !== null && !Array.isArray(value);
}
function stringValue(value: unknown): string | undefined {
return typeof value === 'string' ? value : undefined;
}
function stringArrayValue(value: unknown): string[] {
if (!Array.isArray(value)) return [];
return value.filter((entry): entry is string => typeof entry === 'string');
}
function nowIso(): string {
return new Date().toISOString();
}
export function normalizePiJsonEvent(
raw: Record<string, unknown>,
observedAt: string = nowIso(),
): ColibriPiEvent {
const piType = stringValue(raw.type) ?? 'unknown';
const base: ColibriPiEvent = {
source: 'pi-json',
kind: 'pi.unknown',
observedAt,
piType,
raw,
};
if (piType === 'session') {
return {
...base,
kind: 'pi.session_started',
sessionId: stringValue(raw.id),
cwd: stringValue(raw.cwd),
};
}
if (piType === 'agent_start') return { ...base, kind: 'pi.agent_started' };
if (piType === 'agent_end') return { ...base, kind: 'pi.agent_finished' };
if (piType === 'turn_start') {
return { ...base, kind: 'pi.turn_started', turnStatus: 'started' };
}
if (piType === 'turn_end') {
return { ...base, kind: 'pi.turn_finished', turnStatus: 'finished' };
}
if (piType === 'message_start') return { ...base, kind: 'pi.message_started' };
if (piType === 'message_end') return { ...base, kind: 'pi.message_finished' };
if (piType === 'message_update') {
const update = isRecord(raw.assistantMessageEvent)
? raw.assistantMessageEvent
: undefined;
if (update?.type === 'text_delta') {
return {
...base,
kind: 'pi.message_text_delta',
textDelta: stringValue(update.delta) ?? '',
};
}
return base;
}
if (piType === 'tool_execution_start') {
return {
...base,
kind: 'pi.tool_started',
toolCallId: stringValue(raw.toolCallId),
toolName: stringValue(raw.toolName),
};
}
if (piType === 'tool_execution_update') {
return {
...base,
kind: 'pi.tool_updated',
toolCallId: stringValue(raw.toolCallId),
toolName: stringValue(raw.toolName),
};
}
if (piType === 'tool_execution_end') {
return {
...base,
kind: 'pi.tool_finished',
toolCallId: stringValue(raw.toolCallId),
toolName: stringValue(raw.toolName),
toolIsError: raw.isError === true,
};
}
if (piType === 'queue_update') {
return {
...base,
kind: 'pi.queue_updated',
queue: {
steering: stringArrayValue(raw.steering),
followUp: stringArrayValue(raw.followUp),
},
};
}
if (piType === 'compaction_start') {
return { ...base, kind: 'pi.compaction_started' };
}
if (piType === 'compaction_end') {
return { ...base, kind: 'pi.compaction_finished' };
}
if (piType === 'auto_retry_start') {
return { ...base, kind: 'pi.retry_started' };
}
if (piType === 'auto_retry_end') {
return { ...base, kind: 'pi.retry_finished' };
}
return base;
}
export function parsePiJsonLine(
line: string,
observedAt: string = nowIso(),
): ColibriPiParseResult {
const trimmed = line.trim();
if (!trimmed) return { ok: false, error: 'empty line', line };
let parsed: unknown;
try {
parsed = JSON.parse(trimmed);
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
return { ok: false, error: `invalid JSON: ${message}`, line };
}
if (!isRecord(parsed)) {
return { ok: false, error: 'JSON line is not an object', line };
}
return { ok: true, event: normalizePiJsonEvent(parsed, observedAt) };
}
export function parsePiJsonLines(
text: string,
observedAt: string = nowIso(),
): ColibriPiParseResult[] {
return text
.split(/\r?\n/u)
.filter((line) => line.trim().length > 0)
.map((line) => parsePiJsonLine(line, observedAt));
}

View file

@ -0,0 +1,102 @@
import { describe, expect, it } from 'vitest';
import {
COLIBRI_RUN_MANIFEST_SCHEMA,
parseColibriRunManifest,
parseColibriRunManifestJson,
summarizeColibriRunManifest,
} from './colibri-run-manifest.js';
const VALID_MANIFEST = {
schema: COLIBRI_RUN_MANIFEST_SCHEMA,
test_id: 'osa-clean-download-20260514T120000Z',
role: 'server-capture',
host: 'osa',
agent: 'codex-iso-builder',
started_at: '2026-05-14T12:00:00Z',
ended_at: '2026-05-14T12:01:30Z',
protocols: {
download: 'HTTPS over TCP/443',
capture: 'local libpcap',
artifact_transfer: 'SSH over TCP/22 when needed',
},
network: {
capture_interface: 'vtnet0',
remote_host: 'debby',
},
artifacts: {
pcapng: '/home/clawdie/clawdie-iso/tmp/network-tests/test/osa.pcapng',
capinfos: '/home/clawdie/clawdie-iso/tmp/network-tests/test/capinfos.txt',
},
summary: {
retransmits_observed: false,
},
raw_transfer_required: false,
notes: ['summary-first exchange'],
};
describe('colibri run manifest', () => {
it('parses a valid interagent run manifest', () => {
const result = parseColibriRunManifest(VALID_MANIFEST);
expect(result.ok).toBe(true);
if (!result.ok) return;
expect(result.manifest.test_id).toBe('osa-clean-download-20260514T120000Z');
expect(result.manifest.protocols.download).toBe('HTTPS over TCP/443');
expect(result.manifest.artifacts.pcapng).toContain('osa.pcapng');
});
it('applies safe defaults for optional structured fields', () => {
const result = parseColibriRunManifest({
schema: COLIBRI_RUN_MANIFEST_SCHEMA,
test_id: 'minimal-run',
role: 'client-download',
host: 'debby',
started_at: '2026-05-14T12:00:00Z',
});
expect(result.ok).toBe(true);
if (!result.ok) return;
expect(result.manifest.protocols).toEqual({});
expect(result.manifest.network).toEqual({});
expect(result.manifest.artifacts).toEqual({});
expect(result.manifest.summary).toEqual({});
expect(result.manifest.raw_transfer_required).toBe(false);
expect(result.manifest.notes).toEqual([]);
});
it('rejects manifests with the wrong schema', () => {
const result = parseColibriRunManifest({
...VALID_MANIFEST,
schema: 'other.schema',
});
expect(result.ok).toBe(false);
if (result.ok) return;
expect(result.errors.join('\n')).toContain('schema');
});
it('parses JSON text and reports invalid JSON', () => {
expect(parseColibriRunManifestJson(JSON.stringify(VALID_MANIFEST)).ok).toBe(
true,
);
const result = parseColibriRunManifestJson('{not-json');
expect(result.ok).toBe(false);
if (result.ok) return;
expect(result.errors[0]).toContain('invalid JSON');
});
it('renders a compact summary for agents and handoffs', () => {
const result = parseColibriRunManifest(VALID_MANIFEST);
expect(result.ok).toBe(true);
if (!result.ok) return;
const summary = summarizeColibriRunManifest(result.manifest);
expect(summary).toContain('<colibri-run-manifest>');
expect(summary).toContain('test_id=osa-clean-download-20260514T120000Z');
expect(summary).toContain('protocols=download=HTTPS over TCP/443');
expect(summary).toContain('artifacts=pcapng,capinfos');
expect(summary).toContain('raw_transfer_required=NO');
});
});

View file

@ -0,0 +1,84 @@
import { z } from 'zod';
export const COLIBRI_RUN_MANIFEST_SCHEMA =
'clawdie.interagent.run-manifest.v1' as const;
export const ColibriRunManifestSchema = z.object({
schema: z.literal(COLIBRI_RUN_MANIFEST_SCHEMA),
test_id: z.string().min(1),
role: z.string().min(1),
host: z.string().min(1),
agent: z.string().min(1).optional(),
started_at: z.string().min(1),
ended_at: z.string().min(1).nullable().optional(),
protocols: z
.object({
download: z.string().min(1).optional(),
capture: z.string().min(1).optional(),
artifact_transfer: z.string().min(1).optional(),
})
.default({}),
network: z.record(z.string(), z.unknown()).default({}),
artifacts: z.record(z.string(), z.string()).default({}),
summary: z.record(z.string(), z.unknown()).default({}),
raw_transfer_required: z.boolean().default(false),
notes: z.array(z.string()).default([]),
});
export type ColibriRunManifest = z.infer<typeof ColibriRunManifestSchema>;
export type ColibriRunManifestParseResult =
| { ok: true; manifest: ColibriRunManifest }
| { ok: false; errors: string[] };
export function parseColibriRunManifest(
input: unknown,
): ColibriRunManifestParseResult {
const parsed = ColibriRunManifestSchema.safeParse(input);
if (parsed.success) return { ok: true, manifest: parsed.data };
return {
ok: false,
errors: parsed.error.issues.map((issue) => {
const path = issue.path.length > 0 ? issue.path.join('.') : '(root)';
return `${path}: ${issue.message}`;
}),
};
}
export function parseColibriRunManifestJson(
text: string,
): ColibriRunManifestParseResult {
let raw: unknown;
try {
raw = JSON.parse(text);
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
return { ok: false, errors: [`invalid JSON: ${message}`] };
}
return parseColibriRunManifest(raw);
}
export function summarizeColibriRunManifest(
manifest: ColibriRunManifest,
): string {
const artifactNames = Object.keys(manifest.artifacts);
const protocolNames = Object.entries(manifest.protocols)
.filter((entry): entry is [string, string] => typeof entry[1] === 'string')
.map(([key, value]) => `${key}=${value}`);
return [
'<colibri-run-manifest>',
`schema=${manifest.schema}`,
`test_id=${manifest.test_id}`,
`role=${manifest.role}`,
`host=${manifest.host}`,
`agent=${manifest.agent ?? 'unknown'}`,
`started_at=${manifest.started_at}`,
`ended_at=${manifest.ended_at ?? 'pending'}`,
`protocols=${protocolNames.join(',') || 'none'}`,
`artifacts=${artifactNames.join(',') || 'none'}`,
`raw_transfer_required=${manifest.raw_transfer_required ? 'YES' : 'NO'}`,
'</colibri-run-manifest>',
].join('\n');
}