Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added semantic check for calls to s.bag() from a UDF of s.updateWith* #85

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 30 additions & 8 deletions emma-common/src/main/scala/eu/stratosphere/emma/api/Stateful.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ object Stateful {
*/
sealed class Bag[S <: Identity[K], K] private(val state: Map[K, S]) {

var updateInProgress = false // This is for detecting when the user calls .bag() from the UDF of updateWith*

/**
* Private constructor that transforms a DataBag into a stateful Bag.
*
Expand All @@ -39,6 +41,7 @@ object Stateful {
}})
}


/**
* Computes and flattens a delta without additional inputs. The UDF `f` is allowed to modify the state element `s`,
* however with the restriction that the modification should not affect it's identity.
Expand All @@ -47,11 +50,12 @@ object Stateful {
* @tparam B The type of the output elements.
* @return The flattened result of all update invocations.
*/
def updateWithZero[B](f: S => DataBag[B]): DataBag[B] = for {
s <- DataBag(state.values.toList)
b <- f(s)
} yield b

def updateWithZero[B](f: S => DataBag[B]): DataBag[B] = checkNoBagAccess {
for {
s <- DataBag(state.values.toList)
b <- f(s)
} yield b
}

/**
* Computes and flattens the `leftJoin` with `updates` which passes for each update element `u` it's
Expand All @@ -66,11 +70,13 @@ object Stateful {
* @return The flattened result of all update invocations.
*/
def updateWithOne[A, B](updates: DataBag[A])
(k: A => K, f: (S, A) => DataBag[B]): DataBag[B] = for {
(k: A => K, f: (S, A) => DataBag[B]): DataBag[B] = checkNoBagAccess {
for {
a <- updates
s <- DataBag(state.get(k(a)).toList)
b <- f(s, a)
} yield b
}

/**
* Computes and flattens the `nestJoin(p, f)` which nests the current state elements `s` against their
Expand Down Expand Up @@ -99,17 +105,33 @@ object Stateful {
* @return The flattened collection
*/
def updateWithMany[A, B](updates: DataBag[A])
(k: A => K, f: (S, DataBag[A]) => DataBag[B]): DataBag[B] = for {
(k: A => K, f: (S, DataBag[A]) => DataBag[B]): DataBag[B] = checkNoBagAccess {
for {
s <- DataBag(state.values.toList)
b <- f(s, updates withFilter { k(_) == s.identity })
} yield b
}


/**
* Converts the stateful bag into an immutable Bag.
*
* @return A Databag containing the set of elements contained in this stateful Bag.
*/
def bag(): DataBag[S] = DataBag((for (x <- state) yield kryo.copy(x._2)).toSeq)
def bag(): DataBag[S] = {
if (updateInProgress)
throw new StatefulAccessedFromUdfException()

DataBag((for (x <- state) yield kryo.copy(x._2)).toSeq)
}



private def checkNoBagAccess[T](body: => T): T = {
updateInProgress = true
val result = body
updateInProgress = false
result
}
}
}
14 changes: 14 additions & 0 deletions emma-common/src/main/scala/eu/stratosphere/emma/api/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,18 @@ package object api {

implicit def materializeCSVConverters[T]: CSVConverters[T] =
macro ConvertersMacros.materializeCSVConverters[T]

// -----------------------------------------------------
// exceptions
// -----------------------------------------------------

class InvalidProgramException(message: String = null, cause: Throwable = null) extends Exception(message, cause) {}

class StatefulAccessedFromUdfException()
extends InvalidProgramException("""
| Called .bag() from the UDF of an updateWith* on the same stateful that is
| "being updated. A possible fix is to save the result of the .bag() call to a DataBag before the
| "updateWith* call, and use that one in place of this .bag() call.""".stripMargin)
// Note: Don't catch this!
// This should terminate the program, because the stateful will be in an invalid state after throwing this.
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import org.scalatest.junit.JUnitRunner
class StatefulNativeTest extends FlatSpec with Matchers with BeforeAndAfter {

"Stateful native" should "do deep copies" in {
import eu.stratosphere.emma.api.StatefulNativeTest.Foo

val b1 = DataBag(Seq(Foo(1, 1)))

Expand All @@ -28,7 +29,10 @@ class StatefulNativeTest extends FlatSpec with Matchers with BeforeAndAfter {
}
}

object StatefulNativeTest {

case class Foo(@id s: Int, var n: Int) extends Identity[Int] {
override def identity = n
}

case class Foo(@id s: Int, var n: Int) extends Identity[Int] {
override def identity = n
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package eu.stratosphere.emma.macros.program

import eu.stratosphere.emma.api.StatefulAccessedFromUdfException
import eu.stratosphere.emma.macros.program.comprehension.ComprehensionAnalysis

private[emma] trait SemanticChecks extends ComprehensionAnalysis {
import universe._

def doSemanticChecks(t: Tree): Unit =
checkForStatefulAccessedFromUdf(t)

/**
* Checks that .bag() is not called from the UDF of an updateWith* on the same stateful that is being updated.
*/
def checkForStatefulAccessedFromUdf(t: Tree): Unit =
t.traverse {
case q"${updateWith @ Select(ident @ Ident(_), _)}[$_]($udf)"
if api.updateWith contains updateWith.symbol =>
if ((udf: Tree).freeTerms.map(x => x.asInstanceOf[Symbol]) contains ident.symbol)
throw new StatefulAccessedFromUdfException()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import scala.language.existentials
import scala.language.experimental.macros
import scala.reflect.macros.blackbox

class WorkflowMacros(val c: blackbox.Context) extends ControlFlow with Comprehension {
class WorkflowMacros(val c: blackbox.Context) extends ControlFlow with Comprehension with SemanticChecks {
import universe._

val ENGINE = typeOf[Engine]
Expand All @@ -18,6 +18,8 @@ class WorkflowMacros(val c: blackbox.Context) extends ControlFlow with Comprehen
// TODO: Add more comprehensive ScalaDoc
def parallelize[T: c.WeakTypeTag](e: Expr[T]) = {

doSemanticChecks(e.tree)

// Create a normalized version of the original tree
val normalized = normalize(e.tree)

Expand Down Expand Up @@ -73,6 +75,8 @@ class WorkflowMacros(val c: blackbox.Context) extends ControlFlow with Comprehen
// TODO: Add more comprehensive ScalaDoc
def comprehend[T: c.WeakTypeTag](e: Expr[T]) = {

doSemanticChecks(e.tree)

// Create a normalized version of the original tree
val normalized = normalize(e.tree)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ private[emma] trait ComprehensionModel extends BlackBoxUtil {
) ++ apply.alternatives

val monadic = Set(map, flatMap, withFilter)

val updateWith = Set(updateWithZero, updateWithOne, updateWithMany)
}

// Type constructors
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package eu.stratosphere.emma.macros

import eu.stratosphere.emma.api._
import eu.stratosphere.emma.api.model._
import org.junit.runner.RunWith
import org.scalatest.{Matchers, FlatSpec}
import org.scalatest.junit.JUnitRunner
import org.scalatest.prop.PropertyChecks

@RunWith(classOf[JUnitRunner])
class SemanticChecksTest extends FlatSpec with PropertyChecks with Matchers {
import CheckForStatefulAccessedFromUdfTest.Foo

"The updateWith* functions" should "throw an exception when the stateful is accessed from the updateWith* UDF" in {
val b = DataBag(Seq(Foo(1, 1)))

// These should throw exceptions:

intercept[StatefulAccessedFromUdfException] {
val s = stateful[Foo, Int](b)
s.updateWithZero(x => s.bag())
}

val us = DataBag(Seq(Foo(1, 2)))

intercept[StatefulAccessedFromUdfException] {
val s = stateful[Foo, Int](b)
s.updateWithOne(us)(_.s, (x,y) => s.bag())
}
intercept[StatefulAccessedFromUdfException] {
val s = stateful[Foo, Int](b)
s.updateWithMany(us)(_.s, (x,y) => s.bag())
}

// And these should not throw exceptions:
val s = stateful[Foo, Int](b)
s.updateWithZero(x => DataBag())
s.updateWithOne(us)(_.s, (x, y) => DataBag())
s.updateWithMany(us)(_.s, (x, y) => DataBag())
}

"parallelize" should "check for the stateful being accessed from the updateWith* UDF" in {
val b = DataBag(Seq(Foo(1, 1)))

"""emma.parallelize {
val s = stateful[Foo, Int](b)
s.updateWithZero(x => s.bag())
}""" shouldNot compile

"""emma.parallelize {
val s = stateful[Foo, Int](b)
s.updateWithOne(us)(_.s, (x,y) => s.bag())
}""" shouldNot compile

"""emma.parallelize {
val s = stateful[Foo, Int](b)
s.updateWithMany(us)(_.s, (x,y) => s.bag())
}""" shouldNot compile

// This should compile
val dummy = emma.parallelize {
val s = stateful[Foo, Int](b)
val us = DataBag(Seq(Foo(1, 2)))
s.updateWithZero(x => DataBag())
s.updateWithOne(us)(_.s, (x, y) => DataBag())
s.updateWithMany(us)(_.s, (x, y) => DataBag())
}
}
}

object CheckForStatefulAccessedFromUdfTest {
case class Foo(@id s: Int, var n: Int) extends Identity[Int] {
override def identity = n
}
}