Problem
Today the only observability the outbox subscriber emits is structured logs. _flush_terminal / _flush_retry already write extra={"event": "lease_lost", ...} (M7 in plan.md), and log-aggregator pipelines can count those records by field. But that only covers one event, only at WARNING, and only via log volume — counters like fetched, dispatched, acked, nacked_retried, nacked_terminal are still invisible. Ops teams that want a Prometheus/StatsD dashboard for the outbox have nowhere to hook.
plan2.md E4 named this MetricsMiddleware, but the name was misleading: several of the events we want to count (fetched, lease_lost) happen in _fetch_loop / _flush_*, which do not flow through FastStream's middleware bus. A user-supplied MetricsMiddleware registered via broker_middlewares would never see them.
Proposed design
A single callable seam — a MetricsRecorder — threaded from the broker constructor through the broker config down to every instrumentation site in the subscriber. No middleware bus, no class hierarchy; just one function the user provides that bridges to whatever metrics backend they use.
# faststream_outbox/metrics.py (new)
from collections.abc import Callable, Mapping
import typing

MetricsRecorder = Callable[[str, Mapping[str, typing.Any]], None]
"""
A callable the broker invokes at well-defined instrumentation points.

Args:
    event: short stable identifier, one of "fetched", "dispatched", "acked",
        "nacked_retried", "nacked_terminal", "lease_lost", "dlq_written".
    tags: structured fields (queue, subscriber name, count, phase, etc.). Always
        includes "queue" and "subscriber". Other fields depend on the event.

The recorder is called from the event loop and must not block. Wrap any
synchronous statsd/prom client call so it can't raise.
"""


def _noop_recorder(_event: str, _tags: Mapping[str, typing.Any]) -> None:
    """Default no-op, so the instrumentation sites can call unconditionally."""
Wiring
# broker.py: constructor adds one new kwarg
class OutboxBroker(...):
    def __init__(
        self,
        engine: "AsyncEngine | None" = None,
        *,
        outbox_table: "Table",
        ...,
        metrics_recorder: "MetricsRecorder | None" = None,
        ...
    ) -> None:
        ...
        broker_config = OutboxBrokerConfig(
            ...,
            metrics_recorder=metrics_recorder or _noop_recorder,
        )

# configs.py: config holds the recorder
@dataclass(kw_only=True)
class OutboxBrokerConfig(BrokerConfig):
    engine: "AsyncEngine | None" = None
    client: "OutboxClient | None" = None
    metrics_recorder: "MetricsRecorder" = field(default=_noop_recorder)
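As an illustration of what a user might pass as metrics_recorder, here is a minimal sketch of an in-memory recorder. CountingRecorder, the "orders" queue, and the "on_order" subscriber name are hypothetical and not part of faststream_outbox; the only thing taken from the design is the (event, tags) call shape.

```python
from collections import Counter
from collections.abc import Mapping
import typing


class CountingRecorder:
    """Hypothetical in-memory recorder; not part of faststream_outbox."""

    def __init__(self) -> None:
        self.counts: Counter = Counter()

    def __call__(self, event: str, tags: Mapping[str, typing.Any]) -> None:
        # "fetched" reports a batch size in tags["count"]; every other
        # event stands for exactly one row.
        amount = tags.get("count", 1) if event == "fetched" else 1
        self.counts[(event, tags.get("queue"))] += amount


# Any object with this __call__ signature satisfies MetricsRecorder.
recorder = CountingRecorder()
recorder("fetched", {"queue": "orders", "subscriber": "on_order", "count": 3})
recorder("acked", {"queue": "orders", "subscriber": "on_order", "deliveries_count": 1})
```

Under this design the instance would be handed to the broker as metrics_recorder=recorder; a real adapter would forward to a StatsD or Prometheus client instead of a Counter.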
Instrumentation points
All sites live in subscriber/usecase.py. Tags always include {"queue": ..., "subscriber": self.calls_options[0].call_name} (or similar); per-event extras shown below.
| Event | Site | Per-event tags |
| --- | --- | --- |
| fetched | _fetch_inner after rows = await self._client.fetch(...) returns | {"count": len(rows)}; emit once per batch, even when 0 (so dashboards see idle pulses) |
| dispatched | dispatch_one immediately before await self.consume(row) | {"deliveries_count": row.deliveries_count} |
| acked | dispatch_one, after consume succeeds and row.to_delete is True (terminal success path) | {"deliveries_count": row.deliveries_count} |
| nacked_retried | dispatch_one, after consume failure when row.pending_delay_seconds is not None (retry was scheduled) | {"deliveries_count": ..., "next_delay_seconds": row.pending_delay_seconds} |
| nacked_terminal | dispatch_one, when allow_delivery returns False or retry strategy returns terminal | {"deliveries_count": ..., "reason": "max_deliveries" or "retry_terminal"} |
| lease_lost | already-existing flush sites where deleted/updated is False | {"phase": "terminal" or "retry", "deliveries_count": ...}; emit in addition to the existing WARNING log so the structured field stays canonical |
| dlq_written | (placeholder, wired only if/when E3 lands) | TBD with E3 |
The recorder is invoked unconditionally — _noop_recorder is the default, so there's no if recorder is not None: guard cluttering call sites.
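The branch conditions in the table can be sketched as a pure function that maps a row's post-handler state to an event name and per-event tags. This is only an illustration under assumptions: FakeRow and outcome_event are hypothetical names, the real dispatch_one is not shown in this issue, and the base "queue" / "subscriber" tags are left out for brevity.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class FakeRow:
    """Hypothetical stand-in carrying only the row fields named in the table."""

    deliveries_count: int
    to_delete: bool = False
    pending_delay_seconds: Optional[float] = None


def outcome_event(
    row: FakeRow, *, delivery_allowed: bool = True, handler_failed: bool = False
) -> Optional[tuple[str, dict[str, Any]]]:
    """Return (event, per-event tags) for one dispatch outcome, or None."""
    tags: dict[str, Any] = {"deliveries_count": row.deliveries_count}
    if not delivery_allowed:
        # allow_delivery returned False: the delivery budget is exhausted.
        return "nacked_terminal", {**tags, "reason": "max_deliveries"}
    if handler_failed:
        if row.pending_delay_seconds is not None:
            # A retry was scheduled for this row.
            return "nacked_retried", {
                **tags,
                "next_delay_seconds": row.pending_delay_seconds,
            }
        # The retry strategy gave up.
        return "nacked_terminal", {**tags, "reason": "retry_terminal"}
    if row.to_delete:
        # Terminal success path.
        return "acked", tags
    return None
```

Keeping the classification separate from the emit call would make each branch testable without a broker; whether the real code is factored this way is an open choice.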
Why a callable, not a Protocol class
- Simpler API surface: no abstract methods to subclass, no __init_subclass__ hooks to maintain.
- StatsD / Prometheus / Datadog adapters are one-line lambdas (lambda e, t: statsd.incr(f"outbox.{e}", tags=t)).
- Matches the FastStream convention of accepting raw callables (CustomCallable, BrokerMiddleware) rather than custom Protocol types where possible.
Why not a FastStream BaseMiddleware
A BaseMiddleware would only see handler-level events (dispatched, acked, nacked_*) because that's all FastStream routes through the middleware bus. It wouldn't see fetched (no message yet) or lease_lost (post-handler flush failure). Splitting metrics across two seams — a middleware and a recorder — is the kind of API surface duplication that S3 was rejected for; one seam wins.
Non-obvious trap
The recorder is called from the event loop. If a user's recorder blocks, it will stall the fetch loop or the worker: a synchronous Prometheus Counter.inc() is fine, but a requests.post(statsd_url) is not. Document this in the docstring and surface it in the README. We do not try to wrap user-supplied recorders in asyncio.to_thread; that would silently break ordering guarantees and create a per-event task explosion. Keeping the recorder non-blocking is the user's responsibility.
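For users whose backend client does block, one user-side pattern is to hand events to a single background thread through a queue, which keeps the recorder call itself non-blocking and preserves event order. The sketch below is an assumption about how a README snippet could look, not something the broker does; make_queued_recorder and the thread name are hypothetical.

```python
import queue
import threading
from collections.abc import Callable, Mapping
from typing import Any

MetricsRecorder = Callable[[str, Mapping[str, Any]], None]


def make_queued_recorder(
    sink: MetricsRecorder,
) -> tuple[MetricsRecorder, Callable[[], None]]:
    """Wrap a possibly blocking sink; return (recorder, close)."""
    events: queue.SimpleQueue = queue.SimpleQueue()
    stop = object()  # sentinel that tells the worker to exit

    def drain() -> None:
        while True:
            item = events.get()
            if item is stop:
                return
            event, tags = item
            try:
                sink(event, tags)  # blocking I/O happens on this thread only
            except Exception:
                pass  # a failing sink must not kill the worker thread

    worker = threading.Thread(target=drain, name="outbox-metrics", daemon=True)
    worker.start()

    def record(event: str, tags: Mapping[str, Any]) -> None:
        # put() on an unbounded SimpleQueue returns immediately.
        events.put((event, dict(tags)))

    def close() -> None:
        events.put(stop)
        worker.join()

    return record, close
```

A single worker thread drains the queue in FIFO order, so events reach the sink in the order they were emitted. The queue is unbounded, so a sink that stays slower than the event rate will grow memory; a bounded queue with drop-on-full would be one alternative.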
Edge cases handled
- No recorder passed: _noop_recorder is the default; instrumentation sites all call unconditionally. The only overhead is one call to an empty function per event.
- Recorder raises: instrumentation sites wrap each call in try/except and log at DEBUG. A broken recorder must not poison the dispatch loop.
- Test-broker path: TestOutboxBroker inherits the recorder field; sync dispatch_one emits the same events as the loop path. No special test-mode handling.
- Multiple subscribers: each subscriber emits its own events with its own subscriber tag; recorders tell them apart by tag combination.
- run_loops=False: still emits dispatched / acked / nacked_* (publish → sync dispatch path goes through dispatch_one), does not emit fetched (no fetch loop ran). Acceptable, since tests typically only need to assert handler-level counters.
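The "recorder raises" case can be sketched as a small helper that every instrumentation site would call. The helper name emit, the logger name, and the keyword-argument form for per-event tags are assumptions for illustration; the issue only fixes the behavior (try/except, log at DEBUG).

```python
import logging
from collections.abc import Callable, Mapping
from typing import Any

MetricsRecorder = Callable[[str, Mapping[str, Any]], None]

# Hypothetical logger name; the real module may use a different one.
logger = logging.getLogger("faststream_outbox.metrics")


def emit(
    recorder: MetricsRecorder,
    event: str,
    base_tags: Mapping[str, Any],
    **extra: Any,
) -> None:
    """Call the recorder with merged tags; never let it raise into the caller."""
    try:
        recorder(event, {**base_tags, **extra})
    except Exception:
        # A broken recorder must not poison the dispatch loop.
        logger.debug("metrics recorder raised for event %r", event, exc_info=True)
```

A call site would then look like emit(recorder, "fetched", base_tags, count=len(rows)), with base_tags holding the always-present "queue" and "subscriber" fields.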
Test plan
Unit (tests/test_unit.py)
- Default no-op: construct broker without metrics_recorder, run a publish/consume cycle in test mode, assert no AttributeError / TypeError.
- Recorder receives correct events: construct broker with a list-appending recorder, publish two rows, assert dispatched and acked fired with expected tags.
- Recorder raise is swallowed: pass a recorder that raises RuntimeError; assert the consume cycle still completes and the row is deleted.
- Lease-lost tag parity: trigger a lease_lost flush (mock delete_with_lease returning False), assert recorder got ("lease_lost", {..., "phase": "terminal"}) matching the existing log's extra dict.
- Fetched batch count: drive _fetch_inner once with a 3-row return, assert recorder got ("fetched", {"count": 3, ...}).
- Idle fetch emits count=0: drive _fetch_inner with empty return, assert recorder got ("fetched", {"count": 0, ...}).
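Several of the unit cases above assert on a subset of the tags (the "..." in the expected dicts). A list-appending recorder with a subset-match helper could cover that; the sketch below is a hypothetical test utility, and the names RecordingRecorder and saw are not taken from the codebase.

```python
from collections.abc import Mapping
from typing import Any

_MISSING = object()  # distinguishes "tag absent" from "tag is None"


class RecordingRecorder:
    """Hypothetical test double: remembers every (event, tags) call."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, dict[str, Any]]] = []

    def __call__(self, event: str, tags: Mapping[str, Any]) -> None:
        # Copy the tags so later mutation by the caller cannot change history.
        self.calls.append((event, dict(tags)))

    def saw(self, event: str, **expected: Any) -> bool:
        """True if some recorded call has this event and all expected tags."""
        return any(
            name == event
            and all(tags.get(k, _MISSING) == v for k, v in expected.items())
            for name, tags in self.calls
        )
```

A test would pass an instance as metrics_recorder, drive the code under test, and then assert, for example, recorder.saw("fetched", count=3) without spelling out the queue and subscriber tags.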
Integration (tests/test_integration.py)
- End-to-end recorder fires: publish 20 rows against real Postgres with max_workers=4, recorder appends events, assert at least 20 dispatched, at least 20 acked, and at least one fetched after the drain.
- Retry path emits nacked_retried: publish 1 row whose handler always raises, with ExponentialRetry(max_attempts=2); assert one nacked_retried then one nacked_terminal.
Files to change
| File | Change |
| --- | --- |
| faststream_outbox/metrics.py (new) | MetricsRecorder type alias + _noop_recorder |
| faststream_outbox/broker.py | accept metrics_recorder kwarg; thread into OutboxBrokerConfig |
| faststream_outbox/configs.py | add metrics_recorder: MetricsRecorder field with _noop_recorder default |
| faststream_outbox/subscriber/usecase.py | emit at the instrumentation points listed above (dlq_written excluded until E3 lands); wrap each call in try/except/log at DEBUG |
| faststream_outbox/__init__.py | export MetricsRecorder |
| tests/test_unit.py | unit cases listed above |
| tests/test_integration.py | end-to-end + retry-path cases |
| README.md | new "Metrics" section with statsd + prometheus snippets, plus the "must not block" warning |
| CLAUDE.md | document the recorder seam in the architecture section |
Relationship to existing work
- M7 (lease_lost structured log) stays as-is. The recorder fires in addition to the WARNING log, not instead. Log-pipeline counting via the event field remains supported for users who don't want a metrics backend.
- E3 (DLQ) adds a dlq_written event when it lands. No change needed here besides reserving the event name.
- plan2.md E4 is this issue; the original MetricsMiddleware name is rejected per the "Why not a BaseMiddleware" rationale above.
Verification
just lint
just test tests/test_unit.py tests/test_fake.py # fast, no Postgres
just test # full suite including integration
Coverage budget: ≥ 99% on the new metrics.py module (the no-op recorder needs an explicit test that calling it does nothing) and no regression on existing modules.
Carved out of plan2.md E4.