core.async is the standard way to handle async features in Clojure and ClojureScript programs.
Although core.async is built upon CSP, it is often desirable (especially in CLJS) to have something closer to Promises/Futures.
core.async provides a promise-chan, which is a channel with promise-like semantics:
after realization, any read on it keeps returning the same value. That's helpful, but
it doesn't cover error propagation: in a promise system, errors are expected
to flow up so that some higher-level code can capture them.
This library provides some helpers to deal with this problem, and also:
- helpers to integrate with JavaScript Promises in ClojureScript environments
- helpers to write tests using core.async and cljs.test
- serialized event callback handling
- ID-based async callbacks
- helpers for timeouts using error handling
- helpers for pulling retry operations
This library uses separate namespaces for Clojure and ClojureScript. On the
Clojure side, use the namespace com.wsscode.async.async-clj, and for ClojureScript
use com.wsscode.async.async-cljs. Unless noted otherwise, the features in both
namespaces are the same.
If you want to use this in Clojure Common (.cljc) files, you can use a require like this:
    (ns my-ns
      (:require [#?(:clj  com.wsscode.async.async-clj
                    :cljs com.wsscode.async.async-cljs)
                 :as wa
                 :refer [go-promise <?]]))
To deal with error propagation, the trick is to return the error object as the channel value, but also throw that error when reading the channel. Let’s illustrate that with an example:
    (ns my-ns
      (:require [clojure.core.async :refer [<!!]]
                [com.wsscode.async.async-clj :refer [go-promise <?]]))

    (defn async-divide [x y]
      ; (1)
      (go-promise
        (/ x y)))

    (defn run [d]
      (go-promise
        (try
          ; (2)
          (<? (async-divide 6 d))
          (catch Throwable _
            "ERROR"))))

    (comment
      (<!! (run 2)) ; => 3 (3)
      (<!! (run 0)) ; => "ERROR" (4)
      )
(1) go-promise is similar to go, but returns a promise channel, so if the result is passed to multiple readers, all of them get the realized value or error. It also wraps the block in a try/catch, so if an exception happens it is returned as the channel value.
(2) <? works like <!, but once it reads a value it checks whether that value is an error, and if so throws it, propagating it up the chain.
(3) the value is propagated back
(4) the error is propagated from async-divide trying to divide by zero
By following this pattern of capturing errors and sending them as values, the system ends up
working the way you would expect from promises, all while staying inside regular go blocks,
which keeps it compatible with standard core.async functionality. Later we will also talk about how
to integrate with JavaScript Promises in this system.
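To make the pattern concrete, here is a minimal sketch of the same idea written with plain core.async only. It is not this library's implementation: `throw-if-error` and `divide-chan` are hypothetical names used for illustration, and a regular `go` channel is used instead of a promise channel.

```clojure
(ns example.error-values
  (:require [clojure.core.async :refer [go <! <!!]]))

(defn throw-if-error
  "Hypothetical helper: rethrows v when it is a Throwable, otherwise returns v."
  [v]
  (if (instance? Throwable v)
    (throw v)
    v))

(defn divide-chan
  "Divides inside a go block; an exception becomes the channel value."
  [x y]
  (go
    (try
      (/ x y)
      (catch Throwable e
        e))))

(defn run [d]
  (go
    (try
      ;; take the value, then rethrow it if it turned out to be an error
      (throw-if-error (<! (divide-chan 6 d)))
      (catch Throwable _
        "ERROR"))))

(comment
  (<!! (run 2)) ; => 3
  (<!! (run 0)) ; => "ERROR"
  )
```

The helpers in this library package these two halves (capture on the producing side, rethrow on the reading side) so you do not have to repeat them by hand.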
Another difference is that, unlike regular go, go-promise lets you
return nil as a value. Since the promise will always return the same value, a nil
is not ambiguous.
ℹ️ On the implementation side, go-promise checks whether the returned value is nil, and if so it just closes the channel, making it effectively a nil value.
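The reason a closed promise channel works as a nil value can be seen with core.async alone. The sketch below uses only `promise-chan`, `close!`, `>!!` and `<!!` from core.async; `read-twice` is a hypothetical helper for the demonstration.

```clojure
(ns example.nil-promise
  (:require [clojure.core.async :as async :refer [<!! >!!]]))

(defn read-twice
  "Takes from ch two times and returns both results."
  [ch]
  [(<!! ch) (<!! ch)])

(comment
  ;; a realized promise channel keeps returning its value
  (let [ch (async/promise-chan)]
    (>!! ch 42)
    (read-twice ch)) ; => [42 42]

  ;; a closed, never-realized promise channel keeps returning nil
  (let [ch (async/promise-chan)]
    (async/close! ch)
    (read-twice ch)) ; => [nil nil]
  )
```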
We just saw the helper <? to do a take and check for errors. Here is a list of other
take helpers provided by this library:
- <? - take and throw errors
- <?! (clj only) - take and throw errors, blocking the thread
- <!p (cljs only) - take from a JS Promise
- <!maybe - if the param is a channel, take from it; otherwise return its value
- <!!maybe (clj only) - like <!maybe but blocking the thread
- <?maybe - take and throw errors; return the value if it's not a channel
- <?!maybe (clj only) - like <?maybe but blocking the thread
💡 In ClojureScript, <?maybe can also take from Promises.
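As a rough illustration of what a "maybe" take does, the sketch below checks whether a value is a channel and only then takes from it. This is an assumption about the general approach, not the library's source: `chan?` and `take-maybe!!` are hypothetical names, and the check relies on the `ReadPort` protocol from core.async's implementation namespace.

```clojure
(ns example.maybe-take
  (:require [clojure.core.async :refer [go <!!]]
            [clojure.core.async.impl.protocols :as async.prot]))

(defn chan?
  "True when x can be read from like a core.async channel."
  [x]
  (satisfies? async.prot/ReadPort x))

(defn take-maybe!!
  "Hypothetical blocking helper: takes from x when it is a channel,
  otherwise returns x unchanged."
  [x]
  (if (chan? x)
    (<!! x)
    x))

(comment
  (take-maybe!! 42)      ; => 42
  (take-maybe!! (go 42)) ; => 42
  )
```

This kind of helper is handy when a function may return either a plain value or a channel, since callers can treat both cases the same way.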
If you try to use try/catch in a loop, you will notice that it doesn't work
(recur cannot cross a try boundary). To work around this limitation, this library
provides the helper go-try-stream, which does the necessary setup to make this work.
Example usage:
    (wa/go-try-stream [value c]
      (swap! vals conj value)
      (catch :default e
        (swap! vals conj (ex-message e))))
When working in JavaScript it's common to need to handle Promises. To help with this,
the library has a macro that lets you read JS promises as if they
were core.async channels, the <!p helper:
    (ns my-ns
      (:require [com.wsscode.async.async-cljs :refer [go-promise <? <!p]]))

    ; (1)
    (go-promise
      (-> (js/fetch "/") <!p
          (.text) <!p
          js/console.log))
(1) Read the index text of the current domain. Note that we wait on two different promises in this example: the first for the fetch headers and the second for the body text.
ℹ️ <!p works by first converting the Promise into a core.async channel and then reading from that channel; as far as core.async is concerned, it's channels all the way.
Note that this strategy allows mixing core.async channels and promises
in the same system: you can park on either channels or promises.
Dealing with async tests in cljs.test can be annoying: the core has no integration
with core.async, nor does it handle common problems like timing out a test. This library
provides a helper called deftest-async that aims to facilitate testing async code
using core.async. Example usage:
    (ns com.wsscode.async.async-cljs-test
      (:require [clojure.test :refer [is are run-tests async testing deftest]]
                [com.wsscode.async.async-cljs :as wa :refer [deftest-async <! go]]))

    (deftest-async my-test
      (is (= "foo" (<! (go "foo")))))
This macro will do a couple of things:
- It wraps the body in a go-promise block, allowing the use of parking operations
- It wraps this block in a try/catch; if any error happens (sync or async), it generates a test case that fails with that error
- It adds a 2 second timeout; if the go block doesn't return in this time, the test is cancelled and fails
You can configure the timeout duration, example:
    (ns com.wsscode.async.async-cljs-test
      (:require [clojure.test :refer [is are run-tests async testing deftest]]
                [com.wsscode.async.async-cljs :as wa :refer [deftest-async <! go]]))

    (deftest-async my-test
      {::wa/timeout 5000} ; 5 seconds timeout
      (is (= "foo" (<! (go "foo")))))
💡 If you want to use this helper with a different test constructor (from Workspaces or Devcards, for example), you can use the wa/async-test helper instead.
Timeouts are something we often need. Since core.async by default doesn't have a concept
of errors, it's hard to abstract them away, and doing so requires some tedious code using alts!.
This library provides the timeout-chan helper, which returns a new channel that will
receive either the result or a timeout error once the specified time has elapsed.
    (ns com.wsscode.async.async-cljs-test
      (:require [clojure.test :refer [is are run-tests async testing deftest]]
                [cljs.core.async :as async]
                [com.wsscode.async.async-cljs :as wa :refer [deftest-async <! go]]))

    (wa/deftest-async timeout-chan-test
      (try
        ; 100ms timeout
        (wa/<? (wa/timeout-chan 100 (go (<! (async/timeout 500)))))
        (is (= "Timeout was expected" true))
        (catch :default _
          (is (= true true)))))
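For comparison, this is roughly what the manual alts! approach looks like in plain core.async on the JVM. It is only a sketch of the general technique, not the code behind timeout-chan: `with-timeout` is a hypothetical name, and the shape of the error (an `ex-info` with a `:timeout-ms` key) is an assumption made for the example.

```clojure
(ns example.manual-timeout
  (:require [clojure.core.async :as async :refer [go <!!]]))

(defn with-timeout
  "Hypothetical helper: returns a channel that yields the value from ch,
  or an ex-info error value if ms milliseconds pass first."
  [ms ch]
  (go
    (let [timer    (async/timeout ms)
          [v port] (async/alts! [ch timer])]
      (if (identical? port timer)
        ;; the timer won the race: return the error as the channel value
        (ex-info "Timeout" {:timeout-ms ms})
        v))))

(comment
  ;; the source finishes in time
  (<!! (with-timeout 500 (go :done))) ; => :done

  ;; the source is too slow, so an error value comes back instead
  (ex-data (<!! (with-timeout 50 (async/timeout 500)))) ; => {:timeout-ms 50}
  )
```

Because the timeout is returned as an error value, a reader that rethrows errors on take (as <? does) can handle it with an ordinary try/catch.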
Pulling retry provides an easy helper to keep retrying some operation until the value is something expected.
It takes the following options, followed by the body:
- ::wa/done? - a fn that should return true if the value of the expression is as expected and the retry should stop
- ::wa/retry-ms (default 10ms) - time to wait before trying the operation again
- ::wa/timeout (default 2s) - max time to wait for the operation before bailing
The body can be sync or async.
    (ns com.wsscode.async.async-cljs-test
      (:require [clojure.test :refer [is are run-tests async testing deftest]]
                [cljs.core.async :as async]
                [com.wsscode.async.async-cljs :as wa :refer [deftest-async <! go]]))

    (wa/deftest-async pulling-retry-test
      (testing "sync body"
        (let [x (atom nil)]
          (go
            (<! (async/timeout 300))
            (reset! x 10))
          (is (= (<! (wa/pulling-retry {::wa/done? number?} @x)) 10))))

      (testing "async body"
        (let [x (atom nil)]
          (go
            (<! (async/timeout 300))
            (reset! x 10))
          (is (= (<! (wa/pulling-retry {::wa/done? number?} (go @x))) 10))))

      (testing "quick done syntax"
        (let [x (atom nil)]
          (go
            (<! (async/timeout 300))
            (reset! x 10))
          (is (= (<! (wa/pulling-retry number? @x)) 10))))

      (testing "stop after timeout"
        (let [x (atom 0)]
          (<! (wa/pulling-retry {::wa/done? neg?
                                 ::wa/timeout 100}
                (go
                  (<! (async/timeout 200))
                  (swap! x + 10)
                  @x)))
          (<! (async/timeout 500))
          (is (= @x 10)))))
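The retry loop itself can be pictured with a small sketch in plain core.async on the JVM. This is an illustration under assumptions, not the library's implementation: `poll-until` is a hypothetical function (the real helper takes a body rather than a function), and returning the last value read when the time runs out is a choice made for this example.

```clojure
(ns example.poll-until
  (:require [clojure.core.async :as async :refer [go-loop <! <!!]]))

(defn poll-until
  "Hypothetical helper: calls f every retry-ms until (done? (f)) is truthy
  or timeout-ms has elapsed. The returned channel yields the last value read."
  [done? f {:keys [retry-ms timeout-ms]
            :or   {retry-ms 10 timeout-ms 2000}}]
  (let [deadline (+ (System/currentTimeMillis) timeout-ms)]
    (go-loop []
      (let [v (f)]
        (if (or (done? v)
                (>= (System/currentTimeMillis) deadline))
          v
          (do
            ;; park between attempts instead of blocking a thread
            (<! (async/timeout retry-ms))
            (recur)))))))

(comment
  (let [x (atom nil)]
    (future (Thread/sleep 100) (reset! x 10))
    (<!! (poll-until number? #(deref x) {}))) ; => 10
  )
```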
This library provides a helper to serialize async event callbacks. By default, if you do event handling like this:
    (.on some-object "event"
      (fn handler-fn [e]
        (go
          (-> (do-operation e) <!
              (do-more) <!))))
If many events arrive in rapid succession, the callbacks will interleave with each other. A lot of the time that's not a problem, but if you need sequencing this may get you in trouble.
To handle this you can use the event-queue! helper:
    (.on some-object "event"
      (wap/event-queue!
        (fn handler-fn [e]
          (go
            (-> (do-operation e) <!
                (do-more) <!)))))
ℹ️ wap is an alias for com.wsscode.async.processing
event-queue! returns a new callback function that, instead of calling the handler
directly, adds the event to a queue for processing. If the handler returns a channel,
that channel will be awaited before the next event is processed. This is a very easy and
quick way to ensure serialisation.
By default the queue uses (async/chan (async/dropping-buffer 1024)). You can
override it with:
    (.on some-object "event"
      (wap/event-queue! {::wap/channel (async/chan (async/sliding-buffer 1024))}
        (fn handler-fn [e]
          (go
            (-> (do-operation e) <!
                (do-more) <!)))))
One way to stop the processing is to provide a custom channel and close it when you want processing to stop. Example:
    (let [ch (async/chan (async/sliding-buffer 1024))]
      (.on some-object "event"
        (wap/event-queue! {::wap/channel ch}
          (fn handler-fn [e]
            (go
              (-> (do-operation e) <!
                  (do-more) <!)))))

      ; later in the future
      (async/close! ch))
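The queueing behaviour described in this section can be approximated with a single consumer loop in plain core.async. The sketch below is an assumption about the general shape of such a helper, not the source of event-queue!: `serialized-callback` is a hypothetical name, and a handler that throws would stop the loop in this simplified version.

```clojure
(ns example.serialized-events
  (:require [clojure.core.async :as async :refer [go go-loop <! <!!]]
            [clojure.core.async.impl.protocols :as async.prot]))

(defn serialized-callback
  "Hypothetical stand-in for an event queue: returns a callback that only
  enqueues events, while one go-loop handles them one at a time."
  [handler]
  (let [queue (async/chan (async/dropping-buffer 1024))]
    (go-loop []
      (when-some [event (<! queue)]
        (let [res (handler event)]
          ;; when the handler returns a channel, wait for it before moving on
          (when (satisfies? async.prot/ReadPort res)
            (<! res)))
        (recur)))
    (fn [event]
      (async/put! queue event))))

(comment
  ;; later events finish faster, yet they are still handled in arrival order
  (let [seen     (atom [])
        handler  (fn [n]
                   (go
                     (<! (async/timeout (- 30 (* 10 n))))
                     (swap! seen conj n)))
        callback (serialized-callback handler)]
    (doseq [n [0 1 2]]
      (callback n))
    (<!! (async/timeout 500))
    @seen) ; => [0 1 2]
  )
```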
If you use something like WebSockets for communication, the library you are using may or may not include some way to handle callback events. To handle this (or any other case where message callbacks are not a native option), this library provides some helpers.
The idea is to send a message providing some ID, and then wait for a response message
to come. The response will include the same ID from the request, so they match.
This process happens in three main steps:
1. Once we send a message requiring a callback, create something to get notified once the response arrives
2. If you read a message that wants a response, create and send the response message
3. Listen to message responses on the event entry point
This mechanism assumes your messages are maps.
To implement step 1, you create a function that wraps whatever your transmit function is:
    (defn send-message! [msg]
      (original-send-message! msg)
      ; this will check if the message has a request-id, and if so will create a channel
      ; that will have data available once the message is replied
      (wap/await! msg))
Then, wrap your read side with the capture-response! helper:
    (defn handle-message [msg]
      ; this will fire the handler when the message contains ::wap/response-id, otherwise
      ; it lets the message flow
      (if-not (wap/capture-response! msg)
        (original-handle-msg msg)))
In your handler, to reply to a message, do this:
    (defn some-handler [msg]
      (send-message! (wap/reply-message msg "reply value")))
And finally, to issue a request and wait for the callback:
    (go
      (let [res (<? (send-message! (assoc msg ::wap/request-id (wap/random-request-id))))]
        (print "Response: " res)))
ℹ️ The await! helper has a built-in timeout mechanism; the default wait time is 5s.
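To see how the three steps fit together, here is a self-contained sketch of ID matching built on plain core.async. Everything in it is hypothetical and simplified: the function names, the unqualified `:request-id`, `:response-id` and `:response-data` keys, and the `pending` registry are assumptions for illustration, and the sketch has no timeout.

```clojure
(ns example.request-ids
  (:require [clojure.core.async :as async :refer [<!!]]))

;; registry of request-id -> promise channel waiting for the response
(defonce pending (atom {}))

(defn await-response!
  "Step 1: registers a promise channel for the message's :request-id and
  returns it. Returns nil for messages that do not ask for a response."
  [{:keys [request-id]}]
  (when request-id
    (let [ch (async/promise-chan)]
      (swap! pending assoc request-id ch)
      ch)))

(defn reply-to
  "Step 2: builds a response message carrying the ID of the request."
  [{:keys [request-id]} value]
  {:response-id request-id :response-data value})

(defn capture-response
  "Step 3: delivers a response to its waiting channel.
  Returns true when msg was a response to a pending request."
  [{:keys [response-id response-data]}]
  (if-some [ch (get @pending response-id)]
    (do
      (swap! pending dissoc response-id)
      ;; nil cannot be put on a channel, so a nil payload closes it instead
      (if (some? response-data)
        (async/put! ch response-data)
        (async/close! ch))
      true)
    false))

(comment
  (let [request  {:request-id (str (java.util.UUID/randomUUID)) :body "ping"}
        response (await-response! request)]
    (capture-response (reply-to request "pong"))
    (<!! response)) ; => "pong"
  )
```

A message without a response ID falls through `capture-response` with `false`, which is what lets the read side pass it on to the regular handler.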
There are other minor helpers not mentioned in this document, but they all have documentation on the functions; to check them out, see the cljdoc page of this library.