mirror of
https://github.com/patriceckhart/zot.git
synced 2026-06-26 21:36:31 +02:00
feat(tui): smooth typewriter streaming across providers and turns
Assistant replies now visibly type out character-by-character at
a steady pace regardless of how the underlying provider chunks
its stream. Tool-using turns render their final summary in the
right place with no "written between two tool calls" duplication
and no reflow jump when the typing finishes.
Three bugs, one behaviour fix.
1) EvAssistantStart was unhandled.
The core emits EvAssistantStart at the top of every oneTurn
including every follow-up after a tool round-trip. The tui
was ignoring the event, so after the first EvAssistantMessage
closed out the tool_use message, streamOn stayed false and
every subsequent EvTextDelta filled the streaming buffer
invisibly. The final summary then appeared all at once when
EvAssistantMessage fired at the end of the follow-up turn.
handleEvent now has a case core.EvAssistantStart that resets
the streaming buffer and flips streamOn back on, so the
follow-up summary streams the same way the first reply does.
EvTextDelta also sets streamOn=true as a belt-and-suspenders
against stray delta sequences with no preceding start.
2) Oauth/subscription streaming chunks were too large.
Anthropics api-key channel drip-streams tokens, so a 400-char
summary arrives as ~25 small text_delta events and looks like
a typewriter without any extra work. The oauth channel
(anthropic-beta: oauth-2025-04-20) coalesces the same summary
into 3-4 fat chunks of 100+ chars each, so the user sees a
blank pane, then the whole paragraph lands in one frame.
Introduced a streaming pacer goroutine that uncouples "what
the provider sent us" from "what we paint on screen". Each
EvTextDelta now appends into i.streamPending. A ticker at
16ms drains paintPaceRate=6 runes per tick from streamPending
into the rendered i.streaming buffer, invalidating after
every move. Result: ~375 runes/sec typewriter pace that looks
identical regardless of upstream chunk shape. For long
replies the pacer can run slightly behind the model but
drains to zero within a second of the last delta.
When EvAssistantMessage arrives while the pacer still has
buffered runes, the handler sets streamFlushPending=true and
returns without clearing. The pacer finishes draining, then
on the next empty tick clears streamFlushPending + streaming
+ streamOn in one shot. Short turns that finish before the
pacer does anything stay on the synchronous reset path so we
don't wait on a ticker for zero work.
Abort paths (turn cancel, compact done, EvTurnEnd with
StopAborted) call a new resetStreamingStateLocked helper that
atomically clears streaming, streamPending, streamFlushPending
and streamOn so a fresh turn never inherits leftover runes.
3) The finalised assistant message double-painted during the
drain window.
When EvAssistantMessage fires, the agent appends the full
assistant message to a.messages. The tui reads the message
list on every redraw, so the complete text appeared in the
transcript immediately while the pacer was still spelling it
out below. Two copies on screen, one complete, one partial -
the complete one was what the user actually read.
redraw() now hides i.view.Messages[-1] while
streamFlushPending is true, so during the drain only the
streaming overlay is visible. When the pacer clears the flag
the overlay disappears and the finalised message returns in
the same frame with identical vertical footprint (both use
the same "zot" header plus the same markdown-rendered body),
so the swap reads as the caret landing on the last rune.
4) Live tool-call overlay carried over across turns.
While i.busy=true the view always appended every entry from
i.toolOrder/i.toolCalls under the streaming block. After a
tool round-trip those entries were already folded into the
transcript as an assistant(tool_use) message plus a tool role
message with the result, so the next turn's summary rendered
sandwiched between the finalised tool_use block above and the
live tool-call block below showing the same tool. The user
saw the summary "written between two reads".
The EvAssistantStart handler now resets i.toolCalls and
i.toolOrder. Any tools from the previous round are entirely
represented in the transcript at that point; the next
EvToolUseStart repopulates the overlay for the new round.
No more duplicate rendering.
Misc: extracted assistantMessageSideEffects so OnAssistant +
telegram mirroring fire on message arrival regardless of which
code path (sync-reset vs pacer-drain) handles the visual
transition. Also extracted the narrow duplicate-detection guard
in redraw so follow-up turns' typewriter streaming survives the
last-message-is-assistant invariant that holds across a tool
round-trip.
Tested manually with both short ("summarize this file") and
long ("read this package to understand it") flows on the oauth
channel; both now stream visibly.
This commit is contained in:
parent
676b5d7510
commit
bc9e57e884
1 changed files with 192 additions and 45 deletions
|
|
@ -154,21 +154,37 @@ type Interactive struct {
|
|||
ed *tui.Editor
|
||||
rend *tui.Renderer
|
||||
|
||||
mu sync.Mutex
|
||||
agent *core.Agent
|
||||
streaming strings.Builder
|
||||
streamOn bool
|
||||
toolCalls map[string]*tui.ToolCallView
|
||||
toolOrder []string
|
||||
statusErr string
|
||||
statusOK string
|
||||
helpBlock []string // rendered above the chat when /help was typed
|
||||
cumUsage provider.Usage
|
||||
lastCtxInput int // input_tokens of the most recent turn — approximates current context size
|
||||
busy bool
|
||||
dirty chan struct{}
|
||||
cancelTurn context.CancelFunc
|
||||
scrollOffset int // rows from the bottom; 0 = pinned to latest
|
||||
mu sync.Mutex
|
||||
agent *core.Agent
|
||||
streaming strings.Builder // what's currently painted on screen
|
||||
streamOn bool
|
||||
|
||||
// streamPending is the runes buffered after each EvTextDelta that
|
||||
// haven't yet been promoted into `streaming` for rendering. It
|
||||
// exists because some provider paths (notably Anthropic via the
|
||||
// oauth/subscription channel) coalesce the model's output into a
|
||||
// few fat chunks instead of drip-streaming. Painting those fat
|
||||
// chunks verbatim looks like the summary "just appears". The
|
||||
// paintPace goroutine drains a handful of runes per tick from
|
||||
// this buffer into `streaming`, giving every path the same
|
||||
// typewriter feel regardless of upstream chunk size.
|
||||
streamPending []rune
|
||||
// streamFlushPending is set when EvAssistantMessage fires while
|
||||
// streamPending still has unrendered runes. The ticker flushes
|
||||
// them, then closes out the stream (clearing flags, resetting
|
||||
// buffers) so the final paint matches the on-disk message.
|
||||
streamFlushPending bool
|
||||
toolCalls map[string]*tui.ToolCallView
|
||||
toolOrder []string
|
||||
statusErr string
|
||||
statusOK string
|
||||
helpBlock []string // rendered above the chat when /help was typed
|
||||
cumUsage provider.Usage
|
||||
lastCtxInput int // input_tokens of the most recent turn — approximates current context size
|
||||
busy bool
|
||||
dirty chan struct{}
|
||||
cancelTurn context.CancelFunc
|
||||
scrollOffset int // rows from the bottom; 0 = pinned to latest
|
||||
|
||||
// Messages typed while a turn is in flight. Each is delivered as
|
||||
// its own follow-up turn once the current one finishes. Rendered
|
||||
|
|
@ -292,6 +308,12 @@ func (i *Interactive) Run(ctx context.Context) error {
|
|||
_, _ = term.Write([]byte(tui.SeqAltScreenOn))
|
||||
defer term.Write([]byte(tui.SeqAltScreenOff + tui.SeqBracketedPasteOff + tui.SeqShowCursor))
|
||||
|
||||
// Streaming pacer: drains buffered text deltas at a steady rate
|
||||
// so typewriter feel is identical across providers regardless of
|
||||
// upstream chunk size. Starts here so it lives for the whole
|
||||
// session and exits with ctx.
|
||||
go i.runStreamPacer(ctx)
|
||||
|
||||
cols, rows := term.Size()
|
||||
i.rend.Resize(cols, rows)
|
||||
term.OnResize(func() {
|
||||
|
|
@ -526,6 +548,19 @@ func (i *Interactive) redraw() {
|
|||
} else {
|
||||
i.view.Messages = nil
|
||||
}
|
||||
// Pacer flush: while the streaming pacer is still draining the
|
||||
// buffer (i.e. EvAssistantMessage already fired but more runes
|
||||
// are queued), the final assistant message is already in
|
||||
// i.agent.Messages() in full. Painting it in the transcript
|
||||
// AND the streaming block at the same time shows the user the
|
||||
// complete text immediately — which defeats the whole pacer.
|
||||
// Hide the last message until the pacer catches up; once the
|
||||
// flush-pending latch clears, the message is revealed (the
|
||||
// streaming block disappears the same frame because streamOn
|
||||
// flips off, so the transition is seamless).
|
||||
if i.streamFlushPending && len(i.view.Messages) > 0 {
|
||||
i.view.Messages = i.view.Messages[:len(i.view.Messages)-1]
|
||||
}
|
||||
i.view.Streaming = i.streaming.String()
|
||||
i.view.StreamingActive = i.streamOn
|
||||
// Guard against the narrow race where EvAssistantMessage has
|
||||
|
|
@ -2013,8 +2048,7 @@ func (i *Interactive) runCompact(parent context.Context, auto bool) {
|
|||
summary, err := i.agent.Compact(ctx, 4, sink)
|
||||
i.mu.Lock()
|
||||
i.busy = false
|
||||
i.streamOn = false
|
||||
i.streaming.Reset()
|
||||
i.resetStreamingStateLocked()
|
||||
i.cancelTurn = nil
|
||||
i.autoCompacting = false
|
||||
switch {
|
||||
|
|
@ -2075,7 +2109,13 @@ func (i *Interactive) startTurn(parent context.Context, prompt string) {
|
|||
err := i.agent.Prompt(ctx, prompt, nil, sink)
|
||||
i.mu.Lock()
|
||||
i.busy = false
|
||||
i.streamOn = false
|
||||
// Don't touch streamPending / streamFlushPending here — the
|
||||
// pacer may still be draining the final deltas and needs to
|
||||
// paint them even though Prompt has returned. It will reset
|
||||
// streamOn on its own once the buffer empties.
|
||||
if len(i.streamPending) == 0 {
|
||||
i.streamOn = false
|
||||
}
|
||||
i.cancelTurn = nil
|
||||
if err != nil && ctx.Err() == nil {
|
||||
i.statusErr = err.Error()
|
||||
|
|
@ -2160,34 +2200,40 @@ func (i *Interactive) handleEvent(ev core.AgentEvent) {
|
|||
// and the final summary text pops in all at once instead
|
||||
// of typewriter-streaming delta by delta.
|
||||
i.streaming.Reset()
|
||||
i.streamPending = i.streamPending[:0]
|
||||
i.streamFlushPending = false
|
||||
i.streamOn = true
|
||||
// Clear the live tool-call overlay. Any tools from the
|
||||
// previous round are now fully folded into the transcript
|
||||
// (assistant tool_use block + tool role message with the
|
||||
// result), so keeping them in the overlay would duplicate
|
||||
// them in the view — once inside the finalised transcript
|
||||
// and once below the streaming block, with the streaming
|
||||
// summary sandwiched in between. The next EvToolUseStart
|
||||
// will populate fresh entries for this turn's tools.
|
||||
i.toolCalls = map[string]*tui.ToolCallView{}
|
||||
i.toolOrder = nil
|
||||
case core.EvTextDelta:
|
||||
i.streaming.WriteString(e.Delta)
|
||||
// Buffer into streamPending; the paintPace ticker drains
|
||||
// it into i.streaming a few runes at a time for a smooth
|
||||
// typewriter effect independent of upstream chunk size.
|
||||
i.streamPending = append(i.streamPending, []rune(e.Delta)...)
|
||||
i.streamOn = true
|
||||
case core.EvAssistantMessage:
|
||||
i.streaming.Reset()
|
||||
i.streamOn = false
|
||||
if i.cfg.OnAssistant != nil {
|
||||
i.cfg.OnAssistant(e.Message)
|
||||
}
|
||||
// Mirror the assistant's final visible text to the telegram
|
||||
// bridge when active. Only TextBlock content is forwarded;
|
||||
// tool_use blocks are internal. Send on a goroutine so the
|
||||
// network call doesn't hold the event loop lock.
|
||||
if i.telegramBridge != nil && i.telegramBridge.Active() {
|
||||
var sb strings.Builder
|
||||
for _, c := range e.Message.Content {
|
||||
if tb, ok := c.(provider.TextBlock); ok {
|
||||
if sb.Len() > 0 {
|
||||
sb.WriteString("\n")
|
||||
}
|
||||
sb.WriteString(tb.Text)
|
||||
}
|
||||
}
|
||||
if text := sb.String(); strings.TrimSpace(text) != "" {
|
||||
go i.telegramBridge.OnAssistantText(text)
|
||||
}
|
||||
// OnAssistant + telegram mirroring always fire on message
|
||||
// arrival — they read the FINAL message content, which is
|
||||
// complete regardless of what's still in the pacer.
|
||||
i.assistantMessageSideEffects(e.Message)
|
||||
// If the pacer still has characters to drain, keep streamOn
|
||||
// true and mark flush pending; the paintPace ticker will
|
||||
// drain the remainder and reset streaming state when done.
|
||||
// Otherwise (rare: full-replay sessions, abort paths) clear
|
||||
// synchronously so a later render doesn't show stale text.
|
||||
if len(i.streamPending) > 0 {
|
||||
i.streamFlushPending = true
|
||||
return
|
||||
}
|
||||
i.resetStreamingStateLocked()
|
||||
case core.EvToolUseStart:
|
||||
// Live streaming: pre-create the view so the user sees the
|
||||
// tool call being composed in real time. Any subsequent
|
||||
|
|
@ -2257,10 +2303,11 @@ func (i *Interactive) handleEvent(ev core.AgentEvent) {
|
|||
}
|
||||
case core.EvTurnEnd:
|
||||
if e.Stop == provider.StopAborted {
|
||||
// Aborted turn: discard the partial streaming text (it is not
|
||||
// persisted in the transcript) and clear any transient error.
|
||||
i.streaming.Reset()
|
||||
i.streamOn = false
|
||||
// Aborted turn: discard the partial streaming text (it
|
||||
// is not persisted in the transcript) and clear any
|
||||
// transient error. Also drop anything still buffered in
|
||||
// the pacer so nothing keeps drawing after the cancel.
|
||||
i.resetStreamingStateLocked()
|
||||
i.statusErr = ""
|
||||
i.statusOK = "cancelled"
|
||||
return
|
||||
|
|
@ -2955,3 +3002,103 @@ func assistantText(m provider.Message) string {
|
|||
}
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
// resetStreamingStateLocked clears every piece of streaming state
|
||||
// in one shot. Used by abort paths (turn cancel, compact finish,
|
||||
// queue drain) so the pacer doesn't keep draining stale runes from
|
||||
// a prior turn. Must be called with i.mu held.
|
||||
func (i *Interactive) resetStreamingStateLocked() {
|
||||
i.streaming.Reset()
|
||||
i.streamPending = i.streamPending[:0]
|
||||
i.streamFlushPending = false
|
||||
i.streamOn = false
|
||||
}
|
||||
|
||||
// assistantMessageSideEffects runs the non-visual hooks attached to
|
||||
// EvAssistantMessage: the host-provided OnAssistant callback and the
|
||||
// telegram-bridge mirror. Called with i.mu held.
|
||||
//
|
||||
// Factored out of handleEvent because the streaming pacer may defer
|
||||
// visual reset until after the last buffered rune has painted, but
|
||||
// the callbacks themselves must fire on message arrival so
|
||||
// downstream observers (session persistence, telegram, cost panels)
|
||||
// don't wait on a UI animation to catch up.
|
||||
func (i *Interactive) assistantMessageSideEffects(m provider.Message) {
|
||||
if i.cfg.OnAssistant != nil {
|
||||
i.cfg.OnAssistant(m)
|
||||
}
|
||||
if i.telegramBridge != nil && i.telegramBridge.Active() {
|
||||
var sb strings.Builder
|
||||
for _, c := range m.Content {
|
||||
if tb, ok := c.(provider.TextBlock); ok {
|
||||
if sb.Len() > 0 {
|
||||
sb.WriteString("\n")
|
||||
}
|
||||
sb.WriteString(tb.Text)
|
||||
}
|
||||
}
|
||||
if text := sb.String(); strings.TrimSpace(text) != "" {
|
||||
go i.telegramBridge.OnAssistantText(text)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// paintPaceRate is how many runes the streaming pacer releases per
|
||||
// tick. With a 16ms tick, 6 runes/tick is ~375 runes/s — fast enough
|
||||
// that a 500-rune summary finishes in ~1.3s, slow enough to look
|
||||
// like a human typing. Empirically matches the feel of provider
|
||||
// paths that already drip-stream natively.
|
||||
const paintPaceRate = 6
|
||||
|
||||
// paintPaceInterval is the tick interval for the streaming pacer.
|
||||
// 16ms lines up with the redraw throttle so we never paint faster
|
||||
// than the terminal can keep up.
|
||||
const paintPaceInterval = 16 * time.Millisecond
|
||||
|
||||
// runStreamPacer drains buffered deltas from streamPending into
|
||||
// streaming a small batch per tick, invalidating after each move.
|
||||
// It stops when the context cancels (tui shutdown).
|
||||
//
|
||||
// Why a pacer: providers differ wildly in how they chunk their
|
||||
// text_delta events. The API-key path on Anthropic emits ~30 drips
|
||||
// for a 400-token summary; the OAuth path can coalesce the same
|
||||
// summary into 3 fat chunks, visually indistinguishable from "the
|
||||
// whole reply just appeared". The pacer normalizes that so every
|
||||
// path looks the same on screen.
|
||||
func (i *Interactive) runStreamPacer(ctx context.Context) {
|
||||
t := time.NewTicker(paintPaceInterval)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
i.mu.Lock()
|
||||
if len(i.streamPending) == 0 {
|
||||
// EvAssistantMessage already fired but the pacer
|
||||
// was still draining a tick ago. Everything is now
|
||||
// painted; clear the streaming flags so the next
|
||||
// redraw shows the finalised transcript message
|
||||
// and hides the streaming overlay.
|
||||
if i.streamFlushPending {
|
||||
i.streamFlushPending = false
|
||||
i.streaming.Reset()
|
||||
i.streamOn = false
|
||||
i.mu.Unlock()
|
||||
i.invalidate()
|
||||
continue
|
||||
}
|
||||
i.mu.Unlock()
|
||||
continue
|
||||
}
|
||||
n := paintPaceRate
|
||||
if n > len(i.streamPending) {
|
||||
n = len(i.streamPending)
|
||||
}
|
||||
i.streaming.WriteString(string(i.streamPending[:n]))
|
||||
i.streamPending = i.streamPending[n:]
|
||||
i.mu.Unlock()
|
||||
i.invalidate()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue