8000 Leverage `VectorType` in `ColumnDataCollection` by lnkuiper · Pull Request #17881 · duckdb/duckdb · GitHub
[go: up one dir, main page]
Skip to content

Leverage VectorType in ColumnDataCollection #17881

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 27 additions & 29 deletions src/common/types/column/column_data_allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,44 +195,42 @@ data_ptr_t ColumnDataAllocator::GetDataPointer(ChunkManagementState &state, uint
return state.handles[block_id].Ptr() + offset;
}

void ColumnDataAllocator::UnswizzlePointers(ChunkManagementState &state, Vector &result, idx_t v_offset, uint16_t count,
uint32_t block_id, uint32_t offset) {
void ColumnDataAllocator::UnswizzlePointers(ChunkManagementState &state, Vector &result,
SwizzleMetaData &swizzle_segment, const VectorMetaData &string_heap_segment,
const idx_t &v_offset, const bool &copied) {
D_ASSERT(result.GetType().InternalType() == PhysicalType::VARCHAR);
lock_guard<mutex> guard(lock);
const auto old_base_ptr = char_ptr_cast(swizzle_segment.ptr);
const auto new_base_ptr =
char_ptr_cast(GetDataPointer(state, string_heap_segment.block_id, string_heap_segment.offset));
if (old_base_ptr == new_base_ptr) {
return; // pointers are still valid
}

auto &validity = FlatVector::Validity(result);
auto strings = FlatVector::GetData<string_t>(result);
const auto &validity = FlatVector::Validity(result);
const auto strings = FlatVector::GetData<string_t>(result);

// find first non-inlined string
auto i = NumericCast<uint32_t>(v_offset);
const uint32_t end = NumericCast<uint32_t>(v_offset + count);
for (; i < end; i++) {
if (!validity.RowIsValid(i)) {
// recompute pointers
const auto start = NumericCast<idx_t>(v_offset + swizzle_segment.offset);
const auto end = start + NumericCast<idx_t>(swizzle_segment.count);
for (idx_t i = start; i < end; i++) {
auto &str = strings[i];
if (!validity.RowIsValid(i) || str.IsInlined()) {
continue;
}
if (!strings[i].IsInlined()) {
break;
const auto str_offset = str.GetPointer() - old_base_ptr;
D_ASSERT(str_offset >= 0);
str.SetPointer(new_base_ptr + str_offset);
#ifdef D_ASSERT_IS_ENABLED
if (result.GetType() == LogicalType::VARCHAR) {
str.Verify();
}
}
// at least one string must be non-inlined, otherwise this function should not be called
D_ASSERT(i < end);

auto base_ptr = char_ptr_cast(GetDataPointer(state, block_id, offset));
if (strings[i].GetData() == base_ptr) {
// pointers are still valid
return;
#endif
}

// pointer mismatch! pointers are invalid, set them correctly
for (; i < end; i++) {
if (!validity.RowIsValid(i)) {
continue;
}
if (strings[i].IsInlined()) {
continue;
}
strings[i].SetPointer(base_ptr);
base_ptr += strings[i].GetSize();
if (!copied) {
// if the data was not copied, we modified data on the blocks. store the new base ptr
swizzle_segment.ptr = data_ptr_cast(new_base_ptr);
}
}

Expand Down
280 changes: 214 additions & 66 deletions src/common/types/column/column_data_collection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,142 @@ static void ColumnDataCopy(ColumnDataMetaData &meta_data, const UnifiedVectorFor
TemplatedColumnDataCopy<StandardValueCopy<T>>(meta_data, source_data, source, offset, copy_count);
}

// Fast path for appending VARCHAR data when the source vector is a CONSTANT or a
// (sufficiently small) DICTIONARY vector: the distinct string payloads are written to the
// string heap once, and the per-row string_t entries are duplicated / re-pointed, instead
// of copying every row's string bytes.
// On success, fills in 'child_index' (heap segment), 'append_count' (= vector_remaining),
// 'heap_size' (bytes written to the heap) and 'base_heap_ptr' (start of the heap block),
// and returns true. Returns false when the regular row-by-row copy path should be used.
// NOTE(review): assumes the caller has already positioned 'current_index' so that
// 'vector_remaining' rows fit in the current vector segment — confirm at call site.
bool ColumnDataCopyCompressedStrings(ColumnDataMetaData &meta_data, const VectorDataIndex &current_index,
VectorDataIndex &child_index, const UnifiedVectorFormat &source_data,
Vector &source, const idx_t &offset, const idx_t &vector_remaining,
idx_t &append_count, idx_t &heap_size, data_ptr_t &base_heap_ptr) {
// check if we can do the optimization at all
switch (source.GetVectorType()) {
case VectorType::CONSTANT_VECTOR: {
// a NULL or inlined constant needs no heap space, so the regular path is as good
const auto &constant_string = ConstantVector::GetData<string_t>(source)[0];
if (ConstantVector::IsNull(source) || constant_string.IsInlined()) {
return false; // regular path is OK
}
heap_size = constant_string.GetSize();
break;
}
case VectorType::DICTIONARY_VECTOR: {
// only worthwhile if this is a bounded dictionary with fewer than half as many
// distinct entries as rows to append; otherwise fall back to the regular copy
const auto dictionary_size = DictionaryVector::DictionarySize(source);
if (!dictionary_size.IsValid() || dictionary_size.GetIndex() >= vector_remaining / 2) {
return false; // not a dictionary from storage or dictionary too large
}

const auto &dictionary_vector = DictionaryVector::Child(source);
const auto dictionary_strings = FlatVector::GetData<string_t>(dictionary_vector);
const auto &dictionary_validity = FlatVector::Validity(dictionary_vector);

// Compute total size needed for dictionary strings
const auto dictionary_size_idx = dictionary_size.GetIndex();
if (dictionary_validity.AllValid()) {
// branchless accumulation: inlined strings contribute 0 bytes to the heap
for (idx_t i = 0; i < dictionary_size_idx; i++) {
const auto &dictionary_string = dictionary_strings[i];
heap_size += !dictionary_string.IsInlined() * dictionary_string.GetSize();
}
} else {
for (idx_t i = 0; i < dictionary_size_idx; i++) {
const auto &dictionary_string = dictionary_strings[i];
const auto add_size = dictionary_validity.RowIsValidUnsafe(i) && !dictionary_string.IsInlined();
heap_size += add_size * dictionary_string.GetSize();
}
}

// all dictionary strings are NULL or inlined: nothing to compress
if (heap_size == 0) {
return false; // regular path is OK
}
break;
}
default:
return false;
}
D_ASSERT(heap_size != 0);

auto &segment = meta_data.segment;
auto &append_state = meta_data.state;

// allocate string heap for the compressed strings
child_index = segment.AllocateStringHeap(heap_size, meta_data.chunk_data, append_state, child_index);
if (!meta_data.GetVectorMetaData().child_index.IsValid()) {
meta_data.GetVectorMetaData().child_index = meta_data.segment.AddChildIndex(child_index);
}
auto &child_segment = segment.GetVectorData(child_index);
base_heap_ptr = segment.allocator->GetDataPointer(append_state.current_chunk_state, child_segment.block_id,
child_segment.offset);

auto &current_segment = segment.GetVectorData(current_index);
const auto base_ptr = segment.allocator->GetDataPointer(append_state.current_chunk_state, current_segment.block_id,
current_segment.offset);

// initialize validity mask
auto validity_data = ColumnDataCollectionSegment::GetValidityPointerForWriting(base_ptr, sizeof(string_t));
ValidityMask target_validity(validity_data, STANDARD_VECTOR_SIZE);
if (current_segment.count == 0) {
// first time appending to this vector
// all data here is still uninitialized
// initialize the validity mask to set all to valid
target_validity.SetAllValid(STANDARD_VECTOR_SIZE);
}

// now write the compressed data
const auto target_entries = reinterpret_cast<string_t *>(base_ptr);
if (source.GetVectorType() == VectorType::CONSTANT_VECTOR) {
// copy over the constant string
auto constant_string = ConstantVector::GetData<string_t>(source)[0];
memcpy(base_heap_ptr, constant_string.GetData(), constant_string.GetSize());
constant_string.SetPointer(char_ptr_cast(base_heap_ptr));

// duplicate it
for (idx_t i = 0; i < vector_remaining; i++) {
const auto target_idx = current_segment.count + i;
target_entries[target_idx] = constant_string;
}
} else {
// dictionary_size validity was already checked in the switch above
D_ASSERT(source.GetVectorType() == VectorType::DICTIONARY_VECTOR);
const auto dictionary_size = DictionaryVector::DictionarySize(source);
const auto &dictionary_vector = DictionaryVector::Child(source);
const auto dictionary_strings = FlatVector::GetData<string_t>(dictionary_vector);
const auto &dictionary_validity = FlatVector::Validity(dictionary_vector);

// Copy over dictionary, computing offsets as we go
// string_offsets is indexed by dictionary position; only valid, non-inlined entries
// are written (the read loop below only dereferences those same positions)
idx_t current_string_offset = 0;
idx_t string_offsets[STANDARD_VECTOR_SIZE];
const auto dictionary_size_idx = dictionary_size.GetIndex();
for (idx_t i = 0; i < dictionary_size_idx; i++) {
const auto &dictionary_string = dictionary_strings[i];
if (dictionary_validity.RowIsValid(i) && !dictionary_string.IsInlined()) {
string_offsets[i] = current_string_offset;
memcpy(base_heap_ptr + current_string_offset, dictionary_string.GetPointer(),
dictionary_string.GetSize());
current_string_offset += dictionary_string.GetSize();
}
}

// Now copy over the string vector, pointing to the new dictionary
// source_idx is the dictionary position selected for row 'offset + i'
const auto source_entries = UnifiedVectorFormat::GetData<string_t>(source_data);
for (idx_t i = 0; i < vector_remaining; i++) {
const auto source_idx = UnsafeNumericCast<idx_t>((*source_data.sel)[offset + i]);
const auto target_idx = current_segment.count + i;
if (!source_data.validity.RowIsValid(source_idx)) {
target_validity.SetInvalid(target_idx);
continue;
}
const auto &source_entry = source_entries[source_idx];
auto &target_entry = target_entries[target_idx];
target_entry = source_entry;
if (!source_entry.IsInlined()) {
// re-point the copied entry at the heap copy made above
target_entry.SetPointer(char_ptr_cast(base_heap_ptr + string_offsets[source_idx]));
#ifdef D_ASSERT_IS_ENABLED
if (source.GetType() == LogicalType::VARCHAR) {
target_entry.Verify();
}
#endif
}
}
}

append_count = vector_remaining;
return true;
}

template <>
void ColumnDataCopy<string_t>(ColumnDataMetaData &meta_data, const UnifiedVectorFormat &source_data, Vector &source,
idx_t offset, idx_t copy_count) {
Expand Down Expand Up @@ -486,85 +622,97 @@ void ColumnDataCopy<string_t>(ColumnDataMetaData &meta_data, const UnifiedVector
auto block_size = meta_data.segment.allocator->GetBufferManager().GetBlockSize();
while (remaining > 0) {
// how many values fit in the current string vector
idx_t vector_remaining =
const auto vector_remaining =
MinValue<idx_t>(STANDARD_VECTOR_SIZE - segment.GetVectorData(current_index).count, remaining);

// 'append_count' is less if we cannot fit that amount of non-inlined strings on one buffer-managed block
idx_t append_count;
idx_t append_count = 0;
idx_t heap_size = 0;
const auto source_entries = UnifiedVectorFormat::GetData<string_t>(source_data);
for (append_count = 0; append_count < vector_remaining; append_count++) {
auto source_idx = source_data.sel->get_index(offset + append_count);
if (!source_data.validity.RowIsValid(source_idx)) {
continue;
}
const auto &entry = source_entries[source_idx];
if (entry.IsInlined()) {
continue;
}
if (heap_size + entry.GetSize() > block_size) {
break;
data_ptr_t base_heap_ptr = nullptr;
if (!ColumnDataCopyCompressedStrings(meta_data, current_index, child_index, source_data, source, offset,
vector_remaining, append_count, heap_size, base_heap_ptr)) {
// 'append_count' is less if we cannot fit that amount of non-inlined strings on one buffer-managed block
const auto source_entries = UnifiedVectorFormat::GetData<string_t>(source_data);
for (; append_count < vector_remaining; append_count++) {
auto source_idx = source_data.sel->get_index(offset + append_count);
if (!source_data.validity.RowIsValid(source_idx)) {
continue;
}
const auto &entry = source_entries[source_idx];
if (entry.IsInlined()) {
continue;
}
if (heap_size + entry.GetSize() > block_size) {
break;
}
heap_size += entry.GetSize();
}
heap_size += entry.GetSize();
}

if (vector_remaining != 0 && append_count == 0) {
// The string exceeds Storage::DEFAULT_BLOCK_SIZE, so we allocate one block at a time for long strings.
auto source_idx = source_data.sel->get_index(offset + append_count);
D_ASSERT(source_data.validity.RowIsValid(source_idx));
D_ASSERT(!source_entries[source_idx].IsInlined());
D_ASSERT(source_entries[source_idx].GetSize() > block_size);
heap_size += source_entries[source_idx].GetSize();
append_count++;
}

// allocate string heap for the next 'append_count' strings
data_ptr_t heap_ptr = nullptr;
if (heap_size != 0) {
child_index = segment.AllocateStringHeap(heap_size, meta_data.chunk_data, append_state, child_index);
if (!meta_data.GetVectorMetaData().child_index.IsValid()) {
meta_data.GetVectorMetaData().child_index = meta_data.segment.AddChildIndex(child_index);
if (vector_remaining != 0 && append_count == 0) {
// The string exceeds Storage::DEFAULT_BLOCK_SIZE, so we allocate one block at a time for long strings.
auto source_idx = source_data.sel->get_index(offset + append_count);
D_ASSERT(source_data.validity.RowIsValid(source_idx));
D_ASSERT(!source_entries[source_idx].IsInlined());
D_ASSERT(source_entries[source_idx].GetSize() > block_size);
heap_size += source_entries[source_idx].GetSize();
append_count++;
}
auto &child_segment = segment.GetVectorData(child_index);
heap_ptr = segment.allocator->GetDataPointer(append_state.current_chunk_state, child_segment.block_id,
child_segment.offset);
}

auto &current_segment = segment.GetVectorData(current_index);
auto base_ptr = segment.allocator->GetDataPointer(append_state.current_chunk_state, current_segment.block_id,
current_segment.offset);
auto validity_data = ColumnDataCollectionSegment::GetValidityPointerForWriting(base_ptr, sizeof(string_t));
ValidityMask target_validity(validity_data, STANDARD_VECTOR_SIZE);
if (current_segment.count == 0) {
// first time appending to this vector
// all data here is still uninitialized
// initialize the validity mask to set all to valid
target_validity.SetAllValid(STANDARD_VECTOR_SIZE);
}
// allocate string heap for the next 'append_count' strings
if (heap_size != 0) {
child_index = segment.AllocateStringHeap(heap_size, meta_data.chunk_data, append_state, child_index);
if (!meta_data.GetVectorMetaData().child_index.IsValid()) {
meta_data.GetVectorMetaData().child_index = meta_data.segment.AddChildIndex(child_index);
}
const auto &child_segment = segment.GetVectorData(child_index);
base_heap_ptr = segment.allocator->GetDataPointer(append_state.current_chunk_state,
child_segment.block_id, child_segment.offset);
}

auto target_entries = reinterpret_cast<string_t *>(base_ptr);
for (idx_t i = 0; i < append_count; i++) {
auto source_idx = source_data.sel->get_index(offset + i);
auto target_idx = current_segment.count + i;
if (!source_data.validity.RowIsValid(source_idx)) {
target_validity.SetInvalid(target_idx);
continue;
// We get a reference to the "current_segment" only after allocating the string heap above,
// because this can resize the vector holding the segments, moving it somewhere else
auto &current_segment = segment.GetVectorData(current_index);
auto base_ptr = segment.allocator->GetDataPointer(append_state.current_chunk_state,
current_segment.block_id, current_segment.offset);
auto validity_data = ColumnDataCollectionSegment::GetValidityPointerForWriting(base_ptr, sizeof(string_t));
ValidityMask target_validity(validity_data, STANDARD_VECTOR_SIZE);
if (current_segment.count == 0) {
// first time appending to this vector
// all data here is still uninitialized
// initialize the validity mask to set all to valid
target_validity.SetAllValid(STANDARD_VECTOR_SIZE);
}
const auto &source_entry = source_entries[source_idx];
auto &target_entry = target_entries[target_idx];
if (source_entry.IsInlined()) {
target_entry = source_entry;
} else {
D_ASSERT(heap_ptr != nullptr);
memcpy(heap_ptr, source_entry.GetData(), source_entry.GetSize());
target_entry =
string_t(const_char_ptr_cast(heap_ptr), UnsafeNumericCast<uint32_t>(source_entry.GetSize()));
heap_ptr += source_entry.GetSize();

auto target_entries = reinterpret_cast<string_t *>(base_ptr);
data_ptr_t heap_ptr = base_heap_ptr;
for (idx_t i = 0; i < append_count; i++) {
auto source_idx = source_data.sel->get_index(offset + i);
auto target_idx = current_segment.count + i;
if (!source_data.validity.RowIsValid(source_idx)) {
target_validity.SetInvalid(target_idx);
continue;
}
const auto &source_entry = source_entries[source_idx];
auto &target_entry = target_entries[target_idx];
if (source_entry.IsInlined()) {
target_entry = source_entry;
} else {
D_ASSERT(base_heap_ptr != nullptr);
memcpy(heap_ptr, source_entry.GetData(), source_entry.GetSize());
target_entry =
string_t(const_char_ptr_cast(heap_ptr), UnsafeNumericCast<uint32_t>(source_entry.GetSize()));
heap_ptr += source_entry.GetSize();
}
}
}

auto &current_segment = segment.GetVectorData(current_index);
if (heap_size != 0) {
current_segment.swizzle_data.emplace_back(child_index, current_segment.count, append_count);
#ifdef D_ASSERT_IS_ENABLED
const auto &child_segment = segment.GetVectorData(child_index);
D_ASSERT(base_heap_ptr == segment.allocator->GetDataPointer(append_state.current_chunk_state,
child_segment.block_id, child_segment.offset));
#endif
current_segment.swizzle_data.emplace_back(child_index, base_heap_ptr, current_segment.count, append_count);
}

current_segment.count += append_count;
Expand Down
5 changes: 3 additions & 2 deletions src/common/types/column/column_data_collection_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,13 +232,14 @@ idx_t ColumnDataCollectionSegment::ReadVector(ChunkManagementState &state, Vecto
} else if (internal_type == PhysicalType::VARCHAR) {
if (allocator->GetType() == ColumnDataAllocatorType::BUFFER_MANAGER_ALLOCATOR) {
auto next_index = vector_index;
const auto copied =
vdata.next_data.IsValid() || state.properties == ColumnDataScanProperties::DISALLOW_ZERO_COPY;
idx_t offset = 0;
while (next_index.IsValid()) {
auto &current_vdata = GetVectorData(next_index);
for (auto &swizzle_segment : current_vdata.swizzle_data) {
auto &string_heap_segment = GetVectorData(swizzle_segment.child_index);
allocator->UnswizzlePointers(state, result, offset + swizzle_segment.offset, swizzle_segment.count,
string_heap_segment.block_id, string_heap_segment.offset);
allocator->UnswizzlePointers(state, result, swizzle_segment, string_heap_segment, offset, copied);
}
offset += current_vdata.count;
next_index = current_vdata.next_data;
Expand Down
Loading
Loading
0