This commit is contained in:
centra 2026-04-14 20:44:06 +09:00
parent ba8f83dfef
commit 76cbaca2fb
Signed by: centra
GPG key ID: 0C09689D20B25ACA
9 changed files with 1089 additions and 117 deletions

View file

@ -1,53 +1,93 @@
import { Ajv } from "ajv";
import { Codex, type ThreadEvent, type ThreadOptions } from "@openai/codex-sdk";
import {
buildSchemaFallbackPrompt,
buildValidationRetryPrompt,
isInvalidOutputSchemaError,
prepareOutputSchema,
} from "./output-schema.js";
import { progressEventsFromThreadEvent } from "./progress.js";
import type { AgentInvocation, AgentInvocationResult, RawAgentRunner } from "./types.js";
function extractValidationError(message: string, errorText: string): string {
return [
"The previous response did not parse or validate.",
`Problem: ${errorText}`,
"Respond again with JSON only, matching the schema exactly.",
"Previous response:",
message,
].join("\n\n");
interface CodexThread {
readonly id: string | null;
runStreamed(input: string, turnOptions?: { outputSchema?: unknown }): Promise<{ events: AsyncGenerator<ThreadEvent> }>;
}
interface CodexClient {
startThread(threadOptions?: ThreadOptions): CodexThread;
resumeThread(threadId: string, threadOptions?: ThreadOptions): CodexThread;
}
class InvocationStreamError extends Error {
constructor(
message: string,
readonly sessionId: string | null,
) {
super(message);
}
}
export class CodexSdkRunner implements RawAgentRunner {
private readonly ajv = new Ajv({ allErrors: true, strict: false });
private readonly codex: Codex;
private readonly codex: CodexClient;
constructor(codexCommand: string) {
this.codex = new Codex({
constructor(codexCommand: string, codex?: CodexClient) {
this.codex = codex ?? new Codex({
codexPathOverride: codexCommand,
});
}
async invoke<T>(request: AgentInvocation<T>): Promise<AgentInvocationResult<T>> {
let prompt = request.prompt;
const validate = this.ajv.compile(request.schema);
const preparedSchema = prepareOutputSchema(request.schema);
let prompt = preparedSchema.outputSchema
? request.prompt
: buildSchemaFallbackPrompt(request.prompt, request.schema, preparedSchema.fallbackReason ?? undefined);
let sessionId = request.sessionId;
const maxAttempts = request.maxValidationRetries ?? 2;
let lastFailure = "";
let outputSchema: Record<string, unknown> | undefined = preparedSchema.outputSchema ?? undefined;
let schemaEmbeddedInPrompt = outputSchema === undefined;
for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
const raw = await this.invokeOnce({
...request,
prompt,
sessionId,
});
for (let attempt = 1; attempt <= maxAttempts;) {
let raw: Omit<AgentInvocationResult<T>, "output">;
try {
raw = await this.invokeOnce(
{
...request,
prompt,
sessionId,
},
outputSchema,
);
} catch (error) {
const errorText = error instanceof Error ? error.message : String(error);
lastFailure = errorText;
if (outputSchema && isInvalidOutputSchemaError(errorText)) {
if (error instanceof InvocationStreamError) {
sessionId = error.sessionId;
}
outputSchema = undefined;
schemaEmbeddedInPrompt = true;
prompt = buildSchemaFallbackPrompt(request.prompt, request.schema, errorText);
continue;
}
throw error;
}
sessionId = raw.sessionId;
try {
const parsed = JSON.parse(raw.rawMessage) as unknown;
const validate = this.ajv.compile(request.schema);
if (!validate(parsed)) {
const errorText = this.ajv.errorsText(validate.errors);
lastFailure = errorText;
if (attempt === maxAttempts) {
throw new Error(`Schema validation failed: ${errorText}`);
}
prompt = extractValidationError(raw.rawMessage, errorText);
prompt = buildValidationRetryPrompt(raw.rawMessage, errorText, request.schema, schemaEmbeddedInPrompt);
attempt += 1;
continue;
}
@ -61,14 +101,18 @@ export class CodexSdkRunner implements RawAgentRunner {
if (attempt === maxAttempts) {
throw new Error(`Structured output failed after ${attempt} attempt(s): ${lastFailure}`);
}
prompt = extractValidationError(raw.rawMessage, errorText);
prompt = buildValidationRetryPrompt(raw.rawMessage, errorText, request.schema, schemaEmbeddedInPrompt);
attempt += 1;
}
}
throw new Error(`Structured output failed: ${lastFailure}`);
}
private async invokeOnce<T>(request: AgentInvocation<T>): Promise<Omit<AgentInvocationResult<T>, "output">> {
private async invokeOnce<T>(
request: AgentInvocation<T>,
outputSchema?: Record<string, unknown>,
): Promise<Omit<AgentInvocationResult<T>, "output">> {
const threadOptions: ThreadOptions = {
sandboxMode: request.roleConfig.sandbox,
workingDirectory: request.cwd,
@ -88,27 +132,39 @@ export class CodexSdkRunner implements RawAgentRunner {
: this.codex.startThread(threadOptions);
const streamed = await thread.runStreamed(request.prompt, {
outputSchema: request.schema,
outputSchema,
});
const rawEvents: string[] = [];
let sessionId = request.sessionId;
let rawMessage = "";
let lastStreamError: string | null = null;
for await (const event of streamed.events) {
rawEvents.push(JSON.stringify(event));
sessionId = this.extractSessionId(sessionId, event);
rawMessage = this.extractLatestMessage(rawMessage, event);
if (event.type === "error") {
lastStreamError = event.message;
}
if (request.onProgress) {
const progressEvents = progressEventsFromThreadEvent(request.runId, request.role, event);
for (const progressEvent of progressEvents) {
await request.onProgress(progressEvent);
}
}
this.throwIfFailed(event);
try {
this.throwIfFailed(event);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
throw new InvocationStreamError(message, sessionId ?? thread.id);
}
}
if (!rawMessage.trim()) {
if (lastStreamError) {
throw new Error(lastStreamError);
}
throw new Error("Codex SDK completed without a final agent message.");
}
@ -137,10 +193,17 @@ export class CodexSdkRunner implements RawAgentRunner {
private throwIfFailed(event: ThreadEvent): void {
if (event.type === "error") {
if (this.isTransientStreamError(event.message)) {
return;
}
throw new Error(event.message);
}
if (event.type === "turn.failed") {
throw new Error(event.error.message);
}
}
private isTransientStreamError(message: string): boolean {
return message.startsWith("Reconnecting...");
}
}

View file

@ -8,7 +8,9 @@ import type {
ProgressSink,
RawAgentRunner,
ResolvedConfig,
RunCheckpointPayload,
RunRecord,
RunSnapshot,
StrategyPlanOutput,
StrategySelfCheckOutput,
TaskDraft,
@ -28,7 +30,8 @@ import {
import { checkVerdictSchema, implementationResultSchema, strategyPlanSchema, strategySelfCheckSchema } from "./schema-catalog.js";
import { RunStore } from "./store.js";
const HEARTBEAT_INTERVAL_MS = 30_000;
const HEARTBEAT_INTERVAL_MS = 5 * 60_000;
const ACTIVITY_WRITE_THROTTLE_MS = 1_000;
function nowIso(): string {
return new Date().toISOString();
@ -477,16 +480,52 @@ export class AgentROrchestrator {
let activeItemKind: string | null = null;
let activeCommandSummary: string | null = null;
this.store.updateRun(options.runId, {
lastEventAt: nowIso(),
activeRole: options.role,
activeItemKind: null,
activeCommandSummary: null,
});
let lastProgressAt = Date.now();
let lastActivityPersistAt = 0;
let persistedActiveRole: AgentRole | null = null;
let persistedActiveItemKind: string | null = null;
let persistedActiveCommandSummary: string | null = null;
this.persistRunActivity(
options.runId,
{
lastEventAt: nowIso(),
activeRole: options.role,
activeItemKind: null,
activeCommandSummary: null,
},
);
lastActivityPersistAt = Date.now();
persistedActiveRole = options.role;
persistedActiveItemKind = null;
persistedActiveCommandSummary = null;
const maybePersistActivity = (updates: {
lastEventAt: string;
activeRole: AgentRole | null;
activeItemKind: string | null;
activeCommandSummary: string | null;
}): void => {
const changed =
updates.activeRole !== persistedActiveRole ||
updates.activeItemKind !== persistedActiveItemKind ||
updates.activeCommandSummary !== persistedActiveCommandSummary;
const now = Date.now();
if (!changed && now - lastActivityPersistAt < ACTIVITY_WRITE_THROTTLE_MS) {
return;
}
this.persistRunActivity(options.runId, updates);
lastActivityPersistAt = now;
persistedActiveRole = updates.activeRole;
persistedActiveItemKind = updates.activeItemKind;
persistedActiveCommandSummary = updates.activeCommandSummary;
};
const heartbeatTimer = setInterval(() => {
if (Date.now() - lastProgressAt < HEARTBEAT_INTERVAL_MS) {
return;
}
const heartbeat = createHeartbeatProgressEvent(options.runId, options.role, activeItemKind, activeCommandSummary);
this.store.updateRun(options.runId, {
maybePersistActivity({
lastEventAt: heartbeat.ts,
activeRole: options.role,
activeItemKind,
@ -509,6 +548,7 @@ export class AgentROrchestrator {
artifacts,
maxValidationRetries: 2,
onProgress: async (event) => {
lastProgressAt = Date.now();
if (event.activeItemKind !== undefined) {
activeItemKind = event.activeItemKind;
}
@ -520,13 +560,13 @@ export class AgentROrchestrator {
event.kind === "turn.failed" ||
event.kind === "stream.error" ||
event.kind === "agent_message.completed";
this.store.updateRun(options.runId, {
maybePersistActivity({
lastEventAt: event.ts,
activeRole: clearActivity ? null : options.role,
activeItemKind: clearActivity ? null : activeItemKind,
activeCommandSummary: clearActivity ? null : activeCommandSummary,
});
this.recordProgressEvent(event, true);
this.recordProgressEvent(event, false);
if (clearActivity) {
activeItemKind = null;
activeCommandSummary = null;
@ -557,7 +597,7 @@ export class AgentROrchestrator {
updatedAt: nowIso(),
};
this.store.saveAgentState(nextState);
this.store.updateRun(options.runId, {
maybePersistActivity({
lastEventAt: nowIso(),
activeRole: null,
activeItemKind: null,
@ -576,7 +616,7 @@ export class AgentROrchestrator {
private checkpoint(runId: string): void {
const snapshot = this.store.buildSnapshot(runId);
this.store.addCheckpoint(runId, snapshot.run.status, snapshot);
this.store.addCheckpoint(runId, snapshot.run.status, this.buildCheckpointPayload(snapshot));
}
private emitEvent(
@ -605,4 +645,68 @@ export class AgentROrchestrator {
}
this.progressSink?.(event);
}
private persistRunActivity(
runId: string,
updates: {
lastEventAt: string;
activeRole: AgentRole | null;
activeItemKind: string | null;
activeCommandSummary: string | null;
},
): void {
this.store.updateRunActivity(runId, updates);
}
private buildCheckpointPayload(snapshot: RunSnapshot): RunCheckpointPayload {
const currentTask = snapshot.run.currentTaskId
? [...snapshot.inProgressTasks, ...snapshot.pendingTasks, ...snapshot.completedTasks, ...snapshot.blockedTasks].find(
(task) => task.id === snapshot.run.currentTaskId,
) ?? null
: null;
const latestApproval = snapshot.approvals.at(-1) ?? null;
return {
run: snapshot.run,
counts: {
pending: snapshot.pendingTasks.length,
inProgress: snapshot.inProgressTasks.length,
completed: snapshot.completedTasks.length,
blocked: snapshot.blockedTasks.length,
approvals: snapshot.approvals.length,
artifacts: snapshot.artifacts.length,
},
currentTask: currentTask
? {
id: currentTask.id,
title: currentTask.title,
status: currentTask.status,
attemptCount: currentTask.attemptCount,
runtimeClass: currentTask.runtimeClass,
}
: null,
waitingInputs: [...snapshot.run.waitingInputs],
latestApproval: latestApproval
? {
source: latestApproval.source,
verdict: latestApproval.verdict,
rationale: latestApproval.rationale,
createdAt: latestApproval.createdAt,
}
: null,
recentAttempts: snapshot.recentAttempts.map((attempt) => ({
taskId: attempt.taskId,
attemptNumber: attempt.attemptNumber,
status: attempt.status,
summary: attempt.summary,
blockerSignature: attempt.blockerSignature,
createdAt: attempt.createdAt,
})),
recentEvents: snapshot.recentEvents.slice(-12).map((event) => ({
ts: event.ts,
source: event.source,
kind: event.kind,
message: event.message,
})),
};
}
}

204
src/output-schema.ts Normal file
View file

@ -0,0 +1,204 @@
function isPlainObject(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
function hasObjectType(schema: Record<string, unknown>): boolean {
const { type } = schema;
if (type === "object") {
return true;
}
return Array.isArray(type) && type.includes("object");
}
function hasArrayType(schema: Record<string, unknown>): boolean {
const { type } = schema;
if (type === "array") {
return true;
}
return Array.isArray(type) && type.includes("array");
}
type NormalizeResult =
| {
ok: true;
schema: unknown;
}
| {
ok: false;
reason: string;
};
const UNSUPPORTED_OBJECT_KEYWORDS = [
"patternProperties",
"propertyNames",
"dependentSchemas",
"unevaluatedProperties",
"allOf",
"anyOf",
"oneOf",
"if",
"then",
"else",
"not",
];
function normalizeSchemaNode(node: unknown, path: string): NormalizeResult {
if (!isPlainObject(node)) {
return { ok: true, schema: node };
}
for (const keyword of UNSUPPORTED_OBJECT_KEYWORDS) {
if (keyword in node) {
return {
ok: false,
reason: `Structured outputs do not safely support '${keyword}' at ${path}.`,
};
}
}
const normalized: Record<string, unknown> = { ...node };
if (hasObjectType(node) || "properties" in node || "additionalProperties" in node) {
const propertiesValue = node.properties;
if (propertiesValue !== undefined && !isPlainObject(propertiesValue)) {
return {
ok: false,
reason: `Object schema at ${path} must declare properties as an object.`,
};
}
const properties = isPlainObject(propertiesValue) ? propertiesValue : {};
const normalizedProperties: Record<string, unknown> = {};
for (const [key, value] of Object.entries(properties)) {
const child = normalizeSchemaNode(value, `${path}.properties.${key}`);
if (!child.ok) {
return child;
}
normalizedProperties[key] = child.schema;
}
const additionalProperties = node.additionalProperties;
if (additionalProperties !== false) {
return {
ok: false,
reason: `Structured outputs require additionalProperties: false at ${path}.`,
};
}
const propertyKeys = Object.keys(normalizedProperties);
const required = node.required;
if (
!Array.isArray(required) ||
required.length !== propertyKeys.length ||
required.some((value) => typeof value !== "string") ||
propertyKeys.some((key) => !required.includes(key)) ||
required.some((key) => !propertyKeys.includes(key))
) {
return {
ok: false,
reason: `Structured outputs require 'required' to match every property at ${path}.`,
};
}
normalized.properties = normalizedProperties;
normalized.required = required;
normalized.additionalProperties = additionalProperties;
}
if (hasArrayType(node) || "items" in node) {
if (!("items" in node)) {
return {
ok: false,
reason: `Array schema at ${path} is missing 'items'.`,
};
}
if (Array.isArray(node.items)) {
return {
ok: false,
reason: `Structured outputs do not safely support tuple-style 'items' at ${path}.`,
};
}
const normalizedItems = normalizeSchemaNode(node.items, `${path}.items`);
if (!normalizedItems.ok) {
return normalizedItems;
}
normalized.items = normalizedItems.schema;
}
return {
ok: true,
schema: normalized,
};
}
export function prepareOutputSchema(schema: Record<string, unknown>): {
outputSchema: Record<string, unknown> | null;
fallbackReason: string | null;
} {
const normalized = normalizeSchemaNode(schema, "$");
if (!normalized.ok) {
return {
outputSchema: null,
fallbackReason: normalized.reason,
};
}
if (!isPlainObject(normalized.schema)) {
return {
outputSchema: null,
fallbackReason: "Structured outputs require a plain JSON object schema.",
};
}
return {
outputSchema: normalized.schema,
fallbackReason: null,
};
}
export function buildSchemaFallbackPrompt(
prompt: string,
schema: Record<string, unknown>,
reason?: string,
): string {
return [
prompt,
"Structured output enforcement is unavailable for this turn.",
reason ? `Reason: ${reason}` : null,
"Return a single JSON object only. Do not include markdown fences or any commentary.",
"Match this JSON schema exactly:",
JSON.stringify(schema, null, 2),
]
.filter((value): value is string => Boolean(value))
.join("\n\n");
}
export function buildValidationRetryPrompt(
message: string,
errorText: string,
schema: Record<string, unknown>,
schemaEmbeddedInPrompt: boolean,
): string {
return [
"The previous response did not parse or validate.",
`Problem: ${errorText}`,
schemaEmbeddedInPrompt
? [
"Respond again with JSON only.",
"Match this JSON schema exactly:",
JSON.stringify(schema, null, 2),
].join("\n\n")
: "Respond again with JSON only, matching the schema exactly.",
"Previous response:",
message,
].join("\n\n");
}
export function isInvalidOutputSchemaError(message: string): boolean {
const lowered = message.toLowerCase();
return (
lowered.includes("invalid_json_schema") ||
lowered.includes("invalid schema for response_format") ||
lowered.includes("text.format.schema")
);
}

View file

@ -141,9 +141,15 @@ export const implementationResultSchema = {
items: { type: "string", minLength: 1 },
},
metrics: {
type: "object",
additionalProperties: {
type: ["string", "number", "boolean", "null"],
type: "array",
items: {
type: "object",
additionalProperties: false,
required: ["key", "value"],
properties: {
key: { type: "string", minLength: 1 },
value: { type: ["string", "number", "boolean", "null"] },
},
},
},
},

View file

@ -41,6 +41,13 @@ function stringOrNull(value: unknown): string | null {
return value === null || value === undefined || value === "" ? null : String(value);
}
function isStorageFullError(error: unknown): boolean {
if (!(error instanceof Error)) {
return false;
}
return /database or disk is full|SQLITE_FULL/i.test(error.message);
}
function mapTaskRow(row: Record<string, unknown>): TaskRecord {
return {
id: String(row.id),
@ -114,6 +121,10 @@ export class RunStore {
ensureDir(path.dirname(dbPath));
this.db = new DatabaseSync(dbPath);
this.db.exec("PRAGMA journal_mode = WAL;");
this.db.exec("PRAGMA synchronous = NORMAL;");
this.db.exec("PRAGMA temp_store = MEMORY;");
this.db.exec("PRAGMA wal_autocheckpoint = 200;");
this.db.exec("PRAGMA journal_size_limit = 16777216;");
this.db.exec(`
CREATE TABLE IF NOT EXISTS runs (
id TEXT PRIMARY KEY,
@ -218,6 +229,7 @@ export class RunStore {
this.ensureColumn("runs", "active_command_summary", "TEXT");
this.ensureColumn("runs", "waiting_inputs_json", "TEXT NOT NULL DEFAULT '[]'");
this.ensureColumn("tasks", "runtime_class", "TEXT NOT NULL DEFAULT 'short'");
this.pruneRunHistoryTables();
}
private ensureColumn(table: string, columnName: string, columnDefinition: string): void {
@ -228,6 +240,32 @@ export class RunStore {
this.db.exec(`ALTER TABLE ${table} ADD COLUMN ${columnName} ${columnDefinition};`);
}
private pruneRunHistoryTables(): void {
this.runBestEffortWrite(() => {
this.db.exec(`
DELETE FROM checkpoints
WHERE rowid IN (
SELECT rowid
FROM (
SELECT rowid, ROW_NUMBER() OVER (PARTITION BY run_id ORDER BY rowid DESC) AS row_num
FROM checkpoints
)
WHERE row_num > 24
);
DELETE FROM events
WHERE rowid IN (
SELECT rowid
FROM (
SELECT rowid, ROW_NUMBER() OVER (PARTITION BY run_id ORDER BY id DESC) AS row_num
FROM events
)
WHERE row_num > 200
);
`);
});
}
createRun(goal: string, repoPath: string): RunRecord {
const timestamp = nowIso();
const id = randomUUID();
@ -275,46 +313,25 @@ export class RunStore {
>,
): RunRecord {
const current = this.getRun(runId);
const next: RunRecord = {
...current,
status: updates.status ?? current.status,
summary: updates.summary ?? current.summary,
currentTaskId: updates.currentTaskId === undefined ? current.currentTaskId : updates.currentTaskId,
lastEventAt: updates.lastEventAt === undefined ? current.lastEventAt : updates.lastEventAt,
activeRole: updates.activeRole === undefined ? current.activeRole : updates.activeRole,
activeItemKind: updates.activeItemKind === undefined ? current.activeItemKind : updates.activeItemKind,
activeCommandSummary:
updates.activeCommandSummary === undefined ? current.activeCommandSummary : updates.activeCommandSummary,
waitingInputs: updates.waitingInputs === undefined ? current.waitingInputs : updates.waitingInputs,
updatedAt: nowIso(),
};
this.db
.prepare(`
UPDATE runs
SET
status = ?,
summary = ?,
current_task_id = ?,
last_event_at = ?,
active_role = ?,
active_item_kind = ?,
active_command_summary = ?,
waiting_inputs_json = ?,
updated_at = ?
WHERE id = ?
`)
.run(
next.status,
next.summary,
next.currentTaskId,
next.lastEventAt,
next.activeRole,
next.activeItemKind,
next.activeCommandSummary,
toJson(next.waitingInputs),
next.updatedAt,
runId,
);
const next = this.mergeRun(current, updates);
this.writeRun(runId, next);
return this.getRun(runId);
}
updateRunActivity(
runId: string,
updates: Partial<Pick<RunRecord, "lastEventAt" | "activeRole" | "activeItemKind" | "activeCommandSummary">>,
): RunRecord {
const current = this.getRun(runId);
const next = this.mergeRun(current, updates);
try {
this.writeRun(runId, next);
} catch (error) {
if (!isStorageFullError(error)) {
throw error;
}
return next;
}
return this.getRun(runId);
}
@ -334,28 +351,30 @@ export class RunStore {
}
saveAgentState(record: AgentStateRecord): void {
this.db
.prepare(`
INSERT INTO agents (run_id, role, session_id, turns, rotation_count, last_prompt_path, last_response_path, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(run_id, role) DO UPDATE SET
session_id = excluded.session_id,
turns = excluded.turns,
rotation_count = excluded.rotation_count,
last_prompt_path = excluded.last_prompt_path,
last_response_path = excluded.last_response_path,
updated_at = excluded.updated_at
`)
.run(
record.runId,
record.role,
record.sessionId,
record.turns,
record.rotationCount,
record.lastPromptPath,
record.lastResponsePath,
record.updatedAt,
);
this.runBestEffortWrite(() => {
this.db
.prepare(`
INSERT INTO agents (run_id, role, session_id, turns, rotation_count, last_prompt_path, last_response_path, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(run_id, role) DO UPDATE SET
session_id = excluded.session_id,
turns = excluded.turns,
rotation_count = excluded.rotation_count,
last_prompt_path = excluded.last_prompt_path,
last_response_path = excluded.last_response_path,
updated_at = excluded.updated_at
`)
.run(
record.runId,
record.role,
record.sessionId,
record.turns,
record.rotationCount,
record.lastPromptPath,
record.lastResponsePath,
record.updatedAt,
);
});
}
requeueInProgressTasks(runId: string): number {
@ -556,12 +575,14 @@ export class RunStore {
addArtifact(runId: string, role: AgentRole | null, kind: string, filePath: string, metadata: unknown): ArtifactRecord {
const id = randomUUID();
const createdAt = nowIso();
this.db
.prepare(`
INSERT INTO artifacts (id, run_id, role, kind, path, created_at, metadata_json)
VALUES (?, ?, ?, ?, ?, ?, ?)
`)
.run(id, runId, role, kind, filePath, createdAt, toJson(metadata));
this.runBestEffortWrite(() => {
this.db
.prepare(`
INSERT INTO artifacts (id, run_id, role, kind, path, created_at, metadata_json)
VALUES (?, ?, ?, ?, ?, ?, ?)
`)
.run(id, runId, role, kind, filePath, createdAt, toJson(metadata));
});
return {
id,
@ -671,12 +692,27 @@ export class RunStore {
addCheckpoint(runId: string, status: RunStatus, payload: unknown): CheckpointRecord {
const id = randomUUID();
const createdAt = nowIso();
this.db
.prepare(`
INSERT INTO checkpoints (id, run_id, status, payload_json, created_at)
VALUES (?, ?, ?, ?, ?)
`)
.run(id, runId, status, toJson(payload), createdAt);
this.runBestEffortWrite(() => {
this.db
.prepare(`
INSERT INTO checkpoints (id, run_id, status, payload_json, created_at)
VALUES (?, ?, ?, ?, ?)
`)
.run(id, runId, status, toJson(payload), createdAt);
this.db
.prepare(`
DELETE FROM checkpoints
WHERE run_id = ?
AND id NOT IN (
SELECT id
FROM checkpoints
WHERE run_id = ?
ORDER BY rowid DESC
LIMIT 24
)
`)
.run(runId, runId);
});
return {
id,
@ -722,4 +758,75 @@ export class RunStore {
.all(runId, limit) as Record<string, unknown>[];
return rows.map(mapAttemptRow).reverse();
}
private mergeRun(
current: RunRecord,
updates: Partial<
Pick<
RunRecord,
| "status"
| "summary"
| "currentTaskId"
| "lastEventAt"
| "activeRole"
| "activeItemKind"
| "activeCommandSummary"
| "waitingInputs"
>
>,
): RunRecord {
return {
...current,
status: updates.status ?? current.status,
summary: updates.summary ?? current.summary,
currentTaskId: updates.currentTaskId === undefined ? current.currentTaskId : updates.currentTaskId,
lastEventAt: updates.lastEventAt === undefined ? current.lastEventAt : updates.lastEventAt,
activeRole: updates.activeRole === undefined ? current.activeRole : updates.activeRole,
activeItemKind: updates.activeItemKind === undefined ? current.activeItemKind : updates.activeItemKind,
activeCommandSummary:
updates.activeCommandSummary === undefined ? current.activeCommandSummary : updates.activeCommandSummary,
waitingInputs: updates.waitingInputs === undefined ? current.waitingInputs : updates.waitingInputs,
updatedAt: nowIso(),
};
}
private writeRun(runId: string, next: RunRecord): void {
this.db
.prepare(`
UPDATE runs
SET
status = ?,
summary = ?,
current_task_id = ?,
last_event_at = ?,
active_role = ?,
active_item_kind = ?,
active_command_summary = ?,
waiting_inputs_json = ?,
updated_at = ?
WHERE id = ?
`)
.run(
next.status,
next.summary,
next.currentTaskId,
next.lastEventAt,
next.activeRole,
next.activeItemKind,
next.activeCommandSummary,
toJson(next.waitingInputs),
next.updatedAt,
runId,
);
}
private runBestEffortWrite(callback: () => void): void {
try {
callback();
} catch (error) {
if (!isStorageFullError(error)) {
throw error;
}
}
}
}

View file

@ -140,13 +140,49 @@ export interface CheckpointRecord {
createdAt: string;
}
export interface CheckpointEventSummary {
ts: string;
source: string;
kind: string;
message: string;
}
export interface CheckpointAttemptSummary {
taskId: string;
attemptNumber: number;
status: AttemptStatus;
summary: string;
blockerSignature: string | null;
createdAt: string;
}
export interface RunCheckpointPayload {
run: RunRecord;
counts: {
pending: number;
inProgress: number;
completed: number;
blocked: number;
approvals: number;
artifacts: number;
};
currentTask: Pick<TaskRecord, "id" | "title" | "status" | "attemptCount" | "runtimeClass"> | null;
waitingInputs: string[];
latestApproval: Pick<ApprovalRecord, "source" | "verdict" | "rationale" | "createdAt"> | null;
recentAttempts: CheckpointAttemptSummary[];
recentEvents: CheckpointEventSummary[];
}
export interface VerificationResult {
command: string;
outcome: "passed" | "failed" | "not_run";
details: string;
category: VerificationCategory;
artifacts: string[];
metrics: Record<string, string | number | boolean | null>;
metrics: Array<{
key: string;
value: string | number | boolean | null;
}>;
}
export interface StrategyPlanOutput {

315
test/codex-runner.test.ts Normal file
View file

@ -0,0 +1,315 @@
import { describe, expect, test } from "vitest";
import type { ThreadEvent } from "@openai/codex-sdk";
import { CodexSdkRunner } from "../src/codex-runner.js";
import type { AgentInvocation, AgentInvocationResult, InvocationArtifacts, ProgressEvent, RoleConfig } from "../src/types.js";
type FakeRunPlan = {
events?: ThreadEvent[];
error?: Error;
};
class FakeThread {
constructor(
public readonly id: string | null,
private readonly plans: FakeRunPlan[],
private readonly calls: Array<{ input: string; outputSchema?: unknown }>,
) {}
async runStreamed(input: string, turnOptions?: { outputSchema?: unknown }): Promise<{ events: AsyncGenerator<ThreadEvent> }> {
this.calls.push({ input, outputSchema: turnOptions?.outputSchema });
const plan = this.plans.shift();
if (!plan) {
throw new Error("Unexpected runStreamed call");
}
if (plan.error) {
throw plan.error;
}
return {
events: this.generateEvents(plan.events ?? []),
};
}
private async *generateEvents(events: ThreadEvent[]): AsyncGenerator<ThreadEvent> {
for (const event of events) {
yield event;
}
}
}
class FakeCodex {
readonly calls: Array<{ input: string; outputSchema?: unknown }> = [];
constructor(private readonly plans: FakeRunPlan[]) {}
startThread(): FakeThread {
return new FakeThread("thread-started", this.plans, this.calls);
}
resumeThread(threadId: string): FakeThread {
return new FakeThread(threadId, this.plans, this.calls);
}
}
function buildArtifacts(): InvocationArtifacts {
return {
promptPath: "/tmp/prompt.json",
schemaPath: "/tmp/schema.json",
rawEventsPath: "/tmp/raw-events.jsonl",
stderrPath: "/tmp/stderr.log",
lastMessagePath: "/tmp/last-message.json",
responsePath: "/tmp/response.json",
};
}
function buildRoleConfig(): RoleConfig {
return {
sandbox: "read-only",
search: false,
skipGitRepoCheck: false,
extraArgs: [],
};
}
function buildRequest(onProgress?: (event: ProgressEvent) => void | Promise<void>): AgentInvocation<{ summary: string }> {
return {
runId: "run-1",
role: "strategy",
sessionId: null,
prompt: "Summarize status",
schemaName: "summary",
schema: {
type: "object",
properties: {
summary: { type: "string" },
},
required: ["summary"],
additionalProperties: false,
},
cwd: "/tmp",
roleConfig: buildRoleConfig(),
artifacts: buildArtifacts(),
onProgress,
};
}
describe("CodexSdkRunner", () => {
test("continues past transient reconnect stream errors", async () => {
const progressEvents: ProgressEvent[] = [];
const runner = new CodexSdkRunner("codex", new FakeCodex([
{
events: [
{
type: "thread.started",
thread_id: "thread-1",
},
{
type: "error",
message: "Reconnecting... 2/12 (stream disconnected before completion: idle timeout waiting for websocket)",
},
{
type: "item.completed",
item: {
id: "message-1",
type: "agent_message",
text: JSON.stringify({ summary: "Recovered after reconnect." }),
},
},
{
type: "turn.completed",
usage: {
input_tokens: 10,
cached_input_tokens: 0,
output_tokens: 5,
},
},
],
},
]));
const result = await runner.invoke(buildRequest((event) => {
progressEvents.push(event);
}));
expect(result.output.summary).toBe("Recovered after reconnect.");
expect(result.sessionId).toBe("thread-1");
expect(progressEvents.some((event) => event.kind === "stream.error")).toBe(true);
});
test("falls back to prompt-embedded schema for unsupported dynamic object keys", async () => {
const codex = new FakeCodex([
{
events: [
{
type: "thread.started",
thread_id: "thread-1",
},
{
type: "item.completed",
item: {
id: "message-1",
type: "agent_message",
text: JSON.stringify({
summary: "Captured metrics.",
metrics: {
latency_ms: 12,
},
}),
},
},
{
type: "turn.completed",
usage: {
input_tokens: 10,
cached_input_tokens: 0,
output_tokens: 5,
},
},
],
},
]);
const runner = new CodexSdkRunner("codex", codex);
const result = await runner.invoke({
...buildRequest(),
schema: {
type: "object",
properties: {
summary: { type: "string" },
metrics: {
type: "object",
additionalProperties: {
type: "number",
},
},
},
required: ["summary", "metrics"],
additionalProperties: false,
},
});
expect(result.output).toEqual({
summary: "Captured metrics.",
metrics: {
latency_ms: 12,
},
});
expect(codex.calls).toHaveLength(1);
expect(codex.calls[0]?.outputSchema).toBeUndefined();
expect(codex.calls[0]?.input).toContain("Structured output enforcement is unavailable");
expect(codex.calls[0]?.input).toContain("\"metrics\"");
});
test("retries without outputSchema after invalid_json_schema errors", async () => {
const codex = new FakeCodex([
{
error: new Error(
"Invalid schema for response_format 'codex_output_schema': invalid_json_schema at text.format.schema",
),
},
{
events: [
{
type: "thread.started",
thread_id: "thread-1",
},
{
type: "item.completed",
item: {
id: "message-1",
type: "agent_message",
text: JSON.stringify({ summary: "Recovered after schema fallback." }),
},
},
{
type: "turn.completed",
usage: {
input_tokens: 10,
cached_input_tokens: 0,
output_tokens: 5,
},
},
],
},
]);
const runner = new CodexSdkRunner("codex", codex);
const result = await runner.invoke(buildRequest());
expect(result.output.summary).toBe("Recovered after schema fallback.");
expect(codex.calls).toHaveLength(2);
expect(codex.calls[0]?.outputSchema).toBeDefined();
expect(codex.calls[1]?.outputSchema).toBeUndefined();
expect(codex.calls[1]?.input).toContain("Structured output enforcement is unavailable");
expect(codex.calls[1]?.input).toContain("\"summary\"");
});
test("falls back when required does not match the declared properties", async () => {
const codex = new FakeCodex([
{
events: [
{
type: "thread.started",
thread_id: "thread-1",
},
{
type: "item.completed",
item: {
id: "message-1",
type: "agent_message",
text: JSON.stringify({
summary: "Normalized required keys.",
status: "ok",
}),
},
},
{
type: "turn.completed",
usage: {
input_tokens: 10,
cached_input_tokens: 0,
output_tokens: 5,
},
},
],
},
]);
const runner = new CodexSdkRunner("codex", codex);
await runner.invoke({
...buildRequest(),
schema: {
type: "object",
properties: {
summary: { type: "string" },
status: { type: "string" },
},
required: ["summary"],
additionalProperties: false,
},
});
expect(codex.calls).toHaveLength(1);
expect(codex.calls[0]?.outputSchema).toBeUndefined();
expect(codex.calls[0]?.input).toContain("Structured output enforcement is unavailable");
expect(codex.calls[0]?.input).toContain("\"status\"");
});
test("still fails on non-transient stream errors", async () => {
const runner = new CodexSdkRunner("codex", new FakeCodex([
{
events: [
{
type: "thread.started",
thread_id: "thread-1",
},
{
type: "error",
message: "Fatal websocket failure",
},
],
},
]));
await expect(runner.invoke(buildRequest())).rejects.toThrow("Fatal websocket failure");
});
});

View file

@ -1,7 +1,8 @@
import { mkdtempSync, readFileSync } from "node:fs";
import { tmpdir } from "node:os";
import { DatabaseSync } from "node:sqlite";
import path from "node:path";
import { describe, expect, test } from "vitest";
import { describe, expect, test, vi } from "vitest";
import { ArtifactManager } from "../src/artifacts.js";
import { loadConfig } from "../src/config.js";
import { AgentROrchestrator } from "../src/orchestrator.js";
@ -55,6 +56,41 @@ class ScriptedRunner implements RawAgentRunner {
}
}
class DeferredRunner implements RawAgentRunner {
private pending:
| {
request: AgentInvocation<unknown>;
resolve: (value: AgentInvocationResult<unknown>) => void;
}
| null = null;
async invoke<T>(request: AgentInvocation<T>): Promise<AgentInvocationResult<T>> {
return await new Promise<AgentInvocationResult<T>>((resolve) => {
this.pending = {
request: request as AgentInvocation<unknown>,
resolve: resolve as (value: AgentInvocationResult<unknown>) => void,
};
});
}
complete(output: Record<string, unknown>): void {
if (!this.pending) {
throw new Error("No pending invocation to complete.");
}
const { request, resolve } = this.pending;
this.pending = null;
resolve({
sessionId: request.sessionId ?? "session-1",
output,
rawMessage: JSON.stringify(output),
rawEvents: ['{"type":"turn.completed"}'],
stderr: "",
artifacts: request.artifacts,
});
}
}
describe("AgentROrchestrator", () => {
test("runs through plan, implementation, self-check, and independent check", async () => {
const repoPath = mkdtempSync(path.join(tmpdir(), "agent-r-orchestrator-"));
@ -95,7 +131,7 @@ describe("AgentROrchestrator", () => {
details: "Help output rendered.",
category: "test",
artifacts: [],
metrics: {},
metrics: [],
},
],
followUps: [],
@ -138,6 +174,11 @@ describe("AgentROrchestrator", () => {
const firstStatus = await orchestrator.runUntilStable(run.id, 10);
const snapshot = store.buildSnapshot(run.id);
const db = new DatabaseSync(config.databasePath);
const checkpointRow = db
.prepare("SELECT payload_json FROM checkpoints WHERE run_id = ? ORDER BY rowid DESC LIMIT 1")
.get(run.id) as { payload_json: string };
const checkpoint = JSON.parse(checkpointRow.payload_json) as Record<string, unknown>;
expect(firstStatus.status).toBe("done");
expect(snapshot.completedTasks).toHaveLength(1);
@ -145,6 +186,12 @@ describe("AgentROrchestrator", () => {
expect(snapshot.pendingTasks).toHaveLength(0);
expect(runner.prompts.at(1)).toContain("You must echo the exact task id");
expect(snapshot.run.waitingInputs).toEqual([]);
expect(checkpoint.counts).toMatchObject({
completed: 1,
pending: 0,
});
expect(checkpoint).not.toHaveProperty("artifacts.0.path");
expect(checkpoint).not.toHaveProperty("recentEvents.0.payload");
});
test("strategy prompt explicitly permits read-only repository inspection", async () => {
@ -236,7 +283,7 @@ describe("AgentROrchestrator", () => {
details: "No accessible physical node was available.",
category: "proof",
artifacts: [],
metrics: {},
metrics: [],
},
],
followUps: [],
@ -257,4 +304,47 @@ describe("AgentROrchestrator", () => {
expect(snapshot.run.waitingInputs).toEqual(["Reachable physical node with ISO boot path"]);
expect(snapshot.blockedTasks).toHaveLength(1);
});
test("emits heartbeat only after five minutes of inactivity", async () => {
vi.useFakeTimers();
try {
const repoPath = mkdtempSync(path.join(tmpdir(), "agent-r-heartbeat-"));
const config = loadConfig(repoPath);
const store = new RunStore(config.databasePath);
const artifacts = new ArtifactManager(config.runsDir);
const runner = new DeferredRunner();
const progressEvents: ProgressEvent[] = [];
const orchestrator = new AgentROrchestrator(config, store, artifacts, runner, (event) => {
progressEvents.push(event);
});
const run = orchestrator.createRun("Wait on a long-running strategy turn");
const runPromise = orchestrator.runUntilStable(run.id, 1);
await vi.advanceTimersByTimeAsync(299_999);
expect(progressEvents.filter((event) => event.kind === "heartbeat")).toHaveLength(0);
await vi.advanceTimersByTimeAsync(1);
expect(progressEvents.filter((event) => event.kind === "heartbeat")).toHaveLength(1);
runner.complete({
decision: "blocked",
summary: "Still waiting.",
rationale: "Synthetic test stop.",
goalProgress: "None.",
risks: [],
tasks: [],
blockedReason: "Stop after heartbeat.",
blockerClass: "tool_failure",
requiredInputs: [],
fallbackTasks: [],
});
const finalRun = await runPromise;
expect(finalRun.status).toBe("blocked");
} finally {
vi.useRealTimers();
}
});
});

View file

@ -1,5 +1,6 @@
import { mkdtempSync } from "node:fs";
import { tmpdir } from "node:os";
import { DatabaseSync } from "node:sqlite";
import path from "node:path";
import { describe, expect, test } from "vitest";
import { RunStore } from "../src/store.js";
@ -41,4 +42,50 @@ describe("RunStore", () => {
expect(snapshot.pendingTasks[0]?.runtimeClass).toBe("long");
expect(snapshot.run.waitingInputs).toEqual([]);
});
test("keeps only recent checkpoints per run", () => {
const tempDir = mkdtempSync(path.join(tmpdir(), "agent-r-store-"));
const dbPath = path.join(tempDir, "state.sqlite");
const store = new RunStore(dbPath);
const run = store.createRun("Build something", tempDir);
for (let index = 0; index < 30; index += 1) {
store.addCheckpoint(run.id, "planning", { index });
}
const db = new DatabaseSync(dbPath);
const rows = db
.prepare("SELECT payload_json FROM checkpoints WHERE run_id = ? ORDER BY rowid DESC")
.all(run.id) as Array<{ payload_json: string }>;
expect(rows).toHaveLength(24);
expect(JSON.parse(rows[0]!.payload_json)).toEqual({ index: 29 });
expect(JSON.parse(rows.at(-1)!.payload_json)).toEqual({ index: 6 });
});
test("continues when activity updates hit SQLITE_FULL", () => {
const tempDir = mkdtempSync(path.join(tmpdir(), "agent-r-store-"));
const store = new RunStore(path.join(tempDir, "state.sqlite"));
const run = store.createRun("Build something", tempDir);
const subject = store as unknown as {
writeRun: (runId: string, next: unknown) => void;
};
const originalWriteRun = subject.writeRun.bind(subject);
subject.writeRun = () => {
throw new Error("database or disk is full");
};
expect(() =>
store.updateRunActivity(run.id, {
lastEventAt: new Date().toISOString(),
activeRole: "strategy",
activeItemKind: "command_execution",
activeCommandSummary: "npm test",
}),
).not.toThrow();
subject.writeRun = originalWriteRun;
expect(store.getRun(run.id).activeRole).toBeNull();
});
});