Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 133 additions & 0 deletions resources/models/openaiStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/**
* `openaiStream()` — format an internal `generateStream()` token iterator into the
* OpenAI-compatible Server-Sent Events shape so unmodified OpenAI / LangChain.js /
* Vercel AI SDK clients can consume a Harper chat-completions stream (#514, #510).
*
* The yielded `{ data }` messages pass through Harper's existing `text/event-stream`
* serializer (`server/serverHelpers/contentTypes.ts`) unchanged: an object `data`
* is JSON-stringified to `data: {json}\n\n`, and the terminal `{ data: '[DONE]' }`
* sentinel serializes to `data: [DONE]\n\n` — exactly OpenAI's wire format. We emit
* NO SSE `event:` or `id:` lines: OpenAI's stream is `data:`-only and the completion
* id lives inside the JSON payload, not as an SSE field.
*/

import { randomUUID } from 'node:crypto';
import type { GenerateChunk, GenerateResult } from './types.ts';

type OpenAIFinishReason = GenerateResult['finishReason'];

export interface OpenAIStreamOptions {
/** Advertised model name echoed back on every chunk. */
model?: string;
/** Reuse a caller-supplied completion id across all chunks; one is generated when omitted. */
id?: string;
}

interface OpenAIToolCallDelta {
index: number;
id: string;
type: 'function';
function: { name: string; arguments: string };
}

interface OpenAIDelta {
role?: 'assistant';
content?: string;
tool_calls?: OpenAIToolCallDelta[];
}

interface OpenAIChunk {
id: string;
object: 'chat.completion.chunk';
created: number;
model: string;
choices: Array<{ index: number; delta: OpenAIDelta; finish_reason: OpenAIFinishReason | null }>;
}

/** SSE message envelope consumed by Harper's `text/event-stream` serializer. */
export interface OpenAIStreamMessage {
data: OpenAIChunk | string;
}

/**
* Wrap a `GenerateChunk` async iterable as OpenAI `chat.completion.chunk` SSE messages,
* terminated by the `[DONE]` sentinel. Content deltas stream inline; tool calls are
* assembled and flushed once (see the tool-call note below).
*/
export async function* openaiStream(
tokens: AsyncIterable<GenerateChunk>,
opts: OpenAIStreamOptions = {}
): AsyncGenerator<OpenAIStreamMessage> {
const id = opts.id ?? `chatcmpl-${randomUUID().replaceAll('-', '')}`;
const created = Math.floor(Date.now() / 1000);
const model = opts.model ?? '';

let roleSent = false;
let finishReason: OpenAIFinishReason | undefined;

// Tool-call assembly. Backends pre-parse arguments to objects and may re-send the
// same id with partial fields (see `mergeToolCallDelta` in agentLoop.ts), so we
// accumulate by id here and emit each call's arguments as ONE stringified blob.
// Emitting incremental fragments would corrupt the OpenAI client's concatenation
// (`{"a":1}` + `{"b":2}` → invalid JSON) — Harper's already-buffered upstream model
// means we cannot faithfully reproduce per-token argument fragments anyway.
const toolAssembly = new Map<string, { index: number; name?: string; arguments: object }>();

const chunk = (delta: OpenAIDelta, finish: OpenAIFinishReason | null): OpenAIStreamMessage => ({
data: {
id,
object: 'chat.completion.chunk',
created,
model,
choices: [{ index: 0, delta, finish_reason: finish }],
},
});

for await (const token of tokens) {
if (token.deltaContent !== undefined) {
const delta: OpenAIDelta = {};
if (!roleSent) {
delta.role = 'assistant';
roleSent = true;
}
delta.content = token.deltaContent;
yield chunk(delta, null);
}
if (token.deltaToolCalls) {
for (const incoming of token.deltaToolCalls) {
if (!incoming.id) continue;
const existing = toolAssembly.get(incoming.id) ?? { index: toolAssembly.size, arguments: {} };
if (incoming.name) existing.name = incoming.name;
if (incoming.arguments) existing.arguments = { ...existing.arguments, ...incoming.arguments };
toolAssembly.set(incoming.id, existing);
}
}
if (token.finishReason) finishReason = token.finishReason;
}

if (toolAssembly.size > 0) {
const toolCalls: OpenAIToolCallDelta[] = [];
for (const [callId, call] of toolAssembly) {
toolCalls.push({
index: call.index,
id: callId,
type: 'function',
function: { name: call.name ?? '', arguments: JSON.stringify(call.arguments) },
});
}
const delta: OpenAIDelta = {};
if (!roleSent) {
delta.role = 'assistant';
roleSent = true;
}
delta.tool_calls = toolCalls;
yield chunk(delta, null);
}

const finish: OpenAIFinishReason = finishReason ?? (toolAssembly.size > 0 ? 'tool_calls' : 'stop');
const terminalDelta: OpenAIDelta = {};
if (!roleSent) terminalDelta.role = 'assistant';
yield chunk(terminalDelta, finish);

yield { data: '[DONE]' };
}
137 changes: 137 additions & 0 deletions unitTests/resources/models/openaiStream.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
'use strict';

const assert = require('node:assert/strict');
const { openaiStream } = require('#src/resources/models/openaiStream');
const { contentTypes } = require('#src/server/serverHelpers/contentTypes');

// The real SSE serializer — assert the helper's messages pass through it unchanged.
const sse = contentTypes.get('text/event-stream');

async function collect(iter) {
const out = [];
for await (const message of iter) out.push(message);
return out;
}

async function* gen(...chunks) {
for (const c of chunks) yield c;
}

describe('openaiStream', () => {
it('formats content deltas as OpenAI chat.completion.chunk messages', async () => {
const msgs = await collect(
openaiStream(gen({ deltaContent: 'Hello' }, { deltaContent: ' world' }, { finishReason: 'stop' }), {
model: 'llama-3.3-70b',
id: 'chatcmpl-test',
})
);

const first = msgs[0].data;
assert.equal(first.object, 'chat.completion.chunk');
assert.equal(first.id, 'chatcmpl-test');
assert.equal(first.model, 'llama-3.3-70b');
assert.equal(first.choices[0].index, 0);
assert.equal(first.choices[0].delta.role, 'assistant');
assert.equal(first.choices[0].delta.content, 'Hello');
assert.equal(first.choices[0].finish_reason, null);

// role is announced once, only on the first chunk
assert.equal(msgs[1].data.choices[0].delta.role, undefined);
assert.equal(msgs[1].data.choices[0].delta.content, ' world');

// terminal chunk: empty delta + finish_reason, followed by the sentinel
const terminal = msgs[msgs.length - 2].data;
assert.deepEqual(terminal.choices[0].delta, {});
assert.equal(terminal.choices[0].finish_reason, 'stop');
assert.equal(msgs.at(-1).data, '[DONE]');
});

it('emits a terminal [DONE] sentinel that serializes to `data: [DONE]`', async () => {
const msgs = await collect(openaiStream(gen({ deltaContent: 'hi' }), {}));
const done = msgs.at(-1);
assert.deepEqual(done, { data: '[DONE]' });
assert.equal(sse.serialize(done), 'data: [DONE]\n\n');
});

it('passes through the real SSE serializer to OpenAI wire shape (no event/id lines)', async () => {
const msgs = await collect(openaiStream(gen({ deltaContent: 'hi' }), { model: 'm', id: 'chatcmpl-x' }));
const wire = msgs.map((m) => sse.serialize(m)).join('');

assert.ok(!/^event:/m.test(wire), 'must not emit SSE `event:` lines');
assert.ok(!/^id:/m.test(wire), 'must not emit SSE `id:` lines');
assert.ok(wire.endsWith('data: [DONE]\n\n'), 'must end with the [DONE] sentinel');

// first event is a parseable OpenAI chunk
const firstData = wire.split('\n\n')[0].replace(/^data: /, '');
const parsed = JSON.parse(firstData);
assert.equal(parsed.object, 'chat.completion.chunk');
assert.equal(parsed.choices[0].delta.content, 'hi');
});

it('assembles streamed tool-call deltas into one tool_calls delta with stringified arguments', async () => {
const msgs = await collect(
openaiStream(
gen(
{ deltaToolCalls: [{ id: 'call_1', name: 'get_weather' }] },
{ deltaToolCalls: [{ id: 'call_1', arguments: { city: 'NYC' } }] },
{ deltaToolCalls: [{ id: 'call_1', arguments: { unit: 'c' } }] },
{ finishReason: 'tool_calls' }
),
{ model: 'm' }
)
);

const toolChunk = msgs.map((m) => m.data).find((d) => typeof d === 'object' && d.choices[0].delta.tool_calls);
assert.ok(toolChunk, 'expected a tool_calls delta chunk');

const call = toolChunk.choices[0].delta.tool_calls[0];
assert.equal(call.index, 0);
assert.equal(call.id, 'call_1');
assert.equal(call.type, 'function');
assert.equal(call.function.name, 'get_weather');
// arguments merged across deltas and stringified exactly once → valid JSON
assert.deepEqual(JSON.parse(call.function.arguments), { city: 'NYC', unit: 'c' });

assert.equal(msgs[msgs.length - 2].data.choices[0].finish_reason, 'tool_calls');
});

it('indexes multiple distinct tool calls in arrival order', async () => {
const msgs = await collect(
openaiStream(
gen({
deltaToolCalls: [
{ id: 'a', name: 'first', arguments: { x: 1 } },
{ id: 'b', name: 'second', arguments: { y: 2 } },
],
})
)
);
const toolChunk = msgs.map((m) => m.data).find((d) => typeof d === 'object' && d.choices[0].delta.tool_calls);
const calls = toolChunk.choices[0].delta.tool_calls;
assert.equal(calls.length, 2);
assert.deepEqual(
calls.map((c) => [c.index, c.id]),
[
[0, 'a'],
[1, 'b'],
]
);
// no explicit finishReason but tool calls present → 'tool_calls'
assert.equal(msgs[msgs.length - 2].data.choices[0].finish_reason, 'tool_calls');
});

it('handles an empty stream: announces role + stop + [DONE]', async () => {
const msgs = await collect(openaiStream(gen(), {}));
assert.equal(msgs.length, 2);
assert.equal(msgs[0].data.choices[0].delta.role, 'assistant');
assert.equal(msgs[0].data.choices[0].finish_reason, 'stop');
assert.deepEqual(msgs[1], { data: '[DONE]' });
});

it('generates a chatcmpl- id when none is supplied, stable across all chunks', async () => {
const msgs = await collect(openaiStream(gen({ deltaContent: 'a' }, { deltaContent: 'b' })));
const ids = msgs.filter((m) => typeof m.data === 'object').map((m) => m.data.id);
assert.ok(ids[0].startsWith('chatcmpl-'));
assert.ok(new Set(ids).size === 1, 'id must be identical across every chunk');
});
});
Loading