colibri/crates/colibri-daemon/src/main.rs
Sam & Claude c3e91189f3 refactor: rename COLIBRI_AUTOSPAWN_PI → COLIBRI_AUTOSPAWN
Harness-neutral naming throughout the daemon:
- COLIBRI_AUTOSPAWN_PI    → COLIBRI_AUTOSPAWN
- COLIBRI_PI_BINARY       → COLIBRI_AUTOSPAWN_BINARY  (default: zot)
- COLIBRI_AUTOSPAWN_PI_ARGS → COLIBRI_AUTOSPAWN_ARGS
- Function: autospawn_pi_if_configured → autospawn_agent_if_configured
- Log prefix: autospawn-pi → autospawn
- Variable: pi_binary → agent_binary, pi_name → agent_name
2026-06-23 11:13:58 +02:00

155 lines
5.7 KiB
Rust

//! colibri-daemon — always-on Rust service for the Colibri control plane.
//!
//! Starts:
//! - Session manager (JSONL persistence + context window management)
//! - Agent spawner (subprocess lifecycle + provider routing)
//! - Colibri control-plane socket API
//!
//! Graceful shutdown on SIGTERM/SIGINT (or Ctrl+C).
use std::sync::Arc;
use colibri_daemon::{daemon, session, socket, DaemonConfig, DaemonState, SharedState};
use tracing::info;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialise tracing
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.init();
info!("colibri-daemon starting up");
// Load configuration
let config = DaemonConfig::from_env();
info!(
host = %config.host,
data_dir = %config.data_dir.display(),
socket_path = %config.socket_path.display(),
cost_mode = %config.cost_mode,
max_context_tokens = config.max_context_tokens,
"configuration loaded"
);
// Build shared state
let state: SharedState = Arc::new(DaemonState::new(config.clone()));
// Ensure data directories exist
tokio::fs::create_dir_all(&config.data_dir).await?;
tokio::fs::create_dir_all(config.data_dir.join("sessions")).await?;
// Bootstrap: load any existing sessions
if let Ok(mut entries) = tokio::fs::read_dir(config.data_dir.join("sessions")).await {
while let Ok(Some(entry)) = entries.next_entry().await {
let path = entry.path();
if path.extension().and_then(|e| e.to_str()) == Some("jsonl") {
if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
let session_id = stem.to_string();
let cfg = Arc::new(config.clone());
match session::Session::load(session_id.clone(), cfg) {
Ok(sess) => {
info!(session_id = %session_id, "loaded existing session");
state.sessions.insert(session_id, sess);
}
Err(e) => {
tracing::warn!(
session_id = %session_id,
error = %e,
"failed to load session file"
);
}
}
}
}
}
}
// Start the Colibri control-plane socket server
let socket_state = state.clone();
let socket_shutdown = state.shutdown_rx.resubscribe();
let socket_handle =
tokio::spawn(async move { socket::serve(socket_state, socket_shutdown).await });
// Auto-spawn one Pi agent if configured (live "Operator Image" OOTB flow).
// Runs after the socket server is up so the spawn registers on the board;
// no-op unless COLIBRI_AUTOSPAWN is set and a DeepSeek key is present.
socket::autospawn_agent_if_configured(&state).await;
// Start the daemon background loop (heartbeat, session rotation, scheduler)
let loop_state = state.clone();
let loop_shutdown = state.shutdown_rx.resubscribe();
let loop_config = daemon::DaemonLoopConfig::default();
let loop_handle = tokio::spawn(async move {
daemon::run_loop(loop_state, loop_config, loop_shutdown).await;
});
// Listen for shutdown signals
let shutdown_state = state.clone();
tokio::spawn(async move {
wait_for_shutdown_signal().await;
info!("received shutdown signal, initiating graceful shutdown");
let _ = shutdown_state.shutdown_tx.send(());
// Give sub-tasks a moment to clean up
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
// Kill any running agent subprocesses
for entry in shutdown_state.agents.iter() {
let handle = entry.value();
info!(agent_id = %handle.id, "killing agent");
let _ = handle.kill().await;
}
});
// Wait for both the socket server and the daemon loop to finish
let (socket_result, loop_result) = tokio::join!(socket_handle, loop_handle);
socket_result??;
loop_result?;
info!("colibri-daemon shut down cleanly");
Ok(())
}
/// Wait for a process shutdown signal.
///
/// On Unix this resolves on **either** SIGTERM (what `service stop`/`restart`
/// sends — daemon(8) forwards it to this child) or SIGINT (Ctrl+C). Handling
/// SIGTERM is what lets the graceful path run on a normal `service stop`: the
/// socket is removed (see `socket::serve`) and spawned agents are reaped. Awaiting
/// only `ctrl_c()` (SIGINT) would leave SIGTERM on its default disposition —
/// immediate kill with no cleanup, leaking the socket file and child agents.
#[cfg(unix)]
async fn wait_for_shutdown_signal() {
use tokio::signal::unix::{signal, SignalKind};
let mut sigterm = match signal(SignalKind::terminate()) {
Ok(s) => s,
Err(e) => {
tracing::error!(error = %e, "failed to install SIGTERM handler; falling back to SIGINT only");
tokio::signal::ctrl_c().await.ok();
return;
}
};
let mut sigint = match signal(SignalKind::interrupt()) {
Ok(s) => s,
Err(e) => {
tracing::error!(error = %e, "failed to install SIGINT handler; waiting on SIGTERM only");
sigterm.recv().await;
return;
}
};
tokio::select! {
_ = sigterm.recv() => info!("received SIGTERM"),
_ = sigint.recv() => info!("received SIGINT"),
}
}
#[cfg(not(unix))]
async fn wait_for_shutdown_signal() {
tokio::signal::ctrl_c().await.ok();
}