#!/usr/bin/env python3 """Mark a Colibri task as done or failed. Usage: colibri_task_done.py [done|failed] """ import json import os import socket import sys SOCKET_PATH = os.environ.get("COLIBRI_SOCKET", "/var/run/colibri/colibri.sock") def _send_cmd(cmd: dict) -> dict: payload = (json.dumps(cmd) + "\n").encode() try: sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.settimeout(5) sock.connect(SOCKET_PATH) sock.sendall(payload) sock.shutdown(socket.SHUT_WR) chunks = [] while True: data = sock.recv(4096) if not data: break chunks.append(data) sock.close() return json.loads(b"".join(chunks).decode()) except Exception as e: print(json.dumps({"ok": False, "error": str(e)})) sys.exit(1) def main(): if len(sys.argv) < 2: print("Usage: colibri_task_done.py [done|failed]", file=sys.stderr) sys.exit(1) task_id = sys.argv[1] status = sys.argv[2] if len(sys.argv) > 2 else "done" resp = _send_cmd({"cmd": "transition-task", "task_id": task_id, "status": status}) print(json.dumps(resp, indent=2)) sys.exit(0 if resp.get("ok") else 1) if __name__ == "__main__": main()