Parquet Reader: Share ResizeableBuffers across decoders, and unify Plain/PlainReference by Mytherin · Pull Request #16113 · duckdb/duckdb · GitHub

Parquet Reader: Share ResizeableBuffers across decoders, and unify Plain/PlainReference #16113


Merged · 4 commits · Feb 7, 2025
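The change, as the title and the diff below describe it: each decoder used to allocate a fresh ResizeableBuffer (via make_shared_ptr) on every Read() or InitializePage() call; now the ColumnReader owns two buffers (encoding_buffers[2]) and each decoder holds a reference into that array, calling reset() and resize() before reuse. The separate PlainReference() hook is folded into an overload of Plain(). Below is a minimal standalone sketch of the buffer-sharing pattern; Buffer, Reader and Decoder are illustrative stand-ins, not the DuckDB classes.

#include <cstdint>
#include <cstring>
#include <iostream>
#include <vector>

// Illustrative stand-in for ResizeableBuffer: keeps its allocation across calls,
// resize() only grows, reset() rewinds ptr/len to the full allocation.
struct Buffer {
    std::vector<uint8_t> data;
    uint8_t *ptr = nullptr;
    uint64_t len = 0;

    void resize(uint64_t n) {
        if (n > data.size()) {
            data.resize(n);
        }
        ptr = data.data();
        len = n;
    }
    void reset() {
        ptr = data.data();
        len = data.size();
    }
};

struct Reader;

// The decoder no longer owns a buffer; it references one owned by the reader.
struct Decoder {
    explicit Decoder(Reader &reader);
    void Read(uint64_t value_count);

    Reader &reader;
    Buffer &decoded_data_buffer;
};

struct Reader {
    Buffer encoding_buffers[2]; // shared by all decoders of this column reader
    Decoder decoder;
    Reader() : decoder(*this) {
    }
};

Decoder::Decoder(Reader &reader_p) : reader(reader_p), decoded_data_buffer(reader_p.encoding_buffers[0]) {
}

void Decoder::Read(uint64_t value_count) {
    decoded_data_buffer.reset();
    decoded_data_buffer.resize(sizeof(int32_t) * value_count);
    // decode into decoded_data_buffer.ptr here; zeroing is a placeholder for GetBatch
    std::memset(decoded_data_buffer.ptr, 0, decoded_data_buffer.len);
}

int main() {
    Reader reader;
    reader.decoder.Read(1024); // first call allocates
    reader.decoder.Read(512);  // later calls reuse the same allocation
    std::cout << "capacity: " << reader.encoding_buffers[0].data.size() << " bytes\n";
    return 0;
}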
9 changes: 5 additions & 4 deletions extension/parquet/column_reader.cpp
@@ -198,12 +198,14 @@ unique_ptr<BaseStatistics> ColumnReader::Stats(idx_t row_group_idx_p, const vect
return ParquetStatisticsUtils::TransformColumnStatistics(*this, columns);
}

void ColumnReader::Plain(shared_ptr<ByteBuffer> plain_data, uint8_t *defines, idx_t num_values, // NOLINT
void ColumnReader::Plain(ByteBuffer &plain_data, uint8_t *defines, idx_t num_values, // NOLINT
parquet_filter_t *filter, idx_t result_offset, Vector &result) {
throw NotImplementedException("Plain");
throw NotImplementedException("Plain not implemented");
}

void ColumnReader::PlainReference(shared_ptr<ResizeableBuffer> &, Vector &result) { // NOLINT
void ColumnReader::Plain(shared_ptr<ResizeableBuffer> &plain_data, uint8_t *defines, idx_t num_values,
parquet_filter_t *filter, idx_t result_offset, Vector &result) {
Plain(*plain_data, defines, num_values, filter, result_offset, result);
}

void ColumnReader::InitializeRead(idx_t row_group_idx_p, const vector<ColumnChunk> &columns, TProtocol &protocol_p) {
@@ -528,7 +530,6 @@ idx_t ColumnReader::Read(uint64_t num_values, parquet_filter_t &filter, data_ptr
} else if (encoding == ColumnEncoding::BYTE_STREAM_SPLIT) {
byte_stream_split_decoder.Read(define_ptr, read_now, result, result_offset);
} else {
PlainReference(block, result);
Plain(block, define_out, read_now, &filter, result_offset, result);
}

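The hunks above retire the separate PlainReference() entry point: Plain() now comes in two virtual overloads, one taking ByteBuffer & and one taking shared_ptr<ResizeableBuffer> &, and the default implementation of the shared_ptr overload simply dereferences and forwards. A hedged sketch of that dispatch outside DuckDB (simplified signatures, stand-in types):

#include <cstdint>
#include <iostream>
#include <memory>
#include <stdexcept>

struct ByteBuffer {
    uint8_t *ptr = nullptr;
    uint64_t len = 0;
};
struct ResizeableBuffer : ByteBuffer {};

class ColumnReader {
public:
    virtual ~ColumnReader() = default;

    // Most readers implement only the reference-based overload...
    virtual void Plain(ByteBuffer &plain_data, uint64_t num_values) {
        throw std::runtime_error("Plain not implemented");
    }
    // ...and the shared_ptr overload forwards to it by default; readers that must
    // co-own the buffer (e.g. string readers) override this one instead.
    virtual void Plain(std::shared_ptr<ResizeableBuffer> &plain_data, uint64_t num_values) {
        Plain(*plain_data, num_values);
    }
};

class IntReader : public ColumnReader {
public:
    using ColumnReader::Plain; // keep the forwarding overload visible
    void Plain(ByteBuffer &plain_data, uint64_t num_values) override {
        std::cout << "decoding " << num_values << " plain values\n";
    }
};

int main() {
    auto block = std::make_shared<ResizeableBuffer>();
    IntReader reader;
    reader.Plain(block, 42); // goes through the shared_ptr overload into the reference one
    return 0;
}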
15 changes: 8 additions & 7 deletions extension/parquet/decoder/byte_stream_split_decoder.cpp
@@ -4,7 +4,8 @@

namespace duckdb {

ByteStreamSplitDecoder::ByteStreamSplitDecoder(ColumnReader &reader) : reader(reader) {
ByteStreamSplitDecoder::ByteStreamSplitDecoder(ColumnReader &reader)
: reader(reader), decoded_data_buffer(reader.encoding_buffers[0]) {
}

void ByteStreamSplitDecoder::InitializePage() {
@@ -26,22 +27,22 @@ void ByteStreamSplitDecoder::Read(uint8_t *defines, idx_t read_count, Vector &re
}
idx_t valid_count = read_count - null_count;

auto read_buf = make_shared_ptr<ResizeableBuffer>();
auto &allocator = reader.reader.allocator;
decoded_data_buffer.reset();
switch (reader.schema.type) {
case duckdb_parquet::Type::FLOAT:
read_buf->resize(allocator, sizeof(float) * valid_count);
bss_decoder->GetBatch<float>(read_buf->ptr, valid_count);
decoded_data_buffer.resize(allocator, sizeof(float) * valid_count);
bss_decoder->GetBatch<float>(decoded_data_buffer.ptr, valid_count);
break;
case duckdb_parquet::Type::DOUBLE:
read_buf->resize(allocator, sizeof(double) * valid_count);
bss_decoder->GetBatch<double>(read_buf->ptr, valid_count);
decoded_data_buffer.resize(allocator, sizeof(double) * valid_count);
bss_decoder->GetBatch<double>(decoded_data_buffer.ptr, valid_count);
break;
default:
throw std::runtime_error("BYTE_STREAM_SPLIT encoding is only supported for FLOAT or DOUBLE data");
}

reader.Plain(read_buf, defines, read_count, nullptr, result_offset, result);
reader.Plain(decoded_data_buffer, defines, read_count, nullptr, result_offset, result);
}

} // namespace duckdb
18 changes: 9 additions & 9 deletions extension/parquet/decoder/delta_binary_packed_decoder.cpp
@@ -4,7 +4,8 @@

namespace duckdb {

DeltaBinaryPackedDecoder::DeltaBinaryPackedDecoder(ColumnReader &reader) : reader(reader) {
DeltaBinaryPackedDecoder::DeltaBinaryPackedDecoder(ColumnReader &reader)
: reader(reader), decoded_data_buffer(reader.encoding_buffers[0]) {
}

void DeltaBinaryPackedDecoder::InitializePage() {
@@ -14,9 +15,6 @@ void DeltaBinaryPackedDecoder::InitializePage() {
}

void DeltaBinaryPackedDecoder::Read(uint8_t *defines, idx_t read_count, Vector &result, idx_t result_offset) {
// TODO keep this in the state
auto read_buf = make_shared_ptr<ResizeableBuffer>();

idx_t null_count = 0;
if (defines) {
// we need the null count because the dictionary offsets have no entries for nulls
@@ -27,22 +25,24 @@ void DeltaBinaryPackedDecoder::Read(uint8_t *defines, idx_t read_count, Vector &
idx_t valid_count = read_count - null_count;

auto &allocator = reader.reader.allocator;

decoded_data_buffer.reset();
switch (reader.schema.type) {
case duckdb_parquet::Type::INT32:
read_buf->resize(allocator, sizeof(int32_t) * (valid_count));
dbp_decoder->GetBatch<int32_t>(read_buf->ptr, valid_count);
decoded_data_buffer.resize(allocator, sizeof(int32_t) * (valid_count));
dbp_decoder->GetBatch<int32_t>(decoded_data_buffer.ptr, valid_count);

break;
case duckdb_parquet::Type::INT64:
read_buf->resize(allocator, sizeof(int64_t) * (valid_count));
dbp_decoder->GetBatch<int64_t>(read_buf->ptr, valid_count);
decoded_data_buffer.resize(allocator, sizeof(int64_t) * (valid_count));
dbp_decoder->GetBatch<int64_t>(decoded_data_buffer.ptr, valid_count);
break;

default:
throw std::runtime_error("DELTA_BINARY_PACKED should only be INT32 or INT64");
}
// Plain() will put NULLs in the right place
reader.Plain(read_buf, defines, read_count, nullptr, result_offset, result);
reader.Plain(decoded_data_buffer, defines, read_count, nullptr, result_offset, result);
}

} // namespace duckdb
21 changes: 11 additions & 10 deletions extension/parquet/decoder/delta_byte_array_decoder.cpp
@@ -8,16 +8,15 @@ namespace duckdb {
DeltaByteArrayDecoder::DeltaByteArrayDecoder(ColumnReader &reader) : reader(reader) {
}

shared_ptr<ResizeableBuffer> DeltaByteArrayDecoder::ReadDbpData(Allocator &allocator, ResizeableBuffer &buffer,
idx_t &value_count) {
void DeltaByteArrayDecoder::ReadDbpData(Allocator &allocator, ResizeableBuffer &buffer, ResizeableBuffer &result_buffer,
idx_t &value_count) {
auto decoder = make_uniq<DbpDecoder>(buffer.ptr, buffer.len);
value_count = decoder->TotalValues();
auto result = make_shared_ptr<ResizeableBuffer>();
result->resize(allocator, sizeof(uint32_t) * value_count);
decoder->GetBatch<uint32_t>(result->ptr, value_count);
result_buffer.reset();
result_buffer.resize(allocator, sizeof(uint32_t) * value_count);
decoder->GetBatch<uint32_t>(result_buffer.ptr, value_count);
decoder->Finalize();
buffer.inc(buffer.len - decoder->BufferPtr().len);
return result;
}

void DeltaByteArrayDecoder::InitializePage() {
@@ -27,8 +26,10 @@ void DeltaByteArrayDecoder::InitializePage() {
auto &block = *reader.block;
auto &allocator = reader.reader.allocator;
idx_t prefix_count, suffix_count;
auto prefix_buffer = ReadDbpData(allocator, block, prefix_count);
auto suffix_buffer = ReadDbpData(allocator, block, suffix_count);
auto &prefix_buffer = reader.encoding_buffers[0];
auto &suffix_buffer = reader.encoding_buffers[1];
ReadDbpData(allocator, block, prefix_buffer, prefix_count);
ReadDbpData(allocator, block, suffix_buffer, suffix_count);
if (prefix_count != suffix_count) {
throw std::runtime_error("DELTA_BYTE_ARRAY - prefix and suffix counts are different - corrupt file?");
}
@@ -37,8 +38,8 @@ void DeltaByteArrayDecoder::InitializePage() {
byte_array_data = make_uniq<Vector>(LogicalType::VARCHAR, nullptr);
return;
}
auto prefix_data = reinterpret_cast<uint32_t *>(prefix_buffer->ptr);
auto suffix_data = reinterpret_cast<uint32_t *>(suffix_buffer->ptr);
auto prefix_data = reinterpret_cast<uint32_t *>(prefix_buffer.ptr);
auto suffix_data = reinterpret_cast<uint32_t *>(suffix_buffer.ptr);
byte_array_data = make_uniq<Vector>(LogicalType::VARCHAR, prefix_count);
byte_array_count = prefix_count;
delta_offset = 0;
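ReadDbpData() now decodes into a caller-supplied ResizeableBuffer instead of returning a freshly allocated shared_ptr, and InitializePage() above needs the prefix and suffix data alive at the same time, which is presumably why ColumnReader carries two encoding_buffers. A small sketch of the output-parameter shape of this refactor (Buffer and decode_into are illustrative, not DuckDB APIs):

#include <cstdint>
#include <vector>

// Illustrative reusable buffer of decoded 32-bit values.
struct Buffer {
    std::vector<uint32_t> data;
    void reset_and_resize(uint64_t n) {
        data.assign(n, 0); // reuses the existing allocation when it is large enough
    }
};

// Before this PR the helper returned a newly allocated buffer on every page;
// after, it decodes into a buffer the caller owns and reuses.
static void decode_into(Buffer &result, uint64_t value_count) {
    result.reset_and_resize(value_count);
    for (uint64_t i = 0; i < value_count; i++) {
        result.data[i] = static_cast<uint32_t>(i); // stand-in for DbpDecoder::GetBatch
    }
}

struct DeltaByteArrayState {
    Buffer encoding_buffers[2];

    void InitializePage(uint64_t prefix_count, uint64_t suffix_count) {
        // prefixes and suffixes are needed at the same time, hence two shared buffers
        decode_into(encoding_buffers[0], prefix_count);
        decode_into(encoding_buffers[1], suffix_count);
    }
};

int main() {
    DeltaByteArrayState state;
    state.InitializePage(100, 100); // later pages reuse the same two allocations
    return 0;
}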
10 changes: 4 additions & 6 deletions extension/parquet/decoder/delta_length_byte_array_decoder.cpp
@@ -6,7 +6,8 @@

namespace duckdb {

DeltaLengthByteArrayDecoder::DeltaLengthByteArrayDecoder(ColumnReader &reader) : reader(reader) {
DeltaLengthByteArrayDecoder::DeltaLengthByteArrayDecoder(ColumnReader &reader)
: reader(reader), length_buffer(reader.encoding_buffers[0]) {
}

void DeltaLengthByteArrayDecoder::InitializePage() {
@@ -16,16 +17,13 @@ void DeltaLengthByteArrayDecoder::InitializePage() {
// read the binary packed lengths
auto &block = *reader.block;
auto &allocator = reader.reader.allocator;
length_buffer = DeltaByteArrayDecoder::ReadDbpData(allocator, block, byte_array_count);
DeltaByteArrayDecoder::ReadDbpData(allocator, block, length_buffer, byte_array_count);
length_idx = 0;
}

void DeltaLengthByteArrayDecoder::Read(uint8_t *defines, idx_t read_count, Vector &result, idx_t result_offset) {
if (!length_buffer) {
throw std::runtime_error("Internal error - DeltaLengthByteArrayDecoder called but there was no length buffer");
}
auto &block = *reader.block;
auto length_data = reinterpret_cast<uint32_t *>(length_buffer->ptr);
auto length_data = reinterpret_cast<uint32_t *>(length_buffer.ptr);
auto result_data = FlatVector::GetData<string_t>(result);
auto &result_mask = FlatVector::Validity(result);
for (idx_t row_idx = 0; row_idx < read_count; row_idx++) {
5 changes: 2 additions & 3 deletions extension/parquet/decoder/dictionary_decoder.cpp
@@ -5,8 +5,8 @@
namespace duckdb {

DictionaryDecoder::DictionaryDecoder(ColumnReader &reader)
: reader(reader), valid_sel(STANDARD_VECTOR_SIZE), dictionary_selection_vector(STANDARD_VECTOR_SIZE),
dictionary_size(0) {
: reader(reader), offset_buffer(reader.encoding_buffers[0]), valid_sel(STANDARD_VECTOR_SIZE),
dictionary_selection_vector(STANDARD_VECTOR_SIZE), dictionary_size(0) {
}

void DictionaryDecoder::InitializeDictionary(idx_t new_dictionary_size) {
@@ -23,7 +23,6 @@ void DictionaryDecoder::InitializeDictionary(idx_t new_dictionary_size) {
auto &dict_validity = FlatVector::Validity(*dictionary);
dict_validity.Reset(dictionary_size + 1);
dict_validity.SetInvalid(dictionary_size);
reader.PlainReference(reader.block, *dictionary);
reader.Plain(reader.block, nullptr, dictionary_size, nullptr, 0, *dictionary);
}

12 changes: 6 additions & 6 deletions extension/parquet/decoder/rle_decoder.cpp
@@ -5,7 +5,7 @@

namespace duckdb {

RLEDecoder::RLEDecoder(ColumnReader &reader) : reader(reader) {
RLEDecoder::RLEDecoder(ColumnReader &reader) : reader(reader), decoded_data_buffer(reader.encoding_buffers[0]) {
}

void RLEDecoder::InitializePage() {
@@ -28,11 +28,11 @@ void RLEDecoder::Read(uint8_t *defines, idx_t read_count, Vector &result, idx_t
}
}
idx_t valid_count = read_count - null_count;
auto read_buf = make_shared_ptr<ResizeableBuffer>();
read_buf->resize(reader.reader.allocator, sizeof(bool) * valid_count);
rle_decoder->GetBatch<uint8_t>(read_buf->ptr, valid_count);
reader.PlainTemplated<bool, TemplatedParquetValueConversion<bool>>(read_buf, defines, read_count, nullptr,
result_offset, result);
decoded_data_buffer.reset();
decoded_data_buffer.resize(reader.reader.allocator, sizeof(bool) * valid_count);
rle_decoder->GetBatch<uint8_t>(decoded_data_buffer.ptr, valid_count);
reader.PlainTemplated<bool, TemplatedParquetValueConversion<bool>>(decoded_data_buffer, defines, read_count,
nullptr, result_offset, result);
}

} // namespace duckdb
25 changes: 14 additions & 11 deletions extension/parquet/include/column_reader.hpp
@@ -96,22 +96,22 @@ class ColumnReader {
virtual unique_ptr<BaseStatistics> Stats(idx_t row_group_idx_p, const vector<ColumnChunk> &columns);

template <class VALUE_TYPE, class CONVERSION>
void PlainTemplated(shared_ptr<ByteBuffer> plain_data, uint8_t *defines, uint64_t num_values,
parquet_filter_t *filter, idx_t result_offset, Vector &result) {
void PlainTemplated(ByteBuffer &plain_data, uint8_t *defines, uint64_t num_values, parquet_filter_t *filter,
idx_t result_offset, Vector &result) {
if (HasDefines()) {
if (CONVERSION::PlainAvailable(*plain_data, num_values)) {
PlainTemplatedInternal<VALUE_TYPE, CONVERSION, true, true>(*plain_data, defines, num_values, filter,
if (CONVERSION::PlainAvailable(plain_data, num_values)) {
PlainTemplatedInternal<VALUE_TYPE, CONVERSION, true, true>(plain_data, defines, num_values, filter,
result_offset, result);
} else {
PlainTemplatedInternal<VALUE_TYPE, CONVERSION, true, false>(*plain_data, defines, num_values, filter,
PlainTemplatedInternal<VALUE_TYPE, CONVERSION, true, false>(plain_data, defines, num_values, filter,
result_offset, result);
}
} else {
if (CONVERSION::PlainAvailable(*plain_data, num_values)) {
PlainTemplatedInternal<VALUE_TYPE, CONVERSION, false, true>(*plain_data, defines, num_values, filter,
if (CONVERSION::PlainAvailable(plain_data, num_values)) {
PlainTemplatedInternal<VALUE_TYPE, CONVERSION, false, true>(plain_data, defines, num_values, filter,
result_offset, result);
} else {
PlainTemplatedInternal<VALUE_TYPE, CONVERSION, false, false>(*plain_data, defines, num_values, filter,
PlainTemplatedInternal<VALUE_TYPE, CONVERSION, false, false>(plain_data, defines, num_values, filter,
result_offset, result);
}
}
@@ -142,10 +142,10 @@ class ColumnReader {
protected:
Allocator &GetAllocator();
// readers that use the default Read() need to implement those
virtual void Plain(shared_ptr<ByteBuffer> plain_data, uint8_t *defines, idx_t num_values, parquet_filter_t *filter,
virtual void Plain(ByteBuffer &plain_data, uint8_t *defines, idx_t num_values, parquet_filter_t *filter,
idx_t result_offset, Vector &result);
// these are nops for most types, but not for strings
virtual void PlainReference(shared_ptr<ResizeableBuffer> &, Vector &result);
virtual void Plain(shared_ptr<ResizeableBuffer> &plain_data, uint8_t *defines, idx_t num_values,
parquet_filter_t *filter, idx_t result_offset, Vector &result);

// applies any skips that were registered using Skip()
virtual void ApplyPendingSkips(idx_t num_values);
@@ -203,6 +203,9 @@ class ColumnReader {
DeltaByteArrayDecoder delta_byte_array_decoder;
ByteStreamSplitDecoder byte_stream_split_decoder;

//! Resizeable buffers used for the various encodings above
ResizeableBuffer encoding_buffers[2];

// dummies for Skip()
parquet_filter_t none_filter;
ResizeableBuffer dummy_define;
@@ -24,6 +24,7 @@ class ByteStreamSplitDecoder {

private:
ColumnReader &reader;
ResizeableBuffer &decoded_data_buffer;
unique_ptr<BssDecoder> bss_decoder;
};

@@ -25,6 +25,7 @@ class DeltaBinaryPackedDecoder {

private:
ColumnReader &reader;
ResizeableBuffer &decoded_data_buffer;
unique_ptr<DbpDecoder> dbp_decoder;
};

@@ -24,7 +24,8 @@ class DeltaByteArrayDecoder {

void Read(uint8_t *defines, idx_t read_count, Vector &result, idx_t result_offset);

static shared_ptr<ResizeableBuffer> ReadDbpData(Allocator &allocator, ResizeableBuffer &buffer, idx_t &value_count);
static void ReadDbpData(Allocator &allocator, ResizeableBuffer &buffer, ResizeableBuffer &result_buffer,
idx_t &value_count);

private:
ColumnReader &reader;
@@ -26,7 +26,7 @@ class DeltaLengthByteArrayDecoder {

private:
ColumnReader &reader;
shared_ptr<ResizeableBuffer> length_buffer;
ResizeableBuffer &length_buffer;
idx_t byte_array_count = 0;
idx_t length_idx;
};
2 changes: 1 addition & 1 deletion extension/parquet/include/decoder/dictionary_decoder.hpp
@@ -29,7 +29,7 @@ class DictionaryDecoder {

private:
ColumnReader &reader;
ResizeableBuffer offset_buffer;
ResizeableBuffer &offset_buffer;
unique_ptr<RleBpDecoder> dict_decoder;
SelectionVector valid_sel;
SelectionVector dictionary_selection_vector;
1 change: 1 addition & 0 deletions extension/parquet/include/decoder/rle_decoder.hpp
@@ -24,6 +24,7 @@ class RLEDecoder {

private:
ColumnReader &reader;
ResizeableBuffer &decoded_data_buffer;
unique_ptr<RleBpDecoder> rle_decoder;
};

2 changes: 1 addition & 1 deletion extension/parquet/include/reader/null_column_reader.hpp
@@ -25,7 +25,7 @@ class NullColumnReader : public ColumnReader {
shared_ptr<ResizeableBuffer> dict;

public:
void Plain(shared_ptr<ByteBuffer> plain_data, uint8_t *defines, uint64_t num_values, parquet_filter_t *filter,
void Plain(ByteBuffer &plain_data, uint8_t *defines, uint64_t num_values, parquet_filter_t *filter,
idx_t result_offset, Vector &result) override {
(void)defines;
(void)plain_data;
9 changes: 7 additions & 2 deletions extension/parquet/include/reader/string_column_reader.hpp
@@ -21,7 +21,7 @@ struct StringParquetValueConversion {
static void UnsafePlainSkip(ByteBuffer &plain_data, ColumnReader &reader);
};

class StringColumnReader : public TemplatedColumnReader<string_t, StringParquetValueConversion> {
class StringColumnReader : public ColumnReader {
public:
static constexpr const PhysicalType TYPE = PhysicalType::VARCHAR;

@@ -35,7 +35,12 @@ class StringColumnReader : public TemplatedColumnReader<string_t, StringParquetV
uint32_t VerifyString(const char *str_data, uint32_t str_len);

protected:
void PlainReference(shared_ptr<ResizeableBuffer> &plain_data, Vector &result) override;
void Plain(ByteBuffer &plain_data, uint8_t *defines, idx_t num_values, parquet_filter_t *filter,
idx_t result_offset, Vector &result) override {
throw NotImplementedException("StringColumnReader can only read plain data from a shared buffer");
}
void Plain(shared_ptr<ResizeableBuffer> &plain_data, uint8_t *defines, idx_t num_values, parquet_filter_t *filter,
idx_t result_offset, Vector &result) override;
};

} // namespace duckdb
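StringColumnReader implements only the shared_ptr-based Plain() overload and rejects the ByteBuffer one. Its implementation lives in string_column_reader.cpp, which is not part of this diff, but the likely reason is that decoded strings can point directly into the plain buffer, so the result vector has to co-own that buffer for the values to stay valid. A hedged sketch of that idea using standard types:

#include <iostream>
#include <memory>
#include <string>
#include <string_view>
#include <vector>

// Illustrative: string "values" are views into a shared backing buffer.
struct StringVector {
    std::vector<std::string_view> values;
    std::vector<std::shared_ptr<std::string>> owned_buffers; // keeps the backing data alive
};

static void plain_strings(std::shared_ptr<std::string> plain_data, StringVector &result) {
    // zero-copy: each value points into *plain_data, so the vector must co-own it
    result.owned_buffers.push_back(plain_data);
    std::string_view all(*plain_data);
    result.values.push_back(all.substr(0, 5));
    result.values.push_back(all.substr(6));
}

int main() {
    auto buffer = std::make_shared<std::string>("hello parquet");
    StringVector vec;
    plain_strings(buffer, vec);
    buffer.reset(); // the vector still co-owns the data, so the views stay valid
    std::cout << vec.values[0] << " / " << vec.values[1] << "\n";
    return 0;
}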
5 changes: 2 additions & 3 deletions extension/parquet/include/reader/templated_column_reader.hpp
@@ -58,10 +58,9 @@ class TemplatedColumnReader : public ColumnReader {
}
}

void Plain(shared_ptr<ByteBuffer> plain_data, uint8_t *defines, uint64_t num_values, parquet_filter_t *filter,
void Plain(ByteBuffer &plain_data, uint8_t *defines, uint64_t num_values, parquet_filter_t *filter,
idx_t result_offset, Vector &result) override {
PlainTemplated<VALUE_TYPE, VALUE_CONVERSION>(std::move(plain_data), defines, num_values, filter, result_offset,
result);
PlainTemplated<VALUE_TYPE, VALUE_CONVERSION>(plain_data, defines, num_values, filter, result_offset, result);
}
};

4 changes: 4 additions & 0 deletions extension/parquet/include/resizable_buffer.hpp
@@ -103,6 +103,10 @@ class ResizeableBuffer : public ByteBuffer {
ptr = allocated_data.get();
}
}
void reset() {
ptr = allocated_data.get();
len = alloc_len;
}

private:
AllocatedData allocated_data;
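The new reset() rewinds a ResizeableBuffer to its full allocation. That matters because reads consume a ByteBuffer in place; for example, ReadDbpData() above calls buffer.inc(...), which advances ptr and shrinks len, so a buffer reused across pages must be rewound before the next decode. An illustrative stand-alone model of the consume/rewind cycle (not the DuckDB class):

#include <cassert>
#include <cstdint>
#include <vector>

struct ConsumableBuffer {
    std::vector<uint8_t> allocated_data;
    uint8_t *ptr = nullptr;
    uint64_t len = 0;

    void resize(uint64_t n) {
        if (n > allocated_data.size()) {
            allocated_data.resize(n);
        }
        ptr = allocated_data.data();
        len = n;
    }
    // reading consumes the buffer: advance ptr, shrink the remaining length
    void inc(uint64_t n) {
        assert(n <= len);
        ptr += n;
        len -= n;
    }
    // rewind to the full allocation before reusing the buffer for the next page
    void reset() {
        ptr = allocated_data.data();
        len = allocated_data.size();
    }
};

int main() {
    ConsumableBuffer buf;
    buf.resize(64);
    buf.inc(64); // fully consumed by a decode pass
    assert(buf.len == 0);
    buf.reset(); // ready to be reused by the next InitializePage()/Read()
    assert(buf.len == 64 && buf.ptr == buf.allocated_data.data());
    return 0;
}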