Prevent queued Telegram follow-up loss

---
Build: pass | Tests: pass — 2269 passed (674 files)
This commit is contained in:
Operator & Codex 2026-05-10 08:04:14 +02:00
parent cbc8e31d04
commit b6d72d9353
3 changed files with 216 additions and 7 deletions

View file

@ -43,6 +43,7 @@ describe('GroupQueue', () => {
beforeEach(() => {
vi.useFakeTimers();
vi.clearAllMocks();
queue = new GroupQueue();
});
@ -341,6 +342,7 @@ describe('GroupQueue', () => {
{} as any,
'container-1',
'test-group',
{ acceptsFollowupIpc: true },
);
queue.notifyIdle('tg:100000001');
@ -380,6 +382,7 @@ describe('GroupQueue', () => {
{} as any,
'container-1',
'test-group',
{ acceptsFollowupIpc: true },
);
// Container becomes idle
@ -444,6 +447,7 @@ describe('GroupQueue', () => {
{} as any,
'container-1',
'test-group',
{ acceptsFollowupIpc: true },
);
const result = await queue.sendMessage('tg:100000001', 'hello');
@ -482,6 +486,7 @@ describe('GroupQueue', () => {
{} as any,
'container-1',
'test-group',
{ acceptsFollowupIpc: true },
);
const writeFileSync = vi.mocked(fs.default.writeFileSync);
@ -507,4 +512,109 @@ describe('GroupQueue', () => {
resolveProcess!();
await vi.advanceTimersByTimeAsync(10);
});
it('queues follow-up messages for active one-shot runners instead of writing IPC', async () => {
const fs = await import('fs');
let resolveProcess: () => void;
const processMessages = vi.fn(async () => {
await new Promise<void>((resolve) => {
resolveProcess = resolve;
});
return true;
});
queue.setProcessMessagesFn(processMessages);
queue.enqueueMessageCheck('tg:100000001');
await vi.advanceTimersByTimeAsync(10);
queue.registerProcess(
'tg:100000001',
{} as any,
'container-1',
'test-group',
);
const result = await queue.sendMessage('tg:100000001', 'follow-up');
expect(result).toBe(false);
const writeFileSync = vi.mocked(fs.default.writeFileSync);
const payloadWrites = writeFileSync.mock.calls.filter(
(call) => typeof call[0] === 'string' && String(call[0]).endsWith('.tmp'),
);
expect(payloadWrites).toHaveLength(0);
queue.enqueueMessageCheck('tg:100000001');
expect(processMessages).toHaveBeenCalledTimes(1);
resolveProcess!();
await vi.advanceTimersByTimeAsync(10);
expect(processMessages).toHaveBeenCalledTimes(2);
});
it('does not write close sentinels for one-shot runners', async () => {
const fs = await import('fs');
queue.registerProcess(
'tg:100000001',
{} as any,
'container-1',
'test-group',
);
queue.closeStdin('tg:100000001');
const writeFileSync = vi.mocked(fs.default.writeFileSync);
const closeWrites = writeFileSync.mock.calls.filter(
(call) => typeof call[0] === 'string' && call[0].endsWith('_close'),
);
expect(closeWrites).toHaveLength(0);
});
it('returns false if an IPC-capable process exits while follow-up enrichment is running', async () => {
const fs = await import('fs');
const skillsPg = await import('./skills-pg.js');
let resolveProcess: () => void;
let resolveEnrich: () => void;
const processMessages = vi.fn(async () => {
await new Promise<void>((resolve) => {
resolveProcess = resolve;
});
return true;
});
vi.mocked(skillsPg.enrichPromptWithBuiltinKnowledge).mockImplementationOnce(
async (text: string) => {
await new Promise<void>((resolve) => {
resolveEnrich = resolve;
});
return `[skills]\n${text}`;
},
);
queue.setProcessMessagesFn(processMessages);
queue.enqueueMessageCheck('tg:100000001');
await vi.advanceTimersByTimeAsync(10);
queue.registerProcess(
'tg:100000001',
{} as any,
'container-1',
'test-group',
{ acceptsFollowupIpc: true },
);
const sendPromise = queue.sendMessage('tg:100000001', 'follow-up');
await Promise.resolve();
resolveProcess!();
await vi.advanceTimersByTimeAsync(10);
resolveEnrich!();
await expect(sendPromise).resolves.toBe(false);
const writeFileSync = vi.mocked(fs.default.writeFileSync);
const payloadWrites = writeFileSync.mock.calls.filter(
(call) => typeof call[0] === 'string' && String(call[0]).endsWith('.tmp'),
);
expect(payloadWrites).toHaveLength(0);
});
});

View file

@ -15,10 +15,77 @@ interface QueuedTask {
const MAX_RETRIES = 5;
const BASE_RETRY_MS = 5000;
function safeQuarantineName(input: string): string {
return input.replace(/[^a-zA-Z0-9_.-]/g, '-');
}
export function quarantineStrandedIpcInputs(dataDir = DATA_DIR): string[] {
const ipcBaseDir = path.join(dataDir, 'ipc');
const moved: string[] = [];
if (!fs.existsSync(ipcBaseDir)) return moved;
let groupFolders: string[];
try {
groupFolders = fs.readdirSync(ipcBaseDir);
} catch (err) {
logger.warn({ err, ipcBaseDir }, 'Failed to scan IPC directory');
return moved;
}
const quarantineDir = path.join(ipcBaseDir, 'quarantine');
for (const groupFolder of groupFolders) {
if (groupFolder === 'quarantine' || groupFolder === 'errors') continue;
const inputDir = path.join(ipcBaseDir, groupFolder, 'input');
let stat: ReturnType<typeof fs.statSync>;
try {
stat = fs.statSync(inputDir);
} catch {
continue;
}
if (!stat.isDirectory()) continue;
let files: string[];
try {
files = fs.readdirSync(inputDir).filter(
(file) => file === '_close' || file.endsWith('.json'),
);
} catch (err) {
logger.warn({ err, inputDir }, 'Failed to scan IPC input directory');
continue;
}
for (const file of files) {
const source = path.join(inputDir, file);
const dest = path.join(
quarantineDir,
`${Date.now()}-${safeQuarantineName(groupFolder)}-${safeQuarantineName(file)}`,
);
try {
fs.mkdirSync(quarantineDir, { recursive: true });
fs.renameSync(source, dest);
moved.push(dest);
} catch (err) {
logger.warn({ err, source, dest }, 'Failed to quarantine stranded IPC input');
}
}
}
if (moved.length > 0) {
logger.warn(
{ count: moved.length, files: moved },
'Quarantined stranded IPC input files from previous runs',
);
}
return moved;
}
interface GroupState {
active: boolean;
idleWaiting: boolean;
isTaskJail: boolean;
acceptsFollowupIpc: boolean;
pendingMessages: boolean;
pendingTasks: QueuedTask[];
process: ChildProcess | null;
@ -28,6 +95,10 @@ interface GroupState {
retryTimer: NodeJS.Timeout | null;
}
export interface RegisterProcessOptions {
acceptsFollowupIpc?: boolean;
}
export class GroupQueue {
private groups = new Map<string, GroupState>();
private activeCount = 0;
@ -45,6 +116,7 @@ export class GroupQueue {
active: false,
idleWaiting: false,
isTaskJail: false,
acceptsFollowupIpc: false,
pendingMessages: false,
pendingTasks: [],
process: null,
@ -165,11 +237,13 @@ export class GroupQueue {
proc: ChildProcess,
jailRunId: string,
groupFolder?: string,
options: RegisterProcessOptions = {},
): void {
const state = this.getGroup(groupJid);
state.process = proc;
state.jailRunId = jailRunId;
state.active = true;
state.acceptsFollowupIpc = options.acceptsFollowupIpc === true;
if (groupFolder) state.groupFolder = groupFolder;
}
@ -186,17 +260,39 @@ export class GroupQueue {
}
/**
* Send a follow-up message to the active jail via IPC file.
* Returns true if the message was written, false if no active jail.
* Send a follow-up message to an active IPC-capable runner.
* Returns true only for a transport write; callers must not treat this as a
* processing acknowledgement unless the runner also provides one.
*/
async sendMessage(groupJid: string, text: string): Promise<boolean> {
const state = this.getGroup(groupJid);
if (!state.active || !state.groupFolder || state.isTaskJail) return false;
state.idleWaiting = false; // Agent is about to receive work, no longer idle
if (
!state.active ||
!state.groupFolder ||
state.isTaskJail ||
!state.acceptsFollowupIpc
) {
return false;
}
const inputDir = path.join(DATA_DIR, 'ipc', state.groupFolder, 'input');
const jailRunId = state.jailRunId;
const groupFolder = state.groupFolder;
const inputDir = path.join(DATA_DIR, 'ipc', groupFolder, 'input');
try {
const enrichedText = await enrichPromptWithBuiltinKnowledge(text);
const current = this.getGroup(groupJid);
if (
!current.active ||
current.jailRunId !== jailRunId ||
current.groupFolder !== groupFolder ||
current.isTaskJail ||
!current.acceptsFollowupIpc
) {
return false;
}
current.idleWaiting = false; // Agent is about to receive work, no longer idle
fs.mkdirSync(inputDir, { recursive: true });
const filename = `${Date.now()}-${Math.random().toString(36).slice(2, 6)}.json`;
const filepath = path.join(inputDir, filename);
@ -217,7 +313,7 @@ export class GroupQueue {
*/
closeStdin(groupJid: string): void {
const state = this.getGroup(groupJid);
if (!state.active || !state.groupFolder) return;
if (!state.active || !state.groupFolder || !state.acceptsFollowupIpc) return;
const inputDir = path.join(DATA_DIR, 'ipc', state.groupFolder, 'input');
try {
@ -283,6 +379,7 @@ export class GroupQueue {
state.process = null;
state.jailRunId = null;
state.groupFolder = null;
state.acceptsFollowupIpc = false;
this.activeCount--;
this.drainGroup(groupJid);
}
@ -310,6 +407,7 @@ export class GroupQueue {
state.process = null;
state.jailRunId = null;
state.groupFolder = null;
state.acceptsFollowupIpc = false;
this.activeCount--;
this.drainGroup(groupJid);
}

View file

@ -100,7 +100,7 @@ import {
storeMessage,
getPool as getOpsPool,
} from './db.js';
import { GroupQueue } from './group-queue.js';
import { GroupQueue, quarantineStrandedIpcInputs } from './group-queue.js';
import { resolveGroupFolderPath } from './group-folder.js';
import { decideRootAdminBootstrapGroup } from './root-admin-bootstrap.js';
import { augmentPromptWithVision } from './vision.js';
@ -1294,6 +1294,7 @@ async function main(): Promise<void> {
markDbInitialized();
logger.info('Database initialized');
await loadState();
quarantineStrandedIpcInputs();
const rootAdminBootstrap = decideRootAdminBootstrapGroup({
tenantId: TENANT_ID.trim(),
assistantName: ASSISTANT_NAME,