diff --git a/bun.lock b/bun.lock index 11efe55..60e1082 100644 --- a/bun.lock +++ b/bun.lock @@ -53,6 +53,7 @@ "version": "0.2.0", "dependencies": { "@mariozechner/pi-coding-agent": "0.73.1", + "croner": "9.0.0", "typebox": "1.1.38", }, }, @@ -290,6 +291,8 @@ "color-name": ["color-name@1.1.4", "", {}, "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA=="], + "croner": ["croner@9.0.0", "", {}, "sha512-onMB0OkDjkXunhdW9htFjEhqrD54+M94i6ackoUkjHKbRnXdyEyKRelp4nJ1kAz32+s27jP1FsebpJCVl0BsvA=="], + "data-uri-to-buffer": ["data-uri-to-buffer@6.0.2", "", {}, "sha512-7hvf7/GW8e86rW0ptuwS3OcBGDjIi6SZva7hCyWC0yYry2cOPmLIjXAUHI6DK2HsnwJd9ifmt57i8eV2n4YNpw=="], "debug": ["debug@4.4.3", "", { "dependencies": { "ms": "^2.1.3" } }, "sha512-RGwwWnwQvkVfavKVt22FGLw+xYSdzARwm0ru6DhTVA3umU5hZc28V3kO4stgYryrTlLpuvgI9GiijltAjNbcqA=="], diff --git a/packages/cli/src/cli.ts b/packages/cli/src/cli.ts index 5c3fd35..d3adc54 100644 --- a/packages/cli/src/cli.ts +++ b/packages/cli/src/cli.ts @@ -179,7 +179,13 @@ async function run( console.error('bob run: a prompt is required, e.g. `bob run "summarize my inbox"`'); return 2; } - const result = await runAgent({ name, prompt, model }); + // captureStdout collects the assistant's final text into result.stdout. + // runAgent itself is silent (it accumulates deltas, never writes to console), + // so without this `bob run` printed nothing — print the response here. + const result = await runAgent({ name, prompt, model, captureStdout: true }); + if (result.stdout && result.stdout.trim().length > 0) { + console.log(result.stdout); + } return result.exitCode; } diff --git a/packages/shell/package.json b/packages/shell/package.json index 6993b74..ea4df1c 100644 --- a/packages/shell/package.json +++ b/packages/shell/package.json @@ -23,6 +23,7 @@ }, "dependencies": { "@mariozechner/pi-coding-agent": "0.73.1", + "croner": "9.0.0", "typebox": "1.1.38" }, "keywords": [ diff --git a/packages/shell/src/bob-yaml.ts b/packages/shell/src/bob-yaml.ts index 57349d9..8fa363e 100644 --- a/packages/shell/src/bob-yaml.ts +++ b/packages/shell/src/bob-yaml.ts @@ -134,6 +134,62 @@ export function readBlock(yamlText: string, key: string): Record> { + const lines = yamlText.split(/\r?\n/); + const entries: Array> = []; + let inBlock = false; + let current: Record | undefined; + let dashIndent = -1; + + for (const rawLine of lines) { + if (/^[A-Za-z0-9_-]+\s*:/.test(rawLine)) { + inBlock = rawLine.match(/^([A-Za-z0-9_-]+)\s*:/)?.[1] === "cron"; + current = undefined; + continue; + } + if (!inBlock) continue; + const t = rawLine.trim(); + if (t === "" || t.startsWith("#")) continue; + const indent = rawLine.length - rawLine.replace(/^ +/, "").length; + + if (t.startsWith("-")) { + // New entry. The text after "-" may be the first `key: value`. + current = {}; + entries.push(current); + dashIndent = indent; + const after = t.slice(1).trim(); + if (after) addCronKv(current, after); + } else if (current && indent > dashIndent) { + addCronKv(current, t); + } else { + // Unexpected shape under cron: — stop reading the block. + break; + } + } + return entries; +} + +function addCronKv(obj: Record, kv: string): void { + // Same `name: value` shape as readBlock — coerceScalar trims + strips quotes; + // cron values are all strings (name / cron-expr / prompt), so stringify. + const m = kv.match(/^([A-Za-z0-9_-]+)\s*:(.*)$/); + if (!m) return; + obj[m[1]] = String(coerceScalar(m[2].trim())); +} + function splitList(inner: string): string[] { if (inner.trim() === "") return []; return inner diff --git a/packages/shell/src/cron.ts b/packages/shell/src/cron.ts new file mode 100644 index 0000000..c9c0b23 --- /dev/null +++ b/packages/shell/src/cron.ts @@ -0,0 +1,116 @@ +// Scheduled work for the PERSISTENT runtime — `cron:` in bob.yaml. +// +// Bob's persistent session (`bob serve`) can run prompts on a schedule: each +// bob.yaml `cron:` entry { name, schedule (cron expr), prompt } fires its prompt +// INTO the live session on its cadence. This is how an agent does proactive work +// (e.g. Pulse's daily intel brief) without a second process — and crucially +// without a second Discord gateway connection (a `bob run` per tick would open a +// duplicate login for the same bot token and fight the persistent session). The +// scheduled prompt drives the same warm session that handles inbound, so the +// agent can use its capabilities (discord_reply, flair_*) to act on the tick. +// +// CONCURRENCY: fires are SERIALIZED through a single promise chain so two cron +// entries never run at once, and `fire` itself (in persistent.ts) awaits the +// session's idle barrier before prompting so a tick doesn't cut into an in- +// flight inbound turn. pi's AgentSession processes one turn at a time. +// +// TESTABILITY: croner only computes the next fire time; the clock + timers + the +// fire callback are all injected, so tests drive ticks deterministically with no +// real wall-clock wait and no LLM. + +import { Cron } from "croner"; +import type { CronEntry } from "./index.js"; + +export type TimerHandle = ReturnType; + +export interface CronSchedulerDeps { + // The agent's cron entries (from bob.yaml `cron:`). + entries: CronEntry[]; + // What to do when an entry fires. In production this awaits the session's + // idle barrier then `session.prompt(entry.prompt)`. Serialized by the + // scheduler — never called concurrently with itself. + fire: (entry: CronEntry) => Promise; + // Logger seam. Defaults to console.error. + log?: (msg: string) => void; + // Clock seam (ms since epoch). Defaults to Date.now. + now?: () => number; + // Timer seams. Default to setTimeout/clearTimeout. Tests inject fakes to fire + // ticks deterministically. + setTimer?: (cb: () => void, ms: number) => TimerHandle; + clearTimer?: (h: TimerHandle) => void; +} + +export interface CronSchedulerHandle { + // Cancel all pending timers. Idempotent. The persistent runtime calls this on + // shutdown so a pending tick can't fire into a disposed session. + stop(): void; +} + +// Cron's smallest unit is 1 minute; clamp the computed delay so a slightly-past +// or clock-skewed nextRun can't busy-loop with a 0ms timer. +const MIN_DELAY_MS = 1_000; + +// Start scheduling the entries. Returns a handle whose stop() cancels everything. +// Each entry self-reschedules after firing (compute next → set a timer). An entry +// with an invalid cron expression is logged + skipped (it never fires) rather +// than taking down the whole scheduler. +export function startCronScheduler(deps: CronSchedulerDeps): CronSchedulerHandle { + const log = deps.log ?? ((m: string) => console.error(m)); + const now = deps.now ?? (() => Date.now()); + const setTimer = deps.setTimer ?? ((cb, ms) => setTimeout(cb, ms)); + const clearTimer = deps.clearTimer ?? ((h) => clearTimeout(h)); + + let stopped = false; + const timers = new Set(); + // Single promise chain — serializes all fires so two entries (or a re-entrant + // tick) never run concurrently. + let chain: Promise = Promise.resolve(); + + const scheduleNext = (entry: CronEntry, cron: Cron): void => { + if (stopped) return; + const next = cron.nextRun(new Date(now())); + if (!next) { + log(`cron: ${entry.name} has no future run — not rescheduling`); + return; + } + const delay = Math.max(MIN_DELAY_MS, next.getTime() - now()); + const handle = setTimer(() => { + timers.delete(handle); + if (stopped) return; + // Enqueue the fire on the serial chain, then reschedule the NEXT run. + chain = chain + .then(() => (stopped ? undefined : fireOne(entry))) + .catch((err) => { + const reason = err instanceof Error ? err.message : "cron fire failed"; + log(`cron: ${entry.name} fire error: ${reason}`); + }) + .finally(() => scheduleNext(entry, cron)); + }, delay); + timers.add(handle); + }; + + const fireOne = async (entry: CronEntry): Promise => { + log(`cron: firing ${entry.name}`); + await deps.fire(entry); + }; + + for (const entry of deps.entries) { + let cron: Cron; + try { + cron = new Cron(entry.schedule); + } catch (err) { + const reason = err instanceof Error ? err.message : "invalid cron expression"; + log(`cron: skipping ${entry.name} — bad schedule "${entry.schedule}": ${reason}`); + continue; + } + scheduleNext(entry, cron); + } + + return { + stop(): void { + stopped = true; + for (const h of timers) clearTimer(h); + timers.clear(); + }, + }; +} diff --git a/packages/shell/src/persistent.ts b/packages/shell/src/persistent.ts index f66440f..3321d61 100644 --- a/packages/shell/src/persistent.ts +++ b/packages/shell/src/persistent.ts @@ -28,6 +28,7 @@ import { homedir } from "node:os"; import { join } from "node:path"; import { SessionManager } from "@mariozechner/pi-coding-agent"; +import { type CronSchedulerHandle, startCronScheduler } from "./cron.js"; import { createPiRunSession, type RunSession, @@ -90,7 +91,7 @@ function neverResolves(): Promise { export async function startPersistent(opts: RunPersistentOptions): Promise { const log = opts.log ?? ((m: string) => console.error(m)); const root = opts.agentsRoot ?? join(homedir(), "agents"); - const { provider, model, config } = resolveRunConfig({ + const { provider, model, config, cron } = resolveRunConfig({ name: opts.name, agentsRoot: root, model: opts.model, @@ -101,12 +102,36 @@ export async function startPersistent(opts: RunPersistentOptions): Promise 0) { + log(`[bob] scheduling ${cron.length} cron job(s) for ${opts.name}`); + cronScheduler = startCronScheduler({ + entries: cron, + fire: async (entry) => { + try { + await session.waitForIdle?.(); + } catch { + // proceed — pi serializes turns regardless + } + await session.prompt(entry.prompt); + }, + log, + }); + } + let disposed = false; let disposing: Promise | undefined; const shutdown = async (): Promise => { if (disposed) return; if (disposing) return disposing; disposing = (async () => { + // Stop scheduling first so a pending cron tick can't fire into a session + // we're about to dispose. + cronScheduler?.stop(); // Await any in-flight turn so we don't cut off a reply mid-stream. The // RunSession seam exposes `prompt` but not an idle barrier; production's // pi AgentSession has `agent.waitForIdle()`. We call it best-effort diff --git a/packages/shell/src/run.ts b/packages/shell/src/run.ts index 42f85ee..b7127f4 100644 --- a/packages/shell/src/run.ts +++ b/packages/shell/src/run.ts @@ -34,7 +34,9 @@ import { ModelRegistry, SessionManager, } from "@mariozechner/pi-coding-agent"; +import { readCron } from "./bob-yaml.js"; import { capabilityConfigEnv, resolveCapabilities } from "./capability-loader.js"; +import type { CronEntry } from "./index.js"; // Same regex as init.ts AGENT_NAME — names are filesystem paths, keep them // strict-safe (no `..`, no `/`, no newlines). @@ -214,6 +216,26 @@ export interface ResolvedRunConfig { provider: string; model: string; config: RunSessionConfig; + // bob.yaml `cron:` entries (validated). Only the PERSISTENT runtime uses + // these (it schedules them into the live session); `bob run` ignores them. + cron: CronEntry[]; +} + +// Parse + validate bob.yaml `cron:` into CronEntry[]. Drops any entry missing +// name/schedule/prompt — a single malformed entry shouldn't stop the agent from +// starting (the scheduler additionally skips an unparseable schedule). +function parseCron(yamlText: string): CronEntry[] { + return readCron(yamlText) + .filter( + (e) => + typeof e.name === "string" && + e.name.length > 0 && + typeof e.schedule === "string" && + e.schedule.length > 0 && + typeof e.prompt === "string" && + e.prompt.length > 0, + ) + .map((e) => ({ name: e.name, schedule: e.schedule, prompt: e.prompt })); } export function resolveRunConfig(opts: ResolveRunConfigOptions): ResolvedRunConfig { @@ -249,7 +271,7 @@ export function resolveRunConfig(opts: ResolveRunConfigOptions): ResolvedRunConf extensionSources: resolution.extensionSources, capabilityEnv: capabilityConfigEnv(resolution), }; - return { agentDir, provider, model, config }; + return { agentDir, provider, model, config, cron: parseCron(yamlText) }; } // Real SDK factory: stand up a fresh, in-memory pi AgentSession for the agent, diff --git a/packages/shell/test/cron.test.ts b/packages/shell/test/cron.test.ts new file mode 100644 index 0000000..dfc8d56 --- /dev/null +++ b/packages/shell/test/cron.test.ts @@ -0,0 +1,154 @@ +import { describe, expect, it } from "bun:test"; +import { readCron } from "../src/bob-yaml.js"; +import { startCronScheduler, type TimerHandle } from "../src/cron.js"; + +// Fake timers: capture scheduled callbacks so a test can fire ticks +// deterministically (no real wall-clock wait). +function fakeTimers() { + const scheduled: Array<{ id: number; cb: () => void; ms: number }> = []; + let nextId = 0; + const setTimer = (cb: () => void, ms: number): TimerHandle => { + const id = ++nextId; + scheduled.push({ id, cb, ms }); + return id as unknown as TimerHandle; + }; + const clearTimer = (h: TimerHandle): void => { + const i = scheduled.findIndex((s) => s.id === (h as unknown as number)); + if (i >= 0) scheduled.splice(i, 1); + }; + // Fire every currently-scheduled timer once, then let the serial chain settle. + const tick = async (): Promise => { + const due = scheduled.splice(0, scheduled.length); + for (const s of due) s.cb(); + // Flush the scheduler's promise chain (fire → catch → finally reschedule). + await new Promise((r) => setTimeout(r, 5)); + }; + return { setTimer, clearTimer, scheduled, tick }; +} + +describe("startCronScheduler", () => { + const NOW = 1_700_000_000_000; // fixed ms + + it("schedules an entry, fires it on tick, then reschedules the next run", async () => { + const fired: string[] = []; + const t = fakeTimers(); + const h = startCronScheduler({ + entries: [{ name: "brief", schedule: "0 9 * * *", prompt: "compose the brief" }], + fire: async (e) => { + fired.push(e.name); + }, + now: () => NOW, + setTimer: t.setTimer, + clearTimer: t.clearTimer, + log: () => {}, + }); + + expect(t.scheduled).toHaveLength(1); // one pending timer for the entry + expect(t.scheduled[0]?.ms).toBeGreaterThan(0); // some future delay + + await t.tick(); + expect(fired).toEqual(["brief"]); // fired exactly once + expect(t.scheduled).toHaveLength(1); // and rescheduled the next run + + h.stop(); + expect(t.scheduled).toHaveLength(0); // stop cancels pending timers + }); + + it("skips an entry with an invalid cron expression (never schedules it)", () => { + const t = fakeTimers(); + const h = startCronScheduler({ + entries: [{ name: "bad", schedule: "not a cron expr", prompt: "x" }], + fire: async () => { + throw new Error("must not fire a bad-schedule entry"); + }, + now: () => NOW, + setTimer: t.setTimer, + clearTimer: t.clearTimer, + log: () => {}, + }); + expect(t.scheduled).toHaveLength(0); + h.stop(); + }); + + it("a valid entry still schedules even if a sibling has a bad schedule", () => { + const t = fakeTimers(); + const h = startCronScheduler({ + entries: [ + { name: "bad", schedule: "@@@", prompt: "x" }, + { name: "good", schedule: "*/5 * * * *", prompt: "y" }, + ], + fire: async () => {}, + now: () => NOW, + setTimer: t.setTimer, + clearTimer: t.clearTimer, + log: () => {}, + }); + expect(t.scheduled).toHaveLength(1); // only the good one + h.stop(); + }); + + it("a fire error doesn't stop future runs (reschedules anyway)", async () => { + let calls = 0; + const t = fakeTimers(); + const h = startCronScheduler({ + entries: [{ name: "flaky", schedule: "* * * * *", prompt: "p" }], + fire: async () => { + calls++; + throw new Error("transient"); + }, + now: () => NOW, + setTimer: t.setTimer, + clearTimer: t.clearTimer, + log: () => {}, + }); + await t.tick(); + expect(calls).toBe(1); + expect(t.scheduled).toHaveLength(1); // rescheduled despite the throw + h.stop(); + }); +}); + +describe("readCron", () => { + it("parses a block-sequence of cron maps", () => { + const yaml = [ + "agent:", + " id: pulse", + "cron:", + " - name: daily_intel", + ' schedule: "0 16 * * *"', + ' prompt: "Run the daily intel check."', + " - name: weekly", + ' schedule: "0 9 * * 5"', + ' prompt: "Weekly sweep."', + "channels:", + " tps_mail:", + " inbox: ~/x", + ].join("\n"); + const entries = readCron(yaml); + expect(entries).toHaveLength(2); + expect(entries[0]).toEqual({ + name: "daily_intel", + schedule: "0 16 * * *", + prompt: "Run the daily intel check.", + }); + expect(entries[1]?.name).toBe("weekly"); + }); + + it("returns [] when there is no cron block", () => { + expect(readCron("agent:\n id: pulse\n")).toEqual([]); + }); + + it("ignores comments + blank lines inside the block", () => { + const yaml = [ + "cron:", + " # a comment", + "", + " - name: x", + " schedule: '* * * * *'", + " prompt: 'go'", + ].join("\n"); + const entries = readCron(yaml); + expect(entries).toHaveLength(1); + expect(entries[0]?.name).toBe("x"); + }); +});