Skip to content

Commit

Permalink
what it could look like
Browse files Browse the repository at this point in the history
  • Loading branch information
GideonPotok committed May 20, 2024
1 parent 6dafa39 commit 2268044
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ case class Mode(
this(child, 0, 0, Some(reverse))
}

// Maps the string form of a collation key to the string form of the first original
// (pre-collation) key that produced it. An entry is added while the buffer is being
// updated, only when the collation key differs from the raw key and no entry exists yet;
// it is looked up again when keys are ordered to pick the final result.
// NOTE(review): this is mutable state held on the expression instance rather than in the
// aggregation buffer -- presumably it is not carried along when buffers are merged or
// serialized; confirm before relying on it.
private lazy val binaryKeys: scala.collection.mutable.Map[String, String] =
scala.collection.mutable.Map.empty

// Always nullable: the final result is null when the aggregation buffer is empty,
// which is the case when no non-null input value was ever recorded.
override def nullable: Boolean = true

Expand Down Expand Up @@ -83,8 +86,24 @@ case class Mode(
input: InternalRow): OpenHashMap[AnyRef, Long] = {
val key = child.eval(input)

// Key under which this input is counted. For string types whose collation does not
// support binary equality, the raw value is replaced by its collation key so that
// values equal under the collation share a single buffer entry. All other types
// (and null inputs) keep the evaluated value unchanged.
val keyNew = child.dataType match {
  case c: StringType if
    !CollationFactory.fetchCollation(c.collationId).supportsBinaryEquality =>
    val collationId = c.collationId
    // `key` is the bare evaluated value, not a (key, count) pair, so match on the
    // value itself. Type patterns never match null, so a null input falls through
    // to the last case instead of raising a MatchError.
    key match {
      case s: String =>
        CollationFactory.getCollationKey(UTF8String.fromString(s), collationId)
      case s: UTF8String =>
        CollationFactory.getCollationKey(s, collationId)
      case other => other
    }
  case _ => key
}
if (key != null) {
buffer.changeValue(InternalRow.copyValue(key).asInstanceOf[AnyRef], 1L, _ + 1L)
buffer.changeValue(InternalRow.copyValue(keyNew).asInstanceOf[AnyRef], 1L, _ + 1L)
if(key != keyNew && !binaryKeys.contains(keyNew.toString)) {
binaryKeys.put(keyNew.toString, key.toString)
}
}
buffer
}
Expand All @@ -102,29 +121,16 @@ case class Mode(
if (buffer.isEmpty) {
return null
}
val collationAwareBuffer = child.dataType match {
case c: StringType if
!CollationFactory.fetchCollation(c.collationId).supportsBinaryEquality =>
val collationId = c.collationId
val modeMap = buffer.toSeq.groupMapReduce {
case (key: String, _) =>
CollationFactory.getCollationKey(UTF8String.fromString(key), collationId)
case (key: UTF8String, _) =>
CollationFactory.getCollationKey(key, collationId)
case (key, _) => key
}(x => x)((x, y) => (x._1, x._2 + y._2)).values
modeMap
// case s: StructType => getBufferForStructType(buffer, s)
case _ => buffer
}
val collationAwareBuffer = buffer
reverseOpt.map { reverse =>
val defaultKeyOrdering = if (reverse) {
PhysicalDataType.ordering(child.dataType).asInstanceOf[Ordering[AnyRef]].reverse
} else {
PhysicalDataType.ordering(child.dataType).asInstanceOf[Ordering[AnyRef]]
}
val ordering = Ordering.Tuple2(Ordering.Long, defaultKeyOrdering)
collationAwareBuffer.maxBy { case (key, count) => (count, key) }(ordering)
collationAwareBuffer.maxBy { case (key, count) => (count,
binaryKeys.getOrElse(key.toString, key)) }(ordering)
}.getOrElse(collationAwareBuffer.maxBy(_._2))._1
}
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,8 @@ object CollationBenchmark extends CollationBenchmarkBase {
benchmarkStartsWith(collationTypes, inputs)
benchmarkEndsWith(collationTypes, inputs)
benchmarkMode(collationTypes, generateBaseInputStringswithUniqueGroupNumber(10000L))
benchmarkModeStruct(collationTypes.filter(c => c == "UNICODE" || c == "UTF8_BINARY"), generateBaseInputStringswithUniqueGroupNumber(10000L))
benchmarkModeStruct(collationTypes.filter(c => c == "UNICODE" || c == "UTF8_BINARY"),
generateBaseInputStringswithUniqueGroupNumber(10000L))
benchmarkModeOnDataFrame(collationTypes, generateDataframeInput(10000L))
}
}
Expand Down

0 comments on commit 2268044

Please sign in to comment.