duckdb/ducklake · Issue #78
Question: DuckLake support for reading data using Spark #78

Closed
soumilshah1995 opened this issue May 28, 2025 · 5 comments

@soumilshah1995

Does DuckLake support reading data using Spark?
I'm particularly interested in understanding the available compute options. As data volume grows, will it scale effectively—specifically in terms of reading data efficiently and performing aggregations?

@Mytherin
Contributor

Spark does not yet have support for reading DuckLake tables, but there is no technical reason it is not possible; it simply hasn't been implemented yet.

We are also planning to add support for exporting metadata to Iceberg, which will then allow that to be used from Spark.
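
For orientation, reading such an export from Spark would presumably go through the standard Iceberg catalog configuration. A minimal PySpark sketch, assuming the DuckLake metadata has been exported into a Hadoop-style Iceberg warehouse; the catalog name lake_iceberg, the warehouse path, and the namespace are placeholders, not part of DuckLake today:

# Hypothetical sketch: reading an exported Iceberg table from Spark.
# The catalog name, warehouse path, and namespace below are placeholders.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("ducklake-iceberg-read")
    # the Iceberg Spark runtime jar must be on the classpath, e.g. via
    # --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:<version>
    .config("spark.sql.catalog.lake_iceberg", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.lake_iceberg.type", "hadoop")
    .config("spark.sql.catalog.lake_iceberg.warehouse", "/path/to/exported/warehouse")
    .getOrCreate()
)

# Read the exported table like any other Iceberg table
spark.table("lake_iceberg.lake1.nl_train_stations").show()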

@staticlibs

Copying from Discord for the record: the following way of accessing DuckLake from Spark is expected to work once #45 is fixed:

CREATE TABLE test1 USING org.apache.spark.sql.jdbc OPTIONS (
    driver "org.duckdb.DuckDBDriver",
    url "jdbc:duckdb:",
    prepareQuery "ATTACH 'ducklake:/path/to/lake1.db' as lake1;",
    dbtable "lake1.nl_train_stations"
);
SELECT * FROM test1;
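
The same prepareQuery approach can presumably also be used through the PySpark DataFrame API. A sketch, assuming a Spark version whose JDBC source supports the prepareQuery option (3.4+) and the DuckDB JDBC driver on the classpath:

# Sketch: prepareQuery-based DuckLake access via the PySpark DataFrame API.
df = (
    spark.read.format("jdbc")
    .option("driver", "org.duckdb.DuckDBDriver")
    .option("url", "jdbc:duckdb:")
    # ATTACH runs before the generated SELECT so the DuckLake catalog is visible
    .option("prepareQuery", "ATTACH 'ducklake:/path/to/lake1.db' AS lake1;")
    .option("dbtable", "lake1.nl_train_stations")
    .load()
)
df.show()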

@staticlibs

@soumilshah1995

The following Spark syntax (using a new ducklake JDBC option) is proposed for 1.3.1:

CREATE TABLE test1 USING org.apache.spark.sql.jdbc OPTIONS (
    driver "org.duckdb.DuckDBDriver",
    url "jdbc:duckdb:",
    jdbc_stream_results "true",
    ducklake "postgres:postgresql://user:pwd@127.0.0.1:5432/lake1",
    dbtable "lake1.nl_train_stations"
);

SELECT COUNT(*) from test1;

SELECT * FROM test1 WHERE name_long = 'Johan Cruijff ArenA';
[...]
578
Time taken: 3.224 seconds, Fetched 1 row(s)
41	ASB	8400074	Bijlmer A	Bijlmer ArenA	Johan Cruijff ArenA	amsterdam-bijlmer-arena	NL	knooppuntStoptreinstation	52.3122215271	4.9469442367554
Time taken: 0.785 seconds, Fetched 1 row(s)

Or with the Spark Java API:

import java.util.Properties;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Connection properties are passed through to the DuckDB JDBC driver
Properties config = new Properties();
config.put("ducklake", "postgres:postgresql://user:pwd@127.0.0.1:5432/lake1");
config.put("jdbc_stream_results", "true");
Dataset<Row> jdbc1 = spark.read()
        .jdbc("jdbc:duckdb:", "lake1.nl_train_stations", config);
jdbc1.show();
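
A PySpark equivalent of the Java snippet above would presumably look like this (a sketch; same connection properties):

# Sketch: the same read via PySpark, passing the driver options as properties
df = spark.read.jdbc(
    "jdbc:duckdb:",
    "lake1.nl_train_stations",
    properties={
        "driver": "org.duckdb.DuckDBDriver",
        "ducklake": "postgres:postgresql://user:pwd@127.0.0.1:5432/lake1",
        "jdbc_stream_results": "true",
    },
)
df.show()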

will it scale effectively—specifically in terms of reading data efficiently and performing aggregations?

In this example Spark accesses DuckLake data by loading a DuckDB instance in-process and passing SQL queries to it, so reading Parquet data is as efficient as it is in plain DuckDB (with filter/projection pushdowns). Additionally, Spark JDBC supports the partitionColumn option, which "describe[s] how to partition the table when reading in parallel from multiple workers". So configuring the DuckLake access like this:

CREATE TABLE test1 USING org.apache.spark.sql.jdbc OPTIONS (
    driver "org.duckdb.DuckDBDriver",
    url "jdbc:duckdb:",
    jdbc_stream_results "true",
    ducklake "postgres:postgresql://postgres:postgres@127.0.0.1:5432/lake1",
    dbtable "lake1.nl_train_stations",

    partitionColumn "id",
    lowerBound "1",
    upperBound "100",
    numPartitions "4"
);

SELECT * FROM test1;

will make Spark issue the following SQL queries to DuckDB:

SELECT "id","code","uic","name_short","name_medium","name_long","slug","country","type","geo_lat","geo_lng" FROM lake1.nl_train_stations  WHERE "id" >= 75    
SELECT "id","code","uic","name_short","name_medium","name_long","slug","country","type","geo_lat","geo_lng" FROM lake1.nl_train_stations  WHERE "id" >= 51 AND "id" < 75    
SELECT "id","code","uic","name_short","name_medium","name_long","slug","country","type","geo_lat","geo_lng" FROM lake1.nl_train_stations  WHERE "id" >= 27 AND "id" < 51

These queries, which DuckLake translates into Parquet fetches, are run by Spark in parallel, so they give some control over read scaling.
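
For completeness, the same partitioned read can presumably be expressed through the PySpark DataFrame API with the standard JDBC partitioning parameters (a sketch mirroring the OPTIONS above; the bounds only decide how the id range is split, they do not filter rows):

# Sketch: partitioned DuckLake read via the PySpark DataFrame API.
df = (
    spark.read.format("jdbc")
    .option("driver", "org.duckdb.DuckDBDriver")
    .option("url", "jdbc:duckdb:")
    .option("jdbc_stream_results", "true")
    .option("ducklake", "postgres:postgresql://postgres:postgres@127.0.0.1:5432/lake1")
    .option("dbtable", "lake1.nl_train_stations")
    # standard Spark JDBC partitioning: splits the id range into 4 parallel reads
    .option("partitionColumn", "id")
    .option("lowerBound", "1")
    .option("upperBound", "100")
    .option("numPartitions", "4")
    .load()
)
print(df.rdd.getNumPartitions())  # 4 range queries issued in parallel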

Result-set streaming is limited in DuckDB (only a single streaming result set per connection), but this works fine with Spark, which opens multiple connections, so it is as efficient as it can be on the DuckDB/DuckLake side.

As for aggregates, the Spark option pushDownAggregate is not effective with DuckDB (it is only supported for Postgres and a few other databases); instead, aggregates can be specified in native SQL like this:

CREATE OR REPLACE TEMPORARY VIEW avg1 USING org.apache.spark.sql.jdbc OPTIONS (
    driver "org.duckdb.DuckDBDriver",
    url "jdbc:duckdb:",
    jdbc_stream_results "true",
    ducklake "postgres:postgresql://postgres:postgres@127.0.0.1:5432/lake1",
    dbtable "(SELECT AVG(id) FROM lake1.nl_train_stations) AS a"
);

SELECT * FROM avg1;
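
The same subquery-as-dbtable trick presumably works through the DataFrame API too (a sketch; the outer alias is needed because Spark wraps dbtable in its own SELECT):

# Sketch: pushing the aggregate down as a dbtable subquery from PySpark
avg_df = (
    spark.read.format("jdbc")
    .option("driver", "org.duckdb.DuckDBDriver")
    .option("url", "jdbc:duckdb:")
    .option("jdbc_stream_results", "true")
    .option("ducklake", "postgres:postgresql://postgres:postgres@127.0.0.1:5432/lake1")
    .option("dbtable", "(SELECT AVG(id) AS avg_id FROM lake1.nl_train_stations) AS a")
    .load()
)
avg_df.show()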

Another option for aggregates is to use a persistent local DuckDB database (instead of the default :memory: one) and create native views over DuckLake queries in it, using the DuckDB CLI:

$ duckdb /home/alex/projects/duck/lake/local.db
D ATTACH 'ducklake:postgres:postgresql://postgres:postgres@127.0.0.1:5432/lake1';
D CREATE VIEW avg1_native AS SELECT AVG(id) AS a FROM lake1.nl_train_stations;

And then use this view from Spark:

CREATE OR REPLACE TEMPORARY VIEW avg1 USING org.apache.spark.sql.jdbc OPTIONS (
    driver "org.duckdb.DuckDBDriver",
    url "jdbc:duckdb:/home/alex/projects/duck/lake/local.db",
    jdbc_stream_results "true",
    ducklake "postgres:postgresql://postgres:postgres@127.0.0.1:5432/lake1",
    dbtable "local.avg1_native"
);

SELECT * FROM avg1;

All of this is completely different from the Iceberg metadata export mentioned above, which will let Spark read the Iceberg metadata and fetch the Parquet files by itself. Still, I hope Spark JDBC can be a robust path to DuckLake too, and it currently looks very close to fully working.

@soumilshah1995
Author

Thanks a lot

@staticlibs

Just for the record, here is a PySpark script that can be used (with 1.3.1) to expose all DuckLake tables to Spark:

duckdb_url = "jdbc:duckdb:;jdbc_stream_results=true;allow_unsigned_extensions=true;"
ducklake_path = "postgres:postgresql://postgres:postgres@127.0.0.1:5432/lake1"

# Expose DuckLake's information_schema.tables as a temporary JDBC view
spark.sql(f"""
  CREATE OR REPLACE TEMPORARY VIEW ducklake_tables
  USING jdbc
  OPTIONS (
    url "{duckdb_url}",
    ducklake "{ducklake_path}",
    dbtable "information_schema.tables"
  )
""")

# Collect the names of all tables in the DuckLake catalog
table_names = spark.sql("""
  SELECT table_name FROM ducklake_tables WHERE table_catalog = 'ducklake'
""").collect()

# Re-create each DuckLake table in Spark as a linked JDBC table
for row in table_names:
  tname = row[0]
  spark.sql(f"DROP TABLE IF EXISTS {tname}")
  spark.sql(f"""
    CREATE TABLE {tname}
    USING jdbc
    OPTIONS (
      url "{duckdb_url}",
      ducklake "{ducklake_path}",
      dbtable "{tname}"
    )
  """)

It creates the Spark tables as linked JDBC tables; in Spark SQL:

> SHOW TABLES;
nl_train_stations
...
Time taken: 0.323 seconds, Fetched 2 row(s)

Running a Spark SQL query like this (data from this example):

SELECT * FROM nl_train_stations WHERE name_long LIKE 'Johan Cruijff%';

is translated to this DuckLake query:

SELECT "id","code","uic","name_short","name_medium","name_long","slug","country","type","geo_lat","geo_lng" FROM nl_train_stations  WHERE ("name_long" IS NOT NULL) AND ("name_long" LIKE 'Johan Cruijff%' ESCAPE '\')
