From ebc5dad18c622b7ddc19aecf16a64f85aa4ef6b6 Mon Sep 17 00:00:00 2001 From: patriceckhart Date: Sun, 19 Apr 2026 12:35:13 +0200 Subject: [PATCH] fix(rpc): wait for in-flight commands on stdin close, drop dup events three real issues found by actually running zot rpc end-to-end: 1. piping a single command into the process and letting stdin close would race the agent loop against the process exit, swallowing the entire prompt response. fix: track in-flight prompt and compact goroutines in a sync.WaitGroup; run() now waits on it before returning. 2. each prompt emitted two consecutive done frames - one from the agent loop EvDone passing through EventToJSON, one added at the end of runPrompt. suppress EvDone in the prompt sink so only the explicit terminator remains. 3. cancelling a turn produced a spurious error frame on top of the turn_end stop=aborted that already carried the cancellation. suppress error frames when the underlying error is context.Canceled, in both prompt and compact paths. verified manually: ping, get_state, get_models, set_model (valid + invalid id), clear + get_messages, abort, malformed json, unknown command, auth gate (missing/wrong/correct token), stdin-close-while-prompt-running, in-process SDK Prompt, plus all four reference clients parse cleanly and shell + python actually drive the protocol. --- .../__pycache__/zot_client.cpython-314.pyc | Bin 0 -> 6864 bytes internal/agent/rpc.go | 41 +++++++++++++++--- 2 files changed, 35 insertions(+), 6 deletions(-) create mode 100644 examples/rpc/python/__pycache__/zot_client.cpython-314.pyc diff --git a/examples/rpc/python/__pycache__/zot_client.cpython-314.pyc b/examples/rpc/python/__pycache__/zot_client.cpython-314.pyc new file mode 100644 index 0000000000000000000000000000000000000000..6a874b0b77f3694123baf5552ae0b024e7bbebcb GIT binary patch literal 6864 zcmb_hZEPFInV#kD@>?V&l9DV_*6PFZ(Ne@jcKn_A{2@2C9En*wyqsc-p~;oB4#{P9 zm$oIKfYAGrnlzHy9Hh2CqTJyw3eciyQJ@WQKv?ALHdP8XDosXS)XXbh5eP-0*t8x|HO|%a}qFXwpjmqRyfQh0n@)IhV=` z@lkz5$qVVMEai1!NKplSL=s*aQ*=Qsq+b%woa^rs3aX+jX(ij`c0aGBhNVMp0Ybt2 z6q+W@&$|kvLd%eRjdn4VlCyGtSkNU+Z*jZN6;fC8TDjv+L6hDoyunf>6#4fu8v4~I+R{i^TH5JNE?x~E}>6> z2D+r?VR{*<0K-W6v@B`D|Bk-_{baK8CAX5#j&=#BbA_ywgNbXx<)WqwQeG(zj|j>T z?be#Lpew>9NhoU4P%(?@q}-sMs^jbm3aES-<{}9hC9TC^&AM`#jdg?%vVxW#k#ean zca$+aseE41Q@X6=wZ3$D(+FALZ)WXpp>&*FBsv3VvJHlHR%8GHHi`hwLVL%YDEM(UNI!uBL+7A3901C6FXR5#*X1lCfCR?&iXhGAjQSE+JAeS*qaC6&<(Ka$szQd-GKJ*ujxQ5yxP3adplJeS+o5ore% zfCL(WGA7NE7`Dto^cmt3P&Ya`y5euT;cvR>?^twpEc!c2q59d=GpE6EEQi`AP9g@u zQ~cn_#K}Jf>pl+D-?p(;+g58EW-ovL@_cBiwtYF!zUCp}onPGbl2H9haMz9CuDgV( z+Gapw7SjLI;7BIlIp#a=VXjx#AB)+pN9`!b!pEKbZ#fp^-#QqSohbi5xEtKNN4lHC zxEPkZvGXl=2t?CBxghzfyWyZUz|6H~fbdOBKmzX(0bqW8qj?zY=`jr&Yz}dBvzx*o zFEabVXfeRC3|3qTZ~!yR$prJ+4hzarTOV*>hc4$Nph!`^@?fMmHC#Q{(oz8_zrkHi z$vSv+JR+$_Z#O;qamX9~GWh6+bh)2{8sIG+|J1?BgHuN*kKXjOBcj5kKxo!A%niZaD{s)_>pj(M6E3F?`G=m5QrwyG%>*s4K@OD z6aYW7HVxVA-7Ve7HNmcARv zY1%Z?+OI!q57P_Q2{C8~D_2KHmme~EeR-w@zQQiICHO5-)L>}~>#_X-Z6XzI8XwSR z$Kb{ai%nIe+y0H_c$LmhptGaAuZcW+7UtWGCBtxrMsN+j3nxSQq-lA;UbmLD?Y4b= zE$~hgZ!KfVlFH~E1U8ARvaCb74yeGRzNW7F{3BXzxa>B1(H?6o(zi@utnv{`IAXp= zJ`G--8jWV8UcAYFMG z9y#o0C*V@g!Iy?32M-|dZffS-_;QlWNLf7vS1OticmPq9Y%-n7X2+cF7cp~DFgSFx z2J<)cN-$D9H%7g}I9g)xVaRy6&oHp7W&vZ!Uo*si@p2A6?HKZ7^Nbh%$N z#}m{9Ofp(yuvsau3LvZa0ATR+J$L<;=Q2E$QunT+e^K$EG}vKDuOJI}_^W#`!o8L+8sA9bv+f!9tZ&A**wnog*fYWZ(Gw_nYo>Z9d*}Xcp=Y9Z z+1vez(>LXr^vu;S?3?f`JG(!rsh@pp=CRq>#PJezV4ty1U-^M=;#kQam_EDg7ghs7 z>~$p|+z1HsHE-9xRlDGLw`Qq%?{eVD`zk*Qw3MoAre9vJZe9(9OSMh&wzutX+2^nP zJLnTzc;%O&Up4%q;n&CCKlR?J-|YLKd+A8u(i8oQkH;6|&n-vfIv$1!^RzimsLx=v%l2H36FzoOf zky35_?9j~6+=YeirP{7(mTIwkVb9O@zq9|{{YxE(7k3}IO_-;4ly)?~IsC)n`H`g^ zvBk#6-*x<&ec$n(<2UXPjx8PN`#@XVe+~#~_<82jb)6g>=tMAbqhdpEZ2uT$h@;L% zV3axceD~Y?-rBbiU25IC*z)9J?Y{py_x`|p10M#SDb?Y<-Q}P1PI_PW-8o2tEqA+# z%lDcCRv0&fmVYr}N8x+EFEs&>as9C#_RJpg`zMY!Lh^@D7sUS)@?yNFrk7`t10#%* z$!L|~NG9Ql-=xDsMR)`UI;UPJI_p>e>L$D7rMg|u7UE@zW__e6hr_3 literal 0 HcmV?d00001 diff --git a/internal/agent/rpc.go b/internal/agent/rpc.go index 2ce07d5..739cec2 100644 --- a/internal/agent/rpc.go +++ b/internal/agent/rpc.go @@ -4,6 +4,7 @@ import ( "bufio" "context" "encoding/json" + "errors" "fmt" "io" "os" @@ -67,10 +68,19 @@ type rpcServer struct { turnMu sync.Mutex // serialises one prompt at a time activeCancel context.CancelFunc authed bool + + // inFlight tracks long-running command goroutines so run() can + // wait for them before returning when stdin closes. Without this, + // piping a single 'prompt' command into 'zot rpc' would race the + // process exit against the agent loop and the prompt would never + // produce output. + inFlight sync.WaitGroup } // run reads NDJSON commands from in and dispatches them. Returns when -// in is closed. Errors writing to stdout are fatal. +// in is closed AND every in-flight long-running command (prompt / +// compact) has finished, so a quick `echo cmd | zot rpc` invocation +// still produces full output before the process exits. func (s *rpcServer) run(in io.Reader) error { requireToken := os.Getenv("ZOTCORE_RPC_TOKEN") != "" s.authed = !requireToken @@ -114,7 +124,9 @@ func (s *rpcServer) run(in io.Reader) error { } s.dispatch(head.Type, head.ID, []byte(line)) } - return sc.Err() + err := sc.Err() + s.inFlight.Wait() + return err } // dispatch routes a command. Long-running commands (prompt, compact) @@ -140,7 +152,11 @@ func (s *rpcServer) dispatch(cmd, id string, raw []byte) { s.writeError(id, cmd, err.Error()) return } - go s.runPrompt(id, req.Message, req.Images) + s.inFlight.Add(1) + go func() { + defer s.inFlight.Done() + s.runPrompt(id, req.Message, req.Images) + }() case "abort": if c := s.takeCancel(); c != nil { @@ -149,7 +165,11 @@ func (s *rpcServer) dispatch(cmd, id string, raw []byte) { s.writeResponse(id, cmd, nil) case "compact": - go s.runCompact(id) + s.inFlight.Add(1) + go func() { + defer s.inFlight.Done() + s.runCompact(id) + }() case "get_state": s.writeResponse(id, cmd, s.snapshotState()) @@ -222,9 +242,16 @@ func (s *rpcServer) runPrompt(id, message string, images []struct { } err := s.agent.Prompt(subCtx, message, imgs, func(ev core.AgentEvent) { + // EvDone is emitted by the agent loop and we re-emit our own + // 'done' below; suppressing it here avoids duplicate frames. + if _, ok := ev.(core.EvDone); ok { + return + } s.writeEvent(modes.EventToJSON(ev)) }) - if err != nil { + // Don't emit a stand-alone error event for cancellation; the prior + // turn_end with stop=aborted already carries that signal. + if err != nil && !errors.Is(err, context.Canceled) { s.writeEvent(map[string]any{"type": "error", "message": err.Error()}) } s.writeEvent(map[string]any{"type": "done"}) @@ -242,7 +269,9 @@ func (s *rpcServer) runCompact(id string) { s.writeResponse(id, "compact", map[string]any{"started": true}) summary, err := s.agent.Compact(subCtx, 4, nil) if err != nil { - s.writeEvent(map[string]any{"type": "error", "message": err.Error()}) + if !errors.Is(err, context.Canceled) { + s.writeEvent(map[string]any{"type": "error", "message": err.Error()}) + } return } s.writeEvent(map[string]any{