Scheduler Module Integration Pattern
How the scheduler was added to colibri-daemon (2026-05-27) as part of T1.3
(Phase 4 — execution & scheduling).
Module Structure
crates/colibri-daemon/src/scheduler.rs
— Schedule enum (once, interval, cron)
— ScheduledJob struct
— TaskRequest struct (intake)
— Scheduler (job table + intake queue)
— leader/delegate: capability_match_score, pick_agent
— cron_matches (simplified 5-field matcher)
Integration Steps
Step 1: Create the module
// crates/colibri-daemon/src/scheduler.rs
pub struct Scheduler {
pub jobs: Vec<ScheduledJob>,
pub intake_queue: Vec<TaskRequest>,
}
impl Default for Scheduler { ... }
Step 2: Register in lib.rs
pub mod scheduler;
Step 3: Add to DaemonState
pub scheduler: Mutex<Scheduler>,
Uses tokio::sync::Mutex (async-aware), not std::sync::Mutex.
Step 4: Add scheduler_interval to DaemonLoopConfig
pub scheduler_interval: Duration, // default: 30s
Step 5: Add 4th tick to run_loop
let mut scheduler_tick = tokio::time::interval(loop_config.scheduler_interval);
scheduler_tick.tick().await; // skip initial burst
loop {
tokio::select! {
// ... existing ticks ...
_ = scheduler_tick.tick() => {
scheduler_tick_fn(&state).await;
}
}
}
Step 6: Add socket commands
Extend HerdrCommand with IntakeTask variant:
#[serde(rename = "intake-task")]
IntakeTask { title, description, capabilities }
Handler submits to scheduler intake queue:
scheduler.submit(TaskRequest { title, description, required_capabilities: capabilities });
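The intake side of the handler above can be sketched as follows. This is a minimal illustration, not the daemon's actual code: the field types on TaskRequest are assumptions, the jobs table is left out, and drain_intake is a hypothetical helper name for whatever the scheduler tick uses to empty the queue.

```rust
/// Intake request. Field names follow the handler call above; the types are assumed.
#[derive(Debug, Clone, PartialEq)]
pub struct TaskRequest {
    pub title: String,
    pub description: String,
    pub required_capabilities: Vec<String>,
}

/// Reduced scheduler holding only the intake queue (the job table is omitted here).
#[derive(Debug, Default)]
pub struct Scheduler {
    pub intake_queue: Vec<TaskRequest>,
}

impl Scheduler {
    /// Queue a request in memory; nothing is persisted until a tick drains it.
    pub fn submit(&mut self, request: TaskRequest) {
        self.intake_queue.push(request);
    }

    /// Hypothetical helper: hand back every queued request and leave the queue empty.
    pub fn drain_intake(&mut self) -> Vec<TaskRequest> {
        std::mem::take(&mut self.intake_queue)
    }
}
```

Because submit only pushes onto a Vec, a request stays in memory until something drains the queue, which is why Step 7 matters.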
Step 7: Start daemon::run_loop from main.rs (CRITICAL)
The scheduler tick runs inside daemon::run_loop. If main.rs only spawns
socket::serve but never spawns daemon::run_loop, the scheduler never ticks.
Jobs queued via intake-task sit in the in-memory queue forever.
// crates/colibri-daemon/src/main.rs
let loop_handle = tokio::spawn(async move {
daemon::run_loop(loop_state, loop_config, loop_shutdown).await;
});
Verify: after intake-task → wait one scheduler_interval (30s by default) → list-tasks should show the task.
If list-tasks is still empty after that, the loop isn't running.
Step 8: Update DaemonLoopConfig default test
The test in daemon.rs that checks DaemonLoopConfig::default() needs an assertion for scheduler_interval.
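A sketch of what that assertion might look like, assuming the 30s default from Step 4. The struct here is reduced to the one new field and the test name is hypothetical; the real DaemonLoopConfig carries the other tick intervals as well.

```rust
use std::time::Duration;

/// Reduced stand-in for the real config: only the field added in Step 4.
pub struct DaemonLoopConfig {
    pub scheduler_interval: Duration,
}

impl Default for DaemonLoopConfig {
    fn default() -> Self {
        Self {
            // 30s default, as noted in Step 4.
            scheduler_interval: Duration::from_secs(30),
        }
    }
}

#[test]
fn default_config_sets_scheduler_interval() {
    let config = DaemonLoopConfig::default();
    assert_eq!(config.scheduler_interval, Duration::from_secs(30));
}
```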
Capability Matching
pub fn pick_agent<'a>(required: &[String], agents: &'a [Agent]) -> Option<&'a Agent>
- Filters agents to idle or active only
- Scores by overlap between required tags and agent capabilities
- Highest score wins; ties go to first found
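The selection rules above could be implemented roughly like this. It is a sketch under assumptions: the Agent struct is a hypothetical stand-in with status held as a plain string, and the body of capability_match_score is a guess at a simple overlap count. Whether an eligible agent with zero overlap should still be returned is not stated above; this version returns it.

```rust
/// Hypothetical agent record; the real Agent type likely has more fields.
#[derive(Debug, Clone, PartialEq)]
pub struct Agent {
    pub name: String,
    pub status: String, // assumed to be a plain string such as "idle" or "active"
    pub capabilities: Vec<String>,
}

/// Overlap score: how many of the required tags the agent advertises.
pub fn capability_match_score(required: &[String], agent: &Agent) -> usize {
    required
        .iter()
        .filter(|tag| agent.capabilities.contains(*tag))
        .count()
}

/// Pick the eligible agent with the highest score; the first one found wins ties.
pub fn pick_agent<'a>(required: &[String], agents: &'a [Agent]) -> Option<&'a Agent> {
    let mut best: Option<(&'a Agent, usize)> = None;
    for agent in agents {
        // Only idle or active agents are candidates.
        if agent.status != "idle" && agent.status != "active" {
            continue;
        }
        let score = capability_match_score(required, agent);
        match best {
            // Keep the current best unless the new score is strictly higher.
            Some((_, best_score)) if score <= best_score => {}
            _ => best = Some((agent, score)),
        }
    }
    best.map(|(agent, _)| agent)
}
```

The strict comparison is what gives "ties go to first found": a later agent replaces the current best only when its score is higher, never when it is equal.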
Cron Format
Simplified 5-field: "min hour dom month dow". Supports * wildcard.
- %u format for day-of-week (1=Monday, 7=Sunday)
- Leading zeros stripped for comparison ("00" matches "0")
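A minimal sketch of a matcher with these rules, kept dependency-free by taking the current time as five pre-formatted strings (for example, what a formatter would produce for %M, %H, %d, %m, %u). The signature and the strip_leading_zeros helper are assumptions for illustration; the real cron_matches may take a timestamp instead.

```rust
/// Strip leading zeros so "00" compares equal to "0"; an all-zero field becomes "0".
fn strip_leading_zeros(field: &str) -> &str {
    match field.trim_start_matches('0') {
        "" if !field.is_empty() => "0",
        rest => rest,
    }
}

/// Match a 5-field expression ("min hour dom month dow") against the current time.
/// `now` holds the time fields in the same order, already formatted as strings,
/// with day-of-week in %u form (1=Monday, 7=Sunday).
pub fn cron_matches(expr: &str, now: [&str; 5]) -> bool {
    let fields: Vec<&str> = expr.split_whitespace().collect();
    if fields.len() != 5 {
        return false; // anything other than exactly five fields never matches
    }
    fields.iter().zip(now.iter()).all(|(pattern, actual)| {
        // "*" matches anything; otherwise compare with leading zeros removed.
        *pattern == "*" || strip_leading_zeros(pattern) == strip_leading_zeros(actual)
    })
}
```

With this shape, cron_matches("0 9 * * 1", ["00", "09", "27", "05", "1"]) matches 09:00 on a Monday even though the formatted minute and hour are zero-padded.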
Pitfalls
- tokio::sync::Mutex vs std::sync::Mutex: The scheduler uses tokio::sync::Mutex because tick() is async and holds the lock across an .await. std::sync::Mutex would block the async runtime.
- Cron zero-padding: chrono's "%M" format produces "00" for minute 0. Strip leading zeros before comparing with cron fields.
- Test count verification: After adding tests, verify the actual count from cargo test --workspace output (sum N passed per crate). Do NOT use --list, because it overcounts helper modules. The scheduler added 12 tests to the daemon crate.
- Store deadlock in tick(): Do NOT hold state.store.lock() across a code branch that also tries to lock the store again. The intake drain loop and job-firing loop both access the store, so release the lock between operations. Symptom: list-tasks times out after a successful intake-task. Fix: structure the drain loop to collect task data first, then lock the store once to create+assign tasks.
- daemon::run_loop not started: The scheduler tick runs inside daemon::run_loop. If main.rs only spawns the socket server, intake-task queues into memory but never reaches SQLite. Always verify main.rs spawns both socket::serve and daemon::run_loop.
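The "collect first, then lock the store once" fix for the store deadlock can be sketched as below. This is an illustration only: it uses std::sync::Mutex and synchronous code purely to stay dependency-free (the daemon itself uses tokio::sync::Mutex), and the Store and DaemonState types here are hypothetical stand-ins with tasks reduced to titles.

```rust
use std::sync::Mutex;

/// Hypothetical stand-in for the SQLite-backed store.
pub struct Store {
    pub tasks: Vec<String>,
}

/// Reduced state: an intake queue of task titles plus the store.
pub struct DaemonState {
    pub intake_queue: Mutex<Vec<String>>,
    pub store: Mutex<Store>,
}

/// Drain the intake queue into the store, returning how many tasks were created.
pub fn drain_intake(state: &DaemonState) -> usize {
    // Phase 1: take all pending titles. The queue guard is a temporary and is
    // dropped at the end of this statement, before the store is touched.
    let pending: Vec<String> = std::mem::take(&mut *state.intake_queue.lock().unwrap());

    // Phase 2: lock the store exactly once and create every task under that guard.
    let mut store = state.store.lock().unwrap();
    let created = pending.len();
    store.tasks.extend(pending);
    created
    // The store guard is dropped here, so the job-firing loop can lock it afterwards.
}
```

The point of the split is that no code path inside the loop re-locks the store while a guard is still alive; each lock is taken once and released before the next operation.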