From 804753409dac10015cea6a8386b5c615f84c3da4 Mon Sep 17 00:00:00 2001 From: Drew Stone Date: Sat, 23 May 2026 00:11:43 +0300 Subject: [PATCH] =?UTF-8?q?feat:=20durable-run=20supervisor=20=E2=80=94=20?= =?UTF-8?q?cross-worker=20/=20cross-DO=20durability?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit runReconnectableTurn (#23) recovered an interrupted turn only on a retry re-invocation, left an unattended window between worker death and that retry, depended on the sandbox runtime buffering events, and made the correctness-critical reconnect a per-product callback. It checkpointed the run handle as "a completed step at index 0" — an admitted migration-dodge. This relocates the durability boundary off the ephemeral worker onto an always-attached supervisor that owns the run. Substrate (platform-agnostic, tested in Node): - DurableRunStore gains an ordered, replayable stream-event log — appendStreamEvent / readStreamEvents, idempotent on eventId so a reconnecting adapter that re-yields a boundary event cannot double-log. RunHandle is real run-row state via setRunHandle, not a step hack. Schema v2 (durable_stream_events table + durable_runs.handle_json), implemented across the in-memory / file-system / D1 stores. - runSupervisedTurn — drains a run's events into the stream log as they flow, persists the reconnect pointer the instant the substrate yields it, heartbeats the lease. A fresh supervisor reads the log for its cursor and resumes via the adapter — fresh / resumed / replayed. - SandboxReconnectAdapter — one typed, conformance-tested contract. The dangerous reconnect glue lives once per substrate, never per product. Cloudflare host (thin): - SessionSupervisorDO — a Durable Object that hosts runSupervisedTurn; alarm() re-attaches a run a dropped response stream abandoned. CF types are structural (no @cloudflare/workers-types dep). runReconnectableTurn / run-handle.ts are removed — superseded; no product consumed them. RunHandle + the four-mode resolution carry forward. 15 new tests incl. the cross-worker chaos keystone (kill mid-stream, resume, no gap, no duplicate); suite 251 green; typecheck + biome + build clean. --- src/durable/d1-store.ts | 70 +++ src/durable/file-system-store.ts | 53 ++ src/durable/in-memory-store.ts | 41 +- src/durable/index.ts | 35 +- src/durable/run-handle.ts | 326 ----------- src/durable/schema.sql | 29 + src/durable/schema.ts | 31 +- src/durable/session-supervisor-do.ts | 178 ++++++ src/durable/supervisor.ts | 229 ++++++++ src/durable/tests/run-handle.test.ts | 506 ------------------ .../tests/session-supervisor-do.test.ts | 161 ++++++ src/durable/tests/supervisor.test.ts | 247 +++++++++ src/durable/types.ts | 63 +++ 13 files changed, 1122 insertions(+), 847 deletions(-) delete mode 100644 src/durable/run-handle.ts create mode 100644 src/durable/session-supervisor-do.ts create mode 100644 src/durable/supervisor.ts delete mode 100644 src/durable/tests/run-handle.test.ts create mode 100644 src/durable/tests/session-supervisor-do.test.ts create mode 100644 src/durable/tests/supervisor.test.ts diff --git a/src/durable/d1-store.ts b/src/durable/d1-store.ts index fc538bb..7137f1f 100644 --- a/src/durable/d1-store.ts +++ b/src/durable/d1-store.ts @@ -20,11 +20,13 @@ import { type DurableRunManifest, type DurableRunStore, type EventRecord, + type RunHandle, type RunOutcome, type RunRecord, type StepError, type StepKind, type StepRecord, + type StreamEventRecord, } from './types' const DEFAULT_LEASE_MS = 30_000 @@ -59,6 +61,15 @@ interface RunRow { lease_expires_at: string | null outcome_json: string | null step_count: number + handle_json: string | null +} + +interface StreamEventRow { + run_id: string + seq: number + event_id: string + payload_json: string | null + appended_at: string } interface StepRow { @@ -383,6 +394,54 @@ export class D1DurableRunStore implements DurableRunStore { return row ? rowToEventRecord(row) : undefined } + async appendStreamEvent(input: { + runId: string + eventId: string + payload: unknown + }): ReturnType { + const nowIso = new Date(this.now()).toISOString() + // INSERT OR IGNORE — the UNIQUE(run_id, event_id) index makes a re-append + // a no-op; seq is the next monotonic value, computed atomically with the + // insert. A new event_id never collides on the (run_id, seq) PK. + const res = await this.db + .prepare( + `INSERT OR IGNORE INTO durable_stream_events (run_id, seq, event_id, payload_json, appended_at) + VALUES ( + ?, + (SELECT COALESCE(MAX(seq), -1) + 1 FROM durable_stream_events WHERE run_id = ?), + ?, ?, ? + )`, + ) + .bind(input.runId, input.runId, input.eventId, JSON.stringify(input.payload ?? null), nowIso) + .run() + const accepted = (res.meta?.changes ?? 0) > 0 + const row = await this.db + .prepare('SELECT * FROM durable_stream_events WHERE run_id = ? AND event_id = ?') + .bind(input.runId, input.eventId) + .first() + if (!row) throw new Error('durable-runs: appendStreamEvent failed to persist or read back') + return { accepted, record: rowToStreamEventRecord(row) } + } + + async readStreamEvents( + runId: string, + afterSeq?: number, + ): Promise> { + const { results } = await this.db + .prepare('SELECT * FROM durable_stream_events WHERE run_id = ? AND seq > ? ORDER BY seq') + .bind(runId, afterSeq ?? -1) + .all() + return results.map(rowToStreamEventRecord) + } + + async setRunHandle(input: { runId: string; handle: RunHandle }): Promise { + const nowIso = new Date(this.now()).toISOString() + await this.db + .prepare('UPDATE durable_runs SET handle_json = ?, updated_at = ? WHERE run_id = ?') + .bind(JSON.stringify(input.handle), nowIso, input.runId) + .run() + } + async close(): Promise { // D1 binding lifecycle is owned by the runtime; no-op. } @@ -432,6 +491,17 @@ function rowToRunRecord(row: RunRow): RunRecord { leaseExpiresAt: row.lease_expires_at ?? undefined, outcome: row.outcome_json ? (JSON.parse(row.outcome_json) as RunOutcome) : undefined, stepCount: row.step_count, + handle: row.handle_json ? (JSON.parse(row.handle_json) as RunHandle) : undefined, + } +} + +function rowToStreamEventRecord(row: StreamEventRow): StreamEventRecord { + return { + runId: row.run_id, + seq: row.seq, + eventId: row.event_id, + payload: row.payload_json ? JSON.parse(row.payload_json) : null, + appendedAt: row.appended_at, } } diff --git a/src/durable/file-system-store.ts b/src/durable/file-system-store.ts index 9f5549f..bd71592 100644 --- a/src/durable/file-system-store.ts +++ b/src/durable/file-system-store.ts @@ -32,11 +32,13 @@ import { type DurableRunManifest, type DurableRunStore, type EventRecord, + type RunHandle, type RunOutcome, type RunRecord, type StepError, type StepKind, type StepRecord, + type StreamEventRecord, } from './types' const DEFAULT_LEASE_MS = 30_000 @@ -86,6 +88,7 @@ export class FileSystemDurableRunStore implements DurableRunStore { // Touch the jsonl files so listing them later doesn't ENOENT. await appendFile(join(dir, 'steps.jsonl'), '', 'utf8') await appendFile(join(dir, 'events.jsonl'), '', 'utf8') + await appendFile(join(dir, 'stream-events.jsonl'), '', 'utf8') return { run: record, completedSteps: [], leaseExpiresAt } } @@ -287,10 +290,60 @@ export class FileSystemDurableRunStore implements DurableRunStore { return undefined } + async appendStreamEvent(input: { + runId: string + eventId: string + payload: unknown + }): ReturnType { + const existing = await this.readStreamEventsRaw(input.runId) + const dup = existing.find((e) => e.eventId === input.eventId) + if (dup) return { accepted: false, record: dup } + const rec: StreamEventRecord = { + runId: input.runId, + seq: existing.length, + eventId: input.eventId, + payload: input.payload, + appendedAt: new Date(this.now()).toISOString(), + } + await appendFile( + join(this.runDir(input.runId), 'stream-events.jsonl'), + `${JSON.stringify(rec)}\n`, + 'utf8', + ) + return { accepted: true, record: rec } + } + + async readStreamEvents( + runId: string, + afterSeq?: number, + ): Promise> { + const cutoff = afterSeq ?? -1 + return (await this.readStreamEventsRaw(runId)).filter((e) => e.seq > cutoff) + } + + async setRunHandle(input: { runId: string; handle: RunHandle }): Promise { + const record = await this.readRun(input.runId) + record.handle = input.handle + record.updatedAt = new Date(this.now()).toISOString() + await this.writeRun(record) + } + async close(): Promise { // No persistent handles to close. } + private async readStreamEventsRaw(runId: string): Promise { + const path = join(this.runDir(runId), 'stream-events.jsonl') + if (!existsSync(path)) return [] + const content = await readFile(path, 'utf8') + const out: StreamEventRecord[] = [] + for (const line of content.split('\n')) { + if (!line) continue + out.push(JSON.parse(line) as StreamEventRecord) + } + return out.sort((a, b) => a.seq - b.seq) + } + /** @internal — used by tests to list runs in the store. */ async _listRunIds(): Promise { if (!existsSync(this.root)) return [] diff --git a/src/durable/in-memory-store.ts b/src/durable/in-memory-store.ts index 06e8b90..e51fa32 100644 --- a/src/durable/in-memory-store.ts +++ b/src/durable/in-memory-store.ts @@ -10,11 +10,13 @@ import type { DurableRunManifest, DurableRunStore, EventRecord, + RunHandle, RunOutcome, RunRecord, StepError, StepKind, StepRecord, + StreamEventRecord, } from './types' import { DurableRunDivergenceError, @@ -28,6 +30,8 @@ interface RunState { record: RunRecord steps: Map events: Map + /** Ordered, replayable event-stream log — `seq` is the array index. */ + streamEvents: StreamEventRecord[] } export class InMemoryDurableRunStore implements DurableRunStore { @@ -62,7 +66,7 @@ export class InMemoryDurableRunStore implements DurableRunStore { leaseExpiresAt, stepCount: 0, } - state = { record, steps: new Map(), events: new Map() } + state = { record, steps: new Map(), events: new Map(), streamEvents: [] } this.runs.set(input.runId, state) return { run: { ...record }, completedSteps: [], leaseExpiresAt } } @@ -256,6 +260,41 @@ export class InMemoryDurableRunStore implements DurableRunStore { return rec ? { ...rec } : undefined } + async appendStreamEvent(input: { + runId: string + eventId: string + payload: unknown + }): ReturnType { + const state = this.requireRun(input.runId) + const existing = state.streamEvents.find((e) => e.eventId === input.eventId) + if (existing) return { accepted: false, record: { ...existing } } + const rec: StreamEventRecord = { + runId: input.runId, + seq: state.streamEvents.length, + eventId: input.eventId, + payload: input.payload, + appendedAt: new Date(this.now()).toISOString(), + } + state.streamEvents.push(rec) + return { accepted: true, record: { ...rec } } + } + + async readStreamEvents( + runId: string, + afterSeq?: number, + ): Promise> { + const state = this.runs.get(runId) + if (!state) return [] + const cutoff = afterSeq ?? -1 + return state.streamEvents.filter((e) => e.seq > cutoff).map((e) => ({ ...e })) + } + + async setRunHandle(input: { runId: string; handle: RunHandle }): Promise { + const state = this.requireRun(input.runId) + state.record.handle = { ...input.handle } + state.record.updatedAt = new Date(this.now()).toISOString() + } + async close(): Promise { this.runs.clear() } diff --git a/src/durable/index.ts b/src/durable/index.ts index 46257b7..a89e7aa 100644 --- a/src/durable/index.ts +++ b/src/durable/index.ts @@ -25,19 +25,6 @@ export { D1DurableRunStore } from './d1-store' export { FileSystemDurableRunStore } from './file-system-store' export { canonicalHash, canonicalJson, deriveWorkerId, manifestHash, stepId } from './identity' export { InMemoryDurableRunStore } from './in-memory-store' -// ── Cross-worker sandbox-reconnect durability ───────────────────────── -// Checkpoints a substrate run handle at turn start so a fresh worker can -// re-attach to an in-flight sandbox run instead of re-running a long turn. -export type { - ReconnectableProduce, - ReconnectableTurnHandle, - ReconnectableTurnMode, - ReconnectableTurnProducer, - ReconnectProduce, - RunHandle, - RunReconnectableTurnOptions, -} from './run-handle' -export { runReconnectableTurn } from './run-handle' export type { DurableContext, RunDurableInput, RunDurableResult } from './runner' export { runDurable } from './runner' // Canonical D1 schema string + current version. Consumers wire via @@ -45,6 +32,26 @@ export { runDurable } from './runner' // during one-time bootstrap. `src/durable/schema.sql` is the source of // truth; `schema.ts` is the build-bundled string that ships in dist/. export { DURABLE_SCHEMA_SQL, DURABLE_SCHEMA_VERSION } from './schema' +// ── Durable-run supervisor — cross-worker / cross-DO durability ─────── +// Relocates the durability boundary off the ephemeral worker isolate: the +// supervisor drains a run's event stream into the substrate's own log, and +// a fresh supervisor re-attaches from the persisted cursor instead of +// re-prompting. SessionSupervisorDO hosts it on a Cloudflare Durable Object. +export type { + DurableObjectStateLike, + DurableObjectStorageLike, + SessionSupervisorDO, + SupervisorHostConfig, +} from './session-supervisor-do' +export { createSessionSupervisorDO } from './session-supervisor-do' +export type { + RunSupervisorOptions, + SandboxReconnectAdapter, + SupervisedEvent, + SupervisedRunHandle, + SupervisedRunMode, +} from './supervisor' +export { runSupervisedTurn } from './supervisor' // ── Durable turn primitive ──────────────────────────────────────────── // Streaming, backend-agnostic, checkpoint+replay durable turn. The single // reusable primitive every product chat handler routes through. @@ -58,6 +65,7 @@ export type { DurableRunManifest, DurableRunStore, EventRecord, + RunHandle, RunOutcome, RunRecord, RunStatus, @@ -65,6 +73,7 @@ export type { StepKind, StepRecord, StepStatus, + StreamEventRecord, } from './types' export { DurableAwaitEventTimeoutError, diff --git a/src/durable/run-handle.ts b/src/durable/run-handle.ts deleted file mode 100644 index a6347d4..0000000 --- a/src/durable/run-handle.ts +++ /dev/null @@ -1,326 +0,0 @@ -/** - * `runReconnectableTurn` — cross-worker sandbox-reconnect durability. - * - * `runDurableTurn` survives a worker crash *after* a turn completes: the - * cached final text replays. It cannot survive a crash *during* a 15-minute - * agentic turn — the producer's generator died with the isolate, so a fresh - * worker re-runs the whole turn. - * - * The fix exploits a property of sandbox-backed turns: the work runs inside - * an orchestrator-managed sandbox container that OUTLIVES the worker isolate. - * The `@tangle-network/sandbox` SDK's `streamPrompt` already auto-reconnects - * to the runtime event-replay endpoint when the SSE stream drops within a - * live worker. The gap is cross-worker: when the isolate itself dies, the - * generator is gone and a fresh worker has no pointer to the in-flight run. - * - * `runReconnectableTurn` closes that gap by checkpointing a **run handle** — - * `{ sandboxId, sessionId, runId }` — the instant the turn starts, before any - * events stream. On a retry that finds a `running` handle, a fresh worker - * calls the product-supplied `reconnect(handle)` callback (which wires the - * SDK's replay endpoint) instead of `produce`. The sandbox runtime IS the - * durable engine; this primitive only remembers the pointer. - * - * Substrate split: - * - sandbox products supply `produce` + `reconnect`. `produce` registers a - * handle via `register(handle)` once `streamPrompt` yields the run id; - * `reconnect` re-attaches to that run from a fresh process. - * - tcloud products supply `produce` only and omit `reconnect`. With no - * reconnect path a `running` handle on retry is treated as a stale lease - * and the turn re-runs — identical to `runDurableTurn` semantics. - * - * Storage: the handle is checkpointed as a *completed* step at index 0 (the - * handle step). The actual turn runs at step index 1. This reuses the - * existing `completeStep` JSON-result path with zero schema change — a - * completed step is the only step shape `startOrResume` returns to a retry, - * and it must be readable while the turn step is still `running`. Adding a - * `durable_steps` column would force a migration across all three stores and - * a new store method; the handle-step approach needs neither. - */ - -import { canonicalHash } from './identity' -import type { DurableTurnProducer } from './turn' -import type { DurableRunManifest, DurableRunStore, RunRecord, StepRecord } from './types' - -/** - * A pointer to a substrate run that outlives the worker isolate. Persisted at - * turn start so a fresh worker can re-attach instead of re-running. - */ -export interface RunHandle { - /** Which substrate owns the run. `sandbox` runs are reconnectable; `tcloud` - * runs are not (no cross-process replay endpoint). */ - kind: 'sandbox' | 'tcloud' - /** Orchestrator-managed sandbox id. Stable across worker isolates. Present - * for `kind: 'sandbox'`. */ - sandboxId?: string - /** Sandbox conversation/session id — the `sessionId` passed to - * `streamPrompt`. Lets the reconnect target the right thread. */ - sessionId?: string - /** The in-flight run id — the sandbox SDK's `executionId`, captured from - * the first `execution.started` SSE frame. The replay endpoint keys on it: - * `GET {runtimeUrl}/agents/run/{runId}/events?lastEventId=`. */ - runId?: string - /** Lifecycle of the substrate run as last observed by this primitive. */ - status: 'running' | 'completed' | 'failed' - /** Last SSE event id yielded — the replay cursor. A reconnect resumes from - * here so already-delivered events are not re-emitted. Undefined until the - * first frame with an `id:` arrives. */ - cursor?: string -} - -/** A producer for a reconnectable turn. Extends the plain turn producer with a - * `register` hook the producer calls once the substrate run id is known. */ -export interface ReconnectableTurnProducer extends DurableTurnProducer { - /** Set by `runReconnectableTurn` before `produce()` is called. The producer - * invokes it as soon as `streamPrompt` yields `execution.started` so the - * handle is checkpointed mid-turn; it may call it again with an updated - * `cursor` as events stream. Each call overwrites the prior handle. */ - register?: (handle: RunHandle) => void -} - -/** What a product supplies to build the live producer for a reconnectable - * turn. `register` is injected by `runReconnectableTurn`. */ -export type ReconnectableProduce = (ctx: { - /** Checkpoint the run handle. Call once `streamPrompt` yields the run id; - * call again to advance `cursor`. */ - register: (handle: RunHandle) => void -}) => DurableTurnProducer - -/** What a product supplies to re-attach to an in-flight substrate run from a - * fresh worker. Receives the handle checkpointed by the original turn. The - * returned producer's `stream` is the replay stream; `finalText()` is the - * reconnected turn's final text once the stream drains. */ -export type ReconnectProduce = (handle: RunHandle) => DurableTurnProducer - -export interface RunReconnectableTurnOptions { - store: DurableRunStore - /** Stable per-turn run id. The same id on a retry is what enables both - * replay (completed turn) and reconnect (in-flight turn). */ - runId: string - manifest: DurableRunManifest - /** Stable per-isolate worker id. */ - workerId: string - /** Lease window in ms. Default 900_000 (15 min) — a reconnectable turn is - * exactly the long-running case, so the lease must outlast the turn. */ - leaseMs?: number - /** Human-readable step label. Default `turn`. */ - intent?: string - /** Builds the live producer for a fresh turn. Called once, on a fresh run. */ - produce: ReconnectableProduce - /** Re-attaches to an in-flight substrate run. Called on a retry that finds - * a `running` handle. Omit for substrates with no cross-process replay - * (tcloud) — a `running` handle then falls through to a re-run. */ - reconnect?: ReconnectProduce - /** Synthesizes the single replay event from cached final text — used when a - * retry finds a *completed* turn (worker died after the turn finished). */ - replayEvent: (finalText: string) => TEvent - /** Optional live accumulator — see `RunDurableTurnOptions.accumulate`. */ - accumulate?: (event: TEvent, current: string) => string | undefined -} - -/** How the turn resolved — which of the three paths ran. */ -export type ReconnectableTurnMode = 'fresh' | 'reconnected' | 'replayed' | 'rerun' - -export interface ReconnectableTurnHandle { - /** Drop-in stream. Fresh/rerun forward producer events; reconnected forwards - * the replay stream; replayed emits one `replayEvent(cachedText)`. */ - stream: AsyncGenerator - /** Final text. Valid after `stream` drains. */ - finalText(): string - /** Which path ran. Valid after `stream` drains. */ - mode(): ReconnectableTurnMode - /** The run handle as last checkpointed. `undefined` if the producer never - * registered one (e.g. a tcloud turn). Valid after `stream` drains. */ - handle(): RunHandle | undefined - /** The durable `RunRecord` for this turn. Valid after `stream` drains. */ - record(): RunRecord | undefined -} - -/** Step 0 holds the run handle; step 1 holds the turn's final text. */ -const HANDLE_STEP = 0 -const TURN_STEP = 1 - -/** A handle in `running` state with no reconnect callback cannot be resumed — - * the turn must re-run. A `completed`/`failed` handle is informational only. */ -function isReconnectable(handle: RunHandle | undefined): handle is RunHandle { - return handle?.status === 'running' -} - -function readHandleStep(steps: ReadonlyArray): RunHandle | undefined { - const step = steps.find((s) => s.stepIndex === HANDLE_STEP) - if (!step || step.status !== 'completed') return undefined - const result = step.result as { handle?: RunHandle } | undefined - return result?.handle -} - -function readTurnStep(steps: ReadonlyArray): { finalText: string } | undefined { - const step = steps.find((s) => s.stepIndex === TURN_STEP) - if (!step || step.status !== 'completed') return undefined - const result = step.result as { finalText?: string } | undefined - return { finalText: result?.finalText ?? '' } -} - -export function runReconnectableTurn( - options: RunReconnectableTurnOptions, -): ReconnectableTurnHandle { - const { store, runId, manifest, workerId } = options - const leaseMs = options.leaseMs ?? 900_000 - const intent = options.intent ?? 'turn' - const inputHash = canonicalHash(manifest.input) - - let accumulated = '' - let mode: ReconnectableTurnMode = 'fresh' - let currentHandle: RunHandle | undefined - let finalRecord: RunRecord | undefined - // Serializes handle-step writes. `register` may fire repeatedly as the - // substrate streams; each write chains onto the prior so they land in call - // order and never interleave with the terminal completed-handle write. - let handleWrites: Promise = Promise.resolve() - - /** Drain a producer's stream: forward events, accumulate text, checkpoint - * the turn step on clean drain, fail it on throw. Shared by the fresh, - * rerun, and reconnected paths — all three differ only in which producer - * they hand in and whether the turn step was already begun. */ - async function* drainAndCheckpoint( - producer: DurableTurnProducer, - ): AsyncGenerator { - try { - for await (const event of producer.stream) { - if (options.accumulate) { - const next = options.accumulate(event, accumulated) - if (typeof next === 'string') accumulated = next - } - yield event - } - const producerText = producer.finalText() - if (producerText) accumulated = producerText - // Drain in-flight `register` writes BEFORE any further store write: - // both touch the run record, and concurrent writers corrupt stores - // that commit via tmp-file+rename. - await handleWrites - await store.completeStep({ - runId, - stepIndex: TURN_STEP, - result: { finalText: accumulated }, - }) - // Flip the handle to `completed` LAST so a late retry sees `completed` - // — never a stale `running` it would wrongly reconnect to. - if (currentHandle && currentHandle.status === 'running') { - currentHandle = { ...currentHandle, status: 'completed' } - await store.completeStep({ - runId, - stepIndex: HANDLE_STEP, - result: { handle: currentHandle }, - }) - } - finalRecord = await store.endRun({ - runId, - workerId, - status: 'completed', - outcome: { notes: intent, metadata: { chars: accumulated.length, mode } }, - }) - } catch (err) { - // Drain in-flight `register` writes before failing so no handle write - // outlives the turn (it would race store teardown). The handle stays - // `running` — exactly what a reconnecting retry needs. - await handleWrites - await store.failStep({ - runId, - stepIndex: TURN_STEP, - error: { message: err instanceof Error ? err.message : String(err) }, - }) - finalRecord = await store.endRun({ runId, workerId, status: 'failed' }) - throw err - } - } - - async function* stream(): AsyncGenerator { - const { completedSteps } = await store.startOrResume({ - runId, - manifest, - workerId, - leaseMs, - }) - - // ── Replay path — the turn already finished ───────────────────────── - const turnStep = readTurnStep(completedSteps) - if (turnStep) { - mode = 'replayed' - accumulated = turnStep.finalText - currentHandle = readHandleStep(completedSteps) - yield options.replayEvent(accumulated) - finalRecord = await store.endRun({ runId, workerId, status: 'completed' }) - return - } - - // ── Reconnect path — an in-flight handle survived a worker crash ──── - const priorHandle = readHandleStep(completedSteps) - if (isReconnectable(priorHandle) && options.reconnect) { - mode = 'reconnected' - currentHandle = priorHandle - // The turn step may be `running`/`failed` from the dead worker. begin - // bumps its attempt count; the reconnected stream re-checkpoints it. - await store.beginStep({ - runId, - stepIndex: TURN_STEP, - intent, - kind: 'llm', - inputHash, - }) - yield* drainAndCheckpoint(options.reconnect(priorHandle)) - return - } - - // ── Fresh / rerun path — produce live ─────────────────────────────── - // A `running` handle with no reconnect callback (tcloud, or a sandbox - // product that did not wire reconnect) cannot resume — it re-runs. - mode = priorHandle ? 'rerun' : 'fresh' - - // The handle step is checkpointed up front as a placeholder so a retry - // that crashes before `register` still sees the run shape. `register` - // overwrites it with the real pointer once the substrate yields the id. - const placeholder: RunHandle = { kind: 'tcloud', status: 'running' } - await store.beginStep({ - runId, - stepIndex: HANDLE_STEP, - intent: `${intent}:handle`, - kind: 'logic', - inputHash, - }) - await store.completeStep({ - runId, - stepIndex: HANDLE_STEP, - result: { handle: placeholder }, - }) - currentHandle = placeholder - - const register = (handle: RunHandle): void => { - currentHandle = handle - // The handle step is already `completed`; each `register` overwrites its - // result. Writes chain through `handleWrites` so they persist in call - // order and `drainAndCheckpoint` can await them before the terminal - // completed-handle write. A crash before a write lands loses only the - // pointer refinement, not correctness — a non-reconnectable handle - // re-runs. A rejected write must not abort the turn, so it is absorbed. - handleWrites = handleWrites - .then(() => store.completeStep({ runId, stepIndex: HANDLE_STEP, result: { handle } })) - .catch(() => undefined) - } - - await store.beginStep({ - runId, - stepIndex: TURN_STEP, - intent, - kind: 'llm', - inputHash, - }) - yield* drainAndCheckpoint(options.produce({ register })) - } - - return { - stream: stream(), - finalText: () => accumulated, - mode: () => mode, - handle: () => currentHandle, - record: () => finalRecord, - } -} diff --git a/src/durable/schema.sql b/src/durable/schema.sql index df5b71c..1cc8aab 100644 --- a/src/durable/schema.sql +++ b/src/durable/schema.sql @@ -65,3 +65,32 @@ CREATE TABLE IF NOT EXISTS durable_events ( INSERT OR IGNORE INTO durable_schema_info (version, applied_at) VALUES (1, strftime('%Y-%m-%dT%H:%M:%fZ', 'now')); + +-- ── Migration v2 — durable event-stream log + run handle ─────────────── +-- Run once on a database created at v1. `ALTER TABLE` is not idempotent; the +-- version trail in `durable_schema_info` is how migrations are sequenced — +-- never by blind re-execution of this block. +-- +-- - `durable_stream_events` is the ordered, replayable per-run event log. +-- `seq` is the store-assigned monotonic cursor; the UNIQUE index on +-- (run_id, event_id) makes appends idempotent — a reconnecting adapter +-- that re-yields a boundary event cannot double-log it. +-- - `durable_runs.handle_json` is the pointer (sandbox + substrate run id + +-- cursor) a fresh supervisor re-attaches by. + +ALTER TABLE durable_runs ADD COLUMN handle_json TEXT; + +CREATE TABLE IF NOT EXISTS durable_stream_events ( + run_id TEXT NOT NULL, + seq INTEGER NOT NULL, + event_id TEXT NOT NULL, + payload_json TEXT, + appended_at TEXT NOT NULL, + PRIMARY KEY (run_id, seq) +); + +CREATE UNIQUE INDEX IF NOT EXISTS idx_durable_stream_events_event_id + ON durable_stream_events(run_id, event_id); + +INSERT OR IGNORE INTO durable_schema_info (version, applied_at) +VALUES (2, strftime('%Y-%m-%dT%H:%M:%fZ', 'now')); diff --git a/src/durable/schema.ts b/src/durable/schema.ts index dc2be5c..d4a43d7 100644 --- a/src/durable/schema.ts +++ b/src/durable/schema.ts @@ -15,7 +15,7 @@ * migration entry to durable_schema_info instead of mutating prior rows. */ -export const DURABLE_SCHEMA_VERSION = 1 +export const DURABLE_SCHEMA_VERSION = 2 export const DURABLE_SCHEMA_SQL = `-- Durable-run substrate — versioned schema for D1 / SQLite. -- @@ -84,4 +84,33 @@ CREATE TABLE IF NOT EXISTS durable_events ( INSERT OR IGNORE INTO durable_schema_info (version, applied_at) VALUES (1, strftime('%Y-%m-%dT%H:%M:%fZ', 'now')); + +-- ── Migration v2 — durable event-stream log + run handle ─────────────── +-- Run once on a database created at v1. \`ALTER TABLE\` is not idempotent; the +-- version trail in \`durable_schema_info\` is how migrations are sequenced — +-- never by blind re-execution of this block. +-- +-- - \`durable_stream_events\` is the ordered, replayable per-run event log. +-- \`seq\` is the store-assigned monotonic cursor; the UNIQUE index on +-- (run_id, event_id) makes appends idempotent — a reconnecting adapter +-- that re-yields a boundary event cannot double-log it. +-- - \`durable_runs.handle_json\` is the pointer (sandbox + substrate run id + +-- cursor) a fresh supervisor re-attaches by. + +ALTER TABLE durable_runs ADD COLUMN handle_json TEXT; + +CREATE TABLE IF NOT EXISTS durable_stream_events ( + run_id TEXT NOT NULL, + seq INTEGER NOT NULL, + event_id TEXT NOT NULL, + payload_json TEXT, + appended_at TEXT NOT NULL, + PRIMARY KEY (run_id, seq) +); + +CREATE UNIQUE INDEX IF NOT EXISTS idx_durable_stream_events_event_id + ON durable_stream_events(run_id, event_id); + +INSERT OR IGNORE INTO durable_schema_info (version, applied_at) +VALUES (2, strftime('%Y-%m-%dT%H:%M:%fZ', 'now')); ` diff --git a/src/durable/session-supervisor-do.ts b/src/durable/session-supervisor-do.ts new file mode 100644 index 0000000..e1f6a8a --- /dev/null +++ b/src/durable/session-supervisor-do.ts @@ -0,0 +1,178 @@ +/** + * `SessionSupervisorDO` — the Cloudflare Durable Object host for + * `runSupervisedTurn`. + * + * A stateless Worker isolate is the wrong place to own a 15-minute run: it + * dies on a deploy roll or CPU limit. A Durable Object is addressable by + * session id and survives across requests — it is the right home for the + * supervisor. This host is deliberately thin: all the durability logic lives + * in the platform-agnostic `runSupervisedTurn`; the DO only hosts it and + * uses an `alarm()` to re-attach a run the response stream abandoned. + * + * - `fetch` resolves the run, records it, arms the orphan-check alarm, and + * streams the supervised events back. If the client disconnects, the + * supervisor stops being pulled and its short lease lapses. + * - `alarm()` is the recovery mechanism: it finds a recorded-but-unfinished + * run and re-drives `runSupervisedTurn` headlessly to completion (events + * land in the durable log; a later `fetch` replays them). A run still held + * by a live `fetch` raises `DurableRunLeaseHeldError` — not orphaned, so + * the alarm just re-arms. + * + * Structural CF types (`DurableObjectStateLike`) are defined locally so + * agent-runtime keeps no dependency on `@cloudflare/workers-types` — the same + * discipline as `D1DatabaseLike` in `d1-store.ts`. + */ + +import type { RunSupervisorOptions } from './supervisor' +import { runSupervisedTurn } from './supervisor' +import { DurableRunLeaseHeldError } from './types' + +/** Minimal Durable Object storage surface this host uses. Compatible with + * Cloudflare's `DurableObjectStorage`. */ +export interface DurableObjectStorageLike { + get(key: string): Promise + put(key: string, value: T): Promise + delete(key: string): Promise + /** Schedule the next `alarm()` invocation at an epoch-ms time. */ + setAlarm(scheduledTime: number): Promise +} + +/** Minimal Durable Object state surface — the `state` ctor argument. */ +export interface DurableObjectStateLike { + storage: DurableObjectStorageLike +} + +/** + * Product-supplied wiring for the host. `resolveRun` / `resolveOrphan` build + * the supervisor inputs (store, adapter, manifest) — the host owns no + * product policy. + */ +export interface SupervisorHostConfig { + /** Build supervisor inputs for an incoming request. `undefined` → 404. */ + resolveRun( + request: Request, + env: TEnv, + state: DurableObjectStateLike, + ): Promise | undefined> + /** Rebuild supervisor inputs for an orphan re-attach, from the recorded + * runId. `undefined` → the run is untrackable; the host stops tracking it. */ + resolveOrphan( + runId: string, + env: TEnv, + state: DurableObjectStateLike, + ): Promise | undefined> + /** Serialize one event into a response-stream chunk (an SSE or NDJSON + * line — the product owns the framing). */ + encodeEvent(event: TEvent): string + /** Delay before the orphan-check alarm fires. Default 60_000. */ + orphanCheckMs?: number + /** Time source — tests pin this. */ + now?: () => number +} + +/** The host instance surface — what a Cloudflare DO runtime invokes. */ +export interface SessionSupervisorDO { + fetch(request: Request): Promise + alarm(): Promise +} + +/** DO-storage key under which the host records the in-flight run id, so the + * orphan-check `alarm()` can find a run a dropped response stream left behind. */ +export const ACTIVE_RUN_KEY = 'agent-runtime:active-run-id' + +/** Drain a supervised stream without a consumer — events land in the durable + * log via the supervisor; nothing forwards them. */ +async function drainHeadless(stream: AsyncGenerator): Promise { + let next = await stream.next() + while (!next.done) next = await stream.next() +} + +/** + * Build the `SessionSupervisorDO` class for a product. Export the result from + * the Worker entrypoint and bind it in `wrangler.toml`: + * + * export const SessionSupervisor = createSessionSupervisorDO(config) + * + * # wrangler.toml + * [[durable_objects.bindings]] + * name = "SESSION_SUPERVISOR" + * class_name = "SessionSupervisor" + * [[migrations]] + * tag = "v1" + * new_classes = ["SessionSupervisor"] + */ +export function createSessionSupervisorDO( + config: SupervisorHostConfig, +): new ( + state: DurableObjectStateLike, + env: TEnv, +) => SessionSupervisorDO { + const orphanCheckMs = config.orphanCheckMs ?? 60_000 + const now = config.now ?? (() => Date.now()) + + return class implements SessionSupervisorDO { + constructor( + private readonly state: DurableObjectStateLike, + private readonly env: TEnv, + ) {} + + async fetch(request: Request): Promise { + const opts = await config.resolveRun(request, this.env, this.state) + if (!opts) return new Response('no run for this request', { status: 404 }) + + // Record the run + arm the orphan check before streaming, so a crash + // mid-stream still leaves a recovery trail. + await this.state.storage.put(ACTIVE_RUN_KEY, opts.runId) + await this.state.storage.setAlarm(now() + orphanCheckMs) + + const supervised = runSupervisedTurn(opts) + const storage = this.state.storage + const encoder = new TextEncoder() + const body = new ReadableStream({ + async pull(controller) { + try { + const next = await supervised.stream.next() + if (next.done) { + await storage.delete(ACTIVE_RUN_KEY) + controller.close() + return + } + controller.enqueue(encoder.encode(config.encodeEvent(next.value))) + } catch (err) { + controller.error(err instanceof Error ? err : new Error(String(err))) + } + }, + }) + return new Response(body, { + headers: { 'content-type': 'text/event-stream', 'cache-control': 'no-cache' }, + }) + } + + async alarm(): Promise { + const runId = await this.state.storage.get(ACTIVE_RUN_KEY) + if (!runId) return + + const opts = await config.resolveOrphan(runId, this.env, this.state) + if (!opts) { + await this.state.storage.delete(ACTIVE_RUN_KEY) + return + } + + try { + // Re-drive the run headlessly. startOrResume inside resolves the + // path: replay (already done), resume (orphaned + in-flight), or a + // fresh run. Events land in the durable log regardless. + await drainHeadless(runSupervisedTurn(opts).stream) + await this.state.storage.delete(ACTIVE_RUN_KEY) + } catch (err) { + if (err instanceof DurableRunLeaseHeldError) { + // A live fetch still owns the run — not orphaned. Re-check later. + await this.state.storage.setAlarm(now() + orphanCheckMs) + return + } + // The run failed for real — stop tracking it. + await this.state.storage.delete(ACTIVE_RUN_KEY) + } + } + } +} diff --git a/src/durable/supervisor.ts b/src/durable/supervisor.ts new file mode 100644 index 0000000..ee28f45 --- /dev/null +++ b/src/durable/supervisor.ts @@ -0,0 +1,229 @@ +/** + * `runSupervisedTurn` — relocates the durability boundary off the ephemeral + * worker isolate. + * + * `runDurableTurn` replays a *completed* turn; an interrupted turn re-runs. + * `runSupervisedTurn` closes that gap for sandbox-backed runs: the sandbox + * container is orchestrator-managed and outlives the worker, so instead of + * re-prompting, a fresh supervisor re-attaches to the in-flight substrate run + * and resumes draining its event stream. + * + * Durability is owned by the substrate, not hoped-for from the sandbox. The + * supervisor drains every event into the store's stream log as it flows + * (`appendStreamEvent`), persists the reconnect pointer the instant the + * substrate yields it (`setRunHandle`), and heartbeats the lease. A fresh + * supervisor reads the log for its cursor and calls `adapter.attach` to + * resume strictly after it — the append's idempotency on `eventId` dedups + * the reconnect seam, so no event is lost and none is delivered twice. + * + * The platform-agnostic core is here; `SessionSupervisorDO` hosts it on a + * Cloudflare Durable Object. The reconnect glue is one typed contract — + * `SandboxReconnectAdapter` — implemented once per substrate, never per + * product. + */ + +import { canonicalHash } from './identity' +import type { DurableRunManifest, DurableRunStore, RunHandle, RunRecord } from './types' + +/** One event drained from a supervised run. */ +export interface SupervisedEvent { + /** Stable substrate id — the dedup key and the reconnect cursor. */ + eventId: string + payload: TEvent + /** + * The substrate run handle, carried on the first frame(s) once the run id + * is known. The supervisor persists it so a fresh supervisor can re-attach. + * Omit on later frames; the last non-undefined handle wins. + */ + handle?: RunHandle +} + +/** + * Product-supplied glue to a reconnectable substrate run. The dangerous + * reconnect logic — re-attaching to a live distributed run — lives behind + * this one typed contract: implement it once per substrate (the sandbox SDK, + * etc.), never per product. + * + * Conformance (asserted by `supervisor.test.ts`): + * - `start()` yields the run's events; at least one early event carries a + * `handle` with `status: 'running'` and a defined `runId`. + * - `attach(handle, afterEventId)` yields only events strictly after + * `afterEventId`, and terminates cleanly when the run has no more. + * - `eventId`s are unique within a run. + */ +export interface SandboxReconnectAdapter { + /** Begin a fresh substrate run. */ + start(): AsyncIterable> + /** + * Re-attach to an in-flight run, resuming strictly after `afterEventId` + * (`undefined` → from the first event). + */ + attach( + handle: RunHandle, + afterEventId: string | undefined, + ): AsyncIterable> +} + +/** How the supervised turn resolved. */ +export type SupervisedRunMode = 'fresh' | 'resumed' | 'replayed' + +export interface RunSupervisorOptions { + store: DurableRunStore + /** Stable per-turn run id — the same id on a retry is what enables both + * replay (completed turn) and resume (in-flight turn). */ + runId: string + manifest: DurableRunManifest + /** Stable per-isolate worker id. */ + workerId: string + adapter: SandboxReconnectAdapter + /** Lease window in ms. Default 60_000 — deliberately short: the heartbeat + * keeps an actively-draining supervisor's lease alive, so an abandoned + * supervisor's lease lapses fast and a fresh supervisor can take over. */ + leaseMs?: number + /** Renew the lease at most this often while draining. Default 30_000 — + * must be below `leaseMs` or an active drain loses its own lease. */ + heartbeatMs?: number + /** Human-readable step label. Default `turn`. */ + intent?: string + /** Time source override — tests pin this for deterministic heartbeats. */ + now?: () => number +} + +export interface SupervisedRunHandle { + /** Drop-in stream. Fresh forwards live events; resumed re-yields the logged + * prefix then forwards live events; replayed re-yields the full log. */ + stream: AsyncGenerator + /** Which path ran. Valid after `stream` drains. */ + mode(): SupervisedRunMode + /** The durable RunRecord for the turn. Valid after `stream` drains. */ + record(): RunRecord | undefined +} + +const TURN_STEP = 0 + +export function runSupervisedTurn( + options: RunSupervisorOptions, +): SupervisedRunHandle { + const { store, runId, manifest, workerId, adapter } = options + const leaseMs = options.leaseMs ?? 60_000 + const heartbeatMs = options.heartbeatMs ?? 30_000 + const intent = options.intent ?? 'turn' + const now = options.now ?? (() => Date.now()) + const inputHash = canonicalHash(manifest.input) + + let mode: SupervisedRunMode = 'fresh' + let finalRecord: RunRecord | undefined + let currentHandle: RunHandle | undefined + // Set when the heartbeat finds the lease taken by another supervisor. The + // catch block then skips failStep/endRun — this worker no longer owns the + // run, and must not write its terminal state. + let leaseLost = false + + async function* drain( + source: AsyncIterable>, + ): AsyncGenerator { + let lastRenew = now() + for await (const event of source) { + // Persist the reconnect pointer the instant the substrate yields it, + // before the event itself — a crash between the two must leave a + // resumable handle, never an event a fresh supervisor cannot place. + if (event.handle) { + currentHandle = event.handle + await store.setRunHandle({ runId, handle: event.handle }) + } + // Drain into the durable log. Idempotent on eventId: a boundary event + // re-yielded across the reconnect seam returns accepted=false and is + // not re-forwarded — no duplicate reaches the caller. + const { accepted } = await store.appendStreamEvent({ + runId, + eventId: event.eventId, + payload: event.payload, + }) + if (now() - lastRenew >= heartbeatMs) { + const renewed = await store.renewLease({ runId, workerId, leaseMs }) + if (!renewed.ok) { + leaseLost = true + throw new Error(`durable-runs: lease lost on ${runId} — another supervisor took over`) + } + lastRenew = now() + } + if (accepted) yield event.payload + } + } + + async function* stream(): AsyncGenerator { + const { run, completedSteps } = await store.startOrResume({ + runId, + manifest, + workerId, + leaseMs, + }) + + // ── Replayed — the turn already completed ─────────────────────────── + if (completedSteps.some((s) => s.stepIndex === TURN_STEP && s.status === 'completed')) { + mode = 'replayed' + for (const e of await store.readStreamEvents(runId)) yield e.payload as TEvent + finalRecord = await store.endRun({ runId, workerId, status: 'completed' }) + return + } + + // ── Decide fresh vs resume ────────────────────────────────────────── + const logged = await store.readStreamEvents(runId) + const priorHandle = run.handle + const resumable = + priorHandle !== undefined && + priorHandle.status === 'running' && + typeof priorHandle.runId === 'string' + + let source: AsyncIterable> + if (resumable && priorHandle) { + mode = 'resumed' + currentHandle = priorHandle + // Re-yield the logged prefix so the caller sees the whole stream, then + // attach strictly after the last logged event. + for (const e of logged) yield e.payload as TEvent + const cursor = logged.length > 0 ? logged[logged.length - 1]!.eventId : priorHandle.cursor + source = adapter.attach(priorHandle, cursor) + } else { + mode = 'fresh' + source = adapter.start() + } + + await store.beginStep({ runId, stepIndex: TURN_STEP, intent, kind: 'llm', inputHash }) + try { + yield* drain(source) + const eventCount = (await store.readStreamEvents(runId)).length + // completeStep first — it is the "turn finished" signal the replay path + // keys on. A crash after it leaves a cleanly replayable run; flipping + // the handle first and crashing before completeStep would instead make + // a finished run look fresh (handle not `running`, step not completed). + await store.completeStep({ runId, stepIndex: TURN_STEP, result: { eventCount } }) + if (currentHandle && currentHandle.status === 'running') { + await store.setRunHandle({ runId, handle: { ...currentHandle, status: 'completed' } }) + } + finalRecord = await store.endRun({ + runId, + workerId, + status: 'completed', + outcome: { notes: intent, metadata: { events: eventCount, mode } }, + }) + } catch (err) { + if (!leaseLost) { + // The handle stays `running` — exactly what a resuming retry needs. + await store.failStep({ + runId, + stepIndex: TURN_STEP, + error: { message: err instanceof Error ? err.message : String(err) }, + }) + finalRecord = await store.endRun({ runId, workerId, status: 'failed' }) + } + throw err + } + } + + return { + stream: stream(), + mode: () => mode, + record: () => finalRecord, + } +} diff --git a/src/durable/tests/run-handle.test.ts b/src/durable/tests/run-handle.test.ts deleted file mode 100644 index 4d1422a..0000000 --- a/src/durable/tests/run-handle.test.ts +++ /dev/null @@ -1,506 +0,0 @@ -/** - * `runReconnectableTurn` tests — the cross-worker sandbox-reconnect path. - * - * Run identically against InMemory / FileSystem / D1-over-sqlite so the - * handle registry is proven on every store a product could deploy on. - * - * Each test names the regression it defends: - * - "fresh turn registers a handle" — handle is checkpointed at step 0 - * - "retry with running handle reconnects" — fresh worker calls reconnect, - * NOT produce; the 15-min turn is - * not re-run from the top - * - "completed handle replays" — a finished turn replays cached - * text, neither produce nor - * reconnect runs - * - "running handle, no reconnect = rerun" — tcloud substrate (no replay - * endpoint) falls through to a - * clean re-run - * - "reconnect failure fails the run" — a replay-endpoint error is not - * swallowed; the turn step fails - * - "register advances the cursor" — the last-observed SSE cursor is - * persisted for the replay GET - */ - -import { mkdtempSync, readFileSync, rmSync } from 'node:fs' -import { tmpdir } from 'node:os' -import { join } from 'node:path' - -import { afterEach, beforeEach, describe, expect, it } from 'vitest' - -import { - D1DurableRunStore, - type DurableRunManifest, - type DurableRunStore, - FileSystemDurableRunStore, - InMemoryDurableRunStore, - type ReconnectableProduce, - type ReconnectProduce, - type RunHandle, - runReconnectableTurn, -} from '../index' -import { createSqliteD1 } from './sqlite-d1-adapter' - -const SCHEMA_SQL = readFileSync(new URL('../schema.sql', import.meta.url), 'utf8') - -interface FakeEvent { - type: string - text?: string - runId?: string -} - -function makeManifest(turnIndex = 0): DurableRunManifest { - return { - projectId: 'test-product', - scenarioId: 'thread-1', - task: { - id: `chat:thread-1:${turnIndex}`, - intent: 'unit-test reconnectable turn', - domain: 'test', - requiredKnowledge: [], - metadata: { turnIndex }, - }, - input: { userMessage: `q-${turnIndex}` }, - } -} - -/** - * A fake sandbox producer. Emits an `execution.started` event carrying a run - * id (the producer registers the handle on it, mirroring how a real product - * watches `streamPrompt`), then text deltas, then a result. `throwAt` makes - * it explode after that many *yielded* events to simulate a mid-turn crash. - */ -function sandboxProducer(opts: { - sandboxId: string - sessionId: string - runId: string - chunks: string[] - register: (handle: RunHandle) => void - onConstruct?: () => void - throwAt?: number -}) { - opts.onConstruct?.() - let assembled = '' - async function* stream(): AsyncGenerator { - let emitted = 0 - // streamPrompt yields execution.started first — the product registers the - // run handle here, mid-turn, before any expensive work. - opts.register({ - kind: 'sandbox', - sandboxId: opts.sandboxId, - sessionId: opts.sessionId, - runId: opts.runId, - status: 'running', - }) - yield { type: 'execution.started', runId: opts.runId } - emitted += 1 - for (let i = 0; i < opts.chunks.length; i++) { - if (opts.throwAt !== undefined && emitted >= opts.throwAt) { - throw new Error('worker isolate died mid-turn') - } - assembled += opts.chunks[i] - // advance the replay cursor as frames arrive - opts.register({ - kind: 'sandbox', - sandboxId: opts.sandboxId, - sessionId: opts.sessionId, - runId: opts.runId, - status: 'running', - cursor: `evt-${i}`, - }) - yield { type: 'delta', text: opts.chunks[i] } - emitted += 1 - } - yield { type: 'result', text: assembled } - } - return { stream: stream(), finalText: () => assembled } -} - -const storeKinds = [ - { - name: 'InMemoryDurableRunStore', - factory: () => ({ store: new InMemoryDurableRunStore(), cleanup: () => undefined }), - }, - { - name: 'FileSystemDurableRunStore', - factory: () => { - const dir = mkdtempSync(join(tmpdir(), 'run-handle-test-')) - return { - store: new FileSystemDurableRunStore(dir), - cleanup: () => rmSync(dir, { recursive: true, force: true }), - } - }, - }, - { - name: 'D1DurableRunStore (better-sqlite3)', - factory: () => { - const handle = createSqliteD1() - handle.raw.exec(SCHEMA_SQL) - return { - store: new D1DurableRunStore(handle.db), - cleanup: () => handle.close(), - } - }, - }, -] as const - -for (const kind of storeKinds) { - describe(`runReconnectableTurn / ${kind.name}`, () => { - let store: DurableRunStore - let cleanup: () => void - - beforeEach(() => { - const made = kind.factory() - store = made.store - cleanup = made.cleanup - }) - - afterEach(async () => { - await store.close() - cleanup() - }) - - it('a fresh turn registers a run handle and checkpoints final text', async () => { - const handle = runReconnectableTurn({ - store, - runId: 'chat:thread-1:0', - manifest: makeManifest(0), - workerId: 'worker-a', - produce: ({ register }) => - sandboxProducer({ - sandboxId: 'sbx-1', - sessionId: 'sess-1', - runId: 'run-1', - chunks: ['Hello', ', ', 'world'], - register, - }), - replayEvent: (text) => ({ type: 'result', text }), - }) - - const events: FakeEvent[] = [] - for await (const e of handle.stream) events.push(e) - - expect(handle.mode()).toBe('fresh') - expect(handle.finalText()).toBe('Hello, world') - expect(handle.record()?.status).toBe('completed') - // handle was registered, and the final state reflects turn completion - const h = handle.handle() - expect(h).toBeDefined() - expect(h?.kind).toBe('sandbox') - expect(h?.sandboxId).toBe('sbx-1') - expect(h?.sessionId).toBe('sess-1') - expect(h?.runId).toBe('run-1') - expect(h?.status).toBe('completed') - // the handle is durably persisted at step 0 - const persisted = await store.loadStep('chat:thread-1:0', 0) - expect(persisted?.status).toBe('completed') - expect((persisted?.result as { handle: RunHandle }).handle.sandboxId).toBe('sbx-1') - }) - - it('a retry with a running handle reconnects instead of re-running produce', async () => { - const runId = 'chat:thread-1:1' - const manifest = makeManifest(1) - - // ── Attempt 1: worker-a starts the turn, crashes mid-stream ─────── - let firstProduces = 0 - const first = runReconnectableTurn({ - store, - runId, - manifest, - workerId: 'worker-a', - produce: ({ register }) => { - firstProduces += 1 - return sandboxProducer({ - sandboxId: 'sbx-2', - sessionId: 'sess-2', - runId: 'run-2', - chunks: ['long', '-turn', '-work'], - register, - throwAt: 2, // dies after execution.started + 1 delta - }) - }, - replayEvent: (text) => ({ type: 'result', text }), - }) - await expect( - (async () => { - for await (const _ of first.stream) { - /* drain until the crash */ - } - })(), - ).rejects.toThrow('worker isolate died mid-turn') - expect(firstProduces).toBe(1) - // the handle survived the crash in `running` state - const survived = await store.loadStep(runId, 0) - expect((survived?.result as { handle: RunHandle }).handle.status).toBe('running') - - // ── Attempt 2: worker-b retries — must reconnect, NOT produce ───── - let secondProduces = 0 - let reconnectedWith: RunHandle | undefined - const reconnect: ReconnectProduce = (h) => { - reconnectedWith = h - // the replay endpoint streams the rest of the turn from the cursor - const assembled = '[replayed] full answer' - async function* stream(): AsyncGenerator { - yield { type: 'delta', text: '[replayed] full answer' } - yield { type: 'result', text: assembled } - } - return { stream: stream(), finalText: () => assembled } - } - const second = runReconnectableTurn({ - store, - runId, - manifest, - workerId: 'worker-b', - produce: ({ register }) => { - secondProduces += 1 - return sandboxProducer({ - sandboxId: 'sbx-2', - sessionId: 'sess-2', - runId: 'run-2', - chunks: ['should', '-not', '-run'], - register, - }) - }, - reconnect, - replayEvent: (text) => ({ type: 'result', text }), - }) - const events: FakeEvent[] = [] - for await (const e of second.stream) events.push(e) - - // produce was NOT called again — the 15-min turn was not restarted - expect(secondProduces).toBe(0) - expect(second.mode()).toBe('reconnected') - // reconnect received the handle the crashed worker checkpointed - expect(reconnectedWith?.sandboxId).toBe('sbx-2') - expect(reconnectedWith?.runId).toBe('run-2') - expect(reconnectedWith?.cursor).toBe('evt-0') // last cursor before crash - expect(second.finalText()).toBe('[replayed] full answer') - expect(second.record()?.status).toBe('completed') - expect(events).toEqual([ - { type: 'delta', text: '[replayed] full answer' }, - { type: 'result', text: '[replayed] full answer' }, - ]) - }) - - it('a completed handle replays cached text — neither produce nor reconnect runs', async () => { - const runId = 'chat:thread-1:2' - const manifest = makeManifest(2) - - // Attempt 1 completes cleanly. - const first = runReconnectableTurn({ - store, - runId, - manifest, - workerId: 'worker-a', - produce: ({ register }) => - sandboxProducer({ - sandboxId: 'sbx-3', - sessionId: 'sess-3', - runId: 'run-3', - chunks: ['cached ', 'result'], - register, - }), - replayEvent: (text) => ({ type: 'result', text }), - }) - for await (const _ of first.stream) { - /* drain */ - } - expect(first.mode()).toBe('fresh') - - // Attempt 2 — same runId. Worker died after the turn finished but before - // the response reached the client. Must replay, run nothing. - let produces = 0 - let reconnects = 0 - const second = runReconnectableTurn({ - store, - runId, - manifest, - workerId: 'worker-b', - produce: ({ register }) => { - produces += 1 - return sandboxProducer({ - sandboxId: 'sbx-3', - sessionId: 'sess-3', - runId: 'run-3', - chunks: ['x'], - register, - }) - }, - reconnect: () => { - reconnects += 1 - async function* stream(): AsyncGenerator { - yield { type: 'result', text: 'wrong' } - } - return { stream: stream(), finalText: () => 'wrong' } - }, - replayEvent: (text) => ({ type: 'result', text }), - }) - const events: FakeEvent[] = [] - for await (const e of second.stream) events.push(e) - - expect(produces).toBe(0) - expect(reconnects).toBe(0) - expect(second.mode()).toBe('replayed') - expect(second.finalText()).toBe('cached result') - expect(events).toEqual([{ type: 'result', text: 'cached result' }]) - }) - - it('a running handle with no reconnect callback falls through to a re-run', async () => { - const runId = 'chat:thread-1:3' - const manifest = makeManifest(3) - - // Attempt 1 — a tcloud-style turn (no reconnect) crashes mid-stream. - const tcloudProduce: ReconnectableProduce = ({ register }) => { - register({ kind: 'tcloud', status: 'running' }) - let assembled = '' - async function* stream(): AsyncGenerator { - assembled += 'partial' - yield { type: 'delta', text: 'partial' } - throw new Error('tcloud turn crashed') - } - return { stream: stream(), finalText: () => assembled } - } - const first = runReconnectableTurn({ - store, - runId, - manifest, - workerId: 'worker-a', - produce: tcloudProduce, - replayEvent: (text) => ({ type: 'result', text }), - // no reconnect — tcloud has no cross-process replay endpoint - }) - await expect( - (async () => { - for await (const _ of first.stream) { - /* drain */ - } - })(), - ).rejects.toThrow('tcloud turn crashed') - - // Attempt 2 — running handle exists but reconnect is absent: re-run. - let produces = 0 - const second = runReconnectableTurn({ - store, - runId, - manifest, - workerId: 'worker-a', - produce: ({ register }) => { - produces += 1 - register({ kind: 'tcloud', status: 'running' }) - let assembled = '' - async function* stream(): AsyncGenerator { - assembled += 'recovered' - yield { type: 'delta', text: 'recovered' } - yield { type: 'result', text: assembled } - } - return { stream: stream(), finalText: () => assembled } - }, - replayEvent: (text) => ({ type: 'result', text }), - }) - const events: FakeEvent[] = [] - for await (const e of second.stream) events.push(e) - - expect(produces).toBe(1) // re-ran from the top - expect(second.mode()).toBe('rerun') - expect(second.finalText()).toBe('recovered') - expect(events).toEqual([ - { type: 'delta', text: 'recovered' }, - { type: 'result', text: 'recovered' }, - ]) - }) - - it('a reconnect-stream failure fails the run — the error is not swallowed', async () => { - const runId = 'chat:thread-1:4' - const manifest = makeManifest(4) - - // Attempt 1 crashes mid-turn, leaving a running handle. - const first = runReconnectableTurn({ - store, - runId, - manifest, - workerId: 'worker-a', - produce: ({ register }) => - sandboxProducer({ - sandboxId: 'sbx-5', - sessionId: 'sess-5', - runId: 'run-5', - chunks: ['a', 'b'], - register, - throwAt: 2, - }), - replayEvent: (text) => ({ type: 'result', text }), - }) - await expect( - (async () => { - for await (const _ of first.stream) { - /* drain */ - } - })(), - ).rejects.toThrow('worker isolate died mid-turn') - - // Attempt 2 reconnects, but the replay endpoint itself errors. - const second = runReconnectableTurn({ - store, - runId, - manifest, - workerId: 'worker-b', - produce: ({ register }) => - sandboxProducer({ - sandboxId: 'sbx-5', - sessionId: 'sess-5', - runId: 'run-5', - chunks: ['x'], - register, - }), - reconnect: () => { - async function* stream(): AsyncGenerator { - yield { type: 'delta', text: 'partial-replay' } - throw new Error('replay endpoint returned 410 Gone') - } - return { stream: stream(), finalText: () => '' } - }, - replayEvent: (text) => ({ type: 'result', text }), - }) - await expect( - (async () => { - for await (const _ of second.stream) { - /* drain */ - } - })(), - ).rejects.toThrow('replay endpoint returned 410 Gone') - expect(second.mode()).toBe('reconnected') - expect(second.record()?.status).toBe('failed') - // the turn step is marked failed so a further retry re-runs cleanly - const turnStep = await store.loadStep(runId, 1) - expect(turnStep?.status).toBe('failed') - }) - - it('register advances the persisted cursor as frames stream', async () => { - const runId = 'chat:thread-1:5' - const handle = runReconnectableTurn({ - store, - runId, - manifest: makeManifest(5), - workerId: 'worker-a', - produce: ({ register }) => - sandboxProducer({ - sandboxId: 'sbx-6', - sessionId: 'sess-6', - runId: 'run-6', - chunks: ['one', 'two', 'three'], - register, - }), - replayEvent: (text) => ({ type: 'result', text }), - }) - for await (const _ of handle.stream) { - /* drain */ - } - // three deltas streamed → last cursor observed was evt-2; after - // completion the persisted handle is flipped to `completed`. - const persisted = await store.loadStep(runId, 0) - const h = (persisted?.result as { handle: RunHandle }).handle - expect(h.status).toBe('completed') - expect(h.cursor).toBe('evt-2') - expect(h.runId).toBe('run-6') - }) - }) -} diff --git a/src/durable/tests/session-supervisor-do.test.ts b/src/durable/tests/session-supervisor-do.test.ts new file mode 100644 index 0000000..5486b1a --- /dev/null +++ b/src/durable/tests/session-supervisor-do.test.ts @@ -0,0 +1,161 @@ +import { describe, expect, it } from 'vitest' +import { InMemoryDurableRunStore } from '../in-memory-store' +import { + ACTIVE_RUN_KEY, + createSessionSupervisorDO, + type DurableObjectStateLike, + type DurableObjectStorageLike, + type SupervisorHostConfig, +} from '../session-supervisor-do' +import { + runSupervisedTurn, + type SandboxReconnectAdapter, + type SupervisedEvent, +} from '../supervisor' +import type { DurableRunManifest, RunHandle } from '../types' + +const manifest = { + projectId: 'test', + scenarioId: 'persona-1', + task: { id: 'task-1', intent: 'chat', domain: 'test' }, + input: { q: 'hello' }, +} as unknown as DurableRunManifest + +class FakeStorage implements DurableObjectStorageLike { + map = new Map() + alarmAt: number | null = null + async get(key: string): Promise { + return this.map.get(key) as T | undefined + } + async put(key: string, value: T): Promise { + this.map.set(key, value) + } + async delete(key: string): Promise { + return this.map.delete(key) + } + async setAlarm(scheduledTime: number): Promise { + this.alarmAt = scheduledTime + } +} + +class FakeState implements DurableObjectStateLike { + storage = new FakeStorage() +} + +/** A fixed-length scripted adapter — `start` carries the handle on frame 0, + * `attach` resumes strictly past the cursor. */ +function scriptedAdapter(n: number): SandboxReconnectAdapter { + const ids = Array.from({ length: n }, (_, i) => i) + return { + async *start(): AsyncGenerator> { + for (const i of ids) { + const handle: RunHandle | undefined = + i === 0 ? { kind: 'sandbox', runId: 's1', status: 'running' } : undefined + yield { eventId: `e${i}`, payload: i, handle } + } + }, + async *attach( + _handle: RunHandle, + afterEventId: string | undefined, + ): AsyncGenerator> { + const found = afterEventId ? ids.findIndex((i) => `e${i}` === afterEventId) : -1 + for (let i = found + 1; i < ids.length; i++) yield { eventId: `e${i}`, payload: i } + }, + } +} + +function config( + store: InMemoryDurableRunStore, +): SupervisorHostConfig> { + return { + async resolveRun() { + return { store, runId: 'do-run', manifest, workerId: 'w-fetch', adapter: scriptedAdapter(4) } + }, + async resolveOrphan(runId) { + return { store, runId, manifest, workerId: 'w-alarm', adapter: scriptedAdapter(4) } + }, + encodeEvent: (event) => `data: ${event}\n`, + now: () => 1000, + } +} + +async function consume( + stream: AsyncGenerator, + limit = Number.POSITIVE_INFINITY, +): Promise { + const out: T[] = [] + for await (const v of stream) { + out.push(v) + if (out.length >= limit) break + } + return out +} + +describe('SessionSupervisorDO — fetch', () => { + it('streams the supervised events, arms the alarm, and clears the run on completion', async () => { + const store = new InMemoryDurableRunStore() + const DO = createSessionSupervisorDO(config(store)) + const state = new FakeState() + const instance = new DO(state, {}) + + const res = await instance.fetch(new Request('http://do/')) + const text = await res.text() + + expect(res.headers.get('content-type')).toBe('text/event-stream') + expect(text).toBe('data: 0\ndata: 1\ndata: 2\ndata: 3\n') + expect(state.storage.alarmAt).toBe(1000 + 60_000) + // The run finished — the orphan-tracking key is cleared. + expect(state.storage.map.has(ACTIVE_RUN_KEY)).toBe(false) + expect((await store.readStreamEvents('do-run')).map((e) => e.payload)).toEqual([0, 1, 2, 3]) + }) + + it('404s when no run resolves for the request', async () => { + const store = new InMemoryDurableRunStore() + const DO = createSessionSupervisorDO({ + ...config(store), + async resolveRun() { + return undefined + }, + }) + const res = await new DO(new FakeState(), {}).fetch(new Request('http://do/')) + expect(res.status).toBe(404) + }) +}) + +describe('SessionSupervisorDO — alarm re-attaches an orphan', () => { + it('drives a run abandoned by a dropped response stream to completion', async () => { + const store = new InMemoryDurableRunStore() + + // A fetch drained 2 of 4 events, then its worker died — simulate by + // partially consuming a supervised turn and lapsing the lease. + await consume( + runSupervisedTurn({ + store, + runId: 'do-run', + manifest, + workerId: 'w-dead', + adapter: scriptedAdapter(4), + }).stream, + 2, + ) + store._expireLease('do-run') + expect(await store.readStreamEvents('do-run')).toHaveLength(2) + + // The DO still has the run recorded; the orphan-check alarm fires. + const DO = createSessionSupervisorDO(config(store)) + const state = new FakeState() + await state.storage.put(ACTIVE_RUN_KEY, 'do-run') + await new DO(state, {}).alarm() + + // The run was re-driven headlessly to completion and cleared. + expect((await store.readStreamEvents('do-run')).map((e) => e.payload)).toEqual([0, 1, 2, 3]) + expect(store._inspect('do-run')?.status).toBe('completed') + expect(state.storage.map.has(ACTIVE_RUN_KEY)).toBe(false) + }) + + it('is a no-op when no run is recorded', async () => { + const store = new InMemoryDurableRunStore() + const DO = createSessionSupervisorDO(config(store)) + await expect(new DO(new FakeState(), {}).alarm()).resolves.toBeUndefined() + }) +}) diff --git a/src/durable/tests/supervisor.test.ts b/src/durable/tests/supervisor.test.ts new file mode 100644 index 0000000..8efd013 --- /dev/null +++ b/src/durable/tests/supervisor.test.ts @@ -0,0 +1,247 @@ +import { describe, expect, it, vi } from 'vitest' +import { InMemoryDurableRunStore } from '../in-memory-store' +import { + type RunSupervisorOptions, + runSupervisedTurn, + type SandboxReconnectAdapter, + type SupervisedEvent, +} from '../supervisor' +import type { DurableRunManifest, RunHandle } from '../types' + +const manifest = { + projectId: 'test', + scenarioId: 'persona-1', + task: { id: 'task-1', intent: 'chat', domain: 'test' }, + input: { q: 'hello' }, +} as unknown as DurableRunManifest + +const HANDLE: RunHandle = { + kind: 'sandbox', + sandboxId: 'sbx-1', + runId: 'sbx-run-1', + status: 'running', +} + +interface ScriptEvent { + eventId: string + payload: number +} + +/** A controllable SandboxReconnectAdapter over a fixed script of events. + * `start` carries the handle on the first frame; `attach` resumes past the + * cursor (or re-yields the boundary when `reYieldBoundary`). */ +class ScriptedAdapter implements SandboxReconnectAdapter { + startCalls = 0 + attachCalls = 0 + constructor( + private readonly script: ScriptEvent[], + private readonly opts: { reYieldBoundary?: boolean } = {}, + ) {} + + async *start(): AsyncGenerator> { + this.startCalls += 1 + for (const [i, e] of this.script.entries()) { + yield { eventId: e.eventId, payload: e.payload, handle: i === 0 ? HANDLE : undefined } + } + } + + async *attach( + _handle: RunHandle, + afterEventId: string | undefined, + ): AsyncGenerator> { + this.attachCalls += 1 + const found = afterEventId ? this.script.findIndex((e) => e.eventId === afterEventId) : -1 + const from = found >= 0 ? found + (this.opts.reYieldBoundary ? 0 : 1) : 0 + for (let i = from; i < this.script.length; i++) { + const e = this.script[i]! + yield { eventId: e.eventId, payload: e.payload } + } + } +} + +const script: ScriptEvent[] = [0, 1, 2, 3, 4, 5].map((n) => ({ eventId: `e${n}`, payload: n })) + +function opts( + store: InMemoryDurableRunStore, + workerId: string, + adapter: SandboxReconnectAdapter, + extra: Partial> = {}, +): RunSupervisorOptions { + return { store, runId: 'turn-1', manifest, workerId, adapter, ...extra } +} + +async function consume( + stream: AsyncGenerator, + limit = Number.POSITIVE_INFINITY, +): Promise { + const out: T[] = [] + for await (const v of stream) { + out.push(v) + if (out.length >= limit) break + } + return out +} + +describe('runSupervisedTurn — fresh', () => { + it('drains the adapter stream into the durable log and forwards it', async () => { + const store = new InMemoryDurableRunStore() + const sup = runSupervisedTurn(opts(store, 'w1', new ScriptedAdapter(script))) + const out = await consume(sup.stream) + + expect(out).toEqual([0, 1, 2, 3, 4, 5]) + expect(sup.mode()).toBe('fresh') + expect(sup.record()?.status).toBe('completed') + const logged = await store.readStreamEvents('turn-1') + expect(logged.map((e) => e.payload)).toEqual([0, 1, 2, 3, 4, 5]) + expect(logged.map((e) => e.seq)).toEqual([0, 1, 2, 3, 4, 5]) + }) + + it('persists the run handle and flips it to completed on a clean drain', async () => { + const store = new InMemoryDurableRunStore() + const sup = runSupervisedTurn(opts(store, 'w1', new ScriptedAdapter(script))) + await consume(sup.stream) + expect(sup.record()?.handle).toMatchObject({ runId: 'sbx-run-1', status: 'completed' }) + }) +}) + +describe('runSupervisedTurn — replayed', () => { + it('a completed turn replays the logged stream without touching the adapter', async () => { + const store = new InMemoryDurableRunStore() + await consume(runSupervisedTurn(opts(store, 'w1', new ScriptedAdapter(script))).stream) + + const neverAdapter: SandboxReconnectAdapter = { + start() { + throw new Error('start must not run on replay') + }, + attach() { + throw new Error('attach must not run on replay') + }, + } + const replay = runSupervisedTurn(opts(store, 'w2', neverAdapter)) + const out = await consume(replay.stream) + expect(out).toEqual([0, 1, 2, 3, 4, 5]) + expect(replay.mode()).toBe('replayed') + }) +}) + +describe('runSupervisedTurn — chaos: cross-worker resume', () => { + it('a turn killed mid-stream resumes on a fresh worker with no gap and no duplicate', async () => { + const store = new InMemoryDurableRunStore() + + // Worker 1 drains 3 of 6 events, then its isolate "dies" — stop pulling. + const sup1 = runSupervisedTurn(opts(store, 'w1', new ScriptedAdapter(script))) + const partial = await consume(sup1.stream, 3) + expect(partial).toEqual([0, 1, 2]) + + // The dead worker no longer heartbeats — its lease lapses. + store._expireLease('turn-1') + expect(await store.readStreamEvents('turn-1')).toHaveLength(3) + + // Worker 2 picks the run up. The sandbox container outlived worker 1, so + // the adapter still has the whole script. + const adapter2 = new ScriptedAdapter(script) + const sup2 = runSupervisedTurn(opts(store, 'w2', adapter2)) + const out = await consume(sup2.stream) + + expect(sup2.mode()).toBe('resumed') + expect(adapter2.attachCalls).toBe(1) + expect(adapter2.startCalls).toBe(0) + // Worker 2's stream is the COMPLETE turn, each event exactly once. + expect(out).toEqual([0, 1, 2, 3, 4, 5]) + expect(sup2.record()?.status).toBe('completed') + // The durable log holds the full sequence with monotonic seqs — no gap. + const logged = await store.readStreamEvents('turn-1') + expect(logged.map((e) => e.payload)).toEqual([0, 1, 2, 3, 4, 5]) + expect(logged.map((e) => e.seq)).toEqual([0, 1, 2, 3, 4, 5]) + }) + + it('dedups the reconnect seam when the adapter re-yields the boundary event', async () => { + const store = new InMemoryDurableRunStore() + const sup1 = runSupervisedTurn(opts(store, 'w1', new ScriptedAdapter(script))) + await consume(sup1.stream, 3) + store._expireLease('turn-1') + + // adapter2.attach re-yields the boundary event e2 (inclusive resume). + const adapter2 = new ScriptedAdapter(script, { reYieldBoundary: true }) + const sup2 = runSupervisedTurn(opts(store, 'w2', adapter2)) + const out = await consume(sup2.stream) + + // The re-yielded e2 is idempotent on append → not double-forwarded. + expect(out).toEqual([0, 1, 2, 3, 4, 5]) + const logged = await store.readStreamEvents('turn-1') + expect(logged.map((e) => e.eventId)).toEqual(['e0', 'e1', 'e2', 'e3', 'e4', 'e5']) + }) + + it('survives two successive mid-stream deaths', async () => { + const store = new InMemoryDurableRunStore() + await consume(runSupervisedTurn(opts(store, 'w1', new ScriptedAdapter(script))).stream, 2) + store._expireLease('turn-1') + await consume(runSupervisedTurn(opts(store, 'w2', new ScriptedAdapter(script))).stream, 4) + store._expireLease('turn-1') + const sup3 = runSupervisedTurn(opts(store, 'w3', new ScriptedAdapter(script))) + const out = await consume(sup3.stream) + expect(out).toEqual([0, 1, 2, 3, 4, 5]) + expect((await store.readStreamEvents('turn-1')).map((e) => e.seq)).toEqual([0, 1, 2, 3, 4, 5]) + }) +}) + +describe('runSupervisedTurn — lease + heartbeat', () => { + it('renews the lease while draining a long turn', async () => { + const store = new InMemoryDurableRunStore() + const renew = vi.spyOn(store, 'renewLease') + let clock = 0 + const sup = runSupervisedTurn( + opts(store, 'w1', new ScriptedAdapter(script), { + heartbeatMs: 10, + now: () => { + clock += 100 // each call advances well past the heartbeat window + return clock + }, + }), + ) + await consume(sup.stream) + expect(renew).toHaveBeenCalled() + }) + + it('aborts without writing terminal state when the lease is lost mid-drain', async () => { + const store = new InMemoryDurableRunStore() + vi.spyOn(store, 'renewLease').mockResolvedValue({ ok: false }) + let clock = 0 + const sup = runSupervisedTurn( + opts(store, 'w1', new ScriptedAdapter(script), { + heartbeatMs: 1, + now: () => { + clock += 100 + return clock + }, + }), + ) + await expect(consume(sup.stream)).rejects.toThrow(/lease lost/) + // The supervisor that lost the lease must not mark the run failed — + // the new owner owns the terminal state. + expect(store._inspect('turn-1')?.status).not.toBe('failed') + }) +}) + +describe('SandboxReconnectAdapter — conformance', () => { + it('start yields a running handle with a runId on an early frame', async () => { + const adapter = new ScriptedAdapter(script) + const first = await adapter.start().next() + expect(first.done).toBe(false) + expect(first.value?.handle).toMatchObject({ status: 'running', runId: 'sbx-run-1' }) + }) + + it('attach yields only events strictly after the cursor', async () => { + const adapter = new ScriptedAdapter(script) + const out: number[] = [] + for await (const e of adapter.attach(HANDLE, 'e2')) out.push(e.payload) + expect(out).toEqual([3, 4, 5]) + }) + + it('attach from an undefined cursor yields the whole run', async () => { + const adapter = new ScriptedAdapter(script) + const out: number[] = [] + for await (const e of adapter.attach(HANDLE, undefined)) out.push(e.payload) + expect(out).toEqual([0, 1, 2, 3, 4, 5]) + }) +}) diff --git a/src/durable/types.ts b/src/durable/types.ts index dac576a..617fa6d 100644 --- a/src/durable/types.ts +++ b/src/durable/types.ts @@ -79,6 +79,44 @@ export interface EventRecord { emittedAt: string } +/** + * A pointer to a substrate run that outlives the worker isolate — the sandbox + * container is orchestrator-managed and survives a Worker death or a Durable + * Object migration. Persisted on the run row so a fresh supervisor re-attaches + * to the in-flight run instead of re-prompting. + */ +export interface RunHandle { + /** Which substrate owns the run. `sandbox` runs are reconnectable; + * `tcloud` runs have no cross-process replay endpoint. */ + kind: 'sandbox' | 'tcloud' + /** Orchestrator-managed sandbox id — stable across worker isolates. */ + sandboxId?: string + /** Sandbox conversation/session id. */ + sessionId?: string + /** The substrate run id (the sandbox SDK's `executionId`). The replay + * endpoint keys on it. */ + runId?: string + /** Lifecycle of the substrate run as last observed. */ + status: 'running' | 'completed' | 'failed' + /** Last substrate event id seen — the adapter's reconnect cursor. */ + cursor?: string +} + +/** + * One event in a run's ordered, replayable stream log. The supervisor drains + * a run's event stream into this log as it flows, so replay is guaranteed by + * the substrate rather than by the sandbox runtime's own buffering. + */ +export interface StreamEventRecord { + runId: string + /** Monotonic 0-based sequence — the store's ordering + cursor. */ + seq: number + /** Producer-supplied stable id — the dedup key and the substrate cursor. */ + eventId: string + payload: unknown + appendedAt: string +} + export type RunStatus = 'pending' | 'running' | 'completed' | 'failed' | 'suspended' export interface RunOutcome { @@ -116,6 +154,9 @@ export interface RunRecord { leaseExpiresAt?: string outcome?: RunOutcome stepCount: number + /** Pointer to the in-flight substrate run, when one has been registered. + * A fresh supervisor re-attaches by it. */ + handle?: RunHandle } /** @@ -195,6 +236,28 @@ export interface DurableRunStore { /** Load the cached event payload if it has been emitted. */ loadEvent(runId: string, key: string): Promise + /** + * Append an event to the run's ordered stream log. The store assigns the + * monotonic `seq`. Idempotent on `eventId`: re-appending a known id is a + * no-op that returns the existing record under `accepted: false` — so an + * adapter that re-yields a boundary event on reconnect cannot double-log. + */ + appendStreamEvent(input: { + runId: string + eventId: string + payload: unknown + }): Promise<{ accepted: boolean; record: StreamEventRecord }> + + /** + * Read the stream log in `seq` order. `afterSeq` (exclusive) resumes a + * reader from a cursor; omit for the whole log. + */ + readStreamEvents(runId: string, afterSeq?: number): Promise> + + /** Persist the run handle — the pointer a fresh supervisor re-attaches by. + * One per run; overwrites. */ + setRunHandle(input: { runId: string; handle: RunHandle }): Promise + /** Cleanup hook for in-memory / fs stores; no-op for D1. Idempotent. */ close(): Promise }