diff --git a/internal/agent/modes/interactive.go b/internal/agent/modes/interactive.go index 934d656..64f5e67 100644 --- a/internal/agent/modes/interactive.go +++ b/internal/agent/modes/interactive.go @@ -154,21 +154,37 @@ type Interactive struct { ed *tui.Editor rend *tui.Renderer - mu sync.Mutex - agent *core.Agent - streaming strings.Builder - streamOn bool - toolCalls map[string]*tui.ToolCallView - toolOrder []string - statusErr string - statusOK string - helpBlock []string // rendered above the chat when /help was typed - cumUsage provider.Usage - lastCtxInput int // input_tokens of the most recent turn — approximates current context size - busy bool - dirty chan struct{} - cancelTurn context.CancelFunc - scrollOffset int // rows from the bottom; 0 = pinned to latest + mu sync.Mutex + agent *core.Agent + streaming strings.Builder // what's currently painted on screen + streamOn bool + + // streamPending is the runes buffered after each EvTextDelta that + // haven't yet been promoted into `streaming` for rendering. It + // exists because some provider paths (notably Anthropic via the + // oauth/subscription channel) coalesce the model's output into a + // few fat chunks instead of drip-streaming. Painting those fat + // chunks verbatim looks like the summary "just appears". The + // paintPace goroutine drains a handful of runes per tick from + // this buffer into `streaming`, giving every path the same + // typewriter feel regardless of upstream chunk size. + streamPending []rune + // streamFlushPending is set when EvAssistantMessage fires while + // streamPending still has unrendered runes. The ticker flushes + // them, then closes out the stream (clearing flags, resetting + // buffers) so the final paint matches the on-disk message. + streamFlushPending bool + toolCalls map[string]*tui.ToolCallView + toolOrder []string + statusErr string + statusOK string + helpBlock []string // rendered above the chat when /help was typed + cumUsage provider.Usage + lastCtxInput int // input_tokens of the most recent turn — approximates current context size + busy bool + dirty chan struct{} + cancelTurn context.CancelFunc + scrollOffset int // rows from the bottom; 0 = pinned to latest // Messages typed while a turn is in flight. Each is delivered as // its own follow-up turn once the current one finishes. Rendered @@ -292,6 +308,12 @@ func (i *Interactive) Run(ctx context.Context) error { _, _ = term.Write([]byte(tui.SeqAltScreenOn)) defer term.Write([]byte(tui.SeqAltScreenOff + tui.SeqBracketedPasteOff + tui.SeqShowCursor)) + // Streaming pacer: drains buffered text deltas at a steady rate + // so typewriter feel is identical across providers regardless of + // upstream chunk size. Starts here so it lives for the whole + // session and exits with ctx. + go i.runStreamPacer(ctx) + cols, rows := term.Size() i.rend.Resize(cols, rows) term.OnResize(func() { @@ -526,6 +548,19 @@ func (i *Interactive) redraw() { } else { i.view.Messages = nil } + // Pacer flush: while the streaming pacer is still draining the + // buffer (i.e. EvAssistantMessage already fired but more runes + // are queued), the final assistant message is already in + // i.agent.Messages() in full. Painting it in the transcript + // AND the streaming block at the same time shows the user the + // complete text immediately — which defeats the whole pacer. + // Hide the last message until the pacer catches up; once the + // flush-pending latch clears, the message is revealed (the + // streaming block disappears the same frame because streamOn + // flips off, so the transition is seamless). + if i.streamFlushPending && len(i.view.Messages) > 0 { + i.view.Messages = i.view.Messages[:len(i.view.Messages)-1] + } i.view.Streaming = i.streaming.String() i.view.StreamingActive = i.streamOn // Guard against the narrow race where EvAssistantMessage has @@ -2013,8 +2048,7 @@ func (i *Interactive) runCompact(parent context.Context, auto bool) { summary, err := i.agent.Compact(ctx, 4, sink) i.mu.Lock() i.busy = false - i.streamOn = false - i.streaming.Reset() + i.resetStreamingStateLocked() i.cancelTurn = nil i.autoCompacting = false switch { @@ -2075,7 +2109,13 @@ func (i *Interactive) startTurn(parent context.Context, prompt string) { err := i.agent.Prompt(ctx, prompt, nil, sink) i.mu.Lock() i.busy = false - i.streamOn = false + // Don't touch streamPending / streamFlushPending here — the + // pacer may still be draining the final deltas and needs to + // paint them even though Prompt has returned. It will reset + // streamOn on its own once the buffer empties. + if len(i.streamPending) == 0 { + i.streamOn = false + } i.cancelTurn = nil if err != nil && ctx.Err() == nil { i.statusErr = err.Error() @@ -2160,34 +2200,40 @@ func (i *Interactive) handleEvent(ev core.AgentEvent) { // and the final summary text pops in all at once instead // of typewriter-streaming delta by delta. i.streaming.Reset() + i.streamPending = i.streamPending[:0] + i.streamFlushPending = false i.streamOn = true + // Clear the live tool-call overlay. Any tools from the + // previous round are now fully folded into the transcript + // (assistant tool_use block + tool role message with the + // result), so keeping them in the overlay would duplicate + // them in the view — once inside the finalised transcript + // and once below the streaming block, with the streaming + // summary sandwiched in between. The next EvToolUseStart + // will populate fresh entries for this turn's tools. + i.toolCalls = map[string]*tui.ToolCallView{} + i.toolOrder = nil case core.EvTextDelta: - i.streaming.WriteString(e.Delta) + // Buffer into streamPending; the paintPace ticker drains + // it into i.streaming a few runes at a time for a smooth + // typewriter effect independent of upstream chunk size. + i.streamPending = append(i.streamPending, []rune(e.Delta)...) i.streamOn = true case core.EvAssistantMessage: - i.streaming.Reset() - i.streamOn = false - if i.cfg.OnAssistant != nil { - i.cfg.OnAssistant(e.Message) - } - // Mirror the assistant's final visible text to the telegram - // bridge when active. Only TextBlock content is forwarded; - // tool_use blocks are internal. Send on a goroutine so the - // network call doesn't hold the event loop lock. - if i.telegramBridge != nil && i.telegramBridge.Active() { - var sb strings.Builder - for _, c := range e.Message.Content { - if tb, ok := c.(provider.TextBlock); ok { - if sb.Len() > 0 { - sb.WriteString("\n") - } - sb.WriteString(tb.Text) - } - } - if text := sb.String(); strings.TrimSpace(text) != "" { - go i.telegramBridge.OnAssistantText(text) - } + // OnAssistant + telegram mirroring always fire on message + // arrival — they read the FINAL message content, which is + // complete regardless of what's still in the pacer. + i.assistantMessageSideEffects(e.Message) + // If the pacer still has characters to drain, keep streamOn + // true and mark flush pending; the paintPace ticker will + // drain the remainder and reset streaming state when done. + // Otherwise (rare: full-replay sessions, abort paths) clear + // synchronously so a later render doesn't show stale text. + if len(i.streamPending) > 0 { + i.streamFlushPending = true + return } + i.resetStreamingStateLocked() case core.EvToolUseStart: // Live streaming: pre-create the view so the user sees the // tool call being composed in real time. Any subsequent @@ -2257,10 +2303,11 @@ func (i *Interactive) handleEvent(ev core.AgentEvent) { } case core.EvTurnEnd: if e.Stop == provider.StopAborted { - // Aborted turn: discard the partial streaming text (it is not - // persisted in the transcript) and clear any transient error. - i.streaming.Reset() - i.streamOn = false + // Aborted turn: discard the partial streaming text (it + // is not persisted in the transcript) and clear any + // transient error. Also drop anything still buffered in + // the pacer so nothing keeps drawing after the cancel. + i.resetStreamingStateLocked() i.statusErr = "" i.statusOK = "cancelled" return @@ -2955,3 +3002,103 @@ func assistantText(m provider.Message) string { } return sb.String() } + +// resetStreamingStateLocked clears every piece of streaming state +// in one shot. Used by abort paths (turn cancel, compact finish, +// queue drain) so the pacer doesn't keep draining stale runes from +// a prior turn. Must be called with i.mu held. +func (i *Interactive) resetStreamingStateLocked() { + i.streaming.Reset() + i.streamPending = i.streamPending[:0] + i.streamFlushPending = false + i.streamOn = false +} + +// assistantMessageSideEffects runs the non-visual hooks attached to +// EvAssistantMessage: the host-provided OnAssistant callback and the +// telegram-bridge mirror. Called with i.mu held. +// +// Factored out of handleEvent because the streaming pacer may defer +// visual reset until after the last buffered rune has painted, but +// the callbacks themselves must fire on message arrival so +// downstream observers (session persistence, telegram, cost panels) +// don't wait on a UI animation to catch up. +func (i *Interactive) assistantMessageSideEffects(m provider.Message) { + if i.cfg.OnAssistant != nil { + i.cfg.OnAssistant(m) + } + if i.telegramBridge != nil && i.telegramBridge.Active() { + var sb strings.Builder + for _, c := range m.Content { + if tb, ok := c.(provider.TextBlock); ok { + if sb.Len() > 0 { + sb.WriteString("\n") + } + sb.WriteString(tb.Text) + } + } + if text := sb.String(); strings.TrimSpace(text) != "" { + go i.telegramBridge.OnAssistantText(text) + } + } +} + +// paintPaceRate is how many runes the streaming pacer releases per +// tick. With a 16ms tick, 6 runes/tick is ~375 runes/s — fast enough +// that a 500-rune summary finishes in ~1.3s, slow enough to look +// like a human typing. Empirically matches the feel of provider +// paths that already drip-stream natively. +const paintPaceRate = 6 + +// paintPaceInterval is the tick interval for the streaming pacer. +// 16ms lines up with the redraw throttle so we never paint faster +// than the terminal can keep up. +const paintPaceInterval = 16 * time.Millisecond + +// runStreamPacer drains buffered deltas from streamPending into +// streaming a small batch per tick, invalidating after each move. +// It stops when the context cancels (tui shutdown). +// +// Why a pacer: providers differ wildly in how they chunk their +// text_delta events. The API-key path on Anthropic emits ~30 drips +// for a 400-token summary; the OAuth path can coalesce the same +// summary into 3 fat chunks, visually indistinguishable from "the +// whole reply just appeared". The pacer normalizes that so every +// path looks the same on screen. +func (i *Interactive) runStreamPacer(ctx context.Context) { + t := time.NewTicker(paintPaceInterval) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + i.mu.Lock() + if len(i.streamPending) == 0 { + // EvAssistantMessage already fired but the pacer + // was still draining a tick ago. Everything is now + // painted; clear the streaming flags so the next + // redraw shows the finalised transcript message + // and hides the streaming overlay. + if i.streamFlushPending { + i.streamFlushPending = false + i.streaming.Reset() + i.streamOn = false + i.mu.Unlock() + i.invalidate() + continue + } + i.mu.Unlock() + continue + } + n := paintPaceRate + if n > len(i.streamPending) { + n = len(i.streamPending) + } + i.streaming.WriteString(string(i.streamPending[:n])) + i.streamPending = i.streamPending[n:] + i.mu.Unlock() + i.invalidate() + } + } +}