Skip to content

Commit

Permalink
tests pass
Browse files Browse the repository at this point in the history
h

mockup

added new bms
  • Loading branch information
GideonPotok committed Jun 5, 2024
1 parent 5d171d6 commit 03e0f36
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 275 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,16 @@ class OpenHashMap[K : ClassTag, @specialized(Long, Int, Double) V: ClassTag](

def this() = this(64)

protected var _keySet = new OpenHashSet[K](initialCapacity)
protected var _keySet = new OpenHashSet[K](initialCapacity, 0.7)

/*
specialCase match {
case -1 => None
case _ => Some(o =>
CollationFactory.fetchCollation(specialCase)
.hashFunction.applyAsLong(o.asInstanceOf[UTF8String])
.toInt)
}) */

// Init in constructor (instead of in declaration) to work around a Scala compiler specialization
// bug that would generate two arrays (one for Object and one for specialized T).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import scala.reflect._
import com.google.common.hash.Hashing

import org.apache.spark.annotation.Private
import org.apache.spark.sql.catalyst.util.CollationFactory
import org.apache.spark.unsafe.types.UTF8String

/**
* A simple, fast hash set optimized for non-null insertion-only use case, where keys are never
Expand All @@ -43,7 +45,17 @@ import org.apache.spark.annotation.Private
@Private
class OpenHashSet[@specialized(Long, Int, Double, Float) T: ClassTag](
initialCapacity: Int,
loadFactor: Double)
loadFactor: Double,
var specialPassedInHasher: Option[Object => Int] = Some(o => {
val i = CollationFactory.fetchCollation(1)
.hashFunction.applyAsLong(o.asInstanceOf[UTF8String])
.toInt
// scalastyle:off println
println(s"Hashing: $o -> $i")
// scalastyle:on println
i
})
)
extends Serializable {

require(initialCapacity <= OpenHashSet.MAX_CAPACITY,
Expand All @@ -67,7 +79,10 @@ class OpenHashSet[@specialized(Long, Int, Double, Float) T: ClassTag](
case ClassTag.Int => new IntHasher().asInstanceOf[Hasher[T]]
case ClassTag.Double => new DoubleHasher().asInstanceOf[Hasher[T]]
case ClassTag.Float => new FloatHasher().asInstanceOf[Hasher[T]]
case _ => new Hasher[T]
case _ =>
specialPassedInHasher.map(f =>
new CustomHasher(f.asInstanceOf[Any => Int]).asInstanceOf[Hasher[T]]).getOrElse(
new Hasher[T])
}

protected var _capacity = nextPowerOf2(initialCapacity)
Expand Down Expand Up @@ -118,8 +133,15 @@ class OpenHashSet[@specialized(Long, Int, Double, Float) T: ClassTag](
* See: https://issues.apache.org/jira/browse/SPARK-45599
*/
@annotation.nowarn("cat=other-non-cooperative-equals")
private def keyExistsAtPos(k: T, pos: Int) =
_data(pos) equals k
private def keyExistsAtPos(k: T, pos: Int) = {
classTag[T] match {
case ClassTag.Long => _data(pos) equals k
case ClassTag.Int => _data(pos) equals k
case ClassTag.Double => _data(pos) equals k
case ClassTag.Float => _data(pos) equals k
case _ => _data(pos).asInstanceOf[UTF8String].semanticEquals(k.asInstanceOf[UTF8String], 1)
}
}

/**
* Add an element to the set. This one differs from add in that it doesn't trigger rehashing.
Expand Down Expand Up @@ -291,9 +313,6 @@ object OpenHashSet {
* A set of specialized hash function implementation to avoid boxing hash code computation
* in the specialized implementation of OpenHashSet.
*/
sealed class Hasher[@specialized(Long, Int, Double, Float) T] extends Serializable {
def hash(o: T): Int = o.hashCode()
}

class LongHasher extends Hasher[Long] {
override def hash(o: Long): Int = (o ^ (o >>> 32)).toInt
Expand All @@ -314,6 +333,16 @@ object OpenHashSet {
override def hash(o: Float): Int = java.lang.Float.floatToIntBits(o)
}

class Hasher[@specialized(Long, Int, Double, Float) T] extends Serializable {
def hash(o: T): Int = o.hashCode()
}

class CustomHasher(f: Any => Int) extends Hasher[Any] {
override def hash(o: Any): Int = {
f(o)
}
}

private def grow1(newSize: Int): Unit = {}
private def move1(oldPos: Int, newPos: Int): Unit = { }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@
package org.apache.spark.sql.catalyst.expressions.aggregate

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{ExpressionBuilder, TypeCheckResult, UnresolvedWithinGroup}
import org.apache.spark.sql.catalyst.analysis.{ExpressionBuilder, UnresolvedWithinGroup}
import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, Expression, ExpressionDescription, ImplicitCastInputTypes, SortOrder}
import org.apache.spark.sql.catalyst.trees.UnaryLike
import org.apache.spark.sql.catalyst.types.PhysicalDataType
import org.apache.spark.sql.catalyst.util.{CollationFactory, GenericArrayData, UnsafeRowUtils}
import org.apache.spark.sql.catalyst.util.{GenericArrayData}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, ArrayType, BooleanType, DataType, StringType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, ArrayType, BooleanType, DataType}
import org.apache.spark.util.collection.OpenHashMap

case class Mode(
Expand All @@ -49,21 +48,6 @@ case class Mode(

override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType)

override def checkInputDataTypes(): TypeCheckResult = {
val defaultCheck = super.checkInputDataTypes()
if (defaultCheck.isFailure) {
defaultCheck
} else if (UnsafeRowUtils.isBinaryStable(child.dataType) ||
child.dataType.isInstanceOf[StringType]) {
TypeCheckResult.TypeCheckSuccess
} else {
TypeCheckResult.TypeCheckFailure(
"The input to the function 'mode' was a complex type with non-binary collated fields," +
" which are currently not supported by 'mode'."
)
}
}

override def prettyName: String = "mode"

override def update(
Expand All @@ -90,25 +74,15 @@ case class Mode(
if (buffer.isEmpty) {
return null
}
val collationAwareBuffer = child.dataType match {
case c: StringType if
!CollationFactory.fetchCollation(c.collationId).supportsBinaryEquality =>
val collationId = c.collationId
val modeMap = buffer.toSeq.groupMapReduce {
case (k, _) => CollationFactory.getCollationKey(k.asInstanceOf[UTF8String], collationId)
}(x => x)((x, y) => (x._1, x._2 + y._2)).values
modeMap
case _ => buffer
}
reverseOpt.map { reverse =>
val defaultKeyOrdering = if (reverse) {
PhysicalDataType.ordering(child.dataType).asInstanceOf[Ordering[AnyRef]].reverse
} else {
PhysicalDataType.ordering(child.dataType).asInstanceOf[Ordering[AnyRef]]
}
val ordering = Ordering.Tuple2(Ordering.Long, defaultKeyOrdering)
collationAwareBuffer.maxBy { case (key, count) => (count, key) }(ordering)
}.getOrElse(collationAwareBuffer.maxBy(_._2))._1
buffer.maxBy { case (key, count) => (count, key) }(ordering)
}.getOrElse(buffer.maxBy(_._2))._1
}

override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): Mode =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,11 @@ abstract class TypedAggregateWithHashMapAsBuffer
override def createAggregationBuffer(): OpenHashMap[AnyRef, Long] = {
// Initialize new counts map instance here.
new OpenHashMap[AnyRef, Long]()
/* 64, child.dataType match {
case StringType if child.dataType.asInstanceOf[StringType].isUTF8BinaryLcaseCollation => 1
case StringType => 0
case _ => -1
}) */
}

protected def child: Expression
Expand Down Expand Up @@ -681,6 +686,11 @@ abstract class TypedAggregateWithHashMapAsBuffer
val ins = new DataInputStream(bis)
try {
val counts = new OpenHashMap[AnyRef, Long]
/* (64, child.dataType match {
case StringType if child.dataType.asInstanceOf[StringType].isUTF8BinaryLcaseCollation => 1
case StringType => 0
case _ => -1
}) */
// Read unsafeRow size and content in bytes.
var sizeOfNextRow = ins.readInt()
while (sizeOfNextRow >= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.util

import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

object UnsafeRowUtils {

Expand Down
72 changes: 44 additions & 28 deletions sql/core/benchmarks/CollationBenchmark-jdk21-results.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,62 +2,78 @@ OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
AMD EPYC 7763 64-Core Processor
collation unit benchmarks - equalsFunction: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
--------------------------------------------------------------------------------------------------------------------------
UTF8_BINARY_LCASE 2896 2898 3 0.0 28958.7 1.0X
UNICODE 2038 2040 3 0.0 20377.5 1.4X
UTF8_BINARY 2053 2054 1 0.0 20534.9 1.4X
UNICODE_CI 16779 16802 34 0.0 167785.2 0.2X
UTF8_BINARY_LCASE 2889 2899 14 0.0 28891.4 1.0X
UNICODE 2018 2020 3 0.0 20175.4 1.4X
UTF8_BINARY 2017 2019 2 0.0 20173.7 1.4X
UNICODE_CI 17402 17403 3 0.0 174016.8 0.2X

OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
AMD EPYC 7763 64-Core Processor
collation unit benchmarks - compareFunction: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------
UTF8_BINARY_LCASE 4705 4705 0 0.0 47048.0 1.0X
UNICODE 18863 18867 6 0.0 188625.3 0.2X
UTF8_BINARY 4894 4901 11 0.0 48936.8 1.0X
UNICODE_CI 19595 19598 4 0.0 195953.0 0.2X
UTF8_BINARY_LCASE 2937 2966 42 0.0 29366.7 1.0X
UNICODE 16791 16796 7 0.0 167906.4 0.2X
UTF8_BINARY 3123 3125 3 0.0 31227.3 0.9X
UNICODE_CI 17878 17880 3 0.0 178777.4 0.2X

OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
AMD EPYC 7763 64-Core Processor
collation unit benchmarks - hashFunction: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
UTF8_BINARY_LCASE 5011 5013 2 0.0 50113.1 1.0X
UNICODE 68309 68319 13 0.0 683094.7 0.1X
UTF8_BINARY 3887 3887 1 0.0 38869.8 1.3X
UNICODE_CI 56675 56686 15 0.0 566750.3 0.1X
UTF8_BINARY_LCASE 4809 4824 21 0.0 48088.3 1.0X
UNICODE 65472 65489 24 0.0 654719.7 0.1X
UTF8_BINARY 3804 3806 3 0.0 38043.0 1.3X
UNICODE_CI 52962 53004 59 0.0 529620.9 0.1X

OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
AMD EPYC 7763 64-Core Processor
collation unit benchmarks - contains: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
UTF8_BINARY_LCASE 10534 10534 1 0.0 105336.8 1.0X
UNICODE 5835 5836 2 0.0 58348.9 1.8X
UTF8_BINARY 6451 6453 3 0.0 64506.4 1.6X
UNICODE_CI 313827 314029 285 0.0 3138270.1 0.0X
UTF8_BINARY_LCASE 116774 116794 28 0.0 1167739.5 1.0X
UNICODE 51045 51116 100 0.0 510448.0 2.3X
UTF8_BINARY 8184 8186 2 0.0 81841.3 14.3X
UNICODE_CI 452447 452538 129 0.0 4524465.6 0.3X

OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
AMD EPYC 7763 64-Core Processor
collation unit benchmarks - startsWith: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
UTF8_BINARY_LCASE 10164 10165 2 0.0 101635.6 1.0X
UNICODE 5683 5684 1 0.0 56828.5 1.8X
UTF8_BINARY 6280 6281 2 0.0 62802.3 1.6X
UNICODE_CI 307901 317477 13542 0.0 3079007.4 0.0X
UTF8_BINARY_LCASE 60647 60692 63 0.0 606473.3 1.0X
UNICODE 53281 53281 1 0.0 532809.5 1.1X
UTF8_BINARY 7855 7861 8 0.0 78554.6 7.7X
UNICODE_CI 457434 458464 1456 0.0 4574338.8 0.1X

OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
AMD EPYC 7763 64-Core Processor
collation unit benchmarks - endsWith: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
UTF8_BINARY_LCASE 10360 10361 1 0.0 103596.7 1.0X
UNICODE 5667 5668 0 0.0 56674.0 1.8X
UTF8_BINARY 6307 6309 3 0.0 63069.2 1.6X
UNICODE_CI 311942 312293 496 0.0 3119419.4 0.0X
UTF8_BINARY_LCASE 57293 57312 27 0.0 572926.5 1.0X
UNICODE 52931 52955 34 0.0 529311.5 1.1X
UTF8_BINARY 7990 7992 3 0.0 79899.9 7.2X
UNICODE_CI 454790 459591 6790 0.0 4547899.8 0.1X

OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
AMD EPYC 7763 64-Core Processor
collation unit benchmarks - mode - 30105 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------------
UTF8_BINARY_LCASE - mode - 30105 elements 4 4 0 80.4 12.4 1.0X
UNICODE - mode - 30105 elements 0 0 0 1277.7 0.8 15.9X
UTF8_BINARY - mode - 30105 elements 0 0 0 1282.2 0.8 15.9X
UNICODE_CI - mode - 30105 elements 9 9 0 32.5 30.7 0.4X
UTF8_BINARY_LCASE - mode - 30105 elements 43 44 1 7.0 143.1 1.0X
UNICODE - mode - 30105 elements 3 3 0 112.9 8.9 16.1X
UTF8_BINARY - mode - 30105 elements 3 3 0 113.9 8.8 16.3X
UNICODE_CI - mode - 30105 elements 102 103 1 3.0 338.2 0.4X

OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
AMD EPYC 7763 64-Core Processor
collation unit benchmarks - mode [struct] - 30105 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------------------------
UNICODE - mode struct - 30105 elements 3 3 0 113.9 8.8 1.0X
UTF8_BINARY - mode struct - 30105 elements 3 3 0 113.5 8.8 1.0X

OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
AMD EPYC 7763 64-Core Processor
collation e2e benchmarks - mode - 10000 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------------
mode df column with collation - UTF8_BINARY_LCASE 58 68 7 0.2 5761.2 1.0X
mode df column with collation - UNICODE 45 51 7 0.2 4482.6 1.3X
mode df column with collation - UTF8_BINARY 43 46 5 0.2 4253.4 1.4X
mode df column with collation - UNICODE_CI 41 46 5 0.2 4085.2 1.4X

Loading

0 comments on commit 03e0f36

Please sign in to comment.