E3: DLQ / archive on terminal failure (opt-in) #26

@lesnik512

Problem

Terminal failures DELETE silently — that's the documented spec (CLAUDE.md: "Terminal failures DELETE (no archive, no DLQ)"), backed by OutboxSubscriber._flush_terminal (faststream_outbox/subscriber/usecase.py:426-446). For audit-bearing workloads (finance, compliance, debugging), silently dropping poison messages is unacceptable. Today users have to wrap their handler with try/except + manual DLQ insert, which races the broker's DELETE.

Two distinct terminal-failure paths converge on _flush_terminal:

  1. OutboxInnerMessage.allow_delivery returns False — max_deliveries exceeded (message.py:111-122).
  2. Retry strategy returns None in _nack — terminal by retry policy (message.py:95-96, retry.py:46-52).

A third path also calls _flush_terminal: handler success via _ack (message.py:79-81). DLQ must fire for paths 1 and 2 only, not path 3.

Proposed design

Optional dlq_table: Table | None = None argument to OutboxBroker. On terminal failure, copy payload + headers + last-exception summary into dlq_table in the same transaction as the outbox DELETE. Provide make_dlq_table(metadata, table_name="outbox_dlq") analogous to make_outbox_table (faststream_outbox/schema.py:38-107) so users wire it into their Alembic migrations the same way.

DLQ table factory

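# faststream_outbox/schema.py (additions)

# Imports the factory relies on, if schema.py does not already have them.
from sqlalchemy import BigInteger, Column, DateTime, Index, LargeBinary, MetaData, String, Table, func
from sqlalchemy.dialects.postgresql import JSONB
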
def make_dlq_table(metadata: "MetaData", table_name: str = "outbox_dlq") -> Table:
    table = Table(
        table_name,
        metadata,
        Column("id", BigInteger, primary_key=True, autoincrement=True),
        Column("original_id", BigInteger, nullable=False),
        Column("queue", String(255), nullable=False),
        Column("payload", LargeBinary, nullable=False),
        Column("headers", JSONB, nullable=True),
        Column("deliveries_count", BigInteger, nullable=False),
        Column("created_at", DateTime(timezone=True), nullable=False),
        Column("failed_at", DateTime(timezone=True), nullable=False, server_default=func.now()),
        Column("failure_reason", String(32), nullable=False),  # 'max_deliveries' | 'retry_terminal'
        Column("last_exception", String, nullable=True),
    )
    Index(f"{table_name}_queue_failed_idx", table.c.queue, table.c.failed_at)
    return table

No FK to the outbox table — once the row is in the DLQ, the outbox row is gone in the same transaction (the FK would be unsatisfiable). original_id is a plain BigInteger for operator forensics.

Atomic DELETE + INSERT via CTE

Extend OutboxClient.delete_with_lease (client.py:211-232) to accept an optional dlq_payload dict:

# faststream_outbox/client.py

async def delete_with_lease(
    self,
    conn: "AsyncConnection | None",
    message_id: int,
    acquired_token: uuid.UUID,
    *,
    dlq_payload: "Mapping[str, typing.Any] | None" = None,
) -> bool:
    """
    Delete *message_id* iff it still holds *acquired_token*. Returns True if deleted.

    When *dlq_payload* is provided, the same transaction also INSERTs into the
    configured DLQ table via a CTE:

        WITH deleted AS (
          DELETE FROM outbox WHERE id=:id AND acquired_token=:token RETURNING *
        )
        INSERT INTO outbox_dlq (original_id, queue, payload, headers, deliveries_count,
                                 created_at, failure_reason, last_exception)
        SELECT id, queue, payload, headers, deliveries_count, created_at,
               :failure_reason, :last_exception
        FROM deleted;

    If `deleted` is empty (lease lost), the INSERT inserts nothing — natural safety,
    no special case in Python. If the INSERT raises (DLQ schema mismatch, disk full),
    the whole transaction rolls back — the outbox row stays and retries (see "Atomicity"
    rationale below).
    """

dlq_payload carries {"failure_reason": "max_deliveries" | "retry_terminal", "last_exception": str | None}. When self._dlq_table is None or dlq_payload is None, the existing per-row DELETE shape is used unchanged.
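
To make the lease guard concrete, here is a minimal sketch using the standard-library sqlite3 module. SQLite does not support data-modifying CTEs, so this swaps in an INSERT ... SELECT followed by a DELETE inside one transaction rather than the single PostgreSQL statement above. The trimmed table layout and the function name move_to_dlq are assumptions made for illustration, not part of faststream_outbox.

```python
import sqlite3

# Trimmed, hypothetical schema: only the columns the sketch needs.
SCHEMA = """
CREATE TABLE outbox (
    id INTEGER PRIMARY KEY, queue TEXT NOT NULL, payload BLOB NOT NULL,
    deliveries_count INTEGER NOT NULL, acquired_token TEXT
);
CREATE TABLE outbox_dlq (
    id INTEGER PRIMARY KEY AUTOINCREMENT, original_id INTEGER NOT NULL,
    queue TEXT NOT NULL, payload BLOB NOT NULL, deliveries_count INTEGER NOT NULL,
    failure_reason TEXT NOT NULL, last_exception TEXT
);
"""


def move_to_dlq(conn, message_id, token, dlq_payload):
    """Archive and delete the row iff it still holds *token*. Returns True if moved."""
    with conn:  # one transaction: commit on success, roll back on any exception
        conn.execute(
            "INSERT INTO outbox_dlq (original_id, queue, payload, deliveries_count,"
            " failure_reason, last_exception)"
            " SELECT id, queue, payload, deliveries_count, :reason, :exc"
            " FROM outbox WHERE id = :id AND acquired_token = :token",
            {"id": message_id, "token": token,
             "reason": dlq_payload["failure_reason"],
             "exc": dlq_payload["last_exception"]},
        )
        cur = conn.execute(
            "DELETE FROM outbox WHERE id = :id AND acquired_token = :token",
            {"id": message_id, "token": token},
        )
        return cur.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.execute("INSERT INTO outbox VALUES (1, 'orders', x'7b7d', 3, 'token-a')")
conn.commit()
payload = {"failure_reason": "retry_terminal", "last_exception": "ValueError('boom')"}

lost = move_to_dlq(conn, 1, "token-b", payload)   # stale token: lease was lost
moved = move_to_dlq(conn, 1, "token-a", payload)  # current token: row is archived
```

With the stale token, both statements match zero rows, so nothing is archived and the function returns False, mirroring the "natural safety" described in the docstring.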

Tracking terminal-failure-vs-success on the row

_flush_terminal is called for all three paths above; it currently can't tell why. Add a field to OutboxInnerMessage (message.py:32-62):

@dataclass(kw_only=True)
class OutboxInnerMessage:
    ...
    terminal_failure_reason: str | None = field(default=None, init=False)

Set it from the two failure paths:

_flush_terminal reads it to decide whether to build a dlq_payload:

async def _flush_terminal(self, row: OutboxInnerMessage, *, writer_conn: "AsyncConnection | None") -> None:
    if row.acquired_token is None:
        return
    dlq_payload = None
    if row.terminal_failure_reason is not None and self._outer_config.dlq_table is not None:
        dlq_payload = {
            "failure_reason": row.terminal_failure_reason,
            "last_exception": repr(row.last_exception) if row.last_exception is not None else None,
        }
    deleted = await self._client.delete_with_lease(
        writer_conn, row.id, row.acquired_token, dlq_payload=dlq_payload,
    )
    if not deleted:
        # existing event=lease_lost WARNING (unchanged, elided here)
        ...

Wiring

Site                                        Change
OutboxBroker.__init__ (broker.py:133-182)   New dlq_table: Table | None = None kwarg
OutboxBrokerConfig (configs.py:21-32)       New dlq_table: Table | None field
OutboxClient.__init__ (client.py:122-125)   Accept dlq_table: Table | None; store on self._dlq_table
FakeOutboxClient (testing.py)               New _dlq_rows: list[_FakeDlqRow]; delete_with_lease accepts and applies dlq_payload
validate_schema (client.py:274-286)         If dlq_table is configured, also validate it (same Alembic-diff path)

Non-obvious traps

Path 3 (handler success) goes through _flush_terminal. This caught the design twice in review — the obvious "wrap _flush_terminal with a DLQ insert" approach would archive every successful row. The terminal_failure_reason: str | None field on the message is what discriminates; it must be set on the failure paths and left None on _ack/_reject.

last_exception is a BaseException object. It lives on the row via _CaptureExceptionMiddleware.after_processed (broker.py:98-106). For DLQ storage we serialize with repr() — it's compact, type-aware, and matches what _log(..., exc_info=e) would render. Full tracebacks are not stored (string-formatting __traceback__ is expensive and DLQ rows are append-only artifacts, not live debugging); operators correlate by original_id + timestamp against application logs. Document this trade-off in the README.
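
As a quick illustration of that trade-off, the sketch below compares repr() with a formatted traceback for the same exception. The helper name summarize_exception is hypothetical and not part of faststream_outbox.

```python
import traceback
from typing import Optional


def summarize_exception(exc: Optional[BaseException]) -> Optional[str]:
    """Hypothetical helper: the compact form proposed for the last_exception column."""
    return repr(exc) if exc is not None else None


try:
    raise ValueError("bad amount")
except ValueError as caught:
    summary = summarize_exception(caught)
    # The full traceback is what the proposal deliberately does NOT store.
    full = "".join(traceback.format_exception(type(caught), caught, caught.__traceback__))

print(summary)  # ValueError('bad amount')
print(len(full) > len(summary))  # True
```

The repr keeps the exception type and message in one short line, while the traceback text grows with call depth.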

Atomicity choice: roll back on DLQ INSERT failure. If the DLQ INSERT fails (schema mismatch, disk full, FK violation that shouldn't exist but might), the combined CTE transaction rolls back — the outbox row stays leased until the lease expires, then is reclaimed and retried. This surfaces DLQ misconfiguration loudly (rows pile up in the outbox, lease_lost rate spikes, drain wedges) instead of silently losing data. Best-effort DLQ ("commit DELETE even if INSERT fails") is the alternative; we reject it because the whole point of opt-in DLQ is "I care enough about these failures to want them durably recorded." Users who want best-effort can wrap their handler.
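
The roll-back behaviour can be shown in miniature. This sketch uses the standard-library sqlite3 module and a two-statement transaction as a stand-in for the PostgreSQL CTE (an assumption made so the example runs without a server). The DLQ table is deliberately missing to simulate a misconfiguration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE outbox (id INTEGER PRIMARY KEY, payload BLOB NOT NULL)")
conn.execute("INSERT INTO outbox VALUES (1, x'00')")
conn.commit()
# Note: no outbox_dlq table exists, simulating a DLQ schema mismatch.

error = None
try:
    with conn:  # commits on success, rolls back if the block raises
        conn.execute("DELETE FROM outbox WHERE id = 1")
        conn.execute(
            "INSERT INTO outbox_dlq (original_id, failure_reason)"
            " VALUES (1, 'retry_terminal')"
        )
except sqlite3.OperationalError as exc:
    error = repr(exc)

remaining = conn.execute("SELECT COUNT(*) FROM outbox").fetchone()[0]
print(remaining)  # 1: the DELETE was rolled back together with the failed INSERT
```

Because the outbox row survives, the failure stays visible and retryable instead of vanishing, which is the behaviour the design opts for.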

CTE INSERT requires the DELETE to return every column the INSERT needs. The current delete_with_lease (client.py:228-232) has no RETURNING clause. The new CTE form adds RETURNING *, and it is used only when DLQ is configured, so the small extra cost is paid only when DLQ is on.

make_dlq_table does not declare a NOTIFY channel. Unlike the outbox table, the DLQ has no LISTEN/NOTIFY: nobody's polling it. The 63-byte name validation (schema.py:33-57) is not needed and is dropped from the DLQ factory.

Edge cases handled

  • dlq_table=None (default): bit-for-bit identical to today. The terminal-delete code path takes the original branch; zero perf cost, no schema requirement.
  • Lease lost during DLQ write: the DELETE matches no row, so the deleted CTE is empty and the INSERT inserts nothing → rowcount == 0 triggers the existing event=lease_lost WARNING. No DLQ row created.
  • Handler success: terminal_failure_reason is None → dlq_payload is None → original per-row DELETE path. No DLQ row created.
  • DLQ INSERT fails: CTE transaction rolls back. Outbox row stays leased until TTL, reclaimed, retried. Operator sees the failure surface as growing outbox + retry pressure.
  • max_deliveries=None: allow_delivery never sets terminal_failure_reason="max_deliveries"; the path is unreachable. Only retry_terminal is observed in that case.
  • NoRetry() strategy: first nack returns None → terminal_failure_reason="retry_terminal" → DLQ row with whatever exception fired.
  • DLQ table growth: no built-in TTL or reaper. Users own cleanup (Alembic, cron DELETE WHERE failed_at < now() - interval '90 days'). Document this.
  • Test broker: FakeOutboxClient._dlq_rows accumulates; expose a dlq_rows property for assertions.
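
For the test-broker bullet, one possible shape is sketched below. Everything here is a hypothetical stand-in: SketchFakeClient, its rows dict, and the _FakeDlqRow fields are illustrative assumptions, and the real FakeOutboxClient in testing.py may differ.

```python
import asyncio
import uuid
from dataclasses import dataclass, field
from typing import Any, Mapping, Optional


@dataclass
class _FakeDlqRow:
    original_id: int
    failure_reason: str
    last_exception: Optional[str]


@dataclass
class SketchFakeClient:
    """In-memory stand-in: maps message id -> lease token currently holding it."""

    rows: dict = field(default_factory=dict)
    _dlq_rows: list = field(default_factory=list)

    @property
    def dlq_rows(self) -> list:
        # Return a copy so assertions cannot mutate internal state.
        return list(self._dlq_rows)

    async def delete_with_lease(self, conn: Any, message_id: int, acquired_token: uuid.UUID,
                                *, dlq_payload: Optional[Mapping[str, Any]] = None) -> bool:
        if self.rows.get(message_id) != acquired_token:
            return False  # lease lost: neither delete nor archive
        del self.rows[message_id]
        if dlq_payload is not None:
            self._dlq_rows.append(_FakeDlqRow(
                original_id=message_id,
                failure_reason=dlq_payload["failure_reason"],
                last_exception=dlq_payload["last_exception"],
            ))
        return True


token = uuid.uuid4()
client = SketchFakeClient(rows={1: token, 2: token})
payload = {"failure_reason": "max_deliveries", "last_exception": None}
failed = asyncio.run(client.delete_with_lease(None, 1, token, dlq_payload=payload))
acked = asyncio.run(client.delete_with_lease(None, 2, token))  # success path: no payload
stale = asyncio.run(client.delete_with_lease(None, 1, uuid.uuid4(), dlq_payload=payload))
```

After these three calls only the first produces a DLQ entry, which is the shape the fake-broker tests would assert against.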

Test plan

Unit (tests/test_unit.py)

  • test_terminal_failure_reason_set_on_max_deliveries: allow_delivery with deliveries_count > max_deliveries; assert row.terminal_failure_reason == "max_deliveries".
  • test_terminal_failure_reason_set_on_retry_terminal: _nack with strategy returning None; assert row.terminal_failure_reason == "retry_terminal".
  • test_terminal_failure_reason_unset_on_ack: _ack; assert row.terminal_failure_reason is None.
  • test_dlq_payload_built_only_on_failure — drive _flush_terminal for ack/reject path with DLQ configured, assert delete_with_lease called without dlq_payload.

Fake (tests/test_fake.py)

  • test_fake_dlq_captures_max_deliveries_failure: TestOutboxBroker configured with DLQ; publish row, dispatch with deliveries_count > max_deliveries; assert broker.client.dlq_rows has one entry with failure_reason="max_deliveries".
  • test_fake_dlq_captures_retry_terminal_failure — same with NoRetry() and a raising handler; assert DLQ entry has failure_reason="retry_terminal" and serialized exception text.
  • test_fake_dlq_unconfigured_terminal_failure_silently_deletes — no DLQ; assert deletion still happens, no DLQ rows.

Integration (tests/test_integration.py)

  • test_dlq_atomic_insert_with_delete — publish row, fail handler with NoRetry(); assert outbox empty AND DLQ has one row with matching original_id, queue, payload, failure_reason, last_exception != None.
  • test_dlq_insert_failure_rolls_back_delete — drop the DLQ table mid-test (or pass a bad dlq_table reference), trigger terminal failure, assert outbox row stays (lease eventually expires, retried).
  • test_validate_schema_includes_dlq_when_configured — call broker.validate_schema() against a DB missing the DLQ table; assert it raises listing the DLQ as missing.

Files to change

File                                                      Change
faststream_outbox/schema.py                               make_dlq_table factory
faststream_outbox/client.py                               delete_with_lease(dlq_payload=...) CTE path; store _dlq_table; extend validate_schema
faststream_outbox/configs.py                              dlq_table field
faststream_outbox/broker.py                               dlq_table kwarg → config
faststream_outbox/message.py                              terminal_failure_reason field; set in allow_delivery + _nack
faststream_outbox/subscriber/usecase.py                   _flush_terminal builds dlq_payload when configured + failure
faststream_outbox/testing.py                              Fake DLQ list + delete_with_lease extension
faststream_outbox/__init__.py                             Export make_dlq_table
tests/test_unit.py / test_fake.py / test_integration.py   Cases above
README.md                                                 New "Dead-letter queue" section: factory, wiring, exception-serialization note, growth/cleanup
CLAUDE.md                                                 DLQ paragraph in the architecture section: atomicity story + terminal_failure_reason discriminator

Relationship to existing work

  • M7 (event=lease_lost log) preserved — lease-lost still skips both the DELETE and the DLQ INSERT (CTE returns no rows).
  • E4 metrics recorder (issue E4: Metrics recorder seam for outbox subscribers #23) reserves the dlq_written event name. When E4 lands, fire dlq_written from _flush_terminal after a successful DLQ insert.
  • Schema validation (client.py:274-286) already delegates to Alembic's compare_metadata against a canonical-table copy. The DLQ extension follows the same shape: when dlq_table is configured, validate it the same way.

Open question for the implementer

Should make_dlq_table accept an optional table_name arg (mirroring make_outbox_table's table_name="outbox" default)? Yes, default "outbox_dlq". If the user already has an "outbox_dlq" table from a prior tool, they'll want to rename ours.

Verification

just lint
just test tests/test_unit.py tests/test_fake.py
just test

Coverage budget: ≥ 95% on new schema.py factory and the new client branch; no regression elsewhere.

Carved out of plan2.md E3.