#!/usr/bin/env python3
"""Colibri task poller — checks the board for tasks assigned to an agent.

Defaults come from environment variables so cron can run it without
arguments. Tasks are matched by agent UUID (COLIBRI_AGENT_ID) because tasks
reference agent_id, not name. The daemon is queried directly over
COLIBRI_SOCKET, so no CLI dependency is needed.

Output (stdout): a JSON array of the matching tasks, ``[]`` when there are
none, or a ``{"error": ...}`` object when the daemon cannot be queried.
"""

import json
import os
import socket
import sys
import time

AGENT_ID = os.environ.get("COLIBRI_AGENT_ID", "")
SOCKET_PATH = os.environ.get("COLIBRI_SOCKET", "/var/run/colibri/colibri.sock")


def _send_cmd(cmd: dict) -> dict:
    """Send a JSON command to the Colibri daemon socket and return its reply.

    The command is written as one newline-terminated JSON document, the write
    side is shut down, and the reply is read until the daemon closes the
    connection.

    Args:
        cmd: JSON-serialisable command object, e.g. ``{"cmd": "list-agents"}``.

    Returns:
        The decoded JSON object sent back by the daemon.

    On any socket, decoding or parsing failure this function does not raise:
    it prints ``{"error": "<message>"}`` to stdout and terminates the process
    with exit status 0.
    """
    payload = (json.dumps(cmd) + "\n").encode()
    try:
        # The context manager closes the socket on the error paths too; the
        # previous explicit close() was skipped whenever a call raised.
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            sock.settimeout(5)
            sock.connect(SOCKET_PATH)
            sock.sendall(payload)
            # Half-close so the daemon sees end-of-request.
            sock.shutdown(socket.SHUT_WR)
            chunks = []
            while True:
                data = sock.recv(4096)
                if not data:
                    break
                chunks.append(data)
        resp = json.loads(b"".join(chunks).decode())
        if not isinstance(resp, dict):
            # Callers use resp.get(...); a bare list/null would crash them.
            raise ValueError(
                f"unexpected response type: {type(resp).__name__}"
            )
        return resp
    except (OSError, ValueError) as e:
        # OSError covers connect/timeout/IO failures; ValueError covers
        # invalid UTF-8, invalid JSON and the shape check above.
        # NOTE(review): exit status 0 is kept from the original — presumably
        # so a cron run is not reported as failed; confirm with consumers.
        print(json.dumps({"error": str(e)}))
        sys.exit(0)


def resolve_agent_id(name_or_id: str) -> str:
    """Return the agent ID for ``name_or_id``.

    A value longer than 30 characters that contains a ``-`` is treated as a
    UUID and returned unchanged. Anything else is treated as an agent name and
    looked up through the daemon's ``list-agents`` command.

    Args:
        name_or_id: Agent UUID or agent name; may be empty.

    Returns:
        The resolved agent ID, or ``name_or_id`` unchanged when it is empty,
        already looks like a UUID, the lookup is not ``ok``, or no agent with
        that name (and a usable ``id``) is listed.
    """
    if not name_or_id:
        # Nothing to resolve; avoid a pointless daemon round-trip.
        return name_or_id
    if len(name_or_id) > 30 and "-" in name_or_id:
        return name_or_id  # looks like a UUID
    resp = _send_cmd({"cmd": "list-agents"})
    if not resp.get("ok"):
        return name_or_id
    # "data" may be missing or null; treat both as an empty list.
    for agent in resp.get("data") or []:
        if not isinstance(agent, dict):
            continue
        if agent.get("name") == name_or_id and agent.get("id"):
            return agent["id"]
    return name_or_id


def main():
    """Print the ``started`` tasks assigned to the configured agent as JSON.

    Prints ``[]`` when COLIBRI_AGENT_ID is unset/empty, when the daemon's
    ``list-tasks`` reply is not ``ok``, or when no task matches the agent.
    """
    if not AGENT_ID:
        # No agent configured: report "no tasks" without touching the daemon.
        print("[]")
        return
    agent_id = resolve_agent_id(AGENT_ID)
    if not agent_id:
        print("[]")
        return
    resp = _send_cmd({"cmd": "list-tasks", "status": "started"})
    if not resp.get("ok"):
        print("[]")
        return
    tasks = resp.get("data") or []
    mine = [
        task
        for task in tasks
        if isinstance(task, dict) and task.get("agent_id") == agent_id
    ]
    # json.dumps([], indent=2) is "[]", so no separate empty branch is needed.
    print(json.dumps(mine, indent=2))


if __name__ == "__main__":
    main()