Skip to content

Commit

Permalink
[Sample PR] Spark View Catalog with HMS and Coral
Browse files Browse the repository at this point in the history
  • Loading branch information
ljfgem committed Oct 17, 2022
1 parent 6723e22 commit 9d25f11
Show file tree
Hide file tree
Showing 11 changed files with 1,862 additions and 1 deletion.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ allprojects {
url 'https://linkedin.jfrog.io/artifactory/avro-util/'
}
// NOTE(review): a hard-coded, machine-specific absolute path
// ('/Users/jiefli/.m2/repository') was committed here. Use Gradle's
// portable shorthand for the local Maven repository instead, which
// resolves to ~/.m2/repository on every developer machine and in CI.
mavenLocal()
}

Expand Down
25 changes: 25 additions & 0 deletions coral-spark-catalog/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
dependencies {
// Coral's Spark translation layer — provides CoralSpark and the view/UDF containers.
compile project(':coral-spark')
// LinkedIn's shaded fork of Calcite; the 'shaded' classifier relocates Calcite's
// packages so they cannot clash with Spark's own copy on the classpath.
compile(deps.'linkedin-calcite-core') {
artifact {
name = 'calcite-core'
extension = 'jar'
type = 'jar'
classifier = 'shaded'
}
}

// NOTE(review): depends on a -SNAPSHOT build of spark-hive, which is mutable and
// not reproducible — pin to a released Spark version before merging.
compile 'org.apache.spark:spark-hive_2.12:3.1.3-SNAPSHOT'

testCompile('org.apache.hive:hive-metastore:2.3.7') {
// avro-tools brings in whole bunch of hadoop classes causing duplicates and conflicts
exclude group: 'org.apache.avro', module: 'avro-tools'
}
testCompile('org.apache.hive:hive-exec:2.3.7:core') {
exclude group: 'org.apache.avro', module: 'avro-tools'
// These exclusions are required to prevent duplicate classes since we include
// shaded jar above
exclude group: 'org.apache.calcite', module: 'calcite-core'
exclude group: 'org.apache.calcite', module: 'calcite-avatica'
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/**
* Copyright 2022 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.coral.spark.catalog;

import java.util.Map;

import com.google.common.collect.ImmutableMap;

import org.apache.spark.sql.connector.catalog.View;
import org.apache.spark.sql.types.StructType;


/**
 * Immutable {@link View} implementation backed by a Coral-translated Spark SQL text.
 *
 * <p>Instances are value objects: all state is supplied at construction time and never
 * changes afterwards. Column aliases, column comments and properties are not tracked,
 * so the corresponding accessors return empty values.
 */
public class CoralSparkView implements View {

  private final String name;
  private final String sql;
  private final StructType viewSchema;
  private final String catalogName;
  // Defensive copy: arrays are mutable, so we never share ours with callers.
  private final String[] namespace;

  /**
   * Creates a view description.
   *
   * @param name view name (unqualified)
   * @param sql Spark SQL text the view expands to
   * @param viewSchema Spark schema of the view's output
   * @param catalogName name of the catalog the view belongs to
   * @param namespace namespace (e.g. database) path of the view; copied defensively
   */
  public CoralSparkView(String name, String sql, StructType viewSchema, String catalogName, String[] namespace) {
    this.name = name;
    this.sql = sql;
    this.viewSchema = viewSchema;
    this.catalogName = catalogName;
    // Copy so later mutation of the caller's array cannot alter this view.
    this.namespace = namespace == null ? null : namespace.clone();
  }

  @Override
  public String name() {
    return name;
  }

  @Override
  public String sql() {
    return sql;
  }

  @Override
  public StructType schema() {
    return viewSchema;
  }

  /** Column aliases are not tracked; always empty. */
  @Override
  public String[] columnAliases() {
    return new String[0];
  }

  /** Column comments are not tracked; always empty. */
  @Override
  public String[] columnComments() {
    return new String[0];
  }

  @Override
  public String currentCatalog() {
    return catalogName;
  }

  @Override
  public String[] currentNamespace() {
    // Return a copy so callers cannot mutate this view's internal state.
    return namespace == null ? null : namespace.clone();
  }

  /** View properties are not tracked; always empty. */
  @Override
  public Map<String, String> properties() {
    return ImmutableMap.of();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
/**
* Copyright 2018-2022 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.coral.spark.catalog;

import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.avro.Schema;
import org.apache.calcite.rel.RelNode;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.avro.SchemaConverters;
import org.apache.spark.sql.catalyst.FunctionIdentifier;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException;
import org.apache.spark.sql.catalyst.catalog.CatalogFunction;
import org.apache.spark.sql.catalyst.catalog.FunctionResource;
import org.apache.spark.sql.catalyst.catalog.FunctionResourceType;
import org.apache.spark.sql.connector.catalog.CatalogExtension;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.NamespaceChange;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.catalog.TableChange;
import org.apache.spark.sql.connector.catalog.View;
import org.apache.spark.sql.connector.catalog.ViewCatalog;
import org.apache.spark.sql.connector.catalog.ViewChange;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.apache.spark.util.Utils;
import org.apache.thrift.TException;

import scala.Option;
import scala.collection.JavaConverters;

import com.linkedin.coral.common.HiveMscAdapter;
import com.linkedin.coral.hive.hive2rel.HiveToRelConverter;
import com.linkedin.coral.schema.avro.ViewToAvroSchemaConverter;
import com.linkedin.coral.spark.CoralSpark;
import com.linkedin.coral.spark.containers.SparkUDFInfo;


/**
 * A Spark {@link CatalogExtension} that intercepts view loads and serves Hive
 * {@code VIRTUAL_VIEW}s as Coral-translated Spark SQL, while delegating table
 * loads to the wrapped session catalog.
 *
 * <p>Only {@link #loadView(Identifier)} and {@link #loadTable(Identifier)} are
 * functional; all other catalog mutations are intentionally no-ops, because this
 * catalog is a read-only translation layer on top of the Hive Metastore.
 *
 * @param <T> the delegate session catalog type, which must support both table
 *            and namespace operations
 */
public class CoralSparkViewCatalog<T extends TableCatalog & SupportsNamespaces>
    implements ViewCatalog, CatalogExtension {
  /** Name this catalog was registered under; set in {@link #initialize}. */
  private String catalogName;
  /** Delegate session catalog that handles table resolution. */
  private T sessionCatalog;
  /** Direct Hive Metastore client used to fetch view definitions. */
  private IMetaStoreClient hiveClient;
  private HiveToRelConverter hiveToRelConverter;
  private ViewToAvroSchemaConverter viewToAvroSchemaConverter;

  /** View enumeration is not supported; only direct loads by identifier. */
  @Override
  public Identifier[] listViews(String... strings) throws NoSuchNamespaceException {
    return new Identifier[0];
  }

  /**
   * Loads a Hive view, translating its definition to Spark SQL via Coral and
   * registering any UDFs the translated SQL requires.
   *
   * <p>Returns {@code null} (rather than throwing) when the identifier does not
   * name a Hive {@code VIRTUAL_VIEW} or translation fails, which signals Spark
   * to fall back to the delegate catalog's resolution.
   */
  @Override
  public View loadView(Identifier identifier) {
    // assumes a single-level namespace (the Hive database) — TODO confirm for
    // multi-part namespaces.
    String db = identifier.namespace()[0];
    String tbl = identifier.name();

    final Table table;
    try {
      table = hiveClient.getTable(db, tbl);
    } catch (TException e) {
      // Not found (or metastore error): let Spark resolve it elsewhere.
      return null;
    }
    if (!table.getTableType().equalsIgnoreCase("VIRTUAL_VIEW")) {
      // Plain tables are handled by the delegate catalog via loadTable().
      return null;
    }

    try {
      boolean forceLowercaseSchema = (Boolean) SparkSession.active().conf().get(SQLConf.FORCE_LOWERCASE_DALI_SCHEMA());
      // Derive the view's Spark schema by going Hive view -> Avro -> Spark type.
      final Schema avroSchema = viewToAvroSchemaConverter.toAvroSchema(db, tbl, false, forceLowercaseSchema);
      final DataType schema = SchemaConverters.toSqlType(avroSchema).dataType();
      // Translate the view definition to Spark SQL via Coral's relational IR.
      final RelNode relNode = hiveToRelConverter.convertView(db, tbl);
      final CoralSpark coralSpark = CoralSpark.create(relNode, avroSchema);
      // The translated SQL may reference Hive/Transport UDFs; make them callable.
      registerUDFs(coralSpark.getSparkUDFInfoList());
      return new CoralSparkView(tbl, coralSpark.getSparkSql(), (StructType) schema, name(), new String[] { db });
    } catch (Exception e) {
      // NOTE(review): translation failures are deliberately swallowed so Spark
      // can fall back; consider proper logging instead of printStackTrace.
      e.printStackTrace();
      return null;
    }
  }

  /** View creation is not supported by this read-only catalog. */
  @Override
  public View createView(Identifier identifier, String s, String s1, String[] strings, StructType structType,
      String[] strings1, String[] strings2, Map<String, String> map)
      throws ViewAlreadyExistsException, NoSuchNamespaceException {
    return null;
  }

  /** View alteration is not supported by this read-only catalog. */
  @Override
  public View alterView(Identifier identifier, ViewChange... viewChanges)
      throws NoSuchViewException, IllegalArgumentException {
    return null;
  }

  /** View deletion is not supported; always reports nothing was dropped. */
  @Override
  public boolean dropView(Identifier identifier) {
    return false;
  }

  /** View renaming is not supported by this read-only catalog. */
  @Override
  public void renameView(Identifier identifier, Identifier identifier1)
      throws NoSuchViewException, ViewAlreadyExistsException {

  }

  /**
   * Registers the UDFs required by a translated view with the active Spark
   * session: Hive UDFs via the session catalog, Transport UDFs via their
   * static {@code register(String)} hook. Registration is best-effort.
   */
  private void registerUDFs(List<SparkUDFInfo> sparkUDFInfoList) {
    final SparkSession sparkSession = SparkSession.active();
    // Collect the union of artifact URLs across all UDFs (deduplicated).
    Set<URI> mavenDependencies = new HashSet<>();
    sparkUDFInfoList.forEach(sparkUDFInfo -> mavenDependencies.addAll(sparkUDFInfo.getArtifactoryUrls()));

    final List<FunctionResource> resolvedFunctionResources =
        mavenDependencies.stream().map(f -> new FunctionResource(FunctionResourceType.fromString("jar"), f.toString()))
            .collect(Collectors.toList());
    try {
      // Make the UDF jars available on the session's classpath.
      sparkSession.sessionState().catalog()
          .loadFunctionResources(JavaConverters.asScalaBuffer(resolvedFunctionResources));
    } catch (Exception e) {
      // NOTE(review): best-effort — a jar-load failure will surface later as an
      // unresolved function; consider logging instead of printStackTrace.
      e.printStackTrace();
    }
    sparkUDFInfoList.forEach(udf -> {
      switch (udf.getUdfType()) {
        case HIVE_CUSTOM_UDF:
          // overrideIfExists=true so repeated view loads do not fail.
          sparkSession.sessionState().catalog()
              .registerFunction(new CatalogFunction(new FunctionIdentifier(udf.getFunctionName()), udf.getClassName(),
                  JavaConverters.asScalaBuffer(resolvedFunctionResources)), true, Option.apply(null));
          break;
        case TRANSPORTABLE_UDF:
          try {
            // Transport UDFs expose a static register(functionName) entry point.
            Utils.classForName(udf.getClassName(), true, false).getMethod("register", String.class).invoke(null,
                udf.getFunctionName());
          } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
            e.printStackTrace();
          }
          break;
        default:
          throw new RuntimeException("Unsupported UDF type: " + udf.getUdfType());
      }
    });
  }

  /**
   * Accepts the session catalog Spark hands to a {@link CatalogExtension}.
   *
   * @throws IllegalArgumentException if the delegate does not support both
   *         table and namespace operations
   */
  @Override
  @SuppressWarnings("unchecked") // safe: both bounds of T are verified by the instanceof checks above the cast
  public void setDelegateCatalog(CatalogPlugin sparkSessionCatalog) {
    if (sparkSessionCatalog instanceof TableCatalog && sparkSessionCatalog instanceof SupportsNamespaces) {
      this.sessionCatalog = (T) sparkSessionCatalog;
    } else {
      throw new IllegalArgumentException("Invalid session catalog: " + sparkSessionCatalog);
    }
  }

  /** Namespace enumeration is not supported; always empty. */
  @Override
  public String[][] listNamespaces() throws NoSuchNamespaceException {
    return new String[0][];
  }

  /** Namespace enumeration is not supported; always empty. */
  @Override
  public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException {
    return new String[0][];
  }

  /** Namespace metadata is not tracked by this catalog. */
  @Override
  public Map<String, String> loadNamespaceMetadata(String[] namespace) throws NoSuchNamespaceException {
    return null;
  }

  /** Namespace creation is not supported by this read-only catalog. */
  @Override
  public void createNamespace(String[] namespace, Map<String, String> metadata) throws NamespaceAlreadyExistsException {

  }

  /** Namespace alteration is not supported by this read-only catalog. */
  @Override
  public void alterNamespace(String[] namespace, NamespaceChange... changes) throws NoSuchNamespaceException {

  }

  /** Namespace deletion is not supported; always reports nothing was dropped. */
  @Override
  public boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException {
    return false;
  }

  /** Table enumeration is not supported; always empty. */
  @Override
  public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
    return new Identifier[0];
  }

  /** Table loads are delegated verbatim to the wrapped session catalog. */
  @Override
  public org.apache.spark.sql.connector.catalog.Table loadTable(Identifier ident) throws NoSuchTableException {
    return sessionCatalog.loadTable(ident);
  }

  /** Table creation is not supported by this read-only catalog. */
  @Override
  public org.apache.spark.sql.connector.catalog.Table createTable(Identifier ident, StructType schema,
      Transform[] partitions, Map<String, String> properties)
      throws TableAlreadyExistsException, NoSuchNamespaceException {
    return null;
  }

  /** Table alteration is not supported by this read-only catalog. */
  @Override
  public org.apache.spark.sql.connector.catalog.Table alterTable(Identifier ident, TableChange... changes)
      throws NoSuchTableException {
    return null;
  }

  /** Table deletion is not supported; always reports nothing was dropped. */
  @Override
  public boolean dropTable(Identifier ident) {
    return false;
  }

  /** Table renaming is not supported by this read-only catalog. */
  @Override
  public void renameTable(Identifier oldIdent, Identifier newIdent)
      throws NoSuchTableException, TableAlreadyExistsException {

  }

  /**
   * Initializes the catalog: connects to the Hive Metastore (config taken from
   * the classpath's {@code hive-site.xml} via {@link HiveConf}) and builds the
   * Coral converters on top of that connection.
   *
   * @throws RuntimeException if the metastore client cannot be created — failing
   *         fast here instead of deferring a guaranteed NPE to the first view load
   */
  @Override
  public void initialize(String name, CaseInsensitiveStringMap options) {
    this.catalogName = name;
    try {
      hiveClient = new HiveMetaStoreClient(new HiveConf());
    } catch (MetaException e) {
      // Previously this was printStackTrace() + continue, leaving hiveClient
      // null and every converter below broken. Fail fast with the cause instead.
      throw new RuntimeException("Failed to create Hive Metastore client for catalog: " + name, e);
    }
    hiveToRelConverter = new HiveToRelConverter(new HiveMscAdapter(hiveClient));
    viewToAvroSchemaConverter = ViewToAvroSchemaConverter.create(new HiveMscAdapter(hiveClient));
  }

  /** Returns the name this catalog was registered under. */
  @Override
  public String name() {
    return catalogName;
  }
}
Loading

0 comments on commit 9d25f11

Please sign in to comment.