From 1a4ad9a58d38552c8cd9aa8070f867b510fae87c Mon Sep 17 00:00:00 2001 From: Sergey Sergeev Date: Fri, 19 Apr 2024 14:34:43 -0700 Subject: [PATCH 1/3] add support to consume messages from cop --- package-lock.json | 274 ++++++++++++++++++++++++++++- package.json | 3 + src/workflow/StatedWorkflow.js | 73 +++++++- yarn.lock | 306 +++++++++++++++++++++++---------- 4 files changed, 564 insertions(+), 92 deletions(-) diff --git a/package-lock.json b/package-lock.json index 8bc7aeb..ee4dee7 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,11 +9,14 @@ "version": "0.0.1", "license": "Apache-2.0", "dependencies": { + "@kafkajs/confluent-schema-registry": "^3.3.0", "@opentelemetry/api": "^1.8.0", "@opentelemetry/sdk-metrics-base": "^0.31.0", + "avsc": "^5.7.7", "body-parser": "^1.20.2", "express": "^4.19.2", "kafkajs": "^2.2.4", + "kafkajs-lz4": "^1.2.1", "pulsar-client": "^1.9.0", "stated-js": "^0.1.20" }, @@ -1075,6 +1078,17 @@ "@jridgewell/sourcemap-codec": "^1.4.14" } }, + "node_modules/@kafkajs/confluent-schema-registry": { + "version": "3.3.0", + "resolved": "https://registry.npmjs.org/@kafkajs/confluent-schema-registry/-/confluent-schema-registry-3.3.0.tgz", + "integrity": "sha512-ImuqcHdJuJLNvfDgQwPP0EEKY4yzwGJ1t0Jyf0aAwWK2TGExXTQmeXCtcu3rdSHBEwrS7T30c6HNVxBPVA1hHQ==", + "dependencies": { + "ajv": "^7.1.0", + "avsc": ">= 5.4.13 < 6", + "mappersmith": ">= 2.30.1 < 3", + "protobufjs": "^6.10.1" + } + }, "node_modules/@mapbox/node-pre-gyp": { "version": "1.0.11", "resolved": "https://registry.npmjs.org/@mapbox/node-pre-gyp/-/node-pre-gyp-1.0.11.tgz", @@ -1221,6 +1235,60 @@ "node": ">=14" } }, + "node_modules/@protobufjs/aspromise": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/@protobufjs/aspromise/-/aspromise-1.1.2.tgz", + "integrity": "sha512-j+gKExEuLmKwvz3OgROXtrJ2UG2x8Ch2YZUxahh+s1F2HZ+wAceUNLkvy6zKCPVRkU++ZWQrdxsUeQXmcg4uoQ==" + }, + "node_modules/@protobufjs/base64": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/@protobufjs/base64/-/base64-1.1.2.tgz", + "integrity": "sha512-AZkcAA5vnN/v4PDqKyMR5lx7hZttPDgClv83E//FMNhR2TMcLUhfRUBHCmSl0oi9zMgDDqRUJkSxO3wm85+XLg==" + }, + "node_modules/@protobufjs/codegen": { + "version": "2.0.4", + "resolved": "https://registry.npmjs.org/@protobufjs/codegen/-/codegen-2.0.4.tgz", + "integrity": "sha512-YyFaikqM5sH0ziFZCN3xDC7zeGaB/d0IUb9CATugHWbd1FRFwWwt4ld4OYMPWu5a3Xe01mGAULCdqhMlPl29Jg==" + }, + "node_modules/@protobufjs/eventemitter": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@protobufjs/eventemitter/-/eventemitter-1.1.0.tgz", + "integrity": "sha512-j9ednRT81vYJ9OfVuXG6ERSTdEL1xVsNgqpkxMsbIabzSo3goCjDIveeGv5d03om39ML71RdmrGNjG5SReBP/Q==" + }, + "node_modules/@protobufjs/fetch": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@protobufjs/fetch/-/fetch-1.1.0.tgz", + "integrity": "sha512-lljVXpqXebpsijW71PZaCYeIcE5on1w5DlQy5WH6GLbFryLUrBD4932W/E2BSpfRJWseIL4v/KPgBFxDOIdKpQ==", + "dependencies": { + "@protobufjs/aspromise": "^1.1.1", + "@protobufjs/inquire": "^1.1.0" + } + }, + "node_modules/@protobufjs/float": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/@protobufjs/float/-/float-1.0.2.tgz", + "integrity": "sha512-Ddb+kVXlXst9d+R9PfTIxh1EdNkgoRe5tOX6t01f1lYWOvJnSPDBlG241QLzcyPdoNTsblLUdujGSE4RzrTZGQ==" + }, + "node_modules/@protobufjs/inquire": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@protobufjs/inquire/-/inquire-1.1.0.tgz", + "integrity": "sha512-kdSefcPdruJiFMVSbn801t4vFK7KB/5gd2fYvrxhuJYg8ILrmn9SKSX2tZdV6V+ksulWqS7aXjBcRXl3wHoD9Q==" + }, + "node_modules/@protobufjs/path": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/@protobufjs/path/-/path-1.1.2.tgz", + "integrity": "sha512-6JOcJ5Tm08dOHAbdR3GrvP+yUUfkjG5ePsHYczMFLq3ZmMkAD98cDgcT2iA1lJ9NVwFd4tH/iSSoe44YWkltEA==" + }, + "node_modules/@protobufjs/pool": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@protobufjs/pool/-/pool-1.1.0.tgz", + "integrity": "sha512-0kELaGSIDBKvcgS4zkjz1PeddatrjYcmMWOlAuAPwAeccUrPHdUqo/J6LiymHHEiJT5NrF1UVwxY14f+fy4WQw==" + }, + "node_modules/@protobufjs/utf8": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@protobufjs/utf8/-/utf8-1.1.0.tgz", + "integrity": "sha512-Vvn3zZrhQZkkBE8LSuW3em98c0FwgO4nxzv6OdSxPKJIEKY2bGbHn+mhGIPerzI4twdxaP8/0+06HBpwf345Lw==" + }, "node_modules/@sinclair/typebox": { "version": "0.27.8", "resolved": "https://registry.npmjs.org/@sinclair/typebox/-/typebox-0.27.8.tgz", @@ -1319,11 +1387,15 @@ "@types/istanbul-lib-report": "*" } }, + "node_modules/@types/long": { + "version": "4.0.2", + "resolved": "https://registry.npmjs.org/@types/long/-/long-4.0.2.tgz", + "integrity": "sha512-MqTGEo5bj5t157U6fA/BiDynNkn0YknVdh48CMPkTSpFTVmvao5UQmm7uEF6xBEo7qIMAlY/JSleYaE6VOdpaA==" + }, "node_modules/@types/node": { "version": "20.10.7", "resolved": "https://registry.npmjs.org/@types/node/-/node-20.10.7.tgz", "integrity": "sha512-fRbIKb8C/Y2lXxB5eVMj4IU7xpdox0Lh8bUPEdtLysaylsml1hOOx1+STloRs/B9nf7C6kPRmmg/V7aQW7usNg==", - "dev": true, "dependencies": { "undici-types": "~5.26.4" } @@ -1403,6 +1475,21 @@ "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" }, + "node_modules/ajv": { + "version": "7.2.4", + "resolved": "https://registry.npmjs.org/ajv/-/ajv-7.2.4.tgz", + "integrity": "sha512-nBeQgg/ZZA3u3SYxyaDvpvDtgZ/EZPF547ARgZBrG9Bhu1vKDwAIjtIf+sDtJUKa2zOcEbmRLBRSyMraS/Oy1A==", + "dependencies": { + "fast-deep-equal": "^3.1.1", + "json-schema-traverse": "^1.0.0", + "require-from-string": "^2.0.2", + "uri-js": "^4.2.2" + }, + "funding": { + "type": "github", + "url": "https://github.com/sponsors/epoberezkin" + } + }, "node_modules/ansi-escapes": { "version": "4.3.2", "resolved": "https://registry.npmjs.org/ansi-escapes/-/ansi-escapes-4.3.2.tgz", @@ -1490,6 +1577,14 @@ "resolved": "https://registry.npmjs.org/async/-/async-3.2.5.tgz", "integrity": "sha512-baNZyqaaLhyLVKm/DlvdW051MSgO6b8eVfIezl9E5PqWxFgzLm/wQntEW4zOytVburDEr0JlALEpdOFwvErLsg==" }, + "node_modules/avsc": { + "version": "5.7.7", + "resolved": "https://registry.npmjs.org/avsc/-/avsc-5.7.7.tgz", + "integrity": "sha512-9cYNccliXZDByFsFliVwk5GvTq058Fj513CiR4E60ndDwmuXzTJEp/Bp8FyuRmGyYupLjHLs+JA9/CBoVS4/NQ==", + "engines": { + "node": ">=0.11" + } + }, "node_modules/babel-jest": { "version": "29.7.0", "resolved": "https://registry.npmjs.org/babel-jest/-/babel-jest-29.7.0.tgz", @@ -1602,6 +1697,25 @@ "resolved": "https://registry.npmjs.org/balanced-match/-/balanced-match-1.0.2.tgz", "integrity": "sha512-3oSeUO0TMV67hN1AmbXsK4yaqU7tjiHlbxRDZOpH0KW9+CeX4bRAaX0Anxt0tx2MrpRpWwQaPwIlISEJhYU5Pw==" }, + "node_modules/base64-js": { + "version": "1.5.1", + "resolved": "https://registry.npmjs.org/base64-js/-/base64-js-1.5.1.tgz", + "integrity": "sha512-AKpaYlHn8t4SVbOHCy+b5+KKgvR4vrsD8vbvrbiQJps7fKDTkjkDry6ji0rUJjC0kzbNePLwzxq8iypo41qeWA==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ] + }, "node_modules/bindings": { "version": "1.5.0", "resolved": "https://registry.npmjs.org/bindings/-/bindings-1.5.0.tgz", @@ -1695,6 +1809,29 @@ "node-int64": "^0.4.0" } }, + "node_modules/buffer": { + "version": "5.7.1", + "resolved": "https://registry.npmjs.org/buffer/-/buffer-5.7.1.tgz", + "integrity": "sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ], + "dependencies": { + "base64-js": "^1.3.1", + "ieee754": "^1.1.13" + } + }, "node_modules/buffer-from": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/buffer-from/-/buffer-from-1.1.2.tgz", @@ -1992,6 +2129,11 @@ "node": ">= 8" } }, + "node_modules/cuint": { + "version": "0.2.2", + "resolved": "https://registry.npmjs.org/cuint/-/cuint-0.2.2.tgz", + "integrity": "sha512-d4ZVpCW31eWwCMe1YT3ur7mUDnTXbgwyzaL320DrcRT45rfjYxkt5QWLrmOJ+/UEAI2+fQgKe/fCjR8l4TpRgw==" + }, "node_modules/debug": { "version": "2.6.9", "resolved": "https://registry.npmjs.org/debug/-/debug-2.6.9.tgz", @@ -2267,6 +2409,11 @@ "node": ">= 0.10.0" } }, + "node_modules/fast-deep-equal": { + "version": "3.1.3", + "resolved": "https://registry.npmjs.org/fast-deep-equal/-/fast-deep-equal-3.1.3.tgz", + "integrity": "sha512-f3qQ9oQy9j2AhBe/H9VC91wLmKBCCU/gDOnKNAYG5hswO7BLKj09Hc5HYNz9cGI++xlpDCIgDaitVs03ATR84Q==" + }, "node_modules/fast-json-stable-stringify": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/fast-json-stable-stringify/-/fast-json-stable-stringify-2.1.0.tgz", @@ -2663,6 +2810,25 @@ "node": ">=0.10.0" } }, + "node_modules/ieee754": { + "version": "1.2.1", + "resolved": "https://registry.npmjs.org/ieee754/-/ieee754-1.2.1.tgz", + "integrity": "sha512-dcyqhDvX1C46lXZcVqCpK+FtMRQVdIMN6/Df5js2zouUsqG7I6sFxitIC+7KYK29KdXOLHdu9zL4sFnoVQnqaA==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ] + }, "node_modules/import-local": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/import-local/-/import-local-3.1.0.tgz", @@ -3594,6 +3760,11 @@ "integrity": "sha512-xyFwyhro/JEof6Ghe2iz2NcXoj2sloNsWr/XsERDK/oiPCfaNhl5ONfp+jQdAZRQQ0IJWNzH9zIZF7li91kh2w==", "dev": true }, + "node_modules/json-schema-traverse": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/json-schema-traverse/-/json-schema-traverse-1.0.0.tgz", + "integrity": "sha512-NM8/P9n3XjXhIZn1lLhkFaACTOURQXjWhV4BA/RnOv8xvgqtqpAX9IO4mRQxSx1Rlo4tqzeqb0sOlruaOy3dug==" + }, "node_modules/json5": { "version": "2.2.3", "resolved": "https://registry.npmjs.org/json5/-/json5-2.2.3.tgz", @@ -3622,6 +3793,17 @@ "node": ">=14.0.0" } }, + "node_modules/kafkajs-lz4": { + "version": "1.2.1", + "resolved": "https://registry.npmjs.org/kafkajs-lz4/-/kafkajs-lz4-1.2.1.tgz", + "integrity": "sha512-+dGdRcVtHXEYmuwyPFuO0g7kTf+L97EPdm+wMk1h5jmHW/39NVWRaaw2WIeXuCRW7m9LnM3VzyvmSpqf6qZPfA==", + "dependencies": { + "lz4": "~0.6.0" + }, + "engines": { + "node": ">=8.0.0" + } + }, "node_modules/kleur": { "version": "3.0.3", "resolved": "https://registry.npmjs.org/kleur/-/kleur-3.0.3.tgz", @@ -3704,6 +3886,11 @@ "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==" }, + "node_modules/long": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/long/-/long-4.0.0.tgz", + "integrity": "sha512-XsP+KhQif4bjX1kbuSiySJFNAehNxgLb6hPRGJ9QsUr8ajHkuXGdrHmFUTUUXhDwVX2R5bY4JNZEwbUiMhV+MA==" + }, "node_modules/lru-cache": { "version": "5.1.1", "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-5.1.1.tgz", @@ -3713,6 +3900,21 @@ "yallist": "^3.0.2" } }, + "node_modules/lz4": { + "version": "0.6.5", + "resolved": "https://registry.npmjs.org/lz4/-/lz4-0.6.5.tgz", + "integrity": "sha512-KSZcJU49QZOlJSItaeIU3p8WoAvkTmD9fJqeahQXNu1iQ/kR0/mQLdbrK8JY9MY8f6AhJoMrihp1nu1xDbscSQ==", + "hasInstallScript": true, + "dependencies": { + "buffer": "^5.2.1", + "cuint": "^0.2.2", + "nan": "^2.13.2", + "xxhashjs": "^0.2.2" + }, + "engines": { + "node": ">= 0.10" + } + }, "node_modules/make-dir": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/make-dir/-/make-dir-4.0.0.tgz", @@ -3770,6 +3972,11 @@ "tmpl": "1.0.5" } }, + "node_modules/mappersmith": { + "version": "2.43.4", + "resolved": "https://registry.npmjs.org/mappersmith/-/mappersmith-2.43.4.tgz", + "integrity": "sha512-IyUw53aE3/SPH3eOkqSuD+Hcstpcl4dpxDDgZsPz65R2SlOikq0VHxo3kMPzUVvw7cCHunTmlpNXl5n/KzPcpg==" + }, "node_modules/media-typer": { "version": "0.3.0", "resolved": "https://registry.npmjs.org/media-typer/-/media-typer-0.3.0.tgz", @@ -3920,6 +4127,11 @@ "resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz", "integrity": "sha512-Tpp60P6IUJDTuOq/5Z8cdskzJujfwqfOTkrwIwj7IRISpnkJnT6SyJ4PCPnGMoFjC9ddhal5KVIYtAt97ix05A==" }, + "node_modules/nan": { + "version": "2.19.0", + "resolved": "https://registry.npmjs.org/nan/-/nan-2.19.0.tgz", + "integrity": "sha512-nO1xXxfh/RWNxfd/XPfbIfFk5vgLsAxUR9y5O0cHMJu/AW9U95JLXqthYHjEp+8gQ5p96K9jUp8nbVOxCdRbtw==" + }, "node_modules/natural-compare": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/natural-compare/-/natural-compare-1.4.0.tgz", @@ -4266,6 +4478,31 @@ "node": ">= 6" } }, + "node_modules/protobufjs": { + "version": "6.11.4", + "resolved": "https://registry.npmjs.org/protobufjs/-/protobufjs-6.11.4.tgz", + "integrity": "sha512-5kQWPaJHi1WoCpjTGszzQ32PG2F4+wRY6BmAT4Vfw56Q2FZ4YZzK20xUYQH4YkfehY1e6QSICrJquM6xXZNcrw==", + "hasInstallScript": true, + "dependencies": { + "@protobufjs/aspromise": "^1.1.2", + "@protobufjs/base64": "^1.1.2", + "@protobufjs/codegen": "^2.0.4", + "@protobufjs/eventemitter": "^1.1.0", + "@protobufjs/fetch": "^1.1.0", + "@protobufjs/float": "^1.0.2", + "@protobufjs/inquire": "^1.1.0", + "@protobufjs/path": "^1.1.2", + "@protobufjs/pool": "^1.1.0", + "@protobufjs/utf8": "^1.1.0", + "@types/long": "^4.0.1", + "@types/node": ">=13.7.0", + "long": "^4.0.0" + }, + "bin": { + "pbjs": "bin/pbjs", + "pbts": "bin/pbts" + } + }, "node_modules/proxy-addr": { "version": "2.0.7", "resolved": "https://registry.npmjs.org/proxy-addr/-/proxy-addr-2.0.7.tgz", @@ -4292,6 +4529,14 @@ "node": ">=10.16.0" } }, + "node_modules/punycode": { + "version": "2.3.1", + "resolved": "https://registry.npmjs.org/punycode/-/punycode-2.3.1.tgz", + "integrity": "sha512-vYt7UD1U9Wg6138shLtLOvdAu+8DsC/ilFtEVHcH+wydcSpNE20AfSOduf6MkRFahL5FY7X1oU7nKVZFtfq8Fg==", + "engines": { + "node": ">=6" + } + }, "node_modules/pure-rand": { "version": "6.0.4", "resolved": "https://registry.npmjs.org/pure-rand/-/pure-rand-6.0.4.tgz", @@ -4385,6 +4630,14 @@ "node": ">=0.10.0" } }, + "node_modules/require-from-string": { + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/require-from-string/-/require-from-string-2.0.2.tgz", + "integrity": "sha512-Xf0nWe6RseziFMu+Ap9biiUbmplq6S9/p+7w7YXP/JBHhrUDDUhwa+vANyubuqfZWTveU//DYVGsDG7RKL/vEw==", + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/resolve": { "version": "1.22.8", "resolved": "https://registry.npmjs.org/resolve/-/resolve-1.22.8.tgz", @@ -4954,8 +5207,7 @@ "node_modules/undici-types": { "version": "5.26.5", "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-5.26.5.tgz", - "integrity": "sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==", - "dev": true + "integrity": "sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==" }, "node_modules/unpipe": { "version": "1.0.0", @@ -4995,6 +5247,14 @@ "browserslist": ">= 4.21.0" } }, + "node_modules/uri-js": { + "version": "4.4.1", + "resolved": "https://registry.npmjs.org/uri-js/-/uri-js-4.4.1.tgz", + "integrity": "sha512-7rKUyy33Q1yc98pQ1DAmLtwX109F7TIfWlW1Ydo8Wl1ii1SeHieeh0HHfPeL2fMXK6z0s8ecKs9frCuLJvndBg==", + "dependencies": { + "punycode": "^2.1.0" + } + }, "node_modules/util-deprecate": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", @@ -5162,6 +5422,14 @@ "wtfnode": "proxy.js" } }, + "node_modules/xxhashjs": { + "version": "0.2.2", + "resolved": "https://registry.npmjs.org/xxhashjs/-/xxhashjs-0.2.2.tgz", + "integrity": "sha512-AkTuIuVTET12tpsVIQo+ZU6f/qDmKuRUcjaqR+OIvm+aCBsZ95i7UVY5WJ9TMsSaZ0DA2WxoZ4acu0sPH+OKAw==", + "dependencies": { + "cuint": "^0.2.2" + } + }, "node_modules/y18n": { "version": "5.0.8", "resolved": "https://registry.npmjs.org/y18n/-/y18n-5.0.8.tgz", diff --git a/package.json b/package.json index 7191de8..4676b1a 100644 --- a/package.json +++ b/package.json @@ -28,11 +28,14 @@ ], "author": "Geoff Hendrey , Sergey Sergeev ", "dependencies": { + "@kafkajs/confluent-schema-registry": "^3.3.0", "@opentelemetry/api": "^1.8.0", "@opentelemetry/sdk-metrics-base": "^0.31.0", + "avsc": "^5.7.7", "body-parser": "^1.20.2", "express": "^4.19.2", "kafkajs": "^2.2.4", + "kafkajs-lz4": "^1.2.1", "pulsar-client": "^1.9.0", "stated-js": "^0.1.20" }, diff --git a/src/workflow/StatedWorkflow.js b/src/workflow/StatedWorkflow.js index 4ad79a6..ee505e7 100644 --- a/src/workflow/StatedWorkflow.js +++ b/src/workflow/StatedWorkflow.js @@ -17,7 +17,6 @@ import StatedREPL from "stated-js/dist/src/StatedREPL.js"; import {WorkflowMetrics} from "./WorkflowMeters.js"; import express from 'express'; import Pulsar from 'pulsar-client'; -import {Kafka, logLevel} from 'kafkajs'; import winston from "winston"; import {WorkflowDispatcher} from "./WorkflowDispatcher.js"; import Step from "./Step.js"; @@ -26,6 +25,13 @@ import {WorkflowPersistence} from "./WorkflowPersistence.js"; import {Delay} from "../test/TestTools.js" import {Snapshot} from "./Snapshot.js"; import {PulsarClientMock} from "../test/PulsarMock.js"; +import {SchemaRegistry} from "@kafkajs/confluent-schema-registry"; +import LZ4 from "kafkajs-lz4"; + +// workaround for kafkajs issue +import kafkaPkg from 'kafkajs'; +const {Kafka, KafkaConfig, CompressionTypes, CompressionCodecs, logLevel} = kafkaPkg; + //This class is a wrapper around the TemplateProcessor class that provides workflow functionality @@ -385,6 +391,67 @@ export class StatedWorkflow { })(); } + // this function provide access to COP CloudEvent sources when deployed as a Zodiac function + subscribeCOPKafka(subscriptionParams) { + const {type, initialOffset = 'earliest', maxConsume = -1} = subscriptionParams; + + // TODO: - validate + const kafkaParams = subscriptionParams.client.params; + kafkaParams.sasl.password = process.env.KAFKA_SASL_PASSWORD; + + CompressionCodecs[CompressionTypes.LZ4] = new LZ4().codec; + this.kafkaClient = new Kafka(kafkaParams); + + // TODO: SCHEMA_REGISTRY_URL should be coming form the zodiac function + const registry = new SchemaRegistry({ host: subscriptionParams.client.schemaRegistryUrl }); + + // TODO: parameterize + const defaultConsumerGroup = 'alerting.sys.stated-workflow'; + + // TODO: merge the default client params with the allowed client params from the subscriptionParams + const consumer = this.kafkaClient.consumer({ groupId: defaultConsumerGroup }); + + (async () => {try { + await consumer.connect(); + await consumer.subscribe({ topic: type, fromBeginning: true }); + + } catch (e) { + this.logger.debug(`Kafka subscriber - failed, e: ${e}`); + } + this.logger.debug(`Kafka subscriber - subscribed.`); + + this.consumers.set(type, consumer); + let countdown = maxConsume; + + // main consumer processor + await consumer.run({ + eachMessage: async ({ topic, partition, message }) => { + let data; + this.logger.debug(`Kafka subscriber got ${message} from ${topic}:${partition}.`); + try { + data = await registry.decode(message.value); + } catch (error) { + console.error("Unable to parse data to JSON:", error); + } + const ackFunction = async (data2ack) => { + console.log(`acknowledging data: ${StatedREPL.stringify(data)} with data2ack: ${StatedREPL.stringify(data2ack)}`); + // TODO: make the below code working + // const currentOffset = this.templateProcessor.output(subscribeParamsJsonPointer + 'offset',); + // if (currentOffset < message.offset + 1) { + // await consumer.commitOffsets([{ topic, partition, offset: message.offset + 1 }]); + // this.templateProcessor.setData(subscribeParamsJsonPointer + 'offset', message.offset + 1; + // } + } + await this.workflowDispatcher.dispatchToAllSubscribers(type, data, ackFunction); + + if (countdown && --countdown === 0) { + // Disconnect the consumer if maxConsume messages have been processed + await consumer.disconnect(); + } + } + })})(); + } + async subscribeKafka(subscriptionParams) { const { type, initialOffset = 'earliest', maxConsume = -1 } = subscriptionParams; this.logger.debug(`Kafka subscribe params ${StatedREPL.stringify(subscriptionParams)} with clientParams ${StatedREPL.stringify(clientParams)}`); @@ -471,6 +538,7 @@ export class StatedWorkflow { await dispatcher.drainBatch(); // in test mode we wanna actually wait for all the test events to process return; } + // clientType test means that the data will be sent directly from publish function to the dispatcher if(clientType==='test'){ this.logger.debug(`No 'real' subscription created because client.type='test' set for subscription params ${StatedREPL.stringify(subscriptionParams)}`); @@ -484,6 +552,9 @@ export class StatedWorkflow { }; // validates that we have a dispatcher created for this subscriptionParams. this.workflowDispatcher.getDispatcher(subscriptionParams, testDataAckFunctionGenerator); + }else if (clientType === 'cop') { + this.logger.debug(`subscribing to cop cloud event sources ${clientParams}`) + this.subscribeCOPKafka(subscriptionParams); }else if (clientType === 'kafka') { this.logger.debug(`subscribing to kafka using ${clientParams}`) this.createKafkaClient(clientParams); diff --git a/yarn.lock b/yarn.lock index 9eef294..2ebebc0 100644 --- a/yarn.lock +++ b/yarn.lock @@ -23,7 +23,7 @@ resolved "https://registry.npmjs.org/@babel/compat-data/-/compat-data-7.23.5.tgz" integrity sha512-uU27kfDRlhfKl+w1U6vp16IuvSLtjAxdArVXPa9BvLkrr7CYIsxH5adpHObeAGY/41+syctUWOZ140a2Rvkgjw== -"@babel/core@^7.0.0", "@babel/core@^7.0.0-0", "@babel/core@^7.11.6", "@babel/core@^7.12.3", "@babel/core@^7.8.0": +"@babel/core@^7.11.6", "@babel/core@^7.12.3": version "7.23.7" resolved "https://registry.npmjs.org/@babel/core/-/core-7.23.7.tgz" integrity sha512-+UpDgowcmqe36d4NwqvKsyPMlOLNGMsfMmQ5WGCu+siCe3t3dfe9njrzGfdN4qq+bcNUt0+Vw6haRxBOycs4dw== @@ -297,7 +297,7 @@ resolved "https://registry.npmjs.org/@bcoe/v8-coverage/-/v8-coverage-0.2.3.tgz" integrity sha512-0hYQ8SB4Db5zvZB4axdMHGwEaQjkZzFjQiN9LVYvIFB2nSUHW9tYpxWriPrWDASIxiaXax83REcLxuSdnGPZtw== -"@colors/colors@^1.6.0", "@colors/colors@1.6.0": +"@colors/colors@1.6.0", "@colors/colors@^1.6.0": version "1.6.0" resolved "https://registry.npmjs.org/@colors/colors/-/colors-1.6.0.tgz" integrity sha512-Ir+AOibqzrIsL6ajt3Rz3LskB7OiMVHqltZmspbW/TJuTVuyOMirVqAkjfY6JISiLHgyNqicAC8AyHHGzNd/dA== @@ -551,6 +551,16 @@ "@jridgewell/resolve-uri" "^3.1.0" "@jridgewell/sourcemap-codec" "^1.4.14" +"@kafkajs/confluent-schema-registry@^3.3.0": + version "3.3.0" + resolved "https://registry.npmjs.org/@kafkajs/confluent-schema-registry/-/confluent-schema-registry-3.3.0.tgz" + integrity sha512-ImuqcHdJuJLNvfDgQwPP0EEKY4yzwGJ1t0Jyf0aAwWK2TGExXTQmeXCtcu3rdSHBEwrS7T30c6HNVxBPVA1hHQ== + dependencies: + ajv "^7.1.0" + avsc ">= 5.4.13 < 6" + mappersmith ">= 2.30.1 < 3" + protobufjs "^6.10.1" + "@mapbox/node-pre-gyp@^1.0.9": version "1.0.11" resolved "https://registry.npmjs.org/@mapbox/node-pre-gyp/-/node-pre-gyp-1.0.11.tgz" @@ -573,7 +583,7 @@ dependencies: "@opentelemetry/api" "^1.0.0" -"@opentelemetry/api@^1.0.0", "@opentelemetry/api@^1.8.0", "@opentelemetry/api@>=1.0.0 <1.2.0": +"@opentelemetry/api@^1.0.0", "@opentelemetry/api@^1.8.0": version "1.8.0" resolved "https://registry.npmjs.org/@opentelemetry/api/-/api-1.8.0.tgz" integrity sha512-I/s6F7yKUDdtMsoBWXJe8Qz40Tui5vsuKCWJEWVL+5q9sSWRzzx6v2KeNsOBEwd94j0eWkpWCH4yB6rZg9Mf0w== @@ -608,6 +618,59 @@ resolved "https://registry.npmjs.org/@opentelemetry/semantic-conventions/-/semantic-conventions-1.5.0.tgz" integrity sha512-wlYG/U6ddW1ilXslnDLLQYJ8nd97W8JJTTfwkGhubx6dzW6SUkd+N4/MzTjjyZlrHQunxHtkHFvVpUKiROvFDw== +"@protobufjs/aspromise@^1.1.1", "@protobufjs/aspromise@^1.1.2": + version "1.1.2" + resolved "https://registry.npmjs.org/@protobufjs/aspromise/-/aspromise-1.1.2.tgz" + integrity sha512-j+gKExEuLmKwvz3OgROXtrJ2UG2x8Ch2YZUxahh+s1F2HZ+wAceUNLkvy6zKCPVRkU++ZWQrdxsUeQXmcg4uoQ== + +"@protobufjs/base64@^1.1.2": + version "1.1.2" + resolved "https://registry.npmjs.org/@protobufjs/base64/-/base64-1.1.2.tgz" + integrity sha512-AZkcAA5vnN/v4PDqKyMR5lx7hZttPDgClv83E//FMNhR2TMcLUhfRUBHCmSl0oi9zMgDDqRUJkSxO3wm85+XLg== + +"@protobufjs/codegen@^2.0.4": + version "2.0.4" + resolved "https://registry.npmjs.org/@protobufjs/codegen/-/codegen-2.0.4.tgz" + integrity sha512-YyFaikqM5sH0ziFZCN3xDC7zeGaB/d0IUb9CATugHWbd1FRFwWwt4ld4OYMPWu5a3Xe01mGAULCdqhMlPl29Jg== + +"@protobufjs/eventemitter@^1.1.0": + version "1.1.0" + resolved "https://registry.npmjs.org/@protobufjs/eventemitter/-/eventemitter-1.1.0.tgz" + integrity sha512-j9ednRT81vYJ9OfVuXG6ERSTdEL1xVsNgqpkxMsbIabzSo3goCjDIveeGv5d03om39ML71RdmrGNjG5SReBP/Q== + +"@protobufjs/fetch@^1.1.0": + version "1.1.0" + resolved "https://registry.npmjs.org/@protobufjs/fetch/-/fetch-1.1.0.tgz" + integrity sha512-lljVXpqXebpsijW71PZaCYeIcE5on1w5DlQy5WH6GLbFryLUrBD4932W/E2BSpfRJWseIL4v/KPgBFxDOIdKpQ== + dependencies: + "@protobufjs/aspromise" "^1.1.1" + "@protobufjs/inquire" "^1.1.0" + +"@protobufjs/float@^1.0.2": + version "1.0.2" + resolved "https://registry.npmjs.org/@protobufjs/float/-/float-1.0.2.tgz" + integrity sha512-Ddb+kVXlXst9d+R9PfTIxh1EdNkgoRe5tOX6t01f1lYWOvJnSPDBlG241QLzcyPdoNTsblLUdujGSE4RzrTZGQ== + +"@protobufjs/inquire@^1.1.0": + version "1.1.0" + resolved "https://registry.npmjs.org/@protobufjs/inquire/-/inquire-1.1.0.tgz" + integrity sha512-kdSefcPdruJiFMVSbn801t4vFK7KB/5gd2fYvrxhuJYg8ILrmn9SKSX2tZdV6V+ksulWqS7aXjBcRXl3wHoD9Q== + +"@protobufjs/path@^1.1.2": + version "1.1.2" + resolved "https://registry.npmjs.org/@protobufjs/path/-/path-1.1.2.tgz" + integrity sha512-6JOcJ5Tm08dOHAbdR3GrvP+yUUfkjG5ePsHYczMFLq3ZmMkAD98cDgcT2iA1lJ9NVwFd4tH/iSSoe44YWkltEA== + +"@protobufjs/pool@^1.1.0": + version "1.1.0" + resolved "https://registry.npmjs.org/@protobufjs/pool/-/pool-1.1.0.tgz" + integrity sha512-0kELaGSIDBKvcgS4zkjz1PeddatrjYcmMWOlAuAPwAeccUrPHdUqo/J6LiymHHEiJT5NrF1UVwxY14f+fy4WQw== + +"@protobufjs/utf8@^1.1.0": + version "1.1.0" + resolved "https://registry.npmjs.org/@protobufjs/utf8/-/utf8-1.1.0.tgz" + integrity sha512-Vvn3zZrhQZkkBE8LSuW3em98c0FwgO4nxzv6OdSxPKJIEKY2bGbHn+mhGIPerzI4twdxaP8/0+06HBpwf345Lw== + "@sinclair/typebox@^0.27.8": version "0.27.8" resolved "https://registry.npmjs.org/@sinclair/typebox/-/typebox-0.27.8.tgz" @@ -686,7 +749,12 @@ dependencies: "@types/istanbul-lib-report" "*" -"@types/node@*": +"@types/long@^4.0.1": + version "4.0.2" + resolved "https://registry.npmjs.org/@types/long/-/long-4.0.2.tgz" + integrity sha512-MqTGEo5bj5t157U6fA/BiDynNkn0YknVdh48CMPkTSpFTVmvao5UQmm7uEF6xBEo7qIMAlY/JSleYaE6VOdpaA== + +"@types/node@*", "@types/node@>=13.7.0": version "20.10.7" resolved "https://registry.npmjs.org/@types/node/-/node-20.10.7.tgz" integrity sha512-fRbIKb8C/Y2lXxB5eVMj4IU7xpdox0Lh8bUPEdtLysaylsml1hOOx1+STloRs/B9nf7C6kPRmmg/V7aQW7usNg== @@ -735,6 +803,16 @@ agent-base@6: dependencies: debug "4" +ajv@^7.1.0: + version "7.2.4" + resolved "https://registry.npmjs.org/ajv/-/ajv-7.2.4.tgz" + integrity sha512-nBeQgg/ZZA3u3SYxyaDvpvDtgZ/EZPF547ARgZBrG9Bhu1vKDwAIjtIf+sDtJUKa2zOcEbmRLBRSyMraS/Oy1A== + dependencies: + fast-deep-equal "^3.1.1" + json-schema-traverse "^1.0.0" + require-from-string "^2.0.2" + uri-js "^4.2.2" + ansi-escapes@^4.2.1: version "4.3.2" resolved "https://registry.npmjs.org/ansi-escapes/-/ansi-escapes-4.3.2.tgz" @@ -809,6 +887,11 @@ async@^3.2.3: resolved "https://registry.npmjs.org/async/-/async-3.2.5.tgz" integrity sha512-baNZyqaaLhyLVKm/DlvdW051MSgO6b8eVfIezl9E5PqWxFgzLm/wQntEW4zOytVburDEr0JlALEpdOFwvErLsg== +"avsc@>= 5.4.13 < 6", avsc@^5.7.7: + version "5.7.7" + resolved "https://registry.npmjs.org/avsc/-/avsc-5.7.7.tgz" + integrity sha512-9cYNccliXZDByFsFliVwk5GvTq058Fj513CiR4E60ndDwmuXzTJEp/Bp8FyuRmGyYupLjHLs+JA9/CBoVS4/NQ== + babel-jest@^29.7.0: version "29.7.0" resolved "https://registry.npmjs.org/babel-jest/-/babel-jest-29.7.0.tgz" @@ -874,6 +957,11 @@ balanced-match@^1.0.0: resolved "https://registry.npmjs.org/balanced-match/-/balanced-match-1.0.2.tgz" integrity sha512-3oSeUO0TMV67hN1AmbXsK4yaqU7tjiHlbxRDZOpH0KW9+CeX4bRAaX0Anxt0tx2MrpRpWwQaPwIlISEJhYU5Pw== +base64-js@^1.3.1: + version "1.5.1" + resolved "https://registry.npmjs.org/base64-js/-/base64-js-1.5.1.tgz" + integrity sha512-AKpaYlHn8t4SVbOHCy+b5+KKgvR4vrsD8vbvrbiQJps7fKDTkjkDry6ji0rUJjC0kzbNePLwzxq8iypo41qeWA== + bindings@^1.5.0: version "1.5.0" resolved "https://registry.npmjs.org/bindings/-/bindings-1.5.0.tgz" @@ -881,7 +969,7 @@ bindings@^1.5.0: dependencies: file-uri-to-path "1.0.0" -body-parser@^1.20.2, body-parser@1.20.2: +body-parser@1.20.2, body-parser@^1.20.2: version "1.20.2" resolved "https://registry.npmjs.org/body-parser/-/body-parser-1.20.2.tgz" integrity sha512-ml9pReCu3M61kGlqoTm2umSXTlRTuGTx0bfYj+uIUKKYycG5NtSbeetV3faSU6R7ajOPw0g/J1PvK4qNy7s5bA== @@ -914,7 +1002,7 @@ braces@^3.0.2: dependencies: fill-range "^7.0.1" -browserslist@^4.22.2, "browserslist@>= 4.21.0": +browserslist@^4.22.2: version "4.22.2" resolved "https://registry.npmjs.org/browserslist/-/browserslist-4.22.2.tgz" integrity sha512-0UgcrvQmBDvZHFGdYUehrCNIazki7/lUP3kkoi/r3YB2amZbFM9J43ZRkJTXBUZK4gmx56+Sqk9+Vs9mwZx9+A== @@ -936,6 +1024,14 @@ buffer-from@^1.0.0: resolved "https://registry.npmjs.org/buffer-from/-/buffer-from-1.1.2.tgz" integrity sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ== +buffer@^5.2.1: + version "5.7.1" + resolved "https://registry.npmjs.org/buffer/-/buffer-5.7.1.tgz" + integrity sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ== + dependencies: + base64-js "^1.3.1" + ieee754 "^1.1.13" + bytes@3.1.2: version "3.1.2" resolved "https://registry.npmjs.org/bytes/-/bytes-3.1.2.tgz" @@ -970,16 +1066,7 @@ caniuse-lite@^1.0.30001565: resolved "https://registry.npmjs.org/caniuse-lite/-/caniuse-lite-1.0.30001576.tgz" integrity sha512-ff5BdakGe2P3SQsMsiqmt1Lc8221NR1VzHj5jXN5vBny9A6fpze94HiVV/n7XRosOlsShJcvMv5mdnpjOGCEgg== -chalk@^2.4.1: - version "2.4.2" - resolved "https://registry.npmjs.org/chalk/-/chalk-2.4.2.tgz" - integrity sha512-Mti+f9lpJNcwF4tWV8/OrTTtF1gZi+f8FqlyAdouralcFWFQWF2+NgCHShjkCb+IFBLq9buZwE1xckQU4peSuQ== - dependencies: - ansi-styles "^3.2.1" - escape-string-regexp "^1.0.5" - supports-color "^5.3.0" - -chalk@^2.4.2: +chalk@^2.4.1, chalk@^2.4.2: version "2.4.2" resolved "https://registry.npmjs.org/chalk/-/chalk-2.4.2.tgz" integrity sha512-Mti+f9lpJNcwF4tWV8/OrTTtF1gZi+f8FqlyAdouralcFWFQWF2+NgCHShjkCb+IFBLq9buZwE1xckQU4peSuQ== @@ -1040,14 +1127,7 @@ collect-v8-coverage@^1.0.0: resolved "https://registry.npmjs.org/collect-v8-coverage/-/collect-v8-coverage-1.0.2.tgz" integrity sha512-lHl4d5/ONEbLlJvaJNtsF/Lz+WvB07u2ycqTYbdrq7UypDXailES4valYb2eWiJFxZlVmpGekfqoxQhzyFdT4Q== -color-convert@^1.9.0: - version "1.9.3" - resolved "https://registry.npmjs.org/color-convert/-/color-convert-1.9.3.tgz" - integrity sha512-QfAUtd+vFdAtFQcC8CCyYt1fYWxSqAiK2cSD6zDB8N3cpsEBAvRxp9zOGg6G/SHHJYAT88/az/IuDGALsNVbGg== - dependencies: - color-name "1.1.3" - -color-convert@^1.9.3: +color-convert@^1.9.0, color-convert@^1.9.3: version "1.9.3" resolved "https://registry.npmjs.org/color-convert/-/color-convert-1.9.3.tgz" integrity sha512-QfAUtd+vFdAtFQcC8CCyYt1fYWxSqAiK2cSD6zDB8N3cpsEBAvRxp9zOGg6G/SHHJYAT88/az/IuDGALsNVbGg== @@ -1061,16 +1141,16 @@ color-convert@^2.0.1: dependencies: color-name "~1.1.4" -color-name@^1.0.0, color-name@~1.1.4: - version "1.1.4" - resolved "https://registry.npmjs.org/color-name/-/color-name-1.1.4.tgz" - integrity sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA== - color-name@1.1.3: version "1.1.3" resolved "https://registry.npmjs.org/color-name/-/color-name-1.1.3.tgz" integrity sha512-72fSenhMw2HZMTVHeCA9KCmpEIbzWiQsjN+BHcBbS9vr1mtt+vJjPdksIBNUmKAW8TFUDPJK5SUU3QhE9NEXDw== +color-name@^1.0.0, color-name@~1.1.4: + version "1.1.4" + resolved "https://registry.npmjs.org/color-name/-/color-name-1.1.4.tgz" + integrity sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA== + color-string@^1.6.0: version "1.9.1" resolved "https://registry.npmjs.org/color-string/-/color-string-1.9.1.tgz" @@ -1159,26 +1239,10 @@ cross-spawn@^7.0.3: shebang-command "^2.0.0" which "^2.0.1" -debug@^4.1.0: - version "4.3.4" - resolved "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz" - integrity sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ== - dependencies: - ms "2.1.2" - -debug@^4.1.1: - version "4.3.4" - resolved "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz" - integrity sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ== - dependencies: - ms "2.1.2" - -debug@^4.3.1: - version "4.3.4" - resolved "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz" - integrity sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ== - dependencies: - ms "2.1.2" +cuint@^0.2.2: + version "0.2.2" + resolved "https://registry.npmjs.org/cuint/-/cuint-0.2.2.tgz" + integrity sha512-d4ZVpCW31eWwCMe1YT3ur7mUDnTXbgwyzaL320DrcRT45rfjYxkt5QWLrmOJ+/UEAI2+fQgKe/fCjR8l4TpRgw== debug@2.6.9: version "2.6.9" @@ -1187,7 +1251,7 @@ debug@2.6.9: dependencies: ms "2.0.0" -debug@4: +debug@4, debug@^4.1.0, debug@^4.1.1, debug@^4.3.1: version "4.3.4" resolved "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz" integrity sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ== @@ -1378,6 +1442,11 @@ express@^4.19.2: utils-merge "1.0.1" vary "~1.1.2" +fast-deep-equal@^3.1.1: + version "3.1.3" + resolved "https://registry.npmjs.org/fast-deep-equal/-/fast-deep-equal-3.1.3.tgz" + integrity sha512-f3qQ9oQy9j2AhBe/H9VC91wLmKBCCU/gDOnKNAYG5hswO7BLKj09Hc5HYNz9cGI++xlpDCIgDaitVs03ATR84Q== + fast-json-stable-stringify@^2.1.0: version "2.1.0" resolved "https://registry.npmjs.org/fast-json-stable-stringify/-/fast-json-stable-stringify-2.1.0.tgz" @@ -1619,6 +1688,11 @@ iconv-lite@0.4.24: dependencies: safer-buffer ">= 2.1.2 < 3" +ieee754@^1.1.13: + version "1.2.1" + resolved "https://registry.npmjs.org/ieee754/-/ieee754-1.2.1.tgz" + integrity sha512-dcyqhDvX1C46lXZcVqCpK+FtMRQVdIMN6/Df5js2zouUsqG7I6sFxitIC+7KYK29KdXOLHdu9zL4sFnoVQnqaA== + import-local@^3.0.2: version "3.1.0" resolved "https://registry.npmjs.org/import-local/-/import-local-3.1.0.tgz" @@ -1640,7 +1714,7 @@ inflight@^1.0.4: once "^1.3.0" wrappy "1" -inherits@^2.0.3, inherits@2, inherits@2.0.4: +inherits@2, inherits@2.0.4, inherits@^2.0.3: version "2.0.4" resolved "https://registry.npmjs.org/inherits/-/inherits-2.0.4.tgz" integrity sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ== @@ -1949,7 +2023,7 @@ jest-resolve-dependencies@^29.7.0: jest-regex-util "^29.6.3" jest-snapshot "^29.7.0" -jest-resolve@*, jest-resolve@^29.7.0: +jest-resolve@^29.7.0: version "29.7.0" resolved "https://registry.npmjs.org/jest-resolve/-/jest-resolve-29.7.0.tgz" integrity sha512-IOVhZSrg+UvVAshDSDtHyFCCBUl/Q3AAJv8iZ6ZjnZ74xzvwuzLXid9IIIPgTnY62SJjfuupMKZsZQRsCvxEgA== @@ -2141,6 +2215,11 @@ json-parse-even-better-errors@^2.3.0: resolved "https://registry.npmjs.org/json-parse-even-better-errors/-/json-parse-even-better-errors-2.3.1.tgz" integrity sha512-xyFwyhro/JEof6Ghe2iz2NcXoj2sloNsWr/XsERDK/oiPCfaNhl5ONfp+jQdAZRQQ0IJWNzH9zIZF7li91kh2w== +json-schema-traverse@^1.0.0: + version "1.0.0" + resolved "https://registry.npmjs.org/json-schema-traverse/-/json-schema-traverse-1.0.0.tgz" + integrity sha512-NM8/P9n3XjXhIZn1lLhkFaACTOURQXjWhV4BA/RnOv8xvgqtqpAX9IO4mRQxSx1Rlo4tqzeqb0sOlruaOy3dug== + json5@^2.2.3: version "2.2.3" resolved "https://registry.npmjs.org/json5/-/json5-2.2.3.tgz" @@ -2151,6 +2230,13 @@ jsonata@^2.0.3: resolved "https://registry.npmjs.org/jsonata/-/jsonata-2.0.4.tgz" integrity sha512-vfavX4/G/yrYxE+UrmT/oUJ3ph7KqUrb0R7b0LVRcntQwxw+Z5kA1pNUIQzX5hF04Oe1eKxyoIPsmXtc2LgJTQ== +kafkajs-lz4@^1.2.1: + version "1.2.1" + resolved "https://registry.npmjs.org/kafkajs-lz4/-/kafkajs-lz4-1.2.1.tgz" + integrity sha512-+dGdRcVtHXEYmuwyPFuO0g7kTf+L97EPdm+wMk1h5jmHW/39NVWRaaw2WIeXuCRW7m9LnM3VzyvmSpqf6qZPfA== + dependencies: + lz4 "~0.6.0" + kafkajs@^2.2.4: version "2.2.4" resolved "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz" @@ -2215,6 +2301,11 @@ logform@^2.3.2, logform@^2.4.0: safe-stable-stringify "^2.3.1" triple-beam "^1.3.0" +long@^4.0.0: + version "4.0.0" + resolved "https://registry.npmjs.org/long/-/long-4.0.0.tgz" + integrity sha512-XsP+KhQif4bjX1kbuSiySJFNAehNxgLb6hPRGJ9QsUr8ajHkuXGdrHmFUTUUXhDwVX2R5bY4JNZEwbUiMhV+MA== + lru-cache@^5.1.1: version "5.1.1" resolved "https://registry.npmjs.org/lru-cache/-/lru-cache-5.1.1.tgz" @@ -2229,6 +2320,16 @@ lru-cache@^6.0.0: dependencies: yallist "^4.0.0" +lz4@~0.6.0: + version "0.6.5" + resolved "https://registry.npmjs.org/lz4/-/lz4-0.6.5.tgz" + integrity sha512-KSZcJU49QZOlJSItaeIU3p8WoAvkTmD9fJqeahQXNu1iQ/kR0/mQLdbrK8JY9MY8f6AhJoMrihp1nu1xDbscSQ== + dependencies: + buffer "^5.2.1" + cuint "^0.2.2" + nan "^2.13.2" + xxhashjs "^0.2.2" + make-dir@^3.1.0: version "3.1.0" resolved "https://registry.npmjs.org/make-dir/-/make-dir-3.1.0.tgz" @@ -2250,6 +2351,11 @@ makeerror@1.0.12: dependencies: tmpl "1.0.5" +"mappersmith@>= 2.30.1 < 3": + version "2.43.4" + resolved "https://registry.npmjs.org/mappersmith/-/mappersmith-2.43.4.tgz" + integrity sha512-IyUw53aE3/SPH3eOkqSuD+Hcstpcl4dpxDDgZsPz65R2SlOikq0VHxo3kMPzUVvw7cCHunTmlpNXl5n/KzPcpg== + media-typer@0.3.0: version "0.3.0" resolved "https://registry.npmjs.org/media-typer/-/media-typer-0.3.0.tgz" @@ -2337,11 +2443,6 @@ mkdirp@^1.0.3: resolved "https://registry.npmjs.org/mkdirp/-/mkdirp-1.0.4.tgz" integrity sha512-vVqVZQyf3WLx2Shd0qJ9xuvqgAyKPLAiqITEtqW0oIUjzo3PePDd6fW9iFz30ef7Ysp/oiWqbhszeGWW2T6Gzw== -ms@^2.1.1: - version "2.1.3" - resolved "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz" - integrity sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA== - ms@2.0.0: version "2.0.0" resolved "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz" @@ -2352,11 +2453,16 @@ ms@2.1.2: resolved "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz" integrity sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w== -ms@2.1.3: +ms@2.1.3, ms@^2.1.1: version "2.1.3" resolved "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz" integrity sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA== +nan@^2.13.2: + version "2.19.0" + resolved "https://registry.npmjs.org/nan/-/nan-2.19.0.tgz" + integrity sha512-nO1xXxfh/RWNxfd/XPfbIfFk5vgLsAxUR9y5O0cHMJu/AW9U95JLXqthYHjEp+8gQ5p96K9jUp8nbVOxCdRbtw== + natural-compare@^1.4.0: version "1.4.0" resolved "https://registry.npmjs.org/natural-compare/-/natural-compare-1.4.0.tgz" @@ -2561,6 +2667,25 @@ prompts@^2.0.1: kleur "^3.0.3" sisteransi "^1.0.5" +protobufjs@^6.10.1: + version "6.11.4" + resolved "https://registry.npmjs.org/protobufjs/-/protobufjs-6.11.4.tgz" + integrity sha512-5kQWPaJHi1WoCpjTGszzQ32PG2F4+wRY6BmAT4Vfw56Q2FZ4YZzK20xUYQH4YkfehY1e6QSICrJquM6xXZNcrw== + dependencies: + "@protobufjs/aspromise" "^1.1.2" + "@protobufjs/base64" "^1.1.2" + "@protobufjs/codegen" "^2.0.4" + "@protobufjs/eventemitter" "^1.1.0" + "@protobufjs/fetch" "^1.1.0" + "@protobufjs/float" "^1.0.2" + "@protobufjs/inquire" "^1.1.0" + "@protobufjs/path" "^1.1.2" + "@protobufjs/pool" "^1.1.0" + "@protobufjs/utf8" "^1.1.0" + "@types/long" "^4.0.1" + "@types/node" ">=13.7.0" + long "^4.0.0" + proxy-addr@~2.0.7: version "2.0.7" resolved "https://registry.npmjs.org/proxy-addr/-/proxy-addr-2.0.7.tgz" @@ -2578,6 +2703,11 @@ pulsar-client@^1.9.0: bindings "^1.5.0" node-addon-api "^4.3.0" +punycode@^2.1.0: + version "2.3.1" + resolved "https://registry.npmjs.org/punycode/-/punycode-2.3.1.tgz" + integrity sha512-vYt7UD1U9Wg6138shLtLOvdAu+8DsC/ilFtEVHcH+wydcSpNE20AfSOduf6MkRFahL5FY7X1oU7nKVZFtfq8Fg== + pure-rand@^6.0.0: version "6.0.4" resolved "https://registry.npmjs.org/pure-rand/-/pure-rand-6.0.4.tgz" @@ -2634,6 +2764,11 @@ require-directory@^2.1.1: resolved "https://registry.npmjs.org/require-directory/-/require-directory-2.1.1.tgz" integrity sha512-fGxEI7+wsG9xrvdjsrlmL22OMTTiHRwAMroiEeMgq8gzoLC/PQr7RsRDSTLUg/bZAZtF+TVIkHc6/4RIKrui+Q== +require-from-string@^2.0.2: + version "2.0.2" + resolved "https://registry.npmjs.org/require-from-string/-/require-from-string-2.0.2.tgz" + integrity sha512-Xf0nWe6RseziFMu+Ap9biiUbmplq6S9/p+7w7YXP/JBHhrUDDUhwa+vANyubuqfZWTveU//DYVGsDG7RKL/vEw== + resolve-cwd@^3.0.0: version "3.0.0" resolved "https://registry.npmjs.org/resolve-cwd/-/resolve-cwd-3.0.0.tgz" @@ -2667,7 +2802,7 @@ rimraf@^3.0.2: dependencies: glob "^7.1.3" -safe-buffer@~5.2.0, safe-buffer@5.2.1: +safe-buffer@5.2.1, safe-buffer@~5.2.0: version "5.2.1" resolved "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.1.tgz" integrity sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ== @@ -2682,31 +2817,12 @@ safe-stable-stringify@^2.3.1: resolved "https://registry.npmjs.org/safer-buffer/-/safer-buffer-2.1.2.tgz" integrity sha512-YZo3K82SD7Riyi0E1EQPojLz7kpepnSQI9IyPbHHg1XXXevb5dJI7tpyN2ADxGcQbHG7vcyRHk0cbwqcQriUtg== -semver@^6.0.0: - version "6.3.1" - resolved "https://registry.npmjs.org/semver/-/semver-6.3.1.tgz" - integrity sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA== - -semver@^6.3.0, semver@^6.3.1: +semver@^6.0.0, semver@^6.3.0, semver@^6.3.1: version "6.3.1" resolved "https://registry.npmjs.org/semver/-/semver-6.3.1.tgz" integrity sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA== -semver@^7.3.5: - version "7.5.4" - resolved "https://registry.npmjs.org/semver/-/semver-7.5.4.tgz" - integrity sha512-1bCSESV6Pv+i21Hvpxp3Dx+pSD8lIPt8uVjRrxAUt/nbswYc+tK6Y2btiULjd4+fnq15PX+nqQDC7Oft7WkwcA== - dependencies: - lru-cache "^6.0.0" - -semver@^7.5.3: - version "7.5.4" - resolved "https://registry.npmjs.org/semver/-/semver-7.5.4.tgz" - integrity sha512-1bCSESV6Pv+i21Hvpxp3Dx+pSD8lIPt8uVjRrxAUt/nbswYc+tK6Y2btiULjd4+fnq15PX+nqQDC7Oft7WkwcA== - dependencies: - lru-cache "^6.0.0" - -semver@^7.5.4: +semver@^7.3.5, semver@^7.5.3, semver@^7.5.4: version "7.5.4" resolved "https://registry.npmjs.org/semver/-/semver-7.5.4.tgz" integrity sha512-1bCSESV6Pv+i21Hvpxp3Dx+pSD8lIPt8uVjRrxAUt/nbswYc+tK6Y2btiULjd4+fnq15PX+nqQDC7Oft7WkwcA== @@ -2859,13 +2975,6 @@ statuses@2.0.1: resolved "https://registry.npmjs.org/statuses/-/statuses-2.0.1.tgz" integrity sha512-RwNA9Z/7PrK06rYLIzFMlaF+l73iwpzsqRIFgbMLbTcLD6cOao82TaWefPXQvB2fOC4AjuYSEndS7N/mTCbkdQ== -string_decoder@^1.1.1: - version "1.3.0" - resolved "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz" - integrity sha512-hkRX8U1WjJFd8LsDJ2yQ/wWWxaopEsABU1XfkM8A+j0+85JAGppt16cr1Whg6KIbb4okU6Mql6BOj+uup/wKeA== - dependencies: - safe-buffer "~5.2.0" - string-argv@^0.3.2: version "0.3.2" resolved "https://registry.npmjs.org/string-argv/-/string-argv-0.3.2.tgz" @@ -2888,6 +2997,13 @@ string-length@^4.0.1: is-fullwidth-code-point "^3.0.0" strip-ansi "^6.0.1" +string_decoder@^1.1.1: + version "1.3.0" + resolved "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz" + integrity sha512-hkRX8U1WjJFd8LsDJ2yQ/wWWxaopEsABU1XfkM8A+j0+85JAGppt16cr1Whg6KIbb4okU6Mql6BOj+uup/wKeA== + dependencies: + safe-buffer "~5.2.0" + strip-ansi@^6.0.0, strip-ansi@^6.0.1: version "6.0.1" resolved "https://registry.npmjs.org/strip-ansi/-/strip-ansi-6.0.1.tgz" @@ -3017,7 +3133,7 @@ undici-types@~5.26.4: resolved "https://registry.npmjs.org/undici-types/-/undici-types-5.26.5.tgz" integrity sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA== -unpipe@~1.0.0, unpipe@1.0.0: +unpipe@1.0.0, unpipe@~1.0.0: version "1.0.0" resolved "https://registry.npmjs.org/unpipe/-/unpipe-1.0.0.tgz" integrity sha512-pjy2bYhSsufwWlKwPc+l3cN7+wuJlK6uz0YdJEOlQDbl6jo/YlPi4mb8agUkVC8BF7V8NuzeyPNqRksA3hztKQ== @@ -3030,6 +3146,13 @@ update-browserslist-db@^1.0.13: escalade "^3.1.1" picocolors "^1.0.0" +uri-js@^4.2.2: + version "4.4.1" + resolved "https://registry.npmjs.org/uri-js/-/uri-js-4.4.1.tgz" + integrity sha512-7rKUyy33Q1yc98pQ1DAmLtwX109F7TIfWlW1Ydo8Wl1ii1SeHieeh0HHfPeL2fMXK6z0s8ecKs9frCuLJvndBg== + dependencies: + punycode "^2.1.0" + util-deprecate@^1.0.1: version "1.0.2" resolved "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz" @@ -3146,6 +3269,13 @@ wtfnode@^0.9.1: resolved "https://registry.npmjs.org/wtfnode/-/wtfnode-0.9.1.tgz" integrity sha512-Ip6C2KeQPl/F3aP1EfOnPoQk14Udd9lffpoqWDNH3Xt78svxPbv53ngtmtfI0q2Te3oTq79XKTnRNXVIn/GsPA== +xxhashjs@^0.2.2: + version "0.2.2" + resolved "https://registry.npmjs.org/xxhashjs/-/xxhashjs-0.2.2.tgz" + integrity sha512-AkTuIuVTET12tpsVIQo+ZU6f/qDmKuRUcjaqR+OIvm+aCBsZ95i7UVY5WJ9TMsSaZ0DA2WxoZ4acu0sPH+OKAw== + dependencies: + cuint "^0.2.2" + y18n@^5.0.5: version "5.0.8" resolved "https://registry.npmjs.org/y18n/-/y18n-5.0.8.tgz" From ed37ada154a8eeef2f119f00d3bc8e0e7edd6f7f Mon Sep 17 00:00:00 2001 From: Sergey Sergeev Date: Mon, 29 Apr 2024 12:24:17 -0700 Subject: [PATCH 2/3] add support for CloudEvent based dispatching to root API --- src/workflow/StatedWorkflow.js | 10 ++++------ src/workflow/WorkflowDispatcher.js | 3 ++- src/workflow/WorkflowManager.js | 29 +++++++++++++++++++++-------- stated-workflow-api.js | 23 ++++++++++++++++++++++- 4 files changed, 49 insertions(+), 16 deletions(-) diff --git a/src/workflow/StatedWorkflow.js b/src/workflow/StatedWorkflow.js index ee505e7..057b1ff 100644 --- a/src/workflow/StatedWorkflow.js +++ b/src/workflow/StatedWorkflow.js @@ -395,6 +395,9 @@ export class StatedWorkflow { subscribeCOPKafka(subscriptionParams) { const {type, initialOffset = 'earliest', maxConsume = -1} = subscriptionParams; + //make sure a dispatcher exists for the combination of type and subscriberId + this.workflowDispatcher.getDispatcher(subscriptionParams); + // TODO: - validate const kafkaParams = subscriptionParams.client.params; kafkaParams.sasl.password = process.env.KAFKA_SASL_PASSWORD; @@ -435,12 +438,7 @@ export class StatedWorkflow { } const ackFunction = async (data2ack) => { console.log(`acknowledging data: ${StatedREPL.stringify(data)} with data2ack: ${StatedREPL.stringify(data2ack)}`); - // TODO: make the below code working - // const currentOffset = this.templateProcessor.output(subscribeParamsJsonPointer + 'offset',); - // if (currentOffset < message.offset + 1) { - // await consumer.commitOffsets([{ topic, partition, offset: message.offset + 1 }]); - // this.templateProcessor.setData(subscribeParamsJsonPointer + 'offset', message.offset + 1; - // } + // TODO: add ack logic } await this.workflowDispatcher.dispatchToAllSubscribers(type, data, ackFunction); diff --git a/src/workflow/WorkflowDispatcher.js b/src/workflow/WorkflowDispatcher.js index 1ccf923..c6c42d9 100644 --- a/src/workflow/WorkflowDispatcher.js +++ b/src/workflow/WorkflowDispatcher.js @@ -13,6 +13,7 @@ // limitations under the License. import {StatedWorkflow} from "./StatedWorkflow.js"; import jp from "stated-js/dist/src/JsonPointer.js"; +import StatedREPL from "stated-js/dist/src/StatedREPL.js"; // This class is used to add events to a queue and dispatch them to one or more subscribed workflow function with the // given parallelism. Tracks the number of active events and the number of events in the queue. @@ -231,7 +232,7 @@ export class WorkflowDispatcher { // We can acknowledge all data in-flight once we persist the data in the template snapshot async acknowledgeCallbacks() { for (const [data, dataAckCallback] of this.dataAckCallbacks.entries()) { - console.log(`Acknowledging data: ${data}`); + console.log(`Acknowledging data: ${StatedREPL.stringify(data)}`); dataAckCallback(data); } for (const dispatcherKeys of this.dispatchers.values()) { diff --git a/src/workflow/WorkflowManager.js b/src/workflow/WorkflowManager.js index b41bfa1..a81a758 100644 --- a/src/workflow/WorkflowManager.js +++ b/src/workflow/WorkflowManager.js @@ -1,18 +1,21 @@ #!/usr/bin/env node --experimental-vm-modules import { StatedWorkflow } from "./StatedWorkflow.js"; import StatedREPL from "stated-js/dist/src/StatedREPL.js"; -import fs from "fs"; import TemplateProcessor from "stated-js"; - import {WorkflowMetrics} from "./WorkflowMeters.js"; +import fs from "fs"; +import util from "util"; +const mkdir = util.promisify(fs.mkdir); + // WorkflowManager.js.js export class WorkflowManager { constructor() { this.workflows = {}; this.dispatchersByType = {}; this.workflowMetrics = new WorkflowMetrics(); - }t + this.statePath = './.state'; + } async createTypesMap(sw) { if (sw.workflowDispatcher == undefined) { @@ -31,9 +34,9 @@ export class WorkflowManager { async createWorkflow(template, context) { const workflowId = WorkflowManager.generateUniqueId(); const sw = await StatedWorkflow.newWorkflow(template, context, - {cbmon: this.workflowMetrics.monitorCallback(workflowId), - ackOnSnapshot: true}); - sw.templateProcessor.options = {'snapshot': {'snapshotIntervalSeconds': 1, path: `./${workflowId}.json`}}; + {cbmon: this.workflowMetrics.monitorCallback(workflowId), ackOnSnapshot: true}); + sw.templateProcessor.options = {'snapshot': {'snapshotIntervalSeconds': 1, path: `${this.statePath}/${workflowId}.json`}}; + await this.ensureStatePathDir(); this.workflows[workflowId] = sw; await sw.templateProcessor.initialize(template) await this.createTypesMap(sw); @@ -134,12 +137,12 @@ export class WorkflowManager { async getWorkflowSnapshot(workflowId) { console.log(`Reading snapshot object with ID ${workflowId}`); - const snapshotContent = fs.readFileSync(`./${workflowId}.json`, 'utf8'); + const snapshotContent = fs.readFileSync(`${this.statePath}/${workflowId}.json`, 'utf8'); return JSON.parse(snapshotContent); } async restoreWorkflow(workflowId) { - const snapshotContent = fs.readFileSync(`./${workflowId}.json`, 'utf8'); + const snapshotContent = fs.readFileSync(`${this.statePath}/${workflowId}.json`, 'utf8'); const snapshot = JSON.parse(snapshotContent); console.log(`Restoring workflow with ID ${workflowId}`); @@ -169,5 +172,15 @@ export class WorkflowManager { } } + async ensureStatePathDir() { + try { + await mkdir(this.statePath, {recursive: true}); + } catch (error) { + // if the file exists, we can continue without error + if (error.code !== 'EEXIST') { + throw error; + } + } + } } diff --git a/stated-workflow-api.js b/stated-workflow-api.js index ebfb0dd..2af4457 100755 --- a/stated-workflow-api.js +++ b/stated-workflow-api.js @@ -2,6 +2,7 @@ import express from 'express'; import bodyParser from 'body-parser'; import { WorkflowManager } from "./src/workflow/WorkflowManager.js"; +import StatedREPL from "stated-js/dist/src/StatedREPL.js"; const app = express(); app.use(bodyParser.json()); @@ -12,6 +13,26 @@ app.get('/', (req, res) => { res.json({ status: 'OK' }); }); +app.post('/', async (req, res) => { + console.log(`Received POST /event with data: ${StatedREPL.stringify(req.body)}`); + if (req.body !== undefined && req.body.type === 'workflow') { + try { + const workflowId = await workflowManager.createWorkflow(req.body.workflow); + res.json({ workflowId, status: 'Started' }); + } catch (error) { + console.error('Error in POST /workflow:', error); + res.status(500).send({'error': error.toString()}); + } + return; + } + try { + res.json(await workflowManager.sendCloudEvent([req.body])); + } catch (error) { + console.error(`Error in POST /event`, error); + res.status(500).send({'error': error.toString()}); + } +}); + app.post('/workflow', async (req, res) => { try { const workflowId = await workflowManager.createWorkflow(req.body); @@ -107,7 +128,7 @@ app.listen(8080, () => { app.post('/event', async (req, res) => { if (!Array.isArray(req.body)) { - console.log(`data must be an array of events, but received ${req.body}`); + console.log(`data must be an array of events, but received ${StatedREPL.stringify(req.body)}`); res.status(400).send({'error': 'data must be an array of events'}); return; }; From 862980ad29363c90e388faa583563fc248b65f25 Mon Sep 17 00:00:00 2001 From: Sergey Sergeev Date: Fri, 8 Mar 2024 13:44:44 -0800 Subject: [PATCH 3/3] migrate swapi from dev to tech (#21) Co-authored-by: Sergey Sergeev --- README.md | 14 +++---- example/concurrent-homeworlds.json | 2 +- example/dumpster/changePersistence.yaml | 22 +++++++++++ .../dumpster/concurrent-homeworlds-debug.json | 6 +++ example/dumpster/fso-orion.json | 22 +++++++++++ example/dumpster/fso1.json | 6 +++ example/dumpster/pubsub-kafka.yaml | 33 ++++++++++++++++ example/dumpster/pubsub-pulsar-backup.yaml | 31 +++++++++++++++ example/dumpster/test.json | 7 ++++ example/dumpster/test.yaml | 22 +++++++++++ example/dumpster/wf-all.json | 19 +++++++++ example/dumpster/wf-all.yaml | 21 ++++++++++ example/dumpster/wf-continuous.yaml | 22 +++++++++++ example/dumpster/wf-wip.yaml | 39 +++++++++++++++++++ example/dumpster/wf.json | 22 +++++++++++ example/dumpster/workflow-fso.yaml | 32 +++++++++++++++ example/homeworld-scrambled.json | 2 +- example/homeworld.json | 2 +- example/homeworlds-steps-error.json | 2 +- example/homeworlds-steps-with-retry.json | 2 +- example/homeworlds-steps.json | 2 +- example/inhabitants-with-delay.yaml | 18 +++++++-- example/inhabitants.yaml | 2 +- example/joinResistance.yaml | 2 +- example/joinResistanceBug.yaml | 4 +- example/joinResistanceFast.yaml | 2 +- example/joinResistanceRecovery.yaml | 2 +- example/pubsub-data-function-pulsar.yaml | 2 +- example/rebelCommunication.yaml | 2 +- example/resistanceSnapshot.json | 32 +++++++-------- src/test/StatedWorkflow.test.js | 21 +++++----- 31 files changed, 367 insertions(+), 50 deletions(-) create mode 100644 example/dumpster/changePersistence.yaml create mode 100644 example/dumpster/concurrent-homeworlds-debug.json create mode 100644 example/dumpster/fso-orion.json create mode 100644 example/dumpster/fso1.json create mode 100644 example/dumpster/pubsub-kafka.yaml create mode 100644 example/dumpster/pubsub-pulsar-backup.yaml create mode 100644 example/dumpster/test.json create mode 100644 example/dumpster/test.yaml create mode 100644 example/dumpster/wf-all.json create mode 100644 example/dumpster/wf-all.yaml create mode 100644 example/dumpster/wf-continuous.yaml create mode 100644 example/dumpster/wf-wip.yaml create mode 100644 example/dumpster/wf.json create mode 100644 example/dumpster/workflow-fso.yaml diff --git a/README.md b/README.md index 7243941..32455fd 100644 --- a/README.md +++ b/README.md @@ -80,7 +80,7 @@ template. ```json > .init -f "example/homeworld.json" { - "lukePersonDetails": "${ $fetch('https://swapi.dev/api/people/?search=luke').json().results[0]}", + "lukePersonDetails": "${ $fetch('https://swapi.tech/api/people/?name=luke').json().result[0].properties}", "lukeHomeworldURL": "${ lukePersonDetails.homeworld }", "homeworldDetails": "${ $fetch(lukeHomeworldURL).json() }", "homeworldName": "${ homeworldDetails.name }" @@ -127,7 +127,7 @@ start: ${ (produceParams.data; $millis()) } #record start time, after test datas # producer will be sending some test data produceParams: type: "my-topic" - data: ${['luke', 'han', 'leia', 'R2-D2', 'Owen', 'Biggs', 'Obi-Wan', 'Anakin', 'Chewbacca', 'Wedge'].('https://swapi.dev/api/people/?search='&$)} + data: ${['luke', 'han', 'leia', 'R2-D2', 'Owen', 'Biggs', 'Obi-Wan', 'Anakin', 'Chewbacca', 'Wedge'].('https://swapi.tech/api/people/?search='&$)} client: type: test # the subscriber's 'to' function will be called on each received event @@ -398,7 +398,7 @@ Below command will start the workflow and tail the output until `simulateFailure "function": "/${ \n function($rebel){ \n $rebel ~> $serial(\n [fetchRebel, saveRebel],\n {'workflowInvocation': $rebel} \n ) } }\n" }, "fetchRebel": { - "function": "/${ \n function($rebel){(\n $console.debug('fetchRebel input: ' & $rebel);\n $r := $rebel.$fetch('https://swapi.dev/api/people/?search='&$).json().results[0];\n $console.debug('fetchRebel fetched: ' & $r); \n $set('/fetchLog/-',$rebel);\n $console.debug('logged fetch: ' & $r);\n $r;\n )} \n}\n" + "function": "/${ \n function($rebel){(\n $console.debug('fetchRebel input: ' & $rebel);\n $r := $rebel.$fetch('https://swapi.tech/api/people/?name='&$).json().result[0].properties;\n $console.debug('fetchRebel fetched: ' & $r); \n $set('/fetchLog/-',$rebel);\n $console.debug('logged fetch: ' & $r);\n $r;\n )} \n}\n" }, "saveRebel": { "function": "/${ \n function($rebel){(\n $console.debug('saveRebel input: ' & $rebel);\n ($count(rebels) = 1 and simulateFailure)?(\n $set('/simulateFailure', false); \n $console.log('sleep forever on : ' & $rebel);\n $sleep(1000000);\n );\n $rebel ? $set('/rebels/-',{'name':$rebel.name, 'url':$rebel.homeworld});\n $console.debug('saveRebel saved: ' & {'name':$rebel.name, 'url':$rebel.homeworld});\n )} \n}\n" @@ -427,15 +427,15 @@ get all 3 of them there. [ { "name": "Luke Skywalker", - "url": "https://swapi.dev/api/planets/1/" + "url": "https://www.swapi.tech/api/planets/1" }, { "name": "Han Solo", - "url": "https://swapi.dev/api/planets/22/" + "url": "https://www.swapi.tech/api/planets/22" }, { "name": "Leia Organa", - "url": "https://swapi.dev/api/planets/2/" + "url": "https://www.swapi.tech/api/planets/2" } ] ``` @@ -454,7 +454,7 @@ The following example shows how to use the `shouldRetry` function to retry a ste "connectionError": true, "steps": [ { - "function": "${ function($person){$fetch('https://swapi.dev/api/people/?search='& $person).json().results[0]} }" + "function": "${ function($person){$fetch('https://swapi.tech/api/people/?name='& $person).json().result[0].properties} }" }, { "function": "${ function($personDetail){$personDetail.homeworld } }" diff --git a/example/concurrent-homeworlds.json b/example/concurrent-homeworlds.json index 4627e03..f0584e8 100644 --- a/example/concurrent-homeworlds.json +++ b/example/concurrent-homeworlds.json @@ -1,6 +1,6 @@ { "people": ["luke", "han"], - "personDetails": "!${ people.$fetch('https://swapi.dev/api/people/?search='& $).json().results[0]}", + "personDetails": "!${ people.$fetch('https://swapi.tech/api/people/?search='& $).json().results[0]}", "homeworldURLs": "${ personDetails.homeworld }", "homeworldDetails": "!${ homeworldURLs.$fetch($).json() }", "homeworldName": "${ homeworldDetails.name }" diff --git a/example/dumpster/changePersistence.yaml b/example/dumpster/changePersistence.yaml new file mode 100644 index 0000000..01e5252 --- /dev/null +++ b/example/dumpster/changePersistence.yaml @@ -0,0 +1,22 @@ +# producer will be sending some test data +produceParams: + type: "my-topic" + data: ['luke', 'han', 'leia'] + client: + type: test +# the subscriber's 'to' function will be called on each received event +subscribeParams: #parameters for subscribing to an event + source: cloudEvent + type: /${ produceParams.type } # subscribe to the same topic as we are publishing to test events + to: /${joinResistance} + subscriberId: rebelArmy + initialPosition: latest + client: + type: test +joinResistance: /${ function($rebel){ $set('/rebelForces', rebelForces~>$append($rebel))} } +# starts producer function +send$: $publish(produceParams) +# starts consumer function +recv$: $subscribe(subscribeParams) +# the subscriber's `to` function will write the received data here +rebelForces: [ ] diff --git a/example/dumpster/concurrent-homeworlds-debug.json b/example/dumpster/concurrent-homeworlds-debug.json new file mode 100644 index 0000000..7c334e9 --- /dev/null +++ b/example/dumpster/concurrent-homeworlds-debug.json @@ -0,0 +1,6 @@ +{ + "people": ["luke", "han"], + "personDetails": "${ people.$fetch('https://swapi.tech/api/people/?search='& $).json().results[0]}", + "homeworldURLs": "${ personDetails.homeworld }", + "homeworldDetails": "${ homeworldURLs.$fetch( $ ).json() }" +} \ No newline at end of file diff --git a/example/dumpster/fso-orion.json b/example/dumpster/fso-orion.json new file mode 100644 index 0000000..80ac777 --- /dev/null +++ b/example/dumpster/fso-orion.json @@ -0,0 +1,22 @@ +{ + "start$": "$subscribe(subscribeParams, {})", + "name": "nozzleWork", + "subscribeParams": { + "source": "cloudEvent", + "testData": "${ [1].([{'name': 'nozzleTime', 'order':$}]) }", + "type": "my-topic", + "filter$": "function($e){ $e.name='nozzleTime' }", + "to": "../${myWorkflow$}", + "parallelism": 2, + "subscriberId": "../${name}" + }, + "myWorkflow$": "function($e){\n $e ~> $serial([step1, step2])\n}\n", + "step1": { + "name": "primeTheNozzle", + "function": "${function($e){ $e~>|$|{'primed':true}| }}" + }, + "step2": { + "name": "sprayTheNozzle", + "function": "${function($e){ $e~>|$|{'sprayed':true}| }}" + } +} diff --git a/example/dumpster/fso1.json b/example/dumpster/fso1.json new file mode 100644 index 0000000..da493c1 --- /dev/null +++ b/example/dumpster/fso1.json @@ -0,0 +1,6 @@ +{ + "lukePersonDetails": "${ curl -i -X 'GET' -H 'Accept: application/json' -H 'Layer-Id: 55f8da9d-93de-4bf2-b818-c2c2ac1d3f8d' -H 'Layer-Type: TENANT' $APPD_JSON_STORE_URL/v1/objects/extensibility:solution/agent -H'appd-pty: IlVTRVIi' -H'appd-pid: ImZvb0BiYXIuY29tIg=='}", + "lukeHomeworldURL": "${ lukePersonDetails.homeworld }", + "homeworldDetails": "${ $fetch(lukeHomeworldURL).json() }", + "homeworldName": "${ homeworldDetails.name }" +} \ No newline at end of file diff --git a/example/dumpster/pubsub-kafka.yaml b/example/dumpster/pubsub-kafka.yaml new file mode 100644 index 0000000..cbbc1aa --- /dev/null +++ b/example/dumpster/pubsub-kafka.yaml @@ -0,0 +1,33 @@ +# Droid R2-D2 is sending messages to the Rebel Alliance's communication channel +produceParams: + type: "rebel-comm-channel" + data: | + ${ + function(){ + {'message': 'Rebel Fleet Coordinates', 'location': $random()} + } + } + client: + type: pulsar +# Droid C-3PO will intercept and log each received message for the Rebel Alliance +subscribeParams: #parameters for subscribing to a holocomm transmission + source: cloudEvent + type: /${ produceParams.type } # subscribe to the same channel as R2-D2 to intercept messages + to: | + /${ + function($e){( + $set('/interceptedMessages/-', $e); + )} + } + subscriberId: protocolDroid + initialPosition: latest + client: + type: pulsar +# Activates R2-D2's message transmission function every 50 milliseconds +send: "${ $setInterval( function(){ $publish(produceParams)}, 50) }" +# Activates C-3PO's message interception function +recv$: $subscribe(subscribeParams) +# interceptedMessages is where C-3PO will store the results of message decoding +interceptedMessages: [ ] +# This condition stops the operation when interceptedMessages has 10 elements +stop$: ($count(interceptedMessages)>=10?($clearInterval(send);'missionAccomplished'):'operationOngoing') diff --git a/example/dumpster/pubsub-pulsar-backup.yaml b/example/dumpster/pubsub-pulsar-backup.yaml new file mode 100644 index 0000000..4bb6589 --- /dev/null +++ b/example/dumpster/pubsub-pulsar-backup.yaml @@ -0,0 +1,31 @@ +# producer will be sending some test data +produceParams: + type: "my-topic" + data: ['luke', 'han', 'leia'] + client: + type: pulsar +# the subscriber's 'to' function will be called on each received event +subscribeParams: #parameters for subscribing to an event + source: cloudEvent + type: /${ produceParams.type } # subscribe to the same topic as we are publishing to test events + to: /${joinResistance} + subscriberId: rebelArmy + initialPosition: latest + client: + type: pulsar +joinResistance: | + /${ + function($rebel){ + $rebel ~> $serial([joinResistanceStep]) + } + } +joinResistanceStep: + name: joinResistanceStep + function: /${ function($rebel){ $set('/rebelForces', rebelForces~>$append($rebel))} } +# starts producer function +send$: $publish(produceParams) +# starts consumer function +recv$: $subscribe(subscribeParams) +# the subscriber's `to` function will write the received data here +rebelForces: [ ] +recover$: $recoverTo(joinResistance) diff --git a/example/dumpster/test.json b/example/dumpster/test.json new file mode 100644 index 0000000..f6f424f --- /dev/null +++ b/example/dumpster/test.json @@ -0,0 +1,7 @@ +{ + "step1": {"function": "${ function($e){ $e~>|$|{'primed':true}|} }"}, + "step2": {"function": "${ function($e){ $e~>|$|{'sprayed':true}|} }"}, + "workflow": "${ function($e){ $e ~> $serial([step1, step2]) } }", + "output": "${ (koink; [{'name': '1'}, {'name': '2'}] ~> $map(workflow))}", + "konik": "some konik" +} \ No newline at end of file diff --git a/example/dumpster/test.yaml b/example/dumpster/test.yaml new file mode 100644 index 0000000..4bc2ebc --- /dev/null +++ b/example/dumpster/test.yaml @@ -0,0 +1,22 @@ +start$: $subscribe(subscribeParams) +name: nozzleWork +subscribeParams: #parameters for subscribing to a cloud event + testData: "${ [1..10000].({'name': 'nozzleTime', 'order':$}) }" + type: sys:cron + filter$: function($e){ $e.name='nozzleTime' } + to: ../${myWorkflow$} + parallelism: 8 + source: cloudEvent + client: + type: test + +myWorkflow$: | + function($e){ + $e ~> $serial([step1, step2]) + } +step1: + name: primeTheNozzle + function: ${function($e){ ($e~>|$|{'primed':true}|) }} +step2: + name: sprayTheNozzle + function: ${function($e){ $e~>|$|{'sprayed':true}| }} diff --git a/example/dumpster/wf-all.json b/example/dumpster/wf-all.json new file mode 100644 index 0000000..660972f --- /dev/null +++ b/example/dumpster/wf-all.json @@ -0,0 +1,19 @@ +{ + "startEvent": "tada", + "a": { + "function": "${ function($in) { ( $console.log($in); [$in, 'a'] ~> $join('->') )} }" + }, + "b": { + "function": "${ function($in) { [$in, 'b'] ~> $join('->') } }" + }, + "c": { + "function": "${ function($in) { ( $console.log($in); [$in, 'c'] ~> $join('->') )} }" + }, + "d": { + "function": "${ function($in) { ( $console.log($in); [$in, 'd'] ~> $join('->') )} }" + }, + "workflow1": "${ function($startEvent) { $startEvent ~> $serial([a, b]) } }", + "workflow1out": "${ workflow1(startEvent)}", + "workflow2": "${ function($startEvent) { $startEvent ~> $parallel([c,d]) } }", + "workflow2out": "${ workflow2(startEvent)}" +} \ No newline at end of file diff --git a/example/dumpster/wf-all.yaml b/example/dumpster/wf-all.yaml new file mode 100644 index 0000000..4c33870 --- /dev/null +++ b/example/dumpster/wf-all.yaml @@ -0,0 +1,21 @@ +start$: $subscribe(subscribeParams, {}) +name: nozzleWork +subscribeParams: #parameters for subscribing to a cloud event + source: cloudEvent + testData: "${ [1].([{'name': 'nozzleTime', 'order':$}]) }" + type: 'my-topic' + filter$: function($e){ $e.name='nozzleTime' } + to: ../${myWorkflow$} + parallelism: 2 + subscriberId: ../${name} +myWorkflow$: | + function($e){ + $e ~> $serial([step1, step2]) + } +step1: + name: primeTheNozzle + function: ${function($e){ $e~>|$|{'primed':true}| }} +step2: + name: sprayTheNozzle + function: ${function($e){ $e~>|$|{'sprayed':true}| }} + diff --git a/example/dumpster/wf-continuous.yaml b/example/dumpster/wf-continuous.yaml new file mode 100644 index 0000000..e263b2c --- /dev/null +++ b/example/dumpster/wf-continuous.yaml @@ -0,0 +1,22 @@ +start$: $subscribe(subscribeParams, {}) +name: nozzleWork +subscribeParams: #parameters for subscribing to a cloud event + source: cloudEvent + testData: "${ [1].([{'name': 'nozzleTime', 'order':$}]) }" + type: 'my-topic' + filter$: function($e){ $e.name='nozzleTime' } + to: ../${myWorkflow$} + parallelism: 2 + subscriberId: ../${name} +myWorkflow$: | + function($e){ + $e ~> $serial([step1, step2]) + } +step1: + name: primeTheNozzle + function: ${function($e){ $e~>|$|{'primed':true}| }} +step2: + name: sprayTheNozzle + function: ${function($e){ $e~>|$|{'sprayed':true}| }} +stop$: ($count(step1)=5?($clearInterval(send$);'done'):'still going') + diff --git a/example/dumpster/wf-wip.yaml b/example/dumpster/wf-wip.yaml new file mode 100644 index 0000000..991171f --- /dev/null +++ b/example/dumpster/wf-wip.yaml @@ -0,0 +1,39 @@ +start1$: "$recover(subscribeParams1) ~> $subscribe(subscribeParams1, {})" +start2$: "$recover(subscribeParams2) ~> $subscribe(subscribeParams2, {})" +name: nozzleWork +subscribeParams1: #parameters for subscribing to a cloud event + source: cloudEvent + testData: "${ [1].([{'name': 'nozzleTime1', 'order':$}]) }" + type: 'my-topic' + filter$: function($e){ $e.name='nozzleTime' } + to: ../${myWorkflow1$} + parallelism: 2 + subscriberId: ../${name} +subscribeParams2: #parameters for subscribing to a cloud event + source: cloudEvent + testData: "${ [1].([{'name': 'nozzleTime2', 'order':$}]) }" + type: 'my-topic' + filter$: function($e){ $e.name='nozzleTime' } + to: ../${myWorkflow2$} + parallelism: 2 + subscriberId: 'nozzleWork2' +myWorkflow1$: | + function($e){ + $e ~> $serial([step1, step2]) + } +myWorkflow2$: | + function($e){ + $e ~> $serial([step3, step4]) + } +step1: + name: primeTheNozzle + function: ${function($e){ $e~>|$|{'primed':true}| }} +step2: + name: sprayTheNozzle + function: ${function($e){ $e~>|$|{'sprayed':true}| }} +step3: + name: primeTheNozzle + function: ${function($e){ $e~>|$|{'primed':true}| }} +step4: + name: sprayTheNozzle + function: ${function($e){ $e~>|$|{'sprayed':true}| }} diff --git a/example/dumpster/wf.json b/example/dumpster/wf.json new file mode 100644 index 0000000..80ac777 --- /dev/null +++ b/example/dumpster/wf.json @@ -0,0 +1,22 @@ +{ + "start$": "$subscribe(subscribeParams, {})", + "name": "nozzleWork", + "subscribeParams": { + "source": "cloudEvent", + "testData": "${ [1].([{'name': 'nozzleTime', 'order':$}]) }", + "type": "my-topic", + "filter$": "function($e){ $e.name='nozzleTime' }", + "to": "../${myWorkflow$}", + "parallelism": 2, + "subscriberId": "../${name}" + }, + "myWorkflow$": "function($e){\n $e ~> $serial([step1, step2])\n}\n", + "step1": { + "name": "primeTheNozzle", + "function": "${function($e){ $e~>|$|{'primed':true}| }}" + }, + "step2": { + "name": "sprayTheNozzle", + "function": "${function($e){ $e~>|$|{'sprayed':true}| }}" + } +} diff --git a/example/dumpster/workflow-fso.yaml b/example/dumpster/workflow-fso.yaml new file mode 100644 index 0000000..85613fe --- /dev/null +++ b/example/dumpster/workflow-fso.yaml @@ -0,0 +1,32 @@ +# producer will be sending data function output to the "type" topic +# curl -i +# -X 'GET' +# -H 'Accept: application/json' +# -H 'Layer-Id: 55f8da9d-93de-4bf2-b818-c2c2ac1d3f8d' +# -H 'Layer-Type: TENANT' $APPD_JSON_STORE_URL/v1/objects/extensibility:solution/agent +# -H'appd-pty: IlVTRVIi' +# -H'appd-pid: ImZvb0BiYXIuY29tIg==' +knowledgeStore: + type: "my-topic" + data: "${ function(){ {'msg': 'hello', 'rando': $random()} } }" + client: + type: test +# producer will be invoking "to" function for each consumed event +subscribeParams: #parameters for subscribing to a cloud event + source: cloudEvent + type: /${ produceParams.type } # subscribe to the same topic as we are publishing to test events + to: /${ function($e){( + $set('/rxLog', rxLog~>$append($e)); + )} } + subscriberId: dingus + initialPosition: latest + client: + type: test +# starts producer function +send: "${ $setInterval( function(){ $publish(produceParams)}, 50) }" +# starts consumer function +recv$: $subscribe(subscribeParams) +# rxLog is a field of the template where the consumer function will be storing results of event processing +rxLog: [ ] +# this is a condition that will stop the workflow when rxLog has 5 elements +stop$: ($count(rxLog)=10?($clearInterval(send);'done'):'still going') diff --git a/example/homeworld-scrambled.json b/example/homeworld-scrambled.json index 3e8cfcd..5320df4 100644 --- a/example/homeworld-scrambled.json +++ b/example/homeworld-scrambled.json @@ -1,6 +1,6 @@ { "homeworldName": "${ homeworldDetails.name }", - "lukePersonDetails": "${ $fetch('https://swapi.dev/api/people/?search=luke').json().results[0]}", + "lukePersonDetails": "${ $fetch('https://swapi.tech/api/people/?search=luke').json().results[0]}", "homeworldDetails": "${ $fetch(lukeHomeworldURL).json() }", "lukeHomeworldURL": "${ lukePersonDetails.homeworld }" } \ No newline at end of file diff --git a/example/homeworld.json b/example/homeworld.json index 5f24fc5..425020b 100644 --- a/example/homeworld.json +++ b/example/homeworld.json @@ -1,5 +1,5 @@ { - "lukePersonDetails": "${ $fetch('https://swapi.dev/api/people/?search=luke').json().results[0]}", + "lukePersonDetails": "${ $fetch('https://swapi.tech/api/people/?name=luke').json().result[0].properties}", "lukeHomeworldURL": "${ lukePersonDetails.homeworld }", "homeworldDetails": "${ $fetch(lukeHomeworldURL).json() }", "homeworldName": "${ homeworldDetails.name }" diff --git a/example/homeworlds-steps-error.json b/example/homeworlds-steps-error.json index a82fc32..952b5c2 100644 --- a/example/homeworlds-steps-error.json +++ b/example/homeworlds-steps-error.json @@ -3,7 +3,7 @@ "workflow": "${ function($person){$person~>$serial(steps)} }", "steps": [ { - "function": "${ function($person){$fetch('https://swapi.dev/api/people/?search='& $person).json().results[0]} }" + "function": "${ function($person){$fetch('https://swapi.tech/api/people/?search='& $person).json().results[0]} }" }, { "function": "${ function($personDetail){$personDetail.homeworld & '--broken--'} }" diff --git a/example/homeworlds-steps-with-retry.json b/example/homeworlds-steps-with-retry.json index e99c29e..81ce738 100644 --- a/example/homeworlds-steps-with-retry.json +++ b/example/homeworlds-steps-with-retry.json @@ -4,7 +4,7 @@ "connectionError": true, "steps": [ { - "function": "${ function($person){$fetch('https://swapi.dev/api/people/?search='& $person).json().results[0]} }" + "function": "${ function($person){$fetch('https://swapi.tech/api/people/?name='& $person).json().result[0].properties} }" }, { "function": "${ function($personDetail){$personDetail.homeworld } }" diff --git a/example/homeworlds-steps.json b/example/homeworlds-steps.json index a2de299..4c8ee08 100644 --- a/example/homeworlds-steps.json +++ b/example/homeworlds-steps.json @@ -3,7 +3,7 @@ "workflow": "${ function($person){$person~>$serial(steps)} }", "steps": [ { - "function": "${ function($person){$fetch('https://swapi.dev/api/people/?search='& $person).json().results[0]} }" + "function": "${ function($person){$fetch('https://swapi.tech/api/people/?search='& $person).json().results[0]} }" }, { "function": "${ function($personDetail){$personDetail.homeworld} }" diff --git a/example/inhabitants-with-delay.yaml b/example/inhabitants-with-delay.yaml index 7ca9d27..bff8c84 100644 --- a/example/inhabitants-with-delay.yaml +++ b/example/inhabitants-with-delay.yaml @@ -2,7 +2,7 @@ produceParams: type: "residents" # fetch one page of planet data from the Star Wars API - data: ${[1].($fetch('https://swapi.dev/api/planets/?page=' & $string($)).json().results)} + data: ${[1].($fetch('https://swapi.tech/api/planets/?page=' & $string($)).json().results)} client: type: test subscribeResidents: @@ -16,9 +16,21 @@ subscribeResidents: getResidentsWorkflow: function: /${ function($planetInfo){ $planetInfo ~> $serial([extractResidents, fetchResidents]) } } extractResidents: - function: /${ function($planet){( $sleep($random() * 10) ; $planet.residents.($fetch($).json()) )} } # add a random delay + function: | + /${ + function($planet){( + $sleep($random() * 10) ; + $console.log("extracting residents for planet: " & $planet); + $planet.($fetch("https://swapi.tech/api/people/?homeworld=" & $)).json().results[] + )} + } fetchResidents: - function: /${ function($residents){$residents.($set('/residents/-',{'name':$.name, 'url':$.url}))} } + function: | + /${ + function($residents){ + $residents.($set('/residents/-',{'name':$.name, 'url':$.url})) + } + } residents: [ ] # starts producer function send$: $publish(produceParams) diff --git a/example/inhabitants.yaml b/example/inhabitants.yaml index 45b171b..be8b5c4 100644 --- a/example/inhabitants.yaml +++ b/example/inhabitants.yaml @@ -1,7 +1,7 @@ # producer will be sending some test data produceParams: type: "my-topic" - data: ${[1..6].($fetch('https://swapi.dev/api/planets/?page=' & $string($)).json().results)} + data: ${[1..6].($fetch('https://swapi.tech/api/planets/?page=' & $string($)).json().results)} client: type: test subscribeResidents: diff --git a/example/joinResistance.yaml b/example/joinResistance.yaml index 5bdfe84..cfdb9ee 100644 --- a/example/joinResistance.yaml +++ b/example/joinResistance.yaml @@ -2,7 +2,7 @@ start: ${ (produceParams.data; $millis()) } #record start time, after test datas # producer will be sending some test data produceParams: type: "my-topic" - data: ${['luke', 'han', 'leia', 'R2-D2', 'Owen', 'Biggs', 'Obi-Wan', 'Anakin', 'Chewbacca', 'Wedge'].('https://swapi.dev/api/people/?search='&$)} + data: ${['luke', 'han', 'leia', 'R2-D2', 'Owen', 'Biggs', 'Obi-Wan', 'Anakin', 'Chewbacca', 'Wedge'].('https://swapi.tech/api/people/?search='&$)} client: type: test # the subscriber's 'to' function will be called on each received event diff --git a/example/joinResistanceBug.yaml b/example/joinResistanceBug.yaml index cef3394..541847c 100644 --- a/example/joinResistanceBug.yaml +++ b/example/joinResistanceBug.yaml @@ -2,7 +2,7 @@ start: ${ (produceParams.data; $millis()) } #record start time, after test datas # producer will be sending some test data produceParams: type: "my-topic" - data: ${['luke', 'han', 'leia', 'R2-D2', 'Owen', 'Biggs', 'Obi-Wan', 'Anakin', 'Chewbacca', 'Wedge'].('https://swapi.dev/api/people/?search='&$)} + data: ${['luke', 'han', 'leia', 'R2-D2', 'Owen', 'Biggs', 'Obi-Wan', 'Anakin', 'Chewbacca', 'Wedge'].('https://swapi.tech/api/people/?name='&$)} client: type: test # the subscriber's 'to' function will be called on each received event @@ -18,7 +18,7 @@ subscribeParams: #parameters for subscribing to an event joinResistance: | /${ function($url){( - $rebel := $fetch($url).json().results[0].name; + $rebel := $fetch($url).json().result[0].properties.name; $set( "/rebelForces", $rebelForces~>$append($rebel)) /* BUG!! */ )} } diff --git a/example/joinResistanceFast.yaml b/example/joinResistanceFast.yaml index f253cbe..59701cb 100644 --- a/example/joinResistanceFast.yaml +++ b/example/joinResistanceFast.yaml @@ -2,7 +2,7 @@ start: ${ (produceParams.data; $millis()) } #record start time, after test datas # producer will be sending some test data produceParams: type: "my-topic" - data: ${['luke', 'han', 'leia', 'R2-D2', 'Owen', 'Biggs', 'Obi-Wan', 'Anakin', 'Chewbacca', 'Wedge'].('https://swapi.dev/api/people/?search='&$)} + data: ${['luke', 'han', 'leia', 'R2-D2', 'Owen', 'Biggs', 'Obi-Wan', 'Anakin', 'Chewbacca', 'Wedge'].('https://swapi.tech/api/people/?search='&$)} client: type: test # the subscriber's 'to' function will be called on each received event diff --git a/example/joinResistanceRecovery.yaml b/example/joinResistanceRecovery.yaml index 136fa67..18786c0 100644 --- a/example/joinResistanceRecovery.yaml +++ b/example/joinResistanceRecovery.yaml @@ -29,7 +29,7 @@ fetchRebel: /${ function($rebel){( $console.debug('fetchRebel input: ' & $rebel); - $r := $rebel.$fetch('https://swapi.dev/api/people/?search='&$).json().results[0]; + $r := $rebel.$fetch('https://swapi.tech/api/people/?name='&$).json().result[0].properties; $console.debug('fetchRebel fetched: ' & $r); $set('/fetchLog/-',$rebel); $console.debug('logged fetch: ' & $r); diff --git a/example/pubsub-data-function-pulsar.yaml b/example/pubsub-data-function-pulsar.yaml index f6f9f19..625d492 100644 --- a/example/pubsub-data-function-pulsar.yaml +++ b/example/pubsub-data-function-pulsar.yaml @@ -28,7 +28,7 @@ step2: ${ function($e){( { 'sender': $e.sender, - 'name': $fetch('https://swapi.dev/api/people/'& $e.sender).json().name, + 'name': $fetch('https://swapi.tech/api/people/'& $e.sender).json().name, 'location': $e.location } )} } diff --git a/example/rebelCommunication.yaml b/example/rebelCommunication.yaml index 1ae860d..ba81180 100644 --- a/example/rebelCommunication.yaml +++ b/example/rebelCommunication.yaml @@ -29,7 +29,7 @@ step2: ${ function($e){( { 'sender': $e.sender, - 'name': $fetch('https://swapi.dev/api/people/'& $e.sender).json().name, + 'name': $fetch('https://swapi.tech/api/people/'& $e.sender).json().name, 'location': $e.location } )} } diff --git a/example/resistanceSnapshot.json b/example/resistanceSnapshot.json index 5e0e838..affd985 100644 --- a/example/resistanceSnapshot.json +++ b/example/resistanceSnapshot.json @@ -28,7 +28,7 @@ "function": "/${ \n function($rebel){ \n $rebel ~> $serial(\n [fetchRebel, saveRebel],\n {'workflowInvocation': $rebel} \n ) } }\n" }, "fetchRebel": { - "function": "/${ \n function($rebel){(\n $console.debug('fetchRebel input: ' & $rebel);\n $r := $rebel.$fetch('https://swapi.dev/api/people/?search='&$).json().results[0];\n $console.debug('fetchRebel fetched: ' & $r); \n $set('/fetchLog/-',$rebel);\n $console.debug('logged fetch: ' & $r);\n $r;\n )} \n}\n" + "function": "/${ \n function($rebel){(\n $console.debug('fetchRebel input: ' & $rebel);\n $r := $rebel.$fetch('https://swapi.tech/api/people/?search='&$).json().results[0];\n $console.debug('fetchRebel fetched: ' & $r); \n $set('/fetchLog/-',$rebel);\n $console.debug('logged fetch: ' & $r);\n $r;\n )} \n}\n" }, "saveRebel": { "function": "/${ \n function($rebel){(\n $console.debug('saveRebel input: ' & $rebel);\n ($count(rebels) = 1 and simulateFailure)?(\n $set('/simulateFailure', false); \n $console.log('sleep forever on : ' & $rebel);\n $sleep(1000000);\n );\n $rebel ? $set('/rebels/-',{'name':$rebel.name, 'url':$rebel.homeworld});\n $console.debug('saveRebel saved: ' & {'name':$rebel.name, 'url':$rebel.homeworld});\n )} \n}\n" @@ -91,21 +91,21 @@ "eye_color": "brown", "birth_year": "29BBY", "gender": "male", - "homeworld": "https://swapi.dev/api/planets/22/", + "homeworld": "https://swapi.tech/api/planets/22/", "films": [ - "https://swapi.dev/api/films/1/", - "https://swapi.dev/api/films/2/", - "https://swapi.dev/api/films/3/" + "https://swapi.tech/api/films/1/", + "https://swapi.tech/api/films/2/", + "https://swapi.tech/api/films/3/" ], "species": [], "vehicles": [], "starships": [ - "https://swapi.dev/api/starships/10/", - "https://swapi.dev/api/starships/22/" + "https://swapi.tech/api/starships/10/", + "https://swapi.tech/api/starships/22/" ], "created": "2014-12-10T16:49:14.582000Z", "edited": "2014-12-20T21:17:50.334000Z", - "url": "https://swapi.dev/api/people/14/" + "url": "https://swapi.tech/api/people/14/" } } } @@ -126,21 +126,21 @@ "eye_color": "brown", "birth_year": "29BBY", "gender": "male", - "homeworld": "https://swapi.dev/api/planets/22/", + "homeworld": "https://swapi.tech/api/planets/22/", "films": [ - "https://swapi.dev/api/films/1/", - "https://swapi.dev/api/films/2/", - "https://swapi.dev/api/films/3/" + "https://swapi.tech/api/films/1/", + "https://swapi.tech/api/films/2/", + "https://swapi.tech/api/films/3/" ], "species": [], "vehicles": [], "starships": [ - "https://swapi.dev/api/starships/10/", - "https://swapi.dev/api/starships/22/" + "https://swapi.tech/api/starships/10/", + "https://swapi.tech/api/starships/22/" ], "created": "2014-12-10T16:49:14.582000Z", "edited": "2014-12-20T21:17:50.334000Z", - "url": "https://swapi.dev/api/people/14/" + "url": "https://swapi.tech/api/people/14/" } } } @@ -151,7 +151,7 @@ "rebels": [ { "name": "Luke Skywalker", - "url": "https://swapi.dev/api/planets/1/" + "url": "https://swapi.tech/api/planets/1/" } ], "fetchLog": [ diff --git a/src/test/StatedWorkflow.test.js b/src/test/StatedWorkflow.test.js index 7e697a9..484c354 100644 --- a/src/test/StatedWorkflow.test.js +++ b/src/test/StatedWorkflow.test.js @@ -977,15 +977,16 @@ test("Snapshot and recover for workflow", async () => { logWithDate(`Recovering from a snapshot with ${snapshot.output.residents.length} residents`); await tp.initialize(snapshot.template, '/', snapshot.output); - // Calculate unique residents - let uniqResidents = 0; + // Calculate residents + let residents = 0; do { - uniqResidents = Object.keys(tp.output?.residents.reduce((counts, o)=>{ counts[o.name] = (counts[o.name] || 0) + 1; return counts }, {})).length; - logWithDate(`Got ${uniqResidents} unique residents processed`); + // uniqResidents = Object.keys(tp.output?.residents.reduce((counts, o)=>{ counts[o.name] = (counts[o.name] || 0) + 1; return counts }, {})).length; + residents = tp.output?.residents; + logWithDate(`Got ${residents} unique residents processed`); await new Promise(resolve => setTimeout(resolve, 1000)); - } while (uniqResidents < 32) + } while (residents < 32) - logWithDate(`We got ${uniqResidents} unique residents processed with ${tp.output.residents.length} total residents`); + logWithDate(`We got ${residents} unique residents processed with ${tp.output.residents.length} total residents`); await sw.close(); logWithDate("Stopped stated workflow before test end"); }, 20000); // 20s timeout for times swapi not behaving @@ -1111,7 +1112,7 @@ test("workflow snapshot and restore", async () => { // acknowledged. expect(snapshot.output.rebels.length).toEqual(1); expect(StatedREPL.stringify(snapshot.output.rebels)).toEqual(StatedREPL.stringify([ - {"name": "Luke Skywalker", "url": "https://swapi.dev/api/planets/1/"}])); + {"name": "Luke Skywalker", "url": "https://www.swapi.tech/api/planets/1"}])); expect(StatedREPL.stringify(snapshot.output.subscribeParams.acks)).toEqual(StatedREPL.stringify(["luke"])); // the output should also have a log for 'han' invocationId complete for fetchRebel expect (snapshot.output.fetchRebel.log.han.end).toBeDefined(); @@ -1144,9 +1145,9 @@ test("workflow snapshot and restore", async () => { // Validate each rebel saved exactly once expect(StatedREPL.stringify(tp.output.rebels)).toEqual(StatedREPL.stringify([ - {"name": "Luke Skywalker", "url": "https://swapi.dev/api/planets/1/"}, - {"name": "Han Solo", "url": "https://swapi.dev/api/planets/22/"}, - {"name": "Leia Organa", "url": "https://swapi.dev/api/planets/2/"} + {"name": "Luke Skywalker", "url": "https://www.swapi.tech/api/planets/1"}, + {"name": "Han Solo", "url": "https://www.swapi.tech/api/planets/22"}, + {"name": "Leia Organa", "url": "https://www.swapi.tech/api/planets/2"} ])); // Expect that each rebel data was acknowledged once. expect(StatedREPL.stringify(tp.output.subscribeParams.acks)).toEqual(StatedREPL.stringify(["luke", "han", "leia"]));