Skip to content

Commit

Permalink
Add support for rollup instance-level hints
Browse files Browse the repository at this point in the history
This allows survey versions to be rolled up to questions (and topics).
Currently there is only support for a single hierarchy but that
limitation exists elsewhere already.

This will require re-indexing once the config has been updated.
  • Loading branch information
Timothy Jennison committed Nov 27, 2024
1 parent 1745454 commit 2806e42
Show file tree
Hide file tree
Showing 9 changed files with 189 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import bio.terra.tanagra.underlay.Underlay;
import bio.terra.tanagra.underlay.entitymodel.Attribute;
import bio.terra.tanagra.underlay.entitymodel.Entity;
import bio.terra.tanagra.underlay.entitymodel.Hierarchy;
import bio.terra.tanagra.underlay.entitymodel.Relationship;
import bio.terra.tanagra.underlay.entitymodel.entitygroup.CriteriaOccurrence;
import bio.terra.tanagra.underlay.entitymodel.entitygroup.EntityGroup;
Expand Down Expand Up @@ -421,6 +422,12 @@ public static SequencedJobSet getJobSetForCriteriaOccurrence(
// TODO: Handle >1 occurrence entity.
Entity occurrenceEntity = criteriaOccurrence.getOccurrenceEntities().get(0);
if (criteriaOccurrence.hasInstanceLevelDisplayHints(occurrenceEntity)) {
// TODO: Handle >1 hierarchy.
Hierarchy hierarchy =
criteriaOccurrence.getCriteriaEntity().hasHierarchies()
? criteriaOccurrence.getCriteriaEntity().getHierarchies().get(0)
: null;

Relationship occurrenceCriteriaRelationship =
criteriaOccurrence.getOccurrenceCriteriaRelationship(occurrenceEntity.getName());
Relationship occurrencePrimaryRelationship =
Expand Down Expand Up @@ -458,7 +465,14 @@ public static SequencedJobSet getJobSetForCriteriaOccurrence(
.getInstanceLevelDisplayHints(
criteriaOccurrence.getName(),
occurrenceEntity.getName(),
criteriaOccurrence.getCriteriaEntity().getName())));
criteriaOccurrence.getCriteriaEntity().getName()),
hierarchy,
hierarchy != null
? underlay
.getIndexSchema()
.getHierarchyAncestorDescendant(
criteriaOccurrence.getCriteriaEntity().getName(), hierarchy.getName())
: null));
}

if (criteriaOccurrence.getCriteriaEntity().hasHierarchies()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,30 @@
import bio.terra.tanagra.api.shared.DataType;
import bio.terra.tanagra.indexing.job.BigQueryJob;
import bio.terra.tanagra.indexing.job.dataflow.beam.BigQueryBeamUtils;
import bio.terra.tanagra.indexing.job.dataflow.beam.CountUtils;
import bio.terra.tanagra.indexing.job.dataflow.beam.DataflowUtils;
import bio.terra.tanagra.query.sql.SqlField;
import bio.terra.tanagra.query.sql.SqlQueryField;
import bio.terra.tanagra.underlay.entitymodel.Attribute;
import bio.terra.tanagra.underlay.entitymodel.Entity;
import bio.terra.tanagra.underlay.entitymodel.Hierarchy;
import bio.terra.tanagra.underlay.entitymodel.Relationship;
import bio.terra.tanagra.underlay.entitymodel.entitygroup.CriteriaOccurrence;
import bio.terra.tanagra.underlay.indextable.ITEntityMain;
import bio.terra.tanagra.underlay.indextable.ITHierarchyAncestorDescendant;
import bio.terra.tanagra.underlay.indextable.ITInstanceLevelDisplayHints;
import bio.terra.tanagra.underlay.indextable.ITRelationshipIdPairs;
import bio.terra.tanagra.underlay.serialization.SZIndexer;
import com.google.api.services.bigquery.model.TableRow;
import jakarta.annotation.Nullable;
import java.io.Serializable;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
Expand All @@ -48,6 +53,8 @@ public class WriteInstanceLevelDisplayHints extends BigQueryJob {
private final @Nullable ITRelationshipIdPairs occurrenceCriteriaRelationshipIdPairsTable;
private final @Nullable ITRelationshipIdPairs occurrencePrimaryRelationshipIdPairsTable;
private final ITInstanceLevelDisplayHints indexTable;
private final @Nullable Hierarchy hierarchy;
private final @Nullable ITHierarchyAncestorDescendant ancestorDescendantTable;

@SuppressWarnings("checkstyle:ParameterNumber")
public WriteInstanceLevelDisplayHints(
Expand All @@ -59,7 +66,9 @@ public WriteInstanceLevelDisplayHints(
ITEntityMain primaryEntityIndexTable,
@Nullable ITRelationshipIdPairs occurrenceCriteriaRelationshipIdPairsTable,
@Nullable ITRelationshipIdPairs occurrencePrimaryRelationshipIdPairsTable,
ITInstanceLevelDisplayHints indexTable) {
ITInstanceLevelDisplayHints indexTable,
@Nullable Hierarchy hierarchy,
@Nullable ITHierarchyAncestorDescendant ancestorDescendantTable) {
super(indexerConfig);
this.criteriaOccurrence = criteriaOccurrence;
this.occurrenceEntity = occurrenceEntity;
Expand All @@ -69,6 +78,8 @@ public WriteInstanceLevelDisplayHints(
this.occurrenceCriteriaRelationshipIdPairsTable = occurrenceCriteriaRelationshipIdPairsTable;
this.occurrencePrimaryRelationshipIdPairsTable = occurrencePrimaryRelationshipIdPairsTable;
this.indexTable = indexTable;
this.hierarchy = hierarchy;
this.ancestorDescendantTable = ancestorDescendantTable;
}

@Override
Expand Down Expand Up @@ -119,8 +130,8 @@ public void run(boolean isDryRun) {
readInRelationshipIdPairs(
pipeline, occCriIdPairsSql, entityAIdColumnName, entityBIdColumnName);

// Build a query to select all occurrence-criteria id pairs, and the pipeline steps to read the
// results and build a (occurrence id, criteria id) KV PCollection.
// Build a query to select all occurrence-primary id pairs, and the pipeline steps to read the
// results and build a (occurrence id, primary id) KV PCollection.
String occPriIdPairsSql =
getQueryRelationshipIdPairs(
entityAIdColumnName,
Expand All @@ -134,17 +145,32 @@ public void run(boolean isDryRun) {
readInRelationshipIdPairs(
pipeline, occPriIdPairsSql, entityAIdColumnName, entityBIdColumnName);

PCollection<KV<Long, Long>> rollupOccCriIdPairKVs = null;
if (hierarchy != null
&& criteriaOccurrence.hasRollupInstanceLevelDisplayHints(occurrenceEntity)) {
PCollection<KV<Long, Long>> ancestorDescendantRelationshipsPC =
BigQueryBeamUtils.readAncestorDescendentRelationshipsFromBQ(
pipeline, ancestorDescendantTable);

// Expand the set of occurrences to include a repeat for each ancestor.
rollupOccCriIdPairKVs =
CountUtils.repeatOccurrencesForHints(occCriIdPairKVs, ancestorDescendantRelationshipsPC);
}
final PCollection<KV<Long, Long>> finalRollupOccCriIdPairKVs = rollupOccCriIdPairKVs;

criteriaOccurrence
.getAttributesWithInstanceLevelDisplayHints(occurrenceEntity)
.forEach(
attribute -> {
(attribute, rollup) -> {
PCollection<KV<Long, Long>> idPairsKVs =
rollup ? finalRollupOccCriIdPairKVs : occCriIdPairKVs;
if (attribute.isValueDisplay()) {
LOGGER.info("enum val hint: {}", attribute.getName());
enumValHint(occCriIdPairKVs, occPriIdPairKVs, occIdRowKVs, attribute);
enumValHint(idPairsKVs, occPriIdPairKVs, occIdRowKVs, attribute);
} else if (DataType.INT64.equals(attribute.getDataType())
|| DataType.DOUBLE.equals(attribute.getDataType())) {
LOGGER.info("numeric range hint: {}", attribute.getName());
numericRangeHint(occCriIdPairKVs, occIdRowKVs, attribute);
numericRangeHint(idPairsKVs, occIdRowKVs, attribute);
} // TODO: Calculate display hints for other data types.
});

Expand Down Expand Up @@ -287,13 +313,15 @@ private void numericRangeHint(
occIdAndNumValCriId
.apply(Filter.by(cogb -> cogb.getValue().getAll(numValTag).iterator().hasNext()))
.apply(
MapElements.into(
FlatMapElements.into(
TypeDescriptors.kvs(TypeDescriptors.longs(), TypeDescriptors.doubles()))
.via(
cogb ->
KV.of(
cogb.getValue().getOnly(criIdTag),
cogb.getValue().getOnly(numValTag))));
cogb -> {
Iterable<Long> criIds = cogb.getValue().getAll(criIdTag);
return StreamSupport.stream(criIds.spliterator(), false)
.map((Long criId) -> KV.of(criId, cogb.getValue().getOnly(numValTag)))
.toList();
}));

// Compute numeric range for each criteriaId.
PCollection<IdNumericRange> numericRanges = numericRangeHint(criteriaValuePairs);
Expand Down Expand Up @@ -361,23 +389,28 @@ private void enumValHint(
.and(criIdTag, occCriIdPairs)
.and(priIdTag, occPriIdPairs)
.apply(CoGroupByKey.create());

PCollection<KV<IdEnumValue, Long>> criteriaEnumPrimaryPairs =
occIdAndAttrsCriIdPriId
.apply(Filter.by(cogb -> cogb.getValue().getAll(occAttrsTag).iterator().hasNext()))
.apply(
MapElements.into(
FlatMapElements.into(
TypeDescriptors.kvs(
new TypeDescriptor<IdEnumValue>() {}, TypeDescriptors.longs()))
.via(
cogb -> {
Long criId = cogb.getValue().getOnly(criIdTag);
Iterable<Long> criIds = cogb.getValue().getAll(criIdTag);
Long priId = cogb.getValue().getOnly(priIdTag);

TableRow occAttrs = cogb.getValue().getOnly(occAttrsTag);
String enumValue = (String) occAttrs.get(enumValColName);
String enumDisplay = (String) occAttrs.get(enumDisplayColName);

return KV.of(new IdEnumValue(criId, enumValue, enumDisplay), priId);
return StreamSupport.stream(criIds.spliterator(), false)
.map(
(Long criId) ->
KV.of(new IdEnumValue(criId, enumValue, enumDisplay), priId))
.toList();
}));

// Compute enum values and counts for each criteriaId.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import bio.terra.tanagra.api.shared.DataType;
import bio.terra.tanagra.exception.SystemException;
import bio.terra.tanagra.underlay.ColumnSchema;
import bio.terra.tanagra.underlay.indextable.ITHierarchyAncestorDescendant;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
Expand All @@ -20,8 +21,11 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.commons.text.StringSubstitutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class BigQueryBeamUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryBeamUtils.class);

private BigQueryBeamUtils() {}

Expand Down Expand Up @@ -65,6 +69,22 @@ public static PCollection<KV<Long, Long>> readTwoFieldRowsFromBQ(
}));
}

/**
 * Reads the rows of a hierarchy ancestor-descendant index table into the pipeline.
 *
 * <p>A {@code SELECT *} query over the table is built and logged, then handed to {@link
 * #readTwoFieldRowsFromBQ} with the descendant column name as the first field and the ancestor
 * column name as the second field.
 *
 * @param pipeline the pipeline the read step is added to
 * @param ancestorDescendantTable the index table holding the ancestor-descendant rows
 * @return the collection produced by {@code readTwoFieldRowsFromBQ} for the (descendant,
 *     ancestor) column pair; presumably keyed by descendant id -- confirm against that helper
 */
public static PCollection<KV<Long, Long>> readAncestorDescendentRelationshipsFromBQ(
    Pipeline pipeline, ITHierarchyAncestorDescendant ancestorDescendantTable) {
  final String selectAllSql =
      "SELECT * FROM " + ancestorDescendantTable.getTablePointer().render();
  LOGGER.info("ancestor-descendant query: {}", selectAllSql);

  // Descendant is passed first and ancestor second; the helper receives them in that order.
  final String descendantColumn =
      ITHierarchyAncestorDescendant.Column.DESCENDANT.getSchema().getColumnName();
  final String ancestorColumn =
      ITHierarchyAncestorDescendant.Column.ANCESTOR.getSchema().getColumnName();
  return readTwoFieldRowsFromBQ(pipeline, selectAllSql, descendantColumn, ancestorColumn);
}

public static String getTableSqlPath(String projectId, String datasetId, String tableName) {
final String template = "${projectId}:${datasetId}.${tableName}";
Map<String, String> params =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.KvSwap;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Values;
Expand Down Expand Up @@ -136,4 +137,53 @@ public static PCollection<KV<Long, Long>> repeatOccurrencesForHierarchy(
.and(ancestorOccurrences)
.apply(Flatten.pCollections());
}

/**
 * Expands a set of (occurrence, criteria) pairs so that every occurrence is also paired with each
 * ancestor of its criteria node, i.e. adds an (occurrence, ancestor) pair per ancestor.
 *
 * <p>This is the same concept as repeatOccurrencesForHierarchy but over occurrence ids.
 *
 * @param occurrences (occurrence, criteria) pairs: the occurrences to expand and the criteria
 *     node each one is associated with. Duplicate pairs are removed before expanding.
 * @param descendantAncestor (descendant, ancestor) pairs for the criteria nodes. This should be
 *     the expanded set of all transitive relationships in the hierarchy, not just the
 *     parent/child pairs.
 * @return the de-duplicated input pairs concatenated with one (occurrence, ancestor) pair for
 *     every ancestor of each pair's criteria node. The same occurrence id can therefore appear
 *     as the key of several elements; later steps must not assume keys are unique.
 */
public static PCollection<KV<Long, Long>> repeatOccurrencesForHints(
    PCollection<KV<Long, Long>> occurrences, PCollection<KV<Long, Long>> descendantAncestor) {
  // Step 1: drop repeated (occurrence, criteria) pairs.
  PCollection<KV<Long, Long>> dedupedPairs =
      occurrences.apply(
          "remove duplicate occurrences before repeating for hints",
          Distinct.<KV<Long, Long>>create());

  // Step 2: re-key by criteria id so the pairs can be joined against the hierarchy table.
  // The same criteria id may key many elements here.
  PCollection<KV<Long, Long>> occurrenceByCriteria =
      dedupedPairs.apply(
          "swap (occurrence, criteria) to (criteria, occurrence)", KvSwap.<Long, Long>create());

  // Step 3: occurrenceByCriteria (criteria, occurrence)
  //         INNER JOIN descendantAncestor (descendant, ancestor) ON criteria = descendant,
  // which yields (criteria=descendant, (occurrence, ancestor)). The join key has served its
  // purpose at that point, so keep only the (occurrence, ancestor) values.
  PCollection<KV<Long, Long>> ancestorPairs =
      Join.innerJoin(
              "inner join occurrences with ancestors", occurrenceByCriteria, descendantAncestor)
          .apply(Values.<KV<Long, Long>>create());

  // Step 4: the descendant-ancestor pairs are expected not to contain self-reference rows
  // (descendant == ancestor), so the join output lacks the original pairs. Concatenate the
  // de-duplicated originals with the ancestor pairs to get the full set.
  return PCollectionList.of(dedupedPairs)
      .and(ancestorPairs)
      .apply(Flatten.<KV<Long, Long>>pCollections());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -321,11 +321,16 @@ private SZCriteriaOccurrence deserializeCriteriaOccurrence(String criteriaOccurr
? new HashSet<>()
: szCriteriaOccurrence.occurrenceEntities;
szCriteriaOccurrence.occurrenceEntities.forEach(
szOccurrenceEntity ->
szOccurrenceEntity.attributesWithInstanceLevelHints =
szOccurrenceEntity.attributesWithInstanceLevelHints == null
? new HashSet<>()
: szOccurrenceEntity.attributesWithInstanceLevelHints);
szOccurrenceEntity -> {
szOccurrenceEntity.attributesWithInstanceLevelHints =
szOccurrenceEntity.attributesWithInstanceLevelHints == null
? new HashSet<>()
: szOccurrenceEntity.attributesWithInstanceLevelHints;
szOccurrenceEntity.attributesWithRollupInstanceLevelHints =
szOccurrenceEntity.attributesWithRollupInstanceLevelHints == null
? new HashSet<>()
: szOccurrenceEntity.attributesWithRollupInstanceLevelHints;
});

return szCriteriaOccurrence;
} catch (IOException ioEx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ private static CriteriaOccurrence fromConfigCriteriaOccurrence(
Map<String, Relationship> occurrenceCriteriaRelationships = new HashMap<>();
Map<String, Relationship> occurrencePrimaryRelationships = new HashMap<>();
Map<String, Set<String>> occurrenceAttributesWithInstanceLevelHints = new HashMap<>();
Map<String, Set<String>> occurrenceAttributesWithRollupInstanceLevelHints = new HashMap<>();
szCriteriaOccurrence.occurrenceEntities.forEach(
szOccurrenceEntity -> {
// Get the occurrence entity.
Expand Down Expand Up @@ -518,6 +519,9 @@ private static CriteriaOccurrence fromConfigCriteriaOccurrence(
// Get the attributes with instance-level hints.
occurrenceAttributesWithInstanceLevelHints.put(
occurrenceEntity.getName(), szOccurrenceEntity.attributesWithInstanceLevelHints);
occurrenceAttributesWithRollupInstanceLevelHints.put(
occurrenceEntity.getName(),
szOccurrenceEntity.attributesWithRollupInstanceLevelHints);
});

// Build the primary-criteria relationship.
Expand All @@ -532,7 +536,8 @@ private static CriteriaOccurrence fromConfigCriteriaOccurrence(
occurrenceCriteriaRelationships,
occurrencePrimaryRelationships,
primaryCriteriaRelationship,
occurrenceAttributesWithInstanceLevelHints);
occurrenceAttributesWithInstanceLevelHints,
occurrenceAttributesWithRollupInstanceLevelHints);
}

@VisibleForTesting
Expand Down
Loading

0 comments on commit 2806e42

Please sign in to comment.