zot/packages/core/queue_test.go
patriceckhart fa7d8d8be5 refactor: split source into packages/{provider,core,tui,agent}
Single Go module, four top-level packages under packages/. Import
paths become github.com/patriceckhart/zot/packages/<name>; downstream
consumers can depend on individual packages without pulling the rest.

Layout:
  packages/provider/     LLM clients + catalog
  packages/provider/auth/ credential store + OAuth + login server
  packages/core/         agent loop, sessions, cost
  packages/tui/          terminal toolkit + chat view
  packages/agent/        CLI wiring, system prompt
    extensions/ extproto/ modes/ tools/ skills/ swarm/
    sdk/  (was pkg/zotcore, package renamed zotcore -> sdk)
    ext/  (was pkg/zotext, package renamed zotext -> ext)

internal/ and pkg/ removed. The internal/assets logo moved into
packages/provider/auth/assets.

Public Go SDK identifiers renamed:
  pkg/zotcore (package zotcore) -> packages/agent/sdk (package sdk)
  pkg/zotext  (package zotext)  -> packages/agent/ext (package ext)

This breaks Go-based extensions and embedders; the JSON wire protocol
for extensions and RPC is unchanged, so non-Go extensions, already-
built extension binaries, and zot rpc consumers are unaffected.

Docs, examples, and the built-in write-zot-extension skill updated
for the new paths and identifiers. Shadow-bug fixes in code samples
(ext := ext.New -> e := ext.New).
2026-05-27 09:07:15 +02:00

152 lines
4.4 KiB
Go

package core
import (
"context"
"encoding/json"
"sync"
"sync/atomic"
"testing"
"github.com/patriceckhart/zot/packages/provider"
)
// queueFakeClient is a scripted stand-in for a model client used by the
// queue tests. It only tracks how many times Stream has been invoked.
type queueFakeClient struct {
	// calls is the number of Stream invocations so far; it is updated and
	// read with sync/atomic because Stream runs on the agent's goroutine
	// while the test inspects the counter from its own.
	calls int32
}

// Name reports the fixed identifier of this fake client.
func (c *queueFakeClient) Name() string {
	const name = "queue-fake"
	return name
}
// Stream bumps the call counter and returns a channel carrying a scripted
// event sequence chosen by the call number:
//
//   - call 1: a tool call to "echo" (id "t1") ending with StopToolUse,
//   - call 2: the text "saw queued" ending with StopEnd,
//   - any later call: the text "extra" ending with StopEnd.
//
// Every sequence begins with an EventStart echoing req.Model. The channel is
// closed once the script has been sent. ctx is not consulted and the returned
// error is always nil.
func (c *queueFakeClient) Stream(ctx context.Context, req provider.Request) (<-chan provider.Event, error) {
	n := atomic.AddInt32(&c.calls, 1)

	// The longest script below is four events, so a buffer of four lets the
	// sender finish without waiting on the reader.
	events := make(chan provider.Event, 4)

	// assistant wraps content blocks in an assistant-role message.
	assistant := func(blocks ...provider.Content) provider.Message {
		return provider.Message{
			Role:    provider.RoleAssistant,
			Content: blocks,
		}
	}

	go func() {
		defer close(events)

		events <- provider.EventStart{Provider: "queue-fake", Model: req.Model}

		if n == 1 {
			events <- provider.EventToolStart{ID: "t1", Name: "echo"}
			events <- provider.EventToolEnd{ID: "t1"}
			events <- provider.EventDone{
				Stop: provider.StopToolUse,
				Message: assistant(
					provider.TextBlock{Text: "using tool"},
					provider.ToolCallBlock{ID: "t1", Name: "echo", Arguments: json.RawMessage(`{}`)},
				),
			}
			return
		}

		if n == 2 {
			events <- provider.EventTextDelta{Delta: "saw queued"}
			events <- provider.EventDone{
				Stop:    provider.StopEnd,
				Message: assistant(provider.TextBlock{Text: "saw queued"}),
			}
			return
		}

		events <- provider.EventDone{
			Stop:    provider.StopEnd,
			Message: assistant(provider.TextBlock{Text: "extra"}),
		}
	}()

	return events, nil
}
// blockingTool is a tool named "echo" whose execution parks until the test
// lets it go. The test uses that window to queue a user message, pinning the
// core behaviour under test: queued user text is delivered after the current
// tool batch finishes and before the next model call.
type blockingTool struct {
	// started is closed by Execute as soon as it is entered.
	started chan struct{}
	// release is closed by the test to let Execute return successfully.
	release chan struct{}
}

// Name is the identifier the tool is registered and invoked under.
func (t *blockingTool) Name() string {
	return "echo"
}

// Description is the human-readable summary of the tool.
func (t *blockingTool) Description() string {
	return "echoes"
}

// Schema is the JSON schema of the tool's arguments: a bare object.
func (t *blockingTool) Schema() json.RawMessage {
	schema := json.RawMessage(`{"type":"object"}`)
	return schema
}
// Execute announces that the tool is running by closing t.started, then
// blocks until either t.release is closed or ctx is done.
//
// On release it returns a single text block "tool ok" and a nil error. On
// cancellation it returns an error result whose text is ctx.Err(), along with
// that error. args and progress are ignored.
//
// Because t.started is closed unconditionally, a second call on the same
// value would panic; the tool is meant to run once per test.
func (t *blockingTool) Execute(ctx context.Context, args json.RawMessage, progress func(string)) (ToolResult, error) {
	close(t.started)

	select {
	case <-t.release:
		ok := provider.TextBlock{Text: "tool ok"}
		return ToolResult{Content: []provider.Content{ok}}, nil
	case <-ctx.Done():
		cause := ctx.Err()
		failure := ToolResult{
			Content: []provider.Content{provider.TextBlock{Text: cause.Error()}},
			IsError: true,
		}
		return failure, cause
	}
}
// TestQueuedMessageInjectedAfterToolBatchBeforeNextModelCall verifies that a
// message queued while a tool is still running is surfaced as a user message
// and is seen by the following model call.
//
// Flow: the fake client's first response requests the "echo" tool, which
// blocks. While it is blocked the test queues "also do Y", then releases the
// tool. The test then expects exactly two Stream calls, an emitted user
// message carrying the queued text, and the second scripted assistant reply.
func TestQueuedMessageInjectedAfterToolBatchBeforeNextModelCall(t *testing.T) {
	client := &queueFakeClient{}
	tool := &blockingTool{started: make(chan struct{}), release: make(chan struct{})}
	a := NewAgent(client, "fake-model", "system", Registry{"echo": tool})

	// texts records the emitted user/assistant messages; the sink is invoked
	// from the Prompt goroutine, hence the mutex.
	var (
		mu    sync.Mutex
		texts []string
	)
	sink := func(ev AgentEvent) {
		switch e := ev.(type) {
		case EvUserMessage:
			mu.Lock()
			texts = append(texts, "user:"+extractText(e.Message))
			mu.Unlock()
		case EvAssistantMessage:
			mu.Lock()
			texts = append(texts, "asst:"+extractText(e.Message))
			mu.Unlock()
		}
	}

	// Cancel on every exit path so that a failure before the tool is released
	// does not leave the Prompt goroutine parked inside the tool (this relies
	// on the agent forwarding its context to the tool — TODO confirm).
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	done := make(chan error, 1)
	go func() {
		done <- a.Prompt(ctx, "do X", nil, sink)
	}()

	// Wait for the tool to start, but fail fast if Prompt finishes without
	// ever running it; a bare receive on tool.started would hang until the
	// global test timeout in that case.
	select {
	case <-tool.started:
	case err := <-done:
		t.Fatalf("Prompt returned before the tool started: %v", err)
	}

	if !a.QueueMessage("also do Y") {
		t.Fatal("QueueMessage returned false")
	}
	close(tool.release)

	if err := <-done; err != nil {
		t.Fatalf("Prompt returned %v", err)
	}
	if got := atomic.LoadInt32(&client.calls); got != 2 {
		t.Fatalf("Stream calls = %d; want 2", got)
	}

	mu.Lock()
	defer mu.Unlock()
	if !queueTestContains(texts, "user:also do Y") {
		t.Fatalf("queued message was not emitted as user message; texts=%v", texts)
	}
	if !queueTestContains(texts, "asst:saw queued") {
		t.Fatalf("second assistant response missing; texts=%v", texts)
	}
}
// queueTestContains reports whether want occurs in xs, compared by exact
// string equality. A nil or empty slice never contains anything.
func queueTestContains(xs []string, want string) bool {
	for i := 0; i < len(xs); i++ {
		if xs[i] != want {
			continue
		}
		return true
	}
	return false
}
// TestQueueMessageSnapshotPopAndDrain exercises the queue accessors on an
// agent that is never prompted (its client is nil). It expects that:
//
//   - a whitespace-only message is rejected,
//   - PendingQueuedMessages lists queued messages in insertion order,
//   - PopQueuedMessage yields the most recently queued message,
//   - DrainQueuedMessages yields what is left and empties the queue.
func TestQueueMessageSnapshotPopAndDrain(t *testing.T) {
	agent := NewAgent(nil, "fake", "", Registry{})

	if accepted := agent.QueueMessage(" "); accepted {
		t.Fatal("blank queue message accepted")
	}

	agent.QueueMessage("one")
	agent.QueueMessage("two")

	pending := agent.PendingQueuedMessages()
	if len(pending) != 2 || pending[0] != "one" || pending[1] != "two" {
		t.Fatalf("PendingQueuedMessages = %v; want [one two]", pending)
	}

	popped, ok := agent.PopQueuedMessage()
	if !ok || popped != "two" {
		t.Fatalf("PopQueuedMessage = %q,%v; want two,true", popped, ok)
	}

	drained := agent.DrainQueuedMessages()
	if len(drained) != 1 || drained[0] != "one" {
		t.Fatalf("DrainQueuedMessages = %v; want [one]", drained)
	}

	remaining := agent.QueuedMessageCount()
	if remaining != 0 {
		t.Fatalf("QueuedMessageCount = %d; want 0", remaining)
	}
}