diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index bd3494f75ab0..2e04912d9c70 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -215,14 +215,12 @@ object VeloxConfig extends ConfigRegistry { .bytesConf(ByteUnit.BYTE) .createWithDefaultString("32MB") - val COLUMNAR_VELOX_ASYNC_TIMEOUT = + val COLUMNAR_VELOX_ASYNC_TIMEOUT_ON_TASK_STOPPING = buildStaticConf("spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping") - .doc( - "Timeout for asynchronous execution when task is being stopped in Velox backend. " + - "It's recommended to set to a number larger than network connection timeout that the " + - "possible async tasks are relying on.") + .doc("Timeout in milliseconds when waiting for runtime-scoped async work to finish during" + + " teardown.") .timeConf(TimeUnit.MILLISECONDS) - .createWithDefault(30000) + .createWithDefault(30000L) val COLUMNAR_VELOX_SPLIT_PRELOAD_PER_DRIVER = buildConf("spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver") diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index e36bc6a1c609..ce46869e7161 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -68,9 +68,6 @@ DECLARE_bool(velox_ssd_odirect); DECLARE_bool(velox_memory_pool_capacity_transfer_across_tasks); DECLARE_int32(cache_prefetch_min_pct); -DECLARE_int32(gluten_velox_async_timeout_on_task_stopping); -DEFINE_int32(gluten_velox_async_timeout_on_task_stopping, 30000, "Async timout when task is being stopped"); - using namespace facebook; namespace gluten { @@ -146,14 +143,10 @@ void VeloxBackend::init( // Set velox_memory_use_hugepages. FLAGS_velox_memory_use_hugepages = backendConf_->get(kMemoryUseHugePages, kMemoryUseHugePagesDefault); - // Async timeout. - FLAGS_gluten_velox_async_timeout_on_task_stopping = - backendConf_->get(kVeloxAsyncTimeoutOnTaskStopping, kVeloxAsyncTimeoutOnTaskStoppingDefault); - // Set cache_prefetch_min_pct default as 0 to force all loads are prefetched in DirectBufferInput. FLAGS_cache_prefetch_min_pct = backendConf_->get(kCachePrefetchMinPct, 0); - auto hiveConf = createHiveConnectorConfig(backendConf_); + hiveConnectorConfig_ = createHiveConnectorConfig(backendConf_); // Setup and register. velox::filesystems::registerLocalFileSystem(); @@ -169,7 +162,7 @@ void VeloxBackend::init( #endif #ifdef ENABLE_ABFS velox::filesystems::registerAbfsFileSystem(); - velox::filesystems::registerAzureClientProvider(*hiveConf); + velox::filesystems::registerAzureClientProvider(*hiveConnectorConfig_); #endif #ifdef GLUTEN_ENABLE_GPU @@ -190,8 +183,13 @@ void VeloxBackend::init( } #endif + const auto spillThreadNum = backendConf_->get(kSpillThreadNum, kSpillThreadNumDefaultValue); + if (spillThreadNum > 0) { + spillExecutor_ = std::make_unique(spillThreadNum); + } + initJolFilesystem(); - initConnector(hiveConf); + initConnector(hiveConnectorConfig_); velox::dwio::common::registerFileSinks(); velox::parquet::registerParquetReaderFactory(); @@ -313,6 +311,7 @@ void VeloxBackend::initCache() { } void VeloxBackend::initConnector(const std::shared_ptr& hiveConf) { + (void)hiveConf; auto ioThreads = backendConf_->get(kVeloxIOThreads, kVeloxIOThreadsDefault); GLUTEN_CHECK( ioThreads >= 0, @@ -321,24 +320,28 @@ void VeloxBackend::initConnector(const std::shared_ptr( ioThreads, std::make_unique>()); } - velox::connector::registerConnector( - std::make_shared(kHiveConnectorId, hiveConf, ioExecutor_.get())); +} - // Register value-stream connector for runtime iterator-based inputs - auto valueStreamDynamicFilterEnabled = - backendConf_->get(kValueStreamDynamicFilterEnabled, kValueStreamDynamicFilterEnabledDefault); - velox::connector::registerConnector( - std::make_shared(kIteratorConnectorId, hiveConf, valueStreamDynamicFilterEnabled)); +std::shared_ptr VeloxBackend::createHiveConnector( + const std::string& connectorId, + folly::Executor* ioExecutor) const { + return std::make_shared(connectorId, hiveConnectorConfig_, ioExecutor); +} + +std::shared_ptr VeloxBackend::createValueStreamConnector( + const std::string& connectorId, + bool dynamicFilterEnabled) const { + return std::make_shared(connectorId, hiveConnectorConfig_, dynamicFilterEnabled); +} #ifdef GLUTEN_ENABLE_GPU - if (backendConf_->get(kCudfEnableTableScan, kCudfEnableTableScanDefault) && - backendConf_->get(kCudfEnabled, kCudfEnabledDefault)) { - facebook::velox::cudf_velox::connector::hive::CudfHiveConnectorFactory factory; - auto hiveConnector = factory.newConnector(kCudfHiveConnectorId, hiveConf, ioExecutor_.get()); - facebook::velox::connector::registerConnector(hiveConnector); - } -#endif +std::shared_ptr VeloxBackend::createCudfHiveConnector( + const std::string& connectorId, + folly::Executor* ioExecutor) const { + facebook::velox::cudf_velox::connector::hive::CudfHiveConnectorFactory factory; + return factory.newConnector(connectorId, hiveConnectorConfig_, ioExecutor); } +#endif void VeloxBackend::initUdf() { auto got = backendConf_->get(kVeloxUdfLibraryPaths, ""); @@ -378,6 +381,8 @@ void VeloxBackend::tearDown() { // Destruct IOThreadPoolExecutor will join all threads. // On threads exit, thread local variables can be constructed with referencing global variables. // So, we need to destruct IOThreadPoolExecutor and stop the threads before global variables get destructed. + executor_.reset(); + spillExecutor_.reset(); ioExecutor_.reset(); globalMemoryManager_.reset(); diff --git a/cpp/velox/compute/VeloxBackend.h b/cpp/velox/compute/VeloxBackend.h index c6fbf965cf08..9894efa7747c 100644 --- a/cpp/velox/compute/VeloxBackend.h +++ b/cpp/velox/compute/VeloxBackend.h @@ -27,6 +27,7 @@ #include "velox/common/caching/AsyncDataCache.h" #include "velox/common/config/Config.h" #include "velox/common/memory/MmapAllocator.h" +#include "velox/connectors/Connector.h" #include "jni/JniHashTable.h" #include "memory/VeloxMemoryManager.h" @@ -58,9 +59,31 @@ class VeloxBackend { } folly::Executor* executor() const { + return executor_.get(); + } + + folly::Executor* spillExecutor() const { + return spillExecutor_.get(); + } + + folly::Executor* ioExecutor() const { return ioExecutor_.get(); } + std::shared_ptr createHiveConnector( + const std::string& connectorId, + folly::Executor* ioExecutor) const; + + std::shared_ptr createValueStreamConnector( + const std::string& connectorId, + bool dynamicFilterEnabled) const; + +#ifdef GLUTEN_ENABLE_GPU + std::shared_ptr createCudfHiveConnector( + const std::string& connectorId, + folly::Executor* ioExecutor) const; +#endif + void tearDown(); private: @@ -90,8 +113,11 @@ class VeloxBackend { std::shared_ptr asyncDataCache_; std::unique_ptr ssdCacheExecutor_; + std::unique_ptr executor_; + std::unique_ptr spillExecutor_; std::unique_ptr ioExecutor_; std::shared_ptr cacheAllocator_; + std::shared_ptr hiveConnectorConfig_; std::string cachePathPrefix_; std::string cacheFilePrefix_; diff --git a/cpp/velox/compute/VeloxConnectorIds.h b/cpp/velox/compute/VeloxConnectorIds.h new file mode 100644 index 000000000000..e6082bae8bdf --- /dev/null +++ b/cpp/velox/compute/VeloxConnectorIds.h @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace gluten { + +struct VeloxConnectorIds { + std::string hive; + std::string iterator; + std::string cudfHive; + bool hiveRegistered{false}; + bool iteratorRegistered{false}; + bool cudfHiveRegistered{false}; +}; + +} // namespace gluten diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc index 627bd396b7df..f3ffab59a6a8 100644 --- a/cpp/velox/compute/VeloxPlanConverter.cc +++ b/cpp/velox/compute/VeloxPlanConverter.cc @@ -30,12 +30,20 @@ VeloxPlanConverter::VeloxPlanConverter( velox::memory::MemoryPool* veloxPool, const facebook::velox::config::ConfigBase* veloxCfg, const std::vector>& rowVectors, + VeloxConnectorIds connectorIds, const std::optional writeFilesTempPath, const std::optional writeFileName, bool validationMode) : validationMode_(validationMode), veloxCfg_(veloxCfg), - substraitVeloxPlanConverter_(veloxPool, veloxCfg, rowVectors, writeFilesTempPath, writeFileName, validationMode) { + substraitVeloxPlanConverter_( + veloxPool, + veloxCfg, + rowVectors, + std::move(connectorIds), + writeFilesTempPath, + writeFileName, + validationMode) { VELOX_USER_CHECK_NOT_NULL(veloxCfg_); } diff --git a/cpp/velox/compute/VeloxPlanConverter.h b/cpp/velox/compute/VeloxPlanConverter.h index 0b597a91f9ed..1aee2c36bd12 100644 --- a/cpp/velox/compute/VeloxPlanConverter.h +++ b/cpp/velox/compute/VeloxPlanConverter.h @@ -21,6 +21,7 @@ #include #include +#include "compute/VeloxConnectorIds.h" #include "substrait/SubstraitToVeloxPlan.h" #include "substrait/plan.pb.h" @@ -33,6 +34,7 @@ class VeloxPlanConverter { facebook::velox::memory::MemoryPool* veloxPool, const facebook::velox::config::ConfigBase* veloxCfg, const std::vector>& rowVectors, + VeloxConnectorIds connectorIds, const std::optional writeFilesTempPath = std::nullopt, const std::optional writeFileName = std::nullopt, bool validationMode = false); diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index 7c1276f49abf..f69d55fd0b89 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -20,19 +20,26 @@ #include #include +#include #include +#include +#include + +#include #include "VeloxBackend.h" #include "compute/ResultIterator.h" #include "compute/Runtime.h" #include "compute/VeloxPlanConverter.h" #include "config/VeloxConfig.h" +#include "operators/plannodes/IteratorSplit.h" #include "operators/serializer/VeloxRowToColumnarConverter.h" #include "shuffle/VeloxShuffleReader.h" #include "shuffle/VeloxShuffleWriter.h" #include "utils/ConfigExtractor.h" #include "utils/VeloxArrowUtils.h" #include "utils/VeloxWholeStageDumper.h" +#include "velox/common/process/StackTrace.h" DECLARE_bool(velox_exception_user_stacktrace_enabled); DECLARE_bool(velox_memory_use_hugepages); @@ -62,6 +69,133 @@ using namespace facebook; namespace gluten { +namespace { + +class HookedExecutor final : public folly::Executor { + public: + HookedExecutor(folly::Executor* parent, std::string name, bool debug) + : parent_(parent), name_(std::move(name)), debug_(debug) {} + + void add(folly::Func func) override { + GLUTEN_CHECK(parent_ != nullptr, "Parent executor is null."); + inFlight_.fetch_add(1, std::memory_order_relaxed); + parent_->add(wrap(std::move(func), 0)); + } + + void addWithPriority(folly::Func func, int8_t priority) override { + GLUTEN_CHECK(parent_ != nullptr, "Parent executor is null."); + inFlight_.fetch_add(1, std::memory_order_relaxed); + parent_->addWithPriority(wrap(std::move(func), priority), priority); + } + + uint8_t getNumPriorities() const override { + return parent_ == nullptr ? 1 : parent_->getNumPriorities(); + } + + bool join(std::chrono::milliseconds timeout) { + std::unique_lock lock(mutex_); + return cv_.wait_for(lock, timeout, [&] { return inFlight_.load(std::memory_order_acquire) == 0; }); + } + + void dumpOutstandingTasks() const { + if (!debug_) { + return; + } + std::lock_guard lock(taskMutex_); + if (inFlightTasks_.empty()) { + LOG(WARNING) << "Hooked executor " << name_ << " timed out with no tracked in-flight tasks."; + return; + } + for (const auto& [taskId, info] : inFlightTasks_) { + const auto elapsedMs = + std::chrono::duration_cast(std::chrono::steady_clock::now() - info.enqueueTime) + .count(); + LOG(WARNING) << "Outstanding task in hooked executor " << name_ << ": taskId=" << taskId + << ", elapsedMs=" << elapsedMs << ", priority=" << static_cast(info.priority) + << ", submitStacktrace:\n" + << info.submitStacktrace; + } + } + + private: + struct TaskInfo { + std::chrono::steady_clock::time_point enqueueTime; + int8_t priority; + std::string submitStacktrace; + }; + + folly::Func wrap(folly::Func func, int8_t priority) { + auto* self = this; + const auto taskId = nextTaskId_.fetch_add(1, std::memory_order_relaxed); + if (debug_) { + TaskInfo info{ + .enqueueTime = std::chrono::steady_clock::now(), + .priority = priority, + .submitStacktrace = velox::process::StackTrace().toString()}; + std::lock_guard lock(taskMutex_); + inFlightTasks_[taskId] = std::move(info); + } + return [func = std::move(func), self, taskId]() mutable { + auto done = folly::makeGuard([&] { + if (self->debug_) { + std::lock_guard lock(self->taskMutex_); + self->inFlightTasks_.erase(taskId); + } + if (self->inFlight_.fetch_sub(1, std::memory_order_acq_rel) == 1) { + std::lock_guard lock(self->mutex_); + self->cv_.notify_all(); + } + }); + func(); + }; + } + + folly::Executor* parent_; + std::string name_; + bool debug_; + std::atomic nextTaskId_{0}; + std::atomic inFlight_{0}; + std::mutex mutex_; + std::condition_variable cv_; + mutable std::mutex taskMutex_; + std::unordered_map inFlightTasks_; +}; + +std::unique_ptr makeHookedExecutor(folly::Executor* parent, const std::string& name, bool debug) { + if (parent == nullptr) { + return nullptr; + } + return std::make_unique(parent, name, debug); +} + +void joinHookedExecutor(std::unique_ptr& executor, std::chrono::milliseconds timeout, bool debug) { + if (executor == nullptr) { + return; + } + auto* hookedExecutor = dynamic_cast(executor.get()); + GLUTEN_CHECK(hookedExecutor != nullptr, "Expected HookedExecutor"); + if (!hookedExecutor->join(timeout)) { + LOG(WARNING) << "Timed out waiting for hooked executor to finish after " << timeout.count() << " ms."; + if (debug) { + hookedExecutor->dumpOutstandingTasks(); + } + } + executor.reset(); +} + +std::string makeScopedConnectorId(const std::string& base, uint64_t runtimeId) { + return fmt::format("{}-runtime-{}", base, runtimeId); +} + +VeloxConnectorIds makeScopedConnectorIds(uint64_t runtimeId) { + return VeloxConnectorIds{ + .hive = makeScopedConnectorId(kHiveConnectorId, runtimeId), + .iterator = makeScopedConnectorId(kIteratorConnectorId, runtimeId), + .cudfHive = makeScopedConnectorId(kCudfHiveConnectorId, runtimeId)}; +} + +} // namespace + VeloxRuntime::VeloxRuntime( const std::string& kind, VeloxMemoryManager* vmm, @@ -80,6 +214,78 @@ VeloxRuntime::VeloxRuntime( FLAGS_velox_memory_use_hugepages = veloxCfg_->get(kMemoryUseHugePages, FLAGS_velox_memory_use_hugepages); FLAGS_velox_memory_pool_capacity_transfer_across_tasks = veloxCfg_->get( kMemoryPoolCapacityTransferAcrossTasks, FLAGS_velox_memory_pool_capacity_transfer_across_tasks); + + static std::atomic runtimeId{0}; + connectorIds_ = makeScopedConnectorIds(runtimeId++); + + initializeExecutors(); + registerConnectors(); +} + +VeloxRuntime::~VeloxRuntime() { + const auto timeoutMs = + veloxCfg_->get(kVeloxAsyncTimeoutOnTaskStopping, kVeloxAsyncTimeoutOnTaskStoppingDefault); + const auto timeout = std::chrono::milliseconds(timeoutMs); + joinHookedExecutor(executor_, timeout, debugModeEnabled_); + joinHookedExecutor(spillExecutor_, timeout, debugModeEnabled_); + joinHookedExecutor(ioExecutor_, timeout, debugModeEnabled_); + unregisterConnectors(); +} + +void VeloxRuntime::initializeExecutors() { + executor_ = makeHookedExecutor(VeloxBackend::get()->executor(), kind_ + ".executor", debugModeEnabled_); + spillExecutor_ = makeHookedExecutor(VeloxBackend::get()->spillExecutor(), kind_ + ".spill", debugModeEnabled_); + ioExecutor_ = makeHookedExecutor(VeloxBackend::get()->ioExecutor(), kind_ + ".io", debugModeEnabled_); +} + +void VeloxRuntime::registerConnectors() { + auto* backend = VeloxBackend::get(); + connectorIds_.hiveRegistered = + velox::connector::registerConnector(backend->createHiveConnector(connectorIds_.hive, ioExecutor_.get())); + GLUTEN_CHECK(connectorIds_.hiveRegistered, "Failed to register scoped hive connector: " + connectorIds_.hive); + GLUTEN_CHECK( + velox::connector::hasConnector(connectorIds_.hive), + "Scoped hive connector not found after registration: " + connectorIds_.hive); + + const auto valueStreamDynamicFilterEnabled = + veloxCfg_->get(kValueStreamDynamicFilterEnabled, kValueStreamDynamicFilterEnabledDefault); + connectorIds_.iteratorRegistered = velox::connector::registerConnector( + backend->createValueStreamConnector(connectorIds_.iterator, valueStreamDynamicFilterEnabled)); + GLUTEN_CHECK( + connectorIds_.iteratorRegistered, "Failed to register scoped iterator connector: " + connectorIds_.iterator); + GLUTEN_CHECK( + velox::connector::hasConnector(connectorIds_.iterator), + "Scoped iterator connector not found after registration: " + connectorIds_.iterator); + +#ifdef GLUTEN_ENABLE_GPU + if (veloxCfg_->get(kCudfEnableTableScan, kCudfEnableTableScanDefault) && + veloxCfg_->get(kCudfEnabled, kCudfEnabledDefault)) { + connectorIds_.cudfHiveRegistered = velox::connector::registerConnector( + backend->createCudfHiveConnector(connectorIds_.cudfHive, ioExecutor_.get())); + GLUTEN_CHECK( + connectorIds_.cudfHiveRegistered, "Failed to register scoped cudf hive connector: " + connectorIds_.cudfHive); + GLUTEN_CHECK( + velox::connector::hasConnector(connectorIds_.cudfHive), + "Scoped cudf hive connector not found after registration: " + connectorIds_.cudfHive); + } +#endif +} + +void VeloxRuntime::unregisterConnectors() { +#ifdef GLUTEN_ENABLE_GPU + if (connectorIds_.cudfHiveRegistered) { + velox::connector::unregisterConnector(connectorIds_.cudfHive); + connectorIds_.cudfHiveRegistered = false; + } +#endif + if (connectorIds_.iteratorRegistered) { + velox::connector::unregisterConnector(connectorIds_.iterator); + connectorIds_.iteratorRegistered = false; + } + if (connectorIds_.hiveRegistered) { + velox::connector::unregisterConnector(connectorIds_.hive); + connectorIds_.hiveRegistered = false; + } } void VeloxRuntime::parsePlan(const uint8_t* data, int32_t size) { @@ -153,7 +359,8 @@ void VeloxRuntime::getInfoAndIds( std::string VeloxRuntime::planString(bool details, const std::unordered_map& sessionConf) { auto veloxMemoryPool = gluten::defaultLeafVeloxMemoryPool(); - VeloxPlanConverter veloxPlanConverter(veloxMemoryPool.get(), veloxCfg_.get(), {}, std::nullopt, std::nullopt, true); + VeloxPlanConverter veloxPlanConverter( + veloxMemoryPool.get(), veloxCfg_.get(), {}, connectorIds_, std::nullopt, std::nullopt, true); auto veloxPlan = veloxPlanConverter.toVeloxPlan(substraitPlan_, localFiles_); return veloxPlan->toString(details, true); } @@ -173,6 +380,7 @@ std::shared_ptr VeloxRuntime::createResultIterator( memoryManager()->getLeafMemoryPool().get(), veloxCfg_.get(), inputs, + connectorIds_, *localWriteFilesTempPath(), *localWriteFileName()); veloxPlan_ = veloxPlanConverter.toVeloxPlan(substraitPlan_, std::move(localFiles_)); @@ -194,6 +402,9 @@ std::shared_ptr VeloxRuntime::createResultIterator( scanIds, scanInfos, streamIds, + executor_.get(), + spillExecutor_.get(), + connectorIds_, spillDir, veloxCfg_, taskInfo_.has_value() ? taskInfo_.value() : SparkTaskInfo{}); diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h index 2cb75c5a124c..37f4da33439b 100644 --- a/cpp/velox/compute/VeloxRuntime.h +++ b/cpp/velox/compute/VeloxRuntime.h @@ -19,9 +19,11 @@ #include "WholeStageResultIterator.h" #include "compute/Runtime.h" +#include "compute/VeloxConnectorIds.h" #ifdef GLUTEN_ENABLE_ENHANCED_FEATURES #include "iceberg/IcebergWriter.h" #endif +#include #include "memory/VeloxMemoryManager.h" #include "operators/serializer/VeloxColumnarBatchSerializer.h" #include "operators/serializer/VeloxColumnarToRowConverter.h" @@ -42,6 +44,8 @@ class VeloxRuntime final : public Runtime { VeloxMemoryManager* vmm, const std::unordered_map& confMap); + ~VeloxRuntime() override; + void setSparkTaskInfo(SparkTaskInfo taskInfo) override { static std::atomic vtId{0}; taskInfo.vId = vtId++; @@ -120,6 +124,22 @@ class VeloxRuntime final : public Runtime { return debugModeEnabled_; } + folly::Executor* executor() const { + return executor_.get(); + } + + folly::Executor* spillExecutor() const { + return spillExecutor_.get(); + } + + folly::Executor* ioExecutor() const { + return ioExecutor_.get(); + } + + const VeloxConnectorIds& connectorIds() const { + return connectorIds_; + } + static void getInfoAndIds( const std::unordered_map>& splitInfoMap, const std::unordered_set& leafPlanNodeIds, @@ -128,9 +148,17 @@ class VeloxRuntime final : public Runtime { std::vector& streamIds); private: + void initializeExecutors(); + void registerConnectors(); + void unregisterConnectors(); + std::shared_ptr veloxPlan_; std::shared_ptr veloxCfg_; bool debugModeEnabled_{false}; + std::unique_ptr executor_; + std::unique_ptr spillExecutor_; + std::unique_ptr ioExecutor_; + VeloxConnectorIds connectorIds_; std::unordered_map> emptySchemaBatchLoopUp_; }; diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 3f85dddb83f4..efe4f995322b 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -75,6 +75,9 @@ WholeStageResultIterator::WholeStageResultIterator( const std::vector& scanNodeIds, const std::vector>& scanInfos, const std::vector& streamIds, + folly::Executor* executor, + folly::Executor* spillExecutor, + VeloxConnectorIds connectorIds, const std::string spillDir, const std::shared_ptr& veloxCfg, const SparkTaskInfo& taskInfo) @@ -84,15 +87,14 @@ WholeStageResultIterator::WholeStageResultIterator( enableCudf_(veloxCfg_->get(kCudfEnabled, kCudfEnabledDefault)), #endif taskInfo_(taskInfo), + executor_(executor), veloxPlan_(planNode), + spillExecutor_(spillExecutor), + connectorIds_(std::move(connectorIds)), scanNodeIds_(scanNodeIds), scanInfos_(scanInfos), streamIds_(streamIds) { spillStrategy_ = veloxCfg_->get(kSpillStrategy, kSpillStrategyDefaultValue); - auto spillThreadNum = veloxCfg_->get(kSpillThreadNum, kSpillThreadNumDefaultValue); - if (spillThreadNum > 0) { - spillExecutor_ = std::make_shared(spillThreadNum); - } getOrderedNodeIds(veloxPlan_, orderedNodeIds_); auto fileSystem = velox::filesystems::getFileSystem(spillDir, nullptr); @@ -159,7 +161,7 @@ WholeStageResultIterator::WholeStageResultIterator( std::unordered_map customSplitInfo{{"table_format", "hive-iceberg"}}; auto deleteFiles = icebergSplitInfo->deleteFilesVec[idx]; split = std::make_shared( - kHiveConnectorId, + connectorIds_.hive, paths[idx], format, starts[idx], @@ -173,11 +175,11 @@ WholeStageResultIterator::WholeStageResultIterator( metadataColumn, properties[idx]); } else { - auto connectorId = kHiveConnectorId; + auto connectorId = connectorIds_.hive; #ifdef GLUTEN_ENABLE_GPU if (canUseCudfConnector && enableCudf_ && veloxCfg_->get(kCudfEnableTableScan, kCudfEnableTableScanDefault)) { - connectorId = kCudfHiveConnectorId; + connectorId = connectorIds_.cudfHive; } #endif split = std::make_shared( @@ -212,14 +214,21 @@ WholeStageResultIterator::WholeStageResultIterator( std::shared_ptr WholeStageResultIterator::createNewVeloxQueryCtx() { std::unordered_map> connectorConfigs; - connectorConfigs[kHiveConnectorId] = createHiveConnectorSessionConfig(veloxCfg_); + auto hiveSessionConfig = createHiveConnectorSessionConfig(veloxCfg_); + connectorConfigs[connectorIds_.hive] = hiveSessionConfig; + connectorConfigs[connectorIds_.iterator] = hiveSessionConfig; +#ifdef GLUTEN_ENABLE_GPU + if (!connectorIds_.cudfHive.empty()) { + connectorConfigs[connectorIds_.cudfHive] = hiveSessionConfig; + } +#endif std::shared_ptr ctx = velox::core::QueryCtx::create( - nullptr, + executor_, facebook::velox::core::QueryConfig{getQueryContextConf()}, connectorConfigs, gluten::VeloxBackend::get()->getAsyncDataCache(), memoryManager_->getAggregateMemoryPool(), - spillExecutor_.get(), + spillExecutor_, fmt::format( "Gluten_Stage_{}_TID_{}_VTID_{}", std::to_string(taskInfo_.stageId), @@ -365,7 +374,7 @@ void WholeStageResultIterator::addIteratorSplits(const std::vector(kIteratorConnectorId, inputIterators[i]); + auto connectorSplit = std::make_shared(connectorIds_.iterator, inputIterators[i]); exec::Split split(folly::copy(connectorSplit), -1); task_->addSplit(streamIds_[i], std::move(split)); } diff --git a/cpp/velox/compute/WholeStageResultIterator.h b/cpp/velox/compute/WholeStageResultIterator.h index 9bb6ef8b11f7..4fcc002ffd22 100644 --- a/cpp/velox/compute/WholeStageResultIterator.h +++ b/cpp/velox/compute/WholeStageResultIterator.h @@ -16,7 +16,9 @@ */ #pragma once +#include #include "compute/Runtime.h" +#include "compute/VeloxConnectorIds.h" #include "iceberg/IcebergPlanConverter.h" #include "memory/SplitAwareColumnarBatchIterator.h" #include "memory/VeloxColumnarBatch.h" @@ -41,14 +43,22 @@ class WholeStageResultIterator : public SplitAwareColumnarBatchIterator { const std::vector& scanNodeIds, const std::vector>& scanInfos, const std::vector& streamIds, + folly::Executor* executor, + folly::Executor* spillExecutor, + VeloxConnectorIds connectorIds, const std::string spillDir, const std::shared_ptr& veloxCfg, const SparkTaskInfo& taskInfo); virtual ~WholeStageResultIterator() { - if (task_ != nullptr && task_->isRunning()) { - // calling .wait() may take no effect in single thread execution mode - task_->requestCancel().wait(); + if (task_ != nullptr) { + if (task_->isRunning()) { + // calling .wait() may take no effect in single thread execution mode + task_->requestCancel().wait(); + } + auto deletionFuture = task_->taskDeletionFuture(); + task_.reset(); + deletionFuture.wait(); } #ifdef GLUTEN_ENABLE_GPU if (enableCudf_) { @@ -126,12 +136,14 @@ class WholeStageResultIterator : public SplitAwareColumnarBatchIterator { const bool enableCudf_; #endif const SparkTaskInfo taskInfo_; + folly::Executor* executor_; std::shared_ptr task_; std::shared_ptr veloxPlan_; /// Spill. std::string spillStrategy_; - std::shared_ptr spillExecutor_ = nullptr; + folly::Executor* spillExecutor_ = nullptr; + VeloxConnectorIds connectorIds_; /// Metrics std::unique_ptr metrics_{}; diff --git a/cpp/velox/cudf/CudfPlanValidator.cc b/cpp/velox/cudf/CudfPlanValidator.cc index 346620096697..f93b5d5ded51 100644 --- a/cpp/velox/cudf/CudfPlanValidator.cc +++ b/cpp/velox/cudf/CudfPlanValidator.cc @@ -47,7 +47,7 @@ bool CudfPlanValidator::validate(const ::substrait::Plan& substraitPlan) { std::shared_ptr veloxCfg = std::make_shared( std::unordered_map{{kCudfEnabled, "true"}}); VeloxPlanConverter veloxPlanConverter( - veloxMemoryPool.get(), veloxCfg.get(), inputs, std::nullopt, std::nullopt, true); + veloxMemoryPool.get(), veloxCfg.get(), inputs, {}, std::nullopt, std::nullopt, true); auto planNode = veloxPlanConverter.toVeloxPlan(substraitPlan, localFiles); std::unordered_set emptySet; velox::core::PlanFragment planFragment{planNode, velox::core::ExecutionStrategy::kUngrouped, 1, emptySet}; diff --git a/cpp/velox/memory/GlutenDirectBufferedInput.h b/cpp/velox/memory/GlutenDirectBufferedInput.h index d5b588770ec9..9c197d4f7bbf 100644 --- a/cpp/velox/memory/GlutenDirectBufferedInput.h +++ b/cpp/velox/memory/GlutenDirectBufferedInput.h @@ -17,6 +17,8 @@ #pragma once +#include + #include "velox/dwio/common/DirectBufferedInput.h" namespace gluten { @@ -47,10 +49,33 @@ class GlutenDirectBufferedInput : public facebook::velox::dwio::common::DirectBu std::move(fileReadOps)) {} ~GlutenDirectBufferedInput() override { + const bool logTableScanPool = + pool() != nullptr && + pool()->name().find("TableScan#tspool") != std::string::npos; + if (logTableScanPool) { + const auto self = pool()->shared_from_this(); + LOG(WARNING) << "GlutenDirectBufferedInput::~GlutenDirectBufferedInput begin" + << " pool=" << pool()->name() + << " poolUseCount=" << (self.use_count() - 1) + << " requests=" << requests_.size() + << " coalescedLoads=" << coalescedLoads_.size(); + for (size_t i = 0; i < coalescedLoads_.size(); ++i) { + const auto& load = coalescedLoads_[i]; + LOG(WARNING) << " load[" << i << "]" + << " state=" << static_cast(load->state()) + << " requests=" << load->size(); + } + } requests_.clear(); // Cancel all the planned loads as soon as possible to avoid unnecessary IO. for (auto& load : coalescedLoads_) { if (load->state() == facebook::velox::cache::CoalescedLoad::State::kPlanned) { + if (logTableScanPool) { + LOG(WARNING) << "GlutenDirectBufferedInput dtor cancel planned load" + << " pool=" << pool()->name() + << " loadState=" << static_cast(load->state()) + << " requests=" << load->size(); + } load->cancel(); } } @@ -58,14 +83,33 @@ class GlutenDirectBufferedInput : public facebook::velox::dwio::common::DirectBu // when the VeloxMemoryManager used by the whole task is trying to destruct. for (auto& load : coalescedLoads_) { if (load->state() == facebook::velox::cache::CoalescedLoad::State::kLoading) { + if (logTableScanPool) { + LOG(WARNING) << "GlutenDirectBufferedInput dtor wait loading load" + << " pool=" << pool()->name() + << " loadState=" << static_cast(load->state()) + << " requests=" << load->size(); + } folly::SemiFuture waitFuture(false); if (!load->loadOrFuture(&waitFuture)) { auto& exec = folly::QueuedImmediateExecutor::instance(); std::move(waitFuture).via(&exec).wait(); } + if (logTableScanPool) { + LOG(WARNING) << "GlutenDirectBufferedInput dtor finished load wait" + << " pool=" << pool()->name() + << " loadState=" << static_cast(load->state()) + << " requests=" << load->size(); + } } } coalescedLoads_.clear(); + if (logTableScanPool) { + const auto self = pool()->shared_from_this(); + LOG(WARNING) << "GlutenDirectBufferedInput::~GlutenDirectBufferedInput end" + << " pool=" << pool()->name() + << " poolUseCount=" << (self.use_count() - 1) + << " coalescedLoads=" << coalescedLoads_.size(); + } } std::unique_ptr clone() const override { diff --git a/cpp/velox/memory/VeloxMemoryManager.cc b/cpp/velox/memory/VeloxMemoryManager.cc index d829516e0dde..f628b8cdaede 100644 --- a/cpp/velox/memory/VeloxMemoryManager.cc +++ b/cpp/velox/memory/VeloxMemoryManager.cc @@ -31,8 +31,6 @@ #include "memory/ArrowMemoryPool.h" #include "utils/Exception.h" -DECLARE_int32(gluten_velox_async_timeout_on_task_stopping); - namespace gluten { using namespace facebook; @@ -443,26 +441,9 @@ bool VeloxMemoryManager::tryDestructSafe() { } VeloxMemoryManager::~VeloxMemoryManager() { - static const uint32_t kWaitTimeoutMs = FLAGS_gluten_velox_async_timeout_on_task_stopping; // 30s by default - uint32_t accumulatedWaitMs = 0UL; - bool destructed = false; - for (int32_t tryCount = 0; accumulatedWaitMs < kWaitTimeoutMs; tryCount++) { - destructed = tryDestructSafe(); - if (destructed) { - if (tryCount > 0) { - LOG(INFO) << "All the outstanding memory resources successfully released. "; - } - break; - } - uint32_t waitMs = 50 * static_cast(pow(1.5, tryCount)); // 50ms, 75ms, 112.5ms ... - LOG(INFO) << "There are still outstanding Velox memory allocations. Waiting for " << waitMs - << " ms to let possible async tasks done... "; - usleep(waitMs * 1000); - accumulatedWaitMs += waitMs; - } + bool destructed = tryDestructSafe(); if (!destructed) { - LOG(ERROR) << "Failed to release Velox memory manager after " << accumulatedWaitMs - << "ms as there are still outstanding memory resources. "; + LOG(ERROR) << "Failed to release Velox memory manager as there are still outstanding memory resources. "; } #ifdef ENABLE_JEMALLOC_STATS malloc_stats_print(nullptr, nullptr, nullptr); diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index ec17594cc094..5477176ce85f 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -874,7 +874,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: const auto& compressionKind = writerOptions->compressionKind.value_or(common::CompressionKind::CompressionKind_SNAPPY); std::shared_ptr tableHandle = std::make_shared( - kHiveConnectorId, + connectorIds_.hive, makeHiveInsertTableHandle( tableColumnNames, /*inputType->names() clolumn name is different*/ inputType->children(), @@ -1408,7 +1408,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValueStreamNode( // Create TableHandle bool dynamicFilterEnabled = veloxCfg_->get(kValueStreamDynamicFilterEnabled, kValueStreamDynamicFilterEnabledDefault); - auto tableHandle = std::make_shared(kIteratorConnectorId, dynamicFilterEnabled); + auto tableHandle = std::make_shared(connectorIds_.iterator, dynamicFilterEnabled); // Create column assignments connector::ColumnHandleMap assignments; @@ -1573,11 +1573,11 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: connector::ConnectorTableHandlePtr tableHandle; auto remainingFilter = readRel.has_filter() ? exprConverter_->toVeloxExpr(readRel.filter(), baseSchema) : nullptr; - auto connectorId = kHiveConnectorId; + auto connectorId = connectorIds_.hive; if (useCudfTableHandle(splitInfos_) && veloxCfg_->get(kCudfEnableTableScan, kCudfEnableTableScanDefault) && veloxCfg_->get(kCudfEnabled, kCudfEnabledDefault)) { #ifdef GLUTEN_ENABLE_GPU - connectorId = kCudfHiveConnectorId; + connectorId = connectorIds_.cudfHive; #endif } common::SubfieldFilters subfieldFilters; diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h index 47bf3a0525b1..373601916d4d 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h @@ -19,6 +19,7 @@ #include "SubstraitToVeloxExpr.h" #include "TypeUtils.h" +#include "compute/VeloxConnectorIds.h" #include "velox/connectors/hive/FileProperties.h" #include "velox/connectors/hive/TableHandle.h" #include "velox/core/PlanNode.h" @@ -80,12 +81,14 @@ class SubstraitToVeloxPlanConverter { memory::MemoryPool* pool, const facebook::velox::config::ConfigBase* veloxCfg, const std::vector>& inputIters, + VeloxConnectorIds connectorIds, const std::optional writeFilesTempPath = std::nullopt, const std::optional writeFileName = std::nullopt, bool validationMode = false) : pool_(pool), veloxCfg_(veloxCfg), inputIters_(inputIters), + connectorIds_(std::move(connectorIds)), writeFilesTempPath_(writeFilesTempPath), writeFileName_(writeFileName), validationMode_(validationMode) { @@ -308,6 +311,8 @@ class SubstraitToVeloxPlanConverter { /// Input row-vectors for query trace mode (ValuesNode / cuDF ValueStream support) std::vector> inputIters_; + VeloxConnectorIds connectorIds_; + /// The temporary path used to write files. std::optional writeFilesTempPath_; std::optional writeFileName_; diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h index 6bfca5ec36b8..55d2d2c96843 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h @@ -19,6 +19,8 @@ #include #include "SubstraitToVeloxPlan.h" +#include "config/VeloxConfig.h" +#include "operators/plannodes/IteratorSplit.h" #include "velox/core/QueryCtx.h" using namespace facebook; @@ -34,7 +36,13 @@ class SubstraitToVeloxPlanValidator { {velox::core::QueryConfig::kSparkPartitionId, "0"}, {velox::core::QueryConfig::kSessionTimezone, "UTC"}}; veloxCfg_ = std::make_shared(std::move(configs)); planConverter_ = std::make_unique( - pool, veloxCfg_.get(), std::vector>{}, std::nullopt, std::nullopt, true); + pool, + veloxCfg_.get(), + std::vector>{}, + VeloxConnectorIds{.hive = kHiveConnectorId, .iterator = kIteratorConnectorId, .cudfHive = kCudfHiveConnectorId}, + std::nullopt, + std::nullopt, + true); queryCtx_ = velox::core::QueryCtx::create(nullptr, velox::core::QueryConfig(veloxCfg_->rawConfigs())); // An execution context used for function validation. execCtx_ = std::make_unique(pool, queryCtx_.get()); diff --git a/cpp/velox/tests/FunctionTest.cc b/cpp/velox/tests/FunctionTest.cc index 2eb13402cb46..74be36ee0f54 100644 --- a/cpp/velox/tests/FunctionTest.cc +++ b/cpp/velox/tests/FunctionTest.cc @@ -47,7 +47,8 @@ class FunctionTest : public ::testing::Test, public test::VectorTestBase { std::make_shared( pool(), veloxCfg_.get(), - std::vector>()); + std::vector>{}, + VeloxConnectorIds{}); }; TEST_F(FunctionTest, makeNames) { diff --git a/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc b/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc index 8222f74caae6..a1d1cea02b77 100644 --- a/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc +++ b/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc @@ -71,8 +71,11 @@ class Substrait2VeloxPlanConversionTest : public exec::test::HiveConnectorTestBa std::shared_ptr tmpDir_{exec::test::TempDirectoryPath::create()}; std::shared_ptr veloxCfg_ = std::make_shared(std::unordered_map()); - std::shared_ptr planConverter_ = - std::make_shared(pool(), veloxCfg_.get(), std::vector>()); + std::shared_ptr planConverter_ = std::make_shared( + pool(), + veloxCfg_.get(), + std::vector>{}, + VeloxConnectorIds{.hive = facebook::velox::exec::test::kHiveConnectorId}); }; // This test will firstly generate mock TPC-H lineitem ORC file. Then, Velox's diff --git a/cpp/velox/tests/Substrait2VeloxValuesNodeConversionTest.cc b/cpp/velox/tests/Substrait2VeloxValuesNodeConversionTest.cc index f903481ee972..38d0ebcef7bf 100644 --- a/cpp/velox/tests/Substrait2VeloxValuesNodeConversionTest.cc +++ b/cpp/velox/tests/Substrait2VeloxValuesNodeConversionTest.cc @@ -43,7 +43,13 @@ TEST_F(Substrait2VeloxValuesNodeConversionTest, valuesNode) { JsonToProtoConverter::readFromFile(planPath, substraitPlan); auto veloxCfg = std::make_shared(std::unordered_map()); std::shared_ptr planConverter_ = std::make_shared( - pool_.get(), veloxCfg.get(), std::vector>(), std::nullopt, std::nullopt, true); + pool_.get(), + veloxCfg.get(), + std::vector>{}, + VeloxConnectorIds{}, + std::nullopt, + std::nullopt, + true); auto veloxPlan = planConverter_->toVeloxPlan(substraitPlan); RowVectorPtr expectedData = makeRowVector( diff --git a/cpp/velox/tests/VeloxSubstraitRoundTripTest.cc b/cpp/velox/tests/VeloxSubstraitRoundTripTest.cc index f8b4b04ca243..61add0d7bd32 100644 --- a/cpp/velox/tests/VeloxSubstraitRoundTripTest.cc +++ b/cpp/velox/tests/VeloxSubstraitRoundTripTest.cc @@ -75,7 +75,8 @@ class VeloxSubstraitRoundTripTest : public OperatorTestBase { std::make_shared( pool_.get(), veloxCfg.get(), - std::vector>(), + std::vector>{}, + VeloxConnectorIds{}, std::nullopt, std::nullopt, true); @@ -102,7 +103,8 @@ class VeloxSubstraitRoundTripTest : public OperatorTestBase { std::make_shared( pool_.get(), veloxCfg.get(), - std::vector>(), + std::vector>{}, + VeloxConnectorIds{}, std::nullopt, std::nullopt, true); diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md index 767875bb167e..ec929a3f0d96 100644 --- a/docs/velox-configuration.md +++ b/docs/velox-configuration.md @@ -15,7 +15,7 @@ nav_order: 16 | spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver | 2 | The split preload per task | | spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct | 90 | If partial aggregation aggregationPct greater than this value, partial aggregation may be early abandoned. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. | | spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows | 100000 | If partial aggregation input rows number greater than this value, partial aggregation may be early abandoned. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. | -| spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping | 30000ms | Timeout for asynchronous execution when task is being stopped in Velox backend. It's recommended to set to a number larger than network connection timeout that the possible async tasks are relying on. | +| spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping | 30000ms | Timeout in milliseconds when waiting for runtime-scoped async work to finish during teardown. | | spark.gluten.sql.columnar.backend.velox.cacheEnabled | false | Enable Velox cache, default off. It's recommended to enablesoft-affinity as well when enable velox cache. | | spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct | 0 | Set prefetch cache min pct for velox file scan | | spark.gluten.sql.columnar.backend.velox.checkUsageLeak | true | Enable check memory usage leak. | diff --git a/ep/build-velox/src/get-velox.sh b/ep/build-velox/src/get-velox.sh index ca14bc988059..8bc27a115224 100755 --- a/ep/build-velox/src/get-velox.sh +++ b/ep/build-velox/src/get-velox.sh @@ -140,6 +140,21 @@ function apply_provided_velox_patch { fi } +function apply_local_velox_patches { + local patch_file="${CURRENT_DIR}/velox-memory-usecount.patch" + if [[ ! -f "$patch_file" ]]; then + echo "Local Velox patch not found: $patch_file" + exit 1 + fi + + pushd "$VELOX_HOME" + (git apply --check "$patch_file" && git apply "$patch_file") || { + echo "Failed to apply local Velox patch: $patch_file" + exit 1 + } + popd +} + function apply_compilation_fixes { sudo cp ${CURRENT_DIR}/modify_arrow.patch ${VELOX_HOME}/CMake/resolve_dependency_modules/arrow/ sudo cp ${CURRENT_DIR}/modify_arrow_dataset_scan_option.patch ${VELOX_HOME}/CMake/resolve_dependency_modules/arrow/ @@ -227,6 +242,7 @@ if [[ "$RUN_SETUP_SCRIPT" == "ON" ]]; then fi apply_provided_velox_patch +apply_local_velox_patches apply_compilation_fixes diff --git a/ep/build-velox/src/velox-memory-usecount.patch b/ep/build-velox/src/velox-memory-usecount.patch new file mode 100644 index 000000000000..aa41583bc082 --- /dev/null +++ b/ep/build-velox/src/velox-memory-usecount.patch @@ -0,0 +1,548 @@ +diff --git a/velox/common/caching/AsyncDataCache.cpp b/velox/common/caching/AsyncDataCache.cpp +index 0a3331b88..8701f4431 100644 +--- a/velox/common/caching/AsyncDataCache.cpp ++++ b/velox/common/caching/AsyncDataCache.cpp +@@ -38,6 +38,29 @@ namespace facebook::velox::cache { + using memory::MachinePageCount; + using memory::MemoryAllocator; + ++namespace { ++ ++void logCoalescedLoadIfTableScan( ++ const char* where, ++ const CoalescedLoad* load, ++ std::string_view extra = "") { ++ const auto* pool = load->memoryPool(); ++ if (pool == nullptr || ++ pool->name().find("TableScan#tspool") == std::string::npos) { ++ return; ++ } ++ const auto self = pool->shared_from_this(); ++ LOG(WARNING) << where << " load=" << static_cast(load) ++ << " pool=" << pool->name() ++ << " poolUseCount=" << (self.use_count() - 1) ++ << " state=" << static_cast(load->state()) ++ << " size=" << load->size() ++ << (extra.empty() ? "" : " ") ++ << extra; ++} ++ ++} // namespace ++ + AsyncDataCacheEntry::AsyncDataCacheEntry(CacheShard* shard) : shard_(shard) { + accessStats_.reset(); + } +@@ -314,33 +337,49 @@ CachePin CacheShard::initEntry( + } + + CoalescedLoad::~CoalescedLoad() { ++ logCoalescedLoadIfTableScan("CoalescedLoad::~CoalescedLoad begin", this); + // Continue possibly waiting threads. + setEndState(State::kCancelled); ++ logCoalescedLoadIfTableScan("CoalescedLoad::~CoalescedLoad end", this); + } + + bool CoalescedLoad::loadOrFuture( + folly::SemiFuture* wait, + bool ssdSavable) { ++ logCoalescedLoadIfTableScan( ++ "CoalescedLoad::loadOrFuture enter", ++ this, ++ wait == nullptr ? "wait=null" : "wait=non-null"); + { + common::testutil::TestValue::adjust( + "facebook::velox::cache::CoalescedLoad::loadOrFuture", this); + std::lock_guard l(mutex_); + if (state_ == State::kCancelled || state_ == State::kLoaded) { ++ logCoalescedLoadIfTableScan( ++ "CoalescedLoad::loadOrFuture already-finished", this); + return true; + } + if (state_ == State::kLoading) { + if (wait == nullptr) { ++ logCoalescedLoadIfTableScan( ++ "CoalescedLoad::loadOrFuture already-loading", this, "wait=null"); + return false; + } + if (promise_ == nullptr) { + promise_ = std::make_unique>(); + } + *wait = promise_->getSemiFuture(); ++ logCoalescedLoadIfTableScan( ++ "CoalescedLoad::loadOrFuture already-loading", ++ this, ++ "wait=attached"); + return false; + } + + VELOX_CHECK_EQ(State::kPlanned, state_); + state_ = State::kLoading; ++ logCoalescedLoadIfTableScan( ++ "CoalescedLoad::loadOrFuture transition-to-loading", this); + } + + // Outside of 'mutex_'. +@@ -355,9 +394,12 @@ bool CoalescedLoad::loadOrFuture( + entry->setExclusiveToShared(ssdSavable); + } + setEndState(State::kLoaded); ++ logCoalescedLoadIfTableScan("CoalescedLoad::loadOrFuture loaded", this); + } catch (std::exception&) { + try { + setEndState(State::kCancelled); ++ logCoalescedLoadIfTableScan( ++ "CoalescedLoad::loadOrFuture cancelled", this); + } catch (std::exception&) { + // May not throw from inside catch. + } +diff --git a/velox/common/caching/AsyncDataCache.h b/velox/common/caching/AsyncDataCache.h +index 8d61e2a0d..6ace12efb 100644 +--- a/velox/common/caching/AsyncDataCache.h ++++ b/velox/common/caching/AsyncDataCache.h +@@ -471,6 +471,10 @@ class CoalescedLoad { + /// storage. + virtual bool isSsdLoad() const = 0; + ++ virtual memory::MemoryPool* memoryPool() const { ++ return nullptr; ++ } ++ + protected: + // Makes entries for 'keys_' and loads their content. Elements of 'keys_' that + // are already loaded or loading are expected to be left out. The returned +diff --git a/velox/common/memory/Memory.cpp b/velox/common/memory/Memory.cpp +index 7c1f9d8dc..ee99ff4ca 100644 +--- a/velox/common/memory/Memory.cpp ++++ b/velox/common/memory/Memory.cpp +@@ -400,7 +400,6 @@ std::string MemoryManager::toString(bool detail) const { + } else { + out << "\t" << pool->name() << "\n"; + } +- out << "\trefcount " << pool.use_count() << "\n"; + } + out << allocator_->toString() << "\n"; + out << arbitrator_->toString(); +diff --git a/velox/common/memory/MemoryPool.cpp b/velox/common/memory/MemoryPool.cpp +index 66ba6804c..9e9bef2b9 100644 +--- a/velox/common/memory/MemoryPool.cpp ++++ b/velox/common/memory/MemoryPool.cpp +@@ -70,6 +70,7 @@ struct MemoryUsage { + uint64_t currentUsage; + uint64_t reservedUsage; + uint64_t peakUsage; ++ int64_t useCount; + + bool operator>(const MemoryUsage& other) const { + return std::tie(reservedUsage, currentUsage, peakUsage, name) > +@@ -82,11 +83,12 @@ struct MemoryUsage { + + std::string toString() const { + return fmt::format( +- "{} usage {} reserved {} peak {}", ++ "{} usage {} reserved {} peak {} useCount {}", + name, + succinctBytes(currentUsage), + succinctBytes(reservedUsage), +- succinctBytes(peakUsage)); ++ succinctBytes(peakUsage), ++ useCount); + } + }; + +@@ -124,11 +126,14 @@ void treeMemoryUsageVisitor( + if (stats.empty() && skipEmptyPool) { + return; + } ++ const auto self = pool->shared_from_this(); ++ const auto owners = self.use_count() - 1; + const MemoryUsage usage{ + .name = pool->name(), + .currentUsage = stats.usedBytes, + .reservedUsage = stats.reservedBytes, + .peakUsage = stats.peakBytes, ++ .useCount = owners, + }; + out << std::string(indent, ' ') << usage.toString() << "\n"; + +@@ -1042,11 +1047,14 @@ std::string MemoryPoolImpl::treeMemoryUsage(bool skipEmptyPool) const { + { + std::lock_guard l(mutex_); + const Stats stats = statsLocked(); ++ const auto self = shared_from_this(); ++ const auto owners = self.use_count() - 1; + const MemoryUsage usage{ + .name = name(), + .currentUsage = stats.usedBytes, + .reservedUsage = stats.reservedBytes, +- .peakUsage = stats.peakBytes}; ++ .peakUsage = stats.peakBytes, ++ .useCount = owners}; + out << usage.toString() << "\n"; + } + +diff --git a/velox/dwio/common/DirectBufferedInput.cpp b/velox/dwio/common/DirectBufferedInput.cpp +index 45d148c32..6bd5f7e9a 100644 +--- a/velox/dwio/common/DirectBufferedInput.cpp ++++ b/velox/dwio/common/DirectBufferedInput.cpp +@@ -31,6 +31,29 @@ using cache::CoalescedLoad; + using cache::ScanTracker; + using cache::TrackingId; + ++namespace { ++ ++bool shouldLogTableScanPool(const memory::MemoryPool* pool) { ++ return pool != nullptr && ++ pool->name().find("TableScan#tspool") != std::string::npos; ++} ++ ++void logTableScanPoolState( ++ const char* where, ++ memory::MemoryPool* pool, ++ std::string_view extra = "") { ++ if (!shouldLogTableScanPool(pool)) { ++ return; ++ } ++ const auto self = pool->shared_from_this(); ++ LOG(WARNING) << where << " pool=" << pool->name() ++ << " poolUseCount=" << (self.use_count() - 1) ++ << (extra.empty() ? "" : " ") ++ << extra; ++} ++ ++} // namespace ++ + std::unique_ptr DirectBufferedInput::enqueue( + Region region, + const StreamIdentifier* sid = nullptr) { +@@ -208,6 +231,19 @@ void DirectBufferedInput::readRegion( + }); + } + ++DirectCoalescedLoad::~DirectCoalescedLoad() { ++ if (!shouldLogTableScanPool(pool_)) { ++ return; ++ } ++ const auto self = pool_->shared_from_this(); ++ LOG(WARNING) << "DirectCoalescedLoad::~DirectCoalescedLoad" ++ << " load=" << static_cast(this) ++ << " pool=" << pool_->name() ++ << " poolUseCount=" << (self.use_count() - 1) ++ << " state=" << static_cast(state()) ++ << " requests=" << size(); ++} ++ + void DirectBufferedInput::readRegions( + const std::vector& requests, + bool prefetch, +@@ -227,10 +263,34 @@ void DirectBufferedInput::readRegions( + if (load->state() == CoalescedLoad::State::kPlanned) { + AsyncLoadHolder loadHolder{ + .load = load, .pool = pool_->shared_from_this()}; ++ if (shouldLogTableScanPool(pool_)) { ++ LOG(WARNING) << "DirectBufferedInput::readRegions schedule async load" ++ << " pool=" << pool_->name() ++ << " poolUseCount=" << (loadHolder.pool.use_count() - 1) ++ << " loadIndex=" << i ++ << " loadState=" << static_cast(load->state()) ++ << " requests=" << load->size(); ++ } + executor_->add([asyncLoad = std::move(loadHolder)]() { + process::TraceContext trace("Read Ahead"); + VELOX_CHECK_NOT_NULL(asyncLoad.load); ++ if (shouldLogTableScanPool(asyncLoad.pool.get())) { ++ LOG(WARNING) ++ << "DirectBufferedInput async load begin" ++ << " pool=" << asyncLoad.pool->name() ++ << " poolUseCount=" << (asyncLoad.pool.use_count() - 1) ++ << " loadState=" << static_cast(asyncLoad.load->state()) ++ << " requests=" << asyncLoad.load->size(); ++ } + asyncLoad.load->loadOrFuture(nullptr); ++ if (shouldLogTableScanPool(asyncLoad.pool.get())) { ++ LOG(WARNING) ++ << "DirectBufferedInput async load end" ++ << " pool=" << asyncLoad.pool->name() ++ << " poolUseCount=" << (asyncLoad.pool.use_count() - 1) ++ << " loadState=" << static_cast(asyncLoad.load->state()) ++ << " requests=" << asyncLoad.load->size(); ++ } + }); + } + } +@@ -252,13 +312,27 @@ std::shared_ptr DirectBufferedInput::coalescedLoad( + } + + void DirectBufferedInput::reset() { ++ if (shouldLogTableScanPool(pool_)) { ++ LOG(WARNING) << "DirectBufferedInput::reset begin" ++ << " pool=" << pool_->name() ++ << " coalescedLoads=" << coalescedLoads_.size() ++ << " streamMap=" << streamToCoalescedLoad_.rlock()->size() ++ << " requests=" << requests_.size(); ++ } + BufferedInput::reset(); + for (auto& load : coalescedLoads_) { ++ if (shouldLogTableScanPool(pool_)) { ++ LOG(WARNING) << "DirectBufferedInput::reset cancel load" ++ << " pool=" << pool_->name() ++ << " loadState=" << static_cast(load->state()) ++ << " requests=" << load->size(); ++ } + load->cancel(); + } + coalescedLoads_.clear(); + streamToCoalescedLoad_.wlock()->clear(); + requests_.clear(); ++ logTableScanPoolState("DirectBufferedInput::reset end", pool_); + } + + namespace { +diff --git a/velox/dwio/common/DirectBufferedInput.h b/velox/dwio/common/DirectBufferedInput.h +index c23e104e9..8fed5a147 100644 +--- a/velox/dwio/common/DirectBufferedInput.h ++++ b/velox/dwio/common/DirectBufferedInput.h +@@ -92,6 +92,10 @@ class DirectCoalescedLoad : public cache::CoalescedLoad { + return false; + } + ++ memory::MemoryPool* memoryPool() const override { ++ return pool_; ++ } ++ + /// Returns the buffer for 'region' in either 'data' or 'tinyData'. 'region' + /// must match a region given to DirectBufferedInput::enqueue(). + int32_t +@@ -109,6 +113,8 @@ class DirectCoalescedLoad : public cache::CoalescedLoad { + return size; + } + ++ ~DirectCoalescedLoad() override; ++ + private: + const std::shared_ptr ioStatistics_; + const std::shared_ptr ioStats_; +diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp +index 972fa31fb..eb03a4e1d 100644 +--- a/velox/exec/Operator.cpp ++++ b/velox/exec/Operator.cpp +@@ -39,6 +39,21 @@ OperatorCtx::OperatorCtx( + operatorType_(operatorType), + pool_(driverCtx_->addOperatorPool(planNodeId, operatorType_)) {} + ++OperatorCtx::~OperatorCtx() { ++ if (operatorType_ != OperatorType::kTableScan) { ++ return; ++ } ++ const auto self = pool_->shared_from_this(); ++ LOG(WARNING) << "OperatorCtx::~OperatorCtx" ++ << " taskId=" << taskId() ++ << " planNodeId=" << planNodeId_ ++ << " operatorId=" << operatorId_ ++ << " operatorType=" << operatorType_ ++ << " pool=" << pool_->name() ++ << " poolUseCount=" << (self.use_count() - 1) ++ << " execCtx=" << (execCtx_ == nullptr ? "null" : "non-null"); ++} ++ + core::ExecCtx* OperatorCtx::execCtx() const { + if (!execCtx_) { + execCtx_ = std::make_unique( +diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h +index 7349e7b7e..153520633 100644 +--- a/velox/exec/Operator.h ++++ b/velox/exec/Operator.h +@@ -46,6 +46,7 @@ class OperatorCtx { + const core::PlanNodeId& planNodeId, + int32_t operatorId, + std::string_view operatorType = ""); ++ ~OperatorCtx(); + + const std::shared_ptr& task() const { + return driverCtx_->task; +diff --git a/velox/exec/TableScan.cpp b/velox/exec/TableScan.cpp +index 41912fac6..6e308d00c 100644 +--- a/velox/exec/TableScan.cpp ++++ b/velox/exec/TableScan.cpp +@@ -533,7 +533,31 @@ int32_t TableScan::calculateBatchSize(int64_t currentEstimatedRowSize) { + return batchSize; + } + ++TableScan::~TableScan() { ++ LOG(WARNING) << "TableScan::~TableScan" ++ << " operatorPool=" << pool()->name() ++ << " connectorPool=" ++ << (connectorPool_ == nullptr ? std::string("") ++ : connectorPool_->name()) ++ << " dataSource=" << (dataSource_ == nullptr ? "null" : "non-null") ++ << " connectorQueryCtx=" ++ << (connectorQueryCtx_ == nullptr ? "null" : "non-null") ++ << " blockingFuture=" ++ << (blockingFuture_.valid() ? "valid" : "empty"); ++} ++ + void TableScan::close() { ++ LOG(WARNING) << "TableScan::close" ++ << " operatorPool=" << pool()->name() ++ << " connectorPool=" << (connectorPool_ == nullptr ? std::string("") : connectorPool_->name()) ++ << " dataSource=" << (dataSource_ == nullptr ? "null" : "non-null") ++ << " connectorQueryCtx=" << (connectorQueryCtx_ == nullptr ? "null" : "non-null") ++ << " blockingFuture=" << (blockingFuture_.valid() ? "valid" : "empty") ++ << " needNewSplit=" << needNewSplit_ ++ << " noMoreSplits=" << noMoreSplits_ ++ << " maxPreloadedSplits=" << maxPreloadedSplits_ ++ << " numPreloadedSplits=" << numPreloadedSplits_ ++ << " numReadyPreloadedSplits=" << numReadyPreloadedSplits_; + Operator::close(); + + if (dataSource_ != nullptr) { +diff --git a/velox/exec/TableScan.h b/velox/exec/TableScan.h +index e35732114..267a495b3 100644 +--- a/velox/exec/TableScan.h ++++ b/velox/exec/TableScan.h +@@ -29,6 +29,7 @@ class TableScan : public SourceOperator { + int32_t operatorId, + DriverCtx* driverCtx, + const std::shared_ptr& tableScanNode); ++ ~TableScan() override; + + void initialize() override; + +diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp +index fce8c7eb2..981521f00 100644 +--- a/velox/exec/Task.cpp ++++ b/velox/exec/Task.cpp +@@ -23,6 +23,7 @@ + #include "velox/common/base/Counters.h" + #include "velox/common/base/StatsReporter.h" + #include "velox/common/file/FileSystems.h" ++#include "velox/common/process/StackTrace.h" + #include "velox/common/testutil/TestValue.h" + #include "velox/common/time/Timer.h" + #include "velox/exec/Exchange.h" +@@ -187,6 +188,16 @@ std::string makeUuid() { + return boost::lexical_cast(boost::uuids::random_generator()()); + } + ++uint64_t nextTableScanPoolDebugId() { ++ static std::atomic id{0}; ++ return id++; ++} ++ ++uint64_t nextTableScanConnectorPoolDebugId() { ++ static std::atomic id{0}; ++ return id++; ++} ++ + // Returns true if an operator is a hash join operator given 'operatorType'. + bool isHashJoinOperator(const std::string& operatorType) { + return (operatorType == OperatorType::kHashBuild) || +@@ -467,8 +478,39 @@ Task::~Task() { + CLEAR(exchangeClientByPlanNode_.clear()); + CLEAR(exchangeClients_.clear()); + CLEAR(exception_ = nullptr); ++ std::vector>> ++ debugChildPools; ++ if (!childPools_.empty()) { ++ LOG(WARNING) << "Task::~Task(" << taskId_ << ") childPools_ before clear: " ++ << childPools_.size(); ++ for (const auto& childPool : childPools_) { ++ if (childPool == nullptr) { ++ LOG(WARNING) << " childPool="; ++ continue; ++ } ++ debugChildPools.emplace_back(childPool->name(), childPool); ++ LOG(WARNING) << " childPool=" << childPool->name() ++ << " useCount=" << (childPool.use_count() - 1); ++ } ++ } + CLEAR(nodePools_.clear()); ++ if (!debugChildPools.empty()) { ++ LOG(WARNING) << "Task::~Task(" << taskId_ ++ << ") childPools_ after nodePools_.clear():"; ++ for (const auto& [name, childPool] : debugChildPools) { ++ LOG(WARNING) << " childPool=" << name ++ << " useCount=" << childPool.use_count(); ++ } ++ } + CLEAR(childPools_.clear()); ++ if (!debugChildPools.empty()) { ++ LOG(WARNING) << "Task::~Task(" << taskId_ ++ << ") childPools_ after childPools_.clear():"; ++ for (const auto& [name, childPool] : debugChildPools) { ++ LOG(WARNING) << " childPool=" << name ++ << " useCount=" << childPool.use_count(); ++ } ++ } + CLEAR(pool_.reset()); + CLEAR(planFragment_ = core::PlanFragment()); + CLEAR(queryCtx_.reset()); +@@ -769,9 +811,26 @@ velox::memory::MemoryPool* Task::addOperatorPool( + } else { + nodePool = getOrAddNodePool(planNodeId); + } +- childPools_.push_back(nodePool->addLeafChild( +- fmt::format( +- "op.{}.{}.{}.{}", planNodeId, pipelineId, driverId, operatorType))); ++ auto poolName = ++ fmt::format("op.{}.{}.{}.{}", planNodeId, pipelineId, driverId, operatorType); ++ if (operatorType == OperatorType::kTableScan) { ++ const auto poolDebugId = nextTableScanPoolDebugId(); ++ poolName = fmt::format("{}#tspool.{}", poolName, poolDebugId); ++ LOG(WARNING) << "Created TableScan operator pool" ++ << " poolDebugId=tspool." << poolDebugId ++ << " taskId=" << taskId_ ++ << " planNodeId=" << planNodeId ++ << " splitGroupId=" << splitGroupId ++ << " pipelineId=" << pipelineId ++ << " driverId=" << driverId ++ << " operatorType=" << operatorType ++ << " nodePool=" << nodePool->name() ++ << "\nTask plan:\n" ++ << (planFragment_.planNode ? planFragment_.planNode->toString(true, true) : "") ++ << "\nCreation stacktrace:\n" ++ << velox::process::StackTrace().toString(); ++ } ++ childPools_.push_back(nodePool->addLeafChild(poolName)); + return childPools_.back().get(); + } + +@@ -782,14 +841,31 @@ velox::memory::MemoryPool* Task::addConnectorPoolLocked( + const std::string& operatorType, + const std::string& connectorId) { + auto* nodePool = getOrAddNodePool(planNodeId); +- childPools_.push_back(nodePool->addAggregateChild( +- fmt::format( +- "op.{}.{}.{}.{}.{}", +- planNodeId, +- pipelineId, +- driverId, +- operatorType, +- connectorId))); ++ auto poolName = fmt::format( ++ "op.{}.{}.{}.{}.{}", ++ planNodeId, ++ pipelineId, ++ driverId, ++ operatorType, ++ connectorId); ++ if (operatorType == OperatorType::kTableScan) { ++ const auto connectorPoolDebugId = nextTableScanConnectorPoolDebugId(); ++ poolName = fmt::format("{}#tsconnector.{}", poolName, connectorPoolDebugId); ++ LOG(WARNING) << "Created TableScan connector pool" ++ << " connectorPoolDebugId=tsconnector." << connectorPoolDebugId ++ << " taskId=" << taskId_ ++ << " planNodeId=" << planNodeId ++ << " pipelineId=" << pipelineId ++ << " driverId=" << driverId ++ << " operatorType=" << operatorType ++ << " connectorId=" << connectorId ++ << " nodePool=" << nodePool->name() ++ << "\nTask plan:\n" ++ << (planFragment_.planNode ? planFragment_.planNode->toString(true, true) : "") ++ << "\nCreation stacktrace:\n" ++ << velox::process::StackTrace().toString(); ++ } ++ childPools_.push_back(nodePool->addAggregateChild(poolName)); + return childPools_.back().get(); + } + diff --git a/tools/gluten-it/sbin/gluten-it.sh b/tools/gluten-it/sbin/gluten-it.sh index 6acfd420b80b..26a495f50d62 100755 --- a/tools/gluten-it/sbin/gluten-it.sh +++ b/tools/gluten-it/sbin/gluten-it.sh @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -set -euf +set -euo pipefail BASEDIR=$(dirname $0) @@ -24,9 +24,10 @@ if [[ ! -d $LIB_DIR ]]; then exit 1 fi -JAR_PATH=$LIB_DIR/* +JAR_PATH=$(printf ':%s' "$LIB_DIR"/*.jar) +JAR_PATH=${JAR_PATH#:} -SPARK_JVM_OPTIONS=$($JAVA_HOME/bin/java -cp $JAR_PATH org.apache.gluten.integration.SparkJvmOptions) +SPARK_JVM_OPTIONS=$($JAVA_HOME/bin/java -cp "$JAR_PATH" org.apache.gluten.integration.SparkJvmOptions) EMBEDDED_SPARK_HOME=$BASEDIR/../spark-home @@ -43,10 +44,19 @@ echo "SPARK_SCALA_VERSION set at [$SPARK_SCALA_VERSION]." GLUTEN_IT_JVM_ARGS=${GLUTEN_IT_JVM_ARGS:-"-Xmx2G"} -$JAVA_HOME/bin/java \ - $SPARK_JVM_OPTIONS \ - $GLUTEN_IT_JVM_ARGS \ - -XX:ErrorFile=/var/log/java/hs_err_pid%p.log \ - -Dio.netty.tryReflectionSetAccessible=true \ - -cp $JAR_PATH \ - org.apache.gluten.integration.Cli "$@" +run_gluten_it() { + $JAVA_HOME/bin/java \ + $SPARK_JVM_OPTIONS \ + $GLUTEN_IT_JVM_ARGS \ + -XX:ErrorFile=/var/log/java/hs_err_pid%p.log \ + -Dio.netty.tryReflectionSetAccessible=true \ + -cp "$JAR_PATH" \ + org.apache.gluten.integration.Cli "$@" +} + +if [[ "${GITHUB_ACTIONS:-}" == "true" ]]; then + run_gluten_it "$@" 2>&1 | tail -n 10000 + exit ${PIPESTATUS[0]} +fi + +run_gluten_it "$@"