Acknowledge (#31)
* move test data to the client section, readme++

* move acks to the clientParams

* add explicitAck subscriber parameter to ack from a function

* remove obsolete code

* even more broken

* fix the old behavior and add wfDispatcher01 example

* fix explicitAcks blocking

* refactor to subscribeTest

* make waitQueue work with ack()

* add doc and example for explicit ack
zhirafovod authored May 3, 2024
1 parent 580660c commit bb83012
Showing 35 changed files with 369 additions and 129 deletions.
134 changes: 103 additions & 31 deletions README.md
@@ -10,13 +10,16 @@
* [Concurrent, Event Driven, Non-blocking](#concurrent-event-driven-non-blocking)
* [Atomic State Updates](#atomic-state-updates)
* [Pure Function Pipelines - $serial and $parallel](#pure-function-pipelines---serial-and-parallel)
* [Pub/Sub Clients Configuration](#pubsub-clients-configuration)
* [Test Data](#test-data)
* [Dispatcher mode](#dispatcher-mode)
* [Durability](#durability)
* [Pub/Sub durability models](#pubsub-durability-models)
* [Test Data](#test-data-)
* [Pulsar](#pulsar)
* [Kafka](#kafka)
* [Test Client Durability](#test-client-durability)
* [Pulsar Client Durability](#pulsar-client-durability)
* [Kafka Client Durability](#kafka-client-durability)
* [Workflow Step Logs](#workflow-step-logs)
* [snapshots](#snapshots)
* [Workflow snapshots](#workflow-snapshots)
* [retries](#retries)
* [Workflow APIs](#workflow-apis)
<!-- TOC -->
@@ -127,9 +130,9 @@ start: ${ (produceParams.data; $millis()) } #record start time, after test datas
# producer will be sending some test data
produceParams:
type: "my-topic"
data: ${['luke', 'han', 'leia', 'R2-D2', 'Owen', 'Biggs', 'Obi-Wan', 'Anakin', 'Chewbacca', 'Wedge'].('https://swapi.tech/api/people/?search='&$)}
client:
type: test
data: ${['luke', 'han', 'leia', 'R2-D2', 'Owen', 'Biggs', 'Obi-Wan', 'Anakin', 'Chewbacca', 'Wedge'].('https://swapi.tech/api/people/?search='&$)}
# the subscriber's 'to' function will be called on each received event
subscribeParams: #parameters for subscribing to an event
source: cloudEvent
@@ -223,7 +226,7 @@ array and append $rebel to it, in order to illustrate the dangers of the well-kn
joinResistance: |
/${
function($url){(
$rebel := $fetch($url).json().results[0].name;
$rebel := $fetch($url).json().results[0].name;
$set( "/rebelForces", $rebelForces~>$append($rebel)) /* BUG!! */
)}
}
@@ -251,53 +254,69 @@ function($x){ $x ~> f1 ~> f2 ~> f3 ~> function($x){$set('/saveResult/-', $x) } }
```
This pipeline can be concurrently dispatched safely.

# Durability
Stated Workflows provides `$serial` and `$parallel` functions that should be used when you want to run a continuous
workflow with data coming from HTTP or cloudEvent sources. Each workflow invocation's input and step-processing logs are
persisted in the template at the beginning and end of each step invocation. When the workflow finishes, the logs
are removed. Periodic snapshots include the TemplateProcessor template, output, and options, and can be used to recover
workflow processing from the last snapshotted state.

The StatedREPL `restore` command can be used to recover template execution from a snapshot.

## Pub/Sub durability models
Stated Workflow comes with built-in HTTP, Pulsar, Kafka, and Test clients for the cloudEvent subscribe command. Each client
implements its own durability model.
# Pub/Sub Clients Configuration
Stated Workflow comes with built-in HTTP, Pulsar, Kafka, Dispatcher, and Test clients for the cloudEvent subscribe command. Each client
implements its own durability model.

Pulsar and Kafka use server-side acknowledgement to ensure that messages are not lost. The Test client uses a simple
acknowledgement in the template. The HTTP client blocks the synchronous HTTP response until the first step is persisted.

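As a rough sketch, the `client` block inside `subscribeParams` selects which implementation handles the subscription. The topic, workflow function, and subscriber names below are illustrative, modeled on the examples elsewhere in this change:

```yaml
subscribeParams:
  source: cloudEvent
  type: "my-topic"
  to: /${myWorkflow}        # workflow function invoked for each received event
  subscriberId: myWorkflow
  parallelism: 1
  client:
    type: test              # e.g. test, dispatcher, or pulsar (Kafka and HTTP clients are also built in)
```
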
### Test Data
The test publisher and subscriber can be used to develop and test workflows. The test publisher may include a `data` field whose
contents are sent directly to the test subscriber dispatcher, with acknowledgement recorded in the template.
## Test Data
Test clients for the publisher and subscriber can be used to develop and test workflows. The test publisher may include a `testData` array
whose contents are sent directly to the test subscriber dispatcher, with acknowledgement recorded in the template.

Example snippets from the `example/joinResistanceRecovery.yaml` template:
```yaml
produceParams:
type: "rebelDispatch"
data: ['luke', 'han', 'leia']
client:
type: test # test client produces directly to the test subscriber dispatcher
testData: ['luke', 'han', 'leia'] # test producer only makes sense when it includes test data
subscribeParams: #parameters for subscribing to an event
source: cloudEvent
type: /${ produceParams.type } # subscribe to the same topic as we are publishing to test events
to: /${saveRebelWorkflow}
subscriberId: rebelArmy
initialPosition: latest
parallelism: 1
acks: [] # if the acks are present in client: type: test, then the subscriber will be storing acknowledgement in this field
client:
type: test
data: ['obi-wan'] # test subscriber may include test data to run without a producer
acks: [] # if the acks are present in client: type: test, then the subscriber will be storing acknowledgement in this field
```
## Pulsar

## Dispatcher mode
When the subscriber client type is set to `dispatcher` or `test`, the subscriber runs the workflow and expects an external process to add data.
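
For illustration, a minimal subscriber configured in dispatcher mode might look like the sketch below; the topic and function names are hypothetical, and the Explicit Acknowledgement section later in this README shows a complete, runnable variant.

```yaml
subscribeParams:
  source: cloudEvent
  type: "rebel"
  to: /${handleRebel}     # workflow function run for each externally supplied event
  parallelism: 1
  client:
    type: dispatcher      # run the workflow; an external process supplies the data
handleRebel: "/${ function($data){ $console.log('got: ' & $data) } }"
```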

# Durability
Stated Workflows provides `$serial` and `$parallel` functions that should be used when you want to run a continuous
workflow with data coming from HTTP or cloudEvent sources. Each workflow invocation's input and step-processing logs are
persisted in the template at the beginning and end of each step invocation. When the workflow finishes, the logs
are removed. Periodic snapshots include the TemplateProcessor template, output, and options, and can be used to recover
workflow processing from the last snapshotted state.

The StatedREPL `restore` command can be used to recover template execution from a snapshot.
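
A minimal recovery session might look like the sketch below, assuming a snapshot was written to `./defaultSnapshot.json`; the `-f` flag mirrors the `.init` usage shown later in this README, and the exact `restore` options are an assumption:

```
> .restore -f ./defaultSnapshot.json
```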

## Pub/Sub durability models
Each pub/sub client implements its own durability model. Pulsar and Kafka use server-side acknowledgement to ensure that messages are not lost. The Test client uses a simple
acknowledgement in the template. The HTTP client blocks the synchronous HTTP response until the first step is persisted.

### Test Client Durability
To develop and test workflows with the test client, adding an `acks` field enables test acknowledgements in the client. Messages processed by
the workflow are added to the `acks` array.
```yaml
client:
type: test
data: ['obi-wan'] # test subscriber may include test data to run without a producer
acks: [] # if the acks are present in client: type: test, then the subscriber will be storing acknowledgement in this field
```
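
For example, once the `'obi-wan'` test event above has been processed by the workflow, the client section would hold the acknowledgement (illustrative output):

```yaml
client:
  type: test
  data: ['obi-wan']
  acks: ['obi-wan']   # recorded after the workflow processed the message
```
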
### Pulsar Client Durability
Pulsar pub/sub relies on server-side acknowledgement, similar to the test client. On failure or restart, the subscriber
continues from the next unacknowledged message and skips steps already completed in the restored workflow
snapshot.
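
A sketch of a subscriber using the Pulsar client, following the shape of `example/dumpster/pubsub-pulsar-backup.yaml` in this change (the topic, workflow, and subscriber names are illustrative):

```yaml
subscribeParams:
  source: cloudEvent
  type: "my-topic"
  to: /${saveRebelWorkflow}
  subscriberId: rebelArmy
  initialPosition: latest
  parallelism: 1
  client:
    type: pulsar      # messages are acknowledged on the Pulsar broker, not in the template
```
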
## Kafka
### Kafka Client Durability
Kafka can rely on the consumer group's committed offset. For parallelism greater than 1, the subscriber stores
all unacknowledged messages and calculates the lowest offset to commit. A combination of snapshotted state and the
Kafka server-side consumer offset helps minimize double-processing of already completed steps.
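
A comparable sketch for a Kafka subscriber; the `kafka` type string is assumed here (it does not appear in this change), and the comments illustrate the lowest-offset rule described above:

```yaml
subscribeParams:
  source: cloudEvent
  type: "my-topic"
  to: /${saveRebelWorkflow}
  subscriberId: rebelArmy
  parallelism: 4      # e.g. offsets 5,6,7,8 in flight: if only 6 and 7 finish, the committed
                      # offset stays at 5 and advances only once 5 is acknowledged
  client:
    type: kafka       # assumed type string; commits the lowest unacknowledged offset
```
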
## Workflow Step Logs
The `$serial` and `$parallel` functions accept an array of objects called "steps". A step is nothing but an object with
@@ -338,7 +357,7 @@ The id is used as a log key. The logs are stored inside each step.
}
}
```
When the step completes, its invocation log is removed.
When a step completes, its invocation log is removed.
```json
{
"out": "${ $serial([f1, f2])}",
@@ -355,7 +374,7 @@ When the step completes, its invocation log is removed.
The `$serial` and `$parallel` functions understand the logs. When a template is restored from a snapshot, `$serial` and
`$parallel` use these logs to skip completed steps and resume their work at the last incomplete step in every invocation log.

## snapshots
## Workflow snapshots
Snapshots repeatedly save the state of a stated template as it runs. Snapshotting allows a Stated Workflow to be stopped
non-gracefully and restored from the snapshot. The step logs allow invocations to restart where they left off.

@@ -440,6 +459,59 @@ get all 3 of them there.
]
```

## Explicit Acknowledgement
The `$ack` function can be used to acknowledge a message from within the subscriber. Setting `explicitAck: true` in the client configuration enables
explicit acknowledgement in the subscriber. `$ack` should be called at the end of the subscriber workflow invocation to acknowledge the message.
```yaml
start: "${( $subscribe(subscribeParams); $publish(publishParams) )}"
subscribeParams: #parameters for subscribing to a http request
to: ../${func}
type: "rebel"
parallelism: 2
source: "cloudEvent"
client:
explicitAck: true
acks: []
type: dispatcher
publishParams:
type: "rebel"
source: "cloudEvent"
client:
type: test
data: [{type: "rebel", name: "luke"},{type: "rebel", name: "han"},{type: "rebel", name: "leia"}]
func: "/${ function($data){( $console.log('got: ' & $data); $forked('/input',$data); )} }"
input: null
process: |
${(
$console.log('processing: ' & $$.input);
$$.input != null ? (
$joined('/results/-', $$.input);
$ack($$.input)
)
: 0
)}
results: []
report: "${( $console.log('result added: ' & $$.results) )}"
```
```yaml ["$count(data)=3"]
> .init -f "example/explicitAck.yaml" --tail "/results until $~>$count=3"
[
{
"type": "rebel",
"name": "luke"
},
{
"type": "rebel",
"name": "han"
},
{
"type": "rebel",
"name": "leia"
}
]
```

# retries
Each step can provide an optional boolean function `shouldRetry`. On a workflow invocation failure, the function is
called with the invocation log passed as an argument. If it returns true, the failed step is retried.
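
As a rough sketch, a `shouldRetry` function might cap retries by inspecting the invocation log; the step shape (a `function` field) and the `retryCount` field name below are illustrative, since the log structure is not spelled out in this excerpt:

```yaml
out: "${ $serial([f1]) }"
f1:
  function: /${fetchRebel}
  shouldRetry: "${ function($log){ $log.retryCount < 3 } }"   # retry a failed invocation up to 3 times
```
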
2 changes: 1 addition & 1 deletion example/backpressure.yaml
@@ -1,9 +1,9 @@
# producer will be sending some test data
produceParams:
type: "my-topic"
data: ${[1..10].('bleep_' & $string($))}
client:
type: test
data: ${[1..10].('bleep_' & $string($))}
# the subscriber's 'to' function will be called on each received event
slowSubscribeParams: #parameters for subscribing to an event
source: cloudEvent
4 changes: 2 additions & 2 deletions example/correlate.yaml
@@ -15,14 +15,14 @@ listenForResponse:
initialPosition: latest
command:
type: 'my-topic'
data: [{type: 'DO_IT', correlationId: "${ $floor($random()*100000) }" }]
client:
type: test
data: [{type: 'DO_IT', correlationId: "${ $floor($random()*100000) }" }]
resp:
type: 'my-topic'
data: [{type: 'DONE_IT', correlationId: 'set me'}]
client:
type: test
data: [{type: 'DONE_IT', correlationId: 'set me'}]
listenForCommand:
source: cloudEvent
type: 'my-topic'
2 changes: 1 addition & 1 deletion example/dumpster/changePersistence.yaml
@@ -1,9 +1,9 @@
# producer will be sending some test data
produceParams:
type: "my-topic"
data: ['luke', 'han', 'leia']
client:
type: test
data: ['luke', 'han', 'leia']
# the subscriber's 'to' function will be called on each received event
subscribeParams: #parameters for subscribing to an event
source: cloudEvent
12 changes: 6 additions & 6 deletions example/dumpster/pubsub-kafka.yaml
@@ -1,14 +1,14 @@
# Droid R2-D2 is sending messages to the Rebel Alliance's communication channel
produceParams:
type: "rebel-comm-channel"
data: |
${
function(){
{'message': 'Rebel Fleet Coordinates', 'location': $random()}
}
}
client:
type: pulsar
data: |
${
function(){
{'message': 'Rebel Fleet Coordinates', 'location': $random()}
}
}
# Droid C-3PO will intercept and log each received message for the Rebel Alliance
subscribeParams: #parameters for subscribing to a holocomm transmission
source: cloudEvent
2 changes: 1 addition & 1 deletion example/dumpster/pubsub-pulsar-backup.yaml
@@ -1,9 +1,9 @@
# producer will be sending some test data
produceParams:
type: "my-topic"
data: ['luke', 'han', 'leia']
client:
type: pulsar
data: ['luke', 'han', 'leia']
# the subscriber's 'to' function will be called on each received event
subscribeParams: #parameters for subscribing to an event
source: cloudEvent
2 changes: 1 addition & 1 deletion example/dumpster/test.yaml
@@ -1,14 +1,14 @@
start$: $subscribe(subscribeParams)
name: nozzleWork
subscribeParams: #parameters for subscribing to a cloud event
testData: "${ [1..10000].({'name': 'nozzleTime', 'order':$}) }"
type: sys:cron
filter$: function($e){ $e.name='nozzleTime' }
to: ../${myWorkflow$}
parallelism: 8
source: cloudEvent
client:
type: test
testData: "${ [1..10000].({'name': 'nozzleTime', 'order':$}) }"

myWorkflow$: |
function($e){
4 changes: 3 additions & 1 deletion example/dumpster/wf-all.yaml
@@ -2,12 +2,14 @@ start$: $subscribe(subscribeParams, {})
name: nozzleWork
subscribeParams: #parameters for subscribing to a cloud event
source: cloudEvent
testData: "${ [1].([{'name': 'nozzleTime', 'order':$}]) }"
type: 'my-topic'
filter$: function($e){ $e.name='nozzleTime' }
to: ../${myWorkflow$}
parallelism: 2
subscriberId: ../${name}
client:
type: test
testData: "${ [1].([{'name': 'nozzleTime', 'order':$}]) }"
myWorkflow$: |
function($e){
$e ~> $serial([step1, step2])
4 changes: 3 additions & 1 deletion example/dumpster/wf-continuous.yaml
@@ -2,12 +2,14 @@ start$: $subscribe(subscribeParams, {})
name: nozzleWork
subscribeParams: #parameters for subscribing to a cloud event
source: cloudEvent
testData: "${ [1].([{'name': 'nozzleTime', 'order':$}]) }"
type: 'my-topic'
filter$: function($e){ $e.name='nozzleTime' }
to: ../${myWorkflow$}
parallelism: 2
subscriberId: ../${name}
client:
type: test
testData: "${ [1].([{'name': 'nozzleTime', 'order':$}]) }"
myWorkflow$: |
function($e){
$e ~> $serial([step1, step2])
8 changes: 6 additions & 2 deletions example/dumpster/wf-wip.yaml
@@ -3,20 +3,24 @@ start2$: "$recover(subscribeParams2) ~> $subscribe(subscribeParams2, {})"
name: nozzleWork
subscribeParams1: #parameters for subscribing to a cloud event
source: cloudEvent
testData: "${ [1].([{'name': 'nozzleTime1', 'order':$}]) }"
type: 'my-topic'
filter$: function($e){ $e.name='nozzleTime' }
to: ../${myWorkflow1$}
parallelism: 2
subscriberId: ../${name}
client:
type: test
testData: "${ [1].([{'name': 'nozzleTime1', 'order':$}]) }"
subscribeParams2: #parameters for subscribing to a cloud event
source: cloudEvent
testData: "${ [1].([{'name': 'nozzleTime2', 'order':$}]) }"
type: 'my-topic'
filter$: function($e){ $e.name='nozzleTime' }
to: ../${myWorkflow2$}
parallelism: 2
subscriberId: 'nozzleWork2'
client:
type: test
testData: "${ [1].([{'name': 'nozzleTime2', 'order':$}]) }"
myWorkflow1$: |
function($e){
$e ~> $serial([step1, step2])
2 changes: 1 addition & 1 deletion example/dumpster/workflow-fso.yaml
@@ -8,9 +8,9 @@
# -H'appd-pid: ImZvb0BiYXIuY29tIg=='
knowledgeStore:
type: "my-topic"
data: "${ function(){ {'msg': 'hello', 'rando': $random()} } }"
client:
type: test
data: "${ function(){ {'msg': 'hello', 'rando': $random()} } }"
# producer will be invoking "to" function for each consumed event
subscribeParams: #parameters for subscribing to a cloud event
source: cloudEvent