feat(mother): Layer 2 — task_costs table + daemon push pipeline #232
3 changed files with 139 additions and 13 deletions
|
|
@ -280,16 +280,20 @@ pub async fn heartbeat(state: &SharedState, _stall_timeout: Duration) {
|
|||
}
|
||||
};
|
||||
match store.set_task_cost(task_id, &tc) {
|
||||
Ok(t) => info!(
|
||||
task_id = %task_id,
|
||||
input_tokens = u.input_tokens,
|
||||
output_tokens = u.output_tokens,
|
||||
cache_read = u.cache_read_tokens,
|
||||
cost = u.cost(),
|
||||
success = tc.success,
|
||||
status = %t.status.as_str(),
|
||||
"task cost captured"
|
||||
),
|
||||
Ok(t) => {
|
||||
info!(
|
||||
task_id = %task_id,
|
||||
input_tokens = u.input_tokens,
|
||||
output_tokens = u.output_tokens,
|
||||
cache_read = u.cache_read_tokens,
|
||||
cost = u.cost(),
|
||||
success = tc.success,
|
||||
status = %t.status.as_str(),
|
||||
"task cost captured"
|
||||
);
|
||||
// Best-effort push to mother for dashboard aggregation.
|
||||
push_cost_to_mother(task_id, &tc);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(task_id = %task_id, error = %e, "failed to write task cost")
|
||||
}
|
||||
|
|
@ -308,6 +312,81 @@ pub async fn heartbeat(state: &SharedState, _stall_timeout: Duration) {
|
|||
}
|
||||
}
|
||||
|
||||
/// Best-effort push of task cost to mother node for dashboard aggregation.
|
||||
///
|
||||
/// SSH's to mother with `report-task-cost` forced command, pipes TaskCost JSON
|
||||
/// to stdin. Failures are logged as warnings — the local SQLite store remains
|
||||
/// authoritative.
|
||||
fn push_cost_to_mother(task_id: &str, tc: &colibri_store::TaskCost) {
|
||||
let mother_host = match std::env::var("COLIBRI_MOTHER_HOST").ok() {
|
||||
Some(h) => h,
|
||||
None => return,
|
||||
};
|
||||
let node_hostname = std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown".to_string());
|
||||
|
||||
// Clone before moving into the spawned thread.
|
||||
let task_id = task_id.to_string();
|
||||
let provider = tc.provider.clone();
|
||||
let model = tc.model.clone();
|
||||
let input_tokens = tc.input_tokens;
|
||||
let output_tokens = tc.output_tokens;
|
||||
let cache_read_tokens = tc.cache_read_tokens;
|
||||
let cache_write_tokens = tc.cache_write_tokens;
|
||||
let cost = tc.cost;
|
||||
let success = tc.success;
|
||||
|
||||
// Run SSH in a blocking thread — heartbeat is async, SSH is fast (<1s).
|
||||
std::thread::spawn(move || {
|
||||
use std::io::Write;
|
||||
let payload = serde_json::json!({
|
||||
"node_hostname": node_hostname,
|
||||
"task_id": task_id,
|
||||
"provider": provider,
|
||||
"model": model,
|
||||
"input_tokens": input_tokens,
|
||||
"output_tokens": output_tokens,
|
||||
"cache_read_tokens": cache_read_tokens,
|
||||
"cache_write_tokens": cache_write_tokens,
|
||||
"cost_usd": cost,
|
||||
"success": success,
|
||||
"finished_at": chrono::Utc::now().to_rfc3339(),
|
||||
});
|
||||
let payload_line = serde_json::to_string(&payload).unwrap_or_default();
|
||||
let mut child = match std::process::Command::new("ssh")
|
||||
.args([
|
||||
"-o", "BatchMode=yes",
|
||||
"-o", "ConnectTimeout=5",
|
||||
&mother_host,
|
||||
"report-task-cost",
|
||||
])
|
||||
.stdin(std::process::Stdio::piped())
|
||||
.stdout(std::process::Stdio::null())
|
||||
.stderr(std::process::Stdio::piped())
|
||||
.spawn()
|
||||
{
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
warn!(error = %e, "mother cost push: SSH spawn failed");
|
||||
return;
|
||||
}
|
||||
};
|
||||
if let Some(ref mut stdin) = child.stdin {
|
||||
let _ = writeln!(stdin, "{payload_line}");
|
||||
}
|
||||
match child.wait_with_output() {
|
||||
Ok(out) if out.status.success() => {
|
||||
debug!(task_id = %task_id, "mother cost push: ok");
|
||||
}
|
||||
Ok(out) => {
|
||||
let stderr = String::from_utf8_lossy(&out.stderr);
|
||||
warn!(task_id = %task_id, stderr = %stderr.trim(), "mother cost push: failed");
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(task_id = %task_id, error = %e, "mother cost push: error");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
async fn session_rotation(state: &SharedState) {
|
||||
let cm_str = state.cost_mode.read().await.clone();
|
||||
let mut cost_mode = crate::cost::CostMode::parse(&cm_str).unwrap_or_default();
|
||||
|
|
|
|||
|
|
@ -5,9 +5,10 @@
|
|||
# command with this script and stores the original in $SSH_ORIGINAL_COMMAND.
|
||||
#
|
||||
# Allowlist:
|
||||
# "" → colibri-mcp in stdio MCP mode (persistent JSON-RPC channel)
|
||||
# "tools" → colibri-mcp tools (one-shot discovery, debugging)
|
||||
# everything else → rejected with exit 1
|
||||
# "" → colibri-mcp in stdio MCP mode (persistent JSON-RPC channel)
|
||||
# "tools" → colibri-mcp tools (one-shot discovery, debugging)
|
||||
# "report-task-cost" → read TaskCostSummary JSON from stdin, INSERT into task_costs
|
||||
# everything else → rejected with exit 1
|
||||
#
|
||||
# Why: the wrapper's job is to constrain what callers can do through the
|
||||
# SSH forced-command boundary. Without an allowlist, the caller can pass
|
||||
|
|
@ -24,6 +25,31 @@ case "${SSH_ORIGINAL_COMMAND:-}" in
|
|||
"tools")
|
||||
exec /usr/local/bin/colibri-mcp tools
|
||||
;;
|
||||
"report-task-cost")
    # Read one task-cost JSON document from the SSH session's stdin and INSERT
    # it into mother_hive.task_costs.
    # Input: {"node_id":1,"task_id":"abc","provider":"deepseek","model":"deepseek-chat",
    #         "input_tokens":150,"output_tokens":80,"cache_read_tokens":200,
    #         "cache_write_tokens":50,"cost_usd":0.0042,"success":true,
    #         "finished_at":"2026-06-27T12:00:00Z"}
    # NOTE(review): the daemon-side sender in this change emits "node_hostname"
    # rather than "node_id", so node_id is stored as NULL for those payloads —
    # confirm the intended key / lookup.
    #
    # The payload must be captured here: the heredoc below becomes psql's
    # stdin, so psql never sees the SSH stdin, and pg_read_file() runs inside
    # the PostgreSQL server process (it reads server-side files, not ours).
    # The read is capped at 64 KiB so a caller cannot make us buffer unbounded
    # input; a truncated document then fails the JSONB cast and aborts.
    payload="$(head -c 65536)"
    if [ -z "${payload}" ]; then
        printf 'report-task-cost: empty payload on stdin\n' >&2
        exit 1
    fi
    # -v binds the JSON to a psql variable; :'payload' interpolates it as a
    # correctly quoted SQL literal, so payload content cannot inject SQL.
    psql -d mother_hive -tA -v ON_ERROR_STOP=1 -v payload="${payload}" <<'PSQL'
INSERT INTO task_costs (node_id, task_id, provider, model,
    input_tokens, output_tokens, cache_read_tokens, cache_write_tokens,
    cost_usd, success, finished_at)
SELECT
    (j->>'node_id')::INTEGER,
    j->>'task_id',
    j->>'provider',
    j->>'model',
    COALESCE((j->>'input_tokens')::BIGINT, 0),
    COALESCE((j->>'output_tokens')::BIGINT, 0),
    COALESCE((j->>'cache_read_tokens')::BIGINT, 0),
    COALESCE((j->>'cache_write_tokens')::BIGINT, 0),
    COALESCE((j->>'cost_usd')::DOUBLE PRECISION, 0.0),
    COALESCE((j->>'success')::BOOLEAN, false),
    COALESCE((j->>'finished_at')::TIMESTAMPTZ, now())
FROM (SELECT :'payload'::JSONB AS j) AS _;
PSQL
    ;;
|
||||
*)
|
||||
printf '{"jsonrpc":"2.0","id":null,"error":{"code":-1,"message":"rejected: %s"}}\n' \
|
||||
"$(printf '%s' "${SSH_ORIGINAL_COMMAND}" | sed 's/"/\\"/g')" >&2
|
||||
|
|
|
|||
|
|
@ -51,6 +51,27 @@ CREATE INDEX IF NOT EXISTS idx_nodes_last_seen ON hive_nodes (last_seen DESC);
|
|||
CREATE INDEX IF NOT EXISTS idx_nodes_type ON hive_nodes (node_type);
|
||||
CREATE INDEX IF NOT EXISTS idx_nodes_cap_has_gpu ON hive_nodes ((capabilities->>'has_gpu'));
|
||||
|
||||
-- task_costs: one row per completed task, pushed by the daemon heartbeat after
-- the agent exits. Dashboard queries aggregate over these rows.
--
--   task_id   daemon-local task id
--   provider  e.g. deepseek, openrouter
--   model     e.g. deepseek-chat
CREATE TABLE IF NOT EXISTS task_costs (
    id                  BIGSERIAL         PRIMARY KEY,
    node_id             INTEGER           REFERENCES hive_nodes(id) ON DELETE SET NULL,
    task_id             TEXT              NOT NULL,
    provider            TEXT,
    model               TEXT,
    input_tokens        BIGINT            NOT NULL DEFAULT 0,
    output_tokens       BIGINT            NOT NULL DEFAULT 0,
    cache_read_tokens   BIGINT            NOT NULL DEFAULT 0,
    cache_write_tokens  BIGINT            NOT NULL DEFAULT 0,
    cost_usd            DOUBLE PRECISION  NOT NULL DEFAULT 0.0,
    success             BOOLEAN           NOT NULL DEFAULT false,
    finished_at         TIMESTAMPTZ       NOT NULL DEFAULT now(),
    reported_at         TIMESTAMPTZ       NOT NULL DEFAULT now()
);

-- Lookup paths: per node, most recent first, and per provider/model pair.
CREATE INDEX IF NOT EXISTS idx_task_costs_node     ON task_costs (node_id);
CREATE INDEX IF NOT EXISTS idx_task_costs_finished ON task_costs (finished_at DESC);
CREATE INDEX IF NOT EXISTS idx_task_costs_provider ON task_costs (provider, model);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS build_queue (
|
||||
id SERIAL PRIMARY KEY,
|
||||
node_id INTEGER REFERENCES hive_nodes(id),
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue