diff --git a/conanfile.py b/conanfile.py index e4da8a5db..8b399c66c 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.9.2" + version = "6.9.3" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/include/homestore/logstore/log_store_internal.hpp b/src/include/homestore/logstore/log_store_internal.hpp index 9b7019cfb..7768086ee 100644 --- a/src/include/homestore/logstore/log_store_internal.hpp +++ b/src/include/homestore/logstore/log_store_internal.hpp @@ -52,6 +52,12 @@ typedef std::function< void(std::shared_ptr< HomeLogStore >, logstore_seq_num_t) typedef int64_t logid_t; +VENUM(flush_mode_t, uint32_t, // Various flush modes (can be or'ed together) + INLINE = 1 << 0, // Allow flush inline with the append + TIMER = 1 << 1, // Allow timer based automatic flush + EXPLICIT = 1 << 2, // Allow explcitly user calling flush +); + struct logdev_key { logid_t idx; off_t dev_offset; @@ -172,4 +178,5 @@ struct logstore_superblk { logstore_seq_num_t m_first_seq_num{0}; }; #pragma pack() + } // namespace homestore \ No newline at end of file diff --git a/src/include/homestore/logstore_service.hpp b/src/include/homestore/logstore_service.hpp index fe65c7c13..48183a56c 100644 --- a/src/include/homestore/logstore_service.hpp +++ b/src/include/homestore/logstore_service.hpp @@ -94,7 +94,7 @@ class LogStoreService { * chunks. Logdev can start with zero chunks and dynamically add chunks based on write request. * @return Newly created log dev id. */ - logdev_id_t create_new_logdev(); + logdev_id_t create_new_logdev(flush_mode_t flush_mode); /** * @brief Open a log dev. @@ -102,7 +102,7 @@ class LogStoreService { * @param logdev_id: Logdev ID * @return Newly created log dev id. */ - void open_logdev(logdev_id_t logdev_id); + void open_logdev(logdev_id_t logdev_id, flush_mode_t flush_mode); /** * @brief Destroy a log dev. @@ -178,7 +178,7 @@ class LogStoreService { void delete_unopened_logdevs(); private: - std::shared_ptr< LogDev > create_new_logdev_internal(logdev_id_t logdev_id); + std::shared_ptr< LogDev > create_new_logdev_internal(logdev_id_t logdev_id, flush_mode_t flush_mode); void on_meta_blk_found(const sisl::byte_view& buf, void* meta_cookie); logdev_id_t get_next_logdev_id(); void logdev_super_blk_found(const sisl::byte_view& buf, void* meta_cookie); diff --git a/src/lib/logstore/log_dev.cpp b/src/lib/logstore/log_dev.cpp index 3716cb70e..a23b7c900 100644 --- a/src/lib/logstore/log_dev.cpp +++ b/src/lib/logstore/log_dev.cpp @@ -64,7 +64,7 @@ void LogDev::start(bool format, std::shared_ptr< JournalVirtualDev > vdev) { // First read the info block if (format) { HS_LOG_ASSERT(m_logdev_meta.is_empty(), "Expected meta to be not present"); - m_logdev_meta.create(m_logdev_id); + m_logdev_meta.create(m_logdev_id, m_flush_mode); m_vdev_jd->update_data_start_offset(0); } else { HS_LOG_ASSERT(!m_logdev_meta.is_empty(), "Expected meta data to be read already before loading"); @@ -108,7 +108,6 @@ LogDev::~LogDev() { HS_LOG_ASSERT((m_pending_flush_size.load() == 0), "LogDev stop attempted while writes to logdev are pending completion"); - if (allow_timer_flush()) stop_timer(); m_log_records.reset(nullptr); m_logdev_meta.reset(); m_log_idx.store(0); @@ -149,6 +148,7 @@ void LogDev::stop() { // after we call stop, we need to do any pending device truncations truncate(); m_id_logstore_map.clear(); + if (allow_timer_flush()) stop_timer(); } void LogDev::destroy() { @@ -521,7 +521,7 @@ void LogDev::on_flush_completion(LogGroup* lg) { // since we support out-of-order lsn write, so no need to guarantee the order of logstore write completion for (auto const& [idx, req] : req_map) { m_pending_callback++; - iomanager.run_on_forget(iomgr::reactor_regex::random_worker, iomgr::fiber_regex::syncio_only, + iomanager.run_on_forget(iomgr::reactor_regex::random_worker, /* iomgr::fiber_regex::syncio_only, */ [this, dev_offset, idx, req]() { auto ld_key = logdev_key{idx, dev_offset}; auto comp_cb = req->log_store->get_comp_cb(); @@ -561,11 +561,13 @@ uint64_t LogDev::truncate() { // Persist the logstore superblock to ensure correct start LSN during recovery. Avoid such scenario: // 1. Follower1 appends logs up to 100, then is stopped by a sigkill. // 2. Upon restart, a baseline resync is triggered using snapshot 2000. - // 3. Baseline resync completed with start_lsn=2001, but m_trunc_ld_key remains {0,0} since we cannot get a valid + // 3. Baseline resync completed with start_lsn=2001, but m_trunc_ld_key remains {0,0} since we cannot get a + // valid // device offset for LSN 2000 to update it. // 4. Follower1 appends logs from 2001 to 2500, making tail_lsn > 2000. // 5. Get m_trunc_ld_key={0,0}, goto here and return 0 without persist. - // 6. Follower1 is killed again, after restart, its start index remains 0, misinterpreting the range as [1,2500]. + // 6. Follower1 is killed again, after restart, its start index remains 0, misinterpreting the range as + // [1,2500]. m_logdev_meta.persist(); decr_pending_request_num(); return 0; @@ -781,7 +783,7 @@ nlohmann::json LogDev::get_status(int verbosity) const { /////////////////////////////// LogDevMetadata Section /////////////////////////////////////// LogDevMetadata::LogDevMetadata() : m_sb{logdev_sb_meta_name}, m_rollback_sb{logdev_rollback_sb_meta_name} {} -logdev_superblk* LogDevMetadata::create(logdev_id_t id) { +logdev_superblk* LogDevMetadata::create(logdev_id_t id, flush_mode_t flush_mode) { logdev_superblk* sb = m_sb.create(logdev_sb_size_needed(0)); rollback_superblk* rsb = m_rollback_sb.create(rollback_superblk::size_needed(1)); @@ -790,6 +792,7 @@ logdev_superblk* LogDevMetadata::create(logdev_id_t id) { m_id_reserver = std::make_unique< sisl::IDReserver >(); m_sb->logdev_id = id; + m_sb->flush_mode = flush_mode; m_sb.write(); m_rollback_sb->logdev_id = id; diff --git a/src/lib/logstore/log_dev.hpp b/src/lib/logstore/log_dev.hpp index 719a58861..d43dab219 100644 --- a/src/lib/logstore/log_dev.hpp +++ b/src/lib/logstore/log_dev.hpp @@ -404,6 +404,8 @@ struct logdev_superblk { uint32_t num_stores{0}; uint64_t start_dev_offset{0}; logid_t key_idx{0}; + flush_mode_t flush_mode; + // The meta data starts immediately after the super block // Equivalent of: // logstore_superblk meta[0]; @@ -481,7 +483,7 @@ class LogDevMetadata { LogDevMetadata& operator=(LogDevMetadata&&) noexcept = delete; ~LogDevMetadata() = default; - logdev_superblk* create(logdev_id_t id); + logdev_superblk* create(logdev_id_t id, flush_mode_t); void reset(); std::vector< std::pair< logstore_id_t, logstore_superblk > > load(); void persist(); @@ -572,12 +574,6 @@ struct logstore_info { static std::string const logdev_sb_meta_name{"Logdev_sb"}; static std::string const logdev_rollback_sb_meta_name{"Logdev_rollback_sb"}; -VENUM(flush_mode_t, uint32_t, // Various flush modes (can be or'ed together) - INLINE = 1 << 0, // Allow flush inline with the append - TIMER = 1 << 1, // Allow timer based automatic flush - EXPLICIT = 1 << 2, // Allow explcitly user calling flush -); - class LogDev : public std::enable_shared_from_this< LogDev > { friend class HomeLogStore; diff --git a/src/lib/logstore/log_store_service.cpp b/src/lib/logstore/log_store_service.cpp index 86f404e8c..542204386 100644 --- a/src/lib/logstore/log_store_service.cpp +++ b/src/lib/logstore/log_store_service.cpp @@ -142,12 +142,12 @@ logdev_id_t LogStoreService::get_next_logdev_id() { return id; } -logdev_id_t LogStoreService::create_new_logdev() { +logdev_id_t LogStoreService::create_new_logdev(flush_mode_t flush_mode) { if (is_stopping()) return 0; incr_pending_request_num(); folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx); logdev_id_t logdev_id = get_next_logdev_id(); - auto logdev = create_new_logdev_internal(logdev_id); + auto logdev = create_new_logdev_internal(logdev_id, flush_mode); logdev->start(true /* format */, m_logdev_vdev); COUNTER_INCREMENT(m_metrics, logdevs_count, 1); HS_LOG(INFO, logstore, "Created log_dev={}", logdev_id); @@ -189,19 +189,19 @@ void LogStoreService::delete_unopened_logdevs() { m_unopened_logdev.clear(); } -std::shared_ptr< LogDev > LogStoreService::create_new_logdev_internal(logdev_id_t logdev_id) { - auto logdev = std::make_shared< LogDev >(logdev_id); +std::shared_ptr< LogDev > LogStoreService::create_new_logdev_internal(logdev_id_t logdev_id, flush_mode_t flush_mode) { + auto logdev = std::make_shared< LogDev >(logdev_id, flush_mode); const auto it = m_id_logdev_map.find(logdev_id); HS_REL_ASSERT((it == m_id_logdev_map.end()), "logdev id {} already exists", logdev_id); m_id_logdev_map.insert(std::make_pair<>(logdev_id, logdev)); return logdev; } -void LogStoreService::open_logdev(logdev_id_t logdev_id) { +void LogStoreService::open_logdev(logdev_id_t logdev_id, flush_mode_t flush_mode) { folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx); const auto it = m_id_logdev_map.find(logdev_id); if (it == m_id_logdev_map.end()) { - auto logdev = std::make_shared< LogDev >(logdev_id); + auto logdev = std::make_shared< LogDev >(logdev_id, flush_mode); m_id_logdev_map.emplace(logdev_id, logdev); LOGDEBUGMOD(logstore, "log_dev={} does not exist, created!", logdev_id); } @@ -238,13 +238,14 @@ void LogStoreService::logdev_super_blk_found(const sisl::byte_view& buf, void* m folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx); std::shared_ptr< LogDev > logdev; auto id = sb->logdev_id; + auto flush_mode = sb->flush_mode; const auto it = m_id_logdev_map.find(id); // We could update the logdev map either with logdev or rollback superblks found callbacks. if (it != m_id_logdev_map.end()) { logdev = it->second; HS_LOG(DEBUG, logstore, "Log dev superblk found log_dev={}", id); } else { - logdev = std::make_shared< LogDev >(id); + logdev = std::make_shared< LogDev >(id, flush_mode); m_id_logdev_map.emplace(id, logdev); // when recover logdev meta blk, we get all the logdevs from the superblk. we put them in m_unopened_logdev // too. after logdev meta blks are all recovered, when a client opens a logdev, we remove it from diff --git a/src/lib/replication/log_store/home_raft_log_store.cpp b/src/lib/replication/log_store/home_raft_log_store.cpp index be7039059..1dc9fb199 100644 --- a/src/lib/replication/log_store/home_raft_log_store.cpp +++ b/src/lib/replication/log_store/home_raft_log_store.cpp @@ -92,7 +92,7 @@ HomeRaftLogStore::HomeRaftLogStore(logdev_id_t logdev_id, logstore_id_t logstore m_dummy_log_entry = nuraft::cs_new< nuraft::log_entry >(0, nuraft::buffer::alloc(0), nuraft::log_val_type::app_log); if (logstore_id == UINT32_MAX) { - m_logdev_id = logstore_service().create_new_logdev(); + m_logdev_id = logstore_service().create_new_logdev(flush_mode_t::EXPLICIT); m_log_store = logstore_service().create_new_log_store(m_logdev_id, true); if (!m_log_store) { throw std::runtime_error("Failed to create log store"); } m_logstore_id = m_log_store->get_store_id(); @@ -101,7 +101,7 @@ HomeRaftLogStore::HomeRaftLogStore(logdev_id_t logdev_id, logstore_id_t logstore m_logdev_id = logdev_id; m_logstore_id = logstore_id; LOGDEBUGMOD(replication, "Opening existing home log_dev={} log_store={}", m_logdev_id, logstore_id); - logstore_service().open_logdev(m_logdev_id); + logstore_service().open_logdev(m_logdev_id, flush_mode_t::EXPLICIT); m_log_store_future = logstore_service() .open_log_store(m_logdev_id, logstore_id, true, log_found_cb, log_replay_done_cb) .thenValue([this](auto log_store) { @@ -382,8 +382,8 @@ ulong HomeRaftLogStore::last_durable_index() { void HomeRaftLogStore::purge_all_logs() { auto last_lsn = m_log_store->get_contiguous_issued_seq_num(m_last_durable_lsn); - REPL_STORE_LOG(INFO, "Store={} LogDev={}: Purging all logs in the log store, last_lsn={}", - m_logstore_id, m_logdev_id, last_lsn); + REPL_STORE_LOG(INFO, "Store={} LogDev={}: Purging all logs in the log store, last_lsn={}", m_logstore_id, + m_logdev_id, last_lsn); m_log_store->truncate(last_lsn, false /* in_memory_truncate_only */); } diff --git a/src/lib/replication/repl_dev/solo_repl_dev.cpp b/src/lib/replication/repl_dev/solo_repl_dev.cpp index bc278303a..22f2446d0 100644 --- a/src/lib/replication/repl_dev/solo_repl_dev.cpp +++ b/src/lib/replication/repl_dev/solo_repl_dev.cpp @@ -10,7 +10,7 @@ namespace homestore { SoloReplDev::SoloReplDev(superblk< repl_dev_superblk >&& rd_sb, bool load_existing) : m_rd_sb{std::move(rd_sb)}, m_group_id{m_rd_sb->group_id} { if (load_existing) { - logstore_service().open_logdev(m_rd_sb->logdev_id); + logstore_service().open_logdev(m_rd_sb->logdev_id, flush_mode_t::TIMER); logstore_service() .open_log_store(m_rd_sb->logdev_id, m_rd_sb->logstore_id, true /* append_mode */) .thenValue([this](auto log_store) { @@ -19,7 +19,7 @@ SoloReplDev::SoloReplDev(superblk< repl_dev_superblk >&& rd_sb, bool load_existi m_data_journal->register_log_found_cb(bind_this(SoloReplDev::on_log_found, 3)); }); } else { - m_logdev_id = logstore_service().create_new_logdev(); + m_logdev_id = logstore_service().create_new_logdev(flush_mode_t::TIMER); m_data_journal = logstore_service().create_new_log_store(m_logdev_id, true /* append_mode */); m_rd_sb->logstore_id = m_data_journal->get_store_id(); m_rd_sb->logdev_id = m_logdev_id; @@ -30,6 +30,8 @@ SoloReplDev::SoloReplDev(superblk< repl_dev_superblk >&& rd_sb, bool load_existi void SoloReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const& key, sisl::sg_list const& value, repl_req_ptr_t rreq, trace_id_t tid) { if (!rreq) { auto rreq = repl_req_ptr_t(new repl_req_ctx{}); } + + incr_pending_request_num(); auto status = rreq->init(repl_key{.server_id = 0, .term = 1, .dsn = 1, .traceID = tid}, value.size ? journal_type_t::HS_DATA_LINKED : journal_type_t::HS_DATA_INLINED, true, header, key, value.size, m_listener); @@ -60,6 +62,7 @@ void SoloReplDev::write_journal(repl_req_ptr_t rreq) { data_service().commit_blk(rreq->local_blkid()); m_listener->on_commit(rreq->lsn(), rreq->header(), rreq->key(), rreq->local_blkid(), rreq); + decr_pending_request_num(); }); } @@ -68,7 +71,6 @@ void SoloReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx uint32_t remain_size = buf.size() - sizeof(repl_journal_entry); HS_REL_ASSERT_EQ(entry->major_version, repl_journal_entry::JOURNAL_ENTRY_MAJOR, "Mismatched version of journal entry found"); - HS_REL_ASSERT_EQ(entry->code, journal_type_t::HS_DATA_LINKED, "Found a journal entry which is not data"); uint8_t const* raw_ptr = r_cast< uint8_t const* >(entry) + sizeof(repl_journal_entry); sisl::blob header{raw_ptr, entry->user_header_size}; @@ -95,11 +97,25 @@ void SoloReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx folly::Future< std::error_code > SoloReplDev::async_read(MultiBlkId const& bid, sisl::sg_list& sgs, uint32_t size, bool part_of_batch, trace_id_t tid) { - return data_service().async_read(bid, sgs, size, part_of_batch); + if (is_stopping()) { + LOGINFO("repl dev is being shutdown!"); + return folly::makeFuture< std::error_code >(std::make_error_code(std::errc::operation_canceled)); + } + incr_pending_request_num(); + auto result = data_service().async_read(bid, sgs, size, part_of_batch); + decr_pending_request_num(); + return result; } folly::Future< std::error_code > SoloReplDev::async_free_blks(int64_t, MultiBlkId const& bid, trace_id_t tid) { - return data_service().async_free_blk(bid); + if (is_stopping()) { + LOGINFO("repl dev is being shutdown!"); + return folly::makeFuture< std::error_code >(std::make_error_code(std::errc::operation_canceled)); + } + incr_pending_request_num(); + auto result = data_service().async_free_blk(bid); + decr_pending_request_num(); + return result; } uint32_t SoloReplDev::get_blk_size() const { return data_service().get_blk_size(); } diff --git a/src/lib/replication/repl_dev/solo_repl_dev.h b/src/lib/replication/repl_dev/solo_repl_dev.h index 397f461da..579506db1 100644 --- a/src/lib/replication/repl_dev/solo_repl_dev.h +++ b/src/lib/replication/repl_dev/solo_repl_dev.h @@ -58,7 +58,7 @@ class SoloReplDev : public ReplDev { bool is_ready_for_traffic() const override { return true; } void purge() override {} - std::shared_ptr deserialize_snapshot_context(sisl::io_blob_safe &snp_ctx) override { + std::shared_ptr< snapshot_context > deserialize_snapshot_context(sisl::io_blob_safe& snp_ctx) override { return nullptr; } diff --git a/src/lib/replication/service/generic_repl_svc.cpp b/src/lib/replication/service/generic_repl_svc.cpp index f357cb819..20a5f8436 100644 --- a/src/lib/replication/service/generic_repl_svc.cpp +++ b/src/lib/replication/service/generic_repl_svc.cpp @@ -78,7 +78,7 @@ hs_stats GenericReplService::get_cap_stats() const { ///////////////////// SoloReplService specializations and CP Callbacks ///////////////////////////// SoloReplService::SoloReplService(cshared< ReplApplication >& repl_app) : GenericReplService{repl_app} {} -SoloReplService::~SoloReplService(){}; +SoloReplService::~SoloReplService() {}; void SoloReplService::start() { for (auto const& [buf, mblk] : m_sb_bufs) { @@ -97,7 +97,23 @@ void SoloReplService::start() { } void SoloReplService::stop() { - // TODO: Implement graceful shutdown for soloReplService + start_stopping(); + while (true) { + auto pending_request_num = get_pending_request_num(); + if (!pending_request_num) break; + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } + + // stop all repl_devs + { + std::unique_lock lg(m_rd_map_mtx); + for (auto it = m_rd_map.begin(); it != m_rd_map.end(); ++it) { + auto rdev = std::dynamic_pointer_cast< SoloReplDev >(it->second); + rdev->stop(); + } + } + hs()->logstore_service().stop(); + hs()->data_service().stop(); } AsyncReplResult< shared< ReplDev > > SoloReplService::create_repl_dev(group_id_t group_id, @@ -110,6 +126,7 @@ AsyncReplResult< shared< ReplDev > > SoloReplService::create_repl_dev(group_id_t auto listener = m_repl_app->create_repl_dev_listener(group_id); listener->set_repl_dev(rdev); rdev->attach_listener(std::move(listener)); + incr_pending_request_num(); { std::unique_lock lg(m_rd_map_mtx); @@ -117,10 +134,12 @@ AsyncReplResult< shared< ReplDev > > SoloReplService::create_repl_dev(group_id_t if (!happened) { // We should never reach here, as we have failed to emplace in map, but couldn't find entry DEBUG_ASSERT(false, "Unable to put the repl_dev in rd map"); + decr_pending_request_num(); return make_async_error< shared< ReplDev > >(ReplServiceError::SERVER_ALREADY_EXISTS); } } + decr_pending_request_num(); return make_async_success< shared< ReplDev > >(rdev); } diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index 940d2e891..cba159954 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -131,7 +131,7 @@ if (${io_tests}) add_test(NAME DataService-Epoll COMMAND test_data_service) add_test(NAME RaftReplDev-Epoll COMMAND test_raft_repl_dev) # add_test(NAME RaftReplDevDynamic-Epoll COMMAND test_raft_repl_dev_dynamic) - # add_test(NAME SoloReplDev-Epoll COMMAND test_solo_repl_dev) + add_test(NAME SoloReplDev-Epoll COMMAND test_solo_repl_dev) endif() can_build_spdk_io_tests(spdk_tests) diff --git a/src/tests/log_store_benchmark.cpp b/src/tests/log_store_benchmark.cpp index c4e37fa25..986ab1cc7 100644 --- a/src/tests/log_store_benchmark.cpp +++ b/src/tests/log_store_benchmark.cpp @@ -55,7 +55,7 @@ class BenchLogStore { public: friend class SampleDB; BenchLogStore() { - m_logdev_id = logstore_service().create_new_logdev(); + m_logdev_id = logstore_service().create_new_logdev(flush_mode_t::EXPLICIT); m_log_store = logstore_service().create_new_log_store(m_logdev_id, true /* append_mode */); m_log_store->register_log_found_cb(bind_this(BenchLogStore::on_log_found, 3)); m_nth_entry.store(0); diff --git a/src/tests/test_log_dev.cpp b/src/tests/test_log_dev.cpp index 7bde7bc12..45ecee96f 100644 --- a/src/tests/test_log_dev.cpp +++ b/src/tests/test_log_dev.cpp @@ -158,9 +158,10 @@ class LogDevTest : public ::testing::Test { } } - void insert_batch_sync(std::shared_ptr< HomeLogStore > log_store, logstore_seq_num_t& lsn, int64_t batch, uint32_t fixed_size = 0) { + void insert_batch_sync(std::shared_ptr< HomeLogStore > log_store, logstore_seq_num_t& lsn, int64_t batch, + uint32_t fixed_size = 0) { bool io_memory{false}; - std::vector data_vector; + std::vector< test_log_data* > data_vector; for (int64_t i = 0; i < batch; ++i) { auto* d = prepare_data(lsn + i, io_memory, fixed_size); @@ -246,20 +247,16 @@ class LogDevTest : public ::testing::Test { logid_t get_last_truncate_idx(logdev_id_t logdev_id) { auto status = logstore_service().get_logdev(logdev_id)->get_status(0); - if (status.contains("last_truncate_log_idx")) { - return s_cast(status["last_truncate_log_idx"]); - } + if (status.contains("last_truncate_log_idx")) { return s_cast< logid_t >(status["last_truncate_log_idx"]); } LOGERROR("Failed to get last_truncate_log_idx from logdev status for logdev_id {}", logdev_id); - return static_cast(-1); + return static_cast< logid_t >(-1); } logid_t get_current_log_idx(logdev_id_t logdev_id) { auto status = logstore_service().get_logdev(logdev_id)->get_status(0); - if (status.contains("current_log_idx")) { - return s_cast(status["current_log_idx"]); - } + if (status.contains("current_log_idx")) { return s_cast< logid_t >(status["current_log_idx"]); } LOGERROR("Failed to get current_log_idx from logdev status for logdev_id {}", logdev_id); - return static_cast(-1); + return static_cast< logid_t >(-1); } }; @@ -268,7 +265,7 @@ TEST_F(LogDevTest, WriteSyncThenRead) { for (uint32_t iteration{0}; iteration < iterations; ++iteration) { LOGINFO("Iteration {}", iteration); - auto logdev_id = logstore_service().create_new_logdev(); + auto logdev_id = logstore_service().create_new_logdev(flush_mode_t::EXPLICIT); s_max_flush_multiple = logstore_service().get_logdev(logdev_id)->get_flush_size_multiple(); auto log_store = logstore_service().create_new_log_store(logdev_id, false); const auto store_id = log_store->get_store_id(); @@ -288,7 +285,7 @@ TEST_F(LogDevTest, WriteSyncThenRead) { TEST_F(LogDevTest, Rollback) { LOGINFO("Step 1: Create a single logstore to start rollback test"); - auto logdev_id = logstore_service().create_new_logdev(); + auto logdev_id = logstore_service().create_new_logdev(flush_mode_t::EXPLICIT); s_max_flush_multiple = logstore_service().get_logdev(logdev_id)->get_flush_size_multiple(); auto log_store = logstore_service().create_new_log_store(logdev_id, false); auto store_id = log_store->get_store_id(); @@ -296,7 +293,7 @@ TEST_F(LogDevTest, Rollback) { auto restart = [&]() { std::promise< bool > p; auto starting_cb = [&]() { - logstore_service().open_logdev(logdev_id); + logstore_service().open_logdev(logdev_id, flush_mode_t::EXPLICIT); logstore_service().open_log_store(logdev_id, store_id, false /* append_mode */).thenValue([&](auto store) { log_store = store; p.set_value(true); @@ -355,7 +352,7 @@ TEST_F(LogDevTest, Rollback) { TEST_F(LogDevTest, ReTruncate) { LOGINFO("Step 1: Create a single logstore to start re-truncate test"); - auto logdev_id = logstore_service().create_new_logdev(); + auto logdev_id = logstore_service().create_new_logdev(flush_mode_t::EXPLICIT); s_max_flush_multiple = logstore_service().get_logdev(logdev_id)->get_flush_size_multiple(); auto log_store = logstore_service().create_new_log_store(logdev_id, false); @@ -382,7 +379,7 @@ TEST_F(LogDevTest, ReTruncate) { TEST_F(LogDevTest, TruncateWithExceedingLSN) { LOGINFO("Step 1: Create a single logstore to start truncate with exceeding LSN test"); - auto logdev_id = logstore_service().create_new_logdev(); + auto logdev_id = logstore_service().create_new_logdev(flush_mode_t::EXPLICIT); s_max_flush_multiple = logstore_service().get_logdev(logdev_id)->get_flush_size_multiple(); auto log_store = logstore_service().create_new_log_store(logdev_id, false); @@ -426,7 +423,7 @@ TEST_F(LogDevTest, TruncateWithExceedingLSN) { TEST_F(LogDevTest, TruncateAfterRestart) { LOGINFO("Step 1: Create a single logstore to start truncate with overlapping LSN test"); - auto logdev_id = logstore_service().create_new_logdev(); + auto logdev_id = logstore_service().create_new_logdev(flush_mode_t::EXPLICIT); s_max_flush_multiple = logstore_service().get_logdev(logdev_id)->get_flush_size_multiple(); auto log_store = logstore_service().create_new_log_store(logdev_id, false); auto store_id = log_store->get_store_id(); @@ -434,7 +431,7 @@ TEST_F(LogDevTest, TruncateAfterRestart) { auto restart = [&]() { std::promise< bool > p; auto starting_cb = [&]() { - logstore_service().open_logdev(logdev_id); + logstore_service().open_logdev(logdev_id, flush_mode_t::EXPLICIT); logstore_service().open_log_store(logdev_id, store_id, false /* append_mode */).thenValue([&](auto store) { log_store = store; p.set_value(true); @@ -477,13 +474,12 @@ TEST_F(LogDevTest, TruncateAfterRestart) { TEST_F(LogDevTest, TruncateAcrossMultipleStores) { LOGINFO("Step 1: Create 3 log stores to start truncate across multiple stores test"); - auto logdev_id = logstore_service().create_new_logdev(); + auto logdev_id = logstore_service().create_new_logdev(flush_mode_t::EXPLICIT); s_max_flush_multiple = logstore_service().get_logdev(logdev_id)->get_flush_size_multiple(); auto store1 = logstore_service().create_new_log_store(logdev_id, false); auto store2 = logstore_service().create_new_log_store(logdev_id, false); auto store3 = logstore_service().create_new_log_store(logdev_id, false); - LOGINFO("Step 2: Insert 100 entries to store {}", store1->get_store_id()); logstore_seq_num_t cur_lsn = 0; kickstart_inserts(store1, cur_lsn, 100); @@ -644,15 +640,15 @@ TEST_F(LogDevTest, TruncateAcrossMultipleStores) { TEST_F(LogDevTest, TruncateLogsAfterFlushAndRestart) { LOGINFO("Step 1: Create a single logstore to start truncate-logs-after-flush-and-restart test"); - auto logdev_id = logstore_service().create_new_logdev(); + auto logdev_id = logstore_service().create_new_logdev(flush_mode_t::EXPLICIT); s_max_flush_multiple = logstore_service().get_logdev(logdev_id)->get_flush_size_multiple(); auto log_store = logstore_service().create_new_log_store(logdev_id, false); auto store_id = log_store->get_store_id(); auto restart = [&]() { - std::promise < bool > p; + std::promise< bool > p; auto starting_cb = [&]() { - logstore_service().open_logdev(logdev_id); + logstore_service().open_logdev(logdev_id, flush_mode_t::EXPLICIT); logstore_service().open_log_store(logdev_id, store_id, false /* append_mode */).thenValue([&](auto store) { log_store = store; p.set_value(true); @@ -712,7 +708,7 @@ TEST_F(LogDevTest, CreateRemoveLogDev) { ASSERT_EQ(vdev->num_descriptors(), 0); for (uint32_t i{0}; i < num_logdev; ++i) { - auto id = logstore_service().create_new_logdev(); + auto id = logstore_service().create_new_logdev(flush_mode_t::EXPLICIT); s_max_flush_multiple = logstore_service().get_logdev(id)->get_flush_size_multiple(); auto store = logstore_service().create_new_log_store(id, false); log_stores.push_back(store); @@ -760,7 +756,7 @@ TEST_F(LogDevTest, DeleteUnopenedLogDev) { // Test deletion of unopened logdev. std::set< logdev_id_t > id_set, unopened_id_set; for (uint32_t i{0}; i < num_logdev; ++i) { - auto id = logstore_service().create_new_logdev(); + auto id = logstore_service().create_new_logdev(flush_mode_t::EXPLICIT); id_set.insert(id); if (i >= num_logdev / 2) { unopened_id_set.insert(id); } s_max_flush_multiple = logstore_service().get_logdev(id)->get_flush_size_multiple(); @@ -784,7 +780,7 @@ TEST_F(LogDevTest, DeleteUnopenedLogDev) { auto starting_cb = [&]() { auto it = id_set.begin(); for (uint32_t i{0}; i < id_set.size() / 2; i++, it++) { - logstore_service().open_logdev(*it); + logstore_service().open_logdev(*it, flush_mode_t::EXPLICIT); } }; start_homestore(true /* restart */, starting_cb); diff --git a/src/tests/test_log_store.cpp b/src/tests/test_log_store.cpp index 3b1b2c60b..8f18d71f2 100644 --- a/src/tests/test_log_store.cpp +++ b/src/tests/test_log_store.cpp @@ -455,7 +455,7 @@ class SampleDB { for (uint32_t i{0}; i < n_log_stores; ++i) { SampleLogStoreClient* client = m_log_store_clients[i].get(); - logstore_service().open_logdev(client->m_logdev_id); + logstore_service().open_logdev(client->m_logdev_id, flush_mode_t::EXPLICIT); logstore_service() .open_log_store(client->m_logdev_id, client->m_store_id, false /* append_mode */) .thenValue([i, this, client](auto log_store) { client->set_log_store(log_store); }); @@ -479,7 +479,7 @@ class SampleDB { std::vector< logdev_id_t > logdev_id_vec; for (uint32_t i{0}; i < n_log_devs; ++i) { - logdev_id_vec.push_back(logstore_service().create_new_logdev()); + logdev_id_vec.push_back(logstore_service().create_new_logdev(flush_mode_t::EXPLICIT)); } for (uint32_t i{0}; i < n_log_stores; ++i) { @@ -1225,7 +1225,7 @@ TEST_F(LogStoreTest, WriteSyncThenRead) { for (uint32_t iteration{0}; iteration < iterations; ++iteration) { LOGINFO("Iteration {}", iteration); - auto logdev_id = logstore_service().create_new_logdev(); + auto logdev_id = logstore_service().create_new_logdev(flush_mode_t::EXPLICIT); auto tmp_log_store = logstore_service().create_new_log_store(logdev_id, false); const auto store_id = tmp_log_store->get_store_id(); LOGINFO("Created new log store -> id {}", store_id); diff --git a/src/tests/test_log_store_long_run.cpp b/src/tests/test_log_store_long_run.cpp index 5fd0ec21f..5a7437754 100644 --- a/src/tests/test_log_store_long_run.cpp +++ b/src/tests/test_log_store_long_run.cpp @@ -294,7 +294,7 @@ class LogStoreLongRun : public ::testing::Test { HS_SETTINGS_FACTORY().save(); for (uint32_t i{0}; i < n_log_stores; ++i) { SampleLogStoreClient* client = m_log_store_clients[i].get(); - logstore_service().open_logdev(client->m_logdev_id); + logstore_service().open_logdev(client->m_logdev_id, flush_mode_t::EXPLICIT); logstore_service() .open_log_store(client->m_logdev_id, client->m_store_id, false /* append_mode */) .thenValue([i, this, client](auto log_store) { client->set_log_store(log_store); }); @@ -318,7 +318,7 @@ class LogStoreLongRun : public ::testing::Test { std::vector< logdev_id_t > logdev_id_vec; for (uint32_t i{0}; i < n_log_devs; ++i) - logdev_id_vec.push_back(logstore_service().create_new_logdev()); + logdev_id_vec.push_back(logstore_service().create_new_logdev(flush_mode_t::EXPLICIT)); for (uint32_t i{0}; i < n_log_stores; ++i) m_log_store_clients.push_back(std::make_unique< SampleLogStoreClient >( @@ -466,7 +466,7 @@ class LogStoreLongRun : public ::testing::Test { validate_num_stores(); // Create a new logstore. - auto logdev_id = logstore_service().create_new_logdev(); + auto logdev_id = logstore_service().create_new_logdev(flush_mode_t::EXPLICIT); m_log_store_clients.push_back(std::make_unique< SampleLogStoreClient >( logdev_id, bind_this(LogStoreLongRun::on_log_insert_completion, 3))); validate_num_stores(); diff --git a/src/tests/test_solo_repl_dev.cpp b/src/tests/test_solo_repl_dev.cpp index ec45ef5b4..aaec8851f 100644 --- a/src/tests/test_solo_repl_dev.cpp +++ b/src/tests/test_solo_repl_dev.cpp @@ -99,6 +99,7 @@ class SoloReplDevTest : public testing::Test { void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, MultiBlkId const& blkids, cintrusive< repl_req_ctx >& ctx) override { + LOGINFO("Received on_commit lsn={}", lsn); if (ctx == nullptr) { m_test.validate_replay(*repl_dev(), lsn, header, key, blkids); } else { @@ -232,8 +233,8 @@ class SoloReplDevTest : public testing::Test { uint32_t size = blkids.blk_count() * g_block_size; if (size) { auto read_sgs = HSTestHelper::create_sgs(size, size); - LOGDEBUG("[{}] Validating replay of lsn={} blkid = {}", boost::uuids::to_string(rdev.group_id()), lsn, - blkids.to_string()); + LOGINFO("[{}] Validating replay of lsn={} blkid = {}", boost::uuids::to_string(rdev.group_id()), lsn, + blkids.to_string()); rdev.async_read(blkids, read_sgs, size) .thenValue([this, hdr = *jhdr, read_sgs, lsn, blkids, &rdev](auto&& err) { RELEASE_ASSERT(!err, "Error during async_read"); @@ -243,8 +244,8 @@ class SoloReplDevTest : public testing::Test { HSTestHelper::validate_data_buf(uintptr_cast(iov.iov_base), iov.iov_len, hdr.data_pattern); iomanager.iobuf_free(uintptr_cast(iov.iov_base)); } - LOGDEBUG("[{}] Replay of lsn={} blkid={} validated successfully", - boost::uuids::to_string(rdev.group_id()), lsn, blkids.to_string()); + LOGINFO("[{}] Replay of lsn={} blkid={} validated successfully", + boost::uuids::to_string(rdev.group_id()), lsn, blkids.to_string()); m_task_waiter.one_complete(); }); } else { @@ -258,15 +259,15 @@ class SoloReplDevTest : public testing::Test { req->read_sgs = HSTestHelper::create_sgs(req->write_sgs.size, req->write_sgs.size); auto const cap = hs()->repl_service().get_cap_stats(); - LOGDEBUG("Write complete with cap stats: used={} total={}", cap.used_capacity, cap.total_capacity); + LOGINFO("Write complete with cap stats: used={} total={}", cap.used_capacity, cap.total_capacity); rdev.async_read(req->written_blkids, req->read_sgs, req->read_sgs.size) .thenValue([this, &rdev, req](auto&& err) { RELEASE_ASSERT(!err, "Error during async_read"); - LOGDEBUG("[{}] Write complete with lsn={} for size={} blkids={}", - boost::uuids::to_string(rdev.group_id()), req->lsn(), req->write_sgs.size, - req->written_blkids.to_string()); + LOGINFO("[{}] Write complete with lsn={} for size={} blkids={}", + boost::uuids::to_string(rdev.group_id()), req->lsn(), req->write_sgs.size, + req->written_blkids.to_string()); auto hdr = r_cast< test_repl_req::journal_header* >(req->header->bytes()); HS_REL_ASSERT_EQ(hdr->data_size, req->read_sgs.size, "journal hdr data size mismatch with actual size"); @@ -298,7 +299,9 @@ TEST_F(SoloReplDevTest, TestRandomSizedDataBlock) { uint32_t key_size = rand() % 512 + 8; this->write_io(key_size, nblks * g_block_size, g_block_size); }); + this->m_io_runner.execute().get(); + LOGINFO("Step 2: Restart homestore and validate replay data.", g_block_size); this->m_task_waiter.start([this]() { this->restart(); }).get(); } @@ -306,6 +309,7 @@ TEST_F(SoloReplDevTest, TestHeaderOnly) { LOGINFO("Step 1: run on worker threads to schedule write"); this->m_io_runner.set_task([this]() { this->write_io(0u, 0u, g_block_size); }); this->m_io_runner.execute().get(); + LOGINFO("Step 2: Restart homestore and validate replay data.", g_block_size); this->m_task_waiter.start([this]() { this->restart(); }).get(); }