Skip to content

wilkerlucio/wsscode-async

Repository files navigation

core.async helpers

async cljdoc Test

Core.async is the standard way to handle async features in Clojure and ClojureScript programs.

Although core.async is built upon CSP, often (specially in CLJS) is desired to have something that’s more like Promises/Futures.

Core.async provides a promise-chan, which is a channel that has a promise-like semantic: after realization, any read on it will keep returning the same value. That’s helpful but this doesn’t cover the problem of error propagation, in a promise system it’s expected that errors can flow up to be captured by some higher level code.

This library provide some helpers to deal with this problem, and also:

  1. helpers to integrate with Javascript Promises in ClojureScript environments.

  2. helpers to write tests using core.async and cljs.test.

  3. serialized event callback handling

  4. ID based async callbacks

  5. helpers for timeouts using error handling

  6. helpers to pulling retry operations

Setup

This library uses separated namespaces for Clojure and ClojureScript, when using the Clojure side, use the namespace com.wsscode.async.async-clj, and for ClojureScript use com.wsscode.async.async-cljs. Unless pointed otherwise, the features on both namespaces are the same.

If you want to use this with Clojure Common files, you can use the require like this:

(ns my-ns
  (:require [#?(:clj  com.wsscode.async.async-clj
                :cljs com.wsscode.async.async-cljs)
             :as wa
             :refer [go-promise <?]]))

Error propagation

To deal with error propagation, the trick is to return the error object as the channel value, but also throw that error when reading the channel. Let’s illustrate that with an example:

(ns my-ns
  (:require [com.wsscode.async.async-clj :refer [go-promise <?]))

(defn async-divide [x y]
  ; (1)
  (go-promise
    (/ x y)))

(defn run [d]
  (go-promise
    (try
      ; (2)
      (<? (async-divide 6 d))
      (catch Throwable _
        "ERROR"))))

(comment
  (<!! (run 2)) ; => 3 (3)
  (<!! (run 0)) ; => "ERROR" (4)
  )
  1. go-promise is similar to go, but will return a promise-channel, so in case this result gets passed to multiple readers, all of them will get the realized value or error. This also wraps the block in a try/catch, so if some exception happens it will get returned as the channel value.

  2. <? works like <!, but once it reads a value, it will check if it’s an error, and if so it will throw that error, propagating it up on the chain.

  3. propagate value back

  4. error propagated from async-divide trying to divide by zero

By following this pattern of error capture and send, this system ends up working in the same ways you would expect a promise, all while still in the same go blocks, making it compatible with standard core.async functionality. Later we will also talk about how to integrate with Javascript Promises in this system.

Nil returns

Another difference when using go-promise is that different from regular go, you can return nil as a value. Since the promise will always return the same value, a nil is not ambiguous.

ℹ️
In the implementation side, go-promise check if the value returned is nil, and if so it just closes the channel, making it an effective nil value.

Extra takers

We just saw the helper <? to do a take and check for errors, here is a list of other take helpers provided by this library:

  • <? - take and throw errors

  • <?! (clj only) - take and throw errors blocking on thread

  • <!p (cljs only) - take from JS Promise

  • <!maybe - if param is a channel, take from it, otherwise return its value

  • <!!maybe (clj only) - like <!maybe but blocking the thread

  • <?maybe - take and throw errors, return value if it’s not a channel

  • <?!maybe (clj only) - like <?maybe but blocking the thread

💡
in ClojureScript <?maybe can also take from Promises

Error catching on streaming processes

If you try to use try/catch in a loop, you will notice that it doesn’t work. To go around this limitation this library provides the helper go-try-stream, which will do the necessary setup to make this work.

Example usage:

(wa/go-try-stream [value c]
  (swap! vals conj value)
  (catch :default e
    (swap! vals conj (ex-message e))))

Javascript Promises

While working in Javascript it’s common to need to handle Promises, to help with this there is a macro in this library that enables the read of JS promises as if they were core.async channels, the <!p helper:

(ns my-ns
  (:require [com.wsscode.async.async-cljs :refer [go-promise <? <!p]))

; (1)
(go-promise
  (-> (js/fetch "/") <!p
      (.text) <!p
      js/console.log))
  1. Read the index text of the current domain, note we are waiting for two different promises in this example, the first one for the fetch headers and the second to get the body text.

ℹ️
the way <!p works is by first converting the Promise into a core.async channel and them read on that channel, for core.async sake it’s channels all the way.

Note that this strategy allows the mixing of both core.async channels and promises in the same system, you can both park for channels or promises.

Javascript Async tests

Dealing with async tests in cljs.test can be annoying, the core doesn’t have any integration with core.async, neither it handles common problems like timing out a test. This library provides a helper called deftest-async that aims to facilitate the tests of async core using core.async. Example usage:

(ns com.wsscode.async.async-cljs-test
  (:require [clojure.test :refer [is are run-tests async testing deftest]]
            [com.wsscode.async.async-cljs :as wa :refer [deftest-async <! go]]))

(deftest-async my-test
  (is (= "foo" (<! (go "foo")))))

This macro will do a couple of things:

  1. It will wrap the body in a go-promise block, allowing the use of parking operations

  2. Try/catch this block, if any error happens (sync or async) that generates a test case that will fail with that error

  3. Add a 2 seconds timeout, if the go block doesn’t return in this time it will cancel and fail the test

You can configure the timeout duration, example:

(ns com.wsscode.async.async-cljs-test
  (:require [clojure.test :refer [is are run-tests async testing deftest]]
            [com.wsscode.async.async-cljs :as wa :refer [deftest-async <! go]]))

(deftest-async my-test
  {::wa/timeout 5000} ; 5 seconds timeout
  (is (= "foo" (<! (go "foo")))))
💡
if you want to use this helper with a different test constructor (from Workspaces or Devcards for example) you can use the wa/async-test helper instead

Channel timeout

Timeout is something that we often need, since default core.async doesn’t has a concept of errors, its hard to abstract that away, and requires some tedious code using alts!.

This library provides the timeout-chan helper that returns a new channel that will get the result, or a timeout error after the timeout time specified.

(ns com.wsscode.async.async-cljs-test
  (:require [clojure.test :refer [is are run-tests async testing deftest]]
            [cljs.core.async :as async]
            [com.wsscode.async.async-cljs :as wa :refer [deftest-async <! go]]))

(wa/deftest-async timeout-chan-test
  (try
    ; 100ms timeout
    (wa/<? (wa/timeout-chan 100 (go (<! (async/timeout 500)))))
    (is (= "Timeout was expected" true))
    (catch :default _
      (is (= true true)))))

Pulling retry

Pulling retry provides an easy helper to keep retrying some operation until the value is something expected.

This function takes the following options and then the body:

  • ::wa/done? - a fn that should return true if the value of expression is expected and the retry should stop

  • ::wa/retry-ms (default 10ms) - time to wait before trying the operation again

  • ::wa/timeout (default 2s) - Max time to wait for operation, bail after

Body can be sync or async.

(ns com.wsscode.async.async-cljs-test
  (:require [clojure.test :refer [is are run-tests async testing deftest]]
            [cljs.core.async :as async]
            [com.wsscode.async.async-cljs :as wa :refer [deftest-async <! go]]))

(wa/deftest-async pulling-retry-test
  (testing "sync body"
    (let [x (atom nil)]
      (go
        (<! (async/timeout 300))
        (reset! x 10))
      (is (= (<! (wa/pulling-retry {::wa/done? number?} @x)) 10))))

  (testing "async body"
    (let [x (atom nil)]
      (go
        (<! (async/timeout 300))
        (reset! x 10))
      (is (= (<! (wa/pulling-retry {::wa/done? number?} (go @x))) 10))))

  (testing "quick done syntax"
    (let [x (atom nil)]
      (go
        (<! (async/timeout 300))
        (reset! x 10))
      (is (= (<! (wa/pulling-retry number? @x)) 10))))

  (testing "stop after timeout"
    (let [x (atom 0)]
      (<! (wa/pulling-retry {::wa/done?   neg?
                             ::wa/timeout 100}
            (go
              (<! (async/timeout 200))
              (swap! x + 10)
              @x)))

      (<! (async/timeout 500))

      (is (= @x 10)))))

Serialised event callback handling

This library provides a helper to serialize async event callbacks. By default, if you do event handling like this:

(.on some-object "event"
  (fn handler-fn [e]
    (go
      (-> (do-operation e) <!
          (do-more) <!))))

In case many events come rapidly, the callbacks will run in between each other, a lot of times that’s not a problem, but if you need sequencing then this may get you in trouble.

To handle this you can use the event-queue! helper:

(.on some-object "event"
  (wap/event-queue!
    (fn handler-fn [e]
      (go
        (-> (do-operation e) <!
            (do-more) <!)))))
ℹ️
wap is alias for com.wsscode.async.processing

The event-queue! returns a new callback function that instead of calling handler directly, it will add the event to a queue for processing, in case the handler returns a channel, that channel will be awaited before processing the next event, this a very easy and quick way to ensure serialisation.

By default the queue will use one (async/chan (async/dropping-buffer 1024)). You can override it with:

(.on some-object "event"
  (wap/event-queue! {::wap/channel (async/chan (async/sliding-buffer 1024))}
    (fn handler-fn [e]
      (go
        (-> (do-operation e) <!
            (do-more) <!)))))

Killing the process loop

One way to stop the processing from running is to send a custom channel, and when you want to stop processing you close it. Example:

(let [ch (async/chan (async/sliding-buffer 1024))]
  (.on some-object "event"
    (wap/event-queue! {::wap/channel ch}
      (fn handler-fn [e]
        (go
          (-> (do-operation e) <!
              (do-more) <!)))))

  ; later in the future
  (async/close! ch))

ID based callback mechanism

If you use something like Websockets for communication, depending on the library you are using they may or may not include some way to handle callback events. To handle this (or any other case were message callbacks are not a native option) this library provides some helpers.

The idea is to send a message providing some ID, and then wait for a response message to come, the response will include the same ID from the request, so they match.

This process happens in three main steps:

  1. Once we send a message requiring a callback, create something to get notified once the response arrives

  2. If you read a message that wants a response, create and send the response message

  3. Listen to message responses on the event entry point

This mechanism assumes your message are maps.

To implement 1, you create a function that wraps whatever your transmit function is:

(defn send-message! [msg]
  (original-send-message! msg)
  ; this will check if the message has a request-id, and if so will create a channel
  ; that will have data available once the message is replied
  (wap/await! msg))

Then, wrap your read side with the capture-response! helper:

(defn handle-message [msg]
  ; this will fire the handler when the message contains ::wap/response-id, otherwise
  ; it lets the message flow
  (if-not (wap/capture-response! msg)
    (original-handle-msg msg)))

In your handle, to reply a message, to this:

(defn some-handler [msg]
  (send-message! (wap/reply-message msg "reply value")))

And finally, to issue a request and wait for the callback:

(go
  (let [res (<? (send-message! (assoc msg ::wap/request-id (wap/random-request-id))))]
    (print "Response: " res)))
ℹ️
The await! helper has a built-in timeout mechanism, the default wait time is 5s.

API

There are other minor helpers not mentioned in this document, but they all have documentation on the functions, to check it out see the cljdoc page of this library.