## Problem

Terminal failures `DELETE` silently; that's the documented spec (CLAUDE.md: "Terminal failures `DELETE` (no archive, no DLQ)"), backed by `OutboxSubscriber._flush_terminal` (`faststream_outbox/subscriber/usecase.py:426-446`). For audit-bearing workloads (finance, compliance, debugging), silently dropping poison messages is unacceptable. Today users must wrap their handler in `try`/`except` plus a manual DLQ insert, which races the broker's `DELETE`.
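Two distinct terminal-failure paths converge on `_flush_terminal`:

1. `OutboxInnerMessage.allow_delivery` returns False: `max_deliveries` exceeded (`message.py:111-122`).
2. `_nack`: terminal by retry policy (`message.py:95-96`, `retry.py:46-52`).

A third path also calls `_flush_terminal`: handler success via `_ack` (`message.py:79-81`). The DLQ must fire for paths 1 and 2 only, not for path 3.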
## Proposed design

Add an optional `dlq_table: Table | None = None` argument to `OutboxBroker`. On terminal failure, copy `payload` + `headers` + a last-exception summary into `dlq_table` in the same transaction as the outbox `DELETE`. Provide `make_dlq_table(metadata, table_name="outbox_dlq")`, analogous to `make_outbox_table` (`faststream_outbox/schema.py:38-107`), so users wire it into their Alembic migrations the same way.
### DLQ table factory

No FK to the outbox table: once the row is in the DLQ, the outbox row is gone in the same transaction, so the FK would be unsatisfiable. `original_id` is a plain `BigInteger` for operator forensics.
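The factory's own code is not reproduced here. As a rough, stdlib-only illustration of the column set implied by the CTE `INSERT` in `delete_with_lease` (plus the `failed_at` column used for cleanup), here is a sketch that emits SQLite DDL instead of building a SQLAlchemy `Table`. The helper name `dlq_table_ddl`, the surrogate `id`, the column types, and the `failed_at` default are all assumptions. The part that carries over is the shape: a configurable table name, and `original_id` as a plain integer with no foreign key.

```python
import sqlite3


def dlq_table_ddl(table_name: str = "outbox_dlq") -> str:
    """Hypothetical helper: CREATE TABLE DDL for a DLQ table (SQLite dialect)."""
    if not table_name.isidentifier():  # the name is interpolated, so reject anything odd
        raise ValueError(f"unsafe table name: {table_name!r}")
    return f"""
        CREATE TABLE {table_name} (
            id               INTEGER PRIMARY KEY AUTOINCREMENT,
            original_id      INTEGER NOT NULL,  -- plain integer, deliberately no FK
            queue            TEXT    NOT NULL,
            payload          BLOB    NOT NULL,
            headers          TEXT,              -- JSON-encoded
            deliveries_count INTEGER NOT NULL,
            created_at       TEXT    NOT NULL,
            failed_at        TEXT    NOT NULL DEFAULT CURRENT_TIMESTAMP,
            failure_reason   TEXT    NOT NULL,
            last_exception   TEXT
        )
    """


conn = sqlite3.connect(":memory:")
conn.execute(dlq_table_ddl())
columns = [row[1] for row in conn.execute("PRAGMA table_info(outbox_dlq)")]
foreign_keys = conn.execute("PRAGMA foreign_key_list(outbox_dlq)").fetchall()
print(columns)
print(foreign_keys)  # []: no FK back to the outbox table
```

Passing a different `table_name` covers the open question below: a user who already has an `outbox_dlq` table can pick another name.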
### Atomic DELETE + INSERT via CTE

Extend `OutboxClient.delete_with_lease` (`client.py:211-232`) to accept an optional `dlq_payload` dict:
```python
# faststream_outbox/client.py (excerpt; other OutboxClient members elided)
import typing
import uuid

if typing.TYPE_CHECKING:
    from collections.abc import Mapping

    from sqlalchemy.ext.asyncio import AsyncConnection


class OutboxClient:
    async def delete_with_lease(
        self,
        conn: "AsyncConnection | None",
        message_id: int,
        acquired_token: uuid.UUID,
        *,
        dlq_payload: "Mapping[str, typing.Any] | None" = None,
    ) -> bool:
        """Delete *message_id* iff it still holds *acquired_token*.

        Returns True if deleted.

        When *dlq_payload* is provided, the same transaction also INSERTs into
        the configured DLQ table via a CTE:

            WITH deleted AS (
                DELETE FROM outbox
                WHERE id = :id AND acquired_token = :token
                RETURNING *
            )
            INSERT INTO outbox_dlq (original_id, queue, payload, headers,
                                    deliveries_count, created_at,
                                    failure_reason, last_exception)
            SELECT id, queue, payload, headers, deliveries_count, created_at,
                   :failure_reason, :last_exception
            FROM deleted;

        If `deleted` is empty (lease lost), the INSERT inserts nothing: natural
        safety, no special case in Python. If the INSERT raises (DLQ schema
        mismatch, disk full), the whole transaction rolls back; the outbox row
        stays and is retried (see the "Atomicity" rationale below).
        """
```
`dlq_payload` carries `{"failure_reason": "max_deliveries" | "retry_terminal", "last_exception": str | None}`. When `self._dlq_table is None` or `dlq_payload is None`, the existing per-row `DELETE` shape is used unchanged.
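SQLite does not support data-modifying statements inside `WITH`, so this stdlib-only sketch replaces the CTE with an equivalent two-statement form: a lease-guarded `INSERT ... SELECT` followed by the lease-guarded `DELETE`, both in one transaction. In SQLite the first write takes the database write lock, so the token cannot change between the two statements. On Postgres that guarantee is what the single-statement CTE provides. The contract otherwise mirrors `delete_with_lease`: it returns True only if the leased row was deleted, writes the DLQ row in the same transaction, and writes nothing when the lease is lost. The `DlqPayload` TypedDict, the trimmed table layouts, and the synchronous signature are assumptions; the real method is async and targets Postgres.

```python
from __future__ import annotations

import sqlite3
import uuid
from typing import Literal, TypedDict


class DlqPayload(TypedDict):
    failure_reason: Literal["max_deliveries", "retry_terminal"]
    last_exception: str | None


def delete_with_lease(
    conn: sqlite3.Connection,
    message_id: int,
    acquired_token: uuid.UUID,
    *,
    dlq_payload: DlqPayload | None = None,
) -> bool:
    """Delete *message_id* iff it still holds *acquired_token*; archive it first if asked."""
    params = {"id": message_id, "token": str(acquired_token)}
    with conn:  # one transaction: commit on success, rollback if any statement raises
        if dlq_payload is not None:
            # Copies only the still-leased row; a lost lease selects (and inserts) nothing.
            conn.execute(
                """
                INSERT INTO outbox_dlq (original_id, queue, payload, deliveries_count,
                                        failure_reason, last_exception)
                SELECT id, queue, payload, deliveries_count, :reason, :exc
                FROM outbox WHERE id = :id AND acquired_token = :token
                """,
                {**params, "reason": dlq_payload["failure_reason"],
                 "exc": dlq_payload["last_exception"]},
            )
        cur = conn.execute(
            "DELETE FROM outbox WHERE id = :id AND acquired_token = :token", params
        )
    return cur.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE outbox (id INTEGER PRIMARY KEY, queue TEXT, payload BLOB,
                         deliveries_count INTEGER, acquired_token TEXT);
    CREATE TABLE outbox_dlq (original_id INTEGER, queue TEXT, payload BLOB,
                             deliveries_count INTEGER, failure_reason TEXT,
                             last_exception TEXT);
    """
)
token = uuid.uuid4()
conn.execute("INSERT INTO outbox VALUES (1, 'orders', x'7b7d', 3, ?)", (str(token),))
conn.commit()

payload: DlqPayload = {"failure_reason": "retry_terminal", "last_exception": "ValueError('boom')"}
lost = delete_with_lease(conn, 1, uuid.uuid4(), dlq_payload=payload)  # stale token
moved = delete_with_lease(conn, 1, token, dlq_payload=payload)
print(lost, moved)  # False True
print(conn.execute("SELECT original_id, failure_reason FROM outbox_dlq").fetchall())
```

The stale-token call returns False and leaves both tables untouched. This is the same signal the broker turns into its `event=lease_lost` warning.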
### Tracking terminal-failure-vs-success on the row
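`_flush_terminal` is called for all three paths above; it currently can't tell why. Add a field to `OutboxInnerMessage` (`message.py:32-62`): `terminal_failure_reason: str | None = None`.

Set it from the two failure paths:

- `allow_delivery` returns False (`message.py:113-115`): `self.terminal_failure_reason = "max_deliveries"`.
- `_nack` when `delay is None` (`message.py:95-96`): `self.terminal_failure_reason = "retry_terminal"`.
- `_ack` (`message.py:79-81`) and `_reject` (`message.py:100-102`): leave it at None.

`_flush_terminal` reads the field to decide whether to build a `dlq_payload`.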
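The following is a toy model of the discriminator, for illustration only. `ToyInnerMessage`, its fields, and the simplified method bodies are hypothetical stand-ins for `OutboxInnerMessage`: the real `_nack` consults the configured retry strategy, and the real methods are async. The model shows the field staying None on ack/reject and being set only by the two failure paths. It also covers the `max_deliveries=None` case, where the first path is unreachable.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class ToyInnerMessage:  # hypothetical stand-in for OutboxInnerMessage
    deliveries_count: int
    max_deliveries: Optional[int]
    # Retry policy: returns a delay in seconds, or None when retries are exhausted.
    retry_delay: Callable[[int], Optional[float]]
    terminal_failure_reason: Optional[str] = None

    def allow_delivery(self) -> bool:
        if self.max_deliveries is not None and self.deliveries_count > self.max_deliveries:
            self.terminal_failure_reason = "max_deliveries"  # failure path 1
            return False
        return True

    def nack(self) -> Optional[float]:
        delay = self.retry_delay(self.deliveries_count)
        if delay is None:  # terminal by retry policy: failure path 2
            self.terminal_failure_reason = "retry_terminal"
        return delay

    def ack(self) -> None:
        pass  # success: terminal_failure_reason stays None

    def reject(self) -> None:
        pass  # explicit reject: also stays None, so no DLQ row


def no_retry(_attempt: int) -> Optional[float]:
    return None  # like NoRetry(): the first nack is terminal


ok = ToyInnerMessage(deliveries_count=1, max_deliveries=5, retry_delay=no_retry)
ok.ack()
exhausted = ToyInnerMessage(deliveries_count=6, max_deliveries=5, retry_delay=no_retry)
exhausted.allow_delivery()
failed = ToyInnerMessage(deliveries_count=1, max_deliveries=None, retry_delay=no_retry)
failed.nack()
print(ok.terminal_failure_reason, exhausted.terminal_failure_reason, failed.terminal_failure_reason)
# None max_deliveries retry_terminal
```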
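### Wiring

| Location | Change |
|---|---|
| `OutboxBroker.__init__` (`broker.py:133-182`) | `dlq_table: Table \| None = None` kwarg |
| `OutboxBrokerConfig` (`configs.py:21-32`) | `dlq_table: Table \| None` field |
| `OutboxClient.__init__` (`client.py:122-125`) | `dlq_table: Table \| None`; store on `self._dlq_table` |
| `FakeOutboxClient` (`testing.py`) | `_dlq_rows: list[_FakeDlqRow]`; `delete_with_lease` accepts and applies `dlq_payload` |
| `validate_schema` (`client.py:274-286`) | If `dlq_table` is configured, also validate it (same Alembic-diff path) |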
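Below is a minimal in-memory sketch of the test-broker change: `FakeOutboxClient` gains `_dlq_rows`, a `dlq_rows` property, and a `dlq_payload`-aware `delete_with_lease`. Only those three names come from this issue. `_FakeDlqRow`'s fields, the `_FakeOutboxRow`/`_rows` storage, and the lease bookkeeping are hypothetical.

```python
from __future__ import annotations

import asyncio
import uuid
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class _FakeDlqRow:
    original_id: int
    queue: str
    payload: bytes
    failure_reason: str
    last_exception: str | None


@dataclass
class _FakeOutboxRow:  # hypothetical in-memory outbox row
    id: int
    queue: str
    payload: bytes
    acquired_token: uuid.UUID | None = None


class FakeOutboxClient:
    def __init__(self) -> None:
        self._rows: dict[int, _FakeOutboxRow] = {}
        self._dlq_rows: list[_FakeDlqRow] = []

    @property
    def dlq_rows(self) -> list[_FakeDlqRow]:
        return list(self._dlq_rows)  # a copy, so assertions cannot mutate state

    async def delete_with_lease(
        self,
        conn: Any,
        message_id: int,
        acquired_token: uuid.UUID,
        *,
        dlq_payload: Mapping[str, Any] | None = None,
    ) -> bool:
        row = self._rows.get(message_id)
        if row is None or row.acquired_token != acquired_token:
            return False  # lease lost: no delete, no DLQ row
        dlq_row = None
        if dlq_payload is not None:
            # Build before mutating, so a malformed payload raises with the row intact.
            dlq_row = _FakeDlqRow(
                original_id=row.id,
                queue=row.queue,
                payload=row.payload,
                failure_reason=dlq_payload["failure_reason"],
                last_exception=dlq_payload["last_exception"],
            )
        del self._rows[message_id]
        if dlq_row is not None:
            self._dlq_rows.append(dlq_row)
        return True


async def demo() -> FakeOutboxClient:
    client = FakeOutboxClient()
    token = uuid.uuid4()
    for i in (1, 2):
        client._rows[i] = _FakeOutboxRow(i, "orders", b"{}", acquired_token=token)
    await client.delete_with_lease(None, 1, token)  # success path: no DLQ row
    await client.delete_with_lease(
        None, 2, token,
        dlq_payload={"failure_reason": "retry_terminal", "last_exception": "ValueError('boom')"},
    )
    return client


client = asyncio.run(demo())
print(client.dlq_rows)
```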
## Non-obvious traps
**Path 3 (handler success) goes through `_flush_terminal`.** This caught the design twice in review: the obvious "wrap `_flush_terminal` with a DLQ insert" approach would archive every successful row. The `terminal_failure_reason: str | None` field on the message is what discriminates; it must be set on the failure paths and left None on `_ack`/`_reject`.
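A sketch of the check inside `_flush_terminal` follows. `flush_terminal`, `_Msg`, and `RecordingClient` are hypothetical stand-ins for the real subscriber method, message, and client. The DLQ payload is built only when a DLQ table is configured and `terminal_failure_reason` is set, so the success path keeps calling `delete_with_lease` without a `dlq_payload`.

```python
from __future__ import annotations

import asyncio
import uuid
from dataclasses import dataclass, field
from typing import Any


@dataclass
class _Msg:  # hypothetical stand-in for the message row
    id: int
    token: uuid.UUID
    terminal_failure_reason: str | None = None
    last_exception: BaseException | None = None


@dataclass
class RecordingClient:  # records calls instead of touching a database
    calls: list[dict[str, Any]] = field(default_factory=list)

    async def delete_with_lease(self, conn: Any, message_id: int,
                                acquired_token: uuid.UUID, *,
                                dlq_payload: dict[str, Any] | None = None) -> bool:
        self.calls.append({"message_id": message_id, "dlq_payload": dlq_payload})
        return True


async def flush_terminal(client: RecordingClient, msg: _Msg, *, dlq_configured: bool) -> bool:
    dlq_payload = None
    # Success (ack/reject) leaves terminal_failure_reason at None, so no payload.
    if dlq_configured and msg.terminal_failure_reason is not None:
        exc = msg.last_exception
        dlq_payload = {
            "failure_reason": msg.terminal_failure_reason,
            "last_exception": repr(exc) if exc is not None else None,
        }
    return await client.delete_with_lease(None, msg.id, msg.token, dlq_payload=dlq_payload)


async def main() -> RecordingClient:
    client = RecordingClient()
    token = uuid.uuid4()
    await flush_terminal(client, _Msg(1, token), dlq_configured=True)  # path 3: success
    failed = _Msg(2, token, "retry_terminal", ValueError("boom"))
    await flush_terminal(client, failed, dlq_configured=True)  # path 2: retry terminal
    await flush_terminal(client, _Msg(3, token, "max_deliveries"), dlq_configured=False)
    return client


client = asyncio.run(main())
for call in client.calls:
    print(call)
```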
**`last_exception` is a `BaseException` object.** It lives on the row via `_CaptureExceptionMiddleware.after_processed` (`broker.py:98-106`). For DLQ storage we serialize it with `repr()`: it's compact, type-aware, and matches what `_log(..., exc_info=e)` would render. Full tracebacks are not stored, because string-formatting `__traceback__` is expensive and DLQ rows are append-only artifacts, not live debugging. Operators correlate by `original_id` + timestamp against application logs. Document this trade-off in the README.
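Here is a small sketch of what the `repr()` summary keeps and drops; `summarize_exception` is a hypothetical helper name. The exception type and message survive. The traceback is still reachable via `__traceback__` on the live object, but it is not part of the stored string.

```python
from __future__ import annotations


def summarize_exception(exc: BaseException | None) -> str | None:
    """Compact, type-aware summary for the DLQ's last_exception column."""
    return None if exc is None else repr(exc)


def failing_handler() -> None:
    raise ValueError("amount must be positive")


try:
    failing_handler()
except ValueError as e:
    captured = e  # keep a reference; `e` itself is cleared after the except block

summary = summarize_exception(captured)
print(summary)  # ValueError('amount must be positive')
print(captured.__traceback__ is not None)  # True: the live object still has it
print("Traceback" in summary)  # False: only type + message are stored
```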
**Atomicity choice: roll back on DLQ INSERT failure.** If the DLQ INSERT fails (schema mismatch, disk full, an FK violation that shouldn't exist but might), the combined CTE transaction rolls back. The outbox row stays leased until the lease expires, then is reclaimed and retried. This surfaces DLQ misconfiguration loudly (rows pile up in the outbox, the lease_lost rate spikes, the drain wedges) instead of silently losing data. Best-effort DLQ ("commit the DELETE even if the INSERT fails") is the alternative. We reject it because the whole point of an opt-in DLQ is "I care enough about these failures to want them durably recorded." Users who want best-effort can wrap their handler.
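The rollback behaviour can be demonstrated with stdlib `sqlite3` as a stand-in for the Postgres CTE. In this sketch the `DELETE` runs first, then the DLQ `INSERT` fails because the DLQ table is missing (a simulated misconfiguration). Because both statements share one transaction, the rollback restores the outbox row. The function name `archive_and_delete` and the trimmed table layouts are hypothetical.

```python
import sqlite3


def archive_and_delete(conn: sqlite3.Connection, message_id: int, token: str) -> bool:
    """Delete a leased outbox row and copy it into outbox_dlq, all-or-nothing."""
    with conn:  # one transaction: commit on success, rollback on any exception
        found = conn.execute(
            "SELECT payload FROM outbox WHERE id = ? AND acquired_token = ?",
            (message_id, token),
        ).fetchone()
        if found is None:
            return False  # lease lost: nothing to archive or delete
        conn.execute(
            "DELETE FROM outbox WHERE id = ? AND acquired_token = ?", (message_id, token)
        )
        # Raises sqlite3.OperationalError if outbox_dlq is missing or mismatched.
        conn.execute(
            "INSERT INTO outbox_dlq (original_id, payload) VALUES (?, ?)",
            (message_id, found[0]),
        )
    return True


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE outbox (id INTEGER PRIMARY KEY, payload BLOB, acquired_token TEXT)")
# Deliberately no outbox_dlq table: simulates a misconfigured DLQ.
conn.execute("INSERT INTO outbox VALUES (1, x'7b7d', 'tok')")
conn.commit()

error = None
try:
    archive_and_delete(conn, 1, "tok")
except sqlite3.OperationalError as exc:
    error = str(exc)

remaining = conn.execute("SELECT COUNT(*) FROM outbox").fetchone()[0]
print(error)      # e.g. "no such table: outbox_dlq"
print(remaining)  # 1: the DELETE was rolled back together with the failed INSERT
```

Once the DLQ table exists, the same call succeeds and the row moves. This is the "retried after the lease expires" recovery described above, compressed into one process.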
**The CTE INSERT requires the DELETE to return all needed columns.** The current `delete_with_lease` (`client.py:228-232`) has no `RETURNING` clause. The new form uses `RETURNING *` only in the CTE form, i.e. only when a DLQ is configured, so the small cost is paid only when DLQ is on.

**`make_dlq_table` does not declare a NOTIFY channel.** Unlike the outbox table, the DLQ has no LISTEN/NOTIFY: nobody polls it. The 63-byte name validation (`schema.py:33-57`) is therefore unnecessary and is dropped from the DLQ factory.
## Edge cases handled
- **`dlq_table=None` (default):** bit-for-bit identical to today. The terminal-delete code path takes the original branch; zero perf cost, no schema requirement.
- **Lease lost during DLQ write:** the `DELETE` matches no row (token mismatch) → the `deleted` CTE is empty → the INSERT inserts nothing → `rowcount == 0` triggers the existing `event=lease_lost` WARNING. No DLQ row is created.
- **Handler success:** `terminal_failure_reason is None` → `dlq_payload is None` → original per-row `DELETE` path. No DLQ row is created.
- **DLQ INSERT fails:** the CTE transaction rolls back. The outbox row stays leased until the TTL, then is reclaimed and retried. The operator sees the failure as a growing outbox plus retry pressure.
- **`max_deliveries=None`:** `allow_delivery` never sets `terminal_failure_reason="max_deliveries"`; that path is unreachable. Only `retry_terminal` is observed in that case.
- **`NoRetry()` strategy:** the first nack returns no delay (None) → `terminal_failure_reason="retry_terminal"` → a DLQ row with whatever exception fired.
- **DLQ table growth:** there is no built-in TTL or reaper. Users own cleanup (via Alembic, or a cron `DELETE FROM outbox_dlq WHERE failed_at < now() - interval '90 days'`). Document this.
- **Test broker:** `FakeOutboxClient._dlq_rows` accumulates; expose a `dlq_rows` property for assertions.
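Since cleanup is left to users, a retention job could look like the sketch below. It uses `sqlite3` and SQLite's `datetime('now', ...)` in place of Postgres's `now() - interval '90 days'`. `purge_dlq`, its defaults, and the `failed_at` text format are assumptions.

```python
import sqlite3


def purge_dlq(conn: sqlite3.Connection, days: int = 90, table: str = "outbox_dlq") -> int:
    """Hypothetical retention job: delete DLQ rows older than *days*, return the count."""
    if not table.isidentifier():  # the name is interpolated, so reject anything odd
        raise ValueError(f"unsafe table name: {table!r}")
    with conn:
        cur = conn.execute(
            f"DELETE FROM {table} WHERE failed_at < datetime('now', ?)",
            (f"-{days} days",),
        )
    return cur.rowcount


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE outbox_dlq (original_id INTEGER, failed_at TEXT DEFAULT CURRENT_TIMESTAMP)"
)
conn.execute(
    "INSERT INTO outbox_dlq (original_id, failed_at) VALUES (1, datetime('now', '-120 days'))"
)
conn.execute("INSERT INTO outbox_dlq (original_id) VALUES (2)")  # failed just now
conn.commit()

removed = purge_dlq(conn)
left = [row[0] for row in conn.execute("SELECT original_id FROM outbox_dlq")]
print(removed, left)  # 1 [2]
```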
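## Test plan

### Unit (`tests/test_unit.py`)

- `test_terminal_failure_reason_set_on_max_deliveries`: `allow_delivery` with `deliveries_count > max_deliveries`; assert `row.terminal_failure_reason == "max_deliveries"`.
- `test_terminal_failure_reason_set_on_retry_terminal`: `_nack` with a strategy returning None; assert `row.terminal_failure_reason == "retry_terminal"`.
- `test_terminal_failure_reason_unset_on_ack`: `_ack`; assert `row.terminal_failure_reason is None`.
- `test_dlq_payload_built_only_on_failure`: drive `_flush_terminal` for the ack/reject path with a DLQ configured; assert `delete_with_lease` is called without `dlq_payload`.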
### Fake (`tests/test_fake.py`)

- `test_fake_dlq_captures_max_deliveries_failure`: `TestOutboxBroker` configured with a DLQ; publish a row, dispatch with `deliveries_count > max_deliveries`; assert `broker.client.dlq_rows` has one entry with `failure_reason="max_deliveries"`.
- `test_fake_dlq_captures_retry_terminal_failure`: same, with `NoRetry()` and a raising handler; assert the DLQ entry has `failure_reason="retry_terminal"` and the serialized exception text.
- `test_fake_dlq_unconfigured_terminal_failure_silently_deletes`: no DLQ; assert deletion still happens and no DLQ rows exist.
### Integration (`tests/test_integration.py`)

- `test_dlq_atomic_insert_with_delete`: publish a row, fail the handler with `NoRetry()`; assert the outbox is empty AND the DLQ has one row with matching `original_id`, `queue`, `payload`, `failure_reason`, and a non-null `last_exception`.
- `test_dlq_insert_failure_rolls_back_delete`: drop the DLQ table mid-test (or pass a bad `dlq_table` reference), trigger a terminal failure, assert the outbox row stays (the lease eventually expires and the row is retried).
- `test_validate_schema_includes_dlq_when_configured`: call `broker.validate_schema()` against a DB missing the DLQ table; assert it raises, listing the DLQ as missing.
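## Files to change

| File | Change |
|---|---|
| `faststream_outbox/schema.py` | `make_dlq_table` factory |
| `faststream_outbox/client.py` | `delete_with_lease(dlq_payload=...)` CTE path; store `_dlq_table`; extend `validate_schema` |
| `faststream_outbox/configs.py` | `dlq_table` field |
| `faststream_outbox/broker.py` | `dlq_table` kwarg → config |
| `faststream_outbox/message.py` | `terminal_failure_reason` field; set in `allow_delivery` + `_nack` |
| `faststream_outbox/subscriber/usecase.py` | `_flush_terminal` builds `dlq_payload` when configured + failure |
| `faststream_outbox/testing.py` | `delete_with_lease` extension |
| `faststream_outbox/__init__.py` | `make_dlq_table` |
| `tests/test_unit.py` / `test_fake.py` / `test_integration.py` | |
| `README.md` | |
| `CLAUDE.md` | `terminal_failure_reason` discriminator |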
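## Relationship to existing work

- `event=lease_lost` log) preserved; lease-lost still skips both the `DELETE` and the DLQ `INSERT` (the CTE returns no rows).
- `dlq_written` event name. When E4 lands, fire `dlq_written` from `_flush_terminal` after a successful DLQ insert.
- Schema validation (`client.py:274-286`) already delegates to Alembic's `compare_metadata` against a canonical-table copy. The DLQ extension follows the same shape: when `dlq_table` is configured, validate it the same way.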
## Open question for the implementer

Should `make_dlq_table` accept an optional `table_name` arg (mirroring `make_outbox_table`'s `table_name="outbox"` default)? Yes, default `"outbox_dlq"`. If the user already has an `outbox_dlq` table from a prior tool, they'll want to rename ours.
## Verification

```shell
just lint
just test tests/test_unit.py tests/test_fake.py
just test
```

Coverage budget: ≥ 95% on the new `schema.py` factory and the new client branch; no regression elsewhere.

Carved out of `plan2.md` E3.