#!/usr/bin/env python3 """Task Dedup Before Retry — check if a quota-blocked task was already resolved. When an agent hits a quota limit and schedules a retry, this script checks whether another agent (or the user) already completed the work. Avoids burning tokens on already-solved problems. Checks performed: 1. Colibri task board — is the task still in "queued"/"claimed" status? 2. Git activity — did a relevant commit land since the block? 3. Cross-agent session search — did another agent report completion? 4. Operator override — was the task cancelled manually? Usage: python3 task_dedup_before_retry.py --task-id [--since ] python3 task_dedup_before_retry.py --description "fix foobar" --repo /path Output: {"status": "still_needed" | "resolved" | "uncertain", "evidence": [...]} """ from __future__ import annotations import argparse import json import subprocess import sys from datetime import datetime, timezone from pathlib import Path from typing import Optional def check_colibri_task(task_id: str, db_path: str = "colibri.db") -> dict: """Check if a task still exists and is unresolved in Colibri store.""" try: import sqlite3 conn = sqlite3.connect(db_path) cur = conn.execute( "SELECT status, title FROM tasks WHERE id = ?", (task_id,) ) row = cur.fetchone() conn.close() if row is None: return {"found": False, "reason": "task not found in store"} status, title = row if status in ("done", "completed", "cancelled", "failed"): return { "found": True, "status": status, "title": title, "resolved": True, "reason": f"task already {status}", } return { "found": True, "status": status, "title": title, "resolved": False, "reason": f"task still {status}", } except Exception as e: return {"found": False, "error": str(e)} def check_git_activity(repo_path: str, since: Optional[str] = None) -> dict: """Check if relevant commits landed since the block time.""" repo = Path(repo_path) if not (repo / ".git").exists(): return {"checked": False, "reason": "not a git repo"} try: cmd = ["git", "-C", str(repo), "log", "--oneline", "-10"] if since: cmd.extend(["--since", since]) result = subprocess.run(cmd, capture_output=True, text=True, timeout=10) commits = [l for l in result.stdout.strip().split("\n") if l] return { "checked": True, "since": since, "commit_count": len(commits), "recent": commits[:5], } except Exception as e: return {"checked": False, "error": str(e)} def main(): parser = argparse.ArgumentParser( description="Check if a quota-blocked task was already resolved" ) parser.add_argument("--task-id", help="Colibri task ID to check") parser.add_argument( "--description", help="Task description (for human review)" ) parser.add_argument("--repo", help="Git repo to check for activity") parser.add_argument( "--since", help="ISO timestamp — only check activity after this time", ) parser.add_argument( "--db", default="colibri.db", help="Colibri SQLite DB path" ) args = parser.parse_args() evidence = [] resolved = False # Check 1: Colibri task board if args.task_id: result = check_colibri_task(args.task_id, args.db) evidence.append({"source": "colibri_task", **result}) if result.get("resolved"): resolved = True # Check 2: Git activity if args.repo: result = check_git_activity(args.repo, args.since) evidence.append({"source": "git_activity", **result}) # Determine overall status if resolved: status = "resolved" elif any(e.get("error") for e in evidence): status = "uncertain" else: status = "still_needed" output = { "status": status, "checked_at": datetime.now(timezone.utc).isoformat(), "evidence": evidence, "action": { "resolved": "skip retry — task already completed", "still_needed": "proceed with scheduled retry", "uncertain": "ask operator before retrying", }.get(status, "unknown"), } print(json.dumps(output, indent=2)) if status == "resolved": sys.exit(0) elif status == "uncertain": sys.exit(2) else: sys.exit(1) if __name__ == "__main__": main()