Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KVStore: Refine parallel prehandle snapshot (part-3) #9198

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 17 additions & 250 deletions dbms/src/Storages/DeltaMerge/Decode/SSTFilesToBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,97 +33,22 @@ extern const int ILLFORMAT_RAFT_ROW;

namespace DB::DM
{

SSTFilesToBlockInputStream::SSTFilesToBlockInputStream( //
RegionPtr region_,
UInt64 snapshot_index_,
const SSTViewVec & snaps_,
const TiFlashRaftProxyHelper * proxy_helper_,
SnapshotSSTReader && snap_reader_,
TMTContext & tmt_,
std::optional<SSTScanSoftLimit> && soft_limit_,
std::shared_ptr<PreHandlingTrace::Item> prehandle_task_,
SSTFilesToBlockInputStreamOpts && opts_)
: region(std::move(region_))
, snapshot_index(snapshot_index_)
, snaps(snaps_)
, proxy_helper(proxy_helper_)
, tmt(tmt_)
, soft_limit(std::move(soft_limit_))
, prehandle_task(prehandle_task_)
, opts(std::move(opts_))
, snap_reader(std::move(snap_reader_))
, log(Logger::get(opts.log_prefix, fmt::format("region_id={} split_id={}", region->id(), snap_reader.getSplitId())))
{
const size_t split_id
= soft_limit.has_value() ? soft_limit.value().split_id : DM::SSTScanSoftLimit::HEAD_OR_ONLY_SPLIT;
log = Logger::get(opts.log_prefix, fmt::format("region_id={} split_id={}", region->id(), split_id));

// We have to initialize sst readers at an earlier stage,
// due to prehandle snapshot of single region feature in raftstore v2.
std::vector<SSTView> ssts_default;
std::vector<SSTView> ssts_write;
std::vector<SSTView> ssts_lock;

auto make_inner_func = [&](const TiFlashRaftProxyHelper * proxy_helper,
SSTView snap,
SSTReader::RegionRangeFilter range,
const LoggerPtr & log_) {
return std::make_unique<MonoSSTReader>(proxy_helper, snap, range, log_);
};
for (UInt64 i = 0; i < snaps.len; ++i)
{
const auto & snapshot = snaps.views[i];
switch (snapshot.type)
{
case ColumnFamilyType::Default:
ssts_default.push_back(snapshot);
break;
case ColumnFamilyType::Write:
ssts_write.push_back(snapshot);
break;
case ColumnFamilyType::Lock:
ssts_lock.push_back(snapshot);
break;
}
}

// Pass the log to SSTReader in order to filter logs by table_id suffix
if (!ssts_default.empty())
{
default_cf_reader = std::make_unique<MultiSSTReader<MonoSSTReader, SSTView>>(
proxy_helper,
ColumnFamilyType::Default,
make_inner_func,
ssts_default,
log,
region->getRange());
}
if (!ssts_write.empty())
{
write_cf_reader = std::make_unique<MultiSSTReader<MonoSSTReader, SSTView>>(
proxy_helper,
ColumnFamilyType::Write,
make_inner_func,
ssts_write,
log,
region->getRange());
}
if (!ssts_lock.empty())
{
lock_cf_reader = std::make_unique<MultiSSTReader<MonoSSTReader, SSTView>>(
proxy_helper,
ColumnFamilyType::Lock,
make_inner_func,
ssts_lock,
log,
region->getRange());
}
LOG_INFO(
log,
"Finish Construct MultiSSTReader, write={} lock={} default={} region_id={} snapshot_index={}",
ssts_write.size(),
ssts_lock.size(),
ssts_default.size(),
this->region->id(),
snapshot_index);

// Init stat info.
process_keys.default_cf = 0;
process_keys.write_cf = 0;
Expand All @@ -137,46 +62,26 @@ SSTFilesToBlockInputStream::~SSTFilesToBlockInputStream() = default;

void SSTFilesToBlockInputStream::readPrefix() {}

/// Sanity-check that `reader` for column family `cf` ended in a legal state.
/// A drained (or absent) reader is always fine. If keys remain, the stream
/// must have been stopped by `soft_limit`, and the reader's current key must
/// already lie strictly beyond the soft limit's raw end; otherwise we abort
/// via RUNTIME_CHECK_MSG.
void SSTFilesToBlockInputStream::checkFinishedState(SSTReaderPtr & reader, ColumnFamilyType cf)
{
    // Nothing to verify when the reader is absent or fully consumed.
    if (!reader || !reader->remained())
        return;

    // Keys remain: the only legitimate cause is an early stop by `soft_limit`.
    RUNTIME_CHECK_MSG(soft_limit.has_value(), "soft_limit.has_value(), cf={}", magic_enum::enum_name(cf));
    const BaseBuffView current_key = reader->keyView();
    RUNTIME_CHECK_MSG(
        buffToStrView(current_key) > soft_limit.value().raw_end,
        "cur > raw_end, cf={}",
        magic_enum::enum_name(cf));
}

/// Tear-down hook of the input stream: verify every reader ended in a legal
/// state (fully drained, or stopped by the soft limit), then release all
/// readers so no further blocks can be produced.
/// NOTE(review): this text comes from a diff view and shows both the legacy
/// per-cf `checkFinishedState(...)` calls and the aggregated
/// `snap_reader.checkFinishedState()` / `snap_reader.reset()` — confirm
/// against the full file which set is actually kept.
void SSTFilesToBlockInputStream::readSuffix()
{
// For aborted task, we don't need to check the finish state
if (!prehandle_task->isAbort())
{
checkFinishedState(write_cf_reader, ColumnFamilyType::Write);
checkFinishedState(default_cf_reader, ColumnFamilyType::Default);
checkFinishedState(lock_cf_reader, ColumnFamilyType::Lock);
snap_reader.checkFinishedState();
}

// reset all SSTReaders and return without writing blocks any more.
write_cf_reader.reset();
default_cf_reader.reset();
lock_cf_reader.reset();
snap_reader.reset();
}

Block SSTFilesToBlockInputStream::read()
{
std::string loaded_write_cf_key;

while (write_cf_reader && write_cf_reader->remained())
while (snap_reader.write_cf_reader && snap_reader.write_cf_reader->remained())
{
bool should_stop_advancing = maybeStopBySoftLimit(ColumnFamilyType::Write, write_cf_reader.get());
bool should_stop_advancing
= snap_reader.maybeStopBySoftLimit(ColumnFamilyType::Write, snap_reader.write_cf_reader.get());
if (should_stop_advancing)
{
// Load the last batch
Expand All @@ -191,8 +96,8 @@ Block SSTFilesToBlockInputStream::read()
// the lock column family, we will load all key-values which rowkeys are equal
// or less that the last rowkey from the write column family.
{
BaseBuffView key = write_cf_reader->keyView();
BaseBuffView value = write_cf_reader->valueView();
BaseBuffView key = snap_reader.write_cf_reader->keyView();
BaseBuffView value = snap_reader.write_cf_reader->valueView();
auto tikv_key = TiKVKey(key.data, key.len);
region->insert(ColumnFamilyType::Write, std::move(tikv_key), TiKVValue(value.data, value.len));
++process_keys.write_cf;
Expand All @@ -202,7 +107,7 @@ Block SSTFilesToBlockInputStream::read()
loaded_write_cf_key.assign(key.data, key.len);
}
} // Notice: `key`, `value` are string-view-like object, should never use after `next` called
write_cf_reader->next();
snap_reader.write_cf_reader->next();

// If there is enough data to form a Block, we will load all keys before `loaded_write_cf_key` in other cf.
if (process_keys.write_cf % opts.expected_size == 0)
Expand Down Expand Up @@ -243,14 +148,14 @@ void SSTFilesToBlockInputStream::loadCFDataFromSST(
DecodedTiKVKey * last_loaded_rowkey;
if (cf == ColumnFamilyType::Default)
{
reader = default_cf_reader.get();
reader = snap_reader.default_cf_reader.get();
p_process_keys = &process_keys.default_cf;
p_process_keys_bytes = &process_keys.default_cf_bytes;
last_loaded_rowkey = &default_last_loaded_rowkey;
}
else if (cf == ColumnFamilyType::Lock)
{
reader = lock_cf_reader.get();
reader = snap_reader.lock_cf_reader.get();
p_process_keys = &process_keys.lock_cf;
p_process_keys_bytes = &process_keys.lock_cf_bytes;
last_loaded_rowkey = &lock_last_loaded_rowkey;
Expand All @@ -260,7 +165,7 @@ void SSTFilesToBlockInputStream::loadCFDataFromSST(

if (reader && reader->remained())
{
maybeSkipBySoftLimit(cf, reader);
snap_reader.maybeSkipBySoftLimit(cf, reader);
}

Stopwatch sw;
Expand All @@ -270,7 +175,7 @@ void SSTFilesToBlockInputStream::loadCFDataFromSST(
{
while (reader && reader->remained())
{
if (maybeStopBySoftLimit(cf, reader))
if (snap_reader.maybeStopBySoftLimit(cf, reader))
{
break;
}
Expand Down Expand Up @@ -329,7 +234,7 @@ void SSTFilesToBlockInputStream::loadCFDataFromSST(
// Let's try to load keys until process_keys_offset_end
while (reader && reader->remained() && *p_process_keys < process_keys_offset_end)
{
if (maybeStopBySoftLimit(cf, reader))
if (snap_reader.maybeStopBySoftLimit(cf, reader))
{
break;
}
Expand Down Expand Up @@ -395,142 +300,4 @@ Block SSTFilesToBlockInputStream::readCommitedBlock()
}
}

size_t SSTFilesToBlockInputStream::getApproxBytes() const
{
size_t total = 0;
if (write_cf_reader)
total += write_cf_reader->approxSize();
if (lock_cf_reader)
total += lock_cf_reader->approxSize();
if (default_cf_reader)
total += default_cf_reader->approxSize();
return total;
}

/// Pick up to `splits_count` split keys for parallel prehandling.
/// Delegates to the write column-family reader, since the write cf drives
/// the snapshot scan.
std::vector<std::string> SSTFilesToBlockInputStream::findSplitKeys(size_t splits_count) const
{
    auto split_keys = write_cf_reader->findSplitKeys(splits_count);
    return split_keys;
}

// Returning false means no skip is performed, the reader is intact.
// Returning true means skip is performed, must read from current value.
/// Advance `reader` (for column family `cf`) past the soft limit's start so
/// this split does not re-read keys that belong to the previous split:
/// seek to `raw_start`, then step over every remaining MVCC version of that
/// same primary key until a strictly greater pk is found.
bool SSTFilesToBlockInputStream::maybeSkipBySoftLimit(ColumnFamilyType cf, SSTReader * reader)
{
// No soft limit configured: the reader starts from the beginning untouched.
if (!soft_limit.has_value())
return false;
const auto & start_limit = soft_limit.value().getStartLimit();
// If start is set to "", then there is no soft limit for start.
if (!start_limit)
return false;

// No reader for this cf (e.g. the cf has no SST files): nothing to skip.
if (!reader)
return false;

if (reader && reader->remained())
{
auto key = reader->keyView();
// The reader's current key is already past `raw_start`: seeking would move
// it backwards, which is forbidden — leave the reader where it is.
if (soft_limit.value().raw_start < buffToStrView(key))
{
// This happens when there is too many untrimmed data,
// or it is already seeked.
LOG_TRACE(
log,
"Re-Seek backward is forbidden, start_limit={} current={} cf={} split_id={} region_id={}",
soft_limit.value().raw_start.toDebugString(),
Redact::keyToDebugString(key.data, key.len),
magic_enum::enum_name(cf),
soft_limit.value().split_id,
region->id());
return false;
}
}
// Safety `soft_limit` outlives returned base buff view.
reader->seek(cppStringAsBuff(soft_limit.value().raw_start));

// Skip other versions of the same PK.
// TODO(split) use seek to optimize if failed several iterations.
size_t skipped_times = 0;
while (reader && reader->remained())
{
// Read until find the next pk.
auto key = reader->keyView();
// TODO the copy could be eliminated, but with many modifications.
auto tikv_key = TiKVKey(key.data, key.len);
auto current_truncated_ts = RecordKVFormat::getRawTiDBPK(RecordKVFormat::decodeTiKVKey(tikv_key));
// If found a new pk.
if (current_truncated_ts != start_limit)
{
// Keys must be non-decreasing as the reader advances; a pk smaller than
// the start limit here means the SST ordering (or the seek) is broken.
RUNTIME_CHECK_MSG(
current_truncated_ts > start_limit,
"current pk decreases as reader advances, skipped_times={} start_raw={} start_pk={} current_pk={} "
"current_raw={} cf={} split_id={} region_id={}",
skipped_times,
soft_limit.value().raw_start.toDebugString(),
start_limit.value().toDebugString(),
current_truncated_ts.toDebugString(),
tikv_key.toDebugString(),
magic_enum::enum_name(cf),
soft_limit.value().split_id,
region->id());
LOG_INFO(
log,
"Re-Seek after skipped_times={} start_raw={} start_pk={} current_raw={} current_pk={} cf={} "
"split_id={} region_id={}",
skipped_times,
soft_limit.value().raw_start.toDebugString(),
start_limit.value().toDebugString(),
tikv_key.toDebugString(),
current_truncated_ts.toDebugString(),
magic_enum::enum_name(cf),
soft_limit.value().split_id,
region->id());
// Reader now stands on the first key of a new pk: skip succeeded.
return true;
}
skipped_times++;
reader->next();
}
// `start_limit` is the last pk of the sst file.
// The reader is exhausted without finding a newer pk — nothing left to read.
LOG_INFO(
log,
"Re-Seek to the last key of write cf start_raw={} start_pk={} cf={} split_id={} region_id={}",
soft_limit.value().raw_start.toDebugString(),
start_limit.value().toDebugString(),
magic_enum::enum_name(cf),
soft_limit.value().split_id,
region->id());
return false;
}

/// Check whether `reader` (for column family `cf`) has advanced past the soft
/// limit's end pk. Returns true after seeking the reader to its last key (so
/// subsequent `remained()` checks terminate the scan); returns false when no
/// end limit applies or the current pk is still within range.
bool SSTFilesToBlockInputStream::maybeStopBySoftLimit(ColumnFamilyType cf, SSTReader * reader)
{
// No soft limit configured: never stop early.
if (!soft_limit.has_value())
return false;
const SSTScanSoftLimit & sl = soft_limit.value();
const auto & end_limit = soft_limit.value().getEndLimit();
// An unset end limit means this split scans to the end of the data.
if (!end_limit)
return false;

// Callers only invoke this with a live reader (checked via `remained()`).
assert(reader != nullptr);
auto key = reader->keyView();
// TODO the copy could be eliminated, but with many modifications.
auto tikv_key = TiKVKey(key.data, key.len);
auto current_truncated_ts = RecordKVFormat::getRawTiDBPK(RecordKVFormat::decodeTiKVKey(tikv_key));
// Stop only when the current pk is strictly beyond the end limit: keys equal
// to the end pk still belong to this split.
if (current_truncated_ts > end_limit)
{
LOG_INFO(
log,
"Reach end for split={} current={} pk={} end_limit={} cf={} split_id={} region_id={}",
sl.toDebugString(),
tikv_key.toDebugString(),
current_truncated_ts.toDebugString(),
end_limit->toDebugString(),
magic_enum::enum_name(cf),
soft_limit.value().split_id,
region->id());
// Seek to the end of reader to prevent further check.
reader->seekToLast();
return true;
}
return false;
}
} // namespace DB::DM
Loading