diff --git a/examples/rpc/python/__pycache__/zot_client.cpython-314.pyc b/examples/rpc/python/__pycache__/zot_client.cpython-314.pyc new file mode 100644 index 0000000..6a874b0 Binary files /dev/null and b/examples/rpc/python/__pycache__/zot_client.cpython-314.pyc differ diff --git a/internal/agent/rpc.go b/internal/agent/rpc.go index 2ce07d5..739cec2 100644 --- a/internal/agent/rpc.go +++ b/internal/agent/rpc.go @@ -4,6 +4,7 @@ import ( "bufio" "context" "encoding/json" + "errors" "fmt" "io" "os" @@ -67,10 +68,19 @@ type rpcServer struct { turnMu sync.Mutex // serialises one prompt at a time activeCancel context.CancelFunc authed bool + + // inFlight tracks long-running command goroutines so run() can + // wait for them before returning when stdin closes. Without this, + // piping a single 'prompt' command into 'zot rpc' would race the + // process exit against the agent loop and the prompt would never + // produce output. + inFlight sync.WaitGroup } // run reads NDJSON commands from in and dispatches them. Returns when -// in is closed. Errors writing to stdout are fatal. +// in is closed AND every in-flight long-running command (prompt / +// compact) has finished, so a quick `echo cmd | zot rpc` invocation +// still produces full output before the process exits. func (s *rpcServer) run(in io.Reader) error { requireToken := os.Getenv("ZOTCORE_RPC_TOKEN") != "" s.authed = !requireToken @@ -114,7 +124,9 @@ func (s *rpcServer) run(in io.Reader) error { } s.dispatch(head.Type, head.ID, []byte(line)) } - return sc.Err() + err := sc.Err() + s.inFlight.Wait() + return err } // dispatch routes a command. Long-running commands (prompt, compact) @@ -140,7 +152,11 @@ func (s *rpcServer) dispatch(cmd, id string, raw []byte) { s.writeError(id, cmd, err.Error()) return } - go s.runPrompt(id, req.Message, req.Images) + s.inFlight.Add(1) + go func() { + defer s.inFlight.Done() + s.runPrompt(id, req.Message, req.Images) + }() case "abort": if c := s.takeCancel(); c != nil { @@ -149,7 +165,11 @@ func (s *rpcServer) dispatch(cmd, id string, raw []byte) { s.writeResponse(id, cmd, nil) case "compact": - go s.runCompact(id) + s.inFlight.Add(1) + go func() { + defer s.inFlight.Done() + s.runCompact(id) + }() case "get_state": s.writeResponse(id, cmd, s.snapshotState()) @@ -222,9 +242,16 @@ func (s *rpcServer) runPrompt(id, message string, images []struct { } err := s.agent.Prompt(subCtx, message, imgs, func(ev core.AgentEvent) { + // EvDone is emitted by the agent loop and we re-emit our own + // 'done' below; suppressing it here avoids duplicate frames. + if _, ok := ev.(core.EvDone); ok { + return + } s.writeEvent(modes.EventToJSON(ev)) }) - if err != nil { + // Don't emit a stand-alone error event for cancellation; the prior + // turn_end with stop=aborted already carries that signal. + if err != nil && !errors.Is(err, context.Canceled) { s.writeEvent(map[string]any{"type": "error", "message": err.Error()}) } s.writeEvent(map[string]any{"type": "done"}) @@ -242,7 +269,9 @@ func (s *rpcServer) runCompact(id string) { s.writeResponse(id, "compact", map[string]any{"started": true}) summary, err := s.agent.Compact(subCtx, 4, nil) if err != nil { - s.writeEvent(map[string]any{"type": "error", "message": err.Error()}) + if !errors.Is(err, context.Canceled) { + s.writeEvent(map[string]any{"type": "error", "message": err.Error()}) + } return } s.writeEvent(map[string]any{