You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Problem:
When I use a protobuf Scala case class with repeated fields as an input parameter to a UDF function, I get the following error:
'Project [unresolvedalias(FramelessUdf([email protected]), Some(org.apache.spark.sql.Column$$Lambda$1470/0x0000000801614ee0@7f53a31f))]
+- LocalRelation [id#0, names#1, count#2]
org.apache.spark.sql.AnalysisException: unresolved operator 'Project [unresolvedalias(FramelessUdf([email protected]), Some(org.apache.spark.sql.Column$$Lambda$1470/0x0000000801614ee0@7f53a31f))];
'Project [unresolvedalias(FramelessUdf([email protected]), Some(org.apache.spark.sql.Column$$Lambda$1470/0x0000000801614ee0@7f53a31f))]
+- LocalRelation [id#0, names#1, count#2]
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis(CheckAnalysis.scala:57)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis$(CheckAnalysis.scala:56)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:188)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$48(CheckAnalysis.scala:620)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$48$adapted(CheckAnalysis.scala:618)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:367)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:618)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:97)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:188)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:214)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:211)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:91)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
at org.apache.spark.sql.Dataset.withPlan(Dataset.scala:3887)
at org.apache.spark.sql.Dataset.select(Dataset.scala:1519)
at com.github.scalapb.RecordStreamingQueryTest.$anonfun$new$2(RecordStreamingQueryTest.scala:33)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.featurespec.AnyFeatureSpecLike$$anon$1.apply(AnyFeatureSpecLike.scala:313)
at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
at org.scalatest.featurespec.AnyFeatureSpec.withFixture(AnyFeatureSpec.scala:1827)
at org.scalatest.featurespec.AnyFeatureSpecLike.invokeWithFixture$1(AnyFeatureSpecLike.scala:311)
at org.scalatest.featurespec.AnyFeatureSpecLike.$anonfun$runTest$1(AnyFeatureSpecLike.scala:323)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
at org.scalatest.featurespec.AnyFeatureSpecLike.runTest(AnyFeatureSpecLike.scala:323)
at org.scalatest.featurespec.AnyFeatureSpecLike.runTest$(AnyFeatureSpecLike.scala:305)
at org.scalatest.featurespec.AnyFeatureSpec.runTest(AnyFeatureSpec.scala:1827)
at org.scalatest.featurespec.AnyFeatureSpecLike.$anonfun$runTests$1(AnyFeatureSpecLike.scala:382)
at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
at scala.collection.immutable.List.foreach(List.scala:333)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:390)
at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:427)
at scala.collection.immutable.List.foreach(List.scala:333)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
at org.scalatest.featurespec.AnyFeatureSpecLike.runTests(AnyFeatureSpecLike.scala:382)
at org.scalatest.featurespec.AnyFeatureSpecLike.runTests$(AnyFeatureSpecLike.scala:381)
at org.scalatest.featurespec.AnyFeatureSpec.runTests(AnyFeatureSpec.scala:1827)
at org.scalatest.Suite.run(Suite.scala:1114)
at org.scalatest.Suite.run$(Suite.scala:1096)
at org.scalatest.featurespec.AnyFeatureSpec.org$scalatest$featurespec$AnyFeatureSpecLike$$super$run(AnyFeatureSpec.scala:1827)
at org.scalatest.featurespec.AnyFeatureSpecLike.$anonfun$run$1(AnyFeatureSpecLike.scala:423)
at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
at org.scalatest.featurespec.AnyFeatureSpecLike.run(AnyFeatureSpecLike.scala:423)
at org.scalatest.featurespec.AnyFeatureSpecLike.run$(AnyFeatureSpecLike.scala:422)
at com.github.scalapb.RecordStreamingQueryTest.org$scalatest$BeforeAndAfterAll$$super$run(RecordStreamingQueryTest.scala:13)
at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
at com.github.scalapb.RecordStreamingQueryTest.run(RecordStreamingQueryTest.scala:13)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:47)
at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1321)
at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1315)
at scala.collection.immutable.List.foreach(List.scala:333)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1315)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:992)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:970)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1481)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:970)
at org.scalatest.tools.Runner$.run(Runner.scala:798)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:43)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:26)
Test scenario:
record.proto
syntax = "proto3";
option java_package = "com.github.scalapb";
message Record {
int32 id = 1;
repeated string names = 2;
}
Test class:
val rowData = java.util.List.of(Row(1, Seq("name1"), 1))
val schema = StructType(
Array(
StructField("id", DataTypes.IntegerType, nullable = true),
StructField("names", DataTypes.createArrayType(StringType), nullable = true),
StructField("count", DataTypes.IntegerType, nullable = true)
)
)
val df = spark.createDataFrame(rowData, schema)
import scalapb.spark.Implicits._
val my_udf = ProtoSQL.udf { record: Record =>
//serialization to protobuf byte array with schema registry
}
df.select(my_udf(struct(col("id"), col("names")).as[Record]))
.show(false)
The text was updated successfully, but these errors were encountered:
Env:
Problem:
When I use a protobuf Scala case class with repeated fields as an input parameter to a UDF function, I get the following error:
Test scenario:
record.proto
TestClass:
The text was updated successfully, but these errors were encountered: