8000 [Feature] Change bucket number from physical partition level to materialized index level by xiangguangyxg · Pull Request #59441 · StarRocks/starrocks · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

[Feature] Change bucket number from physical partition level to materialized index level #59441

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 21 additions & 29 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -564,14 +564,13 @@ Status OlapTablePartitionParam::remove_partitions(const std::vector<int64_t>& pa
}

Status OlapTablePartitionParam::_find_tablets_with_list_partition(
Chunk* chunk, Columns partition_columns, std::vector<OlapTablePartition*>* partitions,
std::vector<uint32_t>* indexes, std::vector<uint8_t>* selection, std::vector<int>* invalid_row_indexs,
std::vector<std::vector<std::string>>* partition_not_exist_row_values) {
Chunk* chunk, const Columns& partition_columns, const std::vector<uint32_t>& hashes,
std::vector<OlapTablePartition*>* partitions, std::vector<uint8_t>* selection,
std::vector<int>* invalid_row_indexs, std::vector<std::vector<std::string>>* partition_not_exist_row_values) {
size_t num_rows = chunk->num_rows();
ChunkRow row;
row.columns = &partition_columns;
row.index = 0;
std::vector<Column*> partition_data_columns;
ChunkRow row(&partition_columns, 0);

std::vector<const Column*> partition_data_columns;
partition_data_columns.reserve(partition_columns.size());
for (auto& column : *(row.columns)) {
partition_data_columns.emplace_back(ColumnHelper::get_data_column(column.get()));
Expand All @@ -587,11 +586,9 @@ Status OlapTablePartitionParam::_find_tablets_with_list_partition(
row.index = i;
// list partition
auto it = _partitions_map.find(&row);
if (it != _partitions_map.end() &&
(part = _partitions[it->second[(*indexes)[i] % it->second.size()]]) != nullptr &&
if (it != _partitions_map.end() && (part = _partitions[it->second[hashes[i] % it->second.size()]]) != nullptr &&
_part_contains(part, &row)) {
(*partitions)[i] = part;
(*indexes)[i] = (*indexes)[i] % part->num_buckets;
} else {
if (partition_not_exist_row_values) {
auto partition_value_items = std::make_unique<std::vector<std::string>>();
Expand Down Expand Up @@ -622,13 +619,11 @@ Status OlapTablePartitionParam::_find_tablets_with_list_partition(
}

Status OlapTablePartitionParam::_find_tablets_with_range_partition(
Chunk* chunk, Columns partition_columns, std::vector<OlapTablePartition*>* partitions,
std::vector<uint32_t>* indexes, std::vector<uint8_t>* selection, std::vector<int>* invalid_row_indexs,
std::vector<std::vector<std::string>>* partition_not_exist_row_values) {
Chunk* chunk, const Columns& partition_columns, const std::vector<uint32_t>& hashes,
std::vector<OlapTablePartition*>* partitions, std::vector<uint8_t>* selection,
std::vector<int>* invalid_row_indexs, std::vector<std::vector<std::string>>* partition_not_exist_row_values) {
size_t num_rows = chunk->num_rows();
ChunkRow row;
row.columns = &partition_columns;
row.index = 0;
ChunkRow row(&partition_columns, 0);

std::set<std::vector<std::string>, VectorCompare> partition_columns_set;
for (size_t i = 0; i < num_rows; ++i) {
Expand All @@ -639,11 +634,9 @@ Status OlapTablePartitionParam::_find_tablets_with_range_partition(
row.index = i;
// range partition
auto it = _partitions_map.upper_bound(&row);
if (it != _partitions_map.end() &&
(part = _partitions[it->second[(*indexes)[i] % it->second.size()]]) != nullptr &&
if (it != _partitions_map.end() && (part = _partitions[it->second[hashes[i] % it->second.size()]]) != nullptr &&
_part_contains(part, &row)) {
(*partitions)[i] = part;
(*indexes)[i] = (*indexes)[i] % part->num_buckets;
} else {
if (partition_not_exist_row_values) {
// only support single column partition for range partition now
Expand Down Expand Up @@ -675,13 +668,13 @@ Status OlapTablePartitionParam::_find_tablets_with_range_partition(
}

Status OlapTablePartitionParam::find_tablets(Chunk* chunk, std::vector<OlapTablePartition*>* partitions,
std::vector<uint32_t>* indexes, std::vector<uint8_t>* selection,
std::vector<uint32_t>* hashes, std::vector<uint8_t>* selection,
std::vector<int>* invalid_row_indexs, int64_t txn_id,
std::vector<std::vector<std::string>>* partition_not_exist_row_values) {
size_t num_rows = chunk->num_rows();
partitions->resize(num_rows);

_compute_hashes(chunk, indexes);
_compute_hashes(chunk, hashes);

if (!_partition_columns.empty()) {
Columns partition_columns(_partition_slot_descs.size());
Expand All @@ -700,10 +693,10 @@ Status OlapTablePartitionParam::find_tablets(Chunk* chunk, std::vector<OlapTable

bool is_list_partition = _t_param.partitions[0].__isset.in_keys;
if (is_list_partition) {
return _find_tablets_with_list_partition(chunk, partition_columns, partitions, indexes, selection,
return _find_tablets_with_list_partition(chunk, partition_columns, *hashes, partitions, selection,
invalid_row_indexs, partition_not_exist_row_values);
} else {
return _find_tablets_with_range_partition(chunk, partition_columns, partitions, indexes, selection,
return _find_tablets_with_range_partition(chunk, partition_columns, *hashes, partitions, selection,
invalid_row_indexs, partition_not_exist_row_values);
}
} else {
Expand All @@ -713,28 +706,27 @@ Status OlapTablePartitionParam::find_tablets(Chunk* chunk, std::vector<OlapTable
auto& part_ids = _partitions_map.begin()->second;
for (size_t i = 0; i < num_rows; ++i) {
if ((*selection)[i]) {
(*partitions)[i] = _partitions[part_ids[(*indexes)[i] % _partitions.size()]];
(*indexes)[i] = (*indexes)[i] % (*partitions)[i]->num_buckets;
(*partitions)[i] = _partitions[part_ids[(*hashes)[i] % _partitions.size()]];
}
}
}
return Status::OK();
}

void OlapTablePartitionParam::_compute_hashes(Chunk* chunk, std::vector<uint32_t>* indexes) {
void OlapTablePartitionParam::_compute_hashes(const Chunk* chunk, std::vector<uint32_t>* hashes) {
size_t num_rows = chunk->num_rows();
indexes->assign(num_rows, 0);
hashes->assign(num_rows, 0);

for (size_t i = 0; i < _distributed_slot_descs.size(); ++i) {
_distributed_columns[i] = chunk->get_column_by_slot_id(_distributed_slot_descs[i]->id()).get();
_distributed_columns[i]->crc32_hash(&(*indexes)[0], 0, num_rows);
_distributed_columns[i]->crc32_hash(&(*hashes)[0], 0, num_rows);
}

// if no distributed columns, use random distribution
if (_distributed_slot_descs.size() == 0) {
uint32_t r = _rand.Next();
for (auto i = 0; i < num_rows; ++i) {
(*indexes)[i] = r++;
(*hashes)[i] = r++;
}
}
}
Expand Down
30 changes: 15 additions & 15 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,11 @@ class StarRocksNodesInfo {

struct ChunkRow {
ChunkRow() = default;
ChunkRow(Columns* columns_, uint32_t index_) : columns(columns_), index(index_) {}
ChunkRow(const Columns* columns_, uint32_t index_) : columns(columns_), index(index_) {}

std::string debug_string();

Columns* columns = nullptr;
const Columns* columns = nullptr;
uint32_t index = 0;
};

Expand Down Expand Up @@ -258,7 +258,7 @@ class OlapTablePartitionParam {
// `invalid_row_indexs` stores an index such that chunk[index]
// has been filtered out because no tablet could be found for it.
// It could be any such row, because it is only used to output an error message for the user to diagnose.
Status find_tablets(Chunk* chunk, std::vector<OlapTablePartition*>* partitions, std::vector<uint32_t>* indexes,
Status find_tablets(Chunk* chunk, std::vector<OlapTablePartition*>* partitions, std::vector<uint32_t>* hashes,
std::vector<uint8_t>* selection, std::vector<int>* invalid_row_indexs, int64_t txn_id,
std::vector<std::vector<std::string>>* partition_not_exist_row_values);

Expand All @@ -276,40 +276,40 @@ class OlapTablePartitionParam {
/**
* @brief find tablets with range partition table
* @param chunk input chunk
* @param partition_columns input partition columns
* @param partition_columns input partition columns
* @param hashes input row hashes
* @param partitions output partitions
* @param indexes output partition indexes
* @param selection chunk's selection
* @param invalid_row_indexs output invalid row indexes
* @param partition_not_exist_row_values output partition not exist row values
* @return Status
*/
Status _find_tablets_with_range_partition(Chunk* chunk, Columns partition_columns,
Status _find_tablets_with_range_partition(Chunk* chunk, const Columns& partition_columns,
const std::vector<uint32_t>& hashes,
std::vector<OlapTablePartition*>* partitions,
std::vector<uint32_t>* indexes, std::vector<uint8_t>* selection,
std::vector<int>* invalid_row_indexs,
std::vector<uint8_t>* selection, std::vector<int>* invalid_row_indexs,
std::vector<std::vector<std::string>>* partition_not_exist_row_values);

/**
* @brief find tablets with list partition table
* @param chunk input chunk
* @param partition_columns input partition columns
* @param partition_columns input partition columns
* @param hashes input row hashes
* @param partitions output partitions
* @param indexes output partition indexes
* @param selection chunk's selection
* @param invalid_row_indexs output invalid row indexes
* @param partition_not_exist_row_values output partition not exist row values
* @return Status
*/
Status _find_tablets_with_list_partition(Chunk* chunk, Columns partition_columns,
Status _find_tablets_with_list_partition(Chunk* chunk, const Columns& partition_columns,
const std::vector<uint32_t>& hashes,
std::vector<OlapTablePartition*>* partitions,
std::vector<uint32_t>* indexes, std::vector<uint8_t>* selection,
std::vector<int>* invalid_row_indexs,
std::vector<uint8_t>* selection, std::vector<int>* invalid_row_indexs,
std::vector<std::vector<std::string>>* partition_not_exist_row_values);

Status _create_partition_keys(const std::vector<TExprNode>& t_exprs, ChunkRow* part_key);

void _compute_hashes(Chunk* chunk, std::vector<uint32_t>* indexes);
void _compute_hashes(const Chunk* chunk, std::vector<uint32_t>* hashes);

// check if this partition contain this key
bool _part_contains(OlapTablePartition* part, ChunkRow* key) const {
Expand All @@ -327,7 +327,7 @@ class OlapTablePartitionParam {
std::vector<SlotDescriptor*> _partition_slot_descs;
std::vector<SlotDescriptor*> _distributed_slot_descs;
Columns _partition_columns;
std::vector<Column*> _distributed_columns;
std::vector<const Column*> _distributed_columns;
std::vector<ExprContext*> _partitions_expr_ctxs;

ObjectPool _obj_pool;
Expand Down
8 changes: 4 additions & 4 deletions be/src/exec/tablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ Status OlapTableSink::_send_chunk(RuntimeState* state, Chunk* chunk, bool nonblo
if (_enable_automatic_partition && !_has_automatic_partition) {
_partition_not_exist_row_values.clear();

RETURN_IF_ERROR(_vectorized_partition->find_tablets(chunk, &_partitions, &_tablet_indexes,
RETURN_IF_ERROR(_vectorized_partition->find_tablets(chunk, &_partitions, &_record_hashes,
&_validate_selection, &invalid_row_indexs, _txn_id,
&_partition_not_exist_row_values));

Expand All @@ -699,14 +699,14 @@ Status OlapTableSink::_send_chunk(RuntimeState* state, Chunk* chunk, bool nonblo
_automatic_partition_token->wait();
RETURN_IF_ERROR(this->_automatic_partition_status);
// after the partition is created, go through the data again
RETURN_IF_ERROR(_vectorized_partition->find_tablets(chunk, &_partitions, &_tablet_indexes,
RETURN_IF_ERROR(_vectorized_partition->find_tablets(chunk, &_partitions, &_record_hashes,
&_validate_selection, &invalid_row_indexs,
_txn_id, nullptr));
}
}
} else {
RETURN_IF_ERROR(this->_automatic_partition_status);
RETURN_IF_ERROR(_vectorized_partition->find_tablets(chunk, &_partitions, &_tablet_indexes,
RETURN_IF_ERROR(_vectorized_partition->find_tablets(chunk, &_partitions, &_record_hashes,
&_validate_selection, &invalid_row_indexs, _txn_id,
nullptr));
_has_automatic_partition = false;
Expand Down Expand Up @@ -761,7 +761,7 @@ Status OlapTableSink::_send_chunk(RuntimeState* state, Chunk* chunk, bool nonblo
StarRocksMetrics::instance()->load_bytes_total.increment(serialize_size);

SCOPED_TIMER(_ts_profile->send_data_timer);
return _tablet_sink_sender->send_chunk(_schema.get(), _partitions, _tablet_indexes, _validate_select_idx,
return _tablet_sink_sender->send_chunk(_schema.get(), _partitions, _record_hashes, _validate_select_idx,
_index_id_partition_ids, chunk);
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/tablet_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ class OlapTableSink : public AsyncDataSink {
std::vector<std::unique_ptr<IndexChannel>> _channels;
std::vector<OlapTablePartition*> _partitions;
std::unordered_map<int64_t, std::set<int64_t>> _index_id_partition_ids;
std::vector<uint32_t> _tablet_indexes;
std::vector<uint32_t> _record_hashes;
// Stores the column computed as the result of the output expr
std::unique_ptr<Chunk> _output_chunk;
bool _open_done{false};
Expand Down
18 changes: 12 additions & 6 deletions be/src/exec/tablet_sink_colocate_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ TabletSinkColocateSender::TabletSinkColocateSender(

Status TabletSinkColocateSender::send_chunk(const OlapTableSchemaParam* schema,
const std::vector<OlapTablePartition*>& partitions,
const std::vector<uint32_t>& tablet_indexes,
const std::vector<uint32_t>& record_hashes,
const std::vector<uint16_t>& validate_select_idx,
std::unordered_map<int64_t, std::set<int64_t>>& index_id_partition_id,
Chunk* chunk) {
if (UNLIKELY(!_colocate_mv_index)) {
return TabletSinkSender::send_chunk(schema, partitions, tablet_indexes, validate_select_idx,
return TabletSinkSender::send_chunk(schema, partitions, record_hashes, validate_select_idx,
index_id_partition_id, chunk);
}

Expand All @@ -59,8 +59,11 @@ Status TabletSinkColocateSender::send_chunk(const OlapTableSchemaParam* schema,
auto* index = schema->indexes()[i];
for (size_t j = 0; j < selection_size; ++j) {
uint16_t selection = validate_select_idx[j];
index_id_partition_id[index->index_id].emplace(partitions[selection]->id);
_index_tablet_ids[i][selection] = partitions[selection]->indexes[i].tablets[tablet_indexes[selection]];
const auto* partition = partitions[selection];
index_id_partition_id[index->index_id].emplace(partition->id);
const auto& tablets = partition->indexes[i].tablets;
// TODO: remove the partition-level num_buckets once the bucket number lives at the materialized index level
_index_tablet_ids[i][selection] = tablets[record_hashes[selection] % partition->num_buckets];
}
}
return _send_chunks(schema, chunk, _index_tablet_ids, validate_select_idx);
Expand All @@ -71,8 +74,11 @@ Status TabletSinkColocateSender::send_chunk(const OlapTableSchemaParam* schema,
auto* index = schema->indexes()[i];
_index_tablet_ids[i].resize(num_rows);
for (size_t j = 0; j < num_rows; ++j) {
index_id_partition_id[index->index_id].emplace(partitions[j]->id);
_index_tablet_ids[i][j] = partitions[j]->indexes[i].tablets[tablet_indexes[j]];
const auto* partition = partitions[j];
index_id_partition_id[index->index_id].emplace(partition->id);
const auto& tablets = partition->indexes[i].tablets;
// TODO: remove the partition-level num_buckets once the bucket number lives at the materialized index level
_index_tablet_ids[i][j] = tablets[record_hashes[j] % partition->num_buckets];
}
}
return _send_chunks(schema, chunk, _index_tablet_ids, validate_select_idx);
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/tablet_sink_colocate_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class TabletSinkColocateSender final : public TabletSinkSender {

public:
Status send_chunk(const OlapTableSchemaParam* schema, const std::vector<OlapTablePartition*>& partitions,
const std::vector<uint32_t>& tablet_indexes, const std::vector<uint16_t>& validate_select_idx,
const std::vector<uint32_t>& record_hashes, const std::vector<uint16_t>& validate_select_idx,
std::unordered_map<int64_t, std::set<int64_t>>& index_id_partition_id, Chunk* chunkk) override;

Status try_open(RuntimeState* state) override;
Expand Down
Loading
Loading
0