layered-soul/scripts/task_dedup_before_retry.py

151 lines
4.6 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""Task Dedup Before Retry — check if a quota-blocked task was already resolved.

When an agent hits a quota limit and schedules a retry, this script checks
whether another agent (or the user) already completed the work. This avoids
burning tokens on already-solved problems.

Checks performed:
    1. Colibri task board: is the task still in "queued"/"claimed" status?
    2. Git activity: did a relevant commit land since the block?
    3. Cross-agent session search: did another agent report completion?
    4. Operator override: was the task cancelled manually?

NOTE(review): main() in this file runs only checks 1 and 2; a manually
cancelled task is visible only through its "cancelled" status in check 1.

Usage:
    python3 task_dedup_before_retry.py --task-id <id> [--since <iso_time>]
    python3 task_dedup_before_retry.py --description "fix foobar" --repo /path

Output (JSON on stdout; also includes "checked_at" and "action"):
    {"status": "still_needed" | "resolved" | "uncertain", "evidence": [...]}

Exit codes:
    0 = resolved, 1 = still_needed, 2 = uncertain
"""
from __future__ import annotations
import argparse
import json
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
def check_colibri_task(task_id: str, db_path: str = "colibri.db") -> dict:
    """Check if a task still exists and is unresolved in the Colibri store.

    Looks up ``task_id`` in the ``tasks`` table of the SQLite database at
    ``db_path`` and classifies the row's ``status`` column.

    Args:
        task_id: Value matched against the ``id`` column of ``tasks``.
        db_path: Path to the SQLite database file.

    Returns:
        A dict whose shape depends on the outcome:

        * row found: ``found=True`` plus ``status``, ``title``, ``resolved``
          and a human-readable ``reason``;
        * no matching row: ``{"found": False, "reason": ...}``;
        * lookup failed: ``{"found": False, "error": <message>}``.

        The function never raises; failures are reported via ``error``.
    """
    try:
        import sqlite3
        from contextlib import closing

        # sqlite3.connect() creates an empty database when the file is
        # missing; a read-only check must report that instead of leaving a
        # stray file behind.
        if not Path(db_path).is_file():
            return {
                "found": False,
                "error": f"database file not found: {db_path}",
            }

        # closing() guarantees the connection is released even when the
        # query itself raises (e.g. the tasks table does not exist).
        with closing(sqlite3.connect(db_path)) as conn:
            row = conn.execute(
                "SELECT status, title FROM tasks WHERE id = ?", (task_id,)
            ).fetchone()
    except Exception as e:
        # Deliberate best-effort boundary: the caller turns any "error"
        # entry into an "uncertain" verdict rather than crashing.
        return {"found": False, "error": str(e)}

    if row is None:
        return {"found": False, "reason": "task not found in store"}

    status, title = row
    # NOTE(review): "failed" is counted as resolved here, so a failed task
    # is not retried — confirm this is intended.
    resolved = status in ("done", "completed", "cancelled", "failed")
    return {
        "found": True,
        "status": status,
        "title": title,
        "resolved": resolved,
        "reason": f"task already {status}" if resolved else f"task still {status}",
    }
def check_git_activity(repo_path: str, since: Optional[str] = None) -> dict:
    """Check if relevant commits landed since the block time.

    Runs ``git log --oneline -10`` (optionally with ``--since``) in
    ``repo_path`` and summarises the result.

    Args:
        repo_path: Directory expected to contain a ``.git`` entry.
        since: Optional value passed verbatim to ``git log --since``.

    Returns:
        * ``{"checked": False, "reason": "not a git repo"}`` when
          ``repo_path`` has no ``.git`` entry;
        * ``{"checked": False, "error": <message>}`` when git cannot be run,
          times out, or exits with a non-zero status;
        * otherwise ``checked=True`` with ``since``, ``commit_count`` (at most
          10, because of the ``-10`` limit) and ``recent`` (first five lines).
    """
    repo = Path(repo_path)
    if not (repo / ".git").exists():
        return {"checked": False, "reason": "not a git repo"}

    cmd = ["git", "-C", str(repo), "log", "--oneline", "-10"]
    if since:
        cmd.extend(["--since", since])

    try:
        # Decode leniently so an oddly-encoded commit subject cannot raise.
        result = subprocess.run(
            cmd,
            capture_output=True,
            encoding="utf-8",
            errors="replace",
            timeout=10,
        )
    except (OSError, subprocess.SubprocessError) as e:
        # OSError: git binary missing/not executable; SubprocessError: timeout.
        return {"checked": False, "error": str(e)}

    # A failing git command prints nothing on stdout; without this guard it
    # would be indistinguishable from "no commits since the block".
    if result.returncode != 0:
        detail = result.stderr.strip()
        return {
            "checked": False,
            "error": detail or f"git log exited with status {result.returncode}",
        }

    commits = [line for line in result.stdout.strip().split("\n") if line]
    return {
        "checked": True,
        "since": since,
        "commit_count": len(commits),
        "recent": commits[:5],
    }
def main(argv: Optional[list[str]] = None) -> None:
    """Run the configured checks, print a JSON verdict, and exit.

    Args:
        argv: Command-line arguments without the program name. ``None``
            (the default) makes argparse read ``sys.argv[1:]``.

    The JSON document printed on stdout contains ``status``, ``checked_at``
    (UTC, ISO 8601), ``evidence``, ``action`` and, when ``--description`` was
    given, ``description``.

    Exit codes (always raised via ``SystemExit``):
        0 — resolved, 1 — still_needed, 2 — uncertain.
    """
    parser = argparse.ArgumentParser(
        description="Check if a quota-blocked task was already resolved"
    )
    parser.add_argument("--task-id", help="Colibri task ID to check")
    parser.add_argument(
        "--description", help="Task description (for human review)"
    )
    parser.add_argument("--repo", help="Git repo to check for activity")
    parser.add_argument(
        "--since",
        help="ISO timestamp — only check activity after this time",
    )
    parser.add_argument(
        "--db", default="colibri.db", help="Colibri SQLite DB path"
    )
    args = parser.parse_args(argv)

    evidence = []
    resolved = False

    # Check 1: Colibri task board — the only check that can mark the task
    # as resolved.
    if args.task_id:
        task_result = check_colibri_task(args.task_id, args.db)
        evidence.append({"source": "colibri_task", **task_result})
        resolved = bool(task_result.get("resolved"))

    # Check 2: Git activity — recorded as evidence only; it never changes
    # the verdict unless it reports an error.
    if args.repo:
        git_result = check_git_activity(args.repo, args.since)
        evidence.append({"source": "git_activity", **git_result})

    # Determine overall status: a positive resolution wins over errors.
    if resolved:
        status = "resolved"
    elif any(item.get("error") for item in evidence):
        status = "uncertain"
    else:
        status = "still_needed"

    actions = {
        "resolved": "skip retry — task already completed",
        "still_needed": "proceed with scheduled retry",
        "uncertain": "ask operator before retrying",
    }
    output = {
        "status": status,
        "checked_at": datetime.now(timezone.utc).isoformat(),
        "evidence": evidence,
        "action": actions.get(status, "unknown"),
    }
    # The flag is documented as "for human review", so surface it in the
    # report instead of silently discarding it.
    if args.description:
        output["description"] = args.description
    print(json.dumps(output, indent=2))

    exit_codes = {"resolved": 0, "uncertain": 2, "still_needed": 1}
    sys.exit(exit_codes[status])


if __name__ == "__main__":
    main()