fix(rpc): wait for in-flight commands on stdin close, drop dup events

three real issues found by actually running zot rpc end-to-end:

1. piping a single command into the process and letting stdin close
   would race the agent loop against the process exit, swallowing
   the entire prompt response. fix: track in-flight prompt and
   compact goroutines in a sync.WaitGroup; run() now waits on it
   before returning.

2. each prompt emitted two consecutive done frames - one from the
   agent loop EvDone passing through EventToJSON, one added at the
   end of runPrompt. suppress EvDone in the prompt sink so only the
   explicit terminator remains.

3. cancelling a turn produced a spurious error frame on top of the
   turn_end stop=aborted that already carried the cancellation.
   suppress error frames when the underlying error is
   context.Canceled, in both prompt and compact paths.

verified manually: ping, get_state, get_models, set_model
(valid + invalid id), clear + get_messages, abort, malformed json,
unknown command, auth gate (missing/wrong/correct token),
stdin-close-while-prompt-running, in-process SDK Prompt, plus all
four reference clients parse cleanly and shell + python actually
drive the protocol.
This commit is contained in:
patriceckhart 2026-04-19 12:35:13 +02:00
parent a670442c9e
commit ebc5dad18c
2 changed files with 35 additions and 6 deletions

View file

@ -4,6 +4,7 @@ import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
@ -67,10 +68,19 @@ type rpcServer struct {
turnMu sync.Mutex // serialises one prompt at a time
activeCancel context.CancelFunc
authed bool
// inFlight tracks long-running command goroutines so run() can
// wait for them before returning when stdin closes. Without this,
// piping a single 'prompt' command into 'zot rpc' would race the
// process exit against the agent loop and the prompt would never
// produce output.
inFlight sync.WaitGroup
}
// run reads NDJSON commands from in and dispatches them. Returns when
// in is closed. Errors writing to stdout are fatal.
// in is closed AND every in-flight long-running command (prompt /
// compact) has finished, so a quick `echo cmd | zot rpc` invocation
// still produces full output before the process exits.
func (s *rpcServer) run(in io.Reader) error {
requireToken := os.Getenv("ZOTCORE_RPC_TOKEN") != ""
s.authed = !requireToken
@ -114,7 +124,9 @@ func (s *rpcServer) run(in io.Reader) error {
}
s.dispatch(head.Type, head.ID, []byte(line))
}
return sc.Err()
err := sc.Err()
s.inFlight.Wait()
return err
}
// dispatch routes a command. Long-running commands (prompt, compact)
@ -140,7 +152,11 @@ func (s *rpcServer) dispatch(cmd, id string, raw []byte) {
s.writeError(id, cmd, err.Error())
return
}
go s.runPrompt(id, req.Message, req.Images)
s.inFlight.Add(1)
go func() {
defer s.inFlight.Done()
s.runPrompt(id, req.Message, req.Images)
}()
case "abort":
if c := s.takeCancel(); c != nil {
@ -149,7 +165,11 @@ func (s *rpcServer) dispatch(cmd, id string, raw []byte) {
s.writeResponse(id, cmd, nil)
case "compact":
go s.runCompact(id)
s.inFlight.Add(1)
go func() {
defer s.inFlight.Done()
s.runCompact(id)
}()
case "get_state":
s.writeResponse(id, cmd, s.snapshotState())
@ -222,9 +242,16 @@ func (s *rpcServer) runPrompt(id, message string, images []struct {
}
err := s.agent.Prompt(subCtx, message, imgs, func(ev core.AgentEvent) {
// EvDone is emitted by the agent loop and we re-emit our own
// 'done' below; suppressing it here avoids duplicate frames.
if _, ok := ev.(core.EvDone); ok {
return
}
s.writeEvent(modes.EventToJSON(ev))
})
if err != nil {
// Don't emit a stand-alone error event for cancellation; the prior
// turn_end with stop=aborted already carries that signal.
if err != nil && !errors.Is(err, context.Canceled) {
s.writeEvent(map[string]any{"type": "error", "message": err.Error()})
}
s.writeEvent(map[string]any{"type": "done"})
@ -242,7 +269,9 @@ func (s *rpcServer) runCompact(id string) {
s.writeResponse(id, "compact", map[string]any{"started": true})
summary, err := s.agent.Compact(subCtx, 4, nil)
if err != nil {
s.writeEvent(map[string]any{"type": "error", "message": err.Error()})
if !errors.Is(err, context.Canceled) {
s.writeEvent(map[string]any{"type": "error", "message": err.Error()})
}
return
}
s.writeEvent(map[string]any{