diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 1e759b6266c61..7343ca86d3465 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2671,6 +2671,16 @@ object SQLConf { .booleanConf .createWithDefault(false) + val ANALYZE_PARTITION_STATS_ENABLED = + buildConf("spark.sql.statistics.update.partitionStats.enabled") + .doc("When this config is enabled, Spark will also update partition statistics in analyze " + + "table command (i.e., ANALYZE TABLE .. COMPUTE STATISTICS [NOSCAN]). Note the command " + + "will also become more expensive. When this config is disabled, Spark will only " + + "update table level statistics.") + .version("4.0.0") + .booleanConf + .createWithDefault(false) + val CBO_ENABLED = buildConf("spark.sql.cbo.enabled") .doc("Enables CBO for estimation of plan statistics when set true.") @@ -5099,6 +5109,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def autoSizeUpdateEnabled: Boolean = getConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED) + def analyzePartitionStatsEnabled: Boolean = getConf(SQLConf.ANALYZE_PARTITION_STATS_ENABLED) + def joinReorderEnabled: Boolean = getConf(SQLConf.JOIN_REORDER_ENABLED) def joinReorderDPThreshold: Int = getConf(SQLConf.JOIN_REORDER_DP_THRESHOLD) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala index 7a18fbdd03d88..a680127c0ee69 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala @@ -231,6 +231,7 @@ object CommandUtils extends Logging { tableIdent: TableIdentifier, noScan: Boolean): Unit = { val sessionState = 
sparkSession.sessionState + val partitionStatsEnabled = sessionState.conf.analyzePartitionStatsEnabled val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB) @@ -249,7 +250,7 @@ object CommandUtils extends Logging { } else { // Compute stats for the whole table val rowCounts: Map[TablePartitionSpec, BigInt] = - if (noScan) { + if (noScan || !partitionStatsEnabled) { Map.empty } else { calculateRowCountsPerPartition(sparkSession, tableMeta, None) @@ -266,8 +267,8 @@ object CommandUtils extends Logging { if (newStats.isDefined) { sessionState.catalog.alterTableStats(tableIdentWithDB, newStats) } - // Also update partition stats - if (newPartitions.nonEmpty) { + // Also update partition stats when the config is enabled + if (newPartitions.nonEmpty && partitionStatsEnabled) { sessionState.catalog.alterPartitions(tableIdentWithDB, newPartitions) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 571fae6e98dd4..4b9e478bc89f8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -372,54 +372,70 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto partition.stats } - withTable(tableName) { - withTempPath { path => - // Create a table with 3 partitions all located under a single top-level directory 'path' - sql( - s""" - |CREATE TABLE $tableName (key STRING, value STRING) - |USING hive - |PARTITIONED BY (ds STRING) - |LOCATION '${path.toURI}' - """.stripMargin) + Seq(true, false).foreach { partitionStatsEnabled => + withSQLConf(SQLConf.ANALYZE_PARTITION_STATS_ENABLED.key -> partitionStatsEnabled.toString) { + withTable(tableName) { + withTempPath { path => 
+ // Create a table with 3 partitions all located under a directory 'path' + sql( + s""" + |CREATE TABLE $tableName (key STRING, value STRING) + |USING hive + |PARTITIONED BY (ds STRING) + |LOCATION '${path.toURI}' + """.stripMargin) - val partitionDates = List("2010-01-01", "2010-01-02", "2010-01-03") + val partitionDates = List("2010-01-01", "2010-01-02", "2010-01-03") - partitionDates.foreach { ds => - sql(s"ALTER TABLE $tableName ADD PARTITION (ds='$ds') LOCATION '$path/ds=$ds'") - sql("SELECT * FROM src").write.mode(SaveMode.Overwrite) - .format("parquet").save(s"$path/ds=$ds") - } + partitionDates.foreach { ds => + sql(s"ALTER TABLE $tableName ADD PARTITION (ds='$ds') LOCATION '$path/ds=$ds'") + sql("SELECT * FROM src").write.mode(SaveMode.Overwrite) + .format("parquet").save(s"$path/ds=$ds") + } - assert(getCatalogTable(tableName).stats.isEmpty) - partitionDates.foreach { ds => - assert(queryStats(ds).isEmpty) - } + assert(getCatalogTable(tableName).stats.isEmpty) + partitionDates.foreach { ds => + assert(queryStats(ds).isEmpty) + } - sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS NOSCAN") + sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS NOSCAN") - // Table and partition stats should also have been updated - assert(getTableStats(tableName).sizeInBytes == 3 * 4411) - assert(getTableStats(tableName).rowCount.isEmpty) - partitionDates.foreach { ds => - val partStats = queryStats(ds) - assert(partStats.nonEmpty) - assert(partStats.get.sizeInBytes == 4411) - assert(partStats.get.rowCount.isEmpty) - } + val expectedRowCount = 25 - sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS") + // Table size should also have been updated + assert(getTableStats(tableName).sizeInBytes > 0) + // Row count should NOT be updated with the `NOSCAN` option + assert(getTableStats(tableName).rowCount.isEmpty) - assert(getTableStats(tableName).sizeInBytes == 3 * 4411) - // Table row count should be updated - assert(getTableStats(tableName).rowCount.get == 75) + 
partitionDates.foreach { ds => + val partStats = queryStats(ds) + if (partitionStatsEnabled) { + assert(partStats.nonEmpty) + assert(partStats.get.sizeInBytes > 0) + assert(partStats.get.rowCount.isEmpty) + } else { + assert(partStats.isEmpty) + } + } - partitionDates.foreach { ds => - val partStats = queryStats(ds) - assert(partStats.nonEmpty) - // The scan option should update partition row count - assert(partStats.get.sizeInBytes == 4411) - assert(partStats.get.rowCount.get == 25) + sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS") + + assert(getTableStats(tableName).sizeInBytes > 0) + // Table row count should be updated + assert(getTableStats(tableName).rowCount.get == 3 * expectedRowCount) + + partitionDates.foreach { ds => + val partStats = queryStats(ds) + if (partitionStatsEnabled) { + assert(partStats.nonEmpty) + // The scan option should update partition row count + assert(partStats.get.sizeInBytes > 0) + assert(partStats.get.rowCount.get == expectedRowCount) + } else { + assert(partStats.isEmpty) + } + } + } } } }