feat(publishmq): NATS JetStream source with per-account multi-tenancy #910
michaeldoehler wants to merge 7 commits into
Adds a publish-mq-only driver for NATS JetStream. Outpost reads events from one or more pre-provisioned JetStream consumers via a pull-based multiplexed subscription.
Key design points:
- Multi-account: one NATS Account per Outpost tenant. Each account gets its own connection and pull loop; messages are merged into a single Receive channel.
- Account.TenantID overrides the tenant_id field on incoming payloads, so an Account can only ever produce events for its mapped tenant.
- Stream and Consumer are operator-provisioned. Outpost only verifies existence on Init and fails loudly if either is missing.
- Auth via credentials_file (.creds, Operator/JWT-resolver mode).
- ConcurrentSubscription: pull concurrency is bounded by PullMaxMessages per account; upstream consumer skips its own semaphore.
- Publish() is intentionally unimplemented; JetStream is read-only here.
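The "merged into a single Receive channel" point can be pictured with a plain fan-in. This is a minimal sketch using only the standard library, not the driver's code: `Message`, `mergeAccounts`, `sourceOf` and `countByAccount` are hypothetical names, and the per-account sources are ordinary channels standing in for JetStream pull loops.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Message is a hypothetical stand-in for the driver's message type.
type Message struct {
	Account string
	Body    string
}

// mergeAccounts starts one pump goroutine per account source and forwards
// every message into a single channel. The merged channel is closed once
// all pumps have exited (source closed or context cancelled).
func mergeAccounts(ctx context.Context, sources map[string]<-chan Message) <-chan Message {
	out := make(chan Message)
	var wg sync.WaitGroup
	for _, src := range sources {
		wg.Add(1)
		go func(src <-chan Message) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case m, ok := <-src:
					if !ok {
						return
					}
					select {
					case out <- m:
					case <-ctx.Done():
						return
					}
				}
			}
		}(src)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// sourceOf builds a pre-filled, closed channel that imitates one account's pull loop.
func sourceOf(account string, bodies ...string) <-chan Message {
	ch := make(chan Message, len(bodies))
	for _, b := range bodies {
		ch <- Message{Account: account, Body: b}
	}
	close(ch)
	return ch
}

// countByAccount drains the merged channel and counts messages per account.
func countByAccount(sources map[string]<-chan Message) map[string]int {
	counts := map[string]int{}
	for m := range mergeAccounts(context.Background(), sources) {
		counts[m.Account]++
	}
	return counts
}

func main() {
	counts := countByAccount(map[string]<-chan Message{
		"acme":   sourceOf("acme", "a1", "a2"),
		"globex": sourceOf("globex", "g1"),
	})
	fmt.Println(counts["acme"], counts["globex"]) // prints: 2 1
}
```

Ordering across accounts is not preserved in this shape; only per-account ordering survives the merge.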
Adds dynamic add/remove of NATS Accounts at runtime via a watched
directory. Layout under accounts_dir:
  <account-name>/
    user.creds   NATS .creds (JWT + NKey seed)
    meta.yaml    stream/consumer/tenant_id metadata
The watcher debounces filesystem events (250ms) and triggers a
reconcile against the current connection set. Static accounts from
config.Accounts are preserved across reconciles; only dir-derived
accounts are added or removed.
Refactors NATSQueue internals to keep connections in a map keyed by
account name, with safe add/remove that also starts/stops the
per-account pump when a subscription is active.
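The reconcile rule described above (static accounts preserved, only dir-derived accounts added or removed) could be sketched as a pure set comparison. This is an illustration under assumed names, not the code in queue_nats.go: `planReconcile` and `set` are hypothetical helpers, and accounts are reduced to their names.

```go
package main

import (
	"fmt"
	"sort"
)

// set is a small helper that builds a name set.
func set(names ...string) map[string]bool {
	s := make(map[string]bool, len(names))
	for _, n := range names {
		s[n] = true
	}
	return s
}

// planReconcile compares the running connection set with what the watched
// directory currently contains.
//   - static:  names that came from config.Accounts (never removed)
//   - current: names that have a live connection right now
//   - fromDir: names discovered under accounts_dir
//
// It returns the names to connect and the names to disconnect, sorted.
func planReconcile(static, current, fromDir map[string]bool) (add, remove []string) {
	for name := range fromDir {
		if !current[name] {
			add = append(add, name)
		}
	}
	for name := range current {
		if static[name] || fromDir[name] {
			continue // still wanted: configured statically or still on disk
		}
		remove = append(remove, name)
	}
	sort.Strings(add)
	sort.Strings(remove)
	return add, remove
}

func main() {
	add, remove := planReconcile(
		set("acme"),        // static account from config
		set("acme", "old"), // currently connected
		set("globex"),      // directories present on disk
	)
	fmt.Println(add, remove) // prints: [globex] [old]
}
```

Keeping the plan separate from the side effects (dialling, stopping pumps) makes the rule easy to unit-test without a NATS server.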
Adds PublishNATSConfig + PublishNATSAccountConfig to the PublishMQ config, plus GetInfraType / GetQueueConfig branches that map them onto the mqs.NATSConfig the driver expects. Static account lists and the watched accounts_dir can be used independently or combined; the queue treats them as additive.
Adds a small NATS publisher to the local dev publish service, matching the existing rabbitmq/aws_sqs/gcp_pubsub helpers. Reads URL/subject/stream/consumer/creds from environment with defaults matching the docker-compose example. declareNATS() creates a work-queue stream + durable consumer so a fresh local NATS server is usable in seconds.
Adds four integration tests covering the NATS driver:
- TestIntegrationMQ_NATS: basic publish + receive + ack via JetStream
- TestIntegrationMQ_NATS_TenantOverride: account.TenantID rewrites the payload's tenant_id field even when payload contains a value
- TestIntegrationMQ_NATS_MultiAccount: two accounts consumed in parallel, each tagged with its own tenant_id
- TestIntegrationMQ_NATS_AccountsDir: directory watcher picks up an account directory created after Init and starts consuming from it within a few seconds
Supporting infrastructure:
- internal/util/testinfra/nats.go: nats:2.10-alpine testcontainer with JetStream enabled
- internal/util/testutil/nats.go: stream/consumer declare + teardown helpers plus a small publish helper for injecting test events
- testinfra.Config.NATSURL + TEST_NATS_URL in .env.test
Drive-by: relax NATSAccountConfig validation so credentials_file is optional (no-auth and token-via-URL deployments are legitimate); the accounts-dir loader only defaults to user.creds when the file actually exists in the account directory.
- docs/content/publishing/publish-from-nats.mdoc: new guide covering message structure, prerequisites, configuration (env + yaml), the accounts-dir layout, and the multi-tenancy / NATS-account pattern.
- .env.example: PUBLISH_NATS_SERVERS / PUBLISH_NATS_ACCOUNTS_DIR.
- .outpost.yaml.example: full publishmq.nats block under publishmq.
- contributing/mq.md: tick NATS in the supported-MQ list and add a section describing scope, configuration, infra ownership, and retry/visibility behavior.
- examples/docker-compose/compose-publish-nats.yml + helper script: single-node JetStream container for local development, paired with the existing publish dev service (method=nats).
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds NATS JetStream as a publish-mq source, including runtime account discovery via a watched accounts directory, integration test coverage, and developer tooling/docs for local setup.
Changes:
- Implement NATS JetStream queue driver with multi-account support and optional accounts_dir hot-add/remove.
- Add test infrastructure utilities (testcontainers + JetStream provisioning helpers) and integration tests for NATS behaviors.
- Extend publishmq config, docs, and examples to support NATS JetStream.
Reviewed changes
Copilot reviewed 19 out of 20 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| internal/util/testutil/nats.go | Test helper to provision/teardown JetStream streams/consumers and publish test messages. |
| internal/util/testinfra/testinfra.go | Adds TEST_NATS_URL support to test infra config. |
| internal/util/testinfra/nats.go | Testcontainers-based NATS JetStream bring-up + per-test queue config generator. |
| internal/mqs/queue_nats_test.go | Integration tests for NATS driver: basic consume/ack, tenant overrides, multi-account, accounts_dir watcher. |
| internal/mqs/queue_nats.go | New NATS JetStream queue driver implementation with per-account connections and multiplexed subscription. |
| internal/mqs/queue.go | Wires NATS into NewQueue selection logic. |
| internal/mqs/nats_accounts.go | Accounts directory loader + fsnotify watcher with debounce. |
| internal/config/publishmq.go | Adds publishmq NATS config schema and mapping to mqs.QueueConfig. |
| go.mod | Adds direct dependencies for fsnotify and nats.go. |
| go.sum | Adds checksums for new NATS dependencies. |
| examples/docker-compose/start-nats-publish.sh | Convenience script to start a local JetStream container for publishing examples. |
| examples/docker-compose/compose-publish-nats.yml | Docker Compose service definition for local NATS JetStream. |
| docs/content/publishing/publish-from-nats.mdoc | End-user documentation for configuring publish-from-NATS JetStream. |
| contributing/mq.md | Documents NATS JetStream support in contributor MQ guide. |
| cmd/publish/publish_nats.go | Adds dev publish/declare helper for NATS JetStream. |
| cmd/publish/publish_handler.go | Routes method=nats publish requests to NATS implementation. |
| cmd/publish/declare_handler.go | Routes method=nats declare requests to NATS implementation. |
| .outpost.yaml.example | Adds example publishmq NATS config block. |
| .env.test | Adds TEST_NATS_URL placeholder. |
| .env.example | Adds example publishmq NATS env vars. |
    jmsg, err := iter.Next()
    if err != nil {
        if errors.Is(err, jetstream.ErrMsgIteratorClosed) {
            return
        }
        continue
    }

    cfg.NATSURL = endpoint
    cfg.cleanupFns = append(cfg.cleanupFns, func() {
        if err := container.Terminate(ctx); err != nil {
            log.Printf("failed to terminate nats container: %s", err)
        }
    })

    accountDir := filepath.Join(dir, e.Name())
    metaPath := filepath.Join(accountDir, "meta.yaml")
    if _, err := os.Stat(metaPath); err != nil {
        continue

    defer w.w.Close()

    var (
        timer    *time.Timer
        timerC   <-chan time.Time
        armTimer = func() {
            if timer != nil {
                timer.Stop()
            }
            timer = time.NewTimer(debounceWindow)

    if err := js.DeleteStream(ctx, acc.Stream); err != nil {
        // Best-effort teardown; ignore "not found" so re-runs don't fail.
        if !strings.Contains(err.Error(), "stream not found") {
            return err
        }
    }

        mockServerURL = "http://" + mockServerURL
    }
    natsURL := v.GetString("TEST_NATS_URL")
    if natsURL != "" && !strings.Contains(natsURL, "nats://") {

    if _, alreadyHave := current[acc.Name]; alreadyHave {
        continue
    }
    _ = q.addAccount(context.Background(), acc)
- queue_nats.go pump: add 250ms backoff between non-fatal iter.Next() errors so the loop doesn't busy-spin during transient consumer unavailability (e.g. leadership change, connection blip).
- queue_nats.go reconcileFromDir: log addAccount failures instead of silently dropping them, so operators can spot bad creds / missing streams / unreachable servers when a tenant directory lands.
- nats_accounts.go loadAccountsFromDir: surface non-ENOENT os.Stat errors (permission, transient IO) instead of treating them as 'subdirectory has no meta.yaml'.
- nats_accounts.go watcher: switch to a single reusable timer with Reset/Stop and proper channel drain, eliminating per-event timer allocations and the Stop-without-drain edge case.
- testutil/nats.go: replace strings.Contains substring matching on 'stream not found' with errors.Is(err, jetstream.ErrStreamNotFound).
- testinfra/nats.go EnsureNATS: move the cfg.NATSURL check inside sync.Once.Do to close a data race between concurrent t.Parallel() callers and the container-start write path. Verified with -race.
- testinfra/testinfra.go: use strings.HasPrefix (nats:// and tls://) instead of strings.Contains for the scheme normalization.
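For readers unfamiliar with the "single reusable timer with Reset/Stop and proper channel drain" pattern named in the watcher fix, here is a minimal standard-library sketch. It is not the code from nats_accounts.go: `debounce` and `runBurst` are hypothetical names, events are reduced to empty structs, and flushing a pending burst when the event channel closes is a choice made for this example only.

```go
package main

import (
	"fmt"
	"time"
)

// stopAndDrain stops the timer and, if it had already fired, removes the
// stale tick so a later Reset cannot deliver it.
func stopAndDrain(t *time.Timer) {
	if !t.Stop() {
		select {
		case <-t.C:
		default:
		}
	}
}

// debounce calls fire once after events has been quiet for window.
// One timer is allocated up front and re-armed on every event.
// It returns when events is closed (assumption: a pending burst is flushed first).
func debounce(events <-chan struct{}, window time.Duration, fire func()) {
	timer := time.NewTimer(window)
	stopAndDrain(timer) // start disarmed
	pending := false
	for {
		select {
		case _, ok := <-events:
			if !ok {
				stopAndDrain(timer)
				if pending {
					fire()
				}
				return
			}
			stopAndDrain(timer)
			timer.Reset(window)
			pending = true
		case <-timer.C:
			pending = false
			fire()
		}
	}
}

// runBurst sends n back-to-back events, waits three windows, closes the
// channel and reports how often fire was called.
func runBurst(n int, windowMillis int) int {
	window := time.Duration(windowMillis) * time.Millisecond
	events := make(chan struct{})
	done := make(chan struct{})
	fires := 0
	go func() {
		defer close(done)
		debounce(events, window, func() { fires++ })
	}()
	for i := 0; i < n; i++ {
		events <- struct{}{}
	}
	time.Sleep(3 * window)
	close(events)
	<-done // fires is only read after the debounce goroutine has exited
	return fires
}

func main() {
	fmt.Println("reconciles triggered:", runBurst(5, 100))
}
```

A burst of five filesystem events inside one window should collapse into a single reconcile, which is the behavior the 250ms debounce is after.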
Thanks for the review. Pushed
On the remaining point:
Hi @michaeldoehler, thanks for opening the PR, this was quite interesting. My main concern is the complexity around multi-tenancy via multi-account NATS setup. Can you elaborate more on this design decision? Is this a current use case within your system? From Outpost POV, we see the publisher to Outpost as 1 entity (your service), which would then result in 1 publishmq only. I'd love to understand the use case where per-account publish queues are necessary for each tenant?
Hi @alexluong, that's a fair question and your framing is right for the common case. Let me sketch what we're working with, because I think once the picture's on the table the rest makes sense.
We're building a B2B platform where each customer has their own workspace, end to end. Data in one place, identity in another, billing in a third, kept apart at the boundary in a way the systems themselves can enforce, not by convention. When a customer onboards, that boundary extends down to messaging too: each customer gets their own NATS account with their own credentials.
So from Outpost's perspective it really is "one publisher, one publishmq" per customer. The wrinkle is that there are many customers and the natural thing for Outpost to do is consume across all of them. That's where the per-account connections and the override come from. They aren't adding complexity for its own sake, they're how we let Outpost speak to the boundary that already exists.
The reason we lean on NATS accounts rather than a shared account with We also use account-level resource limits (bytes/sec, max connections, JetStream quota) to back our plan tiers. Subject conventions plus external counters can fake some of that, but it leaks too easily to be reliable.
Reading the PR top to bottom, the multi-conn and watcher pieces all come back to that single shape: the tenant boundary is already a NATS account, Outpost needs to consume across all of them, and our provisioning flow doesn't wait for an Outpost restart, it drops a creds file plus meta.yaml into a directory and expects everything downstream to keep up.
I tried to keep that from leaking into the simple case. With one account in config you get the RabbitMQ shape exactly: one connection, no watcher, no override, no fan-in. Anything new sits behind the NATS provider branch, the other publish-mq paths and the infra-mq adapters aren't touched.
The maintenance concern is fair though. Multi-conn lifecycle, the watcher, the override logic, that's genuine surface area to carry. If a smaller first step would land more comfortably, I can break this into two PRs: a single-account driver matching the RabbitMQ shape first, then the multi-account and directory bits as a follow-up that gets evaluated on its own. The commits already roughly map to that split.
If neither of those sounds right, point me at what feels heaviest from your side and I'll see what can be simplified without losing the property we need.
Adds NATS JetStream as a publish-mq source for Outpost. Outpost reads
events from one or more pre-provisioned JetStream consumers; streams
and consumers are operator-owned. The driver is built for the common
SaaS pattern of one NATS Account per Outpost tenant.
Why
NATS JetStream is a popular event bus in self-hosted Kubernetes / on-prem
setups, and a natural source for Outpost's webhook-fanout role. The driver
slots into the existing PublishMQ interface alongside RabbitMQ, AWS SQS,
GCP Pub/Sub and Azure Service Bus.
Scope is intentionally limited to publish-mq only — Outpost does not run
its internal delivery/log queues on NATS. That keeps the surface area
small and avoids dragging JetStream into Outpost's auto-provisioning
contract.
Design highlights
- Each `PublishNATSAccountConfig` holds its own credentials file and is dialled on its own NATS connection. Pull loops run in parallel and feed into a single `Subscription.Receive` channel.
- When an account sets `tenant_id` (recommended), Outpost overrides the payload's `tenant_id` so a publisher with creds for Account A can only produce events for the mapped tenant.
- `accounts_dir` is watched via `fsnotify` with a 250ms debounce. New tenant subdirectories trigger a new connection without restarting Outpost; removed dirs drain the connection. Provisioning flow: mint NATS account JWT → push to resolver → drop `meta.yaml` + `.creds` into `accounts_dir` → Outpost picks it up.
- `credentials_file` (.creds) is the primary mode. It is optional: empty means no-auth (valid for trusted-network setups or token-via-URL).
- Streams and consumers must exist before Outpost starts. `Init` verifies both and fails loudly on any missing piece.
- `AckWait` / `MaxDeliver` are configured on the consumer.
- `Publish()` intentionally returns an error; JetStream is read-only from Outpost's side.
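To make the tenant override concrete, the following is a minimal sketch of rewriting `tenant_id` on an incoming payload. It assumes the event is a JSON object with a top-level `tenant_id` field; `overrideTenantID` and `mustOverride` are hypothetical names, and the driver in this PR may do this differently.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// overrideTenantID replaces the top-level "tenant_id" of a JSON event with
// the tenant mapped to the account the message arrived on. An empty
// tenantID means no override is configured and the payload is returned as-is.
func overrideTenantID(payload []byte, tenantID string) ([]byte, error) {
	if tenantID == "" {
		return payload, nil
	}
	// RawMessage keeps every other field byte-for-byte without decoding it.
	var event map[string]json.RawMessage
	if err := json.Unmarshal(payload, &event); err != nil {
		return nil, fmt.Errorf("decode event: %w", err)
	}
	if event == nil { // payload was the JSON literal null
		return nil, fmt.Errorf("event payload is not a JSON object")
	}
	encoded, err := json.Marshal(tenantID)
	if err != nil {
		return nil, fmt.Errorf("encode tenant_id: %w", err)
	}
	event["tenant_id"] = encoded
	return json.Marshal(event)
}

// mustOverride is a convenience wrapper for the example below.
func mustOverride(payload, tenantID string) string {
	out, err := overrideTenantID([]byte(payload), tenantID)
	if err != nil {
		panic(err)
	}
	return string(out)
}

func main() {
	// A publisher on the "acme" account tries to emit an event for another tenant.
	fmt.Println(mustOverride(`{"tenant_id":"evil","topic":"user.created"}`, "acme"))
	// prints: {"tenant_id":"acme","topic":"user.created"}
}
```

Because the value comes from the account mapping rather than the payload, a publisher cannot spoof another tenant even if it sets the field itself.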
Configuration
```yaml
publishmq:
  nats:
    servers:
      - nats://nats:4222
    accounts_dir: /etc/outpost/nats-accounts
    accounts:
      - name: acme
        credentials_file: /etc/outpost/acme.creds
        stream: events
        consumer: outpost
        tenant_id: acme
```
Env vars: `PUBLISH_NATS_SERVERS` (comma-separated cluster URLs), `PUBLISH_NATS_ACCOUNTS_DIR`.
Accounts-dir layout:
```
/etc/outpost/nats-accounts/
├── acme/
│   ├── user.creds   # optional NATS .creds (JWT + NKey seed)
│   └── meta.yaml    # name/stream/consumer/tenant_id
└── globex/
    ├── user.creds
    └── meta.yaml
```
Full guide added at `docs/content/publishing/publish-from-nats.mdoc`.
Commit walk-through
- `feat(mqs): add NATS JetStream queue driver`: driver skeleton, `NATSConfig` / `NATSAccountConfig`, pull-consumer subscription, tenant override, wired into `mqs.QueueConfig` and `NewQueue`.
- `feat(mqs): NATS accounts directory watcher`: refactors connection storage to a name-keyed map and adds the `fsnotify`-based watcher with dynamic add/remove.
- `feat(config): wire PublishNATSConfig into publishmq`: config surface area plus `GetInfraType` / `GetQueueConfig` branches.
- `feat(cmd/publish): NATS JetStream dev publish helper`: adds `method=nats` to the existing publish dev service.
- `test(mqs): NATS JetStream integration tests + testinfra`: four integration tests (basic, tenant-override, multi-account, watcher), `nats:2.10-alpine` testcontainer, declare/teardown helpers.
- `docs: publish-from-nats guide + env/yaml examples + compose`: user-facing guide, `.env.example`, `.outpost.yaml.example`, `contributing/mq.md` update, and a docker-compose example with `nats:2.10-alpine` + helper script.
Verification
- `go build ./...` + `go vet ./...` clean.
- `go test -run TestIntegrationMQ_NATS -count=1 ./internal/mqs/...` passes locally (testcontainer `nats:2.10-alpine -js`):
  - `TestIntegrationMQ_NATS`: basic publish/receive/ack
  - `TestIntegrationMQ_NATS_TenantOverride`: payload tenant_id is overridden by account config
  - `TestIntegrationMQ_NATS_MultiAccount`: two accounts consumed in parallel, each tagged with its own tenant
  - `TestIntegrationMQ_NATS_AccountsDir`: watcher picks up an account dir created after Init
Dependencies
Promotes `github.com/nats-io/nats.go` from indirect to direct (already present in `go.sum` via transitive deps; no version bump). Promotes `github.com/fsnotify/fsnotify` from indirect to direct.
Out of scope
- Running Outpost's internal delivery/log queues on NATS: a separate decision around JetStream's storage/replication model.
- ... `contributing/mq.md`).
Happy to split commits differently or rework any part; the staging is designed to keep each commit independently reviewable.