Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions src/durable/d1-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ import {
type DurableRunManifest,
type DurableRunStore,
type EventRecord,
type RunHandle,
type RunOutcome,
type RunRecord,
type StepError,
type StepKind,
type StepRecord,
type StreamEventRecord,
} from './types'

const DEFAULT_LEASE_MS = 30_000
Expand Down Expand Up @@ -59,6 +61,15 @@ interface RunRow {
lease_expires_at: string | null
outcome_json: string | null
step_count: number
handle_json: string | null
}

interface StreamEventRow {
run_id: string
seq: number
event_id: string
payload_json: string | null
appended_at: string
}

interface StepRow {
Expand Down Expand Up @@ -383,6 +394,54 @@ export class D1DurableRunStore implements DurableRunStore {
return row ? rowToEventRecord(row) : undefined
}

async appendStreamEvent(input: {
runId: string
eventId: string
payload: unknown
}): ReturnType<DurableRunStore['appendStreamEvent']> {
const nowIso = new Date(this.now()).toISOString()
// INSERT OR IGNORE — the UNIQUE(run_id, event_id) index makes a re-append
// a no-op; seq is the next monotonic value, computed atomically with the
// insert. A new event_id never collides on the (run_id, seq) PK.
const res = await this.db
.prepare(
`INSERT OR IGNORE INTO durable_stream_events (run_id, seq, event_id, payload_json, appended_at)
VALUES (
?,
(SELECT COALESCE(MAX(seq), -1) + 1 FROM durable_stream_events WHERE run_id = ?),
?, ?, ?
)`,
)
.bind(input.runId, input.runId, input.eventId, JSON.stringify(input.payload ?? null), nowIso)
.run()
const accepted = (res.meta?.changes ?? 0) > 0
const row = await this.db
.prepare('SELECT * FROM durable_stream_events WHERE run_id = ? AND event_id = ?')
.bind(input.runId, input.eventId)
.first<StreamEventRow>()
if (!row) throw new Error('durable-runs: appendStreamEvent failed to persist or read back')
return { accepted, record: rowToStreamEventRecord(row) }
}

async readStreamEvents(
runId: string,
afterSeq?: number,
): Promise<ReadonlyArray<StreamEventRecord>> {
const { results } = await this.db
.prepare('SELECT * FROM durable_stream_events WHERE run_id = ? AND seq > ? ORDER BY seq')
.bind(runId, afterSeq ?? -1)
.all<StreamEventRow>()
return results.map(rowToStreamEventRecord)
}

async setRunHandle(input: { runId: string; handle: RunHandle }): Promise<void> {
const nowIso = new Date(this.now()).toISOString()
await this.db
.prepare('UPDATE durable_runs SET handle_json = ?, updated_at = ? WHERE run_id = ?')
.bind(JSON.stringify(input.handle), nowIso, input.runId)
.run()
}

async close(): Promise<void> {
// D1 binding lifecycle is owned by the runtime; no-op.
}
Expand Down Expand Up @@ -432,6 +491,17 @@ function rowToRunRecord(row: RunRow): RunRecord {
leaseExpiresAt: row.lease_expires_at ?? undefined,
outcome: row.outcome_json ? (JSON.parse(row.outcome_json) as RunOutcome) : undefined,
stepCount: row.step_count,
handle: row.handle_json ? (JSON.parse(row.handle_json) as RunHandle) : undefined,
}
}

function rowToStreamEventRecord(row: StreamEventRow): StreamEventRecord {
return {
runId: row.run_id,
seq: row.seq,
eventId: row.event_id,
payload: row.payload_json ? JSON.parse(row.payload_json) : null,
appendedAt: row.appended_at,
}
}

Expand Down
53 changes: 53 additions & 0 deletions src/durable/file-system-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ import {
type DurableRunManifest,
type DurableRunStore,
type EventRecord,
type RunHandle,
type RunOutcome,
type RunRecord,
type StepError,
type StepKind,
type StepRecord,
type StreamEventRecord,
} from './types'

const DEFAULT_LEASE_MS = 30_000
Expand Down Expand Up @@ -86,6 +88,7 @@ export class FileSystemDurableRunStore implements DurableRunStore {
// Touch the jsonl files so listing them later doesn't ENOENT.
await appendFile(join(dir, 'steps.jsonl'), '', 'utf8')
await appendFile(join(dir, 'events.jsonl'), '', 'utf8')
await appendFile(join(dir, 'stream-events.jsonl'), '', 'utf8')
return { run: record, completedSteps: [], leaseExpiresAt }
}

Expand Down Expand Up @@ -287,10 +290,60 @@ export class FileSystemDurableRunStore implements DurableRunStore {
return undefined
}

async appendStreamEvent(input: {
runId: string
eventId: string
payload: unknown
}): ReturnType<DurableRunStore['appendStreamEvent']> {
const existing = await this.readStreamEventsRaw(input.runId)
const dup = existing.find((e) => e.eventId === input.eventId)
if (dup) return { accepted: false, record: dup }
const rec: StreamEventRecord = {
runId: input.runId,
seq: existing.length,
eventId: input.eventId,
payload: input.payload,
appendedAt: new Date(this.now()).toISOString(),
}
await appendFile(
join(this.runDir(input.runId), 'stream-events.jsonl'),
`${JSON.stringify(rec)}\n`,
'utf8',
)
return { accepted: true, record: rec }
}

async readStreamEvents(
runId: string,
afterSeq?: number,
): Promise<ReadonlyArray<StreamEventRecord>> {
const cutoff = afterSeq ?? -1
return (await this.readStreamEventsRaw(runId)).filter((e) => e.seq > cutoff)
}

async setRunHandle(input: { runId: string; handle: RunHandle }): Promise<void> {
const record = await this.readRun(input.runId)
record.handle = input.handle
record.updatedAt = new Date(this.now()).toISOString()
await this.writeRun(record)
}

async close(): Promise<void> {
// No persistent handles to close.
}

private async readStreamEventsRaw(runId: string): Promise<StreamEventRecord[]> {
const path = join(this.runDir(runId), 'stream-events.jsonl')
if (!existsSync(path)) return []
const content = await readFile(path, 'utf8')
const out: StreamEventRecord[] = []
for (const line of content.split('\n')) {
if (!line) continue
out.push(JSON.parse(line) as StreamEventRecord)
}
return out.sort((a, b) => a.seq - b.seq)
}

/** @internal — used by tests to list runs in the store. */
async _listRunIds(): Promise<string[]> {
if (!existsSync(this.root)) return []
Expand Down
41 changes: 40 additions & 1 deletion src/durable/in-memory-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import type {
DurableRunManifest,
DurableRunStore,
EventRecord,
RunHandle,
RunOutcome,
RunRecord,
StepError,
StepKind,
StepRecord,
StreamEventRecord,
} from './types'
import {
DurableRunDivergenceError,
Expand All @@ -28,6 +30,8 @@ interface RunState {
record: RunRecord
steps: Map<number, StepRecord>
events: Map<string, EventRecord>
/** Ordered, replayable event-stream log — `seq` is the array index. */
streamEvents: StreamEventRecord[]
}

export class InMemoryDurableRunStore implements DurableRunStore {
Expand Down Expand Up @@ -62,7 +66,7 @@ export class InMemoryDurableRunStore implements DurableRunStore {
leaseExpiresAt,
stepCount: 0,
}
state = { record, steps: new Map(), events: new Map() }
state = { record, steps: new Map(), events: new Map(), streamEvents: [] }
this.runs.set(input.runId, state)
return { run: { ...record }, completedSteps: [], leaseExpiresAt }
}
Expand Down Expand Up @@ -256,6 +260,41 @@ export class InMemoryDurableRunStore implements DurableRunStore {
return rec ? { ...rec } : undefined
}

async appendStreamEvent(input: {
runId: string
eventId: string
payload: unknown
}): ReturnType<DurableRunStore['appendStreamEvent']> {
const state = this.requireRun(input.runId)
const existing = state.streamEvents.find((e) => e.eventId === input.eventId)
if (existing) return { accepted: false, record: { ...existing } }
const rec: StreamEventRecord = {
runId: input.runId,
seq: state.streamEvents.length,
eventId: input.eventId,
payload: input.payload,
appendedAt: new Date(this.now()).toISOString(),
}
state.streamEvents.push(rec)
return { accepted: true, record: { ...rec } }
}

async readStreamEvents(
runId: string,
afterSeq?: number,
): Promise<ReadonlyArray<StreamEventRecord>> {
const state = this.runs.get(runId)
if (!state) return []
const cutoff = afterSeq ?? -1
return state.streamEvents.filter((e) => e.seq > cutoff).map((e) => ({ ...e }))
}

async setRunHandle(input: { runId: string; handle: RunHandle }): Promise<void> {
const state = this.requireRun(input.runId)
state.record.handle = { ...input.handle }
state.record.updatedAt = new Date(this.now()).toISOString()
}

async close(): Promise<void> {
this.runs.clear()
}
Expand Down
35 changes: 22 additions & 13 deletions src/durable/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,33 @@ export { D1DurableRunStore } from './d1-store'
export { FileSystemDurableRunStore } from './file-system-store'
export { canonicalHash, canonicalJson, deriveWorkerId, manifestHash, stepId } from './identity'
export { InMemoryDurableRunStore } from './in-memory-store'
// ── Cross-worker sandbox-reconnect durability ─────────────────────────
// Checkpoints a substrate run handle at turn start so a fresh worker can
// re-attach to an in-flight sandbox run instead of re-running a long turn.
export type {
ReconnectableProduce,
ReconnectableTurnHandle,
ReconnectableTurnMode,
ReconnectableTurnProducer,
ReconnectProduce,
RunHandle,
RunReconnectableTurnOptions,
} from './run-handle'
export { runReconnectableTurn } from './run-handle'
export type { DurableContext, RunDurableInput, RunDurableResult } from './runner'
export { runDurable } from './runner'
// Canonical D1 schema string + current version. Consumers wire via
// await env.DB.exec(DURABLE_SCHEMA_SQL)
// during one-time bootstrap. `src/durable/schema.sql` is the source of
// truth; `schema.ts` is the build-bundled string that ships in dist/.
export { DURABLE_SCHEMA_SQL, DURABLE_SCHEMA_VERSION } from './schema'
// ── Durable-run supervisor — cross-worker / cross-DO durability ───────
// Relocates the durability boundary off the ephemeral worker isolate: the
// supervisor drains a run's event stream into the substrate's own log, and
// a fresh supervisor re-attaches from the persisted cursor instead of
// re-prompting. SessionSupervisorDO hosts it on a Cloudflare Durable Object.
export type {
DurableObjectStateLike,
DurableObjectStorageLike,
SessionSupervisorDO,
SupervisorHostConfig,
} from './session-supervisor-do'
export { createSessionSupervisorDO } from './session-supervisor-do'
export type {
RunSupervisorOptions,
SandboxReconnectAdapter,
SupervisedEvent,
SupervisedRunHandle,
SupervisedRunMode,
} from './supervisor'
export { runSupervisedTurn } from './supervisor'
// ── Durable turn primitive ────────────────────────────────────────────
// Streaming, backend-agnostic, checkpoint+replay durable turn. The single
// reusable primitive every product chat handler routes through.
Expand All @@ -58,13 +65,15 @@ export type {
DurableRunManifest,
DurableRunStore,
EventRecord,
RunHandle,
RunOutcome,
RunRecord,
RunStatus,
StepError,
StepKind,
StepRecord,
StepStatus,
StreamEventRecord,
} from './types'
export {
DurableAwaitEventTimeoutError,
Expand Down
Loading
Loading