Spark Integration

jaceklaskowski committed Jul 15, 2024
1 parent a6675cc commit 0510d8d

Showing 5 changed files with 152 additions and 0 deletions.
4 changes: 4 additions & 0 deletions docs/spark-integration/.pages
@@ -0,0 +1,4 @@
title: Spark Integration
nav:
  - index.md
  - ...
27 changes: 27 additions & 0 deletions docs/spark-integration/UCProxy.md
@@ -0,0 +1,27 @@
# UCProxy

`UCProxy` is a `TableCatalog` ([Spark SQL]({{ book.spark_sql }}/connector/catalog/TableCatalog/)).

## TablesApi { #tablesApi }

`UCProxy` creates a `TablesApi` client when requested to [initialize](#initialize).

## Initialize TableCatalog { #initialize }

??? note "CatalogPlugin"

```scala
initialize(
name: String,
options: CaseInsensitiveStringMap): Unit
```

`initialize` is part of the `CatalogPlugin` ([Spark SQL]({{ book.spark_sql }}/connector/catalog/CatalogPlugin#initialize)) abstraction.

`initialize` asserts that the `uri` option is specified.

`initialize` creates an [ApiClient](../client/ApiClient.md) based on the `uri` option (the host, the port, and the scheme).

If the `token` option is defined, `initialize` tells the `ApiClient` to send the `Authorization` header (based on the token) with every request.

In the end, `initialize` creates this [TablesApi](#tablesApi).
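
The following is a minimal Scala sketch of the steps above. The `ApiClient` and `TablesApi` constructor, setter, and interceptor names are assumptions for illustration only, not the verified client API:

```scala
import java.net.URI
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Sketch only: the exact ApiClient / TablesApi API may differ.
override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
  val uriOption = options.get("uri")
  assert(uriOption != null, "uri must be specified for Unity Catalog")
  val uri = new URI(uriOption)

  // Configure the client from the uri option (the host, the port and the scheme).
  val apiClient = new ApiClient()  // assumed no-arg constructor
  apiClient.setHost(uri.getHost)   // assumed setter names
  apiClient.setPort(uri.getPort)
  apiClient.setScheme(uri.getScheme)

  // If a token is defined, send it as the Authorization header with every request.
  Option(options.get("token")).filter(_.nonEmpty).foreach { token =>
    apiClient.setRequestInterceptor { builder =>  // assumed interceptor hook
      builder.header("Authorization", s"Bearer $token")
    }
  }

  tablesApi = new TablesApi(apiClient)  // assumed field and constructor
}
```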
28 changes: 28 additions & 0 deletions docs/spark-integration/UCSingleCatalog.md
@@ -0,0 +1,28 @@
# UCSingleCatalog

`UCSingleCatalog` is a `TableCatalog` ([Spark SQL]({{ book.spark_sql }}/connector/catalog/TableCatalog/)).

## DeltaCatalog { #deltaCatalog }

`UCSingleCatalog` creates a `DeltaCatalog` ([Delta Lake]({{ book.delta }}/DeltaCatalog)) when requested to [initialize](#initialize).

`DeltaCatalog` is a `DelegatingCatalogExtension` ([Spark SQL]({{ book.spark_sql }}/connector/catalog/DelegatingCatalogExtension)) that is supposed to delegate to a [UCProxy](UCProxy.md).

## Initialize TableCatalog { #initialize }

??? note "CatalogPlugin"

```scala
initialize(
name: String,
options: CaseInsensitiveStringMap): Unit
```

`initialize` is part of the `CatalogPlugin` ([Spark SQL]({{ book.spark_sql }}/connector/catalog/CatalogPlugin#initialize)) abstraction.

`initialize` creates a [UCProxy](UCProxy.md) and requests it to [initialize](UCProxy.md#initialize).

`initialize` creates a `DeltaCatalog` ([Delta Lake]({{ book.delta }}/DeltaCatalog)) that is told to delegate to the `UCProxy` for unsupported `TableCatalog` features.

!!! note "Dynamic Loading"
    `initialize` expects the `org.apache.spark.sql.delta.catalog.DeltaCatalog` class to be available on the CLASSPATH at runtime only (not at build time) and loads it dynamically.
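
The following is a minimal Scala sketch of this initialization. The `delegate` field and the exact wiring are assumptions; `Class.forName` and `CatalogExtension.setDelegateCatalog` are standard JVM and Spark SQL APIs:

```scala
import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Sketch only: field names and error handling are assumptions.
override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
  // Create and initialize the UCProxy that talks to the Unity Catalog server.
  val proxy = new UCProxy()
  proxy.initialize(name, options)

  // DeltaCatalog is loaded reflectively, so the class only has to be
  // on the CLASSPATH at runtime (not at build time).
  val deltaCatalog = Class
    .forName("org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getDeclaredConstructor()
    .newInstance()
    .asInstanceOf[DelegatingCatalogExtension]

  // Unsupported TableCatalog features fall back to the UCProxy.
  deltaCatalog.setDelegateCatalog(proxy)

  this.delegate = deltaCatalog  // assumed internal field
}
```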
86 changes: 86 additions & 0 deletions docs/spark-integration/index.md
@@ -0,0 +1,86 @@
# Spark Integration

As of [this commit]({{ uc.commit }}/8fb49245e2aa2126901f7f69016f6762b083b238), Unity Catalog supports Apache Spark {{ spark.version }} and Delta Lake {{ delta.version }} using [UCSingleCatalog](UCSingleCatalog.md) and [UCProxy](UCProxy.md).

The following features are supported:

* Read/write (partitioned) external parquet tables
* Read/write (partitioned) external delta tables
* Path table syntax for delta tables (see the example below)
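
For example, a delta table can be read by its storage path (a hedged sketch; the table location below is made up):

```scala
// Path table syntax: query a delta table by its location (path is hypothetical).
spark.sql("SELECT * FROM delta.`/tmp/unitycatalog/numbers`").show()

// Equivalent DataFrame read by path.
spark.read.format("delta").load("/tmp/unitycatalog/numbers").show()
```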

## Demo

Download Apache Spark {{ spark.version }} from the [Preview release of Spark 4.0](https://spark.apache.org/news/spark-4.0.0-preview1.html) announcement.

In your local clone of Unity Catalog, build the project and publish the modules to the local Ivy repository:

``` shell
build/sbt clean package publishLocal
```

!!! note
You have to change `build.sbt` to build the Spark Integration module. A fix is coming.

``` shell
./bin/spark-shell \
--conf spark.jars.ivy=$HOME/.ivy2 \
--packages \
io.delta:delta-spark_2.13:{{ delta.version }},io.unitycatalog:unitycatalog-spark:0.1.0-SNAPSHOT \
--conf spark.sql.catalog.unity=io.unitycatalog.connectors.spark.UCSingleCatalog \
--conf spark.sql.catalog.unity.uri=http://localhost:8080 \
--conf spark.sql.catalog.unity.token=not_used_token
```

```text
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 4.0.0-preview1
/_/
Using Scala version 2.13.14 (OpenJDK 64-Bit Server VM, Java 17.0.11)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context Web UI available at http://192.168.68.100:4040
Spark context available as 'sc' (master = local[*], app id = local-1720981549342).
Spark session available as 'spark'.
```

```scala
assert(spark.version == "{{ spark.version }}", "Unity Catalog supports Apache Spark {{ spark.version }}")
assert(io.delta.VERSION == "{{ delta.version }}", "Unity Catalog supports Delta Lake {{ delta.version }}")
```

``` text
scala> spark.catalog.listTables
...
scala.NotImplementedError: an implementation is missing
at scala.Predef$.$qmark$qmark$qmark(Predef.scala:344)
at io.unitycatalog.connectors.spark.UCSingleCatalog.listTables(UCSingleCatalog.scala:37)
at org.apache.spark.sql.execution.datasources.v2.ShowTablesExec.run(ShowTablesExec.scala:40)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
...
```
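
The `scala.NotImplementedError` above means `UCSingleCatalog.listTables` is still a stub: in Scala, a method body of `???` (from `Predef`) throws this error when invoked. A hedged sketch of such a stub, using the `TableCatalog.listTables` signature:

```scala
import org.apache.spark.sql.connector.catalog.Identifier

// Not implemented yet: calling this throws scala.NotImplementedError.
override def listTables(namespace: Array[String]): Array[Identifier] = ???
```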

``` text
scala> spark.catalog.tableExists("unity.default.numbers")
org.apache.spark.sql.delta.DeltaAnalysisException: [DELTA_CONFIGURE_SPARK_SESSION_WITH_EXTENSION_AND_CATALOG] This Delta operation requires the SparkSession to be configured with the
DeltaSparkSessionExtension and the DeltaCatalog. Please set the necessary
configurations when creating the SparkSession as shown below.
SparkSession.builder()
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
...
.getOrCreate()
If you are using spark-shell/pyspark/spark-submit, you can add the required configurations to the command as show below:
--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
at org.apache.spark.sql.delta.DeltaErrorsBase.configureSparkSessionWithExtensionAndCatalog(DeltaErrors.scala:1698)
at org.apache.spark.sql.delta.DeltaErrorsBase.configureSparkSessionWithExtensionAndCatalog$(DeltaErrors.scala:1695)
at org.apache.spark.sql.delta.DeltaErrors$.configureSparkSessionWithExtensionAndCatalog(DeltaErrors.scala:3413)
at org.apache.spark.sql.delta.DeltaLog.checkRequiredConfigurations(DeltaLog.scala:625)
... 133 elided
```
7 changes: 7 additions & 0 deletions mkdocs.yml
@@ -99,13 +99,17 @@ extra:
    property: !ENV GOOGLE_ANALYTICS_KEY
  book:
    title: Unity Catalog
    delta: https://books.japila.pl/delta-lake-internals
    spark_sql: https://books.japila.pl/spark-sql-internals
  uc:
    version: 0.1.0
    commit: https://github.com/unitycatalog/unitycatalog/commit
    github: https://github.com/unitycatalog/unitycatalog/blob/main
  armeria:
    version: 1.28.4
    javadoc: https://javadoc.io/doc/com.linecorp.armeria/armeria-javadoc/1.28.4
  delta:
    version: 4.0.0rc1
  hibernate:
    javadoc: https://docs.jboss.org/hibernate/orm/6.5/javadocs
  java:
@@ -128,6 +132,8 @@ extra:
      link: https://jaceklaskowski.medium.com
    - icon: fontawesome/brands/mastodon
      link: https://fosstodon.org/@jaceklaskowski
  spark:
    version: 4.0.0-preview1

# The sections with ... use awesome-pages plugin
# https://github.com/lukasgeiter/mkdocs-awesome-pages-plugin
@@ -137,6 +143,7 @@ nav:
  - Features:
    - features/index.md
    - ... | persistent-storage/**.md
    - ... | spark-integration/**.md
    - ... | server/**.md
    - ... | client/**.md
    - ... | cli/**.md
