From 11bf9b942784ea7198bf6aa889cf8ec1ef77c3b7 Mon Sep 17 00:00:00 2001 From: Sanal P Date: Tue, 1 Apr 2025 04:42:46 -0700 Subject: [PATCH] Fix solo repl dev log flush and graceful shutdown. Add flush mode to logdev as nublocks uses timer, nuobject uses explicit log flush mode. Flush mode has to be stored in superblk to support recovery. Enable solo repl dev UT. Add graceful shutdown for UT to work. --- conanfile.py | 2 +- .../homestore/logstore/log_store_internal.hpp | 7 +++ src/include/homestore/logstore_service.hpp | 6 +-- src/lib/logstore/log_dev.cpp | 15 +++--- src/lib/logstore/log_dev.hpp | 10 ++-- src/lib/logstore/log_store_service.cpp | 15 +++--- .../log_store/home_raft_log_store.cpp | 8 ++-- .../replication/repl_dev/solo_repl_dev.cpp | 26 +++++++++-- src/lib/replication/repl_dev/solo_repl_dev.h | 2 +- .../replication/service/generic_repl_svc.cpp | 23 +++++++++- src/tests/CMakeLists.txt | 2 +- src/tests/log_store_benchmark.cpp | 2 +- src/tests/test_log_dev.cpp | 46 +++++++++---------- src/tests/test_log_store.cpp | 6 +-- src/tests/test_log_store_long_run.cpp | 6 +-- src/tests/test_solo_repl_dev.cpp | 20 ++++---- 16 files changed, 119 insertions(+), 77 deletions(-) diff --git a/conanfile.py b/conanfile.py index e4da8a5db..8b399c66c 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.9.2" + version = "6.9.3" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/include/homestore/logstore/log_store_internal.hpp b/src/include/homestore/logstore/log_store_internal.hpp index 9b7019cfb..7768086ee 100644 --- a/src/include/homestore/logstore/log_store_internal.hpp +++ b/src/include/homestore/logstore/log_store_internal.hpp @@ -52,6 +52,12 @@ typedef std::function< void(std::shared_ptr< HomeLogStore >, logstore_seq_num_t) typedef int64_t logid_t; +VENUM(flush_mode_t, uint32_t, // Various flush modes (can be or'ed together) + INLINE = 1 << 0, 
// Allow flush inline with the append + TIMER = 1 << 1, // Allow timer based automatic flush + EXPLICIT = 1 << 2, // Allow explicitly user calling flush +); + struct logdev_key { logid_t idx; off_t dev_offset; @@ -172,4 +178,5 @@ struct logstore_superblk { logstore_seq_num_t m_first_seq_num{0}; }; #pragma pack() + } // namespace homestore \ No newline at end of file diff --git a/src/include/homestore/logstore_service.hpp b/src/include/homestore/logstore_service.hpp index fe65c7c13..48183a56c 100644 --- a/src/include/homestore/logstore_service.hpp +++ b/src/include/homestore/logstore_service.hpp @@ -94,7 +94,7 @@ class LogStoreService { * chunks. Logdev can start with zero chunks and dynamically add chunks based on write request. * @return Newly created log dev id. */ - logdev_id_t create_new_logdev(); + logdev_id_t create_new_logdev(flush_mode_t flush_mode); /** * @brief Open a log dev. * @param logdev_id: Logdev ID * @return Newly created log dev id. */ - void open_logdev(logdev_id_t logdev_id); + void open_logdev(logdev_id_t logdev_id, flush_mode_t flush_mode); /** * @brief Destroy a log dev. 
@@ -178,7 +178,7 @@ class LogStoreService { void delete_unopened_logdevs(); private: - std::shared_ptr< LogDev > create_new_logdev_internal(logdev_id_t logdev_id); + std::shared_ptr< LogDev > create_new_logdev_internal(logdev_id_t logdev_id, flush_mode_t flush_mode); void on_meta_blk_found(const sisl::byte_view& buf, void* meta_cookie); logdev_id_t get_next_logdev_id(); void logdev_super_blk_found(const sisl::byte_view& buf, void* meta_cookie); diff --git a/src/lib/logstore/log_dev.cpp b/src/lib/logstore/log_dev.cpp index 3716cb70e..a23b7c900 100644 --- a/src/lib/logstore/log_dev.cpp +++ b/src/lib/logstore/log_dev.cpp @@ -64,7 +64,7 @@ void LogDev::start(bool format, std::shared_ptr< JournalVirtualDev > vdev) { // First read the info block if (format) { HS_LOG_ASSERT(m_logdev_meta.is_empty(), "Expected meta to be not present"); - m_logdev_meta.create(m_logdev_id); + m_logdev_meta.create(m_logdev_id, m_flush_mode); m_vdev_jd->update_data_start_offset(0); } else { HS_LOG_ASSERT(!m_logdev_meta.is_empty(), "Expected meta data to be read already before loading"); @@ -108,7 +108,6 @@ LogDev::~LogDev() { HS_LOG_ASSERT((m_pending_flush_size.load() == 0), "LogDev stop attempted while writes to logdev are pending completion"); - if (allow_timer_flush()) stop_timer(); m_log_records.reset(nullptr); m_logdev_meta.reset(); m_log_idx.store(0); @@ -149,6 +148,7 @@ void LogDev::stop() { // after we call stop, we need to do any pending device truncations truncate(); m_id_logstore_map.clear(); + if (allow_timer_flush()) stop_timer(); } void LogDev::destroy() { @@ -521,7 +521,7 @@ void LogDev::on_flush_completion(LogGroup* lg) { // since we support out-of-order lsn write, so no need to guarantee the order of logstore write completion for (auto const& [idx, req] : req_map) { m_pending_callback++; - iomanager.run_on_forget(iomgr::reactor_regex::random_worker, iomgr::fiber_regex::syncio_only, + iomanager.run_on_forget(iomgr::reactor_regex::random_worker, /* 
iomgr::fiber_regex::syncio_only, */ [this, dev_offset, idx, req]() { auto ld_key = logdev_key{idx, dev_offset}; auto comp_cb = req->log_store->get_comp_cb(); @@ -561,11 +561,13 @@ uint64_t LogDev::truncate() { // Persist the logstore superblock to ensure correct start LSN during recovery. Avoid such scenario: // 1. Follower1 appends logs up to 100, then is stopped by a sigkill. // 2. Upon restart, a baseline resync is triggered using snapshot 2000. - // 3. Baseline resync completed with start_lsn=2001, but m_trunc_ld_key remains {0,0} since we cannot get a valid + // 3. Baseline resync completed with start_lsn=2001, but m_trunc_ld_key remains {0,0} since we cannot get a + // valid // device offset for LSN 2000 to update it. // 4. Follower1 appends logs from 2001 to 2500, making tail_lsn > 2000. // 5. Get m_trunc_ld_key={0,0}, goto here and return 0 without persist. - // 6. Follower1 is killed again, after restart, its start index remains 0, misinterpreting the range as [1,2500]. + // 6. Follower1 is killed again, after restart, its start index remains 0, misinterpreting the range as + // [1,2500]. 
m_logdev_meta.persist(); decr_pending_request_num(); return 0; @@ -781,7 +783,7 @@ nlohmann::json LogDev::get_status(int verbosity) const { /////////////////////////////// LogDevMetadata Section /////////////////////////////////////// LogDevMetadata::LogDevMetadata() : m_sb{logdev_sb_meta_name}, m_rollback_sb{logdev_rollback_sb_meta_name} {} -logdev_superblk* LogDevMetadata::create(logdev_id_t id) { +logdev_superblk* LogDevMetadata::create(logdev_id_t id, flush_mode_t flush_mode) { logdev_superblk* sb = m_sb.create(logdev_sb_size_needed(0)); rollback_superblk* rsb = m_rollback_sb.create(rollback_superblk::size_needed(1)); @@ -790,6 +792,7 @@ logdev_superblk* LogDevMetadata::create(logdev_id_t id) { m_id_reserver = std::make_unique< sisl::IDReserver >(); m_sb->logdev_id = id; + m_sb->flush_mode = flush_mode; m_sb.write(); m_rollback_sb->logdev_id = id; diff --git a/src/lib/logstore/log_dev.hpp b/src/lib/logstore/log_dev.hpp index 719a58861..d43dab219 100644 --- a/src/lib/logstore/log_dev.hpp +++ b/src/lib/logstore/log_dev.hpp @@ -404,6 +404,8 @@ struct logdev_superblk { uint32_t num_stores{0}; uint64_t start_dev_offset{0}; logid_t key_idx{0}; + flush_mode_t flush_mode; + // The meta data starts immediately after the super block // Equivalent of: // logstore_superblk meta[0]; @@ -481,7 +483,7 @@ class LogDevMetadata { LogDevMetadata& operator=(LogDevMetadata&&) noexcept = delete; ~LogDevMetadata() = default; - logdev_superblk* create(logdev_id_t id); + logdev_superblk* create(logdev_id_t id, flush_mode_t); void reset(); std::vector< std::pair< logstore_id_t, logstore_superblk > > load(); void persist(); @@ -572,12 +574,6 @@ struct logstore_info { static std::string const logdev_sb_meta_name{"Logdev_sb"}; static std::string const logdev_rollback_sb_meta_name{"Logdev_rollback_sb"}; -VENUM(flush_mode_t, uint32_t, // Various flush modes (can be or'ed together) - INLINE = 1 << 0, // Allow flush inline with the append - TIMER = 1 << 1, // Allow timer based automatic flush 
- EXPLICIT = 1 << 2, // Allow explcitly user calling flush -); - class LogDev : public std::enable_shared_from_this< LogDev > { friend class HomeLogStore; diff --git a/src/lib/logstore/log_store_service.cpp b/src/lib/logstore/log_store_service.cpp index 86f404e8c..542204386 100644 --- a/src/lib/logstore/log_store_service.cpp +++ b/src/lib/logstore/log_store_service.cpp @@ -142,12 +142,12 @@ logdev_id_t LogStoreService::get_next_logdev_id() { return id; } -logdev_id_t LogStoreService::create_new_logdev() { +logdev_id_t LogStoreService::create_new_logdev(flush_mode_t flush_mode) { if (is_stopping()) return 0; incr_pending_request_num(); folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx); logdev_id_t logdev_id = get_next_logdev_id(); - auto logdev = create_new_logdev_internal(logdev_id); + auto logdev = create_new_logdev_internal(logdev_id, flush_mode); logdev->start(true /* format */, m_logdev_vdev); COUNTER_INCREMENT(m_metrics, logdevs_count, 1); HS_LOG(INFO, logstore, "Created log_dev={}", logdev_id); @@ -189,19 +189,19 @@ void LogStoreService::delete_unopened_logdevs() { m_unopened_logdev.clear(); } -std::shared_ptr< LogDev > LogStoreService::create_new_logdev_internal(logdev_id_t logdev_id) { - auto logdev = std::make_shared< LogDev >(logdev_id); +std::shared_ptr< LogDev > LogStoreService::create_new_logdev_internal(logdev_id_t logdev_id, flush_mode_t flush_mode) { + auto logdev = std::make_shared< LogDev >(logdev_id, flush_mode); const auto it = m_id_logdev_map.find(logdev_id); HS_REL_ASSERT((it == m_id_logdev_map.end()), "logdev id {} already exists", logdev_id); m_id_logdev_map.insert(std::make_pair<>(logdev_id, logdev)); return logdev; } -void LogStoreService::open_logdev(logdev_id_t logdev_id) { +void LogStoreService::open_logdev(logdev_id_t logdev_id, flush_mode_t flush_mode) { folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx); const auto it = m_id_logdev_map.find(logdev_id); if (it == m_id_logdev_map.end()) { - auto 
logdev = std::make_shared< LogDev >(logdev_id); + auto logdev = std::make_shared< LogDev >(logdev_id, flush_mode); m_id_logdev_map.emplace(logdev_id, logdev); LOGDEBUGMOD(logstore, "log_dev={} does not exist, created!", logdev_id); } @@ -238,13 +238,14 @@ void LogStoreService::logdev_super_blk_found(const sisl::byte_view& buf, void* m folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx); std::shared_ptr< LogDev > logdev; auto id = sb->logdev_id; + auto flush_mode = sb->flush_mode; const auto it = m_id_logdev_map.find(id); // We could update the logdev map either with logdev or rollback superblks found callbacks. if (it != m_id_logdev_map.end()) { logdev = it->second; HS_LOG(DEBUG, logstore, "Log dev superblk found log_dev={}", id); } else { - logdev = std::make_shared< LogDev >(id); + logdev = std::make_shared< LogDev >(id, flush_mode); m_id_logdev_map.emplace(id, logdev); // when recover logdev meta blk, we get all the logdevs from the superblk. we put them in m_unopened_logdev // too. 
after logdev meta blks are all recovered, when a client opens a logdev, we remove it from diff --git a/src/lib/replication/log_store/home_raft_log_store.cpp b/src/lib/replication/log_store/home_raft_log_store.cpp index be7039059..1dc9fb199 100644 --- a/src/lib/replication/log_store/home_raft_log_store.cpp +++ b/src/lib/replication/log_store/home_raft_log_store.cpp @@ -92,7 +92,7 @@ HomeRaftLogStore::HomeRaftLogStore(logdev_id_t logdev_id, logstore_id_t logstore m_dummy_log_entry = nuraft::cs_new< nuraft::log_entry >(0, nuraft::buffer::alloc(0), nuraft::log_val_type::app_log); if (logstore_id == UINT32_MAX) { - m_logdev_id = logstore_service().create_new_logdev(); + m_logdev_id = logstore_service().create_new_logdev(flush_mode_t::EXPLICIT); m_log_store = logstore_service().create_new_log_store(m_logdev_id, true); if (!m_log_store) { throw std::runtime_error("Failed to create log store"); } m_logstore_id = m_log_store->get_store_id(); @@ -101,7 +101,7 @@ HomeRaftLogStore::HomeRaftLogStore(logdev_id_t logdev_id, logstore_id_t logstore m_logdev_id = logdev_id; m_logstore_id = logstore_id; LOGDEBUGMOD(replication, "Opening existing home log_dev={} log_store={}", m_logdev_id, logstore_id); - logstore_service().open_logdev(m_logdev_id); + logstore_service().open_logdev(m_logdev_id, flush_mode_t::EXPLICIT); m_log_store_future = logstore_service() .open_log_store(m_logdev_id, logstore_id, true, log_found_cb, log_replay_done_cb) .thenValue([this](auto log_store) { @@ -382,8 +382,8 @@ ulong HomeRaftLogStore::last_durable_index() { void HomeRaftLogStore::purge_all_logs() { auto last_lsn = m_log_store->get_contiguous_issued_seq_num(m_last_durable_lsn); - REPL_STORE_LOG(INFO, "Store={} LogDev={}: Purging all logs in the log store, last_lsn={}", - m_logstore_id, m_logdev_id, last_lsn); + REPL_STORE_LOG(INFO, "Store={} LogDev={}: Purging all logs in the log store, last_lsn={}", m_logstore_id, + m_logdev_id, last_lsn); m_log_store->truncate(last_lsn, false /* 
in_memory_truncate_only */); } diff --git a/src/lib/replication/repl_dev/solo_repl_dev.cpp b/src/lib/replication/repl_dev/solo_repl_dev.cpp index bc278303a..22f2446d0 100644 --- a/src/lib/replication/repl_dev/solo_repl_dev.cpp +++ b/src/lib/replication/repl_dev/solo_repl_dev.cpp @@ -10,7 +10,7 @@ namespace homestore { SoloReplDev::SoloReplDev(superblk< repl_dev_superblk >&& rd_sb, bool load_existing) : m_rd_sb{std::move(rd_sb)}, m_group_id{m_rd_sb->group_id} { if (load_existing) { - logstore_service().open_logdev(m_rd_sb->logdev_id); + logstore_service().open_logdev(m_rd_sb->logdev_id, flush_mode_t::TIMER); logstore_service() .open_log_store(m_rd_sb->logdev_id, m_rd_sb->logstore_id, true /* append_mode */) .thenValue([this](auto log_store) { @@ -19,7 +19,7 @@ SoloReplDev::SoloReplDev(superblk< repl_dev_superblk >&& rd_sb, bool load_existi m_data_journal->register_log_found_cb(bind_this(SoloReplDev::on_log_found, 3)); }); } else { - m_logdev_id = logstore_service().create_new_logdev(); + m_logdev_id = logstore_service().create_new_logdev(flush_mode_t::TIMER); m_data_journal = logstore_service().create_new_log_store(m_logdev_id, true /* append_mode */); m_rd_sb->logstore_id = m_data_journal->get_store_id(); m_rd_sb->logdev_id = m_logdev_id; @@ -30,6 +30,8 @@ SoloReplDev::SoloReplDev(superblk< repl_dev_superblk >&& rd_sb, bool load_existi void SoloReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const& key, sisl::sg_list const& value, repl_req_ptr_t rreq, trace_id_t tid) { if (!rreq) { auto rreq = repl_req_ptr_t(new repl_req_ctx{}); } + + incr_pending_request_num(); auto status = rreq->init(repl_key{.server_id = 0, .term = 1, .dsn = 1, .traceID = tid}, value.size ? 
journal_type_t::HS_DATA_LINKED : journal_type_t::HS_DATA_INLINED, true, header, key, value.size, m_listener); @@ -60,6 +62,7 @@ void SoloReplDev::write_journal(repl_req_ptr_t rreq) { data_service().commit_blk(rreq->local_blkid()); m_listener->on_commit(rreq->lsn(), rreq->header(), rreq->key(), rreq->local_blkid(), rreq); + decr_pending_request_num(); }); } @@ -68,7 +71,6 @@ void SoloReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx uint32_t remain_size = buf.size() - sizeof(repl_journal_entry); HS_REL_ASSERT_EQ(entry->major_version, repl_journal_entry::JOURNAL_ENTRY_MAJOR, "Mismatched version of journal entry found"); - HS_REL_ASSERT_EQ(entry->code, journal_type_t::HS_DATA_LINKED, "Found a journal entry which is not data"); uint8_t const* raw_ptr = r_cast< uint8_t const* >(entry) + sizeof(repl_journal_entry); sisl::blob header{raw_ptr, entry->user_header_size}; @@ -95,11 +97,25 @@ void SoloReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx folly::Future< std::error_code > SoloReplDev::async_read(MultiBlkId const& bid, sisl::sg_list& sgs, uint32_t size, bool part_of_batch, trace_id_t tid) { - return data_service().async_read(bid, sgs, size, part_of_batch); + if (is_stopping()) { + LOGINFO("repl dev is being shutdown!"); + return folly::makeFuture< std::error_code >(std::make_error_code(std::errc::operation_canceled)); + } + incr_pending_request_num(); + auto result = data_service().async_read(bid, sgs, size, part_of_batch); + decr_pending_request_num(); + return result; } folly::Future< std::error_code > SoloReplDev::async_free_blks(int64_t, MultiBlkId const& bid, trace_id_t tid) { - return data_service().async_free_blk(bid); + if (is_stopping()) { + LOGINFO("repl dev is being shutdown!"); + return folly::makeFuture< std::error_code >(std::make_error_code(std::errc::operation_canceled)); + } + incr_pending_request_num(); + auto result = data_service().async_free_blk(bid); + decr_pending_request_num(); + return result; } 
uint32_t SoloReplDev::get_blk_size() const { return data_service().get_blk_size(); } diff --git a/src/lib/replication/repl_dev/solo_repl_dev.h b/src/lib/replication/repl_dev/solo_repl_dev.h index 397f461da..579506db1 100644 --- a/src/lib/replication/repl_dev/solo_repl_dev.h +++ b/src/lib/replication/repl_dev/solo_repl_dev.h @@ -58,7 +58,7 @@ class SoloReplDev : public ReplDev { bool is_ready_for_traffic() const override { return true; } void purge() override {} - std::shared_ptr deserialize_snapshot_context(sisl::io_blob_safe &snp_ctx) override { + std::shared_ptr< snapshot_context > deserialize_snapshot_context(sisl::io_blob_safe& snp_ctx) override { return nullptr; } diff --git a/src/lib/replication/service/generic_repl_svc.cpp b/src/lib/replication/service/generic_repl_svc.cpp index f357cb819..20a5f8436 100644 --- a/src/lib/replication/service/generic_repl_svc.cpp +++ b/src/lib/replication/service/generic_repl_svc.cpp @@ -78,7 +78,7 @@ hs_stats GenericReplService::get_cap_stats() const { ///////////////////// SoloReplService specializations and CP Callbacks ///////////////////////////// SoloReplService::SoloReplService(cshared< ReplApplication >& repl_app) : GenericReplService{repl_app} {} -SoloReplService::~SoloReplService(){}; +SoloReplService::~SoloReplService() {}; void SoloReplService::start() { for (auto const& [buf, mblk] : m_sb_bufs) { @@ -97,7 +97,23 @@ void SoloReplService::start() { } void SoloReplService::stop() { - // TODO: Implement graceful shutdown for soloReplService + start_stopping(); + while (true) { + auto pending_request_num = get_pending_request_num(); + if (!pending_request_num) break; + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } + + // stop all repl_devs + { + std::unique_lock lg(m_rd_map_mtx); + for (auto it = m_rd_map.begin(); it != m_rd_map.end(); ++it) { + auto rdev = std::dynamic_pointer_cast< SoloReplDev >(it->second); + rdev->stop(); + } + } + hs()->logstore_service().stop(); + hs()->data_service().stop(); } 
AsyncReplResult< shared< ReplDev > > SoloReplService::create_repl_dev(group_id_t group_id, @@ -110,6 +126,7 @@ AsyncReplResult< shared< ReplDev > > SoloReplService::create_repl_dev(group_id_t auto listener = m_repl_app->create_repl_dev_listener(group_id); listener->set_repl_dev(rdev); rdev->attach_listener(std::move(listener)); + incr_pending_request_num(); { std::unique_lock lg(m_rd_map_mtx); @@ -117,10 +134,12 @@ AsyncReplResult< shared< ReplDev > > SoloReplService::create_repl_dev(group_id_t if (!happened) { // We should never reach here, as we have failed to emplace in map, but couldn't find entry DEBUG_ASSERT(false, "Unable to put the repl_dev in rd map"); + decr_pending_request_num(); return make_async_error< shared< ReplDev > >(ReplServiceError::SERVER_ALREADY_EXISTS); } } + decr_pending_request_num(); return make_async_success< shared< ReplDev > >(rdev); } diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index 940d2e891..cba159954 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -131,7 +131,7 @@ if (${io_tests}) add_test(NAME DataService-Epoll COMMAND test_data_service) add_test(NAME RaftReplDev-Epoll COMMAND test_raft_repl_dev) # add_test(NAME RaftReplDevDynamic-Epoll COMMAND test_raft_repl_dev_dynamic) - # add_test(NAME SoloReplDev-Epoll COMMAND test_solo_repl_dev) + add_test(NAME SoloReplDev-Epoll COMMAND test_solo_repl_dev) endif() can_build_spdk_io_tests(spdk_tests) diff --git a/src/tests/log_store_benchmark.cpp b/src/tests/log_store_benchmark.cpp index c4e37fa25..986ab1cc7 100644 --- a/src/tests/log_store_benchmark.cpp +++ b/src/tests/log_store_benchmark.cpp @@ -55,7 +55,7 @@ class BenchLogStore { public: friend class SampleDB; BenchLogStore() { - m_logdev_id = logstore_service().create_new_logdev(); + m_logdev_id = logstore_service().create_new_logdev(flush_mode_t::EXPLICIT); m_log_store = logstore_service().create_new_log_store(m_logdev_id, true /* append_mode */); 
m_log_store->register_log_found_cb(bind_this(BenchLogStore::on_log_found, 3)); m_nth_entry.store(0); diff --git a/src/tests/test_log_dev.cpp b/src/tests/test_log_dev.cpp index 7bde7bc12..45ecee96f 100644 --- a/src/tests/test_log_dev.cpp +++ b/src/tests/test_log_dev.cpp @@ -158,9 +158,10 @@ class LogDevTest : public ::testing::Test { } } - void insert_batch_sync(std::shared_ptr< HomeLogStore > log_store, logstore_seq_num_t& lsn, int64_t batch, uint32_t fixed_size = 0) { + void insert_batch_sync(std::shared_ptr< HomeLogStore > log_store, logstore_seq_num_t& lsn, int64_t batch, + uint32_t fixed_size = 0) { bool io_memory{false}; - std::vector data_vector; + std::vector< test_log_data* > data_vector; for (int64_t i = 0; i < batch; ++i) { auto* d = prepare_data(lsn + i, io_memory, fixed_size); @@ -246,20 +247,16 @@ class LogDevTest : public ::testing::Test { logid_t get_last_truncate_idx(logdev_id_t logdev_id) { auto status = logstore_service().get_logdev(logdev_id)->get_status(0); - if (status.contains("last_truncate_log_idx")) { - return s_cast(status["last_truncate_log_idx"]); - } + if (status.contains("last_truncate_log_idx")) { return s_cast< logid_t >(status["last_truncate_log_idx"]); } LOGERROR("Failed to get last_truncate_log_idx from logdev status for logdev_id {}", logdev_id); - return static_cast(-1); + return static_cast< logid_t >(-1); } logid_t get_current_log_idx(logdev_id_t logdev_id) { auto status = logstore_service().get_logdev(logdev_id)->get_status(0); - if (status.contains("current_log_idx")) { - return s_cast(status["current_log_idx"]); - } + if (status.contains("current_log_idx")) { return s_cast< logid_t >(status["current_log_idx"]); } LOGERROR("Failed to get current_log_idx from logdev status for logdev_id {}", logdev_id); - return static_cast(-1); + return static_cast< logid_t >(-1); } }; @@ -268,7 +265,7 @@ TEST_F(LogDevTest, WriteSyncThenRead) { for (uint32_t iteration{0}; iteration < iterations; ++iteration) { LOGINFO("Iteration {}", 
iteration); - auto logdev_id = logstore_service().create_new_logdev(); + auto logdev_id = logstore_service().create_new_logdev(flush_mode_t::EXPLICIT); s_max_flush_multiple = logstore_service().get_logdev(logdev_id)->get_flush_size_multiple(); auto log_store = logstore_service().create_new_log_store(logdev_id, false); const auto store_id = log_store->get_store_id(); @@ -288,7 +285,7 @@ TEST_F(LogDevTest, WriteSyncThenRead) { TEST_F(LogDevTest, Rollback) { LOGINFO("Step 1: Create a single logstore to start rollback test"); - auto logdev_id = logstore_service().create_new_logdev(); + auto logdev_id = logstore_service().create_new_logdev(flush_mode_t::EXPLICIT); s_max_flush_multiple = logstore_service().get_logdev(logdev_id)->get_flush_size_multiple(); auto log_store = logstore_service().create_new_log_store(logdev_id, false); auto store_id = log_store->get_store_id(); @@ -296,7 +293,7 @@ TEST_F(LogDevTest, Rollback) { auto restart = [&]() { std::promise< bool > p; auto starting_cb = [&]() { - logstore_service().open_logdev(logdev_id); + logstore_service().open_logdev(logdev_id, flush_mode_t::EXPLICIT); logstore_service().open_log_store(logdev_id, store_id, false /* append_mode */).thenValue([&](auto store) { log_store = store; p.set_value(true); @@ -355,7 +352,7 @@ TEST_F(LogDevTest, Rollback) { TEST_F(LogDevTest, ReTruncate) { LOGINFO("Step 1: Create a single logstore to start re-truncate test"); - auto logdev_id = logstore_service().create_new_logdev(); + auto logdev_id = logstore_service().create_new_logdev(flush_mode_t::EXPLICIT); s_max_flush_multiple = logstore_service().get_logdev(logdev_id)->get_flush_size_multiple(); auto log_store = logstore_service().create_new_log_store(logdev_id, false); @@ -382,7 +379,7 @@ TEST_F(LogDevTest, ReTruncate) { TEST_F(LogDevTest, TruncateWithExceedingLSN) { LOGINFO("Step 1: Create a single logstore to start truncate with exceeding LSN test"); - auto logdev_id = logstore_service().create_new_logdev(); + auto logdev_id = 
logstore_service().create_new_logdev(flush_mode_t::EXPLICIT); s_max_flush_multiple = logstore_service().get_logdev(logdev_id)->get_flush_size_multiple(); auto log_store = logstore_service().create_new_log_store(logdev_id, false); @@ -426,7 +423,7 @@ TEST_F(LogDevTest, TruncateWithExceedingLSN) { TEST_F(LogDevTest, TruncateAfterRestart) { LOGINFO("Step 1: Create a single logstore to start truncate with overlapping LSN test"); - auto logdev_id = logstore_service().create_new_logdev(); + auto logdev_id = logstore_service().create_new_logdev(flush_mode_t::EXPLICIT); s_max_flush_multiple = logstore_service().get_logdev(logdev_id)->get_flush_size_multiple(); auto log_store = logstore_service().create_new_log_store(logdev_id, false); auto store_id = log_store->get_store_id(); @@ -434,7 +431,7 @@ TEST_F(LogDevTest, TruncateAfterRestart) { auto restart = [&]() { std::promise< bool > p; auto starting_cb = [&]() { - logstore_service().open_logdev(logdev_id); + logstore_service().open_logdev(logdev_id, flush_mode_t::EXPLICIT); logstore_service().open_log_store(logdev_id, store_id, false /* append_mode */).thenValue([&](auto store) { log_store = store; p.set_value(true); @@ -477,13 +474,12 @@ TEST_F(LogDevTest, TruncateAfterRestart) { TEST_F(LogDevTest, TruncateAcrossMultipleStores) { LOGINFO("Step 1: Create 3 log stores to start truncate across multiple stores test"); - auto logdev_id = logstore_service().create_new_logdev(); + auto logdev_id = logstore_service().create_new_logdev(flush_mode_t::EXPLICIT); s_max_flush_multiple = logstore_service().get_logdev(logdev_id)->get_flush_size_multiple(); auto store1 = logstore_service().create_new_log_store(logdev_id, false); auto store2 = logstore_service().create_new_log_store(logdev_id, false); auto store3 = logstore_service().create_new_log_store(logdev_id, false); - LOGINFO("Step 2: Insert 100 entries to store {}", store1->get_store_id()); logstore_seq_num_t cur_lsn = 0; kickstart_inserts(store1, cur_lsn, 100); @@ -644,15 +640,15 
@@ TEST_F(LogDevTest, TruncateAcrossMultipleStores) { TEST_F(LogDevTest, TruncateLogsAfterFlushAndRestart) { LOGINFO("Step 1: Create a single logstore to start truncate-logs-after-flush-and-restart test"); - auto logdev_id = logstore_service().create_new_logdev(); + auto logdev_id = logstore_service().create_new_logdev(flush_mode_t::EXPLICIT); s_max_flush_multiple = logstore_service().get_logdev(logdev_id)->get_flush_size_multiple(); auto log_store = logstore_service().create_new_log_store(logdev_id, false); auto store_id = log_store->get_store_id(); auto restart = [&]() { - std::promise < bool > p; + std::promise< bool > p; auto starting_cb = [&]() { - logstore_service().open_logdev(logdev_id); + logstore_service().open_logdev(logdev_id, flush_mode_t::EXPLICIT); logstore_service().open_log_store(logdev_id, store_id, false /* append_mode */).thenValue([&](auto store) { log_store = store; p.set_value(true); @@ -712,7 +708,7 @@ TEST_F(LogDevTest, CreateRemoveLogDev) { ASSERT_EQ(vdev->num_descriptors(), 0); for (uint32_t i{0}; i < num_logdev; ++i) { - auto id = logstore_service().create_new_logdev(); + auto id = logstore_service().create_new_logdev(flush_mode_t::EXPLICIT); s_max_flush_multiple = logstore_service().get_logdev(id)->get_flush_size_multiple(); auto store = logstore_service().create_new_log_store(id, false); log_stores.push_back(store); @@ -760,7 +756,7 @@ TEST_F(LogDevTest, DeleteUnopenedLogDev) { // Test deletion of unopened logdev. 
std::set< logdev_id_t > id_set, unopened_id_set; for (uint32_t i{0}; i < num_logdev; ++i) { - auto id = logstore_service().create_new_logdev(); + auto id = logstore_service().create_new_logdev(flush_mode_t::EXPLICIT); id_set.insert(id); if (i >= num_logdev / 2) { unopened_id_set.insert(id); } s_max_flush_multiple = logstore_service().get_logdev(id)->get_flush_size_multiple(); @@ -784,7 +780,7 @@ TEST_F(LogDevTest, DeleteUnopenedLogDev) { auto starting_cb = [&]() { auto it = id_set.begin(); for (uint32_t i{0}; i < id_set.size() / 2; i++, it++) { - logstore_service().open_logdev(*it); + logstore_service().open_logdev(*it, flush_mode_t::EXPLICIT); } }; start_homestore(true /* restart */, starting_cb); diff --git a/src/tests/test_log_store.cpp b/src/tests/test_log_store.cpp index 3b1b2c60b..8f18d71f2 100644 --- a/src/tests/test_log_store.cpp +++ b/src/tests/test_log_store.cpp @@ -455,7 +455,7 @@ class SampleDB { for (uint32_t i{0}; i < n_log_stores; ++i) { SampleLogStoreClient* client = m_log_store_clients[i].get(); - logstore_service().open_logdev(client->m_logdev_id); + logstore_service().open_logdev(client->m_logdev_id, flush_mode_t::EXPLICIT); logstore_service() .open_log_store(client->m_logdev_id, client->m_store_id, false /* append_mode */) .thenValue([i, this, client](auto log_store) { client->set_log_store(log_store); }); @@ -479,7 +479,7 @@ class SampleDB { std::vector< logdev_id_t > logdev_id_vec; for (uint32_t i{0}; i < n_log_devs; ++i) { - logdev_id_vec.push_back(logstore_service().create_new_logdev()); + logdev_id_vec.push_back(logstore_service().create_new_logdev(flush_mode_t::EXPLICIT)); } for (uint32_t i{0}; i < n_log_stores; ++i) { @@ -1225,7 +1225,7 @@ TEST_F(LogStoreTest, WriteSyncThenRead) { for (uint32_t iteration{0}; iteration < iterations; ++iteration) { LOGINFO("Iteration {}", iteration); - auto logdev_id = logstore_service().create_new_logdev(); + auto logdev_id = logstore_service().create_new_logdev(flush_mode_t::EXPLICIT); auto tmp_log_store 
= logstore_service().create_new_log_store(logdev_id, false); const auto store_id = tmp_log_store->get_store_id(); LOGINFO("Created new log store -> id {}", store_id); diff --git a/src/tests/test_log_store_long_run.cpp b/src/tests/test_log_store_long_run.cpp index 5fd0ec21f..5a7437754 100644 --- a/src/tests/test_log_store_long_run.cpp +++ b/src/tests/test_log_store_long_run.cpp @@ -294,7 +294,7 @@ class LogStoreLongRun : public ::testing::Test { HS_SETTINGS_FACTORY().save(); for (uint32_t i{0}; i < n_log_stores; ++i) { SampleLogStoreClient* client = m_log_store_clients[i].get(); - logstore_service().open_logdev(client->m_logdev_id); + logstore_service().open_logdev(client->m_logdev_id, flush_mode_t::EXPLICIT); logstore_service() .open_log_store(client->m_logdev_id, client->m_store_id, false /* append_mode */) .thenValue([i, this, client](auto log_store) { client->set_log_store(log_store); }); @@ -318,7 +318,7 @@ class LogStoreLongRun : public ::testing::Test { std::vector< logdev_id_t > logdev_id_vec; for (uint32_t i{0}; i < n_log_devs; ++i) - logdev_id_vec.push_back(logstore_service().create_new_logdev()); + logdev_id_vec.push_back(logstore_service().create_new_logdev(flush_mode_t::EXPLICIT)); for (uint32_t i{0}; i < n_log_stores; ++i) m_log_store_clients.push_back(std::make_unique< SampleLogStoreClient >( @@ -466,7 +466,7 @@ class LogStoreLongRun : public ::testing::Test { validate_num_stores(); // Create a new logstore. 
- auto logdev_id = logstore_service().create_new_logdev(); + auto logdev_id = logstore_service().create_new_logdev(flush_mode_t::EXPLICIT); m_log_store_clients.push_back(std::make_unique< SampleLogStoreClient >( logdev_id, bind_this(LogStoreLongRun::on_log_insert_completion, 3))); validate_num_stores(); diff --git a/src/tests/test_solo_repl_dev.cpp b/src/tests/test_solo_repl_dev.cpp index ec45ef5b4..aaec8851f 100644 --- a/src/tests/test_solo_repl_dev.cpp +++ b/src/tests/test_solo_repl_dev.cpp @@ -99,6 +99,7 @@ class SoloReplDevTest : public testing::Test { void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, MultiBlkId const& blkids, cintrusive< repl_req_ctx >& ctx) override { + LOGINFO("Received on_commit lsn={}", lsn); if (ctx == nullptr) { m_test.validate_replay(*repl_dev(), lsn, header, key, blkids); } else { @@ -232,8 +233,8 @@ class SoloReplDevTest : public testing::Test { uint32_t size = blkids.blk_count() * g_block_size; if (size) { auto read_sgs = HSTestHelper::create_sgs(size, size); - LOGDEBUG("[{}] Validating replay of lsn={} blkid = {}", boost::uuids::to_string(rdev.group_id()), lsn, - blkids.to_string()); + LOGINFO("[{}] Validating replay of lsn={} blkid = {}", boost::uuids::to_string(rdev.group_id()), lsn, + blkids.to_string()); rdev.async_read(blkids, read_sgs, size) .thenValue([this, hdr = *jhdr, read_sgs, lsn, blkids, &rdev](auto&& err) { RELEASE_ASSERT(!err, "Error during async_read"); @@ -243,8 +244,8 @@ class SoloReplDevTest : public testing::Test { HSTestHelper::validate_data_buf(uintptr_cast(iov.iov_base), iov.iov_len, hdr.data_pattern); iomanager.iobuf_free(uintptr_cast(iov.iov_base)); } - LOGDEBUG("[{}] Replay of lsn={} blkid={} validated successfully", - boost::uuids::to_string(rdev.group_id()), lsn, blkids.to_string()); + LOGINFO("[{}] Replay of lsn={} blkid={} validated successfully", + boost::uuids::to_string(rdev.group_id()), lsn, blkids.to_string()); m_task_waiter.one_complete(); }); } else { @@ -258,15 +259,15 
@@ class SoloReplDevTest : public testing::Test { req->read_sgs = HSTestHelper::create_sgs(req->write_sgs.size, req->write_sgs.size); auto const cap = hs()->repl_service().get_cap_stats(); - LOGDEBUG("Write complete with cap stats: used={} total={}", cap.used_capacity, cap.total_capacity); + LOGINFO("Write complete with cap stats: used={} total={}", cap.used_capacity, cap.total_capacity); rdev.async_read(req->written_blkids, req->read_sgs, req->read_sgs.size) .thenValue([this, &rdev, req](auto&& err) { RELEASE_ASSERT(!err, "Error during async_read"); - LOGDEBUG("[{}] Write complete with lsn={} for size={} blkids={}", - boost::uuids::to_string(rdev.group_id()), req->lsn(), req->write_sgs.size, - req->written_blkids.to_string()); + LOGINFO("[{}] Write complete with lsn={} for size={} blkids={}", + boost::uuids::to_string(rdev.group_id()), req->lsn(), req->write_sgs.size, + req->written_blkids.to_string()); auto hdr = r_cast< test_repl_req::journal_header* >(req->header->bytes()); HS_REL_ASSERT_EQ(hdr->data_size, req->read_sgs.size, "journal hdr data size mismatch with actual size"); @@ -298,7 +299,9 @@ TEST_F(SoloReplDevTest, TestRandomSizedDataBlock) { uint32_t key_size = rand() % 512 + 8; this->write_io(key_size, nblks * g_block_size, g_block_size); }); + this->m_io_runner.execute().get(); + LOGINFO("Step 2: Restart homestore and validate replay data.", g_block_size); this->m_task_waiter.start([this]() { this->restart(); }).get(); } @@ -306,6 +309,7 @@ TEST_F(SoloReplDevTest, TestHeaderOnly) { LOGINFO("Step 1: run on worker threads to schedule write"); this->m_io_runner.set_task([this]() { this->write_io(0u, 0u, g_block_size); }); this->m_io_runner.execute().get(); + LOGINFO("Step 2: Restart homestore and validate replay data.", g_block_size); this->m_task_waiter.start([this]() { this->restart(); }).get(); }