From 76cbaca2fb200620169ec2f2f826a55cabf9e7e3 Mon Sep 17 00:00:00 2001 From: centra Date: Tue, 14 Apr 2026 20:44:06 +0900 Subject: [PATCH] v3 --- src/codex-runner.ts | 111 +++++++++++--- src/orchestrator.ts | 128 ++++++++++++++-- src/output-schema.ts | 204 ++++++++++++++++++++++++ src/schema-catalog.ts | 12 +- src/store.ts | 255 +++++++++++++++++++++--------- src/types.ts | 38 ++++- test/codex-runner.test.ts | 315 ++++++++++++++++++++++++++++++++++++++ test/orchestrator.test.ts | 96 +++++++++++- test/store.test.ts | 47 ++++++ 9 files changed, 1089 insertions(+), 117 deletions(-) create mode 100644 src/output-schema.ts create mode 100644 test/codex-runner.test.ts diff --git a/src/codex-runner.ts b/src/codex-runner.ts index 1725ee5..3b24bcc 100644 --- a/src/codex-runner.ts +++ b/src/codex-runner.ts @@ -1,53 +1,93 @@ import { Ajv } from "ajv"; import { Codex, type ThreadEvent, type ThreadOptions } from "@openai/codex-sdk"; +import { + buildSchemaFallbackPrompt, + buildValidationRetryPrompt, + isInvalidOutputSchemaError, + prepareOutputSchema, +} from "./output-schema.js"; import { progressEventsFromThreadEvent } from "./progress.js"; import type { AgentInvocation, AgentInvocationResult, RawAgentRunner } from "./types.js"; -function extractValidationError(message: string, errorText: string): string { - return [ - "The previous response did not parse or validate.", - `Problem: ${errorText}`, - "Respond again with JSON only, matching the schema exactly.", - "Previous response:", - message, - ].join("\n\n"); +interface CodexThread { + readonly id: string | null; + runStreamed(input: string, turnOptions?: { outputSchema?: unknown }): Promise<{ events: AsyncGenerator }>; +} + +interface CodexClient { + startThread(threadOptions?: ThreadOptions): CodexThread; + resumeThread(threadId: string, threadOptions?: ThreadOptions): CodexThread; +} + +class InvocationStreamError extends Error { + constructor( + message: string, + readonly sessionId: string | null, + ) { + super(message); + } } export class CodexSdkRunner implements RawAgentRunner { private readonly ajv = new Ajv({ allErrors: true, strict: false }); - private readonly codex: Codex; + private readonly codex: CodexClient; - constructor(codexCommand: string) { - this.codex = new Codex({ + constructor(codexCommand: string, codex?: CodexClient) { + this.codex = codex ?? new Codex({ codexPathOverride: codexCommand, }); } async invoke(request: AgentInvocation): Promise> { - let prompt = request.prompt; + const validate = this.ajv.compile(request.schema); + const preparedSchema = prepareOutputSchema(request.schema); + let prompt = preparedSchema.outputSchema + ? request.prompt + : buildSchemaFallbackPrompt(request.prompt, request.schema, preparedSchema.fallbackReason ?? undefined); let sessionId = request.sessionId; const maxAttempts = request.maxValidationRetries ?? 2; let lastFailure = ""; + let outputSchema: Record | undefined = preparedSchema.outputSchema ?? undefined; + let schemaEmbeddedInPrompt = outputSchema === undefined; - for (let attempt = 1; attempt <= maxAttempts; attempt += 1) { - const raw = await this.invokeOnce({ - ...request, - prompt, - sessionId, - }); + for (let attempt = 1; attempt <= maxAttempts;) { + let raw: Omit, "output">; + try { + raw = await this.invokeOnce( + { + ...request, + prompt, + sessionId, + }, + outputSchema, + ); + } catch (error) { + const errorText = error instanceof Error ? error.message : String(error); + lastFailure = errorText; + if (outputSchema && isInvalidOutputSchemaError(errorText)) { + if (error instanceof InvocationStreamError) { + sessionId = error.sessionId; + } + outputSchema = undefined; + schemaEmbeddedInPrompt = true; + prompt = buildSchemaFallbackPrompt(request.prompt, request.schema, errorText); + continue; + } + throw error; + } sessionId = raw.sessionId; try { const parsed = JSON.parse(raw.rawMessage) as unknown; - const validate = this.ajv.compile(request.schema); if (!validate(parsed)) { const errorText = this.ajv.errorsText(validate.errors); lastFailure = errorText; if (attempt === maxAttempts) { throw new Error(`Schema validation failed: ${errorText}`); } - prompt = extractValidationError(raw.rawMessage, errorText); + prompt = buildValidationRetryPrompt(raw.rawMessage, errorText, request.schema, schemaEmbeddedInPrompt); + attempt += 1; continue; } @@ -61,14 +101,18 @@ export class CodexSdkRunner implements RawAgentRunner { if (attempt === maxAttempts) { throw new Error(`Structured output failed after ${attempt} attempt(s): ${lastFailure}`); } - prompt = extractValidationError(raw.rawMessage, errorText); + prompt = buildValidationRetryPrompt(raw.rawMessage, errorText, request.schema, schemaEmbeddedInPrompt); + attempt += 1; } } throw new Error(`Structured output failed: ${lastFailure}`); } - private async invokeOnce(request: AgentInvocation): Promise, "output">> { + private async invokeOnce( + request: AgentInvocation, + outputSchema?: Record, + ): Promise, "output">> { const threadOptions: ThreadOptions = { sandboxMode: request.roleConfig.sandbox, workingDirectory: request.cwd, @@ -88,27 +132,39 @@ export class CodexSdkRunner implements RawAgentRunner { : this.codex.startThread(threadOptions); const streamed = await thread.runStreamed(request.prompt, { - outputSchema: request.schema, + outputSchema, }); const rawEvents: string[] = []; let sessionId = request.sessionId; let rawMessage = ""; + let lastStreamError: string | null = null; for await (const event of streamed.events) { rawEvents.push(JSON.stringify(event)); sessionId = this.extractSessionId(sessionId, event); rawMessage = this.extractLatestMessage(rawMessage, event); + if (event.type === "error") { + lastStreamError = event.message; + } if (request.onProgress) { const progressEvents = progressEventsFromThreadEvent(request.runId, request.role, event); for (const progressEvent of progressEvents) { await request.onProgress(progressEvent); } } - this.throwIfFailed(event); + try { + this.throwIfFailed(event); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + throw new InvocationStreamError(message, sessionId ?? thread.id); + } } if (!rawMessage.trim()) { + if (lastStreamError) { + throw new Error(lastStreamError); + } throw new Error("Codex SDK completed without a final agent message."); } @@ -137,10 +193,17 @@ export class CodexSdkRunner implements RawAgentRunner { private throwIfFailed(event: ThreadEvent): void { if (event.type === "error") { + if (this.isTransientStreamError(event.message)) { + return; + } throw new Error(event.message); } if (event.type === "turn.failed") { throw new Error(event.error.message); } } + + private isTransientStreamError(message: string): boolean { + return message.startsWith("Reconnecting..."); + } } diff --git a/src/orchestrator.ts b/src/orchestrator.ts index e1bca27..9582f19 100644 --- a/src/orchestrator.ts +++ b/src/orchestrator.ts @@ -8,7 +8,9 @@ import type { ProgressSink, RawAgentRunner, ResolvedConfig, + RunCheckpointPayload, RunRecord, + RunSnapshot, StrategyPlanOutput, StrategySelfCheckOutput, TaskDraft, @@ -28,7 +30,8 @@ import { import { checkVerdictSchema, implementationResultSchema, strategyPlanSchema, strategySelfCheckSchema } from "./schema-catalog.js"; import { RunStore } from "./store.js"; -const HEARTBEAT_INTERVAL_MS = 30_000; +const HEARTBEAT_INTERVAL_MS = 5 * 60_000; +const ACTIVITY_WRITE_THROTTLE_MS = 1_000; function nowIso(): string { return new Date().toISOString(); @@ -477,16 +480,52 @@ export class AgentROrchestrator { let activeItemKind: string | null = null; let activeCommandSummary: string | null = null; - this.store.updateRun(options.runId, { - lastEventAt: nowIso(), - activeRole: options.role, - activeItemKind: null, - activeCommandSummary: null, - }); + let lastProgressAt = Date.now(); + let lastActivityPersistAt = 0; + let persistedActiveRole: AgentRole | null = null; + let persistedActiveItemKind: string | null = null; + let persistedActiveCommandSummary: string | null = null; + this.persistRunActivity( + options.runId, + { + lastEventAt: nowIso(), + activeRole: options.role, + activeItemKind: null, + activeCommandSummary: null, + }, + ); + lastActivityPersistAt = Date.now(); + persistedActiveRole = options.role; + persistedActiveItemKind = null; + persistedActiveCommandSummary = null; + + const maybePersistActivity = (updates: { + lastEventAt: string; + activeRole: AgentRole | null; + activeItemKind: string | null; + activeCommandSummary: string | null; + }): void => { + const changed = + updates.activeRole !== persistedActiveRole || + updates.activeItemKind !== persistedActiveItemKind || + updates.activeCommandSummary !== persistedActiveCommandSummary; + const now = Date.now(); + if (!changed && now - lastActivityPersistAt < ACTIVITY_WRITE_THROTTLE_MS) { + return; + } + this.persistRunActivity(options.runId, updates); + lastActivityPersistAt = now; + persistedActiveRole = updates.activeRole; + persistedActiveItemKind = updates.activeItemKind; + persistedActiveCommandSummary = updates.activeCommandSummary; + }; const heartbeatTimer = setInterval(() => { + if (Date.now() - lastProgressAt < HEARTBEAT_INTERVAL_MS) { + return; + } const heartbeat = createHeartbeatProgressEvent(options.runId, options.role, activeItemKind, activeCommandSummary); - this.store.updateRun(options.runId, { + maybePersistActivity({ lastEventAt: heartbeat.ts, activeRole: options.role, activeItemKind, @@ -509,6 +548,7 @@ export class AgentROrchestrator { artifacts, maxValidationRetries: 2, onProgress: async (event) => { + lastProgressAt = Date.now(); if (event.activeItemKind !== undefined) { activeItemKind = event.activeItemKind; } @@ -520,13 +560,13 @@ export class AgentROrchestrator { event.kind === "turn.failed" || event.kind === "stream.error" || event.kind === "agent_message.completed"; - this.store.updateRun(options.runId, { + maybePersistActivity({ lastEventAt: event.ts, activeRole: clearActivity ? null : options.role, activeItemKind: clearActivity ? null : activeItemKind, activeCommandSummary: clearActivity ? null : activeCommandSummary, }); - this.recordProgressEvent(event, true); + this.recordProgressEvent(event, false); if (clearActivity) { activeItemKind = null; activeCommandSummary = null; @@ -557,7 +597,7 @@ export class AgentROrchestrator { updatedAt: nowIso(), }; this.store.saveAgentState(nextState); - this.store.updateRun(options.runId, { + maybePersistActivity({ lastEventAt: nowIso(), activeRole: null, activeItemKind: null, @@ -576,7 +616,7 @@ export class AgentROrchestrator { private checkpoint(runId: string): void { const snapshot = this.store.buildSnapshot(runId); - this.store.addCheckpoint(runId, snapshot.run.status, snapshot); + this.store.addCheckpoint(runId, snapshot.run.status, this.buildCheckpointPayload(snapshot)); } private emitEvent( @@ -605,4 +645,68 @@ export class AgentROrchestrator { } this.progressSink?.(event); } + + private persistRunActivity( + runId: string, + updates: { + lastEventAt: string; + activeRole: AgentRole | null; + activeItemKind: string | null; + activeCommandSummary: string | null; + }, + ): void { + this.store.updateRunActivity(runId, updates); + } + + private buildCheckpointPayload(snapshot: RunSnapshot): RunCheckpointPayload { + const currentTask = snapshot.run.currentTaskId + ? [...snapshot.inProgressTasks, ...snapshot.pendingTasks, ...snapshot.completedTasks, ...snapshot.blockedTasks].find( + (task) => task.id === snapshot.run.currentTaskId, + ) ?? null + : null; + const latestApproval = snapshot.approvals.at(-1) ?? null; + return { + run: snapshot.run, + counts: { + pending: snapshot.pendingTasks.length, + inProgress: snapshot.inProgressTasks.length, + completed: snapshot.completedTasks.length, + blocked: snapshot.blockedTasks.length, + approvals: snapshot.approvals.length, + artifacts: snapshot.artifacts.length, + }, + currentTask: currentTask + ? { + id: currentTask.id, + title: currentTask.title, + status: currentTask.status, + attemptCount: currentTask.attemptCount, + runtimeClass: currentTask.runtimeClass, + } + : null, + waitingInputs: [...snapshot.run.waitingInputs], + latestApproval: latestApproval + ? { + source: latestApproval.source, + verdict: latestApproval.verdict, + rationale: latestApproval.rationale, + createdAt: latestApproval.createdAt, + } + : null, + recentAttempts: snapshot.recentAttempts.map((attempt) => ({ + taskId: attempt.taskId, + attemptNumber: attempt.attemptNumber, + status: attempt.status, + summary: attempt.summary, + blockerSignature: attempt.blockerSignature, + createdAt: attempt.createdAt, + })), + recentEvents: snapshot.recentEvents.slice(-12).map((event) => ({ + ts: event.ts, + source: event.source, + kind: event.kind, + message: event.message, + })), + }; + } } diff --git a/src/output-schema.ts b/src/output-schema.ts new file mode 100644 index 0000000..a2b371a --- /dev/null +++ b/src/output-schema.ts @@ -0,0 +1,204 @@ +function isPlainObject(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value); +} + +function hasObjectType(schema: Record): boolean { + const { type } = schema; + if (type === "object") { + return true; + } + return Array.isArray(type) && type.includes("object"); +} + +function hasArrayType(schema: Record): boolean { + const { type } = schema; + if (type === "array") { + return true; + } + return Array.isArray(type) && type.includes("array"); +} + +type NormalizeResult = + | { + ok: true; + schema: unknown; + } + | { + ok: false; + reason: string; + }; + +const UNSUPPORTED_OBJECT_KEYWORDS = [ + "patternProperties", + "propertyNames", + "dependentSchemas", + "unevaluatedProperties", + "allOf", + "anyOf", + "oneOf", + "if", + "then", + "else", + "not", +]; + +function normalizeSchemaNode(node: unknown, path: string): NormalizeResult { + if (!isPlainObject(node)) { + return { ok: true, schema: node }; + } + + for (const keyword of UNSUPPORTED_OBJECT_KEYWORDS) { + if (keyword in node) { + return { + ok: false, + reason: `Structured outputs do not safely support '${keyword}' at ${path}.`, + }; + } + } + + const normalized: Record = { ...node }; + + if (hasObjectType(node) || "properties" in node || "additionalProperties" in node) { + const propertiesValue = node.properties; + if (propertiesValue !== undefined && !isPlainObject(propertiesValue)) { + return { + ok: false, + reason: `Object schema at ${path} must declare properties as an object.`, + }; + } + + const properties = isPlainObject(propertiesValue) ? propertiesValue : {}; + const normalizedProperties: Record = {}; + for (const [key, value] of Object.entries(properties)) { + const child = normalizeSchemaNode(value, `${path}.properties.${key}`); + if (!child.ok) { + return child; + } + normalizedProperties[key] = child.schema; + } + + const additionalProperties = node.additionalProperties; + if (additionalProperties !== false) { + return { + ok: false, + reason: `Structured outputs require additionalProperties: false at ${path}.`, + }; + } + + const propertyKeys = Object.keys(normalizedProperties); + const required = node.required; + if ( + !Array.isArray(required) || + required.length !== propertyKeys.length || + required.some((value) => typeof value !== "string") || + propertyKeys.some((key) => !required.includes(key)) || + required.some((key) => !propertyKeys.includes(key)) + ) { + return { + ok: false, + reason: `Structured outputs require 'required' to match every property at ${path}.`, + }; + } + + normalized.properties = normalizedProperties; + normalized.required = required; + normalized.additionalProperties = additionalProperties; + } + + if (hasArrayType(node) || "items" in node) { + if (!("items" in node)) { + return { + ok: false, + reason: `Array schema at ${path} is missing 'items'.`, + }; + } + if (Array.isArray(node.items)) { + return { + ok: false, + reason: `Structured outputs do not safely support tuple-style 'items' at ${path}.`, + }; + } + const normalizedItems = normalizeSchemaNode(node.items, `${path}.items`); + if (!normalizedItems.ok) { + return normalizedItems; + } + normalized.items = normalizedItems.schema; + } + + return { + ok: true, + schema: normalized, + }; +} + +export function prepareOutputSchema(schema: Record): { + outputSchema: Record | null; + fallbackReason: string | null; +} { + const normalized = normalizeSchemaNode(schema, "$"); + if (!normalized.ok) { + return { + outputSchema: null, + fallbackReason: normalized.reason, + }; + } + + if (!isPlainObject(normalized.schema)) { + return { + outputSchema: null, + fallbackReason: "Structured outputs require a plain JSON object schema.", + }; + } + + return { + outputSchema: normalized.schema, + fallbackReason: null, + }; +} + +export function buildSchemaFallbackPrompt( + prompt: string, + schema: Record, + reason?: string, +): string { + return [ + prompt, + "Structured output enforcement is unavailable for this turn.", + reason ? `Reason: ${reason}` : null, + "Return a single JSON object only. Do not include markdown fences or any commentary.", + "Match this JSON schema exactly:", + JSON.stringify(schema, null, 2), + ] + .filter((value): value is string => Boolean(value)) + .join("\n\n"); +} + +export function buildValidationRetryPrompt( + message: string, + errorText: string, + schema: Record, + schemaEmbeddedInPrompt: boolean, +): string { + return [ + "The previous response did not parse or validate.", + `Problem: ${errorText}`, + schemaEmbeddedInPrompt + ? [ + "Respond again with JSON only.", + "Match this JSON schema exactly:", + JSON.stringify(schema, null, 2), + ].join("\n\n") + : "Respond again with JSON only, matching the schema exactly.", + "Previous response:", + message, + ].join("\n\n"); +} + +export function isInvalidOutputSchemaError(message: string): boolean { + const lowered = message.toLowerCase(); + return ( + lowered.includes("invalid_json_schema") || + lowered.includes("invalid schema for response_format") || + lowered.includes("text.format.schema") + ); +} diff --git a/src/schema-catalog.ts b/src/schema-catalog.ts index c598d67..c22ab75 100644 --- a/src/schema-catalog.ts +++ b/src/schema-catalog.ts @@ -141,9 +141,15 @@ export const implementationResultSchema = { items: { type: "string", minLength: 1 }, }, metrics: { - type: "object", - additionalProperties: { - type: ["string", "number", "boolean", "null"], + type: "array", + items: { + type: "object", + additionalProperties: false, + required: ["key", "value"], + properties: { + key: { type: "string", minLength: 1 }, + value: { type: ["string", "number", "boolean", "null"] }, + }, }, }, }, diff --git a/src/store.ts b/src/store.ts index 6941636..b2539aa 100644 --- a/src/store.ts +++ b/src/store.ts @@ -41,6 +41,13 @@ function stringOrNull(value: unknown): string | null { return value === null || value === undefined || value === "" ? null : String(value); } +function isStorageFullError(error: unknown): boolean { + if (!(error instanceof Error)) { + return false; + } + return /database or disk is full|SQLITE_FULL/i.test(error.message); +} + function mapTaskRow(row: Record): TaskRecord { return { id: String(row.id), @@ -114,6 +121,10 @@ export class RunStore { ensureDir(path.dirname(dbPath)); this.db = new DatabaseSync(dbPath); this.db.exec("PRAGMA journal_mode = WAL;"); + this.db.exec("PRAGMA synchronous = NORMAL;"); + this.db.exec("PRAGMA temp_store = MEMORY;"); + this.db.exec("PRAGMA wal_autocheckpoint = 200;"); + this.db.exec("PRAGMA journal_size_limit = 16777216;"); this.db.exec(` CREATE TABLE IF NOT EXISTS runs ( id TEXT PRIMARY KEY, @@ -218,6 +229,7 @@ export class RunStore { this.ensureColumn("runs", "active_command_summary", "TEXT"); this.ensureColumn("runs", "waiting_inputs_json", "TEXT NOT NULL DEFAULT '[]'"); this.ensureColumn("tasks", "runtime_class", "TEXT NOT NULL DEFAULT 'short'"); + this.pruneRunHistoryTables(); } private ensureColumn(table: string, columnName: string, columnDefinition: string): void { @@ -228,6 +240,32 @@ export class RunStore { this.db.exec(`ALTER TABLE ${table} ADD COLUMN ${columnName} ${columnDefinition};`); } + private pruneRunHistoryTables(): void { + this.runBestEffortWrite(() => { + this.db.exec(` + DELETE FROM checkpoints + WHERE rowid IN ( + SELECT rowid + FROM ( + SELECT rowid, ROW_NUMBER() OVER (PARTITION BY run_id ORDER BY rowid DESC) AS row_num + FROM checkpoints + ) + WHERE row_num > 24 + ); + + DELETE FROM events + WHERE rowid IN ( + SELECT rowid + FROM ( + SELECT rowid, ROW_NUMBER() OVER (PARTITION BY run_id ORDER BY id DESC) AS row_num + FROM events + ) + WHERE row_num > 200 + ); + `); + }); + } + createRun(goal: string, repoPath: string): RunRecord { const timestamp = nowIso(); const id = randomUUID(); @@ -275,46 +313,25 @@ export class RunStore { >, ): RunRecord { const current = this.getRun(runId); - const next: RunRecord = { - ...current, - status: updates.status ?? current.status, - summary: updates.summary ?? current.summary, - currentTaskId: updates.currentTaskId === undefined ? current.currentTaskId : updates.currentTaskId, - lastEventAt: updates.lastEventAt === undefined ? current.lastEventAt : updates.lastEventAt, - activeRole: updates.activeRole === undefined ? current.activeRole : updates.activeRole, - activeItemKind: updates.activeItemKind === undefined ? current.activeItemKind : updates.activeItemKind, - activeCommandSummary: - updates.activeCommandSummary === undefined ? current.activeCommandSummary : updates.activeCommandSummary, - waitingInputs: updates.waitingInputs === undefined ? current.waitingInputs : updates.waitingInputs, - updatedAt: nowIso(), - }; - this.db - .prepare(` - UPDATE runs - SET - status = ?, - summary = ?, - current_task_id = ?, - last_event_at = ?, - active_role = ?, - active_item_kind = ?, - active_command_summary = ?, - waiting_inputs_json = ?, - updated_at = ? - WHERE id = ? - `) - .run( - next.status, - next.summary, - next.currentTaskId, - next.lastEventAt, - next.activeRole, - next.activeItemKind, - next.activeCommandSummary, - toJson(next.waitingInputs), - next.updatedAt, - runId, - ); + const next = this.mergeRun(current, updates); + this.writeRun(runId, next); + return this.getRun(runId); + } + + updateRunActivity( + runId: string, + updates: Partial>, + ): RunRecord { + const current = this.getRun(runId); + const next = this.mergeRun(current, updates); + try { + this.writeRun(runId, next); + } catch (error) { + if (!isStorageFullError(error)) { + throw error; + } + return next; + } return this.getRun(runId); } @@ -334,28 +351,30 @@ export class RunStore { } saveAgentState(record: AgentStateRecord): void { - this.db - .prepare(` - INSERT INTO agents (run_id, role, session_id, turns, rotation_count, last_prompt_path, last_response_path, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?) - ON CONFLICT(run_id, role) DO UPDATE SET - session_id = excluded.session_id, - turns = excluded.turns, - rotation_count = excluded.rotation_count, - last_prompt_path = excluded.last_prompt_path, - last_response_path = excluded.last_response_path, - updated_at = excluded.updated_at - `) - .run( - record.runId, - record.role, - record.sessionId, - record.turns, - record.rotationCount, - record.lastPromptPath, - record.lastResponsePath, - record.updatedAt, - ); + this.runBestEffortWrite(() => { + this.db + .prepare(` + INSERT INTO agents (run_id, role, session_id, turns, rotation_count, last_prompt_path, last_response_path, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(run_id, role) DO UPDATE SET + session_id = excluded.session_id, + turns = excluded.turns, + rotation_count = excluded.rotation_count, + last_prompt_path = excluded.last_prompt_path, + last_response_path = excluded.last_response_path, + updated_at = excluded.updated_at + `) + .run( + record.runId, + record.role, + record.sessionId, + record.turns, + record.rotationCount, + record.lastPromptPath, + record.lastResponsePath, + record.updatedAt, + ); + }); } requeueInProgressTasks(runId: string): number { @@ -556,12 +575,14 @@ export class RunStore { addArtifact(runId: string, role: AgentRole | null, kind: string, filePath: string, metadata: unknown): ArtifactRecord { const id = randomUUID(); const createdAt = nowIso(); - this.db - .prepare(` - INSERT INTO artifacts (id, run_id, role, kind, path, created_at, metadata_json) - VALUES (?, ?, ?, ?, ?, ?, ?) - `) - .run(id, runId, role, kind, filePath, createdAt, toJson(metadata)); + this.runBestEffortWrite(() => { + this.db + .prepare(` + INSERT INTO artifacts (id, run_id, role, kind, path, created_at, metadata_json) + VALUES (?, ?, ?, ?, ?, ?, ?) + `) + .run(id, runId, role, kind, filePath, createdAt, toJson(metadata)); + }); return { id, @@ -671,12 +692,27 @@ export class RunStore { addCheckpoint(runId: string, status: RunStatus, payload: unknown): CheckpointRecord { const id = randomUUID(); const createdAt = nowIso(); - this.db - .prepare(` - INSERT INTO checkpoints (id, run_id, status, payload_json, created_at) - VALUES (?, ?, ?, ?, ?) - `) - .run(id, runId, status, toJson(payload), createdAt); + this.runBestEffortWrite(() => { + this.db + .prepare(` + INSERT INTO checkpoints (id, run_id, status, payload_json, created_at) + VALUES (?, ?, ?, ?, ?) + `) + .run(id, runId, status, toJson(payload), createdAt); + this.db + .prepare(` + DELETE FROM checkpoints + WHERE run_id = ? + AND id NOT IN ( + SELECT id + FROM checkpoints + WHERE run_id = ? + ORDER BY rowid DESC + LIMIT 24 + ) + `) + .run(runId, runId); + }); return { id, @@ -722,4 +758,75 @@ export class RunStore { .all(runId, limit) as Record[]; return rows.map(mapAttemptRow).reverse(); } + + private mergeRun( + current: RunRecord, + updates: Partial< + Pick< + RunRecord, + | "status" + | "summary" + | "currentTaskId" + | "lastEventAt" + | "activeRole" + | "activeItemKind" + | "activeCommandSummary" + | "waitingInputs" + > + >, + ): RunRecord { + return { + ...current, + status: updates.status ?? current.status, + summary: updates.summary ?? current.summary, + currentTaskId: updates.currentTaskId === undefined ? current.currentTaskId : updates.currentTaskId, + lastEventAt: updates.lastEventAt === undefined ? current.lastEventAt : updates.lastEventAt, + activeRole: updates.activeRole === undefined ? current.activeRole : updates.activeRole, + activeItemKind: updates.activeItemKind === undefined ? current.activeItemKind : updates.activeItemKind, + activeCommandSummary: + updates.activeCommandSummary === undefined ? current.activeCommandSummary : updates.activeCommandSummary, + waitingInputs: updates.waitingInputs === undefined ? current.waitingInputs : updates.waitingInputs, + updatedAt: nowIso(), + }; + } + + private writeRun(runId: string, next: RunRecord): void { + this.db + .prepare(` + UPDATE runs + SET + status = ?, + summary = ?, + current_task_id = ?, + last_event_at = ?, + active_role = ?, + active_item_kind = ?, + active_command_summary = ?, + waiting_inputs_json = ?, + updated_at = ? + WHERE id = ? + `) + .run( + next.status, + next.summary, + next.currentTaskId, + next.lastEventAt, + next.activeRole, + next.activeItemKind, + next.activeCommandSummary, + toJson(next.waitingInputs), + next.updatedAt, + runId, + ); + } + + private runBestEffortWrite(callback: () => void): void { + try { + callback(); + } catch (error) { + if (!isStorageFullError(error)) { + throw error; + } + } + } } diff --git a/src/types.ts b/src/types.ts index 21d85ad..54338c8 100644 --- a/src/types.ts +++ b/src/types.ts @@ -140,13 +140,49 @@ export interface CheckpointRecord { createdAt: string; } +export interface CheckpointEventSummary { + ts: string; + source: string; + kind: string; + message: string; +} + +export interface CheckpointAttemptSummary { + taskId: string; + attemptNumber: number; + status: AttemptStatus; + summary: string; + blockerSignature: string | null; + createdAt: string; +} + +export interface RunCheckpointPayload { + run: RunRecord; + counts: { + pending: number; + inProgress: number; + completed: number; + blocked: number; + approvals: number; + artifacts: number; + }; + currentTask: Pick | null; + waitingInputs: string[]; + latestApproval: Pick | null; + recentAttempts: CheckpointAttemptSummary[]; + recentEvents: CheckpointEventSummary[]; +} + export interface VerificationResult { command: string; outcome: "passed" | "failed" | "not_run"; details: string; category: VerificationCategory; artifacts: string[]; - metrics: Record; + metrics: Array<{ + key: string; + value: string | number | boolean | null; + }>; } export interface StrategyPlanOutput { diff --git a/test/codex-runner.test.ts b/test/codex-runner.test.ts new file mode 100644 index 0000000..a199027 --- /dev/null +++ b/test/codex-runner.test.ts @@ -0,0 +1,315 @@ +import { describe, expect, test } from "vitest"; +import type { ThreadEvent } from "@openai/codex-sdk"; +import { CodexSdkRunner } from "../src/codex-runner.js"; +import type { AgentInvocation, AgentInvocationResult, InvocationArtifacts, ProgressEvent, RoleConfig } from "../src/types.js"; + +type FakeRunPlan = { + events?: ThreadEvent[]; + error?: Error; +}; + +class FakeThread { + constructor( + public readonly id: string | null, + private readonly plans: FakeRunPlan[], + private readonly calls: Array<{ input: string; outputSchema?: unknown }>, + ) {} + + async runStreamed(input: string, turnOptions?: { outputSchema?: unknown }): Promise<{ events: AsyncGenerator }> { + this.calls.push({ input, outputSchema: turnOptions?.outputSchema }); + const plan = this.plans.shift(); + if (!plan) { + throw new Error("Unexpected runStreamed call"); + } + if (plan.error) { + throw plan.error; + } + return { + events: this.generateEvents(plan.events ?? []), + }; + } + + private async *generateEvents(events: ThreadEvent[]): AsyncGenerator { + for (const event of events) { + yield event; + } + } +} + +class FakeCodex { + readonly calls: Array<{ input: string; outputSchema?: unknown }> = []; + + constructor(private readonly plans: FakeRunPlan[]) {} + + startThread(): FakeThread { + return new FakeThread("thread-started", this.plans, this.calls); + } + + resumeThread(threadId: string): FakeThread { + return new FakeThread(threadId, this.plans, this.calls); + } +} + +function buildArtifacts(): InvocationArtifacts { + return { + promptPath: "/tmp/prompt.json", + schemaPath: "/tmp/schema.json", + rawEventsPath: "/tmp/raw-events.jsonl", + stderrPath: "/tmp/stderr.log", + lastMessagePath: "/tmp/last-message.json", + responsePath: "/tmp/response.json", + }; +} + +function buildRoleConfig(): RoleConfig { + return { + sandbox: "read-only", + search: false, + skipGitRepoCheck: false, + extraArgs: [], + }; +} + +function buildRequest(onProgress?: (event: ProgressEvent) => void | Promise): AgentInvocation<{ summary: string }> { + return { + runId: "run-1", + role: "strategy", + sessionId: null, + prompt: "Summarize status", + schemaName: "summary", + schema: { + type: "object", + properties: { + summary: { type: "string" }, + }, + required: ["summary"], + additionalProperties: false, + }, + cwd: "/tmp", + roleConfig: buildRoleConfig(), + artifacts: buildArtifacts(), + onProgress, + }; +} + +describe("CodexSdkRunner", () => { + test("continues past transient reconnect stream errors", async () => { + const progressEvents: ProgressEvent[] = []; + const runner = new CodexSdkRunner("codex", new FakeCodex([ + { + events: [ + { + type: "thread.started", + thread_id: "thread-1", + }, + { + type: "error", + message: "Reconnecting... 2/12 (stream disconnected before completion: idle timeout waiting for websocket)", + }, + { + type: "item.completed", + item: { + id: "message-1", + type: "agent_message", + text: JSON.stringify({ summary: "Recovered after reconnect." }), + }, + }, + { + type: "turn.completed", + usage: { + input_tokens: 10, + cached_input_tokens: 0, + output_tokens: 5, + }, + }, + ], + }, + ])); + + const result = await runner.invoke(buildRequest((event) => { + progressEvents.push(event); + })); + + expect(result.output.summary).toBe("Recovered after reconnect."); + expect(result.sessionId).toBe("thread-1"); + expect(progressEvents.some((event) => event.kind === "stream.error")).toBe(true); + }); + + test("falls back to prompt-embedded schema for unsupported dynamic object keys", async () => { + const codex = new FakeCodex([ + { + events: [ + { + type: "thread.started", + thread_id: "thread-1", + }, + { + type: "item.completed", + item: { + id: "message-1", + type: "agent_message", + text: JSON.stringify({ + summary: "Captured metrics.", + metrics: { + latency_ms: 12, + }, + }), + }, + }, + { + type: "turn.completed", + usage: { + input_tokens: 10, + cached_input_tokens: 0, + output_tokens: 5, + }, + }, + ], + }, + ]); + const runner = new CodexSdkRunner("codex", codex); + + const result = await runner.invoke({ + ...buildRequest(), + schema: { + type: "object", + properties: { + summary: { type: "string" }, + metrics: { + type: "object", + additionalProperties: { + type: "number", + }, + }, + }, + required: ["summary", "metrics"], + additionalProperties: false, + }, + }); + + expect(result.output).toEqual({ + summary: "Captured metrics.", + metrics: { + latency_ms: 12, + }, + }); + expect(codex.calls).toHaveLength(1); + expect(codex.calls[0]?.outputSchema).toBeUndefined(); + expect(codex.calls[0]?.input).toContain("Structured output enforcement is unavailable"); + expect(codex.calls[0]?.input).toContain("\"metrics\""); + }); + + test("retries without outputSchema after invalid_json_schema errors", async () => { + const codex = new FakeCodex([ + { + error: new Error( + "Invalid schema for response_format 'codex_output_schema': invalid_json_schema at text.format.schema", + ), + }, + { + events: [ + { + type: "thread.started", + thread_id: "thread-1", + }, + { + type: "item.completed", + item: { + id: "message-1", + type: "agent_message", + text: JSON.stringify({ summary: "Recovered after schema fallback." }), + }, + }, + { + type: "turn.completed", + usage: { + input_tokens: 10, + cached_input_tokens: 0, + output_tokens: 5, + }, + }, + ], + }, + ]); + const runner = new CodexSdkRunner("codex", codex); + + const result = await runner.invoke(buildRequest()); + + expect(result.output.summary).toBe("Recovered after schema fallback."); + expect(codex.calls).toHaveLength(2); + expect(codex.calls[0]?.outputSchema).toBeDefined(); + expect(codex.calls[1]?.outputSchema).toBeUndefined(); + expect(codex.calls[1]?.input).toContain("Structured output enforcement is unavailable"); + expect(codex.calls[1]?.input).toContain("\"summary\""); + }); + + test("falls back when required does not match the declared properties", async () => { + const codex = new FakeCodex([ + { + events: [ + { + type: "thread.started", + thread_id: "thread-1", + }, + { + type: "item.completed", + item: { + id: "message-1", + type: "agent_message", + text: JSON.stringify({ + summary: "Normalized required keys.", + status: "ok", + }), + }, + }, + { + type: "turn.completed", + usage: { + input_tokens: 10, + cached_input_tokens: 0, + output_tokens: 5, + }, + }, + ], + }, + ]); + const runner = new CodexSdkRunner("codex", codex); + + await runner.invoke({ + ...buildRequest(), + schema: { + type: "object", + properties: { + summary: { type: "string" }, + status: { type: "string" }, + }, + required: ["summary"], + additionalProperties: false, + }, + }); + + expect(codex.calls).toHaveLength(1); + expect(codex.calls[0]?.outputSchema).toBeUndefined(); + expect(codex.calls[0]?.input).toContain("Structured output enforcement is unavailable"); + expect(codex.calls[0]?.input).toContain("\"status\""); + }); + + test("still fails on non-transient stream errors", async () => { + const runner = new CodexSdkRunner("codex", new FakeCodex([ + { + events: [ + { + type: "thread.started", + thread_id: "thread-1", + }, + { + type: "error", + message: "Fatal websocket failure", + }, + ], + }, + ])); + + await expect(runner.invoke(buildRequest())).rejects.toThrow("Fatal websocket failure"); + }); +}); diff --git a/test/orchestrator.test.ts b/test/orchestrator.test.ts index 394c89d..ba26c6d 100644 --- a/test/orchestrator.test.ts +++ b/test/orchestrator.test.ts @@ -1,7 +1,8 @@ import { mkdtempSync, readFileSync } from "node:fs"; import { tmpdir } from "node:os"; +import { DatabaseSync } from "node:sqlite"; import path from "node:path"; -import { describe, expect, test } from "vitest"; +import { describe, expect, test, vi } from "vitest"; import { ArtifactManager } from "../src/artifacts.js"; import { loadConfig } from "../src/config.js"; import { AgentROrchestrator } from "../src/orchestrator.js"; @@ -55,6 +56,41 @@ class ScriptedRunner implements RawAgentRunner { } } +class DeferredRunner implements RawAgentRunner { + private pending: + | { + request: AgentInvocation; + resolve: (value: AgentInvocationResult) => void; + } + | null = null; + + async invoke(request: AgentInvocation): Promise> { + return await new Promise>((resolve) => { + this.pending = { + request: request as AgentInvocation, + resolve: resolve as (value: AgentInvocationResult) => void, + }; + }); + } + + complete(output: Record): void { + if (!this.pending) { + throw new Error("No pending invocation to complete."); + } + + const { request, resolve } = this.pending; + this.pending = null; + resolve({ + sessionId: request.sessionId ?? "session-1", + output, + rawMessage: JSON.stringify(output), + rawEvents: ['{"type":"turn.completed"}'], + stderr: "", + artifacts: request.artifacts, + }); + } +} + describe("AgentROrchestrator", () => { test("runs through plan, implementation, self-check, and independent check", async () => { const repoPath = mkdtempSync(path.join(tmpdir(), "agent-r-orchestrator-")); @@ -95,7 +131,7 @@ describe("AgentROrchestrator", () => { details: "Help output rendered.", category: "test", artifacts: [], - metrics: {}, + metrics: [], }, ], followUps: [], @@ -138,6 +174,11 @@ describe("AgentROrchestrator", () => { const firstStatus = await orchestrator.runUntilStable(run.id, 10); const snapshot = store.buildSnapshot(run.id); + const db = new DatabaseSync(config.databasePath); + const checkpointRow = db + .prepare("SELECT payload_json FROM checkpoints WHERE run_id = ? ORDER BY rowid DESC LIMIT 1") + .get(run.id) as { payload_json: string }; + const checkpoint = JSON.parse(checkpointRow.payload_json) as Record; expect(firstStatus.status).toBe("done"); expect(snapshot.completedTasks).toHaveLength(1); @@ -145,6 +186,12 @@ describe("AgentROrchestrator", () => { expect(snapshot.pendingTasks).toHaveLength(0); expect(runner.prompts.at(1)).toContain("You must echo the exact task id"); expect(snapshot.run.waitingInputs).toEqual([]); + expect(checkpoint.counts).toMatchObject({ + completed: 1, + pending: 0, + }); + expect(checkpoint).not.toHaveProperty("artifacts.0.path"); + expect(checkpoint).not.toHaveProperty("recentEvents.0.payload"); }); test("strategy prompt explicitly permits read-only repository inspection", async () => { @@ -236,7 +283,7 @@ describe("AgentROrchestrator", () => { details: "No accessible physical node was available.", category: "proof", artifacts: [], - metrics: {}, + metrics: [], }, ], followUps: [], @@ -257,4 +304,47 @@ describe("AgentROrchestrator", () => { expect(snapshot.run.waitingInputs).toEqual(["Reachable physical node with ISO boot path"]); expect(snapshot.blockedTasks).toHaveLength(1); }); + + test("emits heartbeat only after five minutes of inactivity", async () => { + vi.useFakeTimers(); + + try { + const repoPath = mkdtempSync(path.join(tmpdir(), "agent-r-heartbeat-")); + const config = loadConfig(repoPath); + const store = new RunStore(config.databasePath); + const artifacts = new ArtifactManager(config.runsDir); + const runner = new DeferredRunner(); + const progressEvents: ProgressEvent[] = []; + const orchestrator = new AgentROrchestrator(config, store, artifacts, runner, (event) => { + progressEvents.push(event); + }); + + const run = orchestrator.createRun("Wait on a long-running strategy turn"); + const runPromise = orchestrator.runUntilStable(run.id, 1); + + await vi.advanceTimersByTimeAsync(299_999); + expect(progressEvents.filter((event) => event.kind === "heartbeat")).toHaveLength(0); + + await vi.advanceTimersByTimeAsync(1); + expect(progressEvents.filter((event) => event.kind === "heartbeat")).toHaveLength(1); + + runner.complete({ + decision: "blocked", + summary: "Still waiting.", + rationale: "Synthetic test stop.", + goalProgress: "None.", + risks: [], + tasks: [], + blockedReason: "Stop after heartbeat.", + blockerClass: "tool_failure", + requiredInputs: [], + fallbackTasks: [], + }); + + const finalRun = await runPromise; + expect(finalRun.status).toBe("blocked"); + } finally { + vi.useRealTimers(); + } + }); }); diff --git a/test/store.test.ts b/test/store.test.ts index f2c650e..ad94ef4 100644 --- a/test/store.test.ts +++ b/test/store.test.ts @@ -1,5 +1,6 @@ import { mkdtempSync } from "node:fs"; import { tmpdir } from "node:os"; +import { DatabaseSync } from "node:sqlite"; import path from "node:path"; import { describe, expect, test } from "vitest"; import { RunStore } from "../src/store.js"; @@ -41,4 +42,50 @@ describe("RunStore", () => { expect(snapshot.pendingTasks[0]?.runtimeClass).toBe("long"); expect(snapshot.run.waitingInputs).toEqual([]); }); + + test("keeps only recent checkpoints per run", () => { + const tempDir = mkdtempSync(path.join(tmpdir(), "agent-r-store-")); + const dbPath = path.join(tempDir, "state.sqlite"); + const store = new RunStore(dbPath); + const run = store.createRun("Build something", tempDir); + + for (let index = 0; index < 30; index += 1) { + store.addCheckpoint(run.id, "planning", { index }); + } + + const db = new DatabaseSync(dbPath); + const rows = db + .prepare("SELECT payload_json FROM checkpoints WHERE run_id = ? ORDER BY rowid DESC") + .all(run.id) as Array<{ payload_json: string }>; + + expect(rows).toHaveLength(24); + expect(JSON.parse(rows[0]!.payload_json)).toEqual({ index: 29 }); + expect(JSON.parse(rows.at(-1)!.payload_json)).toEqual({ index: 6 }); + }); + + test("continues when activity updates hit SQLITE_FULL", () => { + const tempDir = mkdtempSync(path.join(tmpdir(), "agent-r-store-")); + const store = new RunStore(path.join(tempDir, "state.sqlite")); + const run = store.createRun("Build something", tempDir); + const subject = store as unknown as { + writeRun: (runId: string, next: unknown) => void; + }; + const originalWriteRun = subject.writeRun.bind(subject); + + subject.writeRun = () => { + throw new Error("database or disk is full"); + }; + + expect(() => + store.updateRunActivity(run.id, { + lastEventAt: new Date().toISOString(), + activeRole: "strategy", + activeItemKind: "command_execution", + activeCommandSummary: "npm test", + }), + ).not.toThrow(); + + subject.writeRun = originalWriteRun; + expect(store.getRun(run.id).activeRole).toBeNull(); + }); });