diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index f0ed3b9aaa7a..3b9198b7050d 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -7400,10 +7400,6 @@ Throw an error if there are pending mutations when exporting a merge tree part. )", 0) \ DECLARE(Bool, export_merge_tree_part_throw_on_pending_patch_parts, true, R"( Throw an error if there are pending patch parts when exporting a merge tree part. -)", 0) \ - DECLARE(Bool, export_merge_tree_partition_lock_inside_the_task, false, R"( -Only lock a part when the task is already running. This might help with busy waiting where the scheduler locks a part, but the task ends in the pending list. -On the other hand, there is a chance once the task executes that part has already been locked by another replica and the task will simply early exit. )", 0) \ DECLARE(Bool, export_merge_tree_partition_system_table_prefer_remote_information, false, R"( Controls whether the system.replicated_partition_exports will prefer to query ZooKeeper to get the most up to date information or use the local information. @@ -7835,6 +7831,7 @@ Allow experimental database engine DataLakeCatalog with catalog_type = 'paimon_r MAKE_OBSOLETE(M, Bool, allow_experimental_object_type, false) \ MAKE_OBSOLETE(M, BoolAuto, insert_select_deduplicate, Field{"auto"}) \ MAKE_OBSOLETE(M, Bool, allow_retries_in_cluster_requests, false) \ + MAKE_OBSOLETE(M, Bool, export_merge_tree_partition_lock_inside_the_task, false) \ /** The section above is for obsolete settings. Do not add anything there. */ #endif /// __CLION_IDE__ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index a1b03a639ab8..f84854437a38 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -50,6 +50,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"iceberg_metadata_staleness_ms", 0, 0, "New setting allowing using cached metadata version at READ operations to prevent fetching from remote catalog"}, {"export_merge_tree_partition_task_timeout_seconds", 0, 3600, "New setting to control the timeout for export partition tasks."}, {"export_merge_tree_partition_manifest_ttl", 180, 86400, "Reasonable default for real usage"}, + {"export_merge_tree_partition_lock_inside_the_task", false, false, "Obsolete. No-op."}, }); addSettingsChanges(settings_changes_history, "26.1", { diff --git a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h index f1c96b120a28..dd5ef9886ded 100644 --- a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h +++ b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h @@ -119,7 +119,6 @@ struct ExportReplicatedMergeTreePartitionManifest size_t max_rows_per_file; MergeTreePartExportManifest::FileAlreadyExistsPolicy file_already_exists_policy; String filename_pattern; - bool lock_inside_the_task; /// todo temporary bool write_full_path_in_iceberg_metadata = false; String iceberg_metadata_json; @@ -154,7 +153,6 @@ struct ExportReplicatedMergeTreePartitionManifest json.set("max_retries", max_retries); json.set("ttl_seconds", ttl_seconds); json.set("task_timeout_seconds", task_timeout_seconds); - json.set("lock_inside_the_task", lock_inside_the_task); json.set("write_full_path_in_iceberg_metadata", write_full_path_in_iceberg_metadata); std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM oss.exceptions(std::ios::failbit); @@ -208,8 +206,6 @@ struct ExportReplicatedMergeTreePartitionManifest /// what to do if it's not a valid value? } - manifest.lock_inside_the_task = json->getValue("lock_inside_the_task"); - manifest.write_full_path_in_iceberg_metadata = json->getValue("write_full_path_in_iceberg_metadata"); return manifest; diff --git a/src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp b/src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp deleted file mode 100644 index d0dba7d6834a..000000000000 --- a/src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp +++ /dev/null @@ -1,71 +0,0 @@ -#include -#include - -namespace ProfileEvents -{ - extern const Event ExportPartitionZooKeeperRequests; - extern const Event ExportPartitionZooKeeperGetChildren; - extern const Event ExportPartitionZooKeeperCreate; -} -namespace DB -{ - -ExportPartFromPartitionExportTask::ExportPartFromPartitionExportTask( - StorageReplicatedMergeTree & storage_, - const std::string & key_, - const MergeTreePartExportManifest & manifest_) - : storage(storage_), - key(key_), - manifest(manifest_) -{ - export_part_task = std::make_shared(storage, manifest); -} - -bool ExportPartFromPartitionExportTask::executeStep() -{ - const auto zk = storage.getZooKeeper(); - const auto part_name = manifest.data_part->name; - - LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Attempting to lock part: {}", part_name); - - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperCreate); - if (Coordination::Error::ZOK == zk->tryCreate(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / part_name, storage.replica_name, zkutil::CreateMode::Ephemeral)) - { - LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Locked part: {}", part_name); - export_part_task->executeStep(); - return false; - } - - std::lock_guard inner_lock(storage.export_manifests_mutex); - storage.export_manifests.erase(manifest); - - LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Failed to lock part {}, skipping", part_name); - return false; -} - -void ExportPartFromPartitionExportTask::cancel() noexcept -{ - export_part_task->cancel(); -} - -void ExportPartFromPartitionExportTask::onCompleted() -{ - export_part_task->onCompleted(); -} - -StorageID ExportPartFromPartitionExportTask::getStorageID() const -{ - return export_part_task->getStorageID(); -} - -Priority ExportPartFromPartitionExportTask::getPriority() const -{ - return export_part_task->getPriority(); -} - -String ExportPartFromPartitionExportTask::getQueryId() const -{ - return export_part_task->getQueryId(); -} -} diff --git a/src/Storages/MergeTree/ExportPartFromPartitionExportTask.h b/src/Storages/MergeTree/ExportPartFromPartitionExportTask.h deleted file mode 100644 index e170b22b470d..000000000000 --- a/src/Storages/MergeTree/ExportPartFromPartitionExportTask.h +++ /dev/null @@ -1,36 +0,0 @@ -#pragma once - -#include -#include -#include -#include - -namespace DB -{ - -/* - Decorator around the ExportPartTask to lock the part inside the task -*/ -class ExportPartFromPartitionExportTask : public IExecutableTask -{ -public: - explicit ExportPartFromPartitionExportTask( - StorageReplicatedMergeTree & storage_, - const std::string & key_, - const MergeTreePartExportManifest & manifest_); - bool executeStep() override; - void onCompleted() override; - StorageID getStorageID() const override; - Priority getPriority() const override; - String getQueryId() const override; - - void cancel() noexcept override; - -private: - StorageReplicatedMergeTree & storage; - std::string key; - MergeTreePartExportManifest manifest; - std::shared_ptr export_part_task; -}; - -} diff --git a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp index 96cc648ffbb5..35aa610e2fea 100644 --- a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp +++ b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp @@ -7,7 +7,6 @@ #include #include "Storages/MergeTree/ExportPartitionUtils.h" #include "Storages/MergeTree/MergeTreePartExportManifest.h" -#include "Storages/MergeTree/ExportPartFromPartitionExportTask.h" #include "Formats/FormatFactory.h" #include @@ -176,90 +175,45 @@ void ExportPartitionTaskScheduler::run() auto context = ExportPartitionUtils::getContextCopyWithTaskSettings(storage.getContext(), manifest); - /// todo arthur this code path does not perform all the validations a simple part export does because we are not calling exportPartToTable directly. - /// the schema and everything else has been validated when the export partition task was created, but nothing prevents the destination table from being - /// recreated with a new schema before the export task is scheduled. - if (manifest.lock_inside_the_task) + try { - LOG_INFO(storage.log, "ExportPartition scheduler task: Locking part export inside the task"); - std::lock_guard part_export_lock(storage.export_manifests_mutex); + LOG_INFO(storage.log, "ExportPartition scheduler task: Exporting part to table"); - MergeTreePartExportManifest part_export_manifest( - destination_storage, - part, + LOG_INFO(storage.log, "ExportPartition scheduler task: Attempting to lock part: {}", zk_part_name); + + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperCreate); + if (Coordination::Error::ZOK != zk->tryCreate(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / zk_part_name, storage.replica_name, zkutil::CreateMode::Ephemeral)) + { + LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to lock part {}, skipping", zk_part_name); + continue; + } + + LOG_INFO(storage.log, "ExportPartition scheduler task: Locked part: {}", zk_part_name); + + storage.exportPartToTable( + part->name, + destination_storage_id, manifest.transaction_id, - manifest.query_id, - context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value, - context->getSettingsCopy(), - storage.getInMemoryMetadataPtr(), + context, manifest.iceberg_metadata_json, + /*allow_outdated_parts*/ true, [this, key, zk_part_name, manifest, destination_storage] (MergeTreePartExportManifest::CompletionCallbackResult result) { handlePartExportCompletion(key, zk_part_name, manifest, destination_storage, result); }); - part_export_manifest.task = std::make_shared(storage, key, part_export_manifest); - - /// todo arthur this might conflict with the standalone export part. what to do in this case? - if (!storage.export_manifests.emplace(part_export_manifest).second) - { - LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} is already being exported, skipping", zk_part_name); - continue; - } - - if (!storage.background_moves_assignee.scheduleMoveTask(part_export_manifest.task)) - { - storage.export_manifests.erase(part_export_manifest); - LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to schedule export part task, skipping"); - return; - } - scheduled_exports_count++; } - else + catch (const Exception &) { - try - { - LOG_INFO(storage.log, "ExportPartition scheduler task: Exporting part to table"); - - LOG_INFO(storage.log, "ExportPartition scheduler task: Attempting to lock part: {}", zk_part_name); - - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperCreate); - if (Coordination::Error::ZOK != zk->tryCreate(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / zk_part_name, storage.replica_name, zkutil::CreateMode::Ephemeral)) - { - LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to lock part {}, skipping", zk_part_name); - continue; - } - - LOG_INFO(storage.log, "ExportPartition scheduler task: Locked part: {}", zk_part_name); - - storage.exportPartToTable( - part->name, - destination_storage_id, - manifest.transaction_id, - context, - manifest.iceberg_metadata_json, - /*allow_outdated_parts*/ true, - [this, key, zk_part_name, manifest, destination_storage] - (MergeTreePartExportManifest::CompletionCallbackResult result) - { - handlePartExportCompletion(key, zk_part_name, manifest, destination_storage, result); - }); - - scheduled_exports_count++; - } - catch (const Exception &) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRemove); - zk->tryRemove(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / zk_part_name); - /// we should not increment retry_count because the node might just be full - } + tryLogCurrentException(__PRETTY_FUNCTION__); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRemove); + zk->tryRemove(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / zk_part_name); + /// we should not increment retry_count because the node might just be full } - } } } diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 568312cb3fab..9dbc7c243560 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -1406,7 +1406,6 @@ class MergeTreeData : public IStorage, public WithMutableContext friend class IMergedBlockOutputStream; // for access to log friend struct DataPartsLock; // for access to shared_parts_list/shared_ranges_in_parts friend class ExportPartTask; - friend class ExportPartFromPartitionExportTask; bool require_part_metadata; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 38ffa27c97b1..1331fdcec6ba 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -222,7 +222,6 @@ namespace Setting extern const SettingsUInt64 export_merge_tree_part_max_rows_per_file; extern const SettingsBool export_merge_tree_part_throw_on_pending_mutations; extern const SettingsBool export_merge_tree_part_throw_on_pending_patch_parts; - extern const SettingsBool export_merge_tree_partition_lock_inside_the_task; extern const SettingsString export_merge_tree_part_filename_pattern; extern const SettingsBool write_full_path_in_iceberg_metadata; extern const SettingsBool allow_experimental_insert_into_iceberg; @@ -8264,7 +8263,6 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & manifest.parquet_parallel_encoding = query_context->getSettingsRef()[Setting::output_format_parquet_parallel_encoding]; manifest.max_bytes_per_file = query_context->getSettingsRef()[Setting::export_merge_tree_part_max_bytes_per_file]; manifest.max_rows_per_file = query_context->getSettingsRef()[Setting::export_merge_tree_part_max_rows_per_file]; - manifest.lock_inside_the_task = query_context->getSettingsRef()[Setting::export_merge_tree_partition_lock_inside_the_task]; manifest.file_already_exists_policy = query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value; manifest.filename_pattern = query_context->getSettingsRef()[Setting::export_merge_tree_part_filename_pattern].value; diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 04a9e72e5513..9a4bad70fb14 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -406,7 +406,6 @@ class StorageReplicatedMergeTree final : public MergeTreeData friend class ReplicatedMergeMutateTaskBase; friend class ExportPartitionManifestUpdatingTask; friend class ExportPartitionTaskScheduler; - friend class ExportPartFromPartitionExportTask; using MergeStrategyPicker = ReplicatedMergeTreeMergeStrategyPicker; using LogEntry = ReplicatedMergeTreeLogEntry;