
Commit

Rebase and address comments
ljfgem committed Feb 9, 2023
1 parent ad98949 commit 2c388fe
Showing 8 changed files with 125 additions and 94 deletions.
@@ -350,14 +350,17 @@ private SqlNode generateRightChildForSqlJoinWithLateralViews(BiRel e, Result rig
}

/**
* Override this method to handle the conversion for RelNode `f(x).y.z` where `f` is an operator, which
* returns a struct containing field `y`, `y` is also a struct containing field `z`.
* Override this method to handle the conversion for {@link RexFieldAccess} `f(x).y.z`, where `f` is an operator
* that returns a struct containing field `y`, and `y` is itself a struct containing field `z`.
*
* Calcite will convert this RelNode to a SqlIdentifier directly (check
* Calcite will convert this {@link RexFieldAccess} to a {@link SqlIdentifier} directly (check
* {@link org.apache.calcite.rel.rel2sql.SqlImplementor.Context#toSql(RexProgram, RexNode)}),
* which is not aligned with our expectation since we want to apply transformations on `f(x)` with
* {@link com.linkedin.coral.common.transformers.SqlCallTransformer}. Therefore, we override this
* method to convert `f(x)` to SqlCall, `.` to {@link com.linkedin.coral.common.functions.FunctionFieldReferenceOperator#DOT}
* method to convert `f(x)` to a {@link SqlCall} and `.` to {@link com.linkedin.coral.common.functions.FunctionFieldReferenceOperator#DOT},
* so `f(x).y.z` will be converted to `(f(x).y).z`.
*
* Check `CoralSparkTest#testConvertFieldAccessOnFunctionCall` for the unit test and an example.
*/
@Override
public Context aliasContext(Map<String, RelDataType> aliases, boolean qualified) {
@@ -373,7 +376,8 @@ public SqlNode toSql(RexProgram program, RexNode rex) {
accessNames.add(((RexFieldAccess) referencedExpr).getField().getName());
referencedExpr = ((RexFieldAccess) referencedExpr).getReferenceExpr();
}
if (referencedExpr.getKind() == SqlKind.OTHER_FUNCTION || referencedExpr.getKind() == SqlKind.CAST) {
final SqlKind sqlKind = referencedExpr.getKind();
if (sqlKind == SqlKind.OTHER_FUNCTION || sqlKind == SqlKind.CAST || sqlKind == SqlKind.ROW) {
SqlNode functionCall = toSql(program, referencedExpr);
Collections.reverse(accessNames);
for (String accessName : accessNames) {
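A minimal sketch of how the truncated loop above plausibly completes, based only on the javadoc earlier in this hunk (SqlParserPos.ZERO and the exact createCall form are assumptions, not lines taken from this commit):

SqlNode functionCall = toSql(program, referencedExpr); // f(x) rendered as a SqlCall
Collections.reverse(accessNames);                      // e.g. ["y", "z"] for f(x).y.z
for (String accessName : accessNames) {
  // wrap each access with FunctionFieldReferenceOperator.DOT, yielding (f(x).y).z
  functionCall = FunctionFieldReferenceOperator.DOT.createCall(SqlParserPos.ZERO,
      functionCall, new SqlIdentifier(accessName, SqlParserPos.ZERO));
}
return functionCall;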
@@ -9,147 +9,151 @@

import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.util.SqlShuttle;

import com.linkedin.coral.common.transformers.OperatorRenameSqlCallTransformer;
import com.linkedin.coral.common.transformers.SqlCallTransformers;
import com.linkedin.coral.spark.containers.SparkUDFInfo;
import com.linkedin.coral.spark.transformers.FallBackToHiveUDFTransformer;
import com.linkedin.coral.spark.transformers.TransportableUDFTransformer;
import com.linkedin.coral.spark.transformers.FallBackToLinkedInHiveUDFTransformer;
import com.linkedin.coral.spark.transformers.TransportUDFTransformer;

import static com.linkedin.coral.spark.transformers.TransportableUDFTransformer.*;
import static com.linkedin.coral.spark.transformers.TransportUDFTransformer.*;


/**
* This class extends {@link org.apache.calcite.sql.util.SqlShuttle} and initializes a {@link com.linkedin.coral.common.transformers.SqlCallTransformers}
* object, which contains a list of {@link com.linkedin.coral.common.transformers.SqlCallTransformer} instances that traverse the hierarchy of a {@link org.apache.calcite.sql.SqlCall}
* and convert functions from Coral operators to Spark operators where required.
*
* In this converter, we need to apply {@link TransportUDFTransformer} before {@link FallBackToLinkedInHiveUDFTransformer}
* because we should try to transform a UDF to an equivalent Transport UDF before falling back to the LinkedIn Hive UDF.
*/
public class CoralToSparkSqlCallConverter extends SqlShuttle {
private final SqlCallTransformers sqlCallTransformers;

public CoralToSparkSqlCallConverter(Set<SparkUDFInfo> sparkUDFInfos) {
this.sqlCallTransformers = SqlCallTransformers.of(
// Transportable UDFs
new TransportableUDFTransformer("com.linkedin.dali.udf.date.hive.DateFormatToEpoch",
// Transport UDFs
new TransportUDFTransformer("com.linkedin.dali.udf.date.hive.DateFormatToEpoch",
"com.linkedin.stdudfs.daliudfs.spark.DateFormatToEpoch", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),

new TransportableUDFTransformer("com.linkedin.dali.udf.date.hive.EpochToDateFormat",
new TransportUDFTransformer("com.linkedin.dali.udf.date.hive.EpochToDateFormat",
"com.linkedin.stdudfs.daliudfs.spark.EpochToDateFormat", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),

new TransportableUDFTransformer("com.linkedin.dali.udf.date.hive.EpochToEpochMilliseconds",
new TransportUDFTransformer("com.linkedin.dali.udf.date.hive.EpochToEpochMilliseconds",
"com.linkedin.stdudfs.daliudfs.spark.EpochToEpochMilliseconds", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),

new TransportableUDFTransformer("com.linkedin.dali.udf.isguestmemberid.hive.IsGuestMemberId",
new TransportUDFTransformer("com.linkedin.dali.udf.isguestmemberid.hive.IsGuestMemberId",
"com.linkedin.stdudfs.daliudfs.spark.IsGuestMemberId", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),

new TransportableUDFTransformer("com.linkedin.dali.udf.istestmemberid.hive.IsTestMemberId",
new TransportUDFTransformer("com.linkedin.dali.udf.istestmemberid.hive.IsTestMemberId",
"com.linkedin.stdudfs.daliudfs.spark.IsTestMemberId", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),

new TransportableUDFTransformer("com.linkedin.dali.udf.maplookup.hive.MapLookup",
new TransportUDFTransformer("com.linkedin.dali.udf.maplookup.hive.MapLookup",
"com.linkedin.stdudfs.daliudfs.spark.MapLookup", DALI_UDFS_IVY_URL_SPARK_2_11, DALI_UDFS_IVY_URL_SPARK_2_12,
sparkUDFInfos),

new TransportableUDFTransformer("com.linkedin.dali.udf.sanitize.hive.Sanitize",
new TransportUDFTransformer("com.linkedin.dali.udf.sanitize.hive.Sanitize",
"com.linkedin.stdudfs.daliudfs.spark.Sanitize", DALI_UDFS_IVY_URL_SPARK_2_11, DALI_UDFS_IVY_URL_SPARK_2_12,
sparkUDFInfos),

new TransportableUDFTransformer("com.linkedin.dali.udf.watbotcrawlerlookup.hive.WATBotCrawlerLookup",
new TransportUDFTransformer("com.linkedin.dali.udf.watbotcrawlerlookup.hive.WATBotCrawlerLookup",
"com.linkedin.stdudfs.daliudfs.spark.WatBotCrawlerLookup", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),

new TransportableUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.DateFormatToEpoch",
new TransportUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.DateFormatToEpoch",
"com.linkedin.stdudfs.daliudfs.spark.DateFormatToEpoch", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),

new TransportableUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.EpochToDateFormat",
new TransportUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.EpochToDateFormat",
"com.linkedin.stdudfs.daliudfs.spark.EpochToDateFormat", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),

new TransportableUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.EpochToEpochMilliseconds",
new TransportUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.EpochToEpochMilliseconds",
"com.linkedin.stdudfs.daliudfs.spark.EpochToEpochMilliseconds", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),

new TransportableUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.GetProfileSections",
new TransportUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.GetProfileSections",
"com.linkedin.stdudfs.daliudfs.spark.GetProfileSections", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),

new TransportableUDFTransformer("com.linkedin.stdudfs.stringudfs.hive.InitCap",
new TransportUDFTransformer("com.linkedin.stdudfs.stringudfs.hive.InitCap",
"com.linkedin.stdudfs.stringudfs.spark.InitCap",
"ivy://com.linkedin.standard-udfs-common-sql-udfs:standard-udfs-string-udfs:1.0.1?classifier=spark_2.11",
"ivy://com.linkedin.standard-udfs-common-sql-udfs:standard-udfs-string-udfs:1.0.1?classifier=spark_2.12",
sparkUDFInfos),

new TransportableUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.IsGuestMemberId",
new TransportUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.IsGuestMemberId",
"com.linkedin.stdudfs.daliudfs.spark.IsGuestMemberId", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),

new TransportableUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.IsTestMemberId",
new TransportUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.IsTestMemberId",
"com.linkedin.stdudfs.daliudfs.spark.IsTestMemberId", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),

new TransportableUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.MapLookup",
new TransportUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.MapLookup",
"com.linkedin.stdudfs.daliudfs.spark.MapLookup", DALI_UDFS_IVY_URL_SPARK_2_11, DALI_UDFS_IVY_URL_SPARK_2_12,
sparkUDFInfos),

new TransportableUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.PortalLookup",
new TransportUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.PortalLookup",
"com.linkedin.stdudfs.daliudfs.spark.PortalLookup", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),

new TransportableUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.Sanitize",
new TransportUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.Sanitize",
"com.linkedin.stdudfs.daliudfs.spark.Sanitize", DALI_UDFS_IVY_URL_SPARK_2_11, DALI_UDFS_IVY_URL_SPARK_2_12,
sparkUDFInfos),

new TransportableUDFTransformer("com.linkedin.stdudfs.userinterfacelookup.hive.UserInterfaceLookup",
new TransportUDFTransformer("com.linkedin.stdudfs.userinterfacelookup.hive.UserInterfaceLookup",
"com.linkedin.stdudfs.userinterfacelookup.spark.UserInterfaceLookup",
"ivy://com.linkedin.standard-udf-userinterfacelookup:userinterfacelookup-std-udf:0.0.27?classifier=spark_2.11",
"ivy://com.linkedin.standard-udf-userinterfacelookup:userinterfacelookup-std-udf:0.0.27?classifier=spark_2.12",
sparkUDFInfos),

new TransportableUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.WatBotCrawlerLookup",
new TransportUDFTransformer("com.linkedin.stdudfs.daliudfs.hive.WatBotCrawlerLookup",
"com.linkedin.stdudfs.daliudfs.spark.WatBotCrawlerLookup", DALI_UDFS_IVY_URL_SPARK_2_11,
DALI_UDFS_IVY_URL_SPARK_2_12, sparkUDFInfos),

new TransportableUDFTransformer("com.linkedin.jemslookup.udf.hive.JemsLookup",
new TransportUDFTransformer("com.linkedin.jemslookup.udf.hive.JemsLookup",
"com.linkedin.jemslookup.udf.spark.JemsLookup",
"ivy://com.linkedin.jobs-udf:jems-udfs:2.1.7?classifier=spark_2.11",
"ivy://com.linkedin.jobs-udf:jems-udfs:2.1.7?classifier=spark_2.12", sparkUDFInfos),

new TransportableUDFTransformer("com.linkedin.stdudfs.parsing.hive.UserAgentParser",
new TransportUDFTransformer("com.linkedin.stdudfs.parsing.hive.UserAgentParser",
"com.linkedin.stdudfs.parsing.spark.UserAgentParser",
"ivy://com.linkedin.standard-udfs-parsing:parsing-stdudfs:3.0.3?classifier=spark_2.11",
"ivy://com.linkedin.standard-udfs-parsing:parsing-stdudfs:3.0.3?classifier=spark_2.12", sparkUDFInfos),

new TransportableUDFTransformer("com.linkedin.stdudfs.parsing.hive.Ip2Str",
new TransportUDFTransformer("com.linkedin.stdudfs.parsing.hive.Ip2Str",
"com.linkedin.stdudfs.parsing.spark.Ip2Str",
"ivy://com.linkedin.standard-udfs-parsing:parsing-stdudfs:3.0.3?classifier=spark_2.11",
"ivy://com.linkedin.standard-udfs-parsing:parsing-stdudfs:3.0.3?classifier=spark_2.12", sparkUDFInfos),

new TransportableUDFTransformer("com.linkedin.stdudfs.lookup.hive.BrowserLookup",
new TransportUDFTransformer("com.linkedin.stdudfs.lookup.hive.BrowserLookup",
"com.linkedin.stdudfs.lookup.spark.BrowserLookup",
"ivy://com.linkedin.standard-udfs-parsing:parsing-stdudfs:3.0.3?classifier=spark_2.11",
"ivy://com.linkedin.standard-udfs-parsing:parsing-stdudfs:3.0.3?classifier=spark_2.12", sparkUDFInfos),

new TransportableUDFTransformer("com.linkedin.jobs.udf.hive.ConvertIndustryCode",
new TransportUDFTransformer("com.linkedin.jobs.udf.hive.ConvertIndustryCode",
"com.linkedin.jobs.udf.spark.ConvertIndustryCode",
"ivy://com.linkedin.jobs-udf:jobs-udfs:2.1.6?classifier=spark_2.11",
"ivy://com.linkedin.jobs-udf:jobs-udfs:2.1.6?classifier=spark_2.12", sparkUDFInfos),

// Transportable UDF for unit test
new TransportableUDFTransformer("com.linkedin.coral.hive.hive2rel.CoralTestUDF",
// Transport UDF for unit test
new TransportUDFTransformer("com.linkedin.coral.hive.hive2rel.CoralTestUDF",
"com.linkedin.coral.spark.CoralTestUDF",
"ivy://com.linkedin.coral.spark.CoralTestUDF?classifier=spark_2.11", null, sparkUDFInfos),

// Built-in operator
new OperatorRenameSqlCallTransformer("CARDINALITY", 1, "size"),
new OperatorRenameSqlCallTransformer(SqlStdOperatorTable.CARDINALITY, 1, "size"),

// Fall back to the original Hive UDF defined in StaticHiveFunctionRegistry after failing to apply transformers above
new FallBackToHiveUDFTransformer(sparkUDFInfos));
new FallBackToLinkedInHiveUDFTransformer(sparkUDFInfos));
}
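A minimal usage sketch for the converter above (variable names and surrounding wiring are assumptions, not part of this commit): the class is a SqlShuttle, so a Coral SqlNode tree is rewritten by accepting it once, and every UDF that needs Spark-side registration is collected into the passed-in set.

// coralSqlNode is assumed to be the Coral IR SqlNode produced for a view
Set<SparkUDFInfo> sparkUDFInfos = new HashSet<>();
SqlNode sparkSqlNode = coralSqlNode.accept(new CoralToSparkSqlCallConverter(sparkUDFInfos));
// sparkSqlNode now uses Spark operators (e.g. size instead of CARDINALITY), and
// sparkUDFInfos lists the Transport or Hive UDFs the Spark job must register first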

@Override
@@ -46,6 +46,8 @@ public class SparkSqlRewriter extends SqlShuttle {
* is translated to
* SELECT named_struct(.....)
*
* Check `CoralSparkTest#testAvoidCastToRow` for the unit test and a more complex example.
*
* Also replaces:
*
* CAST(NULL AS NULL)
@@ -70,15 +72,14 @@ && containsSqlRowTypeSpec((SqlDataTypeSpec) call.getOperandList().get(1))) {
private boolean containsSqlRowTypeSpec(SqlDataTypeSpec sqlDataTypeSpec) {
if (sqlDataTypeSpec instanceof SqlRowTypeSpec) {
return true;
}
if (sqlDataTypeSpec instanceof SqlArrayTypeSpec) {
} else if (sqlDataTypeSpec instanceof SqlArrayTypeSpec) {
return containsSqlRowTypeSpec(((SqlArrayTypeSpec) sqlDataTypeSpec).getElementTypeSpec());
}
if (sqlDataTypeSpec instanceof SqlMapTypeSpec) {
} else if (sqlDataTypeSpec instanceof SqlMapTypeSpec) {
return containsSqlRowTypeSpec(((SqlMapTypeSpec) sqlDataTypeSpec).getKeyTypeSpec())
|| containsSqlRowTypeSpec(((SqlMapTypeSpec) sqlDataTypeSpec).getValueTypeSpec());
} else {
return false;
}
return false;
}
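A hedged usage sketch for the rewriter above (variable names and the no-argument construction are assumptions, not part of this commit): SparkSqlRewriter is also a SqlShuttle, so the struct-cast and NULL-cast rewrites described in its javadoc are applied by accepting it on the SqlNode tree.

SqlNode rewritten = sparkSqlNode.accept(new SparkSqlRewriter());
// For example, a cast whose target type is or nests a ROW, such as
//   CAST(... AS ROW(id INTEGER, name VARCHAR)),
// is rewritten so that Spark sees a plain named_struct(...) call instead of a CAST;
// containsSqlRowTypeSpec above is what detects the nested ROW (also inside ARRAY or MAP)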

/**
@@ -23,14 +23,14 @@


/**
* After failing to transform UDF with {@link TransportableUDFTransformer},
* After failing to transform a UDF with {@link TransportUDFTransformer},
* we use this transformer to fall back to the original Hive UDF defined in
* {@link com.linkedin.coral.hive.hive2rel.functions.StaticHiveFunctionRegistry}.
* This is reasonable since Spark understands Hive UDFs and is able to run them.
* Check `CoralSparkTest#testFallBackToHiveUDFTransformer()` for an example.
* Check `CoralSparkTest#testFallBackToLinkedInHiveUDFTransformer()` for an example.
*/
public class FallBackToHiveUDFTransformer extends SqlCallTransformer {
private static final Logger LOG = LoggerFactory.getLogger(FallBackToHiveUDFTransformer.class);
public class FallBackToLinkedInHiveUDFTransformer extends SqlCallTransformer {
private static final Logger LOG = LoggerFactory.getLogger(FallBackToLinkedInHiveUDFTransformer.class);

/**
* Some LinkedIn UDFs get registered correctly in a SparkSession, and hence a DataFrame is successfully
@@ -46,33 +46,35 @@ public class FallBackToHiveUDFTransformer extends SqlCallTransformer {
"com.linkedin.coral.hive.hive2rel.CoralTestUnsupportedUDF");
private final Set<SparkUDFInfo> sparkUDFInfos;

public FallBackToHiveUDFTransformer(Set<SparkUDFInfo> sparkUDFInfos) {
public FallBackToLinkedInHiveUDFTransformer(Set<SparkUDFInfo> sparkUDFInfos) {
this.sparkUDFInfos = sparkUDFInfos;
}

@Override
protected boolean condition(SqlCall sqlCall) {
final String functionClassName = sqlCall.getOperator().getName();
if (UNSUPPORTED_HIVE_UDFS.contains(functionClassName)) {
throw new UnsupportedUDFException(functionClassName);
}
return functionClassName.contains(".") && !functionClassName.equals(".");
final SqlOperator operator = sqlCall.getOperator();
final String operatorName = operator.getName();
return operator instanceof VersionedSqlUserDefinedFunction && operatorName.contains(".")
&& !operatorName.equals(".");
}

@Override
protected SqlCall transform(SqlCall sqlCall) {
final VersionedSqlUserDefinedFunction operator = (VersionedSqlUserDefinedFunction) sqlCall.getOperator();
final String functionClassName = operator.getName();
final String expandedFunctionName = operator.getViewDependentFunctionName();
final String operatorName = operator.getName();
if (UNSUPPORTED_HIVE_UDFS.contains(operatorName)) {
throw new UnsupportedUDFException(operatorName);
}
final String viewDependentFunctionName = operator.getViewDependentFunctionName();
final List<String> dependencies = operator.getIvyDependencies();
List<URI> listOfUris = dependencies.stream().map(URI::create).collect(Collectors.toList());
LOG.info("Function: {} is not a Builtin UDF or Transportable UDF. We fall back to its Hive "
+ "function with ivy dependency: {}", functionClassName, String.join(",", dependencies));
LOG.info("Function: {} is not a Builtin UDF or Transport UDF. We fall back to its Hive "
+ "function with ivy dependency: {}", operatorName, String.join(",", dependencies));
final SparkUDFInfo sparkUDFInfo =
new SparkUDFInfo(functionClassName, expandedFunctionName, listOfUris, SparkUDFInfo.UDFTYPE.HIVE_CUSTOM_UDF);
new SparkUDFInfo(operatorName, viewDependentFunctionName, listOfUris, SparkUDFInfo.UDFTYPE.HIVE_CUSTOM_UDF);
sparkUDFInfos.add(sparkUDFInfo);
final SqlOperator convertedFunction =
createSqlOperatorOfFunction(expandedFunctionName, operator.getReturnTypeInference());
createSqlOperator(viewDependentFunctionName, operator.getReturnTypeInference());
return convertedFunction.createCall(sqlCall.getParserPosition(), sqlCall.getOperandList());
}
}
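A hedged sketch of how a Spark job might consume the fallback's output (the loop and the registration statement are illustrative assumptions, not part of this commit): each SparkUDFInfo of type HIVE_CUSTOM_UDF carries the Hive class name, the view-dependent function name, and the ivy coordinates that must be available before the rewritten query runs.

for (SparkUDFInfo udfInfo : sparkUDFInfos) {
  // Illustrative only: resolve the listed ivy dependencies onto the job's classpath,
  // then register the original Hive UDF under its view-dependent name, roughly
  //   CREATE TEMPORARY FUNCTION <view-dependent name> AS '<Hive class name>'
}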