diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..819f629 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,11 @@ +node_modules +.git +.turbo +**/node_modules +**/.next +**/dist +**/coverage +**/.env +**/.env.* +apps/dashboard/.wrangler +apps/dashboard/.dev.vars* diff --git a/README.md b/README.md index 6387387..f1914de 100644 --- a/README.md +++ b/README.md @@ -212,6 +212,27 @@ The dev server uses `DEV_TUNNEL_URL` to allow the tunnel host and configure HMR | `pnpm --filter dashboard test` | Run tests | | `pnpm --filter dashboard deploy` | Build and deploy to Cloudflare Workers | +### Search-service scripts + +| Command | Description | +|---------|------------| +| `pnpm --filter @diffkit/search dev` | Run self-hosted search service locally | +| `pnpm --filter @diffkit/search start` | Start self-hosted search service | +| `pnpm --filter @diffkit/search check-types` | Type-check search service | +| `pnpm --filter @diffkit/search check` | Lint/format-check search service | + +## Self-hosted repo search service (Livegrep + apps/search) + +DiffKit's repo-search MVP is designed as: + +- Cloudflare Worker (`apps/dashboard`) for public API/control-plane orchestration. +- Self-hosted `apps/search` service for mirror sync/index workflow + search proxying. +- Livegrep backend for fast code search. + +For complete setup and deployment instructions (local + VPS), see: + +- `apps/search/README.md` + ## GitHub App Permissions Reference Expanding permissions after users have installed the app will require those installations to approve the new permission set. diff --git a/apps/dashboard/.dev.vars.example b/apps/dashboard/.dev.vars.example index 22ccee5..f6d0f82 100644 --- a/apps/dashboard/.dev.vars.example +++ b/apps/dashboard/.dev.vars.example @@ -114,3 +114,17 @@ DEV_TUNNEL_URL= # real bucket named there (see bucket_name). Your pub URL must be for that bucket — not only # preview_bucket_name (that name is for local simulation naming when not remote). R2_PUBLIC_BASE_URL= + +# ----------------------------------------------------------------------------- +# 7. Search service (apps/search + livegrep) +# ----------------------------------------------------------------------------- +# For local development with apps/search running on localhost:8910. +# The Worker fetches this endpoint for both query and control endpoints. +LIVEGREP_BASE_URL=http://127.0.0.1:8910 +SEARCH_CONTROL_BASE_URL=http://127.0.0.1:8910 + +# Must match SEARCH_CONTROL_TOKEN in apps/search environment (optional but recommended). +SEARCH_CONTROL_TOKEN=change-me + +# Optional: only needed if your apps/search endpoint requires bearer auth for /api/v1/search. 
+LIVEGREP_API_TOKEN= diff --git a/apps/dashboard/drizzle/0004_diffkit_search_mvp.sql b/apps/dashboard/drizzle/0004_diffkit_search_mvp.sql new file mode 100644 index 0000000..99d366f --- /dev/null +++ b/apps/dashboard/drizzle/0004_diffkit_search_mvp.sql @@ -0,0 +1,57 @@ +CREATE TABLE `search_repo_registry` ( + `id` text PRIMARY KEY NOT NULL, + `provider` text NOT NULL, + `owner` text NOT NULL, + `name` text NOT NULL, + `default_branch` text NOT NULL, + `is_enabled` integer NOT NULL, + `tier` text NOT NULL, + `last_seen_head_sha` text, + `last_indexed_head_sha` text, + `last_synced_at` integer, + `last_indexed_at` integer, + `status` text NOT NULL, + `is_private` integer NOT NULL, + `last_error` text, + `created_at` integer NOT NULL, + `updated_at` integer NOT NULL +); +--> statement-breakpoint +CREATE UNIQUE INDEX `search_repo_registry_provider_owner_name_uidx` ON `search_repo_registry` (`provider`,`owner`,`name`); +--> statement-breakpoint +CREATE INDEX `search_repo_registry_status_idx` ON `search_repo_registry` (`status`); +--> statement-breakpoint +CREATE INDEX `search_repo_registry_tier_status_idx` ON `search_repo_registry` (`tier`,`status`); +--> statement-breakpoint +CREATE INDEX `search_repo_registry_enabled_tier_idx` ON `search_repo_registry` (`is_enabled`,`tier`); +--> statement-breakpoint +CREATE TABLE `search_jobs` ( + `id` text PRIMARY KEY NOT NULL, + `repo_id` text NOT NULL, + `job_type` text NOT NULL, + `priority` text NOT NULL, + `status` text NOT NULL, + `attempt` integer NOT NULL, + `error` text, + `created_at` integer NOT NULL, + `updated_at` integer NOT NULL, + FOREIGN KEY (`repo_id`) REFERENCES `search_repo_registry`(`id`) ON UPDATE no action ON DELETE cascade +); +--> statement-breakpoint +CREATE INDEX `search_jobs_repo_type_status_idx` ON `search_jobs` (`repo_id`,`job_type`,`status`); +--> statement-breakpoint +CREATE INDEX `search_jobs_status_created_idx` ON `search_jobs` (`status`,`created_at`); +--> statement-breakpoint +CREATE TABLE `search_index_builds` ( + `id` text PRIMARY KEY NOT NULL, + `build_version` text NOT NULL, + `repo_count` integer NOT NULL, + `started_at` integer NOT NULL, + `finished_at` integer, + `status` text NOT NULL, + `manifest_r2_key` text +); +--> statement-breakpoint +CREATE UNIQUE INDEX `search_index_builds_build_version_uidx` ON `search_index_builds` (`build_version`); +--> statement-breakpoint +CREATE INDEX `search_index_builds_status_started_idx` ON `search_index_builds` (`status`,`started_at`); diff --git a/apps/dashboard/drizzle/meta/_journal.json b/apps/dashboard/drizzle/meta/_journal.json index b767305..ad8d69c 100644 --- a/apps/dashboard/drizzle/meta/_journal.json +++ b/apps/dashboard/drizzle/meta/_journal.json @@ -15,6 +15,13 @@ "when": 1775606872196, "tag": "0001_outstanding_blizzard", "breakpoints": true + }, + { + "idx": 2, + "version": "6", + "when": 1776921000000, + "tag": "0004_diffkit_search_mvp", + "breakpoints": true } ] } diff --git a/apps/dashboard/src/components/navigation/command-palette.tsx b/apps/dashboard/src/components/navigation/command-palette.tsx index 1a7b729..c0908a6 100644 --- a/apps/dashboard/src/components/navigation/command-palette.tsx +++ b/apps/dashboard/src/components/navigation/command-palette.tsx @@ -11,16 +11,21 @@ import { import { cn } from "@diffkit/ui/lib/utils"; import { useQuery, useQueryClient } from "@tanstack/react-query"; import { getRouteApi, useRouter } from "@tanstack/react-router"; +import type { ReactNode } from "react"; import { useEffect, useMemo, useRef, useState } from "react"; 
import type { CommandItem, CommandItemMeta } from "#/lib/command-palette/types"; import { cacheSearchResults, getCommandSearchItems, + getSearchCodeCommandItems, useCommandItems, } from "#/lib/command-palette/use-command-items"; import { useCommandPalette } from "#/lib/command-palette/use-command-palette"; import { formatRelativeTime } from "#/lib/format-relative-time"; -import { githubCommandPaletteSearchQueryOptions } from "#/lib/github.query"; +import { + codeSearchQueryOptions, + githubCommandPaletteSearchQueryOptions, +} from "#/lib/github.query"; const routeApi = getRouteApi("/_protected"); @@ -31,6 +36,7 @@ export function CommandPalette() { const { user } = routeApi.useRouteContext(); const scope = useMemo(() => ({ userId: user.id }), [user.id]); const [search, setSearch] = useState(""); + const [isCodeSearchDisabled, setIsCodeSearchDisabled] = useState(false); const debouncedSearch = useDebouncedValue(search, 250); const trimmedDebouncedSearch = debouncedSearch.trim(); const shouldSearchGitHub = open && trimmedDebouncedSearch.length >= 2; @@ -42,13 +48,39 @@ export function CommandPalette() { ), enabled: shouldSearchGitHub, }); + const codeSearchQuery = useQuery({ + ...codeSearchQueryOptions(scope, { + q: trimmedDebouncedSearch, + page: "1", + }), + enabled: shouldSearchGitHub && !isCodeSearchDisabled, + }); const searchItems = useMemo( () => getCommandSearchItems(githubSearchQuery.data), [githubSearchQuery.data], ); + const codeSearchItems = useMemo( + () => + getSearchCodeCommandItems(codeSearchQuery.data, async (item) => { + const [owner, repo, ...rest] = item.repo.split("/"); + if (!(owner && repo) || rest.length > 0) { + return; + } + const routeSplat = `main/${item.path}`; + await router.navigate({ + to: "/$owner/$repo/blob/$", + params: { + owner, + repo, + _splat: routeSplat, + }, + }); + }), + [codeSearchQuery.data, router], + ); const allItems = useMemo( - () => mergeCommandItems(items, searchItems), - [items, searchItems], + () => mergeCommandItems(items, searchItems, codeSearchItems), + [items, searchItems, codeSearchItems], ); const cachedSearchDataRef = useRef(githubSearchQuery.data); @@ -59,6 +91,12 @@ export function CommandPalette() { cacheSearchResults(queryClient, scope, data); }, [githubSearchQuery.data, queryClient, scope]); + useEffect(() => { + if (codeSearchQuery.data?.code_search_disabled) { + setIsCodeSearchDisabled(true); + } + }, [codeSearchQuery.data?.code_search_disabled]); + const groups = new Map(); for (const item of allItems) { const list = groups.get(item.group) ?? []; @@ -94,7 +132,9 @@ export function CommandPalette() { {getEmptyMessage( search, - shouldSearchGitHub && githubSearchQuery.isFetching, + shouldSearchGitHub && + (githubSearchQuery.isFetching || + (!isCodeSearchDisabled && codeSearchQuery.isFetching)), )} {Array.from(groups.entries()).map(([groupName, groupItems]) => ( @@ -110,10 +150,17 @@ export function CommandPalette() { className={cn("size-4 shrink-0", item.iconClassName)} /> )} -
-                  <div>
-                    <span>{item.label}</span>
-                    {item.meta && <ItemMeta meta={item.meta} />}
-                  </div>
+                  {item.meta?.codeSearch ? (
+                    <CodeSearchItemMeta
+                      meta={item.meta.codeSearch}
+                      query={trimmedDebouncedSearch}
+                    />
+                  ) : (
+                    <div>
+                      <span>{item.label}</span>
+                      {item.meta && <ItemMeta meta={item.meta} />}
+                    </div>
+ )} {item.meta?.comments != null && item.meta.comments > 0 && ( @@ -147,10 +194,11 @@ function useDebouncedValue(value: string, delayMs: number) { function mergeCommandItems( localItems: CommandItem[], searchItems: CommandItem[], + codeItems: CommandItem[], ) { const itemsById = new Map(); - for (const item of [...localItems, ...searchItems]) { + for (const item of [...localItems, ...searchItems, ...codeItems]) { if (!itemsById.has(item.id)) { itemsById.set(item.id, item); } @@ -203,3 +251,69 @@ function ItemMeta({ meta }: { meta: CommandItemMeta }) { ); } + +function CodeSearchItemMeta({ + meta, + query, +}: { + meta: NonNullable; + query: string; +}) { + return ( +
+    <div>
+      <div>
+        <span>{meta.repo}</span>
+      </div>
+      <div>
+        <span>{meta.path}</span>
+      </div>
+      <div>
+        {meta.snippets.map((snippet) => (
+          <div key={snippet.lineNumber}>
+            <span>{snippet.lineNumber}</span>
+            <span>{highlightQueryMatch(snippet.line, query)}</span>
+          </div>
+        ))}
+      </div>
+    </div>
+ ); +} + +function highlightQueryMatch(text: string, query: string): ReactNode { + const normalizedQuery = query.trim(); + if (!normalizedQuery) { + return text; + } + + const lowerText = text.toLowerCase(); + const lowerQuery = normalizedQuery.toLowerCase(); + const parts: ReactNode[] = []; + let cursor = 0; + let hitIndex = 0; + + while (cursor < text.length) { + const foundAt = lowerText.indexOf(lowerQuery, cursor); + if (foundAt === -1) { + parts.push(text.slice(cursor)); + break; + } + if (foundAt > cursor) { + parts.push(text.slice(cursor, foundAt)); + } + const end = foundAt + normalizedQuery.length; + parts.push( + + {text.slice(foundAt, end)} + , + ); + cursor = end; + } + + return parts; +} diff --git a/apps/dashboard/src/db/schema.ts b/apps/dashboard/src/db/schema.ts index a6b45d0..e2c3e15 100644 --- a/apps/dashboard/src/db/schema.ts +++ b/apps/dashboard/src/db/schema.ts @@ -1,4 +1,10 @@ -import { index, integer, sqliteTable, text } from "drizzle-orm/sqlite-core"; +import { + index, + integer, + sqliteTable, + text, + uniqueIndex, +} from "drizzle-orm/sqlite-core"; export const user = sqliteTable("user", { id: text("id").primaryKey(), @@ -93,3 +99,95 @@ export const githubCacheNamespace = sqliteTable("github_cache_namespace", { version: integer("version").notNull(), updatedAt: integer("updated_at").notNull(), }); + +export const searchRepoRegistry = sqliteTable( + "search_repo_registry", + { + id: text("id").primaryKey(), + provider: text("provider", { enum: ["github"] }).notNull(), + owner: text("owner").notNull(), + name: text("name").notNull(), + defaultBranch: text("default_branch").notNull(), + isEnabled: integer("is_enabled", { mode: "boolean" }).notNull(), + tier: text("tier", { enum: ["hot", "warm", "cold"] }).notNull(), + lastSeenHeadSha: text("last_seen_head_sha"), + lastIndexedHeadSha: text("last_indexed_head_sha"), + lastSyncedAt: integer("last_synced_at", { mode: "timestamp" }), + lastIndexedAt: integer("last_indexed_at", { mode: "timestamp" }), + status: text("status", { + enum: ["ready", "syncing", "indexing", "not_indexed", "failed"], + }).notNull(), + isPrivate: integer("is_private", { mode: "boolean" }).notNull(), + lastError: text("last_error"), + createdAt: integer("created_at", { mode: "timestamp" }).notNull(), + updatedAt: integer("updated_at", { mode: "timestamp" }).notNull(), + }, + (table) => ({ + providerOwnerNameUidx: uniqueIndex( + "search_repo_registry_provider_owner_name_uidx", + ).on(table.provider, table.owner, table.name), + statusIdx: index("search_repo_registry_status_idx").on(table.status), + tierStatusIdx: index("search_repo_registry_tier_status_idx").on( + table.tier, + table.status, + ), + enabledTierIdx: index("search_repo_registry_enabled_tier_idx").on( + table.isEnabled, + table.tier, + ), + }), +); + +export const searchJobs = sqliteTable( + "search_jobs", + { + id: text("id").primaryKey(), + repoId: text("repo_id") + .notNull() + .references(() => searchRepoRegistry.id, { onDelete: "cascade" }), + jobType: text("job_type", { enum: ["sync", "index"] }).notNull(), + priority: text("priority", { + enum: ["interactive", "normal", "backfill"], + }).notNull(), + status: text("status", { + enum: ["queued", "running", "done", "failed"], + }).notNull(), + attempt: integer("attempt").notNull(), + error: text("error"), + createdAt: integer("created_at", { mode: "timestamp" }).notNull(), + updatedAt: integer("updated_at", { mode: "timestamp" }).notNull(), + }, + (table) => ({ + repoTypeStatusIdx: index("search_jobs_repo_type_status_idx").on( + 
table.repoId, + table.jobType, + table.status, + ), + statusCreatedIdx: index("search_jobs_status_created_idx").on( + table.status, + table.createdAt, + ), + }), +); + +export const searchIndexBuilds = sqliteTable( + "search_index_builds", + { + id: text("id").primaryKey(), + buildVersion: text("build_version").notNull(), + repoCount: integer("repo_count").notNull(), + startedAt: integer("started_at", { mode: "timestamp" }).notNull(), + finishedAt: integer("finished_at", { mode: "timestamp" }), + status: text("status").notNull(), + manifestR2Key: text("manifest_r2_key"), + }, + (table) => ({ + buildVersionUidx: uniqueIndex("search_index_builds_build_version_uidx").on( + table.buildVersion, + ), + statusStartedIdx: index("search_index_builds_status_started_idx").on( + table.status, + table.startedAt, + ), + }), +); diff --git a/apps/dashboard/src/entry-worker.ts b/apps/dashboard/src/entry-worker.ts index 52d434d..09e0535 100644 --- a/apps/dashboard/src/entry-worker.ts +++ b/apps/dashboard/src/entry-worker.ts @@ -1,4 +1,9 @@ import startEntry from "@tanstack/react-start/server-entry"; +import { + handleSearchQueue, + handleSearchScheduled, + maybeHandleSearchRequest, +} from "#/lib/search-worker"; export { SignalRelay } from "./lib/signal-relay.server"; @@ -53,6 +58,14 @@ export default { ): Promise { const url = new URL(request.url); + const searchResponse = await maybeHandleSearchRequest({ + request, + env: env as unknown as Cloudflare.Env, + }); + if (searchResponse) { + return applySecurityHeaders(searchResponse); + } + if ( url.pathname === "/api/ws/signals" && request.headers.get("Upgrade") === "websocket" @@ -87,4 +100,27 @@ export default { return applySecurityHeaders(response); }, + async queue( + batch: MessageBatch, + env: Record, + ctx: ExecutionContext, + ) { + await handleSearchQueue({ + batch: batch as MessageBatch< + import("#/lib/search-worker").SearchQueueMessage + >, + env: env as unknown as Cloudflare.Env, + ctx, + }); + }, + async scheduled( + _controller: ScheduledController, + env: Record, + ctx: ExecutionContext, + ) { + await handleSearchScheduled({ + env: env as unknown as Cloudflare.Env, + ctx, + }); + }, }; diff --git a/apps/dashboard/src/env.d.ts b/apps/dashboard/src/env.d.ts index e07a36c..62f5931 100644 --- a/apps/dashboard/src/env.d.ts +++ b/apps/dashboard/src/env.d.ts @@ -5,6 +5,10 @@ declare namespace Cloudflare { interface Env { /** Public base URL for R2 comment media (custom domain or r2.dev). 
*/ R2_PUBLIC_BASE_URL?: string; + LIVEGREP_BASE_URL?: string; + LIVEGREP_API_TOKEN?: string; + SEARCH_CONTROL_BASE_URL?: string; + SEARCH_CONTROL_TOKEN?: string; GITHUB_OAUTH_CLIENT_ID?: string; GITHUB_OAUTH_CLIENT_SECRET?: string; GITHUB_APP_CLIENT_ID?: string; @@ -18,5 +22,8 @@ declare namespace Cloudflare { BETTER_AUTH_SECRET: string; BETTER_AUTH_URL: string; SIGNAL_RELAY: DurableObjectNamespace; + REPO_SYNC_QUEUE?: Queue; + INDEX_BUILD_QUEUE?: Queue; + SEARCH_INDEX_ARTIFACTS?: R2Bucket; } } diff --git a/apps/dashboard/src/lib/command-palette/types.ts b/apps/dashboard/src/lib/command-palette/types.ts index ee181d1..996e502 100644 --- a/apps/dashboard/src/lib/command-palette/types.ts +++ b/apps/dashboard/src/lib/command-palette/types.ts @@ -10,6 +10,15 @@ export type CommandItemMeta = { updatedAt?: string; language?: string | null; stars?: number; + codeSearch?: { + repo: string; + path: string; + totalMatches: number; + snippets: Array<{ + lineNumber: number; + line: string; + }>; + }; }; export type CommandItem = { diff --git a/apps/dashboard/src/lib/command-palette/use-command-items.ts b/apps/dashboard/src/lib/command-palette/use-command-items.ts index 8fff257..d9958e9 100644 --- a/apps/dashboard/src/lib/command-palette/use-command-items.ts +++ b/apps/dashboard/src/lib/command-palette/use-command-items.ts @@ -1,5 +1,6 @@ import { CodeIcon, + FileIcon, GitMergeIcon, GitPullRequestClosedIcon, GitPullRequestDraftIcon, @@ -20,6 +21,10 @@ import type { PullSummary, UserRepoSummary, } from "#/lib/github.types"; +import type { + SearchCodeResponse, + SearchCodeResultItem, +} from "#/lib/search.types"; import { getRegisteredCommands, subscribeCommands } from "./registry"; import type { CommandItem } from "./types"; @@ -260,6 +265,73 @@ export function getCommandSearchItems( return items; } +export function getSearchCodeCommandItems( + result: SearchCodeResponse | undefined, + onOpenResult: (item: SearchCodeResultItem) => void | Promise, +): CommandItem[] { + if (!result) { + return []; + } + + const grouped = new Map(); + for (const item of result.results.slice(0, 50)) { + const key = `${item.repo}:${item.path}`; + const existing = grouped.get(key); + if (existing) { + existing.push(item); + } else { + grouped.set(key, [item]); + } + } + + const items: CommandItem[] = []; + for (const itemsForFile of Array.from(grouped.values()).slice(0, 12)) { + const first = itemsForFile[0]; + if (!first) { + continue; + } + const snippets = itemsForFile + .slice() + .sort((a, b) => a.line_number - b.line_number) + .filter( + (item, index, list) => + index === 0 || item.line_number !== list[index - 1]?.line_number, + ) + .slice(0, 3) + .map((item) => ({ + lineNumber: item.line_number, + line: item.line, + })); + + items.push({ + id: `code-search:${first.repo}:${first.path}`, + label: first.path, + group: "Code Search", + icon: FileIcon, + keywords: [ + first.repo, + first.path, + ...snippets.map((snippet) => snippet.line), + ].filter(Boolean), + action: { + type: "execute", + fn: () => onOpenResult(first), + }, + meta: { + repo: first.repo, + codeSearch: { + repo: first.repo, + path: first.path, + totalMatches: itemsForFile.length, + snippets, + }, + }, + }); + } + + return items; +} + export function cacheSearchResults( queryClient: QueryClient, scope: GitHubQueryScope, diff --git a/apps/dashboard/src/lib/github.query.ts b/apps/dashboard/src/lib/github.query.ts index 154f9a4..caa31c3 100644 --- a/apps/dashboard/src/lib/github.query.ts +++ b/apps/dashboard/src/lib/github.query.ts @@ -53,6 +53,8 @@ import { 
import { githubCachePolicy } from "./github-cache-policy"; import { ensureDefinedQueryData } from "./query-data"; import type { ReposHubInput } from "./repos-hub-filter"; +import { searchCode } from "./search.functions"; +import type { SearchCodeInput } from "./search.types"; type RepoState = "all" | "closed" | "open"; type PullSort = "created" | "long-running" | "popularity" | "updated"; @@ -145,6 +147,8 @@ export const githubQueryKeys = { scope: GitHubQueryScope, input: CommandPaletteSearchInput, ) => ["github", scope.userId, "search", "commandPalette", input] as const, + code: (scope: GitHubQueryScope, input: SearchCodeInput) => + ["github", scope.userId, "search", "code", input] as const, }, pulls: { mine: (scope: GitHubQueryScope) => @@ -343,6 +347,18 @@ export function githubCommandPaletteSearchQueryOptions( }); } +export function codeSearchQueryOptions( + scope: GitHubQueryScope, + input: SearchCodeInput, +) { + return queryOptions({ + queryKey: githubQueryKeys.search.code(scope, input), + queryFn: () => searchCode({ data: input }), + staleTime: 30 * 1000, + gcTime: 5 * 60 * 1000, + }); +} + export function githubMyPullsQueryOptions(scope: GitHubQueryScope) { return queryOptions({ queryKey: githubQueryKeys.pulls.mine(scope), diff --git a/apps/dashboard/src/lib/search-worker.ts b/apps/dashboard/src/lib/search-worker.ts new file mode 100644 index 0000000..b614bbd --- /dev/null +++ b/apps/dashboard/src/lib/search-worker.ts @@ -0,0 +1,1185 @@ +import { and, desc, eq, isNull, lte, or } from "drizzle-orm"; +import { getDb } from "#/db"; +import { searchIndexBuilds, searchJobs, searchRepoRegistry } from "#/db/schema"; +import { getAuth } from "#/lib/auth.server"; +import { getGitHubClientByUserId } from "#/lib/auth-runtime"; +import { PRIVATE_ROUTE_HEADERS } from "#/lib/seo"; + +type SearchRepoTier = "hot" | "warm" | "cold"; +type SearchRepoStatus = + | "ready" + | "syncing" + | "indexing" + | "not_indexed" + | "failed"; +type SearchJobType = "sync" | "index"; +type SearchJobPriority = "interactive" | "normal" | "backfill"; +type SearchJobStatus = "queued" | "running" | "done" | "failed"; + +export type SearchQueueMessage = { + jobId: string; + repoId: string; + jobType: SearchJobType; + priority: SearchJobPriority; + trigger: "bootstrap" | "scheduled" | "retry" | "not_indexed"; +}; + +type SearchRepoRegistryRow = typeof searchRepoRegistry.$inferSelect; +type SearchJobRow = typeof searchJobs.$inferSelect; + +const REPO_PROVIDER = "github"; +const DEFAULT_REPO_TIER: SearchRepoTier = "hot"; +const REPO_SYNC_CADENCE_SECONDS: Record = { + hot: 15 * 60, + warm: 3 * 60 * 60, + cold: 24 * 60 * 60, +}; +const MAX_REPO_SIZE_MB = 10_000; +const MAX_REPO_SIZE_KB = MAX_REPO_SIZE_MB * 1024; +const MAX_QUEUE_RETRIES = 3; +const MANIFEST_RETENTION_DAYS = 30; + +function nowSeconds() { + return Math.floor(Date.now() / 1000); +} + +function toEpochSeconds(value: Date | number | null) { + if (typeof value === "number") { + return value; + } + return value ? 
Math.floor(value.getTime() / 1000) : null; +} + +function traceIdFromRequest(request: Request) { + const cfRay = request.headers.get("cf-ray"); + return cfRay || crypto.randomUUID(); +} + +function json(body: unknown, status = 200, headers?: HeadersInit): Response { + const response = Response.json(body, { status }); + response.headers.set("X-Robots-Tag", PRIVATE_ROUTE_HEADERS["X-Robots-Tag"]); + if (headers) { + for (const [headerName, headerValue] of Object.entries(headers)) { + response.headers.set(headerName, headerValue); + } + } + return response; +} + +async function requireSession(request: Request) { + const session = await getAuth().api.getSession({ + headers: request.headers, + }); + return session; +} + +function parseRepoRef(repoRef: string | null) { + if (!repoRef) { + return null; + } + const [owner, name, ...rest] = repoRef + .split("/") + .map((segment) => segment.trim()) + .filter(Boolean); + if (!owner || !name || rest.length > 0) { + return null; + } + return { owner, name }; +} + +async function getRepoByOwnerName(owner: string, name: string) { + const db = getDb(); + return db + .select() + .from(searchRepoRegistry) + .where( + and( + eq(searchRepoRegistry.provider, REPO_PROVIDER), + eq(searchRepoRegistry.owner, owner), + eq(searchRepoRegistry.name, name), + ), + ) + .get(); +} + +async function getRepoById(id: string) { + const db = getDb(); + return db + .select() + .from(searchRepoRegistry) + .where(eq(searchRepoRegistry.id, id)) + .get(); +} + +async function fetchGitHubRepoForUser({ + name, + owner, + userId, +}: { + owner: string; + name: string; + userId: string; +}) { + const github = await getGitHubClientByUserId(userId); + const { data } = await github.request("GET /repos/{owner}/{repo}", { + owner, + repo: name, + }); + return data; +} + +async function ensurePrivateRepoAccess({ + repo, + userId, +}: { + repo: SearchRepoRegistryRow; + userId: string; +}) { + if (!repo.isPrivate) { + return true; + } + try { + await fetchGitHubRepoForUser({ + userId, + owner: repo.owner, + name: repo.name, + }); + return true; + } catch { + return false; + } +} + +function etaBucketForTier(tier: SearchRepoTier) { + if (tier === "hot") { + return "<10m"; + } + if (tier === "warm") { + return "10-30m"; + } + return ">30m"; +} + +async function createSearchJob({ + jobType, + priority, + repoId, + status, + error, +}: { + repoId: string; + jobType: SearchJobType; + priority: SearchJobPriority; + status?: SearchJobStatus; + error?: string | null; +}) { + const db = getDb(); + const createdAt = nowSeconds(); + const duplicateQueuedJob = await db + .select({ id: searchJobs.id }) + .from(searchJobs) + .where( + and( + eq(searchJobs.repoId, repoId), + eq(searchJobs.jobType, jobType), + eq(searchJobs.status, "queued"), + ), + ) + .get(); + if (duplicateQueuedJob) { + return duplicateQueuedJob.id; + } + const jobId = crypto.randomUUID(); + await db.insert(searchJobs).values({ + id: jobId, + repoId, + jobType, + priority, + status: status ?? "queued", + attempt: 0, + error: error ?? 
null, + createdAt: new Date(createdAt * 1000), + updatedAt: new Date(createdAt * 1000), + }); + return jobId; +} + +async function enqueueSearchJob({ + jobType, + priority, + repoId, + trigger, + env, +}: { + repoId: string; + jobType: SearchJobType; + priority: SearchJobPriority; + trigger: SearchQueueMessage["trigger"]; + env: Cloudflare.Env; +}) { + const jobId = await createSearchJob({ + repoId, + jobType, + priority, + }); + const payload: SearchQueueMessage = { + jobId, + repoId, + jobType, + priority, + trigger, + }; + + if (jobType === "sync" && env.REPO_SYNC_QUEUE) { + await env.REPO_SYNC_QUEUE.send(payload); + return jobId; + } + if (jobType === "index" && env.INDEX_BUILD_QUEUE) { + await env.INDEX_BUILD_QUEUE.send(payload); + return jobId; + } + + // If queues are not configured yet, keep durable metadata by marking as failed. + const db = getDb(); + await db + .update(searchJobs) + .set({ + status: "failed", + error: "Queue binding missing", + attempt: 1, + updatedAt: new Date(nowSeconds() * 1000), + }) + .where(eq(searchJobs.id, jobId)); + + return jobId; +} + +async function upsertRepoFromGitHub({ + name, + owner, + userId, +}: { + owner: string; + name: string; + userId: string; +}) { + const githubRepo = await fetchGitHubRepoForUser({ + owner, + name, + userId, + }); + if ((githubRepo.size ?? 0) > MAX_REPO_SIZE_KB) { + throw new Error( + `Repository exceeds max size ${MAX_REPO_SIZE_MB}MB for MVP onboarding.`, + ); + } + + const db = getDb(); + const existing = await getRepoByOwnerName(owner, name); + const now = nowSeconds(); + const values = { + provider: "github" as const, + owner, + name, + defaultBranch: githubRepo.default_branch || "main", + isEnabled: true, + tier: DEFAULT_REPO_TIER, + status: "not_indexed" as const, + isPrivate: Boolean(githubRepo.private), + lastError: null, + updatedAt: new Date(now * 1000), + }; + + if (existing) { + await db + .update(searchRepoRegistry) + .set(values) + .where(eq(searchRepoRegistry.id, existing.id)); + return { + repoId: existing.id, + isPrivate: Boolean(githubRepo.private), + defaultBranch: values.defaultBranch, + }; + } + + const repoId = crypto.randomUUID(); + await db.insert(searchRepoRegistry).values({ + id: repoId, + ...values, + createdAt: new Date(now * 1000), + lastIndexedAt: null, + lastIndexedHeadSha: null, + lastSeenHeadSha: null, + lastSyncedAt: null, + }); + + return { + repoId, + isPrivate: Boolean(githubRepo.private), + defaultBranch: values.defaultBranch, + }; +} + +async function ensureRepoBootstrapJobs({ + env, + repoId, + priority, + trigger, +}: { + env: Cloudflare.Env; + repoId: string; + priority: SearchJobPriority; + trigger: SearchQueueMessage["trigger"]; +}) { + await enqueueSearchJob({ + env, + repoId, + jobType: "sync", + priority, + trigger, + }); + await enqueueSearchJob({ + env, + repoId, + jobType: "index", + priority, + trigger, + }); +} + +function parseSearchResultItem(item: unknown) { + if (!item || typeof item !== "object") { + return null; + } + const row = item as Record; + const repo = + typeof row.repo === "string" + ? row.repo + : typeof row.tree === "string" + ? row.tree + : null; + const path = typeof row.path === "string" ? row.path : null; + const line = typeof row.line === "string" ? row.line : ""; + const lineNumberRaw = row.line_number ?? row.lno; + const lineNumber = + typeof lineNumberRaw === "number" + ? lineNumberRaw + : typeof lineNumberRaw === "string" + ? 
Number.parseInt(lineNumberRaw, 10) + : Number.NaN; + + if ( + !repo || + !path || + !Number.isFinite(lineNumber) || + (lineNumber as number) <= 0 + ) { + return null; + } + + return { + repo, + path, + line_number: lineNumber, + line, + context_before: Array.isArray(row.context_before) + ? row.context_before.filter( + (entry): entry is string => typeof entry === "string", + ) + : [], + context_after: Array.isArray(row.context_after) + ? row.context_after.filter( + (entry): entry is string => typeof entry === "string", + ) + : [], + }; +} + +function normalizeLivegrepResults(payload: unknown) { + if (!payload || typeof payload !== "object") { + return { + results: [] as Array>, + partial: true, + }; + } + const record = payload as Record; + const source = Array.isArray(record.results) + ? record.results + : Array.isArray(record.data) + ? record.data + : []; + const results = source + .map(parseSearchResultItem) + .filter( + (item): item is NonNullable> => + Boolean(item), + ); + const partial = Boolean(record.partial); + return { + results, + partial, + }; +} + +async function queryLivegrep({ + env, + q, + repo, + path, + lang, + page, + traceId, +}: { + env: Cloudflare.Env; + q: string; + repo?: string | null; + path?: string | null; + lang?: string | null; + page?: string | null; + traceId: string; +}) { + if (!env.LIVEGREP_BASE_URL) { + throw new Error("LIVEGREP_BASE_URL is not configured"); + } + const endpoint = new URL("/api/v1/search", env.LIVEGREP_BASE_URL); + endpoint.searchParams.set("q", q); + if (repo) endpoint.searchParams.set("repo", repo); + if (path) endpoint.searchParams.set("path", path); + if (lang) endpoint.searchParams.set("lang", lang); + if (page) endpoint.searchParams.set("page", page); + + const response = await fetch(endpoint, { + method: "GET", + headers: { + Accept: "application/json", + ...(env.LIVEGREP_API_TOKEN + ? { Authorization: `Bearer ${env.LIVEGREP_API_TOKEN}` } + : {}), + "X-Trace-Id": traceId, + }, + signal: AbortSignal.timeout(7_500), + }); + + if (!response.ok) { + throw new Error(`Livegrep search failed with status ${response.status}`); + } + + return normalizeLivegrepResults(await response.json()); +} + +function toRepoStatusPayload(repo: SearchRepoRegistryRow) { + return { + status: + repo.status === "not_indexed" ? "NOT_INDEXED" : repo.status.toUpperCase(), + last_indexed_head_sha: repo.lastIndexedHeadSha, + last_seen_head_sha: repo.lastSeenHeadSha, + last_synced_at: toEpochSeconds(repo.lastSyncedAt), + last_indexed_at: toEpochSeconds(repo.lastIndexedAt), + tier: repo.tier, + eta_bucket: + repo.status === "not_indexed" ? etaBucketForTier(repo.tier) : undefined, + }; +} + +async function handleSearchGet({ + env, + request, +}: { + request: Request; + env: Cloudflare.Env; +}) { + const session = await requireSession(request); + if (!session) { + return json({ error: "Unauthorized" }, 401); + } + + const traceId = traceIdFromRequest(request); + const url = new URL(request.url); + const q = url.searchParams.get("q")?.trim() ?? 
""; + if (!q) { + return json( + { error: "Missing required query param q", trace_id: traceId }, + 400, + ); + } + + const repoParam = url.searchParams.get("repo"); + const path = url.searchParams.get("path"); + const lang = url.searchParams.get("lang"); + const page = url.searchParams.get("page"); + + const repoStatus: Record> = {}; + let repoFilterForLivegrep: string | null = null; + + if (repoParam) { + const parsedRepo = parseRepoRef(repoParam); + if (!parsedRepo) { + return json( + { error: "repo must be formatted as owner/name", trace_id: traceId }, + 400, + ); + } + + let repo = await getRepoByOwnerName(parsedRepo.owner, parsedRepo.name); + if (!repo) { + try { + const created = await upsertRepoFromGitHub({ + owner: parsedRepo.owner, + name: parsedRepo.name, + userId: session.user.id, + }); + await ensureRepoBootstrapJobs({ + env, + repoId: created.repoId, + priority: "interactive", + trigger: "not_indexed", + }); + repo = await getRepoById(created.repoId); + } catch (error) { + return json( + { + error: + error instanceof Error + ? error.message + : "Unable to register repository for search", + trace_id: traceId, + }, + 403, + ); + } + } + + if (!repo) { + return json( + { error: "Repository could not be resolved", trace_id: traceId }, + 404, + ); + } + + if (!(await ensurePrivateRepoAccess({ repo, userId: session.user.id }))) { + return json( + { error: "Forbidden for private repository", trace_id: traceId }, + 403, + ); + } + + repoStatus[`${repo.owner}/${repo.name}`] = toRepoStatusPayload(repo); + repoFilterForLivegrep = `${repo.owner}/${repo.name}`; + + if (repo.status === "not_indexed") { + await ensureRepoBootstrapJobs({ + env, + repoId: repo.id, + priority: "interactive", + trigger: "not_indexed", + }); + return json({ + results: [], + repo_status: repoStatus, + partial: false, + trace_id: traceId, + }); + } + } + + const normalized = await queryLivegrep({ + env, + q, + repo: repoFilterForLivegrep, + path, + lang, + page, + traceId, + }); + + return json({ + results: normalized.results, + repo_status: repoStatus, + partial: normalized.partial, + trace_id: traceId, + }); +} + +async function handleRepoOnboarding({ + env, + request, +}: { + request: Request; + env: Cloudflare.Env; +}) { + const session = await requireSession(request); + if (!session) { + return json({ error: "Unauthorized" }, 401); + } + + let payload: unknown; + try { + payload = await request.json(); + } catch { + return json({ error: "Invalid JSON body" }, 400); + } + + if (!payload || typeof payload !== "object") { + return json({ error: "Invalid payload" }, 400); + } + const { provider, owner, name } = payload as Record; + if (provider !== REPO_PROVIDER) { + return json({ error: "Unsupported provider" }, 400); + } + if ( + typeof owner !== "string" || + owner.trim().length === 0 || + typeof name !== "string" || + name.trim().length === 0 + ) { + return json({ error: "owner and name are required" }, 400); + } + + try { + const repo = await upsertRepoFromGitHub({ + owner: owner.trim(), + name: name.trim(), + userId: session.user.id, + }); + await ensureRepoBootstrapJobs({ + env, + repoId: repo.repoId, + priority: "interactive", + trigger: "bootstrap", + }); + const saved = await getRepoById(repo.repoId); + return json( + { + repo: saved + ? 
{ + id: saved.id, + provider: saved.provider, + owner: saved.owner, + name: saved.name, + default_branch: saved.defaultBranch, + status: saved.status, + tier: saved.tier, + is_private: saved.isPrivate, + } + : null, + job_priority: "interactive", + }, + 202, + ); + } catch (error) { + const message = + error instanceof Error ? error.message : "Failed to onboard repository"; + return json({ error: message }, 400); + } +} + +async function getLatestFailedJob(repoId: string) { + const db = getDb(); + return db + .select() + .from(searchJobs) + .where(and(eq(searchJobs.repoId, repoId), eq(searchJobs.status, "failed"))) + .orderBy(desc(searchJobs.updatedAt)) + .get(); +} + +async function handleRepoStatus({ + request, + repoId, +}: { + request: Request; + repoId: string; +}) { + const session = await requireSession(request); + if (!session) { + return json({ error: "Unauthorized" }, 401); + } + + const repo = await getRepoById(repoId); + if (!repo) { + return json({ error: "Repository not found" }, 404); + } + + if (!(await ensurePrivateRepoAccess({ repo, userId: session.user.id }))) { + return json({ error: "Forbidden for private repository" }, 403); + } + + const now = nowSeconds(); + const lastIndexedAt = toEpochSeconds(repo.lastIndexedAt); + const latestFailedJob = await getLatestFailedJob(repo.id); + + return json({ + repo_id: repo.id, + provider: repo.provider, + owner: repo.owner, + name: repo.name, + status: repo.status, + last_indexed_commit: repo.lastIndexedHeadSha, + staleness_seconds: lastIndexedAt ? Math.max(0, now - lastIndexedAt) : null, + latest_error: latestFailedJob?.error ?? repo.lastError ?? null, + last_synced_at: toEpochSeconds(repo.lastSyncedAt), + last_indexed_at: lastIndexedAt, + tier: repo.tier, + }); +} + +async function fetchSearchControl({ + env, + path, + body, + traceId, +}: { + env: Cloudflare.Env; + path: string; + body: Record; + traceId: string; +}) { + const baseUrl = env.SEARCH_CONTROL_BASE_URL || env.LIVEGREP_BASE_URL; + if (!baseUrl) { + throw new Error("SEARCH_CONTROL_BASE_URL is not configured"); + } + + const response = await fetch(new URL(path, baseUrl), { + method: "POST", + headers: { + "Content-Type": "application/json", + Accept: "application/json", + ...(env.SEARCH_CONTROL_TOKEN + ? { Authorization: `Bearer ${env.SEARCH_CONTROL_TOKEN}` } + : {}), + "X-Trace-Id": traceId, + }, + body: JSON.stringify(body), + signal: AbortSignal.timeout(20_000), + }); + + if (!response.ok) { + throw new Error(`Search control call failed with ${response.status}`); + } + + try { + return (await response.json()) as Record; + } catch { + return {}; + } +} + +async function setJobStatus({ + jobId, + status, + error, + attempt, +}: { + jobId: string; + status: SearchJobStatus; + error?: string | null; + attempt?: number; +}) { + const db = getDb(); + await db + .update(searchJobs) + .set({ + status, + error: error ?? null, + ...(typeof attempt === "number" ? { attempt } : {}), + updatedAt: new Date(nowSeconds() * 1000), + }) + .where(eq(searchJobs.id, jobId)); +} + +async function setRepoStatus({ + repoId, + status, + lastError, + lastSeenHeadSha, + lastIndexedHeadSha, + lastSyncedAt, + lastIndexedAt, +}: { + repoId: string; + status: SearchRepoStatus; + lastError?: string | null; + lastSeenHeadSha?: string | null; + lastIndexedHeadSha?: string | null; + lastSyncedAt?: number | null; + lastIndexedAt?: number | null; +}) { + const db = getDb(); + await db + .update(searchRepoRegistry) + .set({ + status, + ...(lastError !== undefined ? 
{ lastError } : {}), + ...(lastSeenHeadSha !== undefined ? { lastSeenHeadSha } : {}), + ...(lastIndexedHeadSha !== undefined ? { lastIndexedHeadSha } : {}), + ...(lastSyncedAt !== undefined + ? { + lastSyncedAt: + lastSyncedAt === null ? null : new Date(lastSyncedAt * 1000), + } + : {}), + ...(lastIndexedAt !== undefined + ? { + lastIndexedAt: + lastIndexedAt === null ? null : new Date(lastIndexedAt * 1000), + } + : {}), + updatedAt: new Date(nowSeconds() * 1000), + }) + .where(eq(searchRepoRegistry.id, repoId)); +} + +async function runSyncJob({ + env, + job, + message, +}: { + env: Cloudflare.Env; + job: SearchJobRow; + message: SearchQueueMessage; +}) { + await setJobStatus({ + jobId: job.id, + status: "running", + attempt: job.attempt + 1, + }); + await setRepoStatus({ + repoId: job.repoId, + status: "syncing", + lastError: null, + }); + + const traceId = crypto.randomUUID(); + const repo = await getRepoById(job.repoId); + if (!repo) { + throw new Error("Repository not found for sync job"); + } + + const payload = await fetchSearchControl({ + env, + path: "/internal/repos/sync", + body: { + repo_id: repo.id, + provider: repo.provider, + owner: repo.owner, + name: repo.name, + default_branch: repo.defaultBranch, + trigger: message.trigger, + }, + traceId, + }); + + const syncedAt = nowSeconds(); + const headSha = + typeof payload.head_sha === "string" + ? payload.head_sha + : repo.lastSeenHeadSha; + + await setRepoStatus({ + repoId: repo.id, + status: repo.lastIndexedHeadSha === headSha ? "ready" : "not_indexed", + lastSeenHeadSha: headSha, + lastSyncedAt: syncedAt, + lastError: null, + }); + await setJobStatus({ jobId: job.id, status: "done", error: null }); + + if (repo.lastIndexedHeadSha !== headSha) { + await enqueueSearchJob({ + env, + repoId: repo.id, + jobType: "index", + priority: "normal", + trigger: "scheduled", + }); + } +} + +async function runIndexJob({ + env, + job, + message, +}: { + env: Cloudflare.Env; + job: SearchJobRow; + message: SearchQueueMessage; +}) { + await setJobStatus({ + jobId: job.id, + status: "running", + attempt: job.attempt + 1, + }); + await setRepoStatus({ + repoId: job.repoId, + status: "indexing", + lastError: null, + }); + + const traceId = crypto.randomUUID(); + const repo = await getRepoById(job.repoId); + if (!repo) { + throw new Error("Repository not found for index job"); + } + + const buildId = crypto.randomUUID(); + const startAt = nowSeconds(); + const db = getDb(); + const buildVersion = `${startAt}-${repo.id}`; + await db.insert(searchIndexBuilds).values({ + id: buildId, + buildVersion, + repoCount: 1, + startedAt: new Date(startAt * 1000), + finishedAt: null, + status: "running", + manifestR2Key: null, + }); + + const payload = await fetchSearchControl({ + env, + path: "/internal/index/build", + body: { + repo_id: repo.id, + owner: repo.owner, + name: repo.name, + default_branch: repo.defaultBranch, + head_sha: repo.lastSeenHeadSha, + trigger: message.trigger, + }, + traceId, + }); + + const finishedAt = nowSeconds(); + const indexedHeadSha = + typeof payload.head_sha === "string" + ? payload.head_sha + : repo.lastSeenHeadSha; + const manifestR2Key = + typeof payload.manifest_r2_key === "string" + ? 
payload.manifest_r2_key + : null; + + await db + .update(searchIndexBuilds) + .set({ + status: "done", + finishedAt: new Date(finishedAt * 1000), + manifestR2Key, + }) + .where(eq(searchIndexBuilds.id, buildId)); + + await setRepoStatus({ + repoId: repo.id, + status: "ready", + lastIndexedHeadSha: indexedHeadSha, + lastIndexedAt: finishedAt, + lastError: null, + }); + await setJobStatus({ jobId: job.id, status: "done", error: null }); +} + +async function markQueueFailure({ + error, + job, + message, +}: { + job: SearchJobRow; + message: Message; + error: unknown; +}) { + const errorMessage = + error instanceof Error ? error.message : "Unknown search queue failure"; + const attempt = Math.max(job.attempt + 1, message.attempts); + await setJobStatus({ + jobId: job.id, + status: "failed", + error: errorMessage, + attempt, + }); + await setRepoStatus({ + repoId: job.repoId, + status: "failed", + lastError: errorMessage, + }); +} + +async function processQueueMessage({ + env, + message, +}: { + env: Cloudflare.Env; + message: Message; +}) { + const body = message.body; + if (!body || typeof body !== "object") { + message.ack(); + return; + } + + const db = getDb(); + const job = await db + .select() + .from(searchJobs) + .where(eq(searchJobs.id, body.jobId)) + .get(); + if (!job) { + message.ack(); + return; + } + + try { + if (body.jobType === "sync") { + await runSyncJob({ env, job, message: body }); + } else if (body.jobType === "index") { + await runIndexJob({ env, job, message: body }); + } else { + throw new Error("Unsupported search queue job type"); + } + message.ack(); + } catch (error) { + await markQueueFailure({ error, job, message }); + if (message.attempts < MAX_QUEUE_RETRIES) { + message.retry({ + delaySeconds: Math.min(2 ** message.attempts * 15, 300), + }); + return; + } + message.ack(); + } +} + +async function scheduleRepoSyncJobs({ env }: { env: Cloudflare.Env }) { + const db = getDb(); + const currentTime = nowSeconds(); + const hotCutoff = new Date( + (currentTime - REPO_SYNC_CADENCE_SECONDS.hot) * 1000, + ); + const warmCutoff = new Date( + (currentTime - REPO_SYNC_CADENCE_SECONDS.warm) * 1000, + ); + const coldCutoff = new Date( + (currentTime - REPO_SYNC_CADENCE_SECONDS.cold) * 1000, + ); + + const dueRepos = await db + .select() + .from(searchRepoRegistry) + .where( + and( + eq(searchRepoRegistry.isEnabled, true), + or( + and( + eq(searchRepoRegistry.tier, "hot"), + or( + isNull(searchRepoRegistry.lastSyncedAt), + lte(searchRepoRegistry.lastSyncedAt, hotCutoff), + ), + ), + and( + eq(searchRepoRegistry.tier, "warm"), + or( + isNull(searchRepoRegistry.lastSyncedAt), + lte(searchRepoRegistry.lastSyncedAt, warmCutoff), + ), + ), + and( + eq(searchRepoRegistry.tier, "cold"), + or( + isNull(searchRepoRegistry.lastSyncedAt), + lte(searchRepoRegistry.lastSyncedAt, coldCutoff), + ), + ), + ), + ), + ) + .limit(200); + + for (const repo of dueRepos) { + await enqueueSearchJob({ + env, + repoId: repo.id, + jobType: "sync", + priority: "normal", + trigger: "scheduled", + }); + } +} + +async function cleanupSearchManifests(env: Cloudflare.Env) { + const bucket = env.SEARCH_INDEX_ARTIFACTS; + if (!bucket) { + return; + } + + const cutoff = Date.now() - MANIFEST_RETENTION_DAYS * 24 * 60 * 60 * 1000; + let cursor: string | undefined; + do { + const listed = await bucket.list({ + cursor, + limit: 500, + prefix: "search/manifests/", + }); + for (const object of listed.objects) { + if (object.uploaded.getTime() < cutoff) { + await bucket.delete(object.key); + } + } + cursor = 
listed.truncated ? listed.cursor : undefined; + } while (cursor); +} + +export async function maybeHandleSearchRequest({ + request, + env, +}: { + request: Request; + env: Cloudflare.Env; +}) { + const url = new URL(request.url); + if (!url.pathname.startsWith("/api/search")) { + return null; + } + + if (request.method === "GET" && url.pathname === "/api/search") { + return handleSearchGet({ request, env }); + } + if (request.method === "POST" && url.pathname === "/api/search/repos") { + return handleRepoOnboarding({ request, env }); + } + + const statusMatch = /^\/api\/search\/repos\/([^/]+)\/status$/.exec( + url.pathname, + ); + if (request.method === "GET" && statusMatch) { + return handleRepoStatus({ request, repoId: statusMatch[1] }); + } + + return json({ error: "Not found" }, 404); +} + +export async function handleSearchQueue({ + batch, + env, + ctx: _ctx, +}: { + batch: MessageBatch; + env: Cloudflare.Env; + ctx: ExecutionContext; +}) { + void _ctx; + for (const message of batch.messages) { + await processQueueMessage({ env, message }); + } +} + +export async function handleSearchScheduled({ + env, + ctx: _ctx, +}: { + env: Cloudflare.Env; + ctx: ExecutionContext; +}) { + void _ctx; + await scheduleRepoSyncJobs({ env }); + await cleanupSearchManifests(env); +} diff --git a/apps/dashboard/src/lib/search.functions.ts b/apps/dashboard/src/lib/search.functions.ts new file mode 100644 index 0000000..4211a08 --- /dev/null +++ b/apps/dashboard/src/lib/search.functions.ts @@ -0,0 +1,125 @@ +import { createServerFn } from "@tanstack/react-start"; +import { getRequest } from "@tanstack/react-start/server"; +import type { + SearchCodeInput, + SearchCodeResponse, + SearchOnboardRepoInput, + SearchOnboardRepoResponse, + SearchRepoStatusResponse, +} from "./search.types"; + +type SearchFetchErrorPayload = { + error?: string; + message?: string; + trace_id?: string; +}; + +function getRequestBaseUrl(request: Request) { + const url = new URL(request.url); + return `${url.protocol}//${url.host}`; +} + +async function parseSearchResponse(response: Response): Promise { + if (response.ok) { + return (await response.json()) as T; + } + + let payload: SearchFetchErrorPayload | null = null; + try { + payload = (await response.json()) as SearchFetchErrorPayload; + } catch { + payload = null; + } + const message = payload?.error || payload?.message || "Search request failed"; + throw new Error(message); +} + +function isLivegrepBaseUrlUnsetMessage(message: string) { + return message.includes("LIVEGREP_BASE_URL is not configured"); +} + +export const searchCode = createServerFn({ method: "GET" }) + .inputValidator(identityValidator) + .handler(async ({ data }): Promise => { + const request = getRequest(); + + const endpoint = new URL("/api/search", getRequestBaseUrl(request)); + endpoint.searchParams.set("q", data.q); + if (data.repo) endpoint.searchParams.set("repo", data.repo); + if (data.path) endpoint.searchParams.set("path", data.path); + if (data.lang) endpoint.searchParams.set("lang", data.lang); + if (data.page) endpoint.searchParams.set("page", data.page); + + const response = await fetch(endpoint, { + method: "GET", + headers: { + cookie: request.headers.get("cookie") ?? 
"", + Accept: "application/json", + }, + }); + + if (response.ok) { + return (await response.json()) as SearchCodeResponse; + } + + let payload: SearchFetchErrorPayload | null = null; + try { + payload = (await response.json()) as SearchFetchErrorPayload; + } catch { + payload = null; + } + const message = + payload?.error || payload?.message || "Search request failed"; + if (response.status >= 500 && isLivegrepBaseUrlUnsetMessage(message)) { + return { + results: [], + repo_status: {}, + partial: false, + trace_id: payload?.trace_id ?? "code-search-disabled", + code_search_disabled: true, + }; + } + + throw new Error(message); + }); + +export const onboardSearchRepo = createServerFn({ method: "POST" }) + .inputValidator(identityValidator) + .handler(async ({ data }): Promise => { + const request = getRequest(); + + const endpoint = new URL("/api/search/repos", getRequestBaseUrl(request)); + const response = await fetch(endpoint, { + method: "POST", + headers: { + cookie: request.headers.get("cookie") ?? "", + Accept: "application/json", + "Content-Type": "application/json", + }, + body: JSON.stringify(data), + }); + return parseSearchResponse(response); + }); + +export const getSearchRepoStatus = createServerFn({ method: "GET" }) + .inputValidator(identityValidator<{ repoId: string }>) + .handler(async ({ data }): Promise => { + const request = getRequest(); + + const endpoint = new URL( + `/api/search/repos/${encodeURIComponent(data.repoId)}/status`, + getRequestBaseUrl(request), + ); + const response = await fetch(endpoint, { + method: "GET", + headers: { + cookie: request.headers.get("cookie") ?? "", + Accept: "application/json", + }, + }); + return parseSearchResponse(response); + }); + +function identityValidator(data: TInput) { + return data; +} diff --git a/apps/dashboard/src/lib/search.types.ts b/apps/dashboard/src/lib/search.types.ts new file mode 100644 index 0000000..e2f4daf --- /dev/null +++ b/apps/dashboard/src/lib/search.types.ts @@ -0,0 +1,72 @@ +export type SearchRepoTier = "hot" | "warm" | "cold"; +export type SearchEtaBucket = "<10m" | "10-30m" | ">30m"; + +export type SearchRepoStatus = { + status: string; + default_branch?: string; + last_indexed_head_sha: string | null; + last_seen_head_sha: string | null; + last_synced_at: number | null; + last_indexed_at: number | null; + tier: SearchRepoTier; + eta_bucket?: SearchEtaBucket; +}; + +export type SearchCodeResultItem = { + repo: string; + path: string; + line_number: number; + line: string; + context_before: string[]; + context_after: string[]; +}; + +export type SearchCodeResponse = { + results: SearchCodeResultItem[]; + repo_status: Record; + partial: boolean; + trace_id: string; + code_search_disabled?: boolean; +}; + +export type SearchCodeInput = { + q: string; + repo?: string; + path?: string; + lang?: string; + page?: string; +}; + +export type SearchOnboardRepoInput = { + provider: "github"; + owner: string; + name: string; +}; + +export type SearchOnboardRepoResponse = { + repo: { + id: string; + provider: "github"; + owner: string; + name: string; + default_branch: string; + status: string; + tier: SearchRepoTier; + is_private: boolean; + } | null; + job_priority: "interactive" | "normal" | "backfill"; +}; + +export type SearchRepoStatusResponse = { + repo_id: string; + provider: "github"; + owner: string; + name: string; + status: string; + last_indexed_commit: string | null; + staleness_seconds: number | null; + latest_error: string | null; + last_synced_at: number | null; + last_indexed_at: number | null; 
+ tier: SearchRepoTier; +}; diff --git a/apps/dashboard/wrangler.jsonc.example b/apps/dashboard/wrangler.jsonc.example index 5c703b6..2eeceaf 100644 --- a/apps/dashboard/wrangler.jsonc.example +++ b/apps/dashboard/wrangler.jsonc.example @@ -45,6 +45,33 @@ "migrations_dir": "drizzle" } ], + "queues": { + "producers": [ + { + "binding": "REPO_SYNC_QUEUE", + "queue": "diffkit-repo-sync" + }, + { + "binding": "INDEX_BUILD_QUEUE", + "queue": "diffkit-index-build" + } + ], + "consumers": [ + { + "queue": "diffkit-repo-sync", + "max_batch_size": 25, + "max_batch_timeout": 5 + }, + { + "queue": "diffkit-index-build", + "max_batch_size": 10, + "max_batch_timeout": 10 + } + ] + }, + "triggers": { + "crons": ["*/15 * * * *"] + }, "durable_objects": { "bindings": [ { @@ -75,6 +102,15 @@ "bucket_name": "YOUR_COMMENT_MEDIA_BUCKET_NAME", "preview_bucket_name": "YOUR_COMMENT_MEDIA_PREVIEW_BUCKET_NAME", "remote": true + }, + { + "binding": "SEARCH_INDEX_ARTIFACTS", + "bucket_name": "YOUR_SEARCH_INDEX_ARTIFACTS_BUCKET", + "preview_bucket_name": "YOUR_SEARCH_INDEX_ARTIFACTS_PREVIEW_BUCKET" } - ] + ], + "vars": { + "LIVEGREP_BASE_URL": "http://127.0.0.1:8910", + "SEARCH_CONTROL_BASE_URL": "http://127.0.0.1:8910" + } } diff --git a/apps/search/.dockerignore b/apps/search/.dockerignore new file mode 100644 index 0000000..e1fbe2b --- /dev/null +++ b/apps/search/.dockerignore @@ -0,0 +1,5 @@ +node_modules +dist +.env +.DS_Store +coverage diff --git a/apps/search/.env.example b/apps/search/.env.example new file mode 100644 index 0000000..63ccecd --- /dev/null +++ b/apps/search/.env.example @@ -0,0 +1,20 @@ +PORT=8910 +HOST=0.0.0.0 + +# Optional bearer token for Worker -> search service internal endpoints. +SEARCH_CONTROL_TOKEN= + +# Local storage paths +SEARCH_INDEX_DIR=/var/lib/diffkit-search +SEARCH_INDEX_PATH=/var/lib/diffkit-search/livegrep.idx +SEARCH_REPO_DIR=/var/lib/diffkit-search/repos +SEARCH_MANIFEST_PREFIX=search/manifests/ + +# Livegrep upstream (this service proxies /api/v1/search to this URL) +LIVEGREP_BASE_URL=http://127.0.0.1:8911 +LIVEGREP_SEARCH_PATH=/api/v1/search +LIVEGREP_API_TOKEN= + +# Policy +ALLOWED_REPO_PROVIDERS=github +MAX_REPO_SIZE_MB=10000 diff --git a/apps/search/Dockerfile b/apps/search/Dockerfile new file mode 100644 index 0000000..a86164d --- /dev/null +++ b/apps/search/Dockerfile @@ -0,0 +1,24 @@ +FROM node:22-bookworm-slim + +WORKDIR /app + +RUN apt-get update \ + && apt-get install -y --no-install-recommends git ca-certificates \ + && rm -rf /var/lib/apt/lists/* + +RUN corepack enable && corepack prepare pnpm@10.15.0 --activate + +# Copy only workspace manifests first so dependency install can be cached. +COPY package.json pnpm-lock.yaml pnpm-workspace.yaml /app/ +COPY patches /app/patches +COPY apps/search/package.json /app/apps/search/package.json + +# Install only dependencies required by the search service. +RUN pnpm install --frozen-lockfile --filter @diffkit/search + +# Copy search service sources after dependencies. +COPY apps/search /app/apps/search + +EXPOSE 8910 + +CMD ["pnpm", "--filter", "@diffkit/search", "start"] diff --git a/apps/search/README.md b/apps/search/README.md new file mode 100644 index 0000000..0535f7a --- /dev/null +++ b/apps/search/README.md @@ -0,0 +1,174 @@ +# @diffkit/search + +Self-hosted search control/data-plane service for DiffKit. + +This app is meant to run privately (VPS/container host) and is called by the Cloudflare Worker. 
+ +Endpoints: + +- `GET /healthz` +- `GET /api/v1/search` +- `POST /internal/repos/sync` +- `POST /internal/index/build` + +## What runs where + +- `apps/dashboard` (Cloudflare Worker): public API, auth, orchestration. +- `apps/search` (your VPS): sync/index control and Livegrep query bridge. +- Livegrep (your VPS): actual code search backend/frontend. + +## Livegrep Docker images (official) + +Livegrep publishes images to GHCR: + +- `ghcr.io/livegrep/livegrep/indexer` +- `ghcr.io/livegrep/livegrep/base` + +The stack here uses those images directly. + +## VPS deployment (copy/paste) + +### 1) Prepare directories + +```bash +mkdir -p /opt/diffkit-search +cd /opt/diffkit-search +mkdir -p livegrep-data search-data +``` + +### 2) Build a Livegrep index (example: this repo) + +```bash +docker run --rm \ + -v "/opt/diffkit-search/livegrep-data:/data" \ + ghcr.io/livegrep/livegrep/indexer \ + /livegrep/bin/livegrep-github-reindex \ + -repo stylessh/diffkit \ + -http \ + -dir /data +``` + +This creates `/opt/diffkit-search/livegrep-data/livegrep.idx`. + +### 3) Start Livegrep backend + frontend + +```bash +docker network create diffkit-search || true + +docker run -d --rm \ + --name livegrep-backend \ + --network diffkit-search \ + -v "/opt/diffkit-search/livegrep-data:/data" \ + ghcr.io/livegrep/livegrep/base \ + /livegrep/bin/codesearch -load_index /data/livegrep.idx -grpc 0.0.0.0:9999 + +docker run -d --rm \ + --name livegrep-frontend \ + --network diffkit-search \ + -p 8911:8911 \ + ghcr.io/livegrep/livegrep/base \ + /livegrep/bin/livegrep -docroot /livegrep/web -listen 0.0.0.0:8911 --connect livegrep-backend:9999 +``` + +### 4) Build and run `apps/search` + +From repo root: + +```bash +docker build -f apps/search/Dockerfile -t diffkit-search:latest . +``` + +Run: + +```bash +docker run -d --rm \ + --name diffkit-search \ + --network diffkit-search \ + -p 8910:8910 \ + -e SEARCH_CONTROL_TOKEN=change-me \ + -e LIVEGREP_UPSTREAM_BASE_URL=http://livegrep-frontend:8911 \ + -e LIVEGREP_SEARCH_PATH=/api/v1/search \ + -e SEARCH_STORAGE_ROOT=/var/lib/diffkit-search \ + -v "/opt/diffkit-search/search-data:/var/lib/diffkit-search" \ + diffkit-search:latest +``` + +### 5) Point Worker to this service + +Set these Worker vars to your VPS private/public URL: + +- `LIVEGREP_BASE_URL=https://:8910` +- `SEARCH_CONTROL_BASE_URL=https://:8910` +- `SEARCH_CONTROL_TOKEN=change-me` + +## Local testing + +### Fast local stack + +You can use `apps/search/docker-compose.livegrep.yml`: + +1. Build initial index: +```bash +./apps/search/scripts/bootstrap-livegrep-index.sh stylessh/diffkit +``` +This creates `apps/search/.local/livegrep.idx` and `apps/search/.local/livegrep.json`. +2. 
Start services: +```bash +docker compose -f apps/search/docker-compose.livegrep.yml up --build -d +``` + +This starts: + +- `apps/search` at `http://localhost:8910` +- Livegrep frontend at `http://localhost:8911` + +### Smoke tests + +Health: + +```bash +curl -s http://localhost:8910/healthz | jq +``` + +Search via `apps/search`: + +```bash +curl -sG http://localhost:8910/api/v1/search \ + --data-urlencode "q=createServer" \ + --data-urlencode "repo=stylessh/diffkit" | jq +``` + +Internal sync (token required): + +```bash +curl -sX POST http://localhost:8910/internal/repos/sync \ + -H "Authorization: Bearer change-me" \ + -H "Content-Type: application/json" \ + -d '{ + "repo_id":"stylessh/diffkit", + "provider":"github", + "owner":"stylessh", + "name":"diffkit", + "default_branch":"main" + }' | jq +``` + +Internal index build: + +```bash +curl -sX POST http://localhost:8910/internal/index/build \ + -H "Authorization: Bearer change-me" \ + -H "Content-Type: application/json" \ + -d '{ + "repo_id":"stylessh/diffkit", + "owner":"stylessh", + "name":"diffkit", + "default_branch":"main" + }' | jq +``` + +## Notes + +- The index build in `src/index-build.ts` is intentionally an MVP placeholder. +- Replace placeholder steps with your production Livegrep index publish/swap flow. +- Keep this service private behind firewall / private networking / Cloudflare Access. diff --git a/apps/search/docker-compose.livegrep.yml b/apps/search/docker-compose.livegrep.yml new file mode 100644 index 0000000..b872d1b --- /dev/null +++ b/apps/search/docker-compose.livegrep.yml @@ -0,0 +1,65 @@ +version: "3.9" + +services: + livegrep-backend: + image: ghcr.io/livegrep/livegrep/base:latest + command: + [ + "/livegrep/bin/codesearch", + "-load_index", + "/data/livegrep.idx", + "-grpc", + "0.0.0.0:9999" + ] + volumes: + - ./.local:/data + networks: + - diffkit-search + restart: unless-stopped + + livegrep-frontend: + image: ghcr.io/livegrep/livegrep/base:latest + command: + [ + "/livegrep/bin/livegrep", + "-docroot", + "/livegrep/web", + "-listen", + "0.0.0.0:8910", + "--connect", + "livegrep-backend:9999" + ] + ports: + - "8911:8910" + depends_on: + - livegrep-backend + networks: + - diffkit-search + restart: unless-stopped + + search-service: + build: + context: ../.. 
+      dockerfile: apps/search/Dockerfile
+    environment:
+      PORT: "8910"
+      HOST: "0.0.0.0"
+      SEARCH_CONTROL_TOKEN: "${SEARCH_CONTROL_TOKEN:?set SEARCH_CONTROL_TOKEN in your shell or .env}"
+      LIVEGREP_UPSTREAM_BASE_URL: "http://livegrep-frontend:8910"
+      LIVEGREP_SEARCH_PATH: "/api/v1/search"
+      SEARCH_STORAGE_ROOT: "/var/lib/diffkit-search"
+      ALLOWED_REPO_PROVIDERS: "github"
+      MAX_REPO_SIZE_MB: "10000"
+    ports:
+      - "8910:8910"
+    volumes:
+      - ./.local:/var/lib/diffkit-search
+    depends_on:
+      - livegrep-frontend
+    networks:
+      - diffkit-search
+    restart: unless-stopped
+
+networks:
+  diffkit-search:
+    driver: bridge
diff --git a/apps/search/package.json b/apps/search/package.json
new file mode 100644
index 0000000..82969ba
--- /dev/null
+++ b/apps/search/package.json
@@ -0,0 +1,17 @@
+{
+  "name": "@diffkit/search",
+  "private": true,
+  "type": "module",
+  "scripts": {
+    "dev": "tsx watch src/server.ts",
+    "start": "tsx src/server.ts",
+    "check-types": "tsc --noEmit",
+    "check": "biome check",
+    "format": "biome format"
+  },
+  "devDependencies": {
+    "@types/node": "^22.18.10",
+    "tsx": "^4.21.0",
+    "typescript": "^5.9.3"
+  }
+}
diff --git a/apps/search/scripts/bootstrap-livegrep-index.sh b/apps/search/scripts/bootstrap-livegrep-index.sh
new file mode 100755
index 0000000..c19a166
--- /dev/null
+++ b/apps/search/scripts/bootstrap-livegrep-index.sh
@@ -0,0 +1,38 @@
+#!/usr/bin/env bash
+
+set -euo pipefail
+
+if [[ "${1-}" == "" ]]; then
+  echo "Usage: $0 <owner>/<repo>"
+  echo "Example: $0 stylessh/diffkit"
+  exit 1
+fi
+
+REPO="$1"
+
+if [[ ! "$REPO" =~ ^[^/]+/[^/]+$ ]]; then
+  echo "Invalid repo '$REPO'. Expected format: owner/repo"
+  exit 1
+fi
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+APP_DIR="$(cd "$SCRIPT_DIR/.." && pwd)"
+DATA_DIR="$APP_DIR/.local"
+
+mkdir -p "$DATA_DIR"
+
+echo "Bootstrapping livegrep index for $REPO"
+echo "Output directory: $DATA_DIR"
+
+docker run --rm \
+  -v "$DATA_DIR:/data" \
+  ghcr.io/livegrep/livegrep/indexer:latest \
+  /livegrep/bin/livegrep-github-reindex \
+  -repo "$REPO" \
+  -http \
+  -dir /data
+
+echo
+echo "Done. Expected files:"
+echo "  $DATA_DIR/livegrep.idx"
+echo "  $DATA_DIR/livegrep.json"
diff --git a/apps/search/src/config.ts b/apps/search/src/config.ts
new file mode 100644
index 0000000..d4f2e20
--- /dev/null
+++ b/apps/search/src/config.ts
@@ -0,0 +1,68 @@
+import path from "node:path";
+
+const DEFAULT_PORT = 8910;
+const DEFAULT_HOST = "0.0.0.0";
+const DEFAULT_ALLOWED_PROVIDERS = "github";
+const DEFAULT_LIVEGREP_UPSTREAM = "http://127.0.0.1:8911";
+const DEFAULT_STORAGE_ROOT = "./.search-data";
+const DEFAULT_LIVEGREP_SEARCH_PATH = "/api/v1/search";
+const DEFAULT_MAX_REPO_SIZE_MB = 10_000;
+
+function parseNumber(value: string | undefined, fallback: number): number {
+  if (!value) {
+    return fallback;
+  }
+  const parsed = Number(value);
+  if (!Number.isFinite(parsed) || parsed <= 0) {
+    return fallback;
+  }
+  return parsed;
+}
+
+function parseAllowedProviders(raw: string | undefined): Set<string> {
+  const source = raw ?? DEFAULT_ALLOWED_PROVIDERS;
+  return new Set(
+    source
+      .split(",")
+      .map((item) => item.trim().toLowerCase())
+      .filter(Boolean)
+  );
+}
+
+function optionalString(value: string | undefined): string | null {
+  if (!value) {
+    return null;
+  }
+  const trimmed = value.trim();
+  return trimmed.length > 0 ?
trimmed : null; +} + +export interface SearchServiceConfig { + allowedProviders: Set; + host: string; + livegrepApiToken: string | null; + livegrepSearchPath: string; + livegrepUpstreamBaseUrl: string; + maxRepoSizeMb: number; + port: number; + searchControlToken: string | null; + storageRoot: string; +} + +export function getConfig(): SearchServiceConfig { + const env = process.env; + const storageRoot = env.SEARCH_STORAGE_ROOT ?? DEFAULT_STORAGE_ROOT; + return { + port: parseNumber(env.PORT, DEFAULT_PORT), + host: env.HOST ?? DEFAULT_HOST, + livegrepUpstreamBaseUrl: + env.LIVEGREP_UPSTREAM_BASE_URL ?? DEFAULT_LIVEGREP_UPSTREAM, + livegrepSearchPath: + env.LIVEGREP_SEARCH_PATH ?? DEFAULT_LIVEGREP_SEARCH_PATH, + allowedProviders: parseAllowedProviders(env.ALLOWED_REPO_PROVIDERS), + livegrepApiToken: optionalString(env.LIVEGREP_API_TOKEN), + searchControlToken: optionalString(env.SEARCH_CONTROL_TOKEN), + storageRoot: path.resolve(storageRoot), + maxRepoSizeMb: parseNumber(env.MAX_REPO_SIZE_MB, DEFAULT_MAX_REPO_SIZE_MB), + }; +} diff --git a/apps/search/src/index-build.ts b/apps/search/src/index-build.ts new file mode 100644 index 0000000..a4a6cc7 --- /dev/null +++ b/apps/search/src/index-build.ts @@ -0,0 +1,103 @@ +import fs from "node:fs/promises"; +import path from "node:path"; +import { getConfig } from "./config.js"; +import type { + BuildIndexRequest, + BuildIndexResponse, + SearchRepoRuntimeState, +} from "./types.js"; + +const config = getConfig(); + +function repoCacheDir(owner: string, name: string) { + return path.join(config.storageRoot, "repos", owner, `${name}.git`); +} + +function buildOutputDir(owner: string, name: string) { + return path.join(config.storageRoot, "builds", owner, name); +} + +async function writePlaceholderIndexArtifacts({ + buildDir, + headSha, + owner, + repo, +}: { + buildDir: string; + owner: string; + repo: string; + headSha: string; +}) { + await fs.mkdir(buildDir, { recursive: true }); + const timestamp = new Date().toISOString(); + const indexPayload = { + generated_at: timestamp, + owner, + repo, + head_sha: headSha, + note: "MVP placeholder artifact; replace with real livegrep index pipeline.", + }; + await fs.writeFile( + path.join(buildDir, "index.json"), + JSON.stringify(indexPayload, null, 2), + "utf8" + ); +} + +export async function buildRepoIndex({ + body, + repoState, +}: { + body: BuildIndexRequest; + repoState: SearchRepoRuntimeState; +}): Promise { + const repoDir = repoCacheDir(body.owner, body.name); + const repoStats = await fs.stat(repoDir).catch(() => null); + if (!repoStats?.isDirectory()) { + throw new Error( + `Mirror not found at ${repoDir}. 
Sync ${body.owner}/${body.name} first.` + ); + } + + const selectedHead = body.head_sha || repoState.lastHeadSha; + if (!selectedHead) { + throw new Error("Missing head_sha and no synced head available."); + } + + const buildDir = buildOutputDir(body.owner, body.name); + await writePlaceholderIndexArtifacts({ + buildDir, + owner: body.owner, + repo: body.name, + headSha: selectedHead, + }); + + const manifestKey = [ + "search/manifests", + body.owner, + body.name, + `${Date.now()}-${selectedHead.slice(0, 12)}.json`, + ].join("/"); + const manifestPath = path.join(config.storageRoot, manifestKey); + await fs.mkdir(path.dirname(manifestPath), { recursive: true }); + await fs.writeFile( + manifestPath, + JSON.stringify( + { + repo: `${body.owner}/${body.name}`, + repo_id: body.repo_id, + head_sha: selectedHead, + generated_at: new Date().toISOString(), + build_dir: buildDir, + }, + null, + 2 + ), + "utf8" + ); + + return { + head_sha: selectedHead, + manifest_r2_key: manifestKey, + }; +} diff --git a/apps/search/src/livegrep-client.ts b/apps/search/src/livegrep-client.ts new file mode 100644 index 0000000..60e5a91 --- /dev/null +++ b/apps/search/src/livegrep-client.ts @@ -0,0 +1,131 @@ +import { getConfig } from "./config.js"; +import type { + LivegrepSearchItem, + LivegrepSearchRequest, + LivegrepSearchResponse, +} from "./types.js"; + +interface UpstreamSearchResponse { + data?: unknown; + partial?: unknown; + results?: unknown; +} + +function normalizeStringArray(value: unknown): string[] { + if (!Array.isArray(value)) { + return []; + } + return value.filter((entry): entry is string => typeof entry === "string"); +} + +function normalizeLineNumber(value: unknown): number | null { + if (typeof value === "number" && Number.isFinite(value) && value > 0) { + return value; + } + if (typeof value === "string") { + const parsed = Number.parseInt(value, 10); + if (Number.isFinite(parsed) && parsed > 0) { + return parsed; + } + } + return null; +} + +function asSearchResultItem(item: unknown): LivegrepSearchItem | null { + if (!item || typeof item !== "object") { + return null; + } + const row = item as Record; + let repo: string | null = null; + if (typeof row.repo === "string") { + repo = row.repo; + } else if (typeof row.tree === "string") { + repo = row.tree; + } + const path = typeof row.path === "string" ? row.path : null; + const lineNumber = normalizeLineNumber(row.line_number ?? row.lno); + if (!(repo && path) || lineNumber === null) { + return null; + } + + return { + repo, + path, + line_number: lineNumber, + line: typeof row.line === "string" ? row.line : "", + context_before: normalizeStringArray(row.context_before), + context_after: normalizeStringArray(row.context_after), + }; +} + +function toArray(value: unknown): unknown[] { + return Array.isArray(value) ? 
value : []; +} + +export class LivegrepClient { + readonly #baseUrl: URL; + readonly #token: string | null; + + constructor(params: { baseUrl: string; token: string | null }) { + this.#baseUrl = new URL(params.baseUrl); + this.#token = params.token; + } + + async search(params: LivegrepSearchRequest): Promise { + const endpoint = new URL(this.#baseUrl); + endpoint.searchParams.set("q", params.q); + if (params.repo) { + endpoint.searchParams.set("repo", params.repo); + } + if (params.path) { + endpoint.searchParams.set("path", params.path); + } + if (params.lang) { + endpoint.searchParams.set("lang", params.lang); + } + if (params.page) { + endpoint.searchParams.set("page", params.page); + } + + const response = await fetch(endpoint, { + method: "GET", + headers: { + Accept: "application/json", + ...(this.#token ? { Authorization: `Bearer ${this.#token}` } : {}), + }, + }); + + if (!response.ok) { + throw new Error(`Livegrep upstream returned ${response.status}`); + } + + const payload = (await response.json()) as UpstreamSearchResponse; + const source = toArray(payload.results).length + ? toArray(payload.results) + : toArray(payload.data); + + return { + results: source + .map(asSearchResultItem) + .filter((item): item is LivegrepSearchItem => Boolean(item)), + partial: Boolean(payload.partial), + }; + } +} + +const config = getConfig(); +const searchEndpoint = new URL( + config.livegrepSearchPath, + config.livegrepUpstreamBaseUrl +).toString(); + +const client = new LivegrepClient({ + baseUrl: searchEndpoint, + token: config.livegrepApiToken, +}); + +export function searchLivegrep( + params: LivegrepSearchRequest +): Promise { + return client.search(params); +} diff --git a/apps/search/src/repo-sync.ts b/apps/search/src/repo-sync.ts new file mode 100644 index 0000000..c644d4a --- /dev/null +++ b/apps/search/src/repo-sync.ts @@ -0,0 +1,63 @@ +import { mkdir, rm } from "node:fs/promises"; +import { runGit } from "./shell.js"; +import { getRepoPaths, getRepoState, upsertRepoState } from "./state.js"; + +export async function syncRepository(params: { + repoId: string; + owner: string; + name: string; + defaultBranch: string; + githubToken: string; +}) { + const { defaultBranch, githubToken, name, owner, repoId } = params; + const paths = getRepoPaths(repoId); + + await mkdir(paths.repoDir, { recursive: true }); + const remote = `https://x-access-token:${encodeURIComponent(githubToken)}@github.com/${owner}/${name}.git`; + + const existing = await getRepoState(repoId); + if (existing?.mirrorPath) { + await runGit([ + "-C", + paths.mirrorPath, + "remote", + "set-url", + "origin", + remote, + ]); + } else { + await rm(paths.mirrorPath, { force: true }); + await runGit([ + "clone", + "--mirror", + "--filter=blob:none", + remote, + paths.mirrorPath, + ]); + } + + await runGit(["-C", paths.mirrorPath, "fetch", "--prune", "origin"]); + const headSha = ( + await runGit([ + "-C", + paths.mirrorPath, + "rev-parse", + `refs/remotes/origin/${defaultBranch}`, + ]) + ).trim(); + + await upsertRepoState({ + repoId, + owner, + name, + defaultBranch, + headSha, + mirrorPath: paths.mirrorPath, + updatedAt: Date.now(), + }); + + return { + repo_id: repoId, + head_sha: headSha, + }; +} diff --git a/apps/search/src/search.ts b/apps/search/src/search.ts new file mode 100644 index 0000000..b86bac7 --- /dev/null +++ b/apps/search/src/search.ts @@ -0,0 +1,101 @@ +import { searchLivegrep } from "./livegrep-client.js"; +import { getRepoRuntimeStateByRef } from "./state.js"; +import type { + LivegrepSearchItem, + 
LivegrepSearchResponse, + SearchQueryParams, +} from "./types.js"; + +function normalizeRepoFilter(repo?: string) { + if (!repo) { + return undefined; + } + const trimmed = repo.trim(); + if (!trimmed) { + return undefined; + } + return trimmed; +} + +function normalizePathFilter(path?: string) { + if (!path) { + return undefined; + } + const trimmed = path.trim(); + if (!trimmed) { + return undefined; + } + return trimmed; +} + +function normalizeLangFilter(lang?: string) { + if (!lang) { + return undefined; + } + const trimmed = lang.trim(); + if (!trimmed) { + return undefined; + } + return trimmed; +} + +function normalizePage(page?: string) { + if (!page) { + return undefined; + } + const parsed = Number.parseInt(page, 10); + if (!Number.isFinite(parsed) || parsed < 1) { + return undefined; + } + return String(parsed); +} + +function isRepoAllowed(repoFilter: string) { + const [owner, name, ...rest] = repoFilter + .split("/") + .map((value) => value.trim()) + .filter(Boolean); + if (!(owner && name) || rest.length > 0) { + return false; + } + const repoRef = `${owner}/${name}`; + return getRepoRuntimeStateByRef(repoRef); +} + +async function remapResultRepoPath(result: LivegrepSearchItem) { + const mapped = await getRepoRuntimeStateByRef(result.repo); + if (!mapped) { + return result; + } + return { + ...result, + repo: mapped.repoRef, + }; +} + +export async function searchCode( + query: SearchQueryParams +): Promise { + const repo = normalizeRepoFilter(query.repo); + if (repo) { + const allowed = await isRepoAllowed(repo); + if (!allowed) { + return { + results: [], + partial: false, + }; + } + } + const response = await searchLivegrep({ + q: query.q, + repo, + path: normalizePathFilter(query.path), + lang: normalizeLangFilter(query.lang), + page: normalizePage(query.page), + }); + + return { + results: await Promise.all(response.results.map(remapResultRepoPath)), + partial: response.partial, + }; +} diff --git a/apps/search/src/server.ts b/apps/search/src/server.ts new file mode 100644 index 0000000..4987423 --- /dev/null +++ b/apps/search/src/server.ts @@ -0,0 +1,294 @@ +import { createServer } from "node:http"; +import { getConfig } from "./config.js"; +import { buildRepoIndex } from "./index-build.js"; +import { syncRepository } from "./repo-sync.js"; +import { searchCode } from "./search.js"; +import type { + BuildIndexRequest, + BuildIndexResponse, + LivegrepSearchResponse, + SearchQueryParams, + SearchRepoRuntimeState, +} from "./types.js"; + +const config = getConfig(); + +interface ResponsePayload { + body: string; + headers?: Record; + status: number; +} + +type RequestLike = AsyncIterable & { + headers: Record; + url?: string; + method?: string; +}; + +function toRequestLike( + request: import("node:http").IncomingMessage, + requestUrl: string +): RequestLike { + const iterable: AsyncIterable = { + async *[Symbol.asyncIterator]() { + for await (const chunk of request) { + yield chunk as Buffer | string; + } + }, + }; + + return { + ...iterable, + headers: request.headers, + url: `http://localhost${requestUrl}`, + method: request.method, + }; +} + +function jsonResponse(status: number, payload: unknown): ResponsePayload { + return { + status, + headers: { + "Content-Type": "application/json; charset=utf-8", + }, + body: JSON.stringify(payload), + }; +} + +async function parseJsonBody(request: RequestLike): Promise { + const chunks: Buffer[] = []; + for await (const chunk of request) { + chunks.push(Buffer.isBuffer(chunk) ? 
chunk : Buffer.from(chunk)); + } + if (chunks.length === 0) { + return null; + } + return JSON.parse(Buffer.concat(chunks).toString("utf8")); +} + +function readHeader( + headers: Record, + name: string +): string | undefined { + const raw = headers[name.toLowerCase()]; + if (Array.isArray(raw)) { + return raw[0]; + } + return raw; +} + +function isAuthorized(headers: Record) { + if (!config.searchControlToken) { + return true; + } + const authHeader = readHeader(headers, "authorization"); + return authHeader === `Bearer ${config.searchControlToken}`; +} + +function parsePath(url: string | undefined) { + if (!url) { + return "/"; + } + return new URL(url, "http://localhost").pathname; +} + +async function handleSearchRequest(request: RequestLike) { + const url = new URL(request.url ?? "/", "http://localhost"); + const q = url.searchParams.get("q") ?? ""; + if (!q.trim()) { + return jsonResponse(400, { error: "Missing required q query param" }); + } + + const query: SearchQueryParams = { + q, + repo: url.searchParams.get("repo") ?? undefined, + path: url.searchParams.get("path") ?? undefined, + lang: url.searchParams.get("lang") ?? undefined, + page: url.searchParams.get("page") ?? undefined, + }; + try { + const response: LivegrepSearchResponse = await searchCode(query); + return jsonResponse(200, response); + } catch (error) { + // Keep palette/search UX stable when upstream livegrep is down. + return jsonResponse(200, { + results: [], + partial: true, + upstream_error: + error instanceof Error + ? error.message + : "Livegrep upstream unavailable", + }); + } +} + +function parseString(value: unknown) { + return typeof value === "string" ? value : null; +} + +function parseDefaultBranch(value: unknown) { + return typeof value === "string" && value.trim() ? value : "main"; +} + +async function handleSyncRequest(request: RequestLike) { + const body = (await parseJsonBody(request)) as Record | null; + if (!body) { + return jsonResponse(400, { error: "Invalid JSON body" }); + } + + const repoId = parseString(body.repo_id); + const owner = parseString(body.owner); + const name = parseString(body.name); + const defaultBranch = parseDefaultBranch(body.default_branch); + + if (!(repoId && owner && name)) { + return jsonResponse(400, { error: "repo_id, owner, name are required" }); + } + const provider = parseString(body.provider) ?? "github"; + if (!config.allowedProviders.has(provider.toLowerCase())) { + return jsonResponse(403, { error: `Provider ${provider} is not allowed` }); + } + + const githubToken = + parseString(body.github_token) ?? process.env.GITHUB_TOKEN ?? ""; + if (!githubToken) { + return jsonResponse(500, { + error: "Missing github token. 
Set GITHUB_TOKEN or pass github_token.", + }); + } + + const result = await syncRepository({ + repoId, + owner, + name, + defaultBranch, + githubToken, + }); + return jsonResponse(200, result); +} + +async function handleBuildIndexRequest(request: RequestLike) { + const body = (await parseJsonBody(request)) as Record | null; + if (!body) { + return jsonResponse(400, { error: "Invalid JSON body" }); + } + + const repoId = parseString(body.repo_id); + const owner = parseString(body.owner); + const name = parseString(body.name); + const defaultBranch = parseDefaultBranch(body.default_branch); + const headSha = parseString(body.head_sha); + + if (!(repoId && owner && name)) { + return jsonResponse(400, { error: "repo_id, owner, name are required" }); + } + + const buildRequest: BuildIndexRequest = { + repo_id: repoId, + owner, + name, + default_branch: defaultBranch, + head_sha: headSha, + }; + const repoState: SearchRepoRuntimeState = { + repoId, + repoRef: `${owner}/${name}`, + owner, + name, + defaultBranch, + provider: "github", + lastHeadSha: headSha, + lastIndexedHeadSha: null, + lastSyncedAt: null, + lastIndexedAt: null, + mirrorPath: null, + }; + const result: BuildIndexResponse = await buildRepoIndex({ + body: buildRequest, + repoState, + }); + return jsonResponse(200, result); +} + +async function runWithServerError( + action: () => Promise, + fallbackMessage: string +): Promise { + try { + return await action(); + } catch (error) { + return jsonResponse(500, { + error: error instanceof Error ? error.message : fallbackMessage, + }); + } +} + +function handleInternalRequest( + request: RequestLike, + handler: (request: RequestLike) => Promise, + fallbackMessage: string +): Promise { + if (!isAuthorized(request.headers)) { + return Promise.resolve(jsonResponse(401, { error: "Unauthorized" })); + } + return runWithServerError(() => handler(request), fallbackMessage); +} + +function handleRequest(request: RequestLike): Promise { + const method = request.method ?? "GET"; + const path = parsePath(request.url); + + if (method === "GET" && path === "/healthz") { + return Promise.resolve( + jsonResponse(200, { + ok: true, + service: "diffkit-search", + mode: config.livegrepUpstreamBaseUrl ? "proxy-livegrep" : "stub", + }) + ); + } + + if (method === "GET" && path === "/api/v1/search") { + return runWithServerError( + () => handleSearchRequest(request), + "Search failed" + ); + } + + if (method === "POST" && path === "/internal/repos/sync") { + return handleInternalRequest( + request, + handleSyncRequest, + "Repo sync failed" + ); + } + + if (method === "POST" && path === "/internal/index/build") { + return handleInternalRequest( + request, + handleBuildIndexRequest, + "Index build failed" + ); + } + + return Promise.resolve(jsonResponse(404, { error: "Not found" })); +} + +const server = createServer(async (req, res) => { + const requestUrl = req.url ?? "/"; + const response = await handleRequest(toRequestLike(req, requestUrl)); + + res.statusCode = response.status; + for (const [headerName, headerValue] of Object.entries( + response.headers ?? {} + )) { + res.setHeader(headerName, headerValue); + } + res.end(response.body); +}); + +server.listen(config.port, config.host, () => { + console.log( + `[search] listening on http://${config.host}:${config.port} (mode=${config.livegrepUpstreamBaseUrl ? 
"proxy-livegrep" : "stub"})` + ); +}); diff --git a/apps/search/src/shell.ts b/apps/search/src/shell.ts new file mode 100644 index 0000000..cc2b734 --- /dev/null +++ b/apps/search/src/shell.ts @@ -0,0 +1,37 @@ +import { spawn } from "node:child_process"; + +export function runGit(args: string[]): Promise { + return new Promise((resolve, reject) => { + const process = spawn("git", args, { + stdio: ["ignore", "pipe", "pipe"], + }); + + let stdout = ""; + let stderr = ""; + + process.stdout.on("data", (chunk) => { + stdout += chunk.toString(); + }); + + process.stderr.on("data", (chunk) => { + stderr += chunk.toString(); + }); + + process.on("error", (error) => { + reject(error); + }); + + process.on("close", (code) => { + if (code === 0) { + resolve(stdout); + return; + } + + reject( + new Error( + `git ${args.join(" ")} failed with exit code ${code ?? "unknown"}: ${stderr.trim()}` + ) + ); + }); + }); +} diff --git a/apps/search/src/state.ts b/apps/search/src/state.ts new file mode 100644 index 0000000..291e95d --- /dev/null +++ b/apps/search/src/state.ts @@ -0,0 +1,354 @@ +import { mkdir, readFile, writeFile } from "node:fs/promises"; +import path from "node:path"; +import { getConfig } from "./config.js"; +import type { + RepoProvider, + RepoRecord, + SearchRepoRuntimeState, + SearchState, +} from "./types.js"; + +function createInitialState(): SearchState { + return { + reposById: {}, + repoIdByRef: {}, + lastBuildVersion: null, + lastBuildAt: null, + }; +} + +function defaultRepoRecord({ + repoId, + defaultBranch, + name, + owner, + provider, +}: { + repoId: string; + owner: string; + name: string; + defaultBranch: string; + provider: RepoProvider; +}): RepoRecord { + return { + repoId, + owner, + name, + provider, + defaultBranch, + mirrorPath: null, + lastSyncedAt: null, + lastSyncedHeadSha: null, + lastIndexedAt: null, + lastIndexedHeadSha: null, + lastManifestPath: null, + }; +} + +export class SearchStateStore { + private readonly dataDir: string; + private readonly stateFilePath: string; + + constructor(dataDir: string) { + this.dataDir = dataDir; + this.stateFilePath = path.join(dataDir, "search-state.json"); + } + + private async ensureDataDir() { + await mkdir(this.dataDir, { recursive: true }); + } + + async readState(): Promise { + await this.ensureDataDir(); + try { + const raw = await readFile(this.stateFilePath, "utf8"); + const parsed = JSON.parse(raw) as SearchState; + return { + reposById: parsed.reposById ?? {}, + repoIdByRef: parsed.repoIdByRef ?? {}, + lastBuildVersion: parsed.lastBuildVersion ?? null, + lastBuildAt: parsed.lastBuildAt ?? null, + }; + } catch (error) { + if ( + error && + typeof error === "object" && + "code" in error && + error.code === "ENOENT" + ) { + return createInitialState(); + } + throw error; + } + } + + private async writeState(state: SearchState) { + await this.ensureDataDir(); + await writeFile(this.stateFilePath, JSON.stringify(state, null, 2), "utf8"); + } + + async withState( + updater: (state: SearchState) => Promise | T + ): Promise { + const state = await this.readState(); + const result = await updater(state); + await this.writeState(state); + return result; + } + + upsertRepo(input: { + owner: string; + name: string; + defaultBranch: string; + provider: RepoProvider; + }): Promise { + return this.withState((state) => { + const repoKey = `${input.owner}/${input.name}`; + const repoId = state.repoIdByRef[repoKey] ?? repoKey; + const existing = state.reposById[repoId]; + const next: RepoRecord = existing + ? 
+        ? {
+            ...existing,
+            defaultBranch: input.defaultBranch,
+            provider: input.provider,
+          }
+        : {
+            ...defaultRepoRecord({
+              repoId,
+              owner: input.owner,
+              name: input.name,
+              defaultBranch: input.defaultBranch,
+              provider: input.provider,
+            }),
+            mirrorPath: null,
+          };
+      state.reposById[repoId] = next;
+      state.repoIdByRef[repoKey] = repoId;
+      return next;
+    });
+  }
+
+  updateRepo(
+    repoId: string,
+    updater: (repo: RepoRecord) => RepoRecord
+  ): Promise<RepoRecord> {
+    return this.withState((state) => {
+      const repo = state.reposById[repoId];
+      if (!repo) {
+        throw new Error(`Repository ${repoId} not found in search state`);
+      }
+      const next = updater(repo);
+      state.reposById[repoId] = next;
+      state.repoIdByRef[repoRef(next.owner, next.name)] = repoId;
+      return next;
+    });
+  }
+
+  async setBuildVersion({
+    buildVersion,
+    lastBuildAt,
+  }: {
+    buildVersion: string;
+    lastBuildAt: string;
+  }) {
+    await this.withState((state) => {
+      state.lastBuildVersion = buildVersion;
+      state.lastBuildAt = lastBuildAt;
+    });
+  }
+}
+
+const config = getConfig();
+const stateStore = new SearchStateStore(config.storageRoot);
+
+export function getRepoPaths(repoId: string) {
+  const repoDir = path.join(config.storageRoot, "repos", repoId);
+  return {
+    repoDir,
+    mirrorPath: path.join(repoDir, "mirror.git"),
+  };
+}
+
+export function repoRef(owner: string, name: string) {
+  return `${owner}/${name}`;
+}
+
+export async function ensureRepoRuntimeState(input: {
+  repoId: string;
+  owner: string;
+  name: string;
+  defaultBranch: string;
+  provider: RepoProvider;
+}): Promise<SearchRepoRuntimeState> {
+  const existing = await getRepoState(input.repoId);
+  if (existing) {
+    return existing;
+  }
+  const ref = repoRef(input.owner, input.name);
+  const existingByRef = await getRepoRuntimeStateByRef(ref);
+  if (existingByRef) {
+    return existingByRef;
+  }
+
+  await stateStore.upsertRepo({
+    owner: input.owner,
+    name: input.name,
+    defaultBranch: input.defaultBranch,
+    provider: input.provider,
+  });
+  await stateStore.withState((state) => {
+    const existingRecordId = state.repoIdByRef[ref];
+    const existingRecord = existingRecordId
+      ? state.reposById[existingRecordId]
+      : null;
+    state.reposById[input.repoId] = {
+      repoId: input.repoId,
+      provider: existingRecord?.provider ?? input.provider,
+      owner: input.owner,
+      name: input.name,
+      defaultBranch: existingRecord?.defaultBranch ?? input.defaultBranch,
+      mirrorPath: existingRecord?.mirrorPath ?? null,
+      lastSyncedHeadSha: existingRecord?.lastSyncedHeadSha ?? null,
+      lastIndexedHeadSha: existingRecord?.lastIndexedHeadSha ?? null,
+      lastSyncedAt: existingRecord?.lastSyncedAt ?? null,
+      lastIndexedAt: existingRecord?.lastIndexedAt ?? null,
+      lastManifestPath: existingRecord?.lastManifestPath ?? null,
+    };
+    state.repoIdByRef[ref] = input.repoId;
+  });
+
+  const repo = await getRepoState(input.repoId);
+  if (!repo) {
+    throw new Error(`Failed to persist runtime state for repo ${input.repoId}`);
+  }
+  return {
+    repoId: input.repoId,
+    repoRef: repoRef(repo.owner, repo.name),
+    owner: repo.owner,
+    name: repo.name,
+    defaultBranch: repo.defaultBranch,
+    provider: repo.provider,
+    lastHeadSha: repo.lastHeadSha,
+    lastIndexedHeadSha: repo.lastIndexedHeadSha,
+    lastSyncedAt: repo.lastSyncedAt,
+    lastIndexedAt: repo.lastIndexedAt,
+    mirrorPath: repo.mirrorPath,
+  };
+}
+
+export async function getRepoRuntimeStateByRef(
+  ref: string
+): Promise<SearchRepoRuntimeState | null> {
+  const state = await stateStore.readState();
+  const repoId = state.repoIdByRef[ref];
+  if (!repoId) {
+    return null;
+  }
+  const repo = state.reposById[repoId];
+  if (!repo) {
+    return null;
+  }
+  return {
+    repoId,
+    repoRef: ref,
+    owner: repo.owner,
+    name: repo.name,
+    defaultBranch: repo.defaultBranch,
+    provider: repo.provider,
+    lastHeadSha: repo.lastSyncedHeadSha,
+    lastIndexedHeadSha: repo.lastIndexedHeadSha,
+    lastSyncedAt: repo.lastSyncedAt,
+    lastIndexedAt: repo.lastIndexedAt,
+    mirrorPath: repo.mirrorPath,
+  };
+}
+
+export async function getRepoState(
+  repoId: string
+): Promise<SearchRepoRuntimeState | null> {
+  const state = await stateStore.readState();
+  const repo = state.reposById[repoId];
+  if (!repo) {
+    return null;
+  }
+  return {
+    repoId,
+    repoRef: repoRef(repo.owner, repo.name),
+    owner: repo.owner,
+    name: repo.name,
+    defaultBranch: repo.defaultBranch,
+    provider: repo.provider,
+    lastHeadSha: repo.lastSyncedHeadSha,
+    lastIndexedHeadSha: repo.lastIndexedHeadSha,
+    lastSyncedAt: repo.lastSyncedAt,
+    lastIndexedAt: repo.lastIndexedAt,
+    mirrorPath: repo.mirrorPath,
+  };
+}
+
+export async function upsertRepoState(params: {
+  repoId: string;
+  owner: string;
+  name: string;
+  defaultBranch: string;
+  headSha: string;
+  mirrorPath: string;
+  updatedAt: number;
+}): Promise<void> {
+  await stateStore.withState((state) => {
+    const ref = repoRef(params.owner, params.name);
+    const existing = state.reposById[params.repoId];
+    state.reposById[params.repoId] = {
+      repoId: params.repoId,
+      provider: existing?.provider ?? "github",
+      owner: params.owner,
+      name: params.name,
+      defaultBranch: params.defaultBranch,
+      mirrorPath: params.mirrorPath,
+      lastSyncedHeadSha: params.headSha,
+      lastIndexedHeadSha: existing?.lastIndexedHeadSha ?? null,
+      lastSyncedAt: params.updatedAt,
+      lastIndexedAt: existing?.lastIndexedAt ?? null,
+      lastManifestPath: existing?.lastManifestPath ?? null,
+    };
+    state.repoIdByRef[ref] = params.repoId;
+  });
+}
+
+export async function updateRepoSyncState(params: {
+  repoRef: string;
+  defaultBranch: string;
+  headSha: string;
+  mirrorPath: string;
+}): Promise<void> {
+  const state = await stateStore.readState();
+  const repoId = state.repoIdByRef[params.repoRef];
+  if (!repoId) {
+    throw new Error(`Repository ${params.repoRef} not found in search state`);
+  }
+  await stateStore.updateRepo(repoId, (repo) => ({
+    ...repo,
+    defaultBranch: params.defaultBranch,
+    lastSyncedHeadSha: params.headSha,
+    mirrorPath: params.mirrorPath,
+    lastSyncedAt: Date.now(),
+  }));
+}
+
+export async function updateRepoIndexState(params: {
+  repoRef: string;
+  headSha: string;
+  manifestPath: string;
+}): Promise<void> {
+  const state = await stateStore.readState();
+  const repoId = state.repoIdByRef[params.repoRef];
+  if (!repoId) {
+    throw new Error(`Repository ${params.repoRef} not found in search state`);
+  }
+  await stateStore.updateRepo(repoId, (repo) => ({
+    ...repo,
+    lastIndexedHeadSha: params.headSha,
+    lastIndexedAt: Date.now(),
+    lastManifestPath: params.manifestPath,
+  }));
+}
diff --git a/apps/search/src/types.ts b/apps/search/src/types.ts
new file mode 100644
index 0000000..1e4b9a6
--- /dev/null
+++ b/apps/search/src/types.ts
@@ -0,0 +1,93 @@
+export type RepoProvider = "github";
+
+export interface SearchResult {
+  context_after: string[];
+  context_before: string[];
+  line: string;
+  line_number: number;
+  path: string;
+  repo: string;
+}
+
+export interface LivegrepSearchItem extends SearchResult {}
+
+export interface LivegrepSearchRequest {
+  lang?: string;
+  page?: string;
+  path?: string;
+  q: string;
+  repo?: string;
+}
+
+export interface LivegrepSearchResponse {
+  partial: boolean;
+  results: SearchResult[];
+}
+
+export interface SearchQueryParams extends LivegrepSearchRequest {}
+
+export interface RepoSyncRequest {
+  default_branch?: string;
+  github_token?: string;
+  name: string;
+  owner: string;
+  provider?: RepoProvider;
+  repo_id: string;
+  trigger?: string;
+}
+
+export interface RepoSyncResponse {
+  head_sha: string;
+  repo_id: string;
+}
+
+export interface BuildIndexRequest {
+  default_branch?: string;
+  head_sha?: string | null;
+  name: string;
+  owner: string;
+  repo_id: string;
+  trigger?: string;
+}
+
+export interface BuildIndexResponse {
+  head_sha: string;
+  manifest_r2_key: string;
+}
+
+export interface SearchRepoRuntimeState {
+  defaultBranch: string;
+  lastHeadSha: string | null;
+  lastIndexedAt: number | null;
+  lastIndexedHeadSha: string | null;
+  lastSyncedAt: number | null;
+  mirrorPath: string | null;
+  name: string;
+  owner: string;
+  provider: RepoProvider;
+  repoId: string;
+  repoRef: string;
+}
+
+export interface RepoRecord {
+  defaultBranch: string;
+  lastIndexedAt: number | null;
+  lastIndexedHeadSha: string | null;
+  lastManifestPath: string | null;
+  lastSyncedAt: number | null;
+  lastSyncedHeadSha: string | null;
+  mirrorPath: string | null;
+  name: string;
+  owner: string;
+  provider: RepoProvider;
+  repoId: string;
+}
+
+export interface SearchState {
+  lastBuildAt: string | null;
+  lastBuildVersion: string | null;
+  repoIdByRef: Record<string, string>;
+  reposById: Record<string, RepoRecord>;
+}
+
+export interface RepoRuntimeState extends SearchRepoRuntimeState {}
diff --git a/apps/search/tsconfig.json b/apps/search/tsconfig.json
new file mode 100644
index 0000000..a3664e5
--- /dev/null
+++ b/apps/search/tsconfig.json
@@ -0,0 +1,16 @@
+{
+  "compilerOptions": {
+    "target": "ES2022",
+    "lib": ["ES2022"],
+    "module": "ESNext",
"moduleResolution": "bundler", + "esModuleInterop": true, + "strict": true, + "noEmit": true, + "skipLibCheck": true, + "types": ["node"], + "baseUrl": ".", + "allowImportingTsExtensions": true + }, + "include": ["src/**/*.ts"] +} diff --git a/package.json b/package.json index f4202a7..43679f5 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,7 @@ "scripts": { "predev": "node scripts/link-worktree-dev-vars.mjs", "build": "turbo run build", - "dev": "turbo run dev", + "dev": "turbo run dev --filter=@diffkit/dashboard", "lint": "turbo run lint", "check": "turbo run check", "check-types": "turbo run check-types", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index d6cf06e..be3577c 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -172,6 +172,18 @@ importers: specifier: ^4.81.1 version: 4.81.1(@cloudflare/workers-types@4.20260413.1) + apps/search: + devDependencies: + '@types/node': + specifier: ^22.18.10 + version: 22.19.17 + tsx: + specifier: ^4.21.0 + version: 4.21.0 + typescript: + specifier: ^5.9.3 + version: 5.9.3 + packages/icons: dependencies: '@hugeicons/react':