diff --git a/.gitignore b/.gitignore index dcf808e6752..890e6ed6dec 100644 --- a/.gitignore +++ b/.gitignore @@ -65,6 +65,7 @@ embedded_zookeeper/ /externals/kyuubi-spark-sql-engine/operation_logs/ /externals/kyuubi-spark-sql-engine/engine_operation_logs/ /externals/kyuubi-spark-sql-engine/spark-warehouse/ +/externals/kyuubi-spark-sql-engine/benchmarks/*-results.txt /work/ /docs/_build/ /kyuubi-common/metrics/ diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetHelper.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetHelper.scala new file mode 100644 index 00000000000..24fab3405a1 --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetHelper.scala @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.spark.schema + +import java.sql.{Date, Timestamp} +import java.time.{Instant, LocalDate} + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + +trait RowSetHelper { + protected def genRow(value: Int): Row = { + val boolVal = value % 3 match { + case 0 => true + case 1 => false + case _ => null + } + val byteVal = value.toByte + val shortVal = value.toShort + val longVal = value.toLong + val floatVal = java.lang.Float.valueOf(s"$value.$value") + val doubleVal = java.lang.Double.valueOf(s"$value.$value") + val stringVal = value.toString * value + val decimalVal = new java.math.BigDecimal(s"$value.$value") + val day = java.lang.String.format("%02d", java.lang.Integer.valueOf(value % 30 + 1)) + val dateVal = Date.valueOf(s"2018-11-$day") + val timestampVal = Timestamp.valueOf(s"2018-11-17 13:33:33.$value") + val binaryVal = Array.fill[Byte](value)(value.toByte) + val arrVal = Array.fill(value)(doubleVal).toSeq + val mapVal = Map(value -> doubleVal) + val interval = new CalendarInterval(value, value, value) + val localDate = LocalDate.of(2018, 11, 17) + val instant = Instant.now() + + Row( + boolVal, + byteVal, + shortVal, + value, + longVal, + floatVal, + doubleVal, + stringVal, + decimalVal, + dateVal, + timestampVal, + binaryVal, + arrVal, + mapVal, + interval, + localDate, + instant) + } + + protected val schemaStructFields: Seq[StructField] = Seq( + ("a", "boolean", "boolVal"), + ("b", "tinyint", "byteVal"), + ("c", "smallint", "shortVal"), + ("d", "int", "value"), + ("e", "bigint", "longVal"), + ("f", "float", "floatVal"), + ("g", "double", "doubleVal"), + ("h", "string", "stringVal"), + ("i", "decimal", "decimalVal"), + ("j", "date", "dateVal"), + ("k", "timestamp", "timestampVal"), + ("l", "binary", 
"binaryVal"), + ("m", "array", "arrVal"), + ("n", "map", "mapVal"), + ("o", "interval", "interval"), + ("p", "date", "localDate"), + ("q", "timestamp", "instant")) + .map { case (colName, typeName, comment) => + StructField(colName, CatalystSqlParser.parseDataType(typeName)).withComment(comment) + } + + protected val schema: StructType = StructType(schemaStructFields) +} diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala index dec18589775..66ce45aed1c 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala @@ -20,7 +20,6 @@ package org.apache.kyuubi.engine.spark.schema import java.nio.ByteBuffer import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} -import java.time.{Instant, LocalDate} import scala.collection.JavaConverters._ @@ -32,70 +31,7 @@ import org.apache.spark.unsafe.types.CalendarInterval import org.apache.kyuubi.KyuubiFunSuite import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion -class RowSetSuite extends KyuubiFunSuite { - - def genRow(value: Int): Row = { - val boolVal = value % 3 match { - case 0 => true - case 1 => false - case _ => null - } - val byteVal = value.toByte - val shortVal = value.toShort - val longVal = value.toLong - val floatVal = java.lang.Float.valueOf(s"$value.$value") - val doubleVal = java.lang.Double.valueOf(s"$value.$value") - val stringVal = value.toString * value - val decimalVal = new java.math.BigDecimal(s"$value.$value") - val day = java.lang.String.format("%02d", java.lang.Integer.valueOf(value + 1)) - val dateVal = Date.valueOf(s"2018-11-$day") - val timestampVal = Timestamp.valueOf(s"2018-11-17 13:33:33.$value") - val binaryVal = Array.fill[Byte](value)(value.toByte) - val arrVal = Array.fill(value)(doubleVal).toSeq - val mapVal = Map(value -> doubleVal) - val interval = new CalendarInterval(value, value, value) - val localDate = LocalDate.of(2018, 11, 17) - val instant = Instant.now() - - Row( - boolVal, - byteVal, - shortVal, - value, - longVal, - floatVal, - doubleVal, - stringVal, - decimalVal, - dateVal, - timestampVal, - binaryVal, - arrVal, - mapVal, - interval, - localDate, - instant) - } - - val schema: StructType = new StructType() - .add("a", "boolean") - .add("b", "tinyint") - .add("c", "smallint") - .add("d", "int") - .add("e", "bigint") - .add("f", "float") - .add("g", "double") - .add("h", "string") - .add("i", "decimal") - .add("j", "date") - .add("k", "timestamp") - .add("l", "binary") - .add("m", "array") - .add("n", "map") - .add("o", "interval") - .add("p", "date") - .add("q", "timestamp") - +class RowSetSuite extends KyuubiFunSuite with RowSetHelper { private val rows: Seq[Row] = (0 to 10).map(genRow) ++ Seq(Row.fromSeq(Seq.fill(17)(null))) test("column based set") { diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/TRowSetBenchmark.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/TRowSetBenchmark.scala new file mode 100644 index 00000000000..80f3840c803 --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/TRowSetBenchmark.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kyuubi
+
+import scala.concurrent.duration._
+
+import org.apache.spark.kyuubi.benchmark.{Benchmark, KyuubiBenchmarkBase}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types._
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.engine.spark.schema.{RowSet, RowSetHelper}
+import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion
+import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion._
+
+/**
+ * Benchmark to measure the performance of generating TRowSet.
+ *
+ * {{{
+ *   RUN_BENCHMARK=1 ./build/mvn clean test \
+ *   -pl externals/kyuubi-spark-sql-engine -am \
+ *   -Dtest=none -DwildcardSuites=org.apache.spark.kyuubi.TRowSetBenchmark
+ * }}}
+ */
+class TRowSetBenchmark extends KyuubiFunSuite with RowSetHelper with KyuubiBenchmarkBase {
+  private val runBenchmark = sys.env.contains("RUN_BENCHMARK")
+
+  private val rowCount = 1000
+  private val rotations = 5
+  private lazy val allRows = (0 until rowCount).map(genRow)
+
+  test("row-based toTRowSet benchmark") {
+    assume(runBenchmark)
+    val rowSetType = "row-based"
+    withHeader(rowSetType) {
+      tRowSetGenerationBenchmark(HIVE_CLI_SERVICE_PROTOCOL_V5, rowSetType)
+    }
+  }
+
+  test("column-based toTRowSet benchmark") {
+    assume(runBenchmark)
+    val rowSetType = "column-based"
+    withHeader(rowSetType) {
+      tRowSetGenerationBenchmark(HIVE_CLI_SERVICE_PROTOCOL_V6, rowSetType)
+    }
+  }
+
+  private def tRowSetGenerationBenchmark(
+      protocolVersion: TProtocolVersion,
+      rowSetType: String): Unit = {
+    val benchmark =
+      new Benchmark(
+        s"$rowSetType TRowSet benchmark",
+        rowCount,
+        warmupTime = 3.seconds,
+        output = output)
+
+    schemaStructFields.zipWithIndex.foreach {
+      case (field, idx) =>
+        val rowsOfSingleType = allRows.map(row => Row(row.get(idx)))
+        val schemaOfSingleType = StructType(Seq(field))
+
+        val commentOrName = field.getComment().getOrElse(field.dataType.typeName)
+        benchmark.addCase(commentOrName, rotations) { _ =>
+          benchmarkToTRowSet(
+            rowsOfSingleType,
+            schemaOfSingleType,
+            protocolVersion)
+        }
+    }
+
+    benchmark.addCase("with all types", rotations) { _ =>
+      benchmarkToTRowSet(allRows, schema, protocolVersion)
+    }
+
+    benchmark.run()
+  }
+
+  private def benchmarkToTRowSet(
+      rows: Seq[Row],
+      schema: StructType,
+      protocolVersion: TProtocolVersion): Unit = {
+    RowSet.toTRowSet(rows, schema, protocolVersion)
+  }
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/benchmark/Benchmark.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/benchmark/Benchmark.scala
new file mode 100644
index 00000000000..040d8ef99fd
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/benchmark/Benchmark.scala
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kyuubi.benchmark
+
+import java.io.{OutputStream, PrintStream}
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.util.Try
+
+import org.apache.commons.io.output.TeeOutputStream
+import org.apache.commons.lang3.SystemUtils
+import org.apache.spark.util.Utils
+
+/**
+ * Copied from Apache Spark
+ * [[org.apache.spark.benchmark.Benchmark]] of the spark-core module.
+ *
+ * Utility class to benchmark components. An example of how to use this is:
+ * {{{
+ *   val benchmark = new Benchmark("My Benchmark", valuesPerIteration)
+ *   benchmark.addCase("V1")(<function>)
+ *   benchmark.addCase("V2")(<function>)
+ *   benchmark.run
+ * }}}
+ * This will output the average time to run each function and the rate of each function.
+ *
+ * The benchmark function takes one argument that is the iteration that's being run.
+ *
+ * @param name name of this benchmark.
+ * @param valuesPerIteration number of values used in the test case, used to compute rows/s.
+ * @param minNumIters the min number of iterations that will be run per case, not counting warm-up.
+ * @param warmupTime amount of time to spend running dummy case iterations for JIT warm-up.
+ * @param minTime further iterations will be run for each case until this time is used up.
+ * @param outputPerIteration if true, the timing for each run will be printed to stdout.
+ * @param output optional output stream to write benchmark results to
+ */
+class Benchmark(
+    name: String,
+    valuesPerIteration: Long,
+    minNumIters: Int = 2,
+    warmupTime: FiniteDuration = 2.seconds,
+    minTime: FiniteDuration = 2.seconds,
+    outputPerIteration: Boolean = false,
+    output: Option[OutputStream] = None) {
+  import Benchmark._
+  val benchmarks = mutable.ArrayBuffer.empty[Benchmark.Case]
+
+  val out = if (output.isDefined) {
+    new PrintStream(new TeeOutputStream(System.out, output.get))
+  } else {
+    System.out
+  }
+
+  /**
+   * Adds a case to run when run() is called. The given function will be run for several
+   * iterations to collect timing statistics.
+   *
+   * @param name of the benchmark case
+   * @param numIters if non-zero, forces exactly this many iterations to be run
+   */
+  def addCase(name: String, numIters: Int = 0)(f: Int => Unit): Unit = {
+    addTimerCase(name, numIters) { timer =>
+      timer.startTiming()
+      f(timer.iteration)
+      timer.stopTiming()
+    }
+  }
+
+  /**
+   * Adds a case with manual timing control. When the function is run, timing does not start
+   * until timer.startTiming() is called within the given function. The corresponding
+   * timer.stopTiming() method must be called before the function returns.
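+   * A minimal usage sketch (setupState() and doWork() are hypothetical placeholders):
+   * {{{
+   *   benchmark.addTimerCase("excluding setup") { timer =>
+   *     setupState() // excluded from the measurement
+   *     timer.startTiming()
+   *     doWork() // only this part is timed
+   *     timer.stopTiming()
+   *   }
+   * }}}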
+   *
+   * @param name of the benchmark case
+   * @param numIters if non-zero, forces exactly this many iterations to be run
+   */
+  def addTimerCase(name: String, numIters: Int = 0)(f: Benchmark.Timer => Unit): Unit = {
+    benchmarks += Benchmark.Case(name, f, numIters)
+  }
+
+  /**
+   * Runs the benchmark and outputs the results to stdout. This should be copied and added as
+   * a comment with the benchmark. Although the results vary from machine to machine, it should
+   * provide some baseline.
+   */
+  def run(): Unit = {
+    require(benchmarks.nonEmpty)
+    // scalastyle:off
+    println("Running benchmark: " + name)
+
+    val results = benchmarks.map { c =>
+      println("  Running case: " + c.name)
+      measure(valuesPerIteration, c.numIters)(c.fn)
+    }
+    println
+
+    val firstBest = results.head.bestMs
+    // The results are going to be processor specific so it is useful to include that.
+    out.println(Benchmark.getJVMOSInfo())
+    out.println(Benchmark.getProcessorName())
+    val nameLen = Math.max(40, Math.max(name.length, benchmarks.map(_.name.length).max))
+    out.printf(
+      s"%-${nameLen}s %14s %14s %11s %12s %13s %10s\n",
+      name + ":",
+      "Best Time(ms)",
+      "Avg Time(ms)",
+      "Stdev(ms)",
+      "Rate(M/s)",
+      "Per Row(ns)",
+      "Relative")
+    out.println("-" * (nameLen + 80))
+    results.zip(benchmarks).foreach { case (result, benchmark) =>
+      out.printf(
+        s"%-${nameLen}s %14s %14s %11s %12s %13s %10s\n",
+        benchmark.name,
+        "%5.0f" format result.bestMs,
+        "%4.0f" format result.avgMs,
+        "%5.0f" format result.stdevMs,
+        "%10.1f" format result.bestRate,
+        "%6.1f" format (1000 / result.bestRate),
+        "%3.1fX" format (firstBest / result.bestMs))
+    }
+    out.println
+    // scalastyle:on
+  }
+
+  /**
+   * Runs a single function `f` for iters, returning the average time the function took and
+   * the rate of the function.
+   */
+  def measure(num: Long, overrideNumIters: Int)(f: Timer => Unit): Result = {
+    System.gc() // ensures garbage from previous cases doesn't impact this one
+    val warmupDeadline = warmupTime.fromNow
+    while (!warmupDeadline.isOverdue) {
+      f(new Benchmark.Timer(-1))
+    }
+    val minIters = if (overrideNumIters != 0) overrideNumIters else minNumIters
+    val minDuration = if (overrideNumIters != 0) 0 else minTime.toNanos
+    val runTimes = ArrayBuffer[Long]()
+    var totalTime = 0L
+    var i = 0
+    while (i < minIters || totalTime < minDuration) {
+      val timer = new Benchmark.Timer(i)
+      f(timer)
+      val runTime = timer.totalTime()
+      runTimes += runTime
+      totalTime += runTime
+
+      if (outputPerIteration) {
+        // scalastyle:off
+        println(s"Iteration $i took ${NANOSECONDS.toMicros(runTime)} microseconds")
+        // scalastyle:on
+      }
+      i += 1
+    }
+    // scalastyle:off
+    println(s"  Stopped after $i iterations, ${NANOSECONDS.toMillis(runTimes.sum)} ms")
+    // scalastyle:on
+    assert(runTimes.nonEmpty)
+    val best = runTimes.min
+    val avg = runTimes.sum / runTimes.size
+    val stdev = if (runTimes.size > 1) {
+      math.sqrt(runTimes.map(time => (time - avg) * (time - avg)).sum / (runTimes.size - 1))
+    } else 0
+    Result(avg / 1000000.0, num / (best / 1000.0), best / 1000000.0, stdev / 1000000.0)
+  }
+}
+
+object Benchmark {
+
+  /**
+   * Object available to benchmark code to control timing e.g. to exclude set-up time.
+   *
+   * @param iteration specifies this is the nth iteration of running the benchmark case
+   */
+  class Timer(val iteration: Int) {
+    private var accumulatedTime: Long = 0L
+    private var timeStart: Long = 0L
+
+    def startTiming(): Unit = {
+      assert(timeStart == 0L, "Already started timing.")
+      timeStart = System.nanoTime
+    }
+
+    def stopTiming(): Unit = {
+      assert(timeStart != 0L, "Have not started timing.")
+      accumulatedTime += System.nanoTime - timeStart
+      timeStart = 0L
+    }
+
+    def totalTime(): Long = {
+      assert(timeStart == 0L, "Have not stopped timing.")
+      accumulatedTime
+    }
+  }
+
+  case class Case(name: String, fn: Timer => Unit, numIters: Int)
+  case class Result(avgMs: Double, bestRate: Double, bestMs: Double, stdevMs: Double)
+
+  /**
+   * This should return user-helpful processor information. Getting at this depends on the OS.
+   * This should return something like "Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz"
+   */
+  def getProcessorName(): String = {
+    val cpu = if (SystemUtils.IS_OS_MAC_OSX) {
+      Utils.executeAndGetOutput(Seq("/usr/sbin/sysctl", "-n", "machdep.cpu.brand_string"))
+        .stripLineEnd
+    } else if (SystemUtils.IS_OS_LINUX) {
+      Try {
+        val grepPath = Utils.executeAndGetOutput(Seq("which", "grep")).stripLineEnd
+        Utils.executeAndGetOutput(Seq(grepPath, "-m", "1", "model name", "/proc/cpuinfo"))
+          .stripLineEnd.replaceFirst("model name[\\s*]:[\\s*]", "")
+      }.getOrElse("Unknown processor")
+    } else {
+      System.getenv("PROCESSOR_IDENTIFIER")
+    }
+    cpu
+  }
+
+  /**
+   * This should return user-helpful JVM & OS information.
+   * This should return something like
+   * "OpenJDK 64-Bit Server VM 1.8.0_65-b17 on Linux 4.1.13-100.fc21.x86_64"
+   */
+  def getJVMOSInfo(): String = {
+    val vmName = System.getProperty("java.vm.name")
+    val runtimeVersion = System.getProperty("java.runtime.version")
+    val osName = System.getProperty("os.name")
+    val osVersion = System.getProperty("os.version")
+    s"${vmName} ${runtimeVersion} on ${osName} ${osVersion}"
+  }
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/benchmark/KyuubiBenchmarkBase.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/benchmark/KyuubiBenchmarkBase.scala
new file mode 100644
index 00000000000..4d5aa4904e0
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/benchmark/KyuubiBenchmarkBase.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kyuubi.benchmark
+
+import java.io.{File, FileOutputStream, OutputStream}
+
+import scala.collection.JavaConverters._
+
+import com.google.common.reflect.ClassPath
+import org.scalatest.Assertions._
+
+trait KyuubiBenchmarkBase {
+  var output: Option[OutputStream] = None
+
+  private val prefix = {
+    val benchmarkClasses = ClassPath.from(Thread.currentThread.getContextClassLoader)
+      .getTopLevelClassesRecursive("org.apache.spark.kyuubi").asScala.toArray
+    assert(benchmarkClasses.nonEmpty)
+    val benchmark = benchmarkClasses.find(_.load().getName.endsWith("Benchmark"))
+    val targetDirOrProjDir =
+      new File(benchmark.get.load().getProtectionDomain.getCodeSource.getLocation.toURI)
+        .getParentFile.getParentFile
+    if (targetDirOrProjDir.getName == "target") {
+      targetDirOrProjDir.getParentFile.getCanonicalPath + "/"
+    } else {
+      targetDirOrProjDir.getCanonicalPath + "/"
+    }
+  }
+
+  def withHeader(appendix: String)(func: => Unit): Unit = {
+    val resultFileName =
+      s"${this.getClass.getSimpleName.replace("$", "")}-$appendix-results.txt"
+    val dir = new File(s"${prefix}benchmarks/")
+    if (!dir.exists()) {
+      // scalastyle:off println
+      println(s"Creating ${dir.getAbsolutePath} for benchmark results.")
+      // scalastyle:on println
+      dir.mkdirs()
+    }
+    val file = new File(dir, resultFileName)
+    if (!file.exists()) {
+      file.createNewFile()
+    }
+    output = Some(new FileOutputStream(file))
+
+    // Close the result stream even if the benchmark body fails.
+    try {
+      func
+    } finally {
+      output.foreach(_.close())
+    }
+  }
+}
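
A minimal sketch of driving the copied Benchmark utility directly (the case bodies and
the valuesPerIteration value below are illustrative placeholders; addCase, addTimerCase
and run() are the entry points defined in Benchmark.scala above):

    import scala.concurrent.duration._
    import org.apache.spark.kyuubi.benchmark.Benchmark

    // Hypothetical standalone driver; TRowSetBenchmark wires the same API through
    // KyuubiBenchmarkBase.withHeader to tee the report into a result file.
    val benchmark = new Benchmark("My Benchmark", valuesPerIteration = 1000L, warmupTime = 3.seconds)
    benchmark.addCase("V1") { _ => () } // measured body goes here
    benchmark.addCase("V2", numIters = 5) { _ => () } // forces exactly 5 timed iterations
    benchmark.run()

Passing output = Some(stream) tees the report to that stream; KyuubiBenchmarkBase.withHeader
uses this to land results in benchmarks/*-results.txt, which the new .gitignore entry above
excludes from version control.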