agent-r/src/store.ts

647 lines
19 KiB
TypeScript

import { randomUUID } from "node:crypto";
import { DatabaseSync } from "node:sqlite";
import path from "node:path";
import { ensureDir } from "./fs-utils.js";
import type {
AgentRole,
AgentStateRecord,
ApprovalRecord,
ApprovalSource,
ApprovalVerdict,
ArtifactRecord,
AttemptStatus,
CheckpointRecord,
EventRecord,
RunRecord,
RunSnapshot,
RunStatus,
TaskAttemptRecord,
TaskDraft,
TaskRecord,
TaskStatus,
} from "./types.js";
/** Returns the current instant formatted by `Date.prototype.toISOString()`. */
function nowIso(): string {
  const current = new Date();
  return current.toISOString();
}
/**
 * Parses a JSON column value.
 *
 * @param value Raw JSON text, or `null`.
 * @returns The parsed value cast to `T`. A `null` or empty string yields an
 *   empty array (cast to `T`) instead of being parsed.
 * @throws SyntaxError when `value` is non-empty but not valid JSON.
 */
function parseJson<T>(value: string | null): T {
  // Falsy input (null or "") never reaches JSON.parse; it maps to [].
  return value ? (JSON.parse(value) as T) : ([] as T);
}
/**
 * Serializes a value to JSON text for storage in a `*_json` column.
 *
 * `JSON.stringify` returns `undefined` (not a string) for inputs that have no
 * JSON representation, such as `undefined`, functions and symbols. Those are
 * stored as the JSON literal `null` so the result is always a real string.
 *
 * @param value Any value to serialize.
 * @returns JSON text; `"null"` when `value` has no JSON representation.
 */
function toJson(value: unknown): string {
  // The lib typing says `string`, but at runtime this can be undefined.
  const encoded: string | undefined = JSON.stringify(value);
  return encoded ?? "null";
}
/**
 * Converts a raw `tasks` table row (snake_case columns) into a `TaskRecord`.
 *
 * The three `*_json` columns are stringified and decoded with `parseJson`.
 * `implementation_summary` and `blocker_signature` become `null` when the
 * stored value is falsy (NULL or an empty string).
 */
function mapTaskRow(row: Record<string, unknown>): TaskRecord {
  const optionalText = (cell: unknown): string | null => (cell ? String(cell) : null);
  const stringList = (cell: unknown): string[] => parseJson<string[]>(String(cell));

  const record: TaskRecord = {
    id: String(row.id),
    runId: String(row.run_id),
    title: String(row.title),
    objective: String(row.objective),
    acceptanceCriteria: stringList(row.acceptance_criteria_json),
    verificationSteps: stringList(row.verification_steps_json),
    allowedPaths: stringList(row.allowed_paths_json),
    status: row.status as TaskStatus,
    attemptCount: Number(row.attempt_count),
    implementationSummary: optionalText(row.implementation_summary),
    blockerSignature: optionalText(row.blocker_signature),
    createdAt: String(row.created_at),
    updatedAt: String(row.updated_at),
  };
  return record;
}
/**
 * Converts a raw `task_attempts` table row into a `TaskAttemptRecord`.
 *
 * `result_json` is passed through as text (it is not parsed here), and
 * `blocker_signature` becomes `null` when the stored value is falsy.
 */
function mapAttemptRow(row: Record<string, unknown>): TaskAttemptRecord {
  const signature = row.blocker_signature;

  const record: TaskAttemptRecord = {
    id: String(row.id),
    runId: String(row.run_id),
    taskId: String(row.task_id),
    attemptNumber: Number(row.attempt_number),
    status: row.status as AttemptStatus,
    summary: String(row.summary),
    blockerSignature: signature ? String(signature) : null,
    resultJson: String(row.result_json),
    createdAt: String(row.created_at),
  };
  return record;
}
/**
 * Converts a raw `runs` table row into a `RunRecord`.
 *
 * `summary` and `current_task_id` become `null` when the stored value is
 * falsy (NULL or an empty string); counters are coerced with `Number`.
 */
function mapRunRow(row: Record<string, unknown>): RunRecord {
  const orNull = (cell: unknown): string | null => {
    if (cell) {
      return String(cell);
    }
    return null;
  };

  const record: RunRecord = {
    id: String(row.id),
    goal: String(row.goal),
    repoPath: String(row.repo_path),
    status: row.status as RunStatus,
    summary: orNull(row.summary),
    cycleCount: Number(row.cycle_count),
    replanCount: Number(row.replan_count),
    currentTaskId: orNull(row.current_task_id),
    createdAt: String(row.created_at),
    updatedAt: String(row.updated_at),
  };
  return record;
}
/**
 * Converts a raw `agents` table row into an `AgentStateRecord`.
 *
 * `session_id`, `last_prompt_path` and `last_response_path` become `null`
 * when the stored value is falsy (NULL or an empty string).
 */
function mapAgentRow(row: Record<string, unknown>): AgentStateRecord {
  const textOrNull = (cell: unknown): string | null => (cell ? String(cell) : null);

  const record: AgentStateRecord = {
    runId: String(row.run_id),
    role: row.role as AgentRole,
    sessionId: textOrNull(row.session_id),
    turns: Number(row.turns),
    rotationCount: Number(row.rotation_count),
    lastPromptPath: textOrNull(row.last_prompt_path),
    lastResponsePath: textOrNull(row.last_response_path),
    updatedAt: String(row.updated_at),
  };
  return record;
}
/**
 * SQLite-backed store for run orchestration state: runs, per-role agent
 * state, tasks, task attempts, events, artifacts, checkpoints and approvals.
 *
 * All operations are synchronous (`DatabaseSync`). Timestamps are strings
 * produced by `nowIso()`. Several rows can share the same timestamp (every
 * task inserted by one `replacePendingTasks` call does), so queries that sort
 * by `created_at` also sort by `rowid` to give ties a stable order. This
 * class never deletes rows, so `rowid` follows insertion order.
 */
export class RunStore {
  private readonly db: DatabaseSync;

  /** True while a transaction opened by `transaction()` is active. */
  private inTransaction = false;

  /**
   * Opens (creating if needed) the database at `dbPath`, switches it to WAL
   * journaling and creates every table that does not exist yet.
   *
   * @param dbPath Path of the SQLite file; its parent directory is ensured
   *   via `ensureDir` before the database is opened.
   */
  constructor(dbPath: string) {
    ensureDir(path.dirname(dbPath));
    this.db = new DatabaseSync(dbPath);
    this.db.exec("PRAGMA journal_mode = WAL;");
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS runs (
        id TEXT PRIMARY KEY,
        goal TEXT NOT NULL,
        repo_path TEXT NOT NULL,
        status TEXT NOT NULL,
        summary TEXT,
        cycle_count INTEGER NOT NULL DEFAULT 0,
        replan_count INTEGER NOT NULL DEFAULT 0,
        current_task_id TEXT,
        created_at TEXT NOT NULL,
        updated_at TEXT NOT NULL
      );
      CREATE TABLE IF NOT EXISTS agents (
        run_id TEXT NOT NULL,
        role TEXT NOT NULL,
        session_id TEXT,
        turns INTEGER NOT NULL DEFAULT 0,
        rotation_count INTEGER NOT NULL DEFAULT 0,
        last_prompt_path TEXT,
        last_response_path TEXT,
        updated_at TEXT NOT NULL,
        PRIMARY KEY (run_id, role)
      );
      CREATE TABLE IF NOT EXISTS tasks (
        id TEXT PRIMARY KEY,
        run_id TEXT NOT NULL,
        title TEXT NOT NULL,
        objective TEXT NOT NULL,
        acceptance_criteria_json TEXT NOT NULL,
        verification_steps_json TEXT NOT NULL,
        allowed_paths_json TEXT NOT NULL,
        status TEXT NOT NULL,
        attempt_count INTEGER NOT NULL DEFAULT 0,
        implementation_summary TEXT,
        blocker_signature TEXT,
        created_at TEXT NOT NULL,
        updated_at TEXT NOT NULL
      );
      CREATE TABLE IF NOT EXISTS task_attempts (
        id TEXT PRIMARY KEY,
        run_id TEXT NOT NULL,
        task_id TEXT NOT NULL,
        attempt_number INTEGER NOT NULL,
        status TEXT NOT NULL,
        summary TEXT NOT NULL,
        blocker_signature TEXT,
        result_json TEXT NOT NULL,
        created_at TEXT NOT NULL
      );
      CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        run_id TEXT NOT NULL,
        ts TEXT NOT NULL,
        source TEXT NOT NULL,
        kind TEXT NOT NULL,
        message TEXT NOT NULL,
        payload_json TEXT NOT NULL
      );
      CREATE TABLE IF NOT EXISTS artifacts (
        id TEXT PRIMARY KEY,
        run_id TEXT NOT NULL,
        role TEXT,
        kind TEXT NOT NULL,
        path TEXT NOT NULL,
        created_at TEXT NOT NULL,
        metadata_json TEXT NOT NULL
      );
      CREATE TABLE IF NOT EXISTS checkpoints (
        id TEXT PRIMARY KEY,
        run_id TEXT NOT NULL,
        status TEXT NOT NULL,
        payload_json TEXT NOT NULL,
        created_at TEXT NOT NULL
      );
      CREATE TABLE IF NOT EXISTS approvals (
        id TEXT PRIMARY KEY,
        run_id TEXT NOT NULL,
        source TEXT NOT NULL,
        verdict TEXT NOT NULL,
        rationale TEXT NOT NULL,
        payload_json TEXT NOT NULL,
        created_at TEXT NOT NULL
      );
    `);
  }

  /**
   * Closes the underlying database handle. The store must not be used
   * afterwards.
   */
  close(): void {
    this.db.close();
  }

  /**
   * Runs `work` inside a single SQLite transaction, committing on success and
   * rolling back if `work` (or the commit) throws. When a transaction started
   * by this helper is already active, `work` simply joins it.
   *
   * @param work Synchronous unit of work to run atomically.
   * @returns Whatever `work` returns.
   */
  private transaction<T>(work: () => T): T {
    if (this.inTransaction) {
      return work();
    }
    // IMMEDIATE takes the write lock up front so a read-then-write sequence
    // cannot be interleaved with another writer.
    this.db.exec("BEGIN IMMEDIATE;");
    this.inTransaction = true;
    try {
      const result = work();
      this.db.exec("COMMIT;");
      return result;
    } catch (error) {
      try {
        this.db.exec("ROLLBACK;");
      } catch {
        // The original failure is the useful one to surface; a failed
        // rollback (e.g. SQLite already rolled back) must not mask it.
      }
      throw error;
    } finally {
      this.inTransaction = false;
    }
  }

  /** Maps a raw `approvals` row to an `ApprovalRecord`, parsing its payload. */
  private static toApprovalRecord(row: Record<string, unknown>): ApprovalRecord {
    return {
      id: String(row.id),
      runId: String(row.run_id),
      source: row.source as ApprovalSource,
      verdict: row.verdict as ApprovalVerdict,
      rationale: String(row.rationale),
      payload: JSON.parse(String(row.payload_json)),
      createdAt: String(row.created_at),
    };
  }

  /**
   * Inserts a new run in status `'planning'` with a fresh UUID and records a
   * `run_created` event for it.
   *
   * @returns The stored run, read back from the database.
   */
  createRun(goal: string, repoPath: string): RunRecord {
    const timestamp = nowIso();
    const id = randomUUID();
    this.db
      .prepare(`
        INSERT INTO runs (id, goal, repo_path, status, summary, cycle_count, replan_count, current_task_id, created_at, updated_at)
        VALUES (?, ?, ?, 'planning', NULL, 0, 0, NULL, ?, ?)
      `)
      .run(id, goal, repoPath, timestamp, timestamp);
    this.addEvent(id, "system", "run_created", "Run created.", { goal, repoPath });
    return this.getRun(id);
  }

  /**
   * Loads a run by id.
   *
   * @throws Error when no run with that id exists.
   */
  getRun(runId: string): RunRecord {
    const row = this.db.prepare("SELECT * FROM runs WHERE id = ?").get(runId) as Record<string, unknown> | undefined;
    if (!row) {
      throw new Error(`Run not found: ${runId}`);
    }
    return mapRunRow(row);
  }

  /** Lists all runs, newest first. */
  listRuns(): RunRecord[] {
    const rows = this.db
      .prepare("SELECT * FROM runs ORDER BY created_at DESC, rowid DESC")
      .all() as Record<string, unknown>[];
    return rows.map(mapRunRow);
  }

  /**
   * Updates a run's status, summary and/or current task and bumps
   * `updated_at`.
   *
   * Fields left `undefined` keep their stored value. `status` and `summary`
   * are merged with `??`, so passing `null` for `summary` also keeps the
   * stored summary (it cannot be cleared through this method).
   * `currentTaskId` may be explicitly set to `null`.
   *
   * @returns The run as stored after the update.
   * @throws Error when the run does not exist.
   */
  updateRun(runId: string, updates: Partial<Pick<RunRecord, "status" | "summary" | "currentTaskId">>): RunRecord {
    const current = this.getRun(runId);
    const next: RunRecord = {
      ...current,
      status: updates.status ?? current.status,
      summary: updates.summary ?? current.summary,
      currentTaskId: updates.currentTaskId === undefined ? current.currentTaskId : updates.currentTaskId,
      updatedAt: nowIso(),
    };
    this.db
      .prepare(`
        UPDATE runs
        SET status = ?, summary = ?, current_task_id = ?, updated_at = ?
        WHERE id = ?
      `)
      .run(next.status, next.summary, next.currentTaskId, next.updatedAt, runId);
    return this.getRun(runId);
  }

  /** Increments the run's `cycle_count` by one and bumps `updated_at`. */
  incrementCycle(runId: string): void {
    this.db.prepare("UPDATE runs SET cycle_count = cycle_count + 1, updated_at = ? WHERE id = ?").run(nowIso(), runId);
  }

  /** Increments the run's `replan_count` by one and bumps `updated_at`. */
  incrementReplanCount(runId: string): void {
    this.db.prepare("UPDATE runs SET replan_count = replan_count + 1, updated_at = ? WHERE id = ?").run(nowIso(), runId);
  }

  /**
   * Loads the stored state of one agent role within a run.
   *
   * @returns The agent state, or `null` when none has been saved.
   */
  getAgentState(runId: string, role: AgentRole): AgentStateRecord | null {
    const row = this.db.prepare("SELECT * FROM agents WHERE run_id = ? AND role = ?").get(runId, role) as
      | Record<string, unknown>
      | undefined;
    return row ? mapAgentRow(row) : null;
  }

  /**
   * Inserts or overwrites the agent state keyed by `(runId, role)`.
   * `record.updatedAt` is stored as given; it is not regenerated here.
   */
  saveAgentState(record: AgentStateRecord): void {
    this.db
      .prepare(`
        INSERT INTO agents (run_id, role, session_id, turns, rotation_count, last_prompt_path, last_response_path, updated_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(run_id, role) DO UPDATE SET
          session_id = excluded.session_id,
          turns = excluded.turns,
          rotation_count = excluded.rotation_count,
          last_prompt_path = excluded.last_prompt_path,
          last_response_path = excluded.last_response_path,
          updated_at = excluded.updated_at
      `)
      .run(
        record.runId,
        record.role,
        record.sessionId,
        record.turns,
        record.rotationCount,
        record.lastPromptPath,
        record.lastResponsePath,
        record.updatedAt,
      );
  }

  /**
   * Moves every `in_progress` task of the run back to `pending` and, if any
   * row changed, records a `requeue_in_progress` event.
   *
   * @returns The number of tasks that were requeued.
   */
  requeueInProgressTasks(runId: string): number {
    const timestamp = nowIso();
    const result = this.db
      .prepare(`
        UPDATE tasks
        SET status = 'pending', updated_at = ?
        WHERE run_id = ? AND status = 'in_progress'
      `)
      .run(timestamp, runId);
    const changes = Number(result.changes ?? 0);
    if (changes > 0) {
      this.addEvent(runId, "system", "requeue_in_progress", "Requeued in-progress tasks on resume.", { changes });
    }
    return changes;
  }

  /**
   * Replaces the run's open work with a new task list: every `pending` and
   * `in_progress` task is marked `abandoned`, then each draft is inserted as
   * a new `pending` task with a fresh UUID.
   *
   * The whole replacement runs in one transaction, so a failing insert does
   * not leave the old tasks abandoned with only part of the new plan stored.
   *
   * @returns The inserted tasks, in the order of `tasks`.
   */
  replacePendingTasks(runId: string, tasks: TaskDraft[]): TaskRecord[] {
    return this.transaction(() => {
      const now = nowIso();
      this.db
        .prepare(`
          UPDATE tasks
          SET status = 'abandoned', updated_at = ?
          WHERE run_id = ? AND status IN ('pending', 'in_progress')
        `)
        .run(now, runId);
      const insertStmt = this.db.prepare(`
        INSERT INTO tasks (
          id, run_id, title, objective, acceptance_criteria_json, verification_steps_json,
          allowed_paths_json, status, attempt_count, implementation_summary, blocker_signature, created_at, updated_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?, 'pending', 0, NULL, NULL, ?, ?)
      `);
      const records: TaskRecord[] = [];
      for (const task of tasks) {
        const id = randomUUID();
        insertStmt.run(
          id,
          runId,
          task.title,
          task.objective,
          toJson(task.acceptanceCriteria),
          toJson(task.verificationSteps),
          toJson(task.allowedPaths),
          now,
          now,
        );
        records.push(this.getTask(id));
      }
      return records;
    });
  }

  /**
   * Lists the run's tasks, oldest first (ties broken by insertion order).
   *
   * @param statuses Optional filter; when omitted or empty, tasks of every
   *   status are returned.
   */
  listTasks(runId: string, statuses?: TaskStatus[]): TaskRecord[] {
    if (!statuses?.length) {
      const rows = this.db
        .prepare("SELECT * FROM tasks WHERE run_id = ? ORDER BY created_at ASC, rowid ASC")
        .all(runId) as Record<string, unknown>[];
      return rows.map(mapTaskRow);
    }
    // Only "?" placeholders are interpolated; status values are bound.
    const placeholders = statuses.map(() => "?").join(", ");
    const rows = this.db
      .prepare(
        `SELECT * FROM tasks WHERE run_id = ? AND status IN (${placeholders}) ORDER BY created_at ASC, rowid ASC`,
      )
      .all(runId, ...statuses) as Record<string, unknown>[];
    return rows.map(mapTaskRow);
  }

  /**
   * Loads a task by id.
   *
   * @throws Error when no task with that id exists.
   */
  getTask(taskId: string): TaskRecord {
    const row = this.db.prepare("SELECT * FROM tasks WHERE id = ?").get(taskId) as Record<string, unknown> | undefined;
    if (!row) {
      throw new Error(`Task not found: ${taskId}`);
    }
    return mapTaskRow(row);
  }

  /**
   * Picks the oldest `pending` task of the run (ties broken by insertion
   * order), marks it `in_progress` and increments its `attempt_count`.
   * The select and update run in one transaction.
   *
   * @returns The claimed task, or `null` when the run has no pending task.
   */
  claimNextPendingTask(runId: string): TaskRecord | null {
    return this.transaction(() => {
      const row = this.db
        .prepare(`
          SELECT * FROM tasks
          WHERE run_id = ? AND status = 'pending'
          ORDER BY created_at ASC, rowid ASC
          LIMIT 1
        `)
        .get(runId) as Record<string, unknown> | undefined;
      if (!row) {
        return null;
      }
      const taskId = String(row.id);
      this.db
        .prepare(`
          UPDATE tasks
          SET status = 'in_progress', attempt_count = attempt_count + 1, updated_at = ?
          WHERE id = ?
        `)
        .run(nowIso(), taskId);
      return this.getTask(taskId);
    });
  }

  /**
   * Marks a task `completed`, stores its implementation summary and clears
   * any blocker signature.
   *
   * @throws Error when the task does not exist.
   */
  completeTask(taskId: string, summary: string): TaskRecord {
    this.db
      .prepare(`
        UPDATE tasks
        SET status = 'completed', implementation_summary = ?, blocker_signature = NULL, updated_at = ?
        WHERE id = ?
      `)
      .run(summary, nowIso(), taskId);
    return this.getTask(taskId);
  }

  /**
   * Marks a task `abandoned`, overwriting its summary and blocker signature
   * with the given values.
   *
   * @throws Error when the task does not exist.
   */
  abandonTask(taskId: string, summary: string | null, blockerSignature: string | null): TaskRecord {
    this.db
      .prepare(`
        UPDATE tasks
        SET status = 'abandoned', implementation_summary = ?, blocker_signature = ?, updated_at = ?
        WHERE id = ?
      `)
      .run(summary, blockerSignature, nowIso(), taskId);
    return this.getTask(taskId);
  }

  /**
   * Marks a task `blocked`, overwriting its summary and blocker signature
   * with the given values.
   *
   * @throws Error when the task does not exist.
   */
  blockTask(taskId: string, summary: string | null, blockerSignature: string | null): TaskRecord {
    this.db
      .prepare(`
        UPDATE tasks
        SET status = 'blocked', implementation_summary = ?, blocker_signature = ?, updated_at = ?
        WHERE id = ?
      `)
      .run(summary, blockerSignature, nowIso(), taskId);
    return this.getTask(taskId);
  }

  /**
   * Returns the most recently recorded attempt for a task, or `null` when
   * the task has no attempts.
   */
  latestTaskAttempt(taskId: string): TaskAttemptRecord | null {
    const row = this.db
      .prepare("SELECT * FROM task_attempts WHERE task_id = ? ORDER BY created_at DESC, rowid DESC LIMIT 1")
      .get(taskId) as Record<string, unknown> | undefined;
    return row ? mapAttemptRow(row) : null;
  }

  /**
   * Records one attempt at a task. `result` is stored as JSON text.
   *
   * @returns The stored attempt, read back by its own id (not "the latest
   *   attempt for the task", which could be a different row when two
   *   attempts share a timestamp).
   * @throws Error when the inserted row cannot be read back.
   */
  addTaskAttempt(
    runId: string,
    taskId: string,
    attemptNumber: number,
    status: AttemptStatus,
    summary: string,
    result: unknown,
    blockerSignature: string | null,
  ): TaskAttemptRecord {
    const id = randomUUID();
    const createdAt = nowIso();
    this.db
      .prepare(`
        INSERT INTO task_attempts (id, run_id, task_id, attempt_number, status, summary, blocker_signature, result_json, created_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
      `)
      .run(id, runId, taskId, attemptNumber, status, summary, blockerSignature, toJson(result), createdAt);
    const row = this.db.prepare("SELECT * FROM task_attempts WHERE id = ?").get(id) as
      | Record<string, unknown>
      | undefined;
    if (!row) {
      throw new Error(`Task attempt not found after insert: ${id}`);
    }
    return mapAttemptRow(row);
  }

  /** Appends an event to the run's log, timestamped with `nowIso()`. */
  addEvent(runId: string, source: string, kind: string, message: string, payload: unknown): void {
    this.db
      .prepare(`
        INSERT INTO events (run_id, ts, source, kind, message, payload_json)
        VALUES (?, ?, ?, ?, ?, ?)
      `)
      .run(runId, nowIso(), source, kind, message, toJson(payload));
  }

  /**
   * Returns the run's most recent events in chronological order.
   *
   * @param limit Maximum number of events to return (default 50).
   */
  listEvents(runId: string, limit = 50): EventRecord[] {
    // Select the newest `limit` rows, then flip them back to oldest-first.
    const rows = this.db
      .prepare(`
        SELECT * FROM events
        WHERE run_id = ?
        ORDER BY id DESC
        LIMIT ?
      `)
      .all(runId, limit) as Record<string, unknown>[];
    return rows
      .map((row) => ({
        id: Number(row.id),
        runId: String(row.run_id),
        ts: String(row.ts),
        source: String(row.source),
        kind: String(row.kind),
        message: String(row.message),
        payload: JSON.parse(String(row.payload_json)),
      }))
      .reverse();
  }

  /**
   * Records an artifact path for a run.
   *
   * @returns A record built from the arguments; `metadata` is returned as
   *   passed in, not round-tripped through JSON.
   */
  addArtifact(runId: string, role: AgentRole | null, kind: string, filePath: string, metadata: unknown): ArtifactRecord {
    const id = randomUUID();
    const createdAt = nowIso();
    this.db
      .prepare(`
        INSERT INTO artifacts (id, run_id, role, kind, path, created_at, metadata_json)
        VALUES (?, ?, ?, ?, ?, ?, ?)
      `)
      .run(id, runId, role, kind, filePath, createdAt, toJson(metadata));
    return {
      id,
      runId,
      role,
      kind,
      path: filePath,
      createdAt,
      metadata,
    };
  }

  /**
   * Lists the run's artifacts, oldest first (ties broken by insertion order).
   *
   * @param role When given, only artifacts recorded for that role.
   */
  listArtifacts(runId: string, role?: AgentRole): ArtifactRecord[] {
    const rows = role
      ? (this.db
          .prepare(`
            SELECT * FROM artifacts
            WHERE run_id = ? AND role = ?
            ORDER BY created_at ASC, rowid ASC
          `)
          .all(runId, role) as Record<string, unknown>[])
      : (this.db
          .prepare(`
            SELECT * FROM artifacts
            WHERE run_id = ?
            ORDER BY created_at ASC, rowid ASC
          `)
          .all(runId) as Record<string, unknown>[]);
    return rows.map((row) => ({
      id: String(row.id),
      runId: String(row.run_id),
      role: row.role ? (String(row.role) as AgentRole) : null,
      kind: String(row.kind),
      path: String(row.path),
      createdAt: String(row.created_at),
      metadata: JSON.parse(String(row.metadata_json)),
    }));
  }

  /**
   * Records an approval decision for a run.
   *
   * @returns A record built from the arguments; `payload` is returned as
   *   passed in, not round-tripped through JSON.
   */
  addApproval(
    runId: string,
    source: ApprovalSource,
    verdict: ApprovalVerdict,
    rationale: string,
    payload: unknown,
  ): ApprovalRecord {
    const id = randomUUID();
    const createdAt = nowIso();
    this.db
      .prepare(`
        INSERT INTO approvals (id, run_id, source, verdict, rationale, payload_json, created_at)
        VALUES (?, ?, ?, ?, ?, ?, ?)
      `)
      .run(id, runId, source, verdict, rationale, toJson(payload), createdAt);
    return {
      id,
      runId,
      source,
      verdict,
      rationale,
      payload,
      createdAt,
    };
  }

  /** Lists the run's approvals, oldest first (ties broken by insertion order). */
  listApprovals(runId: string): ApprovalRecord[] {
    const rows = this.db
      .prepare("SELECT * FROM approvals WHERE run_id = ? ORDER BY created_at ASC, rowid ASC")
      .all(runId) as Record<string, unknown>[];
    return rows.map((row) => RunStore.toApprovalRecord(row));
  }

  /**
   * Returns the most recent approval from `source` for the run, or `null`
   * when that source has recorded none.
   */
  latestApproval(runId: string, source: ApprovalSource): ApprovalRecord | null {
    const row = this.db
      .prepare(`
        SELECT * FROM approvals
        WHERE run_id = ? AND source = ?
        ORDER BY created_at DESC, rowid DESC
        LIMIT 1
      `)
      .get(runId, source) as Record<string, unknown> | undefined;
    return row ? RunStore.toApprovalRecord(row) : null;
  }

  /**
   * Stores a checkpoint payload (as JSON text) for a run.
   *
   * @returns A record built from the arguments; `payload` is returned as
   *   passed in, not round-tripped through JSON.
   */
  addCheckpoint(runId: string, status: RunStatus, payload: unknown): CheckpointRecord {
    const id = randomUUID();
    const createdAt = nowIso();
    this.db
      .prepare(`
        INSERT INTO checkpoints (id, run_id, status, payload_json, created_at)
        VALUES (?, ?, ?, ?, ?)
      `)
      .run(id, runId, status, toJson(payload), createdAt);
    return {
      id,
      runId,
      status,
      payload,
      createdAt,
    };
  }

  /**
   * Assembles a snapshot of a run: the run row, saved agent states for the
   * `strategy`, `implementation` and `checker` roles, tasks grouped by
   * status, recent attempts and events, and all approvals and artifacts.
   * `blockedTasks` contains both `blocked` and `abandoned` tasks.
   *
   * @throws Error when the run does not exist.
   */
  buildSnapshot(runId: string): RunSnapshot {
    const run = this.getRun(runId);
    // Roles without saved state are simply absent from `agents`.
    const agents = (["strategy", "implementation", "checker"] as const).reduce<RunSnapshot["agents"]>((acc, role) => {
      const agent = this.getAgentState(runId, role);
      if (agent) {
        acc[role] = agent;
      }
      return acc;
    }, {});
    return {
      run,
      agents,
      pendingTasks: this.listTasks(runId, ["pending"]),
      inProgressTasks: this.listTasks(runId, ["in_progress"]),
      completedTasks: this.listTasks(runId, ["completed"]),
      blockedTasks: this.listTasks(runId, ["blocked", "abandoned"]),
      recentAttempts: this.listRecentAttempts(runId),
      recentEvents: this.listEvents(runId),
      approvals: this.listApprovals(runId),
      artifacts: this.listArtifacts(runId),
    };
  }

  /**
   * Returns the run's most recent task attempts in chronological order.
   *
   * @param limit Maximum number of attempts to return (default 12).
   */
  listRecentAttempts(runId: string, limit = 12): TaskAttemptRecord[] {
    // Select the newest `limit` rows, then flip them back to oldest-first.
    const rows = this.db
      .prepare(`
        SELECT * FROM task_attempts
        WHERE run_id = ?
        ORDER BY created_at DESC, rowid DESC
        LIMIT ?
      `)
      .all(runId, limit) as Record<string, unknown>[];
    return rows.map(mapAttemptRow).reverse();
  }
}