From 741c643b1b85b9cec960f468cb547e3592b39d49 Mon Sep 17 00:00:00 2001 From: mi-skam <40042054+mi-skam@users.noreply.github.com> Date: Fri, 26 Jun 2026 07:39:35 +0200 Subject: [PATCH] refactor: extract generic BotAdapter interface from Telegram bot Extract protocol-agnostic bot logic into a new packages/agent/modes/bot package, enabling future messaging backends (Discord, Slack, etc.) to reuse turn queueing, agent prompting, and command dispatch. New package modes/bot: - adapter.go: BotAdapter interface + InboundMessage/Command types - runner.go: Generic Runner with turn queue, drainQueue, runTurn - status.go: StatusSnapshot + FormatStatus (moved from telegram) - commands.go: IsStopCommand (moved from telegram) Telegram adapter (modes/telegram/adapter.go): - Implements BotAdapter for Telegram protocol - Owns polling, pairing, user filtering, image download - ChannelID encoded as string (fmt.Sprintf("%d", chatID)) Refactored telegram/bot.go: - Bot type now a thin wrapper (deprecated, kept for compatibility) - Utility functions (chunkMessage, isImageMIME, guessImageMIME) retained Updated botcmd.go: - Constructs adapter + runner directly instead of Bot wrapper - Uses runner.UpdateRuntimeConfig for credential refresh Shims in telegram/status.go and telegram/commands.go re-export from bot package for backward compatibility with interactive.go. All tests pass. No import cycles: modes/bot does not import modes/telegram. 
--- packages/agent/botcmd.go | 27 +- packages/agent/modes/bot/adapter.go | 51 +++ packages/agent/modes/bot/commands.go | 11 + packages/agent/modes/bot/runner.go | 254 +++++++++++++++ packages/agent/modes/bot/status.go | 120 +++++++ packages/agent/modes/telegram/adapter.go | 250 +++++++++++++++ packages/agent/modes/telegram/bot.go | 369 ++-------------------- packages/agent/modes/telegram/commands.go | 11 +- packages/agent/modes/telegram/status.go | 121 +------ 9 files changed, 730 insertions(+), 484 deletions(-) create mode 100644 packages/agent/modes/bot/adapter.go create mode 100644 packages/agent/modes/bot/commands.go create mode 100644 packages/agent/modes/bot/runner.go create mode 100644 packages/agent/modes/bot/status.go create mode 100644 packages/agent/modes/telegram/adapter.go diff --git a/packages/agent/botcmd.go b/packages/agent/botcmd.go index df667a5..8955f9b 100644 --- a/packages/agent/botcmd.go +++ b/packages/agent/botcmd.go @@ -15,6 +15,7 @@ import ( "syscall" "time" + "github.com/patriceckhart/zot/packages/agent/modes/bot" "github.com/patriceckhart/zot/packages/agent/modes/telegram" "github.com/patriceckhart/zot/packages/core" ) @@ -362,18 +363,20 @@ func botRun(rawTail []string, version string) error { } } - var b *telegram.Bot - b = &telegram.Bot{ - Client: telegram.NewClient(cfg.BotToken), - Agent: agent, - Config: cfg, + // Construct the Telegram adapter and generic runner. + adapter := telegram.NewAdapter( + telegram.NewClient(cfg.BotToken), + &cfg, + func(c telegram.Config) error { + return telegram.SaveConfig(ZotHome(), c) + }, + ) + var runner *bot.Runner + runner = bot.NewRunner(adapter, agent, bot.Config{ ZotHome: ZotHome(), Provider: resolved.Provider, AuthMethod: resolved.AuthMethod, CWD: args.CWD, - Save: func(c telegram.Config) error { - return telegram.SaveConfig(ZotHome(), c) - }, RefreshCreds: func() error { // Re-run the same resolver the tui uses so we pick up // refreshed oauth tokens, re-logins, and model switches. 
@@ -385,12 +388,10 @@ func botRun(rawTail []string, version string) error { } agent.Client = next.NewClient() agent.Model = next.Model - b.Provider = next.Provider - b.AuthMethod = next.AuthMethod - b.CWD = next.CWD + runner.UpdateRuntimeConfig(next.Provider, next.AuthMethod, next.CWD) return nil }, - } + }) // Record our pid so `zot telegram-bot status` / `zot telegram-bot stop` can find us, // regardless of whether we were started directly or via `bot start`. @@ -407,7 +408,7 @@ func botRun(rawTail []string, version string) error { cancel() }() defer cancel() - return b.Run(ctx) + return runner.Run(ctx) } // openOrCreateSessionForBot reuses the same logic as interactive mode diff --git a/packages/agent/modes/bot/adapter.go b/packages/agent/modes/bot/adapter.go new file mode 100644 index 0000000..18fcfbe --- /dev/null +++ b/packages/agent/modes/bot/adapter.go @@ -0,0 +1,51 @@ +// Package bot provides a protocol-agnostic runner for long-running bot +// modes. Concrete transports (Telegram, Discord, …) implement the +// BotAdapter interface; the Runner handles turn queueing, agent +// prompting, command dispatch, and credential refresh. +package bot + +import ( + "context" + + "github.com/patriceckhart/zot/packages/provider" +) + +// InboundMessage is a protocol-normalised message from a user. +type InboundMessage struct { + ChannelID string // opaque; adapter owns encoding (e.g. fmt.Sprintf("%d", chatID)) + MessageID string // optional reply anchor + Text string + Images []provider.ImageBlock +} + +// Command is a built-in bot command that bypasses the agent. +type Command int + +const ( + CmdStart Command = iota // first-time pairing / welcome + CmdHelp // usage information + CmdStatus // agent/provider state + CmdStop // cancel the active turn +) + +// BotAdapter is the transport layer a concrete protocol must implement. +// The Runner calls these methods; it never touches protocol types directly. 
+type BotAdapter interface { + // Run drives inbound polling; calls handler for normal messages and + // commandHandler for built-in commands. Blocks until ctx is done. + Run(ctx context.Context, + handler func(InboundMessage), + commandHandler func(Command, InboundMessage), + ) error + + // Send delivers a reply. The adapter chunks to protocol limits. + Send(ctx context.Context, channelID, text string) error + + // IndicateWorking fires a "typing…" signal; returns a stop func. + // Return a no-op if the protocol doesn't support it. + IndicateWorking(ctx context.Context, channelID string) (stop func()) + + // StatusText appends protocol-specific info to /status replies + // (e.g. "@botname"). Return "" if there is nothing to add. + StatusText() string +} diff --git a/packages/agent/modes/bot/commands.go b/packages/agent/modes/bot/commands.go new file mode 100644 index 0000000..afcd6a7 --- /dev/null +++ b/packages/agent/modes/bot/commands.go @@ -0,0 +1,11 @@ +package bot + +import "strings" + +// IsStopCommand reports whether text should abort the active turn. +// Users often type plain "stop" rather than bot-style "/stop"; keep +// this intentionally narrow so normal prompts like "stop doing X" +// still go to the agent. +func IsStopCommand(text string) bool { + return strings.EqualFold(strings.TrimSpace(text), "stop") +} diff --git a/packages/agent/modes/bot/runner.go b/packages/agent/modes/bot/runner.go new file mode 100644 index 0000000..508da00 --- /dev/null +++ b/packages/agent/modes/bot/runner.go @@ -0,0 +1,254 @@ +package bot + +import ( + "context" + "fmt" + "io" + "os" + "strings" + "sync" + + "github.com/patriceckhart/zot/packages/core" + "github.com/patriceckhart/zot/packages/provider" +) + +// stderr is a tiny hook so tests can redirect bot logging. +var stderr = func() io.Writer { return os.Stderr } + +// Config holds runner-level settings that are protocol-independent. 
+type Config struct { + ZotHome string + Provider string + AuthMethod string + CWD string + RefreshCreds func() error +} + +// queuedTurn is an inbound message waiting to become a prompt. +type queuedTurn struct { + channelID string + messageID string + prompt string + images []provider.ImageBlock +} + +// Runner is the protocol-agnostic bot engine. It owns the turn queue, +// dispatches prompts to the agent, and streams replies back through +// the BotAdapter. +type Runner struct { + agent *core.Agent + adapter BotAdapter + cfg Config + + mu sync.Mutex + busy bool + activeCtx context.CancelFunc + queue []queuedTurn + lastCtxInput int + runCtx context.Context // set at Run entry; used by goroutines +} + +// NewRunner creates a Runner wired to the given adapter and agent. +func NewRunner(adapter BotAdapter, agent *core.Agent, cfg Config) *Runner { + return &Runner{ + agent: agent, + adapter: adapter, + cfg: cfg, + } +} + +// UpdateRuntimeConfig updates provider/auth/cwd at runtime (e.g. after +// credential refresh). This is thread-safe. +func (r *Runner) UpdateRuntimeConfig(provider, authMethod, cwd string) { + r.mu.Lock() + defer r.mu.Unlock() + r.cfg.Provider = provider + r.cfg.AuthMethod = authMethod + r.cfg.CWD = cwd +} + +// Run starts the adapter's polling loop and blocks until ctx cancels. +func (r *Runner) Run(ctx context.Context) error { + r.mu.Lock() + r.runCtx = ctx + r.mu.Unlock() + + return r.adapter.Run(ctx, r.handleMessage, r.handleCommand) +} + +// handleMessage is called by the adapter for every normal inbound message. +func (r *Runner) handleMessage(msg InboundMessage) { + if msg.Text == "" && len(msg.Images) == 0 { + return + } + + r.mu.Lock() + r.queue = append(r.queue, queuedTurn{ + channelID: msg.ChannelID, + messageID: msg.MessageID, + prompt: msg.Text, + images: msg.Images, + }) + idle := !r.busy + r.mu.Unlock() + + if idle { + go r.drainQueue() + } +} + +// handleCommand is called by the adapter for built-in commands. 
+func (r *Runner) handleCommand(cmd Command, msg InboundMessage) { + switch cmd { + case CmdStart, CmdHelp: + _ = r.adapter.Send(context.Background(), msg.ChannelID, + "send me any message and i'll forward it to zot. attach an image and i'll pass it to the model. commands: /status, /stop, or plain stop.") + case CmdStatus: + r.sendStatus(msg.ChannelID) + case CmdStop: + r.cancelActiveTurn(msg.ChannelID, msg.MessageID) + } +} + +// drainQueue runs queued turns one at a time until the queue is empty. +func (r *Runner) drainQueue() { + r.mu.Lock() + parent := r.runCtx + r.mu.Unlock() + + for { + r.mu.Lock() + if len(r.queue) == 0 { + r.busy = false + r.activeCtx = nil + r.mu.Unlock() + return + } + t := r.queue[0] + r.queue = r.queue[1:] + r.busy = true + turnCtx, cancel := context.WithCancel(parent) + r.activeCtx = cancel + r.mu.Unlock() + + if r.cfg.RefreshCreds != nil { + if err := r.cfg.RefreshCreds(); err != nil { + fmt.Fprintln(stderr(), "bot: refresh creds:", err) + } + } + r.runTurn(turnCtx, t) + cancel() + } +} + +// runTurn sends the queued prompt to the agent and streams the reply. 
+func (r *Runner) runTurn(ctx context.Context, t queuedTurn) { + stopWorking := r.adapter.IndicateWorking(ctx, t.channelID) + defer stopWorking() + + var replyBuilder strings.Builder + var lastAssistantText string + var turnErr error + + sink := func(ev core.AgentEvent) { + switch e := ev.(type) { + case core.EvTextDelta: + replyBuilder.WriteString(e.Delta) + case core.EvUsage: + r.mu.Lock() + if e.Usage.InputTokens > 0 { + r.lastCtxInput = e.Usage.InputTokens + e.Usage.CacheReadTokens + e.Usage.CacheWriteTokens + } + r.mu.Unlock() + case core.EvAssistantMessage: + var sb strings.Builder + for _, c := range e.Message.Content { + if tb, ok := c.(provider.TextBlock); ok { + if sb.Len() > 0 { + sb.WriteString("\n") + } + sb.WriteString(tb.Text) + } + } + if sb.Len() > 0 { + lastAssistantText = sb.String() + } + replyBuilder.Reset() + case core.EvTurnEnd: + if e.Err != nil { + turnErr = e.Err + } + } + } + + if err := r.agent.Prompt(ctx, t.prompt, t.images, sink); err != nil { + turnErr = err + } + + reply := strings.TrimSpace(lastAssistantText) + if reply == "" { + reply = strings.TrimSpace(replyBuilder.String()) + } + if turnErr != nil && ctx.Err() == nil { + reply = "error: " + turnErr.Error() + } + if reply == "" { + reply = "(no reply)" + } + + // Adapter.Send is responsible for chunking to protocol limits. + if err := r.adapter.Send(context.Background(), t.channelID, reply); err != nil { + fmt.Fprintln(stderr(), "bot: send reply:", err) + } +} + +// cancelActiveTurn aborts the currently running turn, if any. +func (r *Runner) cancelActiveTurn(channelID, messageID string) { + r.mu.Lock() + cancel := r.activeCtx + r.mu.Unlock() + if cancel != nil { + cancel() + _ = r.adapter.Send(context.Background(), channelID, "cancelled the current turn.") + } else { + _ = r.adapter.Send(context.Background(), channelID, "nothing running.") + } +} + +// sendStatus describes agent state to the user. 
+func (r *Runner) sendStatus(channelID string) { + r.mu.Lock() + busy := r.busy + queued := len(r.queue) + ctxUsed := r.lastCtxInput + providerName := r.cfg.Provider + authMethod := r.cfg.AuthMethod + cwd := r.cfg.CWD + r.mu.Unlock() + + model := r.agent.Model + ctxMax := 0 + if m, err := provider.FindModel(providerName, model); err == nil { + ctxMax = m.ContextWindow + } + + status := FormatStatus(StatusSnapshot{ + Provider: providerName, + Model: model, + CWD: cwd, + Usage: r.agent.Cost(), + Subscription: authMethod == "oauth", + ContextUsed: ctxUsed, + ContextMax: ctxMax, + Busy: busy, + Queued: queued, + }) + + if extra := r.adapter.StatusText(); extra != "" { + status += "\n" + extra + } + + _ = r.adapter.Send(context.Background(), channelID, status) +} + + diff --git a/packages/agent/modes/bot/status.go b/packages/agent/modes/bot/status.go new file mode 100644 index 0000000..33f0ca6 --- /dev/null +++ b/packages/agent/modes/bot/status.go @@ -0,0 +1,120 @@ +package bot + +import ( + "fmt" + "os" + "strings" + + "github.com/patriceckhart/zot/packages/provider" +) + +// StatusSnapshot is the small cross-host state bundle rendered for +// /status replies. +type StatusSnapshot struct { + Provider string + Model string + CWD string + Usage provider.Usage + Subscription bool + ContextUsed int + ContextMax int + Busy bool + Queued int +} + +// FormatStatus renders the same compact model/usage/cost/context +// information shown in the TUI status bar, plus the current directory. 
+func FormatStatus(s StatusSnapshot) string { + providerName := strings.TrimSpace(s.Provider) + model := strings.TrimSpace(s.Model) + if providerName == "" { + providerName = "unknown" + } + if model == "" { + model = "unknown" + } + + var stats []string + if s.Usage.InputTokens > 0 { + stats = append(stats, fmt.Sprintf("↑%s", formatTokens(s.Usage.InputTokens))) + } + if s.Usage.OutputTokens > 0 { + stats = append(stats, fmt.Sprintf("↓%s", formatTokens(s.Usage.OutputTokens))) + } + if s.Usage.CacheReadTokens > 0 { + stats = append(stats, fmt.Sprintf("R%s", formatTokens(s.Usage.CacheReadTokens))) + } + if s.Usage.CacheWriteTokens > 0 { + stats = append(stats, fmt.Sprintf("W%s", formatTokens(s.Usage.CacheWriteTokens))) + } + if s.Usage.CostUSD > 0 || s.Subscription { + cost := fmt.Sprintf("$%.3f", s.Usage.CostUSD) + if s.Subscription { + cost += " (sub)" + } + stats = append(stats, cost) + } + if ctx := contextUsage(s.ContextUsed, s.ContextMax); ctx != "" { + stats = append(stats, ctx) + } + + line := fmt.Sprintf("(%s) %s", providerName, model) + if len(stats) > 0 { + line += " " + strings.Join(stats, " ") + } + + state := "idle" + if s.Busy { + state = "working" + } + lines := []string{line, "state: " + state} + if s.Queued > 0 { + lines = append(lines, fmt.Sprintf("queued: %d", s.Queued)) + } + if cwd := shortenHome(strings.TrimSpace(s.CWD)); cwd != "" { + lines = append(lines, "cwd: "+cwd) + } + return strings.Join(lines, "\n") +} + +func contextUsage(used, max int) string { + if max <= 0 { + if used <= 0 { + return "" + } + return formatTokens(used) + } + pct := float64(used) / float64(max) * 100 + return fmt.Sprintf("%.1f%%/%s", pct, formatTokens(max)) +} + +func formatTokens(n int) string { + switch { + case n < 0: + return "0" + case n < 1000: + return fmt.Sprintf("%d", n) + case n < 10000: + return fmt.Sprintf("%.1fk", float64(n)/1000) + case n < 1_000_000: + return fmt.Sprintf("%dk", (n+500)/1000) + case n < 10_000_000: + return fmt.Sprintf("%.1fM", 
float64(n)/1_000_000) + default: + return fmt.Sprintf("%dM", (n+500_000)/1_000_000) + } +} + +func shortenHome(path string) string { + home, err := os.UserHomeDir() + if err != nil || home == "" { + return path + } + if path == home { + return "~" + } + if strings.HasPrefix(path, home+string(os.PathSeparator)) { + return "~" + strings.TrimPrefix(path, home) + } + return path +} diff --git a/packages/agent/modes/telegram/adapter.go b/packages/agent/modes/telegram/adapter.go new file mode 100644 index 0000000..a83e866 --- /dev/null +++ b/packages/agent/modes/telegram/adapter.go @@ -0,0 +1,250 @@ +package telegram + +import ( + "context" + "fmt" + "strconv" + "strings" + "time" + + "github.com/patriceckhart/zot/packages/agent/modes/bot" + "github.com/patriceckhart/zot/packages/provider" +) + +// Adapter implements bot.BotAdapter for Telegram. +type Adapter struct { + Client *Client + Cfg *Config // pointer so Run can mutate and persist + Save func(Config) error +} + +// NewAdapter creates a Telegram adapter. +func NewAdapter(client *Client, cfg *Config, save func(Config) error) *Adapter { + return &Adapter{Client: client, Cfg: cfg, Save: save} +} + +// Run drives the Telegram long-polling loop. It performs initial +// GetMe, handles pairing, and dispatches inbound messages to the +// generic handler / commandHandler callbacks. +func (a *Adapter) Run(ctx context.Context, + handler func(bot.InboundMessage), + commandHandler func(bot.Command, bot.InboundMessage), +) error { + if a.Cfg.BotToken == "" { + return fmt.Errorf("no bot token configured; run `zot bot setup` first") + } + me, err := a.Client.GetMe(ctx) + if err != nil { + return fmt.Errorf("getMe: %w", err) + } + // Keep the stored username/id in sync with the actual bot. 
+ if a.Cfg.BotID != me.ID || a.Cfg.BotUsername != me.Username { + a.Cfg.BotID = me.ID + a.Cfg.BotUsername = me.Username + _ = a.Save(*a.Cfg) + } + + fmt.Printf("telegram bridge online as @%s (id=%d)\n", me.Username, me.ID) + if a.Cfg.AllowedUserID == 0 { + fmt.Println("no user paired yet — send /start to the bot from Telegram to claim it") + } else { + fmt.Printf("paired with telegram user id %d\n", a.Cfg.AllowedUserID) + } + + return a.pollLoop(ctx, handler, commandHandler) +} + +// pollLoop long-polls Telegram for updates and dispatches them. +func (a *Adapter) pollLoop(ctx context.Context, + handler func(bot.InboundMessage), + commandHandler func(bot.Command, bot.InboundMessage), +) error { + backoff := time.Second + for { + if err := ctx.Err(); err != nil { + return err + } + updates, err := a.Client.GetUpdates(ctx, a.Cfg.LastUpdateID+1, 30) + if err != nil { + if ctx.Err() != nil { + return ctx.Err() + } + fmt.Fprintln(stderr(), "telegram: getUpdates error:", err) + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(backoff): + } + if backoff < 30*time.Second { + backoff *= 2 + } + continue + } + backoff = time.Second + for _, u := range updates { + a.handleUpdate(ctx, u, handler, commandHandler) + a.Cfg.LastUpdateID = u.UpdateID + _ = a.Save(*a.Cfg) + } + } +} + +// handleUpdate processes a single Telegram update. Telegram-specific +// concerns (pairing, user filtering, image download) live here; the +// generic callbacks are called for normal messages and commands. +func (a *Adapter) handleUpdate(ctx context.Context, u Update, + handler func(bot.InboundMessage), + commandHandler func(bot.Command, bot.InboundMessage), +) { + msg := u.Message + if msg == nil { + msg = u.Edited + } + if msg == nil || msg.From == nil || msg.From.IsBot { + return + } + if msg.Chat.Type != "private" { + return + } + + chanID := fmt.Sprintf("%d", msg.Chat.ID) + msgID := fmt.Sprintf("%d", msg.MessageID) + + // Pairing: first user who sends /start claims the bridge. 
+ text := strings.TrimSpace(msg.Text) + if a.Cfg.AllowedUserID == 0 { + if strings.HasPrefix(text, "/start") { + a.Cfg.AllowedUserID = msg.From.ID + _ = a.Save(*a.Cfg) + _ = a.Client.SendMessage(ctx, msg.Chat.ID, + fmt.Sprintf("paired with @%s. send any message and i'll forward it to zot.", msg.From.Username), + msg.MessageID) + return + } + _ = a.Client.SendMessage(ctx, msg.Chat.ID, + "this bot isn't paired yet. send /start to claim it.", + msg.MessageID) + return + } + + // Enforce allowed user. + if msg.From.ID != a.Cfg.AllowedUserID { + _ = a.Client.SendMessage(ctx, msg.Chat.ID, + "this bot is paired with a different user.", + msg.MessageID) + return + } + + inbound := bot.InboundMessage{ + ChannelID: chanID, + MessageID: msgID, + } + + // Built-in commands that bypass the agent. + switch text { + case "/start": + commandHandler(bot.CmdStart, inbound) + return + case "/help": + commandHandler(bot.CmdHelp, inbound) + return + case "/status": + commandHandler(bot.CmdStatus, inbound) + return + case "/stop": + commandHandler(bot.CmdStop, inbound) + return + } + if bot.IsStopCommand(text) { + commandHandler(bot.CmdStop, inbound) + return + } + + // Build the prompt: combine text + caption; download image attachments. 
+ prompt := strings.TrimSpace(msg.Text) + if msg.Caption != "" { + if prompt != "" { + prompt += "\n" + } + prompt += msg.Caption + } + + var images []provider.ImageBlock + if len(msg.Photo) > 0 { + largest := msg.Photo[len(msg.Photo)-1] + if data, mime, err := a.download(ctx, largest.FileID, ""); err == nil { + images = append(images, provider.ImageBlock{MimeType: mime, Data: data}) + } else { + fmt.Fprintln(stderr(), "telegram: download photo:", err) + } + } + if msg.Document != nil && isImageMIME(msg.Document.MimeType) { + if data, mime, err := a.download(ctx, msg.Document.FileID, msg.Document.MimeType); err == nil { + images = append(images, provider.ImageBlock{MimeType: mime, Data: data}) + } + } + + inbound.Text = prompt + inbound.Images = images + handler(inbound) +} + +// Send delivers a reply to a Telegram chat. channelID is parsed back +// to int64. Messages are chunked to 4000 runes (Telegram limit 4096). +func (a *Adapter) Send(ctx context.Context, channelID, text string) error { + chatID, err := strconv.ParseInt(channelID, 10, 64) + if err != nil { + return fmt.Errorf("invalid channelID %q: %w", channelID, err) + } + for _, chunk := range chunkMessage(text, 4000) { + if err := a.Client.SendMessage(ctx, chatID, chunk, 0); err != nil { + return err + } + } + return nil +} + +// IndicateWorking keeps Telegram's "typing..." indicator alive until +// the returned stop function is called. +func (a *Adapter) IndicateWorking(ctx context.Context, channelID string) (stop func()) { + chatID, err := strconv.ParseInt(channelID, 10, 64) + if err != nil { + return func() {} + } + tctx, cancel := context.WithCancel(ctx) + go func() { + for { + _ = a.Client.SendChatAction(tctx, chatID, "typing") + select { + case <-tctx.Done(): + return + case <-time.After(4 * time.Second): + } + } + }() + return cancel +} + +// StatusText returns the bot's @username for inclusion in /status. 
+func (a *Adapter) StatusText() string { + if a.Cfg.BotUsername != "" { + return "@" + a.Cfg.BotUsername + } + return "" +} + +// download fetches a file from Telegram and returns bytes + mime. +func (a *Adapter) download(ctx context.Context, fileID, mime string) ([]byte, string, error) { + f, err := a.Client.GetFile(ctx, fileID) + if err != nil { + return nil, "", err + } + data, err := a.Client.DownloadFile(ctx, f.FilePath) + if err != nil { + return nil, "", err + } + if mime == "" { + mime = guessImageMIME(f.FilePath) + } + return data, mime, nil +} diff --git a/packages/agent/modes/telegram/bot.go b/packages/agent/modes/telegram/bot.go index 560bfba..ac21f74 100644 --- a/packages/agent/modes/telegram/bot.go +++ b/packages/agent/modes/telegram/bot.go @@ -2,18 +2,17 @@ package telegram import ( "context" - "fmt" "strings" - "sync" - "time" + "github.com/patriceckhart/zot/packages/agent/modes/bot" "github.com/patriceckhart/zot/packages/core" - "github.com/patriceckhart/zot/packages/provider" ) -// Bot owns the Telegram polling loop and dispatches inbound DMs to -// the agent. It is a long-running goroutine; Run blocks until ctx -// cancels. +// Bot is a thin wrapper around bot.Runner + Adapter. It exists to +// keep the exported API stable for botcmd.go. +// +// Deprecated: Use telegram.NewAdapter + bot.NewRunner directly. +// Bot is kept for backward compatibility but is no longer used internally. type Bot struct { Client *Client Agent *core.Agent @@ -22,353 +21,29 @@ type Bot struct { Provider string AuthMethod string CWD string - // Save persists cfg to bot.json. Called whenever the bot pairs - // with a new allowed user or advances LastUpdateID. - Save func(Config) error - // RefreshCreds is called before every turn to pick up newly - // refreshed OAuth tokens. Optional; when nil, the bot uses the - // credential it was built with. Implementations typically call - // agent.ResolveCredentialFull which auto-refreshes expired tokens. 
+ Save func(Config) error RefreshCreds func() error - mu sync.Mutex - busy bool - activeCtx context.CancelFunc - queue []queuedTurn - lastCtxInput int + runner *bot.Runner + adapter *Adapter } -// queuedTurn is an inbound DM waiting to become a prompt. -type queuedTurn struct { - chatID int64 - messageID int - prompt string - images []provider.ImageBlock -} - -// Run drives the bot. Returns when ctx is cancelled or GetMe fails. +// Run starts the bot. It constructs the adapter and runner on first +// call, then delegates to runner.Run. func (b *Bot) Run(ctx context.Context) error { - if b.Config.BotToken == "" { - return fmt.Errorf("no bot token configured; run `zot bot setup` first") + if b.adapter == nil { + b.adapter = NewAdapter(b.Client, &b.Config, b.Save) } - me, err := b.Client.GetMe(ctx) - if err != nil { - return fmt.Errorf("getMe: %w", err) + if b.runner == nil { + b.runner = bot.NewRunner(b.adapter, b.Agent, bot.Config{ + ZotHome: b.ZotHome, + Provider: b.Provider, + AuthMethod: b.AuthMethod, + CWD: b.CWD, + RefreshCreds: b.RefreshCreds, + }) } - // Keep the stored username/id in sync with the actual bot. - if b.Config.BotID != me.ID || b.Config.BotUsername != me.Username { - b.Config.BotID = me.ID - b.Config.BotUsername = me.Username - _ = b.Save(b.Config) - } - - fmt.Printf("telegram bridge online as @%s (id=%d)\n", me.Username, me.ID) - if b.Config.AllowedUserID == 0 { - fmt.Println("no user paired yet — send /start to the bot from Telegram to claim it") - } else { - fmt.Printf("paired with telegram user id %d\n", b.Config.AllowedUserID) - } - - return b.pollLoop(ctx) -} - -// pollLoop long-polls Telegram for updates and dispatches them. 
-func (b *Bot) pollLoop(ctx context.Context) error { - backoff := time.Second - for { - if err := ctx.Err(); err != nil { - return err - } - updates, err := b.Client.GetUpdates(ctx, b.Config.LastUpdateID+1, 30) - if err != nil { - if ctx.Err() != nil { - return ctx.Err() - } - fmt.Fprintln(stderr(), "telegram: getUpdates error:", err) - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(backoff): - } - if backoff < 30*time.Second { - backoff *= 2 - } - continue - } - backoff = time.Second - for _, u := range updates { - if err := b.handleUpdate(ctx, u); err != nil { - fmt.Fprintln(stderr(), "telegram: handleUpdate:", err) - } - b.Config.LastUpdateID = u.UpdateID - _ = b.Save(b.Config) - } - } -} - -// handleUpdate processes a single Telegram update. -func (b *Bot) handleUpdate(ctx context.Context, u Update) error { - msg := u.Message - if msg == nil { - msg = u.Edited - } - if msg == nil || msg.From == nil || msg.From.IsBot { - return nil - } - if msg.Chat.Type != "private" { - return nil - } - - // Pairing: first user who sends /start claims the bridge. - text := strings.TrimSpace(msg.Text) - if b.Config.AllowedUserID == 0 { - if strings.HasPrefix(text, "/start") { - b.Config.AllowedUserID = msg.From.ID - _ = b.Save(b.Config) - _ = b.Client.SendMessage(ctx, msg.Chat.ID, - fmt.Sprintf("paired with @%s. send any message and i'll forward it to zot.", msg.From.Username), - msg.MessageID) - return nil - } - _ = b.Client.SendMessage(ctx, msg.Chat.ID, - "this bot isn't paired yet. send /start to claim it.", - msg.MessageID) - return nil - } - - // Enforce allowed user. - if msg.From.ID != b.Config.AllowedUserID { - _ = b.Client.SendMessage(ctx, msg.Chat.ID, - "this bot is paired with a different user.", - msg.MessageID) - return nil - } - - // Built-in commands that bypass the agent. - switch text { - case "/start", "/help": - _ = b.Client.SendMessage(ctx, msg.Chat.ID, - "send me any message and i'll forward it to zot. 
attach an image and i'll pass it to the model. commands: /status, /stop, or plain stop.", - msg.MessageID) - return nil - case "/status": - return b.sendStatus(ctx, msg.Chat.ID, msg.MessageID) - case "/stop": - b.cancelActiveTurn(ctx, msg.Chat.ID, msg.MessageID) - return nil - } - if isStopCommand(text) { - b.cancelActiveTurn(ctx, msg.Chat.ID, msg.MessageID) - return nil - } - - // Build the prompt: combine text + caption; download image attachments. - prompt := strings.TrimSpace(msg.Text) - if msg.Caption != "" { - if prompt != "" { - prompt += "\n" - } - prompt += msg.Caption - } - - var images []provider.ImageBlock - if len(msg.Photo) > 0 { - // Photos arrive in multiple sizes; take the largest (last in the slice). - largest := msg.Photo[len(msg.Photo)-1] - if data, mime, err := b.download(ctx, largest.FileID, ""); err == nil { - images = append(images, provider.ImageBlock{MimeType: mime, Data: data}) - } else { - fmt.Fprintln(stderr(), "telegram: download photo:", err) - } - } - if msg.Document != nil && isImageMIME(msg.Document.MimeType) { - if data, mime, err := b.download(ctx, msg.Document.FileID, msg.Document.MimeType); err == nil { - images = append(images, provider.ImageBlock{MimeType: mime, Data: data}) - } - } - - if prompt == "" && len(images) == 0 { - return nil - } - - b.mu.Lock() - b.queue = append(b.queue, queuedTurn{ - chatID: msg.Chat.ID, - messageID: msg.MessageID, - prompt: prompt, - images: images, - }) - idle := !b.busy - b.mu.Unlock() - - if idle { - go b.drainQueue(ctx) - } - return nil -} - -// drainQueue runs queued turns one at a time until the queue is empty. 
-func (b *Bot) drainQueue(parent context.Context) { - for { - b.mu.Lock() - if len(b.queue) == 0 { - b.busy = false - b.activeCtx = nil - b.mu.Unlock() - return - } - t := b.queue[0] - b.queue = b.queue[1:] - b.busy = true - turnCtx, cancel := context.WithCancel(parent) - b.activeCtx = cancel - b.mu.Unlock() - - if b.RefreshCreds != nil { - if err := b.RefreshCreds(); err != nil { - fmt.Fprintln(stderr(), "telegram: refresh creds:", err) - } - } - b.runTurn(turnCtx, t) - cancel() - } -} - -// runTurn sends the queued prompt to the agent and streams the reply. -func (b *Bot) runTurn(ctx context.Context, t queuedTurn) { - stopTyping := b.startTyping(ctx, t.chatID) - defer stopTyping() - - var replyBuilder strings.Builder - var lastAssistantText string - var turnErr error - - sink := func(ev core.AgentEvent) { - switch e := ev.(type) { - case core.EvTextDelta: - replyBuilder.WriteString(e.Delta) - case core.EvUsage: - b.mu.Lock() - if e.Usage.InputTokens > 0 { - b.lastCtxInput = e.Usage.InputTokens + e.Usage.CacheReadTokens + e.Usage.CacheWriteTokens - } - b.mu.Unlock() - case core.EvAssistantMessage: - var sb strings.Builder - for _, c := range e.Message.Content { - if tb, ok := c.(provider.TextBlock); ok { - if sb.Len() > 0 { - sb.WriteString("\n") - } - sb.WriteString(tb.Text) - } - } - if sb.Len() > 0 { - lastAssistantText = sb.String() - } - replyBuilder.Reset() - case core.EvTurnEnd: - if e.Err != nil { - turnErr = e.Err - } - } - } - - if err := b.Agent.Prompt(ctx, t.prompt, t.images, sink); err != nil { - turnErr = err - } - - reply := strings.TrimSpace(lastAssistantText) - if reply == "" { - reply = strings.TrimSpace(replyBuilder.String()) - } - if turnErr != nil && ctx.Err() == nil { - reply = "error: " + turnErr.Error() - } - if reply == "" { - reply = "(no reply)" - } - // Telegram caps messages at 4096 chars. Chunk to be safe. 
- for _, chunk := range chunkMessage(reply, 4000) { - if err := b.Client.SendMessage(context.Background(), t.chatID, chunk, 0); err != nil { - fmt.Fprintln(stderr(), "telegram: sendMessage:", err) - break - } - } -} - -// startTyping keeps Telegram's "typing..." indicator alive until the -// returned stop function is called. -func (b *Bot) startTyping(ctx context.Context, chatID int64) func() { - tctx, cancel := context.WithCancel(ctx) - go func() { - for { - _ = b.Client.SendChatAction(tctx, chatID, "typing") - select { - case <-tctx.Done(): - return - case <-time.After(4 * time.Second): - } - } - }() - return cancel -} - -func (b *Bot) cancelActiveTurn(ctx context.Context, chatID int64, replyTo int) { - b.mu.Lock() - cancel := b.activeCtx - b.mu.Unlock() - if cancel != nil { - cancel() - _ = b.Client.SendMessage(ctx, chatID, "cancelled the current turn.", replyTo) - } else { - _ = b.Client.SendMessage(ctx, chatID, "nothing running.", replyTo) - } -} - -// sendStatus describes agent state to the Telegram user. -func (b *Bot) sendStatus(ctx context.Context, chatID int64, replyTo int) error { - b.mu.Lock() - busy := b.busy - queued := len(b.queue) - ctxUsed := b.lastCtxInput - providerName := b.Provider - authMethod := b.AuthMethod - cwd := b.CWD - b.mu.Unlock() - - model := b.Agent.Model - ctxMax := 0 - if m, err := provider.FindModel(providerName, model); err == nil { - ctxMax = m.ContextWindow - } - return b.Client.SendMessage(ctx, chatID, FormatStatus(StatusSnapshot{ - Provider: providerName, - Model: model, - CWD: cwd, - Usage: b.Agent.Cost(), - Subscription: authMethod == "oauth", - ContextUsed: ctxUsed, - ContextMax: ctxMax, - Busy: busy, - Queued: queued, - }), replyTo) -} - -// download fetches a file from Telegram and returns bytes + mime. 
-func (b *Bot) download(ctx context.Context, fileID, mime string) ([]byte, string, error) { - f, err := b.Client.GetFile(ctx, fileID) - if err != nil { - return nil, "", err - } - data, err := b.Client.DownloadFile(ctx, f.FilePath) - if err != nil { - return nil, "", err - } - if mime == "" { - mime = guessImageMIME(f.FilePath) - } - return data, mime, nil + return b.runner.Run(ctx) } // chunkMessage splits s into chunks no larger than limit runes, on line diff --git a/packages/agent/modes/telegram/commands.go b/packages/agent/modes/telegram/commands.go index c1af6bb..47a6b6b 100644 --- a/packages/agent/modes/telegram/commands.go +++ b/packages/agent/modes/telegram/commands.go @@ -1,11 +1,6 @@ package telegram -import "strings" +import "github.com/patriceckhart/zot/packages/agent/modes/bot" -// isStopCommand reports whether text should abort the active turn. -// Telegram users often type plain "stop" rather than bot-style -// "/stop"; keep this intentionally narrow so normal prompts like -// "stop doing X" still go to the agent. -func isStopCommand(text string) bool { - return strings.EqualFold(strings.TrimSpace(text), "stop") -} +// isStopCommand is a shim to bot.IsStopCommand for backward compatibility. +var isStopCommand = bot.IsStopCommand diff --git a/packages/agent/modes/telegram/status.go b/packages/agent/modes/telegram/status.go index 7038340..211bd88 100644 --- a/packages/agent/modes/telegram/status.go +++ b/packages/agent/modes/telegram/status.go @@ -1,120 +1,9 @@ package telegram -import ( - "fmt" - "os" - "strings" +import "github.com/patriceckhart/zot/packages/agent/modes/bot" - "github.com/patriceckhart/zot/packages/provider" -) +// StatusSnapshot is an alias for bot.StatusSnapshot for backward compatibility. +type StatusSnapshot = bot.StatusSnapshot -// StatusSnapshot is the small cross-host state bundle rendered for -// Telegram /status replies. 
-type StatusSnapshot struct { - Provider string - Model string - CWD string - Usage provider.Usage - Subscription bool - ContextUsed int - ContextMax int - Busy bool - Queued int -} - -// FormatStatus renders the same compact model/usage/cost/context -// information shown in the TUI status bar, plus the current directory. -func FormatStatus(s StatusSnapshot) string { - providerName := strings.TrimSpace(s.Provider) - model := strings.TrimSpace(s.Model) - if providerName == "" { - providerName = "unknown" - } - if model == "" { - model = "unknown" - } - - var stats []string - if s.Usage.InputTokens > 0 { - stats = append(stats, fmt.Sprintf("↑%s", formatTokens(s.Usage.InputTokens))) - } - if s.Usage.OutputTokens > 0 { - stats = append(stats, fmt.Sprintf("↓%s", formatTokens(s.Usage.OutputTokens))) - } - if s.Usage.CacheReadTokens > 0 { - stats = append(stats, fmt.Sprintf("R%s", formatTokens(s.Usage.CacheReadTokens))) - } - if s.Usage.CacheWriteTokens > 0 { - stats = append(stats, fmt.Sprintf("W%s", formatTokens(s.Usage.CacheWriteTokens))) - } - if s.Usage.CostUSD > 0 || s.Subscription { - cost := fmt.Sprintf("$%.3f", s.Usage.CostUSD) - if s.Subscription { - cost += " (sub)" - } - stats = append(stats, cost) - } - if ctx := contextUsage(s.ContextUsed, s.ContextMax); ctx != "" { - stats = append(stats, ctx) - } - - line := fmt.Sprintf("(%s) %s", providerName, model) - if len(stats) > 0 { - line += " " + strings.Join(stats, " ") - } - - state := "idle" - if s.Busy { - state = "working" - } - lines := []string{line, "state: " + state} - if s.Queued > 0 { - lines = append(lines, fmt.Sprintf("queued: %d", s.Queued)) - } - if cwd := shortenHome(strings.TrimSpace(s.CWD)); cwd != "" { - lines = append(lines, "cwd: "+cwd) - } - return strings.Join(lines, "\n") -} - -func contextUsage(used, max int) string { - if max <= 0 { - if used <= 0 { - return "" - } - return formatTokens(used) - } - pct := float64(used) / float64(max) * 100 - return fmt.Sprintf("%.1f%%/%s", pct, 
formatTokens(max)) -} - -func formatTokens(n int) string { - switch { - case n < 0: - return "0" - case n < 1000: - return fmt.Sprintf("%d", n) - case n < 10000: - return fmt.Sprintf("%.1fk", float64(n)/1000) - case n < 1_000_000: - return fmt.Sprintf("%dk", (n+500)/1000) - case n < 10_000_000: - return fmt.Sprintf("%.1fM", float64(n)/1_000_000) - default: - return fmt.Sprintf("%dM", (n+500_000)/1_000_000) - } -} - -func shortenHome(path string) string { - home, err := os.UserHomeDir() - if err != nil || home == "" { - return path - } - if path == home { - return "~" - } - if strings.HasPrefix(path, home+string(os.PathSeparator)) { - return "~" + strings.TrimPrefix(path, home) - } - return path -} +// FormatStatus is an alias for bot.FormatStatus for backward compatibility. +var FormatStatus = bot.FormatStatus