Skip to content

Commit

Permalink
MAINT-1521 Improve batch upload for Description changes
Browse files Browse the repository at this point in the history
  • Loading branch information
QuyenLy87 committed Dec 9, 2020
1 parent ad94c70 commit 96a7939
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package org.ihtsdo.otf.transformationandtemplate.service.client;

import org.ihtsdo.otf.rest.client.terminologyserver.pojo.CodeSystem;

import java.util.Map;

public class AuthoringProject {
private String key;
private String title;
private String branchPath;
private String branchState;
private Long branchHeadTimestamp;
private Long branchBaseTimestamp;
private Map<String, Object> metadata;
private CodeSystem codeSystem;


public String getKey() {
return key;
}

public String getTitle() {
return title;
}

public String getBranchPath() {
return branchPath;
}

public Long getBranchHeadTimestamp() {
return branchHeadTimestamp;
}

public void setBranchHeadTimestamp(Long branchHeadTimestamp) {
this.branchHeadTimestamp = branchHeadTimestamp;
}

public Long getBranchBaseTimestamp() {
return branchBaseTimestamp;
}

public void setBranchBaseTimestamp(Long branchBaseTimestamp) {
this.branchBaseTimestamp = branchBaseTimestamp;
}

public String getBranchState() {
return branchState;
}

public Map<String, Object> getMetadata() {
return metadata;
}

public void setMetadata(Map<String, Object> metadata) {
this.metadata = metadata;
}

public void setCodeSystem(CodeSystem codeSystem) {
this.codeSystem = codeSystem;
}

public CodeSystem getCodeSystem() {
return codeSystem;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ public AuthoringTask putTaskInProgress(String projectKey, String taskKey) {
return updateAuthoringTaskNotNullFieldsAreSet(task);
}

public AuthoringProject retrieveProject(String projectKey) {
return restClient.get()
.uri(uriBuilder -> uriBuilder.path("/projects/{projectKey}").build(projectKey))
.retrieve()
.bodyToMono(AuthoringProject.class)
.block();
}

public AuthoringTask updateAuthoringTaskNotNullFieldsAreSet(AuthoringTask task) {
return restClient.put()
.uri(uriBuilder -> uriBuilder.path("/projects/{projectKey}/tasks/{taskKey}").build(task.getProjectKey(), task.getKey()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class HighLevelAuthoringService {
private final Logger logger = LoggerFactory.getLogger(getClass());

private static final Comparator<DescriptionPojo> DESCRIPTION_WITHOUT_ID_COMPARATOR = Comparator.comparing(DescriptionPojo::getTerm).thenComparing(DescriptionPojo::getLang);
private static final Comparator<DescriptionPojo> DESCRIPTION_WITH_ID_COMPARATOR = Comparator.comparing(DescriptionPojo::getDescriptionId);
private static final Comparator<DescriptionPojo> DESCRIPTION_WITH_CONCEPT_ID_COMPARATOR = Comparator.comparing(DescriptionPojo::getTerm).thenComparing(DescriptionPojo::getLang).thenComparing(DescriptionPojo::getConceptId);
private static final Comparator<DescriptionReplacementPojo> DESCRIPTION_REPLACEMENT_WITH_CONCEPT_ID_COMPARATOR = Comparator.comparing(DescriptionReplacementPojo::getId).thenComparing(DescriptionReplacementPojo::getConceptId);
private static final Comparator<ConceptValidationResult> CONCEPT_VALIDATION_RESULT_COMPARATOR = Comparator.comparing(ConceptValidationResult::getSeverity);
Expand Down Expand Up @@ -201,21 +202,38 @@ public List<ChangeResult<? extends SnomedComponent>> updateDescriptions(
// Initial terminology server communication check
snowstormClient.getBranch("MAIN");

// Project branch path
String projectBranchPath = authoringServicesClient.retrieveProject(request.getProjectKey()).getBranchPath();

// retrieve all concepts before processing update
List<ConceptPojo> concepts = new ArrayList <>();
for (List<DescriptionPojo> descriptionProcessingBatch : Iterables.partition(descriptions, processingBatchMaxSize)) {
Set<String> descriptionIds = descriptionProcessingBatch.stream().map(DescriptionPojo::getDescriptionId).collect(Collectors.toSet());
concepts.addAll(snowstormClient.getFullConcepts(SnowstormClient.ConceptBulkLoadRequest.byDescriptionId(descriptionIds), projectBranchPath));
}

Set<String> descriptionsFound = new HashSet<>();
Map<String, ConceptPojo> conceptMap = concepts.stream().collect(Collectors.toMap(ConceptPojo::getConceptId, Function.identity()));
Map<String, DescriptionPojo> descriptionIdMap = descriptions.stream().collect(Collectors.toMap(DescriptionPojo::getDescriptionId, Function.identity()));
// Split into batches
int batchNumber = 0;
for (List<DescriptionPojo> descriptionTaskBatch : Iterables.partition(descriptions, request.getBatchSize())) {
for (List<ConceptPojo> conceptTaskBatch : Iterables.partition(conceptMap.values(), request.getBatchSize())) {
batchNumber++;
String branchPath = getBatchBranch(request, batchNumber);

// Split into smaller batches if the number of per branch changes exceeds the number of concepts which should be processed at a time.
for (List<DescriptionPojo> descriptionProcessingBatch : Iterables.partition(descriptionTaskBatch, processingBatchMaxSize)) {
List<ChangeResult<DescriptionPojo>> changesBatch = changes.stream()
.filter(descriptionPojoChangeResult -> descriptionProcessingBatch.contains(descriptionPojoChangeResult.getComponent()))
.collect(Collectors.toList());
updateDescriptionBatch(descriptionProcessingBatch, changesBatch, branchPath);
for (List<ConceptPojo> conceptProcessingBatch : Iterables.partition(conceptTaskBatch, processingBatchMaxSize)) {
descriptionsFound.addAll(updateDescriptionBatch(conceptProcessingBatch, descriptionIdMap, changes, branchPath));
}
}

// Fail all descriptions which were not found to update
for (String notFoundDescriptionId : difference(descriptionIdMap.keySet(), descriptionsFound)) {
getChangeResult(changes, descriptionIdMap.get(notFoundDescriptionId), DESCRIPTION_WITH_ID_COMPARATOR).fail("Description not found on the specified branch.");
}

// Mark all changes which have not failed as successful
changes.stream().filter(change -> change.getSuccess() == null).forEach(ChangeResult::success);
} catch (WebClientException | TimeoutException e) {// This RuntimeException is thrown by WebClient
logger.error("Failed to communicate with the terminology server.", e);
failAllRemaining(changes, "Failed to communicate with the terminology server.");
Expand Down Expand Up @@ -504,13 +522,8 @@ public List<ChangeResult<? extends SnomedComponent>> updateAxioms(
return new ArrayList<>(changes);
}

private void updateDescriptionBatch(List<DescriptionPojo> descriptionBatch, List<ChangeResult<DescriptionPojo>> changesBatch, String branchPath) throws BusinessServiceException, TimeoutException {
// Batch load concepts by description id
Set<String> descriptionIds = descriptionBatch.stream().map(DescriptionPojo::getDescriptionId).collect(Collectors.toSet());
List<ConceptPojo> concepts = snowstormClient.getFullConcepts(SnowstormClient.ConceptBulkLoadRequest.byDescriptionId(descriptionIds), branchPath);

private Set<String> updateDescriptionBatch(List<ConceptPojo> concepts, Map<String, DescriptionPojo> descriptionIdMap, List<ChangeResult<DescriptionPojo>> changes, String branchPath) throws BusinessServiceException, TimeoutException {
// Update existing descriptions
Map<String, DescriptionPojo> descriptionIdMap = descriptionBatch.stream().collect(Collectors.toMap(DescriptionPojo::getDescriptionId, Function.identity()));
Set<String> descriptionsFound = new HashSet<>();
for (ConceptPojo loadedConcept : concepts) {
for (DescriptionPojo loadedDescription : loadedConcept.getDescriptions()) {
Expand All @@ -535,17 +548,13 @@ private void updateDescriptionBatch(List<DescriptionPojo> descriptionBatch, List
}
}
}
// Fail all descriptions which were not found to update
for (String notFoundDescriptionId : difference(descriptionIdMap.keySet(), descriptionsFound)) {
getChangeResult(changesBatch, descriptionIdMap.get(notFoundDescriptionId), DESCRIPTION_WITHOUT_ID_COMPARATOR).fail("Description not found on the specified branch.");
}

if (!descriptionsFound.isEmpty()) {
Map<String, ConceptPojo> conceptMap = concepts.stream().collect(Collectors.toMap(ConceptPojo::getConceptId, Function.identity()));
bulkValidateThenUpdateConcepts(conceptMap, branchPath, changesBatch);
// Mark all changes which have not failed as successful
changesBatch.stream().filter(change -> change.getSuccess() == null).forEach(ChangeResult::success);
bulkValidateThenUpdateConcepts(conceptMap, branchPath, changes);
}

return descriptionsFound;
}

private void updateAxiomBatch(List<AxiomPojo> axiomBatch, List<ChangeResult<AxiomPojo>> changesBatch, String branchPath) throws BusinessServiceException, TimeoutException {
Expand Down

0 comments on commit 96a7939

Please sign in to comment.