Parquet Reader: Split DeltaLengthByteArray decoder from DeltaByteArray, and read the strings in a streaming manner by Mytherin · Pull Request #16105 · duckdb/duckdb · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Parquet Reader: Split DeltaLengthByteArray decoder from DeltaByteArray, and read the strings in a streaming manner #16105

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions extension/parquet/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ ColumnReader::ColumnReader(ParquetReader &reader, LogicalType type_p, const Sche
idx_t max_define_p, idx_t max_repeat_p)
: schema(schema_p), file_idx(file_idx_p), max_define(max_define_p), max_repeat(max_repeat_p), reader(reader),
type(std::move(type_p)), page_rows_available(0), dictionary_decoder(*this), delta_binary_packed_decoder(*this),
rle_decoder(*this), delta_byte_array_decoder(*this), byte_stream_split_decoder(*this) {
rle_decoder(*this), delta_length_byte_array_decoder(*this), delta_byte_array_decoder(*this),
byte_stream_split_decoder(*this) {

// dummies for Skip()
dummy_define.resize(reader.allocator, STANDARD_VECTOR_SIZE);
Expand Down Expand Up @@ -450,12 +451,12 @@ void ColumnReader::PrepareDataPage(PageHeader &page_hdr) {
}
case Encoding::DELTA_LENGTH_BYTE_ARRAY: {
encoding = ColumnEncoding::DELTA_LENGTH_BYTE_ARRAY;
delta_byte_array_decoder.InitializeDeltaLengthByteArray();
delta_length_byte_array_decoder.InitializePage();
break;
}
case Encoding::DELTA_BYTE_ARRAY: {
encoding = ColumnEncoding::DELTA_BYTE_ARRAY;
delta_byte_array_decoder.InitializeDeltaByteArray();
delta_byte_array_decoder.InitializePage();
break;
}
case Encoding::BYTE_STREAM_SPLIT: {
Expand Down Expand Up @@ -520,9 +521,9 @@ idx_t ColumnReader::Read(uint64_t num_values, parquet_filter_t &filter, data_ptr
delta_binary_packed_decoder.Read(define_ptr, read_now, result, result_offset);
} else if (encoding == ColumnEncoding::RLE) {
rle_decoder.Read(define_ptr, read_now, result, result_offset);
} else if (encoding == ColumnEncoding::DELTA_LENGTH_BYTE_ARRAY ||
encoding == ColumnEncoding::DELTA_BYTE_ARRAY) {
// DELTA_BYTE_ARRAY or DELTA_LENGTH_BYTE_ARRAY
} else if (encoding == ColumnEncoding::DELTA_LENGTH_BYTE_ARRAY) {
delta_length_byte_array_decoder.Read(define_ptr, read_now, result, result_offset);
} else if (encoding == ColumnEncoding::DELTA_BYTE_ARRAY) {
delta_byte_array_decoder.Read(define_ptr, read_now, result, result_offset);
} else if (encoding == ColumnEncoding::BYTE_STREAM_SPLIT) {
byte_stream_split_decoder.Read(define_ptr, read_now, result, result_offset);
Expand Down
1 change: 1 addition & 0 deletions extension/parquet/decoder/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ add_library_unity(
byte_stream_split_decoder.cpp
delta_binary_packed_decoder.cpp
delta_byte_array_decoder.cpp
delta_length_byte_array_decoder.cpp
dictionary_decoder.cpp
rle_decoder.cpp)
set(PARQUET_EXTENSION_FILES
Expand Down
34 changes: 3 additions & 31 deletions extension/parquet/decoder/delta_byte_array_decoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ namespace duckdb {
DeltaByteArrayDecoder::DeltaByteArrayDecoder(ColumnReader &reader) : reader(reader) {
}

static shared_ptr<ResizeableBuffer> ReadDbpData(Allocator &allocator, ResizeableBuffer &buffer, idx_t &value_count) {
shared_ptr<ResizeableBuffer> DeltaByteArrayDecoder::ReadDbpData(Allocator &allocator, ResizeableBuffer &buffer,
idx_t &value_count) {
auto decoder = make_uniq<DbpDecoder>(buffer.ptr, buffer.len);
value_count = decoder->TotalValues();
auto result = make_shared_ptr<ResizeableBuffer>();
Expand All @@ -19,36 +20,7 @@ static shared_ptr<ResizeableBuffer> ReadDbpData(Allocator &allocator, Resizeable
return result;
}

void DeltaByteArrayDecoder::InitializeDeltaLengthByteArray() {
if (reader.type.InternalType() != PhysicalType::VARCHAR) {
throw std::runtime_error("Delta Length Byte Array encoding is only supported for string/blob data");
}
auto &block = *reader.block;
auto &allocator = reader.reader.allocator;
idx_t value_count;
auto length_buffer = ReadDbpData(allocator, block, value_count);
if (value_count == 0) {
// no values
byte_array_data = make_uniq<Vector>(LogicalType::VARCHAR, nullptr);
return;
}
auto length_data = reinterpret_cast<uint32_t *>(length_buffer->ptr);
byte_array_data = make_uniq<Vector>(LogicalType::VARCHAR, value_count);
byte_array_count = value_count;
delta_offset = 0;
auto string_data = FlatVector::GetData<string_t>(*byte_array_data);
for (idx_t i = 0; i < value_count; i++) {
auto str_len = length_data[i];
block.available(str_len);
string_data[i] = StringVector::EmptyString(*byte_array_data, str_len);
auto result_data = string_data[i].GetDataWriteable();
memcpy(result_data, block.ptr, length_data[i]);
block.inc(length_data[i]);
string_data[i].Finalize();
}
}

void DeltaByteArrayDecoder::InitializeDeltaByteArray() {
void DeltaByteArrayDecoder::InitializePage() {
if (reader.type.InternalType() != PhysicalType::VARCHAR) {
throw std::runtime_error("Delta Byte Array encoding is only supported for string/blob data");
}
Expand Down
53 changes: 53 additions & 0 deletions extension/parquet/decoder/delta_length_byte_array_decoder.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#include "decoder/delta_length_byte_array_decoder.hpp"
#include "decoder/delta_byte_array_decoder.hpp"
#include "column_reader.hpp"
#include "parquet_reader.hpp"
#include "reader/templated_column_reader.hpp"

namespace duckdb {

// Creates a decoder bound to the given column reader; only a reference to the reader is stored,
// no page state is set up until InitializePage() is called.
DeltaLengthByteArrayDecoder::DeltaLengthByteArrayDecoder(ColumnReader &reader) : reader(reader) {
}

// Starts decoding of a new DELTA_LENGTH_BYTE_ARRAY page: decodes the packed string lengths into
// length_buffer, records how many there are in byte_array_count and rewinds the length cursor.
// The string bytes themselves are not touched here; they are consumed later by Read().
// Throws std::runtime_error when the column's internal type is not VARCHAR.
void DeltaLengthByteArrayDecoder::InitializePage() {
	const bool is_varchar = reader.type.InternalType() == PhysicalType::VARCHAR;
	if (!is_varchar) {
		throw std::runtime_error("Delta Length Byte Array encoding is only supported for string/blob data");
	}
	// read the binary packed lengths
	length_buffer = DeltaByteArrayDecoder::ReadDbpData(reader.reader.allocator, *reader.block, byte_array_count);
	length_idx = 0;
}

void DeltaLengthByteArrayDecoder::Read(uint8_t *defines, idx_t read_count, Vector &result, idx_t result_offset) {
if (!length_buffer) {
throw std::runtime_error("Internal error - DeltaLengthByteArrayDecoder called but there was no length buffer");
}
auto &block = *reader.block;
auto length_data = reinterpret_cast<uint32_t *>(length_buffer->ptr);
auto result_data = FlatVector::GetData<string_t>(result);
auto &result_mask = FlatVector::Validity(result);
for (idx_t row_idx = 0; row_idx < read_count; row_idx++) {
auto result_idx = result_offset + row_idx;
if (defines && defines[result_idx] != reader.max_define) {
result_mask.SetInvalid(result_idx);
continue;
}
if (length_idx >= byte_array_count) {
throw IOException(
"DELTA_LENGTH_BYTE_ARRAY - length mismatch between values and byte array lengths (attempted "
"read of %d from %d entries) - corrupt file?",
length_idx, byte_array_count);
}
auto str_len = length_data[length_idx++];
block.available(str_len);
result_data[result_idx] = StringVector::EmptyString(result, str_len);
auto str_data = result_data[result_idx].GetDataWriteable();
memcpy(str_data, block.ptr, str_len);
block.inc(str_len);
result_data[result_idx].Finalize();
}
}

} // namespace duckdb
3 changes: 3 additions & 0 deletions extension/parquet/include/column_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "decoder/delta_binary_packed_decoder.hpp"
#include "decoder/dictionary_decoder.hpp"
#include "decoder/rle_decoder.hpp"
#include "decoder/delta_length_byte_array_decoder.hpp"
#include "decoder/delta_byte_array_decoder.hpp"
#ifndef DUCKDB_AMALGAMATION

Expand Down Expand Up @@ -56,6 +57,7 @@ class ColumnReader {
friend class ByteStreamSplitDecoder;
friend class DeltaBinaryPackedDecoder;
friend class DeltaByteArrayDecoder;
friend class DeltaLengthByteArrayDecoder;
friend class DictionaryDecoder;
friend class RLEDecoder;

Expand Down Expand Up @@ -197,6 +199,7 @@ class ColumnReader {
DictionaryDecoder dictionary_decoder;
DeltaBinaryPackedDecoder delta_binary_packed_decoder;
RLEDecoder rle_decoder;
DeltaLengthByteArrayDecoder delta_length_byte_array_decoder;
DeltaByteArrayDecoder delta_byte_array_decoder;
ByteStreamSplitDecoder byte_stream_split_decoder;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ class DeltaByteArrayDecoder {
explicit DeltaByteArrayDecoder(ColumnReader &reader);

public:
void InitializeDeltaLengthByteArray();
void InitializeDeltaByteArray();
void InitializePage();

void Read(uint8_t *defines, idx_t read_count, Vector &result, idx_t result_offset);

static shared_ptr<ResizeableBuffer> ReadDbpData(Allocator &allocator, ResizeableBuffer &buffer, idx_t &value_count);

private:
ColumnReader &reader;
unique_ptr<Vector> byte_array_data;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//===----------------------------------------------------------------------===//
// DuckDB
//
// decoder/delta_length_byte_array_decoder.hpp
//
//
//===----------------------------------------------------------------------===//

#pragma once

#include "duckdb.hpp"
#include "parquet_dbp_decoder.hpp"
#include "resizable_buffer.hpp"

namespace duckdb {
class ColumnReader;

// Decoder for Parquet pages using the DELTA_LENGTH_BYTE_ARRAY encoding.
// InitializePage() decodes the string lengths of a page; Read() then copies the string bytes
// out of the column reader's page buffer as rows are requested.
class DeltaLengthByteArrayDecoder {
public:
	explicit DeltaLengthByteArrayDecoder(ColumnReader &reader);

public:
	// Decodes the lengths of the page currently held by the column reader and resets the read cursor
	void InitializePage();

	// Reads `read_count` rows into `result` starting at `result_offset`; `defines` may be null
	void Read(uint8_t *defines, idx_t read_count, Vector &result, idx_t result_offset);

private:
	// Column reader that owns the page buffer this decoder reads from
	ColumnReader &reader;
	// Decoded string lengths of the current page (null until InitializePage() has run)
	shared_ptr<ResizeableBuffer> length_buffer;
	// Number of lengths stored in length_buffer
	idx_t byte_array_count = 0;
	// Index of the next length to consume; default-initialized so it is never indeterminate
	idx_t length_idx = 0;
};

} // namespace duckdb
Loading
0