layered-soul/scripts/colibri_poll.py

22 lines
865 B
Python
Raw Normal View History

#!/usr/bin/env python3
"""Colibri task poller — checks the board for tasks assigned to an agent."""
import json, subprocess, sys, argparse
def poll(agent_name, socket_path, colibri_bin="colibri"):
result = subprocess.run(
[colibri_bin, "--socket", socket_path, "list-tasks", "--status", "started"],
capture_output=True, text=True, timeout=10
)
tasks = json.loads(result.stdout)
mine = [t for t in tasks if t.get("agent_name") == agent_name or t.get("agent_id") == agent_name]
print(json.dumps(mine, indent=2))
return 0 if mine else 1
if __name__ == "__main__":
p = argparse.ArgumentParser()
p.add_argument("--agent", required=True)
p.add_argument("--socket", required=True)
p.add_argument("--colibri", default="colibri")
args = p.parse_args()
sys.exit(poll(args.agent, args.socket, args.colibri))