Skip to content

Commit

Permalink
341: Fix Validation Rule Engine concurrency issues (#997)
Browse files Browse the repository at this point in the history
* Updated fhirpath to point to receiving facility id

* Add thread safety test and flag for rule loading

* Updated fhirpath

* Update RuleEngine.java

Double checked locking added on ensureRulesLoaded

* Updated receiver id fhirpath in tests and added new test to make sure we validate using the receiver id

* Update etor/src/test/groovy/gov/hhs/cdc/trustedintermediary/etor/ruleengine/RuleEngineTest.groovy

Co-authored-by: halprin <[email protected]>

---------

Co-authored-by: Luis Pabon <[email protected]>
Co-authored-by: halprin <[email protected]>
  • Loading branch information
3 people authored Apr 2, 2024
1 parent 8cc3fbc commit 40639da
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ public void unloadRules() {

public void ensureRulesLoaded() {
if (rules.isEmpty()) {
loadRules();
synchronized (this) {
if (rules.isEmpty()) {
loadRules();
}
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion etor/src/main/resources/rule_definitions.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"violationMessage": "Message doesn't have required receiver id",
"conditions": [ ],
"validations": [
"Bundle.entry.resource.ofType(MessageHeader).destination.receiver.resolve().identifier.value.exists()"
"Bundle.entry.resource.ofType(MessageHeader).destination.receiver.resolve().identifier.where(system = 'urn:ietf:rfc:3986').value.exists()"
]
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ import gov.hhs.cdc.trustedintermediary.wrappers.HapiFhir
import gov.hhs.cdc.trustedintermediary.wrappers.Logger
import gov.hhs.cdc.trustedintermediary.wrappers.formatter.Formatter
import org.hl7.fhir.r4.model.Bundle
import org.hl7.fhir.r4.model.Coding
import org.hl7.fhir.r4.model.MessageHeader
import org.hl7.fhir.r4.model.Organization
import org.hl7.fhir.r4.model.Reference
import spock.lang.Specification

import java.nio.file.Files
Expand Down Expand Up @@ -80,7 +84,7 @@ class RuleEngineIntegrationTest extends Specification {
where:
testFile | validation
"Orders/001_OML_O21_short.fhir" | "Bundle.entry.resource.ofType(MessageHeader).focus.resolve().category.exists()"
"Orders/003_AL_ORM_O01_NBS_Fully_Populated_1_hl7_translation.fhir" | "Bundle.entry.resource.ofType(MessageHeader).destination.receiver.resolve().identifier.value.exists()"
"Orders/003_AL_ORM_O01_NBS_Fully_Populated_1_hl7_translation.fhir" | "Bundle.entry.resource.ofType(MessageHeader).destination.receiver.resolve().identifier.where(system = 'urn:ietf:rfc:3986').value.exists()"
// Once we fix the mapping for ORM from story #900 and update the FHIR files in /examples/Test/Orders, we can uncomment the below line
// "Orders/003_AL_ORM_O01_NBS_Fully_Populated_1_hl7_translation.fhir" | "Bundle.entry.resource.ofType(Observation).where(code.coding.code = '57723-9').value.coding.code.exists()"
}
Expand All @@ -95,10 +99,78 @@ class RuleEngineIntegrationTest extends Specification {
where:
testFile | validation
"Orders/001_OML_O21_short.fhir" | "Bundle.entry.resource.ofType(MessageHeader).destination.receiver.resolve().identifier.value.exists()"
"Orders/001_OML_O21_short.fhir" | "Bundle.entry.resource.ofType(MessageHeader).destination.receiver.resolve().identifier.where(system = 'urn:ietf:rfc:3986').value.exists()"
"Orders/001_OML_O21_short.fhir" | "Bundle.entry.resource.ofType(Observation).where(code.coding.code = '57723-9').value.coding.code.exists()"
}
def "validation passes: Message has required receiver id"() {
given:
def fhirPathValidation = "Bundle.entry.resource.ofType(MessageHeader).destination.receiver.resolve().identifier.where(system = 'urn:ietf:rfc:3986').value.exists()"
def rule = createValidationRule([], [fhirPathValidation])
when:
Organization receiverOrganization = new Organization()
receiverOrganization.setId(UUID.randomUUID().toString())
receiverOrganization
.addIdentifier()
.setSystem("urn:ietf:rfc:3986")
.setValue("simulated-hospital-id")
def bundle = createMessageBundle(receiverOrganization: receiverOrganization)
// for some reason, we need to encode and decode the bundle for resolve() to work
def fhirResource = new HapiFhirResource(fhir.parseResource(fhir.encodeResourceToJson(bundle), Bundle))
then:
rule.isValid(fhirResource)
when:
receiverOrganization = new Organization()
receiverOrganization.setId(UUID.randomUUID().toString())
receiverOrganization
.addIdentifier()
.setSystem("another-system")
.setValue("simulated-hospital-id")
bundle = createMessageBundle(receiverOrganization: receiverOrganization)
fhirResource = new HapiFhirResource(fhir.parseResource(fhir.encodeResourceToJson(bundle), Bundle))
then:
!rule.isValid(fhirResource)
when:
receiverOrganization = new Organization()
receiverOrganization.setId(UUID.randomUUID().toString())
receiverOrganization
.addIdentifier()
.setValue("simulated-hospital-id")
bundle = createMessageBundle(receiverOrganization: receiverOrganization)
fhirResource = new HapiFhirResource(fhir.parseResource(fhir.encodeResourceToJson(bundle), Bundle))
then:
!rule.isValid(fhirResource)
}
Bundle createMessageBundle(Map params) {
String messageTypeCode = params.messageTypeCode as String ?: "ORM_O01"
Organization receiverOrganization = params.receiverOrganization as Organization ?: new Organization()
MessageHeader messageHeader = params.messageType as MessageHeader ?: new MessageHeader()
MessageHeader.MessageDestinationComponent destination = messageHeader.addDestination()
String receiverOrganizationFullUrl = "Organization/" + receiverOrganization.getId()
destination.setReceiver(new Reference(receiverOrganizationFullUrl))
Coding eventCoding = new Coding()
eventCoding.setSystem("http://terminology.hl7.org/CodeSystem/v2-0003")
String[] parts = messageTypeCode.split("_")
eventCoding.setCode(parts[1])
eventCoding.setDisplay(String.format("%s^%s^%s", parts[0], parts[1], messageTypeCode))
messageHeader.setEvent(eventCoding)

Bundle bundle = new Bundle()
bundle.setType(Bundle.BundleType.MESSAGE)
bundle.addEntry().setResource(messageHeader)
bundle.addEntry().setFullUrl(receiverOrganizationFullUrl).setResource(receiverOrganization)
return bundle
}

Rule createValidationRule(List<String> ruleConditions, List<String> ruleValidations) {
return new ValidationRule(
name: "Rule name",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,27 @@ class RuleEngineTest extends Specification {
1 * mockRuleLoader.loadRules(_ as String) >> [Mock(Rule)]
}

def "ensureRulesLoaded loads rules only once on multiple threads"() {
given:
def threadsNum = 10
def iterations = 4

when:
List<Thread> threads = []
(1..threadsNum).each { threadId ->
threads.add(new Thread({
for (int i = 0; i < iterations; i++) {
ruleEngine.ensureRulesLoaded()
}
}))
}
threads*.start()
threads*.join()

then:
1 * mockRuleLoader.loadRules(_ as String) >> [Mock(Rule)]
}

def "ensureRulesLoaded logs an error if there is an exception loading the rules"() {
given:
def exception = new RuleLoaderException("Error loading rules", new Exception())
Expand Down

0 comments on commit 40639da

Please sign in to comment.