Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion components/anthropic/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import { setGenerative } from '../../resources/models/backendRegistry.ts';
import {
assignFiniteTokenCount,
composeSignal,
MAX_ERROR_BODY_BYTES,
normalizeOrigin,
parseJsonResponse,
readBoundedJson,
requireCredential,
requireModel,
} from '../../resources/models/backendHelpers.ts';
Expand Down Expand Up @@ -59,6 +61,12 @@ const MAX_SSE_BUFFER_CHARS = 1 << 20;
const MAX_TOOL_CALL_ARGS_CHARS = 1 << 20;
// Cap for upstream `error.message` we surface to operators.
const MAX_UPSTREAM_ERROR_MESSAGE_CHARS = 500;
// Maximum number of distinct tool-call accumulator entries. Anthropic keys by
// `index` from content_block_start events; a hostile upstream can open unbounded
// entries if content_block_stop never arrives for earlier indices.
const MAX_TOOL_CALL_ACCUMULATOR_ENTRIES = 128;
// Total tool-call argument chars across all content blocks in one stream.
const MAX_TOTAL_TOOL_CALL_ARGS_CHARS = 8 * 1024 * 1024; // 8 MiB

const log = harperLogger.forComponent('anthropic').conditional;

Expand Down Expand Up @@ -138,6 +146,7 @@ export class AnthropicBackend implements ModelBackend {
// partial strings.
const toolBuf = new Map<number, AnthropicToolCallAccumulator>();
let finalFinishReason: GenerateResult['finishReason'] | undefined;
let totalArgChars = 0;

for await (const event of readSse(res.body)) {
const chunk: GenerateChunk = {};
Expand Down Expand Up @@ -165,6 +174,13 @@ export class AnthropicBackend implements ModelBackend {
event.index !== undefined &&
event.content_block?.type === 'tool_use'
) {
// Cap total accumulator entries: `index` is upstream-controlled and
// a content_block_stop may never arrive, leaking entries indefinitely.
if (toolBuf.size >= MAX_TOOL_CALL_ACCUMULATOR_ENTRIES) {
throw new AnthropicBackendError(
`Anthropic tool-call accumulator exceeded ${MAX_TOOL_CALL_ACCUMULATOR_ENTRIES} distinct tool-call entries`
);
}
toolBuf.set(event.index, {
id: event.content_block.id,
name: event.content_block.name,
Expand All @@ -183,6 +199,12 @@ export class AnthropicBackend implements ModelBackend {
`Anthropic tool-call arguments exceed ${MAX_TOOL_CALL_ARGS_CHARS} chars (index ${event.index})`
);
}
totalArgChars += event.delta.partial_json.length;
if (totalArgChars > MAX_TOTAL_TOOL_CALL_ARGS_CHARS) {
throw new AnthropicBackendError(
`Anthropic tool-call arguments exceed total stream cap of ${MAX_TOTAL_TOOL_CALL_ARGS_CHARS} chars`
);
}
acc.argumentsBuf += event.delta.partial_json;
}
}
Expand Down Expand Up @@ -271,7 +293,12 @@ export class AnthropicBackendError extends ServerError {

async function readErrorSuffix(res: Response): Promise<string> {
try {
const body = (await res.json()) as { error?: { message?: unknown; type?: unknown } };
const body = await readBoundedJson<{ error?: { message?: unknown; type?: unknown } }>(
res,
'Anthropic error response',
AnthropicBackendError,
MAX_ERROR_BODY_BYTES
);
const message = body?.error?.message;
if (typeof message === 'string' && message.length > 0) {
const truncated =
Expand Down
65 changes: 55 additions & 10 deletions components/bedrock/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ const DEFAULT_MAX_TOKENS = 4096;
// Max accumulated `bytes` from streamed Claude tool-use input_json_delta;
// matches the cap in `components/anthropic/index.ts`.
const MAX_TOOL_CALL_ARGS_CHARS = 1 << 20;
// Maximum number of distinct tool-call accumulator entries. Upstream-controlled
// `index` values could otherwise allocate unbounded map entries if
// content_block_stop events never arrive. Matches the cap in the direct backends.
const MAX_TOOL_CALL_ACCUMULATOR_ENTRIES = 128;
// Total tool-call argument chars across all content blocks in one stream.
const MAX_TOTAL_TOOL_CALL_ARGS_CHARS = 8 * 1024 * 1024; // 8 MiB

const log = harperLogger.forComponent('bedrock').conditional;

Expand Down Expand Up @@ -199,7 +205,7 @@ export class BedrockBackend implements ModelBackend {
const model = opts.model ?? this.#defaultModel;
requireModel(model, 'generate', BedrockBackendError);
const family = familyOf(model);
const body = buildGenerateBody(family, input, opts);
const body = buildGenerateBody(family, model, input, opts);

const client = await this.#getClient();
const sdk = await loadSdk();
Expand All @@ -219,7 +225,7 @@ export class BedrockBackend implements ModelBackend {
const model = opts.model ?? this.#defaultModel;
requireModel(model, 'generateStream', BedrockBackendError);
const family = familyOf(model);
const body = buildGenerateBody(family, input, opts);
const body = buildGenerateBody(family, model, input, opts);

const client = await this.#getClient();
const sdk = await loadSdk();
Expand Down Expand Up @@ -288,13 +294,16 @@ export class BedrockBackendError extends ServerError {

type Family = 'anthropic' | 'amazon' | 'meta' | 'cohere' | 'mistral' | 'unknown';

const KNOWN_FAMILIES: Set<Family> = new Set(['anthropic', 'amazon', 'meta', 'cohere', 'mistral']);

function familyOf(modelId: string): Family {
const prefix = modelId.split('.', 1)[0]?.toLowerCase() ?? '';
if (prefix === 'anthropic') return 'anthropic';
if (prefix === 'amazon') return 'amazon';
if (prefix === 'meta') return 'meta';
if (prefix === 'cohere') return 'cohere';
if (prefix === 'mistral') return 'mistral';
// Cross-region inference-profile IDs are prefixed with a geographic segment
// (e.g. `us.anthropic.claude-3-5-sonnet-…`, `eu.meta.llama3-…`, `global.…`).
// Split on '.' and walk segments: the first segment that is a known family wins.
const segments = modelId.toLowerCase().split('.');
for (const seg of segments) {
if (KNOWN_FAMILIES.has(seg as Family)) return seg as Family;
}
return 'unknown';
}

Expand Down Expand Up @@ -349,10 +358,33 @@ function extractEmbedResult(

// ---------- generate body / result extraction ----------

function buildGenerateBody(family: Family, input: GenerateInput, opts: BackendOpts<GenerateOpts>): object {
/**
* amazon.nova-* models use the Converse API messages-v1 shape, not the Titan
* `inputText` shape. The Converse migration is a larger follow-up; for now
* throw a clear error so operators get actionable feedback instead of a
* malformed-request 400 from Bedrock.
*/
function rejectNovaModel(modelId: string): void {
if (modelId.toLowerCase().includes('nova')) {
throw new BedrockBackendError(
`amazon.nova models are not yet supported by the bedrock backend (model: ${modelId}); ` +
'these models require the Converse API shape, not the legacy InvokeModel shape'
);
}
}

function buildGenerateBody(
family: Family,
modelId: string,
input: GenerateInput,
opts: BackendOpts<GenerateOpts>
): object {
if (family === 'anthropic') return buildAnthropicBody(input, opts);
if (family === 'meta') return buildLlamaBody(input, opts);
if (family === 'amazon') return buildTitanGenerateBody(input, opts);
if (family === 'amazon') {
rejectNovaModel(modelId);
return buildTitanGenerateBody(input, opts);
}
if (family === 'mistral') return buildMistralBody(input, opts);
if (family === 'cohere') return buildCohereGenerateBody(input, opts);
throw new BedrockBackendError(`Bedrock generate not supported for model family '${family}'`);
Expand Down Expand Up @@ -526,6 +558,7 @@ async function* parseAnthropicStream(
const decoder = new TextDecoder('utf-8');
const toolBuf = new Map<number, { id: string; name: string; argumentsBuf: string }>();
let finalFinishReason: GenerateResult['finishReason'] | undefined;
let totalArgChars = 0;

for await (const event of body) {
if (!event.chunk?.bytes) continue;
Expand Down Expand Up @@ -563,6 +596,12 @@ async function* parseAnthropicStream(
}

if (type === 'content_block_start' && index !== undefined && contentBlock?.type === 'tool_use') {
// Cap total accumulator entries; content_block_stop may never arrive.
if (toolBuf.size >= MAX_TOOL_CALL_ACCUMULATOR_ENTRIES) {
throw new BedrockBackendError(
`Bedrock tool-call accumulator exceeded ${MAX_TOOL_CALL_ACCUMULATOR_ENTRIES} distinct tool-call entries`
);
}
toolBuf.set(index, { id: contentBlock.id ?? '', name: contentBlock.name ?? '', argumentsBuf: '' });
}
if (type === 'content_block_delta' && index !== undefined && delta) {
Expand All @@ -576,6 +615,12 @@ async function* parseAnthropicStream(
`Bedrock tool-call arguments exceed ${MAX_TOOL_CALL_ARGS_CHARS} chars (index ${index})`
);
}
totalArgChars += delta.partial_json.length;
if (totalArgChars > MAX_TOTAL_TOOL_CALL_ARGS_CHARS) {
throw new BedrockBackendError(
`Bedrock tool-call arguments exceed total stream cap of ${MAX_TOTAL_TOOL_CALL_ARGS_CHARS} chars`
);
}
acc.argumentsBuf += delta.partial_json;
}
}
Expand Down
79 changes: 63 additions & 16 deletions components/openai/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import { setEmbedding, setGenerative } from '../../resources/models/backendRegis
import {
assignFiniteTokenCount,
composeSignal,
MAX_ERROR_BODY_BYTES,
normalizeOrigin,
parseJsonResponse,
readBoundedJson,
requireCredential,
requireModel,
} from '../../resources/models/backendHelpers.ts';
Expand Down Expand Up @@ -60,6 +62,15 @@ const MAX_TOOL_CALL_ARGS_CHARS = 1 << 20;
// cap defends against a misbehaving compat shim that returns megabytes of
// "error" prose.
const MAX_UPSTREAM_ERROR_MESSAGE_CHARS = 500;
// Maximum number of distinct tool-call accumulator entries. OpenAI keys by
// upstream-controlled `delta.index`; without a cardinality cap a hostile
// upstream can allocate unbounded map entries (one per index value). Real
// responses use single-digit counts.
const MAX_TOOL_CALL_ACCUMULATOR_ENTRIES = 128;
// Total tool-call argument chars across all entries in one stream. The per-entry
// cap (1 MiB) plus the 128-entry cap still allows ~128 MiB accumulated; this cap
// keeps any single stream well-bounded. Real responses use tens of KB.
const MAX_TOTAL_TOOL_CALL_ARGS_CHARS = 8 * 1024 * 1024; // 8 MiB

const log = harperLogger.forComponent('openai').conditional;

Expand Down Expand Up @@ -100,10 +111,19 @@ export class OpenAIBackend implements ModelBackend {
readonly #organization?: string;
readonly #requestTimeoutMs?: number;
readonly #fetch: typeof fetch;
// True only when talking to api.openai.com itself. OpenAI's reasoning models
// (o-series, gpt-5 family) reject `max_tokens` in favour of `max_completion_tokens`;
// OpenAI-compatible shims (vLLM, Ollama-compat, older gateways) only know `max_tokens`.
readonly #isNativeOpenAI: boolean;

constructor(config: OpenAIBackendConfig = {}, fetchImpl: typeof fetch = fetch) {
this.#apiKey = requireCredential(config.apiKey, 'OpenAI', 'apiKey', OpenAIBackendError);
this.#baseUrl = normalizeOrigin(config.baseUrl, { host: DEFAULT_BASE_URL, secure: true });
try {
this.#isNativeOpenAI = new URL(this.#baseUrl).hostname === 'api.openai.com';
} catch {
this.#isNativeOpenAI = false;
}
this.#defaultModel = config.model;
this.#organization = config.organization;
this.#requestTimeoutMs = config.requestTimeoutMs;
Expand Down Expand Up @@ -147,7 +167,7 @@ export class OpenAIBackend implements ModelBackend {
async generate(input: GenerateInput, opts: BackendOpts<GenerateOpts>): Promise<ModelCallResult<GenerateResult>> {
const model = opts.model ?? this.#defaultModel;
requireModel(model, 'generate', OpenAIBackendError);
const body = buildChatRequest(model, input, opts, false);
const body = buildChatRequest(model, input, opts, false, this.#isNativeOpenAI);
const res = await this.#post('/chat/completions', body, opts.signal);
const data = await parseJsonResponse<OpenAIChatResponse>(res, 'OpenAI /chat/completions', OpenAIBackendError);
const choice = data.choices?.[0];
Expand All @@ -173,7 +193,7 @@ export class OpenAIBackend implements ModelBackend {
async *generateStream(input: GenerateInput, opts: BackendOpts<GenerateOpts>): AsyncIterable<GenerateChunk> {
const model = opts.model ?? this.#defaultModel;
requireModel(model, 'generateStream', OpenAIBackendError);
const body = buildChatRequest(model, input, opts, true);
const body = buildChatRequest(model, input, opts, true, this.#isNativeOpenAI);
const res = await this.#post('/chat/completions', body, opts.signal);
if (!res.body) throw new OpenAIBackendError('OpenAI /chat/completions returned no body for streaming');

Expand All @@ -184,6 +204,7 @@ export class OpenAIBackend implements ModelBackend {
// never a partial string.
const toolBuf = new Map<number, ToolCallAccumulator>();
let finalFinishReason: GenerateResult['finishReason'] | undefined;
let totalArgChars = 0;

for await (const event of readSse(res.body)) {
const choice = event.choices?.[0];
Expand All @@ -195,7 +216,7 @@ export class OpenAIBackend implements ModelBackend {
}
if (Array.isArray(delta?.tool_calls)) {
for (const tcDelta of delta.tool_calls) {
accumulateToolCallDelta(toolBuf, tcDelta);
totalArgChars = accumulateToolCallDelta(toolBuf, tcDelta, totalArgChars);
}
}
if (choice.finish_reason) {
Expand Down Expand Up @@ -249,7 +270,12 @@ export class OpenAIBackend implements ModelBackend {

async function readErrorSuffix(res: Response): Promise<string> {
try {
const body = (await res.json()) as { error?: { message?: unknown; type?: unknown } };
const body = await readBoundedJson<{ error?: { message?: unknown; type?: unknown } }>(
res,
'OpenAI error response',
OpenAIBackendError,
MAX_ERROR_BODY_BYTES
);
const message = body?.error?.message;
if (typeof message === 'string' && message.length > 0) {
const truncated =
Expand Down Expand Up @@ -292,7 +318,8 @@ function buildChatRequest(
model: string,
input: GenerateInput,
opts: BackendOpts<GenerateOpts>,
stream: boolean
stream: boolean,
isNativeOpenAI: boolean
): Record<string, unknown> {
const messages = normalizeMessages(input);
const tools = extractTools(input);
Expand All @@ -309,13 +336,15 @@ function buildChatRequest(
}
if (typeof opts.temperature === 'number') body.temperature = opts.temperature;
if (typeof opts.maxTokens === 'number') {
// `max_tokens` is broadly supported across OpenAI and OpenAI-compatible
// endpoints. OpenAI is migrating to `max_completion_tokens` for o1/o3+
// models but still accepts `max_tokens` on chat completions. Compat
// endpoints (Azure, vLLM, Together, OpenRouter) mostly accept the older
// field. Switch to `max_completion_tokens` when v1 models we ship
// against require it.
body.max_tokens = opts.maxTokens;
// api.openai.com's reasoning/gpt-5 models reject `max_tokens` (400); use
// `max_completion_tokens` there. OpenAI-compatible shims (vLLM, Ollama-compat,
// older gateways) only understand `max_tokens`, so keep the legacy field for
// any custom baseUrl.
if (isNativeOpenAI) {
body.max_completion_tokens = opts.maxTokens;
} else {
body.max_tokens = opts.maxTokens;
}
}
const responseFormat = mapResponseFormat(opts.responseFormat);
if (responseFormat) body.response_format = responseFormat;
Expand Down Expand Up @@ -428,26 +457,44 @@ interface ToolCallAccumulator {
argumentsBuf: string;
}

function accumulateToolCallDelta(buf: Map<number, ToolCallAccumulator>, delta: OpenAIToolCallDelta): void {
function accumulateToolCallDelta(
buf: Map<number, ToolCallAccumulator>,
delta: OpenAIToolCallDelta,
totalArgChars: number
): number {
const index = typeof delta.index === 'number' ? delta.index : 0;
let acc = buf.get(index);
if (!acc) {
// Cap total accumulator entries: `index` is upstream-controlled and an
// adversarial stream can allocate unbounded map entries without this guard.
if (buf.size >= MAX_TOOL_CALL_ACCUMULATOR_ENTRIES) {
throw new OpenAIBackendError(
`OpenAI tool-call accumulator exceeded ${MAX_TOOL_CALL_ACCUMULATOR_ENTRIES} distinct tool-call entries`
);
}
acc = { argumentsBuf: '' };
buf.set(index, acc);
}
if (delta.id) acc.id = delta.id;
if (delta.function?.name) acc.name = delta.function.name;
if (typeof delta.function?.arguments === 'string') {
// Defend against an unbounded accumulator: the per-event SSE buffer cap
// stops a single oversize event, but tool-call arguments are *built up*
// across many sub-cap events. Throw before V8 hits string-length limits.
// Per-entry cap: the per-event SSE buffer cap stops a single oversize event,
// but tool-call arguments are *built up* across many sub-cap events.
if (acc.argumentsBuf.length + delta.function.arguments.length > MAX_TOOL_CALL_ARGS_CHARS) {
throw new OpenAIBackendError(
`OpenAI tool-call arguments exceed ${MAX_TOOL_CALL_ARGS_CHARS} chars (index ${index})`
);
}
// Total-stream cap: 128 entries each at 1 MiB still allows ~128 MiB accumulated.
totalArgChars += delta.function.arguments.length;
if (totalArgChars > MAX_TOTAL_TOOL_CALL_ARGS_CHARS) {
throw new OpenAIBackendError(
`OpenAI tool-call arguments exceed total stream cap of ${MAX_TOTAL_TOOL_CALL_ARGS_CHARS} chars`
);
}
acc.argumentsBuf += delta.function.arguments;
}
return totalArgChars;
}

function flushToolCallBuffer(buf: Map<number, ToolCallAccumulator>): Partial<ToolCall>[] {
Expand Down
5 changes: 5 additions & 0 deletions resources/models/agentLoop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,11 @@ async function runSingleToolCall(
iteration: number,
maxResultBytes: number
): Promise<DispatchedToolCall> {
// Guard: don't start a side-effecting handler if the caller has already
// aborted. The catch below already rethrows AbortError, so a pre-aborted
// signal that fires after entry but before the handler await is also covered.
ctx.signal?.throwIfAborted();

const entry: ToolTraceEntry = {
iteration,
toolCallId: call.id,
Expand Down
Loading