Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.7.8"
version = "6.8.0"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
18 changes: 11 additions & 7 deletions src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ struct repl_req_ctx;
using raft_buf_ptr_t = nuraft::ptr< nuraft::buffer >;
using raft_cluster_config_ptr_t = nuraft::ptr< nuraft::cluster_config >;
using repl_req_ptr_t = boost::intrusive_ptr< repl_req_ctx >;
using trace_id_t = u_int64_t;

VENUM(repl_req_state_t, uint32_t,
INIT = 0, // Initial state
Expand All @@ -53,9 +54,10 @@ static constexpr uint64_t HOMESTORE_RESYNC_DATA_MAGIC = 0xa65dbd27c213f327;
static constexpr uint32_t HOMESTORE_RESYNC_DATA_PROTOCOL_VERSION_V1 = 0x01;

struct repl_key {
int32_t server_id{0}; // Server Id which this req is originated from
uint64_t term; // RAFT term number
uint64_t dsn{0}; // Data sequence number to tie the data with the raft journal entry
int32_t server_id{0}; // Server Id which this req is originated from
uint64_t term; // RAFT term number
uint64_t dsn{0}; // Data sequence number to tie the data with the raft journal entry
trace_id_t traceID{0}; // tracing ID provided by application that connects logs.

struct Hasher {
size_t operator()(repl_key const& rk) const {
Expand Down Expand Up @@ -120,6 +122,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
repl_key const& rkey() const { return m_rkey; }
uint64_t dsn() const { return m_rkey.dsn; }
uint64_t term() const { return m_rkey.term; }
trace_id_t traceID() const { return m_rkey.traceID; }
int64_t lsn() const { return m_lsn; }
bool is_proposer() const { return m_is_proposer; }
journal_type_t op_code() const { return m_op_code; }
Expand Down Expand Up @@ -385,7 +388,7 @@ class ReplDevListener {
}

/// @brief On restart, after all the logs are replayed and before joining the raft group, notify the upper layer
virtual void on_log_replay_done(const group_id_t& group_id){};
virtual void on_log_replay_done(const group_id_t& group_id) {};

private:
std::weak_ptr< ReplDev > m_repl_dev;
Expand Down Expand Up @@ -416,7 +419,7 @@ class ReplDev {
/// @param ctx - User supplied context which will be passed to listener
/// callbacks
virtual void async_alloc_write(sisl::blob const& header, sisl::blob const& key, sisl::sg_list const& value,
repl_req_ptr_t ctx) = 0;
repl_req_ptr_t ctx, trace_id_t tid = 0) = 0;

/// @brief Reads the data and returns a future to continue on
/// @param bid Block id to read
Expand All @@ -427,13 +430,14 @@ class ReplDev {
/// @return A Future with std::error_code to notify if it has successfully read the data or any error code in case
/// of failure
virtual folly::Future< std::error_code > async_read(MultiBlkId const& blkid, sisl::sg_list& sgs, uint32_t size,
bool part_of_batch = false) = 0;
bool part_of_batch = false, trace_id_t tid = 0) = 0;

/// @brief After data is replicated and on_commit to the listener is called, the blkids can be freed.
///
/// @param lsn - LSN of the old blkids that is being freed
/// @param blkid - blkids to be freed.
virtual folly::Future< std::error_code > async_free_blks(int64_t lsn, MultiBlkId const& blkid) = 0;
virtual folly::Future< std::error_code > async_free_blks(int64_t lsn, MultiBlkId const& blkid,
trace_id_t tid = 0) = 0;

/// @brief Try to switch the current replica where this method called to become a leader.
/// @return True if it is successful, false otherwise.
Expand Down
19 changes: 11 additions & 8 deletions src/lib/replication/log_store/repl_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ uint64_t ReplLogStore::append(nuraft::ptr< nuraft::log_entry >& entry) {
// We don't want to transform anything that is not an app log
if (entry->get_val_type() != nuraft::log_val_type::app_log || entry->get_buf_ptr()->size() == 0) {
ulong lsn = HomeRaftLogStore::append(entry);
RD_LOGD("append entry term={}, log_val_type={} lsn={} size={}", entry->get_term(),
RD_LOGD(NO_TRACE_ID, "None-APP log: append entry term={}, log_val_type={} lsn={} size={}", entry->get_term(),
static_cast< uint32_t >(entry->get_val_type()), lsn, entry->get_buf().size());
return lsn;
}
Expand All @@ -19,7 +19,7 @@ uint64_t ReplLogStore::append(nuraft::ptr< nuraft::log_entry >& entry) {
ulong lsn = HomeRaftLogStore::append(entry);
m_sm.link_lsn_to_req(rreq, int64_cast(lsn));

RD_LOGD("Raft Channel: Received append log entry rreq=[{}]", rreq->to_compact_string());
RD_LOGT(rreq->traceID(), "Raft Channel: Received append log entry rreq=[{}]", rreq->to_compact_string());
return lsn;
}

Expand All @@ -33,7 +33,7 @@ void ReplLogStore::write_at(ulong index, nuraft::ptr< nuraft::log_entry >& entry
repl_req_ptr_t rreq = m_sm.localize_journal_entry_finish(*entry);
HomeRaftLogStore::write_at(index, entry);
m_sm.link_lsn_to_req(rreq, int64_cast(index));
RD_LOGD("Raft Channel: Received write_at log entry rreq=[{}]", rreq->to_compact_string());
RD_LOGT(rreq->traceID(), "Raft Channel: Received write_at log entry rreq=[{}]", rreq->to_compact_string());
}

void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) {
Expand All @@ -54,8 +54,8 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) {
}
}

RD_LOGT("Raft Channel: end_of_append_batch start_lsn={} count={} num_data_to_be_written={} {}", start_lsn, count,
reqs->size(), proposer_reqs->size());
RD_LOGT(NO_TRACE_ID, "Raft Channel: end_of_append_batch start_lsn={} count={} num_data_to_be_written={} {}",
start_lsn, count, reqs->size(), proposer_reqs->size());

if (!reqs->empty()) {
// Check the map if data corresponding to all of these requests have been received and written. If not, schedule
Expand Down Expand Up @@ -85,7 +85,9 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) {
// so skip waiting data written and mark reqs as flushed here.
for (auto const& rreq : *proposer_reqs) {
if (rreq) {
RD_LOGT("Raft Channel: end_of_append_batch, I am proposer for lsn {}, only flushed log for it", rreq->lsn());
RD_LOGT(rreq->traceID(),
"Raft Channel: end_of_append_batch, I am proposer for lsn {}, only flushed log for it",
rreq->lsn());
rreq->add_state(repl_req_state_t::LOG_FLUSHED);
}
}
Expand All @@ -95,7 +97,7 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) {
auto rreq = m_sm.lsn_to_req(lsn);
if (rreq != nullptr) {
if (rreq->has_state(repl_req_state_t::ERRORED)) {
RD_LOGE("Raft Channel: rreq=[{}] met some errors before", rreq->to_compact_string());
RD_LOGE(rreq->traceID(), "Raft Channel: rreq=[{}] met some errors before", rreq->to_compact_string());
continue;
}
rreq->set_is_volatile(false);
Expand All @@ -107,9 +109,10 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) {
}

/// @brief Returns the name reported by the repl dev this log store is attached to (m_rd).
std::string ReplLogStore::rdev_name() const {
    return m_rd.rdev_name();
}
/// @brief Returns the identification string reported by the repl dev this log store is attached to (m_rd).
std::string ReplLogStore::identify_str() const {
    return m_rd.identify_str();
}

/// @brief Notifies the repl dev of the compaction point and then compacts the underlying raft log store.
/// @param compact_upto_lsn LSN handed to both the repl dev's on_compact() and HomeRaftLogStore::compact().
/// @return The result of HomeRaftLogStore::compact() for this LSN.
bool ReplLogStore::compact(ulong compact_upto_lsn) {
    // Compaction is not tied to a single request, so no trace id is attached. Log once, using the same
    // trace-id-aware macro form as the other log calls in this file.
    RD_LOGD(NO_TRACE_ID, "Raft Channel: compact_to_lsn={}", compact_upto_lsn);
    m_rd.on_compact(compact_upto_lsn);
    return HomeRaftLogStore::compact(compact_upto_lsn);
}
Expand Down
1 change: 1 addition & 0 deletions src/lib/replication/log_store/repl_log_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class ReplLogStore : public HomeRaftLogStore {

private:
std::string rdev_name() const;
std::string identify_str() const;
};

} // namespace homestore
1 change: 1 addition & 0 deletions src/lib/replication/push_data_rpc.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ native_include "boost/uuid/uuid.hpp";
namespace homestore;

table PushDataRequest {
traceID: uint64; // traceID for the REQ
issuer_replica_id : int32; // Replica id of the issuer
raft_term : uint64; // Raft term number
dsn : uint64; // Data Sequence number
Expand Down
11 changes: 7 additions & 4 deletions src/lib/replication/repl_dev/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ ReplServiceError repl_req_ctx::init(repl_key rkey, journal_type_t op_code, bool
if (has_linked_data() && !has_state(repl_req_state_t::BLK_ALLOCATED)) {
auto alloc_status = alloc_local_blks(listener, data_size);
if (alloc_status != ReplServiceError::OK) {
LOGERROR("Allocate blk for rreq failed error={}", alloc_status);
LOGERRORMOD(replication, "[traceID={}] Allocate blk for rreq failed error={}", m_rkey.traceID,
alloc_status);
}
return alloc_status;
}
Expand All @@ -56,6 +57,7 @@ void repl_req_ctx::create_journal_entry(bool is_raft_buf, int32_t server_id) {
}

m_journal_entry->code = m_op_code;
m_journal_entry->traceID = m_rkey.traceID;
m_journal_entry->server_id = server_id;
m_journal_entry->dsn = m_rkey.dsn;
m_journal_entry->user_header_size = m_header.size();
Expand Down Expand Up @@ -106,7 +108,7 @@ ReplServiceError repl_req_ctx::alloc_local_blks(cshared< ReplDevListener >& list

if (hints_result.value().committed_blk_id.has_value()) {
//if the committed_blk_id is already present, use it and skip allocation and commitment
LOGINFO("For Repl_key=[{}] data already exists, skip", rkey().to_string());
LOGINFOMOD(replication, "[traceID={}] For Repl_key=[{}] data already exists, skip", rkey().traceID, rkey().to_string());
m_local_blkid = hints_result.value().committed_blk_id.value();
add_state(repl_req_state_t::BLK_ALLOCATED);
add_state(repl_req_state_t::DATA_RECEIVED);
Expand All @@ -120,6 +122,7 @@ ReplServiceError repl_req_ctx::alloc_local_blks(cshared< ReplDevListener >& list
auto status = data_service().alloc_blks(sisl::round_up(uint32_cast(data_size), data_service().get_blk_size()),
hints_result.value(), m_local_blkid);
if (status != BlkAllocStatus::SUCCESS) {
LOGWARNMOD(replication, "[traceID={}] block allocation failure, repl_key=[{}], status=[{}]", rkey().traceID, rkey(), status);
DEBUG_ASSERT_EQ(status, BlkAllocStatus::SUCCESS, "Unable to allocate blks");
return ReplServiceError::NO_SPACE_LEFT;
}
Expand All @@ -135,7 +138,7 @@ void repl_req_ctx::set_lsn(int64_t lsn) {
"Changing lsn for request={} on the fly can cause race condition, not expected. lsn {}, m_lsn {}",
to_string(), lsn, m_lsn);
m_lsn = lsn;
LOGTRACEMOD(replication, "Setting lsn={} for request={}", lsn, to_string());
LOGTRACEMOD(replication, "[traceID={}] Setting lsn={} for request={}", rkey().traceID, lsn, to_string());
}

bool repl_req_ctx::save_pushed_data(intrusive< sisl::GenericRpcData > const& pushed_data, uint8_t const* data,
Expand Down Expand Up @@ -199,7 +202,7 @@ void repl_req_ctx::release_data() {
// explicitly clear m_buf_for_unaligned_data as unaligned pushdata/fetchdata will be saved here
m_buf_for_unaligned_data = sisl::io_blob_safe{};
if (m_pushed_data) {
LOGTRACEMOD(replication, "m_pushed_data addr={}, m_rkey={}, m_lsn={}",
LOGTRACEMOD(replication, "[traceID={}] m_pushed_data addr={}, m_rkey={}, m_lsn={}", rkey().traceID,
static_cast< void* >(m_pushed_data.get()), m_rkey.to_string(), m_lsn);
m_pushed_data->send_response();
m_pushed_data = nullptr;
Expand Down
5 changes: 3 additions & 2 deletions src/lib/replication/repl_dev/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ struct repl_journal_entry {
uint16_t minor_version{JOURNAL_ENTRY_MINOR};

journal_type_t code;
int32_t server_id; // Server id from where journal entry is originated
uint64_t dsn; // Data seq number
trace_id_t traceID; // traceID provided by application, mostly for consolidate logs.
int32_t server_id; // Server id from where journal entry is originated
uint64_t dsn; // Data seq number
uint32_t user_header_size;
uint32_t key_size;
uint32_t value_size;
Expand Down
Loading
Loading