Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion packages/cli/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,13 @@ async function run(
console.error('bob run: a prompt is required, e.g. `bob run <name> "summarize my inbox"`');
return 2;
}
const result = await runAgent({ name, prompt, model });
// captureStdout collects the assistant's final text into result.stdout.
// runAgent itself is silent (it accumulates deltas, never writes to console),
// so without this `bob run` printed nothing — print the response here.
const result = await runAgent({ name, prompt, model, captureStdout: true });
if (result.stdout && result.stdout.trim().length > 0) {
console.log(result.stdout);
}
return result.exitCode;
}

Expand Down
1 change: 1 addition & 0 deletions packages/shell/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
},
"dependencies": {
"@mariozechner/pi-coding-agent": "0.73.1",
"croner": "9.0.0",
"typebox": "1.1.38"
},
"keywords": [
Expand Down
56 changes: 56 additions & 0 deletions packages/shell/src/bob-yaml.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,62 @@ export function readBlock(yamlText: string, key: string): Record<string, unknown
return found ? out : undefined;
}

// Read the top-level `cron:` block-sequence of maps into raw entries. Targets
// the exact shape `bob init` documents:
//
// cron:
// - name: morning_briefing
// schedule: "0 9 * * *"
// prompt: "Compose the brief."
//
// Each `- key: value` starts an entry; subsequent `key: value` lines indented
// deeper than the `-` add to it. A column-0 key (or EOF) ends the block. Values
// are coerced as scalars (quotes stripped). Returns [] when absent. The caller
// validates required keys (name/schedule/prompt) + maps to CronEntry — keeping
// this reader dependency-free + free of a layering cycle with index.ts.
export function readCron(yamlText: string): Array<Record<string, string>> {
const lines = yamlText.split(/\r?\n/);
const entries: Array<Record<string, string>> = [];
let inBlock = false;
let current: Record<string, string> | undefined;
let dashIndent = -1;

for (const rawLine of lines) {
if (/^[A-Za-z0-9_-]+\s*:/.test(rawLine)) {
inBlock = rawLine.match(/^([A-Za-z0-9_-]+)\s*:/)?.[1] === "cron";
current = undefined;
continue;
}
if (!inBlock) continue;
const t = rawLine.trim();
if (t === "" || t.startsWith("#")) continue;
const indent = rawLine.length - rawLine.replace(/^ +/, "").length;

if (t.startsWith("-")) {
// New entry. The text after "-" may be the first `key: value`.
current = {};
entries.push(current);
dashIndent = indent;
const after = t.slice(1).trim();
if (after) addCronKv(current, after);
} else if (current && indent > dashIndent) {
addCronKv(current, t);
} else {
// Unexpected shape under cron: — stop reading the block.
break;
}
}
return entries;
}

function addCronKv(obj: Record<string, string>, kv: string): void {
// Same `name: value` shape as readBlock — coerceScalar trims + strips quotes;
// cron values are all strings (name / cron-expr / prompt), so stringify.
const m = kv.match(/^([A-Za-z0-9_-]+)\s*:(.*)$/);
if (!m) return;
obj[m[1]] = String(coerceScalar(m[2].trim()));
}

function splitList(inner: string): string[] {
if (inner.trim() === "") return [];
return inner
Expand Down
116 changes: 116 additions & 0 deletions packages/shell/src/cron.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Scheduled work for the PERSISTENT runtime — `cron:` in bob.yaml.
//
// Bob's persistent session (`bob serve`) can run prompts on a schedule: each
// bob.yaml `cron:` entry { name, schedule (cron expr), prompt } fires its prompt
// INTO the live session on its cadence. This is how an agent does proactive work
// (e.g. Pulse's daily intel brief) without a second process — and crucially
// without a second Discord gateway connection (a `bob run` per tick would open a
// duplicate login for the same bot token and fight the persistent session). The
// scheduled prompt drives the same warm session that handles inbound, so the
// agent can use its capabilities (discord_reply, flair_*) to act on the tick.
//
// CONCURRENCY: fires are SERIALIZED through a single promise chain so two cron
// entries never run at once, and `fire` itself (in persistent.ts) awaits the
// session's idle barrier before prompting so a tick doesn't cut into an in-
// flight inbound turn. pi's AgentSession processes one turn at a time.
//
// TESTABILITY: croner only computes the next fire time; the clock + timers + the
// fire callback are all injected, so tests drive ticks deterministically with no
// real wall-clock wait and no LLM.

import { Cron } from "croner";
import type { CronEntry } from "./index.js";

export type TimerHandle = ReturnType<typeof setTimeout>;

export interface CronSchedulerDeps {
// The agent's cron entries (from bob.yaml `cron:`).
entries: CronEntry[];
// What to do when an entry fires. In production this awaits the session's
// idle barrier then `session.prompt(entry.prompt)`. Serialized by the
// scheduler — never called concurrently with itself.
fire: (entry: CronEntry) => Promise<void>;
// Logger seam. Defaults to console.error.
log?: (msg: string) => void;
// Clock seam (ms since epoch). Defaults to Date.now.
now?: () => number;
// Timer seams. Default to setTimeout/clearTimeout. Tests inject fakes to fire
// ticks deterministically.
setTimer?: (cb: () => void, ms: number) => TimerHandle;
clearTimer?: (h: TimerHandle) => void;
}

export interface CronSchedulerHandle {
// Cancel all pending timers. Idempotent. The persistent runtime calls this on
// shutdown so a pending tick can't fire into a disposed session.
stop(): void;
}

// Cron's smallest unit is 1 minute; clamp the computed delay so a slightly-past
// or clock-skewed nextRun can't busy-loop with a 0ms timer.
const MIN_DELAY_MS = 1_000;

// Start scheduling the entries. Returns a handle whose stop() cancels everything.
// Each entry self-reschedules after firing (compute next → set a timer). An entry
// with an invalid cron expression is logged + skipped (it never fires) rather
// than taking down the whole scheduler.
export function startCronScheduler(deps: CronSchedulerDeps): CronSchedulerHandle {
const log = deps.log ?? ((m: string) => console.error(m));
const now = deps.now ?? (() => Date.now());
const setTimer = deps.setTimer ?? ((cb, ms) => setTimeout(cb, ms));
const clearTimer = deps.clearTimer ?? ((h) => clearTimeout(h));

let stopped = false;
const timers = new Set<TimerHandle>();
// Single promise chain — serializes all fires so two entries (or a re-entrant
// tick) never run concurrently.
let chain: Promise<void> = Promise.resolve();

const scheduleNext = (entry: CronEntry, cron: Cron): void => {
if (stopped) return;
const next = cron.nextRun(new Date(now()));
if (!next) {
log(`cron: ${entry.name} has no future run — not rescheduling`);
return;
}
const delay = Math.max(MIN_DELAY_MS, next.getTime() - now());
const handle = setTimer(() => {
timers.delete(handle);
if (stopped) return;
// Enqueue the fire on the serial chain, then reschedule the NEXT run.
chain = chain
.then(() => (stopped ? undefined : fireOne(entry)))
.catch((err) => {
const reason = err instanceof Error ? err.message : "cron fire failed";
log(`cron: ${entry.name} fire error: ${reason}`);
})
.finally(() => scheduleNext(entry, cron));
}, delay);
timers.add(handle);
};

const fireOne = async (entry: CronEntry): Promise<void> => {
log(`cron: firing ${entry.name}`);
await deps.fire(entry);
};

for (const entry of deps.entries) {
let cron: Cron;
try {
cron = new Cron(entry.schedule);
} catch (err) {
const reason = err instanceof Error ? err.message : "invalid cron expression";
log(`cron: skipping ${entry.name} — bad schedule "${entry.schedule}": ${reason}`);
continue;
}
scheduleNext(entry, cron);
}

return {
stop(): void {
stopped = true;
for (const h of timers) clearTimer(h);
timers.clear();
},
};
}
27 changes: 26 additions & 1 deletion packages/shell/src/persistent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import { homedir } from "node:os";
import { join } from "node:path";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import { type CronSchedulerHandle, startCronScheduler } from "./cron.js";
import {
createPiRunSession,
type RunSession,
Expand Down Expand Up @@ -90,7 +91,7 @@ function neverResolves(): Promise<void> {
export async function startPersistent(opts: RunPersistentOptions): Promise<PersistentHandle> {
const log = opts.log ?? ((m: string) => console.error(m));
const root = opts.agentsRoot ?? join(homedir(), "agents");
const { provider, model, config } = resolveRunConfig({
const { provider, model, config, cron } = resolveRunConfig({
name: opts.name,
agentsRoot: root,
model: opts.model,
Expand All @@ -101,12 +102,36 @@ export async function startPersistent(opts: RunPersistentOptions): Promise<Persi

log(`[bob] persistent session up for ${opts.name} (${provider}/${model})`);

// Scheduled work: fire each bob.yaml `cron:` prompt INTO this live session on
// its cadence (one gateway, no second `bob run` process). Await the idle
// barrier first so a tick doesn't cut into an in-flight inbound turn; fires
// are serialized by the scheduler. No-op when the agent declares no cron.
let cronScheduler: CronSchedulerHandle | undefined;
if (cron.length > 0) {
log(`[bob] scheduling ${cron.length} cron job(s) for ${opts.name}`);
cronScheduler = startCronScheduler({
entries: cron,
fire: async (entry) => {
try {
await session.waitForIdle?.();
} catch {
// proceed — pi serializes turns regardless
}
await session.prompt(entry.prompt);
},
log,
});
}

let disposed = false;
let disposing: Promise<void> | undefined;
const shutdown = async (): Promise<void> => {
if (disposed) return;
if (disposing) return disposing;
disposing = (async () => {
// Stop scheduling first so a pending cron tick can't fire into a session
// we're about to dispose.
cronScheduler?.stop();
// Await any in-flight turn so we don't cut off a reply mid-stream. The
// RunSession seam exposes `prompt` but not an idle barrier; production's
// pi AgentSession has `agent.waitForIdle()`. We call it best-effort
Expand Down
24 changes: 23 additions & 1 deletion packages/shell/src/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ import {
ModelRegistry,
SessionManager,
} from "@mariozechner/pi-coding-agent";
import { readCron } from "./bob-yaml.js";
import { capabilityConfigEnv, resolveCapabilities } from "./capability-loader.js";
import type { CronEntry } from "./index.js";

// Same regex as init.ts AGENT_NAME — names are filesystem paths, keep them
// strict-safe (no `..`, no `/`, no newlines).
Expand Down Expand Up @@ -214,6 +216,26 @@ export interface ResolvedRunConfig {
provider: string;
model: string;
config: RunSessionConfig;
// bob.yaml `cron:` entries (validated). Only the PERSISTENT runtime uses
// these (it schedules them into the live session); `bob run` ignores them.
cron: CronEntry[];
}

// Parse + validate bob.yaml `cron:` into CronEntry[]. Drops any entry missing
// name/schedule/prompt — a single malformed entry shouldn't stop the agent from
// starting (the scheduler additionally skips an unparseable schedule).
function parseCron(yamlText: string): CronEntry[] {
return readCron(yamlText)
.filter(
(e) =>
typeof e.name === "string" &&
e.name.length > 0 &&
typeof e.schedule === "string" &&
e.schedule.length > 0 &&
typeof e.prompt === "string" &&
e.prompt.length > 0,
)
.map((e) => ({ name: e.name, schedule: e.schedule, prompt: e.prompt }));
}

export function resolveRunConfig(opts: ResolveRunConfigOptions): ResolvedRunConfig {
Expand Down Expand Up @@ -249,7 +271,7 @@ export function resolveRunConfig(opts: ResolveRunConfigOptions): ResolvedRunConf
extensionSources: resolution.extensionSources,
capabilityEnv: capabilityConfigEnv(resolution),
};
return { agentDir, provider, model, config };
return { agentDir, provider, model, config, cron: parseCron(yamlText) };
}

// Real SDK factory: stand up a fresh, in-memory pi AgentSession for the agent,
Expand Down
Loading
Loading