Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mockup of collation aware hash function #2

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,16 @@ class OpenHashMap[K : ClassTag, @specialized(Long, Int, Double) V: ClassTag](

// Zero-argument auxiliary constructor: delegates to the primary constructor, passing 64
// (presumably the `initialCapacity` used below — confirm against the class header).
def this() = this(64)

// Backing key set, built from `initialCapacity` only.
protected var _keySet = new OpenHashSet[K](initialCapacity)
// Backing key set, built from `initialCapacity` with the second argument (OpenHashSet's
// `loadFactor`) passed explicitly as 0.7.
protected var _keySet = new OpenHashSet[K](initialCapacity, 0.7)

/*
specialCase match {
case -1 => None
case _ => Some(o =>
CollationFactory.fetchCollation(specialCase)
.hashFunction.applyAsLong(o.asInstanceOf[UTF8String])
.toInt)
}) */

// Init in constructor (instead of in declaration) to work around a Scala compiler specialization
// bug that would generate two arrays (one for Object and one for specialized T).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import scala.reflect._
import com.google.common.hash.Hashing

import org.apache.spark.annotation.Private
import org.apache.spark.sql.catalyst.util.CollationFactory
import org.apache.spark.unsafe.types.UTF8String

/**
* A simple, fast hash set optimized for non-null insertion-only use case, where keys are never
Expand All @@ -43,7 +45,17 @@ import org.apache.spark.annotation.Private
@Private
class OpenHashSet[@specialized(Long, Int, Double, Float) T: ClassTag](
initialCapacity: Int,
loadFactor: Double)
loadFactor: Double,
var specialPassedInHasher: Option[Object => Int] = Some(o => {
val i = CollationFactory.fetchCollation(1)
.hashFunction.applyAsLong(o.asInstanceOf[UTF8String])
.toInt
// scalastyle:off println
println(s"Hashing: $o -> $i")
// scalastyle:on println
i
})
)
extends Serializable {

require(initialCapacity <= OpenHashSet.MAX_CAPACITY,
Expand All @@ -67,7 +79,10 @@ class OpenHashSet[@specialized(Long, Int, Double, Float) T: ClassTag](
case ClassTag.Int => new IntHasher().asInstanceOf[Hasher[T]]
case ClassTag.Double => new DoubleHasher().asInstanceOf[Hasher[T]]
case ClassTag.Float => new FloatHasher().asInstanceOf[Hasher[T]]
case _ => new Hasher[T]
case _ =>
specialPassedInHasher.map(f =>
new CustomHasher(f.asInstanceOf[Any => Int]).asInstanceOf[Hasher[T]]).getOrElse(
new Hasher[T])
}

protected var _capacity = nextPowerOf2(initialCapacity)
Expand Down Expand Up @@ -118,8 +133,15 @@ class OpenHashSet[@specialized(Long, Int, Double, Float) T: ClassTag](
* See: https://issues.apache.org/jira/browse/SPARK-45599
*/
@annotation.nowarn("cat=other-non-cooperative-equals")
private def keyExistsAtPos(k: T, pos: Int) = {
  // Compare the element stored at `pos` with `k` through `equals` (not `==`).
  _data(pos).equals(k)
}
/*
 * Returns whether the element stored at `pos` matches the key `k`.
 *
 * Primitive element types (Long, Int, Double, Float) are compared with `equals`.
 * For any other element type, the collation-aware `semanticEquals` is used only when
 * BOTH the stored element and `k` are `UTF8String`; every other key type falls back to
 * `equals` instead of being force-cast to `UTF8String` (which would throw a
 * ClassCastException for non-string keys).
 *
 * NOTE(review): the collation id is hard-coded to 1 here — presumably the lower-case
 * binary collation; it should eventually be supplied by the caller. TODO confirm.
 */
private def keyExistsAtPos(k: T, pos: Int) = {
  classTag[T] match {
    // Primitive specializations keep the plain `equals` comparison.
    case ClassTag.Long | ClassTag.Int | ClassTag.Double | ClassTag.Float =>
      _data(pos) equals k
    case _ =>
      // Widen to Any so the type tests below are valid for every instantiation of T.
      (_data(pos): Any) match {
        case stored: UTF8String =>
          (k: Any) match {
            case probe: UTF8String => stored.semanticEquals(probe, 1)
            // Stored value is a string but the probe is not: use ordinary equality.
            case _ => stored equals k
          }
        // Non-string keys: ordinary equality, as before collation support was added.
        case stored => stored equals k
      }
  }
}

/**
* Add an element to the set. This one differs from add in that it doesn't trigger rehashing.
Expand Down Expand Up @@ -291,9 +313,6 @@ object OpenHashSet {
* A set of specialized hash function implementations to avoid boxing hash code computation
* in the specialized implementation of OpenHashSet.
*/
/**
 * Base hasher: hashes a value by calling its own `hashCode()`.
 *
 * @tparam T element type; specialized for Long, Int, Double and Float
 */
sealed class Hasher[@specialized(Long, Int, Double, Float) T] extends Serializable {
  /** Returns `o.hashCode()`. */
  def hash(o: T): Int = {
    o.hashCode()
  }
}

class LongHasher extends Hasher[Long] {
override def hash(o: Long): Int = (o ^ (o >>> 32)).toInt
Expand All @@ -314,6 +333,16 @@ object OpenHashSet {
override def hash(o: Float): Int = java.lang.Float.floatToIntBits(o)
}

/**
 * Base hasher: hashes a value by calling its own `hashCode()`. Subclasses override
 * `hash` to supply a different hash function.
 *
 * @tparam T element type; specialized for Long, Int, Double and Float
 */
class Hasher[@specialized(Long, Int, Double, Float) T] extends Serializable {
  /** Returns `o.hashCode()`. */
  def hash(o: T): Int = {
    o.hashCode()
  }
}

/**
 * Hasher whose hash value is computed by a caller-supplied function.
 *
 * @param f function applied to each value to produce its hash code
 */
class CustomHasher(f: Any => Int) extends Hasher[Any] {
  /** Returns the result of applying `f` to `o`. */
  override def hash(o: Any): Int = f(o)
}

/** No-op: accepts the new size and does nothing. */
private def grow1(newSize: Int): Unit = ()

/** No-op: accepts an old and a new position and does nothing. */
private def move1(oldPos: Int, newPos: Int): Unit = ()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.{ExpressionBuilder, UnresolvedWith
import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, Expression, ExpressionDescription, ImplicitCastInputTypes, SortOrder}
import org.apache.spark.sql.catalyst.trees.UnaryLike
import org.apache.spark.sql.catalyst.types.PhysicalDataType
import org.apache.spark.sql.catalyst.util.GenericArrayData
import org.apache.spark.sql.catalyst.util.{GenericArrayData}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, ArrayType, BooleanType, DataType}
import org.apache.spark.util.collection.OpenHashMap
Expand Down Expand Up @@ -74,7 +74,6 @@ case class Mode(
if (buffer.isEmpty) {
return null
}

reverseOpt.map { reverse =>
val defaultKeyOrdering = if (reverse) {
PhysicalDataType.ordering(child.dataType).asInstanceOf[Ordering[AnyRef]].reverse
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,11 @@ abstract class TypedAggregateWithHashMapAsBuffer
/** Creates the aggregation buffer: a new, empty `OpenHashMap[AnyRef, Long]` on every call. */
override def createAggregationBuffer(): OpenHashMap[AnyRef, Long] = {
  // Build a fresh counts map with the default constructor.
  val counts = new OpenHashMap[AnyRef, Long]()
  /* Disabled sketch of constructor arguments selecting a collation-specific hasher:
  64, child.dataType match {
  case StringType if child.dataType.asInstanceOf[StringType].isUTF8BinaryLcaseCollation => 1
  case StringType => 0
  case _ => -1
  }) */
  counts
}

protected def child: Expression
Expand Down Expand Up @@ -681,6 +686,11 @@ abstract class TypedAggregateWithHashMapAsBuffer
val ins = new DataInputStream(bis)
try {
val counts = new OpenHashMap[AnyRef, Long]
/* (64, child.dataType match {
case StringType if child.dataType.asInstanceOf[StringType].isUTF8BinaryLcaseCollation => 1
case StringType => 0
case _ => -1
}) */
// Read unsafeRow size and content in bytes.
var sizeOfNextRow = ins.readInt()
while (sizeOfNextRow >= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.util

import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

object UnsafeRowUtils {

Expand Down
85 changes: 55 additions & 30 deletions sql/core/benchmarks/CollationBenchmark-jdk21-results.txt
Original file line number Diff line number Diff line change
@@ -1,54 +1,79 @@
OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure
OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
AMD EPYC 7763 64-Core Processor
collation unit benchmarks - equalsFunction: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
--------------------------------------------------------------------------------------------------------------------------
UTF8_BINARY_LCASE 2948 2958 13 0.0 29483.6 1.0X
UNICODE 2040 2042 3 0.0 20396.6 1.4X
UTF8_BINARY 2043 2043 0 0.0 20426.3 1.4X
UNICODE_CI 16318 16338 28 0.0 163178.4 0.2X
UTF8_BINARY_LCASE 2889 2899 14 0.0 28891.4 1.0X
UNICODE 2018 2020 3 0.0 20175.4 1.4X
UTF8_BINARY 2017 2019 2 0.0 20173.7 1.4X
UNICODE_CI 17402 17403 3 0.0 174016.8 0.2X

OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure
OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
AMD EPYC 7763 64-Core Processor
collation unit benchmarks - compareFunction: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------
UTF8_BINARY_LCASE 3227 3228 1 0.0 32272.1 1.0X
UNICODE 16637 16643 9 0.0 166367.7 0.2X
UTF8_BINARY 3132 3137 7 0.0 31319.2 1.0X
UNICODE_CI 17816 17829 18 0.0 178162.4 0.2X
UTF8_BINARY_LCASE 2937 2966 42 0.0 29366.7 1.0X
UNICODE 16791 16796 7 0.0 167906.4 0.2X
UTF8_BINARY 3123 3125 3 0.0 31227.3 0.9X
UNICODE_CI 17878 17880 3 0.0 178777.4 0.2X

OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure
OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
AMD EPYC 7763 64-Core Processor
collation unit benchmarks - hashFunction: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
UTF8_BINARY_LCASE 4824 4824 0 0.0 48243.7 1.0X
UNICODE 69416 69475 84 0.0 694158.3 0.1X
UTF8_BINARY 3806 3808 2 0.0 38062.8 1.3X
UNICODE_CI 60943 60975 45 0.0 609426.2 0.1X
UTF8_BINARY_LCASE 4809 4824 21 0.0 48088.3 1.0X
UNICODE 65472 65489 24 0.0 654719.7 0.1X
UTF8_BINARY 3804 3806 3 0.0 38043.0 1.3X
UNICODE_CI 52962 53004 59 0.0 529620.9 0.1X

OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure
OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
AMD EPYC 7763 64-Core Processor
collation unit benchmarks - contains: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
UTF8_BINARY_LCASE 11979 11980 1 0.0 119790.4 1.0X
UNICODE 6469 6474 7 0.0 64694.8 1.9X
UTF8_BINARY 7253 7253 1 0.0 72528.3 1.7X
UNICODE_CI 319124 319881 1070 0.0 3191244.0 0.0X
UTF8_BINARY_LCASE 116774 116794 28 0.0 1167739.5 1.0X
UNICODE 51045 51116 100 0.0 510448.0 2.3X
UTF8_BINARY 8184 8186 2 0.0 81841.3 14.3X
UNICODE_CI 452447 452538 129 0.0 4524465.6 0.3X

OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure
OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
AMD EPYC 7763 64-Core Processor
collation unit benchmarks - startsWith: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
UTF8_BINARY_LCASE 11584 11595 15 0.0 115841.4 1.0X
UNICODE 6155 6156 2 0.0 61548.7 1.9X
UTF8_BINARY 6979 6982 5 0.0 69785.6 1.7X
UNICODE_CI 318228 318726 705 0.0 3182275.2 0.0X
UTF8_BINARY_LCASE 60647 60692 63 0.0 606473.3 1.0X
UNICODE 53281 53281 1 0.0 532809.5 1.1X
UTF8_BINARY 7855 7861 8 0.0 78554.6 7.7X
UNICODE_CI 457434 458464 1456 0.0 4574338.8 0.1X

OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure
OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
AMD EPYC 7763 64-Core Processor
collation unit benchmarks - endsWith: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
UTF8_BINARY_LCASE 11655 11664 12 0.0 116552.8 1.0X
UNICODE 6235 6239 5 0.0 62350.8 1.9X
UTF8_BINARY 7066 7069 5 0.0 70658.1 1.6X
UNICODE_CI 313515 313999 685 0.0 3135149.1 0.0X
UTF8_BINARY_LCASE 57293 57312 27 0.0 572926.5 1.0X
UNICODE 52931 52955 34 0.0 529311.5 1.1X
UTF8_BINARY 7990 7992 3 0.0 79899.9 7.2X
UNICODE_CI 454790 459591 6790 0.0 4547899.8 0.1X

OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
AMD EPYC 7763 64-Core Processor
collation unit benchmarks - mode - 30105 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------------
UTF8_BINARY_LCASE - mode - 30105 elements 43 44 1 7.0 143.1 1.0X
UNICODE - mode - 30105 elements 3 3 0 112.9 8.9 16.1X
UTF8_BINARY - mode - 30105 elements 3 3 0 113.9 8.8 16.3X
UNICODE_CI - mode - 30105 elements 102 103 1 3.0 338.2 0.4X

OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
AMD EPYC 7763 64-Core Processor
collation unit benchmarks - mode [struct] - 30105 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------------------------
UNICODE - mode struct - 30105 elements 3 3 0 113.9 8.8 1.0X
UTF8_BINARY - mode struct - 30105 elements 3 3 0 113.5 8.8 1.0X

OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
AMD EPYC 7763 64-Core Processor
collation e2e benchmarks - mode - 10000 elements: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------------
mode df column with collation - UTF8_BINARY_LCASE 58 68 7 0.2 5761.2 1.0X
mode df column with collation - UNICODE 45 51 7 0.2 4482.6 1.3X
mode df column with collation - UTF8_BINARY 43 46 5 0.2 4253.4 1.4X
mode df column with collation - UNICODE_CI 41 46 5 0.2 4085.2 1.4X

Loading
Loading