
Spark 3.4+ / -Connect support #195

Open
Jolanrensen opened this issue Apr 15, 2023 · 61 comments · May be fixed by #218
Labels
enhancement New feature or request

Comments

@Jolanrensen
Collaborator

https://spark.apache.org/news/spark-3-4-0-released.html

@Jolanrensen
Collaborator Author

Jolanrensen commented Apr 18, 2023

Seems like ScalaReflection.scala has now deviated too far from KotlinReflection.scala, requiring a major overhaul to keep it functioning. Maybe it's time to try a new approach, such as #178, which would hugely improve maintainability as well as fix most of the compatibility issues we face.

This will require time and investment I'm not sure I have with my work on DataFrame (especially with the low number of downloads this library currently has).
Let me know if you are still an avid user and would like me to invest time into rebuilding the hacky base this library is built upon!

@Jolanrensen Jolanrensen added the enhancement New feature or request label Apr 18, 2023
@hawkaa

hawkaa commented May 12, 2023

Hi. I've just started looking at Spark 3.4 and the first issue we ran into was that we're missing this library. For sure it would be a big win if we could support it. 🙏

@zaleslaw

zaleslaw commented May 12, 2023

Please upvote the top comment if you need this! Or comment here like @hawkaa did.

@shane-atg

I am very interested in keeping this alive as well.

@NatElkins

What would be the next step towards moving forward with this?

@Jolanrensen
Collaborator Author

What would be the next step towards moving forward with this?

The next step would be to investigate a new way to encode Kotlin data classes (both at the top level of DataFrames and inside columns) and to keep inferring types for encoders without using KotlinReflection.scala, such that it's compatible with all versions of Scala and Spark 3.X by default. That way we can keep the API relatively the same and improve maintainability, compatibility, and stability.
I'm not sure which mechanisms of Spark we can leverage for this; I was thinking of maybe using UDTs and a compiler plugin/annotation processor to generate the UDT classes... but that won't work for top-level tables.
Unfortunately, I'm too occupied with Kotlin DataFrame at the moment, but if someone could provide a proof of concept I'm sure I can provide some help :).

@BierDav

BierDav commented Sep 14, 2023

So if I understood that correctly, we will be able to create spark Dataset from a kotlinx DataFrame? That's exactly what I wanted to do, because working with spark Datasets is not that smooth.

Btw. is there currently a workaround for this?

@Jolanrensen
Collaborator Author

Jolanrensen commented Sep 14, 2023

So if I understood that correctly, we will be able to create spark Dataset from a kotlinx DataFrame? That's exactly what I wanted to do, because working with spark Datasets is not that smooth.

Btw. is there currently a workaround for this?

No, that's currently not on the roadmap. They're two separate projects, although we are exploring interop with other databases in DataFrame (Kotlin/dataframe#408), including Spark.

If you want to convert from Kotlin DataFrame to Spark DataSets, that's actually quite simple:

@DataSchema
data class Name(
    val firstName: String,
    val lastName: String,
)

@DataSchema
data class Person(
    val name: Name,
    val age: Int,
    val city: String?,
    val weight: Int?,
    val isHappy: Boolean,
)

// Kotlin DataFrame
val df: DataFrame<Person> = listOf(
    Person(Name("Alice", "Cooper"), 15, "London", 54, true),
    Person(Name("Bob", "Dylan"), 45, "Dubai", 87, true),
    Person(Name("Charlie", "Daniels"), 20, "Moscow", null, false),
    Person(Name("Charlie", "Chaplin"), 40, "Milan", null, true),
    Person(Name("Bob", "Marley"), 30, "Tokyo", 68, true),
    Person(Name("Alice", "Wolf"), 20, null, 55, false),
    Person(Name("Charlie", "Byrd"), 30, "Moscow", 90, true),
).toDataFrame()

withSpark {
    // Spark Dataset
    val sparkDs: Dataset<Person> = df.toList().toDS()
}

Note that df.toList()/df.toListOf<>() only works if the return type is a data class, which is also what's needed for Spark.

If you want to be able to convert any Kotlin DataFrame to a Spark Dataset<Row>, we'll need to convert the schema as well:

/**
 * Converts the DataFrame to a Spark Dataset of Rows using the provided SparkSession and JavaSparkContext.
 *
 * @param spark The SparkSession object to use for creating the DataFrame.
 * @param sc The JavaSparkContext object to use for converting the DataFrame to RDD.
 * @return A Dataset of Rows representing the converted DataFrame.
 */
fun DataFrame<*>.toSpark(spark: SparkSession, sc: JavaSparkContext): Dataset<Row> {
    val rows = sc.toRDD(rows().map(DataRow<*>::toSpark))
    return spark.createDataFrame(rows, schema().toSpark())
}

/**
 * Converts a DataRow to a Spark Row object.
 *
 * @return The converted Spark Row.
 */
fun DataRow<*>.toSpark(): Row =
    RowFactory.create(
        *values().map {
            when (it) {
                is DataRow<*> -> it.toSpark()
                else -> it
            }
        }.toTypedArray()
    )

/**
 * Converts a DataFrameSchema to a Spark StructType.
 *
 * @return The converted Spark StructType.
 */
fun DataFrameSchema.toSpark(): StructType =
    DataTypes.createStructType(
        columns.map { (name, schema) ->
            DataTypes.createStructField(name, schema.toSpark(), schema.nullable)
        }
    )

/**
 * Converts a ColumnSchema object to Spark DataType.
 *
 * @return The Spark DataType corresponding to the given ColumnSchema object.
 * @throws IllegalArgumentException if the column type or kind is unknown.
 */
fun ColumnSchema.toSpark(): DataType =
    when (this) {
        is ColumnSchema.Value -> type.toSpark() ?: error("unknown data type: $type")
        is ColumnSchema.Group -> schema.toSpark()
        is ColumnSchema.Frame -> error("nested dataframes are not supported")
        else -> error("unknown column kind: $this")
    }

/**
 * Returns the corresponding Spark DataType for a given Kotlin type.
 *
 * @return The Spark DataType that corresponds to the Kotlin type, or null if no matching DataType is found.
 */
fun KType.toSpark(): DataType? = when(this) {
    typeOf<Byte>(), typeOf<Byte?>() -> DataTypes.ByteType
    typeOf<Short>(), typeOf<Short?>() -> DataTypes.ShortType
    typeOf<Int>(), typeOf<Int?>() -> DataTypes.IntegerType
    typeOf<Long>(), typeOf<Long?>() -> DataTypes.LongType
    typeOf<Boolean>(), typeOf<Boolean?>() -> DataTypes.BooleanType
    typeOf<Float>(), typeOf<Float?>() -> DataTypes.FloatType
    typeOf<Double>(), typeOf<Double?>() -> DataTypes.DoubleType
    typeOf<String>(), typeOf<String?>() -> DataTypes.StringType
    typeOf<LocalDate>(), typeOf<LocalDate?>() -> DataTypes.DateType
    typeOf<Date>(), typeOf<Date?>() -> DataTypes.DateType
    typeOf<Timestamp>(), typeOf<Timestamp?>() -> DataTypes.TimestampType
    typeOf<Instant>(), typeOf<Instant?>() -> DataTypes.TimestampType
    typeOf<ByteArray>(), typeOf<ByteArray?>() -> DataTypes.BinaryType
    typeOf<Decimal>(), typeOf<Decimal?>() -> DecimalType.SYSTEM_DEFAULT()
    typeOf<BigDecimal>(), typeOf<BigDecimal?>() -> DecimalType.SYSTEM_DEFAULT()
    typeOf<BigInteger>(), typeOf<BigInteger?>() -> DecimalType.SYSTEM_DEFAULT()
    typeOf<CalendarInterval>(), typeOf<CalendarInterval?>() -> DataTypes.CalendarIntervalType
    else -> null
}

withSpark {
    // Spark Dataset
    val sparkDs: Dataset<Row> = df.toSpark(spark, sc)
}

Edit: for conversion the other way around, check the Wiki: https://github.com/Kotlin/kotlin-spark-api/wiki/Kotlin-DataFrame-interoperability

@Jolanrensen Jolanrensen changed the title Spark 3.4 support Spark 3.4+ support Sep 15, 2023
@NatElkins

@Jolanrensen Can you explain a little bit more about what ScalaReflection.scala and KotlinReflection.scala do, what they're for, and why the latter is a blocker to Spark 3.4 support? And what some of the considerations about using UDTs as a replacement might be?

Thank you!

@Jolanrensen
Collaborator Author

@Jolanrensen Can you explain a little bit more about what ScalaReflection.scala and KotlinReflection.scala do, what they're for, and why the latter is a blocker to Spark 3.4 support? And what some of the considerations about using UDTs as a replacement might be?

Thank you!

Sure! But I gotta give a warning. I'm not the original author of the patch, just the maintainer of the rest of the library, so this will be my best understanding of what's going on.

One of the biggest features of the Kotlin Spark API is automatically recognizing and encoding types for Datasets. Without this, encoders would need to be given explicitly, as is the case for Spark's Java API. This is the difference between:

spark.createDataset(listOf(1, 2, 3), Encoders.INT())
// and
listOf(1, 2, 3, 4, 5).toDS()

or even

data class Person1 @JvmOverloads constructor(
    var name: String? = null,
    var age: Int? = null,
) : Serializable

spark.createDataset(listOf(Person1("A", 1)), Encoders.bean(Person1::class.java))

// and
data class Person2(
    val name: String,
    val age: Int,
)
listOf(Person2("A", 1)).toDS()

To do this, we need to automatically generate an encoder based on the typing information provided by the reified type parameter in toDS<>() and the encoder<>() function.

If you follow the generateEncoder function in the same file you can see we'll attempt to create an Encoder from the given KType. This can be either a predefined encoder (from ENCODERS) or some data class, Map, iterable, Array, UDT or anything we can generate an Encoder for on the fly.
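
Roughly, and heavily simplified, the entry points look something like this (a sketch of the idea only, not the library's actual signatures; imports from org.apache.spark.sql and kotlin.reflect are omitted like in the snippets above):

inline fun <reified T : Any> encoder(): Encoder<T> = generateEncoder(typeOf<T>())

@Suppress("UNCHECKED_CAST")
fun <T> generateEncoder(type: KType): Encoder<T> = when (type.classifier) {
    // a few of the predefined encoders
    Int::class -> Encoders.INT()
    Long::class -> Encoders.LONG()
    String::class -> Encoders.STRING()
    // data classes, tuples, maps, iterables, arrays, UDTs, etc. get an encoder built on the fly
    else -> TODO("build an encoder for $type")
} as Encoder<T>

// which is what ultimately powers:
inline fun <reified T : Any> List<T>.toDS(spark: SparkSession): Dataset<T> =
    spark.createDataset(this, encoder<T>())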

An (Expression)Encoder needs two things: an Expression to serialize an object and one to deserialize it. Functions to create these serializers and deserializers exist in ScalaReflection.scala, but they of course can only handle types supported by Spark itself. We want to be able to create (de)serializers for Kotlin data classes, plus Tuples inside data classes, arrays inside tuples inside data classes inside maps, etc. We need the logic in this file but somehow inject extra functionality. At least, that's what @asm0dey likely thought when making it :). The downside is that we keep bumping into Spark-internal functions if we want to call it from the Kotlin side (not even getting started on the incompatibilities between Kotlin and Scala). So, a new module was created using the same org.apache.spark.sql package (to be able to call Spark-internal functions) and the code from ScalaReflection.scala was copied to KotlinReflection.scala, modifying its behavior to support Kotlin data classes.

To help with the (de)serializing of Kotlin-specific stuff, a schema/predefinedDt argument was added such that the schema: DataType of a certain type can be generated in Kotlin instead of Scala.

Well, this worked, but, having a large piece of copied internal code in your codebase is bound to cause some issues over time. And so it did...

After each major release of Spark it was a large hassle to keep compatibility between KotlinReflection.scala and the rest of Spark, especially since internal calls can change between minor releases and break on a bytecode level. This is why we have so many releases (one for each Scala 2.12/2.13 and minor Spark combo). Plus, if Spark adds a new feature in 3.Z, well, they can just update their ScalaReflection.scala file. We, on the other hand, need to support 3.X, 3.Y, and 3.Z with just one codebase (which we currently do with a preprocessor, but it's not a walk in the park).

Spark 3.4 was the straw that broke the camel's back. ScalaReflection.scala changed file location and a lot changed since the last version. It was a good wake-up call showing that this wasn't the way forward. A simple preprocessor cannot ensure compatibility between these versions anymore, and who knows what else will break in Spark 3.5, or with Scala 3 even.

We need a new way to encode Kotlin Data Classes while maintaining the current flexibility but without relying on internal Spark code. Spark version bumps (even major ones) need to be doable with minor preprocessor changes. (One version of the API for ALL spark versions is unrealistic, but one for, say 3.0, 3.1, etc. will probably be fine)

There are probably several ways to do this:

  • UDT: Make data classes automatically a user-defined type, either with an @Annotation or using a Kotlin (2.0) compiler plugin. Downside: UDTs (afaik) only allow user-defined objects to be serialized inside others, such as Tuples, not as a top-level table-like object. This could be possible, but I don't know how. Second, I don't know how (de)serializing of other known JVM types (such as tuples, Seqs, arrays, etc.) inside data classes would work.
  • Java Bean: Make a compiler plugin that will convert all data classes to something like Person1 at compile time. Top-level should now work, but again I'm not sure about nested types.
  • Other clever Kotlinx reflection + Spark magic: Somehow be able to create an Encoder for any data class using reflection without the use of Spark internals or a compiler plugin. This would be the holy grail, but again, no clue how to do that.
  • Maybe even another way. I'm no Spark expert by any means. Maybe this gitbook could provide some inspiration.

Hopefully, this has given you or anyone interested enough inspiration and explanation to give it a try :) If someone can provide a proof-of-concept, I'd be happy to explore it further.

@asm0dey
Contributor

asm0dey commented Sep 19, 2023 via email

@NatElkins

Thank you for the comprehensive answer! I will ponder this for a bit.

I have a strong interest in using Kotlin and Spark together for a work project. It doesn't seem like there is a hard blocker per se (I can always just use the Java API), just that some of the nice-to-haves of this library may not be available unless I contribute a fix.

@asm0dey
Contributor

asm0dey commented Nov 27, 2023

@Jolanrensen we should probably take a look at the connect API: https://spark.apache.org/docs/latest/spark-connect-overview.html

@Jolanrensen
Collaborator Author

Jolanrensen commented Nov 29, 2023

@Jolanrensen we should probably take a look at the connect API: https://spark.apache.org/docs/latest/spark-connect-overview.html

Allowing the Spark driver and code to use different versions from the application code might indeed solve a lot of problems for us!
However, we'd still need a completely new way to encode data classes.

@asm0dey
Contributor

asm0dey commented Nov 29, 2023

I can imagine that we'll generate POJOs from the used data classes at compile time. I know you are doing something similar in DataFrame, but I don't have any idea how to implement it :)
And POJOs are natively supported by Spark.

@Jolanrensen
Collaborator Author

@asm0dey A compiler plugin could do that :)

@gregfriis

Hi, do you have any estimate on when (or if) 3.4+ will be supported?

@Jolanrensen
Collaborator Author

@gregfriis I'm sorry, no, we currently don't have the resources to figure that out. What could help is if someone from the community could provide a proof of concept solution. That way we could better weigh the time/cost to properly support it.

@Jolanrensen
Collaborator Author

Jolanrensen commented Mar 11, 2024

Small weekend/hobby update regarding the issue:

I tried Spark Connect but locally on my machine I couldn't get it to work reliably yet. Plus it requires running Spark locally with some special script, so for now, that's not ideal.

I did experiment with the potential compiler plugin route and I do have a hunch that it might be possible :). It does require some special configuration and modules, but it should be doable.

The basic angle is: Converting annotated Kotlin data classes to something Spark sees as a Scala case class. This can automatically provide us with all supported (and even nested) types without having to traverse the classes ourselves.

In theory, this is not that difficult, but it comes with a few gotchas:

  • Scala case classes implement scala.Product, so we need to do that with our data classes too
  • Spark grabs names and type information from the primary constructor, so our primary constructor may only contain Scala collections (only things that are supported by ScalaReflection.scala). Plus, for the argument names to be readable by Scala's reflection, we need the -parameters compiler argument.
  • For Spark to be able to read the values, we need a function in the class without arguments and with the same name for each of the properties of the data class.

To recreate my experiment, we need:

Scala module with type retrieval

Stolen from KotlinReflection.scala, this piece of code can create a Scala Type from a Java class, something we need to make an encoder using ScalaReflection.encoderFor() later on:

object TypeRetrieval {
  val universe: scala.reflect.runtime.universe.type = scala.reflect.runtime.universe

  import universe._

  def getType[T](clazz: Class[T]): universe.Type = {
    clazz match {
      // not sure about this line, without it, no spark dependencies are needed
      case _ if clazz == classOf[Array[Byte]] => localTypeOf[Array[Byte]]
      case _ => {
        val mir = runtimeMirror(clazz.getClassLoader)
        mir.classSymbol(clazz).toType
      }
    }
  }
}

Kotlin module depending on the Scala module

This little helper function can create a Spark ExpressionEncoder for any type or class. This would be similar to calling Encoders.product[T] or ExpressionEncoder[T] from Scala.

inline fun <reified T : Any> encoderFor(): Encoder<T> = encoderFor(T::class.java)
fun <T> encoderFor(klass: Class<T>): Encoder<T> =
    ExpressionEncoder.apply(
        ScalaReflection.encoderFor(
            TypeRetrieval.getType(klass),
            false
        ) as AgnosticEncoder<T>
    )

Next, to limit the amount of generated code, we can create a little bridge from data classes to Product, implementing some stuff with reflection. This may be omitted if it seems to cause too much overhead, as it can also be generated by a compiler plugin right in the data class.

abstract class DataClassProduct(private val klass: KClass<*>) : Product {

    override fun productPrefix(): String = klass.simpleName!!

    private val params
        get() = klass
            .primaryConstructor!!
            .parameters
            .withIndex()
            .associate { (i, it) -> i to it }

    override fun canEqual(that: Any?): Boolean = that!!::class == klass
    override fun productElement(n: Int): Any = params[n]?.let { param ->
        klass.declaredMemberProperties
            .firstOrNull { it.name == param.name }
            ?.call(this)
    } ?: throw IndexOutOfBoundsException(n.toString())

    override fun productArity(): Int = params.size

    override fun productElementNames(): Iterator<String> = CollectionConverters.asScala(
        iterator {
            for (param in params.values) {
                yield(param.name!!)
            }
        }
    )
}

Writing actual Kotlin data classes

NOTE: we need the -parameters tag for compiling it, so in build.gradle.kts:

kotlin {
    jvmToolchain(8)
    compilerOptions {
        javaParameters = true
    }
}

Let's say we now want to encode a simple data class. We could just write

@SomeSparkAnnotation
data class Person(
    val name: String,
    val age: Int,
    val hobbies: List<String>,
    val address: Pair<String, Int>,
)

What the compiler plugin would produce

and then the compiler plugin could convert this to:

data class Person(
    val name: String,
    val age: Int,
    val hobbies: scala.collection.immutable.List<String>, // converting java.util.List to scala.collection.immutable.List
    val address: scala.Tuple2<String, Int>, // converting kotlin.Pair to scala.Tuple2
): DataClassProduct(Person::class) {
    // accessors for Spark
    fun name() = this.name
    fun age() = this.age
    fun hobbies() = this.hobbies
    fun address() = this.address

    companion object {
        // so we can still create it normally from kotlin
        operator fun invoke(
            name: String,
            age: Int,
            hobbies: List<String>,
            address: Pair<String, Int>,
        ): Person =
            Person(
                name = name,
                age = age,
                hobbies = CollectionConverters.asScala(hobbies).toList(),
                address = address.toTuple() // from scalaTuplesInKotlin
            )
    }
}

Running this with Spark will work correctly:

val test = Person(
    name = "Jolan",
    age = 22,
    hobbies = listOf("Programming", "Gaming"),
    address = "Kerkstraat" to 1
)
spark.createDataset(listOf(test), encoderFor<Person>()).show()
// +-----+---+--------------------+---------------+
// | name|age|             hobbies|        address|
// +-----+---+--------------------+---------------+
// |Jolan| 22|[Programming, Gam...|{Kerkstraat, 1}|
// +-----+---+--------------------+---------------+

But, we might need something more advanced, because calling test.hobbies will now result in a Scala list again :/

It does have some promise though, as nested data classes like these even work :), plus we don't need to hack into the Spark source code anymore.

Where to go from here

We could try to generate a Person_: DataClassProduct that we can auto-convert to and from, by also autogenerating and registering a UDT for each data class. This UDT would then use the encoder of Person_ for all the typing information and handle the Scala-Kotlin type conversion. This needs some further experimenting.

(I do have some issues calling the deserializer on an encoded row with the ExpressionEncoder, both in Scala and Kotlin, strange...)
(Edit: fixed by creating (de)serializer like:

private val encoder = encoderFor(Person_::class.java)
private val serializer = encoder.createSerializer()
private val deserializer = encoder.resolveAndBind(DataTypeUtils.toAttributes(encoder.schema()) as Seq<Attribute>, `SimpleAnalyzer$`.`MODULE$`).createDeserializer()

)

And of course, actually build a compiler plugin. This is tricky and requires Kotlin 2.0.

@asm0dey
Contributor

asm0dey commented Mar 11, 2024

Hey, this is just awesome research, thank you!

I think it makes more sense to compile to Java POJOs rather than to case classes. POJOs are natively supported by Spark (via JavaTypeInference) and should be significantly easier to generate. Moreover, it might be easy to generate synthetic (like Person_) POJOs and convert them to data classes automatically.

@Jolanrensen
Collaborator Author

@asm0dey you mean using Encoders.bean()?
That can indeed be done relatively easily, also generated, but this limits us in other ways again: nullability/default arguments are needed, and, for instance, nested tuples are not supported anymore:

data class AddressJava @JvmOverloads constructor(
    var street: String = "",
    var city: String = "",
) : Serializable

data class PersonJava @JvmOverloads constructor(
    var name: String = "",
    var age: Int = -1,
    var tupleTest: Tuple2<AddressJava, Int>? = null,
    var listTest: List<AddressJava> = emptyList(),
) : Serializable
...
val data = listOf(
    PersonJava("Jolan", 25, Tuple2(AddressJava("Street", "City"), 5), listOf(AddressJava("Street", "City"))),
)

val encoder = Encoders.bean(PersonJava::class.java)
val df = spark.createDataset(data, encoder)
df.show()
// +---+----------------+-----+---------+
// |age|        listTest| name|tupleTest|
// +---+----------------+-----+---------+
// | 25|[{City, Street}]|Jolan|       {}|
// +---+----------------+-----+---------+

And calling df.head() breaks everything:

Exception in thread "main" org.apache.spark.SparkRuntimeException: Error while decoding: org.apache.spark.SparkRuntimeException: Couldn't find a valid constructor on class scala.Tuple2.

This is something that is possible in the current version of the Kotlin Spark API, so it seems a bit harsh to break that.

We could extend JavaTypeInference, but we already tried to make our own KotlinReflection.encoderFor(class) before...

@asm0dey
Contributor

asm0dey commented Mar 11, 2024

Huh, right, I forgot about the tuples support. Is it possible and necessary to support default argument values? It seems the backend should not care at all, because at runtime they are already known.

@Jolanrensen
Collaborator Author

Jolanrensen commented Mar 11, 2024

Java bean support requires an empty constructor + getters/setters, so yeah :/. That's what @JvmOverloads achieves.

Actually, we can do it with lateinit vars.
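
For illustration, the lateinit idea would look roughly like this (a hypothetical, untested bean): lateinit vars give us a no-arg constructor plus getters/setters without inventing default values, although primitives still need one since they cannot be lateinit:

class PersonBean : java.io.Serializable {
    lateinit var name: String // reference types need no default value
    var age: Int = -1         // primitives cannot be lateinit, so a default is still required
}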

@asm0dey
Contributor

asm0dey commented Mar 11, 2024

The no-arg plugin already adds a no-arg constructor to data classes without default arguments, see here: https://kotlinlang.org/docs/no-arg-plugin.html

You can even force it to work on your custom annotation. Doesn't solve the nested tuples feature though
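
For reference, wiring the no-arg plugin to a custom annotation looks roughly like this in build.gradle.kts (the annotation FQN and version are placeholders, not something that exists in the library):

plugins {
    kotlin("plugin.noarg") version "1.9.23" // version is illustrative
}

noArg {
    annotation("com.example.SomeSparkAnnotation") // hypothetical marker annotation
}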

@Jolanrensen
Collaborator Author

Jolanrensen commented Mar 12, 2024

2 updates:

First: We can successfully define a UDT for a class, (en/de)coding it using the ExpressionEncoder of a different class. No generics are supported afaik though, so we cannot make a generic UDT for java.util.List that parses it using an IterableEncoder. (Probably explains why that hasn't been done before.)

The downside of this approach is: no real column creation:

root
 |-- value: persondataclassudt (nullable = true)

Second: I tweaked the data class in such a way that it can work perfectly, both as a case class from Spark/Scala, as well as a data class from Kotlin :)

Given:

@SomeAnnotation
data class Address(
    val streetAndNumber: Pair<String, Int>,
    val city: String,
)

We generate:

class Address( // main constructor with scala names/types
    streetAndNumber: Tuple2<String, Int>,
    city: String,
) : Product {
    // secondary in companion object with kotlin names/types
    // Mainly, so Spark doesn't see this constructor 
    companion object {
        operator fun invoke(
            streetAndNumber: Pair<String, Int>,
            city: String,
        ): Address = Address(
            streetAndNumber = streetAndNumber.toTuple(),
            city = city,
        )
    }

    private val scalaStreetAndNumber = streetAndNumber
    private val scalaCity = city

    // For Scala
    fun streetAndNumber() = scalaStreetAndNumber
    fun city() = scalaCity

    // For Kotlin
    val streetAndNumber = scalaStreetAndNumber.toPair()
    val city = scalaCity

    // Product functions
    override fun canEqual(that: Any?): Boolean = that is Address
    override fun productElement(n: Int): Any = when (n) {
        0 -> scalaStreetAndNumber
        1 -> scalaCity
        else -> throw IndexOutOfBoundsException(n.toString())
    }
    override fun productArity(): Int = 2

    // data class functions
    override fun equals(other: Any?): Boolean {
        if (this === other) return true
        if (other !is Address) return false

        if (streetAndNumber != other.streetAndNumber) return false
        if (city != other.city) return false

        return true
    }
    override fun hashCode(): Int {
        var result = streetAndNumber.hashCode()
        result = 31 * result + city.hashCode()
        return result
    }
    fun copy(
        streetAndNumber: Pair<String, Int> = this.streetAndNumber,
        city: String = this.city,
    ): Address = Address(
        streetAndNumber = streetAndNumber,
        city = city,
    )
    override fun toString(): String = "Address(streetAndNumber=$streetAndNumber, city='$city')"
}

Even when nested, the ExpressionEncoder is perfectly happy with this class. It can serialize and deserialize it, and nested types like this, as well as other Scala case classes (like tuples), work too.
Plus, from Kotlin we can do anything with it like it's still the original data class.

The plugin does need to create these converters:

  • Pair <-> Tuple2 (unless the user actually wants a Tuple2)
  • Triple <-> Tuple3 (same)
  • Java Array <-> Scala array (probably)
  • java.util.List <-> scala.collection.Seq
  • java.util.Set <-> scala.collection.Set
  • java.util.Map <-> scala.collection.Map
  • nullability <-> scala.Option (maybe?)

The rest should be good to go
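
As a sketch, the first two converters in the list above boil down to something like this (the scala-tuples-in-kotlin module already ships similar toTuple()/toPair() helpers; scala.Tuple2/Tuple3 imports omitted):

fun <A, B> Pair<A, B>.toTuple(): Tuple2<A, B> = Tuple2(first, second)
fun <A, B> Tuple2<A, B>.toPair(): Pair<A, B> = Pair(_1(), _2())

fun <A, B, C> Triple<A, B, C>.toTuple(): Tuple3<A, B, C> = Tuple3(first, second, third)
fun <A, B, C> Tuple3<A, B, C>.toTriple(): Triple<A, B, C> = Triple(_1(), _2(), _3())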

Edit: Okay, nullability detection is not really there:

encoderFor<Address>().schema().printTreeString()
// root
// |-- streetAndNumber: struct (nullable = true)
// |    |-- _1: string (nullable = true)
// |    |-- _2: integer (nullable = true)
// |-- city: string (nullable = true)

I'm not sure how to convey these to Scala correctly. In JavaTypeInference, they check for @Nonnull annotations, but in ScalaReflection they don't..

@asm0dey
Contributor

asm0dey commented Mar 12, 2024

Wow, man, this sounds stunning! How?

@Jolanrensen
Collaborator Author

Wow, man, this sounds stunning! How?

Well if it walks like a case class and quacks like a case class XD. But jokes aside, this still requires building a Kotlin 2.0 compiler plugin to rewrite the user-written data classes on the fly, which is no easy feat, but definitely possible.

@asm0dey
Contributor

asm0dey commented Mar 13, 2024

IIRC the reason why I created it in that package was that Spark could only read it from that package :) I would never do it without a serious reason. Also, I'm not sure if you can do it in Kotlin; it probably operates with Scala-specific reflection, but here I might be wrong.

If it's possible to implement it in Kotlin and outside Spark's packages, we won't be affected by changes in Spark anymore, which will bring the maintenance down to a bearable amount.

@Jolanrensen
Collaborator Author

I'm trying the KotlinTypeInference route rn. Seems that most things work fine with Kotlin reflection since, like here, the actual encoders are somewhere else currently. This means we can add both Scala encoders and Java encoders, as well as a special product-like encoder for data classes. I'm not sure how backwards compatible that is, but it seems like the cleanest custom solution to retrieving encoders for Kotlin.

We cannot go fully custom with our encoders, as only the ones used here are compatible, but hey, we can try

@asm0dey
Contributor

asm0dey commented Mar 13, 2024

It was never our goal to go fully custom; we should support a sensible subset of possibilities. Looking at it with a fresh eye, I'm starting to think that we actually don't need all these Scala intricacies: we only need to support the Kotlin stuff and be smart about delegating everything else.

The most complex parts of the support, the encoders for data classes and recursive generics, are already written and should probably be reused.

@Jolanrensen
Collaborator Author

Update time :)

I've been trying to make my own mixed version of JavaTypeInference and ScalaReflection in pure Kotlin with success!

(screenshot)

This will allow us to create a complete replacement of the old encoding system with data classes supported out of the box :). (I did need 1 dirty hack to get the names of the columns right for data classes, but I'll get to that later haha)

One downside of this approach is that it's definitely Spark 3.4+. This is because they switched their encoding system for Spark connect to this. So, instead of defining a (de)serializer for each type, like before, they define intermediate "AgnosticEncoder" objects which later will get (de)serializers generated when building an ExpressionEncoder. This meant I had to use ProductEncoder for Kotlin data classes since they are the only Encoder that works by reading the values and instantiating a new instance of any class by calling the constructor with all arguments at once.

I'll try to make a new branch in this repo with this approach and see if we can replace the encoding with the new system. This should allow us to drop the :core module entirely and be more spark-version-independent (in theory).

@asm0dey
Contributor

asm0dey commented Mar 17, 2024 via email

@asm0dey
Contributor

asm0dey commented Mar 17, 2024

AFAIR it should be totally possible to write our own implementation of data class instantiation. I'll try to look at it a bit later

@Jolanrensen
Collaborator Author

@asm0dey I thought so too, but if we use AgnosticEncoders (which provides Spark connect compatibility as far as I know), we can only use the encoders given to us.
If we want to define our own serializers and deserializers (which should still be possible) we'll have to define them for all types again, like before.

@Jolanrensen Jolanrensen linked a pull request Mar 17, 2024 that will close this issue
@Jolanrensen
Collaborator Author

You can check out #218 to see my progress :).
I inserted my new encoder system in the project and updated references across the board.
I disabled :core for now, but we might still need it for the vararg UDFs. I remember the Kotlin type system sorta broke down when it had to unwrap those by itself... so yeah. But it should be okay to depend on some Scala library (as long as we name it "utils" or something), as long as it doesn't contain Spark.

All other Spark helpers should be doable in pure Kotlin (as I did with debug() and debugCodegen()).

The encoders are still incomplete, so there are still many failing tests, but we can start experimenting with what does and what does not work on that branch.

@Jolanrensen
Collaborator Author

Jolanrensen commented Mar 17, 2024

One of the main problems I currently have is with the encoding of data class names using the ProductEncoder. This is because it calls the EncoderField.name() function both for the name of the column and for the name of the function from which to retrieve the value; readMethodName is completely ignored.

One way to tackle this is by marking your data classes like:

data class User(@get:JvmName("name") val name: String, @get:JvmName("age") val age: Int)

which sets the JVM function names of getName() and getAge() to name() and age() respectively.

However, this doesn't work for data classes we don't have access to, like Pair and Triple.

My name hack tries to override the name() function, such that only the first time it's called it returns "getAge" and the rest of the time it returns "age". This works for simple data sets (like you see in the picture), but it breaks down when the name() function is called multiple times earlier on. This for instance happens when you try to encode a List<DataClass>.

So yeah... ideas are welcome. You can turn it off by setting KotlinTypeInference.DO_NAME_HACK = false

@asm0dey Maybe a little compiler plugin after all? :) At least for manual user-created data classes

@Jolanrensen
Collaborator Author

Jolanrensen commented Mar 19, 2024

Update: I managed to create a little compiler plugin that, in the IR stage, adds @JvmName annotations to the getters of @Sparkify-annotated data classes :D. Just need to create a Gradle plugin around it so I can publish it (locally) and check if it works in combination with Spark.
Since it just touches IR it should be compatible both with K1 and K2 :).
Also, I did test that I can call the function using Java reflection. That works.

Come to think of it 🤔 I should check whether the @get:JvmName annotation is present already and skip it if so.

Plus, we might be able to add something like @ColumnName("name") (which will just be converted to @get:JvmName("name")) for custom column names :) this annotation would have to be taken into account in the encoding but that's okay.

Finally, I need to take a look at @Transient, which we supported before. We could handle it when serializing from a data class, but we cannot instantiate a new data class if we don't have all constructor parameters... (Maybe @Transient requires a default argument to be present (can be warned about in FIR) and @JvmOverloads does the rest? Need to test.)
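
To make that idea concrete, this is roughly what I mean (an untested sketch; whether the encoder can actually be taught to skip the property is exactly what needs testing):

@Sparkify
data class User @JvmOverloads constructor(
    val name: String,
    val age: Int,
    @Transient val cached: String = "$name ($age)", // default required so a constructor overload without it exists
)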

@Jolanrensen
Collaborator Author


Yesss! It works in combination with Spark.

@Jolanrensen
Collaborator Author

And with @ColumnName :)

@Jolanrensen
Collaborator Author

Most tests are now fixed at:
#218
The PR contains the compiler+gradle plugin which can process @Sparkify and @ColumnName annotations. To build the project, both compiler-plugin and gradle-plugin need to be published to mavenLocal.

There's just one place that now fails and that is when returning a @Sparkify data class from a UDF. It's because of this Spark function which does an instance check for the value instead of using the proper deserialize method :/. So, to fix this, we need to make our data classes a scala.Product after all. I'll see what I can do in IR.

@Jolanrensen
Collaborator Author

Jolanrensen commented Mar 24, 2024


They're a hassle to make, but compiler plugins are awesome :) The compiler plugin now converts:

@Sparkify
data class User(
    val name: String,
    @ColumnName("test") val age: Int,
)

in IR successfully to:

@Sparkify
data class User(
    @get:JvmName("name") val name: String,
    @get:JvmName("test") @ColumnName("test") val age: Int,
): scala.Product {
  override fun canEqual(that: Any?): Boolean = that is User
  override fun productArity(): Int = 2
  override fun productElement(n: Int): Any? =
    if (n == 0) this.name
    else if (n == 1) this.age
    else throw IndexOutOfBoundsException()
}

This is Scala-version independent since it just queries the classpath for scala.Product and scala.Equals. So unless Scala changes a lot, this should be future-proof.

@Jolanrensen Jolanrensen added this to the 2.0.0 milestone May 14, 2024
@mdsadiqueinam

mdsadiqueinam commented Jun 3, 2024

Sorry to bother you, but is there any expected release date for this version?

@Jolanrensen
Collaborator Author

@mdsadique-inam Sorry for my absence, work called and KotlinConf and DataFrame had priority.

There are still some (large) issues with notebook support (Spark and java serialization breaks everything), so until I can solve that I won't be able to release it.
What I could do, and I plan to do that hopefully soon, is to make a pre-release of this branch #218 so you can try it for yourself and I can gather feedback :).
Unfortunately that also requires some guides/docs etc. and Kotlin DataFrame still has priority to our team, meaning my work on Spark is usually after work/in the weekends so I don't dare to put a date on it.

@mdsadiqueinam

@mdsadique-inam Sorry for my absence, work called and KotlinConf and DataFrame had priority.

There are still some (large) issues with notebook support (Spark and java serialization breaks everything), so until I can solve that I won't be able to release it.
What I could do, and I plan to do that hopefully soon, is to make a pre-release of this branch #218 so you can try it for yourself and I can gather feedback :).
Unfortunately that also requires some guides/docs etc. and Kotlin DataFrame still has priority to our team, meaning my work on Spark is usually after work/in the weekends so I don't dare to put a date on it.

I understand your situation, and therefore I am also willing to contribute here, but I don't know where to start; I am looking into it.

@Jolanrensen
Collaborator Author

@mdsadique-inam If you're familiar with how Spark serialization works, this is what I'm currently struggling with:
https://gist.github.com/Jolanrensen/7ebcdbd0dc8daf252aa5e14e12d29409
Even without the kotlin-spark-api, running any lambda function in a notebook breaks Spark. I've tried %dumpClassesForSpark (they are stored in System.getProperty("spark.repl.class.outputDir")), with or without spark.sql.codegen.wholeStage, using anonymous objects instead of lambdas, @JvmSerializableLambda, but none seem to work. Ideas are welcome :)

@Jolanrensen
Collaborator Author

Also, did anyone get spark-connect to work, like at all? I'm trying to run it with a sample project with scala 2.12 (2.13 breaks) and spark 3.5.1 (or 3.5.0), on java 8, 11, any combination, but I keep getting NoSuchMethodErrors. Even though I literally follow https://spark.apache.org/docs/latest/spark-connect-overview.html#use-spark-connect-in-standalone-applications.
I want to know if I can build something spark-connect safe, but it looks like spark-connect atm is not stable enough to try :/

@Jolanrensen
Collaborator Author

Jolanrensen commented Jun 15, 2024

Alright! I finally got a working spark-connect connection using the following setup:

Spark-connect server:

  • Spark 3.5.1
  • Scala 2.12
  • Java 1.8
  • not sure if needed, but in spark-defaults.conf:
    • spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true"
    • spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true"

Client:

  • Spark-connect 3.5.1
    • sql api as compileOnly
    • spark-connect client as implementation
  • Scala 2.12/2.13 (both work)
  • JDK 17 (toolchain, gradle, project, sourceCompatibility, the whole shebang)
  • JVM argument "--add-opens=java.base/java.nio=ALL-UNNAMED"

I cannot add a dependency on kotlin-spark-api yet because of clashes between the normal sparkSql and this sqlApi, but let's see if I can make a "watered down" version of the API with the knowledge I have now :)

Watered down version: https://github.com/Kotlin/kotlin-spark-api/tree/3.4%2B-spark-connect
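
For anyone following along, the client setup above corresponds to roughly this in build.gradle.kts (using the Scala 2.13 variants; coordinates and versions are assumed from the description above, not copied from the branch):

dependencies {
    compileOnly("org.apache.spark:spark-sql-api_2.13:3.5.1")
    implementation("org.apache.spark:spark-connect-client-jvm_2.13:3.5.1")
}

tasks.withType<JavaExec> {
    jvmArgs("--add-opens=java.base/java.nio=ALL-UNNAMED")
}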

@Jolanrensen
Collaborator Author

Jolanrensen commented Jun 18, 2024

Okay, that setup breaks down when you actually try to submit classes in a jar with spark.addArtifact() because the client runs a newer java version... back to the drawing board!

Now with an actual dependency on the watered-down Spark API:

Spark-connect server:

  • Spark 3.5.1
  • Scala 2.13/2.12 (must match the client I'm afraid)
  • Java 17 (newest possible for Spark)
  • spark-defaults.conf trick for netty

Spark client:

  • Kotlin 2.0 works
  • Spark-connect 3.5.1
    • sql api as compileOnly
    • spark-connect client as implementation
  • Scala 2.13/2.12 (must match the server) (library as compileOnly on all modules)
  • JDK (1.)8 (sourceCompatibility, targetCompatibility, jvmTarget)
  • Toolchain can be java 17, as long as you use "--add-opens=java.base/java.nio=ALL-UNNAMED"
  • use shadow and runShadow to run the project with spark.addArtifact("path to jar")

working branch: https://github.com/Kotlin/kotlin-spark-api/tree/71f115a9fa0ebec4b44e5bc3857e0fc7bacc190b

Encoding seems to work, but UDTs don't :(

@Jolanrensen Jolanrensen changed the title Spark 3.4+ support Spark 3.4+ / -Connect support Jun 20, 2024
@Jolanrensen
Collaborator Author

Jolanrensen commented Jun 20, 2024

I managed to get spark-connect to work in notebooks too!

https://github.com/Kotlin/kotlin-spark-api/tree/3.4%2B-spark-connect


I modified the integration with jupyter to, at the beginning of each cell, add a line that sends all created .class files to a class cache accessible by spark-connect :).

Now I need to decide how to go forward:

  • I either drop normal spark completely and make the kotlin-spark-api just use spark-connect. This is good for compatibility, but bad for getting started quickly and a lot of things are not supported (yet), like RDDs, UDTs (so I cannot support kotlinx.datetime etc.), UDAFs...
  • Or I support both spark-connect and normal spark. This is gonna be a fun java-preprocessor hell again resulting in sparkVersions * scalaVersions * useSparkConnect * modules packages... Plus normal spark in notebooks I couldn't get to work at all anymore, so that's fun.

@asm0dey Any wise words? :)

@leourbina

I'm happy to lend a hand, would love to see this come to fruition.

@leourbina

leourbina commented Jul 25, 2024

@Jolanrensen Let me know if there's a better communication channel, but happy to chat directly on how I could contribute.

@Jolanrensen
Collaborator Author

@leourbina I'm always reachable at https://kotlinlang.slack.com/archives/C015B9ZRGJF for anything :) That might be a good place for ideas, discussions etc.. Better there than to flood this issue even more haha
