diff --git a/.github/workflows/CloudTesting.yml b/.github/workflows/CloudTesting.yml index 593694e..5b0201c 100644 --- a/.github/workflows/CloudTesting.yml +++ b/.github/workflows/CloudTesting.yml @@ -39,7 +39,7 @@ jobs: - name: Setup vcpkg uses: lukka/run-vcpkg@v11.1 with: - vcpkgGitCommitId: a1a1cbc975abf909a6c8985a6a2b8fe20bbd9bd6 + vcpkgGitCommitId: 5e5d0e1cd7785623065e77eff011afdeec1a3574 - name: Configure OpenSSL for Rust run: | diff --git a/.github/workflows/LocalTesting.yml b/.github/workflows/LocalTesting.yml index 5c425b8..2a7b57d 100644 --- a/.github/workflows/LocalTesting.yml +++ b/.github/workflows/LocalTesting.yml @@ -40,7 +40,7 @@ jobs: - name: Setup vcpkg uses: lukka/run-vcpkg@v11.1 with: - vcpkgGitCommitId: a1a1cbc975abf909a6c8985a6a2b8fe20bbd9bd6 + vcpkgGitCommitId: 5e5d0e1cd7785623065e77eff011afdeec1a3574 - uses: actions/setup-node@v4 @@ -121,7 +121,7 @@ jobs: - name: Setup vcpkg uses: lukka/run-vcpkg@v11.1 with: - vcpkgGitCommitId: a1a1cbc975abf909a6c8985a6a2b8fe20bbd9bd6 + vcpkgGitCommitId: 5e5d0e1cd7785623065e77eff011afdeec1a3574 - name: Configure OpenSSL for Rust run: | @@ -204,7 +204,7 @@ jobs: - name: Setup vcpkg uses: lukka/run-vcpkg@v11.1 with: - vcpkgGitCommitId: a1a1cbc975abf909a6c8985a6a2b8fe20bbd9bd6 + vcpkgGitCommitId: 5e5d0e1cd7785623065e77eff011afdeec1a3574 - name: Configure OpenSSL for Rust run: | @@ -254,7 +254,7 @@ jobs: - name: Setup vcpkg uses: lukka/run-vcpkg@v11.1 with: - vcpkgGitCommitId: a1a1cbc975abf909a6c8985a6a2b8fe20bbd9bd6 + vcpkgGitCommitId: 5e5d0e1cd7785623065e77eff011afdeec1a3574 - name: Configure OpenSSL for Rust run: | diff --git a/.github/workflows/MainDistributionPipeline.yml b/.github/workflows/MainDistributionPipeline.yml index 955efdd..51cf3db 100644 --- a/.github/workflows/MainDistributionPipeline.yml +++ b/.github/workflows/MainDistributionPipeline.yml @@ -14,10 +14,10 @@ concurrency: jobs: duckdb-stable-build: name: Build extension binaries - uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@v1.2.1 + uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@v1.3.0 with: - duckdb_version: v1.2.1 - ci_tools_version: v1.2.1 + duckdb_version: v1.3.0 + ci_tools_version: v1.3.0 extension_name: delta enable_rust: true exclude_archs: 'wasm_mvp;wasm_eh;wasm_threads;windows_amd64_rtools;windows_amd64_mingw;linux_amd64_musl' @@ -26,11 +26,11 @@ jobs: duckdb-stable-deploy: name: Deploy extension binaries needs: duckdb-stable-build - uses: duckdb/extension-ci-tools/.github/workflows/_extension_deploy.yml@v1.2.1 + uses: duckdb/extension-ci-tools/.github/workflows/_extension_deploy.yml@v1.3.0 secrets: inherit with: extension_name: delta - duckdb_version: v1.2.1 - ci_tools_version: v1.2.1 + duckdb_version: v1.3.0 + ci_tools_version: v1.3.0 exclude_archs: 'wasm_mvp;wasm_eh;wasm_threads;windows_amd64_rtools;windows_amd64_mingw;linux_amd64_musl' - deploy_latest: ${{ startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main' }} \ No newline at end of file + deploy_latest: true \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index 6c61530..7b5fc0c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,4 +1,4 @@ -cmake_minimum_required(VERSION 2.8.12) +cmake_minimum_required(VERSION 3.5) include(ExternalProject) # Core config @@ -14,6 +14,7 @@ include_directories(src/include) set(EXTENSION_SOURCES src/delta_extension.cpp src/delta_functions.cpp + src/delta_log_types.cpp src/delta_macros.cpp src/delta_utils.cpp src/functions/delta_scan/delta_scan.cpp @@ -143,7 +144,7 
@@ ExternalProject_Add( # the c++ headers. Currently, when bumping the kernel version, the produced # header in ./src/include/delta_kernel_ffi.hpp should be also bumped, applying # the fix - GIT_TAG v0.7.0 + GIT_TAG v0.8.0 # Prints the env variables passed to the cargo build to the terminal, useful # in debugging because passing them through CMake is an error-prone mess CONFIGURE_COMMAND ${CMAKE_COMMAND} -E env ${RUST_UNSET_ENV_VARS} @@ -153,8 +154,9 @@ ExternalProject_Add( # Build debug build BUILD_COMMAND ${CMAKE_COMMAND} -E env ${RUST_UNSET_ENV_VARS} ${RUST_ENV_VARS} cargo build - --package delta_kernel_ffi --workspace $<$:--release> - --all-features ${RUST_PLATFORM_PARAM} + --package delta_kernel_ffi --workspace + $<$,$>:--release> --all-features + ${RUST_PLATFORM_PARAM} # Build DATs COMMAND ${CMAKE_COMMAND} -E env ${RUST_UNSET_ENV_VARS} ${RUST_ENV_VARS} cargo build diff --git a/Makefile b/Makefile index 7eb8376..1f3ef3c 100644 --- a/Makefile +++ b/Makefile @@ -36,4 +36,4 @@ include benchmark/benchmark.Makefile # Generate some test data to test with generate-data: python3 -m pip install delta-spark duckdb pandas deltalake pyspark - python3 scripts/generate_test_data.py + python3 scripts/data_generator/generate_test_data.py diff --git a/duckdb b/duckdb index 8e52ec4..71c5c07 160000 --- a/duckdb +++ b/duckdb @@ -1 +1 @@ -Subproject commit 8e52ec43959ab363643d63cb78ee214577111da4 +Subproject commit 71c5c07cdd295e9409c0505885033ae9eb6b5ddd diff --git a/extension-ci-tools b/extension-ci-tools index 58970c5..71d2002 160000 --- a/extension-ci-tools +++ b/extension-ci-tools @@ -1 +1 @@ -Subproject commit 58970c538d35919db875096460c05806056f4de0 +Subproject commit 71d20029c5314dfc34f3bbdab808b9bce03b8003 diff --git a/scripts/data_generator/delta_rs_generator/__init__.py b/scripts/data_generator/delta_rs_generator/__init__.py new file mode 100644 index 0000000..f20b7b0 --- /dev/null +++ b/scripts/data_generator/delta_rs_generator/__init__.py @@ -0,0 +1,106 @@ +from deltalake import DeltaTable, write_deltalake +from pyspark.sql import SparkSession +from delta import * +from pyspark.sql.functions import * +import duckdb +import pandas as pd +import os +import shutil +import math +import glob + +def generate_test_data_delta_rs_multi(base_path, path, init, tables, splits = 1): + """ + generate_test_data_delta_rs_multi generates some test data using delta-rs and duckdb + + :param path: the test data path (prefixed with base_path) + :param init: a duckdb query that initializes the duckdb tables that will be written + :param tables: list of dicts containing the fields: name, query, (optionally) part_column + :return: None + """ + generated_path = f"{base_path}/{path}" + + if (os.path.isdir(generated_path)): + return + try: + os.makedirs(f"{generated_path}") + + # First we write a DuckDB file TODO: this should go in N appends as well?
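+    # (Overview of the steps below, for orientation: each table's query result is split across
+    #  `splits` parquet files, with the first `total_count % splits` files receiving one extra row;
+    #  those files are then appended one by one to the table's delta_lake directory via write_deltalake.
+    #  For example, 8 rows with splits=3 yields parquet files of 3, 3 and 2 rows.)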
+ con = duckdb.connect(f"{generated_path}/duckdb.db") + + con.sql(init) + + # Then we write the parquet files + for table in tables: + total_count = con.sql(f"select count(*) from ({table['query']})").fetchall()[0][0] + # At least 1 tuple per file + if total_count < splits: + splits = total_count + tuples_per_file = total_count // splits + remainder = total_count % splits + + file_no = 0 + write_from = 0 + while file_no < splits: + os.makedirs(f"{generated_path}/{table['name']}/parquet", exist_ok=True) + # Write DuckDB's reference data + write_to = write_from + tuples_per_file + (1 if file_no < remainder else 0) + con.sql(f"COPY ({table['query']} where rowid >= {write_from} and rowid < {write_to}) to '{generated_path}/{table['name']}/parquet/data_{file_no}.parquet' (FORMAT parquet)") + file_no += 1 + write_from = write_to + + for table in tables: + con = duckdb.connect(f"{generated_path}/duckdb.db") + file_list = list(glob.glob(f"{generated_path}/{table['name']}/parquet/*.parquet")) + file_list = sorted(file_list) + for file in file_list: + test_table_df = con.sql(f'from "{file}"').arrow() + os.makedirs(f"{generated_path}/{table['name']}/delta_lake", exist_ok=True) + write_deltalake(f"{generated_path}/{table['name']}/delta_lake", test_table_df, mode="append") + except: + if (os.path.isdir(generated_path)): + shutil.rmtree(generated_path) + raise + +def generate_test_data_delta_rs(base_path, path, query, part_column=False, add_golden_table=True): + """ + generate_test_data_delta_rs generates some test data using delta-rs and duckdb + + :param path: the test data path (prefixed with base_path) + :param query: a duckdb query that produces a table called 'test_table' + :param part_column: Optionally the name of the column to partition by + :return: None + """ + + + generated_path = f"{base_path}/{path}" + + if (os.path.isdir(generated_path)): + return + + try: + con = duckdb.connect() + + con.sql(query) + + # Write delta table data + test_table_df = con.sql("FROM test_table;").df() + if (part_column): + write_deltalake(f"{generated_path}/delta_lake", test_table_df, partition_by=[part_column]) + else: + write_deltalake(f"{generated_path}/delta_lake", test_table_df) + + if add_golden_table: + # Write DuckDB's reference data + os.mkdir(f'{generated_path}/duckdb') + if (part_column): + con.sql(f"COPY test_table to '{generated_path}/duckdb' (FORMAT parquet, PARTITION_BY {part_column})") + else: + con.sql(f"COPY test_table to '{generated_path}/duckdb/data.parquet' (FORMAT parquet)") + except: + if (os.path.isdir(generated_path)): + shutil.rmtree(generated_path) + raise + + +__all__ = ["generate_test_data_delta_rs", "generate_test_data_delta_rs_multi"] \ No newline at end of file diff --git a/scripts/data_generator/generate_test_data.py b/scripts/data_generator/generate_test_data.py new file mode 100644 index 0000000..388d738 --- /dev/null +++ b/scripts/data_generator/generate_test_data.py @@ -0,0 +1,172 @@ +from deltalake import DeltaTable, write_deltalake +from pyspark.sql import SparkSession +from delta import * +from pyspark.sql.functions import * +import duckdb +import pandas as pd +import os +import shutil +import math +import glob + +BASE_PATH = os.path.dirname(os.path.realpath(__file__)) + "/../../data/generated" +TMP_PATH = '/tmp' + +from delta_rs_generator import * +from pyspark_generator import * + +################################################ +### TPC-H +################################################ + +### TPC-H SF0.01 DELTA-RS +init = "call dbgen(sf=0.01);"
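+# Each TPC-H table below is wrapped as {'name': <table>, 'query': f"from <table>"} and written by
+# generate_test_data_delta_rs_multi into data/generated/delta_rs_tpch_sf0_01/<table>/{parquet,delta_lake}.
+# A hypothetical single-table equivalent (illustration only, not executed; the path name is made up) would be:
+#   query = "call dbgen(sf=0.01); CREATE TABLE test_table AS FROM customer;"
+#   generate_test_data_delta_rs(BASE_PATH, "delta_rs_customer_example", query)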
+tables = ["customer","lineitem","nation","orders","part","partsupp","region","supplier"] +queries = [f"from {x}" for x in tables] +tables = [{'name': x[0], 'query':x[1]} for x in zip(tables,queries)] +generate_test_data_delta_rs_multi(BASE_PATH, "delta_rs_tpch_sf0_01", init, tables) + +## TPC-H SF1 PYSPARK +if (not os.path.isdir(BASE_PATH + '/tpch_sf1')): + con = duckdb.connect() + con.query(f"call dbgen(sf=1); EXPORT DATABASE '{TMP_PATH}/tpch_sf1_export' (FORMAT parquet)") + for table in ["customer","lineitem","nation","orders","part","partsupp","region","supplier"]: + generate_test_data_pyspark(BASE_PATH,f"tpch_sf1_{table}", f'tpch_sf1/{table}', f'{TMP_PATH}/tpch_sf1_export/{table}.parquet') + con.query(f"attach '{BASE_PATH + '/tpch_sf1/duckdb.db'}' as duckdb_out") + for table in ["customer","lineitem","nation","orders","part","partsupp","region","supplier"]: + con.query(f"create table duckdb_out.{table} as from {table}") + +################################################ +### TPC-DS +################################################ + +## TPC-DS SF0.01 full dataset +con = duckdb.connect() +con.query(f"call dsdgen(sf=0.01); EXPORT DATABASE '{TMP_PATH}/tpcds_sf0_01_export' (FORMAT parquet)") +for table in ["call_center","catalog_page","catalog_returns","catalog_sales","customer","customer_demographics","customer_address","date_dim","household_demographics","inventory","income_band","item","promotion","reason","ship_mode","store","store_returns","store_sales","time_dim","warehouse","web_page","web_returns","web_sales","web_site"]: + generate_test_data_pyspark(BASE_PATH,f"tpcds_sf0_01_{table}", f'tpcds_sf0_01/{table}', f'{TMP_PATH}/tpcds_sf0_01_export/{table}.parquet') + +## TPC-DS SF1 +if (not os.path.isdir(BASE_PATH + '/tpcds_sf1')): + con = duckdb.connect() + con.query(f"call dsdgen(sf=1); EXPORT DATABASE '{TMP_PATH}/tpcds_sf1_export' (FORMAT parquet)") + for table in ["call_center","catalog_page","catalog_returns","catalog_sales","customer","customer_demographics","customer_address","date_dim","household_demographics","inventory","income_band","item","promotion","reason","ship_mode","store","store_returns","store_sales","time_dim","warehouse","web_page","web_returns","web_sales","web_site"]: + generate_test_data_pyspark(BASE_PATH,f"tpcds_sf1_{table}", f'tpcds_sf1/{table}', f'{TMP_PATH}/tpcds_sf1_export/{table}.parquet') + con.query(f"attach '{BASE_PATH + '/tpcds_sf1/duckdb.db'}' as duckdb_out") + for table in ["call_center","catalog_page","catalog_returns","catalog_sales","customer","customer_demographics","customer_address","date_dim","household_demographics","inventory","income_band","item","promotion","reason","ship_mode","store","store_returns","store_sales","time_dim","warehouse","web_page","web_returns","web_sales","web_site"]: + con.query(f"create table duckdb_out.{table} as from {table}") + +################################################ +### Partitioned test data +################################################ + +### Simple partitioned table +query = "CREATE table test_table AS SELECT i, i%2 as part from range(0,10) tbl(i);" +generate_test_data_delta_rs(BASE_PATH,"simple_partitioned", query, "part") + +### Partitioned table with + symbol +query = "CREATE table test_table AS SELECT i, (i%2)::VARCHAR || '+/' as part from range(0,10) tbl(i);" +generate_test_data_delta_rs(BASE_PATH,"simple_partitioned_with_url_encoding", query, "part") + +### Simple partitioned table +query = "CREATE table test_table AS SELECT i, i%20 as part from range(0,10000) tbl(i);" 
+generate_test_data_delta_rs(BASE_PATH,"simple_partitioned_large", query, "part") + +### Lineitem SF0.01 10 Partitions +query = "call dbgen(sf=0.01);" +query += "CREATE table test_table AS SELECT *, l_orderkey%10 as part from lineitem;" +generate_test_data_delta_rs(BASE_PATH,"lineitem_sf0_01_10part", query, "part") + +## Partitioned table with all types we can file skip on +for type in ["bool", "int", "tinyint", "smallint", "bigint", "float", "double", "varchar"]: + query = f"CREATE table test_table as select i::{type} as value1, (i)::{type} as value2, (i)::{type} as value3, i::{type} as part from range(0,5) tbl(i)" + generate_test_data_delta_rs(BASE_PATH,f"test_file_skipping/{type}", query, "part") + +## Partitioned table with all types we can file skip on +for type in ["int"]: + query = f"CREATE table test_table as select i::{type}+10 as value1, (i)::{type}+100 as value2, (i)::{type}+1000 as value3, i::{type} as part from range(0,5) tbl(i)" + generate_test_data_delta_rs(BASE_PATH,f"test_file_skipping_2/{type}", query, "part") + +################################################ +### Testing specific data types +################################################ + +## Simple table with a blob as a value +query = "create table test_table as SELECT encode('ABCDE') as blob, encode('ABCDE') as blob_part, 'ABCDE' as string UNION ALL SELECT encode('😈') as blob, encode('😈') as blob_part, '😈' as string" +generate_test_data_delta_rs(BASE_PATH,"simple_blob_table", query, "blob_part", add_golden_table=False) + +## Simple partitioned table with structs +query = "CREATE table test_table AS SELECT {'i':i, 'j':i+1} as value, i%2 as part from range(0,10) tbl(i);" +generate_test_data_delta_rs(BASE_PATH,"simple_partitioned_with_structs", query, "part") + +################################################ +### Deletion vectors +################################################ + +## Simple table with deletion vector +con = duckdb.connect() +con.query(f"COPY (SELECT i as id, ('val' || i::VARCHAR) as value FROM range(0,1000000) tbl(i))TO '{TMP_PATH}/simple_sf1_with_dv.parquet'") +generate_test_data_pyspark(BASE_PATH,'simple_sf1_with_dv', 'simple_sf1_with_dv', f'{TMP_PATH}/simple_sf1_with_dv.parquet', "id % 1000 = 0") + +## Lineitem SF0.01 with deletion vector +con = duckdb.connect() +con.query(f"call dbgen(sf=0.01); COPY (from lineitem) TO '{TMP_PATH}/modified_lineitem_sf0_01.parquet'") +generate_test_data_pyspark(BASE_PATH,'lineitem_sf0_01_with_dv', 'lineitem_sf0_01_with_dv', f'{TMP_PATH}/modified_lineitem_sf0_01.parquet', "l_shipdate = '1994-01-01'") + +## Lineitem SF1 with deletion vector +con = duckdb.connect() +con.query(f"call dbgen(sf=1); COPY (from lineitem) TO '{TMP_PATH}/modified_lineitem_sf1.parquet'") +generate_test_data_pyspark(BASE_PATH,'lineitem_sf1_with_dv', 'lineitem_sf1_with_dv', f'{TMP_PATH}/modified_lineitem_sf1.parquet', "l_shipdate = '1994-01-01'") + +################################################ +### Schema evolution +################################################ + +## Table with simple evolution: adding a column +base_query = 'select CAST(1 as INT) as a;' +queries = [ + 'ALTER TABLE evolution_simple ADD COLUMN b BIGINT;', + 'INSERT INTO evolution_simple VALUES (2, 2);' +] +generate_test_data_pyspark_by_queries(BASE_PATH,'evolution_simple', 'evolution_simple', base_query, queries) + +## Table that drops and re-adds a column with the same name for max confusion +base_query = "select 'value1' as a, 'value2' as b;" +queries = [ + "ALTER TABLE evolution_column_change DROP COLUMN b;", + 
"INSERT INTO evolution_column_change VALUES ('value3');", + "ALTER TABLE evolution_column_change ADD COLUMN b BIGINT;", + "INSERT INTO evolution_column_change VALUES ('value4', 5);", +] +generate_test_data_pyspark_by_queries(BASE_PATH,'evolution_column_change', 'evolution_column_change', base_query, queries) + +## CREATE table that has all type widenings from the spec +base_query = "select CAST(42 AS BYTE) as integer, CAST(42.42 AS FLOAT) as float, CAST(42 AS INT) as int_to_double, CAST('2042-01-01' AS DATE) as date, CAST('42.42' as DECIMAL(4,2)) as decimal, CAST(42 AS INT) as int_to_decimal, CAST(42 AS BIGINT) as long_to_decimal" +queries = [ + "ALTER TABLE evolution_type_widening ALTER COLUMN integer TYPE SMALLINT;", + # TODO: add these once pyspark supports it + # "ALTER TABLE evolution_type_widening ALTER COLUMN float TYPE DOUBLE;", + # "ALTER TABLE evolution_type_widening ALTER COLUMN int_to_double TYPE DOUBLE;", + # "ALTER TABLE evolution_type_widening ALTER COLUMN date TYPE TIMESTAMP_NTZ;", + # "ALTER TABLE evolution_type_widening ALTER COLUMN decimal TYPE DECIMAL(5,2);", + # "ALTER TABLE evolution_type_widening ALTER COLUMN int_to_decimal TYPE DECIMAL(5,2);", + # "ALTER TABLE evolution_type_widening ALTER COLUMN long_to_decimal TYPE DECIMAL(5,2);", + "INSERT INTO evolution_type_widening VALUES (42, 42.42, 42, '2042-01-01', 42.42, 42, 42);", +] +generate_test_data_pyspark_by_queries(BASE_PATH,'evolution_type_widening', 'evolution_type_widening', base_query, queries) + +## CREATE table that has struct widening +base_query = "select named_struct('struct_field_a', 'value1', 'struct_field_b', 'value2') as top_level_column;" +queries = [ + "ALTER TABLE evolution_struct_field_modification ADD COLUMNS (top_level_column.struct_field_c STRING AFTER struct_field_b)", + "INSERT INTO evolution_struct_field_modification VALUES (named_struct('struct_field_a', 'value3', 'struct_field_b', 'value4', 'struct_field_c', 'value5'));", +] +generate_test_data_pyspark_by_queries(BASE_PATH,'evolution_struct_field_modification', 'evolution_struct_field_modification', base_query, queries) + +## CREATE table that has nested struct widening +base_query = "select named_struct('top_level_struct', named_struct('struct_field_a', 'value1', 'struct_field_b', 'value2')) as top_level_column;" +queries = [ + "ALTER TABLE evolution_struct_field_modification_nested ADD COLUMNS (top_level_column.top_level_struct.struct_field_c STRING AFTER struct_field_b)", + "INSERT INTO evolution_struct_field_modification_nested VALUES (named_struct('top_level_struct', named_struct('struct_field_a', 'value3', 'struct_field_b', 'value4', 'struct_field_c', 'value5')));", +] +generate_test_data_pyspark_by_queries(BASE_PATH,'evolution_struct_field_modification_nested', 'evolution_struct_field_modification_nested', base_query, queries) diff --git a/scripts/data_generator/pyspark_generator/__init__.py b/scripts/data_generator/pyspark_generator/__init__.py new file mode 100644 index 0000000..2442f52 --- /dev/null +++ b/scripts/data_generator/pyspark_generator/__init__.py @@ -0,0 +1,112 @@ +from deltalake import DeltaTable, write_deltalake +from pyspark.sql import SparkSession +from delta import * +from pyspark.sql.functions import * +import duckdb +import pandas as pd +import os +import shutil +import math +import glob + +def generate_test_data_pyspark(base_path, name, current_path, input_path, delete_predicate = False): + """ + generate_test_data_pyspark generates some test data using pyspark and duckdb + + :param current_path: the test data 
path + :param input_path: the path to an input parquet file + :return: describe what it returns + """ + + full_path = base_path + '/' + current_path + if (os.path.isdir(full_path)): + return + + try: + ## SPARK SESSION + builder = SparkSession.builder.appName("MyApp") \ + .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \ + .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \ + .config("spark.driver.memory", "8g") \ + .config('spark.driver.host','127.0.0.1') + + spark = configure_spark_with_delta_pip(builder).getOrCreate() + + ## CONFIG + delta_table_path = base_path + '/' + current_path + '/delta_lake' + parquet_reference_path = base_path + '/' + current_path + '/parquet' + + ## CREATE DIRS + os.makedirs(delta_table_path, exist_ok=True) + os.makedirs(parquet_reference_path, exist_ok=True) + + ## DATA GENERATION + # df = spark.read.parquet(input_path) + # df.write.format("delta").mode("overwrite").save(delta_table_path) + spark.sql(f"CREATE TABLE test_table_{name} USING delta LOCATION '{delta_table_path}' AS SELECT * FROM parquet.`{input_path}`") + + ## CREATE + ## CONFIGURE USAGE OF DELETION VECTORS + if (delete_predicate): + spark.sql(f"ALTER TABLE test_table_{name} SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);") + + ## ADDING DELETES + deltaTable = DeltaTable.forPath(spark, delta_table_path) + if delete_predicate: + deltaTable.delete(delete_predicate) + + ## WRITING THE PARQUET FILES + df = spark.table(f'test_table_{name}') + df.write.parquet(parquet_reference_path, mode='overwrite') + + except: + if (os.path.isdir(full_path)): + shutil.rmtree(full_path) + raise + +def generate_test_data_pyspark_by_queries(base_path, name, current_path, base_query, queries): + """ + schema_evolve_pyspark_deltatable generates some test data using pyspark and duckdb + + :param current_path: the test data path + :param input_path: the path to an input parquet file + :return: describe what it returns + """ + + full_path = base_path + '/' + current_path + if (os.path.isdir(full_path)): + return + + try: + ## SPARK SESSION + builder = SparkSession.builder.appName("MyApp") \ + .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \ + .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \ + .config("spark.driver.memory", "8g") \ + .config('spark.driver.host','127.0.0.1') + + spark = configure_spark_with_delta_pip(builder).getOrCreate() + + ## CONFIG + delta_table_path = full_path + '/delta_lake' + + ## CREATE DIRS + os.makedirs(delta_table_path, exist_ok=True) + + ## DATA GENERATION + # df = spark.read.parquet(input_path) + # df.write.format("delta").mode("overwrite").save(delta_table_path) + spark.sql(f"CREATE TABLE {name} USING delta LOCATION '{delta_table_path}' AS {base_query}") + + spark.sql(f"ALTER TABLE {name} SET TBLPROPERTIES ('delta.minReaderVersion' = '2', 'delta.minWriterVersion' = '5', 'delta.columnMapping.mode' = 'name', 'delta.enableTypeWidening' = 'true');") + + for query in queries: + spark.sql(query) + + except: + if (os.path.isdir(full_path)): + shutil.rmtree(full_path) + raise + + +__all__ = ["generate_test_data_pyspark_by_queries", "generate_test_data_pyspark"] \ No newline at end of file diff --git a/scripts/generate_test_data.py b/scripts/generate_test_data.py deleted file mode 100644 index fa98990..0000000 --- a/scripts/generate_test_data.py +++ /dev/null @@ -1,244 +0,0 @@ -from deltalake import DeltaTable, write_deltalake -from pyspark.sql import 
SparkSession -from delta import * -from pyspark.sql.functions import * -import duckdb -import pandas as pd -import os -import shutil -import math -import glob - -BASE_PATH = os.path.dirname(os.path.realpath(__file__)) + "/../data/generated" -TMP_PATH = '/tmp' - -def delete_old_files(): - if (os.path.isdir(BASE_PATH)): - shutil.rmtree(BASE_PATH) - -def generate_test_data_delta_rs_multi(path, init, tables, splits = 1): - """ - generate_test_data_delta_rs generates some test data using delta-rs and duckdb - - :param path: the test data path (prefixed with BASE_PATH) - :param init: a duckdb query initializes the duckdb tables that will be written - :param tables: list of dicts containing the fields: name, query, (optionally) part_column - :return: describe what it returns - """ - generated_path = f"{BASE_PATH}/{path}" - - if (os.path.isdir(generated_path)): - return - - os.makedirs(f"{generated_path}") - - # First we write a DuckDB file TODO: this should go in N appends as well? - con = duckdb.connect(f"{generated_path}/duckdb.db") - - con.sql(init) - - # Then we write the parquet files - for table in tables: - total_count = con.sql(f"select count(*) from ({table['query']})").fetchall()[0][0] - # At least 1 tuple per file - if total_count < splits: - splits = total_count - tuples_per_file = total_count // splits - remainder = total_count % splits - - file_no = 0 - write_from = 0 - while file_no < splits: - os.makedirs(f"{generated_path}/{table['name']}/parquet", exist_ok=True) - # Write DuckDB's reference data - write_to = write_from + tuples_per_file + (1 if file_no < remainder else 0) - con.sql(f"COPY ({table['query']} where rowid >= {write_from} and rowid < {write_to}) to '{generated_path}/{table['name']}/parquet/data_{file_no}.parquet' (FORMAT parquet)") - file_no += 1 - write_from = write_to - - for table in tables: - con = duckdb.connect(f"{generated_path}/duckdb.db") - file_list = list(glob.glob(f"{generated_path}/{table['name']}/parquet/*.parquet")) - file_list = sorted(file_list) - for file in file_list: - test_table_df = con.sql(f'from "{file}"').arrow() - os.makedirs(f"{generated_path}/{table['name']}/delta_lake", exist_ok=True) - write_deltalake(f"{generated_path}/{table['name']}/delta_lake", test_table_df, mode="append") - -def generate_test_data_delta_rs(path, query, part_column=False, add_golden_table=True): - """ - generate_test_data_delta_rs generates some test data using delta-rs and duckdb - - :param path: the test data path (prefixed with BASE_PATH) - :param query: a duckdb query that produces a table called 'test_table' - :param part_column: Optionally the name of the column to partition by - :return: describe what it returns - """ - generated_path = f"{BASE_PATH}/{path}" - - if (os.path.isdir(generated_path)): - return - - con = duckdb.connect() - - con.sql(query) - - # Write delta table data - test_table_df = con.sql("FROM test_table;").df() - if (part_column): - write_deltalake(f"{generated_path}/delta_lake", test_table_df, partition_by=[part_column]) - else: - write_deltalake(f"{generated_path}/delta_lake", test_table_df) - - if add_golden_table: - # Write DuckDB's reference data - os.mkdir(f'{generated_path}/duckdb') - if (part_column): - con.sql(f"COPY test_table to '{generated_path}/duckdb' (FORMAT parquet, PARTITION_BY {part_column})") - else: - con.sql(f"COPY test_table to '{generated_path}/duckdb/data.parquet' (FORMAT parquet)") - -def generate_test_data_pyspark(name, current_path, input_path, delete_predicate = False): - """ - generate_test_data_pyspark 
generates some test data using pyspark and duckdb - - :param current_path: the test data path - :param input_path: the path to an input parquet file - :return: describe what it returns - """ - - if (os.path.isdir(BASE_PATH + '/' + current_path)): - return - - ## SPARK SESSION - builder = SparkSession.builder.appName("MyApp") \ - .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \ - .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \ - .config("spark.driver.memory", "8g")\ - .config('spark.driver.host','127.0.0.1') - - spark = configure_spark_with_delta_pip(builder).getOrCreate() - - ## CONFIG - delta_table_path = BASE_PATH + '/' + current_path + '/delta_lake' - parquet_reference_path = BASE_PATH + '/' + current_path + '/parquet' - - ## CREATE DIRS - os.makedirs(delta_table_path, exist_ok=True) - os.makedirs(parquet_reference_path, exist_ok=True) - - ## DATA GENERATION - # df = spark.read.parquet(input_path) - # df.write.format("delta").mode("overwrite").save(delta_table_path) - spark.sql(f"CREATE TABLE test_table_{name} USING delta LOCATION '{delta_table_path}' AS SELECT * FROM parquet.`{input_path}`") - - ## CREATE - ## CONFIGURE USAGE OF DELETION VECTORS - if (delete_predicate): - spark.sql(f"ALTER TABLE test_table_{name} SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);") - - ## ADDING DELETES - deltaTable = DeltaTable.forPath(spark, delta_table_path) - if delete_predicate: - deltaTable.delete(delete_predicate) - - ## WRITING THE PARQUET FILES - df = spark.table(f'test_table_{name}') - df.write.parquet(parquet_reference_path, mode='overwrite') - -# TO CLEAN, uncomment -# delete_old_files() - -### TPCH SF1 -init = "call dbgen(sf=0.01);" -tables = ["customer","lineitem","nation","orders","part","partsupp","region","supplier"] -queries = [f"from {x}" for x in tables] -tables = [{'name': x[0], 'query':x[1]} for x in zip(tables,queries)] -generate_test_data_delta_rs_multi("delta_rs_tpch_sf0_01", init, tables) - -### Simple partitioned table -query = "CREATE table test_table AS SELECT i, i%2 as part from range(0,10) tbl(i);" -generate_test_data_delta_rs("simple_partitioned", query, "part") - -### Partitioned table with + symbol -query = "CREATE table test_table AS SELECT i, (i%2)::VARCHAR || '+/' as part from range(0,10) tbl(i);" -generate_test_data_delta_rs("simple_partitioned_with_url_encoding", query, "part") - -### Simple partitioned table -query = "CREATE table test_table AS SELECT i, i%20 as part from range(0,10000) tbl(i);" -generate_test_data_delta_rs("simple_partitioned_large", query, "part") - -### Lineitem SF0.01 No partitions -query = "call dbgen(sf=0.01);" -query += "CREATE table test_table AS SELECT * as part from lineitem;" -generate_test_data_delta_rs("lineitem_sf0_01", query) - -### Lineitem SF0.01 10 Partitions -query = "call dbgen(sf=0.01);" -query += "CREATE table test_table AS SELECT *, l_orderkey%10 as part from lineitem;" -generate_test_data_delta_rs("lineitem_sf0_01_10part", query, "part") - -## Simple table with a blob as a value -query = "create table test_table as SELECT encode('ABCDE') as blob, encode('ABCDE') as blob_part, 'ABCDE' as string UNION ALL SELECT encode('😈') as blob, encode('😈') as blob_part, '😈' as string" -generate_test_data_delta_rs("simple_blob_table", query, "blob_part", add_golden_table=False) - -## Simple partitioned table with structs -query = "CREATE table test_table AS SELECT {'i':i, 'j':i+1} as value, i%2 as part from range(0,10) tbl(i);" 
-generate_test_data_delta_rs("simple_partitioned_with_structs", query, "part") - -## Partitioned table with all types we can file skip on -for type in ["bool", "int", "tinyint", "smallint", "bigint", "float", "double", "varchar"]: - query = f"CREATE table test_table as select i::{type} as value1, (i)::{type} as value2, (i)::{type} as value3, i::{type} as part from range(0,5) tbl(i)" - generate_test_data_delta_rs(f"test_file_skipping/{type}", query, "part") - -## Partitioned table with all types we can file skip on -for type in ["int"]: - query = f"CREATE table test_table as select i::{type}+10 as value1, (i)::{type}+100 as value2, (i)::{type}+1000 as value3, i::{type} as part from range(0,5) tbl(i)" - generate_test_data_delta_rs(f"test_file_skipping_2/{type}", query, "part") - -## Simple table with deletion vector -con = duckdb.connect() -con.query(f"COPY (SELECT i as id, ('val' || i::VARCHAR) as value FROM range(0,1000000) tbl(i))TO '{TMP_PATH}/simple_sf1_with_dv.parquet'") -generate_test_data_pyspark('simple_sf1_with_dv', 'simple_sf1_with_dv', f'{TMP_PATH}/simple_sf1_with_dv.parquet', "id % 1000 = 0") - -## Lineitem SF0.01 with deletion vector -con = duckdb.connect() -con.query(f"call dbgen(sf=0.01); COPY (from lineitem) TO '{TMP_PATH}/modified_lineitem_sf0_01.parquet'") -generate_test_data_pyspark('lineitem_sf0_01_with_dv', 'lineitem_sf0_01_with_dv', f'{TMP_PATH}/modified_lineitem_sf0_01.parquet', "l_shipdate = '1994-01-01'") - -## Lineitem SF1 with deletion vector -con = duckdb.connect() -con.query(f"call dbgen(sf=1); COPY (from lineitem) TO '{TMP_PATH}/modified_lineitem_sf1.parquet'") -generate_test_data_pyspark('lineitem_sf1_with_dv', 'lineitem_sf1_with_dv', f'{TMP_PATH}/modified_lineitem_sf1.parquet', "l_shipdate = '1994-01-01'") - -## TPCH SF0.01 full dataset -con = duckdb.connect() -con.query(f"call dbgen(sf=0.01); EXPORT DATABASE '{TMP_PATH}/tpch_sf0_01_export' (FORMAT parquet)") -for table in ["customer","lineitem","nation","orders","part","partsupp","region","supplier"]: - generate_test_data_pyspark(f"tpch_sf0_01_{table}", f'tpch_sf0_01/{table}', f'{TMP_PATH}/tpch_sf0_01_export/{table}.parquet') - -## TPCDS SF0.01 full dataset -con = duckdb.connect() -con.query(f"call dsdgen(sf=0.01); EXPORT DATABASE '{TMP_PATH}/tpcds_sf0_01_export' (FORMAT parquet)") -for table in ["call_center","catalog_page","catalog_returns","catalog_sales","customer","customer_demographics","customer_address","date_dim","household_demographics","inventory","income_band","item","promotion","reason","ship_mode","store","store_returns","store_sales","time_dim","warehouse","web_page","web_returns","web_sales","web_site"]: - generate_test_data_pyspark(f"tpcds_sf0_01_{table}", f'tpcds_sf0_01/{table}', f'{TMP_PATH}/tpcds_sf0_01_export/{table}.parquet') - -## TPCH SF1 full dataset -if (not os.path.isdir(BASE_PATH + '/tpch_sf1')): - con = duckdb.connect() - con.query(f"call dbgen(sf=1); EXPORT DATABASE '{TMP_PATH}/tpch_sf1_export' (FORMAT parquet)") - for table in ["customer","lineitem","nation","orders","part","partsupp","region","supplier"]: - generate_test_data_pyspark(f"tpch_sf1_{table}", f'tpch_sf1/{table}', f'{TMP_PATH}/tpch_sf1_export/{table}.parquet') - con.query(f"attach '{BASE_PATH + '/tpch_sf1/duckdb.db'}' as duckdb_out") - for table in ["customer","lineitem","nation","orders","part","partsupp","region","supplier"]: - con.query(f"create table duckdb_out.{table} as from {table}") - -## TPCDS SF1 full dataset -if (not os.path.isdir(BASE_PATH + '/tpcds_sf1')): - con = duckdb.connect() - con.query(f"call 
dsdgen(sf=1); EXPORT DATABASE '{TMP_PATH}/tpcds_sf1_export' (FORMAT parquet)") - for table in ["call_center","catalog_page","catalog_returns","catalog_sales","customer","customer_demographics","customer_address","date_dim","household_demographics","inventory","income_band","item","promotion","reason","ship_mode","store","store_returns","store_sales","time_dim","warehouse","web_page","web_returns","web_sales","web_site"]: - generate_test_data_pyspark(f"tpcds_sf1_{table}", f'tpcds_sf1/{table}', f'{TMP_PATH}/tpcds_sf1_export/{table}.parquet') - con.query(f"attach '{BASE_PATH + '/tpcds_sf1/duckdb.db'}' as duckdb_out") - for table in ["call_center","catalog_page","catalog_returns","catalog_sales","customer","customer_demographics","customer_address","date_dim","household_demographics","inventory","income_band","item","promotion","reason","ship_mode","store","store_returns","store_sales","time_dim","warehouse","web_page","web_returns","web_sales","web_site"]: - con.query(f"create table duckdb_out.{table} as from {table}") \ No newline at end of file diff --git a/src/delta_extension.cpp b/src/delta_extension.cpp index 0f8a64a..40129c1 100644 --- a/src/delta_extension.cpp +++ b/src/delta_extension.cpp @@ -4,6 +4,7 @@ #include "delta_utils.hpp" #include "delta_functions.hpp" +#include "delta_log_types.hpp" #include "delta_macros.hpp" #include "storage/delta_catalog.hpp" #include "storage/delta_transaction_manager.hpp" @@ -83,6 +84,8 @@ static void LoadInternal(DatabaseInstance &instance) { DeltaMacros::RegisterMacros(instance); + DeltaLogTypes::RegisterLogTypes(instance); + LoggerCallback::Initialize(instance); } diff --git a/src/delta_log_types.cpp b/src/delta_log_types.cpp new file mode 100644 index 0000000..7de1cff --- /dev/null +++ b/src/delta_log_types.cpp @@ -0,0 +1,36 @@ +#include "delta_log_types.hpp" +#include "delta_utils.hpp" + +namespace duckdb { + +constexpr LogLevel DeltaKernelLogType::LEVEL; + +DeltaKernelLogType::DeltaKernelLogType() : LogType(NAME, LEVEL, GetLogType()) { +} + +LogicalType DeltaKernelLogType::GetLogType() { + child_list_t request_child_list = {{"target", LogicalType::VARCHAR}, + {"message", LogicalType::VARCHAR}, + {"file", LogicalType::VARCHAR}, + {"line", LogicalType::UINTEGER}}; + return LogicalType::STRUCT(request_child_list); +} + +string DeltaKernelLogType::ConstructLogMessage(ffi::Event event) { + Value file, line; + + auto file_string = KernelUtils::FromDeltaString(event.file); + if (!file_string.empty()) { + file = Value(KernelUtils::FromDeltaString(event.file)); + line = Value::UINTEGER(event.line); + } + + child_list_t message_child_list = {{"target", Value(KernelUtils::FromDeltaString(event.target))}, + {"message", Value(KernelUtils::FromDeltaString(event.message))}, + {"file", Value(file)}, + {"line", Value(line)}}; + + return Value::STRUCT(message_child_list).ToString(); +} + +}; // namespace duckdb diff --git a/src/delta_macros.cpp b/src/delta_macros.cpp index 1f06933..5c3a8e8 100644 --- a/src/delta_macros.cpp +++ b/src/delta_macros.cpp @@ -9,6 +9,7 @@ namespace duckdb { +// TODO: use Structured logging for this // Macro to fetch the pushed down filters for the most recent query static constexpr auto DELTA_FILTER_PUSHDOWN_MACRO = R"( SELECT @@ -24,7 +25,7 @@ JOIN duckdb_logs as l2 ON l1.transaction_id = l2.transaction_id WHERE l2.type='delta.FilterPushdown' AND - l1.type = 'duckdb.ClientContext.BeginQuery' + l1.type = 'QueryLog' ORDER BY l1.transaction_id )"; diff --git a/src/delta_utils.cpp b/src/delta_utils.cpp index aa2f48e..b067e93 100644 --- 
a/src/delta_utils.cpp +++ b/src/delta_utils.cpp @@ -1,5 +1,8 @@ #include "delta_utils.hpp" +#include "delta_log_types.hpp" +#include "duckdb/common/operator/decimal_cast_operators.hpp" + #include "duckdb.hpp" #include "duckdb/common/types/decimal.hpp" #include "duckdb/main/extension_util.hpp" @@ -28,9 +31,7 @@ void ExpressionVisitor::VisitComparisonExpression(void *state, uintptr_t sibling state_cast->AppendToList(sibling_list_id, std::move(expression)); } -unique_ptr>> -ExpressionVisitor::VisitKernelExpression(const ffi::Handle *expression) { - ExpressionVisitor state; +ffi::EngineExpressionVisitor ExpressionVisitor::CreateVisitor(ExpressionVisitor &state) { ffi::EngineExpressionVisitor visitor; visitor.data = &state; @@ -86,6 +87,28 @@ ExpressionVisitor::VisitKernelExpression(const ffi::Handle>> +ExpressionVisitor::VisitKernelExpression(const ffi::Expression *expression) { + ExpressionVisitor state; + auto visitor = CreateVisitor(state); + + uintptr_t result = ffi::visit_expression_ref(expression, &visitor); + + if (state.error.HasError()) { + state.error.Throw(); + } + + return state.TakeFieldList(result); +} + +unique_ptr>> +ExpressionVisitor::VisitKernelExpression(const ffi::Handle *expression) { + ExpressionVisitor state; + auto visitor = CreateVisitor(state); + uintptr_t result = ffi::visit_expression(expression, &visitor); if (state.error.HasError()) { @@ -254,14 +277,33 @@ void ExpressionVisitor::VisitIsNullExpression(void *state, uintptr_t sibling_lis state_cast->AppendToList(sibling_list_id, std::move(expression)); } -// FIXME: this is not 100% correct yet: value_ms is ignored -void ExpressionVisitor::VisitDecimalLiteral(void *state, uintptr_t sibling_list_id, uint64_t value_ms, - uint64_t value_ls, uint8_t precision, uint8_t scale) { +// This function is a workaround for the fact that duckdb disallows using hugeints to store decimals with precision < 18 +// whereas kernel does allow this. 
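+// For example (assuming the kernel encodes such narrow decimals entirely in the low 64 bits, with
+// value_ms only carrying the sign): a DECIMAL(10,2) literal 12345.67 would arrive as value_ms = 0,
+// value_ls = 1234567 and be stored below as Value::DECIMAL(1234567, 10, 2). Decimals with
+// precision >= Decimal::MAX_WIDTH_INT64 instead take the hugeint path in VisitDecimalLiteral.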
+static int64_t GetTruncatedDecimalValue(int64_t value_ms, uint64_t value_ls) { + // First trim msb from lower half + auto new_value_ls = value_ls << 1; + new_value_ls = new_value_ls >> 1; + + // Now cast the lower half to signed + auto lower_cast = UnsafeNumericCast(value_ls); + + // If value_ms was negative, we need to invert + if (value_ms < 0) { + lower_cast = -lower_cast; + } + return lower_cast; +} + +void ExpressionVisitor::VisitDecimalLiteral(void *state, uintptr_t sibling_list_id, int64_t value_ms, uint64_t value_ls, + uint8_t precision, uint8_t scale) { try { - if (precision >= Decimal::MAX_WIDTH_INT64 || value_ls > (uint64_t)NumericLimits::Maximum()) { - throw NotImplementedException("ExpressionVisitor::VisitDecimalLiteral HugeInt decimals"); + Value decimal_value; + if (precision < Decimal::MAX_WIDTH_INT64) { + decimal_value = Value::DECIMAL(GetTruncatedDecimalValue(value_ms, value_ls), precision, scale); + } else { + decimal_value = Value::DECIMAL({value_ms, value_ls}, precision, scale); } - auto expression = make_uniq(Value::DECIMAL(42, 18, 10)); + auto expression = make_uniq(decimal_value); static_cast(state)->AppendToList(sibling_list_id, std::move(expression)); } catch (Exception &e) { static_cast(state)->error = ErrorData(e); @@ -269,12 +311,27 @@ void ExpressionVisitor::VisitDecimalLiteral(void *state, uintptr_t sibling_list_ } void ExpressionVisitor::VisitColumnExpression(void *state, uintptr_t sibling_list_id, ffi::KernelStringSlice name) { - auto expression = make_uniq(string(name.ptr, name.len)); + auto col_ref_string = string(name.ptr, name.len); + + // Delta ColRefs are sometimes backtick-ed + if (col_ref_string[0] == '`' && col_ref_string[col_ref_string.size() - 1] == '`') { + col_ref_string = col_ref_string.substr(1, col_ref_string.size() - 2); + } + + auto expression = make_uniq(col_ref_string); static_cast(state)->AppendToList(sibling_list_id, std::move(expression)); } + void ExpressionVisitor::VisitStructExpression(void *state, uintptr_t sibling_list_id, uintptr_t child_list_id) { - static_cast(state)->AppendToList(sibling_list_id, - std::move(make_uniq(Value(42)))); + auto state_cast = static_cast(state); + + auto children_values = state_cast->TakeFieldList(child_list_id); + if (!children_values) { + return; + } + + unique_ptr expression = make_uniq("struct_pack", std::move(*children_values)); + state_cast->AppendToList(sibling_list_id, std::move(expression)); } uintptr_t ExpressionVisitor::MakeFieldList(ExpressionVisitor *state, uintptr_t capacity_hint) { @@ -311,8 +368,7 @@ unique_ptr ExpressionVisitor::TakeFieldList(uintpt return rval; } -unique_ptr SchemaVisitor::VisitSnapshotSchema(ffi::SharedSnapshot *snapshot) { - SchemaVisitor state; +ffi::EngineSchemaVisitor SchemaVisitor::CreateSchemaVisitor(SchemaVisitor &state) { ffi::EngineSchemaVisitor visitor; visitor.data = &state; @@ -342,13 +398,46 @@ unique_ptr SchemaVisitor::VisitSnapshotSchema(ffi::Sha visitor.visit_timestamp = VisitSimpleType(); visitor.visit_timestamp_ntz = VisitSimpleType(); + return visitor; +} + +unique_ptr SchemaVisitor::VisitSnapshotSchema(ffi::SharedSnapshot *snapshot) { + SchemaVisitor state; + auto visitor = CreateSchemaVisitor(state); + auto schema = logical_schema(snapshot); uintptr_t result = visit_schema(schema, &visitor); free_schema(schema); + if (state.error.HasError()) { + state.error.Throw(); + } + return state.TakeFieldList(result); } +unique_ptr SchemaVisitor::VisitSnapshotGlobalReadSchema(ffi::SharedGlobalScanState *state, + bool logical) { + SchemaVisitor 
visitor_state; + auto visitor = CreateSchemaVisitor(visitor_state); + + ffi::Handle schema; + if (logical) { + schema = ffi::get_global_logical_schema(state); + } else { + schema = ffi::get_global_read_schema(state); + } + + uintptr_t result = visit_schema(schema, &visitor); + free_schema(schema); + + if (visitor_state.error.HasError()) { + visitor_state.error.Throw(); + } + + return visitor_state.TakeFieldList(result); +} + void SchemaVisitor::VisitDecimal(SchemaVisitor *state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, bool is_nullable, const ffi::CStringMap *metadata, uint8_t precision, uint8_t scale) { state->AppendToList(sibling_list_id, name, LogicalType::DECIMAL(precision, scale)); @@ -393,7 +482,8 @@ uintptr_t SchemaVisitor::MakeFieldListImpl(uintptr_t capacity_hint) { void SchemaVisitor::AppendToList(uintptr_t id, ffi::KernelStringSlice name, LogicalType &&child) { auto it = inflight_lists.find(id); if (it == inflight_lists.end()) { - throw InternalException("Unhandled error in SchemaVisitor::AppendToList child"); + error = ErrorData(ExceptionType::INTERNAL, "Unhandled error in SchemaVisitor::AppendToList"); + return; } it->second->emplace_back(std::make_pair(string(name.ptr, name.len), std::move(child))); } @@ -401,7 +491,8 @@ void SchemaVisitor::AppendToList(uintptr_t id, ffi::KernelStringSlice name, Logi unique_ptr SchemaVisitor::TakeFieldList(uintptr_t id) { auto it = inflight_lists.find(id); if (it == inflight_lists.end()) { - throw InternalException("Unhandled error in SchemaVisitor::TakeFieldList"); + error = ErrorData(ExceptionType::INTERNAL, "Unhandled error in SchemaVisitor::TakeFieldList"); + return make_uniq(); } auto rval = std::move(it->second); inflight_lists.erase(it); @@ -469,7 +560,7 @@ string DuckDBEngineError::KernelErrorEnumToString(ffi::KernelError err) { return StringUtil::Format("EnumOutOfRange (enum val out of range: %d)", (int)err); } -void DuckDBEngineError::Throw(string from_where) { +string DuckDBEngineError::IntoString() { // Make copies before calling delete this auto etype_copy = etype; auto message_copy = error_message; @@ -477,9 +568,7 @@ void DuckDBEngineError::Throw(string from_where) { // Consume error by calling delete this (remember this error is created by // kernel using AllocateError) delete this; - throw IOException("Hit DeltaKernel FFI error (from: %s): Hit error: %u (%s) " - "with message (%s)", - from_where.c_str(), etype_copy, KernelErrorEnumToString(etype_copy), message_copy); + return StringUtil::Format("DeltKernel %s (%u): %s", KernelErrorEnumToString(etype_copy), etype_copy, message_copy); } ffi::KernelStringSlice KernelUtils::ToDeltaString(const string &str) { @@ -496,6 +585,26 @@ vector KernelUtils::FromDeltaBoolSlice(const struct ffi::KernelBoolSlice s return result; } +vector> & +KernelUtils::UnpackTopLevelStruct(const vector> &parsed_expression) { + if (parsed_expression.size() != 1) { + throw IOException("Unexpected size of transformation expression returned by delta kernel: %d", + parsed_expression.size()); + } + + const auto &root_expression = parsed_expression.get(0); + if (root_expression->type != ExpressionType::FUNCTION) { + throw IOException("Unexpected type of root expression returned by delta kernel: %d", root_expression->type); + } + + if (root_expression->Cast().function_name != "struct_pack") { + throw IOException("Unexpected function of root expression returned by delta kernel: %s", + root_expression->Cast().function_name); + } + + return root_expression->Cast().children; +} + 
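+// Note on UnpackTopLevelStruct above: the kernel-provided per-file transform is expected to be a
+// single struct_pack(...) expression whose children are consumed positionally; e.g. the scan
+// callback in delta_multi_file_list.cpp reads each partition column's value from the constant
+// child at that column's index (see GetPartitionValueFromExpression).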
PredicateVisitor::PredicateVisitor(const vector &column_names, optional_ptr filters) { predicate = this; visitor = (uintptr_t(*)(void *, ffi::KernelExpressionVisitorState *)) & VisitPredicate; @@ -542,7 +651,13 @@ uintptr_t PredicateVisitor::VisitConstantFilter(const string &col_name, const Co ffi::KernelExpressionVisitorState *state) { auto maybe_left = ffi::visit_expression_column(state, KernelUtils::ToDeltaString(col_name), DuckDBEngineError::AllocateError); - uintptr_t left = KernelUtils::UnpackResult(maybe_left, "VisitConstantFilter failed to visit_expression_column"); + + uintptr_t left; + auto left_res = KernelUtils::TryUnpackResult(maybe_left, left); + if (left_res.HasError()) { + error_data = left_res; + return ~0; + } uintptr_t right = ~0; auto &value = filter.constant; @@ -574,7 +689,11 @@ uintptr_t PredicateVisitor::VisitConstantFilter(const string &col_name, const Co auto str = StringValue::Get(value); auto maybe_right = ffi::visit_expression_literal_string(state, KernelUtils::ToDeltaString(str), DuckDBEngineError::AllocateError); - right = KernelUtils::UnpackResult(maybe_right, "VisitConstantFilter failed to visit_expression_literal_string"); + auto right_res = KernelUtils::TryUnpackResult(maybe_right, right); + if (right_res.HasError()) { + error_data = right_res; + return ~0; + } break; } default: @@ -619,7 +738,13 @@ uintptr_t PredicateVisitor::VisitAndFilter(const string &col_name, const Conjunc uintptr_t PredicateVisitor::VisitIsNull(const string &col_name, ffi::KernelExpressionVisitorState *state) { auto maybe_inner = ffi::visit_expression_column(state, KernelUtils::ToDeltaString(col_name), DuckDBEngineError::AllocateError); - uintptr_t inner = KernelUtils::UnpackResult(maybe_inner, "VisitIsNull failed to visit_expression_column"); + uintptr_t inner; + + auto err = KernelUtils::TryUnpackResult(maybe_inner, inner); + if (err.HasError()) { + error_data = err; + return ~0; + } return ffi::visit_expression_is_null(state, inner); } @@ -651,25 +776,15 @@ void LoggerCallback::Initialize(DatabaseInstance &db_p) { } } -static string ConvertLogMessage(ffi::Event event) { - auto log_type = KernelUtils::FromDeltaString(event.target); - auto message = KernelUtils::FromDeltaString(event.message); - auto file = KernelUtils::FromDeltaString(event.file); - string constructed_log_message; - if (!file.empty()) { - constructed_log_message = StringUtil::Format("[%s] %s@%u : %s ", log_type, file, event.line, message); - } else { - constructed_log_message = message; - } - - return constructed_log_message; -} void LoggerCallback::CallbackEvent(ffi::Event event) { auto &instance = GetInstance(); auto db_locked = instance.db.lock(); if (db_locked) { - auto transformed_log_level = GetDuckDBLogLevel(event.level); - DUCKDB_LOG(*db_locked, "delta.Kernel", transformed_log_level, ConvertLogMessage(event)); + // Note: this slightly offbeat invocation of logging API is because we are passing through the log level instead + // of using the same + // log level for every message of this log type. 
We may + DUCKDB_LOG_INTERNAL(*db_locked, DeltaKernelLogType::NAME, GetDuckDBLogLevel(event.level), + DeltaKernelLogType::ConstructLogMessage(event)); } } diff --git a/src/functions/delta_scan/delta_multi_file_list.cpp b/src/functions/delta_scan/delta_multi_file_list.cpp index 02864b8..ca3d385 100644 --- a/src/functions/delta_scan/delta_multi_file_list.cpp +++ b/src/functions/delta_scan/delta_multi_file_list.cpp @@ -18,10 +18,6 @@ namespace duckdb { -static void *allocate_string(const struct ffi::KernelStringSlice slice) { - return new string(slice.ptr, slice.len); -} - static string url_decode(string input) { string result; result.reserve(input.size()); @@ -71,7 +67,13 @@ static ffi::EngineBuilder *CreateBuilder(ClientContext &context, const string &p !StringUtil::StartsWith(path, "abfs://") && !StringUtil::StartsWith(path, "abfss://")) { auto interface_builder_res = ffi::get_engine_builder(KernelUtils::ToDeltaString(path), DuckDBEngineError::AllocateError); - return KernelUtils::UnpackResult(interface_builder_res, "get_engine_interface_builder for path " + path); + + ffi::EngineBuilder *return_value; + auto res = KernelUtils::TryUnpackResult(interface_builder_res, return_value); + if (res.HasError()) { + res.Throw(); + } + return return_value; } string bucket; @@ -156,7 +158,11 @@ static ffi::EngineBuilder *CreateBuilder(ClientContext &context, const string &p auto interface_builder_res = ffi::get_engine_builder(KernelUtils::ToDeltaString(cleaned_path), DuckDBEngineError::AllocateError); - builder = KernelUtils::UnpackResult(interface_builder_res, "get_engine_interface_builder for path " + cleaned_path); + + auto res = KernelUtils::TryUnpackResult(interface_builder_res, builder); + if (res.HasError()) { + res.Throw(); + } // For S3 or Azure paths we need to trim the url, set the container, and fetch a potential secret auto &secret_manager = SecretManager::Get(context); @@ -335,9 +341,19 @@ static void KernelPartitionStringVisitor(ffi::NullableCvoid engine_context, ffi: data->partitions.push_back(KernelUtils::FromDeltaString(slice)); } +static Value GetPartitionValueFromExpression(const vector> &parsed_expression, + idx_t index) { + auto &column_expressions = KernelUtils::UnpackTopLevelStruct(parsed_expression); + auto &child = column_expressions[index]; + if (!child || child->type != ExpressionType::VALUE_CONSTANT) { + throw IOException("Failed to parse partition value from kernel-provided transformation"); + } + return child->Cast().value; +} + void ScanDataCallBack::VisitCallbackInternal(ffi::NullableCvoid engine_context, ffi::KernelStringSlice path, int64_t size, const ffi::Stats *stats, const ffi::DvInfo *dv_info, - const ffi::CStringMap *partition_values) { + const ffi::Expression *transform) { auto context = (ScanDataCallBack *)engine_context; auto &snapshot = context->snapshot; @@ -348,7 +364,7 @@ void ScanDataCallBack::VisitCallbackInternal(ffi::NullableCvoid engine_context, path_string = url_decode(path_string); // First we append the file to our resolved files - snapshot.resolved_files.push_back(DeltaMultiFileList::ToDuckDBPath(path_string)); + snapshot.resolved_files.emplace_back(DeltaMultiFileList::ToDuckDBPath(path_string)); snapshot.metadata.emplace_back(make_uniq()); D_ASSERT(snapshot.resolved_files.size() == snapshot.metadata.size()); @@ -374,31 +390,53 @@ void ScanDataCallBack::VisitCallbackInternal(ffi::NullableCvoid engine_context, } if (!do_workaround) { - auto selection_vector = - KernelUtils::UnpackResult(selection_vector_res, "selection_vector_from_dv for path " + 
snapshot.GetPath()); + ffi::KernelBoolSlice selection_vector; + auto res = KernelUtils::TryUnpackResult(selection_vector_res, selection_vector); + if (res.HasError()) { + context->error = res; + return; + } if (selection_vector.ptr) { snapshot.metadata.back()->selection_vector = selection_vector; } } // Lookup all columns for potential hits in the constant map - case_insensitive_map_t constant_map; - for (const auto &col : snapshot.names) { - auto key = KernelUtils::ToDeltaString(col); - auto *partition_val = (string *)ffi::get_from_string_map(partition_values, key, allocate_string); - if (partition_val) { - constant_map[col] = *partition_val; - delete partition_val; + if (transform) { + ExpressionVisitor visitor; + auto parsed_transformation_expression = visitor.VisitKernelExpression(transform); + + if (!parsed_transformation_expression) { + context->error = ErrorData(ExceptionType::IO, + "Failed to parse transformation expression from delta kernel: null returned"); + return; + } + + case_insensitive_map_t constant_map; + for (idx_t i = 0; i < snapshot.partitions.size(); ++i) { + const auto &partition_id = context->snapshot.partition_ids[i]; + const auto &partition_name = context->snapshot.partitions[i]; + + constant_map[partition_name] = + GetPartitionValueFromExpression(*parsed_transformation_expression, partition_id); + } + snapshot.metadata.back()->partition_map = std::move(constant_map); + snapshot.metadata.back()->transform_expression = + std::move(parsed_transformation_expression); // FIXME: currently not used + } else { + if (!snapshot.partitions.empty()) { + context->error = ErrorData(ExceptionType::IO, + "Failed to fetch partitions from delta kernel transform! Transform is empty"); + return; } } - snapshot.metadata.back()->partition_map = std::move(constant_map); } void ScanDataCallBack::VisitCallback(ffi::NullableCvoid engine_context, ffi::KernelStringSlice path, int64_t size, const ffi::Stats *stats, const ffi::DvInfo *dv_info, const ffi::Expression *transform, const ffi::CStringMap *partition_values) { try { - return VisitCallbackInternal(engine_context, path, size, stats, dv_info, partition_values); + return VisitCallbackInternal(engine_context, path, size, stats, dv_info, transform); } catch (std::runtime_error &e) { auto context = (ScanDataCallBack *)engine_context; context->error = ErrorData(e); @@ -407,7 +445,7 @@ void ScanDataCallBack::VisitCallback(ffi::NullableCvoid engine_context, ffi::Ker void ScanDataCallBack::VisitData(void *engine_context, ffi::ExclusiveEngineData *engine_data, const struct ffi::KernelBoolSlice selection_vec, const ffi::CTransforms *transforms) { - ffi::visit_scan_data(engine_data, selection_vec, transforms, engine_context, ScanDataCallBack::VisitCallback); + ffi::visit_scan_data(engine_data, selection_vec, transforms, engine_context, VisitCallback); } DeltaMultiFileList::DeltaMultiFileList(ClientContext &context_p, const string &path) @@ -415,7 +453,7 @@ DeltaMultiFileList::DeltaMultiFileList(ClientContext &context_p, const string &p } string DeltaMultiFileList::GetPath() const { - return GetPaths()[0]; + return GetPaths()[0].path; } string DeltaMultiFileList::ToDuckDBPath(const string &raw_path) { @@ -455,7 +493,6 @@ void DeltaMultiFileList::Bind(vector &return_types, vector EnsureSnapshotInitialized(); unique_ptr schema; - { auto snapshot_ref = snapshot->GetLockingRef(); schema = SchemaVisitor::VisitSnapshotSchema(snapshot_ref.GetPtr()); @@ -471,7 +508,7 @@ void DeltaMultiFileList::Bind(vector &return_types, vector this->types = return_types; 
} -string DeltaMultiFileList::GetFileInternal(idx_t i) const { +OpenFileInfo DeltaMultiFileList::GetFileInternal(idx_t i) const { EnsureScanInitialized(); // We already have this file @@ -480,7 +517,7 @@ string DeltaMultiFileList::GetFileInternal(idx_t i) const { } if (files_exhausted) { - return ""; + return OpenFileInfo(); } ScanDataCallBack callback_context(*this); @@ -493,12 +530,16 @@ string DeltaMultiFileList::GetFileInternal(idx_t i) const { callback_context.error.Throw(); } - auto have_scan_data = TryUnpackKernelResult(have_scan_data_res); + bool have_scan_data; + auto scan_data_res = KernelUtils::TryUnpackResult(have_scan_data_res, have_scan_data); + if (scan_data_res.HasError()) { + throw IOException("Failed to unpack scan data from kernel: %s", scan_data_res.RawMessage()); + } // kernel has indicated that we have no more data to scan if (!have_scan_data) { files_exhausted = true; - return ""; + return OpenFileInfo(); } } @@ -507,22 +548,22 @@ string DeltaMultiFileList::GetFileInternal(idx_t i) const { idx_t DeltaMultiFileList::GetTotalFileCountInternal() const { idx_t i = resolved_files.size(); - while (!GetFileInternal(i).empty()) { + while (!GetFileInternal(i).path.empty()) { i++; } return resolved_files.size(); } -string DeltaMultiFileList::GetFile(idx_t i) { +OpenFileInfo DeltaMultiFileList::GetFile(idx_t i) { // TODO: profile this: we should be able to use atomics here to optimize unique_lock lck(lock); return GetFileInternal(i); } void DeltaMultiFileList::InitializeSnapshot() const { - auto path_slice = KernelUtils::ToDeltaString(paths[0]); + auto path_slice = KernelUtils::ToDeltaString(paths[0].path); - auto interface_builder = CreateBuilder(context, paths[0]); + auto interface_builder = CreateBuilder(context, paths[0].path); extern_engine = TryUnpackKernelResult(ffi::builder_build(interface_builder)); if (!snapshot) { @@ -537,6 +578,99 @@ void DeltaMultiFileList::InitializeSnapshot() const { initialized_snapshot = true; } +static void InjectColumnIdentifiers(const vector &names, const vector &types, + const vector &all_names, const vector &all_types, + vector &global_column_defs) { + for (idx_t i = 0; i < names.size(); i++) { + auto &col = global_column_defs[i]; + col.default_expression = make_uniq(Value(col.type)); + col.identifier = Value(all_names[i]); + + if (col.type.id() == LogicalTypeId::STRUCT) { + vector child_names; + vector child_types; + for (idx_t j = 0; j < StructType::GetChildCount(col.type); j++) { + child_names.emplace_back(StructType::GetChildName(col.type, j)); + child_types.emplace_back(StructType::GetChildType(col.type, j)); + } + + vector child_all_names; + vector child_all_types; + for (idx_t j = 0; j < StructType::GetChildCount(all_types[i]); j++) { + child_all_names.emplace_back(StructType::GetChildName(all_types[i], j)); + child_all_types.emplace_back(StructType::GetChildType(all_types[i], j)); + } + + InjectColumnIdentifiers(child_names, child_types, child_all_names, child_all_types, col.children); + } + } +} + +static vector ConstructGlobalColDefs(const vector &names, + const vector &types, + const vector &partitions, + ffi::SharedGlobalScanState *scan_state) { + vector physical_names; + vector physical_types; + vector logical_names; + vector logical_types; + unordered_map name_map; + unordered_map physical_type_map; + unordered_set partition_set; + + for (const auto &partition : partitions) { + partition_set.insert(partition); + } + + auto schema_physical = SchemaVisitor::VisitSnapshotGlobalReadSchema(scan_state, false); + auto schema_logical 
= SchemaVisitor::VisitSnapshotGlobalReadSchema(scan_state, true); + + for (idx_t i = 0; i < schema_physical->size(); i++) { + physical_names.push_back((*schema_physical)[i].first); + physical_types.push_back((*schema_physical)[i].second); + } + for (idx_t i = 0; i < schema_logical->size(); i++) { + logical_names.push_back((*schema_logical)[i].first); + logical_types.push_back((*schema_logical)[i].second); + } + + idx_t physical_idx = 0; + for (idx_t i = 0; i < logical_names.size(); i++) { + auto &logical_name = logical_names[i]; + if (partition_set.find(logical_name) != partition_set.end()) { + continue; + } + if (physical_idx >= physical_names.size()) { + throw IOException("Failed to map physical schema to logical"); + } + name_map[logical_names[i]] = physical_names[physical_idx]; + physical_type_map[logical_names[i]] = physical_types[physical_idx]; + physical_idx++; + } + + vector all_names; + vector all_types; + for (idx_t i = 0; i < names.size(); i++) { + auto &name = names[i]; + auto &type = types[i]; + + auto lu = name_map.find(name); + if (lu != name_map.end()) { + all_names.push_back(lu->second); + all_types.push_back(physical_type_map[name]); + } else { + all_names.push_back(name); + all_types.push_back(type); + } + } + + auto global_column_defs = MultiFileColumnDefinition::ColumnsFromNamesAndTypes(names, types); + + InjectColumnIdentifiers(names, types, all_names, all_types, global_column_defs); + + return global_column_defs; +} + void DeltaMultiFileList::InitializeScan() const { auto snapshot_ref = snapshot->GetLockingRef(); @@ -544,6 +678,11 @@ void DeltaMultiFileList::InitializeScan() const { PredicateVisitor visitor(names, &table_filters); scan = TryUnpackKernelResult(ffi::scan(snapshot_ref.GetPtr(), extern_engine.get(), &visitor)); + if (visitor.error_data.HasError()) { + throw IOException("Failed to initialize Scan for Delta table at '%s'. 
Original error: '%s'", paths[0].path, + visitor.error_data.Message()); + } + // Create GlobalState global_state = ffi::get_global_scan_state(scan.get()); @@ -551,15 +690,31 @@ void DeltaMultiFileList::InitializeScan() const { scan_data_iterator = TryUnpackKernelResult(ffi::kernel_scan_data_init(extern_engine.get(), scan.get())); // Load partitions - auto partition_count = ffi::get_partition_column_count(global_state.get()); + auto partition_count = ffi::get_partition_column_count(snapshot_ref.GetPtr()); if (partition_count > 0) { - auto string_slice_iterator = ffi::get_partition_columns(global_state.get()); + auto string_slice_iterator = ffi::get_partition_columns(snapshot_ref.GetPtr()); KernelPartitionVisitorData data; while (string_slice_next(string_slice_iterator, &data, KernelPartitionStringVisitor)) { } partitions = data.partitions; + + for (auto &partition : partitions) { + for (idx_t i = 0; i < names.size(); i++) { + if (partition == names[i]) { + partition_ids.push_back(i); + break; + } + } + } + + if (partitions.size() != partition_ids.size()) { + throw IOException("Failed to map partitions to columns"); + } } + + lazy_loaded_schema = ConstructGlobalColDefs(names, types, partitions, global_state.get()); + initialized_scan = true; } @@ -578,7 +733,7 @@ void DeltaMultiFileList::EnsureScanInitialized() const { unique_ptr DeltaMultiFileList::PushdownInternal(ClientContext &context, TableFilterSet &new_filters) const { - auto filtered_list = make_uniq(context, paths[0]); + auto filtered_list = make_uniq(context, paths[0].path); TableFilterSet result_filter_set; @@ -596,6 +751,8 @@ unique_ptr DeltaMultiFileList::PushdownInternal(ClientContex filtered_list->table_filters = std::move(result_filter_set); filtered_list->names = names; + filtered_list->types = types; + filtered_list->lazy_loaded_schema = lazy_loaded_schema; // Copy over the snapshot, this avoids reparsing metadata { @@ -606,8 +763,7 @@ unique_ptr DeltaMultiFileList::PushdownInternal(ClientContex return filtered_list; } -static DeltaFilterPushdownMode GetDeltaFilterPushdownMode(ClientContext &context, - const MultiFileReaderOptions &options) { +static DeltaFilterPushdownMode GetDeltaFilterPushdownMode(ClientContext &context, const MultiFileOptions &options) { auto res = options.custom_options.find("pushdown_filters"); if (res != options.custom_options.end()) { auto str = res->second.GetValue(); @@ -617,7 +773,7 @@ static DeltaFilterPushdownMode GetDeltaFilterPushdownMode(ClientContext &context return DEFAULT_PUSHDOWN_MODE; } unique_ptr DeltaMultiFileList::ComplexFilterPushdown(ClientContext &context, - const MultiFileReaderOptions &options, + const MultiFileOptions &options, MultiFilePushdownInfo &info, vector> &filters) { auto pushdown_mode = GetDeltaFilterPushdownMode(context, options); @@ -635,7 +791,8 @@ unique_ptr DeltaMultiFileList::ComplexFilterPushdown(ClientContex combiner.AddFilter(riter->get()->Copy()); } - auto filter_set = combiner.GenerateTableScanFilters(info.column_indexes); + vector pushdown_results; + auto filter_set = combiner.GenerateTableScanFilters(info.column_indexes, pushdown_results); if (filter_set.filters.empty()) { return nullptr; } @@ -749,7 +906,7 @@ void DeltaMultiFileList::ReportFilterPushdown(ClientContext &context, DeltaMulti } unique_ptr -DeltaMultiFileList::DynamicFilterPushdown(ClientContext &context, const MultiFileReaderOptions &options, +DeltaMultiFileList::DynamicFilterPushdown(ClientContext &context, const MultiFileOptions &options, const vector &names, const vector &types, const 
vector &column_ids, TableFilterSet &filters) const { auto pushdown_mode = GetDeltaFilterPushdownMode(context, options); @@ -782,11 +939,11 @@ DeltaMultiFileList::DynamicFilterPushdown(ClientContext &context, const MultiFil return nullptr; } -vector DeltaMultiFileList::GetAllFiles() { +vector DeltaMultiFileList::GetAllFiles() { unique_lock lck(lock); idx_t i = resolved_files.size(); // TODO: this can probably be improved - while (!GetFileInternal(i).empty()) { + while (!GetFileInternal(i).path.empty()) { i++; } return resolved_files; @@ -852,6 +1009,12 @@ vector DeltaMultiFileList::GetPartitionColumns() { return partitions; } +vector &DeltaMultiFileList::GetLazyLoadedGlobalColumns() const { + unique_lock lck(lock); + EnsureScanInitialized(); + return lazy_loaded_schema; +} + unique_ptr DeltaMultiFileReader::CreateInstance(const TableFunction &table_function) { auto result = make_uniq(); @@ -862,4 +1025,4 @@ unique_ptr DeltaMultiFileReader::CreateInstance(const TableFunc return std::move(result); } -} // namespace duckdb \ No newline at end of file +} // namespace duckdb diff --git a/src/functions/delta_scan/delta_multi_file_reader.cpp b/src/functions/delta_scan/delta_multi_file_reader.cpp index b36853a..5721f49 100644 --- a/src/functions/delta_scan/delta_multi_file_reader.cpp +++ b/src/functions/delta_scan/delta_multi_file_reader.cpp @@ -1,7 +1,6 @@ #include "functions/delta_scan/delta_multi_file_list.hpp" #include "functions/delta_scan/delta_multi_file_reader.hpp" - -#include +#include "functions/delta_scan/delta_scan.hpp" #include "duckdb/common/local_file_system.hpp" #include "duckdb/common/types/data_chunk.hpp" @@ -21,42 +20,43 @@ namespace duckdb { -// Generate the correct Selection Vector Based on the Raw delta KernelBoolSlice dv and the row_id_column -// TODO: this probably is slower than needed (we can do with less branches in the for loop for most cases) -static SelectionVector DuckSVFromDeltaSV(const ffi::KernelBoolSlice &dv, Vector row_id_column, idx_t count, - idx_t &select_count) { - D_ASSERT(row_id_column.GetType() == LogicalType::BIGINT); - - UnifiedVectorFormat data; - row_id_column.ToUnifiedFormat(count, data); - auto row_ids = UnifiedVectorFormat::GetData(data); +constexpr column_t DeltaMultiFileReader::DELTA_FILE_NUMBER_COLUMN_ID; - SelectionVector result {count}; - idx_t current_select = 0; - for (idx_t i = 0; i < count; i++) { - auto row_id = row_ids[data.sel->get_index(i)]; +struct DeltaDeleteFilter : public DeleteFilter { +public: + DeltaDeleteFilter(const ffi::KernelBoolSlice &dv) : dv(dv) { + } - if (row_id >= dv.len || dv.ptr[row_id]) { - result.data()[current_select] = i; - current_select++; +public: + idx_t Filter(row_t start_row_index, idx_t count, SelectionVector &result_sel) override { + if (count == 0) { + return 0; } + result_sel.Initialize(STANDARD_VECTOR_SIZE); + idx_t current_select = 0; + for (idx_t i = 0; i < count; i++) { + auto row_id = i + start_row_index; + + const bool is_selected = row_id >= dv.len || dv.ptr[row_id]; + result_sel.set_index(current_select, i); + current_select += is_selected; + } + return current_select; } - select_count = current_select; - - return result; -} +public: + const ffi::KernelBoolSlice &dv; +}; -// Note: this overrides MultifileReader::FinalizeBind removing the lines adding the hive_partitioning indexes -// the reason is that we (ab)use those to use them to forward the delta partitioning information. 
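// --- Editor's note (illustrative sketch, not part of the patch) -----------------------------
// DeltaDeleteFilter::Filter above maps a batch of rows [start_row_index, start_row_index +
// count) through the kernel's deletion-vector slice: a row survives when its index lies past
// the end of the slice or when the corresponding bool is set. A self-contained version of
// that predicate, with plain std:: containers standing in for ffi::KernelBoolSlice and
// SelectionVector, could look like this:
#include <cstdint>
#include <vector>

static std::size_t FilterBatch(const std::vector<bool> &keep_mask, std::uint64_t start_row,
                               std::size_t count, std::vector<std::uint32_t> &selection) {
	selection.clear();
	for (std::size_t i = 0; i < count; i++) {
		const std::uint64_t row_id = start_row + i;
		// same predicate as DeltaDeleteFilter: rows beyond the slice are kept
		if (row_id >= keep_mask.size() || keep_mask[row_id]) {
			selection.push_back(static_cast<std::uint32_t>(i));
		}
	}
	return selection.size(); // number of surviving rows in this batch
}
// ---------------------------------------------------------------------------------------------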
-static void FinalizeBindBaseOverride(const MultiFileReaderOptions &file_options, const MultiFileReaderBindData &options, - const string &filename, - const vector &local_columns, - const vector &global_columns, - const vector &global_column_ids, MultiFileReaderData &reader_data, - ClientContext &context, optional_ptr global_state) { +void FinalizeBindBaseOverride(MultiFileReaderData &reader_data, const MultiFileOptions &file_options, + const MultiFileReaderBindData &options, + const vector &global_columns, + const vector &global_column_ids, ClientContext &context, + optional_ptr global_state) { // create a map of name -> column index + auto &local_columns = reader_data.reader->GetColumns(); + auto &filename = reader_data.reader->GetFileName(); case_insensitive_map_t name_map; if (file_options.union_by_name) { for (idx_t col_idx = 0; col_idx < local_columns.size(); col_idx++) { @@ -65,21 +65,32 @@ static void FinalizeBindBaseOverride(const MultiFileReaderOptions &file_options, } } for (idx_t i = 0; i < global_column_ids.size(); i++) { - auto &col_idx = global_column_ids[i]; - if (col_idx.IsRowIdColumn()) { - // row-id - reader_data.constant_map.emplace_back(i, Value::BIGINT(42)); + auto global_idx = MultiFileGlobalIndex(i); + auto &col_id = global_column_ids[i]; + auto column_id = col_id.GetPrimaryIndex(); + if ((options.filename_idx.IsValid() && column_id == options.filename_idx.GetIndex()) || + column_id == MultiFileReader::COLUMN_IDENTIFIER_FILENAME) { + // filename + reader_data.constant_map.Add(global_idx, Value(filename)); + continue; + } + if (column_id == MultiFileReader::COLUMN_IDENTIFIER_FILE_INDEX) { + // filename + reader_data.constant_map.Add(global_idx, Value::UBIGINT(reader_data.reader->file_list_idx.GetIndex())); continue; } - auto column_id = col_idx.GetPrimaryIndex(); - if (column_id == options.filename_idx) { + if (column_id == DeltaMultiFileReader::DELTA_FILE_NUMBER_COLUMN_ID) { // filename - reader_data.constant_map.emplace_back(i, Value(filename)); + reader_data.constant_map.Add(global_idx, Value::UBIGINT(7)); + continue; + } + + if (IsVirtualColumn(column_id)) { continue; } if (file_options.union_by_name) { auto &column = global_columns[column_id]; - auto &name = column.name; + auto name = column.name; auto &type = column.type; auto entry = name_map.find(name); @@ -87,47 +98,23 @@ static void FinalizeBindBaseOverride(const MultiFileReaderOptions &file_options, if (not_present_in_file) { // we need to project a column with name \"global_name\" - but it does not exist in the current file // push a NULL value of the specified type - reader_data.constant_map.emplace_back(i, Value(type)); + reader_data.constant_map.Add(global_idx, Value(type)); continue; } } } } -// Parses the columns that are used by the delta extension into -void DeltaMultiFileReaderGlobalState::SetColumnIdx(const string &column, idx_t idx) { - if (column == "file_row_number") { - file_row_number_idx = idx; - return; - } else if (column == "delta_file_number") { - delta_file_number_idx = idx; - return; - } - throw IOException("Unknown column '%s' found as required by the DeltaMultiFileReader"); -} - -bool DeltaMultiFileReader::Bind(MultiFileReaderOptions &options, MultiFileList &files, - vector &return_types, vector &names, - MultiFileReaderBindData &bind_data) { +bool DeltaMultiFileReader::Bind(MultiFileOptions &options, MultiFileList &files, vector &return_types, + vector &names, MultiFileReaderBindData &bind_data) { auto &delta_snapshot = dynamic_cast(files); delta_snapshot.Bind(return_types, names); 
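// --- Editor's note (illustrative sketch, not part of the patch) -----------------------------
// FinalizeBindBaseOverride above now resolves virtual columns through the projected
// global_column_ids and registers per-reader constants via reader_data.constant_map.Add().
// AddVirtualConstants below is a hypothetical helper that only condenses those branches to
// show which constant each recognised column id receives (filename, file index, and the
// extension-defined DELTA_FILE_NUMBER_COLUMN_ID placeholder); the exact container types are
// assumptions based on this diff.
static void AddVirtualConstants(MultiFileReaderData &reader_data,
                                const vector<ColumnIndex> &global_column_ids,
                                const string &filename, idx_t file_index) {
	for (idx_t i = 0; i < global_column_ids.size(); i++) {
		auto global_idx = MultiFileGlobalIndex(i);
		auto column_id = global_column_ids[i].GetPrimaryIndex();
		if (column_id == MultiFileReader::COLUMN_IDENTIFIER_FILENAME) {
			reader_data.constant_map.Add(global_idx, Value(filename));
		} else if (column_id == MultiFileReader::COLUMN_IDENTIFIER_FILE_INDEX) {
			reader_data.constant_map.Add(global_idx, Value::UBIGINT(file_index));
		} else if (column_id == DeltaMultiFileReader::DELTA_FILE_NUMBER_COLUMN_ID) {
			// placeholder constant, mirroring the value used in the patch
			reader_data.constant_map.Add(global_idx, Value::UBIGINT(7));
		}
		// other ids reported by IsVirtualColumn() are skipped here, as in the override above
	}
}
// ---------------------------------------------------------------------------------------------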
- // We need to parse this option - bool file_row_number_enabled = options.custom_options.find("file_row_number") != options.custom_options.end(); - if (file_row_number_enabled) { - bind_data.file_row_number_idx = names.size(); - return_types.emplace_back(LogicalType::BIGINT); - names.emplace_back("file_row_number"); - } else { - // TODO: this is a bogus ID? Change for flag indicating it should be enabled? - bind_data.file_row_number_idx = names.size(); - } - return true; -}; +} -void DeltaMultiFileReader::BindOptions(MultiFileReaderOptions &options, MultiFileList &files, +void DeltaMultiFileReader::BindOptions(MultiFileOptions &options, MultiFileList &files, vector &return_types, vector &names, MultiFileReaderBindData &bind_data) { @@ -161,62 +148,82 @@ void DeltaMultiFileReader::BindOptions(MultiFileReaderOptions &options, MultiFil } } - auto demo_gen_col_opt = options.custom_options.find("delta_file_number"); - if (demo_gen_col_opt != options.custom_options.end()) { - if (demo_gen_col_opt->second.GetValue()) { - names.push_back("delta_file_number"); - return_types.push_back(LogicalType::UBIGINT); - } + // FIXME: this is slightly hacky here + bind_data.schema = MultiFileColumnDefinition::ColumnsFromNamesAndTypes(names, return_types); + + // Set defaults + for (auto &col : bind_data.schema) { + col.default_expression = make_uniq(Value(col.type)); } } -void DeltaMultiFileReader::FinalizeBind(const MultiFileReaderOptions &file_options, - const MultiFileReaderBindData &options, const string &filename, - const vector &local_columns, - const vector &global_columns, - const vector &global_column_ids, MultiFileReaderData &reader_data, - ClientContext &context, optional_ptr global_state) { - FinalizeBindBaseOverride(file_options, options, filename, local_columns, global_columns, global_column_ids, - reader_data, context, global_state); - - // Handle custom delta option set in MultiFileReaderOptions::custom_options - auto file_number_opt = file_options.custom_options.find("delta_file_number"); - if (file_number_opt != file_options.custom_options.end()) { - if (file_number_opt->second.GetValue()) { - D_ASSERT(global_state); - auto &delta_global_state = global_state->Cast(); - D_ASSERT(delta_global_state.delta_file_number_idx != DConstants::INVALID_INDEX); - - // We add the constant column for the delta_file_number option - // NOTE: we add a placeholder here, to demonstrate how we can also populate extra columns in the - // FinalizeChunk - reader_data.constant_map.emplace_back(delta_global_state.delta_file_number_idx, Value::UBIGINT(0)); +ReaderInitializeType DeltaMultiFileReader::InitializeReader(MultiFileReaderData &reader_data, + const MultiFileBindData &bind_data, + const vector &global_columns, + const vector &global_column_ids, + optional_ptr table_filters, + ClientContext &context, MultiFileGlobalState &gstate) { + auto &global_state = gstate.multi_file_reader_state; + D_ASSERT(global_state); + auto &delta_global_state = global_state->Cast(); + auto &snapshot = delta_global_state.file_list->Cast(); + + vector *global_columns_to_use; + auto &scan_columns = snapshot.GetLazyLoadedGlobalColumns(); + vector column_copy; + if (scan_columns.size() != global_columns.size()) { + column_copy = scan_columns; + for (idx_t i = scan_columns.size(); i < global_columns.size(); i++) { + column_copy.push_back(global_columns[i]); } + global_columns_to_use = &column_copy; + } else { + global_columns_to_use = &scan_columns; } + FinalizeBind(reader_data, bind_data.file_options, bind_data.reader_bind, 
*global_columns_to_use, global_column_ids, + context, global_state); + return CreateMapping(context, reader_data, *global_columns_to_use, global_column_ids, table_filters, + gstate.file_list, bind_data.reader_bind, bind_data.virtual_columns); +} + +void DeltaMultiFileReader::FinalizeBind(MultiFileReaderData &reader_data, const MultiFileOptions &file_options, + const MultiFileReaderBindData &options, + const vector &global_columns, + const vector &global_column_ids, ClientContext &context, + optional_ptr global_state) { + FinalizeBindBaseOverride(reader_data, file_options, options, global_columns, global_column_ids, context, + global_state); + // Get the metadata for this file D_ASSERT(global_state->file_list); const auto &snapshot = dynamic_cast(*global_state->file_list); - auto &file_metadata = snapshot.GetMetaData(reader_data.file_list_idx.GetIndex()); + auto &file_metadata = snapshot.GetMetaData(reader_data.reader->file_list_idx.GetIndex()); + // TODO: inject these in the global column definitions instead? if (!file_metadata.partition_map.empty()) { for (idx_t i = 0; i < global_column_ids.size(); i++) { + auto global_idx = MultiFileGlobalIndex(i); column_t col_id = global_column_ids[i].GetPrimaryIndex(); - if (IsRowIdColumnId(col_id)) { + + if (IsVirtualColumn(col_id)) { continue; } + auto col_partition_entry = file_metadata.partition_map.find(global_columns[col_id].name); if (col_partition_entry != file_metadata.partition_map.end()) { auto ¤t_type = global_columns[col_id].type; - if (current_type == LogicalType::BLOB) { - reader_data.constant_map.emplace_back(i, Value::BLOB_RAW(col_partition_entry->second)); - } else { - auto maybe_value = Value(col_partition_entry->second).DefaultCastAs(current_type); - reader_data.constant_map.emplace_back(i, maybe_value); - } + auto maybe_value = Value(col_partition_entry->second).DefaultCastAs(current_type); + reader_data.constant_map.Add(global_idx, maybe_value); } } } + + auto &reader = *reader_data.reader; + if (file_metadata.selection_vector.ptr) { + //! 
Push the deletes into the parquet scan + reader.deletion_filter = make_uniq(file_metadata.selection_vector); + } } shared_ptr DeltaMultiFileReader::CreateFileList(ClientContext &context, const vector &paths, @@ -237,9 +244,9 @@ shared_ptr DeltaMultiFileReader::CreateFileList(ClientContext &co } unique_ptr -DeltaMultiFileReader::InitializeGlobalState(ClientContext &context, const MultiFileReaderOptions &file_options, +DeltaMultiFileReader::InitializeGlobalState(ClientContext &context, const MultiFileOptions &file_options, const MultiFileReaderBindData &bind_data, const MultiFileList &file_list, - const vector &global_columns, + const vector &global_columns, const vector &global_column_ids) { vector extra_columns; vector> mapped_columns; @@ -248,227 +255,37 @@ DeltaMultiFileReader::InitializeGlobalState(ClientContext &context, const MultiF case_insensitive_map_t selected_columns; for (idx_t i = 0; i < global_column_ids.size(); i++) { auto global_id = global_column_ids[i].GetPrimaryIndex(); - if (IsRowIdColumnId(global_id)) { - continue; - } - auto &global_name = global_columns[global_id].name; - selected_columns.insert({global_name, i}); - } - - // TODO: only add file_row_number column if there are deletes - case_insensitive_map_t columns_to_map = { - {"file_row_number", LogicalType::BIGINT}, - }; - - // Add the delta_file_number column to the columns to map - auto demo_gen_col_opt = file_options.custom_options.find("delta_file_number"); - if (demo_gen_col_opt != file_options.custom_options.end()) { - if (demo_gen_col_opt->second.GetValue()) { - columns_to_map.insert({"delta_file_number", LogicalType::UBIGINT}); - } - } - - // Map every column to either a column in the projection, or add it to the extra columns if it doesn't exist - idx_t col_offset = 0; - for (const auto &required_column : columns_to_map) { - // First check if the column is in the projection - auto res = selected_columns.find(required_column.first); - if (res != selected_columns.end()) { - // The column is in the projection, no special handling is required; we simply store the index - mapped_columns.push_back({required_column.first, res->second}); + if (IsVirtualColumn(global_id)) { continue; } - // The column is NOT in the projection: it needs to be added as an extra_column - - // Calculate the index of the added column (extra columns are added after all other columns) - idx_t current_col_idx = global_column_ids.size() + col_offset++; - - // Add column to the map, to ensure the MultiFileReader can find it when processing the Chunk - mapped_columns.push_back({required_column.first, current_col_idx}); - - // Ensure the result DataChunk has a vector of the correct type to store this column - extra_columns.push_back(required_column.second); + auto global_name = global_columns[global_id].name; + selected_columns.insert({global_name, i}); } auto res = make_uniq(extra_columns, &file_list); - // Parse all the mapped columns into the DeltaMultiFileReaderGlobalState for easy use; - for (const auto &mapped_column : mapped_columns) { - res->SetColumnIdx(mapped_column.first, mapped_column.second); - } - return std::move(res); } -// This code is duplicated from MultiFileReader::CreateNameMapping the difference is that for columns that are not found -// in the parquet files, we just add null constant columns -static void CustomMulfiFileNameMapping(const string &file_name, - const vector &local_columns, - const vector &global_columns, - const vector &global_column_ids, MultiFileReaderData &reader_data, - const string &initial_file, - 
optional_ptr global_state) { - // we have expected types: create a map of name -> column index - case_insensitive_map_t name_map; - for (idx_t col_idx = 0; col_idx < local_columns.size(); col_idx++) { - name_map[local_columns[col_idx].name] = col_idx; - } - for (idx_t i = 0; i < global_column_ids.size(); i++) { - // check if this is a constant column - bool constant = false; - for (auto &entry : reader_data.constant_map) { - if (entry.column_id == i) { - constant = true; - break; - } - } - if (constant) { - // this column is constant for this file - continue; - } - // not constant - look up the column in the name map - auto global_id = global_column_ids[i].GetPrimaryIndex(); - if (global_id >= global_columns.size()) { - throw InternalException( - "MultiFileReader::CreatePositionalMapping - global_id is out of range in global_types for this file"); - } - auto &global_name = global_columns[global_id].name; - auto entry = name_map.find(global_name); - if (entry == name_map.end()) { - string candidate_names; - for (auto &local_column : local_columns) { - if (!candidate_names.empty()) { - candidate_names += ", "; - } - candidate_names += local_column.name; - } - // FIXME: this override is pretty hacky: for missing columns we just insert NULL constants - auto &global_type = global_columns[global_id].type; - Value val(global_type); - reader_data.constant_map.push_back({i, val}); - continue; - } - // we found the column in the local file - check if the types are the same - auto local_id = entry->second; - D_ASSERT(global_id < global_columns.size()); - D_ASSERT(local_id < local_columns.size()); - auto &global_type = global_columns[global_id].type; - auto &local_type = local_columns[local_id].type; - if (global_type != local_type) { - reader_data.cast_map[local_id] = global_type; - } - // the types are the same - create the mapping - reader_data.column_mapping.push_back(i); - reader_data.column_ids.push_back(local_id); - } - - reader_data.empty_columns = reader_data.column_ids.empty(); -} - -void DeltaMultiFileReader::CreateColumnMapping(const string &file_name, - const vector &local_columns, - const vector &global_columns, - const vector &global_column_ids, - MultiFileReaderData &reader_data, - const MultiFileReaderBindData &bind_data, const string &initial_file, - optional_ptr global_state) { - // First call the base implementation to do most mapping - CustomMulfiFileNameMapping(file_name, local_columns, global_columns, global_column_ids, reader_data, initial_file, - global_state); - - // Then we handle delta specific mapping - D_ASSERT(global_state); - auto &delta_global_state = global_state->Cast(); - - // Check if the file_row_number column is an "extra_column" which is not part of the projection - if (delta_global_state.file_row_number_idx >= global_column_ids.size()) { - D_ASSERT(delta_global_state.file_row_number_idx != DConstants::INVALID_INDEX); - - // Build the name map - case_insensitive_map_t name_map; - for (idx_t col_idx = 0; col_idx < local_columns.size(); col_idx++) { - name_map[local_columns[col_idx].name] = col_idx; - } - - // Lookup the required column in the local map - auto entry = name_map.find("file_row_number"); - if (entry == name_map.end()) { - throw IOException("Failed to find the file_row_number column"); - } - - // Register the column to be scanned from this file - reader_data.column_ids.push_back(entry->second); - reader_data.column_mapping.push_back(delta_global_state.file_row_number_idx); - } - - // This may have changed: update it - reader_data.empty_columns = 
reader_data.column_ids.empty(); -} - -void DeltaMultiFileReader::FinalizeChunk(ClientContext &context, const MultiFileReaderBindData &bind_data, - const MultiFileReaderData &reader_data, DataChunk &chunk, +void DeltaMultiFileReader::FinalizeChunk(ClientContext &context, const MultiFileBindData &bind_data, + BaseFileReader &reader, const MultiFileReaderData &reader_data, + DataChunk &input_chunk, DataChunk &output_chunk, ExpressionExecutor &executor, optional_ptr global_state) { // Base class finalization first - MultiFileReader::FinalizeChunk(context, bind_data, reader_data, chunk, global_state); + MultiFileReader::FinalizeChunk(context, bind_data, reader, reader_data, input_chunk, output_chunk, executor, + global_state); D_ASSERT(global_state); auto &delta_global_state = global_state->Cast(); D_ASSERT(delta_global_state.file_list); - - // Get the metadata for this file - const auto &snapshot = dynamic_cast(*global_state->file_list); - auto &metadata = snapshot.GetMetaData(reader_data.file_list_idx.GetIndex()); - - if (metadata.selection_vector.ptr && chunk.size() != 0) { - D_ASSERT(delta_global_state.file_row_number_idx != DConstants::INVALID_INDEX); - auto &file_row_number_column = chunk.data[delta_global_state.file_row_number_idx]; - - // Construct the selection vector using the file_row_number column and the raw selection vector from delta - idx_t select_count; - auto sv = DuckSVFromDeltaSV(metadata.selection_vector, file_row_number_column, chunk.size(), select_count); - chunk.Slice(sv, select_count); - } - - // Note: this demo function shows how we can use DuckDB's Binder create expression-based generated columns - if (delta_global_state.delta_file_number_idx != DConstants::INVALID_INDEX) { - //! Create Dummy expression (0 + file_number) - vector> child_expr; - child_expr.push_back(make_uniq(Value::UBIGINT(0))); - child_expr.push_back(make_uniq(Value::UBIGINT(7))); - unique_ptr expr = - make_uniq("+", std::move(child_expr), nullptr, nullptr, false, true); - - //! s dummy expression - auto binder = Binder::CreateBinder(context); - ExpressionBinder expr_binder(*binder, context); - auto bound_expr = expr_binder.Bind(expr, nullptr); - - //! Execute dummy expression into result column - ExpressionExecutor expr_executor(context); - expr_executor.AddExpression(*bound_expr); - - //! 
Execute the expression directly into the output Chunk - expr_executor.ExecuteExpression(chunk.data[delta_global_state.delta_file_number_idx]); - } }; -bool DeltaMultiFileReader::ParseOption(const string &key, const Value &val, MultiFileReaderOptions &options, +bool DeltaMultiFileReader::ParseOption(const string &key, const Value &val, MultiFileOptions &options, ClientContext &context) { auto loption = StringUtil::Lower(key); - if (loption == "delta_file_number") { - options.custom_options[loption] = val; - return true; - } - - // We need to capture this one to know whether to emit - if (loption == "file_row_number") { - options.custom_options[loption] = val; - return true; - } - if (loption == "pushdown_partition_info") { options.custom_options["pushdown_partition_info"] = val; return true; @@ -483,4 +300,4 @@ bool DeltaMultiFileReader::ParseOption(const string &key, const Value &val, Mult return MultiFileReader::ParseOption(key, val, options, context); } -} // namespace duckdb \ No newline at end of file +} // namespace duckdb diff --git a/src/functions/delta_scan/delta_scan.cpp b/src/functions/delta_scan/delta_scan.cpp index e663376..a1e8d37 100644 --- a/src/functions/delta_scan/delta_scan.cpp +++ b/src/functions/delta_scan/delta_scan.cpp @@ -55,6 +55,23 @@ static InsertionOrderPreservingMap DeltaFunctionToString(TableFunctionTo return result; } +virtual_column_map_t DeltaVirtualColumns(ClientContext &, optional_ptr bind_data_p) { + virtual_column_map_t result; + result.insert( + make_pair(MultiFileReader::COLUMN_IDENTIFIER_FILENAME, TableColumn("filename", LogicalType::VARCHAR))); + result.insert(make_pair(MultiFileReader::COLUMN_IDENTIFIER_FILE_ROW_NUMBER, + TableColumn("file_row_number", LogicalType::BIGINT))); + result.insert(make_pair(COLUMN_IDENTIFIER_ROW_ID, TableColumn("rowid", LogicalType::BIGINT))); + result.insert(make_pair(COLUMN_IDENTIFIER_EMPTY, TableColumn("", LogicalType::BOOLEAN))); + + result.insert(make_pair(DeltaMultiFileReader::DELTA_FILE_NUMBER_COLUMN_ID, + TableColumn("delta_file_number", LogicalType::UBIGINT))); + + auto &bind_data = bind_data_p->Cast(); + bind_data.virtual_columns = result; + return result; +} + TableFunctionSet DeltaFunctions::GetDeltaScanFunction(DatabaseInstance &instance) { // Parquet extension needs to be loaded for this to make sense ExtensionHelper::AutoLoadExtension(instance, "parquet"); @@ -75,15 +92,14 @@ TableFunctionSet DeltaFunctions::GetDeltaScanFunction(DatabaseInstance &instance function.statistics = nullptr; function.table_scan_progress = nullptr; function.get_bind_info = nullptr; + function.get_virtual_columns = DeltaVirtualColumns; + function.late_materialization = false; function.to_string = DeltaFunctionToString; // Schema param is just confusing here function.named_parameters.erase("schema"); - // Demonstration of a generated column based on information from DeltaMultiFileList - function.named_parameters["delta_file_number"] = LogicalType::BOOLEAN; - function.named_parameters["pushdown_partition_info"] = LogicalType::BOOLEAN; function.named_parameters["pushdown_filters"] = LogicalType::VARCHAR; diff --git a/src/include/delta_kernel_ffi.hpp b/src/include/delta_kernel_ffi.hpp index bd5cda5..32645be 100644 --- a/src/include/delta_kernel_ffi.hpp +++ b/src/include/delta_kernel_ffi.hpp @@ -430,7 +430,7 @@ struct EngineExpressionVisitor { /// Visit a 128bit `decimal` value with the given precision and scale. 
The 128bit integer /// is split into the most significant 64 bits in `value_ms`, and the least significant 64 /// bits in `value_ls`. The `decimal` belongs to the list identified by `sibling_list_id`. - void (*visit_literal_decimal)(void *data, uintptr_t sibling_list_id, uint64_t value_ms, uint64_t value_ls, + void (*visit_literal_decimal)(void *data, uintptr_t sibling_list_id, int64_t value_ms, uint64_t value_ls, uint8_t precision, uint8_t scale); /// Visit a struct literal belonging to the list identified by `sibling_list_id`. /// The field names of the struct are in a list identified by `child_field_list_id`. @@ -762,9 +762,21 @@ void free_schema(Handle schema); /// /// # Safety /// -/// Caller is responsible for passing a valid handle. +/// Caller is responsible for passing a valid snapshot handle. NullableCvoid snapshot_table_root(Handle snapshot, AllocateStringFn allocate_fn); +/// Get a count of the number of partition columns for this snapshot +/// +/// # Safety +/// Caller is responsible for passing a valid snapshot handle +uintptr_t get_partition_column_count(Handle snapshot); + +/// Get an iterator of the list of partition columns for this snapshot. +/// +/// # Safety +/// Caller is responsible for passing a valid snapshot handle. +Handle get_partition_columns(Handle snapshot); + /// # Safety /// /// The iterator must be valid (returned by [kernel_scan_data_init]) and not yet freed by @@ -1023,18 +1035,6 @@ Handle get_global_read_schema(Handle state) /// Engine is responsible for providing a valid GlobalScanState pointer Handle get_global_logical_schema(Handle state); -/// Get a count of the number of partition columns for this scan -/// -/// # Safety -/// Caller is responsible for passing a valid global scan pointer. -uintptr_t get_partition_column_count(Handle state); - -/// Get an iterator of the list of partition columns for this scan. -/// -/// # Safety -/// Caller is responsible for passing a valid global scan pointer. -Handle get_partition_columns(Handle state); - /// # Safety /// /// Caller is responsible for passing a valid global scan state pointer. diff --git a/src/include/delta_log_types.hpp b/src/include/delta_log_types.hpp new file mode 100644 index 0000000..9b9c118 --- /dev/null +++ b/src/include/delta_log_types.hpp @@ -0,0 +1,42 @@ +//===----------------------------------------------------------------------===// +// DuckDB +// +// delta_log_types.hpp +// +// +//===----------------------------------------------------------------------===// + +#pragma once + +#include "delta_kernel_ffi.hpp" +#include "duckdb/main/database.hpp" + +namespace duckdb { + +class DeltaKernelLogType : public LogType { +public: + static constexpr const char *NAME = "DeltaKernel"; + static constexpr LogLevel LEVEL = + LogLevel::LOG_DEBUG; // WARNING: DeltaKernelLogType is special in that it overrides this base logtype + + //! 
Construct the log types + DeltaKernelLogType(); + + static LogicalType GetLogType(); + + static string ConstructLogMessage(ffi::Event event); + + // FIXME: HTTPLogType should be structured probably + static string ConstructLogMessage(const string &str) { + return str; + } +}; + +class DeltaLogTypes { +public: + static void RegisterLogTypes(DatabaseInstance &instance) { + instance.GetLogManager().RegisterLogType(make_uniq()); + } +}; + +} // namespace duckdb diff --git a/src/include/delta_utils.hpp b/src/include/delta_utils.hpp index 2eeb9bc..e31835e 100644 --- a/src/include/delta_utils.hpp +++ b/src/include/delta_utils.hpp @@ -23,6 +23,8 @@ class ExpressionVisitor : public ffi::EngineExpressionVisitor { public: unique_ptr>> VisitKernelExpression(const ffi::Handle *expression); + unique_ptr>> VisitKernelExpression(const ffi::Expression *expression); + ffi::EngineExpressionVisitor CreateVisitor(ExpressionVisitor &state); private: unordered_map> inflight_lists; @@ -60,7 +62,7 @@ class ExpressionVisitor : public ffi::EngineExpressionVisitor { static void VisitArrayLiteral(void *state, uintptr_t sibling_list_id, uintptr_t child_id); static void VisitStructLiteral(void *data, uintptr_t sibling_list_id, uintptr_t child_field_list_value, uintptr_t child_value_list_id); - static void VisitDecimalLiteral(void *state, uintptr_t sibling_list_id, uint64_t value_ms, uint64_t value_ls, + static void VisitDecimalLiteral(void *state, uintptr_t sibling_list_id, int64_t value_ms, uint64_t value_ls, uint8_t precision, uint8_t scale); static void VisitColumnExpression(void *state, uintptr_t sibling_list_id, ffi::KernelStringSlice name); static void VisitStructExpression(void *state, uintptr_t sibling_list_id, uintptr_t child_list_id); @@ -135,11 +137,16 @@ class SchemaVisitor { using FieldList = child_list_t; static unique_ptr VisitSnapshotSchema(ffi::SharedSnapshot *snapshot); + static unique_ptr VisitSnapshotGlobalReadSchema(ffi::SharedGlobalScanState *state, bool logical); private: unordered_map> inflight_lists; uintptr_t next_id = 1; + ErrorData error; + + static ffi::EngineSchemaVisitor CreateSchemaVisitor(SchemaVisitor &state); + typedef void(SimpleTypeVisitorFunction)(void *, uintptr_t, ffi::KernelStringSlice, bool is_nullable, const ffi::CStringMap *metadata); @@ -175,8 +182,8 @@ struct DuckDBEngineError : ffi::EngineError { // Convert a kernel error enum to a string static string KernelErrorEnumToString(ffi::KernelError err); - // Throw the error as an IOException - [[noreturn]] void Throw(string from_info); + // Return the error as a string (WARNING: consumes the object by calling `delete this`) + string IntoString(); // The error message from Kernel string error_message; @@ -301,28 +308,33 @@ struct KernelUtils { static string FromDeltaString(const struct ffi::KernelStringSlice slice); static vector FromDeltaBoolSlice(const struct ffi::KernelBoolSlice slice); - // TODO: all kernel results need to be unpacked, not doing so will result in an error. 
This should be cleaned up + // Unpacks (and frees) a kernel result, either storing the result in out_value, or setting error_data template - static T UnpackResult(ffi::ExternResult result, const string &from_where) { + static ErrorData TryUnpackResult(ffi::ExternResult result, T &out_value) { if (result.tag == ffi::ExternResult::Tag::Err) { if (result.err._0) { auto error_cast = static_cast(result.err._0); - error_cast->Throw(from_where); + return ErrorData(ExceptionType::IO, error_cast->IntoString()); } - throw IOException("Hit DeltaKernel FFI error (from: %s): Hit error, but error was nullptr", - from_where.c_str()); + return ErrorData(ExceptionType::IO, StringUtil::Format("Unknown Delta kernel error")); } if (result.tag == ffi::ExternResult::Tag::Ok) { - return result.ok._0; + out_value = result.ok._0; + return {}; } - throw IOException("Invalid error ExternResult tag found!"); + return ErrorData(ExceptionType::IO, "Invalid Delta kernel ExternResult"); } + + static vector> & + UnpackTopLevelStruct(const vector> &parsed_expression); }; class PredicateVisitor : public ffi::EnginePredicate { public: PredicateVisitor(const vector &column_names, optional_ptr filters); + ErrorData error_data; + private: unordered_map column_filters; diff --git a/src/include/functions/delta_scan/delta_multi_file_list.hpp b/src/include/functions/delta_scan/delta_multi_file_list.hpp index 4fc8067..9ca2b6c 100644 --- a/src/include/functions/delta_scan/delta_multi_file_list.hpp +++ b/src/include/functions/delta_scan/delta_multi_file_list.hpp @@ -11,7 +11,8 @@ #include "delta_utils.hpp" #include "functions/delta_scan/delta_multi_file_list.hpp" -#include "duckdb/common/multi_file_reader.hpp" +#include "duckdb/common/multi_file/multi_file_reader.hpp" +#include "duckdb/common/multi_file/multi_file_data.hpp" namespace duckdb { @@ -32,7 +33,10 @@ struct DeltaFileMetaData { idx_t file_number = DConstants::INVALID_INDEX; idx_t cardinality = DConstants::INVALID_INDEX; ffi::KernelBoolSlice selection_vector = {nullptr, 0}; - case_insensitive_map_t partition_map; + + case_insensitive_map_t partition_map; + + unique_ptr>> transform_expression; }; //! The DeltaMultiFileList implements the MultiFileList API to allow injecting it into the regular DuckDB parquet scan @@ -48,18 +52,18 @@ class DeltaMultiFileList : public MultiFileList { //! MultiFileList API public: void Bind(vector &return_types, vector &names); - unique_ptr ComplexFilterPushdown(ClientContext &context, const MultiFileReaderOptions &options, + unique_ptr ComplexFilterPushdown(ClientContext &context, const MultiFileOptions &options, MultiFilePushdownInfo &info, vector> &filters) override; - unique_ptr DynamicFilterPushdown(ClientContext &context, const MultiFileReaderOptions &options, + unique_ptr DynamicFilterPushdown(ClientContext &context, const MultiFileOptions &options, const vector &names, const vector &types, const vector &column_ids, TableFilterSet &filters) const override; unique_ptr PushdownInternal(ClientContext &context, TableFilterSet &new_filters) const; - vector GetAllFiles() override; + vector GetAllFiles() override; FileExpandResult GetExpandResult() override; idx_t GetTotalFileCount() override; unique_ptr GetCardinality(ClientContext &context) override; @@ -67,12 +71,14 @@ class DeltaMultiFileList : public MultiFileList { idx_t GetVersion(); vector GetPartitionColumns(); + vector &GetLazyLoadedGlobalColumns() const; + protected: //! 
Get the i-th expanded file - string GetFile(idx_t i) override; + OpenFileInfo GetFile(idx_t i) override; protected: - string GetFileInternal(idx_t i) const; + OpenFileInfo GetFileInternal(idx_t i) const; idx_t GetTotalFileCountInternal() const; void InitializeSnapshot() const; void InitializeScan() const; @@ -85,8 +91,12 @@ class DeltaMultiFileList : public MultiFileList { template T TryUnpackKernelResult(ffi::ExternResult result) const { - return KernelUtils::UnpackResult( - result, StringUtil::Format("While trying to read from delta table: '%s'", paths[0])); + T return_value; + auto res = KernelUtils::TryUnpackResult(result, return_value); + if (res.HasError()) { + res.Throw(); + } + return return_value; } protected: @@ -103,6 +113,7 @@ class DeltaMultiFileList : public MultiFileList { mutable KernelScanDataIterator scan_data_iterator; mutable vector partitions; + mutable vector partition_ids; //! Current file list resolution state mutable bool initialized_snapshot = false; @@ -112,15 +123,20 @@ class DeltaMultiFileList : public MultiFileList { //! Metadata map for files mutable vector> metadata; - mutable vector resolved_files; + mutable vector resolved_files; mutable TableFilterSet table_filters; //! Names vector names; vector types; + bool have_bound = false; ClientContext &context; + + // The schema containing the proper column identifiers, lazily loaded to avoid prematurely initializing the kernel + // scan + mutable vector lazy_loaded_schema; }; // Callback for the ffi::kernel_scan_data_next callback @@ -135,10 +151,10 @@ struct ScanDataCallBack { const struct ffi::CStringMap *partition_values); static void VisitCallbackInternal(ffi::NullableCvoid engine_context, struct ffi::KernelStringSlice path, int64_t size, const ffi::Stats *stats, const ffi::DvInfo *dv_info, - const struct ffi::CStringMap *partition_values); + const ffi::Expression *transform); const DeltaMultiFileList &snapshot; ErrorData error; }; -} // namespace duckdb \ No newline at end of file +} // namespace duckdb diff --git a/src/include/functions/delta_scan/delta_multi_file_reader.hpp b/src/include/functions/delta_scan/delta_multi_file_reader.hpp index bc6aa05..da600ff 100644 --- a/src/include/functions/delta_scan/delta_multi_file_reader.hpp +++ b/src/include/functions/delta_scan/delta_multi_file_reader.hpp @@ -9,7 +9,9 @@ #pragma once #include "delta_utils.hpp" -#include "duckdb/common/multi_file_reader.hpp" +#include "duckdb/common/multi_file/multi_file_reader.hpp" +#include "duckdb/common/multi_file/multi_file_data.hpp" +#include "duckdb/common/multi_file/multi_file_states.hpp" namespace duckdb { @@ -19,15 +21,11 @@ struct DeltaMultiFileReaderGlobalState : public MultiFileReaderGlobalState { DeltaMultiFileReaderGlobalState(vector extra_columns_p, optional_ptr file_list_p) : MultiFileReaderGlobalState(extra_columns_p, file_list_p) { } - //! The idx of the file number column in the result chunk - idx_t delta_file_number_idx = DConstants::INVALID_INDEX; - //! The idx of the file_row_number column in the result chunk - idx_t file_row_number_idx = DConstants::INVALID_INDEX; - - void SetColumnIdx(const string &column, idx_t idx); }; struct DeltaMultiFileReader : public MultiFileReader { + static constexpr column_t DELTA_FILE_NUMBER_COLUMN_ID = UINT64_C(10000000000000000000); + static unique_ptr CreateInstance(const TableFunction &table_function); //! 
Return a DeltaMultiFileList shared_ptr CreateFileList(ClientContext &context, const vector &paths, @@ -35,43 +33,41 @@ struct DeltaMultiFileReader : public MultiFileReader { //! Override the regular parquet bind using the MultiFileReader Bind. The bind from these are what DuckDB's file //! readers will try read - bool Bind(MultiFileReaderOptions &options, MultiFileList &files, vector &return_types, - vector &names, MultiFileReaderBindData &bind_data) override; + bool Bind(MultiFileOptions &options, MultiFileList &files, vector &return_types, vector &names, + MultiFileReaderBindData &bind_data) override; //! Override the Options bind - void BindOptions(MultiFileReaderOptions &options, MultiFileList &files, vector &return_types, + void BindOptions(MultiFileOptions &options, MultiFileList &files, vector &return_types, vector &names, MultiFileReaderBindData &bind_data) override; - void CreateColumnMapping(const string &file_name, const vector &local_columns, - const vector &global_columns, - const vector &global_column_ids, MultiFileReaderData &reader_data, - const MultiFileReaderBindData &bind_data, const string &initial_file, - optional_ptr global_state) override; - unique_ptr - InitializeGlobalState(ClientContext &context, const MultiFileReaderOptions &file_options, + InitializeGlobalState(ClientContext &context, const MultiFileOptions &file_options, const MultiFileReaderBindData &bind_data, const MultiFileList &file_list, - const vector &global_columns, + const vector &global_columns, const vector &global_column_ids) override; - void FinalizeBind(const MultiFileReaderOptions &file_options, const MultiFileReaderBindData &options, - const string &filename, const vector &local_columns, - const vector &global_columns, - const vector &global_column_ids, MultiFileReaderData &reader_data, - ClientContext &context, optional_ptr global_state) override; + ReaderInitializeType InitializeReader(MultiFileReaderData &reader_data, const MultiFileBindData &bind_data, + const vector &global_columns, + const vector &global_column_ids, + optional_ptr table_filters, ClientContext &context, + MultiFileGlobalState &gstate) override; + + void FinalizeBind(MultiFileReaderData &reader_data, const MultiFileOptions &file_options, + const MultiFileReaderBindData &options, const vector &global_columns, + const vector &global_column_ids, ClientContext &context, + optional_ptr global_state) override; //! Override the FinalizeChunk method - void FinalizeChunk(ClientContext &context, const MultiFileReaderBindData &bind_data, - const MultiFileReaderData &reader_data, DataChunk &chunk, - optional_ptr global_state) override; + void FinalizeChunk(ClientContext &context, const MultiFileBindData &bind_data, BaseFileReader &reader, + const MultiFileReaderData &reader_data, DataChunk &input_chunk, DataChunk &output_chunk, + ExpressionExecutor &executor, optional_ptr global_state) override; //! 
Override the ParseOption call to parse delta_scan specific options - bool ParseOption(const string &key, const Value &val, MultiFileReaderOptions &options, - ClientContext &context) override; + bool ParseOption(const string &key, const Value &val, MultiFileOptions &options, ClientContext &context) override; // A snapshot can be injected into the multifilereader, this ensures the GetMultiFileList can return this snapshot // (note that the path should match the one passed to CreateFileList) shared_ptr snapshot; }; -} // namespace duckdb \ No newline at end of file +} // namespace duckdb diff --git a/src/include/storage/delta_catalog.hpp b/src/include/storage/delta_catalog.hpp index 3f5f979..19dd1ad 100644 --- a/src/include/storage/delta_catalog.hpp +++ b/src/include/storage/delta_catalog.hpp @@ -45,18 +45,18 @@ class DeltaCatalog : public Catalog { void ScanSchemas(ClientContext &context, std::function callback) override; - optional_ptr GetSchema(CatalogTransaction transaction, const string &schema_name, - OnEntryNotFound if_not_found, - QueryErrorContext error_context = QueryErrorContext()) override; - - unique_ptr PlanInsert(ClientContext &context, LogicalInsert &op, - unique_ptr plan) override; - unique_ptr PlanCreateTableAs(ClientContext &context, LogicalCreateTable &op, - unique_ptr plan) override; - unique_ptr PlanDelete(ClientContext &context, LogicalDelete &op, - unique_ptr plan) override; - unique_ptr PlanUpdate(ClientContext &context, LogicalUpdate &op, - unique_ptr plan) override; + optional_ptr LookupSchema(CatalogTransaction transaction, const EntryLookupInfo &schema_lookup, + OnEntryNotFound if_not_found) override; + + PhysicalOperator &PlanInsert(ClientContext &context, PhysicalPlanGenerator &planner, LogicalInsert &op, + optional_ptr plan) override; + PhysicalOperator &PlanCreateTableAs(ClientContext &context, PhysicalPlanGenerator &planner, LogicalCreateTable &op, + PhysicalOperator &plan) override; + PhysicalOperator &PlanDelete(ClientContext &context, PhysicalPlanGenerator &planner, LogicalDelete &op, + PhysicalOperator &plan) override; + PhysicalOperator &PlanUpdate(ClientContext &context, PhysicalPlanGenerator &planner, LogicalUpdate &op, + PhysicalOperator &plan) override; + unique_ptr BindCreateIndex(Binder &binder, CreateStatement &stmt, TableCatalogEntry &table, unique_ptr plan) override; diff --git a/src/include/storage/delta_schema_entry.hpp b/src/include/storage/delta_schema_entry.hpp index 26c57f0..f7d0c23 100644 --- a/src/include/storage/delta_schema_entry.hpp +++ b/src/include/storage/delta_schema_entry.hpp @@ -39,7 +39,7 @@ class DeltaSchemaEntry : public SchemaCatalogEntry { void Scan(ClientContext &context, CatalogType type, const std::function &callback) override; void Scan(CatalogType type, const std::function &callback) override; void DropEntry(ClientContext &context, DropInfo &info) override; - optional_ptr GetEntry(CatalogTransaction transaction, CatalogType type, const string &name) override; + optional_ptr LookupEntry(CatalogTransaction transaction, const EntryLookupInfo &lookup_info) override; optional_ptr GetCachedTable(); diff --git a/src/storage/delta_catalog.cpp b/src/storage/delta_catalog.cpp index 5335974..0d6fd22 100644 --- a/src/storage/delta_catalog.cpp +++ b/src/storage/delta_catalog.cpp @@ -34,9 +34,10 @@ void DeltaCatalog::ScanSchemas(ClientContext &context, std::function DeltaCatalog::GetSchema(CatalogTransaction transaction, const string &schema_name, - OnEntryNotFound if_not_found, - QueryErrorContext error_context) { +optional_ptr 
DeltaCatalog::LookupSchema(CatalogTransaction transaction, + const EntryLookupInfo &schema_lookup, + OnEntryNotFound if_not_found) { + auto &schema_name = schema_lookup.GetEntryName(); if (schema_name == DEFAULT_SCHEMA || schema_name == INVALID_SCHEMA) { return main_schema.get(); } @@ -90,25 +91,25 @@ DatabaseSize DeltaCatalog::GetDatabaseSize(ClientContext &context) { return size; } -unique_ptr DeltaCatalog::PlanInsert(ClientContext &context, LogicalInsert &op, - unique_ptr plan) { - throw NotImplementedException("DeltaCatalog does not support inserts"); +PhysicalOperator &DeltaCatalog::PlanInsert(ClientContext &context, PhysicalPlanGenerator &planner, LogicalInsert &op, + optional_ptr plan) { + throw NotImplementedException("DeltaCatalog PlanInsert"); } -unique_ptr DeltaCatalog::PlanCreateTableAs(ClientContext &context, LogicalCreateTable &op, - unique_ptr plan) { - throw NotImplementedException("DeltaCatalog does not support creating new tables"); +PhysicalOperator &DeltaCatalog::PlanCreateTableAs(ClientContext &context, PhysicalPlanGenerator &planner, + LogicalCreateTable &op, PhysicalOperator &plan) { + throw NotImplementedException("DeltaCatalog PlanCreateTableAs"); } -unique_ptr DeltaCatalog::PlanDelete(ClientContext &context, LogicalDelete &op, - unique_ptr plan) { - throw NotImplementedException("DeltaCatalog does not support deletes"); +PhysicalOperator &DeltaCatalog::PlanDelete(ClientContext &context, PhysicalPlanGenerator &planner, LogicalDelete &op, + PhysicalOperator &plan) { + throw NotImplementedException("DeltaCatalog PlanDelete"); } -unique_ptr DeltaCatalog::PlanUpdate(ClientContext &context, LogicalUpdate &op, - unique_ptr plan) { - throw NotImplementedException("DeltaCatalog does not support updates"); +PhysicalOperator &DeltaCatalog::PlanUpdate(ClientContext &context, PhysicalPlanGenerator &planner, LogicalUpdate &op, + PhysicalOperator &plan) { + throw NotImplementedException("DeltaCatalog PlanUpdate"); } unique_ptr DeltaCatalog::BindCreateIndex(Binder &binder, CreateStatement &stmt, TableCatalogEntry &table, unique_ptr plan) { - throw NotImplementedException("DeltaCatalog does not support creating indices"); + throw NotImplementedException("DeltaCatalog BindCreateIndex"); } } // namespace duckdb diff --git a/src/storage/delta_schema_entry.cpp b/src/storage/delta_schema_entry.cpp index 6790d1d..386fa52 100644 --- a/src/storage/delta_schema_entry.cpp +++ b/src/storage/delta_schema_entry.cpp @@ -125,12 +125,14 @@ void DeltaSchemaEntry::Scan(ClientContext &context, CatalogType type, const std::function &callback) { if (CatalogTypeIsSupported(type)) { auto transaction = catalog.GetCatalogTransaction(context); - auto default_table = GetEntry(transaction, type, catalog.GetName()); + auto lookup_info = EntryLookupInfo(type, catalog.GetName()); + auto default_table = LookupEntry(transaction, lookup_info); if (default_table) { callback(*default_table); } } } + void DeltaSchemaEntry::Scan(CatalogType type, const std::function &callback) { throw NotImplementedException("Scan without context not supported"); } @@ -139,13 +141,15 @@ void DeltaSchemaEntry::DropEntry(ClientContext &context, DropInfo &info) { throw NotImplementedException("Delta tables do not support dropping"); } -optional_ptr DeltaSchemaEntry::GetEntry(CatalogTransaction transaction, CatalogType type, - const string &name) { +optional_ptr DeltaSchemaEntry::LookupEntry(CatalogTransaction transaction, + const EntryLookupInfo &lookup_info) { if (!transaction.HasContext()) { throw NotImplementedException("Can not 
DeltaSchemaEntry::GetEntry without context"); } auto &context = transaction.GetContext(); + auto type = lookup_info.GetCatalogType(); + auto &name = lookup_info.GetEntryName(); if (type == CatalogType::TABLE_ENTRY && name == catalog.GetName()) { auto &delta_transaction = GetDeltaTransaction(transaction); auto &delta_catalog = catalog.Cast(); @@ -164,9 +168,7 @@ optional_ptr DeltaSchemaEntry::GetEntry(CatalogTransaction transac } return delta_transaction.InitializeTableEntry(context, *this); - ; } - return nullptr; } diff --git a/test/sql/dat/attach.test b/test/sql/dat/attach.test index 59c9ea2..e3dab88 100644 --- a/test/sql/dat/attach.test +++ b/test/sql/dat/attach.test @@ -29,7 +29,7 @@ dt main dt query IIIIII show all tables; ---- -dt main dt [utf8, int64, int32, int16, int8, float32, float64, bool, binary, decimal, date32, timestamp] [VARCHAR, BIGINT, INTEGER, SMALLINT, TINYINT, FLOAT, DOUBLE, BOOLEAN, BLOB, DECIMAL(5,3), DATE, TIMESTAMP WITH TIME ZONE] false +dt main dt [utf8, int64, int32, int16, int8, float32, float64, bool, binary, decimal, date32, timestamp] [VARCHAR, BIGINT, INTEGER, SMALLINT, TINYINT, FLOAT, DOUBLE, BOOLEAN, BLOB, 'DECIMAL(5,3)', DATE, TIMESTAMP WITH TIME ZONE] 0 # We can query the table using the catalog name + the table name query I diff --git a/test/sql/dat/custom_parameters.test b/test/sql/dat/custom_parameters.test index 1a9a912..f9fc729 100644 --- a/test/sql/dat/custom_parameters.test +++ b/test/sql/dat/custom_parameters.test @@ -17,7 +17,7 @@ require-env DAT_PATH # Test with appends and several custom options query IIIII SELECT parse_filename(filename)[-15:-1], file_row_number, letter, delta_file_number, number -FROM delta_scan('${DAT_PATH}/out/reader_tests/generated/basic_append/delta', delta_file_number=1, file_row_number=1, filename=1) +FROM delta_scan('${DAT_PATH}/out/reader_tests/generated/basic_append/delta') ---- .snappy.parquet 0 d 7 4 .snappy.parquet 1 e 7 5 @@ -32,7 +32,7 @@ set enable_logging=true # Test with appends and several custom options query IIIII SELECT parse_filename(filename)[-15:-1], file_row_number, letter, delta_file_number, number -FROM delta_scan('${DAT_PATH}/out/reader_tests/generated/basic_append/delta', delta_file_number=1, file_row_number=1, filename=1) +FROM delta_scan('${DAT_PATH}/out/reader_tests/generated/basic_append/delta') WHERE filename != 'henk' and letter = 'd' ---- .snappy.parquet 0 d 7 4 @@ -42,5 +42,5 @@ WHERE filename != 'henk' and letter = 'd' query IIIII SELECT filter_type, files_before, files_after, filters_before, filters_after FROM delta_filter_pushdown_log() order by filter_type; ---- -constant 2 1 [] [letter='d'] -dynamic 1 1 [letter='d'] [letter='d'] +constant 2 1 [] ['letter=\'d\''] +dynamic 1 1 ['letter=\'d\''] ['letter=\'d\''] diff --git a/test/sql/delta_kernel_rs/logging.test b/test/sql/delta_kernel_rs/logging.test index 68fd5b1..8beeb5d 100644 --- a/test/sql/delta_kernel_rs/logging.test +++ b/test/sql/delta_kernel_rs/logging.test @@ -16,7 +16,7 @@ SELECT * FROM delta_scan('${DELTA_KERNEL_TESTS_PATH}/basic_partitioned') # No kernel logging available yet: we need to set delta_kernel_logging=true query I -SELECT count(*) FROM duckdb_logs WHERE starts_with(type, 'delta.Kernel') +SELECT count(*) FROM duckdb_logs WHERE type='DeltaKernel' ---- 0 @@ -31,14 +31,18 @@ SELECT * FROM delta_scan('${DELTA_KERNEL_TESTS_PATH}/basic_partitioned') # Now we have log! 
query I -SELECT count(*) > 50 FROM duckdb_logs WHERE starts_with(type, 'delta.Kernel') +SELECT count(*) > 50 FROM duckdb_logs WHERE type='DeltaKernel' ---- true +# The log can be automatically parsed into structured data +statement ok +select log_level, target, file, line from duckdb_logs_parsed('DeltaKernel') + statement ok set delta_kernel_logging=true; statement error set delta_kernel_logging=false; ---- -Invalid Input Error: Can not disable 'delta_kernel_logging' after enabling it \ No newline at end of file +Invalid Input Error: Can not disable 'delta_kernel_logging' after enabling it diff --git a/test/sql/delta_kernel_rs/simple_with_dv.test b/test/sql/delta_kernel_rs/simple_with_dv.test index 044cfe4..60b9039 100644 --- a/test/sql/delta_kernel_rs/simple_with_dv.test +++ b/test/sql/delta_kernel_rs/simple_with_dv.test @@ -52,7 +52,7 @@ WHERE value > 3 # With filter: ensures the deletion vector is applied properly on top of pushed down filters with the file_row_number column query II -FROM delta_scan('${DELTA_KERNEL_TESTS_PATH}/table-with-dv-small/', file_row_number=1) +SELECT *, file_row_number FROM delta_scan('${DELTA_KERNEL_TESTS_PATH}/table-with-dv-small/') WHERE value > 3 ---- 4 4 @@ -63,7 +63,7 @@ WHERE value > 3 # With filter and a delta scan based extra constant column query II -FROM delta_scan('${DELTA_KERNEL_TESTS_PATH}/table-with-dv-small/', delta_file_number=1) +select value, delta_file_number FROM delta_scan('${DELTA_KERNEL_TESTS_PATH}/table-with-dv-small/') WHERE value > 3 ---- 4 7 @@ -75,7 +75,7 @@ WHERE value > 3 # With filter, delta-extension-originated const column, and parquet-originated const column query III SELECT value, parse_filename(filename)[-15:-1], delta_file_number -FROM delta_scan('${DELTA_KERNEL_TESTS_PATH}/table-with-dv-small/', delta_file_number=1, filename=1) +FROM delta_scan('${DELTA_KERNEL_TESTS_PATH}/table-with-dv-small/') WHERE value > 3 ---- 4 .snappy.parquet 7 @@ -87,7 +87,7 @@ WHERE value > 3 # With PRUNED filter, delta-extension-originated const column, and parquet-originated const column query II SELECT parse_filename(filename)[-15:-1], delta_file_number -FROM delta_scan('${DELTA_KERNEL_TESTS_PATH}/table-with-dv-small/', delta_file_number=1, filename=1) +FROM delta_scan('${DELTA_KERNEL_TESTS_PATH}/table-with-dv-small/') WHERE value > 3 ---- .snappy.parquet 7 @@ -99,7 +99,7 @@ WHERE value > 3 # With PRUNED filters, delta-extension-originated const column, and parquet-originated const column query I SELECT delta_file_number -FROM delta_scan('${DELTA_KERNEL_TESTS_PATH}/table-with-dv-small/', delta_file_number=1, filename=1) +FROM delta_scan('${DELTA_KERNEL_TESTS_PATH}/table-with-dv-small/') WHERE value > 3 and filename is not null ---- 7 @@ -111,7 +111,7 @@ WHERE value > 3 and filename is not null # Enabling the file_row_number option, but projecting it out query I SELECT value -FROM delta_scan('${DELTA_KERNEL_TESTS_PATH}/table-with-dv-small/', file_row_number=1) +FROM delta_scan('${DELTA_KERNEL_TESTS_PATH}/table-with-dv-small/') ---- 1 2 diff --git a/test/sql/delta_kernel_rs/simple_without_dv.test b/test/sql/delta_kernel_rs/simple_without_dv.test index 1282000..0526342 100644 --- a/test/sql/delta_kernel_rs/simple_without_dv.test +++ b/test/sql/delta_kernel_rs/simple_without_dv.test @@ -28,7 +28,7 @@ SELECT value, parse_filename(filename)[-15:-1] FROM delta_scan('${DELTA_KERNEL_T # FileRowNumer param (i.e. 
ParquetReader provided) query II -SELECT * FROM delta_scan('${DELTA_KERNEL_TESTS_PATH}/table-without-dv-small', file_row_number=1) +SELECT *, file_row_number FROM delta_scan('${DELTA_KERNEL_TESTS_PATH}/table-without-dv-small') ---- 0 0 1 1 diff --git a/test/sql/generated/column_mapping.test b/test/sql/generated/column_mapping.test new file mode 100644 index 0000000..74a7f7d --- /dev/null +++ b/test/sql/generated/column_mapping.test @@ -0,0 +1,54 @@ +# name: test/sql/generated/column_mapping.test +# description: Test column mapping & schema evolution +# group: [generated] + +require parquet + +require delta + +require-env GENERATED_DATA_AVAILABLE + +# evolution_simple: +# CREATE TABLE evolution_simple AS SELECT CAST(1 AS INT) AS a; +# ALTER TABLE evolution_simple ADD COLUMN b BIGINT;, +# INSERT INTO evolution_simple VALUES (2, 2); +query II +from parquet_scan('./data/generated/evolution_simple/delta_lake/**/*.parquet', union_by_name=1) order by a +---- +1 NULL +2 2 + +query II +from delta_scan('./data/generated/evolution_simple/delta_lake') order by a +---- +1 NULL +2 2 + +# evolution_column_change: +# CREATE TABLE evolution_column_change AS SELECT 'value1' AS a, 'value2' AS b; +# ALTER TABLE evolution_column_change DROP COLUMN b; +# INSERT INTO evolution_column_change VALUES ('value3'); +# ALTER TABLE evolution_column_change ADD COLUMN b BIGINT; +# INSERT INTO evolution_column_change VALUES ('value4', 5); + +query II +from delta_scan('./data/generated/evolution_column_change/delta_lake') order by a +---- +value1 NULL +value3 NULL +value4 5 + +query II +SELECT a,b from delta_scan('./data/generated/evolution_column_change/delta_lake') order by a +---- +value1 NULL +value3 NULL +value4 5 + +# Ensure we play ball with projections and generated columns, etc +query IIII +SELECT file_row_number, filename.substring(-8,8), b, a from delta_scan('./data/generated/evolution_column_change/delta_lake') order by a +---- +0 .parquet NULL value1 +0 .parquet NULL value3 +0 .parquet 5 value4 diff --git a/test/sql/generated/file_skipping_all_types.test b/test/sql/generated/file_skipping_all_types.test index 26aa405..6c56519 100644 --- a/test/sql/generated/file_skipping_all_types.test +++ b/test/sql/generated/file_skipping_all_types.test @@ -19,7 +19,7 @@ WHERE value2 > 2.5 and value3 < 3.5 ---- -analyzed_plan :.*File Filters:.*value3<3.5.*value2>2.5.*value1>0.5.*Scanning Files: 1/5.* +analyzed_plan :.*File Filters:.*value1>0.5.*value2>2.5.*value3<3.5.*Scanning Files: 1/5.* query III SELECT value1, value2, value3 @@ -68,7 +68,7 @@ WHERE value2 > 2 and value3 < 4 ---- -analyzed_plan :.*File Filters:.*value3<4.*value2>2.*value1>1.*Scanning Files: 1/5.* +analyzed_plan :.*File Filters:.*value1>1.*value2>2.*value3<4.*Scanning Files: 1/5.* query III SELECT value1, value2, value3 @@ -99,7 +99,7 @@ WHERE value2 = '2' and value3 = '2' ---- -analyzed_plan :.*File Filters:.*value3='2'.*value2='2'.*value1='2'.*Scanning Files: 1/5.* +analyzed_plan :.*File Filters:.*value1='2'.*value2='2'.*value3='2'.*Scanning Files: 1/5.* query III SELECT value1, value2, value3 diff --git a/test/sql/generated/file_skipping_dynamic.test b/test/sql/generated/file_skipping_dynamic.test index 9c2aaac..879765a 100644 --- a/test/sql/generated/file_skipping_dynamic.test +++ b/test/sql/generated/file_skipping_dynamic.test @@ -26,11 +26,11 @@ query IIIII SELECT filter_type, filters_before, filters_after, files_before, files_after FROM delta_filter_pushdown_log() ---- -constant [] [value3=1002, value2=102] 5 1 +constant [] ['value2=102', 
'value3=1002'] 5 1 # Clear logging storage statement ok -set enable_logging=false;set logging_storage='stdout';set logging_storage='memory';set enable_logging=true; +pragma truncate_duckdb_logs; # Try other column query IIII @@ -44,11 +44,11 @@ query IIIII SELECT filter_type, filters_before, filters_after, files_before, files_after FROM delta_filter_pushdown_log() ---- -constant [] [value3=1002] 5 1 +constant [] ['value3=1002'] 5 1 # Clear logging storage statement ok -set enable_logging=false;set logging_storage='stdout';set logging_storage='memory';set enable_logging=true; +pragma truncate_duckdb_logs; ### Now we try dynamic pushdown query IIII @@ -62,11 +62,11 @@ query IIIII SELECT filter_type, filters_before, filters_after, files_before, files_after FROM delta_filter_pushdown_log() ---- -dynamic [] [value2=102] 5 1 +dynamic [] ['value2=102'] 5 1 # Clear logging storage statement ok -set enable_logging=false;set logging_storage='stdout';set logging_storage='memory';set enable_logging=true; +pragma truncate_duckdb_logs; # Dynamic pushdown, other column query IIII @@ -80,11 +80,11 @@ query IIIII SELECT filter_type, filters_before, filters_after, files_before, files_after FROM delta_filter_pushdown_log() ---- -dynamic [] [value1=13] 5 1 +dynamic [] ['value1=13'] 5 1 # Clear logging storage statement ok -set enable_logging=false;set logging_storage='stdout';set logging_storage='memory';set enable_logging=true; +pragma truncate_duckdb_logs; ### Try partition column query IIII @@ -99,11 +99,11 @@ query IIIII SELECT filter_type, filters_before, filters_after, files_before, files_after FROM delta_filter_pushdown_log() ---- -dynamic [] [part=2] 5 5 +dynamic [] ['part=2'] 5 5 # Clear logging storage statement ok -set enable_logging=false;set logging_storage='stdout';set logging_storage='memory';set enable_logging=true; +pragma truncate_duckdb_logs; # Now let's get funky: a dynamic join filter plus a constant filter query IIII @@ -119,12 +119,12 @@ SELECT filter_type, filters_before, filters_after, files_before, files_after FROM delta_filter_pushdown_log() ORDER BY filter_type ---- -constant [] [value1=13] 5 1 -dynamic [value1=13] [value2=103, value1=13] 1 1 +constant [] ['value1=13'] 5 1 +dynamic ['value1=13'] ['value1=13', 'value2=103'] 1 1 # Clear logging storage statement ok -set enable_logging=false;set logging_storage='stdout';set logging_storage='memory';set enable_logging=true; +pragma truncate_duckdb_logs; # Slightly weird case here: pushing down an identical dynamic filter and constant filter will make it show up twice with the second doing nothing query IIII @@ -140,12 +140,12 @@ SELECT filter_type, filters_before, filters_after, files_before, files_after FROM delta_filter_pushdown_log() ORDER BY filter_type ---- -constant [] [value1=13] 5 1 -dynamic [value1=13] [value1=13 AND value1=13 AND value1=13] 1 1 +constant [] ['value1=13'] 5 1 +dynamic ['value1=13'] ['value1=13 AND value1=13 AND value1=13'] 1 1 # Clear logging storage statement ok -set enable_logging=false;set logging_storage='stdout';set logging_storage='memory';set enable_logging=true; +pragma truncate_duckdb_logs; # Now we control the output of the filtered files by changing the delta_scan_explain_files_filtered setting statement ok @@ -171,5 +171,5 @@ SELECT filter_type, filters_before, filters_after, files_before, files_after FROM delta_filter_pushdown_log() ORDER BY filter_type ---- -constant [] [value1=12] NULL NULL -dynamic [] [value1=12] NULL NULL \ No newline at end of file +constant [] ['value1=12'] NULL NULL 
+dynamic [] ['value1=12'] NULL NULL diff --git a/test/sql/generated/file_skipping_params.test b/test/sql/generated/file_skipping_params.test index 7d96f4e..9300cf5 100644 --- a/test/sql/generated/file_skipping_params.test +++ b/test/sql/generated/file_skipping_params.test @@ -29,7 +29,7 @@ constant dynamic statement ok -set enable_logging=false;set logging_storage='stdout';set logging_storage='memory';set enable_logging=true; +pragma truncate_duckdb_logs; # delta_scan: constant only filters query IIII @@ -45,7 +45,7 @@ SELECT filter_type FROM delta_filter_pushdown_log() ORDER BY filter_type constant statement ok -set enable_logging=false;set logging_storage='stdout';set logging_storage='memory';set enable_logging=true; +pragma truncate_duckdb_logs; # delta_scan: dynamic only filters query IIII @@ -61,7 +61,7 @@ SELECT filter_type FROM delta_filter_pushdown_log() ORDER BY filter_type dynamic statement ok -set enable_logging=false;set logging_storage='stdout';set logging_storage='memory';set enable_logging=true; +pragma truncate_duckdb_logs; # delta_scan: no filters query IIII @@ -76,7 +76,7 @@ SELECT filter_type FROM delta_filter_pushdown_log() ORDER BY filter_type ---- statement ok -set enable_logging=false;set logging_storage='stdout';set logging_storage='memory';set enable_logging=true; +pragma truncate_duckdb_logs; # attach: default = all filters statement ok @@ -96,7 +96,7 @@ constant dynamic statement ok -set enable_logging=false;set logging_storage='stdout';set logging_storage='memory';set enable_logging=true; +pragma truncate_duckdb_logs; # attach: pushdown mode can be configured statement ok @@ -115,4 +115,4 @@ SELECT filter_type FROM delta_filter_pushdown_log() ORDER BY filter_type dynamic statement ok -set enable_logging=false;set logging_storage='stdout';set logging_storage='memory';set enable_logging=true; \ No newline at end of file +pragma truncate_duckdb_logs; diff --git a/test/sql/generated/late_materialization.test b/test/sql/generated/late_materialization.test new file mode 100644 index 0000000..9468d90 --- /dev/null +++ b/test/sql/generated/late_materialization.test @@ -0,0 +1,27 @@ +# name: test/sql/generated/late_materialization.test +# description: test the late materialization optimization + +require parquet + +require delta + +require-env GENERATED_DATA_AVAILABLE + +statement ok +pragma enable_verification + +statement ok +CREATE VIEW lineitem AS FROM delta_scan('data/generated/delta_rs_tpch_sf0_01/lineitem/delta_lake'); + +statement ok +FROM lineitem ORDER BY l_orderkey DESC LIMIT 5; + +statement ok +ATTACH 'data/generated/delta_rs_tpch_sf0_01/lineitem/delta_lake' as dt (TYPE delta); + +# TODO: figure out why the serialization verification breaks here +statement ok +pragma disable_verification + +statement ok +FROM dt ORDER BY l_orderkey DESC LIMIT 5; diff --git a/test/sql/generated/partitioned_large.test b/test/sql/generated/partitioned_large.test index 753c4c1..3641559 100644 --- a/test/sql/generated/partitioned_large.test +++ b/test/sql/generated/partitioned_large.test @@ -10,7 +10,7 @@ require-env GENERATED_DATA_AVAILABLE statement ok CREATE VIEW t AS SELECT part::INT as part, sum(i) as value - FROM delta_scan('./data/generated/simple_partitioned_large/delta_lake', delta_file_number=1) + FROM delta_scan('./data/generated/simple_partitioned_large/delta_lake') GROUP BY part ORDER BY part @@ -47,7 +47,7 @@ FROM t # We can disable pushing down partition information statement ok CREATE VIEW t2 AS SELECT part::INT as part, sum(i) as value - FROM 
delta_scan('./data/generated/simple_partitioned_large/delta_lake', delta_file_number=1, pushdown_partition_info=0) + FROM delta_scan('./data/generated/simple_partitioned_large/delta_lake', pushdown_partition_info=0) GROUP BY part ORDER BY part diff --git a/test/sql/generated/schema_evolution.test b/test/sql/generated/schema_evolution.test new file mode 100644 index 0000000..4098a37 --- /dev/null +++ b/test/sql/generated/schema_evolution.test @@ -0,0 +1,35 @@ +# name: test/sql/generated/schema_evolution.test +# description: Test schema evolution +# group: [generated] + +require parquet + +require delta + +require-env GENERATED_DATA_AVAILABLE + +query IIIIIII +SELECT * FROM delta_scan('./data/generated/evolution_type_widening/delta_lake') +---- +42 42.42 42 2042-01-01 42.42 42 42 +42 42.42 42 2042-01-01 42.42 42 42 + +# # Integer column is currently only one we evolve here +# # TODO: once spark supports more type widening stuff, add tests for it +query II +SELECT typeof(integer), integer FROM delta_scan('./data/generated/evolution_type_widening/delta_lake') +---- +SMALLINT 42 +SMALLINT 42 + +query I +SELECT * FROM delta_scan('./data/generated/evolution_struct_field_modification/delta_lake') ORDER BY top_level_column.struct_field_a +---- +{'struct_field_a': value1, 'struct_field_b': value2, 'struct_field_c': NULL} +{'struct_field_a': value3, 'struct_field_b': value4, 'struct_field_c': value5} + +query I +SELECT * FROM delta_scan('./data/generated/evolution_struct_field_modification_nested/delta_lake') ORDER BY top_level_column.top_level_struct.struct_field_a +---- +{'top_level_struct': {'struct_field_a': value1, 'struct_field_b': value2, 'struct_field_c': NULL}} +{'top_level_struct': {'struct_field_a': value3, 'struct_field_b': value4, 'struct_field_c': value5}} diff --git a/test/sql/generated/simple_partitioned.test b/test/sql/generated/simple_partitioned.test index 47e1ed8..2693757 100644 --- a/test/sql/generated/simple_partitioned.test +++ b/test/sql/generated/simple_partitioned.test @@ -8,23 +8,6 @@ require delta require-env GENERATED_DATA_AVAILABLE -# With a projection and delta constant column -query III -SELECT delta_file_number, part, i -FROM delta_scan('./data/generated/simple_partitioned/delta_lake', delta_file_number=1) -ORDER BY i ----- -7 0 0 -7 1 1 -7 0 2 -7 1 3 -7 0 4 -7 1 5 -7 0 6 -7 1 7 -7 0 8 -7 1 9 - # Simplest case query II FROM delta_scan('./data/generated/simple_partitioned/delta_lake/') @@ -41,6 +24,23 @@ ORDER BY i 8 0 9 1 +# With a projection and delta constant column +query III +SELECT delta_file_number, part, i +FROM delta_scan('./data/generated/simple_partitioned/delta_lake') +ORDER BY i +---- +7 0 0 +7 1 1 +7 0 2 +7 1 3 +7 0 4 +7 1 5 +7 0 6 +7 1 7 +7 0 8 +7 1 9 + # Partitioned with a plus symbol to test encoding query II FROM delta_scan('./data/generated/simple_partitioned_with_url_encoding/delta_lake/') @@ -77,7 +77,7 @@ ORDER BY i # With a projection and delta constant column query III SELECT delta_file_number, part, i -FROM delta_scan('./data/generated/simple_partitioned/delta_lake', delta_file_number=1) +FROM delta_scan('./data/generated/simple_partitioned/delta_lake') ORDER BY i ---- 7 0 0 @@ -94,7 +94,7 @@ ORDER BY i # different permutation query III SELECT part, delta_file_number, i -FROM delta_scan('./data/generated/simple_partitioned/delta_lake', delta_file_number=1) +FROM delta_scan('./data/generated/simple_partitioned/delta_lake') ORDER BY i ---- 0 7 0 @@ -111,7 +111,7 @@ ORDER BY i # different permutation again query III SELECT part, i, 
delta_file_number -FROM delta_scan('./data/generated/simple_partitioned/delta_lake', delta_file_number=1) +FROM delta_scan('./data/generated/simple_partitioned/delta_lake') ORDER BY i ---- 0 0 7 @@ -128,7 +128,7 @@ ORDER BY i # With a projection and both a base multifilereader column and the file_row_number option query IIII SELECT parse_filename(filename)[-8:-1], part, i, file_row_number -FROM delta_scan('./data/generated/simple_partitioned/delta_lake', file_row_number=1, filename=1) +FROM delta_scan('./data/generated/simple_partitioned/delta_lake') ORDER BY i ---- .parquet 0 0 0 @@ -145,7 +145,7 @@ ORDER BY i # Final boss: add the delta_file_number to the mix query IIIII SELECT delta_file_number, parse_filename(filename)[-8:-1], part, i, file_row_number -FROM delta_scan('./data/generated/simple_partitioned/delta_lake', file_row_number=1, filename=1, delta_file_number=1) +FROM delta_scan('./data/generated/simple_partitioned/delta_lake') ORDER BY i ---- 7 .parquet 0 0 0 @@ -158,3 +158,20 @@ ORDER BY i 7 .parquet 1 7 3 7 .parquet 0 8 4 7 .parquet 1 9 4 + +# Compatibility check with olde custom options +query IIII +SELECT parse_filename(filename)[-8:-1], part, i, file_row_number +FROM delta_scan('./data/generated/simple_partitioned/delta_lake', filename=1, file_row_number=1) +ORDER BY i +---- +.parquet 0 0 0 +.parquet 1 1 0 +.parquet 0 2 1 +.parquet 1 3 1 +.parquet 0 4 2 +.parquet 1 5 2 +.parquet 0 6 3 +.parquet 1 7 3 +.parquet 0 8 4 +.parquet 1 9 4 diff --git a/test/sql/generated/tpch.test_slow b/test/sql/generated/tpch.test_slow index 236a37a..fb7d35c 100644 --- a/test/sql/generated/tpch.test_slow +++ b/test/sql/generated/tpch.test_slow @@ -16,10 +16,10 @@ require-env GENERATED_DATA_AVAILABLE foreach table customer lineitem nation orders part partsupp region supplier statement ok -create view ${table}_delta as from delta_scan('./data/generated/tpch_sf0_01/${table}/delta_lake'); +create view ${table}_delta as from delta_scan('./data/generated/tpch_sf1/${table}/delta_lake'); statement ok -create view ${table}_parquet as from parquet_scan('./data/generated/tpch_sf0_01/${table}/parquet/**/*.parquet'); +create view ${table}_parquet as from parquet_scan('./data/generated/tpch_sf1/${table}/parquet/**/*.parquet'); # NOTE: switch this to _parquet to easily compare plans while debugging statement ok @@ -32,7 +32,7 @@ loop i 1 9 query I PRAGMA tpch(${i}) ---- -:duckdb/extension/tpch/dbgen/answers/sf0.01/q0${i}.csv +:duckdb/extension/tpch/dbgen/answers/sf1/q0${i}.csv endloop @@ -41,7 +41,7 @@ loop i 10 23 query I PRAGMA tpch(${i}) ---- -:duckdb/extension/tpch/dbgen/answers/sf0.01/q${i}.csv +:duckdb/extension/tpch/dbgen/answers/sf1/q${i}.csv endloop diff --git a/test/sql/main/test_error_messages.test b/test/sql/main/test_error_messages.test new file mode 100644 index 0000000..ce0d926 --- /dev/null +++ b/test/sql/main/test_error_messages.test @@ -0,0 +1,49 @@ +# name: test/sql/main/test_error_messages.test +# description: Test that clean error messages are generated +# group: [delta_generated] + +require parquet + +require delta + +require httpfs + +statement error +FROM delta_scan('./doesnt_exist_and_never_will_exist_i_hope'); +---- +IO Error: DeltKernel InvalidTableLocationError (28): Invalid table location: Path does not exist: + +statement ok +ATTACH './doesnt_exist_and_never_will_exist_i_hope' AS s1 (TYPE delta); + +statement error +SHOW ALL TABLES; +---- +IO Error: DeltKernel InvalidTableLocationError (28): Invalid table location: Path does not exist: + +statement ok +DETACH s1; + +statement ok 
+CREATE SECRET s1 ( + TYPE S3, + ENDPOINT 'http://localhost:1337' +) + +statement error +FROM delta_scan('s3://bucket/doesnt/exist/either'); +---- +IO Error: DeltKernel ObjectStoreError (8): Error interacting with object store: Generic S3 error: Error after + +statement error +FROM delta_scan('duck://bucket/doesnt/exist/either'); +---- +IO Error: DeltKernel ObjectStoreError (8): Error interacting with object store: Generic URL error: Unable to recognise URL + +statement ok +ATTACH 's3://bucket/doesnt/exist/either' AS s1 (TYPE delta); + +statement error +SHOW ALL TABLES; +---- +IO Error: DeltKernel ObjectStoreError (8): Error interacting with object store: Generic S3 error: Error after \ No newline at end of file diff --git a/test/sql/main/test_expression.test b/test/sql/main/test_expression.test index 8eca5d4..1e458f3 100644 --- a/test/sql/main/test_expression.test +++ b/test/sql/main/test_expression.test @@ -6,9 +6,6 @@ require parquet require delta -# TODO still broken: -# - Decimal -# - StructExpression query I SELECT unnest(get_delta_test_expression()) ---- @@ -29,11 +26,11 @@ false '1970-01-01 00:00:00.0001'::TIMESTAMP '1970-02-02'::DATE '\x00\x00\xDE\xAD\xBE\xEF\xCA\xFE'::BLOB -0.0000000042 +0.001 NULL struct_pack("'top'" := struct_pack("'a'" := 500, "'b'" := list_value(5, 0))) list_value(5, 0) -42 +struct_pack((5 OR 20)) not((col is NULL)) (0 IN (0)) (0 + 0) diff --git a/vcpkg.json b/vcpkg.json index 2f3c27d..fed3c2c 100644 --- a/vcpkg.json +++ b/vcpkg.json @@ -1,13 +1,37 @@ { - "dependencies": [ - "azure-identity-cpp", - "azure-storage-blobs-cpp", - "azure-storage-files-datalake-cpp", - "openssl", - "zlib", - { - "name": "aws-sdk-cpp", - "features": [ "sts", "sso" ] - } - ] + "dependencies": [ + "vcpkg-cmake", + "azure-identity-cpp", + "azure-storage-blobs-cpp", + "azure-storage-files-datalake-cpp", + "openssl", + "zlib", + { + "name": "aws-sdk-cpp", + "features": [ + "sts", + "sso", + "identity-management" + ] + } + ], + "vcpkg-configuration": { + "registries": [ + { + "kind": "git", + "repository": "https://github.com/duckdb/vcpkg-duckdb-ports", + "baseline": "0f9bf648ba1ee29291890a1ca9a49a80bba017eb", + "packages": [ + "vcpkg-cmake" + ] + } + ] + }, + "builtin-baseline": "5e5d0e1cd7785623065e77eff011afdeec1a3574", + "overrides": [ + { + "name": "openssl", + "version": "3.0.8" + } + ] } \ No newline at end of file
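
Editor's note on the test changes above: with the move to DuckDB v1.3.0, the metadata columns that the old tests enabled through delta_scan named parameters (filename=1, file_row_number=1, delta_file_number=1) are now projected directly as columns, and log housekeeping switches from toggling logging_storage to PRAGMA truncate_duckdb_logs. The following is a minimal SQL sketch of that usage pattern, not part of the patch; the table path is the generated test dataset referenced in the tests above (it assumes GENERATED_DATA_AVAILABLE), and the column names (part, i) come from those same tests.

-- Metadata columns are selected directly instead of being enabled via named
-- parameters; the older delta_scan(..., filename=1, file_row_number=1) form
-- is still accepted, as the compatibility test in simple_partitioned.test shows.
SELECT i, part, filename, file_row_number, delta_file_number
FROM delta_scan('./data/generated/simple_partitioned/delta_lake');

-- Kernel logging: entries are logged under the 'DeltaKernel' type and can be
-- read raw from duckdb_logs or parsed into structured columns.
SET delta_kernel_logging = true;  -- per logging.test, this cannot be disabled again once enabled
SET enable_logging = true;
SELECT count(*) FROM duckdb_logs WHERE type = 'DeltaKernel';
SELECT log_level, target, file, line FROM duckdb_logs_parsed('DeltaKernel');

-- Clearing collected log entries between checks replaces the old
-- enable_logging/logging_storage toggling used by the previous tests.
PRAGMA truncate_duckdb_logs;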