colibri/crates/colibri-client/tests/live_socket_check.rs
Sam & Claude daed5db908
Some checks failed
CI / rust (pull_request) Has been cancelled
CI / markdown (pull_request) Has been cancelled
CI / port (pull_request) Has been cancelled
CI / agent-jail-pkgs (pull_request) Has been cancelled
test: rename 'fake' test agent → 'sample' (lighter, less loaded)
'fake' carries a broad/negative association. Rename the test-double agent and
all its references to 'sample' (it emits a canned sample of pi-format JSONL):

- scripts/fake-pi-agent.py → scripts/sample-pi-agent.py (git mv, mode kept)
- pi_spawn_live.rs: sample-pi-agent.py, pane label 'sample-pi'
- socket.rs: fn sample_agent_stdout_stream_updates_glasspane, labels, 'pi-sample'
- live_socket_check.rs: sample_agent; colibri.rs: /tmp/sample-agent
- glasspane/spawner doc comments: 'sample JSONL readers'
- docs (ISO-SERVICE-LAYOUT, PRIORITY-HANDOFF, ISO-ACCEPTANCE-RUNBOOK)

Pure rename; no behavior change. ./scripts/ci-checks.sh green.
Stacks on #158.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-23 18:19:49 +02:00

429 lines
14 KiB
Rust

use std::{
process::Command,
sync::Arc,
time::{Duration, Instant},
};
use colibri_client::DaemonClient;
use colibri_daemon::daemon::poll_tasks;
use colibri_daemon::{socket, DaemonConfig, DaemonState, SharedState};
use colibri_glasspane::AgentState;
use uuid::Uuid;
/// Builds the [`DaemonConfig`] shared by the tests in this file.
///
/// The socket and SQLite paths are placed under a directory in the system
/// temp dir whose name ends in a fresh v4 UUID, so each call yields its own
/// set of paths. The directory itself is *not* created here. All provider API
/// keys are `None`, and prompt injection, cache warming and headroom are
/// switched off.
fn check_config() -> DaemonConfig {
    let root = std::env::temp_dir().join(format!("colibri-live-test-{}", Uuid::new_v4()));
    let socket_path = root.join("colibri.sock");
    let db_path = root.join("colibri.sqlite");

    DaemonConfig {
        socket_path,
        db_path,
        data_dir: root,
        deepseek_api_key: None,
        deepseek_endpoint: String::from("https://api.deepseek.com/chat/completions"),
        deepseek_model: String::from("deepseek-chat"),
        openrouter_api_key: None,
        openrouter_endpoint: String::from("https://openrouter.ai/api/v1/chat/completions"),
        anthropic_api_key: None,
        anthropic_endpoint: String::from("https://api.anthropic.com/v1/messages"),
        host: String::from("live-test-host"),
        max_context_tokens: 128_000,
        cost_mode: String::from("smart"),
        scheduler_prompt_injection: false,
        cache_warming_enabled: false,
        cache_warming_interval_hours: 0,
        headroom_enabled: false,
        headroom_socket_path: std::path::PathBuf::from("/var/run/colibri/headroom.sock"),
    }
}
/// Polls `client.status()` every 50 ms until it succeeds.
///
/// # Panics
///
/// Panics if a status call fails after the 5 second deadline has passed.
async fn wait_for_socket(client: &DaemonClient) {
    let give_up_at = Instant::now() + Duration::from_secs(5);
    while client.status().await.is_err() {
        assert!(
            Instant::now() < give_up_at,
            "daemon socket did not become ready"
        );
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
}
/// Runs the `colibri` binary as `colibri --socket <socket_path> <args...>` and
/// returns its stdout parsed as JSON.
///
/// The child process is run to completion on tokio's blocking pool so the
/// async runtime is not stalled while waiting for it.
///
/// # Panics
///
/// Panics if the process cannot be started, exits unsuccessfully (stdout and
/// stderr are included in the message), or prints something that is not JSON.
async fn run_colibri_cli(socket_path: &std::path::Path, args: &[&str]) -> serde_json::Value {
    // Owned copies are needed because the blocking closure must be 'static.
    let owned_socket = socket_path.to_path_buf();
    let owned_args: Vec<String> = args.iter().map(|&arg| arg.to_owned()).collect();

    let cli_task = tokio::task::spawn_blocking(move || {
        let mut command = Command::new(env!("CARGO_BIN_EXE_colibri"));
        command.arg("--socket").arg(owned_socket).args(owned_args);
        command.output().expect("run colibri CLI")
    });
    let output = cli_task.await.expect("join colibri CLI task");

    if !output.status.success() {
        panic!(
            "colibri CLI failed\nstdout:\n{}\nstderr:\n{}",
            String::from_utf8_lossy(&output.stdout),
            String::from_utf8_lossy(&output.stderr)
        );
    }
    serde_json::from_slice(&output.stdout).expect("parse colibri CLI JSON output")
}
/// Polls the glasspane snapshot every 50 ms until its *first* pane is in the
/// `expected` state, then returns that pane's id.
///
/// Only `panes.first()` is examined; any further panes are ignored.
///
/// # Panics
///
/// Panics if a snapshot request fails, or if the first pane has not reached
/// `expected` once the 8 second deadline has passed.
async fn wait_for_state(client: &DaemonClient, expected: AgentState) -> String {
    let give_up_at = Instant::now() + Duration::from_secs(8);
    loop {
        let snap = client.glasspane_snapshot().await.unwrap();
        let matching = snap.panes.first().filter(|p| p.state == expected);
        if let Some(p) = matching {
            return p.id.clone();
        }
        assert!(
            Instant::now() < give_up_at,
            "snapshot did not reach state {expected:?}"
        );
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
}
/// End-to-end check of the daemon socket API using the bundled test agent.
///
/// Serves the socket from an in-process `DaemonState`, spawns the
/// `colibri-test-agent` binary through `DaemonClient`, follows its pane
/// through Idle, Working, Blocked and Done, then kills the agent and shuts
/// the server down.
#[tokio::test]
async fn daemon_client_live_socket_check_with_local_sample_agent() {
    let config = check_config();
    tokio::fs::create_dir_all(&config.data_dir).await.unwrap();
    let sample_agent = env!("CARGO_BIN_EXE_colibri-test-agent");

    // Serve the socket API on a background task until shutdown is signalled.
    let state: SharedState = Arc::new(DaemonState::new(config.clone()));
    let shutdown = state.shutdown_rx.resubscribe();
    let server = tokio::spawn({
        let server_state = Arc::clone(&state);
        async move {
            let _ = socket::serve(server_state, shutdown).await;
        }
    });

    let client = DaemonClient::new(config.socket_path.clone());
    wait_for_socket(&client).await;

    let status = client.status().await.unwrap();
    assert_eq!(status["daemon"], "colibri-daemon");
    assert_eq!(status["host"], "live-test-host");

    // Nothing has been spawned yet, so there must be no panes.
    let before_spawn = client.glasspane_snapshot().await.unwrap();
    assert!(before_spawn.panes.is_empty());

    let requested_session = Some("test-session".to_string());
    let spawn_reply = client
        .spawn_agent("local", sample_agent, requested_session, None)
        .await
        .unwrap();
    let agent_id = spawn_reply["agent_id"].as_str().unwrap().to_owned();

    // The pane must carry the id returned by the spawn call, then walk
    // through the remaining lifecycle states in order.
    let first_pane_id = wait_for_state(&client, AgentState::Idle).await;
    assert_eq!(first_pane_id, agent_id);
    for next_state in [AgentState::Working, AgentState::Blocked, AgentState::Done] {
        wait_for_state(&client, next_state).await;
    }

    let final_snapshot = client.glasspane_snapshot().await.unwrap();
    let pane = final_snapshot
        .panes
        .iter()
        .find(|p| p.id == agent_id)
        .unwrap();
    // NOTE(review): the pane reports "manual-test" rather than the
    // "test-session" requested above — presumably the test agent's own
    // default session id; confirm against the agent binary.
    assert_eq!(pane.session_id.as_deref(), Some("manual-test"));
    assert!(pane.cwd.is_some());

    let kill = client.kill_agent(agent_id).await.unwrap();
    assert_eq!(kill["status"], "stopped");

    // Stop the server and remove the scratch directory (best effort).
    let _ = state.shutdown_tx.send(());
    server.await.unwrap();
    let _ = tokio::fs::remove_dir_all(config.data_dir).await;
}
/// Drives `create-task`, `list-tasks` and `intake-task` through the `colibri`
/// binary against an in-process daemon socket and checks the JSON each
/// command prints.
#[tokio::test]
async fn colibri_cli_task_commands_use_socket_api() {
    let config = check_config();
    tokio::fs::create_dir_all(&config.data_dir).await.unwrap();

    // Serve the socket API on a background task until shutdown is signalled.
    let state: SharedState = Arc::new(DaemonState::new(config.clone()));
    let shutdown = state.shutdown_rx.resubscribe();
    let server = tokio::spawn({
        let server_state = Arc::clone(&state);
        async move {
            let _ = socket::serve(server_state, shutdown).await;
        }
    });

    let client = DaemonClient::new(config.socket_path.clone());
    wait_for_socket(&client).await;
    let socket_path = &config.socket_path;

    // create-task: the new task is echoed back in the queued state.
    let create_args = [
        "create-task",
        "--title",
        "cli-created-task",
        "--description",
        "created through the colibri binary",
    ];
    let created = run_colibri_cli(socket_path, &create_args).await;
    assert_eq!(created["title"], "cli-created-task");
    assert_eq!(created["status"], "queued");

    // list-tasks: the task just created shows up among the queued ones.
    let queued = run_colibri_cli(socket_path, &["list-tasks", "--status", "queued"]).await;
    let listed_titles_match = queued
        .as_array()
        .unwrap()
        .iter()
        .any(|entry| entry["title"] == "cli-created-task");
    assert!(listed_titles_match);

    // intake-task: accepts both the single and the comma-separated
    // capability flags and reports the task as queued.
    let intake_args = [
        "intake-task",
        "--title",
        "cli-intake-task",
        "--description",
        "queued through the scheduler intake",
        "--capability",
        "freebsd",
        "--capabilities",
        "sqlite,scheduler",
    ];
    let intake = run_colibri_cli(socket_path, &intake_args).await;
    assert_eq!(intake["status"], "queued");

    // Stop the server and remove the scratch directory (best effort).
    let _ = state.shutdown_tx.send(());
    server.await.unwrap();
    let _ = tokio::fs::remove_dir_all(config.data_dir).await;
}
/// Checks that `poll_tasks` spawns an agent for a task that is already
/// claimed in the store.
///
/// The test registers an agent, creates and claims a task directly through
/// the store, calls `poll_tasks`, and then verifies that an agent handle and
/// a `task-<id>` session exist, that the task moved to `Started`, and that
/// glasspane eventually shows a pane in the `Done` state.
#[tokio::test]
async fn poll_tasks_spawns_agent_for_claimed_task() {
    /// Clears `COLIBRI_AGENT_BINARY` when dropped, so the variable is removed
    /// even when an assertion below panics and unwinds out of the test.
    struct AgentBinaryEnvGuard;
    impl Drop for AgentBinaryEnvGuard {
        fn drop(&mut self) {
            std::env::remove_var("COLIBRI_AGENT_BINARY");
        }
    }

    let mut config = check_config();
    let sample_agent = env!("CARGO_BIN_EXE_colibri-test-agent");

    // Use a dedicated scratch directory for this test. `check_config` already
    // derived the socket and database paths from its own directory, so they
    // are re-derived here; otherwise they would point into a directory this
    // test neither creates nor cleans up.
    config.data_dir =
        std::env::temp_dir().join(format!("colibri-poll-tasks-test-{}", Uuid::new_v4()));
    config.socket_path = config.data_dir.join("colibri.sock");
    config.db_path = config.data_dir.join("colibri.sqlite");
    tokio::fs::create_dir_all(&config.data_dir).await.unwrap();

    // Set COLIBRI_AGENT_BINARY so poll_tasks uses the test agent binary.
    // NOTE(review): the environment is process-wide, so this is visible to any
    // other test running concurrently in the same test binary.
    std::env::set_var("COLIBRI_AGENT_BINARY", sample_agent);
    let env_guard = AgentBinaryEnvGuard;

    // Serve the socket API on a background task until shutdown is signalled.
    let state: SharedState = Arc::new(DaemonState::new(config.clone()));
    let shutdown = state.shutdown_rx.resubscribe();
    let server_state = state.clone();
    let server = tokio::spawn(async move {
        let _ = socket::serve(server_state, shutdown).await;
    });
    let client = DaemonClient::new(config.socket_path.clone());
    wait_for_socket(&client).await;

    // Register an agent
    {
        let store = state.store.lock().unwrap();
        store
            .register_agent("test-agent", serde_json::json!([]))
            .unwrap();
    }

    // Create and claim a task directly in the store
    let task = {
        let store = state.store.lock().unwrap();
        let task = store.create_task("poll-tasks-check", None).unwrap();
        let agent = store.list_agents().unwrap();
        let agent_id = &agent[0].id;
        store.claim_task(&task.id, agent_id).unwrap();
        store
            .get_task(&task.id)
            .unwrap()
            .expect("task should exist")
    };
    assert_eq!(
        task.status,
        colibri_store::TaskStatus::Claimed,
        "task should be claimed before poll_tasks"
    );

    // Call poll_tasks — should spawn the test agent for this claimed task
    poll_tasks(&state).await;

    // Verify an agent handle was created
    let deadline = Instant::now() + Duration::from_secs(10);
    while state.agents.is_empty() {
        assert!(Instant::now() < deadline, "no agent spawned by poll_tasks");
        tokio::time::sleep(Duration::from_millis(50)).await;
    }

    // Task should have transitioned to Started
    let updated_task = {
        let store = state.store.lock().unwrap();
        store
            .get_task(&task.id)
            .unwrap()
            .expect("task should exist")
    };
    assert_eq!(
        updated_task.status,
        colibri_store::TaskStatus::Started,
        "task should be Started after poll_tasks spawns"
    );

    // A session should exist for this task
    assert!(
        state.sessions.contains_key(&format!("task-{}", task.id)),
        "session should be created for task"
    );

    // Glasspane should observe the full lifecycle: the test agent runs
    // through Idle → Working → Blocked → Working → Done quickly (10ms steps).
    // Wait for Done — the terminal state — which proves the whole path worked:
    // poll_tasks spawned the agent, stdout streamed to glasspane, and the
    // state machine processed the JSONL events.
    let deadline = Instant::now() + Duration::from_secs(20);
    loop {
        let snap = client.glasspane_snapshot().await.unwrap();
        if snap.panes.iter().any(|p| p.state == AgentState::Done) {
            break;
        }
        assert!(
            Instant::now() < deadline,
            "agent did not reach Done — snapshot: {:?}",
            snap.panes
                .iter()
                .map(|p| (p.id.clone(), p.state))
                .collect::<Vec<_>>()
        );
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    // Stop the server, clear the env override, and remove the scratch
    // directory (best effort).
    let _ = state.shutdown_tx.send(());
    server.await.unwrap();
    drop(env_guard);
    let _ = tokio::fs::remove_dir_all(config.data_dir).await;
}
/// Spawns two test agents over the socket and checks that each gets its own
/// pane and both reach `Done`.
///
/// The two spawns request different session ids, but the assertion below
/// expects both panes to report the *same* `session_id`; what this test
/// actually establishes as distinct is the pane ids.
#[tokio::test]
async fn harness_double_spawn_session_isolation() {
    let config = check_config();
    tokio::fs::create_dir_all(&config.data_dir).await.unwrap();
    let sample_agent = env!("CARGO_BIN_EXE_colibri-test-agent");

    // Serve the socket API on a background task until shutdown is signalled.
    let state: SharedState = Arc::new(DaemonState::new(config.clone()));
    let shutdown = state.shutdown_rx.resubscribe();
    let server = tokio::spawn({
        let server_state = Arc::clone(&state);
        async move {
            let _ = socket::serve(server_state, shutdown).await;
        }
    });

    let client = DaemonClient::new(config.socket_path.clone());
    wait_for_socket(&client).await;

    // Spawn two agents, one after the other, each with its own requested
    // session id.
    for requested_session in ["harness-session-a", "harness-session-b"] {
        client
            .spawn_agent(
                "local",
                sample_agent,
                Some(requested_session.to_string()),
                None,
            )
            .await
            .unwrap();
    }

    // Poll until at least two panes report Done (20 second budget).
    let give_up_at = Instant::now() + Duration::from_secs(20);
    loop {
        let polled = client.glasspane_snapshot().await.unwrap();
        let finished = polled
            .panes
            .iter()
            .filter(|p| p.state == AgentState::Done)
            .count();
        if finished >= 2 {
            break;
        }
        assert!(Instant::now() < give_up_at, "agents did not reach Done");
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    // Take a fresh snapshot: exactly two panes, both Done, with distinct ids.
    let snap = client.glasspane_snapshot().await.unwrap();
    assert_eq!(snap.panes.len(), 2);
    let (pane_a, pane_b) = (&snap.panes[0], &snap.panes[1]);
    assert_eq!(pane_a.state, AgentState::Done);
    assert_eq!(pane_b.state, AgentState::Done);
    assert_ne!(pane_a.id, pane_b.id);

    // Both test agents default to "manual-test" unless --session-id is passed,
    // so the reported session ids are expected to be equal even though the
    // spawn requests differed.
    assert_eq!(pane_a.session_id, pane_b.session_id);

    // Kill one agent — snapshot may still include stopped panes briefly
    let kill = client.kill_agent(&pane_a.id).await.unwrap();
    assert_eq!(kill["status"], "stopped");

    // Stop the server and remove the scratch directory (best effort).
    let _ = state.shutdown_tx.send(());
    server.await.unwrap();
    let _ = tokio::fs::remove_dir_all(config.data_dir).await;
}
/// Registers a tenant through the socket API and checks that it is returned
/// by `list_tenants`.
///
/// Retires the raw-SQLite insert in the first-proof runbook: register-tenant
/// + list-tenants now flow through the socket, returning the tenant record.
#[tokio::test]
async fn register_tenant_and_list_over_socket() {
    let config = check_config();
    tokio::fs::create_dir_all(&config.data_dir).await.unwrap();

    // Serve the socket API on a background task until shutdown is signalled.
    let state: SharedState = Arc::new(DaemonState::new(config.clone()));
    let shutdown = state.shutdown_rx.resubscribe();
    let server = tokio::spawn({
        let server_state = Arc::clone(&state);
        async move {
            let _ = socket::serve(server_state, shutdown).await;
        }
    });

    let client = DaemonClient::new(config.socket_path.clone());
    wait_for_socket(&client).await;

    // The registration reply echoes the tenant record.
    let registered = client
        .register_tenant("proof0", "/usr/local/bastille/jails/proof0/root", "proof0")
        .await
        .unwrap();
    assert_eq!(registered["tenant_id"], "proof0");
    assert_eq!(registered["status"], "provisioned");
    assert_eq!(
        registered["jail_root_path"],
        "/usr/local/bastille/jails/proof0/root"
    );

    // The same tenant must then be visible in the listing.
    let listed = client.list_tenants().await.unwrap();
    let tenants = listed.as_array().unwrap();
    let found = tenants
        .iter()
        .find(|tenant| tenant["tenant_id"] == "proof0")
        .expect("registered tenant appears in list-tenants");
    assert_eq!(found["collection_id"], "proof0");

    // Stop the server and remove the scratch directory (best effort).
    let _ = state.shutdown_tx.send(());
    server.await.unwrap();
    let _ = tokio::fs::remove_dir_all(config.data_dir).await;
}