From eef06ec41622fee39501cc58826fb68adeb3e3e7 Mon Sep 17 00:00:00 2001 From: Tuan Pham Date: Sun, 6 Oct 2024 21:50:37 +1100 Subject: [PATCH 1/3] - Add rand_range with min and max + randn with mean and variance - Add rand_range with min and max - Add randn with mean and variance Formatting Revert unrelated changes --- .../apache/spark/sql/daria/functions.scala | 101 ++++++++++++++++-- .../spark/sql/daria/functionsTests.scala | 55 ++++++++-- 2 files changed, 140 insertions(+), 16 deletions(-) diff --git a/unsafe/src/main/scala/org/apache/spark/sql/daria/functions.scala b/unsafe/src/main/scala/org/apache/spark/sql/daria/functions.scala index d4322e5..ba8c027 100644 --- a/unsafe/src/main/scala/org/apache/spark/sql/daria/functions.scala +++ b/unsafe/src/main/scala/org/apache/spark/sql/daria/functions.scala @@ -2,25 +2,110 @@ package org.apache.spark.sql.daria import org.apache.spark.sql.Column import org.apache.spark.sql.catalyst.expressions.{Expression, RandGamma} -import org.apache.spark.sql.functions.{lit, log, rand, when} +import org.apache.spark.sql.functions.{lit, log, when} +import org.apache.spark.sql.{functions => F} import org.apache.spark.util.Utils object functions { private def withExpr(expr: Expression): Column = Column(expr) - def randGamma(seed: Long, shape: Double, scale: Double): Column = withExpr(RandGamma(seed, shape, scale)).alias("gamma_random") - def randGamma(shape: Double, scale: Double): Column = randGamma(Utils.random.nextLong, shape, scale) - def randGamma(): Column = randGamma(1.0, 1.0) + /** + * Generate a column with independent and identically distributed (i.i.d.) samples + * from the Gamma distribution with the specified shape and scale parameters. + * + * @note The function is non-deterministic in general case. + */ + def rand_gamma(seed: Long, shape: Double, scale: Double): Column = withExpr(RandGamma(seed, shape, scale)).alias("gamma_random") - def randLaplace(seed: Long, mu: Double, beta: Double): Column = { + /** + * Generate a column with independent and identically distributed (i.i.d.) samples + * from the Gamma distribution with the specified shape and scale parameters. + * + * @note The function is non-deterministic in general case. + */ + def rand_gamma(shape: Double, scale: Double): Column = rand_gamma(Utils.random.nextLong, shape, scale) + + /** + * Generate a column with independent and identically distributed (i.i.d.) samples + * from the Gamma distribution with default parameters (shape = 1.0, scale = 1.0). + * + * @return A column with i.i.d. samples from the default Gamma distribution. + * + * @note The function is non-deterministic in general case. + */ + def rand_gamma(): Column = rand_gamma(1.0, 1.0) + + /** + * Generate a column with independent and identically distributed (i.i.d.) samples + * from the Laplace distribution with the specified location parameter `mu` and scale parameter `beta`. + * + * @note The function is non-deterministic in general case. + */ + def rand_laplace(seed: Long, mu: Double, beta: Double): Column = { val mu_ = lit(mu) val beta_ = lit(beta) - val u = rand(seed) + val u = F.rand(seed) when(u < 0.5, mu_ + beta_ * log(lit(2) * u)) .otherwise(mu_ - beta_ * log(lit(2) * (lit(1) - u))) .alias("laplace_random") } - def randLaplace(mu: Double, beta: Double): Column = randLaplace(Utils.random.nextLong, mu, beta) - def randLaplace(): Column = randLaplace(0.0, 1.0) + /** + * Generate a column with independent and identically distributed (i.i.d.) samples + * from the Laplace distribution with the specified location parameter `mu` and scale parameter `beta`. + * + * @note The function is non-deterministic in general case. + */ + def rand_laplace(mu: Double, beta: Double): Column = rand_laplace(Utils.random.nextLong, mu, beta) + + /** + * Generate a column with independent and identically distributed (i.i.d.) samples + * from the Laplace distribution with default parameters (mu = 0.0, beta = 1.0). + * + * @note The function is non-deterministic in general case. + */ + def rand_laplace(): Column = rand_laplace(0.0, 1.0) + + /** + * Generate a random column with independent and identically distributed (i.i.d.) samples + * uniformly distributed in [`min`, `max`). + * + * @note The function is non-deterministic in general case. + */ + def rand_range(seed: Long, min: Int, max: Int): Column = { + val min_ = lit(min) + val max_ = lit(max) + min_ + (max_ - min_) * F.rand(seed) + } + + /** + * Generate a random column with independent and identically distributed (i.i.d.) samples + * uniformly distributed in [`min`, `max`). + * + * @note The function is non-deterministic in general case. + */ + def rand_range(min: Int, max: Int): Column = { + rand_range(Utils.random.nextLong, min, max) + } + + /** + * Generate a column with independent and identically distributed (i.i.d.) samples from + * the standard normal distribution with given `mean` and `variance`. + * + * @note The function is non-deterministic in general case. + */ + def randn(seed: Long, mean: Double, variance: Double): Column = { + val stddev = math.sqrt(variance) + F.randn(seed) * lit(stddev) + lit(mean) + } + + /** + * Generate a column with independent and identically distributed (i.i.d.) samples from + * the standard normal distribution with given `mean` and `variance`. + * + * @note The function is non-deterministic in general case. + */ + def randn(mean: Double, variance: Double): Column = { + randn(Utils.random.nextLong, mean, variance) + } } diff --git a/unsafe/src/test/scala/org/apache/spark/sql/daria/functionsTests.scala b/unsafe/src/test/scala/org/apache/spark/sql/daria/functionsTests.scala index 5147c13..c026387 100644 --- a/unsafe/src/test/scala/org/apache/spark/sql/daria/functionsTests.scala +++ b/unsafe/src/test/scala/org/apache/spark/sql/daria/functionsTests.scala @@ -2,8 +2,7 @@ package org.apache.spark.sql.daria import com.github.mrpowers.spark.fast.tests.{ColumnComparer, DataFrameComparer} import org.apache.spark.sql.daria.functions._ -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.functions.stddev +import org.apache.spark.sql.{functions => F} import utest._ object functionsTests extends TestSuite with DataFrameComparer with ColumnComparer with SparkSessionTestWrapper { @@ -11,11 +10,11 @@ object functionsTests extends TestSuite with DataFrameComparer with ColumnCompar val tests = Tests { 'rand_gamma - { "has correct mean and standard deviation" - { - val sourceDF = spark.range(100000).select(randGamma(2.0, 2.0)) + val sourceDF = spark.range(100000).select(rand_gamma(2.0, 2.0)) val stats = sourceDF .agg( - mean("gamma_random").as("mean"), - stddev("gamma_random").as("stddev") + F.mean("gamma_random").as("mean"), + F.stddev("gamma_random").as("stddev") ) .collect()(0) @@ -31,11 +30,11 @@ object functionsTests extends TestSuite with DataFrameComparer with ColumnCompar 'rand_laplace - { "has correct mean and standard deviation" - { - val sourceDF = spark.range(100000).select(randLaplace()) + val sourceDF = spark.range(100000).select(rand_laplace()) val stats = sourceDF .agg( - mean("laplace_random").as("mean"), - stddev("laplace_random").as("std_dev") + F.mean("laplace_random").as("mean"), + F.stddev("laplace_random").as("std_dev") ) .collect()(0) @@ -47,5 +46,45 @@ object functionsTests extends TestSuite with DataFrameComparer with ColumnCompar assert(math.abs(laplaceStdDev - math.sqrt(2.0)) < 0.5) } } + + 'rand - { + "has correct min and max" - { + val min = 5 + val max = 10 + val sourceDF = spark.range(100000).select(rand_range(min, max).as("rand_min_max")) + val stats = sourceDF + .agg( + F.min("rand_min_max").as("min"), + F.min("rand_min_max").as("max") + ) + .collect()(0) + + val uniformMin = stats.getAs[Double]("min") + val uniformMax = stats.getAs[Double]("max") + + assert(uniformMin >= min) + assert(uniformMax <= max) + } + } + + 'randn - { + "has correct mean and variance" - { + val mean = 1 + val variance = 2 + val sourceDF = spark.range(100000).select(randn(mean, variance).as("rand_normal")) + val stats = sourceDF + .agg( + F.mean("rand_normal").as("mean"), + F.variance("rand_normal").as("variance") + ) + .collect()(0) + + val normalMean = stats.getAs[Double]("mean") + val normalVariance = stats.getAs[Double]("variance") + + assert(math.abs(normalMean - mean) <= 0.1) + assert(math.abs(normalVariance - variance) <= 0.1) + } + } } } From cff8c124e39a4a201e543051d055498af5e5e045 Mon Sep 17 00:00:00 2001 From: Tuan Pham Date: Sat, 12 Oct 2024 14:22:57 +1100 Subject: [PATCH 2/3] Update Add comment and formatting --- .../apache/spark/sql/daria/functions.scala | 6 ++--- .../spark/sql/daria/functionsTests.scala | 27 ++++++++++--------- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/unsafe/src/main/scala/org/apache/spark/sql/daria/functions.scala b/unsafe/src/main/scala/org/apache/spark/sql/daria/functions.scala index 2726084..6995a8f 100644 --- a/unsafe/src/main/scala/org/apache/spark/sql/daria/functions.scala +++ b/unsafe/src/main/scala/org/apache/spark/sql/daria/functions.scala @@ -15,7 +15,7 @@ object functions { * * @note The function is non-deterministic in general case. */ - def rand_gamma(seed: Long, shape: Double, scale: Double): Column = withExpr(RandGamma(seed, shape, scale)).alias("gamma_random") + def rand_gamma(seed: Long, shape: Double, scale: Double): Column = withExpr(RandGamma(seed, shape, scale)).alias("gamma_random") /** * Generate a column with independent and identically distributed (i.i.d.) samples @@ -39,7 +39,7 @@ object functions { * * @note The function is non-deterministic in general case. */ - def rand_gamma(shape: Column, scale: Column): Column = rand_gamma(lit(Utils.random.nextLong), shape, scale) + def rand_gamma(shape: Column, scale: Column): Column = rand_gamma(lit(Utils.random.nextLong), shape, scale) /** * Generate a column with independent and identically distributed (i.i.d.) samples @@ -58,7 +58,7 @@ object functions { * @note The function is non-deterministic in general case. */ def rand_laplace(seed: Long, mu: Column, beta: Column): Column = { - val u = F.rand(seed) - lit(0.5) + val u = F.rand(seed) - lit(0.5) mu - beta * signum(u) * log(lit(1) - (lit(2) * F.abs(u))) } diff --git a/unsafe/src/test/scala/org/apache/spark/sql/daria/functionsTests.scala b/unsafe/src/test/scala/org/apache/spark/sql/daria/functionsTests.scala index b0b06da..3b84924 100644 --- a/unsafe/src/test/scala/org/apache/spark/sql/daria/functionsTests.scala +++ b/unsafe/src/test/scala/org/apache/spark/sql/daria/functionsTests.scala @@ -61,19 +61,20 @@ object functionsTests extends TestSuite with DataFrameComparer with ColumnCompar ) .collect()(0) - val laplaceMean = stats.getAs[Double]("mean") - val laplaceStdDev = stats.getAs[Double]("std_dev") + val laplaceMean = stats.getAs[Double]("mean") + val laplaceStdDev = stats.getAs[Double]("std_dev") val expectedStdDev = math.sqrt(2) - // Laplace distribution with mean=0.0 and scale=1.0 has mean=0.0 and stddev=sqrt(2.0) + // Laplace distribution with mean=0.0 and scale=1.0 has mean=0.0 and stddev=sqrt(2.0) * beta (1.0 by default) assert(math.abs(laplaceMean) <= 0.1) assert(math.abs(laplaceStdDev - expectedStdDev) <= 0.1) } "has correct mean and standard deviation with custom mean and scale" - { - val mu = 1.0 + val mu = 1.0 val beta = 2.0 - val sourceDF = spark.range(100000) + val sourceDF = spark + .range(100000) .withColumn("mu", lit(mu)) .withColumn("beta", lit(beta)) .select(rand_laplace(F.col("mu"), F.col("beta")).alias("laplace_random")) @@ -84,11 +85,11 @@ object functionsTests extends TestSuite with DataFrameComparer with ColumnCompar ) .collect()(0) - val laplaceMean = stats.getAs[Double]("mean") - val laplaceStdDev = stats.getAs[Double]("std_dev") + val laplaceMean = stats.getAs[Double]("mean") + val laplaceStdDev = stats.getAs[Double]("std_dev") val expectedStdDev = beta * math.sqrt(2) - // Laplace distribution with mean=0.0 and scale=1.0 has mean=0.0 and stddev=sqrt(2.0) + // Laplace distribution should have same mean as mu input and standard deviation = sqrt(2) * beta assert(math.abs(laplaceMean - mu) <= 0.1) assert(math.abs(laplaceStdDev - expectedStdDev) <= 0.1) } @@ -114,9 +115,10 @@ object functionsTests extends TestSuite with DataFrameComparer with ColumnCompar } "has correct min and max using min max column" - { - val min = 5 - val max = 10 - val sourceDF = spark.range(100000) + val min = 5 + val max = 10 + val sourceDF = spark + .range(100000) .withColumn("min", lit(min)) .withColumn("max", lit(max)) .select(rand_range(F.col("min"), F.col("max")).as("rand_min_max")) @@ -157,7 +159,8 @@ object functionsTests extends TestSuite with DataFrameComparer with ColumnCompar "has correct mean and variance using mean and variance column" - { val mean = 1.0 val variance = 2.0 - val sourceDF = spark.range(100000) + val sourceDF = spark + .range(100000) .withColumn("mean", lit(mean)) .withColumn("variance", lit(variance)) .select(randn(mean, variance).as("rand_normal")) From 3fb21741967da2f8be01e0c16b4a8ac035045171 Mon Sep 17 00:00:00 2001 From: Tuan Pham Date: Wed, 23 Oct 2024 21:58:28 +1100 Subject: [PATCH 3/3] Add alias for rand functions --- .../apache/spark/sql/daria/functions.scala | 167 ++++++++++++++++-- 1 file changed, 152 insertions(+), 15 deletions(-) diff --git a/unsafe/src/main/scala/org/apache/spark/sql/daria/functions.scala b/unsafe/src/main/scala/org/apache/spark/sql/daria/functions.scala index 6995a8f..cc4a030 100644 --- a/unsafe/src/main/scala/org/apache/spark/sql/daria/functions.scala +++ b/unsafe/src/main/scala/org/apache/spark/sql/daria/functions.scala @@ -2,7 +2,7 @@ package org.apache.spark.sql.daria import org.apache.spark.sql.Column import org.apache.spark.sql.catalyst.expressions.{Expression, RandGamma} -import org.apache.spark.sql.functions.{lit, log, signum, when} +import org.apache.spark.sql.functions.{lit, log, signum} import org.apache.spark.sql.{functions => F} import org.apache.spark.util.Utils @@ -15,7 +15,7 @@ object functions { * * @note The function is non-deterministic in general case. */ - def rand_gamma(seed: Long, shape: Double, scale: Double): Column = withExpr(RandGamma(seed, shape, scale)).alias("gamma_random") + def randGamma(seed: Long, shape: Double, scale: Double): Column = withExpr(RandGamma(seed, shape, scale)).alias("gamma_random") /** * Generate a column with independent and identically distributed (i.i.d.) samples @@ -23,7 +23,7 @@ object functions { * * @note The function is non-deterministic in general case. */ - def rand_gamma(seed: Column, shape: Column, scale: Column): Column = withExpr(RandGamma(seed.expr, shape.expr, scale.expr)).alias("gamma_random") + def randGamma(seed: Column, shape: Column, scale: Column): Column = withExpr(RandGamma(seed.expr, shape.expr, scale.expr)).alias("gamma_random") /** * Generate a column with independent and identically distributed (i.i.d.) samples @@ -31,7 +31,7 @@ object functions { * * @note The function is non-deterministic in general case. */ - def rand_gamma(shape: Double, scale: Double): Column = rand_gamma(Utils.random.nextLong, shape, scale) + def randGamma(shape: Double, scale: Double): Column = randGamma(Utils.random.nextLong, shape, scale) /** * Generate a column with independent and identically distributed (i.i.d.) samples @@ -39,17 +39,64 @@ object functions { * * @note The function is non-deterministic in general case. */ - def rand_gamma(shape: Column, scale: Column): Column = rand_gamma(lit(Utils.random.nextLong), shape, scale) + def randGamma(shape: Column, scale: Column): Column = randGamma(lit(Utils.random.nextLong), shape, scale) + + /** + * Generate a column with independent and identically distributed (i.i.d.) samples + * from the Gamma distribution with the specified shape and scale parameters. + * + * @note The function is non-deterministic in general case. + */ + def randGamma(): Column = randGamma(1.0, 1.0) + + /** + * Generate a column with independent and identically distributed (i.i.d.) samples + * from the Gamma distribution with the specified shape and scale parameters. + * + * An alias of `randGamma` + * @note The function is non-deterministic in general case. + */ + def rand_gamma(seed: Long, shape: Double, scale: Double): Column = randGamma(seed, shape, scale) + + /** + * Generate a column with independent and identically distributed (i.i.d.) samples + * from the Gamma distribution with the specified shape and scale parameters. + * An alias of `randGamma` + * @note The function is non-deterministic in general case. + */ + def rand_gamma(seed: Column, shape: Column, scale: Column): Column = randGamma(seed, shape, scale) + + /** + * Generate a column with independent and identically distributed (i.i.d.) samples + * from the Gamma distribution with the specified shape and scale parameters. + * + * An alias of `randGamma` + * + * @note The function is non-deterministic in general case. + */ + def rand_gamma(shape: Double, scale: Double): Column = randGamma(shape, scale) + + /** + * Generate a column with independent and identically distributed (i.i.d.) samples + * from the Gamma distribution with the specified shape and scale parameters. + * + * An alias of `randGamma` + * + * @note The function is non-deterministic in general case. + */ + def rand_gamma(shape: Column, scale: Column): Column = randGamma(shape, scale) /** * Generate a column with independent and identically distributed (i.i.d.) samples * from the Gamma distribution with default parameters (shape = 1.0, scale = 1.0). * + * An alias of `randGamma` + * * @return A column with i.i.d. samples from the default Gamma distribution. * * @note The function is non-deterministic in general case. */ - def rand_gamma(): Column = rand_gamma(1.0, 1.0) + def rand_gamma(): Column = randGamma() /** * Generate a column with independent and identically distributed (i.i.d.) samples @@ -57,7 +104,7 @@ object functions { * * @note The function is non-deterministic in general case. */ - def rand_laplace(seed: Long, mu: Column, beta: Column): Column = { + def randLaplace(seed: Long, mu: Column, beta: Column): Column = { val u = F.rand(seed) - lit(0.5) mu - beta * signum(u) * log(lit(1) - (lit(2) * F.abs(u))) } @@ -68,33 +115,75 @@ object functions { * * @note The function is non-deterministic in general case. */ + def randLaplace(seed: Long, mu: Double, beta: Double): Column = { + randLaplace(seed, lit(mu), lit(beta)) + } + + /** + * Generate a column with independent and identically distributed (i.i.d.) samples + * from the Laplace distribution with the specified location parameter `mu` and scale parameter `beta`. + * + * @note The function is non-deterministic in general case. + */ + def randLaplace(mu: Column, beta: Column): Column = randLaplace(Utils.random.nextLong, mu, beta) + + /** + * Generate a column with independent and identically distributed (i.i.d.) samples + * from the Laplace distribution with the specified location parameter `mu` and scale parameter `beta`. + * + * @note The function is non-deterministic in general case. + */ + def randLaplace(mu: Double, beta: Double): Column = randLaplace(Utils.random.nextLong, mu, beta) + + /** + * Generate a column with independent and identically distributed (i.i.d.) samples + * from the Laplace distribution with default parameters (mu = 0.0, beta = 1.0). + * + * @note The function is non-deterministic in general case. + */ + def randLaplace(): Column = randLaplace(0.0, 1.0) + + /** + * Generate a column with independent and identically distributed (i.i.d.) samples + * from the Laplace distribution with the specified location parameter `mu` and scale parameter `beta`. + * + * An alias of `randLaplace` + * + * @note The function is non-deterministic in general case. + */ def rand_laplace(seed: Long, mu: Double, beta: Double): Column = { - rand_laplace(seed, lit(mu), lit(beta)) + randLaplace(seed, mu, beta) } /** * Generate a column with independent and identically distributed (i.i.d.) samples * from the Laplace distribution with the specified location parameter `mu` and scale parameter `beta`. * + * An alias of `randLaplace` + * * @note The function is non-deterministic in general case. */ - def rand_laplace(mu: Column, beta: Column): Column = rand_laplace(Utils.random.nextLong, mu, beta) + def rand_laplace(mu: Column, beta: Column): Column = randLaplace(mu, beta) /** * Generate a column with independent and identically distributed (i.i.d.) samples * from the Laplace distribution with the specified location parameter `mu` and scale parameter `beta`. * + * An alias of `randLaplace` + * * @note The function is non-deterministic in general case. */ - def rand_laplace(mu: Double, beta: Double): Column = rand_laplace(Utils.random.nextLong, mu, beta) + def rand_laplace(mu: Double, beta: Double): Column = randLaplace(mu, beta) /** * Generate a column with independent and identically distributed (i.i.d.) samples * from the Laplace distribution with default parameters (mu = 0.0, beta = 1.0). * + * An alias of `randLaplace` + * * @note The function is non-deterministic in general case. */ - def rand_laplace(): Column = rand_laplace(0.0, 1.0) + def rand_laplace(): Column = randLaplace() /** * Generate a random column with independent and identically distributed (i.i.d.) samples @@ -102,7 +191,7 @@ object functions { * * @note The function is non-deterministic in general case. */ - def rand_range(seed: Long, min: Column, max: Column): Column = { + def randRange(seed: Long, min: Column, max: Column): Column = { min + (max - min) * F.rand(seed) } @@ -112,28 +201,76 @@ object functions { * * @note The function is non-deterministic in general case. */ + def randRange(seed: Long, min: Int, max: Int): Column = { + randRange(seed, lit(min), lit(max)) + } + + /** + * Generate a random column with independent and identically distributed (i.i.d.) samples + * uniformly distributed in [`min`, `max`). + * + * @note The function is non-deterministic in general case. + */ + def randRange(min: Int, max: Int): Column = { + randRange(Utils.random.nextLong, min, max) + } + + /** + * Generate a random column with independent and identically distributed (i.i.d.) samples + * uniformly distributed in [`min`, `max`). + * + * @note The function is non-deterministic in general case. + */ + def randRange(min: Column, max: Column): Column = { + randRange(Utils.random.nextLong, min, max) + } + + /** + * Generate a random column with independent and identically distributed (i.i.d.) samples + * uniformly distributed in [`min`, `max`). + * + * An alias of `randRange` + * + * @note The function is non-deterministic in general case. + */ + def rand_range(seed: Long, min: Column, max: Column): Column = { + randRange(seed, min, max) + } + + /** + * Generate a random column with independent and identically distributed (i.i.d.) samples + * uniformly distributed in [`min`, `max`). + * + * An alias of `randRange` + * + * @note The function is non-deterministic in general case. + */ def rand_range(seed: Long, min: Int, max: Int): Column = { - rand_range(seed, lit(min), lit(max)) + randRange(seed, min, max) } /** * Generate a random column with independent and identically distributed (i.i.d.) samples * uniformly distributed in [`min`, `max`). * + * An alias of `randRange` + * * @note The function is non-deterministic in general case. */ def rand_range(min: Int, max: Int): Column = { - rand_range(Utils.random.nextLong, min, max) + randRange(min, max) } /** * Generate a random column with independent and identically distributed (i.i.d.) samples * uniformly distributed in [`min`, `max`). * + * An alias of `randRange` + * * @note The function is non-deterministic in general case. */ def rand_range(min: Column, max: Column): Column = { - rand_range(Utils.random.nextLong, min, max) + randRange(min, max) } /**