mirror of
https://github.com/patriceckhart/zot.git
synced 2026-06-27 22:06:31 +02:00
Single Go module, four top-level packages under packages/. Import
paths become github.com/patriceckhart/zot/packages/<name>; downstream
consumers can depend on individual packages without pulling the rest.
Layout:
packages/provider/ LLM clients + catalog
packages/provider/auth/ credential store + OAuth + login server
packages/core/ agent loop, sessions, cost
packages/tui/ terminal toolkit + chat view
packages/agent/ CLI wiring, system prompt
extensions/ extproto/ modes/ tools/ skills/ swarm/
sdk/ (was pkg/zotcore, package renamed zotcore -> sdk)
ext/ (was pkg/zotext, package renamed zotext -> ext)
internal/ and pkg/ removed. The internal/assets logo moved into
packages/provider/auth/assets.
Public Go SDK identifiers renamed:
pkg/zotcore (package zotcore) -> packages/agent/sdk (package sdk)
pkg/zotext (package zotext) -> packages/agent/ext (package ext)
This breaks Go-based extensions and embedders; the JSON wire protocol
for extensions and RPC is unchanged, so non-Go extensions, already-
built extension binaries, and zot rpc consumers are unaffected.
Docs, examples, and the built-in write-zot-extension skill updated
for the new paths and identifiers. Shadow-bug fixes in code samples
(ext := ext.New -> e := ext.New).
789 lines
24 KiB
Go
789 lines
24 KiB
Go
package core
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/patriceckhart/zot/packages/provider"
|
|
)
|
|
|
|
// Session is a JSONL-backed conversation transcript tied to a cwd.
|
|
type Session struct {
|
|
ID string
|
|
Path string
|
|
Meta SessionMeta
|
|
writer *os.File
|
|
buf *bufio.Writer
|
|
|
|
// freshFile is true when the file was created by NewSession (this
|
|
// process owns it) and false when OpenSession reopened an existing
|
|
// transcript. Used by Close() to delete the file if the run never
|
|
// appended any messages — prevents a flood of empty session files
|
|
// from sessions the user opens then exits without prompting.
|
|
freshFile bool
|
|
|
|
// messagesAppended counts AppendMessage calls. Combined with
|
|
// freshFile it tells Close() whether the session left any content
|
|
// worth keeping.
|
|
messagesAppended int
|
|
}
|
|
|
|
// SessionMeta is written as the first line of every session file.
|
|
type SessionMeta struct {
|
|
ID string `json:"id"`
|
|
CWD string `json:"cwd"`
|
|
Model string `json:"model"`
|
|
Provider string `json:"provider"`
|
|
Started time.Time `json:"started"`
|
|
Version string `json:"version"`
|
|
Title string `json:"title,omitempty"`
|
|
|
|
// Parent is the ID of the session this one was forked from, or
|
|
// empty for top-level sessions. The tree picker walks parents
|
|
// upward and sibling files (same cwd dir, same parent ID)
|
|
// laterally to render the branch topology.
|
|
Parent string `json:"parent,omitempty"`
|
|
// ForkPoint is the 0-indexed message position within the parent
|
|
// transcript where this branch diverges. Messages 0..ForkPoint-1
|
|
// are copied from the parent verbatim; the user's next turn on
|
|
// the child session continues from there.
|
|
ForkPoint int `json:"fork_point,omitempty"`
|
|
}
|
|
|
|
// sessionLine is the on-disk row type. Message is kept as a raw
|
|
// JSON message on reads (because Content is an interface slice that
|
|
// the default unmarshaler cannot reconstruct); it is written with a
|
|
// regular provider.Message value.
|
|
type sessionLine struct {
|
|
Type string `json:"type"`
|
|
Meta *SessionMeta `json:"meta,omitempty"`
|
|
Message *provider.Message `json:"message,omitempty"`
|
|
Messages []provider.Message `json:"messages,omitempty"`
|
|
Usage *provider.Usage `json:"usage,omitempty"`
|
|
Cumulative *provider.Usage `json:"cumulative,omitempty"`
|
|
}
|
|
|
|
type sessionLineHead struct {
|
|
Type string `json:"type"`
|
|
}
|
|
|
|
// SessionsDir returns the per-cwd sessions directory under root.
|
|
func SessionsDir(root, cwd string) string {
|
|
sum := sha256.Sum256([]byte(cwd))
|
|
short := hex.EncodeToString(sum[:8])
|
|
return filepath.Join(root, "sessions", short)
|
|
}
|
|
|
|
// NewSession creates and opens a new session file under
|
|
// SessionsDir(root, cwd) with an autogenerated, time-stamped name.
|
|
func NewSession(root, cwd, providerName, model, version string) (*Session, error) {
|
|
dir := SessionsDir(root, cwd)
|
|
if err := os.MkdirAll(dir, 0o755); err != nil {
|
|
return nil, err
|
|
}
|
|
id := uuid.NewString()
|
|
name := fmt.Sprintf("%s-%s.jsonl", time.Now().UTC().Format("20060102-150405"), id[:8])
|
|
p := filepath.Join(dir, name)
|
|
return newSessionAt(p, cwd, providerName, model, version)
|
|
}
|
|
|
|
// NewSessionAtPath creates a session at an explicit file path. Used
|
|
// by callers (notably the swarm-agent child) that need the session
|
|
// file to live at a path chosen by their parent rather than under
|
|
// SessionsDir. Returns an error if the file already exists — use
|
|
// OpenSession for that case.
|
|
func NewSessionAtPath(path, cwd, providerName, model, version string) (*Session, error) {
|
|
if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
|
|
return nil, err
|
|
}
|
|
return newSessionAt(path, cwd, providerName, model, version)
|
|
}
|
|
|
|
// newSessionAt is the shared implementation. Both NewSession and
|
|
// NewSessionAtPath funnel through here so the meta-line layout,
|
|
// freshFile bookkeeping, and id format stay identical.
|
|
func newSessionAt(p, cwd, providerName, model, version string) (*Session, error) {
|
|
id := uuid.NewString()
|
|
f, err := os.OpenFile(p, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o644)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
s := &Session{
|
|
ID: id,
|
|
Path: p,
|
|
Meta: SessionMeta{ID: id, CWD: cwd, Provider: providerName, Model: model, Started: time.Now().UTC(), Version: version},
|
|
writer: f,
|
|
buf: bufio.NewWriter(f),
|
|
freshFile: true,
|
|
}
|
|
if err := s.writeLine(sessionLine{Type: "meta", Meta: &s.Meta}); err != nil {
|
|
f.Close()
|
|
return nil, err
|
|
}
|
|
return s, nil
|
|
}
|
|
|
|
func forEachJSONLLine(r io.Reader, fn func([]byte) error) error {
|
|
br := bufio.NewReader(r)
|
|
for {
|
|
line, err := br.ReadBytes('\n')
|
|
if len(line) > 0 {
|
|
line = bytes.TrimRight(line, "\r\n")
|
|
if len(line) > 0 {
|
|
if ferr := fn(line); ferr != nil {
|
|
return ferr
|
|
}
|
|
}
|
|
}
|
|
if err == io.EOF {
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// SessionUsage returns the most recent cumulative usage row stored in
|
|
// a session file. Sessions append one usage row per completed turn; the
|
|
// latest row's cumulative field is the session total. Missing usage rows
|
|
// are valid for old/empty sessions and return the zero value.
|
|
func SessionUsage(path string) (provider.Usage, error) {
|
|
cum, _, err := SessionUsageDetail(path)
|
|
return cum, err
|
|
}
|
|
|
|
// SessionUsageDetail returns the latest cumulative usage and the
|
|
// per-turn usage of the final completed turn. The per-turn row drives
|
|
// the live "context used" gauge in the status bar (input + cache
|
|
// approximates the prompt size the model just saw), letting the TUI
|
|
// rehydrate the gauge on resume instead of starting at 0% until the
|
|
// next turn lands.
|
|
func SessionUsageDetail(path string) (cumulative, lastTurn provider.Usage, err error) {
|
|
f, ferr := os.Open(path)
|
|
if ferr != nil {
|
|
return provider.Usage{}, provider.Usage{}, ferr
|
|
}
|
|
defer f.Close()
|
|
|
|
// Some historical sessions logged the per-turn `usage` field as a copy
|
|
// of `cumulative` instead of the true delta. To recover an accurate
|
|
// last-turn snapshot (used by the status-bar context gauge on resume),
|
|
// we always derive lastTurn from the delta between the final two
|
|
// cumulative rows. For prompt-size purposes, cache_read/cache_write
|
|
// reflect the most recent prompt directly, so we take those from the
|
|
// final cumulative row as-is rather than as a delta.
|
|
var prevCum provider.Usage
|
|
var haveCum bool
|
|
if ierr := forEachJSONLLine(f, func(line []byte) error {
|
|
var head sessionLineHead
|
|
if err := json.Unmarshal(line, &head); err != nil || head.Type != "usage" {
|
|
return nil
|
|
}
|
|
var row struct {
|
|
Cumulative provider.Usage `json:"cumulative"`
|
|
}
|
|
if err := json.Unmarshal(line, &row); err != nil {
|
|
return nil
|
|
}
|
|
if haveCum {
|
|
prevCum = cumulative
|
|
}
|
|
cumulative = row.Cumulative
|
|
haveCum = true
|
|
return nil
|
|
}); ierr != nil {
|
|
return provider.Usage{}, provider.Usage{}, ierr
|
|
}
|
|
if haveCum {
|
|
// input/output are monotonic totals -> per-turn = delta.
|
|
lastTurn.InputTokens = nonNegDelta(cumulative.InputTokens, prevCum.InputTokens)
|
|
lastTurn.OutputTokens = nonNegDelta(cumulative.OutputTokens, prevCum.OutputTokens)
|
|
// cache_read/write on the final row already represent the last prompt's
|
|
// cache hit/creation, not a running total of bytes; use directly.
|
|
lastTurn.CacheReadTokens = cumulative.CacheReadTokens - prevCum.CacheReadTokens
|
|
if lastTurn.CacheReadTokens < 0 {
|
|
lastTurn.CacheReadTokens = cumulative.CacheReadTokens
|
|
}
|
|
lastTurn.CacheWriteTokens = cumulative.CacheWriteTokens - prevCum.CacheWriteTokens
|
|
if lastTurn.CacheWriteTokens < 0 {
|
|
lastTurn.CacheWriteTokens = cumulative.CacheWriteTokens
|
|
}
|
|
lastTurn.CostUSD = cumulative.CostUSD - prevCum.CostUSD
|
|
if lastTurn.CostUSD < 0 {
|
|
lastTurn.CostUSD = 0
|
|
}
|
|
}
|
|
return cumulative, lastTurn, nil
|
|
}
|
|
|
|
func nonNegDelta(cur, prev int) int {
|
|
if cur < prev {
|
|
return cur
|
|
}
|
|
return cur - prev
|
|
}
|
|
|
|
// OpenSession opens an existing session for appending.
|
|
func OpenSession(path string) (*Session, []provider.Message, error) {
|
|
f, err := os.Open(path)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
defer f.Close()
|
|
|
|
var meta SessionMeta
|
|
var messages []provider.Message
|
|
if err := forEachJSONLLine(f, func(line []byte) error {
|
|
var head sessionLineHead
|
|
if err := json.Unmarshal(line, &head); err != nil {
|
|
return nil
|
|
}
|
|
switch head.Type {
|
|
case "meta":
|
|
var row struct {
|
|
Meta SessionMeta `json:"meta"`
|
|
}
|
|
if err := json.Unmarshal(line, &row); err == nil {
|
|
meta = row.Meta
|
|
}
|
|
case "message":
|
|
if msg, err := hydrateMessage(line); err == nil && len(msg.Content) > 0 {
|
|
messages = append(messages, msg)
|
|
}
|
|
case "compaction":
|
|
if compacted, err := hydrateCompaction(line); err == nil {
|
|
messages = compacted
|
|
}
|
|
}
|
|
return nil
|
|
}); err != nil {
|
|
return nil, nil, err
|
|
}
|
|
messages = repairToolUseResultPairs(messages)
|
|
out, err := os.OpenFile(path, os.O_APPEND|os.O_WRONLY, 0o644)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
s := &Session{ID: meta.ID, Path: path, Meta: meta, writer: out, buf: bufio.NewWriter(out)}
|
|
return s, messages, nil
|
|
}
|
|
|
|
// repairToolUseResultPairs walks a restored transcript and
|
|
// synthesises stub tool_result blocks for any assistant
|
|
// tool_use blocks that aren't paired with a matching result in
|
|
// the next message. Anthropic (and OpenAI via the responses API)
|
|
// reject any request whose transcript leaves a tool_use without
|
|
// its matching tool_result immediately after, with errors like:
|
|
//
|
|
// messages.8: `tool_use` ids were found without `tool_result`
|
|
// blocks immediately after
|
|
//
|
|
// Corruption gets into the transcript two ways we know of:
|
|
//
|
|
// - Older zot builds that persisted the assistant tool_use row
|
|
// before the tool_result row, then crashed between the two.
|
|
// - Abort paths in older builds that didn't drop the mid-turn
|
|
// assistant message cleanly.
|
|
//
|
|
// Rather than change runtime semantics (which would risk hiding a
|
|
// real bug), we scrub on load: any unmatched tool_use gets a stub
|
|
// tool_result injected as a RoleTool message so the next
|
|
// outbound request passes the provider's validity check. The stub
|
|
// reads "tool call was aborted; no result recorded." so the
|
|
// model can see what happened and decide whether to retry.
|
|
//
|
|
// Runs once per OpenSession call. No cost on the hot path.
|
|
func repairToolUseResultPairs(msgs []provider.Message) []provider.Message {
|
|
if len(msgs) == 0 {
|
|
return msgs
|
|
}
|
|
out := make([]provider.Message, 0, len(msgs)+2)
|
|
for i, m := range msgs {
|
|
out = append(out, m)
|
|
if m.Role != provider.RoleAssistant {
|
|
continue
|
|
}
|
|
// Collect tool_use ids in this assistant message.
|
|
var ids []string
|
|
for _, c := range m.Content {
|
|
if tc, ok := c.(provider.ToolCallBlock); ok {
|
|
ids = append(ids, tc.ID)
|
|
}
|
|
}
|
|
if len(ids) == 0 {
|
|
continue
|
|
}
|
|
// Look at the next message (if any) and collect tool_result
|
|
// CallIDs it covers.
|
|
have := map[string]bool{}
|
|
if i+1 < len(msgs) && msgs[i+1].Role == provider.RoleTool {
|
|
for _, c := range msgs[i+1].Content {
|
|
if tr, ok := c.(provider.ToolResultBlock); ok {
|
|
have[tr.CallID] = true
|
|
}
|
|
}
|
|
}
|
|
// Build stubs for any missing id.
|
|
var stubs []provider.Content
|
|
for _, id := range ids {
|
|
if have[id] {
|
|
continue
|
|
}
|
|
stubs = append(stubs, provider.ToolResultBlock{
|
|
CallID: id,
|
|
Content: []provider.Content{provider.TextBlock{Text: "tool call was aborted; no result recorded."}},
|
|
IsError: true,
|
|
})
|
|
}
|
|
if len(stubs) == 0 {
|
|
continue
|
|
}
|
|
// Merge into the next tool-role message if present,
|
|
// otherwise insert a synthetic one right after the
|
|
// assistant message. Merging keeps the tool-role row
|
|
// count stable; inserting handles the common case where
|
|
// no tool message was persisted at all.
|
|
if i+1 < len(msgs) && msgs[i+1].Role == provider.RoleTool {
|
|
msgs[i+1].Content = append(msgs[i+1].Content, stubs...)
|
|
// We already appended m to out; the modified next
|
|
// message will be appended on the following iteration.
|
|
continue
|
|
}
|
|
out = append(out, provider.Message{
|
|
Role: provider.RoleTool,
|
|
Content: stubs,
|
|
Time: m.Time,
|
|
})
|
|
}
|
|
return out
|
|
}
|
|
|
|
// LatestSession returns the most recent session file for cwd, or "".
|
|
func LatestSession(root, cwd string) string {
|
|
paths := ListSessions(root, cwd)
|
|
if len(paths) == 0 {
|
|
return ""
|
|
}
|
|
return paths[0]
|
|
}
|
|
|
|
// SessionSummary describes one on-disk session at a glance for UI pickers.
|
|
type SessionSummary struct {
|
|
Path string
|
|
Started time.Time
|
|
Model string
|
|
Provider string
|
|
MessageCount int
|
|
FirstUserText string
|
|
TotalCost float64
|
|
Title string
|
|
}
|
|
|
|
// RenameSession updates the title field in the session's meta line.
|
|
// It rewrites the first line of the file (the meta line) with the
|
|
// updated title.
|
|
// RenameSession appends a rename line to the session file. This is
|
|
// safe even for the currently active session because it opens the
|
|
// file independently and appends (doesn't rewrite).
|
|
func RenameSession(path, title string) error {
|
|
f, err := os.OpenFile(path, os.O_WRONLY|os.O_APPEND, 0o644)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer f.Close()
|
|
line, _ := json.Marshal(map[string]string{"type": "rename", "title": title})
|
|
line = append(line, '\n')
|
|
_, err = f.Write(line)
|
|
return err
|
|
}
|
|
|
|
// DescribeSessions returns lightweight summaries for every session in
|
|
// cwd, newest first. Parses only the first few lines and the last usage
|
|
// line so it's cheap to run on every dialog open.
|
|
func DescribeSessions(root, cwd string) []SessionSummary {
|
|
paths := ListSessions(root, cwd)
|
|
summaries := make([]SessionSummary, 0, len(paths))
|
|
for _, p := range paths {
|
|
summaries = append(summaries, describeSession(p))
|
|
}
|
|
return summaries
|
|
}
|
|
|
|
func describeSession(path string) SessionSummary {
|
|
s := SessionSummary{Path: path}
|
|
f, err := os.Open(path)
|
|
if err != nil {
|
|
return s
|
|
}
|
|
defer f.Close()
|
|
_ = forEachJSONLLine(f, func(line []byte) error {
|
|
var head sessionLineHead
|
|
if err := json.Unmarshal(line, &head); err != nil {
|
|
return nil
|
|
}
|
|
switch head.Type {
|
|
case "meta":
|
|
var row struct {
|
|
Meta SessionMeta `json:"meta"`
|
|
}
|
|
if err := json.Unmarshal(line, &row); err == nil {
|
|
s.Started = row.Meta.Started
|
|
s.Model = row.Meta.Model
|
|
s.Provider = row.Meta.Provider
|
|
s.Title = row.Meta.Title
|
|
}
|
|
case "message":
|
|
s.MessageCount++
|
|
if s.FirstUserText == "" {
|
|
s.FirstUserText = firstUserText(line)
|
|
}
|
|
case "compaction":
|
|
if compacted, err := hydrateCompaction(line); err == nil {
|
|
s.MessageCount = len(compacted)
|
|
if s.FirstUserText == "" && len(compacted) > 0 {
|
|
s.FirstUserText = firstTextFromMessage(compacted[0])
|
|
}
|
|
}
|
|
case "rename":
|
|
var row struct {
|
|
Title string `json:"title"`
|
|
}
|
|
if err := json.Unmarshal(line, &row); err == nil && row.Title != "" {
|
|
s.Title = row.Title
|
|
}
|
|
case "usage":
|
|
var row struct {
|
|
Cumulative provider.Usage `json:"cumulative"`
|
|
}
|
|
if err := json.Unmarshal(line, &row); err == nil {
|
|
s.TotalCost = row.Cumulative.CostUSD
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
return s
|
|
}
|
|
|
|
func firstUserText(line []byte) string {
|
|
var row struct {
|
|
Message struct {
|
|
Role string `json:"role"`
|
|
Content []struct {
|
|
Text string `json:"text"`
|
|
} `json:"content"`
|
|
} `json:"message"`
|
|
}
|
|
if err := json.Unmarshal(line, &row); err != nil {
|
|
return ""
|
|
}
|
|
if row.Message.Role != "user" {
|
|
return ""
|
|
}
|
|
for _, c := range row.Message.Content {
|
|
if c.Text != "" {
|
|
return c.Text
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func firstTextFromMessage(msg provider.Message) string {
|
|
for _, c := range msg.Content {
|
|
if tb, ok := c.(provider.TextBlock); ok && tb.Text != "" {
|
|
return tb.Text
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// PruneEmptySessions deletes session files in cwd's session directory
|
|
// that contain only a meta line (no messages were ever appended).
|
|
// Cleans up the backlog of empty stubs created by old zot versions
|
|
// that wrote a meta line at NewSession time and never followed up.
|
|
// Errors are swallowed; the caller treats this as best-effort.
|
|
func PruneEmptySessions(root, cwd string) {
|
|
dir := SessionsDir(root, cwd)
|
|
entries, err := os.ReadDir(dir)
|
|
if err != nil {
|
|
return
|
|
}
|
|
for _, e := range entries {
|
|
if e.IsDir() || !strings.HasSuffix(e.Name(), ".jsonl") {
|
|
continue
|
|
}
|
|
p := filepath.Join(dir, e.Name())
|
|
if sessionHasNoMessages(p) {
|
|
_ = os.Remove(p)
|
|
}
|
|
}
|
|
}
|
|
|
|
// sessionHasNoMessages returns true when the file at path contains
|
|
// no lines of type "message". Meta-only / usage-only files count as
|
|
// empty. Used by PruneEmptySessions and the Describe path.
|
|
func sessionHasNoMessages(path string) bool {
|
|
f, err := os.Open(path)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
defer f.Close()
|
|
hasMessage := false
|
|
_ = forEachJSONLLine(f, func(line []byte) error {
|
|
var head sessionLineHead
|
|
if err := json.Unmarshal(line, &head); err != nil {
|
|
return nil
|
|
}
|
|
if head.Type == "message" {
|
|
hasMessage = true
|
|
return io.EOF
|
|
}
|
|
return nil
|
|
})
|
|
return !hasMessage
|
|
}
|
|
|
|
// ListSessions returns session file paths for cwd, most-recently-
|
|
// modified first. Sorting on filesystem ModTime instead of the
|
|
// timestamp embedded in the filename means a long-running session
|
|
// the user actually returned to recently floats to the top of
|
|
// /sessions, /continue, and the resume picker, even when it was
|
|
// originally created days earlier than newer but idle sessions.
|
|
// Files with identical ModTime fall back to filename desc so the
|
|
// order stays stable across calls.
|
|
func ListSessions(root, cwd string) []string {
|
|
dir := SessionsDir(root, cwd)
|
|
entries, err := os.ReadDir(dir)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
type rec struct {
|
|
path string
|
|
mod time.Time
|
|
}
|
|
var files []rec
|
|
for _, e := range entries {
|
|
if e.IsDir() || !strings.HasSuffix(e.Name(), ".jsonl") {
|
|
continue
|
|
}
|
|
p := filepath.Join(dir, e.Name())
|
|
info, err := e.Info()
|
|
if err != nil {
|
|
continue
|
|
}
|
|
files = append(files, rec{path: p, mod: info.ModTime()})
|
|
}
|
|
sort.Slice(files, func(i, j int) bool {
|
|
if !files[i].mod.Equal(files[j].mod) {
|
|
return files[i].mod.After(files[j].mod)
|
|
}
|
|
return files[i].path > files[j].path
|
|
})
|
|
out := make([]string, 0, len(files))
|
|
for _, r := range files {
|
|
out = append(out, r.path)
|
|
}
|
|
return out
|
|
}
|
|
|
|
// AppendMessage writes a message to the session.
|
|
func (s *Session) AppendMessage(m provider.Message) error {
|
|
if s == nil {
|
|
return nil
|
|
}
|
|
if err := s.writeLine(sessionLine{Type: "message", Message: &m}); err != nil {
|
|
return err
|
|
}
|
|
s.messagesAppended++
|
|
return nil
|
|
}
|
|
|
|
// AppendCompaction writes a checkpoint that replaces all earlier
|
|
// transcript rows when the session is resumed. The old rows remain in
|
|
// the JSONL file for audit/export, while loaders use the latest
|
|
// compaction row as the effective transcript.
|
|
func (s *Session) AppendCompaction(messages []provider.Message) error {
|
|
if s == nil {
|
|
return nil
|
|
}
|
|
if err := s.writeLine(sessionLine{Type: "compaction", Messages: messages}); err != nil {
|
|
return err
|
|
}
|
|
s.messagesAppended = len(messages)
|
|
return nil
|
|
}
|
|
|
|
// UpdateModel records a provider/model switch in the session file.
|
|
// The reader keeps the most recent meta entry, so the session resumes
|
|
// with the updated model.
|
|
func (s *Session) UpdateModel(providerName, model string) error {
|
|
if s == nil {
|
|
return nil
|
|
}
|
|
s.Meta.Provider = providerName
|
|
s.Meta.Model = model
|
|
return s.writeLine(sessionLine{Type: "meta", Meta: &s.Meta})
|
|
}
|
|
|
|
// AppendUsage writes a usage row to the session.
|
|
func (s *Session) AppendUsage(u, cum provider.Usage) error {
|
|
if s == nil {
|
|
return nil
|
|
}
|
|
return s.writeLine(sessionLine{Type: "usage", Usage: &u, Cumulative: &cum})
|
|
}
|
|
|
|
// Close flushes and closes the session file. If the session was
|
|
// freshly created in this process and never had any messages
|
|
// appended (the user opened zot, looked around, and exited without
|
|
// prompting), the file is deleted on close so the sessions list
|
|
// doesn't fill up with empty meta-only stubs.
|
|
func (s *Session) Close() error {
|
|
if s == nil {
|
|
return nil
|
|
}
|
|
flushErr := s.buf.Flush()
|
|
closeErr := s.writer.Close()
|
|
if s.freshFile && s.messagesAppended == 0 {
|
|
// Best-effort cleanup. We deliberately don't propagate the
|
|
// remove error: if it fails (file already gone, perms changed)
|
|
// the worst case is one stale empty file in the listing.
|
|
_ = os.Remove(s.Path)
|
|
}
|
|
if flushErr != nil {
|
|
return flushErr
|
|
}
|
|
return closeErr
|
|
}
|
|
|
|
func (s *Session) writeLine(row sessionLine) error {
|
|
b, err := json.Marshal(row)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if _, err := s.buf.Write(b); err != nil {
|
|
return err
|
|
}
|
|
if err := s.buf.WriteByte('\n'); err != nil {
|
|
return err
|
|
}
|
|
return s.buf.Flush()
|
|
}
|
|
|
|
// ---- content (de)serialization ----
|
|
//
|
|
// provider.Content is an interface; encoding/json drops type information.
|
|
// We persist messages by reading the raw "message" object back and
|
|
// rebuilding Content from discriminated fields.
|
|
|
|
func hydrateCompaction(lineBytes []byte) ([]provider.Message, error) {
|
|
var row struct {
|
|
Messages []json.RawMessage `json:"messages"`
|
|
}
|
|
if err := json.Unmarshal(lineBytes, &row); err != nil {
|
|
return nil, err
|
|
}
|
|
messages := make([]provider.Message, 0, len(row.Messages))
|
|
for _, raw := range row.Messages {
|
|
msg, err := hydrateMessageObject(raw)
|
|
if err == nil && len(msg.Content) > 0 {
|
|
messages = append(messages, msg)
|
|
}
|
|
}
|
|
return messages, nil
|
|
}
|
|
|
|
func hydrateMessage(lineBytes []byte) (provider.Message, error) {
|
|
var row struct {
|
|
Message json.RawMessage `json:"message"`
|
|
}
|
|
if err := json.Unmarshal(lineBytes, &row); err != nil {
|
|
return provider.Message{}, err
|
|
}
|
|
return hydrateMessageObject(row.Message)
|
|
}
|
|
|
|
func hydrateMessageObject(rawMessage []byte) (provider.Message, error) {
|
|
var row struct {
|
|
Role provider.Role `json:"role"`
|
|
Content []json.RawMessage `json:"content"`
|
|
Time time.Time `json:"time"`
|
|
Meta map[string]string `json:"meta,omitempty"`
|
|
}
|
|
if err := json.Unmarshal(rawMessage, &row); err != nil {
|
|
return provider.Message{}, err
|
|
}
|
|
msg := provider.Message{Role: row.Role, Time: row.Time, Meta: row.Meta}
|
|
for _, raw := range row.Content {
|
|
var head struct {
|
|
Text string `json:"text"`
|
|
MimeType string `json:"mime_type"`
|
|
Data []byte `json:"data"`
|
|
ID string `json:"id"`
|
|
Name string `json:"name"`
|
|
CallID string `json:"call_id"`
|
|
ReasoningID string `json:"reasoning_id"`
|
|
Summary string `json:"summary"`
|
|
Encrypted string `json:"encrypted_content"`
|
|
// ToolCallBlock also has Arguments, ToolResultBlock has Content + IsError
|
|
}
|
|
if err := json.Unmarshal(raw, &head); err != nil {
|
|
continue
|
|
}
|
|
// Discriminate by presence of fields.
|
|
switch {
|
|
case head.ReasoningID != "" || head.Encrypted != "":
|
|
msg.Content = append(msg.Content, provider.ReasoningBlock{
|
|
ID: head.ReasoningID,
|
|
Summary: head.Summary,
|
|
Encrypted: head.Encrypted,
|
|
})
|
|
case head.Name != "" && head.ID != "":
|
|
var tc struct {
|
|
ID string `json:"id"`
|
|
Name string `json:"name"`
|
|
Arguments json.RawMessage `json:"arguments"`
|
|
}
|
|
_ = json.Unmarshal(raw, &tc)
|
|
msg.Content = append(msg.Content, provider.ToolCallBlock{ID: tc.ID, Name: tc.Name, Arguments: tc.Arguments})
|
|
case head.CallID != "":
|
|
var tr struct {
|
|
CallID string `json:"call_id"`
|
|
Content []json.RawMessage `json:"content"`
|
|
IsError bool `json:"is_error"`
|
|
}
|
|
_ = json.Unmarshal(raw, &tr)
|
|
block := provider.ToolResultBlock{CallID: tr.CallID, IsError: tr.IsError}
|
|
for _, c := range tr.Content {
|
|
var inner struct {
|
|
Text string `json:"text"`
|
|
MimeType string `json:"mime_type"`
|
|
Data []byte `json:"data"`
|
|
}
|
|
_ = json.Unmarshal(c, &inner)
|
|
if inner.MimeType != "" {
|
|
block.Content = append(block.Content, provider.ImageBlock{MimeType: inner.MimeType, Data: inner.Data})
|
|
} else {
|
|
block.Content = append(block.Content, provider.TextBlock{Text: inner.Text})
|
|
}
|
|
}
|
|
msg.Content = append(msg.Content, block)
|
|
case head.MimeType != "":
|
|
msg.Content = append(msg.Content, provider.ImageBlock{MimeType: head.MimeType, Data: head.Data})
|
|
default:
|
|
msg.Content = append(msg.Content, provider.TextBlock{Text: head.Text})
|
|
}
|
|
}
|
|
return msg, nil
|
|
}
|