From bc9e57e884444e14e77e000aa6b1aa39b1ff33b8 Mon Sep 17 00:00:00 2001 From: patriceckhart Date: Mon, 20 Apr 2026 13:26:26 +0200 Subject: [PATCH] feat(tui): smooth typewriter streaming across providers and turns Assistant replies now visibly type out character-by-character at a steady pace regardless of how the underlying provider chunks its stream. Tool-using turns render their final summary in the right place with no "written between two tool calls" duplication and no reflow jump when the typing finishes. Three bugs, one behaviour fix. 1) EvAssistantStart was unhandled. The core emits EvAssistantStart at the top of every oneTurn including every follow-up after a tool round-trip. The tui was ignoring the event, so after the first EvAssistantMessage closed out the tool_use message, streamOn stayed false and every subsequent EvTextDelta filled the streaming buffer invisibly. The final summary then appeared all at once when EvAssistantMessage fired at the end of the follow-up turn. handleEvent now has a case core.EvAssistantStart that resets the streaming buffer and flips streamOn back on, so the follow-up summary streams the same way the first reply does. EvTextDelta also sets streamOn=true as a belt-and-suspenders against stray delta sequences with no preceding start. 2) Oauth/subscription streaming chunks were too large. Anthropics api-key channel drip-streams tokens, so a 400-char summary arrives as ~25 small text_delta events and looks like a typewriter without any extra work. The oauth channel (anthropic-beta: oauth-2025-04-20) coalesces the same summary into 3-4 fat chunks of 100+ chars each, so the user sees a blank pane, then the whole paragraph lands in one frame. Introduced a streaming pacer goroutine that uncouples "what the provider sent us" from "what we paint on screen". Each EvTextDelta now appends into i.streamPending. A ticker at 16ms drains paintPaceRate=6 runes per tick from streamPending into the rendered i.streaming buffer, invalidating after every move. Result: ~375 runes/sec typewriter pace that looks identical regardless of upstream chunk shape. For long replies the pacer can run slightly behind the model but drains to zero within a second of the last delta. When EvAssistantMessage arrives while the pacer still has buffered runes, the handler sets streamFlushPending=true and returns without clearing. The pacer finishes draining, then on the next empty tick clears streamFlushPending + streaming + streamOn in one shot. Short turns that finish before the pacer does anything stay on the synchronous reset path so we don't wait on a ticker for zero work. Abort paths (turn cancel, compact done, EvTurnEnd with StopAborted) call a new resetStreamingStateLocked helper that atomically clears streaming, streamPending, streamFlushPending and streamOn so a fresh turn never inherits leftover runes. 3) The finalised assistant message double-painted during the drain window. When EvAssistantMessage fires, the agent appends the full assistant message to a.messages. The tui reads the message list on every redraw, so the complete text appeared in the transcript immediately while the pacer was still spelling it out below. Two copies on screen, one complete, one partial - the complete one was what the user actually read. redraw() now hides i.view.Messages[-1] while streamFlushPending is true, so during the drain only the streaming overlay is visible. When the pacer clears the flag the overlay disappears and the finalised message returns in the same frame with identical vertical footprint (both use the same "zot" header plus the same markdown-rendered body), so the swap reads as the caret landing on the last rune. 4) Live tool-call overlay carried over across turns. While i.busy=true the view always appended every entry from i.toolOrder/i.toolCalls under the streaming block. After a tool round-trip those entries were already folded into the transcript as an assistant(tool_use) message plus a tool role message with the result, so the next turn's summary rendered sandwiched between the finalised tool_use block above and the live tool-call block below showing the same tool. The user saw the summary "written between two reads". The EvAssistantStart handler now resets i.toolCalls and i.toolOrder. Any tools from the previous round are entirely represented in the transcript at that point; the next EvToolUseStart repopulates the overlay for the new round. No more duplicate rendering. Misc: extracted assistantMessageSideEffects so OnAssistant + telegram mirroring fire on message arrival regardless of which code path (sync-reset vs pacer-drain) handles the visual transition. Also extracted the narrow duplicate-detection guard in redraw so follow-up turns' typewriter streaming survives the last-message-is-assistant invariant that holds across a tool round-trip. Tested manually with both short ("summarize this file") and long ("read this package to understand it") flows on the oauth channel; both now stream visibly. --- internal/agent/modes/interactive.go | 237 ++++++++++++++++++++++------ 1 file changed, 192 insertions(+), 45 deletions(-) diff --git a/internal/agent/modes/interactive.go b/internal/agent/modes/interactive.go index 934d656..64f5e67 100644 --- a/internal/agent/modes/interactive.go +++ b/internal/agent/modes/interactive.go @@ -154,21 +154,37 @@ type Interactive struct { ed *tui.Editor rend *tui.Renderer - mu sync.Mutex - agent *core.Agent - streaming strings.Builder - streamOn bool - toolCalls map[string]*tui.ToolCallView - toolOrder []string - statusErr string - statusOK string - helpBlock []string // rendered above the chat when /help was typed - cumUsage provider.Usage - lastCtxInput int // input_tokens of the most recent turn — approximates current context size - busy bool - dirty chan struct{} - cancelTurn context.CancelFunc - scrollOffset int // rows from the bottom; 0 = pinned to latest + mu sync.Mutex + agent *core.Agent + streaming strings.Builder // what's currently painted on screen + streamOn bool + + // streamPending is the runes buffered after each EvTextDelta that + // haven't yet been promoted into `streaming` for rendering. It + // exists because some provider paths (notably Anthropic via the + // oauth/subscription channel) coalesce the model's output into a + // few fat chunks instead of drip-streaming. Painting those fat + // chunks verbatim looks like the summary "just appears". The + // paintPace goroutine drains a handful of runes per tick from + // this buffer into `streaming`, giving every path the same + // typewriter feel regardless of upstream chunk size. + streamPending []rune + // streamFlushPending is set when EvAssistantMessage fires while + // streamPending still has unrendered runes. The ticker flushes + // them, then closes out the stream (clearing flags, resetting + // buffers) so the final paint matches the on-disk message. + streamFlushPending bool + toolCalls map[string]*tui.ToolCallView + toolOrder []string + statusErr string + statusOK string + helpBlock []string // rendered above the chat when /help was typed + cumUsage provider.Usage + lastCtxInput int // input_tokens of the most recent turn — approximates current context size + busy bool + dirty chan struct{} + cancelTurn context.CancelFunc + scrollOffset int // rows from the bottom; 0 = pinned to latest // Messages typed while a turn is in flight. Each is delivered as // its own follow-up turn once the current one finishes. Rendered @@ -292,6 +308,12 @@ func (i *Interactive) Run(ctx context.Context) error { _, _ = term.Write([]byte(tui.SeqAltScreenOn)) defer term.Write([]byte(tui.SeqAltScreenOff + tui.SeqBracketedPasteOff + tui.SeqShowCursor)) + // Streaming pacer: drains buffered text deltas at a steady rate + // so typewriter feel is identical across providers regardless of + // upstream chunk size. Starts here so it lives for the whole + // session and exits with ctx. + go i.runStreamPacer(ctx) + cols, rows := term.Size() i.rend.Resize(cols, rows) term.OnResize(func() { @@ -526,6 +548,19 @@ func (i *Interactive) redraw() { } else { i.view.Messages = nil } + // Pacer flush: while the streaming pacer is still draining the + // buffer (i.e. EvAssistantMessage already fired but more runes + // are queued), the final assistant message is already in + // i.agent.Messages() in full. Painting it in the transcript + // AND the streaming block at the same time shows the user the + // complete text immediately — which defeats the whole pacer. + // Hide the last message until the pacer catches up; once the + // flush-pending latch clears, the message is revealed (the + // streaming block disappears the same frame because streamOn + // flips off, so the transition is seamless). + if i.streamFlushPending && len(i.view.Messages) > 0 { + i.view.Messages = i.view.Messages[:len(i.view.Messages)-1] + } i.view.Streaming = i.streaming.String() i.view.StreamingActive = i.streamOn // Guard against the narrow race where EvAssistantMessage has @@ -2013,8 +2048,7 @@ func (i *Interactive) runCompact(parent context.Context, auto bool) { summary, err := i.agent.Compact(ctx, 4, sink) i.mu.Lock() i.busy = false - i.streamOn = false - i.streaming.Reset() + i.resetStreamingStateLocked() i.cancelTurn = nil i.autoCompacting = false switch { @@ -2075,7 +2109,13 @@ func (i *Interactive) startTurn(parent context.Context, prompt string) { err := i.agent.Prompt(ctx, prompt, nil, sink) i.mu.Lock() i.busy = false - i.streamOn = false + // Don't touch streamPending / streamFlushPending here — the + // pacer may still be draining the final deltas and needs to + // paint them even though Prompt has returned. It will reset + // streamOn on its own once the buffer empties. + if len(i.streamPending) == 0 { + i.streamOn = false + } i.cancelTurn = nil if err != nil && ctx.Err() == nil { i.statusErr = err.Error() @@ -2160,34 +2200,40 @@ func (i *Interactive) handleEvent(ev core.AgentEvent) { // and the final summary text pops in all at once instead // of typewriter-streaming delta by delta. i.streaming.Reset() + i.streamPending = i.streamPending[:0] + i.streamFlushPending = false i.streamOn = true + // Clear the live tool-call overlay. Any tools from the + // previous round are now fully folded into the transcript + // (assistant tool_use block + tool role message with the + // result), so keeping them in the overlay would duplicate + // them in the view — once inside the finalised transcript + // and once below the streaming block, with the streaming + // summary sandwiched in between. The next EvToolUseStart + // will populate fresh entries for this turn's tools. + i.toolCalls = map[string]*tui.ToolCallView{} + i.toolOrder = nil case core.EvTextDelta: - i.streaming.WriteString(e.Delta) + // Buffer into streamPending; the paintPace ticker drains + // it into i.streaming a few runes at a time for a smooth + // typewriter effect independent of upstream chunk size. + i.streamPending = append(i.streamPending, []rune(e.Delta)...) i.streamOn = true case core.EvAssistantMessage: - i.streaming.Reset() - i.streamOn = false - if i.cfg.OnAssistant != nil { - i.cfg.OnAssistant(e.Message) - } - // Mirror the assistant's final visible text to the telegram - // bridge when active. Only TextBlock content is forwarded; - // tool_use blocks are internal. Send on a goroutine so the - // network call doesn't hold the event loop lock. - if i.telegramBridge != nil && i.telegramBridge.Active() { - var sb strings.Builder - for _, c := range e.Message.Content { - if tb, ok := c.(provider.TextBlock); ok { - if sb.Len() > 0 { - sb.WriteString("\n") - } - sb.WriteString(tb.Text) - } - } - if text := sb.String(); strings.TrimSpace(text) != "" { - go i.telegramBridge.OnAssistantText(text) - } + // OnAssistant + telegram mirroring always fire on message + // arrival — they read the FINAL message content, which is + // complete regardless of what's still in the pacer. + i.assistantMessageSideEffects(e.Message) + // If the pacer still has characters to drain, keep streamOn + // true and mark flush pending; the paintPace ticker will + // drain the remainder and reset streaming state when done. + // Otherwise (rare: full-replay sessions, abort paths) clear + // synchronously so a later render doesn't show stale text. + if len(i.streamPending) > 0 { + i.streamFlushPending = true + return } + i.resetStreamingStateLocked() case core.EvToolUseStart: // Live streaming: pre-create the view so the user sees the // tool call being composed in real time. Any subsequent @@ -2257,10 +2303,11 @@ func (i *Interactive) handleEvent(ev core.AgentEvent) { } case core.EvTurnEnd: if e.Stop == provider.StopAborted { - // Aborted turn: discard the partial streaming text (it is not - // persisted in the transcript) and clear any transient error. - i.streaming.Reset() - i.streamOn = false + // Aborted turn: discard the partial streaming text (it + // is not persisted in the transcript) and clear any + // transient error. Also drop anything still buffered in + // the pacer so nothing keeps drawing after the cancel. + i.resetStreamingStateLocked() i.statusErr = "" i.statusOK = "cancelled" return @@ -2955,3 +3002,103 @@ func assistantText(m provider.Message) string { } return sb.String() } + +// resetStreamingStateLocked clears every piece of streaming state +// in one shot. Used by abort paths (turn cancel, compact finish, +// queue drain) so the pacer doesn't keep draining stale runes from +// a prior turn. Must be called with i.mu held. +func (i *Interactive) resetStreamingStateLocked() { + i.streaming.Reset() + i.streamPending = i.streamPending[:0] + i.streamFlushPending = false + i.streamOn = false +} + +// assistantMessageSideEffects runs the non-visual hooks attached to +// EvAssistantMessage: the host-provided OnAssistant callback and the +// telegram-bridge mirror. Called with i.mu held. +// +// Factored out of handleEvent because the streaming pacer may defer +// visual reset until after the last buffered rune has painted, but +// the callbacks themselves must fire on message arrival so +// downstream observers (session persistence, telegram, cost panels) +// don't wait on a UI animation to catch up. +func (i *Interactive) assistantMessageSideEffects(m provider.Message) { + if i.cfg.OnAssistant != nil { + i.cfg.OnAssistant(m) + } + if i.telegramBridge != nil && i.telegramBridge.Active() { + var sb strings.Builder + for _, c := range m.Content { + if tb, ok := c.(provider.TextBlock); ok { + if sb.Len() > 0 { + sb.WriteString("\n") + } + sb.WriteString(tb.Text) + } + } + if text := sb.String(); strings.TrimSpace(text) != "" { + go i.telegramBridge.OnAssistantText(text) + } + } +} + +// paintPaceRate is how many runes the streaming pacer releases per +// tick. With a 16ms tick, 6 runes/tick is ~375 runes/s — fast enough +// that a 500-rune summary finishes in ~1.3s, slow enough to look +// like a human typing. Empirically matches the feel of provider +// paths that already drip-stream natively. +const paintPaceRate = 6 + +// paintPaceInterval is the tick interval for the streaming pacer. +// 16ms lines up with the redraw throttle so we never paint faster +// than the terminal can keep up. +const paintPaceInterval = 16 * time.Millisecond + +// runStreamPacer drains buffered deltas from streamPending into +// streaming a small batch per tick, invalidating after each move. +// It stops when the context cancels (tui shutdown). +// +// Why a pacer: providers differ wildly in how they chunk their +// text_delta events. The API-key path on Anthropic emits ~30 drips +// for a 400-token summary; the OAuth path can coalesce the same +// summary into 3 fat chunks, visually indistinguishable from "the +// whole reply just appeared". The pacer normalizes that so every +// path looks the same on screen. +func (i *Interactive) runStreamPacer(ctx context.Context) { + t := time.NewTicker(paintPaceInterval) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + i.mu.Lock() + if len(i.streamPending) == 0 { + // EvAssistantMessage already fired but the pacer + // was still draining a tick ago. Everything is now + // painted; clear the streaming flags so the next + // redraw shows the finalised transcript message + // and hides the streaming overlay. + if i.streamFlushPending { + i.streamFlushPending = false + i.streaming.Reset() + i.streamOn = false + i.mu.Unlock() + i.invalidate() + continue + } + i.mu.Unlock() + continue + } + n := paintPaceRate + if n > len(i.streamPending) { + n = len(i.streamPending) + } + i.streaming.WriteString(string(i.streamPending[:n])) + i.streamPending = i.streamPending[n:] + i.mu.Unlock() + i.invalidate() + } + } +}