244 lines
7.9 KiB
Python
244 lines
7.9 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""Quota Reset ETR — calculate Estimated Time to Reset from provider error messages.
|
||
|
|
|
||
|
|
Parses quota-exhaustion errors from common providers (Z.AI/GLM, DeepSeek,
|
||
|
|
OpenRouter, Anthropic, OpenAI) and computes the reset time in UTC and local
|
||
|
|
time. Outputs a cron-ready timestamp for scheduling retry.
|
||
|
|
|
||
|
|
Usage:
|
||
|
|
python3 quota_reset_etr.py "error message or timestamp string"
|
||
|
|
python3 quota_reset_etr.py "2026-06-18 15:43:27" --tz Asia/Shanghai
|
||
|
|
|
||
|
|
Output:
|
||
|
|
{
|
||
|
|
"status": "quota_exhausted",
|
||
|
|
"reset_utc": "2026-06-18T07:43:27Z",
|
||
|
|
"reset_local": "2026-06-18T09:43:27+02:00",
|
||
|
|
"cron_iso": "2026-06-18T09:44:00+02:00",
|
||
|
|
"provider": "z.ai",
|
||
|
|
"confidence": "high",
|
||
|
|
"retry_after_seconds": 12345,
|
||
|
|
"human": "Resets Jun 18 at 09:43 CEST (09:44 safe retry)"
|
||
|
|
}
|
||
|
|
"""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import argparse
|
||
|
|
import json
|
||
|
|
import re
|
||
|
|
import sys
|
||
|
|
from datetime import datetime, timedelta, timezone
|
||
|
|
from typing import Optional
|
||
|
|
|
||
|
|
# ── Provider timezone map ──────────────────────────────
|
||
|
|
# Sources of truth per provider, verified against real error messages.
|
||
|
|
|
||
|
|
PROVIDER_TZ = {
|
||
|
|
"z.ai": "Asia/Shanghai", # Zhipu AI — Beijing UTC+8
|
||
|
|
"glm": "Asia/Shanghai", # GLM API — same as z.ai
|
||
|
|
"deepseek": "Asia/Shanghai", # DeepSeek — Hangzhou UTC+8
|
||
|
|
"openrouter": "UTC", # OpenRouter — UTC
|
||
|
|
"anthropic": "America/Los_Angeles", # Anthropic — PST/PDT
|
||
|
|
"openai": "America/Los_Angeles", # OpenAI — PST/PDT
|
||
|
|
}
|
||
|
|
|
||
|
|
# ── Error message patterns ─────────────────────────────
|
||
|
|
|
||
|
|
# Chinese-style: "您的限额将在 2026-06-18 15:43:27 重置"
|
||
|
|
CN_RESET_PATTERN = re.compile(
|
||
|
|
r"(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s*重置"
|
||
|
|
)
|
||
|
|
|
||
|
|
# English-style: "quota will reset on 2026-06-18T15:43:27Z"
|
||
|
|
EN_RESET_PATTERN = re.compile(
|
||
|
|
r"reset\s+(?:on\s+)?(\d{4}-\d{2}-\d{2}[T\s]\d{2}:\d{2}:\d{2})",
|
||
|
|
re.IGNORECASE,
|
||
|
|
)
|
||
|
|
|
||
|
|
# ISO 8601 with timezone: "2026-06-18T15:43:27+08:00"
|
||
|
|
ISO_TZ_PATTERN = re.compile(
|
||
|
|
r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}[+-]\d{2}:\d{2})"
|
||
|
|
)
|
||
|
|
|
||
|
|
# Retry-After header style: seconds
|
||
|
|
RETRY_AFTER_PATTERN = re.compile(
|
||
|
|
r"retry[_-]after[:\s]+(\d+)", re.IGNORECASE
|
||
|
|
)
|
||
|
|
|
||
|
|
# Plain timestamp: "2026-06-18 15:43:27"
|
||
|
|
PLAIN_TS_PATTERN = re.compile(
|
||
|
|
r"(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})"
|
||
|
|
)
|
||
|
|
|
||
|
|
# ── Parse functions ────────────────────────────────────
|
||
|
|
|
||
|
|
def parse_reset_time(
|
||
|
|
error_text: str,
|
||
|
|
provider_tz: Optional[str] = None,
|
||
|
|
) -> tuple[datetime, str, str]:
|
||
|
|
"""Parse a quota error message and return (reset_utc, provider, confidence).
|
||
|
|
|
||
|
|
Returns (datetime, provider_name, confidence_level).
|
||
|
|
Confidence: "high" (explicit timestamp), "medium" (provider default),
|
||
|
|
"low" (generic fallback).
|
||
|
|
"""
|
||
|
|
# Try ISO with timezone first (gold standard)
|
||
|
|
m = ISO_TZ_PATTERN.search(error_text)
|
||
|
|
if m:
|
||
|
|
dt = datetime.fromisoformat(m.group(1))
|
||
|
|
return dt.astimezone(timezone.utc), "iso8601", "high"
|
||
|
|
|
||
|
|
# Try Chinese reset pattern
|
||
|
|
m = CN_RESET_PATTERN.search(error_text)
|
||
|
|
if m:
|
||
|
|
dt = datetime.strptime(m.group(1), "%Y-%m-%d %H:%M:%S")
|
||
|
|
tz_name = provider_tz or "Asia/Shanghai"
|
||
|
|
return _to_utc(dt, tz_name), "z.ai", "high"
|
||
|
|
|
||
|
|
# Try English reset pattern
|
||
|
|
m = EN_RESET_PATTERN.search(error_text)
|
||
|
|
if m:
|
||
|
|
ts = m.group(1).replace(" ", "T")
|
||
|
|
# Check if timezone suffix present
|
||
|
|
if "+" in ts or ts.endswith("Z"):
|
||
|
|
dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
|
||
|
|
else:
|
||
|
|
dt = datetime.strptime(ts[:19], "%Y-%m-%dT%H:%M:%S")
|
||
|
|
tz_name = provider_tz or "UTC"
|
||
|
|
dt = _to_utc(dt, tz_name)
|
||
|
|
return dt.astimezone(timezone.utc), "generic", "high"
|
||
|
|
|
||
|
|
# Try plain timestamp
|
||
|
|
m = PLAIN_TS_PATTERN.search(error_text)
|
||
|
|
if m:
|
||
|
|
dt = datetime.strptime(m.group(1), "%Y-%m-%d %H:%M:%S")
|
||
|
|
tz_name = provider_tz or "UTC"
|
||
|
|
return _to_utc(dt, tz_name), "generic", "medium"
|
||
|
|
|
||
|
|
# Try Retry-After (seconds from now)
|
||
|
|
m = RETRY_AFTER_PATTERN.search(error_text)
|
||
|
|
if m:
|
||
|
|
seconds = int(m.group(1))
|
||
|
|
reset = datetime.now(timezone.utc) + timedelta(seconds=seconds)
|
||
|
|
return reset, "generic", "medium"
|
||
|
|
|
||
|
|
# Generic 429 fallback: guess 1 hour
|
||
|
|
if "429" in error_text or "quota" in error_text.lower():
|
||
|
|
reset = datetime.now(timezone.utc) + timedelta(hours=1)
|
||
|
|
return reset, "generic", "low"
|
||
|
|
|
||
|
|
raise ValueError(f"Cannot parse reset time from: {error_text[:200]}")
|
||
|
|
|
||
|
|
|
||
|
|
def _to_utc(dt: datetime, tz_name: str) -> datetime:
|
||
|
|
"""Convert a naive datetime from a named timezone to UTC."""
|
||
|
|
import zoneinfo
|
||
|
|
tz = zoneinfo.ZoneInfo(tz_name)
|
||
|
|
return dt.replace(tzinfo=tz).astimezone(timezone.utc)
|
||
|
|
|
||
|
|
|
||
|
|
def detect_provider(error_text: str) -> Optional[str]:
|
||
|
|
"""Guess the provider from error text patterns."""
|
||
|
|
text = error_text.lower()
|
||
|
|
# Chinese providers
|
||
|
|
if "z.ai" in text or "智谱" in text or "zhipu" in text:
|
||
|
|
return "z.ai"
|
||
|
|
if "glm" in text or "使用上限" in error_text or "重置" in error_text:
|
||
|
|
return "glm" # GLM API — Chinese quota messages
|
||
|
|
if "deepseek" in text:
|
||
|
|
return "deepseek"
|
||
|
|
# Western providers
|
||
|
|
if "openrouter" in text:
|
||
|
|
return "openrouter"
|
||
|
|
if "anthropic" in text:
|
||
|
|
return "anthropic"
|
||
|
|
if "openai" in text:
|
||
|
|
return "openai"
|
||
|
|
return None
|
||
|
|
|
||
|
|
|
||
|
|
# ── Main ───────────────────────────────────────────────
|
||
|
|
|
||
|
|
def main():
|
||
|
|
parser = argparse.ArgumentParser(
|
||
|
|
description="Calculate quota reset ETR from provider error messages"
|
||
|
|
)
|
||
|
|
parser.add_argument(
|
||
|
|
"input",
|
||
|
|
nargs="?",
|
||
|
|
help="Error message or timestamp string (reads stdin if omitted)",
|
||
|
|
)
|
||
|
|
parser.add_argument(
|
||
|
|
"--tz",
|
||
|
|
help="Timezone override (e.g., Asia/Shanghai, UTC, America/Los_Angeles)",
|
||
|
|
)
|
||
|
|
parser.add_argument(
|
||
|
|
"--cron-offset",
|
||
|
|
type=int,
|
||
|
|
default=60,
|
||
|
|
help="Seconds to add for safe cron retry window (default: 60)",
|
||
|
|
)
|
||
|
|
args = parser.parse_args()
|
||
|
|
|
||
|
|
# Read input
|
||
|
|
if args.input:
|
||
|
|
text = args.input
|
||
|
|
elif not sys.stdin.isatty():
|
||
|
|
text = sys.stdin.read().strip()
|
||
|
|
else:
|
||
|
|
parser.print_help()
|
||
|
|
sys.exit(1)
|
||
|
|
|
||
|
|
if not text:
|
||
|
|
print(json.dumps({"status": "error", "message": "empty input"}))
|
||
|
|
sys.exit(1)
|
||
|
|
|
||
|
|
# Detect provider
|
||
|
|
provider = detect_provider(text)
|
||
|
|
provider_tz = args.tz or PROVIDER_TZ.get(provider or "", "UTC")
|
||
|
|
|
||
|
|
try:
|
||
|
|
reset_utc, detected_provider, confidence = parse_reset_time(
|
||
|
|
text, provider_tz
|
||
|
|
)
|
||
|
|
except ValueError as e:
|
||
|
|
print(json.dumps({"status": "error", "message": str(e)}))
|
||
|
|
sys.exit(1)
|
||
|
|
|
||
|
|
provider = provider or detected_provider
|
||
|
|
now_utc = datetime.now(timezone.utc)
|
||
|
|
retry_after = max(0, int((reset_utc - now_utc).total_seconds()))
|
||
|
|
|
||
|
|
# Safe cron time: reset + buffer
|
||
|
|
cron_dt = reset_utc + timedelta(seconds=args.cron_offset)
|
||
|
|
|
||
|
|
# Local time for display
|
||
|
|
local_tz = datetime.now().astimezone().tzinfo
|
||
|
|
reset_local = reset_utc.astimezone(local_tz)
|
||
|
|
|
||
|
|
result = {
|
||
|
|
"status": "quota_exhausted",
|
||
|
|
"provider": provider,
|
||
|
|
"confidence": confidence,
|
||
|
|
"reset_utc": reset_utc.strftime("%Y-%m-%dT%H:%M:%SZ"),
|
||
|
|
"reset_local": reset_local.strftime("%Y-%m-%dT%H:%M:%S%z"),
|
||
|
|
"cron_iso": cron_dt.strftime("%Y-%m-%dT%H:%M:%S%z"),
|
||
|
|
"retry_after_seconds": retry_after,
|
||
|
|
"human": (
|
||
|
|
f"Resets {reset_local.strftime('%b %d at %H:%M')} "
|
||
|
|
f"{reset_local.tzname()} "
|
||
|
|
f"({cron_dt.strftime('%H:%M')} safe retry)"
|
||
|
|
),
|
||
|
|
}
|
||
|
|
|
||
|
|
print(json.dumps(result, indent=2))
|
||
|
|
|
||
|
|
if confidence == "low":
|
||
|
|
print("\n# WARNING: low confidence — verify manually", file=sys.stderr)
|
||
|
|
sys.exit(2)
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
main()
|