Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.9.2"
version = "6.9.3"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
7 changes: 7 additions & 0 deletions src/include/homestore/logstore/log_store_internal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ typedef std::function< void(std::shared_ptr< HomeLogStore >, logstore_seq_num_t)

typedef int64_t logid_t;

VENUM(flush_mode_t, uint32_t, // Flush trigger modes; values are bit flags and may be OR'ed together
INLINE = 1 << 0, // Allow flushing inline as part of the append call
TIMER = 1 << 1, // Allow timer-based automatic background flush
EXPLICIT = 1 << 2, // Allow the user to explicitly call flush
);

struct logdev_key {
logid_t idx;
off_t dev_offset;
Expand Down Expand Up @@ -172,4 +178,5 @@ struct logstore_superblk {
logstore_seq_num_t m_first_seq_num{0};
};
#pragma pack()

} // namespace homestore
6 changes: 3 additions & 3 deletions src/include/homestore/logstore_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,15 @@ class LogStoreService {
* chunks. Logdev can start with zero chunks and dynamically add chunks based on write request.
* @return Newly created log dev id.
*/
logdev_id_t create_new_logdev();
logdev_id_t create_new_logdev(flush_mode_t flush_mode);

/**
* @brief Open a log dev.
*
* @param logdev_id: Logdev ID
* @return Newly created log dev id.
*/
void open_logdev(logdev_id_t logdev_id);
void open_logdev(logdev_id_t logdev_id, flush_mode_t flush_mode);

/**
* @brief Destroy a log dev.
Expand Down Expand Up @@ -178,7 +178,7 @@ class LogStoreService {
void delete_unopened_logdevs();

private:
std::shared_ptr< LogDev > create_new_logdev_internal(logdev_id_t logdev_id);
std::shared_ptr< LogDev > create_new_logdev_internal(logdev_id_t logdev_id, flush_mode_t flush_mode);
void on_meta_blk_found(const sisl::byte_view& buf, void* meta_cookie);
logdev_id_t get_next_logdev_id();
void logdev_super_blk_found(const sisl::byte_view& buf, void* meta_cookie);
Expand Down
15 changes: 9 additions & 6 deletions src/lib/logstore/log_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void LogDev::start(bool format, std::shared_ptr< JournalVirtualDev > vdev) {
// First read the info block
if (format) {
HS_LOG_ASSERT(m_logdev_meta.is_empty(), "Expected meta to be not present");
m_logdev_meta.create(m_logdev_id);
m_logdev_meta.create(m_logdev_id, m_flush_mode);
m_vdev_jd->update_data_start_offset(0);
} else {
HS_LOG_ASSERT(!m_logdev_meta.is_empty(), "Expected meta data to be read already before loading");
Expand Down Expand Up @@ -108,7 +108,6 @@ LogDev::~LogDev() {
HS_LOG_ASSERT((m_pending_flush_size.load() == 0),
"LogDev stop attempted while writes to logdev are pending completion");

if (allow_timer_flush()) stop_timer();
m_log_records.reset(nullptr);
m_logdev_meta.reset();
m_log_idx.store(0);
Expand Down Expand Up @@ -149,6 +148,7 @@ void LogDev::stop() {
// after we call stop, we need to do any pending device truncations
truncate();
m_id_logstore_map.clear();
if (allow_timer_flush()) stop_timer();
}

void LogDev::destroy() {
Expand Down Expand Up @@ -521,7 +521,7 @@ void LogDev::on_flush_completion(LogGroup* lg) {
// since we support out-of-order lsn write, so no need to guarantee the order of logstore write completion
for (auto const& [idx, req] : req_map) {
m_pending_callback++;
iomanager.run_on_forget(iomgr::reactor_regex::random_worker, iomgr::fiber_regex::syncio_only,
iomanager.run_on_forget(iomgr::reactor_regex::random_worker, /* iomgr::fiber_regex::syncio_only, */
[this, dev_offset, idx, req]() {
auto ld_key = logdev_key{idx, dev_offset};
auto comp_cb = req->log_store->get_comp_cb();
Expand Down Expand Up @@ -561,11 +561,13 @@ uint64_t LogDev::truncate() {
// Persist the logstore superblock to ensure correct start LSN during recovery. Avoid such scenario:
// 1. Follower1 appends logs up to 100, then is stopped by a sigkill.
// 2. Upon restart, a baseline resync is triggered using snapshot 2000.
// 3. Baseline resync completed with start_lsn=2001, but m_trunc_ld_key remains {0,0} since we cannot get a valid
// 3. Baseline resync completed with start_lsn=2001, but m_trunc_ld_key remains {0,0} since we cannot get a
// valid
// device offset for LSN 2000 to update it.
// 4. Follower1 appends logs from 2001 to 2500, making tail_lsn > 2000.
// 5. Get m_trunc_ld_key={0,0}, goto here and return 0 without persist.
// 6. Follower1 is killed again, after restart, its start index remains 0, misinterpreting the range as [1,2500].
// 6. Follower1 is killed again, after restart, its start index remains 0, misinterpreting the range as
// [1,2500].
m_logdev_meta.persist();
decr_pending_request_num();
return 0;
Expand Down Expand Up @@ -781,7 +783,7 @@ nlohmann::json LogDev::get_status(int verbosity) const {
/////////////////////////////// LogDevMetadata Section ///////////////////////////////////////
LogDevMetadata::LogDevMetadata() : m_sb{logdev_sb_meta_name}, m_rollback_sb{logdev_rollback_sb_meta_name} {}

logdev_superblk* LogDevMetadata::create(logdev_id_t id) {
logdev_superblk* LogDevMetadata::create(logdev_id_t id, flush_mode_t flush_mode) {
logdev_superblk* sb = m_sb.create(logdev_sb_size_needed(0));
rollback_superblk* rsb = m_rollback_sb.create(rollback_superblk::size_needed(1));

Expand All @@ -790,6 +792,7 @@ logdev_superblk* LogDevMetadata::create(logdev_id_t id) {

m_id_reserver = std::make_unique< sisl::IDReserver >();
m_sb->logdev_id = id;
m_sb->flush_mode = flush_mode;
m_sb.write();

m_rollback_sb->logdev_id = id;
Expand Down
10 changes: 3 additions & 7 deletions src/lib/logstore/log_dev.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,8 @@ struct logdev_superblk {
uint32_t num_stores{0};
uint64_t start_dev_offset{0};
logid_t key_idx{0};
flush_mode_t flush_mode;

// The meta data starts immediately after the super block
// Equivalent of:
// logstore_superblk meta[0];
Expand Down Expand Up @@ -481,7 +483,7 @@ class LogDevMetadata {
LogDevMetadata& operator=(LogDevMetadata&&) noexcept = delete;
~LogDevMetadata() = default;

logdev_superblk* create(logdev_id_t id);
logdev_superblk* create(logdev_id_t id, flush_mode_t);
void reset();
std::vector< std::pair< logstore_id_t, logstore_superblk > > load();
void persist();
Expand Down Expand Up @@ -572,12 +574,6 @@ struct logstore_info {
static std::string const logdev_sb_meta_name{"Logdev_sb"};
static std::string const logdev_rollback_sb_meta_name{"Logdev_rollback_sb"};

VENUM(flush_mode_t, uint32_t, // Various flush modes (can be or'ed together)
INLINE = 1 << 0, // Allow flush inline with the append
TIMER = 1 << 1, // Allow timer based automatic flush
EXPLICIT = 1 << 2, // Allow explcitly user calling flush
);

class LogDev : public std::enable_shared_from_this< LogDev > {
friend class HomeLogStore;

Expand Down
15 changes: 8 additions & 7 deletions src/lib/logstore/log_store_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,12 @@ logdev_id_t LogStoreService::get_next_logdev_id() {
return id;
}

logdev_id_t LogStoreService::create_new_logdev() {
logdev_id_t LogStoreService::create_new_logdev(flush_mode_t flush_mode) {
if (is_stopping()) return 0;
incr_pending_request_num();
folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx);
logdev_id_t logdev_id = get_next_logdev_id();
auto logdev = create_new_logdev_internal(logdev_id);
auto logdev = create_new_logdev_internal(logdev_id, flush_mode);
logdev->start(true /* format */, m_logdev_vdev);
COUNTER_INCREMENT(m_metrics, logdevs_count, 1);
HS_LOG(INFO, logstore, "Created log_dev={}", logdev_id);
Expand Down Expand Up @@ -189,19 +189,19 @@ void LogStoreService::delete_unopened_logdevs() {
m_unopened_logdev.clear();
}

std::shared_ptr< LogDev > LogStoreService::create_new_logdev_internal(logdev_id_t logdev_id) {
auto logdev = std::make_shared< LogDev >(logdev_id);
// Creates a LogDev instance for a fresh logdev id and registers it in the id -> logdev map.
// Caller must hold m_logdev_map_mtx in write mode (see create_new_logdev()).
// Fatally asserts if the id is already present. flush_mode is forwarded to the LogDev ctor
// and determines how/when this logdev's writes get flushed (see flush_mode_t).
std::shared_ptr< LogDev > LogStoreService::create_new_logdev_internal(logdev_id_t logdev_id, flush_mode_t flush_mode) {
auto logdev = std::make_shared< LogDev >(logdev_id, flush_mode);
const auto it = m_id_logdev_map.find(logdev_id);
HS_REL_ASSERT((it == m_id_logdev_map.end()), "logdev id {} already exists", logdev_id);
m_id_logdev_map.insert(std::make_pair<>(logdev_id, logdev));
return logdev;
}

void LogStoreService::open_logdev(logdev_id_t logdev_id) {
void LogStoreService::open_logdev(logdev_id_t logdev_id, flush_mode_t flush_mode) {
Comment thread
sanebay marked this conversation as resolved.
folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx);
const auto it = m_id_logdev_map.find(logdev_id);
if (it == m_id_logdev_map.end()) {
auto logdev = std::make_shared< LogDev >(logdev_id);
auto logdev = std::make_shared< LogDev >(logdev_id, flush_mode);
m_id_logdev_map.emplace(logdev_id, logdev);
LOGDEBUGMOD(logstore, "log_dev={} does not exist, created!", logdev_id);
}
Expand Down Expand Up @@ -238,13 +238,14 @@ void LogStoreService::logdev_super_blk_found(const sisl::byte_view& buf, void* m
folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx);
std::shared_ptr< LogDev > logdev;
auto id = sb->logdev_id;
auto flush_mode = sb->flush_mode;
const auto it = m_id_logdev_map.find(id);
// We could update the logdev map either with logdev or rollback superblks found callbacks.
if (it != m_id_logdev_map.end()) {
logdev = it->second;
HS_LOG(DEBUG, logstore, "Log dev superblk found log_dev={}", id);
} else {
logdev = std::make_shared< LogDev >(id);
logdev = std::make_shared< LogDev >(id, flush_mode);
m_id_logdev_map.emplace(id, logdev);
// when recover logdev meta blk, we get all the logdevs from the superblk. we put them in m_unopened_logdev
// too. after logdev meta blks are all recovered, when a client opens a logdev, we remove it from
Expand Down
8 changes: 4 additions & 4 deletions src/lib/replication/log_store/home_raft_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ HomeRaftLogStore::HomeRaftLogStore(logdev_id_t logdev_id, logstore_id_t logstore
m_dummy_log_entry = nuraft::cs_new< nuraft::log_entry >(0, nuraft::buffer::alloc(0), nuraft::log_val_type::app_log);

if (logstore_id == UINT32_MAX) {
m_logdev_id = logstore_service().create_new_logdev();
m_logdev_id = logstore_service().create_new_logdev(flush_mode_t::EXPLICIT);
m_log_store = logstore_service().create_new_log_store(m_logdev_id, true);
if (!m_log_store) { throw std::runtime_error("Failed to create log store"); }
m_logstore_id = m_log_store->get_store_id();
Expand All @@ -101,7 +101,7 @@ HomeRaftLogStore::HomeRaftLogStore(logdev_id_t logdev_id, logstore_id_t logstore
m_logdev_id = logdev_id;
m_logstore_id = logstore_id;
LOGDEBUGMOD(replication, "Opening existing home log_dev={} log_store={}", m_logdev_id, logstore_id);
logstore_service().open_logdev(m_logdev_id);
logstore_service().open_logdev(m_logdev_id, flush_mode_t::EXPLICIT);
Comment thread
sanebay marked this conversation as resolved.
m_log_store_future = logstore_service()
.open_log_store(m_logdev_id, logstore_id, true, log_found_cb, log_replay_done_cb)
.thenValue([this](auto log_store) {
Expand Down Expand Up @@ -382,8 +382,8 @@ ulong HomeRaftLogStore::last_durable_index() {

void HomeRaftLogStore::purge_all_logs() {
auto last_lsn = m_log_store->get_contiguous_issued_seq_num(m_last_durable_lsn);
REPL_STORE_LOG(INFO, "Store={} LogDev={}: Purging all logs in the log store, last_lsn={}",
m_logstore_id, m_logdev_id, last_lsn);
REPL_STORE_LOG(INFO, "Store={} LogDev={}: Purging all logs in the log store, last_lsn={}", m_logstore_id,
m_logdev_id, last_lsn);
m_log_store->truncate(last_lsn, false /* in_memory_truncate_only */);
}

Expand Down
26 changes: 21 additions & 5 deletions src/lib/replication/repl_dev/solo_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace homestore {
SoloReplDev::SoloReplDev(superblk< repl_dev_superblk >&& rd_sb, bool load_existing) :
m_rd_sb{std::move(rd_sb)}, m_group_id{m_rd_sb->group_id} {
if (load_existing) {
logstore_service().open_logdev(m_rd_sb->logdev_id);
logstore_service().open_logdev(m_rd_sb->logdev_id, flush_mode_t::TIMER);
logstore_service()
.open_log_store(m_rd_sb->logdev_id, m_rd_sb->logstore_id, true /* append_mode */)
.thenValue([this](auto log_store) {
Expand All @@ -19,7 +19,7 @@ SoloReplDev::SoloReplDev(superblk< repl_dev_superblk >&& rd_sb, bool load_existi
m_data_journal->register_log_found_cb(bind_this(SoloReplDev::on_log_found, 3));
});
} else {
m_logdev_id = logstore_service().create_new_logdev();
m_logdev_id = logstore_service().create_new_logdev(flush_mode_t::TIMER);
m_data_journal = logstore_service().create_new_log_store(m_logdev_id, true /* append_mode */);
m_rd_sb->logstore_id = m_data_journal->get_store_id();
m_rd_sb->logdev_id = m_logdev_id;
Expand All @@ -30,6 +30,8 @@ SoloReplDev::SoloReplDev(superblk< repl_dev_superblk >&& rd_sb, bool load_existi
void SoloReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const& key, sisl::sg_list const& value,
repl_req_ptr_t rreq, trace_id_t tid) {
if (!rreq) { auto rreq = repl_req_ptr_t(new repl_req_ctx{}); }

incr_pending_request_num();
auto status = rreq->init(repl_key{.server_id = 0, .term = 1, .dsn = 1, .traceID = tid},
value.size ? journal_type_t::HS_DATA_LINKED : journal_type_t::HS_DATA_INLINED, true,
header, key, value.size, m_listener);
Expand Down Expand Up @@ -60,6 +62,7 @@ void SoloReplDev::write_journal(repl_req_ptr_t rreq) {

data_service().commit_blk(rreq->local_blkid());
m_listener->on_commit(rreq->lsn(), rreq->header(), rreq->key(), rreq->local_blkid(), rreq);
decr_pending_request_num();
});
}

Expand All @@ -68,7 +71,6 @@ void SoloReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx
uint32_t remain_size = buf.size() - sizeof(repl_journal_entry);
HS_REL_ASSERT_EQ(entry->major_version, repl_journal_entry::JOURNAL_ENTRY_MAJOR,
"Mismatched version of journal entry found");
HS_REL_ASSERT_EQ(entry->code, journal_type_t::HS_DATA_LINKED, "Found a journal entry which is not data");

uint8_t const* raw_ptr = r_cast< uint8_t const* >(entry) + sizeof(repl_journal_entry);
sisl::blob header{raw_ptr, entry->user_header_size};
Expand All @@ -95,11 +97,25 @@ void SoloReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx

/// @brief Reads the blocks identified by @p bid into @p sgs via the data service.
/// @return Future resolving to the read's error code, or operation_canceled if shutdown started.
folly::Future< std::error_code > SoloReplDev::async_read(MultiBlkId const& bid, sisl::sg_list& sgs, uint32_t size,
                                                         bool part_of_batch, trace_id_t tid) {
    if (is_stopping()) {
        LOGINFO("repl dev is being shutdown!");
        return folly::makeFuture< std::error_code >(std::make_error_code(std::errc::operation_canceled));
    }
    // Hold the pending-request count until the async read actually completes; decrementing
    // immediately after issuing the read would let stop() proceed while the read is in flight.
    incr_pending_request_num();
    return data_service().async_read(bid, sgs, size, part_of_batch).ensure([this] {
        decr_pending_request_num();
    });
}

/// @brief Frees the blocks identified by @p bid via the data service.
/// @return Future resolving to the free's error code, or operation_canceled if shutdown started.
folly::Future< std::error_code > SoloReplDev::async_free_blks(int64_t, MultiBlkId const& bid, trace_id_t tid) {
    if (is_stopping()) {
        LOGINFO("repl dev is being shutdown!");
        return folly::makeFuture< std::error_code >(std::make_error_code(std::errc::operation_canceled));
    }
    // Keep the pending-request count held until the async free completes (not just until it is
    // issued), so stop()'s drain loop cannot race past an in-flight free.
    incr_pending_request_num();
    return data_service().async_free_blk(bid).ensure([this] { decr_pending_request_num(); });
}

uint32_t SoloReplDev::get_blk_size() const { return data_service().get_blk_size(); }
Expand Down
2 changes: 1 addition & 1 deletion src/lib/replication/repl_dev/solo_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class SoloReplDev : public ReplDev {
bool is_ready_for_traffic() const override { return true; }
void purge() override {}

std::shared_ptr<snapshot_context> deserialize_snapshot_context(sisl::io_blob_safe &snp_ctx) override {
// Stub: SoloReplDev does not support snapshots, so deserialization always yields nullptr.
std::shared_ptr< snapshot_context > deserialize_snapshot_context(sisl::io_blob_safe& snp_ctx) override {
return nullptr;
}

Expand Down
23 changes: 21 additions & 2 deletions src/lib/replication/service/generic_repl_svc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ hs_stats GenericReplService::get_cap_stats() const {

///////////////////// SoloReplService specializations and CP Callbacks /////////////////////////////
SoloReplService::SoloReplService(cshared< ReplApplication >& repl_app) : GenericReplService{repl_app} {}
SoloReplService::~SoloReplService(){};
SoloReplService::~SoloReplService() {};

void SoloReplService::start() {
for (auto const& [buf, mblk] : m_sb_bufs) {
Expand All @@ -97,7 +97,23 @@ void SoloReplService::start() {
}

void SoloReplService::stop() {
// TODO: Implement graceful shutdown for soloReplService
start_stopping();
while (true) {
auto pending_request_num = get_pending_request_num();
if (!pending_request_num) break;
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}

// stop all repl_devs
{
std::unique_lock lg(m_rd_map_mtx);
for (auto it = m_rd_map.begin(); it != m_rd_map.end(); ++it) {
auto rdev = std::dynamic_pointer_cast< SoloReplDev >(it->second);
rdev->stop();
}
}
hs()->logstore_service().stop();
hs()->data_service().stop();
}

AsyncReplResult< shared< ReplDev > > SoloReplService::create_repl_dev(group_id_t group_id,
Expand All @@ -110,17 +126,20 @@ AsyncReplResult< shared< ReplDev > > SoloReplService::create_repl_dev(group_id_t
auto listener = m_repl_app->create_repl_dev_listener(group_id);
listener->set_repl_dev(rdev);
rdev->attach_listener(std::move(listener));
incr_pending_request_num();

{
std::unique_lock lg(m_rd_map_mtx);
auto [it, happened] = m_rd_map.emplace(group_id, rdev);
if (!happened) {
// We should never reach here, as we have failed to emplace in map, but couldn't find entry
DEBUG_ASSERT(false, "Unable to put the repl_dev in rd map");
decr_pending_request_num();
return make_async_error< shared< ReplDev > >(ReplServiceError::SERVER_ALREADY_EXISTS);
}
}

decr_pending_request_num();
return make_async_success< shared< ReplDev > >(rdev);
}

Expand Down
2 changes: 1 addition & 1 deletion src/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ if (${io_tests})
add_test(NAME DataService-Epoll COMMAND test_data_service)
add_test(NAME RaftReplDev-Epoll COMMAND test_raft_repl_dev)
# add_test(NAME RaftReplDevDynamic-Epoll COMMAND test_raft_repl_dev_dynamic)
# add_test(NAME SoloReplDev-Epoll COMMAND test_solo_repl_dev)
add_test(NAME SoloReplDev-Epoll COMMAND test_solo_repl_dev)
endif()

can_build_spdk_io_tests(spdk_tests)
Expand Down
2 changes: 1 addition & 1 deletion src/tests/log_store_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class BenchLogStore {
public:
friend class SampleDB;
BenchLogStore() {
m_logdev_id = logstore_service().create_new_logdev();
m_logdev_id = logstore_service().create_new_logdev(flush_mode_t::EXPLICIT);
m_log_store = logstore_service().create_new_log_store(m_logdev_id, true /* append_mode */);
m_log_store->register_log_found_cb(bind_this(BenchLogStore::on_log_found, 3));
m_nth_entry.store(0);
Expand Down
Loading
Loading