From c32e6c8f7769d4719ab826ed3389f28e2c860710 Mon Sep 17 00:00:00 2001 From: Mytherin Date: Thu, 6 Feb 2025 21:54:29 +0100 Subject: [PATCH 1/3] Share resizeable buffers across decoders --- .../decoder/delta_byte_array_decoder.cpp | 20 +++++++++---------- .../delta_length_byte_array_decoder.cpp | 10 ++++------ .../parquet/decoder/dictionary_decoder.cpp | 4 ++-- extension/parquet/include/column_reader.hpp | 3 +++ .../decoder/delta_byte_array_decoder.hpp | 3 ++- .../delta_length_byte_array_decoder.hpp | 2 +- .../include/decoder/dictionary_decoder.hpp | 2 +- 7 files changed, 23 insertions(+), 21 deletions(-) diff --git a/extension/parquet/decoder/delta_byte_array_decoder.cpp b/extension/parquet/decoder/delta_byte_array_decoder.cpp index 4983ef41dd20..0d3f4966e95e 100644 --- a/extension/parquet/decoder/delta_byte_array_decoder.cpp +++ b/extension/parquet/decoder/delta_byte_array_decoder.cpp @@ -8,16 +8,14 @@ namespace duckdb { DeltaByteArrayDecoder::DeltaByteArrayDecoder(ColumnReader &reader) : reader(reader) { } -shared_ptr DeltaByteArrayDecoder::ReadDbpData(Allocator &allocator, ResizeableBuffer &buffer, - idx_t &value_count) { +void DeltaByteArrayDecoder::ReadDbpData(Allocator &allocator, ResizeableBuffer &buffer, ResizeableBuffer &result_buffer, + idx_t &value_count) { auto decoder = make_uniq(buffer.ptr, buffer.len); value_count = decoder->TotalValues(); - auto result = make_shared_ptr(); - result->resize(allocator, sizeof(uint32_t) * value_count); - decoder->GetBatch(result->ptr, value_count); + result_buffer.resize(allocator, sizeof(uint32_t) * value_count); + decoder->GetBatch(result_buffer.ptr, value_count); decoder->Finalize(); buffer.inc(buffer.len - decoder->BufferPtr().len); - return result; } void DeltaByteArrayDecoder::InitializePage() { @@ -27,8 +25,10 @@ void DeltaByteArrayDecoder::InitializePage() { auto &block = *reader.block; auto &allocator = reader.reader.allocator; idx_t prefix_count, suffix_count; - auto prefix_buffer = ReadDbpData(allocator,
block, prefix_count); - auto suffix_buffer = ReadDbpData(allocator, block, suffix_count); + auto &prefix_buffer = reader.encoding_buffers[0]; + auto &suffix_buffer = reader.encoding_buffers[1]; + ReadDbpData(allocator, block, prefix_buffer, prefix_count); + ReadDbpData(allocator, block, suffix_buffer, suffix_count); if (prefix_count != suffix_count) { throw std::runtime_error("DELTA_BYTE_ARRAY - prefix and suffix counts are different - corrupt file?"); } @@ -37,8 +37,8 @@ void DeltaByteArrayDecoder::InitializePage() { byte_array_data = make_uniq(LogicalType::VARCHAR, nullptr); return; } - auto prefix_data = reinterpret_cast(prefix_buffer->ptr); - auto suffix_data = reinterpret_cast(suffix_buffer->ptr); + auto prefix_data = reinterpret_cast(prefix_buffer.ptr); + auto suffix_data = reinterpret_cast(suffix_buffer.ptr); byte_array_data = make_uniq(LogicalType::VARCHAR, prefix_count); byte_array_count = prefix_count; delta_offset = 0; diff --git a/extension/parquet/decoder/delta_length_byte_array_decoder.cpp b/extension/parquet/decoder/delta_length_byte_array_decoder.cpp index 8f7b76581e9e..9fa428d164f7 100644 --- a/extension/parquet/decoder/delta_length_byte_array_decoder.cpp +++ b/extension/parquet/decoder/delta_length_byte_array_decoder.cpp @@ -6,7 +6,8 @@ namespace duckdb { -DeltaLengthByteArrayDecoder::DeltaLengthByteArrayDecoder(ColumnReader &reader) : reader(reader) { +DeltaLengthByteArrayDecoder::DeltaLengthByteArrayDecoder(ColumnReader &reader) + : reader(reader), length_buffer(reader.encoding_buffers[0]) { } void DeltaLengthByteArrayDecoder::InitializePage() { @@ -16,16 +17,13 @@ void DeltaLengthByteArrayDecoder::InitializePage() { // read the binary packed lengths auto &block = *reader.block; auto &allocator = reader.reader.allocator; - length_buffer = DeltaByteArrayDecoder::ReadDbpData(allocator, block, byte_array_count); + DeltaByteArrayDecoder::ReadDbpData(allocator, block, length_buffer, byte_array_count); length_idx = 0; } void 
DeltaLengthByteArrayDecoder::Read(uint8_t *defines, idx_t read_count, Vector &result, idx_t result_offset) { - if (!length_buffer) { - throw std::runtime_error("Internal error - DeltaLengthByteArrayDecoder called but there was no length buffer"); - } auto &block = *reader.block; - auto length_data = reinterpret_cast(length_buffer->ptr); + auto length_data = reinterpret_cast(length_buffer.ptr); auto result_data = FlatVector::GetData(result); auto &result_mask = FlatVector::Validity(result); for (idx_t row_idx = 0; row_idx < read_count; row_idx++) { diff --git a/extension/parquet/decoder/dictionary_decoder.cpp b/extension/parquet/decoder/dictionary_decoder.cpp index 3e016f82ed92..8bd38121880d 100644 --- a/extension/parquet/decoder/dictionary_decoder.cpp +++ b/extension/parquet/decoder/dictionary_decoder.cpp @@ -5,8 +5,8 @@ namespace duckdb { DictionaryDecoder::DictionaryDecoder(ColumnReader &reader) - : reader(reader), valid_sel(STANDARD_VECTOR_SIZE), dictionary_selection_vector(STANDARD_VECTOR_SIZE), - dictionary_size(0) { + : reader(reader), offset_buffer(reader.encoding_buffers[0]), valid_sel(STANDARD_VECTOR_SIZE), + dictionary_selection_vector(STANDARD_VECTOR_SIZE), dictionary_size(0) { } void DictionaryDecoder::InitializeDictionary(idx_t new_dictionary_size) { diff --git a/extension/parquet/include/column_reader.hpp b/extension/parquet/include/column_reader.hpp index 54dae91d521f..834c8a4ddfa8 100644 --- a/extension/parquet/include/column_reader.hpp +++ b/extension/parquet/include/column_reader.hpp @@ -203,6 +203,9 @@ class ColumnReader { DeltaByteArrayDecoder delta_byte_array_decoder; ByteStreamSplitDecoder byte_stream_split_decoder; + //! 
Resizeable buffers used for the various encodings above + ResizeableBuffer encoding_buffers[2]; + // dummies for Skip() parquet_filter_t none_filter; ResizeableBuffer dummy_define; diff --git a/extension/parquet/include/decoder/delta_byte_array_decoder.hpp b/extension/parquet/include/decoder/delta_byte_array_decoder.hpp index 486754b7310b..6cc89dd2709b 100644 --- a/extension/parquet/include/decoder/delta_byte_array_decoder.hpp +++ b/extension/parquet/include/decoder/delta_byte_array_decoder.hpp @@ -24,7 +24,8 @@ class DeltaByteArrayDecoder { void Read(uint8_t *defines, idx_t read_count, Vector &result, idx_t result_offset); - static shared_ptr ReadDbpData(Allocator &allocator, ResizeableBuffer &buffer, idx_t &value_count); + static void ReadDbpData(Allocator &allocator, ResizeableBuffer &buffer, ResizeableBuffer &result_buffer, + idx_t &value_count); private: ColumnReader &reader; diff --git a/extension/parquet/include/decoder/delta_length_byte_array_decoder.hpp b/extension/parquet/include/decoder/delta_length_byte_array_decoder.hpp index 4056ea947587..900650468ea7 100644 --- a/extension/parquet/include/decoder/delta_length_byte_array_decoder.hpp +++ b/extension/parquet/include/decoder/delta_length_byte_array_decoder.hpp @@ -26,7 +26,7 @@ class DeltaLengthByteArrayDecoder { private: ColumnReader &reader; - shared_ptr length_buffer; + ResizeableBuffer &length_buffer; idx_t byte_array_count = 0; idx_t length_idx; }; diff --git a/extension/parquet/include/decoder/dictionary_decoder.hpp b/extension/parquet/include/decoder/dictionary_decoder.hpp index 25f47a146846..27604253fa22 100644 --- a/extension/parquet/include/decoder/dictionary_decoder.hpp +++ b/extension/parquet/include/decoder/dictionary_decoder.hpp @@ -29,7 +29,7 @@ class DictionaryDecoder { private: ColumnReader &reader; - ResizeableBuffer offset_buffer; + ResizeableBuffer &offset_buffer; unique_ptr dict_decoder; SelectionVector valid_sel; SelectionVector dictionary_selection_vector; From 
2fdee8afdef0afd1f37d316fb6ec139a48fd2416 Mon Sep 17 00:00:00 2001 From: Mytherin Date: Thu, 6 Feb 2025 22:11:01 +0100 Subject: [PATCH 2/3] Remove PlainReference, and instead have two separate calls for Plain (one that takes a ref, one that takes a shared ptr) --- extension/parquet/column_reader.cpp | 9 ++++---- .../parquet/decoder/dictionary_decoder.cpp | 1 - extension/parquet/decoder/rle_decoder.cpp | 2 +- extension/parquet/include/column_reader.hpp | 22 +++++++++---------- .../include/reader/null_column_reader.hpp | 2 +- .../include/reader/string_column_reader.hpp | 9 ++++++-- .../reader/templated_column_reader.hpp | 5 ++--- .../parquet/reader/string_column_reader.cpp | 8 ++++--- 8 files changed, 32 insertions(+), 26 deletions(-) diff --git a/extension/parquet/column_reader.cpp b/extension/parquet/column_reader.cpp index fc841cc510fe..e7213ddb7d61 100644 --- a/extension/parquet/column_reader.cpp +++ b/extension/parquet/column_reader.cpp @@ -198,12 +198,14 @@ unique_ptr ColumnReader::Stats(idx_t row_group_idx_p, const vect return ParquetStatisticsUtils::TransformColumnStatistics(*this, columns); } -void ColumnReader::Plain(shared_ptr plain_data, uint8_t *defines, idx_t num_values, // NOLINT +void ColumnReader::Plain(ByteBuffer &plain_data, uint8_t *defines, idx_t num_values, // NOLINT parquet_filter_t *filter, idx_t result_offset, Vector &result) { - throw NotImplementedException("Plain"); + throw NotImplementedException("Plain not implemented"); } -void ColumnReader::PlainReference(shared_ptr &, Vector &result) { // NOLINT +void ColumnReader::Plain(shared_ptr &plain_data, uint8_t *defines, idx_t num_values, + parquet_filter_t *filter, idx_t result_offset, Vector &result) { + Plain(*plain_data, defines, num_values, filter, result_offset, result); } void ColumnReader::InitializeRead(idx_t row_group_idx_p, const vector &columns, TProtocol &protocol_p) { @@ -528,7 +530,6 @@ idx_t ColumnReader::Read(uint64_t num_values, parquet_filter_t &filter, data_ptr } else if 
(encoding == ColumnEncoding::BYTE_STREAM_SPLIT) { byte_stream_split_decoder.Read(define_ptr, read_now, result, result_offset); } else { - PlainReference(block, result); Plain(block, define_out, read_now, &filter, result_offset, result); } diff --git a/extension/parquet/decoder/dictionary_decoder.cpp b/extension/parquet/decoder/dictionary_decoder.cpp index 8bd38121880d..20e75b75685f 100644 --- a/extension/parquet/decoder/dictionary_decoder.cpp +++ b/extension/parquet/decoder/dictionary_decoder.cpp @@ -21,7 +21,6 @@ void DictionaryDecoder::InitializeDictionary(idx_t new_dictionary_size) { dictionary_id = reader.reader.file_name + "_" + reader.schema.name + "_" + std::to_string(reader.chunk_read_offset); // we use the first entry as a NULL, dictionary vectors don't have a separate validity mask FlatVector::Validity(*dictionary).SetInvalid(0); - reader.PlainReference(reader.block, *dictionary); reader.Plain(reader.block, nullptr, dictionary_size, nullptr, 1, *dictionary); } diff --git a/extension/parquet/decoder/rle_decoder.cpp b/extension/parquet/decoder/rle_decoder.cpp index fb8644387f83..88c098de4760 100644 --- a/extension/parquet/decoder/rle_decoder.cpp +++ b/extension/parquet/decoder/rle_decoder.cpp @@ -31,7 +31,7 @@ void RLEDecoder::Read(uint8_t *defines, idx_t read_count, Vector &result, idx_t auto read_buf = make_shared_ptr(); read_buf->resize(reader.reader.allocator, sizeof(bool) * valid_count); rle_decoder->GetBatch(read_buf->ptr, valid_count); - reader.PlainTemplated>(read_buf, defines, read_count, nullptr, + reader.PlainTemplated>(*read_buf, defines, read_count, nullptr, result_offset, result); } diff --git a/extension/parquet/include/column_reader.hpp b/extension/parquet/include/column_reader.hpp index 834c8a4ddfa8..1bbefc71b7d7 100644 --- a/extension/parquet/include/column_reader.hpp +++ b/extension/parquet/include/column_reader.hpp @@ -96,22 +96,22 @@ class ColumnReader { virtual unique_ptr Stats(idx_t row_group_idx_p, const vector &columns); template - 
void PlainTemplated(shared_ptr plain_data, uint8_t *defines, uint64_t num_values, - parquet_filter_t *filter, idx_t result_offset, Vector &result) { + void PlainTemplated(ByteBuffer &plain_data, uint8_t *defines, uint64_t num_values, parquet_filter_t *filter, + idx_t result_offset, Vector &result) { if (HasDefines()) { - if (CONVERSION::PlainAvailable(*plain_data, num_values)) { - PlainTemplatedInternal(*plain_data, defines, num_values, filter, + if (CONVERSION::PlainAvailable(plain_data, num_values)) { + PlainTemplatedInternal(plain_data, defines, num_values, filter, result_offset, result); } else { - PlainTemplatedInternal(*plain_data, defines, num_values, filter, + PlainTemplatedInternal(plain_data, defines, num_values, filter, result_offset, result); } } else { - if (CONVERSION::PlainAvailable(*plain_data, num_values)) { - PlainTemplatedInternal(*plain_data, defines, num_values, filter, + if (CONVERSION::PlainAvailable(plain_data, num_values)) { + PlainTemplatedInternal(plain_data, defines, num_values, filter, result_offset, result); } else { - PlainTemplatedInternal(*plain_data, defines, num_values, filter, + PlainTemplatedInternal(plain_data, defines, num_values, filter, result_offset, result); } } @@ -142,10 +142,10 @@ class ColumnReader { protected: Allocator &GetAllocator(); // readers that use the default Read() need to implement those - virtual void Plain(shared_ptr plain_data, uint8_t *defines, idx_t num_values, parquet_filter_t *filter, + virtual void Plain(ByteBuffer &plain_data, uint8_t *defines, idx_t num_values, parquet_filter_t *filter, idx_t result_offset, Vector &result); - // these are nops for most types, but not for strings - virtual void PlainReference(shared_ptr &, Vector &result); + virtual void Plain(shared_ptr &plain_data, uint8_t *defines, idx_t num_values, + parquet_filter_t *filter, idx_t result_offset, Vector &result); // applies any skips that were registered using Skip() virtual void ApplyPendingSkips(idx_t num_values); diff --git 
a/extension/parquet/include/reader/null_column_reader.hpp b/extension/parquet/include/reader/null_column_reader.hpp index 10cda3f27417..9d7d9734b6bb 100644 --- a/extension/parquet/include/reader/null_column_reader.hpp +++ b/extension/parquet/include/reader/null_column_reader.hpp @@ -25,7 +25,7 @@ class NullColumnReader : public ColumnReader { shared_ptr dict; public: - void Plain(shared_ptr plain_data, uint8_t *defines, uint64_t num_values, parquet_filter_t *filter, + void Plain(ByteBuffer &plain_data, uint8_t *defines, uint64_t num_values, parquet_filter_t *filter, idx_t result_offset, Vector &result) override { (void)defines; (void)plain_data; diff --git a/extension/parquet/include/reader/string_column_reader.hpp b/extension/parquet/include/reader/string_column_reader.hpp index 08a0ebac9e35..4671671680cc 100644 --- a/extension/parquet/include/reader/string_column_reader.hpp +++ b/extension/parquet/include/reader/string_column_reader.hpp @@ -21,7 +21,7 @@ struct StringParquetValueConversion { static void UnsafePlainSkip(ByteBuffer &plain_data, ColumnReader &reader); }; -class StringColumnReader : public TemplatedColumnReader { +class StringColumnReader : public ColumnReader { public: static constexpr const PhysicalType TYPE = PhysicalType::VARCHAR; @@ -35,7 +35,12 @@ class StringColumnReader : public TemplatedColumnReader &plain_data, Vector &result) override; + void Plain(ByteBuffer &plain_data, uint8_t *defines, idx_t num_values, parquet_filter_t *filter, + idx_t result_offset, Vector &result) override { + throw NotImplementedException("StringColumnReader can only read plain data from a shared buffer"); + } + void Plain(shared_ptr &plain_data, uint8_t *defines, idx_t num_values, parquet_filter_t *filter, + idx_t result_offset, Vector &result) override; }; } // namespace duckdb diff --git a/extension/parquet/include/reader/templated_column_reader.hpp b/extension/parquet/include/reader/templated_column_reader.hpp index ea40b12c70d2..06fbdc9e2306 100644 --- 
a/extension/parquet/include/reader/templated_column_reader.hpp +++ b/extension/parquet/include/reader/templated_column_reader.hpp @@ -58,10 +58,9 @@ class TemplatedColumnReader : public ColumnReader { } } - void Plain(shared_ptr plain_data, uint8_t *defines, uint64_t num_values, parquet_filter_t *filter, + void Plain(ByteBuffer &plain_data, uint8_t *defines, uint64_t num_values, parquet_filter_t *filter, idx_t result_offset, Vector &result) override { - PlainTemplated(std::move(plain_data), defines, num_values, filter, result_offset, - result); + PlainTemplated(plain_data, defines, num_values, filter, result_offset, result); } }; diff --git a/extension/parquet/reader/string_column_reader.cpp b/extension/parquet/reader/string_column_reader.cpp index 45e5cc47e724..0f49fff74980 100644 --- a/extension/parquet/reader/string_column_reader.cpp +++ b/extension/parquet/reader/string_column_reader.cpp @@ -10,8 +10,7 @@ namespace duckdb { //===--------------------------------------------------------------------===// StringColumnReader::StringColumnReader(ParquetReader &reader, LogicalType type_p, const SchemaElement &schema_p, idx_t schema_idx_p, idx_t max_define_p, idx_t max_repeat_p) - : TemplatedColumnReader(reader, std::move(type_p), schema_p, schema_idx_p, - max_define_p, max_repeat_p) { + : ColumnReader(reader, std::move(type_p), schema_p, schema_idx_p, max_define_p, max_repeat_p) { fixed_width_string_length = 0; if (schema_p.type == Type::FIXED_LEN_BYTE_ARRAY) { D_ASSERT(schema_p.__isset.type_length); @@ -49,8 +48,11 @@ class ParquetStringVectorBuffer : public VectorBuffer { shared_ptr buffer; }; -void StringColumnReader::PlainReference(shared_ptr &plain_data, Vector &result) { +void StringColumnReader::Plain(shared_ptr &plain_data, uint8_t *defines, idx_t num_values, + parquet_filter_t *filter, idx_t result_offset, Vector &result) { StringVector::AddBuffer(result, make_buffer(plain_data)); + PlainTemplated(*plain_data, defines, num_values, filter, result_offset, + 
result); } string_t StringParquetValueConversion::PlainRead(ByteBuffer &plain_data, ColumnReader &reader) { From b6df17a11312c823f21864673f1b5afc809d88ef Mon Sep 17 00:00:00 2001 From: Mytherin Date: Thu, 6 Feb 2025 22:30:00 +0100 Subject: [PATCH 3/3] Share encoding buffers between all decoders --- .../decoder/byte_stream_split_decoder.cpp | 15 ++++++++------- .../decoder/delta_binary_packed_decoder.cpp | 18 +++++++++--------- .../decoder/delta_byte_array_decoder.cpp | 1 + extension/parquet/decoder/rle_decoder.cpp | 12 ++++++------ .../decoder/byte_stream_split_decoder.hpp | 1 + .../decoder/delta_binary_packed_decoder.hpp | 1 + .../parquet/include/decoder/rle_decoder.hpp | 1 + extension/parquet/include/resizable_buffer.hpp | 4 ++++ 8 files changed, 31 insertions(+), 22 deletions(-) diff --git a/extension/parquet/decoder/byte_stream_split_decoder.cpp b/extension/parquet/decoder/byte_stream_split_decoder.cpp index a1ff33ffbdcc..86f44ee6709b 100644 --- a/extension/parquet/decoder/byte_stream_split_decoder.cpp +++ b/extension/parquet/decoder/byte_stream_split_decoder.cpp @@ -4,7 +4,8 @@ namespace duckdb { -ByteStreamSplitDecoder::ByteStreamSplitDecoder(ColumnReader &reader) : reader(reader) { +ByteStreamSplitDecoder::ByteStreamSplitDecoder(ColumnReader &reader) + : reader(reader), decoded_data_buffer(reader.encoding_buffers[0]) { } void ByteStreamSplitDecoder::InitializePage() { @@ -26,22 +27,22 @@ void ByteStreamSplitDecoder::Read(uint8_t *defines, idx_t read_count, Vector &re } idx_t valid_count = read_count - null_count; - auto read_buf = make_shared_ptr(); auto &allocator = reader.reader.allocator; + decoded_data_buffer.reset(); switch (reader.schema.type) { case duckdb_parquet::Type::FLOAT: - read_buf->resize(allocator, sizeof(float) * valid_count); - bss_decoder->GetBatch(read_buf->ptr, valid_count); + decoded_data_buffer.resize(allocator, sizeof(float) * valid_count); + bss_decoder->GetBatch(decoded_data_buffer.ptr, valid_count); break; case
duckdb_parquet::Type::DOUBLE: - read_buf->resize(allocator, sizeof(double) * valid_count); - bss_decoder->GetBatch(read_buf->ptr, valid_count); + decoded_data_buffer.resize(allocator, sizeof(double) * valid_count); + bss_decoder->GetBatch(decoded_data_buffer.ptr, valid_count); break; default: throw std::runtime_error("BYTE_STREAM_SPLIT encoding is only supported for FLOAT or DOUBLE data"); } - reader.Plain(read_buf, defines, read_count, nullptr, result_offset, result); + reader.Plain(decoded_data_buffer, defines, read_count, nullptr, result_offset, result); } } // namespace duckdb diff --git a/extension/parquet/decoder/delta_binary_packed_decoder.cpp b/extension/parquet/decoder/delta_binary_packed_decoder.cpp index 5e7db917a0e9..0dbcd5b0b606 100644 --- a/extension/parquet/decoder/delta_binary_packed_decoder.cpp +++ b/extension/parquet/decoder/delta_binary_packed_decoder.cpp @@ -4,7 +4,8 @@ namespace duckdb { -DeltaBinaryPackedDecoder::DeltaBinaryPackedDecoder(ColumnReader &reader) : reader(reader) { +DeltaBinaryPackedDecoder::DeltaBinaryPackedDecoder(ColumnReader &reader) + : reader(reader), decoded_data_buffer(reader.encoding_buffers[0]) { } void DeltaBinaryPackedDecoder::InitializePage() { @@ -14,9 +15,6 @@ void DeltaBinaryPackedDecoder::InitializePage() { } void DeltaBinaryPackedDecoder::Read(uint8_t *defines, idx_t read_count, Vector &result, idx_t result_offset) { - // TODO keep this in the state - auto read_buf = make_shared_ptr(); - idx_t null_count = 0; if (defines) { // we need the null count because the dictionary offsets have no entries for nulls @@ -27,22 +25,24 @@ void DeltaBinaryPackedDecoder::Read(uint8_t *defines, idx_t read_count, Vector & idx_t valid_count = read_count - null_count; auto &allocator = reader.reader.allocator; + + decoded_data_buffer.reset(); switch (reader.schema.type) { case duckdb_parquet::Type::INT32: - read_buf->resize(allocator, sizeof(int32_t) * (valid_count)); - dbp_decoder->GetBatch(read_buf->ptr, valid_count); + 
decoded_data_buffer.resize(allocator, sizeof(int32_t) * (valid_count)); + dbp_decoder->GetBatch(decoded_data_buffer.ptr, valid_count); break; case duckdb_parquet::Type::INT64: - read_buf->resize(allocator, sizeof(int64_t) * (valid_count)); - dbp_decoder->GetBatch(read_buf->ptr, valid_count); + decoded_data_buffer.resize(allocator, sizeof(int64_t) * (valid_count)); + dbp_decoder->GetBatch(decoded_data_buffer.ptr, valid_count); break; default: throw std::runtime_error("DELTA_BINARY_PACKED should only be INT32 or INT64"); } // Plain() will put NULLs in the right place - reader.Plain(read_buf, defines, read_count, nullptr, result_offset, result); + reader.Plain(decoded_data_buffer, defines, read_count, nullptr, result_offset, result); } } // namespace duckdb diff --git a/extension/parquet/decoder/delta_byte_array_decoder.cpp b/extension/parquet/decoder/delta_byte_array_decoder.cpp index 0d3f4966e95e..6a95fec802c3 100644 --- a/extension/parquet/decoder/delta_byte_array_decoder.cpp +++ b/extension/parquet/decoder/delta_byte_array_decoder.cpp @@ -12,6 +12,7 @@ void DeltaByteArrayDecoder::ReadDbpData(Allocator &allocator, ResizeableBuffer & idx_t &value_count) { auto decoder = make_uniq(buffer.ptr, buffer.len); value_count = decoder->TotalValues(); + result_buffer.reset(); result_buffer.resize(allocator, sizeof(uint32_t) * value_count); decoder->GetBatch(result_buffer.ptr, value_count); decoder->Finalize(); diff --git a/extension/parquet/decoder/rle_decoder.cpp b/extension/parquet/decoder/rle_decoder.cpp index 88c098de4760..02690b95b203 100644 --- a/extension/parquet/decoder/rle_decoder.cpp +++ b/extension/parquet/decoder/rle_decoder.cpp @@ -5,7 +5,7 @@ namespace duckdb { -RLEDecoder::RLEDecoder(ColumnReader &reader) : reader(reader) { +RLEDecoder::RLEDecoder(ColumnReader &reader) : reader(reader), decoded_data_buffer(reader.encoding_buffers[0]) { } void RLEDecoder::InitializePage() { @@ -28,11 +28,11 @@ void RLEDecoder::Read(uint8_t *defines, idx_t read_count, Vector 
&result, idx_t } } idx_t valid_count = read_count - null_count; - auto read_buf = make_shared_ptr(); - read_buf->resize(reader.reader.allocator, sizeof(bool) * valid_count); - rle_decoder->GetBatch(read_buf->ptr, valid_count); - reader.PlainTemplated>(*read_buf, defines, read_count, nullptr, - result_offset, result); + decoded_data_buffer.reset(); + decoded_data_buffer.resize(reader.reader.allocator, sizeof(bool) * valid_count); + rle_decoder->GetBatch(decoded_data_buffer.ptr, valid_count); + reader.PlainTemplated>(decoded_data_buffer, defines, read_count, + nullptr, result_offset, result); } } // namespace duckdb diff --git a/extension/parquet/include/decoder/byte_stream_split_decoder.hpp b/extension/parquet/include/decoder/byte_stream_split_decoder.hpp index 05766288139d..0fce221c5c24 100644 --- a/extension/parquet/include/decoder/byte_stream_split_decoder.hpp +++ b/extension/parquet/include/decoder/byte_stream_split_decoder.hpp @@ -24,6 +24,7 @@ class ByteStreamSplitDecoder { private: ColumnReader &reader; + ResizeableBuffer &decoded_data_buffer; unique_ptr bss_decoder; }; diff --git a/extension/parquet/include/decoder/delta_binary_packed_decoder.hpp b/extension/parquet/include/decoder/delta_binary_packed_decoder.hpp index 4a8f858c5fc5..66f210b1e6b7 100644 --- a/extension/parquet/include/decoder/delta_binary_packed_decoder.hpp +++ b/extension/parquet/include/decoder/delta_binary_packed_decoder.hpp @@ -25,6 +25,7 @@ class DeltaBinaryPackedDecoder { private: ColumnReader &reader; + ResizeableBuffer &decoded_data_buffer; unique_ptr dbp_decoder; }; diff --git a/extension/parquet/include/decoder/rle_decoder.hpp b/extension/parquet/include/decoder/rle_decoder.hpp index c1e482c4ab4c..6717bdbe7e4f 100644 --- a/extension/parquet/include/decoder/rle_decoder.hpp +++ b/extension/parquet/include/decoder/rle_decoder.hpp @@ -24,6 +24,7 @@ class RLEDecoder { private: ColumnReader &reader; + ResizeableBuffer &decoded_data_buffer; unique_ptr rle_decoder; }; diff --git 
a/extension/parquet/include/resizable_buffer.hpp b/extension/parquet/include/resizable_buffer.hpp index 1e225d510320..be00a0a7a75e 100644 --- a/extension/parquet/include/resizable_buffer.hpp +++ b/extension/parquet/include/resizable_buffer.hpp @@ -103,6 +103,10 @@ class ResizeableBuffer : public ByteBuffer { ptr = allocated_data.get(); } } + void reset() { + ptr = allocated_data.get(); + len = alloc_len; + } private: AllocatedData allocated_data;