Skip to content

ekaone/agent-relay

Repository files navigation

@ekaone/agent-relay

Framework-agnostic multi-agent task delegation for TypeScript.

  • In-memory message bus for named specialist agents
  • Deterministic coordinator for fixed workflows
  • DAG coordinator for dependency-aware parallel execution
  • LLM coordinator for planning from natural-language goals
  • Anthropic, OpenAI, Ollama, and custom planner support
  • Pluggable persistence through the BusStore interface
  • Dead-letter queue for failed tasks
  • Observable task lifecycle events with bus.watch()
  • Zero runtime dependencies, Node >= 18

Install

npm install @ekaone/agent-relay
pnpm add @ekaone/agent-relay
yarn add @ekaone/agent-relay

Quick Start

Deterministic coordinator

import { createBus, createAgent, runDeterministic } from '@ekaone/agent-relay'

const bus = createBus()

bus.register(createAgent({
  name: 'npm',
  description: 'Runs npm lifecycle commands',
  commands: ['install', 'build', 'test'],
  handler: async task => {
    const { command } = task.input as { command: string }
    return { success: true, command }
  },
}))

bus.register(createAgent({
  name: 'deploy',
  description: 'Runs deployment commands',
  commands: ['restart'],
  handler: async task => {
    const { command } = task.input as { command: string }
    return { success: true, command }
  },
}))

const results = await runDeterministic(bus, [
  { to: 'npm', input: { command: 'install' } },
  { to: 'npm', input: { command: 'build' } },
  { to: 'npm', input: { command: 'test' } },
  { to: 'deploy', input: { command: 'restart' } },
])

DAG coordinator

Independent steps run in parallel. Dependent steps wait for their upstream outputs and can derive input through a resolver function.

import { createBus, createAgent, runDAG } from '@ekaone/agent-relay'

const bus = createBus()
bus.register(fetchAgent)
bus.register(summaryAgent)
bus.register(sentimentAgent)
bus.register(reportAgent)

const results = await runDAG(bus, [
  { id: 'fetch', to: 'fetch', input: { url } },
  {
    id: 'summary',
    to: 'summarize',
    dependsOn: ['fetch'],
    input: deps => ({ text: deps['fetch']!.output.body }),
  },
  {
    id: 'sentiment',
    to: 'sentiment',
    dependsOn: ['fetch'],
    input: deps => ({ text: deps['fetch']!.output.body }),
  },
  {
    id: 'report',
    to: 'report',
    dependsOn: ['summary', 'sentiment'],
    input: deps => ({
      summary: deps['summary']!.output,
      sentiment: deps['sentiment']!.output,
    }),
  },
])

console.log(results['report']!.output)

If a step fails, its dependents are marked cancelled. Independent branches continue running.

LLM coordinator

import { createBus, createAgent, createCoordinator } from '@ekaone/agent-relay'

const bus = createBus()
bus.register(summarizerAgent)
bus.register(reviewerAgent)

const coordinator = createCoordinator(bus, {
  provider: 'anthropic',
  apiKey: process.env.ANTHROPIC_API_KEY,
  model: 'claude-sonnet-4-20250514',
})

const plan = await coordinator.plan('summarize and review the article')
const results = await coordinator.run('summarize and review the article')

API

createBus(options?)

Creates the central message bus.

const bus = createBus({
  store: myCustomStore,
  defaultTimeoutMs: 30_000,
})
Method Description
bus.register(agent) Register a specialist agent
bus.manifest() List registered agents and capabilities
bus.history() Return all tasks saved by the store
bus.dispatch(task) Dispatch a pre-built task
bus.watch(listener, filter?) Observe agent and task lifecycle events
bus.deadLetter.drain() Pull failed tasks out of the dead-letter queue
bus.deadLetter.retry(id, bus) Re-dispatch a failed task by ID

bus.watch(listener, filter?)

Observes bus events and returns an unsubscribe function.

const unsubscribe = bus.watch(event => {
  if (event.type === 'agent:registered') {
    console.log('agent registered:', event.agent.name)
    return
  }

  console.log(event.type, event.task.id, event.task.status)
})

BusEvent is a discriminated union keyed by type:

type BusEvent =
  | { type: 'agent:registered'; timestamp: number; agent: AgentManifestEntry }
  | { type: 'task:queued'; timestamp: number; task: AgentTask }
  | { type: 'task:running'; timestamp: number; task: AgentTask }
  | { type: 'task:done'; timestamp: number; task: AgentTask }
  | { type: 'task:failed'; timestamp: number; task: AgentTask }
  | { type: 'task:cancelled'; timestamp: number; task: AgentTask }

Events come from two explicit sources:

  • register() emits agent:registered.
  • The bus wraps store.save(task) and emits task events after successful saves.

Async listeners are fire-and-forget. Listener return values are ignored, and thrown errors or rejected promises are caught so they never affect registration, dispatch, DAG execution, or store writes.

Filters support type, task status, and task target to:

bus.watch(
  event => console.log(event.task.id),
  { type: 'task:done', status: 'done', to: 'summarizer' }
)

Filters combine with AND: when type, status, and to are all set, an event must match all three to fire.

send() and delegate() differ intentionally. send() persists a pending task, so it emits task:queued; delegate() creates a task and dispatches it immediately, so it emits task:running and a terminal task event, but not task:queued.

createAgent(definition)

Defines a specialist agent.

const agent = createAgent<TInput, TOutput>({
  name: 'summarizer',
  description: 'Summarizes long text',
  commands: ['summarize'],
  timeoutMs: 10_000,
  handler: async task => {
    return doWork(task.input)
  },
})

delegate(bus, options)

Sends a task and awaits the result.

const result = await delegate<string, string>(bus, {
  from: 'coordinator',
  to: 'summarizer',
  input: 'Long text here...',
})

if (result.status === 'done') console.log(result.output)
if (result.status === 'failed') console.error(result.error)

send(bus, options) / receive(bus, agentName)

Low-level enqueue and FIFO polling.

const taskId = await send(bus, { from: 'coordinator', to: 'worker', input: 'job' })
const task = await receive(bus, 'worker')

runDeterministic(bus, steps, options?)

Executes a fixed sequence of delegation steps.

const results = await runDeterministic(bus, [
  { to: 'npm', input: { command: 'build' } },
  { to: 'deploy', input: { command: 'restart' } },
], {
  continueOnError: false,
})

runDAG(bus, steps, options?)

Executes steps as a directed acyclic graph. Steps with no unresolved dependencies run in parallel.

const results = await runDAG(bus, [
  { id: 'a', to: 'fetch', input: { url } },
  { id: 'b', to: 'summarize', dependsOn: ['a'], input: deps => ({ text: deps['a']!.output.body }) },
])

runDAG validates duplicate step IDs, unknown dependencies, and cycles before dispatching anything.

createCoordinator(bus, config)

Creates an LLM-powered coordinator. It reads bus.manifest() and produces a delegation plan.

const coordinator = createCoordinator(bus, {
  provider: 'anthropic',
  apiKey: process.env.ANTHROPIC_API_KEY,
  model: 'claude-sonnet-4-20250514',
  maxSteps: 20,
  continueOnError: false,
  resolvePlan: async (goal, manifest) => ({ steps: [] }),
})

AI Providers

Provider provider value Key env var Default model
Anthropic 'anthropic' ANTHROPIC_API_KEY claude-sonnet-4-20250514
OpenAI 'openai' OPENAI_API_KEY gpt-4o
Ollama 'ollama' none llama3
Custom 'custom' none provide resolvePlan

Pluggable Store

import type { AgentTask, BusStore } from '@ekaone/agent-relay'

const myStore: BusStore = {
  async save(task) {},
  async find(id) { return null },
  async list(filter) { return [] },
  async clear() {},
}

const bus = createBus({ store: myStore })

SQLite adapter: @ekaone/agent-relay-sqlite is planned for v0.2.0.

Task Schema

interface AgentTask<TInput, TOutput> {
  id: string
  from: string
  to: string
  input: TInput
  output?: TOutput
  status: 'pending' | 'running' | 'done' | 'failed' | 'cancelled'
  error?: Error
  createdAt: number
  updatedAt: number
}

Examples

See examples/ for runnable examples:

npx tsx examples/deterministic.ts
npx tsx examples/dag-pipeline.ts
npx tsx examples/watch-events.ts
ANTHROPIC_API_KEY=sk-... npx tsx examples/llm-coordinator.ts
npx tsx examples/custom-store.ts

Roadmap

  • v0.1.0: core bus, agents, delegate, deterministic + LLM coordinator, DLQ, pluggable store
  • v0.1.2: runDAG() with dependency graph execution
  • v0.2.0: SQLite adapter, streaming task output, richer event watching
  • v0.3.0: agent checkpointing and replay

License

MIT (c) Eka Prasetia

Links

About

Framework-agnostic multi-agent task delegation

Topics

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Contributors