diff --git a/crates/colibri-daemon/src/daemon.rs b/crates/colibri-daemon/src/daemon.rs
index 839fedf..f87118e 100644
--- a/crates/colibri-daemon/src/daemon.rs
+++ b/crates/colibri-daemon/src/daemon.rs
@@ -280,16 +280,20 @@ pub async fn heartbeat(state: &SharedState, _stall_timeout: Duration) {
                         }
                     };
                     match store.set_task_cost(task_id, &tc) {
-                        Ok(t) => info!(
-                            task_id = %task_id,
-                            input_tokens = u.input_tokens,
-                            output_tokens = u.output_tokens,
-                            cache_read = u.cache_read_tokens,
-                            cost = u.cost(),
-                            success = tc.success,
-                            status = %t.status.as_str(),
-                            "task cost captured"
-                        ),
+                        Ok(t) => {
+                            info!(
+                                task_id = %task_id,
+                                input_tokens = u.input_tokens,
+                                output_tokens = u.output_tokens,
+                                cache_read = u.cache_read_tokens,
+                                cost = u.cost(),
+                                success = tc.success,
+                                status = %t.status.as_str(),
+                                "task cost captured"
+                            );
+                            // Best-effort push to mother for dashboard aggregation.
+                            push_cost_to_mother(task_id, &tc);
+                        }
                         Err(e) => {
                             warn!(task_id = %task_id, error = %e, "failed to write task cost")
                         }
@@ -308,6 +312,102 @@ pub async fn heartbeat(state: &SharedState, _stall_timeout: Duration) {
     }
 }
 
+/// Best-effort push of a task's cost record to the mother node for dashboard aggregation.
+///
+/// Runs `ssh <COLIBRI_MOTHER_HOST> report-task-cost` on a detached thread and writes one
+/// line of JSON built from `tc` to its stdin. Does nothing when `COLIBRI_MOTHER_HOST` is
+/// unset or blank. Failures are only logged as warnings and never reach the caller, so
+/// the record already written to the local store is unaffected.
+fn push_cost_to_mother(task_id: &str, tc: &colibri_store::TaskCost) {
+    let mother_host = match std::env::var("COLIBRI_MOTHER_HOST") {
+        Ok(h) if !h.trim().is_empty() => h.trim().to_string(),
+        _ => return,
+    };
+    // A value starting with `-` would be parsed by ssh as an option, not a host.
+    if mother_host.starts_with('-') {
+        warn!(host = %mother_host, "mother cost push: invalid COLIBRI_MOTHER_HOST");
+        return;
+    }
+
+    // HOSTNAME is set by many shells without being exported, so a service may not see
+    // it; fall back to /etc/hostname before reporting "unknown".
+    let node_hostname = std::env::var("HOSTNAME")
+        .ok()
+        .filter(|h| !h.trim().is_empty())
+        .or_else(|| std::fs::read_to_string("/etc/hostname").ok())
+        .map(|h| h.trim().to_string())
+        .filter(|h| !h.is_empty())
+        .unwrap_or_else(|| "unknown".to_string());
+
+    // Serialize on the calling thread so only owned strings move into the worker.
+    // `Value`'s `Display` impl cannot fail, so an empty line is never sent.
+    let payload_line = serde_json::json!({
+        "node_hostname": node_hostname,
+        "task_id": task_id,
+        "provider": tc.provider,
+        "model": tc.model,
+        "input_tokens": tc.input_tokens,
+        "output_tokens": tc.output_tokens,
+        "cache_read_tokens": tc.cache_read_tokens,
+        "cache_write_tokens": tc.cache_write_tokens,
+        "cost_usd": tc.cost,
+        "success": tc.success,
+        "finished_at": chrono::Utc::now().to_rfc3339(),
+    })
+    .to_string();
+    let task_id = task_id.to_string();
+
+    // ssh blocks, so keep it off the async heartbeat. `Builder::spawn` returns an error
+    // where `thread::spawn` would panic if the OS refuses to create a thread.
+    let worker = std::thread::Builder::new().name("mother-cost-push".to_string());
+    let spawned = worker.spawn(move || {
+        use std::io::Write;
+        use std::process::{Command, Stdio};
+
+        let mut child = match Command::new("ssh")
+            .args([
+                "-o",
+                "BatchMode=yes",
+                "-o",
+                "ConnectTimeout=5",
+                mother_host.as_str(),
+                "report-task-cost",
+            ])
+            .stdin(Stdio::piped())
+            .stdout(Stdio::null())
+            .stderr(Stdio::piped())
+            .spawn()
+        {
+            Ok(c) => c,
+            Err(e) => {
+                warn!(task_id = %task_id, error = %e, "mother cost push: SSH spawn failed");
+                return;
+            }
+        };
+        // Take the handle so it is dropped (EOF for the remote reader) right after the write.
+        if let Some(mut stdin) = child.stdin.take() {
+            if let Err(e) = writeln!(stdin, "{payload_line}") {
+                warn!(task_id = %task_id, error = %e, "mother cost push: stdin write failed");
+            }
+        }
+        match child.wait_with_output() {
+            Ok(out) if out.status.success() => {
+                debug!(task_id = %task_id, "mother cost push: ok");
+            }
+            Ok(out) => {
+                let stderr = String::from_utf8_lossy(&out.stderr);
+                warn!(task_id = %task_id, stderr = %stderr.trim(), "mother cost push: failed");
+            }
+            Err(e) => {
+                warn!(task_id = %task_id, error = %e, "mother cost push: error");
+            }
+        }
+    });
+    if let Err(e) = spawned {
+        warn!(error = %e, "mother cost push: could not start worker thread");
+    }
+}
+
 async fn session_rotation(state: &SharedState) {
     let cm_str = state.cost_mode.read().await.clone();
     let mut cost_mode = crate::cost::CostMode::parse(&cm_str).unwrap_or_default();
diff --git a/packaging/mother/colibri-mcp-ssh b/packaging/mother/colibri-mcp-ssh
index 9310df2..2c00a70 100755
--- a/packaging/mother/colibri-mcp-ssh
+++ b/packaging/mother/colibri-mcp-ssh
@@ -5,9 +5,10 @@
 # command with this script and stores the original in $SSH_ORIGINAL_COMMAND.
 #
 # Allowlist:
-#   ""                  → colibri-mcp in stdio MCP mode (persistent JSON-RPC channel)
-#   "tools"             → colibri-mcp tools (one-shot discovery, debugging)
-#   everything else     → rejected with exit 1
+#   ""                  → colibri-mcp in stdio MCP mode (persistent JSON-RPC channel)
+#   "tools"             → colibri-mcp tools (one-shot discovery, debugging)
+#   "report-task-cost"  → read TaskCostSummary JSON from stdin, INSERT into task_costs
+#   everything else     → rejected with exit 1
 #
 # Why: the wrapper's job is to constrain what callers can do through the
 # SSH forced-command boundary. Without an allowlist, the caller can pass
@@ -24,6 +25,43 @@ case "${SSH_ORIGINAL_COMMAND:-}" in
     "tools")
         exec /usr/local/bin/colibri-mcp tools
         ;;
+    "report-task-cost")
+        # Read one TaskCost JSON object from stdin and INSERT it into
+        # mother_hive.task_costs.
+        # Input (as sent by the daemon; "node_id" is optional and NULL when absent):
+        #   {"node_hostname":"node-a","task_id":"abc","provider":"deepseek",
+        #    "model":"deepseek-chat","input_tokens":150,"output_tokens":80,
+        #    "cache_read_tokens":200,"cache_write_tokens":50,"cost_usd":0.0042,
+        #    "success":true,"finished_at":"2026-06-27T12:00:00Z"}
+        #
+        # The payload is captured here and handed to psql as a variable: the
+        # heredoc below is psql's stdin, so the SQL cannot read the SSH stdin,
+        # and pg_read_file() reads files on the database server, not the client.
+        payload="$(head -c 65536)"
+        if [ -z "$payload" ]; then
+            printf 'report-task-cost: empty payload\n' >&2
+            exit 1
+        fi
+        psql -d mother_hive -tA -v ON_ERROR_STOP=1 -v payload="$payload" <<'PSQL'
+INSERT INTO task_costs (node_id, node_hostname, task_id, provider, model,
+    input_tokens, output_tokens, cache_read_tokens, cache_write_tokens,
+    cost_usd, success, finished_at)
+SELECT
+    (j->>'node_id')::INTEGER,
+    j->>'node_hostname',
+    j->>'task_id',
+    j->>'provider',
+    j->>'model',
+    COALESCE((j->>'input_tokens')::BIGINT, 0),
+    COALESCE((j->>'output_tokens')::BIGINT, 0),
+    COALESCE((j->>'cache_read_tokens')::BIGINT, 0),
+    COALESCE((j->>'cache_write_tokens')::BIGINT, 0),
+    COALESCE((j->>'cost_usd')::DOUBLE PRECISION, 0.0),
+    COALESCE((j->>'success')::BOOLEAN, false),
+    COALESCE((j->>'finished_at')::TIMESTAMPTZ, now())
+FROM (SELECT :'payload'::JSONB AS j) AS _;
+PSQL
+        ;;
     *)
         printf '{"jsonrpc":"2.0","id":null,"error":{"code":-1,"message":"rejected: %s"}}\n' \
             "$(printf '%s' "${SSH_ORIGINAL_COMMAND}" | sed 's/"/\\"/g')" >&2
diff --git a/packaging/mother/mother_schema.sql b/packaging/mother/mother_schema.sql
index 28f2bb1..24329d6 100644
--- a/packaging/mother/mother_schema.sql
+++ b/packaging/mother/mother_schema.sql
@@ -51,6 +51,29 @@ CREATE INDEX IF NOT EXISTS idx_nodes_last_seen ON hive_nodes (last_seen DESC);
 CREATE INDEX IF NOT EXISTS idx_nodes_type ON hive_nodes (node_type);
 CREATE INDEX IF NOT EXISTS idx_nodes_cap_has_gpu ON hive_nodes ((capabilities->>'has_gpu'));
 
+-- Per-task cost records pushed by daemon heartbeat after agent exit.
+-- One row per task completion; aggregated for dashboard queries.
+CREATE TABLE IF NOT EXISTS task_costs (
+    id                  BIGSERIAL PRIMARY KEY,
+    node_id             INTEGER REFERENCES hive_nodes(id) ON DELETE SET NULL,
+    node_hostname       TEXT,                       -- hostname reported by the daemon
+    task_id             TEXT NOT NULL,              -- daemon-local task id
+    provider            TEXT,                       -- e.g. deepseek, openrouter
+    model               TEXT,                       -- e.g. deepseek-chat
+    input_tokens        BIGINT NOT NULL DEFAULT 0,
+    output_tokens       BIGINT NOT NULL DEFAULT 0,
+    cache_read_tokens   BIGINT NOT NULL DEFAULT 0,
+    cache_write_tokens  BIGINT NOT NULL DEFAULT 0,
+    cost_usd            DOUBLE PRECISION NOT NULL DEFAULT 0.0,
+    success             BOOLEAN NOT NULL DEFAULT false,
+    finished_at         TIMESTAMPTZ NOT NULL DEFAULT now(),
+    reported_at         TIMESTAMPTZ NOT NULL DEFAULT now()
+);
+CREATE INDEX IF NOT EXISTS idx_task_costs_node ON task_costs (node_id);
+CREATE INDEX IF NOT EXISTS idx_task_costs_hostname ON task_costs (node_hostname);
+CREATE INDEX IF NOT EXISTS idx_task_costs_finished ON task_costs (finished_at DESC);
+CREATE INDEX IF NOT EXISTS idx_task_costs_provider ON task_costs (provider, model);
+
 CREATE TABLE IF NOT EXISTS build_queue (
     id SERIAL PRIMARY KEY,
     node_id INTEGER REFERENCES hive_nodes(id),