E4: Metrics recorder seam for outbox subscribers #23

@lesnik512

Problem

Today the only observability the outbox subscriber emits is structured logs. _flush_terminal / _flush_retry already write extra={"event": "lease_lost", ...} (M7 in plan.md), and log-aggregator pipelines can count those records by field. But that only covers one event, only at WARNING, and only via log volume — counters like fetched, dispatched, acked, nacked_retried, nacked_terminal are still invisible. Ops teams that want a Prometheus/StatsD dashboard for the outbox have nowhere to hook.

plan2.md E4 named this MetricsMiddleware, but the naming was misleading: most of the events we want to count (fetched, lease_lost) happen in _fetch_loop / _flush_*, which do not flow through FastStream's middleware bus. A user-supplied MetricsMiddleware registered via broker_middlewares would never see them.

Proposed design

A single callable seam — a MetricsRecorder — threaded from the broker constructor through the broker config down to every instrumentation site in the subscriber. No middleware bus, no class hierarchy; just one function the user provides that bridges to whatever metrics backend they use.

# faststream_outbox/metrics.py (new)

from collections.abc import Callable, Mapping
import typing

MetricsRecorder = Callable[[str, Mapping[str, typing.Any]], None]
"""
A callable the broker invokes at well-defined instrumentation points.

Args:
    event: short stable identifier — one of "fetched", "dispatched", "acked",
        "nacked_retried", "nacked_terminal", "lease_lost", "dlq_written".
    tags: structured fields (queue, subscriber name, count, phase, etc.). Always
        includes "queue" and "subscriber". Other fields depend on the event.

The recorder is called from the event loop and must not block. Wrap any
synchronous statsd/prom client call so it can't raise.
"""

def _noop_recorder(_event: str, _tags: Mapping[str, typing.Any]) -> None:
    """Default — no-op so the instrumentation sites can call unconditionally."""

Wiring

# broker.py — constructor adds one new kwarg
class OutboxBroker(...):
    def __init__(
        self,
        engine: "AsyncEngine | None" = None,
        *,
        outbox_table: "Table",
        ...,
        metrics_recorder: "MetricsRecorder | None" = None,
        ...
    ) -> None:
        ...
        broker_config = OutboxBrokerConfig(
            ...,
            metrics_recorder=metrics_recorder or _noop_recorder,
        )

# configs.py: config holds the recorder
from dataclasses import dataclass, field

@dataclass(kw_only=True)
class OutboxBrokerConfig(BrokerConfig):
    engine: "AsyncEngine | None" = None
    client: "OutboxClient | None" = None
    metrics_recorder: "MetricsRecorder" = field(default=_noop_recorder)

Instrumentation points

All sites live in subscriber/usecase.py. Tags always include {"queue": ..., "subscriber": self.calls_options[0].call_name} (or similar); per-event extras shown below.

| Event | Site | Per-event tags |
| --- | --- | --- |
| fetched | _fetch_inner, after rows = await self._client.fetch(...) returns | {"count": len(rows)}; emit once per batch, even when 0 (so dashboards see idle pulses) |
| dispatched | dispatch_one, immediately before await self.consume(row) | {"deliveries_count": row.deliveries_count} |
| acked | dispatch_one, after consume succeeds and row.to_delete is True (terminal success path) | {"deliveries_count": row.deliveries_count} |
| nacked_retried | dispatch_one, after consume failure when row.pending_delay_seconds is not None (retry was scheduled) | {"deliveries_count": ..., "next_delay_seconds": row.pending_delay_seconds} |
| nacked_terminal | dispatch_one, when allow_delivery returns False or retry strategy returns terminal | {"deliveries_count": ..., "reason": "max_deliveries" or "retry_terminal"} |
| lease_lost | already-existing flush sites where deleted/updated is False | {"phase": "terminal" or "retry", "deliveries_count": ...}; emit in addition to the existing WARNING log so the structured field stays canonical |
| dlq_written | (placeholder, wired only if/when E3 lands) | TBD with E3 |

The recorder is invoked unconditionally — _noop_recorder is the default, so there's no if recorder is not None: guard cluttering call sites.
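As a rough illustration of how a site might build its tags, the sketch below merges the always-present base tags with per-event extras. SketchSubscriber, _emit, on_fetched and on_dispatched are hypothetical names invented for this example; the real subscriber in subscriber/usecase.py will look different.

```python
from collections.abc import Callable, Mapping
from typing import Any

MetricsRecorder = Callable[[str, Mapping[str, Any]], None]


class SketchSubscriber:
    """Hypothetical stand-in for the outbox subscriber; only tagging is shown."""

    def __init__(self, queue: str, name: str, recorder: MetricsRecorder) -> None:
        # Tags that every event carries.
        self._base_tags = {"queue": queue, "subscriber": name}
        self._recorder = recorder

    def _emit(self, event: str, **extra: Any) -> None:
        # Base tags first, per-event extras merged on top.
        self._recorder(event, {**self._base_tags, **extra})

    def on_fetched(self, rows: list) -> None:
        # Emitted once per batch, including empty batches (the idle pulse).
        self._emit("fetched", count=len(rows))

    def on_dispatched(self, deliveries_count: int) -> None:
        self._emit("dispatched", deliveries_count=deliveries_count)
```

Building a fresh dict per call means a recorder that keeps the tags cannot mutate state shared with later events.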

Why a callable, not a Protocol class

  • Simpler API surface — no abstract methods to subclass, no __init_subclass__ hooks to maintain.
  • StatsD / Prometheus / Datadog adapters fit in a few lines, often a single lambda (lambda e, t: statsd.incr(f"outbox.{e}", tags=t)).
  • Matches the FastStream convention of accepting raw callables (CustomCallable, BrokerMiddleware) rather than custom Protocol types where possible.
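To make the adapter claim concrete, here is a sketch of a recorder factory for a StatsD-style client. FakeStatsd is an in-memory stand-in with an assumed incr(name, tags=...) method, not the API of any particular library, and make_statsd_recorder is a hypothetical helper. Forwarding only a fixed set of label keys is a choice made for this example, so that numeric fields such as count or deliveries_count do not become high-cardinality labels.

```python
from collections.abc import Callable, Mapping
from typing import Any

MetricsRecorder = Callable[[str, Mapping[str, Any]], None]

# Tag keys forwarded as labels; an assumption of this sketch.
LABEL_KEYS = ("queue", "subscriber", "phase", "reason")


class FakeStatsd:
    """In-memory stand-in for a StatsD-style client (hypothetical API)."""

    def __init__(self) -> None:
        self.calls: list = []

    def incr(self, name: str, tags: Mapping[str, str]) -> None:
        self.calls.append((name, dict(tags)))


def make_statsd_recorder(client: FakeStatsd, prefix: str = "outbox") -> MetricsRecorder:
    def record(event: str, tags: Mapping[str, Any]) -> None:
        # One increment per event, labelled with the low-cardinality tags only.
        labels = {k: str(tags[k]) for k in LABEL_KEYS if k in tags}
        client.incr(f"{prefix}.{event}", tags=labels)

    return record
```

The returned closure satisfies the MetricsRecorder alias without any subclassing, which is the point of the callable seam.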

Why not a FastStream BaseMiddleware

A BaseMiddleware would only see handler-level events (dispatched, acked, nacked_*) because that's all FastStream routes through the middleware bus. It wouldn't see fetched (no message yet) or lease_lost (post-handler flush failure). Splitting metrics across two seams — a middleware and a recorder — is the kind of API surface duplication that S3 was rejected for; one seam wins.

Non-obvious trap

The recorder is called from the event loop. If a user's recorder blocks, it stalls the fetch loop or the worker: a synchronous Prometheus Counter.inc() is fine, but a requests.post(statsd_url) is not. Document this in the docstring and surface it in the README. We do not try to wrap user-supplied recorders in asyncio.to_thread, because that would silently break ordering guarantees and spawn one task per event. Keeping the recorder non-blocking is the user's responsibility.
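One way a user could keep a slow backend off the event loop is to hand events to a single background thread through a bounded queue. The sketch below is an illustration under that assumption, not part of the proposed library; QueuedRecorder and its sink parameter are hypothetical. A single worker draining a FIFO queue keeps events in order, and a full queue drops events instead of blocking.

```python
import logging
import queue
import threading
from collections.abc import Callable, Mapping
from typing import Any

logger = logging.getLogger(__name__)


class QueuedRecorder:
    """Hands events to one background thread so the caller never waits on I/O."""

    def __init__(
        self,
        sink: Callable[[str, Mapping[str, Any]], None],
        maxsize: int = 10_000,
    ) -> None:
        self._sink = sink  # the slow, possibly blocking backend call
        self._queue: queue.Queue = queue.Queue(maxsize=maxsize)
        self.dropped = 0
        self._thread = threading.Thread(target=self._drain, daemon=True)
        self._thread.start()

    def __call__(self, event: str, tags: Mapping[str, Any]) -> None:
        try:
            # put_nowait never blocks; copy tags so later mutation is harmless.
            self._queue.put_nowait((event, dict(tags)))
        except queue.Full:
            self.dropped += 1

    def _drain(self) -> None:
        while True:
            item = self._queue.get()
            if item is None:  # sentinel from close()
                return
            try:
                self._sink(*item)
            except Exception:
                logger.debug("metrics sink failed", exc_info=True)

    def close(self) -> None:
        # Flush everything queued so far, then stop the worker.
        self._queue.put(None)
        self._thread.join()
```

An instance is itself a valid recorder, so it could be passed as metrics_recorder directly; calling close() at shutdown flushes whatever is still queued.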

Edge cases handled

  • No recorder passed: _noop_recorder is the default; instrumentation sites all call unconditionally. The only overhead is one no-op function call per event.
  • Recorder raises: instrumentation sites wrap each call in try/except and log at DEBUG. A broken recorder must not poison the dispatch loop.
  • Test-broker path: TestOutboxBroker inherits the recorder field; sync dispatch_one emits the same events as the loop path. No special test-mode handling.
  • Multiple subscribers: each subscriber emits its own events with its own subscriber tag, so a shared recorder can tell them apart by tag combination.
  • run_loops=False: still emits dispatched / acked / nacked_* (publish → sync dispatch path goes through dispatch_one), does not emit fetched (no fetch loop ran). Acceptable — tests typically only need to assert handler-level counters.
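The "recorder raises" case above could be handled by a small guard like the following sketch. safe_emit and the logger name are hypothetical; the issue only specifies try/except plus a DEBUG log. Catching Exception rather than BaseException means asyncio.CancelledError still propagates, since it derives from BaseException on Python 3.8 and later.

```python
import logging
from collections.abc import Callable, Mapping
from typing import Any

MetricsRecorder = Callable[[str, Mapping[str, Any]], None]

# Hypothetical logger name, chosen for this sketch only.
logger = logging.getLogger("faststream_outbox.metrics_sketch")


def safe_emit(recorder: MetricsRecorder, event: str, tags: Mapping[str, Any]) -> None:
    """Call the recorder; a failure is logged at DEBUG and never re-raised."""
    try:
        recorder(event, tags)
    except Exception:
        logger.debug("metrics recorder failed for event %r", event, exc_info=True)
```

Routing every site through one helper like this keeps the try/except out of the dispatch code itself.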

Test plan

Unit (tests/test_unit.py)

  • Default no-op: construct broker without metrics_recorder, run a publish/consume cycle in test mode, assert no AttributeError / TypeError.
  • Recorder receives correct events: construct broker with a list-appending recorder, publish two rows, assert dispatched and acked fired with expected tags.
  • Recorder raise is swallowed: pass a recorder that raises RuntimeError; assert the consume cycle still completes and the row is deleted.
  • Lease-lost tag parity: trigger a lease_lost flush (mock delete_with_lease returning False), assert recorder got ("lease_lost", {..., "phase": "terminal"}) matching the existing log's extra dict.
  • Fetched batch count: drive _fetch_inner once with a 3-row return, assert recorder got ("fetched", {"count": 3, ...}).
  • Idle fetch emits count=0: drive _fetch_inner with empty return, assert recorder got ("fetched", {"count": 0, ...}).

Integration (tests/test_integration.py)

  • End-to-end recorder fires: publish 20 rows against real Postgres with max_workers=4, recorder appends events, assert ≥ 20 dispatched + 20 acked + some fetched after the drain.
  • Retry path emits nacked_retried: publish 1 row whose handler always raises, with ExponentialRetry(max_attempts=2); assert one nacked_retried then one nacked_terminal.

Files to change

| File | Change |
| --- | --- |
| faststream_outbox/metrics.py (new) | MetricsRecorder type alias + _noop_recorder |
| faststream_outbox/broker.py | accept metrics_recorder kwarg; thread into OutboxBrokerConfig |
| faststream_outbox/configs.py | add metrics_recorder: MetricsRecorder field with _noop_recorder default |
| faststream_outbox/subscriber/usecase.py | emit the events listed above at their instrumentation sites (dlq_written stays reserved); wrap each call in try/except and log at DEBUG |
| faststream_outbox/__init__.py | export MetricsRecorder |
| tests/test_unit.py | unit cases listed above |
| tests/test_integration.py | end-to-end + retry-path cases |
| README.md | new "Metrics" section with statsd + prometheus snippets, plus the "must not block" warning |
| CLAUDE.md | document the recorder seam in the architecture section |

Relationship to existing work

  • M7 (lease_lost structured log) stays as-is. The recorder fires in addition to the WARNING log, not instead. Log-pipeline counting via the event field remains supported for users who don't want a metrics backend.
  • E3 (DLQ) adds a dlq_written event when it lands. No change needed here besides reserving the event name.
  • plan2.md E4 is this issue; the original MetricsMiddleware name is rejected per the "Why not a BaseMiddleware" rationale above.

Verification

just lint
just test tests/test_unit.py tests/test_fake.py  # fast, no Postgres
just test                                         # full suite including integration

Coverage budget: ≥ 99% on the new metrics.py module (the no-op recorder needs an explicit test that calling it does nothing) and no regression on existing modules.

Carved out of plan2.md E4.
