diff --git a/README.md b/README.md index ae8b16f..4f52907 100644 --- a/README.md +++ b/README.md @@ -17,13 +17,15 @@ npm install npm run dev -- run "Build a plugin system" --repo /path/to/repo npm run build -./dist/index.js run "Build a plugin system" --repo /path/to/repo -./dist/index.js resume --repo /path/to/repo -./dist/index.js status --repo /path/to/repo +./dist/index.js run "Build a plugin system" --repo /path/to/repo --progress human +./dist/index.js resume --repo /path/to/repo --progress quiet +./dist/index.js status --repo /path/to/repo --watch ./dist/index.js inspect --repo /path/to/repo ./dist/index.js logs --repo /path/to/repo --agent strategy ``` +`run` and `resume` stream progress to stdout by default. Use `--progress quiet` to suppress it or `--progress jsonl` for machine-readable event lines. + ## Config Place `agent-r.config.toml` in the target repo if you want to override defaults. diff --git a/nohup.out b/nohup.out new file mode 100644 index 0000000..c4c57db --- /dev/null +++ b/nohup.out @@ -0,0 +1,9 @@ +run: a9a6d691-4b9d-4aff-b58f-b1ecd2e0b400 +status: blocked +goal: このultracloudクラウド基盤について、1. クラウド基盤として実用的となること、2. ベアメタルなデプロイに向け、実機にうまくデプロイするための経路が存在すること、3. 各コンポーネントの責務が重複したり過剰に分割されたりすることなく役割を果たし、適度な分離(VM基盤がほしいだけなのにとてつもなくデカい手間がかかるみたいな)がなされること、4. 手軽に実用可能となること(昔のminioが簡単に試せたように、docker imageとかで使えると良いかも?)などを十分に満足するものとなるまで開発と検証を進める(大規模な改変も行って良い。)ようにしてください。当面の間は検証にVM(QEMUを実機に見立ててクラウド基盤をデプロイする。VM基盤とかはネステッドKVMを使う)を使うようにしてください。環境はNixで整えてください。 +summary: 実行環境が `bwrap: loopback: Failed RTM_NEWADDR: Operation not permitted` でシェル起動に失敗しており、`/mnt/d2/centra/photoncloud-monorepo` の読取・検証・実装を開始できません。少なくとも read-only の shell 実行が復旧するまで進行不能です。 +cycles: 6 +replans: 2 +current task: none +run id: a9a6d691-4b9d-4aff-b58f-b1ecd2e0b400 +pending tasks: 0 diff --git a/package-lock.json b/package-lock.json index c491a02..620e99d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -26,31 +26,6 @@ "node": ">=24.0.0" } }, - "node_modules/@emnapi/core": { - "version": "1.9.2", - "resolved": "https://registry.npmjs.org/@emnapi/core/-/core-1.9.2.tgz", - "integrity": "sha512-UC+ZhH3XtczQYfOlu3lNEkdW/p4dsJ1r/bP7H8+rhao3TTTMO1ATq/4DdIi23XuGoFY+Cz0JmCbdVl0hz9jZcA==", - "dev": true, - "license": "MIT", - "optional": true, - "peer": true, - "dependencies": { - "@emnapi/wasi-threads": "1.2.1", - "tslib": "^2.4.0" - } - }, - "node_modules/@emnapi/runtime": { - "version": "1.9.2", - "resolved": "https://registry.npmjs.org/@emnapi/runtime/-/runtime-1.9.2.tgz", - "integrity": "sha512-3U4+MIWHImeyu1wnmVygh5WlgfYDtyf0k8AbLhMFxOipihf6nrWC4syIm/SwEeec0mNSafiiNnMJwbza/Is6Lw==", - "dev": true, - "license": "MIT", - "optional": true, - "peer": true, - "dependencies": { - "tslib": "^2.4.0" - } - }, "node_modules/@emnapi/wasi-threads": { "version": "1.2.1", "resolved": "https://registry.npmjs.org/@emnapi/wasi-threads/-/wasi-threads-1.2.1.tgz", diff --git a/src/codex-runner.ts b/src/codex-runner.ts index f7d716c..1725ee5 100644 --- a/src/codex-runner.ts +++ b/src/codex-runner.ts @@ -1,5 +1,6 @@ import { Ajv } from "ajv"; import { Codex, type ThreadEvent, type ThreadOptions } from "@openai/codex-sdk"; +import { progressEventsFromThreadEvent } from "./progress.js"; import type { AgentInvocation, AgentInvocationResult, RawAgentRunner } from "./types.js"; function extractValidationError(message: string, errorText: string): string { @@ -98,6 +99,12 @@ export class CodexSdkRunner implements RawAgentRunner { rawEvents.push(JSON.stringify(event)); sessionId = this.extractSessionId(sessionId, event); rawMessage = this.extractLatestMessage(rawMessage, event); + if (request.onProgress) { + const progressEvents = progressEventsFromThreadEvent(request.runId, request.role, event); + for (const progressEvent of progressEvents) { + await request.onProgress(progressEvent); + } + } this.throwIfFailed(event); } diff --git a/src/index.ts b/src/index.ts index 2949e01..a074dbc 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,44 +1,73 @@ #!/usr/bin/env -S node --disable-warning=ExperimentalWarning import path from "node:path"; +import { setTimeout as delay } from "node:timers/promises"; import { Command } from "commander"; import { ArtifactManager } from "./artifacts.js"; import { CodexSdkRunner } from "./codex-runner.js"; import { loadConfig } from "./config.js"; import { listFilesRecursive, readTextFile } from "./fs-utils.js"; import { AgentROrchestrator } from "./orchestrator.js"; +import { ConsoleProgressReporter } from "./progress.js"; import { RunStore } from "./store.js"; -import type { AgentRole } from "./types.js"; +import { formatRunStatusLines } from "./status-view.js"; +import type { AgentRole, ProgressMode, RunSnapshot } from "./types.js"; function resolveRepoPath(repo?: string): string { return path.resolve(repo ?? process.cwd()); } -function bootstrap(repo?: string, configPath?: string) { +function parseProgressMode(value: string): ProgressMode { + if (value === "human" || value === "jsonl" || value === "quiet") { + return value; + } + throw new Error(`Unsupported progress mode: ${value}`); +} + +function bootstrap(repo?: string, configPath?: string, progressMode: ProgressMode = "quiet") { const repoPath = resolveRepoPath(repo); const config = loadConfig(repoPath, configPath); const store = new RunStore(config.databasePath); const artifacts = new ArtifactManager(config.runsDir); const runner = new CodexSdkRunner(config.codexCommand); - const orchestrator = new AgentROrchestrator(config, store, artifacts, runner); + const reporter = new ConsoleProgressReporter(progressMode); + const orchestrator = new AgentROrchestrator(config, store, artifacts, runner, (event) => reporter.emit(event)); return { repoPath, config, store, artifacts, orchestrator }; } -function printRunStatus(label: string, run: { - id: string; - status: string; - summary: string | null; - goal: string; - cycleCount: number; - replanCount: number; - currentTaskId: string | null; -}): void { - console.log(`${label}: ${run.id}`); - console.log(`status: ${run.status}`); - console.log(`goal: ${run.goal}`); - console.log(`summary: ${run.summary ?? "none"}`); - console.log(`cycles: ${run.cycleCount}`); - console.log(`replans: ${run.replanCount}`); - console.log(`current task: ${run.currentTaskId ?? "none"}`); +function printRunSnapshot(snapshot: RunSnapshot, mode: ProgressMode = "human"): void { + if (mode === "jsonl") { + console.log( + JSON.stringify({ + type: "run_status", + run: snapshot.run, + counts: { + pending: snapshot.pendingTasks.length, + inProgress: snapshot.inProgressTasks.length, + completed: snapshot.completedTasks.length, + blocked: snapshot.blockedTasks.length, + }, + }), + ); + return; + } + + for (const line of formatRunStatusLines(snapshot)) { + console.log(line); + } +} + +async function watchRunStatus(store: RunStore, runId: string, intervalMs = 2_000): Promise { + for (;;) { + const snapshot = store.buildSnapshot(runId); + if (process.stdout.isTTY) { + console.clear(); + } + printRunSnapshot(snapshot); + if (snapshot.run.status === "done" || snapshot.run.status === "blocked" || snapshot.run.status === "waiting_input") { + return; + } + await delay(intervalMs); + } } const program = new Command(); @@ -51,14 +80,17 @@ program .option("--repo ", "target repository path") .option("--config ", "path to agent-r config file") .option("--max-cycles ", "override max cycles for this invocation", Number) - .action(async (goal: string, options: { repo?: string; config?: string; maxCycles?: number }) => { - const { orchestrator, store } = bootstrap(options.repo, options.config); + .option("--progress ", "progress output: human, jsonl, or quiet", parseProgressMode, "human") + .action( + async (goal: string, options: { repo?: string; config?: string; maxCycles?: number; progress?: ProgressMode }) => { + const progressMode = options.progress ?? "human"; + const { orchestrator, store } = bootstrap(options.repo, options.config, progressMode); const run = orchestrator.createRun(goal); console.log(`run id: ${run.id}`); const finalRun = await orchestrator.runUntilStable(run.id, options.maxCycles); - printRunStatus("run", finalRun); - console.log(`pending tasks: ${store.buildSnapshot(run.id).pendingTasks.length}`); - }); + printRunSnapshot(store.buildSnapshot(finalRun.id), progressMode); + }, + ); program .command("resume") @@ -66,24 +98,29 @@ program .option("--repo ", "target repository path") .option("--config ", "path to agent-r config file") .option("--max-cycles ", "override max cycles for this invocation", Number) - .action(async (runId: string, options: { repo?: string; config?: string; maxCycles?: number }) => { - const { orchestrator } = bootstrap(options.repo, options.config); + .option("--progress ", "progress output: human, jsonl, or quiet", parseProgressMode, "human") + .action( + async (runId: string, options: { repo?: string; config?: string; maxCycles?: number; progress?: ProgressMode }) => { + const progressMode = options.progress ?? "human"; + const { orchestrator, store } = bootstrap(options.repo, options.config, progressMode); const finalRun = await orchestrator.runUntilStable(runId, options.maxCycles); - printRunStatus("run", finalRun); - }); + printRunSnapshot(store.buildSnapshot(finalRun.id), progressMode); + }, + ); program .command("status") .argument("", "run id") .option("--repo ", "target repository path") .option("--config ", "path to agent-r config file") - .action((runId: string, options: { repo?: string; config?: string }) => { + .option("--watch", "watch until the run reaches done, blocked, or waiting_input") + .action(async (runId: string, options: { repo?: string; config?: string; watch?: boolean }) => { const { store } = bootstrap(options.repo, options.config); - const snapshot = store.buildSnapshot(runId); - printRunStatus("run", snapshot.run); - console.log(`pending: ${snapshot.pendingTasks.length}`); - console.log(`completed: ${snapshot.completedTasks.length}`); - console.log(`blocked/abandoned: ${snapshot.blockedTasks.length}`); + if (options.watch) { + await watchRunStatus(store, runId); + return; + } + printRunSnapshot(store.buildSnapshot(runId)); }); program @@ -109,8 +146,7 @@ program ? store.listArtifacts(runId, options.agent).map((artifact) => artifact.path) : listFilesRecursive(artifacts.getRunDir(runId)); - console.log(`run: ${runId}`); - console.log(`status: ${snapshot.run.status}`); + printRunSnapshot(snapshot); console.log(""); for (const filePath of artifactFiles) { diff --git a/src/orchestrator.ts b/src/orchestrator.ts index 186dd2b..e1bca27 100644 --- a/src/orchestrator.ts +++ b/src/orchestrator.ts @@ -1,18 +1,20 @@ import type { AgentRole, AgentStateRecord, + BlockerClass, CheckVerdictOutput, ImplementationResultOutput, + ProgressEvent, + ProgressSink, RawAgentRunner, ResolvedConfig, RunRecord, - RunSnapshot, StrategyPlanOutput, StrategySelfCheckOutput, TaskDraft, - TaskRecord, } from "./types.js"; import { ArtifactManager } from "./artifacts.js"; +import { createHeartbeatProgressEvent } from "./progress.js"; import { buildCheckPrompt, buildImplementationPrompt, @@ -26,6 +28,8 @@ import { import { checkVerdictSchema, implementationResultSchema, strategyPlanSchema, strategySelfCheckSchema } from "./schema-catalog.js"; import { RunStore } from "./store.js"; +const HEARTBEAT_INTERVAL_MS = 30_000; + function nowIso(): string { return new Date().toISOString(); } @@ -49,24 +53,72 @@ function uniqueTaskDrafts(tasks: TaskDraft[]): TaskDraft[] { return result; } +function shouldWaitForInput(blockerClass: BlockerClass | null): boolean { + return blockerClass === "external_resource" || blockerClass === "human_input"; +} + export class AgentROrchestrator { constructor( private readonly config: ResolvedConfig, private readonly store: RunStore, private readonly artifacts: ArtifactManager, private readonly runner: RawAgentRunner, + private readonly progressSink?: ProgressSink, ) {} createRun(goal: string): RunRecord { - return this.store.createRun(goal, this.config.repoPath); + const run = this.store.createRun(goal, this.config.repoPath); + this.recordProgressEvent( + { + ts: run.lastEventAt ?? run.createdAt, + runId: run.id, + source: "system", + kind: "run_created", + message: "Run created.", + payload: { + goal, + repoPath: this.config.repoPath, + }, + }, + false, + ); + return run; } async runUntilStable(runId: string, maxCycles = this.config.maxCyclesPerInvocation): Promise { - this.store.requeueInProgressTasks(runId); + const current = this.store.getRun(runId); + if (current.status === "waiting_input") { + this.store.updateRun(runId, { + status: "planning", + currentTaskId: null, + waitingInputs: [], + activeRole: null, + activeItemKind: null, + activeCommandSummary: null, + }); + this.emitEvent(runId, "system", "waiting_input.resumed", "Resuming a run that was waiting for input.", {}); + } + + const requeued = this.store.requeueInProgressTasks(runId); + if (requeued > 0) { + this.recordProgressEvent( + { + ts: nowIso(), + runId, + source: "system", + kind: "requeue_in_progress", + message: "Requeued in-progress tasks on resume.", + payload: { + changes: requeued, + }, + }, + false, + ); + } for (let cycle = 0; cycle < maxCycles; cycle += 1) { const snapshot = this.store.buildSnapshot(runId); - if (snapshot.run.status === "done" || snapshot.run.status === "blocked") { + if (snapshot.run.status === "done" || snapshot.run.status === "blocked" || snapshot.run.status === "waiting_input") { return snapshot.run; } @@ -94,7 +146,13 @@ export class AgentROrchestrator { } } - this.store.addEvent(runId, "system", "cycle_budget_exhausted", "Reached max cycles for this invocation.", { maxCycles }); + this.emitEvent( + runId, + "system", + "cycle_budget_exhausted", + "Reached max cycles for this invocation.", + { maxCycles }, + ); return this.store.getRun(runId); } @@ -109,8 +167,12 @@ export class AgentROrchestrator { }); const plan = response.output; - this.store.addEvent(runId, "strategy", "strategy_plan", formatStrategyPlanSummary(plan), plan); - this.store.updateRun(runId, { summary: plan.summary, currentTaskId: null }); + this.emitEvent(runId, "strategy", "strategy_plan", formatStrategyPlanSummary(plan), plan); + this.store.updateRun(runId, { + summary: plan.summary, + currentTaskId: null, + waitingInputs: [], + }); this.store.incrementReplanCount(runId); if (this.store.getRun(runId).replanCount > this.config.maxReplans) { @@ -118,18 +180,27 @@ export class AgentROrchestrator { status: "blocked", summary: `Exceeded replan limit (${this.config.maxReplans}).`, currentTaskId: null, + waitingInputs: [], }); - this.store.addEvent(runId, "system", "blocked", "Run blocked because the replan limit was exceeded.", {}); + this.emitEvent(runId, "system", "blocked", "Run blocked because the replan limit was exceeded.", {}); return; } if (plan.decision === "blocked") { + const waitingForInput = shouldWaitForInput(plan.blockerClass); this.store.updateRun(runId, { - status: "blocked", + status: waitingForInput ? "waiting_input" : "blocked", summary: plan.blockedReason ?? plan.summary, currentTaskId: null, + waitingInputs: waitingForInput ? plan.requiredInputs : [], }); - this.store.addEvent(runId, "strategy", "blocked", "Strategy declared the run blocked.", plan); + this.emitEvent( + runId, + "strategy", + waitingForInput ? "waiting_input" : "blocked", + waitingForInput ? "Strategy is waiting for required input." : "Strategy declared the run blocked.", + plan, + ); this.checkpoint(runId); return; } @@ -139,6 +210,7 @@ export class AgentROrchestrator { status: "self_check", summary: plan.summary, currentTaskId: null, + waitingInputs: [], }); this.checkpoint(runId); return; @@ -150,8 +222,9 @@ export class AgentROrchestrator { status: "blocked", summary: "Strategy returned continue without any tasks.", currentTaskId: null, + waitingInputs: [], }); - this.store.addEvent(runId, "system", "blocked", "Run blocked because no tasks were returned.", plan); + this.emitEvent(runId, "system", "blocked", "Run blocked because no tasks were returned.", plan); this.checkpoint(runId); return; } @@ -161,6 +234,7 @@ export class AgentROrchestrator { status: "planning", summary: plan.summary, currentTaskId: null, + waitingInputs: [], }); this.checkpoint(runId); } @@ -176,8 +250,12 @@ export class AgentROrchestrator { status: "implementing", currentTaskId: claimedTask.id, summary: `Implementing ${claimedTask.title}`, + waitingInputs: [], + activeRole: null, + activeItemKind: null, + activeCommandSummary: null, }); - this.store.addEvent(runId, "system", "task_dispatched", `Dispatched task ${claimedTask.id}.`, { taskId: claimedTask.id }); + this.emitEvent(runId, "system", "task_dispatched", `Dispatched task ${claimedTask.id}.`, { taskId: claimedTask.id }); const snapshot = this.store.buildSnapshot(runId); const response = await this.invokeRole({ @@ -206,7 +284,7 @@ export class AgentROrchestrator { result, signature, ); - this.store.addEvent(runId, "implementation", "implementation_result", formatImplementationSummary(result), result); + this.emitEvent(runId, "implementation", "implementation_result", formatImplementationSummary(result), result); if (result.status === "completed") { this.store.completeTask(claimedTask.id, result.summary); @@ -214,6 +292,24 @@ export class AgentROrchestrator { status: "planning", currentTaskId: null, summary: result.summary, + waitingInputs: [], + }); + this.checkpoint(runId); + return; + } + + if (shouldWaitForInput(result.blockerClass)) { + this.store.blockTask(claimedTask.id, result.summary, signature); + this.store.updateRun(runId, { + status: "waiting_input", + currentTaskId: null, + summary: result.summary, + waitingInputs: result.requiredInputs, + }); + this.emitEvent(runId, "system", "waiting_input", `Waiting for input for task ${claimedTask.title}.`, { + taskId: claimedTask.id, + blockerClass: result.blockerClass, + requiredInputs: result.requiredInputs, }); this.checkpoint(runId); return; @@ -231,8 +327,9 @@ export class AgentROrchestrator { status: "blocked", currentTaskId: null, summary: `Repeated blocker for task ${claimedTask.title}.`, + waitingInputs: [], }); - this.store.addEvent(runId, "system", "blocked", "Run blocked because the same blocker repeated.", { + this.emitEvent(runId, "system", "blocked", "Run blocked because the same blocker repeated.", { taskId: claimedTask.id, blockerSignature: signature, }); @@ -243,6 +340,7 @@ export class AgentROrchestrator { status: "planning", currentTaskId: null, summary: result.summary, + waitingInputs: [], }); this.checkpoint(runId); return; @@ -254,8 +352,9 @@ export class AgentROrchestrator { status: "blocked", currentTaskId: null, summary: `Task ${claimedTask.title} exceeded the retry limit.`, + waitingInputs: [], }); - this.store.addEvent(runId, "system", "blocked", "Run blocked because task retry limit was exceeded.", { + this.emitEvent(runId, "system", "blocked", "Run blocked because task retry limit was exceeded.", { taskId: claimedTask.id, attempts: latestTask.attemptCount, }); @@ -268,6 +367,7 @@ export class AgentROrchestrator { status: "planning", currentTaskId: null, summary: result.summary, + waitingInputs: [], }); this.checkpoint(runId); } @@ -278,7 +378,7 @@ export class AgentROrchestrator { | StrategyPlanOutput | undefined; if (!lastPlan) { - this.store.updateRun(runId, { status: "planning" }); + this.store.updateRun(runId, { status: "planning", waitingInputs: [] }); return; } @@ -298,12 +398,13 @@ export class AgentROrchestrator { selfCheck.rationale, selfCheck, ); - this.store.addEvent(runId, "strategy", "strategy_self_check", formatSelfCheckSummary(selfCheck), selfCheck); + this.emitEvent(runId, "strategy", "strategy_self_check", formatSelfCheckSummary(selfCheck), selfCheck); this.store.updateRun(runId, { status: selfCheck.readyForIndependentCheck ? "independent_check" : "planning", summary: selfCheck.summary, currentTaskId: null, + waitingInputs: [], }); this.checkpoint(runId); } @@ -315,7 +416,7 @@ export class AgentROrchestrator { | undefined; if (!latestSelfCheck) { - this.store.updateRun(runId, { status: "planning" }); + this.store.updateRun(runId, { status: "planning", waitingInputs: [] }); return; } @@ -335,13 +436,14 @@ export class AgentROrchestrator { verdict.rationale, verdict, ); - this.store.addEvent(runId, "checker", "check_verdict", formatCheckVerdictSummary(verdict), verdict); + this.emitEvent(runId, "checker", "check_verdict", formatCheckVerdictSummary(verdict), verdict); if (verdict.verdict === "approved") { this.store.updateRun(runId, { status: "done", summary: verdict.summary, currentTaskId: null, + waitingInputs: [], }); this.checkpoint(runId); return; @@ -351,6 +453,7 @@ export class AgentROrchestrator { status: "planning", summary: verdict.summary, currentTaskId: null, + waitingInputs: [], }); this.checkpoint(runId); } @@ -372,44 +475,99 @@ export class AgentROrchestrator { this.artifacts.writePrompt(artifacts.promptPath, prompt); this.artifacts.writeSchema(artifacts.schemaPath, options.schema); - const result = await this.runner.invoke({ - runId: options.runId, - role: options.role, - sessionId: nextSession, - prompt, - schemaName: options.schemaName, - schema: options.schema, - cwd: this.config.repoPath, - roleConfig: this.config.roles[options.role], - artifacts, - maxValidationRetries: 2, + let activeItemKind: string | null = null; + let activeCommandSummary: string | null = null; + this.store.updateRun(options.runId, { + lastEventAt: nowIso(), + activeRole: options.role, + activeItemKind: null, + activeCommandSummary: null, }); - this.artifacts.writeRawEvents(artifacts.rawEventsPath, result.rawEvents); - this.artifacts.writeStderr(artifacts.stderrPath, result.stderr); - this.artifacts.writeLastMessage(artifacts.lastMessagePath, result.rawMessage); - this.artifacts.writeResponse(artifacts.responsePath, result.output); + const heartbeatTimer = setInterval(() => { + const heartbeat = createHeartbeatProgressEvent(options.runId, options.role, activeItemKind, activeCommandSummary); + this.store.updateRun(options.runId, { + lastEventAt: heartbeat.ts, + activeRole: options.role, + activeItemKind, + activeCommandSummary, + }); + this.recordProgressEvent(heartbeat, false); + }, HEARTBEAT_INTERVAL_MS); + heartbeatTimer.unref?.(); - this.store.addArtifact(options.runId, options.role, `${options.schemaName}.prompt`, artifacts.promptPath, {}); - this.store.addArtifact(options.runId, options.role, `${options.schemaName}.schema`, artifacts.schemaPath, {}); - this.store.addArtifact(options.runId, options.role, `${options.schemaName}.events`, artifacts.rawEventsPath, {}); - this.store.addArtifact(options.runId, options.role, `${options.schemaName}.stderr`, artifacts.stderrPath, {}); - this.store.addArtifact(options.runId, options.role, `${options.schemaName}.last`, artifacts.lastMessagePath, {}); - this.store.addArtifact(options.runId, options.role, `${options.schemaName}.response`, artifacts.responsePath, {}); + try { + const result = await this.runner.invoke({ + runId: options.runId, + role: options.role, + sessionId: nextSession, + prompt, + schemaName: options.schemaName, + schema: options.schema, + cwd: this.config.repoPath, + roleConfig: this.config.roles[options.role], + artifacts, + maxValidationRetries: 2, + onProgress: async (event) => { + if (event.activeItemKind !== undefined) { + activeItemKind = event.activeItemKind; + } + if (event.activeCommandSummary !== undefined) { + activeCommandSummary = event.activeCommandSummary; + } + const clearActivity = + event.kind === "turn.completed" || + event.kind === "turn.failed" || + event.kind === "stream.error" || + event.kind === "agent_message.completed"; + this.store.updateRun(options.runId, { + lastEventAt: event.ts, + activeRole: clearActivity ? null : options.role, + activeItemKind: clearActivity ? null : activeItemKind, + activeCommandSummary: clearActivity ? null : activeCommandSummary, + }); + this.recordProgressEvent(event, true); + if (clearActivity) { + activeItemKind = null; + activeCommandSummary = null; + } + }, + }); - const nextState: AgentStateRecord = { - runId: options.runId, - role: options.role, - sessionId: result.sessionId, - turns: (this.shouldRotate(state) ? 0 : state?.turns ?? 0) + 1, - rotationCount: state ? state.rotationCount + (this.shouldRotate(state) ? 1 : 0) : 0, - lastPromptPath: artifacts.promptPath, - lastResponsePath: artifacts.responsePath, - updatedAt: nowIso(), - }; - this.store.saveAgentState(nextState); + this.artifacts.writeRawEvents(artifacts.rawEventsPath, result.rawEvents); + this.artifacts.writeStderr(artifacts.stderrPath, result.stderr); + this.artifacts.writeLastMessage(artifacts.lastMessagePath, result.rawMessage); + this.artifacts.writeResponse(artifacts.responsePath, result.output); - return { output: result.output }; + this.store.addArtifact(options.runId, options.role, `${options.schemaName}.prompt`, artifacts.promptPath, {}); + this.store.addArtifact(options.runId, options.role, `${options.schemaName}.schema`, artifacts.schemaPath, {}); + this.store.addArtifact(options.runId, options.role, `${options.schemaName}.events`, artifacts.rawEventsPath, {}); + this.store.addArtifact(options.runId, options.role, `${options.schemaName}.stderr`, artifacts.stderrPath, {}); + this.store.addArtifact(options.runId, options.role, `${options.schemaName}.last`, artifacts.lastMessagePath, {}); + this.store.addArtifact(options.runId, options.role, `${options.schemaName}.response`, artifacts.responsePath, {}); + + const nextState: AgentStateRecord = { + runId: options.runId, + role: options.role, + sessionId: result.sessionId, + turns: (this.shouldRotate(state) ? 0 : state?.turns ?? 0) + 1, + rotationCount: state ? state.rotationCount + (this.shouldRotate(state) ? 1 : 0) : 0, + lastPromptPath: artifacts.promptPath, + lastResponsePath: artifacts.responsePath, + updatedAt: nowIso(), + }; + this.store.saveAgentState(nextState); + this.store.updateRun(options.runId, { + lastEventAt: nowIso(), + activeRole: null, + activeItemKind: null, + activeCommandSummary: null, + }); + + return { output: result.output }; + } finally { + clearInterval(heartbeatTimer); + } } private shouldRotate(state: AgentStateRecord | null): boolean { @@ -420,4 +578,31 @@ export class AgentROrchestrator { const snapshot = this.store.buildSnapshot(runId); this.store.addCheckpoint(runId, snapshot.run.status, snapshot); } + + private emitEvent( + runId: string, + source: AgentRole | "system", + kind: string, + message: string, + payload: unknown, + ): void { + this.recordProgressEvent( + { + ts: nowIso(), + runId, + source, + kind, + message, + payload, + }, + true, + ); + } + + private recordProgressEvent(event: ProgressEvent, persist: boolean): void { + if (persist) { + this.store.addEvent(event.runId, event.source, event.kind, event.message, event.payload); + } + this.progressSink?.(event); + } } diff --git a/src/progress.ts b/src/progress.ts new file mode 100644 index 0000000..1706fe9 --- /dev/null +++ b/src/progress.ts @@ -0,0 +1,312 @@ +import type { ThreadEvent, ThreadItem } from "@openai/codex-sdk"; +import type { AgentRole, ProgressEvent, ProgressMode } from "./types.js"; + +function nowIso(): string { + return new Date().toISOString(); +} + +function truncate(value: string, maxLength = 140): string { + const trimmed = value.replace(/\s+/g, " ").trim(); + if (!trimmed) { + return ""; + } + if (trimmed.length <= maxLength) { + return trimmed; + } + return `${trimmed.slice(0, maxLength - 1)}…`; +} + +function summarizeCommand(command: string): string { + return truncate(command, 120); +} + +function summarizeOutput(output: string): string { + const lines = output + .split("\n") + .map((line) => line.trim()) + .filter(Boolean); + if (!lines.length) { + return ""; + } + const candidate = lines.at(-1) ?? lines[0] ?? ""; + return truncate(candidate, 100); +} + +function summarizeTodoItems(items: Array<{ text: string; completed: boolean }>): string { + if (!items.length) { + return "No todo items."; + } + const completed = items.filter((item) => item.completed).length; + const preview = items + .slice(0, 3) + .map((item) => `${item.completed ? "[x]" : "[ ]"} ${truncate(item.text, 48)}`) + .join("; "); + return `${completed}/${items.length} complete${preview ? ` | ${preview}` : ""}`; +} + +function summarizeFileChanges(changes: Array<{ path: string; kind: string }>): string { + if (!changes.length) { + return "No file changes."; + } + const preview = changes + .slice(0, 3) + .map((change) => `${change.kind} ${truncate(change.path, 48)}`) + .join(", "); + const remainder = changes.length > 3 ? ` (+${changes.length - 3} more)` : ""; + return `${preview}${remainder}`; +} + +function eventFromItemState( + runId: string, + role: AgentRole, + phase: "started" | "updated" | "completed", + item: ThreadItem, +): ProgressEvent | null { + const ts = nowIso(); + + switch (item.type) { + case "todo_list": + return { + ts, + runId, + source: role, + kind: "todo_list.updated", + message: summarizeTodoItems(item.items), + payload: { + status: phase, + items: item.items, + }, + activeItemKind: phase === "completed" ? null : item.type, + activeCommandSummary: null, + }; + case "command_execution": { + const summary = summarizeCommand(item.command); + const outputSummary = summarizeOutput(item.aggregated_output); + const status = phase === "completed" ? item.status : "in_progress"; + const kind = status === "failed" ? "command.failed" : status === "completed" ? "command.completed" : "command.started"; + const detail = + status === "completed" || status === "failed" + ? `${summary} exit=${item.exit_code ?? "?"}${outputSummary ? ` | ${outputSummary}` : ""}` + : summary; + return { + ts, + runId, + source: role, + kind, + message: detail, + payload: { + command: item.command, + status: item.status, + exitCode: item.exit_code ?? null, + outputSummary, + }, + activeItemKind: status === "completed" || status === "failed" ? null : item.type, + activeCommandSummary: status === "completed" || status === "failed" ? null : summary, + }; + } + case "file_change": + return { + ts, + runId, + source: role, + kind: phase === "completed" ? `file_change.${item.status}` : "file_change.started", + message: summarizeFileChanges(item.changes), + payload: { + status: phase === "completed" ? item.status : "in_progress", + changes: item.changes, + }, + activeItemKind: phase === "completed" ? null : item.type, + activeCommandSummary: null, + }; + case "mcp_tool_call": { + const status = phase === "completed" ? item.status : "in_progress"; + return { + ts, + runId, + source: role, + kind: status === "failed" ? "mcp_tool_call.failed" : status === "completed" ? "mcp_tool_call.completed" : "mcp_tool_call.started", + message: truncate(`${item.server}.${item.tool}`, 120), + payload: { + server: item.server, + tool: item.tool, + status, + error: item.error?.message ?? null, + }, + activeItemKind: status === "completed" || status === "failed" ? null : item.type, + activeCommandSummary: null, + }; + } + case "web_search": + return { + ts, + runId, + source: role, + kind: phase === "completed" ? "web_search.completed" : "web_search.started", + message: truncate(item.query, 120), + payload: { + query: item.query, + status: phase, + }, + activeItemKind: phase === "completed" ? null : item.type, + activeCommandSummary: null, + }; + case "error": + return { + ts, + runId, + source: role, + kind: "item.error", + message: truncate(item.message, 160), + payload: { + message: item.message, + }, + activeItemKind: null, + activeCommandSummary: null, + }; + case "agent_message": + if (phase !== "completed") { + return null; + } + return { + ts, + runId, + source: role, + kind: "agent_message.completed", + message: `Received structured response (${item.text.length} chars).`, + payload: { + length: item.text.length, + }, + activeItemKind: null, + activeCommandSummary: null, + }; + default: + return null; + } +} + +export function progressEventsFromThreadEvent(runId: string, role: AgentRole, event: ThreadEvent): ProgressEvent[] { + switch (event.type) { + case "thread.started": + return [ + { + ts: nowIso(), + runId, + source: role, + kind: "thread.started", + message: `Started thread ${event.thread_id}.`, + payload: { + threadId: event.thread_id, + }, + activeItemKind: null, + activeCommandSummary: null, + }, + ]; + case "turn.started": + return [ + { + ts: nowIso(), + runId, + source: role, + kind: "turn.started", + message: "Started turn.", + payload: {}, + }, + ]; + case "turn.completed": + return [ + { + ts: nowIso(), + runId, + source: role, + kind: "turn.completed", + message: `Completed turn with ${event.usage.output_tokens} output tokens.`, + payload: event.usage, + activeItemKind: null, + activeCommandSummary: null, + }, + ]; + case "turn.failed": + return [ + { + ts: nowIso(), + runId, + source: role, + kind: "turn.failed", + message: truncate(event.error.message, 160), + payload: event.error, + activeItemKind: null, + activeCommandSummary: null, + }, + ]; + case "error": + return [ + { + ts: nowIso(), + runId, + source: role, + kind: "stream.error", + message: truncate(event.message, 160), + payload: { + message: event.message, + }, + activeItemKind: null, + activeCommandSummary: null, + }, + ]; + case "item.started": { + const mapped = eventFromItemState(runId, role, "started", event.item); + return mapped ? [mapped] : []; + } + case "item.updated": { + const mapped = event.item.type === "todo_list" ? eventFromItemState(runId, role, "updated", event.item) : null; + return mapped ? [mapped] : []; + } + case "item.completed": { + const mapped = eventFromItemState(runId, role, "completed", event.item); + return mapped ? [mapped] : []; + } + } +} + +export function createHeartbeatProgressEvent( + runId: string, + source: AgentRole, + activeItemKind: string | null, + activeCommandSummary: string | null, +): ProgressEvent { + const focus = activeCommandSummary ?? activeItemKind ?? "agent work"; + return { + ts: nowIso(), + runId, + source, + kind: "heartbeat", + message: `Still running ${focus}.`, + payload: { + activeItemKind, + activeCommandSummary, + }, + activeItemKind, + activeCommandSummary, + }; +} + +export function formatProgressEventHuman(event: ProgressEvent): string { + return `${event.ts} [${event.source}] [${event.kind}] ${event.message}`; +} + +export class ConsoleProgressReporter { + constructor(private readonly mode: ProgressMode) {} + + emit(event: ProgressEvent): void { + if (this.mode === "quiet") { + return; + } + + if (this.mode === "jsonl") { + console.log(JSON.stringify(event)); + return; + } + + console.log(formatProgressEventHuman(event)); + } +} diff --git a/src/prompts.ts b/src/prompts.ts index e9ee383..9f88c8b 100644 --- a/src/prompts.ts +++ b/src/prompts.ts @@ -15,6 +15,7 @@ function renderTask(task: TaskDraft | TaskRecord): string { return [ `Title: ${task.title}`, `Objective: ${task.objective}`, + `Runtime class: ${task.runtimeClass}`, `Acceptance:`, bulletList(task.acceptanceCriteria), `Verification:`, @@ -50,13 +51,26 @@ function renderRecentAttempts(snapshot: RunSnapshot): string { return snapshot.recentAttempts .map((attempt) => { - const payload = JSON.parse(attempt.resultJson) as { changes?: string[]; followUps?: string[]; blockers?: string[] }; + const payload = JSON.parse(attempt.resultJson) as { + changes?: string[]; + followUps?: string[]; + blockers?: string[]; + requiredInputs?: string[]; + fallbackTasks?: TaskDraft[]; + verification?: Array<{ command: string; outcome: string; category?: string; details?: string }>; + }; + const verification = (payload.verification ?? []) + .map((item) => `${item.outcome} ${item.category ?? "other"}: ${item.command}`) + .join("; "); return [ `- Task ${attempt.taskId} attempt ${attempt.attemptNumber}: ${attempt.status}`, ` Summary: ${attempt.summary}`, ` Changes: ${(payload.changes ?? []).join("; ") || "none"}`, + ` Verification: ${verification || "none"}`, ` Follow-ups: ${(payload.followUps ?? []).join("; ") || "none"}`, ` Blockers: ${(payload.blockers ?? []).join("; ") || "none"}`, + ` Required inputs: ${(payload.requiredInputs ?? []).join("; ") || "none"}`, + ` Fallback tasks: ${(payload.fallbackTasks ?? []).map((task) => task.title).join("; ") || "none"}`, ].join("\n"); }) .join("\n"); @@ -127,7 +141,11 @@ Decision policy: - Use \`decision: "done"\` only when no substantial work remains. - Use \`decision: "blocked"\` only for external blockers or unresolved contradictions. - Always include \`blockedReason\`. Use \`null\` unless \`decision\` is \`"blocked"\`. +- Always include \`blockerClass\`. Use \`"external_resource"\` or \`"human_input"\` when the run should wait for input; otherwise use \`null\` unless blocked. +- Always include \`requiredInputs\`. Use an empty array when nothing external is needed. +- Always include \`fallbackTasks\`. Use an empty array when there is no alternate backlog. - Keep the backlog concise. Prefer a few substantive tasks over many trivial tasks. +- Every task must include \`runtimeClass\`. Use \`"long"\` for experiments, proofs, or long-running validations; otherwise use \`"short"\`. `.trim(); } @@ -141,6 +159,8 @@ Role rules: - Stay within the task's allowed paths unless the task itself truly requires broader changes, and call that out if it happens. - Run relevant verification when possible. - If you cannot complete the task, return \`needs_replan\` or \`blocked\` with concrete blockers. +- When the blocker is missing external infrastructure or missing user-provided material, set \`blockerClass\` to \`"external_resource"\` or \`"human_input"\` and list the exact \`requiredInputs\`. +- Always fill \`fallbackTasks\` with an empty array when you have no alternate backlog suggestion. Return JSON that matches the provided schema exactly. @@ -174,6 +194,7 @@ Role rules: - Re-evaluate the original goal against the repository and the execution history. - You may inspect the repo, but you still must not edit files. - Be strict. If meaningful work remains, do not send the run to the checker. +- Prefer concrete evidence from verification results and saved artifacts over implementation summaries. Return JSON that matches the provided schema exactly. @@ -207,6 +228,7 @@ Role rules: - You must not edit files. - Evaluate whether the original user goal is actually complete. - Reject completion when there are still meaningful tasks, risks, or missing evidence. +- Prefer verification evidence and saved artifacts over narrative summaries. Return JSON that matches the provided schema exactly. diff --git a/src/schema-catalog.ts b/src/schema-catalog.ts index ef29a8e..c598d67 100644 --- a/src/schema-catalog.ts +++ b/src/schema-catalog.ts @@ -1,7 +1,7 @@ export const taskDraftSchema = { type: "object", additionalProperties: false, - required: ["title", "objective", "acceptanceCriteria", "verificationSteps", "allowedPaths"], + required: ["title", "objective", "acceptanceCriteria", "verificationSteps", "allowedPaths", "runtimeClass"], properties: { title: { type: "string", minLength: 1 }, objective: { type: "string", minLength: 1 }, @@ -17,13 +17,28 @@ export const taskDraftSchema = { type: "array", items: { type: "string", minLength: 1 }, }, + runtimeClass: { + type: "string", + enum: ["short", "long"], + }, }, } as const; export const strategyPlanSchema = { type: "object", additionalProperties: false, - required: ["decision", "summary", "rationale", "goalProgress", "risks", "tasks", "blockedReason"], + required: [ + "decision", + "summary", + "rationale", + "goalProgress", + "risks", + "tasks", + "blockedReason", + "blockerClass", + "requiredInputs", + "fallbackTasks", + ], properties: { decision: { type: "string", @@ -43,6 +58,18 @@ export const strategyPlanSchema = { blockedReason: { type: ["string", "null"], }, + blockerClass: { + type: ["string", "null"], + enum: ["external_resource", "human_input", "tool_failure", "contradiction", "unknown", null], + }, + requiredInputs: { + type: "array", + items: { type: "string", minLength: 1 }, + }, + fallbackTasks: { + type: "array", + items: taskDraftSchema, + }, }, } as const; @@ -77,6 +104,9 @@ export const implementationResultSchema = { "followUps", "touchedFiles", "blockers", + "blockerClass", + "requiredInputs", + "fallbackTasks", ], properties: { taskId: { type: "string", minLength: 1 }, @@ -94,7 +124,7 @@ export const implementationResultSchema = { items: { type: "object", additionalProperties: false, - required: ["command", "outcome", "details"], + required: ["command", "outcome", "details", "category", "artifacts", "metrics"], properties: { command: { type: "string", minLength: 1 }, outcome: { @@ -102,6 +132,20 @@ export const implementationResultSchema = { enum: ["passed", "failed", "not_run"], }, details: { type: "string", minLength: 1 }, + category: { + type: "string", + enum: ["test", "benchmark", "proof", "inspection", "other"], + }, + artifacts: { + type: "array", + items: { type: "string", minLength: 1 }, + }, + metrics: { + type: "object", + additionalProperties: { + type: ["string", "number", "boolean", "null"], + }, + }, }, }, }, @@ -117,6 +161,18 @@ export const implementationResultSchema = { type: "array", items: { type: "string", minLength: 1 }, }, + blockerClass: { + type: ["string", "null"], + enum: ["external_resource", "human_input", "tool_failure", "contradiction", "unknown", null], + }, + requiredInputs: { + type: "array", + items: { type: "string", minLength: 1 }, + }, + fallbackTasks: { + type: "array", + items: taskDraftSchema, + }, }, } as const; diff --git a/src/status-view.ts b/src/status-view.ts new file mode 100644 index 0000000..6ccb5bc --- /dev/null +++ b/src/status-view.ts @@ -0,0 +1,28 @@ +import type { RunSnapshot } from "./types.js"; + +export function formatRunStatusLines(snapshot: RunSnapshot): string[] { + const lastEvent = snapshot.recentEvents.at(-1); + const waitingInputs = snapshot.run.waitingInputs.length + ? snapshot.run.waitingInputs.map((value) => `waiting input: ${value}`) + : []; + + return [ + `run: ${snapshot.run.id}`, + `status: ${snapshot.run.status}`, + `goal: ${snapshot.run.goal}`, + `summary: ${snapshot.run.summary ?? "none"}`, + `cycles: ${snapshot.run.cycleCount}`, + `replans: ${snapshot.run.replanCount}`, + `current task: ${snapshot.run.currentTaskId ?? "none"}`, + `active role: ${snapshot.run.activeRole ?? "none"}`, + `active item: ${snapshot.run.activeItemKind ?? "none"}`, + `active command: ${snapshot.run.activeCommandSummary ?? "none"}`, + `last event at: ${snapshot.run.lastEventAt ?? "none"}`, + `last event: ${lastEvent ? `${lastEvent.ts} ${lastEvent.source}.${lastEvent.kind} ${lastEvent.message}` : "none"}`, + `pending: ${snapshot.pendingTasks.length}`, + `in progress: ${snapshot.inProgressTasks.length}`, + `completed: ${snapshot.completedTasks.length}`, + `blocked/abandoned: ${snapshot.blockedTasks.length}`, + ...waitingInputs, + ]; +} diff --git a/src/store.ts b/src/store.ts index d169406..6941636 100644 --- a/src/store.ts +++ b/src/store.ts @@ -15,6 +15,7 @@ import type { RunRecord, RunSnapshot, RunStatus, + TaskRuntimeClass, TaskAttemptRecord, TaskDraft, TaskRecord, @@ -36,6 +37,10 @@ function toJson(value: unknown): string { return JSON.stringify(value); } +function stringOrNull(value: unknown): string | null { + return value === null || value === undefined || value === "" ? null : String(value); +} + function mapTaskRow(row: Record): TaskRecord { return { id: String(row.id), @@ -45,6 +50,7 @@ function mapTaskRow(row: Record): TaskRecord { acceptanceCriteria: parseJson(String(row.acceptance_criteria_json)), verificationSteps: parseJson(String(row.verification_steps_json)), allowedPaths: parseJson(String(row.allowed_paths_json)), + runtimeClass: (row.runtime_class ? String(row.runtime_class) : "short") as TaskRuntimeClass, status: row.status as TaskStatus, attemptCount: Number(row.attempt_count), implementationSummary: row.implementation_summary ? String(row.implementation_summary) : null, @@ -78,6 +84,11 @@ function mapRunRow(row: Record): RunRecord { cycleCount: Number(row.cycle_count), replanCount: Number(row.replan_count), currentTaskId: row.current_task_id ? String(row.current_task_id) : null, + lastEventAt: stringOrNull(row.last_event_at), + activeRole: (stringOrNull(row.active_role) as AgentRole | null) ?? null, + activeItemKind: stringOrNull(row.active_item_kind), + activeCommandSummary: stringOrNull(row.active_command_summary), + waitingInputs: parseJson(String(row.waiting_inputs_json ?? "[]")), createdAt: String(row.created_at), updatedAt: String(row.updated_at), }; @@ -113,6 +124,11 @@ export class RunStore { cycle_count INTEGER NOT NULL DEFAULT 0, replan_count INTEGER NOT NULL DEFAULT 0, current_task_id TEXT, + last_event_at TEXT, + active_role TEXT, + active_item_kind TEXT, + active_command_summary TEXT, + waiting_inputs_json TEXT NOT NULL DEFAULT '[]', created_at TEXT NOT NULL, updated_at TEXT NOT NULL ); @@ -137,6 +153,7 @@ export class RunStore { acceptance_criteria_json TEXT NOT NULL, verification_steps_json TEXT NOT NULL, allowed_paths_json TEXT NOT NULL, + runtime_class TEXT NOT NULL DEFAULT 'short', status TEXT NOT NULL, attempt_count INTEGER NOT NULL DEFAULT 0, implementation_summary TEXT, @@ -195,6 +212,20 @@ export class RunStore { created_at TEXT NOT NULL ); `); + this.ensureColumn("runs", "last_event_at", "TEXT"); + this.ensureColumn("runs", "active_role", "TEXT"); + this.ensureColumn("runs", "active_item_kind", "TEXT"); + this.ensureColumn("runs", "active_command_summary", "TEXT"); + this.ensureColumn("runs", "waiting_inputs_json", "TEXT NOT NULL DEFAULT '[]'"); + this.ensureColumn("tasks", "runtime_class", "TEXT NOT NULL DEFAULT 'short'"); + } + + private ensureColumn(table: string, columnName: string, columnDefinition: string): void { + const rows = this.db.prepare(`PRAGMA table_info(${table})`).all() as Array<{ name: string }>; + if (rows.some((row) => row.name === columnName)) { + return; + } + this.db.exec(`ALTER TABLE ${table} ADD COLUMN ${columnName} ${columnDefinition};`); } createRun(goal: string, repoPath: string): RunRecord { @@ -202,10 +233,13 @@ export class RunStore { const id = randomUUID(); this.db .prepare(` - INSERT INTO runs (id, goal, repo_path, status, summary, cycle_count, replan_count, current_task_id, created_at, updated_at) - VALUES (?, ?, ?, 'planning', NULL, 0, 0, NULL, ?, ?) + INSERT INTO runs ( + id, goal, repo_path, status, summary, cycle_count, replan_count, current_task_id, + last_event_at, active_role, active_item_kind, active_command_summary, waiting_inputs_json, created_at, updated_at + ) + VALUES (?, ?, ?, 'planning', NULL, 0, 0, NULL, ?, NULL, NULL, NULL, '[]', ?, ?) `) - .run(id, goal, repoPath, timestamp, timestamp); + .run(id, goal, repoPath, timestamp, timestamp, timestamp); this.addEvent(id, "system", "run_created", "Run created.", { goal, repoPath }); return this.getRun(id); @@ -224,22 +258,63 @@ export class RunStore { return rows.map(mapRunRow); } - updateRun(runId: string, updates: Partial>): RunRecord { + updateRun( + runId: string, + updates: Partial< + Pick< + RunRecord, + | "status" + | "summary" + | "currentTaskId" + | "lastEventAt" + | "activeRole" + | "activeItemKind" + | "activeCommandSummary" + | "waitingInputs" + > + >, + ): RunRecord { const current = this.getRun(runId); const next: RunRecord = { ...current, status: updates.status ?? current.status, summary: updates.summary ?? current.summary, currentTaskId: updates.currentTaskId === undefined ? current.currentTaskId : updates.currentTaskId, + lastEventAt: updates.lastEventAt === undefined ? current.lastEventAt : updates.lastEventAt, + activeRole: updates.activeRole === undefined ? current.activeRole : updates.activeRole, + activeItemKind: updates.activeItemKind === undefined ? current.activeItemKind : updates.activeItemKind, + activeCommandSummary: + updates.activeCommandSummary === undefined ? current.activeCommandSummary : updates.activeCommandSummary, + waitingInputs: updates.waitingInputs === undefined ? current.waitingInputs : updates.waitingInputs, updatedAt: nowIso(), }; this.db .prepare(` UPDATE runs - SET status = ?, summary = ?, current_task_id = ?, updated_at = ? + SET + status = ?, + summary = ?, + current_task_id = ?, + last_event_at = ?, + active_role = ?, + active_item_kind = ?, + active_command_summary = ?, + waiting_inputs_json = ?, + updated_at = ? WHERE id = ? `) - .run(next.status, next.summary, next.currentTaskId, next.updatedAt, runId); + .run( + next.status, + next.summary, + next.currentTaskId, + next.lastEventAt, + next.activeRole, + next.activeItemKind, + next.activeCommandSummary, + toJson(next.waitingInputs), + next.updatedAt, + runId, + ); return this.getRun(runId); } @@ -310,10 +385,10 @@ export class RunStore { .run(now, runId); const insertStmt = this.db.prepare(` - INSERT INTO tasks ( - id, run_id, title, objective, acceptance_criteria_json, verification_steps_json, - allowed_paths_json, status, attempt_count, implementation_summary, blocker_signature, created_at, updated_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, 'pending', 0, NULL, NULL, ?, ?) + INSERT INTO tasks ( + id, run_id, title, objective, acceptance_criteria_json, verification_steps_json, + allowed_paths_json, runtime_class, status, attempt_count, implementation_summary, blocker_signature, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, 'pending', 0, NULL, NULL, ?, ?) `); const records: TaskRecord[] = []; @@ -327,6 +402,7 @@ export class RunStore { toJson(task.acceptanceCriteria), toJson(task.verificationSteps), toJson(task.allowedPaths), + task.runtimeClass, now, now, ); @@ -445,12 +521,14 @@ export class RunStore { } addEvent(runId: string, source: string, kind: string, message: string, payload: unknown): void { + const timestamp = nowIso(); this.db .prepare(` INSERT INTO events (run_id, ts, source, kind, message, payload_json) VALUES (?, ?, ?, ?, ?, ?) `) - .run(runId, nowIso(), source, kind, message, toJson(payload)); + .run(runId, timestamp, source, kind, message, toJson(payload)); + this.db.prepare("UPDATE runs SET last_event_at = ?, updated_at = ? WHERE id = ?").run(timestamp, timestamp, runId); } listEvents(runId: string, limit = 50): EventRecord[] { diff --git a/src/types.ts b/src/types.ts index 98cb65a..21d85ad 100644 --- a/src/types.ts +++ b/src/types.ts @@ -2,12 +2,17 @@ export const agentRoles = ["strategy", "implementation", "checker"] as const; export type AgentRole = (typeof agentRoles)[number]; export type SandboxMode = "read-only" | "workspace-write" | "danger-full-access"; +export type TaskRuntimeClass = "short" | "long"; +export type BlockerClass = "external_resource" | "human_input" | "tool_failure" | "contradiction" | "unknown"; +export type VerificationCategory = "test" | "benchmark" | "proof" | "inspection" | "other"; +export type ProgressMode = "human" | "jsonl" | "quiet"; export type RunStatus = | "planning" | "implementing" | "self_check" | "independent_check" | "blocked" + | "waiting_input" | "done"; export type TaskStatus = "pending" | "in_progress" | "completed" | "blocked" | "abandoned"; export type AttemptStatus = "completed" | "needs_replan" | "blocked"; @@ -45,6 +50,11 @@ export interface RunRecord { cycleCount: number; replanCount: number; currentTaskId: string | null; + lastEventAt: string | null; + activeRole: AgentRole | null; + activeItemKind: string | null; + activeCommandSummary: string | null; + waitingInputs: string[]; createdAt: string; updatedAt: string; } @@ -66,6 +76,7 @@ export interface TaskDraft { acceptanceCriteria: string[]; verificationSteps: string[]; allowedPaths: string[]; + runtimeClass: TaskRuntimeClass; } export interface TaskRecord extends TaskDraft { @@ -133,6 +144,9 @@ export interface VerificationResult { command: string; outcome: "passed" | "failed" | "not_run"; details: string; + category: VerificationCategory; + artifacts: string[]; + metrics: Record; } export interface StrategyPlanOutput { @@ -143,6 +157,9 @@ export interface StrategyPlanOutput { risks: string[]; tasks: TaskDraft[]; blockedReason: string | null; + blockerClass: BlockerClass | null; + requiredInputs: string[]; + fallbackTasks: TaskDraft[]; } export interface StrategySelfCheckOutput { @@ -162,6 +179,9 @@ export interface ImplementationResultOutput { followUps: string[]; touchedFiles: string[]; blockers: string[]; + blockerClass: BlockerClass | null; + requiredInputs: string[]; + fallbackTasks: TaskDraft[]; } export interface CheckVerdictOutput { @@ -205,6 +225,7 @@ export interface AgentInvocation { roleConfig: RoleConfig; artifacts: InvocationArtifacts; maxValidationRetries?: number; + onProgress?: (event: ProgressEvent) => void | Promise; } export interface AgentInvocationResult { @@ -228,3 +249,16 @@ export interface RunArtifactsIndex { stderrPath: string; lastMessagePath: string; } + +export interface ProgressEvent { + ts: string; + runId: string; + source: AgentRole | "system"; + kind: string; + message: string; + payload: unknown; + activeItemKind?: string | null; + activeCommandSummary?: string | null; +} + +export type ProgressSink = (event: ProgressEvent) => void; diff --git a/test/orchestrator.test.ts b/test/orchestrator.test.ts index b23f51b..394c89d 100644 --- a/test/orchestrator.test.ts +++ b/test/orchestrator.test.ts @@ -7,10 +7,11 @@ import { loadConfig } from "../src/config.js"; import { AgentROrchestrator } from "../src/orchestrator.js"; import { strategyPlanSchema } from "../src/schema-catalog.js"; import { RunStore } from "../src/store.js"; -import type { AgentInvocation, AgentInvocationResult, RawAgentRunner } from "../src/types.js"; +import type { AgentInvocation, AgentInvocationResult, ProgressEvent, RawAgentRunner } from "../src/types.js"; class ScriptedRunner implements RawAgentRunner { public readonly prompts: string[] = []; + public readonly progressEvents: ProgressEvent[] = []; constructor( private readonly outputs: string[], @@ -24,6 +25,18 @@ class ScriptedRunner implements RawAgentRunner { this.prompts.push(request.prompt); readFileSync(request.artifacts.schemaPath, "utf8"); + if (request.onProgress) { + const progressEvent: ProgressEvent = { + ts: new Date().toISOString(), + runId: request.runId, + source: request.role, + kind: "turn.started", + message: "Started scripted turn.", + payload: {}, + }; + this.progressEvents.push(progressEvent); + await request.onProgress(progressEvent); + } const parsed = JSON.parse(next) as Record; if (typeof parsed.taskId === "string" && !parsed.taskId) { const match = request.prompt.match(/exact task id `([^`]+)`/); @@ -62,19 +75,35 @@ describe("AgentROrchestrator", () => { acceptanceCriteria: ["A run command exists."], verificationSteps: ["Run the help command."], allowedPaths: ["src", "package.json"], + runtimeClass: "short", }, ], blockedReason: null, + blockerClass: null, + requiredInputs: [], + fallbackTasks: [], }), JSON.stringify({ taskId: "", status: "completed", summary: "Added the CLI skeleton.", changes: ["Created an entrypoint."], - verification: [{ command: "agent-r --help", outcome: "passed", details: "Help output rendered." }], + verification: [ + { + command: "agent-r --help", + outcome: "passed", + details: "Help output rendered.", + category: "test", + artifacts: [], + metrics: {}, + }, + ], followUps: [], touchedFiles: ["src/index.ts", "package.json"], blockers: [], + blockerClass: null, + requiredInputs: [], + fallbackTasks: [], }), JSON.stringify({ decision: "done", @@ -84,6 +113,9 @@ describe("AgentROrchestrator", () => { risks: [], tasks: [], blockedReason: null, + blockerClass: null, + requiredInputs: [], + fallbackTasks: [], }), JSON.stringify({ readyForIndependentCheck: true, @@ -112,6 +144,7 @@ describe("AgentROrchestrator", () => { expect(snapshot.approvals).toHaveLength(2); expect(snapshot.pendingTasks).toHaveLength(0); expect(runner.prompts.at(1)).toContain("You must echo the exact task id"); + expect(snapshot.run.waitingInputs).toEqual([]); }); test("strategy prompt explicitly permits read-only repository inspection", async () => { @@ -128,6 +161,9 @@ describe("AgentROrchestrator", () => { risks: [], tasks: [], blockedReason: "Stop immediately.", + blockerClass: "contradiction", + requiredInputs: [], + fallbackTasks: [], }), ]); @@ -149,11 +185,76 @@ describe("AgentROrchestrator", () => { risks: [], tasks: [], blockedReason: null, + blockerClass: null, + requiredInputs: [], + fallbackTasks: [], }), ) as Record; expect(strategyPlanSchema.required).toContain("blockedReason"); + expect(strategyPlanSchema.required).toContain("blockerClass"); expect(strategyPlanSchema.properties.blockedReason.type).toEqual(["string", "null"]); expect(valid.blockedReason).toBeNull(); }); + + test("moves to waiting_input when implementation reports missing external input", async () => { + const repoPath = mkdtempSync(path.join(tmpdir(), "agent-r-waiting-input-")); + const config = loadConfig(repoPath); + const store = new RunStore(config.databasePath); + const artifacts = new ArtifactManager(config.runsDir); + const runner = new ScriptedRunner([ + JSON.stringify({ + decision: "continue", + summary: "Need a hardware proof task.", + rationale: "Work remains.", + goalProgress: "Partial.", + risks: [], + tasks: [ + { + title: "Run hardware proof", + objective: "Execute the physical node proof.", + acceptanceCriteria: ["Proof succeeds."], + verificationSteps: ["Run the hardware script."], + allowedPaths: ["scripts/**"], + runtimeClass: "long", + }, + ], + blockedReason: null, + blockerClass: null, + requiredInputs: [], + fallbackTasks: [], + }), + JSON.stringify({ + taskId: "", + status: "blocked", + summary: "Waiting for real hardware.", + changes: [], + verification: [ + { + command: "scripts/hardware-proof.sh", + outcome: "not_run", + details: "No accessible physical node was available.", + category: "proof", + artifacts: [], + metrics: {}, + }, + ], + followUps: [], + touchedFiles: [], + blockers: ["No accessible physical node."], + blockerClass: "external_resource", + requiredInputs: ["Reachable physical node with ISO boot path"], + fallbackTasks: [], + }), + ]); + + const orchestrator = new AgentROrchestrator(config, store, artifacts, runner); + const run = orchestrator.createRun("Prove the hardware path"); + const finalRun = await orchestrator.runUntilStable(run.id, 5); + const snapshot = store.buildSnapshot(run.id); + + expect(finalRun.status).toBe("waiting_input"); + expect(snapshot.run.waitingInputs).toEqual(["Reachable physical node with ISO boot path"]); + expect(snapshot.blockedTasks).toHaveLength(1); + }); }); diff --git a/test/progress.test.ts b/test/progress.test.ts new file mode 100644 index 0000000..eb027a8 --- /dev/null +++ b/test/progress.test.ts @@ -0,0 +1,37 @@ +import { describe, expect, test } from "vitest"; +import { formatProgressEventHuman, progressEventsFromThreadEvent } from "../src/progress.js"; + +describe("progress helpers", () => { + test("summarizes command and todo events for stdout", () => { + const todoEvents = progressEventsFromThreadEvent("run-1", "implementation", { + type: "item.started", + item: { + id: "todo-1", + type: "todo_list", + items: [ + { text: "Inspect repository state", completed: true }, + { text: "Run proof command", completed: false }, + ], + }, + }); + + const commandEvents = progressEventsFromThreadEvent("run-1", "implementation", { + type: "item.completed", + item: { + id: "cmd-1", + type: "command_execution", + command: "/bin/bash -lc 'npm test'", + aggregated_output: "ok 5 tests\n", + exit_code: 0, + status: "completed", + }, + }); + + expect(todoEvents).toHaveLength(1); + expect(todoEvents[0]?.kind).toBe("todo_list.updated"); + expect(todoEvents[0]?.message).toContain("1/2 complete"); + expect(commandEvents[0]?.kind).toBe("command.completed"); + expect(commandEvents[0]?.message).toContain("exit=0"); + expect(formatProgressEventHuman(commandEvents[0]!)).toContain("[implementation] [command.completed]"); + }); +}); diff --git a/test/store.test.ts b/test/store.test.ts index 8ab2683..f2c650e 100644 --- a/test/store.test.ts +++ b/test/store.test.ts @@ -17,6 +17,7 @@ describe("RunStore", () => { acceptanceCriteria: ["It works"], verificationSteps: ["Run tests"], allowedPaths: ["src"], + runtimeClass: "short", }, { title: "Task 2", @@ -24,6 +25,7 @@ describe("RunStore", () => { acceptanceCriteria: ["It still works"], verificationSteps: ["Run tests again"], allowedPaths: ["tests"], + runtimeClass: "long", }, ]); @@ -36,5 +38,7 @@ describe("RunStore", () => { expect(snapshot.pendingTasks).toHaveLength(1); expect(snapshot.completedTasks).toHaveLength(1); expect(snapshot.recentEvents.at(-1)?.kind).toBe("custom"); + expect(snapshot.pendingTasks[0]?.runtimeClass).toBe("long"); + expect(snapshot.run.waitingInputs).toEqual([]); }); });