diff --git a/src/group-queue.test.ts b/src/group-queue.test.ts index 609f5ba..999a88a 100644 --- a/src/group-queue.test.ts +++ b/src/group-queue.test.ts @@ -43,6 +43,7 @@ describe('GroupQueue', () => { beforeEach(() => { vi.useFakeTimers(); + vi.clearAllMocks(); queue = new GroupQueue(); }); @@ -341,6 +342,7 @@ describe('GroupQueue', () => { {} as any, 'container-1', 'test-group', + { acceptsFollowupIpc: true }, ); queue.notifyIdle('tg:100000001'); @@ -380,6 +382,7 @@ describe('GroupQueue', () => { {} as any, 'container-1', 'test-group', + { acceptsFollowupIpc: true }, ); // Container becomes idle @@ -444,6 +447,7 @@ describe('GroupQueue', () => { {} as any, 'container-1', 'test-group', + { acceptsFollowupIpc: true }, ); const result = await queue.sendMessage('tg:100000001', 'hello'); @@ -482,6 +486,7 @@ describe('GroupQueue', () => { {} as any, 'container-1', 'test-group', + { acceptsFollowupIpc: true }, ); const writeFileSync = vi.mocked(fs.default.writeFileSync); @@ -507,4 +512,109 @@ describe('GroupQueue', () => { resolveProcess!(); await vi.advanceTimersByTimeAsync(10); }); + + it('queues follow-up messages for active one-shot runners instead of writing IPC', async () => { + const fs = await import('fs'); + let resolveProcess: () => void; + + const processMessages = vi.fn(async () => { + await new Promise((resolve) => { + resolveProcess = resolve; + }); + return true; + }); + + queue.setProcessMessagesFn(processMessages); + queue.enqueueMessageCheck('tg:100000001'); + await vi.advanceTimersByTimeAsync(10); + + queue.registerProcess( + 'tg:100000001', + {} as any, + 'container-1', + 'test-group', + ); + + const result = await queue.sendMessage('tg:100000001', 'follow-up'); + expect(result).toBe(false); + + const writeFileSync = vi.mocked(fs.default.writeFileSync); + const payloadWrites = writeFileSync.mock.calls.filter( + (call) => typeof call[0] === 'string' && String(call[0]).endsWith('.tmp'), + ); + expect(payloadWrites).toHaveLength(0); + + queue.enqueueMessageCheck('tg:100000001'); + expect(processMessages).toHaveBeenCalledTimes(1); + + resolveProcess!(); + await vi.advanceTimersByTimeAsync(10); + expect(processMessages).toHaveBeenCalledTimes(2); + }); + + it('does not write close sentinels for one-shot runners', async () => { + const fs = await import('fs'); + + queue.registerProcess( + 'tg:100000001', + {} as any, + 'container-1', + 'test-group', + ); + + queue.closeStdin('tg:100000001'); + + const writeFileSync = vi.mocked(fs.default.writeFileSync); + const closeWrites = writeFileSync.mock.calls.filter( + (call) => typeof call[0] === 'string' && call[0].endsWith('_close'), + ); + expect(closeWrites).toHaveLength(0); + }); + + it('returns false if an IPC-capable process exits while follow-up enrichment is running', async () => { + const fs = await import('fs'); + const skillsPg = await import('./skills-pg.js'); + let resolveProcess: () => void; + let resolveEnrich: () => void; + + const processMessages = vi.fn(async () => { + await new Promise((resolve) => { + resolveProcess = resolve; + }); + return true; + }); + vi.mocked(skillsPg.enrichPromptWithBuiltinKnowledge).mockImplementationOnce( + async (text: string) => { + await new Promise((resolve) => { + resolveEnrich = resolve; + }); + return `[skills]\n${text}`; + }, + ); + + queue.setProcessMessagesFn(processMessages); + queue.enqueueMessageCheck('tg:100000001'); + await vi.advanceTimersByTimeAsync(10); + queue.registerProcess( + 'tg:100000001', + {} as any, + 'container-1', + 'test-group', + { acceptsFollowupIpc: true }, + ); + + const sendPromise = queue.sendMessage('tg:100000001', 'follow-up'); + await Promise.resolve(); + + resolveProcess!(); + await vi.advanceTimersByTimeAsync(10); + resolveEnrich!(); + + await expect(sendPromise).resolves.toBe(false); + const writeFileSync = vi.mocked(fs.default.writeFileSync); + const payloadWrites = writeFileSync.mock.calls.filter( + (call) => typeof call[0] === 'string' && String(call[0]).endsWith('.tmp'), + ); + expect(payloadWrites).toHaveLength(0); + }); }); diff --git a/src/group-queue.ts b/src/group-queue.ts index 4d86bf3..4cbdfe5 100644 --- a/src/group-queue.ts +++ b/src/group-queue.ts @@ -15,10 +15,77 @@ interface QueuedTask { const MAX_RETRIES = 5; const BASE_RETRY_MS = 5000; +function safeQuarantineName(input: string): string { + return input.replace(/[^a-zA-Z0-9_.-]/g, '-'); +} + +export function quarantineStrandedIpcInputs(dataDir = DATA_DIR): string[] { + const ipcBaseDir = path.join(dataDir, 'ipc'); + const moved: string[] = []; + + if (!fs.existsSync(ipcBaseDir)) return moved; + + let groupFolders: string[]; + try { + groupFolders = fs.readdirSync(ipcBaseDir); + } catch (err) { + logger.warn({ err, ipcBaseDir }, 'Failed to scan IPC directory'); + return moved; + } + + const quarantineDir = path.join(ipcBaseDir, 'quarantine'); + for (const groupFolder of groupFolders) { + if (groupFolder === 'quarantine' || groupFolder === 'errors') continue; + + const inputDir = path.join(ipcBaseDir, groupFolder, 'input'); + let stat: ReturnType; + try { + stat = fs.statSync(inputDir); + } catch { + continue; + } + if (!stat.isDirectory()) continue; + + let files: string[]; + try { + files = fs.readdirSync(inputDir).filter( + (file) => file === '_close' || file.endsWith('.json'), + ); + } catch (err) { + logger.warn({ err, inputDir }, 'Failed to scan IPC input directory'); + continue; + } + + for (const file of files) { + const source = path.join(inputDir, file); + const dest = path.join( + quarantineDir, + `${Date.now()}-${safeQuarantineName(groupFolder)}-${safeQuarantineName(file)}`, + ); + try { + fs.mkdirSync(quarantineDir, { recursive: true }); + fs.renameSync(source, dest); + moved.push(dest); + } catch (err) { + logger.warn({ err, source, dest }, 'Failed to quarantine stranded IPC input'); + } + } + } + + if (moved.length > 0) { + logger.warn( + { count: moved.length, files: moved }, + 'Quarantined stranded IPC input files from previous runs', + ); + } + return moved; +} + interface GroupState { active: boolean; idleWaiting: boolean; isTaskJail: boolean; + acceptsFollowupIpc: boolean; pendingMessages: boolean; pendingTasks: QueuedTask[]; process: ChildProcess | null; @@ -28,6 +95,10 @@ interface GroupState { retryTimer: NodeJS.Timeout | null; } +export interface RegisterProcessOptions { + acceptsFollowupIpc?: boolean; +} + export class GroupQueue { private groups = new Map(); private activeCount = 0; @@ -45,6 +116,7 @@ export class GroupQueue { active: false, idleWaiting: false, isTaskJail: false, + acceptsFollowupIpc: false, pendingMessages: false, pendingTasks: [], process: null, @@ -165,11 +237,13 @@ export class GroupQueue { proc: ChildProcess, jailRunId: string, groupFolder?: string, + options: RegisterProcessOptions = {}, ): void { const state = this.getGroup(groupJid); state.process = proc; state.jailRunId = jailRunId; state.active = true; + state.acceptsFollowupIpc = options.acceptsFollowupIpc === true; if (groupFolder) state.groupFolder = groupFolder; } @@ -186,17 +260,39 @@ export class GroupQueue { } /** - * Send a follow-up message to the active jail via IPC file. - * Returns true if the message was written, false if no active jail. + * Send a follow-up message to an active IPC-capable runner. + * Returns true only for a transport write; callers must not treat this as a + * processing acknowledgement unless the runner also provides one. */ async sendMessage(groupJid: string, text: string): Promise { const state = this.getGroup(groupJid); - if (!state.active || !state.groupFolder || state.isTaskJail) return false; - state.idleWaiting = false; // Agent is about to receive work, no longer idle + if ( + !state.active || + !state.groupFolder || + state.isTaskJail || + !state.acceptsFollowupIpc + ) { + return false; + } - const inputDir = path.join(DATA_DIR, 'ipc', state.groupFolder, 'input'); + const jailRunId = state.jailRunId; + const groupFolder = state.groupFolder; + const inputDir = path.join(DATA_DIR, 'ipc', groupFolder, 'input'); try { const enrichedText = await enrichPromptWithBuiltinKnowledge(text); + + const current = this.getGroup(groupJid); + if ( + !current.active || + current.jailRunId !== jailRunId || + current.groupFolder !== groupFolder || + current.isTaskJail || + !current.acceptsFollowupIpc + ) { + return false; + } + + current.idleWaiting = false; // Agent is about to receive work, no longer idle fs.mkdirSync(inputDir, { recursive: true }); const filename = `${Date.now()}-${Math.random().toString(36).slice(2, 6)}.json`; const filepath = path.join(inputDir, filename); @@ -217,7 +313,7 @@ export class GroupQueue { */ closeStdin(groupJid: string): void { const state = this.getGroup(groupJid); - if (!state.active || !state.groupFolder) return; + if (!state.active || !state.groupFolder || !state.acceptsFollowupIpc) return; const inputDir = path.join(DATA_DIR, 'ipc', state.groupFolder, 'input'); try { @@ -283,6 +379,7 @@ export class GroupQueue { state.process = null; state.jailRunId = null; state.groupFolder = null; + state.acceptsFollowupIpc = false; this.activeCount--; this.drainGroup(groupJid); } @@ -310,6 +407,7 @@ export class GroupQueue { state.process = null; state.jailRunId = null; state.groupFolder = null; + state.acceptsFollowupIpc = false; this.activeCount--; this.drainGroup(groupJid); } diff --git a/src/index.ts b/src/index.ts index 445fe77..678fcc9 100644 --- a/src/index.ts +++ b/src/index.ts @@ -100,7 +100,7 @@ import { storeMessage, getPool as getOpsPool, } from './db.js'; -import { GroupQueue } from './group-queue.js'; +import { GroupQueue, quarantineStrandedIpcInputs } from './group-queue.js'; import { resolveGroupFolderPath } from './group-folder.js'; import { decideRootAdminBootstrapGroup } from './root-admin-bootstrap.js'; import { augmentPromptWithVision } from './vision.js'; @@ -1294,6 +1294,7 @@ async function main(): Promise { markDbInitialized(); logger.info('Database initialized'); await loadState(); + quarantineStrandedIpcInputs(); const rootAdminBootstrap = decideRootAdminBootstrapGroup({ tenantId: TENANT_ID.trim(), assistantName: ASSISTANT_NAME,