E2: Graceful drain on subscriber stop #22

@lesnik512

Problem

OutboxSubscriber.stop() (faststream_outbox/subscriber/usecase.py:144-147) currently wraps super().stop() in anyio.move_on_after(graceful_timeout). The parent's TasksMixin.stop() cancels worker tasks immediately after a brief lock-release wait, so any handler still in flight gets CancelledError. Any row already pulled from _inflight but not yet dispatched is abandoned: its lease expires and it is reclaimed on the next start, but until then it is a duplicate-delivery surface.

For an outbox (reliability primitive), drain semantics matter more than for a generic broker.

Proposed design

Introduce a separate _draining: bool flag distinct from self.running:

  • Fetch loop also checks _draining, so it stops claiming new rows when stop begins.
  • Worker loops keep while self.running: unchanged — they continue dispatching from _inflight.
  • stop() sets _draining=True, kicks _notify_event to wake the fetch loop from any idle sleep, awaits _inflight.join() bounded by graceful_timeout, then calls super().stop() (which sets running=False, releases the ack lock, and cancels remaining tasks).

# subscriber/usecase.py

class OutboxSubscriber(...):
    def __init__(self, ...):
        ...
        self._draining: bool = False  # set by stop() to halt _fetch_inner

    async def _fetch_inner(self, *, fetch_conn, listen_conn):
        ...
        while self.running and not self._draining:  # was: while self.running
            ...

    async def stop(self) -> None:
        self._draining = True
        self._notify_event.set()  # wake the fetch loop from any idle sleep
        timeout = self._outer_config.graceful_timeout
        with anyio.move_on_after(timeout):
            await self._inflight.join()
        await super().stop()  # sets running=False, releases ack lock, cancels tasks
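
The ordering above can be tried out on a toy model. The following is a minimal sketch, not the real OutboxSubscriber: ToySubscriber, _pending, _fetch_loop, _worker_loop and demo are hypothetical names, the handler is simulated with a short sleep, and stdlib asyncio.wait_for stands in for anyio.move_on_after so the example has no third-party dependency.

```python
import asyncio


class ToySubscriber:
    """Toy stand-in for the two-loop subscriber; not the real OutboxSubscriber."""

    def __init__(self, rows, graceful_timeout=1.0):
        self.running = False
        self._draining = False
        self._pending = list(rows)  # hypothetical stand-in for unclaimed table rows
        self._inflight = asyncio.Queue()
        self._notify_event = asyncio.Event()
        self._tasks = []
        self.graceful_timeout = graceful_timeout
        self.handled = []

    async def _fetch_loop(self):
        # Stops claiming as soon as _draining flips, even while running is True.
        while self.running and not self._draining:
            if self._pending:
                self._inflight.put_nowait(self._pending.pop(0))
                await asyncio.sleep(0)  # yield to the workers
            else:
                self._notify_event.clear()
                await self._notify_event.wait()  # idle wait until notified

    async def _worker_loop(self):
        # Guard is unchanged: workers keep dispatching while running is True.
        while self.running:
            row = await self._inflight.get()
            try:
                await asyncio.sleep(0.01)  # simulated handler work
                self.handled.append(row)
            finally:
                self._inflight.task_done()

    async def start(self, workers=2):
        self.running = True
        self._tasks = [asyncio.create_task(self._fetch_loop())]
        self._tasks += [asyncio.create_task(self._worker_loop()) for _ in range(workers)]

    async def stop(self):
        self._draining = True
        self._notify_event.set()  # wake the fetch loop from its idle wait
        try:
            # A timeout of None would wait for the drain without a deadline.
            await asyncio.wait_for(self._inflight.join(), self.graceful_timeout)
        except asyncio.TimeoutError:
            pass  # drain budget exhausted; fall through to cancellation
        self.running = False
        for task in self._tasks:
            task.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)


async def demo():
    sub = ToySubscriber(rows=range(5))
    await sub.start()
    while sub._pending:  # wait until every row has been claimed
        await asyncio.sleep(0)
    await sub.stop()
    return sub.handled


if __name__ == "__main__":
    print(sorted(asyncio.run(demo())))
```

In this toy, every row claimed before stop() is handled before the workers are cancelled, because running only flips to False after the bounded join() returns.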

Non-obvious trap

FastStream's SubscriberUsecase.consume() has an early-exit if not self.running: return None. The naive "set running=False first, then _inflight.join()" order from plan2.md would cause every queued row to skip dispatch — the worker pulls it, consume() returns None, the handler never runs, task_done() still fires. The separate _draining flag is what avoids this.
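
The trap can be reproduced in isolation. This is a hedged sketch under simplifying assumptions: consume() here is a hypothetical stand-in that only mirrors the early exit described above (it is not FastStream's implementation), and the worker loop runs unconditionally so that the consume() guard is the only thing in play.

```python
import asyncio


async def drain(set_running_false_first: bool) -> int:
    """Return how many handlers actually ran before join() completed."""
    state = {"running": True}
    inflight = asyncio.Queue()
    handled = []

    async def consume(row):
        if not state["running"]:
            return None  # early exit: the handler is skipped
        handled.append(row)  # simulated handler

    async def worker():
        while True:
            row = await inflight.get()
            try:
                await consume(row)
            finally:
                inflight.task_done()  # fires even when consume() skipped the row

    for row in range(5):
        inflight.put_nowait(row)

    if set_running_false_first:
        state["running"] = False  # the naive order

    task = asyncio.create_task(worker())
    await inflight.join()  # completes in both orders
    state["running"] = False
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return len(handled)


if __name__ == "__main__":
    print(asyncio.run(drain(set_running_false_first=True)))
    print(asyncio.run(drain(set_running_false_first=False)))
```

With the naive order, join() still returns, so the drain looks successful even though no handler ran. That is why a join()-based drain needs the running flag to stay True until the queue is empty.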

Edge cases handled

  • graceful_timeout=None: move_on_after(None) applies no deadline → unbounded drain. Acceptable (user opted out of bounded shutdown).
  • Drain timeout exceeded: super().stop() runs unconditionally, cancelling remaining tasks. Rows still queued are abandoned; their leases expire and they're reclaimed on the next start.
  • Fetch loop mid-CTE when _draining flips: it completes the CTE, enqueues whatever rows it acquired, then exits on the next iteration. Those rows drain normally.
  • Worker parked on _inflight.get() after drain: super().stop() → TasksMixin.stop() cancels it; the await raises CancelledError and the task ends cleanly.
  • run_loops=False test path: no loops, _inflight.join() returns instantly. No-op.
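
Two of the cases above rest on queue behaviour that is easy to check directly. The sketch below assumes _inflight behaves like a stdlib asyncio.Queue, which the issue does not state explicitly; edge_cases and worker are hypothetical names.

```python
import asyncio


async def edge_cases() -> bool:
    inflight = asyncio.Queue()

    # No loops and nothing enqueued: join() has no unfinished work to wait for,
    # so it returns well inside this deliberately small timeout.
    await asyncio.wait_for(inflight.join(), timeout=0.1)

    async def worker():
        await inflight.get()  # parks here because the queue stays empty

    task = asyncio.create_task(worker())
    await asyncio.sleep(0)  # let the worker reach get()
    task.cancel()  # what the cancellation step of stop() amounts to for this task
    try:
        await task
    except asyncio.CancelledError:
        pass  # the parked await raised CancelledError
    return task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(edge_cases()))
```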

Test plan

Unit / fake (tests/test_fake.py)

  • Drain completes: run_loops=True, publish 5 rows, handler awaits asyncio.sleep(0.1). Call broker.stop(). Assert all 5 handlers ran to completion; assert FakeOutboxClient._rows is empty after stop.
  • Drain timeout: handler awaits asyncio.sleep(60), graceful_timeout=0.2. Call stop(). Assert it returns within ~0.5s and at least one handler was cancelled.
  • Fetch halts immediately: publish 100 rows, fetch_batch_size=10. Call stop() while only 10 are queued. Assert no new leases are claimed after stop() is initiated.
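
As a rough illustration of the shape the drain-timeout scenario could take, here is a sketch against a bare queue and a single worker rather than the real broker or FakeOutboxClient. drain_with_timeout is a hypothetical helper, and asyncio.wait_for again stands in for anyio.move_on_after.

```python
import asyncio
import time


async def drain_with_timeout(graceful_timeout: float):
    """Bounded drain against a handler that outlives the timeout."""
    inflight = asyncio.Queue()
    cancelled = []

    async def worker():
        row = await inflight.get()
        try:
            await asyncio.sleep(60)  # handler far slower than the drain budget
        except asyncio.CancelledError:
            cancelled.append(row)  # record that the handler was interrupted
            raise
        finally:
            inflight.task_done()

    inflight.put_nowait("row-1")
    task = asyncio.create_task(worker())
    await asyncio.sleep(0)  # let the worker pick up the row

    started = time.monotonic()
    try:
        await asyncio.wait_for(inflight.join(), graceful_timeout)
    except asyncio.TimeoutError:
        pass  # drain budget exhausted
    task.cancel()  # unconditional cancellation after the bounded drain
    await asyncio.gather(task, return_exceptions=True)
    return time.monotonic() - started, cancelled


if __name__ == "__main__":
    elapsed, cancelled = asyncio.run(drain_with_timeout(0.2))
    print(f"stop took {elapsed:.2f}s, cancelled={cancelled}")
```

A real test would drive broker.stop() and inspect the fake client instead; the timing and cancellation assertions would keep the same form.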

Integration (tests/test_integration.py)

  • Graceful drain end-to-end: publish 20 rows, max_workers=4, handler awaits asyncio.sleep(0.2). Call broker.stop() immediately after publish. Assert all 20 handlers completed and all rows are deleted from the table.

Files to change

| File | Change |
| --- | --- |
| faststream_outbox/subscriber/usecase.py | +self._draining in __init__ (~L116); update _fetch_inner loop guard (L208); rewrite stop() (L144–147) |
| tests/test_fake.py | drain scenarios under run_loops=True |
| tests/test_integration.py | end-to-end drain test against real Postgres |
| CLAUDE.md | document drain semantics in the Two-loop subscriber section |

Verification

just lint
just test tests/test_unit.py tests/test_fake.py
just test  # full suite including integration

Carved out of plan2.md E2.
