Commit

BDP-7396: TransportUdf Determine scala version in CoralSpark and return respective ivy link (#119)
HotSushi authored Aug 10, 2021
1 parent ab35a3d commit 9d78a74
Showing 7 changed files with 254 additions and 44 deletions.
3 changes: 2 additions & 1 deletion coral-spark/build.gradle
@@ -1,19 +1,20 @@
apply from: "$rootDir/gradle/java-publication.gradle"
apply from: "spark_itest.gradle"

dependencies {
  compile deps.'gson'
  compile deps.'slf4j-api'
  compile deps.'slf4j-log4j12'
  compile project(':coral-hive')

  compileOnly deps.'spark'.'sql'
  testCompile(deps.'hive'.'hive-exec-core') {
    exclude group: 'org.apache.avro', module: 'avro-tools'
    // These exclusions are required to prevent duplicate classes since we include
    // shaded jar above
    exclude group: 'org.apache.calcite', module: 'calcite-core'
    exclude group: 'org.apache.calcite', module: 'calcite-avatica'
  }

  testCompile deps.'hadoop'.'hadoop-mapreduce-client-core'
  testCompile deps.'kryo'
}
32 changes: 32 additions & 0 deletions coral-spark/spark_itest.gradle
@@ -0,0 +1,32 @@
def itests = ['spark', 'spark3']

for (String itest : itests) {
  sourceSets {
    create("${itest}test", {
      java {
        compileClasspath += main.output
        runtimeClasspath += main.output
      }
    })
  }

  configurations {
    getByName("${itest}testCompile").extendsFrom(testCompile)
    getByName("${itest}testRuntime").extendsFrom(testRuntime)
  }

  tasks.register("${itest}Test", Test) {
    description = "Run ${itest} integration tests"
    testClassesDirs = project.sourceSets.getByName("${itest}test").output.classesDirs
    classpath = project.sourceSets.getByName("${itest}test").runtimeClasspath
    shouldRunAfter test
    useTestNG()
  }

  check.dependsOn(tasks.getByName("${itest}Test"))
}

dependencies {
  sparktestCompile(deps.'spark'.'hive')
  spark3testCompile(deps.'spark3'.'hive')
}
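The loop above registers one TestNG-backed Test task per entry in itests and wires each into check, so the Spark 2 and Spark 3 integration suites can be run independently. Assuming the repository's standard Gradle wrapper and the coral-spark project path, that would be ./gradlew :coral-spark:sparkTest or ./gradlew :coral-spark:spark3Test; both also run as part of ./gradlew :coral-spark:check, after the regular unit tests (shouldRunAfter test).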
@@ -6,14 +6,17 @@
package com.linkedin.coral.spark;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.linkedin.coral.spark.containers.SparkUDFInfo;
import com.linkedin.coral.spark.exceptions.UnsupportedUDFException;


/**
@@ -28,88 +31,103 @@ class TransportableUDFMap {
private TransportableUDFMap() {
}

-  private static final Map<String, SparkUDFInfo> UDF_MAP = new HashMap();
-  public static final String STANDARD_UDFS_DALI_UDFS_URL =
-      "ivy://com.linkedin.standard-udfs-dali-udfs:standard-udfs-dali-udfs:1.0.4?classifier=spark";
+  private static final Logger LOG = LoggerFactory.getLogger(TransportableUDFMap.class);
+  private static final Map<String, Map<ScalaVersion, SparkUDFInfo>> UDF_MAP = new HashMap();
+  public static final String DALI_UDFS_IVY_URL_SPARK_2_11 =
+      "ivy://com.linkedin.standard-udfs-dali-udfs:standard-udfs-dali-udfs:2.0.0?classifier=spark_2.11";
+  public static final String DALI_UDFS_IVY_URL_SPARK_2_12 =
+      "ivy://com.linkedin.standard-udfs-dali-udfs:standard-udfs-dali-udfs:2.0.0?classifier=spark_2.12";

enum ScalaVersion {
SCALA_2_11,
SCALA_2_12
}

static {

// LIHADOOP-48502: The following UDFs are the legacy Hive UDF. Since they have been converted to
// Transport UDF, we point their class files to the corresponding Spark jar.
add("com.linkedin.dali.udf.date.hive.DateFormatToEpoch", "com.linkedin.stdudfs.daliudfs.spark.DateFormatToEpoch",
STANDARD_UDFS_DALI_UDFS_URL);
DALI_UDFS_IVY_URL_SPARK_2_11, DALI_UDFS_IVY_URL_SPARK_2_12);

add("com.linkedin.dali.udf.date.hive.EpochToDateFormat", "com.linkedin.stdudfs.daliudfs.spark.EpochToDateFormat",
STANDARD_UDFS_DALI_UDFS_URL);
DALI_UDFS_IVY_URL_SPARK_2_11, DALI_UDFS_IVY_URL_SPARK_2_12);

add("com.linkedin.dali.udf.date.hive.EpochToEpochMilliseconds",
"com.linkedin.stdudfs.daliudfs.spark.EpochToEpochMilliseconds", STANDARD_UDFS_DALI_UDFS_URL);
"com.linkedin.stdudfs.daliudfs.spark.EpochToEpochMilliseconds", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12);

add("com.linkedin.dali.udf.isguestmemberid.hive.IsGuestMemberId",
"com.linkedin.stdudfs.daliudfs.spark.IsGuestMemberId", STANDARD_UDFS_DALI_UDFS_URL);
"com.linkedin.stdudfs.daliudfs.spark.IsGuestMemberId", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12);

// LIHADOOP-49851 add the transportudf spark version for lookup UDF
add("com.linkedin.dali.udf.istestmemberid.hive.IsTestMemberId",
"com.linkedin.stdudfs.daliudfs.spark.IsTestMemberId", STANDARD_UDFS_DALI_UDFS_URL);
"com.linkedin.stdudfs.daliudfs.spark.IsTestMemberId", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12);

add("com.linkedin.dali.udf.maplookup.hive.MapLookup", "com.linkedin.stdudfs.daliudfs.spark.MapLookup",
STANDARD_UDFS_DALI_UDFS_URL);
DALI_UDFS_IVY_URL_SPARK_2_11, DALI_UDFS_IVY_URL_SPARK_2_12);

add("com.linkedin.dali.udf.sanitize.hive.Sanitize", "com.linkedin.stdudfs.daliudfs.spark.Sanitize",
STANDARD_UDFS_DALI_UDFS_URL);
DALI_UDFS_IVY_URL_SPARK_2_11, DALI_UDFS_IVY_URL_SPARK_2_12);

// LIHADOOP-49851 add the transportudf spark version for lookup UDF
add("com.linkedin.dali.udf.watbotcrawlerlookup.hive.WATBotCrawlerLookup",
"com.linkedin.stdudfs.daliudfs.spark.WatBotCrawlerLookup", STANDARD_UDFS_DALI_UDFS_URL);
"com.linkedin.stdudfs.daliudfs.spark.WatBotCrawlerLookup", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12);

// LIHADOOP-48502: The following UDFs are already defined using Transport UDF.
// The class name is the corresponding Hive UDF.
// We point their class files to the corresponding Spark jar file.
add("com.linkedin.stdudfs.daliudfs.hive.DateFormatToEpoch", "com.linkedin.stdudfs.daliudfs.spark.DateFormatToEpoch",
STANDARD_UDFS_DALI_UDFS_URL);
DALI_UDFS_IVY_URL_SPARK_2_11, DALI_UDFS_IVY_URL_SPARK_2_12);

add("com.linkedin.stdudfs.daliudfs.hive.EpochToDateFormat", "com.linkedin.stdudfs.daliudfs.spark.EpochToDateFormat",
STANDARD_UDFS_DALI_UDFS_URL);
DALI_UDFS_IVY_URL_SPARK_2_11, DALI_UDFS_IVY_URL_SPARK_2_12);

add("com.linkedin.stdudfs.daliudfs.hive.EpochToEpochMilliseconds",
"com.linkedin.stdudfs.daliudfs.spark.EpochToEpochMilliseconds", STANDARD_UDFS_DALI_UDFS_URL);
"com.linkedin.stdudfs.daliudfs.spark.EpochToEpochMilliseconds", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12);

add("com.linkedin.stdudfs.daliudfs.hive.GetProfileSections",
"com.linkedin.stdudfs.daliudfs.spark.GetProfileSections", STANDARD_UDFS_DALI_UDFS_URL);
"com.linkedin.stdudfs.daliudfs.spark.GetProfileSections", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12);

add("com.linkedin.stdudfs.stringudfs.hive.InitCap", "com.linkedin.stdudfs.stringudfs.spark.InitCap",
"ivy://com.linkedin.standard-udfs-common-sql-udfs:standard-udfs-string-udfs:0.0.7?classifier=spark");
"ivy://com.linkedin.standard-udfs-common-sql-udfs:standard-udfs-string-udfs:0.0.7?classifier=spark", null);

add("com.linkedin.stdudfs.daliudfs.hive.IsGuestMemberId", "com.linkedin.stdudfs.daliudfs.spark.IsGuestMemberId",
STANDARD_UDFS_DALI_UDFS_URL);
DALI_UDFS_IVY_URL_SPARK_2_11, DALI_UDFS_IVY_URL_SPARK_2_12);

add("com.linkedin.stdudfs.daliudfs.hive.IsTestMemberId", "com.linkedin.stdudfs.daliudfs.spark.IsTestMemberId",
STANDARD_UDFS_DALI_UDFS_URL);
DALI_UDFS_IVY_URL_SPARK_2_11, DALI_UDFS_IVY_URL_SPARK_2_12);

add("com.linkedin.stdudfs.daliudfs.hive.MapLookup", "com.linkedin.stdudfs.daliudfs.spark.MapLookup",
STANDARD_UDFS_DALI_UDFS_URL);
DALI_UDFS_IVY_URL_SPARK_2_11, DALI_UDFS_IVY_URL_SPARK_2_12);

add("com.linkedin.stdudfs.daliudfs.hive.PortalLookup", "com.linkedin.stdudfs.daliudfs.spark.PortalLookup",
STANDARD_UDFS_DALI_UDFS_URL);
DALI_UDFS_IVY_URL_SPARK_2_11, DALI_UDFS_IVY_URL_SPARK_2_12);

add("com.linkedin.stdudfs.daliudfs.hive.Sanitize", "com.linkedin.stdudfs.daliudfs.spark.Sanitize",
STANDARD_UDFS_DALI_UDFS_URL);
DALI_UDFS_IVY_URL_SPARK_2_11, DALI_UDFS_IVY_URL_SPARK_2_12);

add("com.linkedin.stdudfs.userinterfacelookup.hive.UserInterfaceLookup",
"com.linkedin.stdudfs.userinterfacelookup.spark.UserInterfaceLookup",
"ivy://com.linkedin.standard-udf-userinterfacelookup:userinterfacelookup-std-udf:0.0.9?classifier=spark");
"ivy://com.linkedin.standard-udf-userinterfacelookup:userinterfacelookup-std-udf:0.0.9?classifier=spark", null);

add("com.linkedin.stdudfs.daliudfs.hive.WatBotCrawlerLookup",
"com.linkedin.stdudfs.daliudfs.spark.WatBotCrawlerLookup", STANDARD_UDFS_DALI_UDFS_URL);
"com.linkedin.stdudfs.daliudfs.spark.WatBotCrawlerLookup", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12);

add("com.linkedin.jemslookup.udf.hive.JemsLookup", "com.linkedin.jemslookup.udf.spark.JemsLookup",
"ivy://com.linkedin.jobs-udf:jems-udfs:1.0.0?classifier=spark");
"ivy://com.linkedin.jobs-udf:jems-udfs:1.0.0?classifier=spark", null);

add("com.linkedin.stdudfs.parsing.hive.UserAgentParser", "com.linkedin.stdudfs.parsing.spark.UserAgentParser",
"ivy://com.linkedin.standard-udfs-parsing:parsing-stdudfs:2.0.1?classifier=spark");
"ivy://com.linkedin.standard-udfs-parsing:parsing-stdudfs:2.0.1?classifier=spark", null);

add("com.linkedin.stdudfs.parsing.hive.Ip2Str", "com.linkedin.stdudfs.parsing.spark.Ip2Str",
"ivy://com.linkedin.standard-udfs-parsing:parsing-stdudfs:2.0.1?classifier=spark");
"ivy://com.linkedin.standard-udfs-parsing:parsing-stdudfs:2.0.1?classifier=spark", null);
}

/**
@@ -119,20 +137,37 @@ private TransportableUDFMap() {
* @return Optional<SparkUDFInfo>
*/
  static Optional<SparkUDFInfo> lookup(String className) {
-    return Optional.ofNullable(UDF_MAP.get(className));
+    ScalaVersion scalaVersion = getScalaVersion();
+    return Optional.ofNullable(UDF_MAP.get(className)).map(scalaMap -> Optional.ofNullable(scalaMap.get(scalaVersion))
+        .orElseThrow(() -> new UnsupportedUDFException(String.format(
+            "Transport UDF for class '%s' is not supported for scala %s, please contact " + "the UDF owner for upgrade",
+            className, scalaVersion.toString()))));
  }

+  public static void add(String className, String sparkClassName, String artifactoryUrlSpark211,
+      String artifactoryUrlSpark212) {
+    Map scalaToTransportUdfMap = new HashMap<ScalaVersion, SparkUDFInfo>() {
+      {
+        put(ScalaVersion.SCALA_2_11, artifactoryUrlSpark211 == null ? null : new SparkUDFInfo(sparkClassName, null,
+            Collections.singletonList(URI.create(artifactoryUrlSpark211)), SparkUDFInfo.UDFTYPE.TRANSPORTABLE_UDF));
+        put(ScalaVersion.SCALA_2_12, artifactoryUrlSpark212 == null ? null : new SparkUDFInfo(sparkClassName, null,
+            Collections.singletonList(URI.create(artifactoryUrlSpark212)), SparkUDFInfo.UDFTYPE.TRANSPORTABLE_UDF));
+      }
+    };
+    UDF_MAP.put(className, scalaToTransportUdfMap);
+  }

-  public static void add(String className, String sparkClassName, String artifcatoryUrl) {
+  static ScalaVersion getScalaVersion() {
    try {
-      URI url = new URI(artifcatoryUrl);
-      List<URI> listOfUris = new LinkedList<>();
-      listOfUris.add(url);
-
-      // Set the function name to null here because it is determined dynamically from the enclosing SqlOperand
-      UDF_MAP.put(className,
-          new SparkUDFInfo(sparkClassName, null, listOfUris, SparkUDFInfo.UDFTYPE.TRANSPORTABLE_UDF));
-    } catch (URISyntaxException e) {
-      throw new RuntimeException(String.format("Artifactory URL is malformed %s", artifcatoryUrl), e);
+      String sparkVersion = SparkSession.active().version();
+      if (sparkVersion.matches("2\\.[\\d\\.]*"))
+        return ScalaVersion.SCALA_2_11;
+      if (sparkVersion.matches("3\\.[\\d\\.]*"))
+        return ScalaVersion.SCALA_2_12;
+      throw new IllegalStateException(String.format("Unsupported Spark Version %s", sparkVersion));
+    } catch (IllegalStateException | NoClassDefFoundError ex) {
+      LOG.warn("Couldn't determine Spark version, falling back to scala_2.11", ex);
+      return ScalaVersion.SCALA_2_11;
    }
  }
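Taken together, lookup now resolves the Scala version from the active SparkSession first (Spark 2.x maps to Scala 2.11, Spark 3.x to Scala 2.12, with a fallback to 2.11 when no session is available) and then returns the matching ivy link, throwing UnsupportedUDFException for UDFs that were registered above with a null URL for that version. A minimal caller-side sketch, not part of this commit and assumed to live in the com.linkedin.coral.spark package since lookup is package-private:

Optional<SparkUDFInfo> udf = TransportableUDFMap.lookup("com.linkedin.stdudfs.daliudfs.hive.MapLookup");
udf.ifPresent(info ->
    // Prints the ivy link with classifier=spark_2.12 when a Spark 3 (Scala 2.12) session is active,
    // and the classifier=spark_2.11 link on Spark 2 or when no session can be determined.
    System.out.println(info.getArtifactoryUrls().get(0)));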

@@ -0,0 +1,70 @@
/**
* Copyright 2018-2021 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.coral.spark;

import org.apache.spark.sql.SparkSession;
import org.testng.Assert;
import org.testng.annotations.Test;

import com.linkedin.coral.spark.exceptions.UnsupportedUDFException;


public class TransportableUDFMapTest {

@Test
public void testScalaVersionWithSparkSession() {
SparkSession ss = SparkSession.builder().appName(TransportableUDFMapTest.class.getSimpleName()).master("local[1]")
.enableHiveSupport().getOrCreate();
Assert.assertEquals(TransportableUDFMap.getScalaVersion(), TransportableUDFMap.ScalaVersion.SCALA_2_12);
ss.close();
}

@Test
public void testDefaultScalaVersion() {
// If SparkSession is not active, getScalaVersion should return Scala2.11
Assert.assertEquals(TransportableUDFMap.getScalaVersion(), TransportableUDFMap.ScalaVersion.SCALA_2_11);
}

@Test
public void testLookupWithSparkSession() {
SparkSession ss = SparkSession.builder().appName(TransportableUDFMapTest.class.getSimpleName()).master("local[1]")
.enableHiveSupport().getOrCreate();
String stdDaliUdf = "com.linkedin.dali.udf.date.hive.EpochToDateFormat";
Assert.assertTrue(TransportableUDFMap.lookup(stdDaliUdf).get().getArtifactoryUrls().get(0).toString()
.contains("classifier=spark_2.12"));
ss.close();
}

@Test
public void testDefaultLookup() {
// If SparkSession is not active, lookup should return Scala2.11
String stdDaliUdf = "com.linkedin.dali.udf.date.hive.EpochToDateFormat";
Assert.assertTrue(TransportableUDFMap.lookup(stdDaliUdf).get().getArtifactoryUrls().get(0).toString()
.contains("classifier=spark_2.11"));
}

@Test
public void testLookupDoesNotExist() {
String stdClassName = "com.linkedin.dali.udf.date.hive.UdfDoesNotExist";
Assert.assertFalse(TransportableUDFMap.lookup(stdClassName).isPresent());
}

@Test
public void testLookupExistButNotReadyForSpark3() {
SparkSession ss = SparkSession.builder().appName(TransportableUDFMapTest.class.getSimpleName()).master("local[1]")
.enableHiveSupport().getOrCreate();
try {
String stdClassName = "com.transport.test.hive.Spark3Fail";
TransportableUDFMap.add(stdClassName, "com.transport.test.spark.Spark3Fail",
"com.transport:spark-udf:0.0.0?classifier=spark", null);
TransportableUDFMap.lookup(stdClassName);
} catch (Exception ex) {
Assert.assertTrue(ex instanceof UnsupportedUDFException);
} finally {
ss.close();
}
}
}
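Note: the test class above presumably belongs to the new spark3test source set (compiled against deps.'spark3'.'hive'), which is why it expects the spark_2.12 classifier when a session is active; the near-identical class below is its sparktest (Spark 2 / Scala 2.11) counterpart. The file paths are not shown in this view.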
@@ -0,0 +1,64 @@
/**
* Copyright 2018-2021 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.coral.spark;

import org.apache.spark.sql.SparkSession;
import org.testng.Assert;
import org.testng.annotations.Test;


public class TransportableUDFMapTest {

@Test
public void testScalaVersionWithSparkSession() {
SparkSession ss = SparkSession.builder().appName(TransportableUDFMapTest.class.getSimpleName()).master("local[1]")
.enableHiveSupport().getOrCreate();
Assert.assertEquals(TransportableUDFMap.getScalaVersion(), TransportableUDFMap.ScalaVersion.SCALA_2_11);
ss.close();
}

@Test
public void testDefaultScalaVersion() {
// If SparkSession is not active, getScalaVersion should return Scala2.11
Assert.assertEquals(TransportableUDFMap.getScalaVersion(), TransportableUDFMap.ScalaVersion.SCALA_2_11);
}

@Test
public void testLookupWithSparkSession() {
SparkSession ss = SparkSession.builder().appName(TransportableUDFMapTest.class.getSimpleName()).master("local[1]")
.enableHiveSupport().getOrCreate();
String stdDaliUdf = "com.linkedin.dali.udf.date.hive.EpochToDateFormat";
Assert.assertTrue(TransportableUDFMap.lookup(stdDaliUdf).get().getArtifactoryUrls().get(0).toString()
.contains("classifier=spark_2.11"));
ss.close();
}

@Test
public void testDefaultLookup() {
// If SparkSession is not active, lookup should return Scala2.11
String stdDaliUdf = "com.linkedin.dali.udf.date.hive.EpochToDateFormat";
Assert.assertTrue(TransportableUDFMap.lookup(stdDaliUdf).get().getArtifactoryUrls().get(0).toString()
.contains("classifier=spark_2.11"));
}

@Test
public void testLookupDoesNotExist() {
String stdClassName = "com.linkedin.dali.udf.date.hive.UdfDoesNotExist";
Assert.assertFalse(TransportableUDFMap.lookup(stdClassName).isPresent());
}

@Test
public void testLookupExistButNotReadyForSpark3() {
SparkSession ss = SparkSession.builder().appName(TransportableUDFMapTest.class.getSimpleName()).master("local[1]")
.enableHiveSupport().getOrCreate();
String stdClassName = "com.transport.test.hive.Spark3Fail";
TransportableUDFMap.add(stdClassName, "com.transport.test.spark.Spark3Fail",
"com.transport:spark-udf:0.0.0?classifier=spark_2.11", null);
assert TransportableUDFMap.lookup(stdClassName).get().getArtifactoryUrls().get(0).toString()
.contains("classifier=spark_2.11");
ss.close();
}
}
@@ -49,7 +49,7 @@ public void beforeClass() throws HiveException, MetaException {
UnsupportedHiveUDFsInSpark.add("com.linkedin.coral.hive.hive2rel.CoralTestUnsupportedUDF");

TransportableUDFMap.add("com.linkedin.coral.hive.hive2rel.CoralTestUDF", "com.linkedin.coral.spark.CoralTestUDF",
"ivy://com.linkedin.coral.spark.CoralTestUDF");
"ivy://com.linkedin.coral.spark.CoralTestUDF", null);
}

@Test