feat(mother): Layer 2 — task_costs table + daemon push pipeline #232

Merged
clawdie merged 1 commit from feat/layer2-dashboard-pipeline into main 2026-06-27 13:57:29 +02:00
3 changed files with 139 additions and 13 deletions

View file

@ -280,16 +280,20 @@ pub async fn heartbeat(state: &SharedState, _stall_timeout: Duration) {
}
};
match store.set_task_cost(task_id, &tc) {
Ok(t) => info!(
task_id = %task_id,
input_tokens = u.input_tokens,
output_tokens = u.output_tokens,
cache_read = u.cache_read_tokens,
cost = u.cost(),
success = tc.success,
status = %t.status.as_str(),
"task cost captured"
),
Ok(t) => {
info!(
task_id = %task_id,
input_tokens = u.input_tokens,
output_tokens = u.output_tokens,
cache_read = u.cache_read_tokens,
cost = u.cost(),
success = tc.success,
status = %t.status.as_str(),
"task cost captured"
);
// Best-effort push to mother for dashboard aggregation.
push_cost_to_mother(task_id, &tc);
}
Err(e) => {
warn!(task_id = %task_id, error = %e, "failed to write task cost")
}
@ -308,6 +312,81 @@ pub async fn heartbeat(state: &SharedState, _stall_timeout: Duration) {
}
}
/// Best-effort push of task cost to mother node for dashboard aggregation.
///
/// SSH's to mother with `report-task-cost` forced command, pipes TaskCost JSON
/// to stdin. Failures are logged as warnings — the local SQLite store remains
/// authoritative.
fn push_cost_to_mother(task_id: &str, tc: &colibri_store::TaskCost) {
let mother_host = match std::env::var("COLIBRI_MOTHER_HOST").ok() {
Some(h) => h,
None => return,
};
let node_hostname = std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown".to_string());
// Clone before moving into the spawned thread.
let task_id = task_id.to_string();
let provider = tc.provider.clone();
let model = tc.model.clone();
let input_tokens = tc.input_tokens;
let output_tokens = tc.output_tokens;
let cache_read_tokens = tc.cache_read_tokens;
let cache_write_tokens = tc.cache_write_tokens;
let cost = tc.cost;
let success = tc.success;
// Run SSH in a blocking thread — heartbeat is async, SSH is fast (<1s).
std::thread::spawn(move || {
use std::io::Write;
let payload = serde_json::json!({
"node_hostname": node_hostname,
"task_id": task_id,
"provider": provider,
"model": model,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"cache_read_tokens": cache_read_tokens,
"cache_write_tokens": cache_write_tokens,
"cost_usd": cost,
"success": success,
"finished_at": chrono::Utc::now().to_rfc3339(),
});
let payload_line = serde_json::to_string(&payload).unwrap_or_default();
let mut child = match std::process::Command::new("ssh")
.args([
"-o", "BatchMode=yes",
"-o", "ConnectTimeout=5",
&mother_host,
"report-task-cost",
])
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::piped())
.spawn()
{
Ok(c) => c,
Err(e) => {
warn!(error = %e, "mother cost push: SSH spawn failed");
return;
}
};
if let Some(ref mut stdin) = child.stdin {
let _ = writeln!(stdin, "{payload_line}");
}
match child.wait_with_output() {
Ok(out) if out.status.success() => {
debug!(task_id = %task_id, "mother cost push: ok");
}
Ok(out) => {
let stderr = String::from_utf8_lossy(&out.stderr);
warn!(task_id = %task_id, stderr = %stderr.trim(), "mother cost push: failed");
}
Err(e) => {
warn!(task_id = %task_id, error = %e, "mother cost push: error");
}
}
});
}
async fn session_rotation(state: &SharedState) {
let cm_str = state.cost_mode.read().await.clone();
let mut cost_mode = crate::cost::CostMode::parse(&cm_str).unwrap_or_default();

View file

@ -5,9 +5,10 @@
# command with this script and stores the original in $SSH_ORIGINAL_COMMAND.
#
# Allowlist:
# "" → colibri-mcp in stdio MCP mode (persistent JSON-RPC channel)
# "tools" → colibri-mcp tools (one-shot discovery, debugging)
# everything else → rejected with exit 1
# "" → colibri-mcp in stdio MCP mode (persistent JSON-RPC channel)
# "tools" → colibri-mcp tools (one-shot discovery, debugging)
# "report-task-cost" → read TaskCostSummary JSON from stdin, INSERT into task_costs
# everything else → rejected with exit 1
#
# Why: the wrapper's job is to constrain what callers can do through the
# SSH forced-command boundary. Without an allowlist, the caller can pass
@ -24,6 +25,31 @@ case "${SSH_ORIGINAL_COMMAND:-}" in
"tools")
exec /usr/local/bin/colibri-mcp tools
;;
"report-task-cost")
# Read TaskCostSummary JSON from stdin, INSERT into mother_hive.task_costs.
# Input: {"node_id":1,"task_id":"abc","provider":"deepseek","model":"deepseek-chat",
# "input_tokens":150,"output_tokens":80,"cache_read_tokens":200,
# "cache_write_tokens":50,"cost_usd":0.0042,"success":true,
# "finished_at":"2026-06-27T12:00:00Z"}
psql -d mother_hive -tA -v ON_ERROR_STOP=1 <<'PSQL'
INSERT INTO task_costs (node_id, task_id, provider, model,
input_tokens, output_tokens, cache_read_tokens, cache_write_tokens,
cost_usd, success, finished_at)
SELECT
(j->>'node_id')::INTEGER,
j->>'task_id',
j->>'provider',
j->>'model',
COALESCE((j->>'input_tokens')::BIGINT, 0),
COALESCE((j->>'output_tokens')::BIGINT, 0),
COALESCE((j->>'cache_read_tokens')::BIGINT, 0),
COALESCE((j->>'cache_write_tokens')::BIGINT, 0),
COALESCE((j->>'cost_usd')::DOUBLE PRECISION, 0.0),
COALESCE((j->>'success')::BOOLEAN, false),
COALESCE((j->>'finished_at')::TIMESTAMPTZ, now())
FROM (SELECT (pg_read_file('/dev/stdin')::JSONB) AS j) AS _;
PSQL
;;
*)
printf '{"jsonrpc":"2.0","id":null,"error":{"code":-1,"message":"rejected: %s"}}\n' \
"$(printf '%s' "${SSH_ORIGINAL_COMMAND}" | sed 's/"/\\"/g')" >&2

View file

@ -51,6 +51,27 @@ CREATE INDEX IF NOT EXISTS idx_nodes_last_seen ON hive_nodes (last_seen DESC);
CREATE INDEX IF NOT EXISTS idx_nodes_type ON hive_nodes (node_type);
CREATE INDEX IF NOT EXISTS idx_nodes_cap_has_gpu ON hive_nodes ((capabilities->>'has_gpu'));
-- Per-task cost records pushed by daemon heartbeat after agent exit.
-- One row per task completion; aggregated for dashboard queries.
CREATE TABLE IF NOT EXISTS task_costs (
id BIGSERIAL PRIMARY KEY,
node_id INTEGER REFERENCES hive_nodes(id) ON DELETE SET NULL,
task_id TEXT NOT NULL, -- daemon-local task id
provider TEXT, -- e.g. deepseek, openrouter
model TEXT, -- e.g. deepseek-chat
input_tokens BIGINT NOT NULL DEFAULT 0,
output_tokens BIGINT NOT NULL DEFAULT 0,
cache_read_tokens BIGINT NOT NULL DEFAULT 0,
cache_write_tokens BIGINT NOT NULL DEFAULT 0,
cost_usd DOUBLE PRECISION NOT NULL DEFAULT 0.0,
success BOOLEAN NOT NULL DEFAULT false,
finished_at TIMESTAMPTZ NOT NULL DEFAULT now(),
reported_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_task_costs_node ON task_costs (node_id);
CREATE INDEX IF NOT EXISTS idx_task_costs_finished ON task_costs (finished_at DESC);
CREATE INDEX IF NOT EXISTS idx_task_costs_provider ON task_costs (provider, model);
CREATE TABLE IF NOT EXISTS build_queue (
id SERIAL PRIMARY KEY,
node_id INTEGER REFERENCES hive_nodes(id),