Add commit retry and concurrency validation for writes #3320
Open
lawofcycles wants to merge 9 commits into apache:main
Conversation
Add automatic retry with exponential backoff when catalog commits fail due to concurrent transactions (`CommitFailedException`), and integrate the existing validation functions from `validate.py` into the write path to detect incompatible concurrent modifications (`ValidationException`).

The retry loop is placed in `Transaction.commit_transaction()`. On each retry attempt, table metadata is refreshed, registered snapshot producers are re-executed to regenerate manifests, and data conflict validation is run. Uncommitted manifests from failed attempts are cleaned up after a successful commit.

Validation is performed for `_OverwriteFiles` and `_DeleteFiles` based on the table's isolation level (serializable/snapshot). `_FastAppendFiles` and `_MergeAppendFiles` do not require validation since appends never conflict.

Signed-off-by: Sotaro Hikita <[email protected]>
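The flow above can be sketched as a minimal retry loop. This is an illustrative sketch, not PyIceberg's actual internals: `attempt_commit`, `refresh`, and `validate` are placeholder callables standing in for the real producer and catalog machinery.

```python
import random
import time


class CommitFailedException(Exception):
    """Raised when a concurrent transaction committed first."""


class ValidationException(Exception):
    """Raised when concurrent changes actually conflict."""


def commit_with_retry(attempt_commit, refresh, validate,
                      num_retries=4, min_wait_ms=100, max_wait_ms=60_000):
    # One initial attempt plus up to num_retries retries.
    for attempt in range(num_retries + 1):
        try:
            return attempt_commit()
        except CommitFailedException:
            if attempt == num_retries:
                raise  # retries exhausted
            refresh()    # reload table metadata from the catalog
            validate()   # raises ValidationException on a real data conflict
            # Exponential backoff with jitter, capped at max_wait_ms.
            wait_ms = min(min_wait_ms * (2 ** attempt), max_wait_ms)
            time.sleep(random.uniform(0, wait_ms) / 1000.0)
```

The key property of this shape is that a `ValidationException` from `validate()` propagates immediately and stops the loop, while `CommitFailedException` alone triggers another attempt.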
Skip `_validate_no_new_delete_files` and `_validate_deleted_data_files` when `conflict_detection_filter` is `None`, matching Java's `BaseOverwriteFiles.validate()` behavior for `rowFilter == AlwaysFalse()`.

Route the isolation level property based on the calling operation: `Transaction.delete()` uses `write.delete.isolation-level` (default), while `Transaction.overwrite()`, `dynamic_partition_overwrite()`, and `upsert()` use `write.update.isolation-level` via `_isolation_level_property` on the snapshot producer. Remove the unused `WRITE_MERGE_ISOLATION_LEVEL` constant.

Signed-off-by: Sotaro Hikita <[email protected]>
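The property routing can be sketched as follows. The property names come from this PR, but the helper function and the `Operation` members shown are illustrative, not PyIceberg's actual code.

```python
from enum import Enum


class Operation(Enum):
    # Illustrative subset of operations; not PyIceberg's full enum.
    DELETE = "delete"
    OVERWRITE = "overwrite"


def isolation_level_property(operation: Operation) -> str:
    # Transaction.delete() reads write.delete.isolation-level;
    # overwrite(), dynamic_partition_overwrite(), and upsert()
    # read write.update.isolation-level.
    if operation is Operation.DELETE:
        return "write.delete.isolation-level"
    return "write.update.isolation-level"
```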
Use the `Operation` enum instead of string literals for producer construction. Use `.value` for the `IsolationLevel` string comparison to avoid an unreachable-statement warning.

Signed-off-by: Sotaro Hikita <[email protected]>
Fix `_build_delete_files_partition_predicate` overwriting `_case_sensitive` to `True` by passing the current value to `delete_by_predicate`; this caused case-insensitive deletes to fail when `_OverwriteFiles` was used with a user-specified predicate.

Move `import random`/`import time` to the top of the file. Add a total timeout (`commit.retry.total-timeout-ms`) to the retry loop. Add comments for intentional validation duplication and `cached_property` clearing. Stabilize `test_commit_retry_on_commit_failed` by removing a flaky `patch.object` assertion.

Signed-off-by: Sotaro Hikita <[email protected]>
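A total-timeout budget bounds the backoff schedule on top of the per-attempt caps. The sketch below shows one way to compute it; the function name is hypothetical and the `total_timeout_ms` default is an example, since the PR's default for `commit.retry.total-timeout-ms` is not stated here.

```python
def backoff_schedule(num_retries=4, min_wait_ms=100, max_wait_ms=60_000,
                     total_timeout_ms=30 * 60 * 1000):
    """Return the capped exponential wait (ms) before each retry,
    stopping once the cumulative wait would exceed the total budget."""
    waits, elapsed = [], 0
    for attempt in range(num_retries):
        # Double the wait each attempt, capped at max_wait_ms.
        wait = min(min_wait_ms * (2 ** attempt), max_wait_ms)
        if elapsed + wait > total_timeout_ms:
            break  # total timeout budget exhausted
        waits.append(wait)
        elapsed += wait
    return waits
```

With the listed defaults this yields waits of 100, 200, 400, and 800 ms before the four retries.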
In CI, the `pyiceberg.table` module is loaded twice, creating two distinct `Transaction` class objects. `patch.object` on the test-imported `Transaction` does not affect the runtime `Transaction` used by `Table.append()`. Fix by resolving `Transaction` from the `pyiceberg.table` module at runtime.

Signed-off-by: Sotaro Hikita <[email protected]>
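The double-import pitfall can be reproduced without PyIceberg at all. This self-contained sketch (module and class names are stand-ins) builds a throwaway module twice to show why patching must go through the runtime module object:

```python
import sys
import types

SOURCE = (
    "class Transaction:\n"
    "    def commit(self):\n"
    "        return 'real'\n"
)


def load_module(name):
    # Simulates an import that executes module code, as a second
    # load under a different import path would in CI.
    mod = types.ModuleType(name)
    exec(SOURCE, mod.__dict__)
    return mod


runtime_mod = load_module("demo.table")   # what the library resolves at runtime
sys.modules["demo.table"] = runtime_mod
test_mod = load_module("demo.table")      # what the test happened to import

# Two distinct class objects with the same name:
assert test_mod.Transaction is not runtime_mod.Transaction

# Patching the test-imported class leaves the runtime class untouched:
test_mod.Transaction.commit = lambda self: "patched"
assert runtime_mod.Transaction().commit() == "real"

# Fix: resolve the class from the runtime module before patching.
sys.modules["demo.table"].Transaction.commit = lambda self: "patched"
assert runtime_mod.Transaction().commit() == "patched"
```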
Closes #3319
Closes #819
Closes #269
Rationale for this change
PyIceberg currently fails immediately with `CommitFailedException` when a concurrent transaction commits first, regardless of whether the writes actually conflict. Java Iceberg handles this transparently through its retry loop in `SnapshotProducer.commit()`. This PR adds automatic commit retry with exponential backoff and data conflict validation to PyIceberg, matching Java Iceberg's behavior.

On `CommitFailedException`, the retry loop refreshes table metadata, re-runs validation, and regenerates manifests. If validation detects a real data conflict, the operation aborts with `ValidationException` instead of retrying.

The retry loop is placed in `Transaction.commit_transaction()` rather than in individual snapshot producers. This is necessary because `Transaction.delete()` uses two producers (`_DeleteFiles` + `_OverwriteFiles`) that must be committed atomically; retrying at the producer level would break this atomicity.

Validation behavior follows Java's `BaseOverwriteFiles.validate()`, using the existing validation functions from `validate.py` that were contributed through #1935, #1938, #2050, and #3049.

Are these changes tested?
Yes. 54 unit tests and 8 integration tests covering retry success, `ValidationException` abort, retry exhaustion, isolation levels, partition-level conflict detection (both user-specified and auto-computed filters), manifest cleanup, and producer state reset.

Are there any user-facing changes?
Yes. Previously, all concurrent write conflicts resulted in `CommitFailedException`. Now:

- Non-conflicting concurrent commits are retried automatically.
- Writes that actually conflict abort with `ValidationException` instead of `CommitFailedException`.

The following new table properties are supported:

- `commit.retry.num-retries` (default: 4)
- `commit.retry.min-wait-ms` (default: 100)
- `commit.retry.max-wait-ms` (default: 60000)
- `write.delete.isolation-level` (default: serializable)
- `write.update.isolation-level` (default: serializable)
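For illustration, these properties could be tuned through PyIceberg's existing table-property APIs. A minimal sketch, assuming `table` is an already-loaded `Table`; the values shown are examples, not recommendations:

```python
# Property names come from this PR; defaults are listed above.
retry_properties = {
    "commit.retry.num-retries": "10",
    "commit.retry.min-wait-ms": "200",
    "commit.retry.max-wait-ms": "30000",
    "write.delete.isolation-level": "snapshot",
}

# Applying them via a transaction (commented out since it needs a
# live catalog and table):
# with table.transaction() as txn:
#     txn.set_properties(**retry_properties)
```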