provider: replay reasoning items and repair orphan tool results

This commit is contained in:
patriceckhart 2026-05-05 15:00:41 +02:00
parent ae4e019ee4
commit 7134fb7c2a
9 changed files with 226 additions and 63 deletions

View file

@ -113,7 +113,7 @@ func (a *extToolAdapter) NewExtensionTool(info ExtensionToolInfo) core.Tool {
// dropped to keep the per-extension stream sane.
func trimMessagesForResume(msgs []provider.Message, keepTail int) []provider.Message {
if keepTail <= 0 || len(msgs) <= keepTail {
return msgs
return provider.RepairOrphanedToolResults(msgs)
}
var out []provider.Message
start := len(msgs) - keepTail
@ -128,7 +128,7 @@ func trimMessagesForResume(msgs []provider.Message, keepTail int) []provider.Mes
start++
}
out = append(out, msgs[start:]...)
return out
return provider.RepairOrphanedToolResults(out)
}
func fanoutAgentEvent(mgr *extensions.Manager, ev core.AgentEvent) {
@ -586,7 +586,7 @@ func runInteractive(ctx context.Context, args Args, version string) error {
return err
}
fullMsgCount := len(msgs)
msgs = trimMessagesForResume(msgs, 20)
msgs = trimMessagesForResume(msgs, 100)
persistMu.Lock()
// Flush any unsaved messages to the old session before swapping.
// Per-message persistence keeps sessBaselineMsgs current, so

View file

@ -124,36 +124,7 @@ func (a *Agent) Compact(ctx context.Context, keepTail int, sink func(delta strin
// compaction when the tail preserves a tool_result but the tool_use
// that produced it was summarized away.
func repairOrphanedToolResults(msgs []provider.Message) []provider.Message {
// Collect all tool_use IDs present in the messages.
useIDs := map[string]bool{}
for _, m := range msgs {
for _, c := range m.Content {
if tc, ok := c.(provider.ToolCallBlock); ok {
useIDs[tc.ID] = true
}
}
}
// Filter out tool_result blocks referencing missing tool_use IDs.
out := make([]provider.Message, 0, len(msgs))
for _, m := range msgs {
var filtered []provider.Content
for _, c := range m.Content {
if tr, ok := c.(provider.ToolResultBlock); ok {
if !useIDs[tr.CallID] {
continue // orphaned
}
}
filtered = append(filtered, c)
}
if len(filtered) > 0 {
copy := m
copy.Content = filtered
out = append(out, copy)
}
// Drop messages that became empty after filtering.
}
return out
return provider.RepairOrphanedToolResults(msgs)
}
// serializeTranscript renders a list of provider.Message into a plain

View file

@ -594,12 +594,15 @@ func hydrateMessage(lineBytes []byte) (provider.Message, error) {
msg := provider.Message{Role: row.Message.Role, Time: row.Message.Time}
for _, raw := range row.Message.Content {
var head struct {
Text string `json:"text"`
MimeType string `json:"mime_type"`
Data []byte `json:"data"`
ID string `json:"id"`
Name string `json:"name"`
CallID string `json:"call_id"`
Text string `json:"text"`
MimeType string `json:"mime_type"`
Data []byte `json:"data"`
ID string `json:"id"`
Name string `json:"name"`
CallID string `json:"call_id"`
ReasoningID string `json:"reasoning_id"`
Summary string `json:"summary"`
Encrypted string `json:"encrypted_content"`
// ToolCallBlock also has Arguments, ToolResultBlock has Content + IsError
}
if err := json.Unmarshal(raw, &head); err != nil {
@ -607,6 +610,12 @@ func hydrateMessage(lineBytes []byte) (provider.Message, error) {
}
// Discriminate by presence of fields.
switch {
case head.ReasoningID != "" || head.Encrypted != "":
msg.Content = append(msg.Content, provider.ReasoningBlock{
ID: head.ReasoningID,
Summary: head.Summary,
Encrypted: head.Encrypted,
})
case head.Name != "" && head.ID != "":
var tc struct {
ID string `json:"id"`

View file

@ -262,6 +262,7 @@ func (c *anthropicClient) buildRequest(req Request) (*anthRequest, error) {
// emitting them separately keeps each message bit-stable across
// turns, so the cache prefix matches for the entire history up
// to the newest block.
req.Messages = RepairOrphanedToolResults(req.Messages)
for _, msg := range req.Messages {
renameTools := c.oauthTok != ""
switch msg.Role {
@ -363,7 +364,7 @@ func convertAnthContent(blocks []Content, renameTools bool) []interface{} {
})
case ToolCallBlock:
args := v.Arguments
if len(args) == 0 {
if len(args) == 0 || !json.Valid(args) {
args = json.RawMessage("{}")
}
name := v.Name

View file

@ -177,6 +177,7 @@ func (c *openaiClient) buildRequest(req Request) (*oaiRequest, error) {
out.Messages = append(out.Messages, oaiMessage{Role: "system", Content: req.System})
}
req.Messages = RepairOrphanedToolResults(req.Messages)
for _, msg := range req.Messages {
switch msg.Role {
case RoleUser:

View file

@ -106,6 +106,24 @@ type codexFunctionCallOutput struct {
Output string `json:"output"` // string (or ResponseFunctionCallOutputItemList for images; v1 only uses string)
}
// codexReasoningItem mirrors the Responses API "reasoning" output item.
// We capture it on incoming streams and replay it verbatim on follow-up
// requests: the API rejects assistant tool-call replays without it when
// thinking is enabled.
type codexReasoningItem struct {
Type string `json:"type"` // "reasoning"
ID string `json:"id,omitempty"`
EncryptedContent string `json:"encrypted_content,omitempty"`
// Summary is required by the Responses API even when no summary text
// was streamed; encode an empty array rather than omitting the field.
Summary []codexReasoningSummary `json:"summary"`
}
type codexReasoningSummary struct {
Type string `json:"type"` // "summary_text"
Text string `json:"text"`
}
type codexTool struct {
Type string `json:"type"` // "function"
Name string `json:"name"`
@ -159,6 +177,7 @@ func (c *codexClient) buildRequest(req Request) (*codexRequest, error) {
}
msgIdx := 0
req.Messages = RepairOrphanedToolResults(req.Messages)
for _, msg := range req.Messages {
switch msg.Role {
case RoleUser:
@ -179,10 +198,25 @@ func (c *codexClient) buildRequest(req Request) (*codexRequest, error) {
}
body.Input = append(body.Input, codexInputMessage{Role: "user", Content: content})
case RoleAssistant:
// Emit one output_message per text block and one function_call per tool call,
// preserving the order so model sees the same interleaving we captured.
// Emit one output_message per text block, one function_call per
// tool call, and one reasoning item per ReasoningBlock,
// preserving the order so the model sees the same interleaving
// we captured. The reasoning replay is what keeps OpenAI
// Codex from rejecting follow-up tool calls with
// "thinking is enabled but reasoning_content is missing".
for _, c := range msg.Content {
switch v := c.(type) {
case ReasoningBlock:
item := codexReasoningItem{
Type: "reasoning",
ID: v.ID,
EncryptedContent: v.Encrypted,
Summary: []codexReasoningSummary{},
}
if v.Summary != "" {
item.Summary = []codexReasoningSummary{{Type: "summary_text", Text: v.Summary}}
}
body.Input = append(body.Input, item)
case TextBlock:
if v.Text == "" {
continue
@ -199,7 +233,7 @@ func (c *codexClient) buildRequest(req Request) (*codexRequest, error) {
})
case ToolCallBlock:
args := string(v.Arguments)
if args == "" {
if args == "" || !json.Valid([]byte(args)) {
args = "{}"
}
callID, _ := splitCallID(v.ID)
@ -297,11 +331,14 @@ func (c *codexClient) runStream(ctx context.Context, resp *http.Response, req Re
// item is either a "message" (text) or a "function_call". We track
// the in-flight item by its index.
type itemState struct {
kind string // "message" | "function_call"
kind string // "message" | "function_call" | "reasoning"
callID string
name string
argsBuf strings.Builder
textBuf strings.Builder
summary strings.Builder
rawID string
encrypted string
announced bool
}
var (
@ -323,12 +360,21 @@ func (c *codexClient) runStream(ctx context.Context, resp *http.Response, req Re
}
case "function_call":
args := it.argsBuf.String()
if args == "" {
if args == "" || !json.Valid([]byte(args)) {
args = "{}"
}
content = append(content, ToolCallBlock{
ID: it.callID, Name: it.name, Arguments: json.RawMessage(args),
})
case "reasoning":
if it.encrypted == "" && it.summary.Len() == 0 && it.rawID == "" {
continue
}
content = append(content, ReasoningBlock{
ID: it.rawID,
Summary: it.summary.String(),
Encrypted: it.encrypted,
})
}
}
return Message{Role: RoleAssistant, Content: content, Time: time.Now()}
@ -364,10 +410,11 @@ func (c *codexClient) runStream(ctx context.Context, resp *http.Response, req Re
var p struct {
OutputIndex int `json:"output_index"`
Item struct {
Type string `json:"type"` // "message" | "function_call"
ID string `json:"id"`
CallID string `json:"call_id"`
Name string `json:"name"`
Type string `json:"type"` // "message" | "function_call" | "reasoning"
ID string `json:"id"`
CallID string `json:"call_id"`
Name string `json:"name"`
EncryptedContent string `json:"encrypted_content"`
} `json:"item"`
}
_ = json.Unmarshal([]byte(ev.Data), &p)
@ -383,6 +430,10 @@ func (c *codexClient) runStream(ctx context.Context, resp *http.Response, req Re
it.announced = true
out <- EventToolStart{ID: it.callID, Name: it.name}
}
case "reasoning":
it.kind = "reasoning"
it.rawID = p.Item.ID
it.encrypted = p.Item.EncryptedContent
default:
continue
}
@ -398,6 +449,17 @@ func (c *codexClient) runStream(ctx context.Context, resp *http.Response, req Re
it.textBuf.WriteString(p.Delta)
out <- EventTextDelta{Delta: p.Delta}
}
case "response.reasoning_summary_text.delta":
var p struct {
OutputIndex int `json:"output_index"`
Delta string `json:"delta"`
}
_ = json.Unmarshal([]byte(ev.Data), &p)
if it, ok := items[p.OutputIndex]; ok && it.kind == "reasoning" {
it.summary.WriteString(p.Delta)
}
case "response.reasoning_summary_text.done":
// summary text already accumulated via deltas
case "response.function_call_arguments.delta":
var p struct {
OutputIndex int `json:"output_index"`
@ -411,10 +473,38 @@ func (c *codexClient) runStream(ctx context.Context, resp *http.Response, req Re
case "response.output_item.done":
var p struct {
OutputIndex int `json:"output_index"`
Item struct {
Type string `json:"type"`
ID string `json:"id"`
EncryptedContent string `json:"encrypted_content"`
Summary []struct {
Type string `json:"type"`
Text string `json:"text"`
} `json:"summary"`
} `json:"item"`
}
_ = json.Unmarshal([]byte(ev.Data), &p)
if it, ok := items[p.OutputIndex]; ok && it.kind == "function_call" {
out <- EventToolEnd{ID: it.callID}
if it, ok := items[p.OutputIndex]; ok {
switch it.kind {
case "function_call":
out <- EventToolEnd{ID: it.callID}
case "reasoning":
if p.Item.EncryptedContent != "" {
it.encrypted = p.Item.EncryptedContent
}
if it.rawID == "" && p.Item.ID != "" {
it.rawID = p.Item.ID
}
for _, s := range p.Item.Summary {
if s.Text == "" {
continue
}
if it.summary.Len() > 0 {
it.summary.WriteString("\n")
}
it.summary.WriteString(s.Text)
}
}
}
case "response.completed", "response.done":
var p struct {

View file

@ -60,6 +60,55 @@ type ToolResultBlock struct {
func (ToolResultBlock) isContent() {}
// ReasoningBlock carries the assistant's chain-of-thought metadata so
// providers that require it on follow-up requests (OpenAI Codex with
// thinking enabled) can replay the same payload they emitted earlier.
// Summary is the human-readable reasoning summary (may be empty); the
// encrypted blob is opaque to zot. ID is the provider-issued reasoning
// item id.
type ReasoningBlock struct {
ID string `json:"reasoning_id,omitempty"`
Summary string `json:"summary,omitempty"`
Encrypted string `json:"encrypted_content,omitempty"`
}
func (ReasoningBlock) isContent() {}
// RepairOrphanedToolResults removes tool_result content blocks (and
// entire messages that become empty) when the matching tool_use ID
// does not appear anywhere in the given messages. Resume tails,
// compaction repair, and provider request builders all need this so
// the upstream API never sees a tool_call_id with no corresponding
// assistant tool_call earlier in the same request.
func RepairOrphanedToolResults(msgs []Message) []Message {
useIDs := map[string]bool{}
for _, m := range msgs {
for _, c := range m.Content {
if tc, ok := c.(ToolCallBlock); ok {
useIDs[tc.ID] = true
}
}
}
out := make([]Message, 0, len(msgs))
for _, m := range msgs {
var filtered []Content
for _, c := range m.Content {
if tr, ok := c.(ToolResultBlock); ok {
if !useIDs[tr.CallID] {
continue
}
}
filtered = append(filtered, c)
}
if len(filtered) > 0 {
copy := m
copy.Content = filtered
out = append(out, copy)
}
}
return out
}
// Message is a single turn in the conversation.
type Message struct {
Role Role `json:"role"`

View file

@ -302,16 +302,27 @@ func (r *Renderer) DrawLog(chat, bottom []string, cursorBottomRow, cursorCol int
w.WriteString(line)
w.WriteString("\r\n")
}
w.WriteString(SeqSaveCursor)
writeBlock(&w, bottomFrame)
r.logInit = true
} else {
// Return to the saved top-of-bottom-band anchor instead of relying
// on relative cursor movement from the last exposed editor cursor.
// If the terminal naturally scrolled between frames, save/restore is
// less prone to drift that leaves duplicated transcript blocks until
// ctrl+l forces a clear repaint.
w.WriteString(SeqRestoreCursor)
// Walk back up to the top of the previous bottom block. The cursor
// was last positioned somewhere inside the bottom band by the
// previous Draw (final ShowCursor below); we don't trust the
// terminal's saved cursor across frames because terminal-driven
// scrolling would invalidate it. Instead, rebuild the relative
// position from r.cursorRow inside the previous bottomFrame.
prevBottomRows := len(r.logBottom)
prevCursorRow := r.cursorRow
if prevCursorRow < 0 || prevCursorRow >= prevBottomRows {
prevCursorRow = prevBottomRows - 1
if prevCursorRow < 0 {
prevCursorRow = 0
}
}
up := prevCursorRow
if prevBottomRows > 0 && up > 0 {
w.WriteString("\x1b[" + itoa(up) + "A")
}
w.WriteString("\r")
prefix := len(r.logChat) <= len(chatFrame)
@ -324,10 +335,9 @@ func (r *Renderer) DrawLog(chat, bottom []string, cursorBottomRow, cursorCol int
}
}
if prefix {
// Erase old bottom (and anything below the saved anchor), then
// append only genuinely new chat rows. They become real terminal
// scrollback, and inline image escapes are emitted once here — not
// on every keystroke.
// Erase old bottom band entirely, then append only genuinely new
// chat rows above the new bottom band. New chat rows become real
// terminal scrollback; inline image escapes are emitted once here.
w.WriteString(SeqEraseToEnd)
for _, line := range chatFrame[len(r.logChat):] {
w.WriteString("\x1b[0m")
@ -354,7 +364,6 @@ func (r *Renderer) DrawLog(chat, bottom []string, cursorBottomRow, cursorCol int
w.WriteString("\r\n")
}
}
w.WriteString(SeqSaveCursor)
writeBlock(&w, bottomFrame)
}

View file

@ -20,6 +20,38 @@ func expandTabs(s string) string {
return strings.ReplaceAll(s, "\t", " ")
}
// sanitizeUserBubbleLine prepares a single user-bubble row for safe
// rendering. Pasted content from another terminal can contain
// embedded ANSI escape sequences, control bytes, and tabs that
// either reset the bubble's background colour or move the cursor in
// ways that break the bubble's painted column.
func sanitizeUserBubbleLine(s string) string {
if s == "" {
return s
}
s = expandTabs(s)
var b strings.Builder
b.Grow(len(s))
for i := 0; i < len(s); {
c := s[i]
if c == 0x1b { // ESC: drop CSI/OSC/DCS and simple escapes.
i = skipEscapeSequence(s, i)
continue
}
if c == '\r' || c == '\b' || c == 0x07 {
i++
continue
}
if c < 0x20 || c == 0x7f {
i++
continue
}
b.WriteByte(c)
i++
}
return b.String()
}
// pathFromToolArgs returns the "path" argument from a tool_call's
// JSON arguments, or "" if the args aren't a JSON object or don't
// include one. Used to pick a syntax language for rendering the
@ -540,6 +572,7 @@ func (v *View) renderMessage(m provider.Message, width int, turnOpen bool) []str
switch b := c.(type) {
case provider.TextBlock:
for _, l := range strings.Split(b.Text, "\n") {
l = sanitizeUserBubbleLine(l)
for _, w := range wrapLine(l, innerWidth, "") {
bubble = append(bubble, row(w))
}