Problem
`OutboxSubscriber.stop()` (`faststream_outbox/subscriber/usecase.py:144-147`) currently wraps `super().stop()` in `anyio.move_on_after(graceful_timeout)`. The parent's `TasksMixin.stop()` cancels worker tasks almost immediately, after only a brief wait for the lock to be released. Any handler still in flight gets `CancelledError`, and any row already pulled from `_inflight` but not yet dispatched is abandoned. Its lease eventually expires and the row is reclaimed on the next start, but until then it is a duplicate-delivery surface.
For an outbox (reliability primitive), drain semantics matter more than for a generic broker.
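A toy model in plain `asyncio` makes the failure mode concrete. It is not FastStream or `faststream_outbox` code: `cancel_immediately` is a hypothetical helper, an `asyncio.Queue` stands in for `_inflight`, and cancelling the worker tasks stands in for what `TasksMixin.stop()` is described as doing above.

```python
import asyncio


async def cancel_immediately(
    n_rows: int = 5, workers: int = 2, handler_delay: float = 0.1
) -> tuple[int, int]:
    """Toy model: cancel workers right away and report (completed, still_queued)."""
    inflight: asyncio.Queue[int] = asyncio.Queue()  # stands in for _inflight
    completed: list[int] = []

    async def worker() -> None:
        while True:
            row = await inflight.get()
            try:
                await asyncio.sleep(handler_delay)  # simulated handler
                completed.append(row)
            finally:
                inflight.task_done()

    for row in range(n_rows):
        inflight.put_nowait(row)
    tasks = [asyncio.create_task(worker()) for _ in range(workers)]
    await asyncio.sleep(0)  # each worker picks up one row and enters its handler

    # Cancel-first stop, roughly what TasksMixin.stop() does after its short wait.
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return len(completed), inflight.qsize()


if __name__ == "__main__":
    print(asyncio.run(cancel_immediately()))  # (0, 3)
```

Both in-flight handlers are interrupted and the three queued rows are never dispatched; in the real subscriber those rows keep their leases until expiry.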
Proposed design
Introduce a separate `_draining: bool` flag distinct from `self.running`:
- The fetch loop also checks `_draining`, so it stops claiming new rows when stop begins.
- Worker loops keep `while self.running:` unchanged; they continue dispatching from `_inflight`.
- `stop()` sets `_draining=True`, kicks `_notify_event` to wake the fetch loop from any idle sleep, awaits `_inflight.join()` bounded by `graceful_timeout`, then calls `super().stop()` (which sets `running=False`, releases the ack lock, and cancels remaining tasks).
```python
# subscriber/usecase.py
class OutboxSubscriber(...):
    def __init__(self, ...):
        ...
        self._draining: bool = False  # set by stop() to halt _fetch_inner

    async def _fetch_inner(self, *, fetch_conn, listen_conn):
        ...
        while self.running and not self._draining:  # was: while self.running
            ...

    async def stop(self) -> None:
        self._draining = True
        self._notify_event.set()  # wake the fetch loop from any idle sleep
        timeout = self._outer_config.graceful_timeout
        with anyio.move_on_after(timeout):
            await self._inflight.join()  # wait until every queued row is marked done
        await super().stop()  # sets running=False, releases ack lock, cancels tasks
```
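The proposed ordering can be exercised end to end in a self-contained toy. Everything here is hypothetical: `ToyOutboxSubscriber` only mimics the flags and loops named above, a list stands in for the outbox table, `asyncio.Queue` for `_inflight`, and `asyncio.wait_for` (with timeout errors suppressed) for `anyio.move_on_after`. The tail of `stop()` imitates, rather than calls, `super().stop()`.

```python
from __future__ import annotations

import asyncio
import contextlib


class ToyOutboxSubscriber:
    """Hypothetical model of the proposed stop() order; not the real OutboxSubscriber."""

    def __init__(self, rows: list[int], batch_size: int = 2, workers: int = 2,
                 handler_delay: float = 0.05) -> None:
        self.running = True
        self._draining = False
        self._table = list(rows)  # unclaimed rows
        self.claimed: list[int] = []
        self.handled: list[int] = []
        self._batch_size = batch_size
        self._workers = workers
        self._handler_delay = handler_delay
        self._tasks: list[asyncio.Task[None]] = []

    def start(self) -> None:  # call from inside a running event loop
        self._inflight: asyncio.Queue[int] = asyncio.Queue()
        self._notify_event = asyncio.Event()
        self._tasks = [asyncio.create_task(self._fetch_inner())]
        self._tasks += [asyncio.create_task(self._worker()) for _ in range(self._workers)]

    async def consume(self, row: int) -> None:
        if not self.running:  # mirrors the early exit described for SubscriberUsecase.consume()
            return None
        await asyncio.sleep(self._handler_delay)  # simulated handler
        self.handled.append(row)

    async def _fetch_inner(self) -> None:
        while self.running and not self._draining:
            batch = self._table[: self._batch_size]  # "claim" one batch (the CTE)
            del self._table[: self._batch_size]
            for row in batch:
                self.claimed.append(row)
                self._inflight.put_nowait(row)
            await self._notify_event.wait()  # idle until notified (publish or stop)
            self._notify_event.clear()

    async def _worker(self) -> None:
        while self.running:  # unchanged: workers ignore _draining
            row = await self._inflight.get()
            try:
                await self.consume(row)
            finally:
                self._inflight.task_done()

    async def stop(self, graceful_timeout: float | None) -> None:
        self._draining = True
        self._notify_event.set()  # wake the fetch loop so it sees _draining and exits
        # asyncio.wait_for stands in for anyio.move_on_after; None means no deadline.
        with contextlib.suppress(asyncio.TimeoutError):
            await asyncio.wait_for(self._inflight.join(), graceful_timeout)
        # Stand-in for super().stop(): flip running, then cancel whatever is left.
        self.running = False
        for task in self._tasks:
            task.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)


async def main() -> ToyOutboxSubscriber:
    sub = ToyOutboxSubscriber(rows=list(range(20)))
    sub.start()
    await asyncio.sleep(0.02)  # fetch loop claims one batch, then idles
    await sub.stop(graceful_timeout=1.0)
    return sub
```

Running `asyncio.run(main())` ends with rows 0 and 1 handled and 18 rows left unclaimed: the claimed batch drained fully even though both handlers were still sleeping when `stop()` began, and no new batch was claimed afterwards.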
Non-obvious trap
FastStream's `SubscriberUsecase.consume()` exits early with `if not self.running: return None`. The naive order from `plan2.md` (set `running=False` first, then `_inflight.join()`) would make every queued row skip dispatch: the worker pulls it, `consume()` returns `None`, the handler never runs, and `task_done()` still fires, so the join completes as if the drain had succeeded. The separate `_draining` flag avoids this by leaving `running` true until the drain finishes or times out.
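The trap is easy to reproduce in isolation. In this hypothetical toy, `consume()` copies only the early exit described above and the worker keeps pulling until cancelled; the only difference between the two runs is whether `running` is flipped before or after the join.

```python
import asyncio


async def drain(flip_running_first: bool) -> list[int]:
    """Toy model of the consume() early-exit trap (hypothetical names throughout)."""
    state = {"running": True}
    inflight: asyncio.Queue[int] = asyncio.Queue()
    handled: list[int] = []

    async def consume(row: int) -> None:
        if not state["running"]:  # early exit, as described for SubscriberUsecase.consume()
            return None
        handled.append(row)  # the handler actually runs

    async def worker() -> None:
        while True:  # simplification: keep pulling until cancelled
            row = await inflight.get()
            try:
                await consume(row)
            finally:
                inflight.task_done()  # fires whether or not the handler ran

    for row in range(3):
        inflight.put_nowait(row)
    if flip_running_first:
        state["running"] = False  # naive order: flip running, then join
    task = asyncio.create_task(worker())
    await inflight.join()  # completes in both cases
    state["running"] = False
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return handled


if __name__ == "__main__":
    print(asyncio.run(drain(flip_running_first=True)))   # []
    print(asyncio.run(drain(flip_running_first=False)))  # [0, 1, 2]
```

With the naive order `join()` still returns, because `task_done()` fires for every row, yet no handler ran; the bug would show up as silently skipped rows rather than as a hang.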
Edge cases handled
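- `graceful_timeout=None`: `move_on_after(None)` sets no deadline, so the drain is unbounded. Acceptable, since the user opted out of bounded shutdown.
- Drain timeout exceeded: `move_on_after` exits its scope quietly at the deadline, so `super().stop()` still runs and cancels the remaining tasks. Rows still queued are abandoned; their leases expire and they are reclaimed on the next start.
- Fetch loop mid-CTE when `_draining` flips: it completes the CTE, enqueues whatever rows it acquired, then exits on the next iteration; in the usual case those rows drain normally. Caveat: `_inflight.join()` only waits for rows already enqueued, so if the queue is empty when `stop()` reaches the join, the join returns before the CTE finishes and `super().stop()` can cancel the fetch loop with those rows leased but not enqueued (they fall back to lease expiry). Having `stop()` wait for the fetch loop to exit before joining would close this gap.
- Worker parked on `_inflight.get()` after the drain: `super().stop()` → `TasksMixin.stop()` cancels it; the pending `get()` raises `CancelledError` and the task ends cleanly.
- `run_loops=False` test path: there are no loops, so `_inflight.join()` returns instantly and the drain step is a no-op.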
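The mid-CTE case depends on what `_inflight.join()` actually waits for. Assuming `_inflight` is an `asyncio.Queue` (its `get()`/`join()`/`task_done()` usage suggests so, but that is an assumption), `join()` only counts rows that have already been put, so it does not wait for a fetch that is still inside its CTE. The hypothetical `join_before_enqueue` below shows this in isolation, with a sleep standing in for the database round-trip.

```python
import asyncio


async def join_before_enqueue() -> tuple[bool, int]:
    """Return (fetch finished when join() returned?, rows queued afterwards)."""
    inflight: asyncio.Queue[str] = asyncio.Queue()  # empty: earlier rows already drained

    async def fetch_mid_cte() -> None:
        await asyncio.sleep(0.05)  # simulated CTE round-trip still in progress
        inflight.put_nowait("row-1")  # lands after stop() stopped waiting

    fetch_task = asyncio.create_task(fetch_mid_cte())
    await asyncio.sleep(0)  # fetch task is now parked inside the "CTE"
    await inflight.join()  # nothing unfinished, so this returns immediately
    fetch_done = fetch_task.done()
    await fetch_task  # the real stop() would cancel it via super().stop() instead
    return fetch_done, inflight.qsize()


if __name__ == "__main__":
    print(asyncio.run(join_before_enqueue()))  # (False, 1)
```

The `False` shows that `join()` returned while the fetch was still running, and the `1` is the row that arrived too late to be drained.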
Test plan
Unit / fake (`tests/test_fake.py`)
- Drain completes: with `run_loops=True`, publish 5 rows and use a handler that does `await asyncio.sleep(0.1)`. Once the rows have been claimed, call `broker.stop()`. Assert all 5 handlers ran to completion and that `FakeOutboxClient._rows` is empty after stop. (Because stop halts claiming, calling it before the fetch loop has picked the rows up would make this assertion racy.)
- Drain timeout: handler does `await asyncio.sleep(60)`, with `graceful_timeout=0.2`. Call `stop()`. Assert it returns within ~0.5 s and at least one handler was cancelled.
- Fetch halts immediately: publish 100 rows with `fetch_batch_size=10`. Call `stop()` while only 10 are queued. Assert no new leases are claimed after `stop()` begins.
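Before wiring it through `broker.stop()` and `FakeOutboxClient`, the timing contract of the "Drain timeout" case can be prototyped in plain `asyncio`. This is a hypothetical stand-in, not the planned test: `asyncio.wait_for` replaces `anyio.move_on_after`, and a single worker replaces the subscriber's pool.

```python
import asyncio
import contextlib
import time


async def drain_timeout_case(graceful_timeout: float = 0.2) -> tuple[float, int]:
    """Toy version of the "Drain timeout" case: returns (elapsed seconds, cancelled handlers)."""
    inflight: asyncio.Queue[int] = asyncio.Queue()
    cancelled: list[int] = []

    async def worker() -> None:
        while True:
            row = await inflight.get()
            try:
                await asyncio.sleep(60)  # handler that outlives graceful_timeout
            except asyncio.CancelledError:
                cancelled.append(row)
                raise
            finally:
                inflight.task_done()

    inflight.put_nowait(1)
    task = asyncio.create_task(worker())
    await asyncio.sleep(0)  # worker picks up the row and enters the handler

    started = time.monotonic()
    with contextlib.suppress(asyncio.TimeoutError):
        await asyncio.wait_for(inflight.join(), graceful_timeout)  # bounded drain
    task.cancel()  # stand-in for super().stop() cancelling remaining tasks
    await asyncio.gather(task, return_exceptions=True)
    return time.monotonic() - started, len(cancelled)
```

The same two assertions as the planned test apply: the stop path returns shortly after `graceful_timeout`, and the long-running handler is the one that gets cancelled.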
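Integration (`tests/test_integration.py`)
- Graceful drain end-to-end: publish 20 rows with `max_workers=4` and a handler that does `await asyncio.sleep(0.2)`. Call `broker.stop()` once the fetch loop has claimed all 20 rows. Assert all 20 handlers completed and all rows are deleted from the table. (Rows still unclaimed when stop begins are intentionally left for the next start, so stopping straight after publish would make the assertion depend on fetch timing.)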
Files to change
| File | Change |
|---|---|
| `faststream_outbox/subscriber/usecase.py` | Add `self._draining` in `__init__` (~L116); update the `_fetch_inner` loop guard (L208); rewrite `stop()` (L144–147) |
| `tests/test_fake.py` | Drain scenarios under `run_loops=True` |
| `tests/test_integration.py` | End-to-end drain test against real Postgres |
| `CLAUDE.md` | Document drain semantics in the Two-loop subscriber section |
Verification
```sh
just lint
just test tests/test_unit.py tests/test_fake.py
just test  # full suite including integration
```
Carved out of `plan2.md` E2.