Framework-agnostic multi-agent task delegation for TypeScript.
- In-memory message bus for named specialist agents
- Deterministic coordinator for fixed workflows
- DAG coordinator for dependency-aware parallel execution
- LLM coordinator for planning from natural-language goals
- Anthropic, OpenAI, Ollama, and custom planner support
- Pluggable persistence through the
BusStoreinterface - Dead-letter queue for failed tasks
- Observable task lifecycle events with
bus.watch() - Zero runtime dependencies, Node >= 18
npm install @ekaone/agent-relaypnpm add @ekaone/agent-relayyarn add @ekaone/agent-relayimport { createBus, createAgent, runDeterministic } from '@ekaone/agent-relay'
const bus = createBus()
bus.register(createAgent({
name: 'npm',
description: 'Runs npm lifecycle commands',
commands: ['install', 'build', 'test'],
handler: async task => {
const { command } = task.input as { command: string }
return { success: true, command }
},
}))
bus.register(createAgent({
name: 'deploy',
description: 'Runs deployment commands',
commands: ['restart'],
handler: async task => {
const { command } = task.input as { command: string }
return { success: true, command }
},
}))
const results = await runDeterministic(bus, [
{ to: 'npm', input: { command: 'install' } },
{ to: 'npm', input: { command: 'build' } },
{ to: 'npm', input: { command: 'test' } },
{ to: 'deploy', input: { command: 'restart' } },
])Independent steps run in parallel. Dependent steps wait for their upstream outputs and can derive input through a resolver function.
import { createBus, createAgent, runDAG } from '@ekaone/agent-relay'
const bus = createBus()
bus.register(fetchAgent)
bus.register(summaryAgent)
bus.register(sentimentAgent)
bus.register(reportAgent)
const results = await runDAG(bus, [
{ id: 'fetch', to: 'fetch', input: { url } },
{
id: 'summary',
to: 'summarize',
dependsOn: ['fetch'],
input: deps => ({ text: deps['fetch']!.output.body }),
},
{
id: 'sentiment',
to: 'sentiment',
dependsOn: ['fetch'],
input: deps => ({ text: deps['fetch']!.output.body }),
},
{
id: 'report',
to: 'report',
dependsOn: ['summary', 'sentiment'],
input: deps => ({
summary: deps['summary']!.output,
sentiment: deps['sentiment']!.output,
}),
},
])
console.log(results['report']!.output)If a step fails, its dependents are marked cancelled. Independent branches continue running.
import { createBus, createAgent, createCoordinator } from '@ekaone/agent-relay'
const bus = createBus()
bus.register(summarizerAgent)
bus.register(reviewerAgent)
const coordinator = createCoordinator(bus, {
provider: 'anthropic',
apiKey: process.env.ANTHROPIC_API_KEY,
model: 'claude-sonnet-4-20250514',
})
const plan = await coordinator.plan('summarize and review the article')
const results = await coordinator.run('summarize and review the article')Creates the central message bus.
const bus = createBus({
store: myCustomStore,
defaultTimeoutMs: 30_000,
})| Method | Description |
|---|---|
bus.register(agent) |
Register a specialist agent |
bus.manifest() |
List registered agents and capabilities |
bus.history() |
Return all tasks saved by the store |
bus.dispatch(task) |
Dispatch a pre-built task |
bus.watch(listener, filter?) |
Observe agent and task lifecycle events |
bus.deadLetter.drain() |
Pull failed tasks out of the dead-letter queue |
bus.deadLetter.retry(id, bus) |
Re-dispatch a failed task by ID |
Observes bus events and returns an unsubscribe function.
const unsubscribe = bus.watch(event => {
if (event.type === 'agent:registered') {
console.log('agent registered:', event.agent.name)
return
}
console.log(event.type, event.task.id, event.task.status)
})BusEvent is a discriminated union keyed by type:
type BusEvent =
| { type: 'agent:registered'; timestamp: number; agent: AgentManifestEntry }
| { type: 'task:queued'; timestamp: number; task: AgentTask }
| { type: 'task:running'; timestamp: number; task: AgentTask }
| { type: 'task:done'; timestamp: number; task: AgentTask }
| { type: 'task:failed'; timestamp: number; task: AgentTask }
| { type: 'task:cancelled'; timestamp: number; task: AgentTask }Events come from two explicit sources:
register()emitsagent:registered.- The bus wraps
store.save(task)and emits task events after successful saves.
Async listeners are fire-and-forget. Listener return values are ignored, and thrown errors or rejected promises are caught so they never affect registration, dispatch, DAG execution, or store writes.
Filters support type, task status, and task target to:
bus.watch(
event => console.log(event.task.id),
{ type: 'task:done', status: 'done', to: 'summarizer' }
)Filters combine with AND: when type, status, and to are all set, an event must match all three to fire.
send() and delegate() differ intentionally. send() persists a pending task, so it emits task:queued; delegate() creates a task and dispatches it immediately, so it emits task:running and a terminal task event, but not task:queued.
Defines a specialist agent.
const agent = createAgent<TInput, TOutput>({
name: 'summarizer',
description: 'Summarizes long text',
commands: ['summarize'],
timeoutMs: 10_000,
handler: async task => {
return doWork(task.input)
},
})Sends a task and awaits the result.
const result = await delegate<string, string>(bus, {
from: 'coordinator',
to: 'summarizer',
input: 'Long text here...',
})
if (result.status === 'done') console.log(result.output)
if (result.status === 'failed') console.error(result.error)Low-level enqueue and FIFO polling.
const taskId = await send(bus, { from: 'coordinator', to: 'worker', input: 'job' })
const task = await receive(bus, 'worker')Executes a fixed sequence of delegation steps.
const results = await runDeterministic(bus, [
{ to: 'npm', input: { command: 'build' } },
{ to: 'deploy', input: { command: 'restart' } },
], {
continueOnError: false,
})Executes steps as a directed acyclic graph. Steps with no unresolved dependencies run in parallel.
const results = await runDAG(bus, [
{ id: 'a', to: 'fetch', input: { url } },
{ id: 'b', to: 'summarize', dependsOn: ['a'], input: deps => ({ text: deps['a']!.output.body }) },
])runDAG validates duplicate step IDs, unknown dependencies, and cycles before dispatching anything.
Creates an LLM-powered coordinator. It reads bus.manifest() and produces a delegation plan.
const coordinator = createCoordinator(bus, {
provider: 'anthropic',
apiKey: process.env.ANTHROPIC_API_KEY,
model: 'claude-sonnet-4-20250514',
maxSteps: 20,
continueOnError: false,
resolvePlan: async (goal, manifest) => ({ steps: [] }),
})| Provider | provider value |
Key env var | Default model |
|---|---|---|---|
| Anthropic | 'anthropic' |
ANTHROPIC_API_KEY |
claude-sonnet-4-20250514 |
| OpenAI | 'openai' |
OPENAI_API_KEY |
gpt-4o |
| Ollama | 'ollama' |
none | llama3 |
| Custom | 'custom' |
none | provide resolvePlan |
import type { AgentTask, BusStore } from '@ekaone/agent-relay'
const myStore: BusStore = {
async save(task) {},
async find(id) { return null },
async list(filter) { return [] },
async clear() {},
}
const bus = createBus({ store: myStore })SQLite adapter: @ekaone/agent-relay-sqlite is planned for v0.2.0.
interface AgentTask<TInput, TOutput> {
id: string
from: string
to: string
input: TInput
output?: TOutput
status: 'pending' | 'running' | 'done' | 'failed' | 'cancelled'
error?: Error
createdAt: number
updatedAt: number
}See examples/ for runnable examples:
npx tsx examples/deterministic.ts
npx tsx examples/dag-pipeline.ts
npx tsx examples/watch-events.ts
ANTHROPIC_API_KEY=sk-... npx tsx examples/llm-coordinator.ts
npx tsx examples/custom-store.ts- v0.1.0: core bus, agents, delegate, deterministic + LLM coordinator, DLQ, pluggable store
- v0.1.2:
runDAG()with dependency graph execution - v0.2.0: SQLite adapter, streaming task output, richer event watching
- v0.3.0: agent checkpointing and replay
MIT (c) Eka Prasetia