diff --git a/crates/colibri-daemon/src/socket.rs b/crates/colibri-daemon/src/socket.rs index 0f914a5..29042e2 100644 --- a/crates/colibri-daemon/src/socket.rs +++ b/crates/colibri-daemon/src/socket.rs @@ -438,15 +438,24 @@ pub async fn autospawn_agent_if_configured(state: &SharedState) { return; } + // Default argv depends on the harness: zot is a request/response peer + // driven over stdin (`zot rpc`), while pi self-drives (`pi --mode json`). + // `--mode json` is NOT a valid zot flag, so a single default would break one + // of them — pick per binary. Override with COLIBRI_AUTOSPAWN_ARGS. + let default_args = if agent_name == "zot" { + "rpc" + } else { + "--mode json" + }; let args: Vec = std::env::var("COLIBRI_AUTOSPAWN_ARGS") .ok() .filter(|s| !s.trim().is_empty()) - .unwrap_or_else(|| "--mode json".to_string()) + .unwrap_or_else(|| default_args.to_string()) .split_whitespace() .map(str::to_string) .collect(); - info!(binary = %agent_binary, ?args, "autospawn-pi: spawning Pi agent on host (DeepSeek-backed)"); + info!(binary = %agent_binary, ?args, "autospawn: spawning agent on host (DeepSeek-backed)"); // Collect hardware profile via clawdie-hw-probe (non-blocking). // Pass it to the spawned agent as CLAWDIE_HW_PROFILE so the agent can @@ -460,11 +469,11 @@ pub async fn autospawn_agent_if_configured(state: &SharedState) { if !stdout.is_empty() { info!( probe_bytes = output.stdout.len(), - "autospawn-pi: collected hardware profile" + "autospawn: collected hardware profile" ); extra_env.insert("CLAWDIE_HW_PROFILE".to_string(), stdout); } else { - warn!("autospawn-pi: clawdie-hw-probe returned empty stdout"); + warn!("autospawn: clawdie-hw-probe returned empty stdout"); } } Ok(output) => { @@ -472,18 +481,18 @@ pub async fn autospawn_agent_if_configured(state: &SharedState) { warn!( status = %output.status, stderr = %stderr.trim(), - "autospawn-pi: clawdie-hw-probe failed (continuing without hw profile)" + "autospawn: clawdie-hw-probe failed (continuing without hw profile)" ); } Err(e) => { warn!( error = %e, - "autospawn-pi: failed to run clawdie-hw-probe (continuing without hw profile)" + "autospawn: failed to run clawdie-hw-probe (continuing without hw profile)" ); } } } else { - debug!("autospawn-pi: clawdie-hw-probe not found at {probe_binary}; skipping hw profile"); + debug!("autospawn: clawdie-hw-probe not found at {probe_binary}; skipping hw profile"); } // provider=local → binary is the Pi executable; jail=None → host-spawn. @@ -500,11 +509,39 @@ pub async fn autospawn_agent_if_configured(state: &SharedState) { .await; if resp.ok { - info!("autospawn-pi: Pi agent spawned"); + info!("autospawn: agent spawned"); + + // An RPC agent (zot rpc) idles until it gets a prompt on stdin. Send a + // one-time bootstrap so it comes online and emits events — opt-in via + // COLIBRI_AUTOSPAWN_RPC_PROMPT so the default boot spends no tokens. + if let Some(prompt) = std::env::var("COLIBRI_AUTOSPAWN_RPC_PROMPT") + .ok() + .filter(|s| !s.trim().is_empty()) + { + // Resolve the agent id from the spawn response, then clone the + // RpcSender out and drop the registry guard before the async write. + let agent_id = resp + .data + .as_ref() + .and_then(|d| d.get("agent_id")) + .and_then(|v| v.as_str()) + .map(str::to_string); + let sender = agent_id + .as_ref() + .and_then(|id| state.agents.get(id).and_then(|e| e.value().rpc_sender())); + if let Some(sender) = sender { + match sender.send_prompt(&prompt).await { + Ok(id) => { + info!(rpc_id = %id, "autospawn: sent bootstrap prompt to rpc agent") + } + Err(e) => warn!(error = %e, "autospawn: bootstrap prompt failed"), + } + } + } } else { warn!( error = resp.error.as_deref().unwrap_or("unknown"), - "autospawn-pi: spawn failed (continuing; operator can spawn manually)" + "autospawn: spawn failed (continuing; operator can spawn manually)" ); } } @@ -559,6 +596,10 @@ async fn cmd_spawn_agent( ) }; + // An agent invoked in RPC mode (`zot rpc` / `--rpc`) blocks on stdin until + // it receives a prompt, so the spawner must pipe stdin and keep the writer. + let rpc_stdin = args.iter().any(|a| a == "rpc" || a == "--rpc"); + let agent_config = AgentSpawnConfig { binary, args, @@ -567,6 +608,7 @@ async fn cmd_spawn_agent( session_id: session_id.clone(), system_prompt, jail, + rpc_stdin, ..Default::default() }; diff --git a/crates/colibri-daemon/src/spawner.rs b/crates/colibri-daemon/src/spawner.rs index abc0db6..f9f1ab6 100644 --- a/crates/colibri-daemon/src/spawner.rs +++ b/crates/colibri-daemon/src/spawner.rs @@ -22,7 +22,8 @@ use std::time::Duration; use backon::{ExponentialBuilder, Retryable}; use serde::{Deserialize, Serialize}; use thiserror::Error; -use tokio::process::{Child, ChildStdout, Command}; +use tokio::io::AsyncWriteExt; +use tokio::process::{Child, ChildStdin, ChildStdout, Command}; use tokio::sync::{Mutex, RwLock}; use tracing::{error, info, warn}; use uuid::Uuid; @@ -303,22 +304,76 @@ async fn remove_staged_dir(path: &Path) { let _ = tokio::fs::remove_dir_all(path).await; } +/// A cloneable sender for an RPC agent's stdin. Decoupled from `AgentHandle` +/// (and thus the agent registry) so the async write does not hold a registry +/// lock. Cheap to clone (two `Arc`s). +#[derive(Clone)] +pub struct RpcSender { + stdin: Arc>>, + seq: Arc, +} + +impl RpcSender { + /// Send one prompt as a newline-delimited request on the agent's stdin. + /// Returns the request id used. Errors if the stdin pipe has closed. + pub async fn send_prompt(&self, message: &str) -> std::io::Result { + let id = (self + .seq + .fetch_add(1, std::sync::atomic::Ordering::SeqCst) + + 1) + .to_string(); + let line = build_rpc_prompt(&id, message); + let mut guard = self.stdin.lock().await; + let stdin = guard.as_mut().ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::BrokenPipe, + "rpc agent stdin is closed (or was never piped)", + ) + })?; + stdin.write_all(line.as_bytes()).await?; + stdin.flush().await?; + Ok(id) + } +} + +/// Build one newline-terminated RPC prompt request line in zot's wire format: +/// a bare JSON object `{"id","type":"prompt","message"}` (no JSON-RPC envelope), +/// matching the transcript in `docs/ZOT-RPC-TRANSCRIPT.md`. `serde_json` escapes +/// the message, so embedded quotes/newlines are safe. +fn build_rpc_prompt(id: &str, message: &str) -> String { + let obj = serde_json::json!({ + "id": id, + "type": "prompt", + "message": message, + }); + format!("{obj}\n") +} + async fn spawn_prepared_child( prepared: &PreparedSpawnCommand, agent_config: &AgentSpawnConfig, -) -> Result<(Child, Option), SpawnerError> { +) -> Result<(Child, Option, Option), SpawnerError> { let program = resolve_program(&prepared.program); info!( program = %program, requested = %prepared.program, args = ?prepared.argv, + rpc_stdin = agent_config.rpc_stdin, path = %std::env::var("PATH").unwrap_or_default(), "spawning agent subprocess" ); let mut cmd = Command::new(&program); + // An RPC agent (e.g. `zot rpc`) is a request/response peer: it blocks on + // stdin until it gets a prompt, so we pipe stdin and keep the writer. A + // self-driving agent (e.g. `pi --mode json`) gets a null stdin. + let stdin_mode = if agent_config.rpc_stdin { + Stdio::piped() + } else { + Stdio::null() + }; cmd.args(&prepared.argv) .envs(&prepared.env) - .stdin(Stdio::null()) + .stdin(stdin_mode) .stdout(Stdio::piped()) .stderr(Stdio::piped()); @@ -353,7 +408,8 @@ async fn spawn_prepared_child( } Ok(None) => { let stdout = child.stdout.take(); - Ok((child, stdout)) + let stdin = child.stdin.take(); + Ok((child, stdout, stdin)) } Err(e) => Err(SpawnerError::Io(e)), } @@ -521,6 +577,13 @@ pub struct AgentSpawnConfig { /// System prompt override. #[serde(default)] pub system_prompt: Option, + /// Drive the agent over its stdin as a request/response RPC peer + /// (e.g. `zot rpc`) instead of letting it run autonomously. When set, the + /// spawner pipes stdin and keeps the handle so the daemon can send prompts; + /// without it stdin is null and the agent must be a self-driving emitter + /// (e.g. `pi --mode json`). + #[serde(default)] + pub rpc_stdin: bool, /// Maximum retries for spawn failures. #[serde(default = "default_max_retries")] pub max_retries: u32, @@ -553,6 +616,7 @@ impl Default for AgentSpawnConfig { model: String::new(), session_id: None, system_prompt: None, + rpc_stdin: false, max_retries: default_max_retries(), startup_timeout_secs: default_startup_timeout_secs(), } @@ -585,6 +649,13 @@ pub struct AgentHandle { child: Mutex>, /// Stdout JSONL stream, taken by glasspane supervision after spawn. stdout: Mutex>, + /// Stdin writer for RPC agents (populated only when `config.rpc_stdin`). The + /// daemon writes newline-delimited RPC requests here; empty for self-driving + /// agents whose stdin is null. Arc-backed so an `RpcSender` can be cloned out + /// without holding the agent registry lock across the async write. + stdin: Arc>>, + /// Monotonic counter for RPC request ids on this agent. + rpc_seq: Arc, /// Creation timestamp. pub created_at: String, /// Optional staged jail payload directory to remove when the child exits. @@ -603,6 +674,8 @@ impl AgentHandle { status: RwLock::new(AgentStatus::Starting), child: Mutex::new(None), stdout: Mutex::new(None), + stdin: Arc::new(Mutex::new(None)), + rpc_seq: Arc::new(std::sync::atomic::AtomicU64::new(0)), created_at: chrono::Utc::now().to_rfc3339(), staging_dir, } @@ -613,6 +686,43 @@ impl AgentHandle { self.stdout.lock().await.take() } + /// Store the stdin writer for an RPC agent (called by the spawner). + pub async fn set_stdin(&self, stdin: Option) { + *self.stdin.lock().await = stdin; + } + + /// Whether this agent is driven over stdin (vs a self-driving emitter). + pub fn is_rpc(&self) -> bool { + self.config.rpc_stdin + } + + /// A cloneable handle for sending RPC prompts, decoupled from the agent + /// registry: callers clone this out, drop the registry guard, then write — + /// so the async write never holds the `DashMap` lock. `None` for + /// self-driving (non-RPC) agents. + pub fn rpc_sender(&self) -> Option { + if self.config.rpc_stdin { + Some(RpcSender { + stdin: self.stdin.clone(), + seq: self.rpc_seq.clone(), + }) + } else { + None + } + } + + /// Convenience: send one prompt to this RPC agent. Errors if non-RPC or the + /// stdin pipe has closed. + pub async fn send_rpc_prompt(&self, message: &str) -> std::io::Result { + match self.rpc_sender() { + Some(sender) => sender.send_prompt(message).await, + None => Err(std::io::Error::new( + std::io::ErrorKind::BrokenPipe, + "agent was not spawned with rpc_stdin", + )), + } + } + /// Kill the agent subprocess. /// /// For a jailed agent the tracked child is the wrapper (`mdo`/`jexec`/ @@ -800,13 +910,14 @@ impl Spawner { op.retry(backoff) .await - .map(|(child, stdout)| (handle, child, stdout)) + .map(|(child, stdout, stdin)| (handle, child, stdout, stdin)) }; match spawn_result { - Ok((handle, child, stdout)) => { + Ok((handle, child, stdout, stdin)) => { *handle.child.lock().await = Some(child); *handle.stdout.lock().await = stdout; + handle.set_stdin(stdin).await; *handle.status.write().await = AgentStatus::Running; info!(agent_id = %agent_id, provider = ?provider, "agent spawned successfully"); return Ok(handle); @@ -1081,6 +1192,54 @@ mod jail_tests { } } +#[cfg(test)] +mod rpc_prompt_tests { + use super::*; + + #[test] + fn build_rpc_prompt_shape() { + let line = build_rpc_prompt("1", "check the current directory"); + assert!(line.ends_with('\n'), "request must be newline-terminated"); + let v: serde_json::Value = serde_json::from_str(line.trim_end()).unwrap(); + assert_eq!(v["id"], "1"); + assert_eq!(v["type"], "prompt"); + assert_eq!(v["message"], "check the current directory"); + } + + #[test] + fn build_rpc_prompt_escapes_quotes_and_newlines() { + // A naive format! would produce invalid JSON here; serde must escape. + let line = build_rpc_prompt("7", "say \"hi\"\nand bye"); + let v: serde_json::Value = serde_json::from_str(line.trim_end()).unwrap(); + assert_eq!(v["message"], "say \"hi\"\nand bye"); + } + + #[test] + fn rpc_stdin_default_is_false() { + assert!(!AgentSpawnConfig::default().rpc_stdin); + } + + #[test] + fn non_rpc_agent_has_no_sender() { + let cfg = AgentSpawnConfig { + binary: "pi".to_string(), + rpc_stdin: false, + ..Default::default() + }; + assert!(AgentHandle::new(cfg).rpc_sender().is_none()); + } + + #[test] + fn rpc_agent_exposes_sender() { + let cfg = AgentSpawnConfig { + binary: "zot".to_string(), + rpc_stdin: true, + ..Default::default() + }; + assert!(AgentHandle::new(cfg).rpc_sender().is_some()); + } +} + #[cfg(test)] mod staged_env_tests { use super::*;