Skip to content

Commit

Permalink
Merge branch 'smithjessk-implement-async' into 'master'
Browse files Browse the repository at this point in the history
Implement async



See merge request !3
  • Loading branch information
axel22 committed Jul 27, 2016
2 parents a7420fa + 2b50dd3 commit be3a4d8
Show file tree
Hide file tree
Showing 7 changed files with 457 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package org.coroutines.extra



import org.coroutines._
import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent._
import scala.language.experimental.macros
import scala.reflect.macros.whitebox.Context
import scala.util.{ Success, Failure }



object AsyncAwait {
/** Await the result of a future.
*
* When called inside an `async` body, this function will block until its
* associated future completes.
*
* @return A coroutine that yields a tuple. `async` will assign this tuple's
* second element to hold the completed result of the `Future` passed
* into the coroutine. The coroutine will directly return the
* result of the future.
*/
def await[R]: Future[R] ~~> (Future[R], R) =
coroutine { (awaitedFuture: Future[R]) =>
yieldval(awaitedFuture)
var result: R = null.asInstanceOf[R]
awaitedFuture.value match {
case Some(Success(x)) => result = x
case Some(Failure(error)) => throw error
case None => sys.error("Future was not completed")
}
result
}

/** Calls `body`, blocking on any calls to `await`.
*
* @param body A coroutine to be invoked.
* @return A `Future` wrapping the result of the coroutine. The future fails
* if `body` throws an exception or one of the `await`s takes a failed
* future.
*/
def asyncCall[Y, R](body: ~~~>[Future[Y], R]): Future[R] = {
val c = call(body())
val p = Promise[R]
def loop() {
if (!c.resume) {
c.tryResult match {
case Success(result) => p.success(result)
case Failure(exception) => p.failure(exception)
}
} else {
val awaitedFuture = c.value
if (awaitedFuture.isCompleted) {
loop()
} else {
awaitedFuture onComplete {
case _ => loop()
}
}
}
}
Future { loop() }
p.future
}

/** Wraps `body` inside a coroutine and asynchronously invokes it using `asyncMacro`.
*
* @param body The block of code to wrap inside an asynchronous coroutine.
* @return A `Future` wrapping the result of `body`.
*/
def async[Y, R](body: =>R): Future[R] = macro asyncMacro[Y, R]

/** Implements `async`.
*
* Wraps `body` inside a coroutine and calls `asyncCall`.
*
* @param body The function to be wrapped in a coroutine.
* @return A tree that contains an invocation of `asyncCall` on a coroutine
* with `body` as its body.
*/
def asyncMacro[Y, R](c: Context)(body: c.Tree): c.Tree = {
import c.universe._

q"""
val c = coroutine { () =>
$body
}
_root_.org.coroutines.extra.AsyncAwait.asyncCall(c)
"""
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package org.coroutines.extra



import org.coroutines._
import org.scalatest._
import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent._
import scala.concurrent.duration._
import scala.language.{ reflectiveCalls, postfixOps }
import scala.util.Success



class TestException(msg: String = "") extends Throwable(msg)


class AsyncAwaitTest extends FunSuite with Matchers {
import AsyncAwait._

/** Source: https://git.io/vorXv
* The use of Async/Await as opposed to pure futures allows this control flow
* to be written more easily.
* The execution blocks when awaiting for the result of `f1`. `f2` only blocks
* after `AsyncAwait.await(f1)` evaluates to `true`.
*/
test("simple test") {
val future = async {
val f1 = Future(true)
val f2 = Future(42)
if (await(f1)) {
await(f2)
} else {
0
}
}
assert(Await.result(future, 1 seconds) == 42)
}

/** Asynchronous blocks of code can be defined either outside of or within any
* part of an `async` block. This allows the user to avoid triggering the
* computation of slow futures until it is necessary.
* For instance, computation will not begin on `innerFuture` until
* `await(trueFuture)` evaluates to true.
*/
test("nested async blocks") {
val outerFuture = async {
val trueFuture = Future { true }
if (await(trueFuture)) {
val innerFuture = async {
await(Future { 100 } )
}
await(innerFuture)
} else {
200
}
}
assert(Await.result(outerFuture, 1 seconds) == 100)
}

/** Uncaught exceptions thrown inside async blocks cause the associated futures
* to fail.
*/
test("error handling test 1") {
val errorMessage = "System error!"
val exception = intercept[RuntimeException] {
val future = async {
sys.error(errorMessage)
await(Future("dog"))
}
val result = Await.result(future, 1 seconds)
}
assert(exception.getMessage == errorMessage)
}

test("error handling test 2") {
val errorMessage = "Internal await error"
val exception = intercept[RuntimeException] {
val future = async {
await(Future {
sys.error(errorMessage)
"Here ya go"
})
}
val result = Await.result(future, 1 seconds)
}
assert(exception.getMessage == errorMessage)
}

/** Source: https://git.io/vowde
* Without the closing `()`, the compiler complains about expecting return
* type `Future[Unit]` but finding `Future[Nothing]`.
*/
test("uncaught exception within async after await") {
val future = async {
await(Future(()))
throw new TestException
()
}
intercept[TestException] {
Await.result(future, 1 seconds)
}
}

// Source: https://git.io/vowdk
test("await failing future within async") {
val base = Future[Int] { throw new TestException }
val future = async {
val x = await(base)
x * 2
}
intercept[TestException] { Await.result(future, 1 seconds) }
}

/** Source: https://git.io/vowdY
* Exceptions thrown inside `await` calls are properly bubbled up. They cause
* the async block's future to fail.
*/
test("await failing future within async after await") {
val base = Future[Any] { "five!".length }
val future = async {
val a = await(base.mapTo[Int])
val b = await(Future { (a * 2).toString }.mapTo[Int])
val c = await(Future { (7 * 2).toString })
b + "-" + c
}
intercept[ClassCastException] {
Await.result(future, 1 seconds)
}
}

test("nested failing future within async after await") {
val base = Future[Any] { "five!".length }
val future = async {
val a = await(base.mapTo[Int])
val b = await(
await(Future((Future { (a * 2).toString }).mapTo[Int])))
val c = await(Future { (7 * 2).toString })
b + "-" + c
}
intercept[ClassCastException] {
Await.result(future, 1 seconds)
}
}

test("await should bubble up exceptions") {
def thrower() = {
throw new TestException
Future(1)
}

var exceptionFound = false
val future = async {
try {
await(thrower())
()
} catch {
case _: TestException => exceptionFound = true
}
}
val r = Await.result(future, 1 seconds)
assert(exceptionFound)
}

test("await should bubble up exceptions from failed futures") {
def failer(): Future[Int] = {
Future.failed(new TestException("kaboom"))
}

var exceptionFound = false
val future = async {
try {
await(failer())
()
} catch {
case _: TestException => exceptionFound = true
}
}
val r = Await.result(future, 1 seconds)
assert(exceptionFound)
}
}
2 changes: 2 additions & 0 deletions dependencies.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ coroutines = [
artifact = ["com.storm-enroute", "scalameter", "0.8-SNAPSHOT", "test;bench"]
}
]

coroutines-extra = []
73 changes: 73 additions & 0 deletions project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,63 @@ object CoroutinesBuild extends MechaRepoBuild {
mechaDocsPathKey := "coroutines-common"
)

val coroutinesExtraSettings =
Defaults.defaultSettings ++ MechaRepoPlugin.defaultSettings ++ Seq(
name := "coroutines-extra",
organization := "com.storm-enroute",
version <<= frameworkVersion,
scalaVersion <<= coroutinesScalaVersion,
crossScalaVersions <<= coroutinesCrossScalaVersions,
libraryDependencies <++= (scalaVersion)(sv => extraDependencies(sv)),
scalacOptions ++= Seq(
"-deprecation",
"-unchecked",
"-optimise",
"-Yinline-warnings"
),
resolvers ++= Seq(
"Sonatype OSS Snapshots" at
"https://oss.sonatype.org/content/repositories/snapshots",
"Sonatype OSS Releases" at
"https://oss.sonatype.org/content/repositories/releases"
),
ivyLoggingLevel in ThisBuild := UpdateLogging.Quiet,
publishMavenStyle := true,
publishTo <<= version { (v: String) =>
val nexus = "https://oss.sonatype.org/"
if (v.trim.endsWith("SNAPSHOT"))
Some("snapshots" at nexus + "content/repositories/snapshots")
else
Some("releases" at nexus + "service/local/staging/deploy/maven2")
},
publishArtifact in Test := false,
pomIncludeRepository := { _ => false },
pomExtra :=
<url>http://storm-enroute.com/</url>
<licenses>
<license>
<name>BSD-style</name>
<url>http://opensource.org/licenses/BSD-3-Clause</url>
<distribution>repo</distribution>
</license>
</licenses>
<scm>
<url>git@github.com:storm-enroute/coroutines.git</url>
<connection>scm:git:git@github.com:storm-enroute/coroutines.git</connection>
</scm>
<developers>
<developer>
<id>axel22</id>
<name>Aleksandar Prokopec</name>
<url>http://axel22.github.com/</url>
</developer>
</developers>,
mechaPublishKey <<= mechaPublishKey.dependsOn(publish),
mechaDocsRepoKey := "[email protected]:storm-enroute/apidocs.git",
mechaDocsBranchKey := "gh-pages",
mechaDocsPathKey := "coroutines-extra"
)

def commonDependencies(scalaVersion: String) =
CrossVersion.partialVersion(scalaVersion) match {
case Some((2, major)) if major >= 11 => Seq(
Expand All @@ -169,6 +226,14 @@ object CoroutinesBuild extends MechaRepoBuild {
case _ => Nil
}

def extraDependencies(scalaVersion: String) =
CrossVersion.partialVersion(scalaVersion) match {
case Some((2, major)) if major >= 11 => Seq(
"org.scalatest" % "scalatest_2.11" % "2.2.6" % "test"
)
case _ => Nil
}

lazy val Benchmarks = config("bench") extend (Test)

lazy val coroutines: Project = Project(
Expand All @@ -191,4 +256,12 @@ object CoroutinesBuild extends MechaRepoBuild {
settings = coroutinesCommonSettings
) dependsOnSuperRepo


lazy val coroutinesExtra: Project = Project(
"coroutines-extra",
file("coroutines-extra"),
settings = coroutinesExtraSettings
) dependsOn(
coroutines % "compile->compile;test->test"
) dependsOnSuperRepo
}
Loading

0 comments on commit be3a4d8

Please sign in to comment.