From b7ecdc8f32814e871bf2d578ed93d3a729472111 Mon Sep 17 00:00:00 2001 From: Geoffrey Hendrey Date: Tue, 20 Feb 2024 12:06:47 -0800 Subject: [PATCH 01/19] wip. Try '.init -f example/backpressure-wf.yaml --options={snapshot:{}}' --- example/backpressure-wf.yaml | 25 +++++++ yarn.lock | 135 ++++++++++------------------------- 2 files changed, 62 insertions(+), 98 deletions(-) create mode 100644 example/backpressure-wf.yaml diff --git a/example/backpressure-wf.yaml b/example/backpressure-wf.yaml new file mode 100644 index 0000000..45b171b --- /dev/null +++ b/example/backpressure-wf.yaml @@ -0,0 +1,25 @@ +# producer will be sending some test data +produceParams: + type: "my-topic" + data: ${[1..6].($fetch('https://swapi.dev/api/planets/?page=' & $string($)).json().results)} + client: + type: test +subscribeResidents: + source: cloudEvent + type: /${ produceParams.type } # subscribe to the same topic as we are publishing to test events + to: /${ getResidentsWorkflow } + subscriberId: subscribeResidents + parallelism: 4 + client: + type: test +getResidentsWorkflow: + function: /${ function($planetInfo){ $planetInfo ~> $serial([extractResidents, fetchResidents]) } } +extractResidents: + function: /${ function($planet){$planet.residents.($fetch($).json())} } +fetchResidents: + function: /${ function($resident){$resident?$set('/residents/-',{'name':$resident.name, 'url':$resident.url})} } +residents: [ ] +# starts producer function +send$: $publish(produceParams) +recv$: $subscribe(subscribeResidents) + diff --git a/yarn.lock b/yarn.lock index a830643..c517b9d 100644 --- a/yarn.lock +++ b/yarn.lock @@ -23,7 +23,7 @@ resolved "https://registry.npmjs.org/@babel/compat-data/-/compat-data-7.23.5.tgz" integrity sha512-uU27kfDRlhfKl+w1U6vp16IuvSLtjAxdArVXPa9BvLkrr7CYIsxH5adpHObeAGY/41+syctUWOZ140a2Rvkgjw== -"@babel/core@^7.0.0", "@babel/core@^7.0.0-0", "@babel/core@^7.11.6", "@babel/core@^7.12.3", "@babel/core@^7.8.0": +"@babel/core@^7.11.6", "@babel/core@^7.12.3": version "7.23.7" resolved "https://registry.npmjs.org/@babel/core/-/core-7.23.7.tgz" integrity sha512-+UpDgowcmqe36d4NwqvKsyPMlOLNGMsfMmQ5WGCu+siCe3t3dfe9njrzGfdN4qq+bcNUt0+Vw6haRxBOycs4dw== @@ -297,7 +297,7 @@ resolved "https://registry.npmjs.org/@bcoe/v8-coverage/-/v8-coverage-0.2.3.tgz" integrity sha512-0hYQ8SB4Db5zvZB4axdMHGwEaQjkZzFjQiN9LVYvIFB2nSUHW9tYpxWriPrWDASIxiaXax83REcLxuSdnGPZtw== -"@colors/colors@^1.6.0", "@colors/colors@1.6.0": +"@colors/colors@1.6.0", "@colors/colors@^1.6.0": version "1.6.0" resolved "https://registry.npmjs.org/@colors/colors/-/colors-1.6.0.tgz" integrity sha512-Ir+AOibqzrIsL6ajt3Rz3LskB7OiMVHqltZmspbW/TJuTVuyOMirVqAkjfY6JISiLHgyNqicAC8AyHHGzNd/dA== @@ -839,13 +839,13 @@ bindings@^1.5.0: dependencies: file-uri-to-path "1.0.0" -body-parser@^1.20.2: - version "1.20.2" - resolved "https://registry.npmjs.org/body-parser/-/body-parser-1.20.2.tgz" - integrity sha512-ml9pReCu3M61kGlqoTm2umSXTlRTuGTx0bfYj+uIUKKYycG5NtSbeetV3faSU6R7ajOPw0g/J1PvK4qNy7s5bA== +body-parser@1.20.1: + version "1.20.1" + resolved "https://registry.npmjs.org/body-parser/-/body-parser-1.20.1.tgz" + integrity sha512-jWi7abTbYwajOytWCQc37VulmWiRae5RyTpaCyDcS5/lMdtwSz5lOpDE67srw/HYe35f1z3fDQw+3txg7gNtWw== dependencies: bytes "3.1.2" - content-type "~1.0.5" + content-type "~1.0.4" debug "2.6.9" depd "2.0.0" destroy "1.2.0" @@ -853,17 +853,17 @@ body-parser@^1.20.2: iconv-lite "0.4.24" on-finished "2.4.1" qs "6.11.0" - raw-body "2.5.2" + raw-body "2.5.1" type-is "~1.6.18" unpipe "1.0.0" -body-parser@1.20.1: - version "1.20.1" - resolved "https://registry.npmjs.org/body-parser/-/body-parser-1.20.1.tgz" - integrity sha512-jWi7abTbYwajOytWCQc37VulmWiRae5RyTpaCyDcS5/lMdtwSz5lOpDE67srw/HYe35f1z3fDQw+3txg7gNtWw== +body-parser@^1.20.2: + version "1.20.2" + resolved "https://registry.npmjs.org/body-parser/-/body-parser-1.20.2.tgz" + integrity sha512-ml9pReCu3M61kGlqoTm2umSXTlRTuGTx0bfYj+uIUKKYycG5NtSbeetV3faSU6R7ajOPw0g/J1PvK4qNy7s5bA== dependencies: bytes "3.1.2" - content-type "~1.0.4" + content-type "~1.0.5" debug "2.6.9" depd "2.0.0" destroy "1.2.0" @@ -871,7 +871,7 @@ body-parser@1.20.1: iconv-lite "0.4.24" on-finished "2.4.1" qs "6.11.0" - raw-body "2.5.1" + raw-body "2.5.2" type-is "~1.6.18" unpipe "1.0.0" @@ -890,7 +890,7 @@ braces@^3.0.2: dependencies: fill-range "^7.0.1" -browserslist@^4.22.2, "browserslist@>= 4.21.0": +browserslist@^4.22.2: version "4.22.2" resolved "https://registry.npmjs.org/browserslist/-/browserslist-4.22.2.tgz" integrity sha512-0UgcrvQmBDvZHFGdYUehrCNIazki7/lUP3kkoi/r3YB2amZbFM9J43ZRkJTXBUZK4gmx56+Sqk9+Vs9mwZx9+A== @@ -946,16 +946,7 @@ caniuse-lite@^1.0.30001565: resolved "https://registry.npmjs.org/caniuse-lite/-/caniuse-lite-1.0.30001576.tgz" integrity sha512-ff5BdakGe2P3SQsMsiqmt1Lc8221NR1VzHj5jXN5vBny9A6fpze94HiVV/n7XRosOlsShJcvMv5mdnpjOGCEgg== -chalk@^2.4.1: - version "2.4.2" - resolved "https://registry.npmjs.org/chalk/-/chalk-2.4.2.tgz" - integrity sha512-Mti+f9lpJNcwF4tWV8/OrTTtF1gZi+f8FqlyAdouralcFWFQWF2+NgCHShjkCb+IFBLq9buZwE1xckQU4peSuQ== - dependencies: - ansi-styles "^3.2.1" - escape-string-regexp "^1.0.5" - supports-color "^5.3.0" - -chalk@^2.4.2: +chalk@^2.4.1, chalk@^2.4.2: version "2.4.2" resolved "https://registry.npmjs.org/chalk/-/chalk-2.4.2.tgz" integrity sha512-Mti+f9lpJNcwF4tWV8/OrTTtF1gZi+f8FqlyAdouralcFWFQWF2+NgCHShjkCb+IFBLq9buZwE1xckQU4peSuQ== @@ -1016,14 +1007,7 @@ collect-v8-coverage@^1.0.0: resolved "https://registry.npmjs.org/collect-v8-coverage/-/collect-v8-coverage-1.0.2.tgz" integrity sha512-lHl4d5/ONEbLlJvaJNtsF/Lz+WvB07u2ycqTYbdrq7UypDXailES4valYb2eWiJFxZlVmpGekfqoxQhzyFdT4Q== -color-convert@^1.9.0: - version "1.9.3" - resolved "https://registry.npmjs.org/color-convert/-/color-convert-1.9.3.tgz" - integrity sha512-QfAUtd+vFdAtFQcC8CCyYt1fYWxSqAiK2cSD6zDB8N3cpsEBAvRxp9zOGg6G/SHHJYAT88/az/IuDGALsNVbGg== - dependencies: - color-name "1.1.3" - -color-convert@^1.9.3: +color-convert@^1.9.0, color-convert@^1.9.3: version "1.9.3" resolved "https://registry.npmjs.org/color-convert/-/color-convert-1.9.3.tgz" integrity sha512-QfAUtd+vFdAtFQcC8CCyYt1fYWxSqAiK2cSD6zDB8N3cpsEBAvRxp9zOGg6G/SHHJYAT88/az/IuDGALsNVbGg== @@ -1037,16 +1021,16 @@ color-convert@^2.0.1: dependencies: color-name "~1.1.4" -color-name@^1.0.0, color-name@~1.1.4: - version "1.1.4" - resolved "https://registry.npmjs.org/color-name/-/color-name-1.1.4.tgz" - integrity sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA== - color-name@1.1.3: version "1.1.3" resolved "https://registry.npmjs.org/color-name/-/color-name-1.1.3.tgz" integrity sha512-72fSenhMw2HZMTVHeCA9KCmpEIbzWiQsjN+BHcBbS9vr1mtt+vJjPdksIBNUmKAW8TFUDPJK5SUU3QhE9NEXDw== +color-name@^1.0.0, color-name@~1.1.4: + version "1.1.4" + resolved "https://registry.npmjs.org/color-name/-/color-name-1.1.4.tgz" + integrity sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA== + color-string@^1.6.0: version "1.9.1" resolved "https://registry.npmjs.org/color-string/-/color-string-1.9.1.tgz" @@ -1135,27 +1119,6 @@ cross-spawn@^7.0.3: shebang-command "^2.0.0" which "^2.0.1" -debug@^4.1.0: - version "4.3.4" - resolved "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz" - integrity sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ== - dependencies: - ms "2.1.2" - -debug@^4.1.1: - version "4.3.4" - resolved "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz" - integrity sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ== - dependencies: - ms "2.1.2" - -debug@^4.3.1: - version "4.3.4" - resolved "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz" - integrity sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ== - dependencies: - ms "2.1.2" - debug@2.6.9: version "2.6.9" resolved "https://registry.npmjs.org/debug/-/debug-2.6.9.tgz" @@ -1163,7 +1126,7 @@ debug@2.6.9: dependencies: ms "2.0.0" -debug@4: +debug@4, debug@^4.1.0, debug@^4.1.1, debug@^4.3.1: version "4.3.4" resolved "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz" integrity sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ== @@ -1616,7 +1579,7 @@ inflight@^1.0.4: once "^1.3.0" wrappy "1" -inherits@^2.0.3, inherits@2, inherits@2.0.4: +inherits@2, inherits@2.0.4, inherits@^2.0.3: version "2.0.4" resolved "https://registry.npmjs.org/inherits/-/inherits-2.0.4.tgz" integrity sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ== @@ -1925,7 +1888,7 @@ jest-resolve-dependencies@^29.7.0: jest-regex-util "^29.6.3" jest-snapshot "^29.7.0" -jest-resolve@*, jest-resolve@^29.7.0: +jest-resolve@^29.7.0: version "29.7.0" resolved "https://registry.npmjs.org/jest-resolve/-/jest-resolve-29.7.0.tgz" integrity sha512-IOVhZSrg+UvVAshDSDtHyFCCBUl/Q3AAJv8iZ6ZjnZ74xzvwuzLXid9IIIPgTnY62SJjfuupMKZsZQRsCvxEgA== @@ -2308,11 +2271,6 @@ mkdirp@^1.0.3: resolved "https://registry.npmjs.org/mkdirp/-/mkdirp-1.0.4.tgz" integrity sha512-vVqVZQyf3WLx2Shd0qJ9xuvqgAyKPLAiqITEtqW0oIUjzo3PePDd6fW9iFz30ef7Ysp/oiWqbhszeGWW2T6Gzw== -ms@^2.1.1: - version "2.1.3" - resolved "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz" - integrity sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA== - ms@2.0.0: version "2.0.0" resolved "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz" @@ -2323,7 +2281,7 @@ ms@2.1.2: resolved "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz" integrity sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w== -ms@2.1.3: +ms@2.1.3, ms@^2.1.1: version "2.1.3" resolved "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz" integrity sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA== @@ -2648,7 +2606,7 @@ rimraf@^3.0.2: dependencies: glob "^7.1.3" -safe-buffer@~5.2.0, safe-buffer@5.2.1: +safe-buffer@5.2.1, safe-buffer@~5.2.0: version "5.2.1" resolved "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.1.tgz" integrity sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ== @@ -2663,31 +2621,12 @@ safe-stable-stringify@^2.3.1: resolved "https://registry.npmjs.org/safer-buffer/-/safer-buffer-2.1.2.tgz" integrity sha512-YZo3K82SD7Riyi0E1EQPojLz7kpepnSQI9IyPbHHg1XXXevb5dJI7tpyN2ADxGcQbHG7vcyRHk0cbwqcQriUtg== -semver@^6.0.0: - version "6.3.1" - resolved "https://registry.npmjs.org/semver/-/semver-6.3.1.tgz" - integrity sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA== - -semver@^6.3.0, semver@^6.3.1: +semver@^6.0.0, semver@^6.3.0, semver@^6.3.1: version "6.3.1" resolved "https://registry.npmjs.org/semver/-/semver-6.3.1.tgz" integrity sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA== -semver@^7.3.5: - version "7.5.4" - resolved "https://registry.npmjs.org/semver/-/semver-7.5.4.tgz" - integrity sha512-1bCSESV6Pv+i21Hvpxp3Dx+pSD8lIPt8uVjRrxAUt/nbswYc+tK6Y2btiULjd4+fnq15PX+nqQDC7Oft7WkwcA== - dependencies: - lru-cache "^6.0.0" - -semver@^7.5.3: - version "7.5.4" - resolved "https://registry.npmjs.org/semver/-/semver-7.5.4.tgz" - integrity sha512-1bCSESV6Pv+i21Hvpxp3Dx+pSD8lIPt8uVjRrxAUt/nbswYc+tK6Y2btiULjd4+fnq15PX+nqQDC7Oft7WkwcA== - dependencies: - lru-cache "^6.0.0" - -semver@^7.5.4: +semver@^7.3.5, semver@^7.5.3, semver@^7.5.4: version "7.5.4" resolved "https://registry.npmjs.org/semver/-/semver-7.5.4.tgz" integrity sha512-1bCSESV6Pv+i21Hvpxp3Dx+pSD8lIPt8uVjRrxAUt/nbswYc+tK6Y2btiULjd4+fnq15PX+nqQDC7Oft7WkwcA== @@ -2840,13 +2779,6 @@ statuses@2.0.1: resolved "https://registry.npmjs.org/statuses/-/statuses-2.0.1.tgz" integrity sha512-RwNA9Z/7PrK06rYLIzFMlaF+l73iwpzsqRIFgbMLbTcLD6cOao82TaWefPXQvB2fOC4AjuYSEndS7N/mTCbkdQ== -string_decoder@^1.1.1: - version "1.3.0" - resolved "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz" - integrity sha512-hkRX8U1WjJFd8LsDJ2yQ/wWWxaopEsABU1XfkM8A+j0+85JAGppt16cr1Whg6KIbb4okU6Mql6BOj+uup/wKeA== - dependencies: - safe-buffer "~5.2.0" - string-argv@^0.3.2: version "0.3.2" resolved "https://registry.npmjs.org/string-argv/-/string-argv-0.3.2.tgz" @@ -2869,6 +2801,13 @@ string-length@^4.0.1: is-fullwidth-code-point "^3.0.0" strip-ansi "^6.0.1" +string_decoder@^1.1.1: + version "1.3.0" + resolved "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz" + integrity sha512-hkRX8U1WjJFd8LsDJ2yQ/wWWxaopEsABU1XfkM8A+j0+85JAGppt16cr1Whg6KIbb4okU6Mql6BOj+uup/wKeA== + dependencies: + safe-buffer "~5.2.0" + strip-ansi@^6.0.0, strip-ansi@^6.0.1: version "6.0.1" resolved "https://registry.npmjs.org/strip-ansi/-/strip-ansi-6.0.1.tgz" @@ -2998,7 +2937,7 @@ undici-types@~5.26.4: resolved "https://registry.npmjs.org/undici-types/-/undici-types-5.26.5.tgz" integrity sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA== -unpipe@~1.0.0, unpipe@1.0.0: +unpipe@1.0.0, unpipe@~1.0.0: version "1.0.0" resolved "https://registry.npmjs.org/unpipe/-/unpipe-1.0.0.tgz" integrity sha512-pjy2bYhSsufwWlKwPc+l3cN7+wuJlK6uz0YdJEOlQDbl6jo/YlPi4mb8agUkVC8BF7V8NuzeyPNqRksA3hztKQ== From 12c26f658e16db825a5b9205e73eb608f8e81cb5 Mon Sep 17 00:00:00 2001 From: Sergey Sergeev Date: Wed, 21 Feb 2024 12:40:06 -0800 Subject: [PATCH 02/19] snapshotting test++ --- example/backpressure-wf.yaml | 4 +- src/test/StatedWorkflow.test.js | 78 ++++++++++++++++++++++++++++-- src/workflow/StatedWorkflow.js | 12 ++++- src/workflow/WorkflowDispatcher.js | 3 +- 4 files changed, 87 insertions(+), 10 deletions(-) diff --git a/example/backpressure-wf.yaml b/example/backpressure-wf.yaml index 45b171b..61fb63f 100644 --- a/example/backpressure-wf.yaml +++ b/example/backpressure-wf.yaml @@ -15,9 +15,9 @@ subscribeResidents: getResidentsWorkflow: function: /${ function($planetInfo){ $planetInfo ~> $serial([extractResidents, fetchResidents]) } } extractResidents: - function: /${ function($planet){$planet.residents.($fetch($).json())} } + function: /${ function($planet){( $sleep($random() * 100) ; $planet.residents.($fetch($).json()) )} } fetchResidents: - function: /${ function($resident){$resident?$set('/residents/-',{'name':$resident.name, 'url':$resident.url})} } + function: /${ function($residents){$residents.($set('/residents/-',{'name':$.name, 'url':$.url}))} } residents: [ ] # starts producer function send$: $publish(produceParams) diff --git a/src/test/StatedWorkflow.test.js b/src/test/StatedWorkflow.test.js index a7dd914..82619d1 100644 --- a/src/test/StatedWorkflow.test.js +++ b/src/test/StatedWorkflow.test.js @@ -775,7 +775,7 @@ test.skip("Template Data Change Callback with rate limit", async () => { expect(counts).toEqual([0,10]); }); -/* + const isMacOS = process.platform === 'darwin'; if (isMacOS) { test("Pulsar consumer integration test", async () => { @@ -785,7 +785,7 @@ if (isMacOS) { const {templateProcessor: tp} = await StatedWorkflow.newWorkflow(template); // keep steps execution logs for debugging - tp.options = {'keepLogs': true} + tp.options = {'keepLogs': true, 'snapshot': {'snapshotIntervalSeconds': 0.01}}; await tp.initialize(); @@ -871,8 +871,6 @@ if (isMacOS) { }, 300000) } - */ - test("backpressure due to max parallelism", async () => { // Load the YAML from the file @@ -904,3 +902,75 @@ test("backpressure due to max parallelism", async () => { }); +test("serial workflow with backpressure", async () => { + + // Load the YAML from the file + const yamlFilePath = path.join(__dirname, '../', '../', 'example', 'backpressure-wf.yaml'); + const templateYaml = fs.readFileSync(yamlFilePath, 'utf8'); + var template = yaml.load(templateYaml); + const sw = await StatedWorkflow.newWorkflow(template); + const {templateProcessor:tp} = sw; + tp.options = {'snapshot': {'snapshotIntervalSeconds': 0.01}} + + const defaultSnapshotPath = path.join(__dirname, '../', '../','defaultSnapshot.json'); + + // make sure default snapshot is deleted before the test + try { + await unlink(defaultSnapshotPath); + console.log(`${new Date().toISOString()}: Deleted previous default snapshot file: ${defaultSnapshotPath}`) + } catch (e) { + if (e.code !== 'ENOENT') { + throw e; + } + } + + await tp.initialize(); + console.log(`${new Date().toISOString()}: initialized stated workflow template...`); + + let anyResidentsSnapshotted = false; + let snapshot; + while (!anyResidentsSnapshotted) { + + try { + const snapshotContent = fs.readFileSync(defaultSnapshotPath, 'utf8'); + snapshot = JSON.parse(snapshotContent); + console.log(`${new Date().toISOString()}: Snapshot has ${snapshot.output.residents.length} residents`); + if (snapshot.output?.residents?.length > 10) { + anyResidentsSnapshotted = true; + break; + } + } catch (e) { + console.log(`${new Date().toISOString()}: Error checking snapshotted residents: ${e.message}`); + } + await new Promise(resolve => setTimeout(resolve, 1000)); // Poll every 50ms + } + + // kill the wf + console.log(`${new Date().toISOString()}: Stopping stated workflow...`); + await sw.close(); + console.log(`${new Date().toISOString()}: Stopped stated workflow, with ${tp.output.residents.length} residents`); + + + // restore from snapshot + // create a new template object + // var template = yaml.load(templateYaml); + // read the snapshot + + console.log(`${new Date().toISOString()}: Recovering from a snapshot with ${snapshot.output.residents.length} residents`) + await tp.initialize(snapshot.template, '/', snapshot.output); + + // 82 unique residents + + // tp.output?.residents.reduce((counts, o)=>{ counts[o.name] = (counts[o.name] || 0) + 1; return counts }, {}).size(); + + let uniqResidents = 0; + do { + uniqResidents = Object.keys(tp.output?.residents.reduce((counts, o)=>{ counts[o.name] = (counts[o.name] || 0) + 1; return counts }, {})).length + console.log(`${new Date().toISOString()}: Got ${uniqResidents} unique residents processed`); + await new Promise(resolve => setTimeout(resolve, 1000)); // Poll every 50ms + } while (uniqResidents < 82) + + console.log(`${new Date().toISOString()}: We got ${uniqResidents} unique residents processed with ${tp.output.residents.length} total residents`); +}, 100000); + + diff --git a/src/workflow/StatedWorkflow.js b/src/workflow/StatedWorkflow.js index dc180b3..4d29a84 100644 --- a/src/workflow/StatedWorkflow.js +++ b/src/workflow/StatedWorkflow.js @@ -30,6 +30,7 @@ import fs from "fs"; import path from "path"; import {Delay} from "../test/TestTools.js" import {Snapshot} from "./Snapshot.js"; +import {rateLimit} from "stated-js/dist/src/utils/rateLimit.js"; const writeFile = util.promisify(fs.writeFile); const basePath = path.join(process.cwd(), '.state'); @@ -89,7 +90,8 @@ export class StatedWorkflow { this.templateProcessor.initCallbacks = [ // --- clear the dispatcher --- ()=>{this.workflowDispatcher && this.workflowDispatcher.clear()}, - //--- start periodic snapshotting --- + //--- add rateLimited --- + // ()=>{ ()=>{ const {snapshot: snapshotOpts} = this.templateProcessor.options; if(!snapshotOpts){ @@ -365,11 +367,15 @@ export class StatedWorkflow { try { data = await consumer.receive(); let obj; + let messageId; try { const str = data.getData().toString(); + messageId = data.getMessageId(); obj = JSON.parse(str); } catch (error) { console.error("unable to parse data to json:", error); + // TODO - should we acknowledge the message here? + continue; } let resolve; this.latch = new Promise((_resolve) => { @@ -379,13 +385,15 @@ export class StatedWorkflow { this.templateProcessor.setDataChangeCallback('/', async (data, jsonPtrs, removed) => { for (let jsonPtr of jsonPtrs) { if (/^\/step\d+\/log\/.*$/.test(jsonPtr)) { - await writeFile(path.join(basePath,'template.json') , StatedREPL.stringify(data), 'utf8'); + fs.writeFileSync(path.join(basePath,'template.json') , StatedREPL.stringify(data), 'utf8'); } if (/^\/step1\/log\/.*$/.test(jsonPtr)) { // TODO: await persist the step const dataThatChanged = jp.get(data, jsonPtr); if (dataThatChanged.start !== undefined && dataThatChanged.end === undefined) { resolve(); + // consumer.acknowledgeId(data); + consumer.acknowledgeId(messageId); } } } diff --git a/src/workflow/WorkflowDispatcher.js b/src/workflow/WorkflowDispatcher.js index 103d383..c25fac5 100644 --- a/src/workflow/WorkflowDispatcher.js +++ b/src/workflow/WorkflowDispatcher.js @@ -142,8 +142,7 @@ export class WorkflowDispatcher { return new Promise(async (resolve, reject) => { const tryAddToQueue = async () => { - this._logActivity("active", this.active); - this._logActivity("queue", this.queue.length); + this._logActivity("log", {"t": new Date().getTime(), "acivie": this.active, "queue": this.queue.length}); if (this.active < this.parallelism) { this.queue.push(data); resolve(); // Resolve the promise to signal that the data was queued From e8a50dcb774545c1a2be788c3d26ecba8a85a747 Mon Sep 17 00:00:00 2001 From: Sergey Sergeev Date: Wed, 21 Feb 2024 17:20:22 -0800 Subject: [PATCH 03/19] WIP: pulsarClientMock --- src/test/StatedWorkflow.test.js | 20 +++++ src/workflow/StatedWorkflow.js | 22 +++-- src/workflow/utils/PulsarMock.js | 140 +++++++++++++++++++++++++++++++ 3 files changed, 174 insertions(+), 8 deletions(-) create mode 100644 src/workflow/utils/PulsarMock.js diff --git a/src/test/StatedWorkflow.test.js b/src/test/StatedWorkflow.test.js index 82619d1..14be19d 100644 --- a/src/test/StatedWorkflow.test.js +++ b/src/test/StatedWorkflow.test.js @@ -797,6 +797,26 @@ if (isMacOS) { }) + test("Pulsar consumer data function mock client test", async () => { + const yamlFilePath = path.join(__dirname, '../', '../', 'example', 'pubsub-data-function-pulsar-mock.yaml'); + const templateYaml = fs.readFileSync(yamlFilePath, 'utf8'); + let template = yaml.load(templateYaml); + + const {templateProcessor: tp} = await StatedWorkflow.newWorkflow(template); + // keep steps execution logs for debugging + tp.options = {'keepLogs': true, 'snapshot': {}}; + + await tp.initialize(); + + while (tp.output.farFarAway?.length + tp.output.nearBy?.length < 2) { + await new Promise(resolve => setTimeout(resolve, 50)); // Poll every 50ms + } + + expect(tp.output.interceptedMessages?.length).toBeGreaterThanOrEqual(2) + expect(tp.output.farFarAway?.length + tp.output.nearBy?.length).toEqual(2); + + }) + test("Pulsar consumer data function integration test", async () => { const yamlFilePath = path.join(__dirname, '../', '../', 'example', 'pubsub-data-function-pulsar.yaml'); const templateYaml = fs.readFileSync(yamlFilePath, 'utf8'); diff --git a/src/workflow/StatedWorkflow.js b/src/workflow/StatedWorkflow.js index 4d29a84..1016138 100644 --- a/src/workflow/StatedWorkflow.js +++ b/src/workflow/StatedWorkflow.js @@ -31,6 +31,7 @@ import path from "path"; import {Delay} from "../test/TestTools.js" import {Snapshot} from "./Snapshot.js"; import {rateLimit} from "stated-js/dist/src/utils/rateLimit.js"; +import {PulsarClientMock} from "./utils/PulsarMock.js"; const writeFile = util.promisify(fs.writeFile); const basePath = path.join(process.cwd(), '.state'); @@ -184,13 +185,12 @@ export class StatedWorkflow { } } + createPulsarClientMock(params) { + if (this.pulsarClient) return; - ensureClient(params) { - if (!params || params.type == 'pulsar') { - this.createPulsarClient(params); - } else if (params.type == 'kafka') { - this.createKafkaClient(params); - } + this.pulsarClient = new PulsarClientMock({ + serviceUrl: 'pulsar://localhost:6650', + }); } createPulsarClient(params) { @@ -230,6 +230,10 @@ export class StatedWorkflow { this.publishKafka(params, clientParams); } else if(clientType==="pulsar") { this.publishPulsar(params, clientParams); + }else if(clientType === 'pulsarMock'){ + this.logger.debug(`publishing to pulsarMock using ${clientParams}`) + this.createPulsarClientMock(clientParams); + this.publishPulsar(params, clientParams); }else{ throw new Error(`Unsupported clientType: ${clientType}`); } @@ -327,8 +331,6 @@ export class StatedWorkflow { // }); - - if (source === 'http') { return this.onHttp(subscribeOptions); } @@ -506,6 +508,10 @@ export class StatedWorkflow { this.logger.debug(`subscribing to pulsar (default) using ${clientParams}`) this.createPulsarClient(clientParams); this.subscribePulsar(subscriptionParams); + }else if(clientType === 'pulsarMock'){ + this.logger.debug(`subscribing to pulsarMock using ${clientParams}`) + this.createPulsarClientMock(clientParams); + this.subscribePulsar(subscriptionParams); }else{ throw new Error(`unsupported client.type in ${StatedREPL.stringify(subscriptionParams)}`); } diff --git a/src/workflow/utils/PulsarMock.js b/src/workflow/utils/PulsarMock.js new file mode 100644 index 0000000..93c14c0 --- /dev/null +++ b/src/workflow/utils/PulsarMock.js @@ -0,0 +1,140 @@ +export class PulsarClientMock { + constructor() { + this.inMemoryStore = new Map(); + this.messageIdCounter = 0; + this.listeners = new Map(); // Add this to track listeners per topic + } + + async createProducer(config) { + const topic = config.topic; + if (!this.inMemoryStore.has(topic)) { + this.inMemoryStore.set(topic, []); + } + + return { + send: async (message) => { + const messageId = new MessageId(`message-${++this.messageIdCounter}`); + const messageInstance = new Message(topic, undefined, message.data, messageId); + const messages = this.inMemoryStore.get(topic) || []; + messages.push(messageInstance); + + // Notify listeners that a new message is available + const listeners = this.listeners.get(topic) || []; + if (listeners.length > 0) { + const listener = listeners.shift(); // Remove the listener from the queue + listener(messageInstance); // Resolve the listener's promise with the new message + } + + return { messageId: messageId.toString() }; + }, + close: async () => {}, + }; + } + + async subscribe(config) { + const topic = config.topic; + + return { + receive: () => { + return new Promise((resolve) => { + const messages = this.inMemoryStore.get(topic) || []; + if (messages.length > 0) { + // Immediately resolve with the next message if available + const message = messages.shift(); // Assuming FIFO delivery + resolve(message); + } else { + // No messages available, add listener to be resolved later + if (!this.listeners.has(topic)) { + this.listeners.set(topic, []); + } + this.listeners.get(topic).push(resolve); + } + }); + }, + acknowledge: async (message) => { + // Acknowledge logic here (not directly relevant to the blocking receive) + }, + acknowledgeId: async (messageId) => { + // AcknowledgeId logic here + }, + close: async () => {}, + }; + } + + close() { + return Promise.resolve(null); + } +} + +export class MessageId { + constructor(id) { + this.id = id; + } + + static earliest() { + return new MessageId("earliest"); + } + + static latest() { + return new MessageId("latest"); + } + + static deserialize(data) { + // Assuming the input data is a Buffer containing a string ID + return new MessageId(data.toString()); + } + + serialize() { + // Convert the ID to a Buffer + return Buffer.from(this.id); + } + + toString() { + return this.id; + } +} + +export class Message { + constructor(topicName, properties, data, messageId, publishTimestamp, eventTimestamp, redeliveryCount, partitionKey) { + this.topicName = topicName; + this.properties = properties; + this.data = data; + this.messageId = messageId; + this.publishTimestamp = publishTimestamp; + this.eventTimestamp = eventTimestamp; + this.redeliveryCount = redeliveryCount; + this.partitionKey = partitionKey; + } + + getTopicName() { + return this.topicName; + } + + getProperties() { + return this.properties; + } + + getData() { + return this.data; + } + + getMessageId() { + return this.messageId; + } + + getPublishTimestamp() { + return this.publishTimestamp; + } + + getEventTimestamp() { + return this.eventTimestamp; + } + + getRedeliveryCount() { + return this.redeliveryCount; + } + + getPartitionKey() { + return this.partitionKey; + } +} \ No newline at end of file From 62d7fc279076940588b78a47007bc8caeb5e9f3d Mon Sep 17 00:00:00 2001 From: Sergey Sergeev Date: Thu, 22 Feb 2024 16:16:23 -0800 Subject: [PATCH 04/19] store messages in a global static variables and add acknowledgement --- src/workflow/utils/PulsarMock.js | 82 +++++++++++++++++++++----------- 1 file changed, 55 insertions(+), 27 deletions(-) diff --git a/src/workflow/utils/PulsarMock.js b/src/workflow/utils/PulsarMock.js index 93c14c0..f64ffa7 100644 --- a/src/workflow/utils/PulsarMock.js +++ b/src/workflow/utils/PulsarMock.js @@ -1,29 +1,27 @@ export class PulsarClientMock { + static inMemoryStore = new Map(); // Static store to survive client restarts + static messageIdCounter = 0; // Global message ID counter + static listeners = new Map(); // Global listeners map + constructor() { - this.inMemoryStore = new Map(); - this.messageIdCounter = 0; - this.listeners = new Map(); // Add this to track listeners per topic + // No longer needed to initialize static properties in the constructor } async createProducer(config) { const topic = config.topic; - if (!this.inMemoryStore.has(topic)) { - this.inMemoryStore.set(topic, []); + if (!PulsarClientMock.inMemoryStore.has(topic)) { + PulsarClientMock.inMemoryStore.set(topic, []); } return { send: async (message) => { - const messageId = new MessageId(`message-${++this.messageIdCounter}`); - const messageInstance = new Message(topic, undefined, message.data, messageId); - const messages = this.inMemoryStore.get(topic) || []; - messages.push(messageInstance); + const messageId = new MessageId(`message-${++PulsarClientMock.messageIdCounter}`); + const messageInstance = new Message(topic, undefined, message.data, messageId, Date.now(), Date.now(), 0, ''); + const messages = PulsarClientMock.inMemoryStore.get(topic) || []; + messages.push({ message: messageInstance, visible: true }); // Notify listeners that a new message is available - const listeners = this.listeners.get(topic) || []; - if (listeners.length > 0) { - const listener = listeners.shift(); // Remove the listener from the queue - listener(messageInstance); // Resolve the listener's promise with the new message - } + PulsarClientMock.notifyListeners(topic); return { messageId: messageId.toString() }; }, @@ -37,30 +35,60 @@ export class PulsarClientMock { return { receive: () => { return new Promise((resolve) => { - const messages = this.inMemoryStore.get(topic) || []; - if (messages.length > 0) { - // Immediately resolve with the next message if available - const message = messages.shift(); // Assuming FIFO delivery - resolve(message); - } else { - // No messages available, add listener to be resolved later - if (!this.listeners.has(topic)) { - this.listeners.set(topic, []); + const tryResolve = () => { + const messages = PulsarClientMock.inMemoryStore.get(topic) || []; + const messageIndex = messages.findIndex(m => m.visible); + if (messageIndex !== -1) { + // Make the message invisible for a certain timeout + messages[messageIndex].visible = false; + setTimeout(() => { + messages[messageIndex].visible = true; + PulsarClientMock.notifyListeners(topic); + }, 30000); // 30 seconds timeout + resolve(messages[messageIndex].message); + } else { + // No visible messages available, add listener to be resolved later + if (!PulsarClientMock.listeners.has(topic)) { + PulsarClientMock.listeners.set(topic, []); + } + PulsarClientMock.listeners.get(topic).push(tryResolve); } - this.listeners.get(topic).push(resolve); - } + }; + + tryResolve(); }); }, acknowledge: async (message) => { - // Acknowledge logic here (not directly relevant to the blocking receive) + const messages = PulsarClientMock.inMemoryStore.get(topic) || []; + const messageIndex = messages.findIndex(m => m.message.messageId.id === message.messageId.id); + if (messageIndex !== -1) { + messages.splice(messageIndex, 1); // Remove the acknowledged message + } }, acknowledgeId: async (messageId) => { - // AcknowledgeId logic here + const messages = PulsarClientMock.inMemoryStore.get(topic) || []; + const messageIndex = messages.findIndex(m => m.message.messageId.id === messageId.id); + if (messageIndex !== -1) { + messages.splice(messageIndex, 1); // Remove the acknowledged message + } }, close: async () => {}, }; } + static clear() { + PulsarClientMock.inMemoryStore.clear(); + PulsarClientMock.listeners.clear(); + } + + static notifyListeners(topic) { + const listeners = PulsarClientMock.listeners.get(topic) || []; + while (listeners.length > 0) { + const listener = listeners.shift(); // Remove the listener from the queue + listener(); // Try resolving a listener with any newly available message + } + } + close() { return Promise.resolve(null); } From 397a77bb9d7028113602b88192772f8d63e51cb3 Mon Sep 17 00:00:00 2001 From: Sergey Sergeev Date: Fri, 23 Feb 2024 10:42:11 -0800 Subject: [PATCH 05/19] move PulsarMock to test, update snapshot test --- example/backpressure-wf.yaml | 2 +- src/{workflow/utils => test}/PulsarMock.js | 0 src/test/StatedWorkflow.test.js | 64 ++++++++++++---------- src/workflow/StatedWorkflow.js | 2 +- 4 files changed, 36 insertions(+), 32 deletions(-) rename src/{workflow/utils => test}/PulsarMock.js (100%) diff --git a/example/backpressure-wf.yaml b/example/backpressure-wf.yaml index 61fb63f..dc946f1 100644 --- a/example/backpressure-wf.yaml +++ b/example/backpressure-wf.yaml @@ -15,7 +15,7 @@ subscribeResidents: getResidentsWorkflow: function: /${ function($planetInfo){ $planetInfo ~> $serial([extractResidents, fetchResidents]) } } extractResidents: - function: /${ function($planet){( $sleep($random() * 100) ; $planet.residents.($fetch($).json()) )} } + function: /${ function($planet){( $sleep($random() * 100) ; $planet.residents.($fetch($).json()) )} } # add a random delay fetchResidents: function: /${ function($residents){$residents.($set('/residents/-',{'name':$.name, 'url':$.url}))} } residents: [ ] diff --git a/src/workflow/utils/PulsarMock.js b/src/test/PulsarMock.js similarity index 100% rename from src/workflow/utils/PulsarMock.js rename to src/test/PulsarMock.js diff --git a/src/test/StatedWorkflow.test.js b/src/test/StatedWorkflow.test.js index 14be19d..40c4405 100644 --- a/src/test/StatedWorkflow.test.js +++ b/src/test/StatedWorkflow.test.js @@ -776,8 +776,15 @@ test.skip("Template Data Change Callback with rate limit", async () => { }); -const isMacOS = process.platform === 'darwin'; -if (isMacOS) { +/** + * Pulsar Integration Tests + * + * 1. start docker-compose + * docker-compose -f docker/docker-compose.yaml up -d + * 2. run the tests + * ENABLE_INTEGRATION_TESTS=true yarn test StatedWorkflow.test.js + */ +if (process.env.ENABLE_INTEGRATION_TESTS) { test("Pulsar consumer integration test", async () => { const yamlFilePath = path.join(__dirname, '../', '../', 'example', 'pubsub-pulsar.yaml'); const templateYaml = fs.readFileSync(yamlFilePath, 'utf8'); @@ -922,22 +929,29 @@ test("backpressure due to max parallelism", async () => { }); +/** + * This test validates that the workflow can be recovered from a snapshot. + */ test("serial workflow with backpressure", async () => { + // Logging function to avoid repetition of console.log with date stamps + const logWithDate = (message) => { + console.log(`${new Date().toISOString()}: ${message}`); + }; // Load the YAML from the file const yamlFilePath = path.join(__dirname, '../', '../', 'example', 'backpressure-wf.yaml'); const templateYaml = fs.readFileSync(yamlFilePath, 'utf8'); var template = yaml.load(templateYaml); const sw = await StatedWorkflow.newWorkflow(template); - const {templateProcessor:tp} = sw; + const {templateProcessor: tp} = sw; tp.options = {'snapshot': {'snapshotIntervalSeconds': 0.01}} const defaultSnapshotPath = path.join(__dirname, '../', '../','defaultSnapshot.json'); - // make sure default snapshot is deleted before the test + // Make sure default snapshot is deleted before the test try { await unlink(defaultSnapshotPath); - console.log(`${new Date().toISOString()}: Deleted previous default snapshot file: ${defaultSnapshotPath}`) + logWithDate(`Deleted previous default snapshot file: ${defaultSnapshotPath}`); } catch (e) { if (e.code !== 'ENOENT') { throw e; @@ -945,52 +959,42 @@ test("serial workflow with backpressure", async () => { } await tp.initialize(); - console.log(`${new Date().toISOString()}: initialized stated workflow template...`); + logWithDate("Initialized stated workflow template..."); let anyResidentsSnapshotted = false; let snapshot; - while (!anyResidentsSnapshotted) { + // Wait for the snapshot file to include at least 10 residents + while (!anyResidentsSnapshotted) { try { const snapshotContent = fs.readFileSync(defaultSnapshotPath, 'utf8'); snapshot = JSON.parse(snapshotContent); - console.log(`${new Date().toISOString()}: Snapshot has ${snapshot.output.residents.length} residents`); + logWithDate(`Snapshot has ${snapshot.output.residents.length} residents`); if (snapshot.output?.residents?.length > 10) { anyResidentsSnapshotted = true; break; } } catch (e) { - console.log(`${new Date().toISOString()}: Error checking snapshotted residents: ${e.message}`); + logWithDate(`Error checking snapshot residents: ${e.message}`); } - await new Promise(resolve => setTimeout(resolve, 1000)); // Poll every 50ms + await new Promise(resolve => setTimeout(resolve, 1000)); // Poll every 1s } - // kill the wf - console.log(`${new Date().toISOString()}: Stopping stated workflow...`); + // Kill the wf + logWithDate("Stopping stated workflow..."); await sw.close(); - console.log(`${new Date().toISOString()}: Stopped stated workflow, with ${tp.output.residents.length} residents`); - - - // restore from snapshot - // create a new template object - // var template = yaml.load(templateYaml); - // read the snapshot + logWithDate(`Stopped stated workflow, with ${tp.output.residents.length} residents`); - console.log(`${new Date().toISOString()}: Recovering from a snapshot with ${snapshot.output.residents.length} residents`) + logWithDate(`Recovering from a snapshot with ${snapshot.output.residents.length} residents`); await tp.initialize(snapshot.template, '/', snapshot.output); - // 82 unique residents - - // tp.output?.residents.reduce((counts, o)=>{ counts[o.name] = (counts[o.name] || 0) + 1; return counts }, {}).size(); - + // Calculate unique residents let uniqResidents = 0; do { - uniqResidents = Object.keys(tp.output?.residents.reduce((counts, o)=>{ counts[o.name] = (counts[o.name] || 0) + 1; return counts }, {})).length - console.log(`${new Date().toISOString()}: Got ${uniqResidents} unique residents processed`); - await new Promise(resolve => setTimeout(resolve, 1000)); // Poll every 50ms + uniqResidents = Object.keys(tp.output?.residents.reduce((counts, o)=>{ counts[o.name] = (counts[o.name] || 0) + 1; return counts }, {})).length; + logWithDate(`Got ${uniqResidents} unique residents processed`); + await new Promise(resolve => setTimeout(resolve, 1000)); } while (uniqResidents < 82) - console.log(`${new Date().toISOString()}: We got ${uniqResidents} unique residents processed with ${tp.output.residents.length} total residents`); + logWithDate(`We got ${uniqResidents} unique residents processed with ${tp.output.residents.length} total residents`); }, 100000); - - diff --git a/src/workflow/StatedWorkflow.js b/src/workflow/StatedWorkflow.js index 1016138..a18582b 100644 --- a/src/workflow/StatedWorkflow.js +++ b/src/workflow/StatedWorkflow.js @@ -31,7 +31,7 @@ import path from "path"; import {Delay} from "../test/TestTools.js" import {Snapshot} from "./Snapshot.js"; import {rateLimit} from "stated-js/dist/src/utils/rateLimit.js"; -import {PulsarClientMock} from "./utils/PulsarMock.js"; +import {PulsarClientMock} from "../test/PulsarMock.js"; const writeFile = util.promisify(fs.writeFile); const basePath = path.join(process.cwd(), '.state'); From 24a4058eeb70028a52759fd6e4a9a864ac968552 Mon Sep 17 00:00:00 2001 From: Sergey Sergeev Date: Fri, 23 Feb 2024 10:47:37 -0800 Subject: [PATCH 06/19] test to use pulsarMock --- example/pubsub-data-function-pulsar-mock.yaml | 51 +++++++++++++++++++ src/test/StatedWorkflow.test.js | 41 +++++++-------- 2 files changed, 72 insertions(+), 20 deletions(-) create mode 100644 example/pubsub-data-function-pulsar-mock.yaml diff --git a/example/pubsub-data-function-pulsar-mock.yaml b/example/pubsub-data-function-pulsar-mock.yaml new file mode 100644 index 0000000..559ed7e --- /dev/null +++ b/example/pubsub-data-function-pulsar-mock.yaml @@ -0,0 +1,51 @@ +# Droid R2-D2 is sending messages to the Rebel Alliance's communication channel +produceParams: + type: "rebel-comm-channel" + data: | + ${ + function(){ + {'message': 'Rebel Fleet Coordinates', 'location': $random(), 'sender': $floor($random()*10)+1 } + } + } + client: + type: pulsarMock +# Droid C-3PO will intercept and log each received message for the Rebel Alliance +subscribeParams: #parameters for subscribing to a holocomm transmission + source: cloudEvent + type: /${ produceParams.type } # subscribe to the same channel as R2-D2 to intercept messages + to: /${processMessageWorkflow} + subscriberId: protocolDroid + initialPosition: latest + client: + type: pulsarMock +processMessageWorkflow: /${ function($message){ $message ~> $serial([step1, step2, step3]) }} +step1: + name: logMessage + function: /${ function($e){( $set('/interceptedMessages/-', $e); $e)} } +step2: + name: identifySender + function: | + ${ function($e){( + { + 'sender': $e.sender, + 'name': $fetch('https://swapi.dev/api/people/'& $e.sender).json().name, + 'location': $e.location + } + )} } +step3: + name: classifyLocation + function: | + /${ function($e){( + $e.location > 0.5 ? $set('/farFarAway/-', $e) : $set('/nearBy/-', $e); $e + )} } +# Activates R2-D2's message transmission function every 500 milliseconds +send: "${ $setInterval( function(){ $publish(produceParams)}, 500) }" +# Activates C-3PO's message interception function +recv$: $subscribe(subscribeParams) +# interceptedMessages is where C-3PO will store the results of message decoding +interceptedMessages: [ ] +farFarAway: [ ] +nearBy: [ ] +# This condition stops the transmission operation when interceptedMessages has 10 elements +stop$: ($count(interceptedMessages)>=2?($clearInterval(send);'transmissionAccomplished'):'transmissionOngoing') +recover$!: $recover(processMessageWorkflow) diff --git a/src/test/StatedWorkflow.test.js b/src/test/StatedWorkflow.test.js index 40c4405..df51553 100644 --- a/src/test/StatedWorkflow.test.js +++ b/src/test/StatedWorkflow.test.js @@ -804,26 +804,6 @@ if (process.env.ENABLE_INTEGRATION_TESTS) { }) - test("Pulsar consumer data function mock client test", async () => { - const yamlFilePath = path.join(__dirname, '../', '../', 'example', 'pubsub-data-function-pulsar-mock.yaml'); - const templateYaml = fs.readFileSync(yamlFilePath, 'utf8'); - let template = yaml.load(templateYaml); - - const {templateProcessor: tp} = await StatedWorkflow.newWorkflow(template); - // keep steps execution logs for debugging - tp.options = {'keepLogs': true, 'snapshot': {}}; - - await tp.initialize(); - - while (tp.output.farFarAway?.length + tp.output.nearBy?.length < 2) { - await new Promise(resolve => setTimeout(resolve, 50)); // Poll every 50ms - } - - expect(tp.output.interceptedMessages?.length).toBeGreaterThanOrEqual(2) - expect(tp.output.farFarAway?.length + tp.output.nearBy?.length).toEqual(2); - - }) - test("Pulsar consumer data function integration test", async () => { const yamlFilePath = path.join(__dirname, '../', '../', 'example', 'pubsub-data-function-pulsar.yaml'); const templateYaml = fs.readFileSync(yamlFilePath, 'utf8'); @@ -998,3 +978,24 @@ test("serial workflow with backpressure", async () => { logWithDate(`We got ${uniqResidents} unique residents processed with ${tp.output.residents.length} total residents`); }, 100000); + + +test("Pulsar consumer data function mock client test", async () => { + const yamlFilePath = path.join(__dirname, '../', '../', 'example', 'pubsub-data-function-pulsar-mock.yaml'); + const templateYaml = fs.readFileSync(yamlFilePath, 'utf8'); + let template = yaml.load(templateYaml); + + const {templateProcessor: tp} = await StatedWorkflow.newWorkflow(template); + // keep steps execution logs for debugging + tp.options = {'keepLogs': true, 'snapshot': {}}; + + await tp.initialize(); + + while (tp.output.farFarAway?.length + tp.output.nearBy?.length < 2) { + await new Promise(resolve => setTimeout(resolve, 50)); // Poll every 50ms + } + + expect(tp.output.interceptedMessages?.length).toBeGreaterThanOrEqual(2) + expect(tp.output.farFarAway?.length + tp.output.nearBy?.length).toEqual(2); + +}) From 6fa94b8e6679320afeee4ea26d20c1101453642d Mon Sep 17 00:00:00 2001 From: Sergey Sergeev Date: Fri, 23 Feb 2024 10:56:27 -0800 Subject: [PATCH 07/19] test names++ --- example/{backpressure-wf.yaml => inhabitants-with-delay.yaml} | 0 ...data-function-pulsar-mock.yaml => rebelCommunication.yaml} | 0 src/test/StatedWorkflow.test.js | 4 ++-- 3 files changed, 2 insertions(+), 2 deletions(-) rename example/{backpressure-wf.yaml => inhabitants-with-delay.yaml} (100%) rename example/{pubsub-data-function-pulsar-mock.yaml => rebelCommunication.yaml} (100%) diff --git a/example/backpressure-wf.yaml b/example/inhabitants-with-delay.yaml similarity index 100% rename from example/backpressure-wf.yaml rename to example/inhabitants-with-delay.yaml diff --git a/example/pubsub-data-function-pulsar-mock.yaml b/example/rebelCommunication.yaml similarity index 100% rename from example/pubsub-data-function-pulsar-mock.yaml rename to example/rebelCommunication.yaml diff --git a/src/test/StatedWorkflow.test.js b/src/test/StatedWorkflow.test.js index df51553..15b23ed 100644 --- a/src/test/StatedWorkflow.test.js +++ b/src/test/StatedWorkflow.test.js @@ -919,7 +919,7 @@ test("serial workflow with backpressure", async () => { }; // Load the YAML from the file - const yamlFilePath = path.join(__dirname, '../', '../', 'example', 'backpressure-wf.yaml'); + const yamlFilePath = path.join(__dirname, '../', '../', 'example', 'inhabitants-with-delay.yaml'); const templateYaml = fs.readFileSync(yamlFilePath, 'utf8'); var template = yaml.load(templateYaml); const sw = await StatedWorkflow.newWorkflow(template); @@ -981,7 +981,7 @@ test("serial workflow with backpressure", async () => { test("Pulsar consumer data function mock client test", async () => { - const yamlFilePath = path.join(__dirname, '../', '../', 'example', 'pubsub-data-function-pulsar-mock.yaml'); + const yamlFilePath = path.join(__dirname, '../', '../', 'example', 'rebelCommunication.yaml'); const templateYaml = fs.readFileSync(yamlFilePath, 'utf8'); let template = yaml.load(templateYaml); From 86542291edcc5bfa1a56e94636113fe2ae789694 Mon Sep 17 00:00:00 2001 From: Sergey Sergeev Date: Fri, 23 Feb 2024 11:31:25 -0800 Subject: [PATCH 08/19] Improve test logging and check to run integration tests --- src/test/StatedWorkflow.test.js | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/test/StatedWorkflow.test.js b/src/test/StatedWorkflow.test.js index 15b23ed..10ab74b 100644 --- a/src/test/StatedWorkflow.test.js +++ b/src/test/StatedWorkflow.test.js @@ -22,6 +22,7 @@ import {EnhancedPrintFunc} from "./TestTools.js"; import {rateLimit} from "stated-js/dist/src/utils/rateLimit.js"; import util from "util"; import {fn} from "jest-mock"; +import {PulsarClientMock} from "./PulsarMock.js"; const __filename = fileURLToPath(import.meta.url); @@ -781,10 +782,10 @@ test.skip("Template Data Change Callback with rate limit", async () => { * * 1. start docker-compose * docker-compose -f docker/docker-compose.yaml up -d - * 2. run the tests + * 2. run the tests with ENABLE_INTEGRATION_TESTS set to "true" * ENABLE_INTEGRATION_TESTS=true yarn test StatedWorkflow.test.js */ -if (process.env.ENABLE_INTEGRATION_TESTS) { +if (process.env.ENABLE_INTEGRATION_TESTS === "true") { test("Pulsar consumer integration test", async () => { const yamlFilePath = path.join(__dirname, '../', '../', 'example', 'pubsub-pulsar.yaml'); const templateYaml = fs.readFileSync(yamlFilePath, 'utf8'); @@ -980,7 +981,7 @@ test("serial workflow with backpressure", async () => { }, 100000); -test("Pulsar consumer data function mock client test", async () => { +test("subscribePulsar with pulsarMock client", async () => { const yamlFilePath = path.join(__dirname, '../', '../', 'example', 'rebelCommunication.yaml'); const templateYaml = fs.readFileSync(yamlFilePath, 'utf8'); let template = yaml.load(templateYaml); @@ -998,4 +999,6 @@ test("Pulsar consumer data function mock client test", async () => { expect(tp.output.interceptedMessages?.length).toBeGreaterThanOrEqual(2) expect(tp.output.farFarAway?.length + tp.output.nearBy?.length).toEqual(2); + PulsarClientMock.inMemoryStore + }) From f6fcecc6e321918e65616349fc55a0b5bc3cf0d7 Mon Sep 17 00:00:00 2001 From: Sergey Sergeev Date: Fri, 23 Feb 2024 11:52:58 -0800 Subject: [PATCH 09/19] remove callback to save template file --- src/test/StatedWorkflow.test.js | 3 --- src/workflow/StatedWorkflow.js | 7 ++++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/test/StatedWorkflow.test.js b/src/test/StatedWorkflow.test.js index 10ab74b..075a8fd 100644 --- a/src/test/StatedWorkflow.test.js +++ b/src/test/StatedWorkflow.test.js @@ -998,7 +998,4 @@ test("subscribePulsar with pulsarMock client", async () => { expect(tp.output.interceptedMessages?.length).toBeGreaterThanOrEqual(2) expect(tp.output.farFarAway?.length + tp.output.nearBy?.length).toEqual(2); - - PulsarClientMock.inMemoryStore - }) diff --git a/src/workflow/StatedWorkflow.js b/src/workflow/StatedWorkflow.js index a18582b..70dac14 100644 --- a/src/workflow/StatedWorkflow.js +++ b/src/workflow/StatedWorkflow.js @@ -384,11 +384,12 @@ export class StatedWorkflow { resolve = _resolve; //we assign our resolve variable that is declared outside this promise so that our onDataChange callbacks can use it }); + // TODO: switch to pass acknowledgement callback to dispatch this.templateProcessor.setDataChangeCallback('/', async (data, jsonPtrs, removed) => { for (let jsonPtr of jsonPtrs) { - if (/^\/step\d+\/log\/.*$/.test(jsonPtr)) { - fs.writeFileSync(path.join(basePath,'template.json') , StatedREPL.stringify(data), 'utf8'); - } + // if (/^\/step\d+\/log\/.*$/.test(jsonPtr)) { + // fs.writeFileSync(path.join(basePath,'template.json') , StatedREPL.stringify(data), 'utf8'); + // } if (/^\/step1\/log\/.*$/.test(jsonPtr)) { // TODO: await persist the step const dataThatChanged = jp.get(data, jsonPtr); From 362bef4e034eeaa09cf3b6d4676792ba38e1830c Mon Sep 17 00:00:00 2001 From: Sergey Sergeev Date: Fri, 23 Feb 2024 12:41:55 -0800 Subject: [PATCH 10/19] ack test++ --- src/test/PulsarMock.js | 75 ++++++++++++++++++++++++--------- src/test/StatedWorkflow.test.js | 8 ++++ src/workflow/StatedWorkflow.js | 3 +- 3 files changed, 64 insertions(+), 22 deletions(-) diff --git a/src/test/PulsarMock.js b/src/test/PulsarMock.js index f64ffa7..a86612a 100644 --- a/src/test/PulsarMock.js +++ b/src/test/PulsarMock.js @@ -1,14 +1,18 @@ export class PulsarClientMock { - static inMemoryStore = new Map(); // Static store to survive client restarts - static messageIdCounter = 0; // Global message ID counter - static listeners = new Map(); // Global listeners map + static inMemoryStore = new Map(); // Static store to hold messages for each topic + static messageIdCounter = 0; // Global counter to generate unique message IDs + static listeners = new Map(); // Global map to hold listeners for message consumption + static ackTimeout = 30000; // Default acknowledgment timeout (in milliseconds) + static acknowledgedMessages = new Map(); // Static store to hold acknowledged messages for each topic - constructor() { - // No longer needed to initialize static properties in the constructor + // Allows configuration of the acknowledgment timeout + static configureAckTimeout(timeout) { + this.ackTimeout = timeout; } async createProducer(config) { const topic = config.topic; + // Ensure a message queue exists for the topic if (!PulsarClientMock.inMemoryStore.has(topic)) { PulsarClientMock.inMemoryStore.set(topic, []); } @@ -20,7 +24,6 @@ export class PulsarClientMock { const messages = PulsarClientMock.inMemoryStore.get(topic) || []; messages.push({ message: messageInstance, visible: true }); - // Notify listeners that a new message is available PulsarClientMock.notifyListeners(topic); return { messageId: messageId.toString() }; @@ -39,15 +42,16 @@ export class PulsarClientMock { const messages = PulsarClientMock.inMemoryStore.get(topic) || []; const messageIndex = messages.findIndex(m => m.visible); if (messageIndex !== -1) { - // Make the message invisible for a certain timeout + // Make the message temporarily invisible to simulate message locking messages[messageIndex].visible = false; setTimeout(() => { + // Make the message visible again after the timeout messages[messageIndex].visible = true; PulsarClientMock.notifyListeners(topic); - }, 30000); // 30 seconds timeout + }, PulsarClientMock.ackTimeout); resolve(messages[messageIndex].message); } else { - // No visible messages available, add listener to be resolved later + // No visible messages available, wait for new messages if (!PulsarClientMock.listeners.has(topic)) { PulsarClientMock.listeners.set(topic, []); } @@ -59,33 +63,62 @@ export class PulsarClientMock { }); }, acknowledge: async (message) => { - const messages = PulsarClientMock.inMemoryStore.get(topic) || []; - const messageIndex = messages.findIndex(m => m.message.messageId.id === message.messageId.id); - if (messageIndex !== -1) { - messages.splice(messageIndex, 1); // Remove the acknowledged message - } + PulsarClientMock.acknowledgeMessage(topic, message.messageId.id); }, acknowledgeId: async (messageId) => { - const messages = PulsarClientMock.inMemoryStore.get(topic) || []; - const messageIndex = messages.findIndex(m => m.message.messageId.id === messageId.id); - if (messageIndex !== -1) { - messages.splice(messageIndex, 1); // Remove the acknowledged message - } + PulsarClientMock.acknowledgeMessage(topic, messageId.id); }, close: async () => {}, }; } + static acknowledgeMessage(topic, messageId) { + const messages = PulsarClientMock.inMemoryStore.get(topic) || []; + const messageIndex = messages.findIndex(m => m.message.messageId.id === messageId); + if (messageIndex !== -1) { + const [acknowledgedMessage] = messages.splice(messageIndex, 1); // Remove and get the acknowledged message + // Store acknowledged message + if (!this.acknowledgedMessages.has(topic)) { + this.acknowledgedMessages.set(topic, []); + } + this.acknowledgedMessages.get(topic).push(acknowledgedMessage.message); + } + } + + static getTopics() { + return Array.from(PulsarClientMock.inMemoryStore.keys()); + } + + // Method to return statistics for the client + static getStats(topic) { + const messages = this.inMemoryStore.get(topic) || []; + const acknowledged = this.acknowledgedMessages.get(topic) || []; + const inFlight = messages.filter(m => !m.visible).length; + const queueLength = messages.length + acknowledged.length; + + return { + acknowledgedCount: acknowledged.length, + inFlightCount: inFlight, + queueLength, + }; + } + + // Method to return all acknowledged messages for a topic + static getAcknowledgedMessages(topic) { + return this.acknowledgedMessages.get(topic) || []; + } + static clear() { PulsarClientMock.inMemoryStore.clear(); PulsarClientMock.listeners.clear(); + PulsarClientMock.acknowledgedMessages.clear(); } static notifyListeners(topic) { const listeners = PulsarClientMock.listeners.get(topic) || []; while (listeners.length > 0) { - const listener = listeners.shift(); // Remove the listener from the queue - listener(); // Try resolving a listener with any newly available message + const listener = listeners.shift(); + listener(); } } diff --git a/src/test/StatedWorkflow.test.js b/src/test/StatedWorkflow.test.js index 075a8fd..f6845df 100644 --- a/src/test/StatedWorkflow.test.js +++ b/src/test/StatedWorkflow.test.js @@ -23,6 +23,7 @@ import {rateLimit} from "stated-js/dist/src/utils/rateLimit.js"; import util from "util"; import {fn} from "jest-mock"; import {PulsarClientMock} from "./PulsarMock.js"; +import Pulsar from "pulsar-client"; const __filename = fileURLToPath(import.meta.url); @@ -998,4 +999,11 @@ test("subscribePulsar with pulsarMock client", async () => { expect(tp.output.interceptedMessages?.length).toBeGreaterThanOrEqual(2) expect(tp.output.farFarAway?.length + tp.output.nearBy?.length).toEqual(2); + + while (!Array.isArray(PulsarClientMock.getAcknowledgedMessages(PulsarClientMock.getTopics()[0])) + || PulsarClientMock.getAcknowledgedMessages(PulsarClientMock.getTopics()[0]).length < 10) { + let ackedMessages = PulsarClientMock.getAcknowledgedMessages(PulsarClientMock.getTopics()[0]); + console.log("Messages acknowledged: ", ackedMessages); + await new Promise(resolve => setTimeout(resolve, 500)); // Poll every 50ms + } }) diff --git a/src/workflow/StatedWorkflow.js b/src/workflow/StatedWorkflow.js index 70dac14..b045e7e 100644 --- a/src/workflow/StatedWorkflow.js +++ b/src/workflow/StatedWorkflow.js @@ -413,7 +413,8 @@ export class StatedWorkflow { console.error("Error receiving or dispatching message:", error); } finally { if (data !== undefined) { - await this.latch; + // FIXME: remove below, we should be acknowledging in the onDataChange callback + // await this.latch; consumer.acknowledge(data); } From a7bbcc93dd7302762d132fc607eaeabe881a0043 Mon Sep 17 00:00:00 2001 From: Sergey Sergeev Date: Fri, 23 Feb 2024 13:16:57 -0800 Subject: [PATCH 11/19] acknowledge from dataChangeCallback --- example/rebelCommunication.yaml | 7 ++++--- src/test/StatedWorkflow.test.js | 29 ++++++++++++++++++----------- src/workflow/StatedWorkflow.js | 14 +++++++------- 3 files changed, 29 insertions(+), 21 deletions(-) diff --git a/example/rebelCommunication.yaml b/example/rebelCommunication.yaml index 559ed7e..1de7ec0 100644 --- a/example/rebelCommunication.yaml +++ b/example/rebelCommunication.yaml @@ -15,6 +15,7 @@ subscribeParams: #parameters for subscribing to a holocomm transmission type: /${ produceParams.type } # subscribe to the same channel as R2-D2 to intercept messages to: /${processMessageWorkflow} subscriberId: protocolDroid + parallelism: 5 initialPosition: latest client: type: pulsarMock @@ -38,8 +39,8 @@ step3: /${ function($e){( $e.location > 0.5 ? $set('/farFarAway/-', $e) : $set('/nearBy/-', $e); $e )} } -# Activates R2-D2's message transmission function every 500 milliseconds -send: "${ $setInterval( function(){ $publish(produceParams)}, 500) }" +# Activates R2-D2's message transmission function every 50 milliseconds +send: "${ $setInterval( function(){ $publish(produceParams)}, 50) }" # Activates C-3PO's message interception function recv$: $subscribe(subscribeParams) # interceptedMessages is where C-3PO will store the results of message decoding @@ -47,5 +48,5 @@ interceptedMessages: [ ] farFarAway: [ ] nearBy: [ ] # This condition stops the transmission operation when interceptedMessages has 10 elements -stop$: ($count(interceptedMessages)>=2?($clearInterval(send);'transmissionAccomplished'):'transmissionOngoing') +stop$: ($count(interceptedMessages)>=10?($clearInterval(send);'transmissionAccomplished'):'transmissionOngoing') recover$!: $recover(processMessageWorkflow) diff --git a/src/test/StatedWorkflow.test.js b/src/test/StatedWorkflow.test.js index f6845df..c415bc2 100644 --- a/src/test/StatedWorkflow.test.js +++ b/src/test/StatedWorkflow.test.js @@ -979,7 +979,7 @@ test("serial workflow with backpressure", async () => { } while (uniqResidents < 82) logWithDate(`We got ${uniqResidents} unique residents processed with ${tp.output.residents.length} total residents`); -}, 100000); +}, 10000); test("subscribePulsar with pulsarMock client", async () => { @@ -991,19 +991,26 @@ test("subscribePulsar with pulsarMock client", async () => { // keep steps execution logs for debugging tp.options = {'keepLogs': true, 'snapshot': {}}; + PulsarClientMock.ackTimeout = 2000; // 2 seconds + await tp.initialize(); - while (tp.output.farFarAway?.length + tp.output.nearBy?.length < 2) { - await new Promise(resolve => setTimeout(resolve, 50)); // Poll every 50ms + while (tp.output.farFarAway?.length + tp.output.nearBy?.length < 10) { + console.log(`Waiting for at least 10 messages. So far received from farFarAway: ${tp.output.farFarAway?.length}, from nearBy: ${tp.output.nearBy?.length}`); + await new Promise(resolve => setTimeout(resolve, 500)); // Poll every 50ms } + console.log(`Received 10 or more messages. Received from farFarAway: ${tp.output.farFarAway?.length}, from nearBy: ${tp.output.nearBy?.length}`); - expect(tp.output.interceptedMessages?.length).toBeGreaterThanOrEqual(2) - expect(tp.output.farFarAway?.length + tp.output.nearBy?.length).toEqual(2); + expect(tp.output.interceptedMessages?.length).toBeGreaterThanOrEqual(10) + expect(tp.output.farFarAway?.length + tp.output.nearBy?.length).toBeGreaterThanOrEqual(10); - while (!Array.isArray(PulsarClientMock.getAcknowledgedMessages(PulsarClientMock.getTopics()[0])) - || PulsarClientMock.getAcknowledgedMessages(PulsarClientMock.getTopics()[0]).length < 10) { - let ackedMessages = PulsarClientMock.getAcknowledgedMessages(PulsarClientMock.getTopics()[0]); - console.log("Messages acknowledged: ", ackedMessages); + console.log("waiting for at least 10 messages to be acknowledged"); + const topic = PulsarClientMock.getTopics()[0]; // we use only one topic in the test + while (!Array.isArray(PulsarClientMock.getAcknowledgedMessages(topic)) + || PulsarClientMock.getAcknowledgedMessages(topic).length < 10) { + let ackedMessages = PulsarClientMock.getAcknowledgedMessages(topic); + console.log(`PulsarMock topic ${topic} stats: ${StatedREPL.stringify(PulsarClientMock.getStats(topic))}`); await new Promise(resolve => setTimeout(resolve, 500)); // Poll every 50ms - } -}) + }; + console.log(`PulsarMock topic ${topic} stats: ${StatedREPL.stringify(PulsarClientMock.getStats(topic))}`); +}, 10000) diff --git a/src/workflow/StatedWorkflow.js b/src/workflow/StatedWorkflow.js index b045e7e..347ed65 100644 --- a/src/workflow/StatedWorkflow.js +++ b/src/workflow/StatedWorkflow.js @@ -362,17 +362,17 @@ export class StatedWorkflow { }); // Store the consumer in the map this.consumers.set(type, consumer); - let data; + let message; let countdown = maxConsume; while (true) { try { - data = await consumer.receive(); + message = await consumer.receive(); let obj; let messageId; try { - const str = data.getData().toString(); - messageId = data.getMessageId(); + const str = message.getData().toString(); + messageId = message.getMessageId(); obj = JSON.parse(str); } catch (error) { console.error("unable to parse data to json:", error); @@ -396,7 +396,7 @@ export class StatedWorkflow { if (dataThatChanged.start !== undefined && dataThatChanged.end === undefined) { resolve(); // consumer.acknowledgeId(data); - consumer.acknowledgeId(messageId); + consumer.acknowledge(message); } } } @@ -412,10 +412,10 @@ export class StatedWorkflow { } catch (error) { console.error("Error receiving or dispatching message:", error); } finally { - if (data !== undefined) { + if (message !== undefined) { // FIXME: remove below, we should be acknowledging in the onDataChange callback // await this.latch; - consumer.acknowledge(data); + // consumer.acknowledge(data); } if (this.pulsarClient === undefined) { From a0111a111624ad926a6a7db64ef250ada78b3dd6 Mon Sep 17 00:00:00 2001 From: Sergey Sergeev Date: Fri, 23 Feb 2024 13:37:55 -0800 Subject: [PATCH 12/19] move acks from setDataCahangeCallback to dispatch callbacks --- src/test/StatedWorkflow.test.js | 2 -- src/workflow/StatedWorkflow.js | 38 +++++++----------------------- src/workflow/WorkflowDispatcher.js | 16 ++++++++++--- 3 files changed, 21 insertions(+), 35 deletions(-) diff --git a/src/test/StatedWorkflow.test.js b/src/test/StatedWorkflow.test.js index c415bc2..13e12da 100644 --- a/src/test/StatedWorkflow.test.js +++ b/src/test/StatedWorkflow.test.js @@ -991,8 +991,6 @@ test("subscribePulsar with pulsarMock client", async () => { // keep steps execution logs for debugging tp.options = {'keepLogs': true, 'snapshot': {}}; - PulsarClientMock.ackTimeout = 2000; // 2 seconds - await tp.initialize(); while (tp.output.farFarAway?.length + tp.output.nearBy?.length < 10) { diff --git a/src/workflow/StatedWorkflow.js b/src/workflow/StatedWorkflow.js index 347ed65..30c3b08 100644 --- a/src/workflow/StatedWorkflow.js +++ b/src/workflow/StatedWorkflow.js @@ -368,12 +368,11 @@ export class StatedWorkflow { while (true) { try { message = await consumer.receive(); - let obj; + let messageData; let messageId; try { - const str = message.getData().toString(); - messageId = message.getMessageId(); - obj = JSON.parse(str); + const messageDataStr = message.getData().toString(); + messageData = JSON.parse(messageDataStr); } catch (error) { console.error("unable to parse data to json:", error); // TODO - should we acknowledge the message here? @@ -384,40 +383,19 @@ export class StatedWorkflow { resolve = _resolve; //we assign our resolve variable that is declared outside this promise so that our onDataChange callbacks can use it }); - // TODO: switch to pass acknowledgement callback to dispatch - this.templateProcessor.setDataChangeCallback('/', async (data, jsonPtrs, removed) => { - for (let jsonPtr of jsonPtrs) { - // if (/^\/step\d+\/log\/.*$/.test(jsonPtr)) { - // fs.writeFileSync(path.join(basePath,'template.json') , StatedREPL.stringify(data), 'utf8'); - // } - if (/^\/step1\/log\/.*$/.test(jsonPtr)) { - // TODO: await persist the step - const dataThatChanged = jp.get(data, jsonPtr); - if (dataThatChanged.start !== undefined && dataThatChanged.end === undefined) { - resolve(); - // consumer.acknowledgeId(data); - consumer.acknowledge(message); - } - } - } - - }); - + // we create a callback to acknowledge the message + const dataAckCallback = async () => { + return consumer.acknowledge(message); + } //if the dispatchers max parallelism is reached this loop should block, which is why we await - await this.workflowDispatcher.dispatchToAllSubscribers(type, obj); + await this.workflowDispatcher.dispatchToAllSubscribers(type, messageData, dataAckCallback); if(countdown && --countdown===0){ break; } } catch (error) { console.error("Error receiving or dispatching message:", error); } finally { - if (message !== undefined) { - // FIXME: remove below, we should be acknowledging in the onDataChange callback - // await this.latch; - // consumer.acknowledge(data); - } - if (this.pulsarClient === undefined) { break; } diff --git a/src/workflow/WorkflowDispatcher.js b/src/workflow/WorkflowDispatcher.js index c25fac5..591a889 100644 --- a/src/workflow/WorkflowDispatcher.js +++ b/src/workflow/WorkflowDispatcher.js @@ -26,6 +26,7 @@ export class WorkflowDispatcher { this.subscriberId = subscriberId; this.type = type; this.queue = []; + this.dataAckCallbacks = {}; this.active = 0; this.promises = []; this.batchMode = false; @@ -73,12 +74,12 @@ export class WorkflowDispatcher { } } - async dispatchToAllSubscribers(type, data) { + async dispatchToAllSubscribers(type, data, dataAckCallback) { const keysSet = this.dispatchers.get(type); if (keysSet) { for (let key of keysSet) { const dispatcher = this.dispatcherObjects.get(key); - await dispatcher.addToQueue(data); // You can pass the actual data you want to dispatch here + await dispatcher.addToQueue(data, dataAckCallback); // You can pass the actual data you want to dispatch here } } else { StatedWorkflow.logger.warn(`No subscribers found for type ${type}`); @@ -112,6 +113,14 @@ export class WorkflowDispatcher { const index = this.promises.indexOf(promise); if (index > -1) { this.promises.splice(index, 1); + if (this.dataAckCallbacks[eventData]) { + console.log("calling dataAckCallbacks for ", eventData); + try { + this.dataAckCallbacks[eventData](); + } catch (error) { + console.error("Error calling dataAckCallbacks:", error); + } + } } this._dispatch(); }); @@ -138,13 +147,14 @@ export class WorkflowDispatcher { record.shift(); //we keep a history of the active count for 10 values over time } } - async addToQueue(data) { + async addToQueue(data, dataAckCallback) { return new Promise(async (resolve, reject) => { const tryAddToQueue = async () => { this._logActivity("log", {"t": new Date().getTime(), "acivie": this.active, "queue": this.queue.length}); if (this.active < this.parallelism) { this.queue.push(data); + if (dataAckCallback) this.dataAckCallbacks[data] = dataAckCallback; resolve(); // Resolve the promise to signal that the data was queued this._dispatch(); // Attempt to dispatch the next task } else { From 2b80fdf0abdff5ca81c6ff14b433bb6f9cb6dac7 Mon Sep 17 00:00:00 2001 From: Sergey Sergeev Date: Fri, 23 Feb 2024 13:40:25 -0800 Subject: [PATCH 13/19] finally delete callback --- src/workflow/WorkflowDispatcher.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/workflow/WorkflowDispatcher.js b/src/workflow/WorkflowDispatcher.js index 591a889..0cdf605 100644 --- a/src/workflow/WorkflowDispatcher.js +++ b/src/workflow/WorkflowDispatcher.js @@ -119,6 +119,8 @@ export class WorkflowDispatcher { this.dataAckCallbacks[eventData](); } catch (error) { console.error("Error calling dataAckCallbacks:", error); + } finally { + delete this.dataAckCallbacks[eventData]; } } } From d4ca530072bd79412ff635adf465b6be5b4f91d2 Mon Sep 17 00:00:00 2001 From: Sergey Sergeev Date: Fri, 23 Feb 2024 14:17:19 -0800 Subject: [PATCH 14/19] tests++ --- example/inhabitants-with-delay.yaml | 4 ++-- example/rebelCommunication.yaml | 1 - src/test/StatedWorkflow.test.js | 6 +++--- src/workflow/WorkflowDispatcher.js | 2 +- 4 files changed, 6 insertions(+), 7 deletions(-) diff --git a/example/inhabitants-with-delay.yaml b/example/inhabitants-with-delay.yaml index dc946f1..e816ef5 100644 --- a/example/inhabitants-with-delay.yaml +++ b/example/inhabitants-with-delay.yaml @@ -9,13 +9,13 @@ subscribeResidents: type: /${ produceParams.type } # subscribe to the same topic as we are publishing to test events to: /${ getResidentsWorkflow } subscriberId: subscribeResidents - parallelism: 4 + parallelism: 8 client: type: test getResidentsWorkflow: function: /${ function($planetInfo){ $planetInfo ~> $serial([extractResidents, fetchResidents]) } } extractResidents: - function: /${ function($planet){( $sleep($random() * 100) ; $planet.residents.($fetch($).json()) )} } # add a random delay + function: /${ function($planet){( $sleep($random() * 10) ; $planet.residents.($fetch($).json()) )} } # add a random delay fetchResidents: function: /${ function($residents){$residents.($set('/residents/-',{'name':$.name, 'url':$.url}))} } residents: [ ] diff --git a/example/rebelCommunication.yaml b/example/rebelCommunication.yaml index 1de7ec0..9a21896 100644 --- a/example/rebelCommunication.yaml +++ b/example/rebelCommunication.yaml @@ -49,4 +49,3 @@ farFarAway: [ ] nearBy: [ ] # This condition stops the transmission operation when interceptedMessages has 10 elements stop$: ($count(interceptedMessages)>=10?($clearInterval(send);'transmissionAccomplished'):'transmissionOngoing') -recover$!: $recover(processMessageWorkflow) diff --git a/src/test/StatedWorkflow.test.js b/src/test/StatedWorkflow.test.js index 13e12da..db26ee5 100644 --- a/src/test/StatedWorkflow.test.js +++ b/src/test/StatedWorkflow.test.js @@ -914,8 +914,8 @@ test("backpressure due to max parallelism", async () => { /** * This test validates that the workflow can be recovered from a snapshot. */ -test("serial workflow with backpressure", async () => { - // Logging function to avoid repetition of console.log with date stamps +test("Snapshot and recovery for workflow", async () => { + // Logging function to console.log with date stamps const logWithDate = (message) => { console.log(`${new Date().toISOString()}: ${message}`); }; @@ -979,7 +979,7 @@ test("serial workflow with backpressure", async () => { } while (uniqResidents < 82) logWithDate(`We got ${uniqResidents} unique residents processed with ${tp.output.residents.length} total residents`); -}, 10000); +}, 20000); // 20s timeout for times swapi not behaving test("subscribePulsar with pulsarMock client", async () => { diff --git a/src/workflow/WorkflowDispatcher.js b/src/workflow/WorkflowDispatcher.js index 0cdf605..a5b3b65 100644 --- a/src/workflow/WorkflowDispatcher.js +++ b/src/workflow/WorkflowDispatcher.js @@ -114,7 +114,7 @@ export class WorkflowDispatcher { if (index > -1) { this.promises.splice(index, 1); if (this.dataAckCallbacks[eventData]) { - console.log("calling dataAckCallbacks for ", eventData); + console.debug("calling dataAckCallbacks for ", eventData); try { this.dataAckCallbacks[eventData](); } catch (error) { From d513bb6077582a48bebdf936d3421e63fefac855 Mon Sep 17 00:00:00 2001 From: Sergey Sergeev Date: Fri, 23 Feb 2024 14:20:01 -0800 Subject: [PATCH 15/19] tests timeout++ --- src/test/StatedWorkflow.test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/StatedWorkflow.test.js b/src/test/StatedWorkflow.test.js index db26ee5..0b18dec 100644 --- a/src/test/StatedWorkflow.test.js +++ b/src/test/StatedWorkflow.test.js @@ -1011,4 +1011,4 @@ test("subscribePulsar with pulsarMock client", async () => { await new Promise(resolve => setTimeout(resolve, 500)); // Poll every 50ms }; console.log(`PulsarMock topic ${topic} stats: ${StatedREPL.stringify(PulsarClientMock.getStats(topic))}`); -}, 10000) +}, 20000) From 7cc556b950723c47affc95a48d09ce0936672147 Mon Sep 17 00:00:00 2001 From: Sergey Sergeev Date: Fri, 23 Feb 2024 16:31:39 -0800 Subject: [PATCH 16/19] fix data race in callback creation --- example/inhabitants-with-delay.yaml | 7 +-- src/test/PulsarMock.js | 68 ++++++++++++++++++++--------- src/test/StatedWorkflow.test.js | 32 ++++++++++---- src/workflow/StatedWorkflow.js | 11 +++-- src/workflow/WorkflowDispatcher.js | 25 ++++++----- 5 files changed, 95 insertions(+), 48 deletions(-) diff --git a/example/inhabitants-with-delay.yaml b/example/inhabitants-with-delay.yaml index e816ef5..7ca9d27 100644 --- a/example/inhabitants-with-delay.yaml +++ b/example/inhabitants-with-delay.yaml @@ -1,7 +1,8 @@ # producer will be sending some test data produceParams: - type: "my-topic" - data: ${[1..6].($fetch('https://swapi.dev/api/planets/?page=' & $string($)).json().results)} + type: "residents" + # fetch one page of planet data from the Star Wars API + data: ${[1].($fetch('https://swapi.dev/api/planets/?page=' & $string($)).json().results)} client: type: test subscribeResidents: @@ -9,7 +10,7 @@ subscribeResidents: type: /${ produceParams.type } # subscribe to the same topic as we are publishing to test events to: /${ getResidentsWorkflow } subscriberId: subscribeResidents - parallelism: 8 + parallelism: 4 client: type: test getResidentsWorkflow: diff --git a/src/test/PulsarMock.js b/src/test/PulsarMock.js index a86612a..5102e0c 100644 --- a/src/test/PulsarMock.js +++ b/src/test/PulsarMock.js @@ -1,32 +1,47 @@ export class PulsarClientMock { - static inMemoryStore = new Map(); // Static store to hold messages for each topic - static messageIdCounter = 0; // Global counter to generate unique message IDs - static listeners = new Map(); // Global map to hold listeners for message consumption - static ackTimeout = 30000; // Default acknowledgment timeout (in milliseconds) - static acknowledgedMessages = new Map(); // Static store to hold acknowledged messages for each topic + static inMemoryStore = new Map(); + static messageIdCounter = 0; + static listeners = new Map(); + static ackTimeout = 30000; + static acknowledgedMessages = new Map(); + static operationLocks = new Map(); // Map to keep track of ongoing operations by topic + + // Helper method to lock operations per topic + static async lockOperation(topic) { + while (this.operationLocks.get(topic)) { + await this.operationLocks.get(topic); + } + let resolveLock; + const lockPromise = new Promise(resolve => resolveLock = resolve); + this.operationLocks.set(topic, lockPromise); + return resolveLock; + } - // Allows configuration of the acknowledgment timeout static configureAckTimeout(timeout) { this.ackTimeout = timeout; } async createProducer(config) { const topic = config.topic; - // Ensure a message queue exists for the topic if (!PulsarClientMock.inMemoryStore.has(topic)) { PulsarClientMock.inMemoryStore.set(topic, []); } return { send: async (message) => { - const messageId = new MessageId(`message-${++PulsarClientMock.messageIdCounter}`); - const messageInstance = new Message(topic, undefined, message.data, messageId, Date.now(), Date.now(), 0, ''); - const messages = PulsarClientMock.inMemoryStore.get(topic) || []; - messages.push({ message: messageInstance, visible: true }); - - PulsarClientMock.notifyListeners(topic); - - return { messageId: messageId.toString() }; + const resolveLock = await PulsarClientMock.lockOperation(topic); // Lock operation for topic + try { + const messageId = new MessageId(`message-${++PulsarClientMock.messageIdCounter}`); + const messageInstance = new Message(topic, undefined, message.data, messageId, Date.now(), Date.now(), 0, ''); + const messages = PulsarClientMock.inMemoryStore.get(topic); + messages.push({ message: messageInstance, visible: true }); + + PulsarClientMock.notifyListeners(topic); + return { messageId: messageId.toString() }; + } finally { + resolveLock(); // Unlock operation for topic + PulsarClientMock.operationLocks.delete(topic); // Cleanup lock + } }, close: async () => {}, }; @@ -63,21 +78,33 @@ export class PulsarClientMock { }); }, acknowledge: async (message) => { - PulsarClientMock.acknowledgeMessage(topic, message.messageId.id); + const resolveLock = await PulsarClientMock.lockOperation(topic); // Lock operation for topic + try { + await PulsarClientMock.acknowledgeMessage(topic, message.messageId.id); + } finally { + resolveLock(); // Unlock operation for topic + PulsarClientMock.operationLocks.delete(topic); // Cleanup lock + } }, acknowledgeId: async (messageId) => { - PulsarClientMock.acknowledgeMessage(topic, messageId.id); + const resolveLock = await PulsarClientMock.lockOperation(topic); // Lock operation for topic + try { + await PulsarClientMock.acknowledgeMessage(topic, messageId.id); + } finally { + resolveLock(); // Unlock operation for topic + PulsarClientMock.operationLocks.delete(topic); // Cleanup lock + } }, close: async () => {}, }; } - static acknowledgeMessage(topic, messageId) { + static async acknowledgeMessage(topic, messageId) { + // No need to lock here since it's already locked in acknowledge and acknowledgeId methods const messages = PulsarClientMock.inMemoryStore.get(topic) || []; const messageIndex = messages.findIndex(m => m.message.messageId.id === messageId); if (messageIndex !== -1) { - const [acknowledgedMessage] = messages.splice(messageIndex, 1); // Remove and get the acknowledged message - // Store acknowledged message + const [acknowledgedMessage] = messages.splice(messageIndex, 1); if (!this.acknowledgedMessages.has(topic)) { this.acknowledgedMessages.set(topic, []); } @@ -85,6 +112,7 @@ export class PulsarClientMock { } } + static getTopics() { return Array.from(PulsarClientMock.inMemoryStore.keys()); } diff --git a/src/test/StatedWorkflow.test.js b/src/test/StatedWorkflow.test.js index 0b18dec..69e2829 100644 --- a/src/test/StatedWorkflow.test.js +++ b/src/test/StatedWorkflow.test.js @@ -552,7 +552,7 @@ test("recover incomplete workflow - step 1 is incomplete - should rerun steps 1 } } } - } + } step2: name: sprayTheNozzle function: \${function($e){ $e~>|$|{'sprayed':true}| }} @@ -914,7 +914,7 @@ test("backpressure due to max parallelism", async () => { /** * This test validates that the workflow can be recovered from a snapshot. */ -test("Snapshot and recovery for workflow", async () => { +test("Snapshot and recover for workflow", async () => { // Logging function to console.log with date stamps const logWithDate = (message) => { console.log(`${new Date().toISOString()}: ${message}`); @@ -952,7 +952,7 @@ test("Snapshot and recovery for workflow", async () => { const snapshotContent = fs.readFileSync(defaultSnapshotPath, 'utf8'); snapshot = JSON.parse(snapshotContent); logWithDate(`Snapshot has ${snapshot.output.residents.length} residents`); - if (snapshot.output?.residents?.length > 10) { + if (snapshot.output?.residents?.length > 5) { anyResidentsSnapshotted = true; break; } @@ -976,18 +976,34 @@ test("Snapshot and recovery for workflow", async () => { uniqResidents = Object.keys(tp.output?.residents.reduce((counts, o)=>{ counts[o.name] = (counts[o.name] || 0) + 1; return counts }, {})).length; logWithDate(`Got ${uniqResidents} unique residents processed`); await new Promise(resolve => setTimeout(resolve, 1000)); - } while (uniqResidents < 82) + } while (uniqResidents < 32) logWithDate(`We got ${uniqResidents} unique residents processed with ${tp.output.residents.length} total residents`); + await sw.close(); + logWithDate("Stopped stated workflow before test end"); }, 20000); // 20s timeout for times swapi not behaving test("subscribePulsar with pulsarMock client", async () => { + + const defaultSnapshotPath = path.join(__dirname, '../', '../','defaultSnapshot.json'); + try { + await unlink(defaultSnapshotPath); + console.log(`Deleted previous default snapshot file: ${defaultSnapshotPath}`); + } catch (e) { + if (e.code !== 'ENOENT') { + throw e; + } + } + + PulsarClientMock.clear(); const yamlFilePath = path.join(__dirname, '../', '../', 'example', 'rebelCommunication.yaml'); const templateYaml = fs.readFileSync(yamlFilePath, 'utf8'); let template = yaml.load(templateYaml); - const {templateProcessor: tp} = await StatedWorkflow.newWorkflow(template); + const sw = await StatedWorkflow.newWorkflow(template); + await sw.close(); + const {templateProcessor: tp} = sw; // keep steps execution logs for debugging tp.options = {'keepLogs': true, 'snapshot': {}}; @@ -1004,11 +1020,11 @@ test("subscribePulsar with pulsarMock client", async () => { console.log("waiting for at least 10 messages to be acknowledged"); const topic = PulsarClientMock.getTopics()[0]; // we use only one topic in the test + while (!Array.isArray(PulsarClientMock.getAcknowledgedMessages(topic)) || PulsarClientMock.getAcknowledgedMessages(topic).length < 10) { - let ackedMessages = PulsarClientMock.getAcknowledgedMessages(topic); console.log(`PulsarMock topic ${topic} stats: ${StatedREPL.stringify(PulsarClientMock.getStats(topic))}`); - await new Promise(resolve => setTimeout(resolve, 500)); // Poll every 50ms + await new Promise(resolve => setTimeout(resolve, 500)); // Poll every 500ms }; console.log(`PulsarMock topic ${topic} stats: ${StatedREPL.stringify(PulsarClientMock.getStats(topic))}`); -}, 20000) +}, 200000) diff --git a/src/workflow/StatedWorkflow.js b/src/workflow/StatedWorkflow.js index 30c3b08..8933022 100644 --- a/src/workflow/StatedWorkflow.js +++ b/src/workflow/StatedWorkflow.js @@ -362,14 +362,12 @@ export class StatedWorkflow { }); // Store the consumer in the map this.consumers.set(type, consumer); - let message; let countdown = maxConsume; while (true) { try { - message = await consumer.receive(); + const message = await consumer.receive(); let messageData; - let messageId; try { const messageDataStr = message.getData().toString(); messageData = JSON.parse(messageDataStr); @@ -385,7 +383,8 @@ export class StatedWorkflow { // we create a callback to acknowledge the message const dataAckCallback = async () => { - return consumer.acknowledge(message); + const promise = consumer.acknowledge(message); + console.log(`acknowledging message ${message} for messageData: ${messageData}`); } //if the dispatchers max parallelism is reached this loop should block, which is why we await @@ -768,8 +767,8 @@ export class StatedWorkflow { // await consumer.disconnect(); // } try { - await this.workflowDispatcher.clear(); - await this.templateProcessor.close(); + if (this.workflowDispatcher) await this.workflowDispatcher.clear(); + if (this.templateProcessor) await this.templateProcessor.close(); } catch (error) { console.error("Error closing workflow dispatcher:", error); diff --git a/src/workflow/WorkflowDispatcher.js b/src/workflow/WorkflowDispatcher.js index a5b3b65..7ee78a9 100644 --- a/src/workflow/WorkflowDispatcher.js +++ b/src/workflow/WorkflowDispatcher.js @@ -26,7 +26,7 @@ export class WorkflowDispatcher { this.subscriberId = subscriberId; this.type = type; this.queue = []; - this.dataAckCallbacks = {}; + this.dataAckCallbacks = new Map(); this.active = 0; this.promises = []; this.batchMode = false; @@ -113,15 +113,15 @@ export class WorkflowDispatcher { const index = this.promises.indexOf(promise); if (index > -1) { this.promises.splice(index, 1); - if (this.dataAckCallbacks[eventData]) { - console.debug("calling dataAckCallbacks for ", eventData); - try { - this.dataAckCallbacks[eventData](); - } catch (error) { - console.error("Error calling dataAckCallbacks:", error); - } finally { - delete this.dataAckCallbacks[eventData]; - } + if (this.dataAckCallbacks.get(eventData)) { + // console.debug("calling dataAckCallbacks for ", eventData); + const callbackPromise = this.dataAckCallbacks.get(eventData)(); + callbackPromise.then(() => { + // console.debug("dataAckCallbacks resolved for ", eventData); + }).catch(error => { + console.error("Error calling dataAckCallbacks:", error); + }); + delete this.dataAckCallbacks.get(eventData); } } this._dispatch(); @@ -156,7 +156,10 @@ export class WorkflowDispatcher { this._logActivity("log", {"t": new Date().getTime(), "acivie": this.active, "queue": this.queue.length}); if (this.active < this.parallelism) { this.queue.push(data); - if (dataAckCallback) this.dataAckCallbacks[data] = dataAckCallback; + if (dataAckCallback) { + // console.debug("adding dataAckCallbacks for ", data); + this.dataAckCallbacks.set(data, dataAckCallback); + } resolve(); // Resolve the promise to signal that the data was queued this._dispatch(); // Attempt to dispatch the next task } else { From 9f9ae553c64d45da06e633ff444e6f0abd5feaf9 Mon Sep 17 00:00:00 2001 From: Sergey Sergeev Date: Fri, 23 Feb 2024 19:41:57 -0800 Subject: [PATCH 17/19] remove locks which do not work --- example/pubsub-data-function-pulsar.yaml | 2 +- example/rebelCommunication.yaml | 2 +- src/test/PulsarMock.js | 68 +++++++----------------- 3 files changed, 22 insertions(+), 50 deletions(-) diff --git a/example/pubsub-data-function-pulsar.yaml b/example/pubsub-data-function-pulsar.yaml index a312b7c..f6f9f19 100644 --- a/example/pubsub-data-function-pulsar.yaml +++ b/example/pubsub-data-function-pulsar.yaml @@ -4,7 +4,7 @@ produceParams: data: | ${ function(){ - {'message': 'Rebel Fleet Coordinates', 'location': $random(), 'sender': $floor($random()*10)+1 } + {'message': 'Rebel Fleet Coordinates', 'location': $random(), 'sender': $ceil($random()*10) } } } client: diff --git a/example/rebelCommunication.yaml b/example/rebelCommunication.yaml index 9a21896..1ae860d 100644 --- a/example/rebelCommunication.yaml +++ b/example/rebelCommunication.yaml @@ -4,7 +4,7 @@ produceParams: data: | ${ function(){ - {'message': 'Rebel Fleet Coordinates', 'location': $random(), 'sender': $floor($random()*10)+1 } + {'message': 'Rebel Fleet Coordinates', 'location': $random(), 'sender': $ceil($random()*10) } } } client: diff --git a/src/test/PulsarMock.js b/src/test/PulsarMock.js index 5102e0c..a86612a 100644 --- a/src/test/PulsarMock.js +++ b/src/test/PulsarMock.js @@ -1,47 +1,32 @@ export class PulsarClientMock { - static inMemoryStore = new Map(); - static messageIdCounter = 0; - static listeners = new Map(); - static ackTimeout = 30000; - static acknowledgedMessages = new Map(); - static operationLocks = new Map(); // Map to keep track of ongoing operations by topic - - // Helper method to lock operations per topic - static async lockOperation(topic) { - while (this.operationLocks.get(topic)) { - await this.operationLocks.get(topic); - } - let resolveLock; - const lockPromise = new Promise(resolve => resolveLock = resolve); - this.operationLocks.set(topic, lockPromise); - return resolveLock; - } + static inMemoryStore = new Map(); // Static store to hold messages for each topic + static messageIdCounter = 0; // Global counter to generate unique message IDs + static listeners = new Map(); // Global map to hold listeners for message consumption + static ackTimeout = 30000; // Default acknowledgment timeout (in milliseconds) + static acknowledgedMessages = new Map(); // Static store to hold acknowledged messages for each topic + // Allows configuration of the acknowledgment timeout static configureAckTimeout(timeout) { this.ackTimeout = timeout; } async createProducer(config) { const topic = config.topic; + // Ensure a message queue exists for the topic if (!PulsarClientMock.inMemoryStore.has(topic)) { PulsarClientMock.inMemoryStore.set(topic, []); } return { send: async (message) => { - const resolveLock = await PulsarClientMock.lockOperation(topic); // Lock operation for topic - try { - const messageId = new MessageId(`message-${++PulsarClientMock.messageIdCounter}`); - const messageInstance = new Message(topic, undefined, message.data, messageId, Date.now(), Date.now(), 0, ''); - const messages = PulsarClientMock.inMemoryStore.get(topic); - messages.push({ message: messageInstance, visible: true }); - - PulsarClientMock.notifyListeners(topic); - return { messageId: messageId.toString() }; - } finally { - resolveLock(); // Unlock operation for topic - PulsarClientMock.operationLocks.delete(topic); // Cleanup lock - } + const messageId = new MessageId(`message-${++PulsarClientMock.messageIdCounter}`); + const messageInstance = new Message(topic, undefined, message.data, messageId, Date.now(), Date.now(), 0, ''); + const messages = PulsarClientMock.inMemoryStore.get(topic) || []; + messages.push({ message: messageInstance, visible: true }); + + PulsarClientMock.notifyListeners(topic); + + return { messageId: messageId.toString() }; }, close: async () => {}, }; @@ -78,33 +63,21 @@ export class PulsarClientMock { }); }, acknowledge: async (message) => { - const resolveLock = await PulsarClientMock.lockOperation(topic); // Lock operation for topic - try { - await PulsarClientMock.acknowledgeMessage(topic, message.messageId.id); - } finally { - resolveLock(); // Unlock operation for topic - PulsarClientMock.operationLocks.delete(topic); // Cleanup lock - } + PulsarClientMock.acknowledgeMessage(topic, message.messageId.id); }, acknowledgeId: async (messageId) => { - const resolveLock = await PulsarClientMock.lockOperation(topic); // Lock operation for topic - try { - await PulsarClientMock.acknowledgeMessage(topic, messageId.id); - } finally { - resolveLock(); // Unlock operation for topic - PulsarClientMock.operationLocks.delete(topic); // Cleanup lock - } + PulsarClientMock.acknowledgeMessage(topic, messageId.id); }, close: async () => {}, }; } - static async acknowledgeMessage(topic, messageId) { - // No need to lock here since it's already locked in acknowledge and acknowledgeId methods + static acknowledgeMessage(topic, messageId) { const messages = PulsarClientMock.inMemoryStore.get(topic) || []; const messageIndex = messages.findIndex(m => m.message.messageId.id === messageId); if (messageIndex !== -1) { - const [acknowledgedMessage] = messages.splice(messageIndex, 1); + const [acknowledgedMessage] = messages.splice(messageIndex, 1); // Remove and get the acknowledged message + // Store acknowledged message if (!this.acknowledgedMessages.has(topic)) { this.acknowledgedMessages.set(topic, []); } @@ -112,7 +85,6 @@ export class PulsarClientMock { } } - static getTopics() { return Array.from(PulsarClientMock.inMemoryStore.keys()); } From 0feb7f07ad1c365eb17de3102c6cec932d7a3454 Mon Sep 17 00:00:00 2001 From: Sergey Sergeev Date: Fri, 23 Feb 2024 20:42:07 -0800 Subject: [PATCH 18/19] feedback++ --- src/test/PulsarMock.js | 131 +++++++++++++++++++++----------- src/test/StatedWorkflow.test.js | 8 +- src/workflow/StatedWorkflow.js | 4 +- 3 files changed, 91 insertions(+), 52 deletions(-) diff --git a/src/test/PulsarMock.js b/src/test/PulsarMock.js index a86612a..13c8250 100644 --- a/src/test/PulsarMock.js +++ b/src/test/PulsarMock.js @@ -1,18 +1,16 @@ export class PulsarClientMock { - static inMemoryStore = new Map(); // Static store to hold messages for each topic - static messageIdCounter = 0; // Global counter to generate unique message IDs - static listeners = new Map(); // Global map to hold listeners for message consumption - static ackTimeout = 30000; // Default acknowledgment timeout (in milliseconds) - static acknowledgedMessages = new Map(); // Static store to hold acknowledged messages for each topic + static inMemoryStore = new Map(); + static messageIdCounter = 0; + static listeners = new Map(); + static ackTimeout = 30000; + static acknowledgedMessages = new Map(); // Stores acknowledgments per subscriber ID per topic - // Allows configuration of the acknowledgment timeout static configureAckTimeout(timeout) { this.ackTimeout = timeout; } async createProducer(config) { const topic = config.topic; - // Ensure a message queue exists for the topic if (!PulsarClientMock.inMemoryStore.has(topic)) { PulsarClientMock.inMemoryStore.set(topic, []); } @@ -22,8 +20,9 @@ export class PulsarClientMock { const messageId = new MessageId(`message-${++PulsarClientMock.messageIdCounter}`); const messageInstance = new Message(topic, undefined, message.data, messageId, Date.now(), Date.now(), 0, ''); const messages = PulsarClientMock.inMemoryStore.get(topic) || []; - messages.push({ message: messageInstance, visible: true }); + messages.push({ message: messageInstance, subscriberIds: new Set() }); // Track which subscribers have received the message + // Ensure all subscribers are aware of the new message PulsarClientMock.notifyListeners(topic); return { messageId: messageId.toString() }; @@ -32,29 +31,46 @@ export class PulsarClientMock { }; } + /** + * creates a consumer per topic and subscriberId. If more than one subscriber with the same subscriberId is created + * for the same topic, they will process messages in a FIFO manner. + */ + async subscribe(config) { const topic = config.topic; + const subscriberId = config.subscription; + return { + + /** + * receives either returns the next visible message, or blocks until a message is available. + */ receive: () => { return new Promise((resolve) => { const tryResolve = () => { const messages = PulsarClientMock.inMemoryStore.get(topic) || []; - const messageIndex = messages.findIndex(m => m.visible); - if (messageIndex !== -1) { - // Make the message temporarily invisible to simulate message locking - messages[messageIndex].visible = false; + const messageIndex = messages.findIndex(m => !m.subscriberIds.has(subscriberId)); + if (messageIndex !== -1) { // there a message available for this subscriber + const message = messages[messageIndex]; + message.subscriberIds.add(subscriberId); // Mark as received by this subscriber + + // Make the message acknowledged after a timeout for this subscriber setTimeout(() => { - // Make the message visible again after the timeout - messages[messageIndex].visible = true; - PulsarClientMock.notifyListeners(topic); + if (!PulsarClientMock.isAcknowledged(topic, message.message.messageId.id, subscriberId)) { + // If not acknowledged, make it visible again to all subscribers + message.subscriberIds.delete(subscriberId); + PulsarClientMock.notifyListeners(topic); + } }, PulsarClientMock.ackTimeout); - resolve(messages[messageIndex].message); - } else { - // No visible messages available, wait for new messages + + resolve(message.message); + + } else { // no message available for this subscriber, wait for the next one if (!PulsarClientMock.listeners.has(topic)) { PulsarClientMock.listeners.set(topic, []); } + // add this function invocation to the list of listeners for this topic PulsarClientMock.listeners.get(topic).push(tryResolve); } }; @@ -62,58 +78,77 @@ export class PulsarClientMock { tryResolve(); }); }, - acknowledge: async (message) => { - PulsarClientMock.acknowledgeMessage(topic, message.messageId.id); + acknowledge: async (message) => {/**/ + PulsarClientMock.acknowledgeMessage(topic, message.messageId.id, subscriberId); }, acknowledgeId: async (messageId) => { - PulsarClientMock.acknowledgeMessage(topic, messageId.id); + PulsarClientMock.acknowledgeMessage(topic, messageId.id, subscriberId); }, close: async () => {}, }; } - static acknowledgeMessage(topic, messageId) { - const messages = PulsarClientMock.inMemoryStore.get(topic) || []; - const messageIndex = messages.findIndex(m => m.message.messageId.id === messageId); - if (messageIndex !== -1) { - const [acknowledgedMessage] = messages.splice(messageIndex, 1); // Remove and get the acknowledged message - // Store acknowledged message - if (!this.acknowledgedMessages.has(topic)) { - this.acknowledgedMessages.set(topic, []); - } - this.acknowledgedMessages.get(topic).push(acknowledgedMessage.message); + static isAcknowledged(topic, messageId, subscriberId) { + const topicAcks = this.acknowledgedMessages.get(topic); + const subscriberAcks = topicAcks ? topicAcks.get(subscriberId) : undefined; + return subscriberAcks ? subscriberAcks.has(messageId) : false; + } + + static acknowledgeMessage(topic, messageId, subscriberId) { + if (!this.acknowledgedMessages.has(topic)) { + this.acknowledgedMessages.set(topic, new Map()); + } + + const topicAcks = this.acknowledgedMessages.get(topic); + if (!topicAcks.has(subscriberId)) { + topicAcks.set(subscriberId, new Set()); } + + const subscriberAcks = topicAcks.get(subscriberId); + subscriberAcks.add(messageId); } static getTopics() { return Array.from(PulsarClientMock.inMemoryStore.keys()); } - // Method to return statistics for the client - static getStats(topic) { + static getStats(topic, subscriberId) { const messages = this.inMemoryStore.get(topic) || []; - const acknowledged = this.acknowledgedMessages.get(topic) || []; - const inFlight = messages.filter(m => !m.visible).length; - const queueLength = messages.length + acknowledged.length; + const topicAcks = this.acknowledgedMessages.get(topic); + const subscriberAcks = topicAcks ? (topicAcks.get(subscriberId) || new Set()) : new Set(); + + // Calculate inFlight count as messages not yet acknowledged by this subscriber + const inFlight = messages.filter(m => !subscriberAcks.has(m.message.messageId.id)).length; + // Queue length includes messages not yet received or acknowledged by this subscriber + const queueLength = messages.length - subscriberAcks.size; return { - acknowledgedCount: acknowledged.length, + acknowledgedCount: subscriberAcks.size, inFlightCount: inFlight, queueLength, }; } - // Method to return all acknowledged messages for a topic - static getAcknowledgedMessages(topic) { - return this.acknowledgedMessages.get(topic) || []; - } + static getAcknowledgedMessages(topic, subscriberId) { + const topicAcks = this.acknowledgedMessages.get(topic); + const subscriberAcks = topicAcks ? topicAcks.get(subscriberId) : undefined; - static clear() { - PulsarClientMock.inMemoryStore.clear(); - PulsarClientMock.listeners.clear(); - PulsarClientMock.acknowledgedMessages.clear(); + if (!subscriberAcks) { + return []; + } + + const messages = this.inMemoryStore.get(topic) || []; + // Filter messages that have been acknowledged by the subscriber + return messages + .filter(m => subscriberAcks.has(m.message.messageId.id)) + .map(m => m.message); } + /** + * Notify all listeners for a topic about a message available. + * A listener is a receive invocation that is waiting for a message to be available. + * A message can be a new one, or the one with expired visibility timeout + */ static notifyListeners(topic) { const listeners = PulsarClientMock.listeners.get(topic) || []; while (listeners.length > 0) { @@ -122,6 +157,12 @@ export class PulsarClientMock { } } + static clear() { + PulsarClientMock.inMemoryStore.clear(); + PulsarClientMock.listeners.clear(); + PulsarClientMock.acknowledgedMessages.clear(); + } + close() { return Promise.resolve(null); } diff --git a/src/test/StatedWorkflow.test.js b/src/test/StatedWorkflow.test.js index 69e2829..eedf3c9 100644 --- a/src/test/StatedWorkflow.test.js +++ b/src/test/StatedWorkflow.test.js @@ -16,7 +16,6 @@ import fs from 'fs'; import yaml from 'js-yaml'; import { fileURLToPath } from 'url'; import path from 'path'; -import {WorkflowDispatcher} from "../workflow/WorkflowDispatcher.js"; import StatedREPL from "stated-js/dist/src/StatedREPL.js"; import {EnhancedPrintFunc} from "./TestTools.js"; import {rateLimit} from "stated-js/dist/src/utils/rateLimit.js"; @@ -1020,11 +1019,12 @@ test("subscribePulsar with pulsarMock client", async () => { console.log("waiting for at least 10 messages to be acknowledged"); const topic = PulsarClientMock.getTopics()[0]; // we use only one topic in the test + const subscriberId = tp.output.subscribeParams.type; while (!Array.isArray(PulsarClientMock.getAcknowledgedMessages(topic)) - || PulsarClientMock.getAcknowledgedMessages(topic).length < 10) { - console.log(`PulsarMock topic ${topic} stats: ${StatedREPL.stringify(PulsarClientMock.getStats(topic))}`); + || PulsarClientMock.getAcknowledgedMessages(topic, subscriberId).length < 10) { + console.log(`PulsarMock topic ${topic} stats for subscriberId ${subscriberId}: ${StatedREPL.stringify(PulsarClientMock.getStats(topic, subscriberId))}`); await new Promise(resolve => setTimeout(resolve, 500)); // Poll every 500ms }; - console.log(`PulsarMock topic ${topic} stats: ${StatedREPL.stringify(PulsarClientMock.getStats(topic))}`); + console.log(`PulsarMock topic ${topic} stats for subscriberId ${subscriberId}: ${StatedREPL.stringify(PulsarClientMock.getStats(topic, subscriberId))}`); }, 200000) diff --git a/src/workflow/StatedWorkflow.js b/src/workflow/StatedWorkflow.js index 8933022..bc8f6a9 100644 --- a/src/workflow/StatedWorkflow.js +++ b/src/workflow/StatedWorkflow.js @@ -19,12 +19,10 @@ import Pulsar from 'pulsar-client'; import {Kafka, logLevel} from 'kafkajs'; import winston from "winston"; import {WorkflowDispatcher} from "./WorkflowDispatcher.js"; -import {StepLog} from "./StepLog.js"; import Step from "./Step.js"; import {createStepPersistence} from "./StepPersistence.js"; import {TemplateUtils} from "./utils/TemplateUtils.js"; import {WorkflowPersistence} from "./WorkflowPersistence.js"; -import jp from "stated-js/dist/src/JsonPointer.js"; import util from "util"; import fs from "fs"; import path from "path"; @@ -384,7 +382,7 @@ export class StatedWorkflow { // we create a callback to acknowledge the message const dataAckCallback = async () => { const promise = consumer.acknowledge(message); - console.log(`acknowledging message ${message} for messageData: ${messageData}`); + console.log(`acknowledging messageId ${StatedREPL.stringify(message.getMessageId().toString())} for messageData: ${StatedREPL.stringify(messageData)}`); } //if the dispatchers max parallelism is reached this loop should block, which is why we await From caeb87794b987105ef82c12a558440f2e9b19224 Mon Sep 17 00:00:00 2001 From: Sergey Sergeev Date: Sat, 24 Feb 2024 23:56:27 -0800 Subject: [PATCH 19/19] PulsarMocks comments++ --- src/test/PulsarMock.js | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/test/PulsarMock.js b/src/test/PulsarMock.js index 13c8250..c26a3cb 100644 --- a/src/test/PulsarMock.js +++ b/src/test/PulsarMock.js @@ -1,9 +1,18 @@ export class PulsarClientMock { + // A map used to simulate an in-memory store for messages. The keys are topic names, and the values are arrays of message objects. static inMemoryStore = new Map(); + + // A counter to generate unique message IDs. It is incremented each time a message is sent. static messageIdCounter = 0; + + // A map where keys are topic names and values are arrays of receive function invocations (listeners). These listeners are called to notify about new messages. static listeners = new Map(); + + // The default time in milliseconds to wait before a message is considered not acknowledged (acknowledged messages are removed from visibility to simulate message acknowledgment behavior). static ackTimeout = 30000; - static acknowledgedMessages = new Map(); // Stores acknowledgments per subscriber ID per topic + + // A nested map where the first key is the topic name, the second key is the subscriber ID, and the value is a set of message IDs that have been acknowledged by the subscriber. + static acknowledgedMessages = new Map(); static configureAckTimeout(timeout) { this.ackTimeout = timeout;