mirror of
https://github.com/patriceckhart/zot.git
synced 2026-06-26 21:36:31 +02:00
The status-bar was showing 2x the real cost. Anthropic's SSE stream
sends the full cumulative usage payload on both message_start AND
message_delta, and our code was summing them with += on each. Cache
tokens, the biggest cost component on multi-turn sessions, were
therefore counted twice on every single API call.
Fix: assign instead of accumulate within one Stream() invocation.
Cross-call accumulation still happens correctly in
core.CostTracker.Add(). Verified end-to-end: a truly fresh "read
sample.ts on desktop" session that used to report $0.15 now reports
$0.07 with the same cache-hit rate.
While chasing that, audited and corrected the rest of the request
pipeline so the cache actually hits cleanly.
Provider layer (internal/provider/anthropic.go):
- cache_control on the Claude Code identity line (was uncached),
giving Anthropic a first stable checkpoint independent of the
user system prompt. Turns a cold start from R=0 into R>0 for
any subsequent fresh session within the cache TTL.
- tool_result blocks go in their OWN new user message instead of
merging into the preceding user message. Merging was mutating
the prior user message's content array between turns, busting
byte-identical prefix match in Anthropic's cache.
- tagLastUserCache: exactly one cache_control on the last user
message (was two), so identity + sysprompt + last-tool +
last-user fits Anthropic's 4-breakpoint budget exactly.
- user-agent dropped its "(external, cli)" suffix to match the
canonical Claude Code string exactly.
- ZOT_DEBUG_ANTHROPIC=<path> env hook appends each outgoing
request body (one JSON object per line) to that file. Off by
default; for debugging cache / cost issues in the field.
- Usage field handling now correctly assigns the latest value
from each SSE event instead of summing.
Core (internal/core/tool.go):
- Registry.Specs() now sorts tools alphabetically. Go map
iteration order is randomized per call; randomized tool arrays
were breaking Anthropic's byte-level prefix match on every
single call within a session.
System prompt (internal/agent/systemprompt.go):
- Restored a substantial default prompt with structured tools +
operating guidelines sections. The earlier aggressive trim
dropped us under Anthropic's 1024-token minimum cacheable
prefix floor: prefixes below 1024 tokens are silently NOT
cached by Anthropic, so every fresh session started cold with
R=0 no matter what else we did.
- Current default ~1040 tokens on its own; with identity and
tools it's ~1400, comfortably above the 1024 floor.
- --system-prompt, --append-system-prompt, and
$ZOT_HOME/SYSTEM.md escape hatches all still work and take
precedence.
Model catalog (internal/provider/models.go):
- claude-opus-4-5: 1M ctx / 128k max -> 200k ctx / 64k max. I had
over-extrapolated; 1M context is a 4.6+ feature.
- gpt-5.4: 400k -> 272k. Canonical value on both the OpenAI
direct API and the ChatGPT Codex OAuth backend.
- gpt-5.1, gpt-5.2, gpt-5.3, gpt-5.4-mini: pinned to 272k.
OpenAI advertises 400k on direct and Codex caps at 272k. zot
serves both from one catalog row per id, so we pin to the
smaller number to keep the context-usage meter honest under
subscription auth. Direct-API users see a conservative estimate
instead of an inflated one.
README:
- Tiny capitalization touch-up on the opening line.
718 lines
21 KiB
Go
718 lines
21 KiB
Go
package provider
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
const anthropicDefaultBaseURL = "https://api.anthropic.com"
|
|
const anthropicAPIVersion = "2023-06-01"
|
|
|
|
// Stealth identity used when talking to Anthropic via subscription OAuth.
|
|
// These values mimic the official Claude Code CLI so Anthropic's edge
|
|
// accepts the request; diverging from them causes 429 rate_limit_error
|
|
// or 403 on the very first request.
|
|
const (
|
|
claudeCodeVersion = "2.1.75"
|
|
claudeCodeIdentity = "You are Claude Code, Anthropic's official CLI for Claude."
|
|
)
|
|
|
|
// Claude Code's canonical tool casing. When running under OAuth we must
|
|
// advertise tool names that match this list (case-insensitive lookup),
|
|
// because Anthropic's backend cross-checks them.
|
|
var claudeCodeToolNames = map[string]string{
|
|
"read": "Read",
|
|
"write": "Write",
|
|
"edit": "Edit",
|
|
"bash": "Bash",
|
|
"grep": "Grep",
|
|
"glob": "Glob",
|
|
}
|
|
|
|
func toClaudeCodeToolName(name string) string {
|
|
if cc, ok := claudeCodeToolNames[strings.ToLower(name)]; ok {
|
|
return cc
|
|
}
|
|
return name
|
|
}
|
|
|
|
func fromClaudeCodeToolName(name string, tools []Tool) string {
|
|
lower := strings.ToLower(name)
|
|
for _, t := range tools {
|
|
if strings.ToLower(t.Name) == lower {
|
|
return t.Name
|
|
}
|
|
}
|
|
return name
|
|
}
|
|
|
|
// anthropicClient implements Client against the Anthropic Messages API.
|
|
type anthropicClient struct {
|
|
apiKey string
|
|
baseURL string
|
|
oauthTok string // when non-empty, send Bearer auth instead of x-api-key
|
|
http *http.Client
|
|
}
|
|
|
|
// NewAnthropic creates an Anthropic client using an API key. baseURL may be empty.
|
|
func NewAnthropic(apiKey, baseURL string) Client {
|
|
if baseURL == "" {
|
|
baseURL = anthropicDefaultBaseURL
|
|
}
|
|
return &anthropicClient{
|
|
apiKey: apiKey,
|
|
baseURL: strings.TrimRight(baseURL, "/"),
|
|
http: &http.Client{Timeout: 0},
|
|
}
|
|
}
|
|
|
|
// NewAnthropicOAuth creates an Anthropic client using a subscription OAuth access token.
|
|
func NewAnthropicOAuth(accessToken, baseURL string) Client {
|
|
if baseURL == "" {
|
|
baseURL = anthropicDefaultBaseURL
|
|
}
|
|
return &anthropicClient{
|
|
oauthTok: accessToken,
|
|
baseURL: strings.TrimRight(baseURL, "/"),
|
|
http: &http.Client{Timeout: 0},
|
|
}
|
|
}
|
|
|
|
func (c *anthropicClient) Name() string { return "anthropic" }
|
|
|
|
// ---- wire types ----
|
|
|
|
type anthTextBlock struct {
|
|
Type string `json:"type"` // "text"
|
|
Text string `json:"text"`
|
|
CacheControl *anthCacheCtrl `json:"cache_control,omitempty"`
|
|
}
|
|
|
|
type anthImageSource struct {
|
|
Type string `json:"type"` // "base64"
|
|
MediaType string `json:"media_type"`
|
|
Data string `json:"data"`
|
|
}
|
|
|
|
type anthImageBlock struct {
|
|
Type string `json:"type"` // "image"
|
|
Source anthImageSource `json:"source"`
|
|
CacheControl *anthCacheCtrl `json:"cache_control,omitempty"`
|
|
}
|
|
|
|
type anthToolUseBlock struct {
|
|
Type string `json:"type"` // "tool_use"
|
|
ID string `json:"id"`
|
|
Name string `json:"name"`
|
|
Input json.RawMessage `json:"input"`
|
|
CacheControl *anthCacheCtrl `json:"cache_control,omitempty"`
|
|
}
|
|
|
|
type anthToolResultBlock struct {
|
|
Type string `json:"type"` // "tool_result"
|
|
ToolUseID string `json:"tool_use_id"`
|
|
Content json.RawMessage `json:"content"` // string or array of blocks
|
|
IsError bool `json:"is_error,omitempty"`
|
|
CacheControl *anthCacheCtrl `json:"cache_control,omitempty"`
|
|
}
|
|
|
|
type anthCacheCtrl struct {
|
|
Type string `json:"type"` // "ephemeral"
|
|
TTL string `json:"ttl,omitempty"`
|
|
}
|
|
|
|
type anthMessage struct {
|
|
Role string `json:"role"`
|
|
Content []interface{} `json:"content"`
|
|
}
|
|
|
|
type anthSystemBlock struct {
|
|
Type string `json:"type"` // "text"
|
|
Text string `json:"text"`
|
|
CacheControl *anthCacheCtrl `json:"cache_control,omitempty"`
|
|
}
|
|
|
|
type anthTool struct {
|
|
Name string `json:"name"`
|
|
Description string `json:"description,omitempty"`
|
|
InputSchema json.RawMessage `json:"input_schema"`
|
|
CacheControl *anthCacheCtrl `json:"cache_control,omitempty"`
|
|
}
|
|
|
|
type anthThinking struct {
|
|
Type string `json:"type"` // "enabled"
|
|
BudgetTokens int `json:"budget_tokens"`
|
|
}
|
|
|
|
type anthRequest struct {
|
|
Model string `json:"model"`
|
|
MaxTokens int `json:"max_tokens"`
|
|
System []anthSystemBlock `json:"system,omitempty"`
|
|
Messages []anthMessage `json:"messages"`
|
|
Tools []anthTool `json:"tools,omitempty"`
|
|
Temperature *float32 `json:"temperature,omitempty"`
|
|
Thinking *anthThinking `json:"thinking,omitempty"`
|
|
Stream bool `json:"stream"`
|
|
}
|
|
|
|
// ---- request building ----
|
|
|
|
func (c *anthropicClient) buildRequest(req Request) (*anthRequest, error) {
|
|
m, err := FindModel("anthropic", req.Model)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
maxTok := req.MaxTokens
|
|
if maxTok <= 0 {
|
|
maxTok = m.MaxOutput
|
|
}
|
|
|
|
out := &anthRequest{
|
|
Model: req.Model,
|
|
MaxTokens: maxTok,
|
|
Temperature: req.Temperature,
|
|
Stream: true,
|
|
}
|
|
|
|
// System prompt assembly differs between api-key and OAuth modes.
|
|
// OAuth requests MUST begin with the Claude Code identity line or
|
|
// Anthropic rejects them (429 rate_limit_error with zero tokens used).
|
|
//
|
|
// Cache budget: anthropic caps cache_control to 4 breakpoints per
|
|
// request. We spend them on:
|
|
// 1. claude-code identity (OAuth only; stable forever)
|
|
// 2. user system prompt (changes per-session at most)
|
|
// 3. last tool definition (tools change rarely)
|
|
// 4. last message block (advances every turn)
|
|
//
|
|
// The identity line gets its OWN cache_control so the prefix
|
|
// [identity] is cacheable independently of the user system
|
|
// prompt. Without that, the cache prefix starts after block 2
|
|
// and any drift in the user prompt (e.g. the Current date
|
|
// line flipping at midnight) invalidates everything, including
|
|
// the 17 identity tokens we have to re-send every request
|
|
// forever.
|
|
if c.oauthTok != "" {
|
|
out.System = []anthSystemBlock{{
|
|
Type: "text",
|
|
Text: claudeCodeIdentity,
|
|
CacheControl: &anthCacheCtrl{Type: "ephemeral"},
|
|
}}
|
|
if req.System != "" {
|
|
out.System = append(out.System, anthSystemBlock{
|
|
Type: "text",
|
|
Text: req.System,
|
|
CacheControl: &anthCacheCtrl{Type: "ephemeral"},
|
|
})
|
|
}
|
|
} else if req.System != "" {
|
|
out.System = []anthSystemBlock{{
|
|
Type: "text",
|
|
Text: req.System,
|
|
CacheControl: &anthCacheCtrl{Type: "ephemeral"},
|
|
}}
|
|
}
|
|
|
|
if req.Reasoning != "" && m.Reasoning {
|
|
budget := anthropicReasoningBudget(req.Reasoning)
|
|
if budget > 0 {
|
|
out.Thinking = &anthThinking{Type: "enabled", BudgetTokens: budget}
|
|
// Reasoning requires max_tokens > budget.
|
|
if out.MaxTokens <= budget {
|
|
out.MaxTokens = budget + 4096
|
|
}
|
|
}
|
|
}
|
|
|
|
for _, t := range req.Tools {
|
|
name := t.Name
|
|
if c.oauthTok != "" {
|
|
name = toClaudeCodeToolName(name)
|
|
}
|
|
out.Tools = append(out.Tools, anthTool{
|
|
Name: name,
|
|
Description: t.Description,
|
|
InputSchema: t.Schema,
|
|
})
|
|
}
|
|
// Cache the last tool definition (applies cache breakpoint to the whole tools array).
|
|
if n := len(out.Tools); n > 0 {
|
|
out.Tools[n-1].CacheControl = &anthCacheCtrl{Type: "ephemeral"}
|
|
}
|
|
|
|
// Convert messages. Anthropic's wire format has only "user" and
|
|
// "assistant" roles; tool_result blocks live inside user messages.
|
|
//
|
|
// CRITICAL: tool_result blocks go into their OWN new user
|
|
// message, they are NOT merged into the preceding user message.
|
|
// Merging would mutate the prior user message's content array
|
|
// between turn N and turn N+1: turn N caches the prefix ending at
|
|
// [user: "read sample.ts"], turn N+1 sends
|
|
// [user: "read sample.ts" + tool_result=...] which is a
|
|
// different block sequence, busting the cache prefix match.
|
|
// Anthropic's API happily accepts consecutive user messages, and
|
|
// emitting them separately keeps each message bit-stable across
|
|
// turns, so the cache prefix matches for the entire history up
|
|
// to the newest block.
|
|
for _, msg := range req.Messages {
|
|
renameTools := c.oauthTok != ""
|
|
switch msg.Role {
|
|
case RoleUser:
|
|
out.Messages = append(out.Messages, anthMessage{
|
|
Role: "user",
|
|
Content: convertAnthContent(msg.Content, renameTools),
|
|
})
|
|
case RoleTool:
|
|
out.Messages = append(out.Messages, anthMessage{
|
|
Role: "user",
|
|
Content: convertAnthContent(msg.Content, renameTools),
|
|
})
|
|
case RoleAssistant:
|
|
out.Messages = append(out.Messages, anthMessage{
|
|
Role: "assistant",
|
|
Content: convertAnthContent(msg.Content, renameTools),
|
|
})
|
|
}
|
|
}
|
|
|
|
// Tag the LAST user message with cache_control. Spends the 4th
|
|
// breakpoint. For prefixes under ~1024 tokens (Anthropic's
|
|
// minimum cacheable block size for Opus), no cache is written.
|
|
tagLastUserCache(out.Messages)
|
|
|
|
return out, nil
|
|
}
|
|
|
|
// tagLastUserCache marks the last block of the most recent user
|
|
// message. One marker; combined with identity + systemPrompt +
|
|
// last-tool, spends Anthropic's 4-breakpoint budget.
|
|
func tagLastUserCache(msgs []anthMessage) {
|
|
for i := len(msgs) - 1; i >= 0; i-- {
|
|
if msgs[i].Role == "user" {
|
|
markLastBlockEphemeral(msgs[i].Content)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// markLastBlockEphemeral sets CacheControl on the last entry in blocks
|
|
// regardless of whether it's a text, image, tool_use, or tool_result.
|
|
// Each block type carries its own CacheControl pointer so we type-
|
|
// switch + reassign the slice element.
|
|
func markLastBlockEphemeral(blocks []interface{}) {
|
|
if len(blocks) == 0 {
|
|
return
|
|
}
|
|
i := len(blocks) - 1
|
|
cc := &anthCacheCtrl{Type: "ephemeral"}
|
|
switch v := blocks[i].(type) {
|
|
case anthTextBlock:
|
|
v.CacheControl = cc
|
|
blocks[i] = v
|
|
case anthImageBlock:
|
|
v.CacheControl = cc
|
|
blocks[i] = v
|
|
case anthToolUseBlock:
|
|
v.CacheControl = cc
|
|
blocks[i] = v
|
|
case anthToolResultBlock:
|
|
v.CacheControl = cc
|
|
blocks[i] = v
|
|
}
|
|
}
|
|
|
|
func anthropicReasoningBudget(level string) int {
|
|
switch strings.ToLower(level) {
|
|
case "low":
|
|
return 2048
|
|
case "medium":
|
|
return 8192
|
|
case "high":
|
|
return 16384
|
|
default:
|
|
return 0
|
|
}
|
|
}
|
|
|
|
func convertAnthContent(blocks []Content, renameTools bool) []interface{} {
|
|
out := make([]interface{}, 0, len(blocks))
|
|
for _, b := range blocks {
|
|
switch v := b.(type) {
|
|
case TextBlock:
|
|
if v.Text == "" {
|
|
continue
|
|
}
|
|
out = append(out, anthTextBlock{Type: "text", Text: v.Text})
|
|
case ImageBlock:
|
|
out = append(out, anthImageBlock{
|
|
Type: "image",
|
|
Source: anthImageSource{
|
|
Type: "base64",
|
|
MediaType: v.MimeType,
|
|
Data: base64.StdEncoding.EncodeToString(v.Data),
|
|
},
|
|
})
|
|
case ToolCallBlock:
|
|
args := v.Arguments
|
|
if len(args) == 0 {
|
|
args = json.RawMessage("{}")
|
|
}
|
|
name := v.Name
|
|
if renameTools {
|
|
name = toClaudeCodeToolName(name)
|
|
}
|
|
out = append(out, anthToolUseBlock{
|
|
Type: "tool_use", ID: v.ID, Name: name, Input: args,
|
|
})
|
|
case ToolResultBlock:
|
|
// Flatten content to a string if all text; else array of blocks.
|
|
content, _ := anthBuildToolResultContent(v.Content)
|
|
out = append(out, anthToolResultBlock{
|
|
Type: "tool_result",
|
|
ToolUseID: v.CallID,
|
|
Content: content,
|
|
IsError: v.IsError,
|
|
})
|
|
}
|
|
}
|
|
return out
|
|
}
|
|
|
|
func anthBuildToolResultContent(blocks []Content) (json.RawMessage, error) {
|
|
onlyText := true
|
|
var sb strings.Builder
|
|
for _, b := range blocks {
|
|
if tb, ok := b.(TextBlock); ok {
|
|
if sb.Len() > 0 {
|
|
sb.WriteString("\n")
|
|
}
|
|
sb.WriteString(tb.Text)
|
|
} else {
|
|
onlyText = false
|
|
break
|
|
}
|
|
}
|
|
if onlyText {
|
|
if sb.Len() == 0 {
|
|
return json.Marshal("")
|
|
}
|
|
return json.Marshal(sb.String())
|
|
}
|
|
// Array form: text + image blocks.
|
|
arr := make([]interface{}, 0, len(blocks))
|
|
for _, b := range blocks {
|
|
switch v := b.(type) {
|
|
case TextBlock:
|
|
arr = append(arr, anthTextBlock{Type: "text", Text: v.Text})
|
|
case ImageBlock:
|
|
arr = append(arr, anthImageBlock{
|
|
Type: "image",
|
|
Source: anthImageSource{
|
|
Type: "base64",
|
|
MediaType: v.MimeType,
|
|
Data: base64.StdEncoding.EncodeToString(v.Data),
|
|
},
|
|
})
|
|
}
|
|
}
|
|
return json.Marshal(arr)
|
|
}
|
|
|
|
// ---- streaming ----
|
|
|
|
func (c *anthropicClient) Stream(ctx context.Context, req Request) (<-chan Event, error) {
|
|
wire, err := c.buildRequest(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
body, err := json.Marshal(wire)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Optional debug dump: when $ZOT_DEBUG_ANTHROPIC is a file path
|
|
// we append every outgoing request body to it, one JSON object
|
|
// per line. Useful for diffing turn N vs turn N+1 to understand
|
|
// why the cache prefix isn't matching.
|
|
if dump := os.Getenv("ZOT_DEBUG_ANTHROPIC"); dump != "" {
|
|
if f, derr := os.OpenFile(dump, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o600); derr == nil {
|
|
_, _ = f.Write(body)
|
|
_, _ = f.Write([]byte{'\n'})
|
|
_ = f.Close()
|
|
}
|
|
}
|
|
|
|
httpReq, err := http.NewRequestWithContext(ctx, "POST", c.baseURL+"/v1/messages", bytes.NewReader(body))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
httpReq.Header.Set("content-type", "application/json")
|
|
httpReq.Header.Set("anthropic-version", anthropicAPIVersion)
|
|
if c.oauthTok != "" {
|
|
// Claude-Code-shaped request: identical headers and values as the
|
|
// official CLI. Any drift triggers Anthropic's anti-abuse check and
|
|
// rate-limits (or outright blocks) the request.
|
|
httpReq.Header.Set("accept", "application/json")
|
|
httpReq.Header.Set("authorization", "Bearer "+c.oauthTok)
|
|
httpReq.Header.Set("anthropic-beta", "claude-code-20250219,oauth-2025-04-20,fine-grained-tool-streaming-2025-05-14")
|
|
httpReq.Header.Set("anthropic-dangerous-direct-browser-access", "true")
|
|
httpReq.Header.Set("user-agent", "claude-cli/"+claudeCodeVersion)
|
|
httpReq.Header.Set("x-app", "cli")
|
|
// Remove x-api-key entirely by NOT setting it.
|
|
} else {
|
|
httpReq.Header.Set("accept", "text/event-stream")
|
|
httpReq.Header.Set("x-api-key", c.apiKey)
|
|
}
|
|
|
|
resp, err := c.http.Do(httpReq)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("anthropic: %w", err)
|
|
}
|
|
if resp.StatusCode != http.StatusOK {
|
|
b, _ := io.ReadAll(resp.Body)
|
|
resp.Body.Close()
|
|
return nil, fmt.Errorf("anthropic: http %d: %s", resp.StatusCode, strings.TrimSpace(string(b)))
|
|
}
|
|
|
|
out := make(chan Event, 16)
|
|
go c.runStream(ctx, resp, req, out)
|
|
return out, nil
|
|
}
|
|
|
|
func (c *anthropicClient) runStream(ctx context.Context, resp *http.Response, req Request, out chan<- Event) {
|
|
defer close(out)
|
|
defer resp.Body.Close()
|
|
|
|
model, _ := FindModel("anthropic", req.Model)
|
|
out <- EventStart{Model: req.Model, Provider: "anthropic"}
|
|
|
|
raw := make(chan sseEvent, 16)
|
|
go readSSE(resp.Body, raw)
|
|
|
|
// State for assembling the assistant message. Blocks are indexed
|
|
// by their `index` field from Anthropic so we can preserve the
|
|
// interleaved order the model emitted them in (text may come
|
|
// before OR after tool_use; mixing both happens frequently).
|
|
type blockEntry struct {
|
|
kind string // "text" | "tool_use"
|
|
textBuf strings.Builder
|
|
toolCall ToolCallBlock
|
|
toolArgs strings.Builder
|
|
}
|
|
var (
|
|
blocks = map[int]*blockEntry{}
|
|
blockOrder []int // insertion order of indexes
|
|
activeIdx = -1
|
|
usage Usage
|
|
stop StopReason = StopEnd
|
|
finalErr error
|
|
)
|
|
_ = activeIdx // read-only indicator used for legacy parity
|
|
|
|
registerBlock := func(idx int, kind string) *blockEntry {
|
|
if be, ok := blocks[idx]; ok {
|
|
return be
|
|
}
|
|
be := &blockEntry{kind: kind}
|
|
blocks[idx] = be
|
|
blockOrder = append(blockOrder, idx)
|
|
return be
|
|
}
|
|
|
|
assembleMsg := func() Message {
|
|
content := []Content{}
|
|
for _, idx := range blockOrder {
|
|
be := blocks[idx]
|
|
switch be.kind {
|
|
case "text":
|
|
if be.textBuf.Len() > 0 {
|
|
content = append(content, TextBlock{Text: be.textBuf.String()})
|
|
}
|
|
case "tool_use":
|
|
tc := be.toolCall
|
|
args := be.toolArgs.String()
|
|
if args == "" {
|
|
args = "{}"
|
|
}
|
|
tc.Arguments = json.RawMessage(args)
|
|
content = append(content, tc)
|
|
}
|
|
}
|
|
return Message{Role: RoleAssistant, Content: content, Time: time.Now()}
|
|
}
|
|
|
|
sendDone := func() {
|
|
usage.CostUSD = ComputeCost(model, usage)
|
|
out <- EventUsage{Usage: usage}
|
|
out <- EventDone{Stop: stop, Err: finalErr, Message: assembleMsg()}
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
stop = StopAborted
|
|
finalErr = ctx.Err()
|
|
sendDone()
|
|
return
|
|
case ev, ok := <-raw:
|
|
if !ok {
|
|
sendDone()
|
|
return
|
|
}
|
|
// Parse event payload based on event: type.
|
|
var payload map[string]json.RawMessage
|
|
if err := json.Unmarshal([]byte(ev.Data), &payload); err != nil {
|
|
continue
|
|
}
|
|
switch ev.Event {
|
|
case "content_block_start":
|
|
var idx int
|
|
if b, ok := payload["index"]; ok {
|
|
_ = json.Unmarshal(b, &idx)
|
|
}
|
|
var block struct {
|
|
Type string `json:"type"`
|
|
ID string `json:"id"`
|
|
Name string `json:"name"`
|
|
Text string `json:"text"`
|
|
Input json.RawMessage `json:"input"`
|
|
}
|
|
if b, ok := payload["content_block"]; ok {
|
|
_ = json.Unmarshal(b, &block)
|
|
}
|
|
activeIdx = idx
|
|
switch block.Type {
|
|
case "tool_use":
|
|
name := block.Name
|
|
if c.oauthTok != "" {
|
|
name = fromClaudeCodeToolName(name, req.Tools)
|
|
}
|
|
be := registerBlock(idx, "tool_use")
|
|
be.toolCall = ToolCallBlock{ID: block.ID, Name: name}
|
|
out <- EventToolStart{ID: block.ID, Name: name}
|
|
case "text":
|
|
registerBlock(idx, "text")
|
|
case "thinking":
|
|
// not surfaced
|
|
}
|
|
case "content_block_delta":
|
|
var idx int
|
|
if b, ok := payload["index"]; ok {
|
|
_ = json.Unmarshal(b, &idx)
|
|
}
|
|
var d struct {
|
|
Type string `json:"type"`
|
|
Text string `json:"text"`
|
|
PartialJSON string `json:"partial_json"`
|
|
Thinking string `json:"thinking"`
|
|
}
|
|
if b, ok := payload["delta"]; ok {
|
|
_ = json.Unmarshal(b, &d)
|
|
}
|
|
switch d.Type {
|
|
case "text_delta":
|
|
if be, ok := blocks[idx]; ok && be.kind == "text" {
|
|
be.textBuf.WriteString(d.Text)
|
|
}
|
|
out <- EventTextDelta{Delta: d.Text}
|
|
case "input_json_delta":
|
|
if be, ok := blocks[idx]; ok && be.kind == "tool_use" {
|
|
be.toolArgs.WriteString(d.PartialJSON)
|
|
out <- EventToolArgs{ID: be.toolCall.ID, Delta: d.PartialJSON}
|
|
}
|
|
case "thinking_delta":
|
|
// Not surfaced in v1.
|
|
}
|
|
case "content_block_stop":
|
|
var idx int
|
|
if b, ok := payload["index"]; ok {
|
|
_ = json.Unmarshal(b, &idx)
|
|
}
|
|
if be, ok := blocks[idx]; ok && be.kind == "tool_use" {
|
|
out <- EventToolEnd{ID: be.toolCall.ID}
|
|
}
|
|
activeIdx = -1
|
|
case "message_start":
|
|
var m struct {
|
|
Message struct {
|
|
Usage struct {
|
|
InputTokens int `json:"input_tokens"`
|
|
OutputTokens int `json:"output_tokens"`
|
|
CacheReadInputTokens int `json:"cache_read_input_tokens"`
|
|
CacheCreationInputTokens int `json:"cache_creation_input_tokens"`
|
|
} `json:"usage"`
|
|
} `json:"message"`
|
|
}
|
|
_ = json.Unmarshal([]byte(ev.Data), &m)
|
|
// Anthropic sends cumulative values on message_start and
|
|
// again on message_delta (refreshed), so assign, don't
|
|
// accumulate. Accumulating doubles cache_creation_input
|
|
// which can be 50-70% of cost.
|
|
usage.InputTokens = m.Message.Usage.InputTokens
|
|
usage.OutputTokens = m.Message.Usage.OutputTokens
|
|
usage.CacheReadTokens = m.Message.Usage.CacheReadInputTokens
|
|
usage.CacheWriteTokens = m.Message.Usage.CacheCreationInputTokens
|
|
case "message_delta":
|
|
var m struct {
|
|
Delta struct {
|
|
StopReason string `json:"stop_reason"`
|
|
} `json:"delta"`
|
|
Usage struct {
|
|
InputTokens int `json:"input_tokens"`
|
|
OutputTokens int `json:"output_tokens"`
|
|
CacheReadInputTokens int `json:"cache_read_input_tokens"`
|
|
CacheCreationInputTokens int `json:"cache_creation_input_tokens"`
|
|
} `json:"usage"`
|
|
}
|
|
_ = json.Unmarshal([]byte(ev.Data), &m)
|
|
// Refresh usage from the latest cumulative totals
|
|
// Anthropic provides. Only apply non-zero fields in case
|
|
// a given delta only carries output tokens.
|
|
if m.Usage.InputTokens > 0 {
|
|
usage.InputTokens = m.Usage.InputTokens
|
|
}
|
|
if m.Usage.OutputTokens > 0 {
|
|
usage.OutputTokens = m.Usage.OutputTokens
|
|
}
|
|
if m.Usage.CacheReadInputTokens > 0 {
|
|
usage.CacheReadTokens = m.Usage.CacheReadInputTokens
|
|
}
|
|
if m.Usage.CacheCreationInputTokens > 0 {
|
|
usage.CacheWriteTokens = m.Usage.CacheCreationInputTokens
|
|
}
|
|
switch m.Delta.StopReason {
|
|
case "end_turn", "stop_sequence":
|
|
stop = StopEnd
|
|
case "max_tokens":
|
|
stop = StopLength
|
|
case "tool_use":
|
|
stop = StopToolUse
|
|
}
|
|
case "message_stop":
|
|
sendDone()
|
|
return
|
|
case "error":
|
|
var e struct {
|
|
Error struct {
|
|
Type string `json:"type"`
|
|
Message string `json:"message"`
|
|
} `json:"error"`
|
|
}
|
|
_ = json.Unmarshal([]byte(ev.Data), &e)
|
|
stop = StopError
|
|
finalErr = fmt.Errorf("anthropic %s: %s", e.Error.Type, e.Error.Message)
|
|
sendDone()
|
|
return
|
|
}
|
|
_ = activeIdx
|
|
}
|
|
}
|
|
}
|