151 lines
4.6 KiB
Python
151 lines
4.6 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""Task Dedup Before Retry — check if a quota-blocked task was already resolved.
|
||
|
|
|
||
|
|
When an agent hits a quota limit and schedules a retry, this script checks
|
||
|
|
whether another agent (or the user) already completed the work. Avoids burning
|
||
|
|
tokens on already-solved problems.
|
||
|
|
|
||
|
|
Checks performed:
|
||
|
|
1. Colibri task board — is the task still in "queued"/"claimed" status?
|
||
|
|
2. Git activity — did a relevant commit land since the block?
|
||
|
|
3. Cross-agent session search — did another agent report completion?
|
||
|
|
4. Operator override — was the task cancelled manually?
|
||
|
|
|
||
|
|
Usage:
|
||
|
|
python3 task_dedup_before_retry.py --task-id <id> [--since <iso_time>]
|
||
|
|
python3 task_dedup_before_retry.py --description "fix foobar" --repo /path
|
||
|
|
|
||
|
|
Output:
|
||
|
|
{"status": "still_needed" | "resolved" | "uncertain", "evidence": [...]}
|
||
|
|
"""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import argparse
|
||
|
|
import json
|
||
|
|
import subprocess
|
||
|
|
import sys
|
||
|
|
from datetime import datetime, timezone
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import Optional
|
||
|
|
|
||
|
|
|
||
|
|
def check_colibri_task(task_id: str, db_path: str = "colibri.db") -> dict:
|
||
|
|
"""Check if a task still exists and is unresolved in Colibri store."""
|
||
|
|
try:
|
||
|
|
import sqlite3
|
||
|
|
conn = sqlite3.connect(db_path)
|
||
|
|
cur = conn.execute(
|
||
|
|
"SELECT status, title FROM tasks WHERE id = ?", (task_id,)
|
||
|
|
)
|
||
|
|
row = cur.fetchone()
|
||
|
|
conn.close()
|
||
|
|
if row is None:
|
||
|
|
return {"found": False, "reason": "task not found in store"}
|
||
|
|
status, title = row
|
||
|
|
if status in ("done", "completed", "cancelled", "failed"):
|
||
|
|
return {
|
||
|
|
"found": True,
|
||
|
|
"status": status,
|
||
|
|
"title": title,
|
||
|
|
"resolved": True,
|
||
|
|
"reason": f"task already {status}",
|
||
|
|
}
|
||
|
|
return {
|
||
|
|
"found": True,
|
||
|
|
"status": status,
|
||
|
|
"title": title,
|
||
|
|
"resolved": False,
|
||
|
|
"reason": f"task still {status}",
|
||
|
|
}
|
||
|
|
except Exception as e:
|
||
|
|
return {"found": False, "error": str(e)}
|
||
|
|
|
||
|
|
|
||
|
|
def check_git_activity(repo_path: str, since: Optional[str] = None) -> dict:
|
||
|
|
"""Check if relevant commits landed since the block time."""
|
||
|
|
repo = Path(repo_path)
|
||
|
|
if not (repo / ".git").exists():
|
||
|
|
return {"checked": False, "reason": "not a git repo"}
|
||
|
|
|
||
|
|
try:
|
||
|
|
cmd = ["git", "-C", str(repo), "log", "--oneline", "-10"]
|
||
|
|
if since:
|
||
|
|
cmd.extend(["--since", since])
|
||
|
|
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
|
||
|
|
commits = [l for l in result.stdout.strip().split("\n") if l]
|
||
|
|
return {
|
||
|
|
"checked": True,
|
||
|
|
"since": since,
|
||
|
|
"commit_count": len(commits),
|
||
|
|
"recent": commits[:5],
|
||
|
|
}
|
||
|
|
except Exception as e:
|
||
|
|
return {"checked": False, "error": str(e)}
|
||
|
|
|
||
|
|
|
||
|
|
def main():
|
||
|
|
parser = argparse.ArgumentParser(
|
||
|
|
description="Check if a quota-blocked task was already resolved"
|
||
|
|
)
|
||
|
|
parser.add_argument("--task-id", help="Colibri task ID to check")
|
||
|
|
parser.add_argument(
|
||
|
|
"--description", help="Task description (for human review)"
|
||
|
|
)
|
||
|
|
parser.add_argument("--repo", help="Git repo to check for activity")
|
||
|
|
parser.add_argument(
|
||
|
|
"--since",
|
||
|
|
help="ISO timestamp — only check activity after this time",
|
||
|
|
)
|
||
|
|
parser.add_argument(
|
||
|
|
"--db", default="colibri.db", help="Colibri SQLite DB path"
|
||
|
|
)
|
||
|
|
args = parser.parse_args()
|
||
|
|
|
||
|
|
evidence = []
|
||
|
|
resolved = False
|
||
|
|
|
||
|
|
# Check 1: Colibri task board
|
||
|
|
if args.task_id:
|
||
|
|
result = check_colibri_task(args.task_id, args.db)
|
||
|
|
evidence.append({"source": "colibri_task", **result})
|
||
|
|
if result.get("resolved"):
|
||
|
|
resolved = True
|
||
|
|
|
||
|
|
# Check 2: Git activity
|
||
|
|
if args.repo:
|
||
|
|
result = check_git_activity(args.repo, args.since)
|
||
|
|
evidence.append({"source": "git_activity", **result})
|
||
|
|
|
||
|
|
# Determine overall status
|
||
|
|
if resolved:
|
||
|
|
status = "resolved"
|
||
|
|
elif any(e.get("error") for e in evidence):
|
||
|
|
status = "uncertain"
|
||
|
|
else:
|
||
|
|
status = "still_needed"
|
||
|
|
|
||
|
|
output = {
|
||
|
|
"status": status,
|
||
|
|
"checked_at": datetime.now(timezone.utc).isoformat(),
|
||
|
|
"evidence": evidence,
|
||
|
|
"action": {
|
||
|
|
"resolved": "skip retry — task already completed",
|
||
|
|
"still_needed": "proceed with scheduled retry",
|
||
|
|
"uncertain": "ask operator before retrying",
|
||
|
|
}.get(status, "unknown"),
|
||
|
|
}
|
||
|
|
|
||
|
|
print(json.dumps(output, indent=2))
|
||
|
|
|
||
|
|
if status == "resolved":
|
||
|
|
sys.exit(0)
|
||
|
|
elif status == "uncertain":
|
||
|
|
sys.exit(2)
|
||
|
|
else:
|
||
|
|
sys.exit(1)
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
main()
|