Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cop cloud events #30

Merged
merged 3 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ template.
```json
> .init -f "example/homeworld.json"
{
"lukePersonDetails": "${ $fetch('https://swapi.dev/api/people/?search=luke').json().results[0]}",
"lukePersonDetails": "${ $fetch('https://swapi.tech/api/people/?name=luke').json().result[0].properties}",
"lukeHomeworldURL": "${ lukePersonDetails.homeworld }",
"homeworldDetails": "${ $fetch(lukeHomeworldURL).json() }",
"homeworldName": "${ homeworldDetails.name }"
Expand Down Expand Up @@ -127,7 +127,7 @@ start: ${ (produceParams.data; $millis()) } #record start time, after test datas
# producer will be sending some test data
produceParams:
type: "my-topic"
data: ${['luke', 'han', 'leia', 'R2-D2', 'Owen', 'Biggs', 'Obi-Wan', 'Anakin', 'Chewbacca', 'Wedge'].('https://swapi.dev/api/people/?search='&$)}
data: ${['luke', 'han', 'leia', 'R2-D2', 'Owen', 'Biggs', 'Obi-Wan', 'Anakin', 'Chewbacca', 'Wedge'].('https://swapi.tech/api/people/?search='&$)}
client:
type: test
# the subscriber's 'to' function will be called on each received event
Expand Down Expand Up @@ -398,7 +398,7 @@ Below command will start the workflow and tail the output until `simulateFailure
"function": "/${ \n function($rebel){ \n $rebel ~> $serial(\n [fetchRebel, saveRebel],\n {'workflowInvocation': $rebel} \n ) } }\n"
},
"fetchRebel": {
"function": "/${ \n function($rebel){(\n $console.debug('fetchRebel input: ' & $rebel);\n $r := $rebel.$fetch('https://swapi.dev/api/people/?search='&$).json().results[0];\n $console.debug('fetchRebel fetched: ' & $r); \n $set('/fetchLog/-',$rebel);\n $console.debug('logged fetch: ' & $r);\n $r;\n )} \n}\n"
"function": "/${ \n function($rebel){(\n $console.debug('fetchRebel input: ' & $rebel);\n $r := $rebel.$fetch('https://swapi.tech/api/people/?name='&$).json().result[0].properties;\n $console.debug('fetchRebel fetched: ' & $r); \n $set('/fetchLog/-',$rebel);\n $console.debug('logged fetch: ' & $r);\n $r;\n )} \n}\n"
},
"saveRebel": {
"function": "/${ \n function($rebel){(\n $console.debug('saveRebel input: ' & $rebel);\n ($count(rebels) = 1 and simulateFailure)?(\n $set('/simulateFailure', false); \n $console.log('sleep forever on : ' & $rebel);\n $sleep(1000000);\n );\n $rebel ? $set('/rebels/-',{'name':$rebel.name, 'url':$rebel.homeworld});\n $console.debug('saveRebel saved: ' & {'name':$rebel.name, 'url':$rebel.homeworld});\n )} \n}\n"
Expand Down Expand Up @@ -427,15 +427,15 @@ get all 3 of them there.
[
{
"name": "Luke Skywalker",
"url": "https://swapi.dev/api/planets/1/"
"url": "https://www.swapi.tech/api/planets/1"
},
{
"name": "Han Solo",
"url": "https://swapi.dev/api/planets/22/"
"url": "https://www.swapi.tech/api/planets/22"
},
{
"name": "Leia Organa",
"url": "https://swapi.dev/api/planets/2/"
"url": "https://www.swapi.tech/api/planets/2"
}
]
```
Expand All @@ -454,7 +454,7 @@ The following example shows how to use the `shouldRetry` function to retry a ste
"connectionError": true,
"steps": [
{
"function": "${ function($person){$fetch('https://swapi.dev/api/people/?search='& $person).json().results[0]} }"
"function": "${ function($person){$fetch('https://swapi.tech/api/people/?name='& $person).json().result[0].properties} }"
},
{
"function": "${ function($personDetail){$personDetail.homeworld } }"
Expand Down
2 changes: 1 addition & 1 deletion example/concurrent-homeworlds.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"people": ["luke", "han"],
"personDetails": "!${ people.$fetch('https://swapi.dev/api/people/?search='& $).json().results[0]}",
"personDetails": "!${ people.$fetch('https://swapi.tech/api/people/?search='& $).json().results[0]}",
"homeworldURLs": "${ personDetails.homeworld }",
"homeworldDetails": "!${ homeworldURLs.$fetch($).json() }",
"homeworldName": "${ homeworldDetails.name }"
Expand Down
22 changes: 22 additions & 0 deletions example/dumpster/changePersistence.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# producer will be sending some test data
produceParams:
type: "my-topic"
data: ['luke', 'han', 'leia']
client:
type: test
# the subscriber's 'to' function will be called on each received event
subscribeParams: #parameters for subscribing to an event
source: cloudEvent
type: /${ produceParams.type } # subscribe to the same topic as we are publishing to test events
to: /${joinResistance}
subscriberId: rebelArmy
initialPosition: latest
client:
type: test
joinResistance: /${ function($rebel){ $set('/rebelForces', rebelForces~>$append($rebel))} }
# starts producer function
send$: $publish(produceParams)
# starts consumer function
recv$: $subscribe(subscribeParams)
# the subscriber's `to` function will write the received data here
rebelForces: [ ]
6 changes: 6 additions & 0 deletions example/dumpster/concurrent-homeworlds-debug.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"people": ["luke", "han"],
"personDetails": "${ people.$fetch('https://swapi.tech/api/people/?search='& $).json().results[0]}",
"homeworldURLs": "${ personDetails.homeworld }",
"homeworldDetails": "${ homeworldURLs.$fetch( $ ).json() }"
}
22 changes: 22 additions & 0 deletions example/dumpster/fso-orion.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"start$": "$subscribe(subscribeParams, {})",
"name": "nozzleWork",
"subscribeParams": {
"source": "cloudEvent",
"testData": "${ [1].([{'name': 'nozzleTime', 'order':$}]) }",
"type": "my-topic",
"filter$": "function($e){ $e.name='nozzleTime' }",
"to": "../${myWorkflow$}",
"parallelism": 2,
"subscriberId": "../${name}"
},
"myWorkflow$": "function($e){\n $e ~> $serial([step1, step2])\n}\n",
"step1": {
"name": "primeTheNozzle",
"function": "${function($e){ $e~>|$|{'primed':true}| }}"
},
"step2": {
"name": "sprayTheNozzle",
"function": "${function($e){ $e~>|$|{'sprayed':true}| }}"
}
}
6 changes: 6 additions & 0 deletions example/dumpster/fso1.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"lukePersonDetails": "${ curl -i -X 'GET' -H 'Accept: application/json' -H 'Layer-Id: 55f8da9d-93de-4bf2-b818-c2c2ac1d3f8d' -H 'Layer-Type: TENANT' $APPD_JSON_STORE_URL/v1/objects/extensibility:solution/agent -H'appd-pty: IlVTRVIi' -H'appd-pid: ImZvb0BiYXIuY29tIg=='}",
"lukeHomeworldURL": "${ lukePersonDetails.homeworld }",
"homeworldDetails": "${ $fetch(lukeHomeworldURL).json() }",
"homeworldName": "${ homeworldDetails.name }"
}
33 changes: 33 additions & 0 deletions example/dumpster/pubsub-kafka.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Droid R2-D2 is sending messages to the Rebel Alliance's communication channel
produceParams:
type: "rebel-comm-channel"
data: |
${
function(){
{'message': 'Rebel Fleet Coordinates', 'location': $random()}
}
}
client:
type: pulsar
# Droid C-3PO will intercept and log each received message for the Rebel Alliance
subscribeParams: #parameters for subscribing to a holocomm transmission
source: cloudEvent
type: /${ produceParams.type } # subscribe to the same channel as R2-D2 to intercept messages
to: |
/${
function($e){(
$set('/interceptedMessages/-', $e);
)}
}
subscriberId: protocolDroid
initialPosition: latest
client:
type: pulsar
# Activates R2-D2's message transmission function every 50 milliseconds
send: "${ $setInterval( function(){ $publish(produceParams)}, 50) }"
# Activates C-3PO's message interception function
recv$: $subscribe(subscribeParams)
# interceptedMessages is where C-3PO will store the results of message decoding
interceptedMessages: [ ]
# This condition stops the operation when interceptedMessages has 10 elements
stop$: ($count(interceptedMessages)>=10?($clearInterval(send);'missionAccomplished'):'operationOngoing')
31 changes: 31 additions & 0 deletions example/dumpster/pubsub-pulsar-backup.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# producer will be sending some test data
produceParams:
type: "my-topic"
data: ['luke', 'han', 'leia']
client:
type: pulsar
# the subscriber's 'to' function will be called on each received event
subscribeParams: #parameters for subscribing to an event
source: cloudEvent
type: /${ produceParams.type } # subscribe to the same topic as we are publishing to test events
to: /${joinResistance}
subscriberId: rebelArmy
initialPosition: latest
client:
type: pulsar
joinResistance: |
/${
function($rebel){
$rebel ~> $serial([joinResistanceStep])
}
}
joinResistanceStep:
name: joinResistanceStep
function: /${ function($rebel){ $set('/rebelForces', rebelForces~>$append($rebel))} }
# starts producer function
send$: $publish(produceParams)
# starts consumer function
recv$: $subscribe(subscribeParams)
# the subscriber's `to` function will write the received data here
rebelForces: [ ]
recover$: $recoverTo(joinResistance)
7 changes: 7 additions & 0 deletions example/dumpster/test.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"step1": {"function": "${ function($e){ $e~>|$|{'primed':true}|} }"},
"step2": {"function": "${ function($e){ $e~>|$|{'sprayed':true}|} }"},
"workflow": "${ function($e){ $e ~> $serial([step1, step2]) } }",
"output": "${ (koink; [{'name': '1'}, {'name': '2'}] ~> $map(workflow))}",
"konik": "some konik"
}
22 changes: 22 additions & 0 deletions example/dumpster/test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
start$: $subscribe(subscribeParams)
name: nozzleWork
subscribeParams: #parameters for subscribing to a cloud event
testData: "${ [1..10000].({'name': 'nozzleTime', 'order':$}) }"
type: sys:cron
filter$: function($e){ $e.name='nozzleTime' }
to: ../${myWorkflow$}
parallelism: 8
source: cloudEvent
client:
type: test

myWorkflow$: |
function($e){
$e ~> $serial([step1, step2])
}
step1:
name: primeTheNozzle
function: ${function($e){ ($e~>|$|{'primed':true}|) }}
step2:
name: sprayTheNozzle
function: ${function($e){ $e~>|$|{'sprayed':true}| }}
19 changes: 19 additions & 0 deletions example/dumpster/wf-all.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"startEvent": "tada",
"a": {
"function": "${ function($in) { ( $console.log($in); [$in, 'a'] ~> $join('->') )} }"
},
"b": {
"function": "${ function($in) { [$in, 'b'] ~> $join('->') } }"
},
"c": {
"function": "${ function($in) { ( $console.log($in); [$in, 'c'] ~> $join('->') )} }"
},
"d": {
"function": "${ function($in) { ( $console.log($in); [$in, 'd'] ~> $join('->') )} }"
},
"workflow1": "${ function($startEvent) { $startEvent ~> $serial([a, b]) } }",
"workflow1out": "${ workflow1(startEvent)}",
"workflow2": "${ function($startEvent) { $startEvent ~> $parallel([c,d]) } }",
"workflow2out": "${ workflow2(startEvent)}"
}
21 changes: 21 additions & 0 deletions example/dumpster/wf-all.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
start$: $subscribe(subscribeParams, {})
name: nozzleWork
subscribeParams: #parameters for subscribing to a cloud event
source: cloudEvent
testData: "${ [1].([{'name': 'nozzleTime', 'order':$}]) }"
type: 'my-topic'
filter$: function($e){ $e.name='nozzleTime' }
to: ../${myWorkflow$}
parallelism: 2
subscriberId: ../${name}
myWorkflow$: |
function($e){
$e ~> $serial([step1, step2])
}
step1:
name: primeTheNozzle
function: ${function($e){ $e~>|$|{'primed':true}| }}
step2:
name: sprayTheNozzle
function: ${function($e){ $e~>|$|{'sprayed':true}| }}

22 changes: 22 additions & 0 deletions example/dumpster/wf-continuous.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
start$: $subscribe(subscribeParams, {})
name: nozzleWork
subscribeParams: #parameters for subscribing to a cloud event
source: cloudEvent
testData: "${ [1].([{'name': 'nozzleTime', 'order':$}]) }"
type: 'my-topic'
filter$: function($e){ $e.name='nozzleTime' }
to: ../${myWorkflow$}
parallelism: 2
subscriberId: ../${name}
myWorkflow$: |
function($e){
$e ~> $serial([step1, step2])
}
step1:
name: primeTheNozzle
function: ${function($e){ $e~>|$|{'primed':true}| }}
step2:
name: sprayTheNozzle
function: ${function($e){ $e~>|$|{'sprayed':true}| }}
stop$: ($count(step1)=5?($clearInterval(send$);'done'):'still going')

39 changes: 39 additions & 0 deletions example/dumpster/wf-wip.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
start1$: "$recover(subscribeParams1) ~> $subscribe(subscribeParams1, {})"
start2$: "$recover(subscribeParams2) ~> $subscribe(subscribeParams2, {})"
name: nozzleWork
subscribeParams1: #parameters for subscribing to a cloud event
source: cloudEvent
testData: "${ [1].([{'name': 'nozzleTime1', 'order':$}]) }"
type: 'my-topic'
filter$: function($e){ $e.name='nozzleTime' }
to: ../${myWorkflow1$}
parallelism: 2
subscriberId: ../${name}
subscribeParams2: #parameters for subscribing to a cloud event
source: cloudEvent
testData: "${ [1].([{'name': 'nozzleTime2', 'order':$}]) }"
type: 'my-topic'
filter$: function($e){ $e.name='nozzleTime' }
to: ../${myWorkflow2$}
parallelism: 2
subscriberId: 'nozzleWork2'
myWorkflow1$: |
function($e){
$e ~> $serial([step1, step2])
}
myWorkflow2$: |
function($e){
$e ~> $serial([step3, step4])
}
step1:
name: primeTheNozzle
function: ${function($e){ $e~>|$|{'primed':true}| }}
step2:
name: sprayTheNozzle
function: ${function($e){ $e~>|$|{'sprayed':true}| }}
step3:
name: primeTheNozzle
function: ${function($e){ $e~>|$|{'primed':true}| }}
step4:
name: sprayTheNozzle
function: ${function($e){ $e~>|$|{'sprayed':true}| }}
22 changes: 22 additions & 0 deletions example/dumpster/wf.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"start$": "$subscribe(subscribeParams, {})",
"name": "nozzleWork",
"subscribeParams": {
"source": "cloudEvent",
"testData": "${ [1].([{'name': 'nozzleTime', 'order':$}]) }",
"type": "my-topic",
"filter$": "function($e){ $e.name='nozzleTime' }",
"to": "../${myWorkflow$}",
"parallelism": 2,
"subscriberId": "../${name}"
},
"myWorkflow$": "function($e){\n $e ~> $serial([step1, step2])\n}\n",
"step1": {
"name": "primeTheNozzle",
"function": "${function($e){ $e~>|$|{'primed':true}| }}"
},
"step2": {
"name": "sprayTheNozzle",
"function": "${function($e){ $e~>|$|{'sprayed':true}| }}"
}
}
32 changes: 32 additions & 0 deletions example/dumpster/workflow-fso.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# producer will be sending data function output to the "type" topic
# curl -i
# -X 'GET'
# -H 'Accept: application/json'
# -H 'Layer-Id: 55f8da9d-93de-4bf2-b818-c2c2ac1d3f8d'
# -H 'Layer-Type: TENANT' $APPD_JSON_STORE_URL/v1/objects/extensibility:solution/agent
# -H'appd-pty: IlVTRVIi'
# -H'appd-pid: ImZvb0BiYXIuY29tIg=='
knowledgeStore:
type: "my-topic"
data: "${ function(){ {'msg': 'hello', 'rando': $random()} } }"
client:
type: test
# producer will be invoking "to" function for each consumed event
subscribeParams: #parameters for subscribing to a cloud event
source: cloudEvent
type: /${ produceParams.type } # subscribe to the same topic as we are publishing to test events
to: /${ function($e){(
$set('/rxLog', rxLog~>$append($e));
)} }
subscriberId: dingus
initialPosition: latest
client:
type: test
# starts producer function
send: "${ $setInterval( function(){ $publish(produceParams)}, 50) }"
# starts consumer function
recv$: $subscribe(subscribeParams)
# rxLog is a field of the template where the consumer function will be storing results of event processing
rxLog: [ ]
# this is a condition that will stop the workflow when rxLog has 5 elements
stop$: ($count(rxLog)=10?($clearInterval(send);'done'):'still going')
2 changes: 1 addition & 1 deletion example/homeworld-scrambled.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"homeworldName": "${ homeworldDetails.name }",
"lukePersonDetails": "${ $fetch('https://swapi.dev/api/people/?search=luke').json().results[0]}",
"lukePersonDetails": "${ $fetch('https://swapi.tech/api/people/?search=luke').json().results[0]}",
"homeworldDetails": "${ $fetch(lukeHomeworldURL).json() }",
"lukeHomeworldURL": "${ lukePersonDetails.homeworld }"
}
Loading
Loading