feat(Broker): add PgmqBroker#399
Conversation
e0018fe to
e4bfcaf
Compare
2263626 to
0ad1c3a
Compare
| - 5784:5672 | ||
| postgres: | ||
| image: postgres:16 | ||
| image: ghcr.io/pgmq/pg18-pgmq:latest |
There was a problem hiding this comment.
Maybe "vanilla" pg image and pgmq installation as a plugin ?
There was a problem hiding this comment.
I could but it seems more complex to me. Why don't you want to use the pgmq image ? I should remove the latest though
There was a problem hiding this comment.
to test the plugin installation and versionning
There was a problem hiding this comment.
Is it the responsibility of remoulade to install the plugins ?
4c5155e to
39443d2
Compare
eb5c39e to
1940b4b
Compare
1940b4b to
f48ad0b
Compare
f48ad0b to
23fb4e1
Compare
|
If you want to see on paxone https://gitlab.wiremind.io/wiremind/paxone/paxone-backend/-/merge_requests/4967 |
| self, | ||
| *, | ||
| url: str, | ||
| middleware: list["Middleware"] | None = None, |
There was a problem hiding this comment.
Can you check if all builtin middleware are compatible.
There was a problem hiding this comment.
Middleware manipulate remoulade object which are unchanged. However, because the delay is handled natively, some middleware method will never be called (before_delay_message, after_declare_delay_queue)
There was a problem hiding this comment.
But imo, it makes no sense to call them with a pgmq broker
There was a problem hiding this comment.
Most of the Middleware are not set by the user directly. Maybe you should have a look on the default middleware setup during init.
…gine SQLAlchemy defaults `postgresql://` to the psycopg2 dialect, which the project does not depend on. CI installs only psycopg v3, so the raw engine built in the postgres_broker fixture failed at setup with ModuleNotFoundError: No module named 'psycopg2'. Swap the scheme to postgresql+psycopg://, mirroring what PGMQ does internally for the broker. The plain url is kept for PostgresBroker, whose listener opens a raw psycopg.connect() connection that does not understand +psycopg. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Revert the unjustified ^^^ -> --- underline change on the max_retries retry option so it matches the other retry options (min_backoff, max_backoff, retry_when, backoff_strategy). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Explain that queues are partitioned PGMQ queues backed by pg_partman, what archive_partition_interval_in_days / archive_retention_interval_in_days control, and that pg_partman maintenance must run periodically. Also fix the broken sentence about the required PostgreSQL user and remove the unused FUTURE_PARTITION_HORIZON constant. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
archive() runs in process_message's finally, outside the worker loop's except Empty, so a transient DB error during ack/nack used to propagate and kill the worker thread (which has no restart). Swallow and log it like the RabbitMQ consumer does; the message is redelivered after its visibility timeout, honoring at-least-once. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…cripts encode_in_bytes routed through encode_in_json, whose throwaway json.dumps validation pass ran on top of the real serialization: 2 passes for the default JSONEncoder (RabbitMQ/Stub) and 3 for PydanticEncoder. Serialize directly from the data / _encode_in_json output instead, bringing the default path back to a single pass and Pydantic down to two. Output bytes are unchanged. Also drop the misleading `# pragma: no cover` on the default codec, which the Stub broker actually exercises. Remove local_postgres_broker.py / local_postgres_consumer.py: local dev scripts with hard-coded DSNs and prints that should never have been committed. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The existing suite tests the broker's mechanics largely in isolation; the middleware-driven behaviours were only exercised against the stub and RabbitMQ brokers. Add real-Worker tests running through PGMQ: - a flaky actor that fails then succeeds on retry, - an always-failing actor whose exhausted retries end archived (proving the nack does not leave the message to be redelivered forever — no DLQ), - result storage/retrieval via a Results middleware, - a group aggregating results. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…table
join() counted messages by hand-building a Table("q_<name>", schema="pgmq")
and running count(*), coupling the broker to PGMQ's internal table layout.
Use the public metrics() API instead: queue_length covers visible, invisible
in-flight and native delayed messages alike, matching join()'s contract.
Prune the now-unused SQLAlchemy imports.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…e timeout The polling fallback rounded timeout up to a one-second minimum, so a consumer created with timeout=0 blocked ~1s per read instead of being non-blocking as documented. Do a single immediate read when timeout is 0, and validate in the consumer's __init__ that timeout is not negative (matching the broker's other input validation) instead of silently pretending to coerce it. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The fallback_encoder swallowed every decode error and returned the raw, un-rehydrated payload through the fallback encoder, silently handing actors the wrong data shape and hiding the original failure. Remove the argument so decoding raises explicitly (ActorNotFound, ValidationError, ...). Drop the two tests that asserted the silent-fallback behaviour; the raising paths are already covered. Document both this and the simplejson->Pydantic serialization change (Decimal now encoded as a string) as breaking. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- Fix copy-pasted Message.decode_json/encode_in_json docstrings that said "bytestring" while they handle JSON objects. - Make convert_days_in_partman_syntax a staticmethod; it never used self. - Reject prefetch < 1 in the consumer with a ValueError (matching the timeout validation) instead of claiming a coercion that never happened. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ener The shared LISTEN/NOTIFY listener keyed a single wake event per queue, so a second consumer on the same queue overwrote the first's event and a closing consumer's unregister() silently stopped notifications for a still-active sibling. Track a set of events per queue, wake them all, and only drop a queue's channel routing once its last consumer leaves. Also back `available` with a threading.Event instead of a bare bool so the dispatch thread's writes and the consumer threads' reads are synchronized rather than relying on CPython GIL atomicity; expose it as a read-only bool property so callers and tests are unaffected. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Reintroduce the opt-in fallback_encoder argument so decoding can fall back to another encoder instead of raising. Defaults to None, keeping strict decoding as the default behaviour. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
23188c6 to
0dc3aa8
Compare
|
Hello @tavallaie, thanks for your presence here. I have a question about partitionned queues. I've looked how you config pg_partman when you create a partitionned queue and premake is at 4 and infinite_time_partition is at false. Then if a queue is empty there is no partition which are created. So what happen if the partition was not created because the queue was empty and now we want to add a new message for a partition which not exists ? |
3881665 to
6772534
Compare
6772534 to
aed735c
Compare
@mducros-wm, great question that is out of my knowledge right now (it happen before I join the team). |
Is the question about message count based partition or time based partition? Pgmq's partitioned queues are a thin wrapper around pg_partmans API, so I can look at the project docs to investigate. The intention though is to create more partitions even when queue is empty for time based partitions. For message count, the intention was to use message sequence to determine partition creation. But if there are no messages hitting queue then no new partitions would be created. |
No description provided.