Skip to content

Suspendable Computations

satabin edited this page Mar 2, 2013 · 3 revisions

tiscaf has a new mechanism for handling HLets with blocking behaviors. The HLet trait's main method is aact(talk: HTalk): Futur[Unit] which executes the actual computation asynchronously (hence the name "aact"). The server will answer to the client only once the future is completed. In the getting started guide, we saw that a simple non suspendable HLet may simply implement the HSimpleLet trait, which exposes the act(talk: HTalk): Unit method.

The new mechanism is more powerful than the old talkExecutor (see the old documentation) in two ways:

  1. a computation may easily be suspended has many times as one wishes,
  2. the user has also access to the session data (not only to the request data) to take his decisions whether to suspend or not the computation.

If one wants to implement blocking behaviors, or suspendable computation (e.g. for publish/subscribe server), one may mixin to HLet the HSuspendable trait , which exposes utility methods to suspend a computation.

trait HSuspendable {

  //------------------- to implement ------------------------

  /** This method is called whenever the suspend method is called.
   *  Implementer may choose how to store the promise whenever the computation
   *  is suspended. */
  protected def onSuspend[T: Manifest](promise: Suspended[T])

  //------------------------ few helpers --------------------

  protected def suspend[T: Manifest]: Future[T] = ...

  protected def suspend[T](resume: T => Unit)(implicit manifest: Manifest[T],
    executionContext: ExecutionContext): Future[Unit] = ...

}

It simply exposes the suspend helper methods. Two different methods exist, one allowing people to use it in for-comprehensions, the other one allowing a CPS way of programming.

A small example may better illustrate how it works. The following shows the two ways of defining a server that holds requests until two requests to '/resume' have been made. In which case the pending requests are resumed with two new identifiers

Using for-Comprehensions

object Test extends App with HServer {
  def ports = Set(8910)
  def apps = List(app)

  object app extends HApp {
    def resolve(req: HReqData) = req.uriPath match {
      case "resume" => Some(resumeLet)
      case path => Some(new SuspendLet(path))
    }
  }

  import scala.collection.mutable._

  val pending = new HashSet[Suspended[Int]] with SynchronizedSet[Suspended[Int]]

  var i = 0
  def next = {
    i += 1
    i
  }

  object resumeLet extends HSimpleLet {
    def act(talk: HTalk) {
      val toRemove = pending map { s =>
        s.resume(next)
        s
      }
      pending --= toRemove
      talk.setContentLength(3).write("ok\n")
    }
  }

  class SuspendLet(path: String) extends HLet with HSuspendable {

    def aact(talk: HTalk)(implicit executionContext: ExecutionContext) = {

      println("path: " + path)

      for {
        i <- suspend[Int]
        j <- suspend[Int]
        result = path + "#" + i + "#" + j + "\n"
      } yield talk.setContentLength(result.size).write(result)

    }

    def onSuspend[T: Manifest](suspended: Suspended[T]) {
      suspended match {
        case s: Suspended[Int] if manifest[T] <:< manifest[Int] =>
          pending += s
        case _ => // ignore
      }
    }

  }

  start
}

Using CPS

object Test extends App with HServer {
  def ports = Set(8910)
  def apps = List(app)

  object app extends HApp {
    def resolve(req: HReqData) = req.uriPath match {
      case "resume" => Some(resumeLet)
      case path => Some(new SuspendLet(path))
    }
  }

  import scala.collection.mutable._

  val pending = new HashSet[Suspended[Int]] with SynchronizedSet[Suspended[Int]]

  var i = 0
  def next = {
    i += 1
    i
  }

  object resumeLet extends HSimpleLet {
    def act(talk: HTalk) {
      val toRemove = pending map { s =>
        s.resume(next)
        s
      }
      pending --= toRemove
      talk.setContentLength(3).write("ok\n")
    }
  }

  class SuspendLet(path: String) extends HLet with HSuspendable {

    def aact(talk: HTalk)(implicit executionContext: ExecutionContext) = {

      println("path: " + path)

      suspend[Int] { i =>
        suspend[Int] { j =>
          val result = path + "#" + i + "#" + j + "\n"
          talk.setContentLength(result.size).write(result)
        }
      }
    }

    def onSuspend[T: Manifest](suspended: Suspended[T]) {
      suspended match {
        case s: Suspended[Int] if manifest[T] <:< manifest[Int] =>
          pending += s
        case _ => // ignore
      }
    }

  }

  start
}
Clone this wiki locally