refactor: extract generic BotAdapter interface from Telegram bot

Extract protocol-agnostic bot logic into a new packages/agent/modes/bot
package, enabling future messaging backends (Discord, Slack, etc.) to
reuse turn queueing, agent prompting, and command dispatch.

New package modes/bot:
- adapter.go: BotAdapter interface + InboundMessage/Command types
- runner.go: Generic Runner with turn queue, drainQueue, runTurn
- status.go: StatusSnapshot + FormatStatus (moved from telegram)
- commands.go: IsStopCommand (moved from telegram)

Telegram adapter (modes/telegram/adapter.go):
- Implements BotAdapter for Telegram protocol
- Owns polling, pairing, user filtering, image download
- ChannelID encoded as string (fmt.Sprintf(%d, chatID))

Refactored telegram/bot.go:
- Bot type now a thin wrapper (deprecated, kept for compatibility)
- Utility functions (chunkMessage, isImageMIME, guessImageMIME) retained

Updated botcmd.go:
- Constructs adapter + runner directly instead of Bot wrapper
- Uses runner.UpdateRuntimeConfig for credential refresh

Shims in telegram/status.go and telegram/commands.go re-export from
bot package for backward compatibility with interactive.go.

All tests pass. No import cycles: modes/bot does not import modes/telegram.
This commit is contained in:
mi-skam 2026-06-26 07:39:35 +02:00
parent b325477870
commit 741c643b1b
No known key found for this signature in database
9 changed files with 730 additions and 484 deletions

View file

@ -15,6 +15,7 @@ import (
"syscall"
"time"
"github.com/patriceckhart/zot/packages/agent/modes/bot"
"github.com/patriceckhart/zot/packages/agent/modes/telegram"
"github.com/patriceckhart/zot/packages/core"
)
@ -362,18 +363,20 @@ func botRun(rawTail []string, version string) error {
}
}
var b *telegram.Bot
b = &telegram.Bot{
Client: telegram.NewClient(cfg.BotToken),
Agent: agent,
Config: cfg,
// Construct the Telegram adapter and generic runner.
adapter := telegram.NewAdapter(
telegram.NewClient(cfg.BotToken),
&cfg,
func(c telegram.Config) error {
return telegram.SaveConfig(ZotHome(), c)
},
)
var runner *bot.Runner
runner = bot.NewRunner(adapter, agent, bot.Config{
ZotHome: ZotHome(),
Provider: resolved.Provider,
AuthMethod: resolved.AuthMethod,
CWD: args.CWD,
Save: func(c telegram.Config) error {
return telegram.SaveConfig(ZotHome(), c)
},
RefreshCreds: func() error {
// Re-run the same resolver the tui uses so we pick up
// refreshed oauth tokens, re-logins, and model switches.
@ -385,12 +388,10 @@ func botRun(rawTail []string, version string) error {
}
agent.Client = next.NewClient()
agent.Model = next.Model
b.Provider = next.Provider
b.AuthMethod = next.AuthMethod
b.CWD = next.CWD
runner.UpdateRuntimeConfig(next.Provider, next.AuthMethod, next.CWD)
return nil
},
}
})
// Record our pid so `zot telegram-bot status` / `zot telegram-bot stop` can find us,
// regardless of whether we were started directly or via `bot start`.
@ -407,7 +408,7 @@ func botRun(rawTail []string, version string) error {
cancel()
}()
defer cancel()
return b.Run(ctx)
return runner.Run(ctx)
}
// openOrCreateSessionForBot reuses the same logic as interactive mode

View file

@ -0,0 +1,51 @@
// Package bot provides a protocol-agnostic runner for long-running bot
// modes. Concrete transports (Telegram, Discord, …) implement the
// BotAdapter interface; the Runner handles turn queueing, agent
// prompting, command dispatch, and credential refresh.
package bot
import (
"context"
"github.com/patriceckhart/zot/packages/provider"
)
// InboundMessage is a protocol-normalised message from a user.
type InboundMessage struct {
ChannelID string // opaque; adapter owns encoding (e.g. fmt.Sprintf("%d", chatID))
MessageID string // optional reply anchor
Text string
Images []provider.ImageBlock
}
// Command is a built-in bot command that bypasses the agent.
type Command int
const (
CmdStart Command = iota // first-time pairing / welcome
CmdHelp // usage information
CmdStatus // agent/provider state
CmdStop // cancel the active turn
)
// BotAdapter is the transport layer a concrete protocol must implement.
// The Runner calls these methods; it never touches protocol types directly.
type BotAdapter interface {
// Run drives inbound polling; calls handler for normal messages and
// commandHandler for built-in commands. Blocks until ctx is done.
Run(ctx context.Context,
handler func(InboundMessage),
commandHandler func(Command, InboundMessage),
) error
// Send delivers a reply. The adapter chunks to protocol limits.
Send(ctx context.Context, channelID, text string) error
// IndicateWorking fires a "typing…" signal; returns a stop func.
// Return a no-op if the protocol doesn't support it.
IndicateWorking(ctx context.Context, channelID string) (stop func())
// StatusText appends protocol-specific info to /status replies
// (e.g. "@botname"). Return "" if there is nothing to add.
StatusText() string
}

View file

@ -0,0 +1,11 @@
package bot
import "strings"
// IsStopCommand reports whether text should abort the active turn.
// Users often type plain "stop" rather than bot-style "/stop"; keep
// this intentionally narrow so normal prompts like "stop doing X"
// still go to the agent.
func IsStopCommand(text string) bool {
return strings.EqualFold(strings.TrimSpace(text), "stop")
}

View file

@ -0,0 +1,254 @@
package bot
import (
"context"
"fmt"
"io"
"os"
"strings"
"sync"
"github.com/patriceckhart/zot/packages/core"
"github.com/patriceckhart/zot/packages/provider"
)
// stderr is a tiny hook so tests can redirect bot logging.
var stderr = func() io.Writer { return os.Stderr }
// Config holds runner-level settings that are protocol-independent.
type Config struct {
ZotHome string
Provider string
AuthMethod string
CWD string
RefreshCreds func() error
}
// queuedTurn is an inbound message waiting to become a prompt.
type queuedTurn struct {
channelID string
messageID string
prompt string
images []provider.ImageBlock
}
// Runner is the protocol-agnostic bot engine. It owns the turn queue,
// dispatches prompts to the agent, and streams replies back through
// the BotAdapter.
type Runner struct {
agent *core.Agent
adapter BotAdapter
cfg Config
mu sync.Mutex
busy bool
activeCtx context.CancelFunc
queue []queuedTurn
lastCtxInput int
runCtx context.Context // set at Run entry; used by goroutines
}
// NewRunner creates a Runner wired to the given adapter and agent.
func NewRunner(adapter BotAdapter, agent *core.Agent, cfg Config) *Runner {
return &Runner{
agent: agent,
adapter: adapter,
cfg: cfg,
}
}
// UpdateRuntimeConfig updates provider/auth/cwd at runtime (e.g. after
// credential refresh). This is thread-safe.
func (r *Runner) UpdateRuntimeConfig(provider, authMethod, cwd string) {
r.mu.Lock()
defer r.mu.Unlock()
r.cfg.Provider = provider
r.cfg.AuthMethod = authMethod
r.cfg.CWD = cwd
}
// Run starts the adapter's polling loop and blocks until ctx cancels.
func (r *Runner) Run(ctx context.Context) error {
r.mu.Lock()
r.runCtx = ctx
r.mu.Unlock()
return r.adapter.Run(ctx, r.handleMessage, r.handleCommand)
}
// handleMessage is called by the adapter for every normal inbound message.
func (r *Runner) handleMessage(msg InboundMessage) {
if msg.Text == "" && len(msg.Images) == 0 {
return
}
r.mu.Lock()
r.queue = append(r.queue, queuedTurn{
channelID: msg.ChannelID,
messageID: msg.MessageID,
prompt: msg.Text,
images: msg.Images,
})
idle := !r.busy
r.mu.Unlock()
if idle {
go r.drainQueue()
}
}
// handleCommand is called by the adapter for built-in commands.
func (r *Runner) handleCommand(cmd Command, msg InboundMessage) {
switch cmd {
case CmdStart, CmdHelp:
_ = r.adapter.Send(context.Background(), msg.ChannelID,
"send me any message and i'll forward it to zot. attach an image and i'll pass it to the model. commands: /status, /stop, or plain stop.")
case CmdStatus:
r.sendStatus(msg.ChannelID)
case CmdStop:
r.cancelActiveTurn(msg.ChannelID, msg.MessageID)
}
}
// drainQueue runs queued turns one at a time until the queue is empty.
func (r *Runner) drainQueue() {
r.mu.Lock()
parent := r.runCtx
r.mu.Unlock()
for {
r.mu.Lock()
if len(r.queue) == 0 {
r.busy = false
r.activeCtx = nil
r.mu.Unlock()
return
}
t := r.queue[0]
r.queue = r.queue[1:]
r.busy = true
turnCtx, cancel := context.WithCancel(parent)
r.activeCtx = cancel
r.mu.Unlock()
if r.cfg.RefreshCreds != nil {
if err := r.cfg.RefreshCreds(); err != nil {
fmt.Fprintln(stderr(), "bot: refresh creds:", err)
}
}
r.runTurn(turnCtx, t)
cancel()
}
}
// runTurn sends the queued prompt to the agent and streams the reply.
func (r *Runner) runTurn(ctx context.Context, t queuedTurn) {
stopWorking := r.adapter.IndicateWorking(ctx, t.channelID)
defer stopWorking()
var replyBuilder strings.Builder
var lastAssistantText string
var turnErr error
sink := func(ev core.AgentEvent) {
switch e := ev.(type) {
case core.EvTextDelta:
replyBuilder.WriteString(e.Delta)
case core.EvUsage:
r.mu.Lock()
if e.Usage.InputTokens > 0 {
r.lastCtxInput = e.Usage.InputTokens + e.Usage.CacheReadTokens + e.Usage.CacheWriteTokens
}
r.mu.Unlock()
case core.EvAssistantMessage:
var sb strings.Builder
for _, c := range e.Message.Content {
if tb, ok := c.(provider.TextBlock); ok {
if sb.Len() > 0 {
sb.WriteString("\n")
}
sb.WriteString(tb.Text)
}
}
if sb.Len() > 0 {
lastAssistantText = sb.String()
}
replyBuilder.Reset()
case core.EvTurnEnd:
if e.Err != nil {
turnErr = e.Err
}
}
}
if err := r.agent.Prompt(ctx, t.prompt, t.images, sink); err != nil {
turnErr = err
}
reply := strings.TrimSpace(lastAssistantText)
if reply == "" {
reply = strings.TrimSpace(replyBuilder.String())
}
if turnErr != nil && ctx.Err() == nil {
reply = "error: " + turnErr.Error()
}
if reply == "" {
reply = "(no reply)"
}
// Adapter.Send is responsible for chunking to protocol limits.
if err := r.adapter.Send(context.Background(), t.channelID, reply); err != nil {
fmt.Fprintln(stderr(), "bot: send reply:", err)
}
}
// cancelActiveTurn aborts the currently running turn, if any.
func (r *Runner) cancelActiveTurn(channelID, messageID string) {
r.mu.Lock()
cancel := r.activeCtx
r.mu.Unlock()
if cancel != nil {
cancel()
_ = r.adapter.Send(context.Background(), channelID, "cancelled the current turn.")
} else {
_ = r.adapter.Send(context.Background(), channelID, "nothing running.")
}
}
// sendStatus describes agent state to the user.
func (r *Runner) sendStatus(channelID string) {
r.mu.Lock()
busy := r.busy
queued := len(r.queue)
ctxUsed := r.lastCtxInput
providerName := r.cfg.Provider
authMethod := r.cfg.AuthMethod
cwd := r.cfg.CWD
r.mu.Unlock()
model := r.agent.Model
ctxMax := 0
if m, err := provider.FindModel(providerName, model); err == nil {
ctxMax = m.ContextWindow
}
status := FormatStatus(StatusSnapshot{
Provider: providerName,
Model: model,
CWD: cwd,
Usage: r.agent.Cost(),
Subscription: authMethod == "oauth",
ContextUsed: ctxUsed,
ContextMax: ctxMax,
Busy: busy,
Queued: queued,
})
if extra := r.adapter.StatusText(); extra != "" {
status += "\n" + extra
}
_ = r.adapter.Send(context.Background(), channelID, status)
}

View file

@ -0,0 +1,120 @@
package bot
import (
"fmt"
"os"
"strings"
"github.com/patriceckhart/zot/packages/provider"
)
// StatusSnapshot is the small cross-host state bundle rendered for
// /status replies.
type StatusSnapshot struct {
Provider string
Model string
CWD string
Usage provider.Usage
Subscription bool
ContextUsed int
ContextMax int
Busy bool
Queued int
}
// FormatStatus renders the same compact model/usage/cost/context
// information shown in the TUI status bar, plus the current directory.
func FormatStatus(s StatusSnapshot) string {
providerName := strings.TrimSpace(s.Provider)
model := strings.TrimSpace(s.Model)
if providerName == "" {
providerName = "unknown"
}
if model == "" {
model = "unknown"
}
var stats []string
if s.Usage.InputTokens > 0 {
stats = append(stats, fmt.Sprintf("↑%s", formatTokens(s.Usage.InputTokens)))
}
if s.Usage.OutputTokens > 0 {
stats = append(stats, fmt.Sprintf("↓%s", formatTokens(s.Usage.OutputTokens)))
}
if s.Usage.CacheReadTokens > 0 {
stats = append(stats, fmt.Sprintf("R%s", formatTokens(s.Usage.CacheReadTokens)))
}
if s.Usage.CacheWriteTokens > 0 {
stats = append(stats, fmt.Sprintf("W%s", formatTokens(s.Usage.CacheWriteTokens)))
}
if s.Usage.CostUSD > 0 || s.Subscription {
cost := fmt.Sprintf("$%.3f", s.Usage.CostUSD)
if s.Subscription {
cost += " (sub)"
}
stats = append(stats, cost)
}
if ctx := contextUsage(s.ContextUsed, s.ContextMax); ctx != "" {
stats = append(stats, ctx)
}
line := fmt.Sprintf("(%s) %s", providerName, model)
if len(stats) > 0 {
line += " " + strings.Join(stats, " ")
}
state := "idle"
if s.Busy {
state = "working"
}
lines := []string{line, "state: " + state}
if s.Queued > 0 {
lines = append(lines, fmt.Sprintf("queued: %d", s.Queued))
}
if cwd := shortenHome(strings.TrimSpace(s.CWD)); cwd != "" {
lines = append(lines, "cwd: "+cwd)
}
return strings.Join(lines, "\n")
}
func contextUsage(used, max int) string {
if max <= 0 {
if used <= 0 {
return ""
}
return formatTokens(used)
}
pct := float64(used) / float64(max) * 100
return fmt.Sprintf("%.1f%%/%s", pct, formatTokens(max))
}
func formatTokens(n int) string {
switch {
case n < 0:
return "0"
case n < 1000:
return fmt.Sprintf("%d", n)
case n < 10000:
return fmt.Sprintf("%.1fk", float64(n)/1000)
case n < 1_000_000:
return fmt.Sprintf("%dk", (n+500)/1000)
case n < 10_000_000:
return fmt.Sprintf("%.1fM", float64(n)/1_000_000)
default:
return fmt.Sprintf("%dM", (n+500_000)/1_000_000)
}
}
func shortenHome(path string) string {
home, err := os.UserHomeDir()
if err != nil || home == "" {
return path
}
if path == home {
return "~"
}
if strings.HasPrefix(path, home+string(os.PathSeparator)) {
return "~" + strings.TrimPrefix(path, home)
}
return path
}

View file

@ -0,0 +1,250 @@
package telegram
import (
"context"
"fmt"
"strconv"
"strings"
"time"
"github.com/patriceckhart/zot/packages/agent/modes/bot"
"github.com/patriceckhart/zot/packages/provider"
)
// Adapter implements bot.BotAdapter for Telegram.
type Adapter struct {
Client *Client
Cfg *Config // pointer so Run can mutate and persist
Save func(Config) error
}
// NewAdapter creates a Telegram adapter.
func NewAdapter(client *Client, cfg *Config, save func(Config) error) *Adapter {
return &Adapter{Client: client, Cfg: cfg, Save: save}
}
// Run drives the Telegram long-polling loop. It performs initial
// GetMe, handles pairing, and dispatches inbound messages to the
// generic handler / commandHandler callbacks.
func (a *Adapter) Run(ctx context.Context,
handler func(bot.InboundMessage),
commandHandler func(bot.Command, bot.InboundMessage),
) error {
if a.Cfg.BotToken == "" {
return fmt.Errorf("no bot token configured; run `zot bot setup` first")
}
me, err := a.Client.GetMe(ctx)
if err != nil {
return fmt.Errorf("getMe: %w", err)
}
// Keep the stored username/id in sync with the actual bot.
if a.Cfg.BotID != me.ID || a.Cfg.BotUsername != me.Username {
a.Cfg.BotID = me.ID
a.Cfg.BotUsername = me.Username
_ = a.Save(*a.Cfg)
}
fmt.Printf("telegram bridge online as @%s (id=%d)\n", me.Username, me.ID)
if a.Cfg.AllowedUserID == 0 {
fmt.Println("no user paired yet — send /start to the bot from Telegram to claim it")
} else {
fmt.Printf("paired with telegram user id %d\n", a.Cfg.AllowedUserID)
}
return a.pollLoop(ctx, handler, commandHandler)
}
// pollLoop long-polls Telegram for updates and dispatches them.
func (a *Adapter) pollLoop(ctx context.Context,
handler func(bot.InboundMessage),
commandHandler func(bot.Command, bot.InboundMessage),
) error {
backoff := time.Second
for {
if err := ctx.Err(); err != nil {
return err
}
updates, err := a.Client.GetUpdates(ctx, a.Cfg.LastUpdateID+1, 30)
if err != nil {
if ctx.Err() != nil {
return ctx.Err()
}
fmt.Fprintln(stderr(), "telegram: getUpdates error:", err)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(backoff):
}
if backoff < 30*time.Second {
backoff *= 2
}
continue
}
backoff = time.Second
for _, u := range updates {
a.handleUpdate(ctx, u, handler, commandHandler)
a.Cfg.LastUpdateID = u.UpdateID
_ = a.Save(*a.Cfg)
}
}
}
// handleUpdate processes a single Telegram update. Telegram-specific
// concerns (pairing, user filtering, image download) live here; the
// generic callbacks are called for normal messages and commands.
func (a *Adapter) handleUpdate(ctx context.Context, u Update,
handler func(bot.InboundMessage),
commandHandler func(bot.Command, bot.InboundMessage),
) {
msg := u.Message
if msg == nil {
msg = u.Edited
}
if msg == nil || msg.From == nil || msg.From.IsBot {
return
}
if msg.Chat.Type != "private" {
return
}
chanID := fmt.Sprintf("%d", msg.Chat.ID)
msgID := fmt.Sprintf("%d", msg.MessageID)
// Pairing: first user who sends /start claims the bridge.
text := strings.TrimSpace(msg.Text)
if a.Cfg.AllowedUserID == 0 {
if strings.HasPrefix(text, "/start") {
a.Cfg.AllowedUserID = msg.From.ID
_ = a.Save(*a.Cfg)
_ = a.Client.SendMessage(ctx, msg.Chat.ID,
fmt.Sprintf("paired with @%s. send any message and i'll forward it to zot.", msg.From.Username),
msg.MessageID)
return
}
_ = a.Client.SendMessage(ctx, msg.Chat.ID,
"this bot isn't paired yet. send /start to claim it.",
msg.MessageID)
return
}
// Enforce allowed user.
if msg.From.ID != a.Cfg.AllowedUserID {
_ = a.Client.SendMessage(ctx, msg.Chat.ID,
"this bot is paired with a different user.",
msg.MessageID)
return
}
inbound := bot.InboundMessage{
ChannelID: chanID,
MessageID: msgID,
}
// Built-in commands that bypass the agent.
switch text {
case "/start":
commandHandler(bot.CmdStart, inbound)
return
case "/help":
commandHandler(bot.CmdHelp, inbound)
return
case "/status":
commandHandler(bot.CmdStatus, inbound)
return
case "/stop":
commandHandler(bot.CmdStop, inbound)
return
}
if bot.IsStopCommand(text) {
commandHandler(bot.CmdStop, inbound)
return
}
// Build the prompt: combine text + caption; download image attachments.
prompt := strings.TrimSpace(msg.Text)
if msg.Caption != "" {
if prompt != "" {
prompt += "\n"
}
prompt += msg.Caption
}
var images []provider.ImageBlock
if len(msg.Photo) > 0 {
largest := msg.Photo[len(msg.Photo)-1]
if data, mime, err := a.download(ctx, largest.FileID, ""); err == nil {
images = append(images, provider.ImageBlock{MimeType: mime, Data: data})
} else {
fmt.Fprintln(stderr(), "telegram: download photo:", err)
}
}
if msg.Document != nil && isImageMIME(msg.Document.MimeType) {
if data, mime, err := a.download(ctx, msg.Document.FileID, msg.Document.MimeType); err == nil {
images = append(images, provider.ImageBlock{MimeType: mime, Data: data})
}
}
inbound.Text = prompt
inbound.Images = images
handler(inbound)
}
// Send delivers a reply to a Telegram chat. channelID is parsed back
// to int64. Messages are chunked to 4000 runes (Telegram limit 4096).
func (a *Adapter) Send(ctx context.Context, channelID, text string) error {
chatID, err := strconv.ParseInt(channelID, 10, 64)
if err != nil {
return fmt.Errorf("invalid channelID %q: %w", channelID, err)
}
for _, chunk := range chunkMessage(text, 4000) {
if err := a.Client.SendMessage(ctx, chatID, chunk, 0); err != nil {
return err
}
}
return nil
}
// IndicateWorking keeps Telegram's "typing..." indicator alive until
// the returned stop function is called.
func (a *Adapter) IndicateWorking(ctx context.Context, channelID string) (stop func()) {
chatID, err := strconv.ParseInt(channelID, 10, 64)
if err != nil {
return func() {}
}
tctx, cancel := context.WithCancel(ctx)
go func() {
for {
_ = a.Client.SendChatAction(tctx, chatID, "typing")
select {
case <-tctx.Done():
return
case <-time.After(4 * time.Second):
}
}
}()
return cancel
}
// StatusText returns the bot's @username for inclusion in /status.
func (a *Adapter) StatusText() string {
if a.Cfg.BotUsername != "" {
return "@" + a.Cfg.BotUsername
}
return ""
}
// download fetches a file from Telegram and returns bytes + mime.
func (a *Adapter) download(ctx context.Context, fileID, mime string) ([]byte, string, error) {
f, err := a.Client.GetFile(ctx, fileID)
if err != nil {
return nil, "", err
}
data, err := a.Client.DownloadFile(ctx, f.FilePath)
if err != nil {
return nil, "", err
}
if mime == "" {
mime = guessImageMIME(f.FilePath)
}
return data, mime, nil
}

View file

@ -2,18 +2,17 @@ package telegram
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/patriceckhart/zot/packages/agent/modes/bot"
"github.com/patriceckhart/zot/packages/core"
"github.com/patriceckhart/zot/packages/provider"
)
// Bot owns the Telegram polling loop and dispatches inbound DMs to
// the agent. It is a long-running goroutine; Run blocks until ctx
// cancels.
// Bot is a thin wrapper around bot.Runner + Adapter. It exists to
// keep the exported API stable for botcmd.go.
//
// Deprecated: Use telegram.NewAdapter + bot.NewRunner directly.
// Bot is kept for backward compatibility but is no longer used internally.
type Bot struct {
Client *Client
Agent *core.Agent
@ -22,353 +21,29 @@ type Bot struct {
Provider string
AuthMethod string
CWD string
// Save persists cfg to bot.json. Called whenever the bot pairs
// with a new allowed user or advances LastUpdateID.
Save func(Config) error
// RefreshCreds is called before every turn to pick up newly
// refreshed OAuth tokens. Optional; when nil, the bot uses the
// credential it was built with. Implementations typically call
// agent.ResolveCredentialFull which auto-refreshes expired tokens.
Save func(Config) error
RefreshCreds func() error
mu sync.Mutex
busy bool
activeCtx context.CancelFunc
queue []queuedTurn
lastCtxInput int
runner *bot.Runner
adapter *Adapter
}
// queuedTurn is an inbound DM waiting to become a prompt.
type queuedTurn struct {
chatID int64
messageID int
prompt string
images []provider.ImageBlock
}
// Run drives the bot. Returns when ctx is cancelled or GetMe fails.
// Run starts the bot. It constructs the adapter and runner on first
// call, then delegates to runner.Run.
func (b *Bot) Run(ctx context.Context) error {
if b.Config.BotToken == "" {
return fmt.Errorf("no bot token configured; run `zot bot setup` first")
if b.adapter == nil {
b.adapter = NewAdapter(b.Client, &b.Config, b.Save)
}
me, err := b.Client.GetMe(ctx)
if err != nil {
return fmt.Errorf("getMe: %w", err)
if b.runner == nil {
b.runner = bot.NewRunner(b.adapter, b.Agent, bot.Config{
ZotHome: b.ZotHome,
Provider: b.Provider,
AuthMethod: b.AuthMethod,
CWD: b.CWD,
RefreshCreds: b.RefreshCreds,
})
}
// Keep the stored username/id in sync with the actual bot.
if b.Config.BotID != me.ID || b.Config.BotUsername != me.Username {
b.Config.BotID = me.ID
b.Config.BotUsername = me.Username
_ = b.Save(b.Config)
}
fmt.Printf("telegram bridge online as @%s (id=%d)\n", me.Username, me.ID)
if b.Config.AllowedUserID == 0 {
fmt.Println("no user paired yet — send /start to the bot from Telegram to claim it")
} else {
fmt.Printf("paired with telegram user id %d\n", b.Config.AllowedUserID)
}
return b.pollLoop(ctx)
}
// pollLoop long-polls Telegram for updates and dispatches them.
func (b *Bot) pollLoop(ctx context.Context) error {
backoff := time.Second
for {
if err := ctx.Err(); err != nil {
return err
}
updates, err := b.Client.GetUpdates(ctx, b.Config.LastUpdateID+1, 30)
if err != nil {
if ctx.Err() != nil {
return ctx.Err()
}
fmt.Fprintln(stderr(), "telegram: getUpdates error:", err)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(backoff):
}
if backoff < 30*time.Second {
backoff *= 2
}
continue
}
backoff = time.Second
for _, u := range updates {
if err := b.handleUpdate(ctx, u); err != nil {
fmt.Fprintln(stderr(), "telegram: handleUpdate:", err)
}
b.Config.LastUpdateID = u.UpdateID
_ = b.Save(b.Config)
}
}
}
// handleUpdate processes a single Telegram update.
func (b *Bot) handleUpdate(ctx context.Context, u Update) error {
msg := u.Message
if msg == nil {
msg = u.Edited
}
if msg == nil || msg.From == nil || msg.From.IsBot {
return nil
}
if msg.Chat.Type != "private" {
return nil
}
// Pairing: first user who sends /start claims the bridge.
text := strings.TrimSpace(msg.Text)
if b.Config.AllowedUserID == 0 {
if strings.HasPrefix(text, "/start") {
b.Config.AllowedUserID = msg.From.ID
_ = b.Save(b.Config)
_ = b.Client.SendMessage(ctx, msg.Chat.ID,
fmt.Sprintf("paired with @%s. send any message and i'll forward it to zot.", msg.From.Username),
msg.MessageID)
return nil
}
_ = b.Client.SendMessage(ctx, msg.Chat.ID,
"this bot isn't paired yet. send /start to claim it.",
msg.MessageID)
return nil
}
// Enforce allowed user.
if msg.From.ID != b.Config.AllowedUserID {
_ = b.Client.SendMessage(ctx, msg.Chat.ID,
"this bot is paired with a different user.",
msg.MessageID)
return nil
}
// Built-in commands that bypass the agent.
switch text {
case "/start", "/help":
_ = b.Client.SendMessage(ctx, msg.Chat.ID,
"send me any message and i'll forward it to zot. attach an image and i'll pass it to the model. commands: /status, /stop, or plain stop.",
msg.MessageID)
return nil
case "/status":
return b.sendStatus(ctx, msg.Chat.ID, msg.MessageID)
case "/stop":
b.cancelActiveTurn(ctx, msg.Chat.ID, msg.MessageID)
return nil
}
if isStopCommand(text) {
b.cancelActiveTurn(ctx, msg.Chat.ID, msg.MessageID)
return nil
}
// Build the prompt: combine text + caption; download image attachments.
prompt := strings.TrimSpace(msg.Text)
if msg.Caption != "" {
if prompt != "" {
prompt += "\n"
}
prompt += msg.Caption
}
var images []provider.ImageBlock
if len(msg.Photo) > 0 {
// Photos arrive in multiple sizes; take the largest (last in the slice).
largest := msg.Photo[len(msg.Photo)-1]
if data, mime, err := b.download(ctx, largest.FileID, ""); err == nil {
images = append(images, provider.ImageBlock{MimeType: mime, Data: data})
} else {
fmt.Fprintln(stderr(), "telegram: download photo:", err)
}
}
if msg.Document != nil && isImageMIME(msg.Document.MimeType) {
if data, mime, err := b.download(ctx, msg.Document.FileID, msg.Document.MimeType); err == nil {
images = append(images, provider.ImageBlock{MimeType: mime, Data: data})
}
}
if prompt == "" && len(images) == 0 {
return nil
}
b.mu.Lock()
b.queue = append(b.queue, queuedTurn{
chatID: msg.Chat.ID,
messageID: msg.MessageID,
prompt: prompt,
images: images,
})
idle := !b.busy
b.mu.Unlock()
if idle {
go b.drainQueue(ctx)
}
return nil
}
// drainQueue runs queued turns one at a time until the queue is empty.
func (b *Bot) drainQueue(parent context.Context) {
for {
b.mu.Lock()
if len(b.queue) == 0 {
b.busy = false
b.activeCtx = nil
b.mu.Unlock()
return
}
t := b.queue[0]
b.queue = b.queue[1:]
b.busy = true
turnCtx, cancel := context.WithCancel(parent)
b.activeCtx = cancel
b.mu.Unlock()
if b.RefreshCreds != nil {
if err := b.RefreshCreds(); err != nil {
fmt.Fprintln(stderr(), "telegram: refresh creds:", err)
}
}
b.runTurn(turnCtx, t)
cancel()
}
}
// runTurn sends the queued prompt to the agent and streams the reply.
func (b *Bot) runTurn(ctx context.Context, t queuedTurn) {
stopTyping := b.startTyping(ctx, t.chatID)
defer stopTyping()
var replyBuilder strings.Builder
var lastAssistantText string
var turnErr error
sink := func(ev core.AgentEvent) {
switch e := ev.(type) {
case core.EvTextDelta:
replyBuilder.WriteString(e.Delta)
case core.EvUsage:
b.mu.Lock()
if e.Usage.InputTokens > 0 {
b.lastCtxInput = e.Usage.InputTokens + e.Usage.CacheReadTokens + e.Usage.CacheWriteTokens
}
b.mu.Unlock()
case core.EvAssistantMessage:
var sb strings.Builder
for _, c := range e.Message.Content {
if tb, ok := c.(provider.TextBlock); ok {
if sb.Len() > 0 {
sb.WriteString("\n")
}
sb.WriteString(tb.Text)
}
}
if sb.Len() > 0 {
lastAssistantText = sb.String()
}
replyBuilder.Reset()
case core.EvTurnEnd:
if e.Err != nil {
turnErr = e.Err
}
}
}
if err := b.Agent.Prompt(ctx, t.prompt, t.images, sink); err != nil {
turnErr = err
}
reply := strings.TrimSpace(lastAssistantText)
if reply == "" {
reply = strings.TrimSpace(replyBuilder.String())
}
if turnErr != nil && ctx.Err() == nil {
reply = "error: " + turnErr.Error()
}
if reply == "" {
reply = "(no reply)"
}
// Telegram caps messages at 4096 chars. Chunk to be safe.
for _, chunk := range chunkMessage(reply, 4000) {
if err := b.Client.SendMessage(context.Background(), t.chatID, chunk, 0); err != nil {
fmt.Fprintln(stderr(), "telegram: sendMessage:", err)
break
}
}
}
// startTyping keeps Telegram's "typing..." indicator alive until the
// returned stop function is called.
func (b *Bot) startTyping(ctx context.Context, chatID int64) func() {
tctx, cancel := context.WithCancel(ctx)
go func() {
for {
_ = b.Client.SendChatAction(tctx, chatID, "typing")
select {
case <-tctx.Done():
return
case <-time.After(4 * time.Second):
}
}
}()
return cancel
}
func (b *Bot) cancelActiveTurn(ctx context.Context, chatID int64, replyTo int) {
b.mu.Lock()
cancel := b.activeCtx
b.mu.Unlock()
if cancel != nil {
cancel()
_ = b.Client.SendMessage(ctx, chatID, "cancelled the current turn.", replyTo)
} else {
_ = b.Client.SendMessage(ctx, chatID, "nothing running.", replyTo)
}
}
// sendStatus describes agent state to the Telegram user.
func (b *Bot) sendStatus(ctx context.Context, chatID int64, replyTo int) error {
b.mu.Lock()
busy := b.busy
queued := len(b.queue)
ctxUsed := b.lastCtxInput
providerName := b.Provider
authMethod := b.AuthMethod
cwd := b.CWD
b.mu.Unlock()
model := b.Agent.Model
ctxMax := 0
if m, err := provider.FindModel(providerName, model); err == nil {
ctxMax = m.ContextWindow
}
return b.Client.SendMessage(ctx, chatID, FormatStatus(StatusSnapshot{
Provider: providerName,
Model: model,
CWD: cwd,
Usage: b.Agent.Cost(),
Subscription: authMethod == "oauth",
ContextUsed: ctxUsed,
ContextMax: ctxMax,
Busy: busy,
Queued: queued,
}), replyTo)
}
// download fetches a file from Telegram and returns bytes + mime.
func (b *Bot) download(ctx context.Context, fileID, mime string) ([]byte, string, error) {
f, err := b.Client.GetFile(ctx, fileID)
if err != nil {
return nil, "", err
}
data, err := b.Client.DownloadFile(ctx, f.FilePath)
if err != nil {
return nil, "", err
}
if mime == "" {
mime = guessImageMIME(f.FilePath)
}
return data, mime, nil
return b.runner.Run(ctx)
}
// chunkMessage splits s into chunks no larger than limit runes, on line

View file

@ -1,11 +1,6 @@
package telegram
import "strings"
import "github.com/patriceckhart/zot/packages/agent/modes/bot"
// isStopCommand reports whether text should abort the active turn.
// Telegram users often type plain "stop" rather than bot-style
// "/stop"; keep this intentionally narrow so normal prompts like
// "stop doing X" still go to the agent.
func isStopCommand(text string) bool {
return strings.EqualFold(strings.TrimSpace(text), "stop")
}
// isStopCommand is a shim to bot.IsStopCommand for backward compatibility.
var isStopCommand = bot.IsStopCommand

View file

@ -1,120 +1,9 @@
package telegram
import (
"fmt"
"os"
"strings"
import "github.com/patriceckhart/zot/packages/agent/modes/bot"
"github.com/patriceckhart/zot/packages/provider"
)
// StatusSnapshot is an alias for bot.StatusSnapshot for backward compatibility.
type StatusSnapshot = bot.StatusSnapshot
// StatusSnapshot is the small cross-host state bundle rendered for
// Telegram /status replies.
type StatusSnapshot struct {
Provider string
Model string
CWD string
Usage provider.Usage
Subscription bool
ContextUsed int
ContextMax int
Busy bool
Queued int
}
// FormatStatus renders the same compact model/usage/cost/context
// information shown in the TUI status bar, plus the current directory.
func FormatStatus(s StatusSnapshot) string {
providerName := strings.TrimSpace(s.Provider)
model := strings.TrimSpace(s.Model)
if providerName == "" {
providerName = "unknown"
}
if model == "" {
model = "unknown"
}
var stats []string
if s.Usage.InputTokens > 0 {
stats = append(stats, fmt.Sprintf("↑%s", formatTokens(s.Usage.InputTokens)))
}
if s.Usage.OutputTokens > 0 {
stats = append(stats, fmt.Sprintf("↓%s", formatTokens(s.Usage.OutputTokens)))
}
if s.Usage.CacheReadTokens > 0 {
stats = append(stats, fmt.Sprintf("R%s", formatTokens(s.Usage.CacheReadTokens)))
}
if s.Usage.CacheWriteTokens > 0 {
stats = append(stats, fmt.Sprintf("W%s", formatTokens(s.Usage.CacheWriteTokens)))
}
if s.Usage.CostUSD > 0 || s.Subscription {
cost := fmt.Sprintf("$%.3f", s.Usage.CostUSD)
if s.Subscription {
cost += " (sub)"
}
stats = append(stats, cost)
}
if ctx := contextUsage(s.ContextUsed, s.ContextMax); ctx != "" {
stats = append(stats, ctx)
}
line := fmt.Sprintf("(%s) %s", providerName, model)
if len(stats) > 0 {
line += " " + strings.Join(stats, " ")
}
state := "idle"
if s.Busy {
state = "working"
}
lines := []string{line, "state: " + state}
if s.Queued > 0 {
lines = append(lines, fmt.Sprintf("queued: %d", s.Queued))
}
if cwd := shortenHome(strings.TrimSpace(s.CWD)); cwd != "" {
lines = append(lines, "cwd: "+cwd)
}
return strings.Join(lines, "\n")
}
func contextUsage(used, max int) string {
if max <= 0 {
if used <= 0 {
return ""
}
return formatTokens(used)
}
pct := float64(used) / float64(max) * 100
return fmt.Sprintf("%.1f%%/%s", pct, formatTokens(max))
}
func formatTokens(n int) string {
switch {
case n < 0:
return "0"
case n < 1000:
return fmt.Sprintf("%d", n)
case n < 10000:
return fmt.Sprintf("%.1fk", float64(n)/1000)
case n < 1_000_000:
return fmt.Sprintf("%dk", (n+500)/1000)
case n < 10_000_000:
return fmt.Sprintf("%.1fM", float64(n)/1_000_000)
default:
return fmt.Sprintf("%dM", (n+500_000)/1_000_000)
}
}
func shortenHome(path string) string {
home, err := os.UserHomeDir()
if err != nil || home == "" {
return path
}
if path == home {
return "~"
}
if strings.HasPrefix(path, home+string(os.PathSeparator)) {
return "~" + strings.TrimPrefix(path, home)
}
return path
}
// FormatStatus is an alias for bot.FormatStatus for backward compatibility.
var FormatStatus = bot.FormatStatus