Prevent queued Telegram follow-up loss
--- Build: pass | Tests: pass — 2269 passed (674 files)
This commit is contained in:
parent
cbc8e31d04
commit
b6d72d9353
3 changed files with 216 additions and 7 deletions
|
|
@ -43,6 +43,7 @@ describe('GroupQueue', () => {
|
|||
|
||||
beforeEach(() => {
|
||||
vi.useFakeTimers();
|
||||
vi.clearAllMocks();
|
||||
queue = new GroupQueue();
|
||||
});
|
||||
|
||||
|
|
@ -341,6 +342,7 @@ describe('GroupQueue', () => {
|
|||
{} as any,
|
||||
'container-1',
|
||||
'test-group',
|
||||
{ acceptsFollowupIpc: true },
|
||||
);
|
||||
queue.notifyIdle('tg:100000001');
|
||||
|
||||
|
|
@ -380,6 +382,7 @@ describe('GroupQueue', () => {
|
|||
{} as any,
|
||||
'container-1',
|
||||
'test-group',
|
||||
{ acceptsFollowupIpc: true },
|
||||
);
|
||||
|
||||
// Container becomes idle
|
||||
|
|
@ -444,6 +447,7 @@ describe('GroupQueue', () => {
|
|||
{} as any,
|
||||
'container-1',
|
||||
'test-group',
|
||||
{ acceptsFollowupIpc: true },
|
||||
);
|
||||
|
||||
const result = await queue.sendMessage('tg:100000001', 'hello');
|
||||
|
|
@ -482,6 +486,7 @@ describe('GroupQueue', () => {
|
|||
{} as any,
|
||||
'container-1',
|
||||
'test-group',
|
||||
{ acceptsFollowupIpc: true },
|
||||
);
|
||||
|
||||
const writeFileSync = vi.mocked(fs.default.writeFileSync);
|
||||
|
|
@ -507,4 +512,109 @@ describe('GroupQueue', () => {
|
|||
resolveProcess!();
|
||||
await vi.advanceTimersByTimeAsync(10);
|
||||
});
|
||||
|
||||
it('queues follow-up messages for active one-shot runners instead of writing IPC', async () => {
|
||||
const fs = await import('fs');
|
||||
let resolveProcess: () => void;
|
||||
|
||||
const processMessages = vi.fn(async () => {
|
||||
await new Promise<void>((resolve) => {
|
||||
resolveProcess = resolve;
|
||||
});
|
||||
return true;
|
||||
});
|
||||
|
||||
queue.setProcessMessagesFn(processMessages);
|
||||
queue.enqueueMessageCheck('tg:100000001');
|
||||
await vi.advanceTimersByTimeAsync(10);
|
||||
|
||||
queue.registerProcess(
|
||||
'tg:100000001',
|
||||
{} as any,
|
||||
'container-1',
|
||||
'test-group',
|
||||
);
|
||||
|
||||
const result = await queue.sendMessage('tg:100000001', 'follow-up');
|
||||
expect(result).toBe(false);
|
||||
|
||||
const writeFileSync = vi.mocked(fs.default.writeFileSync);
|
||||
const payloadWrites = writeFileSync.mock.calls.filter(
|
||||
(call) => typeof call[0] === 'string' && String(call[0]).endsWith('.tmp'),
|
||||
);
|
||||
expect(payloadWrites).toHaveLength(0);
|
||||
|
||||
queue.enqueueMessageCheck('tg:100000001');
|
||||
expect(processMessages).toHaveBeenCalledTimes(1);
|
||||
|
||||
resolveProcess!();
|
||||
await vi.advanceTimersByTimeAsync(10);
|
||||
expect(processMessages).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it('does not write close sentinels for one-shot runners', async () => {
|
||||
const fs = await import('fs');
|
||||
|
||||
queue.registerProcess(
|
||||
'tg:100000001',
|
||||
{} as any,
|
||||
'container-1',
|
||||
'test-group',
|
||||
);
|
||||
|
||||
queue.closeStdin('tg:100000001');
|
||||
|
||||
const writeFileSync = vi.mocked(fs.default.writeFileSync);
|
||||
const closeWrites = writeFileSync.mock.calls.filter(
|
||||
(call) => typeof call[0] === 'string' && call[0].endsWith('_close'),
|
||||
);
|
||||
expect(closeWrites).toHaveLength(0);
|
||||
});
|
||||
|
||||
it('returns false if an IPC-capable process exits while follow-up enrichment is running', async () => {
|
||||
const fs = await import('fs');
|
||||
const skillsPg = await import('./skills-pg.js');
|
||||
let resolveProcess: () => void;
|
||||
let resolveEnrich: () => void;
|
||||
|
||||
const processMessages = vi.fn(async () => {
|
||||
await new Promise<void>((resolve) => {
|
||||
resolveProcess = resolve;
|
||||
});
|
||||
return true;
|
||||
});
|
||||
vi.mocked(skillsPg.enrichPromptWithBuiltinKnowledge).mockImplementationOnce(
|
||||
async (text: string) => {
|
||||
await new Promise<void>((resolve) => {
|
||||
resolveEnrich = resolve;
|
||||
});
|
||||
return `[skills]\n${text}`;
|
||||
},
|
||||
);
|
||||
|
||||
queue.setProcessMessagesFn(processMessages);
|
||||
queue.enqueueMessageCheck('tg:100000001');
|
||||
await vi.advanceTimersByTimeAsync(10);
|
||||
queue.registerProcess(
|
||||
'tg:100000001',
|
||||
{} as any,
|
||||
'container-1',
|
||||
'test-group',
|
||||
{ acceptsFollowupIpc: true },
|
||||
);
|
||||
|
||||
const sendPromise = queue.sendMessage('tg:100000001', 'follow-up');
|
||||
await Promise.resolve();
|
||||
|
||||
resolveProcess!();
|
||||
await vi.advanceTimersByTimeAsync(10);
|
||||
resolveEnrich!();
|
||||
|
||||
await expect(sendPromise).resolves.toBe(false);
|
||||
const writeFileSync = vi.mocked(fs.default.writeFileSync);
|
||||
const payloadWrites = writeFileSync.mock.calls.filter(
|
||||
(call) => typeof call[0] === 'string' && String(call[0]).endsWith('.tmp'),
|
||||
);
|
||||
expect(payloadWrites).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -15,10 +15,77 @@ interface QueuedTask {
|
|||
const MAX_RETRIES = 5;
|
||||
const BASE_RETRY_MS = 5000;
|
||||
|
||||
function safeQuarantineName(input: string): string {
|
||||
return input.replace(/[^a-zA-Z0-9_.-]/g, '-');
|
||||
}
|
||||
|
||||
export function quarantineStrandedIpcInputs(dataDir = DATA_DIR): string[] {
|
||||
const ipcBaseDir = path.join(dataDir, 'ipc');
|
||||
const moved: string[] = [];
|
||||
|
||||
if (!fs.existsSync(ipcBaseDir)) return moved;
|
||||
|
||||
let groupFolders: string[];
|
||||
try {
|
||||
groupFolders = fs.readdirSync(ipcBaseDir);
|
||||
} catch (err) {
|
||||
logger.warn({ err, ipcBaseDir }, 'Failed to scan IPC directory');
|
||||
return moved;
|
||||
}
|
||||
|
||||
const quarantineDir = path.join(ipcBaseDir, 'quarantine');
|
||||
for (const groupFolder of groupFolders) {
|
||||
if (groupFolder === 'quarantine' || groupFolder === 'errors') continue;
|
||||
|
||||
const inputDir = path.join(ipcBaseDir, groupFolder, 'input');
|
||||
let stat: ReturnType<typeof fs.statSync>;
|
||||
try {
|
||||
stat = fs.statSync(inputDir);
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
if (!stat.isDirectory()) continue;
|
||||
|
||||
let files: string[];
|
||||
try {
|
||||
files = fs.readdirSync(inputDir).filter(
|
||||
(file) => file === '_close' || file.endsWith('.json'),
|
||||
);
|
||||
} catch (err) {
|
||||
logger.warn({ err, inputDir }, 'Failed to scan IPC input directory');
|
||||
continue;
|
||||
}
|
||||
|
||||
for (const file of files) {
|
||||
const source = path.join(inputDir, file);
|
||||
const dest = path.join(
|
||||
quarantineDir,
|
||||
`${Date.now()}-${safeQuarantineName(groupFolder)}-${safeQuarantineName(file)}`,
|
||||
);
|
||||
try {
|
||||
fs.mkdirSync(quarantineDir, { recursive: true });
|
||||
fs.renameSync(source, dest);
|
||||
moved.push(dest);
|
||||
} catch (err) {
|
||||
logger.warn({ err, source, dest }, 'Failed to quarantine stranded IPC input');
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (moved.length > 0) {
|
||||
logger.warn(
|
||||
{ count: moved.length, files: moved },
|
||||
'Quarantined stranded IPC input files from previous runs',
|
||||
);
|
||||
}
|
||||
return moved;
|
||||
}
|
||||
|
||||
interface GroupState {
|
||||
active: boolean;
|
||||
idleWaiting: boolean;
|
||||
isTaskJail: boolean;
|
||||
acceptsFollowupIpc: boolean;
|
||||
pendingMessages: boolean;
|
||||
pendingTasks: QueuedTask[];
|
||||
process: ChildProcess | null;
|
||||
|
|
@ -28,6 +95,10 @@ interface GroupState {
|
|||
retryTimer: NodeJS.Timeout | null;
|
||||
}
|
||||
|
||||
export interface RegisterProcessOptions {
|
||||
acceptsFollowupIpc?: boolean;
|
||||
}
|
||||
|
||||
export class GroupQueue {
|
||||
private groups = new Map<string, GroupState>();
|
||||
private activeCount = 0;
|
||||
|
|
@ -45,6 +116,7 @@ export class GroupQueue {
|
|||
active: false,
|
||||
idleWaiting: false,
|
||||
isTaskJail: false,
|
||||
acceptsFollowupIpc: false,
|
||||
pendingMessages: false,
|
||||
pendingTasks: [],
|
||||
process: null,
|
||||
|
|
@ -165,11 +237,13 @@ export class GroupQueue {
|
|||
proc: ChildProcess,
|
||||
jailRunId: string,
|
||||
groupFolder?: string,
|
||||
options: RegisterProcessOptions = {},
|
||||
): void {
|
||||
const state = this.getGroup(groupJid);
|
||||
state.process = proc;
|
||||
state.jailRunId = jailRunId;
|
||||
state.active = true;
|
||||
state.acceptsFollowupIpc = options.acceptsFollowupIpc === true;
|
||||
if (groupFolder) state.groupFolder = groupFolder;
|
||||
}
|
||||
|
||||
|
|
@ -186,17 +260,39 @@ export class GroupQueue {
|
|||
}
|
||||
|
||||
/**
|
||||
* Send a follow-up message to the active jail via IPC file.
|
||||
* Returns true if the message was written, false if no active jail.
|
||||
* Send a follow-up message to an active IPC-capable runner.
|
||||
* Returns true only for a transport write; callers must not treat this as a
|
||||
* processing acknowledgement unless the runner also provides one.
|
||||
*/
|
||||
async sendMessage(groupJid: string, text: string): Promise<boolean> {
|
||||
const state = this.getGroup(groupJid);
|
||||
if (!state.active || !state.groupFolder || state.isTaskJail) return false;
|
||||
state.idleWaiting = false; // Agent is about to receive work, no longer idle
|
||||
if (
|
||||
!state.active ||
|
||||
!state.groupFolder ||
|
||||
state.isTaskJail ||
|
||||
!state.acceptsFollowupIpc
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const inputDir = path.join(DATA_DIR, 'ipc', state.groupFolder, 'input');
|
||||
const jailRunId = state.jailRunId;
|
||||
const groupFolder = state.groupFolder;
|
||||
const inputDir = path.join(DATA_DIR, 'ipc', groupFolder, 'input');
|
||||
try {
|
||||
const enrichedText = await enrichPromptWithBuiltinKnowledge(text);
|
||||
|
||||
const current = this.getGroup(groupJid);
|
||||
if (
|
||||
!current.active ||
|
||||
current.jailRunId !== jailRunId ||
|
||||
current.groupFolder !== groupFolder ||
|
||||
current.isTaskJail ||
|
||||
!current.acceptsFollowupIpc
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
|
||||
current.idleWaiting = false; // Agent is about to receive work, no longer idle
|
||||
fs.mkdirSync(inputDir, { recursive: true });
|
||||
const filename = `${Date.now()}-${Math.random().toString(36).slice(2, 6)}.json`;
|
||||
const filepath = path.join(inputDir, filename);
|
||||
|
|
@ -217,7 +313,7 @@ export class GroupQueue {
|
|||
*/
|
||||
closeStdin(groupJid: string): void {
|
||||
const state = this.getGroup(groupJid);
|
||||
if (!state.active || !state.groupFolder) return;
|
||||
if (!state.active || !state.groupFolder || !state.acceptsFollowupIpc) return;
|
||||
|
||||
const inputDir = path.join(DATA_DIR, 'ipc', state.groupFolder, 'input');
|
||||
try {
|
||||
|
|
@ -283,6 +379,7 @@ export class GroupQueue {
|
|||
state.process = null;
|
||||
state.jailRunId = null;
|
||||
state.groupFolder = null;
|
||||
state.acceptsFollowupIpc = false;
|
||||
this.activeCount--;
|
||||
this.drainGroup(groupJid);
|
||||
}
|
||||
|
|
@ -310,6 +407,7 @@ export class GroupQueue {
|
|||
state.process = null;
|
||||
state.jailRunId = null;
|
||||
state.groupFolder = null;
|
||||
state.acceptsFollowupIpc = false;
|
||||
this.activeCount--;
|
||||
this.drainGroup(groupJid);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -100,7 +100,7 @@ import {
|
|||
storeMessage,
|
||||
getPool as getOpsPool,
|
||||
} from './db.js';
|
||||
import { GroupQueue } from './group-queue.js';
|
||||
import { GroupQueue, quarantineStrandedIpcInputs } from './group-queue.js';
|
||||
import { resolveGroupFolderPath } from './group-folder.js';
|
||||
import { decideRootAdminBootstrapGroup } from './root-admin-bootstrap.js';
|
||||
import { augmentPromptWithVision } from './vision.js';
|
||||
|
|
@ -1294,6 +1294,7 @@ async function main(): Promise<void> {
|
|||
markDbInitialized();
|
||||
logger.info('Database initialized');
|
||||
await loadState();
|
||||
quarantineStrandedIpcInputs();
|
||||
const rootAdminBootstrap = decideRootAdminBootstrapGroup({
|
||||
tenantId: TENANT_ID.trim(),
|
||||
assistantName: ASSISTANT_NAME,
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue