import { describe, expect, test } from "vitest";

import type { ThreadEvent } from "@openai/codex-sdk";

import { CodexSdkRunner } from "../src/codex-runner.js";
import type {
  AgentInvocation,
  AgentInvocationResult,
  InvocationArtifacts,
  ProgressEvent,
  RoleConfig,
} from "../src/types.js";

/**
 * Scripted outcome for a single `runStreamed` call on a fake thread:
 * either a list of events to stream, or an error to throw instead.
 */
type FakeRunPlan = {
  events?: ThreadEvent[];
  error?: Error;
};

/** One recorded `runStreamed` call: the prompt text and the schema (if any) passed with it. */
type RecordedCall = { input: string; outputSchema?: unknown };

/**
 * Test double for a Codex thread.
 *
 * Every `runStreamed` call is appended to the shared `calls` array and consumes
 * the next entry from the shared `plans` queue.
 */
class FakeThread {
  constructor(
    public readonly id: string | null,
    private readonly plans: FakeRunPlan[],
    private readonly calls: RecordedCall[],
  ) {}

  /**
   * Records the call, then replays the next scripted plan.
   *
   * @throws Error("Unexpected runStreamed call") when the plan queue is empty.
   * @throws The plan's own `error` when the plan carries one.
   */
  async runStreamed(
    input: string,
    turnOptions?: { outputSchema?: unknown },
  ): Promise<{ events: AsyncGenerator<ThreadEvent> }> {
    this.calls.push({ input, outputSchema: turnOptions?.outputSchema });

    const plan = this.plans.shift();
    if (!plan) {
      throw new Error("Unexpected runStreamed call");
    }
    if (plan.error) {
      throw plan.error;
    }

    return {
      events: this.generateEvents(plan.events ?? []),
    };
  }

  /** Yields the scripted events one at a time, in order. */
  private async *generateEvents(events: ThreadEvent[]): AsyncGenerator<ThreadEvent> {
    for (const event of events) {
      yield event;
    }
  }
}

/**
 * Test double for the Codex client. All threads it hands out share the same
 * plan queue and the same `calls` log, so tests can assert on `calls` directly.
 */
class FakeCodex {
  readonly calls: RecordedCall[] = [];

  constructor(private readonly plans: FakeRunPlan[]) {}

  startThread(): FakeThread {
    return new FakeThread("thread-started", this.plans, this.calls);
  }

  resumeThread(threadId: string): FakeThread {
    return new FakeThread(threadId, this.plans, this.calls);
  }
}

/** Builds the `thread.started` event used by every scripted run. */
function threadStarted(): ThreadEvent {
  return {
    type: "thread.started",
    thread_id: "thread-1",
  };
}

/** Builds a stream-level `error` event carrying `message`. */
function streamError(message: string): ThreadEvent {
  return {
    type: "error",
    message,
  };
}

/** Builds an `item.completed` agent message whose text is `output` serialized as JSON. */
function agentMessage(output: unknown): ThreadEvent {
  return {
    type: "item.completed",
    item: {
      id: "message-1",
      type: "agent_message",
      text: JSON.stringify(output),
    },
  };
}

/** Builds the `turn.completed` event with fixed token usage numbers. */
function turnCompleted(): ThreadEvent {
  return {
    type: "turn.completed",
    usage: {
      input_tokens: 10,
      cached_input_tokens: 0,
      output_tokens: 5,
    },
  };
}

/** Scripted run that starts a thread, emits `output` as the agent message, and completes the turn. */
function successfulRun(output: unknown): FakeRunPlan {
  return {
    events: [threadStarted(), agentMessage(output), turnCompleted()],
  };
}

/** Fixed artifact paths under `/tmp` for the invocation request. */
function buildArtifacts(): InvocationArtifacts {
  return {
    promptPath: "/tmp/prompt.json",
    schemaPath: "/tmp/schema.json",
    rawEventsPath: "/tmp/raw-events.jsonl",
    stderrPath: "/tmp/stderr.log",
    lastMessagePath: "/tmp/last-message.json",
    responsePath: "/tmp/response.json",
  };
}

/** Role configuration shared by all test requests. */
function buildRoleConfig(): RoleConfig {
  return {
    sandbox: "read-only",
    search: false,
    skipGitRepoCheck: false,
    extraArgs: [],
  };
}

/**
 * Builds a baseline invocation request whose schema requires a single string
 * `summary` property. Tests spread the result to override individual fields.
 *
 * @param onProgress Optional progress listener forwarded on the request.
 */
function buildRequest(
  onProgress?: (event: ProgressEvent) => void | Promise<void>,
): AgentInvocation<{ summary: string }> {
  return {
    runId: "run-1",
    role: "strategy",
    sessionId: null,
    prompt: "Summarize status",
    schemaName: "summary",
    schema: {
      type: "object",
      properties: {
        summary: { type: "string" },
      },
      required: ["summary"],
      additionalProperties: false,
    },
    cwd: "/tmp",
    roleConfig: buildRoleConfig(),
    artifacts: buildArtifacts(),
    onProgress,
  };
}

describe("CodexSdkRunner", () => {
  test("continues past transient reconnect stream errors", async () => {
    const progressEvents: ProgressEvent[] = [];
    const runner = new CodexSdkRunner("codex", new FakeCodex([
      {
        events: [
          threadStarted(),
          streamError(
            "Reconnecting... 2/12 (stream disconnected before completion: idle timeout waiting for websocket)",
          ),
          agentMessage({ summary: "Recovered after reconnect." }),
          turnCompleted(),
        ],
      },
    ]));

    const result = await runner.invoke(buildRequest((event) => {
      progressEvents.push(event);
    }));

    expect(result.output.summary).toBe("Recovered after reconnect.");
    expect(result.sessionId).toBe("thread-1");
    expect(progressEvents.some((event) => event.kind === "stream.error")).toBe(true);
  });

  test("falls back to prompt-embedded schema for unsupported dynamic object keys", async () => {
    const codex = new FakeCodex([
      successfulRun({
        summary: "Captured metrics.",
        metrics: {
          latency_ms: 12,
        },
      }),
    ]);
    const runner = new CodexSdkRunner("codex", codex);

    const result = await runner.invoke({
      ...buildRequest(),
      schema: {
        type: "object",
        properties: {
          summary: { type: "string" },
          metrics: {
            type: "object",
            // Schema-valued additionalProperties is the "dynamic object keys" case under test.
            additionalProperties: {
              type: "number",
            },
          },
        },
        required: ["summary", "metrics"],
        additionalProperties: false,
      },
    });

    expect(result.output).toEqual({
      summary: "Captured metrics.",
      metrics: {
        latency_ms: 12,
      },
    });
    expect(codex.calls).toHaveLength(1);
    expect(codex.calls[0]?.outputSchema).toBeUndefined();
    expect(codex.calls[0]?.input).toContain("Structured output enforcement is unavailable");
    expect(codex.calls[0]?.input).toContain("\"metrics\"");
  });

  test("retries without outputSchema after invalid_json_schema errors", async () => {
    const codex = new FakeCodex([
      {
        error: new Error(
          "Invalid schema for response_format 'codex_output_schema': invalid_json_schema at text.format.schema",
        ),
      },
      successfulRun({ summary: "Recovered after schema fallback." }),
    ]);
    const runner = new CodexSdkRunner("codex", codex);

    const result = await runner.invoke(buildRequest());

    expect(result.output.summary).toBe("Recovered after schema fallback.");
    // First call carries the schema and fails; the retry drops it and embeds it in the prompt.
    expect(codex.calls).toHaveLength(2);
    expect(codex.calls[0]?.outputSchema).toBeDefined();
    expect(codex.calls[1]?.outputSchema).toBeUndefined();
    expect(codex.calls[1]?.input).toContain("Structured output enforcement is unavailable");
    expect(codex.calls[1]?.input).toContain("\"summary\"");
  });

  test("falls back when required does not match the declared properties", async () => {
    const codex = new FakeCodex([
      successfulRun({
        summary: "Normalized required keys.",
        status: "ok",
      }),
    ]);
    const runner = new CodexSdkRunner("codex", codex);

    await runner.invoke({
      ...buildRequest(),
      schema: {
        type: "object",
        properties: {
          summary: { type: "string" },
          status: { type: "string" },
        },
        // "status" is declared above but deliberately left out of `required`.
        required: ["summary"],
        additionalProperties: false,
      },
    });

    expect(codex.calls).toHaveLength(1);
    expect(codex.calls[0]?.outputSchema).toBeUndefined();
    expect(codex.calls[0]?.input).toContain("Structured output enforcement is unavailable");
    expect(codex.calls[0]?.input).toContain("\"status\"");
  });

  test("still fails on non-transient stream errors", async () => {
    const runner = new CodexSdkRunner("codex", new FakeCodex([
      {
        events: [
          threadStarted(),
          streamError("Fatal websocket failure"),
        ],
      },
    ]));

    await expect(runner.invoke(buildRequest())).rejects.toThrow("Fatal websocket failure");
  });
});