feat(daemon): zot-rpc spawn driver + binary-aware autospawn args (#143) #157

Merged
clawdie merged 1 commit from zot-rpc-spawn-driver into main 2026-06-24 00:36:15 +02:00
2 changed files with 216 additions and 15 deletions
Showing only changes of commit 0e9b16456b - Show all commits

View file

@ -438,15 +438,24 @@ pub async fn autospawn_agent_if_configured(state: &SharedState) {
return;
}
// Default argv depends on the harness: zot is a request/response peer
// driven over stdin (`zot rpc`), while pi self-drives (`pi --mode json`).
// `--mode json` is NOT a valid zot flag, so a single default would break one
// of them — pick per binary. Override with COLIBRI_AUTOSPAWN_ARGS.
let default_args = if agent_name == "zot" {
"rpc"
} else {
"--mode json"
};
let args: Vec<String> = std::env::var("COLIBRI_AUTOSPAWN_ARGS")
.ok()
.filter(|s| !s.trim().is_empty())
.unwrap_or_else(|| "--mode json".to_string())
.unwrap_or_else(|| default_args.to_string())
.split_whitespace()
.map(str::to_string)
.collect();
info!(binary = %agent_binary, ?args, "autospawn-pi: spawning Pi agent on host (DeepSeek-backed)");
info!(binary = %agent_binary, ?args, "autospawn: spawning agent on host (DeepSeek-backed)");
// Collect hardware profile via clawdie-hw-probe (non-blocking).
// Pass it to the spawned agent as CLAWDIE_HW_PROFILE so the agent can
@ -460,11 +469,11 @@ pub async fn autospawn_agent_if_configured(state: &SharedState) {
if !stdout.is_empty() {
info!(
probe_bytes = output.stdout.len(),
"autospawn-pi: collected hardware profile"
"autospawn: collected hardware profile"
);
extra_env.insert("CLAWDIE_HW_PROFILE".to_string(), stdout);
} else {
warn!("autospawn-pi: clawdie-hw-probe returned empty stdout");
warn!("autospawn: clawdie-hw-probe returned empty stdout");
}
}
Ok(output) => {
@ -472,18 +481,18 @@ pub async fn autospawn_agent_if_configured(state: &SharedState) {
warn!(
status = %output.status,
stderr = %stderr.trim(),
"autospawn-pi: clawdie-hw-probe failed (continuing without hw profile)"
"autospawn: clawdie-hw-probe failed (continuing without hw profile)"
);
}
Err(e) => {
warn!(
error = %e,
"autospawn-pi: failed to run clawdie-hw-probe (continuing without hw profile)"
"autospawn: failed to run clawdie-hw-probe (continuing without hw profile)"
);
}
}
} else {
debug!("autospawn-pi: clawdie-hw-probe not found at {probe_binary}; skipping hw profile");
debug!("autospawn: clawdie-hw-probe not found at {probe_binary}; skipping hw profile");
}
// provider=local → binary is the Pi executable; jail=None → host-spawn.
@ -500,11 +509,39 @@ pub async fn autospawn_agent_if_configured(state: &SharedState) {
.await;
if resp.ok {
info!("autospawn-pi: Pi agent spawned");
info!("autospawn: agent spawned");
// An RPC agent (zot rpc) idles until it gets a prompt on stdin. Send a
// one-time bootstrap so it comes online and emits events — opt-in via
// COLIBRI_AUTOSPAWN_RPC_PROMPT so the default boot spends no tokens.
if let Some(prompt) = std::env::var("COLIBRI_AUTOSPAWN_RPC_PROMPT")
.ok()
.filter(|s| !s.trim().is_empty())
{
// Resolve the agent id from the spawn response, then clone the
// RpcSender out and drop the registry guard before the async write.
let agent_id = resp
.data
.as_ref()
.and_then(|d| d.get("agent_id"))
.and_then(|v| v.as_str())
.map(str::to_string);
let sender = agent_id
.as_ref()
.and_then(|id| state.agents.get(id).and_then(|e| e.value().rpc_sender()));
if let Some(sender) = sender {
match sender.send_prompt(&prompt).await {
Ok(id) => {
info!(rpc_id = %id, "autospawn: sent bootstrap prompt to rpc agent")
}
Err(e) => warn!(error = %e, "autospawn: bootstrap prompt failed"),
}
}
}
} else {
warn!(
error = resp.error.as_deref().unwrap_or("unknown"),
"autospawn-pi: spawn failed (continuing; operator can spawn manually)"
"autospawn: spawn failed (continuing; operator can spawn manually)"
);
}
}
@ -559,6 +596,10 @@ async fn cmd_spawn_agent(
)
};
// An agent invoked in RPC mode (`zot rpc` / `--rpc`) blocks on stdin until
// it receives a prompt, so the spawner must pipe stdin and keep the writer.
let rpc_stdin = args.iter().any(|a| a == "rpc" || a == "--rpc");
let agent_config = AgentSpawnConfig {
binary,
args,
@ -567,6 +608,7 @@ async fn cmd_spawn_agent(
session_id: session_id.clone(),
system_prompt,
jail,
rpc_stdin,
..Default::default()
};

View file

@ -22,7 +22,8 @@ use std::time::Duration;
use backon::{ExponentialBuilder, Retryable};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::process::{Child, ChildStdout, Command};
use tokio::io::AsyncWriteExt;
use tokio::process::{Child, ChildStdin, ChildStdout, Command};
use tokio::sync::{Mutex, RwLock};
use tracing::{error, info, warn};
use uuid::Uuid;
@ -303,22 +304,76 @@ async fn remove_staged_dir(path: &Path) {
let _ = tokio::fs::remove_dir_all(path).await;
}
/// A cloneable sender for an RPC agent's stdin. Decoupled from `AgentHandle`
/// (and thus the agent registry) so the async write does not hold a registry
/// lock. Cheap to clone (two `Arc`s).
#[derive(Clone)]
pub struct RpcSender {
stdin: Arc<Mutex<Option<ChildStdin>>>,
seq: Arc<std::sync::atomic::AtomicU64>,
}
impl RpcSender {
/// Send one prompt as a newline-delimited request on the agent's stdin.
/// Returns the request id used. Errors if the stdin pipe has closed.
pub async fn send_prompt(&self, message: &str) -> std::io::Result<String> {
let id = (self
.seq
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
+ 1)
.to_string();
let line = build_rpc_prompt(&id, message);
let mut guard = self.stdin.lock().await;
let stdin = guard.as_mut().ok_or_else(|| {
std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"rpc agent stdin is closed (or was never piped)",
)
})?;
stdin.write_all(line.as_bytes()).await?;
stdin.flush().await?;
Ok(id)
}
}
/// Build one newline-terminated RPC prompt request line in zot's wire format:
/// a bare JSON object `{"id","type":"prompt","message"}` (no JSON-RPC envelope),
/// matching the transcript in `docs/ZOT-RPC-TRANSCRIPT.md`. `serde_json` escapes
/// the message, so embedded quotes/newlines are safe.
fn build_rpc_prompt(id: &str, message: &str) -> String {
let obj = serde_json::json!({
"id": id,
"type": "prompt",
"message": message,
});
format!("{obj}\n")
}
async fn spawn_prepared_child(
prepared: &PreparedSpawnCommand,
agent_config: &AgentSpawnConfig,
) -> Result<(Child, Option<ChildStdout>), SpawnerError> {
) -> Result<(Child, Option<ChildStdout>, Option<ChildStdin>), SpawnerError> {
let program = resolve_program(&prepared.program);
info!(
program = %program,
requested = %prepared.program,
args = ?prepared.argv,
rpc_stdin = agent_config.rpc_stdin,
path = %std::env::var("PATH").unwrap_or_default(),
"spawning agent subprocess"
);
let mut cmd = Command::new(&program);
// An RPC agent (e.g. `zot rpc`) is a request/response peer: it blocks on
// stdin until it gets a prompt, so we pipe stdin and keep the writer. A
// self-driving agent (e.g. `pi --mode json`) gets a null stdin.
let stdin_mode = if agent_config.rpc_stdin {
Stdio::piped()
} else {
Stdio::null()
};
cmd.args(&prepared.argv)
.envs(&prepared.env)
.stdin(Stdio::null())
.stdin(stdin_mode)
.stdout(Stdio::piped())
.stderr(Stdio::piped());
@ -353,7 +408,8 @@ async fn spawn_prepared_child(
}
Ok(None) => {
let stdout = child.stdout.take();
Ok((child, stdout))
let stdin = child.stdin.take();
Ok((child, stdout, stdin))
}
Err(e) => Err(SpawnerError::Io(e)),
}
@ -521,6 +577,13 @@ pub struct AgentSpawnConfig {
/// System prompt override.
#[serde(default)]
pub system_prompt: Option<String>,
/// Drive the agent over its stdin as a request/response RPC peer
/// (e.g. `zot rpc`) instead of letting it run autonomously. When set, the
/// spawner pipes stdin and keeps the handle so the daemon can send prompts;
/// without it stdin is null and the agent must be a self-driving emitter
/// (e.g. `pi --mode json`).
#[serde(default)]
pub rpc_stdin: bool,
/// Maximum retries for spawn failures.
#[serde(default = "default_max_retries")]
pub max_retries: u32,
@ -553,6 +616,7 @@ impl Default for AgentSpawnConfig {
model: String::new(),
session_id: None,
system_prompt: None,
rpc_stdin: false,
max_retries: default_max_retries(),
startup_timeout_secs: default_startup_timeout_secs(),
}
@ -585,6 +649,13 @@ pub struct AgentHandle {
child: Mutex<Option<Child>>,
/// Stdout JSONL stream, taken by glasspane supervision after spawn.
stdout: Mutex<Option<ChildStdout>>,
/// Stdin writer for RPC agents (populated only when `config.rpc_stdin`). The
/// daemon writes newline-delimited RPC requests here; empty for self-driving
/// agents whose stdin is null. Arc-backed so an `RpcSender` can be cloned out
/// without holding the agent registry lock across the async write.
stdin: Arc<Mutex<Option<ChildStdin>>>,
/// Monotonic counter for RPC request ids on this agent.
rpc_seq: Arc<std::sync::atomic::AtomicU64>,
/// Creation timestamp.
pub created_at: String,
/// Optional staged jail payload directory to remove when the child exits.
@ -603,6 +674,8 @@ impl AgentHandle {
status: RwLock::new(AgentStatus::Starting),
child: Mutex::new(None),
stdout: Mutex::new(None),
stdin: Arc::new(Mutex::new(None)),
rpc_seq: Arc::new(std::sync::atomic::AtomicU64::new(0)),
created_at: chrono::Utc::now().to_rfc3339(),
staging_dir,
}
@ -613,6 +686,43 @@ impl AgentHandle {
self.stdout.lock().await.take()
}
/// Store the stdin writer for an RPC agent (called by the spawner).
pub async fn set_stdin(&self, stdin: Option<ChildStdin>) {
*self.stdin.lock().await = stdin;
}
/// Whether this agent is driven over stdin (vs a self-driving emitter).
pub fn is_rpc(&self) -> bool {
self.config.rpc_stdin
}
/// A cloneable handle for sending RPC prompts, decoupled from the agent
/// registry: callers clone this out, drop the registry guard, then write —
/// so the async write never holds the `DashMap` lock. `None` for
/// self-driving (non-RPC) agents.
pub fn rpc_sender(&self) -> Option<RpcSender> {
if self.config.rpc_stdin {
Some(RpcSender {
stdin: self.stdin.clone(),
seq: self.rpc_seq.clone(),
})
} else {
None
}
}
/// Convenience: send one prompt to this RPC agent. Errors if non-RPC or the
/// stdin pipe has closed.
pub async fn send_rpc_prompt(&self, message: &str) -> std::io::Result<String> {
match self.rpc_sender() {
Some(sender) => sender.send_prompt(message).await,
None => Err(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"agent was not spawned with rpc_stdin",
)),
}
}
/// Kill the agent subprocess.
///
/// For a jailed agent the tracked child is the wrapper (`mdo`/`jexec`/
@ -800,13 +910,14 @@ impl Spawner {
op.retry(backoff)
.await
.map(|(child, stdout)| (handle, child, stdout))
.map(|(child, stdout, stdin)| (handle, child, stdout, stdin))
};
match spawn_result {
Ok((handle, child, stdout)) => {
Ok((handle, child, stdout, stdin)) => {
*handle.child.lock().await = Some(child);
*handle.stdout.lock().await = stdout;
handle.set_stdin(stdin).await;
*handle.status.write().await = AgentStatus::Running;
info!(agent_id = %agent_id, provider = ?provider, "agent spawned successfully");
return Ok(handle);
@ -1081,6 +1192,54 @@ mod jail_tests {
}
}
#[cfg(test)]
mod rpc_prompt_tests {
use super::*;
#[test]
fn build_rpc_prompt_shape() {
let line = build_rpc_prompt("1", "check the current directory");
assert!(line.ends_with('\n'), "request must be newline-terminated");
let v: serde_json::Value = serde_json::from_str(line.trim_end()).unwrap();
assert_eq!(v["id"], "1");
assert_eq!(v["type"], "prompt");
assert_eq!(v["message"], "check the current directory");
}
#[test]
fn build_rpc_prompt_escapes_quotes_and_newlines() {
// A naive format! would produce invalid JSON here; serde must escape.
let line = build_rpc_prompt("7", "say \"hi\"\nand bye");
let v: serde_json::Value = serde_json::from_str(line.trim_end()).unwrap();
assert_eq!(v["message"], "say \"hi\"\nand bye");
}
#[test]
fn rpc_stdin_default_is_false() {
assert!(!AgentSpawnConfig::default().rpc_stdin);
}
#[test]
fn non_rpc_agent_has_no_sender() {
let cfg = AgentSpawnConfig {
binary: "pi".to_string(),
rpc_stdin: false,
..Default::default()
};
assert!(AgentHandle::new(cfg).rpc_sender().is_none());
}
#[test]
fn rpc_agent_exposes_sender() {
let cfg = AgentSpawnConfig {
binary: "zot".to_string(),
rpc_stdin: true,
..Default::default()
};
assert!(AgentHandle::new(cfg).rpc_sender().is_some());
}
}
#[cfg(test)]
mod staged_env_tests {
use super::*;