-
Notifications
You must be signed in to change notification settings - Fork 20
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1 from storm-enroute/master
Update master to be parallel with storm-enroute/coroutines
- Loading branch information
Showing
7 changed files
with
457 additions
and
13 deletions.
There are no files selected for viewing
94 changes: 94 additions & 0 deletions
94
coroutines-extra/src/main/scala/org/coroutines/extra/AsyncAwait.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
package org.coroutines.extra | ||
|
||
|
||
|
||
import org.coroutines._ | ||
import scala.annotation.unchecked.uncheckedVariance | ||
import scala.concurrent.ExecutionContext.Implicits.global | ||
import scala.concurrent._ | ||
import scala.language.experimental.macros | ||
import scala.reflect.macros.whitebox.Context | ||
import scala.util.{ Success, Failure } | ||
|
||
|
||
|
||
object AsyncAwait { | ||
/** Await the result of a future. | ||
* | ||
* When called inside an `async` body, this function will block until its | ||
* associated future completes. | ||
* | ||
* @return A coroutine that yields a tuple. `async` will assign this tuple's | ||
* second element to hold the completed result of the `Future` passed | ||
* into the coroutine. The coroutine will directly return the | ||
* result of the future. | ||
*/ | ||
def await[R]: Future[R] ~~> (Future[R], R) = | ||
coroutine { (awaitedFuture: Future[R]) => | ||
yieldval(awaitedFuture) | ||
var result: R = null.asInstanceOf[R] | ||
awaitedFuture.value match { | ||
case Some(Success(x)) => result = x | ||
case Some(Failure(error)) => throw error | ||
case None => sys.error("Future was not completed") | ||
} | ||
result | ||
} | ||
|
||
/** Calls `body`, blocking on any calls to `await`. | ||
* | ||
* @param body A coroutine to be invoked. | ||
* @return A `Future` wrapping the result of the coroutine. The future fails | ||
* if `body` throws an exception or one of the `await`s takes a failed | ||
* future. | ||
*/ | ||
def asyncCall[Y, R](body: ~~~>[Future[Y], R]): Future[R] = { | ||
val c = call(body()) | ||
val p = Promise[R] | ||
def loop() { | ||
if (!c.resume) { | ||
c.tryResult match { | ||
case Success(result) => p.success(result) | ||
case Failure(exception) => p.failure(exception) | ||
} | ||
} else { | ||
val awaitedFuture = c.value | ||
if (awaitedFuture.isCompleted) { | ||
loop() | ||
} else { | ||
awaitedFuture onComplete { | ||
case _ => loop() | ||
} | ||
} | ||
} | ||
} | ||
Future { loop() } | ||
p.future | ||
} | ||
|
||
/** Wraps `body` inside a coroutine and asynchronously invokes it using `asyncMacro`. | ||
* | ||
* @param body The block of code to wrap inside an asynchronous coroutine. | ||
* @return A `Future` wrapping the result of `body`. | ||
*/ | ||
def async[Y, R](body: =>R): Future[R] = macro asyncMacro[Y, R] | ||
|
||
/** Implements `async`. | ||
* | ||
* Wraps `body` inside a coroutine and calls `asyncCall`. | ||
* | ||
* @param body The function to be wrapped in a coroutine. | ||
* @return A tree that contains an invocation of `asyncCall` on a coroutine | ||
* with `body` as its body. | ||
*/ | ||
def asyncMacro[Y, R](c: Context)(body: c.Tree): c.Tree = { | ||
import c.universe._ | ||
|
||
q""" | ||
val c = coroutine { () => | ||
$body | ||
} | ||
_root_.org.coroutines.extra.AsyncAwait.asyncCall(c) | ||
""" | ||
} | ||
} |
183 changes: 183 additions & 0 deletions
183
coroutines-extra/src/test/scala/org/coroutines/extra/async-await-tests.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,183 @@ | ||
package org.coroutines.extra | ||
|
||
|
||
|
||
import org.coroutines._ | ||
import org.scalatest._ | ||
import scala.annotation.unchecked.uncheckedVariance | ||
import scala.concurrent.ExecutionContext.Implicits.global | ||
import scala.concurrent._ | ||
import scala.concurrent.duration._ | ||
import scala.language.{ reflectiveCalls, postfixOps } | ||
import scala.util.Success | ||
|
||
|
||
|
||
class TestException(msg: String = "") extends Throwable(msg) | ||
|
||
|
||
class AsyncAwaitTest extends FunSuite with Matchers { | ||
import AsyncAwait._ | ||
|
||
/** Source: https://git.io/vorXv | ||
* The use of Async/Await as opposed to pure futures allows this control flow | ||
* to be written more easily. | ||
* The execution blocks when awaiting for the result of `f1`. `f2` only blocks | ||
* after `AsyncAwait.await(f1)` evaluates to `true`. | ||
*/ | ||
test("simple test") { | ||
val future = async { | ||
val f1 = Future(true) | ||
val f2 = Future(42) | ||
if (await(f1)) { | ||
await(f2) | ||
} else { | ||
0 | ||
} | ||
} | ||
assert(Await.result(future, 1 seconds) == 42) | ||
} | ||
|
||
/** Asynchronous blocks of code can be defined either outside of or within any | ||
* part of an `async` block. This allows the user to avoid triggering the | ||
* computation of slow futures until it is necessary. | ||
* For instance, computation will not begin on `innerFuture` until | ||
* `await(trueFuture)` evaluates to true. | ||
*/ | ||
test("nested async blocks") { | ||
val outerFuture = async { | ||
val trueFuture = Future { true } | ||
if (await(trueFuture)) { | ||
val innerFuture = async { | ||
await(Future { 100 } ) | ||
} | ||
await(innerFuture) | ||
} else { | ||
200 | ||
} | ||
} | ||
assert(Await.result(outerFuture, 1 seconds) == 100) | ||
} | ||
|
||
/** Uncaught exceptions thrown inside async blocks cause the associated futures | ||
* to fail. | ||
*/ | ||
test("error handling test 1") { | ||
val errorMessage = "System error!" | ||
val exception = intercept[RuntimeException] { | ||
val future = async { | ||
sys.error(errorMessage) | ||
await(Future("dog")) | ||
} | ||
val result = Await.result(future, 1 seconds) | ||
} | ||
assert(exception.getMessage == errorMessage) | ||
} | ||
|
||
test("error handling test 2") { | ||
val errorMessage = "Internal await error" | ||
val exception = intercept[RuntimeException] { | ||
val future = async { | ||
await(Future { | ||
sys.error(errorMessage) | ||
"Here ya go" | ||
}) | ||
} | ||
val result = Await.result(future, 1 seconds) | ||
} | ||
assert(exception.getMessage == errorMessage) | ||
} | ||
|
||
/** Source: https://git.io/vowde | ||
* Without the closing `()`, the compiler complains about expecting return | ||
* type `Future[Unit]` but finding `Future[Nothing]`. | ||
*/ | ||
test("uncaught exception within async after await") { | ||
val future = async { | ||
await(Future(())) | ||
throw new TestException | ||
() | ||
} | ||
intercept[TestException] { | ||
Await.result(future, 1 seconds) | ||
} | ||
} | ||
|
||
// Source: https://git.io/vowdk | ||
test("await failing future within async") { | ||
val base = Future[Int] { throw new TestException } | ||
val future = async { | ||
val x = await(base) | ||
x * 2 | ||
} | ||
intercept[TestException] { Await.result(future, 1 seconds) } | ||
} | ||
|
||
/** Source: https://git.io/vowdY | ||
* Exceptions thrown inside `await` calls are properly bubbled up. They cause | ||
* the async block's future to fail. | ||
*/ | ||
test("await failing future within async after await") { | ||
val base = Future[Any] { "five!".length } | ||
val future = async { | ||
val a = await(base.mapTo[Int]) | ||
val b = await(Future { (a * 2).toString }.mapTo[Int]) | ||
val c = await(Future { (7 * 2).toString }) | ||
b + "-" + c | ||
} | ||
intercept[ClassCastException] { | ||
Await.result(future, 1 seconds) | ||
} | ||
} | ||
|
||
test("nested failing future within async after await") { | ||
val base = Future[Any] { "five!".length } | ||
val future = async { | ||
val a = await(base.mapTo[Int]) | ||
val b = await( | ||
await(Future((Future { (a * 2).toString }).mapTo[Int]))) | ||
val c = await(Future { (7 * 2).toString }) | ||
b + "-" + c | ||
} | ||
intercept[ClassCastException] { | ||
Await.result(future, 1 seconds) | ||
} | ||
} | ||
|
||
test("await should bubble up exceptions") { | ||
def thrower() = { | ||
throw new TestException | ||
Future(1) | ||
} | ||
|
||
var exceptionFound = false | ||
val future = async { | ||
try { | ||
await(thrower()) | ||
() | ||
} catch { | ||
case _: TestException => exceptionFound = true | ||
} | ||
} | ||
val r = Await.result(future, 1 seconds) | ||
assert(exceptionFound) | ||
} | ||
|
||
test("await should bubble up exceptions from failed futures") { | ||
def failer(): Future[Int] = { | ||
Future.failed(new TestException("kaboom")) | ||
} | ||
|
||
var exceptionFound = false | ||
val future = async { | ||
try { | ||
await(failer()) | ||
() | ||
} catch { | ||
case _: TestException => exceptionFound = true | ||
} | ||
} | ||
val r = Await.result(future, 1 seconds) | ||
assert(exceptionFound) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -159,6 +159,63 @@ object CoroutinesBuild extends MechaRepoBuild { | |
mechaDocsPathKey := "coroutines-common" | ||
) | ||
|
||
val coroutinesExtraSettings = | ||
Defaults.defaultSettings ++ MechaRepoPlugin.defaultSettings ++ Seq( | ||
name := "coroutines-extra", | ||
organization := "com.storm-enroute", | ||
version <<= frameworkVersion, | ||
scalaVersion <<= coroutinesScalaVersion, | ||
crossScalaVersions <<= coroutinesCrossScalaVersions, | ||
libraryDependencies <++= (scalaVersion)(sv => extraDependencies(sv)), | ||
scalacOptions ++= Seq( | ||
"-deprecation", | ||
"-unchecked", | ||
"-optimise", | ||
"-Yinline-warnings" | ||
), | ||
resolvers ++= Seq( | ||
"Sonatype OSS Snapshots" at | ||
"https://oss.sonatype.org/content/repositories/snapshots", | ||
"Sonatype OSS Releases" at | ||
"https://oss.sonatype.org/content/repositories/releases" | ||
), | ||
ivyLoggingLevel in ThisBuild := UpdateLogging.Quiet, | ||
publishMavenStyle := true, | ||
publishTo <<= version { (v: String) => | ||
val nexus = "https://oss.sonatype.org/" | ||
if (v.trim.endsWith("SNAPSHOT")) | ||
Some("snapshots" at nexus + "content/repositories/snapshots") | ||
else | ||
Some("releases" at nexus + "service/local/staging/deploy/maven2") | ||
}, | ||
publishArtifact in Test := false, | ||
pomIncludeRepository := { _ => false }, | ||
pomExtra := | ||
<url>http://storm-enroute.com/</url> | ||
<licenses> | ||
<license> | ||
<name>BSD-style</name> | ||
<url>http://opensource.org/licenses/BSD-3-Clause</url> | ||
<distribution>repo</distribution> | ||
</license> | ||
</licenses> | ||
<scm> | ||
<url>git@github.com:storm-enroute/coroutines.git</url> | ||
<connection>scm:git:git@github.com:storm-enroute/coroutines.git</connection> | ||
</scm> | ||
<developers> | ||
<developer> | ||
<id>axel22</id> | ||
<name>Aleksandar Prokopec</name> | ||
<url>http://axel22.github.com/</url> | ||
</developer> | ||
</developers>, | ||
mechaPublishKey <<= mechaPublishKey.dependsOn(publish), | ||
mechaDocsRepoKey := "[email protected]:storm-enroute/apidocs.git", | ||
mechaDocsBranchKey := "gh-pages", | ||
mechaDocsPathKey := "coroutines-extra" | ||
) | ||
|
||
def commonDependencies(scalaVersion: String) = | ||
CrossVersion.partialVersion(scalaVersion) match { | ||
case Some((2, major)) if major >= 11 => Seq( | ||
|
@@ -169,6 +226,14 @@ object CoroutinesBuild extends MechaRepoBuild { | |
case _ => Nil | ||
} | ||
|
||
def extraDependencies(scalaVersion: String) = | ||
CrossVersion.partialVersion(scalaVersion) match { | ||
case Some((2, major)) if major >= 11 => Seq( | ||
"org.scalatest" % "scalatest_2.11" % "2.2.6" % "test" | ||
) | ||
case _ => Nil | ||
} | ||
|
||
lazy val Benchmarks = config("bench") extend (Test) | ||
|
||
lazy val coroutines: Project = Project( | ||
|
@@ -191,4 +256,12 @@ object CoroutinesBuild extends MechaRepoBuild { | |
settings = coroutinesCommonSettings | ||
) dependsOnSuperRepo | ||
|
||
|
||
lazy val coroutinesExtra: Project = Project( | ||
"coroutines-extra", | ||
file("coroutines-extra"), | ||
settings = coroutinesExtraSettings | ||
) dependsOn( | ||
coroutines % "compile->compile;test->test" | ||
) dependsOnSuperRepo | ||
} |
Oops, something went wrong.