Skip to content

feat(Broker): add PgmqBroker#399

Open
mducros-wm wants to merge 42 commits into
masterfrom
codex-pgmq-broker-j1
Open

feat(Broker): add PgmqBroker#399
mducros-wm wants to merge 42 commits into
masterfrom
codex-pgmq-broker-j1

Conversation

@mducros-wm

Copy link
Copy Markdown

No description provided.

@mducros-wm mducros-wm force-pushed the codex-pgmq-broker-j1 branch from e0018fe to e4bfcaf Compare May 20, 2026 08:39
Comment thread .github/workflows/tests.yml Outdated
- 5784:5672
postgres:
image: postgres:16
image: ghcr.io/pgmq/pg18-pgmq:latest

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe "vanilla" pg image and pgmq installation as a plugin ?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could but it seems more complex to me. Why don't you want to use the pgmq image ? I should remove the latest though

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to test the plugin installation and versionning

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it the responsibility of remoulade to install the plugins ?

Comment thread remoulade/brokers/pgmq.py Outdated
Comment thread remoulade/brokers/pgmq.py Outdated
@mducros-wm

Copy link
Copy Markdown
Author

Comment thread pyproject.toml Outdated
Comment thread remoulade/brokers/pgmq.py
Comment thread remoulade/brokers/pgmq.py
Comment thread remoulade/brokers/pgmq.py
self,
*,
url: str,
middleware: list["Middleware"] | None = None,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you check if all builtin middleware are compatible.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Middleware manipulate remoulade object which are unchanged. However, because the delay is handled natively, some middleware method will never be called (before_delay_message, after_declare_delay_queue)

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But imo, it makes no sense to call them with a pgmq broker

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of the Middleware are not set by the user directly. Maybe you should have a look on the default middleware setup during init.

Comment thread pyproject.toml Outdated
Comment thread remoulade/brokers/pgmq.py Outdated
Comment thread remoulade/brokers/pgmq.py Outdated
Comment thread remoulade/brokers/pgmq.py Outdated
Comment thread remoulade/brokers/pgmq.py Outdated
Comment thread remoulade/brokers/pgmq.py Outdated
Comment thread remoulade/brokers/pgmq.py Outdated
Comment thread remoulade/brokers/pgmq.py
Comment thread remoulade/brokers/pgmq.py Outdated
Comment thread remoulade/brokers/pgmq.py
Comment thread remoulade/brokers/pgmq.py Outdated
Comment thread remoulade/brokers/pgmq.py Outdated
Comment thread remoulade/brokers/pgmq.py Outdated
Comment thread remoulade/brokers/pgmq.py Outdated
Comment thread remoulade/brokers/pgmq.py Outdated
Comment thread remoulade/brokers/pgmq.py Outdated
mducros-wm and others added 12 commits June 19, 2026 12:03
…gine

SQLAlchemy defaults `postgresql://` to the psycopg2 dialect, which the
project does not depend on. CI installs only psycopg v3, so the raw
engine built in the postgres_broker fixture failed at setup with
ModuleNotFoundError: No module named 'psycopg2'. Swap the scheme to
postgresql+psycopg://, mirroring what PGMQ does internally for the
broker. The plain url is kept for PostgresBroker, whose listener opens a
raw psycopg.connect() connection that does not understand +psycopg.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Revert the unjustified ^^^ -> --- underline change on the max_retries
retry option so it matches the other retry options (min_backoff,
max_backoff, retry_when, backoff_strategy).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Explain that queues are partitioned PGMQ queues backed by pg_partman,
what archive_partition_interval_in_days / archive_retention_interval_in_days
control, and that pg_partman maintenance must run periodically.

Also fix the broken sentence about the required PostgreSQL user and remove
the unused FUTURE_PARTITION_HORIZON constant.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
archive() runs in process_message's finally, outside the worker loop's
except Empty, so a transient DB error during ack/nack used to propagate
and kill the worker thread (which has no restart). Swallow and log it
like the RabbitMQ consumer does; the message is redelivered after its
visibility timeout, honoring at-least-once.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…cripts

encode_in_bytes routed through encode_in_json, whose throwaway json.dumps
validation pass ran on top of the real serialization: 2 passes for the
default JSONEncoder (RabbitMQ/Stub) and 3 for PydanticEncoder. Serialize
directly from the data / _encode_in_json output instead, bringing the
default path back to a single pass and Pydantic down to two. Output bytes
are unchanged. Also drop the misleading `# pragma: no cover` on the default
codec, which the Stub broker actually exercises.

Remove local_postgres_broker.py / local_postgres_consumer.py: local dev
scripts with hard-coded DSNs and prints that should never have been committed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The existing suite tests the broker's mechanics largely in isolation; the
middleware-driven behaviours were only exercised against the stub and
RabbitMQ brokers. Add real-Worker tests running through PGMQ:

- a flaky actor that fails then succeeds on retry,
- an always-failing actor whose exhausted retries end archived (proving the
  nack does not leave the message to be redelivered forever — no DLQ),
- result storage/retrieval via a Results middleware,
- a group aggregating results.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…table

join() counted messages by hand-building a Table("q_<name>", schema="pgmq")
and running count(*), coupling the broker to PGMQ's internal table layout.
Use the public metrics() API instead: queue_length covers visible, invisible
in-flight and native delayed messages alike, matching join()'s contract.
Prune the now-unused SQLAlchemy imports.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…e timeout

The polling fallback rounded timeout up to a one-second minimum, so a
consumer created with timeout=0 blocked ~1s per read instead of being
non-blocking as documented. Do a single immediate read when timeout is 0,
and validate in the consumer's __init__ that timeout is not negative
(matching the broker's other input validation) instead of silently
pretending to coerce it.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The fallback_encoder swallowed every decode error and returned the raw,
un-rehydrated payload through the fallback encoder, silently handing actors
the wrong data shape and hiding the original failure. Remove the argument so
decoding raises explicitly (ActorNotFound, ValidationError, ...). Drop the
two tests that asserted the silent-fallback behaviour; the raising paths are
already covered. Document both this and the simplejson->Pydantic
serialization change (Decimal now encoded as a string) as breaking.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- Fix copy-pasted Message.decode_json/encode_in_json docstrings that said
  "bytestring" while they handle JSON objects.
- Make convert_days_in_partman_syntax a staticmethod; it never used self.
- Reject prefetch < 1 in the consumer with a ValueError (matching the
  timeout validation) instead of claiming a coercion that never happened.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ener

The shared LISTEN/NOTIFY listener keyed a single wake event per queue, so a
second consumer on the same queue overwrote the first's event and a closing
consumer's unregister() silently stopped notifications for a still-active
sibling. Track a set of events per queue, wake them all, and only drop a
queue's channel routing once its last consumer leaves.

Also back `available` with a threading.Event instead of a bare bool so the
dispatch thread's writes and the consumer threads' reads are synchronized
rather than relying on CPython GIL atomicity; expose it as a read-only
bool property so callers and tests are unaffected.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@mducros-wm mducros-wm requested a review from fregogui June 19, 2026 14:56
Reintroduce the opt-in fallback_encoder argument so decoding can fall
back to another encoder instead of raising. Defaults to None, keeping
strict decoding as the default behaviour.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@mducros-wm

mducros-wm commented Jun 30, 2026

Copy link
Copy Markdown
Author

Hello @tavallaie, thanks for your presence here. I have a question about partitionned queues. I've looked how you config pg_partman when you create a partitionned queue and premake is at 4 and infinite_time_partition is at false. Then if a queue is empty there is no partition which are created. So what happen if the partition was not created because the queue was empty and now we want to add a new message for a partition which not exists ?

@mducros-wm mducros-wm force-pushed the codex-pgmq-broker-j1 branch from 6772534 to aed735c Compare July 2, 2026 09:44
@tavallaie

Copy link
Copy Markdown

Hello @tavallaie, thanks for your presence here. I have a question about partitionned queues. I've looked how you config pg_partman when you create a partitionned queue and premake is at 4 and infinite_time_partition is at false. Then if a queue is empty there is no partition which are created. So what happen if the partition was not created because the queue was empty and now we want to add a new message for a partition which not exists ?

@mducros-wm, great question that is out of my knowledge right now (it happen before I join the team).
I ask @ChuckHend to join and help us.

@ChuckHend

Copy link
Copy Markdown

@mducros-wm, great question that is out of my knowledge right now (it happen before I join the team).
I ask @ChuckHend to join and help us.

Is the question about message count based partition or time based partition? Pgmq's partitioned queues are a thin wrapper around pg_partmans API, so I can look at the project docs to investigate. The intention though is to create more partitions even when queue is empty for time based partitions.

For message count, the intention was to use message sequence to determine partition creation. But if there are no messages hitting queue then no new partitions would be created.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants