# Question: DuckLake support reading data using Spark #78
Does DuckLake support reading data using Spark?

I'm particularly interested in understanding the available compute options. As data volume grows, will it scale effectively, specifically in terms of reading data efficiently and performing aggregations?

---
Spark does not yet have support for reading DuckLake tables, but there is no technical reason this is not possible; it just hasn't been implemented yet. We are also planning to add support for exporting metadata to Iceberg, which will then allow DuckLake tables to be read from Spark.

---

Copying from Discord just for the record: the following way to access DuckLake from Spark is supposed to work once #45 is fixed:

```sql
CREATE TABLE test1 USING org.apache.spark.sql.jdbc OPTIONS (
driver "org.duckdb.DuckDBDriver",
url "jdbc:duckdb:",
prepareQuery "ATTACH 'ducklake:/path/to/lake1.db' as lake1;",
dbtable "lake1.nl_train_stations"
);

SELECT * FROM test1;
```
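
For completeness, a PySpark equivalent of the SQL above might look like this. This is a sketch, not a tested recipe: it assumes the DuckDB JDBC driver jar is on the Spark classpath and that #45 is fixed; the path `/path/to/lake1.db` is the placeholder from the example above.

```python
# Sketch: the same prepareQuery-based read via the PySpark DataFrame API.
df = (
    spark.read.format("jdbc")
    .option("driver", "org.duckdb.DuckDBDriver")
    .option("url", "jdbc:duckdb:")  # in-memory DuckDB instance
    # Attach the DuckLake catalog before the table query runs
    .option("prepareQuery", "ATTACH 'ducklake:/path/to/lake1.db' as lake1;")
    .option("dbtable", "lake1.nl_train_stations")
    .load()
)
df.show()
```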

---

The following Spark syntax is supported (using the new `ducklake` JDBC option):

```sql
CREATE TABLE test1 USING org.apache.spark.sql.jdbc OPTIONS (
driver "org.duckdb.DuckDBDriver",
url "jdbc:duckdb:",
jdbc_stream_results "true",
ducklake "postgres:postgresql://user:pwd@127.0.0.1:5432/lake1",
dbtable "lake1.nl_train_stations"
);

SELECT COUNT(*) FROM test1;
SELECT * FROM test1 WHERE name_long = 'Johan Cruijff ArenA';
```
Or with the Spark Java API:

```java
Properties config = new Properties();
config.put("ducklake", "postgres:postgresql://user:pwd@127.0.0.1:5432/lake1");
config.put("jdbc_stream_results", "true");

Dataset<Row> jdbc1 = spark.read()
    .jdbc("jdbc:duckdb:", "lake1.nl_train_stations", config);
jdbc1.show();
```
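
The same read in PySpark would presumably look like this (a sketch mirroring the Java snippet, with the same placeholder credentials):

```python
# Sketch: PySpark version of the Java example above.
config = {
    "ducklake": "postgres:postgresql://user:pwd@127.0.0.1:5432/lake1",
    "jdbc_stream_results": "true",
}
jdbc1 = spark.read.jdbc("jdbc:duckdb:", "lake1.nl_train_stations", properties=config)
jdbc1.show()
```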
In this example Spark accesses DuckLake data by loading a DuckDB instance in-process and passing SQL queries to it, so reading Parquet data is as efficient as it can be in plain DuckDB (with filter/projection pushdowns). Additionally, Spark JDBC supports partitioned reads:

```sql
CREATE TABLE test1 USING org.apache.spark.sql.jdbc OPTIONS (
driver "org.duckdb.DuckDBDriver",
url "jdbc:duckdb:",
jdbc_stream_results "true",
ducklake "postgres:postgresql://postgres:postgres@127.0.0.1:5432/lake1",
dbtable "lake1.nl_train_stations",
partitionColumn "id",
lowerBound "1",
upperBound "100",
numPartitions "4"
);

SELECT * FROM test1;
```

This makes Spark issue SQL queries like the following to DuckDB:

```sql
SELECT "id","code","uic","name_short","name_medium","name_long","slug","country","type","geo_lat","geo_lng" FROM lake1.nl_train_stations WHERE "id" >= 75
SELECT "id","code","uic","name_short","name_medium","name_long","slug","country","type","geo_lat","geo_lng" FROM lake1.nl_train_stations WHERE "id" >= 51 AND "id" < 75
SELECT "id","code","uic","name_short","name_medium","name_long","slug","country","type","geo_lat","geo_lng" FROM lake1.nl_train_stations WHERE "id" >= 27 AND "id" < 51
```

These queries, which DuckLake translates to Parquet fetching, are run by Spark in parallel, so they allow some control over read scaling. Result-set streaming is limited in DuckDB (only a single streaming result set per connection), but this works fine with Spark, which opens multiple connections, so it is as efficient as it can be on the DuckDB/DuckLake side.

As for aggregates, the Spark `dbtable` option also accepts a subquery, so an aggregate can be computed inside DuckDB:

```sql
CREATE OR REPLACE TEMPORARY VIEW avg1 USING org.apache.spark.sql.jdbc OPTIONS (
driver "org.duckdb.DuckDBDriver",
url "jdbc:duckdb:",
jdbc_stream_results "true",
ducklake "postgres:postgresql://postgres:postgres@127.0.0.1:5432/lake1",
dbtable "(SELECT AVG(id) FROM lake1.nl_train_stations) AS a"
);
SELECT * FROM avg1;
```

Another option for aggregates is to use a persistent local DuckDB database (instead of the default in-memory one):

```
$ duckdb /home/alex/projects/duck/lake/local.db
D ATTACH 'ducklake:postgres:postgresql://postgres:postgres@127.0.0.1:5432/lake1';
D CREATE VIEW avg1_native AS SELECT AVG(id) AS a FROM lake1.nl_train_stations;
```

And then use this view from Spark:

```sql
CREATE OR REPLACE TEMPORARY VIEW avg1 USING org.apache.spark.sql.jdbc OPTIONS (
driver "org.duckdb.DuckDBDriver",
url "jdbc:duckdb:/home/alex/projects/duck/lake/local.db",
jdbc_stream_results "true",
ducklake "postgres:postgresql://postgres:postgres@127.0.0.1:5432/lake1",
dbtable "local.avg1_native"
);
SELECT * FROM avg1;
```

All this is completely different from the Iceberg metadata export mentioned above, which will make Spark read the Iceberg metadata and fetch Parquet files by itself. Still, I hope that Spark JDBC can be robust with DuckLake too, and it currently looks very close to fully working.
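
For reference, the partitioned read shown earlier maps onto the PySpark `jdbc` reader as well; a sketch with the same bounds:

```python
# Sketch: partitioned read; Spark issues one range query per partition,
# equivalent to the generated WHERE clauses shown earlier.
df = spark.read.jdbc(
    url="jdbc:duckdb:",
    table="lake1.nl_train_stations",
    column="id",        # corresponds to partitionColumn
    lowerBound=1,
    upperBound=100,
    numPartitions=4,
    properties={
        "ducklake": "postgres:postgresql://postgres:postgres@127.0.0.1:5432/lake1",
        "jdbc_stream_results": "true",
    },
)
print(df.count())
```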

---

Thanks a lot

---
Just for the record, a PySpark script that can be used (with 1.3.1) to export all DuckLake tables to Spark:

```python
duckdb_url = "jdbc:duckdb:;jdbc_stream_results=true;allow_unsigned_extensions=true;"
ducklake_path = "postgres:postgresql://postgres:postgres@127.0.0.1:5432/lake1"

# Expose DuckLake's information_schema.tables as a temporary JDBC view
spark.sql(f"""
CREATE OR REPLACE TEMPORARY VIEW ducklake_tables
USING jdbc
OPTIONS (
url "{duckdb_url}",
ducklake "{ducklake_path}",
dbtable "information_schema.tables"
)
""")

# Collect the names of all tables in the DuckLake catalog
table_names = spark.sql("""
SELECT table_name FROM ducklake_tables WHERE table_catalog = 'ducklake'
""").collect()

# Create a linked JDBC table in Spark for each DuckLake table
for row in table_names:
    tname = row[0]
    spark.sql(f"DROP TABLE IF EXISTS {tname}")
    spark.sql(f"""
    CREATE TABLE {tname}
    USING jdbc
    OPTIONS (
    url "{duckdb_url}",
    ducklake "{ducklake_path}",
    dbtable "{tname}"
    )
    """)
```

It creates Spark tables as linked JDBC tables; in Spark SQL:

```
> SHOW TABLES;
nl_train_stations
...
Time taken: 0.323 seconds, Fetched 2 row(s)
```

Running a Spark SQL query like this (data from this example):

```sql
SELECT * FROM nl_train_stations WHERE name_long LIKE 'Johan Cruijff%';
```

is translated to this DuckLake query:

```sql
SELECT "id","code","uic","name_short","name_medium","name_long","slug","country","type","geo_lat","geo_lng" FROM nl_train_stations WHERE ("name_long" IS NOT NULL) AND ("name_long" LIKE 'Johan Cruijff%' ESCAPE '\')
```
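
Once linked, those tables can also be queried through the DataFrame API, e.g. (using the table name from the example above):

```python
# The LIKE filter is pushed down to DuckDB/DuckLake via JDBC.
spark.table("nl_train_stations").filter("name_long LIKE 'Johan Cruijff%'").show()
```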