colibri/crates/colibri-client/tests/live_socket_check.rs
Sam & Claude 06601a09c4
Some checks failed
CI / rust (pull_request) Has been cancelled
CI / markdown (pull_request) Has been cancelled
CI / port (pull_request) Has been cancelled
CI / agent-jail-pkgs (pull_request) Has been cancelled
refactor: clear pi-era residue from the harness-neutral agent path
Sweep for stale naming/defaults left over from the pi-only era (the same
class of bug as the pi_binary compile break):

- socket.rs: non-local spawn defaulted the binary to `hermes-agent` (a binary
  that does not exist) and hardcoded `--mode json` (invalid for zot) — a
  reachable latent bug via `colibri spawn-agent <provider>`. Default to zot and
  derive argv from a single default_agent_args() helper, shared with autospawn
  (removes the duplicated zot-vs-pi arg logic).
- glasspane/tui/client/daemon: rename the stale wire field `pi_session_id` →
  `session_id` (zot agents have session ids too). `#[serde(alias =
  "pi_session_id")]` keeps deserializing legacy/persisted snapshots.
- contracts + runtime_inventory: record `zot` version alongside `pi` for
  complete agent provenance (detect_zot_version()).
- Harness-neutral stale comments ("Pi agent", "hermes-agent" example,
  COLIBRI_PI_BINARY in a test doc).
- cmd_spawn_agent: #[allow(clippy::too_many_arguments)] — this lint was already
  failing `clippy -D warnings`, i.e. the CI gate was red on main and thus
  unenforceable. The gate (scripts/ci-checks.sh) now passes green.
- AGENTS.md: document that ci-checks.sh passing is mandatory before merge while
  no Actions runner enforces .forgejo/workflows/ci.yml.

Validated: ./scripts/ci-checks.sh green (fmt, clippy -D warnings, tests, md).

Stacks on #157 (zot-rpc driver) and #156 (0.12 build fix).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-23 18:04:45 +02:00

424 lines
14 KiB
Rust

use std::{
process::Command,
sync::Arc,
time::{Duration, Instant},
};
use colibri_client::DaemonClient;
use colibri_daemon::daemon::poll_tasks;
use colibri_daemon::{socket, DaemonConfig, DaemonState, SharedState};
use colibri_glasspane::AgentState;
use uuid::Uuid;
fn check_config() -> DaemonConfig {
let data_dir = std::env::temp_dir().join(format!("colibri-live-test-{}", Uuid::new_v4()));
DaemonConfig {
socket_path: data_dir.join("colibri.sock"),
data_dir: data_dir.clone(),
db_path: data_dir.join("colibri.sqlite"),
deepseek_api_key: None,
deepseek_endpoint: "https://api.deepseek.com/chat/completions".to_string(),
deepseek_model: "deepseek-chat".to_string(),
openrouter_api_key: None,
openrouter_endpoint: "https://openrouter.ai/api/v1/chat/completions".to_string(),
anthropic_api_key: None,
anthropic_endpoint: "https://api.anthropic.com/v1/messages".to_string(),
host: "live-test-host".to_string(),
max_context_tokens: 128_000,
cost_mode: "smart".to_string(),
scheduler_prompt_injection: false,
cache_warming_enabled: false,
cache_warming_interval_hours: 0,
headroom_enabled: false,
headroom_socket_path: std::path::PathBuf::from("/var/run/colibri/headroom.sock"),
}
}
async fn wait_for_socket(client: &DaemonClient) {
let deadline = Instant::now() + Duration::from_secs(5);
loop {
if client.status().await.is_ok() {
return;
}
assert!(
Instant::now() < deadline,
"daemon socket did not become ready"
);
tokio::time::sleep(Duration::from_millis(50)).await;
}
}
async fn run_colibri_cli(socket_path: &std::path::Path, args: &[&str]) -> serde_json::Value {
let bin = env!("CARGO_BIN_EXE_colibri");
let socket_path = socket_path.to_path_buf();
let args: Vec<String> = args.iter().map(|arg| arg.to_string()).collect();
let output = tokio::task::spawn_blocking(move || {
Command::new(bin)
.arg("--socket")
.arg(socket_path)
.args(args)
.output()
.expect("run colibri CLI")
})
.await
.expect("join colibri CLI task");
assert!(
output.status.success(),
"colibri CLI failed\nstdout:\n{}\nstderr:\n{}",
String::from_utf8_lossy(&output.stdout),
String::from_utf8_lossy(&output.stderr)
);
serde_json::from_slice(&output.stdout).expect("parse colibri CLI JSON output")
}
async fn wait_for_state(client: &DaemonClient, expected: AgentState) -> String {
let deadline = Instant::now() + Duration::from_secs(8);
loop {
let snapshot = client.glasspane_snapshot().await.unwrap();
if let Some(pane) = snapshot.panes.first() {
if pane.state == expected {
return pane.id.clone();
}
}
assert!(
Instant::now() < deadline,
"snapshot did not reach state {expected:?}"
);
tokio::time::sleep(Duration::from_millis(50)).await;
}
}
#[tokio::test]
async fn daemon_client_live_socket_check_with_local_fake_agent() {
let config = check_config();
tokio::fs::create_dir_all(&config.data_dir).await.unwrap();
let fake_agent = env!("CARGO_BIN_EXE_colibri-test-agent");
let state: SharedState = Arc::new(DaemonState::new(config.clone()));
let shutdown = state.shutdown_rx.resubscribe();
let server_state = state.clone();
let server = tokio::spawn(async move {
let _ = socket::serve(server_state, shutdown).await;
});
let client = DaemonClient::new(config.socket_path.clone());
wait_for_socket(&client).await;
let status = client.status().await.unwrap();
assert_eq!(status["daemon"], "colibri-daemon");
assert_eq!(status["host"], "live-test-host");
let empty_snapshot = client.glasspane_snapshot().await.unwrap();
assert!(empty_snapshot.panes.is_empty());
let spawn = client
.spawn_agent("local", fake_agent, Some("test-session".to_string()), None)
.await
.unwrap();
let agent_id = spawn["agent_id"].as_str().unwrap().to_string();
let pane_id = wait_for_state(&client, AgentState::Idle).await;
assert_eq!(pane_id, agent_id);
wait_for_state(&client, AgentState::Working).await;
wait_for_state(&client, AgentState::Blocked).await;
wait_for_state(&client, AgentState::Done).await;
let snapshot = client.glasspane_snapshot().await.unwrap();
let pane = snapshot
.panes
.iter()
.find(|pane| pane.id == agent_id)
.unwrap();
assert_eq!(pane.session_id.as_deref(), Some("manual-test"));
assert!(pane.cwd.is_some());
let kill = client.kill_agent(agent_id).await.unwrap();
assert_eq!(kill["status"], "stopped");
let _ = state.shutdown_tx.send(());
server.await.unwrap();
let _ = tokio::fs::remove_dir_all(config.data_dir).await;
}
#[tokio::test]
async fn colibri_cli_task_commands_use_socket_api() {
let config = check_config();
tokio::fs::create_dir_all(&config.data_dir).await.unwrap();
let state: SharedState = Arc::new(DaemonState::new(config.clone()));
let shutdown = state.shutdown_rx.resubscribe();
let server_state = state.clone();
let server = tokio::spawn(async move {
let _ = socket::serve(server_state, shutdown).await;
});
let client = DaemonClient::new(config.socket_path.clone());
wait_for_socket(&client).await;
let created = run_colibri_cli(
&config.socket_path,
&[
"create-task",
"--title",
"cli-created-task",
"--description",
"created through the colibri binary",
],
)
.await;
assert_eq!(created["title"], "cli-created-task");
assert_eq!(created["status"], "queued");
let tasks = run_colibri_cli(&config.socket_path, &["list-tasks", "--status", "queued"]).await;
assert!(tasks
.as_array()
.unwrap()
.iter()
.any(|task| task["title"] == "cli-created-task"));
let intake = run_colibri_cli(
&config.socket_path,
&[
"intake-task",
"--title",
"cli-intake-task",
"--description",
"queued through the scheduler intake",
"--capability",
"freebsd",
"--capabilities",
"sqlite,scheduler",
],
)
.await;
assert_eq!(intake["status"], "queued");
let _ = state.shutdown_tx.send(());
server.await.unwrap();
let _ = tokio::fs::remove_dir_all(config.data_dir).await;
}
#[tokio::test]
async fn poll_tasks_spawns_agent_for_claimed_task() {
let mut config = check_config();
let fake_agent = env!("CARGO_BIN_EXE_colibri-test-agent");
// Set COLIBRI_AGENT_BINARY so poll_tasks uses the test agent binary
config.data_dir =
std::env::temp_dir().join(format!("colibri-poll-tasks-test-{}", Uuid::new_v4()));
tokio::fs::create_dir_all(&config.data_dir).await.unwrap();
std::env::set_var("COLIBRI_AGENT_BINARY", fake_agent);
let state: SharedState = Arc::new(DaemonState::new(config.clone()));
let shutdown = state.shutdown_rx.resubscribe();
let server_state = state.clone();
let server = tokio::spawn(async move {
let _ = socket::serve(server_state, shutdown).await;
});
let client = DaemonClient::new(config.socket_path.clone());
wait_for_socket(&client).await;
// Register an agent
{
let store = state.store.lock().unwrap();
store
.register_agent("test-agent", serde_json::json!([]))
.unwrap();
}
// Create and claim a task directly in the store
let task = {
let store = state.store.lock().unwrap();
let task = store.create_task("poll-tasks-check", None).unwrap();
let agent = store.list_agents().unwrap();
let agent_id = &agent[0].id;
store.claim_task(&task.id, agent_id).unwrap();
store
.get_task(&task.id)
.unwrap()
.expect("task should exist")
};
assert_eq!(
task.status,
colibri_store::TaskStatus::Claimed,
"task should be claimed before poll_tasks"
);
// Call poll_tasks — should spawn the test agent for this claimed task
poll_tasks(&state).await;
// Verify an agent handle was created
let deadline = Instant::now() + Duration::from_secs(10);
loop {
if !state.agents.is_empty() {
break;
}
assert!(Instant::now() < deadline, "no agent spawned by poll_tasks");
tokio::time::sleep(Duration::from_millis(50)).await;
}
// Task should have transitioned to Started
let updated_task = {
let store = state.store.lock().unwrap();
store
.get_task(&task.id)
.unwrap()
.expect("task should exist")
};
assert_eq!(
updated_task.status,
colibri_store::TaskStatus::Started,
"task should be Started after poll_tasks spawns"
);
// A session should exist for this task
assert!(
state.sessions.contains_key(&format!("task-{}", task.id)),
"session should be created for task"
);
// Glasspane should observe the full lifecycle: the test agent runs
// through Idle → Working → Blocked → Working → Done quickly (10ms steps).
// Wait for Done — the terminal state — which proves the whole path worked:
// poll_tasks spawned the agent, stdout streamed to glasspane, and the
// state machine processed the JSONL events.
let deadline = Instant::now() + Duration::from_secs(20);
loop {
let snap = client.glasspane_snapshot().await.unwrap();
if snap.panes.iter().any(|p| p.state == AgentState::Done) {
break;
}
assert!(
Instant::now() < deadline,
"agent did not reach Done — snapshot: {:?}",
snap.panes
.iter()
.map(|p| (p.id.clone(), p.state))
.collect::<Vec<_>>()
);
tokio::time::sleep(Duration::from_millis(100)).await;
}
let _ = state.shutdown_tx.send(());
server.await.unwrap();
std::env::remove_var("COLIBRI_AGENT_BINARY");
let _ = tokio::fs::remove_dir_all(config.data_dir).await;
}
#[tokio::test]
async fn harness_double_spawn_session_isolation() {
let config = check_config();
tokio::fs::create_dir_all(&config.data_dir).await.unwrap();
let fake_agent = env!("CARGO_BIN_EXE_colibri-test-agent");
let state: SharedState = Arc::new(DaemonState::new(config.clone()));
let shutdown = state.shutdown_rx.resubscribe();
let server_state = state.clone();
let server = tokio::spawn(async move {
let _ = socket::serve(server_state, shutdown).await;
});
let client = DaemonClient::new(config.socket_path.clone());
wait_for_socket(&client).await;
// Spawn two agents with different session IDs
client
.spawn_agent(
"local",
fake_agent,
Some("harness-session-a".to_string()),
None,
)
.await
.unwrap();
client
.spawn_agent(
"local",
fake_agent,
Some("harness-session-b".to_string()),
None,
)
.await
.unwrap();
// Wait for both to reach Done
let deadline = Instant::now() + Duration::from_secs(20);
loop {
let snap = client.glasspane_snapshot().await.unwrap();
let done_count = snap
.panes
.iter()
.filter(|p| p.state == AgentState::Done)
.count();
if done_count >= 2 {
break;
}
assert!(Instant::now() < deadline, "agents did not reach Done");
tokio::time::sleep(Duration::from_millis(100)).await;
}
// Verify both agents reached Done
let snap = client.glasspane_snapshot().await.unwrap();
assert_eq!(snap.panes.len(), 2);
// Both test agents default to "manual-test" unless --session-id is passed
let pane_a = &snap.panes[0];
let pane_b = &snap.panes[1];
assert_eq!(pane_a.state, AgentState::Done);
assert_eq!(pane_b.state, AgentState::Done);
assert_ne!(pane_a.id, pane_b.id);
// Verify session isolation: both share the same session_id (test agent default)
assert_eq!(pane_a.session_id, pane_b.session_id);
// Kill one agent — snapshot may still include stopped panes briefly
let kill = client.kill_agent(&pane_a.id).await.unwrap();
assert_eq!(kill["status"], "stopped");
let _ = state.shutdown_tx.send(());
server.await.unwrap();
let _ = tokio::fs::remove_dir_all(config.data_dir).await;
}
#[tokio::test]
async fn register_tenant_and_list_over_socket() {
// Retires the raw-SQLite insert in the first-proof runbook: register-tenant
// + list-tenants now flow through the socket, returning the tenant record.
let config = check_config();
tokio::fs::create_dir_all(&config.data_dir).await.unwrap();
let state: SharedState = Arc::new(DaemonState::new(config.clone()));
let shutdown = state.shutdown_rx.resubscribe();
let server_state = state.clone();
let server = tokio::spawn(async move {
let _ = socket::serve(server_state, shutdown).await;
});
let client = DaemonClient::new(config.socket_path.clone());
wait_for_socket(&client).await;
let registered = client
.register_tenant("proof0", "/usr/local/bastille/jails/proof0/root", "proof0")
.await
.unwrap();
assert_eq!(registered["tenant_id"], "proof0");
assert_eq!(registered["status"], "provisioned");
assert_eq!(
registered["jail_root_path"],
"/usr/local/bastille/jails/proof0/root"
);
let listed = client.list_tenants().await.unwrap();
let found = listed
.as_array()
.unwrap()
.iter()
.find(|t| t["tenant_id"] == "proof0")
.expect("registered tenant appears in list-tenants");
assert_eq!(found["collection_id"], "proof0");
let _ = state.shutdown_tx.send(());
server.await.unwrap();
let _ = tokio::fs::remove_dir_all(config.data_dir).await;
}