diff --git a/README.md b/README.md index 3e9b024..a6a20b9 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ State Conductor flows are defined using a subset of [Amazon States Language (ASL The _State Conductor_ can be used to perform an arbitrary number of context-based processing actions on a subset of documents. Actions could include: invoking a [MarkLogic Data Hub](https://docs.marklogic.com/datahub/) flow, transforming a document, applying metadata, or calling an external process. -The _State Conductor_ requires a "Driver" to process documents and move them through the installed Flows' states. The _State Conductor_ currently (release 0.4.0) supports a [CoRB2](https://github.com/marklogic-community/corb2) driver, and [CPF](https://docs.marklogic.com/guide/cpf) driver. +The _State Conductor_ requires a "Driver" to process documents and move them through the installed Flows' states. The _State Conductor_ supports a [Data Services](https://github.com/aclavio/marklogic-state-conductor/tree/develop/state-conductor-dataservices) driver, a [CoRB2](https://github.com/marklogic-community/corb2) driver, and a [CPF](https://docs.marklogic.com/guide/cpf) driver. 1. [Installation](#installation) 2. [Usage](#usage) @@ -36,8 +36,8 @@ repositories { } } dependencies { - mlBundle "com.marklogic:marklogic-state-conductor:0.6.0" - mlBundle "com.marklogic:marklogic-state-conductor-cpf:0.6.0" // if using the cpf driver + mlBundle "com.marklogic:marklogic-state-conductor:0.6.1" + mlBundle "com.marklogic:marklogic-state-conductor-cpf:0.6.1" // if using the cpf driver } ``` @@ -206,10 +206,10 @@ New (optional) temporal parameters `startDate` and `endDate` in v0.3.0. ## Roadmap -- 0.6.0+ +- 0.7.0+ - DHS support - - Full support for Choice Rules - Batch support - Retention Policy for job documents - Flush out the validator and validate flow files on deployment - Support for Parallel and Map states + - Support for Retries diff --git a/build.gradle b/build.gradle index 66c2a57..f46ce1e 100644 --- a/build.gradle +++ b/build.gradle @@ -10,5 +10,5 @@ allprojects { } group = "com.marklogic" - version = "0.6.0" + version = "0.6.1" } diff --git a/state-conductor-dataservices/src/test/java/com/marklogic/StateConductorServiceTest.java b/state-conductor-dataservices/src/test/java/com/marklogic/StateConductorServiceTest.java index ab2dff3..1e0943e 100644 --- a/state-conductor-dataservices/src/test/java/com/marklogic/StateConductorServiceTest.java +++ b/state-conductor-dataservices/src/test/java/com/marklogic/StateConductorServiceTest.java @@ -275,7 +275,7 @@ public void testProcessBadJob() throws IOException { assertEquals(null, errorResp); assertNotNull(resp); assertEquals("missing-flow", badJob1Doc.getFlowName()); - assertEquals("new", badJob1Doc.getFlowStatus()); // TODO seems like this should be "failed" + assertEquals("failed", badJob1Doc.getFlowStatus()); } @Test @@ -317,15 +317,82 @@ public void testBatchProcessWithBadJob() throws IOException { assertEquals("add-collection-2", jobDoc.getFlowState()); assertEquals(true, meta.getCollections().contains("testcol1")); + jobDoc = getJobDocument(badJob1Uri); + logger.info(jobDoc.toString()); + assertEquals("missing-flow", jobDoc.getFlowName()); + assertEquals("failed", jobDoc.getFlowStatus()); + assertEquals(job1Uri, resp.get(0).get("job").asText()); assertEquals(true, resp.get(0).get("result").asBoolean()); assertEquals(job2Uri, resp.get(1).get("job").asText()); assertEquals(true, resp.get(1).get("result").asBoolean()); assertEquals(badJob1Uri, resp.get(2).get("job").asText()); - assertEquals(false, resp.get(2).get("result").asBoolean()); - assertNotNull(resp.get(2).get("error")); + assertEquals(true, resp.get(2).get("result").asBoolean()); assertEquals(job3Uri, resp.get(3).get("job").asText()); assertEquals(true, resp.get(3).get("result").asBoolean()); } + @Test + public void testProcessFailedJob() throws IOException { + // this bad job will go into 'failed' the first time through + // the second attempt should return 'false' for the failed job + + ArrayNode resp = null; + Exception errorResp = null; + StateConductorJob badJob1Doc; + + try { + resp = service.processJob(Arrays.stream(new String[]{badJob1Uri})); + } catch (Exception err) { + errorResp = err; + } + + badJob1Doc = getJobDocument(badJob1Uri); + logger.info(badJob1Doc.toString()); + + assertEquals(null, errorResp); + assertNotNull(resp); + assertEquals(badJob1Uri, resp.get(0).get("job").asText()); + assertEquals(true, resp.get(0).get("result").asBoolean()); + assertEquals("missing-flow", badJob1Doc.getFlowName()); + assertEquals("failed", badJob1Doc.getFlowStatus()); + + try { + resp = service.processJob(Arrays.stream(new String[]{badJob1Uri})); + } catch (Exception err) { + errorResp = err; + } + + assertEquals(null, errorResp); + assertNotNull(resp); + assertEquals(badJob1Uri, resp.get(0).get("job").asText()); + assertEquals(false, resp.get(0).get("result").asBoolean()); + } + + @Test + public void testExpectedExceptions() throws IOException { + ArrayNode resp = null; + Exception errorResp = null; + StateConductorJob badJob1Doc; + + String[] jobs = new String[] {"/not/a/real/job1.json", "/not/a/real/job2.json"}; + + try { + resp = service.processJob(Arrays.stream(jobs)); + } catch (Exception err) { + errorResp = err; + } + + logger.info(resp.toString()); + + assertEquals(null, errorResp); + assertNotNull(resp); + assertEquals("/not/a/real/job1.json", resp.get(0).get("job").asText()); + assertEquals(false, resp.get(0).get("result").asBoolean()); + assertNotNull(resp.get(0).get("error").asText()); + assertEquals("/not/a/real/job2.json", resp.get(1).get("job").asText()); + assertEquals(false, resp.get(1).get("result").asBoolean()); + assertNotNull(resp.get(1).get("error").asText()); + } + } diff --git a/state-conductor-dhf5-example/src/test/ml-modules/root/test/suites/StateConductorDHF5Suite/test-scDhf5Integration.sjs b/state-conductor-dhf5-example/src/test/ml-modules/root/test/suites/StateConductorDHF5Suite/test-scDhf5Integration.sjs index 60e5a0e..76d73ab 100644 --- a/state-conductor-dhf5-example/src/test/ml-modules/root/test/suites/StateConductorDHF5Suite/test-scDhf5Integration.sjs +++ b/state-conductor-dhf5-example/src/test/ml-modules/root/test/suites/StateConductorDHF5Suite/test-scDhf5Integration.sjs @@ -37,14 +37,17 @@ jobDoc = xdmp.toJSON({ provenance: [], }); -try { - respDoc = sc.executeStateByJobDoc(jobDoc, false); -} catch (err) { - error = err; -} - +respDoc = sc.executeStateByJobDoc(jobDoc, false); +xdmp.log(respDoc); assertions.push( - test.assertEqual('INVALID-STATE-DEFINITION', error.name, 'handled missing dhf flow') + test.assertEqual('failed', respDoc.flowStatus, 'status check'), + test.assertEqual('JS-JAVASCRIPT', respDoc.errors['runStep1'].name, 'handled missing dhf flow'), + test.assertTrue( + respDoc.errors['runStep1'].data.includes( + 'Error: The flow with the name MissingFlow could not be found.' + ), + 'handled missing dhf flow' + ) ); // test executing flow step diff --git a/state-conductor-example/src/main/ml-data/state-conductor-flow/branching-flow.asl.json b/state-conductor-example/src/main/ml-data/state-conductor-flow/branching-flow.asl.json index 742c12c..e4b8140 100644 --- a/state-conductor-example/src/main/ml-data/state-conductor-flow/branching-flow.asl.json +++ b/state-conductor-example/src/main/ml-data/state-conductor-flow/branching-flow.asl.json @@ -1,47 +1,47 @@ -{ - "Comment": "example of a branching flow using a 'Choice' type state", - "mlDomain": { - "context": [ - { - "scope": "collection", - "value": "enrollee" - } - ] - }, - "StartAt": "find-gender", - "States": { - "find-gender": { - "Type": "Choice", - "Comment": "determine's enrollee's gender", - "Choices": [ - { - "Resource": "/state-conductor/actions/branching-test-flow/gender-is-male.sjs", - "Next": "enroll-in-mens-health" - }, - { - "Resource": "/state-conductor/actions/branching-test-flow/gender-is-female.sjs", - "Next": "enroll-in-womens-health" - } - ], - "Default": "has-undetermined-gender" - }, - "enroll-in-mens-health": { - "Type": "Task", - "End": true, - "Comment": "adds enrollee to the men's health program", - "Resource": "/state-conductor/actions/branching-test-flow/enroll-in-mens-health.sjs" - }, - "enroll-in-womens-health": { - "Type": "Task", - "End": true, - "Comment": "adds enrollee to the womens's health program", - "Resource": "/state-conductor/actions/branching-test-flow/enroll-in-womens-health.sjs" - }, - "has-undetermined-gender": { - "Type": "Task", - "End": true, - "Comment": "flags enrollee for follow-up", - "Resource": "/state-conductor/actions/branching-test-flow/flag-for-follow-up.sjs" - } - } -} \ No newline at end of file +{ + "Comment": "example of a branching flow using a 'Choice' type state", + "mlDomain": { + "context": [ + { + "scope": "collection", + "value": "enrollee" + } + ] + }, + "StartAt": "find-gender", + "States": { + "find-gender": { + "Type": "Choice", + "Comment": "determine's enrollee's gender", + "Choices": [ + { + "Resource": "/state-conductor/actions/custom/branching-test-flow/gender-is-male.sjs", + "Next": "enroll-in-mens-health" + }, + { + "Resource": "/state-conductor/actions/custom/branching-test-flow/gender-is-female.sjs", + "Next": "enroll-in-womens-health" + } + ], + "Default": "has-undetermined-gender" + }, + "enroll-in-mens-health": { + "Type": "Task", + "End": true, + "Comment": "adds enrollee to the men's health program", + "Resource": "/state-conductor/actions/custom/branching-test-flow/enroll-in-mens-health.sjs" + }, + "enroll-in-womens-health": { + "Type": "Task", + "End": true, + "Comment": "adds enrollee to the womens's health program", + "Resource": "/state-conductor/actions/custom/branching-test-flow/enroll-in-womens-health.sjs" + }, + "has-undetermined-gender": { + "Type": "Task", + "End": true, + "Comment": "flags enrollee for follow-up", + "Resource": "/state-conductor/actions/custom/branching-test-flow/flag-for-follow-up.sjs" + } + } +} diff --git a/state-conductor-example/src/test/java/com/marklogic/StatusServiceTest.java b/state-conductor-example/src/test/java/com/marklogic/StatusServiceTest.java index 2687f3c..07e49e3 100644 --- a/state-conductor-example/src/test/java/com/marklogic/StatusServiceTest.java +++ b/state-conductor-example/src/test/java/com/marklogic/StatusServiceTest.java @@ -93,22 +93,27 @@ public void testNoQueryParams() { contentType(ContentType.JSON). body("rest-test-flow", notNullValue()). body("rest-test-flow.flowName", equalTo("rest-test-flow")). - body("rest-test-flow.working", notNullValue()). - body("rest-test-flow.working.add-collection-1", greaterThanOrEqualTo(0)). - body("rest-test-flow.working.add-collection-2", greaterThanOrEqualTo(0)). - body("rest-test-flow.working.success", greaterThanOrEqualTo(0)). - body("rest-test-flow.complete", notNullValue()). - body("rest-test-flow.complete.add-collection-1", greaterThanOrEqualTo(0)). - body("rest-test-flow.complete.add-collection-2", greaterThanOrEqualTo(0)). - body("rest-test-flow.complete.success", greaterThanOrEqualTo(0)). + body("rest-test-flow.totalPerStatus", notNullValue()). + body("rest-test-flow.totalPerStatus.new", greaterThanOrEqualTo(0)). + body("rest-test-flow.totalPerStatus.working", greaterThanOrEqualTo(0)). + body("rest-test-flow.totalPerStatus.waiting", greaterThanOrEqualTo(0)). + body("rest-test-flow.totalPerStatus.complete", greaterThanOrEqualTo(0)). + body("rest-test-flow.totalPerStatus.failed", greaterThanOrEqualTo(0)). + body("rest-test-flow.totalPerState", notNullValue()). + body("rest-test-flow.totalPerState.add-collection-1", greaterThanOrEqualTo(0)). + body("rest-test-flow.totalPerState.add-collection-2", greaterThanOrEqualTo(0)). + body("rest-test-flow.totalPerState.success", greaterThanOrEqualTo(0)). body("rest-test-flow2", notNullValue()). body("rest-test-flow2.flowName", equalTo("rest-test-flow2")). - body("rest-test-flow2.working", notNullValue()). - body("rest-test-flow2.working.test-data", greaterThanOrEqualTo(0)). - body("rest-test-flow2.working.success", greaterThanOrEqualTo(0)). - body("rest-test-flow2.complete", notNullValue()). - body("rest-test-flow2.complete.test-data", greaterThanOrEqualTo(0)). - body("rest-test-flow2.complete.success", greaterThanOrEqualTo(0)); + body("rest-test-flow2.totalPerStatus", notNullValue()). + body("rest-test-flow2.totalPerStatus.new", greaterThanOrEqualTo(0)). + body("rest-test-flow2.totalPerStatus.working", greaterThanOrEqualTo(0)). + body("rest-test-flow2.totalPerStatus.waiting", greaterThanOrEqualTo(0)). + body("rest-test-flow2.totalPerStatus.complete", greaterThanOrEqualTo(0)). + body("rest-test-flow2.totalPerStatus.failed", greaterThanOrEqualTo(0)). + body("rest-test-flow2.totalPerState", notNullValue()). + body("rest-test-flow2.totalPerState.test-data", greaterThanOrEqualTo(0)). + body("rest-test-flow2.totalPerState.success", greaterThanOrEqualTo(0)); } @Test @@ -124,14 +129,16 @@ public void testFlowNameParam() { contentType(ContentType.JSON). body("rest-test-flow", notNullValue()). body("rest-test-flow.flowName", equalTo("rest-test-flow")). - body("rest-test-flow.working", notNullValue()). - body("rest-test-flow.working.add-collection-1", greaterThanOrEqualTo(0)). - body("rest-test-flow.working.add-collection-2", greaterThanOrEqualTo(0)). - body("rest-test-flow.working.success", greaterThanOrEqualTo(0)). - body("rest-test-flow.complete", notNullValue()). - body("rest-test-flow.complete.add-collection-1", greaterThanOrEqualTo(0)). - body("rest-test-flow.complete.add-collection-2", greaterThanOrEqualTo(0)). - body("rest-test-flow.complete.success", greaterThanOrEqualTo(0)); + body("rest-test-flow.totalPerStatus", notNullValue()). + body("rest-test-flow.totalPerStatus.new", greaterThanOrEqualTo(0)). + body("rest-test-flow.totalPerStatus.working", greaterThanOrEqualTo(0)). + body("rest-test-flow.totalPerStatus.waiting", greaterThanOrEqualTo(0)). + body("rest-test-flow.totalPerStatus.complete", greaterThanOrEqualTo(0)). + body("rest-test-flow.totalPerStatus.failed", greaterThanOrEqualTo(0)). + body("rest-test-flow.totalPerState", notNullValue()). + body("rest-test-flow.totalPerState.add-collection-1", greaterThanOrEqualTo(0)). + body("rest-test-flow.totalPerState.add-collection-2", greaterThanOrEqualTo(0)). + body("rest-test-flow.totalPerState.success", greaterThanOrEqualTo(0)); } @Test @@ -147,12 +154,15 @@ public void testFlowName2Param() { contentType(ContentType.JSON). body("rest-test-flow2", notNullValue()). body("rest-test-flow2.flowName", equalTo("rest-test-flow2")). - body("rest-test-flow2.working", notNullValue()). - body("rest-test-flow2.working.test-data", greaterThanOrEqualTo(0)). - body("rest-test-flow2.working.success", greaterThanOrEqualTo(0)). - body("rest-test-flow2.complete", notNullValue()). - body("rest-test-flow2.complete.test-data", greaterThanOrEqualTo(0)). - body("rest-test-flow2.complete.success", greaterThanOrEqualTo(0)); + body("rest-test-flow2.totalPerStatus", notNullValue()). + body("rest-test-flow2.totalPerStatus.new", greaterThanOrEqualTo(0)). + body("rest-test-flow2.totalPerStatus.working", greaterThanOrEqualTo(0)). + body("rest-test-flow2.totalPerStatus.waiting", greaterThanOrEqualTo(0)). + body("rest-test-flow2.totalPerStatus.complete", greaterThanOrEqualTo(0)). + body("rest-test-flow2.totalPerStatus.failed", greaterThanOrEqualTo(0)). + body("rest-test-flow2.totalPerState", notNullValue()). + body("rest-test-flow2.totalPerState.test-data", greaterThanOrEqualTo(0)). + body("rest-test-flow2.totalPerState.success", greaterThanOrEqualTo(0)); } @Test @@ -170,18 +180,55 @@ public void testTemporalParam() { contentType(ContentType.JSON). body("rest-test-flow", notNullValue()). body("rest-test-flow.flowName", equalTo("rest-test-flow")). - body("rest-test-flow.totalComplete", equalTo(0)). - body("rest-test-flow.totalWorking", equalTo(1)). - body("rest-test-flow.totalFailed", equalTo(0)). - body("rest-test-flow.totalNew", greaterThanOrEqualTo(1)). - body("rest-test-flow.working", notNullValue()). - body("rest-test-flow.working.add-collection-1", equalTo(1)). - body("rest-test-flow.working.add-collection-2", equalTo(0)). - body("rest-test-flow.working.success", equalTo(0)). - body("rest-test-flow.complete", notNullValue()). - body("rest-test-flow.complete.add-collection-1", equalTo(0)). - body("rest-test-flow.complete.add-collection-2", equalTo(0)). - body("rest-test-flow.complete.success", equalTo(0)); + body("rest-test-flow.totalPerStatus", notNullValue()). + body("rest-test-flow.totalPerStatus.complete", equalTo(0)). + body("rest-test-flow.totalPerStatus.working", equalTo(1)). + body("rest-test-flow.totalPerStatus.failed", equalTo(0)). + body("rest-test-flow.totalPerStatus.new", greaterThanOrEqualTo(1)). + body("rest-test-flow.totalPerState", notNullValue()). + body("rest-test-flow.totalPerState.add-collection-1", equalTo(1)). + body("rest-test-flow.totalPerState.add-collection-2", equalTo(0)). + body("rest-test-flow.totalPerState.success", equalTo(0)); + } + + @Test + public void testDetailedParam() { + given(). + log().uri(). + when(). + queryParam("rs:flowName", "rest-test-flow2"). + queryParam("rs:detailed", true). + get("/v1/resources/state-conductor-status"). + then(). + log().body(). + statusCode(200). + contentType(ContentType.JSON). + body("rest-test-flow2", notNullValue()). + body("rest-test-flow2.flowName", equalTo("rest-test-flow2")). + body("rest-test-flow2.totalPerStatus", notNullValue()). + body("rest-test-flow2.totalPerStatus.complete", greaterThanOrEqualTo(0)). + body("rest-test-flow2.totalPerStatus.working", greaterThanOrEqualTo(0)). + body("rest-test-flow2.totalPerStatus.failed", greaterThanOrEqualTo(0)). + body("rest-test-flow2.totalPerStatus.new", greaterThanOrEqualTo(0)). + body("rest-test-flow2.totalPerState", notNullValue()). + body("rest-test-flow2.totalPerState.test-data", greaterThanOrEqualTo(0)). + body("rest-test-flow2.totalPerState.success", greaterThanOrEqualTo(0)). + body("rest-test-flow2.detailedTotalPerStatus", notNullValue()). + body("rest-test-flow2.detailedTotalPerStatus.new", notNullValue()). + body("rest-test-flow2.detailedTotalPerStatus.new.test-data", greaterThanOrEqualTo(0)). + body("rest-test-flow2.detailedTotalPerStatus.new.success", greaterThanOrEqualTo(0)). + body("rest-test-flow2.detailedTotalPerStatus.working", notNullValue()). + body("rest-test-flow2.detailedTotalPerStatus.working.test-data", greaterThanOrEqualTo(0)). + body("rest-test-flow2.detailedTotalPerStatus.working.success", greaterThanOrEqualTo(0)). + body("rest-test-flow2.detailedTotalPerStatus.waiting", notNullValue()). + body("rest-test-flow2.detailedTotalPerStatus.waiting.test-data", greaterThanOrEqualTo(0)). + body("rest-test-flow2.detailedTotalPerStatus.waiting.success", greaterThanOrEqualTo(0)). + body("rest-test-flow2.detailedTotalPerStatus.complete", notNullValue()). + body("rest-test-flow2.detailedTotalPerStatus.complete.test-data", greaterThanOrEqualTo(0)). + body("rest-test-flow2.detailedTotalPerStatus.complete.success", greaterThanOrEqualTo(0)). + body("rest-test-flow2.detailedTotalPerStatus.failed", notNullValue()). + body("rest-test-flow2.detailedTotalPerStatus.failed.test-data", greaterThanOrEqualTo(0)). + body("rest-test-flow2.detailedTotalPerStatus.failed.success", greaterThanOrEqualTo(0)); } @Test diff --git a/state-conductor-example/src/test/java/com/marklogic/ext/AbstractStateConductorRestTest.java b/state-conductor-example/src/test/java/com/marklogic/ext/AbstractStateConductorRestTest.java index b6435aa..7385e80 100644 --- a/state-conductor-example/src/test/java/com/marklogic/ext/AbstractStateConductorRestTest.java +++ b/state-conductor-example/src/test/java/com/marklogic/ext/AbstractStateConductorRestTest.java @@ -116,7 +116,7 @@ protected StringHandle loadTokenizedResource(String name, Map to } File file = new File(resource.getFile()); - String content = Files.readString(file.toPath()); + String content = new String(Files.readAllBytes(file.toPath())); for (Map.Entry token : tokens.entrySet()) { content = content.replaceAll(token.getKey(), token.getValue()); @@ -126,7 +126,7 @@ protected StringHandle loadTokenizedResource(String name, Map to } protected StringHandle replaceTokensInResource(FileHandle file, Map tokens) throws IOException { - String content = Files.readString(file.get().toPath()); + String content = new String(Files.readAllBytes(file.get().toPath())); for (Map.Entry token : tokens.entrySet()) { content = content.replaceAll(token.getKey(), token.getValue()); diff --git a/state-conductor-example/src/test/ml-modules/root/test/suites/StateConductorSuite/executeStateByJobDoc-tests.sjs b/state-conductor-example/src/test/ml-modules/root/test/suites/StateConductorSuite/executeStateByJobDoc-tests.sjs index 9237fe7..c9225e0 100644 --- a/state-conductor-example/src/test/ml-modules/root/test/suites/StateConductorSuite/executeStateByJobDoc-tests.sjs +++ b/state-conductor-example/src/test/ml-modules/root/test/suites/StateConductorSuite/executeStateByJobDoc-tests.sjs @@ -80,15 +80,12 @@ jobDoc = xdmp.toJSON({ provenance: [], }); -error = null; - -try { - error = sc.executeStateByJobDoc(jobDoc, false); -} catch (e) { - error = e; -} +assertion = sc.executeStateByJobDoc(jobDoc, false); -assertions.push(test.assertEqual('INVALID-STATE-DEFINITION', error.name, 'status check working')); +assertions.push( + test.assertEqual('failed', assertion.flowStatus, 'status check'), + test.assertEqual('INVALID-STATE-DEFINITION', assertion.errors['find-gender'].name) +); //checks see if the context was updated with a task jobDoc = xdmp.toJSON({ @@ -162,13 +159,12 @@ jobDoc = xdmp.toJSON({ modules: xdmp.modulesDatabase(), provenance: [], context: { - eventName: "series-of-clicks-and-beeps:1234" - } + eventName: 'series-of-clicks-and-beeps:1234', + }, }); assertion = sc.executeStateByJobDoc(jobDoc, false); - assertions.push(test.assertEqual('waiting', assertion.flowStatus, 'waiting flowStatus')); assertions.push( test.assertEqual( @@ -178,7 +174,6 @@ assertions.push( ) ); - jobDoc = xdmp.toJSON({ id: '0405536f-dd84-4ca6-8de8-c57062b2252d', flowName: 'wait-flow', @@ -189,19 +184,20 @@ jobDoc = xdmp.toJSON({ modules: xdmp.modulesDatabase(), provenance: [], context: { - eventName: "" - } + eventName: '', + }, }); +assertion = sc.executeStateByJobDoc(jobDoc, false); -error = null; -try { - error = sc.executeStateByJobDoc(jobDoc, false); -} catch (e) { - error = e; -} - -assertions.push(test.assertEqual('INVALID-STATE-DEFINITION', error.name, 'eventPath empty string')); +assertions.push( + test.assertEqual('failed', assertion.flowStatus, 'status check'), + test.assertEqual( + 'INVALID-STATE-DEFINITION', + assertion.errors['dialUpPath'].name, + 'eventPath empty string' + ) +); //checks a waiting state jobDoc = xdmp.toJSON({ @@ -213,22 +209,21 @@ jobDoc = xdmp.toJSON({ database: xdmp.database(), modules: xdmp.modulesDatabase(), provenance: [], - context: { - - } + context: {}, }); -error = null; -try { - error = sc.executeStateByJobDoc(jobDoc, false); -} catch (e) { - error = e; -} +assertion = sc.executeStateByJobDoc(jobDoc, false); -assertions.push(test.assertEqual('INVALID-STATE-DEFINITION', error.name, 'eventPath missing property ')); +assertions.push( + test.assertEqual('failed', assertion.flowStatus, 'status check'), + test.assertEqual( + 'INVALID-STATE-DEFINITION', + assertion.errors['dialUpPath'].name, + 'eventPath missing property' + ) +); //eventPath tests end - //unKnown database (content) jobDoc = xdmp.toJSON({ id: '0405536f-dd84-4ca6-8de8-c57062b2252d', @@ -241,14 +236,12 @@ jobDoc = xdmp.toJSON({ provenance: [], }); -error = null; -try { - error = sc.executeStateByJobDoc(jobDoc, false); -} catch (e) { - error = e; -} +assertion = sc.executeStateByJobDoc(jobDoc, false); -assertions.push(test.assertEqual('XDMP-NODB', error.name, 'unKnown database content')); +assertions.push( + test.assertEqual('failed', assertion.flowStatus, 'status check'), + test.assertEqual('XDMP-NODB', assertion.errors['find-gender'].name, 'unknown database content') +); //unKnown module database jobDoc = xdmp.toJSON({ @@ -262,14 +255,12 @@ jobDoc = xdmp.toJSON({ provenance: [], }); -error = null; -try { - error = sc.executeStateByJobDoc(jobDoc, false); -} catch (e) { - error = e; -} +assertion = sc.executeStateByJobDoc(jobDoc, false); -assertions.push(test.assertEqual('TRANSITIONERROR', error.name, 'unKnown database module')); +assertions.push( + test.assertEqual('failed', assertion.flowStatus, 'status check'), + test.assertEqual('XDMP-NODB', assertion.errors['find-gender'].name, 'unknown database modules') +); //unKnown database both jobDoc = xdmp.toJSON({ @@ -283,14 +274,12 @@ jobDoc = xdmp.toJSON({ provenance: [], }); -error = null; -try { - error = sc.executeStateByJobDoc(jobDoc, false); -} catch (e) { - error = e; -} +assertion = sc.executeStateByJobDoc(jobDoc, false); -assertions.push(test.assertEqual('XDMP-NODB', error.name, 'unKnown database both')); +assertions.push( + test.assertEqual('failed', assertion.flowStatus, 'status check'), + test.assertEqual('XDMP-NODB', assertion.errors['find-gender'].name, 'unknown database both') +); // missing action modules test jobDoc = xdmp.toJSON({ @@ -304,14 +293,15 @@ jobDoc = xdmp.toJSON({ provenance: [], }); -error = null; -try { - error = sc.executeStateByJobDoc(jobDoc, false); -} catch (e) { - error = e; -} +assertion = sc.executeStateByJobDoc(jobDoc, false); + assertions.push( - test.assertTrue(error.data[1].includes('XDMP-MODNOTFOUND'), 'detected missing action module') + test.assertEqual('failed', assertion.flowStatus, 'status check'), + test.assertEqual( + 'XDMP-MODNOTFOUND', + assertion.errors['set-prop1'].name, + 'detected missing action module' + ) ); // missing condition modules test @@ -326,14 +316,31 @@ jobDoc = xdmp.toJSON({ provenance: [], }); -error = null; -try { - error = sc.executeStateByJobDoc(jobDoc, false); -} catch (e) { - error = e; -} +assertion = sc.executeStateByJobDoc(jobDoc, false); + assertions.push( - test.assertTrue(error.data[1].includes('XDMP-MODNOTFOUND'), 'detected missing action module') + test.assertEqual('failed', assertion.flowStatus, 'status check'), + test.assertEqual( + 'XDMP-MODNOTFOUND', + assertion.errors['branch'].name, + 'detected missing condition module' + ) ); +// checks that a flow that terminates in a "Fail" state has the "failed" status applied +jobDoc = xdmp.toJSON({ + id: '0405536f-dd84-4ca6-8de8-c57062b2252d', + flowName: 'test-flow', + flowStatus: 'working', + flowState: 'failed', + uri: '/data/test-doc1.json', + database: xdmp.database(), + modules: xdmp.modulesDatabase(), + provenance: [], +}); + +assertion = sc.executeStateByJobDoc(jobDoc, false); + +assertions.push(test.assertEqual('failed', assertion.flowStatus, 'status check')); + assertions; diff --git a/state-conductor-example/src/test/ml-modules/root/test/suites/StateConductorSuite/resumeWaitingJobByJobDoc-tests.sjs b/state-conductor-example/src/test/ml-modules/root/test/suites/StateConductorSuite/resumeWaitingJobByJobDoc-tests.sjs index 416b8b7..d967f5c 100644 --- a/state-conductor-example/src/test/ml-modules/root/test/suites/StateConductorSuite/resumeWaitingJobByJobDoc-tests.sjs +++ b/state-conductor-example/src/test/ml-modules/root/test/suites/StateConductorSuite/resumeWaitingJobByJobDoc-tests.sjs @@ -81,14 +81,12 @@ jobDoc = xdmp.toJSON({ provenance: [], }); -error = null; -try { - error = sc.resumeWaitingJobByJobDoc(jobDoc, 'testing', false); -} catch (e) { - error = e; -} +assertion = sc.resumeWaitingJobByJobDoc(jobDoc, 'testing', false); -assertions.push(test.assertEqual('XDMP-NODB', error.name, 'unKnown database content')); +assertions.push( + test.assertEqual('failed', assertion.flowStatus, 'flowStatus check'), + test.assertEqual('XDMP-NODB', assertion.errors['dialUp'].name, 'unknown database content') +); //unKnown module database jobDoc = xdmp.toJSON({ diff --git a/state-conductor-example/src/test/ml-modules/root/test/suites/StateConductorSuite/retryJobAtStateByJobDoc-test.sjs b/state-conductor-example/src/test/ml-modules/root/test/suites/StateConductorSuite/retryJobAtStateByJobDoc-test.sjs index aceb56a..f125b61 100644 --- a/state-conductor-example/src/test/ml-modules/root/test/suites/StateConductorSuite/retryJobAtStateByJobDoc-test.sjs +++ b/state-conductor-example/src/test/ml-modules/root/test/suites/StateConductorSuite/retryJobAtStateByJobDoc-test.sjs @@ -123,15 +123,12 @@ jobDoc = xdmp.toJSON({ provenance: [], }); -error = null; +assertion = sc.retryJobAtStateByJobDoc(jobDoc, 'unknownStep', 'testing', false); -try { - error = sc.retryJobAtStateByJobDoc(jobDoc, 'unknownStep', 'testing', false); -} catch (e) { - error = e; -} - -assertions.push(test.assertEqual('INVALID-STATE-DEFINITION', error.name, 'unknownStep')); +assertions.push( + test.assertEqual('failed', assertion.flowStatus, 'status check'), + test.assertEqual('INVALID-STATE-DEFINITION', assertion.errors['dialUp'].name, 'unknownStep') +); //retry an NEW state jobDoc = xdmp.toJSON({ @@ -145,15 +142,12 @@ jobDoc = xdmp.toJSON({ provenance: [], }); -error = null; - -try { - error = sc.retryJobAtStateByJobDoc(jobDoc, 'NEW', 'testing', false); -} catch (e) { - error = e; -} +assertion = sc.retryJobAtStateByJobDoc(jobDoc, 'NEW', 'testing', false); -assertions.push(test.assertEqual('INVALID-STATE-DEFINITION', error.name, 'NEW')); +assertions.push( + test.assertEqual('failed', assertion.flowStatus, 'status check'), + test.assertEqual('INVALID-STATE-DEFINITION', assertion.errors['dialUp'].name, 'NEW') +); //unKnown database (content) jobDoc = xdmp.toJSON({ @@ -167,14 +161,12 @@ jobDoc = xdmp.toJSON({ provenance: [], }); -error = null; -try { - error = sc.retryJobAtStateByJobDoc(jobDoc, 'dialUp', 'testing', false); -} catch (e) { - error = e; -} +assertion = sc.retryJobAtStateByJobDoc(jobDoc, 'dialUp', 'testing', false); -assertions.push(test.assertEqual('XDMP-NODB', error.name, 'unKnown database content')); +assertions.push( + test.assertEqual('failed', assertion.flowStatus, 'status check'), + test.assertEqual('XDMP-NODB', assertion.errors['dialUp'].name, 'unknown database content') +); //unKnown module database jobDoc = xdmp.toJSON({ @@ -205,12 +197,11 @@ jobDoc = xdmp.toJSON({ provenance: [], }); -error = null; -try { - error = sc.retryJobAtStateByJobDoc(jobDoc, 'dialUp', 'testing', false); -} catch (e) { - error = e; -} +assertion = sc.retryJobAtStateByJobDoc(jobDoc, 'dialUp', 'testing', false); + +assertions.push( + test.assertEqual('failed', assertion.flowStatus, 'status check'), + test.assertEqual('XDMP-NODB', assertion.errors['dialUp'].name, 'unknown database both') +); -assertions.push(test.assertEqual('XDMP-NODB', error.name, 'unKnown database content')); assertions; diff --git a/state-conductor-example/src/test/ml-modules/root/test/suites/StateConductorSuite/startProcessingFlowByJobDoc-tests.sjs b/state-conductor-example/src/test/ml-modules/root/test/suites/StateConductorSuite/startProcessingFlowByJobDoc-tests.sjs index c3d5df9..c439db1 100644 --- a/state-conductor-example/src/test/ml-modules/root/test/suites/StateConductorSuite/startProcessingFlowByJobDoc-tests.sjs +++ b/state-conductor-example/src/test/ml-modules/root/test/suites/StateConductorSuite/startProcessingFlowByJobDoc-tests.sjs @@ -61,14 +61,12 @@ jobDoc = xdmp.toJSON({ provenance: [], }); -error = null; -try { - error = sc.startProcessingFlowByJobDoc(jobDoc, false); -} catch (e) { - error = e; -} +assertion = sc.startProcessingFlowByJobDoc(jobDoc, false); -assertions.push(test.assertEqual('MISSING-FLOW-FILE', error.name, 'missing flow file')); +assertions.push( + test.assertEqual('failed', assertion.flowStatus, 'status check'), + test.assertEqual('MISSING-FLOW-FILE', assertion.errors['find-gender'].name, 'missing flow file') +); //check for missing flow file jobDoc = xdmp.toJSON({ @@ -82,14 +80,16 @@ jobDoc = xdmp.toJSON({ provenance: [], }); -error = null; -try { - error = sc.startProcessingFlowByJobDoc(jobDoc, false); -} catch (e) { - error = e; -} +assertion = sc.startProcessingFlowByJobDoc(jobDoc, false); -assertions.push(test.assertEqual('INVALID-STATE-DEFINITION', error.name, 'no StartAt step')); +assertions.push( + test.assertEqual('failed', assertion.flowStatus, 'status check'), + test.assertEqual( + 'INVALID-STATE-DEFINITION', + assertion.errors['find-gender'].name, + 'no StartAt step' + ) +); //check for missing flow file jobDoc = xdmp.toJSON({ @@ -119,14 +119,12 @@ jobDoc = xdmp.toJSON({ provenance: [], }); -error = null; -try { - error = sc.startProcessingFlowByJobDoc(jobDoc, false); -} catch (e) { - error = e; -} +assertion = sc.startProcessingFlowByJobDoc(jobDoc, false); -assertions.push(test.assertEqual('XDMP-NODB', error.name, 'unKnown database content')); +assertions.push( + test.assertEqual('failed', assertion.flowStatus, 'status check'), + test.assertEqual('XDMP-NODB', assertion.errors['find-gender'].name, 'unknown database content') +); //unKnown database (module) jobDoc = xdmp.toJSON({ @@ -156,13 +154,11 @@ jobDoc = xdmp.toJSON({ provenance: [], }); -error = null; -try { - error = sc.startProcessingFlowByJobDoc(jobDoc, false); -} catch (e) { - error = e; -} +assertion = sc.startProcessingFlowByJobDoc(jobDoc, false); -assertions.push(test.assertEqual('XDMP-NODB', error.name, 'unKnown database both')); +assertions.push( + test.assertEqual('failed', assertion.flowStatus, 'status check'), + test.assertEqual('XDMP-NODB', assertion.errors['find-gender'].name, 'unknown database both') +); assertions; diff --git a/state-conductor-example/src/test/ml-modules/root/test/suites/StateConductorSuite/test-data/flows/test-flow.asl.json b/state-conductor-example/src/test/ml-modules/root/test/suites/StateConductorSuite/test-data/flows/test-flow.asl.json index 7be8153..75a4b74 100644 --- a/state-conductor-example/src/test/ml-modules/root/test/suites/StateConductorSuite/test-data/flows/test-flow.asl.json +++ b/state-conductor-example/src/test/ml-modules/root/test/suites/StateConductorSuite/test-data/flows/test-flow.asl.json @@ -1,30 +1,34 @@ -{ - "Comment": "does things", - "mlDomain": { - "context": [ - { - "scope": "directory", - "value": "/test/" - }, - { - "scope": "collection", - "value": "test" - } - ] - }, - "StartAt": "set-prop1", - "States": { - "set-prop1": { - "Type": "Task", - "Comment": "initial state of the flow", - "Resource": "/state-conductor/actions/custom/set-prop1.sjs", - "Next": "set-prop2" - }, - "set-prop2": { - "Type": "Task", - "End": true, - "Comment": "updates a property", - "Resource": "/state-conductor/actions/custom/set-prop2.sjs" - } - } -} \ No newline at end of file +{ + "Comment": "does things", + "mlDomain": { + "context": [ + { + "scope": "directory", + "value": "/test/" + }, + { + "scope": "collection", + "value": "test" + } + ] + }, + "StartAt": "set-prop1", + "States": { + "set-prop1": { + "Type": "Task", + "Comment": "initial state of the flow", + "Resource": "/state-conductor/actions/custom/set-prop1.sjs", + "Next": "set-prop2" + }, + "set-prop2": { + "Type": "Task", + "End": true, + "Comment": "updates a property", + "Resource": "/state-conductor/actions/custom/set-prop2.sjs" + }, + "failed": { + "Type": "Fail", + "Comment": "used to test application of failure status" + } + } +} diff --git a/state-conductor-example/src/test/ml-modules/root/test/suites/StateConductorSuite/timeWait-test.sjs b/state-conductor-example/src/test/ml-modules/root/test/suites/StateConductorSuite/timeWait-test.sjs index e64042b..42c9752 100644 --- a/state-conductor-example/src/test/ml-modules/root/test/suites/StateConductorSuite/timeWait-test.sjs +++ b/state-conductor-example/src/test/ml-modules/root/test/suites/StateConductorSuite/timeWait-test.sjs @@ -20,11 +20,19 @@ jobDoc = xdmp.toJSON({ errors: {}, }); +//"test excute job") //"test excute job") assertion = sc.executeStateByJobDoc(jobDoc, false); -assertions.push(test.assertEqual('waiting', assertion.flowStatus, 'waiting flowStatus')); assertions.push( - test.assertTrue(assertion.hasOwnProperty('currentlyWaiting'), 'waiting currentlyWaiting') + test.assertEqual('waiting', assertion.flowStatus, 'waiting flowStatus')); +assertions.push( + test.assertTrue(assertion.hasOwnProperty('currentlyWaiting')) +); +assertions.push( + test.assertTrue(assertion.currentlyWaiting.hasOwnProperty("nextTaskTime")) +); +assertions.push( + test.assertTrue(assertion.provenance[0].waiting.hasOwnProperty("doneNextTaskTime")) ); jobDoc = xdmp.toJSON({ diff --git a/state-conductor-modules/src/main/ml-modules/root/state-conductor/actions/common/dhf/dhf5MakeEnvelopeStep.sjs b/state-conductor-modules/src/main/ml-modules/root/state-conductor/actions/common/dhf/dhf5MakeEnvelopeStep.sjs index f44ab58..6fc2c27 100644 --- a/state-conductor-modules/src/main/ml-modules/root/state-conductor/actions/common/dhf/dhf5MakeEnvelopeStep.sjs +++ b/state-conductor-modules/src/main/ml-modules/root/state-conductor/actions/common/dhf/dhf5MakeEnvelopeStep.sjs @@ -1,3 +1,4 @@ +const sc = require('/state-conductor/state-conductor.sjs'); const dhf5Ingestion = require('/data-hub/5/builtins/steps/ingestion/default/main.sjs'); function performAction(uri, options = {}, context = {}) { @@ -9,13 +10,13 @@ function performAction(uri, options = {}, context = {}) { value: doc, }; - xdmp.log( + xdmp.trace( + sc.TRACE_EVENT, Sequence.from([ 'Execute DHF make envelope step:', - ' uri: ' + uri, - ' options: ' + options, - ]), - 'debug' + '- uri: ' + uri, + '- options: ' + options, + ]) ); // execute the dhf flow step diff --git a/state-conductor-modules/src/main/ml-modules/root/state-conductor/actions/common/dhf/dhf5RunFlowAction.sjs b/state-conductor-modules/src/main/ml-modules/root/state-conductor/actions/common/dhf/dhf5RunFlowAction.sjs index f6550dd..dff850c 100644 --- a/state-conductor-modules/src/main/ml-modules/root/state-conductor/actions/common/dhf/dhf5RunFlowAction.sjs +++ b/state-conductor-modules/src/main/ml-modules/root/state-conductor/actions/common/dhf/dhf5RunFlowAction.sjs @@ -1,3 +1,4 @@ +const sc = require('/state-conductor/state-conductor.sjs'); const DataHub = require('/data-hub/5/datahub.sjs'); const datahub = new DataHub(); @@ -15,14 +16,15 @@ function performAction(uri, options = {}, context = {}) { // get the steps for the given flow const numSteps = Object.keys(flow.steps).length; - xdmp.log( + xdmp.trace( + sc.TRACE_EVENT, Sequence.from([ 'Execute DHF flow:', - ' uri: ' + uri, - ' flowName: ' + flowName, - ' numSteps: ' + numSteps, - ' flowOptions: ' + flowOptions, - ' flowContext: ' + flowContext, + '- uri: ' + uri, + '- flowName: ' + flowName, + '- numSteps: ' + numSteps, + '- flowOptions: ' + flowOptions, + '- flowContext: ' + flowContext, ]) ); diff --git a/state-conductor-modules/src/main/ml-modules/root/state-conductor/actions/common/dhf/dhf5RunFlowStepAction.sjs b/state-conductor-modules/src/main/ml-modules/root/state-conductor/actions/common/dhf/dhf5RunFlowStepAction.sjs index 6758826..bea51c8 100644 --- a/state-conductor-modules/src/main/ml-modules/root/state-conductor/actions/common/dhf/dhf5RunFlowStepAction.sjs +++ b/state-conductor-modules/src/main/ml-modules/root/state-conductor/actions/common/dhf/dhf5RunFlowStepAction.sjs @@ -1,3 +1,4 @@ +const sc = require('/state-conductor/state-conductor.sjs'); const DataHub = require('/data-hub/5/datahub.sjs'); const datahub = new DataHub(); @@ -8,6 +9,8 @@ function performAction(uri, options = {}, context = {}) { const flowOptions = options.flowOptions || {}; const flowContext = options.flowContext || {}; + flowOptions.stateConductorContext = context; + // setup the dhf runFlow content const contentObjs = { uri: uri, @@ -15,14 +18,15 @@ function performAction(uri, options = {}, context = {}) { value: fn.head(xdmp.invokeFunction(() => cts.doc(uri))), }; - xdmp.log( + xdmp.trace( + sc.TRACE_EVENT, Sequence.from([ 'Execute DHF flow:', - ' uri: ' + uri, - ' flowName: ' + flowName, - ' step: ' + step, - ' flowOptions: ' + flowOptions, - ' flowContext: ' + flowContext, + '- uri: ' + uri, + '- flowName: ' + flowName, + '- step: ' + step, + '- flowOptions: ' + flowOptions, + '- flowContext: ' + flowContext, ]) ); diff --git a/state-conductor-modules/src/main/ml-modules/root/state-conductor/state-conductor.sjs b/state-conductor-modules/src/main/ml-modules/root/state-conductor/state-conductor.sjs index 01545e9..8fdc8cc 100644 --- a/state-conductor-modules/src/main/ml-modules/root/state-conductor/state-conductor.sjs +++ b/state-conductor-modules/src/main/ml-modules/root/state-conductor/state-conductor.sjs @@ -22,7 +22,7 @@ const FLOW_FILE_EXTENSION = '.asl.json'; const FLOW_JOBID_PROP_NAME = 'state-conductor-job'; const FLOW_STATUS_NEW = 'new'; const FLOW_STATUS_WORKING = 'working'; -const FLOW_STATUS_WATING = 'waiting'; +const FLOW_STATUS_WAITING = 'waiting'; const FLOW_STATUS_COMPLETE = 'complete'; const FLOW_STATUS_FAILED = 'failed'; const FLOW_NEW_STEP = 'NEW'; @@ -327,7 +327,7 @@ function processJob(uri) { executeStateByJobDoc(jobDoc); // continue processing return true; - } else if (FLOW_STATUS_WATING === status) { + } else if (FLOW_STATUS_WAITING === status) { // execute resume resumeWaitingJobByJobDoc(jobDoc, 'processJob'); // continue processing @@ -352,12 +352,16 @@ function startProcessingFlowByJobDoc(jobDoc, save = true) { const currFlowName = jobObj.flowName; const status = jobObj.flowStatus; - try { - // sanity check - if (FLOW_STATUS_NEW !== status) { - fn.error(null, 'INVALID-FLOW-STATUS', 'Cannot start a flow not in the NEW status'); - } + // sanity check + if (FLOW_STATUS_NEW !== status) { + xdmp.trace( + TRACE_EVENT, + `INVALID-FLOW-STATUS: Cannot start a flow that is not in the ${FLOW_STATUS_NEW} status` + ); + fn.error(null, 'INVALID-FLOW-STATUS', 'Cannot start a flow not in the NEW status'); + } + try { // grab the flow definition from the correct db const currFlow = getFlowDocumentFromDatabase(currFlowName, jobObj.database).toObject(); currFlow.flowName = jobObj.flowName; @@ -382,7 +386,7 @@ function startProcessingFlowByJobDoc(jobDoc, save = true) { xdmp.nodeReplace(jobDoc.root, jobObj); } } catch (err) { - handleError( + return handleError( err.name, `startProcessingFlowByJobDoc error for flow "${currFlowName}"`, err, @@ -425,16 +429,20 @@ function resumeWaitingJobByJobDoc(jobDoc, resumeBy, save = true) { xdmp.trace(TRACE_EVENT, `resumeWaitingJob flow "${flowName}"`); xdmp.trace(TRACE_EVENT, `resumeWaitingJob flow state "${stateName}"`); - try { - // sanity check - if (FLOW_STATUS_WATING !== flowStatus) { - return fn.error( - null, - 'INVALID-FLOW-STATUS', - 'Cannot resume a flow that is not in the ' + FLOW_STATUS_WATING + ' status' - ); - } + // sanity check + if (FLOW_STATUS_WAITING !== flowStatus) { + xdmp.trace( + TRACE_EVENT, + `INVALID-FLOW-STATUS: Cannot resume a flow that is not in the ${FLOW_STATUS_WAITING} status` + ); + return fn.error( + null, + 'INVALID-FLOW-STATUS', + 'Cannot resume a flow that is not in the ' + FLOW_STATUS_WAITING + ' status' + ); + } + try { flowObj = getFlowDocumentFromDatabase(flowName, jobObj.database).toObject(); try { @@ -447,7 +455,7 @@ function resumeWaitingJobByJobDoc(jobDoc, resumeBy, save = true) { ); } } catch (err) { - handleError( + return handleError( err.name, `resumeWaitingJobByJobDoc error for flow "${flowName}"`, err, @@ -501,29 +509,31 @@ function retryJobAtStateByJobDoc(jobDoc, stateName, retriedBy, save = true) { xdmp.trace(TRACE_EVENT, `retryJobAtStateByJobDoc flow "${flowName}"`); xdmp.trace(TRACE_EVENT, `retryJobAtStateByJobDoc flow state "${stateName}"`); - try { - // sanity check - if (FLOW_STATUS_FAILED !== flowStatus) { - return fn.error( - null, - 'INVALID-FLOW-STATUS', - 'Cannot try a flow that is not in the ' + FLOW_STATUS_FAILED + ' status' - ); - } + // sanity check + if (FLOW_STATUS_FAILED !== flowStatus) { + xdmp.trace( + TRACE_EVENT, + `INVALID-FLOW-STATUS: Cannot retry a flow that is not in the ${FLOW_STATUS_FAILED} status` + ); + return fn.error( + null, + 'INVALID-FLOW-STATUS', + 'Cannot try a flow that is not in the ' + FLOW_STATUS_FAILED + ' status' + ); + } + try { flowObj = getFlowDocumentFromDatabase(flowName, jobObj.database).toObject(); - - try { - state = flowObj.States[stateName]; - } catch (e) { - return fn.error( + state = flowObj.States[stateName]; + if (!state) { + fn.error( null, 'INVALID-STATE-DEFINITION', `Can't Find the state "${stateName}" in flow "${flowName}"` ); } } catch (err) { - handleError( + return handleError( err.name, `retryJobAtStateByJobDoc error for flow "${flowName}"`, err, @@ -571,13 +581,17 @@ function transition(jobDoc, jobObj, stateName, state, flowObj, save = true) { `executing transitions for state: ${stateName} with status of ${jobObj.flowStatus}` ); - if (jobObj.flowStatus === FLOW_STATUS_WATING) { + if (jobObj.flowStatus === FLOW_STATUS_WAITING) { xdmp.trace(TRACE_EVENT, `transition wait: ${stateName}`); + let pro = JSON.parse(JSON.stringify(jobObj.currentlyWaiting)); + pro['doneNextTaskTime'] = pro['nextTaskTime']; + delete pro['nextTaskTime']; + jobObj.provenance.push({ date: new Date().toISOString(), state: stateName, - waiting: jobObj.currentlyWaiting, + waiting: pro, }); } else if (!inTerminalState(jobObj, flowObj)) { xdmp.trace(TRACE_EVENT, `transition from non-terminal state: ${stateName}`); @@ -659,9 +673,14 @@ function transition(jobDoc, jobObj, stateName, state, flowObj, save = true) { } else { xdmp.trace(TRACE_EVENT, `transition complete: ${stateName}`); - // terminal states have no "Next" target state - jobObj.flowStatus = FLOW_STATUS_COMPLETE; // TODO if is a "Fail" state shouldn't we change to the "failed" status? + // determine the final status + if (STATE_FAIL === state.Type.toLowerCase()) { + jobObj.flowStatus = FLOW_STATUS_FAILED; + } else { + jobObj.flowStatus = FLOW_STATUS_COMPLETE; + } + // terminal states have no "Next" target state jobObj.provenance.push({ date: new Date().toISOString(), from: stateName, @@ -674,7 +693,7 @@ function transition(jobDoc, jobObj, stateName, state, flowObj, save = true) { xdmp.nodeReplace(jobDoc.root, jobObj); } } catch (err) { - handleError( + return handleError( 'TRANSITIONERROR', `transition error for state "${stateName}"`, err, @@ -704,29 +723,33 @@ function executeStateByJobDoc(jobDoc, save = true) { xdmp.trace(TRACE_EVENT, `executing flow "${flowName}"`); xdmp.trace(TRACE_EVENT, `flow state "${stateName}"`); - try { - // sanity check - if (FLOW_STATUS_WORKING !== jobObj.flowStatus) { - return fn.error( - null, - 'INVALID-FLOW-STATUS', - 'Cannot execute a flow that is not in the WORKING status' - ); - } + // sanity check + if (FLOW_STATUS_WORKING !== jobObj.flowStatus) { + xdmp.trace( + TRACE_EVENT, + 'INVALID-FLOW-STATUS: Cannot execute a flow that is not in the WORKING status' + ); + return fn.error( + null, + 'INVALID-FLOW-STATUS', + 'Cannot execute a flow that is not in the WORKING status' + ); + } + try { flowObj = getFlowDocumentFromDatabase(flowName, jobObj.database).toObject(); try { state = flowObj.States[stateName]; } catch (e) { - return fn.error( + fn.error( null, 'INVALID-STATE-DEFINITION', `Can't Find the state "${stateName}" in flow "${flowName}"` ); } } catch (err) { - handleError( + return handleError( err.name, `executeStateByJobDoc error for flow "${flowName}"`, err, @@ -785,7 +808,11 @@ function executeStateByJobDoc(jobDoc, save = true) { jobObj.context = result; } } - } else if (state.Type && state.Type.toLowerCase() === STATE_WAIT && (state.Event || state.EventPath )) { + } else if ( + state.Type && + state.Type.toLowerCase() === STATE_WAIT && + (state.Event || state.EventPath) + ) { //updated the job Doc to have info about why its waiting xdmp.trace(TRACE_EVENT, `waiting for state: ${stateName}`); @@ -793,14 +820,14 @@ function executeStateByJobDoc(jobDoc, save = true) { let eventToWaitFor; ///checks if there is EventPath use that over using Event - if (state.hasOwnProperty("EventPath")){ + if (state.hasOwnProperty('EventPath')) { eventToWaitFor = lib.materializeReferencePath(state.EventPath, jobObj.context); } else { eventToWaitFor = state.Event; } //makes sure there is an event set - if (eventToWaitFor == null || eventToWaitFor === "") { + if (eventToWaitFor == null || eventToWaitFor === '') { fn.error( null, 'INVALID-STATE-DEFINITION', @@ -812,8 +839,7 @@ function executeStateByJobDoc(jobDoc, save = true) { jobObj.currentlyWaiting = { event: eventToWaitFor, }; - jobObj.flowStatus = FLOW_STATUS_WATING; - + jobObj.flowStatus = FLOW_STATUS_WAITING; } else if (state.Type && state.Type.toLowerCase() === STATE_WAIT && state.Seconds) { //updated the job Doc to have info about why its waiting xdmp.trace(TRACE_EVENT, `waiting for state: ${stateName}`); @@ -830,7 +856,7 @@ function executeStateByJobDoc(jobDoc, save = true) { seconds: state.Seconds, nextTaskTime: nextTaskTime, }; - jobObj.flowStatus = FLOW_STATUS_WATING; + jobObj.flowStatus = FLOW_STATUS_WAITING; } else { fn.error( null, @@ -865,7 +891,7 @@ function executeStateByJobDoc(jobDoc, save = true) { ); } - jobObj.flowStatus = FLOW_STATUS_WATING; + jobObj.flowStatus = FLOW_STATUS_WAITING; } else { fn.error( null, @@ -879,7 +905,7 @@ function executeStateByJobDoc(jobDoc, save = true) { } return transition(jobDoc, jobObj, stateName, state, flowObj, save); } else { - handleError( + return handleError( 'INVALID-STATE-DEFINITION', Sequence.from([`state "${stateName}" not found in flow`]), null, @@ -901,6 +927,7 @@ function executeStateByJobDoc(jobDoc, save = true) { * @returns the action module's resposne */ function executeActionModule(modulePath, uri, params, context, { database, modules }) { + const startTime = xdmp.elapsedTime(); let resp = invokeOrApplyFunction( () => { declareUpdate(); @@ -920,6 +947,10 @@ function executeActionModule(modulePath, uri, params, context, { database, modul modules: modules ? modules : xdmp.modulesDatabase(), } ); + xdmp.trace( + TRACE_EVENT, + `Action module "${modulePath}" completed in ${xdmp.elapsedTime().subtract(startTime)}` + ); return fn.head(resp); } @@ -934,6 +965,7 @@ function executeActionModule(modulePath, uri, params, context, { database, modul * @returns boolean response of the module */ function executeConditionModule(modulePath, uri, params, context, { database, modules }) { + const startTime = xdmp.elapsedTime(); let resp = invokeOrApplyFunction( () => { const conditionModule = require(modulePath); @@ -956,6 +988,10 @@ function executeConditionModule(modulePath, uri, params, context, { database, mo modules: modules ? modules : xdmp.modulesDatabase(), } ); + xdmp.trace( + TRACE_EVENT, + `Condition module "${modulePath}" completed in ${xdmp.elapsedTime().subtract(startTime)}` + ); return fn.head(resp); } @@ -1073,9 +1109,16 @@ function inTerminalState(job, flow) { * @param {*} flowName * @returns */ -function getFlowCounts(flowName, { startDate, endDate }) { +function getFlowCounts(flowName, { startDate, endDate, detailed = false }) { const flow = getFlowDocument(flowName).toObject(); const states = Object.keys(flow.States); + const statuses = [ + FLOW_STATUS_NEW, + FLOW_STATUS_WORKING, + FLOW_STATUS_WAITING, + FLOW_STATUS_COMPLETE, + FLOW_STATUS_FAILED, + ]; let baseQuery = []; if (startDate) { @@ -1116,32 +1159,30 @@ function getFlowCounts(flowName, { startDate, endDate }) { ) ); - let numComplete = 0; - let numWorking = 0; - let numNew = 0; - let numFailed = 0; - const resp = { flowName: flowName, - totalComplete: numComplete, - totalWorking: numWorking, - totalFailed: numFailed, - totalNew: numNew, + totalPerStatus: {}, + totalPerState: {}, }; invokeOrApplyFunction( () => { - resp.totalComplete = numInStatus(FLOW_STATUS_COMPLETE); - resp.totalWorking = numInStatus(FLOW_STATUS_WORKING); - resp.totalNew = numInStatus(FLOW_STATUS_NEW); - resp.totalFailed = numInStatus(FLOW_STATUS_FAILED); - - [FLOW_STATUS_WORKING, FLOW_STATUS_COMPLETE].forEach((status) => { - resp[status] = {}; - states.forEach((state) => { - resp[status][state] = numInState(status, state); - }); + statuses.forEach((status) => (resp.totalPerStatus[status] = numInStatus(status))); + + states.forEach((state) => { + resp.totalPerState[state] = numInState(statuses, state); }); + + if (detailed) { + let details = {}; + statuses.forEach((status) => { + details[status] = {}; + states.forEach((state) => { + details[status][state] = numInState(status, state); + }); + }); + resp.detailedTotalPerStatus = details; + } }, { database: xdmp.database(STATE_CONDUCTOR_JOBS_DB), @@ -1268,7 +1309,7 @@ function emmitEvent(event, batchSize = 100, save = true) { null, cts.andQuery([ cts.collectionQuery(JOB_COLLECTION), - cts.jsonPropertyValueQuery('flowStatus', FLOW_STATUS_WATING), + cts.jsonPropertyValueQuery('flowStatus', FLOW_STATUS_WAITING), cts.jsonPropertyScopeQuery( 'currentlyWaiting', cts.jsonPropertyValueQuery('event', event) @@ -1301,43 +1342,43 @@ function emmitEvent(event, batchSize = 100, save = true) { }, { database: xdmp.database(STATE_CONDUCTOR_JOBS_DB), - }); + } + ); + + // handle flows + // grab all state conductor flows with a event context and matching event + const flows = cts + .search( + cts.andQuery([ + cts.collectionQuery(FLOW_COLLECTION), + cts.jsonPropertyScopeQuery('mlDomain', cts.jsonPropertyValueQuery('scope', 'event')), + cts.jsonPropertyScopeQuery('mlDomain', cts.jsonPropertyValueQuery('value', event)), + ]) + ) + .toArray(); + // determine which flows should run and create state conductor jobs + let flowsToTrigger = flows.filter((flow) => { + // find the flows where the event and scope are in the same object + let eventContext = flow.xpath("mlDomain/context[scope = 'event' and value = '" + event + "' ]"); - // handle flows - // grab all state conductor flows with a event context and matching event - const flows = cts - .search( - cts.andQuery([ - cts.collectionQuery(sc.FLOW_COLLECTION), - cts.jsonPropertyScopeQuery('mlDomain', cts.jsonPropertyValueQuery('scope', 'event')), - cts.jsonPropertyScopeQuery('mlDomain', cts.jsonPropertyValueQuery('value', event)) - ]) - ) - .toArray(); - - // determine which flows should run and create state conductor jobs - let flowsToTrigger = flows.filter((flow) => { - // find the flows where the event and scope are in the same object - let eventContext = flow.xpath("mlDomain/context[scope = 'event' and value = '"+ event + "' ]"); - - return fn.exists(eventContext); - }) - - let flowsToTriggerResp = flowsToTrigger.map((flow) => { - // create a state conductor job for the event flows - let flowName = sc.getFlowNameFromUri(fn.documentUri(flow)); - let resp = sc.createStateConductorJob(flowName, null); - return {flowName: flowName, JobId: resp} - xdmp.trace(sc.TRACE_EVENT, `created state conductor job for event flow: ${resp}`); - }); + return fn.exists(eventContext); + }); + + let flowsToTriggerResp = flowsToTrigger.map((flow) => { + // create a state conductor job for the event flows + let flowName = getFlowNameFromUri(fn.documentUri(flow)); + let resp = createStateConductorJob(flowName, null); + xdmp.trace(TRACE_EVENT, `created state conductor job for event flow: ${resp}`); + return { flowName: flowName, JobId: resp }; + }); - const output = { - jobDocumentsTriggered: fn.head(uris), - flowsTriggered: flowsToTriggerResp - }; + const output = { + jobDocumentsTriggered: fn.head(uris), + flowsTriggered: flowsToTriggerResp, + }; - return output + return output; } /** @@ -1427,9 +1468,8 @@ function handleError(name, message, err, jobDoc, jobObj, save = true) { xdmp.nodeReplace(jobDoc.root, jobObj); } - // trigger CPF error state - - fn.error(null, name, Sequence.from([message, err])); + // trigger CPF error state (intentionally commented out) + //fn.error(null, name, Sequence.from([message, err])); return jobObj; } @@ -1444,17 +1484,17 @@ module.exports = { FLOW_ITEM_COLLECTION, FLOW_STATUS_NEW, FLOW_STATUS_WORKING, - FLOW_STATUS_WATING, + FLOW_STATUS_WAITING, FLOW_STATUS_COMPLETE, FLOW_STATUS_FAILED, JOB_COLLECTION, JOB_DIRECTORY, - retryJobAtState, - retryJobAtStateByJobDoc, addJobMetadata, batchCreateStateConductorJob, checkFlowContext, createStateConductorJob, + emmitEvent, + executeStateByJobDoc, getAllFlowsContextQuery, getApplicableFlows, getFlowContextQuery, @@ -1465,13 +1505,13 @@ module.exports = { getFlowNameFromUri, getFlowNames, getInitialState, + getJobDocuments, getJobIds, + invokeOrApplyFunction, processJob, resumeWaitingJob, resumeWaitingJobByJobDoc, - executeStateByJobDoc, + retryJobAtState, + retryJobAtStateByJobDoc, startProcessingFlowByJobDoc, - emmitEvent, - getJobDocuments, - invokeOrApplyFunction, }; diff --git a/state-conductor-modules/src/main/ml-modules/root/state-conductor/tasks/waitTask.sjs b/state-conductor-modules/src/main/ml-modules/root/state-conductor/tasks/waitTask.sjs index 430924a..bb793c4 100644 --- a/state-conductor-modules/src/main/ml-modules/root/state-conductor/tasks/waitTask.sjs +++ b/state-conductor-modules/src/main/ml-modules/root/state-conductor/tasks/waitTask.sjs @@ -30,7 +30,7 @@ sc.invokeOrApplyFunction( } }, { - database: sc.STATE_CONDUCTOR_JOBS_DB, + database: xdmp.database(sc.STATE_CONDUCTOR_JOBS_DB), } ); diff --git a/state-conductor-modules/src/main/ml-modules/services/state-conductor-status.sjs b/state-conductor-modules/src/main/ml-modules/services/state-conductor-status.sjs index da74427..d2bd07f 100644 --- a/state-conductor-modules/src/main/ml-modules/services/state-conductor-status.sjs +++ b/state-conductor-modules/src/main/ml-modules/services/state-conductor-status.sjs @@ -34,11 +34,13 @@ function get(context, params) { const flowNames = params.flowName ? [params.flowName] : sc.getFlowNames(); const startDate = params.startDate; const endDate = params.endDate; + const detailed = params.detailed === 'true'; const resp = flowNames.reduce((acc, name) => { acc[name] = sc.getFlowCounts(name, { startDate: startDate, endDate: endDate, + detailed: detailed, }); return acc; }, {});